本文整理汇总了Python中mi.dataset.parser.dosta_abcdjm_cspp.DostaAbcdjmCsppParser类的典型用法代码示例。如果您正苦于以下问题:Python DostaAbcdjmCsppParser类的具体用法?Python DostaAbcdjmCsppParser怎么用?Python DostaAbcdjmCsppParser使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DostaAbcdjmCsppParser类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_get_many
def test_get_many(self):
"""
Read test data and pull out multiple data particles at one time.
Assert that the results are those we expected.
"""
file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
# Let's attempt to retrieve 20 particles
particles = parser.get_records(20)
log.info("Exception callback value: %s", self.exception_callback_value)
# Should end up with 20 particles
self.assertTrue(len(particles) == 20)
expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')
for i in range(len(particles)):
self.assert_result(expected_results['data'][i], particles[i])
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:25,代码来源:test_dosta_abcdjm_cspp.py
示例2: test_simple
def test_simple(self):
"""
Read test data and pull out data particles.
Assert that the results are those we expected.
"""
file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(5)
log.info("Exception callback value: %s", self.exception_callback_value)
self.assertTrue(self.exception_callback_value is None)
self.assertTrue(len(particles) == 5)
expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')
for i in range(len(particles)):
self.assert_result(expected_results['data'][i], particles[i])
stream_handle.close()
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:27,代码来源:test_dosta_abcdjm_cspp.py
示例3: test_midstate_start
def test_midstate_start(self):
"""
This test makes sure that we retrieve the correct particles upon starting with an offsetted state.
"""
expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')
file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
initial_state = {StateKey.POSITION: 332, StateKey.METADATA_EXTRACTED: True}
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
initial_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(2)
log.info("Num particles: %s", len(particles))
self.assertTrue(len(particles) == 2)
for i in range(len(particles)):
self.assert_result(expected_results['data'][i+2], particles[i])
log.info("******** Read State: %s", parser._read_state)
log.info("******** State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 518, StateKey.METADATA_EXTRACTED: True})
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:29,代码来源:test_dosta_abcdjm_cspp.py
示例4: test_bad_header_start_date
def test_bad_header_start_date(self):
"""
Ensure that bad data is skipped when it exists.
"""
with open(os.path.join(RESOURCE_PATH, 'BadHeaderProcessedData_PPB_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
# parser should return metadata without start date filled in
parser.get_records(1)
self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:12,代码来源:test_dosta_abcdjm_cspp.py
示例5: test_bad_header_source_file_name
def test_bad_header_source_file_name(self):
"""
Ensure that bad source file name produces an error
"""
with open(os.path.join(RESOURCE_PATH, 'BadHeaderSourceFileName_PPB_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
parser.get_records(1)
self.assertEquals(len(self.exception_callback_value), 1)
self.assertIsInstance(self.exception_callback_value[0], SampleEncodingException)
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:13,代码来源:test_dosta_abcdjm_cspp.py
示例6: test_bad_data_record
def test_bad_data_record(self):
"""
Ensure that bad data creates a recoverable sample exception and parsing continues
"""
with open(os.path.join(RESOURCE_PATH, 'BadDataRecord_PPB_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
particles = parser.get_records(19)
self.assert_particles(particles, 'BadDataRecord_PPB_OPT.yml', RESOURCE_PATH)
self.assertEquals(len(self.exception_callback_value), 1)
self.assertIsInstance(self.exception_callback_value[0], RecoverableSampleException)
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:15,代码来源:test_dosta_abcdjm_cspp.py
示例7: test_linux_source_path_handling
def test_linux_source_path_handling(self):
"""
Read linux source path test data and assert that the results are those we expected.
"""
with open(os.path.join(RESOURCE_PATH, 'linux_11079894_PPB_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
particles = parser.get_records(20)
self.assertTrue(len(particles) == 20)
self.assert_particles(particles, 'linux.yml', RESOURCE_PATH)
self.assertEqual(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:15,代码来源:test_dosta_abcdjm_cspp.py
示例8: test_long_stream
def test_long_stream(self):
"""
Test a long stream
"""
with open(os.path.join(RESOURCE_PATH, '11079364_PPB_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
# Let's attempt to retrieve more particles than are available
particles = parser.get_records(300)
# Should end up with 272 particles
self.assertTrue(len(particles) == 272)
self.assertEquals(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:15,代码来源:test_dosta_abcdjm_cspp.py
示例9: test_bad_header_start_date
def test_bad_header_start_date(self):
"""
Ensure that bad data is skipped when it exists.
"""
file_path = os.path.join(RESOURCE_PATH, 'BadHeaderProcessedData_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
log.info(self.exception_callback_value)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
with self.assertRaises(SampleException):
parser.get_records(1)
stream_handle.close()
开发者ID:JeffRoy,项目名称:marine-integrations,代码行数:19,代码来源:test_dosta_abcdjm_cspp.py
示例10: test_simple_telem
def test_simple_telem(self):
"""
Read test data and pull out data particles.
Assert that the results are those we expected.
"""
with open(os.path.join(RESOURCE_PATH, '11194982_PPD_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_telemetered, stream_handle, self.exception_callback)
# Attempt to retrieve 20 particles, there are only 18 in the file though
particles = parser.get_records(20)
# Should end up with 18 particles
self.assertTrue(len(particles) == 18)
self.assert_particles(particles, '11194982_PPD_OPT.yml', RESOURCE_PATH)
self.assertEquals(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:19,代码来源:test_dosta_abcdjm_cspp.py
示例11: test_simple_recov
def test_simple_recov(self):
"""
Read test data and pull out data particles.
Assert that the results are those we expected.
"""
with open(os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
# Attempt to retrieve 20 particles, there are more in this file but only verify 20
particles = parser.get_records(20)
# Should end up with 20 particles
self.assertTrue(len(particles) == 20)
self.assert_particles(particles, '11079894_PPB_OPT.yml', RESOURCE_PATH)
self.assertEquals(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:19,代码来源:test_dosta_abcdjm_cspp.py
示例12: test_bad_header_source_file_name
def test_bad_header_source_file_name(self):
"""
Ensure that bad data is skipped when it exists.
"""
file_path = os.path.join(RESOURCE_PATH, 'BadHeaderSourceFileName_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
log.info(self.exception_callback_value)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
parser.get_records(1)
log.info("Exception callback value: %s", self.exception_callback_value)
self.assertTrue(self.exception_callback_value != None)
stream_handle.close()
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:22,代码来源:test_dosta_abcdjm_cspp.py
示例13: test_air_saturation_preset
def test_air_saturation_preset(self):
"""
Ensure that input files containing the air saturation field are parsed correctly.
Redmine #10238 Identified additional parameter enabled after first deployment
"""
with open(os.path.join(RESOURCE_PATH, 'ucspp_32260420_PPB_OPT.txt'), 'rU') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
# get the metadata particle and first 2 instrument particles and verify values.
particles = parser.get_records(3)
self.assertTrue(len(particles) == 3)
self.assertEqual(self.exception_callback_value, [])
self.assert_particles(particles, 'ucspp_32260420_PPB_OPT.yml', RESOURCE_PATH)
# get remaining particles and verify parsed without error.
particles = parser.get_records(100)
self.assertTrue(len(particles) == 93)
self.assertEqual(self.exception_callback_value, [])
开发者ID:danmergens,项目名称:mi-instrument,代码行数:22,代码来源:test_dosta_abcdjm_cspp.py
示例14: test_get_many
def test_get_many(self):
"""
Read test data and pull out data particles in smaller groups.
Assert that the results are those we expected.
"""
with open(os.path.join(RESOURCE_PATH, '11079419_PPB_OPT.txt'), 'r') as stream_handle:
parser = DostaAbcdjmCsppParser(self.config_recovered, stream_handle, self.exception_callback)
# Attempt to retrieve 20 total particles
particles = parser.get_records(5)
particles2 = parser.get_records(10)
particles.extend(particles2)
particles3 = parser.get_records(5)
particles.extend(particles3)
# Should end up with 20 particles
self.assertTrue(len(particles) == 20)
self.assert_particles(particles, '11079419_PPB_OPT.yml', RESOURCE_PATH)
self.assertEquals(self.exception_callback_value, [])
开发者ID:GrimJ,项目名称:mi-dataset,代码行数:23,代码来源:test_dosta_abcdjm_cspp.py
示例15: test_long_stream
def test_long_stream(self):
"""
Test a long stream
"""
file_path = os.path.join(RESOURCE_PATH, '11079364_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
# Let's attempt to retrieve 2000 particles
particles = parser.get_records(300)
log.info("Num particles: %s", len(particles))
log.info("Exception callback value: %s", self.exception_callback_value)
# Should end up with 272 particles
self.assertTrue(len(particles) == 272)
stream_handle.close()
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:24,代码来源:test_dosta_abcdjm_cspp.py
示例16: create_yml
def create_yml(self):
"""
This utility creates a yml file
"""
fid = open(os.path.join(RESOURCE_PATH, '11194982_PPD_OPT.txt'))
test_buffer = fid.read()
fid.close()
self.stream_handle = StringIO(test_buffer)
self.parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_TELEMETERED),
None, self.stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = self.parser.get_records(20)
log.info("Exception callback value: %s", self.exception_callback_value)
self.particle_to_yml(particles, '11194982_PPD_OPT.yml')
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:20,代码来源:test_dosta_abcdjm_cspp.py
示例17: test_state_after_one_record_retrieval
def test_state_after_one_record_retrieval(self):
"""
This test makes sure that we get the correct particles upon requesting one record at
a time.
"""
expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')
file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("Num particles: %s", len(particles))
self.assertTrue(len(particles) == 1)
log.info("11111111 Read State: %s", parser._read_state)
log.info("11111111 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 0, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][0], particles[0])
new_state = copy.copy(parser._state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("22222222 Read State: %s", parser._read_state)
log.info("22222222 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 332, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][1], particles[0])
new_state = copy.copy(parser._state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("33333333 Read State: %s", parser._read_state)
log.info("33333333 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 425, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][2], particles[0])
new_state = copy.copy(parser._state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("44444444 Read State: %s", parser._read_state)
log.info("44444444 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 518, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][3], particles[0])
new_state = copy.copy(parser._state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("55555555 Read State: %s", parser._read_state)
log.info("55555555 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 611, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][4], particles[0])
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:92,代码来源:test_dosta_abcdjm_cspp.py
示例18: test_state_after_two_record_retrievals
def test_state_after_two_record_retrievals(self):
"""
This test makes sure that we get the correct particles upon requesting two records at
a time.
"""
expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')
file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(2)
log.info("Num particles: %s", len(particles))
self.assertTrue(len(particles) == 2)
for i in range(len(particles)):
self.assert_result(expected_results['data'][i], particles[i])
log.info("11111111 Read State: %s", parser._read_state)
log.info("11111111 State: %s", parser._state)
the_new_state = {StateKey.POSITION: 332, StateKey.METADATA_EXTRACTED: True}
log.info("11111111 new parser state: %s", the_new_state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
the_new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(2)
log.info("Num particles: %s", len(particles))
self.assertTrue(len(particles) == 2)
for i in range(len(particles)):
self.assert_result(expected_results['data'][i+2], particles[i])
log.info("22222222 Read State: %s", parser._read_state)
log.info("22222222 State: %s", parser._state)
the_new_state = {StateKey.POSITION: 480, StateKey.METADATA_EXTRACTED: True}
log.info("22222222 new parser state: %s", the_new_state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
the_new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(2)
log.info("Num particles: %s", len(particles))
self.assertTrue(len(particles) == 2)
for i in range(len(particles)):
self.assert_result(expected_results['data'][i+4], particles[i])
log.info("33333333 Read State: %s", parser._read_state)
log.info("33333333 State: %s", parser._state)
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:67,代码来源:test_dosta_abcdjm_cspp.py
示例19: DostaAbcdjmCsppParserUnitTestCase
class DostaAbcdjmCsppParserUnitTestCase(ParserUnitTestCase):
"""
dosta_abcdjm_cspp Parser unit test suite
"""
def state_callback(self, state, file_ingested):
""" Call back method to watch what comes in via the position callback """
self.state_callback_value = state
self.file_ingested_value = file_ingested
def pub_callback(self, pub):
""" Call back method to watch what comes in via the publish callback """
self.publish_callback_value = pub
def exception_callback(self, exception):
""" Callback method to watch what comes in via the exception callback """
self.exception_callback_value = exception
def setUp(self):
ParserUnitTestCase.setUp(self)
self.config = {
DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED: {
DataSetDriverConfigKeys.PARTICLE_MODULE: 'mi.dataset.parser.dosta_abcdjm_cspp',
DataSetDriverConfigKeys.PARTICLE_CLASS: None,
DataSetDriverConfigKeys.PARTICLE_CLASSES_DICT: {
METADATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppMetadataRecoveredDataParticle,
DATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppInstrumentRecoveredDataParticle,
}
},
DataTypeKey.DOSTA_ABCDJM_CSPP_TELEMETERED: {
DataSetDriverConfigKeys.PARTICLE_MODULE: 'mi.dataset.parser.dosta_abcdjm_cspp',
DataSetDriverConfigKeys.PARTICLE_CLASS: None,
DataSetDriverConfigKeys.PARTICLE_CLASSES_DICT: {
METADATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppMetadataTelemeteredDataParticle,
DATA_PARTICLE_CLASS_KEY: DostaAbcdjmCsppInstrumentTelemeteredDataParticle,
}
},
}
# Define test data particles and their associated timestamps which will be
# compared with returned results
self.file_ingested_value = None
self.state_callback_value = None
self.publish_callback_value = None
self.exception_callback_value = None
def particle_to_yml(self, particles, filename, mode='w'):
"""
This is added as a testing helper, not actually as part of the parser tests. Since the same particles
will be used for the driver test it is helpful to write them to .yml in the same form they need in the
results.yml fids here.
"""
# open write append, if you want to start from scratch manually delete this fid
fid = open(os.path.join(RESOURCE_PATH, filename), mode)
fid.write('header:\n')
fid.write(" particle_object: 'MULTIPLE'\n")
fid.write(" particle_type: 'MULTIPLE'\n")
fid.write('data:\n')
for i in range(0, len(particles)):
particle_dict = particles[i].generate_dict()
fid.write(' - _index: %d\n' %(i+1))
fid.write(' particle_object: %s\n' % particles[i].__class__.__name__)
fid.write(' particle_type: %s\n' % particle_dict.get('stream_name'))
fid.write(' internal_timestamp: %f\n' % particle_dict.get('internal_timestamp'))
for val in particle_dict.get('values'):
if isinstance(val.get('value'), float):
fid.write(' %s: %16.6f\n' % (val.get('value_id'), val.get('value')))
elif isinstance(val.get('value'), str):
fid.write(" %s: '%s'\n" % (val.get('value_id'), val.get('value')))
else:
fid.write(' %s: %s\n' % (val.get('value_id'), val.get('value')))
fid.close()
def create_yml(self):
"""
This utility creates a yml file
"""
fid = open(os.path.join(RESOURCE_PATH, '11194982_PPD_OPT.txt'))
test_buffer = fid.read()
fid.close()
self.stream_handle = StringIO(test_buffer)
self.parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_TELEMETERED),
None, self.stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = self.parser.get_records(20)
log.info("Exception callback value: %s", self.exception_callback_value)
self.particle_to_yml(particles, '11194982_PPD_OPT.yml')
def test_simple(self):
#.........这里部分代码省略.........
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:101,代码来源:test_dosta_abcdjm_cspp.py
示例20: test_state_reset
def test_state_reset(self):
"""
This test makes sure that we retrieve the correct particles upon resetting the state to a prior position.
"""
expected_results = self.get_dict_from_yml('11079894_PPB_OPT.yml')
file_path = os.path.join(RESOURCE_PATH, '11079894_PPB_OPT.txt')
stream_handle = open(file_path, 'rb')
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
None, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("Num particles: %s", len(particles))
self.assertTrue(len(particles) == 1)
log.info("11111111 Read State: %s", parser._read_state)
log.info("11111111 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 0, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][0], particles[0])
new_state = copy.copy(parser._state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("22222222 Read State: %s", parser._read_state)
log.info("22222222 State: %s", parser._state)
self.assertTrue(len(particles) == 1)
self.assertTrue(parser._state == {StateKey.POSITION: 332, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][1], particles[0])
new_state = copy.copy(parser._state)
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
particles = parser.get_records(1)
log.info("33333333 Read State: %s", parser._read_state)
log.info("33333333 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 425, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][2], particles[0])
new_state = {StateKey.POSITION: 0, StateKey.METADATA_EXTRACTED: True}
parser = DostaAbcdjmCsppParser(self.config.get(DataTypeKey.DOSTA_ABCDJM_CSPP_RECOVERED),
new_state, stream_handle,
self.state_callback, self.pub_callback,
self.exception_callback)
# Now retrieve two particles. We should end up with the metadata and first data record
particles = parser.get_records(1)
log.info("44444444 Read State: %s", parser._read_state)
log.info("44444444 State: %s", parser._state)
self.assertTrue(parser._state == {StateKey.POSITION: 332, StateKey.METADATA_EXTRACTED: True})
self.assert_result(expected_results['data'][1], particles[0])
开发者ID:NickAlmonte,项目名称:mi-dataset,代码行数:77,代码来源:test_dosta_abcdjm_cspp.py
注:本文中的mi.dataset.parser.dosta_abcdjm_cspp.DostaAbcdjmCsppParser类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论