• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python pty.openpty函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pty.openpty函数的典型用法代码示例。如果您正苦于以下问题:Python openpty函数的具体用法?Python openpty怎么用?Python openpty使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了openpty函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: run

    def run(self):
        """Run ``self.cmd`` on pseudo-terminals and forward its output.

        The child's stdout and stderr are attached to the slave end of a pty
        (a pty is used so that output arrives promptly instead of being
        held back in a pipe buffer). Everything read from the master end is
        decoded as UTF-8 and handed to ``self.output``.

        ``self.finished`` is emitted with ``True`` when the process exits
        with status 0, and with ``False`` when it exits with another status
        or could not be started at all.
        """
        import codecs

        opened_fds = []
        try:
            master_out_fd, slave_out_fd = pty.openpty()
            opened_fds += [master_out_fd, slave_out_fd]
            master_in_fd, self.slave_in_fd = pty.openpty()
            opened_fds += [master_in_fd, self.slave_in_fd]
            # NOTE(review): the child's stdin is the *master* end of the
            # second pty while the slave end is kept on self.slave_in_fd --
            # presumably so other code can feed the child; confirm.
            self.proc = subprocess.Popen(self.cmd, stdin=master_in_fd,
                                         stdout=slave_out_fd,
                                         stderr=slave_out_fd)
        except Exception:
            # Do not leak descriptors that were opened before the failure.
            for fd in opened_fds:
                os.close(fd)
            self.finished.emit(False)
            return

        # An incremental decoder copes with multi-byte characters that are
        # split across two 100-byte reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        try:
            # listen to process' output
            while self.proc.poll() is None:
                try:
                    if select.select([master_out_fd], [], [master_out_fd], .1)[0]:
                        output = os.read(master_out_fd, 100)
                        if output:
                            text = decoder.decode(output)
                            if text:
                                self.output(text)
                except InterruptedError:
                    pass

            # Flush a trailing, incomplete byte sequence (if any).
            tail = decoder.decode(b"", final=True)
            if tail:
                self.output(tail)
        finally:
            # Always release the descriptors, even if forwarding failed.
            for fd in opened_fds:
                os.close(fd)

        self.finished.emit(self.proc.wait() == 0)
开发者ID:ftCommunity,项目名称:ftcommunity-TXT,代码行数:27,代码来源:netreq.py


示例2: __init__

    def __init__(self, ip=get_ipython()):
        """Prepare capturing of the interpreter-level stdout/stderr.

        Two ptys are opened and the file descriptors behind
        ``sys.__stdout__`` and ``sys.__stderr__`` are redirected (``dup2``)
        onto their slave ends. The streams currently installed as
        ``sys.stdout`` / ``sys.stderr`` are remembered as well.

        ip -- shell object stored on ``self.shell``; the default is the
              value ``get_ipython()`` returned when this method was defined.
        """
        # For the registration
        self.shell = ip

        # Streams as currently installed in ``sys`` ...
        self.nbOutStream, self.nbErrStream = sys.stdout, sys.stderr
        # ... and the interpreter's original ones.
        self.pyOutStream, self.pyErrStream = sys.__stdout__, sys.__stderr__

        # Only the slave end of each pty is kept.
        _, self.outStreamPipe_in = pty.openpty()
        _, self.errStreamPipe_in = pty.openpty()

        for slave_fd, stream in ((self.outStreamPipe_in, self.pyOutStream),
                                 (self.errStreamPipe_in, self.pyErrStream)):
            os.dup2(slave_fd, stream.fileno())

        self.ioHandler = handlers.IOHandler()
        self.flag = True
        self.outString = self.errString = ""

        self.asyncCapturer = handlers.Runner(self.syncCapture)

        self.isFirstPreExecute = self.isFirstPostExecute = True
开发者ID:ThomasStevensonQM,项目名称:root,代码行数:25,代码来源:utils.py


示例3: __init__

    def __init__(self, cmd, capturestderr=False, bufsize=-1):
        """Fork a child for ``cmd`` that talks over ptys instead of pipes.

        Using ptys lets the parent read the child's output inline rather
        than after pipe buffering. The original command is kept on
        ``self.prettyCmd``, its parsed form on ``self.cmd`` and the creating
        thread's identifier on ``self.threadIdent``.

        cmd           -- command, parsed with ``self.parseCmdToList``
        capturestderr -- when true, a third pty is opened for the child's
                         stderr and exposed as ``self.childerr``
        bufsize       -- buffer size passed to ``os.fdopen``
        """
        import pty

        # NOTE: mostly copied from Popen3, with the pipes swapped for
        # openpty calls (the popen2._cleanup() call is disabled here).
        self.prettyCmd = cmd
        self.cmd = cmd = self.parseCmdToList(cmd)
        self.threadIdent = thread.get_ident()

        child_in_fd, parent_in_fd = pty.openpty()
        parent_out_fd, child_out_fd = pty.openpty()
        if capturestderr:
            parent_err_fd, child_err_fd = pty.openpty()

        self.pid = os.fork()
        if self.pid == 0:
            # Child: wire the pty ends onto the standard descriptors.
            os.dup2(child_in_fd, 0)
            os.dup2(child_out_fd, 1)
            if capturestderr:
                os.dup2(child_err_fd, 2)
            self._run_child(cmd)

        # Drop the child's ends and wrap the remaining ones in file objects.
        os.close(child_in_fd)
        self.tochild = os.fdopen(parent_in_fd, "w", bufsize)
        os.close(child_out_fd)
        self.fromchild = os.fdopen(parent_out_fd, "r", bufsize)
        if not capturestderr:
            self.childerr = None
        else:
            os.close(child_err_fd)
            self.childerr = os.fdopen(parent_err_fd, "r", bufsize)
开发者ID:pjenvey,项目名称:hellanzb,代码行数:35,代码来源:build_util.py


示例4: mkpty

def mkpty():
    """Open two pseudo-terminals and return their master descriptors.

    The device names of both slave ends are printed so that another
    program can be pointed at them. The slave descriptors are left open.

    Returns a ``(master1, master2)`` tuple of file descriptors.
    """
    master1, slave1 = pty.openpty()
    slave_name1 = os.ttyname(slave1)
    master2, slave2 = pty.openpty()
    slave_name2 = os.ttyname(slave2)
    # One pre-formatted string prints the same text under both the
    # Python 2 print statement and the Python 3 print() function.
    print('\nslave device names:  %s %s' % (slave_name1, slave_name2))
    return master1, master2
开发者ID:wangqiang624731186,项目名称:wangqiang,代码行数:7,代码来源:com.py


示例5: tty_capture

def tty_capture(cmd, bytes_input):
    """Run ``cmd`` with TTYs for stdin, stdout and stderr and capture it.

    ``bytes_input`` is written to the command's stdin. Returns a tuple
    ``(returncode, stdout_bytes, stderr_bytes)``.
    """
    # One pty per stream; a tty makes the child use line buffering.
    mo, so = pty.openpty()
    me, se = pty.openpty()
    mi, si = pty.openpty()
    fdmap = {mo: 'stdout', me: 'stderr', mi: 'stdin'}

    p = subprocess.Popen(
        cmd, bufsize=1, stdin=si, stdout=so, stderr=se, close_fds=True)
    os.write(mi, bytes_input)

    timeout = .04  # seconds
    res = {'stdout': b'', 'stderr': b''}
    while True:
        readable = select.select([mo, me], [], [], timeout)[0]
        if not readable:
            # select timed out: finish once the process has exited
            if p.poll() is not None:
                break
            continue
        for fd in readable:
            chunk = os.read(fd, 512)
            if not chunk:
                break
            res[fdmap[fd]] += chunk

    # Closing any earlier leads to errno.EIO errors.
    for fd in [si, so, se, mi, mo, me]:
        os.close(fd)
    p.wait()
    return p.returncode, res['stdout'], res['stderr']
开发者ID:F001,项目名称:deno,代码行数:28,代码来源:permission_prompt_test.py


示例6: __open_process

    def __open_process(self, logger, cmd, env, cwd, **kwargs):
        """Start ``cmd`` with pty-backed stdout and stderr.

        When ``logger`` is truthy, the command, the all-caps environment
        variables and the working directory are logged first.

        logger -- object with an ``info`` method, or a falsy value
        cmd    -- command passed to ``subprocess.Popen``
        env    -- environment mapping for the child (may be falsy)
        cwd    -- working directory for the child (may be falsy)
        kwargs -- accepted but not used here

        Returns ``(process, (master_out_fd, slave_out_fd),
        (master_err_fd, slave_err_fd))``.
        """
        if logger:
            # Wrap in a 1-tuple so that a tuple ``cmd`` is formatted as one
            # value instead of being spread over the format string.
            logger.info('Running command: %r' % (cmd,))
            if env:
                logger.info('With env (showing only all caps):')
                # items() exists on both Python 2 and Python 3 mappings.
                for key, val in env.items():
                    # Anchored at the end: without it any key that merely
                    # *starts* with a capital letter would be shown.
                    if re.match(r'[A-Z0-9_]+\Z', key):
                        logger.info('  %-20s = %s' % (key, val))
            logger.info('With CWD: %s' % (cwd or os.getcwd()))
            logger.info('-' * 100)

        # provide tty to enable line buffering.
        master_out_fd, slave_out_fd = pty.openpty()
        master_err_fd, slave_err_fd = pty.openpty()

        process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=env,
            shell=False,
            bufsize=1,
            stderr=slave_err_fd,
            stdout=slave_out_fd,
            close_fds=True)
        return (process, (master_out_fd, slave_out_fd), (master_err_fd, slave_err_fd))
开发者ID:hansl,项目名称:habitat,代码行数:25,代码来源:executer.py


示例7: __init__

	def __init__(self, args, cwd=None, env=None, stdin=False,
			echo_stdout=True, echo_stderr=True,
			capture_stdout=False, capture_stderr=False):
		"""Start ``args`` as a child process with pty-backed stdout/stderr.

		The master end of each pty is switched to non-blocking mode and
		handed to ``self.reader`` together with the matching capture flag,
		capture buffer and echo target.

		args           -- argument list passed to ``subprocess.Popen``
		cwd            -- working directory; converted with ``str`` if given
		env            -- environment passed to ``subprocess.Popen``
		stdin          -- passed through as ``stdin`` to ``subprocess.Popen``
		echo_stdout    -- echo child stdout to ``sys.stdout.buffer``
		echo_stderr    -- echo child stderr to ``sys.stderr.buffer``
		capture_stdout -- forwarded to the stdout reader
		capture_stderr -- forwarded to the stderr reader
		"""
		cwd = None if cwd is None else str(cwd)

		logging.debug('Running {}...'.format(args[0]))
		logging.debug('Parameters: {}'.format(args))
		logging.debug('Working directory: {}'.format(cwd))
		logging.debug('Environment: {}'.format(env))
		logging.debug('Echo: stdout: {}, stderr: {}'.format(echo_stdout, echo_stderr))

		self.args = args
		self.buffer_stdout, self.buffer_stderr = bytearray(), bytearray()

		out_master, out_slave = pty.openpty()
		err_master, err_slave = pty.openpty()
		for master_fd in (out_master, err_master):
			Process.set_nonblocking(master_fd)

		self.process = subprocess.Popen(
			args, bufsize=0, cwd=cwd, env=env,
			stdin=stdin, stdout=out_slave, stderr=err_slave)

		# Echo targets are only looked up when echoing is requested.
		if echo_stdout:
			out_sink = sys.stdout.buffer
		else:
			out_sink = None
		if echo_stderr:
			err_sink = sys.stderr.buffer
		else:
			err_sink = None

		self._reader_stdout = self.reader(
			out_master, capture_stdout, self.buffer_stdout, out_sink)
		self._reader_stderr = self.reader(
			err_master, capture_stderr, self.buffer_stderr, err_sink)
开发者ID:mplucinski,项目名称:builder,代码行数:33,代码来源:process.py


示例8: _create_pty_or_pipe

def _create_pty_or_pipe(copy_term_size=None):
	"""
	Try to create a pty and, if that fails, create a normal
	pipe instead.

	Uses and updates the module-level flags _disable_openpty and
	_fbsd_test_pty: once openpty is found to be unusable, later calls
	go straight to os.pipe().

	@param copy_term_size: If a tty file descriptor is given
		then the term size will be copied to the pty.
	@type copy_term_size: int
	@rtype: tuple
	@returns: A tuple of (is_pty, master_fd, slave_fd) where
		is_pty is True if a pty was successfully allocated, and
		False if a normal pipe was allocated.
	"""

	got_pty = False

	global _disable_openpty, _fbsd_test_pty

	if _fbsd_test_pty and not _disable_openpty:
		# Test for python openpty breakage after freebsd7 to freebsd8
		# upgrade, which results in a 'Function not implemented' error
		# and the process being killed.
		# The probe runs in a forked child so that only the child is
		# killed if openpty is broken.
		pid = os.fork()
		if pid == 0:
			pty.openpty()
			os._exit(os.EX_OK)
		pid, status = os.waitpid(pid, 0)
		# 140 == 0x80 | 12 in the low byte of the wait status.
		# NOTE(review): presumably "killed by signal 12 (SIGSYS on
		# FreeBSD) with a core dump" -- confirm.
		if (status & 0xff) == 140:
			_disable_openpty = True
		# The probe runs at most once per process.
		_fbsd_test_pty = False

	if _disable_openpty:
		# os.pipe() returns (read_fd, write_fd); they are returned in the
		# (master_fd, slave_fd) slots respectively.
		master_fd, slave_fd = os.pipe()
	else:
		try:
			master_fd, slave_fd = pty.openpty()
			got_pty = True
		except EnvironmentError as e:
			# Remember the failure so later calls skip openpty entirely.
			_disable_openpty = True
			writemsg("openpty failed: '%s'\n" % str(e),
				noiselevel=-1)
			del e
			master_fd, slave_fd = os.pipe()

	if got_pty:
		# Disable post-processing of output since otherwise weird
		# things like \n -> \r\n transformations may occur.
		mode = termios.tcgetattr(slave_fd)
		mode[1] &= ~termios.OPOST
		termios.tcsetattr(slave_fd, termios.TCSANOW, mode)

	# NOTE(review): copy_term_size is only checked with os.isatty(); the
	# size itself comes from get_term_size() called without arguments --
	# confirm that this reads the intended terminal.
	if got_pty and \
		copy_term_size is not None and \
		os.isatty(copy_term_size):
		rows, columns = get_term_size()
		set_term_size(rows, columns, slave_fd)

	return (got_pty, master_fd, slave_fd)
开发者ID:zy-sunshine,项目名称:easymgc,代码行数:58,代码来源:_pty.py


示例9: _execute_process_pty

def _execute_process_pty(cmd, cwd, env, shell, stderr_to_stdout=True):
    """Start ``cmd`` attached to pseudo-terminals and stream its output.

    One pty is opened for stdout; stderr either shares it
    (``stderr_to_stdout=True``) or gets a second pty. ``Popen`` is retried
    while it fails with a 'Text file busy' ``OSError``; any other
    ``OSError`` is re-raised.

    :param cmd: command passed to ``Popen``
    :param cwd: working directory passed to ``Popen``
    :param env: environment passed to ``Popen``
    :param shell: ``shell`` flag passed to ``Popen``
    :param stderr_to_stdout: share one pty between stdout and stderr
    :returns: the result of ``_yield_data`` for the started process, the
        master descriptors, the left-over buffers and a CRLF line separator
    """
    stdout_master, stdout_slave = None, None
    stderr_master, stderr_slave = None, None
    # NOTE(review): this list captures the four ``None`` placeholders; the
    # rebinding of the names below does not update it, so it still holds
    # only ``None`` values when it is passed on. Closing the real masters
    # in the ``finally`` below would happen before ``_yield_data`` reads
    # them -- confirm the intended behaviour before changing this.
    fds_to_close = [stdout_master, stdout_slave, stderr_master, stderr_slave]
    try:
        stdout_master, stdout_slave = pty.openpty()
        if stderr_to_stdout:
            stderr_master, stderr_slave = stdout_master, stdout_slave
        else:
            stderr_master, stderr_slave = pty.openpty()

        p = None
        while p is None:
            try:
                # NOTE(review): the child's stdin is the stdout pty slave,
                # its stdout is ``stderr_slave`` and its stderr is merged
                # into stdout via STDOUT -- confirm this wiring is intended
                # for the ``stderr_to_stdout=False`` case.
                p = Popen(
                    cmd,
                    stdin=stdout_slave, stdout=stderr_slave, stderr=STDOUT,
                    cwd=cwd, env=env, shell=shell, close_fds=False)
            except OSError as exc:
                # This can happen if a file you are trying to execute is being
                # written to simultaneously on Linux
                # (doesn't appear to happen on OS X)
                # It seems like the best strategy is to just try again later
                # Worst case is that the file eventually gets deleted, then a
                # different OSError would occur.
                if 'Text file busy' in '{0}'.format(exc):
                    # This is a transient error, try again shortly
                    time.sleep(0.01)
                    continue
                raise
        # This causes the below select to exit when the subprocess closes.
        # On Linux, this sometimes causes Errno 5 OSError's when os.read
        # is called from within _yield_data, so on Linux _yield_data
        # catches and passes on that particular OSError.
        os.close(stdout_slave)
        if not stderr_to_stdout:
            os.close(stderr_slave)

        # Per-descriptor buffers handed to _yield_data.
        left_overs = {stdout_master: b'', stderr_master: b''}

        # Master descriptors to read from (only one when stderr is shared).
        fds = [stdout_master]
        if stderr_master != stdout_master:
            fds.append(stderr_master)
    finally:
        # Make sure we don't leak file descriptors
        _close_fds(fds_to_close)

    # The linesep with pty's always seems to be "\r\n", even on OS X
    return _yield_data(p, fds, left_overs, "\r\n", fds_to_close)
开发者ID:osrf,项目名称:osrf_pycommon,代码行数:49,代码来源:execute_process_pty.py


示例10: run

    def run(self):
        """Start the subprocess with its stdout/stderr routed through a pty.

        The pty master is made non-blocking and registered with the IOLoop
        so that ``self._handle_subprocess_stdout`` is called when it is
        readable. A periodic callback (every 250 ms) runs
        ``self._wait_for_end`` to check for process exit.
        """
        self.ioloop = IOLoop.instance()
        master_fd, slave_fd = pty.openpty()

        # make stdout, stderr non-blocking
        current_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

        self.master_fd = master_fd
        self.master = os.fdopen(master_fd)

        # listen to stdout, stderr
        self.ioloop.add_handler(
            master_fd, self._handle_subprocess_stdout, self.ioloop.READ)

        # Both output streams of the child share the pty slave.
        slave = os.fdopen(slave_fd)
        for option, value in (("stdout", slave),
                              ("stderr", slave),
                              ("close_fds", True)):
            self.kwargs[option] = value
        self.pipe = subprocess.Popen(*self.args, **self.kwargs)

        self.stdin = self.pipe.stdin

        # check for process exit
        exit_poller = PeriodicCallback(self._wait_for_end, 250)
        self.wait_callback = exit_poller
        exit_poller.start()
开发者ID:yipdw,项目名称:hyves-grab,代码行数:26,代码来源:pipeline.py


示例11: __init__

 def __init__(self, cmd, logger):
     """Store the command and logger and allocate a pty and worker thread.

     The thread targets ``TestProcess._run`` and receives only a weak
     proxy of this object. It is created here but not started.
     """
     self.cmd = cmd
     self.logger = logger
     self.process = None
     self.traces = ""
     self.pty_master, self.pty_slave = pty.openpty()
     self.thread = threading.Thread(
         target=TestProcess._run, args=(weakref.proxy(self),))
开发者ID:jpellikk,项目名称:ebnlib,代码行数:7,代码来源:ftest.py


示例12: call_and_peek_output

def call_and_peek_output(cmd, shell=False):
    """Run ``cmd`` on a pty and yield its stdout line by line as it appears.

    This is a generator. ``cmd`` is printed first, then every line written
    by the process is yielded (line terminator included); a final
    unterminated line is yielded too.

    cmd   -- command passed to ``subprocess.Popen``
    shell -- ``shell`` flag passed to ``subprocess.Popen``

    Raises ``subprocess.CalledProcessError`` after the output is exhausted
    if the process exited with a non-zero status.
    """
    import os
    import pty
    import subprocess

    def _as_text(raw):
        # On Python 2 ``bytes`` is ``str`` and is returned unchanged; on
        # Python 3 the raw bytes are decoded.
        return raw if isinstance(raw, str) else raw.decode("utf-8", "replace")

    master, slave = pty.openpty()
    # Parenthesised single argument: valid and identical on Python 2 and 3.
    print(cmd)
    p = subprocess.Popen(cmd, shell=shell, stdin=None, stdout=slave, close_fds=True)
    # The parent must not keep the slave open, otherwise the read below
    # would never see the end of the output.
    os.close(slave)
    try:
        line = b""
        while True:
            try:
                ch = os.read(master, 1)
            except OSError:
                # We get this exception when the spawned process closes all
                # references to the pty descriptor which we passed it to use
                # for stdout (typically when it and its children exit)
                break
            if not ch:
                # Some platforms report end-of-file with an empty read
                # instead of an error; without this check we would spin.
                break
            line += ch
            if ch == b"\n":
                yield _as_text(line)
                line = b""
        if line:
            yield _as_text(line)
    finally:
        # Release the master end even if the consumer stops iterating early.
        os.close(master)

    ret = p.wait()
    if ret:
        raise subprocess.CalledProcessError(ret, cmd)
开发者ID:dpapadi,项目名称:multi_totem,代码行数:26,代码来源:get_flowspace.py


示例13: spawn

    def spawn(self, argv=None, term=None):
        """Start a program (by default a shell) attached to a new pty.

        argv -- argument list to run. When ``None``, ``$SHELL`` is used if
                set; otherwise the first of bash/sh/ksh/zsh/csh/ash found
                as a file in a ``$PATH`` directory. If ``argv`` is still
                empty after that, ``/bin/sh`` is used.
        term -- when not ``None``, written to ``os.environ["TERM"]`` before
                the program is started.

        Sets ``self.master`` (unbuffered, non-blocking file object on the
        pty master), ``self.slave`` (slave descriptor) and ``self.prog``
        (the ``subprocess.Popen`` object).
        """
        if argv is None:
            if "SHELL" in os.environ:
                argv = [os.environ["SHELL"]]
            elif "PATH" in os.environ:
                # Search sh in the path: it can live somewhere unusual,
                # like /system/bin/sh on android.
                candidates = (
                    os.path.join(path.strip(), shell)
                    for shell in ["bash", "sh", "ksh", "zsh", "csh", "ash"]
                    for path in os.environ["PATH"].split(":")
                )
                found = next((c for c in candidates if os.path.isfile(c)), None)
                if found:
                    argv = [found]
        if not argv:
            argv = ["/bin/sh"]

        if term is not None:
            os.environ["TERM"] = term

        master, slave = pty.openpty()
        self.slave = slave
        # "rb+" opens the descriptor for binary reading and writing. The
        # former "rb+wb" is rejected as an invalid mode by Python 3.
        self.master = os.fdopen(master, "rb+", 0)  # unbuffered
        flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
        assert flags >= 0
        result = fcntl.fcntl(self.master, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        assert result >= 0
        self.prog = subprocess.Popen(
            shell=False, args=argv, stdin=slave, stdout=slave, stderr=subprocess.STDOUT, preexec_fn=prepare
        )
开发者ID:kai5263499,项目名称:pupy,代码行数:29,代码来源:ptyshell.py


示例14: __init_streams

    def __init_streams(self, stdin, stdout, stderr, unbuffered):
        """Work out the parent and child descriptors for the three streams.

        Each of ``stdin``/``stdout``/``stderr`` may be ``PIPE``, an integer
        descriptor, an object with ``fileno()``, or ``None``. For ``PIPE``
        a file object is stored on ``self.stdin``/``self.stdout``/
        ``self.stderr``; otherwise the parent-side descriptor is ``-1``.
        With ``unbuffered`` set, piped stdin and stdout use a single pty
        instead of ``os.pipe()``; stderr always uses ``os.pipe()``.
        """
        self.stdin = self.stdout = self.stderr = None

        if unbuffered:
            pty_master, pty_slave = pty.openpty()

        # --- stdin: child reads, parent writes ---
        if stdin is PIPE:
            if unbuffered:
                self._child_stdin, self._stdin = pty_slave, pty_master
            else:
                self._child_stdin, self._stdin = os.pipe()
            self.stdin = os.fdopen(self._stdin, 'w')
        elif isinstance(stdin, int):
            self._child_stdin, self._stdin = stdin, -1
        elif stdin is None:
            self._child_stdin = self._stdin = -1
        else:
            self._child_stdin, self._stdin = stdin.fileno(), -1

        # --- stdout: parent reads, child writes ---
        if stdout is PIPE:
            if unbuffered:
                self._stdout, self._child_stdout = pty_master, pty_slave
            else:
                self._stdout, self._child_stdout = os.pipe()
            self.stdout = os.fdopen(self._stdout, 'r')
        elif isinstance(stdout, int):
            self._stdout, self._child_stdout = -1, stdout
        elif stdout is None:
            self._stdout = self._child_stdout = -1
        else:
            self._stdout, self._child_stdout = -1, stdout.fileno()

        # --- stderr: parent reads, child writes ---
        if stderr is PIPE:
            self._stderr, self._child_stderr = os.pipe()
            self.stderr = os.fdopen(self._stderr, 'r')
        elif isinstance(stderr, int):
            self._stderr, self._child_stderr = -1, stderr
        elif stderr is None:
            self._stderr = self._child_stderr = -1
        else:
            self._stderr, self._child_stderr = -1, stderr.fileno()
开发者ID:rayeya,项目名称:judge,代码行数:35,代码来源:sandbox.py


示例15: _has_sudo

    def _has_sudo(self, result):
        """Check whether the user may run any command through sudo.

        A new session is created and a fresh pty slave is made its
        controlling terminal before ``sudo -l`` is invoked.

        result -- object whose ``value`` attribute is set to 1 when a line
                  of the ``sudo -l -U <user>`` output matches ``ALL``; it
                  is left untouched otherwise.
        """
        _master, slave = pty.openpty()
        os.setsid()
        fcntl.ioctl(slave, termios.TIOCSCTTY, 0)

        out, err, status = run_command(['sudo', '-l', '-U', self.user[USER_NAME],
                                        'sudo'])
        if status == 0:
            debug("User %s is allowed to run sudo" % self.user[USER_NAME])
            # sudo allows a wide range of configurations, such as controlling
            # which binaries the user can execute with sudo.
            # For now, we will just check whether the user is allowed to run
            # any command with sudo.
            out, err, status = run_command(['sudo', '-l', '-U',
                                            self.user[USER_NAME]])
            for line in out.split('\n'):
                # NOTE(review): "(ALL)" is a regex group here, so this
                # matches "ALL" anywhere in the line, not the literal
                # text "(ALL)" -- confirm that this is intended.
                if line and re.search("(ALL)", line):
                    result.value = 1
                    # Report the user name (the message previously
                    # interpolated result.value, i.e. always "1").
                    debug("User %s can run any command with sudo" %
                          self.user[USER_NAME])
                    return
            debug("User %s can only run some commands with sudo" %
                  self.user[USER_NAME])
        else:
            debug("User %s is not allowed to run sudo" % self.user[USER_NAME])
开发者ID:EmperorBeezie,项目名称:kimchi,代码行数:25,代码来源:auth.py


示例16: handle

    def handle(self):
        """Fork a helper process that serves the telnet protocol over a pty.

        The child keeps the pty master, runs ``TelnetServerProtocolHandler``
        on ``self.request`` and always terminates with ``os._exit(1)``.
        The parent keeps the pty slave and leaves this method by raising
        ``SocketConnected(slaveFd, pid)``.
        """
        masterFd, slaveFd = pty.openpty()

        try:
            # if we're not in the main thread, this will not work.
            signal.signal(signal.SIGTTOU, signal.SIG_IGN)
        except:
            pass

        pid = os.fork()
        if pid == 0:
            # Child: talk to the client through the master end.
            os.close(slaveFd)
            try:
                handler = TelnetServerProtocolHandler(self.request, masterFd)
                handler.handle()
            finally:
                os.close(masterFd)
                os._exit(1)

        # Parent: becomes the pty slave - the opposite of pty.fork(). In
        # this setup, the parent process continues to act normally, while
        # the child process performs the logging. This makes it simple to
        # kill the logging process when we are done with it and restore
        # the parent process to normal, unlogged operation.
        os.close(masterFd)
        raise SocketConnected(slaveFd, pid)
开发者ID:fedora-conary,项目名称:conary,代码行数:26,代码来源:epdb_server.py


示例17: __enter__

    def __enter__(self):
        """Route descriptors 0-2 through a new pty and start the REPL thread.

        The blocking mode of fd 0 and the terminal attributes of fds 0-2
        are saved and the descriptors are put in raw mode; on
        ``termios.error`` only ``self.terminal_attr_stdin`` is reset to
        ``None``. Duplicates of the original descriptors are kept on
        ``self.real_stdin``/``real_stdout``/``real_stderr`` and the pty
        master on ``self.terminal_pipe``. Returns ``self``.
        """
        std_fds = (0, 1, 2)

        # prepare standard file descriptors for raw manipulation
        self.was_blocking = os.get_blocking(0)
        os.set_blocking(0, False)
        try:
            self.terminal_attr_stdin = termios.tcgetattr(0)
            self.terminal_attr_stdout = termios.tcgetattr(1)
            self.terminal_attr_stderr = termios.tcgetattr(2)
            for fd in std_fds:
                tty.setraw(fd)
        except termios.error:  # probably redirected
            self.terminal_attr_stdin = None

        # redirect standard file descriptors to new PTY
        pty_master, pty_slave = pty.openpty()
        os.set_blocking(pty_master, False)
        self.real_stdin = os.dup(0)
        self.real_stdout = os.dup(1)
        self.real_stderr = os.dup(2)
        for fd in std_fds:
            os.close(fd)
        for fd in std_fds:
            os.dup2(pty_slave, fd)
        os.close(pty_slave)
        self.terminal_pipe = pty_master

        # start REPL in separate thread
        repl_thread = threading.Thread(target=repl, args=(self,), daemon=True)
        repl_thread.start()

        return self
开发者ID:qsantos,项目名称:spyce,代码行数:33,代码来源:terminal.py


示例18: test_signal_failure

def test_signal_failure(monkeypatch):
    """Exercise UnixConsole.prepare()/restore() with failing signal.signal.

    ``prepare()`` is run while ``signal.signal`` raises ``ValueError``;
    ``restore()`` is then run while ``signal.signal`` raises
    ``AssertionError``.
    """
    import os
    import pty
    import signal
    from pyrepl.unix_console import UnixConsole

    def raise_value_error(signum, handler):
        raise ValueError

    def raise_assertion_error(signum, handler):
        raise AssertionError

    master_fd, slave_fd = pty.openpty()
    try:
        with sane_term():
            console = UnixConsole(slave_fd, slave_fd)
            # First a normal prepare/restore cycle ...
            console.prepare()
            console.restore()
            # ... then one with signal.signal replaced by the failing stubs.
            monkeypatch.setattr(signal, 'signal', raise_value_error)
            console.prepare()
            monkeypatch.setattr(signal, 'signal', raise_assertion_error)
            console.restore()
    finally:
        os.close(master_fd)
        os.close(slave_fd)
开发者ID:abhinavthomas,项目名称:pypy,代码行数:25,代码来源:test_bugs.py


示例19: lock

    def lock(self):
        """Use conserver to lock the machine.

        Queries ``console -i <machine>`` first and raises
        ``MachineLockedError`` if another user has the console open for
        writing. Otherwise starts ``console <machine>`` on a pty to hold
        the lock and exposes the pty master as ``self.console_out``.
        """
        # find out current status of console
        debug.verbose('executing "console -i %s" to check state' %
                      self.get_machine_name())
        query = ["console", "-i", self.get_machine_name()]
        proc = subprocess.Popen(query, stdout=subprocess.PIPE)
        line = proc.communicate()[0]
        assert proc.returncode == 0

        # check that nobody else has it open for writing
        myuser = getpass.getuser()
        fields = line.strip().split(':')
        # Only the users field is used; unpacking still requires six fields.
        _conname, _child, _contype, _details, users, _state = fields[:6]
        if users:
            for entry in users.split(','):
                mode, username, _host, _port = entry.split('@')[:4]
                if 'w' not in mode or username == myuser:
                    continue
                raise MachineLockedError # Machine is not free

        # run a console in the background to 'hold' the lock and read output
        debug.verbose('starting "console %s"' % self.get_machine_name())
        # run on a PTY to work around terminal mangling code in console
        self.masterfd, slavefd = pty.openpty()
        hold_cmd = ["console", self.get_machine_name()]
        self.lockprocess = subprocess.Popen(hold_cmd, close_fds=True,
                                            stdout=slavefd, stdin=slavefd)
        os.close(slavefd)
        # XXX: open in binary mode with no buffering
        # otherwise select.select() may block when there is data in the buffer
        self.console_out = os.fdopen(self.masterfd, 'rb', 0)
开发者ID:Karamax,项目名称:arrakis,代码行数:32,代码来源:uw.py


示例20: _node_ssh_output

def _node_ssh_output(args):
    """Run ``dcos node ssh`` with ``args`` and return its captured output.

    The command runs under ``ssh-agent`` in its own session with a pty
    slave as stdin. After waiting 8 seconds the whole process group is
    sent signal 15. Returns the ``(stdout, stderr)`` tuple from
    ``Popen.communicate()``.
    """
    # ssh must run with stdin attached to a tty
    pty_master, pty_slave = pty.openpty()

    template = ('ssh-agent /bin/bash -c '
                '"ssh-add /host-home/.vagrant.d/insecure_private_key '
                '2> /dev/null && dcos node ssh --option StrictHostKeyChecking=no'
                ' {}"')
    cmd = template.format(' '.join(args))

    popen_options = dict(
        stdin=pty_slave,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        preexec_fn=os.setsid,
        close_fds=True,
        shell=True,
    )
    proc = subprocess.Popen(cmd, **popen_options)
    os.close(pty_slave)

    # wait for the ssh connection
    time.sleep(8)

    # kill the whole process group
    group_id = os.getpgid(proc.pid)
    os.killpg(group_id, 15)

    os.close(pty_master)
    return proc.communicate()
开发者ID:gismoranas,项目名称:dcos-cli,代码行数:25,代码来源:test_node.py



注:本文中的pty.openpty函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pty.spawn函数代码示例发布时间:2022-05-25
下一篇:
Python pty.fork函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap