本文整理汇总了Python中tests.util.poll_until函数的典型用法代码示例。如果您正苦于以下问题:Python poll_until函数的具体用法?Python poll_until怎么用?Python poll_until使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了poll_until函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: wait_for_broken_connection
def wait_for_broken_connection(self):
    """Poll the test connection until it reports that it is not connected.

    Returns immediately, without polling, when USE_IP is falsy.
    """
    def has_dropped(connected):
        # The wait is over once is_connected() yields a falsy value.
        return not connected

    if USE_IP:
        poll_until(self.connection.is_connected,
                   has_dropped,
                   time_out=TIME_OUT_TIME)
开发者ID:pdmars,项目名称:reddwarf_lite-integration,代码行数:7,代码来源:instances_actions.py
示例2: test_get_init_pid
def test_get_init_pid(self):
    # Polls every 10 seconds, for up to 10 minutes, until the shell pipeline
    # below prints a non-empty value; that value is stored in
    # instance_info.pid.
    def pid_is_known():
        # The command is rebuilt on every poll so the current local_id is
        # always the one used in the awk pattern.
        command = ("pgrep init | vzpid - | awk '/%s/{print $1}'"
                   % str(instance_info.local_id))
        out, _err = process(command)
        instance_info.pid = out.strip()
        return bool(instance_info.pid)

    poll_until(pid_is_known, sleep_time=10, time_out=(60 * 10))
开发者ID:sacharya,项目名称:reddwarf_lite-integration,代码行数:7,代码来源:instances.py
示例3: wait_for_resize
def wait_for_resize(self):
    """Poll until self.instance leaves RESIZE, then require it to be ACTIVE.

    Any status other than "RESIZE" or "ACTIVE" fails the assert_equal check
    inside the polling callback.
    """
    def is_finished_resizing():
        current = self.instance
        if current.status != "RESIZE":
            # Once the resize is over, ACTIVE is the only acceptable status.
            assert_equal("ACTIVE", current.status)
            return True
        return False

    poll_until(is_finished_resizing, time_out=TIME_OUT_TIME)
开发者ID:pdmars,项目名称:reddwarf_lite-integration,代码行数:8,代码来源:instances_actions.py
示例4: test_create_failure_on_server_failure
def test_create_failure_on_server_failure(self):
    # Creates an instance whose server provisioning is expected to fail and
    # checks that its status becomes ERROR.
    # Fake nova will fail a server ending with 'SERVER_ERROR'.
    response = self.dbaas.instances.create(
        "test_SERVER_ERROR", 1, {"size": 1}, [])

    def fetch_instance():
        return self.dbaas.instances.get(response.id)

    def has_errored(instance):
        return instance.status == "ERROR"

    poll_until(fetch_instance, has_errored, time_out=10)

    # Fetch once more so the assertion runs against a fresh copy.
    instance = fetch_instance()
    print("Status: %s" % instance.status)
    assert_equal(instance.status, "ERROR",
                 "Instance did not drop to error after server prov failure.")
开发者ID:nateben,项目名称:trove-integration,代码行数:9,代码来源:instances_states.py
示例5: test_create_failure_on_dns_failure
def test_create_failure_on_dns_failure(self):
    # Creates an instance named "test_DNS_ERROR" and checks that its status
    # becomes ERROR.
    # TODO(ed-): Throw DNS-specific monkeywrench into works
    response = self.dbaas.instances.create(
        "test_DNS_ERROR", 1, {"size": 1}, [])

    def fetch_instance():
        return self.dbaas.instances.get(response.id)

    def has_errored(instance):
        return instance.status == "ERROR"

    poll_until(fetch_instance, has_errored, time_out=10)

    # Fetch once more so the assertion runs against a fresh copy.
    instance = fetch_instance()
    print("Status: %s" % instance.status)
    assert_equal(instance.status, "ERROR",
                 "Instance did not drop to error after DNS prov failure.")
开发者ID:nateben,项目名称:trove-integration,代码行数:9,代码来源:instances_states.py
示例6: test_create_failure_on_volume_prov_failure
def test_create_failure_on_volume_prov_failure(self):
    # Creates an instance whose volume provisioning is expected to fail and
    # checks that its status becomes ERROR.
    # Fake nova will fail a volume of size 9.
    response = self.dbaas.instances.create(
        "volume_fail", 1, {"size": 9}, [])

    def fetch_instance():
        return self.dbaas.instances.get(response.id)

    def has_errored(instance):
        return instance.status == "ERROR"

    poll_until(fetch_instance, has_errored, time_out=10)

    # Fetch once more so the assertion runs against a fresh copy.
    instance = fetch_instance()
    print("Status: %s" % instance.status)
    assert_equal(instance.status, "ERROR",
                 "Instance did not drop to error after volume prov failure.")
开发者ID:nateben,项目名称:trove-integration,代码行数:9,代码来源:instances_states.py
示例7: update_and_wait_to_finish
def update_and_wait_to_finish(self):
    """Trigger a management update and poll until the new version is seen.

    Calls management.update for instance_info.id, then polls
    self.get_version() once per second for up to three minutes. While
    polling, the version must equal either self.next_version (done) or
    self.old_version (still updating); anything else fails the assert_equal
    check.
    """
    instance_info.dbaas_admin.management.update(instance_info.id)

    def finished():
        current_version = self.get_version()
        if current_version == self.next_version:
            return True
        # The only valid thing for it to be aside from next_version is
        # old version.
        assert_equal(current_version, self.old_version)
        # Say "not finished yet" explicitly instead of falling off the end
        # and returning None.
        return False

    poll_until(finished, sleep_time=1, time_out=3 * 60)
开发者ID:TimSimpsonR,项目名称:reddwarf_lite-integration,代码行数:10,代码来源:instances_actions.py
示例8: test_instance_created
def test_instance_created(self):
    # Polls `vzctl status` for the instance's local id every 5 seconds, for
    # up to 8 minutes, until the reported status is "running".
    def check_status_of_instance():
        command = ("sudo vzctl status %s | awk '{print $5}'"
                   % str(instance_info.local_id))
        status, _err = process(command)
        if not string_in_list(status, ["running"]):
            return False
        # Require an exact match once surrounding whitespace is stripped.
        self.assertEqual("running", status.strip())
        return True

    poll_until(check_status_of_instance, sleep_time=5, time_out=(60 * 8))
开发者ID:sacharya,项目名称:reddwarf_lite-integration,代码行数:10,代码来源:instances.py
示例9: wait_for_failure_status
def wait_for_failure_status(self):
    """Wait until the reboot ends and verify the instance is SHUTDOWN.

    Polls self.instance while its status is "REBOOT". The first status seen
    after that must be "SHUTDOWN"; any other value fails the assert_equal
    check.
    """
    def is_finished_rebooting():
        instance = self.instance
        if instance.status == "REBOOT":
            return False
        # This helper expects the failure outcome, so the instance must end
        # up SHUTDOWN rather than running.
        assert_equal("SHUTDOWN", instance.status)
        return True

    poll_until(is_finished_rebooting, time_out=TIME_OUT_TIME)
开发者ID:pdmars,项目名称:reddwarf_lite-integration,代码行数:10,代码来源:instances_actions.py
示例10: test_get_ip
def test_get_ip(self):
    # wait for a few seconds for the IP to sync up
    # is there a better way to do this?
    def get_ip_for_instance():
        result = instance_info.dbaas.instances.get(instance_info.id)
        if not hasattr(result, 'ip'):
            return False
        # Remember the first listed address for later use.
        instance_info.user_ip = result.ip[0]
        return True

    poll_until(get_ip_for_instance, sleep_time=5, time_out=20)
开发者ID:TimSimpsonR,项目名称:reddwarf_lite-integration,代码行数:10,代码来源:dbaas_ovz.py
示例11: wait_for_successful_restart
def wait_for_successful_restart(self):
    """Wait until the reboot ends and verify the instance is ACTIVE.

    Polls self.instance while its status is "REBOOT". The first status seen
    after that must be "ACTIVE"; any other value fails the assert_equal
    check.
    """
    def is_finished_rebooting():
        current = self.instance
        if current.status != "REBOOT":
            assert_equal("ACTIVE", current.status)
            return True
        return False

    poll_until(is_finished_rebooting, time_out=TIME_OUT_TIME)
开发者ID:pdmars,项目名称:reddwarf_lite-integration,代码行数:10,代码来源:instances_actions.py
示例12: set_up
def set_up(self):
    """Create an instance that ends up in ERROR and remember its id.

    Raises SkipTest unless FAKE_MODE is truthy. Sets self.client,
    self.mgmt and self.id.
    """
    if not FAKE_MODE:
        raise SkipTest("This test only works in fake mode.")

    self.client = create_client(is_admin=True)
    self.mgmt = self.client.management

    # Fake nova will fail a server ending with 'test_SERVER_ERROR'.
    response = self.client.instances.create(
        'test_SERVER_ERROR', 1, {'size': 1}, [])

    def fetch_instance():
        return self.client.instances.get(response.id)

    def has_errored(instance):
        return instance.status == 'ERROR'

    poll_until(fetch_instance, has_errored, time_out=10)
    self.id = response.id
开发者ID:0xdc,项目名称:docs-cloud-databases,代码行数:12,代码来源:instances.py
示例13: create_user
def create_user(self):
    """Create a MySQL user we can use for this test.

    Requests the user through the users API, then polls the user list for
    up to 30 seconds until a user named MYSQL_USERNAME appears. Outside
    FAKE_MODE it sleeps a further 5 seconds afterwards.
    """
    users = [{"name": MYSQL_USERNAME, "password": MYSQL_PASSWORD,
              "database": MYSQL_USERNAME}]
    self.dbaas.users.create(instance_info.id, users)

    def has_user():
        listed = self.dbaas.users.list(instance_info.id)
        # Generator expression: any() stops at the first match and no
        # temporary list is built.
        return any(user.name == MYSQL_USERNAME for user in listed)

    poll_until(has_user, time_out=30)
    if not FAKE_MODE:
        time.sleep(5)
开发者ID:cp16net,项目名称:reddwarf_lite-integration,代码行数:14,代码来源:instances_actions.py
示例14: setUp
def setUp(self):
    """Create an instance expected to fail and verify it reaches ERROR.

    Sets self.user, self.client, self.name, self.response and
    self.instance.
    """
    self.user = test_config.users.find_user(Requirements(is_admin=True))
    self.client = create_dbaas_client(self.user)
    self.name = 'test_SERVER_ERROR'

    # Create an instance with a broken compute instance.
    self.response = self.client.instances.create(
        self.name, 1, {'size': 1}, [])

    def fetch_instance():
        return self.client.instances.get(self.response.id)

    def has_errored(instance):
        return instance.status == 'ERROR'

    poll_until(fetch_instance, has_errored, time_out=10)

    # Fetch once more so the assertion runs against a fresh copy.
    self.instance = fetch_instance()
    print("Status: %s" % self.instance.status)
    assert_equal(self.instance.status, "ERROR",
                 "Instance did not drop to error after server prov failure.")
开发者ID:0xdc,项目名称:docs-cloud-databases,代码行数:14,代码来源:accounts.py
示例15: test_volume_resize_success
def test_volume_resize_success(self):
    # Waits (polling every 2 seconds, up to 300 seconds) for the instance to
    # go from RESIZE to ACTIVE, then checks the volume record's status, size
    # and attach status.
    def check_resize_status():
        instance = instance_info.dbaas.instances.get(instance_info.id)
        status = instance.status
        if status == "ACTIVE":
            return True
        if status == "RESIZE":
            return False
        # Only ACTIVE and RESIZE are acceptable while waiting.
        fail("Status should not be %s" % status)

    poll_until(check_resize_status, sleep_time=2, time_out=300)

    admin_context = context.get_admin_context()
    volumes = db.volume_get(admin_context, instance_info.volume_id)
    assert_equal(volumes.status, 'in-use')
    assert_equal(volumes.size, self.new_volume_size)
    assert_equal(volumes.attach_status, 'attached')
开发者ID:pdmars,项目名称:reddwarf_lite-integration,代码行数:17,代码来源:instances_actions.py
示例16: _wait_for_active
def _wait_for_active(self):
    """Poll instance self.id, for up to 8 minutes, until it is ACTIVE."""
    def fetch_instance():
        return self.client.instances.get(self.id)

    def is_active(instance):
        return instance.status == "ACTIVE"

    poll_until(fetch_instance, is_active, time_out=(60 * 8))
开发者ID:0xdc,项目名称:docs-cloud-databases,代码行数:4,代码来源:instances_mysql_down.py
示例17: wait_for_instance_status
def wait_for_instance_status(self, instance_id, status="ACTIVE"):
    """Poll the given instance, with time_out=10, until it has `status`.

    `status` defaults to "ACTIVE".
    """
    def fetch_instance():
        return self.dbaas.instances.get(instance_id)

    def has_wanted_status(instance):
        return instance.status == status

    poll_until(fetch_instance, has_wanted_status, time_out=10)
开发者ID:0xdc,项目名称:docs-cloud-databases,代码行数:4,代码来源:instances_delete.py
示例18: wait_for_instance_task_status
def wait_for_instance_task_status(self, instance_id, description):
    """Poll the management view of an instance for a task description.

    Uses time_out=10 and stops once the instance's task_description equals
    `description`.
    """
    def fetch_mgmt_instance():
        return self.dbaas.management.show(instance_id)

    def has_wanted_task(instance):
        return instance.task_description == description

    poll_until(fetch_mgmt_instance, has_wanted_task, time_out=10)
开发者ID:0xdc,项目名称:docs-cloud-databases,代码行数:4,代码来源:instances_delete.py
示例19: _wait_for_new_volume_size
def _wait_for_new_volume_size(self, new_size):
    """Poll instance self.id, for up to 8 minutes, for the new volume size.

    Stops once the instance's volume['size'] equals `new_size`.
    """
    def fetch_instance():
        return self.client.instances.get(self.id)

    def has_new_size(instance):
        return instance.volume['size'] == new_size

    poll_until(fetch_instance, has_new_size, time_out=(60 * 8))
开发者ID:0xdc,项目名称:docs-cloud-databases,代码行数:4,代码来源:instances_mysql_down.py
注:本文中的tests.util.poll_until函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论