本文整理汇总了Python中pylzma.decompressobj函数的典型用法代码示例。如果您正苦于以下问题：Python decompressobj函数的具体用法？Python decompressobj怎么用？Python decompressobj使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了decompressobj函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _read_lzma2
def _read_lzma2(self, coder, input, level, num_coders):
    """Decode one LZMA2 coder stage via ``_read_from_decompressor``.

    The decompressor is given ``maxlength=self._start + size`` only when
    this is the last coder of the chain (``level + 1 == num_coders``) and
    the folder is not solid; in every other case no output limit is set.

    Raises:
        WrongPasswordError: decoding raised ``ValueError`` and
            ``self._is_encrypted()`` is true.
        ValueError: re-raised unchanged when the archive is not encrypted.
    """
    size = self._uncompressed[level]
    # Negated form of "is the last coder and the folder is not solid".
    if (level + 1) != num_coders or self._folder.solid:
        decoder = pylzma.decompressobj(lzma2=True)
    else:
        decoder = pylzma.decompressobj(maxlength=self._start + size, lzma2=True)
    try:
        result = self._read_from_decompressor(
            coder, decoder, input, level, num_coders, with_cache=True)
    except ValueError:
        if not self._is_encrypted():
            raise
        raise WrongPasswordError('invalid password')
    return result
开发者ID:fancycode,项目名称:pylzma,代码行数:14,代码来源:py7zlib.py
示例2: test_decompression_stream_props
def test_decompression_stream_props(self):
    """Feed the first 5 bytes separately, then the rest, then flush."""
    # Per the original note, the leading bytes carry the stream properties,
    # so they are handed to the decompressor in a step of their own.
    split = 5
    decoder = pylzma.decompressobj()
    head = decoder.decompress(self.plain_with_eos[:split])
    tail = decoder.decompress(self.plain_with_eos[split:])
    rest = decoder.flush()
    self.assertEqual(head + tail + rest, self.plain)
开发者ID:hiddenillusion,项目名称:pylzma,代码行数:7,代码来源:test_pylzma.py
示例3: test_decompression_stream_two
def test_decompression_stream_two(self):
    """Decompress the stream in two slices (split at byte 10), then flush."""
    split = 10
    decoder = pylzma.decompressobj()
    head = decoder.decompress(self.plain_with_eos[:split])
    tail = decoder.decompress(self.plain_with_eos[split:])
    rest = decoder.flush()
    self.assertEqual(head + tail + rest, self.plain)
开发者ID:hiddenillusion,项目名称:pylzma,代码行数:7,代码来源:test_pylzma.py
示例4: unlzma
def unlzma(fi, fo, fi_close=True, fo_close=True, bufs=6553500):
    """Decompress `fi` into `fo` (each a file object or a filename).

    Args:
        fi: input file object, or a path that is opened with mode ``"rb"``.
        fo: output file object, or a path that is opened with mode ``"wb"``.
        fi_close: close `fi` when done (also on error).
        fo_close: close `fo` when done (also on error).
        bufs: output buffer size passed to each ``decompress`` call; the
            input chunk size is ``bufs / 100`` (at least 1 byte).

    Returns:
        The ``(fi, fo)`` file objects (closed if the flags asked for it).
    """
    if isinstance(fi, str):
        fi = open(fi, "rb")
    try:
        if isinstance(fo, str):
            fo = open(fo, "wb")
        try:
            # XXXX: better way?
            #  * s.decompress *requires* an output buffer size, i.e. the
            #    size of the unpacked data; otherwise packed data is stored
            #    in internal buffers and returned on flush (which gets
            #    problematic).
            #  * Suggested default is to read 1 byte at a time and use the
            #    default buffer size, which is slow.
            #  * Nearest hack is an output buffer far larger than the input
            #    buffer. Not nice, but works... mostly.
            #  * Symptoms otherwise: decompression slows down to zero speed
            #    with high memory usage (up to almost the size of the
            #    uncompressed file), after which all the decompressed data
            #    is written in one go.
            # Never let the chunk size collapse to 0: read(0) returns an
            # empty result, which would end the loop before any input is
            # consumed.
            in_bufs = max(1, int(bufs / 100))
            s = pylzma.decompressobj()
            while True:
                tmp = fi.read(in_bufs)
                if not tmp:
                    break
                fo.write(s.decompress(tmp, bufs))
            fo.write(s.flush())
        finally:
            # Close even if decompression failed, so handles never leak.
            if fo_close:
                fo.close()
    finally:
        if fi_close:
            fi.close()
    return fi, fo
开发者ID:HoverHell,项目名称:pyaux,代码行数:35,代码来源:lzcat.py
示例5: test_decompression_stream_reset
def test_decompression_stream_reset(self):
    """After ``reset()`` the decompressor must decode the stream from scratch."""
    decoder = pylzma.decompressobj()
    # Partial feed whose output is intentionally discarded before the reset.
    decoder.decompress(self.plain_with_eos[:10])
    decoder.reset()
    head = decoder.decompress(self.plain_with_eos[:15])
    tail = decoder.decompress(self.plain_with_eos[15:])
    rest = decoder.flush()
    self.assertEqual(head + tail + rest, self.plain)
开发者ID:hiddenillusion,项目名称:pylzma,代码行数:9,代码来源:test_pylzma.py
示例6: _read_lzma
def _read_lzma(self, coder, input):
    """Decode an LZMA coder stream, capped at ``self._start + self.size`` bytes.

    Raises:
        WrongPasswordError: decoding raised ``ValueError`` and
            ``self._is_encrypted()`` is true.
        ValueError: re-raised unchanged when the archive is not encrypted.
    """
    limit = self._start + self.size
    decoder = pylzma.decompressobj(maxlength=limit)
    try:
        result = self._read_from_decompressor(
            coder, decoder, input, checkremaining=True, with_cache=True)
    except ValueError:
        if not self._is_encrypted():
            raise
        raise WrongPasswordError('invalid password')
    return result
开发者ID:Allen-smith,项目名称:ctf-tools,代码行数:9,代码来源:7z2john.py
示例7: test_decompression_streaming_noeos
def test_decompression_streaming_noeos(self):
    """Decompress ``plain_without_eos`` one input byte at a time.

    The decompressor is told the expected output length up front via
    ``maxlength`` and each call is limited to 1 output byte.
    """
    decoder = pylzma.decompressobj(maxlength=len(self.plain))
    source = BytesIO(self.plain_without_eos)
    sink = BytesIO()
    # iter() with a sentinel stops as soon as read() returns empty bytes.
    for byte in iter(lambda: source.read(1), b''):
        sink.write(decoder.decompress(byte, 1))
    sink.write(decoder.flush())
    self.assertEqual(sink.getvalue(), self.plain)
开发者ID:hiddenillusion,项目名称:pylzma,代码行数:11,代码来源:test_pylzma.py
示例8: test_compress_large_stream
def test_compress_large_stream(self):
    """Round-trip a large block of repeating data, one input byte at a time.

    Stream version of the regression test (bug reported by Christopher
    Perkins).
    """
    data = bytes("asdf", 'ascii')*123456
    decompress = pylzma.decompressobj()
    infile = BytesIO(pylzma.compress(data))
    outfile = BytesIO()
    while 1:
        tmp = infile.read(1)
        if not tmp:
            break
        outfile.write(decompress.decompress(tmp))
    outfile.write(decompress.flush())
    # failUnless() was a deprecated alias removed in Python 3.12; assertTrue
    # is its direct replacement and keeps failure output short for this
    # large payload.
    self.assertTrue(data == outfile.getvalue())
开发者ID:hiddenillusion,项目名称:pylzma,代码行数:12,代码来源:test_pylzma.py
示例9: test_compress_large_stream_bigchunks
def test_compress_large_stream_bigchunks(self):
    """Round-trip a large block of repeating data in 1024-byte input chunks."""
    data = bytes("asdf", 'ascii')*123456
    decompress = pylzma.decompressobj()
    infile = BytesIO(pylzma.compress(data))
    outfile = BytesIO()
    while 1:
        tmp = infile.read(1024)
        if not tmp:
            break
        outfile.write(decompress.decompress(tmp))
    outfile.write(decompress.flush())
    # failUnless() was a deprecated alias removed in Python 3.12; assertTrue
    # is its direct replacement and keeps failure output short for this
    # large payload.
    self.assertTrue(data == outfile.getvalue())
开发者ID:hiddenillusion,项目名称:pylzma,代码行数:12,代码来源:test_pylzma.py
示例10: unjsllzma
def unjsllzma(fi, fi_close=True, parse_fn=None, handle_fail=None, bufs=655350):
    """Make a generator for reading an lzma-compressed file with
    json (or something else) in lines.

    Args:
        fi: input file object, or a path that is opened with mode ``'rb'``.
        fi_close: close `fi` when the generator finishes, fails or is
            closed early.
        parse_fn: function(v) used to process each line (defaults to
            ``json.loads``, preferring ``simplejson`` when importable).
        handle_fail: function(value, exception) for handling a failure to
            parse a value; the value is skipped if it raises
            ``_IgnoreTheError``, otherwise its return value is yielded.
            Defaults to ``_handle_fail_default`` (per the original note,
            that skips all failures).
        bufs: output buffer size passed to each ``decompress`` call; the
            input chunk size is ``bufs / 100`` (at least 1 byte).

    Yields:
        The parsed value of every newline-terminated line. A trailing
        fragment with no terminating newline is never yielded.
    """
    if parse_fn is None:
        try:
            import simplejson as json
        except ImportError:
            print("Error importing (preferred) simplejson")
            import json
        parse_fn = json.loads
    if handle_fail is None:
        handle_fail = _handle_fail_default

    def try_loads(v):
        try:
            return parse_fn(v)
        except Exception as e:
            return handle_fail(v, e)

    if isinstance(fi, str):
        fi = open(fi, 'rb')
    try:
        # Buffer for unfinished lines. Kept as bytes so it can be joined
        # with decompressor output on Python 3 as well (on Python 2
        # b'' is the same object type as '').
        pending = b''
        # XXX: see lzcat.py note around in_bufs. Never let the chunk size
        # collapse to 0, otherwise read(0) ends the loop immediately.
        in_bufs = max(1, int(bufs / 100))
        s = pylzma.decompressobj()
        cont = True
        while cont:
            tmp = fi.read(in_bufs)
            if not tmp:  # nothing more can be read
                pending += s.flush()
                cont = False
            else:
                # XXX: TODO: use bytearray.extend (likely).
                pending += s.decompress(tmp, bufs)
            lines = pending.split(b'\n')  # finished and unfinished lines
            for v in lines[:-1]:
                try:
                    r = try_loads(v)
                except _IgnoreTheError:
                    continue  # no more handling requested, just skip it
                yield r
            pending = lines[-1]
    finally:
        # Runs on exhaustion, on error and on generator close alike.
        if fi_close:
            fi.close()
开发者ID:HoverHell,项目名称:pyaux,代码行数:52,代码来源:lzmah.py
示例11: decomp_lzma
def decomp_lzma(inputname, outputname):
    """Decompress the LZMA file `inputname` into `outputname`.

    Returns:
        ``(outputname, elapsed)`` where ``elapsed`` is the string form of
        the wall-clock ``timedelta`` spent, including opening the files.
    """
    prev_time = datetime.now()
    # Context managers close both files even if decompression raises.
    with open(inputname, 'rb') as comp_file, open(outputname, 'wb') as ret_file:
        obj = pylzma.decompressobj()
        while True:
            tmp = comp_file.read(8192)
            if not tmp:
                break
            ret_file.write(obj.decompress(tmp))
        ret_file.write(obj.flush())
    time_diff = str(datetime.now() - prev_time)
    return outputname, time_diff
开发者ID:Navaneethsen,项目名称:elijah-cloudlet,代码行数:16,代码来源:lib_old_cloudlet.py
示例12: decomp_lzma
def decomp_lzma(inputname, outputname):
    """Decompress the LZMA file `inputname` into `outputname` and time it.

    Returns:
        ``(outputname, elapsed, rate)`` where ``elapsed`` is the string form
        of the elapsed ``timedelta`` and ``rate`` is the string form of
        compressed input bytes divided by ``elapsed.seconds``, or ``'-1'``
        when ``elapsed.seconds`` is 0.
    """
    prev_time = datetime.now()
    # Context managers close both files even if decompression raises.
    with open(inputname, 'rb') as comp_file, open(outputname, 'wb') as ret_file:
        obj = pylzma.decompressobj()
        while True:
            tmp = comp_file.read(8192)
            if not tmp:
                break
            ret_file.write(obj.decompress(tmp))
        ret_file.write(obj.flush())
    time_diff = datetime.now() - prev_time
    # NOTE(review): timedelta.seconds is only the whole-seconds component
    # (it ignores days and microseconds); kept as-is so the reported rate
    # does not change for existing callers.
    if time_diff.seconds == 0:
        return outputname, str(time_diff), '-1'
    return outputname, str(time_diff), str(os.path.getsize(inputname)/time_diff.seconds)
开发者ID:Navaneethsen,项目名称:elijah-cloudlet,代码行数:17,代码来源:decomp_compare.py
示例13: read
def read(self):
    """Decompress this entry and return its bytes.

    Seeks ``self._file`` to ``self._src_start``, primes the decompressor
    with the first coder's ``'properties'`` value, decodes up to
    ``self._start + self.size`` bytes and returns the slice
    ``[self._start:self._start + self.size]`` of the result.

    When ``self.compressed`` is ``None`` the input is consumed in
    1024-byte reads until enough output has been produced; otherwise
    exactly ``self.compressed`` bytes are read and decoded in one call.

    Raises:
        EOFError: the source ran out of data before the expected amount of
            output was produced (previously this looped forever).
    """
    wanted = self._start + self.size
    dec = pylzma.decompressobj(maxlength=wanted)
    self._file.seek(self._src_start)
    dec.decompress(self._folder.coders[0]['properties'])
    total = self.compressed
    if total is None:
        remaining = wanted
        out = StringIO()
        while remaining > 0:
            data = self._file.read(1024)
            tmp = dec.decompress(data, remaining)
            if not data and not tmp:
                # No input left and no output produced: `remaining` can
                # never reach 0, so stop instead of spinning.
                raise EOFError('end of stream while decompressing')
            out.write(tmp)
            remaining -= len(tmp)
        data = out.getvalue()
    else:
        data = dec.decompress(self._file.read(total), wanted)
    return data[self._start:wanted]
开发者ID:9072997,项目名称:wikireader,代码行数:21,代码来源:py7zlib.py
示例14: decomp_worker
def decomp_worker(in_queue, out_queue, time_queue):
start_time = datetime.now()
data_size = 0
counter = 0
obj = pylzma.decompressobj()
while True:
chunk = in_queue.get()
if chunk == END_OF_FILE:
break
data_size = data_size + len(chunk)
decomp_chunk = obj.decompress(chunk)
#print "in decomp : %d %d" % (data_size, len(decomp_chunk))
in_queue.task_done()
out_queue.put(decomp_chunk)
counter = counter + 1
out_queue.put(END_OF_FILE)
end_time = datetime.now()
time_queue.put({'start_time':start_time, 'end_time':end_time})
print "[Decomp] : (%s)-(%s)=(%s) (%d loop, %d bytes)" % (start_time.strftime('%X'), end_time.strftime('%X'), str(end_time-start_time), counter, data_size)
开发者ID:Balakrishnan-Vivek,项目名称:elijah-cloudlet,代码行数:21,代码来源:lib_old_synthesis.py
示例15: read_size
def read_size(self, toread):
    """Decompress and return the first `toread` bytes of this entry.

    Seeks ``self._file`` to ``self._src_start``, primes the decompressor
    with the first coder's ``'properties'`` value, decodes up to
    ``self._start + toread`` bytes and returns the slice
    ``[self._start:self._start + toread]`` of the result.

    When ``self.compressed`` is ``None`` the input is consumed in
    1024-byte reads until enough output has been produced; otherwise
    exactly ``self.compressed`` bytes are read and decoded in one call.

    Raises:
        EOFError: the source ran out of data before the expected amount of
            output was produced (previously this looped forever).
    """
    wanted = self._start + toread
    dec = pylzma.decompressobj(maxlength=wanted)
    self._file.seek(self._src_start)
    dec.decompress(self._folder.coders[0]['properties'])
    total = self.compressed  # IGNORE:E1101
    if total is None:
        remaining = wanted
        out = StringIO()
        while remaining > 0:
            data = self._file.read(1024)
            tmp = dec.decompress(data, remaining)
            if not data and not tmp:
                # No input left and no output produced: `remaining` can
                # never reach 0, so stop instead of spinning.
                raise EOFError('end of stream while decompressing')
            out.write(tmp)
            remaining -= len(tmp)
        data = out.getvalue()
    else:
        data = dec.decompress(self._file.read(total), wanted)
    return data[self._start:wanted]
开发者ID:SiloDS,项目名称:RToolDS,代码行数:21,代码来源:py7zlib2.py
示例16: setup_0_extracted
def setup_0_extracted(self):
    """Decompress setup-0 data to disk.

    Reads ``self.filename`` starting just past the setup ID and the
    compressed-block header, as a sequence of (4-byte CRC, up to 4096-byte
    data) pairs until ``TCompressedBlockHeader['StoredSize']`` bytes have
    been consumed, and writes the decompressed data to
    ``self.setup_0_filename``.

    Returns:
        True once the output has been written.

    Raises:
        EOFError: the input ended before ``StoredSize`` bytes were read
            (previously this looped forever).
    """
    DecompressBuffer = 4096
    DecompressCRCSize = 4
    # The context manager closes the input even if decompression raises.
    with open(self.filename, 'rb') as f:
        f.seek(self.TSetupLdrOffsetTable['Offset0'] + self.SetupIDSize + self.CRCCompressedBlockHeaderSize)
        decompress = pylzma.decompressobj()
        with open(self.setup_0_filename, 'wb') as o:
            stored_size = self.TCompressedBlockHeader['StoredSize']
            read_count = 0
            while read_count < stored_size:
                crc = f.read(DecompressCRCSize)
                data = f.read(DecompressBuffer)
                if not crc and not data:
                    # Nothing left to read: read_count can no longer grow.
                    raise EOFError('unexpected end of file in setup-0 data')
                # NOTE: the per-chunk CRC is read but not verified here.
                o.write(decompress.decompress(data, DecompressBuffer))
                read_count += len(crc) + len(data)
            o.write(decompress.flush())
    return True
开发者ID:lkraider,项目名称:innounpy,代码行数:22,代码来源:innounpy.py
示例17: decomp_worker
def decomp_worker(in_path, out_path, time_queue):
in_file = open(in_path, "rb")
out_file = open(out_path, "w+b")
start_time = datetime.now()
data_size = 0
counter = 0
obj = pylzma.decompressobj()
while True:
chunk = in_file.read(CHUNK_SIZE)
if not chunk:
break
data_size = data_size + len(chunk)
decomp_chunk = obj.decompress(chunk)
#print "in decomp : %d %d" % (data_size, len(decomp_chunk))
out_file.write(decomp_chunk)
counter = counter + 1
in_file.close()
out_file.close()
end_time = datetime.now()
time_queue.put({'start_time':start_time, 'end_time':end_time})
print "[Decomp] : (%s)-(%s)=(%s) (%d loop, %d bytes)" % (start_time.strftime('%X'), end_time.strftime('%X'), str(end_time-start_time), counter, data_size)
开发者ID:Balakrishnan-Vivek,项目名称:elijah-cloudlet,代码行数:23,代码来源:lib_old_synthesis_sequential.py
示例18: decompress-sc.py 脚本
#!/usr/bin/env python3
import pylzma
import sys
import struct
# Script body: decompress argv[1] into argv[2].
if len(sys.argv) < 3:
    print("Usage: ./decompress-sc.py [input.sc] [output.scf]")
    # sys.exit() instead of the bare exit() helper, which is injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    sys.exit(0)
with open(sys.argv[1], 'rb') as fin, open(sys.argv[2], 'wb') as fout:
    # The first 5 bytes are fed to the decompressor as-is; the next 4 bytes
    # are a little-endian unsigned size used as the output length limit.
    header = fin.read(5)
    raw_size = fin.read(4)
    size = struct.unpack('<I', raw_size)[0]
    obj = pylzma.decompressobj(maxlength=size)
    fout.write(obj.decompress(header))
    while True:
        chunk = fin.read(4096)
        if not chunk:
            break
        fout.write(obj.decompress(chunk))
    # Sometimes the following line will crash with
    # 'Negative size passed to PyBytes_FromStringAndSize'
    try:
        fout.write(obj.flush())
    except SystemError as e:
        print(e)
# decompress-sc.py --- Decompress CoC SC files.
# Copyright (C) 2013 kennytm
开发者ID:JamShan,项目名称:CoCCalc,代码行数:31,代码来源:decompress-sc.py
示例19: __init__
def __init__(self, fp, length):
    """Initialise the base ``FileEntry`` and attach LZMA decoding state."""
    FileEntry.__init__(self, fp, length)
    # A fresh streaming decompressor plus an initially empty buffer.
    self.decomp, self.buffer = pylzma.decompressobj(), ""
开发者ID:dhirajkhatiwada1,项目名称:uludag,代码行数:4,代码来源:zipfileext.py
示例20: _read_lzma
def _read_lzma(self, coder, input):
    """Decode an LZMA coder stream, capped at ``self._start + self.size`` bytes.

    `input` is accepted but not forwarded to ``_read_from_decompressor``.
    """
    limit = self._start + self.size
    decoder = pylzma.decompressobj(maxlength=limit)
    result = self._read_from_decompressor(coder, decoder, checkremaining=True)
    return result
开发者ID:AsKaGeeK,项目名称:romcollectionbrowser,代码行数:3,代码来源:py7zlib.py
注：本文中的pylzma.decompressobj函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论