• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python factory.content_manager函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.managers.factory.content_manager函数的典型用法代码示例。如果您正苦于以下问题:Python content_manager函数的具体用法?Python content_manager怎么用?Python content_manager使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了content_manager函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_resolve_dependencies_by_criteria

    def test_resolve_dependencies_by_criteria(self):
        # Setup
        report = 'dep report'
        mock_plugins.MOCK_IMPORTER.resolve_dependencies.return_value = report

        unit_id_1 = manager_factory.content_manager().add_content_unit('type-1', None,
                                                                       {'key-1': 'unit-id-1'})
        unit_id_2 = manager_factory.content_manager().add_content_unit('type-1', None,
                                                                       {'key-1': 'dep-1'})

        association_manager = manager_factory.repo_unit_association_manager()
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_1)
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_2)

        criteria = UnitAssociationCriteria(type_ids=['type-1'], unit_filters={'key-1': 'unit-id-1'})

        # Test
        result = self.manager.resolve_dependencies_by_criteria(self.repo_id, criteria, {})

        # Verify
        self.assertEqual(report, result)

        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_count)

        args = mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_args[0]
        self.assertEqual(1, len(args[1]))
开发者ID:credativ,项目名称:pulp,代码行数:26,代码来源:test_dependency.py


示例2: test_resolve_dependencies_by_unit

    def test_resolve_dependencies_by_unit(self):
        # Setup
        report = 'dep report'
        mock_plugins.MOCK_IMPORTER.resolve_dependencies.return_value = report

        unit_id_1 = manager_factory.content_manager().add_content_unit('type-1', None, {'key-1' : 'v1'})
        unit_id_2 = manager_factory.content_manager().add_content_unit('type-1', None, {'key-1' : 'v2'})

        association_manager = manager_factory.repo_unit_association_manager()
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_1, 'user', 'admin')
        association_manager.associate_unit_by_id(self.repo_id, 'type-1', unit_id_2, 'user', 'admin')

        # Test
        result = self.manager.resolve_dependencies_by_units(self.repo_id, [], {})

        # Verify
        self.assertEqual(result, report)

        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_count)

        args = mock_plugins.MOCK_IMPORTER.resolve_dependencies.call_args[0]
        self.assertEqual(args[0].id, self.repo_id)
        self.assertEqual(len(args[1]), 0)
        self.assertTrue(isinstance(args[2], DependencyResolutionConduit))
        self.assertTrue(isinstance(args[3], PluginCallConfiguration))
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:25,代码来源:test_dependency_manager.py


示例3: link_unit

    def link_unit(self, from_unit, to_unit, bidirectional=False):
        """
        Creates a reference between two content units. The semantics of what
        this relationship means depends on the types of content units being
        used; this call simply ensures that Pulp will save and make available
        the indication that a reference exists from one unit to another.

        By default, the reference will only exist on the from_unit side. If
        the bidirectional flag is set to true, a second reference will be created
        on the to_unit to refer back to the from_unit.

        Units passed to this call must have their id fields set by the Pulp server.

        @param from_unit: owner of the reference
        @type  from_unit: L{Unit}

        @param to_unit: will be referenced by the from_unit
        @type  to_unit: L{Unit}
        """
        content_manager = manager_factory.content_manager()

        try:
            content_manager.link_referenced_content_units(from_unit.type_id, from_unit.id, to_unit.type_id, [to_unit.id])

            if bidirectional:
                content_manager.link_referenced_content_units(to_unit.type_id, to_unit.id, from_unit.type_id, [from_unit.id])
        except Exception, e:
            _LOG.exception(_('Child link from parent [%(parent)s] to child [%(child)s] failed' %
                             {'parent': str(from_unit), 'child': str(to_unit)}))
            raise ImporterConduitException(e), None, sys.exc_info()[2]
开发者ID:signull,项目名称:pulp,代码行数:30,代码来源:mixins.py


示例4: _add_unit

    def _add_unit(self, unit, pulp_unit):
        """
        Add a unit. If it already exists, update it.

        This deals with a race condition where a unit might try to be updated,
        but does not exist. Before this method can complete, another workflow
        might add that same unit, causing the DuplicateKeyError below. This can
        happen if two syncs are running concurrently of repositories that have
        overlapping content.

        :param unit:        the unit to be updated
        :type  unit:        pulp.plugins.model.Unit
        :param pulp_unit:   the unit to be updated, as a dict
        :type  pulp_unit:   dict

        :return:    id of the updated unit
        :rtype:     basestring
        """
        content_manager = manager_factory.content_manager()
        try:
            unit_id = content_manager.add_content_unit(unit.type_id, None, pulp_unit)
            self._added_count += 1
            return unit_id
        except DuplicateKeyError:
            _logger.debug(_('cannot add unit; already exists. updating instead.'))
            return self._update_unit(unit, pulp_unit)
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:26,代码来源:mixins.py


示例5: test_syntactic_sugar_methods

    def test_syntactic_sugar_methods(self):
        """
        Tests the syntactic sugar methods for retrieving specific managers.
        """
        # Setup
        factory.initialize()

        # Test
        self.assertTrue(isinstance(factory.authentication_manager(), AuthenticationManager))
        self.assertTrue(isinstance(factory.cert_generation_manager(), CertGenerationManager))
        self.assertTrue(isinstance(factory.certificate_manager(), CertificateManager))
        self.assertTrue(isinstance(factory.password_manager(), PasswordManager))
        self.assertTrue(isinstance(factory.permission_manager(), PermissionManager))
        self.assertTrue(isinstance(factory.permission_query_manager(), PermissionQueryManager))
        self.assertTrue(isinstance(factory.role_manager(), RoleManager))
        self.assertTrue(isinstance(factory.role_query_manager(), RoleQueryManager))
        self.assertTrue(isinstance(factory.user_manager(), UserManager))
        self.assertTrue(isinstance(factory.user_query_manager(), UserQueryManager))
        self.assertTrue(isinstance(factory.repo_manager(), RepoManager))
        self.assertTrue(isinstance(factory.repo_unit_association_manager(),
                                   RepoUnitAssociationManager))
        self.assertTrue(isinstance(factory.repo_publish_manager(), RepoPublishManager))
        self.assertTrue(isinstance(factory.repo_query_manager(), RepoQueryManager))
        self.assertTrue(isinstance(factory.repo_sync_manager(), RepoSyncManager))
        self.assertTrue(isinstance(factory.content_manager(), ContentManager))
        self.assertTrue(isinstance(factory.content_query_manager(), ContentQueryManager))
        self.assertTrue(isinstance(factory.content_upload_manager(), ContentUploadManager))
        self.assertTrue(isinstance(factory.consumer_manager(), ConsumerManager))
        self.assertTrue(isinstance(factory.topic_publish_manager(), TopicPublishManager))
开发者ID:credativ,项目名称:pulp,代码行数:29,代码来源:test_factory.py


示例6: add_unit

def add_unit(id, repo_id, type_id):
    metadata = {'id' : id, 'repo_id' : repo_id,}

    unit_id = factory.content_manager().add_content_unit(
        type_id, None, metadata)

    return unit_id
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:7,代码来源:test_0004_migrate.py


示例7: gen_content_unit_with_directory

def gen_content_unit_with_directory(content_type_id, content_root, name=None):
    name = name or ''.join(random.sample(string.ascii_letters, 5))
    path = os.path.join(content_root, name)
    os.mkdir(path)
    unit = {'name': name,
            '_content_type_id': content_type_id,
            '_storage_path': path}
    content_manager = manager_factory.content_manager()
    unit_id = content_manager.add_content_unit(content_type_id, None, unit)
    unit['_id'] = unit_id
    return unit
开发者ID:domcleal,项目名称:pulp,代码行数:11,代码来源:test_content_orphan_manager.py


示例8: gen_content_unit

def gen_content_unit(content_type_id, content_root, name=None):
    name = name or "".join(random.sample(string.ascii_letters, 5))
    path = os.path.join(content_root, name)
    file = open(path, mode="w")
    file.write("")
    file.close()
    unit = {"name": name, "_content_type_id": content_type_id, "_storage_path": path}
    content_manager = manager_factory.content_manager()
    unit_id = content_manager.add_content_unit(content_type_id, None, unit)
    unit["_id"] = unit_id
    return unit
开发者ID:ryanschneider,项目名称:pulp,代码行数:11,代码来源:test_content_orphan_manager.py


示例9: gen_content_unit

def gen_content_unit(content_type_id, content_root, name=None):
    name = name or ''.join(random.sample(string.ascii_letters, 5))
    path = os.path.join(content_root, name)
    file = open(path, mode='w')
    file.write('')
    file.close()
    unit = {'name': name,
            '_content_type_id': content_type_id,
            '_storage_path': path}
    content_manager = manager_factory.content_manager()
    unit_id = content_manager.add_content_unit(content_type_id, None, unit)
    unit['_id'] = unit_id
    return unit
开发者ID:domcleal,项目名称:pulp,代码行数:13,代码来源:test_content_orphan_manager.py


示例10: populate_units

 def populate_units(self, key, typedef, additional_key=None):
     for i in range(1, 10):
         unit_id = 'unit-%s' % self.UNIT_ID
         md = {key: str(i)}
         if additional_key:
             md[additional_key] = str(i)
         manager = factory.content_manager()
         manager.add_content_unit(typedef.id, unit_id, md)
         manager = factory.repo_unit_association_manager()
         manager.associate_unit_by_id(
             self.REPO_ID,
             typedef.id,
             unit_id)
         self.UNIT_ID += 1
开发者ID:credativ,项目名称:pulp,代码行数:14,代码来源:test_profiler.py


示例11: populate_units

 def populate_units(self, key, typedef):
     for i in range(1,10):
         unit_id = 'unit-%s' % self.UNIT_ID
         md = {key:str(i)}
         manager = factory.content_manager()
         manager.add_content_unit(typedef.id, unit_id, md)
         manager = factory.repo_unit_association_manager()
         manager.associate_unit_by_id(
             self.REPO_ID,
             typedef.id,
             unit_id,
             OWNER_TYPE_IMPORTER,
             'test-importer')
         self.UNIT_ID += 1
开发者ID:domcleal,项目名称:pulp,代码行数:14,代码来源:test_profiler_conduit.py


示例12: test_syntactic_sugar_methods

    def test_syntactic_sugar_methods(self):
        """
        Tests the syntactic sugar methods for retrieving specific managers.
        """

        # Test
        self.assertTrue(isinstance(factory.repo_manager(), RepoManager))
        self.assertTrue(isinstance(factory.repo_unit_association_manager(), RepoUnitAssociationManager))
        self.assertTrue(isinstance(factory.repo_publish_manager(), RepoPublishManager))
        self.assertTrue(isinstance(factory.repo_query_manager(), RepoQueryManager))
        self.assertTrue(isinstance(factory.repo_sync_manager(), RepoSyncManager))
        self.assertTrue(isinstance(factory.content_manager(), ContentManager))
        self.assertTrue(isinstance(factory.content_query_manager(), ContentQueryManager))
        self.assertTrue(isinstance(factory.content_upload_manager(), ContentUploadManager))
        self.assertTrue(isinstance(factory.consumer_manager(), ConsumerManager))
开发者ID:stpierre,项目名称:pulp,代码行数:15,代码来源:test_manager_factory.py


示例13: populate

 def populate(self):
     # make content/ dir.
     os.makedirs(os.path.join(self.parentfs, 'content'))
     pulp_conf.set('server', 'storage_dir', self.parentfs)
     # create repo
     manager = managers.repo_manager()
     manager.create_repo(self.REPO_ID)
     # add units
     units = []
     for n in range(0, self.NUM_UNITS):
         unit_id = self.UNIT_ID % n
         unit = dict(self.UNIT_METADATA)
         unit['N'] = n
         # add unit file
         storage_dir = pulp_conf.get('server', 'storage_dir')
         storage_path = \
             os.path.join(storage_dir, 'content',
                 '.'.join((unit_id, self.UNIT_TYPE_ID)))
         unit['_storage_path'] = storage_path
         fp = open(storage_path, 'w+')
         fp.write(unit_id)
         fp.close()
         # add unit
         manager = managers.content_manager()
         manager.add_content_unit(
             self.UNIT_TYPE_ID,
             unit_id,
             unit)
         manager = managers.repo_unit_association_manager()
         # associate unit
         manager.associate_unit_by_id(
             self.REPO_ID,
             self.UNIT_TYPE_ID,
             unit_id,
             RepoContentUnit.OWNER_TYPE_IMPORTER,
             constants.HTTP_IMPORTER)
         units.append(unit)
     # CA
     self.units = units
     path = os.path.join(self.parentfs, 'ca.crt')
     fp = open(path, 'w+')
     fp.write(self.CA_CERT)
     fp.close()
     # client cert
     path = os.path.join(self.parentfs, 'local.crt')
     fp = open(path, 'w+')
     fp.write(self.CLIENT_CERT)
     fp.close()
开发者ID:bartwo,项目名称:pulp,代码行数:48,代码来源:test_plugins.py


示例14: add_units

def add_units(num_units=10):
    units = []
    n = 0
    for type_id in ALL_TYPES:
        for x in range(0, num_units):
            unit_id = create_unit_id(type_id, n)
            unit = dict(UNIT_METADATA)
            unit['N'] = n
            unit['_storage_path'] = create_storage_path(unit_id)
            manager = managers.content_manager()
            manager.add_content_unit(type_id, unit_id, unit)
            manager = managers.repo_unit_association_manager()
            # associate unit
            with mock.patch('pulp.server.managers.repo.unit_association.repo_controller'):
                manager.associate_unit_by_id(REPO_ID, type_id, unit_id)
            units.append(unit)
            n += 1
    return units
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:18,代码来源:test_conduit.py


示例15: PUT

    def PUT(self, type_id, unit_id):
        """
        Set the pulp_user_metadata field on the existing unit.

        :param type_id: The Unit's type id.
        :type  type_id: basestring
        :param unit_id: The id of the unit that you wish to set the pulp_user_metadata field on
        :type  unit_id: basestring
        """
        cqm = factory.content_query_manager()
        try:
            cqm.get_content_unit_by_id(type_id, unit_id)
        except MissingResource:
            return self.not_found(_('No content unit resource: %(r)s') % {'r': unit_id})

        cm = factory.content_manager()
        delta = {constants.PULP_USER_METADATA_FIELDNAME: self.params()}
        cm.update_content_unit(type_id, unit_id, delta)
        return self.ok(None)
开发者ID:beav,项目名称:pulp,代码行数:19,代码来源:contents.py


示例16: save_unit

    def save_unit(self, unit):
        """
        Performs two distinct steps on the Pulp server:
        - Creates or updates Pulp's knowledge of the content unit.
        - Associates the unit to the repository being synchronized.

        If a unit with the provided unit key already exists, it is updated with
        the attributes on the passed-in unit.

        A reference to the provided unit is returned from this call. This call
        will populate the unit's id field with the UUID for the unit.

        @param unit: unit object returned from the init_unit call
        @type  unit: L{Unit}

        @return: object reference to the provided unit, its state updated from the call
        @rtype:  L{Unit}
        """
        try:
            content_query_manager = manager_factory.content_query_manager()
            content_manager = manager_factory.content_manager()
            association_manager = manager_factory.repo_unit_association_manager()

            # Save or update the unit
            pulp_unit = common_utils.to_pulp_unit(unit)
            try:
                existing_unit = content_query_manager.get_content_unit_by_keys_dict(unit.type_id, unit.unit_key)
                unit.id = existing_unit['_id']
                content_manager.update_content_unit(unit.type_id, unit.id, pulp_unit)
                self._updated_count += 1
            except MissingResource:
                unit.id = content_manager.add_content_unit(unit.type_id, None, pulp_unit)
                self._added_count += 1

            # Associate it with the repo
            association_manager.associate_unit_by_id(self.repo_id, unit.type_id, unit.id, self.association_owner_type, self.association_owner_id)

            return unit
        except Exception, e:
            _LOG.exception(_('Content unit association failed [%s]' % str(unit)))
            raise ImporterConduitException(e), None, sys.exc_info()[2]
开发者ID:signull,项目名称:pulp,代码行数:41,代码来源:mixins.py


示例17: add_units

def add_units(num_units=10):
    units = []
    n = 0
    for type_id in ALL_TYPES:
        for x in range(0, num_units):
            unit_id = create_unit_id(type_id, n)
            unit = dict(UNIT_METADATA)
            unit['N'] = n
            unit['_storage_path'] = create_storage_path(unit_id)
            manager = managers.content_manager()
            manager.add_content_unit(type_id, unit_id, unit)
            manager = managers.repo_unit_association_manager()
            # associate unit
            manager.associate_unit_by_id(
                REPO_ID,
                type_id,
                unit_id,
                RepoContentUnit.OWNER_TYPE_IMPORTER,
                constants.HTTP_IMPORTER)
            units.append(unit)
            n += 1
    return units
开发者ID:cliffy94,项目名称:pulp,代码行数:22,代码来源:test_conduit.py


示例18: add_units

 def add_units(self, begin, end):
     units = []
     storage_dir = os.path.join(pulp_conf.get('server', 'storage_dir'), 'content')
     if not os.path.exists(storage_dir):
         os.makedirs(storage_dir)
     for n in range(begin, end):
         unit_id = self.UNIT_ID % n
         unit = dict(self.UNIT_KEY)
         unit.update(self.UNIT_METADATA)
         unit['N'] = n
         # add unit file
         storage_path = os.path.join(storage_dir, '.'.join((unit_id, self.UNIT_TYPE_ID)))
         if n % 2 == 0:  # even numbered has file associated
             unit['_storage_path'] = storage_path
             if n == 0:  # 1st one is a directory of files
                 os.makedirs(storage_path)
                 dist_path = os.path.join(os.path.dirname(__file__), 'data/distribution.tar')
                 tb = tarfile.open(dist_path)
                 tb.extractall(path=storage_path)
                 tb.close()
             else:
                 with open(storage_path, 'w+') as fp:
                     fp.write(unit_id)
         # add unit
         manager = managers.content_manager()
         manager.add_content_unit(
             self.UNIT_TYPE_ID,
             unit_id,
             unit)
         manager = managers.repo_unit_association_manager()
         # associate unit
         manager.associate_unit_by_id(
             self.REPO_ID,
             self.UNIT_TYPE_ID,
             unit_id,
             RepoContentUnit.OWNER_TYPE_IMPORTER,
             constants.HTTP_IMPORTER)
         units.append(unit)
     return units
开发者ID:ipanova,项目名称:pulp,代码行数:39,代码来源:test_plugins.py


示例19: _update_unit

    def _update_unit(self, unit, pulp_unit):
        """
        Update a unit. If it is not found, add it.

        :param unit:        the unit to be updated
        :type  unit:        pulp.plugins.model.Unit
        :param pulp_unit:   the unit to be updated, as a dict
        :type  pulp_unit:   dict

        :return:    id of the updated unit
        :rtype:     basestring
        """
        content_query_manager = manager_factory.content_query_manager()
        content_manager = manager_factory.content_manager()
        try:
            existing_unit = content_query_manager.get_content_unit_by_keys_dict(unit.type_id, unit.unit_key)
            unit_id = existing_unit['_id']
            content_manager.update_content_unit(unit.type_id, unit_id, pulp_unit)
            self._updated_count += 1
            return unit_id
        except MissingResource:
            logger.debug(_('cannot update unit; does not exist. adding instead.'))
            return self._add_unit(unit, pulp_unit)
开发者ID:kevinaloys,项目名称:pulp,代码行数:23,代码来源:mixins.py


示例20: put

    def put(self, request, type_id, unit_id):
        """
        Set the pulp_user_metadata field on a content unit.

        :param type_id: The Unit's type id.
        :type  type_id: basestring
        :param unit_id: The id of the unit that you wish to set the pulp_user_metadata field on
        :type  unit_id: basestring

        :return: response containing pulp user metadata_field
        :rtype: django.http.HttpResponse or HttpResponseNotFound
        """
        params = request.body_as_json
        cqm = factory.content_query_manager()
        try:
            cqm.get_content_unit_by_id(type_id, unit_id)
        except MissingResource:
            msg = _("No content unit resource: %(r)s") % {"r": unit_id}
            return generate_json_response(msg, HttpResponseNotFound)

        cm = factory.content_manager()
        delta = {constants.PULP_USER_METADATA_FIELDNAME: params}
        cm.update_content_unit(type_id, unit_id, delta)
        return generate_json_response(None)
开发者ID:credativ,项目名称:pulp,代码行数:24,代码来源:content.py



注:本文中的pulp.server.managers.factory.content_manager函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python factory.content_orphan_manager函数代码示例发布时间:2022-05-25
下一篇:
Python factory.consumer_profile_manager函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap