本文整理汇总了Python中virttest.utils_misc.wait_for函数的典型用法代码示例。如果您正苦于以下问题：Python wait_for函数的具体用法？Python wait_for怎么用？Python wait_for使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wait_for函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: check_guest_meminfo
def check_guest_meminfo(old_mem, check_option):
    """
    Check the guest's total memory after a memory device attach/detach.

    Makes sure a udev rule that onlines hot-plugged memory is present in
    the guest, waits for the reported total to differ from ``old_mem`` and
    then compares it with the expected size.  ``vm``, ``online``,
    ``at_times``, ``tg_size`` and ``test`` come from the enclosing scope.

    :param old_mem: Guest total memory before the operation (anything
                    ``int()`` accepts); must not be None.
    :param check_option: "attach" or "detach"; any other value skips the
                         size comparison.
    """
    assert old_mem is not None
    session = vm.wait_for_login()
    try:
        # Hot-plugged memory should be online by udev rules
        udev_file = "/lib/udev/rules.d/80-hotplug-cpu-mem.rules"
        udev_rules = ('SUBSYSTEM=="memory", ACTION=="add", TEST=="state",'
                      ' ATTR{state}=="offline", ATTR{state}="online"')
        # Append the rule only when no memory rule is in the file yet.
        cmd = ("grep memory %s || echo '%s' >> %s"
               % (udev_file, udev_rules, udev_file))
        session.cmd(cmd)
        # Wait a while for new memory to be detected.
        utils_misc.wait_for(
            lambda: vm.get_totalmem_sys(online) != int(old_mem),
            30, first=20.0)
        new_mem = vm.get_totalmem_sys(online)
    finally:
        # Release the guest session even when a guest command fails.
        session.close()
    logging.debug("Memtotal on guest: %s", new_mem)
    # The device may have been attached/detached several times.
    no_of_times = at_times if at_times else 1
    delta = int(tg_size) * no_of_times
    if check_option == "attach":
        if new_mem != int(old_mem) + delta:
            test.fail("Total memory on guest couldn't changed after "
                      "attach memory device")
    if check_option == "detach":
        if new_mem != int(old_mem) - delta:
            test.fail("Total memory on guest couldn't changed after "
                      "detach memory device")
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:31,代码来源:libvirt_mem.py
示例2: check_memory
def check_memory(self, vm=None):
    """
    Compare the memory seen by the guest OS with the memory assigned to it.

    :param vm: VM object; when falsy, the VM named by params["main_vm"]
               is fetched from env.
    :raise exceptions.TestFail: if the two sizes differ by more than
                                ``threshold`` times the assigned size.
    """
    error_context.context("Verify memory info", logging.info)
    if not vm:
        vm = self.env.get_vm(self.params["main_vm"])
    vm.verify_alive()
    tolerance = float(self.params.get("threshold", 0.10))
    resume_timeout = float(self.params.get("wait_resume_timeout", 60))

    # Some sub tests pause the VM; give it a chance to resume before the
    # guest is queried.
    def _vm_running():
        return not vm.is_paused()

    utils_misc.wait_for(_vm_running, timeout=resume_timeout)
    utils_misc.verify_host_dmesg()
    self.os_type = self.params.get("os_type")
    detected_mb = super(MemoryHotplugTest, self).get_guest_total_mem(vm)
    assigned_mb = self.get_vm_mem(vm)
    if abs(detected_mb - assigned_mb) > assigned_mb * tolerance:
        raise exceptions.TestFail(
            "Assigned '%s MB' memory to '%s'"
            "but, '%s MB' memory detect by OS"
            % (assigned_mb, vm.name, detected_mb))
开发者ID:ngu-niny,项目名称:avocado-vt,代码行数:25,代码来源:__init__.py
示例3: run
def run(test, params, env):
    """
    drive_mirror_stress test:

    1). start an unattended guest installation in a background thread
    2). start a drive mirror at a random moment during the installation
    3). wait for the mirror to reach steady state, reopen it, then wait
        for the installation thread to finish

    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    """
    installer = utils_misc.InterruptedThread(
        utils_test.run_virt_sub_test,
        (test, params, env),
        {"sub_type": "unattended_install"})
    installer.start()
    utils_misc.wait_for(installer.is_alive, timeout=10)
    # Let the installation run for a random while before mirroring starts.
    delay = random.uniform(60, 200)
    time.sleep(delay)
    mirror = drive_mirror.DriveMirror(test, params, env,
                                      params["source_image"])
    mirror.trash_files.append(mirror.image_file)
    try:
        mirror.start()
        mirror.wait_for_steady()
        mirror.reopen()
        installer.join()
    finally:
        # Always clean up the mirror, whatever happened above.
        mirror.clean()
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:28,代码来源:drive_mirror_installation.py
示例4: guest_netwok_connecting_check
def guest_netwok_connecting_check(guest_ip, link_up, change_queues=False):
    """
    Ping the guest and check the result against the expected link state.

    :param guest_ip: Address to ping; replaced by the address reported by
                     the VM when ``link_up`` is true.
    :param link_up: Whether the guest link is expected to be up.
    :param change_queues: When true, run ``change_queues_number_repeatly``
                          in a background thread while pinging.
    :raise error.TestFail: if the link is down but not every packet was
                           lost, or the link is up and every packet was lost.
    """
    if link_up:
        vm.wait_for_login()
        guest_ip = vm.get_address()

    queue_worker = None
    if change_queues:
        env["run_change_queues"] = False
        queue_worker = utils.InterruptedThread(
            change_queues_number_repeatly, (guest_ifname,))
        queue_worker.start()
        # Block until env["run_change_queues"] becomes true.
        utils_misc.wait_for(lambda: env["run_change_queues"], 30, 0, 2,
                            "wait queues change start")

    _, ping_output = utils_test.ping(guest_ip, count=10, timeout=20)
    all_lost = utils_test.get_loss_ratio(ping_output) == 100
    if not link_up and not all_lost:
        raise error.TestFail(
            "guest network still connecting after down the link")
    if link_up and all_lost:
        raise error.TestFail(
            "All packets lost during ping guest ip after link up")
    logging.info("Guest network connecting is exactly as expected")

    if change_queues:
        # Clear the flag, then wait for the worker thread to finish.
        env["run_change_queues"] = False
        queue_worker.join()
开发者ID:Xiangmin,项目名称:tp-qemu,代码行数:29,代码来源:set_link.py
示例5: run
def run(test, params, env):
    """
    block_stream_installation test:

    1). start an unattended guest installation in a background thread
    2). create live snapshots at a random moment during the installation
    3). start block stream, wait for it to finish, then wait for the
        installation thread to finish

    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    """
    install_thread = utils_misc.InterruptedThread(
        utils_test.run_virt_sub_test,
        (test, params, env),
        {"sub_type": "unattended_install"})
    install_thread.start()
    utils_misc.wait_for(install_thread.is_alive, timeout=10)
    # Let the installation run for a random while before snapshotting.
    pause = random.uniform(60, 200)
    time.sleep(pause)
    streamer = blk_stream.BlockStream(test, params, env,
                                      params["source_image"])
    streamer.trash_files.append(streamer.image_file)
    try:
        streamer.create_snapshots()
        streamer.start()
        streamer.wait_for_finished()
        install_thread.join()
    finally:
        # Always clean up the stream test, whatever happened above.
        streamer.clean()
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:27,代码来源:block_stream_installation.py
示例6: get_dirty
def get_dirty(session, frozen=False):
    """
    Get dirty data of guest.

    Reads the second field of the "Dirty" line of /proc/meminfo inside
    the guest.

    :param session: Guest session used to run the query command.
    :param frozen: When false, wait up to 60s for the value to become
                   non-zero; when true, wait up to 60s for it to reach 0.
    :return: 0 when the non-frozen wait times out or the frozen wait
             succeeds; otherwise a fresh reading of the value as int.
    :raise error.TestFail: if the command output cannot be parsed.
    """
    data_cmd = "cat /proc/meminfo | grep Dirty"

    def _read_dirty():
        # Second whitespace-separated field of the matched line.
        return int(session.cmd_output(data_cmd).strip().split()[1])

    try:
        if not frozen:
            if utils_misc.wait_for(lambda: _read_dirty() != 0, 60):
                return _read_dirty()
            return 0
        if utils_misc.wait_for(lambda: _read_dirty() == 0, 60):
            return 0
        # Still dirty after the wait: report the current value.
        return _read_dirty()
    except (IndexError, ValueError) as details:
        raise error.TestFail("Get dirty info failed: %s" % details)
开发者ID:PandaWei,项目名称:tp-libvirt,代码行数:32,代码来源:virsh_qemu_agent_command_fs.py
示例7: get_ip_by_mac
def get_ip_by_mac(mac_addr, try_dhclint=False):
    """
    Get the guest interface IP address for a given MAC address.

    :param mac_addr: MAC address of the guest interface.
    :param try_dhclint: When true and no address is found at first, run
                        ``dhclient`` on the interface and look again;
                        when false, only log a warning.
    :return: Whatever the last ``utils_misc.wait_for`` lookup returned
             (None when the first lookup yields None and no retry is made).
    """
    session = vm.wait_for_login()

    def f():
        return utils_net.get_guest_ip_addr(session, mac_addr)

    try:
        ip_addr = utils_misc.wait_for(f, 10)
        if ip_addr is None:
            iface_name = utils_net.get_linux_ifname(session, mac_addr)
            if try_dhclint:
                session.cmd("dhclient %s" % iface_name)
                ip_addr = utils_misc.wait_for(f, 10)
            else:
                # No IP for the interface, just print the interface name.
                # logging.warning replaces the deprecated logging.warn alias.
                logging.warning("Find '%s' with MAC address '%s', "
                                "but which has no IP address", iface_name,
                                mac_addr)
    finally:
        session.close()
    return ip_addr
开发者ID:Hao-Liu,项目名称:tp-libvirt,代码行数:25,代码来源:virsh_net_dhcp_leases.py
示例8: clear_interface_linux
def clear_interface_linux(vm, login_timeout, timeout):
    """
    Clears user interface of a vm without reboot.

    Kills gdm (when /etc/redhat-release mentions "release 7") or Xorg in
    the guest, then checks that a process with that name is running.

    :param vm: VM where cleaning is required
    :param login_timeout: Timeout passed to ``vm.wait_for_login``.
    :param timeout: Passed as the third positional argument of
                    ``utils_misc.wait_for``.
    :raise error.TestFail: if ``ps -C <command>`` fails afterwards.
    """
    logging.info("restarting X/gdm on: %s", vm.name)
    # NOTE(review): root credentials are hard-coded here.
    session = vm.wait_for_login(username="root", password="123456",
                                timeout=login_timeout)
    if "release 7" in session.cmd('cat /etc/redhat-release'):
        command = "gdm"
        pgrep_process = "'^gdm$'"
    else:
        command = "Xorg"
        pgrep_process = "Xorg"
    try:
        pid = session.cmd("pgrep %s" % pgrep_process)
        session.cmd("killall %s" % command)
        utils_misc.wait_for(lambda: _is_pid_alive(session, pid), 10,
                            timeout, 0.2)
    except Exception as details:
        # Best effort: the check below decides pass/fail.  Catch Exception
        # (not a bare except) so KeyboardInterrupt/SystemExit still
        # propagate, and leave a trace of what went wrong.
        logging.debug("Ignoring failure while restarting %s: %s",
                      command, details)
    try:
        session.cmd("ps -C %s" % command)
    except ShellCmdError:
        raise error.TestFail("X/gdm not running")
开发者ID:Andrei-Stepanov,项目名称:virt-test,代码行数:29,代码来源:utils_spice.py
示例9: check_boot_result
def check_boot_result(boot_fail_info, device_name):
    """
    Check the boot result on the serial console.

    ``cleanup(device_name)`` is always called on the way out.

    :param boot_fail_info: ';'-separated regular expressions expected in
                           the serial console output.
    :param device_name: Name of the boot device, used in messages and
                        passed to ``cleanup``.
    :raise error.TestFail: if any expression is missing from the output.
    """
    logging.info("Wait for display and check boot info.")
    infos = boot_fail_info.split(';')

    def _first_info_shown():
        return re.search(infos[0], vm.serial_console.get_output())

    utils_misc.wait_for(_first_info_shown, timeout, 1)
    logging.info("Try to boot from '%s'" % device_name)
    try:
        boots_from_disk = (
            dev_name == "hard-drive" or
            (dev_name == "scsi-hd" and not params.get("image_name_stg")))
        if boots_from_disk:
            # Being able to log in proves the guest is up; then stop it.
            error.context("Log into the guest to verify it's up",
                          logging.info)
            login = vm.wait_for_login(timeout=timeout)
            login.close()
            vm.destroy()
            return
        console_log = vm.serial_console.get_output()
        if not all(re.search(pattern, console_log) for pattern in infos):
            raise error.TestFail("Could not boot from '%s'" % device_name)
    finally:
        cleanup(device_name)
开发者ID:NixSilva,项目名称:virt-test,代码行数:29,代码来源:boot_from_device.py
示例10: receiver
def receiver():
    """
    Receive side.

    Logs into the receiver (the receiver VM when one is set, otherwise a
    remote host described by params), installs ntttcp, then runs the
    receiver command once per entry of ``buffers``, appending each
    command and its output to ``results_path + ".receiver"``.
    """
    global _receiver_ready
    logging.info("Starting receiver process on %s", receiver_addr)
    if vm_receiver:
        session = vm_receiver.wait_for_login(timeout=login_timeout)
    else:
        username = params.get("username", "")
        password = params.get("password", "")
        # Raw string: same value as before, without invalid escapes.
        prompt = params.get("shell_prompt", r"[\#\$]")
        # SECURITY NOTE: eval() turns the escaped separator (e.g. r"\n")
        # into the real character; "shell_linesep" must come from a
        # trusted configuration only.
        linesep = eval("'%s'" % params.get("shell_linesep", r"\n"))
        client = params.get("shell_client")
        port = int(params.get("shell_port"))
        log_filename = ("session-%s-%s.log" % (
            receiver_addr, utils_misc.generate_random_string(4)))
        session = remote.remote_login(client, receiver_addr, port,
                                      username, password, prompt,
                                      linesep, log_filename, timeout)
        # NOTE(review): assumed to belong to the remote-login branch; the
        # original indentation was lost -- confirm against upstream.
        session.set_status_test_command("echo %errorlevel%")
    try:
        install_ntttcp(session)
        ntttcp_receiver_cmd = params.get("ntttcp_receiver_cmd")
        # "with" closes the results file even if a command fails.
        with open(results_path + ".receiver", 'a') as f:
            for b in buffers:
                # Wait until _wait() returns false before signalling
                # readiness for this buffer size.
                utils_misc.wait_for(lambda: not _wait(), timeout)
                _receiver_ready = True
                rbuf = params.get("fixed_rbuf", b)
                cmd = ntttcp_receiver_cmd % (
                    session_num, receiver_addr, rbuf, buf_num)
                r = session.cmd_output(cmd, timeout=timeout,
                                       print_func=logging.debug)
                f.write("Send buffer size: %s\n%s\n%s" % (b, cmd, r))
    finally:
        # Close the session on every exit path.
        session.close()
开发者ID:ayiyaliing,项目名称:virt-test,代码行数:33,代码来源:ntttcp.py
示例11: add_device
def add_device(pci_num, queues=1):
    """
    Hot-plug a PCI device and verify that the guest detects it.

    ``vm``, ``session``, ``params``, ``reference_cmd``, ``cmd_type``,
    ``pci_type``, ``pci_model``, ``local_functions``, ``test_timeout``
    and ``pci_del`` come from the enclosing scope.

    :param pci_num: Index of the device; passed to the add function and
                    to ``pci_del`` on failure.
    :param queues: Passed through to the add function.
    :raise error.TestError: if no add function is registered for the
                            command/device type combination.
    :raise error.TestFail: if the device does not show up or its test
                           command fails.
    """
    info_pci_ref = vm.monitor.info("pci")
    reference = session.cmd_output(reference_cmd)
    try:
        # get function for adding device.
        add_function = local_functions["%s_%s" % (cmd_type, pci_type)]
    except Exception:
        raise error.TestError("No function for adding '%s' dev with '%s'" %
                              (pci_type, cmd_type))
    after_add = None
    if add_function:
        # Do add pci device.
        after_add = add_function(pci_num, queues)
    try:
        # Define a helper function to compare the output
        def _new_shown():
            return session.cmd_output(reference_cmd) != reference

        # Define a helper function to catch PCI device string
        def _find_pci():
            output = session.cmd_output(params.get("find_pci_cmd"))
            # str.strip() replaces the Python-2-only string.strip; a real
            # collection (not a one-shot map iterator) keeps the
            # membership test correct on Python 3.
            known = set(line.strip() for line in reference.splitlines())
            fresh = [line for line in
                     (raw.strip() for raw in output.splitlines())
                     if line not in known]
            return bool(re.search(params.get("match_string"),
                                  "\n".join(fresh), re.I))

        error.context("Start checking new added device")
        # Compare the output of 'info pci'
        if after_add == info_pci_ref:
            raise error.TestFail("No new PCI device shown after executing "
                                 "monitor command: 'info pci'")
        secs = int(params.get("wait_secs_for_hook_up"))
        if not utils_misc.wait_for(_new_shown, test_timeout, secs, 3):
            raise error.TestFail("No new device shown in output of command "
                                 "executed inside the guest: %s" %
                                 reference_cmd)
        if not utils_misc.wait_for(_find_pci, test_timeout, 3, 3):
            raise error.TestFail("PCI %s %s device not found in guest. "
                                 "Command was: %s" %
                                 (pci_model, pci_type,
                                  params.get("find_pci_cmd")))
        # Test the newly added device
        try:
            session.cmd(params.get("pci_test_cmd") % (pci_num + 1))
        except aexpect.ShellError as e:
            raise error.TestFail("Check for %s device failed after PCI "
                                 "hotplug. Output: %r" % (pci_type, e.output))
    except Exception:
        # Remove the device again before propagating the failure.
        pci_del(pci_num, ignore_failure=True)
        raise
开发者ID:FT4VT,项目名称:FT4VM-L1_test,代码行数:60,代码来源:pci_hotplug.py
示例12: run_ip_test
def run_ip_test(session, ip_ver):
    """
    Check that the guest gets an address and can reach the gateway.

    :param session: Guest session used for the guest-side commands.
    :param ip_ver: IP version string; "ipv6" selects the ipv6 code paths.
    :raise error.TestFail: if the guest has no address or the gateway
                           ping command returns non-zero.
    """
    use_ipv6 = ip_ver == "ipv6"
    if use_ipv6:
        # Clean up iptables rules for guest to get ipv6 address
        session.cmd_status("ip6tables -F")
    # NOTE(review): the restart is assumed to be unconditional; the
    # original indentation was lost -- confirm against upstream.
    utils_net.restart_guest_network(session, iface_mac,
                                    ip_version=ip_ver)

    # It may take some time to get the ip address
    def get_ip_func():
        return utils_net.get_guest_ip_addr(session, iface_mac,
                                           ip_version=ip_ver)

    utils_misc.wait_for(get_ip_func, 10)
    vm_ip = get_ip_func()
    logging.debug("Guest has ip: %s", vm_ip)
    if not vm_ip:
        raise error.TestFail("Can't find ip address on guest")

    if use_ipv6:
        ping_cmd, ip_gateway = "ping6 -c 5", net_ipv6_address
    else:
        ping_cmd, ip_gateway = "ping -c 5", net_ip_address
    # A non-zero exit status of the ping command means failure.
    if ip_gateway and utils.system("%s %s" % (ping_cmd, ip_gateway),
                                   ignore_status=True):
        raise error.TestFail("Failed to ping gateway address: %s"
                             % ip_gateway)
开发者ID:Antique,项目名称:tp-libvirt,代码行数:30,代码来源:iface_network.py
示例13: wait_for_balloon_complete
def wait_for_balloon_complete(self, timeout):
    """
    Wait until guest memory don't change.

    :param timeout: Maximum time handed to ``utils_misc.wait_for``.
    """
    logging.info("Wait until guest memory don't change")
    # presumably a generator yielding a truthy value once memory is
    # stable -- verify against _mem_state
    is_stable = self._mem_state()
    # The next() builtin works on Python 2.6+ and 3; the bound method
    # ``is_stable.next`` exists on Python 2 only.
    utils_misc.wait_for(lambda: next(is_stable), timeout, step=10.0)
开发者ID:Zhengtong,项目名称:tp-qemu,代码行数:7,代码来源:balloon_check.py
示例14: run_bg_stress_test
def run_bg_stress_test(bg_stress_test):
    """
    Start a sub test in a background thread and wait for it to run.

    :param bg_stress_test: Name of the background sub test.
    :return: The background thread once the test is confirmed running.
    :raise exceptions.TestFail: if the target process is not detected.
    :raise exceptions.TestError: if the run flag is requested but never
                                 becomes true.
    """
    error_context.context("Run test %s background" % bg_stress_test,
                          logging.info)
    flag_timeout = float(params.get("wait_bg_time", 60))
    process_name = params.get("target_process", "")
    run_flag_key = params.get("bg_stress_run_flag")
    # Need to set bg_stress_run_flag in some cases to make sure all
    # necessary steps are active
    env[run_flag_key] = False
    worker = utils.InterruptedThread(
        utils_test.run_virt_sub_test, (test, params, env),
        {"sub_type": bg_stress_test})
    worker.start()

    def _process_running():
        return check_bg_running(process_name)

    def _flag_raised():
        return env.get(run_flag_key)

    if not utils_misc.wait_for(_process_running, 120, 0, 1):
        raise exceptions.TestFail("Backgroud test %s is not "
                                  "alive!" % bg_stress_test)
    if params.get("set_bg_stress_flag", "no") == "yes":
        logging.info("Wait %s test start" % bg_stress_test)
        if not utils_misc.wait_for(_flag_raised, flag_timeout, 0, 0.5):
            raise exceptions.TestError(
                "Fail to start %s test" % bg_stress_test)
    env["bg_status"] = 1
    return worker
开发者ID:PyLearner,项目名称:tp-qemu,代码行数:33,代码来源:driver_in_use.py
示例15: run_ip_test
def run_ip_test(session, ip_ver):
    """
    Check that the guest gets an address and can ping the gateway.

    The guest network is restarted only when no address shows up first.

    :param session: Guest session used for the guest-side commands.
    :param ip_ver: IP version string; "ipv6" selects the ipv6 code paths.
    """
    use_ipv6 = ip_ver == "ipv6"
    if use_ipv6:
        # Clean up iptables rules for guest to get ipv6 address
        session.cmd_status("ip6tables -F")

    # It may take some time to get the ip address
    def get_ip_func():
        return utils_net.get_guest_ip_addr(session, iface_mac,
                                           ip_version=ip_ver)

    utils_misc.wait_for(get_ip_func, 5)
    if not get_ip_func():
        # Still no address: restart the guest network and wait again.
        utils_net.restart_guest_network(session, iface_mac,
                                        ip_version=ip_ver)
        utils_misc.wait_for(get_ip_func, 5)
    vm_ip = get_ip_func()
    logging.debug("Guest has ip: %s", vm_ip)
    if not vm_ip:
        test.fail("Can't find ip address on guest")

    if use_ipv6:
        ip_gateway = net_ipv6_address
        # Clean up ip6tables on the host for the ping6 test
        process.system("ip6tables -F")
    else:
        ip_gateway = net_ip_address
    if ip_gateway and not routes:
        ping_status, _ = ping(dest=ip_gateway, count=5,
                              timeout=10, session=session)
        # A truthy ping status is treated as failure.
        if ping_status:
            test.fail("Failed to ping gateway address: %s" % ip_gateway)
开发者ID:lento-sun,项目名称:tp-libvirt,代码行数:31,代码来源:iface_network.py
示例16: launch_netperf_client
def launch_netperf_client(test, server_ips, netperf_clients, test_option,
                          test_duration, netperf_para_sess,
                          netperf_cmd_prefix):
    """
    Start every netperf client against every server, then wait for them.

    :param test: Test object; ``test.error`` is called when a client
                 does not start.
    :param server_ips: Server addresses the clients connect to.
    :param netperf_clients: Client objects providing ``bg_start`` and
                            ``is_netperf_running``.
    :param test_option: Passed through to ``bg_start``.
    :param test_duration: Duration of the test; clients are waited for
                          until 1.5 times this duration after the call.
    :param netperf_para_sess: Passed through to ``bg_start``.
    :param netperf_cmd_prefix: Passed through to ``bg_start``.
    """
    deadline = time.time() + test_duration * 1.5
    logging.info("server_ips = %s", server_ips)
    for server_ip in server_ips:
        for client in netperf_clients:
            client.bg_start(server_ip, test_option,
                            netperf_para_sess, netperf_cmd_prefix)
            started = utils_misc.wait_for(client.is_netperf_running,
                                          30, 0, 1,
                                          "Wait netperf test start")
            if not started:
                test.error("Can not start netperf client.")
            else:
                logging.info("Netperf test start successfully.")

    for client in netperf_clients:
        if not client.is_netperf_running():
            continue
        # Only wait for whatever is left of the overall time budget.
        remaining = deadline - time.time()

        def _finished(running_client=client):
            return not running_client.is_netperf_running()

        utils_misc.wait_for(_finished, remaining, 0, 5,
                            "Wait netperf test finish %ss" % remaining)
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:26,代码来源:multi_nics_stress.py
示例17: guest_netwok_connecting_check
def guest_netwok_connecting_check(guest_ip, link_up, change_queues=False):
    """
    Ping the guest and check the loss ratio against the link state.

    :param guest_ip: Address to ping.
    :param link_up: Whether the guest link is expected to be up.
    :param change_queues: When true, run ``change_queues_number_repeatly``
                          in a background thread while pinging.
    """
    queue_worker = None
    if change_queues:
        env["run_change_queues"] = False
        queue_worker = utils_misc.InterruptedThread(
            change_queues_number_repeatly, (guest_ifname,))
        queue_worker.start()
        # Block until env["run_change_queues"] becomes true.
        utils_misc.wait_for(lambda: env["run_change_queues"], 30, 0, 2,
                            "wait queues change start")
    # NOTE(review): the short sleep is assumed to be unconditional; the
    # original indentation was lost -- confirm against upstream.
    time.sleep(0.5)
    ping_result = utils_test.ping(guest_ip, 10, interface=host_interface,
                                  timeout=20, session=None)
    loss_ratio = utils_test.get_loss_ratio(ping_result[1])
    if link_up:
        # Link up: fail above 20% loss.
        if loss_ratio > 20:
            test.fail("All packets lost during ping guest ip after link up")
    elif loss_ratio < 80:
        # Link down: fail below 80% loss.
        test.fail("guest network still connecting after down the link")
    if change_queues:
        # Clear the flag, then wait for the worker thread to finish.
        env["run_change_queues"] = False
        queue_worker.join()
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:25,代码来源:set_link.py
示例18: do_fstrim
def do_fstrim(fstrim_type, vm, status_error=False):
    """
    Execute fstrim in different ways, and check its result.

    :param fstrim_type: How to trim; only "fstrim_cmd" is handled in the
                        code visible here.
    :param vm: VM object to log into.
    :param status_error: When true, an "Operation not supported" answer
                         is the expected outcome.
    :raise error.TestFail: if fstrim is unexpectedly unsupported, its
                           output cannot be parsed, or 0 bytes are trimmed.
    """
    # ensure that volume capacity won't change before trimming disk.
    if not _sync_finished():
        utils_misc.wait_for(_sync_finished, timeout=300)
    if fstrim_type == "fstrim_cmd":
        session = vm.wait_for_login()
        try:
            output = session.cmd_output("fstrim -v /mnt", timeout=240)
        finally:
            # Close the session even if the command raises.
            session.close()
        logging.debug(output)
        if re.search("Operation not supported", output):
            if status_error:
                # virtio is not supported in unmap operations
                logging.debug("Expected failure: virtio do not support fstrim")
                return
            raise error.TestFail("Not supported fstrim on supported "
                                 "envrionment.Bug?")
        try:
            # Raw string: same pattern, without invalid escape sequences.
            trimmed_bytes = re.search(r"\d+\sbytes",
                                      output).group(0).split()[0]
            trimmed = int(trimmed_bytes)
            logging.debug("Trimmed size is:%s bytes", trimmed)
        except (AttributeError, IndexError) as detail:
            # AttributeError: the pattern did not match (search gave None).
            raise error.TestFail("Do fstrim failed:%s" % detail)
        if trimmed == 0:
            raise error.TestFail("Trimmed size is 0.")
开发者ID:noxdafox,项目名称:tp-libvirt,代码行数:29,代码来源:storage_discard.py
示例19: check_guest_flags
def check_guest_flags(bash_cmd, flags):
    """
    Check bypass_cache option for single guest.

    Stops the libvirt-guests service while checking the flags, verifies
    the service status output and exit code, waits for the VM to shut
    off, then starts the service again while checking the flags.

    :param bash_cmd: Command template taking four values.
    :param flags: Flags passed to ``check_flags_parallel``.
    :raise error.TestFail: if the suspend message is missing or the
                           status exit code is not 3.
    """
    # Drop caches.
    drop_caches()
    stop_check = bash_cmd % (managed_save_file, managed_save_file,
                             "1", flags)
    check_flags_parallel("service libvirt-guests stop", stop_check, flags)
    status = utils.run("service libvirt-guests status",
                       ignore_status=True)
    logging.info("status output: %s", status.stdout)
    suspend_seen = re.search(r"Suspending %s" % vm_name,
                             status.stdout, re.M)
    if not suspend_seen:
        raise error.TestFail("Can't see messages of suspending vm")
    # status command should return 3.
    if status.exit_status != 3:
        raise error.TestFail("The exit code %s for libvirt-guests"
                             " status is not correct" % status)

    # Wait for VM in shut off state
    def _vm_shut_off():
        return vm.state() == "shut off"

    utils_misc.wait_for(_vm_shut_off, 10)
    start_check = bash_cmd % (managed_save_file, managed_save_file,
                              "0", flags)
    check_flags_parallel("service libvirt-guests start", start_check, flags)
开发者ID:Antique,项目名称:tp-libvirt,代码行数:27,代码来源:virsh_managedsave.py
示例20: add_device
def add_device(pci_num):
    """
    Hot-plug an sr-iov device and verify that the guest detects it.

    ``vm``, ``session``, ``params``, ``cmd_type``, ``match_string``,
    ``local_functions``, ``test_timeout`` and ``pci_del`` come from the
    enclosing scope.

    :param pci_num: Index of the device; passed to the add function and
                    to ``pci_del`` on failure.
    :raise error.TestError: if no add function is registered for the
                            command type.
    :raise error.TestFail: if the device does not show up or its test
                           command fails.
    """
    reference_cmd = params["reference_cmd"]
    find_pci_cmd = params["find_pci_cmd"]
    info_pci_ref = vm.monitor.info("pci")
    reference = session.cmd_output(reference_cmd)
    try:
        # get function for adding device.
        add_function = local_functions["%s_iov" % cmd_type]
    except Exception:
        raise error.TestError(
            "No function for adding sr-iov dev with '%s'" %
            cmd_type)
    after_add = None
    if add_function:
        # Do add pci device.
        after_add = add_function(pci_num)
    try:
        # Define a helper function to compare the output
        def _new_shown():
            return session.cmd_output(reference_cmd) != reference

        # Define a helper function to catch PCI device string
        def _find_pci():
            found = re.search(match_string,
                              session.cmd_output(find_pci_cmd),
                              re.IGNORECASE)
            return bool(found)

        error.context("Start checking new added device")
        # Compare the output of 'info pci'
        if after_add == info_pci_ref:
            raise error.TestFail("No new PCI device shown after executing "
                                 "monitor command: 'info pci'")
        secs = int(params["wait_secs_for_hook_up"])
        if not utils_misc.wait_for(_new_shown, test_timeout, secs, 3):
            raise error.TestFail("No new device shown in output of command "
                                 "executed inside the guest: %s" %
                                 reference_cmd)
        if not utils_misc.wait_for(_find_pci, test_timeout, 3, 3):
            raise error.TestFail("New add sr-iov device not found in guest. "
                                 "Command was: %s" % find_pci_cmd)
        # Test the newly added device
        try:
            session.cmd(params["pci_test_cmd"] % (pci_num + 1))
        except aexpect.ShellError as e:
            # "as" form is valid on Python 2.6+ and 3.
            raise error.TestFail("Check for sr-iov device failed after PCI "
                                 "hotplug. Output: %r" % e.output)
    except Exception:
        # Remove the device again before propagating the failure.
        pci_del(pci_num, ignore_failure=True)
        raise
开发者ID:FT4VT,项目名称:FT4VM-L1_test,代码行数:58,代码来源:sr_iov_hotplug.py
注：本文中的virttest.utils_misc.wait_for函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论