本文整理汇总了Python中tempfile.SpooledTemporaryFile类的典型用法代码示例。如果您正苦于以下问题:Python SpooledTemporaryFile类的具体用法?Python SpooledTemporaryFile怎么用?Python SpooledTemporaryFile使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SpooledTemporaryFile类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: string2spool
def string2spool(input_string):
"""Takes a string as an argument and returns an open file handle with the
contents of the string"""
file_object=SpooledTemporaryFile()
file_object.write(input_string)
file_object.seek(0)
return file_object
开发者ID:dmf24,项目名称:pyscripting,代码行数:7,代码来源:files.py
示例2: gen_data
def gen_data(location=None, **kwargs):
"""Fetches realtime data and generates records"""
url = '%s/%s' % (kwargs['BASE_URL'], location)
r = requests.get(url)
f = SpooledTemporaryFile() # wrap to access `fileno`
f.write(r.content)
return io.read_xls(r., sanitize=True, encoding=r.encoding)
开发者ID:reubano,项目名称:cookiecutter-collector,代码行数:7,代码来源:utils.py
示例3: __init__
def __init__(self, data=None, fp=None, length=-1):
assert bool(data is not None) ^ bool(fp)
if length == -1:
if data is not None:
length = len(data)
else:
length = get_size(fp) # can be -1
# We allow writer reuse, but if we're working with a stream, we cannot
# seek. Copy the data to a tempfile.
if fp and not can_seek(fp):
newfp = SpooledTemporaryFile(MAX_INMEMORY_SIZE)
sendfile(newfp, fp)
length = newfp.tell()
newfp.seek(0)
fp = newfp
self.data = data
self.fp = fp
self.fpreads = 0 # keep track of fp usage
self.length = length
assert length >= 0
self.use_tempfile = length > MAX_INMEMORY_SIZE
开发者ID:rmoorman,项目名称:pstore,代码行数:25,代码来源:crypt.py
示例4: fetch_data
def fetch_data(config):
"""Fetches realtime data and generates records"""
ckan = CKAN(config['ENDPOINT'], apikey=config['API_KEY'])
# r = ckan.fetch_resource(config['RID']) # if using ckanutils
resource = ckan.action.resource_show(id=config['RID'])
url = resource.get('perma_link') or resource.get('url')
r = requests.get(url, stream=True)
if any('403' in h.headers.get('x-ckan-error', '') for h in r.history):
raise NotAuthorized(
'Access to fetch resource %s was denied.' % config['RID'])
try:
ext = splitext(url)[1].split('.')[1]
except IndexError:
ext = cv.ctype2ext(r.headers['Content-Type'])
if ext == 'csv':
records = io.read_csv(r.raw, sanitize=True, encoding=r.encoding)
elif ext in {'xls', 'xlsx'}:
r = requests.get(url)
f = SpooledTemporaryFile()
f.write(r.content)
records = io.read_xls(f, sanitize=True, encoding=r.encoding)
else:
msg = 'Filetype `%s` unsupported.'
msg += 'Please view tabutils.io documentation for assistance.'
raise TypeError(msg)
constraints = [('adm0_name', 'a'), ('mp_month', '3'), ('mp_year', '2015')]
filterer = lambda x: all(x[k].lower().startswith(v) for k, v in constraints)
return it.ifilter(filterer, records)
开发者ID:nelsonkimaiga,项目名称:hdx-file-proxy,代码行数:33,代码来源:utils.py
示例5: send
def send(self, request, stream=None, timeout=None, verify=None, cert=None, proxies=None):
pathname = url_to_path(request.url)
resp = Response()
resp.status_code = 200
resp.url = request.url
try:
stats = lstat(pathname)
except (IOError, OSError) as exc:
resp.status_code = 404
message = {
"error": "file does not exist",
"path": pathname,
"exception": repr(exc),
}
fh = SpooledTemporaryFile()
fh.write(ensure_binary(json.dumps(message)))
fh.seek(0)
resp.raw = fh
resp.close = resp.raw.close
else:
modified = formatdate(stats.st_mtime, usegmt=True)
content_type = guess_type(pathname)[0] or "text/plain"
resp.headers = CaseInsensitiveDict({
"Content-Type": content_type,
"Content-Length": stats.st_size,
"Last-Modified": modified,
})
resp.raw = open(pathname, "rb")
resp.close = resp.raw.close
return resp
开发者ID:ESSS,项目名称:conda,代码行数:33,代码来源:localfs.py
示例6: create_dump
def create_dump(self):
if not self.connection.is_usable():
self.connection.connect()
dump_file = SpooledTemporaryFile(max_size=10 * 1024 * 1024)
self._write_dump(dump_file)
dump_file.seek(0)
return dump_file
开发者ID:Fisiu,项目名称:django-dbbackup,代码行数:7,代码来源:sqlite.py
示例7: push_index
def push_index(self):
stream = SpooledTemporaryFile(max_size=20 * MB)
pointers = 0
stream.write(struct.pack(OFFSET_FMT, pointers))
self.indexes.append([
stream, pointers, self.block_size - self.pointer_size
])
开发者ID:aparup,项目名称:common_crawl_index,代码行数:7,代码来源:pbtree.py
示例8: close
def close(self):
"""Send the change to the DFS, and close the file."""
self.flush()
if 'c' not in self.mode:
SpooledTemporaryFile.close(self)
开发者ID:Alexis-D,项目名称:DFS,代码行数:7,代码来源:client.py
示例9: run
def run(self, opts):
from lzma.xz import compress
self.h = sha1()
tdir = mkdtemp("calibre-mathjax-build")
try:
src = opts.path_to_mathjax or self.download_mathjax_release(tdir, opts.mathjax_url)
self.info("Compressing MathJax...")
t = SpooledTemporaryFile()
with ZipFile(t, "w", ZIP_STORED) as zf:
self.add_file(zf, self.j(src, "unpacked", "MathJax.js"), "MathJax.js")
self.add_tree(
zf,
self.j(src, "fonts", "HTML-CSS", self.FONT_FAMILY, "woff"),
"fonts/HTML-CSS/%s/woff" % self.FONT_FAMILY,
)
for d in "extensions jax/element jax/input jax/output/CommonHTML".split():
self.add_tree(zf, self.j(src, "unpacked", *d.split("/")), d)
zf.comment = self.h.hexdigest()
t.seek(0)
with open(self.j(self.RESOURCES, "content-server", "mathjax.zip.xz"), "wb") as f:
compress(t, f, level=9)
with open(self.j(self.RESOURCES, "content-server", "mathjax.version"), "wb") as f:
f.write(zf.comment)
finally:
shutil.rmtree(tdir)
开发者ID:timpalpant,项目名称:calibre,代码行数:27,代码来源:mathjax.py
示例10: GoogleCloudFile
class GoogleCloudFile(File):
def __init__(self, name, mode, storage):
self.name = name
self.mime_type = mimetypes.guess_type(name)[0]
self._mode = mode
self._storage = storage
# NOTE(mattrobenolt): This is the same change in behavior as in
# the s3 backend. We're opting now to load the file
# or metadata at this step. This means we won't actually
# know a file doesn't exist until we try to read it.
self.blob = FancyBlob(storage.download_url, self.name, storage.bucket)
self._file = None
self._is_dirty = False
@property
def size(self):
return self.blob.size
def _get_file(self):
if self._file is None:
with metrics.timer('filestore.read', instance='gcs'):
self._file = SpooledTemporaryFile(
max_size=self._storage.max_memory_size,
suffix=".GSStorageFile",
dir=None,
)
if 'r' in self._mode:
self._is_dirty = False
self.blob.download_to_file(self._file)
self._file.seek(0)
return self._file
def _set_file(self, value):
self._file = value
file = property(_get_file, _set_file)
def read(self, num_bytes=None):
if 'r' not in self._mode:
raise AttributeError("File was not opened in read mode.")
if num_bytes is None:
num_bytes = -1
return super(GoogleCloudFile, self).read(num_bytes)
def write(self, content):
if 'w' not in self._mode:
raise AttributeError("File was not opened in write mode.")
self._is_dirty = True
return super(GoogleCloudFile, self).write(force_bytes(content))
def close(self):
if self._file is not None:
if self._is_dirty:
self.file.seek(0)
self.blob.upload_from_file(self.file, content_type=self.mime_type)
self._file.close()
self._file = None
开发者ID:alexandrul,项目名称:sentry,代码行数:59,代码来源:gcs.py
示例11: start
def start(self, args):
self.outFile = SpooledTemporaryFile()
self.errFile = SpooledTemporaryFile()
self.cmdline = list2cmdline(args)
print 'starting: ' + self.cmdline
self.process = Popen(args,
stderr=self.errFile, stdout=self.outFile, universal_newlines=False)
self.process_start = time()
开发者ID:GRay63,项目名称:TestAppGUI,代码行数:8,代码来源:startfinish.py
示例12: _open
def _open(self, name, mode = 'rb') -> File:
name = self._transform_name(name)
content = self.service.get_blob_content(self.container, name)
file = SpooledTemporaryFile()
file.write(content)
file.seek(0) # explicitly reset to allow reading from the beginning afterwards as-is
return File(file)
开发者ID:Bunkerbewohner,项目名称:azurepython3,代码行数:8,代码来源:djangostorage.py
示例13: generate
def generate(self):
points = self.points()
self.buffer = 2*self.pad
count = np.zeros([x + 2*self.buffer for x in self.expanded_size])
density = np.zeros([x + 2*self.buffer for x in self.expanded_size])
# Render the B&W density version of the heatmap
dot_size = self.dot.shape[0]
for x, y, weight in points:
x1 = x + self.buffer - (dot_size - 1)/2
y1 = y + self.buffer - (dot_size - 1)/2
count[y1:(y1 + dot_size),
x1:(x1 + dot_size)] += self.dot
density[y1:(y1 + dot_size),
x1:(x1+ dot_size)] += self.dot*float(weight)
# Pick the field to map
if gheat_settings.GHEAT_MAP_MODE == gheat_settings.GHEAT_MAP_MODE_COUNT:
img = count
#opacity = np.zeros(img.shape()) + 255
elif gheat_settings.GHEAT_MAP_MODE == gheat_settings.GHEAT_MAP_MODE_SUM_DENSITY:
img = density
#opacity = np.clip(count, 0, gheat_settings.GHEAT_OPACITY_LIMIT)
elif gheat_settings.GHEAT_MAP_MODE == gheat_settings.GHEAT_MAP_MODE_MEAN_DENSITY:
img = density
img[count > 0] /= count[count > 0]
#opacity = np.clip(count, 0, gheat_settings.GHEAT_OPACITY_LIMIT)
else:
raise ValueError, 'Unknown map mode'
# Crop resulting density image (which could have grown) into the
# actual canvas size we want
img = img[(self.pad + self.buffer):(SIZE + self.pad + self.buffer),
(self.pad + self.buffer):(SIZE + self.pad + self.buffer)]
#opacity = opacity[self.pad:(SIZE + self.pad), self.pad:(SIZE + self.pad)]
# Maybe use a logarithm
img = np.where(img>0, np.log(img)+1, img)
# Convert to a 0 to 255 image
img = np.clip(256.0*np.power(img/gheat_settings.GHEAT_MAX_VALUE,
gheat_settings.GHEAT_SCALING_COEFFICIENT), 0, 255.999).astype('uint8')
# Given the B&W density image, generate a color heatmap based on
# this Tile's color scheme.
colour_image = np.zeros((SIZE, SIZE, 4), 'uint8') + 255
for i in range(3):
colour_image[:,:,i] = self.schemeobj.colors[:,i][255 - img]
colour_image[:,:,3] = np.where(img > gheat_settings.GHEAT_MIN_DENSITY, 255, 0)
tmpfile = SpooledTemporaryFile()
writer = png.Writer(SIZE, SIZE, alpha=True, bitdepth=8)
writer.write(tmpfile, np.reshape(colour_image, (SIZE, SIZE*4)))
tmpfile.seek(0)
return tmpfile
开发者ID:dragonfly-science,项目名称:django-gheat,代码行数:58,代码来源:numeric.py
示例14: test_run_command_stdin
def test_run_command_stdin(self):
connector = BaseCommandDBConnector()
stdin = SpooledTemporaryFile()
stdin.write(b'foo')
stdin.seek(0)
# Run
stdout, stderr = connector.run_command('cat', stdin=stdin)
self.assertEqual(stdout.read(), b'foo')
self.assertFalse(stderr.read())
开发者ID:pombredanne,项目名称:django-dbbackup,代码行数:9,代码来源:test_connectors.py
示例15: file_from_content
def file_from_content(content):
f = content
if isinstance(content, cgi.FieldStorage):
f = content.file
elif isinstance(content, byte_string):
f = SpooledTemporaryFile(INMEMORY_FILESIZE)
f.write(content)
f.seek(0)
return f
开发者ID:dirn,项目名称:depot,代码行数:9,代码来源:utils.py
示例16: GoogleCloudFile
class GoogleCloudFile(File):
def __init__(self, name, mode, storage):
self.name = name
self.mime_type = mimetypes.guess_type(name)[0]
self._mode = mode
self._storage = storage
self.blob = storage.bucket.get_blob(name)
if not self.blob and 'w' in mode:
self.blob = Blob(self.name, storage.bucket)
self._file = None
self._is_dirty = False
@property
def size(self):
return self.blob.size
def _get_file(self):
if self._file is None:
self._file = SpooledTemporaryFile(
max_size=self._storage.max_memory_size,
suffix=".GSStorageFile",
dir=setting("FILE_UPLOAD_TEMP_DIR", None)
)
if 'r' in self._mode:
self._is_dirty = False
self.blob.download_to_file(self._file)
self._file.seek(0)
return self._file
def _set_file(self, value):
self._file = value
file = property(_get_file, _set_file)
def read(self, num_bytes=None):
if 'r' not in self._mode:
raise AttributeError("File was not opened in read mode.")
if num_bytes is None:
num_bytes = -1
return super(GoogleCloudFile, self).read(num_bytes)
def write(self, content):
if 'w' not in self._mode:
raise AttributeError("File was not opened in write mode.")
self._is_dirty = True
return super(GoogleCloudFile, self).write(force_bytes(content))
def close(self):
if self._file is not None:
if self._is_dirty:
self.file.seek(0)
self.blob.upload_from_file(self.file, content_type=self.mime_type)
self._file.close()
self._file = None
开发者ID:cogzidel,项目名称:healthchecks,代码行数:56,代码来源:gcloud.py
示例17: close
def close(self):
"""On close, seek to 0 and write the data via the API, then close()
for realz
"""
logger.debug("close() called on WriteBuffer")
self.seek(0)
logger.debug("Attempting to create file at dir_path %s with name %s" %
(self.path, self.filename))
self.fs.rc.fs.write_file(self, self.fullpath)
SpooledTemporaryFile.close(self) # old-style class!
开发者ID:Qumulo,项目名称:qftpd,代码行数:10,代码来源:qftpd.py
示例18: read_file_handle
def read_file_handle(self, filename):
"""Get the file data, put it in a SpooledTemporaryFile object for return
and reading
"""
logger.debug("read_file_handle('%s')" % filename)
read_buffer = SpooledTemporaryFile()
response = self.rc.fs.read_file(read_buffer, filename)
logger.debug(response)
read_buffer.seek(0)
return read_buffer
开发者ID:Qumulo,项目名称:qftpd,代码行数:10,代码来源:qftpd.py
示例19: filter_file
def filter_file(filter, filename, membuffer=10485760):
tmp = SpooledTemporaryFile(max_size=membuffer)
with open(filename) as input:
for line in input:
if filter(line):
tmp.write(line)
tmp.seek(0)
with open(filename, "w") as output:
for line in tmp:
output.write(line)
开发者ID:StackOps,项目名称:stackops-agent,代码行数:10,代码来源:utils.py
示例20: run_command
def run_command(self, command, stdin=None, env=None):
"""
Launch a shell command line.
:param command: Command line to launch
:type command: str
:param stdin: Standard input of command
:type stdin: file
:param env: Environment variable used in command
:type env: dict
:return: Standard output of command
:rtype: file
"""
cmd = shlex.split(command)
stdout = SpooledTemporaryFile(max_size=settings.TMP_FILE_MAX_SIZE,
dir=settings.TMP_DIR)
stderr = SpooledTemporaryFile(max_size=settings.TMP_FILE_MAX_SIZE,
dir=settings.TMP_DIR)
full_env = self.env.copy()
full_env.update(env or {})
try:
process = Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr,
env=full_env)
process.wait()
if process.poll():
stderr.seek(0)
raise exceptions.CommandConnectorError(
"Error running: {}\n{}".format(command, stderr.read()))
stdout.seek(0)
stderr.seek(0)
return stdout, stderr
except OSError as err:
raise exceptions.CommandConnectorError(
"Error running: {}\n{}".format(command, str(err)))
开发者ID:Ubiwhere,项目名称:django-dbbackup,代码行数:34,代码来源:base.py
注:本文中的tempfile.SpooledTemporaryFile类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论