本文整理汇总了Python中sys.stdout.fileno函数的典型用法代码示例。如果您正苦于以下问题:Python fileno函数的具体用法?Python fileno怎么用?Python fileno使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了fileno函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: daemonize
def daemonize(self):
"""
Forks the process(es) from the controlling terminal
and redirects I/O streams for logging.
"""
self.fork()
chdir(getcwd())
setsid()
umask(0)
self.fork()
stdout.flush()
stderr.flush()
si= file(self.stdin, 'w+')
so= file(self.stdout, 'a+')
se= file(self.stderr, 'a+', 0)
dup2(si.fileno(), stdin.fileno())
dup2(so.fileno(), stdout.fileno())
dup2(se.fileno(), stderr.fileno())
register(self.del_pid)
self.set_pid()
开发者ID:richardjmarini,项目名称:Impetus,代码行数:26,代码来源:impetus.py
示例2: parse_args
def parse_args():
p = argparse.ArgumentParser(description='Create a git repository with the history of the specified Wikipedia article.')
p.add_argument('article_name')
p.add_argument('-n', '--no-import', dest='doimport', default=True, action='store_false',
help="Don't invoke git fast-import; only generate fast-import data stream")
p.add_argument('-o', '--out', help='Output directory or fast-import stream file')
g=p.add_mutually_exclusive_group()
g.add_argument('--lang', default=lang, help='Wikipedia language code (default %(default)s)')
g.add_argument('--site', help='Alternate site (e.g. http://commons.wikimedia.org[/w/])')
args = p.parse_args()
if not args.doimport:
if args.out is None:
# http://stackoverflow.com/a/2374507/20789
if platform == "win32":
import os, msvcrt
msvcrt.setmode(stdout.fileno(), os.O_BINARY)
args.out = stdout
else:
try:
args.out = argparse.FileType('wb')(args.out)
except argparse.ArgumentTypeError as e:
p.error(e.args[0])
return p, args
开发者ID:dualsky,项目名称:wp2git,代码行数:25,代码来源:wp2git.py
示例3: get_terminal_size_posix
def get_terminal_size_posix():
def ioctl_GWINSZ(fd):
try:
import fcntl
import termios
cr = struct.unpack('hh',
fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
return cr
except:
pass
cr = ioctl_GWINSZ(stdin.fileno()) or \
ioctl_GWINSZ(stdout.fileno()) or \
ioctl_GWINSZ(stderr.fileno())
if not cr:
try:
with os.open(os.ctermid(), os.O_RDONLY) as fd:
cr = ioctl_GWINSZ(fd)
except:
pass
if not cr:
try:
cr = (os.environ['LINES'], os.environ['COLUMNS'])
except:
pass
if not cr:
raise TerminalError('cannot determine terminal size from POSIX')
return int(cr[1]), int(cr[0])
开发者ID:edrevo,项目名称:fiware-cosmos-platform,代码行数:27,代码来源:terminal.py
示例4: _daemonize
def _daemonize(self):
try:
pid = os.fork()
if pid > 0:
exit()
except OSError as e:
error(_('Error entering daemon mode: %s') % e.strerror)
exit()
os.chdir('/')
os.setsid()
os.umask(0)
stdout.flush()
stderr.flush()
si = open(os.devnull, 'r')
so = open(os.devnull, 'a+')
se = open(os.devnull, 'a+')
os.dup2(si.fileno(), stdin.fileno())
os.dup2(so.fileno(), stdout.fileno())
os.dup2(se.fileno(), stderr.fileno())
on_exit(self._quit)
old_log = getLogger()
if old_log.handlers:
for handler in old_log.handlers:
old_log.removeHandler(handler)
log(filename=self.logfile, level=self.loglevel,
format='%(asctime)s %(levelname)-8s %(message)s')
self._set_pid()
开发者ID:debomatic,项目名称:debomatic,代码行数:27,代码来源:process.py
示例5: darwin_terminal_shell_is_focused
def darwin_terminal_shell_is_focused():
focused_tty = osascript_tell(
'Terminal',
'tty of (first tab of (first window whose frontmost is true) '
'whose selected is true)',
)
return focused_tty == ttyname(stdout.fileno())
开发者ID:dixudx,项目名称:ntfy,代码行数:7,代码来源:terminal.py
示例6: daemonize
def daemonize(self):
try:
pid = fork()
if pid > 0:
# exit first parent
_exit(0)
except OSError as e:
stderr.write(
"fork #1 failed: {0:d} ({1:s})\n".format(
e.errno, str(e)
)
)
raise SystemExit(1)
# decouple from parent environment
chdir(self.path)
setsid()
umask(0)
# do second fork
try:
pid = fork()
if pid > 0:
# exit from second parent
_exit(0)
except OSError as e:
stderr.write(
"fork #2 failed: {0:d} ({1:s})\n".format(
e.errno, str(e)
)
)
raise SystemExit(1)
# redirect standard file descriptors
stdout.flush()
stderr.flush()
maxfd = getrlimit(RLIMIT_NOFILE)[1]
if maxfd == RLIM_INFINITY:
maxfd = 2048
closerange(0, maxfd)
si = open(self.stdin, "r")
so = open(self.stdout, "a+")
se = open(self.stderr, "a+")
dup2(si.fileno(), stdin.fileno())
dup2(so.fileno(), stdout.fileno())
dup2(se.fileno(), stderr.fileno())
self.fire(writepid())
self.fire(daemonized(self))
开发者ID:AdricEpic,项目名称:circuits,代码行数:55,代码来源:daemon.py
示例7: read_data
def read_data(cmd_txt, parm, server_addr, fname, skip, get):
res = b""
try:
s = socket()
s.settimeout(5)
s.connect(server_addr)
if s:
# pack & send command
cmd = cmd_txt + parm
cmd = pack("<l%ds" % len(cmd), len(cmd), cmd.encode())
s.send(cmd)
# get length of response
ss = s.recv(4)
size = unpack("<l", ss)[0]
# skip header
hdr_size = len(cmd_txt) + 4
name = s.recv(hdr_size)
# stderr.write("Command %s, parm %s, Size %d, Header_size %d\n" % (cmd_txt, parm, size, hdr_size))
size -= hdr_size
# open file, if required
if fname != None:
if fname:
f = open(fname, "wb")
else:
f = fdopen(stdout.fileno(), 'wb') # binary stdout
# if additional skip, do it
while skip > 0:
img = s.recv(min(skip, 4096))
size -= len(img)
skip -= len(img)
# check, whether not the full data is required
if get > 0 and get < size:
skip = size - get
size = get
# get bulk data
while size > 0:
img = s.recv(min(size, 4096))
if fname != None:
f.write(img)
else:
res += img
size -= len(img)
# eat up what's left
while skip > 0:
img = s.recv(min(skip, 4096))
skip -= len(img)
# and that's it
if fname:
f.close()
s.close()
return res
except Exception as err:
stderr.write("Error attempting to get data, reason: %s\n" % err)
return False
开发者ID:gedankenexpt,项目名称:owonread,代码行数:54,代码来源:owonread.py
示例8: open_log
def open_log(folder, name, use_stdout=False):
"""Return a writable file-like object representing a log file.
:arg folder: The folder to put the log file in
:arg name: The name of the log file
:arg use_stdout: If True, return a handle to stdout for verbose output,
duplicated so it can be closed with impunity.
"""
if use_stdout:
return fdopen(dup(stdout.fileno()), 'w')
return open(join(folder, name), 'w', 1)
开发者ID:nbstar,项目名称:dxr,代码行数:12,代码来源:utils.py
示例9: daemonize
def daemonize(self):
if fork(): exit(0)
umask(0)
setsid()
if fork(): exit(0)
stdout.flush()
stderr.flush()
si = file('/dev/null', 'r')
so = file('/dev/null', 'a+')
se = file('/dev/null', 'a+', 0)
dup2(si.fileno(), stdin.fileno())
dup2(so.fileno(), stdout.fileno())
dup2(se.fileno(), stderr.fileno())
开发者ID:Acey9,项目名称:SEnginx,代码行数:13,代码来源:nx_commons.py
示例10: open_log
def open_log(config_or_tree, name, use_stdout=False):
"""Return a writable file-like object representing a log file.
:arg config_or_tree: a Config or Tree object which tells us which folder to
put the log file in
:arg name: The name of the log file
:arg use_stdout: If True, return a handle to stdout for verbose output,
duplicated so it can be closed with impunity.
"""
if use_stdout:
return os.fdopen(dup(stdout.fileno()), 'w')
return open(os.path.join(config_or_tree.log_folder, name), 'w', 1)
开发者ID:brunoschneider17,项目名称:dxr,代码行数:13,代码来源:utils.py
示例11: sudo
def sudo(command):
"""do shell "%s" with administrator privileges
http://developer.apple.com/library/mac/#technotes/tn2065/_index.html"""
if isatty(stdout.fileno()): # terminal
cmd="sudo %s" % command
r=run(cmd)
else: # GUI
escaped=command.replace('"','\\"')
cmd='do shell script "%s" with administrator privileges' % escaped
r=run("osascript",cmd)
if r.status_code==0:
return r.std_out
else:
raise Exception(r.std_err)
开发者ID:RaulSierro,项目名称:Orkiv,代码行数:14,代码来源:__init__.py
示例12: _prompting_input
def _prompting_input(self):
'''
Return prompting stdin.__iter__ decorator...
If either:
- prompt explicitly specified.
- both stdin/out are a tty
'''
if self.args.prompt or \
isatty(stdin.fileno()) and isatty(stdout.fileno()):
return InteractiveInput(prompt=self.args.prompt or
self.DEFAULT_PROMPT)
else:
return stdin
开发者ID:pilona,项目名称:RPN,代码行数:14,代码来源:cli.py
示例13: parse_args
def parse_args():
"""Parses the command line arguments."""
p = argparse.ArgumentParser(
description='Create a git repository with the history of the specified Wikipedia article.')
p.add_argument('article_name')
output = p.add_argument_group('Output options')
g = output.add_mutually_exclusive_group()
g.add_argument('-n', '--no-import', dest='doimport', default=True, action='store_false',
help='Don\'t invoke git fast-import; only generate fast-import data stream')
g.add_argument('-b', '--bare', action='store_true',
help='Import to a bare repository (no working tree)')
output.add_argument('-o', '--out',
help='Output directory or fast-import stream file')
site = p.add_argument_group('MediaWiki site selection')
g = site.add_mutually_exclusive_group()
g.add_argument('--lang', default=locale.getdefaultlocale()[0].split('_')[0] or '',
help='Wikipedia language code (default %(default)s)')
g.add_argument('--site',
help='Alternate MediaWiki site (e.g. https://commons.wikimedia.org[/w/])')
revision = p.add_argument_group('Revision options')
revision.add_argument('--expandtemplates', action='store_true', help='Expand templates')
revision.add_argument('--start', help='ISO8601 timestamp to start listing from')
revision.add_argument('--end', help='ISO8601 timestamp to end listing at')
revision.add_argument('--user', help='Only list revisions made by this user')
revision.add_argument('--excludeuser', help='Exclude revisions made by this user')
args = p.parse_args()
if not args.doimport:
if args.out is None:
# https://stackoverflow.com/a/2374507/20789
if platform == 'win32':
import msvcrt
msvcrt.setmode(stdout.fileno(), os.O_BINARY)
try:
args.out = stdout.buffer
except AttributeError:
args.out = stdout
else:
try:
args.out = argparse.FileType('wb')(args.out)
except argparse.ArgumentTypeError as e:
p.error(e.args[0])
return p, args
开发者ID:simon04,项目名称:wp2git,代码行数:47,代码来源:wp2git.py
示例14: extractlayers
def extractlayers(dc, args, layers, top_most_layer_id):
target_path = args.target
flags = O_WRONLY
if target_path == _TARGET_STDOUT:
target_fd = stdout.fileno()
else:
flags |= O_CREAT | O_TRUNC
if not args.force:
flags |= O_EXCL
target_fd = logexception(_LOGGER, ERROR, 'unable to open target file "{}": {{e}}'.format(target_path), os_open, target_path, flags, 0o666)
with fdopen(target_fd, 'wb') as target_file:
if hasattr(target_file, 'seekable'):
seekable = target_file.seekable()
else:
try:
seekable = not lseek(target_fd, 0, SEEK_CUR) < 0 \
and S_ISREG(fstat(target_fd).st_mode)
except OSError as e:
if errorcode.get(e.errno) != 'ESPIPE':
raise
seekable = False
open_args = { 'fileobj': target_file }
if args.compression is None:
open_args['mode'] = 'w' if seekable else 'w|'
else:
if seekable:
mode = 'w:{}'
open_args['compresslevel'] = args.compress_level
_, ext = ospath_splitext(target_path)
if ext.lower() != '{}{}'.format(ospath_extsep, args.compression):
_LOGGER.warning('target name "%s" doesn\'t match compression type ("%s")', target_path, args.compression)
else:
mode = 'w|{}'
_LOGGER.warning('target "%s" is not seekable, ignoring compression level (%d)', target_path, args.compress_level)
open_args['mode'] = mode.format(args.compression)
with tarfile_open(**open_args) as tar_file:
dimgx_extractlayers(dc, layers, tar_file, top_most_layer_id)
开发者ID:pombredanne,项目名称:py-dimgx,代码行数:47,代码来源:cmd.py
示例15: __init__
def __init__(self, device, baudrate=None, parity=None, rtscts=False,
debug=False):
self._termstates = []
if not mswin and stdout.isatty():
self._termstates = [(fd, tcgetattr(fd)) for fd in
(stdin.fileno(), stdout.fileno(),
stderr.fileno())]
self._device = device
self._baudrate = baudrate or self.DEFAULT_BAUDRATE
self._port = self._open_port(self._device, self._baudrate, parity,
rtscts, debug)
self._resume = False
self._silent = False
self._rxq = deque()
self._rxe = Event()
self._debug = debug
register(self._cleanup)
开发者ID:eblot,项目名称:pyftdi,代码行数:17,代码来源:pyterm.py
示例16: daemonize
def daemonize():
from os import fork, chdir, setsid, umask, getpid, dup2
from sys import stdout, stderr, stdin, exit
if fork():
exit(0)
chdir("/")
setsid()
umask(0)
if fork():
exit(0)
stdout.flush()
stderr.flush()
n1 = open('/dev/null', 'r')
n2 = open('/dev/null', 'w')
dup2(n1.fileno(), stdin.fileno())
dup2(n2.fileno(), stdout.fileno())
dup2(n2.fileno(), stderr.fileno())
return getpid()
开发者ID:ymv,项目名称:zhk,代码行数:18,代码来源:zhk.py
示例17: doFork
def doFork():
if os.fork(): exit(0)
os.umask(0)
os.setsid()
if os.fork(): exit(0)
stdout.flush()
stderr.flush()
si = os.open('/dev/null')
so = os.open('/dev/null','w')
se = os.open('/dev/null','w')
os.dup2(si.fileno(),stdin.fileno())
os.dup2(so.fileno(),stdout.fileno())
os.dup2(se.fileno(),stderr.fileno())
# drop privs
# setgid(getgrnam(group).gr_gid)
# setuid(getpwnam(user).pw_uid)
os.chdir('/')
开发者ID:AngeloRivera,项目名称:Dencoder,代码行数:21,代码来源:dencoder.py
示例18: daemonize
def daemonize(self):
'''
Forks then sets up the I/O stream for the daemon
'''
self.fork()
chdir(getcwd())
setsid()
umask(0)
self.fork()
stdout.flush()
stderr.flush()
si= file(self.stdin, 'w+')
so= file(self.stdout, 'a+')
se= file(self.stderr, 'a+', 0)
dup2(si.fileno(), stdin.fileno())
dup2(so.fileno(), stdout.fileno())
dup2(se.fileno(), stderr.fileno())
register(self.delPID)
self.setPID()
开发者ID:richardjmarini,项目名称:Impetus-old,代码行数:24,代码来源:daemon.py
示例19: int
#!/usr/bin/env python
from __future__ import print_function
from datetime import datetime
from struct import pack
from time import sleep
from sys import stdout, stderr
import os
sleep_time = int(os.environ.get('SLEEP_TIME', 10))
# Turn off stdout buffering
stdout = os.fdopen(stdout.fileno(), 'w', 0)
print('clock controller: syncing every %d seconds' % sleep_time, file=stderr)
while True:
now = datetime.now()
# convert to 12-hour
hour = 12 if ((now.hour % 12) == 0) else (now.hour % 12)
minute, second, pm = now.minute, now.second, (now.hour > 11)
time_msg = ('[', pack('>BBBB', hour, minute, second, pm), '%]')
print(*time_msg, sep='', file=stdout)
sleep(sleep_time)
开发者ID:robbles,项目名称:conduit.js,代码行数:27,代码来源:clock.py
示例20: countdown
def countdown(
stdscr,
alt_format=False,
font=DEFAULT_FONT,
blink=False,
critical=3,
quit_after=None,
text=None,
timespec=None,
title=None,
voice=None,
no_seconds=False,
no_text_magic=True,
no_figlet=False,
no_window_title=False,
**kwargs
):
try:
sync_start, target = parse_timestr(timespec)
except ValueError:
click.echo("Unable to parse TIME value '{}'".format(timespec))
exit(64)
curses_lock, input_queue, quit_event = setup(stdscr)
figlet = Figlet(font=font)
if title and not no_figlet:
title = figlet.renderText(title)
input_thread = Thread(
args=(stdscr, input_queue, quit_event, curses_lock),
target=input_thread_body,
)
input_thread.start()
seconds_total = seconds_left = int(ceil((target - datetime.now()).total_seconds()))
try:
while seconds_left > 0 or blink or text:
figlet.width = stdscr.getmaxyx()[1]
if alt_format:
countdown_text = format_seconds_alt(
seconds_left, seconds_total, hide_seconds=no_seconds)
else:
countdown_text = format_seconds(seconds_left, hide_seconds=no_seconds)
if seconds_left > 0:
with curses_lock:
if not no_window_title:
os.write(stdout.fileno(), "\033]2;{0}\007".format(countdown_text).encode())
stdscr.erase()
draw_text(
stdscr,
countdown_text if no_figlet else figlet.renderText(countdown_text),
color=1 if seconds_left <= critical else 0,
fallback=countdown_text,
title=title,
)
if seconds_left <= 10 and voice:
voice_exec = "echo"
if os.path.exists("/usr/bin/say"):
voice_exec = "/usr/bin/say"
elif os.path.exists("/usr/bin/espeak"):
voice_exec = "/usr/bin/espeak"
Popen([voice_exec, "-v", voice, str(seconds_left)])
# We want to sleep until this point of time has been
# reached:
sleep_target = sync_start + timedelta(seconds=1)
# If sync_start has microsecond=0, it might happen that we
# need to skip one frame (the very first one). This occurs
# when the program has been startet at, say,
# "2014-05-29 20:27:57.930651". Now suppose rendering the
# frame took about 0.2 seconds. The real time now is
# "2014-05-29 20:27:58.130000" and sleep_target is
# "2014-05-29 20:27:58.000000" which is in the past! We're
# already too late. We could either skip that frame
# completely or we can draw it right now. I chose to do the
# latter: Only sleep if haven't already missed our target.
now = datetime.now()
if sleep_target > now and seconds_left > 0:
try:
input_action = input_queue.get(True, (sleep_target - now).total_seconds())
except Empty:
input_action = None
if input_action == INPUT_PAUSE:
pause_start = datetime.now()
with curses_lock:
stdscr.erase()
draw_text(
stdscr,
countdown_text if no_figlet else figlet.renderText(countdown_text),
color=3,
fallback=countdown_text,
)
input_action = input_queue.get()
if input_action == INPUT_PAUSE:
sync_start += (datetime.now() - pause_start)
target += (datetime.now() - pause_start)
if input_action == INPUT_EXIT: # no elif here! input_action may have changed
break
#.........这里部分代码省略.........
开发者ID:dav-,项目名称:termdown,代码行数:101,代码来源:termdown.py
注:本文中的sys.stdout.fileno函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论