本文整理汇总了 Python 中 pty.fork 函数的典型用法代码示例。如果您正苦于以下问题：Python fork 函数的具体用法？Python fork 怎么用？Python fork 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了fork函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, domain, historyLimit=256, historySaveAll=True, historySaveCmds=False, cLimit=131072):
    """
    Start ``xm console <domain>`` on a new pseudo-terminal and keep the
    parent-side pid and file descriptor on the instance.

    Parameters:
    domain: passed as the last argument of the ``xm console`` command
    historyLimit: specifies how many lines of history are maintained
    historySaveAll: determines whether or not extra messages in
                    between commands are saved
    historySaveCmds : determines whether or not the command echos
                      are included in the history buffer
    cLimit: stored as ``self.limit``
    """
    self.TIMEOUT = 30
    self.PROMPT = "@%@%> "
    self.domain = domain
    self.historyBuffer = []
    self.historyLines = 0
    self.historyLimit = historyLimit
    self.historySaveAll = historySaveAll
    self.historySaveCmds = historySaveCmds
    self.debugMe = False
    self.limit = cLimit
    # First element is the executable path, the rest is the child's argv.
    consoleCmd = ["/usr/sbin/xm", "xm", "console", domain]
    if verbose:
        print("Console executing: %s" % str(consoleCmd))
    pid, fd = pty.fork()
    if pid == 0:
        # Child: replace this process with the console command.
        try:
            os.execvp("/usr/sbin/xm", consoleCmd[1:])
        finally:
            # execvp only comes back by raising. Never let the forked copy
            # continue into the parent's code path below.
            os._exit(127)
    # Parent: remember the child and switch the pty master to raw mode.
    self.consolePid = pid
    self.consoleFd = fd
    tty.setraw(self.consoleFd, termios.TCSANOW)
开发者ID:avsm,项目名称:xen-1,代码行数:35,代码来源:Console.py
示例2: _connectSSHLocal
def _connectSSHLocal(self, hostname, username, passwd):
    """
    Run the configured ssh binary against ``username@hostname`` on a new
    pseudo-terminal and relay data until the session ends.

    The password is placed on the clipboard via ``addStringToClipboard``
    and also handed to ``_copySSHData``. While the session runs, local
    stdin is put into raw mode (when it is a tty) and SIGWINCH is routed
    to ``_winchHandler``; both are undone, and the pty fd closed, no
    matter how the session ends.
    """
    # The original literal was mangled by an e-mail obfuscator into an
    # invalid format string; the intended form is user@host.
    target = '%s@%s' % (username, hostname)
    sshcmd = [self.ssh_bin, target]
    if self.ssh_extraopt:
        sshcmd += self.ssh_extraopt.split()
    self.addStringToClipboard(passwd)
    pid, self.remote_fd = pty.fork()
    if pid == pty.CHILD:
        try:
            os.execlp(sshcmd[0], *sshcmd)
        finally:
            # exec failed: do not let the child run the parent's logic.
            os._exit(127)
    try:
        mode = tty.tcgetattr(pty.STDIN_FILENO)
        tty.setraw(pty.STDIN_FILENO)
        restore = True
    except tty.error:
        # stdin is not a terminal; nothing to restore later.
        restore = False
    signal.signal(signal.SIGWINCH, self._winchHandler)
    try:
        self._setRemoteTTYSize(self.remote_fd)
        self._setTerminalTitle(target)
        try:
            self._copySSHData(self.remote_fd, passwd)
        except (IOError, OSError):
            # I/O errors from the relay are treated as a normal end of session.
            pass
    finally:
        if restore:
            tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)
        signal.signal(signal.SIGWINCH, signal.SIG_DFL)
        os.close(self.remote_fd)
开发者ID:sii,项目名称:siptrack,代码行数:29,代码来源:cmdconnect.py
示例3: main
def main():
    """
    Fork a child on a pty, SIGKILL it, then report whether it still
    appears alive and what can be read from the pty master.

    ``signal_handler`` and ``nonblock`` are helpers defined elsewhere in
    the file.
    """
    signal.signal(signal.SIGCHLD, signal_handler)
    child_pid, master_fd = pty.fork()
    if child_pid == 0:
        # Child: emit some output, then idle until it is killed.
        os.write(sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)
    nonblock(master_fd)
    tty.setraw(master_fd)
    print('Sending SIGKILL to child pid: %s' % (child_pid,))
    time.sleep(2)
    os.kill(child_pid, signal.SIGKILL)
    print('Entering to sleep...')
    try:
        time.sleep(2)
    except:
        print('Sleep interrupted')
    # Signal 0 delivers nothing; it only probes whether the pid exists.
    try:
        os.kill(child_pid, 0)
    except OSError:
        print('\tChild appears to be dead.')
    else:
        print('\tChild is alive. This is ambiguous because it may be a Zombie.')
    print('')
    print('Reading from master fd: %s' % (os.read(master_fd, 1000),))
开发者ID:AmineCherrai,项目名称:pexpect,代码行数:25,代码来源:check_handler.py
示例4: test_inkey_0s_cbreak_multibyte_utf8
def test_inkey_0s_cbreak_multibyte_utf8():
    "0-second inkey with multibyte utf-8 input; should decode immediately."
    # utf-8 bytes represent "latin capital letter upsilon".
    pid, master_fd = pty.fork()
    # Compare by value: identity tests on ints rely on interpreter caching.
    if pid == 0:  # child
        try:
            cov = __import__('cov_core_init').init()
        except ImportError:
            cov = None
        term = TestTerminal()
        # Wait for the parent's go-ahead, then acknowledge before reading.
        read_until_semaphore(sys.__stdin__.fileno(), semaphore=SEMAPHORE)
        os.write(sys.__stdout__.fileno(), SEMAPHORE)
        with term.cbreak():
            inp = term.inkey(timeout=0)
            os.write(sys.__stdout__.fileno(), inp.encode('utf-8'))
        if cov is not None:
            cov.stop()
            cov.save()
        os._exit(0)

    # Parent: send the semaphore and the key, then collect the child's echo.
    with echo_off(master_fd):
        os.write(master_fd, SEND_SEMAPHORE)
        os.write(master_fd, u'\u01b1'.encode('utf-8'))
        read_until_semaphore(master_fd)
        stime = time.time()
        output = read_until_eof(master_fd)
    pid, status = os.waitpid(pid, 0)
    assert output == u'Ʊ'
    assert os.WEXITSTATUS(status) == 0
    assert math.floor(time.time() - stime) == 0.0
开发者ID:0x37N0w4N,项目名称:MARA_Framework,代码行数:30,代码来源:test_keyboard.py
示例5: test_esc_delay_cbreak_timout_0
def test_esc_delay_cbreak_timout_0():
    """esc_delay still in effect with timeout of 0 ("nonblocking")."""
    pid, master_fd = pty.fork()
    # Compare by value: identity tests on ints rely on interpreter caching.
    if pid == 0:  # child
        try:
            cov = __import__('cov_core_init').init()
        except ImportError:
            cov = None
        term = TestTerminal()
        os.write(sys.__stdout__.fileno(), SEMAPHORE)
        with term.cbreak():
            stime = time.time()
            inp = term.inkey(timeout=0)
            # Seconds * 100, i.e. hundredths of a second (not milliseconds).
            measured_time = (time.time() - stime) * 100
            os.write(sys.__stdout__.fileno(), (
                '%s %i' % (inp.name, measured_time,)).encode('ascii'))
            sys.stdout.flush()
        if cov is not None:
            cov.stop()
            cov.save()
        os._exit(0)

    # Parent: send a lone ESC and read back "<key name> <elapsed>".
    with echo_off(master_fd):
        os.write(master_fd, u'\x1b'.encode('ascii'))
        read_until_semaphore(master_fd)
        stime = time.time()
        key_name, duration_ms = read_until_eof(master_fd).split()
    pid, status = os.waitpid(pid, 0)
    assert key_name == u'KEY_ESCAPE'
    assert os.WEXITSTATUS(status) == 0
    assert math.floor(time.time() - stime) == 0.0
    # The child reports hundredths of a second, so this accepts 0.35-0.45 s.
    assert 35 <= int(duration_ms) <= 45, int(duration_ms)
开发者ID:0x37N0w4N,项目名称:MARA_Framework,代码行数:33,代码来源:test_keyboard.py
示例6: start
def start(self, command):
    """
    Launch ``command`` either through a shell-backed ``Popen`` (when
    ``self.using_pty`` is false) or as ``/bin/bash -c command`` in a
    child forked on a pseudo-terminal.

    In the pty case the parent gets ``self.pid`` / ``self.parent_fd``;
    otherwise the ``Popen`` object is stored on ``self.process``.
    """
    if not self.using_pty:
        self.process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)
        return

    if pty is None:  # Encountered ImportError
        sys.exit("You indicated pty=True, but your platform doesn't support the 'pty' module!")  # noqa

    self.pid, self.parent_fd = pty.fork()
    if self.pid != 0:
        # Parent: nothing more to do here.
        return

    # Child: replace this process - whose standard streams are all hooked
    # up to the PTY - with the real command, run via a shell just as
    # subprocess does. Plain execv is enough for now: no PATH lookup ('p')
    # and no custom environment ('e') are needed.
    # TODO: use /bin/sh or whatever subprocess does; bash is used only
    # because that is what has been tested against.
    # TODO: check whether subprocess uses the equivalent of execvp.
    # TODO: pty.spawn() and pexpect.spawn() do extra setup/teardown
    # (tty.*, setwinsize, getrlimit, signal). Some of that will probably be
    # wanted eventually, but write tests (integration-level if necessary)
    # before adding it.
    os.execv('/bin/bash', ['/bin/bash', '-c', command])
开发者ID:shenkan,项目名称:invoke,代码行数:30,代码来源:runners.py
示例7: run_bg
def run_bg(cmd, debug=False, cwd=''):
    '''
    run_bg(cmd)

    Works the same as ``ave.cmd.run()``, except that it is non-blocking.

    :arg cmd: The command, either as a list of strings or as a single
        string that is split on whitespace.
    :arg debug: If true, write the command line to ``stderr`` first.
    :arg cwd: If not empty, the child changes to this directory before
        executing the command.
    :returns: A *(PID, file descriptor)* tuple. The file descriptor is attached
        to the new process' pseudoterminal and will carry all messages written
        to ``stdout`` and ``stderr``.
    :raises RunError: If ``pty.fork()`` fails, or (in the child) if changing
        directory or executing the command fails.

    .. note:: The caller *must* eventually use ``os.wait()`` or one of its
        variations on the PID and ``os.close()`` on the file descriptor. Failing
        to perform these cleanups will lead to defunct processes and/or running
        out of pseudoterminals.
    '''
    # make sure 'cmd' is a list of strings; split() never yields empty items
    if isinstance(cmd, (str, unicode)):
        cmd = cmd.split()
    if debug:
        sys.stderr.write(' '.join(cmd)+'\n')
        sys.stderr.flush()
    try:
        child_pid, child_fd = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid == 0:
        # Child: exec only returns by raising.
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception as e:
            raise RunError(cmd, None, 'os.execvp() failed: %s' % str(e))
    # Parent: hand back what the docstring promises.
    return child_pid, child_fd
开发者ID:derek-xu,项目名称:UiAutomatorController,代码行数:33,代码来源:cmd.py
示例8: __init__
def __init__(self, hostname):
    """
    Fork a pty child that runs ``launch_ssh(hostname)`` and initialise
    the parent-side dispatcher state around the pty file descriptor.
    """
    self.pid, pty_fd = pty.fork()
    if self.pid == 0:
        # Child: reaching sys.exit means launch_ssh returned.
        self.launch_ssh(hostname)
        sys.exit(1)

    # Parent from here on.
    buffered_dispatcher.__init__(self, pty_fd)
    self.temporary = False
    self.hostname = hostname
    self.debug = options.debug
    self.enabled = True  # shells can be enabled and disabled
    self.state = STATE_NOT_STARTED
    self.term_size = (-1, -1)
    self.display_name = None
    self.change_name(hostname)
    tty_setup = self.configure_tty()
    self.init_string = tty_setup + self.set_prompt()
    self.init_string_sent = False
    self.read_in_state_not_started = ''
    self.command = options.command
    self.last_printed_line = ''

    use_color = sys.stdout.isatty() and not options.disable_color
    if not use_color:
        self.color_code = None
    else:
        # Rotate the shared colour list and take its new first entry.
        COLORS.insert(0, COLORS.pop())
        self.color_code = COLORS[0]
开发者ID:daniyalzade,项目名称:polysh,代码行数:27,代码来源:remote_dispatcher.py
示例9: spawn
def spawn(self, argv=None):
    '''
    Create a spawned process.

    Based on the code for pty.spawn().

    ``argv`` is the command to execute; when empty or None, the program
    named by the ``SHELL`` environment variable is run instead. The call
    relays data through ``self._copy()`` and returns once that finishes.
    '''
    assert self.master_fd is None
    if not argv:
        argv = [os.environ['SHELL']]
    pid, master_fd = pty.fork()
    self.master_fd = master_fd
    if pid == pty.CHILD:
        os.execlp(argv[0], *argv)
    old_handler = signal.signal(signal.SIGWINCH, self._signal_winch)
    try:
        mode = tty.tcgetattr(pty.STDIN_FILENO)
        tty.setraw(pty.STDIN_FILENO)
        restore = 1
    except tty.error:  # This is the same as termios.error
        restore = 0
    self._init_fd()
    try:
        self._copy()
    except (IOError, OSError):
        # I/O errors from the relay are treated as the end of the session.
        pass
    finally:
        # Restore the terminal however _copy() ends. Previously this only
        # happened on IOError/OSError, so a normal return left stdin raw.
        if restore:
            tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)
    os.close(master_fd)
    self.master_fd = None
    signal.signal(signal.SIGWINCH, old_handler)
    # NOTE(review): called after master_fd has been cleared - confirm that
    # _set_pty_size() copes with self.master_fd being None.
    self._set_pty_size()
开发者ID:mpobrien,项目名称:teaminal,代码行数:32,代码来源:ptyintercept.py
示例10: set_rsa
def set_rsa(host, rsa_pub, user, password):
    """
    logs into system via ssh
    and appends to authorized_keys using username password

    The key is always appended to /root/.ssh/authorized_keys on the host.

    :param host: name over the server
    :param rsa_pub: absolute path to your id_rsa.pub
    :param user: host login creds
    :param password: host login creds
    """
    with open(rsa_pub, 'r') as key_file:
        public_key = key_file.read().strip()
    # Parentheses are required: without them only the first literal was
    # assigned and the '%' applied to a fragment with no placeholder.
    # NOTE(review): the key text is interpolated into a remote shell command
    # inside double quotes - only use with trusted key files.
    cmd = ('/bin/mkdir -p /root/.ssh && echo "%s" >>'
           ' /root/.ssh/authorized_keys &&'
           ' /bin/chmod -R 600 /root/.ssh' % (public_key,))
    pid, fd = pty.fork()
    if pid == 0:
        os.execvp("/usr/bin/ssh", ["ssh",
                                   user+'@'+host,
                                   '-o',
                                   'NumberOfPasswordPrompts=1',
                                   '-o',
                                   'StrictHostKeyChecking=no',
                                   '-o',
                                   'UserKnownHostsFile=/dev/null',
                                   cmd])
    elif pid > 0:
        # Map of expected prompt text to the reply to send for it.
        searches = {'password': password, 'continue connecting': 'yes'}
        # After the host-key confirmation, call event() once more (the
        # password entry is still in searches).
        if event(fd, searches) == 'continue connecting':
            event(fd, searches)
        os.wait4(pid, 0)
        os.close(fd)
开发者ID:itow0001,项目名称:goephor,代码行数:34,代码来源:terminal.py
示例11: oldstat
def oldstat(pid, events, t, ann=None, norm=False, guest=False):
    """
    Run ``perf stat`` (or ``perf kvm stat`` when ``guest`` is true) against
    process ``pid`` for ``t`` seconds on a pty and wrap its output.

    The perf process is stopped by writing the terminal's interrupt
    character to the pty; everything it printed is then read and passed
    to ``PerfData`` together with ``ann`` and ``norm``.
    """
    base = "sudo perf kvm stat" if guest else "sudo perf stat"
    cmd = base + " -e {events} --log-fd 1 -x, -p {pid}".format(
        events=",".join(events), pid=pid)
    child_pid, fd = pty.fork()
    if child_pid == 0:
        osexec(cmd)
    # fcntl.ioctl(fd, termios.TIOCSWINSZ, struct.pack("hhhh", 24, 80, 0, 0)) # resize terminal
    # Turn off echo so the interrupt character is not mirrored back.
    attrs = termios.tcgetattr(fd)
    attrs[3] &= ~termios.ECHO
    termios.tcsetattr(fd, termios.TCSADRAIN, attrs)
    time.sleep(t)
    # Look up the terminal's interrupt (Ctrl+C) character and send it.
    control_chars = termios.tcgetattr(fd)[-1]
    os.write(fd, control_chars[termios.VINTR])
    os.waitpid(child_pid, 0)
    # Drain the pty until EOF or a read error.
    pieces = []
    while True:
        try:
            data = os.read(fd, BUF_SIZE)
        except OSError:
            break
        if not data:
            break
        pieces.append(data)
    return PerfData(b"".join(pieces), ann=ann, norm=norm)
开发者ID:kopchik,项目名称:perftest,代码行数:31,代码来源:perftool.py
示例12: run
def run(self):
    """
    Build the command line from ``self.command`` plus every option
    control's options, start it on a pty in ``self.directory``, and
    update the UI widgets to show the running state.
    """
    argv = copy(self.command)
    for option_control in self.option_controls:
        argv += option_control.get_options()

    # Start Process
    self.pid, self.stdout = pty.fork()
    if self.pid == 0:  # Child
        # Close every inherited descriptor above stderr (idea taken from
        # pexpect); closerange ignores descriptors that are not open.
        fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
        os.closerange(3, fd_limit)
        os.chdir(self.directory)
        os.execvp(argv[0], argv)
        raise OSError("os.exec failed!")

    self.app_tree.SetItemBold(self.tree_item, True)
    self.run_button.SetLabel("Stop")
    banner = self.short_directory+" $ "+command_to_string(argv)+"\n"
    self.output_box.SetValue(banner)
开发者ID:mrwiggin,项目名称:seawolf5,代码行数:27,代码来源:run.py
示例13: start_client
def start_client(session, cfg):
    """
    Start a shell on a new pseudo-terminal and hand the pty master to
    ``session.interactive_shell``.

    The shell is ``$SHELL`` when that is set. Otherwise it defaults to
    ``/bin/bash -i -l``, which ``cfg.config['pty-config']['default-shell']``
    overrides when present. Errors are logged to the ``pty_client``
    logger rather than raised, and the pty master is always closed.
    """
    master_fd = None
    try:
        # .get(): a missing SHELL must fall back to the default, not raise.
        shell = os.environ.get('SHELL')
        if shell:
            shell = [shell]
        else:
            shell = ['/bin/bash', '-i', '-l']
            if cfg.config and 'pty-config' in cfg.config and 'default-shell' in cfg.config['pty-config']:
                shell = cfg.config['pty-config']['default-shell']
        pid, master_fd = pty.fork()
        if pid == pty.CHILD:
            try:
                os.execlp(shell[0], *shell)
            finally:
                # exec failed: never let the child return into the
                # application as a second copy of the parent.
                os._exit(127)
        session.interactive_shell(master_fd)
    except Exception:
        logging.getLogger('pty_client').exception('pty client caught exception:')
    finally:
        # Explicit None check: the truthiness test would skip descriptor 0.
        if master_fd is not None:
            try:
                os.close(master_fd)
            except OSError:
                pass
开发者ID:stonewell,项目名称:pymterm,代码行数:27,代码来源:pty_client.py
示例14: test_esc_delay_cbreak_prefix_sequence
def test_esc_delay_cbreak_prefix_sequence():
    "An unfinished multibyte sequence (\\x1b[) will delay an ESC by .35 "
    pid, master_fd = pty.fork()
    # Compare by value: identity tests on ints rely on interpreter caching.
    if pid == 0:  # child
        cov = init_subproc_coverage('test_esc_delay_cbreak_prefix_sequence')
        term = TestTerminal()
        os.write(sys.__stdout__.fileno(), SEMAPHORE)
        with term.cbreak():
            stime = time.time()
            esc = term.inkey(timeout=5)
            inp = term.inkey(timeout=5)
            # Seconds * 100, i.e. hundredths of a second (not milliseconds).
            measured_time = (time.time() - stime) * 100
            os.write(sys.__stdout__.fileno(), (
                '%s %s %i' % (esc.name, inp, measured_time,)).encode('ascii'))
            sys.stdout.flush()
        if cov is not None:
            cov.stop()
            cov.save()
        os._exit(0)

    # Parent: wait until the child is ready, then send the partial sequence.
    with echo_off(master_fd):
        read_until_semaphore(master_fd)
        stime = time.time()
        os.write(master_fd, u'\x1b['.encode('ascii'))
        key1_name, key2, duration_ms = read_until_eof(master_fd).split()
    pid, status = os.waitpid(pid, 0)
    assert key1_name == u'KEY_ESCAPE'
    assert key2 == u'['
    assert os.WEXITSTATUS(status) == 0
    assert math.floor(time.time() - stime) == 0.0
    # The child reports hundredths of a second, so this accepts 0.34-0.45 s.
    assert 34 <= int(duration_ms) <= 45, duration_ms
开发者ID:cjfuller,项目名称:blessed,代码行数:32,代码来源:test_keyboard.py
示例15: spawnTTY
def spawnTTY(self, command, args = [], environment = None):
    """
    Fork on a pseudo-terminal and execute ``command`` in the child.

    In the parent the pty descriptor is stored as both ``_stdin_fd`` and
    ``_stdout_fd`` and True is returned. The child closes descriptors
    from 3 upward and execs ``command`` with ``args`` (empty strings
    dropped), using ``environment`` when one is given.

    Raises CommunicationOSException when the fork fails.
    """
    try:
        self._pid, self._child_fd = pty.fork()
    except OSError:
        raise CommunicationOSException("Unable to fork a PTY.")

    if self._pid != 0:
        # Parent: reads and writes both go through the pty descriptor.
        self._stdout_fd = self._stdin_fd = self._child_fd
        return True

    # Child process in the new pseudo-terminal.
    self._child_fd = sys.stdout.fileno()
    # Close files inherited from the parent process; closerange ignores
    # descriptors that are not open.
    nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
    os.closerange(3, nofile_limit)
    # NOTE(review): these receive the descriptor *limit*, not a descriptor;
    # looks unintended, but the process image is replaced right after.
    self._stdin_fd = nofile_limit
    self._stdout_fd = nofile_limit

    # Spawning process; args is only read here, never mutated.
    argv = [part for part in [command] + args if part != ""]
    if environment is None:
        os.execv(command, argv)
    else:
        os.execvpe(command, argv, environment)
开发者ID:troelsfr,项目名称:BatchQ,代码行数:31,代码来源:process.py
示例16: test_inkey_0s_raw_ctrl_c
def test_inkey_0s_raw_ctrl_c():
    "0-second inkey with raw allows receiving ^C."
    pid, master_fd = pty.fork()
    # Compare by value: identity tests on ints rely on interpreter caching.
    if pid == 0:  # child
        try:
            cov = __import__('cov_core_init').init()
        except ImportError:
            cov = None
        term = TestTerminal()
        read_until_semaphore(sys.__stdin__.fileno(), semaphore=SEMAPHORE)
        with term.raw():
            # Signal readiness only once raw mode is active.
            os.write(sys.__stdout__.fileno(), RECV_SEMAPHORE)
            inp = term.inkey(timeout=0)
            os.write(sys.__stdout__.fileno(), inp.encode('latin1'))
        if cov is not None:
            cov.stop()
            cov.save()
        os._exit(0)

    with echo_off(master_fd):
        os.write(master_fd, SEND_SEMAPHORE)
        # ensure child is in raw mode before sending ^C,
        read_until_semaphore(master_fd)
        os.write(master_fd, u'\x03'.encode('latin1'))
        stime = time.time()
        output = read_until_eof(master_fd)
    pid, status = os.waitpid(pid, 0)
    # Empty output is tolerated only when fd 0 of the test runner is not a tty.
    assert (output == u'\x03' or
            output == u'' and not os.isatty(0))
    assert os.WEXITSTATUS(status) == 0
    assert math.floor(time.time() - stime) == 0.0
开发者ID:NujjLTD,项目名称:nujjWebsiteNew,代码行数:31,代码来源:test_keyboard.py
示例17: _setupPty
def _setupPty(self):
    """
    Fork on a pseudo-terminal, expose the pty descriptor as
    ``self.stdin`` / ``self.stdout``, and run a login bash in the child.
    """
    child_pid, pty_fd = pty.fork()
    self.stdout = pty_fd
    self.stdin = pty_fd
    # NOTE(review): repeats the first assignment; possibly meant for a
    # different attribute (e.g. stderr) - confirm with the author.
    self.stdout = pty_fd
    if child_pid == pty.CHILD:
        bash = "/bin/bash"
        os.execv(bash, [bash, "-l"])
开发者ID:467754239,项目名称:python,代码行数:7,代码来源:ppty.py
示例18: _run_job_in_hadoop
def _run_job_in_hadoop(self):
    """
    Run every step of the job as a subprocess, preferring a pty.

    For each step the command comes from ``_args_for_step``. Its stderr
    (or, with a pty, its combined terminal output) is fed to
    ``_process_stderr_from_streaming``. Counters are fetched and printed
    after a successful step.

    Raises CalledProcessError when a step finishes with a non-zero return
    code; the probable cause, if found, is logged first.
    """
    self._counters = []
    for step_num in range(self._num_steps()):
        log.debug("running step %d of %d" % (step_num + 1, self._num_steps()))
        step_args = self._args_for_step(step_num)
        log.debug("> %s" % cmd_line(step_args))
        # try to use a PTY if it's available
        try:
            pid, master_fd = pty.fork()
        except (AttributeError, OSError):
            # no PTYs, just use Popen
            step_proc = Popen(step_args, stdout=PIPE, stderr=PIPE)
            self._process_stderr_from_streaming(step_proc.stderr)
            # there shouldn't be much output to STDOUT
            for line in step_proc.stdout:
                log.error("STDOUT: " + to_string(line.strip(b"\n")))
            returncode = step_proc.wait()
        else:
            # we have PTYs
            if pid == 0:  # we are the child process
                os.execvp(step_args[0], step_args)
            else:
                with os.fdopen(master_fd, "rb") as master:
                    # reading from master gives us the subprocess's
                    # stderr and stdout (it's a fake terminal)
                    self._process_stderr_from_streaming(master)
                    _, status = os.waitpid(pid, 0)
                # waitpid yields an encoded wait status, not an exit code
                # (exit 1 arrives as 256). Decode it so both branches report
                # the same kind of value, with signals as negative numbers
                # like subprocess does.
                if os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                elif os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                else:
                    returncode = status
        if returncode == 0:
            # parsing needs step number for whole job
            self._fetch_counters([step_num + self._start_step_num])
            # printing needs step number relevant to this run of mrjob
            self.print_counters([step_num + 1])
        else:
            msg = "Job failed with return code %d: %s" % (returncode, step_args)
            log.error(msg)
            # look for a Python traceback
            cause = self._find_probable_cause_of_failure([step_num + self._start_step_num])
            if cause:
                # log cause, and put it in exception
                cause_msg = []  # lines to log and put in exception
                cause_msg.append("Probable cause of failure (from %s):" % cause["log_file_uri"])
                cause_msg.extend(line.strip("\n") for line in cause["lines"])
                if cause["input_uri"]:
                    cause_msg.append("(while reading from %s)" % cause["input_uri"])
                for line in cause_msg:
                    log.error(line)
                # add cause_msg to exception message
                # NOTE(review): msg is not passed to the exception below.
                msg += "\n" + "\n".join(cause_msg) + "\n"
            raise CalledProcessError(returncode, step_args)
开发者ID:senseb,项目名称:mrjob,代码行数:60,代码来源:hadoop.py
示例19: start
def start(self):
    """
    Start the Vim instance if it is not already running.

    Forks in a pseudoterminal and executes gvim embedded in a gtk
    socket, unless ``self.pid`` is already set. The child's pid and the
    pty file descriptor are stored for later use.
    """
    if self.pid:
        return

    # The executable is fixed to gvim here; a registry lookup for the
    # console vim path used to be considered instead.
    command = 'gvim'
    # Prepare the gtk socket whose id is handed to gvim for embedding.
    sock = gtk.Socket()
    window = gtk.Window()
    window.realize()
    window.add(sock)
    xid = sock.get_id()
    # Fork using pty.fork to prevent Vim taking the terminal
    child_pid, child_fd = pty.fork()
    if child_pid != 0:
        # Parent, store the pid, and file descriptor for later.
        self.pid = child_pid
        self.childfd = child_fd
    else:
        # Child, execute Vim with the correct servername argument
        vim_argv = ['gvim', '-f', '--servername', self.name,
                    '--socketid', '%s' % xid]
        os.execvp(command, vim_argv)
开发者ID:KhiemNguyenT,项目名称:coccinelle,代码行数:28,代码来源:vimcom.py
示例20: test_inkey_0s_cbreak_input
def test_inkey_0s_cbreak_input():
    "0-second inkey with input; Keypress should be immediately returned."
    pid, master_fd = pty.fork()
    # Compare by value: identity tests on ints rely on interpreter caching.
    if pid == 0:
        try:
            cov = __import__('cov_core_init').init()
        except ImportError:
            cov = None
        # child pauses, writes semaphore and begins awaiting input
        term = TestTerminal()
        read_until_semaphore(sys.__stdin__.fileno(), semaphore=SEMAPHORE)
        os.write(sys.__stdout__.fileno(), SEMAPHORE)
        with term.cbreak():
            inp = term.inkey(timeout=0)
            os.write(sys.__stdout__.fileno(), inp.encode('utf-8'))
        if cov is not None:
            cov.stop()
            cov.save()
        os._exit(0)

    # Parent: send the semaphore and one key, then collect the child's echo.
    with echo_off(master_fd):
        os.write(master_fd, SEND_SEMAPHORE)
        os.write(master_fd, u'x'.encode('ascii'))
        read_until_semaphore(master_fd)
        stime = time.time()
        output = read_until_eof(master_fd)
    pid, status = os.waitpid(pid, 0)
    assert output == u'x'
    assert os.WEXITSTATUS(status) == 0
    assert math.floor(time.time() - stime) == 0.0
开发者ID:0x37N0w4N,项目名称:MARA_Framework,代码行数:31,代码来源:test_keyboard.py
注：本文中的 pty.fork 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论