• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python stdout.fileno函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sys.stdout.fileno 函数的典型用法代码示例。如果您正苦于以下问题:Python fileno 函数的具体用法?Python fileno 怎么用?Python fileno 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了fileno函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: daemonize

   def daemonize(self):
      """
      Forks the process(es) from the controlling terminal
      and redirects I/O streams for logging.

      The standard streams are re-pointed at the files named by
      ``self.stdin``, ``self.stdout`` and ``self.stderr``; a PID-file
      cleanup hook is registered and the PID is recorded.
      """

      # NOTE(review): self.fork() is defined outside this view; presumably it
      # forks and terminates the parent -- confirm.
      self.fork()

      # Re-enter the current directory, start a new session and clear the umask.
      chdir(getcwd())
      setsid()
      umask(0)
  
      self.fork()
      # Flush pending output before the descriptors underneath are replaced.
      stdout.flush()
      stderr.flush()

      # file() is the Python 2 builtin. 'w+' truncates the stdin target;
      # the trailing 0 makes the stderr replacement unbuffered.
      si= file(self.stdin, 'w+')
      so= file(self.stdout, 'a+')
      se= file(self.stderr, 'a+', 0)

      # Point the standard descriptors at the files opened above.
      dup2(si.fileno(), stdin.fileno())
      dup2(so.fileno(), stdout.fileno())
      dup2(se.fileno(), stderr.fileno())

      # NOTE(review): register is presumably atexit.register -- confirm.
      register(self.del_pid)
      self.set_pid()
开发者ID:richardjmarini,项目名称:Impetus,代码行数:26,代码来源:impetus.py


示例2: parse_args

def parse_args():
    """Build the command-line parser and parse the process arguments.

    When ``--no-import`` is given, ``args.out`` is replaced by a writable
    binary stream: the ``-o`` path opened in ``'wb'`` mode, or standard
    output when ``-o`` is omitted.

    Returns:
        A ``(parser, args)`` tuple.
    """
    p = argparse.ArgumentParser(description='Create a git repository with the history of the specified Wikipedia article.')
    p.add_argument('article_name')
    p.add_argument('-n', '--no-import', dest='doimport', default=True, action='store_false',
                   help="Don't invoke git fast-import; only generate fast-import data stream")
    p.add_argument('-o', '--out', help='Output directory or fast-import stream file')
    g=p.add_mutually_exclusive_group()
    g.add_argument('--lang', default=lang, help='Wikipedia language code (default %(default)s)')
    g.add_argument('--site', help='Alternate site (e.g. http://commons.wikimedia.org[/w/])')

    args = p.parse_args()
    if not args.doimport:
        if args.out is None:
            # http://stackoverflow.com/a/2374507/20789
            if platform == "win32":
                import os
                import msvcrt
                msvcrt.setmode(stdout.fileno(), os.O_BINARY)
            # The file branch below yields a binary ('wb') stream, so stdout
            # must accept bytes too. On Python 3 that is stdout.buffer; the
            # Python 2 stdout has no .buffer and already takes byte strings.
            args.out = getattr(stdout, 'buffer', stdout)
        else:
            try:
                args.out = argparse.FileType('wb')(args.out)
            except argparse.ArgumentTypeError as e:
                p.error(e.args[0])

    return p, args
开发者ID:dualsky,项目名称:wp2git,代码行数:25,代码来源:wp2git.py


示例3: get_terminal_size_posix

def get_terminal_size_posix():
    """Return the terminal size as a ``(columns, lines)`` tuple of ints.

    Sources are tried in order until one yields a value:

    1. a ``TIOCGWINSZ`` ioctl on stdin, stdout, then stderr;
    2. the same ioctl on the controlling terminal (``os.ctermid()``);
    3. the ``LINES`` and ``COLUMNS`` environment variables.

    Raises:
        TerminalError: if none of the sources provides a size.
    """
    def ioctl_GWINSZ(fd):
        """Return the two shorts reported by TIOCGWINSZ for *fd*, or None."""
        try:
            import fcntl
            import termios
            return struct.unpack('hh',
                                 fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            # Best-effort probe: not a tty, or fcntl/termios unavailable.
            return None

    def probe_stream(stream):
        """Run the ioctl probe on a stream that may lack a real descriptor."""
        try:
            fd = stream.fileno()
        except Exception:
            # Replaced or closed streams may not expose a file descriptor.
            return None
        return ioctl_GWINSZ(fd)

    cr = probe_stream(stdin) or probe_stream(stdout) or probe_stream(stderr)
    if not cr:
        # os.open() returns a plain integer descriptor, which is not a
        # context manager, so it has to be closed explicitly.
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
        except Exception:
            fd = None
        if fd is not None:
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
    if not cr:
        try:
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except KeyError:
            pass
    if not cr:
        raise TerminalError('cannot determine terminal size from POSIX')
    return int(cr[1]), int(cr[0])
开发者ID:edrevo,项目名称:fiware-cosmos-platform,代码行数:27,代码来源:terminal.py


示例4: _daemonize

 def _daemonize(self):
     """Detach into the background and switch logging to ``self.logfile``.

     The parent exits after a single fork. The child changes to ``/``,
     starts a new session, clears the umask, redirects the three standard
     streams to ``os.devnull``, registers ``self._quit`` as an exit hook,
     replaces the root logger's handlers and records its PID.
     """
     try:
         pid = os.fork()
         if pid > 0:
             exit()
     except OSError as e:
         error(_('Error entering daemon mode: %s') % e.strerror)
         exit()
     os.chdir('/')
     os.setsid()
     os.umask(0)
     # Flush pending output before the descriptors underneath are replaced.
     stdout.flush()
     stderr.flush()
     si = open(os.devnull, 'r')
     so = open(os.devnull, 'a+')
     se = open(os.devnull, 'a+')
     os.dup2(si.fileno(), stdin.fileno())
     os.dup2(so.fileno(), stdout.fileno())
     os.dup2(se.fileno(), stderr.fileno())
     on_exit(self._quit)
     old_log = getLogger()
     # removeHandler() mutates old_log.handlers, so iterate over a snapshot;
     # looping over the live list would skip every other handler.
     for handler in list(old_log.handlers):
         old_log.removeHandler(handler)
     # NOTE(review): log() is defined outside this view; presumably it
     # configures logging like logging.basicConfig -- confirm.
     log(filename=self.logfile, level=self.loglevel,
         format='%(asctime)s %(levelname)-8s %(message)s')
     self._set_pid()
开发者ID:debomatic,项目名称:debomatic,代码行数:27,代码来源:process.py


示例5: darwin_terminal_shell_is_focused

def darwin_terminal_shell_is_focused():
    """Return True when Terminal's frontmost selected tab is this process's tty.

    Asks the 'Terminal' application (via ``osascript_tell``) for the tty of
    the selected tab in its frontmost window and compares it with the tty
    name of standard output.
    """
    tty_query = (
        'tty of (first tab of (first window whose frontmost is true) '
        'whose selected is true)'
    )
    return osascript_tell('Terminal', tty_query) == ttyname(stdout.fileno())
开发者ID:dixudx,项目名称:ntfy,代码行数:7,代码来源:terminal.py


示例6: daemonize

    def daemonize(self):
        """Detach the process using a double fork and redirect its streams.

        Each fork's parent leaves via ``_exit(0)``. If a fork fails, the
        error is written to stderr and ``SystemExit(1)`` is raised. The
        surviving child changes to ``self.path``, starts a new session,
        clears the umask, closes inherited descriptors, reopens the standard
        streams on ``self.stdin`` / ``self.stdout`` / ``self.stderr`` and
        fires the ``writepid`` and ``daemonized`` events.
        """
        try:
            pid = fork()
            if pid > 0:
                # exit first parent
                _exit(0)
        except OSError as e:
            stderr.write(
                "fork #1 failed: {0:d} ({1:s})\n".format(
                    e.errno, str(e)
                )
            )

            raise SystemExit(1)

        # decouple from parent environment
        chdir(self.path)
        setsid()
        umask(0)

        # do second fork
        try:
            pid = fork()
            if pid > 0:
                # exit from second parent
                _exit(0)
        except OSError as e:
            stderr.write(
                "fork #2 failed: {0:d} ({1:s})\n".format(
                    e.errno, str(e)
                )
            )

            raise SystemExit(1)

        # redirect standard file descriptors
        stdout.flush()
        stderr.flush()

        # Use the hard limit on open files as the upper bound for closing;
        # fall back to 2048 when the limit is reported as unlimited.
        maxfd = getrlimit(RLIMIT_NOFILE)[1]
        if maxfd == RLIM_INFINITY:
            maxfd = 2048

        # Closes every descriptor in [0, maxfd), including 0, 1 and 2.
        closerange(0, maxfd)

        si = open(self.stdin, "r")
        so = open(self.stdout, "a+")
        se = open(self.stderr, "a+")

        # Point the standard descriptors at the files opened above.
        dup2(si.fileno(), stdin.fileno())
        dup2(so.fileno(), stdout.fileno())
        dup2(se.fileno(), stderr.fileno())

        self.fire(writepid())
        self.fire(daemonized(self))
开发者ID:AdricEpic,项目名称:circuits,代码行数:55,代码来源:daemon.py


示例7: read_data

def read_data(cmd_txt, parm,  server_addr, fname, skip, get):
    """Send a command to the server and collect the binary response.

    The request is a little-endian 32-bit length followed by the encoded
    command text (``cmd_txt + parm``). The reply starts with a 32-bit total
    size and a header of ``len(cmd_txt) + 4`` bytes, which is discarded; the
    rest is payload.

    Args:
        cmd_txt: Command name.
        parm: Parameter string appended to the command.
        server_addr: Address passed to ``socket.connect``.
        fname: ``None`` to return the payload; a non-empty path to write it
            to that file; an empty string to write it to binary stdout.
        skip: Number of leading payload bytes to discard.
        get: If positive and smaller than the remaining payload, only this
            many bytes are kept and the rest is read and dropped.

    Returns:
        The payload as ``bytes`` (empty when it was written to a file or
        stdout), or ``False`` if any error occurred. Errors, including the
        peer closing the connection early, are reported on stderr.
    """
    def recv_chunk(sock, wanted):
        # recv() returns b"" once the peer has closed the connection; without
        # this check the byte-counting loops below would never terminate.
        chunk = sock.recv(min(wanted, 4096))
        if not chunk:
            raise EOFError(
                "connection closed with %d bytes still expected" % wanted)
        return chunk

    def recv_exact(sock, wanted):
        # A single recv() may deliver fewer bytes than requested.
        parts = []
        while wanted > 0:
            chunk = recv_chunk(sock, wanted)
            parts.append(chunk)
            wanted -= len(chunk)
        return b"".join(parts)

    chunks = []
    s = None
    f = None
    try:
        s = socket()
        s.settimeout(5)
        s.connect(server_addr)
        # pack & send command
        cmd = cmd_txt + parm
        cmd = pack("<l%ds" % len(cmd), len(cmd), cmd.encode())
        s.sendall(cmd)
        # get length of response
        size = unpack("<l", recv_exact(s, 4))[0]
        # skip header
        hdr_size = len(cmd_txt) + 4
        recv_exact(s, hdr_size)
        size -= hdr_size
        # open file, if required
        if fname is not None:
            if fname:
                f = open(fname, "wb")
            else:
                f = fdopen(stdout.fileno(), 'wb')  # binary stdout
        # if additional skip, do it
        while skip > 0:
            img = recv_chunk(s, skip)
            size -= len(img)
            skip -= len(img)
        # check, whether not the full data is required
        if get > 0 and get < size:
            skip = size - get
            size = get
        # get bulk data
        while size > 0:
            img = recv_chunk(s, size)
            if f is not None:
                f.write(img)
            else:
                chunks.append(img)
            size -= len(img)
        # eat up what's left
        while skip > 0:
            skip -= len(recv_chunk(s, skip))
        return b"".join(chunks)
    except Exception as err:
        stderr.write("Error attempting to get data, reason: %s\n" % err)
        return False
    finally:
        # Release resources on success and failure alike.
        if f is not None:
            if fname:
                f.close()
            else:
                # The stdout wrapper is flushed, not closed, as before.
                f.flush()
        if s is not None:
            s.close()
开发者ID:gedankenexpt,项目名称:owonread,代码行数:54,代码来源:owonread.py


示例8: open_log

def open_log(folder, name, use_stdout=False):
    """Open a log destination and return it as a writable file object.

    :arg folder: Directory in which the log file is created
    :arg name: File name of the log inside ``folder``
    :arg use_stdout: When true, ignore ``folder``/``name`` and return a
        handle on a duplicate of stdout's descriptor, so the caller can
        close it without closing stdout itself.

    """
    if not use_stdout:
        log_path = join(folder, name)
        # Third argument 1 selects line buffering.
        return open(log_path, 'w', 1)
    stdout_copy = dup(stdout.fileno())
    return fdopen(stdout_copy, 'w')
开发者ID:nbstar,项目名称:dxr,代码行数:12,代码来源:utils.py


示例9: daemonize

 def daemonize(self):
     """Double-fork into the background and silence the standard streams.

     Each fork's parent exits with status 0. Between the forks the umask is
     cleared and a new session is started. The surviving child then points
     stdin, stdout and stderr at ``/dev/null``.
     """
     if fork():
         exit(0)
     umask(0)
     setsid()
     if fork():
         exit(0)

     # Flush pending output before the descriptors underneath are replaced.
     for stream in (stdout, stderr):
         stream.flush()

     # file() is the Python 2 builtin; the trailing 0 means unbuffered.
     null_in = file('/dev/null', 'r')
     null_out = file('/dev/null', 'a+')
     null_err = file('/dev/null', 'a+', 0)

     redirects = ((null_in, stdin), (null_out, stdout), (null_err, stderr))
     for replacement, stream in redirects:
         dup2(replacement.fileno(), stream.fileno())
开发者ID:Acey9,项目名称:SEnginx,代码行数:13,代码来源:nx_commons.py


示例10: open_log

def open_log(config_or_tree, name, use_stdout=False):
    """Open a log destination and return it as a writable file object.

    :arg config_or_tree: Object whose ``log_folder`` attribute names the
        directory in which the log file is created
    :arg name: File name of the log inside that folder
    :arg use_stdout: When true, return a handle on a duplicate of stdout's
        descriptor instead, so the caller can close it without closing
        stdout itself.

    """
    if not use_stdout:
        log_path = os.path.join(config_or_tree.log_folder, name)
        # Third argument 1 selects line buffering.
        return open(log_path, 'w', 1)
    stdout_copy = dup(stdout.fileno())
    return os.fdopen(stdout_copy, 'w')
开发者ID:brunoschneider17,项目名称:dxr,代码行数:13,代码来源:utils.py


示例11: sudo

def sudo(command):
    """Run *command* with administrator privileges and return its stdout.

    When stdout is a terminal the command is run through ``sudo``; otherwise
    it is run through ``osascript`` as
    ``do shell script "..." with administrator privileges``, with double
    quotes in *command* backslash-escaped. See
    http://developer.apple.com/library/mac/#technotes/tn2065/_index.html

    Raises:
        Exception: carrying the captured stderr when the exit status is
            non-zero.
    """
    if not isatty(stdout.fileno()):
        # GUI
        quoted = command.replace('"', '\\"')
        script = 'do shell script "%s" with administrator privileges' % quoted
        result = run("osascript", script)
    else:
        # terminal
        result = run("sudo %s" % command)

    if result.status_code != 0:
        raise Exception(result.std_err)
    return result.std_out
开发者ID:RaulSierro,项目名称:Orkiv,代码行数:14,代码来源:__init__.py


示例12: _prompting_input

    def _prompting_input(self):
        '''
        Choose the input source for the session.

        An ``InteractiveInput`` is returned when a prompt was given
        explicitly, or when both stdin and stdout are ttys; its prompt is
        the explicit one or, failing that, ``DEFAULT_PROMPT``. In every
        other case plain ``stdin`` is returned.
        '''
        wants_prompt = self.args.prompt or (
            isatty(stdin.fileno()) and isatty(stdout.fileno())
        )
        if not wants_prompt:
            return stdin
        chosen_prompt = self.args.prompt or self.DEFAULT_PROMPT
        return InteractiveInput(prompt=chosen_prompt)
开发者ID:pilona,项目名称:RPN,代码行数:14,代码来源:cli.py


示例13: parse_args

def parse_args():
    """Parse the command line arguments.

    ``--lang`` defaults to the language part of the default locale (``de``
    for ``de_DE``), or to an empty string when no locale is set.

    When ``--no-import`` is given, ``args.out`` is replaced by a writable
    binary stream: the ``-o`` path opened in ``'wb'`` mode, or standard
    output when ``-o`` is omitted.

    Returns:
        A ``(parser, args)`` tuple.
    """
    p = argparse.ArgumentParser(
        description='Create a git repository with the history of the specified Wikipedia article.')
    p.add_argument('article_name')

    output = p.add_argument_group('Output options')
    g = output.add_mutually_exclusive_group()
    g.add_argument('-n', '--no-import', dest='doimport', default=True, action='store_false',
                   help='Don\'t invoke git fast-import; only generate fast-import data stream')
    g.add_argument('-b', '--bare', action='store_true',
                   help='Import to a bare repository (no working tree)')
    output.add_argument('-o', '--out',
                        help='Output directory or fast-import stream file')

    # getdefaultlocale() returns (None, None) under the C/POSIX locale, so the
    # None has to be replaced before calling .split() on it.
    default_lang = (locale.getdefaultlocale()[0] or '').split('_')[0]

    site = p.add_argument_group('MediaWiki site selection')
    g = site.add_mutually_exclusive_group()
    g.add_argument('--lang', default=default_lang,
                   help='Wikipedia language code (default %(default)s)')
    g.add_argument('--site',
                   help='Alternate MediaWiki site (e.g. https://commons.wikimedia.org[/w/])')

    revision = p.add_argument_group('Revision options')
    revision.add_argument('--expandtemplates', action='store_true', help='Expand templates')
    revision.add_argument('--start', help='ISO8601 timestamp to start listing from')
    revision.add_argument('--end', help='ISO8601 timestamp to end listing at')
    revision.add_argument('--user', help='Only list revisions made by this user')
    revision.add_argument('--excludeuser', help='Exclude revisions made by this user')

    args = p.parse_args()
    if not args.doimport:
        if args.out is None:
            # https://stackoverflow.com/a/2374507/20789
            if platform == 'win32':
                import msvcrt
                msvcrt.setmode(stdout.fileno(), os.O_BINARY)
            # Prefer the underlying binary stream; streams without a
            # .buffer attribute are used as they are.
            try:
                args.out = stdout.buffer
            except AttributeError:
                args.out = stdout
        else:
            try:
                args.out = argparse.FileType('wb')(args.out)
            except argparse.ArgumentTypeError as e:
                p.error(e.args[0])

    return p, args
开发者ID:simon04,项目名称:wp2git,代码行数:47,代码来源:wp2git.py


示例14: extractlayers

def extractlayers(dc, args, layers, top_most_layer_id):
    """Write the given layers as a tar archive to ``args.target``.

    The target is either standard output (when ``args.target`` equals
    ``_TARGET_STDOUT``) or a file opened write-only. The tar mode depends on
    whether the target is seekable and on ``args.compression``; the archive
    contents are produced by ``dimgx_extractlayers``.

    ``dc``, ``layers`` and ``top_most_layer_id`` are passed through to
    ``dimgx_extractlayers`` unchanged.
    """
    target_path = args.target
    flags = O_WRONLY

    if target_path == _TARGET_STDOUT:
        target_fd = stdout.fileno()
    else:
        # Create or truncate the file; without --force, O_EXCL makes the
        # open fail when the file already exists.
        flags |= O_CREAT | O_TRUNC

        if not args.force:
            flags |= O_EXCL

        # NOTE(review): logexception is defined outside this view; presumably
        # it calls os_open(target_path, flags, 0o666) and logs failures with
        # the given message -- confirm.
        target_fd = logexception(_LOGGER, ERROR, 'unable to open target file "{}": {{e}}'.format(target_path), os_open, target_path, flags, 0o666)

    # The file object owns target_fd and closes it when the block exits.
    with fdopen(target_fd, 'wb') as target_file:
        if hasattr(target_file, 'seekable'):
            seekable = target_file.seekable()
        else:
            # Fallback for file objects without seekable(): the target counts
            # as seekable only if lseek() works and it is a regular file.
            try:
                seekable = not lseek(target_fd, 0, SEEK_CUR) < 0 \
                    and S_ISREG(fstat(target_fd).st_mode)
            except OSError as e:
                # ESPIPE means "not seekable"; anything else is a real error.
                if errorcode.get(e.errno) != 'ESPIPE':
                    raise

                seekable = False

        open_args = { 'fileobj': target_file }

        if args.compression is None:
            # 'w' is the random-access tar mode, 'w|' the stream mode.
            open_args['mode'] = 'w' if seekable else 'w|'
        else:
            if seekable:
                mode = 'w:{}'
                open_args['compresslevel'] = args.compress_level
                _, ext = ospath_splitext(target_path)

                # Warn when the file extension does not name the compression.
                if ext.lower() != '{}{}'.format(ospath_extsep, args.compression):
                    _LOGGER.warning('target name "%s" doesn\'t match compression type ("%s")', target_path, args.compression)
            else:
                # The compression level is not passed on in stream mode.
                mode = 'w|{}'
                _LOGGER.warning('target "%s" is not seekable, ignoring compression level (%d)', target_path, args.compress_level)

            open_args['mode'] = mode.format(args.compression)

        with tarfile_open(**open_args) as tar_file:
            dimgx_extractlayers(dc, layers, tar_file, top_most_layer_id)
开发者ID:pombredanne,项目名称:py-dimgx,代码行数:47,代码来源:cmd.py


示例15: __init__

 def __init__(self, device, baudrate=None, parity=None, rtscts=False,
              debug=False):
     """Open the serial port and set up the terminal state.

     When not running on Windows and stdout is a tty, the termios
     attributes of stdin, stdout and stderr are recorded in
     ``self._termstates`` as ``(fd, attributes)`` pairs. The port is opened
     through ``self._open_port`` and ``self._cleanup`` is registered via
     ``register``.

     ``baudrate`` falls back to ``DEFAULT_BAUDRATE`` when falsy.
     """
     saved_states = []
     if not mswin and stdout.isatty():
         std_fds = (stdin.fileno(), stdout.fileno(), stderr.fileno())
         saved_states = [(fd, tcgetattr(fd)) for fd in std_fds]
     self._termstates = saved_states

     self._device = device
     self._baudrate = baudrate or self.DEFAULT_BAUDRATE
     self._port = self._open_port(
         self._device, self._baudrate, parity, rtscts, debug
     )

     self._resume = False
     self._silent = False
     self._rxq = deque()
     self._rxe = Event()
     self._debug = debug
     register(self._cleanup)
开发者ID:eblot,项目名称:pyftdi,代码行数:17,代码来源:pyterm.py


示例16: daemonize

def daemonize():
    """Double-fork into the background and return the daemon's PID.

    Each fork's parent exits with status 0. Between the forks the process
    changes to ``/``, starts a new session and clears the umask. The
    surviving child reads stdin from ``/dev/null`` and sends stdout and
    stderr to ``/dev/null``.
    """
    from os import fork, chdir, setsid, umask, getpid, dup2
    from sys import stdout, stderr, stdin, exit

    if fork():
        exit(0)

    chdir("/")
    setsid()
    umask(0)

    if fork():
        exit(0)

    # Flush pending output before the descriptors underneath are replaced.
    for stream in (stdout, stderr):
        stream.flush()

    null_in = open('/dev/null', 'r')
    null_out = open('/dev/null', 'w')
    redirects = ((null_in, stdin), (null_out, stdout), (null_out, stderr))
    for replacement, stream in redirects:
        dup2(replacement.fileno(), stream.fileno())

    return getpid()
开发者ID:ymv,项目名称:zhk,代码行数:18,代码来源:zhk.py


示例17: doFork

def doFork():
  """Double-fork into the background and silence the standard streams.

  Each fork's parent exits with status 0. Between the forks the umask is
  cleared and a new session is started. The surviving child points stdin,
  stdout and stderr at ``/dev/null`` and changes to ``/``.
  """
  if os.fork(): exit(0)
  os.umask(0)
  os.setsid()
  if os.fork(): exit(0)

  stdout.flush()
  stderr.flush()

  # os.open() needs integer flags and returns a bare descriptor, which has
  # no .fileno(); pass the descriptors to dup2() directly.
  si = os.open('/dev/null', os.O_RDONLY)
  so = os.open('/dev/null', os.O_WRONLY)
  se = os.open('/dev/null', os.O_WRONLY)

  os.dup2(si, stdin.fileno())
  os.dup2(so, stdout.fileno())
  os.dup2(se, stderr.fileno())

  # The duplicates keep /dev/null open; drop the temporaries unless they
  # landed on a standard descriptor themselves.
  for fd in (si, so, se):
    if fd > 2:
      os.close(fd)

  # drop privs
  # setgid(getgrnam(group).gr_gid)
  # setuid(getpwnam(user).pw_uid)
  os.chdir('/')
开发者ID:AngeloRivera,项目名称:Dencoder,代码行数:21,代码来源:dencoder.py


示例18: daemonize

   def daemonize(self):
      '''
      Forks then sets up the I/O stream for the daemon 

      The standard streams are re-pointed at the files named by
      ``self.stdin``, ``self.stdout`` and ``self.stderr``; a PID-file
      cleanup hook is registered and the PID is recorded.
      '''
      # NOTE(review): self.fork() is defined outside this view; presumably it
      # forks and terminates the parent -- confirm.
      self.fork()

      # Re-enter the current directory, start a new session and clear the umask.
      chdir(getcwd())
      setsid()
      umask(0)
  
      self.fork()
      # Flush pending output before the descriptors underneath are replaced.
      stdout.flush()
      stderr.flush()

      # file() is the Python 2 builtin. 'w+' truncates the stdin target;
      # the trailing 0 makes the stderr replacement unbuffered.
      si= file(self.stdin, 'w+')
      so= file(self.stdout, 'a+')
      se= file(self.stderr, 'a+', 0)

      # Point the standard descriptors at the files opened above.
      dup2(si.fileno(), stdin.fileno())
      dup2(so.fileno(), stdout.fileno())
      dup2(se.fileno(), stderr.fileno())

      # NOTE(review): register is presumably atexit.register -- confirm.
      register(self.delPID)
      self.setPID()
开发者ID:richardjmarini,项目名称:Impetus-old,代码行数:24,代码来源:daemon.py


示例19: int

#!/usr/bin/env python

from __future__ import print_function
from datetime import datetime
from struct import pack
from time import sleep
from sys import stdout, stderr
import os

# Seconds between time messages; override with the SLEEP_TIME variable.
sleep_time = int(os.environ.get('SLEEP_TIME', 10))

# Re-open stdout with buffering disabled (third argument 0) so that each
# message is written immediately.
# NOTE(review): unbuffered text mode is only accepted by Python 2 -- confirm
# the interpreter this script targets.
stdout = os.fdopen(stdout.fileno(), 'w', 0)

print('clock controller: syncing every %d seconds' % sleep_time, file=stderr)

while True:
    now = datetime.now()

    # 12-hour clock: hours 0 and 12 are both shown as 12.
    hour = (now.hour % 12) or 12
    pm = now.hour >= 12

    # Message layout: '[' + four unsigned bytes (hour, minute, second, pm) + '%]'
    payload = pack('>BBBB', hour, now.minute, now.second, pm)
    print('[', payload, '%]', sep='', file=stdout)

    sleep(sleep_time)

开发者ID:robbles,项目名称:conduit.js,代码行数:27,代码来源:clock.py


示例20: countdown

def countdown(
    stdscr,
    alt_format=False,
    font=DEFAULT_FONT,
    blink=False,
    critical=3,
    quit_after=None,
    text=None,
    timespec=None,
    title=None,
    voice=None,
    no_seconds=False,
    no_text_magic=True,
    no_figlet=False,
    no_window_title=False,
    **kwargs
):
    try:
        sync_start, target = parse_timestr(timespec)
    except ValueError:
        click.echo("Unable to parse TIME value '{}'".format(timespec))
        exit(64)
    curses_lock, input_queue, quit_event = setup(stdscr)
    figlet = Figlet(font=font)

    if title and not no_figlet:
        title = figlet.renderText(title)

    input_thread = Thread(
        args=(stdscr, input_queue, quit_event, curses_lock),
        target=input_thread_body,
    )
    input_thread.start()

    seconds_total = seconds_left = int(ceil((target - datetime.now()).total_seconds()))

    try:
        while seconds_left > 0 or blink or text:
            figlet.width = stdscr.getmaxyx()[1]
            if alt_format:
                countdown_text = format_seconds_alt(
                    seconds_left, seconds_total, hide_seconds=no_seconds)
            else:
                countdown_text = format_seconds(seconds_left, hide_seconds=no_seconds)
            if seconds_left > 0:
                with curses_lock:
                    if not no_window_title:
                        os.write(stdout.fileno(), "\033]2;{0}\007".format(countdown_text).encode())
                    stdscr.erase()
                    draw_text(
                        stdscr,
                        countdown_text if no_figlet else figlet.renderText(countdown_text),
                        color=1 if seconds_left <= critical else 0,
                        fallback=countdown_text,
                        title=title,
                    )
            if seconds_left <= 10 and voice:
                voice_exec = "echo"
                if os.path.exists("/usr/bin/say"):
                    voice_exec = "/usr/bin/say"
                elif os.path.exists("/usr/bin/espeak"):
                    voice_exec = "/usr/bin/espeak"
                Popen([voice_exec, "-v", voice, str(seconds_left)])

            # We want to sleep until this point of time has been
            # reached:
            sleep_target = sync_start + timedelta(seconds=1)

            # If sync_start has microsecond=0, it might happen that we
            # need to skip one frame (the very first one). This occurs
            # when the program has been startet at, say,
            # "2014-05-29 20:27:57.930651". Now suppose rendering the
            # frame took about 0.2 seconds. The real time now is
            # "2014-05-29 20:27:58.130000" and sleep_target is
            # "2014-05-29 20:27:58.000000" which is in the past! We're
            # already too late. We could either skip that frame
            # completely or we can draw it right now. I chose to do the
            # latter: Only sleep if haven't already missed our target.
            now = datetime.now()
            if sleep_target > now and seconds_left > 0:
                try:
                    input_action = input_queue.get(True, (sleep_target - now).total_seconds())
                except Empty:
                    input_action = None
                if input_action == INPUT_PAUSE:
                    pause_start = datetime.now()
                    with curses_lock:
                        stdscr.erase()
                        draw_text(
                            stdscr,
                            countdown_text if no_figlet else figlet.renderText(countdown_text),
                            color=3,
                            fallback=countdown_text,
                        )
                    input_action = input_queue.get()
                    if input_action == INPUT_PAUSE:
                        sync_start += (datetime.now() - pause_start)
                        target += (datetime.now() - pause_start)
                if input_action == INPUT_EXIT:  # no elif here! input_action may have changed
                    break
#.........这里部分代码省略.........
开发者ID:dav-,项目名称:termdown,代码行数:101,代码来源:termdown.py



注:本文中的sys.stdout.fileno函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python stdout.flush函数代码示例发布时间:2022-05-27
下一篇:
Python stdin.readlines函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap