本文整理汇总了Python中tvb.core.entities.storage.dao.get_datatype_by_id函数的典型用法代码示例。如果您正苦于以下问题：Python get_datatype_by_id函数的具体用法？Python get_datatype_by_id怎么用？Python get_datatype_by_id使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_datatype_by_id函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_launch_two_ops_HDD_with_space
def test_launch_two_ops_HDD_with_space(self):
    """
    Launch the HDD-requiring test adapter twice, each time with a disk quota
    big enough for the launch, and check that every launch stores one result
    with the default subject and the adapter's declared output type.
    """
    adapter_group = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapterHDDRequired")
    adapter = FlowService().build_adapter_instance(adapter_group)
    expected_type = adapter.get_output()[0].__name__
    launch_args = {"test": 100}

    # First launch: allow twice the disk size the adapter reports as required.
    TvbProfile.current.MAX_DISK_SPACE = 2 * float(adapter.get_required_disk_size(**launch_args))
    temp_dir = FilesHelper().get_project_folder(self.test_project, "TEMP")

    def launch_and_list():
        # Run the adapter once, then return the Datatype2 rows of the test project.
        self.operation_service.initiate_operation(self.test_user, self.test_project.id, adapter,
                                                  temp_dir, method_name=ABCAdapter.LAUNCH_METHOD, **launch_args)
        return dao.get_values_of_datatype(self.test_project.id, Datatype2)[0]

    def load_and_check(row):
        # The first field of a row is used as the datatype id.
        stored = dao.get_datatype_by_id(row[0])
        self.assertEqual(stored.subject, DataTypeMetaData.DEFAULT_SUBJECT, "Wrong data stored.")
        self.assertEqual(stored.type, expected_type, "Wrong data stored.")
        return stored

    rows = launch_and_list()
    self.assertEqual(len(rows), 1)
    first_result = load_and_check(rows[0])

    # Second launch: the quota becomes the disk size of the first result plus
    # the estimate for the next launch.
    # NOTE(review): the original comment mentions kB/B -> MB conversions, but no
    # conversion is visible here -- confirm the units of both terms.
    TvbProfile.current.MAX_DISK_SPACE = (float(first_result.disk_size)
                                         + float(adapter.get_required_disk_size(**launch_args)))
    rows = launch_and_list()
    self.assertEqual(len(rows), 2)
    load_and_check(rows[1])
开发者ID:unimauro,项目名称:tvb-framework,代码行数:31,代码来源:operation_service_test.py
示例2: _check_if_datatype_was_removed
def _check_if_datatype_was_removed(self, datatype):
    """
    Fail the test if `datatype` or the operation that produced it can still be loaded.

    A lookup that raises is taken as proof that the entity no longer exists;
    a lookup that returns normally is reported as a test failure.

    :param datatype: entity exposing `id` and `fk_from_operation`
    """
    try:
        dao.get_datatype_by_id(datatype.id)
    except Exception:
        # Expected path: the removed datatype can no longer be loaded.
        pass
    else:
        # Must live outside the `try`: `self.fail` raises an AssertionError, which
        # is itself an `Exception` and would otherwise be swallowed by the handler.
        self.fail("The datatype was not deleted.")

    try:
        dao.get_operation_by_id(datatype.fk_from_operation)
    except Exception:
        # Expected path: the parent operation can no longer be loaded.
        pass
    else:
        self.fail("The operation was not deleted.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:14,代码来源:project_structure_test.py
示例3: update_dt
def update_dt(dt_id, new_create_date):
    """
    Set `new_create_date` as creation date on the datatype with id `dt_id`,
    store the entity, then re-load it by GID and persist its full metadata.
    """
    stored = dao.get_datatype_by_id(dt_id)
    stored.create_date = new_create_date
    dao.store_entity(stored)
    # Update MetaData in H5 as well: the entity is fetched again by GID first.
    dao.get_datatype_by_gid(stored.gid).persist_full_metadata()
开发者ID:amitsaroj001,项目名称:tvb-framework,代码行数:7,代码来源:modify_h5_metadata.py
示例4: test_datatypes_groups
def test_datatypes_groups(self):
    """
    Tests if the dataType group is set correctly on the dataTypes resulted
    from the same operation group.
    """
    flow_service = FlowService()

    all_operations = dao.get_filtered_operations(self.test_project.id, None)
    assert len(all_operations) == 0, "There should be no operation"

    adapter_instance = TestFactory.create_adapter('tvb.tests.framework.adapters.testadapter3', 'TestAdapter3')
    # 'param_5' is declared as the range parameter and given two values.
    data = {model.RANGE_PARAMETER_1: 'param_5', 'param_5': [1, 2]}
    ## Create Group of operations
    flow_service.fire_operation(adapter_instance, self.test_user, self.test_project.id, **data)

    all_operations = dao.get_filtered_operations(self.test_project.id, None)
    assert len(all_operations) == 1, "Expected one operation group"
    assert all_operations[0][2] == 2, "Expected 2 operations in group"

    operation_group_id = all_operations[0][3]
    # Identity comparison is the idiomatic (PEP 8) way to test against None.
    assert operation_group_id is not None, "The operation should be part of a group."

    self.operation_service.stop_operation(all_operations[0][0])
    self.operation_service.stop_operation(all_operations[0][1])
    ## Make sure operations are executed
    self.operation_service.launch_operation(all_operations[0][0], False)
    self.operation_service.launch_operation(all_operations[0][1], False)

    resulted_datatypes = dao.get_datatype_in_group(operation_group_id=operation_group_id)
    assert len(resulted_datatypes) >= 2, "Expected at least 2, but: " + str(len(resulted_datatypes))

    dt = dao.get_datatype_by_id(resulted_datatypes[0].id)
    datatype_group = dao.get_datatypegroup_by_op_group_id(operation_group_id)
    assert dt.fk_datatype_group == datatype_group.id, "DataTypeGroup is incorrect"
开发者ID:maedoc,项目名称:tvb-framework,代码行数:33,代码来源:operation_service_test.py
示例5: test_datatypes_groups
def test_datatypes_groups(self):
    """
    Tests if the dataType group is set correctly on the dataTypes resulted
    from the same operation group.
    """
    flow_service = FlowService()

    all_operations = dao.get_filtered_operations(self.test_project.id, None)
    self.assertEqual(len(all_operations), 0, "There should be no operation")

    algogroup = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapter3")
    group, _ = flow_service.prepare_adapter(self.test_project.id, algogroup)
    adapter_instance = flow_service.build_adapter_instance(group)
    # "param_5" is declared as the range parameter and given two values.
    data = {model.RANGE_PARAMETER_1: "param_5", "param_5": [1, 2]}
    ## Create Group of operations
    flow_service.fire_operation(adapter_instance, self.test_user, self.test_project.id, **data)

    all_operations = dao.get_filtered_operations(self.test_project.id, None)
    self.assertEqual(len(all_operations), 1, "Expected one operation group")
    self.assertEqual(all_operations[0][2], 2, "Expected 2 operations in group")

    operation_group_id = all_operations[0][3]
    # `assertNotEquals` is a deprecated alias; `assertIsNotNone` states the intent directly.
    self.assertIsNotNone(operation_group_id, "The operation should be part of a group.")

    self.operation_service.stop_operation(all_operations[0][0])
    self.operation_service.stop_operation(all_operations[0][1])
    ## Make sure operations are executed
    self.operation_service.launch_operation(all_operations[0][0], False)
    self.operation_service.launch_operation(all_operations[0][1], False)

    resulted_datatypes = dao.get_datatype_in_group(operation_group_id=operation_group_id)
    self.assertGreaterEqual(len(resulted_datatypes), 2,
                            "Expected at least 2, but: " + str(len(resulted_datatypes)))

    dt = dao.get_datatype_by_id(resulted_datatypes[0].id)
    datatype_group = dao.get_datatypegroup_by_op_group_id(operation_group_id)
    self.assertEqual(dt.fk_datatype_group, datatype_group.id, "DataTypeGroup is incorrect")
开发者ID:lcosters,项目名称:tvb-framework,代码行数:35,代码来源:operation_service_test.py
示例6: test_initiate_operation
def test_initiate_operation(self):
    """
    Test the actual operation flow by executing a test adapter and checking
    the adapter group and the single datatype stored afterwards.
    """
    module = "tvb.tests.framework.adapters.testadapter1"
    class_name = "TestAdapter1"
    group = dao.find_group(module, class_name)
    adapter = FlowService().build_adapter_instance(group)
    output = adapter.get_output()
    output_type = output[0].__name__
    data = {"test1_val1": 5, "test1_val2": 5}
    tmp_folder = FilesHelper().get_project_folder(self.test_project, "TEMP")

    res = self.operation_service.initiate_operation(
        self.test_user, self.test_project.id, adapter, tmp_folder, **data
    )
    # `find` returns -1 when the marker is absent, so a missing marker is reported
    # as a regular failure with the message below; `index` would raise ValueError
    # and hide that message.
    # NOTE(review): assumes `res` is a string message -- confirm.
    self.assertGreater(res.find("has finished."), 10, "Operation didn't finish")

    group = dao.find_group(module, class_name)
    self.assertEqual(group.module, "tvb.tests.framework.adapters.testadapter1", "Wrong data stored.")
    self.assertEqual(group.classname, "TestAdapter1", "Wrong data stored.")

    dts, count = dao.get_values_of_datatype(self.test_project.id, Datatype1)
    self.assertEqual(count, 1)
    self.assertEqual(len(dts), 1)
    # The first field of a returned row is used as the datatype id.
    datatype = dao.get_datatype_by_id(dts[0][0])
    self.assertEqual(datatype.subject, DataTypeMetaData.DEFAULT_SUBJECT, "Wrong data stored.")
    self.assertEqual(datatype.type, output_type, "Wrong data stored.")
开发者ID:lcosters,项目名称:tvb-framework,代码行数:25,代码来源:operation_service_test.py
示例7: get_datatype_and_datatypegroup_inputs_for_operation
def get_datatype_and_datatypegroup_inputs_for_operation(operation_gid, selected_filter):
    """
    Return the dataTypes used as input parameters for the given operation.

    If a dataType is part of a dataType group, the group is returned instead
    of that dataType (each group only once).

    :param operation_gid: GID of the operation whose inputs are reviewed
    :param selected_filter: expected to be a visibility filter; when its
        `display_name` equals `StaticFiltersFactory.RELEVANT_VIEW`, only
        dataTypes with a truthy `visible` flag are kept
    :returns: list with the ungrouped dataTypes first, followed by the groups
    """
    all_datatypes = ProjectService._review_operation_inputs(operation_gid)[0]

    # Under the "relevant" view keep visible inputs only; otherwise keep everything.
    datatype_inputs = [datatype for datatype in all_datatypes
                       if selected_filter.display_name != StaticFiltersFactory.RELEVANT_VIEW
                       or datatype.visible]

    datatypes = []
    datatype_groups = dict()
    for data_type in datatype_inputs:
        group_id = data_type.fk_datatype_group
        if group_id is None:
            datatypes.append(data_type)
        elif group_id not in datatype_groups:
            # Load every group once, no matter how many of its members are inputs.
            datatype_groups[group_id] = dao.get_datatype_by_id(group_id)

    # `values()` exists on both Python 2 and 3, unlike `iteritems()`, and the
    # keys were never used here.
    datatypes.extend(datatype_groups.values())
    return datatypes
开发者ID:rajul,项目名称:tvb-framework,代码行数:27,代码来源:project_service.py
示例8: set_datatype_visibility
def set_datatype_visibility(datatype_gid, is_visible):
    """
    Set the visibility of the dataType identified by `datatype_gid`.

    When that dataType is a dataType group, or belongs to one, the flag is
    applied to every dataType of the group and to the group entity itself.
    """
    def _apply(entity):
        """Set the flag, store the entity, then persist the stored entity's full metadata."""
        entity.visible = is_visible
        stored = dao.store_entity(entity)
        stored.persist_full_metadata()

    def _apply_to_members(group_id):
        """Apply the flag to every dataType returned for the given group id."""
        for member in dao.get_datatypes_from_datatype_group(group_id):
            _apply(member)

    target = dao.get_datatype_by_gid(datatype_gid)
    if isinstance(target, DataTypeGroup):
        # The dataType is a group itself.
        _apply_to_members(target.id)
    else:
        parent_group_id = target.fk_datatype_group
        if parent_group_id is not None:
            # The dataType is a member of a group: update its siblings, then
            # switch the target to the parent group entity.
            _apply_to_members(parent_group_id)
            target = dao.get_datatype_by_id(parent_group_id)

    # Finally update the dataType (or the dataType group) itself.
    _apply(target)
开发者ID:rajul,项目名称:tvb-framework,代码行数:27,代码来源:project_service.py
示例9: test_set_datatype_visibility
def test_set_datatype_visibility(self):
    """
    Check that hiding one datatype changes the visibility of that datatype only.
    """
    # The helper is expected to return a list of 3 elements; each element is
    # indexed with [0] for the datatype id and [2] for the value handed to
    # set_datatype_visibility.
    mapped_arrays = self._create_mapped_arrays(self.test_project.id)
    for entry in mapped_arrays:
        self.assertTrue(dao.get_datatype_by_id(entry[0]).visible, "The data type should be visible.")

    self.project_service.set_datatype_visibility(mapped_arrays[0][2], False)

    for position, entry in enumerate(mapped_arrays):
        shown = dao.get_datatype_by_id(entry[0]).visible
        if position == 0:
            # Only the first datatype was hidden.
            self.assertFalse(shown, "The data type should not be visible.")
        else:
            self.assertTrue(shown, "The data type should be visible.")
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:17,代码来源:project_structure_test.py
示例10: remove_datatype
def remove_datatype(self, project_id, datatype_gid, skip_validation=False):
    """
    Remove a dataType. If the given dataType is a DataTypeGroup, or belongs to
    one, the entire group is removed. The operation(s) that created the
    dataType(s) are removed as well, unless other dataTypes still refer to them.

    :param project_id: id of the project owning the dataType
    :param datatype_gid: GID of the dataType (or dataType group) to remove
    :param skip_validation: forwarded unchanged to `_remove_project_node_files`
    :raises RemoveDataTypeException: when one of the entity removals reports failure
    """
    datatype = dao.get_datatype_by_gid(datatype_gid)
    if datatype is None:
        self.logger.warning("Attempt to delete DT[%s] which no longer exists." % datatype_gid)
        return

    user = dao.get_user_for_datatype(datatype.id)
    freed_space = datatype.disk_size or 0
    is_datatype_group = False
    if dao.is_datatype_group(datatype_gid):
        is_datatype_group = True
        freed_space = dao.get_datatype_group_disk_size(datatype.id)
    elif datatype.fk_datatype_group is not None:
        # A group member was passed in: continue with its parent group instead.
        is_datatype_group = True
        datatype = dao.get_datatype_by_id(datatype.fk_datatype_group)
        freed_space = dao.get_datatype_group_disk_size(datatype.id)

    # Ids of the operations that may become orphaned by this removal.
    operations_set = [datatype.fk_from_operation]

    correct = True

    if is_datatype_group:
        self.logger.debug("Removing datatype group %s" % datatype)
        data_list = dao.get_datatypes_from_datatype_group(datatype.id)
        for adata in data_list:
            self._remove_project_node_files(project_id, adata.gid, skip_validation)
            if adata.fk_from_operation not in operations_set:
                operations_set.append(adata.fk_from_operation)

        datatype_group = dao.get_datatype_group_by_gid(datatype.gid)
        dao.remove_datatype(datatype_gid)
        correct = correct and dao.remove_entity(model.OperationGroup, datatype_group.fk_operation_group)
    else:
        self.logger.debug("Removing datatype %s" % datatype)
        self._remove_project_node_files(project_id, datatype.gid, skip_validation)

    ## Remove Operation entity in case no other DataType needs them.
    project = dao.get_project_by_id(project_id)
    for operation_id in operations_set:
        dependent_dt = dao.get_generic_entity(model.DataType, operation_id, "fk_from_operation")
        if len(dependent_dt) > 0:
            ### Do not remove Operation in case DataType still exist referring it.
            continue
        correct = correct and dao.remove_entity(model.Operation, operation_id)
        ## Make sure the folder of the operation just removed is deleted too.
        ## The loop variable is used here: `datatype.fk_from_operation` would
        ## target the same single folder on every iteration.
        self.structure_helper.remove_operation_data(project.name, operation_id)

    if not correct:
        raise RemoveDataTypeException("Could not remove DataType " + str(datatype_gid))

    # Give the freed disk space back to the user's accounting.
    user.used_disk_space = user.used_disk_space - freed_space
    dao.store_entity(user)
开发者ID:unimauro,项目名称:tvb-framework,代码行数:56,代码来源:project_service.py
示例11: set_datatype_visibility
def set_datatype_visibility(datatype_gid, is_visible):
    """
    Set the visibility for the dataType identified by `datatype_gid`.

    When the dataType belongs to a dataType group, the visibility is set
    through the GID of that group instead of the dataType's own GID.
    """
    target_gid = datatype_gid
    parent_group_id = dao.get_datatype_by_gid(datatype_gid).fk_datatype_group
    if parent_group_id is not None:
        # Redirect the update to the parent group.
        target_gid = dao.get_datatype_by_id(parent_group_id).gid
    dao.set_datatype_visibility(target_gid, is_visible)
开发者ID:HuifangWang,项目名称:the-virtual-brain-website,代码行数:10,代码来源:project_service.py
示例12: _create_connectivity
def _create_connectivity(self, nodes_number):
    """
    Create a connectivity entity with `nodes_number` nodes, store it through a
    StoreAdapter pre-launch on `self.operation`, and return its GID.
    """
    operation_folder = FilesHelper().get_project_folder(self.test_project, str(self.operation.id))
    conn = Connectivity(storage_path=operation_folder)
    # All-ones placeholders: a square weights matrix and one 3-value centre per node.
    conn.weights = numpy.ones((nodes_number, nodes_number))
    conn.centres = numpy.ones((nodes_number, 3))

    store_adapter = StoreAdapter([conn])
    OperationService().initiate_prelaunch(self.operation, store_adapter, {})

    stored = dao.get_datatype_by_id(conn.id)
    return stored.gid
开发者ID:sdiazpier,项目名称:tvb-framework,代码行数:12,代码来源:simulator_adapter_test.py
示例13: test_remove_datatype
def test_remove_datatype(self):
    """
    Remove the first of the freshly created datatypes and verify it is gone.
    """
    # The helper is expected to return a list of 3 elements; element[0] is the datatype id.
    array_wrappers = self._create_mapped_arrays(self.test_project.id)
    dt_list = [dao.get_datatype_by_id(wrapper[0]) for wrapper in array_wrappers]

    removed = dt_list[0]
    self.project_service.remove_datatype(self.test_project.id, removed.gid)
    self._check_if_datatype_was_removed(removed)
开发者ID:LauHoiYanGladys,项目名称:tvb-framework,代码行数:12,代码来源:project_structure_test.py
示例14: _long_burst_launch
def _long_burst_launch(self, is_range=False):
    """
    Launch a burst named "test_burst" with a simulation length of '10000' on a
    newly created connectivity and return the burst entity loaded from the DB.

    :param is_range: when True, 'conduction_speed' is given three values and
        declared as the first range parameter
    """
    self.burst_c.index()
    connectivity = DatatypesFactory().create_connectivity()[1]

    # Work on a copy so the shared default parameters stay untouched.
    simulator_params = copy.deepcopy(SIMULATOR_PARAMETERS)
    simulator_params['connectivity'] = dao.get_datatype_by_id(connectivity.id).gid
    simulator_params['simulation_length'] = '10000'
    if is_range:
        simulator_params['conduction_speed'] = '[10,15,20]'
        simulator_params[model.RANGE_PARAMETER_1] = 'conduction_speed'

    serialized_params = json.dumps(simulator_params)
    response = self.burst_c.launch_burst("new", "test_burst", simulator_parameters=serialized_params)
    burst_id = json.loads(response)['id']
    return dao.get_burst_by_id(burst_id)
开发者ID:maedoc,项目名称:tvb-framework,代码行数:12,代码来源:flow_controller_test.py
示例15: test_get_inputs_for_operation
def test_get_inputs_for_operation(self):
    """
    Tests method get_datatype_and_datatypegroup_inputs_for_operation and
    verifies that the relevant / full filters influence the result as expected,
    both for plain dataTypes and for a dataType that is part of a group.
    """
    algo_group = dao.find_group('tvb.tests.framework.adapters.testadapter3', 'TestAdapter3')
    algo = dao.get_algorithm_by_group(algo_group.id)
    fetch_inputs = self.project_service.get_datatype_and_datatypegroup_inputs_for_operation

    # --- plain dataTypes: three inputs, the first one hidden ---
    array_wrappers = self._create_mapped_arrays(self.test_project.id)
    ids = [wrapper[0] for wrapper in array_wrappers]

    hidden = dao.get_datatype_by_id(ids[0])
    hidden.visible = False
    dao.store_entity(hidden)

    parameters = json.dumps({"param_5": "1", "param_1": array_wrappers[0][2],
                             "param_2": array_wrappers[1][2], "param_3": array_wrappers[2][2], "param_6": "0"})
    operation = dao.store_entity(model.Operation(self.test_user.id, self.test_project.id, algo.id, parameters))

    inputs = fetch_inputs(operation.gid, self.relevant_filter)
    self.assertEqual(len(inputs), 2)
    found_ids = [entry.id for entry in inputs]
    self.assertTrue(ids[1] in found_ids, "Retrieved wrong dataType.")
    self.assertTrue(ids[2] in found_ids, "Retrieved wrong dataType.")
    self.assertFalse(ids[0] in found_ids, "Retrieved wrong dataType.")

    inputs = fetch_inputs(operation.gid, self.full_filter)
    self.assertEqual(len(inputs), 3, "Incorrect number of operations.")
    found_ids = [entry.id for entry in inputs]
    for expected_id in ids[:3]:
        self.assertTrue(expected_id in found_ids, "Retrieved wrong dataType.")

    # --- grouped dataType: a hidden member should be reported through its group ---
    project, dt_group_id, first_dt, _ = self._create_datatype_group()
    first_dt.visible = False
    dao.store_entity(first_dt)

    parameters = json.dumps({"other_param": "_", "param_1": first_dt.gid})
    operation = dao.store_entity(model.Operation(self.test_user.id, project.id, algo.id, parameters))

    inputs = fetch_inputs(operation.gid, self.relevant_filter)
    self.assertEqual(len(inputs), 0, "Incorrect number of dataTypes.")

    inputs = fetch_inputs(operation.gid, self.full_filter)
    self.assertEqual(len(inputs), 1, "Incorrect number of dataTypes.")
    self.assertEqual(inputs[0].id, dt_group_id, "Wrong dataType.")
    self.assertTrue(inputs[0].id != first_dt.id, "Wrong dataType.")
开发者ID:amitsaroj001,项目名称:tvb-framework,代码行数:51,代码来源:project_structure_test.py
示例16: __datatype2metastructure
def __datatype2metastructure(row, dt_ids):
    """
    Build a DataTypeMetaData object from one row of data retrieved from DB.

    `row` is read positionally (indices 0..21). When row[14] is listed in
    `dt_ids`, the node type, GID and id of that datatype group are reported
    instead of the row's own values; when row[7] additionally resolves to an
    OperationGroup, the group's name and id are reported as operation tag and
    operation group id.
    """
    op_group_id = row[7]
    group = None
    is_group = False
    if op_group_id and row[14] in dt_ids:
        group = dao.get_generic_entity(model.OperationGroup, op_group_id)
        if group and len(group):
            group = group[0]
            is_group = True

    datatype_group = None
    if row[14] is not None and row[14] in dt_ids:
        datatype_group = dao.get_datatype_by_id(row[14])
    in_dt_group = datatype_group is not None

    dt_entity = dao.get_datatype_by_gid(row[9])
    created = row[10]
    has_date = created is not None

    data = {
        DataTypeMetaData.KEY_TITLE: dt_entity.display_name,
        ## All these fields are necessary here for dynamic Tree levels.
        DataTypeMetaData.KEY_NODE_TYPE: datatype_group.type if in_dt_group else row[0],
        DataTypeMetaData.KEY_STATE: row[1],
        DataTypeMetaData.KEY_SUBJECT: str(row[2]),
        DataTypeMetaData.KEY_OPERATION_TYPE: CommonDetails.compute_operation_name(row[3], row[4], row[5]),
        DataTypeMetaData.KEY_AUTHOR: row[6],
        DataTypeMetaData.KEY_OPERATION_TAG: group.name if is_group else row[8],
        DataTypeMetaData.KEY_OP_GROUP_ID: group.id if is_group else None,
        DataTypeMetaData.KEY_GID: datatype_group.gid if in_dt_group else row[9],
        DataTypeMetaData.KEY_DATE: date2string(created) if has_date else '',
        DataTypeMetaData.KEY_DATATYPE_ID: datatype_group.id if in_dt_group else row[11],
        DataTypeMetaData.KEY_LINK: row[12],
        DataTypeMetaData.KEY_OPERATION_ALGORITHM: row[5],
        DataTypeMetaData.KEY_CREATE_DATA_MONTH: created.strftime(MONTH_YEAR_FORMAT) if has_date else "",
        DataTypeMetaData.KEY_CREATE_DATA_DAY: created.strftime(DAY_MONTH_YEAR_FORMAT) if has_date else "",
        DataTypeMetaData.KEY_BURST: row[15] if row[15] is not None else '-None-',
        # Falsy tag values are normalised to an empty string.
        DataTypeMetaData.KEY_TAG_1: row[16] or '',
        DataTypeMetaData.KEY_TAG_2: row[17] or '',
        DataTypeMetaData.KEY_TAG_3: row[18] or '',
        DataTypeMetaData.KEY_TAG_4: row[19] or '',
        DataTypeMetaData.KEY_TAG_5: row[20] or '',
        DataTypeMetaData.KEY_RELEVANCY: bool(row[21] > 0),
    }

    invalid = bool(row[13])
    return DataTypeMetaData(data, invalid)
开发者ID:HuifangWang,项目名称:the-virtual-brain-website,代码行数:46,代码来源:project_service.py
示例17: test_update_meta_data_group
def test_update_meta_data_group(self):
    """
    Test the metadata update for a whole group of dataTypes: every dataType of
    the group must still belong to the operation group, the operation group
    must carry the new name, and the dataType must reflect the new metadata.
    """
    datatypes, group_id = TestFactory.create_group(self.test_user, subject="test-subject-1")

    new_meta_data = {DataTypeOverlayDetails.DATA_SUBJECT: "new subject",
                     DataTypeOverlayDetails.DATA_STATE: "updated_state",
                     DataTypeOverlayDetails.CODE_OPERATION_GROUP_ID: group_id,
                     DataTypeOverlayDetails.CODE_OPERATION_TAG: 'newGroupName'}
    self.project_service.update_metadata(new_meta_data)

    for original in datatypes:
        # Re-load each dataType so that the stored state is what gets verified.
        reloaded = dao.get_datatype_by_id(original.id)
        self.assertEqual(group_id, reloaded.parent_operation.fk_operation_group)

        operation_groups = dao.get_generic_entity(model.OperationGroup, group_id)
        self.assertEqual(operation_groups[0].name, "newGroupName")

        self.__check_meta_data(new_meta_data, reloaded)
开发者ID:sdiazpier,项目名称:tvb-framework,代码行数:18,代码来源:project_service_test.py
示例18: get_uids_dict
def get_uids_dict(entity_obj):
    """
    Return a dictionary with the UIDs for the object graph of the given entity.

    A Connectivity or Surface contributes the UID of its own stored datatype;
    a TimeSeries contributes the UIDs of its `surface` and `connectivity`
    attributes. Entities of other types yield an empty dictionary.
    The original documentation states that an object's GID is used when it has
    no UID; that logic lives in `__get_uid`, which is not shown here.
    """
    uids = {}
    stored_datatype = dao.get_datatype_by_id(entity_obj.id)

    if isinstance(entity_obj, Connectivity):
        uids[KEY_CONNECTIVITY_UID] = __get_uid(stored_datatype)

    if isinstance(entity_obj, Surface):
        uids[KEY_SURFACE_UID] = __get_uid(stored_datatype)

    if isinstance(entity_obj, TimeSeries):
        surface, connectivity = entity_obj.surface, entity_obj.connectivity
        uids[KEY_SURFACE_UID] = __get_uid(surface)
        uids[KEY_CONNECTIVITY_UID] = __get_uid(connectivity)

    return uids
开发者ID:HuifangWang,项目名称:the-virtual-brain-website,代码行数:19,代码来源:helper_handler.py
示例19: test_launch_operation_HDD_with_space
def test_launch_operation_HDD_with_space(self):
    """
    Launch the HDD-requiring test adapter with a disk quota equal to the size
    it reports as required, and check that exactly one result is stored.
    """
    adapter_group = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapterHDDRequired")
    adapter = FlowService().build_adapter_instance(adapter_group)
    expected_type = adapter.get_output()[0].__name__
    launch_args = {"test": 100}

    # The quota matches exactly what the adapter estimates it needs.
    TvbProfile.current.MAX_DISK_SPACE = float(adapter.get_required_disk_size(**launch_args))
    temp_dir = FilesHelper().get_project_folder(self.test_project, "TEMP")
    self.operation_service.initiate_operation(self.test_user, self.test_project.id, adapter,
                                              temp_dir, method_name=ABCAdapter.LAUNCH_METHOD, **launch_args)

    rows = dao.get_values_of_datatype(self.test_project.id, Datatype2)[0]
    self.assertEqual(len(rows), 1)

    # The first field of a returned row is used as the datatype id.
    stored = dao.get_datatype_by_id(rows[0][0])
    self.assertEqual(stored.subject, DataTypeMetaData.DEFAULT_SUBJECT, "Wrong data stored.")
    self.assertEqual(stored.type, expected_type, "Wrong data stored.")
开发者ID:unimauro,项目名称:tvb-framework,代码行数:20,代码来源:operation_service_test.py
示例20: test_get_inputs_for_op_group_simple_inputs
def test_get_inputs_for_op_group_simple_inputs(self):
    """
    Tests method get_datatypes_inputs_for_operation_group.
    The dataType inputs are not part of a dataType group; the first one is
    hidden so that the relevant filter and the full filter give different results.
    """
    # The helper is expected to return a list of 3 elements; element[0] is the
    # datatype id and element[2] the value used as operation parameter.
    array_wrappers = self._create_mapped_arrays(self.test_project.id)
    array_wrapper_ids = [wrapper[0] for wrapper in array_wrappers]

    hidden = dao.get_datatype_by_id(array_wrapper_ids[0])
    hidden.visible = False
    dao.store_entity(hidden)

    op_group = dao.store_entity(model.OperationGroup(self.test_project.id, "group", "range1[1..2]"))
    params_1 = json.dumps({"param_5": "2", "param_1": array_wrappers[0][2],
                           "param_2": array_wrappers[1][2], "param_6": "7"})
    params_2 = json.dumps({"param_5": "5", "param_3": array_wrappers[2][2],
                           "param_2": array_wrappers[1][2], "param_6": "6"})

    algo_group = dao.find_group('tvb.tests.framework.adapters.testadapter3', 'TestAdapter3')
    algo = dao.get_algorithm_by_group(algo_group.id)

    # Two operations in the same group, one per parameter set.
    operations = [model.Operation(self.test_user.id, self.test_project.id, algo.id, params,
                                  op_group_id=op_group.id)
                  for params in (params_1, params_2)]
    dao.store_entities(operations)

    inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.relevant_filter)
    self.assertEqual(len(inputs), 2)
    found_ids = [entry.id for entry in inputs]
    self.assertFalse(array_wrapper_ids[0] in found_ids, "Retrieved wrong dataType.")
    self.assertTrue(array_wrapper_ids[1] in found_ids, "Retrieved wrong dataType.")
    self.assertTrue(array_wrapper_ids[2] in found_ids, "Retrieved wrong dataType.")

    inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.full_filter)
    self.assertEqual(len(inputs), 3, "Incorrect number of dataTypes.")
    found_ids = [entry.id for entry in inputs]
    for expected_id in array_wrapper_ids[:3]:
        self.assertTrue(expected_id in found_ids)
开发者ID:amitsaroj001,项目名称:tvb-framework,代码行数:40,代码来源:project_structure_test.py
注：本文中的tvb.core.entities.storage.dao.get_datatype_by_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论