• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python factory.repo_importer_manager函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.managers.factory.repo_importer_manager函数的典型用法代码示例。如果您正苦于以下问题:Python repo_importer_manager函数的具体用法?Python repo_importer_manager怎么用?Python repo_importer_manager使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了repo_importer_manager函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUp

    def setUp(self):
        """
        Prepare the fixtures used by the dependency manager tests.

        Installs the mock plugins, loads the TYPE_1_DEF definition into the database and
        creates the 'dep-repo' repository with the 'mock-importer' importer set on it.
        """
        super(DependencyManagerTests, self).setUp()

        mock_plugins.install()
        database.update_database([TYPE_1_DEF])

        repo_id = 'dep-repo'
        self.repo_id = repo_id
        self.manager = manager_factory.dependency_manager()

        # Create the repository, then set the mock importer on it
        repo_manager = manager_factory.repo_manager()
        repo_manager.create_repo(repo_id)

        importer_manager = manager_factory.repo_importer_manager()
        importer_manager.set_importer(repo_id, 'mock-importer', {})
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:12,代码来源:test_dependency_manager.py


示例2: POST

    def POST(self, repo_id):
        """
        Associate an importer with a repository.

        The repository and the importer type given by importer_type_id are validated up front.
        The configuration, however, is only checked against a standard set of importer
        configuration keys at this point; the importer specific validation runs on
        association. Type specific configuration is therefore validated later, which means
        the spawned task could still fail with a validation error.

        :param repo_id: the repository to associate the importer with
        :type  repo_id: str

        :raises exceptions.OperationPostponed: carries the result of the dispatched task
        """
        body = self.params()
        importer_type = body.get('importer_type_id')
        config = body.get('importer_config')

        # This call will raise the appropriate exception
        manager_factory.repo_importer_manager().validate_importer_config(
            repo_id, importer_type, config)

        # Note: If an importer exists, it's removed, so no need to handle 409s.
        # Note: If the plugin raises an exception during initialization, let it
        #  bubble up and be handled like any other 500.

        task_tags = [
            tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
            tags.action_tag('add_importer'),
        ]
        task_args = [repo_id, importer_type]
        task_kwargs = {'repo_plugin_config': config}
        async_result = set_importer.apply_async_with_reservation(
            tags.RESOURCE_REPOSITORY_TYPE, repo_id, task_args, task_kwargs, tags=task_tags)
        raise exceptions.OperationPostponed(async_result)
开发者ID:beav,项目名称:pulp,代码行数:31,代码来源:repositories.py


示例3: test_delete_with_plugin_error

    def test_delete_with_plugin_error(self):
        """
        Tests deleting a repo where one (or more) of the plugins raises an error.

        Both the mock importer and the mock distributor are set up to raise from their
        removal hooks, and deleting the repo is expected to raise PulpExecutionException.
        The side effects live on the module-level mocks in mock_plugins, so they are cleared
        in a ``finally`` clause: a failing assertion must not leave them in place.
        """

        # Setup
        self.manager.create_repo('doomed')

        importer_manager = manager_factory.repo_importer_manager()
        distributor_manager = manager_factory.repo_distributor_manager()

        importer_manager.set_importer('doomed', 'mock-importer', {})
        distributor_manager.add_distributor('doomed', 'mock-distributor', {}, True,
                                            distributor_id='dist-1')

        try:
            #    Setup both mocks to raise errors on removal
            mock_plugins.MOCK_IMPORTER.importer_removed.side_effect = Exception('Splat')
            mock_plugins.MOCK_DISTRIBUTOR.distributor_removed.side_effect = Exception('Pow')

            # Test
            self.assertRaises(exceptions.PulpExecutionException,
                              self.manager.delete_repo, 'doomed')
        finally:
            # Cleanup - need to manually clear the side effects, even when the test fails
            mock_plugins.MOCK_IMPORTER.importer_removed.side_effect = None
            mock_plugins.MOCK_DISTRIBUTOR.distributor_removed.side_effect = None
开发者ID:beav,项目名称:pulp,代码行数:29,代码来源:test_cud.py


示例4: __init__

    def __init__(self, source_repo_id, dest_repo_id, source_importer_id, dest_importer_id,
                 association_owner_type, association_owner_id):
        """
        Initialise the mixins, record the source/destination identifiers and the association
        owner, and fetch the managers this conduit keeps references to.

        :param source_repo_id: ID of the repository from which units are being copied
        :type  source_repo_id: str
        :param dest_repo_id: ID of the repository into which units are being copied
        :type  dest_repo_id: str
        :param source_importer_id: ID of the importer on the source repository
        :type  source_importer_id: str
        :param dest_importer_id:  ID of the importer on the destination repository
        :type  dest_importer_id: str
        :param association_owner_type: distinguishes the owner when creating an
               association through this conduit
        :type  association_owner_type: str
        :param association_owner_id: specific ID of the owner when creating an
               association through this conduit
        :type  association_owner_id: str
        """
        # The mixins that take repository/importer IDs are given the destination ones
        ImporterScratchPadMixin.__init__(self, dest_repo_id, dest_importer_id)
        RepoScratchPadMixin.__init__(self, dest_repo_id, ImporterConduitException)
        SearchUnitsMixin.__init__(self, ImporterConduitException)
        AddUnitMixin.__init__(
            self, dest_repo_id, dest_importer_id, association_owner_type, association_owner_id)

        # Source side of the copy
        self.source_repo_id = source_repo_id
        self.source_importer_id = source_importer_id

        # Destination side of the copy
        self.dest_repo_id = dest_repo_id
        self.dest_importer_id = dest_importer_id

        # Owner recorded on associations created through this conduit
        self.association_owner_type = association_owner_type
        self.association_owner_id = association_owner_id

        self.__association_manager = manager_factory.repo_unit_association_manager()
        self.__association_query_manager = manager_factory.repo_unit_association_query_manager()
        self.__importer_manager = manager_factory.repo_importer_manager()
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:35,代码来源:unit_import.py


示例5: remove_from_importer

def remove_from_importer(repo_id, transfer_units):
    """
    Call ``remove_units`` on the importer plugin of the given repository.

    An exception raised by the importer's ``remove_units`` is logged and not re-raised.

    :param repo_id: ID of the repository whose importer is invoked
    :param transfer_units: units handed to the importer's ``remove_units`` call as-is
    """
    # Load the repository and its importer from the database
    repo = manager_factory.repo_query_manager().get_repository(repo_id)
    repo_importer = manager_factory.repo_importer_manager().get_importer(repo_id)

    # Convert to the transfer repo and point it at the importer's working directory
    transfer_repo = common_utils.to_transfer_repo(repo)
    importer_type_id = repo_importer['importer_type_id']
    transfer_repo.working_dir = common_utils.importer_working_dir(
        importer_type_id, repo_id, mkdir=True)

    # Resolve the plugin instance and build the configuration for the call
    importer_instance, plugin_config = plugin_api.get_importer_by_id(importer_type_id)
    call_config = PluginCallConfiguration(plugin_config, repo_importer['config'])

    # Invoke the importer's remove method
    try:
        importer_instance.remove_units(transfer_repo, transfer_units, call_config)
    except Exception:
        template = _('Exception from importer [%(i)s] while removing units from repo [%(r)s]')
        logger.exception(template % {'i': repo_importer['id'], 'r': repo_id})
开发者ID:preethit,项目名称:pulp-1,代码行数:25,代码来源:unit_association.py


示例6: create_sync_schedule

    def create_sync_schedule(self, repo_id, importer_id, sync_options, schedule_data):
        """
        Create a new sync schedule for a given repository using the given importer.

        @param repo_id: ID of the repository the scheduled sync is called with
        @param importer_id: ID of the importer; validated together with repo_id
        @param sync_options: sync options; its 'override_config' entry is passed to the sync
                             call as 'sync_config_override'
        @param schedule_data: keyword arguments for the scheduler; must contain 'schedule'
        @return: the schedule ID returned by the scheduler
        @raise pulp_exceptions.MissingValue: if 'schedule' is not in schedule_data
        """

        # validate the input
        self._validate_importer(repo_id, importer_id)
        self._validate_keys(sync_options, _SYNC_OPTION_KEYS)
        if 'schedule' not in schedule_data:
            raise pulp_exceptions.MissingValue(['schedule'])

        # build the sync call request
        sync_manager = managers_factory.repo_sync_manager()
        call_args = [repo_id]
        call_kwargs = {'sync_config_override': sync_options['override_config']}
        sync_weight = pulp_config.config.getint('tasks', 'sync_weight')
        call_tags = [
            resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
            resource_tag(dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
        ]
        call_request = CallRequest(
            sync_manager.sync, call_args, call_kwargs,
            weight=sync_weight, tags=call_tags, archive=True)
        call_request.reads_resource(
            dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id)
        call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)
        call_request.add_life_cycle_callback(
            dispatch_constants.CALL_ENQUEUE_LIFE_CYCLE_CALLBACK, sync_manager.prep_sync)

        # schedule the sync and record the schedule on the importer
        schedule_id = dispatch_factory.scheduler().add(call_request, **schedule_data)
        managers_factory.repo_importer_manager().add_sync_schedule(repo_id, schedule_id)
        return schedule_id
开发者ID:ehelms,项目名称:pulp,代码行数:34,代码来源:cud.py


示例7: test_post

    def test_post(self, _reserve_resource, mock_apply_async):
        """
        Posting to import_upload answers with a 202 and dispatches the task with the repo ID,
        unit type, unit key, unit metadata and upload ID as its argument list.
        """
        # Setup
        mock_apply_async.return_value = AsyncResult(str(uuid.uuid4()))
        _reserve_resource.return_value = ReservedResourceApplyAsync()
        upload_id = self.upload_manager.initialize_upload()
        self.upload_manager.save_data(upload_id, 0, 'string data')

        manager_factory.repo_manager().create_repo('repo-upload')
        manager_factory.repo_importer_manager().set_importer(
            'repo-upload', 'dummy-importer', {})

        # Test
        request_body = {
            'upload_id': upload_id,
            'unit_type_id': 'dummy-type',
            'unit_key': {'name': 'foo'},
            'unit_metadata': {'stuff': 'bar'},
        }
        status, response_body = self.post(
            '/v2/repositories/repo-upload/actions/import_upload/', request_body)

        # Verify
        self.assertEqual(202, status)
        assert_body_matches_async_task(response_body, mock_apply_async.return_value)
        expected_call_args = [
            'repo-upload',
            'dummy-type',
            {'name': 'foo'},
            {'stuff': 'bar'},
            upload_id,
        ]
        self.assertEqual(expected_call_args, mock_apply_async.call_args[0][0])
开发者ID:aweiteka,项目名称:pulp,代码行数:29,代码来源:test_contents_controller.py


示例8: __init__

    def __init__(self, source_repo_id, dest_repo_id, source_importer_id, dest_importer_id):
        """
        Initialise the mixins, record the source/destination identifiers and fetch the
        managers this conduit keeps references to.

        :param source_repo_id: ID of the repository from which units are being copied
        :type  source_repo_id: str
        :param dest_repo_id: ID of the repository into which units are being copied
        :type  dest_repo_id: str
        :param source_importer_id: ID of the importer on the source repository
        :type  source_importer_id: str
        :param dest_importer_id:  ID of the importer on the destination repository
        :type  dest_importer_id: str
        """
        # The mixins that take repository/importer IDs are given the destination ones
        ImporterScratchPadMixin.__init__(self, dest_repo_id, dest_importer_id)
        RepoScratchPadMixin.__init__(self, dest_repo_id, ImporterConduitException)
        SearchUnitsMixin.__init__(self, ImporterConduitException)
        AddUnitMixin.__init__(self, dest_repo_id, dest_importer_id)

        # Source side of the copy
        self.source_repo_id = source_repo_id
        self.source_importer_id = source_importer_id

        # Destination side of the copy
        self.dest_repo_id = dest_repo_id
        self.dest_importer_id = dest_importer_id

        self.__association_manager = manager_factory.repo_unit_association_manager()
        self.__association_query_manager = manager_factory.repo_unit_association_query_manager()
        self.__importer_manager = manager_factory.repo_importer_manager()
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:unit_import.py


示例9: test_post_with_override_config

    def test_post_with_override_config(self, mock_get_worker_for_reservation, mock_uuid,
                                       mock_apply_async):
        """
        Posting to import_upload with an 'override_config' answers with a 202 and appends the
        override config to the argument list the task is dispatched with.
        """
        # Setup
        uuid_list = [uuid.uuid4() for _ in range(10)]
        mock_uuid.uuid4.side_effect = copy.deepcopy(uuid_list)
        expected_async_result = AsyncResult(str(uuid_list[0]))
        mock_get_worker_for_reservation.return_value = Worker(
            'some_queue', datetime.datetime.now())
        upload_id = self.upload_manager.initialize_upload()
        self.upload_manager.save_data(upload_id, 0, 'string data')

        manager_factory.repo_manager().create_repo('repo-upload')
        manager_factory.repo_importer_manager().set_importer(
            'repo-upload', 'dummy-importer', {})

        # Test
        test_override_config = {'key1': 'value1', 'key2': 'value2'}
        request_body = {
            'upload_id': upload_id,
            'unit_type_id': 'dummy-type',
            'unit_key': {'name': 'foo'},
            'unit_metadata': {'stuff': 'bar'},
            'override_config': test_override_config,
        }
        status, response_body = self.post(
            '/v2/repositories/repo-upload/actions/import_upload/', request_body)

        # Verify
        self.assertEqual(202, status)
        assert_body_matches_async_task(response_body, expected_async_result)
        expected_call_args = [
            'repo-upload',
            'dummy-type',
            {'name': 'foo'},
            {'stuff': 'bar'},
            upload_id,
            test_override_config,
        ]
        self.assertEqual(expected_call_args, mock_apply_async.call_args[0][0])
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:33,代码来源:test_contents_controller.py


示例10: put

    def put(self, request, repo_id, importer_id):
        """
        Dispatch a task that updates the configuration of a repository's importer.

        :param request: WSGI request object
        :type  request: django.core.handlers.wsgi.WSGIRequest
        :param repo_id: The id of the repository
        :type  repo_id: str
        :param importer_id: The id of the importer to associate
        :type  importer_id: str

        :raises pulp_exceptions.MissingValue: if required param importer_config is not in the body
        :raises pulp_exceptions.MissingResource: if importer does not match the repo's importer
        :raises pulp_exceptions.OperationPostponed: dispatch a task
        """

        # The importer named in the URL has to be the one set on the repository
        current_importer = manager_factory.repo_importer_manager().get_importer(repo_id)
        if current_importer['id'] != importer_id:
            raise pulp_exceptions.MissingResource(importer_id=importer_id)

        importer_config = request.body_as_json.get('importer_config')
        if importer_config is None:
            raise pulp_exceptions.MissingValue(['importer_config'])

        task_tags = [
            tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
            tags.resource_tag(tags.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
            tags.action_tag('update_importer'),
        ]
        update_task = repo_importer_manager.update_importer_config
        async_result = update_task.apply_async_with_reservation(
            tags.RESOURCE_REPOSITORY_TYPE, repo_id,
            [repo_id], {'importer_config': importer_config}, tags=task_tags)
        raise pulp_exceptions.OperationPostponed(async_result)
开发者ID:hgschmie,项目名称:pulp,代码行数:33,代码来源:repositories.py


示例11: _process_repos

    def _process_repos(repos, importers=False, distributors=False):
        """
        Prepare a collection of repositories for being returned to a client: set the object
        link, convert the dates to strings, drop the internal scratchpad and, on request,
        merge in the related importers and distributors.

        :param repos: collection of repositories
        :type  repos: list, tuple
        :param importers: if True, adds related importers under the attribute "importers".
        :type  importers: bool
        :param distributors: if True, adds related distributors under the attribute "distributors"
        :type  distributors: bool

        :return: the same list that was passed in, just for convenience. The list itself is not
                 modified- only its members are modified in-place.
        :rtype:  list of Repo instances
        """

        if importers:
            importer_manager = manager_factory.repo_importer_manager()
            _merge_related_objects('importers', importer_manager, repos)
        if distributors:
            distributor_manager = manager_factory.repo_distributor_manager()
            _merge_related_objects('distributors', distributor_manager, repos)

        for repo in repos:
            href = reverse('repo_resource', kwargs={'repo_id': repo['id']})
            repo['_href'] = href
            _convert_repo_dates_to_strings(repo)

            # The scratchpad is used internally only and is not part of the repo details
            if 'scratchpad' not in repo:
                continue
            del repo['scratchpad']

        return repos
开发者ID:hgschmie,项目名称:pulp,代码行数:33,代码来源:repositories.py


示例12: delete

    def delete(self, request, repo_id, importer_id):
        """
        Dispatch a task that removes the importer from a repository.

        :param request: WSGI request object
        :type  request: django.core.handlers.wsgi.WSGIRequest
        :param repo_id: The id of the repository to remove the importer from
        :type  repo_id: str
        :param importer_id: The id of the importer to remove from the given repository
        :type  importer_id: str

        :raises pulp_exceptions.MissingResource: if importer cannot be found for this repo
        :raises pulp_exceptions.OperationPostponed: to dispatch a task to delete the importer
        """

        # The importer named in the URL has to be the one set on the repository
        current_importer = manager_factory.repo_importer_manager().get_importer(repo_id)
        if current_importer['id'] != importer_id:
            raise pulp_exceptions.MissingResource(importer_id=importer_id)

        task_tags = [
            tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
            tags.resource_tag(tags.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
            tags.action_tag('delete_importer'),
        ]
        remove_task = repo_importer_manager.remove_importer
        async_result = remove_task.apply_async_with_reservation(
            tags.RESOURCE_REPOSITORY_TYPE, repo_id, [repo_id], tags=task_tags)
        raise pulp_exceptions.OperationPostponed(async_result)
开发者ID:hgschmie,项目名称:pulp,代码行数:26,代码来源:repositories.py


示例13: post

    def post(self, request, repo_id):
        """
        Associate an importer with a repository.

        The repository and the importer type given by importer_type_id are validated up front.
        The configuration, however, is only checked against a standard set of importer
        configuration keys at this point; the importer specific validation runs on
        association. Type specific configuration is therefore validated later, which means
        the spawned task could still fail with a validation error.

        :param request: WSGI request object
        :type  request: django.core.handlers.wsgi.WSGIRequest
        :param repo_id: the repository to associate the importer with
        :type  repo_id: str

        :raises pulp_exceptions.OperationPostponed: dispatch a task
        """

        importer_type = request.body_as_json.get('importer_type_id')
        config = request.body_as_json.get('importer_config')

        # Validation occurs within the manager
        manager_factory.repo_importer_manager().validate_importer_config(
            repo_id, importer_type, config)

        task_tags = [
            tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
            tags.action_tag('add_importer'),
        ]
        task_args = [repo_id, importer_type]
        task_kwargs = {'repo_plugin_config': config}
        async_result = repo_importer_manager.set_importer.apply_async_with_reservation(
            tags.RESOURCE_REPOSITORY_TYPE, repo_id, task_args, task_kwargs, tags=task_tags)
        raise pulp_exceptions.OperationPostponed(async_result)
开发者ID:hgschmie,项目名称:pulp,代码行数:31,代码来源:repositories.py


示例14: get

    def get(self, request, repo_id):
        """
        Return a single repository, optionally with its importers and distributors.

        The query parameters 'importers' and 'distributors' add the corresponding fields to
        the returned repository when set to 'true' (case-insensitive). The query parameter
        'details' is equivalent to passing both of them.

        :param request: WSGI request object
        :type  request: django.core.handlers.wsgi.WSGIRequest
        :param repo_id: id of requested repository
        :type  repo_id: str

        :return: Response containing a serialized dict for the requested repo.
        :rtype : django.http.HttpResponse
        :raises pulp_exceptions.MissingResource: if repo cannot be found
        """

        repo = manager_factory.repo_query_manager().find_by_id(repo_id)
        if repo is None:
            raise pulp_exceptions.MissingResource(repo=repo_id)

        repo['_href'] = reverse('repo_resource', kwargs={'repo_id': repo_id})
        _convert_repo_dates_to_strings(repo)

        query = request.GET
        details = query.get('details', 'false').lower() == 'true'

        with_importers = query.get('importers', 'false').lower() == 'true' or details
        if with_importers:
            importer_manager = manager_factory.repo_importer_manager()
            repo = _merge_related_objects('importers', importer_manager, (repo,))[0]

        with_distributors = query.get('distributors', 'false').lower() == 'true' or details
        if with_distributors:
            distributor_manager = manager_factory.repo_distributor_manager()
            repo = _merge_related_objects('distributors', distributor_manager, (repo,))[0]

        return generate_json_response_with_pulp_encoder(repo)
开发者ID:hgschmie,项目名称:pulp,代码行数:34,代码来源:repositories.py


示例15: _process_repos

    def _process_repos(repos, importers=False, distributors=False):
        """
        Prepare a collection of repositories for being returned to a client:
        update each one with its object link and, on request, merge in the
        related importers and distributors.

        @param repos: collection of repositories
        @type  repos: list, tuple

        @param importers:   iff True, adds related importers under the
                            attribute "importers".
        @type  importers:   bool

        @param distributors:    iff True, adds related distributors under the
                                attribute "distributors".
        @type  distributors:    bool

        @return the same list that was passed in, just for convenience. The list
                itself is not modified- only its members are modified in-place.
        @rtype  list of Repo instances
        """
        if importers:
            importer_manager = manager_factory.repo_importer_manager()
            _merge_related_objects('importers', importer_manager, repos)
        if distributors:
            distributor_manager = manager_factory.repo_distributor_manager()
            _merge_related_objects('distributors', distributor_manager, repos)

        for repo in repos:
            link = serialization.link.search_safe_link_obj(repo['id'])
            repo.update(link)

        return repos
开发者ID:ryanschneider,项目名称:pulp,代码行数:32,代码来源:repositories.py


示例16: PUT

    def PUT(self, repo_id, importer_id):
        """
        Update the configuration of an importer through an executed call request.

        :param repo_id: ID of the repository the importer belongs to
        :param importer_id: ID of the importer being updated

        :raises exceptions.MissingValue: if 'importer_config' is not in the request params
        :return: the result of self.ok() applied to the execution result
        """

        # Params (validation will occur in the manager)
        importer_config = self.params().get('importer_config')
        if importer_config is None:
            _LOG.error('Missing configuration updating importer for repository [%s]' % repo_id)
            raise exceptions.MissingValue(['importer_config'])

        importer_manager = manager_factory.repo_importer_manager()

        # Both the repository and the importer are marked with an update operation
        update_op = dispatch_constants.RESOURCE_UPDATE_OPERATION
        resources = {
            dispatch_constants.RESOURCE_REPOSITORY_TYPE: {repo_id: update_op},
            dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE: {importer_id: update_op},
        }
        call_tags = [
            resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
            resource_tag(dispatch_constants.RESOURCE_REPOSITORY_IMPORTER_TYPE, importer_id),
            action_tag('update_importer'),
        ]
        call_request = CallRequest(
            importer_manager.update_importer_config,
            [repo_id],
            {'importer_config': importer_config},
            resources=resources,
            tags=call_tags,
            archive=True,
            kwarg_blacklist=['importer_config'])
        return self.ok(execution.execute(call_request))
开发者ID:domcleal,项目名称:pulp,代码行数:25,代码来源:repositories.py


示例17: delete

def delete(repo_id):
    """
    Delete a repository and inform other affected collections.

    :param repo_id: id of the repository to delete.
    :type  repo_id: str

    :raise pulp_exceptions.PulpExecutionException: if any part of the process fails; the exception
                                                   will contain information on which sections failed

    :return: A TaskResult object with the details of any errors or spawned tasks
    :rtype:  pulp.server.async.tasks.TaskResult
    """
    # NOTE(review): this excerpt stops after the importer step; the code that raises or
    # returns as described above is not part of the visible lines.

    # With so much going on during a delete, it's possible that a few things could go wrong while
    # others are successful. We track lesser errors that shouldn't abort the entire process until
    # the end and then raise an exception describing the incompleteness of the delete. The exception
    # arguments are captured as the second element in the tuple, but the user will have to look at
    # the server logs for more information.
    error_tuples = []  # tuple of failed step and exception arguments

    importer_manager = manager_factory.repo_importer_manager()
    distributor_manager = manager_factory.repo_distributor_manager()

    # Inform the importer
    importer_coll = RepoImporter.get_collection()
    repo_importer = importer_coll.find_one({'repo_id': repo_id})
    if repo_importer is not None:
        try:
            importer_manager.remove_importer(repo_id)
        except Exception as e:
            # "except X as e" is valid on Python 2.6+ and Python 3, unlike "except X, e".
            # The failure is logged and collected instead of aborting the delete.
            _logger.exception('Error received removing importer [%s] from repo [%s]' % (
                repo_importer['importer_type_id'], repo_id))
            error_tuples.append(e)
开发者ID:zjhuntin,项目名称:pulp,代码行数:34,代码来源:repository.py


示例18: populate

 def populate(self, strategy=constants.DEFAULT_STRATEGY, ssl=False):
     """
     Populate the fixtures: run the base class populate, register the consumer, set the
     HTTP importer on the repository, add the HTTP and fake distributors and bind the
     consumer to the HTTP distributor.

     :param strategy: strategy stored in the consumer's notes and in the bind configuration
     :param ssl: if True, the HTTP distributor uses the configuration from
                 dist_conf_with_ssl() instead of dist_conf()
     """
     PluginTestBase.populate(self)

     # register child
     consumer_manager = managers.consumer_manager()
     consumer_manager.register(self.PULP_ID, notes={constants.STRATEGY_NOTE_KEY: strategy})

     # add importer
     importer_manager = managers.repo_importer_manager()
     importer_conf = {
         constants.MANIFEST_URL_KEYWORD: 'http://redhat.com',
         constants.STRATEGY_KEYWORD: constants.DEFAULT_STRATEGY,
         constants.PROTOCOL_KEYWORD: 'file',
     }
     importer_manager.set_importer(self.REPO_ID, constants.HTTP_IMPORTER, importer_conf)

     # add distributors
     dist_conf = self.dist_conf_with_ssl() if ssl else self.dist_conf()
     distributor_manager = managers.repo_distributor_manager()
     distributor_manager.add_distributor(
         self.REPO_ID, constants.HTTP_DISTRIBUTOR, dist_conf, False,
         constants.HTTP_DISTRIBUTOR)
     distributor_manager.add_distributor(
         self.REPO_ID, FAKE_DISTRIBUTOR, {}, False, FAKE_DISTRIBUTOR)

     # bind
     bind_conf = {constants.STRATEGY_KEYWORD: strategy}
     bind_manager = managers.consumer_bind_manager()
     bind_manager.bind(
         self.PULP_ID, self.REPO_ID, constants.HTTP_DISTRIBUTOR, False, bind_conf)
开发者ID:bartwo,项目名称:pulp,代码行数:30,代码来源:test_plugins.py


示例19: remove_from_importer

def remove_from_importer(repo_id, removed_units):
    """
    Convert the removed units into transfer units and call ``remove_units`` on the importer
    plugin of the given repository.

    An exception raised by the importer's ``remove_units`` is logged and not re-raised.

    :param repo_id: ID of the repository whose importer is invoked
    :param removed_units: units to convert into transfer units for the importer call
    """
    # Load the repository and its importer from the database
    repo = manager_factory.repo_query_manager().get_repository(repo_id)
    repo_importer = manager_factory.repo_importer_manager().get_importer(repo_id)

    # Convert to the transfer repo and point it at the importer's working directory
    transfer_repo = common_utils.to_transfer_repo(repo)
    importer_type_id = repo_importer['importer_type_id']
    transfer_repo.working_dir = common_utils.importer_working_dir(
        importer_type_id, repo_id, mkdir=True)

    # Convert the units into transfer units
    unit_type_ids = calculate_associated_type_ids(repo_id, removed_units)
    transfer_units = create_transfer_units(removed_units, unit_type_ids)

    # Resolve the plugin instance and build the configuration for the call
    importer_instance, plugin_config = plugin_api.get_importer_by_id(importer_type_id)
    call_config = PluginCallConfiguration(plugin_config, repo_importer['config'])

    # Invoke the importer's remove method
    try:
        importer_instance.remove_units(transfer_repo, transfer_units, call_config)
    except Exception:
        log_args = (repo_importer['id'], repo_id)
        _LOG.exception(
            'Exception from importer [%s] while removing units from repo [%s]' % log_args)
开发者ID:juwu,项目名称:pulp,代码行数:25,代码来源:unit_association.py


示例20: verify

 def verify(self, num_units=PluginTestBase.NUM_UNITS):
     """
     Verify the repository, its importer, its distributors and its units.

     Checks that the importer's manifest URL ends with '<REPO_ID>/manifest.json.gz', that
     the fake distributor can be fetched while fetching the HTTP distributor raises
     MissingResource, and that each unit has the expected type, repository, owner and
     storage path and that the stored file contains the unit ID.

     :param num_units: number of units expected to be associated with the repository
     """
     # repository
     manager = managers.repo_query_manager()
     manager.get_repository(self.REPO_ID)
     # importer
     manager = managers.repo_importer_manager()
     importer = manager.get_importer(self.REPO_ID)
     manifest_url = importer['config'][constants.MANIFEST_URL_KEYWORD]
     self.assertTrue(manifest_url.endswith('%s/manifest.json.gz' % self.REPO_ID))
     # distributor
     manager = managers.repo_distributor_manager()
     manager.get_distributor(self.REPO_ID, FAKE_DISTRIBUTOR)
     self.assertRaises(
         MissingResource, manager.get_distributor, self.REPO_ID, constants.HTTP_DISTRIBUTOR)
     # check units
     manager = managers.repo_unit_association_query_manager()
     units = manager.get_units(self.REPO_ID)
     # index the units by the 'N' value in their metadata
     units = dict((u['metadata']['N'], u) for u in units)
     self.assertEqual(len(units), num_units)
     for n in range(num_units):
         unit = units[n]
         unit_id = self.UNIT_ID % n
         metadata = unit['metadata']
         storage_path = metadata['_storage_path'].replace('//', '/')
         self.assertEqual(unit['unit_type_id'], self.UNIT_TYPE_ID)
         self.assertEqual(unit['repo_id'], self.REPO_ID)
         self.assertEqual(unit['owner_id'], constants.HTTP_IMPORTER)
         file_path = '.'.join((unit_id, self.UNIT_TYPE_ID))
         self.assertEqual(storage_path, os.path.join(self.childfs, 'content', file_path))
         self.assertTrue(os.path.exists(storage_path))
         # the context manager closes the file even if read() raises
         with open(storage_path) as fp:
             content = fp.read()
         self.assertEqual(content, unit_id)
开发者ID:bartwo,项目名称:pulp,代码行数:33,代码来源:test_plugins.py



注:本文中的pulp.server.managers.factory.repo_importer_manager函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python factory.repo_manager函数代码示例发布时间:2022-05-25
下一篇:
Python factory.repo_group_manager函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap