• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python common.list_snapshots函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中marvin.lib.common.list_snapshots函数的典型用法代码示例。如果您正苦于以下问题:Python list_snapshots函数的具体用法?Python list_snapshots怎么用?Python list_snapshots使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了list_snapshots函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_04_snapshot_limit

    def test_04_snapshot_limit(self):
        """Test snapshot limit in snapshot policies
        """
        # Validate the following
        # 1. Perform hourly recurring snapshot on the root disk of VM and keep
        #    the maxsnapshots as 1
        # 2. listSnapshots should list the snapshot that was created
        #    snapshot folder in secondary storage should contain only one
        #    snapshot image(/secondary/snapshots/$accountid/$volumeid/)

        # Get the Root disk of VM
        volumes = list_volumes(self.apiclient, virtualmachineid=self.virtual_machine.id, type="ROOT", listall=True)
        self.assertEqual(isinstance(volumes, list), True, "Check list response returns a valid list")
        volume = volumes[0]

        # Create a snapshot policy
        recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume.id, self.services["recurring_snapshot"])
        self.cleanup.append(recurring_snapshot)

        snapshot_policy = list_snapshot_policy(self.apiclient, id=recurring_snapshot.id, volumeid=volume.id)
        self.assertEqual(isinstance(snapshot_policy, list), True, "Check list response returns a valid list")

        self.assertNotEqual(snapshot_policy, None, "Check if result exists in list item call")

        self.assertEqual(
            snapshot_policy[0].id, recurring_snapshot.id, "Check recurring snapshot id in list resources call"
        )
        self.assertEqual(
            snapshot_policy[0].maxsnaps,
            self.services["recurring_snapshot"]["maxsnaps"],
            "Check interval type in list resources call",
        )
        # Sleep for (maxsnaps+1) hours to verify
        # only maxsnaps snapshots are retained
        time.sleep((int(self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600)

        # Verify the snapshot was created or not
        snapshots = list_snapshots(
            self.apiclient,
            volumeid=volume.id,
            intervaltype=self.services["recurring_snapshot"]["intervaltype"],
            snapshottype="RECURRING",
            listall=True,
        )

        self.assertEqual(isinstance(snapshots, list), True, "Check list response returns a valid list")
        self.assertEqual(
            len(snapshots),
            self.services["recurring_snapshot"]["maxsnaps"],
            "Check maximum number of recurring snapshots retained",
        )
        snapshot = snapshots[0]
        # Sleep to ensure that snapshot is reflected in sec storage
        time.sleep(self.services["sleep"])
        self.assertTrue(is_snapshot_on_nfs(self.apiclient, self.dbclient, self.config, self.zone.id, snapshot.id))
        return
开发者ID:MissionCriticalCloudOldRepos,项目名称:cosmic-core,代码行数:56,代码来源:test_snapshot_limits.py


示例2: test_01_test_vm_volume_snapshot

    def test_01_test_vm_volume_snapshot(self):
        """
        @Desc: Test that Volume snapshot for root volume is allowed
        when VM snapshot is present for the VM
        @Steps:
        1: Deploy a VM and create a VM snapshot for VM
        2: Try to create snapshot for the root volume of the VM,
        It should not fail
        """

        # Creating Virtual Machine
        virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
        )

        VmSnapshot.create(
            self.apiclient,
            virtual_machine.id,
        )

        volumes = Volume.list(self.apiclient,
                              virtualmachineid=virtual_machine.id,
                              type="ROOT",
                              listall=True)

        self.assertEqual(validateList(volumes)[0], PASS,
                "Failed to get root volume of the VM")

        snapshot = Snapshot.create(
            self.apiclient,
            volumes[0].id,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Snapshot created: ID - %s" % snapshot.id)
        snapshots = list_snapshots(
            self.apiclient,
            id=snapshot.id
        )
        self.assertEqual(
            validateList(snapshots)[0],
            PASS,
            "Invalid snapshot list"
        )
        self.assertEqual(
            snapshots[0].id,
            snapshot.id,
            "Check resource id in list resources call"
        )
        return
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:54,代码来源:test_vm_snapshots.py


示例3: test_02_snapshot_data_disk

    def test_02_snapshot_data_disk(self):
        """Test Snapshot Data Disk
        """
        if self.hypervisor.lower() in ['hyperv']:
            self.skipTest("Snapshots feature is not supported on Hyper-V")

        volume = list_volumes(
            self.apiclient,
            virtualmachineid=self.virtual_machine_with_disk.id,
            type='DATADISK',
            listall=True
        )
        self.assertEqual(
            isinstance(volume, list),
            True,
            "Check list response returns a valid list"
        )

        self.debug("Creating a Snapshot from data volume: %s" % volume[0].id)
        snapshot = Snapshot.create(
            self.apiclient,
            volume[0].id,
            account=self.account.name,
            domainid=self.account.domainid
        )
        snapshots = list_snapshots(
            self.apiclient,
            id=snapshot.id
        )
        self.assertEqual(
            isinstance(snapshots, list),
            True,
            "Check list response returns a valid list"
        )
        self.assertNotEqual(
            snapshots,
            None,
            "Check if result exists in list item call"
        )
        self.assertEqual(
            snapshots[0].id,
            snapshot.id,
            "Check resource id in list resources call"
        )
        self.assertTrue(
            is_snapshot_on_nfs(
                self.apiclient,
                self.dbclient,
                self.config,
                self.zone.id,
                snapshot.id))
        return
开发者ID:vaddanak,项目名称:challenges,代码行数:52,代码来源:test_snapshots.py


示例4: test_13_move_across_subdomain_vm_snapshot

 def test_13_move_across_subdomain_vm_snapshot(self):
     """Test as domain admin, stop a VM from subdomain1 and attempt to move it to subdomain2
     """
     # Validate the following:
     # 1. deploy VM in sub subdomain1 with snapshot.
     # 3. assignVirtualMachine to subdomain2
     self.create_vm(self.sdomain_account_user1['account'], self.sdomain_account_user1['domain'], snapshot=True)
     self.virtual_machine.assign_virtual_machine(self.apiclient, self.sdomain_account_user2['account'].name ,self.sdomain_account_user2['domain'].id)
     snapshots = list_snapshots(self.apiclient,
                                id=self.snapshot.id)
     self.assertEqual(snapshots,
                      None,
                      "Snapshots stil present for a vm in domain")
开发者ID:K0zka,项目名称:cloudstack,代码行数:13,代码来源:test_assign_vm.py


示例5: test_05_snapshots_per_project

    def test_05_snapshots_per_project(self):
        """Test Snapshot limit per project
        """
        # Validate the following
        # 1. set max no of snapshots per project to 1.
        # 2. Create one snapshot in the project. Snapshot should be
        #    successfully created
        # 5. Try to create another snapshot in this project. It should give
        #    user an appropriate error and an alert should be generated.

        if self.hypervisor.lower() in ["hyperv"]:
            raise self.skipTest("Snapshots feature is not supported on Hyper-V")
        self.debug("Updating snapshot resource limits for project: %s" % self.project.id)
        # Set usage_vm=1 for Account 1
        update_resource_limit(self.apiclient, 3, max=1, projectid=self.project.id)  # Snapshot

        self.debug("Deploying VM for account: %s" % self.account.name)
        virtual_machine_1 = VirtualMachine.create(
            self.apiclient,
            self.services["server"],
            templateid=self.template.id,
            serviceofferingid=self.service_offering.id,
            projectid=self.project.id,
        )
        self.cleanup.append(virtual_machine_1)
        # Verify VM state
        self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")

        # Get the Root disk of VM
        volumes = list_volumes(
            self.apiclient, virtualmachineid=virtual_machine_1.id, projectid=self.project.id, type="ROOT"
        )
        self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid data")

        self.debug("Creating snapshot from volume: %s" % volumes[0].id)
        # Create a snapshot from the ROOTDISK
        snapshot_1 = Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
        self.cleanup.append(snapshot_1)

        # list snapshots
        snapshots = list_snapshots(self.apiclient, projectid=self.project.id)

        self.debug("snapshots list: %s" % snapshots)

        self.assertEqual(validateList(snapshots)[0], PASS, "Snapshots list validation failed")
        self.assertEqual(len(snapshots), 1, "Snapshots list should have exactly one entity")

        # Exception should be raised for second snapshot
        with self.assertRaises(Exception):
            Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
        return
开发者ID:vaddanak,项目名称:challenges,代码行数:51,代码来源:test_project_limits.py


示例6: test_01_snapshot_data_disk

    def test_01_snapshot_data_disk(self):
        """Test Snapshot Data Disk
        """

        volume = list_volumes(
            self.apiclient,
            virtualmachineid=self.virtual_machine_with_disk.id,
            type='DATADISK',
            listall=True
        )
        self.assertEqual(
            isinstance(volume, list),
            True,
            "Check list response returns a valid list"
        )

        self.debug("Creating a Snapshot from data volume: %s" % volume[0].id)
        snapshot = Snapshot.create(
            self.apiclient,
            volume[0].id,
            account=self.account.name,
            domainid=self.account.domainid,
            asyncbackup=True
        )
        snapshots = list_snapshots(
            self.apiclient,
            id=snapshot.id
        )
        self.assertEqual(
            isinstance(snapshots, list),
            True,
            "Check list response returns a valid list"
        )
        self.assertNotEqual(
            snapshots,
            None,
            "Check if result exists in list item call"
        )
        self.assertEqual(
            snapshots[0].id,
            snapshot.id,
            "Check resource id in list resources call"
        )
        self.assertEqual(
            snapshot.state,
            "BackingUp",
            "Check resource state in list resources call"
        )
        return
开发者ID:Accelerite,项目名称:cloudstack,代码行数:49,代码来源:test_separate_backup_from_snapshot.py


示例7: get_Snapshots_For_Account

 def get_Snapshots_For_Account(self, account, domainid):
     try:
         snapshots = list_snapshots(
                                   self.apiclient,
                                   account=account,
                                   domainid=domainid,
                                   listall=True,
                                   key='type',
                                   value='manual'
                                   )
         self.debug("List Snapshots result : %s" % snapshots)
         self.assertEqual(
                          isinstance(snapshots, list),
                          True,
                          "List snapshots shall return a valid list"
                          )
         return snapshots
     except Exception as e:
         self.fail("Failed to fetch snapshots for account: %s - %s" %
                                                             (account, e))
开发者ID:K0zka,项目名称:cloudstack,代码行数:20,代码来源:test_snapshots_improvement.py


示例8: test_01_create__snapshot_new_resized_rootvolume_size

    def test_01_create__snapshot_new_resized_rootvolume_size(self):
        """Test create snapshot on resized root volume

        # Validate the following

        # 1. Deploy a VM without any disk offering (only root disk)
        # 2. Perform(resize)  of the root  volume
        # 3. Perform snapshot on resized volume
        """

        # deploy a vm

        try:
            if self.updateclone:

                    self.virtual_machine = VirtualMachine.create(
                        self.apiclient, self.services["virtual_machine"],
                        accountid=self.parentd_admin.name,
                        domainid=self.parent_domain.id,
                        serviceofferingid=self.services_offering_vmware.id,
                        mode=self.zone.networktype
                    )
            else:
                    self.virtual_machine = VirtualMachine.create(
                        self.apiclient, self.services["virtual_machine"],
                        accountid=self.parentd_admin.name,
                        domainid=self.parent_domain.id,
                        serviceofferingid=self.service_offering.id,
                        mode=self.zone.networktype
                    )

            # listVirtual machine
            list_vms = VirtualMachine.list(self.apiclient,
                                           id=self.virtual_machine.id)

            self.debug(
                "Verify listVirtualMachines response for virtual machine: %s" %
                self.virtual_machine.id
            )
            res = validateList(list_vms)
            self.assertNotEqual(res[2], INVALID_INPUT, "Invalid list response")

            vm = list_vms[0]
            self.assertEqual(
                vm.id,
                self.virtual_machine.id,
                "Virtual Machine ids do not match"
            )
            self.assertEqual(
                vm.name,
                self.virtual_machine.name,
                "Virtual Machine names do not match"
            )
            self.assertEqual(
                vm.state,
                "Running",
                msg="VM is not in Running state"
            )
            result = self.chk_volume_resize(self.apiclient, vm)
            if result:
                # get root vol from created vm, verify it is correct size
                list_volume_response = Volume.list(
                    self.apiclient,
                    virtualmachineid=
                    self.virtual_machine.id,
                    type='ROOT',
                    listall='True'
                )
                res = validateList(list_volume_response)
                self.assertNotEqual(res[2], INVALID_INPUT, "listVolumes returned invalid object in response")
                rootvolume = list_volume_response[0]
                self.debug("Creating a Snapshot from root  volume: "
                           "%s" % rootvolume.id)
                snapshot = Snapshot.create(
                    self.apiclient,
                    rootvolume.id,
                    account=self.parentd_admin.name,
                    domainid=self.parent_domain.id
                )
                snapshots = list_snapshots(
                    self.apiclient,
                    id=snapshot.id
                )
                res = validateList(snapshots)
                self.assertNotEqual(res[2], INVALID_INPUT, "Invalid list response")
                self.assertEqual(
                    snapshots[0].id,
                    snapshot.id,
                    "Check resource id in list resources call"
                )
            else:
                self.debug("Volume resize is failed")

        except Exception as e:
            raise Exception("Exception while performing"
                            "  the snapshot on resized root volume"
                            " test case: %s" % e)

        self.cleanup.append(self.virtual_machine)
        self.cleanup.append(snapshot)
#.........这里部分代码省略.........
开发者ID:Accelerite,项目名称:cloudstack,代码行数:101,代码来源:test_rootvolume_resize.py


示例9: test_02_host_maintenance_mode_with_activities


#.........这里部分代码省略.........
            self.services["lbrule"],
            ipaddressid=public_ip.ipaddress.id,
            accountid=self.account.name
        )
        self.debug("Created LB rule with ID: %s" % lb_rule.id)

        # Should be able to SSH VM
        try:
            self.debug("SSH into VM: %s" % virtual_machine.id)
            virtual_machine.get_ssh_client(
                ipaddress=public_ip.ipaddress.ipaddress)
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (virtual_machine.ipaddress, e)
                      )
        # Get the Root disk of VM
        volumes = list_volumes(
            self.apiclient,
            virtualmachineid=virtual_machine.id,
            type='ROOT',
            listall=True
        )
        volume = volumes[0]
        self.debug(
            "Root volume of VM(%s): %s" % (
                virtual_machine.name,
                volume.name
            ))
        # Create a snapshot from the ROOTDISK
        self.debug("Creating snapshot on ROOT volume: %s" % volume.name)
        snapshot = Snapshot.create(self.apiclient, volumes[0].id)
        self.debug("Snapshot created: ID - %s" % snapshot.id)

        snapshots = list_snapshots(
            self.apiclient,
            id=snapshot.id,
            listall=True
        )
        self.assertEqual(
            isinstance(snapshots, list),
            True,
            "Check list response returns a valid list"
        )
        self.assertNotEqual(
            snapshots,
            None,
            "Check if result exists in list snapshots call"
        )
        self.assertEqual(
            snapshots[0].id,
            snapshot.id,
            "Check snapshot id in list resources call"
        )

        # Generate template from the snapshot
        self.debug("Generating template from snapshot: %s" % snapshot.name)
        template = Template.create_from_snapshot(
            self.apiclient,
            snapshot,
            self.services["templates"]
        )
        self.debug("Created template from snapshot: %s" % template.id)

        templates = list_templates(
            self.apiclient,
            templatefilter=self.services["templates"]["templatefilter"],
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:67,代码来源:test_high_availability.py


示例10: test_02_accountSnapshotClean

    def test_02_accountSnapshotClean(self):
        """Test snapshot cleanup after account deletion
        """
        # Validate the following
        # 1. listAccounts API should list out the newly created account
        # 2. listVirtualMachines() command should return the deployed VM.
        #    State of this VM should be "Running"
        # 3. a)listSnapshots should list the snapshot that was created.
        #    b)verify that secondary storage NFS share contains the reqd volume
        #      under /secondary/snapshots/$accountid/$volumeid/$snapshot_id
        # 4. a)listAccounts should not list account that is deleted
        #    b) snapshot image($snapshot_id) should be deleted from the
        #       /secondary/snapshots/$accountid/$volumeid/

        try:
            accounts = list_accounts(
                                 self.apiclient,
                                 id=self.account.id
                                 )
            self.assertEqual(
                            isinstance(accounts, list),
                            True,
                            "Check list response returns a valid list"
                        )
            self.assertNotEqual(
                             len(accounts),
                             0,
                             "Check list Accounts response"
                             )

            # Verify the snapshot was created or not
            snapshots = list_snapshots(
                                   self.apiclient,
                                   id=self.snapshot.id
                                   )
            self.assertEqual(
                            isinstance(snapshots, list),
                            True,
                            "Check list response returns a valid list"
                        )
            self.assertNotEqual(
                            snapshots,
                            None,
                            "No such snapshot %s found" % self.snapshot.id
                            )
            self.assertEqual(
                            snapshots[0].id,
                            self.snapshot.id,
                            "Check snapshot id in list resources call"
                        )

            self.assertTrue(is_snapshot_on_nfs(self.apiclient, self.dbclient, self.config, self.zone.id, self.snapshot.id),
                "Snapshot was not found on NFS")
        except Exception as e:
            self._cleanup.append(self.account)
            self.fail("Exception occured: %s" % e)

        self.debug("Deleting account: %s" % self.account.name)
        # Delete account
        self.account.delete(self.apiclient)

        # Wait for account cleanup interval
        wait_for_cleanup(self.apiclient, configs=["account.cleanup.interval"])

        with self.assertRaises(Exception):
            accounts = list_accounts(
                                 self.apiclient,
                                 id=self.account.id
                                 )

        self.assertFalse(is_snapshot_on_nfs(self.apiclient, self.dbclient, self.config, self.zone.id, self.snapshot.id),
                                            "Snapshot was still found on NFS after account gc")
        return
开发者ID:Accelerite,项目名称:cloudstack,代码行数:73,代码来源:test_snapshot_gc.py


示例11: test_01_volume_snapshot

    def test_01_volume_snapshot(self):
        """ Test Volume (root) Snapshot
        # 1. Deploy a VM on primary storage and .
        # 2. Take snapshot on root disk
        # 3. Verify the snapshot's entry in the "snapshots" table 
                and presence of the corresponding 
                snapshot on the Secondary Storage
        # 4. Create Template from the Snapshot and Deploy a 
                VM using the Template
        # 5. Log in to the VM from template and make verify 
                the contents of the ROOT disk matches with the snapshot.
        # 6. Delete Snapshot and Deploy a Linux VM from the 
             Template and verify the successful deployment of the VM.
        # 7. Create multiple snapshots on the same volume and 
                Check the integrity of all the snapshots by creating 
                a template from the snapshot and deploying a Vm from it 
                and delete one of the snapshots
        # 8. Verify that the original checksum matches with the checksum 
                of VM's created from remaning snapshots
        # 9. Make verify the contents of the ROOT disk 
                matches with the snapshot
        # 10.Verify that Snapshot of both DATA and ROOT volume should 
                succeed when snapshot of Data disk of a VM is taken 
                when snapshot of ROOT volume of VM is in progress
        # 11.Create snapshot of data disk and verify the original checksum 
                matches with the volume created from snapshot
        # 12.Verify that volume's state should not change when snapshot of 
                a DATA volume is taken that is attached to a VM
        # 13.Verify that volume's state should not change when snapshot of 
                a DATA volume is taken that is not attached to a VM
        # 14.Verify that create Snapshot with quiescevm=True should succeed
        # 15.revertSnapshot() to revert VM to a specified 
                Volume snapshot for root volume
        """

        # Step 1
        # Get ROOT Volume Id
        root_volumes_cluster_list = list_volumes(
            self.apiclient,
            virtualmachineid=self.vm_1.id,
            type='ROOT',
            listall=True
        )

        root_volume_cluster = root_volumes_cluster_list[0]

        disk_volumes_cluster_list = list_volumes(
            self.apiclient,
            virtualmachineid=self.vm_1.id,
            type='DATADISK',
            listall=True
        )

        data_disk = disk_volumes_cluster_list[0]

        root_vol_state = root_volume_cluster.state

        ckecksum_random_root_cluster = createChecksum(
            service=self.testdata,
            virtual_machine=self.vm_1,
            disk=root_volume_cluster,
            disk_type="rootdiskdevice")

        self.vm_1.stop(self.apiclient)
        root_vol_snap = Snapshot.create(
            self.apiclient,
            root_volume_cluster.id)

        self.assertEqual(
            root_vol_snap.state,
            "BackedUp",
            "Check if the snapshot state is correct "
        )

        self.assertEqual(
            root_vol_state,
            root_volume_cluster.state,
            "Check if volume state has changed"
        )

        self.vm_1.start(self.apiclient)
        # Step 2
        snapshot_list = list_snapshots(
            self.apiclient,
            id=root_vol_snap.id
        )

        self.assertNotEqual(
            snapshot_list,
            None,
            "Check if result exists in list item call"
        )
        self.assertEqual(
            snapshot_list[0].id,
            root_vol_snap.id,
            "Check resource id in list resources call"
        )

        self.assertTrue(
            is_snapshot_on_nfs(
#.........这里部分代码省略.........
开发者ID:Cormoran96,项目名称:cloudstack,代码行数:101,代码来源:testpath_volume_snapshot.py


示例12: test_01_concurrent_snapshots

    def test_01_concurrent_snapshots(self):
        """Concurrent Snapshots
            1. Create snapshot on 2 new VMs in parallel and check
                    1. all snapshot jobs are running
                    2. listSnapshots should list all the snapshots
                    3. Verify secondary_storage NFS share
                       contains the required volume under
                       /secondary/snapshots/$accountid/$volumeid/$snapshot_uuid.
                    4. Verify backup_snap_id was non null in "snapshots"table
            2. Perform step 1 for all the 4 VM's.
            3. Verify that VM gets migrated when snapshot
                is in pregress for the VM.
            4. Verify that snapshots get created when
                VM's are stoped in between snapshot creation.
            5. Perform live Migration then stop all the
                VM's after that verify that snapshot creation success .
            6. Verify success of snapshots creation in case:
                Stop the running VM while performing
                concurrent snapshot on volumes
            7. Verify success of snapshots creation in case:
                Start Migration of VM's and then Stop the running VM then
                performing concurrent snapshot on volumes
        """
        # Step 1
        try:
            create_snapshot_thread_1 = Thread(
                target=CreateSnapshot,
                args=(
                    self,
                    self.root_pool[0],
                    False))
            create_snapshot_thread_2 = Thread(
                target=CreateSnapshot,
                args=(
                    self,
                    self.root_pool[1],
                    False))
            create_snapshot_thread_1.start()
            create_snapshot_thread_2.start()
            create_snapshot_thread_1.join()
            create_snapshot_thread_2.join()

        except:
            self.debug("Error: unable to start thread")

        snapshots = list_snapshots(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid,
            listall=True
        )

        for snapshot in self.snapshot_pool:
            self.assertTrue(snapshot.id in any(
                s.id) for s in snapshots)

        for snapshot in self.snapshot_pool:
            self.assertTrue(
                is_snapshot_on_nfs(
                    self.apiclient,
                    self.dbclient,
                    self.config,
                    self.zone.id,
                    snapshot.id))

        for snapshot in self.snapshot_pool:
            snapshot.delete(self.apiclient)

        self.snapshot_pool = []
        # Step 2
        thread_pool = []
        for i in range(4):
            try:
                create_snapshot_thread_1 = Thread(
                    target=CreateSnapshot,
                    args=(
                        self,
                        self.root_pool[i],
                        False))
                thread_pool.append(create_snapshot_thread_1)

            except Exception as e:
                raise Exception(
                    "Warning: Exception unable to start thread : %s" %
                    e)

        for thread in thread_pool:
            thread.start()

        for thread in thread_pool:
            thread.join()

        snapshots = list_snapshots(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid,
            listall=True
        )

        for snapshot in self.snapshot_pool:
#.........这里部分代码省略.........
开发者ID:Accelerite,项目名称:cloudstack,代码行数:101,代码来源:testpath_volume_cuncurrent_snapshots.py


示例13: test_01_disable_enable_zone

    def test_01_disable_enable_zone(self):
        """disable enable zone
            1. Disable zone and verify following things:
                For admin user:
                    1. Should be create to start/stop exsiting vms
                    2. Should be create to deploy new vm, snapshot,volume,
                       template,iso in the same zone
                For Non-admin user:
                    1. Should be create to start/stop exsiting vms
                    2. Should not be create to deploy new vm, snapshot,volume,
                       template,iso in the same zone
            2. Enable the above disabled zone and verify that:
                -All users should be create to deploy new vm,
                    snapshot,volume,template,iso in the same zone
            3. Try to delete the zone and it should fail with error message:
                -"The zone is not deletable because there are
                    servers running in this zone"
        """
        # Step 1
        vm_user = VirtualMachine.create(
            self.userapiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            zoneid=self.zone.id
        )

        vm_root = VirtualMachine.create(
            self.apiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            zoneid=self.zone.id
        )

        cmd = updateZone.updateZoneCmd()
        cmd.id = self.zone.id
        cmd.allocationstate = DISABLED
        self.apiclient.updateZone(cmd)
        zoneList = Zone.list(self.apiclient, id=self.zone.id)

        self.assertEqual(zoneList[0].allocationstate,
                         DISABLED,
                         "Check if the zone is in disabled state"
                         )

        # Both user and admin vms shoul be running
        self.assertEqual(vm_user.state,
                         RUNNING,
                         "Verify that the user vm is running")

        self.assertEqual(vm_root.state,
                         RUNNING,
                         "Verify that the admin vm is running")

        vm_root.stop(self.apiclient)
        vm_user.stop(self.apiclient)

        root_state = self.dbclient.execute(
            "select state from vm_instance where name='%s'" %
            vm_root.name)[0][0]

        user_state = self.dbclient.execute(
            "select state from vm_instance where name='%s'" %
            vm_user.name)[0][0]

        self.assertEqual(root_state,
                         STOPPED,
                         "verify that vm is Stopped")

        self.assertEqual(user_state,
                         STOPPED,
                         "verify that vm is stopped")

        root_volume = list_volumes(
            self.userapiclient,
            virtualmachineid=vm_root.id,
            type='ROOT',
            listall=True
        )

        snap = Snapshot.create(
            self.apiclient,
            root_volume[0].id)

        self.assertNotEqual(snap,
                            None,
                            "Verify that admin should be \
                                    able to create snapshot")

        snapshots = list_snapshots(
            self.apiclient,
            volumeid=root_volume[0].id,
            listall=True)

        template_from_snapshot = Template.create_from_snapshot(
#.........这里部分代码省略.........
开发者ID:ZhangQingcheng,项目名称:cloudstack,代码行数:101,代码来源:testpath_disable_enable_zone.py


示例14: test_03_volume_rec_snapshot

    def test_03_volume_rec_snapshot(self):
        """ Test Volume (root) Snapshot
        # 1. For snapshot.delta.max > maxsnaps verify that when number of snapshot exceeds 
                maxsnaps value previous snapshot should get deleted from database but remain 
                on secondary storage and when the value exceeds snapshot.delta.max the 
                snapshot should get deleted from secondary storage
        """

        if self.hypervisor.lower() != "xenserver":
            self.skipTest("Skip test for hypervisor other than Xenserver")

        # Step 1
        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'
        self.testdata["recurring_snapshot"]["schedule"] = 1
        recurring_snapshot_root = SnapshotPolicy.create(
            self.apiclient,
            self.volume[0].id,
            self.testdata["recurring_snapshot"]
        )

        Configurations.update(self.apiclient,
                              name="snapshot.delta.max",
                              value="3"
                              )

        list_snapshots_policy = list_snapshot_policy(
            self.apiclient,
            id=recurring_snapshot_root.id,
            volumeid=self.volume[0].id
        )
        list_validation = validateList(list_snapshots_policy)

        self.assertEqual(
            list_validation[0],
            PASS,
            "snapshot list validation failed due to %s" %
            list_validation[2])

        timeout = self.testdata["timeout"]
        while True:
            snapshots = list_snapshots(
                self.apiclient,
                volumeid=self.volume[0].id,
                intervaltype=self.testdata[
                    "recurring_snapshot"]["intervaltype"],
                snapshottype='RECURRING',
                listall=True
            )

            if isinstance(snapshots, list):
                break

            elif timeout == 0:
                raise Exception("List snapshots API call failed.")

        time.sleep(3600 * 2)

        snapshots_2 = list_snapshots(
            self.apiclient,
            volumeid=self.volume[0].id,
            intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],
            snapshottype='RECURRING',
            listall=True
        )

        self.assertTrue(snapshots[0] not in snapshots_2)

        for snapshot in snapshots_2:
            snapshots.append(snapshot)

        time.sleep(360)
        self.assertEqual(
            self.dbclient.execute(
                "select status  from snapshots where uuid='%s'" %
                snapshots[0].id)[0][0],
            "Destroyed"
        )

        self.assertTrue(
            is_snapshot_on_nfs(
                self.apiclient,
                self.dbclient,
                self.config,
                self.zone.id,
                snapshots[0].id))

        time.sleep(3600)

        snapshots_3 = list_snapshots(
            self.apiclient,
            volumeid=self.volume[0].id,
            intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],
            snapshottype='RECURRING',
            listall=True
        )

        self.assertTrue(snapshots[1] not in snapshots_3)
        snapshots.append(snapshots_3[1])
        time.sleep(180)

#.........这里部分代码省略.........
开发者ID:Accelerite,项目名称:cloudstack,代码行数:101,代码来源:testpath_volume_recurring_snap.py


示例15: test_01_disable_enable_cluster

    def test_01_disable_enable_cluster(self):
        """disable enable cluster
            1. Disable cluster and verify following things:
                For admin user:
                     --Should be able to create new vm, snapshot,
                     volume,template,iso in the same cluster
                For Non-admin user:
                     --Should not be able create new vm, snapshot,
                     volume,template,iso in the same cluster
            2. Enable the above disabled cluster and verify that:
                -All users should be create to deploy new vm, snapshot,
                volume,template,iso in the same cluster
            3. Disable the managestate of the cluster and verify that:
                --Host in the cluster should get disconnected
                --VM's in the cluster are ping-able and ssh to
                --Creation of new VM in the cluster should fail
            4. Enable the managestate of the cluster and verify that:
                --Hosts in the cluster get connected
                --VM's in the cluster are accessible
            5. Try to delete the cluster and it should fail with error message:
                -"The cluster is not deletable because there are
                servers running in this cluster"

        """
        # Step 1
        vm_user = VirtualMachine.create(
            self.userapiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            zoneid=self.zone.id,
            mode=self.zone.networktype,
        )

        self.vm_list.append(vm_user)

        vm_root = VirtualMachine.create(
            self.apiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.admin_account.name,
            domainid=self.admin_account.domainid,
            serviceofferingid=self.service_offering.id,
            zoneid=self.zone.id,
            mode=self.zone.networktype,
        )

        self.vm_list.append(vm_root)

        cmd = updateCluster.updateClusterCmd()
        cmd.id = self.cluster.id
        cmd.allocationstate = DISABLED
        self.apiclient.updateCluster(cmd)
        clusterList = Cluster.list(self.apiclient, id=self.cluster.id)

        self.assertEqual(clusterList[0].allocationstate, DISABLED, "Check if the cluster is in disabled state")

        # Verify the existing vms should be running
        self.assertEqual(vm_user.state.lower(), "running", "Verify that the user vm is running")

        self.assertEqual(vm_root.state.lower(), "running", "Verify that the root vm is running")

        VirtualMachine.create(
            self.apiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.admin_account.name,
            domainid=self.admin_account.domainid,
            serviceofferingid=self.service_offering.id,
            zoneid=self.zone.id,
        )

        root_volume = list_volumes(self.apiclient, virtualmachineid=vm_root.id, type="ROOT", listall=True)

        self.assertEqual(
            validateList(root_volume)[0], PASS, "list root volume response is empty for volume id %s" % vm_root.id
        )

        if self.snapshotSupported:
            Snapshot.create(self.apiclient, root_volume[0].id)

            snapshots = list_snapshots(self.apiclient, volumeid=root_volume[0].id, listall=True)
            self.assertEqual(
                validateList(snapshots)[0], PASS, "list snapshot  is empty for volume id %s" % root_volume[0].id
            )

            Template.create_from_snapshot(self.apiclient, snapshots[0], self.testdata["privatetemplate"])

        builtin_info = get_builtin_template_info(self.apiclient, self.zone.id)
        self.testdata["privatetemplate"]["url"] = builtin_info[0]
        self.testdata["privatetemplate"]["hypervisor"] = builtin_info[1]
        self.testdata["privatetemplate"]["format"] = builtin_info[2]

        Template.register(self.apiclient, self.testdata["privatetemplate"], zoneid=self.zone.id)

        Volume.create(
            self.apiclient,
            self.testdata["volume"],
#.........这里部分代码省略.........
开发者ID:MissionCriticalCloudOldRepos,项目名称:cosmic-core,代码行数:101,代码来源:testpath_disable_enable_zone.py


示例16: test_01_volume_snapshot

该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python common.list_ssvms函数代码示例发布时间:2022-05-27
下一篇:
Python common.list_routers函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap