本文整理汇总了Python中tvb.core.entities.storage.dao.get_burst_by_id函数的典型用法代码示例。如果您正苦于以下问题：Python get_burst_by_id函数的具体用法？Python get_burst_by_id怎么用？Python get_burst_by_id使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_burst_by_id函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _wait_for_burst
def _wait_for_burst(self, burst_config, error_expected=False, timeout=40):
    """
    Poll the database until the given burst leaves the RUNNING state or the timeout is
    exceeded, then check that the burst ended in the expected state.

    :param burst_config: the burst configuration that should be waited on
    :param error_expected: when True the burst must end with status BURST_ERROR,
        otherwise it must end with status BURST_FINISHED
    :param timeout: the maximum number of seconds to wait for the burst
    :returns: the burst configuration as last re-loaded from the database
    """
    poll_interval = 0.5
    waited = 0
    while burst_config.status == BurstConfiguration.BURST_RUNNING and waited <= timeout:
        sleep(poll_interval)
        waited += poll_interval
        burst_config = dao.get_burst_by_id(burst_config.id)

    # The loop ends either because the status changed or because the time budget ran out.
    # Decide on the re-loaded status rather than on the elapsed time, so that a burst which
    # completes during the very last poll is not wrongly reported as timed out.
    if burst_config.status == BurstConfiguration.BURST_RUNNING:
        self.burst_service.stop_burst(burst_config)
        self.fail("Timed out waiting for simulations to finish. We will cancel it")

    if error_expected and burst_config.status != BurstConfiguration.BURST_ERROR:
        self.burst_service.stop_burst(burst_config)
        self.fail("Burst should have failed due to invalid input data.")

    if (not error_expected) and burst_config.status != BurstConfiguration.BURST_FINISHED:
        # Build the message before stopping the burst, so it reports the status as observed.
        msg = "Burst status should have been FINISH. Instead got %s %s" % (burst_config.status,
                                                                          burst_config.error_message)
        self.burst_service.stop_burst(burst_config)
        self.fail(msg)

    return burst_config
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:28,代码来源:burst_service_test.py
示例2: test_launch_group_burst_no_metric
def test_launch_group_burst_no_metric(self):
    """
    Launch a ranged (group) burst through the burst service using the test adapter, which has
    no metrics associated. The burst is expected to end in error.
    """
    burst_config = self.burst_service.new_burst_configuration(self.test_project.id)
    algorithm = self.flow_service.get_algorithm_by_module_and_class(
        'tvb.tests.framework.adapters.testadapter1', 'TestAdapter1')
    algo_id = algorithm.id

    # 'test1_val1' is declared as the first range parameter, with three values.
    simulator_kwargs = {'test1_val1': '[0, 1, 2]',
                        'test1_val2': '0',
                        model.RANGE_PARAMETER_1: 'test1_val1'}

    portlet = dao.get_portlet_by_identifier(self.PORTLET_ID)
    portlet_positions = {portlet.id: [(0, 0), (0, 1), (1, 0)]}
    self._add_portlets_to_burst(burst_config, portlet_positions)
    burst_config.update_simulator_configuration(simulator_kwargs)

    launch_result = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id)
    burst_id, _ = launch_result
    launched_burst = dao.get_burst_by_id(burst_id)

    # Block until the burst leaves the running state; an error status is expected here.
    self._wait_for_burst(launched_burst, error_expected=True)

    workflow_count = dao.get_workflows_for_burst(burst_id, is_count=True)
    self.assertEqual(3, workflow_count, "3 workflows should have been launched due to group parameter.")

    operation_group_count = self.count_all_entities(model.OperationGroup)
    datatype_group_count = self.count_all_entities(model.DataTypeGroup)
    self.assertEqual(5, operation_group_count, "An operation group should have been created for each step.")
    self.assertEqual(5, datatype_group_count, "An dataType group should have been created for each step.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:26,代码来源:burst_service_test.py
示例3: test_load_tab_configuration
def test_load_tab_configuration(self):
    """
    Create a burst with some predefined portlets in some known positions. Check that the
    load_tab_configuration method does what it is expected, and we get the portlets in the
    corresponding tab positions.
    """
    burst_config = self.burst_service.new_burst_configuration(self.test_project.id)
    simulator_module = 'tvb.tests.framework.adapters.testadapter1'
    simulator_class = 'TestAdapter1'
    algo_id = self.flow_service.get_algorithm_by_module_and_class(simulator_module, simulator_class).id
    kwargs_replica = {'test1_val1': '0', 'test1_val2': '0'}
    test_portlet = dao.get_portlet_by_identifier(self.PORTLET_ID)

    # (tab index, index in tab) pairs where the test portlet is placed.
    expected_positions = [(0, 0), (0, 1), (1, 0)]
    # Pass a copy so the expectations cannot be altered by the helper.
    tab_config = {test_portlet.id: list(expected_positions)}
    self._add_portlets_to_burst(burst_config, tab_config)
    burst_config.update_simulator_configuration(kwargs_replica)

    burst_id, _ = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id)
    burst_config = dao.get_burst_by_id(burst_id)
    burst_config = self._wait_for_burst(burst_config)
    burst_wf = dao.get_workflows_for_burst(burst_config.id)[0]
    wf_step = dao.get_workflow_steps(burst_wf.id)[0]

    burst_config.prepare_after_load()
    for tab in burst_config.tabs:
        for portlet in tab.portlets:
            self.assertIsNone(portlet, "Before loading the tab configuration all portlets should be none.")

    burst_config = self.burst_service.load_tab_configuration(burst_config, wf_step.fk_operation)
    for tab_idx, tab in enumerate(burst_config.tabs):
        for portlet_idx, portlet in enumerate(tab.portlets):
            if (tab_idx, portlet_idx) in expected_positions:
                self.assertIsNotNone(portlet, "portlet configuration not set")
                self.assertEqual(test_portlet.id, portlet.portlet_id, "Unexpected portlet entity loaded.")
            else:
                # This check runs AFTER loading, so the message must say so.
                self.assertIsNone(portlet, "After loading the tab configuration, positions without a "
                                           "configured portlet should remain none")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:33,代码来源:burst_service_test.py
示例4: test_launch_burst
def test_launch_burst(self):
    """
    Launch a burst through the burst service, using the adapter's default values as the
    submitted simulator configuration, and wait for it to finish.
    """
    algorithm = self.flow_service.get_algorithm_by_module_and_class(
        'tvb.tests.framework.adapters.testadapter1', 'TestAdapter1')
    interface = self.flow_service.prepare_adapter(self.test_project.id, algorithm)

    # Build both the UI-like simulator configuration and the flat launch kwargs from defaults.
    submitted_configuration = {}
    launch_kwargs = {}
    for entry in interface:
        param_name = entry[ABCAdapter.KEY_NAME]
        default_value = entry[ABCAdapter.KEY_DEFAULT]
        submitted_configuration[param_name] = {model.KEY_PARAMETER_CHECKED: True,
                                               model.KEY_SAVED_VALUE: default_value}
        launch_kwargs[param_name] = default_value

    burst_config = self.burst_service.new_burst_configuration(self.test_project.id)
    burst_config.simulator_configuration = submitted_configuration

    portlet = dao.get_portlet_by_identifier(self.PORTLET_ID)
    self._add_portlets_to_burst(burst_config, {portlet.id: [(0, 0), (0, 1), (1, 0)]})
    burst_config.update_simulator_configuration(launch_kwargs)

    burst_id, _ = self.burst_service.launch_burst(burst_config, 0, algorithm.id, self.test_user.id)
    launched_burst = dao.get_burst_by_id(burst_id)

    acceptable_states = (BurstConfiguration.BURST_FINISHED, BurstConfiguration.BURST_RUNNING)
    self.assertTrue(launched_burst.status in acceptable_states,
                    "Burst not launched successfully!")

    # Block until the burst leaves the running state (bounded by the helper's timeout).
    self._wait_for_burst(launched_burst)
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:26,代码来源:burst_service_test.py
示例5: load_burst
def load_burst(self, burst_id):
    """
    Load a stored burst and rebuild its tab configuration from its first workflow.

    :param burst_id: the id of the burst that should be loaded
    :returns: a `(burst, group_gid)` pair; `group_gid` is None unless the burst has more than
        one workflow and the first executed operation belongs to an operation group, in which
        case it is the gid of the datatype group found for that operation group
    """
    burst = dao.get_burst_by_id(burst_id)
    burst.prepare_after_load()
    burst.reset_tabs()

    workflows = dao.get_workflows_for_burst(burst.id)
    workflow_count = len(workflows)

    if workflow_count == 0:
        return burst, None

    if workflow_count == 1:
        # A simple burst with no range parameters
        return self.__populate_tabs_from_workflow(burst, workflows[0]), None

    # A burst with a range of values: several workflows were created, and the resulting
    # group is needed for launching the parameter space exploration.
    first_workflow = workflows[0]
    self.__populate_tabs_from_workflow(burst, first_workflow)
    first_workflow_steps = dao.get_workflow_steps(first_workflow.id)
    first_operation = dao.get_operation_by_id(first_workflow_steps[0].fk_operation)

    group_gid = None
    if first_operation.operation_group:
        datatype_group = dao.get_datatypegroup_by_op_group_id(first_operation.operation_group.id)
        group_gid = datatype_group.gid
    return burst, group_gid
开发者ID:lcosters,项目名称:tvb-framework,代码行数:33,代码来源:burst_service.py
示例6: _prepare_and_launch_async_burst
def _prepare_and_launch_async_burst(self, length=4, is_range=False, nr_ops=0, wait_to_finish=0):
    """
    Launch an asynchronous burst whose simulation parameters are built by
    `_prepare_simulation_params(length, is_range, nr_ops)`.

    :param length: the length of the simulation in milliseconds. For a group burst there will
        be `nr_ops` simulations with lengths starting from `length` to `length + nr_ops`
    :param is_range: switches between a group burst and a non group burst.
        !! even if `is_range` is `True` a non-zero positive `nr_ops` is still needed
    :param nr_ops: the number of operations in the group burst
    :param wait_to_finish: when truthy, also wait for the burst to finish, using this value as
        the timeout passed to `_wait_for_burst`
    :returns: the launched burst configuration, as loaded from the database
    """
    simulation_params = self._prepare_simulation_params(length, is_range, nr_ops)
    new_burst = self.burst_service.new_burst_configuration(self.test_project.id)
    new_burst.update_simulator_configuration(simulation_params)

    launch_result = self.burst_service.launch_burst(new_burst, 0, self.sim_algorithm.id, self.test_user.id)
    burst_config = dao.get_burst_by_id(launch_result[0])

    # Wait a maximum of 15 seconds for the burst launch to be performed
    max_wait = 15
    elapsed = 0
    while dao.get_workflows_for_burst(burst_config.id, is_count=True) == 0 and elapsed < max_wait:
        sleep(0.5)
        elapsed += 0.5

    if not wait_to_finish:
        return burst_config
    return self._wait_for_burst(burst_config, timeout=wait_to_finish)
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:28,代码来源:burst_service_test.py
示例7: test_remove_started_burst
def test_remove_started_burst(self):
    """
    Removing a burst that is still running must first cancel it; a second removal request
    on the canceled burst must then delete it from the database.
    """
    running_burst = self._prepare_and_launch_async_burst(length=20000)
    self.assertEqual(BurstConfiguration.BURST_RUNNING, running_burst.status,
                     'A 20000 length simulation should still be started immediately after launch.')

    # First request: the burst is running, so it only gets canceled.
    was_removed = self.burst_service.cancel_or_remove_burst(running_burst.id)
    self.assertFalse(was_removed, "Burst should be cancelled before deleted.")
    canceled_burst = dao.get_burst_by_id(running_burst.id)
    self.assertEqual(BurstConfiguration.BURST_CANCELED, canceled_burst.status,
                     'Deleting a running burst should just cancel it first.')

    # Second request: the burst is no longer running, so it gets deleted.
    was_removed = self.burst_service.cancel_or_remove_burst(canceled_burst.id)
    self.assertTrue(was_removed, "Burst should be deleted if status is cancelled.")
    reloaded = dao.get_burst_by_id(canceled_burst.id)
    self.assertTrue(reloaded is None, "Removing a canceled burst should delete it from db.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:16,代码来源:burst_service_test.py
示例8: test_launch_burst_invalid_portlet_analyzer_data
def test_launch_burst_invalid_portlet_analyzer_data(self):
    """
    Launch a burst with valid simulator input but an invalid overwrite for the portlet
    analyzer, and check that the burst ends in error with the simulator operation finished
    and the portlet analyze operation in error.
    """
    algorithm = self.flow_service.get_algorithm_by_module_and_class(
        'tvb.tests.framework.adapters.testadapter1', 'TestAdapter1')
    algo_id = algorithm.id

    # Adapter tries to do an int(test1_val1) and int(test1_val2) so this should be valid
    burst_config = self.burst_service.new_burst_configuration(self.test_project.id)
    burst_config.update_simulator_configuration({'test1_val1': '1', 'test1_val2': '0'})

    portlet = dao.get_portlet_by_identifier(self.PORTLET_ID)
    portlet_cfg = self.burst_service.new_portlet_configuration(portlet.id)
    # Portlet analyzer tries to do int(input) which should fail
    invalid_overwrites = {ADAPTER_PREFIX_ROOT + '0test_non_dt_input': 'asa'}
    self.burst_service.update_portlet_configuration(portlet_cfg, invalid_overwrites)
    burst_config.tabs[0].portlets[0] = portlet_cfg

    burst_id, _ = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id)
    launched_burst = dao.get_burst_by_id(burst_id)
    # Block until the burst leaves the running state; an error status is expected.
    finished_burst = self._wait_for_burst(launched_burst, error_expected=True)

    workflow = dao.get_workflows_for_burst(finished_burst.id)[0]
    steps = dao.get_workflow_steps(workflow.id)
    self.assertTrue(len(steps) == 2,
                    "Should have exactly 2 wf steps. One for 'simulation' one for portlet analyze operation.")

    simulation_operation = dao.get_operation_by_id(steps[0].fk_operation)
    self.assertEqual(model.STATUS_FINISHED, simulation_operation.status,
                     "First operation should be simulator which should have 'finished' status.")

    analyze_operation = dao.get_operation_by_id(steps[1].fk_operation)
    self.assertEqual(analyze_operation.status, model.STATUS_ERROR,
                     "Second operation should be portlet analyze step which should have 'error' status.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:33,代码来源:burst_service_test.py
示例9: rename_burst
def rename_burst(burst_id, new_name):
    """
    Load the burst identified by `burst_id`, set its name to `new_name`
    and store the updated entity.
    """
    burst_entity = dao.get_burst_by_id(burst_id)
    burst_entity.name = new_name
    dao.store_entity(burst_entity)
开发者ID:lcosters,项目名称:tvb-framework,代码行数:8,代码来源:burst_service.py
示例10: test_rename_burst
def test_rename_burst(self):
    """
    Store a burst, rename it through the burst service and check that the
    re-loaded entity carries the new name.
    """
    expected_name = "new_burst_name"
    stored_burst = TestFactory.store_burst(self.test_project.id)
    self.burst_service.rename_burst(stored_burst.id, expected_name)
    reloaded = dao.get_burst_by_id(stored_burst.id)
    self.assertEqual(reloaded.name, expected_name, "Burst was not renamed properly.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:8,代码来源:burst_service_test.py
示例11: test_rename_burst
def test_rename_burst(self):
    """
    Store a burst, rename it through the burst controller and verify that
    the entity re-loaded from the database has the new name.
    """
    new_name = "test_new_burst_name"
    stored = self._store_burst(self.test_project.id, 'started', {'test': 'test'}, 'burst1')
    self.burst_c.rename_burst(stored.id, new_name)
    reloaded = dao.get_burst_by_id(stored.id)
    self.assertEqual(reloaded.name, new_name)
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:9,代码来源:burst_controller_test.py
示例12: test_store_burst_config
def test_store_burst_config(self):
    """
    Store a burst through the test factory and check that it received an id, can be
    loaded back from the database and matches the stored configuration.
    """
    failure_message = 'Burst was not stored properly.'
    original = TestFactory.store_burst(self.test_project.id)
    self.assertTrue(original.id is not None, failure_message)

    from_db = dao.get_burst_by_id(original.id)
    self.assertTrue(from_db is not None, failure_message)
    self._compare_bursts(original, from_db)
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:9,代码来源:burst_service_test.py
示例13: prepare_next_step
def prepare_next_step(self, last_executed_op_id):
"""
If the operation with id 'last_executed_op_id' resulted after
the execution of a workflow step then this method will launch
the operation corresponding to the next step from the workflow.

NOTE(review): this listing has lost its indentation, so the nesting of the branches
below (in particular which `if` each `else:` pairs with) cannot be read from the text.
Confirm against the original workflow_service.py before relying on these notes.

:param last_executed_op_id: id of the operation that has just been executed
:returns: `operation.id` of the operation attached to the next workflow step when
`_get_data` yields one; None on the other paths
:raises WorkflowInterStepsException: wraps any exception raised inside the `try` body
"""
try:
# `_get_data` yields a (current step, next step) pair for the given operation id.
current_step, next_workflow_step = self._get_data(last_executed_op_id)
if next_workflow_step is not None:
operation = dao.get_operation_by_id(next_workflow_step.fk_operation)
dynamic_param_names = next_workflow_step.dynamic_workflow_param_names
if len(dynamic_param_names) > 0:
# Operation parameters are stored as a JSON string; decode, patch, re-encode.
op_params = json.loads(operation.parameters)
for param_name in dynamic_param_names:
dynamic_param = op_params[param_name]
# Look up the earlier step (same workflow) that this dynamic parameter points to.
former_step = dao.get_workflow_step_by_step_index(next_workflow_step.fk_workflow,
dynamic_param[wf_cfg.STEP_INDEX_KEY])
if type(dynamic_param[wf_cfg.DATATYPE_INDEX_KEY]) is IntType:
# Integer entry: used as an index into the former step's results; the gid is passed on.
datatypes = dao.get_results_for_operation(former_step.fk_operation)
op_params[param_name] = datatypes[dynamic_param[wf_cfg.DATATYPE_INDEX_KEY]].gid
else:
# Non-integer entry: used as a key into the former operation's own (JSON) parameters.
previous_operation = dao.get_operation_by_id(former_step.fk_operation)
op_params[param_name] = json.loads(previous_operation.parameters)[
dynamic_param[wf_cfg.DATATYPE_INDEX_KEY]]
operation.parameters = json.dumps(op_params)
operation = dao.store_entity(operation)
return operation.id
else:
if current_step is not None:
# Mark the workflow of the current step as finished and persist it.
current_workflow = dao.get_workflow_by_id(current_step.fk_workflow)
current_workflow.status = current_workflow.STATUS_FINISHED
dao.store_entity(current_workflow)
burst_entity = dao.get_burst_by_id(current_workflow.fk_burst)
parallel_workflows = dao.get_workflows_for_burst(burst_entity.id)
# The burst counts as finished only when none of its workflows is still in STATUS_STARTED.
all_finished = True
for workflow in parallel_workflows:
if workflow.status == workflow.STATUS_STARTED:
all_finished = False
if all_finished:
self.mark_burst_finished(burst_entity, success=True)
disk_size = dao.get_burst_disk_size(burst_entity.id) # NOTE(review): original comment read "Transform from kB to MB", but no conversion is visible here - confirm units
if disk_size > 0:
# Add the burst's disk size to the used_disk_space of the project's administrator.
user = dao.get_project_by_id(burst_entity.fk_project).administrator
user.used_disk_space = user.used_disk_space + disk_size
dao.store_entity(user)
else:
operation = dao.get_operation_by_id(last_executed_op_id)
disk_size = dao.get_disk_size_for_operation(operation.id) # NOTE(review): original comment read "Transform from kB to MB", but no conversion is visible here - confirm units
if disk_size > 0:
# Add the operation's disk size to the used_disk_space of the user who launched it.
user = dao.get_user_by_id(operation.fk_launched_by)
user.used_disk_space = user.used_disk_space + disk_size
dao.store_entity(user)
return None
# Python 2 `except` syntax. The exception is logged through both `error` and `exception`,
# then re-raised wrapped in WorkflowInterStepsException.
except Exception, excep:
self.logger.error(excep)
self.logger.exception(excep)
raise WorkflowInterStepsException(excep)
开发者ID:HuifangWang,项目名称:the-virtual-brain-website,代码行数:57,代码来源:workflow_service.py
示例14: export_burst
def export_burst(self, burst_id):
    """
    Serialize an existing burst to JSON.

    :param burst_id: ID for existing burst
    :return: JSON string built from `_build_burst_export_dict` for that burst
    :raises InvalidExportDataException: when no burst with the given ID is found
    """
    burst_entity = dao.get_burst_by_id(burst_id)
    if burst_entity is not None:
        export_dict = self._build_burst_export_dict(burst_entity)
        return json.dumps(export_dict)
    raise InvalidExportDataException("Could not find burst with ID " + str(burst_id))
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:11,代码来源:export_manager.py
示例15: _long_burst_launch
def _long_burst_launch(self, is_range=False):
    """
    Launch, through the burst controller, a burst with a simulation length of '10000' on a
    freshly created connectivity.

    :param is_range: when True, 'conduction_speed' is submitted as the first range parameter
        with the values '[10,15,20]'
    :returns: the launched burst, as loaded from the database
    """
    self.burst_c.index()
    connectivity = DatatypesFactory().create_connectivity()[1]

    simulator_params = copy.deepcopy(SIMULATOR_PARAMETERS)
    simulator_params['connectivity'] = dao.get_datatype_by_id(connectivity.id).gid
    simulator_params['simulation_length'] = '10000'
    if is_range:
        simulator_params['conduction_speed'] = '[10,15,20]'
        simulator_params[model.RANGE_PARAMETER_1] = 'conduction_speed'

    # The controller expects the simulator parameters as one JSON-encoded keyword argument.
    request_kwargs = {"simulator_parameters": json.dumps(simulator_params)}
    response = self.burst_c.launch_burst("new", "test_burst", **request_kwargs)
    launched_id = json.loads(response)['id']
    return dao.get_burst_by_id(launched_id)
开发者ID:maedoc,项目名称:tvb-framework,代码行数:12,代码来源:flow_controller_test.py
示例16: test_launch_burst_invalid_simulator_data
def test_launch_burst_invalid_simulator_data(self):
    """
    Launch a burst whose simulator input cannot be parsed by the test adapter and check
    that the burst ends up with an error status.
    """
    algorithm = self.flow_service.get_algorithm_by_module_and_class(
        'tvb.tests.framework.adapters.testadapter1', 'TestAdapter1')
    algo_id = algorithm.id

    # Adapter tries to do an int(test1_val1) so this should fail
    invalid_kwargs = {'test1_val1': 'asa', 'test1_val2': '0'}
    burst_config = self.burst_service.new_burst_configuration(self.test_project.id)
    burst_config.update_simulator_configuration(invalid_kwargs)

    launch_result = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id)
    burst_id, _ = launch_result
    launched_burst = dao.get_burst_by_id(burst_id)

    # Block until the burst leaves the running state; an error status is expected.
    self._wait_for_burst(launched_burst, error_expected=True)
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:14,代码来源:burst_service_test.py
示例17: test_launch_burst
def test_launch_burst(self):
    """
    Launch a burst through the burst controller and check that it finishes correctly and
    before timeout (100).
    """
    self.burst_c.index()
    connectivity = self._burst_create_connectivity()
    launch_params = copy.deepcopy(SIMULATOR_PARAMETERS)
    launch_params['connectivity'] = connectivity.gid
    launch_params['simulation_length'] = '10'
    # The controller expects the simulator parameters as one JSON-encoded keyword argument.
    launch_params = {"simulator_parameters": json.dumps(launch_params)}
    burst_id = json.loads(self.burst_c.launch_burst("new", "test_burst", **launch_params))['id']

    # NOTE(review): the counter starts at 1 as in the original, so the effective budget is
    # slightly shorter than `timeout` - confirm whether that is intended.
    waited = 1
    timeout = 100
    burst_config = dao.get_burst_by_id(burst_id)
    while burst_config.status == BurstConfiguration.BURST_RUNNING and waited <= timeout:
        sleep(0.5)
        waited += 0.5
        burst_config = dao.get_burst_by_id(burst_config.id)

    # Decide on the re-loaded status rather than the elapsed time, so that a burst finishing
    # during the very last poll is not reported as a timeout.
    if burst_config.status == BurstConfiguration.BURST_RUNNING:
        # Stop the still-running burst before failing, as the not-finished branch below does.
        BurstService().stop_burst(burst_config)
        self.fail("Timed out waiting for simulations to finish.")

    if burst_config.status != BurstConfiguration.BURST_FINISHED:
        BurstService().stop_burst(burst_config)
        self.fail("Burst should have finished successfully.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:23,代码来源:burst_controller_test.py
示例18: update_history_status
def update_history_status(self, id_list):
    """
    For each burst id received in `id_list`, read the current state from the DB.

    :param id_list: iterable of burst ids
    :returns: a list with one `[id, status, is_group, message]` entry per burst that was
        found; `message` is a hint to check the Operations page when the burst status is
        BURST_ERROR and an empty string otherwise. Ids with no matching burst are skipped
        and only logged at debug level.
    """
    result = []
    for b_id in id_list:
        burst = dao.get_burst_by_id(b_id)
        if burst is None:
            self.logger.debug("Could not find burst with id=" + str(b_id) + ". Might have been deleted by user!!")
            continue

        # Only touch the entity after the None check: calling it first raised
        # AttributeError for missing bursts and made the branch above unreachable.
        burst.prepare_after_load()
        error_hint = "Check Operations page for error Message" if burst.status == burst.BURST_ERROR else ''
        result.append([burst.id, burst.status, burst.is_group, error_hint])
    return result
开发者ID:HuifangWang,项目名称:the-virtual-brain-website,代码行数:14,代码来源:burst_service.py
示例19: cancel_or_remove_burst
def cancel_or_remove_burst(self, burst_id):
"""
Cancel (if burst is still running) or Remove the burst given by burst_id.

NOTE(review): this listing has lost its indentation, so the exact nesting inside the
operations loop below cannot be read from the text; confirm against the original file.

:param burst_id: id of the burst to cancel or remove
:returns True when Remove operation was done and False when Cancel
:raises RemoveDataTypeException: when the burst entity, or an operation / operation group
linked to it, could not be removed
"""
burst_entity = dao.get_burst_by_id(burst_id)
# A running burst is only stopped; nothing is deleted on this call.
if burst_entity.status == burst_entity.BURST_RUNNING:
self.stop_burst(burst_entity)
return False
service = ProjectService()
## Remove each DataType in current burst.
## We can not leave all on cascade, because it won't work on SQLite for mapped dataTypes.
datatypes = dao.get_all_datatypes_in_burst(burst_id)
## Get operations linked to current burst before removing the burst or else
## the burst won't be there to identify operations any more.
remaining_ops = dao.get_operations_in_burst(burst_id)
# Remove burst first to delete work-flow steps which still hold foreign keys to operations.
correct = dao.remove_entity(burst_entity.__class__, burst_id)
if not correct:
raise RemoveDataTypeException("Could not remove Burst entity!")
# `burst_entity` is the object loaded above; its `fk_project` is still read after the removal.
for datatype in datatypes:
service.remove_datatype(burst_entity.fk_project, datatype.gid, False)
## Remove all Operations remained.
correct = True
# Ids of operation groups already handled, so each group is removed at most once.
remaining_op_groups = set()
project = dao.get_project_by_id(burst_entity.fk_project)
for oper in remaining_ops:
# Re-query the operation: it may already be gone after the removals above.
is_remaining = dao.get_generic_entity(oper.__class__, oper.id)
if len(is_remaining) == 0:
### Operation removed cascaded.
continue
if oper.fk_operation_group is not None and oper.fk_operation_group not in remaining_op_groups:
is_remaining = dao.get_generic_entity(model.OperationGroup, oper.fk_operation_group)
if len(is_remaining) > 0:
remaining_op_groups.add(oper.fk_operation_group)
# `correct and ...` short-circuits: once a removal has failed, later remove_entity calls are skipped.
correct = correct and dao.remove_entity(model.OperationGroup, oper.fk_operation_group)
correct = correct and dao.remove_entity(oper.__class__, oper.id)
service.structure_helper.remove_operation_data(project.name, oper.id)
if not correct:
raise RemoveDataTypeException("Could not remove Burst because a linked operation could not be dropped!!")
return True
开发者ID:lcosters,项目名称:tvb-framework,代码行数:47,代码来源:burst_service.py
示例20: test_branch_burst
def test_branch_burst(self):
    """
    Test the branching of an existing burst: launch a burst, wait for it to finish, then
    launch it again in "branch" mode and check the number of resulting entities.
    """
    burst_config = self._prepare_and_launch_async_burst(wait_to_finish=60)
    burst_config.prepare_after_load()

    launch_params = self._prepare_simulation_params(4)
    burst_config.update_simulator_configuration(launch_params)

    burst_id, _ = self.burst_service.launch_burst(burst_config, 0, self.sim_algorithm.id,
                                                  self.test_user.id, "branch")
    burst_config = dao.get_burst_by_id(burst_id)
    self._wait_for_burst(burst_config)

    ts_regions = self.count_all_entities(TimeSeriesRegion)
    sim_states = self.count_all_entities(SimulationState)
    # The previous messages were copied from another test and described operation / dataType
    # groups, which are not what is being counted here.
    self.assertEqual(2, ts_regions, "Expected 2 TimeSeriesRegion entities after branching the burst.")
    self.assertEqual(2, sim_states, "Expected 2 SimulationState entities after branching the burst.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:19,代码来源:burst_service_test.py
注：本文中的tvb.core.entities.storage.dao.get_burst_by_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论