本文整理汇总了Python中pulp_rpm.plugins.distributors.export_distributor.distributor.ISODistributor类的典型用法代码示例。如果您正苦于以下问题：Python ISODistributor类的具体用法？Python ISODistributor怎么用？Python ISODistributor使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ISODistributor类的12个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_publish_repo
def test_publish_repo(self, mock_validate, export_publisher, mock_ensure_downloaded):
    """
    Check that publish_repo hands back the value produced by the export publisher's
    process_lifecycle call.

    The mock arguments are presumably injected by mock.patch decorators that are not
    visible in this snippet; mock_ensure_downloaded is accepted but never inspected.
    """
    mock_validate.return_value = (True, None)
    distributor = ISODistributor()
    export_publisher.return_value = mock.Mock()
    export_publisher.return_value.process_lifecycle.return_value = 'foo'
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual('foo', distributor.publish_repo(self.repo, self.conduit, self.config))
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:8,代码来源:test_distributor.py
示例2: test_publish_repo
def test_publish_repo(self, mock_validate, export_publisher):
    """
    Check that publish_repo hands back the value produced by the export publisher's
    publish call.

    The mock arguments are presumably injected by mock.patch decorators that are not
    visible in this snippet.
    """
    mock_validate.return_value = (True, None)
    distributor = ISODistributor()
    export_publisher.return_value = mock.Mock()
    export_publisher.return_value.publish.return_value = 'foo'
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual('foo', distributor.publish_repo(self.repo, self.conduit, self.config))
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:8,代码来源:test_distributor.py
示例3: test_cancel_publish_repo
def test_cancel_publish_repo(self):
    """
    Verify that cancel_publish_repo triggers cancel on the distributor's publisher.

    Note carried over from the original: cancel_publish_repo is not currently fully
    supported.
    """
    iso_distributor = ISODistributor()
    # Stand-in publisher so the cancel call can be observed.
    iso_distributor._publisher = mock.Mock()

    iso_distributor.cancel_publish_repo()

    cancel_was_called = iso_distributor._publisher.cancel.called
    self.assertTrue(cancel_was_called)
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:10,代码来源:test_distributor.py
示例4: test_cancel_publish_repo
def test_cancel_publish_repo(self, mock_cancel_createrepo):
    """
    Verify that cancel_publish_repo passes the distributor's working directory to the
    (mocked) createrepo cancellation helper exactly once.

    Note carried over from the original: cancel_publish_repo is not currently fully
    supported.
    """
    iso_distributor = ISODistributor()
    iso_distributor.working_dir = '/working/dir'

    iso_distributor.cancel_publish_repo()

    expected_dir = iso_distributor.working_dir
    mock_cancel_createrepo.assert_called_once_with(expected_dir)
开发者ID:preethit,项目名称:pulp_rpm,代码行数:10,代码来源:test_distributor.py
示例5: test_set_progress
def test_set_progress(self):
    """
    Verify that set_progress invokes a non-None progress callback exactly once with
    the id and status it was given.
    """
    # Setup
    progress_callback = mock.Mock()
    iso_distributor = ISODistributor()

    # Test
    iso_distributor.set_progress('id', 'status', progress_callback)
    progress_callback.assert_called_once_with('id', 'status')
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:11,代码来源:test_distributor.py
示例6: TestPublishIsos
class TestPublishIsos(unittest.TestCase):
    """
    Exercises ISODistributor._publish_isos.

    The tests check which directories are removed with shutil.rmtree and which
    arguments are forwarded to export_utils.publish_isos, which is replaced by a mock
    for the duration of each test.
    """

    def setUp(self):
        """Create the distributor and its arguments, then mock export_utils.publish_isos."""
        self.distributor = ISODistributor()
        self.repo = Repository('repo_id', working_dir='/working/dir')
        self.config = {
            PUBLISH_HTTP_KEYWORD: True,
            PUBLISH_HTTPS_KEYWORD: True,
        }
        # Keep the real function so tearDown can put it back.
        self.publish_iso = export_utils.publish_isos
        export_utils.publish_isos = mock.Mock()

    def tearDown(self):
        """Restore the real export_utils.publish_isos."""
        export_utils.publish_isos = self.publish_iso

    @mock.patch('shutil.rmtree', autospec=True)
    def test_publish_isos(self, mock_rmtree):
        """
        With http and https publishing enabled, both publish directories are removed
        and then passed on to publish_isos.
        """
        # Setup
        expected_http_dir = os.path.join(EXPORT_HTTP_DIR, self.repo.id)
        expected_https_dir = os.path.join(EXPORT_HTTPS_DIR, self.repo.id)
        call_config = PluginCallConfiguration({}, self.config)

        # Test
        self.distributor._publish_isos(self.repo, call_config)

        self.assertEqual(2, mock_rmtree.call_count)
        # The http directory must be removed first, then the https directory.
        expected_removals = (expected_http_dir, expected_https_dir)
        for expected_dir, rmtree_call in zip(expected_removals, mock_rmtree.call_args_list):
            self.assertEqual(expected_dir, rmtree_call[0][0])
        export_utils.publish_isos.assert_called_once_with(
            self.repo.working_dir, self.repo.id, expected_http_dir, expected_https_dir,
            None, None)

    @mock.patch('shutil.rmtree', autospec=True)
    def test_publish_http_https_false(self, mock_rmtree):
        """
        With http and https publishing disabled, both publish directories are still
        removed, but publish_isos receives None for the http and https directories.
        """
        # Setup
        self.config[PUBLISH_HTTPS_KEYWORD] = False
        self.config[PUBLISH_HTTP_KEYWORD] = False
        expected_http_dir = os.path.join(EXPORT_HTTP_DIR, self.repo.id)
        expected_https_dir = os.path.join(EXPORT_HTTPS_DIR, self.repo.id)
        call_config = PluginCallConfiguration({}, self.config)

        # Test
        self.distributor._publish_isos(self.repo, call_config)

        self.assertEqual(2, mock_rmtree.call_count)
        expected_removals = (expected_http_dir, expected_https_dir)
        for expected_dir, rmtree_call in zip(expected_removals, mock_rmtree.call_args_list):
            self.assertEqual(expected_dir, rmtree_call[0][0])
        export_utils.publish_isos.assert_called_once_with(
            self.repo.working_dir, self.repo.id, None, None, None, None)
开发者ID:preethit,项目名称:pulp_rpm,代码行数:54,代码来源:test_distributor.py
示例7: test_set_progress_no_callback
def test_set_progress_no_callback(self):
    """
    Assert that set_progress does not attempt to call the callback when it is None
    """
    # Setup
    distributor = ISODistributor()
    # Test
    try:
        distributor.set_progress('id', 'status', None)
    except (TypeError, AttributeError):
        # Calling None raises TypeError ("'NoneType' object is not callable"), so that is
        # the exception to turn into a failure. AttributeError stays in the tuple so the
        # case the test originally guarded is still reported the same way.
        self.fail('set_progress should not try to call None')
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:12,代码来源:test_distributor.py
示例8: setUp
def setUp(self):
    """
    Create the distributor, repository and config used by the tests, and swap
    export_utils.publish_isos for a mock. The real function is kept on
    self.publish_iso, presumably so a tearDown outside this snippet can restore it.
    """
    self.distributor = ISODistributor()
    self.repo = Repository('repo_id', working_dir='/working/dir')
    # Both publishing protocols start out enabled.
    self.config = dict.fromkeys((PUBLISH_HTTP_KEYWORD, PUBLISH_HTTPS_KEYWORD), True)
    original_publish_isos = export_utils.publish_isos
    self.publish_iso = original_publish_isos
    export_utils.publish_isos = mock.Mock()
开发者ID:preethit,项目名称:pulp_rpm,代码行数:7,代码来源:test_distributor.py
示例9: test_validate_config
def test_validate_config(self):
    """
    Test the validate_config method in ISODistributor, which just hands the config off to a helper
    method in export_utils
    """
    # patch.object restores the real validator when the block exits, even if an assertion
    # fails, so the mock cannot leak into other tests.
    with mock.patch.object(export_utils, 'validate_export_config',
                           mock.MagicMock()) as mock_validator:
        distributor = ISODistributor()
        # Test. All validate_config should do is hand the config argument to the
        # export_utils validator
        distributor.validate_config(None, 'config', None)
        mock_validator.assert_called_once_with('config')
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:16,代码来源:test_export_distributor.py
示例10: test_metadata
def test_metadata(self):
    """
    Test the overridden metadata method in ISODistributor: the reported id must be the
    export distributor type id and the supported types must match the expected set
    (order is not significant).
    """
    metadata = ISODistributor.metadata()
    expected_types = [TYPE_ID_RPM, TYPE_ID_SRPM, TYPE_ID_DRPM, TYPE_ID_ERRATA,
                      TYPE_ID_DISTRO, TYPE_ID_PKG_CATEGORY, TYPE_ID_PKG_GROUP]
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(metadata['id'], TYPE_ID_DISTRIBUTOR_EXPORT)
    self.assertEqual(set(expected_types), set(metadata['types']))
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:9,代码来源:test_distributor.py
示例11: test_distributor_removed
def test_distributor_removed(self, mock_configuration):
    """
    Verify that distributor_removed deletes the repo's http and https export
    directories and the master publish directory.

    All directories are created under self.working_dir, and the mocked configuration
    is pointed at them before the distributor is invoked.
    """
    master_dir = os.path.join(self.working_dir, 'master')
    http_root = os.path.join(self.working_dir, 'http')
    https_root = os.path.join(self.working_dir, 'https')
    repo_working_dir = os.path.join(self.working_dir, 'repodir')

    self.repo.working_dir = repo_working_dir
    mock_configuration.get_master_publish_dir.return_value = master_dir
    mock_configuration.HTTP_EXPORT_DIR = http_root
    mock_configuration.HTTPS_EXPORT_DIR = https_root

    # Per-repo export directories live below the protocol roots.
    repo_http_dir = os.path.join(http_root, self.repo.id)
    repo_https_dir = os.path.join(https_root, self.repo.id)
    for directory in (repo_http_dir, repo_https_dir, master_dir, repo_working_dir):
        os.makedirs(directory)

    iso_distributor = ISODistributor()
    iso_distributor.distributor_removed(self.repo, self.config)

    for removed_dir in (repo_http_dir, repo_https_dir, master_dir):
        self.assertFalse(os.path.exists(removed_dir))
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:22,代码来源:test_distributor.py
示例12: TestPublishRepo
class TestPublishRepo(unittest.TestCase):
    """
    Tests publish_repo in ISODistributor
    """

    def setUp(self):
        """
        Build the distributor and its call arguments, then replace the export_utils
        helpers, util.generate_listing_files, shutil.rmtree and os.makedirs with mocks.

        Each replacement is registered with addCleanup, so the real objects are restored
        after every test and also when setUp itself fails part-way through (tearDown is
        not run in that case).
        """
        self.config_dict = {
            PUBLISH_HTTP_KEYWORD: False,
            PUBLISH_HTTPS_KEYWORD: True
        }

        # Set up the distributor
        self.distributor = ISODistributor()
        self.distributor._publish_isos = mock.Mock(spec=ISODistributor._publish_isos)

        # Arguments for the distributor
        self.repo = Repository(id='repo-id', working_dir='/working/dir')
        self.mock_conduit = mock.Mock(spec=RepoPublishConduit)
        self.config = PluginCallConfiguration({}, self.config_dict)

        # The replacement mocks are all built before anything is patched, so every
        # spec= below is taken from the real function.
        replacements = (
            (export_utils, 'cleanup_working_dir',
             mock.Mock(spec=export_utils.cleanup_working_dir)),
            (export_utils, 'validate_export_config', mock.Mock(return_value=(True, None))),
            (export_utils, 'export_complete_repo',
             mock.Mock(return_value=({}, {'errors': []}))),
            (export_utils, 'export_incremental_content',
             mock.Mock(return_value=({}, {'errors': ()}))),
            (export_utils, 'retrieve_repo_config',
             mock.Mock(return_value=('/working/dir/repo', None))),
            (util, 'generate_listing_files', mock.Mock()),
            (shutil, 'rmtree', mock.Mock(spec=shutil.rmtree)),
            (os, 'makedirs', mock.Mock(spec=os.makedirs)),
        )
        for owner, attribute, replacement in replacements:
            self._patch(owner, attribute, replacement)

    def _patch(self, owner, attribute, replacement):
        """Replace owner.attribute with replacement until the current test finishes."""
        patcher = mock.patch.object(owner, attribute, replacement)
        patcher.start()
        self.addCleanup(patcher.stop)

    def test_failed_override_config(self):
        """
        Tests that when invalid override configuration is given, an exception is raised.
        """
        # Setup
        export_utils.validate_export_config.return_value = (False, 'failed validation')

        # Test
        self.assertRaises(PulpDataException, self.distributor.publish_repo, self.repo,
                          self.mock_conduit, self.config)

    def test_working_dir_cleanup(self):
        """
        Check that the working directory is cleaned before use. This is done because the ISOs are
        currently stored there
        """
        self.distributor.publish_repo(self.repo, self.mock_conduit, self.config)
        shutil.rmtree.assert_called_once_with(self.repo.working_dir, ignore_errors=True)
        os.makedirs.assert_called_once_with(self.repo.working_dir)

    def test_export_with_export_dir(self):
        """
        Test that _publish_isos isn't called when there is an export directory in the config, and
        that a success report is still built.
        """
        # Set the config to have an export directory
        self.config_dict[EXPORT_DIRECTORY_KEYWORD] = '/my/export/dir'
        config = PluginCallConfiguration({}, self.config_dict)

        # Test
        self.distributor.publish_repo(self.repo, self.mock_conduit, config)
        self.assertEqual(0, self.distributor._publish_isos.call_count)
        self.assertEqual(1, self.mock_conduit.build_success_report.call_count)

    def test_export_iso_publish(self):
        """
        Test that _publish_isos gets called with the repo and config when an export dir isn't in
        the config
        """
        self.distributor.publish_repo(self.repo, self.mock_conduit, self.config)
        self.assertEqual(1, self.distributor._publish_isos.call_count)
        self.assertEqual(self.repo, self.distributor._publish_isos.call_args[0][0])
        self.assertEqual(self.config, self.distributor._publish_isos.call_args[0][1])
        self.assertEqual(1, self.mock_conduit.build_success_report.call_count)

    def test_export_complete_repo(self):
        """
        Test that when a date filter doesn't exist, export_complete_repo is called
        """
        self.distributor.publish_repo(self.repo, self.mock_conduit, self.config)
        self.assertEqual(1, export_utils.export_complete_repo.call_count)
        self.assertEqual('repo-id', export_utils.export_complete_repo.call_args[0][0])
        self.assertEqual('/working/dir/repo', export_utils.export_complete_repo.call_args[0][1])

    # ......... part of the code is omitted here .........
开发者ID:preethit,项目名称:pulp_rpm,代码行数:101,代码来源:test_distributor.py
注：本文中的pulp_rpm.plugins.distributors.export_distributor.distributor.ISODistributor类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论