• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python port_agent_client.PortAgentClient类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mi.core.instrument.port_agent_client.PortAgentClient的典型用法代码示例。如果您正苦于以下问题:Python PortAgentClient类的具体用法?Python PortAgentClient怎么用?Python PortAgentClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了PortAgentClient类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_start_pa_client_lost_port_agent_rx

    def test_start_pa_client_lost_port_agent_rx(self):
        """
        This test starts the port agent and then stops the port agent and
        verifies that the error callback was called (because the listener
        is the only one that will see the error, since there is no send
        operation).
        """

        self.resetTestVars()

        self.init_instrument_simulator()
        self.startPortAgent()

        pa_client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
        pa_client.init_comms(self.myGotData, self.myGotRaw, self.myGotListenerError, self.myGotError)

        try:
            self.stop_port_agent()

        except InstrumentConnectionException as e:
            log.error("Exception caught: %r" % e)

        time.sleep(5)

        # Assert that the error_callback was called.  At this moment the listener
        # is seeing the error first, and that does not call the exception, so
        # don't test for that yet.
        self.assertTrue(self.errorCallbackCalled)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:28,代码来源:test_port_agent_client.py


示例2: test_pa_client_retry

    def test_pa_client_retry(self):
        """
        Test that the port agent client will not continually try to recover
        when the port agent closes the connection gracefully because it has
        another client connected.
        """

        exception_raised = False
        self.resetTestVars()

        self.init_instrument_simulator()
        self.startPortAgent()
        time.sleep(2)

        # Start a TCP client that will connect to the data port; this sets up the
        # situation where the Port Agent will immediately close the connection
        # because it already has one
        self.tcp_client = TcpClient("localhost", self.data_port)
        time.sleep(2)

        pa_client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

        try:
            pa_client.init_comms(self.myGotData, self.myGotRaw, self.myGotListenerError, self.myGotError)
        except InstrumentConnectionException:
            exception_raised = True

        # Give it some time to retry
        time.sleep(4)

        self.assertTrue(exception_raised)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:31,代码来源:test_port_agent_client.py


示例3: test_port_agent_client_send

    def test_port_agent_client_send(self):
        ipaddr = "67.58.49.194"
        port = 4000
        paClient = PortAgentClient(self.ipaddr, self.port)
        # paClient = PortAgentClient(ipaddr, port)
        paClient.init_comms(self.myGotData)

        paClient.send("this is a test\n")
开发者ID:lytlej,项目名称:marine-integrations,代码行数:8,代码来源:test_port_agent_client.py


示例4: test_start_paClient_no_port_agent

    def test_start_paClient_no_port_agent(self):

        print "port agent client test begin"

        paClient = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
        
        paClient.init_comms(self.myGotData, self.myGotRaw, self.myGotError)
        
        self.assertTrue(self.errorCallbackCalled)
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:9,代码来源:test_port_agent_client.py


示例5: test_start_paClient_lost_port_agent_tx

    def test_start_paClient_lost_port_agent_tx(self):
        """
        This test starts the port agent and then starts the port agent client
        in a special way that will not start the listener thread.  This will
        guarantee that the send context is the one the sees the error.
        """

        self.resetTestVars()

        self.init_instrument_simulator()
        self.startPortAgent()

        paClient = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

        """
        Give the port agent time to initialize
        """
        time.sleep(5)
        
        paClient.init_comms(self.myGotData, self.myGotRaw, self.myGotError, self.myGotListenerError, start_listener = False)
        
        try:
            self.stop_port_agent()    
            data = "this big ol' test should cause send context to fail"
            paClient.send(data)
        
            time.sleep(1)

        except InstrumentConnectionException as e:
            log.error("Exception caught: %r" % (e))
            exceptionCaught = True
            
        else:
            exceptionCaught = False
        
        time.sleep(5)
    
        """
        Assert that the error_callback was called.  For this test the listener
        should not be running, so the send context should see the error, and that
        should throw an exception.  Assert that the callback WAS called and that
        an exception WAS thrown.
        """
        self.assertTrue(self.errorCallbackCalled)        
        self.assertTrue(exceptionCaught)
开发者ID:deverett,项目名称:marine-integrations,代码行数:45,代码来源:test_port_agent_client.py


示例6: test_pa_client_rx_heartbeat

    def test_pa_client_rx_heartbeat(self):
        """
        Test that the port agent can send heartbeats when the pa_client has
        a heartbeat_interval of 0.  The port_agent_config() method above
        sets the heartbeat interval.
        """

        self.resetTestVars()

        self.init_instrument_simulator()
        self.startPortAgent()
        time.sleep(5)

        pa_client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
        pa_client.init_comms(self.myGotData, self.myGotRaw, self.myGotListenerError, self.myGotError)

        time.sleep(10)

        self.assertFalse(self.errorCallbackCalled)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:19,代码来源:test_port_agent_client.py


示例7: test_start_paClient_with_port_agent

    def test_start_paClient_with_port_agent(self):

        self.init_instrument_simulator()
        self.startPortAgent()

        paClient = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
        
        paClient.init_comms(self.myGotData, self.myGotRaw, self.myGotError)
        
        data = "this is a great big test"
        paClient.send(data)
        
        time.sleep(1)

        self._instrument_simulator.send(data)
        
        time.sleep(5)

        paClient.stop_comms()
        
        self.assertTrue(self.rawCallbackCalled)
        self.assertTrue(self.dataCallbackCalled)
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:22,代码来源:test_port_agent_client.py


示例8: test_start_paClient_with_port_agent

    def test_start_paClient_with_port_agent(self):

        self.resetTestVars()
        
        self.init_instrument_simulator()
        self.startPortAgent()

        paClient = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

        try:        
            paClient.init_comms(self.myGotData, self.myGotRaw, self.myGotListenerError, self.myGotError)
        
        except InstrumentConnectionException as e:
            log.error("Exception caught: %r" % (e))
            exceptionCaught = True
            
        else:
            exceptionCaught = False
        
            data = "this is a great big test"
            paClient.send(data)
        
            time.sleep(1)
    
            self._instrument_simulator.send(data)
            
            time.sleep(5)
    
        paClient.stop_comms()

        """
        Assert that the error_callback was not called, that an exception was not
        caught, and that the data and raw callbacks were called.
        """
        self.assertFalse(self.errorCallbackCalled)        
        self.assertFalse(exceptionCaught)
        self.assertTrue(self.rawCallbackCalled)
        self.assertTrue(self.dataCallbackCalled)
开发者ID:deverett,项目名称:marine-integrations,代码行数:38,代码来源:test_port_agent_client.py


示例9: test_callback_error

    def test_callback_error(self):
        paClient = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
        
        """
        Mock up the init_comms method; the callback_error will try to invoke
        it, which will try to connect to the port_agent
        """
        mock_init_comms = Mock(spec = "init_comms")
        paClient.init_comms = mock_init_comms
        
        """
        Test that True is returned because the callback will try a recovery, 
        and it doesn't matter at that point that there is no higher-level
        callback registered.  Also assert that mock_init_comms was called.
        """
        retValue = paClient.callback_error("This is a great big error")
        self.assertTrue(retValue)
        self.assertTrue(len(mock_init_comms.mock_calls) == 1)

        """
        Now call callback_error again.  This time it should return False
        because no higher-level callback has been registered.  Also assert
        that mock_init_calls hasn't been called again (still is 1).
        """
        retValue = paClient.callback_error("This is a big boo boo")
        self.assertFalse(retValue)
        self.assertTrue(len(mock_init_comms.mock_calls) == 1)

        """
        Now call again with a callback registered; assert that the retValue
        is True (callback registered), that mock_init_comms is still 1, and
        that the higher-level callback was called.
        """
        self.resetTestVars()
        paClient.user_callback_error = self.myGotError
        retValue = paClient.callback_error("Another big boo boo")
        self.assertTrue(retValue)
        self.assertTrue(len(mock_init_comms.mock_calls) == 1)
        self.assertTrue(self.errorCallbackCalled == 1)
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:39,代码来源:test_port_agent_client.py


示例10: test_start_pa_client_lost_port_agent_tx_rx

    def test_start_pa_client_lost_port_agent_tx_rx(self):
        """
        This test starts the port agent and the instrument_simulator and
        tests that data is sent and received first; then it stops the port
        agent and tests that the error_callback was called.
        """

        self.resetTestVars()

        self.init_instrument_simulator()
        self.startPortAgent()

        pa_client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
        pa_client.init_comms(self.myGotData, self.myGotRaw, self.myGotListenerError, self.myGotError)

        # Now send some data; there should be no errors.
        try:
            data = "this is a great big test"
            pa_client.send(data)

            time.sleep(1)

            self._instrument_simulator.send(data)

        except InstrumentConnectionException as e:
            log.error("Exception caught: %r" % e)
            exception_caught = True

        else:
            exception_caught = False

        time.sleep(1)

        # Assert that the error_callback was NOT called, that an exception was NOT
        # caught, and that the data and raw callbacks WERE called.
        self.assertFalse(self.errorCallbackCalled)
        self.assertFalse(exception_caught)
        self.assertTrue(self.rawCallbackCalled)
        self.assertTrue(self.dataCallbackCalled)

        # Now reset the test variables and try again; this time after stopping
        # the port agent.  Should be errors
        self.resetTestVars()

        try:
            self.stop_port_agent()
            log.debug("Port agent stopped")
            data = "this is another great big test"
            pa_client.send(data)

            time.sleep(1)

            log.debug("Sending from simulator")
            self._instrument_simulator.send(data)

        except InstrumentConnectionException as e:
            log.error("Exception caught: %r" % e)

        time.sleep(5)

        # Assert that the error_callback WAS called.  The listener usually
        # is seeing the error first, and that does not call the exception, so
        # only assert that the error callback was called.
        self.assertTrue(self.errorCallbackCalled)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:64,代码来源:test_port_agent_client.py


示例11: test_start_pa_client_no_port_agent_big_data

    def test_start_pa_client_no_port_agent_big_data(self):

        self.resetTestVars()

        logging.getLogger('mi.core.instrument.port_agent_client').setLevel(logging.DEBUG)

        # I put this in here because PortAgentPacket cannot make a new packet
        # with a valid checksum.
        def makepacket(msgtype, timestamp, data):
            from struct import Struct

            SYNC = (0xA3, 0x9D, 0x7A)
            HEADER_FORMAT = "!BBBBHHd"
            header_struct = Struct(HEADER_FORMAT)
            HEADER_SIZE = header_struct.size

            def calculate_checksum(data, seed=0):
                n = seed
                for datum in data:
                    n ^= datum
                return n

            def pack_header(buf, msgtype, pktsize, checksum, timestamp):
                sync1, sync2, sync3 = SYNC
                header_struct.pack_into(buf, 0, sync1, sync2, sync3, msgtype, pktsize,
                                        checksum, timestamp)

            pktsize = HEADER_SIZE + len(data)
            pkt = bytearray(pktsize)
            pack_header(pkt, msgtype, pktsize, 0, timestamp)
            pkt[HEADER_SIZE:] = data
            checksum = calculate_checksum(pkt)
            pack_header(pkt, msgtype, pktsize, checksum, timestamp)
            return pkt

        # Make a BIG packet
        data = "A" * (2 ** 16 - HEADER_SIZE - 1)
        txpkt = makepacket(PortAgentPacket.DATA_FROM_INSTRUMENT, 0.0, data)

        def handle(sock, addr):
            # Send it in pieces
            sock.sendall(txpkt[:1500])
            time.sleep(1)
            sock.sendall(txpkt[1500:])
            time.sleep(10)

        import gevent.server

        dataserver = gevent.server.StreamServer((self.ipaddr, self.data_port), handle)
        cmdserver = gevent.server.StreamServer((self.ipaddr, self.cmd_port), lambda x, y: None)

        pa_client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

        try:
            dataserver.start()
            cmdserver.start()
            pa_client.init_comms(self.myGotData, self.myGotRaw, self.myGotListenerError, self.myGotError)

        except InstrumentConnectionException as e:
            log.error("Exception caught: %r" % e)
            raise

        else:
            time.sleep(5)

        finally:
            pa_client.stop_comms()
            dataserver.kill()
            cmdserver.kill()

        # Assert that the error_callback was not called, that an exception was not
        # caught, and that the data and raw callbacks were called.
        self.assertFalse(self.errorCallbackCalled)
        self.assertTrue(self.rawCallbackCalled)
        self.assertTrue(self.dataCallbackCalled)

        self.assertEquals(self.pa_packet.get_data_length(), len(data))
        self.assertEquals(len(self.pa_packet.get_data()), len(data))
        # don't use assertEquals b/c it will print 64kb
        self.assert_(self.pa_packet.get_data() == data)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:80,代码来源:test_port_agent_client.py


示例12: test_port_agent_client_receive

 def test_port_agent_client_receive(self):
     ipaddr = "67.58.49.194"
     port = 4000
     paClient = PortAgentClient(self.ipaddr, self.port)
     # paClient = PortAgentClient(ipaddr, port)
     paClient.init_comms(self.myGotData)
开发者ID:lytlej,项目名称:marine-integrations,代码行数:6,代码来源:test_port_agent_client.py



注:本文中的mi.core.instrument.port_agent_client.PortAgentClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap