• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python port_agent_client.PortAgentPacket类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mi.core.instrument.port_agent_client.PortAgentPacket的典型用法代码示例。如果您正苦于以下问题:Python PortAgentPacket类的具体用法?Python PortAgentPacket怎么用?Python PortAgentPacket使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了PortAgentPacket类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_valid_complete_sample

    def test_valid_complete_sample(self):
        """
        Create a mock port agent
        """
        mock_port_agent = Mock(spec=LoggerClient)

        """
        Instantiate the driver class directly (no driver client, no driver
        client, no zmq driver process, no driver process; just own the driver)
        """
        test_driver = ooicoreInstrumentDriver(self.my_event_callback)

        current_state = test_driver.get_resource_state()
        self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)

        """
        Now configure the driver with the mock_port_agent, verifying
        that the driver transitions to the DISCONNECTED state
        """
        config = {"mock_port_agent": mock_port_agent}
        test_driver.configure(config=config)
        current_state = test_driver.get_resource_state()
        self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)

        """
        Invoke the connect method of the driver: should connect to mock
        port agent.  Verify that the connection FSM transitions to CONNECTED,
        (which means that the FSM should now be reporting the ProtocolState).
        """
        test_driver.connect()
        current_state = test_driver.get_resource_state()
        self.assertEqual(current_state, DriverProtocolState.UNKNOWN)

        """
        Force the driver into AUTOSAMPLE state so that it will parse and 
        publish samples
        """
        test_driver.set_test_mode(True)
        test_driver.test_force_state(state=DriverProtocolState.AUTOSAMPLE)
        current_state = test_driver.get_resource_state()
        self.assertEqual(current_state, DriverProtocolState.AUTOSAMPLE)

        """
        - Reset test verification variables.
        - Construct a complete sample
        - Pass to got_data()
        - Verify that raw and parsed streams have been received
        """
        self.reset_test_vars()
        packet = PortAgentPacket()
        header = "\xa3\x9d\x7a\x02\x00\x1c\x0b\x2e\x00\x00\x00\x01\x80\x00\x00\x00"
        packet.unpack_header(header)
        packet.attach_data(FULL_SAMPLE)

        test_driver._protocol.got_data(packet)

        self.assertTrue(self.raw_stream_received)
        self.assertTrue(self.parsed_stream_received)
开发者ID:sfoley,项目名称:marine-integrations,代码行数:58,代码来源:test_driver.py


示例2: _get_header

def _get_header(buffer):
    packet = PortAgentPacket()
    if(len(buffer) < HEADER_SIZE): return None

    header = buffer[0:HEADER_SIZE]
    packet.unpack_header(header)

    if(packet.get_data_length() < 0): return None

    return packet
开发者ID:JeffRoy,项目名称:marine-integrations,代码行数:10,代码来源:watch_data_log.py


示例3: _get_header

def _get_header(buffer):
    packet = PortAgentPacket()
    if(len(buffer) < HEADER_SIZE): return None

    header = buffer[0:HEADER_SIZE]
    packet.unpack_header(header)

    if(packet.get_data_length() < 0): return None

    print "time: %f" % packet.get_timestamp()

    return packet
开发者ID:JeffRoy,项目名称:marine-integrations,代码行数:12,代码来源:cat_data_log.py


示例4: setUp

    def setUp(self):
        """
        """
        #self.start_couchdb() # appease the pyon gods        
        self.sample_port_timestamp = 3555423720.711772
        self.sample_driver_timestamp = 3555423721.711772
        self.sample_internal_timestamp = 3555423719.711772
        self.sample_raw_data = "SATPAR0229,10.01,2206748544,234"

        self.parsed_test_particle = self.TestDataParticle(self.sample_raw_data,
                                    port_timestamp=self.sample_port_timestamp,
                                    quality_flag=DataParticleValue.INVALID,
                                    preferred_timestamp=DataParticleKey.DRIVER_TIMESTAMP)
        self.sample_parsed_particle = {
                                DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
                                DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
                                DataParticleKey.STREAM_NAME: TEST_PARTICLE_TYPE,
                                DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
                                DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
                                DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.DRIVER_TIMESTAMP,
                                DataParticleKey.QUALITY_FLAG: DataParticleValue.INVALID,
                                DataParticleKey.VALUES: [
                                    {DataParticleKey.VALUE_ID: "temp",
                                     DataParticleKey.VALUE: "23.45"},
                                    {DataParticleKey.VALUE_ID: "cond",
                                     DataParticleKey.VALUE: "15.9"},
                                    {DataParticleKey.VALUE_ID: "depth",
                                     DataParticleKey.VALUE: "305.16"}
                                    ]
                                }

        self.port_agent_packet = PortAgentPacket()
        self.port_agent_packet.attach_data(self.sample_raw_data)
        self.port_agent_packet.pack_header()

        self.raw_test_particle = RawDataParticle(self.port_agent_packet.get_as_dict(),
                                    port_timestamp=self.sample_port_timestamp,
                                    internal_timestamp=self.sample_internal_timestamp)

        self.sample_raw_particle = {
                               DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
                               DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
                               DataParticleKey.STREAM_NAME: CommonDataParticleType.RAW,
                               DataParticleKey.INTERNAL_TIMESTAMP: self.sample_internal_timestamp,
                               DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
                               DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
                               DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.PORT_TIMESTAMP,
                               DataParticleKey.QUALITY_FLAG: "ok",
                               DataParticleKey.VALUES: [
                                   { DataParticleKey.VALUE_ID: "raw",
                                     DataParticleKey.VALUE: base64.b64encode(self.sample_raw_data),
                                     "binary": True },
                                   { DataParticleKey.VALUE_ID: "length",
                                     DataParticleKey.VALUE: int(31)},
                                   { DataParticleKey.VALUE_ID: "type",
                                     DataParticleKey.VALUE: int(2)},
                                  { DataParticleKey.VALUE_ID: "checksum",
                                    DataParticleKey.VALUE: int(2392)},
                                  ]
                                }
开发者ID:s-pearce,项目名称:marine-integrations,代码行数:60,代码来源:test_data_particle.py


示例5: _create_port_agent_packet

 def _create_port_agent_packet(self, data_item):
     ts = ntplib.system_to_ntp_time(time.time())
     port_agent_packet = PortAgentPacket()
     port_agent_packet.attach_data(data_item)
     port_agent_packet.attach_timestamp(ts)
     port_agent_packet.pack_header()
     return port_agent_packet
开发者ID:danmergens,项目名称:mi-instrument,代码行数:7,代码来源:test_driver.py


示例6: test_unpack_header

    def test_unpack_header(self):
        self.pap = PortAgentPacket()
        data_length = 32
        data = self.pap.unpack_header(array.array('B', [163, 157, 122, 2, 0, data_length + HEADER_SIZE, 14, 145, 65, 234, 142, 154, 23, 155, 51, 51]))
        got_timestamp = self.pap.get_timestamp()

        self.assertEqual(self.pap.get_header_type(), self.pap.DATA_FROM_DRIVER)
        self.assertEqual(self.pap.get_data_length(), data_length)
        self.assertEqual(got_timestamp, 1105890970.110589)
        self.assertEqual(self.pap.get_header_recv_checksum(), 3729) 
开发者ID:deverett,项目名称:marine-integrations,代码行数:10,代码来源:test_port_agent_client.py


示例7: test_unpack_header

    def test_unpack_header(self):
        self.pap = PortAgentPacket()
        data = self.pap.unpack_header(array.array('B', [163, 157, 122, 2, 0, 48, 14, 145, 65, 234, 142, 154, 23, 155, 51, 51]))

        self.assertEqual(self.pap.get_header_type(), 2)
        self.assertEqual(self.pap.get_header_length(), 32)
        self.assertEqual(time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.localtime(self.ntp_to_system_time(self.pap.get_timestamp()))), "Thu, 13 Dec 2012 14:10:04 +0000")
        self.assertEqual(self.pap.get_header_recv_checksum(), 3729) #@TODO Probably should wire in one of these checksums.
        self.assertEqual(self.pap.get_header_checksum(), None)
        pass
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:10,代码来源:test_port_agent_client.py


示例8: send_port_agent_packet

    def send_port_agent_packet(protocol, data):
        ts = ntplib.system_to_ntp_time(time.time())
        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data(data)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()

        # Push the response into the driver
        protocol.got_data(port_agent_packet)
        protocol.got_raw(port_agent_packet)
        log.debug('Sent port agent packet containing: %r', data)
开发者ID:danmergens,项目名称:marine-integrations,代码行数:11,代码来源:test_driver.py


示例9: _send_port_agent_packet

    def _send_port_agent_packet(self, driver, data):
        """
        Send the supplied data to the driver in a port agent packet
        @param driver: instrument driver instance
        @param data: data to be sent
        """
        ts = ntplib.system_to_ntp_time(time.time())
        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data(data)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()

        # Push the response into the driver
        driver._protocol.got_data(port_agent_packet)
开发者ID:kehunt06,项目名称:mi-instrument,代码行数:14,代码来源:test_driver.py


示例10: _send_port_agent_packet

    def _send_port_agent_packet(self, driver, data):
        """
        Send the supplied data to the driver in a port agent packet
        @param driver: instrument driver instance
        @param data: data to be sent
        """
        ts = ntplib.system_to_ntp_time(time.time())
        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data(data)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()

        # Push the response into the driver
        driver._protocol.got_data(port_agent_packet)
        # sleep briefly, as some state changes happen asynchronously, this should give those
        # threads time to finish
        time.sleep(.01)
开发者ID:danmergens,项目名称:mi-instrument,代码行数:17,代码来源:test_driver.py


示例11: test_dump_settings_response

    def test_dump_settings_response(self):
        """
        """
        mock_port_agent = Mock(spec=PortAgentClient)
        driver = InstrumentDriver(self._got_data_event_callback)
        driver.set_test_mode(True)

        current_state = driver.get_resource_state()
        self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)

        # Now configure the driver with the mock_port_agent, verifying
        # that the driver transitions to that state
        config = {'mock_port_agent' : mock_port_agent}
        driver.configure(config = config)

        current_state = driver.get_resource_state()
        self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)

        # Invoke the connect method of the driver: should connect to mock
        # port agent.  Verify that the connection FSM transitions to CONNECTED,
        # (which means that the FSM should now be reporting the ProtocolState).
        driver.connect()
        current_state = driver.get_resource_state()
        self.assertEqual(current_state, DriverProtocolState.UNKNOWN)

        # Force the instrument into a known state
        self.assert_force_state(driver, DriverProtocolState.COMMAND)
        ts = ntplib.system_to_ntp_time(time.time())

        log.debug("DUMP_SETTINGS command response: %s", DUMP_COMMAND_RESPONSE)
        # Create and populate the port agent packet.
        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data(DUMP_COMMAND_RESPONSE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()

        # Push the response into the driver
        driver._protocol.got_data(port_agent_packet)
        response = driver._protocol._get_response(expected_prompt = 
                                                       NANO_DUMP_SETTINGS)
        
        self.assertTrue(isinstance(response[1], NANOCommandResponse))
开发者ID:StevenMyerson,项目名称:marine-integrations,代码行数:42,代码来源:test_driver.py


示例12: TestUnitDataParticle

class TestUnitDataParticle(MiUnitTestCase):
    """
    Test cases for the DataParticleGenerator class. Functions in this class
    provide unit tests and provide a tutorial on use of
    the data particle generator interface.
    """
    
    class TestDataParticle(DataParticle):
        """
        Simple test DataParticle derivative that returns fixed parsed
        data values
        
        @retval Value list for parased data set [{'value_id': foo,
                                                 'value': bar}, ...]                                   
        """
        _data_particle_type = TEST_PARTICLE_TYPE

        def _build_parsed_values(self):
            result = [{DataParticleKey.VALUE_ID: "temp",
                       DataParticleKey.VALUE: "23.45"},
                      {DataParticleKey.VALUE_ID: "cond",
                       DataParticleKey.VALUE: "15.9"},
                      {DataParticleKey.VALUE_ID: "depth",
                       DataParticleKey.VALUE: "305.16"}]
            return result

    class BadDataParticle(DataParticle):
         """
         Define a data particle that doesn't initialize _data_particle_type.
         Should raise an exception when _data_particle_type isn't initialized
         """
         pass

    def setUp(self):
        """
        """
        #self.start_couchdb() # appease the pyon gods        
        self.sample_port_timestamp = 3555423720.711772
        self.sample_driver_timestamp = 3555423721.711772
        self.sample_internal_timestamp = 3555423719.711772
        self.sample_raw_data = "SATPAR0229,10.01,2206748544,234"

        self.parsed_test_particle = self.TestDataParticle(self.sample_raw_data,
                                    port_timestamp=self.sample_port_timestamp,
                                    quality_flag=DataParticleValue.INVALID,
                                    preferred_timestamp=DataParticleKey.DRIVER_TIMESTAMP)
        self.sample_parsed_particle = {
                                DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
                                DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
                                DataParticleKey.STREAM_NAME: TEST_PARTICLE_TYPE,
                                DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
                                DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
                                DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.DRIVER_TIMESTAMP,
                                DataParticleKey.QUALITY_FLAG: DataParticleValue.INVALID,
                                DataParticleKey.VALUES: [
                                    {DataParticleKey.VALUE_ID: "temp",
                                     DataParticleKey.VALUE: "23.45"},
                                    {DataParticleKey.VALUE_ID: "cond",
                                     DataParticleKey.VALUE: "15.9"},
                                    {DataParticleKey.VALUE_ID: "depth",
                                     DataParticleKey.VALUE: "305.16"}
                                    ]
                                }

        self.port_agent_packet = PortAgentPacket()
        self.port_agent_packet.attach_data(self.sample_raw_data)
        self.port_agent_packet.pack_header()

        self.raw_test_particle = RawDataParticle(self.port_agent_packet.get_as_dict(),
                                    port_timestamp=self.sample_port_timestamp,
                                    internal_timestamp=self.sample_internal_timestamp)

        self.sample_raw_particle = {
                               DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
                               DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
                               DataParticleKey.STREAM_NAME: CommonDataParticleType.RAW,
                               DataParticleKey.INTERNAL_TIMESTAMP: self.sample_internal_timestamp,
                               DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
                               DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
                               DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.PORT_TIMESTAMP,
                               DataParticleKey.QUALITY_FLAG: "ok",
                               DataParticleKey.VALUES: [
                                   { DataParticleKey.VALUE_ID: "raw",
                                     DataParticleKey.VALUE: base64.b64encode(self.sample_raw_data),
                                     "binary": True },
                                   { DataParticleKey.VALUE_ID: "length",
                                     DataParticleKey.VALUE: int(31)},
                                   { DataParticleKey.VALUE_ID: "type",
                                     DataParticleKey.VALUE: int(2)},
                                  { DataParticleKey.VALUE_ID: "checksum",
                                    DataParticleKey.VALUE: int(2392)},
                                  ]
                                }

    def test_parsed_generate(self):
        """
        Test generation of a data particle
        """
        # Create some sample data as a param dict
        # Submit it to a data particle generator with a timestamp
#.........这里部分代码省略.........
开发者ID:s-pearce,项目名称:marine-integrations,代码行数:101,代码来源:test_data_particle.py


示例13: setUp

 def setUp(self):
     self.pap = PortAgentPacket()
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:2,代码来源:test_port_agent_client.py


示例14: PAClientTestPortAgentPacket

class PAClientTestPortAgentPacket(MiUnitTest):

    @staticmethod
    def ntp_to_system_time(date):
        """convert a NTP time to system time"""
        return date - NTP_DELTA

    @staticmethod
    def system_to_ntp_time(date):
        """convert a system time to a NTP time"""
        return date + NTP_DELTA

    def setUp(self):
        self.pap = PortAgentPacket()
        # self.test_time = time.time()
        # self.ntp_time = self.system_to_ntp_time(self.test_time)

        # self.pap.set_timestamp(self.ntp_time)

    def test_pack_header(self):
        test_data = "Only the length of this matters?"
        test_data_length = len(test_data)
        self.pap.attach_data(test_data)
        self.pap.pack_header()
        self.assertEqual(self.pap.get_data_length(), test_data_length)

    def test_get_length(self):
        test_length = 100
        self.pap.set_data_length(test_length)
        got_length = self.pap.get_data_length()
        self.assertEqual(got_length, test_length)

    def test_checksum(self):
        """
        This tests the checksum algorithm; if somebody changes the algorithm
        this test should catch it.  Had to jump through some hoops to do this;
        needed to add set_data_length and set_header because we're building our
        own header here (the one in PortAgentPacket includes the timestamp
        so the checksum is not consistent).
        """
        test_data = "This tests the checksum algorithm."
        test_length = len(test_data)
        self.pap.attach_data(test_data)

        # Now build a header
        variable_tuple = (0xa3, 0x9d, 0x7a, self.pap.DATA_FROM_DRIVER,
                          test_length + HEADER_SIZE, 0x0000,
                          0)
        self.pap.set_data_length(test_length)

        header_format = '>BBBBHHd'
        size = struct.calcsize(header_format)
        temp_header = ctypes.create_string_buffer(size)
        struct.pack_into(header_format, temp_header, 0, *variable_tuple)

        # Now set the header member in PortAgentPacket to the header
        # we built
        self.pap.set_header(temp_header.raw)

        # Now get the checksum and verify it is what we expect it to be.
        checksum = self.pap.calculate_checksum()
        self.assertEqual(checksum, 2)

    def test_unpack_header(self):
        self.pap = PortAgentPacket()
        data_length = 32
        data = self.pap.unpack_header(array.array('B',
                                                  [163, 157, 122, 2, 0, data_length + HEADER_SIZE, 14, 145, 65, 234,
                                                   142, 154, 23, 155, 51, 51]))
        got_timestamp = self.pap.get_timestamp()

        self.assertEqual(self.pap.get_header_type(), self.pap.DATA_FROM_DRIVER)
        self.assertEqual(self.pap.get_data_length(), data_length)
        self.assertEqual(got_timestamp, 1105890970.092212)
        self.assertEqual(self.pap.get_header_recv_checksum(), 3729)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:75,代码来源:test_port_agent_client.py


示例15: test_handle_packet

    def test_handle_packet(self):
        """
        Test that a default PortAgentPacket creates a DATA_FROM_DRIVER packet,
        and that the handle_packet method invokes the raw callback
        """
        pa_listener = Listener(None, None, 0, 0, 5, self.myGotData, self.myGotRaw, self.myGotListenerError,
                               self.myGotError)

        test_data = "This is a great big test"
        self.resetTestVars()
        pa_packet = PortAgentPacket()
        pa_packet.attach_data(test_data)
        pa_packet.pack_header()
        pa_packet.verify_checksum()

        pa_listener.handle_packet(pa_packet)

        self.assertTrue(self.rawCallbackCalled)

        ###
        # Test DATA_FROM_INSTRUMENT; handle_packet should invoke data and raw
        # callbacks.
        ###
        self.resetTestVars()
        pa_packet = PortAgentPacket(PortAgentPacket.DATA_FROM_INSTRUMENT)
        pa_packet.attach_data(test_data)
        pa_packet.pack_header()
        pa_packet.verify_checksum()

        pa_listener.handle_packet(pa_packet)

        self.assertTrue(self.rawCallbackCalled)
        self.assertTrue(self.dataCallbackCalled)
        self.assertFalse(self.errorCallbackCalled)
        self.assertFalse(self.listenerCallbackCalled)

        ###
        # Test PORT_AGENT_COMMAND; handle_packet should invoke raw callback.
        ###
        self.resetTestVars()
        pa_packet = PortAgentPacket(PortAgentPacket.PORT_AGENT_COMMAND)
        pa_packet.attach_data(test_data)
        pa_packet.pack_header()
        pa_packet.verify_checksum()

        pa_listener.handle_packet(pa_packet)

        self.assertTrue(self.rawCallbackCalled)
        self.assertFalse(self.dataCallbackCalled)
        self.assertFalse(self.errorCallbackCalled)
        self.assertFalse(self.listenerCallbackCalled)

        ###
        # Test PORT_AGENT_STATUS; handle_packet should invoke raw callback.
        ###
        self.resetTestVars()
        pa_packet = PortAgentPacket(PortAgentPacket.PORT_AGENT_STATUS)
        pa_packet.attach_data(test_data)
        pa_packet.pack_header()
        pa_packet.verify_checksum()

        pa_listener.handle_packet(pa_packet)

        self.assertTrue(self.rawCallbackCalled)
        self.assertFalse(self.dataCallbackCalled)
        self.assertFalse(self.errorCallbackCalled)
        self.assertFalse(self.listenerCallbackCalled)

        ###
        # Test PORT_AGENT_FAULT; handle_packet should invoke raw callback.
        ###
        self.resetTestVars()
        pa_packet = PortAgentPacket(PortAgentPacket.PORT_AGENT_FAULT)
        pa_packet.attach_data(test_data)
        pa_packet.pack_header()
        pa_packet.verify_checksum()

        pa_listener.handle_packet(pa_packet)

        self.assertTrue(self.rawCallbackCalled)
        self.assertFalse(self.dataCallbackCalled)
        self.assertFalse(self.errorCallbackCalled)
        self.assertFalse(self.listenerCallbackCalled)

        ###
        # Test INSTRUMENT_COMMAND; handle_packet should invoke raw callback.
        ###
        self.resetTestVars()
        pa_packet = PortAgentPacket(PortAgentPacket.DIGI_CMD)
        pa_packet.attach_data(test_data)
        pa_packet.pack_header()
        pa_packet.verify_checksum()

        pa_listener.handle_packet(pa_packet)

        self.assertTrue(self.rawCallbackCalled)
        self.assertFalse(self.dataCallbackCalled)
        self.assertFalse(self.errorCallbackCalled)
        self.assertFalse(self.listenerCallbackCalled)

#.........这里部分代码省略.........
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:101,代码来源:test_port_agent_client.py


示例16: TestUnitDataParticle

class TestUnitDataParticle(MiUnitTestCase):
    """
    Test cases for the DataParticleGenerator class. Functions in this class
    provide unit tests and provide a tutorial on use of
    the data particle generator interface.
    """
    
    class TestDataParticle(DataParticle):
        """
        Simple test DataParticle derivative that returns fixed parsed
        data values
        
        @retval Value list for parased data set [{'value_id': foo,
                                                 'value': bar}, ...]                                   
        """
        _data_particle_type = TEST_PARTICLE_TYPE

        def _build_parsed_values(self):
            result = [{DataParticleKey.VALUE_ID: "temp",
                       DataParticleKey.VALUE: "23.45"},
                      {DataParticleKey.VALUE_ID: "cond",
                       DataParticleKey.VALUE: "15.9"},
                      {DataParticleKey.VALUE_ID: "depth",
                       DataParticleKey.VALUE: "305.16"}]
            return result

    class BadDataParticle(DataParticle):
         """
         Define a data particle that doesn't initialize _data_particle_type.
         Should raise an exception when _data_particle_type isn't initialized
         """
         pass

    def setUp(self):
        """
        """
        #self.start_couchdb() # appease the pyon gods        
        self.sample_port_timestamp = 3555423720.711772
        self.sample_driver_timestamp = 3555423721.711772
        self.sample_internal_timestamp = 3555423719.711772
        self.sample_raw_data = "SATPAR0229,10.01,2206748544,234"

        self.parsed_test_particle = self.TestDataParticle(self.sample_raw_data,
                                    port_timestamp=self.sample_port_timestamp,
                                    quality_flag=DataParticleValue.INVALID,
                                    preferred_timestamp=DataParticleKey.DRIVER_TIMESTAMP)
        self.sample_parsed_particle = {
                                DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
                                DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
                                DataParticleKey.STREAM_NAME: TEST_PARTICLE_TYPE,
                                DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
                                DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
                                #DataParticleKey.NEW_SEQUENCE: None,
                                DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.DRIVER_TIMESTAMP,
                                DataParticleKey.QUALITY_FLAG: DataParticleValue.INVALID,
                                DataParticleKey.VALUES: [
                                    {DataParticleKey.VALUE_ID: "temp",
                                     DataParticleKey.VALUE: "23.45"},
                                    {DataParticleKey.VALUE_ID: "cond",
                                     DataParticleKey.VALUE: "15.9"},
                                    {DataParticleKey.VALUE_ID: "depth",
                                     DataParticleKey.VALUE: "305.16"}
                                    ]
                                }

        self.port_agent_packet = PortAgentPacket()
        self.port_agent_packet.attach_data(self.sample_raw_data)
        self.port_agent_packet.pack_header()

        self.raw_test_particle = RawDataParticle(self.port_agent_packet.get_as_dict(),
                                    port_timestamp=self.sample_port_timestamp,
                                    internal_timestamp=self.sample_internal_timestamp)

        self.sample_raw_particle = {
                               DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
                               DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
                               DataParticleKey.STREAM_NAME: CommonDataParticleType.RAW,
                               DataParticleKey.INTERNAL_TIMESTAMP: self.sample_internal_timestamp,
                               DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
                               DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
                               #DataParticleKey.NEW_SEQUENCE: None,
                               DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.PORT_TIMESTAMP,
                               DataParticleKey.QUALITY_FLAG: "ok",
                               DataParticleKey.VALUES: [
                                   { DataParticleKey.VALUE_ID: "raw",
                                     DataParticleKey.VALUE: base64.b64encode(self.sample_raw_data),
                                     "binary": True },
                                   { DataParticleKey.VALUE_ID: "length",
                                     DataParticleKey.VALUE: int(31)},
                                   { DataParticleKey.VALUE_ID: "type",
                                     DataParticleKey.VALUE: int(2)},
                                  { DataParticleKey.VALUE_ID: "checksum",
                                    DataParticleKey.VALUE: int(2392)},
                                  ]
                                }

    def test_new_sequence_flag(self):
        """
        Verify that we can set the new sequence flag
        """
#.........这里部分代码省略.........
开发者ID:aplmmilcic,项目名称:mi-instrument,代码行数:101,代码来源:test_data_particle.py


示例17: test_status_01

    def test_status_01(self):
        """
        """
        mock_port_agent = Mock(spec=PortAgentClient)
        driver = InstrumentDriver(self._got_data_event_callback)
        driver.set_test_mode(True)

        current_state = driver.get_resource_state()
        self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)

        # Now configure the driver with the mock_port_agent, verifying
        # that the driver transitions to that state
        config = {'mock_port_agent' : mock_port_agent}
        driver.configure(config = config)

        current_state = driver.get_resource_state()
        self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)

        # Invoke the connect method of the driver: should connect to mock
        # port agent.  Verify that the connection FSM transitions to CONNECTED,
        # (which means that the FSM should now be reporting the ProtocolState).
        driver.connect()
        current_state = driver.get_resource_state()
        self.assertEqual(current_state, DriverProtocolState.UNKNOWN)

        # Force the instrument into a known state
        self.assert_force_state(driver, DriverProtocolState.COMMAND)
        ts = ntplib.system_to_ntp_time(time.time())

        log.debug("DUMP_STATUS: %s", DUMP_STATUS)
        # Create and populate the port agent packet.
        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data(DUMP_STATUS)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("HEAT,2013/06/19 23:04:37,-001,0000,0026" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("LILY,2013/06/19 23:04:38, -49.455,  34.009,193.91, 26.02,11.96,N9655" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("NANO,V,2013/06/19 23:04:38.000,13.987223,25.126694121" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("LILY,2013/06/19 23:04:39, -49.483,  33.959,193.85, 26.03,11.96,N9655" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("NANO,V,2013/06/19 23:04:39.000,13.987191,25.126709409" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("LILY,2013/06/19 23:04:40, -49.355,  33.956,193.79, 26.02,11.96,N9655" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("NANO,V,2013/06/19 23:04:40.000,13.987253,25.126725854" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("HEAT,2013/06/19 23:04:40,-001,0000,0026" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("NANO,2013/06/19 21:46:54,*APPLIED GEOMECHANICS Model MD900-T Firmware V5.2 SN-N3616 ID01" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("NANO,V,2013/06/19 21:46:54.000,13.990480,25.027793612" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
        port_agent_packet.pack_header()
        #driver._protocol.got_data(port_agent_packet)

        port_agent_packet = PortAgentPacket()
        port_agent_packet.attach_data("NANO,2013/06/19 21:46:54,*01: Vbias= 0.0000 0.0000 0.0000 0.0000" + NEWLINE)
        port_agent_packet.attach_timestamp(ts)
#.........这里部分代码省略.........
开发者ID:StevenMyerson,项目名称:marine-integrations,代码行数:101,代码来源:test_driver.py


示例18: test_sample

    def test_sample(self):
        """
        Create a mock port agent
        """
        mock_port_agent = Mock(spec=PortAgentClient)

        """
        Instantiate the driver class directly (no driver client, no driver
        client, no zmq driver process, no driver process; just own the driver)
        """                  
        test_driver = InstrumentDriver(self.my_event_callback)
        
        current_state = test_driver.get_resource_state()
        print "DHE: DriverConnectionState: " + str(current_state)
        self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)
        
        """
        Now configure the driver with the mock_port_agent, verifying
        that the driver transitions to that state
        """
        config = {'mock_port_agent' : mock_port_agent}
        # commented below out; trying real port agent
        test_driver.configure(config = config)

        """
        DHE: trying this; want to invoke the driver methods themselves, but 
        with the driver talking to the port_agent (through the client).
        Will it work? Answer: It does, but I can't step through it.  Let
        me try just running this.
        """
        #test_driver.configure(config = self.port_agent_comm_config())
        current_state = test_driver.get_resource_state()
        print "DHE: DriverConnectionState: " + str(current_state)
        self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)
        
        """
        Invoke the connect method of the driver: should connect to mock
        port agent.  Verify that the connection FSM transitions to CONNECTED,
        (which means that the FSM should now be reporting the ProtocolState).
        """
        
        """
        TODO: DHE: Shouldn't have to put a sleep here; it's like the
        port_agent isn't up yet; I get a connection refused without this.
        """
        gevent.sleep(2)

        test_driver.connect()
        current_state = test_driver.get_resource_state()
        print "DHE: DriverConnectionState: " + str(current_state)
        self.assertEqual(current_state, DriverProtocolState.UNKNOWN)

        """
        Invoke the discover_state method of the driver
        """
        # commenting out because it will try wakeup
        #test_driver.discover_state()
        #current_state = test_driver.get_resource_state()
        #print "DHE: DriverConnectionState: " + str(current_state)

        """
        Force state to command state.
        """
        
        """
        This won't work right now because the SBE16 does an _update_params upon
        entry to the command_handler; since this is a unit test there is nothing
        to respond to the ds and dc commands.  Setting up mock to do it would
        be really handy.
        """
        test_driver.execute_force_state(state = DriverProtocolState.AUTOSAMPLE)
        current_state = test_driver.get_resource_state()
        print "DHE: DriverConnectionState: " + str(current_state)
        self.assertEqual(current_state, DriverProtocolState.AUTOSAMPLE)

        self.reset_test_vars()
        test_sample = "#  24.0088,  0.00001,   -0.000,   0.0117, 03 Oct 2012 20:59:04\r\n"
        
        """
        use for test p 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python protocol_param_dict.ProtocolParameterDict类代码示例发布时间:2022-05-27
下一篇:
Python port_agent_client.PortAgentClient类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap