本文整理汇总了Python中mi.core.instrument.port_agent_client.PortAgentPacket类的典型用法代码示例。如果您正苦于以下问题:Python PortAgentPacket类的具体用法?Python PortAgentPacket怎么用?Python PortAgentPacket使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了PortAgentPacket类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_valid_complete_sample
def test_valid_complete_sample(self):
"""
Create a mock port agent
"""
mock_port_agent = Mock(spec=LoggerClient)
"""
Instantiate the driver class directly (no driver client, no driver
client, no zmq driver process, no driver process; just own the driver)
"""
test_driver = ooicoreInstrumentDriver(self.my_event_callback)
current_state = test_driver.get_resource_state()
self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)
"""
Now configure the driver with the mock_port_agent, verifying
that the driver transitions to the DISCONNECTED state
"""
config = {"mock_port_agent": mock_port_agent}
test_driver.configure(config=config)
current_state = test_driver.get_resource_state()
self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)
"""
Invoke the connect method of the driver: should connect to mock
port agent. Verify that the connection FSM transitions to CONNECTED,
(which means that the FSM should now be reporting the ProtocolState).
"""
test_driver.connect()
current_state = test_driver.get_resource_state()
self.assertEqual(current_state, DriverProtocolState.UNKNOWN)
"""
Force the driver into AUTOSAMPLE state so that it will parse and
publish samples
"""
test_driver.set_test_mode(True)
test_driver.test_force_state(state=DriverProtocolState.AUTOSAMPLE)
current_state = test_driver.get_resource_state()
self.assertEqual(current_state, DriverProtocolState.AUTOSAMPLE)
"""
- Reset test verification variables.
- Construct a complete sample
- Pass to got_data()
- Verify that raw and parsed streams have been received
"""
self.reset_test_vars()
packet = PortAgentPacket()
header = "\xa3\x9d\x7a\x02\x00\x1c\x0b\x2e\x00\x00\x00\x01\x80\x00\x00\x00"
packet.unpack_header(header)
packet.attach_data(FULL_SAMPLE)
test_driver._protocol.got_data(packet)
self.assertTrue(self.raw_stream_received)
self.assertTrue(self.parsed_stream_received)
开发者ID:sfoley,项目名称:marine-integrations,代码行数:58,代码来源:test_driver.py
示例2: _get_header
def _get_header(buffer):
packet = PortAgentPacket()
if(len(buffer) < HEADER_SIZE): return None
header = buffer[0:HEADER_SIZE]
packet.unpack_header(header)
if(packet.get_data_length() < 0): return None
return packet
开发者ID:JeffRoy,项目名称:marine-integrations,代码行数:10,代码来源:watch_data_log.py
示例3: _get_header
def _get_header(buffer):
packet = PortAgentPacket()
if(len(buffer) < HEADER_SIZE): return None
header = buffer[0:HEADER_SIZE]
packet.unpack_header(header)
if(packet.get_data_length() < 0): return None
print "time: %f" % packet.get_timestamp()
return packet
开发者ID:JeffRoy,项目名称:marine-integrations,代码行数:12,代码来源:cat_data_log.py
示例4: setUp
def setUp(self):
"""
"""
#self.start_couchdb() # appease the pyon gods
self.sample_port_timestamp = 3555423720.711772
self.sample_driver_timestamp = 3555423721.711772
self.sample_internal_timestamp = 3555423719.711772
self.sample_raw_data = "SATPAR0229,10.01,2206748544,234"
self.parsed_test_particle = self.TestDataParticle(self.sample_raw_data,
port_timestamp=self.sample_port_timestamp,
quality_flag=DataParticleValue.INVALID,
preferred_timestamp=DataParticleKey.DRIVER_TIMESTAMP)
self.sample_parsed_particle = {
DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
DataParticleKey.STREAM_NAME: TEST_PARTICLE_TYPE,
DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.DRIVER_TIMESTAMP,
DataParticleKey.QUALITY_FLAG: DataParticleValue.INVALID,
DataParticleKey.VALUES: [
{DataParticleKey.VALUE_ID: "temp",
DataParticleKey.VALUE: "23.45"},
{DataParticleKey.VALUE_ID: "cond",
DataParticleKey.VALUE: "15.9"},
{DataParticleKey.VALUE_ID: "depth",
DataParticleKey.VALUE: "305.16"}
]
}
self.port_agent_packet = PortAgentPacket()
self.port_agent_packet.attach_data(self.sample_raw_data)
self.port_agent_packet.pack_header()
self.raw_test_particle = RawDataParticle(self.port_agent_packet.get_as_dict(),
port_timestamp=self.sample_port_timestamp,
internal_timestamp=self.sample_internal_timestamp)
self.sample_raw_particle = {
DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
DataParticleKey.STREAM_NAME: CommonDataParticleType.RAW,
DataParticleKey.INTERNAL_TIMESTAMP: self.sample_internal_timestamp,
DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.PORT_TIMESTAMP,
DataParticleKey.QUALITY_FLAG: "ok",
DataParticleKey.VALUES: [
{ DataParticleKey.VALUE_ID: "raw",
DataParticleKey.VALUE: base64.b64encode(self.sample_raw_data),
"binary": True },
{ DataParticleKey.VALUE_ID: "length",
DataParticleKey.VALUE: int(31)},
{ DataParticleKey.VALUE_ID: "type",
DataParticleKey.VALUE: int(2)},
{ DataParticleKey.VALUE_ID: "checksum",
DataParticleKey.VALUE: int(2392)},
]
}
开发者ID:s-pearce,项目名称:marine-integrations,代码行数:60,代码来源:test_data_particle.py
示例5: _create_port_agent_packet
def _create_port_agent_packet(self, data_item):
ts = ntplib.system_to_ntp_time(time.time())
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data(data_item)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
return port_agent_packet
开发者ID:danmergens,项目名称:mi-instrument,代码行数:7,代码来源:test_driver.py
示例6: test_unpack_header
def test_unpack_header(self):
self.pap = PortAgentPacket()
data_length = 32
data = self.pap.unpack_header(array.array('B', [163, 157, 122, 2, 0, data_length + HEADER_SIZE, 14, 145, 65, 234, 142, 154, 23, 155, 51, 51]))
got_timestamp = self.pap.get_timestamp()
self.assertEqual(self.pap.get_header_type(), self.pap.DATA_FROM_DRIVER)
self.assertEqual(self.pap.get_data_length(), data_length)
self.assertEqual(got_timestamp, 1105890970.110589)
self.assertEqual(self.pap.get_header_recv_checksum(), 3729)
开发者ID:deverett,项目名称:marine-integrations,代码行数:10,代码来源:test_port_agent_client.py
示例7: test_unpack_header
def test_unpack_header(self):
self.pap = PortAgentPacket()
data = self.pap.unpack_header(array.array('B', [163, 157, 122, 2, 0, 48, 14, 145, 65, 234, 142, 154, 23, 155, 51, 51]))
self.assertEqual(self.pap.get_header_type(), 2)
self.assertEqual(self.pap.get_header_length(), 32)
self.assertEqual(time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.localtime(self.ntp_to_system_time(self.pap.get_timestamp()))), "Thu, 13 Dec 2012 14:10:04 +0000")
self.assertEqual(self.pap.get_header_recv_checksum(), 3729) #@TODO Probably should wire in one of these checksums.
self.assertEqual(self.pap.get_header_checksum(), None)
pass
开发者ID:lukecampbell,项目名称:marine-integrations,代码行数:10,代码来源:test_port_agent_client.py
示例8: send_port_agent_packet
def send_port_agent_packet(protocol, data):
ts = ntplib.system_to_ntp_time(time.time())
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data(data)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
# Push the response into the driver
protocol.got_data(port_agent_packet)
protocol.got_raw(port_agent_packet)
log.debug('Sent port agent packet containing: %r', data)
开发者ID:danmergens,项目名称:marine-integrations,代码行数:11,代码来源:test_driver.py
示例9: _send_port_agent_packet
def _send_port_agent_packet(self, driver, data):
"""
Send the supplied data to the driver in a port agent packet
@param driver: instrument driver instance
@param data: data to be sent
"""
ts = ntplib.system_to_ntp_time(time.time())
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data(data)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
# Push the response into the driver
driver._protocol.got_data(port_agent_packet)
开发者ID:kehunt06,项目名称:mi-instrument,代码行数:14,代码来源:test_driver.py
示例10: _send_port_agent_packet
def _send_port_agent_packet(self, driver, data):
"""
Send the supplied data to the driver in a port agent packet
@param driver: instrument driver instance
@param data: data to be sent
"""
ts = ntplib.system_to_ntp_time(time.time())
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data(data)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
# Push the response into the driver
driver._protocol.got_data(port_agent_packet)
# sleep briefly, as some state changes happen asynchronously, this should give those
# threads time to finish
time.sleep(.01)
开发者ID:danmergens,项目名称:mi-instrument,代码行数:17,代码来源:test_driver.py
示例11: test_dump_settings_response
def test_dump_settings_response(self):
"""
"""
mock_port_agent = Mock(spec=PortAgentClient)
driver = InstrumentDriver(self._got_data_event_callback)
driver.set_test_mode(True)
current_state = driver.get_resource_state()
self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)
# Now configure the driver with the mock_port_agent, verifying
# that the driver transitions to that state
config = {'mock_port_agent' : mock_port_agent}
driver.configure(config = config)
current_state = driver.get_resource_state()
self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)
# Invoke the connect method of the driver: should connect to mock
# port agent. Verify that the connection FSM transitions to CONNECTED,
# (which means that the FSM should now be reporting the ProtocolState).
driver.connect()
current_state = driver.get_resource_state()
self.assertEqual(current_state, DriverProtocolState.UNKNOWN)
# Force the instrument into a known state
self.assert_force_state(driver, DriverProtocolState.COMMAND)
ts = ntplib.system_to_ntp_time(time.time())
log.debug("DUMP_SETTINGS command response: %s", DUMP_COMMAND_RESPONSE)
# Create and populate the port agent packet.
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data(DUMP_COMMAND_RESPONSE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
# Push the response into the driver
driver._protocol.got_data(port_agent_packet)
response = driver._protocol._get_response(expected_prompt =
NANO_DUMP_SETTINGS)
self.assertTrue(isinstance(response[1], NANOCommandResponse))
开发者ID:StevenMyerson,项目名称:marine-integrations,代码行数:42,代码来源:test_driver.py
示例12: TestUnitDataParticle
class TestUnitDataParticle(MiUnitTestCase):
"""
Test cases for the DataParticleGenerator class. Functions in this class
provide unit tests and provide a tutorial on use of
the data particle generator interface.
"""
class TestDataParticle(DataParticle):
"""
Simple test DataParticle derivative that returns fixed parsed
data values
@retval Value list for parased data set [{'value_id': foo,
'value': bar}, ...]
"""
_data_particle_type = TEST_PARTICLE_TYPE
def _build_parsed_values(self):
result = [{DataParticleKey.VALUE_ID: "temp",
DataParticleKey.VALUE: "23.45"},
{DataParticleKey.VALUE_ID: "cond",
DataParticleKey.VALUE: "15.9"},
{DataParticleKey.VALUE_ID: "depth",
DataParticleKey.VALUE: "305.16"}]
return result
class BadDataParticle(DataParticle):
"""
Define a data particle that doesn't initialize _data_particle_type.
Should raise an exception when _data_particle_type isn't initialized
"""
pass
def setUp(self):
"""
"""
#self.start_couchdb() # appease the pyon gods
self.sample_port_timestamp = 3555423720.711772
self.sample_driver_timestamp = 3555423721.711772
self.sample_internal_timestamp = 3555423719.711772
self.sample_raw_data = "SATPAR0229,10.01,2206748544,234"
self.parsed_test_particle = self.TestDataParticle(self.sample_raw_data,
port_timestamp=self.sample_port_timestamp,
quality_flag=DataParticleValue.INVALID,
preferred_timestamp=DataParticleKey.DRIVER_TIMESTAMP)
self.sample_parsed_particle = {
DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
DataParticleKey.STREAM_NAME: TEST_PARTICLE_TYPE,
DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.DRIVER_TIMESTAMP,
DataParticleKey.QUALITY_FLAG: DataParticleValue.INVALID,
DataParticleKey.VALUES: [
{DataParticleKey.VALUE_ID: "temp",
DataParticleKey.VALUE: "23.45"},
{DataParticleKey.VALUE_ID: "cond",
DataParticleKey.VALUE: "15.9"},
{DataParticleKey.VALUE_ID: "depth",
DataParticleKey.VALUE: "305.16"}
]
}
self.port_agent_packet = PortAgentPacket()
self.port_agent_packet.attach_data(self.sample_raw_data)
self.port_agent_packet.pack_header()
self.raw_test_particle = RawDataParticle(self.port_agent_packet.get_as_dict(),
port_timestamp=self.sample_port_timestamp,
internal_timestamp=self.sample_internal_timestamp)
self.sample_raw_particle = {
DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
DataParticleKey.STREAM_NAME: CommonDataParticleType.RAW,
DataParticleKey.INTERNAL_TIMESTAMP: self.sample_internal_timestamp,
DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.PORT_TIMESTAMP,
DataParticleKey.QUALITY_FLAG: "ok",
DataParticleKey.VALUES: [
{ DataParticleKey.VALUE_ID: "raw",
DataParticleKey.VALUE: base64.b64encode(self.sample_raw_data),
"binary": True },
{ DataParticleKey.VALUE_ID: "length",
DataParticleKey.VALUE: int(31)},
{ DataParticleKey.VALUE_ID: "type",
DataParticleKey.VALUE: int(2)},
{ DataParticleKey.VALUE_ID: "checksum",
DataParticleKey.VALUE: int(2392)},
]
}
def test_parsed_generate(self):
"""
Test generation of a data particle
"""
# Create some sample data as a param dict
# Submit it to a data particle generator with a timestamp
#.........这里部分代码省略.........
开发者ID:s-pearce,项目名称:marine-integrations,代码行数:101,代码来源:test_data_particle.py
示例13: setUp
def setUp(self):
self.pap = PortAgentPacket()
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:2,代码来源:test_port_agent_client.py
示例14: PAClientTestPortAgentPacket
class PAClientTestPortAgentPacket(MiUnitTest):
@staticmethod
def ntp_to_system_time(date):
"""convert a NTP time to system time"""
return date - NTP_DELTA
@staticmethod
def system_to_ntp_time(date):
"""convert a system time to a NTP time"""
return date + NTP_DELTA
def setUp(self):
self.pap = PortAgentPacket()
# self.test_time = time.time()
# self.ntp_time = self.system_to_ntp_time(self.test_time)
# self.pap.set_timestamp(self.ntp_time)
def test_pack_header(self):
test_data = "Only the length of this matters?"
test_data_length = len(test_data)
self.pap.attach_data(test_data)
self.pap.pack_header()
self.assertEqual(self.pap.get_data_length(), test_data_length)
def test_get_length(self):
test_length = 100
self.pap.set_data_length(test_length)
got_length = self.pap.get_data_length()
self.assertEqual(got_length, test_length)
def test_checksum(self):
"""
This tests the checksum algorithm; if somebody changes the algorithm
this test should catch it. Had to jump through some hoops to do this;
needed to add set_data_length and set_header because we're building our
own header here (the one in PortAgentPacket includes the timestamp
so the checksum is not consistent).
"""
test_data = "This tests the checksum algorithm."
test_length = len(test_data)
self.pap.attach_data(test_data)
# Now build a header
variable_tuple = (0xa3, 0x9d, 0x7a, self.pap.DATA_FROM_DRIVER,
test_length + HEADER_SIZE, 0x0000,
0)
self.pap.set_data_length(test_length)
header_format = '>BBBBHHd'
size = struct.calcsize(header_format)
temp_header = ctypes.create_string_buffer(size)
struct.pack_into(header_format, temp_header, 0, *variable_tuple)
# Now set the header member in PortAgentPacket to the header
# we built
self.pap.set_header(temp_header.raw)
# Now get the checksum and verify it is what we expect it to be.
checksum = self.pap.calculate_checksum()
self.assertEqual(checksum, 2)
def test_unpack_header(self):
self.pap = PortAgentPacket()
data_length = 32
data = self.pap.unpack_header(array.array('B',
[163, 157, 122, 2, 0, data_length + HEADER_SIZE, 14, 145, 65, 234,
142, 154, 23, 155, 51, 51]))
got_timestamp = self.pap.get_timestamp()
self.assertEqual(self.pap.get_header_type(), self.pap.DATA_FROM_DRIVER)
self.assertEqual(self.pap.get_data_length(), data_length)
self.assertEqual(got_timestamp, 1105890970.092212)
self.assertEqual(self.pap.get_header_recv_checksum(), 3729)
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:75,代码来源:test_port_agent_client.py
示例15: test_handle_packet
def test_handle_packet(self):
"""
Test that a default PortAgentPacket creates a DATA_FROM_DRIVER packet,
and that the handle_packet method invokes the raw callback
"""
pa_listener = Listener(None, None, 0, 0, 5, self.myGotData, self.myGotRaw, self.myGotListenerError,
self.myGotError)
test_data = "This is a great big test"
self.resetTestVars()
pa_packet = PortAgentPacket()
pa_packet.attach_data(test_data)
pa_packet.pack_header()
pa_packet.verify_checksum()
pa_listener.handle_packet(pa_packet)
self.assertTrue(self.rawCallbackCalled)
###
# Test DATA_FROM_INSTRUMENT; handle_packet should invoke data and raw
# callbacks.
###
self.resetTestVars()
pa_packet = PortAgentPacket(PortAgentPacket.DATA_FROM_INSTRUMENT)
pa_packet.attach_data(test_data)
pa_packet.pack_header()
pa_packet.verify_checksum()
pa_listener.handle_packet(pa_packet)
self.assertTrue(self.rawCallbackCalled)
self.assertTrue(self.dataCallbackCalled)
self.assertFalse(self.errorCallbackCalled)
self.assertFalse(self.listenerCallbackCalled)
###
# Test PORT_AGENT_COMMAND; handle_packet should invoke raw callback.
###
self.resetTestVars()
pa_packet = PortAgentPacket(PortAgentPacket.PORT_AGENT_COMMAND)
pa_packet.attach_data(test_data)
pa_packet.pack_header()
pa_packet.verify_checksum()
pa_listener.handle_packet(pa_packet)
self.assertTrue(self.rawCallbackCalled)
self.assertFalse(self.dataCallbackCalled)
self.assertFalse(self.errorCallbackCalled)
self.assertFalse(self.listenerCallbackCalled)
###
# Test PORT_AGENT_STATUS; handle_packet should invoke raw callback.
###
self.resetTestVars()
pa_packet = PortAgentPacket(PortAgentPacket.PORT_AGENT_STATUS)
pa_packet.attach_data(test_data)
pa_packet.pack_header()
pa_packet.verify_checksum()
pa_listener.handle_packet(pa_packet)
self.assertTrue(self.rawCallbackCalled)
self.assertFalse(self.dataCallbackCalled)
self.assertFalse(self.errorCallbackCalled)
self.assertFalse(self.listenerCallbackCalled)
###
# Test PORT_AGENT_FAULT; handle_packet should invoke raw callback.
###
self.resetTestVars()
pa_packet = PortAgentPacket(PortAgentPacket.PORT_AGENT_FAULT)
pa_packet.attach_data(test_data)
pa_packet.pack_header()
pa_packet.verify_checksum()
pa_listener.handle_packet(pa_packet)
self.assertTrue(self.rawCallbackCalled)
self.assertFalse(self.dataCallbackCalled)
self.assertFalse(self.errorCallbackCalled)
self.assertFalse(self.listenerCallbackCalled)
###
# Test INSTRUMENT_COMMAND; handle_packet should invoke raw callback.
###
self.resetTestVars()
pa_packet = PortAgentPacket(PortAgentPacket.DIGI_CMD)
pa_packet.attach_data(test_data)
pa_packet.pack_header()
pa_packet.verify_checksum()
pa_listener.handle_packet(pa_packet)
self.assertTrue(self.rawCallbackCalled)
self.assertFalse(self.dataCallbackCalled)
self.assertFalse(self.errorCallbackCalled)
self.assertFalse(self.listenerCallbackCalled)
#.........这里部分代码省略.........
开发者ID:cameron55445,项目名称:mi-instrument,代码行数:101,代码来源:test_port_agent_client.py
示例16: TestUnitDataParticle
class TestUnitDataParticle(MiUnitTestCase):
"""
Test cases for the DataParticleGenerator class. Functions in this class
provide unit tests and provide a tutorial on use of
the data particle generator interface.
"""
class TestDataParticle(DataParticle):
"""
Simple test DataParticle derivative that returns fixed parsed
data values
@retval Value list for parased data set [{'value_id': foo,
'value': bar}, ...]
"""
_data_particle_type = TEST_PARTICLE_TYPE
def _build_parsed_values(self):
result = [{DataParticleKey.VALUE_ID: "temp",
DataParticleKey.VALUE: "23.45"},
{DataParticleKey.VALUE_ID: "cond",
DataParticleKey.VALUE: "15.9"},
{DataParticleKey.VALUE_ID: "depth",
DataParticleKey.VALUE: "305.16"}]
return result
class BadDataParticle(DataParticle):
"""
Define a data particle that doesn't initialize _data_particle_type.
Should raise an exception when _data_particle_type isn't initialized
"""
pass
def setUp(self):
"""
"""
#self.start_couchdb() # appease the pyon gods
self.sample_port_timestamp = 3555423720.711772
self.sample_driver_timestamp = 3555423721.711772
self.sample_internal_timestamp = 3555423719.711772
self.sample_raw_data = "SATPAR0229,10.01,2206748544,234"
self.parsed_test_particle = self.TestDataParticle(self.sample_raw_data,
port_timestamp=self.sample_port_timestamp,
quality_flag=DataParticleValue.INVALID,
preferred_timestamp=DataParticleKey.DRIVER_TIMESTAMP)
self.sample_parsed_particle = {
DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
DataParticleKey.STREAM_NAME: TEST_PARTICLE_TYPE,
DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
#DataParticleKey.NEW_SEQUENCE: None,
DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.DRIVER_TIMESTAMP,
DataParticleKey.QUALITY_FLAG: DataParticleValue.INVALID,
DataParticleKey.VALUES: [
{DataParticleKey.VALUE_ID: "temp",
DataParticleKey.VALUE: "23.45"},
{DataParticleKey.VALUE_ID: "cond",
DataParticleKey.VALUE: "15.9"},
{DataParticleKey.VALUE_ID: "depth",
DataParticleKey.VALUE: "305.16"}
]
}
self.port_agent_packet = PortAgentPacket()
self.port_agent_packet.attach_data(self.sample_raw_data)
self.port_agent_packet.pack_header()
self.raw_test_particle = RawDataParticle(self.port_agent_packet.get_as_dict(),
port_timestamp=self.sample_port_timestamp,
internal_timestamp=self.sample_internal_timestamp)
self.sample_raw_particle = {
DataParticleKey.PKT_FORMAT_ID: DataParticleValue.JSON_DATA,
DataParticleKey.PKT_VERSION: TEST_PARTICLE_VERSION,
DataParticleKey.STREAM_NAME: CommonDataParticleType.RAW,
DataParticleKey.INTERNAL_TIMESTAMP: self.sample_internal_timestamp,
DataParticleKey.PORT_TIMESTAMP: self.sample_port_timestamp,
DataParticleKey.DRIVER_TIMESTAMP: self.sample_driver_timestamp,
#DataParticleKey.NEW_SEQUENCE: None,
DataParticleKey.PREFERRED_TIMESTAMP: DataParticleKey.PORT_TIMESTAMP,
DataParticleKey.QUALITY_FLAG: "ok",
DataParticleKey.VALUES: [
{ DataParticleKey.VALUE_ID: "raw",
DataParticleKey.VALUE: base64.b64encode(self.sample_raw_data),
"binary": True },
{ DataParticleKey.VALUE_ID: "length",
DataParticleKey.VALUE: int(31)},
{ DataParticleKey.VALUE_ID: "type",
DataParticleKey.VALUE: int(2)},
{ DataParticleKey.VALUE_ID: "checksum",
DataParticleKey.VALUE: int(2392)},
]
}
def test_new_sequence_flag(self):
"""
Verify that we can set the new sequence flag
"""
#.........这里部分代码省略.........
开发者ID:aplmmilcic,项目名称:mi-instrument,代码行数:101,代码来源:test_data_particle.py
示例17: test_status_01
def test_status_01(self):
"""
"""
mock_port_agent = Mock(spec=PortAgentClient)
driver = InstrumentDriver(self._got_data_event_callback)
driver.set_test_mode(True)
current_state = driver.get_resource_state()
self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)
# Now configure the driver with the mock_port_agent, verifying
# that the driver transitions to that state
config = {'mock_port_agent' : mock_port_agent}
driver.configure(config = config)
current_state = driver.get_resource_state()
self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)
# Invoke the connect method of the driver: should connect to mock
# port agent. Verify that the connection FSM transitions to CONNECTED,
# (which means that the FSM should now be reporting the ProtocolState).
driver.connect()
current_state = driver.get_resource_state()
self.assertEqual(current_state, DriverProtocolState.UNKNOWN)
# Force the instrument into a known state
self.assert_force_state(driver, DriverProtocolState.COMMAND)
ts = ntplib.system_to_ntp_time(time.time())
log.debug("DUMP_STATUS: %s", DUMP_STATUS)
# Create and populate the port agent packet.
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data(DUMP_STATUS)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("HEAT,2013/06/19 23:04:37,-001,0000,0026" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("LILY,2013/06/19 23:04:38, -49.455, 34.009,193.91, 26.02,11.96,N9655" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("NANO,V,2013/06/19 23:04:38.000,13.987223,25.126694121" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("LILY,2013/06/19 23:04:39, -49.483, 33.959,193.85, 26.03,11.96,N9655" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("NANO,V,2013/06/19 23:04:39.000,13.987191,25.126709409" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("LILY,2013/06/19 23:04:40, -49.355, 33.956,193.79, 26.02,11.96,N9655" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("NANO,V,2013/06/19 23:04:40.000,13.987253,25.126725854" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("HEAT,2013/06/19 23:04:40,-001,0000,0026" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("NANO,2013/06/19 21:46:54,*APPLIED GEOMECHANICS Model MD900-T Firmware V5.2 SN-N3616 ID01" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("NANO,V,2013/06/19 21:46:54.000,13.990480,25.027793612" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
port_agent_packet.pack_header()
#driver._protocol.got_data(port_agent_packet)
port_agent_packet = PortAgentPacket()
port_agent_packet.attach_data("NANO,2013/06/19 21:46:54,*01: Vbias= 0.0000 0.0000 0.0000 0.0000" + NEWLINE)
port_agent_packet.attach_timestamp(ts)
#.........这里部分代码省略.........
开发者ID:StevenMyerson,项目名称:marine-integrations,代码行数:101,代码来源:test_driver.py
示例18: test_sample
def test_sample(self):
"""
Create a mock port agent
"""
mock_port_agent = Mock(spec=PortAgentClient)
"""
Instantiate the driver class directly (no driver client, no driver
client, no zmq driver process, no driver process; just own the driver)
"""
test_driver = InstrumentDriver(self.my_event_callback)
current_state = test_driver.get_resource_state()
print "DHE: DriverConnectionState: " + str(current_state)
self.assertEqual(current_state, DriverConnectionState.UNCONFIGURED)
"""
Now configure the driver with the mock_port_agent, verifying
that the driver transitions to that state
"""
config = {'mock_port_agent' : mock_port_agent}
# commented below out; trying real port agent
test_driver.configure(config = config)
"""
DHE: trying this; want to invoke the driver methods themselves, but
with the driver talking to the port_agent (through the client).
Will it work? Answer: It does, but I can't step through it. Let
me try just running this.
"""
#test_driver.configure(config = self.port_agent_comm_config())
current_state = test_driver.get_resource_state()
print "DHE: DriverConnectionState: " + str(current_state)
self.assertEqual(current_state, DriverConnectionState.DISCONNECTED)
"""
Invoke the connect method of the driver: should connect to mock
port agent. Verify that the connection FSM transitions to CONNECTED,
(which means that the FSM should now be reporting the ProtocolState).
"""
"""
TODO: DHE: Shouldn't have to put a sleep here; it's like the
port_agent isn't up yet; I get a connection refused without this.
"""
gevent.sleep(2)
test_driver.connect()
current_state = test_driver.get_resource_state()
print "DHE: DriverConnectionState: " + str(current_state)
self.assertEqual(current_state, DriverProtocolState.UNKNOWN)
"""
Invoke the discover_state method of the driver
"""
# commenting out because it will try wakeup
#test_driver.discover_state()
#current_state = test_driver.get_resource_state()
#print "DHE: DriverConnectionState: " + str(current_state)
"""
Force state to command state.
"""
"""
This won't work right now because the SBE16 does an _update_params upon
entry to the command_handler; since this is a unit test there is nothing
to respond to the ds and dc commands. Setting up mock to do it would
be really handy.
"""
test_driver.execute_force_state(state = DriverProtocolState.AUTOSAMPLE)
current_state = test_driver.get_resource_state()
print "DHE: DriverConnectionState: " + str(current_state)
self.assertEqual(current_state, DriverProtocolState.AUTOSAMPLE)
self.reset_test_vars()
test_sample = "# 24.0088, 0.00001, -0.000, 0.0117, 03 Oct 2012 20:59:04\r\n"
"""
use for test p
|
请发表评论