本文整理汇总了 Python 中 virttest.data_dir.get_tmp_dir 函数的典型用法代码示例。如果您正苦于以下问题：Python get_tmp_dir 函数的具体用法？Python get_tmp_dir 怎么用？Python get_tmp_dir 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 get_tmp_dir 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: file_exists
def file_exists(params, filename_path):
    """
    Check whether a file is present on a gluster volume.

    The volume is mounted on a new, uniquely named directory below the
    test tmp dir, the file is looked up through that mount, and the mount
    is torn down again before returning.

    :param params: test parameters, passed on to create_gluster_uri
    :param filename_path: path of the file; the leading part as long as the
                          unstripped gluster URI is cut off to get the path
                          inside the volume
    :return: True if the file was found through the mount, False otherwise
             (including when mounting failed)
    """
    stripped_uri = create_gluster_uri(params, stripped=True)
    full_uri = create_gluster_uri(params, stripped=False)

    # The mount directory is short-lived: pick a random name that is not
    # taken yet and remove the directory as soon as the check is done.
    while True:
        mount_name = "gmount-%s" % (utils_misc.generate_random_string(6))
        mount_dir = os.path.join(data_dir.get_tmp_dir(), mount_name)
        if not os.path.exists(mount_dir):
            break

    found = False
    try:
        os.mkdir(mount_dir)
        glusterfs_mount(stripped_uri, mount_dir)
        path_in_volume = filename_path[len(full_uri):]
        found = os.path.exists(os.path.join(mount_dir, path_in_volume))
    except Exception as e:
        logging.error("Failed to mount gluster volume %s to"
                      " mount dir %s: %s" % (stripped_uri, mount_dir, e))
    finally:
        unmounted = utils_misc.umount(stripped_uri, mount_dir, "glusterfs",
                                      False, "fuse.glusterfs")
        if not unmounted:
            logging.warning("Unable to unmount tmp directory %s with glusterfs"
                            " mount.", mount_dir)
        else:
            # Best effort: a leftover empty directory is harmless.
            try:
                os.rmdir(mount_dir)
            except OSError:
                pass
    return found
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:34,代码来源:gluster.py
示例2: compose_disk_options
def compose_disk_options(test, params, opt_names):
    """
    Compose the {disk,mem}spec options.

    The value following ``file=`` is replaced by a path below the test tmp
    dir. When ``bad_disk`` is set or ``reuse_external`` is "yes", that value
    is treated as the name of another parameter whose value is the real
    file name.

    :param test: test object (not referenced here)
    :param params: test parameters
    :param opt_names: raw {disk,mem}spec option string taken from the cfg
    :return: ``opt_names`` unchanged if it contains "snapshot=no"; the
             rewritten option string if it contains "file="; None otherwise
    """
    if "snapshot=no" in opt_names:
        return opt_names
    if "file=" not in opt_names:
        return None

    opt_disk = opt_names.split("file=")
    opt_list = opt_disk[1].split(",")
    file_value = opt_list[0]

    # Re-attach everything that followed the file value, commas included.
    # Previously only the first trailing option was kept and it was glued
    # straight onto the file path without its separator.
    left_opt = "".join("," + opt for opt in opt_list[1:])

    if (params.get("bad_disk") is not None or
            params.get("reuse_external") == "yes"):
        file_name = params.get(file_value)
    else:
        file_name = file_value
    spec_disk = os.path.join(data_dir.get_tmp_dir(), file_name)

    return opt_disk[0] + "file=" + spec_disk + left_opt
开发者ID:nasastry,项目名称:tp-libvirt,代码行数:29,代码来源:virsh_snapshot_create_as.py
示例3: get_kernel_file
def get_kernel_file(vm):
    """
    Get the kernel info files from guest.

    /proc/kallsyms and /proc/modules are dumped to regular files inside the
    guest and those copies are fetched into the test tmp dir.

    :param vm: VM object to log into and copy files from
    :return: tuple (kallsyms path, modules path) of the copies on the host
    """
    guestkallsyms = os.path.join(data_dir.get_tmp_dir(), "kallsyms")
    guestmodules = os.path.join(data_dir.get_tmp_dir(), "modules")

    session = vm.wait_for_login()
    try:
        # scp will miss the content of /proc/kallsyms, so copy it (and
        # /proc/modules) to a regular file first.
        session.cmd("cat /proc/kallsyms > /root/kallsyms")
        session.cmd("cat /proc/modules > /root/modules")
    finally:
        # The session is only needed for the two commands above; do not
        # leave it open.
        session.close()

    vm.copy_files_from("/root/kallsyms", guestkallsyms)
    vm.copy_files_from("/root/modules", guestmodules)
    return (guestkallsyms, guestmodules)
开发者ID:Antique,项目名称:tp-libvirt,代码行数:14,代码来源:perf_kvm.py
示例4: test_virt_tar_out
def test_virt_tar_out(test, vm, params):
    """
    1) Write a tempfile to guest
    2) Copy file to host with tar-out
    3) Delete created file

    :param test: test object used to report failures
    :param vm: VM whose filesystem is written to
    :param params: test parameters; reads vt_temp_file and vt_mountpoint
    """
    content = "This is file for test of virt-tar-out."
    path = params.get("vt_temp_file", "/tmp/test_virt_tar_out")
    file_dir = os.path.dirname(path)
    path_on_host = os.path.join(data_dir.get_tmp_dir(),
                                "test_virt_tar_out.tar")

    vt = utils_test.libguestfs.VirtTools(vm, params)
    mountpoint = params.get("vt_mountpoint")
    if mountpoint is None:
        tmpdir = "gmount-%s" % (utils_misc.generate_random_string(6))
        mountpoint = "/tmp/%s" % tmpdir
    # Make sure the mount point exists whether it was generated or
    # configured.
    if not os.path.exists(mountpoint):
        os.mkdir(mountpoint)

    writes, writeo = vt.write_file_with_guestmount(mountpoint, path, content,
                                                   cleanup=False)
    if utils_misc.umount("", mountpoint, "") is False:
        logging.error("Umount vm's filesytem failed.")

    if writes is False:
        test.fail("Write file to mounted filesystem failed.")
    logging.info("Create %s successfully.", path)

    # Copy file to host
    tar_out_result = vt.tar_out(file_dir, path_on_host)
    logging.debug(tar_out_result)
    if tar_out_result.exit_status:
        test.fail("Tar out failed.")
    logging.info("Tar out successfully.")

    # uncompress file and check file in it.
    uc_result = process.run("cd %s && tar xf %s" % (file_dir, path_on_host),
                            shell=True)
    logging.debug(uc_result)
    try:
        os.remove(path_on_host)
    except OSError as detail:
        # os.remove reports failures as OSError
        test.fail(str(detail))
    if uc_result.exit_status:
        test.fail("uncompress file on host failed.")
    logging.info("uncompress file on host successfully.")

    # Check file
    cat_result = process.run("cat %s" % path, ignore_status=True, shell=True)
    logging.debug(cat_result)
    try:
        os.remove(path)
    except OSError as detail:
        logging.error(detail)
    if cat_result.exit_status:
        test.fail("Cat file failed.")
    # Plain substring check: the expected text is a literal, not a regex
    # (its trailing '.' would otherwise match any character).
    if content not in cat_result.stdout_text:
        test.fail("Catted file do not match.")
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:60,代码来源:virt_file_operations.py
示例5: setup_ceph_auth
def setup_ceph_auth():
    """
    Prepare the ceph key file and a small remote RBD test image.

    Installs ceph-common, writes the client key file, removes a remote
    image with the same name if ``rbd info`` finds one, creates a local
    10M raw image with an ext4 filesystem and converts it to the RBD
    target. All ceph_* names, ``key_file``, ``key_opt`` and ``test`` are
    taken from the enclosing scope.
    """
    rbd_target = "".join((
        "rbd:%s:mon_host=%s" % (ceph_disk_name, ceph_mon_ip),
        ":id=%s:key=%s" % (ceph_auth_user, ceph_auth_key),
    ))

    installed = utils_package.package_install(["ceph-common"])
    if not installed:
        test.error("Failed to install ceph-common")

    key_section = "[%s]\n\tkey = %s\n" % (ceph_client_name, ceph_client_key)
    with open(key_file, 'w') as key_fd:
        key_fd.write(key_section)

    # Delete the disk if it exists
    purge_cmd = "rbd -m {0} {1} info {2} && rbd -m {0} {1} rm {2}".format(
        ceph_mon_ip, key_opt, ceph_disk_name)
    process.run(purge_cmd, ignore_status=True, shell=True)

    # Create a local image and make a filesystem on it.
    local_img = os.path.join(data_dir.get_tmp_dir(), "test.img")
    create_cmd = "qemu-img create -f raw {0} 10M && mkfs.ext4 -F {0}".format(
        local_img)
    process.run(create_cmd, ignore_status=False, shell=True)

    # Convert the image to remote storage
    # Ceph can only support raw format
    convert_cmd = "qemu-img convert -O %s %s %s" % ("raw", local_img,
                                                    rbd_target)
    process.run(convert_cmd, ignore_status=False, shell=True)
开发者ID:yalzhang,项目名称:tp-libvirt,代码行数:27,代码来源:virsh_pool_auth.py
示例6: test_disk_format
def test_disk_format(vm, params):
    """
    Test command disk-format

    For each supported image format an image named 'test' is created,
    guestfish is asked for its format and the answer is compared with the
    format the image was created with.

    :param vm: VM object (not referenced here)
    :param params: test parameters; image_name and image_format are
                   changed temporarily and restored before returning
    :raise error.TestFail: if a reported format differs from the expected one
    """
    readonly = params.get("gf_add_readonly", "no")

    gf = utils_test.libguestfs.GuestfishTools(params)
    image_path = params.get("image_path")
    gf.add_drive_opts(image_path, readonly=readonly)
    gf.run()

    image_dir = params.get("img_dir", data_dir.get_tmp_dir())
    image_name = params.get('image_name')
    image_format = params["image_format"]
    params['image_name'] = 'test'

    support_format = ['raw', 'cow', 'qcow', 'qcow2', 'vdi', 'vmdk', 'vpc']

    try:
        for fmt in support_format:
            params['image_format'] = fmt
            image = qemu_storage.QemuImg(params, image_dir, '')
            created_path, _ = image.create(params)
            result = gf.disk_format(created_path).stdout.strip()
            os.remove(created_path)

            if result != fmt:
                raise error.TestFail("Format is %s, expected is %s"
                                     % (result, fmt))
    finally:
        # Always release the session and hand params back unmodified, also
        # when a format check fails or an unexpected error is raised.
        gf.close_session()
        params['image_name'] = image_name
        params["image_format"] = image_format
开发者ID:zcyes,项目名称:tp-libvirt,代码行数:33,代码来源:guestfish_block_dev.py
示例7: run
def run(test, params, env):
    """
    Test for virt-xml-validate

    For the "domain" schema the XML of the main VM is dumped into a file
    below the test tmp dir; virt-xml-validate is then run on that file.

    :param test: test object (not referenced here)
    :param params: test parameters; reads main_vm, schema and output_file
    :param env: test environment, used to look up the VM
    :raise error.TestFail: if the command exits non-zero or its stdout
                           contains "fail"
    """
    # Get the full path of virt-xml-validate command.
    validate_bin = os_dep.command("virt-xml-validate")

    vm_name = params.get("main_vm", "virt-tests-vm1")
    env.get_vm(vm_name)

    schema = params.get("schema", "domain")
    output_name = params.get("output_file", "output")
    xml_file = os.path.join(data_dir.get_tmp_dir(), output_name)

    if schema == "domain":
        virsh.dumpxml(vm_name, to_file=xml_file)
    # TODO Add more case for other schema.

    validation = utils.run("%s %s %s" % (validate_bin, xml_file, schema),
                           ignore_status=True)
    if validation.exit_status:
        raise error.TestFail("virt-xml-validate command failed.\n"
                             "Detail: %s." % validation)

    if "fail" in validation.stdout:
        raise error.TestFail("xml fails to validate\n"
                             "Detail: %s." % validation)
开发者ID:Acidburn0zzz,项目名称:tp-libvirt,代码行数:26,代码来源:virt_xml_validate.py
示例8: test_tar_in
def test_tar_in(test, vm, params):
    """
    1) Fall into guestfish session w/ inspector
    2) Write a tempfile on host
    3) Copy file to guest with tar-in
    4) Delete created file
    5) Check file on guest

    :param test: test object used to cancel/fail
    :param vm: VM whose name is used as the libvirt domain
    :param params: test parameters; reads gf_temp_file and sets
                   libvirt_domain and gf_inspector
    """
    content = "This is file for test of tar-in."
    path = params.get("gf_temp_file", "/tmp/test_tar_in")
    path_on_host = os.path.join(data_dir.get_tmp_dir(), "test_tar_in.tar")

    # Create a file on host
    try:
        with open(path, 'w') as fd:
            fd.write(content)
    except IOError as detail:
        test.cancel("Prepare file on host failed:%s" % detail)
    try:
        # The context manager closes the archive even if add() fails.
        with tarfile.open(path_on_host, "w") as tar:
            tar.add(path)
    except tarfile.TarError as detail:
        test.cancel("Prepare tar file on host failed:%s" % detail)

    params['libvirt_domain'] = vm.name
    params['gf_inspector'] = True
    gf = utils_test.libguestfs.GuestfishTools(params)

    # Copy file to guest
    tar_in_result = gf.tar_in(path_on_host, "/")
    logging.debug(tar_in_result)

    # Delete files on host; each one is tried on its own so a failure on
    # the first does not leave the second behind.
    for leftover in (path, path_on_host):
        try:
            os.remove(leftover)
        except OSError as detail:
            # Let it go because file maybe not exist
            logging.warning(detail)

    if tar_in_result.exit_status:
        gf.close_session()
        test.fail("Tar in failed.")
    logging.info("Tar in successfully.")

    # Cat file on guest
    cat_result = gf.cat(path)
    rm_result = gf.rm(path)
    gf.close_session()
    logging.debug(cat_result)
    logging.debug(rm_result)
    if cat_result.exit_status:
        test.fail("Cat file failed.")
    # Plain substring check: the expected text is a literal, not a regex.
    if content not in cat_result.stdout:
        test.fail("Catted file do not match")
    if rm_result.exit_status:
        test.fail("Rm file failed.")
    logging.info("Rm %s successfully.", path)
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:60,代码来源:guestfs_file_operations.py
示例9: make_image_file_path
def make_image_file_path(self, index):
    """
    Build the path of the backing file for a test disk device.

    Only the path is computed here; nothing is created on disk.

    :param index: integer number of the disk, formatted with %d
    :return: path below the test tmp dir, named after the class name,
             ``self.identifier`` and ``index``
    """
    tmp_root = data_dir.get_tmp_dir()
    name_parts = (self.__class__.__name__, self.identifier, index)
    return os.path.join(tmp_root, 'disk_%s_%s_%d.raw' % name_parts)
开发者ID:bssrikanth,项目名称:tp-libvirt,代码行数:7,代码来源:virsh_attach_device.py
示例10: test_download
def test_download(vm, params):
    """
    Test command download

    Writes /src.txt inside the guest image, downloads it into the test tmp
    dir and compares the downloaded content with what was written.

    :param vm: VM object (not referenced here)
    :param params: test parameters; reads gf_add_ref, gf_add_readonly,
                   image_path and main_vm
    :raise error.TestFail: if the downloaded content differs
    """
    add_ref = params.get("gf_add_ref", "disk")
    readonly = "yes" == params.get("gf_add_readonly")

    gf = utils_test.libguestfs.GuestfishTools(params)
    if add_ref == "disk":
        image_path = params.get("image_path")
        gf.add_drive_opts(image_path, readonly=readonly)
    elif add_ref == "domain":
        vm_name = params.get("main_vm")
        gf.add_domain(vm_name, readonly=readonly)

    gf.run()
    gf.do_mount("/")

    gf.write("/src.txt", "Hello World")
    # NOTE(review): the size is queried but never compared, although the
    # failure message below mentions it.
    src_size = gf.filesize("/src.txt").stdout.strip()

    dest = "%s/dest.txt" % data_dir.get_tmp_dir()
    gf.download("/src.txt", "%s" % dest)
    gf.close_session()

    content = commands.getoutput("cat %s" % dest)
    # commands.getstatus() runs "ls -ld" on its argument instead of
    # executing it, so it never removed the file; getoutput() runs the
    # command.
    commands.getoutput("rm %s" % dest)

    if content != "Hello World":
        raise error.TestFail("Content or filesize is not match")
开发者ID:CongLi,项目名称:tp-libvirt,代码行数:31,代码来源:guestfish_misc.py
示例11: secret_validate
def secret_validate(file=None, **virsh_dargs):
    """
    Test for schema secret

    Defines a volume secret with a freshly generated uuid, takes a uuid
    from the ``virsh secret-list`` output and dumps that secret's XML.

    :param file: file the secret XML is dumped to
    :param virsh_dargs: extra keyword arguments for the virsh calls
    :raise error.TestNAError: if uuidgen fails
    :raise error.TestError: if no uuid is found in the listing or the
                            dump command fails
    """
    tmp_dir = data_dir.get_tmp_dir()
    volume_path = os.path.join(tmp_dir, "secret_volume")
    ephemeral = "no"
    private = "no"
    secret_xml_obj = SecretXML(ephemeral, private)

    status, uuid = commands.getstatusoutput("uuidgen")
    if status:
        raise error.TestNAError("Failed to generate valid uuid")

    secret_xml_obj.uuid = uuid
    secret_xml_obj.volume = volume_path
    secret_xml_obj.usage = "volume"

    # The result of the define call is not checked; only the listing is.
    virsh.secret_define(secret_xml_obj.xml, debug=True)

    cmd_result = virsh.secret_list(**virsh_dargs)
    libvirt.check_exit_status(cmd_result)
    try:
        # Second regex match of the listing, first captured column.
        uuid = re.findall(r"(\S+)\ +(\S+)[\ +\n]", str(cmd_result.stdout))[1][0]
    except IndexError:
        raise error.TestError("Fail to get secret uuid")

    if uuid:
        try:
            virsh.secret_dumpxml(uuid, to_file=file, **virsh_dargs)
        except error.CmdError as e:
            raise error.TestError(str(e))
开发者ID:uni-peter-zheng,项目名称:tp-libvirt,代码行数:33,代码来源:virt_xml_validate.py
示例12: test_alloc
def test_alloc(vm, params):
    """
    Test command alloc:

    1) Allocate a 100M image and check that the file shows up.
    2) Try to allocate an image 10G larger than the value df prints in
       its fourth column for the image directory and check that this
       fails without leaving a file behind.

    :param vm: VM object (not referenced here)
    :param params: test parameters; reads img_dir
    :raise error.TestFail: if either check does not hold
    """
    gf = utils_test.libguestfs.GuestfishTools(params)

    img_dir = params.get("img_dir", data_dir.get_tmp_dir())
    test_img_normal = img_dir + '/alloc_test_normal.img'
    test_img_error = img_dir + '/alloc_test_error.img'

    # Start from a clean state.
    os.system('rm -f %s' % test_img_normal)
    os.system('rm -f %s' % test_img_error)

    try:
        gf.alloc(test_img_normal, '100M')
        if not os.path.exists(test_img_normal):
            raise error.TestFail("test_alloc failed, file not allocated correctly")

        _status, avil_size = commands.getstatusoutput(
            "df -P -B 1G %s | awk 'NR==2{print $4}'" % img_dir)
        gf_result = gf.alloc(test_img_error, str(int(avil_size) + 10) + 'G')
        if gf_result.exit_status == 0 or os.path.exists(test_img_error):
            logging.error(gf_result)
            raise error.TestFail("test_alloc failed, alloc doesn't fail without enough space")
    finally:
        # Close the session and remove both images on every path, so a
        # failed second check no longer leaves the 100M image behind.
        gf.close_session()
        os.system('rm -f %s' % test_img_normal)
        os.system('rm -f %s' % test_img_error)
开发者ID:zcyes,项目名称:tp-libvirt,代码行数:29,代码来源:guestfish_utils.py
示例13: post_restart_save_restore
def post_restart_save_restore(params, libvirtd, vm):
    """
    Cleanup for test restart_save_restore

    Removes 'tmp.save' from the test tmp dir if it is there.

    :param params: test parameters (not referenced here)
    :param libvirtd: libvirtd object (not referenced here)
    :param vm: VM object (not referenced here)
    """
    tmp_root = data_dir.get_tmp_dir()
    leftover = os.path.join(tmp_root, 'tmp.save')
    if not os.path.exists(leftover):
        return
    os.remove(leftover)
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:7,代码来源:crash_regression.py
示例14: post_pm_test
def post_pm_test(params, libvirtd, vm):
    """
    Cleanup for pm_test.

    Removes 'tmp.save' from the test tmp dir if it is there.

    :param params: test parameters (not referenced here)
    :param libvirtd: libvirtd object (not referenced here)
    :param vm: VM object (not referenced here)
    """
    tmp_root = data_dir.get_tmp_dir()
    leftover = os.path.join(tmp_root, 'tmp.save')
    if not os.path.exists(leftover):
        return
    os.remove(leftover)
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:7,代码来源:crash_regression.py
示例15: run
def run(test, params, env):
    """
    Test command: virsh dump.

    This command can dump the core of a domain to a file for analysis.
    1. Positive testing
        1.1 Dump domain with valid options.
        1.2 Avoid file system cache when dumping.
        1.3 Compress the dump images to valid/invalid formats.
    2. Negative testing
        2.1 Dump domain to a non-exist directory.
        2.2 Dump domain with invalid option.
        2.3 Dump a shut-off domain.

    NOTE(review): the visible body only reads the parameters and defines a
    helper; the steps that use them are presumably outside this excerpt.

    :param test: test object used to cancel/fail
    :param params: test parameters
    :param env: test environment, used to look up the VM
    """
    vm_name = params.get("main_vm", "vm1")
    vm = env.get_vm(vm_name)
    options = params.get("dump_options")
    dump_file = params.get("dump_file", "vm.core")
    dump_dir = params.get("dump_dir", data_dir.get_tmp_dir())
    # A bare file name (no directory part) is placed inside dump_dir.
    # Compared with ==: identity ("is") on strings is not a value check.
    if os.path.dirname(dump_file) == "":
        dump_file = os.path.join(dump_dir, dump_file)
    dump_image_format = params.get("dump_image_format")
    start_vm = params.get("start_vm") == "yes"
    paused_after_start_vm = params.get("paused_after_start_vm") == "yes"
    status_error = params.get("status_error", "no") == "yes"
    timeout = int(params.get("check_pid_timeout", "5"))
    memory_dump_format = params.get("memory_dump_format", "")

    uri = params.get("virsh_uri")
    unprivileged_user = params.get('unprivileged_user')
    if unprivileged_user:
        if unprivileged_user.count('EXAMPLE'):
            unprivileged_user = 'testacl'

    if not libvirt_version.version_compare(1, 1, 1):
        if params.get('setup_libvirt_polkit') == 'yes':
            test.cancel("API acl test not supported in current"
                        " libvirt version.")

    def wait_pid_active(pid, timeout=5):
        """
        Wait for pid in running status

        :param pid: Desired pid
        :param timeout: Max time we can wait
        """
        cmd = ("cat /proc/%d/stat | awk '{print $3}'" % pid)
        try:
            while True:
                timeout = timeout - 1
                if not timeout:
                    # NOTE(review): if test.cancel signals through an
                    # Exception subclass, the handler below turns it into
                    # a failure - confirm this is intended.
                    test.cancel("Time out for waiting pid!")
                pid_status = process.run(cmd, ignore_status=False, shell=True).stdout.strip()
                if pid_status == "R":
                    break
                time.sleep(1)
        except Exception as detail:
            test.fail(detail)
开发者ID:lento-sun,项目名称:tp-libvirt,代码行数:60,代码来源:virsh_dump.py
示例16: test_upload_offset
def test_upload_offset(vm, params):
    """
    Test command upload_offset

    A host file holding a short text is uploaded at offset 0 to /dest.txt
    inside the image and the content read back is compared with that text.

    :param vm: VM object (not referenced here)
    :param params: test parameters; reads gf_add_ref, gf_add_readonly and
                   image_path
    :raise error.TestFail: if the content read back differs
    """
    add_ref = params.get("gf_add_ref", "disk")  # read but not used below
    readonly = params.get("gf_add_readonly", "no")

    gf = utils_test.libguestfs.GuestfishTools(params)
    gf.add_drive_opts(params.get("image_path"), readonly=readonly)
    gf.run()
    gf.do_mount("/")

    expected = "Hello World"
    host_file = "%s/src.txt" % data_dir.get_tmp_dir()
    commands.getoutput("echo %s > %s" % (expected, host_file))

    gf.upload_offset(host_file, "/dest.txt", 0)
    uploaded = gf.cat("/dest.txt").stdout.strip()
    gf.close_session()
    commands.getoutput("rm %s" % host_file)

    if uploaded == expected:
        return
    raise error.TestFail("Content is not correct")
开发者ID:zcyes,项目名称:tp-libvirt,代码行数:26,代码来源:guestfish_misc.py
示例17: execute
def execute(self):
    """
    Run the parent's execute step, then finish setting up the repo.

    Inside ``self.destination_dir`` this adds the ``origin`` remote,
    optionally initialises submodules, checks out ``self.tag`` and, when a
    key file is configured, verifies the tag's GPG signature. The working
    directory of the process is restored afterwards.

    :raise exceptions.TestError: if the GPG import or tag verification
                                 command fails
    """
    super(GitRepoParamHelper, self).execute()

    # os.path.curdir is just the string "."; os.getcwd() is what records
    # where we actually are so that we can get back there.
    cwd = os.getcwd()
    os.chdir(self.destination_dir)
    try:
        process.system('git remote add origin %s' %
                       self.uri, ignore_status=True)
        if self.recursive == 'yes':
            process.system('git submodule init')
            process.system('git submodule update')

        if self.tag:
            process.system('git checkout %s' % self.tag)
            # Signature verification only makes sense for a checked-out
            # tag, hence it lives inside the tag branch.
            if self.key_file is not None:
                try:
                    gnupg_home = os.path.join(data_dir.get_tmp_dir(),
                                              'gnupg')
                    if not os.path.isdir(gnupg_home):
                        os.makedirs(gnupg_home)
                    os.environ['GNUPGHOME'] = gnupg_home
                    process.system('gpg --import %s' % self.key_file)
                    logging.debug('Verifying if tag is actually signed with '
                                  'GPG key ID %s' % self.key_file)
                    process.system('git tag -v %s' % self.tag)
                except process.CmdError:
                    raise exceptions.TestError("GPG signature check for git repo "
                                               "%s failed" % self.name)

        # Log the top commit message, good for quick reference
        process.system('git log -1')
    finally:
        # Go back even when one of the git/gpg commands raised.
        os.chdir(cwd)
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:32,代码来源:build_helper.py
示例18: test_upload
def test_upload(vm, params):
    """
    Test command upload

    A host file holding a short text is uploaded to /dest.txt inside the
    image and the content read back is compared with that text.

    :param vm: VM object (not referenced here)
    :param params: test parameters; reads gf_add_ref, gf_add_readonly,
                   image_path and main_vm
    :raise error.TestFail: if the content read back differs
    """
    add_ref = params.get("gf_add_ref", "disk")
    readonly = "yes" == params.get("gf_add_readonly")

    gf = utils_test.libguestfs.GuestfishTools(params)
    if add_ref == "disk":
        image_path = params.get("image_path")
        gf.add_drive_opts(image_path, readonly=readonly)
    elif add_ref == "domain":
        vm_name = params.get("main_vm")
        gf.add_domain(vm_name, readonly=readonly)

    gf.run()
    gf.do_mount("/")

    filename = "%s/src.txt" % data_dir.get_tmp_dir()
    # The with-block closes the file even if the write fails.
    with open(filename, "w+") as fd:
        fd.write("Hello World")

    gf.upload(filename, "/dest.txt")
    content = gf.cat("/dest.txt").stdout.strip()
    gf.close_session()
    commands.getoutput("rm %s" % filename)

    if content != "Hello World":
        raise error.TestFail("Content is not correct")
开发者ID:CongLi,项目名称:tp-libvirt,代码行数:31,代码来源:guestfish_misc.py
示例19: __init__
def __init__(self, test, params):
    """
    Read the test parameters, move the VM's first disk onto NFS storage
    and prepare a thread that runs the configured virsh operation.

    :param test: test object (not referenced here)
    :param params: test parameters
    """
    self.vm_name = params.get("main_vm", "avocado-vt-vm1")
    self.env = params.get("env")
    self.vm = self.env.get_vm(self.vm_name)
    self.operation_timeout = int(params.get("operation_timeout"))
    self.nfs_no_response_sign = params.get("nfs_no_response_sign")
    self.export_options = params.get("export_options")
    self.mount_options = params.get("mount_options")
    self.operation = params.get("operation")
    self.operation_option = params.get("operation_option")
    self.sav_filename = params.get("sav_filename")
    self.iptable_rule = params.get("iptable_rule")
    self.vmxml_backup = vm_xml.VMXML.new_from_inactive_dumpxml(
        self.vm_name)
    # list() keeps this working where keys() returns a view that cannot
    # be indexed.
    self.disk_tgt = list(self.vmxml_backup.get_disk_all().keys())[0]

    def setup_nfs_disk(vm_name):
        """
        Setup disk on nfs storage

        :param vm_name: name of the VM whose first disk is moved
        :return: result of setup_or_cleanup_nfs
        """
        vmxml = vm_xml.VMXML.new_from_dumpxml(vm_name)
        disk_xml = vmxml.get_devices(device_type="disk")[0]
        source_image_path = disk_xml.source.attrs['file']
        logging.debug("VM origin image path: %s", source_image_path)
        seLinuxBool = SELinuxBoolean(params)
        seLinuxBool.setup()
        nfs_res = utils_test.libvirt.setup_or_cleanup_nfs(
            is_setup=True,
            is_mount=True,
            export_options=self.export_options,
            mount_options=self.mount_options)
        logging.debug('NFS is setup ~~~~~~~~~~')
        cp_cmd = process.run("cp %s %s" % (
            source_image_path, nfs_res['mount_dir']))
        time.sleep(3)
        logging.debug("cp command: %s", cp_cmd.command)
        # Swap the disk definition for one that points at the NFS copy.
        vmxml.del_device(disk_xml)
        nfs_image_path = os.path.join(
            nfs_res['mount_dir'], os.path.basename(source_image_path))
        logging.debug("VM new image path %s", nfs_image_path)
        disk_dict = {'attrs': {'file': nfs_image_path}}
        disk_xml.source = disk_xml.new_disk_source(**disk_dict)
        logging.debug("VM new disk xml:\n%s", disk_xml)
        vmxml.add_device(disk_xml)
        vmxml.sync()
        return nfs_res

    def operations():
        """
        Do save | domstats | blkdeviotune operations
        """
        if self.operation == "save":
            virsh.save(self.vm_name, self.save_file, debug=True,
                       timeout=self.operation_timeout)
        if self.operation == "domstats":
            # Keyword was misspelled "opertions", so the configured
            # options never reached the command line.
            virsh.domstats(self.vm_name,
                           options=self.operation_option or "",
                           debug=True, timeout=self.operation_timeout)
        if self.operation == "blkdeviotune":
            virsh.blkdeviotune(self.vm_name, self.disk_tgt, debug=True,
                               timeout=self.operation_timeout)

    self.nfs_res = setup_nfs_disk(self.vm_name)
    self.save_file = os.path.join(data_dir.get_tmp_dir(), self.sav_filename)
    self.td = threading.Thread(target=operations)
开发者ID:lento-sun,项目名称:tp-libvirt,代码行数:60,代码来源:resource_abnormal.py
示例20: setUp
def setUp(self):
    """
    Replace the virsh module's symbols with a failing stand-in, create a
    dummy Virsh instance and hook this test's mock handlers into it.
    """
    # Make all virsh commands fail the test unconditionally
    for symbol in dir(virsh):
        if symbol in virsh.NOCLOSE:
            continue
        # Exceptions are callable
        setattr(virsh, symbol, self.bogusVirshFailureException)

    # The dummy instance is necessary so the virsh module doesn't complain
    # about a missing virsh command. Any virsh command called without a
    # declared mock fails the test, which also catches libvirt_xml
    # interfaces that call virsh functions unexpectedly.
    self.dummy_virsh = virsh.Virsh(virsh_exec='/bin/false',
                                   uri='qemu:///system',
                                   debug=True,
                                   ignore_status=True)

    # make a tmp_dir to store information.
    doms_dir = os.path.join(data_dir.get_tmp_dir(), 'domains')
    LibvirtXMLTestBase.__doms_dir__ = doms_dir
    if not os.path.isdir(LibvirtXMLTestBase.__doms_dir__):
        os.makedirs(LibvirtXMLTestBase.__doms_dir__)

    # Normally not kosher to call super_set, but required here for testing
    mocked_commands = ('capabilities', 'dumpxml', 'domuuid', 'define',
                       'nodedev_dumpxml')
    for command_name in mocked_commands:
        handler = getattr(self, '_' + command_name)
        self.dummy_virsh.__super_set__(command_name, handler)
开发者ID:Antique,项目名称:virt-test,代码行数:27,代码来源:libvirt_xml_unittest.py
注：本文中的 virttest.data_dir.get_tmp_dir 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论