本文整理汇总了 Python 中 psutil.Process 类的典型用法代码示例。如果您正苦于以下问题:Python Process 类的具体用法?Python Process 怎么用?Python Process 使用的例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。
在下文中一共展示了 Process 类的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_infinite_loop
def test_infinite_loop(self):
    """Grade a bash answer that loops forever and expect a timeout failure.

    After grading, if this test process still has a direct child, that
    child must not have any descendants left.
    """
    # Given
    # NOTE: the source spelled this with adjacent string literals
    # ("... do echo "" > /dev/null ..."), which Python concatenates to
    # `echo  >` -- the inner quotes never reach the script. The literal
    # below is that exact runtime value.
    script = "#!/bin/bash\nwhile [ 1 ] ; do echo  > /dev/null ; done"
    metadata = {
        'user_answer': script,
        'file_paths': self.file_paths,
        'partial_grading': False,
        'language': 'bash',
    }
    kwargs = {
        'metadata': metadata,
        'test_case_data': self.test_case_data,
    }

    # When
    result = Grader(self.in_dir).evaluate(kwargs)

    # Then
    self.assertFalse(result.get("success"))
    self.assert_correct_output(
        self.timeout_msg, result.get("error")[0]["message"]
    )
    direct_children = Process(os.getpid()).children()
    if direct_children:
        first_child = Process(direct_children[0].pid)
        self.assertFalse(any(first_child.children(recursive=True)))
开发者ID:prathamesh920,项目名称:online_test,代码行数:27,代码来源:test_bash_evaluation.py
示例2: get_soledad_server_pid
def get_soledad_server_pid():
    """Return the pid of the first ``python`` process matching ``args``.

    The pids are taken from the output of ``pidof python``. A process
    matches when every element of the outer-scope ``args`` appears in its
    command line (``args`` must provide ``issubset`` -- presumably a set;
    confirm against its definition). Returns ``None`` when nothing matches.
    """
    pidof_output = check_output(['pidof', 'python'])
    for pid in (int(token) for token in pidof_output.split()):
        if args.issubset(set(Process(pid).cmdline())):
            return pid
    return None
开发者ID:leapcode,项目名称:soledad,代码行数:7,代码来源:utils.py
示例3: main
def main(argv):
    """Run ``argv[1:]`` as a child process and report on it via ExitCodeClient.

    While the child is running, the summed CPU percentage of the child and
    all of its descendants is sent through ``client.send_status`` roughly
    every 0.1 s. Once the child is done (or Ctrl+C terminated it), a final
    ``LEDMode.Success`` / ``LEDMode.Error`` status is sent.

    Args:
        argv: full argument vector; ``argv[0]`` is ignored and the rest is
            the command line handed to ``Popen``.

    Returns:
        The child's return code.
    """
    def _psutil_method(obj, new_name, old_name):
        # psutil 2.0 renamed the get_* methods and 3.0 removed the old
        # names; prefer the new name but keep working on old releases.
        return getattr(obj, new_name, None) or getattr(obj, old_name)

    p = Popen(argv[1:])
    try:
        client = ExitCodeClient()
        try:
            proc = Process(pid=p.pid)
            while p.poll() is None:
                total_cpu_percent = _psutil_method(
                    proc, 'cpu_percent', 'get_cpu_percent')(interval=0)
                children = _psutil_method(
                    proc, 'children', 'get_children')(recursive=True)
                for child_proc in children:
                    total_cpu_percent += _psutil_method(
                        child_proc, 'cpu_percent', 'get_cpu_percent')(interval=0)
                client.send_status(total_cpu_percent)
                time.sleep(0.1)  # recommended waiting period from psutil docs
        except NoSuchProcess:
            # A sampled process vanished between listing and querying it.
            pass
    except KeyboardInterrupt:
        try:
            p.terminate()
        except OSError:
            pass
    # Both early exits above can leave `returncode` unset (poll() never saw
    # the exit). Reap the child so the final status and the return value
    # reflect the real exit code instead of None.
    if p.returncode is None:
        p.wait()
    client.send_status(LEDMode.Success if p.returncode == 0 else LEDMode.Error)
    time.sleep(0.5)  # Give server time to read value before connection closes
    client.shutdown()
    return p.returncode
开发者ID:logiconcepts819,项目名称:program-status-server,代码行数:26,代码来源:main.py
示例4: test_infinite_loop
def test_infinite_loop(self):
    """Grade a Java answer that loops forever and expect a timeout failure.

    After grading, if this test process still has a direct child, that
    child must not have any descendants left.
    """
    # Given
    java_source = "\n".join([
        "class Test {",
        "\tint square_num(int a) {",
        "\t\twhile(0==0){",
        "\t\t}",
        "\t}",
        "}",
    ])
    metadata = {
        'user_answer': java_source,
        'file_paths': self.file_paths,
        'partial_grading': False,
        'language': 'java',
    }
    kwargs = {
        'metadata': metadata,
        'test_case_data': self.test_case_data,
    }

    # When
    result = Grader(self.in_dir).evaluate(kwargs)

    # Then
    self.assertFalse(result.get("success"))
    self.assert_correct_output(
        self.timeout_msg, result.get("error")[0]["message"]
    )
    direct_children = Process(os.getpid()).children()
    if direct_children:
        first_child = Process(direct_children[0].pid)
        self.assertFalse(any(first_child.children(recursive=True)))
开发者ID:FOSSEE,项目名称:online_test,代码行数:26,代码来源:test_java_evaluation.py
示例5: test_infinite_loop
def test_infinite_loop(self):
    """Grade a C answer that loops forever and expect a timeout failure.

    After grading, if this test process still has a direct child, that
    child must not have any descendants left.
    """
    # Given
    # dedent() strips the common indentation, so the submitted program
    # starts every line at column 0.
    c_source = dedent("""
        #include<stdio.h>
        int main(void){
        while(0==0){
        printf("abc");}
        }""")
    metadata = {
        'user_answer': c_source,
        'file_paths': self.file_paths,
        'partial_grading': False,
        'language': 'cpp',
    }
    kwargs = {
        'metadata': metadata,
        'test_case_data': self.test_case_data,
    }

    # When
    result = Grader(self.in_dir).evaluate(kwargs)

    # Then
    self.assertFalse(result.get("success"))
    self.assert_correct_output(
        self.timeout_msg, result.get("error")[0]["message"]
    )
    direct_children = Process(os.getpid()).children()
    if direct_children:
        first_child = Process(direct_children[0].pid)
        self.assertFalse(any(first_child.children(recursive=True)))
开发者ID:FOSSEE,项目名称:online_test,代码行数:30,代码来源:test_c_cpp_evaluation.py
示例6: end_broker_process
def end_broker_process(self):
    """Drain the broker's output, then kill the broker and its children.

    Returns immediately if the broker process no longer exists. Otherwise
    any lines currently readable on the broker's stdout/stderr are
    collected (and printed through ``_print_debug`` when the matching
    DEBUG_* flag is set), after which the broker and each of its direct
    children are passed to ``_kill``.
    """
    try:
        broker_process = Process(self.broker.pid)
    except NoSuchProcess:
        return  # was killed

    # get stdout and stderr
    out_stream, err_stream = self.broker.stdout, self.broker.stderr
    stdout, stderr = [], []
    pending = [out_stream, err_stream]
    while pending:
        # Only the first element of select's result is the readable list;
        # route each line by which stream it actually came from.
        readable, _, _ = select.select(pending, [], [], 0.1)
        if not readable:
            break  # nothing more to read within the timeout
        for stream in readable:
            line = stream.readline()
            if not line:
                # EOF: a closed pipe stays "readable" forever, so stop
                # polling it or the loop would never end.
                pending.remove(stream)
            elif stream is out_stream:
                stdout.append(line)
            else:
                stderr.append(line)
    if stdout and DEBUG_STDOUT:
        _print_debug('STDOUT', ''.join(stdout))
    if stderr and DEBUG_STDERR:
        _print_debug('STDERR', ''.join(stderr))

    # kill main process and its children
    # psutil 2.0 renamed get_children() to children(); 3.0 dropped the old name.
    list_children = (getattr(broker_process, 'children', None)
                     or broker_process.get_children)
    children = [process.pid for process in list_children()]
    _kill(self.broker.pid, timeout=TIMEOUT / 1000.0)
    for child_pid in children:
        _kill(child_pid, timeout=TIMEOUT / 1000.0)
开发者ID:andrebco,项目名称:pypelinin,代码行数:25,代码来源:test_broker.py
示例7: done
def done(self):
    """Kill every direct child of the current process (best effort).

    Failures are written to stderr rather than raised. A failure to kill
    one child does not stop the remaining children from being killed.
    """
    try:
        myself = Process(os.getpid())
        # psutil 2.0 renamed get_children() to children(); 3.0 dropped the
        # old name. Without this fallback the AttributeError would be
        # swallowed below and no child would ever be killed.
        list_children = getattr(myself, 'children', None) or myself.get_children
        children = list_children()
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        return
    for child in children:
        try:
            child.kill()
        except Exception as e:
            sys.stderr.write(str(e) + '\n')
开发者ID:naiteluode,项目名称:ftasomc,代码行数:7,代码来源:models.py
示例8: click
def click(x, notify=False, pid=None, pids=None, webdriver=None, window_name=None, debug=False):
    """Click on ``x``, dispatching on what kind of target it is.

    * a string ending in ``.png`` / ``.jpg`` -> ``click_image``
    * any other string -> ``click_text`` (restricted to ``pids`` /
      ``window_name``)
    * a ``PngImageFile`` -> ``click_image``
    * a tuple -> ``click_location``

    When ``webdriver`` is a selenium Firefox or Chrome driver, ``pids`` is
    replaced by the pid(s) derived from that driver's process. Anything
    else passed as ``x`` is silently ignored.
    """
    if debug:
        debug_lines = (
            "[beryl] starting click",
            "\tpid: " + str(pid),
            "\twebdriver: " + str(webdriver),
            "\twindow_name: " + str(window_name),
            "\tstr(type(webdriver)): " + str(type(webdriver)),
        )
        for debug_line in debug_lines:
            print(debug_line)

    type_as_string = str(type(x))  # computed by the original; not used below
    driver_type = str(type(webdriver))

    if driver_type == "<class 'selenium.webdriver.firefox.webdriver.WebDriver'>":
        pids = [webdriver.binary.process.pid]
    elif driver_type == "<class 'selenium.webdriver.chrome.webdriver.WebDriver'>":
        service_process = Process(webdriver.service.process.pid)
        # Use whichever child-listing method this psutil version offers.
        for method_name in ("children", "get_children"):
            if hasattr(service_process, method_name):
                pids = [p.pid for p in getattr(service_process, method_name)()]
                break

    if isinstance(x, str) or isinstance(x, unicode):
        if x.endswith((".png", ".jpg")):
            click_image(x, notify=notify)
            return
        click_text(x, notify=notify, pids=pids, window_name=window_name, debug=debug)
        return
    if isinstance(x, PngImageFile):
        click_image(x, notify=notify)
        return
    if isinstance(x, tuple):
        click_location(x, notify=notify)
开发者ID:DanielJDufour,项目名称:breeze,代码行数:31,代码来源:__init__.py
示例9: trace_memory_usage
def trace_memory_usage(self, frame, event, arg):
    """Trace callback (for ``sys.settrace``) recording per-line memory.

    For 'line' and 'return' events in a code object present in
    ``self.code_map``, the first field of the current process's
    ``memory_info()`` divided by 2**20 is stored against the previously
    seen line number, keeping the maximum if a value already exists.
    'call' events only update the remembered line number.

    Args:
        frame: the current stack frame.
        event: the trace event name.
        arg: event-dependent argument; only forwarded.

    Returns:
        ``self.trace_memory_usage``, so tracing continues with this callback.
    """
    is_tracked = (event in ('call', 'line', 'return')
                  and frame.f_code in self.code_map)
    if is_tracked:
        if event != 'call':
            # "call" event just saves the lineno but not the memory
            mem = Process(getpid()).memory_info()[0] / float(2 ** 20)
            line_stats = self.code_map[frame.f_code]
            # if there is already a measurement for that line get the max
            line_stats[self.prevline] = max(mem, line_stats.get(self.prevline, 0))
        self.prevline = frame.f_lineno

    # Chain to whatever trace function was saved as the original one.
    chained = self._original_trace_function
    if chained is not None:
        chained(frame, event, arg)
    return self.trace_memory_usage
开发者ID:peter1000,项目名称:SpeedIT,代码行数:25,代码来源:LineMemoryProfileIT.py
示例10: test_cleanup_children_on_terminate
def test_cleanup_children_on_terminate(self):
    """
    Subprocesses spawned by tasks should be terminated on terminate
    """
    class HangingSubprocessTask(luigi.Task):
        def run(self):
            # Spawn an interpreter that spins forever.
            check_call([sys.executable, '-c', 'while True: pass'])

    def noop():
        return None

    task_process = TaskProcess(
        HangingSubprocessTask(), 1, mock.Mock(), noop, noop
    )
    task_process.start()

    parent = Process(task_process.pid)
    # wait for child process to startup
    while not parent.children():
        sleep(0.01)
    (child,) = parent.children()  # exactly one child is expected

    task_process.terminate()
    child.wait(timeout=1.0)  # wait for terminate to complete

    for proc in (parent, child):
        self.assertFalse(proc.is_running())
开发者ID:01-,项目名称:luigi,代码行数:27,代码来源:worker_task_test.py
示例11: get_procinfo_by_address
def get_procinfo_by_address(ip_src, ip_dst, port_src=None, port_dst=None):
    """
    Look up the process that owns a connection with the given addresses.

    Connections without a remote address or without a pid are ignored.
    When ``port_src`` is not None, both ports take part in the match;
    otherwise only the two IP addresses are compared. If several
    connections match, a warning is logged and the first one is used.

    return -- ``[pid, cmdline, username, sha256_obj]`` where ``cmdline`` is
              whatever ``Process.cmdline()`` returns and ``sha256_obj`` is
              the ``hashlib.sha256`` object (not a digest string) built from
              the first three items; ``[]`` if no connection matches.
    """
    if port_src is not None:
        wanted = (ip_src, ip_dst, port_src, port_dst)

        def endpoints(conn):
            return (conn.laddr[0], conn.raddr[0], conn.laddr[1], conn.raddr[1])
    else:
        wanted = (ip_src, ip_dst)

        def endpoints(conn):
            return (conn.laddr[0], conn.raddr[0])

    pids = []
    for conn in net_connections():
        if len(conn.raddr) == 0 or conn.pid is None:
            continue
        if endpoints(conn) == wanted:
            pids.append(conn.pid)

    try:
        if len(pids) > 1:
            logger.warning("more than 1 matching process: %r", pids)
        # An empty `pids` raises IndexError here, handled below.
        proc = Process(pids[0])
        cmd = [pids[0], proc.cmdline(), proc.username()]
        hash_input = "%d%s%s" % tuple(cmd)
        cmd.append(hashlib.sha256(hash_input.encode("UTF-8")))
        logger.debug("process info: %r", cmd)
        return cmd
    except IndexError:
        return []
开发者ID:mike01,项目名称:ifi,代码行数:30,代码来源:procinfo.py
示例12: ppid_cascade
def ppid_cascade(process=None):
    """Yield the pids of all ancestors of ``process``, nearest first.

    ``process`` defaults to a fresh ``Process()``. The walk follows
    ``parent()`` repeatedly and stops at the first falsy parent.
    """
    start = Process() if process is None else process
    ancestor = start.parent()
    while ancestor:
        yield ancestor.pid
        ancestor = ancestor.parent()
开发者ID:phillipberndt,项目名称:scripts,代码行数:7,代码来源:pskill.py
示例13: _get_shell_pid
def _get_shell_pid():
    """Return the pid of the current process's parent."""
    current = Process(os.getpid())
    try:
        parent = current.parent()
    except TypeError:
        # `parent` could not be called; read it as a plain attribute instead.
        parent = current.parent
    return parent.pid
开发者ID:Googulator,项目名称:thefuck,代码行数:8,代码来源:not_configured.py
示例14: worker
def worker(self, obj):
    """Describe CPU/RAM usage of the worker process running ``obj``.

    Searches every worker list in ``self.active_workers`` for an entry
    whose ``'id'`` equals ``obj.task_id`` and formats the CPU and memory
    percentage of the process at that entry's ``'worker_pid'``.

    Returns:
        ``'CPU:x% RAM:y%'`` for the first matching worker, or ``'N/a'``
        when there are no active workers or none matches.
    """
    if not self.active_workers:
        return 'N/a'
    # .values() works on Python 2 and 3 (iteritems() is Python 2 only),
    # and the node key was never used.
    for node_workers in self.active_workers.values():
        for entry in node_workers:
            if entry['id'] == obj.task_id:
                p = Process(entry['worker_pid'])
                return 'CPU:%.1f%% RAM:%.2f%%' % (p.cpu_percent(0.05), p.memory_percent())
    return 'N/a'
开发者ID:silverfix,项目名称:django-accio,代码行数:8,代码来源:admin.py
示例15: run
def run(self):
    """Poll this process's CPU and memory usage until ``_isRunning`` is cleared.

    Each iteration stores the CPU percentage (measured over
    ``self._pollInterval``) in ``self.cpuUsage`` and the first field of the
    memory info in ``self.memUsage``, then forwards both to
    ``self.delegate`` when one is set.
    """
    self._isRunning = True
    process = Process(os.getpid())
    # psutil 2.0 renamed the get_* methods and 3.0 removed the old names;
    # prefer the new ones but keep working on old releases.
    cpu_percent = getattr(process, 'cpu_percent', None) or process.get_cpu_percent
    memory_info = getattr(process, 'memory_info', None) or process.get_memory_info
    while self._isRunning:
        self.cpuUsage = cpu_percent(self._pollInterval)
        self.memUsage = memory_info()[0]
        if self.delegate is not None:
            self.delegate.processCpuUsage(self.cpuUsage)
            self.delegate.processMemUsage(self.memUsage)
开发者ID:teragonaudio,项目名称:Lucidity,代码行数:9,代码来源:performance.py
示例16: test_valid_is_running
def test_valid_is_running(self):
    """A pid file describing the current process is reported as running.

    Writes "<pid> <create_time>" for this process into a temporary file,
    checks that ``_is_running`` accepts it, and always removes the file.
    """
    import os  # local import: only needed to release the mkstemp descriptor

    p = Process()
    fd, f = mkstemp()
    # mkstemp() hands back an already-open descriptor; close it so the test
    # does not leak it (the file is reopened by name below).
    os.close(fd)
    try:
        with open(f, 'w') as pid_file:
            pid_file.write('{0} {1:6f}'.format(p.pid, p.create_time()))
        self.assertTrue(_is_running(f))
    finally:
        unlink(f)
开发者ID:Prince88,项目名称:highlander,代码行数:9,代码来源:highlander_tests.py
示例17: test_valid_is_running
def test_valid_is_running(self):
    """A pid file describing the current process is reported as running.

    Writes "<pid> <create_time>" for this process into the pid file of a
    temporary directory, checks that ``_is_running`` accepts the directory,
    and always removes the directory afterwards.
    """
    current = Process()
    pid_dir = mkdtemp()
    pid_path = _get_pid_filename(pid_dir)
    try:
        with open(pid_path, 'w') as pid_file:
            contents = '{0} {1:6f}'.format(current.pid, current.create_time())
            pid_file.write(contents)
        self.assertTrue(_is_running(pid_dir))
    finally:
        rmtree(pid_dir)
开发者ID:linzhonghong,项目名称:highlander,代码行数:10,代码来源:highlander_tests.py
示例18: _set_running
def _set_running(filename):
    """Record the current process in a pid file.

    The file receives the pid and the process creation time (six decimal
    places), separated by a single space.

    :param filename: Path of the pid file to write.
    :raises PidFileExistsError: if ``filename`` already exists as a file.
    """
    already_present = isfile(str(filename))
    if already_present:
        raise PidFileExistsError()
    current = Process()
    with open(filename, 'w') as pid_file:
        pid_file.write(
            '{0} {1:.6f}'.format(current.pid, current.create_time())
        )
开发者ID:Prince88,项目名称:highlander,代码行数:10,代码来源:highlander.py
示例19: common_option
def common_option(pid):
    '''Build the options shared by restore and dump for process ``pid``.

    Adds ``--shell-job`` when the process reports a terminal and
    ``--tcp-established`` when it has any "inet" connections.
    Returns the list of option strings (possibly empty).
    '''
    proc = Process(pid)
    # Each flag is paired with the probe that decides whether it applies;
    # probes run in this order.
    probes = (
        ("--shell-job", proc.terminal),
        ("--tcp-established", lambda: proc.connections(kind="inet")),
    )
    return [flag for flag, applies in probes if applies()]
开发者ID:shivika04,项目名称:pmigrate,代码行数:11,代码来源:proc_migrate.py
示例20: testFDTDServiceOpenFiles
def testFDTDServiceOpenFiles():
    """
    #41 - Too many open files (fdtd side)

    Checks that servicing actions does not leak open files in this process:
    the open-file count must return to its initial value after plain
    TestActions, rise by exactly one after a ReceivingServerAction, and
    return to the initial value again after a CleanupProcessesAction.
    """
    hostName = os.uname()[1]
    f = getTempFile(functionalFDTDConfiguration)
    inputOption = "--config=%s" % f.name
    conf = ConfigFDTD(inputOption.split())
    conf.sanitize()
    # inspect.stack()[0][3] is the name of this function.
    testName = inspect.stack()[0][3]
    logger = Logger(name=testName, logFile="/tmp/fdtdtest-%s.log" % testName, level=logging.DEBUG)
    apMon = None
    fdtd = FDTD(conf, apMon, logger)

    proc = Process(os.getpid())
    # psutil 2.0 renamed get_open_files() to open_files(); 3.0 dropped the
    # old name. Prefer the new one but keep working on old releases.
    listOpenFiles = getattr(proc, "open_files", None) or proc.get_open_files
    initStateNumOpenFiles = len(listOpenFiles())

    for testAction in [TestAction("fakeSrc", "fakeDst") for i in range(3)]:
        r = fdtd.service.service(testAction)
        logger.debug("Result: %s" % r)
        assert r.status == 0

    # after TestAction, there should not be left behind any open files
    numOpenFilesNow = len(listOpenFiles())
    assert initStateNumOpenFiles == numOpenFilesNow

    # test on ReceivingServerAction - it's action after which the
    # separate logger is not closed, test the number of open files went +1,
    # send CleanupProcessesAction and shall again remain
    # initStateNumOpenFiles send appropriate TestAction first (like in real)
    serverId = "server-id"
    testAction = TestAction(hostName, hostName)
    testAction.id = serverId
    r = fdtd.service.service(testAction)
    assert r.status == 0

    options = dict(gridUserDest="someuserDest", clientIP=os.uname()[1], destFiles=[])
    recvServerAction = ReceivingServerAction(testAction.id, options)
    r = fdtd.service.service(recvServerAction)
    print(r.msg)
    assert r.status == 0
    numOpenFilesNow = len(listOpenFiles())
    # there should be only 1 extra opened file now
    assert initStateNumOpenFiles == numOpenFilesNow - 1

    cleanupAction = CleanupProcessesAction(serverId, timeout=2)
    r = fdtd.service.service(cleanupAction)
    print(r.msg)
    assert r.status == 0
    numOpenFilesNow = len(listOpenFiles())
    assert initStateNumOpenFiles == numOpenFilesNow

    fdtd.shutdown()
    fdtd.pyroDaemon.closedown()
    logger.close()
开发者ID:juztas,项目名称:fdtcp,代码行数:54,代码来源:test_fdtd.py
注:本文中的 psutil.Process 类示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论