def test_01_check_revert_snapshot(self):
    """Check that a volume snapshot is rejected once a VM snapshot exists.

    Despite the method name, nothing is reverted here.

    Steps:
        1. Deploy a VM and look up its ROOT volume.
        2. Take a VM snapshot and check that its state is "Ready".
        3. Try to create a volume snapshot of the ROOT volume; the call
           is expected to raise.
    """
    # Step 1
    vm = VirtualMachine.create(
        self.userapiclient,
        self.testdata["small"],
        templateid=self.template.id,
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
        zoneid=self.zone.id,
    )
    volumes_cluster_list = list_volumes(
        self.apiclient,
        virtualmachineid=vm.id,
        type='ROOT',
        listall=True
    )
    volume_list_validation = validateList(volumes_cluster_list)
    # The message used to say "Event list", which misreported what failed.
    self.assertEqual(
        volume_list_validation[0],
        PASS,
        "Volume list validation failed due to %s" %
        volume_list_validation[2]
    )
    root_volume = volumes_cluster_list[0]

    # Step 2
    vm_snap = VmSnapshot.create(self.apiclient, vm.id)
    self.assertEqual(
        vm_snap.state,
        "Ready",
        "Check the snapshot of vm is ready!"
    )

    # Step 3
    with self.assertRaises(Exception):
        Snapshot.create(self.apiclient, root_volume.id)
def test_05_snapshots_per_project(self):
    """Test Snapshot limit per project

    Validates the following:
        1. Set the max number of snapshots per project to 1.
        2. Create one snapshot in the project; it should be created
           successfully and be the only snapshot listed for the project.
        3. Try to create another snapshot in this project; the call is
           expected to raise.
    """
    if self.hypervisor.lower() in ["hyperv"]:
        # skipTest() raises SkipTest on its own; wrapping it in `raise`
        # was dead code.
        self.skipTest("Snapshots feature is not supported on Hyper-V")

    self.debug("Updating snapshot resource limits for project: %s" % self.project.id)
    # Limit the project to a single snapshot (resource type 3 is the
    # snapshot limit according to the original inline note).
    update_resource_limit(self.apiclient, 3, max=1, projectid=self.project.id)  # Snapshot

    self.debug("Deploying VM for account: %s" % self.account.name)
    virtual_machine_1 = VirtualMachine.create(
        self.apiclient,
        self.services["server"],
        templateid=self.template.id,
        serviceofferingid=self.service_offering.id,
        projectid=self.project.id,
    )
    self.cleanup.append(virtual_machine_1)
    # Verify VM state
    self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")

    # Get the Root disk of VM
    volumes = list_volumes(
        self.apiclient, virtualmachineid=virtual_machine_1.id, projectid=self.project.id, type="ROOT"
    )
    # validateList also rejects an empty list, so volumes[0] below cannot
    # blow up with an IndexError.
    self.assertEqual(validateList(volumes)[0], PASS, "Check for list volume response return valid data")

    self.debug("Creating snapshot from volume: %s" % volumes[0].id)
    # Create a snapshot from the ROOTDISK
    snapshot_1 = Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
    self.cleanup.append(snapshot_1)

    # list snapshots
    snapshots = list_snapshots(self.apiclient, projectid=self.project.id)
    self.debug("snapshots list: %s" % snapshots)
    self.assertEqual(validateList(snapshots)[0], PASS, "Snapshots list validation failed")
    self.assertEqual(len(snapshots), 1, "Snapshots list should have exactly one entity")

    # Exception should be raised for second snapshot
    with self.assertRaises(Exception):
        Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
def test_01_test_vm_volume_snapshot(self):
    """
    @Desc: A volume snapshot of the root volume must be allowed even
           though a VM snapshot is already present for the VM
    @Steps:
    1: Deploy a VM and take a VM snapshot of it
    2: Snapshot the root volume of the VM; the call must not fail and
       the new snapshot must be returned when listed by id
    """
    # Deploy the VM and give it a VM snapshot
    vm = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
    )
    VmSnapshot.create(self.apiclient, vm.id)

    # Locate the VM's root volume
    root_volumes = Volume.list(
        self.apiclient,
        virtualmachineid=vm.id,
        type="ROOT",
        listall=True,
    )
    root_status = validateList(root_volumes)[0]
    self.assertEqual(root_status, PASS, "Failed to get root volume of the VM")

    # The volume snapshot is expected to go through
    volume_snapshot = Snapshot.create(
        self.apiclient,
        root_volumes[0].id,
        account=self.account.name,
        domainid=self.account.domainid,
    )
    self.debug("Snapshot created: ID - %s" % volume_snapshot.id)

    # The snapshot must be listable by its id
    listed = list_snapshots(self.apiclient, id=volume_snapshot.id)
    listed_status = validateList(listed)[0]
    self.assertEqual(listed_status, PASS, "Invalid snapshot list")
    self.assertEqual(
        listed[0].id,
        volume_snapshot.id,
        "Check resource id in list resources call",
    )
def test_06_create_snapshots_in_project(self):
    """Test create snapshots in project

    Validates the following:
        1. Deploy a VM inside the project.
        2. Snapshot that VM's ROOT volume inside the project.
        3. Verify the snapshot is not returned when listing snapshots
           for the account/domain outside the project.
    """
    self.debug("Deploying VM for Project: %s" % self.project.id)
    virtual_machine_1 = VirtualMachine.create(
        self.apiclient,
        self.services["server"],
        templateid=self.template.id,
        serviceofferingid=self.service_offering.id,
        projectid=self.project.id,
    )
    self.cleanup.append(virtual_machine_1)
    # Verify VM state
    self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")

    # Get the Root disk of VM. Filter on the VM id so that volumes[0] is
    # the disk of the VM deployed above and not the ROOT disk of some
    # other VM that happens to live in the same project.
    volumes = list_volumes(
        self.apiclient,
        virtualmachineid=virtual_machine_1.id,
        projectid=self.project.id,
        type="ROOT",
        listall=True,
    )
    self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid data")
    # Guard the volumes[0] access below against an empty response.
    self.assertNotEqual(len(volumes), 0, "Check for list volume response return valid data")

    self.debug("Creating snapshot from volume: %s" % volumes[0].id)
    # Create a snapshot from the ROOTDISK
    snapshot = Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
    self.cleanup.append(snapshot)

    # Verify Snapshot state
    self.assertIn(
        snapshot.state,
        ["BackedUp", "CreatedOnPrimary", "Allocated"],
        "Check Snapshot state is in one of the mentioned possible states, "
        "It is currently: %s" % snapshot.state,
    )

    # Listing by account/domain (no projectid) must not expose the
    # project's snapshot.
    snapshots = Snapshot.list(self.apiclient, account=self.account.name, domainid=self.account.domainid)
    self.assertEqual(snapshots, None, "Snapshots should not be available outside the project")
def test_02_snapshot_data_disk(self):
    """Snapshot a data disk and check the snapshot is listed and on NFS.

    The test is skipped on Hyper-V. It snapshots the first DATADISK
    volume of ``self.virtual_machine_with_disk``, lists the snapshot by
    id, and asserts ``is_snapshot_on_nfs`` returns a truthy value for it.
    """
    hypervisor_name = self.hypervisor.lower()
    if hypervisor_name in ['hyperv']:
        self.skipTest("Snapshots feature is not supported on Hyper-V")

    # Data disks attached to the pre-provisioned VM
    data_volumes = list_volumes(
        self.apiclient,
        virtualmachineid=self.virtual_machine_with_disk.id,
        type='DATADISK',
        listall=True,
    )
    self.assertEqual(
        isinstance(data_volumes, list),
        True,
        "Check list response returns a valid list",
    )

    data_volume_id = data_volumes[0].id
    self.debug("Creating a Snapshot from data volume: %s" % data_volume_id)
    disk_snapshot = Snapshot.create(
        self.apiclient,
        data_volumes[0].id,
        account=self.account.name,
        domainid=self.account.domainid,
    )

    # Look the snapshot up again by id
    listed = list_snapshots(self.apiclient, id=disk_snapshot.id)
    self.assertEqual(
        isinstance(listed, list),
        True,
        "Check list response returns a valid list",
    )
    self.assertNotEqual(
        listed,
        None,
        "Check if result exists in list item call",
    )
    self.assertEqual(
        listed[0].id,
        disk_snapshot.id,
        "Check resource id in list resources call",
    )

    # Finally confirm the helper reports the snapshot on NFS
    on_nfs = is_snapshot_on_nfs(
        self.apiclient,
        self.dbclient,
        self.config,
        self.zone.id,
        disk_snapshot.id,
    )
    self.assertTrue(on_nfs)
def test_01_test_vm_volume_snapshot(self):
    """
    @Desc: A volume snapshot of the root volume must be refused while
           a VM snapshot is present for the VM
    @Steps:
    1: Deploy a VM and take a VM snapshot of it
    2: Try to snapshot the root volume of the VM; an Exception is
       expected
    """
    # Deploy the VM and give it a VM snapshot
    vm = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
    )
    VmSnapshot.create(self.apiclient, vm.id)

    # Locate the VM's root volume
    root_volumes = Volume.list(
        self.apiclient,
        virtualmachineid=vm.id,
        type="ROOT",
        listall=True,
    )
    root_status = validateList(root_volumes)[0]
    self.assertEqual(root_status, PASS, "Failed to get root volume of the VM")
    root_volume = root_volumes[0]

    # The volume snapshot request has to be rejected
    with self.assertRaises(Exception):
        Snapshot.create(self.apiclient, volume_id=root_volume.id)
def test_04_template_from_snapshot(self):
    """Create Template from snapshot

    Validates the following:
        1. Snapshot the ROOT disk of ``self.virtual_machine``.
        2. Create a template from that snapshot.
        3. Deploy a virtual machine using this template.
        4. The new VM should be in Running state.

    All API calls are made with a user API client obtained for
    ``self.account``.
    """
    userapiclient = self.testClient.getUserApiClient(UserName=self.account.name, DomainName=self.account.domain)

    volumes = Volume.list(userapiclient, virtualmachineid=self.virtual_machine.id, type="ROOT", listall=True)
    # Fail with a readable message instead of a TypeError/IndexError when
    # the listing comes back as None or empty.
    self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid list")
    self.assertNotEqual(len(volumes), 0, "Check ROOT volume available in List Volumes response")
    volume = volumes[0]

    self.debug("Creating a snapshot from volume: %s" % volume.id)
    # Create a snapshot of volume
    snapshot = Snapshot.create(userapiclient, volume.id, account=self.account.name, domainid=self.account.domainid)

    self.debug("Creating a template from snapshot: %s" % snapshot.id)
    # Generate template from the snapshot
    template = Template.create_from_snapshot(userapiclient, snapshot, self.services["template"])
    self.cleanup.append(template)

    # Verify created template
    templates = Template.list(
        userapiclient, templatefilter=self.services["template"]["templatefilter"], id=template.id
    )
    self.assertNotEqual(templates, None, "Check if result exists in list item call")
    # Guard the templates[0] access below against an empty response.
    self.assertNotEqual(len(templates), 0, "Check if result exists in list item call")
    self.assertEqual(templates[0].id, template.id, "Check new template id in list resources call")

    self.debug("Deploying a VM from template: %s" % template.id)
    # Deploy new virtual machine using template
    virtual_machine = VirtualMachine.create(
        userapiclient,
        self.services["virtual_machine"],
        templateid=template.id,
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
    )
    self.cleanup.append(virtual_machine)

    vm_response = VirtualMachine.list(
        userapiclient, id=virtual_machine.id, account=self.account.name, domainid=self.account.domainid
    )
    self.assertEqual(isinstance(vm_response, list), True, "Check for list VM response return valid list")
    # Verify VM response to check whether VM deployment was successful
    self.assertNotEqual(len(vm_response), 0, "Check VMs available in List VMs response")
    vm = vm_response[0]
    self.assertEqual(vm.state, "Running", "Check the state of VM created from Template")
def test_04_reoccuring_snapshot_rules(self):
"""
1) Create a VM using the Service offering IsVolatile enabled
2) Apply a recurring snapshot rule on the Volume.
3) After a couple of snapshots are taken reboot the VM.
Verify the following conditions
1) New root disk should be formed
2) The recurring snapshot rule should be deleted
"""
self.hypervisor = self.testClient.getHypervisorInfo()
vms = VirtualMachine.list(
self.apiclient,
id=self.vm_with_reset.id,
listall=True
)
vm_list_validation_result = validateList(vms)
self.assertEqual(vm_list_validation_result[0], PASS, "vm list validation failed due to %s" %
vm_list_validation_result[2])
vm_with_reset = vm_list_validation_result[1]
vm_with_reset_root_disk_id = self.get_root_device_uuid_for_vm(
vm_with_reset.id,
vm_with_reset.rootdeviceid
)
self.debug("Creating recurring snapshot policy for root disk on vm created with IsVolatile=True")
self.debug("Snapshot Policy - Type : %s Scheduled Hours : %s" %(
self.services["recurring_snapshot"]["intervaltype"],
self.services["recurring_snapshot"]["schedule"]))
recurring_snapshot = SnapshotPolicy.create(
self.apiclient,
vm_with_reset_root_disk_id,
self.services["recurring_snapshot"]
)
#ListSnapshotPolicy should return newly created policy
list_snapshots_policy = SnapshotPolicy.list(
self.apiclient,
id=recurring_snapshot.id,
volumeid=vm_with_reset_root_disk_id
)
snapshot_list_validation_result = validateList(list_snapshots_policy)
self.assertEqual(snapshot_list_validation_result[0], PASS, "snapshot list validation failed due to %s" %
snapshot_list_validation_result[2])
snapshots_policy = snapshot_list_validation_result[1]
self.assertEqual(
snapshots_policy.id,
recurring_snapshot.id,
"Check recurring snapshot id in list resources call"
)
self.assertEqual(
snapshots_policy.maxsnaps,
self.services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call"
)
sleep_seconds = (self.services["recurring_snapshot"]["schedule"]) * 3600 + 600
sleep_minutes = sleep_seconds/60
self.debug("Sleeping for %s minutes till the volume is snapshoted" %sleep_minutes)
time.sleep(sleep_seconds)
retriesCount = self.services["retriesCount"]
while True:
snapshots = Snapshot.list(
self.apiclient,
volumeid=vm_with_reset_root_disk_id,
intervaltype=\
self.services["recurring_snapshot"]["intervaltype"],
snapshottype=RECURRING,
listall=True
)
snapshot_list_validation_result = validateList(snapshots)
if snapshot_list_validation_result[0] == PASS:
break
elif retriesCount == 0:
self.fail("Failed to get snapshots list")
time.sleep(60)
retriesCount = retriesCount - 1
# rebooting the vm with isVolatile = True
try:
self.vm_with_reset.reboot(self.apiclient)
except Exception as e:
self.fail("Failed to reboot the virtual machine. Error: %s" % e)
# Check if the the root disk was destroyed and recreated for isVolatile=True
self.debug("Checking whether root disk of VM with isVolatile=True was destroyed")
vms = VirtualMachine.list(
self.apiclient,
#......... part of the code is omitted here .........
def test_01_storage_snapshots_limits(self):
""" Storage and Snapshot Limit
1. Create Snapshot of ROOT disk.
2. Verify the Secondary Storage value
is increased by the size of snapshot.
3. Delete Snaphshot.
4. Verify the Secondary
Storage value is decreased by the size of snapshot.
5. Set the Snapshot limit of Account.
6. Create Snasphots till limit is reached.
7. Create Snapshot of ROOT Volume.
Creation should fail.
8. Delete few Snapshots.
9. Create Snapshot again.
Creation should succeed.
"""
# Get ROOT Volume
root_volumes_list = Volume.list(
self.userapiclient,
virtualmachineid=self.vm.id,
type='ROOT'
)
status = validateList(root_volumes_list)
self.assertEqual(status[0], PASS, "ROOT Volume List Validation Failed")
root_volume = root_volumes_list[0]
self.data_volume_created = Volume.create(
self.userapiclient,
self.testdata["volume"],
zoneid=self.zone.id,
account=self.account.name,
domainid=self.account.domainid,
diskofferingid=self.disk_offering.id
)
self.cleanup.append(self.data_volume_created)
data_volumes_list = Volume.list(
self.userapiclient,
id=self.data_volume_created.id
)
status = validateList(data_volumes_list)
self.assertEqual(status[0], PASS, "DATA Volume List Validation Failed")
self.data_volume = data_volumes_list[0]
self.vm.attach_volume(
self.userapiclient,
self.data_volume
)
# Get Secondary Storage Value from Database
qryresult_before_snapshot = self.dbclient.execute(
" select id, account_name, secondaryStorageTotal\
from account_view where account_name = '%s';" %
self.account.name)
status = validateList(qryresult_before_snapshot)
self.assertEqual(
status[0],
PASS,
"Check sql query to return SecondaryStorageTotal of account")
secStorageBeforeSnapshot = qryresult_before_snapshot[0][2]
# Step 1
snapshot = Snapshot.create(
self.userapiclient,
root_volume.id)
snapshots_list = Snapshot.list(self.userapiclient,
id=snapshot.id)
status = validateList(snapshots_list)
self.assertEqual(status[0], PASS, "Snapshots List Validation Failed")
# Verify Snapshot state
self.assertEqual(
snapshots_list[0].state.lower() in [
BACKED_UP,
],
True,
"Snapshot state is not as expected. It is %s" %
snapshots_list[0].state
)
# Step 2
qryresult_after_snapshot = self.dbclient.execute(
" select id, account_name, secondaryStorageTotal\
from account_view where account_name = '%s';" %
self.account.name)
status = validateList(qryresult_after_snapshot)
self.assertEqual(
status[0],
PASS,
#......... part of the code is omitted here .........
def test_03_reuse_template_name(self):
"""TS_BUG_011-Test Reusing deleted template name
"""
# Validate the following
# 1. Deploy VM using default template, small service offering
# and small data disk offering.
# 2. Perform snapshot on the root disk of this VM.
# 3. Create a template from snapshot.
# 4. Delete the template and create a new template with same name
# 5. Template should be created succesfully
# Create a snapshot from the ROOTDISK
snapshot = Snapshot.create(
self.apiclient,
self.volume.id,
account=self.account.name,
domainid=self.account.domainid
)
self.debug("Created snapshot with ID: %s" % snapshot.id)
snapshots = Snapshot.list(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list snapshots call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check snapshot id in list resources call"
)
# Generate template from the snapshot
template = Template.create_from_snapshot(
self.apiclient,
snapshot,
self.services["template"],
random_name=False
)
self.debug("Created template from snapshot: %s" % template.id)
templates = Template.list(
self.apiclient,
templatefilter=\
self.services["template"]["templatefilter"],
id=template.id
)
self.assertEqual(
isinstance(templates, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
templates,
None,
"Check if result exists in list item call"
)
self.assertEqual(
templates[0].isready,
True,
"Check new template state in list templates call"
)
self.debug("Deleting template: %s" % template.id)
template.delete(self.apiclient)
# Wait for some time to ensure template state is reflected in other calls
time.sleep(self.services["sleep"])
# Generate template from the snapshot
self.debug("Creating template from snapshot: %s with same name" %
template.id)
template = Template.create_from_snapshot(
self.apiclient,
snapshot,
self.services["template"],
random_name=False
)
templates = Template.list(
self.apiclient,
templatefilter=\
self.services["template"]["templatefilter"],
id=template.id
)
self.assertEqual(
isinstance(templates, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
#......... part of the code is omitted here .........
def test_02_check_size_snapshotTemplate(self):
    """TS_BUG_010-Test check size of snapshot and template

    Snapshots ``self.volume``, creates a template from that snapshot,
    waits for nothing beyond the API calls themselves, and asserts that
    the listed template is ready and that its size equals the size of
    ``self.volume``.
    """
    # Snapshot the volume and confirm it can be listed by id
    root_snapshot = Snapshot.create(
        self.apiclient,
        self.volume.id,
        account=self.account.name,
        domainid=self.account.domainid,
    )
    self.debug("Created snapshot with ID: %s" % root_snapshot.id)

    listed_snapshots = Snapshot.list(self.apiclient, id=root_snapshot.id)
    self.assertEqual(
        isinstance(listed_snapshots, list),
        True,
        "Check list response returns a valid list",
    )
    self.assertNotEqual(
        listed_snapshots,
        None,
        "Check if result exists in list snapshots call",
    )
    self.assertEqual(
        listed_snapshots[0].id,
        root_snapshot.id,
        "Check snapshot id in list resources call",
    )

    # Derive a template from the snapshot and register it for cleanup
    derived_template = Template.create_from_snapshot(
        self.apiclient,
        root_snapshot,
        self.services["template"],
    )
    self.cleanup.append(derived_template)
    self.debug("Created template from snapshot with ID: %s" % derived_template.id)

    template_filter = self.services["template"]["templatefilter"]
    listed_templates = Template.list(
        self.apiclient,
        templatefilter=template_filter,
        id=derived_template.id,
    )
    self.assertEqual(
        isinstance(listed_templates, list),
        True,
        "Check list response returns a valid list",
    )
    self.assertNotEqual(
        listed_templates,
        None,
        "Check if result exists in list item call",
    )
    self.assertEqual(
        listed_templates[0].isready,
        True,
        "Check new template state in list templates call",
    )

    # The template size is compared against the size of the source volume
    size_mismatch_message = (
        "Derived template size (%s) does not match snapshot size (%s)"
        % (listed_templates[0].size, self.volume.size)
    )
    self.assertEqual(
        listed_templates[0].size,
        self.volume.size,
        size_mismatch_message,
    )
# (scraped page footer) Please leave a comment