本文整理汇总了Python中mistral.db.v2.sqlalchemy.api.create_workflow_execution函数的典型用法代码示例。如果您正苦于以下问题:Python create_workflow_execution函数的具体用法?Python create_workflow_execution怎么用?Python create_workflow_execution使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了create_workflow_execution函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_executions_to_time_filter
def test_executions_to_time_filter(self):
with db_api.transaction(read_only=True):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
ctx = {
'__execution': {
'id': 'some'
}
}
result = self._evaluator.evaluate(
'_|executions(to_time="2020-01-01")', ctx
)
self.assertEqual([created0, created1], result)
result = self._evaluator.evaluate(
'_|executions(to_time="2016-12-01 15:01:00")', ctx
)
self.assertEqual([created0], result)
result = self._evaluator.evaluate(
'_|executions(id="two", to_time="2016-12-01 15:01:00")', ctx
)
self.assertEqual([], result)
开发者ID:openstack,项目名称:mistral,代码行数:28,代码来源:test_jinja_expression.py
示例2: test_get_workflow_executions
def test_get_workflow_executions(self):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
db_api.create_workflow_execution(WF_EXECS[1])
fetched = db_api.get_workflow_executions(
state=WF_EXECS[0]['state']
)
self.assertEqual(1, len(fetched))
self.assertEqual(created0, fetched[0])
开发者ID:adarshkoyya,项目名称:mistral,代码行数:10,代码来源:test_sqlalchemy_db_api.py
示例3: test_executions
def test_executions(self):
with db_api.transaction(read_only=True):
created0 = db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
ctx = {
'__execution': {
'id': 'some'
}
}
result = self._evaluator.evaluate('_|executions()', ctx)
self.assertEqual([created0, created1], result)
开发者ID:openstack,项目名称:mistral,代码行数:14,代码来源:test_jinja_expression.py
示例4: test_create_or_update_task_execution
def test_create_or_update_task_execution(self):
id = 'not-existing-id'
self.assertIsNone(db_api.load_task_execution(id))
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
created = db_api.create_or_update_task_execution(id, values)
self.assertIsNotNone(created)
self.assertIsNotNone(created.id)
updated = db_api.create_or_update_task_execution(
created.id,
{'state': 'RUNNING'}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_task_execution(updated.id).state
)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(updated, fetched)
开发者ID:adarshkoyya,项目名称:mistral,代码行数:29,代码来源:test_sqlalchemy_db_api.py
示例5: test_commit_multiple_objects
def test_commit_multiple_objects(self):
db_api.start_tx()
try:
created = db_api.create_workflow_execution(WF_EXECS[0])
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(created, fetched)
created_wb = db_api.create_workbook(WORKBOOKS[0])
fetched_wb = db_api.get_workbook(created_wb.name)
self.assertEqual(created_wb, fetched_wb)
self.assertTrue(self.is_db_session_open())
db_api.commit_tx()
finally:
db_api.end_tx()
self.assertFalse(self.is_db_session_open())
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(created, fetched)
fetched_wb = db_api.get_workbook(created_wb.name)
self.assertEqual(created_wb, fetched_wb)
self.assertFalse(self.is_db_session_open())
开发者ID:adarshkoyya,项目名称:mistral,代码行数:29,代码来源:test_sqlalchemy_db_api.py
示例6: test_rollback_multiple_objects
def test_rollback_multiple_objects(self):
db_api.start_tx()
try:
created = db_api.create_workflow_execution(WF_EXECS[0])
fetched = db_api.get_workflow_execution(created['id'])
self.assertEqual(created, fetched)
created_wb = db_api.create_workbook(WORKBOOKS[0])
fetched_wb = db_api.get_workbook(created_wb.name)
self.assertEqual(created_wb, fetched_wb)
self.assertTrue(self.is_db_session_open())
db_api.rollback_tx()
finally:
db_api.end_tx()
self.assertFalse(self.is_db_session_open())
self.assertRaises(
exc.NotFoundException,
db_api.get_workflow_execution,
created.id
)
self.assertRaises(
exc.NotFoundException,
db_api.get_workbook,
created_wb.name
)
self.assertFalse(self.is_db_session_open())
开发者ID:adarshkoyya,项目名称:mistral,代码行数:32,代码来源:test_sqlalchemy_db_api.py
示例7: test_task_execution_repr
def test_task_execution_repr(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
s = db_api.create_task_execution(values).__repr__()
self.assertIn('TaskExecution ', s)
self.assertIn("'state': 'IDLE'", s)
self.assertIn("'name': 'my_task1'", s)
开发者ID:adarshkoyya,项目名称:mistral,代码行数:11,代码来源:test_sqlalchemy_db_api.py
示例8: test_iterate_column_names
def test_iterate_column_names(self):
wf_ex = db_api.create_workflow_execution(WF_EXEC)
self.assertIsNotNone(wf_ex)
c_names = [c_name for c_name in wf_ex.iter_column_names()]
expected = set(WF_EXEC.keys())
expected.remove('some_invalid_field')
self.assertEqual(expected, set(c_names))
开发者ID:openstack,项目名称:mistral,代码行数:12,代码来源:test_db_model.py
示例9: test_create_and_get_and_load_workflow_execution
def test_create_and_get_and_load_workflow_execution(self):
created = db_api.create_workflow_execution(WF_EXECS[0])
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(created, fetched)
fetched = db_api.load_workflow_execution(created.id)
self.assertEqual(created, fetched)
self.assertIsNone(db_api.load_workflow_execution("not-existing-id"))
开发者ID:adarshkoyya,项目名称:mistral,代码行数:12,代码来源:test_sqlalchemy_db_api.py
示例10: test_iterate_columns
def test_iterate_columns(self):
wf_ex = db_api.create_workflow_execution(WF_EXEC)
self.assertIsNotNone(wf_ex)
values = {c_name: c_val for c_name, c_val in wf_ex.iter_columns()}
expected = copy.copy(WF_EXEC)
del expected['some_invalid_field']
self.assertDictEqual(expected, values)
开发者ID:openstack,项目名称:mistral,代码行数:12,代码来源:test_db_model.py
示例11: test_update_workflow_execution_env_wrong_state
def test_update_workflow_execution_env_wrong_state(self):
wf_exec_template = {
'spec': {},
'start_params': {'task': 'my_task1'},
'state': 'PAUSED',
'state_info': None,
'params': {'env': {'k1': 'abc'}},
'created_at': None,
'updated_at': None,
'context': {'__env': {'k1': 'fee fi fo fum'}},
'task_id': None,
'trust_id': None,
'description': None,
'output': None
}
states_not_permitted = [
states.RUNNING,
states.RUNNING_DELAYED,
states.SUCCESS,
states.WAITING
]
update_env = {'k1': 'foobar'}
for state in states_not_permitted:
wf_exec = copy.deepcopy(wf_exec_template)
wf_exec['state'] = state
with db_api.transaction():
created = db_api.create_workflow_execution(wf_exec)
self.assertIsNone(created.updated_at)
self.assertRaises(
exc.NotAllowedException,
wf_service.update_workflow_execution_env,
created,
update_env
)
fetched = db_api.get_workflow_execution(created.id)
self.assertDictEqual(
wf_exec['params']['env'],
fetched.params['env']
)
self.assertDictEqual(
wf_exec['context']['__env'],
fetched.context['__env']
)
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:52,代码来源:test_workflow_service.py
示例12: test_executions_state_filter
def test_executions_state_filter(self):
with db_api.transaction(read_only=True):
db_api.create_workflow_execution(WF_EXECS[0])
created1 = db_api.create_workflow_execution(WF_EXECS[1])
ctx = {
'__execution': {
'id': 'some'
}
}
result = self._evaluator.evaluate(
'_|executions(state="RUNNING")', ctx
)
self.assertEqual([created1], result)
result = self._evaluator.evaluate(
'_|executions(id="one", state="RUNNING")', ctx
)
self.assertEqual([], result)
开发者ID:openstack,项目名称:mistral,代码行数:22,代码来源:test_jinja_expression.py
示例13: _run_tx1
def _run_tx1():
with db_api.transaction():
wf_ex = db_api.create_workflow_execution(WF_EXEC)
# Release TX2 so it can read data.
sem2.release()
print("Created: %s" % wf_ex)
print("Holding TX1...")
sem1.acquire()
print("TX1 completed.")
开发者ID:ainkov,项目名称:mistral,代码行数:13,代码来源:test_sqlite_transactions.py
示例14: test_delete_workflow_execution
def test_delete_workflow_execution(self):
created = db_api.create_workflow_execution(WF_EXECS[0])
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(created, fetched)
db_api.delete_workflow_execution(created.id)
self.assertRaises(
exc.NotFoundException,
db_api.get_workflow_execution,
created.id
)
开发者ID:adarshkoyya,项目名称:mistral,代码行数:14,代码来源:test_sqlalchemy_db_api.py
示例15: test_trim_status_info
def test_trim_status_info(self):
created = db_api.create_workflow_execution(WF_EXECS[0])
self.assertIsNone(created.updated_at)
updated = db_api.update_workflow_execution(
created.id,
{'state': 'FAILED', 'state_info': ".." * 1024}
)
self.assertEqual('FAILED', updated.state)
state_info = db_api.load_execution(updated.id).state_info
self.assertEqual(
1023,
len(state_info)
)
开发者ID:adarshkoyya,项目名称:mistral,代码行数:16,代码来源:test_sqlalchemy_db_api.py
示例16: test_update_workflow_execution_env
def test_update_workflow_execution_env(self):
wf_exec_template = {
'spec': {},
'start_params': {'task': 'my_task1'},
'state': 'PAUSED',
'state_info': None,
'params': {'env': {'k1': 'abc'}},
'created_at': None,
'updated_at': None,
'context': {'__env': {'k1': 'fee fi fo fum'}},
'task_id': None,
'trust_id': None,
'description': None,
'output': None
}
states_permitted = [
states.IDLE,
states.PAUSED,
states.ERROR
]
update_env = {'k1': 'foobar'}
for state in states_permitted:
wf_exec = copy.deepcopy(wf_exec_template)
wf_exec['state'] = state
with db_api.transaction():
created = db_api.create_workflow_execution(wf_exec)
self.assertIsNone(created.updated_at)
updated = wf_service.update_workflow_execution_env(
created,
update_env
)
self.assertDictEqual(update_env, updated.params['env'])
self.assertDictEqual(update_env, updated.context['__env'])
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
开发者ID:PrinceKatiyar,项目名称:mistral,代码行数:45,代码来源:test_workflow_service.py
示例17: test_create_and_get_and_load_task_execution
def test_create_and_get_and_load_task_execution(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
created = db_api.create_task_execution(values)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(created, fetched)
self.assertNotIsInstance(fetched.workflow_execution, list)
fetched = db_api.load_task_execution(created.id)
self.assertEqual(created, fetched)
self.assertIsNone(db_api.load_task_execution("not-existing-id"))
开发者ID:adarshkoyya,项目名称:mistral,代码行数:19,代码来源:test_sqlalchemy_db_api.py
示例18: test_delete_task_execution
def test_delete_task_execution(self):
wf_ex = db_api.create_workflow_execution(WF_EXECS[0])
values = copy.deepcopy(TASK_EXECS[0])
values.update({'workflow_execution_id': wf_ex.id})
created = db_api.create_task_execution(values)
fetched = db_api.get_task_execution(created.id)
self.assertEqual(created, fetched)
db_api.delete_task_execution(created.id)
self.assertRaises(
exc.NotFoundException,
db_api.get_task_execution,
created.id
)
开发者ID:adarshkoyya,项目名称:mistral,代码行数:19,代码来源:test_sqlalchemy_db_api.py
示例19: test_correct_locking
def test_correct_locking(self):
wf_ex = db_api.create_workflow_execution(WF_EXEC)
threads = []
number = 500
for i in range(1, number):
threads.append(
eventlet.spawn(self._run_correct_locking, wf_ex)
)
[t.wait() for t in threads]
[t.kill() for t in threads]
wf_ex = db_api.get_workflow_execution(wf_ex.id)
print("Correct locking test gave object name: %s" % wf_ex.name)
self.assertEqual(str(number), wf_ex.name)
开发者ID:openstack,项目名称:mistral,代码行数:20,代码来源:test_locking.py
示例20: test_update_workflow_execution
def test_update_workflow_execution(self):
created = db_api.create_workflow_execution(WF_EXECS[0])
self.assertIsNone(created.updated_at)
updated = db_api.update_execution(
created.id,
{'state': 'RUNNING', 'state_info': "Running..."}
)
self.assertEqual('RUNNING', updated.state)
self.assertEqual(
'RUNNING',
db_api.load_workflow_execution(updated.id).state
)
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
开发者ID:adarshkoyya,项目名称:mistral,代码行数:20,代码来源:test_sqlalchemy_db_api.py
注:本文中的mistral.db.v2.sqlalchemy.api.create_workflow_execution函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论