本文整理汇总了Python中neutron.agent.linux.utils.wait_until_true函数的典型用法代码示例。如果您正苦于以下问题:Python wait_until_true函数的具体用法?Python wait_until_true怎么用?Python wait_until_true使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wait_until_true函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _wait_for_death
def _wait_for_death(self):
    """Wait for the ping process held in ``self.proc`` to finish.

    The wait is delegated to ``utils.wait_until_true`` with
    ``self.TIMEOUT`` as the timeout and a ``RuntimeError`` as the
    ``exception`` argument (presumably raised when the timeout expires).
    """
    def process_has_exited():
        # poll() returning None is treated as "still running".
        return self.proc.poll() is not None

    timeout_error = RuntimeError(
        "Ping command hasn't ended after %d seconds." % self.TIMEOUT)
    utils.wait_until_true(
        process_has_exited,
        timeout=self.TIMEOUT,
        exception=timeout_error,
    )
开发者ID:bigswitch,项目名称:neutron,代码行数:7,代码来源:net_helpers.py
示例2: wait_until_none
def wait_until_none(predicate, timeout=5, sleep=1, exception=None):
    """Wait until ``predicate()`` returns a falsy value.

    Despite the name, any falsy result (not only ``None``) ends the wait.
    ``timeout``, ``sleep`` and ``exception`` are forwarded positionally,
    unchanged, to ``wait_until_true``.
    """
    def predicate_is_falsy():
        return not predicate()

    wait_until_true(predicate_is_falsy, timeout, sleep, exception)
开发者ID:HarborOS,项目名称:dragonflow,代码行数:7,代码来源:utils.py
示例3: test_dhcp_app_dos_block
def test_dhcp_app_dos_block(self):
    """Send the same DHCP packet four times and wait for the block rule."""
    def block_rule_installed():
        # Build a fresh parser on every poll and check its flow dump.
        flows = test_utils.OvsFlowsParser().dump()
        return self._check_dhcp_block_rule(flows)

    discover_packet = self._create_dhcp_discover()
    send_discover = app_testing_objects.SendAction(
        self.subnet1.subnet_id,
        self.port1.port_id,
        str(discover_packet),
    )
    policy = self.store(
        app_testing_objects.Policy(
            # The very same action object is queued four times.
            initial_actions=[send_discover] * 4,
            port_policies=self._create_port_policies(disable_rule=False),
            unknown_port_action=app_testing_objects.IgnoreAction(),
        )
    )
    policy.start(self.topology)
    wait_until_true(block_rule_installed, 30, 1, None)
开发者ID:paperandsoap,项目名称:dragonflow,代码行数:26,代码来源:test_apps.py
示例4: test_has_updates
def test_has_updates(self):
    """Check ``has_updates`` before and after adding an OVS port."""
    def has_updates_is_truthy():
        return self.monitor.has_updates

    def has_updates_is_true():
        return self.monitor.has_updates is True

    utils.wait_until_true(has_updates_is_truthy)
    # clear the event list
    self.monitor.get_events()
    self.useFixture(net_helpers.OVSPortFixture())
    # has_updates after port addition should become True
    utils.wait_until_true(has_updates_is_true)
开发者ID:MODITDC,项目名称:neutron,代码行数:7,代码来源:test_ovsdb_monitor.py
示例5: setup_ha_routers
def setup_ha_routers(self, router_public_ip, private_net_cidr):
    """Set up an HA router on each agent and wait for their HA states.

    The first router is managed by ``self.vpn_agent`` and is awaited in
    the 'master' state; the second is managed by ``self.failover_agent``
    and is awaited in the 'backup' state.

    :returns: dict with keys "router1", "router2", "port_namespace",
        "port_ip" and "vpn_service".
    """
    router_info = self._generate_info(
        router_public_ip, private_net_cidr, enable_ha=True)
    get_ns_name = mock.patch.object(
        namespaces.RouterNamespace, '_get_ns_name').start()

    def ns_name_for(agent):
        # Both routers share the router id; only the agent host differs.
        return "qrouter-{0}-{1}".format(router_info['id'], agent.host)

    get_ns_name.return_value = ns_name_for(self.vpn_agent)
    router1 = self.manage_router(self.vpn_agent, router_info)

    router_info_2 = copy.deepcopy(router_info)
    ha_interface = l3_test_common.get_ha_interface(
        ip='169.254.192.2', mac='22:22:22:22:22:22')
    router_info_2[l3_constants.HA_INTERFACE_KEY] = ha_interface
    get_ns_name.return_value = ns_name_for(self.failover_agent)
    router2 = self.manage_router(self.failover_agent, router_info_2)

    def router1_is_master():
        return router1.ha_state == 'master'

    def router2_is_backup():
        return router2.ha_state == 'backup'

    linux_utils.wait_until_true(router1_is_master)
    linux_utils.wait_until_true(router2_is_backup)

    port_namespace, port_ip = self.port_setup(router1)
    vpn_service = self.prepare_vpn_service_info(
        router1.router_id, router_public_ip, private_net_cidr)
    return {
        "router1": router1,
        "router2": router2,
        "port_namespace": port_namespace,
        "port_ip": port_ip,
        "vpn_service": vpn_service,
    }
开发者ID:armando-migliaccio,项目名称:neutron-vpnaas,代码行数:29,代码来源:test_scenario.py
示例6: _test_restart_service_on_sighup
def _test_restart_service_on_sighup(self, service, workers=1):
    """Test that a service correctly (re)starts on receiving SIGHUP.

    1. Start a service with a given number of workers.
    2. Send SIGHUP to the service.
    3. Wait for workers (if any) to (re)start.
    """
    self._start_server(callback=service, workers=workers)
    os.kill(self.service_pid, signal.SIGHUP)

    # Start is expected twice per worker: once for the initial start and
    # once more on SIGHUP after the children were terminated.
    expected_msg = FAKE_START_MSG * workers * 2
    expected_size = len(expected_msg)

    def temp_file_is_complete():
        # The temp file must exist and have reached the expected size.
        if not os.path.isfile(self.temp_file):
            return False
        return os.stat(self.temp_file).st_size == expected_size

    timeout_error = RuntimeError(
        "Timed out waiting for file %(filename)s to be created and "
        "its size become equal to %(size)s." %
        {'filename': self.temp_file,
         'size': expected_size})
    utils.wait_until_true(
        temp_file_is_complete, timeout=5, sleep=0.1,
        exception=timeout_error)

    # Only the first line of the file is compared with the expected text.
    with open(self.temp_file, 'r') as f:
        res = f.readline()
        self.assertEqual(expected_msg, res)
开发者ID:HoratiusTang,项目名称:neutron,代码行数:34,代码来源:test_server.py
示例7: _assert_router_does_not_exist
def _assert_router_does_not_exist(self, router):
    """Assert the router namespace is gone, then wait for its proxy to go."""
    # If the namespace assertion succeeds
    # then the devices and iptable rules have also been deleted,
    # so there's no need to check that explicitly.
    self.assertFalse(self._namespace_exists(router.ns_name))

    def metadata_proxy_gone():
        return not self._metadata_proxy_exists(self.agent.conf, router)

    utils.wait_until_true(metadata_proxy_gone)
开发者ID:Akanksha08,项目名称:neutron,代码行数:7,代码来源:test_l3_agent.py
示例8: setUp
def setUp(self):
    """Create the config fixtures, start the server process, await it.

    Both config fixtures are built before either is activated; their
    ``config`` objects are exposed on the instance and their ``filename``
    values are handed to the ``ProcessFixture``.
    """
    super(NeutronServerFixture, self).setUp()

    self.neutron_cfg_fixture = config_fixtures.NeutronConfigFixture(
        self.temp_dir, cfg.CONF.database.connection,
        self.rabbitmq_environment)
    self.plugin_cfg_fixture = config_fixtures.ML2ConfigFixture(
        self.temp_dir)

    config_fixture_pair = (self.neutron_cfg_fixture,
                           self.plugin_cfg_fixture)
    for config_fixture in config_fixture_pair:
        self.useFixture(config_fixture)

    self.neutron_config = self.neutron_cfg_fixture.config
    self.plugin_config = self.plugin_cfg_fixture.config

    config_filenames = [config_fixture.filename
                        for config_fixture in config_fixture_pair]
    server_process = ProcessFixture(
        test_name=self.test_name,
        process_name=self.NEUTRON_SERVER,
        exec_name=self.NEUTRON_SERVER,
        config_filenames=config_filenames)
    self.process_fixture = self.useFixture(server_process)

    utils.wait_until_true(self.server_is_live)
开发者ID:bgxavier,项目名称:neutron,代码行数:25,代码来源:fullstack_fixtures.py
示例9: create_site
def create_site(self, public_net, private_nets, l3ha=False):
    """Build router(s), namespaces, and ports for a site.

    For HA, a backup router is created as well and both routers are
    awaited ('master' / 'backup'), so that pings can be tested after
    failover.

    :returns: the populated site object.
    """
    if not l3ha:
        site = SiteInfo(public_net, private_nets)
    else:
        site = SiteInfoWithHaRouter(public_net, private_nets,
                                    self.vpn_agent.host,
                                    self.failover_agent.host)

    site.router = self.create_router(self.vpn_agent, site.info)

    if l3ha:
        backup_info = site.generate_backup_router_info()
        site.backup_router = self.create_router(self.failover_agent,
                                                backup_info)

        def primary_is_master():
            return site.router.ha_state == 'master'

        def backup_is_backup():
            return site.backup_router.ha_state == 'backup'

        linux_utils.wait_until_true(primary_is_master)
        linux_utils.wait_until_true(backup_is_backup)

    self.create_ports_for(site)
    return site
开发者ID:leftyLin,项目名称:neutron-vpnaas,代码行数:25,代码来源:test_scenario.py
示例10: test_create_update_subnet_with_dhcp
def test_create_update_subnet_with_dhcp(self):
    """Check the DHCP rule appears with DHCP on and is absent once off."""
    ovs = utils.OvsFlowsParser()
    flows_before_change = ovs.dump(self.integration_bridge)

    network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
    network_id = network.create()

    subnet_request = {'network_id': network_id,
                      'cidr': '10.10.254.0/24',
                      'gateway_ip': '10.10.254.1',
                      'ip_version': 4,
                      'name': 'subnet-test',
                      'enable_dhcp': True}
    created = self.neutron.create_subnet({'subnet': subnet_request})
    subnet_id = created['subnet']['id']

    def fetch_dhcp_ip():
        return self.get_dhcp_ip(network_id, subnet_id)

    dhcp_ip = utils.wait_until_is_and_return(
        fetch_dhcp_ip,
        exception=Exception('DHCP IP was not generated')
    )
    # The flows captured before the subnet existed must not match.
    self.assertFalse(self.check_dhcp_rule(flows_before_change, dhcp_ip))

    def dhcp_rule_present():
        current_flows = ovs.dump(self.integration_bridge)
        return self.check_dhcp_rule(current_flows, dhcp_ip)

    wait_until_true(
        dhcp_rule_present,
        exception=Exception('DHCP ip was not found in OpenFlow rules'),
        timeout=5
    )

    # change dhcp
    self.neutron.update_subnet(subnet_id,
                               {'subnet': {'enable_dhcp': False}})
    # Fixed pause before re-reading the flows after the update.
    time.sleep(utils.DEFAULT_CMD_TIMEOUT)
    flows_after_update = ovs.dump(self.integration_bridge)
    self.assertFalse(self.check_dhcp_rule(flows_after_update, dhcp_ip))
    network.close()
开发者ID:HarborOS,项目名称:dragonflow,代码行数:31,代码来源:test_dhcp_flows.py
示例11: _spawn_keepalived
def _spawn_keepalived(self, keepalived_manager):
    """Spawn keepalived through the manager and wait for it to be active.

    :returns: the process object obtained from
        ``keepalived_manager.get_process()``.
    """
    keepalived_manager.spawn()
    process = keepalived_manager.get_process()

    def process_is_active():
        return process.active

    spawn_error = RuntimeError(_("Keepalived didn't spawn"))
    utils.wait_until_true(
        process_is_active,
        timeout=5,
        sleep=0.01,
        exception=spawn_error,
    )
    return process
开发者ID:bigswitch,项目名称:neutron,代码行数:7,代码来源:test_keepalived.py
示例12: _assert_dvr_external_device
def _assert_dvr_external_device(self, router):
    """Check the external device according to the agent's DVR mode.

    * ``dvr_snat``: wait until the external device with the expected IPs
      and MAC exists in the router's SNAT namespace.
    * ``dvr``: assert that the SNAT namespace does not exist.
    * anything else: fail the test, since it is misconfigured.

    :param router: router object providing ``get_ex_gw_port()``,
        ``router_id`` and ``get_external_device_name``.
    """
    external_port = router.get_ex_gw_port()
    snat_ns_name = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
        router.router_id)
    agent_mode = self.agent.conf.agent_mode

    if agent_mode == 'dvr_snat':
        # In dvr_snat mode the correct ports and ip addresses must exist
        # in the snat_ns_name namespace. The name getter is passed
        # uncalled, exactly as the checker receives it.
        device_exists = functools.partial(
            self.device_exists_with_ips_and_mac,
            external_port,
            router.get_external_device_name,
            snat_ns_name)
        utils.wait_until_true(device_exists)
    elif agent_mode == 'dvr':
        # In dvr mode the snat_ns_name namespace should not be present
        # at all.
        self.assertFalse(
            self._namespace_exists(snat_ns_name),
            "namespace %s was found but agent is in dvr mode not dvr_snat"
            % str(snat_ns_name)
        )
    else:
        # Any other mode means the test is misconfigured. fail() reports
        # just this message rather than a "False is not true" assertion.
        self.fail(" agent not configured for dvr or dvr_snat")
开发者ID:manjeetbhatia,项目名称:test_l3,代码行数:27,代码来源:test_dvr_router.py
示例13: _read_stream
def _read_stream(stream, timeout):
if timeout:
poller = select.poll()
poller.register(stream.fileno())
poll_predicate = functools.partial(poller.poll, 1)
utils.wait_until_true(poll_predicate, timeout, 0.1, RuntimeError("No output in %.2f seconds" % timeout))
return stream.readline()
开发者ID:cisco-openstack,项目名称:neutron,代码行数:7,代码来源:net_helpers.py
示例14: wait_until_fip_active
def wait_until_fip_active(self, timeout=5, sleep=1, exception=None):
    """Wait until the floating IP exists and reports status 'ACTIVE'.

    ``timeout``, ``sleep`` and ``exception`` are forwarded positionally,
    unchanged, to ``wait_until_true``.
    """
    def fip_is_active():
        fip = self.get_floatingip()
        # A missing (falsy) floating IP counts as "not active yet".
        return bool(fip and fip.get_status() == 'ACTIVE')

    wait_until_true(fip_is_active, timeout, sleep, exception)
开发者ID:paperandsoap,项目名称:dragonflow,代码行数:7,代码来源:test_objects.py
示例15: test_has_updates
def test_has_updates(self):
    """Check ``has_updates`` after initial data and after a port add."""
    def data_was_received():
        return self.monitor.data_received is True

    def has_updates_is_true():
        return self.monitor.has_updates is True

    utils.wait_until_true(data_was_received)
    self.assertTrue(self.monitor.has_updates,
                    "Initial call should always be true")
    # clear the event list
    self.monitor.get_events()
    self.useFixture(net_helpers.OVSPortFixture())
    # has_updates after port addition should become True
    utils.wait_until_true(has_updates_is_true)
开发者ID:neoareslinux,项目名称:neutron,代码行数:8,代码来源:test_ovsdb_monitor.py
示例16: test_ha_router_conf_on_restarted_agent
def test_ha_router_conf_on_restarted_agent(self):
    """Recreate an HA router on a second agent and await its floating IPs.

    The second agent is built with the same host and conf as
    ``self.agent``, standing in for the agent after a restart.
    """
    ha_router_info = self.generate_router_info(enable_ha=True)
    router1 = self._create_router(self.agent, ha_router_info)
    self._add_fip(router1, '192.168.111.12')

    restarted_agent = l3_test_agent.TestL3NATAgent(
        self.agent.host, self.agent.conf)
    self._create_router(restarted_agent, router1.router)

    def floating_ips_configured():
        return self._floating_ips_configured(router1)

    utils.wait_until_true(floating_ips_configured)
开发者ID:bradleyjones,项目名称:neutron,代码行数:8,代码来源:test_l3_agent.py
示例17: _wait_for_ipsec_startup
def _wait_for_ipsec_startup(self, router, driver, conf, should_run=True):
    """Wait for new IPSec process on failover agent to start up.

    The wait ends once the result of ``_ipsec_process_exists`` equals
    ``should_run``, so it can also be used to wait for absence.
    """
    config_dir = driver.processes[router.router_id].config_dir
    # check for both strongswan and openswan processes
    charon_pid_file = '%s/var/run/charon.pid' % config_dir
    pluto_pid_file = '%s/var/run/pluto.pid' % config_dir
    pid_files = [charon_pid_file, pluto_pid_file]

    def process_state_matches():
        return should_run == self._ipsec_process_exists(
            conf, router, pid_files)

    linux_utils.wait_until_true(process_state_matches)
开发者ID:leftyLin,项目名称:neutron-vpnaas,代码行数:9,代码来源:test_scenario.py
示例18: _ensure_lla_task
def _ensure_lla_task(self, gw_ifname, ns_name, lla_with_mask):
    """Wait until ``_lla_available`` succeeds for the given arguments.

    The check is retried with a sleep of 2 and a timeout of
    ``l3_constants.LLA_TASK_TIMEOUT``.
    """
    # It would be insane for taking so long unless DAD test failed
    # In that case, the subnet would never be assigned a prefix.
    lla_is_available = functools.partial(
        self._lla_available, gw_ifname, ns_name, lla_with_mask)
    linux_utils.wait_until_true(
        lla_is_available,
        timeout=l3_constants.LLA_TASK_TIMEOUT,
        sleep=2)
开发者ID:ISCAS-VDI,项目名称:neutron-base,代码行数:9,代码来源:pd.py
示例19: wait_until_is_and_return
def wait_until_is_and_return(predicate, timeout=5, sleep=1, exception=None):
    """Wait via ``wait_until_true`` and return ``predicate``'s last result.

    Every value produced by ``predicate`` is remembered. After
    ``wait_until_true`` returns, the most recent one is handed back, or
    ``None`` if ``predicate`` was never called.
    """
    # One-slot list used as a mutable cell the closure can write to.
    last_result = [None]

    def remember_and_test():
        last_result[0] = predicate()
        return last_result[0]

    wait_until_true(remember_and_test, timeout, sleep, exception)
    return last_result[0]
开发者ID:HarborOS,项目名称:dragonflow,代码行数:9,代码来源:utils.py
示例20: wait_until_bandwidth_limit_rule_applied
def wait_until_bandwidth_limit_rule_applied(bridge, port_vif, rule):
    """Wait until the bridge reports the expected egress limit for a port.

    With a truthy ``rule`` the expected value is
    ``(rule.max_kbps, rule.max_burst_kbps)``; otherwise ``(None, None)``.
    """
    def limit_matches_rule():
        applied = bridge.get_egress_bw_limit_for_port(port_vif)
        if rule:
            wanted = (rule.max_kbps, rule.max_burst_kbps)
        else:
            wanted = (None, None)
        return applied == wanted

    agent_utils.wait_until_true(limit_matches_rule)
开发者ID:MODITDC,项目名称:neutron,代码行数:9,代码来源:l2_extensions.py
注:本文中的neutron.agent.linux.utils.wait_until_true函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论