本文整理汇总了Python中neutron.common.utils.wait_until_true函数的典型用法代码示例。如果您正苦于以下问题:Python wait_until_true函数的具体用法?Python wait_until_true怎么用?Python wait_until_true使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wait_until_true函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_both_ha_router_lost_gw_connection
def test_both_ha_router_lost_gw_connection(self):
self.agent.conf.set_override(
'ha_vrrp_health_check_interval', 5)
self.failover_agent.conf.set_override(
'ha_vrrp_health_check_interval', 5)
router1, router2 = self.create_ha_routers()
master_router, slave_router = self._get_master_and_slave_routers(
router1, router2)
self.fail_gw_router_port(master_router)
self.fail_gw_router_port(slave_router)
common_utils.wait_until_true(
lambda: master_router.ha_state == 'master')
common_utils.wait_until_true(
lambda: slave_router.ha_state == 'master')
self.restore_gw_router_port(master_router)
new_master, new_slave = self._get_master_and_slave_routers(
master_router, slave_router)
self.assertEqual(master_router, new_master)
self.assertEqual(slave_router, new_slave)
开发者ID:cubeek,项目名称:neutron,代码行数:26,代码来源:test_ha_router.py
示例2: test_mtu_update
def test_mtu_update(self):
tenant_id = uuidutils.generate_uuid()
router = self.safe_client.create_router(tenant_id)
network = self.safe_client.create_network(tenant_id)
subnet = self.safe_client.create_subnet(
tenant_id, network['id'], '20.0.0.0/24', gateway_ip='20.0.0.1')
self.safe_client.add_router_interface(router['id'], subnet['id'])
namespace = "%[email protected]%s" % (
self._get_namespace(router['id']),
self.environment.hosts[0].l3_agent.get_namespace_suffix(), )
self._assert_namespace_exists(namespace)
ip = ip_lib.IPWrapper(namespace)
common_utils.wait_until_true(lambda: ip.get_devices())
devices = ip.get_devices()
self.assertEqual(1, len(devices))
ri_dev = devices[0]
mtu = ri_dev.link.mtu
self.assertEqual(1500, mtu)
mtu -= 1
network = self.safe_client.update_network(network['id'], mtu=mtu)
common_utils.wait_until_true(lambda: ri_dev.link.mtu == mtu)
开发者ID:cubeek,项目名称:neutron,代码行数:27,代码来源:test_l3_agent.py
示例3: _test_controller_timeout_does_not_break_connectivity
def _test_controller_timeout_does_not_break_connectivity(self,
kill_signal=None):
# Environment preparation is effectively the same as connectivity test
vms = self._prepare_vms_in_single_network()
vms.ping_all()
ns0 = vms[0].namespace
ip1 = vms[1].ip
LOG.debug("Stopping agents (hence also OVS bridge controllers)")
for host in self.environment.hosts:
if kill_signal is not None:
host.l2_agent.stop(kill_signal=kill_signal)
else:
host.l2_agent.stop()
# Ping to make sure that 3 x 5 seconds is overcame even under a high
# load. The time was chosen to match three times inactivity_probe time,
# which is the time after which the OVS vswitchd
# treats the controller as dead and starts managing the bridge
# by itself when the fail type settings is not set to secure (see
# ovs-vsctl man page for further details)
with net_helpers.async_ping(ns0, [ip1], timeout=2, count=25) as done:
common_utils.wait_until_true(
done,
exception=RuntimeError("Networking interrupted after "
"controllers have vanished"))
开发者ID:cubeek,项目名称:neutron,代码行数:27,代码来源:test_connectivity.py
示例4: _assert_dvr_external_device
def _assert_dvr_external_device(self, router):
external_port = router.get_ex_gw_port()
snat_ns_name = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
router.router_id)
# if the agent is in dvr_snat mode, then we have to check
# that the correct ports and ip addresses exist in the
# snat_ns_name namespace
if self.agent.conf.agent_mode == 'dvr_snat':
device_exists = functools.partial(
self.device_exists_with_ips_and_mac,
external_port,
router.get_external_device_name,
snat_ns_name)
utils.wait_until_true(device_exists)
# if the agent is in dvr mode then the snat_ns_name namespace
# should not be present at all:
elif self.agent.conf.agent_mode == 'dvr':
self.assertFalse(
self._namespace_exists(snat_ns_name),
"namespace %s was found but agent is in dvr mode not dvr_snat"
% (str(snat_ns_name))
)
# if the agent is anything else the test is misconfigured
# we force a test failure with message
else:
self.assertTrue(False, " agent not configured for dvr or dvr_snat")
开发者ID:suda999,项目名称:neutron,代码行数:27,代码来源:test_dvr_router.py
示例5: _assert_ping_during_agents_restart
def _assert_ping_during_agents_restart(
self, agents, src_namespace, ips, restart_timeout=10,
ping_timeout=1, count=10):
with net_helpers.async_ping(
src_namespace, ips, timeout=ping_timeout,
count=count) as done:
LOG.debug("Restarting agents")
executor = futures.ThreadPoolExecutor(max_workers=len(agents))
restarts = [agent.restart(executor=executor)
for agent in agents]
futures.wait(restarts, timeout=restart_timeout)
self.assertTrue(all([r.done() for r in restarts]))
LOG.debug("Restarting agents - done")
# It is necessary to give agents time to initialize
# because some crucial steps (e.g. setting up bridge flows)
# happen only after RPC is established
agent_names = ', '.join({agent.process_fixture.process_name
for agent in agents})
common_utils.wait_until_true(
done,
timeout=count * (ping_timeout + 1),
exception=RuntimeError("Could not ping the other VM, "
"re-starting %s leads to network "
"disruption" % agent_names))
开发者ID:openstack,项目名称:neutron,代码行数:27,代码来源:base.py
示例6: _assert_router_does_not_exist
def _assert_router_does_not_exist(self, router):
# If the namespace assertion succeeds
# then the devices and iptable rules have also been deleted,
# so there's no need to check that explicitly.
self.assertFalse(self._namespace_exists(router.ns_name))
common_utils.wait_until_true(
lambda: not self._metadata_proxy_exists(self.agent.conf, router))
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:framework.py
示例7: _test_metadata_proxy_spawn_kill_with_subnet_create_delete
def _test_metadata_proxy_spawn_kill_with_subnet_create_delete(self):
network = self.network_dict_for_dhcp(ip_version=6)
self.configure_dhcp_for_network(network=network)
pm = self._get_metadata_proxy_process(network)
# A newly created network with ipv6 subnet will not have metadata proxy
self.assertFalse(pm.active)
new_network = copy.deepcopy(network)
dhcp_enabled_ipv4_subnet = self.create_subnet_dict(network.id)
new_network.subnets.append(dhcp_enabled_ipv4_subnet)
self.mock_plugin_api.get_network_info.return_value = new_network
self.agent.refresh_dhcp_helper(network.id)
# Metadata proxy should be spawned for the newly added subnet
common_utils.wait_until_true(
lambda: pm.active,
timeout=5,
sleep=0.1,
exception=RuntimeError("Metadata proxy didn't spawn"))
self.mock_plugin_api.get_network_info.return_value = network
self.agent.refresh_dhcp_helper(network.id)
# Metadata proxy should be killed because network doesn't need it.
common_utils.wait_until_true(
lambda: not pm.active,
timeout=5,
sleep=0.1,
exception=RuntimeError("Metadata proxy didn't get killed"))
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:28,代码来源:test_dhcp_agent.py
示例8: test_has_updates
def test_has_updates(self):
utils.wait_until_true(lambda: self.monitor.has_updates)
# clear the event list
self.monitor.get_events()
self.useFixture(net_helpers.OVSPortFixture())
# has_updates after port addition should become True
utils.wait_until_true(lambda: self.monitor.has_updates is True)
开发者ID:cubeek,项目名称:neutron,代码行数:7,代码来源:test_ovsdb_monitor.py
示例9: block_until_ping
def block_until_ping(self, dst_ip):
predicate = functools.partial(self.ping_predicate, dst_ip)
utils.wait_until_true(
predicate,
exception=FakeMachineException(
"No ICMP reply obtained from IP address %s" % dst_ip)
)
开发者ID:coreycb,项目名称:neutron,代码行数:7,代码来源:machine_fixtures.py
示例10: test_cleanup_network_namespaces_cleans_dhcp_and_l3_namespaces
def test_cleanup_network_namespaces_cleans_dhcp_and_l3_namespaces(self):
dhcp_namespace = self.useFixture(
net_helpers.NamespaceFixture(dhcp.NS_PREFIX)).name
l3_namespace = self.useFixture(
net_helpers.NamespaceFixture(namespaces.NS_PREFIX)).name
bridge = self.useFixture(
net_helpers.VethPortFixture(namespace=dhcp_namespace)).bridge
self.useFixture(
net_helpers.VethPortFixture(bridge, l3_namespace))
# we scope the get_namespaces to our own ones not to affect other
# tests, as otherwise cleanup will kill them all
self.get_namespaces.return_value = [l3_namespace, dhcp_namespace]
# launch processes in each namespace to make sure they're
# killed during cleanup
procs_launched = self._launch_processes([l3_namespace, dhcp_namespace])
self.assertIsNot(procs_launched, 0)
common_utils.wait_until_true(
lambda: self._get_num_spawned_procs() == procs_launched,
timeout=15,
exception=Exception("Didn't spawn expected number of processes"))
netns_cleanup.cleanup_network_namespaces(self.conf)
self.get_namespaces_p.stop()
namespaces_now = ip_lib.list_network_namespaces()
procs_after = self._get_num_spawned_procs()
self.assertEqual(procs_after, 0)
self.assertNotIn(l3_namespace, namespaces_now)
self.assertNotIn(dhcp_namespace, namespaces_now)
开发者ID:igordcard,项目名称:neutron,代码行数:31,代码来源:test_netns_cleanup.py
示例11: test_mtu_update
def test_mtu_update(self):
# The test case needs access to devices in nested namespaces. ip_lib
# doesn't support it, and it's probably unsafe to touch the library for
# testing matters.
# TODO(jlibosva) revisit when ip_lib supports nested namespaces
if self.environment.hosts[0].dhcp_agent.namespace is not None:
self.skipTest("ip_lib doesn't support nested namespaces")
self.vm.block_until_dhcp_config_done()
namespace = dhcp_agent._get_namespace_name(
self.network['id'],
suffix=self.environment.hosts[0].dhcp_agent.get_namespace_suffix())
ip = ip_lib.IPWrapper(namespace)
devices = ip.get_devices()
self.assertEqual(1, len(devices))
dhcp_dev = devices[0]
mtu = dhcp_dev.link.mtu
self.assertEqual(1450, mtu)
mtu -= 1
self.safe_client.update_network(self.network['id'], mtu=mtu)
common_utils.wait_until_true(lambda: dhcp_dev.link.mtu == mtu)
开发者ID:igordcard,项目名称:neutron,代码行数:25,代码来源:test_dhcp_agent.py
示例12: test_l2_agent_restart
def test_l2_agent_restart(self, agent_restart_timeout=20):
# Environment preparation is effectively the same as connectivity test
vms = self._prepare_vms_in_single_network()
vms.ping_all()
ns0 = vms[0].namespace
ip1 = vms[1].ip
agents = [host.l2_agent for host in self.environment.hosts]
# Restart agents on all nodes simultaneously while pinging across
# the hosts. The ping has to cross int and phys bridges and travels
# via central bridge as the vms are on separate hosts.
with net_helpers.async_ping(ns0, [ip1], timeout=2,
count=agent_restart_timeout) as done:
LOG.debug("Restarting agents")
executor = futures.ThreadPoolExecutor(max_workers=len(agents))
restarts = [agent.restart(executor=executor)
for agent in agents]
futures.wait(restarts, timeout=agent_restart_timeout)
self.assertTrue(all([r.done() for r in restarts]))
LOG.debug("Restarting agents - done")
# It is necessary to give agents time to initialize
# because some crucial steps (e.g. setting up bridge flows)
# happen only after RPC is established
common_utils.wait_until_true(
done,
exception=RuntimeError("Could not ping the other VM, L2 agent "
"restart leads to network disruption"))
开发者ID:eayunstack,项目名称:neutron,代码行数:31,代码来源:test_connectivity.py
示例13: _test_restart_service_on_sighup
def _test_restart_service_on_sighup(self, service, workers=1):
"""Test that a service correctly (re)starts on receiving SIGHUP.
1. Start a service with a given number of workers.
2. Send SIGHUP to the service.
3. Wait for workers (if any) to (re)start.
"""
self._start_server(callback=service, workers=workers)
os.kill(self.service_pid, signal.SIGHUP)
expected_msg = FAKE_START_MSG * workers * 2
# Wait for temp file to be created and its size reaching the expected
# value
expected_size = len(expected_msg)
condition = lambda: (os.path.isfile(self.temp_file)
and os.stat(self.temp_file).st_size ==
expected_size)
utils.wait_until_true(
condition, timeout=5, sleep=0.1,
exception=RuntimeError(
"Timed out waiting for file %(filename)s to be created and "
"its size become equal to %(size)s." %
{'filename': self.temp_file,
'size': expected_size}))
# Verify that start has been called twice for each worker (one for
# initial start, and the second one on SIGHUP after children were
# terminated).
with open(self.temp_file, 'r') as f:
res = f.readline()
self.assertEqual(expected_msg, res)
开发者ID:21atlas,项目名称:neutron,代码行数:34,代码来源:test_server.py
示例14: test_new_fip_sends_garp
def test_new_fip_sends_garp(self):
next_ip_cidr = net_helpers.increment_ip_cidr(self.machines.ip_cidr, 2)
expected_ip = str(netaddr.IPNetwork(next_ip_cidr).ip)
# Create incomplete ARP entry
self.peer.assert_no_ping(expected_ip)
has_entry = has_expected_arp_entry(
self.peer.port.name,
self.peer.namespace,
expected_ip,
self.router.port.link.address)
self.assertFalse(has_entry)
self.router.port.addr.add(next_ip_cidr)
has_arp_entry_predicate = functools.partial(
has_expected_arp_entry,
self.peer.port.name,
self.peer.namespace,
expected_ip,
self.router.port.link.address,
)
exc = RuntimeError(
"No ARP entry in %s namespace containing IP address %s and MAC "
"address %s" % (
self.peer.namespace,
expected_ip,
self.router.port.link.address))
utils.wait_until_true(
has_arp_entry_predicate,
exception=exc)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:28,代码来源:test_keepalived_state_change.py
示例15: _wait_until_ipv6_forwarding_has_state
def _wait_until_ipv6_forwarding_has_state(self, ns_name, dev_name, state):
def _ipv6_forwarding_has_state():
return ip_lib.get_ipv6_forwarding(
device=dev_name, namespace=ns_name) == state
common_utils.wait_until_true(_ipv6_forwarding_has_state)
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:framework.py
示例16: test_ha_router_restart_agents_no_packet_lost
def test_ha_router_restart_agents_no_packet_lost(self):
tenant_id = uuidutils.generate_uuid()
ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id)
router = self.safe_client.create_router(tenant_id, ha=True,
external_network=ext_net['id'])
external_vm = self.useFixture(
machine_fixtures.FakeMachine(
self.environment.central_external_bridge,
common_utils.ip_to_cidr(ext_sub['gateway_ip'], 24)))
common_utils.wait_until_true(
lambda:
len(self.client.list_l3_agent_hosting_routers(
router['id'])['agents']) == 2,
timeout=90)
common_utils.wait_until_true(
functools.partial(
self._is_ha_router_active_on_one_agent,
router['id']),
timeout=90)
router_ip = router['external_gateway_info'][
'external_fixed_ips'][0]['ip_address']
l3_agents = [host.agents['l3'] for host in self.environment.hosts]
self._assert_ping_during_agents_restart(
l3_agents, external_vm.namespace, [router_ip], count=60)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:29,代码来源:test_l3_agent.py
示例17: block_until_no_ping
def block_until_no_ping(self, dst_ip):
predicate = functools.partial(
lambda ip: not self.ping_predicate(ip), dst_ip)
utils.wait_until_true(
predicate,
exception=FakeMachineException(
"ICMP packets still pass to %s IP address." % dst_ip)
)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:8,代码来源:machine_fixtures.py
示例18: _wait_for_min_bw_rule_applied
def _wait_for_min_bw_rule_applied(self, vm, min_bw, direction):
if direction == constants.EGRESS_DIRECTION:
utils.wait_until_true(
lambda: vm.bridge.get_egress_min_bw_for_port(
vm.neutron_port['id']) == min_bw)
elif direction == constants.INGRESS_DIRECTION:
self.fail('"%s" direction not implemented'
% constants.INGRESS_DIRECTION)
开发者ID:openstack,项目名称:neutron,代码行数:8,代码来源:test_qos.py
示例19: block_until_dhcp_config_done
def block_until_dhcp_config_done(self):
utils.wait_until_true(
lambda: self.ip_configured() and self.gateway_configured(),
exception=machine_fixtures.FakeMachineException(
"Address %s or gateway %s not configured properly on "
"port %s" % (self.ip_cidr, self.gateway_ip, self.port.name)
)
)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:8,代码来源:machine.py
示例20: destroy_state_change_monitor
def destroy_state_change_monitor(self, process_monitor):
pm = self._get_state_change_monitor_process_manager()
process_monitor.unregister(self.router_id, IP_MONITOR_PROCESS_SERVICE)
pm.disable(sig=str(int(signal.SIGTERM)))
try:
common_utils.wait_until_true(lambda: not pm.active, timeout=SIGTERM_TIMEOUT)
except common_utils.WaitTimeout:
pm.disable(sig=str(int(signal.SIGKILL)))
开发者ID:noironetworks,项目名称:neutron,代码行数:8,代码来源:ha_router.py
注:本文中的neutron.common.utils.wait_until_true函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论