本文整理汇总了Python中model_factory.rpm_models函数的典型用法代码示例。如果您正苦于以下问题：Python rpm_models函数的具体用法？Python rpm_models怎么用？Python rpm_models使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了rpm_models函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """
    Build the RPM, SRPM and DRPM fixture lists used by the tests.

    Each list holds the three models produced by calling the factory with
    (3, True), followed by the two produced by calling it with (2, False).
    """
    super(TestRemoveOldVersions, self).setUp()

    # NOTE(review): the boolean presumably controls whether the generated
    # models share a name -- confirm against model_factory.
    fixture_factories = (
        ('rpms', model_factory.rpm_models),
        ('srpms', model_factory.srpm_models),
        ('drpms', model_factory.drpm_models),
    )
    for attribute, factory in fixture_factories:
        fixtures = factory(3, True)
        fixtures.extend(factory(2, False))
        setattr(self, attribute, fixtures)
开发者ID:goosemania,项目名称:pulp_rpm,代码行数:8,代码来源:test_purge.py
示例2: test_rpms_check_all_and_associate_negative
def test_rpms_check_all_and_associate_negative(self, mock_isfile, mock_save, mock_search_all_units):
    """
    With the unit search mocked to return an empty list,
    check_all_and_associate should yield three results for three inputs.
    """
    mock_isfile.return_value = True
    mock_search_all_units.return_value = []

    wanted = set(model.as_named_tuple for model in model_factory.rpm_models(3))

    remaining = list(check_all_and_associate(wanted, self.conduit))
    self.assertEqual(len(remaining), 3)
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:7,代码来源:test_sync.py
示例3: test_associate_already_downloaded_units_negative
def test_associate_already_downloaded_units_negative(self, mock_save, mock_search_all_units):
    """
    With the unit search mocked to return an empty list,
    associate_already_downloaded_units should yield three results for the
    three RPM models passed in.
    """
    mock_search_all_units.return_value = []

    candidates = model_factory.rpm_models(3)
    for candidate in candidates:
        candidate.metadata['relativepath'] = 'test-relative-path'

    leftover = associate_already_downloaded_units(models.RPM.TYPE, candidates, self.conduit)
    self.assertEqual(len(list(leftover)), 3)
开发者ID:beav,项目名称:pulp_rpm,代码行数:7,代码来源:test_sync.py
示例4: test_rpms_to_download
def test_rpms_to_download(self, mock_package_list_generator, mock_create_downloader):
    """
    test with only RPMs specified to download

    Three RPM models are offered by the mocked package list generator, but
    only the first two are requested; the download requests handed to the
    mocked downloader are then checked one by one.
    """
    file_handle = StringIO()
    self.metadata_files.get_metadata_file_handle = mock.MagicMock(
        spec_set=self.metadata_files.get_metadata_file_handle,
        side_effect=[file_handle, None],  # None means it will skip DRPMs
    )

    rpms = model_factory.rpm_models(3)
    for rpm in rpms:
        rpm.metadata['relativepath'] = self.RELATIVEPATH
    mock_package_list_generator.return_value = rpms

    self.downloader.download = mock.MagicMock(spec_set=self.downloader.download)
    mock_create_downloader.return_value = self.downloader

    # call download, passing in only two of the 3 rpms as units we want.
    # The returned report is not inspected by this test, so it is not kept.
    wanted = set(m.as_named_tuple for m in rpms[:2])
    self.reposync.download(self.metadata_files, wanted, set())

    # make sure we skipped DRPMs
    self.assertEqual(self.downloader.download.call_count, 1)
    self.assertEqual(mock_package_list_generator.call_count, 1)

    # verify that the download requests were correct
    requests = list(self.downloader.download.call_args[0][0])
    self.assertEqual(len(requests), 2)

    # every model was given the same relative path above, so both requests
    # share one expected URL and one expected destination
    expected_url = os.path.join(self.url, self.RELATIVEPATH)
    expected_destination = os.path.join(self.reposync.tmp_dir, self.RELATIVEPATH)
    for request, rpm in zip(requests, rpms[:2]):
        self.assertEqual(request.url, expected_url)
        self.assertEqual(request.destination, expected_destination)
        # identity, not equality: the request must carry the very same model
        self.assertTrue(request.data is rpm)

    self.assertTrue(file_handle.closed)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:33,代码来源:test_sync.py
示例5: test_keep_two
def test_keep_two(self):
    """
    With the retain-old-count override set to 1, the first of the five
    units must be absent from the wanted versions and the other four
    present, each mapped to the size stored in its metadata.
    """
    self.config.override_config[importer_constants.KEY_UNITS_RETAIN_OLD_COUNT] = 1

    units = model_factory.rpm_models(3, True)
    units.extend(model_factory.rpm_models(2))
    for unit in units:
        unit.metadata['size'] = 1024

    # the generator can yield results out of their original order, which is ok
    wanted = self.reposync._identify_wanted_versions(units)

    self.assertFalse(units[0].as_named_tuple in wanted)
    for index in range(1, 5):
        self.assertTrue(units[index].as_named_tuple in wanted)

    for reported_size in wanted.values():
        self.assertEqual(reported_size, 1024)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:17,代码来源:test_sync.py
示例6: test_keep_all
def test_keep_all(self):
    """
    With the retain-old-count override set to None, the sorted keys of the
    wanted versions must equal the named tuples of all three units.
    """
    self.config.override_config[importer_constants.KEY_UNITS_RETAIN_OLD_COUNT] = None

    units = model_factory.rpm_models(3)
    for unit in units:
        unit.metadata['size'] = 1024

    wanted = self.reposync._identify_wanted_versions(units)

    expected_keys = [unit.as_named_tuple for unit in units]
    self.assertEqual(expected_keys, sorted(wanted.keys()))
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:9,代码来源:test_sync.py
示例7: test_with_to_download
def test_with_to_download(self):
    """
    Only the units whose named tuples are in the to-download set should
    come out of the filtered unit generator.
    """
    units = model_factory.rpm_models(3)

    # specify which we want
    wanted_units = units[:2]
    to_download = set(unit.as_named_tuple for unit in wanted_units)

    filtered = list(self.reposync._filtered_unit_generator(units, to_download))

    # make sure we only got the ones we want
    self.assertEqual(filtered, wanted_units)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:9,代码来源:test_sync.py
示例8: test_no_epoch
def test_no_epoch(self):
    """
    A unit whose epoch is None should produce a cleaned unit key dict that
    has no 'epoch' entry.
    """
    rpm = model_factory.rpm_models(1)[0]
    # simulates repos that don't have epochs in their errata
    rpm.epoch = None

    cleaned = associate._no_checksum_clean_unit_key(rpm.as_named_tuple)

    self.assertTrue(isinstance(cleaned, dict))
    self.assertFalse('epoch' in cleaned)
开发者ID:hjensas,项目名称:pulp_rpm,代码行数:9,代码来源:test_associate.py
示例9: test_rpm
def test_rpm(self, mock_copyfile):
    """
    Associating an RPM unit should hand back the same unit object and
    never invoke the mocked copyfile.
    """
    rpm_model = model_factory.rpm_models(1)[0]
    rpm_unit = Unit(rpm_model.TYPE, rpm_model.unit_key, rpm_model.metadata, '/')

    # passing "None" ensures that the importer isn't being called
    returned = associate._associate_unit('', None, rpm_unit)

    self.assertTrue(returned is rpm_unit)
    self.assertEqual(mock_copyfile.call_count, 0)
开发者ID:hjensas,项目名称:pulp_rpm,代码行数:9,代码来源:test_associate.py
示例10: test_all
def test_all(self):
    """
    The cleaned unit key must be a dict without the checksum fields whose
    remaining fields match the model's unit key.
    """
    rpm = model_factory.rpm_models(1)[0]

    cleaned = associate._no_checksum_clean_unit_key(rpm.as_named_tuple)

    self.assertTrue(isinstance(cleaned, dict))
    for removed_field in ('checksum', 'checksumtype'):
        self.assertTrue(removed_field not in cleaned)
    for kept_field in ('name', 'epoch', 'version', 'release', 'arch'):
        self.assertEqual(cleaned[kept_field], rpm.unit_key[kept_field])
开发者ID:hjensas,项目名称:pulp_rpm,代码行数:10,代码来源:test_associate.py
示例11: test_some_existing
def test_some_existing(self):
    """
    When units named 'postfix' and 'vim-common' are reported as existing in
    the destination, only 'python-mock' should be returned for copying.

    Also checks the criteria object passed to get_destination_units.
    """
    existing = []
    for package_name in ('postfix', 'vim-common'):
        package = model_factory.rpm_models(1)[0]
        package.name = package_name
        existing.append(Unit(package.TYPE, package.unit_key, package.metadata, ''))

    conduit = ImportUnitConduit(*([''] * 6))
    conduit.get_destination_units = mock.MagicMock(
        spec_set=conduit.get_destination_units,
        return_value=existing,
    )

    to_copy = associate.get_rpms_to_copy_by_name(self.RPM_NAMES, conduit)

    self.assertEqual(set(to_copy), set(['python-mock']))
    self.assertEqual(conduit.get_destination_units.call_count, 1)

    # first positional argument of the single recorded call
    criteria = conduit.get_destination_units.call_args[0][0]
    self.assertTrue(isinstance(criteria, UnitAssociationCriteria))
    self.assertEqual(criteria.type_ids, [models.RPM.TYPE])
    self.assertEqual(criteria.unit_fields, models.RPM.UNIT_KEY_NAMES)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:20,代码来源:test_associate.py
示例12: test_rpms_check_all_and_associate_positive
def test_rpms_check_all_and_associate_positive(self, mock_isfile, mock_save, mock_search_all_units):
    """
    When the unit search returns every requested unit, nothing should be
    left over from check_all_and_associate, and each unit passed to the
    mocked save must carry the existing storage path.
    """
    units = model_factory.rpm_models(3)
    mock_search_all_units.return_value = units
    mock_isfile.return_value = True

    input_units = set(unit.as_named_tuple for unit in units)
    for unit in units:
        unit.metadata['filename'] = 'test-filename'
        unit.storage_path = "existing_storage_path"

    result = check_all_and_associate(input_units, self.conduit)
    self.assertEqual(len(list(result)), 0)

    # verify we are saving the storage path
    # NOTE(review): this loop passes vacuously if mock_save was never
    # called -- consider also asserting on mock_save.call_count.
    for save_call in mock_save.mock_calls:
        # save_call[1] holds the positional args: (conduit, unit)
        (_conduit, saved_unit) = save_call[1]
        self.assertEqual(saved_unit.storage_path, "existing_storage_path")
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:14,代码来源:test_sync.py
示例13: test_remove_missing_units
def test_remove_missing_units(self, mock_get_existing):
    """
    Of two existing units, only the one that is missing from the remote
    named tuples should be passed to the conduit's remove_unit.
    """
    self.conduit.remove_unit = mock.MagicMock(spec_set=self.conduit.remove_unit)

    # setup such that only one of the 2 existing units appears to be present
    # in the remote repo, thus the other unit should be purged
    existing_units = model_factory.rpm_units(2)
    mock_get_existing.return_value = existing_units

    remote_named_tuples = set(
        model.as_named_tuple for model in model_factory.rpm_models(2)
    )
    shared_unit = existing_units[1]
    remote_named_tuples.add(models.RPM.NAMEDTUPLE(**shared_unit.unit_key))

    purge.remove_missing_units(self.conduit, models.RPM, remote_named_tuples)

    mock_get_existing.assert_called_once_with(models.RPM, self.conduit.get_units)
    self.conduit.remove_unit.assert_called_once_with(existing_units[0])
开发者ID:msutter,项目名称:pulp_rpm,代码行数:14,代码来源:test_purge.py
示例14: test_rpm
def test_rpm(self, mock_open, mock_package_list_generator):
    """
    get_remote_units should open the metadata file, pass the handle on to
    the package list generator, close it, and return a collection that
    contains the named tuple of every generated model.
    """
    rpms = model_factory.rpm_models(2)
    mock_package_list_generator.return_value = rpms
    fake_file = StringIO()
    mock_open.return_value = fake_file

    # a named function instead of a lambda bound to a name (PEP 8)
    def process_func(x):
        return x

    ret = purge.get_remote_units(self.metadata_files, self.FAKE_TYPE, 'bar', process_func)

    mock_open.assert_called_once_with('/a/b/c', 'r')
    self.assertTrue(fake_file.closed)
    mock_package_list_generator.assert_called_once_with(fake_file, 'bar', process_func)
    self.assertEqual(len(rpms), len(ret))
    for model in rpms:
        self.assertTrue(model.as_named_tuple in ret)
开发者ID:bechtoldt,项目名称:pulp_rpm,代码行数:15,代码来源:test_purge.py
示例15: test_with_existing_deps
def test_with_existing_deps(self, mock_find, mock_get_existing):
    """
    Copy one RPM unit while the mocked dependency lookup reports two
    dependencies that the mocked existing-units lookup already returns.

    Exactly one associate_unit call is expected on the conduit.
    """
    conduit = mock.MagicMock()
    rpms = model_factory.rpm_units(1)

    dependency_units = []
    dependency_keys = []
    for dependency in model_factory.rpm_models(2):
        dependency_units.append(
            Unit(dependency.TYPE, dependency.unit_key, dependency.metadata, ''))
        dependency_keys.append(dependency.as_named_tuple)
    mock_find.return_value = dependency_keys
    mock_get_existing.return_value = dependency_units

    associate.copy_rpms(rpms, conduit, True)

    self.assertEqual(conduit.associate_unit.call_count, 1)
    self.assertEqual(mock_find.call_count, 1)
    self.assertEqual(mock_find.call_args[0][0], set(rpms))
    # called once directly, and once from filter_available_rpms
    self.assertEqual(mock_get_existing.call_count, 2)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:15,代码来源:test_associate.py
示例16: test_calls_identify_wanted_and_existing
def test_calls_identify_wanted_and_existing(self, mock_check_repo, mock_identify,
                                            mock_generator, mock_open):
    """
    _decide_rpms_to_download should open the primary metadata file, feed it
    to the package generator, pass the generated units to the mocked
    identify step, and return (wanted set, count, total size).
    """
    primary_file = StringIO()
    mock_open.return_value = primary_file
    self.metadata_files.metadata[primary.METADATA_FILE_NAME] = {'local_path': '/path/to/primary'}

    unit_key = model_factory.rpm_models(1)[0].as_named_tuple
    mock_generator.return_value = [unit_key]
    mock_identify.return_value = {unit_key: 1024}
    mock_check_repo.return_value = set([unit_key])

    decision = self.reposync._decide_rpms_to_download(self.metadata_files)

    self.assertEqual(decision, (set([unit_key]), 1, 1024))
    mock_open.assert_called_once_with('/path/to/primary', 'r')
    mock_generator.assert_called_once_with(
        primary_file, primary.PACKAGE_TAG, primary.process_package_element)
    mock_identify.assert_called_once_with(mock_generator.return_value)
    self.assertTrue(primary_file.closed)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:17,代码来源:test_sync.py
示例17: test_none_to_download
def test_none_to_download(self, mock_package_list_generator):
    """
    make sure it does nothing if there are no units specified to download
    """
    self.metadata_files.get_metadata_file_handle = mock.MagicMock(
        spec_set=self.metadata_files.get_metadata_file_handle,
        side_effect=StringIO,
    )
    # first generator call yields the RPM models, the second the DRPM models
    rpm_batch = model_factory.rpm_models(3)
    drpm_batch = model_factory.drpm_models(3)
    mock_package_list_generator.side_effect = iter([rpm_batch, drpm_batch])

    report = self.reposync.download(self.metadata_files, set(), set())

    self.assertTrue(report.success_flag)
    for counter_name in ('added_count', 'removed_count', 'updated_count'):
        self.assertEqual(getattr(report, counter_name), 0)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:17,代码来源:test_sync.py
示例18: test_rpm
def test_rpm(self, mock_open, mock_package_list_generator):
    """
    get_remote_units, given a file-handle callable, should open the
    metadata file, pass the handle to the package list generator, close
    it, and return a collection containing every model's named tuple.
    """
    generated = model_factory.rpm_models(2)
    mock_package_list_generator.return_value = generated
    fake_file = StringIO()
    mock_open.return_value = fake_file

    def identity(element):
        return element

    open_metadata = functools.partial(
        self.metadata_files.get_metadata_file_handle, self.FAKE_TYPE)

    remote_units = purge.get_remote_units(open_metadata, "bar", identity)

    mock_open.assert_called_once_with("/a/b/c", "r")
    self.assertTrue(fake_file.closed)
    mock_package_list_generator.assert_called_once_with(fake_file, "bar", identity)
    self.assertEqual(len(generated), len(remote_units))
    for rpm_model in generated:
        self.assertTrue(rpm_model.as_named_tuple in remote_units)
开发者ID:hjensas,项目名称:pulp_rpm,代码行数:18,代码来源:test_purge.py
示例19: test_with_recursive_deps
def test_with_recursive_deps(self, mock_find, mock_get_existing, mock_filter):
    """
    Test getting dependencies that do not exist in the repository already

    The set returned by copy_rpms must equal the union of the primary RPM
    units and the dependency units.
    """
    conduit = mock.MagicMock()
    # Create the primary RPMS that we want to copy
    rpms = model_factory.rpm_units(1)
    # Create the recursive dependencies that we want to copy
    deps = model_factory.rpm_models(2)
    dep_units = [Unit(model.TYPE, model.unit_key, model.metadata, '') for model in deps]

    # the first call to filter gets all the dependencies for the parent repository
    # The second time it is called during the recursion and all the units have already
    # been copied.
    mock_filter.side_effect = iter([dep_units, []])
    # The get existing units always assumes there are no units in the target repository
    mock_get_existing.return_value = []

    unit_set = associate.copy_rpms(rpms, conduit, True)

    merged_set = set(dep_units)
    merged_set.update(rpms)
    # assertEqual rather than the deprecated assertEquals alias
    self.assertEqual(unit_set, merged_set)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:23,代码来源:test_associate.py
示例20: test_all
def test_all(self, mock_decide_drpms, mock_decide_rpms):
    """
    _decide_what_to_download should return the RPM and DRPM sets chosen by
    the two mocked decide helpers and report the combined totals through a
    single set_progress call.
    """
    rpm_key = model_factory.rpm_models(1)[0].as_named_tuple
    drpm_key = model_factory.drpm_models(1)[0].as_named_tuple
    # each helper returns (wanted set, item count, total size)
    mock_decide_rpms.return_value = (set([rpm_key]), 1, 1024)
    mock_decide_drpms.return_value = (set([drpm_key]), 1, 1024)
    self.conduit.set_progress = mock.MagicMock(spec_set=self.conduit.set_progress)

    decided = self.reposync._decide_what_to_download(self.metadata_files)

    self.assertEqual(decided[0], set([rpm_key]))
    self.assertEqual(decided[1], set([drpm_key]))
    self.assertEqual(len(decided), 2)
    mock_decide_rpms.assert_called_once_with(self.metadata_files)
    mock_decide_drpms.assert_called_once_with(self.metadata_files)
    self.assertEqual(self.conduit.set_progress.call_count, 1)

    # make sure we reported initial progress values correctly
    progress = self.conduit.set_progress.call_args[0][0]
    self.assertEqual(progress['content']['size_total'], 2048)
    self.assertEqual(progress['content']['size_left'], 2048)
    self.assertEqual(progress['content']['items_total'], 2)
    self.assertEqual(progress['content']['items_left'], 2)
    self.assertEqual(progress['content']['details']['rpm_total'], 1)
    self.assertEqual(progress['content']['details']['drpm_total'], 1)
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:24,代码来源:test_sync.py
注：本文中的model_factory.rpm_models函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论