def parse_chunks(self):
"""
Parse chunks for the Recovered CO and CT parser.
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
header_timestamp = header_match.group(SIO_HEADER_GROUP_TIMESTAMP)
#
# Start processing at the end of the header.
#
chunk_idx = header_match.end(0)
if header_match.group(SIO_HEADER_GROUP_ID) == ID_OFFSET:
(particles, had_error) = parse_co_data(CtdmoGhqrSioRecoveredOffsetDataParticle,
chunk[chunk_idx:-1], header_timestamp,
self._extract_sample)
if had_error[0]:
log.error('unknown data found in CO chunk %s at %d, leaving out the rest',
binascii.b2a_hex(chunk), had_error[1])
self._exception_callback(SampleException(
'unknown data found in CO chunk at %d, leaving out the rest' % had_error[1]))
result_particles.extend(particles)
elif header_match.group(SIO_HEADER_GROUP_ID) == ID_INSTRUMENT:
header_str = header_match.group(0)
inductive_id = header_str[8:10]
(particles, had_error) = parse_ct_data(CtdmoGhqrRecoveredHostInstrumentDataParticle,
chunk[chunk_idx:-1], header_timestamp,
self._extract_sample, inductive_id)
if had_error[0]:
log.error('unknown data found in CT chunk %s at %d, leaving out the rest',
binascii.b2a_hex(chunk), had_error[1])
self._exception_callback(SampleException(
'unknown data found in CT chunk at %d, leaving out the rest' % had_error[1]))
result_particles.extend(particles)
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
return result_particles
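# A minimal sketch of the (particles, had_error) contract that the calls to
# parse_co_data and parse_ct_data above rely on: had_error is inferred from the
# call sites to be a (flag, byte_offset) tuple, with the offset pointing at the
# first unrecognized byte. The record regex below is a placeholder, not the
# real CO record pattern, and both inputs are assumed to be byte strings.
import re

CO_RECORD_MATCHER = re.compile(rb'.{12}', re.DOTALL)  # placeholder pattern

def parse_co_data_sketch(particle_class, record_data, header_timestamp, extract_sample):
    particles = []
    index = 0
    while index < len(record_data):
        match = CO_RECORD_MATCHER.match(record_data, index)
        if not match:
            # report where parsing stopped; the caller logs it and raises
            # a SampleException through its exception callback
            return particles, (True, index)
        sample = extract_sample(particle_class, None,
                                header_timestamp + match.group(0), None)
        if sample:
            particles.append(sample)
        index = match.end()
    return particles, (False, 0)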
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(nd_timestamp, non_data, non_start,
non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start,
end) = self._chunker.get_next_data_with_index()
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
sample_count = 0
log.debug('parsing header %s', header_match.group(0)[1:32])
if header_match.group(1) == 'DO':
data_match = DATA_MATCHER.search(chunk)
if data_match:
log.debug('Found data match in chunk %s', chunk[1:32])
if not self._read_state.get(StateKey.METADATA_SENT):
# create the metadata particle
# prepend the timestamp from sio mule header to the dosta raw data,
# which is stored in header_match.group(3)
metadata_sample = self._extract_sample(
DostadMetadataDataParticle, None,
header_match.group(3) + data_match.group(0), None)
if metadata_sample:
result_particles.append(metadata_sample)
sample_count += 1
self._read_state[StateKey.METADATA_SENT] = True
# create the dosta data particle
# prepend the timestamp from sio mule header to the dosta raw data,
# which is stored in header_match.group(3)
sample = self._extract_sample(
DostadParserDataParticle, None,
header_match.group(3) + data_match.group(0), None)
if sample:
# create particle
result_particles.append(sample)
sample_count += 1
self._chunk_sample_count.append(sample_count)
(nd_timestamp, non_data, non_start,
non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start,
end) = self._chunker.get_next_data_with_index()
return result_particles
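# The DO branch above emits the file's metadata particle exactly once, gated by
# StateKey.METADATA_SENT in the parser's persisted read state, so a restart
# mid-file does not duplicate it. A minimal sketch of that gate, with a plain
# dict and string key standing in for the real state object; the builder
# callables are hypothetical.
def emit_with_metadata(build_metadata, build_data, read_state):
    particles = []
    if not read_state.get('metadata_sent'):
        metadata = build_metadata()
        if metadata:
            particles.append(metadata)
            read_state['metadata_sent'] = True  # persisted with the parser state
    data = build_data()
    if data:
        particles.append(data)
    return particles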
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(timestamp, chunk) = self._chunker.get_next_data()
while chunk is not None:
# Parse/match the SIO header
sio_header_match = SIO_HEADER_MATCHER.match(chunk)
end_of_header = sio_header_match.end(0)
sample_count = 0
if sio_header_match.group(1) == 'WE':
log.trace('read_state: %s', self._read_state)
# Parse/match the E file header
e_header_match = E_HEADER_MATCHER.search(chunk[end_of_header:end_of_header+HEADER_BYTES])
if e_header_match:
payload = chunk[end_of_header+HEADER_BYTES:-1] # '-1' to remove the '\x03' end-of-record marker
data_split = self.we_split_function(payload)
if data_split:
for start_idx, end_idx in data_split:
    e_record = payload[start_idx:end_idx]
if not STATUS_START_MATCHER.match(e_record[0:STATUS_BYTES]):
fields = struct.unpack('>I', e_record[0:4])
self._timestamp = ntplib.system_to_ntp_time(float(fields[0]))
if len(e_record) == E_GLOBAL_SAMPLE_BYTES:
sample = self._extract_sample(FlordLWfpParserDataParticle,
None,
e_record,
self._timestamp)
if sample:
# create particle
result_particles.append(sample)
sample_count += 1
else:
self._exception_callback(UnexpectedDataException("Found unexpected data."))
else: # no e header match
self._exception_callback(UnexpectedDataException("Found unexpected data."))
self._chunk_sample_count.append(sample_count)
(timestamp, chunk) = self._chunker.get_next_data()
return result_particles
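# we_split_function above is assumed to return a list of (start, end) index
# pairs, one per E-record in the payload. A hypothetical fixed-length splitter
# with that shape; the real function also has to account for the
# variable-length status records that STATUS_START_MATCHER filters out.
def fixed_length_split(payload, record_bytes=30):
    return [(i, i + record_bytes)
            for i in range(0, len(payload) - record_bytes + 1, record_bytes)]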
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index(clean=True)
while chunk is not None:
#
# Verify that the Instrument ID is the one that we want.
#
header = SIO_HEADER_MATCHER.match(chunk)
if header.group(SIO_HEADER_GROUP_ID) == ID_VEL3D_L_WFP_SIO_MULE:
#
# Extract the POSIX timestamp from the SIO Header.
#
sio_timestamp = int(header.group(SIO_HEADER_GROUP_TIMESTAMP), 16)
#
# Process the remaining Vel3d data, starting from the end of the
# SIO Header, but not including the trailing 0x03.
#
fields = self.parse_vel3d_data(PARTICLE_TYPE_SIO_INSTRUMENT,
PARTICLE_TYPE_SIO_METADATA,
chunk[header.end(0) : -1],
time_stamp=sio_timestamp)
#
# Generate the particles for this SIO block.
# Add them to the return list of particles.
#
(samples, particles) = self.generate_samples(fields)
result_particles.extend(particles[:samples])
#
# Not our instrument, but still must indicate that no samples were found.
#
else:
samples = 0
# keep track of how many samples were found in this chunk
self._chunk_sample_count.append(samples)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index(clean=True)
return result_particles
def parse_chunks(self):
"""
Parse chunks for the Telemetered parser.
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle.
Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
if header_match:
header_timestamp = header_match.group(SIO_HEADER_GROUP_TIMESTAMP)
# start looping at the end of the header
chunk_idx = header_match.end(0)
if header_match.group(SIO_HEADER_GROUP_ID) == ID_INSTRUMENT:
#
# Parse the CT record, up to but not including the end of SIO block.
#
particles = self.parse_ct_record(chunk[chunk_idx:-1], header_timestamp)
result_particles.extend(particles)
elif header_match.group(SIO_HEADER_GROUP_ID) == ID_OFFSET:
(particles, had_error) = parse_co_data(CtdmoGhqrSioTelemeteredOffsetDataParticle,
chunk[chunk_idx:-1], header_timestamp,
self._extract_sample)
if had_error[0]:
log.error('unknown data found in CO chunk %s at %d, leaving out the rest',
binascii.b2a_hex(chunk), had_error[1])
self._exception_callback(SampleException(
'unknown data found in CO chunk at %d, leaving out the rest' % had_error[1]))
result_particles.extend(particles)
else:
message = 'Unexpected Sio Header ID %s' % header_match.group(SIO_HEADER_GROUP_ID)
log.warn(message)
self._exception_callback(UnexpectedDataException(message))
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
return result_particles
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index(clean=True)
# if there is any non data handle it
self.handle_non_data(non_data, non_end, start)
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
if header_match.group(SIO_HEADER_GROUP_ID) == 'CS':
data_match = ENG_MATCHER.match(chunk)
if data_match:
# put timestamp from hex string to float:
posix_time = int(header_match.group(SIO_HEADER_GROUP_TIMESTAMP), 16)
log.debug('utc timestamp %s', datetime.utcfromtimestamp(posix_time))
timestamp = ntplib.system_to_ntp_time(float(posix_time))
# particle-ize the data block received, return the record
sample = self._extract_sample(self._particle_class, None, data_match, internal_timestamp=timestamp)
if sample:
# create particle
result_particles.append(sample)
else:
log.warn('CS data does not match REGEX')
self._exception_callback(SampleException('CS data does not match REGEX'))
# 'PS' IDs will also be in this file but are specifically ignored
elif header_match.group(SIO_HEADER_GROUP_ID) != 'PS':
message = 'Unexpected Sio Header ID %s' % header_match.group(SIO_HEADER_GROUP_ID)
log.warn(message)
self._exception_callback(UnexpectedDataException(message))
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index(clean=True)
# if there is any non data handle it
self.handle_non_data(non_data, non_end, start)
return result_particles
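# The SIO header stores its timestamp as eight hex characters of POSIX time,
# which the CS branch above converts to NTP. A standalone illustration with a
# made-up header value; ntplib.system_to_ntp_time simply adds the 70-year
# epoch offset (2208988800 seconds) between 1900 (NTP) and 1970 (POSIX).
import ntplib

posix_hex = '51EC763C'                   # hypothetical header timestamp field
posix_time = int(posix_hex, 16)          # 1374451260 -> 2013-07-22T00:01:00Z
ntp_time = ntplib.system_to_ntp_time(float(posix_time))
assert ntp_time == posix_time + 2208988800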
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
if header_match.group(1) == "DO":
data_match = DATA_MATCHER.search(chunk)
if data_match:
log.debug("Found data match in chunk %s", chunk[1:32])
if not self.metadata_sent:
# create the metadata particle
# prepend the timestamp from sio mule header to the dosta raw data,
# which is stored in header_match.group(3)
metadata_sample = self._extract_sample(
self._metadata_particle_class, None, (header_match.group(3), data_match), None
)
if metadata_sample:
result_particles.append(metadata_sample)
self.metadata_sent = True
# create the dosta data particle
# prepend the timestamp from sio mule header to the dosta raw data,
# which is stored in header_match.group(3)
sample = self._extract_sample(
self._data_particle_class, None, (header_match.group(3), data_match), None
)
if sample:
# create particle
result_particles.append(sample)
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
return result_particles
def parse_chunks(self):
"""
Parse chunks for the Telemetered parser.
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle.
Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
header_timestamp = header_match.group(SIO_HEADER_GROUP_TIMESTAMP)
# start looping at the end of the header
chunk_idx = header_match.end(0)
samples = 0
if header_match.group(SIO_HEADER_GROUP_ID) == ID_INSTRUMENT:
#
# Parse the CT record, up to but not including the end of SIO block.
#
(samples, particles) = self.parse_ct_record(chunk[chunk_idx:-1],
                                            header_timestamp)
if samples > 0:
    result_particles.extend(particles[:samples])
elif header_match.group(SIO_HEADER_GROUP_ID) == ID_OFFSET:
(samples, particles) = self.parse_co_data(
    CtdmoTelemeteredOffsetDataParticle,
    chunk[chunk_idx:-1], header_timestamp)
if samples > 0:
    result_particles.extend(particles[:samples])
# keep track of how many samples were found in this chunk
self._chunk_sample_count.append(samples)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
return result_particles
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
(timestamp, chunk) = self._chunker.get_next_data()
while chunk is not None:
# Parse/match the SIO header
sio_header_match = SIO_HEADER_MATCHER.match(chunk)
end_of_header = sio_header_match.end(0)
if sio_header_match.group(1) == 'WE':
# Parse/match the E file header
e_header_match = E_HEADER_MATCHER.search(
chunk[end_of_header:end_of_header+HEADER_BYTES])
if e_header_match:
# '-1' to remove the '\x03' end-of-record marker
payload = chunk[end_of_header+HEADER_BYTES:-1]
self._process_we_record(payload)
else:
message = "Found unexpected data."
log.warn(message)
self._exception_callback(UnexpectedDataException(
message))
else:  # not a 'WE' chunk
message = "Found unexpected data."
log.warn(message)
self._exception_callback(UnexpectedDataException(message))
(timestamp, chunk) = self._chunker.get_next_data()
return self._result_particles
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
if header_match.group(1) == 'FL':
data_match = DATA_MATCHER.search(chunk)
if data_match:
log.debug('Found data match in chunk %s', chunk[1:32])
# particle-ize the data block received, return the record
# prepend the timestamp from sio mule header to the flort raw data,
# which is stored in header_match.group(3)
sample = self._extract_sample(self._particle_class, None,
header_match.group(3) + data_match.group(0),
None)
if sample:
# create particle
result_particles.append(sample)
else:
# We found a line in the file that was unexpected. Since we are continuing,
# just log a warning.
message = "Unexpected data in beginning of header: %s" % header_match.group(1)
log.warn(message)
self._exception_callback(UnexpectedDataException(message))
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
return result_particles
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index(clean=True)
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
sample_count = 0
log.debug('parsing header %s', header_match.group(0)[1:32])
if header_match.group(1) == 'CS':
log.debug('CS header detected: %s', header_match.group(0)[1:32])
data_match = ENG_MATCHER.match(chunk)
if data_match:
# put timestamp from hex string to float:
posix_time = int(header_match.group(3), 16)
log.debug('utc timestamp %s', datetime.utcfromtimestamp(posix_time))
self._timestamp = ntplib.system_to_ntp_time(float(posix_time))
# particle-ize the data block received, return the record
sample = self._extract_sample(self._particle_class, ENG_MATCHER, chunk, self._timestamp)
if sample:
# create particle
result_particles.append(sample)
sample_count += 1
else:
log.warn('CS data does not match REGEX')
self._exception_callback(SampleException('CS data does not match REGEX'))
self._chunk_sample_count.append(sample_count)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index(clean=True)
return result_particles
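# A sketch of the _extract_sample contract these call sites imply: when a
# matcher is supplied (ENG_MATCHER above), the raw chunk only becomes a
# particle if it matches; with None, the caller has already validated the
# data. Inferred from the usage here, not copied from the framework.
def extract_sample_sketch(particle_class, regex, raw_data, timestamp):
    if regex is not None and not regex.match(raw_data):
        return None
    return particle_class(raw_data, internal_timestamp=timestamp)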
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
sample_count = 0
log.debug('parsing header %s', header_match.group(0)[1:32])
if header_match.group(1) == 'DO':
data_match = DATA_MATCHER.search(chunk)
if data_match:
log.debug('Found data match in chunk %s', chunk[1:32])
# particle-ize the data block received, return the record
sample = self._extract_sample(DostadParserDataParticle, None,
header_match.group(3) + data_match.group(0),
None)
if sample:
# create particle
result_particles.append(sample)
sample_count += 1
self._chunk_sample_count.append(sample_count)
(nd_timestamp, non_data, non_start, non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
return result_particles
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index(clean=True)
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
# Check to see if we are dealing with a wfp_eng SIO chunk
if header_match.group(1) == 'WE':
self._current_controller_timestamp = header_match.group(3)
self._process_wfp_eng_chunk(chunk[len(header_match.group(0)):])
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
return self._result_particles
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
# non-data does not need to be handled here because for the single file
# the data may be corrected and re-written later, it is just ignored until it matches
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
sample_count = 0
if header_match.group(1) == "PH":
# start after the sio header
index = header_match.end(0)
last_index = index
chunk_len = len(chunk)
while index < chunk_len:
data_match = DATA_MATCHER.match(chunk[index:])
control_match = CONTROL_MATCHER.match(chunk[index:])
# check for any valid match and make sure no extra data was found between valid matches
if data_match or control_match or chunk[index] == SIO_END:
# if the indices differ, unexpected bytes sit between valid matches;
# exclude the expected possible ph end bytes
if last_index != index and chunk[last_index:index] != PH_EXTRA_END:
# we found bad data, send a sample exception but keep processing the file
log.warning("unknown data found in chunk %s from %d to %d", chunk[1:32], last_index, index)
self._exception_callback(
SampleException(
"unknown data found in chunk %s from %d to %d" % (chunk[1:32], last_index, index)
)
)
# stop processing this sio block, it is bad
break
if data_match:
log.debug("Found data match in chunk %s at index %d", chunk[1:32], index)
# particle-ize the data block received, return the record
# pre-pend the sio header timestamp to the data record (in header_match.group(3))
sample = self._extract_sample(
PhsenParserDataParticle, None, header_match.group(3) + data_match.group(0), None
)
if sample:
# create particle
result_particles.append(sample)
sample_count += 1
index += len(data_match.group(0))
last_index = index
elif control_match:
log.debug("Found control match in chunk %s at index %d", chunk[1:32], index)
# particle-ize the data block received, return the record
# pre-pend the sio header timestamp to the control record (in header_match.group(3))
sample = self._extract_sample(
PhsenControlDataParticle, None, header_match.group(3) + control_match.group(0), None
)
if sample:
# create particle
result_particles.append(sample)
sample_count += 1
index += len(control_match.group(0))
last_index = index
elif chunk[index] == SIO_END:
# found end of sio block marker, we are done with this chunk
break
else:
# unexpected byte: keep scanning; the gap is reported once per run, not per byte
index += 1
self._chunk_sample_count.append(sample_count)
# non-data does not need to be handled here because for the single file
# the data may be corrected and re-written later, it is just ignored until it matches
(timestamp, chunk, start, end) = self._chunker.get_next_data_with_index()
return result_particles
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
(timestamp, chunk) = self._chunker.get_next_data()
while chunk is not None:
sio_header_match = SIO_HEADER_MATCHER.match(chunk)
log.debug('parsing header %s', sio_header_match.group(0)[1:SIO_HEADER_BYTES])
if sio_header_match.group(1) != 'WE':
log.warn("chunk did not match header WE %s", chunk[0:SIO_HEADER_BYTES])
# get the next chunk
(timestamp, chunk) = self._chunker.get_next_data()
continue # jump to next iteration of the chunk loop
# Parse/match the E file header
e_header_match = WFP_E_GLOBAL_FLAGS_HEADER_MATCHER.search(
chunk[SIO_HEADER_BYTES:SIO_HEADER_BYTES+HEADER_BYTES+1])
if not e_header_match:
# no e header match
log.warn("*BAD E HEADER 0x%s",
":".join("{:02x}".format(ord(c)) for c in chunk))
self._exception_callback(UnexpectedDataException("Found unexpected data."))
# get the next chunk
(timestamp, chunk) = self._chunker.get_next_data()
continue # jump to next iteration of the chunk loop
payload = chunk[SIO_HEADER_BYTES+HEADER_BYTES+1:]
data_split = self.we_split_function(payload)
log.debug('Found data match in chunk %s', chunk[1:SIO_HEADER_BYTES])
for start_idx, end_idx in data_split:
    e_record = payload[start_idx:end_idx]
log.debug('Extracted E Record to store in particle %s', hexlify(e_record))
# particle-ize the data block received, return the record
if not STATUS_START_MATCHER.match(e_record[0:STATUS_BYTES]):
fields = struct.unpack('>I', e_record[0:4])
timestamp_s = float(fields[0])
timestamp = ntplib.system_to_ntp_time(timestamp_s)
if len(e_record) == E_GLOBAL_SAMPLE_BYTES:
# create particle
log.debug('Particle created with raw data %s', hexlify(e_record))
log.debug('Particle timestamp = %f', timestamp)
sample = self._extract_sample(DostaLnWfpSioDataParticle,
None,
e_record,
timestamp)
result_particles.append(sample)
(timestamp, chunk) = self._chunker.get_next_data()
return result_particles
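# Each E-record above starts with a four-byte, big-endian POSIX timestamp; a
# self-contained illustration of the struct.unpack('>I', ...) call. The record
# below is fabricated, padded to the assumed 30-byte global sample length.
import struct
import ntplib

e_record = b'\x51\xec\x76\x3c' + b'\x00' * 26   # hypothetical E-record
(posix_seconds,) = struct.unpack('>I', e_record[0:4])
ntp_timestamp = ntplib.system_to_ntp_time(float(posix_seconds))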
def parse_chunks(self):
"""
Parse out any pending data chunks in the chunker. If
it is a valid data piece, build a particle, update the position and
timestamp. Go until the chunker has no more valid data.
@retval a list of tuples with sample particles encountered in this
parsing, plus the state. An empty list if nothing was parsed.
"""
result_particles = []
# non-data does not need to be handled here because for the single file
# the data may be corrected and re-written later, it is just ignored until it matches
(nd_timestamp, non_data, non_start,
non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start,
end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
while chunk is not None:
header_match = SIO_HEADER_MATCHER.match(chunk)
if header_match.group(1) == 'PH':
# start after the sio header
index = header_match.end(0)
last_index = index
chunk_len = len(chunk)
while index < chunk_len:
data_match = DATA_MATCHER.match(chunk[index:])
control_match = CONTROL_MATCHER.match(chunk[index:])
# check for any valid match and make sure no extra data was found between valid matches
if data_match or control_match or chunk[index] == SIO_BLOCK_END:
    # if the indices differ, unexpected bytes sit between valid matches;
    # exclude the expected possible ph end bytes
    if last_index != index and chunk[last_index:index] != PH_EXTRA_END:
# we found bad data, send a sample exception but keep processing the file
msg = "unknown data found in chunk %s from %d to %d" % (chunk[1:32], last_index, index)
log.warning(msg)
self._exception_callback(SampleException(msg))
# stop processing this sio block, it is bad
break
if data_match:
log.debug('Found data match in chunk %s at index %d',
chunk[1:32], index)
# particle-ize the data block received, return the record
sample = self._extract_sample(
PhsenAbcdefSioDataParticle, None,
(header_match.group(3), data_match), None)
result_particles.append(sample)
index += len(data_match.group(0))
last_index = index
elif control_match:
log.debug(
'Found control match in chunk %s at index %d',
chunk[1:32], index)
# particle-ize the data block received, return the record
sample = self._extract_sample(
PhsenAbcdefSioControlDataParticle, None,
(header_match.group(3), control_match), None)
result_particles.append(sample)
index += len(control_match.group(0))
last_index = index
elif chunk[index] == SIO_BLOCK_END:
# found end of sio block marker, we are done with this chunk
break
else:
# unexpected byte: keep scanning; the gap is reported once per run, not per byte
index += 1
else:
# we found unexpected sio instrument id
msg = "Unexpected sio instrument header ID %s" % header_match.group(1)
log.warning(msg)
self._exception_callback(UnexpectedDataException(msg))
# non-data does not need to be handled here because for the single file
# the data may be corrected and re-written later, it is just ignored until it matches
(nd_timestamp, non_data, non_start,
non_end) = self._chunker.get_next_non_data_with_index(clean=False)
(timestamp, chunk, start,
end) = self._chunker.get_next_data_with_index()
self.handle_non_data(non_data, non_end, start)
return result_particles
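# The PH branches above advance last_index only when a record matches, so any
# gap between last_index and index marks unexpected bytes. A stripped-down
# sketch of that scan pattern over a plain byte string; is_record_at is a
# stand-in for the DATA_MATCHER/CONTROL_MATCHER tests and, unlike the parsers,
# this version collects gaps instead of aborting the block.
def scan_records(buf, is_record_at):
    records = []   # (start, length) of each recognized record
    gaps = []      # (start, end) of unmatched byte runs between records
    index = 0
    last_index = 0
    while index < len(buf):
        length = is_record_at(buf, index)
        if length:
            if last_index != index:
                gaps.append((last_index, index))
            records.append((index, length))
            index += length
            last_index = index
        else:
            index += 1   # skip one byte and keep scanning
    return records, gaps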