本文整理汇总了Python中tempest.common.waiters.wait_for_server_status函数的典型用法代码示例。如果您正苦于以下问题：Python wait_for_server_status函数的具体用法是什么？Python wait_for_server_status怎么用？有哪些Python wait_for_server_status的使用例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wait_for_server_status函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _test_live_migration
def _test_live_migration(self, state='ACTIVE', volume_backed=False):
    """Live migrate a freshly created server to another host.

    Requires CONF.compute_feature_enabled.live_migration to be True.

    :param state: The vm_state the server should be in both before and
                  after the live migration. Supported values are 'ACTIVE'
                  and 'PAUSED'.
    :param volume_backed: Whether the instance is volume backed. When it
                          is, *block* migration is not used.
    """
    server = self.create_test_server(wait_until="ACTIVE",
                                     volume_backed=volume_backed)
    server_id = server['id']
    source_host = self.get_host_for_server(server_id)
    destination_host = self.get_host_other_than(server_id)

    # The server is created ACTIVE; pause it first when the migration is
    # meant to be exercised in the PAUSED state.
    if state == 'PAUSED':
        self.admin_servers_client.pause_server(server_id)
        waiters.wait_for_server_status(self.admin_servers_client,
                                       server_id, state)

    LOG.info("Live migrate from source %s to destination %s",
             source_host, destination_host)
    self._live_migrate(server_id, destination_host, state, volume_backed)

    if not CONF.compute_feature_enabled.live_migrate_back_and_forth:
        return

    # live_migrate_back_and_forth is enabled for grenade jobs, where live
    # migration must be validated in both directions, so move the server
    # back to the host it started on.
    LOG.info("Live migrate back to source %s", source_host)
    self._live_migrate(server_id, source_host, state, volume_backed)
开发者ID:masayukig,项目名称:tempest,代码行数:31,代码来源:test_live_migration.py
示例2: _stop_instances
def _stop_instances(self, instances):
    """Stop every instance, then wait for each one to reach SHUTOFF.

    :param instances: Iterable of server dicts; only the 'id' key is read.
    """
    # NOTE(gfidente): issue all stop requests first and only then wait, so
    # the waits overlap instead of being paid once per instance.
    for instance in instances:
        self.servers_client.stop(instance['id'])

    for instance in instances:
        waiters.wait_for_server_status(
            self.servers_client, instance['id'], 'SHUTOFF')
开发者ID:Juraci,项目名称:tempest,代码行数:7,代码来源:test_volume_boot_pattern.py
示例3: test_rebuild_server_in_error_state
def test_rebuild_server_in_error_state(self):
    """Rebuild a server that has been forced into the error state.

    The rebuild uses the alternate image, and the server is then expected
    to reach ACTIVE with that image.
    """
    server_id = self.s1_id
    user_client = self.non_admin_client

    # Resetting the vm state requires admin privilege, so it goes through
    # self.client rather than the non-admin client.
    self.client.reset_state(server_id, state='error')
    rebuild_response = user_client.rebuild_server(server_id,
                                                  self.image_ref_alt)
    rebuilt_server = rebuild_response['server']

    # Cleanups run in reverse registration order: first rebuild back to
    # the original image, then wait for the server to be ACTIVE again.
    self.addCleanup(waiters.wait_for_server_status, user_client,
                    server_id, 'ACTIVE')
    self.addCleanup(user_client.rebuild_server, server_id, self.image_ref)

    # The immediate rebuild response must already describe the new image.
    rebuilt_id = rebuilt_server['id']
    self.assertEqual(server_id, rebuilt_id)
    self.assertEqual(self.image_ref_alt, rebuilt_server['image']['id'])
    self.assertEqual(self.flavor_ref, rebuilt_server['flavor']['id'])

    waiters.wait_for_server_status(user_client, rebuilt_id, 'ACTIVE',
                                   raise_on_error=False)

    # Re-read the server once the rebuild finished and check the image.
    refreshed = user_client.show_server(rebuilt_id)['server']
    self.assertEqual(self.image_ref_alt, refreshed['image']['id'])
开发者ID:varapreddy,项目名称:tempest,代码行数:26,代码来源:test_servers.py
示例4: setUp
def setUp(self):
    """Make sure the shared server is ACTIVE before each test runs."""
    super(ServersNegativeTestMultiTenantJSON, self).setUp()
    try:
        waiters.wait_for_server_status(
            self.client, self.server_id, 'ACTIVE')
    except Exception:
        # Best effort: whatever went wrong while waiting, fall back to
        # rebuilding the server and publish the result on the class so
        # that subsequent tests pick it up.
        replacement_id = self.rebuild_server(self.server_id)
        self.__class__.server_id = replacement_id
开发者ID:NeCTAR-RC,项目名称:tempest,代码行数:7,代码来源:test_servers_negative.py
示例5: test_delete_server_while_in_pause_state
def test_delete_server_while_in_pause_state(self):
    """Delete a server while its VM state is PAUSED."""
    server_id = self.create_test_server(wait_until='ACTIVE')['id']

    # Pause the server and make sure the pause has taken effect.
    self.client.pause_server(server_id)
    waiters.wait_for_server_status(self.client, server_id, 'PAUSED')

    # Deleting from the paused state must end with the server gone.
    self.client.delete_server(server_id)
    waiters.wait_for_server_termination(self.client, server_id)
开发者ID:WoolenWang,项目名称:tempest,代码行数:7,代码来源:test_delete_server.py
示例6: _test_live_migration
def _test_live_migration(self, state='ACTIVE', volume_backed=False):
    """Live migrate a new server and assert it lands on the target host.

    Requires CONF.compute_feature_enabled.live_migration to be True.

    :param state: The vm_state the server should be in both before and
                  after the live migration. Supported values are 'ACTIVE'
                  and 'PAUSED'.
    :param volume_backed: Whether the instance is volume backed. When it
                          is, *block* migration is not used.
    """
    server = self.create_test_server(wait_until="ACTIVE",
                                     volume_backed=volume_backed)
    server_id = server['id']
    actual_host = self._get_host_for_server(server_id)
    target_host = self._get_host_other_than(actual_host)

    # Bring the server into the requested pre-migration state.
    if state == 'PAUSED':
        self.admin_servers_client.pause_server(server_id)
        waiters.wait_for_server_status(self.admin_servers_client,
                                       server_id, state)

    self._migrate_server_to(server_id, target_host, volume_backed)
    waiters.wait_for_server_status(self.servers_client, server_id, state)

    # Collect this server's migration records so that a failure message
    # carries enough context to debug the migration.
    migration_list = (self.admin_migration_client.list_migrations()
                      ['migrations'])
    own_migrations = [
        "\n%s" % migration
        for migration in migration_list
        if migration['instance_uuid'] == server_id
    ]
    header = ("Live Migration failed. Migrations list for Instance "
              "%s: [" % server_id)
    msg = header + "".join(own_migrations) + "]"

    self.assertEqual(target_host, self._get_host_for_server(server_id),
                     msg)
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:35,代码来源:test_live_migration.py
示例7: _create_server
def _create_server(self, name):
    """Boot a server on self.network and record how to reach it.

    A new keypair is created for the server. Once the server is ACTIVE
    its keypair is stored in self.servers_keypairs and its address in
    self.server_ips / self.server_fixed_ips.

    :param name: Name to give the new server.
    :returns: The server dict as returned by show_server.
    """
    keypair = self.create_keypair()
    create_kwargs = {
        'networks': [
            {'uuid': self.network['id']},
        ],
        'key_name': keypair['name'],
        'security_groups': [{'name': self.security_group['name']}],
        'name': name
    }
    net_name = self.network['name']

    created = self.create_server(**create_kwargs)
    waiters.wait_for_server_status(self.servers_client,
                                   created['id'], 'ACTIVE')
    # Re-read the server so that its addresses are populated.
    server = self.servers_client.show_server(created['id'])['server']
    server_id = server['id']
    self.servers_keypairs[server_id] = keypair

    # A floating IP is only needed when a public network is configured
    # and the project networks are not directly reachable.
    needs_floating_ip = (config.network.public_network_id and not
                         config.network.project_networks_reachable)
    if needs_floating_ip:
        public_network_id = config.network.public_network_id
        floating_ip = self.create_floating_ip(server, public_network_id)
        self.floating_ips[floating_ip] = server
        self.server_ips[server_id] = floating_ip.floating_ip_address
    else:
        self.server_ips[server_id] = \
            server['addresses'][net_name][0]['addr']
    # NOTE(review): the listing this was taken from lost its indentation;
    # the fixed IP is assumed to be recorded for every server, not only in
    # the else branch -- confirm against the upstream file.
    self.server_fixed_ips[server_id] = \
        server['addresses'][net_name][0]['addr']

    self.assertTrue(self.servers_keypairs)
    return server
开发者ID:fnaval,项目名称:neutron-lbaas,代码行数:32,代码来源:base.py
示例8: _test_cold_migrate_server
def _test_cold_migrate_server(self, revert=False):
    """Cold migrate a server, then confirm or revert the migration.

    :param revert: When True the migration is reverted and the server is
                   expected to be back on its source host; otherwise the
                   migration is confirmed and the host must differ.
    """
    if CONF.compute.min_compute_nodes < 2:
        msg = "Less than 2 compute nodes, skipping multinode tests."
        raise self.skipException(msg)

    def _current_host(server_id):
        # The host attribute is read through the admin client.
        shown = self.admin_servers_client.show_server(server_id)
        return shown['server']['OS-EXT-SRV-ATTR:host']

    server_id = self.create_test_server(wait_until="ACTIVE")['id']
    src_host = _current_host(server_id)

    self.admin_servers_client.migrate_server(server_id)
    waiters.wait_for_server_status(self.servers_client,
                                   server_id, 'VERIFY_RESIZE')

    if revert:
        self.servers_client.revert_resize_server(server_id)
    else:
        self.servers_client.confirm_resize_server(server_id)
    waiters.wait_for_server_status(self.servers_client,
                                   server_id, 'ACTIVE')

    dst_host = _current_host(server_id)
    if revert:
        self.assertEqual(src_host, dst_host)
    else:
        self.assertNotEqual(src_host, dst_host)
开发者ID:Juniper,项目名称:tempest,代码行数:26,代码来源:test_migrations.py
示例9: boot_instance
def boot_instance(self):
    """Boot a server and wait until its node and the server are ACTIVE.

    Stores the refreshed server in self.instance and the node looked up
    for it in self.node.
    """
    self.instance = self.create_server(key_name=self.keypair['name'])
    instance_id = self.instance['id']

    self.wait_node(instance_id)
    self.node = self.get_node(instance_id=instance_id)
    node_uuid = self.node['uuid']

    self.wait_power_state(node_uuid, BaremetalPowerStates.POWER_ON)
    # First wait with both DEPLOYWAIT and ACTIVE as target states, then
    # wait for ACTIVE alone using the longer active timeout.
    self.wait_provisioning_state(
        node_uuid,
        [BaremetalProvisionStates.DEPLOYWAIT,
         BaremetalProvisionStates.ACTIVE],
        timeout=CONF.baremetal.deploywait_timeout)
    self.wait_provisioning_state(
        node_uuid,
        BaremetalProvisionStates.ACTIVE,
        timeout=CONF.baremetal.active_timeout,
        interval=30)

    waiters.wait_for_server_status(self.servers_client,
                                   instance_id, 'ACTIVE')

    # Refresh both views now that deployment has finished.
    self.node = self.get_node(instance_id=instance_id)
    shown = self.servers_client.show_server(instance_id)
    self.instance = shown['server']
开发者ID:SahilTikale,项目名称:ironic,代码行数:25,代码来源:baremetal_manager.py
示例10: _rebuild_server_and_check
def _rebuild_server_and_check(self, image_ref):
    """Rebuild the class server with image_ref and verify the image id.

    :param image_ref: Image the server is rebuilt with; the image id in
                      the rebuild response must equal it.
    """
    response = self.client.rebuild_server(self.server_id, image_ref)
    rebuilt_server = response["server"]
    waiters.wait_for_server_status(self.client, self.server_id, "ACTIVE")

    # The comparison uses the image reported in the rebuild response.
    current_image = rebuilt_server["image"]["id"]
    msg = (
        "Server was not rebuilt to the original image. "
        "The original image: {0}. The current image: {1}"
    ).format(image_ref, current_image)
    self.assertEqual(image_ref, current_image, msg)
开发者ID:sebrandon1,项目名称:tempest,代码行数:7,代码来源:test_server_actions.py
示例11: _test_reboot_server
def _test_reboot_server(self, reboot_type):
    """Reboot the class server and check that it really rebooted.

    When CONF.validation.run_validation is enabled, the boot time is read
    through a remote client before and after the reboot and must have
    increased. Without validation only the return to ACTIVE is checked.

    :param reboot_type: Passed to reboot_server as its ``type`` argument.
    """
    def _open_remote_client(server):
        # A new client is built for every login, as the reboot happens
        # between the two uses.
        return remote_client.RemoteClient(
            self.get_server_ip(server),
            self.ssh_user,
            self.password,
            self.validation_resources["keypair"]["private_key"],
            server=server,
            servers_client=self.client,
        )

    if CONF.validation.run_validation:
        # Get the time the server was last rebooted.
        server = self.client.show_server(self.server_id)["server"]
        linux_client = _open_remote_client(server)
        boot_time = linux_client.get_boot_time()

        # NOTE: This sync is for avoiding the loss of pub key data
        # in a server
        linux_client.exec_command("sync")

    self.client.reboot_server(self.server_id, type=reboot_type)
    waiters.wait_for_server_status(self.client, self.server_id, "ACTIVE")

    if CONF.validation.run_validation:
        # Log in again and verify the boot time has changed.
        linux_client = _open_remote_client(server)
        new_boot_time = linux_client.get_boot_time()
        # assertGreater reports both operands itself on failure, unlike
        # assertTrue on a pre-computed boolean.
        self.assertGreater(new_boot_time, boot_time,
                           "%s > %s" % (new_boot_time, boot_time))
开发者ID:sebrandon1,项目名称:tempest,代码行数:33,代码来源:test_server_actions.py
示例12: _test_live_block_migration
def _test_live_block_migration(self, state='ACTIVE'):
    """Live block migrate a server and assert it lands on the target.

    Requires CONF.compute_feature_enabled.live_migration to be True.

    :param state: The vm_state the migrated server should be in before and
                  after the live migration. Supported values are 'ACTIVE'
                  and 'PAUSED'.
    """
    if len(self._get_compute_hostnames()) < 2:
        # skipTest() raises the skip exception itself, so it must not be
        # wrapped in a ``raise`` statement.
        self.skipTest(
            "Less than 2 compute nodes, skipping migration test.")

    server_id = self._get_an_active_server()
    actual_host = self._get_host_for_server(server_id)
    target_host = self._get_host_other_than(actual_host)

    # Bring the server into the requested pre-migration state.
    if state == 'PAUSED':
        self.admin_servers_client.pause_server(server_id)
        waiters.wait_for_server_status(self.admin_servers_client,
                                       server_id, state)

    self._migrate_server_to(server_id, target_host)
    waiters.wait_for_server_status(self.servers_client, server_id, state)
    self.assertEqual(target_host, self._get_host_for_server(server_id))
开发者ID:nunogt,项目名称:tempest,代码行数:25,代码来源:test_live_migration.py
示例13: _shelve_then_unshelve_server
def _shelve_then_unshelve_server(self, server):
    """Shelve-offload the server, unshelve it and wait for ACTIVE.

    :param server: Server dict; only the 'id' key is read.
    """
    server_id = server['id']
    compute.shelve_server(self.servers_client, server_id,
                          force_shelve_offload=True)
    self.servers_client.unshelve_server(server_id)
    waiters.wait_for_server_status(
        self.servers_client, server_id, 'ACTIVE')
开发者ID:masayukig,项目名称:tempest,代码行数:7,代码来源:test_shelve_instance.py
示例14: _test_reboot_server
def _test_reboot_server(self, reboot_type):
    """Reboot the class server and check that it really rebooted.

    When CONF.validation.run_validation is enabled, the boot time is read
    through a remote client before and after the reboot and must have
    increased. Without validation only the return to ACTIVE is checked.

    :param reboot_type: Passed to reboot_server as its ``type`` argument.
    """
    def _open_remote_client(server):
        # A new client is built for every login, as the reboot happens
        # between the two uses.
        return remote_client.RemoteClient(
            self.get_server_ip(server),
            self.ssh_user,
            self.password,
            self.validation_resources['keypair']['private_key'],
            server=server,
            servers_client=self.client)

    if CONF.validation.run_validation:
        # Get the time the server was last rebooted.
        server = self.client.show_server(self.server_id)['server']
        boot_time = _open_remote_client(server).get_boot_time()

    self.client.reboot_server(self.server_id, type=reboot_type)
    waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')

    if CONF.validation.run_validation:
        # Log in again and verify the boot time has changed.
        new_boot_time = _open_remote_client(server).get_boot_time()
        # assertGreater reports both operands itself on failure, unlike
        # assertTrue on a pre-computed boolean.
        self.assertGreater(new_boot_time, boot_time,
                           '%s > %s' % (new_boot_time, boot_time))
开发者ID:devfaz,项目名称:tempest,代码行数:28,代码来源:test_server_actions.py
示例15: test_associate_already_associated_floating_ip
def test_associate_already_associated_floating_ip(self):
    """Re-associating a floating IP moves it to the new server.

    Positive test: associating an already associated floating IP with
    another server should change the association, so disassociating it
    from the old server afterwards has to fail.
    """
    # Create a second server to associate the floating IP with.
    new_name = data_utils.rand_name('floating_server')
    body = self.create_test_server(name=new_name)
    self.new_server_id = body['id']
    # Register the deletion before waiting, so the server is still
    # deleted by this test when the wait below times out or fails.
    self.addCleanup(self.servers_client.delete_server, self.new_server_id)
    waiters.wait_for_server_status(self.servers_client,
                                   self.new_server_id, 'ACTIVE')

    # Associating floating IP for the first time
    self.client.associate_floating_ip_to_server(
        self.floating_ip,
        self.server_id)
    # Associating floating IP for the second time
    self.client.associate_floating_ip_to_server(
        self.floating_ip,
        self.new_server_id)
    self.addCleanup(self.client.disassociate_floating_ip_from_server,
                    self.floating_ip,
                    self.new_server_id)

    # Make sure no longer associated with old server
    self.assertRaises((lib_exc.NotFound,
                       lib_exc.UnprocessableEntity,
                       lib_exc.Conflict),
                      self.client.disassociate_floating_ip_from_server,
                      self.floating_ip, self.server_id)
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:30,代码来源:test_floating_ips_actions.py
示例16: _test_reboot_server
def _test_reboot_server(self, reboot_type):
    """Reboot the class server and check that it really rebooted.

    When CONF.validation.run_validation is enabled, the boot time is read
    through a remote client before and after the reboot and must have
    increased. Without validation only the return to ACTIVE is checked.

    :param reboot_type: Passed to reboot_server as its ``type`` argument.
    """
    def _open_remote_client(server, validation_resources):
        # A new client is built for every login, as the reboot happens
        # between the two uses.
        return remote_client.RemoteClient(
            self.get_server_ip(server, validation_resources),
            self.ssh_user,
            self.password,
            validation_resources['keypair']['private_key'],
            server=server,
            servers_client=self.client)

    if CONF.validation.run_validation:
        validation_resources = self.get_class_validation_resources(
            self.os_primary)
        # Get the time the server was last rebooted.
        server = self.client.show_server(self.server_id)['server']
        linux_client = _open_remote_client(server, validation_resources)
        boot_time = linux_client.get_boot_time()

        # NOTE: This sync is for avoiding the loss of pub key data
        # in a server
        linux_client.exec_command("sync")

    self.client.reboot_server(self.server_id, type=reboot_type)
    waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')

    if CONF.validation.run_validation:
        # Log in again and verify the boot time has changed.
        linux_client = _open_remote_client(server, validation_resources)
        new_boot_time = linux_client.get_boot_time()
        self.assertGreater(new_boot_time, boot_time,
                           '%s > %s' % (new_boot_time, boot_time))
开发者ID:masayukig,项目名称:tempest,代码行数:34,代码来源:test_server_actions.py
示例17: test_update_server_name_in_stop_state
def test_update_server_name_in_stop_state(self):
    """Rename a stopped server; the name should change to the new value."""
    server_id = self.create_test_server(wait_until='ACTIVE')['id']

    # Stop the server and wait until it is actually SHUTOFF.
    self.client.stop(server_id)
    waiters.wait_for_server_status(self.client, server_id, 'SHUTOFF')

    updated_server = self._update_server_name(server_id, 'SHUTOFF')
    self.assertNotIn('progress', updated_server)
开发者ID:nunogt,项目名称:tempest,代码行数:7,代码来源:test_servers.py
示例18: test_resize_server_revert_with_volume_attached
def test_resize_server_revert_with_volume_attached(self):
    """Resize a server with a volume attached, then revert the resize.

    Reverting should move the instance and its volume attachment back to
    the original compute host; afterwards the server must still report
    exactly the one attached volume.
    """
    server_id = self.server_id

    # Create a blank volume and attach it to the server created in setUp.
    volume = self.create_volume()
    self.attach_volume(self.client.show_server(server_id)['server'],
                       volume)

    # Now resize the server with the blank volume attached.
    self.client.resize_server(server_id, self.flavor_ref_alt)
    # Explicitly delete the server to get a new one for later
    # tests. Avoids resize down race issues.
    self.addCleanup(self.delete_server, server_id)
    waiters.wait_for_server_status(self.client, server_id,
                                   'VERIFY_RESIZE')

    # Revert the resize, which should move the instance and its volume
    # attachment back to the original source compute host.
    self.client.revert_resize_server(server_id)
    waiters.wait_for_server_status(self.client, server_id, 'ACTIVE')

    # Make sure everything still looks OK.
    server = self.client.show_server(server_id)['server']
    # The flavor id is not returned in the server response after
    # microversion 2.46 so handle that gracefully.
    flavor_id = server['flavor'].get('id')
    if flavor_id:
        self.assertEqual(self.flavor_ref, flavor_id)

    attached_volumes = server['os-extended-volumes:volumes_attached']
    self.assertEqual(1, len(attached_volumes))
    self.assertEqual(volume['id'], attached_volumes[0]['id'])
开发者ID:masayukig,项目名称:tempest,代码行数:30,代码来源:test_server_actions.py
示例19: _wait_for_server
def _wait_for_server(self, server):
    """Wait for the server to be ACTIVE, then check connectivity to it.

    :param server: Dict holding the server under 'server' and its
                   floating IP under 'fip'.
    """
    server_id = server['server']['id']
    waiters.wait_for_server_status(self.manager.servers_client,
                                   server_id,
                                   constants.SERVER_STATUS_ACTIVE)

    # Connectivity is checked against the floating IP address using the
    # configured image SSH user and the test keypair's private key.
    fip_address = server['fip']['floating_ip_address']
    self.check_connectivity(fip_address,
                            CONF.validation.image_ssh_user,
                            self.keypair['private_key'])
开发者ID:SUSE-Cloud,项目名称:neutron,代码行数:7,代码来源:test_trunk.py
示例20: test_shelve_unshelve_server
def test_shelve_unshelve_server(self):
    """Shelve-offload the class server, check its image, then unshelve.

    While shelved, exactly one image named '<server name>-shelved' must be
    listed. After unshelving, the server must become ACTIVE and that image
    must be deleted.
    """
    image_features = CONF.image_feature_enabled
    if image_features.api_v2:
        glance_client = self.os_primary.image_client_v2
    elif image_features.api_v1:
        glance_client = self.os_primary.image_client
    else:
        raise lib_exc.InvalidConfiguration(
            'Either api_v1 or api_v2 must be True in '
            '[image-feature-enabled].')

    compute.shelve_server(self.client, self.server_id,
                          force_shelve_offload=True)

    def _unshelve_server():
        # Cleanup helper: only unshelve if the server is still shelved,
        # e.g. because the test failed before unshelving it itself.
        status = self.client.show_server(self.server_id)['server']['status']
        if 'SHELVED' in status:
            self.client.unshelve_server(self.server_id)

    self.addCleanup(_unshelve_server)

    server = self.client.show_server(self.server_id)['server']
    image_name = server['name'] + '-shelved'
    params = {'name': image_name}
    # The two image API versions take the filter in different forms.
    if image_features.api_v2:
        images = glance_client.list_images(params)['images']
    elif image_features.api_v1:
        images = glance_client.list_images(
            detail=True, **params)['images']
    self.assertEqual(1, len(images))
    shelved_image = images[0]
    self.assertEqual(image_name, shelved_image['name'])

    self.client.unshelve_server(self.server_id)
    waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
    glance_client.wait_for_resource_deletion(shelved_image['id'])
开发者ID:masayukig,项目名称:tempest,代码行数:32,代码来源:test_server_actions.py
注：本文中的tempest.common.waiters.wait_for_server_status函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论