本文整理汇总了Python中marvin.lib.base.NATRule类的典型用法代码示例。如果您正苦于以下问题:Python NATRule类的具体用法?Python NATRule怎么用?Python NATRule使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了NATRule类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """Build the per-test fixtures.

    Creates an account, deploys two virtual machines in it, acquires a
    public IP using the first VM's account/zone/domain, and creates a NAT
    rule on that IP for the first VM. The account is stored in
    ``self.cleanup``.
    """
    test_client = self.testClient
    self.apiclient = test_client.getApiClient()
    self.dbclient = test_client.getDbConnection()

    self.account = Account.create(
        self.apiclient,
        self.services["account"],
        domainid=self.domain.id,
    )

    def deploy_vm():
        # Both VMs are deployed with exactly the same arguments.
        return VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
        )

    self.virtual_machine = deploy_vm()
    self.virtual_machine_2 = deploy_vm()

    first_vm = self.virtual_machine
    self.public_ip = PublicIPAddress.create(
        self.apiclient,
        first_vm.account,
        first_vm.zoneid,
        first_vm.domainid,
        self.services["virtual_machine"],
    )

    # Only the first VM gets a NAT rule on the acquired IP.
    NATRule.create(
        self.apiclient,
        first_vm,
        self.services["natrule"],
        ipaddressid=self.public_ip.ipaddress.id,
    )

    self.cleanup = [self.account]
开发者ID:ktenzer,项目名称:cloudstack,代码行数:35,代码来源:test_haproxy.py
示例2: test_02_use_vpn_port
def test_02_use_vpn_port(self):
    """Test that creating a VPN fails when the L2TP port is already in use.

    Steps:
      1. Create a port forwarding (NAT) rule using ``self.services["natrule"]``
         (the scenario targets UDP port 1701, over which L2TP works).
      2. Verify the rule can be listed.
      3. Try to enable VPN on the same public IP and expect an exception.
    """
    self.debug("Creating a port forwarding rule on port 1701")
    created_rule = NATRule.create(
        self.apiclient,
        self.virtual_machine,
        self.services["natrule"],
        self.public_ip.ipaddress.id,
    )

    self.debug("Verifying the NAT rule created")
    listed_rules = NATRule.list(
        self.apiclient,
        id=created_rule.id,
        listall=True,
    )
    response_is_list = isinstance(listed_rules, list)
    self.assertEqual(
        response_is_list,
        True,
        "List NAT rules should return a valid response",
    )

    self.debug(
        "Enabling the VPN connection for IP: %s" % self.public_ip.ipaddress
    )
    # The existing port forwarding rule should prevent VPN from being enabled.
    with self.assertRaises(Exception):
        self.create_VPN(self.public_ip)
    self.debug("Create VPN connection failed! Test successful!")
    return
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:29,代码来源:test_vpn_users.py
示例3: acquire_Public_Ip
def acquire_Public_Ip(self):
    """Acquire a public IP and configure a NAT rule on it for the test VM.

    Returns:
        The object returned by ``PublicIPAddress.create``.

    Any exception raised while acquiring the IP or creating the NAT rule
    is reported through ``self.fail``.
    """
    try:
        self.debug(
            "Acquiring public IP for account: %s" % self.account.name
        )
        vm = self.virtual_machine
        acquired_ip = PublicIPAddress.create(
            self.apiclient,
            vm.account,
            vm.zoneid,
            vm.domainid,
            self.services["virtual_machine"],
        )
        self.debug(
            "Acquired public IP: %s" % acquired_ip.ipaddress.ipaddress
        )

        self.debug("Configuring NAT rule for the acquired public ip")
        NATRule.create(
            self.apiclient,
            self.virtual_machine,
            self.services["natrule"],
            ipaddressid=acquired_ip.ipaddress.id,
        )
        return acquired_ip
    except Exception as err:
        self.fail("Failed to acquire new public IP: %s" % err)
开发者ID:diejiazhao,项目名称:cloudstack,代码行数:28,代码来源:test_haproxy.py
示例4: create_vm
def create_vm(self, pfrule=False, egress_policy=True, RR=False):
    """Create a network and a running VM, then expose it through a NAT rule.

    Args:
        pfrule: when true the VM is deployed with ``mode`` set to the zone's
            network type, otherwise with ``mode="basic"``.
        egress_policy: forwarded to ``self.create_network_offering``.
        RR: forwarded to ``self.create_network_offering``.

    Side effects: sets ``self.network``, ``self.virtual_machine``,
    ``self.vm_list`` and ``self.public_ip``, and creates a firewall rule
    and a NAT rule on the acquired public IP.
    """
    self.create_network_offering(egress_policy, RR)

    # Create the network from the offering created above.
    offering_id = self.network_offering.id
    self.debug("Creating network with network offering: %s" % offering_id)
    self.network = Network.create(
        self.apiclient,
        self.services["network"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        networkofferingid=self.network_offering.id,
        zoneid=self.zone.id,
    )
    self.debug("Created network with ID: %s" % self.network.id)

    self.debug("Deploying instance in the account: %s" % self.account.name)
    if pfrule:
        deploy_mode = self.zone.networktype
    else:
        deploy_mode = "basic"
    self.virtual_machine = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.domain.id,
        serviceofferingid=self.service_offering.id,
        mode=deploy_mode,
        networkids=[str(self.network.id)],
        # No project is involved in this deployment.
        projectid=None,
    )
    self.debug(
        "Deployed instance %s in account: %s"
        % (self.virtual_machine.id, self.account.name)
    )

    # Fail early if the VM was deployed but is not actually running.
    self.vm_list = list_virtual_machines(
        self.apiclient, id=self.virtual_machine.id
    )
    list_status = validateList(self.vm_list)[0]
    self.assertEqual(
        list_status,
        PASS,
        "vm list validation failed, vm list is %s" % self.vm_list,
    )
    current_state = str(self.vm_list[0].state).lower()
    self.assertEqual(
        current_state,
        "running",
        "VM state should be running, it is %s" % self.vm_list[0].state,
    )

    self.public_ip = PublicIPAddress.create(
        self.apiclient,
        accountid=self.account.name,
        zoneid=self.zone.id,
        domainid=self.account.domainid,
        networkid=self.network.id,
    )

    # Open up the firewall for the NAT rule's public port (used for SSH).
    FireWallRule.create(
        self.apiclient,
        ipaddressid=self.public_ip.ipaddress.id,
        protocol=self.services["natrule"]["protocol"],
        cidrlist=["0.0.0.0/0"],
        startport=self.services["natrule"]["publicport"],
        endport=self.services["natrule"]["publicport"],
    )

    self.debug("Creating NAT rule for VM ID: %s" % self.virtual_machine.id)
    NATRule.create(
        self.apiclient,
        self.virtual_machine,
        self.services["natrule"],
        self.public_ip.ipaddress.id,
    )
    return
开发者ID:tianshangjun,项目名称:cloudstack,代码行数:60,代码来源:test_egress_fw_rules.py
示例5: createNetworkRulesForVM
def createNetworkRulesForVM(apiclient, virtualmachine, ruletype,
                            account, networkruledata):
    """Acquire IP, create Firewall and NAT/StaticNAT rule
    (associating it with given vm) for that IP.

    Args:
        apiclient: API client passed to every create/enable call.
        virtualmachine: VM whose ``zoneid`` and first NIC's ``networkid``
            are used, and which the rule is associated with.
        ruletype: ``NAT_RULE`` or ``STATIC_NAT_RULE``; any other value
            creates only the IP and the firewall rule.
        account: account providing ``name`` and ``domainid`` for the IP.
        networkruledata: dict with a ``"fwrule"`` entry (``cidr``,
            ``startport``, ``endport``) and a ``"natrule"`` entry.

    Returns:
        ``[PASS, public_ip]`` on success, ``[FAIL, exception]`` if any
        step raised.
    """
    try:
        network_id = virtualmachine.nic[0].networkid
        public_ip = PublicIPAddress.create(
            apiclient,
            accountid=account.name,
            zoneid=virtualmachine.zoneid,
            domainid=account.domainid,
            networkid=network_id,
        )

        fw_data = networkruledata["fwrule"]
        FireWallRule.create(
            apiclient,
            ipaddressid=public_ip.ipaddress.id,
            protocol='TCP',
            cidrlist=[fw_data["cidr"]],
            startport=fw_data["startport"],
            endport=fw_data["endport"],
        )

        if ruletype == NAT_RULE:
            # Create NAT rule
            NATRule.create(
                apiclient,
                virtualmachine,
                networkruledata["natrule"],
                ipaddressid=public_ip.ipaddress.id,
                networkid=network_id,
            )
        elif ruletype == STATIC_NAT_RULE:
            # Enable Static NAT for VM
            StaticNATRule.enable(
                apiclient,
                public_ip.ipaddress.id,
                virtualmachine.id,
                networkid=network_id,
            )
    except Exception as e:
        # Without this return the failure would be dropped and execution
        # would fall through to the success return below (or raise
        # UnboundLocalError when the IP was never acquired).
        return [FAIL, e]
    return [PASS, public_ip]
开发者ID:aali-dincloud,项目名称:cloudstack,代码行数:30,代码来源:common.py
示例6: _create_natrule
def _create_natrule(self, vpc, vm, public_port, private_port, public_ip, network, services=None):
    """Create a NAT rule for ``vm`` on ``public_ip`` and record SSH details on the VM.

    Args:
        vpc: not used by this method.
        vm: virtual machine the rule forwards to; its ``ssh_ip``,
            ``public_ip`` and ``public_port`` attributes are overwritten.
        public_port: public port; also stored as start/end port when the
            default rule data is used.
        private_port: private port stored in the default rule data.
        public_ip: object whose ``ipaddress.id`` identifies the IP.
        network: object whose ``id`` is passed as ``networkid``.
        services: rule data; when falsy, ``self.services["natrule"]`` is
            updated in place with the given ports and used instead.

    Returns:
        The created NAT rule.
    """
    self.logger.debug("Creating NAT rule in network for vm with public IP")

    if not services:
        # Fall back to the shared rule data, updating its ports in place.
        default_rule = self.services["natrule"]
        default_rule["privateport"] = private_port
        default_rule["publicport"] = public_port
        default_rule["startport"] = public_port
        default_rule["endport"] = public_port
        services = default_rule

    created_rule = NATRule.create(
        apiclient=self.apiclient,
        services=services,
        ipaddressid=public_ip.ipaddress.id,
        virtual_machine=vm,
        networkid=network.id,
    )
    failure_message = (
        "Failed to create NAT Rule for %s" % public_ip.ipaddress.ipaddress
    )
    self.assertIsNotNone(created_rule, failure_message)

    self.logger.debug("Adding NetworkACL rules to make NAT rule accessible")

    # Remember how to reach the VM through the new rule.
    rule_address = created_rule.ipaddress
    vm.ssh_ip = rule_address
    vm.public_ip = rule_address
    vm.public_port = int(public_port)
    return created_rule
开发者ID:Accelerite,项目名称:cloudstack,代码行数:25,代码来源:test_vpc_vpn.py
示例7: test_router_dns_guestipquery
def test_router_dns_guestipquery(self):
    """Check that the guest VM can resolve names through the virtual router DNS.

    Opens a firewall rule and a NAT rule on the public IP returned by
    ``self.test_router_common()``, verifies the NAT rule is listed as
    Active, then runs ``nslookup google.com`` on the VM over SSH and
    checks the output.
    """
    log = self.logger
    log.debug("Starting test_router_dns_guestipquery...")
    public_ip = self.test_router_common()[0]

    log.debug("Creating Firewall rule for VM ID: %s" % self.vm.id)
    FireWallRule.create(
        self.apiclient,
        ipaddressid=public_ip.id,
        protocol=self.services["natrule1"]["protocol"],
        cidrlist=['0.0.0.0/0'],
        startport=self.services["natrule1"]["publicport"],
        endport=self.services["natrule1"]["publicport"],
    )

    log.debug("Creating NAT rule for VM ID: %s" % self.vm.id)
    created_rule = NATRule.create(
        self.apiclient,
        self.vm,
        self.services["natrule1"],
        public_ip.id,
    )

    listed_rules = list_nat_rules(self.apiclient, id=created_rule.id)
    self.assertEqual(
        isinstance(listed_rules, list),
        True,
        "Check for list NAT rules response return valid data",
    )
    rule_count_ok = len(listed_rules) >= 1
    self.assertTrue(
        rule_count_ok,
        "Check for list NAT rules to have at least one rule",
    )
    self.assertEqual(
        listed_rules[0].state,
        'Active',
        "Check list port forwarding rules",
    )

    lookup_output = None
    try:
        log.debug("SSH into guest VM with IP: %s" % created_rule.ipaddress)
        ssh_client = self.vm.get_ssh_client(
            ipaddress=created_rule.ipaddress,
            port=self.services['natrule1']["publicport"],
            retries=8,
        )
        lookup_output = str(ssh_client.execute("nslookup google.com"))
    except Exception as err:
        self.fail(
            "Failed to SSH into VM - %s due to exception: %s"
            % (created_rule.ipaddress, err)
        )

    if not lookup_output:
        self.fail("Did not to receive any response from the guest VM, failing.")

    # The output must mention the queried name and a server on port 53.
    mentions_name = "google.com" in lookup_output
    mentions_dns_port = "#53" in lookup_output
    self.assertTrue(
        mentions_name and mentions_dns_port,
        "VR DNS should serve requests from guest network, unable to get valid nslookup result from guest VM.",
    )
开发者ID:Accelerite,项目名称:cloudstack,代码行数:55,代码来源:test_router_dns.py
示例8: test_isolate_network_FW_PF_default_routes
def test_isolate_network_FW_PF_default_routes(self):
    """Add a firewall and port forwarding rule and check the VM can reach outside.

    Verifies the account has a running router, opens a firewall rule and a
    NAT rule on the account's first public IP, then SSHes into ``self.vm_1``
    through the NAT rule and pings 8.8.8.8, expecting all three packets
    to be received.
    """
    self.logger.debug("Starting test_isolate_network_FW_PF_default_routes...")

    routers = list_routers(self.apiclient, account=self.account.name, domainid=self.account.domainid)
    self.assertEqual(isinstance(routers, list), True, "Check for list routers response return valid data")
    self.assertNotEqual(len(routers), 0, "Check list router response")
    router = routers[0]
    self.assertEqual(router.state, "Running", "Check list router response for router state")

    public_ips = list_publicIP(
        self.apiclient, account=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id
    )
    self.assertEqual(isinstance(public_ips, list), True, "Check for list public IPs response return valid data")
    public_ip = public_ips[0]

    self.logger.debug("Creating Firewall rule for VM ID: %s" % self.vm_1.id)
    FireWallRule.create(
        self.apiclient,
        ipaddressid=public_ip.id,
        protocol=self.services["natrule"]["protocol"],
        cidrlist=["0.0.0.0/0"],
        startport=self.services["natrule"]["publicport"],
        endport=self.services["natrule"]["publicport"],
    )

    self.logger.debug("Creating NAT rule for VM ID: %s" % self.vm_1.id)
    # Create NAT rule
    nat_rule = NATRule.create(self.apiclient, self.vm_1, self.services["natrule"], public_ip.id)
    nat_rules = list_nat_rules(self.apiclient, id=nat_rule.id)
    self.assertEqual(isinstance(nat_rules, list), True, "Check for list NAT rules response return valid data")
    self.assertEqual(nat_rules[0].state, "Active", "Check list port forwarding rules")

    # Default value makes the final assertion fail if nothing was captured.
    result = "failed"
    try:
        ssh_command = "ping -c 3 8.8.8.8"
        self.logger.debug("SSH into VM with ID: %s" % nat_rule.ipaddress)
        ssh = self.vm_1.get_ssh_client(
            ipaddress=nat_rule.ipaddress, port=self.services["natrule"]["publicport"], retries=5
        )
        result = str(ssh.execute(ssh_command))
        self.logger.debug("SSH result: %s; COUNT is ==> %s" % (result, result.count("3 packets received")))
    except Exception as e:
        # Report the address that was actually used for SSH together with
        # the underlying error. A bare ``except`` hid the cause, and the
        # old message dereferenced ``public_ip.ipaddress.ipaddress``, which
        # does not match how this list entry is used elsewhere here.
        self.fail("Failed to SSH into VM - %s due to exception: %s" % (nat_rule.ipaddress, e))

    self.assertEqual(result.count("3 packets received"), 1, "Ping to outside world from VM should be successful")
    return
开发者ID:jannyg,项目名称:cloudstack,代码行数:55,代码来源:test_routers_network_ops.py
示例9: create_natrule
def create_natrule(self, vm, public_ip, network):
    """Create a NAT rule for ``vm`` on ``public_ip`` and queue it for cleanup.

    The rule is created with ``openfirewall=True`` in the given network,
    using ``self.vm_services['small']`` as the rule data.

    Returns:
        The created NAT rule (also appended to ``self.test_cleanup``).
    """
    self.logger.debug('Creating NAT rule in network for vm with public IP')

    rule_options = {
        'ipaddressid': public_ip.ipaddress.id,
        'openfirewall': True,
        'networkid': network.id,
    }
    created_rule = NATRule.create(
        self.api_client,
        vm,
        self.vm_services['small'],
        **rule_options
    )

    self.test_cleanup.append(created_rule)
    return created_rule
开发者ID:EdwardBetts,项目名称:blackhole,代码行数:12,代码来源:test_nicira.py
示例10: deploy_portforwards
def deploy_portforwards(self, portforwards_data, virtualmachines_data, vpc, publicipaddress):
    """Create port forwarding rules (and SSH tags) for matching VM NICs.

    For every port forward entry, finds the VM entry with the same name
    and, on that VM, each NIC whose ``guestip`` equals the entry's ``nic``.
    For each match a NAT rule is created on ``publicipaddress`` and the VM
    is tagged with ``sship`` / ``sshport``.

    Args:
        portforwards_data: iterable of dicts with a ``'data'`` mapping
            (``virtualmachinename``, ``nic``, ``publicport``, ...).
        virtualmachines_data: iterable of dicts with a ``'data'`` mapping
            (``name``, ``nics``).
        vpc: passed to ``get_network`` to look up the NIC's network.
        publicipaddress: public IP object used for the rule and the tag.
    """
    for portforward_data in portforwards_data:
        for virtualmachine_data in virtualmachines_data:
            vm_entry = virtualmachine_data['data']
            if vm_entry['name'] != portforward_data['data']['virtualmachinename']:
                continue

            for nic_data in vm_entry['nics']:
                nic_entry = nic_data['data']
                if nic_entry['guestip'] != portforward_data['data']['nic']:
                    continue

                network = get_network(
                    api_client=self.api_client,
                    name=nic_entry['networkname'],
                    vpc=vpc
                )
                virtualmachine = get_virtual_machine(
                    api_client=self.api_client,
                    name=self.dynamic_names['vms'][vm_entry['name']],
                    network=network
                )

                self.logger.debug('>>> PORT FORWARD => Creating...')
                portforward = NATRule.create(
                    api_client=self.api_client,
                    data=portforward_data['data'],
                    network=network,
                    virtual_machine=virtualmachine,
                    ipaddress=publicipaddress
                )

                # Tag the VM with the address/port it is reachable on.
                ssh_tags = [
                    {
                        'key': 'sship',
                        'value': publicipaddress.ipaddress.ipaddress
                    },
                    {
                        'key': 'sshport',
                        'value': portforward_data['data']['publicport']
                    }
                ]
                Tag.create(
                    api_client=self.api_client,
                    resourceType='UserVm',
                    resourceIds=[virtualmachine.id],
                    tags=ssh_tags
                )

                self.logger.debug(
                    '>>> PORT FORWARD => ID: %s => Public Start Port: %s '
                    '=> Public End Port: %s => Private Start Port: %s '
                    '=> Private End Port: %s => CIDR List: %s => Protocol: %s '
                    '=> State: %s => IP: %s => VM: %s',
                    portforward.id,
                    portforward.publicport,
                    portforward.publicendport,
                    portforward.privateport,
                    portforward.privateendport,
                    portforward.cidrlist,
                    portforward.protocol,
                    portforward.state,
                    portforward.ipaddressid,
                    portforward.virtualmachineid
                )
开发者ID:MissionCriticalCloud,项目名称:cosmic,代码行数:50,代码来源:testScenarioManager.py
示例11: test_03_enable_vpn_use_port
def test_03_enable_vpn_use_port(self):
    """Test that creating a NAT rule fails once VPN (L2TP) is enabled.

    Steps:
      1. Enable a VPN connection on ``self.public_ip``.
      2. Try to add a port forwarding rule using ``self.services["natrule"]``
         (the scenario targets UDP port 1701) and expect an exception,
         since VPN is enabled over that port.
    """
    self.debug(
        "Enabling the VPN connection for IP: %s" % self.public_ip.ipaddress
    )
    self.create_VPN(self.public_ip)

    self.debug("Creating a port forwarding rule on port 1701")
    rule_args = (
        self.apiclient,
        self.virtual_machine,
        self.services["natrule"],
        self.public_ip.ipaddress.id,
    )
    # Creating the NAT rule is expected to be rejected.
    with self.assertRaises(Exception):
        NATRule.create(*rule_args)
    self.debug("Create NAT rule failed! Test successful!")
    return
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:24,代码来源:test_vpn_users.py
示例12: createNetworkRules
def createNetworkRules(self, rule, ipaddressobj, networkid):
    """Create a firewall rule plus the requested rule type on a public IP.

    Args:
        rule: ``STATIC_NAT_RULE`` enables static NAT, ``LB_RULE`` creates a
            load balancer rule and assigns the VM to it; any other value
            creates a port forwarding (NAT) rule.
        ipaddressobj: acquired public IP object (``ipaddress.id`` is used).
        networkid: network id, used only when enabling static NAT.

    Side effects: sets ``self.fw_rule`` and, depending on ``rule``,
    ``self.lb_rule`` or ``self.nat_rule``.
    """
    ip_id = ipaddressobj.ipaddress.id
    fw_data = self.services["fwrule"]

    # Open up the firewall port range first (used for SSH).
    self.fw_rule = FireWallRule.create(
        self.apiclient,
        ipaddressid=ip_id,
        protocol=fw_data["protocol"],
        cidrlist=['0.0.0.0/0'],
        startport=fw_data["startport"],
        endport=fw_data["endport"]
    )

    if rule == STATIC_NAT_RULE:
        StaticNATRule.enable(
            self.apiclient,
            ipaddressobj.ipaddress.id,
            self.virtual_machine.id,
            networkid
        )
        return

    if rule == LB_RULE:
        first_nic = self.virtual_machine.nic[0]
        self.lb_rule = LoadBalancerRule.create(
            self.apiclient,
            self.services["lbrule"],
            ipaddressid=ipaddressobj.ipaddress.id,
            accountid=self.account.name,
            networkid=first_nic.networkid,
            domainid=self.account.domainid
        )
        # Map the VM to its first NIC's address for the assignment.
        vm_ip_pairs = [{
            "vmid": str(self.virtual_machine.id),
            "vmip": str(self.virtual_machine.nic[0].ipaddress),
        }]
        self.lb_rule.assign(self.apiclient, vmidipmap=vm_ip_pairs)
        return

    self.nat_rule = NATRule.create(
        self.apiclient,
        self.virtual_machine,
        self.services["natrule"],
        ipaddressobj.ipaddress.id
    )
    return
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:46,代码来源:test_network.py
示例13: create_NatRule_For_VM
def create_NatRule_For_VM(self, vm, public_ip, network):
    """Create a NAT rule for ``vm`` in a VPC network and an ingress ACL for it.

    The NAT rule is created with ``openfirewall=False`` in ``self.vpc``;
    a ``NetworkACL`` with traffic type ``Ingress`` is then created on the
    same network from ``self.services["natrule"]``.

    Returns:
        The created NAT rule.
    """
    self.debug("Creatinng NAT rule in network for vm with public IP")
    created_rule = NATRule.create(
        self.apiclient,
        vm,
        self.services["natrule"],
        ipaddressid=public_ip.ipaddress.id,
        openfirewall=False,
        networkid=network.id,
        vpcid=self.vpc.id,
    )

    self.debug("Adding NetwrokACl rules to make NAT rule accessible")
    ingress_acl = NetworkACL.create(
        self.apiclient,
        networkid=network.id,
        services=self.services["natrule"],
        traffictype='Ingress',
    )
    self.debug('nwacl_nat=%s' % ingress_acl.__dict__)

    return created_rule
开发者ID:MANIKANDANVEN,项目名称:cloudstack,代码行数:19,代码来源:test_vpc_network_lbrules.py
示例14: create_natrule
def create_natrule(self, vm, public_ip, network, vpc_id):
    """Create a NAT rule for ``vm`` in a VPC network and an ingress ACL for it.

    Args:
        vm: virtual machine the rule forwards to.
        public_ip: object whose ``ipaddress.id`` identifies the public IP.
        network: object whose ``id`` identifies the network.
        vpc_id: id of the VPC passed as ``vpcid``.

    Returns:
        The created NAT rule.
    """
    log = self.logger
    rule_data = self.services["natrule"]

    log.debug("Creating NAT rule in network for vm with public IP")
    created_rule = NATRule.create(
        self.apiclient,
        vm,
        rule_data,
        ipaddressid=public_ip.ipaddress.id,
        openfirewall=False,
        networkid=network.id,
        vpcid=vpc_id,
    )

    log.debug("Adding NetworkACL rules to make NAT rule accessible")
    ingress_acl = NetworkACL.create(
        self.apiclient,
        networkid=network.id,
        services=rule_data,
        traffictype="Ingress",
    )
    log.debug("nwacl_nat=%s" % ingress_acl.__dict__)

    return created_rule
开发者ID:shapeblue,项目名称:Trillian,代码行数:21,代码来源:test_routers_iptables_default_policy.py
示例15: setUp
def setUp(self):
    """Build the per-test fixtures: account, offering, VM, LB rule and NAT rule.

    Looks up the domain, zone and template, creates an admin account, a
    service offering and one VM, then creates a load balancer rule (with
    the VM assigned) and a NAT rule on the first public IP listed for the
    account. ``self.cleanup`` starts out empty.
    """
    self.apiclient = self.testClient.getApiClient()
    self.services = self.testClient.getParsedTestDataConfig()

    # Get Zone, Domain and templates
    self.domain = get_domain(self.apiclient)
    self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
    template = get_template(
        self.apiclient,
        self.zone.id,
        self.services["ostype"]
    )
    self.services["virtual_machine"]["zoneid"] = self.zone.id

    # Create an account, network, VM and IP addresses
    self.account = Account.create(
        self.apiclient,
        self.services["account"],
        admin=True,
        domainid=self.domain.id
    )
    self.service_offering = ServiceOffering.create(
        self.apiclient,
        self.services["service_offerings"]
    )
    self.vm_1 = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        templateid=template.id,
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id
    )

    src_nat_ip_addrs = list_publicIP(
        self.apiclient,
        account=self.account.name,
        domainid=self.account.domainid
    )
    try:
        src_nat_ip_addr = src_nat_ip_addrs[0]
    except Exception as e:
        # src_nat_ip_addr is unbound here, so the message must not use it;
        # report the account whose public IP listing could not be indexed.
        self.fail("Failed to get a public IP for account %s: %s" %
                  (self.account.name, e))

    self.lb_rule = LoadBalancerRule.create(
        self.apiclient,
        self.services["lbrule"],
        src_nat_ip_addr.id,
        self.account.name
    )
    self.lb_rule.assign(self.apiclient, [self.vm_1])

    self.nat_rule = NATRule.create(
        self.apiclient,
        self.vm_1,
        self.services["natrule"],
        src_nat_ip_addr.id
    )
    self.cleanup = []
    return
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:63,代码来源:test_network.py
示例16: test_01_port_fwd_on_src_nat
def test_01_port_fwd_on_src_nat(self):
"""Test for port forwarding on source NAT"""
# Validate the following:
# 1. listPortForwarding rules API should return the added PF rule
# 2. attempt to do an ssh into the user VM through the sourceNAT
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(src_nat_ip_addrs, list),
True,
"Check list response returns a valid list"
)
src_nat_ip_addr = src_nat_ip_addrs[0]
# Check if VM is in Running state before creating NAT rule
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating a NAT rule."
)
# Open up firewall port for SSH
FireWallRule.create(
self.apiclient,
ipaddressid=src_nat_ip_addr.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
# Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
self.virtual_machine,
self.services["natrule"],
src_nat_ip_addr.id
)
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
isinstance(list_nat_rule_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_nat_rule_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
list_nat_rule_response[0].id,
nat_rule.id,
"Check Correct Port forwarding Rule is returned"
)
# SSH virtual machine to test port forwarding
try:
self.debug("SSHing into VM with IP address %s with NAT IP %s" %
(
self.virtual_machine.ipaddress,
src_nat_ip_addr.ipaddress
))
self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress)
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
if vm_response[0].state != 'Running':
self.fail(
"State of VM : %s is not found to be Running" % str(
self.virtual_machine.ipaddress))
except Exception as e:
self.fail(
#.........这里部分代码省略.........
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:101,代码来源:test_network.py
示例17: test_02_port_fwd_on_non_src_nat
def test_02_port_fwd_on_non_src_nat(self):
"""Test for port forwarding on non source NAT"""
# Validate the following:
# 1. listPortForwardingRules should not return the deleted rule anymore
# 2. attempt to do ssh should now fail
ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid,
self.services["virtual_machine"]
)
self.cleanup.append(ip_address)
# Check if VM is in Running state before creating NAT rule
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating a NAT rule."
)
# Open up firewall port for SSH
FireWallRule.create(
self.apiclient,
ipaddressid=ip_address.ipaddress.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
# Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
self.virtual_machine,
self.services["natrule"],
ip_address.ipaddress.id
)
# Validate the following:
# 1. listPortForwardingRules should not return the deleted rule anymore
# 2. attempt to do ssh should now fail
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
isinstance(list_nat_rule_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_nat_rule_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
list_nat_rule_response[0].id,
nat_rule.id,
"Check Correct Port forwarding Rule is returned"
)
try:
self.debug("SSHing into VM with IP address %s with NAT IP %s" %
(
self.virtual_machine.ipaddress,
ip_address.ipaddress.ipaddress
))
self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" %
(self.virtual_machine.ipaddress, e)
)
nat_rule.delete(self.apiclient)
try:
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
except CloudstackAPIException:
self.debug("Nat Rule is deleted")
#.........这里部分代码省略.........
开发者ID:Tosta-Mixta,项目名称:cloudstack,代码行数:101,代码来源:test_network.py
示例18: test_01_host_maintenance_mode
def test_01_host_maintenance_mode(self):
"""Test host maintenance mode
"""
# Validate the following
# 1. Create Vms. Acquire IP. Create port forwarding & load balancing
# rules for Vms.
# 2. Host 1: put to maintenance mode. All Vms should failover to Host
# 2 in cluster. Vms should be in running state. All port forwarding
# rules and load balancing Rules should work.
# 3. After failover to Host 2 succeeds, deploy Vms. Deploy Vms on host
# 2 should succeed.
# 4. Host 1: cancel maintenance mode.
# 5. Host 2 : put to maintenance mode. All Vms should failover to
# Host 1 in cluster.
# 6. After failover to Host 1 succeeds, deploy VMs. Deploy Vms on
# host 1 should succeed.
hosts = Host.list(
self.apiclient,
zoneid=self.zone.id,
resourcestate='Enabled',
type='Routing'
)
self.assertEqual(
isinstance(hosts, list),
True,
"List hosts should return valid host response"
)
if len(hosts) < 2:
self.skipTest("There must be at least 2 hosts present in cluster")
self.debug("Checking HA with hosts: %s, %s" % (
hosts[0].name,
hosts[1].name
))
self.debug("Deploying VM in account: %s" % self.account.name)
# Spawn an instance in that network
virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id
)
vms = VirtualMachine.list(
self.apiclient,
id=virtual_machine.id,
listall=True
)
self.assertEqual(
isinstance(vms, list),
True,
"List VMs should return valid response for deployed VM"
)
self.assertNotEqual(
len(vms),
0,
"List VMs should return valid response for deployed VM"
)
vm = vms[0]
self.debug("Deployed VM on host: %s" % vm.hostid)
self.assertEqual(
vm.state,
"Running",
"Deployed VM should be in RUnning state"
)
networks = Network.list(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
listall=True
)
self.assertEqual(
isinstance(networks, list),
True,
"List networks should return valid list for the account"
)
network = networks[0]
self.debug("Associating public IP for account: %s" %
self.account.name)
public_ip = PublicIPAddress.create(
self.apiclient,
accountid=self.account.name,
zoneid=self.zone.id,
domainid=self.account.domainid,
networkid=network.id
)
self.debug("Associated %s with network %s" % (
public_ip.ipaddress.ipaddress,
network.id
))
self.debug("Creating PF rule for IP address: %s" %
public_ip.ipaddress.ipaddress)
NATRule.create(
self.apiclient,
virtual_machine,
self.services["natrule"],
#.........这里部分代码省略.........
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:101,代码来源:test_high_availability.py
示例19: test_08_add_TCP_PF_Rule_In_VPN
def test_08_add_TCP_PF_Rule_In_VPN(self):
"""
Test to add TCP Port Forwarding rule for specific ports(500,1701 and 4500) in VPN
"""
# Steps for verification
# 1. Enable vpn on SourceNAT IP address
# 2. Configure PF with TCP ports 500,1701 and 4500. It should be allowed
# Should not conflict with UPD ports used for VPN
vm_res = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id,
listall=True
)
self.assertEqual(
validateList(vm_res)[0],
PASS,
"Failed to list virtual machine"
)
network_id = vm_res[0].nic[0].networkid
src_nat_list = PublicIPAddress.list(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
listall=True,
issourcenat=True,
associatednetworkid=network_id
)
self.assertEqual(
validateList(src_nat_list)[0],
PASS,
"Failed to list source nat ip address"
)
ip = src_nat_list[0]
try:
vpn = Vpn.create(
self.apiclient,
publicipid=ip.id,
account=self.account.name,
domainid=self.account.domainid,
)
self.assertIsNotNone(
vpn,
"Failed to create remote access vpn"
)
except Exception as e:
self.fail("Failed to enable vpn on SourceNAT IP with error: %s" % e)
#Create PF rule with TCP ports 500,4500 and 1701
self.services['natrule']['protocol']="TCP"
for port in [500, 4500, 1701]:
self.services['natrule']['privateport'] = port
self.services['natrule']['publicport'] = port
try:
nat = NATRule.create(
self.apiclient,
|
请发表评论