本文整理汇总了 Python 中 posix.fdopen 函数的典型用法代码示例。如果您正苦于以下问题：Python fdopen 函数的具体用法？Python fdopen 怎么用？Python fdopen 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 fdopen 函数的 11 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_parallel_writes_and_reads
def test_parallel_writes_and_reads(self):
    """Exercise concurrent readers and writers on the two ends of one pipe.

    Three reader threads and three writer threads share a single pipe whose
    ends are wrapped with the file type's ``fdopen``.  The test first checks
    that no reader has finished while the writers are still running, then
    stops the writers and waits for all three readers to see an "X".
    """
    # Warning: a test like the one below deadlocks CPython
    # http://bugs.python.org/issue1164
    # It also deadlocks on py.py because the space GIL is not
    # released.
    import thread, sys, os
    try:
        fdopen = self.file.fdopen
    except AttributeError:
        # when running with -A
        skip("deadlocks on top of CPython")
    read_fd, write_fd = os.pipe()
    fread = fdopen(read_fd, "rb", 200)
    fwrite = fdopen(write_fd, "wb", 200)
    # Flag polled by the writer threads; rebound to False below to stop them.
    run = True
    # One-element list so the nested reader() can update the shared counter.
    readers_done = [0]
    def writer():
        # Keep writing a changing float sequence until `run` becomes false,
        # then emit a single "X" that lets one reader terminate.
        f = 0.1
        while run:
            print >> fwrite, f,
            f = 4 * f - 3 * f * f
        print >> fwrite, "X"
        fwrite.flush()
        sys.stdout.write("writer ends\n")
    def reader(j):
        # Consume the pipe one byte at a time until an "X" is seen.
        while True:
            data = fread.read(1)
            # sys.stdout.write('%d%r ' % (j, data))
            if data == "X":
                break
        sys.stdout.write("reader ends\n")
        readers_done[0] += 1
    # Start one reader and one writer per iteration: each reader needs its
    # own "X", and every writer emits exactly one.
    for j in range(3):
        thread.start_new_thread(reader, (j,))
        thread.start_new_thread(writer, ())
    import time
    # Let the threads run for roughly five seconds.
    t = time.time() + 5
    print "start of test"
    while time.time() < t:
        time.sleep(1)
    print "end of test"
    # No "X" has been written yet, so no reader may have finished.
    assert readers_done[0] == 0
    run = False # end the writers
    # Poll up to 600 times, 0.4 s apart, for all three readers to finish.
    for i in range(600):
        time.sleep(0.4)
        sys.stdout.flush()
        x = readers_done[0]
        if x == 3:
            break
        print "readers_done == %d, still waiting..." % (x,)
    else:
        # The loop ran to completion without all readers finishing.
        raise Exception("time out")
    print "Passed."
开发者ID:cimarieta,项目名称:usp,代码行数:60,代码来源:test_file.py
示例2: dup
def dup(self):
    """Return a new file object over a duplicate of this file's descriptor.

    The descriptor of ``self._file_`` is duplicated with ``posix.dup`` and
    wrapped by ``posix.fdopen`` using the mode of ``self._file_``.

    Raises:
        AttributeError: if the ``posix`` module has no ``fdopen``.
    """
    import posix
    if not hasattr(posix, 'fdopen'):
        # Call form of ``raise``: parses on Python 2 and Python 3 alike.
        raise AttributeError('dup() method unavailable')
    new_fd = posix.dup(self._file_.fileno())
    try:
        return posix.fdopen(new_fd, self._file_.mode)
    except Exception:
        # Wrapping failed, so nothing owns the duplicate descriptor yet;
        # close it here instead of leaking it.
        posix.close(new_fd)
        raise
开发者ID:axray,项目名称:dataware.dreamplug,代码行数:7,代码来源:posixfile.py
示例3: dup
def dup(self):
    """Return a new file object over a duplicate of this file's descriptor.

    The descriptor of ``self._file_`` is duplicated with ``posix.dup`` and
    wrapped by ``posix.fdopen`` using the mode of ``self._file_``.

    Raises:
        AttributeError: if the ``posix`` module has no ``fdopen``.
    """
    import posix
    try:
        fdopen = posix.fdopen
    except AttributeError:
        # Only a missing attribute means "unavailable"; any other error
        # must propagate rather than be masked by a bare ``except``.
        raise AttributeError('dup() method unavailable')
    return fdopen(posix.dup(self._file_.fileno()), self._file_.mode)
开发者ID:asottile,项目名称:ancient-pythons,代码行数:7,代码来源:posixfile.py
示例4: test_ignore_ioerror_in_readall_if_nonempty_result
def test_ignore_ioerror_in_readall_if_nonempty_result(self):
    """Check read() on one openpty() descriptor after the other was closed.

    The expected outcome matches regular files in CPython 2.7 and
    _io.FileIO in at least CPython 3.3.  It is *not* what _io.FileIO does
    in CPython 3.4 or 3.5; see CPython's issue #21090.
    """
    import sys
    try:
        from posix import openpty, fdopen, write, close
    except ImportError:
        skip('no openpty on this platform')
    platform_name = sys.platform
    if 'gnukfreebsd' in platform_name:
        skip('close() hangs forever on kFreeBSD')
    read_fd, write_fd = openpty()
    write(write_fd, 'Abc\n')
    close(write_fd)
    f = fdopen(read_fd)
    # Linux: the first read() returns 'Abc\r\n' and the following one
    # raises IOError.  OS/X (Python 2.7.5): the close() above discarded
    # the buffer, so every read() returns ''.
    on_linux = platform_name.startswith('linux')
    if on_linux:
        first_chunk = f.read()
        assert first_chunk == 'Abc\r\n'
        raises(IOError, f.read)
    else:
        # Two consecutive reads must both come back empty.
        for _ in range(2):
            chunk = f.read()
            assert chunk == ''
    f.close()
开发者ID:abhinavthomas,项目名称:pypy,代码行数:29,代码来源:test_file.py
示例5: dup2
def dup2(self, fd):
    """Duplicate this file's descriptor onto ``fd`` and wrap the result.

    ``posix.dup2`` copies the descriptor of ``self._file_`` onto ``fd``;
    the returned file object is ``posix.fdopen(fd, mode)`` with the mode
    of ``self._file_``.

    Args:
        fd: target descriptor number passed to ``posix.dup2``.

    Raises:
        AttributeError: if the ``posix`` module has no ``fdopen``.
    """
    import posix
    if not hasattr(posix, "fdopen"):
        # The message names this method (it previously said "dup()"), and
        # the call form of ``raise`` parses on Python 2 and Python 3.
        raise AttributeError("dup2() method unavailable")
    posix.dup2(self._file_.fileno(), fd)
    return posix.fdopen(fd, self._file_.mode)
开发者ID:webiumsk,项目名称:WOT-0.9.14-CT,代码行数:7,代码来源:posixfile.py
示例6: open
def open(name, flags='r', mode=0, labelset=None, endpoint=None, bufsize=4096):
    """Open ``name`` through ``fli._open`` and wrap the descriptor as a file.

    Each character of ``flags`` selects one ``posix.O_*`` bit:
    ``w`` O_WRONLY, ``r`` O_RDONLY, ``a`` O_APPEND, ``c`` O_CREAT,
    ``e`` O_EXCL, ``t`` O_TRUNC.  Only ``w``, ``r`` and ``a`` are also
    forwarded, in order, as the mode string given to ``posix.fdopen``.

    Args:
        name: file name handed to ``fli._open``.
        flags: string of flag characters described above.
        mode: passed through to ``fli._open`` unchanged.
        labelset: passed through to ``fli._open`` unchanged.
        endpoint: passed through to ``fli._open`` unchanged.
        bufsize: buffer size given to ``posix.fdopen``.

    Returns:
        The object returned by ``posix.fdopen``.

    Raises:
        ValueError: if ``flags`` contains an unknown character.
    """
    # flag character -> (OS-level flag bit, also part of the fdopen mode?)
    # Named ``flag_table`` so the builtin ``map`` is not shadowed.
    flag_table = {
        "w": (posix.O_WRONLY, True),
        "r": (posix.O_RDONLY, True),
        "a": (posix.O_APPEND, True),
        "c": (posix.O_CREAT, False),
        "e": (posix.O_EXCL, False),
        "t": (posix.O_TRUNC, False),
    }
    py_fl = ""
    c_fl = 0
    for c in flags:
        entry = flag_table.get(c)
        if entry is None:
            raise ValueError("Unknown flag to open: '%s'" % c)
        os_flag, in_py_mode = entry
        c_fl |= os_flag
        if in_py_mode:
            py_fl += c
    fd = fli._open(name, c_fl, mode, labelset, endpoint)
    if fd < 0:
        # NOTE(review): raise_err is defined elsewhere; presumably it raises
        # so the fdopen below is never reached with a bad descriptor.
        raise_err("open failed on file '%s'" % name)
    return posix.fdopen(fd, py_fl, bufsize)
开发者ID:imosts,项目名称:flume,代码行数:25,代码来源:flmos.py
示例7: test_readinto_error
def test_readinto_error(self):
    """Expect IOError from readinto() on a file wrapping a fresh socket."""
    import _socket
    import posix
    import array
    sock = _socket.socket()
    target = array.array("c", "X" * 65)
    dup_fd = posix.dup(sock.fileno())
    wrapped = posix.fdopen(dup_fd, 'rb')
    # The socket was never connected, so the read is expected to fail with
    # "Transport endpoint is not connected".
    raises(IOError, wrapped.readinto, target)
    wrapped.close()
    sock.close()
开发者ID:mozillazg,项目名称:pypy,代码行数:9,代码来源:test_file_extra.py
示例8: test_EAGAIN
def test_EAGAIN(self):
    """Read pending data from a non-blocking socket via an fdopen'ed file.

    Five bytes are sent, twelve are requested with buffer size 0, and the
    read is expected to return just the five bytes that were available.
    """
    import _socket
    import posix
    sender, receiver = _socket.socketpair()
    receiver.setblocking(False)
    sender.send("hello")
    dup_fd = posix.dup(receiver.fileno())
    reader = posix.fdopen(dup_fd, 'rb', 0)
    received = reader.read(12)
    assert received == "hello"
    # Tear down: the file wrapper first, then both sockets.
    reader.close()
    receiver.close()
    sender.close()
开发者ID:kipras,项目名称:pypy,代码行数:13,代码来源:test_file_extra.py
示例9: test_ignore_ioerror_in_readall_if_nonempty_result
def test_ignore_ioerror_in_readall_if_nonempty_result(self):
    """Check read() on one openpty() descriptor after the other was closed.

    The expected outcome depends on the platform, so the assertions are
    selected from ``sys.platform`` instead of assuming Linux everywhere.
    """
    # this is the behavior of regular files in CPython 2.7, as
    # well as of _io.FileIO at least in CPython 3.3.  This is
    # *not* the behavior of _io.FileIO in CPython 3.4 or 3.5;
    # see CPython's issue #21090.
    import sys
    try:
        from posix import openpty, fdopen, write, close
    except ImportError:
        skip('no openpty on this platform')
    if 'gnukfreebsd' in sys.platform:
        # Closing the descriptor never returns there, so do not even start.
        skip('close() hangs forever on kFreeBSD')
    read_fd, write_fd = openpty()
    write(write_fd, 'Abc\n')
    close(write_fd)
    f = fdopen(read_fd)
    # behavior on Linux: f.read() returns 'Abc\r\n', then the next time
    # it raises IOError.  Behavior on OS/X (Python 2.7.5): the close()
    # above threw away the buffer, and f.read() always returns ''.
    if sys.platform.startswith('linux'):
        s = f.read()
        assert s == 'Abc\r\n'
        raises(IOError, f.read)
    else:
        s = f.read()
        assert s == ''
        s = f.read()
        assert s == ''
    f.close()
开发者ID:bukzor,项目名称:pypy,代码行数:17,代码来源:test_file.py
示例10: test_fdopen
def test_fdopen(self):
    """Append through an fdopen'ed descriptor and verify the file contents.

    The temp file is seeded with two lines, reopened by descriptor in
    append mode to add more text, and finally read back in full.
    """
    import os
    seed = self.file(self.temppath, "w")
    try:
        seed.write("foo\nbaz\n")
    finally:
        seed.close()
    # Fall back to os.fdopen when the file type has no fdopen of its own
    # (when running with -A).
    fdopen = getattr(self.file, "fdopen", os.fdopen)
    open_flags = os.O_WRONLY | os.O_CREAT
    fd = os.open(self.temppath, open_flags)
    appender = fdopen(fd, "a")
    appender.seek(0, 2)
    appender.write("bar\nboo")
    appender.close()
    # don't close fd, will get a whining __del__
    checker = self.file(self.temppath, "r")
    try:
        contents = checker.read()
        assert contents == "foo\nbaz\nbar\nboo"
    finally:
        checker.close()
开发者ID:abhinavthomas,项目名称:pypy,代码行数:23,代码来源:test_file.py
示例11: fdopen_helper
def fdopen_helper(self, *args):
    """Open TESTFN read-only, wrap it via ``posix.fdopen(fd, *args)``, close it.

    Args:
        *args: extra positional arguments forwarded to ``posix.fdopen``
            after the descriptor.

    Any exception raised by ``posix.fdopen`` propagates to the caller.
    """
    fd = os.open(test_support.TESTFN, os.O_RDONLY)
    try:
        fp2 = posix.fdopen(fd, *args)
    except Exception:
        # No file object took over the descriptor, so close it here rather
        # than leak it, then let the original error continue.
        os.close(fd)
        raise
    fp2.close()
开发者ID:BmanGames,项目名称:Toontown-Level-Editor,代码行数:4,代码来源:test_posix.py
注：本文中的 posix.fdopen 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论