• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.wait_until_true函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.agent.linux.utils.wait_until_true函数的典型用法代码示例。如果您正苦于以下问题:Python wait_until_true函数的具体用法?Python wait_until_true怎么用?Python wait_until_true使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了wait_until_true函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _wait_for_death

 def _wait_for_death(self):
     is_dead = lambda: self.proc.poll() is not None
     utils.wait_until_true(
         is_dead,
         timeout=self.TIMEOUT,
         exception=RuntimeError("Ping command hasn't ended after %d seconds." % self.TIMEOUT),
     )
开发者ID:bigswitch,项目名称:neutron,代码行数:7,代码来源:net_helpers.py


示例2: wait_until_none

def wait_until_none(predicate, timeout=5, sleep=1, exception=None):
    def internal_predicate():
        ret = predicate()
        if ret:
            return False
        return True
    wait_until_true(internal_predicate, timeout, sleep, exception)
开发者ID:HarborOS,项目名称:dragonflow,代码行数:7,代码来源:utils.py


示例3: test_dhcp_app_dos_block

    def test_dhcp_app_dos_block(self):
        def internal_predicate():
            ovs = test_utils.OvsFlowsParser()
            return (self._check_dhcp_block_rule(ovs.dump()))

        dhcp_packet = self._create_dhcp_discover()
        send_dhcp_offer = app_testing_objects.SendAction(
            self.subnet1.subnet_id,
            self.port1.port_id,
            str(dhcp_packet)
        )

        port_policies = self._create_port_policies(disable_rule=False)
        policy = self.store(
            app_testing_objects.Policy(
                initial_actions=[send_dhcp_offer,
                                send_dhcp_offer,
                                send_dhcp_offer,
                                send_dhcp_offer],
                port_policies=port_policies,
                unknown_port_action=app_testing_objects.IgnoreAction()
            )
        )

        policy.start(self.topology)
        wait_until_true(internal_predicate, 30, 1, None)
开发者ID:paperandsoap,项目名称:dragonflow,代码行数:26,代码来源:test_apps.py


示例4: test_has_updates

 def test_has_updates(self):
     utils.wait_until_true(lambda: self.monitor.has_updates)
     # clear the event list
     self.monitor.get_events()
     self.useFixture(net_helpers.OVSPortFixture())
     # has_updates after port addition should become True
     utils.wait_until_true(lambda: self.monitor.has_updates is True)
开发者ID:MODITDC,项目名称:neutron,代码行数:7,代码来源:test_ovsdb_monitor.py


示例5: setup_ha_routers

    def setup_ha_routers(self, router_public_ip, private_net_cidr):
        """Setup HA master router on agent1 and backup router on agent2"""
        router_info = self._generate_info(router_public_ip,
            private_net_cidr, enable_ha=True)
        get_ns_name = mock.patch.object(
            namespaces.RouterNamespace, '_get_ns_name').start()
        get_ns_name.return_value = "qrouter-{0}-{1}".format(
            router_info['id'], self.vpn_agent.host)

        router1 = self.manage_router(self.vpn_agent, router_info)

        router_info_2 = copy.deepcopy(router_info)
        router_info_2[l3_constants.HA_INTERFACE_KEY] = (
            l3_test_common.get_ha_interface(ip='169.254.192.2',
                                            mac='22:22:22:22:22:22'))
        get_ns_name.return_value = "qrouter-{0}-{1}".format(
            router_info['id'], self.failover_agent.host)
        router2 = self.manage_router(self.failover_agent, router_info_2)

        linux_utils.wait_until_true(lambda: router1.ha_state == 'master')
        linux_utils.wait_until_true(lambda: router2.ha_state == 'backup')

        port_namespace, port_ip = self.port_setup(router1)

        vpn_service = self.prepare_vpn_service_info(
            router1.router_id, router_public_ip, private_net_cidr)
        return {"router1": router1, "router2": router2,
                "port_namespace": port_namespace, "port_ip": port_ip,
                "vpn_service": vpn_service}
开发者ID:armando-migliaccio,项目名称:neutron-vpnaas,代码行数:29,代码来源:test_scenario.py


示例6: _test_restart_service_on_sighup

    def _test_restart_service_on_sighup(self, service, workers=1):
        """Test that a service correctly (re)starts on receiving SIGHUP.

        1. Start a service with a given number of workers.
        2. Send SIGHUP to the service.
        3. Wait for workers (if any) to (re)start.
        """

        self._start_server(callback=service, workers=workers)
        os.kill(self.service_pid, signal.SIGHUP)

        expected_msg = FAKE_START_MSG * workers * 2

        # Wait for temp file to be created and its size reaching the expected
        # value
        expected_size = len(expected_msg)
        condition = lambda: (os.path.isfile(self.temp_file)
                             and os.stat(self.temp_file).st_size ==
                             expected_size)

        utils.wait_until_true(
            condition, timeout=5, sleep=0.1,
            exception=RuntimeError(
                "Timed out waiting for file %(filename)s to be created and "
                "its size become equal to %(size)s." %
                {'filename': self.temp_file,
                 'size': expected_size}))

        # Verify that start has been called twice for each worker (one for
        # initial start, and the second one on SIGHUP after children were
        # terminated).
        with open(self.temp_file, 'r') as f:
            res = f.readline()
            self.assertEqual(expected_msg, res)
开发者ID:HoratiusTang,项目名称:neutron,代码行数:34,代码来源:test_server.py


示例7: _assert_router_does_not_exist

 def _assert_router_does_not_exist(self, router):
     # If the namespace assertion succeeds
     # then the devices and iptable rules have also been deleted,
     # so there's no need to check that explicitly.
     self.assertFalse(self._namespace_exists(router.ns_name))
     utils.wait_until_true(
         lambda: not self._metadata_proxy_exists(self.agent.conf, router))
开发者ID:Akanksha08,项目名称:neutron,代码行数:7,代码来源:test_l3_agent.py


示例8: setUp

    def setUp(self):
        super(NeutronServerFixture, self).setUp()

        self.neutron_cfg_fixture = config_fixtures.NeutronConfigFixture(
            self.temp_dir, cfg.CONF.database.connection,
            self.rabbitmq_environment)
        self.plugin_cfg_fixture = config_fixtures.ML2ConfigFixture(
            self.temp_dir)

        self.useFixture(self.neutron_cfg_fixture)
        self.useFixture(self.plugin_cfg_fixture)

        self.neutron_config = self.neutron_cfg_fixture.config
        self.plugin_config = self.plugin_cfg_fixture.config

        config_filenames = [self.neutron_cfg_fixture.filename,
                            self.plugin_cfg_fixture.filename]

        self.process_fixture = self.useFixture(ProcessFixture(
            test_name=self.test_name,
            process_name=self.NEUTRON_SERVER,
            exec_name=self.NEUTRON_SERVER,
            config_filenames=config_filenames))

        utils.wait_until_true(self.server_is_live)
开发者ID:bgxavier,项目名称:neutron,代码行数:25,代码来源:fullstack_fixtures.py


示例9: create_site

    def create_site(self, public_net, private_nets, l3ha=False):
        """Build router(s), namespaces, and ports for a site.

        For HA, we'll create a backup router and wait for both routers
        to be ready, so that we can test pings after failover.
        """
        if l3ha:
            site = SiteInfoWithHaRouter(public_net, private_nets,
                                        self.vpn_agent.host,
                                        self.failover_agent.host)
        else:
            site = SiteInfo(public_net, private_nets)

        site.router = self.create_router(self.vpn_agent, site.info)
        if l3ha:
            backup_info = site.generate_backup_router_info()
            site.backup_router = self.create_router(self.failover_agent,
                                                    backup_info)
            linux_utils.wait_until_true(
                lambda: site.router.ha_state == 'master')
            linux_utils.wait_until_true(
                lambda: site.backup_router.ha_state == 'backup')

        self.create_ports_for(site)
        return site
开发者ID:leftyLin,项目名称:neutron-vpnaas,代码行数:25,代码来源:test_scenario.py


示例10: test_create_update_subnet_with_dhcp

 def test_create_update_subnet_with_dhcp(self):
     ovs = utils.OvsFlowsParser()
     flows_before_change = ovs.dump(self.integration_bridge)
     network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
     network_id = network.create()
     subnet = {'network_id': network_id,
         'cidr': '10.10.254.0/24',
         'gateway_ip': '10.10.254.1',
         'ip_version': 4,
         'name': 'subnet-test',
         'enable_dhcp': True}
     subnet = self.neutron.create_subnet({'subnet': subnet})
     subnet_id = subnet['subnet']['id']
     dhcp_ip = utils.wait_until_is_and_return(
         lambda: self.get_dhcp_ip(network_id, subnet_id),
         exception=Exception('DHCP IP was not generated')
     )
     self.assertFalse(self.check_dhcp_rule(flows_before_change, dhcp_ip))
     wait_until_true(
         lambda: self.check_dhcp_rule(ovs.dump(self.integration_bridge),
                                      dhcp_ip),
         exception=Exception('DHCP ip was not found in OpenFlow rules'),
         timeout=5
     )
     # change dhcp
     updated_subnet = {'enable_dhcp': False}
     self.neutron.update_subnet(subnet_id, {'subnet': updated_subnet})
     time.sleep(utils.DEFAULT_CMD_TIMEOUT)
     flows_after_update = ovs.dump(self.integration_bridge)
     self.assertFalse(self.check_dhcp_rule(flows_after_update, dhcp_ip))
     network.close()
开发者ID:HarborOS,项目名称:dragonflow,代码行数:31,代码来源:test_dhcp_flows.py


示例11: _spawn_keepalived

 def _spawn_keepalived(self, keepalived_manager):
     keepalived_manager.spawn()
     process = keepalived_manager.get_process()
     utils.wait_until_true(
         lambda: process.active, timeout=5, sleep=0.01, exception=RuntimeError(_("Keepalived didn't spawn"))
     )
     return process
开发者ID:bigswitch,项目名称:neutron,代码行数:7,代码来源:test_keepalived.py


示例12: _assert_dvr_external_device

    def _assert_dvr_external_device(self, router):
        external_port = router.get_ex_gw_port()
        snat_ns_name = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
            router.router_id)

        # if the agent is in dvr_snat mode, then we have to check
        # that the correct ports and ip addresses exist in the
        # snat_ns_name namespace
        if self.agent.conf.agent_mode == 'dvr_snat':
            device_exists = functools.partial(
                self.device_exists_with_ips_and_mac,
                external_port,
                router.get_external_device_name,
                snat_ns_name)
            utils.wait_until_true(device_exists)
        # if the agent is in dvr mode then the snat_ns_name namespace
        # should not be present at all:
        elif self.agent.conf.agent_mode == 'dvr':
            self.assertFalse(
                self._namespace_exists(snat_ns_name),
                "namespace %s was found but agent is in dvr mode not dvr_snat"
                % (str(snat_ns_name))
            )
        # if the agent is anything else the test is misconfigured
        # we force a test failure with message
        else:
            self.assertTrue(False, " agent not configured for dvr or dvr_snat")
开发者ID:manjeetbhatia,项目名称:test_l3,代码行数:27,代码来源:test_dvr_router.py


示例13: _read_stream

 def _read_stream(stream, timeout):
     if timeout:
         poller = select.poll()
         poller.register(stream.fileno())
         poll_predicate = functools.partial(poller.poll, 1)
         utils.wait_until_true(poll_predicate, timeout, 0.1, RuntimeError("No output in %.2f seconds" % timeout))
     return stream.readline()
开发者ID:cisco-openstack,项目名称:neutron,代码行数:7,代码来源:net_helpers.py


示例14: wait_until_fip_active

 def wait_until_fip_active(self, timeout=5, sleep=1, exception=None):
     def internal_predicate():
         fip = self.get_floatingip()
         if fip and fip.get_status() == 'ACTIVE':
             return True
         return False
     wait_until_true(internal_predicate, timeout, sleep, exception)
开发者ID:paperandsoap,项目名称:dragonflow,代码行数:7,代码来源:test_objects.py


示例15: test_has_updates

 def test_has_updates(self):
     utils.wait_until_true(lambda: self.monitor.data_received is True)
     self.assertTrue(self.monitor.has_updates, "Initial call should always be true")
     # clear the event list
     self.monitor.get_events()
     self.useFixture(net_helpers.OVSPortFixture())
     # has_updates after port addition should become True
     utils.wait_until_true(lambda: self.monitor.has_updates is True)
开发者ID:neoareslinux,项目名称:neutron,代码行数:8,代码来源:test_ovsdb_monitor.py


示例16: test_ha_router_conf_on_restarted_agent

 def test_ha_router_conf_on_restarted_agent(self):
     router_info = self.generate_router_info(enable_ha=True)
     router1 = self._create_router(self.agent, router_info)
     self._add_fip(router1, '192.168.111.12')
     restarted_agent = l3_test_agent.TestL3NATAgent(self.agent.host,
                                                    self.agent.conf)
     self._create_router(restarted_agent, router1.router)
     utils.wait_until_true(lambda: self._floating_ips_configured(router1))
开发者ID:bradleyjones,项目名称:neutron,代码行数:8,代码来源:test_l3_agent.py


示例17: _wait_for_ipsec_startup

 def _wait_for_ipsec_startup(self, router, driver, conf, should_run=True):
     """Wait for new IPSec process on failover agent to start up."""
     # check for both strongswan and openswan processes
     path = driver.processes[router.router_id].config_dir
     pid_files = ['%s/var/run/charon.pid' % path,
                  '%s/var/run/pluto.pid' % path]
     linux_utils.wait_until_true(
         lambda: should_run == self._ipsec_process_exists(
             conf, router, pid_files))
开发者ID:leftyLin,项目名称:neutron-vpnaas,代码行数:9,代码来源:test_scenario.py


示例18: _ensure_lla_task

 def _ensure_lla_task(self, gw_ifname, ns_name, lla_with_mask):
     # It would be insane for taking so long unless DAD test failed
     # In that case, the subnet would never be assigned a prefix.
     linux_utils.wait_until_true(functools.partial(self._lla_available,
                                                   gw_ifname,
                                                   ns_name,
                                                   lla_with_mask),
                                 timeout=l3_constants.LLA_TASK_TIMEOUT,
                                 sleep=2)
开发者ID:ISCAS-VDI,项目名称:neutron-base,代码行数:9,代码来源:pd.py


示例19: wait_until_is_and_return

def wait_until_is_and_return(predicate, timeout=5, sleep=1, exception=None):
    container = {}

    def internal_predicate():
        container['value'] = predicate()
        return container['value']

    wait_until_true(internal_predicate, timeout, sleep, exception)
    return container.get('value')
开发者ID:HarborOS,项目名称:dragonflow,代码行数:9,代码来源:utils.py


示例20: wait_until_bandwidth_limit_rule_applied

def wait_until_bandwidth_limit_rule_applied(bridge, port_vif, rule):
    def _bandwidth_limit_rule_applied():
        bw_rule = bridge.get_egress_bw_limit_for_port(port_vif)
        expected = None, None
        if rule:
            expected = rule.max_kbps, rule.max_burst_kbps
        return bw_rule == expected

    agent_utils.wait_until_true(_bandwidth_limit_rule_applied)
开发者ID:MODITDC,项目名称:neutron,代码行数:9,代码来源:l2_extensions.py



注:本文中的neutron.agent.linux.utils.wait_until_true函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python idlutils.row_by_record函数代码示例发布时间:2022-05-27
下一篇:
Python utils.replace_file函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap