本文整理汇总了Python中marvin.lib.utils.get_process_status函数的典型用法代码示例。如果您正苦于以下问题：Python get_process_status函数的具体用法？Python get_process_status怎么用？Python get_process_status使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_process_status函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: stop_master_router
def stop_master_router(self, vpc):
    """Stop the MASTER router of a VPC, then probe the routers listed as running.

    The routers of ``vpc`` are listed once. The first one whose
    ``redundantstate`` is ``'MASTER'`` is stopped through the API. After
    that, ``checkrouter.sh`` is executed through ``get_process_status`` for
    every router whose listed state is ``'Running'``; the command output is
    not inspected.

    :param vpc: VPC object; its ``id`` and ``name`` attributes are read.
    :raises Exception: if looking up the host credentials raises ``KeyError``.
    """
    self.logger.debug("Stopping Master Router of VPC '%s'...", vpc.name)
    routers = list_routers(
        self.api_client,
        domainid=self.domain.id,
        account=self.account.name,
        vpcid=vpc.id
    )

    for router in routers:
        if router.redundantstate == 'MASTER':
            cmd = stopRouter.stopRouterCmd()
            cmd.id = router.id
            # A forced stop would not fail over gracefully and would cause
            # ~3.6 sec of downtime, so ``cmd.forced`` is left unset on purpose.
            self.api_client.stopRouter(cmd)
            break

    # NOTE: ``router.state`` still comes from the listing taken before the
    # stop request; the router list is not refreshed here.
    for router in routers:
        if router.state != 'Running':
            continue

        hosts = list_hosts(
            self.api_client,
            zoneid=router.zoneid,
            type='Routing',
            state='Up',
            id=router.hostid
        )
        self.assertTrue(isinstance(hosts, list))
        host = next(iter(hosts or []), None)
        if host is None:
            # An empty listing used to surface as an AttributeError on None
            # a few lines further down; fail with a readable reason instead.
            self.fail("No 'Up' Routing host found for router '%s'" % router.id)

        try:
            host.user, host.passwd = get_host_credentials(self.config, host.name)
            get_process_status(
                host.ipaddress,
                22,
                host.user,
                host.passwd,
                router.linklocalip,
                "sh /opt/cosmic/router/scripts/checkrouter.sh "
            )
        except KeyError as e:
            raise Exception("Exception: %s" % e)

    self.logger.debug("Master Router of VPC '%s' stopped", vpc.name)
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:27,代码来源:test_privategw_acl.py
示例2: test_01_single_VPC_iptables_policies
def test_01_single_VPC_iptables_policies(self):
    """Test iptables default INPUT/FORWARD policies on VPC router.

    Two networks are created in the VPC, NAT rules are added and
    ``do_vpc_test`` is run through the entity manager. Then, for every
    router that is not redundant and belongs to a VPC, ``iptables -L`` is
    run for the configured "input" and "forward" entries and each listing
    must contain "DROP" exactly once.

    For vmware/hyperv the command is issued with the management-server
    connection details; otherwise with the host credentials from
    ``self.hostConfig``. The test is skipped if running the command on the
    host raises ``KeyError``.
    """
    self.logger.debug("Starting test_01_single_VPC_iptables_policies")

    # The router list is taken before the networks are created.
    routers = self.entity_manager.query_routers()
    self.assertEqual(isinstance(routers, list), True, "Check for list routers response return valid data")

    self.entity_manager.create_network(self.services["vpc_network_offering"], self.vpc.id, "10.1.1.1")
    self.entity_manager.create_network(self.services["vpc_network_offering_no_lb"], self.vpc.id, "10.1.2.1")
    self.entity_manager.add_nat_rules(self.vpc.id)
    self.entity_manager.do_vpc_test()

    for router in routers:
        if router.isredundantrouter or not router.vpcid:
            continue

        hosts = list_hosts(self.apiclient, id=router.hostid)
        self.assertEqual(isinstance(hosts, list), True, "Check for list hosts response return valid data")

        host = hosts[0]
        host.user = self.hostConfig["username"]
        host.passwd = self.hostConfig["password"]
        host.port = self.services["configurableData"]["host"]["port"]

        tables = [self.services["configurableData"]["input"], self.services["configurableData"]["forward"]]
        for table in tables:
            command = "iptables -L %s" % table
            result = None
            if self.hypervisor.lower() in ("vmware", "hyperv"):
                result = get_process_status(
                    self.apiclient.connection.mgtSvr,
                    22,
                    self.apiclient.connection.user,
                    self.apiclient.connection.passwd,
                    router.linklocalip,
                    command,
                    hypervisor=self.hypervisor,
                )
            else:
                try:
                    result = get_process_status(
                        host.ipaddress,
                        host.port,
                        host.user,
                        host.passwd,
                        router.linklocalip,
                        command,
                    )
                except KeyError:
                    # The message used to be split with a backslash inside
                    # the literal, which fused "host" and "credentials".
                    self.skipTest(
                        "Provide a marvin config file with host "
                        "credentials to run %s" % self._testMethodName
                    )

            self.logger.debug("iptables -L %s: %s" % (table, result))
            res = str(result)
            self.assertEqual(res.count("DROP"), 1, "%s Default Policy should be DROP" % table)
开发者ID:shapeblue,项目名称:Trillian,代码行数:58,代码来源:test_routers_iptables_default_policy.py
示例3: check_routers_state
def check_routers_state(self, count=2, status_to_check="MASTER", expected_count=1, showall=False):
    """Fail unless ``expected_count`` running routers report ``status_to_check``.

    After ``wait_for_vrrp`` and ``query_routers(count, showall)``,
    ``checkrouter.sh`` is run for every router in ``self.routers`` whose
    state is ``"Running"``. A router is counted when the stringified output
    contains ``status_to_check`` exactly once.

    :param count: forwarded to ``self.query_routers``.
    :param status_to_check: one of "MASTER", "BACKUP" or "UNKNOWN"; any
        other value ends in ``ValueError`` from the lookup in ``vals``.
    :param expected_count: number of routers that must match.
    :param showall: forwarded to ``self.query_routers``.
    """
    vals = ["MASTER", "BACKUP", "UNKNOWN"]
    cnts = [0, 0, 0]
    check_command = "sh /opt/cloud/bin/checkrouter.sh "

    self.wait_for_vrrp()

    result = "UNKNOWN"
    self.query_routers(count, showall)
    for router in self.routers:
        if router.state != "Running":
            continue

        hosts = list_hosts(
            self.apiclient,
            zoneid=router.zoneid,
            type='Routing',
            state='Up',
            id=router.hostid
        )
        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check list host returns a valid list"
        )
        host = hosts[0]

        if self.hypervisor.lower() in ('vmware', 'hyperv'):
            # These hypervisors are reached with the management-server
            # connection details instead of per-host credentials.
            result = str(get_process_status(
                self.apiclient.connection.mgtSvr,
                22,
                self.apiclient.connection.user,
                self.apiclient.connection.passwd,
                router.linklocalip,
                check_command,
                hypervisor=self.hypervisor
            ))
        else:
            try:
                host.user, host.passwd = get_host_credentials(
                    self.config, host.ipaddress)
                result = str(get_process_status(
                    host.ipaddress,
                    22,
                    host.user,
                    host.passwd,
                    router.linklocalip,
                    check_command
                ))
            except KeyError:
                # The message used to be split with a backslash inside the
                # literal, which fused "to" and "check" into one word.
                self.skipTest(
                    "Marvin configuration has no host credentials to "
                    "check router services")

        if result.count(status_to_check) == 1:
            cnts[vals.index(status_to_check)] += 1

    found = cnts[vals.index(status_to_check)]
    if found != expected_count:
        self.fail("Expected '%s' routers at state '%s', but found '%s'!" % (expected_count, status_to_check, found))
开发者ID:exoscale,项目名称:cloudstack,代码行数:57,代码来源:test_vpc_redundant.py
示例4: download_builtin_templates
def download_builtin_templates(apiclient, zoneid, hypervisor, host,
                               linklocalip, interval=60):
    """After setup wait till builtin templates are downloaded.

    Sets the iptables INPUT policy to ACCEPT through ``get_process_status``,
    then polls the template listing until the BUILTIN template reports
    ``'Download Complete'``.

    Only the last template of type "BUILTIN" in the listing is polled.

    :param apiclient: API client handed to ``list_templates``.
    :param zoneid: zone to look the templates up in.
    :param hypervisor: hypervisor filter for the initial listing.
    :param host: mapping with the keys "ipaddress", "port", "username" and
        "password".
    :param linklocalip: address passed to ``get_process_status`` as the
        target of the command.
    :param interval: seconds to sleep between the individual steps and
        between polls.
    :raises Exception: if the initial listing is not a list, or if the
        template status is neither complete, downloading nor installing.
    """
    # Change IPTABLES Rules
    get_process_status(
        host["ipaddress"],
        host["port"],
        host["username"],
        host["password"],
        linklocalip,
        "iptables -P INPUT ACCEPT"
    )
    time.sleep(interval)

    # Find the BUILTIN Templates for given Zone, Hypervisor
    list_template_response = list_templates(
        apiclient,
        hypervisor=hypervisor,
        zoneid=zoneid,
        templatefilter='self'
    )
    if not isinstance(list_template_response, list):
        raise Exception("Failed to download BUILTIN templates")

    # Remember the id of the (last) BUILTIN template; it stays None when the
    # listing contains none.
    templateid = None
    for template in list_template_response:
        if template.templatetype == "BUILTIN":
            templateid = template.id

    # Sleep to ensure that template is in downloading state after adding
    # Sec storage
    time.sleep(interval)

    while True:
        template_response = list_templates(
            apiclient,
            id=templateid,
            zoneid=zoneid,
            templatefilter='self'
        )
        status = template_response[0].status
        # Possible values of the status:
        #   Download Complete           - template is ready
        #   Downloading - x% Downloaded - still in progress
        #   anything else               - error (unless it mentions Installing)
        if status == 'Download Complete':
            break
        if 'Downloaded' not in status and 'Installing' not in status:
            raise Exception("ErrorInDownload")
        # Still downloading or installing. The "Installing" case used to
        # re-poll immediately without any pause, hammering the API.
        time.sleep(interval)
开发者ID:aali-dincloud,项目名称:cloudstack,代码行数:56,代码来源:common.py
示例5: wait_vm_ready
def wait_vm_ready(self, router, vmip):
    """Ping ``vmip`` through the router until it answers or attempts run out.

    Up to 15 attempts are made, five seconds apart. The ping exit status is
    read from the first element of the command output; status 0 counts as
    reachable and is followed by a ten second pause before returning.

    :param router: router whose ``linklocalip`` is the command target.
    :param vmip: IP address of the VM, as a string.
    :returns: ``True`` once the VM is pingable, ``False`` if it never was.
    """
    self.logger.debug("Check whether VM %s is up" % vmip)

    attempts_allowed = 15
    ping_output = 0
    host = self.get_host_details(router)

    for attempt in range(1, attempts_allowed + 1):
        try:
            ping_output = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                "ping -c 1 " + vmip + ">/dev/null; echo $?"
            )
            # An exit status of 0 means the ping got through.
            reachable = int(ping_output[0]) == 0
            if reachable:
                self.logger.debug("VM %s is pingable, give it 10s to request the password" % vmip)
                time.sleep(10)
                return True
        except KeyError:
            self.skipTest("Provide a marvin config file with host credentials to run %s" % self._testMethodName)

        self.logger.debug(
            "Ping result from the Router on IP '%s' is -> '%s'" % (router.linklocalip, ping_output[0]))
        self.logger.debug("Executing vm ping %s/%s" % (attempt, attempts_allowed))
        time.sleep(5)

    return False
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:32,代码来源:test_password_server.py
示例6: test_dnsmasq_service_dhcp_dns_config
def test_dnsmasq_service_dhcp_dns_config(self, router, dns_server):
    """Report whether the router's dnsmasq config for eth3 names ``dns_server``.

    Greps ``/etc/dnsmasq.d/eth3.conf`` for the ``eth3,6`` entry
    (presumably DHCP option 6, the DNS servers; confirm against the router
    templates) and looks for ``dns_server`` in the stringified output. The
    test is skipped if running the command raises ``KeyError``.

    :param router: router whose ``linklocalip`` is the command target.
    :param dns_server: text expected somewhere in the grep output.
    :returns: ``True`` if ``dns_server`` occurs in the output, else ``False``.
    """
    host = self.get_host_details(router)
    nic = 'eth3'

    # Get the dnsmasq config entry
    command_to_execute = "grep '%s,6' /etc/dnsmasq.d/%s.conf" % (nic, nic)
    dnsmasq_dns_config_result = ""
    try:
        dnsmasq_dns_config_result = get_process_status(
            host.ipaddress,
            host.port,
            host.user,
            host.passwd,
            router.linklocalip,
            command_to_execute)
    except KeyError:
        # The message used to be split with a backslash inside the literal,
        # which fused "host" and "credentials" into one word.
        self.skipTest(
            "Provide a marvin config file with host "
            "credentials to run %s" % self._testMethodName)

    command_result = str(dnsmasq_dns_config_result)
    self.logger.debug("Got this response: %s " % command_result)

    # Check whether the expected DNS server shows up in the config entry
    return dns_server in command_result
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:31,代码来源:test_password_and_dnsmasq_service.py
示例7: test_dhcphopts
def test_dhcphopts(self, ipaddress, router):
    """Assert that /etc/dhcpopts.txt on the router mentions ``ipaddress``.

    The address is searched for with its dots replaced by underscores. For
    vmware/hyperv the command is issued with the management-server
    connection details; otherwise with the host credentials from
    ``self.hostConfig``. The test is skipped if running the command on the
    host raises ``KeyError``.

    :param ipaddress: dotted IP address, as a string.
    :param router: router whose ``hostid`` and ``linklocalip`` are used.
    """
    hosts = list_hosts(
        self.apiclient,
        id=router.hostid)
    self.assertEqual(
        isinstance(hosts, list),
        True,
        "Check for list hosts response return valid data")

    host = hosts[0]
    host.user = self.hostConfig['username']
    host.passwd = self.hostConfig['password']
    host.port = self.services["configurableData"]["host"]["port"]

    host_tag = ipaddress.replace(".", "_")
    # Built once; it used to be spelled out in both calls and in the log.
    command = "cat /etc/dhcpopts.txt | grep %s" % host_tag

    if self.hypervisor.lower() in ('vmware', 'hyperv'):
        result = get_process_status(
            self.apiclient.connection.mgtSvr,
            22,
            self.apiclient.connection.user,
            self.apiclient.connection.passwd,
            router.linklocalip,
            command,
            hypervisor=self.hypervisor)
    else:
        try:
            result = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                command)
        except KeyError:
            # The message used to be split with a backslash inside the
            # literal, which fused "host" and "credentials" into one word.
            self.skipTest(
                "Provide a marvin config file with host "
                "credentials to run %s" % self._testMethodName)

    self.logger.debug("%s RESULT IS ==> %s" % (command, result))
    res = str(result)
    self.assertNotEqual(
        res.count(host_tag),
        0,
        "DHCP opts file does not contain exception for IP ==> %s!" % res)
开发者ID:priyankparihar,项目名称:cloudstack,代码行数:47,代码来源:test_router_dhcphosts.py
示例8: test_password_file_not_empty
def test_password_file_not_empty(self, vm, router):
    """Assert that the router's password cache lists the VM's IP exactly once.

    The file ``/var/cache/cloud/passwords-<gateway>`` is filtered for the IP
    of the VM's first NIC. For vmware/hyperv the command is issued with the
    management-server connection details; otherwise with the host
    credentials from ``self.hostConfig``. The test is skipped if running
    the command on the host raises ``KeyError``.

    :param vm: VM whose ``nic[0].gateway`` and ``nic[0].ipaddress`` are read.
    :param router: router whose ``hostid`` and ``linklocalip`` are used.
    """
    hosts = list_hosts(
        self.apiclient,
        id=router.hostid)
    self.assertEqual(
        isinstance(hosts, list),
        True,
        "Check for list hosts response return valid data")

    host = hosts[0]
    host.user = self.hostConfig['username']
    host.passwd = self.hostConfig['password']
    host.port = self.services["configurableData"]["host"]["port"]

    # Built once; it used to be spelled out in both calls and in the log.
    command = "cat /var/cache/cloud/passwords-%s | grep %s | sed 's/=/ /g' | awk '{print $1}'" % (
        vm.nic[0].gateway, vm.nic[0].ipaddress)

    if self.hypervisor.lower() in ('vmware', 'hyperv'):
        result = get_process_status(
            self.apiclient.connection.mgtSvr,
            22,
            self.apiclient.connection.user,
            self.apiclient.connection.passwd,
            router.linklocalip,
            command,
            hypervisor=self.hypervisor
        )
    else:
        try:
            result = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                command)
        except KeyError:
            # The message used to be split with a backslash inside the
            # literal, which fused "host" and "credentials" into one word.
            self.skipTest(
                "Provide a marvin config file with host "
                "credentials to run %s" % self._testMethodName)

    self.logger.debug("%s RESULT IS ==> %s" % (command, result))
    res = str(result)
    self.assertEqual(
        res.count(vm.nic[0].ipaddress),
        1,
        "Password file is empty or doesn't exist!")
开发者ID:Accelerite,项目名称:cloudstack,代码行数:47,代码来源:test_password_server.py
示例9: test_dhcphosts
def test_dhcphosts(self, vm, router):
    """Assert that /etc/dhcphosts.txt on the router lists the VM's IP once.

    The file is filtered for the IP of the VM's first NIC. For vmware/hyperv
    the command is issued with the management-server connection details;
    otherwise with the host credentials from ``self.hostConfig``. The test
    is skipped if running the command on the host raises ``KeyError``.

    :param vm: VM whose ``nic[0].ipaddress`` is read.
    :param router: router whose ``hostid`` and ``linklocalip`` are used.
    """
    hosts = list_hosts(
        self.apiclient,
        id=router.hostid)
    self.assertEqual(
        isinstance(hosts, list),
        True,
        "Check for list hosts response return valid data")

    host = hosts[0]
    host.user = self.hostConfig['username']
    host.passwd = self.hostConfig['password']
    host.port = self.services["configurableData"]["host"]["port"]

    # Built once; it used to be spelled out in both calls and in the log.
    # The backslash is doubled so the literal no longer relies on the
    # invalid escape sequence "\,"; the resulting text is unchanged.
    command = "cat /etc/dhcphosts.txt | grep %s | sed 's/\\,/ /g' | awk '{print $2}'" % (
        vm.nic[0].ipaddress)

    if self.hypervisor.lower() in ('vmware', 'hyperv'):
        result = get_process_status(
            self.apiclient.connection.mgtSvr,
            22,
            self.apiclient.connection.user,
            self.apiclient.connection.passwd,
            router.linklocalip,
            command,
            hypervisor=self.hypervisor)
    else:
        try:
            result = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                command)
        except KeyError:
            # The message used to be split with a backslash inside the
            # literal, which fused "host" and "credentials" into one word.
            self.skipTest(
                "Provide a marvin config file with host "
                "credentials to run %s" % self._testMethodName)

    self.logger.debug("%s RESULT IS ==> %s" % (command, result))
    res = str(result)
    self.assertEqual(
        res.count(vm.nic[0].ipaddress),
        1,
        "DHCP hosts file contains duplicate IPs ==> %s!" % res)
开发者ID:priyankparihar,项目名称:cloudstack,代码行数:46,代码来源:test_router_dhcphosts.py
示例10: test_01_routervm_iptables_policies
def test_01_routervm_iptables_policies(self):
    """Test iptables default INPUT/FORWARD policy on RouterVM.

    A VM is deployed through the entity manager. Then, for every router
    that is neither redundant nor part of a VPC, ``iptables -L`` is run for
    the configured "input" and "forward" entries and each listing must
    contain "policy DROP" exactly once. The test is skipped if running the
    command raises ``KeyError``.
    """
    self.logger.debug("Starting test_01_routervm_iptables_policies")

    # Only the side effect of the deployment is needed; the returned VM
    # was never used.
    self.entity_manager.deployvm()

    routers = self.entity_manager.query_routers()
    self.assertEqual(
        isinstance(routers, list), True,
        "Check for list routers response return valid data")

    host_config = self.services["configurableData"]["host"]
    for router in routers:
        if router.isredundantrouter or router.vpcid:
            continue

        hosts = list_hosts(
            self.apiclient,
            id=router.hostid)
        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check for list hosts response return valid data")

        host = hosts[0]
        host.user = host_config["username"]
        host.passwd = host_config["password"]
        host.port = host_config["port"]

        tables = [self.services["configurableData"]["input"], self.services["configurableData"]["forward"]]
        for table in tables:
            try:
                result = get_process_status(
                    host.ipaddress,
                    host.port,
                    host.user,
                    host.passwd,
                    router.linklocalip,
                    'iptables -L %s' % table)
            except KeyError:
                # The message used to be split with a backslash inside the
                # literal, which fused "host" and "credentials" together.
                self.skipTest(
                    "Provide a marvin config file with host "
                    "credentials to run %s" % self._testMethodName)

            self.logger.debug("iptables -L %s: %s" % (table, result))
            res = str(result)
            self.assertEqual(
                res.count("policy DROP"),
                1,
                "%s Default Policy should be DROP" % table)
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:51,代码来源:test_router_ip_tables_policies.py
示例11: getCommandResultFromRouter
def getCommandResultFromRouter(self, router, command):
    """Execute ``command`` for ``router`` and hand back the raw result.

    For vmware/hyperv the command is issued with the management-server
    connection details. For any other hypervisor the router's host is
    looked up and the configured host username/password are used on
    port 22.

    :param router: router whose ``linklocalip`` (and ``hostid``) are used.
    :param command: command line to execute.
    :returns: whatever ``get_process_status`` returns.
    """
    hypervisor_name = self.hypervisor.lower()
    if hypervisor_name in ('vmware', 'hyperv'):
        connection = self.apiclient.connection
        return get_process_status(
            connection.mgtSvr,
            22,
            connection.user,
            connection.passwd,
            router.linklocalip,
            command,
            hypervisor=self.hypervisor
        )

    hosts = list_hosts(self.apiclient, id=router.hostid)
    self.assertEqual(
        isinstance(hosts, list),
        True,
        "Check for list hosts response return valid data"
    )

    host = hosts[0]
    host_settings = self.services["configurableData"]["host"]
    host.user = host_settings["username"]
    host.passwd = host_settings["password"]

    return get_process_status(
        host.ipaddress,
        22,
        host.user,
        host.passwd,
        router.linklocalip,
        command
    )
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:37,代码来源:test_network.py
示例12: get_router_state
def get_router_state(self, router):
    """Return the redundant state the router itself reports.

    Runs ``checkrouter.sh`` for the router and takes the second
    space-separated field of its output (the ``cut`` in the command).

    :param router: router whose ``linklocalip`` is the command target.
    :returns: the first element of the command output, or ``"UNKNOWN"`` when
        the command fails or produces no output.
    """
    host = self.get_host_details(router)
    router_state = "UNKNOWN"

    try:
        output = get_process_status(
            host.ipaddress,
            host.port,
            host.user,
            host.passwd,
            router.linklocalip,
            "/opt/cosmic/router/scripts/checkrouter.sh | cut -d\" \" -f2"
        )
    except Exception as e:
        # Best effort: a failed probe is reported as "UNKNOWN". The bare
        # ``except:`` used here before also swallowed KeyboardInterrupt,
        # and the fallback string was then indexed, returning just "U".
        self.logger.debug("Oops, unable to determine redundant state for router with link local address %s: %s" % (router.linklocalip, e))
    else:
        # An empty output list used to raise IndexError.
        if output:
            router_state = output[0]

    self.logger.debug("The router with link local address %s reports state %s" % (router.linklocalip, router_state))
    return router_state
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:18,代码来源:test_vpc_vpn.py
示例13: check_routers_interface
def check_routers_interface(self,count=2, interface_to_check="eth1", expected_exists=True, showall=False):
    """Check on every running router whether an interface is present.

    After ``query_routers(count, showall)`` the state column of ``ip a`` for
    ``interface_to_check`` is read from each router in ``self.routers``
    whose state is ``"Running"``. The interface counts as present when the
    stringified output contains "UP" or "DOWN" exactly once. The test fails
    when presence contradicts ``expected_exists`` and is skipped when the
    host credentials lookup raises ``KeyError``.

    :param count: forwarded to ``self.query_routers``.
    :param interface_to_check: interface name to grep for.
    :param expected_exists: whether the interface is supposed to be there.
    :param showall: forwarded to ``self.query_routers``.
    """
    result = ""
    self.query_routers(count, showall)

    for router in self.routers:
        if router.state != "Running":
            continue

        hosts = list_hosts(
            self.apiclient,
            zoneid=router.zoneid,
            type='Routing',
            state='Up',
            id=router.hostid
        )
        self.assertEqual(isinstance(hosts, list), True, "Check list host returns a valid list")
        host = hosts[0]

        try:
            host.user, host.passwd = get_host_credentials(self.config, host.ipaddress)
            raw_output = get_process_status(
                host.ipaddress,
                22,
                host.user,
                host.passwd,
                router.linklocalip,
                "ip a | grep %s | grep state | awk '{print $9;}'" % interface_to_check
            )
            result = str(raw_output)
        except KeyError:
            self.skipTest("Marvin configuration has no host credentials to check router services")

        # Exactly one "UP" or exactly one "DOWN" means the interface exists.
        interface_present = result.count("UP") == 1 or result.count("DOWN") == 1

        if expected_exists:
            if not interface_present:
                self.fail("Expected interface '%s' to exist, but it didn't!" % interface_to_check)
            else:
                self.logger.debug("Expected interface '%s' to exist and it does!" % interface_to_check)
        elif interface_present:
            self.fail("Expected interface '%s' to not exist, but it did!" % interface_to_check)
        else:
            self.logger.debug("Expected interface '%s' to not exist, and it didn't!" % interface_to_check)
开发者ID:EdwardBetts,项目名称:blackhole,代码行数:44,代码来源:test_vpc_redundant.py
示例14: test_password_server_logs
def test_password_server_logs(self, vm, router):
    """Check the router's syslog for the password hand-over of ``vm``.

    A redundant router whose probed state is not "MASTER" is not checked
    and counts as success. Otherwise ``/var/log/messages`` is grepped for
    the IP of the VM's first NIC and both "password saved for VM IP" and
    "password sent to" must occur in the output. The test is skipped if
    running the command raises ``KeyError``.

    :param vm: VM whose ``nic[0].ipaddress`` is read.
    :param router: router to inspect.
    :returns: ``True`` when the check passes or is not applicable, ``False``
        when one of the two log messages is missing.
    """
    host = self.get_host_details(router)

    router_state = self.get_router_state(router)
    if router.isredundantrouter and router_state != "MASTER":
        # Logged instead of using the Python-2-only ``print`` statement,
        # and reporting the probed state the condition was evaluated on.
        self.logger.debug("Found router in non-MASTER state '%s' so skipping test." % router_state)
        return True

    # Get the related passwd server logs for our vm
    command_to_execute = "grep %s /var/log/messages" % vm.nic[0].ipaddress
    password_log_result = ""
    try:
        password_log_result = get_process_status(
            host.ipaddress,
            host.port,
            host.user,
            host.passwd,
            router.linklocalip,
            command_to_execute)
    except KeyError:
        # The message used to be split with a backslash inside the literal,
        # which fused "host" and "credentials" into one word.
        self.skipTest(
            "Provide a marvin config file with host "
            "credentials to run %s" % self._testMethodName)

    command_result = str(password_log_result)
    self.logger.debug("Got this response: %s " % command_result)

    # Check that the password server logged saving a password for the VM
    if command_result.count("password saved for VM IP") == 0:
        return False

    # Check if the password was retrieved from the passwd server. If it is,
    # the actual password is replaced with 'saved_password'
    if command_result.count("password sent to") == 0:
        return False

    return True
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:39,代码来源:test_password_server.py
示例15: test_process_running
def test_process_running(self, find_process, router):
    """Assert that exactly one ``find_process`` process is seen on the router.

    Counts the matching lines of ``ps aux`` (excluding the grep itself) and
    compares the first element of the command output with 1. The test is
    skipped if running the command raises ``KeyError``.

    :param find_process: process name to grep for, as a string.
    :param router: router whose ``linklocalip`` is the command target.
    """
    host = self.get_host_details(router)
    count_command = "ps aux | grep " + find_process + " | grep -v grep | wc -l"

    ps_output = 0
    try:
        ps_output = get_process_status(
            host.ipaddress,
            host.port,
            host.user,
            host.passwd,
            router.linklocalip,
            count_command
        )
    except KeyError:
        self.skipTest("Provide a marvin config file with host credentials to run %s" % self._testMethodName)

    self.logger.debug(
        "Result from the Router on IP '%s' is -> Number of processess found: '%s'" % (router.linklocalip, ps_output[0]))

    wanted = 1
    actual = int(ps_output[0])
    failure_text = ("Router should have " + str(wanted) + " '" + find_process
                    + "' processes running, found " + str(ps_output[0]))
    self.assertEqual(actual, wanted, msg=failure_text)
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:23,代码来源:test_password_server.py
示例16: test_05_del_cidr_verify_alias_removal
def test_05_del_cidr_verify_alias_removal(self):
    """Destroy the last VM in the CIDR and verify alias removal.

    1. Deploy guest vm in new cidr
    2. Verify ip alias creation
    3. Destroy vm and wait for it to expunge
    4. Verify ip alias removal after vm expunge

    Only steps 2 to 4 are carried out here: the VM is read from
    ``self.virtual_machine`` and the alias address from ``self.alias_ip``.
    """
    list_router_response = list_routers(
        self.apiclient,
        zoneid=self.zone.id,
        listall=True
    )
    self.assertEqual(
        isinstance(list_router_response, list),
        True,
        "Check list response returns a valid list"
    )
    router = list_router_response[0]

    hosts = list_hosts(
        self.apiclient,
        zoneid=router.zoneid,
        type='Routing',
        state='Up',
        id=router.hostid
    )
    self.assertEqual(
        isinstance(hosts, list),
        True,
        "Check list host returns a valid list"
    )
    host = hosts[0]

    self.debug("Router ID: %s, state: %s" % (router.id, router.state))
    self.assertEqual(
        router.state,
        'Running',
        "Check list router response for router state"
    )

    host_config = self.testdata['configurableData']['host']
    port = host_config["publicport"]
    username = host_config["username"]
    password = host_config["password"]

    # SSH to host so that host key is saved in first
    # attempt
    SshClient(host.ipaddress, port, username, password)

    proc = "ip addr show eth0"
    result = get_process_status(
        host.ipaddress,
        port,
        username,
        password,
        router.linklocalip,
        proc
    )
    res = str(result)
    self.debug("ip alias configuration on VR: %s" % res)
    # A missing comma used to turn this into
    # assertNotEqual(res.find(...) - 1, "<message>"), which compares an int
    # with a string and therefore could never fail.
    self.assertNotEqual(
        res.find(self.alias_ip),
        -1,
        "ip alias is not created on VR eth0"
    )

    # NOTE(review): no explicit wait for the expunge is visible between the
    # delete call and the checks below - confirm that delete() blocks.
    self.virtual_machine.delete(self.apiclient)
    self.debug(
        "Verify that expunging the last vm in the CIDR should "
        "delete the ip alias from VR")

    ip_alias2 = self.dbclient.execute(
        "select ip4_address from nic_ip_alias;"
    )
    self.assertEqual(
        isinstance(ip_alias2, list),
        True,
        "Error in sql query"
    )
    self.assertEqual(
        len(ip_alias2),
        0,
        "Failure in clearing ip alias entry from cloud db"
    )

    result = get_process_status(
        host.ipaddress,
        port,
        username,
        password,
        router.linklocalip,
        proc
    )
    res = str(result)
    self.assertEqual(
        res.find(self.alias_ip),
        -1,
        "Failed to clean up ip alias from VR even after "
        "last vm expunge in the CIDR")
    self.debug("IP alias got deleted from VR successfully.")

    # The VM is already gone, so it must not be deleted again in cleanup.
    self.cleanup.remove(self.virtual_machine)
    return
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:99,代码来源:test_multiple_ip_ranges.py
示例17: test_01_FTPModulesInVR
def test_01_FTPModulesInVR(self):
    """
    @desc: Verify FTP modules are loaded in VR of advance zone
    step1 : create a VR in advance zone
    step2: Verify FTP modules are there in created VR

    The test is skipped in a "Basic" zone. The router is obtained by
    deploying a VM for the account and taking the first router listed for
    it; ``lsmod | grep ftp`` on that router must mention both
    ``nf_nat_ftp`` and ``nf_conntrack_ftp``.
    """
    if self.zone.networktype == "Basic":
        self.skipTest("This test can be run only in advance zone")

    # create a virtual machine
    vm = VirtualMachine.create(
        self.api_client,
        self.services["small"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.small_offering.id,
        mode=self.services["mode"]
    )
    self.assertIsNotNone(vm, "Failed to deploy virtual machine")
    self.cleanup.append(vm)

    response = VirtualMachine.list(self.api_client, id=vm.id)
    status = validateList(response)
    self.assertEqual(
        status[0],
        PASS,
        "list vm response returned invalid list")

    list_router_response = list_routers(
        self.apiclient,
        account=self.account.name,
        domainid=self.account.domainid
    )
    status = validateList(list_router_response)
    self.assertEqual(
        status[0], PASS, "Check list response returns a valid list")
    router = list_router_response[0]

    self.debug("Router ID: %s, state: %s" % (router.id, router.state))
    self.assertEqual(
        router.state,
        'Running',
        "Check list router response for router state"
    )

    command = "lsmod | grep ftp"
    if self.hypervisor.lower() in ('vmware', 'hyperv'):
        result = get_process_status(
            self.apiclient.connection.mgtSvr,
            22,
            self.apiclient.connection.user,
            self.apiclient.connection.passwd,
            router.linklocalip,
            command,
            hypervisor=self.hypervisor
        )
    else:
        # NOTE: the handler below wraps every error raised in this branch,
        # including a failed host-list assertion, not only credential
        # problems.
        try:
            hosts = list_hosts(
                self.apiclient,
                zoneid=router.zoneid,
                type='Routing',
                state='Up',
                id=router.hostid
            )
            self.assertEqual(
                isinstance(hosts, list),
                True,
                "Check list host returns a valid list"
            )
            host = hosts[0]
            result = get_process_status(
                host.ipaddress,
                22,
                self.hostConfig["username"],
                self.hostConfig["password"],
                router.linklocalip,
                command
            )
        except Exception as e:
            # The message used to be split with a backslash inside the
            # literal, which fused "host" and "credentials" into one word.
            raise Exception(
                "Exception raised in getting host "
                "credentials: %s " % e)

    res = str(result)
    self.debug("lsmod | grep ftp: %s" % res)

    ismoduleinstalled = "nf_nat_ftp" in res and "nf_conntrack_ftp" in res
    self.assertEqual(
        ismoduleinstalled,
        True,
        "nf_conntrack_ftp and nf_nat_ftp modules not installed on routers")
    return
开发者ID:PCextreme,项目名称:cloudstack,代码行数:95,代码来源:test_escalations_routers.py
示例18: test_11_ss_nfs_version_on_ssvm
def test_11_ss_nfs_version_on_ssvm(self):
"""Test NFS Version on Secondary Storage mounted properly on SSVM
"""
# 1) List SSVM in zone
# 2) Get id and url from mounted nfs store
# 3) Update NFS version for previous image store
# 4) Stop SSVM
# 5) Check NFS version of mounted nfs store after SSVM starts
nfs_version = self.config.nfsVersion
if nfs_version == None:
self.skipTest('No NFS version provided in test data')
#List SSVM for zone id
list_ssvm_response = list_ssvms(
self.apiclient,
systemvmtype='secondarystoragevm',
state='Running',
zoneid=self.zone.id
)
self.assertNotEqual(
list_ssvm_response,
None
)
self.assertEqual(
isinstance(list_ssvm_response, list),
True,
"Check list response returns a valid list"
)
self.assertEqual(
len(list_ssvm_response),
1,
"Check list System VMs response"
)
ssvm = list_ssvm_response[0]
image_stores_response = ImageStore.list(self.apiclient,zoneid=self.zone.id)
if self.hypervisor.lower() in ('vmware', 'hyperv'):
# SSH into SSVMs is done via management server for Vmware and Hyper-V
result = get_process_status(
self.apiclient.connection.mgtSvr,
22,
self.apiclient.connection.user,
self.apiclient.connection.passwd,
ssvm.privateip,
"mount | grep 'type nfs'",
hypervisor=self.hypervisor)
for res in result:
split_res = res.split("on")
mounted_img_store_url = split_res[0].strip()
for img_store in image_stores_response:
img_store_url = str(img_store.url)
if img_store_url.startswith("nfs://"):
img_store_url = img_store_url[6:]
#Add colon after ip address to match output from mount command
first_slash = img_store_url.find('/')
img_store_url = img_store_url[0:first_slash] + ':' + img_store_url[first_slash:]
if img_store_url == mounted_img_store_url:
img_store_id = img_store.id
break
self.assertNotEqual(
img_store_id,
None,
"Check image store id mounted on SSVM"
)
#Update NFS version for image store mounted on SSVM
updateConfigurationCmd = updateConfiguration.updateConfigurationCmd()
updateConfigurationCmd.name = "secstorage.nfs.version"
updateConfigurationCmd.value = nfs_version
updateConfigurationCmd.imagestoreuuid = img_store_id
updateConfigurationResponse = self.apiclient.updateConfiguration(updateConfigurationCmd)
self.logger.debug("updated the parameter %s with value %s"%(updateConfigurationResponse.name, updateConfigurationResponse.value))
#Stop SSVM
self.debug("Stopping SSVM: %s" % ssvm.id)
cmd = stopSystemVm.stopSystemVmCmd()
cmd.id = ssvm.id
self.apiclient.stopSystemVm(cmd)
def checkForRunningSSVM():
new_list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm.id
)
if isinstance(new_list_ssvm_response, list):
return new_list_ssvm_response[0].state == 'Running', None
res, _ = wait_until(self.services["sleep"], self.services["timeout"], checkForRunningSSVM)
if not res:
self.fail("List SSVM call failed!")
new_list_ssvm_response = list_ssvms(
self.apiclient,
id=ssvm.id
#.........这里部分代码省略.........
开发者ID:krissterckx,项目名称:cloudstack,代码行数:101,代码来源:test_ssvm.py
示例19: test_07_stop_start_VR_verify_ip_alias
def test_07_stop_start_VR_verify_ip_alias(self):
&
|
请发表评论