本文整理汇总了Python中tables.open_file函数的典型用法代码示例。如果您正苦于以下问题:Python open_file函数的具体用法?Python open_file怎么用?Python open_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了open_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_maximum_overview_size
def test_maximum_overview_size(self):
filename = make_temp_dir('maxisze.hdf5')
env = Environment(trajectory='Testmigrate', filename=filename,
log_config=get_log_config(), add_time=True)
traj = env.v_trajectory
for irun in range(pypetconstants.HDF5_MAX_OVERVIEW_TABLE_LENGTH):
traj.f_add_parameter('f%d.x' % irun, 5)
traj.f_store()
store = pt.open_file(filename, mode='r+')
table = store.root._f_get_child(traj.v_name).overview.parameters_overview
self.assertEquals(table.nrows, pypetconstants.HDF5_MAX_OVERVIEW_TABLE_LENGTH)
store.close()
for irun in range(pypetconstants.HDF5_MAX_OVERVIEW_TABLE_LENGTH,
2*pypetconstants.HDF5_MAX_OVERVIEW_TABLE_LENGTH):
traj.f_add_parameter('f%d.x' % irun, 5)
traj.f_store()
store = pt.open_file(filename, mode='r+')
table = store.root._f_get_child(traj.v_name).overview.parameters_overview
self.assertEquals(table.nrows, pypetconstants.HDF5_MAX_OVERVIEW_TABLE_LENGTH)
store.close()
env.f_disable_logging()
开发者ID:SmokinCaterpillar,项目名称:pypet,代码行数:32,代码来源:storage_test.py
示例2: test02_copy
def test02_copy(self):
"""Checking (X)Array.copy() method ('numetic' flavor)"""
srcfile = self._testFilename("oldflavor_numeric.h5")
tmpfile = tempfile.mktemp(".h5")
shutil.copy(srcfile, tmpfile)
try:
# Open the HDF5 with old numeric flavor
with tables.open_file(tmpfile, "r+") as h5file:
# Copy to another location
self.assertWarns(FlavorWarning,
h5file.root.array1.copy, '/', 'array1copy')
h5file.root.array2.copy('/', 'array2copy')
h5file.root.carray1.copy('/', 'carray1copy')
h5file.root.carray2.copy('/', 'carray2copy')
h5file.root.vlarray1.copy('/', 'vlarray1copy')
h5file.root.vlarray2.copy('/', 'vlarray2copy')
if self.close:
h5file.close()
h5file = tables.open_file(tmpfile)
else:
h5file.flush()
# Assert other properties in array
self.assertEqual(h5file.root.array1copy.flavor, 'numeric')
self.assertEqual(h5file.root.array2copy.flavor, 'python')
self.assertEqual(h5file.root.carray1copy.flavor, 'numeric')
self.assertEqual(h5file.root.carray2copy.flavor, 'python')
self.assertEqual(h5file.root.vlarray1copy.flavor, 'numeric')
self.assertEqual(h5file.root.vlarray2copy.flavor, 'python')
finally:
os.remove(tmpfile)
开发者ID:ESSS,项目名称:PyTables,代码行数:33,代码来源:test_backcompat.py
示例3: validate_results
def validate_results(test, expected_path, actual_path):
"""Validate results by comparing in and output HDF5 files
:param test: instance of the TestCase.
:param expected_path: path to the reference data.
:param actual_path: path to the output from the test.
"""
with tables.open_file(expected_path, 'r') as expected_file, \
tables.open_file(actual_path, 'r') as actual_file:
for expected_node in expected_file.walk_nodes('/', 'Leaf'):
try:
actual_node = actual_file.get_node(expected_node._v_pathname)
except tables.NoSuchNodeError:
test.fail("Node '%s' does not exist in datafile" %
expected_node._v_pathname)
if type(expected_node) is tables.table.Table:
validate_tables(test, expected_node, actual_node)
elif type(expected_node) is tables.vlarray.VLArray:
validate_vlarrays(test, expected_node, actual_node)
elif type(expected_node) is tables.array.Array:
validate_arrays(test, expected_node, actual_node)
else:
raise NotImplementedError
validate_attributes(test, expected_node, actual_node)
validate_attributes(test, expected_file.root, actual_file.root)
开发者ID:OpenCosmics,项目名称:sapphire,代码行数:26,代码来源:validate_results.py
示例4: test02_CompareTable
def test02_CompareTable(self):
"Comparing written time data with read data in a Table."
wtime = 1234567890.123456
# Create test Table with data.
h5file = tables.open_file(
self.h5fname, 'w', title="Test for comparing Time tables")
tbl = h5file.create_table('/', 'test', self.MyTimeRow)
row = tbl.row
row['t32col'] = int(wtime)
row['t64col'] = (wtime, wtime)
row.append()
h5file.close()
# Check the written data.
h5file = tables.open_file(self.h5fname)
recarr = h5file.root.test.read(0)
h5file.close()
self.assertEqual(recarr['t32col'][0], int(wtime),
"Stored and retrieved values do not match.")
comp = (recarr['t64col'][0] == numpy.array((wtime, wtime)))
self.assertTrue(numpy.alltrue(comp),
"Stored and retrieved values do not match.")
开发者ID:ymarfoq,项目名称:outilACVDesagregation,代码行数:26,代码来源:test_timetype.py
示例5: test_2D_multiphase
def test_2D_multiphase(self):
# RELOAD MODULES
self.reload_modules()
pnList = [(twp_navier_stokes_p, twp_navier_stokes_n),
(clsvof_p, clsvof_n)]
self.so = multiphase_so
pList=[]
nList=[]
sList=[]
for (pModule,nModule) in pnList:
pList.append(pModule)
if pList[-1].name == None:
pList[-1].name = pModule.__name__
nList.append(nModule)
for i in range(len(pnList)):
sList.append(default_s)
self.so.name += "_2D_falling_bubble"
# NUMERICAL SOLUTION #
ns = proteus.NumericalSolution.NS_base(self.so,
pList,
nList,
sList,
opts)
ns.calculateSolution('2D_falling_bubble')
# COMPARE VS SAVED FILES #
expected_path = 'comparison_files/multiphase_2D_falling_bubble.h5'
expected = tables.open_file(os.path.join(self._scriptdir,expected_path))
actual = tables.open_file('multiphase_2D_falling_bubble.h5','r')
assert np.allclose(expected.root.phi_t2,actual.root.phi_t2,atol=1e-10)
expected.close()
actual.close()
开发者ID:arnsong,项目名称:proteus,代码行数:31,代码来源:test_clsvof_with_rans2p.py
示例6: main
def main(argv):
args = parse_args(argv[1:])
fileout = os.path.abspath(args.output)
start = time()
for fin in args.inputs:
filein = os.path.abspath(fin)
print 'Concatenating %s' % filein
if not os.path.exists(fileout):
copyfile(filein, fileout)
else:
# Can't use HdfStorage.readCoordinates because it needs an
# Ice.Communicator object, so there's no point using the
# OMERO.tables interface
tout = tables.open_file(fileout, 'r+')
tin = tables.open_file(filein, 'r')
nrows = tin.root.OME.Measurements.nrows
for a in range(0, nrows, ROW_CHUNK):
b = min(nrows, a + ROW_CHUNK)
print '\tRows %d:%d' % (a, b)
rows = tin.root.OME.Measurements.read_coordinates(range(a, b))
tout.root.OME.Measurements.append(rows)
tin.close()
tout.close()
print '\tCumulative time: %d seconds' % (time() - start)
print 'Done'
开发者ID:IDR,项目名称:pydoop-features,代码行数:30,代码来源:concatfeatures.py
示例7: test_case_2
def test_case_2(self):
# Set parameters for this test
parameters.ct.test_case=2
# RELOAD MODULES
self.reload_modules()
pnList = [(clsvof_p, clsvof_n)]
self.so = default_so
self.so.tnList = clsvof_n.tnList
pList=[]
nList=[]
sList=[]
for (pModule,nModule) in pnList:
pList.append(pModule)
if pList[-1].name == None:
pList[-1].name = pModule.__name__
nList.append(nModule)
for i in range(len(pnList)):
sList.append(default_s)
self.so.name = "clsvof_test_case_2"
# NUMERICAL SOLUTION #
ns = proteus.NumericalSolution.NS_base(self.so,
pList,
nList,
sList,
opts)
ns.calculateSolution('test_case_2')
# COMPARE VS SAVED FILES #
expected_path = 'comparison_files/clsvof_test_case_2.h5'
expected = tables.open_file(os.path.join(self._scriptdir,expected_path))
actual = tables.open_file('clsvof_test_case_2.h5','r')
assert np.allclose(expected.root.u_t2,actual.root.u_t2,atol=1e-10)
expected.close()
actual.close()
开发者ID:arnsong,项目名称:proteus,代码行数:33,代码来源:test_clsvof.py
示例8: __init__
def __init__( self,
fnContigLengths,
fnWssd,
overwrite,
openMode,
groupsToCheck=[],
compression=False ):
self.compress = compression
assert os.path.exists(fnContigLengths)
if openMode=='r':
assert not overwrite
assert os.path.exists(fnWssd), fnWssd
debug_output('WssdBase: reading contig lengths from file %s'%fnContigLengths)
self.mContigNameLen = {}
for l in open(fnContigLengths,'r'):
l=l.replace('\n','').split('\t')
self.mContigNameLen[l[0]]=int(l[1])
debug_output('WSSD space: %d contigs totaling %d bp'%( len(self.mContigNameLen), sum(self.mContigNameLen.values()) ))
if overwrite or not os.path.exists(fnWssd):
self.tbl = tables.open_file( fnWssd, 'w' )
else:
if openMode=='r':
self.tbl = tables.open_file( fnWssd, 'r' )
else:
self.tbl = tables.open_file( fnWssd, 'a' )
开发者ID:bnelsj,项目名称:wssd_sunk,代码行数:31,代码来源:wssd_common.py
示例9: process
def process(self, rows_slice):
with Worker.hdf5_lock:
with tables.open_file(self.hdf5_file, 'r+') as fileh:
T = fileh.get_node(self.path + '/temporaries')
tmp = T[rows_slice, ...]
ind = np.arange(0, rows_slice.stop - rows_slice.start)
# tmp = - A_new
tmp -= self.rows_sum
diag_A = tmp[ind, rows_slice.start + ind].copy()
np.clip(tmp, 0, np.inf, tmp)
tmp[ind, rows_slice.start + ind] = diag_A
Worker.hdf5_lock.acquire()
with tables.open_file(self.hdf5_file, 'r+') as fileh:
A = fileh.get_node(self.path + '/availabilities')
a = A[rows_slice, ...]
Worker.hdf5_lock.release()
# yet more damping
a = a * self.damping - tmp * (1 - self.damping)
with Worker.hdf5_lock:
with tables.open_file(self.hdf5_file, 'r+') as fileh:
A = fileh.get_node(self.path + '/availabilities')
T = fileh.get_node(self.path + '/temporaries')
A[rows_slice, ...] = a
T[rows_slice, ...] = tmp
del a, tmp
开发者ID:GGiecold,项目名称:Concurrent_AP,代码行数:35,代码来源:Concurrent_AP.py
示例10: create_synth
def create_synth(kind, prec):
prefix_orig = "cellzome/cellzome-"
iname = dirname + prefix_orig + "none-" + prec + ".h5"
f = tb.open_file(iname, "r")
if prec == "single":
type_ = tb.Float32Atom()
else:
type_ = tb.Float64Atom()
prefix = "synth/synth-"
for clevel in range(10):
oname = "%s/%s-%s%d-%s.h5" % (dirname, prefix, kind, clevel, prec)
# print "creating...", iname
f2 = tb.open_file(oname, "w")
if kind in ["none", "numpy"]:
filters = None
else:
filters = tb.Filters(complib=kind, complevel=clevel, shuffle=shuffle)
for name in ["maxarea", "mascotscore"]:
col = f.get_node("/", name)
r = f2.create_carray("/", name, type_, col.shape, filters=filters)
if name == "maxarea":
r[:] = np.arange(col.nrows, dtype=type_.dtype)
else:
r[:] = np.arange(col.nrows, 0, dtype=type_.dtype)
f2.close()
if clevel == 0:
size = 1.5 * float(os.stat(oname)[6])
f.close()
return size
开发者ID:eumiro,项目名称:PyTables,代码行数:35,代码来源:blosc.py
示例11: open
def open(self, mode, ncols=1, nrows=1, xll=0, yll=0, cellsize=1, nodatavalue=-9999.0,
dataset_name="dummy", group_prefix="row", table_prefix="col", index_format="04i", variables=[], units=[]):
# Initialise
fpath = os.path.join(self.folder, self.name);
if (mode[0] == 'w'):
# Open the file
self.__datafile = tables.open_file(fpath, 'w');
# Assign the data attributes
self.ncols = ncols;
self.nrows = nrows;
self.xll = xll;
self.yll = yll;
self.cellsize = cellsize;
self.nodatavalue = nodatavalue;
self.dataset_name = dataset_name;
self.group_prefix = group_prefix;
self.table_prefix = table_prefix;
self.index_format = index_format;
self.variables = variables;
self.units = units;
self.writeheader();
else:
# If file does not exist, then ...
if os.path.exists(fpath):
# Retrieve the data attributes from the header file
self.readheader();
GridEnvelope2D.__init__(self, self.ncols, self.nrows, self.xll, self.yll, self.cellsize, self.cellsize);
self.__datafile = tables.open_file(fpath, 'r');
return True;
else: return False;
开发者ID:ajwdewit,项目名称:ggcmi,代码行数:31,代码来源:hdf5raster.py
示例12: h5_apply_func
def h5_apply_func(input_path, output_path, node_func):
"""
Apply node_func to all nodes of input_path and store the result in
output_path
Parameters
----------
input_path : str
path to .h5 input file
output_path : str
path to .h5 output file
node_func : function
function that will be applied to all nodes
func(node, new_parent) -> new_node
new_node must be node if node must be copied
None if node must not be copied
another Node if node must not be copied (was already
handled/copied/modified by func)
"""
with tables.open_file(input_path) as input_file, \
tables.open_file(output_path, mode="w") as output_file:
for node in input_file.walk_nodes(classname='Leaf'):
if node is not input_file.root:
print(node._v_pathname, "...", end=' ')
parent_path = node._v_parent._v_pathname
if parent_path in output_file:
new_parent = output_file.get_node(parent_path)
else:
new_parent = output_file._create_path(parent_path)
new_node = node_func(node, new_parent)
if new_node is node:
print("copying (without modifications) ...", end=' ')
node._f_copy(new_parent)
print("done.")
开发者ID:liam2,项目名称:liam2,代码行数:34,代码来源:idchanger.py
示例13: __init__
def __init__(self, cfg):
self.cfg = cfg
self.path = os.path.join(self.cfg.subsets_path, 'data.db')
self.results = None
if os.path.exists(self.path):
try:
self.h5 = tables.open_file(self.path, 'a')
self.results = self.h5.root.results
except:
# If anything fails, we just create a new database...
log.warning("""Failed to open existing database at %s, or
database is corrupted. Creating a new one""", self.path)
self.results = None
# Something went wrong!
if not self.results:
try:
# Try closing this, just in case
self.h5.close()
except:
pass
# Compression is good -- and faster, according to the pytables docs...
f = tables.Filters(complib='blosc', complevel=5)
self.h5 = tables.open_file(self.path, 'w', filters=f)
self.results = self.h5.create_table(
'/', 'results', cfg.data_layout.data_type)
self.results.cols.subset_id.create_csindex()
assert isinstance(self.results, tables.Table)
assert self.results.indexed
开发者ID:brettc,项目名称:partitionfinder,代码行数:31,代码来源:database.py
示例14: ptconcat
def ptconcat(output_file, input_files, overwrite=False):
"""Concatenate HDF5 Files"""
filt = tb.Filters(
complevel=5, shuffle=True, fletcher32=True, complib='zlib'
)
out_tabs = {}
dt_file = input_files[0]
log.info("Reading data struct '%s'..." % dt_file)
h5struc = tb.open_file(dt_file, 'r')
log.info("Opening output file '%s'..." % output_file)
if overwrite:
outmode = 'w'
else:
outmode = 'a'
h5out = tb.open_file(output_file, outmode)
for node in h5struc.walk_nodes('/', classname='Table'):
path = node._v_pathname
log.debug(path)
dtype = node.dtype
p, n = os.path.split(path)
out_tabs[path] = h5out.create_table(
p, n, description=dtype, filters=filt, createparents=True
)
h5struc.close()
for fname in input_files:
log.info('Reading %s...' % fname)
h5 = tb.open_file(fname)
for path, out in out_tabs.items():
tab = h5.get_node(path)
out.append(tab[:])
h5.close()
h5out.close()
开发者ID:tamasgal,项目名称:km3pipe,代码行数:33,代码来源:ptconcat.py
示例15: test_estimator_pytables
def test_estimator_pytables():
m1 = MyEstimator(a=1, b='a', c=None, d=False, e=np.zeros(3)).fit(None)
f = tables.open_file(fn, 'w')
m1.to_pytables(f.root)
f.close()
g = tables.open_file(fn)
m2 = MyEstimator.from_pytables(g.root.MyEstimator)
print m1.__dict__
print m2.__dict__
for key, value in m1.get_params().iteritems():
if any(isinstance(value, t) for t in [int, float, str]):
assert value == getattr(m2, key, object())
else:
eq(value, getattr(m2, key, object()), err_msg='error on param key=%s' % key)
for key in m1._get_estimate_names():
value = getattr(m1, key)
if any(isinstance(value, t) for t in [int, float, str]):
assert value == getattr(m2, key, object())
else:
eq(value, getattr(m2, key, object()), err_msg='error on estimate key=%s' % key)
g.close()
开发者ID:rmcgibbo,项目名称:msmbuilder3,代码行数:27,代码来源:test_to_from_pytables.py
示例16: __init__
def __init__(self, parent, filename):
if not isinstance(filename, string_types):
raise ValueError(
'Pytables requires filename parameter as string. Got {} instead.'
.format(filename.__class__))
self.parent = parent
self.version = HDFPartition.VERSION
self.n_rows = 0
self.n_cols = 0
self.cache = []
if os.path.exists(filename):
self._h5_file = open_file(filename, mode='a')
self.meta = HDFReader._read_meta(self._h5_file)
self.version, self.n_rows, self.n_cols = _get_file_header(
self._h5_file.root.partition.file_header)
else:
# No, doesn't exist
self._h5_file = open_file(filename, mode='w')
self.meta = deepcopy(MPRowsFile.META_TEMPLATE)
self.header_mangler = lambda name: re.sub('_+', '_', re.sub('[^\w_]', '_', name).lower()).rstrip('_')
if self.n_rows == 0:
self.meta['about']['create_time'] = time.time()
开发者ID:CivicKnowledge,项目名称:ambry_sources,代码行数:29,代码来源:core.py
示例17: test_EV2
def test_EV2(self):
thelper_vof.ct.STABILIZATION_TYPE = 1 # EV
thelper_vof.ct.ENTROPY_TYPE = 2 #logarithmic
thelper_vof.ct.cE = 0.1
thelper_vof.ct.FCT = True
reload(thelper_vof_p)
reload(thelper_vof_n)
self.so.name = self.pList[0].name+"_EV2"
# NUMERICAL SOLUTION #
ns = proteus.NumericalSolution.NS_base(self.so,
self.pList,
self.nList,
self.sList,
opts)
self.sim_names.append(ns.modelList[0].name)
ns.calculateSolution('vof')
# COMPARE VS SAVED FILES #
expected_path = 'comparison_files/vof_level_3_EV2.h5'
expected = tables.open_file(os.path.join(self._scriptdir,expected_path))
actual = tables.open_file('vof_level_3_EV2.h5','r')
assert np.allclose(expected.root.u_t2,
actual.root.u_t2,
atol=1e-10)
expected.close()
actual.close()
开发者ID:arnsong,项目名称:proteus,代码行数:25,代码来源:test_vof.py
示例18: __init__
def __init__(self, output_dir, chrom_list):
# combined allele-specific read counts
as_count_filename = "%s/combined_as_count.h5" % output_dir
self.as_count_h5 = tables.open_file(as_count_filename, "w")
# combined mapped read counts
read_count_filename = "%s/combined_read_count.h5" % output_dir
self.read_count_h5 = tables.open_file(read_count_filename, "w")
# counts of genotypes
ref_count_filename = "%s/combined_ref_count.h5" % output_dir
self.ref_count_h5 = tables.open_file(ref_count_filename, "w")
alt_count_filename = "%s/combined_alt_count.h5" % output_dir
self.alt_count_h5 = tables.open_file(alt_count_filename, "w")
het_count_filename = "%s/combined_het_count.h5" % output_dir
self.het_count_h5 = tables.open_file(het_count_filename, "w")
self.filenames = [as_count_filename, read_count_filename,
ref_count_filename, alt_count_filename,
het_count_filename]
self.h5_files = [self.as_count_h5, self.read_count_h5,
self.ref_count_h5, self.alt_count_h5,
self.het_count_h5]
# initialize all of these files
atom = tables.UInt16Atom(dflt=0)
for h5f in self.h5_files:
for chrom in chrom_list:
self.create_carray(h5f, chrom, atom)
开发者ID:bmvdgeijn,项目名称:WASP,代码行数:33,代码来源:get_target_regions.py
示例19: test01b_Compare64VLArray
def test01b_Compare64VLArray(self):
"Comparing several written and read 64-bit time values in a VLArray."
# Create test VLArray with data.
h5file = tables.open_file(
self.h5fname, 'w', title="Test for comparing Time64 VL arrays")
vla = h5file.create_vlarray('/', 'test', self.myTime64Atom)
# Size of the test.
nrows = vla.nrowsinbuf + 34 # Add some more rows than buffer.
# Only for home checks; the value above should check better
# the I/O with multiple buffers.
# nrows = 10
for i in xrange(nrows):
j = i * 2
vla.append((j + 0.012, j + 1 + 0.012))
h5file.close()
# Check the written data.
h5file = tables.open_file(self.h5fname)
arr = h5file.root.test.read()
h5file.close()
arr = numpy.array(arr)
orig_val = numpy.arange(0, nrows * 2, dtype=numpy.int32) + 0.012
orig_val.shape = (nrows, 1, 2)
if common.verbose:
print "Original values:", orig_val
print "Retrieved values:", arr
self.assertTrue(allequal(arr, orig_val),
"Stored and retrieved values do not match.")
开发者ID:bbudescu,项目名称:PyTables,代码行数:32,代码来源:test_timetype.py
示例20: __init__
def __init__(self, h5_filename_queue):
"""
param h5_filename_queue: a queue of temporary hdf5 files
"""
self.h5_filename_queue = h5_filename_queue
tables.open_file(table_path, 'w').close() #creates a new file
super(WriteHDF5Thread, self).__init__()
开发者ID:drmaize,项目名称:compvision,代码行数:7,代码来源:bqfeature.py
注:本文中的tables.open_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论