• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python wait.re_search_wait函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tempest.thirdparty.boto.utils.wait.re_search_wait 函数的典型用法代码示例。如果您正苦于以下问题:Python re_search_wait 函数的具体用法是什么?re_search_wait 怎么用?有哪些使用示例?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了re_search_wait函数的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: destroy_volume_wait

    def destroy_volume_wait(cls, volume):
        """Delete a volume, trying to detach it first.

        Use just for teardown!

        :param volume: volume object to delete; this method calls its
            ``snapshots()``, ``update()``, ``detach()`` and ``delete()``
            methods and reads its ``id`` and ``status`` attributes.
        :raises exceptions.TearDownException: if waiting for the
            "available" status, logging the state or deleting the volume
            raised an exception.
        """
        exc_num = 0
        snaps = volume.snapshots()
        if snaps:
            # Report the id of every snapshot. The ids must be read from
            # each snapshot, not from the collection returned by
            # ``snapshots()``.
            LOG.critical("%s Volume has %s snapshot(s)", volume.id,
                         [snap.id for snap in snaps])

        # NOTE(afazekas): detaching/attaching not valid EC2 status
        def _volume_state():
            # Refresh the volume and, while it is not "available", request a
            # forced detach; a failing detach is only logged.
            volume.update(validate=True)
            try:
                if volume.status != "available":
                    volume.detach(force=True)
            except BaseException as exc:
                LOG.exception(exc)
                # exc_num += 1 "nonlocal" not in python2
            return volume.status

        try:
            # The status itself is not validated here.
            re_search_wait(_volume_state, "available")
            LOG.info(_volume_state())
            volume.delete()
        except BaseException as exc:
            # Teardown is best-effort: log, count, and report at the end.
            LOG.exception(exc)
            exc_num += 1
        if exc_num:
            raise exceptions.TearDownException(num=exc_num)
开发者ID:pradeepkumars,项目名称:tempest,代码行数:30,代码来源:test.py


示例2: destroy_volume_wait

    def destroy_volume_wait(cls, volume):
        """Delete a volume, trying to detach it first.

        Use just for teardown!

        :param volume: volume object to delete; this method calls its
            ``snapshots()``, ``update()``, ``detach()`` and ``delete()``
            methods and reads its ``id`` and ``status`` attributes.
        :raises exceptions.TearDownException: if waiting for the
            "available" status, logging the state or deleting the volume
            raised an exception.
        """
        exc_num = 0
        snaps = volume.snapshots()
        if snaps:
            # Report the id of every snapshot. The ids must be read from
            # each snapshot, not from the collection returned by
            # ``snapshots()``.
            LOG.critical("%s Volume has %s snapshot(s)", volume.id,
                         [snap.id for snap in snaps])

        # NOTE(afazekas): detaching/attaching not valid EC2 status
        def _volume_state():
            volume.update(validate=True)
            try:
                # NOTE(gmann): Make sure volume is attached.
                # Checking status as 'not "available"' is not enough to make
                # sure volume is attached as it can be in "error" state
                if volume.status == "in-use":
                    volume.detach(force=True)
            except BaseException:
                # Lazy logging arguments; the rendered message is unchanged.
                LOG.exception("Failed to detach volume %s", volume)
                # exc_num += 1 "nonlocal" not in python2
            return volume.status

        try:
            wait.re_search_wait(_volume_state, "available")
            # The status itself is not validated here.
            LOG.info(_volume_state())
            volume.delete()
        except BaseException:
            # Teardown is best-effort: log, count, and report at the end.
            LOG.exception("Failed to delete volume %s", volume)
            exc_num += 1
        if exc_num:
            raise exceptions.TearDownException(num=exc_num)
开发者ID:rakeshmi,项目名称:tempest,代码行数:34,代码来源:test.py


示例3: destroy_reservation

    def destroy_reservation(cls, reservation):
        """Terminate every instance of a reservation; teardown use only.

        Each instance is terminated and its state probe is handed to
        ``re_search_wait`` together with the "_GONE" marker. A failure for
        one instance is logged and counted, and the remaining instances are
        still processed.

        :param reservation: object whose ``instances`` are terminated.
        :raises exceptions.TearDownException: when handling at least one
            instance raised; ``num`` carries the number of failures.
        """
        failures = 0

        def _instance_state():
            # Probes the instance currently bound by the loop below.
            try:
                instance.update(validate=True)
            except ValueError:
                return "_GONE"
            except exception.EC2ResponseError as err:
                gone_code = cls.ec2_error_code.client.InvalidInstanceID
                # NOTE(afazekas): "InstanceNotFound" is an incorrect code,
                # but the resource must be destroyed
                if (gone_code.NotFound.match(err) or
                        err.error_code == "InstanceNotFound"):
                    return "_GONE"

            return instance.state

        for instance in reservation.instances:
            try:
                instance.terminate()
                re_search_wait(_instance_state, "_GONE")
            except BaseException as err:
                LOG.exception(err)
                failures += 1
        if failures:
            raise exceptions.TearDownException(num=failures)
开发者ID:pradeepkumars,项目名称:tempest,代码行数:29,代码来源:test.py


示例4: test_001_attach_volume

    def test_001_attach_volume(self):
        """Attach volume

        Creates a volume in the instance's placement zone, attaches it,
        detects the new device from the partition listing obtained over SSH,
        then formats it and mounts it at /vol.
        """

        if self.ctx.ssh is None:
            raise self.skipException("Booting failed")

        self._start_test()

        # NOTE(apavlov): ec2-create-volume -z ZONE -s SIZE_GB
        placement = self.ctx.instance.placement
        new_volume = self.ec2_client.create_volume(self.volume_size,
                                                   placement)
        self.addResourceCleanUp(self.destroy_volume_wait, new_volume)
        self.ctx.volume = new_volume
        # NOTE(apavlov): wait it (ec2-describe-volumes VOLUME)
        self.assertVolumeStatusWait(new_volume, "available")

        # NOTE(apavlov): ec2-attach-volume -d /dev/XXX -i INSTANCE VOLUME
        # and wait until it will be available
        # Remember the partition listing from before the attach so the new
        # device can be detected afterwards.
        self.ctx.part_lines = self.ctx.ssh.get_partitions().split('\n')
        attach_device = "/dev/" + self.volume_attach_name
        new_volume.attach(self.ctx.instance.id, attach_device)

        # NOTE(apavlov): "attaching" invalid EC2 status #1074901
        self.assertVolumeStatusWait(self._volume_state, "in-use")
        boto_wait.re_search_wait(self._volume_state, "in-use")

        boto_wait.state_wait(self._part_state, 1)
        listing_after = self.ctx.ssh.get_partitions().split('\n')
        device_name = utils.detect_new_volume(self.ctx.part_lines,
                                              listing_after)
        self.ctx.part_lines = listing_after

        self._end_test("Create and attach volume")

        # Put a filesystem on the new device and mount it at /vol.
        mkfs_cmd = ("PATH=$PATH:/usr/sbin:/usr/bin "
                    "&& sudo mkfs.ext3 /dev/" + device_name)
        self.ctx.ssh.exec_command(mkfs_cmd)
        mount_cmd = ("sudo mkdir -m 777 /vol "
                     "&& sudo mount /dev/" + device_name + " /vol")
        self.ctx.ssh.exec_command(mount_cmd)
        self.ctx.volume_ready = True

        self._check_test()
开发者ID:JioCloudCompute,项目名称:ec2-api,代码行数:40,代码来源:test_ec2_volume_benchmark.py


示例5: test_005_detach_volume

    def test_005_detach_volume(self):
        """Detach volume

        Unmounts /vol on the instance, detaches the volume stored on the
        shared context and waits for the volume status and the partition
        state to reflect the detach.
        """

        # Both preconditions are set up by earlier steps; skip without them.
        if self.ctx.ssh is None:
            raise self.skipException("Booting failed")
        if not self.ctx.volume_ready:
            raise self.skipException("Volume preparation failed")

        self._start_test()

        unmount_cmd = "sudo umount /vol"
        self.ctx.ssh.exec_command(unmount_cmd)

        self.ctx.volume.detach()

        # NOTE(apavlov): "detaching" invalid EC2 status #1074901
        detached_status = "available"
        self.assertVolumeStatusWait(self._volume_state, detached_status)
        boto_wait.re_search_wait(self._volume_state, detached_status)

        self._end_test("Detach volume")

        # -1 is the value the partition-state probe is waited on here.
        boto_wait.state_wait(self._part_state, -1)

        self._check_test()
开发者ID:JioCloudCompute,项目名称:ec2-api,代码行数:23,代码来源:test_ec2_volume_benchmark.py


示例6: test_integration_1

    def test_integration_1(self):
        # EC2 1. integration test (not strict)
        # Boots an instance from the AMI/AKI/ARI images inside a fresh
        # security group, associates a public address, then attaches and
        # detaches a volume while watching the partition listing over SSH.
        image_ami = self.ec2_client.get_image(self.images["ami"]["image_id"])
        sec_group_name = data_utils.rand_name("securitygroup-")
        group_desc = sec_group_name + " security group description "
        security_group = self.ec2_client.create_security_group(sec_group_name,
                                                               group_desc)
        self.addResourceCleanUp(self.destroy_security_group_wait,
                                security_group)
        # Open ICMP and SSH (tcp/22) from anywhere.
        self.assertTrue(
            self.ec2_client.authorize_security_group(
                sec_group_name,
                ip_protocol="icmp",
                cidr_ip="0.0.0.0/0",
                from_port=-1,
                to_port=-1))
        self.assertTrue(
            self.ec2_client.authorize_security_group(
                sec_group_name,
                ip_protocol="tcp",
                cidr_ip="0.0.0.0/0",
                from_port=22,
                to_port=22))
        reservation = image_ami.run(kernel_id=self.images["aki"]["image_id"],
                                    ramdisk_id=self.images["ari"]["image_id"],
                                    instance_type=self.instance_type,
                                    key_name=self.keypair_name,
                                    security_groups=(sec_group_name,))
        self.addResourceCleanUp(self.destroy_reservation,
                                reservation)
        volume = self.ec2_client.create_volume(1, self.zone)
        self.addResourceCleanUp(self.destroy_volume_wait, volume)
        instance = reservation.instances[0]
        LOG.info("state: %s", instance.state)
        if instance.state != "running":
            self.assertInstanceStateWait(instance, "running")

        address = self.ec2_client.allocate_address()
        rcuk_a = self.addResourceCleanUp(address.delete)
        self.assertTrue(address.associate(instance.id))

        rcuk_da = self.addResourceCleanUp(address.disassociate)
        # TODO(afazekas): ping test. dependency/permission ?

        self.assertVolumeStatusWait(volume, "available")
        # NOTE(afazekas): it may be reported available before it is available

        ssh = RemoteClient(address.public_ip,
                           CONF.compute.ssh_user,
                           pkey=self.keypair.material)
        text = data_utils.rand_name("Pattern text for console output -")
        resp = ssh.write_to_console(text)
        self.assertFalse(resp)

        def _output():
            output = instance.get_console_output()
            return output.output

        re_search_wait(_output, text)
        # Partition listing before the attach; _part_state compares against
        # this name, which is rebound after the attach below.
        part_lines = ssh.get_partitions().split('\n')
        volume.attach(instance.id, "/dev/vdh")

        def _volume_state():
            volume.update(validate=True)
            return volume.status

        self.assertVolumeStatusWait(_volume_state, "in-use")
        re_search_wait(_volume_state, "in-use")

        # NOTE(afazekas):  Different Hypervisor backends names
        # differently the devices,
        # now we just test is the partition number increased/decreased

        def _part_state():
            # Compare the number of listed lines. Ordering the two lists
            # directly would compare their text lexicographically, which
            # does not tell whether a partition was added or removed.
            current = ssh.get_partitions().split('\n')
            if len(current) > len(part_lines):
                return 'INCREASE'
            if len(current) < len(part_lines):
                return 'DECREASE'
            return 'EQUAL'

        state_wait(_part_state, 'INCREASE')
        part_lines = ssh.get_partitions().split('\n')

        # TODO(afazekas): Resource compare to the flavor settings

        volume.detach()

        self.assertVolumeStatusWait(_volume_state, "available")
        re_search_wait(_volume_state, "available")
        LOG.info("Volume %s state: %s", volume.id, volume.status)

        state_wait(_part_state, 'DECREASE')

        instance.stop()
        address.disassociate()
        self.assertAddressDissasociatedWait(address)
        # The address was disassociated explicitly, so drop that cleanup.
        self.cancelResourceCleanUp(rcuk_da)
        address.release()
        self.assertAddressReleasedWait(address)
#.........这里部分代码省略.........
开发者ID:LIS,项目名称:LIS-Tempest,代码行数:101,代码来源:test_ec2_instance_run.py


示例7: test_compute_with_volumes

    def test_compute_with_volumes(self):
        # EC2 1. integration test (not strict)
        # Boots an instance from the AMI/AKI/ARI images inside a fresh
        # security group, associates a public address, writes a marker to
        # the console over SSH and attaches a volume to the instance.
        image_ami = self.ec2_client.get_image(self.images["ami"]["image_id"])
        sec_group_name = data_utils.rand_name("securitygroup")
        group_desc = sec_group_name + " security group description "
        security_group = self.ec2_client.create_security_group(sec_group_name,
                                                               group_desc)
        self.addResourceCleanUp(self.destroy_security_group_wait,
                                security_group)
        # Open ICMP and SSH (tcp/22) from anywhere.
        self.assertTrue(
            self.ec2_client.authorize_security_group(
                sec_group_name,
                ip_protocol="icmp",
                cidr_ip="0.0.0.0/0",
                from_port=-1,
                to_port=-1))
        self.assertTrue(
            self.ec2_client.authorize_security_group(
                sec_group_name,
                ip_protocol="tcp",
                cidr_ip="0.0.0.0/0",
                from_port=22,
                to_port=22))
        reservation = image_ami.run(kernel_id=self.images["aki"]["image_id"],
                                    ramdisk_id=self.images["ari"]["image_id"],
                                    instance_type=self.instance_type,
                                    key_name=self.keypair_name,
                                    security_groups=(sec_group_name,))

        LOG.debug("Instance booted - state: %s",
                  reservation.instances[0].state)

        self.addResourceCleanUp(self.destroy_reservation,
                                reservation)
        volume = self.ec2_client.create_volume(CONF.volume.volume_size,
                                               self.zone)
        LOG.debug("Volume created - status: %s", volume.status)

        self.addResourceCleanUp(self.destroy_volume_wait, volume)
        instance = reservation.instances[0]
        if instance.state != "running":
            self.assertInstanceStateWait(instance, "running")
        LOG.debug("Instance now running - state: %s", instance.state)

        address = self.ec2_client.allocate_address()
        rcuk_a = self.addResourceCleanUp(address.delete)
        self.assertTrue(address.associate(instance.id))

        rcuk_da = self.addResourceCleanUp(address.disassociate)
        # TODO(afazekas): ping test. dependency/permission ?

        self.assertVolumeStatusWait(volume, "available")
        # NOTE(afazekas): it may be reported available before it is available

        ssh = remote_client.RemoteClient(address.public_ip,
                                         CONF.compute.ssh_user,
                                         pkey=self.keypair.material)
        text = data_utils.rand_name("Pattern text for console output")
        try:
            resp = ssh.write_to_console(text)
        except Exception:
            # Log the console output (when the feature is enabled) to help
            # diagnose the failure, then re-raise the original exception.
            if not CONF.compute_feature_enabled.console_output:
                LOG.debug('Console output not supported, cannot log')
            else:
                console_output = instance.get_console_output().output
                LOG.debug('Console output for %s\nbody=\n%s',
                          instance.id, console_output)
            raise

        self.assertFalse(resp)

        def _output():
            # Fetch the instance's current console output text.
            output = instance.get_console_output()
            return output.output

        wait.re_search_wait(_output, text)
        # Partition listing taken before the volume is attached.
        part_lines = ssh.get_partitions().split('\n')
        volume.attach(instance.id, "/dev/vdh")

        def _volume_state():
            """Return volume state realizing that 'in-use' is overloaded."""
            volume.update(validate=True)
            status = volume.status
            attached = volume.attach_data.status
            LOG.debug("Volume %s is in status: %s, attach_status: %s",
                      volume.id, status, attached)
            # Nova reports 'in-use' on 'attaching' volumes because we
            # have a single volume status, and EC2 has 2. Ensure that
            # if we aren't attached yet we return something other than
            # 'in-use'
            if status == 'in-use' and attached != 'attached':
                return 'attaching'
            else:
                return status

        wait.re_search_wait(_volume_state, "in-use")

        # NOTE(afazekas):  Different Hypervisor backends names
        # differently the devices,
        # now we just test is the partition number increased/decreased
#.........这里部分代码省略.........
开发者ID:NeetaP,项目名称:tempest,代码行数:101,代码来源:test_ec2_instance_run.py


示例8: _run_scenario

    def _run_scenario(self, scenario_func, snapshot=None):
        """Boot an instance, attach a new volume and run *scenario_func*.

        :param scenario_func: callable invoked as
            ``scenario_func(ssh, volume_id)`` once the volume is attached.
        :param snapshot: passed through to ``create_volume`` as its
            ``snapshot`` argument.
        :returns: whatever *scenario_func* returned.
        """
        # NOTE(apavlov): ec2-run-instances --key KEYPAIR IMAGE
        reservation = self.ec2_client.run_instances(
            self.image_id,
            instance_type=self.instance_type,
            key_name=self.keypair.name,
            security_groups=(self.sec_group_name,))
        self.addResourceCleanUp(self.destroy_reservation, reservation)
        server = reservation.instances[0]
        LOG.info("state: %s", server.state)
        # NOTE(apavlov): wait until it runs (ec2-describe-instances INSTANCE)
        if server.state != "running":
            self.assertInstanceStateWait(server, "running")

        # NOTE(apavlov): ec2-create-volume -z ZONE -s SIZE_GB
        placement = server.placement
        data_volume = self.ec2_client.create_volume(1, placement,
                                                    snapshot=snapshot)
        self.addResourceCleanUp(self.destroy_volume_wait, data_volume)
        # NOTE(apavlov): wait it (ec2-describe-volumes VOLUME)
        self.assertVolumeStatusWait(data_volume, "available")

        public_ip = self._prepare_public_ip(server)
        ssh = remote_client.RemoteClient(public_ip,
                                         self.ssh_user,
                                         pkey=self.keypair.material)

        # NOTE(apavlov): ec2-attach-volume -d /dev/XXX -i INSTANCE VOLUME
        # and wait until it will be available
        partitions_before = ssh.get_partitions().split('\n')
        attach_device = "/dev/" + self.volume_attach_name
        data_volume.attach(server.id, attach_device)

        def _volume_state():
            # Refresh the volume, then report its status.
            data_volume.update(validate=True)
            return data_volume.status

        self.assertVolumeStatusWait(_volume_state, "in-use")
        boto_wait.re_search_wait(_volume_state, "in-use")

        def _part_state():
            # Sign of the change in listed line count: 1, -1 or 0.
            now = len(ssh.get_partitions().split('\n'))
            before = len(partitions_before)
            return (now > before) - (now < before)

        boto_wait.state_wait(_part_state, 1)
        partitions_after = ssh.get_partitions().split('\n')
        self.volume_name = utils.detect_new_volume(partitions_before,
                                                   partitions_after)
        partitions_before = partitions_after

        self._correct_ns_if_needed(ssh)

        result = scenario_func(ssh, data_volume.id)

        # NOTE(apavlov): stop this instance(imagine that it will be used)
        server.stop()
        LOG.info("state: %s", server.state)
        if server.state != "stopped":
            self.assertInstanceStateWait(server, "stopped")

        return result
开发者ID:JioCloudCompute,项目名称:ec2-api,代码行数:61,代码来源:test_ec2_instance_mysql.py



注:本文中的tempest.thirdparty.boto.utils.wait.re_search_wait函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python wait.state_wait函数代码示例发布时间:2022-05-27
下一篇:
Python fake_http.fake_http_response函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap