本文整理汇总了 Python 中 tests.assertions.assert_length 函数的典型用法代码示例。如果您正苦于以下问题：Python assert_length 函数的具体用法？Python assert_length 怎么用？Python assert_length 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 assert_length 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_start
def test_start(self):
    """start() returns a truthy value, notifies once and invokes _do_start.

    The single recorded ``notify`` call is checked against ``EVENT_START``.
    """
    run = self.job_run
    run._do_start = Turtle()

    started = run.start()
    assert started

    assert_call(run.notify, 0, run.EVENT_START)
    assert_call(run._do_start, 0)
    assert_length(run.notify.calls, 1)
开发者ID:Bklyn,项目名称:Tron,代码行数:7,代码来源:jobrun_test.py
示例2: test_start_no_startable_action_runs
def test_start_no_startable_action_runs(self):
    """start() returns a falsy value when no action run is startable.

    Exactly one ``notify`` call is still expected, checked against
    ``EVENT_START``.
    """
    run = self.job_run
    run._do_start = Turtle()
    run.action_runs.has_startable_action_runs = False

    started = run.start()
    assert not started

    assert_call(run.notify, 0, run.EVENT_START)
    assert_length(run.notify.calls, 1)
开发者ID:Bklyn,项目名称:Tron,代码行数:7,代码来源:jobrun_test.py
示例3: test_manual_start_with_run_time
def test_manual_start_with_run_time(self):
    """manual_start() with an explicit run time yields one started run.

    The run time must be forwarded to ``build_new_runs`` with
    ``manual=True``, and the fixture's manual run must be started once.
    """
    requested_time = datetime.datetime(2012, 3, 14, 15, 9, 26)

    started_runs = self.job_scheduler.manual_start(requested_time)

    self.job.build_new_runs.assert_called_with(requested_time, manual=True)
    assert_length(started_runs, 1)
    self.manual_run.start.assert_called_once_with()
开发者ID:ContextLogic,项目名称:Tron,代码行数:7,代码来源:job_test.py
示例4: test_handler_finished_with_cleanup
def test_handler_finished_with_cleanup(self):
    """handler() starts a pending cleanup action instead of finalizing.

    With a cleanup action run whose ``is_done`` is False, handling a
    ``STATE_SUCCEEDED`` event must not call ``finalize`` and must call the
    cleanup run's ``start``.
    """
    run = self.job_run
    run.action_runs.cleanup_action_run = Turtle(is_done=False)
    run.finalize = Turtle()

    succeeded = actionrun.ActionRun.STATE_SUCCEEDED
    run.handler(None, succeeded)

    assert_length(run.finalize.calls, 0)
    assert_call(run.action_runs.cleanup_action_run.start, 0)
开发者ID:Bklyn,项目名称:Tron,代码行数:7,代码来源:jobrun_test.py
示例5: test_from_state_node_no_longer_exists
def test_from_state_node_no_longer_exists(self):
    """JobRun.from_state() rebuilds a run from the fixture's state data.

    The restored run is expected to hold one action run, carry the job
    name ``'thejobname'`` and the fixture's run time, and have the node
    pool as its node.
    """
    # NOTE(review): presumably the fixture's state data names a node that
    # is absent from self.node_pool -- the fixture is not visible here.
    restored = jobrun.JobRun.from_state(
        self.state_data,
        self.action_graph,
        self.output_path,
        self.context,
        self.node_pool,
    )

    assert_length(restored.action_runs.run_map, 1)
    assert_equal(restored.job_name, 'thejobname')
    assert_equal(restored.run_time, self.run_time)
    assert_equal(restored.node, self.node_pool)
开发者ID:ContextLogic,项目名称:Tron,代码行数:7,代码来源:jobrun_test.py
示例6: test_cancel_schedules_a_new_run
def test_cancel_schedules_a_new_run(self):
    """Cancelling a job's run ``0`` results in the job reporting two runs.

    After ``tronctl cancel MASTER.a_job.0`` the job is polled until it
    lists two runs; their states, in the order the client returns them,
    are expected to be ``['SCHE', 'CANC']``.
    """
    # The job's fields must be nested under the ``jobs`` list item, and the
    # command under its action entry. With every line flush-left, YAML
    # parses ``node``, ``schedule``, ``actions`` and ``command`` as
    # unrelated top-level keys. dedent() strips the common left margin.
    config = BASIC_CONFIG + dedent("""
        jobs:
          - name: "a_job"
            node: local
            schedule: "daily 05:00:00"
            actions:
              - name: "first_action"
                command: "echo OK"
    """)

    client = self.sandbox.client
    self.sandbox.save_config(config)
    self.sandbox.trond()

    job_url = client.get_url('MASTER.a_job')
    self.sandbox.tronctl(['cancel', 'MASTER.a_job.0'])

    def wait_on_cancel():
        # Done once the job lists two runs.
        return len(client.job(job_url)['runs']) == 2

    sandbox.wait_on_sandbox(wait_on_cancel)

    job_runs = client.job(job_url)['runs']
    assert_length(job_runs, 2)
    run_states = [run['state'] for run in job_runs]
    assert_equal(run_states, ['SCHE', 'CANC'])
开发者ID:strategist922,项目名称:Tron,代码行数:25,代码来源:trond_test.py
示例7: test_build_action_run_collection
def test_build_action_run_collection(self):
    """build_action_run_collection() maps exactly 'act1' and 'act2'.

    The collection must share the fixture's action graph, and the run
    stored under 'act1' must report that action name.
    """
    built = ActionRunFactory.build_action_run_collection(self.job_run)

    assert_equal(built.action_graph, self.action_graph)
    for expected_name in ('act1', 'act2'):
        assert_in(expected_name, built.run_map)
    assert_length(built.run_map, 2)
    assert_equal(built.run_map['act1'].action_name, 'act1')
开发者ID:Bklyn,项目名称:Tron,代码行数:7,代码来源:actionrun_test.py
示例8: test_handler_not_done
def test_handler_not_done(self):
    """handler() does not finalize while the action runs are still active."""
    def start_nothing():
        # Stand-in for _start_action_runs that reports no started runs.
        return []

    run = self.job_run
    run.action_runs.is_active = True
    run._start_action_runs = start_nothing
    run.finalize = Turtle()

    run.handler(None, actionrun.ActionRun.STATE_SUCCEEDED)

    assert_length(run.finalize.calls, 0)
开发者ID:Bklyn,项目名称:Tron,代码行数:7,代码来源:jobrun_test.py
示例9: test_handler_no_queued
def test_handler_no_queued(self):
    """A NOTIFY_RUN_DONE event with no queued runs does not call run_job."""
    self.job_scheduler.run_job = Turtle()

    def get_queued(state):
        # Empty list for the queued state; None for any other state.
        return [] if state == ActionRun.STATE_QUEUED else None

    self.job.runs.get_runs_by_state = get_queued

    self.job_scheduler.handler(self.job, job.Job.NOTIFY_RUN_DONE)

    assert_length(self.job_scheduler.run_job.calls, 0)
开发者ID:ninsen,项目名称:Tron,代码行数:8,代码来源:job_test.py
示例10: test_get_runs_to_schedule_no_last_run
def test_get_runs_to_schedule_no_last_run(self):
    """get_runs_to_schedule() yields one run when get_newest returns None.

    ``next_run_time`` must have been called exactly once.
    """
    def no_newest_run(**kwargs):
        # Stand-in for get_newest that never finds a run.
        return None

    self.job.runs.get_newest = no_newest_run

    scheduled = list(self.job_scheduler.get_runs_to_schedule())

    assert_length(self.job.scheduler.next_run_time.calls, 1)
    assert_length(scheduled, 1)
    # The returned JobRun should have the job attached as an observer.
    assert_call(scheduled[0].attach, 0, True, self.job)
开发者ID:ninsen,项目名称:Tron,代码行数:8,代码来源:job_test.py
示例11: test_get_runs_to_schedule_no_pending
def test_get_runs_to_schedule_no_pending(self):
    """get_runs_to_schedule() yields one run for the default fixture.

    ``get_newest`` must have been called with ``include_manual=False`` and
    ``next_run_time`` exactly once.
    """
    scheduled = list(self.job_scheduler.get_runs_to_schedule())

    assert_call(self.job.runs.get_newest, 0, include_manual=False)
    assert_length(self.job.scheduler.next_run_time.calls, 1)
    assert_length(scheduled, 1)

    # The returned JobRun should have the job attached as an observer.
    first_run = scheduled[0]
    assert_call(first_run.attach, 0, True, self.job)
开发者ID:ninsen,项目名称:Tron,代码行数:8,代码来源:job_test.py
示例12: test_get_active
def test_get_active(self):
    """get_active() returns the new starting run, then ``job_runs[1]``."""
    next_num = self.run_collection.next_run_num()
    starting_run = self._mock_run(
        run_num=next_num,
        state=actionrun.ActionRun.STATE_STARTING,
    )
    self.run_collection.runs.appendleft(starting_run)

    active_runs = list(self.run_collection.get_active())

    assert_length(active_runs, 2)
    assert_equal(active_runs, [starting_run, self.job_runs[1]])
开发者ID:ContextLogic,项目名称:Tron,代码行数:8,代码来源:jobrun_test.py
示例13: test_remove_old_runs
def test_remove_old_runs(self):
    """remove_old_runs() trims the collection to ``run_limit`` entries.

    With a limit of one, the last fixture run must have ``cleanup`` called
    and no run still in the collection may have been cancelled.
    """
    collection = self.run_collection
    collection.run_limit = 1

    collection.remove_old_runs()

    assert_length(collection.runs, 1)
    assert_call(self.job_runs[-1].cleanup, 0)
    for remaining in collection.runs:
        assert_length(remaining.cancel.calls, 0)
开发者ID:pombredanne,项目名称:Tron,代码行数:8,代码来源:jobrun_test.py
示例14: test_run_job_schedule_on_complete
def test_run_job_schedule_on_complete(self):
    """run_job() starts the run once and does not call schedule().

    Set-up: ``schedule_on_complete`` is True on the fixture scheduler and
    ``get_active`` reports no active runs.
    """
    def no_active_runs(s):
        # Stand-in for get_active: nothing is currently active.
        return []

    self.job_scheduler.schedule = Turtle()
    self.scheduler.schedule_on_complete = True
    self.job.runs.get_active = no_active_runs
    pending_run = Turtle(is_cancelled=False)

    self.job_scheduler.run_job(pending_run)

    assert_length(pending_run.start.calls, 1)
    assert_length(self.job_scheduler.schedule.calls, 0)
开发者ID:ContextLogic,项目名称:Tron,代码行数:8,代码来源:job_test.py
示例15: test_get_runs_to_schedule_no_last_run
def test_get_runs_to_schedule_no_last_run(self):
    """get_runs_to_schedule(False) yields one run when get_newest is None.

    ``next_run_time`` must be called exactly once, with ``None``.
    """
    self.job.runs.get_newest.return_value = None

    scheduled = list(self.job_scheduler.get_runs_to_schedule(False))

    self.job.scheduler.next_run_time.assert_called_once_with(None)
    assert_length(scheduled, 1)

    # The returned JobRun should have the job attached as an observer.
    first_run = scheduled[0]
    first_run.attach.assert_any_call(True, self.job)
开发者ID:ContextLogic,项目名称:Tron,代码行数:8,代码来源:job_test.py
示例16: test_restore_state_no_state
def test_restore_state_no_state(self):
    """restore_state() restores nothing when the watcher returns no state.

    No job's ``restore_job_state`` and no service's
    ``restore_service_state`` may have been called.
    """
    def restore(jobs, services):
        # Stand-in for the state watcher: two empty state mappings.
        empty_pair = ({}, {})
        return empty_pair

    self.mcp.state_watcher = Turtle(restore=restore)

    self.mcp.restore_state()

    for known_job in self.mcp.jobs.values():
        assert_length(known_job.restore_job_state.calls, 0)
    for known_service in self.mcp.services.values():
        assert_length(known_service.restore_service_state.calls, 0)
开发者ID:strategist922,项目名称:Tron,代码行数:9,代码来源:mcp_test.py
示例17: test_pending
def test_pending(self):
    """get_pending() returns the new scheduled run, then ``job_runs[0]``."""
    collection = self.run_collection
    scheduled_run = self._mock_run(
        run_num=collection.next_run_num(),
        state=actionrun.ActionRun.STATE_SCHEDULED,
    )
    collection.runs.appendleft(scheduled_run)

    pending_runs = list(collection.get_pending())

    assert_length(pending_runs, 2)
    assert_equal(pending_runs, [scheduled_run, self.job_runs[0]])
开发者ID:ContextLogic,项目名称:Tron,代码行数:9,代码来源:jobrun_test.py
示例18: test_build_and_sort
def test_build_and_sort(self):
    """_build_and_sort() calls the builder per item, then sorts.

    The builder must be called once with each element of the sequence, in
    order; the result must have one entry per element and equal the
    collection's ``instances``.
    """
    autospec_method(self.collection.sort)
    count = 4
    builder = mock.Mock()
    seq = range(count)

    built = self.collection._build_and_sort(builder, seq)

    self.collection.sort.assert_called_with()
    expected_calls = [mock.call(item) for item in seq]
    assert_equal(builder.mock_calls, expected_calls)
    assert_length(built, count)
    assert_equal(built, self.collection.instances)
开发者ID:Feriority,项目名称:Tron,代码行数:9,代码来源:serviceinstance_test.py
示例19: test_get_active_with_node
def test_get_active_with_node(self):
    """get_active('anode') omits a starting run whose node differs.

    Only ``job_runs[1]`` is expected back.
    """
    next_num = self.run_collection.next_run_num()
    other_node_run = self._mock_run(
        run_num=next_num,
        state=actionrun.ActionRun.STATE_STARTING,
    )
    other_node_run.node = 'differentnode'
    self.run_collection.runs.appendleft(other_node_run)

    active_runs = list(self.run_collection.get_active('anode'))

    assert_length(active_runs, 1)
    assert_equal(active_runs, [self.job_runs[1]])
开发者ID:ContextLogic,项目名称:Tron,代码行数:9,代码来源:jobrun_test.py
示例20: test_get_runs_to_schedule_queue_with_pending
def test_get_runs_to_schedule_queue_with_pending(self):
    """get_runs_to_schedule(False) yields one run based on the newest run.

    ``get_newest`` must be called with ``include_manual=False`` and its
    result's ``run_time`` passed to ``next_run_time`` exactly once.
    """
    scheduled = list(self.job_scheduler.get_runs_to_schedule(False))

    newest_lookup = self.job.runs.get_newest
    newest_lookup.assert_called_with(include_manual=False)
    self.job.scheduler.next_run_time.assert_called_once_with(
        newest_lookup.return_value.run_time)
    assert_length(scheduled, 1)

    # The returned JobRun should have the job attached as an observer.
    scheduled[0].attach.assert_any_call(True, self.job)
开发者ID:ContextLogic,项目名称:Tron,代码行数:9,代码来源:job_test.py
注：本文中的 tests.assertions.assert_length 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论