• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.get_process_status函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中marvin.lib.utils.get_process_status函数的典型用法代码示例。如果您正苦于以下问题:Python get_process_status函数的具体用法?Python get_process_status怎么用?Python get_process_status使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_process_status函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: stop_master_router

    def stop_master_router(self, vpc):
        """Stop the MASTER router of ``vpc`` and run checkrouter.sh on the others.

        The first router whose ``redundantstate`` is ``'MASTER'`` is stopped
        through the API (not forced).  Afterwards ``checkrouter.sh`` is run,
        via the router's host, for every router that the listing reported as
        ``'Running'``; the script output is not inspected.

        NOTE(review): the second loop reuses the router listing fetched before
        the stop, so ``state`` values are not refreshed - confirm this is
        intended.

        :param vpc: VPC object; ``vpc.id`` and ``vpc.name`` are read.
        :raises Exception: when host credentials are missing from the config
            (the underlying ``KeyError`` is wrapped).
        """
        self.logger.debug("Stopping Master Router of VPC '%s'...", vpc.name)
        routers = list_routers(self.api_client, domainid=self.domain.id, account=self.account.name, vpcid=vpc.id)
        for router in routers:
            if router.redundantstate == 'MASTER':
                cmd = stopRouter.stopRouterCmd()
                cmd.id = router.id
                # This will not fail-over gracefully and cause a ~3.6sec downtime
                # cmd.forced = 'true'
                self.api_client.stopRouter(cmd)
                break

        for router in routers:
            if router.state != 'Running':
                continue

            hosts = list_hosts(self.api_client, zoneid=router.zoneid, type='Routing', state='Up', id=router.hostid)
            self.assertTrue(isinstance(hosts, list))
            host = next(iter(hosts or []), None)
            # An empty host list used to surface as an AttributeError on None
            # a line later; fail with a readable message instead.
            self.assertIsNotNone(host, "No 'Up' Routing host found for router %s" % router.id)

            try:
                host.user, host.passwd = get_host_credentials(self.config, host.name)
                get_process_status(host.ipaddress, 22, host.user, host.passwd, router.linklocalip, "sh /opt/cosmic/router/scripts/checkrouter.sh ")

            except KeyError as e:
                raise Exception("Exception: %s" % e)

        self.logger.debug("Master Router of VPC '%s' stopped", vpc.name)
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:27,代码来源:test_privategw_acl.py


示例2: test_01_single_VPC_iptables_policies

    def test_01_single_VPC_iptables_policies(self):
        """Check the default INPUT/FORWARD iptables policies on VPC routers.

        Two VPC networks are created, NAT rules are added and the VPC test is
        run; then, for every non-redundant router that belongs to a VPC,
        ``iptables -L <chain>`` is executed for both configured chains and the
        output must contain the word ``DROP`` exactly once.
        """
        self.logger.debug("Starting test_01_single_VPC_iptables_policies")

        routers = self.entity_manager.query_routers()

        self.assertEqual(isinstance(routers, list), True, "Check for list routers response return valid data")

        for offering_key, gateway in (("vpc_network_offering", "10.1.1.1"),
                                      ("vpc_network_offering_no_lb", "10.1.2.1")):
            self.entity_manager.create_network(self.services[offering_key], self.vpc.id, gateway)

        self.entity_manager.add_nat_rules(self.vpc.id)
        self.entity_manager.do_vpc_test()

        for router in routers:
            # Only plain (non-redundant) VPC routers are inspected.
            if router.isredundantrouter or not router.vpcid:
                continue

            hosts = list_hosts(self.apiclient, id=router.hostid)
            self.assertEqual(isinstance(hosts, list), True, "Check for list hosts response return valid data")

            host = hosts[0]
            host.user = self.hostConfig["username"]
            host.passwd = self.hostConfig["password"]
            configurable = self.services["configurableData"]
            host.port = configurable["host"]["port"]
            tables = [configurable["input"], configurable["forward"]]

            for table in tables:
                command = "iptables -L %s" % table
                result = None
                if self.hypervisor.lower() not in ("vmware", "hyperv"):
                    # Reach the router through the host it runs on.
                    try:
                        result = get_process_status(
                            host.ipaddress,
                            host.port,
                            host.user,
                            host.passwd,
                            router.linklocalip,
                            command,
                        )
                    except KeyError:
                        self.skipTest(
                            "Provide a marvin config file with host\
                                        credentials to run %s"
                            % self._testMethodName
                        )
                else:
                    # VMware/Hyper-V: go through the management server.
                    connection = self.apiclient.connection
                    result = get_process_status(
                        connection.mgtSvr,
                        22,
                        connection.user,
                        connection.passwd,
                        router.linklocalip,
                        command,
                        hypervisor=self.hypervisor,
                    )

                self.logger.debug("%s: %s" % (command, result))

                self.assertEqual(str(result).count("DROP"), 1, "%s Default Policy should be DROP" % table)
开发者ID:shapeblue,项目名称:Trillian,代码行数:58,代码来源:test_routers_iptables_default_policy.py


示例3: check_routers_state

    def check_routers_state(self, count=2, status_to_check="MASTER", expected_count=1, showall=False):
        """Fail unless exactly ``expected_count`` running routers report ``status_to_check``.

        After waiting for VRRP and refreshing ``self.routers``, the
        ``checkrouter.sh`` script is run for every router in state
        ``Running``; a router is counted when its output contains
        ``status_to_check`` exactly once.

        :param count: forwarded to ``self.query_routers``.
        :param status_to_check: one of ``MASTER``, ``BACKUP`` or ``UNKNOWN``.
        :param expected_count: number of routers that must match.
        :param showall: forwarded to ``self.query_routers``.
        """
        known_states = ["MASTER", "BACKUP", "UNKNOWN"]
        tally = [0, 0, 0]

        self.wait_for_vrrp()

        result = "UNKNOWN"
        self.query_routers(count, showall)
        for router in self.routers:
            if router.state != "Running":
                continue

            hosts = list_hosts(
                self.apiclient,
                zoneid=router.zoneid,
                type='Routing',
                state='Up',
                id=router.hostid
            )
            self.assertEqual(
                isinstance(hosts, list),
                True,
                "Check list host returns a valid list"
            )
            host = hosts[0]

            if self.hypervisor.lower() not in ('vmware', 'hyperv'):
                # Reach the router through the host it runs on.
                try:
                    host.user, host.passwd = get_host_credentials(
                        self.config, host.ipaddress)
                    result = str(get_process_status(
                        host.ipaddress,
                        22,
                        host.user,
                        host.passwd,
                        router.linklocalip,
                        "sh /opt/cloud/bin/checkrouter.sh "
                    ))

                except KeyError:
                    self.skipTest(
                        "Marvin configuration has no host credentials to\
                                    check router services")
            else:
                # VMware/Hyper-V: go through the management server.
                result = str(get_process_status(
                    self.apiclient.connection.mgtSvr,
                    22,
                    self.apiclient.connection.user,
                    self.apiclient.connection.passwd,
                    router.linklocalip,
                    "sh /opt/cloud/bin/checkrouter.sh ",
                    hypervisor=self.hypervisor
                ))

            if result.count(status_to_check) == 1:
                tally[known_states.index(status_to_check)] += 1

        found = tally[known_states.index(status_to_check)]
        if found != expected_count:
            self.fail("Expected '%s' routers at state '%s', but found '%s'!" % (expected_count, status_to_check, found))
开发者ID:exoscale,项目名称:cloudstack,代码行数:57,代码来源:test_vpc_redundant.py


示例4: download_builtin_templates

def download_builtin_templates(apiclient, zoneid, hypervisor, host,
                               linklocalip, interval=60):
    """Block until the zone's BUILTIN template reports 'Download Complete'.

    :param apiclient: API client handed to ``list_templates``.
    :param zoneid: zone whose templates are inspected.
    :param hypervisor: hypervisor filter for the initial template listing.
    :param host: mapping with ``ipaddress``, ``port``, ``username`` and
        ``password`` keys used to reach ``linklocalip``.
    :param linklocalip: link-local address the iptables command is sent to.
    :param interval: seconds to sleep between polls.
    :raises Exception: when templates cannot be listed or the template status
        is neither complete, downloading nor installing.
    """

    # Change IPTABLES Rules: set the default INPUT policy to ACCEPT
    get_process_status(
        host["ipaddress"],
        host["port"],
        host["username"],
        host["password"],
        linklocalip,
        "iptables -P INPUT ACCEPT"
    )
    time.sleep(interval)
    # Find the BUILTIN Templates for given Zone, Hypervisor
    list_template_response = list_templates(
        apiclient,
        hypervisor=hypervisor,
        zoneid=zoneid,
        templatefilter='self'
    )

    if not isinstance(list_template_response, list):
        raise Exception("Failed to download BUILTIN templates")

    # Pick the BUILTIN template to watch (the last one listed wins).
    # NOTE(review): templateid stays None when no BUILTIN template is listed,
    # in which case the poll below is not filtered by id - confirm intended.
    templateid = None
    for template in list_template_response:
        if template.templatetype == "BUILTIN":
            templateid = template.id

    # Sleep to ensure that template is in downloading state after adding
    # Sec storage
    time.sleep(interval)
    while True:
        template_response = list_templates(
            apiclient,
            id=templateid,
            zoneid=zoneid,
            templatefilter='self'
        )
        # A non-list or empty answer used to crash with TypeError/IndexError.
        if not isinstance(template_response, list) or not template_response:
            raise Exception("Failed to download BUILTIN templates")
        template = template_response[0]
        # If template is ready,
        # template.status = Download Complete
        # Downloading - x% Downloaded
        # Error - Any other string
        if template.status == 'Download Complete':
            break

        elif 'Downloaded' in template.status or 'Installing' in template.status:
            # Still in progress.  The 'Installing' case previously looped
            # straight back into the API call without any pause.
            time.sleep(interval)

        else:
            raise Exception("ErrorInDownload")

    return
开发者ID:aali-dincloud,项目名称:cloudstack,代码行数:56,代码来源:common.py


示例5: wait_vm_ready

    def wait_vm_ready(self, router, vmip):
        """Ping ``vmip`` from the router until it answers or 15 attempts are used.

        :param router: router to ping from; ``linklocalip`` is read.
        :param vmip: IP address (string) of the VM to ping.
        :return: ``True`` as soon as a ping succeeds (after a further 10s
            pause), ``False`` when every attempt failed.
        """
        self.logger.debug("Check whether VM %s is up" % vmip)
        max_tries = 15
        ping_result = 0
        host = self.get_host_details(router)
        # The command prints ping's exit status as its only output line.
        ping_command = "ping -c 1 " + vmip + ">/dev/null; echo $?"

        for attempt in range(1, max_tries + 1):
            try:
                ping_result = get_process_status(
                    host.ipaddress,
                    host.port,
                    host.user,
                    host.passwd,
                    router.linklocalip,
                    ping_command
                )
                # Return value 0 means we were able to ping
                if int(ping_result[0]) == 0:
                    self.logger.debug("VM %s is pingable, give it 10s to request the password" % vmip)
                    time.sleep(10)
                    return True

            except KeyError:
                self.skipTest("Provide a marvin config file with host credentials to run %s" % self._testMethodName)

            self.logger.debug("Ping result from the Router on IP '%s' is -> '%s'" % (router.linklocalip, ping_result[0]))

            self.logger.debug("Executing vm ping %s/%s" % (attempt, max_tries))
            time.sleep(5)

        return False
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:32,代码来源:test_password_server.py


示例6: test_dnsmasq_service_dhcp_dns_config

    def test_dnsmasq_service_dhcp_dns_config(self, router, dns_server):
        """Tell whether the router's dnsmasq config for eth3 mentions ``dns_server``.

        :param router: router to inspect; ``linklocalip`` is read.
        :param dns_server: text expected in the grepped ``eth3,6`` entry.
        :return: ``True`` when ``dns_server`` occurs in the command output,
            ``False`` otherwise.
        """
        host = self.get_host_details(router)

        nic = 'eth3'

        # Get the dnsmasq config entry
        command_to_execute = "grep '{0},6' /etc/dnsmasq.d/{0}.conf".format(nic)

        raw_output = ""
        try:
            raw_output = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                command_to_execute)
        except KeyError:
            self.skipTest(
                "Provide a marvin config file with host\
                        credentials to run %s" %
                self._testMethodName)

        command_result = str(raw_output)
        self.logger.debug("Got this response: %s " % command_result)

        # The DNS server must appear in the grepped dnsmasq entry.
        return command_result.count(dns_server) != 0
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:31,代码来源:test_password_and_dnsmasq_service.py


示例7: test_dhcphopts

    def test_dhcphopts(self, ipaddress, router):
        """Assert that /etc/dhcpopts.txt on the router has an entry for ``ipaddress``.

        The address is looked up with its dots replaced by underscores.

        :param ipaddress: dotted IP address string.
        :param router: router to inspect; ``hostid`` and ``linklocalip`` are read.
        """
        hosts = list_hosts(self.apiclient, id=router.hostid)

        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check for list hosts response return valid data")

        host = hosts[0]
        host.user = self.hostConfig['username']
        host.passwd = self.hostConfig['password']
        host.port = self.services["configurableData"]["host"]["port"]

        host_tag = ipaddress.replace(".", "_")
        command = "cat /etc/dhcpopts.txt | grep %s" % host_tag

        if self.hypervisor.lower() not in ('vmware', 'hyperv'):
            # Reach the router through the host it runs on.
            try:
                result = get_process_status(
                    host.ipaddress,
                    host.port,
                    host.user,
                    host.passwd,
                    router.linklocalip,
                    command)
            except KeyError:
                self.skipTest(
                    "Provide a marvin config file with host\
                            credentials to run %s" %
                    self._testMethodName)
        else:
            # VMware/Hyper-V: go through the management server.
            connection = self.apiclient.connection
            result = get_process_status(
                connection.mgtSvr,
                22,
                connection.user,
                connection.passwd,
                router.linklocalip,
                command,
                hypervisor=self.hypervisor)

        self.logger.debug("%s RESULT IS ==> %s" % (command, result))
        res = str(result)

        self.assertNotEqual(
            res.count(host_tag),
            0,
            "DHCP opts file does not contain exception for IP ==> %s!" % res)
开发者ID:priyankparihar,项目名称:cloudstack,代码行数:47,代码来源:test_router_dhcphosts.py


示例8: test_password_file_not_empty

    def test_password_file_not_empty(self, vm, router):
        """Assert the router's password file lists the VM's IP exactly once.

        The file ``/var/cache/cloud/passwords-<gateway>`` is grepped for the
        address of the VM's first NIC.

        :param vm: VM whose ``nic[0].gateway`` / ``nic[0].ipaddress`` are read.
        :param router: router to inspect; ``hostid`` and ``linklocalip`` are read.
        """
        hosts = list_hosts(self.apiclient, id=router.hostid)

        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check for list hosts response return valid data")

        host = hosts[0]
        host.user = self.hostConfig['username']
        host.passwd = self.hostConfig['password']
        host.port = self.services["configurableData"]["host"]["port"]

        # Filled with (gateway, ipaddress) of the VM's first NIC.
        command_template = "cat /var/cache/cloud/passwords-%s | grep %s | sed 's/=/ /g' | awk '{print $1}'"

        if self.hypervisor.lower() not in ('vmware', 'hyperv'):
            # Reach the router through the host it runs on.
            try:
                result = get_process_status(
                    host.ipaddress,
                    host.port,
                    host.user,
                    host.passwd,
                    router.linklocalip,
                    command_template % (vm.nic[0].gateway, vm.nic[0].ipaddress))
            except KeyError:
                self.skipTest(
                    "Provide a marvin config file with host\
                            credentials to run %s" %
                    self._testMethodName)
        else:
            # VMware/Hyper-V: go through the management server.
            connection = self.apiclient.connection
            result = get_process_status(
                connection.mgtSvr,
                22,
                connection.user,
                connection.passwd,
                router.linklocalip,
                command_template % (vm.nic[0].gateway, vm.nic[0].ipaddress),
                hypervisor=self.hypervisor
            )

        self.logger.debug(
            (command_template + " RESULT IS ==> %s")
            % (vm.nic[0].gateway, vm.nic[0].ipaddress, result))
        res = str(result)

        self.assertEqual(
            res.count(vm.nic[0].ipaddress),
            1,
            "Password file is empty or doesn't exist!")
开发者ID:Accelerite,项目名称:cloudstack,代码行数:47,代码来源:test_password_server.py


示例9: test_dhcphosts

    def test_dhcphosts(self, vm, router):
        """Assert /etc/dhcphosts.txt on the router lists the VM's IP exactly once.

        :param vm: VM whose ``nic[0].ipaddress`` is looked up.
        :param router: router to inspect; ``hostid`` and ``linklocalip`` are read.
        """
        hosts = list_hosts(
            self.apiclient,
            id=router.hostid)

        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check for list hosts response return valid data")

        host = hosts[0]
        host.user = self.hostConfig['username']
        host.passwd = self.hostConfig['password']
        host.port = self.services["configurableData"]["host"]["port"]

        # Raw string: the sed expression contains "\," which is an invalid
        # escape sequence in a normal literal (warning on modern Python).
        # The text sent to the router is unchanged.
        command_template = r"cat /etc/dhcphosts.txt | grep %s | sed 's/\,/ /g' | awk '{print $2}'"

        if self.hypervisor.lower() in ('vmware', 'hyperv'):
            # VMware/Hyper-V: go through the management server.
            result = get_process_status(
                self.apiclient.connection.mgtSvr,
                22,
                self.apiclient.connection.user,
                self.apiclient.connection.passwd,
                router.linklocalip,
                command_template % (vm.nic[0].ipaddress),
                hypervisor=self.hypervisor)
        else:
            # Reach the router through the host it runs on.
            try:
                result = get_process_status(
                    host.ipaddress,
                    host.port,
                    host.user,
                    host.passwd,
                    router.linklocalip,
                    command_template % (vm.nic[0].ipaddress))
            except KeyError:
                self.skipTest(
                    "Provide a marvin config file with host\
                            credentials to run %s" %
                    self._testMethodName)

        self.logger.debug(
            (command_template + " RESULT IS ==> %s")
            % (vm.nic[0].ipaddress, result))
        res = str(result)

        self.assertEqual(
            res.count(vm.nic[0].ipaddress),
            1,
            "DHCP hosts file contains duplicate IPs ==> %s!" % res)
开发者ID:priyankparihar,项目名称:cloudstack,代码行数:46,代码来源:test_router_dhcphosts.py


示例10: test_01_routervm_iptables_policies

    def test_01_routervm_iptables_policies(self):
        """Check the default INPUT/FORWARD iptables policy on plain router VMs.

        A VM is deployed first; then, for every router that is neither
        redundant nor part of a VPC, ``iptables -L <chain>`` is run for both
        configured chains and the output must contain ``policy DROP`` exactly
        once.
        """

        self.logger.debug("Starting test_01_routervm_iptables_policies")

        # The VM itself is not inspected; it is only deployed.
        vm1 = self.entity_manager.deployvm()

        routers = self.entity_manager.query_routers()

        self.assertEqual(
            isinstance(routers, list), True,
            "Check for list routers response return valid data")

        for router in routers:
            if router.isredundantrouter or router.vpcid:
                continue

            hosts = list_hosts(self.apiclient, id=router.hostid)
            self.assertEqual(
                isinstance(hosts, list),
                True,
                "Check for list hosts response return valid data")

            host = hosts[0]
            host_settings = self.services["configurableData"]["host"]
            host.user = host_settings["username"]
            host.passwd = host_settings["password"]
            host.port = host_settings["port"]
            configurable = self.services["configurableData"]
            tables = [configurable["input"], configurable["forward"]]

            for table in tables:
                command = 'iptables -L %s' % table
                try:
                    result = get_process_status(
                        host.ipaddress,
                        host.port,
                        host.user,
                        host.passwd,
                        router.linklocalip,
                        command)
                except KeyError:
                    self.skipTest(
                        "Provide a marvin config file with host\
                                    credentials to run %s" %
                        self._testMethodName)

                self.logger.debug("%s: %s" % (command, result))

                self.assertEqual(
                    str(result).count("policy DROP"),
                    1,
                    "%s Default Policy should be DROP" % table)
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:51,代码来源:test_router_ip_tables_policies.py


示例11: getCommandResultFromRouter

    def getCommandResultFromRouter(self, router, command):
        """Run ``command`` against ``router`` and return what get_process_status gives back.

        On VMware/Hyper-V the call goes through the management server; on
        other hypervisors it goes through the router's host, using the
        credentials from ``self.services["configurableData"]["host"]``.

        :param router: router to address; ``hostid`` and ``linklocalip`` are read.
        :param command: command line to execute.
        :return: the value returned by ``get_process_status``.
        """
        if self.hypervisor.lower() in ('vmware', 'hyperv'):
            connection = self.apiclient.connection
            return get_process_status(
                connection.mgtSvr,
                22,
                connection.user,
                connection.passwd,
                router.linklocalip,
                command,
                hypervisor=self.hypervisor
            )

        hosts = list_hosts(self.apiclient, id=router.hostid)
        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check for list hosts response return valid data"
        )
        host = hosts[0]
        host_settings = self.services["configurableData"]["host"]
        host.user = host_settings["username"]
        host.passwd = host_settings["password"]

        return get_process_status(
            host.ipaddress,
            22,
            host.user,
            host.passwd,
            router.linklocalip,
            command
        )
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:37,代码来源:test_network.py


示例12: get_router_state

    def get_router_state(self, router):
        """Return the state reported by the router's checkrouter.sh script.

        The second space-separated field of the script output is taken; the
        first output line is returned.

        :param router: router to query; ``linklocalip`` is read.
        :return: the first output line, or ``"UNKNOWN"`` when the command
            fails or produces no output.
        """
        host = self.get_host_details(router)

        router_state = "UNKNOWN"
        try:
            output = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                "/opt/cosmic/router/scripts/checkrouter.sh | cut -d\" \" -f2"
            )
            # Keep the "UNKNOWN" default when nothing came back.
            if output:
                router_state = output[0]
        except Exception:
            # Best effort: a failed lookup is reported as "UNKNOWN".  The old
            # code indexed the default string here and returned just "U".
            self.logger.debug("Oops, unable to determine redundant state for router with link local address %s" % (router.linklocalip))
        self.logger.debug("The router with link local address %s reports state %s" % (router.linklocalip, router_state))
        return router_state
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:18,代码来源:test_vpc_vpn.py


示例13: check_routers_interface

    def check_routers_interface(self, count=2, interface_to_check="eth1", expected_exists=True, showall=False):
        """Fail unless ``interface_to_check`` exists (or is absent) on every running router.

        For each router in state ``Running`` the interface state is read with
        ``ip a``; the interface counts as existing when the output contains
        ``UP`` exactly once or ``DOWN`` exactly once.

        :param count: forwarded to ``self.query_routers``.
        :param interface_to_check: interface name to grep for.
        :param expected_exists: whether the interface is expected to exist.
        :param showall: forwarded to ``self.query_routers``.
        """
        result = ""

        self.query_routers(count, showall)
        for router in self.routers:
            if router.state != "Running":
                continue

            hosts = list_hosts(
                self.apiclient,
                zoneid=router.zoneid,
                type='Routing',
                state='Up',
                id=router.hostid
            )
            self.assertEqual(
                isinstance(hosts, list),
                True,
                "Check list host returns a valid list"
            )
            host = hosts[0]

            try:
                host.user, host.passwd = get_host_credentials(self.config, host.ipaddress)
                result = str(get_process_status(
                    host.ipaddress,
                    22,
                    host.user,
                    host.passwd,
                    router.linklocalip,
                    "ip a | grep %s | grep state | awk '{print $9;}'" % interface_to_check
                ))

            except KeyError:
                self.skipTest("Marvin configuration has no host credentials to check router services")

            interface_found = (result.count("UP") == 1) or (result.count("DOWN") == 1)

            if expected_exists and interface_found:
                self.logger.debug("Expected interface '%s' to exist and it does!" % interface_to_check)
            elif expected_exists:
                self.fail("Expected interface '%s' to exist, but it didn't!" % interface_to_check)
            elif interface_found:
                self.fail("Expected interface '%s' to not exist, but it did!" % interface_to_check)
            else:
                self.logger.debug("Expected interface '%s' to not exist, and it didn't!" % interface_to_check)
开发者ID:EdwardBetts,项目名称:blackhole,代码行数:44,代码来源:test_vpc_redundant.py


示例14: test_password_server_logs

    def test_password_server_logs(self, vm, router):
        """Tell whether the router log shows the VM's password being saved and sent.

        Redundant routers whose queried state is not ``MASTER`` are skipped
        and reported as success.

        :param vm: VM whose ``nic[0].ipaddress`` is grepped for.
        :param router: router to inspect.
        :return: ``True`` when both log messages are present (or the router
            was skipped), ``False`` otherwise.
        """
        host = self.get_host_details(router)

        router_state = self.get_router_state(router)

        if router.isredundantrouter and router_state != "MASTER":
            # Parenthesised so the line is valid on Python 2 and Python 3;
            # the bare print statement was a SyntaxError on Python 3.
            print("Found router in non-MASTER state '" + router.redundantstate + "' so skipping test.")
            return True

        # Get the related passwd server logs for our vm
        command_to_execute = "grep %s /var/log/messages" % vm.nic[0].ipaddress

        password_log_result = ""
        try:
            password_log_result = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                command_to_execute)
        except KeyError:
            self.skipTest(
                "Provide a marvin config file with host\
                        credentials to run %s" %
                self._testMethodName)

        command_result = str(password_log_result)
        self.logger.debug("Got this response: %s " % command_result)

        # Check that a password was saved for the VM's IP
        if command_result.count("password saved for VM IP") == 0:
            return False

        # Check if the password was retrieved from the passwd server. If it is, the actual password is replaced with 'saved_password'
        if command_result.count("password sent to") == 0:
            return False

        return True
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:39,代码来源:test_password_server.py


示例15: test_process_running

    def test_process_running(self, find_process, router):
        """Assert that exactly one ``find_process`` process runs on the router.

        :param find_process: text to grep for in the ``ps aux`` output.
        :param router: router to inspect; ``linklocalip`` is read.
        """
        host = self.get_host_details(router)
        count_command = "ps aux | grep " + find_process + " | grep -v grep | wc -l"

        number_of_processes_found = 0
        try:
            number_of_processes_found = get_process_status(
                host.ipaddress,
                host.port,
                host.user,
                host.passwd,
                router.linklocalip,
                count_command
            )

        except KeyError:
            self.skipTest("Provide a marvin config file with host credentials to run %s" % self._testMethodName)

        # First output line carries the `wc -l` count.
        found = number_of_processes_found[0]
        self.logger.debug("Result from the Router on IP '%s' is -> Number of processess found: '%s'" % (router.linklocalip, found))

        expected_nr_or_processes = 1
        failure_message = "Router should have " + str(expected_nr_or_processes) + " '" + find_process + "' processes running, found " + str(found)

        self.assertEqual(int(found), expected_nr_or_processes, msg=failure_message)
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:23,代码来源:test_password_server.py


示例16: test_05_del_cidr_verify_alias_removal

    def test_05_del_cidr_verify_alias_removal(self):
        """Destroy the last VM in the CIDR and verify alias removal
            1.Deploy guest vm in new cidr
            2.Verify ip alias creation
            3.Destroy vm and wait for it to expunge
            4.Verify ip alias removal after vm expunge

        This method itself only checks the alias, deletes
        ``self.virtual_machine`` and checks again; the VM and
        ``self.alias_ip`` are used as already set on the test instance.
        """
        list_router_response = list_routers(
            self.apiclient,
            zoneid=self.zone.id,
            listall=True
        )
        self.assertEqual(
            isinstance(list_router_response, list),
            True,
            "Check list response returns a valid list"
        )
        router = list_router_response[0]
        hosts = list_hosts(
            self.apiclient,
            zoneid=router.zoneid,
            type='Routing',
            state='Up',
            id=router.hostid
        )
        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check list host returns a valid list"
        )
        host = hosts[0]
        self.debug("Router ID: %s, state: %s" % (router.id, router.state))
        self.assertEqual(
            router.state,
            'Running',
            "Check list router response for router state"
        )

        port = self.testdata['configurableData']['host']["publicport"]
        username = self.testdata['configurableData']['host']["username"]
        password = self.testdata['configurableData']['host']["password"]

        # SSH to host so that host key is saved in first
        # attempt
        SshClient(host.ipaddress, port, username, password)

        proc = "ip addr show eth0"
        result = get_process_status(
            host.ipaddress,
            port,
            username,
            password,
            router.linklocalip,
            proc
        )
        res = str(result)
        self.debug("ip alias configuration on VR: %s" % res)
        # str.find returns -1 when the alias is absent.  A missing comma used
        # to turn this into "find(...) - 1" compared against the message
        # string, so the assertion could never fail.
        self.assertNotEqual(
            res.find(self.alias_ip),
            - 1,
            "ip alias is not created on VR eth0"
        )
        # NOTE(review): the docstring mentions waiting for expunge, but no
        # wait happens between the delete and the checks below - confirm.
        self.virtual_machine.delete(self.apiclient)
        self.debug(
            "Verify that expunging the last vm in the CIDR should\
                    delete the ip alias from VR")
        ip_alias2 = self.dbclient.execute(
            "select ip4_address from nic_ip_alias;"
        )
        self.assertEqual(
            isinstance(ip_alias2, list),
            True,
            "Error in sql query"
        )
        self.assertEqual(
            len(ip_alias2),
            0,
            "Failure in clearing ip alias entry from cloud db"
        )

        proc = "ip addr show eth0"
        result = get_process_status(
            host.ipaddress,
            port,
            username,
            password,
            router.linklocalip,
            proc
        )
        res = str(result)
        self.assertEqual(
            res.find(
                self.alias_ip),
            - 1,
            "Failed to clean up ip alias from VR even after\
                    last vm expunge in the CIDR")
        self.debug("IP alias got deleted from VR successfully.")
        # The VM is already deleted, so drop it from the cleanup list.
        self.cleanup.remove(self.virtual_machine)
        return
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:99,代码来源:test_multiple_ip_ranges.py


示例17: test_01_FTPModulesInVR

    def test_01_FTPModulesInVR(self):
        """
        @desc: Verify FTP modules are loaded in VR of advance zone
        step1 : deploy a VM so that a VR exists in the advance zone
        step2 : verify nf_nat_ftp and nf_conntrack_ftp show up in the
                VR's `lsmod | grep ftp` output
        """
        if self.zone.networktype == "Basic":
            self.skipTest("This test can be run only in advance zone")

        # create a virtual machine
        vm = VirtualMachine.create(
            self.api_client,
            self.services["small"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.small_offering.id,
            mode=self.services["mode"]
        )
        self.assertIsNotNone(vm, "Failed to deploy virtual machine")
        self.cleanup.append(vm)

        vm_listing = VirtualMachine.list(self.api_client, id=vm.id)
        status = validateList(vm_listing)
        self.assertEqual(
            status[0],
            PASS,
            "list vm response returned invalid list")

        list_router_response = list_routers(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        status = validateList(list_router_response)
        self.assertEqual(
            status[0], PASS, "Check list response returns a valid list")
        router = list_router_response[0]

        self.debug("Router ID: %s, state: %s" % (router.id, router.state))

        self.assertEqual(
            router.state,
            'Running',
            "Check list router response for router state"
        )

        if self.hypervisor.lower() not in ('vmware', 'hyperv'):
            # Reach the router through the host it runs on.
            try:
                hosts = list_hosts(
                    self.apiclient,
                    zoneid=router.zoneid,
                    type='Routing',
                    state='Up',
                    id=router.hostid
                )

                self.assertEqual(
                    isinstance(hosts, list),
                    True,
                    "Check list host returns a valid list"
                )

                host = hosts[0]
                result = get_process_status(
                    host.ipaddress,
                    22,
                    self.hostConfig["username"],
                    self.hostConfig["password"],
                    router.linklocalip,
                    "lsmod | grep ftp"
                )

            except Exception as e:
                raise Exception("Exception raised in getting host\
                        credentials: %s " % e)
        else:
            # VMware/Hyper-V: go through the management server.
            connection = self.apiclient.connection
            result = get_process_status(
                connection.mgtSvr,
                22,
                connection.user,
                connection.passwd,
                router.linklocalip,
                "lsmod | grep ftp",
                hypervisor=self.hypervisor
            )

        res = str(result)
        self.debug("lsmod | grep ftp: %s" % res)
        # Both kernel modules have to be listed.
        ismoduleinstalled = "nf_nat_ftp" in res and "nf_conntrack_ftp" in res
        self.assertEqual(
            ismoduleinstalled,
            True,
            "nf_conntrack_ftp and nf_nat_ftp modules not installed on routers")
        return
开发者ID:PCextreme,项目名称:cloudstack,代码行数:95,代码来源:test_escalations_routers.py


示例18: test_11_ss_nfs_version_on_ssvm

    def test_11_ss_nfs_version_on_ssvm(self):
        """Test NFS Version on Secondary Storage mounted properly on SSVM
        """

        # 1) List SSVM in zone
        # 2) Get id and url from mounted nfs store
        # 3) Update NFS version for previous image store
        # 4) Stop SSVM
        # 5) Check NFS version of mounted nfs store after SSVM starts 

        nfs_version = self.config.nfsVersion
        if nfs_version == None:
            self.skipTest('No NFS version provided in test data')

        #List SSVM for zone id
        list_ssvm_response = list_ssvms(
            self.apiclient,
            systemvmtype='secondarystoragevm',
            state='Running',
            zoneid=self.zone.id
        )
        self.assertNotEqual(
            list_ssvm_response,
            None
        )
        self.assertEqual(
            isinstance(list_ssvm_response, list),
            True,
            "Check list response returns a valid list"
        )
        self.assertEqual(
            len(list_ssvm_response),
            1,
            "Check list System VMs response"
        )

        ssvm = list_ssvm_response[0]
        image_stores_response = ImageStore.list(self.apiclient,zoneid=self.zone.id)

        if self.hypervisor.lower() in ('vmware', 'hyperv'):
            # SSH into SSVMs is done via management server for Vmware and Hyper-V
            result = get_process_status(
                self.apiclient.connection.mgtSvr,
                22,
                self.apiclient.connection.user,
                self.apiclient.connection.passwd,
                ssvm.privateip,
                "mount | grep 'type nfs'",
                hypervisor=self.hypervisor)

        for res in result:
            split_res = res.split("on")
            mounted_img_store_url = split_res[0].strip()
            for img_store in image_stores_response:
                img_store_url = str(img_store.url)
                if img_store_url.startswith("nfs://"):
                    img_store_url = img_store_url[6:]
                    #Add colon after ip address to match output from mount command
                    first_slash = img_store_url.find('/')
                    img_store_url = img_store_url[0:first_slash] + ':' + img_store_url[first_slash:]
                    if img_store_url == mounted_img_store_url:
                        img_store_id = img_store.id
                        break

        self.assertNotEqual(
            img_store_id,
            None,
            "Check image store id mounted on SSVM"
        )

        #Update NFS version for image store mounted on SSVM
        updateConfigurationCmd = updateConfiguration.updateConfigurationCmd()
        updateConfigurationCmd.name = "secstorage.nfs.version"
        updateConfigurationCmd.value = nfs_version
        updateConfigurationCmd.imagestoreuuid = img_store_id

        updateConfigurationResponse = self.apiclient.updateConfiguration(updateConfigurationCmd)
        self.logger.debug("updated the parameter %s with value %s"%(updateConfigurationResponse.name, updateConfigurationResponse.value))

        #Stop SSVM
        self.debug("Stopping SSVM: %s" % ssvm.id)
        cmd = stopSystemVm.stopSystemVmCmd()
        cmd.id = ssvm.id
        self.apiclient.stopSystemVm(cmd)

        def checkForRunningSSVM():
            """Poll callback: report whether the SSVM is back in Running state.

            Looks the SSVM up by id and returns a ``(is_running, None)``
            pair. A pair is returned on every path, so the result can always
            be unpacked into two values; the enclosing code unpacks
            ``wait_until``'s result that way.
            NOTE(review): wait_until presumably unpacks this callback's
            result as a pair too - confirm against marvin.lib.utils.
            """
            new_list_ssvm_response = list_ssvms(
                self.apiclient,
                id=ssvm.id
            )
            # A non-list or empty response means the SSVM cannot be seen
            # yet: report "not ready" instead of returning None implicitly
            # or raising IndexError on an empty list.
            if (not isinstance(new_list_ssvm_response, list)
                    or not new_list_ssvm_response):
                return False, None
            return new_list_ssvm_response[0].state == 'Running', None
            
        res, _ = wait_until(self.services["sleep"], self.services["timeout"], checkForRunningSSVM)
        if not res:
            self.fail("List SSVM call failed!")
        
        new_list_ssvm_response = list_ssvms(
                self.apiclient,
                id=ssvm.id
#.........这里部分代码省略.........
开发者ID:krissterckx,项目名称:cloudstack,代码行数:101,代码来源:test_ssvm.py


示例19: test_07_stop_start_VR_verify_ip_alias

    def test_07_stop_start_VR_verify_ip_alias(self):
        & 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.random_gen函数代码示例 发布时间:2022-05-27
下一篇:
Python utils.cleanup_resources函数代码示例 发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap