def postprocess_vm(test, params, env, name):
    """
    Postprocess a single VM object according to the instructions in params.

    All remote sessions registered on the VM are closed, and the VM is
    destroyed when params asks for it ("kill_vm" set to "yes").

    :param test: An Autotest test object (not used by this function).
    :param params: A dict containing VM postprocessing parameters. The keys
            read here are "kill_vm", "kill_vm_timeout" and
            "kill_vm_gracefully".
    :param env: The environment (a dict-like object).
    :param name: The name of the VM object.
    """
    vm = env.get_vm(name)
    if not vm:
        return

    # Walk a snapshot of the list, since entries are removed from the live
    # list while we go.
    for session in list(vm.remote_sessions):
        try:
            session.close()
            vm.remote_sessions.remove(session)
        except Exception:
            # Best effort: a session that fails to close stays registered.
            continue

    if params.get("kill_vm") != "yes":
        return

    # Optionally give the VM a chance to go down by itself first.
    grace_period = float(params.get("kill_vm_timeout", 0))
    if grace_period:
        utils_misc.wait_for(vm.is_dead, grace_period, 0, 1)
    graceful = params.get("kill_vm_gracefully") == "yes"
    vm.destroy(gracefully=graceful)
def reboot(self, session=None, method="shell", nic_index=0, timeout=240):
    """
    Reboot the VM, wait for the guest to go down, then log in again.

    @param session: A shell session object or None. When None (or falsy), a
            new session is opened with self.login().
    @param method: Reboot method. Only "shell" is supported (the
            "reboot_command" param is sent over the session).
    @param nic_index: Index of NIC to access in the VM, when logging in
            after rebooting.
    @param timeout: Time to wait for login to succeed (after rebooting).
    @return: A new shell session object.
    @raise virt_vm.VMRebootError: If the method is unknown or the guest
            keeps responding after the reboot command was sent.
    """
    error.base_context("rebooting '%s'" % self.name, logging.info)
    error.context("before reboot")
    if not session:
        session = self.login()
    error.context()

    if method != "shell":
        raise virt_vm.VMRebootError("Unknown reboot method: %s" % method)
    session.sendline(self.params.get("reboot_command"))

    error.context("waiting for guest to go down", logging.info)

    def _guest_stopped_responding():
        return not session.is_responsive(timeout=30)

    went_down = utils_misc.wait_for(_guest_stopped_responding, 120, 0, 1)
    if not went_down:
        raise virt_vm.VMRebootError("Guest refuses to go down")
    # The old session is useless once the guest has gone down.
    session.close()

    error.context("logging in after reboot", logging.info)
    return self.wait_for_login(nic_index, timeout=timeout)
def destroy(self, gracefully=True, free_mac_addresses=True):
    """
    Destroy the VM.

    If the VM is alive, gracefully is True and a "shutdown_command" param is
    set, first log in and send that command, then wait for the VM to die.
    If the VM goes down that way the method returns (after the cleanup
    described below). In every other case (login failure, VM still up,
    graceful shutdown not requested, VM not alive) virsh.destroy() is
    called on the domain.

    Whatever happens, the serial console session is closed and the test
    log, the serial console log and the migration file (if the attribute
    exists) are unlinked; missing files are ignored.

    @param gracefully: If True, an attempt will be made to end the VM
            using a shell command before destroying it through virsh.
    @param free_mac_addresses: If True, release the MAC addresses of all
            NICs, unless the VM is persistent (then only a warning is
            logged).
    """
    try:
        # Is it already dead?
        if self.is_alive():
            logging.debug("Destroying VM")
            if gracefully and self.params.get("shutdown_command"):
                # Try to destroy with shell command
                logging.debug("Trying to shutdown VM with shell command")
                try:
                    session = self.login()
                except (remote.LoginError, virt_vm.VMError) as e:
                    # Could not log in: fall through to virsh.destroy()
                    logging.debug(e)
                else:
                    try:
                        # Send the shutdown command
                        session.sendline(self.params.get("shutdown_command"))
                        logging.debug("Shutdown command sent; waiting for VM "
                                      "to go down...")
                        if utils_misc.wait_for(self.is_dead, 60, 1, 1):
                            logging.debug("VM is down")
                            return
                    finally:
                        session.close()
        virsh.destroy(self.name, uri=self.connect_uri)
    finally:
        if self.serial_console:
            self.serial_console.close()
        for f in ([self.get_testlog_filename(),
                   self.get_serial_console_filename()]):
            try:
                os.unlink(f)
            except OSError:
                # The file may never have been created
                pass
        if hasattr(self, "migration_file"):
            try:
                os.unlink(self.migration_file)
            except OSError:
                pass

    # NOTE(review): this section is not reached on the graceful-shutdown
    # early return above -- confirm that skipping the MAC release in that
    # case is intended.
    if free_mac_addresses:
        if self.is_persistent():
            logging.warning("Requested MAC address release from "
                            "persistent vm %s. Ignoring." % self.name)
        else:
            logging.debug("Releasing MAC addresses for vm %s." % self.name)
            for nic_name in self.virtnet.nic_name_list():
                self.virtnet.free_mac_address(nic_name)
def start(self):
    """
    Starts this VM.

    Before starting, each NIC in self.virtnet is compared with the MAC
    address reported by get_virsh_mac_address(): a NIC without a MAC adopts
    that one, a differing MAC only triggers a warning. The domain UUID is
    read before the start and again once the domain is alive.

    @raise virt_vm.VMStartError: If virsh fails to start the domain or the
            domain is not alive after the wait.
    """
    self.uuid = virsh.domuuid(self.name, uri=self.connect_uri)

    # Pull in mac addresses from libvirt guest definition
    for nic_index, nic in enumerate(self.virtnet):
        try:
            defined_mac = self.get_virsh_mac_address(nic_index)
            if nic.has_key("mac"):
                if nic.mac != defined_mac:
                    logging.warning(
                        "Requested mac %s doesn't match mac %s as defined for vm %s"
                        % (nic.mac, defined_mac, self.name)
                    )
            else:
                logging.debug(
                    "Updating nic %d with mac %s on vm %s"
                    % (nic_index, defined_mac, self.name)
                )
                nic.mac = defined_mac
            # TODO: Checkout/Set nic_model, nettype, netdst also
        except virt_vm.VMMACAddressMissingError:
            logging.warning(
                "Nic %d requested by test but not defined for vm %s"
                % (nic_index, self.name)
            )

    if not virsh.start(self.name, uri=self.connect_uri):
        raise virt_vm.VMStartError(self.name, "libvirt domain failed to start")

    # Wait for the domain to be created
    wait_text = "waiting for domain %s to start" % self.name
    has_started = utils_misc.wait_for(func=self.is_alive, timeout=60, text=wait_text)
    if has_started is None:
        raise virt_vm.VMStartError(self.name, "libvirt domain not active after start")
    self.uuid = virsh.domuuid(self.name, uri=self.connect_uri)
def read_until_output_matches(
    self, patterns, filter=lambda x: x, timeout=60, internal_timeout=None, print_func=None
):
    """
    Keep reading from the child with read_nonblocking until the accumulated
    output matches one of the patterns, or until timeout expires. The
    accumulated output is passed through the filter function before each
    matching attempt; the unfiltered output is what gets returned.

    @brief: Read from child using read_nonblocking until a pattern
            matches.
    @param patterns: List of strings (regular expression patterns)
    @param filter: Function to apply to the data read from the child before
            attempting to match it against the patterns (should take and
            return a string)
    @param timeout: The duration (in seconds) to wait until a match is
            found
    @param internal_timeout: The timeout to pass to read_nonblocking
    @param print_func: A function to be used to print the data being read
            (should take a string parameter); called once per line
    @return: Tuple containing the match index and the data read so far
    @raise ExpectTimeoutError: Raised if timeout expires
    @raise ExpectProcessTerminatedError: Raised if the child process
            terminates while waiting for output
    @raise ExpectError: Raised if an unknown error occurs
    """
    fd = self._get_fd("expect")
    output = ""
    deadline = time.time() + timeout

    while True:
        try:
            readable = select.select([fd], [], [], max(0, deadline - time.time()))[0]
        except (select.error, TypeError):
            # The descriptor is no longer usable; decide below why.
            break
        if not readable:
            raise ExpectTimeoutError(patterns, output)

        # Read data from child
        chunk = self.read_nonblocking(internal_timeout, deadline - time.time())
        if not chunk:
            break

        # Print it if necessary
        if print_func:
            for line in chunk.splitlines():
                print_func(line)

        # Look for patterns
        output += chunk
        match = self.match_patterns(filter(output), patterns)
        if match is not None:
            return match, output

    # The loop ended without a match: check if the child has terminated
    child_exited = utils_misc.wait_for(lambda: not self.is_alive(), 5, 0, 0.1)
    if child_exited:
        raise ExpectProcessTerminatedError(patterns, self.get_status(), output)
    # This shouldn't happen
    raise ExpectError(patterns, output)
def wait_for_get_address(self, nic_index_or_name, timeout=30, internal_timeout=1):
    """
    Poll until a nic has an IP address, then return that address.

    Raises VMIPAddressMissingError (built from the nic's MAC) when the
    polling does not yield an address.
    """
    def _probe_address():
        # "No address yet" errors become a falsy result instead of
        # propagating out of the polling helper.
        try:
            address = self.get_address(nic_index_or_name)
        except (VMIPAddressMissingError, VMAddressVerificationError):
            address = False
        return address

    found = utils_misc.wait_for(_probe_address, timeout, internal_timeout)
    if found:
        return self.get_address(nic_index_or_name)
    raise VMIPAddressMissingError(self.virtnet[nic_index_or_name].mac)
def wait_for_get_address(self, nic_index_or_name, timeout=30, internal_timeout=1, ip_version="ipv4"):
    """
    Wait for a nic to acquire an IP address, then return it.

    For an ipv6 linklocal address, the address can be generated from the
    nic mac, so that case can be ignored. Note that ip_version is accepted
    but not read by this implementation.
    """
    def _address_or_false():
        # Keep VMIPAddressMissingError/VMAddressVerificationError from
        # escaping the poll: report "not yet" instead.
        try:
            return self.get_address(nic_index_or_name)
        except VMIPAddressMissingError:
            return False
        except VMAddressVerificationError:
            return False

    acquired = utils_misc.wait_for(_address_or_false, timeout, internal_timeout)
    if not acquired:
        nic = self.virtnet[nic_index_or_name]
        raise VMIPAddressMissingError(nic.mac)
    return self.get_address(nic_index_or_name)
def wait_for_get_address(self, nic_index_or_name, timeout=30,
                         internal_timeout=1, ip_version='ipv4'):
    """
    Wait for a nic to acquire an IP address, then return it.
    For ipv6 linklocal address, we can generate it by nic mac,
    so we can ignore this case

    If polling get_address() does not yield an address and the VM is still
    alive, a serial console session is used to renew the guest IP; if the
    address still cannot be obtained through get_address(), it is queried
    from inside the guest and stored in self.address_cache.

    @param nic_index_or_name: Index or name of the NIC to look up.
    @param timeout: Timeout passed to utils_misc.wait_for for the polling.
    @param internal_timeout: Third argument passed to utils_misc.wait_for.
    @param ip_version: "ipv4" or "ipv6"; forwarded to the guest-side
            helpers, and selects the address cache key ("<mac>_6" for ipv6).
    @return: The IP address of the NIC.
    @raise VMIPAddressMissingError: If the VM is dead, or the guest-side
            lookup itself fails.
    """
    # Don't let VMIPAddressMissingError/VMAddressVerificationError through
    def _get_address():
        try:
            return self.get_address(nic_index_or_name)
        except (VMIPAddressMissingError, VMAddressVerificationError):
            return False

    if not utils_misc.wait_for(_get_address, timeout, internal_timeout):
        if self.is_dead():
            raise VMIPAddressMissingError(self.virtnet[nic_index_or_name].mac)
        # Bound before the try so the finally clause can always test it
        s_session = None
        try:
            # for windows guest make sure your guest supports
            # login by serial_console
            s_session = self.wait_for_serial_login()
            nic_mac = self.get_mac_address(nic_index_or_name)
            os_type = self.params.get("os_type")
            try:
                utils_net.renew_guest_ip(s_session, nic_mac,
                                         os_type, ip_version)
                return self.get_address(nic_index_or_name)
            except (VMIPAddressMissingError, VMAddressVerificationError):
                # Fall back to asking the guest for its address directly
                try:
                    nic_address = utils_net.get_guest_ip_addr(s_session,
                                                              nic_mac,
                                                              os_type,
                                                              ip_version)
                    if nic_address:
                        mac_key = nic_mac
                        if ip_version == "ipv6":
                            mac_key = "%s_6" % nic_mac
                        self.address_cache[mac_key.lower()] = nic_address
                        return nic_address
                except Exception as err:
                    logging.debug("Can not get guest address, '%s'" % err)
                    raise VMIPAddressMissingError(nic_mac)
        finally:
            if s_session:
                s_session.close()
    return self.get_address(nic_index_or_name)
def setup(self):
    """
    Access the iscsi target. And return the local raw device name.

    Logs in to the iscsi device unless a session is already present, waits
    up to self.iscsi_init_timeout for a device name to show up, and appends
    self.device_id to that name when it is set.

    @return: The device name, with self.device_id appended if set.
    @raise error.TestError: If no device name is available within
            self.iscsi_init_timeout.
    """
    if self.iscsidevice.logged_in():
        # logging.warning is used because logging.warn is a deprecated alias
        logging.warning("Session already present. Don't need to"
                        " login again")
    else:
        self.iscsidevice.login()

    if utils_misc.wait_for(self.iscsidevice.get_device_name,
                           self.iscsi_init_timeout):
        device_name = self.iscsidevice.get_device_name()
    else:
        raise error.TestError("Can not get iscsi device name in host"
                              " in %ss" % self.iscsi_init_timeout)

    if self.device_id:
        device_name += self.device_id
    return device_name
def preprocess(test, params, env):
"""
Preprocess all VMs and images according to the instructions in params.
Also, collect some host information, such as the KVM version.
@param test: An Autotest test object.
@param params: A dict containing all VM and image parameters.
@param env: The environment (a dict-like object).
"""
error.context("preprocessing")
# First, let's verify if this test does require root or not. If it
# does and the test suite is running as a regular user, we shall just
# throw a TestNAError exception, which will skip the test.
if params.get('requires_root', 'no') == 'yes':
utils_test.verify_running_as_root()
port = params.get('shell_port')
prompt = params.get('shell_prompt')
address = params.get('ovirt_node_address')
username = params.get('ovirt_node_user')
password = params.get('ovirt_node_password')
# Start tcpdump if it isn't already running
if "address_cache" not in env:
env["address_cache"] = {}
if "tcpdump" in env and not env["tcpdump"].is_alive():
env["tcpdump"].close()
del env["tcpdump"]
if "tcpdump" not in env and params.get("run_tcpdump", "yes") == "yes":
cmd = "%s -npvi any 'dst port 68'" % utils_misc.find_command("tcpdump")
if params.get("remote_preprocess") == "yes":
login_cmd = ("ssh -o UserKnownHostsFile=/dev/null -o \
PreferredAuthentications=password -p %s %s@%s" %
(port, username, address))
env["tcpdump"] = aexpect.ShellSession(
login_cmd,
output_func=_update_address_cache,
output_params=(env["address_cache"],))
remote._remote_login(env["tcpdump"], username, password, prompt)
env["tcpdump"].sendline(cmd)
else:
env["tcpdump"] = aexpect.Tail(
command=cmd,
output_func=_update_address_cache,
output_params=(env["address_cache"],))
if utils_misc.wait_for(lambda: not env["tcpdump"].is_alive(),
0.1, 0.1, 1.0):
logging.warn("Could not start tcpdump")
logging.warn("Status: %s" % env["tcpdump"].get_status())
logging.warn("Output:" + utils_misc.format_str_for_message(
env["tcpdump"].get_output()))
# Destroy and remove VMs that are no longer needed in the environment
requested_vms = params.objects("vms")
for key in env.keys():
vm = env[key]
if not isinstance(vm, virt_vm.BaseVM):
continue
if not vm.name in requested_vms:
vm.destroy()
del env[key]
# Get Host cpu type
if params.get("auto_cpu_model") == "yes":
if not env.get("cpu_model"):
env["cpu_model"] = utils_misc.get_cpu_model()
params["cpu_model"] = env.get("cpu_model")
kvm_ver_cmd = params.get("kvm_ver_cmd", "")
if kvm_ver_cmd:
try:
cmd_result = utils.run(kvm_ver_cmd)
kvm_version = cmd_result.stdout.strip()
except error.CmdError:
kvm_version = "Unknown"
else:
# Get the KVM kernel module version and write it as a keyval
if os.path.exists("/dev/kvm"):
try:
kvm_version = open("/sys/module/kvm/version").read().strip()
except Exception:
kvm_version = os.uname()[2]
else:
logging.warning("KVM module not loaded")
kvm_version = "Unknown"
logging.debug("KVM version: %s" % kvm_version)
test.write_test_keyval({"kvm_version": kvm_version})
# Get the KVM userspace version and write it as a keyval
kvm_userspace_ver_cmd = params.get("kvm_userspace_ver_cmd", "")
if kvm_userspace_ver_cmd:
try:
cmd_result = utils.run(kvm_userspace_ver_cmd)
kvm_userspace_version = cmd_result.stdout.strip()
except error.CmdError:
kvm_userspace_version = "Unknown"
#......... part of the code is omitted here .........
video = video_maker.GstPythonVideoMaker()
if (video.has_element('vp8enc') and video.has_element('webmmux')):
video_file = os.path.join(test.debugdir, "%s-%s.webm" %
(vm.name, test.iteration))
else:
video_file = os.path.join(test.debugdir, "%s-%s.ogg" %
(vm.name, test.iteration))
video.start(screendump_dir, video_file)
except Exception, detail:
logging.info("Video creation failed for vm %s: %s", vm.name, detail)
if params.get("kill_vm") == "yes":
kill_vm_timeout = float(params.get("kill_vm_timeout", 0))
if kill_vm_timeout:
utils_misc.wait_for(vm.is_dead, kill_vm_timeout, 0, 1)
vm.destroy(gracefully = params.get("kill_vm_gracefully") == "yes")
def process_command(test, params, env, command, command_timeout,
command_noncritical):
"""
Pre- or post- custom commands to be executed before/after a test is run
@param test: An Autotest test object.
@param params: A dict containing all VM and image parameters.
@param env: The environment (a dict-like object).
@param command: Command to be run.
@param command_timeout: Timeout for command execution.
@param command_noncritical: If True test will not fail if command fails.
"""
def preprocess(test, params, env):
"""
Preprocess all VMs and images according to the instructions in params.
Also, collect some host information, such as the KVM version.
:param test: An Autotest test object.
:param params: A dict containing all VM and image parameters.
:param env: The environment (a dict-like object).
"""
error.context("preprocessing")
# First, let's verify if this test does require root or not. If it
# does and the test suite is running as a regular user, we shall just
# throw a TestNAError exception, which will skip the test.
if params.get('requires_root', 'no') == 'yes':
utils_misc.verify_running_as_root()
port = params.get('shell_port')
prompt = params.get('shell_prompt')
address = params.get('ovirt_node_address')
username = params.get('ovirt_node_user')
password = params.get('ovirt_node_password')
setup_pb = False
for nic in params.get('nics', "").split():
nic_params = params.object_params(nic)
if nic_params.get('netdst') == 'private':
setup_pb = True
params_pb = nic_params
params['netdst_%s' % nic] = nic_params.get("priv_brname", 'atbr0')
if setup_pb:
brcfg = test_setup.PrivateBridgeConfig(params_pb)
brcfg.setup()
base_dir = data_dir.get_data_dir()
if params.get("storage_type") == "iscsi":
iscsidev = qemu_storage.Iscsidev(params, base_dir, "iscsi")
params["image_name"] = iscsidev.setup()
params["image_raw_device"] = "yes"
# Start tcpdump if it isn't already running
if "address_cache" not in env:
env["address_cache"] = {}
if "tcpdump" in env and not env["tcpdump"].is_alive():
env["tcpdump"].close()
del env["tcpdump"]
if "tcpdump" not in env and params.get("run_tcpdump", "yes") == "yes":
cmd = "%s -npvi any 'port 68'" % utils_misc.find_command("tcpdump")
if params.get("remote_preprocess") == "yes":
login_cmd = ("ssh -o UserKnownHostsFile=/dev/null -o \
PreferredAuthentications=password -p %s %s@%s" %
(port, username, address))
env["tcpdump"] = aexpect.ShellSession(
login_cmd,
output_func=_update_address_cache,
output_params=(env["address_cache"],))
remote.handle_prompts(env["tcpdump"], username, password, prompt)
env["tcpdump"].sendline(cmd)
else:
env["tcpdump"] = aexpect.Tail(
command=cmd,
output_func=_tcpdump_handler,
output_params=(env["address_cache"], "tcpdump.log",))
if utils_misc.wait_for(lambda: not env["tcpdump"].is_alive(),
0.1, 0.1, 1.0):
logging.warn("Could not start tcpdump")
logging.warn("Status: %s" % env["tcpdump"].get_status())
logging.warn("Output:" + utils_misc.format_str_for_message(
env["tcpdump"].get_output()))
# Destroy and remove VMs that are no longer needed in the environment
requested_vms = params.objects("vms")
for key in env.keys():
vm = env[key]
if not isinstance(vm, virt_vm.BaseVM):
continue
if not vm.name in requested_vms:
vm.destroy()
del env[key]
if (params.get("auto_cpu_model") == "yes" and
params.get("vm_type") == "qemu"):
if not env.get("cpu_model"):
env["cpu_model"] = utils_misc.get_qemu_best_cpu_model(params)
params["cpu_model"] = env.get("cpu_model")
kvm_ver_cmd = params.get("kvm_ver_cmd", "")
if kvm_ver_cmd:
try:
cmd_result = utils.run(kvm_ver_cmd)
kvm_version = cmd_result.stdout.strip()
except error.CmdError:
kvm_version = "Unknown"
else:
# Get the KVM kernel module version and write it as a keyval
if os.path.exists("/dev/kvm"):
try:
kvm_version = open("/sys/module/kvm/version").read().strip()
#......... part of the code is omitted here .........
def get_address(self, index=0):
    """
    Return the IP address of a NIC or guest (in host space).

    The "ip_version" param (default "ipv4", lower-cased) is stored on
    self.ip_version and selects the lookup strategy. Nothing is returned
    explicitly when it is neither "ipv4" nor "ipv6".

    :param index: Name or index of the NIC whose address is requested.
    :return: The host's own address (resolved from its hostname) when the
             NIC's nettype is neither 'bridge' nor 'macvtap'.
    :return: IP address of the NIC otherwise.
    :raise VMMACAddressMissingError: If no MAC address is defined for the
            requested NIC
    :raise VMIPAddressMissingError: If no IP address is found for the
            NIC's MAC address
    :raise VMAddressVerificationError: If the MAC-IP address mapping cannot
            be verified (IPv4 ownership check with "pci_assignable" set to
            "no", or IPv6 neighbour not reachable)
    """
    nic = self.virtnet[index]
    self.ip_version = self.params.get("ip_version", "ipv4").lower()

    # TODO: Determine port redirection in use w/o checking nettype
    if nic.nettype not in ['bridge', 'macvtap']:
        return socket.gethostbyname(socket.gethostname())

    if self.params.get('vm_type') == 'libvirt' and not nic.has_key('mac'):
        # Look it up from xml
        nic.mac = self.get_virsh_mac_address(index)
    # else TODO: Look up mac from existing qemu-kvm process
    if not nic.has_key('mac'):
        raise VMMACAddressMissingError(index)

    if self.ip_version == "ipv4":
        # Get the IP address from arp cache, try upper and lower case
        cache = self.address_cache
        ipv4_addr = cache.get(nic.mac.upper()) or cache.get(nic.mac.lower())

        if not ipv4_addr and os.geteuid() != 0:
            # For non-root, tcpdump won't work for finding IP address,
            # try arp
            ipv4_addr = utils_net.parse_arp().get(nic.mac.lower())
            if ipv4_addr:
                self.address_cache[nic.mac.lower()] = ipv4_addr

        if not ipv4_addr:
            raise VMIPAddressMissingError(nic.mac)

        # Make sure the IP address is assigned to one or more macs
        # for this guest
        guest_macs = self.virtnet.mac_list()

        # SR-IOV cards may not be in same subnet with the card used by
        # host by default, so arp checks won't work. Therefore, do not
        # raise VMAddressVerificationError when SR-IOV is used.
        nic_params = self.params.object_params(nic.nic_name)
        pci_assignable = nic_params.get("pci_assignable") != "no"

        if not utils_net.verify_ip_address_ownership(ipv4_addr, guest_macs):
            if not pci_assignable:
                raise VMAddressVerificationError(nic.mac, ipv4_addr)
            msg = "Could not verify DHCP lease: %s-> %s." % (nic.mac,
                                                             ipv4_addr)
            msg += (" Maybe %s is not in the same subnet "
                    "as the host (SR-IOV in use)" % ipv4_addr)
            logging.error(msg)

        logging.debug('Found/Verified IP %s for VM %s NIC %s',
                      ipv4_addr, self.name, str(index))
        return ipv4_addr

    if self.ip_version == "ipv6":
        # Try to get and return IPV6 address
        if self.params.get('using_linklocal') == "yes":
            ipv6_addr = utils_net.ipv6_from_mac_addr(nic.mac)
        else:
            # Using global address
            cache_key = ("%s_6" % nic.mac).lower()
            ipv6_addr = self.address_cache.get(cache_key)
        if not ipv6_addr:
            raise VMIPAddressMissingError(nic.mac)

        # Check whether the ipv6 address is reachable
        utils_net.refresh_neigh_table(nic.netdst, ipv6_addr)

        def _neighbour_reachable():
            return utils_net.neigh_reachable(ipv6_addr, nic.netdst)

        reachable = utils_misc.wait_for(_neighbour_reachable,
                                        30, 0, 1, "Wait neighbour reachable")
        if not reachable:
            raise VMAddressVerificationError(nic.mac, ipv6_addr)
        return ipv6_addr
#......... part of the code is omitted here .........
expected_hash = cdrom_params.get("md5sum_1m")
compare = True
elif cdrom_params.get("md5sum"):
logging.debug("Comparing expected MD5 sum with MD5 sum of " "ISO file...")
actual_hash = utils.hash_file(iso, method="md5")
expected_hash = cdrom_params.get("md5sum")
compare = True
elif cdrom_params.get("sha1sum"):
logging.debug("Comparing expected SHA1 sum with SHA1 sum " "of ISO file...")
actual_hash = utils.hash_file(iso, method="sha1")
expected_hash = cdrom_params.get("sha1sum")
compare = True
if compare:
if actual_hash == expected_hash:
logging.debug("Hashes match")
else:
raise virt_vm.VMHashMismatchError(actual_hash, expected_hash)
# Make sure the following code is not executed by more than one thread
# at the same time
lockfile = open("/tmp/libvirt-autotest-vm-create.lock", "w+")
fcntl.lockf(lockfile, fcntl.LOCK_EX)
try:
# Handle port redirections
redir_names = params.objects("redirs")
host_ports = utils_misc.find_free_ports(5000, 6000, len(redir_names))
self.redirs = {}
for i in range(len(redir_names)):
redir_params = params.object_params(redir_names[i])
guest_port = int(redir_params.get("guest_port"))
self.redirs[guest_port] = host_ports[i]
# Find available PCI devices
self.pci_devices = []
for device in params.objects("pci_devices"):
self.pci_devices.append(device)
# Find available VNC port, if needed
if params.get("display") == "vnc":
if params.get("vnc_autoport") == "yes":
self.vnc_port = None
self.vnc_autoport = True
else:
self.vnc_port = utils_misc.find_free_port(5900, 6100)
self.vnc_autoport = False
# Find available spice port, if needed
if params.get("spice"):
self.spice_port = utils_misc.find_free_port(8000, 8100)
# Find random UUID if specified 'uuid = random' in config file
if params.get("uuid") == "random":
f = open("/proc/sys/kernel/random/uuid")
self.uuid = f.read().strip()
f.close()
# Generate or copy MAC addresses for all NICs
for nic in self.virtnet:
nic_params = dict(nic)
if mac_source:
# Will raise exception if source doesn't
# have cooresponding nic
logging.debug("Copying mac for nic %s from VM %s" % (nic.nic_name, mac_source.nam))
nic_params["mac"] = mac_source.get_mac_address(nic.nic_name)
# make_create_command() calls vm.add_nic (i.e. on a copy)
nic = self.add_nic(**nic_params)
logging.debug("VM.create activating nic %s" % nic)
self.activate_nic(nic.nic_name)
# Make qemu command
install_command = self.make_create_command()
logging.info("Running libvirt command (reformatted):")
for item in install_command.replace(" -", " \n -").splitlines():
logging.info("%s", item)
utils.run(install_command, verbose=False)
# Wait for the domain to be created
utils_misc.wait_for(func=self.is_alive, timeout=60, text=("waiting for domain %s to start" % self.name))
self.uuid = virsh.domuuid(self.name, uri=self.connect_uri)
# Establish a session with the serial console
if self.only_pty == True:
self.serial_console = aexpect.ShellSession(
"virsh console %s" % self.name,
auto_close=False,
output_func=utils_misc.log_line,
output_params=("serial-%s.log" % name,),
)
else:
self.serial_console = aexpect.ShellSession(
"tail -f %s" % self.get_serial_console_filename(),
auto_close=False,
output_func=utils_misc.log_line,
output_params=("serial-%s.log" % name,),
)
finally:
fcntl.lockf(lockfile, fcntl.LOCK_UN)
lockfile.close()
# Please post a comment