• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python database.update_database函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.plugins.types.database.update_database函数的典型用法代码示例。如果您正苦于以下问题:Python update_database函数的具体用法?Python update_database怎么用?Python update_database使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了update_database函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_update_no_changes

    def test_update_no_changes(self):
        """
        Tests the common use case of loading type definitions that have been
        loaded already and have not changed.
        """

        # Setup
        defs = [DEF_1, DEF_2, DEF_3, DEF_4]
        types_db.update_database(defs)

        # Test
        same_defs = [DEF_4, DEF_3, DEF_2, DEF_1] # no real reason for this, just felt better than using the previous list
        types_db.update_database(same_defs)

        # Verify
        all_collection_names = types_db.all_type_collection_names()
        self.assertEqual(len(same_defs), len(all_collection_names))

        for d in defs:
            self.assertTrue(types_db.unit_collection_name(d.id) in all_collection_names)

            # Quick sanity check on the indexes
            collection = types_db.type_units_collection(d.id)
            all_indexes = collection.index_information()

            total_index_count = 1 + 1 + len(d.search_indexes) # _id + unit key + all search
            self.assertEqual(total_index_count, len(all_indexes))
开发者ID:ashcrow,项目名称:pulp,代码行数:27,代码来源:test_types_database.py


示例2: test_update_missing_no_error

    def test_update_missing_no_error(self):
        """
        Tests that updating a previously loaded database with some missing
        definitions does not throw an error.
        """

        # Setup
        defs = [DEF_1, DEF_2, DEF_3]
        types_db.update_database(defs)

        # Test
        new_defs = [DEF_4]
        types_db.update_database(new_defs)

        # Verify
        all_collection_names = types_db.all_type_collection_names()
        self.assertEqual(len(defs) + len(new_defs), len(all_collection_names)) # old are not deleted

        for d in defs:
            self.assertTrue(types_db.unit_collection_name(d.id) in all_collection_names)

            # Quick sanity check on the indexes
            collection = types_db.type_units_collection(d.id)
            all_indexes = collection.index_information()

            total_index_count = 1 + 1 + len(d.search_indexes) # _id + unit key + all search
            self.assertEqual(total_index_count, len(all_indexes))
开发者ID:ashcrow,项目名称:pulp,代码行数:27,代码来源:test_types_database.py


示例3: init_types

 def init_types(self):
     database.clean()
     fp = open(self.TYPES_PATH)
     td = TypeDescriptor(os.path.basename(self.TYPES_PATH), fp.read())
     fp.close()
     definitions = parser.parse([td])
     database.update_database(definitions)
开发者ID:bechtoldt,项目名称:pulp_rpm,代码行数:7,代码来源:test_errata.py


示例4: setUp

    def setUp(self):
        super(RepoUnitAssociationManagerTests, self).setUp()
        database.update_database([TYPE_1_DEF, TYPE_2_DEF, MOCK_TYPE_DEF])
        mock_plugins.install()

        self.manager = association_manager.RepoUnitAssociationManager()
        self.repo_manager = repo_manager.RepoManager()
        self.importer_manager = importer_manager.RepoImporterManager()
        self.content_manager = content_cud_manager.ContentManager()

        # Set up a valid configured repo for the tests
        self.repo_id = 'associate-repo'
        self.repo_manager.create_repo(self.repo_id)
        self.importer_manager.set_importer(self.repo_id, 'mock-importer', {})

        # Create units that can be associated to a repo
        self.unit_type_id = 'mock-type'

        self.unit_id = 'test-unit-id'
        self.unit_key = {'key-1': 'test-unit'}
        self.content_manager.add_content_unit(self.unit_type_id, self.unit_id, self.unit_key)

        self.unit_id_2 = 'test-unit-id-2'
        self.unit_key_2 = {'key-1': 'test-unit-2'}
        self.content_manager.add_content_unit(self.unit_type_id, self.unit_id_2, self.unit_key_2)
开发者ID:pombreda,项目名称:pulp,代码行数:25,代码来源:test_unit_association.py


示例5: setUp

 def setUp(self):
     super(UnitAssociationQueryTests, self).setUp()
     database.update_database(_QUERY_TYPES)
     self.manager = association_query_manager.RepoUnitAssociationQueryManager()
     self.association_manager = association_manager.RepoUnitAssociationManager()
     self.content_manager = content_cud_manager.ContentManager()
     self._populate()
开发者ID:alanoe,项目名称:pulp,代码行数:7,代码来源:test_unit_association_query.py


示例6: setUp

    def setUp(self):
        super(Migration0004Tests, self).setUp()

        # Special way to import modules that start with a number
        self.migration = _import_all_the_way(
            'pulp_rpm.plugins.migrations.0004_pkg_group_category_repoid')

        factory.initialize()
        api.initialize(False)
        types_db.update_database([TYPE_DEF_GROUP, TYPE_DEF_CATEGORY])

        # Create the repositories necessary for the tests
        self.source_repo_id = 'source-repo'  # where units were copied from with the bad code
        self.dest_repo_id = 'dest-repo'  # where bad units were copied to

        source_repo = model.Repository(repo_id=self.source_repo_id)
        source_repo.save()
        dest_repo = model.Repository(repo_id=self.dest_repo_id)
        dest_repo.save()

        source_importer = model.Importer(self.source_repo_id, 'yum_importer', {})
        source_importer.save()

        dest_importer = model.Importer(self.dest_repo_id, 'yum_importer', {})
        dest_importer.save()
开发者ID:FlorianHeigl,项目名称:pulp_rpm,代码行数:25,代码来源:test_0004_migrate.py


示例7: setUp

    def setUp(self):
        super(Migration0004Tests, self).setUp()

        # Special way to import modules that start with a number
        self.migration = _import_all_the_way(
            'pulp_rpm.plugins.migrations.0004_pkg_group_category_repoid')

        factory.initialize()
        types_db.update_database([TYPE_DEF_GROUP, TYPE_DEF_CATEGORY])

        # Create the repositories necessary for the tests
        self.source_repo_id = 'source-repo'  # where units were copied from with the bad code
        self.dest_repo_id = 'dest-repo'  # where bad units were copied to

        source_repo = Repo(self.source_repo_id, '')
        Repo.get_collection().insert(source_repo, safe=True)

        dest_repo = Repo(self.dest_repo_id, '')
        Repo.get_collection().insert(dest_repo, safe=True)

        source_importer = RepoImporter(self.source_repo_id, 'yum_importer', 'yum_importer', {})
        RepoImporter.get_collection().insert(source_importer, safe=True)

        dest_importer = RepoImporter(self.dest_repo_id, 'yum_importer', 'yum_importer', {})
        RepoImporter.get_collection().insert(dest_importer, safe=True)
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:25,代码来源:test_0004_migrate.py


示例8: load_content_types

def load_content_types(types_dir=_TYPES_DIR, dry_run=False, drop_indices=False):
    """
    Check or update database with content unit types information.

    :param types_dir: path to content unit type JSON files,
                      currently used only for node.json
    :type  types_dir:  str
    :param dry_run: if True, no modifications to database will be made, defaults to False
    :type  dry_run:  bool
    :param drop_indices: if True, indices for the collections of modified unit types
                         will be dropped, defaults to False
    :type  drop_indices:  bool
    :return: None if dry_run is set to False,
             list of content unit types to be created or updated, if dry_run is set to True
    :rtype:  None or list of TypeDefinition
    """
    if not os.access(types_dir, os.F_OK | os.R_OK):
        msg = _('Cannot load types: path does not exist or cannot be read: %(p)s')
        _logger.critical(msg % {'p': types_dir})
        raise IOError(msg % {'p': types_dir})

    # to handle node.json only
    descriptors = _load_type_descriptors(types_dir)
    pre_mongoengine_definitions = parser.parse(descriptors)

    # get information about content unit types from entry points
    mongoengine_definitions = _generate_plugin_definitions()

    if dry_run:
        return _check_content_definitions(pre_mongoengine_definitions + mongoengine_definitions)
    else:
        database.update_database(pre_mongoengine_definitions, drop_indices=drop_indices,
                                 create_indexes=True)
        database.update_database(mongoengine_definitions, drop_indices=drop_indices,
                                 create_indexes=False)
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:35,代码来源:api.py


示例9: setUp

 def setUp(self):
     super(UnitAssociationQueryTests, self).setUp()
     database.update_database(_QUERY_TYPES)
     self.manager = association_query_manager.RepoUnitAssociationQueryManager()
     self.association_manager = association_manager.RepoUnitAssociationManager()
     self.content_manager = content_cud_manager.ContentManager()
     # so we don't try to refresh the unit count on non-existing repos
     manager_factory._CLASSES[manager_factory.TYPE_REPO] = mock.MagicMock()
     self._populate()
开发者ID:fdammeke,项目名称:pulp,代码行数:9,代码来源:test_repo_unit_association_query_manager.py


示例10: setUp

 def setUp(self):
     super(BaseProfilerConduitTests, self).setUp()
     Consumer.get_collection().remove()
     RepoDistributor.get_collection().remove()
     Bind.get_collection().remove()
     RepoContentUnit.get_collection().remove()
     UnitProfile.get_collection().remove()
     plugin_api._create_manager()
     typedb.update_database([self.TYPE_1_DEF, self.TYPE_2_DEF])
     mock_plugins.install()
开发者ID:jeremycline,项目名称:pulp,代码行数:10,代码来源:test_profiler.py


示例11: setUp

    def setUp(self):
        super(RepoUnitAssociationManagerTests, self).setUp()
        database.update_database([TYPE_1_DEF, TYPE_2_DEF, MOCK_TYPE_DEF])
        mock_plugins.install()
        # so we don't try to refresh the unit count on non-existing repos
        manager_factory._CLASSES[manager_factory.TYPE_REPO] = mock.MagicMock()

        self.manager = association_manager.RepoUnitAssociationManager()
        self.repo_manager = repo_manager.RepoManager()
        self.importer_manager = importer_manager.RepoImporterManager()
        self.content_manager = content_cud_manager.ContentManager()
开发者ID:stpierre,项目名称:pulp,代码行数:11,代码来源:test_repo_unit_association_manager.py


示例12: setUp

    def setUp(self, mock_repo_qs, mock_imp_qs, mock_remove):
        super(RepoSyncConduitTests, self).setUp()
        mock_plugins.install()
        types_database.update_database([TYPE_1_DEF, TYPE_2_DEF])

        self.association_manager = association_manager.RepoUnitAssociationManager()
        self.association_query_manager = association_query_manager.RepoUnitAssociationQueryManager()
        self.content_manager = content_manager.ContentManager()
        self.query_manager = query_manager.ContentQueryManager()

        self.conduit = RepoSyncConduit('repo-1', 'test-importer', 'abc123')
        importer_controller.set_importer('repo-1', 'mock-importer', {})
开发者ID:alanoe,项目名称:pulp,代码行数:12,代码来源:test_repo_sync.py


示例13: setUp

    def setUp(self):
        super(DependencyManagerTests, self).setUp()

        mock_plugins.install()

        database.update_database([TYPE_1_DEF])

        self.repo_id = 'dep-repo'
        self.manager = manager_factory.dependency_manager()

        manager_factory.repo_manager().create_repo(self.repo_id)
        manager_factory.repo_importer_manager().set_importer(self.repo_id, 'mock-importer', {})
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:12,代码来源:test_dependency_manager.py


示例14: test_all_type_definitions

    def test_all_type_definitions(self):
        """
        Tests retrieving all type definitions from the database.
        """

        # Setup
        defs = [DEF_1, DEF_2, DEF_3, DEF_4]
        types_db.update_database(defs)

        # Test
        all_defs = types_db.all_type_definitions()

        # Verify
        self.assertEqual(4, len(all_defs))
开发者ID:ashcrow,项目名称:pulp,代码行数:14,代码来源:test_types_database.py


示例15: setUp

    def setUp(self):
        super(RepoSyncConduitTests, self).setUp()
        mock_plugins.install()
        types_database.update_database([TYPE_1_DEF, TYPE_2_DEF])

        self.repo_manager = repo_manager.RepoManager()
        self.importer_manager = importer_manager.RepoImporterManager()
        self.sync_manager = sync_manager.RepoSyncManager()
        self.association_manager = association_manager.RepoUnitAssociationManager()
        self.association_query_manager = association_query_manager.RepoUnitAssociationQueryManager()
        self.content_manager = content_manager.ContentManager()
        self.query_manager = query_manager.ContentQueryManager()

        self.repo_manager.create_repo('repo-1')
        self.conduit = RepoSyncConduit('repo-1', 'test-importer', 'importer', 'importer-id')
开发者ID:stpierre,项目名称:pulp,代码行数:15,代码来源:test_repo_sync_conduit.py


示例16: test_update_failed_create

    def test_update_failed_create(self):
        """
        Simulates a failure to create a collection by passing in a bad ID for
        the definition.
        """

        # Setup
        busted = TypeDefinition('[email protected]#$%^&*()', 'Busted', 'Busted', None, None, [])
        defs = [DEF_1, busted]

        # Tests
        try:
            types_db.update_database(defs)
            self.fail('Update with a failed create did not raise exception')
        except types_db.UpdateFailed, e:
            self.assertEqual(1, len(e.type_definitions))
            self.assertEqual(busted, e.type_definitions[0])
            print(e)
开发者ID:ashcrow,项目名称:pulp,代码行数:18,代码来源:test_types_database.py


示例17: test_update_missing_with_error

    def test_update_missing_with_error(self):
        """
        Tests that updating a previously loaded database with some missing
        definitions correctly throws an error when requested.
        """

        # Setup
        defs = [DEF_1, DEF_2, DEF_3]
        types_db.update_database(defs)

        # Test
        new_defs = [DEF_4]

        try:
            types_db.update_database(new_defs, error_on_missing_definitions=True)
            self.fail('MissingDefinitions exception expected')
        except types_db.MissingDefinitions, e:
            self.assertEqual(3, len(e.missing_type_ids))
            self.assertTrue(DEF_1.id in e.missing_type_ids)
            self.assertTrue(DEF_2.id in e.missing_type_ids)
            self.assertTrue(DEF_3.id in e.missing_type_ids)
            print(e) # used to test the __str__ impl
开发者ID:ashcrow,项目名称:pulp,代码行数:22,代码来源:test_types_database.py


示例18: test_update_clean_database

    def test_update_clean_database(self):
        """
        Tests calling update on a completely clean types database.
        """

        # Test
        defs = [DEF_1, DEF_2, DEF_3, DEF_4]
        types_db.update_database(defs)

        # Verify
        all_collection_names = types_db.all_type_collection_names()
        self.assertEqual(len(defs), len(all_collection_names))

        for d in defs:
            self.assertTrue(types_db.unit_collection_name(d.id) in all_collection_names)

            # Quick sanity check on the indexes
            collection = types_db.type_units_collection(d.id)
            all_indexes = collection.index_information()

            total_index_count = 1 + 1 + len(d.search_indexes) # _id + unit key + all search
            self.assertEqual(total_index_count, len(all_indexes))
开发者ID:ashcrow,项目名称:pulp,代码行数:22,代码来源:test_types_database.py


示例19: setUp

 def setUp(self):
     super(OrphanManagerTests, self).setUp()
     content_type_db.update_database([PHONY_TYPE_1, PHONY_TYPE_2])
     self.content_root = tempfile.mkdtemp(prefix='content_orphan_manager_unittests-')
     self.orphan_manager = OrphanManager()
开发者ID:domcleal,项目名称:pulp,代码行数:5,代码来源:test_content_orphan_manager.py


示例20: _load_type_definitions

def _load_type_definitions(descriptors, drop_indices=False):
    """
    :type descriptors: list [TypeDescriptor, ...]
    """
    definitions = parser.parse(descriptors)
    database.update_database(definitions, drop_indices=drop_indices)
开发者ID:jeremycline,项目名称:pulp,代码行数:6,代码来源:api.py



注:本文中的pulp.plugins.types.database.update_database函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python metadata_writer.MetadataFileContext类代码示例发布时间:2022-05-25
下一篇:
Python database.type_units_collection函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap