def test_04_snapshot_limit(self):
"""Test snapshot limit in snapshot policies
"""
# Validate the following
# 1. Perform hourly recurring snapshot on the root disk of VM and keep
# the maxsnapshots as 1
# 2. listSnapshots should list the snapshot that was created
# snapshot folder in secondary storage should contain only one
# snapshot image(/secondary/snapshots/$accountid/$volumeid/)
# Get the Root disk of VM
volumes = list_volumes(self.apiclient, virtualmachineid=self.virtual_machine.id, type="ROOT", listall=True)
self.assertEqual(isinstance(volumes, list), True, "Check list response returns a valid list")
volume = volumes[0]
# Create a snapshot policy
recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume.id, self.services["recurring_snapshot"])
self.cleanup.append(recurring_snapshot)
snapshot_policy = list_snapshot_policy(self.apiclient, id=recurring_snapshot.id, volumeid=volume.id)
self.assertEqual(isinstance(snapshot_policy, list), True, "Check list response returns a valid list")
self.assertNotEqual(snapshot_policy, None, "Check if result exists in list item call")
self.assertEqual(
snapshot_policy[0].id, recurring_snapshot.id, "Check recurring snapshot id in list resources call"
)
self.assertEqual(
snapshot_policy[0].maxsnaps,
self.services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call",
)
# Sleep for (maxsnaps+1) hours to verify
# only maxsnaps snapshots are retained
time.sleep((int(self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600)
# Verify the snapshot was created or not
snapshots = list_snapshots(
self.apiclient,
volumeid=volume.id,
intervaltype=self.services["recurring_snapshot"]["intervaltype"],
snapshottype="RECURRING",
listall=True,
)
self.assertEqual(isinstance(snapshots, list), True, "Check list response returns a valid list")
self.assertEqual(
len(snapshots),
self.services["recurring_snapshot"]["maxsnaps"],
"Check maximum number of recurring snapshots retained",
)
snapshot = snapshots[0]
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
self.assertTrue(is_snapshot_on_nfs(self.apiclient, self.dbclient, self.config, self.zone.id, snapshot.id))
return
def test_01_test_vm_volume_snapshot(self):
"""
@Desc: Test that Volume snapshot for root volume is allowed
when VM snapshot is present for the VM
@Steps:
1: Deploy a VM and create a VM snapshot for VM
2: Try to create a snapshot of the root volume of the VM;
it should not fail
"""
# Creating Virtual Machine
virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
)
VmSnapshot.create(
self.apiclient,
virtual_machine.id,
)
volumes = Volume.list(self.apiclient,
virtualmachineid=virtual_machine.id,
type="ROOT",
listall=True)
self.assertEqual(validateList(volumes)[0], PASS,
"Failed to get root volume of the VM")
snapshot = Snapshot.create(
self.apiclient,
volumes[0].id,
account=self.account.name,
domainid=self.account.domainid
)
self.debug("Snapshot created: ID - %s" % snapshot.id)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
validateList(snapshots)[0],
PASS,
"Invalid snapshot list"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
return
def test_02_snapshot_data_disk(self):
"""Test Snapshot Data Disk
"""
if self.hypervisor.lower() in ['hyperv']:
self.skipTest("Snapshots feature is not supported on Hyper-V")
volume = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='DATADISK',
listall=True
)
self.assertEqual(
isinstance(volume, list),
True,
"Check list response returns a valid list"
)
self.debug("Creating a Snapshot from data volume: %s" % volume[0].id)
snapshot = Snapshot.create(
self.apiclient,
volume[0].id,
account=self.account.name,
domainid=self.account.domainid
)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.assertTrue(
is_snapshot_on_nfs(
self.apiclient,
self.dbclient,
self.config,
self.zone.id,
snapshot.id))
return
def test_13_move_across_subdomain_vm_snapshot(self):
"""Test as domain admin, stop a VM from subdomain1 and attempt to move it to subdomain2
"""
# Validate the following:
# 1. deploy VM in subdomain1 with a snapshot.
# 2. assignVirtualMachine to subdomain2
self.create_vm(self.sdomain_account_user1['account'], self.sdomain_account_user1['domain'], snapshot=True)
self.virtual_machine.assign_virtual_machine(self.apiclient, self.sdomain_account_user2['account'].name, self.sdomain_account_user2['domain'].id)
snapshots = list_snapshots(self.apiclient,
id=self.snapshot.id)
self.assertEqual(snapshots,
None,
"Snapshots stil present for a vm in domain")
def test_05_snapshots_per_project(self):
"""Test Snapshot limit per project
"""
# Validate the following
# 1. set max no of snapshots per project to 1.
# 2. Create one snapshot in the project. Snapshot should be
# successfully created
# 3. Try to create another snapshot in this project. It should give
# user an appropriate error and an alert should be generated.
if self.hypervisor.lower() in ["hyperv"]:
raise self.skipTest("Snapshots feature is not supported on Hyper-V")
self.debug("Updating snapshot resource limits for project: %s" % self.project.id)
# Set the snapshot limit to 1 for the project
update_resource_limit(self.apiclient, 3, max=1, projectid=self.project.id) # Snapshot
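# Note: resource type 3 in updateResourceLimit corresponds to snapshots,
# so the project is capped at a single snapshot here.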
self.debug("Deploying VM for account: %s" % self.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id,
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")
# Get the Root disk of VM
volumes = list_volumes(
self.apiclient, virtualmachineid=virtual_machine_1.id, projectid=self.project.id, type="ROOT"
)
self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid data")
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a snapshot from the ROOTDISK
snapshot_1 = Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
self.cleanup.append(snapshot_1)
# list snapshots
snapshots = list_snapshots(self.apiclient, projectid=self.project.id)
self.debug("snapshots list: %s" % snapshots)
self.assertEqual(validateList(snapshots)[0], PASS, "Snapshots list validation failed")
self.assertEqual(len(snapshots), 1, "Snapshots list should have exactly one entity")
# Exception should be raised for second snapshot
with self.assertRaises(Exception):
Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
return
def test_01_create__snapshot_new_resized_rootvolume_size(self):
"""Test create snapshot on resized root volume
# Validate the following
# 1. Deploy a VM without any disk offering (only root disk)
# 2. Perform(resize) of the root volume
# 3. Perform snapshot on resized volume
"""
# deploy a vm
try:
if self.updateclone:
self.virtual_machine = VirtualMachine.create(
self.apiclient, self.services["virtual_machine"],
accountid=self.parentd_admin.name,
domainid=self.parent_domain.id,
serviceofferingid=self.services_offering_vmware.id,
mode=self.zone.networktype
)
else:
self.virtual_machine = VirtualMachine.create(
self.apiclient, self.services["virtual_machine"],
accountid=self.parentd_admin.name,
domainid=self.parent_domain.id,
serviceofferingid=self.service_offering.id,
mode=self.zone.networktype
)
# listVirtual machine
list_vms = VirtualMachine.list(self.apiclient,
id=self.virtual_machine.id)
self.debug(
"Verify listVirtualMachines response for virtual machine: %s" %
self.virtual_machine.id
)
res = validateList(list_vms)
self.assertNotEqual(res[2], INVALID_INPUT, "Invalid list response")
vm = list_vms[0]
self.assertEqual(
vm.id,
self.virtual_machine.id,
"Virtual Machine ids do not match"
)
self.assertEqual(
vm.name,
self.virtual_machine.name,
"Virtual Machine names do not match"
)
self.assertEqual(
vm.state,
"Running",
msg="VM is not in Running state"
)
result = self.chk_volume_resize(self.apiclient, vm)
if result:
# get root vol from created vm, verify it is correct size
list_volume_response = Volume.list(
self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='ROOT',
listall='True'
)
res = validateList(list_volume_response)
self.assertNotEqual(res[2], INVALID_INPUT, "listVolumes returned invalid object in response")
rootvolume = list_volume_response[0]
self.debug("Creating a Snapshot from root volume: "
"%s" % rootvolume.id)
snapshot = Snapshot.create(
self.apiclient,
rootvolume.id,
account=self.parentd_admin.name,
domainid=self.parent_domain.id
)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
res = validateList(snapshots)
self.assertNotEqual(res[2], INVALID_INPUT, "Invalid list response")
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
else:
self.debug("Volume resize is failed")
except Exception as e:
raise Exception("Exception while performing"
" the snapshot on resized root volume"
" test case: %s" % e)
self.cleanup.append(self.virtual_machine)
self.cleanup.append(snapshot)
#......... remaining code omitted .........
def test_02_accountSnapshotClean(self):
"""Test snapshot cleanup after account deletion
"""
# Validate the following
# 1. listAccounts API should list out the newly created account
# 2. listVirtualMachines() command should return the deployed VM.
# State of this VM should be "Running"
# 3. a)listSnapshots should list the snapshot that was created.
# b)verify that secondary storage NFS share contains the reqd volume
# under /secondary/snapshots/$accountid/$volumeid/$snapshot_id
# 4. a)listAccounts should not list account that is deleted
# b) snapshot image($snapshot_id) should be deleted from the
# /secondary/snapshots/$accountid/$volumeid/
try:
accounts = list_accounts(
self.apiclient,
id=self.account.id
)
self.assertEqual(
isinstance(accounts, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(accounts),
0,
"Check list Accounts response"
)
# Verify the snapshot was created or not
snapshots = list_snapshots(
self.apiclient,
id=self.snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"No such snapshot %s found" % self.snapshot.id
)
self.assertEqual(
snapshots[0].id,
self.snapshot.id,
"Check snapshot id in list resources call"
)
self.assertTrue(is_snapshot_on_nfs(self.apiclient, self.dbclient, self.config, self.zone.id, self.snapshot.id),
"Snapshot was not found on NFS")
except Exception as e:
self._cleanup.append(self.account)
self.fail("Exception occured: %s" % e)
self.debug("Deleting account: %s" % self.account.name)
# Delete account
self.account.delete(self.apiclient)
# Wait for account cleanup interval
wait_for_cleanup(self.apiclient, configs=["account.cleanup.interval"])
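# Once the cleanup interval has elapsed, the deleted account should no longer
# be listed and its snapshot image should be purged from secondary storage,
# which the NFS check below verifies.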
with self.assertRaises(Exception):
accounts = list_accounts(
self.apiclient,
id=self.account.id
)
self.assertFalse(is_snapshot_on_nfs(self.apiclient, self.dbclient, self.config, self.zone.id, self.snapshot.id),
"Snapshot was still found on NFS after account gc")
return
def test_01_volume_snapshot(self):
""" Test Volume (root) Snapshot
# 1. Deploy a VM on primary storage.
# 2. Take snapshot on root disk
# 3. Verify the snapshot's entry in the "snapshots" table
and presence of the corresponding
snapshot on the Secondary Storage
# 4. Create Template from the Snapshot and Deploy a
VM using the Template
# 5. Log in to the VM deployed from the template and verify that
the contents of the ROOT disk match the snapshot.
# 6. Delete Snapshot and Deploy a Linux VM from the
Template and verify the successful deployment of the VM.
# 7. Create multiple snapshots on the same volume and
Check the integrity of all the snapshots by creating
a template from the snapshot and deploying a Vm from it
and delete one of the snapshots
# 8. Verify that the original checksum matches the checksum
of the VMs created from the remaining snapshots
# 9. Verify that the contents of the ROOT disk
match the snapshot
# 10.Verify that snapshots of both DATA and ROOT volumes
succeed when a snapshot of the DATA disk of a VM is taken
while a snapshot of the ROOT volume of the VM is in progress
# 11.Create snapshot of data disk and verify the original checksum
matches with the volume created from snapshot
# 12.Verify that volume's state should not change when snapshot of
a DATA volume is taken that is attached to a VM
# 13.Verify that volume's state should not change when snapshot of
a DATA volume is taken that is not attached to a VM
# 14.Verify that create Snapshot with quiescevm=True should succeed
# 15.revertSnapshot() to revert VM to a specified
Volume snapshot for root volume
"""
# Step 1
# Get ROOT Volume Id
root_volumes_cluster_list = list_volumes(
self.apiclient,
virtualmachineid=self.vm_1.id,
type='ROOT',
listall=True
)
root_volume_cluster = root_volumes_cluster_list[0]
disk_volumes_cluster_list = list_volumes(
self.apiclient,
virtualmachineid=self.vm_1.id,
type='DATADISK',
listall=True
)
data_disk = disk_volumes_cluster_list[0]
root_vol_state = root_volume_cluster.state
ckecksum_random_root_cluster = createChecksum(
service=self.testdata,
virtual_machine=self.vm_1,
disk=root_volume_cluster,
disk_type="rootdiskdevice")
self.vm_1.stop(self.apiclient)
root_vol_snap = Snapshot.create(
self.apiclient,
root_volume_cluster.id)
self.assertEqual(
root_vol_snap.state,
"BackedUp",
"Check if the snapshot state is correct "
)
self.assertEqual(
root_vol_state,
root_volume_cluster.state,
"Check if volume state has changed"
)
self.vm_1.start(self.apiclient)
# Step 2
snapshot_list = list_snapshots(
self.apiclient,
id=root_vol_snap.id
)
self.assertNotEqual(
snapshot_list,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshot_list[0].id,
root_vol_snap.id,
"Check resource id in list resources call"
)
self.assertTrue(
is_snapshot_on_nfs(
#......... remaining code omitted .........
def test_01_concurrent_snapshots(self):
"""Concurrent Snapshots
1. Create snapshot on 2 new VMs in parallel and check
1. all snapshot jobs are running
2. listSnapshots should list all the snapshots
3. Verify secondary_storage NFS share
contains the required volume under
/secondary/snapshots/$accountid/$volumeid/$snapshot_uuid.
4. Verify backup_snap_id is non-null in the "snapshots" table
2. Perform step 1 for all the 4 VMs.
3. Verify that the VM gets migrated when a snapshot
is in progress for the VM.
4. Verify that snapshots get created when
VMs are stopped in the middle of snapshot creation.
5. Perform live migration, then stop all the
VMs, and verify that snapshot creation succeeds.
6. Verify success of snapshot creation in this case:
stop the running VM while performing
concurrent snapshots on volumes.
7. Verify success of snapshot creation in this case:
start migration of the VMs, then stop the running VM while
performing concurrent snapshots on volumes.
"""
# Step 1
try:
create_snapshot_thread_1 = Thread(
target=CreateSnapshot,
args=(
self,
self.root_pool[0],
False))
create_snapshot_thread_2 = Thread(
target=CreateSnapshot,
args=(
self,
self.root_pool[1],
False))
create_snapshot_thread_1.start()
create_snapshot_thread_2.start()
create_snapshot_thread_1.join()
create_snapshot_thread_2.join()
except Exception as e:
self.debug("Error: unable to start snapshot threads: %s" % e)
snapshots = list_snapshots(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
listall=True
)
for snapshot in self.snapshot_pool:
self.assertTrue(any(snapshot.id == s.id for s in snapshots),
"Snapshot %s not found in listSnapshots response" % snapshot.id)
for snapshot in self.snapshot_pool:
self.assertTrue(
is_snapshot_on_nfs(
self.apiclient,
self.dbclient,
self.config,
self.zone.id,
snapshot.id))
for snapshot in self.snapshot_pool:
snapshot.delete(self.apiclient)
self.snapshot_pool = []
# Step 2
thread_pool = []
for i in range(4):
try:
create_snapshot_thread_1 = Thread(
target=CreateSnapshot,
args=(
self,
self.root_pool[i],
False))
thread_pool.append(create_snapshot_thread_1)
except Exception as e:
raise Exception(
"Warning: Exception unable to start thread : %s" %
e)
for thread in thread_pool:
thread.start()
for thread in thread_pool:
thread.join()
snapshots = list_snapshots(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
listall=True
)
for snapshot in self.snapshot_pool:
#......... remaining code omitted .........
def test_01_disable_enable_zone(self):
"""disable enable zone
1. Disable zone and verify following things:
For admin user:
1. Should be able to start/stop existing VMs
2. Should be able to deploy new VM, snapshot, volume,
template, ISO in the same zone
For Non-admin user:
1. Should be able to start/stop existing VMs
2. Should not be able to deploy new VM, snapshot, volume,
template, ISO in the same zone
2. Enable the above disabled zone and verify that:
-All users should be able to deploy new VM,
snapshot, volume, template, ISO in the same zone
3. Try to delete the zone and it should fail with error message:
-"The zone is not deletable because there are
servers running in this zone"
"""
# Step 1
vm_user = VirtualMachine.create(
self.userapiclient,
self.testdata["small"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id
)
vm_root = VirtualMachine.create(
self.apiclient,
self.testdata["small"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id
)
cmd = updateZone.updateZoneCmd()
cmd.id = self.zone.id
cmd.allocationstate = DISABLED
self.apiclient.updateZone(cmd)
zoneList = Zone.list(self.apiclient, id=self.zone.id)
self.assertEqual(zoneList[0].allocationstate,
DISABLED,
"Check if the zone is in disabled state"
)
# Both user and admin VMs should be running
self.assertEqual(vm_user.state,
RUNNING,
"Verify that the user vm is running")
self.assertEqual(vm_root.state,
RUNNING,
"Verify that the admin vm is running")
vm_root.stop(self.apiclient)
vm_user.stop(self.apiclient)
root_state = self.dbclient.execute(
"select state from vm_instance where name='%s'" %
vm_root.name)[0][0]
user_state = self.dbclient.execute(
"select state from vm_instance where name='%s'" %
vm_user.name)[0][0]
self.assertEqual(root_state,
STOPPED,
"verify that vm is Stopped")
self.assertEqual(user_state,
STOPPED,
"verify that vm is stopped")
root_volume = list_volumes(
self.userapiclient,
virtualmachineid=vm_root.id,
type='ROOT',
listall=True
)
snap = Snapshot.create(
self.apiclient,
root_volume[0].id)
self.assertNotEqual(snap,
None,
"Verify that admin should be \
able to create snapshot")
snapshots = list_snapshots(
self.apiclient,
volumeid=root_volume[0].id,
listall=True)
template_from_snapshot = Template.create_from_snapshot(
#......... remaining code omitted .........
def test_03_volume_rec_snapshot(self):
""" Test Volume (root) Snapshot
# 1. With snapshot.delta.max > maxsnaps, verify that when the number of
snapshots exceeds maxsnaps the previous snapshot is deleted from the
database but remains on secondary storage, and that once the delta chain
exceeds snapshot.delta.max the snapshot is also deleted from secondary storage
"""
if self.hypervisor.lower() != "xenserver":
self.skipTest("Skip test for hypervisor other than Xenserver")
# Step 1
self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'
self.testdata["recurring_snapshot"]["schedule"] = 1
recurring_snapshot_root = SnapshotPolicy.create(
self.apiclient,
self.volume[0].id,
self.testdata["recurring_snapshot"]
)
Configurations.update(self.apiclient,
name="snapshot.delta.max",
value="3"
)
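# snapshot.delta.max limits the length of the delta snapshot chain; once it is
# exceeded, the snapshot should also be removed from secondary storage (step 1).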
list_snapshots_policy = list_snapshot_policy(
self.apiclient,
id=recurring_snapshot_root.id,
volumeid=self.volume[0].id
)
list_validation = validateList(list_snapshots_policy)
self.assertEqual(
list_validation[0],
PASS,
"snapshot list validation failed due to %s" %
list_validation[2])
timeout = self.testdata["timeout"]
while True:
snapshots = list_snapshots(
self.apiclient,
volumeid=self.volume[0].id,
intervaltype=self.testdata[
"recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING',
listall=True
)
if isinstance(snapshots, list):
break
elif timeout == 0:
raise Exception("List snapshots API call failed.")
# Poll periodically (interval is an arbitrary choice) and count down the timeout
time.sleep(60)
timeout -= 1
time.sleep(3600 * 2)
snapshots_2 = list_snapshots(
self.apiclient,
volumeid=self.volume[0].id,
intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING',
listall=True
)
self.assertTrue(snapshots[0].id not in [s.id for s in snapshots_2])
for snapshot in snapshots_2:
snapshots.append(snapshot)
time.sleep(360)
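# The oldest recurring snapshot should now be marked Destroyed in the database
# while its image still remains on secondary storage, since the delta chain
# has not yet exceeded snapshot.delta.max.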
self.assertEqual(
self.dbclient.execute(
"select status from snapshots where uuid='%s'" %
snapshots[0].id)[0][0],
"Destroyed"
)
self.assertTrue(
is_snapshot_on_nfs(
self.apiclient,
self.dbclient,
self.config,
self.zone.id,
snapshots[0].id))
time.sleep(3600)
snapshots_3 = list_snapshots(
self.apiclient,
volumeid=self.volume[0].id,
intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],
snapshottype='RECURRING',
listall=True
)
self.assertTrue(snapshots[1].id not in [s.id for s in snapshots_3])
snapshots.append(snapshots_3[1])
time.sleep(180)
#......... remaining code omitted .........
def test_01_disable_enable_cluster(self):
"""disable enable cluster
1. Disable cluster and verify following things:
For admin user:
--Should be able to create new vm, snapshot,
volume,template,iso in the same cluster
For Non-admin user:
--Should not be able to create new VM, snapshot,
volume, template, ISO in the same cluster
2. Enable the above disabled cluster and verify that:
-All users should be able to deploy new VM, snapshot,
volume, template, ISO in the same cluster
3. Disable the managestate of the cluster and verify that:
--Hosts in the cluster should get disconnected
--VMs in the cluster remain pingable and reachable over SSH
--Creation of new VM in the cluster should fail
4. Enable the managestate of the cluster and verify that:
--Hosts in the cluster get connected
--VM's in the cluster are accessible
5. Try to delete the cluster and it should fail with error message:
-"The cluster is not deletable because there are
servers running in this cluster"
"""
# Step 1
vm_user = VirtualMachine.create(
self.userapiclient,
self.testdata["small"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id,
mode=self.zone.networktype,
)
self.vm_list.append(vm_user)
vm_root = VirtualMachine.create(
self.apiclient,
self.testdata["small"],
templateid=self.template.id,
accountid=self.admin_account.name,
domainid=self.admin_account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id,
mode=self.zone.networktype,
)
self.vm_list.append(vm_root)
cmd = updateCluster.updateClusterCmd()
cmd.id = self.cluster.id
cmd.allocationstate = DISABLED
self.apiclient.updateCluster(cmd)
clusterList = Cluster.list(self.apiclient, id=self.cluster.id)
self.assertEqual(clusterList[0].allocationstate, DISABLED, "Check if the cluster is in disabled state")
# Verify the existing vms should be running
self.assertEqual(vm_user.state.lower(), "running", "Verify that the user vm is running")
self.assertEqual(vm_root.state.lower(), "running", "Verify that the root vm is running")
VirtualMachine.create(
self.apiclient,
self.testdata["small"],
templateid=self.template.id,
accountid=self.admin_account.name,
domainid=self.admin_account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id,
)
root_volume = list_volumes(self.apiclient, virtualmachineid=vm_root.id, type="ROOT", listall=True)
self.assertEqual(
validateList(root_volume)[0], PASS, "list root volume response is empty for VM %s" % vm_root.id
)
if self.snapshotSupported:
Snapshot.create(self.apiclient, root_volume[0].id)
snapshots = list_snapshots(self.apiclient, volumeid=root_volume[0].id, listall=True)
self.assertEqual(
validateList(snapshots)[0], PASS, "list snapshot is empty for volume id %s" % root_volume[0].id
)
Template.create_from_snapshot(self.apiclient, snapshots[0], self.testdata["privatetemplate"])
builtin_info = get_builtin_template_info(self.apiclient, self.zone.id)
self.testdata["privatetemplate"]["url"] = builtin_info[0]
self.testdata["privatetemplate"]["hypervisor"] = builtin_info[1]
self.testdata["privatetemplate"]["format"] = builtin_info[2]
Template.register(self.apiclient, self.testdata["privatetemplate"], zoneid=self.zone.id)
Volume.create(
self.apiclient,
self.testdata["volume"],
#......... remaining code omitted .........