本文整理汇总了Python中utils_misc.generate_random_string函数的典型用法代码示例。如果您正苦于以下问题:Python generate_random_string函数的具体用法?Python generate_random_string怎么用?Python generate_random_string使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了generate_random_string函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: file_exists
def file_exists(params, filename_path):
    """
    Check for a file on a gluster volume by mounting the volume temporarily.

    The volume is mounted on a randomly named directory created directly
    under /tmp, the requested path is probed there, and the mount point is
    torn down again before returning.

    :param params: Parameters handed to create_gluster_uri().
    :param filename_path: File path; its first len(<non-stripped gluster
                          uri>) characters are dropped to obtain the path
                          looked up below the mount point.
    :return: True if the path exists on the mounted volume, False otherwise
             (also when mounting raised an exception, which is only logged).
    """
    stripped_uri = create_gluster_uri(params, stripped=True)
    full_uri = create_gluster_uri(params, stripped=False)

    # Mount directly below /tmp: the directory is short-lived, removed as
    # soon as it is no longer needed, and never holds any data itself.
    while True:
        mount_dir = os.path.join(
            "/tmp", "gmount-%s" % (utils_misc.generate_random_string(6)))
        if not os.path.exists(mount_dir):
            break

    found = False
    try:
        os.mkdir(mount_dir)
        glusterfs_mount(stripped_uri, mount_dir)
        relative_path = filename_path[len(full_uri):]
        found = os.path.exists(os.path.join(mount_dir, relative_path))
    except Exception as details:
        logging.error("Failed to mount gluster volume %s to"
                      " mount dir %s: %s" % (stripped_uri, mount_dir, details))
    finally:
        unmounted = utils_misc.umount(stripped_uri, mount_dir, "glusterfs",
                                      False, "fuse.glusterfs")
        if not unmounted:
            logging.warning("Unable to unmount tmp directory %s with glusterfs"
                            " mount.", mount_dir)
        else:
            # Removing the mount point is best effort.
            try:
                os.rmdir(mount_dir)
            except OSError:
                pass
    return found
开发者ID:xigao,项目名称:virt-test,代码行数:34,代码来源:gluster.py
示例2: update_env
def update_env(env):
    """
    Keep writing random key/value pairs into env["changing_dict"].

    The "changing_dict" entry is created when missing. After every insertion
    termination_event is checked, so at least one pair is always written
    before the function returns.

    :param env: Environment mapping to modify; each write goes through a
                helper decorated with utils_env.lock_safe.
    """
    @utils_env.lock_safe
    def _update_env(env, name, data):
        env["changing_dict"][name] = data

    if "changing_dict" not in env:
        env["changing_dict"] = {}

    finished = False
    while not finished:
        rand_key = "%s" % utils_misc.generate_random_string(length=10)
        rand_value = "%s" % utils_misc.generate_random_string(length=10)
        _update_env(env, rand_key, rand_value)
        finished = termination_event.isSet()
开发者ID:MiriamDeng,项目名称:virt-test,代码行数:13,代码来源:utils_env_unittest.py
示例3: __init__
def __init__(
    self,
    test,
    params,
    image_name,
    blkdebug_cfg="",
    prompt=r"qemu-io>\s*$",
    log_filename=None,
    io_options="",
    log_func=None,
):
    """
    Store the settings needed to run qemu-io against an image.

    :param test: Test object; its bindir is used to resolve the binary path.
    :param params: Parameters; "qemu_io_binary" names the binary
                   (default "qemu-io").
    :param image_name: Stored as self.image_name.
    :param blkdebug_cfg: Stored as self.blkdebug_cfg.
    :param prompt: Regular expression stored as self.prompt.
    :param log_filename: When given, a "-<4 random chars>" suffix is appended
                         and output is routed to utils_misc.log_line with
                         that name; otherwise no output function is set.
    :param io_options: Stored as self.io_options.
    :param log_func: Stored as self.log_func.
    """
    self.type = ""
    if log_filename:
        # The random suffix keeps log names of separate instances apart.
        log_filename += "-" + utils_misc.generate_random_string(4)
        self.output_func = utils_misc.log_line
        self.output_params = (log_filename,)
    else:
        self.output_func = None
        self.output_params = ()
    self.output_prefix = ""
    self.prompt = prompt
    # Assigned once; the original set this attribute twice to the same value.
    self.blkdebug_cfg = blkdebug_cfg
    self.qemu_io_cmd = utils_misc.get_path(
        test.bindir, params.get("qemu_io_binary", "qemu-io"))
    self.io_options = io_options
    self.run_command = False
    self.image_name = image_name
    self.log_func = log_func
开发者ID:iwamatsu,项目名称:autotest,代码行数:29,代码来源:qemu_io.py
示例4: login
def login(self, nic_index=0, timeout=LOGIN_TIMEOUT,
          username=None, password=None):
    """
    Log into the guest via SSH/Telnet/Netcat.

    If timeout expires while waiting for output from the guest (e.g. a
    password prompt or a shell prompt) -- fail.

    @param nic_index: The index of the NIC to connect to.
    @param timeout: Time (seconds) before giving up logging into the
            guest.
    @param username: Login name; when empty or None the "username" entry of
            self.params is used (default "").
    @param password: Login password; when empty or None the "password" entry
            of self.params is used (default "").
    @return: A ShellSession object.
    """
    error.context("logging into '%s'" % self.name)
    if not username:
        username = self.params.get("username", "")
    if not password:
        password = self.params.get("password", "")
    # Raw string: identical pattern text, without depending on Python leaving
    # the unrecognised escapes \# and \$ untouched (deprecated behaviour).
    prompt = self.params.get("shell_prompt", r"[\#\$]")
    # NOTE(review): eval() converts the configured separator (by default the
    # two characters backslash + n) into the actual control character. The
    # value comes from the test parameters; it must never be untrusted input.
    linesep = eval("'%s'" % self.params.get("shell_linesep", r"\n"))
    client = self.params.get("shell_client")
    address = self.get_address(nic_index)
    port = self.get_port(int(self.params.get("shell_port")))
    log_filename = ("session-%s-%s.log" %
                    (self.name, utils_misc.generate_random_string(4)))
    session = remote.remote_login(client, address, port, username,
                                  password, prompt, linesep,
                                  log_filename, timeout)
    session.set_status_test_command(self.params.get("status_test_command",
                                                    ""))
    # Remember the session on the instance.
    self.remote_sessions.append(session)
    return session
开发者ID:QiuMike,项目名称:virt-test,代码行数:31,代码来源:virt_vm.py
示例5: copy_files_from
def copy_files_from(self, guest_path, host_path, nic_index=0, limit="",
                    verbose=False, timeout=COPY_FILES_TIMEOUT,
                    username=None, password=None):
    """
    Transfer files from the guest.

    @param guest_path: Guest path
    @param host_path: Host path
    @param nic_index: The index of the NIC to connect to.
    @param limit: Speed limit of file transfer.
    @param verbose: If True, log some stats using logging.debug (RSS only)
    @param timeout: Time (seconds) before giving up on doing the remote
            copy.
    @param username: User name for the transfer; when empty or None the
            "username" entry of self.params is used (default "").
    @param password: Password for the transfer; when empty or None the
            "password" entry of self.params is used (default "").
    """
    error.context("receiving file(s) from '%s'" % self.name)
    if not username:
        username = self.params.get("username", "")
    if not password:
        password = self.params.get("password", "")
    client = self.params.get("file_transfer_client")
    address = self.get_address(nic_index)
    port = self.get_port(int(self.params.get("file_transfer_port")))
    log_filename = ("transfer-%s-from-%s-%s.log" %
                    (self.name, address,
                     utils_misc.generate_random_string(4)))
    try:
        remote.copy_files_from(address, client, username, password, port,
                               guest_path, host_path, limit, log_filename,
                               verbose, timeout)
    finally:
        # Close the transfer log even when the copy raises, so a failed
        # transfer does not leave the log file open.
        # NOTE(review): assumes close_log_file() tolerates a log that was
        # never opened -- confirm against utils_misc.
        utils_misc.close_log_file(log_filename)
开发者ID:QiuMike,项目名称:virt-test,代码行数:29,代码来源:virt_vm.py
示例6: get_backup_set
def get_backup_set(filename, backup_dir, action, good):
    """
    Get all sources and destinations required for each backup.

    :param filename: Path of the file to back up or restore.
    :param backup_dir: Directory holding the backups; created when missing.
    :param action: 'backup' or 'restore'.
    :param good: Truthy when the file is considered good. A good file is
                 copied to/from "<basename>.backup". Otherwise a backup
                 stores both the current (bad) file and the existing good
                 backup under names sharing one random suffix, while a
                 restore copies "<basename>.backup" over the file.
    :return: List of [source, destination] pairs. Empty for any other
             action, in which case an error is logged.
    :raise OSError: If backup_dir cannot be created.
    """
    # EAFP instead of "check, then create": another process may create the
    # directory between the check and makedirs().
    try:
        os.makedirs(backup_dir)
    except OSError:
        if not os.path.isdir(backup_dir):
            raise

    basename = os.path.basename(filename)
    good_backup = os.path.join(backup_dir, "%s.backup" % basename)
    bkp_set = []
    if good:
        if action == 'backup':
            bkp_set = [[filename, good_backup]]
        elif action == 'restore':
            bkp_set = [[good_backup, filename]]
    elif action == 'backup':
        # We have to make 2 backups, one of the bad image, another one
        # of the good image; both share the same random suffix.
        hsh = utils_misc.generate_random_string(4)
        dst_bad = os.path.join(backup_dir, "%s.bad.%s" % (basename, hsh))
        dst_good = os.path.join(backup_dir, "%s.good.%s" % (basename, hsh))
        bkp_set = [[filename, dst_bad], [good_backup, dst_good]]
    elif action == 'restore':
        bkp_set = [[good_backup, filename]]

    if not bkp_set:
        logging.error("No backup sets for action: %s, state: %s",
                      action, good)
    return bkp_set
开发者ID:Guannan-Ren,项目名称:virt-test,代码行数:35,代码来源:storage.py
示例7: test_full_set
def test_full_set(self):
    """
    Create a VirtIface from one random value per slot and pass it, the slot
    names and a value lookup to loop_assert().
    """
    def lookup(propertea):
        return props[propertea]

    props = dict((slot, utils_misc.generate_random_string(16))
                 for slot in self.VirtIface.__slots__)
    virtiface = self.VirtIface(props)
    self.loop_assert(virtiface, props.keys(), lookup)
开发者ID:HeidCloud,项目名称:virt-test,代码行数:7,代码来源:utils_net_unittest.py
示例8: copy_files_to
def copy_files_to(self, host_path, guest_path, nic_index=0, limit="",
                  verbose=False, timeout=COPY_FILES_TIMEOUT,
                  username=None, password=None):
    """
    Transfer files to the remote host(guest).

    :param host_path: Host path
    :param guest_path: Guest path
    :param nic_index: The index of the NIC to connect to.
    :param limit: Speed limit of file transfer.
    :param verbose: If True, log some stats using logging.debug (RSS only)
    :param timeout: Time (seconds) before giving up on doing the remote
            copy.
    :param username: User name for the transfer; when empty or None the
            "username" entry of self.params is used (default "").
    :param password: Password for the transfer; when empty or None the
            "password" entry of self.params is used (default "").
    """
    error.context("sending file(s) to '%s'" % self.name)
    if not username:
        username = self.params.get("username", "")
    if not password:
        password = self.params.get("password", "")
    client = self.params.get("file_transfer_client")
    address = self.get_address(nic_index)
    neigh_attach_if = ""
    # Addresses starting with "fe80" additionally get the interface looked
    # up through utils_net and passed along to the copy.
    if address.lower().startswith("fe80"):
        neigh_attach_if = utils_net.get_neigh_attch_interface(address)
    port = self.get_port(int(self.params.get("file_transfer_port")))
    log_filename = ("transfer-%s-to-%s-%s.log" %
                    (self.name, address,
                     utils_misc.generate_random_string(4)))
    try:
        remote.copy_files_to(address, client, username, password, port,
                             host_path, guest_path, limit, log_filename,
                             verbose, timeout, interface=neigh_attach_if)
    finally:
        # Close the transfer log even when the copy raises, so a failed
        # transfer does not leave the log file open.
        # NOTE(review): assumes close_log_file() tolerates a log that was
        # never opened -- confirm against utils_misc.
        utils_misc.close_log_file(log_filename)
开发者ID:cheliu,项目名称:virt-test,代码行数:32,代码来源:virt_vm.py
示例9: _generate_unique_id
def _generate_unique_id(self):
    """
    Generate a unique identifier for this VM

    Candidates have the form <YYYYmmdd-HHMMSS>-<8 random characters> and are
    stored in self.instance; a candidate is kept once no path under /tmp
    ends with it.
    """
    unused = False
    while not unused:
        stamp = time.strftime("%Y%m%d-%H%M%S-")
        self.instance = stamp + utils_misc.generate_random_string(8)
        clashes = glob.glob("/tmp/*%s" % self.instance)
        unused = not clashes
开发者ID:rbian,项目名称:virt-test,代码行数:8,代码来源:virt_vm.py
示例10: __init__
def __init__(self, vm_name, params):
    """
    Record name and parameters and derive an instance identifier.

    :param vm_name: Stored as self.name.
    :param params: Stored as self.params; its 'vm_type' and 'driver_type'
                   entries are copied to attributes of the same name.
    """
    self.name = vm_name
    self.params = params
    for key in ('vm_type', 'driver_type'):
        setattr(self, key, self.params.get(key))
    # Identifier: <timestamp>-<16 random characters>
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    random_tag = utils_misc.generate_random_string(16)
    self.instance = "%s-%s" % (timestamp, random_tag)
开发者ID:Andrei-Stepanov,项目名称:virt-test,代码行数:8,代码来源:utils_net_unittest.py
示例11: test_full_set
def test_full_set(self):
    """
    Create a VirtIface from one random value per entry of __all_slots__ and
    pass it, the property names and a value lookup to loop_assert().
    """
    props = {}
    for slot_name in self.VirtIface.__all_slots__:
        props[slot_name] = utils_misc.generate_random_string(16)

    def expected_value(propertea):
        return props[propertea]

    virtiface = self.VirtIface(props)
    self.loop_assert(virtiface, props.keys(), expected_value)
开发者ID:Andrei-Stepanov,项目名称:virt-test,代码行数:8,代码来源:utils_net_unittest.py
示例12: test_half_set
def test_half_set(self):
    """
    Create a VirtIface from random values for only the leading
    (len(__slots__) // 2) + 1 slots and pass it, those slot names and a
    value lookup to loop_assert().
    """
    def what_func(propertea):
        return props[propertea]

    # Floor division keeps the slice bound an integer on every Python
    # version; plain "/" yields a float under true division, which cannot
    # be used as a slice index.
    half_prop_end = (len(self.VirtIface.__slots__) // 2) + 1
    props = {}
    for propertea in self.VirtIface.__slots__[0:half_prop_end]:
        props[propertea] = utils_misc.generate_random_string(16)
    virtiface = self.VirtIface(props)
    self.loop_assert(virtiface, props.keys(), what_func)
开发者ID:HeidCloud,项目名称:virt-test,代码行数:8,代码来源:utils_net_unittest.py
示例13: cmd
def cmd(self, cmd, args=None, timeout=CMD_TIMEOUT, debug=True, fd=None):
    """
    Send a QMP monitor command and return the response.

    Note: an id is automatically assigned to the command and the response
    is checked for the presence of the same id.

    @param cmd: Command to send
    @param args: A dict containing command arguments, or None
    @param timeout: Time duration to wait for response
    @param debug: Whether to print the commands being sent and responses
    @param fd: file object or file descriptor to pass
    @return: The response received (the value of its "return" key); None
            if the response carries neither "return" nor "error"
    @raise MonitorLockError: Raised if the lock cannot be acquired
    @raise MonitorSocketError: Raised if a socket error occurs
    @raise MonitorProtocolError: Raised if no response is received
    @raise QMPCmdError: Raised if the response is an error message
            (the exception's args are (cmd, args, data)
             where data is the error data)
    """
    self._log_command(cmd, debug)
    if not self._acquire_lock():
        raise MonitorLockError("Could not acquire exclusive lock to send "
                               "QMP command '%s'" % cmd)
    # From here on the lock is held; the finally clause releases it on every
    # exit path (return or exception).
    try:
        # Read any data that might be available
        self._read_objects()
        # Send command
        # A random id tags the command so that the response can be matched.
        q_id = utils_misc.generate_random_string(8)
        cmdobj = self._build_cmd(cmd, args, q_id)
        if fd is not None:
            # The passfd module is imported on first use and then kept.
            if self._passfd is None:
                self._passfd = passfd_setup.import_passfd()
            # If command includes a file descriptor, use passfd module
            self._passfd.sendfd(self._socket, fd, json.dumps(cmdobj) + "\n")
        else:
            self._send(json.dumps(cmdobj) + "\n")
        # Read response
        r = self._get_response(q_id, timeout)
        if r is None:
            raise MonitorProtocolError("Received no response to QMP "
                                       "command '%s', or received a "
                                       "response with an incorrect id"
                                       % cmd)
        if "return" in r:
            ret = r["return"]
            # Only non-empty results are logged; the value is returned
            # either way.
            if ret:
                self._log_response(cmd, ret, debug)
            return ret
        if "error" in r:
            raise QMPCmdError(cmd, args, r["error"])
    finally:
        self._lock.release()
开发者ID:bonzini,项目名称:virt-test,代码行数:57,代码来源:qemu_monitor.py
示例14: test_apendex_set
def test_apendex_set(self):
    """
    Verify container ignores unknown key names
    """
    def lookup(propertea):
        return props[propertea]

    props = dict((slot, utils_misc.generate_random_string(16))
                 for slot in self.VirtIface.__slots__)

    # Sixteen additional pairs with random keys
    more_props = {}
    for _ in range(16):
        extra_value = utils_misc.generate_random_string(16)
        more_props[utils_misc.generate_random_string(16)] = extra_value

    # Keep separated for testing
    apendex_set = dict(props)
    apendex_set.update(more_props)
    virtiface = self.VirtIface(apendex_set)
    # str(props) guarantees apendex set wasn't incorporated
    self.loop_assert(virtiface, props.keys(), lookup, str(props))
开发者ID:tjamrisk,项目名称:autotest,代码行数:19,代码来源:utils_misc_unittest.py
示例15: get_backup_name
def get_backup_name(filename, backup_dir, good):
    """
    Build the path of the backup file for filename inside backup_dir.

    :param filename: Path of the file being backed up; only its basename is
                     used.
    :param backup_dir: Directory holding the backups; created when missing.
    :param good: Truthy selects "<basename>.backup"; otherwise the name is
                 "<basename>.bad.<4 random chars>".
    :return: Full path of the backup file.
    :raise OSError: If backup_dir cannot be created.
    """
    # EAFP instead of "check, then create": another process may create the
    # directory between the check and makedirs().
    try:
        os.makedirs(backup_dir)
    except OSError:
        if not os.path.isdir(backup_dir):
            raise

    basename = os.path.basename(filename)
    if good:
        backup_filename = "%s.backup" % basename
    else:
        backup_filename = ("%s.bad.%s" %
                           (basename,
                            utils_misc.generate_random_string(4)))
    return os.path.join(backup_dir, backup_filename)
开发者ID:iwamatsu,项目名称:autotest,代码行数:11,代码来源:storage.py
示例16: test_apendex_set
def test_apendex_set(self):
    """
    Verify container ignores unknown key names
    """
    props = {}
    for slot_name in self.VirtIface.__all_slots__:
        props[slot_name] = utils_misc.generate_random_string(16)

    def expected_value(propertea):
        return props[propertea]

    # Sixteen additional pairs with random keys
    more_props = {}
    for _ in range(16):
        extra_key = utils_misc.generate_random_string(16)
        more_props[extra_key] = utils_misc.generate_random_string(16)

    # The slot values and the extras are kept apart; only their union is
    # handed to the container.
    apendex_set = {}
    for part in (props, more_props):
        apendex_set.update(part)
    virtiface = self.VirtIface(apendex_set)
    self.loop_assert(virtiface, props.keys(), expected_value)
开发者ID:Andrei-Stepanov,项目名称:virt-test,代码行数:21,代码来源:utils_net_unittest.py
示例17: login
def login(self, nic_index=0, timeout=LOGIN_TIMEOUT,
          username=None, password=None):
    """
    Log into the guest via SSH/Telnet/Netcat.

    If timeout expires while waiting for output from the guest (e.g. a
    password prompt or a shell prompt) -- fail.

    :param nic_index: The index of the NIC to connect to.
    :param timeout: Time (seconds) before giving up logging into the
            guest.
    :param username: Login name; when empty or None the "username" entry of
            self.params is used (default "").
    :param password: Login password; when empty or None the "password" entry
            of self.params is used (default "").
    :return: A ShellSession object.
    """
    error.context("logging into '%s'" % self.name)
    if not username:
        username = self.params.get("username", "")
    if not password:
        password = self.params.get("password", "")
    # Raw string: identical pattern text, without depending on Python leaving
    # the unrecognised escapes \# and \$ untouched (deprecated behaviour).
    prompt = self.params.get("shell_prompt", r"[\#\$]")
    # NOTE(review): eval() converts the configured separator (by default the
    # two characters backslash + n) into the actual control character. The
    # value comes from the test parameters; it must never be untrusted input.
    linesep = eval("'%s'" % self.params.get("shell_linesep", r"\n"))
    client = self.params.get("shell_client")
    ip_version = self.params.get("ip_version", "ipv4").lower()
    neigh_attach_if = ""
    # The address lookup uses its own fixed 360 second timeout, independent
    # of the timeout argument.
    address = self.wait_for_get_address(nic_index, timeout=360,
                                        ip_version=ip_version)
    # Addresses starting with "fe80" additionally get the interface looked
    # up through utils_net and passed along to the login.
    if address and address.lower().startswith("fe80"):
        neigh_attach_if = utils_net.get_neigh_attch_interface(address)
    port = self.get_port(int(self.params.get("shell_port")))
    log_filename = ("session-%s-%s.log" %
                    (self.name, utils_misc.generate_random_string(4)))
    session = remote.remote_login(client, address, port, username,
                                  password, prompt, linesep,
                                  log_filename, timeout,
                                  interface=neigh_attach_if)
    session.set_status_test_command(self.params.get("status_test_command",
                                                    ""))
    # Remember the session on the instance.
    self.remote_sessions.append(session)
    return session
开发者ID:aginies,项目名称:virt-test,代码行数:37,代码来源:virt_vm.py
示例18: _take_screendumps
def _take_screendumps(test, params, env):
    """
    Repeatedly take a screendump of every live VM and store it as a JPEG.

    Each VM gets its own "screendumps_<vm name>" directory below
    test.debugdir, holding sequentially numbered files ("0001.jpg", ...).

    NOTE(review): in the lines visible here the outer loop has no exit and
    no pause, and neither `delay` nor the declared global is used; the
    function presumably continues beyond this excerpt -- confirm against
    the full source.

    @param test: Test object; debugdir and bindir are read from it.
    @param params: Parameters; "screendump_temp_dir", "screendump_delay"
            (default 5) and "screendump_quality" (default 30) are read.
    @param env: Environment object providing get_all_vms().
    """
    global _screendump_thread_termination_event
    temp_dir = test.debugdir
    if params.get("screendump_temp_dir"):
        temp_dir = utils_misc.get_path(test.bindir,
                                       params.get("screendump_temp_dir"))
        # Best effort: an OSError from makedirs is ignored.
        try:
            os.makedirs(temp_dir)
        except OSError:
            pass
    # One temporary PPM file name is reused for every screendump.
    temp_filename = os.path.join(temp_dir, "scrdump-%s.ppm" %
                                 utils_misc.generate_random_string(6))
    delay = float(params.get("screendump_delay", 5))
    quality = int(params.get("screendump_quality", 30))
    # cache: hash of a PPM file -> JPEG already written for that content
    cache = {}
    # counter: vm -> number of the latest screendump file of that VM
    counter = {}
    while True:
        for vm in env.get_all_vms():
            if vm not in counter.keys():
                counter[vm] = 0
            if not vm.is_alive():
                continue
            try:
                vm.screendump(filename=temp_filename, debug=False)
            except kvm_monitor.MonitorError, e:
                logging.warn(e)
                continue
            except AttributeError, e:
                logging.warn(e)
                continue
            if not os.path.exists(temp_filename):
                logging.warn("VM '%s' failed to produce a screendump", vm.name)
                continue
            if not ppm_utils.image_verify_ppm_file(temp_filename):
                logging.warn("VM '%s' produced an invalid screendump", vm.name)
                os.unlink(temp_filename)
                continue
            screendump_dir = os.path.join(test.debugdir,
                                          "screendumps_%s" % vm.name)
            # Best effort: an OSError from makedirs is ignored.
            try:
                os.makedirs(screendump_dir)
            except OSError:
                pass
            counter[vm] += 1
            screendump_filename = os.path.join(screendump_dir, "%04d.jpg" %
                                               counter[vm])
            hsh = utils.hash_file(temp_filename)
            if hsh in cache:
                # Same content as an earlier dump: hard-link the existing
                # JPEG instead of converting again.
                try:
                    os.link(cache[hsh], screendump_filename)
                except OSError:
                    pass
            else:
                try:
                    try:
                        image = PIL.Image.open(temp_filename)
                        image.save(screendump_filename, format="JPEG",
                                   quality=quality)
                        cache[hsh] = screendump_filename
                    except IOError, error_detail:
                        logging.warning("VM '%s' failed to produce a "
                                        "screendump: %s", vm.name, error_detail)
                        # Decrement the counter as we in fact failed to
                        # produce a converted screendump
                        counter[vm] -= 1
                # presumably covers PIL not being importable in this
                # module -- TODO confirm
                except NameError:
                    pass
            os.unlink(temp_filename)
开发者ID:ddefolo,项目名称:autotest,代码行数:70,代码来源:env_process.py
示例19: __init__
def __init__(self, command=None, id=None, auto_close=False, echo=False, linesep="\n"):
    """
    Initialize the class and run command as a child process.

    @param command: Command to run, or None if accessing an already running
            server.
    @param id: ID of an already running server, if accessing a running
            server, or None if starting a new one.
    @param auto_close: If True, close() the instance automatically when its
            reference count drops to zero (default False).
    @param echo: Boolean indicating whether echo should be initially
            enabled for the pseudo terminal running the subprocess.  This
            parameter has an effect only when starting a new server.
    @param linesep: Line separator to be appended to strings sent to the
            child process by sendline().
    """
    # A random 8 character id is generated when none is given.
    self.id = id or utils_misc.generate_random_string(8)
    # Define filenames for communication with server
    base_dir = "/tmp/kvm_spawn"
    # Best effort: any failure creating the directory is ignored.
    try:
        os.makedirs(base_dir)
    except Exception:
        pass
    (
        self.shell_pid_filename,
        self.status_filename,
        self.output_filename,
        self.inpipe_filename,
        self.lock_server_running_filename,
        self.lock_client_starting_filename,
    ) = _get_filenames(base_dir, self.id)
    # Remember some attributes
    self.auto_close = auto_close
    self.echo = echo
    self.linesep = linesep
    # Make sure the 'readers' and 'close_hooks' attributes exist
    # (values already present on the instance are kept as they are)
    if not hasattr(self, "readers"):
        self.readers = []
    if not hasattr(self, "close_hooks"):
        self.close_hooks = []
    # Define the reader filenames
    self.reader_filenames = dict(
        (reader, _get_reader_filename(base_dir, self.id, reader)) for reader in self.readers
    )
    # Let the server know a client intends to open some pipes;
    # if the executed command terminates quickly, the server will wait for
    # the client to release the lock before exiting
    lock_client_starting = _lock(self.lock_client_starting_filename)
    # Start the server (which runs the command)
    if command:
        # The server is this same module, executed as a script by the
        # current interpreter.
        sub = subprocess.Popen(
            "%s %s" % (sys.executable, __file__),
            shell=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        # Send parameters to the server
        # (one value per line: id, echo flag, comma-joined readers, command)
        sub.stdin.write("%s\n" % self.id)
        sub.stdin.write("%s\n" % echo)
        sub.stdin.write("%s\n" % ",".join(self.readers))
        sub.stdin.write("%s\n" % command)
        # Wait for the server to complete its initialization
        # (lines are read and discarded until one contains the ready message)
        while not "Server %s ready" % self.id in sub.stdout.readline():
            pass
    # Open the reading pipes
    self.reader_fds = {}
    # Any failure here, including the assertion that the server holds its
    # running lock, is ignored; reader_fds then keeps only the descriptors
    # opened so far.
    try:
        assert _locked(self.lock_server_running_filename)
        for reader, filename in self.reader_filenames.items():
            self.reader_fds[reader] = os.open(filename, os.O_RDONLY)
    except Exception:
        pass
    # Allow the server to continue
    _unlock(lock_client_starting)
开发者ID:vliaskov,项目名称:virt-test,代码行数:83,代码来源:aexpect.py
示例20: __init__
def __init__(self, address="", port=123, tmpdir=None):
    """
    Set up an instance identifier and remember the port.

    :param address: Accepted but not used here.
    :param port: Stored as self.port.
    :param tmpdir: Accepted but not used here.
    """
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    random_tag = utils_misc.generate_random_string(16)
    # Identifier: <timestamp>-<16 random characters>
    self.instance = "%s-%s" % (timestamp, random_tag)
    self.port = port
开发者ID:MiriamDeng,项目名称:virt-test,代码行数:3,代码来源:utils_env_unittest.py
注:本文中的utils_misc.generate_random_string函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论