• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python ec2utils.ec2_inst_id_to_uuid函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中nova.api.ec2.ec2utils.ec2_inst_id_to_uuid函数的典型用法代码示例。如果您正苦于以下问题:Python ec2_inst_id_to_uuid函数的具体用法?Python ec2_inst_id_to_uuid怎么用?Python ec2_inst_id_to_uuid使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了ec2_inst_id_to_uuid函数的6个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_run_with_snapshot

    def test_run_with_snapshot(self):
        # Runs an instance whose block device mapping is built from two
        # snapshots of a single volume, checks that both resulting volumes
        # are attached at the requested devices, then terminates the instance
        # and checks the /dev/vdb volume is detached but not deleted.
        zone = "zone1:host1"
        base_volume = self.cloud.create_volume(self.context, size=1, availability_zone=zone)
        base_volume_id = base_volume["volumeId"]

        # Take the two snapshots one after the other, converting each EC2
        # snapshot id to its uuid right after creation.
        snapshot_uuids = []
        for snap_name in ("snap-1", "snap-2"):
            snapshot = self.cloud.create_snapshot(
                self.context,
                base_volume_id,
                name=snap_name,
                description="test snap of vol %s" % base_volume_id,
            )
            snapshot_uuids.append(ec2utils.ec2_snap_id_to_uuid(snapshot["snapshotId"]))
        snap1_uuid, snap2_uuid = snapshot_uuids

        first_disk = {"device_name": "/dev/vdb", "snapshot_id": snap1_uuid, "delete_on_termination": False}
        second_disk = {"device_name": "/dev/vdc", "snapshot_id": snap2_uuid, "delete_on_termination": True}
        ec2_instance_id = self._run_instance(
            image_id="ami-1",
            instance_type=CONF.default_instance_type,
            max_count=1,
            block_device_mapping=[first_disk, second_disk],
        )
        instance_uuid = ec2utils.ec2_inst_id_to_uuid(self.context, ec2_instance_id)

        # Only the volumes that belong to the new instance are of interest.
        attached = [
            candidate
            for candidate in self.volume_api.get_all(self.context)
            if candidate["instance_uuid"] == instance_uuid
        ]
        self.assertEqual(len(attached), 2)

        vol1_id = vol2_id = None
        for volume in attached:
            origin = volume["snapshot_id"]
            if origin == snap1_uuid:
                vol1_id, device = volume["id"], "/dev/vdb"
            elif origin == snap2_uuid:
                vol2_id, device = volume["id"], "/dev/vdc"
            else:
                # A volume that came from neither snapshot is unexpected.
                self.fail()

            self._assert_volume_attached(volume, instance_uuid, device)

        # Both snapshot-backed volumes must have been seen.
        self.assertTrue(vol1_id)
        self.assertTrue(vol2_id)

        self.cloud.terminate_instances(self.context, [ec2_instance_id])

        survivor = self.volume_api.get(context.get_admin_context(read_deleted="no"), vol1_id)
        self._assert_volume_detached(survivor)
        self.assertFalse(survivor["deleted"])
开发者ID:comstud,项目名称:nova,代码行数:57,代码来源:test_cinder_cloud.py


示例2: test_run_with_snapshot

    def test_run_with_snapshot(self):
        """Run an instance from two snapshots and check its volumes.

        Two snapshots of one volume are mapped to /dev/vdb and /dev/vdc.
        Both resulting volumes must be attached to the instance; after the
        instance is terminated, the /dev/vdb volume must be detached and
        not deleted.
        """
        source = self.cloud.create_volume(self.context,
                                          size=1,
                                          availability_zone='zone1:host1')
        source_id = source['volumeId']

        def snapshot_uuid(label):
            # Snapshot the source volume and return the snapshot's uuid.
            snap = self.cloud.create_snapshot(
                self.context,
                source_id,
                name=label,
                description='test snap of vol %s' % source_id)
            return ec2utils.ec2_snap_id_to_uuid(snap['snapshotId'])

        snap1_uuid = snapshot_uuid('snap-1')
        snap2_uuid = snapshot_uuid('snap-2')

        mappings = [
            {'device_name': '/dev/vdb',
             'snapshot_id': snap1_uuid,
             'delete_on_termination': False},
            {'device_name': '/dev/vdc',
             'snapshot_id': snap2_uuid,
             'delete_on_termination': True},
        ]
        ec2_instance_id = self._run_instance(
            image_id='ami-1',
            instance_type=CONF.default_instance_type,
            max_count=1,
            block_device_mapping=mappings)
        instance_uuid = ec2utils.ec2_inst_id_to_uuid(self.context,
                                                     ec2_instance_id)

        # Keep only the volumes owned by the instance just started.
        owned = [item for item in self.volume_api.get_all(self.context)
                 if item['instance_uuid'] == instance_uuid]

        self.assertEqual(len(owned), 2)

        vol1_id = vol2_id = None
        for item in owned:
            parent_snapshot = item['snapshot_id']
            if parent_snapshot == snap1_uuid:
                vol1_id, device = item['id'], '/dev/vdb'
            elif parent_snapshot == snap2_uuid:
                vol2_id, device = item['id'], '/dev/vdc'
            else:
                # Volume was not created from either snapshot.
                self.fail()

            self._assert_volume_attached(item, instance_uuid, device)

        # Just make sure we found them
        self.assertTrue(vol1_id)
        self.assertTrue(vol2_id)

        self.cloud.terminate_instances(self.context, [ec2_instance_id])

        remaining = self.volume_api.get(
            context.get_admin_context(read_deleted="no"), vol1_id)
        self._assert_volume_detached(remaining)
        self.assertFalse(remaining['deleted'])
开发者ID:Karamax,项目名称:nova,代码行数:64,代码来源:test_cinder_cloud.py


示例3: test_stop_with_attached_volume

    def test_stop_with_attached_volume(self):
        """Check volume attach state through attach, detach, stop and start.

        The instance is run with the first volume mapped to /dev/sdb; the
        second volume is attached afterwards at /dev/sdc and the first one
        is detached before the instance is stopped and started again.
        """
        zone = 'zone1:host1'
        created = [self.cloud.create_volume(self.context,
                                            size=1,
                                            availability_zone=zone)
                   for _ in range(2)]
        vol1_uuid, vol2_uuid = [ec2utils.ec2_vol_id_to_uuid(c['volumeId'])
                                for c in created]

        # enforce periodic tasks run in short time to avoid wait for 60s.
        self._restart_compute_service(periodic_interval=0.3)
        boot_mapping = {'device_name': '/dev/sdb',
                        'volume_id': vol1_uuid,
                        'delete_on_termination': True}
        ec2_instance_id = self._run_instance(
            image_id='ami-1',
            instance_type=CONF.default_instance_type,
            max_count=1,
            block_device_mapping=[boot_mapping])
        instance_uuid = ec2utils.ec2_inst_id_to_uuid(self.context,
                                                     ec2_instance_id)

        def instance_volumes():
            # All volumes currently recorded against the test instance.
            return [v for v in self.volume_api.get_all(self.context)
                    if v['instance_uuid'] == instance_uuid]

        owned = instance_volumes()
        self.assertEqual(len(owned), 1)
        for entry in owned:
            self.assertEqual(entry['id'], vol1_uuid)
            self._assert_volume_attached(entry, instance_uuid, '/dev/sdb')
        self._assert_volume_detached(
            self.volume_api.get(self.context, vol2_uuid))

        instance = db.instance_get_by_uuid(self.context, instance_uuid)
        self.cloud.compute_api.attach_volume(self.context,
                                             instance,
                                             volume_id=vol2_uuid,
                                             device='/dev/sdc')

        first = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_attached(first, instance_uuid, '/dev/sdb')

        second = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(second, instance_uuid, '/dev/sdc')

        self.cloud.compute_api.detach_volume(self.context,
                                             volume_id=vol1_uuid)

        first = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_detached(first)

        stopped = self.cloud.stop_instances(self.context, [ec2_instance_id])
        self.assertTrue(stopped)

        second = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(second, instance_uuid, '/dev/sdc')

        self.cloud.start_instances(self.context, [ec2_instance_id])
        self.assertEqual(len(instance_volumes()), 1)

        # `first` is still the record fetched before the stop/start cycle;
        # it is checked as-is and then once more after being re-read.
        self._assert_volume_detached(first)

        first = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_detached(first)

        self.cloud.terminate_instances(self.context, [ec2_instance_id])
开发者ID:Karamax,项目名称:nova,代码行数:69,代码来源:test_cinder_cloud.py


示例4: test_stop_start_with_volume

    def test_stop_start_with_volume(self):
        """Make sure run instance with block device mapping works.

        Runs an instance with two pre-created volumes mapped to /dev/sdb
        and /dev/sdc, checks both stay attached across a stop/start cycle,
        and checks the second volume is not marked deleted once the
        instance has been terminated.
        """
        availability_zone = 'zone1:host1'
        vol1 = self.cloud.create_volume(self.context,
                                        size=1,
                                        availability_zone=availability_zone)
        vol2 = self.cloud.create_volume(self.context,
                                        size=1,
                                        availability_zone=availability_zone)
        vol1_uuid = ec2utils.ec2_vol_id_to_uuid(vol1['volumeId'])
        vol2_uuid = ec2utils.ec2_vol_id_to_uuid(vol2['volumeId'])
        # Ids are compared as strings throughout, as in the checks below.
        expected_ids = (str(vol1_uuid), str(vol2_uuid))
        # enforce periodic tasks run in short time to avoid wait for 60s.
        self._restart_compute_service(periodic_interval=0.3)

        kwargs = {'image_id': 'ami-1',
                  'instance_type': CONF.default_instance_type,
                  'max_count': 1,
                  'block_device_mapping': [{'device_name': '/dev/sdb',
                                            'volume_id': vol1_uuid,
                                            'delete_on_termination': False},
                                           {'device_name': '/dev/sdc',
                                            'volume_id': vol2_uuid,
                                            'delete_on_termination': True},
                                           ]}
        ec2_instance_id = self._run_instance(**kwargs)
        instance_uuid = ec2utils.ec2_inst_id_to_uuid(self.context,
                                                     ec2_instance_id)
        vols = self.volume_api.get_all(self.context)
        vols = [v for v in vols if v['instance_uuid'] == instance_uuid]

        self.assertEqual(len(vols), 2)
        for vol in vols:
            vol_id = str(vol['id'])
            # assertIn reports the unexpected id when the check fails.
            self.assertIn(vol_id, expected_ids)
            if vol_id == str(vol1_uuid):
                self.volume_api.attach(self.context, vol,
                                       instance_uuid, '/dev/sdb')
            elif vol_id == str(vol2_uuid):
                self.volume_api.attach(self.context, vol,
                                       instance_uuid, '/dev/sdc')

        vol = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_attached(vol, instance_uuid, '/dev/sdb')

        vol = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(vol, instance_uuid, '/dev/sdc')

        result = self.cloud.stop_instances(self.context, [ec2_instance_id])
        self.assertTrue(result)

        # Both volumes must still be attached while the instance is stopped.
        vol = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_attached(vol, instance_uuid, '/dev/sdb')

        vol = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(vol, instance_uuid, '/dev/sdc')

        self.cloud.start_instances(self.context, [ec2_instance_id])
        vols = self.volume_api.get_all(self.context)
        vols = [v for v in vols if v['instance_uuid'] == instance_uuid]
        self.assertEqual(len(vols), 2)
        for vol in vols:
            self.assertIn(str(vol['id']), expected_ids)
            self.assertIn(vol['mountpoint'], ('/dev/sdb', '/dev/sdc'))
            self.assertEqual(vol['instance_uuid'], instance_uuid)
            self.assertEqual(vol['status'], "in-use")
            self.assertEqual(vol['attach_status'], "attached")

        # Here we puke...
        self.cloud.terminate_instances(self.context, [ec2_instance_id])

        admin_ctxt = context.get_admin_context(read_deleted="no")
        vol = self.volume_api.get(admin_ctxt, vol2_uuid)
        self.assertFalse(vol['deleted'])
        # vol1 is removed explicitly through the cloud API before the
        # compute service is restarted with its default settings.
        self.cloud.delete_volume(self.context, vol1['volumeId'])
        self._restart_compute_service()
开发者ID:Karamax,项目名称:nova,代码行数:80,代码来源:test_cinder_cloud.py


示例5: test_stop_with_attached_volume

    def test_stop_with_attached_volume(self):
        # Checks volume attach state through attach, detach, stop and start:
        # the instance is run with the first volume at /dev/sdb, the second
        # volume is attached afterwards at /dev/sdc, and the first one is
        # detached before the instance is stopped and started again.

        zone = "zone1:host1"
        created = [
            self.cloud.create_volume(self.context, size=1, availability_zone=zone)
            for _ in range(2)
        ]
        vol1_uuid, vol2_uuid = [ec2utils.ec2_vol_id_to_uuid(c["volumeId"]) for c in created]

        # enforce periodic tasks run in short time to avoid wait for 60s.
        self._restart_compute_service(periodic_interval_max=0.3)
        boot_mapping = {"device_name": "/dev/sdb", "volume_id": vol1_uuid, "delete_on_termination": True}
        ec2_instance_id = self._run_instance(
            image_id="ami-1",
            instance_type=CONF.default_instance_type,
            max_count=1,
            block_device_mapping=[boot_mapping],
        )
        instance_uuid = ec2utils.ec2_inst_id_to_uuid(self.context, ec2_instance_id)

        def instance_volumes():
            # All volumes currently recorded against the test instance.
            return [
                v
                for v in self.volume_api.get_all(self.context)
                if v["instance_uuid"] == instance_uuid
            ]

        owned = instance_volumes()
        self.assertEqual(len(owned), 1)
        for entry in owned:
            self.assertEqual(entry["id"], vol1_uuid)
            self._assert_volume_attached(entry, instance_uuid, "/dev/sdb")
        self._assert_volume_detached(self.volume_api.get(self.context, vol2_uuid))

        instance = db.instance_get_by_uuid(self.context, instance_uuid)
        self.cloud.compute_api.attach_volume(self.context, instance, volume_id=vol2_uuid, device="/dev/sdc")

        first = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_attached(first, instance_uuid, "/dev/sdb")

        second = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(second, instance_uuid, "/dev/sdc")

        # Detach takes the volume record that was just fetched, not the uuid.
        self.cloud.compute_api.detach_volume(self.context, instance, first)

        first = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_detached(first)

        stopped = self.cloud.stop_instances(self.context, [ec2_instance_id])
        self.assertTrue(stopped)

        second = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(second, instance_uuid, "/dev/sdc")

        self.cloud.start_instances(self.context, [ec2_instance_id])
        self.assertEqual(len(instance_volumes()), 1)

        # `first` is still the record fetched before the stop/start cycle;
        # it is checked as-is and then once more after being re-read.
        self._assert_volume_detached(first)

        first = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_detached(first)

        self.cloud.terminate_instances(self.context, [ec2_instance_id])
开发者ID:comstud,项目名称:nova,代码行数:62,代码来源:test_cinder_cloud.py


示例6: test_stop_start_with_volume

    def test_stop_start_with_volume(self):
        # Make sure run instance with block device mapping works.
        #
        # Runs an instance with two pre-created volumes mapped to /dev/sdb
        # and /dev/sdc, checks both stay attached across a stop/start cycle,
        # and checks the second volume is not marked deleted once the
        # instance has been terminated.
        availability_zone = "zone1:host1"
        vol1 = self.cloud.create_volume(self.context, size=1, availability_zone=availability_zone)
        vol2 = self.cloud.create_volume(self.context, size=1, availability_zone=availability_zone)
        vol1_uuid = ec2utils.ec2_vol_id_to_uuid(vol1["volumeId"])
        vol2_uuid = ec2utils.ec2_vol_id_to_uuid(vol2["volumeId"])
        # Ids are compared as strings throughout, as in the checks below.
        expected_ids = (str(vol1_uuid), str(vol2_uuid))
        # enforce periodic tasks run in short time to avoid wait for 60s.
        self._restart_compute_service(periodic_interval_max=0.3)

        kwargs = {
            "image_id": "ami-1",
            "instance_type": CONF.default_instance_type,
            "max_count": 1,
            "block_device_mapping": [
                {"device_name": "/dev/sdb", "volume_id": vol1_uuid, "delete_on_termination": False},
                {"device_name": "/dev/sdc", "volume_id": vol2_uuid, "delete_on_termination": True},
            ],
        }
        ec2_instance_id = self._run_instance(**kwargs)
        instance_uuid = ec2utils.ec2_inst_id_to_uuid(self.context, ec2_instance_id)
        vols = self.volume_api.get_all(self.context)
        vols = [v for v in vols if v["instance_uuid"] == instance_uuid]

        self.assertEqual(len(vols), 2)
        for vol in vols:
            vol_id = str(vol["id"])
            # assertIn reports the unexpected id when the check fails.
            self.assertIn(vol_id, expected_ids)
            if vol_id == str(vol1_uuid):
                self.volume_api.attach(self.context, vol, instance_uuid, "/dev/sdb")
            elif vol_id == str(vol2_uuid):
                self.volume_api.attach(self.context, vol, instance_uuid, "/dev/sdc")

        vol = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_attached(vol, instance_uuid, "/dev/sdb")

        vol = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(vol, instance_uuid, "/dev/sdc")

        result = self.cloud.stop_instances(self.context, [ec2_instance_id])
        self.assertTrue(result)

        # Both volumes must still be attached while the instance is stopped.
        vol = self.volume_api.get(self.context, vol1_uuid)
        self._assert_volume_attached(vol, instance_uuid, "/dev/sdb")

        vol = self.volume_api.get(self.context, vol2_uuid)
        self._assert_volume_attached(vol, instance_uuid, "/dev/sdc")

        self.cloud.start_instances(self.context, [ec2_instance_id])
        vols = self.volume_api.get_all(self.context)
        vols = [v for v in vols if v["instance_uuid"] == instance_uuid]
        self.assertEqual(len(vols), 2)
        for vol in vols:
            self.assertIn(str(vol["id"]), expected_ids)
            self.assertIn(vol["mountpoint"], ("/dev/sdb", "/dev/sdc"))
            self.assertEqual(vol["instance_uuid"], instance_uuid)
            self.assertEqual(vol["status"], "in-use")
            self.assertEqual(vol["attach_status"], "attached")

        # Here we puke...
        self.cloud.terminate_instances(self.context, [ec2_instance_id])

        admin_ctxt = context.get_admin_context(read_deleted="no")
        vol = self.volume_api.get(admin_ctxt, vol2_uuid)
        self.assertFalse(vol["deleted"])
        # vol1 is removed explicitly through the cloud API before the
        # compute service is restarted with its default settings.
        self.cloud.delete_volume(self.context, vol1["volumeId"])
        self._restart_compute_service()
开发者ID:comstud,项目名称:nova,代码行数:69,代码来源:test_cinder_cloud.py



注:本文中的nova.api.ec2.ec2utils.ec2_inst_id_to_uuid函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python ec2utils.ec2_vol_id_to_uuid函数代码示例发布时间:2022-05-27
下一篇:
Python ec2utils.ec2_id_to_id函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap