def reset_env(vm_name, xml_file):
"""
Reset env
:param vm_name: the vm name
:xml_file: domain xml file
"""
virsh.destroy(vm_name)
virsh.undefine(vm_name)
virsh.define(xml_file)
if os.path.exists(xml_file):
os.remove(xml_file)
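# A usage sketch (not from the original test): reset_env() pairs with a
# backup taken before the test mutates the domain, e.g.:
#     xml_backup = vm.backup_xml()
#     try:
#         ...  # undefine/define experiments
#     finally:
#         reset_env(vm_name, xml_backup)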
def restore(self, dom):
    """
    Restore a domain from a saved-state dict (see the key layout noted below).
    """
    name = dom['name']
doms = self.current_state
if name in doms:
self.remove(doms[name])
    domfile = tempfile.NamedTemporaryFile(mode='w+', delete=False)
fname = domfile.name
    domfile.write(dom['inactive xml'])
domfile.close()
try:
if dom['persistent'] == 'yes':
res = virsh.define(fname)
if res.exit_status:
raise Exception(str(res))
if dom['state'] != 'shut off':
res = virsh.start(name)
if res.exit_status:
raise Exception(str(res))
else:
res = virsh.create(fname)
if res.exit_status:
raise Exception(str(res))
finally:
os.remove(fname)
if dom['autostart'] == 'enable':
res = virsh.autostart(name, '')
if res.exit_status:
raise Exception(str(res))
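# Inferred from the accesses above, the `dom` dict passed to restore() looks
# roughly like this (illustrative values):
#     {'name': 'vm1', 'state': 'running', 'persistent': 'yes',
#      'autostart': 'enable', 'inactive xml': '<domain>...</domain>'}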
def enable_normal_boot(vmxml, check_points, define_error, test):
"""
Undefine/Define VM and check the result
:param vmxml: The instance of VMXML class
    :param check_points: expected failure patterns to check in the result
    :param define_error: whether virsh define is expected to fail
:param test: Avocado test object
"""
logging.debug("Boot guest in normal mode:\n%s",
open(vmxml.xml).read())
vmxml.undefine(options="--nvram")
ret = virsh.define(vmxml.xml)
if ret.exit_status:
if define_error:
utlv.check_result(ret, expected_fails=check_points)
else:
test.fail("Failed to define VM from %s" % vmxml.xml)
def run_virsh_autostart(test, params, env):
"""
Test command: virsh autostart
    Set (or disable) autostart for a domain
"""
vm_name = params.get("main_vm")
vm = env.get_vm(vm_name)
persistent_vm = "yes" == params.get("persistent_vm", "yes")
readonly_mode = "yes" == params.get("readonly_mode", "no")
autostart_vm = "yes" == params.get("autostart_vm", "no")
autostart_extra = params.get("autostart_extra", "")
status_error = "yes" == params.get("status_error", "no")
# Prepare transient/persistent vm
original_xml = vm.backup_xml()
if not persistent_vm and vm.is_persistent():
vm.undefine()
elif persistent_vm and not vm.is_persistent():
vm.define(original_xml)
original_autost = vm.is_autostart()
logging.debug("Original VM %s autostart: %s", vm_name, original_autost)
options = " "
if not autostart_vm:
options = "--disable "
if autostart_extra:
options += autostart_extra
    # Readonly mode
    ro_flag = readonly_mode
# Result check
def autostart_check():
"""
        Check whether the VM autostarts as expected
"""
res = False
if autostart_vm and vm.is_autostart() and vm.is_alive():
logging.debug("VM autostart as expected")
res = True
if not autostart_vm and not vm.is_autostart() and vm.is_dead():
logging.debug("VM not autostart as expected")
res = True
return res
# Run test
try:
        # Make sure the VM is inactive (skip transient VMs, which would be
        # removed entirely by destroy)
if vm.is_persistent() and vm.is_alive():
vm.destroy()
cmd_result = virsh.autostart(vm_name, options, ignore_status=True, debug=True, readonly=ro_flag)
err = cmd_result.stderr.strip()
status = cmd_result.exit_status
        # Restart libvirtd; autostart takes effect on daemon restart
utils_libvirtd.libvirtd_restart()
if not status_error:
if status:
raise error.TestFail(err)
elif not autostart_check():
raise error.TestFail("Autostart check fail")
elif status_error and status == 0:
raise error.TestFail("Expect fail, but run successfully.")
finally:
# Recover env
vm.destroy()
if not vm.is_persistent():
virsh.define(original_xml)
os.remove(original_xml)
if original_autost and not vm.is_autostart():
virsh.autostart(vm_name, "")
elif not original_autost and vm.is_autostart():
virsh.autostart(vm_name, "--disable")
def run(test, params, env):
"""
Test extended TSEG on Q35 machine types
<smm state='on'>
<tseg unit='MiB'>48</tseg>
</smm>
Steps:
1) Edit VM xml for smm or tseg sub element
2) Verify if Guest can boot as expected
    3) On i440fx machine types the property is not supported.
       On Q35 machine types both SeaBIOS and OVMF guests can boot up
"""
vm_name = params.get("main_vm", "")
vm = env.get_vm(vm_name)
smm_state = params.get("smm_state", "off")
unit = params.get("tseg_unit")
size = params.get("tseg_size")
boot_type = params.get("boot_type", "")
loader_type = params.get("loader_type")
loader = params.get("loader")
err_msg = params.get("error_msg", "")
vm_arch_name = params.get("vm_arch_name", "x86_64")
status_error = ("yes" == params.get("status_error", "no"))
if not libvirt_version.version_compare(4, 5, 0):
test.cancel("TSEG does not support in "
"current libvirt version")
if (boot_type == "seabios" and
not utils_package.package_install('seabios-bin')):
test.cancel("Failed to install Seabios")
if (boot_type == 'ovmf' and
not utils_package.package_install('OVMF')):
test.cancel("Failed to install OVMF")
    # Back up the VM XML
v_xml_backup = vm_xml.VMXML.new_from_dumpxml(vm_name)
v_xml = vm_xml.VMXML.new_from_dumpxml(vm_name)
try:
# Specify boot loader for OVMF
if boot_type == 'ovmf':
os_xml = v_xml.os
os_xml.loader_type = loader_type
os_xml.loader = loader
os_xml.loader_readonly = "yes"
v_xml.os = os_xml
try:
features_xml = v_xml.features
except xcepts.LibvirtXMLNotFoundError:
if vm_arch_name == 'x86_64':
# ACPI is required for UEFI on x86_64
v_xml.xmltreefile.create_by_xpath("/features/acpi")
features_xml = v_xml.features
else:
features_xml = vm_xml.VMFeaturesXML()
features_xml.smm = smm_state
if unit and size:
features_xml.smm_tseg_unit = unit
features_xml.smm_tseg = size
v_xml.features = features_xml
logging.debug("New VM XML is:\n%s", v_xml)
ret = virsh.define(v_xml.xml)
utlv.check_result(ret, expected_fails=err_msg)
# Check result
if not status_error:
vm.start()
if unit and size:
# If tseg unit is KiB, convert it to MiB
# as vm dumpxml convert it automatically
if unit == 'KiB':
unit, size = unify_to_MiB(unit, size)
expect_line = "<tseg unit=\"%s\">%s</tseg>" % (unit, size)
utlv.check_dumpxml(vm, expect_line)
# Qemu cmdline use mbytes
unit, tseg_mbytes = unify_to_MiB(unit, size)
                expect_line = '-global mch.extended-tseg-mbytes=%s' % tseg_mbytes
utlv.check_qemu_cmd_line(expect_line)
finally:
logging.debug("Restore the VM XML")
if vm.is_alive():
vm.destroy()
        # OVMF enables nvram by default
v_xml_backup.sync(options="--nvram")
# Check disk count after VM shutdown (with --config).
check_count_after_shutdown = True
disk_count_after_shutdown = vm_xml.VMXML.get_disk_count(vm_name)
if test_cmd == "attach-disk":
if disk_count_after_shutdown == disk_count_before_cmd:
check_count_after_shutdown = False
elif test_cmd == "detach-disk":
if disk_count_after_shutdown < disk_count_before_cmd:
check_count_after_shutdown = False
# Recover VM.
if vm.is_alive():
vm.destroy(gracefully=False)
virsh.undefine(vm_name)
virsh.define(vm_xml_file)
if os.path.exists(device_source):
os.remove(device_source)
# Check results.
if status_error == 'yes':
if status == 0:
raise error.TestFail("virsh %s exit with unexpected value."
% test_cmd)
else:
if status != 0:
raise error.TestFail("virsh %s failed." % test_cmd)
if test_cmd == "attach-disk":
if not check_count_after_cmd:
raise error.TestFail("Cannot see deivce in xml file"
" after attach.")
if not utils_libvirtd.libvirtd_is_running():
raise error.TestFail("Libvirtd service is dead.")
    # Check results
try:
try:
if not status_error:
if status == 0:
check_xml(vm_name, target, dest_path, options)
if options.count("--bandwidth"):
utl.check_blockjob(vm_name, target, "bandwidth", bandwidth)
if options.count("--pivot") + options.count("--finish") == 0:
finish_job(vm_name, target, default_timeout)
if options.count("--raw"):
check_format(dest_path, "raw")
else:
raise error.TestFail(cmd_result.stderr)
else:
if status:
logging.debug("Expect error: %s", cmd_result.stderr)
else:
raise error.TestFail("Expect fail, but run successfully.")
        except JobTimeout as excpt:
if not status_error:
raise error.TestFail("Run command failed: %s" % excpt)
finally:
if vm.is_alive():
vm.destroy()
virsh.define(original_xml)
if os.path.exists(dest_path):
os.remove(dest_path)
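# JobTimeout and finish_job() are referenced above but defined elsewhere in
# this test module. A sketch of the intended polling logic (an assumption,
# not the original implementation):
import time

class JobTimeout(Exception):
    """Raised when a block job does not reach 100% within the timeout."""

def finish_job(vm_name, target, timeout):
    """Poll the block job on `target` until it completes or times out."""
    end = time.time() + timeout
    while time.time() < end:
        # utl.check_blockjob is the same helper used in the checks above
        if utl.check_blockjob(vm_name, target, "progress", "100"):
            return
        time.sleep(2)
    raise JobTimeout("block job on %s:%s timed out" % (vm_name, target))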
def run(test, params, env):
"""
Test command: virsh desc.
    This command allows one to show or modify the description or title of a
    domain.
    1). For a running domain, get/set description & title with options.
    2). For a shut off domain, get/set description & title with options.
    3). For a persistent/transient domain, get/set description & title with
        options.
"""
vm_name = params.get("main_vm")
vm = env.get_vm(vm_name)
options = params.get("desc_option", "")
persistent_vm = params.get("persistent_vm", "yes")
def run_cmd(name, options, desc_str, status_error):
"""
Run virsh desc command
:return: cmd output
"""
cmd_result = virsh.desc(name, options, desc_str, ignore_status=True, debug=True)
output = cmd_result.stdout.strip()
err = cmd_result.stderr.strip()
status = cmd_result.exit_status
if status_error == "no" and status:
raise error.TestFail(err)
elif status_error == "yes" and status == 0:
raise error.TestFail("Expect fail, but run successfully.")
return output
def vm_state_switch():
"""
Switch the vm state
"""
        if vm.is_dead():
            vm.start()
        elif vm.is_alive():
            vm.destroy()
def desc_check(name, desc_str, state_switch):
"""
Check the domain's description or title
"""
ret = False
if state_switch:
vm_state_switch()
output = run_cmd(name, "", "", "no")
if desc_str == output:
logging.debug("Domain desc check successfully.")
ret = True
else:
logging.error("Domain desc check fail.")
if state_switch:
vm_state_switch()
return ret
def run_test():
"""
Get/Set vm desc by running virsh desc command.
"""
status_error = params.get("status_error", "no")
desc_str = params.get("desc_str", "")
state_switch = False
# Test 1: get vm desc
run_cmd(vm_name, options, "", status_error)
# Test 2: set vm desc
if options.count("--config") and vm.is_persistent():
state_switch = True
if options.count("--live") and vm.state() == "shut off":
status_error = "yes"
if len(desc_str) == 0:
desc_str = "New Description/title for the %s vm" % vm.state()
logging.debug("Use the default desc message: %s", desc_str)
run_cmd(vm_name, options, desc_str, status_error)
desc_check(vm_name, desc_str, state_switch)
# Prepare transient/persistent vm
original_xml = vm.backup_xml()
if persistent_vm == "no" and vm.is_persistent():
vm.undefine()
elif persistent_vm == "yes" and not vm.is_persistent():
vm.define(original_xml)
try:
if vm.is_dead():
vm.start()
run_test()
        # Recover the vm and shut it off
if persistent_vm == "yes":
vm.define(original_xml)
vm.destroy()
run_test()
finally:
vm.destroy()
virsh.define(original_xml)
os.remove(original_xml)
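# run_cmd() above wraps virsh.desc(); a standalone call looks like this
# (a sketch using avocado-vt's virsh module, flags as in `man virsh`):
#     result = virsh.desc(vm_name, "--config --title", "My guest title",
#                         ignore_status=True, debug=True)
#     assert result.exit_status == 0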
def run(test, params, env):
"""
    Test svirt with virsh undefine/define.
    (1).Init variables for test.
    (2).Label the VM and disk with proper label.
    (3).Undefine the VM, then define it back from the dumped XML.
    (4).Check the results and restore the environment.
"""
# Get general variables.
status_error = ('yes' == params.get("status_error", 'no'))
host_sestatus = params.get(
"svirt_undefine_define_host_selinux", "enforcing")
# Get variables about seclabel for VM.
sec_type = params.get("svirt_undefine_define_vm_sec_type", "dynamic")
sec_model = params.get("svirt_undefine_define_vm_sec_model", "selinux")
sec_label = params.get("svirt_undefine_define_vm_sec_label", None)
sec_relabel = params.get("svirt_undefine_define_vm_sec_relabel", "yes")
sec_dict = {'type': sec_type, 'model': sec_model, 'label': sec_label,
'relabel': sec_relabel}
# Get variables about VM and get a VM object and VMXML instance.
vm_name = params.get("main_vm")
vm = env.get_vm(vm_name)
vmxml = VMXML.new_from_inactive_dumpxml(vm_name)
backup_xml = vmxml.copy()
    # Get variables about image.
img_label = params.get('svirt_undefine_define_disk_label')
# Label the disks of VM with img_label.
disks = vm.get_disk_devices()
backup_labels_of_disks = {}
for disk in disks.values():
disk_path = disk['source']
backup_labels_of_disks[disk_path] = utils_selinux.get_context_of_file(
filename=disk_path)
utils_selinux.set_context_of_file(filename=disk_path,
context=img_label)
# Set selinux of host.
backup_sestatus = utils_selinux.get_status()
utils_selinux.set_status(host_sestatus)
# Set the context of the VM.
vmxml.set_seclabel([sec_dict])
vmxml.sync()
try:
xml_file = (os.path.join(data_dir.get_tmp_dir(), "vmxml"))
if vm.is_alive():
vm.destroy()
virsh.dumpxml(vm.name, to_file=xml_file)
cmd_result = virsh.undefine(vm.name)
if cmd_result.exit_status:
raise error.TestFail("Failed to undefine vm."
"Detail: %s" % cmd_result)
cmd_result = virsh.define(xml_file)
if cmd_result.exit_status:
raise error.TestFail("Failed to define vm."
"Detail: %s" % cmd_result)
finally:
# clean up
for path, label in backup_labels_of_disks.items():
utils_selinux.set_context_of_file(filename=path, context=label)
backup_xml.sync()
utils_selinux.set_status(backup_sestatus)
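# The sec_dict above serializes into the domain XML roughly as (illustration
# with the default parameter values):
#     <seclabel type='dynamic' model='selinux' relabel='yes'/>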
def run(test, params, env):
"""
Test for basic serial character device function.
1) Define the VM with specified serial device and define result meets
expectation.
2) Test whether defined XML meets expectation
3) Start the guest and check if start result meets expectation
4) Test the function of started serial device
5) Shutdown the VM and check whether cleaned up properly
6) Clean up environment
"""
def set_targets(serial):
"""
        Set the serial target model and type according to the host machine type
"""
machine = platform.machine()
if "ppc" in machine:
serial.target_model = 'spapr-vty'
serial.target_type = 'spapr-vio-serial'
elif "aarch" in machine:
serial.target_model = 'pl011'
serial.target_type = 'system-serial'
else:
serial.target_model = target_type
serial.target_type = target_type
def prepare_spice_graphics_device():
"""
Prepare a spice graphics device XML according to parameters
"""
graphic = Graphics(type_name='spice')
graphic.autoport = "yes"
graphic.port = "-1"
graphic.tlsPort = "-1"
return graphic
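    # The graphics device built above serializes to roughly (illustration):
    #     <graphics type='spice' autoport='yes' port='-1' tlsPort='-1'/>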
def prepare_serial_device():
"""
Prepare a serial device XML according to parameters
"""
local_serial_type = serial_type
if serial_type == "tls":
local_serial_type = "tcp"
serial = librarian.get('serial')(local_serial_type)
serial.target_port = "0"
set_targets(serial)
sources = []
logging.debug(sources_str)
for source_str in sources_str.split():
source_dict = {}
for att in source_str.split(','):
key, val = att.split(':')
source_dict[key] = val
sources.append(source_dict)
serial.sources = sources
return serial
def prepare_console_device():
"""
        Prepare a console device XML according to parameters
"""
local_serial_type = serial_type
if serial_type == "tls":
local_serial_type = "tcp"
console = librarian.get('console')(local_serial_type)
console.target_type = console_target_type
console.target_port = console_target_port
sources = []
logging.debug(sources_str)
for source_str in sources_str.split():
source_dict = {}
for att in source_str.split(','):
key, val = att.split(':')
source_dict[key] = val
sources.append(source_dict)
console.sources = sources
return console
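    # sources_str is parsed above as whitespace-separated sources, each with
    # comma-separated key:value attributes, e.g. (hypothetical value):
    #     sources_str = "host:127.0.0.1,service:2445,mode:bind"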
def define_and_check():
"""
Predict the error message when defining and try to define the guest
with testing serial device.
"""
fail_patts = []
if serial_type in ['dev', 'file', 'pipe', 'unix'] and not any(
['path' in s for s in serial_dev.sources]):
fail_patts.append(r"Missing source path attribute for char device")
if serial_type in ['tcp'] and not any(
['host' in s for s in serial_dev.sources]):
fail_patts.append(r"Missing source host attribute for char device")
if serial_type in ['tcp', 'udp'] and not any(
['service' in s for s in serial_dev.sources]):
fail_patts.append(r"Missing source service attribute for char "
"device")
#......... (part of the code is omitted here) .........
#......... (part of the code is omitted here) .........
logging.debug(virsh.dumpxml(vm_name))
# Check if the iface with given mac address has been
# successfully detached
xml_after_detach = VMXML.new_from_dumpxml(vm_name)
iface_list_after_detach = [
iface for iface in xml_after_detach.get_devices('interface')
if iface.mac_address == mac
]
logging.debug('iface list after detach: %s', iface_list_after_detach)
if iface_list_after_detach:
test.fail('Failed to detach device: %s' % iface)
# Check again inside vm
session = vm.wait_for_serial_login()
if not utils_misc.wait_for(lambda: check_inside_vm(session, False),
timeout=60, step=5):
        test.fail('Check interface inside vm failed, '
                  'interface not successfully detached: '
                  'found mac address %s' % mac)
session.close()
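    # check_inside_vm() used above is defined in the omitted part of this
    # test. The idea (an assumption about the helper): log into the guest and
    # check whether the MAC address appears in `ip link` output, e.g.
    #     def check_inside_vm(session, expected):
    #         output = session.cmd_output('ip link')
    #         return (mac in output) == expected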
# Other test scenarios of pci/pcie
if case:
logging.debug('iface_kwargs: %s', iface_kwargs)
        # Set a pcie-to-pci-bridge model name != pcie-pci-bridge,
        # or an invalid controller index for pcie-to-pci-bridge.
if case in ('wrong_model_name', 'invalid_index'):
pci_bridge = create_pci_device(pci_model, pci_model_name,
**pci_br_kwargs)
vmxml.add_device(pci_bridge)
result_to_check = virsh.define(vmxml.xml, debug=True)
# Attach device with invalid slot to pcie-to-pci-bridge
if case == 'attach_with_invalid_slot':
iface = create_iface(iface_model, iface_source, **iface_kwargs)
vmxml.add_device(iface)
result_to_check = virsh.define(vmxml.xml, debug=True)
# Test that pcie-to-pci-bridge has 31 available slots
if case == 'max_slots':
target_bus = cur_pci_br[0].index
target_bus = hex(int(target_bus))
logging.debug('target_bus: %s', target_bus)
# Attach 32 interfaces
for i in range(max_slots + 1):
logging.debug('address: %s', iface_kwargs['address'])
new_iface_kwargs = {'address': iface_kwargs['address']
% (target_bus, hex(i + 1))}
iface = create_iface(iface_model, iface_source,
**new_iface_kwargs)
                logging.info('Attaching the %dth interface', i + 1)
result_in_loop = virsh.attach_device(
vm_name, iface.xml, flagstr='--config', debug=True)
                # Attaching the 32nd interface will fail
if i == max_slots:
status_error = True
libvirt.check_exit_status(result_in_loop,
expect_error=status_error)
logging.debug(virsh.dumpxml(vm_name))
# Get all devices on pcie-to-pci-bridge from new xml