def read_analogsignal(self ,
# the 2 first key arguments are imposed by neo.io API
lazy = False,
cascade = True,
channel_index = 0,
segment_duration = 15.,
t_start = -1,
):
"""
With this IO AnalogSignal can e acces directly with its channel number
"""
sr = 10000.
sinus_freq = 3. # Hz
#time vector for generated signal:
tvect = np.arange(t_start, t_start+ segment_duration , 1./sr)
if lazy:
anasig = AnalogSignal([], units='V', sampling_rate=sr * pq.Hz,
t_start=t_start * pq.s,
channel_index=channel_index)
# we add the attribute lazy_shape with the size if loaded
anasig.lazy_shape = tvect.shape
else:
# create analogsignal (sinus of 3 Hz)
sig = np.sin(2*np.pi*tvect*sinus_freq + channel_index/5.*2*np.pi)+np.random.rand(tvect.size)
anasig = AnalogSignal(sig, units= 'V', sampling_rate=sr * pq.Hz,
t_start=t_start * pq.s,
channel_index=channel_index)
# for attributes out of neo you can annotate
anasig.annotate(info = 'it is a sinus of %f Hz' %sinus_freq )
return anasig
def read_analogsignal(self,
# the 2 first key arguments are imposed by neo.io
lazy = False,
cascade = True,
#channel index as given by the neuroshare API
channel_index = 0,
#time in seconds to be read
segment_duration = 0.,
#time in seconds to start reading from
t_start = 0.,
):
#some controls:
#if no segment duration is given, use the complete file
if segment_duration ==0.:
segment_duration=float(self.metadata["TimeSpan"])
#if the segment duration is bigger than file, use the complete file
if segment_duration >=float(self.metadata["TimeSpan"]):
segment_duration=float(self.metadata["TimeSpan"])
if lazy:
anasig = AnalogSignal([], units="V", sampling_rate = self.metadata["sampRate"] * pq.Hz,
t_start=t_start * pq.s,
)
#create a dummie time vector
tvect = np.arange(t_start, t_start+ segment_duration , 1./self.metadata["sampRate"])
# we add the attribute lazy_shape with the size if loaded
anasig.lazy_shape = tvect.shape
else:
#get the analog object
sig = self.fd.get_entity(channel_index)
#get the units (V, mV etc)
sigUnits = sig.units
#get the electrode number
chanName = sig.label[-4:]
#transform t_start into index (reading will start from this index)
startat = int(t_start*self.metadata["sampRate"])
#get the number of bins to read in
bins = int((segment_duration+t_start) * self.metadata["sampRate"])
#if the number of bins to read is bigger than
#the total number of bins, read only till the end of analog object
if startat+bins > sig.item_count:
bins = sig.item_count-startat
#read the data from the sig object
sig,_,_ = sig.get_data(index = startat, count = bins)
#store it to the 'AnalogSignal' object
anasig = AnalogSignal(sig, units = sigUnits, sampling_rate=self.metadata["sampRate"] * pq.Hz,
t_start=t_start * pq.s,
t_stop = (t_start+segment_duration)*pq.s,
channel_index=channel_index)
# annotate from which electrode the signal comes from
anasig.annotate(info = "signal from channel %s" %chanName )
return anasig
def read_segment(self, n_start, n_stop, chlist=None, lazy=False, cascade=True):
"""Reads a Segment from the file and stores in database.
The Segment will contain one AnalogSignal for each channel
and will go from n_start to n_stop (in samples).
Arguments:
n_start : time in samples that the Segment begins
n_stop : time in samples that the Segment ends
Python indexing is used, so n_stop is not inclusive.
Returns a Segment object containing the data.
"""
# If no channel numbers provided, get all of them
if chlist is None:
chlist = self.loader.get_neural_channel_numbers()
# Conversion from bits to full_range units
conversion = self.full_range / 2**(8*self.header.sample_width)
# Create the Segment
seg = Segment(file_origin=self.filename)
t_start = float(n_start) / self.header.f_samp
t_stop = float(n_stop) / self.header.f_samp
seg.annotate(t_start=t_start)
seg.annotate(t_stop=t_stop)
# Load data from each channel and store
for ch in chlist:
if lazy:
sig = np.array([]) * conversion
else:
# Get the data from the loader
sig = np.array(\
self.loader._get_channel(ch)[n_start:n_stop]) * conversion
# Create an AnalogSignal with the data in it
anasig = AnalogSignal(signal=sig,
sampling_rate=self.header.f_samp*pq.Hz,
t_start=t_start*pq.s, file_origin=self.filename,
description='Channel %d from %f to %f' % (ch, t_start, t_stop),
channel_index=int(ch))
if lazy:
anasig.lazy_shape = n_stop-n_start
# Link the signal to the segment
seg.analogsignals.append(anasig)
# Link the signal to the recording channel from which it came
#rc = self.channel_number_to_recording_channel[ch]
#rc.analogsignals.append(anasig)
return seg
def _extract_signals(self, data, metadata, lazy):
signal = None
if lazy and data.size > 0:
signal = AnalogSignal([],
units=self._determine_units(metadata),
sampling_period=metadata['dt']*pq.ms)
signal.lazy_shape = None
else:
arr = numpy.vstack(self._extract_array(data, channel_index)
for channel_index in range(metadata['first_index'], metadata['last_index'] + 1))
if len(arr) > 0:
signal = AnalogSignal(arr.T,
units=self._determine_units(metadata),
sampling_period=metadata['dt']*pq.ms)
if signal is not None:
signal.annotate(label=metadata["label"],
variable=metadata["variable"])
return signal
def read_analogsignal(self,
channel_index=None,
lazy=False,
cascade=True,
):
"""
Read raw traces
Arguments:
channel_index: must be integer
"""
try:
channel_index = int(channel_index)
except TypeError:
print('channel_index must be int, not %s' %type(channel_index))
if self._attrs['app_data']:
bit_volts = self._attrs['app_data']['channel_bit_volts']
sig_unit = 'uV'
else:
bit_volts = np.ones((self._attrs['shape'][1])) # TODO: find conversion in phy generated files
sig_unit = 'bit'
if lazy:
anasig = AnalogSignal([],
units=sig_unit,
sampling_rate=self._attrs['kwik']['sample_rate']*pq.Hz,
t_start=self._attrs['kwik']['start_time']*pq.s,
channel_index=channel_index,
)
# we add the attribute lazy_shape with the size if loaded
anasig.lazy_shape = self._attrs['shape'][0]
else:
data = self._kwd['recordings'][str(self._dataset)]['data'].value[:,channel_index]
data = data * bit_volts[channel_index]
anasig = AnalogSignal(data,
units=sig_unit,
sampling_rate=self._attrs['kwik']['sample_rate']*pq.Hz,
t_start=self._attrs['kwik']['start_time']*pq.s,
channel_index=channel_index,
)
data = [] # delete from memory
# for attributes out of neo you can annotate
anasig.annotate(info='raw traces')
return anasig
def _extract_signal(self, data, metadata, channel_index, lazy):
signal = None
if lazy:
if channel_index in data[:, 1]:
signal = AnalogSignal([],
units=self._determine_units(metadata),
sampling_period=metadata['dt']*pq.ms,
channel_index=channel_index)
signal.lazy_shape = None
else:
arr = self._extract_array(data, channel_index)
if len(arr) > 0:
signal = AnalogSignal(arr,
units=self._determine_units(metadata),
sampling_period=metadata['dt']*pq.ms,
channel_index=channel_index)
if signal is not None:
signal.annotate(label=metadata["label"],
variable=metadata["variable"])
return signal
#.........这里部分代码省略.........
sampling_rate = 1. / (header['fADCSampleInterval'] *
nbchannel * 1.e-6) * pq.Hz
elif version >= 2.:
sampling_rate = 1.e6 / \
header['protocol']['fADCSequenceInterval'] * pq.Hz
# construct block
# one sweep = one segment in a block
pos = 0
for j in range(episode_array.size):
seg = Segment(index=j)
length = episode_array[j]['len']
if version < 2.:
fSynchTimeUnit = header['fSynchTimeUnit']
elif version >= 2.:
fSynchTimeUnit = header['protocol']['fSynchTimeUnit']
if (fSynchTimeUnit != 0) and (mode == 1):
length /= fSynchTimeUnit
if not lazy:
subdata = data[pos:pos+length]
subdata = subdata.reshape((int(subdata.size/nbchannel),
nbchannel)).astype('f')
if dt == np.dtype('i2'):
if version < 2.:
reformat_integer_v1(subdata, nbchannel, header)
elif version >= 2.:
reformat_integer_v2(subdata, nbchannel, header)
pos += length
if version < 2.:
chans = [chan_num for chan_num in
header['nADCSamplingSeq'] if chan_num >= 0]
else:
chans = range(nbchannel)
for n, i in enumerate(chans[:nbchannel]): # fix SamplingSeq
if version < 2.:
name = header['sADCChannelName'][i].replace(b' ', b'')
unit = header['sADCUnits'][i].replace(b'\xb5', b'u').\
replace(b' ', b'').decode('utf-8') # \xb5 is µ
num = header['nADCPtoLChannelMap'][i]
elif version >= 2.:
lADCIi = header['listADCInfo'][i]
name = lADCIi['ADCChNames'].replace(b' ', b'')
unit = lADCIi['ADCChUnits'].replace(b'\xb5', b'u').\
replace(b' ', b'').decode('utf-8')
num = header['listADCInfo'][i]['nADCNum']
if (fSynchTimeUnit == 0):
t_start = float(episode_array[j]['offset']) / sampling_rate
else:
t_start = float(episode_array[j]['offset']) * fSynchTimeUnit *1e-6* pq.s
t_start = t_start.rescale('s')
try:
pq.Quantity(1, unit)
except:
unit = ''
if lazy:
signal = [] * pq.Quantity(1, unit)
else:
signal = pq.Quantity(subdata[:, n], unit)
anaSig = AnalogSignal(signal, sampling_rate=sampling_rate,
t_start=t_start,
name=str(name),
channel_index=int(num))
if lazy:
anaSig.lazy_shape = length / nbchannel
seg.analogsignals.append(anaSig)
bl.segments.append(seg)
if mode in [3, 5]: # TODO check if tags exits in other mode
# tag is EventArray that should be attached to Block
# It is attched to the first Segment
times = []
labels = []
comments = []
for i, tag in enumerate(header['listTag']):
times.append(tag['lTagTime']/sampling_rate)
labels.append(str(tag['nTagType']))
comments.append(clean_string(tag['sComment']))
times = np.array(times)
labels = np.array(labels, dtype='S')
comments = np.array(comments, dtype='S')
# attach all tags to the first segment.
seg = bl.segments[0]
if lazy:
ea = Event(times=[] * pq.s, labels=np.array([], dtype='S'))
ea.lazy_shape = len(times)
else:
ea = Event(times=times * pq.s, labels=labels,
comments=comments)
seg.events.append(ea)
bl.create_many_to_one_relationship()
return bl
请发表评论