本文整理汇总了Python中pulp.common.tags.action_tag函数的典型用法代码示例。如果您正苦于以下问题:Python action_tag函数的具体用法?Python action_tag怎么用?Python action_tag使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了action_tag函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_repo_tasks
def _get_repo_tasks(context, repo_id, action):
"""
Retrieve a list of incomplete Task objects for the given repo_id and action. action must be one
of 'sync', 'download', or 'publish'.
:param context: The CLI context from Okaara
:type context: pulp.client.extensions.core.ClientContext
:param repo_id: The primary key of the repository you wish to limit the Task query to
:type repo_id: basestring
:param action: One of "sync" or "publish"
:type action: basestring
:return: A list of Task objects
:rtype: list
"""
repo_tag = tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id)
if action == 'publish':
action_tag = tags.action_tag(tags.ACTION_PUBLISH_TYPE)
elif action == 'sync':
action_tag = tags.action_tag(tags.ACTION_SYNC_TYPE)
elif action == 'download':
action_tag = tags.action_tag(tags.ACTION_DOWNLOAD_TYPE)
else:
raise ValueError(
'_get_repo_tasks() does not support %(action)s as an action.' % {'action': action})
repo_search_criteria = {'filters': {'state': {'$nin': responses.COMPLETED_STATES},
'tags': {'$all': [repo_tag, action_tag]}}}
return context.server.tasks_search.search(**repo_search_criteria)
开发者ID:maxamillion,项目名称:pulp,代码行数:27,代码来源:sync_publish.py
示例2: create_unit_install_schedule
def create_unit_install_schedule(self, consumer_id, units, install_options, schedule_data ):
"""
Create a schedule for installing content units on a consumer.
@param consumer_id: unique id for the consumer
@param units: list of unit type and unit key dicts
@param install_options: options to pass to the install manager
@param schedule_data: scheduling data
@return: schedule id
"""
self._validate_consumer(consumer_id)
self._validate_keys(install_options, _UNIT_INSTALL_OPTION_KEYS)
if 'schedule' not in schedule_data:
raise pulp_exceptions.MissingValue(['schedule'])
manager = managers_factory.consumer_agent_manager()
args = [consumer_id]
kwargs = {'units': units,
'options': install_options.get('options', {})}
weight = pulp_config.config.getint('tasks', 'consumer_content_weight')
tags = [resource_tag(dispatch_constants.RESOURCE_CONSUMER_TYPE, consumer_id),
action_tag('unit_install'), action_tag('scheduled_unit_install')]
call_request = CallRequest(manager.install_content, args, kwargs, weight=weight, tags=tags, archive=True)
call_request.reads_resource(dispatch_constants.RESOURCE_CONSUMER_TYPE, consumer_id)
scheduler = dispatch_factory.scheduler()
schedule_id = scheduler.add(call_request, **schedule_data)
return schedule_id
开发者ID:ehelms,项目名称:pulp,代码行数:27,代码来源:cud.py
示例3: POST
def POST(self, repo_id):
# TODO: Add timeout support
# Params
params = self.params()
overrides = params.get("override_config", None)
# Execute the sync asynchronously
repo_sync_manager = manager_factory.repo_sync_manager()
sync_weight = pulp_config.config.getint("tasks", "sync_weight")
sync_tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id), action_tag("sync")]
sync_call_request = CallRequest(
repo_sync_manager.sync,
[repo_id],
{"sync_config_override": overrides},
weight=sync_weight,
tags=sync_tags,
archive=True,
)
sync_call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)
sync_call_request.add_life_cycle_callback(
dispatch_constants.CALL_ENQUEUE_LIFE_CYCLE_CALLBACK, repo_sync_manager.prep_sync
)
call_requests = [sync_call_request]
repo_publish_manager = manager_factory.repo_publish_manager()
auto_publish_tags = [
resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
action_tag("auto_publish"),
action_tag("publish"),
]
auto_distributors = repo_publish_manager.auto_distributors(repo_id)
for distributor in auto_distributors:
distributor_id = distributor["id"]
publish_call_request = CallRequest(
repo_publish_manager.publish, [repo_id, distributor_id], tags=auto_publish_tags, archive=True
)
publish_call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)
publish_call_request.add_life_cycle_callback(
dispatch_constants.CALL_ENQUEUE_LIFE_CYCLE_CALLBACK, repo_publish_manager.prep_publish
)
publish_call_request.depends_on(sync_call_request)
call_requests.append(publish_call_request)
# this raises an exception that is handled by the middleware,
# so no return is needed
execution.execute_multiple(call_requests)
开发者ID:ehelms,项目名称:pulp,代码行数:53,代码来源:repositories.py
示例4: task_header
def task_header(self, task):
handlers = {
tags.action_tag(tags.ACTION_BIND) : self._render_bind_header,
tags.action_tag(tags.ACTION_AGENT_BIND) : self._render_agent_bind_header,
}
# There will be exactly 1 action tag for each task (multiple resource tags)
action_tags = [t for t in task.tags if tags.is_action_tag(t)]
action_tag = action_tags[0]
handler = handlers[action_tag]
handler()
开发者ID:ashcrow,项目名称:pulp,代码行数:13,代码来源:bind.py
示例5: sync_with_auto_publish_itinerary
def sync_with_auto_publish_itinerary(repo_id, overrides=None):
"""
Create a call request list for the synchronization of a repository and the
publishing of any distributors that are configured for auto publish.
@param repo_id: id of the repository to create a sync call request list for
@type repo_id: str
@param overrides: dictionary of configuration overrides for this sync
@type overrides: dict or None
@return: list of call request instances
@rtype: list
"""
repo_sync_manager = manager_factory.repo_sync_manager()
sync_weight = pulp_config.config.getint('tasks', 'sync_weight')
sync_tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
action_tag('sync')]
sync_call_request = CallRequest(repo_sync_manager.sync,
[repo_id],
{'sync_config_override': overrides},
weight=sync_weight,
tags=sync_tags,
archive=True)
sync_call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)
sync_call_request.add_life_cycle_callback(dispatch_constants.CALL_ENQUEUE_LIFE_CYCLE_CALLBACK,
repo_sync_manager.prep_sync)
call_requests = [sync_call_request]
repo_publish_manager = manager_factory.repo_publish_manager()
auto_publish_tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
action_tag('auto_publish'), action_tag('publish')]
auto_distributors = repo_publish_manager.auto_distributors(repo_id)
for distributor in auto_distributors:
distributor_id = distributor['id']
publish_call_request = CallRequest(repo_publish_manager.publish,
[repo_id, distributor_id],
tags=auto_publish_tags,
archive=True)
publish_call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)
publish_call_request.add_life_cycle_callback(dispatch_constants.CALL_ENQUEUE_LIFE_CYCLE_CALLBACK,
repo_publish_manager.prep_publish)
publish_call_request.depends_on(sync_call_request.id, [dispatch_constants.CALL_FINISHED_STATE])
call_requests.append(publish_call_request)
return call_requests
开发者ID:pkilambi,项目名称:pulp,代码行数:49,代码来源:repo.py
示例6: PUT
def PUT(self, consumer_id, content_type):
"""
Update the association of a profile with a consumer by content type ID.
@param consumer_id: A consumer ID.
@type consumer_id: str
@param content_type: A content unit type ID.
@type content_type: str
@return: The updated model object:
{consumer_id:<str>, content_type:<str>, profile:<dict>}
@rtype: dict
"""
body = self.params()
profile = body.get('profile')
manager = managers.consumer_profile_manager()
tags = [resource_tag(dispatch_constants.RESOURCE_CONSUMER_TYPE, consumer_id),
resource_tag(dispatch_constants.RESOURCE_CONTENT_UNIT_TYPE, content_type),
action_tag('profile_update')]
call_request = CallRequest(manager.update,
[consumer_id, content_type],
{'profile': profile},
tags=tags,
weight=0,
kwarg_blacklist=['profile'])
call_request.reads_resource(dispatch_constants.RESOURCE_CONSUMER_TYPE, consumer_id)
call_report = CallReport.from_call_request(call_request)
call_report.serialize_result = False
consumer = execution.execute_sync(call_request, call_report)
link = serialization.link.child_link_obj(consumer_id, content_type)
consumer.update(link)
return self.ok(consumer)
开发者ID:graco,项目名称:pulp,代码行数:35,代码来源:consumers.py
示例7: post
def post(self, request, consumer_id):
"""
Creates an async task to regenerate content applicability data for given consumer.
:param request: WSGI request object
:type request: django.core.handlers.wsgi.WSGIRequest
:param consumer_id: The consumer ID.
:type consumer_id: str
:raises MissingResource: if some parameters are missing
:raises OperationPostponed: when an async operation is performed.
"""
consumer_query_manager = factory.consumer_query_manager()
if consumer_query_manager.find_by_id(consumer_id) is None:
raise MissingResource(consumer_id=consumer_id)
consumer_criteria = Criteria(filters={'consumer_id': consumer_id})
task_tags = [tags.action_tag('consumer_content_applicability_regeneration')]
async_result = regenerate_applicability_for_consumers.apply_async_with_reservation(
tags.RESOURCE_CONSUMER_TYPE,
consumer_id,
(consumer_criteria.as_dict(),),
tags=task_tags)
raise OperationPostponed(async_result)
开发者ID:alanoe,项目名称:pulp,代码行数:25,代码来源:consumers.py
示例8: POST
def POST(self, repo_id):
# Params (validation will occur in the manager)
params = self.params()
importer_type = params.get('importer_type_id', None)
importer_config = params.get('importer_config', None)
if importer_type is None:
_LOG.exception('Missing importer type adding importer to repository [%s]' % repo_id)
raise exceptions.MissingValue(['importer_type'])
# Note: If an importer exists, it's removed, so no need to handle 409s.
# Note: If the plugin raises an exception during initialization, let it
# bubble up and be handled like any other 500.
importer_manager = manager_factory.repo_importer_manager()
resources = {dispatch_constants.RESOURCE_REPOSITORY_TYPE: {repo_id: dispatch_constants.RESOURCE_UPDATE_OPERATION}}
weight = pulp_config.config.getint('tasks', 'create_weight')
tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
action_tag('add_importer')]
call_request = CallRequest(importer_manager.set_importer,
[repo_id, importer_type, importer_config],
resources=resources,
weight=weight,
tags=tags)
return execution.execute_sync_created(self, call_request, 'importer')
开发者ID:ryanschneider,项目名称:pulp,代码行数:27,代码来源:repositories.py
示例9: test_run_already_in_progress
def test_run_already_in_progress(self, mock_publish, mock_search, mock_poll):
"""
Test the run() method when thre is already an incomplete publish operation.
"""
repo_id = 'test-repo'
data = {options.OPTION_REPO_ID.keyword: repo_id, polling.FLAG_BACKGROUND.keyword: False,
sp.FLAG_FORCE_FULL_PUBLISH.keyword: False}
# Simulate a task already running
task_data = copy.copy(CALL_REPORT_TEMPLATE)
task_data['state'] = 'running'
task = responses.Task(task_data)
mock_search.return_value = [task]
self.command.run(**data)
# Publish shouldn't get called again since it's already running
self.assertEqual(mock_publish.call_count, 0)
expected_search_query = {
'state': {'$nin': responses.COMPLETED_STATES},
'tags': {'$all': [tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
tags.action_tag(tags.ACTION_PUBLISH_TYPE)]}}
mock_search.assert_called_once_with(filters=expected_search_query)
mock_poll.assert_called_once_with([task], data)
write_tags = self.prompt.get_write_tags()
self.assertEqual(2, len(write_tags))
self.assertEqual(write_tags[1], 'in-progress')
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:26,代码来源:test_sync_publish.py
示例10: test_run
def test_run(self, mock_sync, mock_search, poll):
"""
Test the run() method when there is not an existing sync Task on the server.
"""
repo_id = 'test-repo'
data = {options.OPTION_REPO_ID.keyword: repo_id, polling.FLAG_BACKGROUND.keyword: False,
sp.FLAG_FORCE_FULL_SYNC.keyword: False}
# No tasks are running
mock_search.return_value = []
# responses.Response from the sync call
task_data = copy.copy(CALL_REPORT_TEMPLATE)
task = responses.Task(task_data)
mock_sync.return_value = responses.Response(202, task)
self.command.run(**data)
mock_sync.assert_called_once_with(repo_id, None)
sync_tasks = poll.mock_calls[0][1][0]
poll.assert_called_once_with(sync_tasks, data)
expected_search_query = {
'state': {'$nin': responses.COMPLETED_STATES},
'tags': {'$all': [tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
tags.action_tag(tags.ACTION_SYNC_TYPE)]}}
mock_search.assert_called_once_with(filters=expected_search_query)
self.assertEqual(self.prompt.get_write_tags(), [TAG_TITLE])
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:25,代码来源:test_sync_publish.py
示例11: PUT
def PUT(self, repo_id, distributor_id, schedule_id):
distributor_manager = manager_factory.repo_distributor_manager()
schedule_list = distributor_manager.list_publish_schedules(repo_id, distributor_id)
if schedule_id not in schedule_list:
raise exceptions.MissingResource(repo=repo_id, distributor=distributor_id, publish_schedule=schedule_id)
publish_update = {}
schedule_update = self.params()
if 'override_config' in schedule_update:
publish_update['override_config'] = schedule_update.pop('override_config')
schedule_manager = manager_factory.schedule_manager()
resources = {dispatch_constants.RESOURCE_REPOSITORY_TYPE: {repo_id: dispatch_constants.RESOURCE_READ_OPERATION},
dispatch_constants.RESOURCE_REPOSITORY_DISTRIBUTOR_TYPE: {distributor_id: dispatch_constants.RESOURCE_READ_OPERATION},
dispatch_constants.RESOURCE_SCHEDULE_TYPE: {schedule_id: dispatch_constants.RESOURCE_UPDATE_OPERATION}}
tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
resource_tag(dispatch_constants.RESOURCE_REPOSITORY_DISTRIBUTOR_TYPE, distributor_id),
resource_tag(dispatch_constants.RESOURCE_SCHEDULE_TYPE, schedule_id),
action_tag('update_publish_schedule')]
call_request = CallRequest(schedule_manager.update_publish_schedule,
[repo_id, distributor_id, schedule_id, publish_update, schedule_update],
resources=resources,
tags=tags,
archive=True)
execution.execute(call_request)
scheduler = dispatch_factory.scheduler()
schedule = scheduler.get(schedule_id)
obj = serialization.dispatch.scheduled_publish_obj(schedule)
obj.update(serialization.link.current_link_obj())
return self.ok(obj)
开发者ID:ryanschneider,项目名称:pulp,代码行数:31,代码来源:repositories.py
示例12: POST
def POST(self, consumer_id):
consumer_manager = managers.consumer_manager()
consumer_manager.get_consumer(consumer_id)
schedule_data = self.params()
units = schedule_data.pop('units', None)
uninstall_options = {'options': schedule_data.pop('options', {})}
if not units:
raise MissingValue(['units'])
schedule_manager = managers.schedule_manager()
weight = pulp_config.config.getint('tasks', 'create_weight')
tags = [resource_tag(dispatch_constants.RESOURCE_CONSUMER_TYPE, consumer_id),
action_tag('create_unit_uninstall_schedule')]
call_request = CallRequest(schedule_manager.create_unit_uninstall_schedule,
[consumer_id, units, uninstall_options, schedule_data],
weight=weight,
tags=tags,
archive=True)
call_request.reads_resource(dispatch_constants.RESOURCE_CONSUMER_TYPE, consumer_id)
schedule_id = execution.execute_sync(call_request)
scheduler = dispatch_factory.scheduler()
scheduled_call = scheduler.get(schedule_id)
scheduled_obj = serialization.dispatch.scheduled_unit_management_obj(scheduled_call)
scheduled_obj.update(serialization.link.child_link_obj(schedule_id))
return self.created(scheduled_obj['_href'], scheduled_obj)
开发者ID:graco,项目名称:pulp,代码行数:32,代码来源:consumers.py
示例13: POST
def POST(self, repo_group_id):
# Params (validation will occur in the manager)
params = self.params()
distributor_type_id = params.get('distributor_type_id', None)
distributor_config = params.get('distributor_config', None)
distributor_id = params.get('distributor_id', None)
distributor_manager = managers_factory.repo_group_distributor_manager()
resources = {dispatch_constants.RESOURCE_REPOSITORY_GROUP_TYPE : {
repo_group_id : dispatch_constants.RESOURCE_UPDATE_OPERATION
}}
weight = pulp_config.config.getint('tasks', 'create_weight')
tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_GROUP_TYPE, repo_group_id),
action_tag('add_distributor')]
if distributor_id is not None:
tags.append(resource_tag(dispatch_constants.RESOURCE_REPOSITORY_GROUP_DISTRIBUTOR_TYPE, distributor_id))
call_request = CallRequest(distributor_manager.add_distributor,
[repo_group_id, distributor_type_id, distributor_config, distributor_id],
resources=resources,
weight=weight,
tags=tags)
created = execution.execute(call_request)
href = serialization.link.child_link_obj(created['id'])
created.update(href)
return self.created(href['_href'], created)
开发者ID:stpierre,项目名称:pulp,代码行数:29,代码来源:repo_groups.py
示例14: test_task_header_action_tag_only
def test_task_header_action_tag_only(self):
task = Task({})
task.tags = [tags.action_tag(tags.ACTION_UPDATE_DISTRIBUTOR)]
self.command.task_header(task)
self.assertEqual(self.prompt.get_write_tags(), [tags.ACTION_UPDATE_DISTRIBUTOR])
开发者ID:aweiteka,项目名称:pulp,代码行数:7,代码来源:test_cudl.py
示例15: DELETE
def DELETE(self, repo_group_id, distributor_id):
params = self.params()
force = params.get('force', False)
distributor_manager = managers_factory.repo_group_distributor_manager()
resources = {
dispatch_constants.RESOURCE_REPOSITORY_GROUP_TYPE :
{repo_group_id : dispatch_constants.RESOURCE_UPDATE_OPERATION},
dispatch_constants.RESOURCE_REPOSITORY_GROUP_DISTRIBUTOR_TYPE :
{distributor_id : dispatch_constants.RESOURCE_DELETE_OPERATION},
}
tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_GROUP_TYPE, repo_group_id),
resource_tag(dispatch_constants.RESOURCE_REPOSITORY_GROUP_DISTRIBUTOR_TYPE, distributor_id),
action_tag('remove_distributor')
]
call_request = CallRequest(distributor_manager.remove_distributor,
args=[repo_group_id, distributor_id],
kwargs={'force' : force},
resources=resources,
tags=tags,
archive=True)
execution.execute(call_request)
return self.ok(None)
开发者ID:stpierre,项目名称:pulp,代码行数:25,代码来源:repo_groups.py
示例16: test_search
def test_search(self, mock_search):
"""
Test the search method. All it really does is call the superclass search() method, and turn
the results into Tasks.
"""
connection = mock.MagicMock()
repo_id = 'some_repo'
repo_tag = tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id)
sync_tag = tags.action_tag(tags.ACTION_SYNC_TYPE)
search_criteria = {'filters': {'state': {'$nin': responses.COMPLETED_STATES},
'tags': {'$all': [repo_tag, sync_tag]}}}
response_body = [{u'task_id': u'3fff3e01-ba48-414c-a4bb-daaed7a0d2d8',
u'tags': [u'pulp:repository:%s' % repo_id, u'pulp:action:sync'],
u'start_time': 1393098484,
u'queue': u'[email protected]',
u'state': u'running', u'id': {u'$oid': u'5308fef46b565fd6740199ae'}}]
mock_search.return_value = response_body
results = tasks.TaskSearchAPI(connection).search(**search_criteria)
mock_search.assert_called_once_with(**search_criteria)
self.assertEqual(type(results), list)
self.assertEqual(len(results), 1)
task = results[0]
self.assertEqual(type(task), responses.Task)
self.assertEqual(task.task_id, response_body[0]['task_id'])
self.assertEqual(task.tags, response_body[0]['tags'])
self.assertEqual(task.start_time, response_body[0]['start_time'])
self.assertEqual(task.state, response_body[0]['state'])
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:29,代码来源:test_tasks.py
示例17: PUT
def PUT(self, repo_group_id, distributor_id):
params = self.params()
distributor_config = params.get('distributor_config', None)
if distributor_config is None:
raise pulp_exceptions.MissingValue(['distributor_config'])
distributor_manager = managers_factory.repo_group_distributor_manager()
resources = {
dispatch_constants.RESOURCE_REPOSITORY_GROUP_TYPE :
{repo_group_id : dispatch_constants.RESOURCE_UPDATE_OPERATION},
dispatch_constants.RESOURCE_REPOSITORY_GROUP_DISTRIBUTOR_TYPE :
{distributor_id : dispatch_constants.RESOURCE_UPDATE_OPERATION},
}
tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_GROUP_TYPE, repo_group_id),
resource_tag(dispatch_constants.RESOURCE_REPOSITORY_GROUP_DISTRIBUTOR_TYPE, distributor_id),
action_tag('update_distributor')
]
call_request = CallRequest(distributor_manager.update_distributor_config,
args=[repo_group_id, distributor_id, distributor_config],
resources=resources,
tags=tags,
archive=True)
result = execution.execute(call_request)
href = serialization.link.current_link_obj()
result.update(href)
return self.ok(result)
开发者ID:stpierre,项目名称:pulp,代码行数:33,代码来源:repo_groups.py
示例18: POST
def POST(self):
orphans = self.params()
orphan_manager = factory.content_orphan_manager()
tags = [action_tag('delete_orphans'),
resource_tag(dispatch_constants.RESOURCE_CONTENT_UNIT_TYPE, 'orphans')]
call_request = CallRequest(orphan_manager.delete_orphans_by_id, [orphans], tags=tags, archive=True)
return execution.execute_async(self, call_request)
开发者ID:jlsherrill,项目名称:pulp,代码行数:7,代码来源:contents.py
示例19: queue_download_deferred
def queue_download_deferred():
"""
Queue a task to download all content units with entries in the DeferredDownload
collection.
"""
tags = [pulp_tags.action_tag(pulp_tags.ACTION_DEFERRED_DOWNLOADS_TYPE)]
return download_deferred.apply_async(tags=tags)
开发者ID:maxamillion,项目名称:pulp,代码行数:7,代码来源:content.py
示例20: test_bind
def test_bind(self, *mocks):
mock_agent = mocks[0]
mock_context = mocks[1]
mock_factory = mocks[2]
mock_bindings = mocks[3]
mock_task_status = mocks[4]
mock_uuid = mocks[5]
consumer = {'id': '1234'}
mock_consumer_manager = Mock()
mock_consumer_manager.get_consumer = Mock(return_value=consumer)
mock_factory.consumer_manager = Mock(return_value=mock_consumer_manager)
binding = {}
mock_bind_manager = Mock()
mock_bind_manager.get_bind = Mock(return_value=binding)
mock_bind_manager.action_pending = Mock()
mock_factory.consumer_bind_manager = Mock(return_value=mock_bind_manager)
agent_bindings = []
mock_bindings.return_value = agent_bindings
task_id = '2345'
mock_context.return_value = {}
mock_uuid.return_value = task_id
# test manager
repo_id = '100'
distributor_id = '200'
options = {}
agent_manager = AgentManager()
agent_manager.bind(consumer['id'], repo_id, distributor_id, options)
# validations
task_tags = [
tags.resource_tag(tags.RESOURCE_CONSUMER_TYPE, consumer['id']),
tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
tags.resource_tag(tags.RESOURCE_REPOSITORY_DISTRIBUTOR_TYPE, distributor_id),
tags.action_tag(tags.ACTION_AGENT_BIND)
]
mock_consumer_manager.get_consumer.assert_called_with(consumer['id'])
mock_bind_manager.get_bind.assert_called_with(consumer['id'], repo_id, distributor_id)
mock_bindings.assert_called_with([binding])
mock_context.assert_called_with(
consumer,
task_id=task_id,
action='bind',
consumer_id=consumer['id'],
repo_id=repo_id,
distributor_id=distributor_id)
mock_task_status.assert_called_with(task_id=task_id, worker_name='agent', tags=task_tags)
mock_agent.bind.assert_called_with(mock_context.return_value, agent_bindings, options)
mock_bind_manager.action_pending.assert_called_with(
consumer['id'], repo_id, distributor_id, Bind.Action.BIND, task_id)
开发者ID:alanoe,项目名称:pulp,代码行数:60,代码来源:test_agent.py
注:本文中的pulp.common.tags.action_tag函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论