• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python dao.find_group函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tvb.core.entities.storage.dao.find_group函数的典型用法代码示例。如果您正苦于以下问题:Python find_group函数的具体用法?Python find_group怎么用?Python find_group使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了find_group函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_initiate_operation

 def test_initiate_operation(self):
     """
     Run TestAdapter1 through ``initiate_operation`` and verify what was stored.

     After the launch the adapter group must still resolve to the same
     module/class, and exactly one ``Datatype1`` must exist in the test
     project, carrying the default subject and the type name taken from the
     first entry of ``adapter.get_output()``.
     """
     adapter_module = "tvb.tests.framework.adapters.testadapter1"
     adapter_class = "TestAdapter1"
     adapter = FlowService().build_adapter_instance(dao.find_group(adapter_module, adapter_class))
     expected_type = adapter.get_output()[0].__name__
     launch_args = {"test1_val1": 5, "test1_val2": 5}
     temp_dir = FilesHelper().get_project_folder(self.test_project, "TEMP")

     message = self.operation_service.initiate_operation(
         self.test_user, self.test_project.id, adapter, temp_dir, **launch_args
     )
     self.assertTrue(message.index("has finished.") > 10, "Operation didn't finish")

     # The group must still be retrievable, unchanged, after the launch.
     stored_group = dao.find_group(adapter_module, adapter_class)
     self.assertEqual(stored_group.module, "tvb.tests.framework.adapters.testadapter1", "Wrong data stored.")
     self.assertEqual(stored_group.classname, "TestAdapter1", "Wrong data stored.")

     rows, total = dao.get_values_of_datatype(self.test_project.id, Datatype1)
     self.assertEqual(total, 1)
     self.assertEqual(len(rows), 1)

     stored_dt = dao.get_datatype_by_id(rows[0][0])
     self.assertEqual(stored_dt.subject, DataTypeMetaData.DEFAULT_SUBJECT, "Wrong data stored.")
     self.assertEqual(stored_dt.type, expected_type, "Wrong data stored.")
开发者ID:lcosters,项目名称:tvb-framework,代码行数:25,代码来源:operation_service_test.py


示例2: build_adapter_from_declaration

 def build_adapter_from_declaration(cls, adapter_declaration):
     """
     Build an adapter from the declaration found in the portlets XML.

     :param adapter_declaration: mapping holding the adapter import path under
         ``ABCAdapter.KEY_TYPE`` and, optionally, an ``"initparam"`` entry that
         is forwarded to the group lookup.
     :return: ``(adapter, algo_group)``, or ``(None, None)`` when no algorithm
         group matches the declaration.
     """
     adapter_import_path = adapter_declaration[ABCAdapter.KEY_TYPE]
     # Split on the LAST dot only. Removing every "." + class_name occurrence
     # (as str.replace does) corrupts paths where a package/module shares the
     # class name, e.g. "pkg.Foo.Foo" must give module "pkg.Foo", not "pkg".
     module, separator, class_name = adapter_import_path.rpartition(".")
     if not separator:
         # Dot-less path: keep the historical result (module == class name).
         module = adapter_import_path
     if "initparam" in adapter_declaration:
         algo_group = dao.find_group(module, class_name, adapter_declaration["initparam"])
     else:
         algo_group = dao.find_group(module, class_name)
     if algo_group is None:
         return None, None
     return ABCAdapter.build_adapter(algo_group), algo_group
开发者ID:arybinski,项目名称:tvb-framework,代码行数:15,代码来源:portlet_configurer.py


示例3: test_build_adapter_instance

 def test_build_adapter_instance(self):
     """
     Building an adapter from the valid test module/class must give an ``ABCSynchronous`` instance.
     """
     valid_group = dao.find_group(TEST_ADAPTER_VALID_MODULE, TEST_ADAPTER_VALID_CLASS)
     built_adapter = ABCAdapter.build_adapter(valid_group)
     is_synchronous = isinstance(built_adapter, ABCSynchronous)
     self.assertTrue(is_synchronous, "Something went wrong with valid data!")
开发者ID:amitsaroj001,项目名称:tvb-framework,代码行数:7,代码来源:flow_service_test.py


示例4: test_datatypes_groups

    def test_datatypes_groups(self):
        """
        Check that datatypes resulting from one operation group reference its DataTypeGroup.

        A ranged launch of TestAdapter3 (``param_5`` over ``[1, 2]``) is fired,
        the resulting operations are stopped and then launched again, and the
        first resulting datatype must point to the DataTypeGroup that belongs
        to that operation group.
        """
        flow_service = FlowService()

        all_operations = dao.get_filtered_operations(self.test_project.id, None)
        self.assertEqual(len(all_operations), 0, "There should be no operation")

        algogroup = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapter3")
        group, _ = flow_service.prepare_adapter(self.test_project.id, algogroup)
        adapter_instance = flow_service.build_adapter_instance(group)
        data = {model.RANGE_PARAMETER_1: "param_5", "param_5": [1, 2]}
        # Create the group of operations
        flow_service.fire_operation(adapter_instance, self.test_user, self.test_project.id, **data)

        all_operations = dao.get_filtered_operations(self.test_project.id, None)
        self.assertEqual(len(all_operations), 1, "Expected one operation group")
        self.assertEqual(all_operations[0][2], 2, "Expected 2 operations in group")

        operation_group_id = all_operations[0][3]
        # assertNotEquals is a deprecated alias (removed in Python 3.12);
        # assertIsNotNone also states the intent directly.
        self.assertIsNotNone(operation_group_id, "The operation should be part of a group.")

        # NOTE(review): row positions 0 and 1 are used as operation ids here -
        # confirm against the row layout of dao.get_filtered_operations.
        self.operation_service.stop_operation(all_operations[0][0])
        self.operation_service.stop_operation(all_operations[0][1])
        # Make sure operations are executed
        self.operation_service.launch_operation(all_operations[0][0], False)
        self.operation_service.launch_operation(all_operations[0][1], False)

        resulted_datatypes = dao.get_datatype_in_group(operation_group_id=operation_group_id)
        self.assertTrue(len(resulted_datatypes) >= 2, "Expected at least 2, but: " + str(len(resulted_datatypes)))

        dt = dao.get_datatype_by_id(resulted_datatypes[0].id)
        datatype_group = dao.get_datatypegroup_by_op_group_id(operation_group_id)
        self.assertEqual(dt.fk_datatype_group, datatype_group.id, "DataTypeGroup is incorrect")
开发者ID:lcosters,项目名称:tvb-framework,代码行数:35,代码来源:operation_service_test.py


示例5: test_launch_operation_HDD_full_space_started_ops

 def test_launch_operation_HDD_full_space_started_ops(self):
     """
     A launch must be refused when an already STARTED operation reserves the disk space.

     A STARTED operation with an estimated disk size is stored first; the
     disk limit is then set one unit below "required by the adapter + reserved
     by the started operation", so ``initiate_operation`` has to raise
     ``NoMemoryAvailableException``.
     """
     reserved_by_started = 100
     hdd_group = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapterHDDRequired")

     already_running = model.Operation(
         self.test_user.id,
         self.test_project.id,
         hdd_group.id,
         "",
         status=model.STATUS_STARTED,
         estimated_disk_size=reserved_by_started,
     )
     dao.store_entity(already_running)

     adapter = FlowService().build_adapter_instance(hdd_group)
     launch_args = {"test": 100}
     required = adapter.get_required_disk_size(**launch_args)
     TvbProfile.current.MAX_DISK_SPACE = float(required + reserved_by_started - 1)
     temp_dir = FilesHelper().get_project_folder(self.test_project, "TEMP")

     with self.assertRaises(NoMemoryAvailableException):
         self.operation_service.initiate_operation(
             self.test_user, self.test_project.id, adapter, temp_dir, **launch_args
         )
     self._assert_no_dt2()
开发者ID:lcosters,项目名称:tvb-framework,代码行数:31,代码来源:operation_service_test.py


示例6: test_launch_two_ops_HDD_full_space

    def test_launch_two_ops_HDD_full_space(self):
        """
        Launch the same adapter twice: the first run fits the disk limit, the second must not.

        After the first run the limit is lowered to just below "size of the
        stored datatype + estimate for the next run", so the second launch has
        to raise ``NoMemoryAvailableException`` and leave the stored data as is.
        """
        hdd_group = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapterHDDRequired")
        adapter = FlowService().build_adapter_instance(hdd_group)
        launch_args = {"test": 100}

        # Enough room for exactly one run.
        TvbProfile.current.MAX_DISK_SPACE = 1 + float(adapter.get_required_disk_size(**launch_args))
        temp_dir = FilesHelper().get_project_folder(self.test_project, "TEMP")
        self.operation_service.initiate_operation(
            self.test_user, self.test_project.id, adapter, temp_dir, **launch_args
        )

        first_result = self._assert_stored_dt2()

        # Shrink the limit below what is already stored plus the next estimate.
        already_used = float(first_result.disk_size - 1)
        next_estimate = float(adapter.get_required_disk_size(**launch_args) - 1)
        TvbProfile.current.MAX_DISK_SPACE = already_used + next_estimate

        with self.assertRaises(NoMemoryAvailableException):
            self.operation_service.initiate_operation(
                self.test_user, self.test_project.id, adapter, temp_dir, **launch_args
            )
        self._assert_stored_dt2()
开发者ID:lcosters,项目名称:tvb-framework,代码行数:32,代码来源:operation_service_test.py


示例7: import_sensors

 def import_sensors(user, project, zip_path, sensors_type):
     """
     Fire the sensors importer for ``zip_path`` inside ``project`` on behalf of ``user``.

     :param zip_path: passed to the importer as ``sensors_file``
     :param sensors_type: passed to the importer as ``sensors_type``
     """
     # Resolve the importer adapter
     importer_group = dao.find_group('tvb.adapters.uploaders.sensors_importer', 'Sensors_Importer')
     sensors_importer = ABCAdapter.build_adapter(importer_group)
     launch_args = dict(sensors_file=zip_path, sensors_type=sensors_type)
     # Launch the import operation
     flow_service = FlowService()
     flow_service.fire_operation(sensors_importer, user, project.id, **launch_args)
开发者ID:unimauro,项目名称:tvb-framework,代码行数:7,代码来源:test_factory.py


示例8: test_happy_flow_surface_import

    def test_happy_flow_surface_import(self):
        """
        Importing a valid projection matrix must add exactly one ``ProjectionSurfaceEEG``.
        """
        count_before = TestFactory.get_entity_count(self.test_project, ProjectionSurfaceEEG())

        importer = ABCAdapter.build_adapter(
            dao.find_group("tvb.adapters.uploaders.projection_matrix_importer", "ProjectionMatrixSurfaceEEGImporter")
        )

        # The matrix file sits next to the ``dataset`` module.
        dataset_dir = os.path.abspath(os.path.dirname(dataset.__file__))
        matrix_path = os.path.join(dataset_dir, "projection_eeg_65_surface_16k.npy")

        launch_args = {
            "projection_file": matrix_path,
            "dataset_name": "ProjectionMatrix",
            "sensors": self.sensors.gid,
            "surface": self.surface.gid,
            DataTypeMetaData.KEY_SUBJECT: DataTypeMetaData.DEFAULT_SUBJECT,
        }
        FlowService().fire_operation(importer, self.test_user, self.test_project.id, **launch_args)

        count_after = TestFactory.get_entity_count(self.test_project, ProjectionSurfaceEEG())
        self.assertEqual(count_before + 1, count_after)
开发者ID:lcosters,项目名称:tvb-framework,代码行数:25,代码来源:projection_matrix_importer_test.py


示例9: test_wrong_shape

    def test_wrong_shape(self):
        """
        Importing a projection matrix of a different shape must raise ``OperationException``.
        """
        importer = ABCAdapter.build_adapter(
            dao.find_group("tvb.adapters.uploaders.projection_matrix_importer", "ProjectionMatrixSurfaceEEGImporter")
        )

        dataset_dir = os.path.abspath(os.path.dirname(dataset.__file__))
        matrix_path = os.path.join(dataset_dir, "projection_eeg_62_surface_16k.mat")

        launch_args = {
            "projection_file": matrix_path,
            "dataset_name": "ProjectionMatrix",
            "sensors": self.sensors.gid,
            "surface": self.surface.gid,
            DataTypeMetaData.KEY_SUBJECT: DataTypeMetaData.DEFAULT_SUBJECT,
        }

        try:
            FlowService().fire_operation(importer, self.test_user, self.test_project.id, **launch_args)
        except OperationException:
            # Expected outcome: the import is rejected.
            pass
        else:
            self.fail("This was expected not to run! 62 rows in proj matrix, but 65 sensors")
开发者ID:lcosters,项目名称:tvb-framework,代码行数:25,代码来源:projection_matrix_importer_test.py


示例10: _import

    def _import(self, import_file_path, surface_gid, connectivity_gid):
        """
        Import a region mapping file and return the entity it produced.

        :param import_file_path: absolute path of the file to be imported
        :param surface_gid: passed to the importer as ``surface``
        :param connectivity_gid: passed to the importer as ``connectivity``
        :return: the ``RegionMapping`` created after this call started
        """
        # Resolve the importer adapter
        importer_group = dao.find_group("tvb.adapters.uploaders.region_mapping_importer", "RegionMapping_Importer")
        importer = ABCAdapter.build_adapter(importer_group)

        launch_args = {
            "mapping_file": import_file_path,
            "surface": surface_gid,
            "connectivity": connectivity_gid,
            DataTypeMetaData.KEY_SUBJECT: "test",
        }

        # Timestamp taken BEFORE the launch so the new mapping can be told apart below.
        started_at = datetime.datetime.now()

        FlowService().fire_operation(importer, self.test_user, self.test_project.id, **launch_args)

        # Setup already imported a CFF which created another RegionMapping,
        # so select only the one created after ``started_at``.
        created_after_start = FilterChain(
            fields=[FilterChain.datatype + ".create_date"], operations=[">"], values=[started_at]
        )
        return self._get_entity(RegionMapping(), created_after_start)
开发者ID:sdiazpier,项目名称:tvb-framework,代码行数:28,代码来源:region_mapping_importer_test.py


示例11: _export_linked_datatypes

    def _export_linked_datatypes(self, project, zip_file):
        """
        Add the datatypes linked into ``project`` to the export archive.

        A synthetic import operation (id ``'links-to-external-projects'``) is
        written to disk; its metadata file and every linked datatype file are
        stored in ``zip_file`` under that operation's folder name. The
        operation folder is temporary and is removed again, even when writing
        to the archive fails.

        :param project: project being exported
        :param zip_file: open archive object exposing ``write(path, arcname)``
        """
        files_helper = FilesHelper()
        linked_paths = self._get_linked_datatypes_storage_path(project)

        if not linked_paths:
            # do not export an empty operation
            return

        # Make an import operation which will contain links to other projects
        alg_group = dao.find_group(TVB_IMPORTER_MODULE, TVB_IMPORTER_CLASS)
        algo = dao.get_algorithm_by_group(alg_group.id)
        op = model.Operation(None, project.id, algo.id, '')
        op.project = project
        op.algorithm = algo
        op.id = 'links-to-external-projects'
        op.start_now()
        op.mark_complete(model.STATUS_FINISHED)

        # write operation.xml to disk
        files_helper.write_operation_metadata(op)
        op_folder = files_helper.get_operation_folder(op.project.name, op.id)
        try:
            operation_xml = files_helper.get_operation_meta_file_path(op.project.name, op.id)
            op_folder_name = os.path.basename(op_folder)

            # add operation.xml
            zip_file.write(operation_xml, op_folder_name + '/' + os.path.basename(operation_xml))

            # add linked datatypes to archive in the import operation
            for pth in linked_paths:
                zip_pth = op_folder_name + '/' + os.path.basename(pth)
                zip_file.write(pth, zip_pth)
        finally:
            # These files are only wanted inside the export archive; clean up
            # in ``finally`` so a failed archive write does not leave the
            # temporary operation folder behind.
            files_helper.remove_folder(op_folder)
开发者ID:amitsaroj001,项目名称:tvb-framework,代码行数:34,代码来源:export_manager.py


示例12: parse_event_node

    def parse_event_node(self):
        """
        Walk the children of ``self.event_node`` and collect the call configuration.

        * adapter element: the adapter is built and kept as ``self.callable_object``;
          a non-empty UID attribute is remembered as a keyword argument.
        * method element: sets ``self.call_method``; a truthy hidden attribute
          switches ``self.operation_visible`` to ``False``.
        * args element: the parsed arguments are merged into the keyword arguments.

        Non-element nodes are skipped, unknown element names are logged and
        ignored. At the end ``self.arguments`` is updated with everything collected.
        """
        collected = {}
        for child in self.event_node.childNodes:
            if child.nodeType != Node.ELEMENT_NODE:
                continue

            node_name = child.nodeName
            if node_name == ELEM_ADAPTER:
                # TODO: so far there is no need for it, but we should maybe
                # handle cases where same module/class but different init parameter
                adapter_group = dao.find_group(child.getAttribute(ATT_MODULE), child.getAttribute(ATT_CLASS))
                adapter = ABCAdapter.build_adapter(adapter_group)
                uid = child.getAttribute(ATT_UID)
                if uid:
                    collected[ATT_UID] = uid
                LOGGER.debug("Adapter used is %s", str(adapter.__class__))
                self.callable_object = adapter
            elif node_name == ELEM_METHOD:
                self.call_method = child.getAttribute(ATT_NAME)
                if child.getAttribute(ATT_OPERATION_HIDDEN):
                    self.operation_visible = False
            elif node_name == ELEM_ARGS:
                collected.update(_parse_arguments(child))
            else:
                LOGGER.info("Ignored undefined node %s", str(node_name))

        self.arguments.update(collected)
开发者ID:sdiazpier,项目名称:tvb-framework,代码行数:30,代码来源:event_handlers.py


示例13: test_launch_two_ops_HDD_full_space

    def test_launch_two_ops_HDD_full_space(self):
        """
        Launch the same adapter twice: the first run fits the disk limit, the second must not.

        After the first run exactly one ``Datatype2`` must be stored. The limit
        is then lowered to just below "size of that datatype + estimate for the
        next run", so the second launch has to raise
        ``NoMemoryAvailableException`` and the datatype count must stay at one.
        """
        hdd_group = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapterHDDRequired")
        adapter = FlowService().build_adapter_instance(hdd_group)
        expected_type = adapter.get_output()[0].__name__
        launch_args = {"test": 100}

        # Enough room for exactly one run.
        TvbProfile.current.MAX_DISK_SPACE = (1 + float(adapter.get_required_disk_size(**launch_args)))
        temp_dir = FilesHelper().get_project_folder(self.test_project, "TEMP")
        self.operation_service.initiate_operation(
            self.test_user, self.test_project.id, adapter, temp_dir,
            method_name=ABCAdapter.LAUNCH_METHOD, **launch_args
        )

        stored_rows = dao.get_values_of_datatype(self.test_project.id, Datatype2)[0]
        self.assertEqual(len(stored_rows), 1)
        first_result = dao.get_datatype_by_id(stored_rows[0][0])
        self.assertEqual(first_result.subject, DataTypeMetaData.DEFAULT_SUBJECT, "Wrong data stored.")
        self.assertEqual(first_result.type, expected_type, "Wrong data stored.")

        # Shrink the limit below what is already stored plus the next estimate.
        already_used = float(first_result.disk_size - 1)
        next_estimate = float(adapter.get_required_disk_size(**launch_args) - 1)
        TvbProfile.current.MAX_DISK_SPACE = already_used + next_estimate

        with self.assertRaises(NoMemoryAvailableException):
            self.operation_service.initiate_operation(
                self.test_user, self.test_project.id, adapter, temp_dir,
                method_name=ABCAdapter.LAUNCH_METHOD, **launch_args
            )

        stored_rows = dao.get_values_of_datatype(self.test_project.id, Datatype2)[0]
        self.assertEqual(len(stored_rows), 1)
开发者ID:unimauro,项目名称:tvb-framework,代码行数:31,代码来源:operation_service_test.py


示例14: create_group

 def create_group(test_user=None, test_project=None, subject="John Doe"):
     """
     Prepare and launch a two-operation group with TestAdapter3.

     :param test_user: user owning the operations; created when ``None``
     :param test_project: project holding the operations; created when ``None``
     :param subject: stored in the adapter meta-data under ``KEY_SUBJECT``
     :return: ``(datatypes in the operation group, operation group id)``
     """
     test_user = TestFactory.create_user() if test_user is None else test_user
     test_project = TestFactory.create_project(test_user) if test_project is None else test_project

     # Resolve group, category and algorithm of the test adapter
     adapter_group = dao.find_group('tvb.tests.framework.adapters.testadapter3', 'TestAdapter3')
     category = dao.get_category_by_id(adapter_group.fk_category)
     algorithm = dao.get_algorithm_by_group(adapter_group.id)

     adapter = TestFactory.create_adapter(algo_group=adapter_group, test_project=test_project)
     adapter.meta_data = {DataTypeMetaData.KEY_SUBJECT: subject}
     ranged_args = {model.RANGE_PARAMETER_1: 'param_5', 'param_5': [1, 2]}

     # Prepare the operation group, then launch its first two operations one after the other
     operation_service = OperationService()
     prepared = operation_service.prepare_operations(
         test_user.id, test_project.id, algorithm, category, {}, **ranged_args
     )[0]
     for prepared_op in (prepared[0], prepared[1]):
         operation_service.launch_operation(prepared_op.id, False, adapter)

     datatypes = dao.get_datatype_in_group(operation_group_id=prepared[0].fk_operation_group)
     return datatypes, prepared[0].fk_operation_group
开发者ID:unimauro,项目名称:tvb-framework,代码行数:26,代码来源:test_factory.py


示例15: _create_mapped_arrays

    def _create_mapped_arrays(self, project_id):
        """
        Fire NDimensionArrayAdapter three times, checking the datatype count after each run.

        :param project_id: the project in which the arrays are created
        :return: the available ``MappedArray`` entries after the third run
        """
        mapped_array_type = "tvb.datatypes.arrays.MappedArray"

        # Nothing of this type may exist before we start.
        initial_count = self.flow_service.get_available_datatypes(project_id, mapped_array_type)[1]
        self.assertEqual(initial_count, 0)

        adapter_group = dao.find_group('tvb.tests.framework.adapters.ndimensionarrayadapter', 'NDimensionArrayAdapter')
        prepared_group, _ = self.flow_service.prepare_adapter(project_id, adapter_group)

        adapter = self.flow_service.build_adapter_instance(prepared_group)
        launch_args = {'param_1': 'some value'}

        # First two runs: only the running total matters.
        for expected_total in (1, 2):
            self.flow_service.fire_operation(adapter, self.test_user, project_id, **launch_args)
            current_total = self.flow_service.get_available_datatypes(project_id, mapped_array_type)[1]
            self.assertEqual(current_total, expected_total)

        # Third run: keep the returned entries as well.
        self.flow_service.fire_operation(adapter, self.test_user, project_id, **launch_args)
        array_wrappers, final_total = self.flow_service.get_available_datatypes(project_id, mapped_array_type)
        self.assertEqual(final_total, 3)

        return array_wrappers
开发者ID:amitsaroj001,项目名称:tvb-framework,代码行数:28,代码来源:project_structure_test.py


示例16: create_operation

 def create_operation(algorithm=None, test_user=None, test_project=None, 
                      operation_status=model.STATUS_FINISHED, parameters="test params"):
     """
     Store a new operation and return it as re-read from storage.

     :param algorithm: algorithm of the operation; when ``None`` the algorithm
         of the NDimensionArrayAdapter test adapter is looked up and used.
     :param test_user: owner of the operation; created when ``None``
     :param test_project: project of the operation; created when ``None``
     :param operation_status: status stored on the operation
     :param parameters: parameters value stored on the operation
     :return: Operation entity after persistence.
     """
     if algorithm is None:
         default_group = dao.find_group('tvb.tests.framework.adapters.ndimensionarrayadapter',
                                        'NDimensionArrayAdapter')
         algorithm = dao.get_algorithm_by_group(default_group.id)

     if test_user is None:
         test_user = TestFactory.create_user()

     if test_project is None:
         test_project = TestFactory.create_project(test_user)

     meta_json = json.dumps({DataTypeMetaData.KEY_SUBJECT: "John Doe",
                             DataTypeMetaData.KEY_STATE: "RAW_DATA"})
     new_operation = model.Operation(test_user.id, test_project.id, algorithm.id, parameters, meta=meta_json,
                                     status=operation_status, method_name=ABCAdapter.LAUNCH_METHOD)
     dao.store_entity(new_operation)
     # Read it back so that lazy attributes are loaded.
     return dao.get_operation_by_id(new_operation.id)
开发者ID:unimauro,项目名称:tvb-framework,代码行数:25,代码来源:test_factory.py


示例17: test_adapter_memory

 def test_adapter_memory(self):
     """
     ``TestAdapterHDDRequired`` must report a required memory size of 42.
     """
     hdd_group = dao.find_group("tvb.tests.framework.adapters.testadapter3", "TestAdapterHDDRequired")
     hdd_adapter = FlowService().build_adapter_instance(hdd_group)
     reported_memory = hdd_adapter.get_required_memory_size()
     self.assertEqual(42, reported_memory)
开发者ID:sdiazpier,项目名称:tvb-framework,代码行数:8,代码来源:adapters_memory_usage_tests.py


示例18: _run_cff_importer

    def _run_cff_importer(self, cff_path):
        """
        Fire the CFF importer for ``cff_path`` in the test project, using the default subject.
        """
        # Resolve the importer adapter
        importer_group = dao.find_group('tvb.adapters.uploaders.cff_importer', 'CFF_Importer')
        cff_importer = ABCAdapter.build_adapter(importer_group)
        launch_args = {
            'cff': cff_path,
            DataTypeMetaData.KEY_SUBJECT: DataTypeMetaData.DEFAULT_SUBJECT,
        }

        # Launch the import operation
        flow_service = FlowService()
        flow_service.fire_operation(cff_importer, self.test_user, self.test_project.id, **launch_args)
开发者ID:amitsaroj001,项目名称:tvb-framework,代码行数:8,代码来源:cff_importer_test.py


示例19: get_algorithm_by_module_and_class

 def get_algorithm_by_module_and_class(module, classname):
     """
     Look up the algorithm group for ``module``/``classname`` and the algorithm stored for it.

     :return: ``(algorithm, group)`` tuple
     """
     algo_group = dao.find_group(module, classname)
     return dao.get_algorithm_by_group(algo_group.id), algo_group
开发者ID:sdiazpier,项目名称:tvb-framework,代码行数:8,代码来源:flow_service.py


示例20: import_surface_obj

    def import_surface_obj(user, project, obj_path, surface_type):
        """
        Fire the OBJ surface importer for ``obj_path`` inside ``project`` on behalf of ``user``.

        :param obj_path: passed to the importer as ``data_file``
        :param surface_type: passed to the importer as ``surface_type``
        """
        # Resolve the importer adapter
        importer_group = dao.find_group('tvb.adapters.uploaders.obj_importer', 'ObjSurfaceImporter')
        obj_importer = ABCAdapter.build_adapter(importer_group)
        launch_args = dict(data_file=obj_path, surface_type=surface_type)

        # Launch the import operation
        flow_service = FlowService()
        flow_service.fire_operation(obj_importer, user, project.id, **launch_args)
开发者ID:sdiazpier,项目名称:tvb-framework,代码行数:9,代码来源:test_factory.py



注:本文中的tvb.core.entities.storage.dao.find_group函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python dao.get_burst_by_id函数代码示例发布时间:2022-05-27
下一篇:
Python storage.SA_SESSIONMAKER类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap