本文整理汇总了Python中virttest.error_context.context函数的典型用法代码示例。如果您正苦于以下问题:Python context函数的具体用法?Python context怎么用?Python context使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了context函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: query_status
def query_status(self):
    """
    Query the running block mirroring job and fail the test if none is found.

    The step is announced through ``error_context.context``; when
    ``self.get_status()`` returns a falsy value the test is failed through
    ``self.test.fail``.
    """
    error_context.context("query job status", logging.info)
    job_status = self.get_status()
    if job_status:
        return
    self.test.fail("No active job")
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:7,代码来源:drive_mirror_simple.py
示例2: _setup_hugepage
def _setup_hugepage(params):
    """
    Configure hugepages on the host.

    1. Read the hugepage size of the host.
    2. Work out the number of pages to assign (requested total size /
       hugepage size).
    3. Assign that many hugepages (node argument 0).
    4. Mount the hugepage filesystem.

    :param params: test parameters; ``total_hugepage_size`` is read and is
                   treated as a size in MB by the arithmetic and log text.
    """
    total_size = params['total_hugepage_size']
    hp_config = test_setup.HugePageConfig(params)
    assign_msg = 'Assign %sMB hugepages in host.' % total_size
    error_context.context(assign_msg, logging.info)

    page_size = hp_config.get_hugepage_size()
    logging.debug('Hugepage size is %skB in host.' % page_size)

    # The requested size is scaled by 1024 before dividing by the page size.
    requested_kb = int(total_size) * 1024
    hp_config.target_hugepages = int(requested_kb // page_size)
    logging.debug('Set hugepages to %d pages in host.'
                  % hp_config.target_hugepages)
    hp_config.set_node_num_huge_pages(hp_config.target_hugepages,
                                      0, page_size)

    mount_msg = 'mount hugepages to %s' % hp_config.hugepage_path
    error_context.context(mount_msg, logging.info)
    hp_config.mount_hugepage_fs()
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:25,代码来源:slof_hugepage.py
示例3: balloon_memory
def balloon_memory(session, device_path):
    """
    Run the tagged balloon tests in a loop, checking memory after each one.

    The loop count comes from the ``repeat_times`` parameter (default 5).
    The function returns early as soon as ``run_ballooning_test`` reports a
    truthy result for a tag.

    :param session: VM session (not referenced in this function body).
    :param device_path: balloon polling path handed to ``qom_get``.
    """
    remaining = int(params.get("repeat_times", 5))
    logging.info("repeat times: %d" % remaining)
    win_balloon = BallooningTestWin(test, params, env)
    while remaining:
        for tag in params.objects('test_tags'):
            error_context.context("Running %s test" % tag, logging.info)
            balloon_type = params.object_params(tag)['balloon_type']
            lower, upper = win_balloon.get_memory_boundary(balloon_type)
            # Pick a random target between the reported boundaries.
            target_mem = int(random.uniform(lower, upper))
            stop_now = win_balloon.run_ballooning_test(target_mem, tag)
            polling_output = vm.monitor.qom_get(device_path,
                                                get_balloon_property)
            memory_check(vm, polling_output, 'stat-free-memory')
            if stop_now:
                return
        win_balloon.reset_memory()
        remaining -= 1
开发者ID:dagrh,项目名称:tp-qemu,代码行数:27,代码来源:balloon_service.py
示例4: run_test
def run_test(qemu_src_dir):
    """
    Run the QEMU I/O test suite from the qemu source tree.

    The qemu binaries are exported through environment variables, the
    working directory is changed to the iotests directory and ``./check``
    is launched there.

    :param qemu_src_dir: path of qemu source code
    :raise exceptions.TestFail: when ``iotests_result_pattern`` matches the
                                output of ``./check``; the output is saved
                                to a log file in the test debug dir first.
    """
    iotests_root = params.get("iotests_root", "tests/qemu-iotests")
    extra_options = params.get("qemu_io_extra_options", "")
    image_format = params.get("qemu_io_image_format")
    result_pattern = params.get("iotests_result_pattern")
    error_context.context("running qemu-iotests for image format %s"
                          % image_format, logging.info)

    environ = os.environ
    environ["QEMU_PROG"] = utils_misc.get_qemu_binary(params)
    environ["QEMU_IMG_PROG"] = utils_misc.get_qemu_img_binary(params)
    environ["QEMU_IO_PROG"] = utils_misc.get_qemu_io_binary(params)
    environ["QEMU_NBD_PROG"] = utils_misc.get_binary('qemu-nbd', params)
    os.chdir(os.path.join(qemu_src_dir, iotests_root))

    # Assemble "./check [extra options] -<format>".
    cmd_parts = ['./check']
    if extra_options:
        cmd_parts.append("%s" % extra_options)
    cmd_parts.append("-%s" % image_format)
    output = process.system_output(" ".join(cmd_parts),
                                   ignore_status=True, shell=True)

    match = re.search(result_pattern, output, re.I | re.M)
    if not match:
        return

    # A match means failures were reported: keep the output, then fail.
    log_name = "qemu_iotests_%s.log" % image_format
    log_path = utils_misc.get_path(test.debugdir, log_name)
    with open(log_path, 'w+') as log:
        log.write(output)
        log.flush()
    msg = "Total test %s cases, %s failed"
    raise exceptions.TestFail(msg % (match.group(2), match.group(1)))
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:31,代码来源:rh_qemu_iotests.py
示例5: start_suspend
def start_suspend(self, **args):
    """
    Suspend the guest through the qemu guest agent.

    Nothing is sent when ``self.guest_agent`` is falsy.

    :param args: extra keyword arguments; accepted but not used here.
    """
    error_context.context("Suspend guest via guest agent", logging.info)
    if not self.guest_agent:
        return
    self.guest_agent.suspend(self.suspend_mode)
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:7,代码来源:qemu_guest_agent_suspend.py
示例6: start_stress
def start_stress(session):
    """
    Start the configured stress load and wait until it is running.

    ``stress_type`` selects the load: ``"none"`` returns immediately,
    ``"netperf"`` runs the ``run_bgstress`` sub test in a background
    thread, ``"io"`` installs the stress app and launches ``start_cmd`` in
    the guest.

    :param session: guest session used to launch and poll the stress app.
    """
    error_context.context("Load stress in guest", logging.info)
    stress_type = params.get("stress_type", "none")
    if stress_type == "none":
        return

    if stress_type == "netperf":
        sub_test = params.get("run_bgstress")
        stress_thread = utils_misc.InterruptedThread(
            utils_test.run_virt_sub_test,
            (test, params, env),
            {"sub_type": sub_test})
        stress_thread.start()
    elif stress_type == "io":
        install_stress_app(session)
        cmd = params.get("start_cmd")
        logging.info("Launch stress app in guest with command: '%s'" % cmd)
        session.sendline(cmd)

    # Poll every 5 seconds, for up to 150 seconds, until stress is seen.
    is_running = utils_misc.wait_for(lambda: stress_running(session),
                                     timeout=150, step=5)
    if not is_running:
        test.error("Stress isn't running")
    logging.info("Stress running now")
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:32,代码来源:kdump_with_stress.py
示例7: create_gluster_vol
def create_gluster_vol(params):
    """
    Make sure the gluster volume named in ``params`` exists.

    The volume is created when it is not available yet, or when
    ``force_recreate_gluster`` is ``"yes"``.

    :param params: test parameters; reads ``gluster_volume_name``,
                   ``force_recreate_gluster``, ``gluster_brick`` and
                   ``images_base_dir``.
    :return: the result of ``gluster_vol_create`` when the volume is
             (re)created, otherwise True.
    :raise RuntimeError: when the host has no usable hostname and no UP
                         interface reports an IPv4 address.
    """
    vol_name = params.get("gluster_volume_name")
    force = params.get('force_recreate_gluster') == "yes"

    brick_path = params.get("gluster_brick")
    if not os.path.isabs(brick_path):  # do nothing when path is absolute
        base_dir = params.get("images_base_dir", data_dir.get_data_dir())
        brick_path = os.path.join(base_dir, brick_path)

    error_context.context("Host name lookup failed")
    hostname = socket.gethostname()
    if not hostname or hostname == "(none)":
        # No usable hostname: fall back to the first IPv4 address found on
        # an interface that is UP.
        ip_addr = None
        for ifname in utils_net.get_net_if(state="UP"):
            ipv4_value = utils_net.get_net_if_addrs(ifname)["ipv4"]
            logging.debug("ipv4_value is %s", ipv4_value)
            if ipv4_value:
                ip_addr = ipv4_value[0]
                break
        if ip_addr is None:
            # Previously this path hit an unbound local variable.
            raise RuntimeError("Host has no usable hostname and no UP "
                               "interface with an IPv4 address")
        hostname = ip_addr

    # Start the gluster daemon, if not started
    glusterd_start()
    # Check whether the volume is already present, if not create one.
    if not is_gluster_vol_avail(vol_name) or force:
        return gluster_vol_create(vol_name, hostname, brick_path, force)
    return True
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:28,代码来源:gluster.py
示例8: cleanup
def cleanup(self, force_stop=False):
    """
    Tear down the test NFS share: unmount it, then unexport it.

    :param force_stop: when true, ``self.stop_service()`` is called after
                       the share has been unmounted and unexported.
    """
    error_context.context("Cleaning up test NFS share", logging.info)
    umount_cmd = "umount -l -f %s" % self.mnt_dir
    process.run(umount_cmd, shell=True)
    unexport_cmd = "exportfs -u %s:%s" % (self.nfs_ip, self.nfs_dir)
    process.run(unexport_cmd, shell=True)
    if force_stop:
        self.stop_service()
开发者ID:pezhang,项目名称:tp-qemu,代码行数:7,代码来源:nfs_corrupt.py
示例9: _get_service_cmds
def _get_service_cmds(self):
    """
    Work out the commands used to control the NFS service.

    ``systemctl`` is preferred when it can be found; otherwise the
    ``service`` command is used.

    :return: list ``[start_cmd, stop_cmd, restart_cmd, status_cmd]``.
    :raise NFSCorruptError: when systemctl is present but neither the NFS
                            init script nor the nfs-server unit file
                            exists.
    """
    error_context.context("Finding out commands to handle NFS service",
                          logging.info)
    service = utils_path.find_command("service")
    try:
        systemctl = utils_path.find_command("systemctl")
    except ValueError:
        systemctl = None

    if systemctl is None:
        # No systemctl found: drive NFS through the service command.
        return ["%s nfs start" % service,
                "%s nfs stop" % service,
                "%s nfs restart" % service,
                "%s nfs status" % service]

    init_script = "/etc/init.d/nfs"
    service_file = "/lib/systemd/system/nfs-server.service"
    if os.path.isfile(init_script):
        service_name = "nfs"
    elif os.path.isfile(service_file):
        service_name = "nfs-server"
    else:
        raise NFSCorruptError("Files %s and %s absent, don't know "
                              "how to set up NFS for this host" %
                              (init_script, service_file))

    unit_args = (systemctl, service_name)
    return ["%s start %s.service" % unit_args,
            "%s stop %s.service" % unit_args,
            "%s restart %s.service" % unit_args,
            "%s status %s.service" % unit_args]
开发者ID:pezhang,项目名称:tp-qemu,代码行数:34,代码来源:nfs_corrupt.py
示例10: setup_win_driver_verifier
def setup_win_driver_verifier(driver, vm, timeout=300):
    """
    Enable driver verifier for windows guest.

    When the verifier is not enabled yet for ``driver``, it is set up with
    ``verifier /standard``, the guest is rebooted and the status is checked
    again.

    :param driver: The driver which needs enable the verifier.
    :param vm: VM object.
    :param timeout: Timeout in seconds.
    :raise exceptions.TestFail: when the verifier is still not enabled
                                after the setup command and reboot.
    """
    session = vm.wait_for_login(timeout=timeout)
    try:
        # _check_driver_verifier() is unpacked further down as
        # (status, output); take the status element here as well, so the
        # decision is not based on the command output text.
        verifier_status = _check_driver_verifier(session, driver)[0]
        if not verifier_status:
            error_context.context("Enable %s driver verifier" % driver,
                                  logging.info)
            verifier_setup_cmd = "verifier /standard /driver %s.sys" % driver
            session.cmd(verifier_setup_cmd,
                        timeout=timeout,
                        ignore_all_errors=True)
            # The session is replaced by the one returned from the reboot.
            session = vm.reboot(session)
            verifier_status, output = _check_driver_verifier(session, driver)
            if not verifier_status:
                msg = "%s verifier is not enabled, details: %s" % (driver,
                                                                   output)
                raise exceptions.TestFail(msg)
        logging.info("%s verifier is enabled already" % driver)
    finally:
        session.close()
开发者ID:ngu-niny,项目名称:avocado-vt,代码行数:27,代码来源:__init__.py
示例11: check_memory
def check_memory(self, vm=None):
    """
    Check that the memory seen by the guest matches the memory assigned
    to the VM.

    The two sizes may differ by at most ``threshold`` (default 0.10) times
    the assigned size.

    :param vm: VM object, get VM object from env if vm is None.
    :raise exceptions.TestFail: when the difference exceeds the threshold.
    """
    error_context.context("Verify memory info", logging.info)
    if not vm:
        vm = self.env.get_vm(self.params["main_vm"])
    vm.verify_alive()
    threshold = float(self.params.get("threshold", 0.10))
    timeout = float(self.params.get("wait_resume_timeout", 60))
    # Notes:
    #    some sub test will pause VM, here need to wait VM resume
    # then check memory info in guest.
    utils_misc.wait_for(lambda: not vm.is_paused(), timeout=timeout)
    utils_misc.verify_host_dmesg()
    self.os_type = self.params.get("os_type")
    guest_mem_size = super(MemoryHotplugTest, self).get_guest_total_mem(vm)
    vm_mem_size = self.get_vm_mem(vm)
    if abs(guest_mem_size - vm_mem_size) > vm_mem_size * threshold:
        # A space separates the two literals so the VM name and "but" do
        # not run together in the failure message.
        msg = ("Assigned '%s MB' memory to '%s' "
               "but, '%s MB' memory detect by OS" %
               (vm_mem_size, vm.name, guest_mem_size))
        raise exceptions.TestFail(msg)
开发者ID:ngu-niny,项目名称:avocado-vt,代码行数:25,代码来源:__init__.py
示例12: run_subtest
def run_subtest(sub_test):
    """
    Announce and run the given sub test (e.g. rng_bat, reboot, shutdown).

    :param sub_test: subtest name
    """
    step_desc = "Run %s subtest" % sub_test
    error_context.context(step_desc)
    utils_test.run_virt_sub_test(test, params, env, sub_test)
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:7,代码来源:rng_hotplug.py
示例13: virtio_serial_login
def virtio_serial_login(self, port='vs1'):
    """
    Try to log in to the guest through a virtio serial port.

    A shell session is opened on the port's host socket file with
    ``nc -U`` and the login prompts are driven with the configured
    credentials. The test is reported as failed when the prompts are
    handled without a ``LoginTimeoutError``; a login timeout is only
    logged.

    :param port: name of the virtio serial port to use.
    :return: the session opened on the port.
    """
    error_context.context("Try to login guest via '%s'" % port,
                          logging.info)
    username = self.params.get("username")
    password = self.params.get("password")
    prompt = self.params.get("shell_prompt", "[#$]")
    # NOTE(review): eval() is applied to a configuration value to expand
    # escape sequences; this is only safe while the parameters come from a
    # trusted source.
    linesep = eval("'%s'" % self.params.get("shell_linesep", r"\n"))

    # Look the port up by name. The None default also covers an empty port
    # list, which previously left ``vport`` unbound.
    vport = next((candidate
                  for candidate in self.get_virtio_ports(self.vm)[1]
                  if candidate.name == port), None)
    if not vport:
        self.test.error("Not virtio serial port '%s' found" % port)

    logfile = "serial-%s-%s.log" % (vport.name, self.vm.name)
    socat_cmd = "nc -U %s" % vport.hostfile
    session = aexpect.ShellSession(socat_cmd, auto_close=False,
                                   output_func=utils_misc.log_line,
                                   output_params=(logfile,),
                                   prompt=prompt)
    session.set_linesep(linesep)
    session.sendline()
    self.__sessions__.append(session)
    try:
        remote.handle_prompts(session, username, password, prompt, 180)
        self.test.fail("virtio serial '%s' should no " % port +
                       "channel to login")
    except remote.LoginTimeoutError:
        # NOTE(review): the session was already recorded above, so it ends
        # up in the list twice on this path; kept as in the original.
        self.__sessions__.append(session)
        logging.info("Can't login via %s" % port)
    return session
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:31,代码来源:virtio_port_login.py
示例14: pre_step
def pre_step(self):
    """
    Run the configured pre command in the guest, then reboot it.

    The command is the concatenation of the ``pre_cmd`` and
    ``pre_cmd_extra`` parameters. The session returned by the reboot is
    appended to ``self.__sessions__``.
    """
    error_context.context("Config guest and reboot it", logging.info)
    base_cmd = self.params.get("pre_cmd")
    extra_cmd = self.params.get("pre_cmd_extra")
    setup_cmd = base_cmd + extra_cmd
    login_session = self.vm.wait_for_login(timeout=360)
    login_session.cmd(setup_cmd, timeout=240)
    rebooted_session = self.vm.reboot(session=login_session, timeout=900,
                                      serial=False)
    self.__sessions__.append(rebooted_session)
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:7,代码来源:virtio_port_login.py
示例15: memory_check
def memory_check(self, step, ballooned_mem):
    """
    Compare the current memory status with the expected values.

    :param step: the check point string
    :type step: string
    :param ballooned_mem: ballooned memory in current step
    :type ballooned_mem: int
    :return: memory size get from monitor and guest
    :rtype: tuple
    :raise exceptions.TestFail: when the monitor or guest value does not
                                match ``ballooned_mem``.
    """
    error_context.context("Check memory status %s" % step, logging.info)
    mmem = self.get_ballooned_memory()
    gmem = self.get_memory_status()
    threshold = int(self.params.get("guest_compare_threshold", 100))

    # Windows illegal-value test: with the guest balloon set in (1, 100),
    # free memory is expected to be below 150M.
    if ballooned_mem >= self.ori_mem - 100:
        login_timeout = float(self.params.get("login_timeout", 600))
        session = self.vm.wait_for_login(timeout=login_timeout)
        try:
            free_mem = self.get_win_mon_free_mem(session)
            if free_mem > 150:
                self.test.fail("Balloon_min test failed %s" % step)
        finally:
            session.close()
    else:
        guest_delta = abs(gmem - self.ori_gmem)
        monitor_mismatch = abs(mmem - self.ori_mem) != ballooned_mem
        if monitor_mismatch or abs(guest_delta - ballooned_mem) > threshold:
            self.error_report(step, self.ori_mem - ballooned_mem, mmem, gmem)
            raise exceptions.TestFail("Balloon test failed %s" % step)
    return (mmem, gmem)
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:31,代码来源:balloon_check.py
示例16: run
def run(test, params, env):
    """
    monitor_cmds_check test:
    1). boot up the vm with human and qmp monitor
    2). check that the commands in the black list are unavailable in the
        monitor

    :param test: Qemu test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.

    Notes:
        Please run this test with qemu/control.kernel-version to ensure it
        only runs when the required package is installed;
    """
    def is_supported(cmd):
        # The monitor raises when the command is not supported.
        try:
            vm.monitor.verify_supported_cmd(cmd)
        except qemu_monitor.MonitorNotSupportedCmdError:
            return False
        return True

    vm = env.get_vm(params["main_vm"])
    vm.verify_alive()
    protocol = vm.monitor.protocol

    black_cmds = params.get("black_cmds", "").split()
    error_context.context("Verify black commands are unavaliable in "
                          "'%s' monitor" % protocol, logging.info)
    logging.info("Black commands: %s" % black_cmds)

    still_supported = list(filter(is_supported, black_cmds))
    if still_supported:
        test.fail("Unexpected commands %s found in %s monitor"
                  % (still_supported, protocol))
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:34,代码来源:monitor_cmds_check.py
示例17: check_interrupt
def check_interrupt(session, vectors):
    """
    Check the interrupt listing in the guest against the vector count.

    The ``irq_check_cmd`` output is searched for a pattern that depends on
    ``vectors``: an interrupt controller entry for 0 or 1, a virtqueues
    entry for 2 to 8, input and output entries for 9 or 10. Other values
    are not checked.

    :param session: guest session used to run ``irq_check_cmd``.
    :param vectors: number of vectors; converted with ``int()``.
    """
    def msix_failure_msg():
        # Shared failure text for the two MSI-X branches.
        text = "Could not find the virtio device for MSI-X interrupt"
        text += " when vectors = %d " % vectors
        text += "Command %s got output %s" % (irq_check_cmd, output)
        return text

    error_context.context("Check the cpu interrupt of virito",
                          logging.info)
    vectors = int(vectors)
    irq_check_cmd = params["irq_check_cmd"]
    output = session.cmd_output(irq_check_cmd).strip()

    if vectors in (0, 1):
        if not re.search("IO-APIC.*fasteoi|XICS.*Level|XIVE.*Level", output):
            msg = "Could not find interrupt controller for virito device"
            msg += " when vectors = %d" % vectors
            test.fail(msg)
    elif 2 <= vectors <= 8:
        if not re.search("virtio[0-9]-virtqueues", output):
            test.fail(msix_failure_msg())
    elif vectors in (9, 10):
        has_input = re.search("virtio[0-9]-input", output)
        if not (has_input and re.search("virtio[0-9]-output", output)):
            test.fail(msix_failure_msg())
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:25,代码来源:boot_with_different_vectors.py
示例18: lv_take_snapshot
def lv_take_snapshot(vg_name, lv_name,
                     lv_snapshot_name, lv_snapshot_size):
    """
    Take a snapshot of the original logical volume.

    When ``lvcreate`` fails because the snapshot name already exists and
    ``lvdisplay`` still lists it as active, the origin volume is
    reactivated and the command is run once more.

    :param vg_name: name of the volume group.
    :param lv_name: name of the origin logical volume.
    :param lv_snapshot_name: name of the snapshot to create.
    :param lv_snapshot_size: value passed to ``lvcreate --size``.
    :raise exceptions.TestError: when the volume group or the origin is
                                 missing, or the snapshot already exists.
    """
    error_context.context("Taking snapshot from original logical volume",
                          logging.info)

    if not vg_check(vg_name):
        raise exceptions.TestError("Volume group could not be found")
    if lv_check(vg_name, lv_snapshot_name):
        raise exceptions.TestError("Snapshot already exists")
    if not lv_check(vg_name, lv_name):
        raise exceptions.TestError("Snapshot's origin could not be found")

    cmd = "".join(["lvcreate --size ", lv_snapshot_size, " --snapshot ",
                   " --name ", lv_snapshot_name,
                   " /dev/", vg_name, "/", lv_name])
    try:
        result = process.run(cmd)
    except process.CmdError as ex:
        # The two checks below detect if merge of snapshot was postponed;
        # any other failure is passed on unchanged.
        exists_msg = ('Logical volume "%s" already exists in volume group "%s"'
                      % (lv_snapshot_name, vg_name))
        if exists_msg not in results_stderr_52lts(ex.result):
            raise ex
        active_pattern = re.escape(lv_snapshot_name + " [active]")
        lvdisplay_out = results_stdout_52lts(process.run("lvdisplay"))
        if not re.search(active_pattern, lvdisplay_out):
            raise ex
        logging.warning("Logical volume %s is still active! "
                        "Attempting to deactivate...", lv_name)
        lv_reactivate(vg_name, lv_name)
        result = process.run(cmd)
    logging.info(results_stdout_52lts(result).rstrip())
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:32,代码来源:lv_utils.py
示例19: ethtool_set
def ethtool_set(session, status):
    """
    Set ethernet device offload status and verify that it was applied.

    :param session: guest session used to run ethtool.
    :param status: New status will be changed to; a mapping of offload
                   option to value.
    :return: True when the status reported afterwards equals ``status``,
             False when the command fails or the status differs.
    """
    step_desc = ("Set offload status for device "
                 + "'%s': %s" % (ethname, str(status)))
    error_context.context(step_desc, logging.info)

    options = " ".join(opt + ' ' + val for opt, val in status.items())
    cmd = "ethtool -K %s " % ethname + options
    err_msg = "Failed to set offload status for device '%s'" % ethname
    try:
        session.cmd_output_safe(cmd)
    except aexpect.ShellCmdError as e:
        logging.error("%s, detail: %s", err_msg, e)
        return False

    # Compare only the options that were requested.
    curr_status = {key: value
                   for key, value in ethtool_get(session).items()
                   if key in status}
    if curr_status == status:
        return True
    logging.error("%s, got: '%s', expect: '%s'", err_msg,
                  str(curr_status), str(status))
    return False
开发者ID:ldoktor,项目名称:tp-qemu,代码行数:27,代码来源:ethtool.py
示例20: fsthaw
def fsthaw(self, check_status=True):
    """
    Thaw File system on guest.

    :param check_status: Force this function to check the fsfreeze status
                         before/after sending cmd.
    :return: Thaw FS number if cmd succeed, -1 if guest agent doesn't
             support fsfreeze cmd.
    """
    error_context.context("thaw all FS in guest '%s'" % self.vm.name)
    if check_status:
        self.verify_fsfreeze_status(self.FSFREEZE_STATUS_FROZEN)

    cmd = "guest-fsfreeze-thaw"
    if not self.check_has_command(cmd):
        return -1

    thawed = self.cmd(cmd=cmd)
    if not check_status:
        return thawed

    try:
        self.verify_fsfreeze_status(self.FSFREEZE_STATUS_THAWED)
    # pylint: disable=E0712
    except VAgentFreezeStatusError:
        # The status is not as expected: send the thaw command once more
        # to reset fsfreeze status to 'thawed', then let the error
        # propagate.
        self.cmd(cmd=cmd)
        raise
    return thawed
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:27,代码来源:guest_agent.py
注:本文中的virttest.error_context.context函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论