• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python psutil.Process类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中psutil.Process的典型用法代码示例。如果您正苦于以下问题:Python Process类的具体用法?Python Process怎么用?Python Process使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Process类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_infinite_loop

    def test_infinite_loop(self):
        # Given
        user_answer = ("#!/bin/bash\nwhile [ 1 ] ;"
            " do echo "" > /dev/null ; done")
        kwargs = {
                  'metadata': {
                    'user_answer': user_answer,
                    'file_paths': self.file_paths,
                    'partial_grading': False,
                    'language': 'bash'
                    },
                    'test_case_data': self.test_case_data,
                  }

        # When
        grader = Grader(self.in_dir)
        result = grader.evaluate(kwargs)

        # Then
        self.assertFalse(result.get("success"))
        self.assert_correct_output(self.timeout_msg,
                                   result.get("error")[0]["message"]
                                   )
        parent_proc = Process(os.getpid()).children()
        if parent_proc:
            children_procs = Process(parent_proc[0].pid)
            self.assertFalse(any(children_procs.children(recursive=True)))
开发者ID:prathamesh920,项目名称:online_test,代码行数:27,代码来源:test_bash_evaluation.py


示例2: get_soledad_server_pid

def get_soledad_server_pid():
    output = check_output(['pidof', 'python'])
    for pid in output.split():
        proc = Process(int(pid))
        cmdline = proc.cmdline()
        if args.issubset(set(cmdline)):
            return int(pid)
开发者ID:leapcode,项目名称:soledad,代码行数:7,代码来源:utils.py


示例3: main

def main(argv):
    p = Popen(argv[1:])
    
    try:
        client = ExitCodeClient()
        try:
            proc = Process(pid=p.pid)
            while p.poll() is None:
                total_cpu_percent = proc.get_cpu_percent(interval=0)
                for child_proc in proc.get_children(recursive=True):
                    total_cpu_percent += child_proc.get_cpu_percent(interval=0)
                client.send_status(total_cpu_percent)
                time.sleep(0.1) # recommended waiting period from psutil docs
        except NoSuchProcess:
            pass
    except KeyboardInterrupt:
        try:
            p.terminate()
        except OSError:
            pass
    
    client.send_status(LEDMode.Success if p.returncode == 0 else LEDMode.Error)
    time.sleep(0.5) # Give server time to read value before connection closes
    client.shutdown()
    
    return p.returncode
开发者ID:logiconcepts819,项目名称:program-status-server,代码行数:26,代码来源:main.py


示例4: test_infinite_loop

    def test_infinite_loop(self):
        # Given
        user_answer = ("class Test {\n\tint square_num(int a)"
                       " {\n\t\twhile(0==0){\n\t\t}\n\t}\n}")
        kwargs = {
                  'metadata': {
                    'user_answer': user_answer,
                    'file_paths': self.file_paths,
                    'partial_grading': False,
                    'language': 'java'
                    }, 'test_case_data': self.test_case_data,
                  }

        # When
        grader = Grader(self.in_dir)
        result = grader.evaluate(kwargs)

        # Then
        self.assertFalse(result.get("success"))
        self.assert_correct_output(self.timeout_msg,
                                   result.get("error")[0]["message"]
                                   )
        parent_proc = Process(os.getpid()).children()
        if parent_proc:
            children_procs = Process(parent_proc[0].pid)
            self.assertFalse(any(children_procs.children(recursive=True)))
开发者ID:FOSSEE,项目名称:online_test,代码行数:26,代码来源:test_java_evaluation.py


示例5: test_infinite_loop

    def test_infinite_loop(self):
        # Given
        user_answer = dedent("""
        #include<stdio.h>
        int main(void){
        while(0==0){
        printf("abc");}
        }""")
        kwargs = {
                  'metadata': {
                    'user_answer': user_answer,
                    'file_paths': self.file_paths,
                    'partial_grading': False,
                    'language': 'cpp'
                    }, 'test_case_data': self.test_case_data,
                  }

        # When
        grader = Grader(self.in_dir)
        result = grader.evaluate(kwargs)

        # Then
        self.assertFalse(result.get("success"))
        self.assert_correct_output(self.timeout_msg,
                                   result.get("error")[0]["message"]
                                   )
        parent_proc = Process(os.getpid()).children()
        if parent_proc:
            children_procs = Process(parent_proc[0].pid)
            self.assertFalse(any(children_procs.children(recursive=True)))
开发者ID:FOSSEE,项目名称:online_test,代码行数:30,代码来源:test_c_cpp_evaluation.py


示例6: end_broker_process

    def end_broker_process(self):
        try:
            broker_process = Process(self.broker.pid)
        except NoSuchProcess:
            return # was killed
        # get stdout and stderr
        select_config = [self.broker.stdout, self.broker.stderr], [], [], 0.1
        stdout, stderr = [], []
        result = select.select(*select_config)
        while any(result):
            if result[0]:
                stdout.append(result[0][0].readline())
            if result[1]:
                stderr.append(result[1][0].readline())
            result = select.select(*select_config)
        if stdout and DEBUG_STDOUT:
            _print_debug('STDOUT', ''.join(stdout))
        if stderr and DEBUG_STDERR:
            _print_debug('STDERR', ''.join(stderr))

        # kill main process and its children
        children = [process.pid for process in broker_process.get_children()]
        _kill(self.broker.pid, timeout=TIMEOUT / 1000.0)
        for child_pid in children:
            _kill(child_pid, timeout=TIMEOUT / 1000.0)
开发者ID:andrebco,项目名称:pypelinin,代码行数:25,代码来源:test_broker.py


示例7: done

 def done(self):
     try:
         myself = Process(os.getpid())
         for child in myself.get_children():
             child.kill()
     except Exception as e:
         sys.stderr.write(str(e) + '\n')
开发者ID:naiteluode,项目名称:ftasomc,代码行数:7,代码来源:models.py


示例8: click

def click(x, notify=False, pid=None, pids=None, webdriver=None, window_name=None, debug=False):

    if debug:
        print("[beryl] starting click")
        print("\tpid: " + str(pid))
        print("\twebdriver: " + str(webdriver))
        print("\twindow_name: " + str(window_name))
        print("\tstr(type(webdriver)): " + str(type(webdriver)))

    type_as_string = str(type(x))

    webdriver_type_as_string = str(type(webdriver))
    if webdriver_type_as_string == "<class 'selenium.webdriver.firefox.webdriver.WebDriver'>":
        pids = [webdriver.binary.process.pid]
    elif webdriver_type_as_string == "<class 'selenium.webdriver.chrome.webdriver.WebDriver'>":
        process = Process(webdriver.service.process.pid)
        if hasattr(process, "children"):
            pids = [p.pid for p in process.children()]
        elif hasattr(process, "get_children"):
            pids = [p.pid for p in process.get_children()]


    if isinstance(x, str) or isinstance(x, unicode):
        if x.endswith(".png") or x.endswith(".jpg"):
            click_image(x, notify=notify)
        else:
            click_text(x, notify=notify, pids=pids, window_name=window_name, debug=debug)
    elif isinstance(x, PngImageFile):
        click_image(x,notify=notify)
    elif isinstance(x, tuple):
        click_location(x,notify=notify)
开发者ID:DanielJDufour,项目名称:breeze,代码行数:31,代码来源:__init__.py


示例9: trace_memory_usage

   def trace_memory_usage(self, frame, event, arg):
      """Callback for sys.settrace

      Args:
         frame: frame is the current stack frame
         event: event is a string: 'call', 'line', 'return', 'exception', 'c_call', 'c_return', or 'c_exception'
         arg: arg depends on the event type.

      Returns:
         function: wrap_func
      """
      if event in ('call', 'line', 'return') and frame.f_code in self.code_map:
         if event != 'call':
            # "call" event just saves the lineno but not the memory
            process = Process(getpid())
            mem = process.memory_info()[0] / float(2 ** 20)
            # if there is already a measurement for that line get the max
            old_mem = self.code_map[frame.f_code].get(self.prevline, 0)
            self.code_map[frame.f_code][self.prevline] = max(mem, old_mem)
         self.prevline = frame.f_lineno

      if self._original_trace_function is not None:
         self._original_trace_function(frame, event, arg)

      return self.trace_memory_usage
开发者ID:peter1000,项目名称:SpeedIT,代码行数:25,代码来源:LineMemoryProfileIT.py


示例10: test_cleanup_children_on_terminate

    def test_cleanup_children_on_terminate(self):
        """
        Subprocesses spawned by tasks should be terminated on terminate
        """
        class HangingSubprocessTask(luigi.Task):
            def run(self):
                python = sys.executable
                check_call([python, '-c', 'while True: pass'])

        task = HangingSubprocessTask()
        queue = mock.Mock()
        worker_id = 1

        task_process = TaskProcess(task, worker_id, queue, lambda: None, lambda: None)
        task_process.start()

        parent = Process(task_process.pid)
        while not parent.children():
            # wait for child process to startup
            sleep(0.01)

        [child] = parent.children()
        task_process.terminate()
        child.wait(timeout=1.0)  # wait for terminate to complete

        self.assertFalse(parent.is_running())
        self.assertFalse(child.is_running())
开发者ID:01-,项目名称:luigi,代码行数:27,代码来源:worker_task_test.py


示例11: get_procinfo_by_address

def get_procinfo_by_address(ip_src, ip_dst, port_src=None, port_dst=None):
	"""
	Gets Infos about the Prozess associated with the given address information
	Both port_src and port_dst must be not None or None at the same time.

	return -- [pid, "/path/to/command", "user", "hash"] or []
	"""
	result = []

	if port_src is not None:
		pids = [c.pid for c in net_connections()
			if len(c.raddr) != 0 and c.pid is not None and
				(c.laddr[0], c.raddr[0], c.laddr[1], c.raddr[1]) == (ip_src, ip_dst, port_src, port_dst)]
	else:
		pids = [c.pid for c in net_connections()
			if len(c.raddr) != 0 and c.pid is not None and (c.laddr[0], c.raddr[0]) == (ip_src, ip_dst)]

	try:
		if len(pids) > 1:
			logger.warning("more than 1 matching process: %r", pids)
		proc = Process(pids[0])
		cmd = [pids[0], proc.cmdline(), proc.username()]
		hash_input = "%d%s%s" % (cmd[0], cmd[1], cmd[2])
		procinfo_hash = hashlib.sha256(hash_input.encode("UTF-8"))
		cmd.append(procinfo_hash)
		logger.debug("process info: %r", cmd)
		return cmd
	except IndexError:
		pass
	return []
开发者ID:mike01,项目名称:ifi,代码行数:30,代码来源:procinfo.py


示例12: ppid_cascade

def ppid_cascade(process=None):
    if process is None:
        process = Process()
    process = process.parent()
    while process:
        yield process.pid
        process = process.parent()
开发者ID:phillipberndt,项目名称:scripts,代码行数:7,代码来源:pskill.py


示例13: _get_shell_pid

def _get_shell_pid():
    """Returns parent process pid."""
    proc = Process(os.getpid())

    try:
        return proc.parent().pid
    except TypeError:
        return proc.parent.pid
开发者ID:Googulator,项目名称:thefuck,代码行数:8,代码来源:not_configured.py


示例14: worker

 def worker(self, obj):
     if self.active_workers:
         for node, active_workers in self.active_workers.iteritems():
             for worker in active_workers:
                 if worker['id'] == obj.task_id:
                     p = Process(worker['worker_pid'])
                     return 'CPU:%.1f%% RAM:%.2f%%' % (p.cpu_percent(0.05), p.memory_percent())
     return 'N/a'
开发者ID:silverfix,项目名称:django-accio,代码行数:8,代码来源:admin.py


示例15: run

 def run(self):
     self._isRunning = True
     process = Process(os.getpid())
     while self._isRunning:
         self.cpuUsage = process.get_cpu_percent(self._pollInterval)
         self.memUsage = process.get_memory_info()[0]
         if self.delegate is not None:
             self.delegate.processCpuUsage(self.cpuUsage)
             self.delegate.processMemUsage(self.memUsage)
开发者ID:teragonaudio,项目名称:Lucidity,代码行数:9,代码来源:performance.py


示例16: test_valid_is_running

 def test_valid_is_running(self):
     p = Process()
     _, f = mkstemp()
     try:
         with open(f, 'w') as pid_file:
             pid_file.write('{0} {1:6f}'.format(p.pid, p.create_time()))
         self.assertTrue(_is_running(f))
     finally:
         unlink(f)
开发者ID:Prince88,项目名称:highlander,代码行数:9,代码来源:highlander_tests.py


示例17: test_valid_is_running

 def test_valid_is_running(self):
     p = Process()
     d = mkdtemp()
     f = _get_pid_filename(d)
     try:
         with open(f, 'w') as pid_file:
             pid_file.write('{0} {1:6f}'.format(p.pid, p.create_time()))
         self.assertTrue(_is_running(d))
     finally:
         rmtree(d)
开发者ID:linzhonghong,项目名称:highlander,代码行数:10,代码来源:highlander_tests.py


示例18: _set_running

def _set_running(filename):
    """Write the current process information to disk.
    :param filename: The name of the file where the process information will be written.
    """
    if isfile(str(filename)):
        raise PidFileExistsError()

    p = Process()
    with open(filename, 'w') as f:
        f.write('{0} {1:.6f}'.format(p.pid, p.create_time()))
开发者ID:Prince88,项目名称:highlander,代码行数:10,代码来源:highlander.py


示例19: common_option

def common_option(pid):
    ''' common options to be used in both restore and dump '''

    options = list()
    proc = Process(pid)
    if proc.terminal():
        options.append("--shell-job")
    if proc.connections(kind="inet"):
        options.append("--tcp-established")

    return options
开发者ID:shivika04,项目名称:pmigrate,代码行数:11,代码来源:proc_migrate.py


示例20: testFDTDServiceOpenFiles

def testFDTDServiceOpenFiles():
    """
    #41 - Too many open files (fdtd side)

    """
    hostName = os.uname()[1]
    f = getTempFile(functionalFDTDConfiguration)
    inputOption = "--config=%s" % f.name
    conf = ConfigFDTD(inputOption.split())
    conf.sanitize()
    testName = inspect.stack()[0][3]
    logger = Logger(name=testName, logFile="/tmp/fdtdtest-%s.log" % testName, level=logging.DEBUG)
    apMon = None
    fdtd = FDTD(conf, apMon, logger)

    proc = Process(os.getpid())
    initStateNumOpenFiles = len(proc.get_open_files())

    for testAction in [TestAction("fakeSrc", "fakeDst") for i in range(3)]:
        r = fdtd.service.service(testAction)
        logger.debug("Result: %s" % r)
        assert r.status == 0

    # after TestAction, there should not be left behind any open files
    numOpenFilesNow = len(proc.get_open_files())
    assert initStateNumOpenFiles == numOpenFilesNow

    # test on ReceivingServerAction - it's action after which the
    # separate logger is not closed, test the number of open files went +1,
    # send CleanupProcessesAction and shall again remain
    # initStateNumOpenFiles send appropriate TestAction first (like in real)
    serverId = "server-id"
    testAction = TestAction(hostName, hostName)
    testAction.id = serverId
    r = fdtd.service.service(testAction)
    assert r.status == 0
    options = dict(gridUserDest="someuserDest", clientIP=os.uname()[1], destFiles=[])
    recvServerAction = ReceivingServerAction(testAction.id, options)
    r = fdtd.service.service(recvServerAction)
    print(r.msg)
    assert r.status == 0
    numOpenFilesNow = len(proc.get_open_files())
    # there should be only 1 extra opened file now
    assert initStateNumOpenFiles == numOpenFilesNow - 1
    cleanupAction = CleanupProcessesAction(serverId, timeout=2)
    r = fdtd.service.service(cleanupAction)
    print(r.msg)
    assert r.status == 0
    numOpenFilesNow = len(proc.get_open_files())
    assert initStateNumOpenFiles == numOpenFilesNow

    fdtd.shutdown()
    fdtd.pyroDaemon.closedown()
    logger.close()
开发者ID:juztas,项目名称:fdtcp,代码行数:54,代码来源:test_fdtd.py



注:本文中的psutil.Process类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python _common.sconn函数代码示例发布时间:2022-05-25
下一篇:
Python psutil.Popen类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap