本文整理汇总了Python中tarfile.TarFile类的典型用法代码示例。如果您正苦于以下问题:Python TarFile类的具体用法?Python TarFile怎么用?Python TarFile使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TarFile类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: install_repo
def install_repo(self, repo):
    """Install a plugin repository from a known name, a tar.gz URL or a git URL.

    ``repo`` is first looked up in ``KNOWN_PUBLIC_REPOS`` and, when found,
    replaced by the registered ``'path'``.  A location ending in ``tar.gz``
    is downloaded and unpacked into ``self.plugin_dir``; anything else is
    cloned with git into ``self.plugin_dir``.

    Returns a one-element tuple holding an error message when git is missing
    or the clone fails; otherwise returns the result of
    ``self.update_dynamic_plugins()``.
    """
    if repo in KNOWN_PUBLIC_REPOS:
        repo = KNOWN_PUBLIC_REPOS[repo]['path']  # replace it by the url
    git_path = which('git')

    if not git_path:
        return ('git command not found: You need to have git installed on '
                'your system to be able to install git based plugins.', )

    # TODO: Update download path of plugin.
    if repo.endswith('tar.gz'):
        response = urlopen(repo)
        try:
            # 'r|gz' reads a gzip-compressed *stream*: the bare TarFile()
            # constructor cannot decompress, and an HTTP response is not
            # seekable.
            # NOTE(review): extractall() trusts member paths of a downloaded
            # archive (path traversal risk) - confirm the source is trusted.
            with TarFile.open(fileobj=response, mode='r|gz') as tar:
                tar.extractall(path=self.plugin_dir)
        finally:
            response.close()
        human_name = '/'.join(repo.split(':')[-1].split('/')[-2:])
        # str.rstrip() strips a *set of characters*, which would also eat
        # trailing letters of the real name; remove the exact suffix instead.
        if human_name.endswith('.tar.gz'):
            human_name = human_name[:-len('.tar.gz')]
    else:
        human_name = human_name_for_git_url(repo)
        p = subprocess.Popen([git_path, 'clone', repo, human_name],
                             cwd=self.plugin_dir,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # communicate() drains both pipes concurrently; reading stdout to EOF
        # before stderr can deadlock once git fills the stderr pipe.
        out, err = p.communicate()
        feedback = out.decode('utf-8')
        error_feedback = err.decode('utf-8')
        if p.returncode:
            return "Could not load this plugin: \n\n%s\n\n---\n\n%s" % (feedback, error_feedback),
    self.add_plugin_repo(human_name, repo)
    return self.update_dynamic_plugins()
开发者ID:s905060,项目名称:errbot,代码行数:25,代码来源:plugin_manager.py
示例2: install_packages
def install_packages(self):
    """Install every package in ``self._packages`` that has an install source.

    Packages without an ``'install_from'`` entry are reported and skipped.
    Packages the cache already knows as installed reuse the cached install
    locations; the others are unpacked via ``self._extract`` from a zip,
    tar.gz, tar.bz2 or gz source.

    Returns a list of one-entry dicts mapping the package name to its
    version, type (``'local'``) and install locations.
    """
    self._update_package_with_install_path()

    # One opener per supported 'install_from_ext' value.
    openers = {
        'zip': lambda source: ZipFile(source, 'r'),
        'tar.gz': lambda source: TarFile.open(source, 'r:gz'),
        'tar.bz2': lambda source: TarFile.open(source, 'r:bz2'),
        'gz': lambda source: gzip.open(source, 'r'),
    }

    installed_packages = []
    for p in self._packages:
        if 'install_from' not in p:
            print("[dem] Could not find package: {}, version: {}".format(p['name'], p['version']))
            continue

        if self._cache.is_package_installed(p['name'], p['version']):
            print('[dem] {}-{} already installed'.format(p['name'], p['version']))
            locations = self._cache.install_locations(p['name'])
        else:
            print('[dem] installing {}-{}'.format(p['name'], p['version']))
            open_archive = openers.get(p['install_from_ext'])
            if open_archive is not None:
                with open_archive(p['install_from']) as archive:
                    locations = self._extract(archive, p)
            if 'pkg-config' in p:
                PkgConfigProcessor.replace_prefix(locations, p['pkg-config'])

        installed_packages.append({
            p['name']: {
                'version': p['version'],
                'type': 'local',
                'install_locations': locations,
            },
        })
    return installed_packages
开发者ID:nitehawck,项目名称:dem,代码行数:31,代码来源:archive.py
示例3: rebuild
def rebuild(filename, tag=None, format="gz"):
    """Rebuild the bundled ``zoneinfo*.tar.*`` archive from a tz source tarball.

    filename -- tarball with the zone source files; every member except
                ``*.sh``, ``*.tab`` and ``leapseconds`` is compiled with
                ``zic``.
    tag      -- optional tag appended to the target name as ``-<tag>``.
    format   -- compression suffix used for the target archive (e.g. "gz").

    Existing ``zoneinfo*.tar.*`` files next to this module are removed before
    the new archive is written.  The temporary work directory is always
    deleted.
    """
    import shutil
    import subprocess
    import tempfile

    tmpdir = tempfile.mkdtemp()
    zonedir = os.path.join(tmpdir, "zoneinfo")
    moduledir = os.path.dirname(__file__)
    # Without a tag the suffix must be empty; formatting None would yield
    # the file name "zoneinfoNone.tar.gz".
    suffix = "-" + tag if tag else ""
    targetname = "zoneinfo%s.tar.%s" % (suffix, format)
    try:
        with TarFile.open(filename) as tf:
            # The "backward" zone file contains links to other files, so it
            # must be processed last.
            names = sorted(tf.getnames(), key=lambda n: (n == "backward", n))
            for name in names:
                if name.endswith((".sh", ".tab")) or name == "leapseconds":
                    continue
                tf.extract(name, tmpdir)
                filepath = os.path.join(tmpdir, name)
                # Argument list instead of a shell string, so paths with
                # spaces or shell metacharacters are safe.  The exit status
                # is deliberately ignored (as before): the tarball also
                # holds files zic cannot compile.
                subprocess.call(["zic", "-d", zonedir, filepath])
        target = os.path.join(moduledir, targetname)
        for entry in os.listdir(moduledir):
            if entry.startswith("zoneinfo") and ".tar." in entry:
                os.unlink(os.path.join(moduledir, entry))
        with TarFile.open(target, "w:%s" % format) as tf:
            for entry in os.listdir(zonedir):
                tf.add(os.path.join(zonedir, entry), entry)
    finally:
        shutil.rmtree(tmpdir)
开发者ID:ljxangus,项目名称:SNS,代码行数:31,代码来源:__init__.py
示例4: rebuild
def rebuild(filename, tag=None, format="gz", zonegroups=(), metadata=None):
    """Rebuild the internal timezone info in dateutil/zoneinfo/zoneinfo*tar*

    filename is the timezone tarball from ``ftp.iana.org/tz``.

    zonegroups names the tarball members to extract and compile with
    ``zic``; metadata is dumped as JSON into the ``METADATA_FN`` file of the
    compiled zone directory.  The result is written to ``ZONEFILENAME`` next
    to this module using the given compression ``format``.  ``tag`` is
    accepted but not used by this function.

    Re-raises the ``OSError`` from launching ``zic`` after reporting it.
    """
    # The default for zonegroups is an immutable tuple rather than a shared
    # mutable list; the argument is only iterated, so lists still work.
    tmpdir = tempfile.mkdtemp()
    zonedir = os.path.join(tmpdir, "zoneinfo")
    moduledir = os.path.dirname(__file__)
    try:
        with TarFile.open(filename) as tf:
            for name in zonegroups:
                tf.extract(name, tmpdir)
            filepaths = [os.path.join(tmpdir, n) for n in zonegroups]
            try:
                check_call(["zic", "-d", zonedir] + filepaths)
            except OSError as e:
                _print_on_nosuchfile(e)
                raise
        # write metadata file
        with open(os.path.join(zonedir, METADATA_FN), 'w') as f:
            json.dump(metadata, f, indent=4, sort_keys=True)
        target = os.path.join(moduledir, ZONEFILENAME)
        with TarFile.open(target, "w:%s" % format) as tf:
            for entry in os.listdir(zonedir):
                entrypath = os.path.join(zonedir, entry)
                tf.add(entrypath, entry)
    finally:
        shutil.rmtree(tmpdir)
开发者ID:ArthurGarnier,项目名称:SickRage,代码行数:29,代码来源:rebuild.py
示例5: create_archive
def create_archive(self):
    """Create a tar archive holding ``full.mp3`` and return its path.

    The archive is written to a fresh temporary file inside
    ``self.temp_dir``; the caller owns the returned path.
    """
    (handle, path) = mkstemp(dir=self.temp_dir)
    # Only the reserved path is needed; TarFile reopens it by name.
    os.close(handle)
    # The context manager closes the archive even if add() raises.
    with TarFile(path, mode="w") as archive:
        archive.add(os.path.join(_common.RSRC, "full.mp3"), "full.mp3")
    return path
开发者ID:jerryh91,项目名称:beets,代码行数:7,代码来源:test_importer.py
示例6: test_tar_experiment_download
def test_tar_experiment_download(self):
    """Download the experiment as a tar stream and verify its contents.

    Checks that the streamed size matches the ``Content-Length`` header and
    that each datafile extracted from the archive has the recorded size.
    """
    self.assertTrue(all(df.verified for df in self.dfs))
    response = self.client.get(reverse(
        'tardis.tardis_portal.download.streaming_download_experiment',
        args=(self.exp.id, 'tar')))
    # Binary mode: the archive is binary data, and a text-mode file rejects
    # bytes chunks on Python 3.
    with NamedTemporaryFile('wb') as tar_tmp:
        for chunk in response.streaming_content:
            tar_tmp.write(chunk)
        tar_tmp.flush()
        self.assertEqual(int(response['Content-Length']),
                         os.stat(tar_tmp.name).st_size)
        if settings.EXP_SPACES_TO_UNDERSCORES:
            exp_title = self.exp.title.replace(' ', '_')
        else:
            exp_title = self.exp.title
        exp_title = quote(exp_title,
                          safe=settings.SAFE_FILESYSTEM_CHARACTERS)
        # Close the archive handle when done instead of leaking it.
        with TarFile(tar_tmp.name) as tf:
            for df in self.dfs:
                full_path = os.path.join(
                    exp_title,
                    quote(self.ds.description,
                          safe=settings.SAFE_FILESYSTEM_CHARACTERS),
                    df.directory, df.filename)
                # docker has a file path limit of ~240 characters
                if os.environ.get('DOCKER_BUILD', 'false') != 'true':
                    tf.extract(full_path, '/tmp')
                    self.assertEqual(
                        os.stat(os.path.join('/tmp', full_path)).st_size,
                        int(df.size))
开发者ID:keithschulze,项目名称:mytardis,代码行数:30,代码来源:test_tar_download.py
示例7: rebuild
def rebuild(filename, tag=None, format="gz"):
    """Rebuild the bundled ``zoneinfo*.tar.*`` archive from a tz source tarball.

    filename -- tarball with the zone source files; every member except
                ``*.sh``, ``*.tab`` and ``leapseconds`` is compiled with
                ``zic``.
    tag      -- optional tag appended to the target name as ``-<tag>``.
    format   -- compression suffix used for the target archive (e.g. "gz").

    Existing ``zoneinfo*.tar.*`` files next to this module are removed before
    the new archive is written.  The temporary work directory is always
    deleted.
    """
    import shutil
    import subprocess
    import tempfile

    tmpdir = tempfile.mkdtemp()
    zonedir = os.path.join(tmpdir, "zoneinfo")
    moduledir = os.path.dirname(__file__)
    # Without a tag the suffix must be empty; formatting None would yield
    # the file name "zoneinfoNone.tar.gz".
    suffix = "-" + tag if tag else ""
    targetname = "zoneinfo%s.tar.%s" % (suffix, format)
    try:
        with TarFile.open(filename) as tf:
            for name in tf.getnames():
                if name.endswith((".sh", ".tab")) or name == "leapseconds":
                    continue
                tf.extract(name, tmpdir)
                filepath = os.path.join(tmpdir, name)
                # Argument list instead of a shell string, so paths with
                # spaces or shell metacharacters are safe.  The exit status
                # is deliberately ignored (as before): the tarball also
                # holds files zic cannot compile.
                subprocess.call(["zic", "-d", zonedir, filepath])
        target = os.path.join(moduledir, targetname)
        for entry in os.listdir(moduledir):
            if entry.startswith("zoneinfo") and ".tar." in entry:
                os.unlink(os.path.join(moduledir, entry))
        with TarFile.open(target, "w:%s" % format) as tf:
            for entry in os.listdir(zonedir):
                tf.add(os.path.join(zonedir, entry), entry)
    finally:
        shutil.rmtree(tmpdir)
开发者ID:mcepar1,项目名称:Scheduler,代码行数:28,代码来源:__init__.py
示例8: install
def install(self, mess, args):
    """ install a plugin repository from the given source or a known public repo (see !repos to find those).
    for example from a known repo : !install err-codebot
    for example a git url : git@github.com:gbin/plugin.git
    or an url towards a tar.gz archive : http://www.gootz.net/plugin-latest.tar.gz
    """
    if not args.strip():
        return "You should have an urls/git repo argument"
    if args in KNOWN_PUBLIC_REPOS:
        args = KNOWN_PUBLIC_REPOS[args][0]  # replace it by the url
    git_path = which('git')

    if not git_path:
        return 'git command not found: You need to have git installed on your system to by able to install git based plugins.'

    if args.endswith('tar.gz'):
        response = urlopen(args)
        try:
            # 'r|gz' reads a gzip-compressed *stream*: the bare TarFile()
            # constructor cannot decompress, and an HTTP response is not
            # seekable.
            # NOTE(review): extractall() trusts member paths of a downloaded
            # archive (path traversal risk) - confirm the source is trusted.
            with TarFile.open(fileobj=response, mode='r|gz') as tar:
                tar.extractall(path=PLUGIN_DIR)
        finally:
            response.close()
        # Drop the 7-character ".tar.gz" suffix from the last URL segment.
        human_name = args.split('/')[-1][:-7]
    else:
        human_name = human_name_for_git_url(args)
        p = subprocess.Popen([git_path, 'clone', args, human_name], cwd=PLUGIN_DIR,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes concurrently; reading stdout to EOF
        # before stderr can deadlock once git fills the stderr pipe.
        feedback, error_feedback = p.communicate()
        if p.returncode:
            return "Could not load this plugin : \n%s\n---\n%s" % (feedback, error_feedback)
    self.add_plugin_repo(human_name, args)
    errors = self.update_dynamic_plugins()
    if errors:
        self.send(mess.getFrom(), 'Some plugins are generating errors:\n' + '\n'.join(errors), message_type=mess.getType())
    else:
        self.send(mess.getFrom(), "A new plugin repository named %s has been installed correctly from %s. Refreshing the plugins commands..." % (human_name, args), message_type=mess.getType())
    self.activate_non_started_plugins()
    return "Plugin reload done."
开发者ID:glenbot,项目名称:err,代码行数:34,代码来源:errBot.py
示例9: _createScriptExtensionTarArchive
def _createScriptExtensionTarArchive(self, sourceDirectory, scriptExtensionName):
    """
    Packs a script extension into ``<scriptExtensionName>.tar`` inside the
    configured distribution directory.

    For every entry below the ``lib`` and ``src`` sub-directories of
    C{sourceDirectory}, all ``*.py`` files and files named ``SCRIPTS`` are
    added, stored under their path relative to ``lib`` / ``src``.
    """

    tarFilePath = os.path.join(self.__buildConfiguration.distDirectory,
                               scriptExtensionName + ".tar")
    archive = TarFile(tarFilePath, "w")
    for subDirectory in ["lib", "src"]:
        baseDirectory = os.path.join(sourceDirectory, subDirectory)
        if not os.path.exists(baseDirectory):
            continue
        for packageDirName in os.listdir(baseDirectory):
            packageDirectory = os.path.join(baseDirectory, packageDirName)
            # Collect first, then add, so the walk finishes before writing.
            modulePaths = list()
            if os.path.exists(packageDirectory):
                for directoryPath, _, fileNames in os.walk(packageDirectory):
                    for fileName in fileNames:
                        if fileName.endswith(".py") or fileName == "SCRIPTS":
                            modulePaths.append(os.path.join(directoryPath, fileName))
            for modulePath in modulePaths:
                # Skip the base directory prefix plus the path separator.
                nameStart = modulePath.find(baseDirectory) + len(baseDirectory) + 1
                archive.add(modulePath, modulePath[nameStart:])
    archive.close()
    if self.verbose:
        print("Created tar archive '%s'." % tarFilePath)
开发者ID:DLR-SC,项目名称:DataFinder,代码行数:29,代码来源:package_script_extension.py
示例10: main
def main(argv):
    """Command-line entry point for the tar database.

    Usage: ``prog [-b basedir] cmd [arg ...]`` where cmd is one of
    ``create``, ``import`` (tar files), ``add`` (plain files),
    ``get`` (record numbers) or ``getinfo`` (record numbers).

    Returns 0 on success and 100 after printing the usage message.
    """
    import getopt

    def usage():
        print('usage: %s [-b basedir] cmd [arg ...]' % argv[0])
        return 100

    try:
        (opts, args) = getopt.getopt(argv[1:], 'db:')
    except getopt.GetoptError:
        return usage()
    debug = 0  # counted from -d flags; not used further in this function
    basedir = 'tar'
    for (k, v) in opts:
        if k == '-d':
            debug += 1
        elif k == '-b':
            basedir = v
    tardb = TarDB(basedir)
    if not args:
        return usage()
    cmd = args.pop(0)
    if cmd == 'create':
        tardb.create()
    elif cmd == 'import':
        tardb.open()
        for path in args:
            # Close each source archive once its members have been copied;
            # previously every imported tar file stayed open.
            with TarFile(path) as tar:
                while True:
                    info = tar.next()
                    if info is None:
                        break
                    # Read the member payload straight from the underlying
                    # file: it starts one header block after the member
                    # offset.
                    fp = tar.fileobj
                    fp.seek(info.offset + BLOCKSIZE)
                    data = fp.read(info.size)
                    tardb.add_record(info, data)
            tardb.flush()
        tardb.close()
    elif cmd == 'add':
        tardb.open()
        for path in args:
            name = os.path.basename(path)
            info = TarInfo(name)
            with open(path, 'rb') as fp:
                data = fp.read()
            recno = tardb.add_record(info, data)
            print(recno)
        tardb.close()
    elif cmd == 'get':
        tardb.open()
        for recno in args:
            recno = int(recno)
            (_, data) = tardb.get_recinfo(recno, True)
            sys.stdout.buffer.write(data)
        tardb.close()
    elif cmd == 'getinfo':
        tardb.open()
        for recno in args:
            recno = int(recno)
            (info, _) = tardb.get_recinfo(recno, False)
            print(info)
        tardb.close()
    else:
        return usage()
    return 0
开发者ID:euske,项目名称:verbose-giggle,代码行数:59,代码来源:tardb.py
示例11: move_certs
def move_certs(self, paths):
    """Stage the internal ssl cert files into a docker volume.

    ``paths`` maps a key to a cert file path on the hub side.  The files are
    packed into an in-memory tar archive and unpacked at ``/certs`` inside a
    temporary container that mounts the certs volume; the container is
    removed afterwards.

    Returns a dict mapping each key of ``paths`` to ``/certs/<basename>``.
    """
    self.log.info("Staging internal ssl certs for %s", self._log_name)
    yield self.pull_image(self.move_certs_image)

    # create_volume passes even if the volume already exists
    volume_name = self.format_volume_name(self.certs_volume_name, self)
    self.log.info("Creating ssl volume %s for %s", volume_name, self._log_name)
    yield self.docker('create_volume', volume_name)

    # docker.put_archive takes a tar archive and a running container and
    # unpacks the archive into the container, so build the archive in memory.
    nb_paths = {}
    tar_buf = BytesIO()
    archive = TarFile(fileobj=tar_buf, mode='w')
    for key, hub_path in paths.items():
        fname = os.path.basename(hub_path)
        nb_paths[key] = '/certs/' + fname
        with open(hub_path, 'rb') as cert_file:
            content = cert_file.read()
        member = TarInfo(name=fname)
        member.size = len(content)
        member.mtime = os.stat(hub_path).st_mtime
        member.mode = 0o644
        archive.addfile(member, BytesIO(content))
    archive.close()
    tar_buf.seek(0)

    # run a container to stage the certs, mounting the volume at /certs/
    volume_binds = {
        volume_name: {"bind": "/certs", "mode": "rw"},
    }
    host_config = self.client.create_host_config(binds=volume_binds)
    container = yield self.docker(
        'create_container',
        self.move_certs_image,
        volumes=["/certs"],
        host_config=host_config,
    )
    container_id = container['Id']
    self.log.debug(
        "Container %s is creating ssl certs for %s",
        container_id[:12], self._log_name,
    )

    yield self.docker('start', container_id)
    try:
        # stage the archive to the container
        yield self.docker(
            'put_archive',
            container=container_id,
            path='/certs',
            data=tar_buf,
        )
    finally:
        yield self.docker('remove_container', container_id)
    return nb_paths
开发者ID:jupyterhub,项目名称:dockerspawner,代码行数:59,代码来源:dockerspawner.py
示例12: read_file_from_image
def read_file_from_image(img: tarfile.TarFile,
                         file_path: str,
                         autoclose=False) -> bytes:
    """Return the raw content of ``file_path`` stored inside ``img``.

    :param img: open tar archive to read from.
    :param file_path: member name inside the archive.
    :param autoclose: kept for backward compatibility; the member file
        object is now closed in every case once its content is read.
    :return: the member content as bytes.
    :raises KeyError: if ``file_path`` is not in the archive (raised by
        ``TarFile.extractfile``).
    :raises ValueError: if the member exists but is not a regular file or
        link, so it has no content to read.
    """
    member_file = img.extractfile(file_path)
    if member_file is None:
        # extractfile() returns None for directories and other special
        # members; fail with a clear message instead of an AttributeError.
        raise ValueError("%r is not a regular file in the archive" % file_path)

    with closing(member_file) as fd:
        return fd.read()
开发者ID:yege0201,项目名称:dockerscan,代码行数:8,代码来源:docker_api.py
示例13: generate_tar
def generate_tar(entries):
    """Build an in-memory tar archive from a mapping of path -> bytes.

    Returns a fresh ``BytesIO`` positioned at the start of the archive.
    """
    tar_buf = BytesIO()
    # Closing the archive writes the end-of-archive blocks; without it the
    # resulting tar is truncated.
    with TarFile(mode="w", fileobj=tar_buf) as tar_file:
        for path, contents in entries.items():
            tar_info = TarInfo(name=path)
            tar_info.size = len(contents)
            tar_file.addfile(tar_info, fileobj=BytesIO(contents))
    return BytesIO(tar_buf.getvalue())
开发者ID:thepwagner,项目名称:flotilla,代码行数:8,代码来源:test_revision.py
示例14: parse_backup_label
def parse_backup_label(self, basebackup_path):
    """Return the starting WAL segment recorded in a basebackup tar file.

    Reads the ``backup_label`` member of the archive at ``basebackup_path``
    and takes the sixth space-separated field of the
    ``START WAL LOCATION`` line, with the trailing ``)`` removed.

    Raises ``KeyError`` if the archive has no ``backup_label`` member and
    ``ValueError`` if the label has no ``START WAL LOCATION`` line.
    """
    # Close the archive as soon as the label has been read.
    with TarFile(basebackup_path) as tar:
        content = tar.extractfile("backup_label").read()  # pylint: disable=no-member

    start_wal_segment = None
    for line in content.split(b"\n"):
        if line.startswith(b"START WAL LOCATION"):
            start_wal_segment = line.split(b" ")[5].strip(b")").decode("utf8")
    if start_wal_segment is None:
        # Previously this surfaced as an UnboundLocalError.
        raise ValueError("No START WAL LOCATION line in backup_label of %r" % (basebackup_path,))
    self.log.debug("Found: %r as starting wal segment", start_wal_segment)
    return start_wal_segment
开发者ID:Ormod,项目名称:pghoard,代码行数:8,代码来源:basebackup.py
示例15: reader
def reader(self):
    """Package up filesystem contents as a tarball.

    Every child of ``self.path`` is added recursively under its base name
    to an in-memory tar archive; the rewound buffer is then yielded.
    """
    buf = BytesIO()
    archive = TarFile(fileobj=buf, mode="w")
    for entry in self.path.children():
        archive.add(entry.path, arcname=entry.basename(), recursive=True)
    archive.close()
    # Rewind so the consumer reads the archive from its first byte.
    buf.seek(0, 0)
    yield buf
开发者ID:networkelements,项目名称:flocker,代码行数:9,代码来源:memory.py
示例16: download
def download(self):
    """
    Perform a download.

    Collects the files returned by ``self._read_dir()`` into a tar archive
    written to a named temporary file, storing each file under
    ``<artist>/<album>/<filename>`` (artist and album come from the POST
    data).  When the POST field ``simulation`` is set, the archive member
    names are also written to the response.

    NOTE(review): this snippet appears to end before the archive is
    delivered to the client; only the error path is visible here.
    """
    self.init2()  # set up the base class
    simulation = self.request.POST.get("simulation", False)
    self._setup_path()
    if simulation:
        self.request.echo("<h1>Download Simulation!</h1><pre>")
        self.request.echo("request path: %s\n" % self.request_path)
        log_typ = "download simulation start"
    else:
        log_typ = "download start"
    self.db.log(log_typ, self.context['request_path'])
    artist = self.request.POST.get("artist", "")
    album = self.request.POST.get("album", "")
    files, _ = self._read_dir()
    # Temp file name carries the remote user; the directory is only
    # overridden when a temp path is configured.
    args = {"prefix": "PyDown_%s_" % self.request.environ["REMOTE_USER"]}
    if self.request.cfg["temp"]:
        args["dir"] = self.request.cfg["temp"]
    temp = NamedTemporaryFile(**args)
    tar = TarFile(mode="w", fileobj=temp)
    if simulation:
        self.request.write("-"*80)
        self.request.write("\n")
    for file_info in files:
        filename = file_info[0]
        abs_path = posixpath.join(self.request_path, filename)
        arcname = posixpath.join(artist, album, filename)
        if simulation:
            #~ self.request.write("absolute path..: %s\n" % abs_path)
            self.request.write("<strong>%s</strong>\n" % arcname)
        try:
            tar.add(abs_path, arcname)
        except IOError, e:
            # Report the failure, then release both handles on a
            # best-effort basis before giving up.
            self.request.write("<h1>Error</h1><h2>Can't create archive: %s</h2>" % e)
            try:
                tar.close()
            except:
                pass
            try:
                temp.close()
            except:
                pass
            return
开发者ID:Aaron1011,项目名称:python-code-snippets,代码行数:56,代码来源:PyDown.py
示例17: TarFileWrapper
class TarFileWrapper(ArchiveFileWrapper):
    """Archive wrapper backed by a ``TarFile`` opened on a file object."""

    def __init__(self, fh, *args, **kwargs):
        # The archive is opened before the base initialiser runs.
        self.archive = TarFile(fileobj=fh)
        super(TarFileWrapper, self).__init__(*args, **kwargs)

    def extract_file(self, *args, **kwargs):
        """Delegate to ``TarFile.extractfile`` on the wrapped archive."""
        member_file = self.archive.extractfile(*args, **kwargs)
        return member_file

    def names(self):
        """Return the member names of the wrapped archive."""
        member_names = self.archive.getnames()
        return member_names
开发者ID:fcurella,项目名称:cookiejar,代码行数:10,代码来源:extractor.py
示例18: download
def download(self, src, dest, extract_here=False):
    """Copy ``src`` out of the container and unpack it below ``dest``.

    The data returned by the client's ``copy`` call is spooled to a
    temporary file and read as a tar archive.  With ``extract_here`` the
    leading ``<basename of src>/`` prefix is cut from every member name
    before extraction.
    """
    docker = connect()
    with SpooledTemporaryFile() as spool:
        payload = docker.copy(self.container_id, src).read()
        spool.write(payload)
        spool.seek(0)
        archive = TarFile(fileobj=spool)
        if extract_here:
            # basename plus the following path separator
            prefix_len = len(os.path.basename(src)) + 1
            for entry in archive.getmembers():
                entry.name = entry.name[prefix_len:]
        archive.extractall(path=dest)
开发者ID:pombredanne,项目名称:meuh-python,代码行数:12,代码来源:bot.py
示例19: extract_package
def extract_package(package_name, path, logger):
    """Extract a TAR or ZIP package into ``path``.

    The archive type is chosen by looking for ``.tar`` or ``.zip`` in
    ``package_name``.  Any failure (including an unsupported type) is
    registered via ``register_exception`` and logged on ``logger``; it is
    not re-raised.
    """
    try:
        # Context managers close the archive handles, which were previously
        # left open.
        if ".tar" in package_name:
            with TarFile.open(package_name) as archive:
                archive.extractall(path)
        elif ".zip" in package_name:
            with ZipFile(package_name) as archive:
                archive.extractall(path)
        else:
            raise FileTypeError("It's not a TAR or ZIP archive.")
    except Exception as err:
        # Deliberate best-effort: report the problem and carry on.
        register_exception(alert_admin=True,
                           prefix="Elsevier error extracting package.")
        logger.error("Error extraction package file: %s %s"
                     % (path, err))
开发者ID:Dziolas,项目名称:scoap3_old,代码行数:13,代码来源:scoap3utils.py
示例20: __enter__
def __enter__(self):
    """Open the archive named by ``self.web_archive`` and return it.

    A value containing ``://`` is downloaded and opened from memory with
    transparent compression detection; anything else is opened as a local
    path.  The opened ``TarFile`` is also stored on ``self.tf``.

    Raises ``ValueError`` if the context is entered while ``self.tf`` is
    already set, and the HTTP error raised by ``raise_for_status()`` when
    the download does not succeed.
    """
    if self.tf is not None:
        raise ValueError('Cannot re-enter')

    if '://' in self.web_archive:
        info('Downloading from {0}'.format(self.web_archive))
        dl = requests.get(self.web_archive)
        # Fail on 4xx/5xx here rather than handing an error page to
        # tarfile, which would only report an unreadable archive.
        dl.raise_for_status()
        self.tf = TarFile.open(path(self.web_archive).basename(),
                               'r:*', fileobj=io.BytesIO(dl.content))
    else:
        self.tf = TarFile.open(self.web_archive)
    return self.tf
开发者ID:cecedille1,项目名称:Sett,代码行数:13,代码来源:tar.py
注:本文中的tarfile.TarFile类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论