Python remoteSSHClient.remoteSSHClient Function Code Examples


This article collects typical usage examples of the marvin.remoteSSHClient.remoteSSHClient function in Python. If you are wondering how remoteSSHClient is used in practice, or are looking for working examples of it, the selected code examples below may help.



The 11 code examples of remoteSSHClient shown below are sorted by popularity by default.

Example 1: get_process_status

import time

from marvin.remoteSSHClient import remoteSSHClient


def get_process_status(hostip, port, username, password, linklocalip, process, hypervisor=None):
    """Double hop into the router and return the status of a process"""

    # SSH to the machine
    ssh = remoteSSHClient(hostip, port, username, password)
    if str(hypervisor).lower() == 'vmware':
        ssh_command = "ssh -i /var/cloudstack/management/.ssh/id_rsa -ostricthostkeychecking=no "
    else:
        ssh_command = "ssh -i ~/.ssh/id_rsa.cloud -ostricthostkeychecking=no "

    ssh_command = ssh_command +\
                  "-oUserKnownHostsFile=/dev/null -p 3922 %s %s" % (
                      linklocalip,
                      process)

    # Double hop into router
    timeout = 5
    # Ensure the SSH login is successful
    while True:
        res = ssh.execute(ssh_command)

        if res[0] != "Host key verification failed.":
            break
        elif timeout == 0:
            break

        time.sleep(5)
        timeout = timeout - 1
    return res
Developer ID: fengzhanghome, Project: cloudstack, Lines of code: 29, Source: utils.py
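
The retry loop in Example 1 can be tried without a real host. The following is a minimal sketch, not the CloudStack implementation: `run_with_retry` and `FakeExecutor` are hypothetical names introduced here, and the fake only imitates an `execute()` call that returns a list of output lines, as the examples on this page suggest.

```python
import time


def run_with_retry(execute, command, retries=5, delay=5, sleep=time.sleep):
    """Re-run `command` while the first output line reports a host key failure.

    `execute` is any callable that takes a command string and returns a list
    of output lines (an assumption modelled on ssh.execute in Example 1).
    """
    res = execute(command)
    attempts_left = retries
    while res and res[0] == "Host key verification failed." and attempts_left > 0:
        sleep(delay)
        attempts_left -= 1
        res = execute(command)
    return res


class FakeExecutor(object):
    """Hypothetical stand-in for an SSH client: fails a few times, then succeeds."""

    def __init__(self, failures=2):
        self.failures = failures
        self.calls = 0

    def __call__(self, command):
        self.calls += 1
        if self.calls <= self.failures:
            return ["Host key verification failed."]
        return ["running"]


fake = FakeExecutor()
# Pass a no-op sleep so the demonstration does not actually wait.
result = run_with_retry(fake, "service dnsmasq status", sleep=lambda seconds: None)
print(result, fake.calls)
```

Injecting the `sleep` callable keeps the loop bounded and makes it testable without waiting; with a real client, `ssh.execute` would be passed as `execute`.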


Example 2: download_systemplates_sec_storage

def download_systemplates_sec_storage(server, services):
    """Download System templates on sec storage"""

    try:
        # Login to management server
        ssh = remoteSSHClient(server["ipaddress"], server["port"], server["username"], server["password"])
    except Exception:
        raise Exception("SSH access failed for server with IP address: %s" % server["ipaddress"])
    # Mount Secondary Storage on Management Server
    cmds = [
        "mkdir -p %s" % services["mnt_dir"],
        "mount -t nfs %s:/%s %s" % (services["sec_storage"], services["path"], services["mnt_dir"]),
        "%s -m %s -u %s -h %s -F"
        % (services["command"], services["mnt_dir"], services["download_url"], services["hypervisor"]),
    ]
    for c in cmds:
        result = ssh.execute(c)

    res = str(result)

    # Unmount the Secondary storage
    ssh.execute("umount %s" % (services["mnt_dir"]))

    if res.count("Successfully installed system VM template") != 1:
        raise Exception("Failed to download System Templates on Sec Storage")
Developer ID: mba811, Project: incubator-cloudstack, Lines of code: 28, Source: common.py


Example 3: is_server_ssh_ready

def is_server_ssh_ready(ipaddress, port, username, password, retries=10, timeout=30, keyPairFileLocation=None):
    """Return ssh handle else wait till sshd is running"""
    try:
        ssh = remoteSSHClient(
            host=ipaddress,
            port=port,
            user=username,
            passwd=password,
            keyPairFileLocation=keyPairFileLocation,
            retries=retries,
            delay=timeout)
    except Exception as e:
        raise Exception("Failed to bring up ssh service in time. Waited %ss. Error is %s" % (retries * timeout, e))
    else:
        # Return the handle, as the docstring promises
        return ssh
Developer ID: fengzhanghome, Project: cloudstack, Lines of code: 13, Source: utils.py


Example 4: test_DeployVm

    def test_DeployVm(self):
        """
        Let's start by defining the attributes of our VM that we will be
        deploying on CloudStack. We will be assuming a single zone is available
        and is configured and all templates are Ready

        The hardcoded values are used only for brevity. 
        """
        deployVmCmd = deployVirtualMachine.deployVirtualMachineCmd()
        deployVmCmd.zoneid = self.zone.uuid
        deployVmCmd.templateid = self.template.uuid #CentOS 5.6 builtin
        deployVmCmd.serviceofferingid = self.service_offering.uuid

        deployVmResponse = self.apiClient.deployVirtualMachine(deployVmCmd)
        self.debug("VM %s was deployed in the job %s"%(deployVmResponse.id, deployVmResponse.jobid))

        # At this point our VM is expected to be Running. Let's find out what
        # listVirtualMachines tells us about VMs in this account

        listVmCmd = listVirtualMachines.listVirtualMachinesCmd()
        listVmCmd.id = deployVmResponse.id
        listVmResponse = self.apiClient.listVirtualMachines(listVmCmd)

        self.assertNotEqual(len(listVmResponse), 0,
                            "Check if the list API returns a non-empty response")

        vm = listVmResponse[0]
        self.assertEqual(vm.state, "Running", "Check if VM has reached Running state in CS")

        hostname = vm.name
        nattedip = self.setUpNAT(vm.id)

        self.assertEqual(vm.id, deployVmResponse.id,
                         "Check if the VM returned is the same as the one we deployed")

        self.assertEqual(vm.state, "Running",
                         "Check if VM has reached a state of running")

        # SSH login and compare hostname        
        self.debug("Attempting to SSH into %s over %s of %s"%(nattedip, "22", vm.name))
        ssh_client = remoteSSHClient(nattedip, "22", "root", "password")
        stdout = ssh_client.execute("hostname")

        self.assertEqual(hostname, stdout[0],
                         "cloudstack VM name and hostname do not match")
Developer ID: AsherBond, Project: incubator-cloudstack, Lines of code: 46, Source: testSshDeployVM.py


Example 5: try_ssh

    def try_ssh(self, ip_addr, hostnames):
        try:
            self.debug(
                "SSH into NAT Rule (Public IP: %s)" % ip_addr)

            # If Round Robin Algorithm is chosen,
            # each ssh command should alternate between VMs

            ssh_1  = remoteSSHClient(
                                    ip_addr,
                                    22,
                                    self.services["natrule"]["username"],
                                    self.services["natrule"]["password"]
                                    )
            hostnames.append(ssh_1.execute("hostname")[0])
            self.debug(hostnames)
        except Exception as e:
            self.fail("%s: SSH failed for VM with IP Address: %s" %
                                    (e, ip_addr))
        return hostnames
Developer ID: amoghv, Project: cloudstack, Lines of code: 20, Source: test_haproxy.py


Example 6: is_snapshot_on_nfs

    def is_snapshot_on_nfs(self, snapshot_id):
        """
        Checks whether a snapshot with id (not UUID) `snapshot_id` is present on the nfs storage

        @param snapshot_id: id of the snapshot (not uuid)
        @return: True if snapshot is found, False otherwise
        """
        secondaryStores = ImageStore.list(self.apiclient, zoneid=self.zone.id)
        self.assertTrue(isinstance(secondaryStores, list), "Not a valid response for listImageStores")
        self.assertNotEqual(len(secondaryStores), 0, "No image stores found in zone %s" % self.zone.id)
        secondaryStore = secondaryStores[0]
        if str(secondaryStore.providername).lower() != "nfs":
            self.skipTest("TODO: %s test works only against nfs secondary storage" % self._testMethodName)

        qresultset = self.dbclient.execute(
            "select install_path from snapshot_store_ref where snapshot_id='%s' and store_role='Image';" % snapshot_id
        )
        self.assertEqual(
            isinstance(qresultset, list),
            True,
            "Invalid db query response for snapshot %s" % snapshot_id
        )
        self.assertNotEqual(
            len(qresultset),
            0,
            "No such snapshot %s found in the cloudstack db" % snapshot_id
        )
        snapshotPath = qresultset[0][0]
        nfsurl = secondaryStore.url
        # e.g. nfs://192.168.100.21/export/test -> host '192.168.100.21', path '/export/test'
        from urllib2 import urlparse
        parse_url = urlparse.urlsplit(nfsurl, scheme='nfs')
        host, path = parse_url.netloc, parse_url.path
        # Sleep to ensure that snapshot is reflected in sec storage
        time.sleep(self.services["sleep"])
        snapshots = []
        try:
            # Log in to the management server to check that the snapshot is present on secondary storage
            ssh_client = remoteSSHClient(
                self.config.mgtSvr[0].mgtSvrIp,
                22,
                self.config.mgtSvr[0].user,
                self.config.mgtSvr[0].passwd,
            )

            cmds = [
                "mkdir -p %s" % self.services["paths"]["mount_dir"],
                "mount -t %s %s:%s %s" % (
                    'nfs',
                    host,
                    path,
                    self.services["paths"]["mount_dir"]
                    ),
                "ls %s" % (
                    os.path.join(self.services["paths"]["mount_dir"], snapshotPath)
                    ),
            ]

            for c in cmds:
                self.debug("command: %s" % c)
                result = ssh_client.execute(c)
                self.debug("Result: %s" % result)

            snapshots.extend(result)
            # Unmount the Sec Storage
            cmds = [
                "cd",
                "umount %s" % (self.services["paths"]["mount_dir"]),
            ]
            for c in cmds:
                ssh_client.execute(c)
        except Exception as e:
            self.fail("SSH failed for management server: %s - %s" %
                      (self.config.mgtSvr[0].mgtSvrIp, e))
        return snapshots.count(snapshot_id) == 1
Developer ID: lafferty, Project: cloudstack, Lines of code: 75, Source: test_snapshot_gc.py
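
Example 6 reaches `urlparse` through `urllib2`, which exists only in Python 2. As a hedged sketch of the same parsing step on Python 3, the standard-library `urllib.parse.urlsplit` exposes the host and export path as named fields. The URL is the sample value from the comments on this page, and `split_nfs_url` is a hypothetical helper name.

```python
from urllib.parse import urlsplit


def split_nfs_url(nfsurl):
    """Return (host, path) for a URL such as nfs://192.168.100.21/export/test."""
    parts = urlsplit(nfsurl, scheme="nfs")
    return parts.netloc, parts.path


host, path = split_nfs_url("nfs://192.168.100.21/export/test")
# An NFS mount source has the form host:/path.
mount_source = "%s:%s" % (host, path)
print(host, path, mount_source)
```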


Example 7: test_09_appcookie_leastconn

    def test_09_appcookie_leastconn(self):
        """Test creating an "AppCookie" sticky policy for an LB rule with leastconn
        """

        # Validate the following
        # 1. Configure Netscaler for load balancing.
        # 2. Create a Network offering with LB services provided by Netscaler
        #    and all other services by VR.
        # 3. Create a new account/user.
        # 4. Deploy few VMs using a network from the above created Network
        #    offering.
        # 5. Create an "AppCookie" sticky policy for an LB rule with the
        #    "leastconn" algorithm

        self.debug(
            "Creating LB rule for IP address: %s with leastconn algo" %
                                        self.public_ip.ipaddress.ipaddress)

        self.services["lbrule"]["alg"] = 'leastconn'
        self.services["lbrule"]["publicport"] = 80
        self.services["lbrule"]["privateport"] = 80
        lb_rule = LoadBalancerRule.create(
                                    self.apiclient,
                                    self.services["lbrule"],
                                    ipaddressid=self.public_ip.ipaddress.id,
                                    accountid=self.account.name,
                                    networkid=self.network.id
                                )
        self.cleanup.append(lb_rule)
        self.debug("Created the load balancing rule for public IP: %s" %
                                            self.public_ip.ipaddress.ipaddress)

        self.debug("Assigning VM instance: %s to LB rule: %s" % (
                                                    self.virtual_machine.name,
                                                    lb_rule.name
                                                    ))
        lb_rule.assign(self.apiclient, [self.virtual_machine])
        self.debug("Assigned VM instance: %s to lb rule: %s" % (
                                                    self.virtual_machine.name,
                                                    lb_rule.name
                                                    ))
        self.debug(
            "Configuring 'AppCookie' Sticky policy on lb rule: %s" %
                                                                lb_rule.name)
        try:
            result = lb_rule.createSticky(
                             self.apiclient,
                             methodname='AppCookie',
                             name='AppCookieLeastConn',
                             param={"name": 20}
                             )
            self.debug("Response: %s" % result)
        except Exception as e:
            self.fail("Configure sticky policy failed with exception: %s" % e)

        self.debug("SSH into Netscaler to check whether sticky policy configured properly or not?")
        self.debug("SSH into netscaler: %s" %
                                    self.services["netscaler"]["ipaddress"])
        try:
            ssh_client = remoteSSHClient(
                                    self.services["netscaler"]["ipaddress"],
                                    self.services["netscaler"]["port"],
                                    self.services["netscaler"]["username"],
                                    self.services["netscaler"]["password"],
                                    )
            cmd = "show lb vserver Cloud-VirtualServer-%s-%s" % (
                                        self.public_ip.ipaddress.ipaddress,
                                        lb_rule.publicport)
            self.debug("command: %s" % cmd)
            res = ssh_client.execute(cmd)
            result = str(res)
            self.debug("Output: %s" % result)

            self.assertEqual(
                    result.count("Persistence: RULE"),
                    1,
                    "'AppCookie' sticky policy should be configured on NS"
                    )

            self.assertEqual(
                    result.count("Configured Method: LEASTCONNECTION"),
                    1,
                    "'leastconn' algorithm should be configured on NS"
                    )

        except Exception as e:
            self.fail("SSH Access failed for %s: %s" % \
                      (self.services["netscaler"]["ipaddress"], e))
        return
Developer ID: mba811, Project: incubator-cloudstack, Lines of code: 89, Source: test_netscaler_lb_sticky.py


Example 8: is_snapshot_on_nfs

def is_snapshot_on_nfs(apiclient, dbconn, config, zoneid, snapshotid):
    """
    Checks whether a snapshot with UUID `snapshotid` is present on the nfs storage

    @param apiclient: api client connection
    @param dbconn:  connection to the cloudstack db
    @param config: marvin configuration file
    @param zoneid: uuid of the zone on which the secondary nfs storage pool is mounted
    @param snapshotid: uuid of the snapshot
    @return: True if snapshot is found, False otherwise
    """

    from base import ImageStore, Snapshot
    secondaryStores = ImageStore.list(apiclient, zoneid=zoneid)

    assert isinstance(secondaryStores, list), "Not a valid response for listImageStores"
    assert len(secondaryStores) != 0, "No image stores found in zone %s" % zoneid

    secondaryStore = secondaryStores[0]

    if str(secondaryStore.providername).lower() != "nfs":
        raise Exception(
            "is_snapshot_on_nfs works only against nfs secondary storage. found %s" % str(secondaryStore.providername))

    qresultset = dbconn.execute(
                        "select id from snapshots where uuid = '%s';" \
                        % str(snapshotid)
                        )
    if len(qresultset) == 0:
        raise Exception(
            "No snapshot found in cloudstack with id %s" % snapshotid)


    snapshotid = qresultset[0][0]
    qresultset = dbconn.execute(
        "select install_path from snapshot_store_ref where snapshot_id='%s' and store_role='Image';" % snapshotid
    )

    assert isinstance(qresultset, list), "Invalid db query response for snapshot %s" % snapshotid
    assert len(qresultset) != 0, "No such snapshot %s found in the cloudstack db" % snapshotid

    snapshotPath = qresultset[0][0]

    nfsurl = secondaryStore.url
    # e.g. nfs://192.168.100.21/export/test -> host '192.168.100.21', path '/export/test'
    from urllib2 import urlparse
    parse_url = urlparse.urlsplit(nfsurl, scheme='nfs')
    host, path = parse_url.netloc, parse_url.path

    if not config.mgtSvr:
        raise Exception("Your marvin configuration does not contain mgmt server credentials")
    mgt_host, user, passwd = config.mgtSvr[0].mgtSvrIp, config.mgtSvr[0].user, config.mgtSvr[0].passwd

    try:
        # Connect to the management server; `host` still holds the NFS host parsed above
        ssh_client = remoteSSHClient(
            mgt_host,
            22,
            user,
            passwd,
        )
        cmds = [
                "mkdir -p /mnt/tmp",
                "mount -t %s %s:%s /mnt/tmp" % (
                    'nfs',
                    host,
                    path,
                    ),
                "test -f %s && echo 'snapshot exists'" % (
                    os.path.join("/mnt/tmp", snapshotPath)
                    ),
            ]

        for c in cmds:
            result = ssh_client.execute(c)

        # Unmount the Sec Storage
        cmds = [
                "cd",
                "umount /mnt/tmp",
            ]
        for c in cmds:
            ssh_client.execute(c)
    except Exception as e:
        raise Exception("SSH failed for management server: %s - %s" %
                      (config.mgtSvr[0].mgtSvrIp, e))
    return 'snapshot exists' in result
Developer ID: mba811, Project: incubator-cloudstack, Lines of code: 86, Source: utils.py


Example 9: exec_script_on_user_vm

    def exec_script_on_user_vm(self, script, exec_cmd_params, expected_result, negative_test=False):
        try:

            vm_network_id = self.virtual_machine.nic[0].networkid
            vm_ipaddress = self.virtual_machine.nic[0].ipaddress
            list_routers_response = list_routers(
                self.apiclient, account=self.account.name, domainid=self.account.domainid, networkid=vm_network_id
            )
            self.assertEqual(
                isinstance(list_routers_response, list), True, "Check for list routers response return valid data"
            )
            router = list_routers_response[0]

            # Once host or mgt server is reached, SSH to the router connected to VM
            # look for Router for Cloudstack VM network.
            if self.apiclient.hypervisor.lower() == "vmware":
                # SSH is done via management server for Vmware
                sourceip = self.apiclient.connection.mgtSvr
            else:
                # For others, we will have to get the ipaddress of host connected to vm
                hosts = list_hosts(self.apiclient, id=router.hostid)
                self.assertEqual(isinstance(hosts, list), True, "Check list response returns a valid list")
                host = hosts[0]
                sourceip = host.ipaddress

            self.debug("Sleep %s seconds for network on router to be up" % self.services["sleep"])
            time.sleep(self.services["sleep"])

            if self.apiclient.hypervisor.lower() == "vmware":
                key_file = " -i /var/cloudstack/management/.ssh/id_rsa "
            else:
                key_file = " -i /root/.ssh/id_rsa.cloud "

            ssh_cmd = "ssh -o UserKnownHostsFile=/dev/null  -o StrictHostKeyChecking=no -o LogLevel=quiet"
            expect_script = (
                "#!/usr/bin/expect\n"
                + "spawn %s %s -p 3922 root@%s\n" % (ssh_cmd, key_file, router.linklocalip)
                + 'expect "root@%s:~#"\n' % (router.name)
                + 'send "%s root@%s %s; exit $?\r"\n' % (ssh_cmd, vm_ipaddress, script)
                + 'expect "root@%s\'s password: "\n' % (vm_ipaddress)
                + 'send "password\r"\n'
                + "interact\n"
            )
            self.debug("expect_script>>\n%s<<expect_script" % expect_script)

            script_file = "/tmp/expect_script.exp"
            fd = open(script_file, "w")
            fd.write(expect_script)
            fd.close()

            ssh = remoteSSHClient(host=sourceip, port=22, user="root", passwd=self.services["host_password"])
            self.debug("SSH client to : %s obtained" % sourceip)
            ssh.scp(script_file, script_file)
            ssh.execute("chmod +x %s" % script_file)
            self.debug("%s %s" % (script_file, exec_cmd_params))

            self.debug("sleep %s seconds for egress rule to affect on Router." % self.services["sleep"])
            time.sleep(self.services["sleep"])

            result = ssh.execute("%s %s" % (script_file, exec_cmd_params))
            self.debug("Result is=%s" % result)

            exec_success = False
            if str(result).strip() == expected_result:
                self.debug("script executed successfully exec_success=True")
                exec_success = True

            ssh.execute("rm -rf %s" % script_file)

            if negative_test:
                self.assertEqual(exec_success, True, "Script result is %s matching with %s" % (result, expected_result))
            else:
                self.assertEqual(
                    exec_success, True, "Script result is %s is not matching with %s" % (result, expected_result)
                )

        except Exception as e:
            self.debug("Error=%s" % e)
            raise e
Developer ID: ngtuna, Project: cloudstack-autoscale, Lines of code: 79, Source: test_egress_fw_rules.py


Example 10: test_02_accountSnapshotClean


#.........part of the code is omitted here.........
        # Get the Secondary Storage details from  list Hosts
        hosts = list_hosts(
                                 self.apiclient,
                                 type='SecondaryStorage',
                                 zoneid=self.zone.id
                            )
        self.assertEqual(
                            isinstance(hosts, list),
                            True,
                            "Check list response returns a valid list"
                        )
        uuids = []
        for host in hosts:
            # hosts[0].name = "nfs://192.168.100.21/export/test"
            parse_url = (host.name).split('/')
            # parse_url = ['nfs:', '', '192.168.100.21', 'export', 'test']

            # Stripping end ':' from storage type
            storage_type = parse_url[0][:-1]
            # Split IP address and export path from name
            sec_storage_ip = parse_url[2]
            # Sec Storage IP: 192.168.100.21

            if sec_storage_ip[-1] != ":":
                sec_storage_ip = sec_storage_ip + ":"

            export_path = '/'.join(parse_url[3:])
            # Export path: export/test

            # Sleep to ensure that snapshot is reflected in sec storage
            time.sleep(self.services["sleep"])
            try:
                # Log in to the management server to check that the snapshot is present on secondary storage
                ssh_client = remoteSSHClient(
                                    self.services["mgmt_server"]["ipaddress"],
                                    self.services["mgmt_server"]["port"],
                                    self.services["mgmt_server"]["username"],
                                    self.services["mgmt_server"]["password"],
                                    )

                cmds = [
                    "mkdir -p %s" % self.services["paths"]["mount_dir"],
                    "mount -t %s %s/%s %s" % (
                                         storage_type,
                                         sec_storage_ip,
                                         export_path,
                                         self.services["paths"]["mount_dir"]
                                         ),
                    "ls %s/snapshots/%s/%s" % (
                                               self.services["paths"]["mount_dir"],
                                               account_id,
                                               volume_id
                                               ),
                ]

                for c in cmds:
                    self.debug("command: %s" % c)
                    result = ssh_client.execute(c)
                    self.debug("Result: %s" % result)

                uuids.append(result)

                # Unmount the Sec Storage
                cmds = [
                    "umount %s" % (self.services["paths"]["mount_dir"]),
                    ]
Developer ID: galaxyshen, Project: cloudstack, Lines of code: 67, Source: test_snapshot_gc.py
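
The manual string splitting in Example 10 is easier to follow with the sample value from its comments worked through. This is only an illustrative sketch: `parse_storage_name` is a hypothetical helper that mirrors the steps in the loop body, not a function from the project.

```python
def parse_storage_name(name):
    """Split a name like nfs://192.168.100.21/export/test by hand."""
    parse_url = name.split("/")
    # Strip the trailing ':' from the storage type ("nfs:" -> "nfs")
    storage_type = parse_url[0][:-1]
    sec_storage_ip = parse_url[2]
    # Append ':' so the IP can be joined with the export path for mount
    if sec_storage_ip[-1] != ":":
        sec_storage_ip = sec_storage_ip + ":"
    export_path = "/".join(parse_url[3:])
    return storage_type, sec_storage_ip, export_path


storage_type, sec_storage_ip, export_path = parse_storage_name(
    "nfs://192.168.100.21/export/test")
# Same "%s/%s" join that Example 10 uses in its mount command
mount_source = "%s/%s" % (sec_storage_ip, export_path)
print(storage_type, mount_source)
```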


Example 11: exec_script_on_user_vm

    def exec_script_on_user_vm(self, script, exec_cmd_params, expected_result, negative_test=False):
        try:

            vm_network_id = self.virtual_machine.nic[0].networkid
            vm_ipaddress  = self.virtual_machine.nic[0].ipaddress
            list_routers_response = list_routers(self.apiclient,
                                                 account=self.account.name,
                                                 domainid=self.account.domainid,
                                                 networkid=vm_network_id)
            self.assertEqual(isinstance(list_routers_response, list),
                             True,
                             "Check for list routers response return valid data")
            router = list_routers_response[0]

            #Once host or mgt server is reached, SSH to the router connected to VM
            # look for Router for Cloudstack VM network.
            if self.apiclient.hypervisor.lower() == 'vmware':
                #SSH is done via management server for Vmware
                sourceip = self.apiclient.connection.mgtSvr
            else:
                #For others, we will have to get the ipaddress of host connected to vm
                hosts = list_hosts(self.apiclient,
                                   id=router.hostid)
                self.assertEqual(isinstance(hosts, list),
                                 True,
                                 "Check list response returns a valid list")
                host = hosts[0]
                sourceip = host.ipaddress

            self.debug("Sleep %s seconds for network on router to be up"
                        % self.services['sleep'])
            time.sleep(self.services['sleep'])

            if self.apiclient.hypervisor.lower() == 'vmware':
                key_file = " -i /var/cloudstack/management/.ssh/id_rsa "
            else:
                key_file = " -i /root/.ssh/id_rsa.cloud "

            ssh_cmd = "ssh -o UserKnownHostsFile=/dev/null  -o StrictHostKeyChecking=no -o LogLevel=quiet"
            expect_script = "#!/usr/bin/expect\n" + \
                          "spawn %s %s -p 3922 root@%s\n"  % (ssh_cmd, key_file, router.linklocalip) + \
                          "expect \"root@%s:~#\"\n"   % (router.name) + \
                          "send \"%s root@%s %s; exit $?\r\"\n" % (ssh_cmd, vm_ipaddress, script) + \
                          "expect \"root@%s's password: \"\n"  % (vm_ipaddress) + \
                          "send \"password\r\"\n" + \
                          "interact\n"
            self.debug("expect_script>>\n%s<<expect_script" % expect_script)

            script_file = '/tmp/expect_script.exp'
            fd = open(script_file,'w')
            fd.write(expect_script)
            fd.close()

            ssh = remoteSSHClient(host=sourceip,
                                  port=22,
                                  user='root',
                                  passwd=self.services["host_password"])
            self.debug("SSH client to : %s obtained" % sourceip)
            ssh.scp(script_file, script_file)
            ssh.execute('chmod +x %s' % script_file)
            self.debug("%s %s" % (script_file, exec_cmd_params))

            exec_success = False
            # Timeout budget of 3 minutes, in seconds
            timeout = 180
            # Compare against 0 so the loop still ends when the sleep
            # interval does not divide the budget evenly
            while timeout > 0:
                self.debug('sleep %s seconds for egress rule to affect on Router.' % self.services['sleep'])
                time.sleep(self.services['sleep'])
                result = ssh.execute("%s %s" % (script_file, exec_cmd_params))
                self.debug('Result is=%s' % result)
                self.debug('Expected result is=%s' % expected_result)

                if str(result).strip() == expected_result:
                    exec_success = True
                    break
                else:
                    if result == []:
                        self.fail("Router is not accessible")
                    # The router network has not come up yet, so loop again.
                    if "send" in result[0]:
                        timeout -= self.services['sleep']
                    else: # Failed due to some other error
                        break
            #end while

            if timeout <= 0:
                self.fail("Router network failed to come up after 3 minutes.")

            ssh.execute('rm -rf %s' % script_file)

            if negative_test:
                self.assertEqual(exec_success,
                                 True,
                                 "Script result is %s matching with %s" % (result, expected_result))
            else:
                self.assertEqual(exec_success,
                                 True,
                                 "Script result is %s is not matching with %s" % (result, expected_result))

        except Exception as e:
#.........part of the code is omitted here.........
Developer ID: bluebottle, Project: cloudstack, Lines of code: 101, Source: test_egress_fw_rules.py



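Note: The marvin.remoteSSHClient.remoteSSHClient examples in this article were compiled by 纯净天空 from source-code and documentation platforms such as Github and MSDocs. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.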

