本文整理汇总了Python中neutron.tests.common.net_helpers.assert_no_ping函数的典型用法代码示例。如果您正苦于以下问题:Python assert_no_ping函数的具体用法?Python assert_no_ping怎么用?Python assert_no_ping使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了assert_no_ping函数的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_arp_spoof_blocks_response
def test_arp_spoof_blocks_response(self):
    """Check that ping fails when the destination cannot answer ARP.

    The anti-spoofing rules on the destination port only allow
    192.168.0.3, so the destination cannot reply to the ARP request for
    its own address and the source must not be able to ping it.
    """
    # NOTE(review): assumes self.dst_addr is not 192.168.0.3 -- confirm
    # against the test class setup.
    self._setup_arp_spoof_for_port(self.dst_p.name, ["192.168.0.3"])
    endpoints = ((self.src_p, self.src_addr), (self.dst_p, self.dst_addr))
    for port, address in endpoints:
        port.addr.add("%s/24" % address)
    net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
开发者ID:neoareslinux,项目名称:neutron,代码行数:7,代码来源:test_ovs_flows.py
示例2: check_ping
def check_ping(self, from_site, to_site, instance=0, success=True):
    """Assert that ping between two sites succeeds or fails as expected.

    :param from_site: site whose ``vm[instance].namespace`` is pinged from.
    :param to_site: site whose ``vm[instance].port_ip`` is the target.
    :param instance: index of the VM to use on both sites.
    :param success: when true the ping must succeed, otherwise it must
        fail.
    """
    if success:
        checker = net_helpers.assert_ping
    else:
        checker = net_helpers.assert_no_ping
    source_namespace = from_site.vm[instance].namespace
    target_ip = to_site.vm[instance].port_ip
    checker(source_namespace, target_ip, timeout=8, count=4)
开发者ID:wywangsh,项目名称:neutron-vpnaas,代码行数:7,代码来源:test_scenario.py
示例3: test_mac_spoof_blocks_wrong_mac
def test_mac_spoof_blocks_wrong_mac(self):
    """Check that changing the allowed MAC on the source port breaks ping."""
    endpoints = ((self.src_p, self.src_addr), (self.dst_p, self.dst_addr))
    # Allow each port to use its own address, then configure the addresses.
    for port, address in endpoints:
        self._setup_arp_spoof_for_port(port.name, [address])
    for port, address in endpoints:
        port.addr.add("%s/24" % address)
    # Baseline: connectivity works before the allowed MAC is changed.
    net_helpers.assert_ping(self.src_namespace, self.dst_addr)
    # changing the allowed mac should stop the port from working
    self._setup_arp_spoof_for_port(
        self.src_p.name, [self.src_addr], mac="00:11:22:33:44:55")
    net_helpers.assert_no_ping(self.src_namespace, self.dst_addr)
开发者ID:sebrandon1,项目名称:neutron,代码行数:9,代码来源:test_ovs_flows.py
示例4: test_arp_spoof_blocks_icmpv6_neigh_advt
def test_arp_spoof_blocks_icmpv6_neigh_advt(self):
    """Check that ping fails when ICMPv6 neighbour advertisement is blocked."""
    self.src_addr, self.dst_addr = '2000::1', '2000::2'
    # The spoofing rules added here only allow '2000::3', which prevents
    # the destination from responding (i.e., icmpv6 neighbour
    # advertisement) to the icmpv6 neighbour solicitation request for its
    # own address (2000::2).
    self._setup_arp_spoof_for_port(self.dst_p.name, ['2000::3'])
    endpoints = ((self.src_p, self.src_addr), (self.dst_p, self.dst_addr))
    for port, address in endpoints:
        port.addr.add('%s/64' % address)
    # make sure the IPv6 addresses are ready before pinging
    for port, address in endpoints:
        port.addr.wait_until_address_ready(address)
    net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2)
开发者ID:FedericoRessi,项目名称:neutron,代码行数:14,代码来源:test_ovs_flows.py
示例5: test_ipsec_site_connections
def test_ipsec_site_connections(self):
    """Check that two sites can ping each other only after the VPN sync.

    Two sites are set up and the IPsec connection info is prepared in
    both directions. Ping between the sites must fail before
    ``device.sync`` is called and must succeed afterwards.
    """
    device = self.vpn_agent.device_drivers[0]
    # Mock the method below because it causes Exception:
    #   RuntimeError: Second simultaneous read on fileno 5 detected.
    #   Unless you really know what you're doing, make sure that only
    #   one greenthread can read any particular socket.  Consider using
    #   a pools.Pool. If you do know what you're doing and want to disable
    #   this error, call eventlet.debug.hub_prevent_multiple_readers(False)
    # Can reproduce the exception in the test only
    #
    # The mock is installed with mock.patch.object and stopped in a
    # cleanup, so the real function is restored when the test ends.
    # Assigning the mock straight onto the module would leave it in place
    # for every test that runs later in the same process.
    notif_patcher = mock.patch.object(
        ip_lib, 'send_ip_addr_adv_notif', new=mock.Mock())
    notif_patcher.start()
    self.addCleanup(notif_patcher.stop)
    # There are no vpn services yet. get_vpn_services_on_host returns
    # empty list
    device.agent_rpc.get_vpn_services_on_host = mock.Mock(
        return_value=[])
    # instantiate network resources "router", "private network"
    private_nets = list(PRIVATE_NET.subnet(24))
    site1 = self.site_setup(PUBLIC_NET[4], private_nets[1])
    site2 = self.site_setup(PUBLIC_NET[5], private_nets[2])
    # build vpn resources
    self.prepare_ipsec_conn_info(site1['vpn_service'],
                                 site2['vpn_service'])
    self.prepare_ipsec_conn_info(site2['vpn_service'],
                                 site1['vpn_service'])

    device.report_status = mock.Mock()
    # From now on the agent RPC reports both vpn services.
    device.agent_rpc.get_vpn_services_on_host = mock.Mock(
        return_value=[site1['vpn_service'],
                      site2['vpn_service']])

    # Before the sync there must be no connectivity in either direction.
    net_helpers.assert_no_ping(site1['port_namespace'], site2['port_ip'],
                               timeout=8, count=4)
    net_helpers.assert_no_ping(site2['port_namespace'], site1['port_ip'],
                               timeout=8, count=4)

    device.sync(mock.Mock(), [{'id': site1['router'].router_id},
                              {'id': site2['router'].router_id}])
    # Registered after the patch cleanup, so it runs first (cleanups are
    # LIFO) while send_ip_addr_adv_notif is still mocked.
    self.addCleanup(
        device._delete_vpn_processes,
        [site1['router'].router_id, site2['router'].router_id], [])

    # After the sync both directions must be reachable.
    net_helpers.assert_ping(site1['port_namespace'], site2['port_ip'],
                            timeout=8, count=4)
    net_helpers.assert_ping(site2['port_namespace'], site1['port_ip'],
                            timeout=8, count=4)
开发者ID:armando-migliaccio,项目名称:neutron-vpnaas,代码行数:44,代码来源:test_scenario.py
示例6: assert_no_ping
def assert_no_ping(self, dst_ip):
    """Assert that ``dst_ip`` cannot be pinged from ``self.namespace``.

    :param dst_ip: address handed to ``net_helpers.assert_no_ping``.
    """
    source_namespace = self.namespace
    net_helpers.assert_no_ping(source_namespace, dst_ip)
开发者ID:bgxavier,项目名称:neutron,代码行数:2,代码来源:machine_fixtures.py
示例7: test_ipsec_site_connections_with_l3ha_routers
def test_ipsec_site_connections_with_l3ha_routers(self):
    """Test ipsec site connection with HA routers.

    This test creates two agents. First agent will have Legacy and HA
    routers. Second agent will host only HA router. We setup ipsec
    connection between legacy and HA router.

    When HA router is created, agent1 will have master router and
    agent2 will have backup router. Ipsec connection will be established
    between legacy router and agent1's master HA router.

    Then we fail the agent1's master HA router. Agent1's HA router will
    transition to backup and agent2's HA router will become master.
    Now ipsec connection will be established between legacy router and
    agent2's master HA router
    """
    self.failover_agent = self._configure_agent('agent2')
    self.connect_agents(self.vpn_agent, self.failover_agent)

    vpn_agent_driver = self.vpn_agent.device_drivers[0]
    failover_agent_driver = self.failover_agent.device_drivers[0]

    # Install the mock with mock.patch.object and stop it in a cleanup,
    # so the real function is restored when the test ends. Assigning the
    # mock straight onto the module would leave it in place for every
    # test that runs later in the same process.
    notif_patcher = mock.patch.object(
        ip_lib, 'send_ip_addr_adv_notif', new=mock.Mock())
    notif_patcher.start()
    self.addCleanup(notif_patcher.stop)

    # There are no vpn services yet. get_vpn_services_on_host returns
    # empty list
    vpn_agent_driver.agent_rpc.get_vpn_services_on_host = mock.Mock(
        return_value=[])
    failover_agent_driver.agent_rpc.get_vpn_services_on_host = mock.Mock(
        return_value=[])

    # instantiate network resources "router", "private network"
    private_nets = list(PRIVATE_NET.subnet(24))
    site1 = self.site_setup(PUBLIC_NET[4], private_nets[1])
    site2 = self.setup_ha_routers(PUBLIC_NET[5], private_nets[2])
    router = site1['router']
    router1 = site2['router1']
    router2 = site2['router2']

    # build vpn resources
    self.prepare_ipsec_conn_info(site1['vpn_service'],
                                 site2['vpn_service'])
    self.prepare_ipsec_conn_info(site2['vpn_service'],
                                 site1['vpn_service'])

    vpn_agent_driver.report_status = mock.Mock()
    failover_agent_driver.report_status = mock.Mock()
    vpn_agent_driver.agent_rpc.get_vpn_services_on_host = mock.Mock(
        return_value=[site1['vpn_service'],
                      site2['vpn_service']])
    failover_agent_driver.agent_rpc.get_vpn_services_on_host = mock.Mock(
        return_value=[site2['vpn_service']])

    # No ipsec connection between legacy router and HA routers
    net_helpers.assert_no_ping(site1['port_namespace'], site2['port_ip'],
                               timeout=8, count=4)
    net_helpers.assert_no_ping(site2['port_namespace'], site1['port_ip'],
                               timeout=8, count=4)

    # sync the routers
    vpn_agent_driver.sync(mock.Mock(), [{'id': router.router_id},
                                        {'id': router1.router_id}])
    failover_agent_driver.sync(mock.Mock(), [{'id': router1.router_id}])
    # Registered after the patch cleanup, so it runs first (cleanups are
    # LIFO) while send_ip_addr_adv_notif is still mocked.
    self.addCleanup(
        vpn_agent_driver._delete_vpn_processes,
        [router.router_id, router1.router_id], [])

    # Test ipsec connection between legacy router and agent1's master HA
    # router (the failover has not happened yet)
    net_helpers.assert_ping(site1['port_namespace'], site2['port_ip'],
                            timeout=8, count=4)
    net_helpers.assert_ping(site2['port_namespace'], site1['port_ip'],
                            timeout=8, count=4)

    # Fail the agent1's HA router. Agent1's HA router will transition
    # to backup and agent2's HA router will become master.
    self._fail_ha_router(router1)

    linux_utils.wait_until_true(lambda: router2.ha_state == 'master')
    linux_utils.wait_until_true(lambda: router1.ha_state == 'backup')

    # wait until ipsec process running in failover agent's HA router
    # check for both strongswan and openswan processes
    path = failover_agent_driver.processes[router2.router_id].config_dir
    pid_files = ['%s/var/run/charon.pid' % path,
                 '%s/var/run/pluto.pid' % path]
    linux_utils.wait_until_true(
        lambda: self._ipsec_process_exists(
            self.failover_agent.conf, router2, pid_files))

    # Test ipsec connection between legacy router and agent2's HA router
    net_helpers.assert_ping(site1['port_namespace'], site2['port_ip'],
                            timeout=8, count=4)
    net_helpers.assert_ping(site2['port_namespace'], site1['port_ip'],
                            timeout=8, count=4)
开发者ID:armando-migliaccio,项目名称:neutron-vpnaas,代码行数:94,代码来源:test_scenario.py
示例8: test_securitygroup
def test_securitygroup(self):
"""Tests if security group rules are working, by confirming
that 0. traffic is allowed when port security is disabled,
1. connection from outside of allowed security group is blocked
2. connection from allowed security group is permitted
3. traffic not explicitly allowed (eg. ICMP) is blocked,
4. a security group update takes effect,
5. a remote security group member addition works, and
6. an established connection stops by deleting a SG rule.
7. test other protocol functionality by using SCTP protocol
"""
# index_to_sg[i] is the index into sgs of the group later applied to port i
index_to_sg = [0, 0, 1]
# index_to_host[i] is the index into self.environment.hosts used for port i
if self.firewall_driver == 'iptables_hybrid':
# The iptables_hybrid driver lacks isolation between agents
index_to_host = [0] * 3
else:
index_to_host = [0, 1, 1]
tenant_uuid = uuidutils.generate_uuid()
network = self.safe_client.create_network(tenant_uuid)
self.safe_client.create_subnet(
tenant_uuid, network['id'], '20.0.0.0/24')
sgs = [self.safe_client.create_security_group(tenant_uuid)
for i in range(2)]
# Ports start with port security disabled and no security groups
ports = [
self.safe_client.create_port(tenant_uuid, network['id'],
self.environment.hosts[host].hostname,
security_groups=[],
port_security_enabled=False)
for host in index_to_host]
# Ingress TCP 3333 is allowed into sgs[0] from members of sgs[0]
self.safe_client.create_security_group_rule(
tenant_uuid, sgs[0]['id'],
remote_group_id=sgs[0]['id'], direction='ingress',
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3333, port_range_max=3333)
# One fake machine per port, placed on the host chosen for that port
vms = [
self.useFixture(
machine.FakeFullstackMachine(
self.environment.hosts[host],
network['id'],
tenant_uuid,
self.safe_client,
neutron_port=ports[port],
use_dhcp=True))
for port, host in enumerate(index_to_host)]
for vm in vms:
vm.block_until_boot()
# 0. check that traffic is allowed when port security is disabled
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP)
self.assert_connection(
vms[2].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP)
net_helpers.assert_ping(vms[0].namespace, vms[1].ip)
net_helpers.assert_ping(vms[0].namespace, vms[2].ip)
net_helpers.assert_ping(vms[1].namespace, vms[2].ip)
# Apply security groups to the ports
for port, sg in zip(ports, index_to_sg):
self.safe_client.client.update_port(
port['id'],
body={'port': {'port_security_enabled': True,
'security_groups': [sgs[sg]['id']]}})
# 1. connection from outside of allowed security group is blocked
netcat = net_helpers.NetcatTester(
vms[2].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP)
# Wait until port update takes effect on the ports
common_utils.wait_until_true(
netcat.test_no_connectivity,
exception=AssertionError(
"Still can connect to the VM from different host.")
)
netcat.stop_processes()
# 2. check if connection from allowed security group is permitted
self.assert_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3333,
net_helpers.NetcatTester.TCP)
# 3. check if traffic not explicitly allowed (eg. ICMP) is blocked
net_helpers.assert_no_ping(vms[0].namespace, vms[1].ip)
net_helpers.assert_no_ping(vms[0].namespace, vms[2].ip)
net_helpers.assert_no_ping(vms[1].namespace, vms[2].ip)
# 4. check if a security group update takes effect
# Port 3344 has no rule yet, so the connection must fail at this point
self.assert_no_connection(
vms[1].namespace, vms[0].namespace, vms[0].ip, 3344,
net_helpers.NetcatTester.TCP)
self.safe_client.create_security_group_rule(
#.........part of the code is omitted here.........
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:101,代码来源:test_securitygroup.py
示例9: test_tcp_securitygroup
def test_tcp_securitygroup(self):
"""Tests if a TCP security group rule is working, by confirming
that 1. connection from allowed security group is allowed,
2. connection from elsewhere is blocked,
3. traffic not explicitly allowed (eg. ICMP) is blocked,
4. a security group update takes effect,
5. a remote security group member addition works, and
6. an established connection stops by deleting a SG rule.
"""
# index_to_sg[i] is the index into sgs of the security group of port i
index_to_sg = [0, 0, 1]
# index_to_host[i] is the index into self.environment.hosts used for port i
if self.firewall_driver == "iptables_hybrid":
# The iptables_hybrid driver lacks isolation between agents
index_to_host = [0] * 3
else:
index_to_host = [0, 1, 1]
tenant_uuid = uuidutils.generate_uuid()
network = self.safe_client.create_network(tenant_uuid)
self.safe_client.create_subnet(tenant_uuid, network["id"], "20.0.0.0/24")
sgs = [self.safe_client.create_security_group(tenant_uuid) for i in range(2)]
# Each port is created on its host with its single security group
ports = [
self.safe_client.create_port(
tenant_uuid, network["id"], self.environment.hosts[host].hostname, security_groups=[sgs[sg]["id"]]
)
for host, sg in zip(index_to_host, index_to_sg)
]
# Ingress TCP 3333 is allowed into sgs[0] from members of sgs[0]
self.safe_client.create_security_group_rule(
tenant_uuid,
sgs[0]["id"],
remote_group_id=sgs[0]["id"],
direction="ingress",
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3333,
port_range_max=3333,
)
# One fake machine per port, placed on the host chosen for that port
vms = [
self.useFixture(
machine.FakeFullstackMachine(
self.environment.hosts[host], network["id"], tenant_uuid, self.safe_client, neutron_port=ports[port]
)
)
for port, host in enumerate(index_to_host)
]
for vm in vms:
vm.block_until_boot()
# 1. check if connection from allowed security group is allowed
self.assert_connection(vms[1].namespace, vms[0].namespace, vms[0].ip, 3333, net_helpers.NetcatTester.TCP)
# 2. check if connection from elsewhere is blocked
self.assert_no_connection(vms[2].namespace, vms[0].namespace, vms[0].ip, 3333, net_helpers.NetcatTester.TCP)
# 3. check if traffic not explicitly allowed (eg. ICMP) is blocked
net_helpers.assert_no_ping(vms[0].namespace, vms[1].ip)
net_helpers.assert_no_ping(vms[0].namespace, vms[2].ip)
net_helpers.assert_no_ping(vms[1].namespace, vms[2].ip)
# 4. check if a security group update takes effect
# Port 3344 has no rule yet, so the connection must fail until one is added
self.assert_no_connection(vms[1].namespace, vms[0].namespace, vms[0].ip, 3344, net_helpers.NetcatTester.TCP)
self.safe_client.create_security_group_rule(
tenant_uuid,
sgs[0]["id"],
remote_group_id=sgs[0]["id"],
direction="ingress",
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3344,
port_range_max=3344,
)
self.assert_connection(vms[1].namespace, vms[0].namespace, vms[0].ip, 3344, net_helpers.NetcatTester.TCP)
# 5. check if a remote security group member addition works
# Ingress TCP 3355 is allowed into sgs[0] from members of sgs[1]
rule2 = self.safe_client.create_security_group_rule(
tenant_uuid,
sgs[0]["id"],
remote_group_id=sgs[1]["id"],
direction="ingress",
ethertype=constants.IPv4,
protocol=constants.PROTO_NAME_TCP,
port_range_min=3355,
port_range_max=3355,
)
self.assert_connection(vms[2].namespace, vms[0].namespace, vms[0].ip, 3355, net_helpers.NetcatTester.TCP)
# Describe a fourth port: same host index as the third, security group 1
index_to_host.append(index_to_host[2])
index_to_sg.append(1)
ports.append(
self.safe_client.create_port(
tenant_uuid,
network["id"],
self.environment.hosts[index_to_host[3]].hostname,
#.........part of the code is omitted here.........
开发者ID:openstack,项目名称:neutron,代码行数:101,代码来源:test_securitygroup.py
示例10: test_tcp_securitygroup
def test_tcp_securitygroup(self):
    """Exercise a TCP security group rule between three fake machines.

    Confirms that:
    1. connection from allowed security group is allowed,
    2. connection from elsewhere is blocked,
    3. traffic not explicitly allowed (eg. ICMP) is blocked, and
    4. a security group update takes effect.
    """
    tcp = net_helpers.NetcatTester.TCP
    # Security group index and host index for each of the three ports.
    sg_indexes = [0, 0, 1]
    # The iptables_hybrid driver lacks isolation between agents
    if self.firewall_driver == 'iptables_hybrid':
        host_indexes = [0, 0, 0]
    else:
        host_indexes = [0, 1, 1]

    tenant_uuid = uuidutils.generate_uuid()
    network = self.safe_client.create_network(tenant_uuid)
    network_id = network['id']
    self.safe_client.create_subnet(tenant_uuid, network_id, '20.0.0.0/24')

    sgs = []
    for _ in range(2):
        sgs.append(self.safe_client.create_security_group(tenant_uuid))

    ports = []
    for host_index, sg_index in zip(host_indexes, sg_indexes):
        ports.append(
            self.safe_client.create_port(
                tenant_uuid, network_id,
                self.environment.hosts[host_index].hostname,
                security_groups=[sgs[sg_index]['id']]))

    def allow_tcp_within_first_group(tcp_port):
        # Ingress TCP on tcp_port into sgs[0] from members of sgs[0].
        self.safe_client.create_security_group_rule(
            tenant_uuid, sgs[0]['id'],
            remote_group_id=sgs[0]['id'], direction='ingress',
            ethertype=constants.IPv4,
            protocol=constants.PROTO_NAME_TCP,
            port_range_min=tcp_port, port_range_max=tcp_port)

    allow_tcp_within_first_group(3333)

    vms = []
    for neutron_port, host_index in zip(ports, host_indexes):
        fake_machine = machine.FakeFullstackMachine(
            self.environment.hosts[host_index],
            network_id,
            tenant_uuid,
            self.safe_client,
            neutron_port=neutron_port)
        vms.append(self.useFixture(fake_machine))

    for vm in vms:
        vm.block_until_boot()

    # 1. check if connection from allowed security group is allowed
    self.assert_connection(
        vms[1].namespace, vms[0].namespace, vms[0].ip, 3333, tcp)

    # 2. check if connection from elsewhere is blocked
    self.assert_no_connection(
        vms[2].namespace, vms[0].namespace, vms[0].ip, 3333, tcp)

    # 3. check if traffic not explicitly allowed (eg. ICMP) is blocked
    for source, target in ((vms[0], vms[1]),
                           (vms[0], vms[2]),
                           (vms[1], vms[2])):
        net_helpers.assert_no_ping(source.namespace, target.ip)

    # 4. check if a security group update takes effect
    self.assert_no_connection(
        vms[1].namespace, vms[0].namespace, vms[0].ip, 3344, tcp)
    allow_tcp_within_first_group(3344)
    self.assert_connection(
        vms[1].namespace, vms[0].namespace, vms[0].ip, 3344, tcp)
开发者ID:sebrandon1,项目名称:neutron,代码行数:78,代码来源:test_securitygroup.py
注:本文中的neutron.tests.common.net_helpers.assert_no_ping函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论