def recover(self, params=None):
    """
    Recover test environment

    Re-enables SELinux enforcing mode if it was enforcing before, removes
    the test domain if it exists, deletes the image file named by
    ``params["image_name"]`` and the image directory, then unregisters the
    VM from the environment.

    :param params: dict-like test parameters; only "image_name" is read.
                   May be None, in which case no image file is removed.
    """
    if self.selinux_enforcing:
        utils_selinux.set_status("enforcing")
    if virsh.domain_exists(self.vm_name):
        virsh.remove_domain(self.vm_name)
    # params defaults to None, and "image_name" may be absent: in both
    # cases there is no image file to delete, so skip that step instead of
    # crashing halfway through the recovery.
    image_file = params.get("image_name") if params is not None else None
    if image_file and os.path.exists(image_file):
        os.remove(image_file)
    if os.path.isdir(self.image_path):
        # os.rmdir only removes an empty directory.
        os.rmdir(self.image_path)
    self.env.unregister_vm(self.vm_name)
def restore(self, name):
    """
    Re-apply a previously recorded service state.

    :param name: mapping with a 'name' key ('libvirtd' or 'selinux') and a
                 'status' key holding the state to put back
    :raise Exception: if the service name or the libvirtd status is not
                      recognised, or libvirtd fails to start/stop
    """
    service = name['name']
    # Reject unknown services before looking at the status at all.
    if service not in ('libvirtd', 'selinux'):
        raise Exception('Unknown service %s' % service)

    wanted = name['status']
    if service == 'selinux':
        utils_selinux.set_status(wanted)
        return

    # Only libvirtd is left from here on.
    if wanted == 'running':
        started = self.libvirtd.start()
        if not started:
            raise Exception('Failed to start libvirtd')
    elif wanted == 'stopped':
        stopped = self.libvirtd.stop()
        if not stopped:
            raise Exception('Failed to stop libvirtd')
    else:
        raise Exception('Unknown libvirtd status %s' % wanted)
def state_test():
    """
    Manual smoke test for the state classes.

    Backs up every tracked state, deliberately disturbs the host (export
    file, VM, network autostart, storage pool, image files, libvirtd and
    SELinux), then asks each state object to check itself with
    ``recover=True`` and prints every line the check returns.

    The VM, network, pool and path names are hard-coded, so this only makes
    sense on a host prepared with those resources.
    """
    states = [ServiceState(), FileState(), DirState(), DomainState(),
              NetworkState(), PoolState(), SecretState(), MountState()]
    for state in states:
        state.backup()
    try:
        utils.run('echo hello > /etc/exports')
        virsh.start('virt-tests-vm1')
        virsh.net_autostart('default', '--disable')
        virsh.pool_destroy('mount')
        utils.run('rm /var/lib/virt_test/images/hello')
        utils.run('mkdir /var/lib/virt_test/images/hi')
        utils_libvirtd.Libvirtd().stop()
        utils_selinux.set_status('permissive')
    finally:
        # Run the check/recover pass even if one of the steps above raised,
        # so a half-finished run does not leave the host disturbed.
        for state in states:
            for line in state.check(recover=True):
                # Call form works on both Python 2 and Python 3.
                print(line)
def setup_or_cleanup_nfs(is_setup, mount_dir="", is_mount=False,
                         export_options="rw,no_root_squash",
                         mount_src="nfs-export"):
    """
    Set up or clean up nfs service on localhost.

    Note: if SELinux is enforcing it is switched to permissive so that the
    files in nfs can be used freely. It is not switched back here; the
    caller has to restore it if needed.

    :param is_setup: Boolean value, true for setup, false for cleanup
    :param mount_dir: NFS mount point; defaults to "nfs-mount" under the
                      "tmp" directory of the data root when empty
    :param is_mount: Boolean value, true for mount, false for umount
    :param export_options: options for nfs dir
    :param mount_src: directory to export; a relative path is placed under
                      the "tmp" directory of the data root
    :return: export nfs path on setup, empty string on cleanup
    """
    tmpdir = os.path.join(data_dir.get_root_dir(), 'tmp')
    if not os.path.isabs(mount_src):
        mount_src = os.path.join(tmpdir, mount_src)
    if not mount_dir:
        mount_dir = os.path.join(tmpdir, 'nfs-mount')

    # Pass the caller's export options through instead of a fixed value.
    nfs_params = {"nfs_mount_dir": mount_dir, "nfs_mount_options": "rw",
                  "nfs_mount_src": mount_src, "setup_local_nfs": "yes",
                  "export_options": export_options}
    _nfs = nfs.Nfs(nfs_params)

    # Set selinux to permissive that the file in nfs
    # can be used freely
    if utils_misc.selinux_enforcing():
        utils_selinux.set_status("permissive")

    if is_setup:
        _nfs.setup()
        if not is_mount:
            _nfs.umount()
        return mount_src

    _nfs.unexportfs_in_clean = True
    _nfs.cleanup()
    return ""
def run(test, params, env):
"""
Test DAC setting in both domain xml and qemu.conf.
(1) Init variables for test.
(2) Set VM xml and qemu.conf with proper DAC label, also set
monitor socket parent dir with propoer ownership and mode.
(3) Start VM and check the context.
"""
# Get general variables.
status_error = ('yes' == params.get("status_error", 'no'))
host_sestatus = params.get("host_selinux", "enforcing")
# Get variables about seclabel for VM.
sec_type = params.get("vm_sec_type", "dynamic")
vm_sec_model = params.get("vm_sec_model", "dac")
vm_sec_label = params.get("vm_sec_label", None)
vm_sec_relabel = params.get("vm_sec_relabel", "yes")
sec_dict = {'type': sec_type, 'model': vm_sec_model,
'relabel': vm_sec_relabel}
if vm_sec_label:
sec_dict['label'] = vm_sec_label
set_qemu_conf = "yes" == params.get("set_qemu_conf", "no")
# Get per-img seclabel variables
disk_type = params.get("disk_type")
disk_target = params.get('disk_target')
disk_src_protocol = params.get("disk_source_protocol")
vol_name = params.get("vol_name")
tmp_dir = data_dir.get_tmp_dir()
pool_name = params.get("pool_name", "gluster-pool")
brick_path = os.path.join(tmp_dir, pool_name)
invalid_label = 'yes' == params.get("invalid_label", "no")
relabel = params.get("per_img_sec_relabel")
sec_label = params.get("per_img_sec_label")
per_sec_model = params.get("per_sec_model", 'dac')
per_img_dict = {'sec_model': per_sec_model, 'relabel': relabel,
'sec_label': sec_label}
params.update(per_img_dict)
# Get qemu.conf config variables
qemu_user = params.get("qemu_user", 'qemu')
qemu_group = params.get("qemu_group", 'qemu')
dynamic_ownership = "yes" == params.get("dynamic_ownership", "yes")
# Get variables about VM and get a VM object and VMXML instance.
vm_name = params.get("main_vm")
vm = env.get_vm(vm_name)
vmxml = VMXML.new_from_inactive_dumpxml(vm_name)
backup_xml = vmxml.copy()
# Set selinux of host.
backup_sestatus = utils_selinux.get_status()
utils_selinux.set_status(host_sestatus)
qemu_sock_mod = False
qemu_sock_path = '/var/lib/libvirt/qemu/'
qemu_conf = utils_config.LibvirtQemuConfig()
libvirtd = utils_libvirtd.Libvirtd()
try:
if set_qemu_conf:
# Set qemu.conf for user and group
if qemu_user:
qemu_conf.user = qemu_user
if qemu_group:
qemu_conf.group = qemu_group
if dynamic_ownership:
qemu_conf.dynamic_ownership = 1
else:
qemu_conf.dynamic_ownership = 0
logging.debug("the qemu.conf content is: %s" % qemu_conf)
libvirtd.restart()
st = os.stat(qemu_sock_path)
if not bool(st.st_mode & stat.S_IWGRP):
# chmod g+w
os.chmod(qemu_sock_path, st.st_mode | stat.S_IWGRP)
qemu_sock_mod = True
# Set the context of the VM.
logging.debug("sec_dict is %s" % sec_dict)
vmxml.set_seclabel([sec_dict])
vmxml.sync()
# Get per-image seclabel in id string
if sec_label:
per_img_usr, per_img_grp = sec_label.split(':')
sec_label_id = format_user_group_str(per_img_usr, per_img_grp)
# Start VM to check the qemu process and image.
try:
# Set per-img sec context and start vm
utlv.set_vm_disk(vm, params)
# Start VM successfully.
if status_error:
if invalid_label:
# invalid label should fail, more info in bug 1165485
raise error.TestNAError("The label or model not valid, "
"check more info in bug: https://"
"bugzilla.redhat.com/show_bug.cgi"
"?id=1165485")
else:
raise error.TestFail("Test succeeded in negative case.")
# ......... part of the code is omitted here .........
def run(test, params, env):
"""
Test svirt in adding disk to VM.
(1).Init variables for test.
(2).Config qemu conf if need
(3).Label the VM and disk with proper label.
(4).Start VM and check the context.
(5).Destroy VM and check the context.
"""
# Get general variables.
status_error = ('yes' == params.get("status_error", 'no'))
host_sestatus = params.get("svirt_start_destroy_host_selinux", "enforcing")
# Get variables about seclabel for VM.
sec_type = params.get("svirt_start_destroy_vm_sec_type", "dynamic")
sec_model = params.get("svirt_start_destroy_vm_sec_model", "selinux")
sec_label = params.get("svirt_start_destroy_vm_sec_label", None)
security_driver = params.get("security_driver", None)
security_default_confined = params.get("security_default_confined", None)
security_require_confined = params.get("security_require_confined", None)
no_sec_model = 'yes' == params.get("no_sec_model", 'no')
sec_relabel = params.get("svirt_start_destroy_vm_sec_relabel", "yes")
sec_dict = {'type': sec_type, 'relabel': sec_relabel}
sec_dict_list = []
if not no_sec_model:
if "," in sec_model:
sec_models = sec_model.split(",")
for model in sec_models:
sec_dict['model'] = model
if sec_type != "none":
sec_dict['label'] = sec_label
sec_dict_copy = sec_dict.copy()
sec_dict_list.append(sec_dict_copy)
else:
sec_dict['model'] = sec_model
if sec_type != "none":
sec_dict['label'] = sec_label
sec_dict_list.append(sec_dict)
else:
sec_dict_list.append(sec_dict)
logging.debug("sec_dict_list is: %s" % sec_dict_list)
poweroff_with_destroy = ("destroy" == params.get(
"svirt_start_destroy_vm_poweroff", "destroy"))
# Get variables about VM and get a VM object and VMXML instance.
vm_name = params.get("main_vm")
vm = env.get_vm(vm_name)
vmxml = VMXML.new_from_inactive_dumpxml(vm_name)
backup_xml = vmxml.copy()
# Get varialbles about image.
img_label = params.get('svirt_start_destroy_disk_label')
# Backup disk Labels.
disks = vm.get_disk_devices()
backup_labels_of_disks = {}
backup_ownership_of_disks = {}
for disk in disks.values():
disk_path = disk['source']
backup_labels_of_disks[disk_path] = utils_selinux.get_context_of_file(
filename=disk_path)
f = os.open(disk_path, 0)
stat_re = os.fstat(f)
backup_ownership_of_disks[disk_path] = "%s:%s" % (stat_re.st_uid,
stat_re.st_gid)
# Backup selinux of host.
backup_sestatus = utils_selinux.get_status()
qemu_conf = utils_config.LibvirtQemuConfig()
libvirtd = utils_libvirtd.Libvirtd()
try:
# Set disk label
for disk in disks.values():
disk_path = disk['source']
utils_selinux.set_context_of_file(filename=disk_path,
context=img_label)
os.chown(disk_path, 107, 107)
# Set selinux of host.
utils_selinux.set_status(host_sestatus)
# Set qemu conf
if security_driver:
qemu_conf.set_string('security_driver', security_driver)
if security_default_confined:
qemu_conf.security_default_confined = security_default_confined
if security_require_confined:
qemu_conf.security_require_confined = security_require_confined
if (security_driver or security_default_confined or
security_require_confined):
logging.debug("the qemu.conf content is: %s" % qemu_conf)
libvirtd.restart()
# Set the context of the VM.
vmxml.set_seclabel(sec_dict_list)
vmxml.sync()
logging.debug("the domain xml is: %s" % vmxml.xmltreefile)
# Start VM to check the VM is able to access the image or not.
try:
vm.start()
# ......... part of the code is omitted here .........
def run(test, params, env):
    """
    Test svirt when undefining and re-defining a VM.

    (1).Init variables for test.
    (2).Label the VM and disk with proper label.
    (3).Dump the VM xml, undefine the VM and define it again from the dump.
    (4).Restore disk labels, VM xml and host selinux status.

    :param test: test object (not used by this function)
    :param params: dict-like test parameters
    :param env: environment object, used to look up the VM
    :raise error.TestFail: if undefining or defining the VM fails
    """
    # Get general variables.
    host_sestatus = params.get(
        "svirt_undefine_define_host_selinux", "enforcing")
    # Get variables about seclabel for VM.
    sec_type = params.get("svirt_undefine_define_vm_sec_type", "dynamic")
    sec_model = params.get("svirt_undefine_define_vm_sec_model", "selinux")
    sec_label = params.get("svirt_undefine_define_vm_sec_label", None)
    sec_relabel = params.get("svirt_undefine_define_vm_sec_relabel", "yes")
    sec_dict = {'type': sec_type, 'model': sec_model, 'label': sec_label,
                'relabel': sec_relabel}
    # Get variables about VM and get a VM object and VMXML instance.
    vm_name = params.get("main_vm")
    vm = env.get_vm(vm_name)
    vmxml = VMXML.new_from_inactive_dumpxml(vm_name)
    backup_xml = vmxml.copy()
    # Get variables about image.
    img_label = params.get('svirt_undefine_define_disk_label')
    disks = vm.get_disk_devices()
    backup_labels_of_disks = {}
    # Record the host state before anything is modified so that the
    # finally clause below can always put it back.
    backup_sestatus = utils_selinux.get_status()
    xml_file = os.path.join(data_dir.get_tmp_dir(), "vmxml")

    try:
        # Label the disks of VM with img_label.
        for disk in disks.values():
            disk_path = disk['source']
            backup_labels_of_disks[disk_path] = (
                utils_selinux.get_context_of_file(filename=disk_path))
            utils_selinux.set_context_of_file(filename=disk_path,
                                              context=img_label)
        # Set selinux of host.
        utils_selinux.set_status(host_sestatus)
        # Set the context of the VM.
        vmxml.set_seclabel([sec_dict])
        vmxml.sync()

        if vm.is_alive():
            vm.destroy()
        virsh.dumpxml(vm.name, to_file=xml_file)
        cmd_result = virsh.undefine(vm.name)
        if cmd_result.exit_status:
            raise error.TestFail("Failed to undefine vm. "
                                 "Detail: %s" % cmd_result)
        cmd_result = virsh.define(xml_file)
        if cmd_result.exit_status:
            raise error.TestFail("Failed to define vm. "
                                 "Detail: %s" % cmd_result)
    finally:
        # clean up
        for path, label in backup_labels_of_disks.items():
            utils_selinux.set_context_of_file(filename=path, context=label)
        backup_xml.sync()
        utils_selinux.set_status(backup_sestatus)
        # Do not leave the dumped xml behind in the tmp dir.
        if os.path.exists(xml_file):
            os.remove(xml_file)
debug=True)
try:
virsh.detach_disk(vm_name, target="vdf", extra="--persistent",
debug=True)
except error.CmdError:
raise error.TestFail("Detach disk 'vdf' from VM %s failed."
% vm.name)
finally:
# clean up
vm.destroy()
qemu_conf.restore()
if snapshot_name:
backup_xml.sync("--snapshots-metadata")
else:
backup_xml.sync()
for i in disk_snap_path:
if i and os.path.exists(i):
os.unlink(i)
for path, label in backup_labels_of_disks.items():
label_list = label.split(":")
os.chown(path, int(label_list[0]), int(label_list[1]))
if pvt:
try:
pvt.cleanup_pool(pool_name, pool_type, pool_target,
emulated_image)
except error.TestFail, detail:
logging.error(str(detail))
utils_selinux.set_status(backup_sestatus)
libvirtd.restart()
def run_svirt_attach_disk(test, params, env):
    """
    Test svirt in adding disk to VM.

    (1).Init variables for test.
    (2).Create a image to attached to VM.
    (3).Attach disk.
    (4).Start VM and check result.

    :param test: test object (not used by this function)
    :param params: dict-like test parameters; "image_name" is overwritten
    :param env: environment object, used to look up the VM
    :raise error.TestFail: if attaching or detaching the disk fails, or if
                           the VM start result does not match "status_error"
    """
    # Get general variables.
    status_error = ('yes' == params.get("status_error", 'no'))
    host_sestatus = params.get("svirt_attach_disk_host_selinux", "enforcing")
    # Get variables about seclabel for VM.
    sec_type = params.get("svirt_attach_disk_vm_sec_type", "dynamic")
    sec_model = params.get("svirt_attach_disk_vm_sec_model", "selinux")
    sec_label = params.get("svirt_attach_disk_vm_sec_label", None)
    sec_relabel = params.get("svirt_attach_disk_vm_sec_relabel", "yes")
    sec_dict = {'type': sec_type, 'model': sec_model, 'label': sec_label,
                'relabel': sec_relabel}
    # Get variables about VM and get a VM object and VMXML instance.
    vm_name = params.get("main_vm")
    vm = env.get_vm(vm_name)
    vmxml = VMXML.new_from_inactive_dumpxml(vm_name)
    backup_xml = vmxml.copy()
    # Get variables about image.
    img_label = params.get('svirt_attach_disk_disk_label')
    img_name = "svirt_disk"
    # Default label for the other disks.
    # To ensure VM is able to access other disks.
    default_label = params.get('svirt_attach_disk_disk_default_label', None)
    # Backup selinux of host before anything is modified.
    backup_sestatus = utils_selinux.get_status()

    image = None
    image_created = False
    attached = False
    try:
        # Set selinux of host.
        utils_selinux.set_status(host_sestatus)
        # Set the default label to other disks of vm.
        disks = vm.get_disk_devices()
        for disk in disks.values():
            utils_selinux.set_context_of_file(filename=disk['source'],
                                              context=default_label)
        # Init a QemuImg instance.
        params['image_name'] = img_name
        tmp_dir = data_dir.get_tmp_dir()
        image = qemu_storage.QemuImg(params, tmp_dir, img_name)
        # Create a image.
        img_path, _ = image.create(params)
        image_created = True
        # Set the context of the image.
        utils_selinux.set_context_of_file(filename=img_path,
                                          context=img_label)
        # Set the context of the VM.
        vmxml.set_seclabel(sec_dict)
        vmxml.sync()
        # Do the attach action.
        try:
            virsh.attach_disk(vm_name, source=img_path, target="vdf",
                              extra="--persistent", ignore_status=False)
        except error.CmdError:
            raise error.TestFail("Attach disk %s to vdf on VM %s failed."
                                 % (img_path, vm.name))
        attached = True
        # Start VM to check the VM is able to access the image or not.
        try:
            vm.start()
            # Start VM successfully.
            # VM with set seclabel can access the image with the
            # set context.
            if status_error:
                raise error.TestFail('Test successed in negative case.')
        except virt_vm.VMStartError as e:
            # Starting VM failed.
            # VM with set seclabel can not access the image with the
            # set context.
            if not status_error:
                raise error.TestFail("Test failed in positive case. "
                                     "error: %s" % e)
    finally:
        # clean up
        try:
            # Only detach what was actually attached.
            if attached:
                try:
                    virsh.detach_disk(vm_name, target="vdf",
                                      extra="--persistent",
                                      ignore_status=False)
                except error.CmdError:
                    raise error.TestFail("Detach disk 'vdf' from VM %s "
                                         "failed." % vm.name)
        finally:
            # Run the remaining restore steps even when the detach failed.
            if image_created:
                image.remove()
            backup_xml.sync()
            utils_selinux.set_status(backup_sestatus)
def run(test, params, env):
"""
Test DAC setting in both domain xml and qemu.conf.
(1) Init variables for test.
(2) Set VM xml and qemu.conf with proper DAC label, also set image and
monitor socket parent dir with propoer ownership and mode.
(3) Start VM and check the context.
(4) Destroy VM and check the context.
"""
# Get general variables.
status_error = ('yes' == params.get("status_error", 'no'))
host_sestatus = params.get("dac_start_destroy_host_selinux", "enforcing")
qemu_group_user = "yes" == params.get("qemu_group_user", "no")
# Get variables about seclabel for VM.
sec_type = params.get("dac_start_destroy_vm_sec_type", "dynamic")
sec_model = params.get("dac_start_destroy_vm_sec_model", "dac")
sec_label = params.get("dac_start_destroy_vm_sec_label", None)
sec_relabel = params.get("dac_start_destroy_vm_sec_relabel", "yes")
security_default_confined = params.get("security_default_confined", None)
set_process_name = params.get("set_process_name", None)
sec_dict = {'type': sec_type, 'model': sec_model, 'relabel': sec_relabel}
if sec_label:
sec_dict['label'] = sec_label
set_sec_label = "yes" == params.get("set_sec_label", "no")
set_qemu_conf = "yes" == params.get("set_qemu_conf", "no")
# Get qemu.conf config variables
qemu_user = params.get("qemu_user", None)
qemu_group = params.get("qemu_group", None)
dynamic_ownership = "yes" == params.get("dynamic_ownership", "yes")
# Get variables about VM and get a VM object and VMXML instance.
vm_name = params.get("main_vm")
vm = env.get_vm(vm_name)
vmxml = VMXML.new_from_inactive_dumpxml(vm_name)
backup_xml = vmxml.copy()
# Get varialbles about image.
img_label = params.get('dac_start_destroy_disk_label')
# Label the disks of VM with img_label.
disks = vm.get_disk_devices()
backup_labels_of_disks = {}
qemu_disk_mod = False
for disk in disks.values():
disk_path = disk['source']
f = os.open(disk_path, 0)
stat_re = os.fstat(f)
backup_labels_of_disks[disk_path] = "%s:%s" % (stat_re.st_uid,
stat_re.st_gid)
label_list = img_label.split(":")
os.chown(disk_path, int(label_list[0]), int(label_list[1]))
os.close(f)
st = os.stat(disk_path)
if not bool(st.st_mode & stat.S_IWGRP):
# add group wirte mode to disk by chmod g+w
os.chmod(disk_path, st.st_mode | stat.S_IWGRP)
qemu_disk_mod = True
# Set selinux of host.
backup_sestatus = utils_selinux.get_status()
utils_selinux.set_status(host_sestatus)
def _create_user():
"""
Create a "vdsm_fake" in 'qemu' group for test
"""
logging.debug("create a user 'vdsm_fake' in 'qemu' group")
cmd = "useradd vdsm_fake -G qemu -s /sbin/nologin"
utils.run(cmd, ignore_status=False)
create_qemu_user = False
qemu_sock_mod = False
qemu_sock_path = '/var/lib/libvirt/qemu/'
qemu_conf = utils_config.LibvirtQemuConfig()
libvirtd = utils_libvirtd.Libvirtd()
try:
# Check qemu_group_user
if qemu_group_user:
if set_qemu_conf:
if "EXAMPLE" in qemu_user:
if not check_qemu_grp_user("vdsm_fake"):
_create_user()
create_qemu_user = True
qemu_user = "vdsm_fake"
qemu_group = "qemu"
if set_sec_label:
if sec_label:
if "EXAMPLE" in sec_label:
if not check_qemu_grp_user("vdsm_fake"):
_create_user()
create_qemu_user = True
sec_label = "vdsm_fake:qemu"
sec_dict['label'] = sec_label
st = os.stat(qemu_sock_path)
if not bool(st.st_mode & stat.S_IWGRP):
# chmod g+w
os.chmod(qemu_sock_path, st.st_mode | stat.S_IWGRP)
qemu_sock_mod = True
# ......... part of the code is omitted here .........
# Please leave a comment.