本文整理汇总了Python中mi.core.instrument.port_agent_client.PortAgentClient类的典型用法代码示例。如果您正苦于以下问题:Python PortAgentClient类的具体用法?Python PortAgentClient怎么用?Python PortAgentClient使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了PortAgentClient类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_start_pa_client_lost_port_agent_rx
def test_start_pa_client_lost_port_agent_rx(self):
    """
    Verify that the error callback fires when the port agent goes away
    while only the listener is active.

    The port agent is started, a client is connected to it, and then the
    port agent is stopped. No send is attempted, so the listener is the
    only path that can observe the failure.
    """
    self.resetTestVars()
    self.init_instrument_simulator()
    self.startPortAgent()

    client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
    client.init_comms(self.myGotData,
                      self.myGotRaw,
                      self.myGotListenerError,
                      self.myGotError)

    try:
        self.stop_port_agent()
    except InstrumentConnectionException as err:
        # A connection exception while stopping is logged, not fatal.
        log.error("Exception caught: %r" % err)

    # Wait before checking the error flag.
    time.sleep(5)

    # Only the error callback is checked. At this moment the listener sees
    # the error first, and that path does not raise, so no exception is
    # asserted yet.
    self.assertTrue(self.errorCallbackCalled)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:28,代码来源:test_port_agent_client.py
示例2: test_pa_client_retry
def test_pa_client_retry(self):
    """
    Check that the port agent client does not keep trying to recover when
    the port agent closes the connection gracefully because another
    client is already connected.

    The test passes when init_comms raises InstrumentConnectionException.
    """
    self.resetTestVars()
    self.init_instrument_simulator()
    self.startPortAgent()
    time.sleep(2)

    # Occupy the data port with a plain TCP client first. The port agent
    # already has a client, so it will immediately close the next
    # connection it receives.
    self.tcp_client = TcpClient("localhost", self.data_port)
    time.sleep(2)

    client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
    try:
        client.init_comms(self.myGotData,
                          self.myGotRaw,
                          self.myGotListenerError,
                          self.myGotError)
    except InstrumentConnectionException:
        saw_connection_error = True
    else:
        saw_connection_error = False

    # Give it some time to retry
    time.sleep(4)

    self.assertTrue(saw_connection_error)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:31,代码来源:test_port_agent_client.py
示例3: test_port_agent_client_send
def test_port_agent_client_send(self):
    """
    Connect a PortAgentClient to the fixture's address and port and send
    a single newline-terminated test string.

    The test makes no assertions; it only exercises the constructor,
    init_comms (with the data callback alone) and send.
    """
    # The address and port come from the fixture (self.ipaddr, self.port).
    # The unused hard-coded address/port locals and the commented-out
    # alternative constructor call were removed.
    paClient = PortAgentClient(self.ipaddr, self.port)
    paClient.init_comms(self.myGotData)
    paClient.send("this is a test\n")
开发者ID:lytlej,项目名称:marine-integrations,代码行数:8,代码来源:test_port_agent_client.py
示例4: test_start_paClient_no_port_agent
def test_start_paClient_no_port_agent(self):
    """
    Start a port agent client without starting a port agent in this test
    and verify that the error callback is reported as called.
    """
    # Call form of print: identical output on Python 2 for a single string
    # argument, and also valid syntax on Python 3.
    print("port agent client test begin")

    paClient = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
    paClient.init_comms(self.myGotData, self.myGotRaw, self.myGotError)

    self.assertTrue(self.errorCallbackCalled)
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:9,代码来源:test_port_agent_client.py
示例5: test_start_paClient_lost_port_agent_tx
def test_start_paClient_lost_port_agent_tx(self):
    """
    Verify that a send after the port agent has stopped both raises and
    triggers the error callback.

    The client is initialized with start_listener=False so the listener
    thread is not started; this guarantees the send context is the one
    that sees the error.
    """
    self.resetTestVars()
    self.init_instrument_simulator()
    self.startPortAgent()

    client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

    # Give the port agent time to initialize
    time.sleep(5)

    # NOTE(review): other call sites in this file pass myGotListenerError
    # before myGotError; confirm the callback order intended here.
    client.init_comms(self.myGotData,
                      self.myGotRaw,
                      self.myGotError,
                      self.myGotListenerError,
                      start_listener=False)

    send_failed = False
    try:
        self.stop_port_agent()
        payload = "this big ol' test should cause send context to fail"
        client.send(payload)
        time.sleep(1)
    except InstrumentConnectionException as err:
        log.error("Exception caught: %r" % (err))
        send_failed = True

    time.sleep(5)

    # The listener should not be running in this test, so the send context
    # should see the error and raise. Assert that the error callback WAS
    # called and that an exception WAS thrown.
    self.assertTrue(self.errorCallbackCalled)
    self.assertTrue(send_failed)
开发者ID:deverett,项目名称:marine-integrations,代码行数:45,代码来源:test_port_agent_client.py
示例6: test_pa_client_rx_heartbeat
def test_pa_client_rx_heartbeat(self):
    """
    Check that the port agent can send heartbeats while the client's own
    heartbeat_interval is 0.

    The heartbeat interval is set by the fixture's port_agent_config()
    method. The test passes when no error callback has fired after the
    client has been connected for a while.
    """
    self.resetTestVars()
    self.init_instrument_simulator()
    self.startPortAgent()

    # Wait after starting the port agent before connecting.
    time.sleep(5)

    client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
    client.init_comms(self.myGotData,
                      self.myGotRaw,
                      self.myGotListenerError,
                      self.myGotError)

    # Stay connected for a while before checking the error flag.
    time.sleep(10)

    self.assertFalse(self.errorCallbackCalled)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:19,代码来源:test_port_agent_client.py
示例7: test_start_paClient_with_port_agent
def test_start_paClient_with_port_agent(self):
    """
    With a running port agent, send data in both directions and verify
    that the raw and data callbacks are reported as called.
    """
    self.init_instrument_simulator()
    self.startPortAgent()

    client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
    client.init_comms(self.myGotData, self.myGotRaw, self.myGotError)

    payload = "this is a great big test"

    # Send from the client, then send the same payload from the simulator.
    client.send(payload)
    time.sleep(1)
    self._instrument_simulator.send(payload)
    time.sleep(5)

    client.stop_comms()

    self.assertTrue(self.rawCallbackCalled)
    self.assertTrue(self.dataCallbackCalled)
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:22,代码来源:test_port_agent_client.py
示例8: test_start_paClient_with_port_agent
def test_start_paClient_with_port_agent(self):
    """
    With a running port agent, connect a client, send data in both
    directions, and verify that no error occurred and that the raw and
    data callbacks are reported as called.
    """
    self.resetTestVars()
    self.init_instrument_simulator()
    self.startPortAgent()

    client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

    connect_failed = False
    try:
        client.init_comms(self.myGotData,
                          self.myGotRaw,
                          self.myGotListenerError,
                          self.myGotError)
    except InstrumentConnectionException as err:
        log.error("Exception caught: %r" % (err))
        connect_failed = True

    payload = "this is a great big test"

    # Send from the client, then send the same payload from the simulator.
    client.send(payload)
    time.sleep(1)
    self._instrument_simulator.send(payload)
    time.sleep(5)

    client.stop_comms()

    # Assert that the error_callback was not called, that an exception was
    # not caught, and that the data and raw callbacks were called.
    self.assertFalse(self.errorCallbackCalled)
    self.assertFalse(connect_failed)
    self.assertTrue(self.rawCallbackCalled)
    self.assertTrue(self.dataCallbackCalled)
开发者ID:deverett,项目名称:marine-integrations,代码行数:38,代码来源:test_port_agent_client.py
示例9: test_callback_error
def test_callback_error(self):
    """
    Exercise PortAgentClient.callback_error three times against a mocked
    init_comms and check its return value and recovery behaviour.

    First call: returns True and invokes init_comms once (a recovery
    attempt). Second call: returns False with no further init_comms call,
    because no higher-level callback is registered. Third call, with
    user_callback_error set: returns True, init_comms is still called only
    once, and the higher-level callback is reported as called.
    """
    paClient = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

    # Mock up the init_comms method; callback_error will try to invoke it,
    # which would otherwise try to connect to the port_agent.
    # NOTE(review): spec is given a string, so the mock is specced against
    # str rather than the real init_comms -- confirm this is intended.
    mock_init_comms = Mock(spec="init_comms")
    paClient.init_comms = mock_init_comms

    # True is expected because the callback will try a recovery; at that
    # point it does not matter that no higher-level callback is
    # registered. init_comms must have been called exactly once.
    retValue = paClient.callback_error("This is a great big error")
    self.assertTrue(retValue)
    self.assertEqual(len(mock_init_comms.mock_calls), 1)

    # Second call: False is expected because no higher-level callback has
    # been registered, and init_comms must not be called again (still 1).
    retValue = paClient.callback_error("This is a big boo boo")
    self.assertFalse(retValue)
    self.assertEqual(len(mock_init_comms.mock_calls), 1)

    # Third call, with a callback registered: True is expected, the
    # init_comms call count is still 1, and the higher-level callback must
    # have been called.
    self.resetTestVars()
    paClient.user_callback_error = self.myGotError
    retValue = paClient.callback_error("Another big boo boo")
    self.assertTrue(retValue)
    self.assertEqual(len(mock_init_comms.mock_calls), 1)
    # assertEqual uses ==, so a value of True still compares equal to 1,
    # exactly as the previous `== 1` check did.
    self.assertEqual(self.errorCallbackCalled, 1)
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:39,代码来源:test_port_agent_client.py
示例10: test_start_pa_client_lost_port_agent_tx_rx
def test_start_pa_client_lost_port_agent_tx_rx(self):
    """
    Verify normal two-way traffic first, then verify that the error
    callback fires once the port agent has been stopped.

    Phase 1 sends data from the client and from the instrument simulator
    while the port agent is running; no error is expected. Phase 2 stops
    the port agent and repeats the sends; the error callback is expected
    to be called.
    """
    self.resetTestVars()
    self.init_instrument_simulator()
    self.startPortAgent()

    client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)
    client.init_comms(self.myGotData,
                      self.myGotRaw,
                      self.myGotListenerError,
                      self.myGotError)

    # Phase 1: send some data; there should be no errors.
    first_send_failed = False
    try:
        payload = "this is a great big test"
        client.send(payload)
        time.sleep(1)
        self._instrument_simulator.send(payload)
    except InstrumentConnectionException as err:
        log.error("Exception caught: %r" % err)
        first_send_failed = True

    time.sleep(1)

    # The error_callback must NOT have been called, no exception may have
    # been caught, and the data and raw callbacks must have been called.
    self.assertFalse(self.errorCallbackCalled)
    self.assertFalse(first_send_failed)
    self.assertTrue(self.rawCallbackCalled)
    self.assertTrue(self.dataCallbackCalled)

    # Phase 2: reset the test variables and try again, this time after
    # stopping the port agent. There should be errors.
    self.resetTestVars()
    try:
        self.stop_port_agent()
        log.debug("Port agent stopped")
        payload = "this is another great big test"
        client.send(payload)
        time.sleep(1)
        log.debug("Sending from simulator")
        self._instrument_simulator.send(payload)
    except InstrumentConnectionException as err:
        log.error("Exception caught: %r" % err)

    time.sleep(5)

    # The listener usually sees the error first, and that path does not
    # raise, so only the error callback is asserted here.
    self.assertTrue(self.errorCallbackCalled)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:64,代码来源:test_port_agent_client.py
示例11: test_start_pa_client_no_port_agent_big_data
def test_start_pa_client_no_port_agent_big_data(self):
    """
    Feed the client one very large DATA_FROM_INSTRUMENT packet from a
    stand-in server and verify that it arrives intact.

    No real port agent is used. Two gevent StreamServers listen on the
    data and command ports; the data server sends one hand-built packet
    in two pieces. The test then checks that no error callback fired,
    that the raw and data callbacks fired, and that the payload held in
    self.pa_packet matches what was sent.

    Raises:
        InstrumentConnectionException: re-raised after logging if
            starting the servers or init_comms fails with it.
    """
    self.resetTestVars()

    logging.getLogger('mi.core.instrument.port_agent_client').setLevel(logging.DEBUG)

    # I put this in here because PortAgentPacket cannot make a new packet
    # with a valid checksum.
    def makepacket(msgtype, timestamp, data):
        """Build a packet (header + data) with an XOR checksum as a bytearray."""
        from struct import Struct

        SYNC = (0xA3, 0x9D, 0x7A)
        # Network byte order: 3 sync bytes, msgtype, pktsize, checksum,
        # timestamp.
        HEADER_FORMAT = "!BBBBHHd"
        header_struct = Struct(HEADER_FORMAT)
        HEADER_SIZE = header_struct.size

        def calculate_checksum(data, seed=0):
            """XOR every element of data together, starting from seed."""
            n = seed
            for datum in data:
                n ^= datum
            return n

        def pack_header(buf, msgtype, pktsize, checksum, timestamp):
            """Write the header fields into the start of buf."""
            sync1, sync2, sync3 = SYNC
            header_struct.pack_into(buf, 0, sync1, sync2, sync3, msgtype, pktsize,
                                    checksum, timestamp)

        pktsize = HEADER_SIZE + len(data)
        pkt = bytearray(pktsize)
        # First pass: the checksum field is zero while the checksum is
        # computed over the whole packet.
        pack_header(pkt, msgtype, pktsize, 0, timestamp)
        pkt[HEADER_SIZE:] = data
        checksum = calculate_checksum(pkt)
        # Second pass: rewrite the header with the real checksum.
        pack_header(pkt, msgtype, pktsize, checksum, timestamp)

        return pkt

    # Make a BIG packet
    # NOTE(review): this HEADER_SIZE is not the local inside makepacket; it
    # must come from an enclosing scope (presumably a module-level import)
    # -- confirm it exists and equals the header size used above.
    data = "A" * (2 ** 16 - HEADER_SIZE - 1)
    txpkt = makepacket(PortAgentPacket.DATA_FROM_INSTRUMENT, 0.0, data)

    def handle(sock, addr):
        """Data-port connection handler: send the packet in two pieces."""
        # Send it in pieces
        sock.sendall(txpkt[:1500])
        time.sleep(1)
        sock.sendall(txpkt[1500:])
        # NOTE(review): presumably keeps the connection open while the
        # client processes the data; the original indentation was lost,
        # so confirm this sleep belongs to the handler.
        time.sleep(10)

    import gevent.server

    dataserver = gevent.server.StreamServer((self.ipaddr, self.data_port), handle)
    cmdserver = gevent.server.StreamServer((self.ipaddr, self.cmd_port), lambda x, y: None)

    pa_client = PortAgentClient(self.ipaddr, self.data_port, self.cmd_port)

    try:
        dataserver.start()
        cmdserver.start()
        pa_client.init_comms(self.myGotData, self.myGotRaw, self.myGotListenerError, self.myGotError)
    except InstrumentConnectionException as e:
        log.error("Exception caught: %r" % e)
        raise
    else:
        # Wait after a successful connect before tearing everything down.
        time.sleep(5)
    finally:
        pa_client.stop_comms()
        dataserver.kill()
        cmdserver.kill()

    # Assert that the error_callback was not called, that an exception was
    # not caught, and that the data and raw callbacks were called.
    self.assertFalse(self.errorCallbackCalled)
    self.assertTrue(self.rawCallbackCalled)
    self.assertTrue(self.dataCallbackCalled)

    # assertEqual/assertTrue replace the deprecated assertEquals/assert_
    # aliases.
    self.assertEqual(self.pa_packet.get_data_length(), len(data))
    self.assertEqual(len(self.pa_packet.get_data()), len(data))
    # don't use assertEqual for the payload b/c it will print 64kb
    self.assertTrue(self.pa_packet.get_data() == data)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:80,代码来源:test_port_agent_client.py
示例12: test_port_agent_client_receive
def test_port_agent_client_receive(self):
    """
    Connect a PortAgentClient to the fixture's address and port with only
    the data callback registered.

    The test makes no assertions; it only exercises the constructor and
    init_comms.
    """
    # The address and port come from the fixture (self.ipaddr, self.port).
    # The unused hard-coded address/port locals and the commented-out
    # alternative constructor call were removed.
    paClient = PortAgentClient(self.ipaddr, self.port)
    paClient.init_comms(self.myGotData)
开发者ID:lytlej,项目名称:marine-integrations,代码行数:6,代码来源:test_port_agent_client.py
注:本文中的mi.core.instrument.port_agent_client.PortAgentClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论