本文整理汇总了Python中pulp.plugins.util.publish_step.PublishStep类的典型用法代码示例。如果您正苦于以下问题:Python PublishStep类的具体用法?Python PublishStep怎么用?Python PublishStep使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了PublishStep类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_process_lifecycle_reports_on_error
def test_process_lifecycle_reports_on_error(self):
step = PublishStep('parent')
step.process = Mock(side_effect=Exception('Foo'))
step.report_progress = Mock()
self.assertRaises(Exception, step.process_lifecycle)
step.report_progress.assert_called_once_with(force=True)
开发者ID:aweiteka,项目名称:pulp,代码行数:8,代码来源:test_publish_step.py
示例2: test_create_symlink_no_link_parent
def test_create_symlink_no_link_parent(self):
source_path = os.path.join(self.working_dir, 'source')
link_path = os.path.join(self.published_dir, 'foo/bar/baz/link')
touch(source_path)
self.assertFalse(os.path.exists(os.path.dirname(link_path)))
PublishStep._create_symlink(source_path, link_path)
self.assertTrue(os.path.exists(link_path))
开发者ID:aweiteka,项目名称:pulp,代码行数:10,代码来源:test_publish_step.py
示例3: test_get_progress_report_description
def test_get_progress_report_description(self):
step = PublishStep('bar_step')
step.description = 'bar'
step.progress_details = 'baz'
step.error_details = "foo"
step.state = reporting_constants.STATE_COMPLETE
step.total_units = 2
step.progress_successes = 1
step.progress_failures = 1
report = step.get_progress_report()
target_report = {
reporting_constants.PROGRESS_STEP_TYPE_KEY: 'bar_step',
reporting_constants.PROGRESS_NUM_SUCCESSES_KEY: 1,
reporting_constants.PROGRESS_STATE_KEY: step.state,
reporting_constants.PROGRESS_ERROR_DETAILS_KEY: step.error_details,
reporting_constants.PROGRESS_NUM_PROCESSED_KEY: 2,
reporting_constants.PROGRESS_NUM_FAILURES_KEY: 1,
reporting_constants.PROGRESS_ITEMS_TOTAL_KEY: 2,
reporting_constants.PROGRESS_DESCRIPTION_KEY: 'bar',
reporting_constants.PROGRESS_DETAILS_KEY: 'baz',
reporting_constants.PROGRESS_STEP_UUID: step.uuid
}
compare_dict(report[0], target_report)
开发者ID:hgschmie,项目名称:pulp,代码行数:25,代码来源:test_publish_step.py
示例4: test_process_step_failure_reported_on_metadata_finalized
def test_process_step_failure_reported_on_metadata_finalized(self, mock_get_units):
self.publisher.repo.content_unit_counts = {'FOO_TYPE': 1}
mock_get_units.return_value = ['mock_unit']
step = PublishStep('foo_step')
step.parent = self.publisher
step.finalize = Mock(side_effect=Exception())
self.assertRaises(Exception, step.process)
self.assertEquals(step.state, reporting_constants.STATE_FAILED)
self.assertEquals(step.progress_successes, 1)
self.assertEquals(step.progress_failures, 1)
self.assertEquals(step.total_units, 1)
开发者ID:hgschmie,项目名称:pulp,代码行数:11,代码来源:test_publish_step.py
示例5: test_publish_exception_still_removes_working_dir
def test_publish_exception_still_removes_working_dir(self, mock_rmtree):
step = PublishStep("foo")
work_dir = os.path.join(self.working_dir, 'foo')
step.working_dir = work_dir
step.process_lifecycle = Mock(side_effect=Exception('foo'))
step._build_final_report = Mock()
self.assertRaises(Exception, step.publish)
self.assertTrue(step.process_lifecycle.called)
self.assertFalse(step._build_final_report.called)
mock_rmtree.assert_called_once_with(work_dir, ignore_errors=True)
开发者ID:aweiteka,项目名称:pulp,代码行数:11,代码来源:test_publish_step.py
示例6: test_publish_distribution_packages_link_with_packagedir_delete_existing_packages
def test_publish_distribution_packages_link_with_packagedir_delete_existing_packages(self):
packages_dir = os.path.join(self.working_dir, 'Packages')
old_directory = os.path.join(self.working_dir, "foo")
os.mkdir(old_directory)
PublishStep._create_symlink(old_directory, packages_dir)
self.assertEquals(os.path.realpath(packages_dir), old_directory)
unit = self._generate_distribution_unit('one', {'packagedir': 'Packages'})
step = publish.PublishDistributionStep()
step.parent = self.publisher
step._publish_distribution_packages_link(unit)
self.assertFalse(os.path.islink(packages_dir))
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:11,代码来源:test_publish.py
示例7: test_record_failure
def test_record_failure(self):
publish_step = PublishStep('foo_step')
publish_step.parent = self.publisher
error_msg = 'Too bad, so sad'
try:
raise Exception(error_msg)
except Exception, e:
tb = sys.exc_info()[2]
publish_step._record_failure(e, tb)
开发者ID:hgschmie,项目名称:pulp,代码行数:12,代码来源:test_publish_step.py
示例8: test_clear_directory
def test_clear_directory(self):
for file_name in ('one', 'two', 'three'):
touch(os.path.join(self.working_dir, file_name))
os.makedirs(os.path.join(self.working_dir, 'four'))
self.assertEqual(len(os.listdir(self.working_dir)), 4)
step = PublishStep("foo")
step._clear_directory(self.working_dir, ['two'])
self.assertEqual(len(os.listdir(self.working_dir)), 1)
开发者ID:aweiteka,项目名称:pulp,代码行数:12,代码来源:test_publish_step.py
示例9: test_build_final_report_success
def test_build_final_report_success(self):
step_one = PublishStep('step_one')
step_one.state = reporting_constants.STATE_COMPLETE
step_two = PublishStep('step_two')
step_two.state = reporting_constants.STATE_COMPLETE
self.publisher.add_child(step_one)
self.publisher.add_child(step_two)
report = self.publisher._build_final_report()
self.assertTrue(report.success_flag)
开发者ID:hgschmie,项目名称:pulp,代码行数:12,代码来源:test_publish_step.py
示例10: test_create_symlink
def test_create_symlink(self):
source_path = os.path.join(self.working_dir, 'source')
link_path = os.path.join(self.published_dir, 'link')
touch(source_path)
self.assertFalse(os.path.exists(link_path))
PublishStep._create_symlink(source_path, link_path)
self.assertTrue(os.path.exists(link_path))
self.assertTrue(os.path.islink(link_path))
self.assertEqual(os.readlink(link_path), source_path)
开发者ID:aweiteka,项目名称:pulp,代码行数:12,代码来源:test_publish_step.py
示例11: test_create_symlink_link_exists_and_is_correct
def test_create_symlink_link_exists_and_is_correct(self):
new_source_path = os.path.join(self.working_dir, 'new_source')
link_path = os.path.join(self.published_dir, 'link')
touch(new_source_path)
os.symlink(new_source_path, link_path)
self.assertEqual(os.readlink(link_path), new_source_path)
PublishStep._create_symlink(new_source_path, link_path)
self.assertEqual(os.readlink(link_path), new_source_path)
开发者ID:aweiteka,项目名称:pulp,代码行数:13,代码来源:test_publish_step.py
示例12: __init__
def __init__(self, repo, publish_conduit, config):
"""
:param repo: Pulp managed Yum repository
:type repo: pulp.plugins.model.Repository
:param publish_conduit: Conduit providing access to relative Pulp functionality
:type publish_conduit: pulp.plugins.conduits.repo_publish.RepoPublishConduit
:param config: Pulp configuration for the distributor
:type config: pulp.plugins.config.PluginCallConfiguration
"""
super(GlancePublisher, self).__init__(constants.PUBLISH_STEP_GLANCE_PUBLISHER,
repo, publish_conduit, config)
publish_step = PublishStep(constants.PUBLISH_STEP_OVER_GLANCE_REST)
publish_step.description = _('Pushing files to Glance.')
self.add_child(PublishImagesStep())
开发者ID:bowlofeggs,项目名称:pulp_openstack,代码行数:15,代码来源:glance_publish_steps.py
示例13: test_process_lifecycle
def test_process_lifecycle(self):
step = PublishStep('parent')
step.process = Mock()
child_step = PublishStep('child')
child_step.process = Mock()
step.add_child(child_step)
step.report_progress = Mock()
step.process_lifecycle()
step.process.assert_called_once_with()
child_step.process.assert_called_once_with()
step.report_progress.assert_called_once_with(force=True)
开发者ID:aweiteka,项目名称:pulp,代码行数:13,代码来源:test_publish_step.py
示例14: test_create_symlink_link_exists
def test_create_symlink_link_exists(self):
old_source_path = os.path.join(self.working_dir, 'old_source')
new_source_path = os.path.join(self.working_dir, 'new_source')
link_path = os.path.join(self.published_dir, 'link')
touch(old_source_path)
touch(new_source_path)
os.symlink(old_source_path, link_path)
self.assertEqual(os.readlink(link_path), old_source_path)
link_path_with_slash = link_path + '/'
PublishStep._create_symlink(new_source_path, link_path_with_slash)
self.assertEqual(os.readlink(link_path), new_source_path)
开发者ID:aweiteka,项目名称:pulp,代码行数:17,代码来源:test_publish_step.py
示例15: test_cancel_before_processing
def test_cancel_before_processing(self):
self.publisher.repo.content_unit_counts = {'FOO_TYPE': 2}
step = PublishStep('foo_step')
step.is_skipped = Mock()
step.cancel()
step.process()
self.assertEquals(0, step.is_skipped.call_count)
开发者ID:hgschmie,项目名称:pulp,代码行数:7,代码来源:test_publish_step.py
示例16: test_get_progress_report_summary
def test_get_progress_report_summary(self):
parent_step = PublishStep('parent_step')
step = PublishStep('foo_step')
parent_step.add_child(step)
step.state = reporting_constants.STATE_COMPLETE
report = parent_step.get_progress_report_summary()
target_report = {
'foo_step': reporting_constants.STATE_COMPLETE
}
compare_dict(report, target_report)
开发者ID:hgschmie,项目名称:pulp,代码行数:10,代码来源:test_publish_step.py
示例17: test_publish
def test_publish(self, mock_rmtree):
step = PublishStep("foo")
work_dir = os.path.join(self.working_dir, 'foo')
step.working_dir = work_dir
step.process_lifecycle = Mock()
step._build_final_report = Mock()
step.publish()
self.assertTrue(step.process_lifecycle.called)
self.assertTrue(step._build_final_report.called)
mock_rmtree.assert_called_once_with(work_dir, ignore_errors=True)
开发者ID:aweiteka,项目名称:pulp,代码行数:11,代码来源:test_publish_step.py
示例18: test_process_child_on_error_notifies_parent
def test_process_child_on_error_notifies_parent(self):
step = PublishStep('parent')
child_step = PublishStep('child')
child_step.initialize = Mock(side_effect=Exception('boo'))
child_step.on_error = Mock(side_effect=Exception('flux'))
step.on_error = Mock()
step.add_child(child_step)
self.assertRaises(Exception, step.process_lifecycle)
self.assertEquals(reporting_constants.STATE_FAILED, step.state)
self.assertEquals(reporting_constants.STATE_FAILED, child_step.state)
self.assertTrue(step.on_error.called)
self.assertTrue(child_step.on_error.called)
开发者ID:aweiteka,项目名称:pulp,代码行数:15,代码来源:test_publish_step.py
示例19: test_publish
def test_publish(self):
# just test that process_lifecycle got called, that is where the functionality lives now
step = PublishStep("foo")
step.process_lifecycle = Mock()
step.publish()
self.assertTrue(step.process_lifecycle.called)
开发者ID:hgschmie,项目名称:pulp,代码行数:6,代码来源:test_publish_step.py
示例20: test_clear_children
def test_clear_children(self):
step = PublishStep("foo")
step.children = ['bar']
step.clear_children()
self.assertEquals(0, len(step.children))
开发者ID:hgschmie,项目名称:pulp,代码行数:5,代码来源:test_publish_step.py
注:本文中的pulp.plugins.util.publish_step.PublishStep类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论