本文整理汇总了Python中virttest.utils_misc.generate_random_string函数的典型用法代码示例。如果您正苦于以下问题:Python generate_random_string函数的具体用法?Python generate_random_string怎么用?Python generate_random_string使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了generate_random_string函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: file_exists
def file_exists(params, filename_path):
    """
    Check whether a file exists on a gluster volume by mounting the volume
    on a short-lived directory and probing the path there.

    :param params: Parameters forwarded to create_gluster_uri().
    :param filename_path: Path of the file; its first len(unstripped gluster
                          URI) characters are cut off to obtain the path
                          below the mount point.
    :return: True if the file is found under the temporary mount, False
             otherwise (including when mounting fails).
    """
    sg_uri = create_gluster_uri(params, stripped=True)
    g_uri = create_gluster_uri(params, stripped=False)
    # The mount point lives under data_dir.get_tmp_dir(): it has to be
    # truly temporary, is removed as soon as it is no longer needed, and
    # never gets filled with data of its own.
    while True:
        tmpdir = "gmount-%s" % (utils_misc.generate_random_string(6))
        tmpdir_path = os.path.join(data_dir.get_tmp_dir(), tmpdir)
        if not os.path.exists(tmpdir_path):
            break
    ret = False
    try:
        os.mkdir(tmpdir_path)
        glusterfs_mount(sg_uri, tmpdir_path)
        mount_filename_path = os.path.join(tmpdir_path,
                                           filename_path[len(g_uri):])
        if os.path.exists(mount_filename_path):
            ret = True
    except Exception as e:
        # Best effort: any failure is reported and treated as "not found"
        logging.error("Failed to mount gluster volume %s to"
                      " mount dir %s: %s", sg_uri, tmpdir_path, e)
    finally:
        if utils_misc.umount(sg_uri, tmpdir_path, "glusterfs", False,
                             "fuse.glusterfs"):
            try:
                os.rmdir(tmpdir_path)
            except OSError:
                # Directory removal is deliberately best effort
                pass
        else:
            logging.warning("Unable to unmount tmp directory %s with glusterfs"
                            " mount.", tmpdir_path)
    return ret
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:34,代码来源:gluster.py
示例2: test_default_default
def test_default_default(self):
    # Look up the style for two random strings and check the values that
    # come back: mac prefix '9a' and a QemuIface container class.
    style = self.get_style(
        utils_misc.generate_random_string(16),
        utils_misc.generate_random_string(16))
    self.assertEqual(style['mac_prefix'], '9a')
    self.assertEqual(style['container_class'], utils_net.QemuIface)
    # assertTrue replaces the deprecated assert_ alias (gone in Python 3.12)
    self.assertTrue(issubclass(style['container_class'],
                               utils_net.VirtIface))
开发者ID:kylazhang,项目名称:avocado-vt,代码行数:7,代码来源:test_utils_net.py
示例3: create_bridge
def create_bridge():
    """
    Create a bridge on host for test

    :return: tuple of (connection name, bridge name), each ending in a
             random suffix from generate_random_string(3)
    """
    con_name = 'con_' + utils_misc.generate_random_string(3)
    bridge_name = 'br_' + utils_misc.generate_random_string(3)
    nmcli_cmd = ('nmcli con add type bridge con-name %s ifname %s'
                 % (con_name, bridge_name))
    process.run(nmcli_cmd, verbose=True)
    return con_name, bridge_name
开发者ID:nasastry,项目名称:tp-libvirt,代码行数:9,代码来源:mtu.py
示例4: test_arbitrart_attributes
def test_arbitrart_attributes(self):
    # Build one device per character-device type, then feed the same two
    # random attribute/value pairs to add_source() and add_target() and
    # require that sources and targets compare equal.
    chardevs = [librarian.get(dev_type)(virsh_instance=self.dummy_virsh)
                for dev_type in ('parallel', 'serial', 'channel', 'console')]
    for chardev in chardevs:
        first_attr = utils_misc.generate_random_string(10)
        first_value = utils_misc.generate_random_string(10)
        second_attr = utils_misc.generate_random_string(10)
        second_value = utils_misc.generate_random_string(10)
        attributes = {first_attr: first_value, second_attr: second_value}
        chardev.add_source(**attributes)
        chardev.add_target(**attributes)
        self.assertEqual(chardev.sources, chardev.targets)
开发者ID:LeiCui,项目名称:virt-test,代码行数:13,代码来源:libvirt_xml_unittest.py
示例5: update_env
def update_env(env):
    """
    Keep storing random key/value pairs in env["changing_dict"] until
    termination_event (taken from the enclosing scope) is set.

    :param env: Environment mapping; a "changing_dict" entry is created
                when it is missing.
    """
    @utils_env.lock_safe
    def _update_env(env, key, value):
        env["changing_dict"][key] = value

    if "changing_dict" not in env:
        env["changing_dict"] = {}
    # At least one pair is always written before the event is checked
    while True:
        key = "%s" % utils_misc.generate_random_string(length=10)
        value = "%s" % utils_misc.generate_random_string(length=10)
        _update_env(env, key, value)
        # is_set() replaces the deprecated camelCase alias isSet()
        # NOTE(review): assumes termination_event is a threading-style Event
        if termination_event.is_set():
            break
开发者ID:ChenFanFnst,项目名称:avocado-vt,代码行数:13,代码来源:test_utils_env.py
示例6: __init__
def __init__(self, vm_name, params):
    """
    Store the VM name and params and derive a unique instance tag.

    :param vm_name: Name kept as self.name
    :param params: Mapping read for 'vm_type' and 'driver_type'
    """
    self.name = vm_name
    self.params = params
    self.vm_type = self.params.get('vm_type')
    self.driver_type = self.params.get('driver_type')
    # Instance tag: "<YYYYmmdd-HHMMSS>-<random string>"
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    random_tag = utils_misc.generate_random_string(16)
    self.instance = "%s-%s" % (timestamp, random_tag)
开发者ID:kylazhang,项目名称:avocado-vt,代码行数:7,代码来源:test_utils_net.py
示例7: __init__
def __init__(self, test_params):
    """
    Setup one or more device xml for a device based on TestParams instance

    :param test_params: TestParams instance; its dev_params(<class name>)
                        mapping is copied onto this object as attributes.
    :raise error.TestError: if the resulting count attribute is below one.
    """
    # The identifier is generated once and shared through the class
    if self.__class__.identifier is None:
        identifier = utils_misc.generate_random_string(4)
        self.__class__.identifier = identifier
    # how many of this type of device to make
    self.test_params = test_params
    # Copy params for this class into attributes
    cls_name = self.__class__.__name__
    # Already have test-prefix stripped off
    for key, value in self.test_params.dev_params(cls_name).items():
        # Any keys with _anything are not used by this class
        if '_' in key:
            logging.debug("Removing key: %s from params for class %s",
                          test_params.test_prefix + key, cls_name)
            continue
        # Attempt to convert numbers
        try:
            setattr(self, key, int(value))
        except ValueError:
            setattr(self, key, value)
    if self.count < 1:
        # The class name must be interpolated, otherwise the message
        # shows a literal "%s"
        raise error.TestError("Configuration for class %s count must "
                              "be specified and greater than zero"
                              % cls_name)
    logging.info("Setting up %d %s device(s)", self.count, cls_name)
    # Setup each device_xml instance
    # test_params.dev_params() enforces count; range() works on both
    # Python 2 and Python 3, unlike xrange()
    self._device_xml_list = [self.init_device(index)
                             for index in range(0, self.count)]
开发者ID:Antique,项目名称:tp-libvirt,代码行数:31,代码来源:virsh_attach_device.py
示例8: test_virt_tar_out
def test_virt_tar_out(test, vm, params):
    """
    1) Write a tempfile to guest
    2) Copy file to host with tar-out
    3) Delete created file

    :param test: Test object; test.fail() is called for every failed step.
    :param vm: VM handed to utils_test.libguestfs.VirtTools.
    :param params: Test parameters; "vt_temp_file" and "vt_mountpoint" are
                   read here.
    """
    content = "This is file for test of virt-tar-out."
    path = params.get("vt_temp_file", "/tmp/test_virt_tar_out")
    file_dir = os.path.dirname(path)
    path_on_host = os.path.join(data_dir.get_tmp_dir(),
                                "test_virt_tar_out.tar")

    vt = utils_test.libguestfs.VirtTools(vm, params)
    mountpoint = params.get("vt_mountpoint")
    if mountpoint is None:
        tmpdir = "gmount-%s" % (utils_misc.generate_random_string(6))
        mountpoint = "/tmp/%s" % tmpdir
        if not os.path.exists(mountpoint):
            os.mkdir(mountpoint)

    # Only the status half of the returned pair is needed
    writes, _write_output = vt.write_file_with_guestmount(mountpoint, path,
                                                          content,
                                                          cleanup=False)
    if utils_misc.umount("", mountpoint, "") is False:
        logging.error("Umount vm's filesytem failed.")

    if writes is False:
        test.fail("Write file to mounted filesystem failed.")
    logging.info("Create %s successfully.", path)

    # Copy file to host
    tar_out_result = vt.tar_out(file_dir, path_on_host)
    logging.debug(tar_out_result)
    if tar_out_result.exit_status:
        test.fail("Tar out failed.")
    logging.info("Tar out successfully.")

    # uncompress file and check file in it.
    uc_result = process.run("cd %s && tar xf %s" % (file_dir, path_on_host),
                            shell=True)
    logging.debug(uc_result)
    try:
        os.remove(path_on_host)
    # os.remove() raises OSError; on Python 2 that is not an IOError
    except (IOError, OSError) as detail:
        test.fail(str(detail))
    if uc_result.exit_status:
        test.fail("uncompress file on host failed.")
    logging.info("uncompress file on host successfully.")

    # Check file
    cat_result = process.run("cat %s" % path, ignore_status=True, shell=True)
    logging.debug(cat_result)
    try:
        os.remove(path)
    except (IOError, OSError) as detail:
        logging.error(detail)
    if cat_result.exit_status:
        test.fail("Cat file failed.")
    else:
        # Literal comparison: the content contains '.', which a regex
        # search would treat as a wildcard
        if content not in cat_result.stdout_text:
            test.fail("Catted file do not match.")
开发者ID:balamuruhans,项目名称:tp-libvirt,代码行数:60,代码来源:virt_file_operations.py
示例9: receiver
def receiver():
    """
    Receive side

    Log into the receiver (through vm_receiver when it is set, otherwise
    with a remote login built from params), run the ntttcp receiver command
    once per entry in buffers and append each output to
    "<results_path>.receiver". All of these names come from the enclosing
    scope.
    """
    global _receiver_ready
    logging.info("Starting receiver process on %s", receiver_addr)
    if vm_receiver:
        session = vm_receiver.wait_for_login(timeout=login_timeout)
    else:
        username = params.get("username", "")
        password = params.get("password", "")
        # Raw string: same value as before, without invalid escape sequences
        prompt = params.get("shell_prompt", r"[\#\$]")
        # NOTE(review): eval() on a configuration value turns "\n" style
        # escapes into real characters; it executes arbitrary expressions,
        # so the parameter must come from trusted configuration only.
        linesep = eval("'%s'" % params.get("shell_linesep", r"\n"))
        client = params.get("shell_client")
        port = int(params.get("shell_port"))
        log_filename = ("session-%s-%s.log" % (receiver_addr,
                        utils_misc.generate_random_string(4)))
        session = remote.remote_login(client, receiver_addr, port,
                                      username, password, prompt,
                                      linesep, log_filename, timeout)
    # Release the session and the result file even when a step fails
    try:
        session.set_status_test_command("echo %errorlevel%")
        install_ntttcp(session)
        ntttcp_receiver_cmd = params.get("ntttcp_receiver_cmd")
        with open(results_path + ".receiver", 'a') as f:
            for b in buffers:
                utils_misc.wait_for(lambda: not _wait(), timeout)
                _receiver_ready = True
                rbuf = params.get("fixed_rbuf", b)
                cmd = ntttcp_receiver_cmd % (
                    session_num, receiver_addr, rbuf, buf_num)
                r = session.cmd_output(cmd, timeout=timeout,
                                       print_func=logging.debug)
                f.write("Send buffer size: %s\n%s\n%s" % (b, cmd, r))
    finally:
        session.close()
开发者ID:ayiyaliing,项目名称:virt-test,代码行数:33,代码来源:ntttcp.py
示例10: get_backup_set
def get_backup_set(filename, backup_dir, action, good):
    """
    Get all sources and destinations required for each backup.

    :param filename: Path of the image file.
    :param backup_dir: Directory holding the backups; created when missing.
    :param action: 'backup' or 'restore'; anything else is logged as an
                   error and yields an empty list.
    :param good: Truthy when the image is in a good state.
    :return: List of [source, destination] pairs.
    """
    if not os.path.isdir(backup_dir):
        os.makedirs(backup_dir)

    image_name = os.path.basename(filename)
    if action not in ('backup', 'restore'):
        logging.error("No backup sets for action: %s, state: %s",
                      action, good)
        return []

    good_copy = os.path.join(backup_dir, "%s.backup" % image_name)
    restoring = action == 'restore'

    if good:
        if restoring:
            return [[good_copy, filename]]
        return [[filename, good_copy]]

    # Bad image: two backups are made, one of the bad image and one of the
    # good image, both tagged with the same random suffix. The suffix is
    # generated for either action.
    suffix = utils_misc.generate_random_string(4)
    if restoring:
        return [[good_copy, filename]]
    bad_dest = os.path.join(backup_dir,
                            "%s.bad.%s" % (image_name, suffix))
    good_dest = os.path.join(backup_dir,
                             "%s.good.%s" % (image_name, suffix))
    return [[filename, bad_dest], [good_copy, good_dest]]
开发者ID:avocado-framework,项目名称:avocado-vt,代码行数:34,代码来源:storage.py
示例11: __init__
def __init__(self, test, params, env):
    """
    Read the migration settings from params and look up the main VM.

    :param test: Test object, stored as self.test
    :param params: Test parameters, stored as self.params
    :param env: Test environment; supplies the VM named by params["main_vm"]
    """
    self.test = test
    self.params = params
    self.env = env

    fetch = self.params.get
    self.login_timeout = int(fetch("login_timeout", 360))
    self.mig_timeout = float(fetch("mig_timeout", "3600"))
    self.mig_protocol = fetch("migration_protocol", "tcp")
    # Two when mig_cancel is "yes", otherwise zero
    self.mig_cancel_delay = 2 if fetch("mig_cancel") == "yes" else 0
    self.mig_exec_cmd_src = fetch("migration_exec_cmd_src")
    self.mig_exec_cmd_dst = fetch("migration_exec_cmd_dst")

    exec_src = self.mig_exec_cmd_src
    if exec_src and "gzip" in exec_src:
        # Both exec commands are templates filled with a randomised file name
        self.mig_exec_file = fetch("migration_exec_file", "/var/tmp/exec")
        self.mig_exec_file += "-%s" % utils_misc.generate_random_string(8)
        self.mig_exec_cmd_src = self.mig_exec_cmd_src % self.mig_exec_file
        self.mig_exec_cmd_dst = self.mig_exec_cmd_dst % self.mig_exec_file

    self.offline = fetch("offline", "no") == "yes"
    self.check = fetch("vmstate_check", "no") == "yes"
    self.living_guest_os = fetch("migration_living_guest", "yes") == "yes"
    self.vm = self.env.get_vm(self.params["main_vm"])
    self.test_command = fetch("migration_test_command")
    self.background_command = fetch("migration_bg_command")
    self.bg_check_command = fetch("migration_bg_check_command")
    self.guest_stress_test = fetch("guest_stress_test")
    self.ping_pong = fetch("ping_pong", 1)
    self.stress_stop_cmd = fetch("stress_stop_cmd")
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:28,代码来源:migration_after_vm_paused.py
示例12: run_smartcard_setup
def run_smartcard_setup(test, params, env):
    """
    Simple setup test to create certs on the client to be passed to VM's
    smartcard.

    :param test: QEMU test object.
    :param params: Dictionary with the test parameters.
    :param env: Dictionary with test environment.
    :raise error.TestFail: if a requested certificate is missing from the
                           certutil listing.
    """
    # Get necessary params
    cert_list = params.get("gencerts").split(",")
    cert_db = params.get("certdb")
    self_sign = params.get("self_sign")
    cert_trustargs = params.get("trustargs")

    logging.debug("Cert List:")
    for cert in cert_list:
        logging.debug(cert)
        logging.debug(cert_trustargs)
        logging.debug("CN=" + cert)
        logging.debug(cert_db)

    client_vm = env.get_vm(params["client_vm"])
    client_vm.verify_alive()

    client_session = client_vm.wait_for_login(
        timeout=int(params.get("login_timeout", 360)),
        username="root", password="123456")

    # Close the session on every exit path, including failures
    try:
        # generate a random string, used to create a random key for the certs
        randomstring = utils_misc.generate_random_string(2048)
        cmd = "echo '" + randomstring + "' > /tmp/randomtext.txt"
        client_session.cmd(cmd)

        utils_spice.wait_timeout(5)

        # for each cert listed by the test, create it on the client
        for cert in cert_list:
            cmd = "certutil "
            if self_sign:
                cmd += " -x "
            cmd += "-t '" + cert_trustargs + "' -S -s " + "'CN=" + cert
            cmd += "' -n '" + cert + "' -d " + cert_db
            cmd += " -z " + "/tmp/randomtext.txt"
            logging.debug(cmd)
            output = client_session.cmd(cmd)
            logging.debug("Cert Created: %s", output)

        cmd = "certutil -L -d " + cert_db
        output = client_session.cmd(cmd)
        logging.info("Listing all certs on the client: %s", output)

        # Verify that all the certs have been generated on the client
        for cert in cert_list:
            if cert not in output:
                raise error.TestFail("Certificate %s not found" % cert)
    finally:
        client_session.close()
开发者ID:LeiCui,项目名称:virt-test,代码行数:59,代码来源:smartcard_setup.py
示例13: specify_fstab_entry
def specify_fstab_entry(type, **kwargs):
    """
    Specify entry in fstab file

    :param type: One of 'cdrom', 'uuid', 'label', 'virtio', 'sr0',
                 'invalid'; selects how /etc/fstab is modified. The name
                 shadows the builtin but is kept for keyword callers.
    :param kwargs: Must contain 'session', the guest session used to run
                   the commands.
    """
    type_list = ['cdrom', 'uuid', 'label', 'virtio', 'sr0', 'invalid']
    if type not in type_list:
        # NOTE(review): presumably test.error() raises and aborts here
        test.error('Not support %s in fstab' % type)
    session = kwargs['session']
    # Specify cdrom device
    if type == 'cdrom':
        line = '/dev/cdrom /media/CDROM auto exec'
        if 'grub2' in utils_misc.get_bootloader_cfg(session):
            line += ',nofail'
        line += ' 0 0'
        logging.debug('fstab entry is "%s"', line)
        commands = [
            'mkdir -p /media/CDROM',
            'mount /dev/cdrom /media/CDROM',
            'echo "%s" >> /etc/fstab' % line
        ]
        for command in commands:
            session.cmd(command)
    elif type == 'sr0':
        line = params.get('fstab_content')
        session.cmd('echo "%s" >> /etc/fstab' % line)
    elif type == 'invalid':
        # A random string serves as a deliberately malformed entry
        line = utils_misc.generate_random_string(6)
        session.cmd('echo "%s" >> /etc/fstab' % line)
    else:
        # Text grepped for in /etc/fstab, per type (local name no longer
        # shadows the builtin map)
        fstab_markers = {'uuid': 'UUID', 'label': 'LABEL', 'virtio': '/vd'}
        logging.info(type)
        # A non-zero grep status means fstab does not use this form yet
        if session.cmd_status('cat /etc/fstab|grep %s' % fstab_markers[type]):
            # Specify device by UUID
            if type == 'uuid':
                entry = session.cmd('blkid -s UUID|grep swap').strip().split()
                # Replace path for UUID
                origin = entry[0].strip(':')
                replace = entry[1].replace('"', '')
            # Specify virtio device
            elif type == 'virtio':
                entry = session.cmd('cat /etc/fstab|grep /boot').strip()
                # Get the ID (no matter what, usually UUID)
                origin = entry.split()[0]
                key = origin.split('=')[1]
                blkinfo = session.cmd('blkid|grep %s' % key).strip()
                # Replace with virtio disk path
                replace = blkinfo.split()[0].strip(':')
            # Specify device by label
            elif type == 'label':
                blk = make_label(session)
                entry = session.cmd('blkid|grep %s' % blk).strip()
                # Remove " from LABEL="****"
                replace = entry.split()[1].strip().replace('"', '')
                # Replace the original id/path with label
                origin = entry.split()[0].strip(':')
            cmd_fstab = "sed -i 's|%s|%s|' /etc/fstab" % (origin, replace)
            session.cmd(cmd_fstab)
    fstab = session.cmd_output('cat /etc/fstab')
    logging.debug('Content of /etc/fstab:\n%s', fstab)
开发者ID:lento-sun,项目名称:tp-libvirt,代码行数:59,代码来源:specific_kvm.py
示例14: test_full_set
def test_full_set(self):
    # Give every name in VirtIface.__all_slots__ a random value, build a
    # VirtIface from that mapping and pass the instance, the keys and an
    # expected-value lookup to loop_assert().
    props = {propertea: utils_misc.generate_random_string(16)
             for propertea in self.VirtIface.__all_slots__}

    def what_func(propertea):
        return props[propertea]

    virtiface = self.VirtIface(props)
    self.loop_assert(virtiface, props.keys(), what_func)
开发者ID:ChenFanFnst,项目名称:avocado-vt,代码行数:8,代码来源:test_utils_net.py
示例15: run
def run(test, params, env):
    """
    live_snapshot_base test:
    1). Boot up guest
    2). Create a file on host and record md5
    3). Copy the file to guest
    4). Create live snapshot
    5). Copy the file from guest,then check md5

    :param test: Kvm test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    """
    vm = env.get_vm(params["main_vm"])
    vm.verify_alive()
    login_timeout = int(params.get("login_timeout", 3600))
    session = vm.wait_for_login(timeout=login_timeout)

    # NOTE(review): the keys are spelled "dd_timeoout"/"copy_timeoout" -
    # confirm that this matches the test configuration.
    dd_timeout = params.get("dd_timeoout", 600)
    copy_timeout = params.get("copy_timeoout", 600)

    base_file = storage.get_image_filename(params, data_dir.get_data_dir())
    device = vm.get_block({"file": base_file})
    snapshot_file = utils_misc.get_path(
        data_dir.get_data_dir(), "images/%s" % params.get("snapshot_name"))
    snapshot_format = params.get("snapshot_format", "qcow2")

    tmp_name = utils_misc.generate_random_string(5)
    src = "/tmp/%s" % tmp_name
    if params.get("os_type") == "linux":
        dst = src
    else:
        dst = "c:\\users\\public\\%s" % tmp_name

    try:
        error_context.context("create file on host, copy it to guest",
                              logging.info)
        dd_cmd = params.get("dd_cmd") % src
        process.system(dd_cmd, timeout=dd_timeout, shell=True)
        md5 = crypto.hash_file(src, algorithm="md5")
        vm.copy_files_to(src, dst, timeout=copy_timeout)
        process.system("rm -f %s" % src)

        error_context.context("create live snapshot", logging.info)
        snapshot_device = vm.live_snapshot(base_file, snapshot_file,
                                           snapshot_format)
        if snapshot_device != device:
            test.fail("Fail to create snapshot")
        backing_file = vm.monitor.get_backingfile(device)
        if backing_file != base_file:
            logging.error(
                "backing file: %s, base file: %s", backing_file, base_file)
            test.fail("Got incorrect backing file")

        error_context.context("copy file to host, check content not changed",
                              logging.info)
        vm.copy_files_from(dst, src, timeout=copy_timeout)
        if md5 and crypto.hash_file(src, algorithm="md5") != md5:
            test.fail("diff md5 before/after create snapshot")
        session.cmd(params.get("alive_check_cmd", "dir"))
    finally:
        if session:
            session.close()
        process.system("rm -f %s %s" % (snapshot_file, src))
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:57,代码来源:live_snapshot_base.py
示例16: test_half_set
def test_half_set(self):
    # Give random values to only the first half (plus one) of the names in
    # VirtIface.__all_slots__, build a VirtIface from them and pass the
    # instance, the keys and an expected-value lookup to loop_assert().
    def what_func(propertea):
        return props[propertea]
    # Floor division keeps the slice bound an int; "/" yields a float on
    # Python 3 and slicing with a float raises TypeError
    half_prop_end = (len(self.VirtIface.__all_slots__) // 2) + 1
    props = {}
    for propertea in self.VirtIface.__all_slots__[0:half_prop_end]:
        props[propertea] = utils_misc.generate_random_string(16)
    virtiface = self.VirtIface(props)
    self.loop_assert(virtiface, props.keys(), what_func)
开发者ID:ChenFanFnst,项目名称:avocado-vt,代码行数:9,代码来源:test_utils_net.py
示例17: run
def run(test, params, env):
    """
    Base test for vnc, mainly focus on handshaking during vnc connection setup.

    This case check following point:
    1) VNC server support different rfb protocol version. Now it is 3.3, 3.7
       and 3.8.
    2) Connection could be setup with password enable.
    3) Change and __com.redhat_set_password monitor command could work.

    This case will do following step:
    1) Start VM with VNC password enable.
    2) Handshaking after vnc password set by change.
    3) Handshaking after vnc password set by __com.redhat_set_password.
    4) Handshaking again after vnc password timeout.

    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    """
    vm = env.get_vm(params["main_vm"])
    vm.verify_alive()
    port = vm.get_vnc_port()

    default_cmd = ("__com.redhat_set_password protocol=vnc,"
                   "password=%s,expiration=%s")
    change_passwd_cmd = params.get("change_passwd_cmd", default_cmd)

    for rfb_version in params.get("rfb_version").strip().split():
        error_context.base_context("Test with guest RFB version %s" %
                                   rfb_version)
        rng = random.SystemRandom()
        rng.seed()
        # Random password for generate_random_string(1..8), then a random
        # expiry between 10 and 100
        password = utils_misc.generate_random_string(rng.randint(1, 8))
        logging.info("Set VNC password to: %s", password)
        timeout = rng.randint(10, 100)
        logging.info("VNC password timeout is: %s", timeout)
        vm.monitor.send_args_cmd(change_passwd_cmd % (password, timeout))

        error_context.context("Connect to VNC server after setting password"
                              " to '%s'" % password)
        vnc = VNC(port=port, rfb_version=rfb_version)
        authenticated = vnc.hand_shake(password)
        vnc.initialize()
        vnc.close()
        if not authenticated:
            test.fail("VNC Authentication failed.")
        logging.info("VNC Authentication pass")

        logging.info("Waiting for vnc password timeout.")
        time.sleep(timeout + 5)

        error_context.context("Connect to VNC server after password expires")
        vnc = VNC(port=port, rfb_version=rfb_version)
        authenticated = vnc.hand_shake(password)
        vnc.close()
        if authenticated:
            # The handshake must not succeed with an expired password
            test.fail("VNC connected with Timeout password, The"
                      " cmd of setting expire time doesn't work.")
开发者ID:bssrikanth,项目名称:tp-qemu,代码行数:57,代码来源:vnc.py
示例18: run
def run(test, params, env):
    """
    KVM restore from file-test:
    1) Pause VM
    2) Save VM to file
    3) Restore VM from file, and measure the time it takes
    4) Remove VM restoration file
    5) Check VM

    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment
    :raise error.TestFail: if the restore takes longer than
                           "expect_restore_time" (default 25)
    """
    vm = env.get_vm(params["main_vm"])
    vm.verify_alive()
    timeout = int(params.get("login_timeout", 360))
    expect_time = int(params.get("expect_restore_time", 25))

    session = vm.wait_for_login(timeout=timeout)
    # Whichever session is current is closed on every exit path
    try:
        save_file = params.get("save_file", os.path.join("/tmp",
                               utils_misc.generate_random_string(8)))

        try:
            error.context("Pause VM", logging.info)
            vm.pause()

            error.context("Save VM to file", logging.info)
            vm.save_to_file(save_file)

            error.context("Restore VM from file", logging.info)
            time.sleep(10)
            utils_memory.drop_caches()
            vm.restore_from_file(save_file)
            # The first session is replaced here; close it once the new
            # login succeeded so it is not leaked
            previous_session = session
            session = vm.wait_for_login(timeout=timeout)
            previous_session.close()
            restore_time = utils_misc.monotonic_time() - vm.start_monotonic_time
            test.write_test_keyval({'result': "%ss" % restore_time})
            logging.info("Restore time: %ss", restore_time)

        finally:
            try:
                error.context("Remove VM restoration file", logging.info)
                os.remove(save_file)

                error.context("Check VM", logging.info)
                vm.verify_alive()
                vm.wait_for_login(timeout=timeout)
            except Exception:
                # Best effort: ask for the image to be restored afterwards
                logging.warning("Unable to restore VM, restoring from image")
                params["restore_image_after_testing"] = "yes"

        if restore_time > expect_time:
            raise error.TestFail(
                "Guest restoration took too long: %ss" % restore_time)
    finally:
        session.close()
开发者ID:CongLi,项目名称:tp-qemu,代码行数:56,代码来源:save_restore_vm.py
示例19: setup
def setup(self, test, params, env):
    """
    Create a uniquely named OpenVSwitch bridge, move the guests' interfaces
    onto it and collect the addresses of the host and guest machines.

    :param test: Test object; its tmpdir and srcdir are used
    :param params: Test parameters; "bridge_ip" and "vms" are read
    :param env: Test environment providing the VMs
    """
    # Pick a bridge name that is not among the existing interfaces
    while True:
        self.br0_name = "br0-%s" % (utils_misc.generate_random_string(3))
        if self.br0_name not in utils_misc.get_net_if():
            break
    self.br0_ip = params.get("bridge_ip", "192.168.250.1")

    self.ovs = None

    error.context("Try to log into guest.")
    self.vms = [env.get_vm(vm_name) for vm_name in params.get("vms").split()]
    for guest in self.vms:
        guest.verify_alive()

    error.context("Start OpenVSwitch.")
    self.ovs = openvswitch.OpenVSwitchSystem()
    self.ovs.init_system()
    self.ovs.check()

    error.context("Add new bridge %s." % (self.br0_name))
    self.ovs.add_br(self.br0_name)
    utils_misc.set_net_if_ip(self.br0_name, self.br0_ip)
    utils_misc.bring_up_ifname(self.br0_name)
    self.dns_pidf = utils_misc.check_add_dnsmasq_to_br(self.br0_name,
                                                       test.tmpdir)

    error.context("Add new ports from vms %s to bridge %s." %
                  (self.vms, self.br0_name))
    for guest in self.vms:
        utils_misc.change_iface_bridge(guest.virtnet[1],
                                       self.br0_name,
                                       self.ovs)

    logging.debug(self.ovs.status())
    self.host = ovs_utils.Machine(src=test.srcdir)
    self.mvms = [ovs_utils.Machine(guest) for guest in self.vms]
    self.machines = [self.host] + self.mvms

    # Fixed five second pause before the addresses are collected
    time.sleep(5)
    utils_misc.ForAllP(self.machines).fill_addrs()
开发者ID:LaneWolf,项目名称:virt-test,代码行数:41,代码来源:ovs_basic.py
示例20: nonexist_snapshot_file
def nonexist_snapshot_file(self):
    """
    Generate a non-existed path of snapshot file.

    :return: self.snapshot_file joined below a randomly named directory
             under data_dir.get_tmp_dir()
    :raise exceptions.TestFail: if that path already exists
    """
    error_context.context("Generate a non-existed path of"
                          " snapshot file", logging.info)
    random_name = utils_misc.generate_random_string(5)
    random_dir = os.path.join(data_dir.get_tmp_dir(), random_name)
    path = os.path.join(random_dir, self.snapshot_file)
    if os.path.exists(path):
        raise exceptions.TestFail("Path %s is existed." % path)
    return path
开发者ID:CongLi,项目名称:tp-qemu,代码行数:12,代码来源:live_snapshot_negative.py
注:本文中的virttest.utils_misc.generate_random_string函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论