• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python core.SpikeTrain类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neo.core.SpikeTrain的典型用法代码示例。如果您正苦于以下问题:Python SpikeTrain类的具体用法?Python SpikeTrain怎么用?Python SpikeTrain使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SpikeTrain类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: proc_src_condition_unit_repetition

def proc_src_condition_unit_repetition(sweep, damaIndex, timeStamp, sweepLen,
                                       side, ADperiod, respWin, filename):
    '''Build the SpikeTrain for one repetition of a unit within a condition.

    The inputs come from a src file that has been processed by the official
    matlab function.  See proc_src for details.

    sweep     -- supports len() and field access by 'time', 'shape' and
                 'trig2'; each 'time'/'trig2' entry is read with [0, 0] and
                 each 'shape' entry is flattened into one waveform
    damaIndex -- array, cast to int32 and passed on as ``dama_index``
    timeStamp -- passed on as ``timestamp``
    sweepLen  -- used as t_stop, in ms
    side      -- passed on as ``side`` and also written to the annotations
    ADperiod  -- set as the train's sampling_period, in us
    respWin   -- passed on as ``respwin``
    filename  -- passed on as ``file_origin``

    Returns the resulting SpikeTrain.
    '''
    damaIndex = damaIndex.astype('int32')

    if not len(sweep):
        # no spikes in this repetition: use empty placeholders
        raw_times = np.array([])
        raw_shapes = np.array([[[]]])
        raw_trig2 = np.array([])
    else:
        raw_times = np.array([entry[0, 0] for entry in sweep['time']])
        # every flattened waveform gets two leading axes, then all are
        # stacked along axis 0
        raw_shapes = np.concatenate(
            [entry.flatten()[np.newaxis][np.newaxis]
             for entry in sweep['shape']],
            axis=0)
        raw_trig2 = np.array([entry[0, 0] for entry in sweep['trig2']])

    spike_times = pq.Quantity(raw_times, units=pq.ms, dtype=np.float32)
    start = pq.Quantity(0, units=pq.ms, dtype=np.float32)
    stop = pq.Quantity(sweepLen, units=pq.ms, dtype=np.float32)
    trigger = pq.Quantity(raw_trig2, units=pq.ms, dtype=np.uint8)
    spike_shapes = pq.Quantity(raw_shapes, dtype=np.int8, units=pq.mV)
    period = pq.Quantity(ADperiod, units=pq.us)

    result = SpikeTrain(times=spike_times, t_start=start, t_stop=stop,
                        trig2=trigger, dtype=np.float32,
                        timestamp=timeStamp, dama_index=damaIndex,
                        side=side, copy=True, respwin=respWin,
                        waveforms=spike_shapes, file_origin=filename)
    result.annotations['side'] = side
    result.sampling_period = period
    return result
开发者ID:INM-6,项目名称:python-neo,代码行数:30,代码来源:test_brainwaresrcio.py


示例2: test__construct_subsegment_by_unit

    def test__construct_subsegment_by_unit(self):
        # Smoke test: builds units, segments and signals, then checks that
        # Segment.construct_subsegment_by_unit can be called on a subset of
        # the units.  Nothing is asserted about the result.
        n_segments = 3
        n_units = 7
        signal_channels = [0, 2, 5]
        signal_names = ['Vm', 'Conductances']
        n_samples = 100

        # recording channel groups, both over the same channels
        rcgs = [RecordingChannelGroup(name=label,
                                      channel_indexes=signal_channels)
                for label in ('Vm', 'Conductance')]

        # one Unit per channel number
        units = [Unit(name='Unit #%d' % idx, channel_indexes=[idx])
                 for idx in range(n_units)]

        bl = Block()
        for seg_no in range(n_segments):
            seg = Segment(name='Simulation %s' % seg_no)
            for unit in units:
                train = SpikeTrain([1, 2, 3], units='ms',
                                   t_start=0., t_stop=10)
                train.unit = unit

            for _ in signal_names:
                zeros = np.zeros((n_samples, len(signal_channels)))
                sig = AnalogSignalArray(zeros, units='nA',
                                        sampling_rate=1000. * pq.Hz,
                                        channel_indexes=signal_channels)
                seg.analogsignalarrays.append(sig)

        # what you want (``seg`` is the segment built in the last iteration)
        subseg = seg.construct_subsegment_by_unit(units[:4])
开发者ID:dengemann,项目名称:python-neo,代码行数:31,代码来源:test_segment.py


示例3: _read_spiketrain

 def _read_spiketrain(self, node, parent):
     """Build a SpikeTrain from `node`, attach it to `parent` and register it.

     `node` must provide the items "times", "t_start" and "t_stop" and an
     ``attrs["object_ref"]`` entry.  The new train gets ``segment = parent``
     and is stored in ``self.object_refs`` under that object reference
     before being returned.
     """
     extra_kwargs = self._get_standard_attributes(node)
     start = self._get_quantity(node["t_start"])
     stop = self._get_quantity(node["t_stop"])
     # todo: handle sampling_rate, waveforms, left_sweep
     times = self._get_quantity(node["times"])
     train = SpikeTrain(times, t_start=start, t_stop=stop, **extra_kwargs)
     train.segment = parent
     ref = node.attrs["object_ref"]
     self.object_refs[ref] = train
     return train
开发者ID:INM-6,项目名称:python-neo,代码行数:11,代码来源:hdf5io.py


示例4: _extract_spikes

 def _extract_spikes(self, data, metadata, channel_index, lazy):
     """Return the annotated SpikeTrain of one channel, or None without spikes.

     In lazy mode an empty train (``lazy_shape = None``) is created when
     `channel_index` occurs in column 1 of `data`.  Otherwise the times come
     from ``self._extract_array`` and the train ends at the latest spike.
     The train is annotated with ``metadata["label"]``, the channel index
     and ``metadata["dt"]``.  Times are in ms.
     """
     if lazy:
         if channel_index not in data[:, 1]:
             return None
         train = SpikeTrain([], units=pq.ms, t_stop=0.0)
         train.lazy_shape = None
     else:
         channel_times = self._extract_array(data, channel_index)
         if len(channel_times) == 0:
             return None
         last_spike = channel_times.max()
         train = SpikeTrain(channel_times, units=pq.ms, t_stop=last_spike)

     train.annotate(label=metadata["label"],
                    channel_index=channel_index,
                    dt=metadata["dt"])
     return train
开发者ID:bal47,项目名称:python-neo,代码行数:15,代码来源:pynnio.py


示例5: test__construct_subsegment_by_unit

    def test__construct_subsegment_by_unit(self):
        # Builds units, channel indexes and segments, checks each object for
        # neo compliance, then checks that the sub-segment constructed for
        # the first four units is compliant as well.
        n_segments = 3
        n_units = 7
        signal_channels = np.array([0, 2, 5])
        signal_names = ['Vm', 'Conductances']
        n_samples = 100

        # channel indexes, both over the same channels
        chxs = [ChannelIndex(name=label, index=signal_channels)
                for label in ('Vm', 'Conductance')]

        # one Unit per channel number, each checked as soon as it is built
        units = []
        for idx in range(n_units):
            unit = Unit(name='Unit #%d' % idx,
                        channel_indexes=np.array([idx]))
            assert_neo_object_is_compliant(unit)
            units.append(unit)

        blk = Block()
        blk.channel_indexes = chxs
        for seg_no in range(n_segments):
            seg = Segment(name='Simulation %s' % seg_no)
            for unit in units:
                train = SpikeTrain([1, 2], units='ms',
                                   t_start=0., t_stop=10)
                train.unit = unit

            for _ in signal_names:
                zeros = np.zeros((n_samples, len(signal_channels)))
                seg.analogsignals.append(
                    AnalogSignal(zeros,
                                 units='nA',
                                 sampling_rate=1000. * pq.Hz,
                                 channel_indexes=signal_channels))

        blk.create_many_to_one_relationship()
        for obj in units + chxs + [blk]:
            assert_neo_object_is_compliant(obj)

        # what you want (``seg`` is the segment built in the last iteration)
        newseg = seg.construct_subsegment_by_unit(units[:4])
        assert_neo_object_is_compliant(newseg)
开发者ID:msenoville,项目名称:python-neo,代码行数:47,代码来源:test_segment.py


示例6: read_spiketrain

    def read_spiketrain(self,
                        # the first two keyword arguments are imposed by
                        # the neo.io API
                        lazy=False,
                        cascade=True,
                        segment_duration=15.,
                        t_start=-1,
                        channel_index=0,
                        ):
        """
        Return a fake SpikeTrain for the channel numbered `channel_index`.

        The train holds 40 random spike times drawn uniformly from
        [t_start, t_start + segment_duration) seconds.  Unless `lazy` is
        set it also carries randomly scaled copies of one synthetic
        waveform.  In lazy mode the train is empty and only ``lazy_shape``
        tells how many spikes it would hold.

        Raises SCIPY_ERR when scipy is not available.
        """
        if not HAVE_SCIPY:
            raise SCIPY_ERR

        n_spikes = 40
        rate_hz = 10000.

        # the train holds the spike times directly
        if lazy:
            spike_times = []
        else:
            spike_times = (np.random.rand(n_spikes) * segment_duration
                           + t_start)

        train = SpikeTrain(spike_times,
                           t_start=t_start * pq.s,
                           t_stop=(t_start + segment_duration) * pq.s,
                           units=pq.s,
                           name='it is a spiketrain from exampleio')

        # fake spike shape: two pieces of a noncentral-t pdf joined end to
        # end, then normalised by the maximum and sign-flipped
        first_piece = -stats.nct.pdf(np.arange(11, 60, 4), 5, 20)[::-1] / 3.
        second_piece = stats.nct.pdf(np.arange(11, 60, 2), 5, 20)
        template = np.r_[first_piece, second_piece]
        template = -template / max(template)

        if lazy:
            # lazy_shape records the size the train would have if loaded
            train.lazy_shape = (n_spikes,)
        else:
            # the neo API wants 3-D waveforms (for tetrodes); here there is
            # a single electrode, so axis 1 has size 1
            stacked = np.tile(template[np.newaxis, np.newaxis, :],
                              (n_spikes, 1, 1))
            stacked *= np.random.randn(*stacked.shape) / 6 + 1
            train.waveforms = stacked * pq.mV
            train.sampling_rate = rate_hz * pq.Hz
            train.left_sweep = 1.5 * pq.s

        # attributes outside the neo model go into annotations
        train.annotate(channel_index=channel_index)

        return train
开发者ID:CINPLA,项目名称:python-neo,代码行数:58,代码来源:exampleio.py


示例7: create_all_annotated

    def create_all_annotated(cls):
        """Return a Block holding one annotated object of each data type.

        The block contains one Segment with an AnalogSignal, an
        IrregularlySampledSignal, an Epoch, an Event and a SpikeTrain, plus
        one ChannelIndex holding one Unit.  Every object is annotated from
        ``cls.rdict``; the SpikeTrain additionally gets two Quantity-valued
        annotations.
        """
        def annotate_and_store(obj, container, n_entries):
            # annotate first, then append, for every child object
            obj.annotate(**cls.rdict(n_entries))
            container.append(obj)
            return obj

        times = cls.rquant(1, pq.s)
        signal = cls.rquant(1, pq.V)

        block = Block()
        block.annotate(**cls.rdict(3))

        segment = annotate_and_store(Segment(), block.segments, 4)

        annotate_and_store(AnalogSignal(signal=signal, sampling_rate=pq.Hz),
                           segment.analogsignals, 2)
        annotate_and_store(
            IrregularlySampledSignal(times=times, signal=signal,
                                     time_units=pq.s),
            segment.irregularlysampledsignals, 2)
        annotate_and_store(Epoch(times=times, durations=times),
                           segment.epochs, 4)
        annotate_and_store(Event(times=times), segment.events, 4)

        # the spike train carries quantity-valued annotations on top of the
        # generated ones
        train = SpikeTrain(times=times, t_stop=pq.s, units=pq.s)
        train_notes = cls.rdict(6)
        train_notes["quantity"] = pq.Quantity(10, "mV")
        train_notes["qarray"] = pq.Quantity(range(10), "mA")
        train.annotate(**train_notes)
        segment.spiketrains.append(train)

        channels = annotate_and_store(
            ChannelIndex(name="achx", index=[1, 2], channel_ids=[0, 10]),
            block.channel_indexes, 5)
        annotate_and_store(Unit(), channels.units, 2)

        return block
开发者ID:theunissenlab,项目名称:python-neo,代码行数:43,代码来源:test_nixio.py


示例8: read_segment

    def read_segment(self,
                            lazy = False,
                            cascade = True,
                            delimiter = '\t',
                            t_start = 0.*pq.s,
                            unit = pq.s,
                            ):
        r"""
        Read the file into a Segment holding one SpikeTrain per line.

        Arguments:
            lazy : if True, build empty spiketrains and only record the
                number of fields of each line in ``lazy_shape``
            cascade : if False, return the bare Segment without reading
                the file
            delimiter  :  columns delimiter in file  '\t' or one space or two space or ',' or ';'
            t_start : time start of all spiketrain 0 by default
            unit : unit of spike times, can be a str or directly a Quantities

        Returns:
            the Segment; each spiketrain is annotated with its line number
            as ``channel_index`` and ends at its latest spike.
        """
        unit = pq.Quantity(1, unit)

        seg = Segment(file_origin = os.path.basename(self.filename))
        if not cascade:
            return seg

        # The 'U' mode flag was removed in Python 3.11, so plain 'r' is used.
        # The ``with`` block closes the file even when a line fails to parse.
        with open(self.filename, 'r') as f:
            for i, line in enumerate(f):
                # Strip only the line terminator: slicing off the last
                # character would eat a digit when the final line has no
                # trailing newline.
                alldata = line.rstrip('\r\n').split(delimiter)
                # drop the empty field left by a leading/trailing delimiter
                if alldata and alldata[-1] == '':
                    alldata = alldata[:-1]
                if alldata and alldata[0] == '':
                    alldata = alldata[1:]

                if lazy:
                    spike_times = [ ]
                    t_stop = t_start
                elif alldata:
                    spike_times = np.array(alldata).astype('f')
                    t_stop = spike_times.max()*unit
                else:
                    # blank line: a train without spikes (max() of an empty
                    # array would raise)
                    spike_times = np.array([], dtype='f')
                    t_stop = t_start

                sptr = SpikeTrain(spike_times*unit, t_start=t_start, t_stop=t_stop)
                if lazy:
                    sptr.lazy_shape = len(alldata)

                sptr.annotate(channel_index = i)
                seg.spiketrains.append(sptr)

        seg.create_many_to_one_relationship()
        return seg
开发者ID:ChrisNolan1992,项目名称:python-neo,代码行数:41,代码来源:asciispiketrainio.py


示例9: test_spiketrain_write

    def test_spiketrain_write(self):
        # Writes and compares the same block three times: with a plain
        # spike train, after adding a train with waveforms, and after
        # setting left_sweep on that second train.
        blk = Block()
        segment = Segment()
        blk.segments.append(segment)

        plain = SpikeTrain(times=[3, 4, 5]*pq.s, t_stop=10.0,
                           name="spikes!", description="sssssspikes")
        segment.spiketrains.append(plain)
        self.write_and_compare([blk])

        with_waveforms = SpikeTrain(times=[1, 1.1, 1.2]*pq.ms,
                                    t_stop=1.5*pq.s,
                                    name="spikes with wf",
                                    description="spikes for waveform test",
                                    waveforms=self.rquant((3, 5, 10), pq.mV))
        segment.spiketrains.append(with_waveforms)
        self.write_and_compare([blk])

        with_waveforms.left_sweep = np.random.random(10)*pq.ms
        self.write_and_compare([blk])
开发者ID:theunissenlab,项目名称:python-neo,代码行数:21,代码来源:test_nixio.py


示例10: __save_segment

    def __save_segment(self):
        '''
        Flush the pending segment, with its single SpikeTrain, to the Block.

        self.__seg has three states: None before the first condition (then
        nothing is saved and it becomes False), False when the current
        condition produced no segment (a dummy one is created), or the
        Segment to save.  After saving it is reset to False.
        '''
        # very first call: nothing to save yet, but remember that from now
        # on a segment must be created even for a condition without spikes
        if self.__seg is None:
            self.__seg = False
            return

        # no SpikeTrains in this condition: fall back to dummy values
        if not self.__seg:
            self.__seg = Segment(file_origin=self._filename,
                                 **self.__params)
            self.__spiketimes = []

        # in lazy mode the train stays empty and only records how many
        # spikes it would hold
        lazy = self.__lazy
        loaded = [] if lazy else self.__spiketimes
        spike_times = pq.Quantity(loaded, dtype=np.float32, units=pq.ms)
        train = SpikeTrain(spike_times,
                           t_start=0*pq.ms, t_stop=self.__t_stop * pq.ms,
                           file_origin=self._filename)
        if lazy:
            train.lazy_shape = len(self.__spiketimes)

        self.__seg.spiketrains = [train]
        self.__unit.spiketrains.append(train)
        self._blk.segments.append(self.__seg)

        # False rather than None, so that a following condition without
        # SpikeTrains still gets an empty Segment
        self.__seg = False
开发者ID:NeuroArchive,项目名称:python-neo,代码行数:40,代码来源:brainwaref32io.py


示例11: _handle_processing_group

 def _handle_processing_group(self, block):
     """Attach the spike trains under processing/Units/UnitTimes to `block`.

     Every entry of that group except 'unit_list' becomes a SpikeTrain (in
     seconds).  It is linked to the segment of `block` whose name equals
     the entry's 'source' value.  In lazy mode the train is empty and
     ``lazy_shape`` holds the shape of the stored times.

     Does nothing when the file has no such group.  Raises KeyError when
     a 'source' value names no segment of `block`.
     """
     # todo: handle other modules than Units
     units_group = self._file.get('processing/Units/UnitTimes')
     if units_group is None:
         # .get() returns None for a missing path; without this guard the
         # .items() call below would raise AttributeError
         return
     segment_map = {segment.name: segment for segment in block.segments}
     for name, group in units_group.items():
         if name == 'unit_list':
             continue  # todo
         # NOTE(review): presumably h5py datasets -- `[()]` reads the whole
         # dataset and replaces `.value`, which h5py 3.0 removed; confirm
         segment_name = group['source'][()]
         if segment_name not in segment_map and isinstance(segment_name, bytes):
             # newer h5py returns stored strings as bytes; decode only when
             # the raw value would not have matched anyway
             segment_name = segment_name.decode('utf-8')
         #desc = group['unit_description'].value  # use this to store Neo Unit id?
         segment = segment_map[segment_name]
         if self._lazy:
             times = np.array(())
             lazy_shape = group['times'].shape
         else:
             times = group['times'][()]
         # todo: t_stop is a custom Neo value, general NWB files will not
         # have this - use segment.t_stop instead in that case?
         spiketrain = SpikeTrain(times, units=pq.second,
                                 t_stop=group['t_stop'][()]*pq.second)
         if self._lazy:
             spiketrain.lazy_shape = lazy_shape
         spiketrain.segment = segment
         segment.spiketrains.append(spiketrain)
开发者ID:MartinHeroux,项目名称:ScientificallySound_files,代码行数:22,代码来源:nwbio_LOCAL_4246.py


示例12: read_segment


#.........这里部分代码省略.........
                        # print('sortcode updated')
                        break
            except OSError:
                sortresult_filename = None
            except IOError:
                sortresult_filename = None


        for type_code, type_label in tdt_event_type:
            mask1 = tsq['evtype']==type_code
            codes = np.unique(tsq[mask1]['code'])

            for code in codes:
                mask2 = mask1 & (tsq['code']==code)
                channels = np.unique(tsq[mask2]['channel'])

                for channel in channels:
                    mask3 = mask2 & (tsq['channel']==channel)

                    if type_label in ['EVTYPE_STRON', 'EVTYPE_STROFF']:
                        if lazy:
                            times = [ ]*pq.s
                            labels = np.array([ ], dtype=str)
                        else:
                            times = (tsq[mask3]['timestamp'] - global_t_start) * pq.s
                            labels = tsq[mask3]['eventoffset'].view('float64').astype('S')
                        ea = Event(times=times,
                                   name=code,
                                   channel_index=int(channel),
                                   labels=labels)
                        if lazy:
                            ea.lazy_shape = np.sum(mask3)
                        seg.events.append(ea)

                    elif type_label == 'EVTYPE_SNIP':
                        sortcodes = np.unique(tsq[mask3]['sortcode'])
                        for sortcode in sortcodes:
                            mask4 = mask3 & (tsq['sortcode']==sortcode)
                            nb_spike = np.sum(mask4)
                            sr = tsq[mask4]['frequency'][0]
                            waveformsize = tsq[mask4]['size'][0]-10
                            if lazy:
                                times = [ ]*pq.s
                                waveforms = None
                            else:
                                times = (tsq[mask4]['timestamp'] - global_t_start) * pq.s
                                dt = np.dtype(data_formats[ tsq[mask3]['dataformat'][0]])
                                waveforms = get_chunks(tsq[mask4]['size'],tsq[mask4]['eventoffset'], tev_array).view(dt)
                                waveforms = waveforms.reshape(nb_spike, -1, waveformsize)
                                waveforms = waveforms * pq.mV
                            if nb_spike > 0:
                             #   t_start = (tsq['timestamp'][0] - global_t_start) * pq.s # this hould work but not
                                t_start = 0 *pq.s
                                t_stop = (tsq['timestamp'][-1] - global_t_start) * pq.s

                            else:
                                t_start = 0 *pq.s
                                t_stop = 0 *pq.s
                            st = SpikeTrain(times           = times,
                                            name            = 'Chan{0} Code{1}'.format(channel,sortcode),
                                            t_start         = t_start,
                                            t_stop          = t_stop,
                                            waveforms       = waveforms,
                                            left_sweep      = waveformsize/2./sr * pq.s,
                                            sampling_rate   = sr * pq.Hz,
                                            )
                            st.annotate(channel_index=channel)
                            if lazy:
                                st.lazy_shape = nb_spike
                            seg.spiketrains.append(st)

                    elif type_label == 'EVTYPE_STREAM':
                        dt = np.dtype(data_formats[ tsq[mask3]['dataformat'][0]])
                        shape = np.sum(tsq[mask3]['size']-10)
                        sr = tsq[mask3]['frequency'][0]
                        if lazy:
                            signal = [ ]
                        else:
                            if PY3K:
                                signame = code.decode('ascii')
                            else:
                                signame = code
                            sev_filename = os.path.join(subdir, tankname+'_'+blockname+'_'+signame+'_ch'+str(channel)+'.sev')
                            try:
                                #sig_array = np.memmap(sev_filename, mode = 'r', dtype = 'uint8') # if memory problem use this instead
                                sig_array = np.fromfile(sev_filename, dtype='uint8')
                            except IOError:
                                sig_array = tev_array
                            signal = get_chunks(tsq[mask3]['size'],tsq[mask3]['eventoffset'],  sig_array).view(dt)

                        anasig = AnalogSignal(signal        = signal* pq.V,
                                              name          = '{0} {1}'.format(code, channel),
                                              sampling_rate = sr * pq.Hz,
                                              t_start       = (tsq[mask3]['timestamp'][0] - global_t_start) * pq.s,
                                              channel_index = int(channel)
                                              )
                        if lazy:
                            anasig.lazy_shape = shape
                        seg.analogsignals.append(anasig)
        return seg
开发者ID:SummitKwan,项目名称:python-neo,代码行数:101,代码来源:tdtio.py


示例13: read_segment


#.........这里部分代码省略.........
                    unit = dataBlockHeader['Unit']
                    pos = pos_spikes[chan,unit]
                    stimearrays[chan, unit][pos] = time
                    if load_spike_waveform and n1*n2 != 0 :
                        swfarrays[chan,unit][pos,:,:] = np.fromstring( fid.read(n1*n2*2) , dtype = 'i2').reshape(n1,n2).astype('f4')
                    else:
                        fid.seek(n1*n2*2,1)
                    pos_spikes[chan,unit] +=1
                
                elif dataBlockHeader['Type'] == 4:
                    # event
                    pos = eventpositions[chan]
                    evarrays[chan]['times'][pos] = time
                    evarrays[chan]['labels'][pos] = dataBlockHeader['Unit']
                    eventpositions[chan]+= 1

                elif dataBlockHeader['Type'] == 5:
                    #signal
                    data = np.fromstring( fid.read(n2*2) , dtype = 'i2').astype('f4')
                    sigarrays[chan][sample_positions[chan] : sample_positions[chan]+data.size] = data
                    sample_positions[chan] += data.size


        ## Step 4: create neo object
        for chan, h in iteritems(eventHeaders):
            if lazy:
                times = []
                labels = None
            else:
                times = evarrays[chan]['times']
                labels = evarrays[chan]['labels']
            ea = EventArray(
                times*pq.s,
                labels=labels,
                channel_name=eventHeaders[chan]['Name'],
                channel_index=chan
            )
            if lazy:
                ea.lazy_shape = nb_events[chan]
            seg.eventarrays.append(ea)

            
        for chan, h in iteritems(slowChannelHeaders):
            if lazy:
                signal = [ ]
            else:
                if globalHeader['Version'] ==100 or globalHeader['Version'] ==101 :
                    gain = 5000./(2048*slowChannelHeaders[chan]['Gain']*1000.)
                elif globalHeader['Version'] ==102 :
                    gain = 5000./(2048*slowChannelHeaders[chan]['Gain']*slowChannelHeaders[chan]['PreampGain'])
                elif globalHeader['Version'] >= 103:
                    gain = globalHeader['SlowMaxMagnitudeMV']/(.5*(2**globalHeader['BitsPerSpikeSample'])*\
                                                        slowChannelHeaders[chan]['Gain']*slowChannelHeaders[chan]['PreampGain'])
                signal = sigarrays[chan]*gain
            anasig =  AnalogSignal(signal*pq.V,
                sampling_rate = float(slowChannelHeaders[chan]['ADFreq'])*pq.Hz,
                t_start = t_starts[chan]*pq.s,
                channel_index = slowChannelHeaders[chan]['Channel'],
                channel_name = slowChannelHeaders[chan]['Name'],
            )
            if lazy:
                anasig.lazy_shape = nb_samples[chan]
            seg.analogsignals.append(anasig)
            
        for (chan, unit), value in np.ndenumerate(nb_spikes):
            if nb_spikes[chan, unit] == 0: continue
            if lazy:
                times = [ ]
                waveforms = None
                t_stop = 0
            else:
                times = stimearrays[chan,unit]
                t_stop = times.max()
                if load_spike_waveform:
                    if globalHeader['Version'] <103:
                        gain = 3000./(2048*dspChannelHeaders[chan]['Gain']*1000.)
                    elif globalHeader['Version'] >=103 and globalHeader['Version'] <105:
                        gain = globalHeader['SpikeMaxMagnitudeMV']/(.5*2.**(globalHeader['BitsPerSpikeSample'])*1000.)
                    elif globalHeader['Version'] >105:
                        gain = globalHeader['SpikeMaxMagnitudeMV']/(.5*2.**(globalHeader['BitsPerSpikeSample'])*globalHeader['SpikePreAmpGain'])                    
                    waveforms = swfarrays[chan, unit] * gain * pq.V
                else:
                    waveforms = None
            sptr = SpikeTrain(
                times,
                units='s', 
                t_stop=t_stop*pq.s,
                waveforms=waveforms
            )
            sptr.annotate(unit_name = dspChannelHeaders[chan]['Name'])
            sptr.annotate(channel_index = chan)
            for key, val in dspChannelHeaders[chan].iteritems():
                sptr.annotate(**{key: val})

            if lazy:
                sptr.lazy_shape = nb_spikes[chan,unit]
            seg.spiketrains.append(sptr)

        seg.create_many_to_one_relationship()
        return seg
开发者ID:bal47,项目名称:python-neo,代码行数:101,代码来源:plexonio.py


示例14: test__issue_285

    def test__issue_285(self):
        # Pickle round trips for four kinds of object: a SpikeTrain (with an
        # Epoch beside it), an Epoch, an Event and an
        # IrregularlySampledSignal.  Each block is written to "blk.pkl",
        # read back, and the links on the re-read object are type-checked.
        # The file is removed after each successful round.
        def reload_first_segment(block):
            # write with one PickleIO instance, read with a fresh one
            writer = PickleIO(filename="blk.pkl")
            writer.write(block)
            reader = PickleIO(filename="blk.pkl")
            return reader.read_block().segments[0]

        def block_holding(obj):
            # obj gets its own Segment as ``segment`` and is stored in the
            # spiketrains list of a fresh Unit and of the block's segment
            obj.segment = Segment()
            Unit().spiketrains.append(obj)
            container = Block()
            holder = Segment()
            holder.spiketrains.append(obj)
            container.segments.append(holder)
            return container

        ##Spiketrain
        train = SpikeTrain([3, 4, 5] * pq.s, t_stop=10.0)
        unit = Unit()
        train.unit = unit
        unit.spiketrains.append(train)

        epoch = Epoch([0, 10, 20], [2, 2, 2], ["a", "b", "c"], units="ms")

        blk = Block()
        seg = Segment()
        seg.spiketrains.append(train)
        seg.epochs.append(epoch)
        epoch.segment = seg
        blk.segments.append(seg)

        r_seg = reload_first_segment(blk)
        self.assertIsInstance(r_seg.spiketrains[0].unit, Unit)
        self.assertIsInstance(r_seg.epochs[0], Epoch)
        os.remove('blk.pkl')

        ##Epoch
        stored = Epoch(times=np.arange(0, 30, 10)*pq.s,
                       durations=[10, 5, 7]*pq.ms,
                       labels=np.array(['btn0', 'btn1', 'btn2'], dtype='S'))
        r_seg = reload_first_segment(block_holding(stored))
        self.assertIsInstance(r_seg.spiketrains[0].segment, Segment)
        os.remove('blk.pkl')

        ##Event
        stored = Event(np.arange(0, 30, 10)*pq.s,
                       labels=np.array(['trig0', 'trig1', 'trig2'],
                                       dtype='S'))
        r_seg = reload_first_segment(block_holding(stored))
        self.assertIsInstance(r_seg.spiketrains[0].segment, Segment)
        os.remove('blk.pkl')

        ##IrregularlySampledSignal
        stored = IrregularlySampledSignal([0.0, 1.23, 6.78], [1, 2, 3],
                                          units='mV', time_units='ms')
        stored.channel_index = ChannelIndex(1)
        blk = block_holding(stored)
        blk.segments[0].block = blk

        r_seg = reload_first_segment(blk)
        self.assertIsInstance(r_seg.spiketrains[0].segment, Segment)
        self.assertIsInstance(r_seg.spiketrains[0].channel_index,
                              ChannelIndex)
        os.remove('blk.pkl')
开发者ID:mpsonntag,项目名称:python-neo,代码行数:84,代码来源:test_pickleio.py


示例15: read_block

    def read_block(self, lazy=False):
        """Return a Block with the spikes of every KlustaKwik group.

        Raw spike times give no obvious hint about segment boundaries, so
        all spikes currently end up in a single segment.  To change that,
        the boundaries would have to be supplied and this code adapted to
        sort the spikes into the matching segments.

        Lazy loading is not supported.  An empty Block is returned when no
        'fet' file is found.  Raises ValueError when a group's fet and clu
        files differ in length.
        """
        assert not lazy, 'Do not support lazy'

        blk = Block()

        # look up the KlustaKwik files; without any 'fet' file there is
        # nothing to load
        self._fetfiles = self._fp.read_filenames('fet')
        self._clufiles = self._fp.read_filenames('clu')
        if not len(self._fetfiles):
            return blk

        # a single segment receives all of the data
        segment = Segment(name='seg0', index=0, file_origin=self.filename)
        blk.segments.append(segment)

        self.spiketrains = dict()
        for group in sorted(self._fetfiles.keys()):
            spike_samples, features = self._load_spike_times(
                self._fetfiles[group])

            # cluster ids come from the clu file when there is one;
            # unclustered data is treated as one cluster with id zero
            if group in self._clufiles:
                cluster_ids = self._load_unit_id(self._clufiles[group])
            else:
                cluster_ids = np.zeros(spike_samples.shape, dtype=np.int32)

            if len(spike_samples) != len(cluster_ids):
                raise ValueError("lengths of fet and clu files are different")

            # one Unit and one SpikeTrain per cluster
            for unit_id in sorted(np.unique(cluster_ids)):
                label = 'unit %d from group %d' % (unit_id, group)
                unit = Unit(name=label, index=unit_id, group=group)

                # sample numbers are divided by the sampling rate; every
                # train of the group ends at the group's latest spike
                selected = spike_samples[cluster_ids == unit_id]
                train = SpikeTrain(
                    times=selected / self.sampling_rate,
                    units='sec', t_start=0.0,
                    t_stop=spike_samples.max() / self.sampling_rate,
                    name=label)
                train.annotations['cluster'] = unit_id
                train.annotations['group'] = group

                # the group's features are attached when there are any
                if len(features) != 0:
                    train.annotations['waveform_features'] = features

                unit.spiketrains.append(train)
                segment.spiketrains.append(train)

        blk.create_many_to_one_relationship()
        return blk
开发者ID:INM-6,项目名称:python-neo,代码行数:70,代码来源:klustakwikio.py


示例16: read_one_channel_event_or_spike

    def read_one_channel_event_or_spike(self, fid, channel_num, header,
                                        lazy=True):
        """Read one event-like channel and wrap it as an Event or SpikeTrain.

        The channel kind stored in the channel header selects the on-disk
        record layout and the returned object:

        * kinds 2, 3, 4 (event data), 5 (marker) and 8 (text mark) are
          returned as an ``Event``;
        * kinds 6 (AdcMark) and 7 (RealMark) are handled as a
          ``SpikeTrain`` with waveforms.

        Parameters
        ----------
        fid :
            Open file object; it is repositioned with ``seek`` and read
            with ``read``.
        channel_num :
            Index into ``header.channelHeaders``; also stored in the
            ``channel_index`` annotation of the returned object.
        header :
            Parsed file header. This method reads ``channelHeaders``,
            ``us_per_time``, ``dtime_base`` and ``system_id`` from it.
        lazy : bool
            If true, no record is loaded: an empty object is returned whose
            ``lazy_shape`` attribute holds the number of records found.

        Returns
        -------
        None
            If the channel has no data block (``firstblock < 0``) or its
            kind is not one of 2..8.
        Event or SpikeTrain
            See above.

        NOTE(review): this excerpt is truncated after the start of the
        non-lazy spike-train branch, so the rest of that branch is not
        documented here.
        """
        # return SpikeTrain or Event
        channelHeader = header.channelHeaders[channel_num]
        # a negative first block offset means there is nothing to read
        if channelHeader.firstblock < 0:
            return
        # only the event-like channel kinds are handled by this method
        if channelHeader.kind not in [2, 3, 4, 5, 6, 7, 8]:
            return

        # # Step 1 : build the record dtype matching the channel kind
        # Every record starts with a 4-byte 'tick'; marker kinds add a
        # 4-byte 'marker', and kinds 6/7/8 add an opaque payload of
        # channelHeader.n_extra bytes that is decoded later.
        if channelHeader.kind in [2, 3, 4]:
            # Event data
            fmt = [('tick', 'i4')]
        elif channelHeader.kind in [5]:
            # Marker data
            fmt = [('tick', 'i4'), ('marker', 'i4')]
        elif channelHeader.kind in [6]:
            # AdcMark data
            fmt = [('tick', 'i4'), ('marker', 'i4'),
                   ('adc', 'S%d' % channelHeader.n_extra)]
        elif channelHeader.kind in [7]:
            #  RealMark data
            fmt = [('tick', 'i4'), ('marker', 'i4'),
                   ('real', 'S%d' % channelHeader.n_extra)]
        elif channelHeader.kind in [8]:
            # TextMark data
            fmt = [('tick', 'i4'), ('marker', 'i4'),
                   ('label', 'S%d' % channelHeader.n_extra)]
        dt = np.dtype(fmt)

        ## Step 2 : first pass over the block chain, only to count the
        ## records so that memory can be allocated in one go
        fid.seek(channelHeader.firstblock)
        totalitems = 0
        for _ in range(channelHeader.blocks):
            blockHeader = HeaderReader(fid, np.dtype(blockHeaderDesciption))
            totalitems += blockHeader.items
            # follow the link to the next block when there is one
            if blockHeader.succ_block > 0:
                fid.seek(blockHeader.succ_block)
        #~ print 'totalitems' , totalitems

        if lazy:
            # Lazy mode: return an empty container that only advertises
            # how many records a full load would produce.
            if channelHeader.kind in [2, 3, 4, 5, 8]:
                ea = Event()
                ea.annotate(channel_index=channel_num)
                ea.lazy_shape = totalitems
                return ea

            elif channelHeader.kind in [6, 7]:
                # correct value for t_stop to be put in later
                sptr = SpikeTrain([] * pq.s, t_stop=1e99)
                sptr.annotate(channel_index=channel_num, ced_unit = 0)
                sptr.lazy_shape = totalitems
                return sptr
        else:
            alltrigs = np.zeros(totalitems, dtype=dt)
            ## Step 3 : second pass, read the records of every block
            fid.seek(channelHeader.firstblock)
            pos = 0
            for _ in range(channelHeader.blocks):
                blockHeader = HeaderReader(
                    fid, np.dtype(blockHeaderDesciption))
                # read all events in block
                # NOTE(review): binary-mode np.fromstring is deprecated in
                # NumPy in favour of np.frombuffer - confirm the supported
                # NumPy versions before changing it.
                trigs = np.fromstring(
                    fid.read(blockHeader.items * dt.itemsize), dtype=dt)

                # append this block's records to the preallocated array
                alltrigs[pos:pos + trigs.size] = trigs
                pos += trigs.size
                if blockHeader.succ_block > 0:
                    fid.seek(blockHeader.succ_block)

            ## Step 4 : convert to neo standard classes: events or spiketrains
            # ticks are scaled by us_per_time and dtime_base and tagged as
            # seconds
            alltimes = alltrigs['tick'].astype(
                'f') * header.us_per_time * header.dtime_base * pq.s

            if channelHeader.kind in [2, 3, 4, 5, 8]:
                #events
                ea = Event(alltimes)
                ea.annotate(channel_index=channel_num)
                if channelHeader.kind >= 5:
                    # a Spike2 marker is closest in meaning to a neo label
                    ea.labels = alltrigs['marker'].astype('S32')
                if channelHeader.kind == 8:
                    # text marks also carry a text payload, kept as an
                    # annotation
                    ea.annotate(extra_labels=alltrigs['label'])
                return ea

            elif channelHeader.kind in [6, 7]:
                # spiketrains

                # waveforms: reinterpret the raw per-record payload bytes
                # NOTE(review): ndarray.tostring is a deprecated alias of
                # tobytes - confirm before modernising.
                if channelHeader.kind == 6:
                    # AdcMark payload is read as int16 samples, then scaled
                    # and offset with the channel's calibration values
                    waveforms = np.fromstring(alltrigs['adc'].tostring(),
                                              dtype='i2')
                    waveforms = waveforms.astype(
                        'f4') * channelHeader.scale / 6553.6 + \
                        channelHeader.offset
                elif channelHeader.kind == 7:
                    # RealMark payload is read directly as float32 samples
                    waveforms = np.fromstring(alltrigs['real'].tostring(),
                                              dtype='f4')

                if header.system_id >= 6 and channelHeader.interleave > 1:
#.........这里部分代码省略.........
开发者ID:MartinHeroux,项目名称:ScientificallySound_files,代码行数:101,代码来源:spike2io.py


示例17: read_nev


#.........这里部分代码省略.........
        dt_trig =  [('samplepos', 'uint32'),
                    ('id', 'uint16'), 
                    ('reason', 'uint8'), 
                    ('reserved0', 'uint8'), 
                    ('digital_port', 'uint16'), 
                    ('reserved1', 'S{0}'.format(h['packet_size']-10)),
                ]
        data_trigger = data.view(dt_trig)[mask]
        # Digital Triggers (PacketID 0)
        is_digital = (data_trigger ['reason']&1)>0
        create_event_array_trig_or_analog(is_digital, 'Digital trigger', labelmode =  'digital_port' )
        
        # Analog Triggers (PacketID 0)
        if version in ['2.1', '2.2' ]:
            for i in range(5):
                is_analog = (data_trigger ['reason']&(2**(i+1)))>0
                create_event_array_trig_or_analog(is_analog, 'Analog trigger {0}'.format(i), labelmode = None)
        
        # Comments
        mask = (data['id']==0xFFF) 
        dt_comments = [('samplepos', 'uint32'),
                    ('id', 'uint16'), 
                    ('charset', 'uint8'), 
                    ('reserved0', 'uint8'), 
                    ('color', 'uint32'), 
                    ('comment', 'S{0}'.format(h['packet_size']-12)),
                ]
        data_comments = data.view(dt_comments)[mask]
        if data_comments.size>0:
            if lazy:
                times = [ ]
                labels = [ ]
            else:
                times = data_comments['samplepos'].astype(float)/sr
                labels = data_comments['comment'].astype('S')
            ev = EventArray(times= times*pq.s,
                            labels= labels,
                            name='Comments')
            if lazy:
                ev.lazy_shape = np.sum(is_digital)
            seg.eventarrays.append(ev)
        
        
        # READ Spike channel
        channel_ids = all_ids[(all_ids>0) & (all_ids<=2048)]

        # get the dtype of waveform (this is stupidly complicated)
        if nev_header['additionnal_flag']&0x1:
            #dtype_waveforms = { k:'int16' for k in channel_ids }
            dtype_waveforms = dict( (k,'int16') for k in channel_ids)
        else:
            # otherwise each electrode has its own bytes-per-waveform code, which selects the appropriate dtype
            neuewav_header = ext_header['NEUEVWAV']
            dtype_waveform = dict(zip(neuewav_header['channel_id'], neuewav_header['num_bytes_per_waveform']))
            dtypes_conv = { 0: 'int8', 1 : 'int8', 2: 'int16', 4 : 'int32' }
            #dtype_waveforms = { k:dtypes_conv[v] for k,v in dtype_waveform.items() }
            dtype_waveforms = dict( (k,dtypes_conv[v]) for k,v in dtype_waveform.items() )
        
        dt2 =   [('samplepos', 'uint32'),
                    ('id', 'uint16'), 
                    ('cluster', 'uint8'), 
                    ('reserved0', 'uint8'), 
                    ('waveform','uint8',(h['packet_size']-8, )),
                ]
        data_spike = data.view(dt2)
        
        for channel_id in channel_ids:
            data_spike_chan = data_spike[data['id']==channel_id]
            cluster_ids = np.unique(data_spike_chan['cluster'])
            for cluster_id in cluster_ids:
                if cluster_id==0: 
                    name =  'unclassified'
                elif cluster_id==255:
                    name =  'noise'
                else:
                    name = 'Cluster {0}'.format(cluster_id)
                name = 'Channel {0} '.format(channel_id)+name
                
                data_spike_chan_clus = data_spike_chan[data_spike_chan['cluster']==cluster_id]
                n_spike = data_spike_chan_clus.size
                waveforms, w_sampling_rate, left_sweep = None, None, None
                if lazy:
                    times = [ ]
                else:
                    times = data_spike_chan_clus['samplepos'].astype(float)/sr
                    if load_waveforms:
                        dtype_waveform = dtype_waveforms[channel_id]
                        waveform_size = (h['packet_size']-8)/np.dtype(dtype_waveform).itemsize
                        waveforms = data_spike_chan_clus['waveform'].flatten().view(dtype_waveform)
                        waveforms = waveforms.reshape(n_spike,1, waveform_size)
                        waveforms =waveforms*pq.uV
                        w_sampling_rate = wsr*pq.Hz
                        left_sweep = waveform_size//2/sr*pq.s
                st = SpikeTrain(times =  times*pq.s, name = name,
                                t_start = t_start, t_stop =t_stop,
                                waveforms = waveforms, sampling_rate = w_sampling_rate, left_sweep = left_sweep)
                st.annotate(channel_index = int(channel_id))
                if lazy:
                    st.lazy_shape = n_spike
                seg.spiketrains.append(st)
开发者ID:guangxingli,项目名称:python-neo,代码行数:101,代码来源:blackrockio.py


示例18: read_block

该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python analogsignal.AnalogSignal类代码示例发布时间:2022-05-27
下一篇:
Python core.Segment类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap