本文整理汇总了Python中pulp.server.db.model.repository.RepoImporter类的典型用法代码示例。如果您正苦于以下问题:Python RepoImporter类的具体用法?Python RepoImporter怎么用?Python RepoImporter使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了RepoImporter类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: clean
def clean(self):
super(RepoManagerTests, self).clean()
model.Repository.drop_collection()
RepoImporter.get_collection().remove()
RepoDistributor.get_collection().remove()
TaskStatus.objects().delete()
开发者ID:nbetm,项目名称:pulp,代码行数:7,代码来源:test_managers.py
示例2: test_delete_with_plugins
def test_delete_with_plugins(self):
"""
Tests that deleting a repo that has importers and distributors configured deletes them as
well.
"""
# Setup
self.manager.create_repo('doomed')
importer_manager = manager_factory.repo_importer_manager()
distributor_manager = manager_factory.repo_distributor_manager()
importer_manager.set_importer('doomed', 'mock-importer', {})
distributor_manager.add_distributor('doomed', 'mock-distributor', {}, True,
distributor_id='dist-1')
distributor_manager.add_distributor('doomed', 'mock-distributor', {}, True,
distributor_id='dist-2')
self.assertEqual(1, len(list(RepoImporter.get_collection().find({'repo_id': 'doomed'}))))
self.assertEqual(2, len(list(RepoDistributor.get_collection().find({'repo_id': 'doomed'}))))
# Test
self.manager.delete_repo('doomed')
# Verify
self.assertEqual(0, len(list(Repo.get_collection().find())))
self.assertEqual(0, len(list(RepoImporter.get_collection().find({'repo_id': 'doomed'}))))
self.assertEqual(0, len(list(RepoDistributor.get_collection().find({'repo_id': 'doomed'}))))
self.assertEqual(1, mock_plugins.MOCK_IMPORTER.importer_removed.call_count)
self.assertEqual(2, mock_plugins.MOCK_DISTRIBUTOR.distributor_removed.call_count)
repo_working_dir = common_utils.repository_working_dir('doomed', mkdir=False)
self.assertTrue(not os.path.exists(repo_working_dir))
开发者ID:beav,项目名称:pulp,代码行数:35,代码来源:test_cud.py
示例3: clean
def clean(self):
super(RepoManagerTests, self).clean()
Repo.get_collection().remove()
RepoImporter.get_collection().remove()
RepoDistributor.get_collection().remove()
dispatch.TaskStatus.objects().delete()
开发者ID:beav,项目名称:pulp,代码行数:7,代码来源:test_cud.py
示例4: setUp
def setUp(self):
super(Migration0004Tests, self).setUp()
# Special way to import modules that start with a number
self.migration = _import_all_the_way(
'pulp_rpm.plugins.migrations.0004_pkg_group_category_repoid')
factory.initialize()
types_db.update_database([TYPE_DEF_GROUP, TYPE_DEF_CATEGORY])
# Create the repositories necessary for the tests
self.source_repo_id = 'source-repo' # where units were copied from with the bad code
self.dest_repo_id = 'dest-repo' # where bad units were copied to
source_repo = Repo(self.source_repo_id, '')
Repo.get_collection().insert(source_repo, safe=True)
dest_repo = Repo(self.dest_repo_id, '')
Repo.get_collection().insert(dest_repo, safe=True)
source_importer = RepoImporter(self.source_repo_id, 'yum_importer', 'yum_importer', {})
RepoImporter.get_collection().insert(source_importer, safe=True)
dest_importer = RepoImporter(self.dest_repo_id, 'yum_importer', 'yum_importer', {})
RepoImporter.get_collection().insert(dest_importer, safe=True)
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:25,代码来源:test_0004_migrate.py
示例5: test_set_importer_with_existing
def test_set_importer_with_existing(self, mock_repo_qs):
"""
Tests setting a different importer on a repo that already had one.
"""
class MockImporter2(Importer):
@classmethod
def metadata(cls):
return {'types': ['mock_types_2']}
def validate_config(self, repo_data, importer_config):
return True
mock_plugins.IMPORTER_MAPPINGS['mock-importer-2'] = MockImporter2()
plugin_api._MANAGER.importers.add_plugin('mock-importer-2', MockImporter2, {})
self.importer_manager.set_importer('change_me', 'mock-importer', {})
# Test
self.importer_manager.set_importer('change_me', 'mock-importer-2', {})
# Verify
all_importers = list(RepoImporter.get_collection().find())
self.assertEqual(1, len(all_importers))
self.assertEqual(all_importers[0]['id'], 'mock-importer-2')
self.assertEqual(1, mock_plugins.MOCK_IMPORTER.importer_removed.call_count)
RepoImporter.get_collection().remove()
开发者ID:jeremycline,项目名称:pulp,代码行数:28,代码来源:test_importer.py
示例6: clean
def clean(self):
Bind.get_collection().remove()
Repo.get_collection().remove()
RepoDistributor.get_collection().remove()
RepoImporter.get_collection().remove()
RepoContentUnit.get_collection().remove()
unit_db.clean()
开发者ID:juwu,项目名称:pulp,代码行数:7,代码来源:test_plugins.py
示例7: tearDown
def tearDown(self):
super(TestDoSync, self).tearDown()
mock_plugins.reset()
manager_factory.reset()
Repo.get_collection().remove()
RepoImporter.get_collection().remove()
RepoSyncResult.get_collection().remove()
MockRepoPublishManager.reset()
开发者ID:beav,项目名称:pulp,代码行数:8,代码来源:test_sync.py
示例8: clean
def clean(self):
super(RepoSyncManagerTests, self).clean()
Repo.get_collection().remove()
RepoImporter.get_collection().remove()
RepoSyncResult.get_collection().remove()
# Reset the state of the mock's tracker variables
MockRepoPublishManager.reset()
开发者ID:beav,项目名称:pulp,代码行数:8,代码来源:test_sync.py
示例9: tearDown
def tearDown(self):
super(Migration0004Tests, self).tearDown()
# Delete any sample data added for the test
types_db.clean()
RepoContentUnit.get_collection().remove()
RepoImporter.get_collection().remove()
Repo.get_collection().remove()
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:9,代码来源:test_0004_migrate.py
示例10: clean
def clean(self):
super(DependencyManagerTests, self).clean()
database.clean()
Repo.get_collection().remove()
RepoImporter.get_collection().remove()
RepoContentUnit.get_collection().remove()
mock_plugins.MOCK_IMPORTER.resolve_dependencies.return_value = None
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:10,代码来源:test_dependency_manager.py
示例11: clean
def clean(self, units_only=False, plugins=False):
RepoContentUnit.get_collection().remove()
unit_db.clean()
if units_only:
return
Bind.get_collection().remove()
Repo.get_collection().remove()
RepoDistributor.get_collection().remove()
RepoImporter.get_collection().remove()
if plugins:
plugin_api._MANAGER.distributors.plugins = {}
开发者ID:msurovcak,项目名称:pulp,代码行数:11,代码来源:test_plugins.py
示例12: tearDown
def tearDown(self):
ServerTests.tearDown(self)
shutil.rmtree(self.parentfs)
shutil.rmtree(self.childfs)
Consumer.get_collection().remove()
Bind.get_collection().remove()
model.Repository.drop_collection()
RepoDistributor.get_collection().remove()
RepoImporter.get_collection().remove()
RepoContentUnit.get_collection().remove()
unit_db.clean()
开发者ID:jeremycline,项目名称:pulp,代码行数:11,代码来源:test_plugins.py
示例13: tearDown
def tearDown(self):
WebTest.tearDown(self)
shutil.rmtree(self.parentfs)
shutil.rmtree(self.childfs)
Consumer.get_collection().remove()
Bind.get_collection().remove()
Repo.get_collection().remove()
RepoDistributor.get_collection().remove()
RepoImporter.get_collection().remove()
RepoContentUnit.get_collection().remove()
unit_db.clean()
开发者ID:ipanova,项目名称:pulp,代码行数:11,代码来源:test_plugins.py
示例14: clean
def clean(self, just_units=False, purge_plugins=False):
RepoContentUnit.get_collection().remove()
unit_db.clean()
if just_units:
return
Bind.get_collection().remove()
Repo.get_collection().remove()
RepoDistributor.get_collection().remove()
RepoImporter.get_collection().remove()
if purge_plugins:
plugin_api._MANAGER.importers.plugins = {}
plugin_api._MANAGER.distributors.plugins = {}
开发者ID:bartwo,项目名称:pulp,代码行数:12,代码来源:test_plugins.py
示例15: test_remove_importer
def test_remove_importer(self, mock_delete_schedules, mock_repo_qs):
"""
Tests the successful case of removing an importer.
"""
self.importer_manager.set_importer('whiterun', 'mock-importer', {})
importer = RepoImporter.get_collection().find_one({'repo_id': 'whiterun',
'id': 'mock-importer'})
self.assertTrue(importer is not None)
self.importer_manager.remove_importer('whiterun')
importer = RepoImporter.get_collection().find_one({'repo_id': 'whiterun',
'id': 'mock-importer'})
self.assertTrue(importer is None)
self.assertEqual(1, mock_plugins.MOCK_IMPORTER.importer_removed.call_count)
mock_delete_schedules.assert_called_once_with('whiterun', 'mock-importer')
开发者ID:jeremycline,项目名称:pulp,代码行数:15,代码来源:test_importer.py
示例16: test_set_importer_added_raises_error
def test_set_importer_added_raises_error(self, mock_repo_qs):
"""
Tests simulating an error coming out of the importer's validate config method.
"""
mock_plugins.MOCK_IMPORTER.importer_added.side_effect = Exception()
config = {'hobbit': 'frodo'}
try:
self.importer_manager.set_importer('repo-1', 'mock-importer', config)
self.fail('Exception expected for importer plugin exception')
except exceptions.PulpExecutionException:
pass
finally:
mock_plugins.MOCK_IMPORTER.importer_added.side_effect = None
RepoImporter.get_collection().remove()
开发者ID:jeremycline,项目名称:pulp,代码行数:15,代码来源:test_importer.py
示例17: test_sync_bad_database
def test_sync_bad_database(self):
"""
Tests the case where the database got itself in a bad state where the
repo thinks it has an importer but the importer-repo relationship doc
doesn't exist in the database.
"""
# Setup
self.repo_manager.create_repo('good-repo')
self.importer_manager.set_importer('good-repo', 'mock-importer', None)
RepoImporter.get_collection().remove()
self.assertRaises(repo_sync_manager.PulpExecutionException, self.sync_manager.sync,
'good-repo')
开发者ID:beav,项目名称:pulp,代码行数:15,代码来源:test_sync.py
示例18: remove_importer
def remove_importer(repo_id):
"""
Removes an importer from a repository.
:param repo_id: identifies the repo
:type repo_id: str
:raise MissingResource: if the given repo does not exist
:raise MissingResource: if the given repo does not have an importer
"""
importer_coll = RepoImporter.get_collection()
# Validation
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
repo_importer = importer_coll.find_one({"repo_id": repo_id})
if repo_importer is None:
raise MissingResource(repo_id)
# remove schedules
RepoSyncScheduleManager().delete_by_importer_id(repo_id, repo_importer["id"])
# Call the importer's cleanup method
importer_type_id = repo_importer["importer_type_id"]
importer_instance, plugin_config = plugin_api.get_importer_by_id(importer_type_id)
call_config = PluginCallConfiguration(plugin_config, repo_importer["config"])
transfer_repo = repo_obj.to_transfer_repo()
importer_instance.importer_removed(transfer_repo, call_config)
# Update the database to reflect the removal
importer_coll.remove({"repo_id": repo_id})
开发者ID:nthien,项目名称:pulp,代码行数:35,代码来源:importer.py
示例19: test_update_importer_config
def test_update_importer_config(self, m_serializer, mock_repo_qs):
"""
Tests the successful case of updating an importer's configuration.
"""
orig_config = {'key1': 'initial1', 'key2': 'initial2', 'key3': 'initial3'}
self.importer_manager.set_importer('winterhold', 'mock-importer', orig_config)
config_delta = {'key1': 'updated1', 'key2': None}
self.importer_manager.update_importer_config('winterhold', config_delta)
expected_config = {'key1': 'updated1', 'key3': 'initial3'}
set_config = m_serializer.mock_calls[0][1][0]['config']
self.assertDictEqual(set_config, expected_config)
# Database
importer = RepoImporter.get_collection().find_one(
{'repo_id': 'winterhold', 'id': 'mock-importer'})
self.assertEqual(importer['config'], expected_config)
# Plugin
# initial and update
self.assertEqual(2, mock_plugins.MOCK_IMPORTER.validate_config.call_count)
# returns args from last call
self.assertEqual(
expected_config,
mock_plugins.MOCK_IMPORTER.validate_config.call_args[0][1].repo_plugin_config)
开发者ID:jeremycline,项目名称:pulp,代码行数:25,代码来源:test_importer.py
示例20: repositories_with_yum_importers
def repositories_with_yum_importers():
repo_importer_collection = RepoImporter.get_collection()
repo_yum_importers = repo_importer_collection.find({'importer_type_id': _TYPE_YUM_IMPORTER}, fields=['repo_id'])
yum_repo_ids = [i['repo_id'] for i in repo_yum_importers]
repo_collection = Repo.get_collection()
yum_repos = repo_collection.find({'id': {'$in': yum_repo_ids}}, fields=['id', 'scratchpad'])
return list(yum_repos)
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:7,代码来源:0007_inventoried_custom_metadata.py
注:本文中的pulp.server.db.model.repository.RepoImporter类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论