This article collects typical usage examples of the Python function mne.make_fixed_length_events. If you are wondering what exactly make_fixed_length_events does, how to call it, or where to find working samples, the curated code below should help.
Twenty code examples of make_fixed_length_events are presented, sorted by popularity by default. You can upvote examples you like or find useful; your feedback helps the system recommend better Python code samples.
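Before the extracted examples, here is a minimal, self-contained sketch of the typical call. The synthetic RawArray, its channel name, and its sampling rate are illustrative assumptions, not taken from any of the projects below:

import numpy as np
import mne

# Build 10 s of fake single-channel EEG so the sketch runs stand-alone;
# any Raw object returned by mne.io.read_raw_* works the same way.
info = mne.create_info(ch_names=['EEG 001'], sfreq=100., ch_types='eeg')
raw = mne.io.RawArray(np.random.RandomState(0).randn(1, 1000), info)

# Slice the recording into fixed-length 2 s segments; each row of the
# returned events array is [onset_sample, 0, event_id].
events = mne.make_fixed_length_events(raw, id=1, duration=2.)
epochs = mne.Epochs(raw, events, event_id=1, tmin=0., tmax=2.,
                    baseline=None, preload=True)

The examples that follow apply the same pattern to real FIF and CTF recordings, and also exercise edge cases such as overlapping windows, awkward sampling rates, and downstream covariance estimation.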
Example 1: test_make_fixed_length_events
def test_make_fixed_length_events():
"""Test making events of a fixed length
"""
raw = io.Raw(raw_fname)
events = make_fixed_length_events(raw, id=1)
    assert_equal(events.shape[1], 3)  # assert_true(x, 3) would pass trivially
tmin, tmax = raw.times[[0, -1]]
duration = tmax - tmin
events = make_fixed_length_events(raw, 1, tmin, tmax, duration)
assert_equal(events.shape[0], 1)
Developer: The3DWizard, Project: mne-python, Lines: 10, Source: test_event.py
Example 2: test_make_fixed_length_events
def test_make_fixed_length_events():
"""Test making events of a fixed length."""
raw = read_raw_fif(raw_fname)
events = make_fixed_length_events(raw, id=1)
assert events.shape[1] == 3
events_zero = make_fixed_length_events(raw, 1, first_samp=False)
assert_equal(events_zero[0, 0], 0)
assert_array_equal(events_zero[:, 0], events[:, 0] - raw.first_samp)
# With limits
tmin, tmax = raw.times[[0, -1]]
duration = tmax - tmin
events = make_fixed_length_events(raw, 1, tmin, tmax, duration)
assert_equal(events.shape[0], 1)
# With bad limits (no resulting events)
pytest.raises(ValueError, make_fixed_length_events, raw, 1,
tmin, tmax - 1e-3, duration)
# not raw, bad id or duration
pytest.raises(TypeError, make_fixed_length_events, raw, 2.3)
pytest.raises(TypeError, make_fixed_length_events, 'not raw', 2)
pytest.raises(TypeError, make_fixed_length_events, raw, 23, tmin, tmax,
'abc')
# Let's try some ugly sample rate/sample count combos
data = np.random.RandomState(0).randn(1, 27768)
# This breaks unless np.round() is used in make_fixed_length_events
info = create_info(1, 155.4499969482422)
raw = RawArray(data, info)
events = make_fixed_length_events(raw, 1, duration=raw.times[-1])
assert events[0, 0] == 0
assert len(events) == 1
# Without use_rounding=True this breaks
raw = RawArray(data[:, :21216], info)
events = make_fixed_length_events(raw, 1, duration=raw.times[-1])
assert events[0, 0] == 0
assert len(events) == 1
# Make sure it gets used properly by compute_raw_covariance
cov = compute_raw_covariance(raw, tstep=None)
expected = np.cov(data[:, :21216])
np.testing.assert_allclose(cov['data'], expected, atol=1e-12)
# overlaps
events = make_fixed_length_events(raw, 1, duration=1)
assert len(events) == 136
events_ol = make_fixed_length_events(raw, 1, duration=1, overlap=0.5)
assert len(events_ol) == 271
events_ol_2 = make_fixed_length_events(raw, 1, duration=1, overlap=0.9)
assert len(events_ol_2) == 1355
assert_array_equal(events_ol_2[:, 0], np.unique(events_ol_2[:, 0]))
with pytest.raises(ValueError, match='overlap must be'):
make_fixed_length_events(raw, 1, duration=1, overlap=1.1)
Developer: kambysese, Project: mne-python, Lines: 53, Source: test_event.py
Example 3: test_make_fixed_length_events
def test_make_fixed_length_events():
"""Test making events of a fixed length"""
raw = io.read_raw_fif(raw_fname)
events = make_fixed_length_events(raw, id=1)
    assert_equal(events.shape[1], 3)  # assert_true(x, 3) would pass trivially
events_zero = make_fixed_length_events(raw, 1, first_samp=False)
assert_equal(events_zero[0, 0], 0)
assert_array_equal(events_zero[:, 0], events[:, 0] - raw.first_samp)
# With limits
tmin, tmax = raw.times[[0, -1]]
duration = tmax - tmin
events = make_fixed_length_events(raw, 1, tmin, tmax, duration)
assert_equal(events.shape[0], 1)
# With bad limits (no resulting events)
assert_raises(ValueError, make_fixed_length_events, raw, 1,
tmin, tmax - 1e-3, duration)
Developer: YashAgarwal, Project: mne-python, Lines: 16, Source: test_event.py
Example 4: test_cov_ctf
def test_cov_ctf():
"""Test basic cov computation on ctf data with/without compensation."""
raw = read_raw_ctf(ctf_fname).crop(0., 2.).load_data()
events = make_fixed_length_events(raw, 99999)
assert len(events) == 2
ch_names = [raw.info['ch_names'][pick]
for pick in pick_types(raw.info, meg=True, eeg=False,
ref_meg=False)]
for comp in [0, 1]:
raw.apply_gradient_compensation(comp)
epochs = Epochs(raw, events, None, -0.2, 0.2, preload=True)
with pytest.warns(RuntimeWarning, match='Too few samples'):
noise_cov = compute_covariance(epochs, tmax=0.,
method=['empirical'])
prepare_noise_cov(noise_cov, raw.info, ch_names)
raw.apply_gradient_compensation(0)
epochs = Epochs(raw, events, None, -0.2, 0.2, preload=True)
with pytest.warns(RuntimeWarning, match='Too few samples'):
noise_cov = compute_covariance(epochs, tmax=0., method=['empirical'])
raw.apply_gradient_compensation(1)
# TODO This next call in principle should fail.
prepare_noise_cov(noise_cov, raw.info, ch_names)
    # make sure comps matrices were not removed from raw
assert raw.info['comps'], 'Comps matrices removed'
Developer: jhouck, Project: mne-python, Lines: 28, Source: test_cov.py
Example 5: raw_epochs_events
def raw_epochs_events():
"""Create raw, epochs, and events for tests."""
raw = read_raw_fif(raw_fname).set_eeg_reference(projection=True).crop(0, 3)
raw = maxwell_filter(raw, regularize=None) # heavily reduce the rank
assert raw.info['bads'] == [] # no bads
events = make_fixed_length_events(raw)
epochs = Epochs(raw, events, tmin=-0.2, tmax=0, preload=True)
return (raw, epochs, events)
Developer: Eric89GXL, Project: mne-python, Lines: 8, Source: test_cov.py
Example 6: test_stockwell_ctf
def test_stockwell_ctf():
"""Test that Stockwell can be calculated on CTF data."""
raw = read_raw_fif(raw_ctf_fname)
raw.apply_gradient_compensation(3)
events = make_fixed_length_events(raw, duration=0.5)
evoked = Epochs(raw, events, tmin=-0.2, tmax=0.3, decim=10,
preload=True, verbose='error').average()
tfr_stockwell(evoked, verbose='error') # smoke test
Developer: Eric89GXL, Project: mne-python, Lines: 8, Source: test_stockwell.py
Example 7: test_tfr_ctf
def test_tfr_ctf():
"""Test that TFRs can be calculated on CTF data."""
raw = read_raw_fif(raw_ctf_fname).crop(0, 1)
raw.apply_gradient_compensation(3)
events = mne.make_fixed_length_events(raw, duration=0.5)
epochs = mne.Epochs(raw, events)
for method in (tfr_multitaper, tfr_morlet):
method(epochs, [10], 1) # smoke test
Developer: kambysese, Project: mne-python, Lines: 8, Source: test_tfr.py
Example 8: test_ctf_plotting
def test_ctf_plotting():
"""Test CTF topomap plotting."""
raw = read_raw_fif(ctf_fname, preload=True)
events = make_fixed_length_events(raw, duration=0.01)
assert len(events) > 10
evoked = Epochs(raw, events, tmin=0, tmax=0.01, baseline=None).average()
assert get_current_comp(evoked.info) == 3
# smoke test that compensation does not matter
evoked.plot_topomap(time_unit='s')
Developer: SherazKhan, Project: mne-python, Lines: 9, Source: test_topomap.py
Example 9: test_field_map_ctf
def test_field_map_ctf():
"""Test that field mapping can be done with CTF data."""
raw = read_raw_fif(raw_ctf_fname).crop(0, 1)
raw.apply_gradient_compensation(3)
events = make_fixed_length_events(raw, duration=0.5)
evoked = Epochs(raw, events).average()
evoked.pick_channels(evoked.ch_names[:50]) # crappy mapping but faster
# smoke test
make_field_map(evoked, trans=trans_fname, subject='sample',
subjects_dir=subjects_dir)
Developer: Eric89GXL, Project: mne-python, Lines: 10, Source: test_field_interpolation.py
Example 10: test_dipole_fitting_ctf
def test_dipole_fitting_ctf():
"""Test dipole fitting with CTF data."""
raw_ctf = read_raw_ctf(fname_ctf).set_eeg_reference()
events = make_fixed_length_events(raw_ctf, 1)
evoked = Epochs(raw_ctf, events, 1, 0, 0, baseline=None).average()
cov = make_ad_hoc_cov(evoked.info)
sphere = make_sphere_model((0., 0., 0.))
# XXX Eventually we should do some better checks about accuracy, but
# for now our CTF phantom fitting tutorials will have to do
# (otherwise we need to add that to the testing dataset, which is
# a bit too big)
fit_dipole(evoked, cov, sphere)
Developer: mvdoc, Project: mne-python, Lines: 12, Source: test_dipole.py
Example 11: test_ctf_plotting
def test_ctf_plotting():
"""Test CTF topomap plotting."""
raw = read_raw_fif(ctf_fname, preload=True)
assert raw.compensation_grade == 3
events = make_fixed_length_events(raw, duration=0.01)
assert len(events) > 10
evoked = Epochs(raw, events, tmin=0, tmax=0.01, baseline=None).average()
assert get_current_comp(evoked.info) == 3
# smoke test that compensation does not matter
evoked.plot_topomap(time_unit='s')
# better test that topomaps can still be used without plotting ref
evoked.pick_types(meg=True, ref_meg=False)
evoked.plot_topomap()
Developer: Eric89GXL, Project: mne-python, Lines: 13, Source: test_topomap.py
Example 12: test_plot_evoked_cov
def test_plot_evoked_cov():
"""Test plot_evoked with noise_cov."""
evoked = _get_epochs().average()
cov = read_cov(cov_fname)
cov['projs'] = [] # avoid warnings
evoked.plot(noise_cov=cov, time_unit='s')
with pytest.raises(TypeError, match='Covariance'):
evoked.plot(noise_cov=1., time_unit='s')
with pytest.raises(IOError, match='No such file'):
evoked.plot(noise_cov='nonexistent-cov.fif', time_unit='s')
raw = read_raw_fif(raw_sss_fname)
events = make_fixed_length_events(raw)
epochs = Epochs(raw, events, picks=picks)
cov = compute_covariance(epochs)
evoked_sss = epochs.average()
with pytest.warns(RuntimeWarning, match='relative scaling'):
evoked_sss.plot(noise_cov=cov, time_unit='s')
plt.close('all')
Developer: Eric89GXL, Project: mne-python, Lines: 18, Source: test_evoked.py
Example 13: test_lcmv_ctf_comp
def test_lcmv_ctf_comp():
"""Test interpolation with compensated CTF data."""
ctf_dir = op.join(testing.data_path(download=False), 'CTF')
raw_fname = op.join(ctf_dir, 'somMDYO-18av.ds')
raw = mne.io.read_raw_ctf(raw_fname, preload=True)
events = mne.make_fixed_length_events(raw, duration=0.2)[:2]
epochs = mne.Epochs(raw, events, tmin=0., tmax=0.2)
evoked = epochs.average()
with pytest.warns(RuntimeWarning,
match='Too few samples .* estimate may be unreliable'):
data_cov = mne.compute_covariance(epochs)
fwd = mne.make_forward_solution(evoked.info, None,
mne.setup_volume_source_space(pos=15.0),
mne.make_sphere_model())
filters = mne.beamformer.make_lcmv(evoked.info, fwd, data_cov)
assert 'weights' in filters
Developer: SherazKhan, Project: mne-python, Lines: 18, Source: test_lcmv.py
Example 14: test_plot_evoked_cov
def test_plot_evoked_cov():
"""Test plot_evoked with noise_cov."""
import matplotlib.pyplot as plt
evoked = _get_epochs().average()
cov = read_cov(cov_fname)
cov['projs'] = [] # avoid warnings
evoked.plot(noise_cov=cov, time_unit='s')
with pytest.raises(TypeError, match='Covariance'):
evoked.plot(noise_cov=1., time_unit='s')
with pytest.raises(IOError, match='No such file'):
evoked.plot(noise_cov='nonexistent-cov.fif', time_unit='s')
raw = read_raw_fif(raw_sss_fname)
events = make_fixed_length_events(raw)
epochs = Epochs(raw, events)
cov = compute_covariance(epochs)
evoked_sss = epochs.average()
with warnings.catch_warnings(record=True) as w:
evoked_sss.plot(noise_cov=cov, time_unit='s')
plt.close('all')
assert any('relative scal' in str(ww.message) for ww in w)
Developer: jdammers, Project: mne-python, Lines: 20, Source: test_evoked.py
Example 15: get_epochs
def get_epochs(self,resample=None):
from mne.time_frequency import psd_multitaper
raw = self.raw
validation_windowsize = self.validation_windowsize
front = self.front
back = self.back
# l_freq = self.l_freq
# h_freq = self.h_freq
events = mne.make_fixed_length_events(raw,id=1,start=front,
stop=raw.times[-1]-back,
duration=validation_windowsize)
epochs = mne.Epochs(raw,events,event_id=1,tmin=0,tmax=validation_windowsize,
preload=True)
if resample is not None:
epochs.resample(resample)
# psds,freq = psd_multitaper(epochs,fmin=l_freq,
# fmax=h_freq,
# tmin=0,tmax=validation_windowsize,
# low_bias=True,)
# psds = 10 * np.log10(psds)
self.epochs = epochs
Developer: adowaconan, Project: modification-pipelines, Lines: 21, Source: Filter_based_and_thresholding.py
Example 16: intra
def intra(subj):
'''
Performs initial computations within subject and returns average PSD and variance of all epochs.
'''
    print(('Now beginning intra processing on ' + subj + '...\n') * 5)
# Set function parameters
fname_label = subjects_dir + '/' + subj + '/' + 'label/%s.label' % label_name
fname_raw = data_path + subj + '/' + subj + '_rest_raw_sss.fif'
if os.path.isfile(data_path + subj + '/' + subj + '_rest_raw_sss-ico-4-fwd.fif'):
fname_fwd = data_path + subj + '/' + subj + '_rest_raw_sss-ico-4-fwd.fif'
else:
print('Subject ' + subj + ' does not have a ico-4-fwd.fif on file.')
if label_name.startswith('lh.'):
hemi = 'left'
elif label_name.startswith('rh.'):
hemi = 'right'
# Load data
label = mne.read_label(fname_label)
raw = fiff.Raw(fname_raw)
forward_meg = mne.read_forward_solution(fname_fwd)
    # Estimate noise covariance from the raw data
cov = mne.compute_raw_data_covariance(raw, reject=dict(eog=150e-6))
write_cov(data_path + subj + '/' + subj + '-cov.fif', cov)
# Make inverse operator
info = raw.info
inverse_operator = make_inverse_operator(info, forward_meg, cov, loose=None, depth=0.8)
# Epoch data into 4s intervals
events = mne.make_fixed_length_events(raw, 1, start=0, stop=None,
duration=4.)
# Set up pick list: (MEG minus bad channels)
include = []
exclude = raw.info['bads']
picks = fiff.pick_types(raw.info, meg=True, eeg=False, stim=False, eog=True, include=include, exclude=exclude)
# Read epochs and remove bad epochs
epochs = mne.Epochs(raw, events, event_id, tmin, tmax, proj=True, picks=picks, baseline=(None, 0), preload=True, reject=dict(grad=4000e-13, mag=4e-12, eog=150e-6))
# Pull data for averaging later
epc_array = epochs.get_data()
# Compute the inverse solution
inv = apply_inverse_epochs(epochs, inverse_operator, lambda2, method, label=label)
#Need to add a line here to automatically create stc directory within subj
epoch_num = 1
epoch_num_str = str(epoch_num)
for i in inv:
# i.save(data_path + subj + '/tmp/' + label_name[3:] + '_rest_raw_sss-oct-6-inv' + epoch_num_str)
i.save(data_path + subj + '/tmp/' + label_name[3:] + '_rest_raw_sss-ico-4-inv' + epoch_num_str)
epoch_num = epoch_num + 1
epoch_num_str = str(epoch_num)
# The following is used to remove the empty opposing hemisphere files
# and then move the files to save into the appropriate directory
if hemi == 'left':
filelist = [ f for f in os.listdir(data_path + subj + '/tmp') if f.endswith("-rh.stc") ]
for f in filelist:
os.remove(data_path + subj + '/tmp/' + f)
keepers = [ f for f in os.listdir(data_path + subj + '/tmp') if f.endswith("-lh.stc") ]
for f in keepers:
src = f
os.rename(data_path + subj + '/tmp/' + src, data_path + subj + '/inv/' + src)
elif hemi == 'right':
filelist = [ f for f in os.listdir(data_path + subj + '/tmp') if f.endswith("-lh.stc") ]
for f in filelist:
os.remove(data_path + subj + '/tmp/' + f)
keepers = [ f for f in os.listdir(data_path + subj + '/tmp') if f.endswith("-rh.stc") ]
for f in keepers:
src = f
os.rename(data_path + subj + '/tmp/' + src, data_path + subj + '/inv/' + src)
# define frequencies of interest
bandwidth = 4. # bandwidth of the windows in Hz
# compute source space psd in label
# Note: By using "return_generator=True" stcs will be a generator object
    # instead of a list. This allows us to iterate without having to
# keep everything in memory.
psd = compute_source_psd_epochs(epochs, inverse_operator, lambda2=lambda2,
method=method, fmin=fmin, fmax=fmax,
bandwidth=bandwidth, label=label, return_generator=False)
epoch_num = 1
epoch_num_str = str(epoch_num)
for i in psd:
i.save(data_path + subj + '/' + 'tmp' + '/' + label_name[3:] + '_dspm_snr-1_PSD'+ epoch_num_str)
epoch_num = epoch_num + 1
#......... part of the code omitted here .........
Developer: vaer-k, Project: linear-analysis-psd, Lines: 101, Source: linear-analysis-psd.py
Example 17: thresholding_filterbased_spindle_searching
#......... part of the code omitted here .........
peak_time['mean']=[];peak_at=[];duration=[]
RMS_mean=hmean(RMS)
mph['mean'] = trim_mean(RMS_mean[int(front*sfreq):-int(back*sfreq)],0.05) + lower_threshold * trimmed_std(RMS_mean,0.05)
mpl['mean'] = trim_mean(RMS_mean[int(front*sfreq):-int(back*sfreq)],0.05) + higher_threshold * trimmed_std(RMS_mean,0.05)
pass_ =RMS_mean > mph['mean']
up = np.where(np.diff(pass_.astype(int))>0)
down= np.where(np.diff(pass_.astype(int))<0)
up = up[0]
down = down[0]
###############################
#print(down[0],up[0])
if down[0] < up[0]:
down = down[1:]
#print(down[0],up[0])
#############################
if (up.shape > down.shape) or (up.shape < down.shape):
size = np.min([up.shape,down.shape])
up = up[:size]
down = down[:size]
C = np.vstack((up,down))
for pairs in C.T:
if l_bound < (time[pairs[1]] - time[pairs[0]]) < h_bound:
SegmentForPeakSearching = RMS_mean[pairs[0]:pairs[1],]
if np.max(SegmentForPeakSearching)< mpl['mean']:
temp_time = time[pairs[0]:pairs[1]]
ints_temp = np.argmax(SegmentForPeakSearching)
peak_time['mean'].append(temp_time[ints_temp])
peak_at.append(SegmentForPeakSearching[ints_temp])
duration_temp = time[pairs[1]] - time[pairs[0]]
duration.append(duration_temp)
time_find=[];mean_peak_power=[];Duration=[];
for item,PEAK,duration_time in zip(peak_time['mean'],peak_at,duration):
temp_timePoint=[]
for ii, names in enumerate(channelList):
try:
temp_timePoint.append(min(enumerate(peak_time[names]), key=lambda x: abs(x[1]-item))[1])
except:
temp_timePoint.append(item + 2)
try:
if np.sum((abs(np.array(temp_timePoint) - item)<tol).astype(int))>=syn_channels:
time_find.append(float(item))
mean_peak_power.append(PEAK)
Duration.append(duration_time)
except:
pass
if sleep_stage:
temp_time_find=[];temp_mean_peak_power=[];temp_duration=[];
        # separate out stage 2
stages = annotations[annotations.Annotation.apply(stage_check)]
On = stages[::2];Off = stages[1::2]
stage_on_off = list(zip(On.Onset.values, Off.Onset.values))
if abs(np.diff(stage_on_off[0]) - 30) < 2:
pass
else:
On = stages[1::2];Off = stages[::2]
stage_on_off = list(zip(On.Onset.values[1:], Off.Onset.values[2:]))
for single_time_find, single_mean_peak_power, single_duration in zip(time_find,mean_peak_power,Duration):
for on_time,off_time in stage_on_off:
if intervalCheck([on_time,off_time],single_time_find,tol=tol):
temp_time_find.append(single_time_find)
temp_mean_peak_power.append(single_mean_peak_power)
temp_duration.append(single_duration)
time_find=temp_time_find;mean_peak_power=temp_mean_peak_power;Duration=temp_duration
result = pd.DataFrame({'Onset':time_find,'Duration':Duration,'Annotation':['spindle']*len(Duration)})
auto_label,_ = discritized_onset_label_auto(raw,result,validation_windowsize)
decision_features=None
if proba:
events = mne.make_fixed_length_events(raw,id=1,start=0,duration=validation_windowsize)
epochs = mne.Epochs(raw,events,event_id=1,tmin=0,tmax=validation_windowsize,preload=True)
data = epochs.get_data()[:,:,:-1]
full_prop=[]
for d in data:
temp_p=[]
#fig,ax = plt.subplots(nrows=2,ncols=3,figsize=(8,8))
for ii,(name) in enumerate(zip(channelList)):#,ax.flatten())):
rms = window_rms(d[ii,:],500)
l = trim_mean(rms,0.05) + lower_threshold * trimmed_std(rms,0.05)
h = trim_mean(rms,0.05) + higher_threshold * trimmed_std(rms,0.05)
prop = (sum(rms>l)+sum(rms<h))/(sum(rms<h) - sum(rms<l))
temp_p.append(prop)
full_prop.append(temp_p)
psds,freq = mne.time_frequency.psd_multitaper(epochs,fmin=11,fmax=16,tmin=0,tmax=3,low_bias=True,)
psds = 10* np.log10(psds)
features = pd.DataFrame(np.concatenate((np.array(full_prop),psds.max(2),freq[np.argmax(psds,2)]),1))
decision_features = StandardScaler().fit_transform(features.values,auto_label)
clf = LogisticRegressionCV(Cs=np.logspace(-4,6,11),cv=5,tol=1e-7,max_iter=int(1e7))
clf.fit(decision_features,auto_label)
auto_proba=clf.predict_proba(decision_features)[:,-1]
return time_find,mean_peak_power,Duration,mph,mpl,auto_proba,auto_label
Developer: adowaconan, Project: modification-pipelines, Lines: 101, Source: Add+probability+to+pipeline+-+single+subject+try.py
Example 18: test_low_rank
def test_low_rank():
"""Test low-rank covariance matrix estimation."""
raw = read_raw_fif(raw_fname).set_eeg_reference(projection=True).crop(0, 3)
raw = maxwell_filter(raw, regularize=None) # heavily reduce the rank
sss_proj_rank = 139 # 80 MEG + 60 EEG - 1 proj
n_ch = 366
proj_rank = 365 # one EEG proj
events = make_fixed_length_events(raw)
methods = ('empirical', 'diagonal_fixed', 'oas')
epochs = Epochs(raw, events, tmin=-0.2, tmax=0, preload=True)
bounds = {
'None': dict(empirical=(-6000, -5000),
diagonal_fixed=(-1500, -500),
oas=(-700, -600)),
'full': dict(empirical=(-9000, -8000),
diagonal_fixed=(-2000, -1600),
oas=(-1600, -1000)),
}
for rank in ('full', None):
covs = compute_covariance(
epochs, method=methods, return_estimators=True,
verbose='error', rank=rank)
for cov in covs:
method = cov['method']
these_bounds = bounds[str(rank)][method]
this_rank = _cov_rank(cov, epochs.info)
if rank is None or method == 'empirical':
assert this_rank == sss_proj_rank
else:
assert this_rank == proj_rank
assert these_bounds[0] < cov['loglik'] < these_bounds[1], \
(rank, method)
if method == 'empirical':
emp_cov = cov # save for later, rank param does not matter
# Test equivalence with mne.cov.regularize subspace
with pytest.raises(ValueError, match='are dependent.*must equal'):
regularize(emp_cov, epochs.info, rank=None, mag=0.1, grad=0.2)
assert _cov_rank(emp_cov, epochs.info) == sss_proj_rank
reg_cov = regularize(emp_cov, epochs.info, proj=True, rank='full')
assert _cov_rank(reg_cov, epochs.info) == proj_rank
del reg_cov
with catch_logging() as log:
reg_r_cov = regularize(emp_cov, epochs.info, proj=True, rank=None,
verbose=True)
log = log.getvalue()
assert 'jointly' in log
assert _cov_rank(reg_r_cov, epochs.info) == sss_proj_rank
reg_r_only_cov = regularize(emp_cov, epochs.info, proj=False, rank=None)
assert _cov_rank(reg_r_only_cov, epochs.info) == sss_proj_rank
assert_allclose(reg_r_only_cov['data'], reg_r_cov['data'])
del reg_r_only_cov, reg_r_cov
# test that rank=306 is same as rank='full'
epochs_meg = epochs.copy().pick_types()
assert len(epochs_meg.ch_names) == 306
epochs_meg.info.update(bads=[], projs=[])
cov_full = compute_covariance(epochs_meg, method='oas',
rank='full', verbose='error')
assert _cov_rank(cov_full, epochs_meg.info) == 306
cov_dict = compute_covariance(epochs_meg, method='oas',
rank=306, verbose='error')
assert _cov_rank(cov_dict, epochs_meg.info) == 306
assert_allclose(cov_full['data'], cov_dict['data'])
# Work with just EEG data to simplify projection / rank reduction
raw.pick_types(meg=False, eeg=True)
n_proj = 2
raw.add_proj(compute_proj_raw(raw, n_eeg=n_proj))
n_ch = len(raw.ch_names)
rank = n_ch - n_proj - 1 # plus avg proj
assert len(raw.info['projs']) == 3
epochs = Epochs(raw, events, tmin=-0.2, tmax=0, preload=True)
assert len(raw.ch_names) == n_ch
emp_cov = compute_covariance(epochs, rank='full', verbose='error')
assert _cov_rank(emp_cov, epochs.info) == rank
reg_cov = regularize(emp_cov, epochs.info, proj=True, rank='full')
assert _cov_rank(reg_cov, epochs.info) == rank
reg_r_cov = regularize(emp_cov, epochs.info, proj=False, rank=None)
assert _cov_rank(reg_r_cov, epochs.info) == rank
dia_cov = compute_covariance(epochs, rank=None, method='diagonal_fixed',
verbose='error')
assert _cov_rank(dia_cov, epochs.info) == rank
assert_allclose(dia_cov['data'], reg_cov['data'])
# test our deprecation: can simply remove later
epochs.pick_channels(epochs.ch_names[:103])
# degenerate
with pytest.raises(ValueError, match='can.*only be used with rank="full"'):
compute_covariance(epochs, rank=None, method='pca')
with pytest.raises(ValueError, match='can.*only be used with rank="full"'):
compute_covariance(epochs, rank=None, method='factor_analysis')
Developer: jhouck, Project: mne-python, Lines: 91, Source: test_cov.py
Example 19: get_Onest_Amplitude_Duration_of_spindles
#......... part of the code omitted here .........
pass_ =RMS_mean > mph['mean']
up = np.where(np.diff(pass_.astype(int))>0)
down= np.where(np.diff(pass_.astype(int))<0)
up = up[0]
down = down[0]
###############################
#print(down[0],up[0])
if down[0] < up[0]:
down = down[1:]
#print(down[0],up[0])
#############################
if (up.shape > down.shape) or (up.shape < down.shape):
size = np.min([up.shape,down.shape])
up = up[:size]
down = down[:size]
C = np.vstack((up,down))
for pairs in C.T:
if l_bound < (time[pairs[1]] - time[pairs[0]]) < h_bound:
SegmentForPeakSearching = RMS_mean[pairs[0]:pairs[1],]
if np.max(SegmentForPeakSearching)< mpl['mean']:
temp_time = time[pairs[0]:pairs[1]]
ints_temp = np.argmax(SegmentForPeakSearching)
peak_time['mean'].append(temp_time[ints_temp])
peak_at.append(SegmentForPeakSearching[ints_temp])
duration_temp = time[pairs[1]] - time[pairs[0]]
duration.append(duration_temp)
time_find=[];mean_peak_power=[];Duration=[];
for item,PEAK,duration_time in zip(peak_time['mean'],peak_at,duration):
temp_timePoint=[]
for ii, names in enumerate(channelList):
try:
temp_timePoint.append(min(enumerate(peak_time[names]), key=lambda x: abs(x[1]-item))[1])
except:
temp_timePoint.append(item + 2)
try:
if np.sum((abs(np.array(temp_timePoint) - item)<tol).astype(int))>=syn_channels:
time_find.append(float(item))
mean_peak_power.append(PEAK)
Duration.append(duration_time)
except:
pass
############ the end of the processing in which no other inputs ##
#### update the spindles we found if we want to add information of sleep stages ######
if sleep_stage:
temp_time_find=[];temp_mean_peak_power=[];temp_duration=[];
        # separate out stage 2
stages = annotations[annotations.Annotation.apply(stage_check)]
On = stages[::2];Off = stages[1::2]
stage_on_off = list(zip(On.Onset.values, Off.Onset.values))
if abs(np.diff(stage_on_off[0]) - 30) < 2:
pass
else:
On = stages[1::2];Off = stages[::2]
stage_on_off = list(zip(On.Onset.values[1:], Off.Onset.values[2:]))
for single_time_find, single_mean_peak_power, single_duration in zip(time_find,mean_peak_power,Duration):
for on_time,off_time in stage_on_off:
if intervalCheck([on_time,off_time],single_time_find,tol=tol):
temp_time_find.append(single_time_find)
temp_mean_peak_power.append(single_mean_peak_power)
temp_duration.append(single_duration)
time_find=temp_time_find;mean_peak_power=temp_mean_peak_power;Duration=temp_duration
####### decision function based on spindles we have just found ####
"""
A single floating representation is computed based on the validation window size (say 3 seconds), and information like peak power densities and peak frequencies are added to the feature space.
    We fit the standardized features to the labels (spindles found by the automated pipeline).
    A prediction probability is then computed with scikit-learn's LogisticRegression.
"""
decision_features=None;auto_proba=None;auto_label=None
if proba:
result = pd.DataFrame({'Onset':time_find,'Duration':Duration,'Annotation':['spindle']*len(Duration)})
auto_label,_ = discritized_onset_label_auto(raw,result,validation_windowsize)
events = mne.make_fixed_length_events(raw,id=1,start=front,stop=raw.times[-1]-back,duration=validation_windowsize)
epochs = mne.Epochs(raw,events,event_id=1,tmin=0,tmax=validation_windowsize,preload=True)
data = epochs.get_data()[:,:,:-1]
full_prop=[]
for d in data:
temp_p=[]
#fig,ax = plt.subplots(nrows=2,ncols=3,figsize=(8,8))
for ii,(name) in enumerate(zip(channelList)):#,ax.flatten())):
rms = window_rms(d[ii,:],500)
l = trim_mean(rms,0.05) + lower_threshold * trimmed_std(rms,0.05)
h = trim_mean(rms,0.05) + higher_threshold * trimmed_std(rms,0.05)
prop = (sum(rms>l)+sum(rms<h))/(sum(rms<h) - sum(rms<l))
if np.isinf(prop):
prop = (sum(rms>l)+sum(rms<h))
temp_p.append(prop)
full_prop.append(temp_p)
psds,freq = mne.time_frequency.psd_multitaper(epochs,fmin=l_freq,fmax=h_freq,tmin=0,tmax=3,low_bias=True,)
psds = 10* np.log10(psds)
features = pd.DataFrame(np.concatenate((np.array(full_prop),psds.max(2),freq[np.argmax(psds,2)]),1))
decision_features = StandardScaler().fit_transform(features.values,auto_label)
clf = LogisticRegressionCV(Cs=np.logspace(-4,6,11),cv=5,tol=1e-7,max_iter=int(1e7))
clf.fit(decision_features,auto_label)
auto_proba=clf.predict_proba(decision_features)[:,-1]
return time_find,mean_peak_power,Duration,mph,mpl,auto_proba,auto_label
Developer: adowaconan, Project: modification-pipelines, Lines: 101, Source: osf_test.py
Example 20: print(
auc_threshold='adapt')
r32 = eegPinelineDesign.detection_pipeline_crossvalidation(raw_32,raw_32.ch_names,
annotation,1000,
0.4,3.4,16,
11,16,f,cv=cv,
auc_threshold='adapt')
r61 = eegPinelineDesign.detection_pipeline_crossvalidation(raw_61,raw_61.ch_names,
annotation,1000,
0.4,3.4,int(61/2),
11,16,f,cv=cv,
auc_threshold='adapt')
print(np.mean(r6[3],0),np.mean(r32[3],0),np.mean(r61[3],0))
front=300;back=100
stop = raw_6.times[-1]-back
events = mne.make_fixed_length_events(raw_6,1,start=front,stop=stop,duration=3,)
epochs = mne.Epochs(raw_6,events,1,tmin=0,tmax=3,proj=False,preload=True)
epochs.resample(64)
gold_standard = eegPinelineDesign.read_annotation(raw_6,f)
manual_labels,_ = eegPinelineDesign.discritized_onset_label_manual(raw_6,gold_standard,3)
freqs = np.arange(11,17,1)
n_cycles = freqs / 2.
time_bandwidth = 2.0 # Least possible frequency-smoothing (1 taper)
power = tfr_multitaper(epochs,freqs,n_cycles=n_cycles,time_bandwidth=time_bandwidth,return_itc=False,average=False,)
clf = Pipeline([('vectorizer',Vectorizer()),
('scaler',StandardScaler()),
('est',exported_pipeline)])
data = power.data
labels = manual_labels
Developer: adowaconan, Project: modification-pipelines, Lines: 31, Source: from+6+channel+baseline+to+32+channels+to+tfc.py
Note: The mne.make_fixed_length_events examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by many developers; copyright remains with the original authors. Refer to each project's License before redistributing or reusing the code; do not reproduce without permission.