• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python repository.RepoImporter类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.db.model.repository.RepoImporter的典型用法代码示例。如果您正苦于以下问题：Python RepoImporter类的具体用法？Python RepoImporter怎么用？Python RepoImporter使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了RepoImporter类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: clean

    def clean(self):
        """
        Reset the database state used by these tests.

        Runs the parent cleanup, drops the repository collection, clears the importer and
        distributor collections, and deletes the task status documents.
        """
        super(RepoManagerTests, self).clean()

        model.Repository.drop_collection()
        for doc_model in (RepoImporter, RepoDistributor):
            doc_model.get_collection().remove()
        TaskStatus.objects().delete()
开发者ID:nbetm,项目名称:pulp,代码行数:7,代码来源:test_managers.py


示例2: test_delete_with_plugins

    def test_delete_with_plugins(self):
        """
        Deleting a repo that has an importer and distributors configured must delete those
        as well.
        """

        def count_for_doomed(doc_model):
            # Number of documents in the model's collection that reference the 'doomed' repo
            return len(list(doc_model.get_collection().find({'repo_id': 'doomed'})))

        # Setup
        self.manager.create_repo('doomed')

        importers = manager_factory.repo_importer_manager()
        distributors = manager_factory.repo_distributor_manager()

        importers.set_importer('doomed', 'mock-importer', {})
        for dist_id in ('dist-1', 'dist-2'):
            distributors.add_distributor('doomed', 'mock-distributor', {}, True,
                                         distributor_id=dist_id)

        self.assertEqual(1, count_for_doomed(RepoImporter))
        self.assertEqual(2, count_for_doomed(RepoDistributor))

        # Test
        self.manager.delete_repo('doomed')

        # Verify
        self.assertEqual(0, len(list(Repo.get_collection().find())))

        self.assertEqual(0, count_for_doomed(RepoImporter))
        self.assertEqual(0, count_for_doomed(RepoDistributor))

        # Each configured plugin must have been notified of its removal exactly once
        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.importer_removed.call_count)
        self.assertEqual(2, mock_plugins.MOCK_DISTRIBUTOR.distributor_removed.call_count)

        working_dir = common_utils.repository_working_dir('doomed', mkdir=False)
        self.assertTrue(not os.path.exists(working_dir))
开发者ID:beav,项目名称:pulp,代码行数:35,代码来源:test_cud.py


示例3: clean

    def clean(self):
        """
        Reset the database state used by these tests.

        Runs the parent cleanup, clears the repo, importer and distributor collections, and
        deletes the task status documents.
        """
        super(RepoManagerTests, self).clean()

        for doc_model in (Repo, RepoImporter, RepoDistributor):
            doc_model.get_collection().remove()
        dispatch.TaskStatus.objects().delete()
开发者ID:beav,项目名称:pulp,代码行数:7,代码来源:test_cud.py


示例4: setUp

    def setUp(self):
        """
        Load the 0004 migration module and create a source and a destination repository,
        each with a yum importer document.
        """
        super(Migration0004Tests, self).setUp()

        # The module name starts with a number, so it has to be imported by its dotted path
        self.migration = _import_all_the_way(
            'pulp_rpm.plugins.migrations.0004_pkg_group_category_repoid')

        factory.initialize()
        types_db.update_database([TYPE_DEF_GROUP, TYPE_DEF_CATEGORY])

        # Repositories needed by the tests
        self.source_repo_id = 'source-repo'  # where units were copied from with the bad code
        self.dest_repo_id = 'dest-repo'  # where bad units were copied to
        repo_ids = (self.source_repo_id, self.dest_repo_id)

        # Both repo documents go in first, then both importer documents
        for repo_id in repo_ids:
            repo_doc = Repo(repo_id, '')
            Repo.get_collection().insert(repo_doc, safe=True)

        for repo_id in repo_ids:
            importer_doc = RepoImporter(repo_id, 'yum_importer', 'yum_importer', {})
            RepoImporter.get_collection().insert(importer_doc, safe=True)
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:25,代码来源:test_0004_migrate.py


示例5: test_set_importer_with_existing

    def test_set_importer_with_existing(self, mock_repo_qs):
        """
        Setting a different importer on a repo that already had one leaves only the new
        importer stored and notifies the old one of its removal.
        """

        # Second importer type, registered below under its own plugin id
        class MockImporter2(Importer):
            @classmethod
            def metadata(cls):
                return {'types': ['mock_types_2']}

            def validate_config(self, repo_data, importer_config):
                return True

        replacement_id = 'mock-importer-2'
        mock_plugins.IMPORTER_MAPPINGS[replacement_id] = MockImporter2()
        plugin_api._MANAGER.importers.add_plugin(replacement_id, MockImporter2, {})

        self.importer_manager.set_importer('change_me', 'mock-importer', {})

        # Test
        self.importer_manager.set_importer('change_me', replacement_id, {})

        # Verify
        stored = list(RepoImporter.get_collection().find())
        self.assertEqual(1, len(stored))
        self.assertEqual(stored[0]['id'], replacement_id)

        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.importer_removed.call_count)
        RepoImporter.get_collection().remove()
开发者ID:jeremycline,项目名称:pulp,代码行数:28,代码来源:test_importer.py


示例6: clean

 def clean(self):
     """
     Clear the bind, repo, distributor, importer and repo/unit association collections,
     then clean the unit database.
     """
     for doc_model in (Bind, Repo, RepoDistributor, RepoImporter, RepoContentUnit):
         doc_model.get_collection().remove()
     unit_db.clean()
开发者ID:juwu,项目名称:pulp,代码行数:7,代码来源:test_plugins.py


示例7: tearDown

 def tearDown(self):
     """
     Run the parent teardown, reset the mock plugins and the manager factory, clear the
     repo, importer and sync result collections, and reset the mock publish manager.
     """
     super(TestDoSync, self).tearDown()
     mock_plugins.reset()
     manager_factory.reset()
     for doc_model in (Repo, RepoImporter, RepoSyncResult):
         doc_model.get_collection().remove()
     MockRepoPublishManager.reset()
开发者ID:beav,项目名称:pulp,代码行数:8,代码来源:test_sync.py


示例8: clean

    def clean(self):
        """
        Run the parent cleanup, clear the repo, importer and sync result collections, and
        reset the mock publish manager.
        """
        super(RepoSyncManagerTests, self).clean()
        for doc_model in (Repo, RepoImporter, RepoSyncResult):
            doc_model.get_collection().remove()

        # Put the mock's tracker variables back to their initial state
        MockRepoPublishManager.reset()
开发者ID:beav,项目名称:pulp,代码行数:8,代码来源:test_sync.py


示例9: tearDown

    def tearDown(self):
        """
        Run the parent teardown, clean the types database, and clear the unit association,
        importer and repo collections.
        """
        super(Migration0004Tests, self).tearDown()

        # Remove the sample data the test added
        types_db.clean()

        for doc_model in (RepoContentUnit, RepoImporter, Repo):
            doc_model.get_collection().remove()
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:9,代码来源:test_0004_migrate.py


示例10: clean

    def clean(self):
        """
        Run the parent cleanup, clean the database module, clear the repo, importer and unit
        association collections, and reset the mock importer's dependency resolution result.
        """
        super(DependencyManagerTests, self).clean()

        database.clean()

        for doc_model in (Repo, RepoImporter, RepoContentUnit):
            doc_model.get_collection().remove()

        # Undo any return value a test configured on the mock
        mock_plugins.MOCK_IMPORTER.resolve_dependencies.return_value = None
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:10,代码来源:test_dependency_manager.py


示例11: clean

 def clean(self, units_only=False, plugins=False):
     """
     Clear test data from the database.

     :param units_only: when true, only the unit associations and the unit database are
                        cleaned and everything else is left in place
     :param plugins:    when true (and units_only is false), the registered distributor
                        plugins are also reset to an empty dict
     """
     RepoContentUnit.get_collection().remove()
     unit_db.clean()
     if not units_only:
         for doc_model in (Bind, Repo, RepoDistributor, RepoImporter):
             doc_model.get_collection().remove()
         if plugins:
             plugin_api._MANAGER.distributors.plugins = {}
开发者ID:msurovcak,项目名称:pulp,代码行数:11,代码来源:test_plugins.py


示例12: tearDown

 def tearDown(self):
     """
     Run the ServerTests teardown, delete the parent and child directory trees, and clear
     the collections and unit database used by the test.
     """
     ServerTests.tearDown(self)
     for tree in (self.parentfs, self.childfs):
         shutil.rmtree(tree)
     for doc_model in (Consumer, Bind):
         doc_model.get_collection().remove()
     model.Repository.drop_collection()
     for doc_model in (RepoDistributor, RepoImporter, RepoContentUnit):
         doc_model.get_collection().remove()
     unit_db.clean()
开发者ID:jeremycline,项目名称:pulp,代码行数:11,代码来源:test_plugins.py


示例13: tearDown

 def tearDown(self):
     """
     Run the WebTest teardown, delete the parent and child directory trees, and clear the
     collections and unit database used by the test.
     """
     WebTest.tearDown(self)
     for tree in (self.parentfs, self.childfs):
         shutil.rmtree(tree)
     cleared_models = (Consumer, Bind, Repo, RepoDistributor, RepoImporter, RepoContentUnit)
     for doc_model in cleared_models:
         doc_model.get_collection().remove()
     unit_db.clean()
开发者ID:ipanova,项目名称:pulp,代码行数:11,代码来源:test_plugins.py


示例14: clean

 def clean(self, just_units=False, purge_plugins=False):
     """
     Clear test data from the database.

     :param just_units:    when true, only the unit associations and the unit database are
                           cleaned and everything else is left in place
     :param purge_plugins: when true (and just_units is false), the registered importer and
                           distributor plugins are also reset to empty dicts
     """
     RepoContentUnit.get_collection().remove()
     unit_db.clean()
     if not just_units:
         for doc_model in (Bind, Repo, RepoDistributor, RepoImporter):
             doc_model.get_collection().remove()
         if purge_plugins:
             plugin_api._MANAGER.importers.plugins = {}
             plugin_api._MANAGER.distributors.plugins = {}
开发者ID:bartwo,项目名称:pulp,代码行数:12,代码来源:test_plugins.py


示例15: test_remove_importer

    def test_remove_importer(self, mock_delete_schedules, mock_repo_qs):
        """
        Removing an importer deletes its document, notifies the plugin and deletes the
        importer's schedules.
        """

        def stored_importer():
            # Fresh lookup of the importer document for the 'whiterun' repo
            return RepoImporter.get_collection().find_one({'repo_id': 'whiterun',
                                                           'id': 'mock-importer'})

        self.importer_manager.set_importer('whiterun', 'mock-importer', {})
        self.assertTrue(stored_importer() is not None)

        self.importer_manager.remove_importer('whiterun')
        self.assertTrue(stored_importer() is None)

        self.assertEqual(1, mock_plugins.MOCK_IMPORTER.importer_removed.call_count)
        mock_delete_schedules.assert_called_once_with('whiterun', 'mock-importer')
开发者ID:jeremycline,项目名称:pulp,代码行数:15,代码来源:test_importer.py


示例16: test_set_importer_added_raises_error

    def test_set_importer_added_raises_error(self, mock_repo_qs):
        """
        An exception raised by the importer plugin's importer_added hook must surface from
        set_importer as a PulpExecutionException.
        """
        added_hook = mock_plugins.MOCK_IMPORTER.importer_added
        added_hook.side_effect = Exception()
        plugin_config = {'hobbit': 'frodo'}

        try:
            self.importer_manager.set_importer('repo-1', 'mock-importer', plugin_config)
        except exceptions.PulpExecutionException:
            pass
        else:
            self.fail('Exception expected for importer plugin exception')
        finally:
            # Restore the shared mock and clear stored importers whatever the outcome
            mock_plugins.MOCK_IMPORTER.importer_added.side_effect = None
            RepoImporter.get_collection().remove()
开发者ID:jeremycline,项目名称:pulp,代码行数:15,代码来源:test_importer.py


示例17: test_sync_bad_database

    def test_sync_bad_database(self):
        """
        Syncing must raise PulpExecutionException when the repo has had an importer set but
        the importer document is no longer in the database.
        """
        repo_id = 'good-repo'

        # Setup
        self.repo_manager.create_repo(repo_id)
        self.importer_manager.set_importer(repo_id, 'mock-importer', None)

        # Remove the importer documents directly to create the inconsistent state
        RepoImporter.get_collection().remove()

        expected_error = repo_sync_manager.PulpExecutionException
        self.assertRaises(expected_error, self.sync_manager.sync, repo_id)
开发者ID:beav,项目名称:pulp,代码行数:15,代码来源:test_sync.py


示例18: remove_importer

    def remove_importer(repo_id):
        """
        Remove the importer configured on a repository.

        The importer's sync schedules are deleted and the importer plugin's importer_removed
        hook is called before the importer document itself is removed.

        :param repo_id:         identifies the repo
        :type  repo_id:         str
        :raise MissingResource: if the given repo does not exist
        :raise MissingResource: if the given repo does not have an importer
        """

        collection = RepoImporter.get_collection()

        # Validation: the repo lookup raises if the repo is missing
        repo = model.Repository.objects.get_repo_or_missing_resource(repo_id)

        importer_doc = collection.find_one({"repo_id": repo_id})
        if importer_doc is None:
            raise MissingResource(repo_id)

        # Schedules reference the importer, so they go first
        RepoSyncScheduleManager().delete_by_importer_id(repo_id, importer_doc["id"])

        # Give the plugin a chance to clean up after itself
        importer_instance, plugin_config = plugin_api.get_importer_by_id(
            importer_doc["importer_type_id"])
        call_config = PluginCallConfiguration(plugin_config, importer_doc["config"])
        importer_instance.importer_removed(repo.to_transfer_repo(), call_config)

        # Finally drop the importer document from the database
        collection.remove({"repo_id": repo_id})
开发者ID:nthien,项目名称:pulp,代码行数:35,代码来源:importer.py


示例19: test_update_importer_config

    def test_update_importer_config(self, m_serializer, mock_repo_qs):
        """
        Updating an importer's configuration applies the delta to the stored config; the
        test expects a key whose delta value is None to be absent afterwards.
        """
        initial = {'key1': 'initial1', 'key2': 'initial2', 'key3': 'initial3'}
        self.importer_manager.set_importer('winterhold', 'mock-importer', initial)

        delta = {'key1': 'updated1', 'key2': None}
        self.importer_manager.update_importer_config('winterhold', delta)

        expected = {'key1': 'updated1', 'key3': 'initial3'}

        # Serializer: first positional argument of its first recorded call
        first_call_args = m_serializer.mock_calls[0][1]
        self.assertDictEqual(first_call_args[0]['config'], expected)

        # Database
        stored = RepoImporter.get_collection().find_one(
            {'repo_id': 'winterhold', 'id': 'mock-importer'})
        self.assertEqual(stored['config'], expected)

        # Plugin: validated once on the initial set and once on the update
        validate_config = mock_plugins.MOCK_IMPORTER.validate_config
        self.assertEqual(2, validate_config.call_count)
        # call_args holds the arguments of the most recent call
        last_call_config = validate_config.call_args[0][1]
        self.assertEqual(expected, last_call_config.repo_plugin_config)
开发者ID:jeremycline,项目名称:pulp,代码行数:25,代码来源:test_importer.py


示例20: repositories_with_yum_importers

def repositories_with_yum_importers():
    """
    Return the repository documents whose importer is of the yum importer type.

    The importer collection is queried for documents whose importer_type_id equals
    _TYPE_YUM_IMPORTER; the repositories with those repo ids are then fetched, requesting
    only the 'id' and 'scratchpad' fields.

    :return: the matching repository documents
    :rtype:  list
    """
    importer_query = {'importer_type_id': _TYPE_YUM_IMPORTER}
    importer_docs = RepoImporter.get_collection().find(importer_query, fields=['repo_id'])
    repo_ids = [doc['repo_id'] for doc in importer_docs]

    repo_query = {'id': {'$in': repo_ids}}
    return list(Repo.get_collection().find(repo_query, fields=['id', 'scratchpad']))
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:7,代码来源:0007_inventoried_custom_metadata.py



注:本文中的pulp.server.db.model.repository.RepoImporter类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python repository.RepoPublishResult类代码示例发布时间:2022-05-25
下一篇:
Python repository.RepoDistributor类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap