This article collects typical usage examples of Python's tempfile.TemporaryFile class. If you have been wondering what TemporaryFile is for or how to use it in practice, the curated class examples below should help.
The following presents 20 code examples of the TemporaryFile class, ordered by popularity by default. You can upvote the examples you like or find useful; your votes help the system recommend better Python code samples.
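Before the project examples, here is a minimal, self-contained sketch of the typical TemporaryFile life cycle (create, write, rewind, read), written for this article rather than taken from any of the projects below. The file is anonymous on disk and is deleted automatically when closed.

from tempfile import TemporaryFile

# Minimal life cycle: the file has no visible name and disappears on close.
with TemporaryFile(mode='w+t') as tmp:   # text mode; the default is binary 'w+b'
    tmp.write('hello, temporary world\n')
    tmp.seek(0)                          # rewind before reading, as most examples below do
    print(tmp.read())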
Example 1: pyc

def pyc(self):
    ofile = TemporaryFile('w+t')
    if self.ua:
        dst = ANSWER_PATH + self.id + '.pyc'
    else:
        dst = BINARY_PATH + self.id + '.pyc'
    cmd = ['python', dst]
    # Run the submission with stdin/stdout redirected through files, under
    # the CPU/memory limits installed by the preexec hook.
    p = Popen(cmd, stdin=self.ifile, stdout=ofile, universal_newlines=True,
              preexec_fn=Tester.Limiter(self.lcpu, self.lmem), stderr=DEVNULL)
    p.wait()
    self.result = 0
    if p.returncode == -9:      # killed by SIGKILL (e.g. by the resource limiter)
        self.result = -5
    elif p.returncode == -11:   # SIGSEGV: memory access violation
        self.result = -6
    elif p.returncode == -25:   # SIGXFSZ: output file size limit exceeded
        self.result = -4
    elif p.returncode < 0:      # any other fatal signal
        self.result = -3
    else:
        ofile.seek(0)
        if self.output != ofile.read(-1):   # output does not match expected answer
            self.result = -7

Author: LiQuidFly | Project: OnlineJudge | Lines of code: 26 | Source file: judge.py
Example 2: PackageZipBuilder

class PackageZipBuilder(object):

    def __init__(self, namespace, version=None):
        self.namespace = namespace
        self.version = version

    def open_zip(self):
        self.zip_file = TemporaryFile()
        self.zip = ZipFile(self.zip_file, 'w')

    def install_package(self):
        self.open_zip()
        if not self.version:
            raise ValueError('You must provide a version to install a package')
        package_xml = PACKAGE_XML % self.namespace
        self.zip.writestr('package.xml', package_xml)
        installed_package = INSTALLED_PACKAGE % self.version
        self.zip.writestr(
            'installedPackages/%s.installedPackage' % self.namespace,
            installed_package)
        return self.encode_zip()

    def uninstall_package(self):
        self.open_zip()
        self.zip.writestr('package.xml', EMPTY_PACKAGE_XML)
        self.zip.writestr('destructiveChanges.xml', PACKAGE_XML % self.namespace)
        return self.encode_zip()

    def encode_zip(self):
        # Close the archive, rewind the backing TemporaryFile, and return
        # its bytes base64-encoded.
        self.zip.close()
        self.zip_file.seek(0)
        return b64encode(self.zip_file.read())

Author: Coca-ColaEnterprises | Project: mrbelvedere | Lines of code: 35 | Source file: package.py
Example 3: test_read_several

def test_read_several(self):
    """Read several stanzas from file"""
    tmpf = TemporaryFile()
    tmpf.write("""\
version_header: 1

name: foo
val: 123

name: quoted
address: "Willowglen"
\t 42 Wallaby Way
\t Sydney

name: bar
val: 129319
""")
    tmpf.seek(0)
    s = read_stanza(tmpf)
    self.assertEquals(s, Stanza(version_header='1'))
    s = read_stanza(tmpf)
    self.assertEquals(s, Stanza(name="foo", val='123'))
    s = read_stanza(tmpf)
    self.assertEqualDiff(s.get('name'), 'quoted')
    self.assertEqualDiff(s.get('address'), ' "Willowglen"\n 42 Wallaby Way\n Sydney')
    s = read_stanza(tmpf)
    self.assertEquals(s, Stanza(name="bar", val='129319'))
    s = read_stanza(tmpf)
    self.assertEquals(s, None)   # end of file: no more stanzas
    self.check_rio_file(tmpf)

Author: Distrotech | Project: bzr | Lines of code: 30 | Source file: test_rio.py
Example 4: set_sff_trimpoints_with_sfftools

def set_sff_trimpoints_with_sfftools(
        sff_dir, technical_lengths, sffinfo_path='sffinfo',
        sfffile_path='sfffile', debug=False):
    """Set trimpoints to end of technical read for all SFF files in directory.

    This function essentially provides the reference implementation.
    It uses the official sfftools from Roche to process the SFF files.
    """
    if not (exists(sffinfo_path) or which(sffinfo_path)):
        raise ApplicationNotFoundError(
            'sffinfo executable not found. Is it installed and in your $PATH?')
    if not (exists(sfffile_path) or which(sfffile_path)):
        raise ApplicationNotFoundError(
            'sfffile executable not found. Is it installed and in your $PATH?')

    for lib_id, sff_fp in get_per_lib_sff_fps(sff_dir):
        try:
            readlength = technical_lengths[lib_id]
        except KeyError:
            continue

        # Capture sffinfo's stdout in a temp file, then parse read lengths.
        sffinfo_args = [sffinfo_path, '-s', sff_fp]
        if debug:
            print "Running sffinfo command %s" % sffinfo_args
        sffinfo_output_file = TemporaryFile()
        check_call(sffinfo_args, stdout=sffinfo_output_file)
        sffinfo_output_file.seek(0)

        seqlengths = {}
        for line in sffinfo_output_file:
            if line.startswith('>'):
                fields = line[1:].split()
                seq_len = fields[1].split('=')[1]
                seqlengths[fields[0]] = seq_len

        trim_fp = sff_fp + '.trim'
        trim_file = open(trim_fp, 'w')
        for id_, length in seqlengths.items():
            curr_length = int(seqlengths[id_])
            # Sfftools use 1-based index
            left_trim = readlength + 1
            # Key sequence not included in FASTA length
            right_trim = curr_length + 4
            if curr_length > left_trim:
                trim_file.write(
                    "%s\t%s\t%s\n" % (id_, left_trim, right_trim))
            else:
                stderr.write(
                    'Rejected read %s with trim points %s and %s (orig '
                    'length %s)' % (id_, left_trim, curr_length, length))
        trim_file.close()

        trimmed_sff_fp = sff_fp + '.trimmed'
        sfffile_args = [
            sfffile_path, '-t', trim_fp, '-o', trimmed_sff_fp, sff_fp]
        if debug:
            print "Running sfffile command:", sfffile_args
        check_call(sfffile_args, stdout=open(devnull, 'w'))
        remove(sff_fp)
        rename(trimmed_sff_fp, sff_fp)

Author: AhmedAbdelfattah | Project: qiime | Lines of code: 60 | Source file: trim_sff_primers.py
Example 5: test_get_xml_iter

def test_get_xml_iter():
    # 1 file object
    # 2 stream (file-like)
    # 3 string
    # 4 zipfile
    from openpyxl.reader.worksheet import _get_xml_iter
    from tempfile import TemporaryFile
    FUT = _get_xml_iter

    s = ""
    stream = FUT(s)
    assert isinstance(stream, BytesIO), type(stream)

    u = unicode(s)
    stream = FUT(u)
    assert isinstance(stream, BytesIO), type(stream)

    f = TemporaryFile(mode='rb+', prefix='openpyxl.', suffix='.unpack.temp')
    stream = FUT(f)
    assert isinstance(stream, tempfile), type(stream)
    f.close()

    from zipfile import ZipFile
    t = TemporaryFile()
    z = ZipFile(t, mode="w")
    z.writestr("test", "whatever")
    stream = FUT(z.open("test"))
    assert hasattr(stream, "read")
    z.close()

Author: ericgazoni | Project: openpyxl | Lines of code: 28 | Source file: test_read.py
Example 6: sort_diskbased

def sort_diskbased(stream, field, nsize=100000):
    buf = []
    files = []
    count = 0
    t = None

    def iter_on_file(f):
        # Replay the (key, row) pairs pickled into a spill file.
        try:
            while True:
                (key, v) = cPickle.load(f)
                yield (key, t._make(v))
        except EOFError:
            f.close()

    for elt in stream:
        if isinstance(elt, StreamHeader):
            t = elt.t
            yield elt
        elif isinstance(elt, StreamFooter):
            # Merge the sorted spill files with the remaining in-memory rows.
            buf.sort(key=lambda obj: getattr(obj, field))
            iterables = [iter_on_file(f) for f in files] + \
                [itertools.imap(lambda obj: (getattr(obj, field), obj), buf)]
            for (k, row) in heapq.merge(*iterables):
                yield row
            yield elt
        else:
            buf.append(elt)
            count = count + 1
            if count % nsize == 0:
                # Spill a sorted chunk of nsize rows to a temporary file.
                buf.sort(key=lambda obj: getattr(obj, field))
                f = TemporaryFile()
                for item in buf:
                    cPickle.dump((getattr(item, field), list(item)), f,
                                 cPickle.HIGHEST_PROTOCOL)
                f.flush()
                f.seek(0)   # rewind so the merge phase can read from the start
                files.append(f)
                del buf[:]

Author: MaNDRaXe | Project: PyBabe | Lines of code: 34 | Source file: mapreduce.py
Example 7: process_response

def process_response(self, response):
    # Parse the metadata zip file from the response
    zipstr = parseString(response.content).getElementsByTagName('zipFile')
    if zipstr:
        zipstr = zipstr[0].firstChild.nodeValue
    else:
        return self.packages

    # Decode the base64 payload into a seekable temp file for ZipFile.
    zipfp = TemporaryFile()
    zipfp.write(base64.b64decode(zipstr))
    zipfile = ZipFile(zipfp, 'r')

    packages = {}
    # Loop through all files in the zip, skipping anything other than
    # InstalledPackages
    for path in zipfile.namelist():
        if not path.endswith('.installedPackage'):
            continue
        namespace = path.split('/')[-1].split('.')[0]
        version = parseString(zipfile.open(path).read()).getElementsByTagName('versionNumber')
        if version:
            version = version[0].firstChild.nodeValue
        packages[namespace] = version

    self.packages = packages
    return self.packages

Author: Coca-ColaEnterprises | Project: mrbelvedere | Lines of code: 26 | Source file: mdapi.py
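Examples 2 and 7 are two halves of the same pattern: ZipFile needs a seekable file object, and TemporaryFile provides one without holding the whole archive in memory. Below is a condensed sketch of the decoding half, with names of our own choosing rather than the project's:

import base64
from tempfile import TemporaryFile
from zipfile import ZipFile

def zip_from_base64(b64_payload):
    # Hypothetical helper (not part of mrbelvedere): decode a base64 string
    # into a TemporaryFile and open it as a zip archive.
    buf = TemporaryFile()
    buf.write(base64.b64decode(b64_payload))
    buf.seek(0)            # ZipFile must be able to seek within the archive
    return ZipFile(buf, 'r')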
Example 8: main

def main(argv):
    args = docopt(__doc__, argv=argv)

    headers = get_args_dict(args['--header'])
    if args['--size-hint']:
        headers['x-archive-size-hint'] = args['--size-hint']

    # Upload keyword arguments.
    upload_kwargs = dict(
        metadata=get_args_dict(args['--metadata']),
        headers=headers,
        debug=args['--debug'],
        queue_derive=True if args['--no-derive'] is False else False,
        ignore_preexisting_bucket=args['--ignore-bucket'],
        checksum=args['--checksum'],
        verbose=True if args['--quiet'] is False else False,
        retries=int(args['--retries']) if args['--retries'] else 0,
        retries_sleep=int(args['--sleep']),
        delete=args['--delete'],
    )

    if args['<file>'] == ['-'] and not args['-']:
        sys.stderr.write('--remote-name is required when uploading from stdin.\n')
        call(['ia', 'upload', '--help'])
        sys.exit(1)

    # Upload from stdin: buffer it in a TemporaryFile so it can be seeked.
    if args['-']:
        local_file = TemporaryFile()
        local_file.write(sys.stdin.read())
        local_file.seek(0)
        _upload_files(args, args['<identifier>'], local_file, upload_kwargs)

    # Bulk upload using spreadsheet.
    elif args['--spreadsheet']:
        # Use the same session for each upload request.
        session = ArchiveSession()
        spreadsheet = csv.DictReader(open(args['--spreadsheet'], 'rU'))
        prev_identifier = None
        for row in spreadsheet:
            local_file = row['file']
            identifier = row['identifier']
            del row['file']
            del row['identifier']
            if (not identifier) and (prev_identifier):
                identifier = prev_identifier
            # TODO: Clean up how indexed metadata items are coerced
            # into metadata.
            md_args = ['{0}:{1}'.format(k.lower(), v) for (k, v) in row.items() if v]
            metadata = get_args_dict(md_args)
            upload_kwargs['metadata'].update(metadata)
            _upload_files(args, identifier, local_file, upload_kwargs,
                          prev_identifier, session)
            prev_identifier = identifier

    # Upload files.
    else:
        local_file = args['<file>']
        _upload_files(args, args['<identifier>'], local_file, upload_kwargs)

Author: digikeri | Project: internetarchive | Lines of code: 60 | Source file: ia_upload.py
Example 9: run_cmd

def run_cmd(options, jenkins):
    """Run the jshint command using options.

    Run the jshint command using options and return the output.

    :param options: Options received by the code_analysis_jshint function.
    :param jenkins: It is true when the jenkins output is turned on.
    """
    # cmd is a sequence of program arguments
    # first argument is child program
    paths = options['directory'].split('\n')
    cmd = [
        options['jshint-bin'],
        '--verbose',
        '--exclude={0}'.format(options['jshint-exclude'] or ' ')] + paths
    try:
        if jenkins:
            cmd.append('--reporter=jslint')
            output_file_name = os.path.join(options['location'], 'jshint.xml')
            output_file = open(output_file_name, 'w+')
        else:
            output_file = TemporaryFile('w+')

        # Wrapper to subprocess.Popen
        try:
            # Return code is not used for jshint.
            output = read_subprocess_output(cmd, output_file)[0]
            return output
        except OSError:
            log('skip')
            message = 'Command: {0}. Outputfile: {1}'.format(cmd, output_file)
            raise CmdError(message)
    finally:
        output_file.close()

Author: lewicki | Project: plone.recipe.codeanalysis | Lines of code: 35 | Source file: jshint.py
Example 10: read_file

def read_file(self, data):
    temp_file = TemporaryFile(mode="w+b")

    if "content-length" in self.current_headers:
        temp_file.write(data.read(self.current_headers["content-length"]))
    else:
        # No Content-Length header: read line by line until the part's
        # terminating CRLF.
        bytes = data.readline()
        while not bytes[-2:] == "\r\n":
            temp_file.write(bytes)
            bytes = data.readline()
        temp_file.write(bytes.rstrip())

    filesize = temp_file.tell()
    if filesize == 0:
        self.read_boundry(data)
        return

    key = self.current_headers["content-disposition"]["name"]
    filename = self.current_headers["content-disposition"].get("filename", "")
    content_type = self.current_headers["content-type"]

    if key not in self.files:
        self.files[key] = []

    temp_file.seek(0)
    self.files[key].append({"filename": filename, "filesize": filesize,
                            "content-type": content_type, "data": temp_file})
    self.read_boundry(data)

Author: aventurella | Project: crazy-horse | Lines of code: 31 | Source file: multipart.py
Example 11: build

def build(self):
    data = []
    datapath = self.home.joinpath('data.xml')
    dom = minidom.parse(datapath.absolute().as_posix())
    index = 0
    for page in dom.getElementsByTagName('page'):
        page_data = self.parse_page(page)
        page_data['page.index'] = index
        data.append(page_data)
        index += 1

    # Render the page data as a JS loader snippet ({{ and }} are literal
    # braces; {} receives the JSON payload).
    data_loader = """
    (function initData(w){{
        w.Sectioner = new Object();
        w.Sectioner.pages = {};
        Object.freeze(w.Sectioner.pages);
    }})(window);
    """.format(json.dumps(data, indent=2)).encode('UTF-8')

    data_js = TemporaryFile()
    data_js.write(data_loader)
    self.compiler.add_file(data_js, 'data.js')

    for asset in dom.getElementsByTagName('asset'):
        self.parse_asset(asset)
    return data

Author: atelier-cartographique | Project: static-sectioner | Lines of code: 27 | Source file: info.py
Example 12: string_to_numpy

def string_to_numpy(string):
    """Convert human-readable string into numpy array.

    Note:
        loads as floats even if stored as ints.

        human-readable string example:
            1 2 3
            4 5 6
        is a string for the following array:
            [[1,2,3]
             [4,5,6]]

    Args:
        string (string): human-readable string to convert to numpy array

    Returns:
        numpy array
    """
    f = TemporaryFile()
    f.write(string)
    f.seek(0)
    array = np.loadtxt(f)
    return array

Author: mqtlam | Project: caffe-tools | Lines of code: 25 | Source file: numpyserializer.py
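Example 12 is Python 2 code: in Python 3, TemporaryFile() opens in binary mode by default, so f.write(string) with a str would raise TypeError. A minimal Python 3 adaptation of the same idea (our sketch, not the project's code):

import numpy as np
from tempfile import TemporaryFile

with TemporaryFile(mode='w+') as f:    # text mode, so str can be written
    f.write("1 2 3\n4 5 6")
    f.seek(0)
    array = np.loadtxt(f)              # array([[1., 2., 3.], [4., 5., 6.]])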
Example 13: generate_pdf_ticket

def generate_pdf_ticket(registration=None, context=None, encoding='utf-8'):
    import ho.pisa as pisa
    import cStringIO as StringIO
    from django.utils.six import BytesIO

    if not registration and not context:
        raise Http404(_("Invalid arguments"))
    if not context:
        d = ConfirmationEmailView.get_extra_context(registration)
        context = Context(d)
    template = loader.get_template('registration/ticket.html')
    html = template.render(context)

    if not registration:
        registration = context['r']

    # Render the HTML ticket to PDF bytes with pisa.
    result = StringIO.StringIO()
    pdf = pisa.pisaDocument(StringIO.StringIO(html.encode("ISO-8859-1")), result)
    result = result.getvalue()

    try:
        file = TemporaryFile()
        file.write(result)
        registration.ticket_file = File(file)
        registration.save()
        file.close()
    except Exception, e:
        charge = registration.charge
        if charge:
            charge.save_server_message(
                ['Failed while saving ticket file'], exception=e)

Author: eleyine | Project: QFMS | Lines of code: 32 | Source file: email.py
Example 14: write_lines

def write_lines(self, key, lines):
    self._verify_key_format(key)
    storage = self.bucket.new_key(key + ".json.gz")

    # Gzip the lines into a TemporaryFile before pushing to S3.
    buff = TemporaryFile()
    archive = gzip.GzipFile(fileobj=buff, mode='w')
    count = 0
    for l in lines:
        if hasattr(l, "__iter__"):
            for ll in l:
                archive.write(ll.encode("utf8"))
                archive.write(b"\n")
                count += 1
        else:
            archive.write(l.encode("utf8"))
            archive.write(b"\n")
            count += 1
    archive.close()
    file_length = buff.tell()

    retry = 3
    while retry:
        try:
            with Timer("Sending {{count}} lines in {{file_length|comma}} bytes",
                       {"file_length": file_length, "count": count},
                       debug=self.settings.debug):
                buff.seek(0)
                storage.set_contents_from_file(buff)
            break
        except Exception, e:
            Log.warning("could not push data to s3", cause=e)
            retry -= 1

Author: klahnakoski | Project: MoTreeherder | Lines of code: 30 | Source file: s3.py
Example 15: run_reduce

def run_reduce(self):
    self.stopped_received = 0
    self.merged_files = []
    merged_iterator = None
    while True:
        # Iterate and merge files until all jobs are processed
        get_next = self.get_next_file()
        files = get_next
        # itertools.islice(get_next, self.reduce_max_files)
        all_files = [file for file in files]
        iterables = [self.iter_on_file(file) for file in all_files]
        merged_iterator = heapq.merge(*iterables)
        if self.stopped_received < self.numprocs:
            if self.debug:
                debug_print("Performing intermediate merge on %u files" %
                            len(iterables))
            # Spill the intermediate merge to a temp file for a later pass.
            f = TemporaryFile()
            self.merged_files.append(f)
            for m in merged_iterator:
                cPickle.dump(m, f, cPickle.HIGHEST_PROTOCOL)
            f.flush()
            f.seek(0)
        else:
            break

    if len(self.merged_files) > 0:
        if self.debug:
            debug_print("Final merge")
        # Final merge if required
        merged_iterator = heapq.merge(
            *([self.iter_on_file(stream) for stream in self.merged_files] +
              [merged_iterator]))

    if self.debug:
        debug_print("Reduce loop")
    result = self.reduce_loop(merged_iterator)
    return result

Author: fdouetteau | Project: PyMapReduce | Lines of code: 34 | Source file: __init__.py
Example 16: test_one_key_per_block_writer

def test_one_key_per_block_writer(self):
    # Two pointers plus a null-terminated 1-byte string = 10 bytes per block.
    stream = TemporaryFile()
    i = IndexWriter(stream, block_size=10, terminator='\0')
    i.add(0, 'b')
    eq_(len(i.indexes), 1)
    i.add(0, 'c')
    eq_(len(i.indexes), 2)
    i.finish()
    stream.seek(0)
    packet = stream.read()
    eq_(len(packet), 30)
    root_block = packet[:10]
    eq_(root_block, '\x01\x00\x00\x00c\x00\x02\x00\x00\x00')
    block_1 = packet[10:20]
    eq_(block_1, '\x03\x00\x00\x00b\x00\x04\x00\x00\x00')
    block_2 = packet[20:]
    eq_(block_2, '\x04\x00\x00\x00c\x00\x05\x00\x00\x00')

Author: Mondego | Project: pyreco | Lines of code: 26 | Source file: allPythonContent.py
Example 17: convert_hwp5file_into_odtpkg

def convert_hwp5file_into_odtpkg(hwp5file):
    from tempfile import TemporaryFile
    tmpfile = TemporaryFile()

    import os
    # Duplicate the descriptor: write the zip through one handle and read
    # the result back through the other.
    tmpfile2 = os.fdopen(os.dup(tmpfile.fileno()), 'r')

    from zipfile import ZipFile
    zf = ZipFile(tmpfile, 'w')
    from hwp5.hwp5odt import ODTPackage
    odtpkg = ODTPackage(zf)
    try:
        from hwp5.hwp5odt import Converter
        import hwp5.plat

        if haveXSLTTransformer():
            xslt = xslt_with_libreoffice
        else:
            # we use default xslt
            xslt = hwp5.plat.get_xslt()

        # convert without RelaxNG validation
        convert = Converter(xslt)

        # Embed images: see #32 - https://github.com/mete0r/pyhwp/issues/32
        convert(hwp5file, odtpkg, embedimage=True)
    finally:
        odtpkg.close()

    tmpfile2.seek(0)
    odtpkg_stream = InputStreamFromFileLike(tmpfile2)
    odtpkg_storage = StorageFromInputStream(odtpkg_stream)
    return odtpkg_storage

Author: hanul93 | Project: pyhwp | Lines of code: 32 | Source file: __init__.py
Example 18: dsorted

def dsorted(iterable, buffer_size=1e6, tempdir="."):
    from disco.compat import pickle_load, pickle_dump
    from heapq import merge
    from itertools import islice
    from tempfile import TemporaryFile

    def read(handle):
        # Replay all pickled items from a spill file.
        while True:
            try:
                yield pickle_load(handle)
            except EOFError:
                return

    iterator = iter(iterable)
    subiters = []
    while True:
        # islice requires an integer bound (buffer_size defaults to the
        # float 1e6, so cast it).
        buffer = sorted(islice(iterator, int(buffer_size)))
        handle = TemporaryFile(dir=tempdir)
        for item in buffer:
            pickle_dump(item, handle, -1)
        handle.seek(0)
        subiters.append(read(handle))
        if len(buffer) < buffer_size:
            break
    return merge(*subiters)

Author: spilgames | Project: disco | Lines of code: 25 | Source file: util.py
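dsorted is a compact external merge sort: it sorts buffer_size-sized chunks in memory, pickles each chunk to a TemporaryFile, and lazily heap-merges the chunk streams. A hypothetical usage sketch (ours, not from the disco project; it assumes disco is installed so the imports above resolve):

# Sort ten million pseudo-random ints while holding ~100k in memory at once.
values = (x * 2654435761 % 2**32 for x in range(10**7))
for v in dsorted(values, buffer_size=100000, tempdir="/tmp"):
    pass  # values arrive in ascending order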
Example 19: _open

def _open(self):
    tmp = TemporaryFile()
    # Stream the download in 256 KiB chunks instead of buffering it in RAM.
    resp = requests.get(self.metadata['url'], stream=True)
    for chunk in resp.iter_content(256 * 1024):
        tmp.write(chunk)
    tmp.seek(0)
    return tmp

Author: mgax | Project: hoover | Lines of code: 7 | Source file: collectible.py
Example 20: backup_dir

def backup_dir(key, data_node, directory):
    temp = TemporaryFile()
    # tar the remote directory over ssh, compress with lzma into the temp
    # file, then upload the spooled archive to S3.
    archiver = Popen(["ssh", data_node, "tar", "c", directory], stdout=PIPE)
    compressor = Popen(["lzma", "-z", "-9"], stdin=archiver.stdout, stdout=temp)
    compressor.wait()
    temp.seek(0)
    key.set_contents_from_file(temp)

Author: foomango | Project: myfalcon | Lines of code: 7 | Source file: new_backup.py
Note: the tempfile.TemporaryFile class examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as GitHub and MSDocs. The snippets are drawn from open-source projects contributed by their respective developers; copyright remains with the original authors, and any use or redistribution should follow the corresponding project's license. Do not republish without permission.