def proc_src_condition_unit_repetition(sweep, damaIndex, timeStamp, sweepLen,
                                       side, ADperiod, respWin, filename):
    '''Get the repetition for a unit in a condition in a src file that has
    been processed by the official matlab function. See proc_src for details.
    '''
    damaIndex = damaIndex.astype('int32')
    if len(sweep):
        times = np.array([res[0, 0] for res in sweep['time']])
        shapes = np.concatenate([res.flatten()[np.newaxis][np.newaxis] for res
                                 in sweep['shape']], axis=0)
        trig2 = np.array([res[0, 0] for res in sweep['trig2']])
    else:
        times = np.array([])
        shapes = np.array([[[]]])
        trig2 = np.array([])

    times = pq.Quantity(times, units=pq.ms, dtype=np.float32)
    t_start = pq.Quantity(0, units=pq.ms, dtype=np.float32)
    t_stop = pq.Quantity(sweepLen, units=pq.ms, dtype=np.float32)
    trig2 = pq.Quantity(trig2, units=pq.ms, dtype=np.uint8)
    waveforms = pq.Quantity(shapes, dtype=np.int8, units=pq.mV)
    sampling_period = pq.Quantity(ADperiod, units=pq.us)

    train = SpikeTrain(times=times, t_start=t_start, t_stop=t_stop,
                       trig2=trig2, dtype=np.float32, timestamp=timeStamp,
                       dama_index=damaIndex, side=side, copy=True,
                       respwin=respWin, waveforms=waveforms,
                       file_origin=filename)
    train.annotations['side'] = side
    train.sampling_period = sampling_period
    return train
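
# Hedged usage sketch (not from the original source): `fake_sweep` mimics the
# record array that scipy.io.loadmat would produce for a src file already
# processed by the official matlab function. The field layout below is an
# assumption for illustration only.
import numpy as np
import quantities as pq

fake_sweep = np.zeros(2, dtype=[('time', object), ('shape', object),
                                ('trig2', object)])
for i in range(len(fake_sweep)):
    fake_sweep[i]['time'] = np.array([[10. * (i + 1)]])        # (1, 1) time
    fake_sweep[i]['shape'] = np.zeros((30, 1), dtype=np.int8)  # one waveform
    fake_sweep[i]['trig2'] = np.array([[0]])

train = proc_src_condition_unit_repetition(
    fake_sweep, damaIndex=np.array(0), timeStamp=1, sweepLen=100.,
    side='left', ADperiod=40, respWin=np.array([0, 100]),
    filename='example.src')
print(train)  # SpikeTrain with 2 spikes between 0 and 100 ms

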
def test__construct_subsegment_by_unit(self):
    nb_seg = 3
    nb_unit = 7
    unit_with_sig = np.array([0, 2, 5])
    signal_types = ['Vm', 'Conductances']
    sig_len = 100

    # channelindexes
    chxs = [ChannelIndex(name='Vm',
                         index=unit_with_sig),
            ChannelIndex(name='Conductance',
                         index=unit_with_sig)]

    # Unit
    all_unit = []
    for u in range(nb_unit):
        un = Unit(name='Unit #%d' % u, channel_indexes=np.array([u]))
        assert_neo_object_is_compliant(un)
        all_unit.append(un)

    blk = Block()
    blk.channel_indexes = chxs
    for s in range(nb_seg):
        seg = Segment(name='Simulation %s' % s)
        for j in range(nb_unit):
            st = SpikeTrain([1, 2], units='ms',
                            t_start=0., t_stop=10)
            st.unit = all_unit[j]
            seg.spiketrains.append(st)

        for t in signal_types:
            anasigarr = AnalogSignal(np.zeros((sig_len,
                                               len(unit_with_sig))),
                                     units='nA',
                                     sampling_rate=1000. * pq.Hz,
                                     channel_indexes=unit_with_sig)
            seg.analogsignals.append(anasigarr)
        blk.segments.append(seg)

    blk.create_many_to_one_relationship()
    for unit in all_unit:
        assert_neo_object_is_compliant(unit)
    for chx in chxs:
        assert_neo_object_is_compliant(chx)
    assert_neo_object_is_compliant(blk)

    # what you want
    newseg = seg.construct_subsegment_by_unit(all_unit[:4])
    assert_neo_object_is_compliant(newseg)


def read_spiketrain(self,
                    # the first 2 keyword arguments are imposed by the neo.io API
                    lazy=False,
                    cascade=True,
                    segment_duration=15.,
                    t_start=-1,
                    channel_index=0,
                    ):
    """
    With this IO, a SpikeTrain can be accessed directly by its channel number.
    """
    # There are 2 possible behaviours for a SpikeTrain:
    # holding many Spike instances or directly holding spike times.
    # Here we choose the first one.
    if not HAVE_SCIPY:
        raise SCIPY_ERR

    num_spike_by_spiketrain = 40
    sr = 10000.

    if lazy:
        times = []
    else:
        times = (np.random.rand(num_spike_by_spiketrain) * segment_duration +
                 t_start)

    # create a spiketrain
    spiketr = SpikeTrain(times, t_start=t_start * pq.s,
                         t_stop=(t_start + segment_duration) * pq.s,
                         units=pq.s,
                         name='it is a spiketrain from exampleio',
                         )

    if lazy:
        # we add the attribute lazy_shape with the size it would have if loaded
        spiketr.lazy_shape = (num_spike_by_spiketrain,)

    # our spiketrains also hold the waveforms:
    # 1. generate a fake spike shape (2D array if trodness > 1)
    w1 = -stats.nct.pdf(np.arange(11, 60, 4), 5, 20)[::-1] / 3.
    w2 = stats.nct.pdf(np.arange(11, 60, 2), 5, 20)
    w = np.r_[w1, w2]
    w = -w / max(w)
    if not lazy:
        # in the neo API the waveforms attr is 3D to cover the tetrode case;
        # in our case it is a mono electrode, so dim 1 has size 1
        waveforms = np.tile(w[np.newaxis, np.newaxis, :],
                            (num_spike_by_spiketrain, 1, 1))
        waveforms *= np.random.randn(*waveforms.shape) / 6 + 1
        spiketr.waveforms = waveforms * pq.mV
        spiketr.sampling_rate = sr * pq.Hz
        spiketr.left_sweep = 1.5 * pq.s

    # for attributes outside of neo you can annotate
    spiketr.annotate(channel_index=channel_index)

    return spiketr
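
# Hedged usage sketch (not from the original source): assuming this method
# belongs to neo's ExampleIO, which accepts a fake filename and generates
# synthetic data, one train can be read per channel.
from neo.io import ExampleIO

io = ExampleIO(filename='itisafake.nof')
st = io.read_spiketrain(segment_duration=15., t_start=-1, channel_index=0)
print(st.size)             # ~40 random spike times
print(st.waveforms.shape)  # (num_spikes, 1, n_samples) fake waveforms

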
def read_segment(self,
                 lazy=False,
                 cascade=True,
                 delimiter='\t',
                 t_start=0. * pq.s,
                 unit=pq.s,
                 ):
    """
    Arguments:
        delimiter : column delimiter in the file: '\t', one space,
            two spaces, ',' or ';'
        t_start : start time of all spiketrains (0 by default)
        unit : unit of the spike times; can be a str or a Quantity
    """
    unit = pq.Quantity(1, unit)

    seg = Segment(file_origin=os.path.basename(self.filename))
    if not cascade:
        return seg

    f = open(self.filename, 'Ur')
    for i, line in enumerate(f):
        alldata = line[:-1].split(delimiter)
        if alldata[-1] == '':
            alldata = alldata[:-1]
        if alldata[0] == '':
            alldata = alldata[1:]

        if lazy:
            spike_times = []
            t_stop = t_start
        else:
            spike_times = np.array(alldata).astype('f')
            t_stop = spike_times.max() * unit

        sptr = SpikeTrain(spike_times * unit, t_start=t_start, t_stop=t_stop)
        if lazy:
            sptr.lazy_shape = len(alldata)

        sptr.annotate(channel_index=i)
        seg.spiketrains.append(sptr)
    f.close()

    seg.create_many_to_one_relationship()
    return seg
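
# Hedged usage sketch (not from the original source): assuming this method
# belongs to an AsciiSpikeTrainIO-style class where each line of the text
# file holds the spike times of one train; the filename is hypothetical.
import quantities as pq
from neo.io import AsciiSpikeTrainIO

io = AsciiSpikeTrainIO(filename='spiketrains.txt')
seg = io.read_segment(delimiter='\t', t_start=0. * pq.s, unit=pq.s)
for st in seg.spiketrains:
    print(st.annotations['channel_index'], st.size)

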
def __save_segment(self):
    '''
    Write the segment to the Block if it exists
    '''
    # if this is the beginning of the first condition, then we don't want
    # to save, so exit
    # but set __seg from None to False so we know next time to create a
    # segment even if there are no spikes in the condition
    if self.__seg is None:
        self.__seg = False
        return

    if not self.__seg:
        # create dummy values if there are no SpikeTrains in this condition
        self.__seg = Segment(file_origin=self._filename,
                             **self.__params)
        self.__spiketimes = []

    if self.__lazy:
        train = SpikeTrain(pq.Quantity([], dtype=np.float32,
                                       units=pq.ms),
                           t_start=0 * pq.ms, t_stop=self.__t_stop * pq.ms,
                           file_origin=self._filename)
        train.lazy_shape = len(self.__spiketimes)
    else:
        times = pq.Quantity(self.__spiketimes, dtype=np.float32,
                            units=pq.ms)
        train = SpikeTrain(times,
                           t_start=0 * pq.ms, t_stop=self.__t_stop * pq.ms,
                           file_origin=self._filename)

    self.__seg.spiketrains = [train]
    self.__unit.spiketrains.append(train)
    self._blk.segments.append(self.__seg)

    # set an empty segment
    # from now on, we need to set __seg to False rather than None so
    # that if there is a condition with no SpikeTrains we know
    # to create an empty Segment
    self.__seg = False
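
# Illustrative sketch (not from the original source) of the __seg sentinel
# protocol used above: None means "before the first condition", False means
# "a boundary was passed, open a fresh Segment next time", and a Segment
# instance means "spikes are being accumulated". All names are hypothetical.
class _SegmentBuffer(object):
    def __init__(self):
        self.seg = None      # nothing seen yet
        self.spiketimes = []

    def save_segment(self):
        if self.seg is None:   # first condition: nothing to flush yet
            self.seg = False
            return None
        flushed = (self.seg, self.spiketimes)
        self.seg = False       # next condition must create a new Segment
        self.spiketimes = []
        return flushed

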
def _handle_processing_group(self, block):
    # todo: handle modules other than Units
    units_group = self._file.get('processing/Units/UnitTimes')
    segment_map = dict((segment.name, segment) for segment in block.segments)
    for name, group in units_group.items():
        if name == 'unit_list':
            pass  # todo
        else:
            segment_name = group['source'].value
            # desc = group['unit_description'].value  # use this to store Neo Unit id?
            segment = segment_map[segment_name]
            if self._lazy:
                times = np.array(())
                lazy_shape = group['times'].shape
            else:
                times = group['times'].value
            # todo: t_stop here is a custom Neo value; general NWB files will
            # not have it - use segment.t_stop instead in that case?
            spiketrain = SpikeTrain(times, units=pq.second,
                                    t_stop=group['t_stop'].value * pq.second)
            if self._lazy:
                spiketrain.lazy_shape = lazy_shape
            spiketrain.segment = segment
            segment.spiketrains.append(spiketrain)
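
# Hedged sketch (not from the original source): a minimal file with the HDF5
# layout the handler above expects. The group path and dataset names come
# from the code; the unit name and values are assumptions for illustration.
import h5py
import numpy as np

with h5py.File('fake_units.nwb', 'w') as f:
    g = f.create_group('processing/Units/UnitTimes/unit_00')
    g.create_dataset('source', data='Segment 0')   # name of the home Segment
    g.create_dataset('times', data=np.array([0.1, 0.5, 0.9]))  # seconds
    g.create_dataset('t_stop', data=1.0)           # custom Neo value, seconds

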
def read_block(self, lazy=False):
    """Returns a Block containing spike information.

    There is no obvious way to infer the segment boundaries from
    raw spike times, so for now all spike times are returned in one
    big segment. The way around this would be to specify the segment
    boundaries, and then change this code to put the spikes in the right
    segments.
    """
    assert not lazy, 'lazy loading is not supported'

    # Create block and segment to hold all the data
    block = Block()

    # Search the data directory for KlustaKwik files.
    # If nothing is found, return an empty block
    self._fetfiles = self._fp.read_filenames('fet')
    self._clufiles = self._fp.read_filenames('clu')
    if len(self._fetfiles) == 0:
        return block

    # Create a single segment to hold all of the data
    seg = Segment(name='seg0', index=0, file_origin=self.filename)
    block.segments.append(seg)

    # Load spike times from each group and store in a dict, keyed
    # by group number
    self.spiketrains = dict()
    for group in sorted(self._fetfiles.keys()):
        # Load spike times
        fetfile = self._fetfiles[group]
        spks, features = self._load_spike_times(fetfile)

        # Load cluster ids, or generate them
        if group in self._clufiles:
            clufile = self._clufiles[group]
            uids = self._load_unit_id(clufile)
        else:
            # unclustered data, assume all zeros
            uids = np.zeros(spks.shape, dtype=np.int32)

        # error check
        if len(spks) != len(uids):
            raise ValueError("lengths of fet and clu files are different")

        # Create a Unit for each cluster
        unique_unit_ids = np.unique(uids)
        for unit_id in sorted(unique_unit_ids):
            # Initialize the unit
            u = Unit(name=('unit %d from group %d' % (unit_id, group)),
                     index=unit_id, group=group)

            # Initialize a new SpikeTrain for the spikes from this unit
            st = SpikeTrain(
                times=spks[uids == unit_id] / self.sampling_rate,
                units='sec', t_start=0.0,
                t_stop=spks.max() / self.sampling_rate,
                name=('unit %d from group %d' % (unit_id, group)))
            st.annotations['cluster'] = unit_id
            st.annotations['group'] = group

            # put features in
            if len(features) != 0:
                st.annotations['waveform_features'] = features

            # Link
            u.spiketrains.append(st)
            seg.spiketrains.append(st)

    block.create_many_to_one_relationship()
    return block
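
# Hedged usage sketch (not from the original source): assuming this method
# belongs to neo's KlustaKwikIO, where `filename` is the common basename of
# the .fet.N / .clu.N files; the path below is hypothetical.
from neo.io import KlustaKwikIO

io = KlustaKwikIO(filename='data/session1', sampling_rate=30000.)
blk = io.read_block(lazy=False)
for st in blk.segments[0].spiketrains:
    print(st.annotations['group'], st.annotations['cluster'], st.size)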