本文整理汇总了Python中tarfile.open函数的典型用法代码示例。如果您正苦于以下问题:Python tarfile.open函数的具体用法?Python tarfile.open怎么用?Python tarfile.open使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了tarfile.open函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: maybe_download_and_extract
def maybe_download_and_extract(data_url, dest_dir='/tmp/imagenet'):
    """
    Download a model tar file (unless already present) and extract it.

    The archive is stored in ``dest_dir`` under the last path component of
    ``data_url``. If a file with that name already exists the download is
    skipped; the archive is extracted on every call either way.

    :param data_url: url where tar.gz file exists
    :param dest_dir: destination directory to download and untar to
    :return: None
    """
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)
    filename = data_url.split('/')[-1]
    filepath = os.path.join(dest_dir, filename)
    if not os.path.exists(filepath):
        def _progress(count, block_size, total_size):
            # The total size may be reported as 0 or -1 when the server does
            # not announce it; skip the percentage instead of dividing by it.
            if total_size > 0:
                percent = float(count * block_size) / float(total_size) * 100.0
                sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename, percent))
            else:
                sys.stdout.write('\r>> Downloading %s' % filename)
            sys.stdout.flush()

        filepath, _ = urllib.request.urlretrieve(data_url,
                                                 filepath,
                                                 _progress)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    # NOTE(review): the archive comes from the network and is extracted
    # without member filtering -- confirm the source is trusted.
    # The context manager closes the archive even if extraction fails.
    with tarfile.open(filepath, 'r:gz') as archive:
        archive.extractall(dest_dir)
开发者ID:danellecline,项目名称:mbari-videotag,代码行数:27,代码来源:util.py
示例2: maybe_download_and_extract
def maybe_download_and_extract():
    """Download and extract model tar file.

    The archive named by the module-level ``DATA_URL`` is stored in
    ``FLAGS.model_dir`` under the last path component of the URL. If that
    file already exists the download is skipped; the archive is extracted
    into the same directory on every call either way.
    """
    dest_directory = FLAGS.model_dir
    if not os.path.exists(dest_directory):
        os.makedirs(dest_directory)
    filename = DATA_URL.split('/')[-1]
    filepath = os.path.join(dest_directory, filename)
    if not os.path.exists(filepath):
        def _progress(count, block_size, total_size):
            # The total size may be reported as 0 or -1 when the server does
            # not announce it; skip the percentage instead of dividing by it.
            if total_size > 0:
                percent = float(count * block_size) / float(total_size) * 100.0
                sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename, percent))
            else:
                sys.stdout.write('\r>> Downloading %s' % filename)
            sys.stdout.flush()

        filepath, _ = urllib.request.urlretrieve(DATA_URL,
                                                 filepath,
                                                 _progress)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    # The context manager closes the archive even if extraction fails.
    with tarfile.open(filepath, 'r:gz') as archive:
        archive.extractall(dest_directory)
开发者ID:MrRabbit0o0,项目名称:tensorflow,代码行数:26,代码来源:retrain.py
示例3: setUp
def setUp(self):
    """Build the tarball fixtures used by the import tests.

    Creates a temporary content directory holding:
      * ``good.tar.gz`` -- a course directory with ``course.xml`` and
        ``course/2013_Spring.xml``;
      * ``bad.tar.gz`` -- a directory that only contains ``bad.xml``;
      * an extra empty directory stored as ``self.unsafe_common_dir``.
    """
    super(ImportTestCase, self).setUp()
    self.url = reverse_course_url('import_handler', self.course.id)
    self.content_dir = path(tempfile.mkdtemp())

    def touch(name):
        """ Equivalent to shell's 'touch'"""
        # open() rather than file(): the file() builtin only exists on
        # Python 2, while open() behaves the same on both major versions.
        with open(name, 'a'):
            os.utime(name, None)

    # Create tar test files -----------------------------------------------
    # OK course:
    good_dir = tempfile.mkdtemp(dir=self.content_dir)
    os.makedirs(os.path.join(good_dir, "course"))
    with open(os.path.join(good_dir, "course.xml"), "w+") as f:
        f.write('<course url_name="2013_Spring" org="EDx" course="0.00x"/>')
    with open(os.path.join(good_dir, "course", "2013_Spring.xml"), "w+") as f:
        f.write('<course></course>')
    self.good_tar = os.path.join(self.content_dir, "good.tar.gz")
    with tarfile.open(self.good_tar, "w:gz") as gtar:
        gtar.add(good_dir)

    # Bad course (no 'course.xml' file):
    bad_dir = tempfile.mkdtemp(dir=self.content_dir)
    touch(os.path.join(bad_dir, "bad.xml"))
    self.bad_tar = os.path.join(self.content_dir, "bad.tar.gz")
    with tarfile.open(self.bad_tar, "w:gz") as btar:
        btar.add(bad_dir)

    self.unsafe_common_dir = path(tempfile.mkdtemp(dir=self.content_dir))
开发者ID:1amongus,项目名称:edx-platform,代码行数:32,代码来源:test_import_export.py
示例4: install_from_source
def _download_and_install_sdist(name, source, temp_dir):
    """Download the sdist at *source* into *temp_dir*, unpack it and run its setup.py install."""
    # Everything below uses paths relative to the working directory, so the
    # chdir is required for the downloads and the setup.py lookup to work.
    os.chdir(temp_dir)
    print('Downloading ' + name + ' from ' + source)
    sys.stdout.flush()
    archive_path, _ = urlretrieve(source, name + '.tar.gz')

    package = tarfile.open(archive_path)
    try:
        # Skip members whose names start outside the extraction directory
        # (parent-relative, backslash-rooted or absolute paths).
        safe_members = [m for m in package.getmembers()
                        if not m.name.startswith(('..', '\\', '/'))]
        package.extractall(temp_dir, members=safe_members)
    finally:
        package.close()

    extracted_dirs = [d for d in os.listdir(temp_dir)
                      if os.path.exists(os.path.join(d, 'setup.py'))]
    if not extracted_dirs:
        raise OSError("Failed to find " + name + "'s setup.py")
    extracted_dir = extracted_dirs[0]

    print('\nInstalling from ' + extracted_dir)
    sys.stdout.flush()
    os.chdir(extracted_dir)
    subprocess.check_call(
        EXECUTABLE + ['setup.py', 'install', '--single-version-externally-managed',
                      '--record', name + '.txt']
    )


def install_from_source(setuptools_source, pip_source):
    """Install setuptools and then pip from downloaded source archives.

    Each archive is downloaded into its own temporary directory, unpacked and
    installed by running its ``setup.py install`` through ``EXECUTABLE``. The
    original working directory is restored and both temporary directories are
    removed afterwards, whether or not the installation succeeded.

    Raises ``OSError`` when an unpacked archive contains no ``setup.py`` and
    ``subprocess.CalledProcessError`` when an install command fails.
    """
    setuptools_temp_dir = tempfile.mkdtemp('-setuptools', 'ptvs-')
    pip_temp_dir = tempfile.mkdtemp('-pip', 'ptvs-')
    cwd = os.getcwd()

    try:
        _download_and_install_sdist('setuptools', setuptools_source, setuptools_temp_dir)
        _download_and_install_sdist('pip', pip_source, pip_temp_dir)

        print('\nInstallation Complete')
        sys.stdout.flush()
    finally:
        os.chdir(cwd)
        shutil.rmtree(setuptools_temp_dir, ignore_errors=True)
        shutil.rmtree(pip_temp_dir, ignore_errors=True)
开发者ID:aurv,项目名称:PTVS,代码行数:60,代码来源:pip_downloader.py
示例5: unpack_maflib
def unpack_maflib(directory):
    """Unpack the maflib archive embedded in this file into *directory*.

    The archive bytes returned by ``_read_archive(__file__)`` are written to
    ``TAR_NAME + '.bz2'`` inside *directory* and extracted there. If
    ``tarfile`` cannot open the compressed file directly, the external
    ``bunzip2`` command is tried before giving up.

    Note that this changes the process working directory to *directory*.

    :return: absolute path of the working directory after extraction
    :raises Exception: when the archive cannot be opened either way
    """
    with _Cleaner(directory):
        content = _read_archive(__file__)

        os.makedirs(os.path.join(directory, 'maflib'))
        os.chdir(directory)

        bz2_name = TAR_NAME + '.bz2'
        with open(bz2_name, 'wb') as f:
            f.write(content)

        try:
            t = tarfile.open(bz2_name)
        except Exception:
            try:
                # Fall back to the command line tool, which replaces the
                # .bz2 file with the plain tar file.
                os.system('bunzip2 ' + bz2_name)
                t = tarfile.open(TAR_NAME)
            except Exception:
                raise Exception('Cannot extract maflib. Check that python bz2 module or bunzip2 command is available.')

        try:
            t.extractall()
        finally:
            t.close()

        # Only one of the two files normally exists (depending on which
        # branch above succeeded), so remove them independently: a missing
        # first file must not prevent the second one from being deleted.
        for leftover in (bz2_name, TAR_NAME):
            try:
                os.remove(leftover)
            except OSError:
                pass

        maflib_path = os.path.abspath(os.getcwd())
        return maflib_path
开发者ID:pfi,项目名称:maf,代码行数:33,代码来源:maf.py
示例6: __call__
def __call__(self, path, target):
    """Extract C{path} into C{target} using the C{tarfile} module.

    @note: No need to use C{tarfile.is_tarfile} because we want an
           exception on failure anyway."""
    import tarfile
    # The context manager closes the archive even when extraction raises.
    with tarfile.open(path, 'r') as archive:
        archive.extractall(target)
开发者ID:ssokolow,项目名称:unball,代码行数:7,代码来源:extractors.py
示例7: unpack
def unpack(filename, destination):
    """Extract an archive and strip the version suffix from its top directory.

    Supports ``tar.gz``, ``tar.bz2`` and ``zip`` archives, selected by the
    file name suffix. After extraction, the first entry name of the archive
    is matched against a "name-version" pattern; if a shorter name results,
    that entry is renamed to it (replacing an existing directory of the
    same name).

    :param filename: path of the archive to extract
    :param destination: directory to extract into
    :raises NotImplementedError: for any other archive suffix
    """
    print('Extracting {0}'.format(filename))
    if filename.endswith('tar.gz'):
        with tarfile.open(filename, 'r:gz') as tfile:
            tfile.extractall(destination)
            names = tfile.getnames()
    elif filename.endswith('tar.bz2'):
        with tarfile.open(filename, 'r:bz2') as tfile:
            tfile.extractall(destination)
            names = tfile.getnames()
    elif filename.endswith('zip'):
        with zipfile.ZipFile(filename) as zfile:
            zfile.extractall(destination)
            names = zfile.namelist()
    else:
        raise NotImplementedError('Unsupported archive type')
    # An empty archive has no top-level entry to rename.
    dirname = names[0] if names else ''

    # a little trick to rename tool directories so they don't contain version number
    rename_candidate = re.match(r'^([a-z][^\-]*\-*)+', dirname)
    if rename_candidate is not None:
        # Stay in text: encoding to bytes first makes strip('-') fail on
        # Python 3 and is a no-op for ASCII names on Python 2.
        rename_to = rename_candidate.group(0).strip('-')
        if rename_to != dirname:
            print('Renaming {0} to {1}'.format(dirname, rename_to))
            # NOTE(review): both paths are relative to the current working
            # directory, not to `destination` -- confirm callers extract
            # into the working directory.
            if os.path.isdir(rename_to):
                shutil.rmtree(rename_to)
            shutil.move(dirname, rename_to)
开发者ID:digistump,项目名称:OakCore,代码行数:28,代码来源:get.py
示例8: extract
def extract(self):
    """Unpack the downloaded tarball into the Tor Browser directory.

    Shows the progress bar, extracts ``paths['tarball_file']`` into
    ``paths['tbb']['dir']`` and, on success, records the latest version as
    installed and continues with the next task. If the file cannot be
    extracted, an error screen offering to start over is shown instead.
    """
    # initialize the progress bar
    self.progressbar.set_fraction(0)
    self.progressbar.set_text(_('Installing'))
    self.progressbar.show()
    self.refresh_gtk()

    extracted = False
    try:
        tarball_file = self.common.paths['tarball_file']
        if tarball_file.endswith('xz'):
            # if tarball is .tar.xz
            xz = lzma.LZMAFile(tarball_file)
            try:
                tf = tarfile.open(fileobj=xz)
                try:
                    tf.extractall(self.common.paths['tbb']['dir'])
                finally:
                    tf.close()
            finally:
                xz.close()
            extracted = True
        else:
            # if tarball is .tar.gz
            if tarfile.is_tarfile(tarball_file):
                tf = tarfile.open(tarball_file)
                try:
                    tf.extractall(self.common.paths['tbb']['dir'])
                finally:
                    tf.close()
                extracted = True
    except Exception:
        # Best effort: any failure is reported to the user below. Unlike a
        # bare except, this lets KeyboardInterrupt/SystemExit propagate.
        pass

    if not extracted:
        # Translate the template first and fill in the file name afterwards;
        # formatting before the lookup can never match a catalog entry.
        message = _("Tor Browser Launcher doesn't understand the file format of {0}").format(
            self.common.paths['tarball_file'])
        self.set_gui('task', message, ['start_over'], False)
        self.clear_ui()
        self.build_ui()
        return

    # installation is finished, so save installed_version
    self.common.settings['installed_version'] = self.common.settings['latest_version']
    self.common.save_settings()

    self.run_task()
开发者ID:isislovecruft,项目名称:torbrowser-launcher,代码行数:35,代码来源:launcher.py
示例9: download_20newsgroups
def download_20newsgroups(target_dir, cache_path):
    """ Download the 20Newsgroups data and convert it into a zipped pickle
    storage.

    The archive is downloaded into ``target_dir`` (unless it is already
    there), decompressed and deleted. The train and test folders are then
    loaded, pickled together into ``cache_path`` and ``target_dir`` is
    removed.

    Returns the dict with the ``train`` and ``test`` entries that was cached.
    """
    archive_path = os.path.join(target_dir, ARCHIVE_NAME)
    train_path = os.path.join(target_dir, TRAIN_FOLDER)
    test_path = os.path.join(target_dir, TEST_FOLDER)

    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    if not os.path.exists(archive_path):
        logger.warning("Downloading dataset from %s (14 MB)", URL)
        opener = urllib.urlopen(URL)
        try:
            with open(archive_path, 'wb') as archive_file:
                archive_file.write(opener.read())
        finally:
            opener.close()

    logger.info("Decompressing %s", archive_path)
    archive = tarfile.open(archive_path, "r:gz")
    try:
        archive.extractall(path=target_dir)
    finally:
        archive.close()
    os.remove(archive_path)

    # Store a zipped pickle
    cache = dict(
        train=load_files(train_path),
        test=load_files(test_path)
    )
    # NOTE(review): the 'zip' string codec only exists on Python 2.
    with open(cache_path, 'wb') as cache_file:
        cache_file.write(pickle.dumps(cache).encode('zip'))

    shutil.rmtree(target_dir)
    return cache
开发者ID:jolos,项目名称:scikit-learn,代码行数:28,代码来源:twenty_newsgroups.py
示例10: datafiles
def datafiles(self):
    """ Get list of readable datafiles from asset (multiple filenames if tar or hdf file)

    A non-empty ``<filename>.index`` file is used as a cache when present.
    Otherwise the names are read from the tar or zip archive, or from the
    GDAL subdatasets, and written to the index file if any were found.
    When nothing is found, a list holding just the asset filename is
    returned.

    Raises ``Exception`` (wrapping the original error) if the asset cannot
    be inspected.
    """
    # NOTE(review): self.filename is joined onto its own dirname here, which
    # only yields the expected location for absolute (or bare) filenames --
    # confirm against callers.
    path = os.path.dirname(self.filename)
    indexfile = os.path.join(path, self.filename + '.index')
    if os.path.exists(indexfile):
        datafiles = File2List(indexfile)
        if len(datafiles) > 0:
            return datafiles
    try:
        if tarfile.is_tarfile(self.filename):
            # Open the archive once and make sure it is closed again.
            with tarfile.open(self.filename) as tfile:
                datafiles = tfile.getnames()
        elif zipfile.is_zipfile(self.filename):
            with zipfile.ZipFile(self.filename) as zfile:
                datafiles = ['/vsizip/' + os.path.join(self.filename, f)
                             for f in zfile.namelist()]
        else:
            # Try subdatasets
            fh = gdal.Open(self.filename)
            sds = fh.GetSubDatasets()
            datafiles = [s[0] for s in sds]
        if len(datafiles) > 0:
            List2File(datafiles, indexfile)
            return datafiles
        else:
            return [self.filename]
    except Exception as e:
        raise Exception('Problem accessing asset(s) in {}\n ({})'
                        .format(self.filename, e))
开发者ID:Applied-GeoSolutions,项目名称:gips,代码行数:30,代码来源:core.py
示例11: _downloadAndExtractTarball
def _downloadAndExtractTarball(tarbalUrl, targetDir):
    """Download a gzipped tarball and extract it into a fresh directory.

    ``targetDir`` is removed and re-created first. The tarball is saved
    inside it under the base name of ``tarbalUrl`` and extracted in place.
    Opening the URL is retried once if the first attempt fails.
    """
    try:
        remoteFile = urllib2.urlopen(tarbalUrl)
    except Exception as ex:
        # Single-argument print calls produce the same output on Python 2
        # and Python 3.
        print('Failed contacting: %s  with error:" %s " retrying...' % (tarbalUrl, ex))
        remoteFile = urllib2.urlopen(tarbalUrl)

    try:
        try:
            shutil.rmtree(targetDir, ignore_errors=True)
            os.makedirs(targetDir)
        except OSError:
            pass

        localTarBall = os.path.join(targetDir, os.path.basename(tarbalUrl))
        # Stream the download in chunks and close both handles even when
        # the transfer fails part way through.
        with open(localTarBall, 'wb') as targetFile:
            shutil.copyfileobj(remoteFile, targetFile)
    finally:
        remoteFile.close()

    print('Expanding tarball: %s' % localTarBall)
    tarball = tarfile.open(localTarBall, 'r:gz')
    try:
        tarball.extractall(targetDir)
    finally:
        tarball.close()
开发者ID:gonzaraweb,项目名称:SlipStreamClient,代码行数:25,代码来源:slipstream.bootstrap.py
示例12: prepareTar
def prepareTar(doc, visibleTar=False):
    """Bundle the finished spec and its additional files into a tar archive.

    The spec is rendered into a temporary file and stored in the archive as
    ``Overview.html``; that temporary file is deleted again afterwards.

    :param doc: document object whose ``finish`` method writes the spec
    :param visibleTar: when true the archive is written to ``test.tar`` in
        the working directory, otherwise to a temporary file
    :return: an open binary file object positioned at the archive start
    """
    # Render the spec into a scratch file
    renderedSpec = tempfile.NamedTemporaryFile(delete=False)
    doc.finish(outputFilename=renderedSpec.name)

    # Open the archive, either on disk or backed by a temporary file
    backingFile = None
    if not visibleTar:
        backingFile = tempfile.NamedTemporaryFile(delete=False)
        archive = tarfile.open(fileobj=backingFile, mode='w')
    else:
        archive = tarfile.open(name="test.tar", mode='w')

    archive.add(renderedSpec.name, arcname="Overview.html")
    extraEntries = extensions.BSPublishAdditionalFiles(["images", "diagrams", "examples"])
    for entry in extraEntries:
        # Entries that cannot be read from disk are skipped.
        try:
            if isinstance(entry, basestring):
                archive.add(entry)
            elif isinstance(entry, list):
                # [path on disk, name inside the archive]
                archive.add(entry[0], arcname=entry[1])
        except OSError:
            pass
    archive.close()

    renderedSpec.close()
    os.remove(renderedSpec.name)

    if not visibleTar:
        backingFile.seek(0)
        return backingFile
    return open("test.tar", "rb")
开发者ID:garykac,项目名称:bikeshed,代码行数:28,代码来源:publish.py
示例13: extract_tar_archive
def extract_tar_archive(archive_path, destination_directory):
    """
    Unpack a (possibly compressed) tarball into a directory.

    The following compressions are handled on both Python2 and Python3:

    - gz
    - xz
    - bz2
    - lzma

    :param archive_path: The path to the archive which should be extracted.
    :type archive_path: string

    :param destination_directory: The directory where the files should be
        extracted to. The directory does not have to exist prior to calling
        this function; it will be automatically created, if not.
    :type destination_directory: string
    """
    # On python2 tarfile does not decompress lzma streams (.lzma and .xz)
    # by itself, so the stream has to be unwrapped by hand there.
    needs_manual_lzma = (
        archive_path.endswith(('.xz', '.lzma')) and not six.PY3
    )
    if needs_manual_lzma:
        with contextlib.closing(lzma.LZMAFile(archive_path)) as raw_stream:
            with tarfile.open(fileobj=raw_stream) as tarball:
                tarball.extractall(destination_directory)
        return

    # Everything else is detected and decompressed by tarfile itself.
    with tarfile.open(archive_path) as tarball:
        tarball.extractall(destination_directory)
开发者ID:pombredanne,项目名称:DistroTracker,代码行数:30,代码来源:__init__.py
示例14: test_make_distribution_owner_group
def test_make_distribution_owner_group(self):
    """Check the uid/gid recorded in the generated gztar archive.

    With ``owner`` and ``group`` set to the names of uid 0 and gid 0, every
    archive member must carry uid 0 and gid 0. Without them, every member
    must carry the uid of the current process.
    """
    # First run: explicit owner and group
    _, cmd = self.get_cmd()
    cmd.formats = ['gztar']
    cmd.owner = pwd.getpwuid(0)[0]
    cmd.group = grp.getgrgid(0)[0]
    cmd.ensure_finalized()
    cmd.run()

    tarball_path = join(self.tmp_dir, 'dist', 'fake-1.0.tar.gz')
    with tarfile.open(tarball_path) as tarball:
        for entry in tarball.getmembers():
            self.assertEqual(entry.uid, 0)
            self.assertEqual(entry.gid, 0)

    # Second run: owner and group left at their defaults
    _, cmd = self.get_cmd()
    cmd.formats = ['gztar']
    cmd.ensure_finalized()
    cmd.run()

    tarball_path = join(self.tmp_dir, 'dist', 'fake-1.0.tar.gz')
    with tarfile.open(tarball_path) as tarball:
        for entry in tarball.getmembers():
            self.assertEqual(entry.uid, os.getuid())
开发者ID:webiumsk,项目名称:WOT-0.9.15.1,代码行数:29,代码来源:test_sdist.py
示例15: read
def read(self, all_tags=False, self_provides=True, *extra_tags):
    """Read the package's ar container and parse its control file.

    Checks the ``debian-binary`` member for format ``2.0``, opens
    ``control.tar.gz`` (or ``control.tar.xz`` when lzma support is
    available) and hands the contained ``control`` file to the control
    parser together with the given arguments.

    Raises ``DebError`` when a required member is missing or invalid.
    Returns ``self``.
    """
    container = ar.Ar(fh = self.__file)
    container.read()

    version_member = container.get_file('debian-binary')
    if version_member is None:
        raise DebError(self.__path, 'no debian binary')
    if version_member.read() != '2.0\n':
        raise DebError(self.__path, 'invalid debian binary format')

    control = container.get_file('control.tar.gz')
    if control is None:
        # No gzip control archive: fall back to the xz variant.
        control = container.get_file('control.tar.xz')
        if control is None:
            raise DebError(self.__path, 'missing control.tar')
        if not HAVE_LZMA:
            raise DebError(self.__path, 'can\'t open control.tar.xz without python-lzma')
        decompressed = lzma.decompress(control.read())
        tar = tarfile.open(name="control.tar.xz",
                           fileobj=StringIO.StringIO(decompressed))
    else:
        # XXX: python2.4 relies on a name
        tar = tarfile.open(name='control.tar.gz', fileobj=control)

    try:
        # workaround for python2.4's tarfile module: the member may be
        # listed without the leading './'
        name = 'control' if 'control' in tar.getnames() else './control'
        control = tar.extractfile(name)
    except KeyError:
        raise DebError(self.__path,
                       'missing \'control\' file in control.tar')

    self.__parse_control(control, all_tags, self_provides, *extra_tags)
    return self
开发者ID:adrianschroeter,项目名称:osc,代码行数:32,代码来源:debquery.py
示例16: extract
def extract(self, archive_file_path, output_file_path, progress):
    """Extract the downloaded archive and flatten its leading directories.

    The archive type is chosen from the extension of the stored file name
    (``.gz``/``.tgz``, ``.bz2``, ``.zip`` or ``.7z``); other extensions are
    only logged as an error. Afterwards, once per configured tree depth,
    the single directory found in the output path is replaced by its own
    contents.

    :param archive_file_path: path of the archive to extract
    :param output_file_path: directory to create and extract into
    :param progress: not used by this method
    :raises ValueError: if a level to flatten does not hold exactly one entry
    """
    output_file_path = u"\\\\?\\" + os.path.abspath(output_file_path)
    logging.info("Extracting {0}".format(self.__url))
    os.makedirs(output_file_path)

    extension = os.path.splitext(self.__file_name)[1]
    tar_modes = {".gz": 'r:gz', ".tgz": 'r:gz', ".bz2": 'r:bz2'}
    if extension in tar_modes:
        with tarfile.open(archive_file_path, tar_modes[extension]) as arch:
            arch.extractall(output_file_path)
    elif extension == ".zip":
        with zipfile.ZipFile(archive_file_path) as arch:
            arch.extractall(output_file_path)
    elif extension == ".7z":
        subprocess.call(["7za", "x", archive_file_path, "-o{}".format(output_file_path)])
    else:
        logging.error("unsupported file extension {0}".format(extension))

    # Strip one wrapping directory per iteration.
    for _ in range(self.__tree_depth):
        entries = os.listdir(output_file_path)
        if len(entries) != 1:
            raise ValueError("unexpected archive structure,"
                             " expected exactly one directory in {}".format(output_file_path))
        wrapper_dir = os.path.join(output_file_path, entries[0])

        for child in os.listdir(wrapper_dir):
            shutil.move(os.path.join(wrapper_dir, child), output_file_path)

        shutil.rmtree(wrapper_dir)
开发者ID:GrantSP,项目名称:modorganizer-umbrella,代码行数:32,代码来源:urldownload.py
示例17: handle
def handle(self, dump_path, **options):
    """Restore media files and database content from a gzipped tar dump.

    :param dump_path: path of the dump archive, or ``"-"`` to read it from
        standard input

    The archive is extracted into a temporary directory. The ``media``
    folder of its first top-level entry is copied into
    ``settings.MEDIA_ROOT``, the current database is dumped to
    ``backup_db_dump.json`` next to it, and then the database is flushed
    and reloaded from ``db_dump.json``.
    """
    if dump_path == "-":
        # stdin cannot seek, so the archive has to be read in stream mode
        # ("r|gz"); on Python 3 the binary layer of stdin is needed.
        stdin_bytes = getattr(sys.stdin, "buffer", sys.stdin)
        arc = tarfile.open(fileobj=stdin_bytes, mode="r|gz")
    else:
        arc = tarfile.open(dump_path, mode="r:gz")
    base_path = tempfile.mkdtemp()
    try:
        arc.extractall(path=base_path)
    finally:
        arc.close()
    path = glob(os.path.join(base_path, "*"))[0]

    # media files
    dir_util.copy_tree(os.path.join(path, "media"), settings.MEDIA_ROOT)

    # back up the current database content before replacing it; dumpdata
    # writes to stdout, which is redirected into the backup file meanwhile
    old_stdout = sys.stdout
    backup_file = open(os.path.join(path, "backup_db_dump.json"), "w")
    sys.stdout = backup_file
    try:
        call_command("dumpdata", indent=4)
    finally:
        # Always give stdout back, even if dumpdata fails.
        sys.stdout = old_stdout
        backup_file.close()

    # load db fields
    call_command("flush", noinput=True, interactive=False)
    call_command("reset", "contenttypes", "auth", noinput=True, interactive=False)
    call_command("loaddata", os.path.join(path, "db_dump.json"))

    # rebase FilepathFields
    call_command("rebase_filepathfields", os.path.join(path, "fpf_bases_dump.json"))
开发者ID:gabrielgrant,项目名称:django-data-mover,代码行数:25,代码来源:unpack_data.py
示例18: run_simcoal
def run_simcoal(self, par_file, num_sims, ploydi='1', parDir=None):
    """Provide SimCoal results, reusing a cached archive when possible.

    If a readable cache archive for ``par_file`` holds at least
    ``num_sims`` simulations it is extracted into ``parDir`` and nothing is
    run. Otherwise SimCoal is run and its output directory is stored as the
    new cache archive.

    :param par_file: parameter file name; its last four characters are
        dropped to build the cache name (presumably a ``.par`` extension --
        TODO confirm)
    :param num_sims: number of simulations required
    :param ploydi: ploidy, also used as cache sub-directory name
    :param parDir: output directory, defaults to ``<dataDir>/SimCoal/runs``
    """
    if parDir is None:
        parDir = os.sep.join([self.dataDir, 'SimCoal', 'runs'])
    par_file_root = par_file[:-4]
    tar_name = os.sep.join([self.cacheDir, ploydi, par_file_root +
                            '.tar.bz2'])
    if os.access(tar_name, os.R_OK):
        tf = tarfile.open(tar_name)
        try:
            # NOTE(review): three members are treated as not being
            # simulation results -- confirm which entries these are.
            tar_num_sims = len(tf.getmembers()) - 3
            if tar_num_sims >= num_sims:
                tf.extractall(parDir)
                return
        finally:
            tf.close()
    # Cache miss (no archive or too few simulations): without a cache file
    # there is nothing to extract, so always run the simulation here.
    scc = SimCoalController(self.simcoalDir)
    scc.run_simcoal(par_file, num_sims, ploydi, parDir)
    tf = tarfile.open(tar_name, 'w:bz2')
    try:
        tf.add(os.sep.join([parDir, par_file_root]), par_file_root)
    finally:
        tf.close()
开发者ID:bosborne,项目名称:BioPythonUtils,代码行数:25,代码来源:Cache.py
示例19: consolidate_tarballs_job
def consolidate_tarballs_job(job, fname_to_id):
    """
    Combine the contents of separate tarballs into one.
    Subdirs within the tarball will be named the keys in **fname_to_id

    Every member is stored as ``foo.tar.gz/<key>/<member base name>``, so
    any directory structure inside the input tarballs is flattened.

    :param JobFunctionWrappingJob job: passed automatically by Toil
    :param dict[str,str] fname_to_id: Dictionary of the form: file-name-prefix=FileStoreID
    :return: The file store ID of the generated tarball
    :rtype: str
    """
    work_dir = job.fileStore.getLocalTempDir()
    # Retrieve output file paths to consolidate
    tar_paths = []
    for fname, file_store_id in fname_to_id.items():
        p = job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, fname + '.tar.gz'))
        tar_paths.append((p, fname))
    # I/O
    # output_name is arbitrary as this job function returns a FileStoreId
    output_name = 'foo.tar.gz'
    out_tar = os.path.join(work_dir, output_name)
    # Consolidate separate tarballs into one
    with tarfile.open(out_tar, 'w:gz') as f_out:
        for tar, fname in tar_paths:
            with tarfile.open(tar, 'r') as f_in:
                for tarinfo in f_in:
                    # extractfile() returns None for members without data
                    # (e.g. directories), so it cannot be wrapped in
                    # closing() unconditionally.
                    f_in_file = f_in.extractfile(tarinfo)
                    try:
                        tarinfo.name = os.path.join(output_name, fname, os.path.basename(tarinfo.name))
                        f_out.addfile(tarinfo, fileobj=f_in_file)
                    finally:
                        if f_in_file is not None:
                            f_in_file.close()
    return job.fileStore.writeGlobalFile(out_tar)
开发者ID:jsteward2930,项目名称:toil-scripts,代码行数:29,代码来源:files.py
示例20: run
def run(self, connection, args=None):
    """Unpack the downloaded root filesystem into a new temporary directory.

    Does nothing when the action's parameter key is not set. Otherwise the
    downloaded file is extracted with ``tarfile`` directly or through an
    lzma stream, depending on the ``use_tarfile`` / ``use_lzma`` flags, and
    the extraction directory is recorded in the common data.

    :return: the connection
    :raises JobError: if the archive cannot be unpacked
    :raises RuntimeError: if neither decompression method is enabled
    """
    if not self.parameters.get(self.param_key, None):  # idempotency
        return connection
    connection = super(ExtractRootfs, self).run(connection, args)
    root = self.data['download_action'][self.param_key]['file']
    root_dir = mkdtemp(basedir=DISPATCHER_DOWNLOAD_DIR)
    if self.use_tarfile:
        try:
            # The context manager closes the archive even when extraction
            # fails part way through.
            with tarfile.open(root) as tar:
                tar.extractall(root_dir)
        except tarfile.TarError as exc:
            raise JobError("Unable to unpack %s: '%s' - %s" % (self.param_key, os.path.basename(root), exc))
    elif self.use_lzma:
        with contextlib.closing(lzma.LZMAFile(root)) as xz:
            with tarfile.open(fileobj=xz) as tarball:
                try:
                    tarball.extractall(root_dir)
                except tarfile.TarError as exc:
                    raise JobError("Unable to unpack %s: '%s' - %s" % (self.param_key, os.path.basename(root), exc))
    else:
        raise RuntimeError("Unable to decompress %s: '%s'" % (self.param_key, os.path.basename(root)))
    self.set_common_data('file', self.file_key, root_dir)
    self.logger.debug("Extracted %s to %s" % (self.file_key, root_dir))
    return connection
开发者ID:dl9pf,项目名称:lava-dispatcher,代码行数:25,代码来源:apply_overlay.py
注:本文中的tarfile.open函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论