• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python factory.repo_unit_association_manager函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.managers.factory.repo_unit_association_manager函数的典型用法代码示例。如果您正苦于以下问题:Python repo_unit_association_manager函数的具体用法?Python repo_unit_association_manager怎么用?Python repo_unit_association_manager使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了repo_unit_association_manager函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: POST

    def POST(self, dest_repo_id):

        # Params
        params = self.params()
        source_repo_id = params.get('source_repo_id', None)
        overrides = params.get('override_config', None)

        if source_repo_id is None:
            raise exceptions.MissingValue(['source_repo_id'])

        criteria = params.get('criteria', None)
        if criteria is not None:
            try:
                criteria = UnitAssociationCriteria.from_client_input(criteria)
            except:
                _LOG.exception('Error parsing association criteria [%s]' % criteria)
                raise exceptions.PulpDataException(), None, sys.exc_info()[2]

        association_manager = manager_factory.repo_unit_association_manager()
        resources = {dispatch_constants.RESOURCE_REPOSITORY_TYPE: {source_repo_id: dispatch_constants.RESOURCE_READ_OPERATION,
                                                                   dest_repo_id: dispatch_constants.RESOURCE_UPDATE_OPERATION}}
        tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, dest_repo_id),
                resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, source_repo_id),
                action_tag('associate')]
        call_request = CallRequest(association_manager.associate_from_repo,
                                   [source_repo_id, dest_repo_id],
                                   {'criteria': criteria, 'import_config_override': overrides},
                                   resources=resources,
                                   tags=tags,
                                   archive=True)
        return execution.execute_async(self, call_request)
开发者ID:ryanschneider,项目名称:pulp,代码行数:31,代码来源:repositories.py


示例2: save_unit

    def save_unit(self, unit):
        """
        Performs two distinct steps on the Pulp server:
        - Creates or updates Pulp's knowledge of the content unit.
        - Associates the unit to the repository being synchronized.

        If a unit with the provided unit key already exists, it is updated with
        the attributes on the passed-in unit.

        A reference to the provided unit is returned from this call. This call
        will populate the unit's id field with the UUID for the unit.

        :param unit: unit object returned from the init_unit call
        :type  unit: Unit

        :return: object reference to the provided unit, its state updated from the call
        :rtype:  Unit
        """
        try:
            association_manager = manager_factory.repo_unit_association_manager()

            # Save or update the unit
            pulp_unit = common_utils.to_pulp_unit(unit)
            unit.id = self._update_unit(unit, pulp_unit)

            # Associate it with the repo
            association_manager.associate_unit_by_id(
                self.repo_id, unit.type_id, unit.id)

            return unit
        except Exception, e:
            _logger.exception(_('Content unit association failed [%s]' % str(unit)))
            raise ImporterConduitException(e), None, sys.exc_info()[2]
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:33,代码来源:mixins.py


示例3: __init__

    def __init__(self, source_repo_id, dest_repo_id, source_importer_id, dest_importer_id):
        """
        :param source_repo_id: ID of the repository from which units are being copied
        :type  source_repo_id: str
        :param dest_repo_id: ID of the repository into which units are being copied
        :type  dest_repo_id: str
        :param source_importer_id: ID of the importer on the source repository
        :type  source_importer_id: str
        :param dest_importer_id:  ID of the importer on the destination repository
        :type  dest_importer_id: str
        """
        ImporterScratchPadMixin.__init__(self, dest_repo_id, dest_importer_id)
        RepoScratchPadMixin.__init__(self, dest_repo_id, ImporterConduitException)
        SearchUnitsMixin.__init__(self, ImporterConduitException)
        AddUnitMixin.__init__(self, dest_repo_id, dest_importer_id)

        self.source_repo_id = source_repo_id
        self.dest_repo_id = dest_repo_id

        self.source_importer_id = source_importer_id
        self.dest_importer_id = dest_importer_id

        self.__association_manager = manager_factory.repo_unit_association_manager()
        self.__association_query_manager = manager_factory.repo_unit_association_query_manager()
        self.__importer_manager = manager_factory.repo_importer_manager()
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:unit_import.py


示例4: __init__

    def __init__(self, source_repo_id, dest_repo_id, source_importer_id, dest_importer_id,
                 association_owner_type, association_owner_id):
        """
        :param source_repo_id: ID of the repository from which units are being copied
        :type  source_repo_id: str
        :param dest_repo_id: ID of the repository into which units are being copied
        :type  dest_repo_id: str
        :param source_importer_id: ID of the importer on the source repository
        :type  source_importer_id: str
        :param dest_importer_id:  ID of the importer on the destination repository
        :type  dest_importer_id: str
        :param association_owner_type: distinguishes the owner when creating an
               association through this conduit
        :type  association_owner_type: str
        :param association_owner_id: specific ID of the owner when creating an
               association through this conduit
        :type  association_owner_id: str
        """
        ImporterScratchPadMixin.__init__(self, dest_repo_id, dest_importer_id)
        RepoScratchPadMixin.__init__(self, dest_repo_id, ImporterConduitException)
        SearchUnitsMixin.__init__(self, ImporterConduitException)
        AddUnitMixin.__init__(self, dest_repo_id, dest_importer_id, association_owner_type, association_owner_id)

        self.source_repo_id = source_repo_id
        self.dest_repo_id = dest_repo_id

        self.source_importer_id = source_importer_id
        self.dest_importer_id = dest_importer_id

        self.association_owner_type = association_owner_type
        self.association_owner_id = association_owner_id

        self.__association_manager = manager_factory.repo_unit_association_manager()
        self.__association_query_manager = manager_factory.repo_unit_association_query_manager()
        self.__importer_manager = manager_factory.repo_importer_manager()
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:35,代码来源:unit_import.py


示例5: test_syntactic_sugar_methods

    def test_syntactic_sugar_methods(self):
        """
        Tests the syntactic sugar methods for retrieving specific managers.
        """
        # Setup
        factory.initialize()

        # Test
        self.assertTrue(isinstance(factory.authentication_manager(), AuthenticationManager))
        self.assertTrue(isinstance(factory.cert_generation_manager(), CertGenerationManager))
        self.assertTrue(isinstance(factory.certificate_manager(), CertificateManager))
        self.assertTrue(isinstance(factory.password_manager(), PasswordManager))
        self.assertTrue(isinstance(factory.permission_manager(), PermissionManager))
        self.assertTrue(isinstance(factory.permission_query_manager(), PermissionQueryManager))
        self.assertTrue(isinstance(factory.role_manager(), RoleManager))
        self.assertTrue(isinstance(factory.role_query_manager(), RoleQueryManager))
        self.assertTrue(isinstance(factory.user_manager(), UserManager))
        self.assertTrue(isinstance(factory.user_query_manager(), UserQueryManager))
        self.assertTrue(isinstance(factory.repo_manager(), RepoManager))
        self.assertTrue(isinstance(factory.repo_unit_association_manager(),
                                   RepoUnitAssociationManager))
        self.assertTrue(isinstance(factory.repo_publish_manager(), RepoPublishManager))
        self.assertTrue(isinstance(factory.repo_query_manager(), RepoQueryManager))
        self.assertTrue(isinstance(factory.repo_sync_manager(), RepoSyncManager))
        self.assertTrue(isinstance(factory.content_manager(), ContentManager))
        self.assertTrue(isinstance(factory.content_query_manager(), ContentQueryManager))
        self.assertTrue(isinstance(factory.content_upload_manager(), ContentUploadManager))
        self.assertTrue(isinstance(factory.consumer_manager(), ConsumerManager))
        self.assertTrue(isinstance(factory.topic_publish_manager(), TopicPublishManager))
开发者ID:credativ,项目名称:pulp,代码行数:29,代码来源:test_factory.py


示例6: test_resolve_dependencies_by_unit

    def test_resolve_dependencies_by_unit(self):
        # Setup
        report = 'dep report'
        mock_plugins.MOCK_IMPORTER.resolve_dependencies.return_value = report

        unit_id_1 = manager_factory.content_manager().add_content_unit('type-1', None, {'key-1' : 'v1'})
        unit_id_2 = manager_factory.content_manager().add_content_unit('type-1', None, {'key-1' : 'v2'})

        association_manager = manager_factory.repo_unit_association_manager()
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_1, 'user', 'admin')
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_2, 'user', 'admin')

        # Test
        result = self.manager.resolve_dependencies_by_units(self.repo_id, [], {})

        # Verify
        self.assertEqual(result, report)

        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_count)

        args = mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_args[0]
        self.assertEqual(args[0].id, self.repo_id)
        self.assertEqual(len(args[1]), 0)
        self.assertTrue(isinstance(args[2], DependencyResolutionConduit))
        self.assertTrue(isinstance(args[3], PluginCallConfiguration))
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:25,代码来源:test_dependency_manager.py


示例7: test_resolve_dependencies_by_criteria

    def test_resolve_dependencies_by_criteria(self):
        # Setup
        report = 'dep report'
        mock_plugins.MOCK_IMPORTER.resolve_dependencies.return_value = report

        unit_id_1 = manager_factory.content_manager().add_content_unit('type-1', None,
                                                                       {'key-1': 'unit-id-1'})
        unit_id_2 = manager_factory.content_manager().add_content_unit('type-1', None,
                                                                       {'key-1': 'dep-1'})

        association_manager = manager_factory.repo_unit_association_manager()
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_1)
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_2)

        criteria = UnitAssociationCriteria(type_ids=['type-1'], unit_filters={'key-1': 'unit-id-1'})

        # Test
        result = self.manager.resolve_dependencies_by_criteria(self.repo_id, criteria, {})

        # Verify
        self.assertEqual(report, result)

        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_count)

        args = mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_args[0]
        self.assertEqual(1, len(args[1]))
开发者ID:credativ,项目名称:pulp,代码行数:26,代码来源:test_dependency.py


示例8: __init__

    def __init__(self, repo_id, importer_id, association_owner_type, association_owner_id):
        RepoScratchPadMixin.__init__(self, repo_id, ImporterConduitException)
        ImporterScratchPadMixin.__init__(self, repo_id, importer_id)
        AddUnitMixin.__init__(self, repo_id, importer_id, association_owner_type, association_owner_id)
        SingleRepoUnitsMixin.__init__(self, repo_id, ImporterConduitException)
        StatusMixin.__init__(self, importer_id, ImporterConduitException)

        self._association_manager = manager_factory.repo_unit_association_manager()

        self._removed_count = 0
开发者ID:ehelms,项目名称:pulp,代码行数:10,代码来源:repo_sync.py


示例9: _purge_orphaned_blobs

    def _purge_orphaned_blobs(repo, units):
        """
        Purge blobs associated with removed manifests when no longer
        referenced by any remaining manifests.

        :param repo: The affected repository.
        :type  repo: pulp.plugins.model.Repository
        :param units: List of removed units.
        :type  units: list of: pulp.plugins.model.AssociatedUnit
        """
        # Find blob digests referenced by removed manifests (orphaned)

        orphaned = set()
        for unit in units:
            if unit.type_id != constants.MANIFEST_TYPE_ID:
                continue
            manifest = unit
            for layer in manifest.metadata['fs_layers']:
                digest = layer['blobSum']
                orphaned.add(digest)

        # Find blob digests still referenced by other manifests (adopted)

        if not orphaned:
            # nothing orphaned
            return
        adopted = set()
        manager = manager_factory.repo_unit_association_query_manager()
        for manifest in manager.get_units_by_type(repo.id, constants.MANIFEST_TYPE_ID):
            for layer in manifest.metadata['fs_layers']:
                digest = layer['blobSum']
                adopted.add(digest)

        # Remove unreferenced blobs

        orphaned = orphaned.difference(adopted)
        if not orphaned:
            # all adopted
            return

        unit_filter = {
            'digest': {
                '$in': sorted(orphaned)
            }
        }
        criteria = UnitAssociationCriteria(
            type_ids=[constants.BLOB_TYPE_ID],
            unit_filters=unit_filter)
        manager = manager_factory.repo_unit_association_manager()
        manager.unassociate_by_criteria(
            repo_id=repo.id,
            criteria=criteria,
            owner_type='',  # unused
            owner_id='',    # unused
            notify_plugins=False)
开发者ID:maxamillion,项目名称:pulp_docker,代码行数:55,代码来源:importer.py


示例10: __init__

    def __init__(self, repo_id, importer_id, importer_object_id):
        RepoScratchPadMixin.__init__(self, repo_id, ImporterConduitException)
        ImporterScratchPadMixin.__init__(self, repo_id, importer_id)
        AddUnitMixin.__init__(self, repo_id, importer_id)
        SingleRepoUnitsMixin.__init__(self, repo_id, ImporterConduitException)
        StatusMixin.__init__(self, importer_id, ImporterConduitException)
        SearchUnitsMixin.__init__(self, ImporterConduitException)

        self.importer_object_id = importer_object_id
        self._association_manager = manager_factory.repo_unit_association_manager()
        self._content_query_manager = manager_factory.content_query_manager()

        self._removed_count = 0
开发者ID:alanoe,项目名称:pulp,代码行数:13,代码来源:repo_sync.py


示例11: __init__

    def __init__(self, source_repo_id, dest_repo_id, source_importer_id, dest_importer_id):
        ImporterScratchPadMixin.__init__(self, dest_repo_id, dest_importer_id)
        RepoScratchPadMixin.__init__(self, dest_repo_id, ImporterConduitException)

        self.source_repo_id = source_repo_id
        self.dest_repo_id = dest_repo_id

        self.source_importer_id = source_importer_id
        self.dest_importer_id = dest_importer_id

        self.__association_manager = manager_factory.repo_unit_association_manager()
        self.__association_query_manager = manager_factory.repo_unit_association_query_manager()
        self.__importer_manager = manager_factory.repo_importer_manager()
开发者ID:stpierre,项目名称:pulp,代码行数:13,代码来源:unit_import.py


示例12: populate_units

 def populate_units(self, key, typedef, additional_key=None):
     for i in range(1, 10):
         unit_id = 'unit-%s' % self.UNIT_ID
         md = {key: str(i)}
         if additional_key:
             md[additional_key] = str(i)
         manager = factory.content_manager()
         manager.add_content_unit(typedef.id, unit_id, md)
         manager = factory.repo_unit_association_manager()
         manager.associate_unit_by_id(
             self.REPO_ID,
             typedef.id,
             unit_id)
         self.UNIT_ID += 1
开发者ID:credativ,项目名称:pulp,代码行数:14,代码来源:test_profiler.py


示例13: populate_units

 def populate_units(self, key, typedef):
     for i in range(1,10):
         unit_id = 'unit-%s' % self.UNIT_ID
         md = {key:str(i)}
         manager = factory.content_manager()
         manager.add_content_unit(typedef.id, unit_id, md)
         manager = factory.repo_unit_association_manager()
         manager.associate_unit_by_id(
             self.REPO_ID,
             typedef.id,
             unit_id,
             OWNER_TYPE_IMPORTER,
             'test-importer')
         self.UNIT_ID += 1
开发者ID:domcleal,项目名称:pulp,代码行数:14,代码来源:test_profiler_conduit.py


示例14: test_syntactic_sugar_methods

    def test_syntactic_sugar_methods(self):
        """
        Tests the syntactic sugar methods for retrieving specific managers.
        """

        # Test
        self.assertTrue(isinstance(factory.repo_manager(), RepoManager))
        self.assertTrue(isinstance(factory.repo_unit_association_manager(), RepoUnitAssociationManager))
        self.assertTrue(isinstance(factory.repo_publish_manager(), RepoPublishManager))
        self.assertTrue(isinstance(factory.repo_query_manager(), RepoQueryManager))
        self.assertTrue(isinstance(factory.repo_sync_manager(), RepoSyncManager))
        self.assertTrue(isinstance(factory.content_manager(), ContentManager))
        self.assertTrue(isinstance(factory.content_query_manager(), ContentQueryManager))
        self.assertTrue(isinstance(factory.content_upload_manager(), ContentUploadManager))
        self.assertTrue(isinstance(factory.consumer_manager(), ConsumerManager))
开发者ID:stpierre,项目名称:pulp,代码行数:15,代码来源:test_manager_factory.py


示例15: populate

 def populate(self):
     # make content/ dir.
     os.makedirs(os.path.join(self.parentfs, 'content'))
     pulp_conf.set('server', 'storage_dir', self.parentfs)
     # create repo
     manager = managers.repo_manager()
     manager.create_repo(self.REPO_ID)
     # add units
     units = []
     for n in range(0, self.NUM_UNITS):
         unit_id = self.UNIT_ID % n
         unit = dict(self.UNIT_METADATA)
         unit['N'] = n
         # add unit file
         storage_dir = pulp_conf.get('server', 'storage_dir')
         storage_path = \
             os.path.join(storage_dir, 'content',
                 '.'.join((unit_id, self.UNIT_TYPE_ID)))
         unit['_storage_path'] = storage_path
         fp = open(storage_path, 'w+')
         fp.write(unit_id)
         fp.close()
         # add unit
         manager = managers.content_manager()
         manager.add_content_unit(
             self.UNIT_TYPE_ID,
             unit_id,
             unit)
         manager = managers.repo_unit_association_manager()
         # associate unit
         manager.associate_unit_by_id(
             self.REPO_ID,
             self.UNIT_TYPE_ID,
             unit_id,
             RepoContentUnit.OWNER_TYPE_IMPORTER,
             constants.HTTP_IMPORTER)
         units.append(unit)
     # CA
     self.units = units
     path = os.path.join(self.parentfs, 'ca.crt')
     fp = open(path, 'w+')
     fp.write(self.CA_CERT)
     fp.close()
     # client cert
     path = os.path.join(self.parentfs, 'local.crt')
     fp = open(path, 'w+')
     fp.write(self.CLIENT_CERT)
     fp.close()
开发者ID:bartwo,项目名称:pulp,代码行数:48,代码来源:test_plugins.py


示例16: add_units

def add_units(num_units=10):
    units = []
    n = 0
    for type_id in ALL_TYPES:
        for x in range(0, num_units):
            unit_id = create_unit_id(type_id, n)
            unit = dict(UNIT_METADATA)
            unit['N'] = n
            unit['_storage_path'] = create_storage_path(unit_id)
            manager = managers.content_manager()
            manager.add_content_unit(type_id, unit_id, unit)
            manager = managers.repo_unit_association_manager()
            # associate unit
            with mock.patch('pulp.server.managers.repo.unit_association.repo_controller'):
                manager.associate_unit_by_id(REPO_ID, type_id, unit_id)
            units.append(unit)
            n += 1
    return units
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:18,代码来源:test_conduit.py


示例17: _purge_unlinked_blobs

    def _purge_unlinked_blobs(repo, manifest):
        """
        Purge blobs associated with the given Manifests when removing it would leave them no longer
        referenced by any remaining Manifests.

        :param repo:  The affected repository.
        :type  repo:  pulp.server.db.model.Repository
        :param units: List of removed units.
        :type  units: list of: pulp.plugins.model.AssociatedUnit
        """
        # Find blob digests referenced by removed manifests (orphaned)
        orphaned = set()
        map((lambda layer: orphaned.add(layer.blob_sum)), manifest.fs_layers)
        if not orphaned:
            # nothing orphaned
            return

        # Find blob digests still referenced by other manifests (adopted)
        adopted = set()
        criteria = UnitAssociationCriteria(type_ids=[constants.MANIFEST_TYPE_ID],
                                           unit_filters={'digest__ne': manifest.digest})
        for manifest in unit_association.RepoUnitAssociationManager._units_from_criteria(
                repo, criteria):
            map((lambda layer: adopted.add(layer.blob_sum)), manifest.fs_layers)

        # Remove unreferenced blobs
        orphaned = orphaned.difference(adopted)
        if not orphaned:
            # all adopted
            return

        unit_filter = {
            'digest': {
                '$in': sorted(orphaned)
            }
        }
        criteria = UnitAssociationCriteria(
            type_ids=[constants.BLOB_TYPE_ID],
            unit_filters=unit_filter)
        manager = manager_factory.repo_unit_association_manager()
        manager.unassociate_by_criteria(
            repo_id=repo.repo_id,
            criteria=criteria,
            notify_plugins=False)
开发者ID:daviddavis,项目名称:pulp_docker,代码行数:44,代码来源:importer.py


示例18: POST

    def POST(self, dest_repo_id):

        # Params
        params = self.params()
        source_repo_id = params.get('source_repo_id', None)
        overrides = params.get('override_config', None)

        if source_repo_id is None:
            raise exceptions.MissingValue(['source_repo_id'])

        # A 404 only applies to things in the URL, so the destination repo
        # check allows the MissingResource to bubble up, but if the source
        # repo doesn't exist, it's considered bad data.
        repo_query_manager = manager_factory.repo_query_manager()
        repo_query_manager.get_repository(dest_repo_id)

        try:
            repo_query_manager.get_repository(source_repo_id)
        except exceptions.MissingResource:
            raise exceptions.InvalidValue(['source_repo_id'])

        criteria = params.get('criteria', None)
        if criteria is not None:
            try:
                criteria = UnitAssociationCriteria.from_client_input(criteria)
            except:
                _LOG.error('Error parsing association criteria [%s]' % criteria)
                raise exceptions.PulpDataException(), None, sys.exc_info()[2]

        association_manager = manager_factory.repo_unit_association_manager()
        resources = {dispatch_constants.RESOURCE_REPOSITORY_TYPE: {source_repo_id: dispatch_constants.RESOURCE_READ_OPERATION,
                                                                   dest_repo_id: dispatch_constants.RESOURCE_UPDATE_OPERATION}}
        tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, dest_repo_id),
                resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, source_repo_id),
                action_tag('associate')]
        call_request = CallRequest(association_manager.associate_from_repo,
                                   [source_repo_id, dest_repo_id],
                                   {'criteria': criteria, 'import_config_override': overrides},
                                   resources=resources,
                                   tags=tags,
                                   archive=True)
        return execution.execute_async(self, call_request)
开发者ID:domcleal,项目名称:pulp,代码行数:42,代码来源:repositories.py


示例19: save_unit

    def save_unit(self, unit):
        """
        Performs two distinct steps on the Pulp server:
        - Creates or updates Pulp's knowledge of the content unit.
        - Associates the unit to the repository being synchronized.

        If a unit with the provided unit key already exists, it is updated with
        the attributes on the passed-in unit.

        A reference to the provided unit is returned from this call. This call
        will populate the unit's id field with the UUID for the unit.

        @param unit: unit object returned from the init_unit call
        @type  unit: L{Unit}

        @return: object reference to the provided unit, its state updated from the call
        @rtype:  L{Unit}
        """
        try:
            content_query_manager = manager_factory.content_query_manager()
            content_manager = manager_factory.content_manager()
            association_manager = manager_factory.repo_unit_association_manager()

            # Save or update the unit
            pulp_unit = common_utils.to_pulp_unit(unit)
            try:
                existing_unit = content_query_manager.get_content_unit_by_keys_dict(unit.type_id, unit.unit_key)
                unit.id = existing_unit['_id']
                content_manager.update_content_unit(unit.type_id, unit.id, pulp_unit)
                self._updated_count += 1
            except MissingResource:
                unit.id = content_manager.add_content_unit(unit.type_id, None, pulp_unit)
                self._added_count += 1

            # Associate it with the repo
            association_manager.associate_unit_by_id(self.repo_id, unit.type_id, unit.id, self.association_owner_type, self.association_owner_id)

            return unit
        except Exception, e:
            _LOG.exception(_('Content unit association failed [%s]' % str(unit)))
            raise ImporterConduitException(e), None, sys.exc_info()[2]
开发者ID:signull,项目名称:pulp,代码行数:41,代码来源:mixins.py


示例20: add_units

def add_units(num_units=10):
    units = []
    n = 0
    for type_id in ALL_TYPES:
        for x in range(0, num_units):
            unit_id = create_unit_id(type_id, n)
            unit = dict(UNIT_METADATA)
            unit['N'] = n
            unit['_storage_path'] = create_storage_path(unit_id)
            manager = managers.content_manager()
            manager.add_content_unit(type_id, unit_id, unit)
            manager = managers.repo_unit_association_manager()
            # associate unit
            manager.associate_unit_by_id(
                REPO_ID,
                type_id,
                unit_id,
                RepoContentUnit.OWNER_TYPE_IMPORTER,
                constants.HTTP_IMPORTER)
            units.append(unit)
            n += 1
    return units
开发者ID:cliffy94,项目名称:pulp,代码行数:22,代码来源:test_conduit.py



注:本文中的pulp.server.managers.factory.repo_unit_association_manager函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap