本文整理汇总了Python中neutron.agent.linux.utils.get_root_helper_child_pid函数的典型用法代码示例。如果您正苦于以下问题:Python get_root_helper_child_pid函数的具体用法?Python get_root_helper_child_pid怎么用?Python get_root_helper_child_pid使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_root_helper_child_pid函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: pid
def pid(self):
    """Return the cached child pid, resolving it on first access.

    When ``self._process`` is set and no pid has been cached in
    ``self._pid`` yet, the pid is looked up through
    ``utils.get_root_helper_child_pid`` using the spawned process' pid,
    ``self.cmd_without_namespace`` and ``self.run_as_root``, and the result
    is stored in ``self._pid``.

    :returns: ``self._pid`` when ``self._process`` is truthy; otherwise the
        function falls through and returns ``None``.
    """
    if self._process:
        # Only hit the lookup while nothing is cached yet.
        if not self._pid:
            self._pid = utils.get_root_helper_child_pid(
                self._process.pid,
                self.cmd_without_namespace,
                run_as_root=self.run_as_root)
        return self._pid
开发者ID:eayunstack,项目名称:neutron,代码行数:8,代码来源:async_process.py
示例2: test_async_process_respawns
def test_async_process_respawns(self):
    """Check stdout before and after killing the child of an AsyncProcess.

    Starts ``tail -f`` on ``self.test_file_path`` with
    ``respawn_interval=0``, checks its stdout, kills the child process
    found via ``utils.get_root_helper_child_pid`` and checks stdout again
    before stopping the process.
    """
    proc = async_process.AsyncProcess(
        ["tail", "-f", self.test_file_path], respawn_interval=0)
    proc.start()

    # Ensure that the same output is read twice
    self._check_stdout(proc)
    pid = utils.get_root_helper_child_pid(
        proc._process.pid, proc.root_helper)
    proc._kill_process(pid)
    self._check_stdout(proc)
    proc.stop()
开发者ID:jdevesa,项目名称:neutron,代码行数:10,代码来源:test_async_process.py
示例3: test_killed_monitor_respawns
def test_killed_monitor_respawns(self):
    """Kill the monitor's child process and compare the initial output.

    Collects the initial output, kills the child process resolved from the
    monitor's current process pid, waits until ``self.monitor._process.pid``
    differs from the old pid, then collects the initial output again and
    asserts both collections are equal.
    """
    self.monitor.respawn_interval = 0
    old_pid = self.monitor._process.pid
    output1 = self.collect_initial_output()
    pid = utils.get_root_helper_child_pid(old_pid, self.root_helper)
    self.monitor._kill_process(pid)
    self.monitor._reset_queues()
    # Poll until the monitor holds a process with a different pid.
    while (self.monitor._process.pid == old_pid):
        eventlet.sleep(0.01)
    output2 = self.collect_initial_output()
    # Initial output should appear twice
    self.assertEqual(output1, output2)
开发者ID:afori,项目名称:neutron,代码行数:12,代码来源:test_ovsdb_monitor.py
示例4: _wait_for_child_process
def _wait_for_child_process(self, timeout=CHILD_PROCESS_TIMEOUT, sleep=CHILD_PROCESS_SLEEP):
    """Wait for the root helper's child process, then record its pid.

    Polls through ``utils.wait_until_true`` until the child of ``self.pid``
    is found to have been invoked with ``self.cmd``, then stores the child
    pid in ``self.child_pid``.

    :param timeout: passed to ``utils.wait_until_true`` as its timeout and
        reported in the error message.
    :param sleep: polling interval forwarded to ``utils.wait_until_true``.
        Previously this argument was accepted but never used.
    :raises RuntimeError: supplied to ``utils.wait_until_true`` as the
        exception to raise when the wait does not succeed.
    """
    def child_is_running():
        child_pid = utils.get_root_helper_child_pid(
            self.pid, run_as_root=True)
        if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
            return True

    utils.wait_until_true(
        child_is_running,
        timeout,
        sleep=sleep,
        exception=RuntimeError(
            "Process %s hasn't been spawned "
            "in %d seconds" % (self.cmd, timeout)),
    )
    self.child_pid = utils.get_root_helper_child_pid(
        self.pid, run_as_root=True)
开发者ID:cisco-openstack,项目名称:neutron,代码行数:12,代码来源:net_helpers.py
示例5: _test_get_root_helper_child_pid
def _test_get_root_helper_child_pid(self, expected=_marker, run_as_root=False, pids=None):
    """Run ``utils.get_root_helper_child_pid`` against a stubbed pid chain.

    ``utils.find_child_pids`` is patched with a stub that, on every call,
    drops the first entry of ``pids`` (in place) and returns the remaining
    list, or returns ``[]`` once ``pids`` is empty or ``None``.

    :param expected: value the call must return; when left at the
        ``_marker`` sentinel, ``str(mock_pid)`` is expected instead.
    :param run_as_root: forwarded positionally to the function under test.
    :param pids: list consumed by the ``find_child_pids`` stub.
    """
    def _find_child_pids(x):
        if not pids:
            return []
        pids.pop(0)
        return pids

    mock_pid = object()
    with mock.patch.object(utils, "find_child_pids",
                           side_effect=_find_child_pids):
        actual = utils.get_root_helper_child_pid(mock_pid, run_as_root)
    if expected is _marker:
        expected = str(mock_pid)
    self.assertEqual(expected, actual)
开发者ID:nate-johnston,项目名称:neutron,代码行数:13,代码来源:test_utils.py
示例6: test_get_root_helper_child_pid_returns_first_child
def test_get_root_helper_child_pid_returns_first_child(self):
    """Test that the first child, not lowest child pid is returned.

    Test creates following process tree:
      sudo +
           |
           +--rootwrap +
                       |
                       +--bash+
                              |
                              +--sleep 100

    and tests that pid of `bash' command is returned.
    """

    def wait_for_sleep_is_spawned(parent_pid):
        # True once the last entry of the '---'-separated pstree output
        # is 'sleep'; returns None while pstree yields no entries.
        proc_tree = utils.execute(
            ['pstree', parent_pid], check_exit_code=False)
        processes = [command.strip() for command in proc_tree.split('---')
                     if command]
        if processes:
            return 'sleep' == processes[-1]

    cmd = ['bash', '-c', '(sleep 100)']
    proc = async_process.AsyncProcess(cmd, run_as_root=True)
    proc.start()

    # root helpers spawn their child processes asynchronously, and we
    # don't want to use proc.start(block=True) as that uses
    # get_root_helper_child_pid (The method under test) internally.
    sudo_pid = proc._process.pid
    common_utils.wait_until_true(
        functools.partial(
            wait_for_sleep_is_spawned,
            sudo_pid),
        sleep=0.1)

    child_pid = utils.get_root_helper_child_pid(
        sudo_pid, cmd, run_as_root=True)
    self.assertIsNotNone(
        child_pid,
        "get_root_helper_child_pid is expected to return the pid of the "
        "bash process")
    self._addcleanup_sleep_process(child_pid)
    # The first NUL-separated field of /proc/<pid>/cmdline is checked.
    with open('/proc/%s/cmdline' % child_pid, 'r') as f_proc_cmdline:
        cmdline = f_proc_cmdline.readline().split('\0')[0]
    self.assertIn('bash', cmdline)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:47,代码来源:test_utils.py
示例7: _kill
def _kill(self, respawning=False):
    """Kill the process and the associated watcher greenthreads.

    :param respawning: Optional, whether respawn will be subsequently
                       attempted.
    """
    # Halt the greenthreads
    self._kill_event.send()

    pid = utils.get_root_helper_child_pid(
        self._process.pid, self.root_helper)
    # Nothing to kill when no child pid could be resolved.
    if pid:
        self._kill_process(pid)

    if not respawning:
        # Clear the kill event to ensure the process can be
        # explicitly started again.
        self._kill_event = None
开发者ID:afori,项目名称:neutron,代码行数:18,代码来源:async_process.py
示例8: _test_get_root_helper_child_pid
def _test_get_root_helper_child_pid(self, expected=_marker, run_as_root=False, pids=None, cmds=None):
    """Run ``utils.get_root_helper_child_pid`` with both helpers patched.

    ``utils.find_child_pids`` is patched with a stub that, on every call,
    drops the first entry of ``pids`` (in place) and returns the remaining
    list, or returns ``[]`` once ``pids`` is empty or ``None``.
    ``utils.pid_invoked_with_cmdline`` is patched to use ``cmds`` as its
    ``side_effect`` when given, and to return ``False`` otherwise.

    :param expected: value the call must return; when left at the
        ``_marker`` sentinel, ``str(mock_pid)`` is expected instead.
    :param run_as_root: forwarded positionally to the function under test.
    :param pids: list consumed by the ``find_child_pids`` stub.
    :param cmds: optional ``side_effect`` for ``pid_invoked_with_cmdline``.
    """
    def _find_child_pids(x):
        if not pids:
            return []
        pids.pop(0)
        return pids

    mock_pid = object()
    pid_invoked_with_cmdline = {}
    if cmds:
        pid_invoked_with_cmdline["side_effect"] = cmds
    else:
        pid_invoked_with_cmdline["return_value"] = False

    # Build both patchers first so the ``with`` line stays readable; the
    # patches are only applied when the ``with`` block is entered.
    find_children_patch = mock.patch.object(
        utils, "find_child_pids", side_effect=_find_child_pids)
    cmdline_patch = mock.patch.object(
        utils, "pid_invoked_with_cmdline", **pid_invoked_with_cmdline)
    with find_children_patch, cmdline_patch:
        actual = utils.get_root_helper_child_pid(
            mock_pid, mock.ANY, run_as_root)
    if expected is _marker:
        expected = str(mock_pid)
    self.assertEqual(expected, actual)
开发者ID:openstack,项目名称:neutron,代码行数:20,代码来源:test_utils.py
示例9: child_is_running
def child_is_running():
    """Report whether the root helper's child was invoked with ``self.cmd``.

    ``self`` is not a parameter here; it is read from the surrounding
    scope (presumably an enclosing method, not shown in this excerpt).

    :returns: ``True`` when ``utils.pid_invoked_with_cmdline`` is truthy
        for the resolved child pid; otherwise falls through and returns
        ``None``.
    """
    child_pid = utils.get_root_helper_child_pid(
        self.pid, self.cmd, run_as_root=True)
    if utils.pid_invoked_with_cmdline(child_pid, self.cmd):
        return True
开发者ID:21atlas,项目名称:neutron,代码行数:5,代码来源:net_helpers.py
示例10: pid
def pid(self):
    """Return the root helper's child pid for the current process.

    The pid is resolved on every call through
    ``utils.get_root_helper_child_pid`` from ``self._process.pid`` and
    ``self.run_as_root``.

    :returns: the resolved pid when ``self._process`` is truthy; otherwise
        the function falls through and returns ``None``.
    """
    if self._process:
        return utils.get_root_helper_child_pid(
            self._process.pid,
            run_as_root=self.run_as_root)
开发者ID:asgard-lab,项目名称:neutron,代码行数:5,代码来源:async_process.py
注:本文中的neutron.agent.linux.utils.get_root_helper_child_pid函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论