本文整理汇总了Python中pty.openpty函数的典型用法代码示例。如果您正苦于以下问题:Python openpty函数的具体用法?Python openpty怎么用?Python openpty使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了openpty函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run
def run(self):
try:
# use a pty. This enforces unbuffered output and thus
# allows for fast update
master_out_fd, slave_out_fd = pty.openpty()
master_in_fd, self.slave_in_fd = pty.openpty()
self.proc = subprocess.Popen(self.cmd, stdin=master_in_fd, stdout=slave_out_fd, stderr=slave_out_fd)
except:
self.finished.emit(False)
return
# listen to process' output
while self.proc.poll() == None:
try:
if select.select([master_out_fd], [], [master_out_fd], .1)[0]:
output = os.read(master_out_fd, 100)
if output: self.output(str(output, "utf-8"))
except InterruptedError:
pass
os.close(master_out_fd)
os.close(slave_out_fd)
os.close(master_in_fd)
os.close(self.slave_in_fd)
self.finished.emit(self.proc.wait() == 0)
开发者ID:ftCommunity,项目名称:ftcommunity-TXT,代码行数:27,代码来源:netreq.py
示例2: __init__
def __init__(self, ip=get_ipython()):
# For the registration
self.shell = ip
self.nbOutStream = sys.stdout
self.nbErrStream = sys.stderr
self.pyOutStream = sys.__stdout__
self.pyErrStream = sys.__stderr__
self.outStreamPipe_in = pty.openpty()[1]
self.errStreamPipe_in = pty.openpty()[1]
os.dup2(self.outStreamPipe_in, self.pyOutStream.fileno())
os.dup2(self.errStreamPipe_in, self.pyErrStream.fileno())
self.ioHandler = handlers.IOHandler()
self.flag = True
self.outString = ""
self.errString = ""
self.asyncCapturer = handlers.Runner(self.syncCapture)
self.isFirstPreExecute = True
self.isFirstPostExecute = True
开发者ID:ThomasStevensonQM,项目名称:root,代码行数:25,代码来源:utils.py
示例3: __init__
def __init__(self, cmd, capturestderr=False, bufsize=-1):
""" Popen3 class (isn't this actually Popen4, capturestderr = False?) that uses ptys
instead of pipes, to allow inline reading (instead of potential i/o buffering) of
output from the child process. It also stores the cmd its running (as a string)
and the thread that created the object, for later use """
import pty
# NOTE: most of this is cutnpaste from Popen3 minus the openpty calls
# popen2._cleanup()
self.prettyCmd = cmd
cmd = self.parseCmdToList(cmd)
self.cmd = cmd
self.threadIdent = thread.get_ident()
p2cread, p2cwrite = pty.openpty()
c2pread, c2pwrite = pty.openpty()
if capturestderr:
errout, errin = pty.openpty()
self.pid = os.fork()
if self.pid == 0:
# Child
os.dup2(p2cread, 0)
os.dup2(c2pwrite, 1)
if capturestderr:
os.dup2(errin, 2)
self._run_child(cmd)
os.close(p2cread)
self.tochild = os.fdopen(p2cwrite, "w", bufsize)
os.close(c2pwrite)
self.fromchild = os.fdopen(c2pread, "r", bufsize)
if capturestderr:
os.close(errin)
self.childerr = os.fdopen(errout, "r", bufsize)
else:
self.childerr = None
开发者ID:pjenvey,项目名称:hellanzb,代码行数:35,代码来源:build_util.py
示例4: mkpty
def mkpty():
master1, slave = pty.openpty()
slaveName1 = os.ttyname(slave)
master2, slave = pty.openpty()
slaveName2 = os.ttyname(slave)
print '\nslave device names: ', slaveName1, slaveName2
return master1, master2
开发者ID:wangqiang624731186,项目名称:wangqiang,代码行数:7,代码来源:com.py
示例5: tty_capture
def tty_capture(cmd, bytes_input):
"""Capture the output of cmd with bytes_input to stdin,
with stdin, stdout and stderr as TTYs."""
mo, so = pty.openpty() # provide tty to enable line-buffering
me, se = pty.openpty()
mi, si = pty.openpty()
fdmap = {mo: 'stdout', me: 'stderr', mi: 'stdin'}
p = subprocess.Popen(
cmd, bufsize=1, stdin=si, stdout=so, stderr=se, close_fds=True)
os.write(mi, bytes_input)
timeout = .04 # seconds
res = {'stdout': b'', 'stderr': b''}
while True:
ready, _, _ = select.select([mo, me], [], [], timeout)
if ready:
for fd in ready:
data = os.read(fd, 512)
if not data:
break
res[fdmap[fd]] += data
elif p.poll() is not None: # select timed-out
break # p exited
for fd in [si, so, se, mi, mo, me]:
os.close(fd) # can't do it sooner: it leads to errno.EIO error
p.wait()
return p.returncode, res['stdout'], res['stderr']
开发者ID:F001,项目名称:deno,代码行数:28,代码来源:permission_prompt_test.py
示例6: __open_process
def __open_process(self, logger, cmd, env, cwd, **kwargs):
if logger:
logger.info('Running command: %r' % cmd)
if env:
logger.info('With env (showing only all caps):')
for key, val in env.iteritems():
if re.match(r'[A-Z0-9_]+', key):
logger.info(' %-20s = %s' % (key,val))
logger.info('With CWD: %s' % (cwd or os.getcwd()))
logger.info('-' * 100)
# provide tty to enable line buffering.
master_out_fd, slave_out_fd = pty.openpty()
master_err_fd, slave_err_fd = pty.openpty()
process = subprocess.Popen(
cmd,
cwd=cwd,
env=env,
shell=False,
bufsize=1,
stderr=slave_err_fd,
stdout=slave_out_fd,
close_fds=True)
return (process, (master_out_fd, slave_out_fd), (master_err_fd, slave_err_fd))
开发者ID:hansl,项目名称:habitat,代码行数:25,代码来源:executer.py
示例7: __init__
def __init__(self, args, cwd=None, env=None, stdin=False,
echo_stdout=True, echo_stderr=True,
capture_stdout=False, capture_stderr=False):
if cwd is not None:
cwd = str(cwd)
logging.debug('Running {}...'.format(args[0]))
logging.debug('Parameters: {}'.format(args))
logging.debug('Working directory: {}'.format(cwd))
logging.debug('Environment: {}'.format(env))
logging.debug('Echo: stdout: {}, stderr: {}'.format(echo_stdout, echo_stderr))
self.args = args
self.buffer_stdout = bytearray()
self.buffer_stderr = bytearray()
master_stdout, slave_stdout = pty.openpty()
master_stderr, slave_stderr = pty.openpty()
Process.set_nonblocking(master_stdout)
Process.set_nonblocking(master_stderr)
self.process = subprocess.Popen(args, bufsize=0, cwd=cwd, env=env,
stdin=stdin, stdout=slave_stdout, stderr=slave_stderr)
pass_to_stdout = sys.stdout.buffer if echo_stdout else None
pass_to_stderr = sys.stderr.buffer if echo_stderr else None
self._reader_stdout = self.reader(master_stdout, capture_stdout,
self.buffer_stdout, pass_to_stdout)
self._reader_stderr = self.reader(master_stderr, capture_stderr,
self.buffer_stderr, pass_to_stderr)
开发者ID:mplucinski,项目名称:builder,代码行数:33,代码来源:process.py
示例8: _create_pty_or_pipe
def _create_pty_or_pipe(copy_term_size=None):
"""
Try to create a pty and if then fails then create a normal
pipe instead.
@param copy_term_size: If a tty file descriptor is given
then the term size will be copied to the pty.
@type copy_term_size: int
@rtype: tuple
@returns: A tuple of (is_pty, master_fd, slave_fd) where
is_pty is True if a pty was successfully allocated, and
False if a normal pipe was allocated.
"""
got_pty = False
global _disable_openpty, _fbsd_test_pty
if _fbsd_test_pty and not _disable_openpty:
# Test for python openpty breakage after freebsd7 to freebsd8
# upgrade, which results in a 'Function not implemented' error
# and the process being killed.
pid = os.fork()
if pid == 0:
pty.openpty()
os._exit(os.EX_OK)
pid, status = os.waitpid(pid, 0)
if (status & 0xff) == 140:
_disable_openpty = True
_fbsd_test_pty = False
if _disable_openpty:
master_fd, slave_fd = os.pipe()
else:
try:
master_fd, slave_fd = pty.openpty()
got_pty = True
except EnvironmentError as e:
_disable_openpty = True
writemsg("openpty failed: '%s'\n" % str(e),
noiselevel=-1)
del e
master_fd, slave_fd = os.pipe()
if got_pty:
# Disable post-processing of output since otherwise weird
# things like \n -> \r\n transformations may occur.
mode = termios.tcgetattr(slave_fd)
mode[1] &= ~termios.OPOST
termios.tcsetattr(slave_fd, termios.TCSANOW, mode)
if got_pty and \
copy_term_size is not None and \
os.isatty(copy_term_size):
rows, columns = get_term_size()
set_term_size(rows, columns, slave_fd)
return (got_pty, master_fd, slave_fd)
开发者ID:zy-sunshine,项目名称:easymgc,代码行数:58,代码来源:_pty.py
示例9: _execute_process_pty
def _execute_process_pty(cmd, cwd, env, shell, stderr_to_stdout=True):
stdout_master, stdout_slave = None, None
stderr_master, stderr_slave = None, None
fds_to_close = [stdout_master, stdout_slave, stderr_master, stderr_slave]
try:
stdout_master, stdout_slave = pty.openpty()
if stderr_to_stdout:
stderr_master, stderr_slave = stdout_master, stdout_slave
else:
stderr_master, stderr_slave = pty.openpty()
p = None
while p is None:
try:
p = Popen(
cmd,
stdin=stdout_slave, stdout=stderr_slave, stderr=STDOUT,
cwd=cwd, env=env, shell=shell, close_fds=False)
except OSError as exc:
# This can happen if a file you are trying to execute is being
# written to simultaneously on Linux
# (doesn't appear to happen on OS X)
# It seems like the best strategy is to just try again later
# Worst case is that the file eventually gets deleted, then a
# different OSError would occur.
if 'Text file busy' in '{0}'.format(exc):
# This is a transient error, try again shortly
time.sleep(0.01)
continue
raise
# This causes the below select to exit when the subprocess closes.
# On Linux, this sometimes causes Errno 5 OSError's when os.read
# is called from within _yield_data, so on Linux _yield_data
# catches and passes on that particular OSError.
os.close(stdout_slave)
if not stderr_to_stdout:
os.close(stderr_slave)
left_overs = {stdout_master: b'', stderr_master: b''}
fds = [stdout_master]
if stderr_master != stdout_master:
fds.append(stderr_master)
finally:
# Make sure we don't leak file descriptors
_close_fds(fds_to_close)
# The linesep with pty's always seems to be "\r\n", even on OS X
return _yield_data(p, fds, left_overs, "\r\n", fds_to_close)
开发者ID:osrf,项目名称:osrf_pycommon,代码行数:49,代码来源:execute_process_pty.py
示例10: run
def run(self):
self.ioloop = IOLoop.instance()
(master_fd, slave_fd) = pty.openpty()
# make stdout, stderr non-blocking
fcntl.fcntl(master_fd, fcntl.F_SETFL,
fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
self.master_fd = master_fd
self.master = os.fdopen(master_fd)
# listen to stdout, stderr
self.ioloop.add_handler(master_fd, self._handle_subprocess_stdout,
self.ioloop.READ)
slave = os.fdopen(slave_fd)
self.kwargs["stdout"] = slave
self.kwargs["stderr"] = slave
self.kwargs["close_fds"] = True
self.pipe = subprocess.Popen(*self.args, **self.kwargs)
self.stdin = self.pipe.stdin
# check for process exit
self.wait_callback = PeriodicCallback(self._wait_for_end, 250)
self.wait_callback.start()
开发者ID:yipdw,项目名称:hyves-grab,代码行数:26,代码来源:pipeline.py
示例11: __init__
def __init__(self, cmd, logger):
self.thread = threading.Thread(target=TestProcess._run, args=(weakref.proxy(self),))
self.pty_master, self.pty_slave = pty.openpty()
self.logger = logger
self.process = None
self.traces = ""
self.cmd = cmd
开发者ID:jpellikk,项目名称:ebnlib,代码行数:7,代码来源:ftest.py
示例12: call_and_peek_output
def call_and_peek_output(cmd, shell=False):
import pty
import subprocess
master, slave = pty.openpty()
print cmd
p = subprocess.Popen(cmd, shell=shell, stdin=None, stdout=slave, close_fds=True)
os.close(slave)
line = ""
while True:
try:
ch = os.read(master, 1)
except OSError:
# We get this exception when the spawn process closes all references to the
# pty descriptor which we passed him to use for stdout
# (typically when it and its childs exit)
break
line += ch
if ch == '\n':
yield line
line = ""
if line:
yield line
ret = p.wait()
if ret:
raise subprocess.CalledProcessError(ret, cmd)
开发者ID:dpapadi,项目名称:multi_totem,代码行数:26,代码来源:get_flowspace.py
示例13: spawn
def spawn(self, argv=None, term=None):
if argv is None:
if "SHELL" in os.environ:
argv = [os.environ["SHELL"]]
elif "PATH" in os.environ: # searching sh in the path. It can be unusual like /system/bin/sh on android
for shell in ["bash", "sh", "ksh", "zsh", "csh", "ash"]:
for path in os.environ["PATH"].split(":"):
fullpath = os.path.join(path.strip(), shell)
if os.path.isfile(fullpath):
argv = [fullpath]
break
if argv:
break
if not argv:
argv = ["/bin/sh"]
if term is not None:
os.environ["TERM"] = term
master, slave = pty.openpty()
self.slave = slave
self.master = os.fdopen(master, "rb+wb", 0) # open file in an unbuffered mode
flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
assert flags >= 0
flags = fcntl.fcntl(self.master, fcntl.F_SETFL, flags | os.O_NONBLOCK)
assert flags >= 0
self.prog = subprocess.Popen(
shell=False, args=argv, stdin=slave, stdout=slave, stderr=subprocess.STDOUT, preexec_fn=prepare
)
开发者ID:kai5263499,项目名称:pupy,代码行数:29,代码来源:ptyshell.py
示例14: __init_streams
def __init_streams(self, stdin, stdout, stderr, unbuffered):
self.stdin = self.stdout = self.stderr = None
if unbuffered:
master, slave = pty.openpty()
if stdin is PIPE:
self._child_stdin, self._stdin = (slave, master) if unbuffered else os.pipe()
self.stdin = os.fdopen(self._stdin, 'w')
elif isinstance(stdin, int):
self._child_stdin, self._stdin = stdin, -1
elif stdin is not None:
self._child_stdin, self._stdin = stdin.fileno(), -1
else:
self._child_stdin = self._stdin = -1
if stdout is PIPE:
self._stdout, self._child_stdout = (master, slave) if unbuffered else os.pipe()
self.stdout = os.fdopen(self._stdout, 'r')
elif isinstance(stdout, int):
self._stdout, self._child_stdout = -1, stdout
elif stdout is not None:
self._stdout, self._child_stdout = -1, stdout.fileno()
else:
self._stdout = self._child_stdout = -1
if stderr is PIPE:
self._stderr, self._child_stderr = os.pipe()
self.stderr = os.fdopen(self._stderr, 'r')
elif isinstance(stderr, int):
self._stderr, self._child_stderr = -1, stderr
elif stderr is not None:
self._stderr, self._child_stderr = -1, stderr.fileno()
else:
self._stderr = self._child_stderr = -1
开发者ID:rayeya,项目名称:judge,代码行数:35,代码来源:sandbox.py
示例15: _has_sudo
def _has_sudo(self, result):
_master, slave = pty.openpty()
os.setsid()
fcntl.ioctl(slave, termios.TIOCSCTTY, 0)
out, err, exit = run_command(['sudo', '-l', '-U', self.user[USER_NAME],
'sudo'])
if exit == 0:
debug("User %s is allowed to run sudo" % self.user[USER_NAME])
# sudo allows a wide range of configurations, such as controlling
# which binaries the user can execute with sudo.
# For now, we will just check whether the user is allowed to run
# any command with sudo.
out, err, exit = run_command(['sudo', '-l', '-U',
self.user[USER_NAME]])
for line in out.split('\n'):
if line and re.search("(ALL)", line):
result.value = 1
debug("User %s can run any command with sudo" %
result.value)
return
debug("User %s can only run some commands with sudo" %
self.user[USER_NAME])
else:
debug("User %s is not allowed to run sudo" % self.user[USER_NAME])
开发者ID:EmperorBeezie,项目名称:kimchi,代码行数:25,代码来源:auth.py
示例16: handle
def handle(self):
masterFd, slaveFd = pty.openpty()
try:
# if we're not in the main thread, this will not work.
signal.signal(signal.SIGTTOU, signal.SIG_IGN)
except:
pass
pid = os.fork()
if pid:
os.close(masterFd)
raise SocketConnected(slaveFd, pid)
# make parent process the pty slave - the opposite of
# pty.fork(). In this setup, the parent process continues
# to act normally, while the child process performs the
# logging. This makes it simple to kill the logging process
# when we are done with it and restore the parent process to
# normal, unlogged operation.
else:
os.close(slaveFd)
try:
protocol = TelnetServerProtocolHandler(self.request, masterFd)
protocol.handle()
finally:
os.close(masterFd)
os._exit(1)
开发者ID:fedora-conary,项目名称:conary,代码行数:26,代码来源:epdb_server.py
示例17: __enter__
def __enter__(self):
# prepare standard file descriptors for raw manipulation
self.was_blocking = os.get_blocking(0)
os.set_blocking(0, False)
try:
self.terminal_attr_stdin = termios.tcgetattr(0)
self.terminal_attr_stdout = termios.tcgetattr(1)
self.terminal_attr_stderr = termios.tcgetattr(2)
tty.setraw(0)
tty.setraw(1)
tty.setraw(2)
except termios.error: # probably redirected
self.terminal_attr_stdin = None
# redirect standard file descriptors to new PTY
master, slave = pty.openpty()
os.set_blocking(master, False)
self.real_stdin = os.dup(0)
self.real_stdout = os.dup(1)
self.real_stderr = os.dup(2)
os.close(0)
os.close(1)
os.close(2)
os.dup2(slave, 0)
os.dup2(slave, 1)
os.dup2(slave, 2)
os.close(slave)
self.terminal_pipe = master
# start REPL in separate thread
threading.Thread(target=repl, args=(self,), daemon=True).start()
return self
开发者ID:qsantos,项目名称:spyce,代码行数:33,代码来源:terminal.py
示例18: test_signal_failure
def test_signal_failure(monkeypatch):
import os
import pty
import signal
from pyrepl.unix_console import UnixConsole
def failing_signal(a, b):
raise ValueError
def really_failing_signal(a, b):
raise AssertionError
mfd, sfd = pty.openpty()
try:
with sane_term():
c = UnixConsole(sfd, sfd)
c.prepare()
c.restore()
monkeypatch.setattr(signal, 'signal', failing_signal)
c.prepare()
monkeypatch.setattr(signal, 'signal', really_failing_signal)
c.restore()
finally:
os.close(mfd)
os.close(sfd)
开发者ID:abhinavthomas,项目名称:pypy,代码行数:25,代码来源:test_bugs.py
示例19: lock
def lock(self):
"""Use conserver to lock the machine."""
# find out current status of console
debug.verbose('executing "console -i %s" to check state' %
self.get_machine_name())
proc = subprocess.Popen(["console", "-i", self.get_machine_name()],
stdout=subprocess.PIPE)
line = proc.communicate()[0]
assert(proc.returncode == 0)
# check that nobody else has it open for writing
myuser = getpass.getuser()
parts = line.strip().split(':')
conname, child, contype, details, users, state = parts[:6]
if users:
for userinfo in users.split(','):
mode, username, host, port = userinfo.split('@')[:4]
if 'w' in mode and username != myuser:
raise MachineLockedError # Machine is not free
# run a console in the background to 'hold' the lock and read output
debug.verbose('starting "console %s"' % self.get_machine_name())
# run on a PTY to work around terminal mangling code in console
(self.masterfd, slavefd) = pty.openpty()
self.lockprocess = subprocess.Popen(["console", self.get_machine_name()],
close_fds=True,
stdout=slavefd, stdin=slavefd)
os.close(slavefd)
# XXX: open in binary mode with no buffering
# otherwise select.select() may block when there is data in the buffer
self.console_out = os.fdopen(self.masterfd, 'rb', 0)
开发者ID:Karamax,项目名称:arrakis,代码行数:32,代码来源:uw.py
示例20: _node_ssh_output
def _node_ssh_output(args):
# ssh must run with stdin attached to a tty
master, slave = pty.openpty()
cmd = ('ssh-agent /bin/bash -c ' +
'"ssh-add /host-home/.vagrant.d/insecure_private_key ' +
'2> /dev/null && dcos node ssh --option StrictHostKeyChecking=no' +
' {}"').format(' '.join(args))
proc = subprocess.Popen(cmd,
stdin=slave,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
close_fds=True,
shell=True)
os.close(slave)
# wait for the ssh connection
time.sleep(8)
# kill the whole process group
os.killpg(os.getpgid(proc.pid), 15)
os.close(master)
return proc.communicate()
开发者ID:gismoranas,项目名称:dcos-cli,代码行数:25,代码来源:test_node.py
注:本文中的pty.openpty函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论