• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python waiters.wait_for_server_status函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tempest.common.waiters.wait_for_server_status 函数的典型用法代码示例。如果您正苦于以下问题:Python wait_for_server_status 函数的具体用法?Python wait_for_server_status 怎么用?Python wait_for_server_status 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文一共展示了 wait_for_server_status 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更好的 Python 代码示例。

示例1: _test_live_migration

    def _test_live_migration(self, state='ACTIVE', volume_backed=False):
        """Live migrate a newly created test server to another host.

        Requires CONF.compute_feature_enabled.live_migration to be True.

        :param state: Status the server must have before and after the
                      migration. 'ACTIVE' and 'PAUSED' are the supported
                      values.
        :param volume_backed: Whether the server is volume backed. For a
                              volume-backed server *block* migration is
                              not used.
        """
        created = self.create_test_server(wait_until="ACTIVE",
                                          volume_backed=volume_backed)
        server_id = created['id']
        source_host = self.get_host_for_server(server_id)
        destination_host = self.get_host_other_than(server_id)

        if state == 'PAUSED':
            # Bring the server into the requested state before moving it.
            self.admin_servers_client.pause_server(server_id)
            waiters.wait_for_server_status(
                self.admin_servers_client, server_id, state)

        LOG.info("Live migrate from source %s to destination %s",
                 source_host, destination_host)
        self._live_migrate(server_id, destination_host, state, volume_backed)

        if not CONF.compute_feature_enabled.live_migrate_back_and_forth:
            return

        # live_migrate_back_and_forth is enabled for grenade jobs, where
        # live migration has to work in both directions, so the server is
        # moved back to the host it started on.
        LOG.info("Live migrate back to source %s", source_host)
        self._live_migrate(server_id, source_host, state, volume_backed)
开发者ID:masayukig,项目名称:tempest,代码行数:31,代码来源:test_live_migration.py


示例2: _stop_instances

 def _stop_instances(self, instances):
     """Stop every server in ``instances`` and wait for each to be SHUTOFF.

     :param instances: Iterable of server bodies; only ``'id'`` is read.
     """
     # NOTE(gfidente): two passes on purpose - every stop request is sent
     # before the first wait begins, so we do not wait for the status twice.
     for instance in instances:
         self.servers_client.stop(instance['id'])
     for instance in instances:
         waiters.wait_for_server_status(
             self.servers_client, instance['id'], 'SHUTOFF')
开发者ID:Juraci,项目名称:tempest,代码行数:7,代码来源:test_volume_boot_pattern.py


示例3: test_rebuild_server_in_error_state

    def test_rebuild_server_in_error_state(self):
        """Rebuild a server that is in error state and expect it ACTIVE.

        The server is rebuilt with the alternate image; both the initial
        rebuild response and the server shown after the rebuild must
        reference that image.
        """
        # Resetting the vm state requires admin privilege.
        self.client.reset_state(self.s1_id, state='error')

        user_client = self.non_admin_client
        rebuild_body = user_client.rebuild_server(self.s1_id,
                                                  self.image_ref_alt)
        rebuilt_server = rebuild_body['server']

        # Cleanups run in reverse registration order: the server is first
        # rebuilt with the original image, then waited on until ACTIVE.
        self.addCleanup(waiters.wait_for_server_status, user_client,
                        self.s1_id, 'ACTIVE')
        self.addCleanup(user_client.rebuild_server, self.s1_id,
                        self.image_ref)

        # The initial response must already reflect the rebuild request.
        self.assertEqual(self.s1_id, rebuilt_server['id'])
        self.assertEqual(self.image_ref_alt, rebuilt_server['image']['id'])
        self.assertEqual(self.flavor_ref, rebuilt_server['flavor']['id'])

        waiters.wait_for_server_status(user_client, rebuilt_server['id'],
                                       'ACTIVE', raise_on_error=False)

        # Re-read the server and verify the image once more after the
        # rebuild has finished.
        shown = user_client.show_server(rebuilt_server['id'])['server']
        self.assertEqual(self.image_ref_alt, shown['image']['id'])
开发者ID:varapreddy,项目名称:tempest,代码行数:26,代码来源:test_servers.py


示例4: setUp

 def setUp(self):
     """Ensure the shared server is ACTIVE before each test runs.

     If waiting for ACTIVE raises any exception, the server is rebuilt via
     ``rebuild_server`` and the class-level ``server_id`` is replaced with
     the value that call returns.
     """
     super(ServersNegativeTestMultiTenantJSON, self).setUp()
     try:
         waiters.wait_for_server_status(
             self.client, self.server_id, 'ACTIVE')
     except Exception:
         # Any failure while waiting triggers a rebuild; the new id is
         # stored on the class so that later tests pick it up.
         replacement_id = self.rebuild_server(self.server_id)
         self.__class__.server_id = replacement_id
开发者ID:NeCTAR-RC,项目名称:tempest,代码行数:7,代码来源:test_servers_negative.py


示例5: test_delete_server_while_in_pause_state

 def test_delete_server_while_in_pause_state(self):
     """Delete a server while its status is PAUSED."""
     server_id = self.create_test_server(wait_until='ACTIVE')['id']

     # Pause the server and wait until the PAUSED status is reported.
     self.client.pause_server(server_id)
     waiters.wait_for_server_status(self.client, server_id, 'PAUSED')

     # Delete it from that state and wait for it to terminate.
     self.client.delete_server(server_id)
     waiters.wait_for_server_termination(self.client, server_id)
开发者ID:WoolenWang,项目名称:tempest,代码行数:7,代码来源:test_delete_server.py


示例6: _test_live_migration

    def _test_live_migration(self, state='ACTIVE', volume_backed=False):
        """Live migrate a test server and assert it lands on the target.

        Requires CONF.compute_feature_enabled.live_migration to be True.

        :param state: Status the server must have before and after the
                      migration. 'ACTIVE' and 'PAUSED' are the supported
                      values.
        :param volume_backed: Whether the server is volume backed. For a
                              volume-backed server *block* migration is
                              not used.
        """
        created = self.create_test_server(wait_until="ACTIVE",
                                          volume_backed=volume_backed)
        server_id = created['id']
        actual_host = self._get_host_for_server(server_id)
        target_host = self._get_host_other_than(actual_host)

        if state == 'PAUSED':
            # Bring the server into the requested state before moving it.
            self.admin_servers_client.pause_server(server_id)
            waiters.wait_for_server_status(
                self.admin_servers_client, server_id, state)

        self._migrate_server_to(server_id, target_host, volume_backed)
        waiters.wait_for_server_status(self.servers_client, server_id, state)

        # Build a failure message listing every migration record that
        # belongs to this server, one record per line.
        migrations = (
            self.admin_migration_client.list_migrations()['migrations'])
        msg_head = ("Live Migration failed. Migrations list for Instance "
                    "%s: [" % server_id)
        msg_records = "".join(
            "\n%s" % record
            for record in migrations
            if record['instance_uuid'] == server_id)
        msg = msg_head + msg_records + "]"

        self.assertEqual(
            target_host, self._get_host_for_server(server_id), msg)
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:35,代码来源:test_live_migration.py


示例7: _create_server

 def _create_server(self, name):
     """Create a server on ``self.network`` and record its keypair and IPs.

     The keypair is stored in ``self.servers_keypairs``, the address used
     to reach the server in ``self.server_ips`` and its first address on
     the network in ``self.server_fixed_ips``, all keyed by server id.

     :param name: Name given to the new server.
     :returns: The server body obtained from ``show_server``.
     """
     keypair = self.create_keypair()
     sec_groups = [{'name': self.security_group['name']}]
     boot_args = dict(
         networks=[{'uuid': self.network['id']}],
         key_name=keypair['name'],
         security_groups=sec_groups,
         name=name,
     )
     net_name = self.network['name']

     created = self.create_server(**boot_args)
     waiters.wait_for_server_status(
         self.servers_client, created['id'], 'ACTIVE')
     server = self.servers_client.show_server(created['id'])['server']
     self.servers_keypairs[server['id']] = keypair

     def _first_address():
         # First address the server reports on the test network.
         return server['addresses'][net_name][0]['addr']

     needs_floating_ip = (
         config.network.public_network_id and
         not config.network.project_networks_reachable)
     if needs_floating_ip:
         # Project networks are not reachable directly: use a floating IP
         # from the public network to reach the server.
         floating_ip = self.create_floating_ip(
             server, config.network.public_network_id)
         self.floating_ips[floating_ip] = server
         self.server_ips[server['id']] = floating_ip.floating_ip_address
     else:
         self.server_ips[server['id']] = _first_address()
     self.server_fixed_ips[server['id']] = _first_address()

     self.assertTrue(self.servers_keypairs)
     return server
开发者ID:fnaval,项目名称:neutron-lbaas,代码行数:32,代码来源:base.py


示例8: _test_cold_migrate_server

    def _test_cold_migrate_server(self, revert=False):
        """Cold migrate a server, confirm or revert it, and check its host.

        Skipped when fewer than two compute nodes are configured.

        :param revert: When True the resize is reverted and the host is
                       asserted to be unchanged; otherwise the resize is
                       confirmed and the host is asserted to differ.
        """
        if CONF.compute.min_compute_nodes < 2:
            raise self.skipException(
                "Less than 2 compute nodes, skipping multinode tests.")

        server_id = self.create_test_server(wait_until="ACTIVE")['id']

        def _host_of_server():
            # Host attribute as shown to the admin client.
            shown = self.admin_servers_client.show_server(server_id)
            return shown['server']['OS-EXT-SRV-ATTR:host']

        src_host = _host_of_server()

        self.admin_servers_client.migrate_server(server_id)
        waiters.wait_for_server_status(
            self.servers_client, server_id, 'VERIFY_RESIZE')

        if not revert:
            self.servers_client.confirm_resize_server(server_id)
            assert_func = self.assertNotEqual
        else:
            self.servers_client.revert_resize_server(server_id)
            assert_func = self.assertEqual

        waiters.wait_for_server_status(
            self.servers_client, server_id, 'ACTIVE')
        dst_host = _host_of_server()
        assert_func(src_host, dst_host)
开发者ID:Juniper,项目名称:tempest,代码行数:26,代码来源:test_migrations.py


示例9: boot_instance

    def boot_instance(self):
        """Create a server and wait for its baremetal node and itself.

        On return ``self.instance`` holds the server body from
        ``show_server`` and ``self.node`` the node looked up by the
        server's id.
        """
        self.instance = self.create_server(
            key_name=self.keypair['name'])

        # NOTE(review): wait_node presumably blocks until a node is
        # associated with the instance - confirm against its definition.
        self.wait_node(self.instance['id'])
        self.node = self.get_node(instance_id=self.instance['id'])

        self.wait_power_state(self.node['uuid'], BaremetalPowerStates.POWER_ON)

        # First wait accepts either DEPLOYWAIT or ACTIVE, bounded by
        # CONF.baremetal.deploywait_timeout.
        self.wait_provisioning_state(
            self.node['uuid'],
            [BaremetalProvisionStates.DEPLOYWAIT,
             BaremetalProvisionStates.ACTIVE],
            timeout=CONF.baremetal.deploywait_timeout)

        # Second wait requires ACTIVE, bounded by
        # CONF.baremetal.active_timeout with an interval of 30.
        self.wait_provisioning_state(self.node['uuid'],
                                     BaremetalProvisionStates.ACTIVE,
                                     timeout=CONF.baremetal.active_timeout,
                                     interval=30)

        waiters.wait_for_server_status(self.servers_client,
                                       self.instance['id'], 'ACTIVE')
        # Refresh both cached objects now that the server is ACTIVE.
        self.node = self.get_node(instance_id=self.instance['id'])
        self.instance = (self.servers_client.show_server(self.instance['id'])
                         ['server'])
开发者ID:SahilTikale,项目名称:ironic,代码行数:25,代码来源:baremetal_manager.py


示例10: _rebuild_server_and_check

 def _rebuild_server_and_check(self, image_ref):
     """Rebuild the shared server with ``image_ref`` and verify the image.

     The image id is taken from the rebuild response itself; the server is
     not shown again after it becomes ACTIVE.

     :param image_ref: Id of the image to rebuild the server with.
     """
     rebuild_body = self.client.rebuild_server(self.server_id, image_ref)
     rebuilt_server = rebuild_body["server"]
     waiters.wait_for_server_status(self.client, self.server_id, "ACTIVE")

     msg = (
         "Server was not rebuilt to the original image. "
         "The original image: {0}. The current image: {1}"
     ).format(image_ref, rebuilt_server["image"]["id"])
     self.assertEqual(image_ref, rebuilt_server["image"]["id"], msg)
开发者ID:sebrandon1,项目名称:tempest,代码行数:7,代码来源:test_server_actions.py


示例11: _test_reboot_server

    def _test_reboot_server(self, reboot_type):
        """Reboot the shared server and wait until it is ACTIVE again.

        When CONF.validation.run_validation is set, the boot time is read
        over a remote client before and after the reboot and the later
        value must be greater.

        :param reboot_type: Value passed to ``reboot_server`` as ``type``.
        """
        # Read the flag once so that both guarded sections agree and the
        # second one can never run without ``server`` and ``boot_time``.
        run_validation = CONF.validation.run_validation

        def _remote_client(server):
            # A new client is constructed for each use, before and after
            # the reboot.
            return remote_client.RemoteClient(
                self.get_server_ip(server),
                self.ssh_user,
                self.password,
                self.validation_resources["keypair"]["private_key"],
                server=server,
                servers_client=self.client,
            )

        if run_validation:
            # Get the time the server was last rebooted,
            server = self.client.show_server(self.server_id)["server"]
            linux_client = _remote_client(server)
            boot_time = linux_client.get_boot_time()

            # NOTE: This sync is for avoiding the loss of pub key data
            # in a server
            linux_client.exec_command("sync")

        self.client.reboot_server(self.server_id, type=reboot_type)
        waiters.wait_for_server_status(self.client, self.server_id, "ACTIVE")

        if run_validation:
            # Log in and verify the boot time has changed
            new_boot_time = _remote_client(server).get_boot_time()
            # assertGreater reports both operands on failure, unlike
            # assertTrue on a pre-computed comparison.
            self.assertGreater(new_boot_time, boot_time,
                               "%s > %s" % (new_boot_time, boot_time))
开发者ID:sebrandon1,项目名称:tempest,代码行数:33,代码来源:test_server_actions.py


示例12: _test_live_block_migration

    def _test_live_block_migration(self, state='ACTIVE'):
        """Tests live block migration between two hosts.

        Requires CONF.compute_feature_enabled.live_migration to be True.
        The test is skipped when fewer than two compute hostnames are
        found.

        :param state: The vm_state the migrated server should be in before and
                      after the live migration. Supported values are 'ACTIVE'
                      and 'PAUSED'.
        """
        # Live block migrate an instance to another host
        if len(self._get_compute_hostnames()) < 2:
            # skipTest() raises the skip exception itself, so wrapping the
            # call in ``raise`` was dead code and has been dropped.
            self.skipTest(
                "Less than 2 compute nodes, skipping migration test.")
        server_id = self._get_an_active_server()
        actual_host = self._get_host_for_server(server_id)
        target_host = self._get_host_other_than(actual_host)

        if state == 'PAUSED':
            # Bring the server into the requested state before moving it.
            self.admin_servers_client.pause_server(server_id)
            waiters.wait_for_server_status(self.admin_servers_client,
                                           server_id, state)

        self._migrate_server_to(server_id, target_host)
        waiters.wait_for_server_status(self.servers_client, server_id, state)
        self.assertEqual(target_host, self._get_host_for_server(server_id))
开发者ID:nunogt,项目名称:tempest,代码行数:25,代码来源:test_live_migration.py


示例13: _shelve_then_unshelve_server

    def _shelve_then_unshelve_server(self, server):
        """Shelve ``server``, unshelve it and wait until it is ACTIVE.

        Shelving goes through ``compute.shelve_server`` with
        ``force_shelve_offload=True``.

        :param server: Server body; only its ``'id'`` key is read.
        """
        server_id = server['id']
        compute.shelve_server(
            self.servers_client, server_id, force_shelve_offload=True)

        self.servers_client.unshelve_server(server_id)
        waiters.wait_for_server_status(
            self.servers_client, server_id, 'ACTIVE')
开发者ID:masayukig,项目名称:tempest,代码行数:7,代码来源:test_shelve_instance.py


示例14: _test_reboot_server

    def _test_reboot_server(self, reboot_type):
        """Reboot the shared server and wait until it is ACTIVE again.

        When CONF.validation.run_validation is set, the boot time is read
        over a remote client before and after the reboot and the later
        value must be greater.

        :param reboot_type: Value passed to ``reboot_server`` as ``type``.
        """
        # Read the flag once so that both guarded sections agree and the
        # second one can never run without ``server`` and ``boot_time``.
        run_validation = CONF.validation.run_validation

        def _remote_client(server):
            # A new client is constructed for each use, before and after
            # the reboot.
            return remote_client.RemoteClient(
                self.get_server_ip(server),
                self.ssh_user,
                self.password,
                self.validation_resources['keypair']['private_key'],
                server=server,
                servers_client=self.client)

        if run_validation:
            # Get the time the server was last rebooted,
            server = self.client.show_server(self.server_id)['server']
            boot_time = _remote_client(server).get_boot_time()

        self.client.reboot_server(self.server_id, type=reboot_type)
        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')

        if run_validation:
            # Log in and verify the boot time has changed
            new_boot_time = _remote_client(server).get_boot_time()
            # assertGreater reports both operands on failure, unlike
            # assertTrue on a pre-computed comparison.
            self.assertGreater(new_boot_time, boot_time,
                               '%s > %s' % (new_boot_time, boot_time))
开发者ID:devfaz,项目名称:tempest,代码行数:28,代码来源:test_server_actions.py


示例15: test_associate_already_associated_floating_ip

    def test_associate_already_associated_floating_ip(self):
        """Associate an already associated floating IP with a second server.

        Positive test: the association is expected to move to the new
        server, so disassociating the floating IP from the old server must
        fail afterwards.
        """
        # Create server so as to use for Multiple association
        new_name = data_utils.rand_name('floating_server')
        body = self.create_test_server(name=new_name)
        self.new_server_id = body['id']
        # Register the delete before waiting, so the cleanup is in place
        # even when the wait for ACTIVE below raises.
        self.addCleanup(self.servers_client.delete_server, self.new_server_id)
        waiters.wait_for_server_status(self.servers_client,
                                       self.new_server_id, 'ACTIVE')

        # Associating floating IP for the first time
        self.client.associate_floating_ip_to_server(
            self.floating_ip,
            self.server_id)
        # Associating floating IP for the second time
        self.client.associate_floating_ip_to_server(
            self.floating_ip,
            self.new_server_id)

        self.addCleanup(self.client.disassociate_floating_ip_from_server,
                        self.floating_ip,
                        self.new_server_id)

        # Make sure no longer associated with old server
        self.assertRaises((lib_exc.NotFound,
                           lib_exc.UnprocessableEntity,
                           lib_exc.Conflict),
                          self.client.disassociate_floating_ip_from_server,
                          self.floating_ip, self.server_id)
开发者ID:Hybrid-Cloud,项目名称:tempest,代码行数:30,代码来源:test_floating_ips_actions.py


示例16: _test_reboot_server

    def _test_reboot_server(self, reboot_type):
        """Reboot the shared server and wait until it is ACTIVE again.

        When CONF.validation.run_validation is set, the boot time is read
        over a remote client before and after the reboot and the later
        value must be greater.

        :param reboot_type: Value passed to ``reboot_server`` as ``type``.
        """
        def _connect(server, resources):
            # Builds a new remote client for the given server body using
            # the class validation resources.
            return remote_client.RemoteClient(
                self.get_server_ip(server, resources),
                self.ssh_user,
                self.password,
                resources['keypair']['private_key'],
                server=server,
                servers_client=self.client)

        if CONF.validation.run_validation:
            validation_resources = self.get_class_validation_resources(
                self.os_primary)
            # Record when the server was last booted.
            server = self.client.show_server(self.server_id)['server']
            ssh = _connect(server, validation_resources)
            boot_time = ssh.get_boot_time()

            # NOTE: This sync is for avoiding the loss of pub key data
            # in a server
            ssh.exec_command("sync")

        self.client.reboot_server(self.server_id, type=reboot_type)
        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')

        if CONF.validation.run_validation:
            # Reconnect and check that the boot time moved forward.
            ssh = _connect(server, validation_resources)
            new_boot_time = ssh.get_boot_time()
            detail = '%s > %s' % (new_boot_time, boot_time)
            self.assertGreater(new_boot_time, boot_time, detail)
开发者ID:masayukig,项目名称:tempest,代码行数:34,代码来源:test_server_actions.py


示例17: test_update_server_name_in_stop_state

 def test_update_server_name_in_stop_state(self):
     """Update the name of a stopped server.

     The server name should be changed to the provided value, and the
     updated server body must not contain a 'progress' key.
     """
     server_id = self.create_test_server(wait_until='ACTIVE')['id']

     self.client.stop(server_id)
     waiters.wait_for_server_status(self.client, server_id, 'SHUTOFF')

     # NOTE(review): the second argument is presumably the status the
     # helper waits for after the update - confirm in _update_server_name.
     updated_server = self._update_server_name(server_id, 'SHUTOFF')
     self.assertNotIn('progress', updated_server)
开发者ID:nunogt,项目名称:tempest,代码行数:7,代码来源:test_servers.py


示例18: test_resize_server_revert_with_volume_attached

    def test_resize_server_revert_with_volume_attached(self):
        """Resize a server with a volume attached, then revert the resize.

        After the revert the instance and its volume attachment should be
        back on the original compute host; the test checks the flavor
        (when reported) and that exactly the attached volume is listed.
        """
        def _wait_for(status):
            waiters.wait_for_server_status(
                self.client, self.server_id, status)

        # Attach a blank volume to the server created in setUp.
        volume = self.create_volume()
        server = self.client.show_server(self.server_id)['server']
        self.attach_volume(server, volume)

        # Resize with the volume attached.
        self.client.resize_server(self.server_id, self.flavor_ref_alt)
        # Explicitly delete the server to get a new one for later
        # tests. Avoids resize down race issues.
        self.addCleanup(self.delete_server, self.server_id)
        _wait_for('VERIFY_RESIZE')

        # Revert, which should move the instance and its volume attachment
        # back to the original source compute host.
        self.client.revert_resize_server(self.server_id)
        _wait_for('ACTIVE')

        # Make sure everything still looks OK.
        server = self.client.show_server(self.server_id)['server']
        # The flavor id is not returned in the server response after
        # microversion 2.46 so handle that gracefully.
        flavor_id = server['flavor'].get('id')
        if flavor_id:
            self.assertEqual(self.flavor_ref, flavor_id)

        attachments = server['os-extended-volumes:volumes_attached']
        self.assertEqual(1, len(attachments))
        self.assertEqual(volume['id'], attachments[0]['id'])
开发者ID:masayukig,项目名称:tempest,代码行数:30,代码来源:test_server_actions.py


示例19: _wait_for_server

 def _wait_for_server(self, server):
     """Wait for ``server`` to be ACTIVE, then run ``check_connectivity``.

     :param server: Mapping with a ``'server'`` entry (its ``'id'`` is
                    read) and a ``'fip'`` entry (its
                    ``'floating_ip_address'`` is read).
     """
     waiters.wait_for_server_status(
         self.manager.servers_client,
         server['server']['id'],
         constants.SERVER_STATUS_ACTIVE)

     # Connectivity is checked against the floating IP with the configured
     # image ssh user and this test's private key.
     fip_address = server['fip']['floating_ip_address']
     self.check_connectivity(
         fip_address,
         CONF.validation.image_ssh_user,
         self.keypair['private_key'])
开发者ID:SUSE-Cloud,项目名称:neutron,代码行数:7,代码来源:test_trunk.py


示例20: test_shelve_unshelve_server

    def test_shelve_unshelve_server(self):
        """Shelve the shared server, check its image, then unshelve it.

        After shelving, exactly one image named '<server name>-shelved' is
        expected. After unshelving, the server must become ACTIVE and that
        image is waited on until it is deleted.
        """
        image_features = CONF.image_feature_enabled
        if image_features.api_v2:
            glance_client = self.os_primary.image_client_v2
        elif image_features.api_v1:
            glance_client = self.os_primary.image_client
        else:
            raise lib_exc.InvalidConfiguration(
                'Either api_v1 or api_v2 must be True in '
                '[image-feature-enabled].')

        compute.shelve_server(
            self.client, self.server_id, force_shelve_offload=True)

        def _unshelve_server():
            # Cleanup: unshelve only if the server is still shelved.
            current = self.client.show_server(self.server_id)['server']
            if 'SHELVED' in current['status']:
                self.client.unshelve_server(self.server_id)
        self.addCleanup(_unshelve_server)

        server = self.client.show_server(self.server_id)['server']
        image_name = server['name'] + '-shelved'
        # The two image API versions take the name filter differently.
        if image_features.api_v2:
            listing = glance_client.list_images({'name': image_name})
            images = listing['images']
        elif image_features.api_v1:
            listing = glance_client.list_images(detail=True, name=image_name)
            images = listing['images']
        self.assertEqual(1, len(images))
        shelved_image = images[0]
        self.assertEqual(image_name, shelved_image['name'])

        self.client.unshelve_server(self.server_id)
        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
        glance_client.wait_for_resource_deletion(shelved_image['id'])
开发者ID:masayukig,项目名称:tempest,代码行数:32,代码来源:test_server_actions.py



注:本文中的tempest.common.waiters.wait_for_server_status函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python waiters.wait_for_server_termination函数代码示例发布时间:2022-05-27
下一篇:
Python waiters.wait_for_image_status函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap