本文整理汇总了Python中pulp.bindings.responses.Task类的典型用法代码示例。如果您正苦于以下问题：Python Task类的具体用法？Python Task怎么用？Python Task使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Task类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_display_with_keyboard_interrupt
def test_display_with_keyboard_interrupt(self, mock_display):
    """
    Verify that a KeyboardInterrupt raised while displaying the second of
    three tasks stops the display loop before the third task is reached.
    """
    # Setup: three tasks with ids task_0, task_1 and task_2
    task_list = []
    for index in range(3):
        pending = Task(TASK_TEMPLATE)
        pending.task_id = 'task_%s' % index
        task_list.append(pending)

    def interrupt(context, renderer, task_id, quiet_waiting=True):
        # Side effect standing in for a ctrl-c during the second task
        if task_id == 'task_1':
            raise KeyboardInterrupt()
        return task_id

    mock_display.side_effect = interrupt

    # Test
    status._display_status(self.context, self.renderer, task_list)

    # Verify: only the first two tasks were displayed
    self.assertEqual(2, mock_display.call_count)
    for position, recorded in enumerate(mock_display.call_args_list):
        positional = recorded[0]
        keyword = recorded[1]
        self.assertEqual(positional[0], self.context)
        self.assertEqual(positional[1], self.renderer)
        self.assertEqual(positional[2], 'task_%s' % position)
        # quiet_waiting is expected to be False only for the first call
        self.assertEqual(keyword['quiet_waiting'], position > 0)
开发者ID:bartwo,项目名称:pulp,代码行数:28,代码来源:test_client_sync_status.py
示例2: test_task_header_action_tag_only
def test_task_header_action_tag_only(self):
    """
    A task tagged only with the update-distributor action tag should cause
    task_header to write exactly that action.
    """
    action = tags.ACTION_UPDATE_DISTRIBUTOR
    tagged_task = Task({})
    tagged_task.tags = [tags.action_tag(action)]

    self.command.task_header(tagged_task)

    written = self.prompt.get_write_tags()
    self.assertEqual(written, [tags.ACTION_UPDATE_DISTRIBUTOR])
开发者ID:aweiteka,项目名称:pulp,代码行数:7,代码来源:test_cudl.py
示例3: test_task_header_unrelated_tags
def test_task_header_unrelated_tags(self):
    """
    A task carrying only the tags 'foo' and 'bar' should make task_header
    write nothing to the prompt.
    """
    unrelated_task = Task({})
    unrelated_task.tags = ['foo', 'bar']

    self.command.task_header(unrelated_task)

    written = self.prompt.get_write_tags()
    self.assertEqual(written, [])
开发者ID:aweiteka,项目名称:pulp,代码行数:7,代码来源:test_cudl.py
示例4: test_task_header_no_tags
def test_task_header_no_tags(self):
    """
    A task with an empty tag list should make task_header write nothing
    to the prompt.
    """
    untagged_task = Task({})
    untagged_task.tags = []

    self.command.task_header(untagged_task)

    written = self.prompt.get_write_tags()
    self.assertEqual(written, [])
开发者ID:aweiteka,项目名称:pulp,代码行数:7,代码来源:test_cudl.py
示例5: test_succeeded_hands_off_errors
def test_succeeded_hands_off_errors(self, mock_render):
    """
    When the task result details contain errors, succeeded() should pass
    the whole result to the (mocked) render call exactly once.
    """
    task = Task(self.TASK_TEMPLATE)
    error_details = self._generate_details({'errors': {'foo/bar': {}}})
    task.result = {'details': error_details, 'num_changes': 1}

    self.command.succeeded('', task)

    mock_render.assert_called_once_with(task.result)
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:9,代码来源:test_consumer_content.py
示例6: test_succeeded_one_change
def test_succeeded_one_change(self):
    """
    A result reporting a single change should make succeeded() write the
    change-made tag.
    """
    task = Task(self.TASK_TEMPLATE)
    generated = self._generate_details()
    task.result = {'details': generated, 'num_changes': 1}

    self.command.succeeded('', task)

    written = self.prompt.get_write_tags()
    self.assertTrue(content.TAG_CHANGE_MADE in written)
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:10,代码来源:test_consumer_content.py
示例7: test_succeeded_no_change
def test_succeeded_no_change(self):
    """
    A result reporting zero changes should make succeeded() write the
    no-changes tag.
    """
    task = Task(self.TASK_TEMPLATE)
    empty_details = {constants.TYPE_PUPPET_MODULE: {'details': {}}}
    task.result = {'details': empty_details, 'num_changes': 0}

    self.command.succeeded('', task)

    written = self.prompt.get_write_tags()
    self.assertTrue(content.TAG_NO_CHANGES in written)
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:10,代码来源:test_consumer_content.py
示例8: test_display_status_rejected
def test_display_status_rejected(self):
    """
    Displaying a single task whose response is RESPONSE_REJECTED should
    write the ctrl-c notice followed by the two rejection tags.
    """
    # Setup
    task = Task(TASK_TEMPLATE)
    task.response = RESPONSE_REJECTED

    # Test
    status._display_status(self.context, self.renderer, [task])

    # Verify
    written = self.prompt.get_write_tags()
    self.assertEqual(['ctrl-c', 'rejected-msg', 'rejected-desc'], written)
开发者ID:bartwo,项目名称:pulp,代码行数:11,代码来源:test_client_sync_status.py
示例9: test_display_status_postponed
def test_display_status_postponed(self):
    """
    Displaying a single waiting task whose response is RESPONSE_POSTPONED
    should write the ctrl-c notice followed by the postponed tag.
    """
    # Setup
    task = Task(TASK_TEMPLATE)
    task.response = RESPONSE_POSTPONED
    task.state = STATE_WAITING

    # Test
    status._display_status(self.context, self.renderer, [task])

    # Verify
    written = self.prompt.get_write_tags()
    self.assertEqual(['ctrl-c', 'postponed'], written)
开发者ID:bartwo,项目名称:pulp,代码行数:12,代码来源:test_client_sync_status.py
示例10: test_succeeded_multiple_changes
def test_succeeded_multiple_changes(self):
    """
    A result reporting two changes should make succeeded() write the
    change-made tag exactly once.
    """
    task = Task(self.TASK_TEMPLATE)
    task.result = {
        'details': self._generate_details(),
        'num_changes': 2,
    }

    self.command.succeeded('', task)

    tags = self.prompt.get_write_tags()
    self.assertTrue(content.TAG_CHANGE_MADE in tags)
    # Make sure it's just 1 message even though there were 2 changes.
    # list.count replaces len(filter(...)): filter() returns an iterator on
    # Python 3, so calling len() on it raises TypeError there.
    self.assertEqual(tags.count(content.TAG_CHANGE_MADE), 1)
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:12,代码来源:test_consumer_content.py
示例11: test_task_header_with_dist_tags
def test_task_header_with_dist_tags(self):
    """
    A task tagged with both the update-distributor action and a
    distributor resource should write the action tag, and the first
    recorded line should end with the distributor name.
    """
    action = tags.action_tag(tags.ACTION_UPDATE_DISTRIBUTOR)
    resource = tags.resource_tag(tags.RESOURCE_REPOSITORY_DISTRIBUTOR_TYPE, 'some_distributor')
    task = Task({})
    task.tags = [action, resource]

    self.command.task_header(task)

    written = self.prompt.get_write_tags()
    self.assertEqual(written, [tags.ACTION_UPDATE_DISTRIBUTOR])
    # The message in this case should end with the distributor type
    first_line = self.recorder.lines[0].strip()
    self.assertTrue(first_line.endswith('some_distributor'))
开发者ID:aweiteka,项目名称:pulp,代码行数:12,代码来源:test_cudl.py
示例12: poll
def poll(task_id):
    """
    Return a 200 Response wrapping a fresh Task whose state depends on how
    many times mock_get has been called so far; task_id itself is unused.
    """
    polls_so_far = mock_get.call_count
    if polls_so_far < 3:
        # Wait for the first 2 polls
        current_state = STATE_WAITING
    elif polls_so_far < 13:
        # Running for the next 10
        current_state = STATE_RUNNING
    else:
        # Finally finish
        current_state = STATE_FINISHED

    polled_task = Task(TASK_TEMPLATE)
    polled_task.state = current_state
    return Response(200, polled_task)
开发者ID:bartwo,项目名称:pulp,代码行数:16,代码来源:test_client_sync_status.py
示例13: test_poll_additional_spawned_tasks_list
def test_poll_additional_spawned_tasks_list(self):
    """
    Poll a one-element task list whose task spawns two further tasks that
    must be picked up by the polling loop.

    Task Count: 3
    Statuses: None; normal progression
    Result: All Success
    """
    # Setup
    sim = TaskSimulator()
    sim.install(self.bindings)

    first_task_states = sim.add_task_states(
        '1', [STATE_WAITING, STATE_RUNNING, STATE_FINISHED])
    sim.add_task_states(
        '2', [STATE_WAITING, STATE_WAITING, STATE_RUNNING, STATE_FINISHED])
    sim.add_task_states(
        '3', [STATE_WAITING, STATE_RUNNING, STATE_RUNNING, STATE_RUNNING, STATE_FINISHED])

    container_task = Task({})
    task_list = sim.get_all_tasks().response_body
    # The final state of task 1 reports tasks 2 and 3 as spawned
    first_task_states[2].spawned_tasks = task_list[1:]

    # Test
    # NOTE(review): container_task is populated here but never handed to
    # poll(); only the first simulated task is polled directly.
    container_task.spawned_tasks = sim.get_all_tasks().response_body
    completed_tasks = self.command.poll(task_list[:1], {})

    # Verify
    expected_tags = (
        ['abort'] +  # default, always displayed
        # task 1
        ['delayed-spinner', 'running-spinner', 'succeeded'] +
        # task 2
        ['header', 'delayed-spinner', 'running-spinner', 'running-spinner',
         'succeeded'] +
        # task 3
        ['header', 'delayed-spinner', 'running-spinner', 'running-spinner',
         'running-spinner', 'succeeded']
    )
    self.assertEqual(expected_tags, self.prompt.get_write_tags())

    self.assertTrue(isinstance(completed_tasks, list))
    self.assertEqual(3, len(completed_tasks))
    for finished in completed_tasks:
        self.assertEqual(STATE_FINISHED, finished.state)
开发者ID:preethit,项目名称:pulp-1,代码行数:46,代码来源:test_polling.py
示例14: test_poll_spawned_tasks_list
def test_poll_spawned_tasks_list(self):
    """
    Poll a container task, as returned by a command with both synchronous
    and asynchronous sections, that carries a spawned_tasks list.

    Task Count: 3
    Statuses: None; normal progression
    Result: All Success
    """
    # Setup
    sim = TaskSimulator()
    sim.install(self.bindings)

    sim.add_task_states(
        '1', [STATE_WAITING, STATE_RUNNING, STATE_FINISHED])
    sim.add_task_states(
        '2', [STATE_WAITING, STATE_WAITING, STATE_RUNNING, STATE_FINISHED])
    sim.add_task_states(
        '3', [STATE_WAITING, STATE_RUNNING, STATE_RUNNING, STATE_RUNNING, STATE_FINISHED])

    container_task = Task({})

    # Test
    container_task.spawned_tasks = sim.get_all_tasks().response_body
    completed_tasks = self.command.poll(container_task, {})

    # Verify
    expected_tags = (
        ['abort'] +  # default, always displayed
        # task 1
        ['header', 'delayed-spinner', 'running-spinner', 'running-spinner', 'succeeded'] +
        # task 2
        ['header', 'delayed-spinner', 'delayed-spinner', 'running-spinner', 'running-spinner',
         'succeeded'] +
        # task 3
        ['header', 'delayed-spinner', 'running-spinner', 'running-spinner',
         'running-spinner', 'running-spinner', 'succeeded']
    )
    self.assertEqual(expected_tags, self.prompt.get_write_tags())

    self.assertTrue(isinstance(completed_tasks, list))
    self.assertEqual(3, len(completed_tasks))
    for finished in completed_tasks:
        self.assertEqual(STATE_FINISHED, finished.state)
开发者ID:preethit,项目名称:pulp-1,代码行数:45,代码来源:test_polling.py
示例15: test_display_status
def test_display_status(self, mock_display):
    """
    Verify that _display_status invokes the (mocked) per-task display once
    for each of two tasks, in order, with the expected arguments.
    """
    # Setup: two tasks with ids task_0 and task_1
    task_list = []
    for index in range(2):
        pending = Task(TASK_TEMPLATE)
        pending.task_id = 'task_%s' % index
        task_list.append(pending)

    # Test
    status._display_status(self.context, self.renderer, task_list)

    # Verify
    self.assertEqual(2, mock_display.call_count)
    for position, recorded in enumerate(mock_display.call_args_list):
        positional = recorded[0]
        keyword = recorded[1]
        self.assertEqual(positional[0], self.context)
        self.assertEqual(positional[1], self.renderer)
        self.assertEqual(positional[2], 'task_%s' % position)
        # quiet_waiting is expected to be False only for the first call
        self.assertEqual(keyword['quiet_waiting'], position > 0)
开发者ID:bartwo,项目名称:pulp,代码行数:20,代码来源:test_client_sync_status.py
注：本文中的pulp.bindings.responses.Task类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论