本文整理汇总了Python中pulp.server.managers.factory.repo_importer_manager函数的典型用法代码示例。如果您正苦于以下问题：Python repo_importer_manager函数的具体用法？Python repo_importer_manager怎么用？Python repo_importer_manager使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了repo_importer_manager函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """
    Install the mock plugins, load the test type definition, and create a
    repository with a mock importer for the dependency manager tests.
    """
    super(DependencyManagerTests, self).setUp()

    mock_plugins.install()
    database.update_database([TYPE_1_DEF])

    self.repo_id = 'dep-repo'
    self.manager = manager_factory.dependency_manager()

    # The repository must exist before an importer can be attached to it.
    repo_manager = manager_factory.repo_manager()
    repo_manager.create_repo(self.repo_id)

    importer_manager = manager_factory.repo_importer_manager()
    importer_manager.set_importer(self.repo_id, 'mock-importer', {})
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:12,代码来源:test_dependency_manager.py
示例2: POST
def POST(self, repo_id):
    """
    Associate an importer with a repository.

    The importer manager validates the request up front: the repository must exist and
    an importer of the given importer_type_id must be available. The configuration is only
    checked against the standard set of importer configuration keys at this point; the
    importer-specific validation runs when the association is actually made, so the
    dispatched task may still fail with a validation error.

    :param repo_id: the repository to associate the importer with
    :type  repo_id: str
    :raises exceptions.OperationPostponed: carries the result of the dispatched task
    """
    request_params = self.params()
    importer_type = request_params.get('importer_type_id', None)
    config = request_params.get('importer_config', None)

    # Validation raises the appropriate exception on failure.
    manager_factory.repo_importer_manager().validate_importer_config(
        repo_id, importer_type, config)

    # Note: If an importer exists, it's removed, so no need to handle 409s.
    # Note: If the plugin raises an exception during initialization, let it
    #       bubble up and be handled like any other 500.
    task_tags = [
        tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
        tags.action_tag('add_importer'),
    ]
    task_args = [repo_id, importer_type]
    task_kwargs = {'repo_plugin_config': config}

    async_result = set_importer.apply_async_with_reservation(
        tags.RESOURCE_REPOSITORY_TYPE, repo_id, task_args, task_kwargs, tags=task_tags)
    raise exceptions.OperationPostponed(async_result)
开发者ID:beav,项目名称:pulp,代码行数:31,代码来源:repositories.py
示例3: test_delete_with_plugin_error
def test_delete_with_plugin_error(self):
    """
    Tests deleting a repo where one (or more) of the plugins raises an error.

    The delete is expected to raise PulpExecutionException. The mock side effects
    installed for the test are always cleared afterwards, even when the test fails,
    so that the raising mocks cannot leak into other tests.
    """
    # Setup
    self.manager.create_repo('doomed')

    importer_manager = manager_factory.repo_importer_manager()
    distributor_manager = manager_factory.repo_distributor_manager()

    importer_manager.set_importer('doomed', 'mock-importer', {})
    distributor_manager.add_distributor('doomed', 'mock-distributor', {}, True,
                                        distributor_id='dist-1')

    # Setup both mocks to raise errors on removal
    mock_plugins.MOCK_IMPORTER.importer_removed.side_effect = Exception('Splat')
    mock_plugins.MOCK_DISTRIBUTOR.distributor_removed.side_effect = Exception('Pow')

    # Test
    try:
        self.manager.delete_repo('doomed')
    except exceptions.PulpExecutionException:
        pass
    else:
        self.fail('No exception raised during repo delete')
    finally:
        # Cleanup - need to manually clear the side effects. Done in a finally block
        # so it also runs when the assertion above fails or an unexpected error occurs.
        mock_plugins.MOCK_IMPORTER.importer_removed.side_effect = None
        mock_plugins.MOCK_DISTRIBUTOR.distributor_removed.side_effect = None
开发者ID:beav,项目名称:pulp,代码行数:29,代码来源:test_cud.py
示例4: __init__
def __init__(self, source_repo_id, dest_repo_id, source_importer_id, dest_importer_id,
             association_owner_type, association_owner_id):
    """
    Set up the conduit: initialize every mixin against the destination repository and
    importer, record the source/destination identifiers and owner information, and
    obtain the managers the conduit uses.

    :param source_repo_id: ID of the repository from which units are being copied
    :type  source_repo_id: str
    :param dest_repo_id: ID of the repository into which units are being copied
    :type  dest_repo_id: str
    :param source_importer_id: ID of the importer on the source repository
    :type  source_importer_id: str
    :param dest_importer_id: ID of the importer on the destination repository
    :type  dest_importer_id: str
    :param association_owner_type: distinguishes the owner when creating an
                                   association through this conduit
    :type  association_owner_type: str
    :param association_owner_id: specific ID of the owner when creating an
                                 association through this conduit
    :type  association_owner_id: str
    """
    # Each mixin is initialized explicitly because they take different arguments.
    ImporterScratchPadMixin.__init__(self, dest_repo_id, dest_importer_id)
    RepoScratchPadMixin.__init__(self, dest_repo_id, ImporterConduitException)
    SearchUnitsMixin.__init__(self, ImporterConduitException)
    AddUnitMixin.__init__(
        self, dest_repo_id, dest_importer_id, association_owner_type, association_owner_id)

    # Identifiers of both ends of the copy
    self.source_repo_id = source_repo_id
    self.dest_repo_id = dest_repo_id

    self.source_importer_id = source_importer_id
    self.dest_importer_id = dest_importer_id

    # Ownership recorded on associations created through this conduit
    self.association_owner_type = association_owner_type
    self.association_owner_id = association_owner_id

    # Managers used by the conduit
    self.__association_manager = manager_factory.repo_unit_association_manager()
    self.__association_query_manager = manager_factory.repo_unit_association_query_manager()
    self.__importer_manager = manager_factory.repo_importer_manager()
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:35,代码来源:unit_import.py
示例5: remove_from_importer
def remove_from_importer(repo_id, transfer_units):
    """
    Invoke remove_units on the importer plugin of the given repository.

    Any exception raised by the importer is logged and not propagated.

    :param repo_id: ID of the repository whose importer is invoked
    :param transfer_units: units handed to the importer's remove_units call
    """
    # Load the repository and its importer from the database
    repo = manager_factory.repo_query_manager().get_repository(repo_id)
    repo_importer = manager_factory.repo_importer_manager().get_importer(repo_id)
    importer_type_id = repo_importer['importer_type_id']

    # Build the transfer repo and give it the importer's working directory
    transfer_repo = common_utils.to_transfer_repo(repo)
    transfer_repo.working_dir = common_utils.importer_working_dir(
        importer_type_id, repo_id, mkdir=True)

    # Resolve the plugin instance and assemble its call configuration
    importer_instance, plugin_config = plugin_api.get_importer_by_id(importer_type_id)
    call_config = PluginCallConfiguration(plugin_config, repo_importer['config'])

    # Hand the removed units to the importer
    try:
        importer_instance.remove_units(transfer_repo, transfer_units, call_config)
    except Exception:
        template = _('Exception from importer [%(i)s] while removing units from repo [%(r)s]')
        logger.exception(template % {'i': repo_importer['id'], 'r': repo_id})
开发者ID:preethit,项目名称:pulp-1,代码行数:25,代码来源:unit_association.py
示例6: create_sync_schedule
def create_sync_schedule(self, repo_id, importer_id, sync_options, schedule_data):
    """
    Create a new sync schedule for a given repository using the given importer.

    @param repo_id: ID of the repository to sync
    @param importer_id: ID of the importer; validated together with repo_id
    @param sync_options: sync options; its 'override_config' entry is passed to the
                         sync call as sync_config_override
    @param schedule_data: scheduling data; must contain 'schedule' and is passed as
                          keyword arguments to the scheduler
    @return: the schedule ID returned by the scheduler
    @raise pulp_exceptions.MissingValue: if schedule_data has no 'schedule' entry
    """
    # validate the input
    self._validate_importer(repo_id, importer_id)
    self._validate_keys(sync_options, _SYNC_OPTION_KEYS)
    if 'schedule' not in schedule_data:
        raise pulp_exceptions.MissingValue(['schedule'])

    # build the sync call request
    sync_manager = managers_factory.repo_sync_manager()
    call_args = [repo_id]
    call_kwargs = {'sync_config_override': sync_options['override_config']}
    sync_weight = pulp_config.config.getint('tasks', 'sync_weight')
    call_tags = [
        resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
        resource_tag(dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
    ]
    call_request = CallRequest(
        sync_manager.sync, call_args, call_kwargs,
        weight=sync_weight, tags=call_tags, archive=True)
    call_request.reads_resource(
        dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id)
    call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)
    call_request.add_life_cycle_callback(
        dispatch_constants.CALL_ENQUEUE_LIFE_CYCLE_CALLBACK, sync_manager.prep_sync)

    # schedule the sync
    schedule_id = dispatch_factory.scheduler().add(call_request, **schedule_data)

    # record the new schedule on the importer
    managers_factory.repo_importer_manager().add_sync_schedule(repo_id, schedule_id)
    return schedule_id
开发者ID:ehelms,项目名称:pulp,代码行数:34,代码来源:cud.py
示例7: test_post
def test_post(self, _reserve_resource, mock_apply_async):
    """
    Posting an import_upload request answers 202 with the async task and dispatches
    the task with the repo, unit type, unit key, unit metadata and upload ID.
    """
    # Setup
    mock_apply_async.return_value = AsyncResult(str(uuid.uuid4()))
    _reserve_resource.return_value = ReservedResourceApplyAsync()

    upload_id = self.upload_manager.initialize_upload()
    self.upload_manager.save_data(upload_id, 0, 'string data')

    manager_factory.repo_manager().create_repo('repo-upload')
    manager_factory.repo_importer_manager().set_importer('repo-upload', 'dummy-importer', {})

    # Test
    request_body = {
        'upload_id': upload_id,
        'unit_type_id': 'dummy-type',
        'unit_key': {'name': 'foo'},
        'unit_metadata': {'stuff': 'bar'},
    }
    status, response_body = self.post(
        '/v2/repositories/repo-upload/actions/import_upload/', request_body)

    # Verify
    self.assertEqual(202, status)
    assert_body_matches_async_task(response_body, mock_apply_async.return_value)

    expected_call_args = [
        'repo-upload', 'dummy-type', {'name': 'foo'}, {'stuff': 'bar'}, upload_id,
    ]
    actual_call_args = mock_apply_async.call_args[0][0]
    self.assertEqual(expected_call_args, actual_call_args)
开发者ID:aweiteka,项目名称:pulp,代码行数:29,代码来源:test_contents_controller.py
示例8: __init__
def __init__(self, source_repo_id, dest_repo_id, source_importer_id, dest_importer_id):
    """
    Set up the conduit: initialize every mixin against the destination repository and
    importer, record the source/destination identifiers, and obtain the managers the
    conduit uses.

    :param source_repo_id: ID of the repository from which units are being copied
    :type  source_repo_id: str
    :param dest_repo_id: ID of the repository into which units are being copied
    :type  dest_repo_id: str
    :param source_importer_id: ID of the importer on the source repository
    :type  source_importer_id: str
    :param dest_importer_id: ID of the importer on the destination repository
    :type  dest_importer_id: str
    """
    # Each mixin is initialized explicitly because they take different arguments.
    ImporterScratchPadMixin.__init__(self, dest_repo_id, dest_importer_id)
    RepoScratchPadMixin.__init__(self, dest_repo_id, ImporterConduitException)
    SearchUnitsMixin.__init__(self, ImporterConduitException)
    AddUnitMixin.__init__(self, dest_repo_id, dest_importer_id)

    # Identifiers of both ends of the copy
    self.source_repo_id = source_repo_id
    self.dest_repo_id = dest_repo_id

    self.source_importer_id = source_importer_id
    self.dest_importer_id = dest_importer_id

    # Managers used by the conduit
    self.__association_manager = manager_factory.repo_unit_association_manager()
    self.__association_query_manager = manager_factory.repo_unit_association_query_manager()
    self.__importer_manager = manager_factory.repo_importer_manager()
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:unit_import.py
示例9: test_post_with_override_config
def test_post_with_override_config(self, mock_get_worker_for_reservation, mock_uuid,
                                   mock_apply_async):
    """
    Posting an import_upload request that carries an override_config answers 202 and
    dispatches the task with the override config appended to the call arguments.
    """
    # Setup
    uuid_list = [uuid.uuid4() for _ in range(10)]
    mock_uuid.uuid4.side_effect = copy.deepcopy(uuid_list)
    expected_async_result = AsyncResult(str(uuid_list[0]))
    mock_get_worker_for_reservation.return_value = Worker(
        'some_queue', datetime.datetime.now())

    upload_id = self.upload_manager.initialize_upload()
    self.upload_manager.save_data(upload_id, 0, 'string data')

    manager_factory.repo_manager().create_repo('repo-upload')
    manager_factory.repo_importer_manager().set_importer('repo-upload', 'dummy-importer', {})

    # Test
    test_override_config = {'key1': 'value1', 'key2': 'value2'}
    request_body = {
        'upload_id': upload_id,
        'unit_type_id': 'dummy-type',
        'unit_key': {'name': 'foo'},
        'unit_metadata': {'stuff': 'bar'},
        'override_config': test_override_config,
    }
    status, response_body = self.post(
        '/v2/repositories/repo-upload/actions/import_upload/', request_body)

    # Verify
    self.assertEqual(202, status)
    assert_body_matches_async_task(response_body, expected_async_result)

    expected_call_args = [
        'repo-upload', 'dummy-type', {'name': 'foo'}, {'stuff': 'bar'},
        upload_id, test_override_config,
    ]
    actual_call_args = mock_apply_async.call_args[0][0]
    self.assertEqual(expected_call_args, actual_call_args)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:33,代码来源:test_contents_controller.py
示例10: put
def put(self, request, repo_id, importer_id):
    """
    Dispatch a task that updates the configuration of a repository's importer.

    :param request: WSGI request object
    :type  request: django.core.handlers.wsgi.WSGIRequest
    :param repo_id: The id of the repository
    :type  repo_id: str
    :param importer_id: The id of the importer whose configuration is updated
    :type  importer_id: str

    :raises pulp_exceptions.MissingValue: if required param importer_config is not in the body
    :raises pulp_exceptions.MissingResource: if importer does not match the repo's importer
    :raises pulp_exceptions.OperationPostponed: dispatch a task
    """
    # The importer named in the URL must be the one attached to the repository.
    current_importer = manager_factory.repo_importer_manager().get_importer(repo_id)
    if current_importer['id'] != importer_id:
        raise pulp_exceptions.MissingResource(importer_id=importer_id)

    new_config = request.body_as_json.get('importer_config', None)
    if new_config is None:
        raise pulp_exceptions.MissingValue(['importer_config'])

    task_tags = [
        tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
        tags.resource_tag(tags.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
        tags.action_tag('update_importer'),
    ]
    update_task = repo_importer_manager.update_importer_config
    async_result = update_task.apply_async_with_reservation(
        tags.RESOURCE_REPOSITORY_TYPE, repo_id, [repo_id],
        {'importer_config': new_config}, tags=task_tags)
    raise pulp_exceptions.OperationPostponed(async_result)
开发者ID:hgschmie,项目名称:pulp,代码行数:33,代码来源:repositories.py
示例11: _process_repos
def _process_repos(repos, importers=False, distributors=False):
    """
    Prepare a collection of repositories for return to a client: optionally merge in the
    related importers and distributors, then add the object link, convert dates to
    strings and drop the internal scratchpad on each repository.

    :param repos: collection of repositories
    :type  repos: list, tuple
    :param importers: if True, adds related importers under the attribute "importers".
    :type  importers: bool
    :param distributors: if True, adds related distributors under the attribute "distributors"
    :type  distributors: bool

    :return: the same list that was passed in, just for convenience. The list itself is not
             modified- only its members are modified in-place.
    :rtype:  list of Repo instances
    """
    # (requested?, attribute name, manager getter) - importers are merged first
    related_kinds = (
        (importers, 'importers', manager_factory.repo_importer_manager),
        (distributors, 'distributors', manager_factory.repo_distributor_manager),
    )
    for requested, attribute, get_manager in related_kinds:
        if requested:
            _merge_related_objects(attribute, get_manager(), repos)

    for repo in repos:
        repo['_href'] = reverse('repo_resource', kwargs={'repo_id': repo['id']})
        _convert_repo_dates_to_strings(repo)

        # Remove internally used scratchpad from repo details
        if 'scratchpad' in repo:
            del repo['scratchpad']

    return repos
开发者ID:hgschmie,项目名称:pulp,代码行数:33,代码来源:repositories.py
示例12: delete
def delete(self, request, repo_id, importer_id):
    """
    Dispatch a task that removes the importer from a repository.

    :param request: WSGI request object
    :type  request: django.core.handlers.wsgi.WSGIRequest
    :param repo_id: The id of the repository to remove the importer from
    :type  repo_id: str
    :param importer_id: The id of the importer to remove from the given repository
    :type  importer_id: str

    :raises pulp_exceptions.MissingResource: if importer cannot be found for this repo
    :raises pulp_exceptions.OperationPostponed: to dispatch a task to delete the importer
    """
    # The importer named in the URL must be the one attached to the repository.
    current_importer = manager_factory.repo_importer_manager().get_importer(repo_id)
    if current_importer['id'] != importer_id:
        raise pulp_exceptions.MissingResource(importer_id=importer_id)

    task_tags = [
        tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
        tags.resource_tag(tags.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
        tags.action_tag('delete_importer'),
    ]
    remove_task = repo_importer_manager.remove_importer
    async_result = remove_task.apply_async_with_reservation(
        tags.RESOURCE_REPOSITORY_TYPE, repo_id, [repo_id], tags=task_tags)
    raise pulp_exceptions.OperationPostponed(async_result)
开发者ID:hgschmie,项目名称:pulp,代码行数:26,代码来源:repositories.py
示例13: post
def post(self, request, repo_id):
    """
    Associate an importer with a repository.

    The importer manager validates the request up front: the repository must exist and
    an importer of the given importer_type_id must be available. The configuration is only
    checked against the standard set of importer configuration keys at this point; the
    importer-specific validation runs when the association is actually made, so the
    dispatched task may still fail with a validation error.

    :param request: WSGI request object
    :type  request: django.core.handlers.wsgi.WSGIRequest
    :param repo_id: the repository to associate the importer with
    :type  repo_id: str

    :raises pulp_exceptions.OperationPostponed: dispatch a task
    """
    request_body = request.body_as_json
    importer_type = request_body.get('importer_type_id', None)
    config = request_body.get('importer_config', None)

    # Validation occurs within the manager
    manager_factory.repo_importer_manager().validate_importer_config(
        repo_id, importer_type, config)

    task_tags = [
        tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
        tags.action_tag('add_importer'),
    ]
    set_task = repo_importer_manager.set_importer
    async_result = set_task.apply_async_with_reservation(
        tags.RESOURCE_REPOSITORY_TYPE, repo_id, [repo_id, importer_type],
        {'repo_plugin_config': config}, tags=task_tags)
    raise pulp_exceptions.OperationPostponed(async_result)
开发者ID:hgschmie,项目名称:pulp,代码行数:31,代码来源:repositories.py
示例14: get
def get(self, request, repo_id):
    """
    Return a single repository, optionally including its importers and distributors.

    The query parameters 'importers' and 'distributors' (value 'true', case-insensitive)
    add the corresponding field to the returned repository. The query parameter 'details'
    is equivalent to passing both.

    :param request: WSGI request object
    :type  request: django.core.handlers.wsgi.WSGIRequest
    :param repo_id: id of requested repository
    :type  repo_id: str

    :return: Response containing a serialized dict for the requested repo.
    :rtype : django.http.HttpResponse

    :raises pulp_exceptions.MissingResource: if repo cannot be found
    """
    repo = manager_factory.repo_query_manager().find_by_id(repo_id)
    if repo is None:
        raise pulp_exceptions.MissingResource(repo=repo_id)

    repo['_href'] = reverse('repo_resource', kwargs={'repo_id': repo_id})
    _convert_repo_dates_to_strings(repo)

    def _is_true(param_name):
        # A flag counts as set only when its value is 'true' in any letter case.
        return request.GET.get(param_name, 'false').lower() == 'true'

    details = _is_true('details')

    if _is_true('importers') or details:
        importer_manager = manager_factory.repo_importer_manager()
        repo = _merge_related_objects('importers', importer_manager, (repo,))[0]

    if _is_true('distributors') or details:
        distributor_manager = manager_factory.repo_distributor_manager()
        repo = _merge_related_objects('distributors', distributor_manager, (repo,))[0]

    return generate_json_response_with_pulp_encoder(repo)
开发者ID:hgschmie,项目名称:pulp,代码行数:34,代码来源:repositories.py
示例15: _process_repos
def _process_repos(repos, importers=False, distributors=False):
    """
    Prepare a collection of repositories for return to a client: optionally
    merge in the related importers and distributors, then add the object link
    to each repository.

    @param repos: collection of repositories
    @type  repos: list, tuple

    @param importers: iff True, adds related importers under the
                      attribute "importers".
    @type  importers: bool

    @param distributors: iff True, adds related distributors under the
                         attribute "distributors".
    @type  distributors: bool

    @return: the same list that was passed in, just for convenience. The list
             itself is not modified- only its members are modified in-place.
    @rtype:  list of Repo instances
    """
    # (requested?, attribute name, manager getter) - importers are merged first
    related_kinds = (
        (importers, 'importers', manager_factory.repo_importer_manager),
        (distributors, 'distributors', manager_factory.repo_distributor_manager),
    )
    for requested, attribute, get_manager in related_kinds:
        if requested:
            _merge_related_objects(attribute, get_manager(), repos)

    for repo in repos:
        link = serialization.link.search_safe_link_obj(repo['id'])
        repo.update(link)

    return repos
开发者ID:ryanschneider,项目名称:pulp,代码行数:32,代码来源:repositories.py
示例16: PUT
def PUT(self, repo_id, importer_id):
    """
    Update the configuration of a repository's importer by executing the importer
    manager's update_importer_config call and returning its result.

    :param repo_id: ID of the repository the importer belongs to
    :param importer_id: ID of the importer being updated
    :raises exceptions.MissingValue: if importer_config is not in the request parameters
    """
    # Params (validation will occur in the manager)
    importer_config = self.params().get('importer_config', None)

    if importer_config is None:
        _LOG.error('Missing configuration updating importer for repository [%s]' % repo_id)
        raise exceptions.MissingValue(['importer_config'])

    importer_manager = manager_factory.repo_importer_manager()

    # Both the repository and its importer are marked as updated by the call.
    update_operation = dispatch_constants.RESOURCE_UPDATE_OPERATION
    resources = {
        dispatch_constants.RESOURCE_REPOSITORY_TYPE: {repo_id: update_operation},
        dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE: {importer_id: update_operation},
    }
    call_tags = [
        resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
        resource_tag(dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
        action_tag('update_importer'),
    ]

    call_request = CallRequest(
        importer_manager.update_importer_config,
        [repo_id],
        {'importer_config': importer_config},
        resources=resources,
        tags=call_tags,
        archive=True,
        kwarg_blacklist=['importer_config'])

    return self.ok(execution.execute(call_request))
开发者ID:domcleal,项目名称:pulp,代码行数:25,代码来源:repositories.py
示例17: delete
def delete(repo_id):
    """
    Delete a repository and inform other affected collections.

    NOTE(review): this excerpt appears to end after the importer step - nothing is
    returned or raised here and distributor_manager is not used in the visible code.
    Confirm against the complete function before relying on the return/raise
    documentation below.

    :param repo_id: id of the repository to delete.
    :type  repo_id: str

    :raise pulp_exceptions.PulpExecutionException: if any part of the process fails; the exception
                                                   will contain information on which sections failed

    :return: A TaskResult object with the details of any errors or spawned tasks
    :rtype:  pulp.server.async.tasks.TaskResult
    """
    # With so much going on during a delete, it's possible that a few things could go wrong while
    # others are successful. We track lesser errors that shouldn't abort the entire process until
    # the end and then raise an exception describing the incompleteness of the delete. The exception
    # arguments are captured as the second element in the tuple, but the user will have to look at
    # the server logs for more information.
    error_tuples = []  # tuple of failed step and exception arguments

    importer_manager = manager_factory.repo_importer_manager()
    distributor_manager = manager_factory.repo_distributor_manager()

    # Inform the importer
    importer_coll = RepoImporter.get_collection()
    repo_importer = importer_coll.find_one({'repo_id': repo_id})
    if repo_importer is not None:
        try:
            importer_manager.remove_importer(repo_id)
        except Exception as e:
            # "except X as e" is accepted by Python 2.6+ and Python 3; the previous
            # "except X, e" form is a syntax error on Python 3.
            _logger.exception('Error received removing importer [%s] from repo [%s]' % (
                repo_importer['importer_type_id'], repo_id))
            error_tuples.append(e)
开发者ID:zjhuntin,项目名称:pulp,代码行数:34,代码来源:repository.py
示例18: populate
def populate(self, strategy=constants.DEFAULT_STRATEGY, ssl=False):
    """
    Populate the test fixture: register the child consumer, attach the HTTP importer
    and both distributors to the repository, and bind the consumer to the repository.

    :param strategy: strategy stored in the consumer's notes and in the bind configuration
    :param ssl: if True, the HTTP distributor uses the SSL distributor configuration
    """
    PluginTestBase.populate(self)

    # register child
    consumer_manager = managers.consumer_manager()
    consumer_manager.register(self.PULP_ID, notes={constants.STRATEGY_NOTE_KEY: strategy})

    # add importer
    importer_manager = managers.repo_importer_manager()
    importer_conf = {
        constants.MANIFEST_URL_KEYWORD: 'http://redhat.com',
        constants.STRATEGY_KEYWORD: constants.DEFAULT_STRATEGY,
        constants.PROTOCOL_KEYWORD: 'file',
    }
    importer_manager.set_importer(self.REPO_ID, constants.HTTP_IMPORTER, importer_conf)

    # add distributors
    dist_conf = self.dist_conf_with_ssl() if ssl else self.dist_conf()
    distributor_manager = managers.repo_distributor_manager()
    distributor_manager.add_distributor(
        self.REPO_ID, constants.HTTP_DISTRIBUTOR, dist_conf, False, constants.HTTP_DISTRIBUTOR)
    distributor_manager.add_distributor(
        self.REPO_ID, FAKE_DISTRIBUTOR, {}, False, FAKE_DISTRIBUTOR)

    # bind
    bind_conf = {constants.STRATEGY_KEYWORD: strategy}
    bind_manager = managers.consumer_bind_manager()
    bind_manager.bind(self.PULP_ID, self.REPO_ID, constants.HTTP_DISTRIBUTOR, False, bind_conf)
开发者ID:bartwo,项目名称:pulp,代码行数:30,代码来源:test_plugins.py
示例19: remove_from_importer
def remove_from_importer(repo_id, removed_units):
    """
    Convert the removed units into transfer units and invoke remove_units on the
    importer plugin of the given repository.

    Any exception raised by the importer is logged and not propagated.

    :param repo_id: ID of the repository whose importer is invoked
    :param removed_units: units that were removed; converted to transfer units before
                          being handed to the importer
    """
    # Load the repository and its importer from the database
    repo = manager_factory.repo_query_manager().get_repository(repo_id)
    repo_importer = manager_factory.repo_importer_manager().get_importer(repo_id)
    importer_type_id = repo_importer['importer_type_id']

    # Build the transfer repo and give it the importer's working directory
    transfer_repo = common_utils.to_transfer_repo(repo)
    transfer_repo.working_dir = common_utils.importer_working_dir(
        importer_type_id, repo_id, mkdir=True)

    # Convert the units into transfer units
    unit_type_ids = calculate_associated_type_ids(repo_id, removed_units)
    transfer_units = create_transfer_units(removed_units, unit_type_ids)

    # Resolve the plugin instance and assemble its call configuration
    importer_instance, plugin_config = plugin_api.get_importer_by_id(importer_type_id)
    call_config = PluginCallConfiguration(plugin_config, repo_importer['config'])

    # Hand the removed units to the importer
    try:
        importer_instance.remove_units(transfer_repo, transfer_units, call_config)
    except Exception:
        failure_details = (repo_importer['id'], repo_id)
        _LOG.exception(
            'Exception from importer [%s] while removing units from repo [%s]' % failure_details)
开发者ID:juwu,项目名称:pulp,代码行数:25,代码来源:unit_association.py
示例20: verify
def verify(self, num_units=PluginTestBase.NUM_UNITS):
    """
    Verify the state of the repository after the test run: the repository exists,
    the importer's manifest URL points at this repository, only the fake distributor
    remains, and every expected unit is associated and stored on disk with its unit
    ID as content.

    :param num_units: number of units expected to be associated with the repository
    """
    # repository
    manager = managers.repo_query_manager()
    manager.get_repository(self.REPO_ID)

    # importer
    manager = managers.repo_importer_manager()
    importer = manager.get_importer(self.REPO_ID)
    manifest_url = importer['config'][constants.MANIFEST_URL_KEYWORD]
    self.assertTrue(manifest_url.endswith('%s/manifest.json.gz' % self.REPO_ID))

    # distributor
    manager = managers.repo_distributor_manager()
    manager.get_distributor(self.REPO_ID, FAKE_DISTRIBUTOR)
    self.assertRaises(
        MissingResource, manager.get_distributor, self.REPO_ID, constants.HTTP_DISTRIBUTOR)

    # check units
    manager = managers.repo_unit_association_query_manager()
    units = manager.get_units(self.REPO_ID)
    # Index the units by their metadata 'N' value so they can be looked up by position.
    units = dict((u['metadata']['N'], u) for u in units)
    self.assertEqual(len(units), num_units)

    for n in range(num_units):
        unit = units[n]
        unit_id = self.UNIT_ID % n
        metadata = unit['metadata']
        storage_path = metadata['_storage_path'].replace('//', '/')

        self.assertEqual(unit['unit_type_id'], self.UNIT_TYPE_ID)
        self.assertEqual(unit['repo_id'], self.REPO_ID)
        self.assertEqual(unit['owner_id'], constants.HTTP_IMPORTER)

        file_path = '.'.join((unit_id, self.UNIT_TYPE_ID))
        self.assertEqual(storage_path, os.path.join(self.childfs, 'content', file_path))
        self.assertTrue(os.path.exists(storage_path))

        # The context manager closes the file even if read() raises.
        with open(storage_path) as fp:
            content = fp.read()
        self.assertEqual(content, unit_id)
开发者ID:bartwo,项目名称:pulp,代码行数:33,代码来源:test_plugins.py
注：本文中的pulp.server.managers.factory.repo_importer_manager函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论