• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.wait_until_true函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.common.utils.wait_until_true函数的典型用法代码示例。如果您正苦于以下问题:Python wait_until_true函数的具体用法?Python wait_until_true怎么用?Python wait_until_true使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了wait_until_true函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_both_ha_router_lost_gw_connection

    def test_both_ha_router_lost_gw_connection(self):
        self.agent.conf.set_override(
            'ha_vrrp_health_check_interval', 5)
        self.failover_agent.conf.set_override(
            'ha_vrrp_health_check_interval', 5)

        router1, router2 = self.create_ha_routers()

        master_router, slave_router = self._get_master_and_slave_routers(
            router1, router2)

        self.fail_gw_router_port(master_router)
        self.fail_gw_router_port(slave_router)

        common_utils.wait_until_true(
            lambda: master_router.ha_state == 'master')
        common_utils.wait_until_true(
            lambda: slave_router.ha_state == 'master')

        self.restore_gw_router_port(master_router)

        new_master, new_slave = self._get_master_and_slave_routers(
            master_router, slave_router)

        self.assertEqual(master_router, new_master)
        self.assertEqual(slave_router, new_slave)
开发者ID:cubeek,项目名称:neutron,代码行数:26,代码来源:test_ha_router.py


示例2: test_mtu_update

    def test_mtu_update(self):
        tenant_id = uuidutils.generate_uuid()

        router = self.safe_client.create_router(tenant_id)
        network = self.safe_client.create_network(tenant_id)
        subnet = self.safe_client.create_subnet(
            tenant_id, network['id'], '20.0.0.0/24', gateway_ip='20.0.0.1')
        self.safe_client.add_router_interface(router['id'], subnet['id'])

        namespace = "%[email protected]%s" % (
            self._get_namespace(router['id']),
            self.environment.hosts[0].l3_agent.get_namespace_suffix(), )
        self._assert_namespace_exists(namespace)

        ip = ip_lib.IPWrapper(namespace)
        common_utils.wait_until_true(lambda: ip.get_devices())

        devices = ip.get_devices()
        self.assertEqual(1, len(devices))

        ri_dev = devices[0]
        mtu = ri_dev.link.mtu
        self.assertEqual(1500, mtu)

        mtu -= 1
        network = self.safe_client.update_network(network['id'], mtu=mtu)
        common_utils.wait_until_true(lambda: ri_dev.link.mtu == mtu)
开发者ID:cubeek,项目名称:neutron,代码行数:27,代码来源:test_l3_agent.py


示例3: _test_controller_timeout_does_not_break_connectivity

    def _test_controller_timeout_does_not_break_connectivity(self,
                                                             kill_signal=None):
        # Environment preparation is effectively the same as connectivity test
        vms = self._prepare_vms_in_single_network()
        vms.ping_all()

        ns0 = vms[0].namespace
        ip1 = vms[1].ip

        LOG.debug("Stopping agents (hence also OVS bridge controllers)")
        for host in self.environment.hosts:
            if kill_signal is not None:
                host.l2_agent.stop(kill_signal=kill_signal)
            else:
                host.l2_agent.stop()

        # Ping to make sure that 3 x 5 seconds is overcame even under a high
        # load. The time was chosen to match three times inactivity_probe time,
        # which is the time after which the OVS vswitchd
        # treats the controller as dead and starts managing the bridge
        # by itself when the fail type settings is not set to secure (see
        # ovs-vsctl man page for further details)
        with net_helpers.async_ping(ns0, [ip1], timeout=2, count=25) as done:
            common_utils.wait_until_true(
                done,
                exception=RuntimeError("Networking interrupted after "
                                       "controllers have vanished"))
开发者ID:cubeek,项目名称:neutron,代码行数:27,代码来源:test_connectivity.py


示例4: _assert_dvr_external_device

    def _assert_dvr_external_device(self, router):
        external_port = router.get_ex_gw_port()
        snat_ns_name = dvr_snat_ns.SnatNamespace.get_snat_ns_name(
            router.router_id)

        # if the agent is in dvr_snat mode, then we have to check
        # that the correct ports and ip addresses exist in the
        # snat_ns_name namespace
        if self.agent.conf.agent_mode == 'dvr_snat':
            device_exists = functools.partial(
                self.device_exists_with_ips_and_mac,
                external_port,
                router.get_external_device_name,
                snat_ns_name)
            utils.wait_until_true(device_exists)
        # if the agent is in dvr mode then the snat_ns_name namespace
        # should not be present at all:
        elif self.agent.conf.agent_mode == 'dvr':
            self.assertFalse(
                self._namespace_exists(snat_ns_name),
                "namespace %s was found but agent is in dvr mode not dvr_snat"
                % (str(snat_ns_name))
            )
        # if the agent is anything else the test is misconfigured
        # we force a test failure with message
        else:
            self.assertTrue(False, " agent not configured for dvr or dvr_snat")
开发者ID:suda999,项目名称:neutron,代码行数:27,代码来源:test_dvr_router.py


示例5: _assert_ping_during_agents_restart

    def _assert_ping_during_agents_restart(
            self, agents, src_namespace, ips, restart_timeout=10,
            ping_timeout=1, count=10):
        with net_helpers.async_ping(
                src_namespace, ips, timeout=ping_timeout,
                count=count) as done:
            LOG.debug("Restarting agents")
            executor = futures.ThreadPoolExecutor(max_workers=len(agents))
            restarts = [agent.restart(executor=executor)
                        for agent in agents]

            futures.wait(restarts, timeout=restart_timeout)

            self.assertTrue(all([r.done() for r in restarts]))
            LOG.debug("Restarting agents - done")

            # It is necessary to give agents time to initialize
            # because some crucial steps (e.g. setting up bridge flows)
            # happen only after RPC is established
            agent_names = ', '.join({agent.process_fixture.process_name
                                     for agent in agents})
            common_utils.wait_until_true(
                done,
                timeout=count * (ping_timeout + 1),
                exception=RuntimeError("Could not ping the other VM, "
                                       "re-starting %s leads to network "
                                       "disruption" % agent_names))
开发者ID:openstack,项目名称:neutron,代码行数:27,代码来源:base.py


示例6: _assert_router_does_not_exist

 def _assert_router_does_not_exist(self, router):
     # If the namespace assertion succeeds
     # then the devices and iptable rules have also been deleted,
     # so there's no need to check that explicitly.
     self.assertFalse(self._namespace_exists(router.ns_name))
     common_utils.wait_until_true(
         lambda: not self._metadata_proxy_exists(self.agent.conf, router))
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:framework.py


示例7: _test_metadata_proxy_spawn_kill_with_subnet_create_delete

    def _test_metadata_proxy_spawn_kill_with_subnet_create_delete(self):
        network = self.network_dict_for_dhcp(ip_version=6)
        self.configure_dhcp_for_network(network=network)
        pm = self._get_metadata_proxy_process(network)

        # A newly created network with ipv6 subnet will not have metadata proxy
        self.assertFalse(pm.active)

        new_network = copy.deepcopy(network)
        dhcp_enabled_ipv4_subnet = self.create_subnet_dict(network.id)
        new_network.subnets.append(dhcp_enabled_ipv4_subnet)
        self.mock_plugin_api.get_network_info.return_value = new_network
        self.agent.refresh_dhcp_helper(network.id)
        # Metadata proxy should be spawned for the newly added subnet
        common_utils.wait_until_true(
            lambda: pm.active,
            timeout=5,
            sleep=0.1,
            exception=RuntimeError("Metadata proxy didn't spawn"))

        self.mock_plugin_api.get_network_info.return_value = network
        self.agent.refresh_dhcp_helper(network.id)
        # Metadata proxy should be killed because network doesn't need it.
        common_utils.wait_until_true(
            lambda: not pm.active,
            timeout=5,
            sleep=0.1,
            exception=RuntimeError("Metadata proxy didn't get killed"))
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:28,代码来源:test_dhcp_agent.py


示例8: test_has_updates

 def test_has_updates(self):
     utils.wait_until_true(lambda: self.monitor.has_updates)
     # clear the event list
     self.monitor.get_events()
     self.useFixture(net_helpers.OVSPortFixture())
     # has_updates after port addition should become True
     utils.wait_until_true(lambda: self.monitor.has_updates is True)
开发者ID:cubeek,项目名称:neutron,代码行数:7,代码来源:test_ovsdb_monitor.py


示例9: block_until_ping

 def block_until_ping(self, dst_ip):
     predicate = functools.partial(self.ping_predicate, dst_ip)
     utils.wait_until_true(
         predicate,
         exception=FakeMachineException(
             "No ICMP reply obtained from IP address %s" % dst_ip)
     )
开发者ID:coreycb,项目名称:neutron,代码行数:7,代码来源:machine_fixtures.py


示例10: test_cleanup_network_namespaces_cleans_dhcp_and_l3_namespaces

    def test_cleanup_network_namespaces_cleans_dhcp_and_l3_namespaces(self):
        dhcp_namespace = self.useFixture(
            net_helpers.NamespaceFixture(dhcp.NS_PREFIX)).name
        l3_namespace = self.useFixture(
            net_helpers.NamespaceFixture(namespaces.NS_PREFIX)).name
        bridge = self.useFixture(
            net_helpers.VethPortFixture(namespace=dhcp_namespace)).bridge
        self.useFixture(
            net_helpers.VethPortFixture(bridge, l3_namespace))

        # we scope the get_namespaces to our own ones not to affect other
        # tests, as otherwise cleanup will kill them all
        self.get_namespaces.return_value = [l3_namespace, dhcp_namespace]

        # launch processes in each namespace to make sure they're
        # killed during cleanup
        procs_launched = self._launch_processes([l3_namespace, dhcp_namespace])
        self.assertIsNot(procs_launched, 0)
        common_utils.wait_until_true(
            lambda: self._get_num_spawned_procs() == procs_launched,
            timeout=15,
            exception=Exception("Didn't spawn expected number of processes"))

        netns_cleanup.cleanup_network_namespaces(self.conf)

        self.get_namespaces_p.stop()
        namespaces_now = ip_lib.list_network_namespaces()
        procs_after = self._get_num_spawned_procs()
        self.assertEqual(procs_after, 0)
        self.assertNotIn(l3_namespace, namespaces_now)
        self.assertNotIn(dhcp_namespace, namespaces_now)
开发者ID:igordcard,项目名称:neutron,代码行数:31,代码来源:test_netns_cleanup.py


示例11: test_mtu_update

    def test_mtu_update(self):
        # The test case needs access to devices in nested namespaces. ip_lib
        # doesn't support it, and it's probably unsafe to touch the library for
        # testing matters.
        # TODO(jlibosva) revisit when ip_lib supports nested namespaces
        if self.environment.hosts[0].dhcp_agent.namespace is not None:
            self.skipTest("ip_lib doesn't support nested namespaces")

        self.vm.block_until_dhcp_config_done()

        namespace = dhcp_agent._get_namespace_name(
            self.network['id'],
            suffix=self.environment.hosts[0].dhcp_agent.get_namespace_suffix())
        ip = ip_lib.IPWrapper(namespace)

        devices = ip.get_devices()
        self.assertEqual(1, len(devices))

        dhcp_dev = devices[0]
        mtu = dhcp_dev.link.mtu
        self.assertEqual(1450, mtu)

        mtu -= 1
        self.safe_client.update_network(self.network['id'], mtu=mtu)
        common_utils.wait_until_true(lambda: dhcp_dev.link.mtu == mtu)
开发者ID:igordcard,项目名称:neutron,代码行数:25,代码来源:test_dhcp_agent.py


示例12: test_l2_agent_restart

    def test_l2_agent_restart(self, agent_restart_timeout=20):
        # Environment preparation is effectively the same as connectivity test
        vms = self._prepare_vms_in_single_network()
        vms.ping_all()

        ns0 = vms[0].namespace
        ip1 = vms[1].ip
        agents = [host.l2_agent for host in self.environment.hosts]

        # Restart agents on all nodes simultaneously while pinging across
        # the hosts. The ping has to cross int and phys bridges and travels
        # via central bridge as the vms are on separate hosts.
        with net_helpers.async_ping(ns0, [ip1], timeout=2,
                                    count=agent_restart_timeout) as done:
            LOG.debug("Restarting agents")
            executor = futures.ThreadPoolExecutor(max_workers=len(agents))
            restarts = [agent.restart(executor=executor)
                        for agent in agents]

            futures.wait(restarts, timeout=agent_restart_timeout)

            self.assertTrue(all([r.done() for r in restarts]))
            LOG.debug("Restarting agents - done")

            # It is necessary to give agents time to initialize
            # because some crucial steps (e.g. setting up bridge flows)
            # happen only after RPC is established
            common_utils.wait_until_true(
                done,
                exception=RuntimeError("Could not ping the other VM, L2 agent "
                                       "restart leads to network disruption"))
开发者ID:eayunstack,项目名称:neutron,代码行数:31,代码来源:test_connectivity.py


示例13: _test_restart_service_on_sighup

    def _test_restart_service_on_sighup(self, service, workers=1):
        """Test that a service correctly (re)starts on receiving SIGHUP.

        1. Start a service with a given number of workers.
        2. Send SIGHUP to the service.
        3. Wait for workers (if any) to (re)start.
        """

        self._start_server(callback=service, workers=workers)
        os.kill(self.service_pid, signal.SIGHUP)

        expected_msg = FAKE_START_MSG * workers * 2

        # Wait for temp file to be created and its size reaching the expected
        # value
        expected_size = len(expected_msg)
        condition = lambda: (os.path.isfile(self.temp_file)
                             and os.stat(self.temp_file).st_size ==
                             expected_size)

        utils.wait_until_true(
            condition, timeout=5, sleep=0.1,
            exception=RuntimeError(
                "Timed out waiting for file %(filename)s to be created and "
                "its size become equal to %(size)s." %
                {'filename': self.temp_file,
                 'size': expected_size}))

        # Verify that start has been called twice for each worker (one for
        # initial start, and the second one on SIGHUP after children were
        # terminated).
        with open(self.temp_file, 'r') as f:
            res = f.readline()
            self.assertEqual(expected_msg, res)
开发者ID:21atlas,项目名称:neutron,代码行数:34,代码来源:test_server.py


示例14: test_new_fip_sends_garp

 def test_new_fip_sends_garp(self):
     next_ip_cidr = net_helpers.increment_ip_cidr(self.machines.ip_cidr, 2)
     expected_ip = str(netaddr.IPNetwork(next_ip_cidr).ip)
     # Create incomplete ARP entry
     self.peer.assert_no_ping(expected_ip)
     has_entry = has_expected_arp_entry(
         self.peer.port.name,
         self.peer.namespace,
         expected_ip,
         self.router.port.link.address)
     self.assertFalse(has_entry)
     self.router.port.addr.add(next_ip_cidr)
     has_arp_entry_predicate = functools.partial(
         has_expected_arp_entry,
         self.peer.port.name,
         self.peer.namespace,
         expected_ip,
         self.router.port.link.address,
     )
     exc = RuntimeError(
         "No ARP entry in %s namespace containing IP address %s and MAC "
         "address %s" % (
             self.peer.namespace,
             expected_ip,
             self.router.port.link.address))
     utils.wait_until_true(
         has_arp_entry_predicate,
         exception=exc)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:28,代码来源:test_keepalived_state_change.py


示例15: _wait_until_ipv6_forwarding_has_state

    def _wait_until_ipv6_forwarding_has_state(self, ns_name, dev_name, state):

        def _ipv6_forwarding_has_state():
            return ip_lib.get_ipv6_forwarding(
                device=dev_name, namespace=ns_name) == state

        common_utils.wait_until_true(_ipv6_forwarding_has_state)
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:framework.py


示例16: test_ha_router_restart_agents_no_packet_lost

    def test_ha_router_restart_agents_no_packet_lost(self):
        tenant_id = uuidutils.generate_uuid()
        ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id)
        router = self.safe_client.create_router(tenant_id, ha=True,
                                                external_network=ext_net['id'])

        external_vm = self.useFixture(
            machine_fixtures.FakeMachine(
                self.environment.central_external_bridge,
                common_utils.ip_to_cidr(ext_sub['gateway_ip'], 24)))

        common_utils.wait_until_true(
            lambda:
            len(self.client.list_l3_agent_hosting_routers(
                router['id'])['agents']) == 2,
            timeout=90)

        common_utils.wait_until_true(
            functools.partial(
                self._is_ha_router_active_on_one_agent,
                router['id']),
            timeout=90)

        router_ip = router['external_gateway_info'][
            'external_fixed_ips'][0]['ip_address']
        l3_agents = [host.agents['l3'] for host in self.environment.hosts]

        self._assert_ping_during_agents_restart(
            l3_agents, external_vm.namespace, [router_ip], count=60)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:29,代码来源:test_l3_agent.py


示例17: block_until_no_ping

 def block_until_no_ping(self, dst_ip):
     predicate = functools.partial(
         lambda ip: not self.ping_predicate(ip), dst_ip)
     utils.wait_until_true(
         predicate,
         exception=FakeMachineException(
             "ICMP packets still pass to %s IP address." % dst_ip)
     )
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:8,代码来源:machine_fixtures.py


示例18: _wait_for_min_bw_rule_applied

 def _wait_for_min_bw_rule_applied(self, vm, min_bw, direction):
     if direction == constants.EGRESS_DIRECTION:
         utils.wait_until_true(
             lambda: vm.bridge.get_egress_min_bw_for_port(
                 vm.neutron_port['id']) == min_bw)
     elif direction == constants.INGRESS_DIRECTION:
         self.fail('"%s" direction not implemented'
                   % constants.INGRESS_DIRECTION)
开发者ID:openstack,项目名称:neutron,代码行数:8,代码来源:test_qos.py


示例19: block_until_dhcp_config_done

 def block_until_dhcp_config_done(self):
     utils.wait_until_true(
         lambda: self.ip_configured() and self.gateway_configured(),
         exception=machine_fixtures.FakeMachineException(
             "Address %s or gateway %s not configured properly on "
             "port %s" % (self.ip_cidr, self.gateway_ip, self.port.name)
         )
     )
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:8,代码来源:machine.py


示例20: destroy_state_change_monitor

 def destroy_state_change_monitor(self, process_monitor):
     pm = self._get_state_change_monitor_process_manager()
     process_monitor.unregister(self.router_id, IP_MONITOR_PROCESS_SERVICE)
     pm.disable(sig=str(int(signal.SIGTERM)))
     try:
         common_utils.wait_until_true(lambda: not pm.active, timeout=SIGTERM_TIMEOUT)
     except common_utils.WaitTimeout:
         pm.disable(sig=str(int(signal.SIGKILL)))
开发者ID:noironetworks,项目名称:neutron,代码行数:8,代码来源:ha_router.py



注:本文中的neutron.common.utils.wait_until_true函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python context.get_admin_context函数代码示例发布时间:2022-05-27
下一篇:
Python utils.subprocess_popen函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap