def test_01_cluster_settings(self):
    """change cpu/mem.overprovisioning.factor at cluster level and
    verify the change """
    # Find the host the VM landed on so we can resolve its cluster.
    hosts = Host.list(self.apiclient,
                      id=self.deployVmResponse.hostid
                      )
    self.assertEqual(
        validateList(hosts)[0],
        PASS,
        "check list host response for host id %s" %
        self.deployVmResponse.hostid)
    cluster_id = hosts[0].clusterid
    # Raise both overcommit factors at cluster scope (mem first, then cpu,
    # matching the original call order).
    for setting, factor in (("mem.overprovisioning.factor", 2),
                            ("cpu.overprovisioning.factor", 3)):
        Configurations.update(self.apiclient,
                              clusterid=cluster_id,
                              name=setting,
                              value=factor)
    clusters = Cluster.list(self.apiclient, id=cluster_id)
    self.assertEqual(
        validateList(clusters)[0],
        PASS,
        "check list cluster response for cluster id %s" %
        cluster_id)
    self.assertEqual(int(clusters[0].cpuovercommitratio),
                     3,
                     "check the cpu overcommit value at cluster level ")
    self.assertEqual(int(clusters[0].memoryovercommitratio),
                     2,
                     "check memory overcommit value at cluster level")
    # Restore both factors to their default of 1 and re-verify.
    for setting in ("mem.overprovisioning.factor",
                    "cpu.overprovisioning.factor"):
        Configurations.update(self.apiclient,
                              clusterid=cluster_id,
                              name=setting,
                              value=1)
    clusters_after = Cluster.list(self.apiclient, id=cluster_id)
    self.assertEqual(
        validateList(clusters_after)[0],
        PASS,
        "check the list cluster response for id %s" %
        cluster_id)
    self.assertEqual(int(clusters_after[0].cpuovercommitratio),
                     1,
                     "check the cpu overcommit value at cluster level ")
    self.assertEqual(int(clusters_after[0].memoryovercommitratio),
                     1,
                     "check memory overcommit value at cluster level")
def test_Scale_VM(self):
    """
    @desc:
    1. Enable dynamic scaling in Global settings
    2. Register a CentOS 7 template (with tools) and tick dynamic scaling
    3. Deploy VM with this template
    4. Start the VM and try to change service offering
    """
    self.hypervisor = str(get_hypervisor_type(self.api_client)).lower()
    if self.hypervisor != "xenserver":
        self.skipTest("This test can be run only on xenserver")
    self.updateConfigurAndRestart("enable.dynamic.scale.vm","true")
    # Register a dynamically-scalable CentOS 7 template in the account.
    template = Template.register(
        self.userapiclient,
        self.services["CentOS7template"],
        zoneid=self.zone.id,
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.assertIsNotNone(template,"Failed to register CentOS 7 template")
    self.debug(
        "Registered a template with format {} and id {}".format(
            self.services["CentOS7template"]["format"],template.id)
    )
    template.download(self.userapiclient)
    self.cleanup.append(template)
    vm = VirtualMachine.create(
        self.userapiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
        templateid=template.id,
        zoneid=self.zone.id
    )
    self.assertIsNotNone(vm,"Failed to deploy virtual machine")
    self.cleanup.append(vm)
    response = VirtualMachine.list(self.userapiclient,id=vm.id)
    status = validateList(response)
    self.assertEqual(status[0],PASS,"list vm response returned invalid list")
    self.assertEqual(status[1].state,"Running", "vm is not running")
    # NOTE(review): `self.apiClient` differs from `self.api_client` used
    # above — confirm which fixture name the enclosing class defines.
    service_offering = ServiceOffering.create(
        self.apiClient,
        self.services["service_offerings"]["big"]
    )
    # BUG FIX: the larger offering was never registered for cleanup,
    # leaking it between test runs.
    self.cleanup.append(service_offering)
    # Give the guest tools time to settle before scaling.
    time.sleep(self.services["sleep"])
    vm.scale(self.userapiclient,service_offering.id)
    scaleresponse = VirtualMachine.list(self.userapiclient,id=vm.id)
    scalestatus = validateList(scaleresponse)
    self.assertEqual(scalestatus[0],PASS,"list vm response returned invalid list")
    self.assertEqual(scalestatus[1].serviceofferingname,service_offering.name, " service offering is not same")
    self.assertEqual(scalestatus[1].serviceofferingid,service_offering.id, " service offering ids are not same")
    return
def test_01_migrateVolume(self):
    """
    @Desc:Volume is not retaining same uuid when migrating from one
    storage to another.
    Step1:Create a volume/data disk
    Step2:Verify UUID of the volume
    Step3:Migrate the volume to another primary storage within
    the cluster
    Step4:Migrating volume to new primary storage should succeed
    Step5:volume UUID should not change even after migration
    """
    # Create a data disk in the test account.
    data_disk = Volume.create(
        self.apiclient,
        self.services["volume"],
        diskofferingid=self.disk_offering.id,
        zoneid=self.zone.id,
        account=self.account.name,
        domainid=self.account.domainid,
    )
    self.assertIsNotNone(data_disk, "Failed to create volume")
    listed = Volume.list(self.apiclient, id=data_disk.id)
    self.assertEqual(validateList(listed)[0], PASS, "Invalid response returned for list volumes")
    # Record the UUID before migration for the later comparison.
    uuid_before = listed[0].id
    try:
        self.virtual_machine.attach_volume(self.apiclient, data_disk)
    except Exception as e:
        self.fail("Attaching data disk to vm failed with error %s" % e)
    candidate_pools = StoragePool.listForMigration(self.apiclient, id=data_disk.id)
    if not candidate_pools:
        self.skipTest(
            "No suitable storage pools found for volume migration.\
            Skipping"
        )
    self.assertEqual(validateList(candidate_pools)[0], PASS, "invalid pool response from findStoragePoolsForMigration")
    target_pool = candidate_pools[0]
    self.debug("Migrating Volume-ID: %s to Pool: %s" % (data_disk.id, target_pool.id))
    try:
        Volume.migrate(self.apiclient, volumeid=data_disk.id, storageid=target_pool.id, livemigrate="true")
    except Exception as e:
        self.fail("Volume migration failed with error %s" % e)
    data_disks_after = Volume.list(
        self.apiclient, virtualmachineid=self.virtual_machine.id, listall="true", type="DATADISK"
    )
    self.assertEqual(validateList(data_disks_after)[0], PASS, "invalid volumes response after migration")
    uuid_after = data_disks_after[0].id
    self.assertEqual(
        uuid_before,
        uuid_after,
        "Volume is not retaining same uuid when migrating from one\
        storage to another",
    )
    self.virtual_machine.detach_volume(self.apiclient, data_disk)
    self.cleanup.append(data_disk)
    return
def test_01_test_vm_volume_snapshot(self):
    """
    @Desc: Test that Volume snapshot for root volume is allowed
    when VM snapshot is present for the VM
    @Steps:
    1: Deploy a VM and create a VM snapshot for VM
    2: Try to create snapshot for the root volume of the VM,
    It should not fail
    """
    # Deploy a VM, then take a whole-VM snapshot first.
    vm = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
    )
    VmSnapshot.create(
        self.apiclient,
        vm.id,
    )
    root_volumes = Volume.list(self.apiclient,
                               virtualmachineid=vm.id,
                               type="ROOT",
                               listall=True)
    self.assertEqual(validateList(root_volumes)[0], PASS,
                     "Failed to get root volume of the VM")
    # Snapshotting the root volume must still succeed while a VM
    # snapshot exists.
    root_snapshot = Snapshot.create(
        self.apiclient,
        root_volumes[0].id,
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.debug("Snapshot created: ID - %s" % root_snapshot.id)
    snapshot_list = list_snapshots(
        self.apiclient,
        id=root_snapshot.id
    )
    self.assertEqual(
        validateList(snapshot_list)[0],
        PASS,
        "Invalid snapshot list"
    )
    self.assertEqual(
        snapshot_list[0].id,
        root_snapshot.id,
        "Check resource id in list resources call"
    )
    return
def test_01_positive_tests_vm_deploy_shared_nw(self):
    """ Positive tests for VMLC test path - Advanced Zone in Shared Network
    # 1. List created service offering in setUpClass by name
    # 2. List registered template with name
    # 3. Create VM in account
    """
    # Verify the offering created in setUpClass is listable by name.
    offerings = ServiceOffering.list(
        self.apiclient,
        name=self.service_offering_1.name,
        listall=True
    )
    self.assertEqual(validateList(offerings)[0], PASS,
                     "List validation failed for service offerings list")
    self.assertEqual(offerings[0].name,
                     self.service_offering_1.name,
                     "Names of created service offering\
                     and listed service offering not matching")
    # Verify the registered template is visible to the user client.
    templates = Template.list(
        self.userapiclient,
        templatefilter="self",
        name=self.template.name,
        listall=True,
        zone=self.zone.id)
    self.assertEqual(validateList(templates)[0], PASS,
                     "List validation failed for templates list")
    self.assertEqual(templates[0].name, self.template.name,
                     "Names of created template and listed template\
                     not matching")
    shared_network = CreateNetwork(self, SHARED_NETWORK)
    # Deploy the VM onto the freshly-created shared network.
    self.virtual_machine = VirtualMachine.create(
        self.userapiclient,
        self.testdata["small"],
        templateid=self.template.id,
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering_1.id,
        networkids=[shared_network.id, ],
        zoneid=self.zone.id
    )
    self.cleanup.append(self.virtual_machine)
    return
def test_08_removeNic_in_sharedNetwork_scope_all_as_domain_parentAdmin(
        self):
    """Validate that Parent domain admin is able to remove a NIC which is
    added by child domain user

    NOTE(review): the API keys switched to below belong to user d1 (the
    child-domain user), which matches the debug message but not the
    docstring — confirm which actor is actually intended here.
    """
    self.api_client.connection.apiKey = self.user_d1_apikey
    self.api_client.connection.securityKey = self.user_d1_secretkey
    # typo fix: "od" -> "of"
    self.debug("Removing NIC of shared Network as user d1")
    vm_list = list_virtual_machines(self.api_client, id=self.vmvpc1.id)
    vm_list_validation_result = validateList(vm_list)
    self.assertEqual(vm_list_validation_result[0], PASS,
                     "vm list validation failed due to %s" %
                     vm_list_validation_result[2])
    self.debug("virtual machine nics: %s" % vm_list[0].nic)
    # BUG FIX: reqNic could previously be referenced without ever being
    # assigned (UnboundLocalError) when no NIC belonged to the shared
    # network; fail with a clear message in that case instead.
    reqNic = None
    for nic in vm_list[0].nic:
        if nic.networkid == self.shared_network_all.id:
            reqNic = nic
    if reqNic is None:
        self.fail("VM %s has no NIC on shared network %s" %
                  (self.vmvpc1.id, self.shared_network_all.id))
    self.vmvpc1.remove_nic(self.api_client, reqNic.id)
    if not self.verify_nic(self.shared_network_all, self.vmvpc1):
        # typo fix: "has mot NIC is" -> "has no NIC in"
        self.debug(
            "virtual machine has no NIC in SharedNetwork: %s" %
            self.shared_network_all.name)
    else:
        self.fail("network %s NIC is present in the virtual Machine %s" %
                  (self.shared_network_all.name, self.vmvpc1.id))
def create_vm(self, pfrule=False, egress_policy=True, RR=False):
    """Create a guest network, deploy a VM on it, then acquire a public
    IP and open a firewall + NAT rule for SSH access to the VM."""
    self.create_network_offering(egress_policy, RR)
    # Build the guest network from the offering created above.
    self.debug("Creating network with network offering: %s" % self.network_offering.id)
    self.network = Network.create(
        self.apiclient,
        self.services["network"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        networkofferingid=self.network_offering.id,
        zoneid=self.zone.id,
    )
    self.debug("Created network with ID: %s" % self.network.id)
    self.debug("Deploying instance in the account: %s" % self.account.name)
    project = None
    deploy_mode = self.zone.networktype if pfrule else "basic"
    self.virtual_machine = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.domain.id,
        serviceofferingid=self.service_offering.id,
        mode=deploy_mode,
        networkids=[str(self.network.id)],
        projectid=project.id if project else None,
    )
    self.debug("Deployed instance %s in account: %s" % (self.virtual_machine.id, self.account.name))
    # The deploy can come back in Error state — assert it is Running.
    self.vm_list = list_virtual_machines(self.apiclient, id=self.virtual_machine.id)
    self.assertEqual(validateList(self.vm_list)[0], PASS, "vm list validation failed, vm list is %s" % self.vm_list)
    self.assertEqual(
        str(self.vm_list[0].state).lower(),
        "running",
        "VM state should be running, it is %s" % self.vm_list[0].state,
    )
    self.public_ip = PublicIPAddress.create(
        self.apiclient,
        accountid=self.account.name,
        zoneid=self.zone.id,
        domainid=self.account.domainid,
        networkid=self.network.id,
    )
    ssh_port = self.services["natrule"]["publicport"]
    # Allow SSH traffic through before wiring up the NAT rule.
    FireWallRule.create(
        self.apiclient,
        ipaddressid=self.public_ip.ipaddress.id,
        protocol=self.services["natrule"]["protocol"],
        cidrlist=["0.0.0.0/0"],
        startport=ssh_port,
        endport=ssh_port,
    )
    self.debug("Creating NAT rule for VM ID: %s" % self.virtual_machine.id)
    NATRule.create(self.apiclient, self.virtual_machine, self.services["natrule"], self.public_ip.ipaddress.id)
    return
def test_attach_volume_exceeding_primary_limits(self):
    """
    # do
    # 1. create a normal user account and update primary store limits to the current resource count
    # 2. Upload a volume of any size
    # 3. Verify that upload volume succeeds
    # 4. Verify that primary storage count doesnt change
    # 6. Try attaching volume to VM and verify that the attach fails (as the resource limits exceed)
    # 7. Verify that primary storage count doesnt change
    # done
    """
    # create an account, launch a vm with default template and custom disk offering, update the primary store limits to the current primary store resource count
    response = self.setupNormalAccount()
    self.assertEqual(response[0], PASS, response[1])
    # upload volume and verify that the volume is uploaded
    volume = Volume.upload(
        self.apiclient,
        self.services["configurableData"]["upload_volume"],
        zoneid=self.zone.id,
        account=self.account.name,
        domainid=self.account.domainid,
        url="http://people.apache.org/~sanjeev/rajani-thin-volume.vhd",
    )
    volume.wait_for_upload(self.apiclient)
    volumes = Volume.list(self.apiclient, id=volume.id, zoneid=self.zone.id, listall=True)
    validationresult = validateList(volumes)
    assert validationresult[0] == PASS, "volumes list validation failed: %s" % validationresult[2]
    assert str(volumes[0].state).lower() == "uploaded", (
        "Volume state should be 'uploaded' but it is %s" % volumes[0].state
    )
    # verify that the resource count didnt change due to upload volume
    response = matchResourceCount(
        self.apiclient, self.initialResourceCount, RESOURCE_PRIMARY_STORAGE, accountid=self.account.id
    )
    self.assertEqual(response[0], PASS, response[1])
    # attach the above volume to the vm; this must fail with a
    # resource-limit-exceeded error.
    try:
        self.virtualMachine.attach_volume(self.apiclient, volume=volume)
    except Exception as e:
        # BUG FIX: Python 3 exceptions have no `.message` attribute, so
        # the old `e.message` lookup raised AttributeError inside the
        # handler; use str(e) instead.
        if (
            "Maximum number of resources of type 'primary_storage' for account name=" + self.account.name
            in str(e)
        ):
            self.assertTrue(True, "there should be primary store resource limit reached exception")
        else:
            self.fail(
                "only resource limit reached exception is expected. some other exception occurred. Failing the test case."
            )
    # resource count should match as the attach should fail due to reaching resource limits
    response = matchResourceCount(
        self.apiclient, self.initialResourceCount, RESOURCE_PRIMARY_STORAGE, accountid=self.account.id
    )
    self.assertEqual(response[0], PASS, response[1])
    return
def test_attach_multiple_volumes(self):
    """Attach multiple Volumes simultaneously to a Running VM
    """
    # Validate the following
    # 1. All data disks attached successfully without any exception
    # Fire off all four attach jobs first, then wait on each async job.
    attach_jobs = []
    for disk in (self.volume1, self.volume2, self.volume3, self.volume4):
        self.debug(
            "Attaching volume (ID: %s) to VM (ID: %s)" % (
                disk.id,
                self.virtual_machine.id
            ))
        attach_jobs.append(
            self.attach_volume(self.apiClient, self.virtual_machine.id, disk))
    for job in attach_jobs:
        self.query_async_job(self.apiClient, job.jobid)
    # List the data disks attached to the instance (the type filter
    # excludes the root disk).
    list_volume_response = Volume.list(
        self.apiClient,
        virtualmachineid=self.virtual_machine.id,
        type="DATADISK",
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.assertEqual(
        validateList(list_volume_response)[0],
        PASS,
        "Check list response returns a valid list"
    )
    self.assertEqual(
        len(list_volume_response),
        4,
        "All 4 data disks are not attached to VM Successfully"
    )
    return
def test_02_migrate_vm(self):
    """Test migrate VM in project
    # Validate the following
    # 1. Create VM with custom disk offering in a project and check
    #    initial primary storage count
    # 2. List the hosts suitable for migrating the VM
    # 3. Migrate the VM and verify that primary storage count of project remains same"""
    try:
        hosts = Host.list(self.apiclient,virtualmachineid=self.vm.id,
                          listall=True)
        self.assertEqual(validateList(hosts)[0], PASS, "hosts list validation failed")
        host = hosts[0]
        self.vm.migrate(self.apiclient, host.id)
    except Exception as e:
        # BUG FIX: `"Exception occured" % e` raised a TypeError ("not all
        # arguments converted") because the format string had no %s
        # placeholder, masking the real failure message.
        self.fail("Exception occurred: %s" % e)
    # Migration must not change the project's primary storage count.
    expectedCount = self.initialResourceCount
    response = matchResourceCount(
        self.apiclient, expectedCount,
        RESOURCE_PRIMARY_STORAGE,
        projectid=self.project.id)
    self.assertEqual(response[0], PASS, response[1])
    return
def isIpInDesiredState(apiclient, ipaddressid, state):
    """ Check if the given IP is in the correct state (given)
    and return True/False accordingly

    Returns a 3-element list [exception_occurred, ip_in_desired_state,
    message], where message is the caught exception or a descriptive
    error string. Polls once a minute; retriesCount of 10 allows up to
    11 checks (kept for backward compatibility).
    """
    retriesCount = 10
    ipInDesiredState = False
    exceptionOccured = False
    exceptionMessage = ""
    try:
        while retriesCount >= 0:
            portableips = PublicIPAddress.list(apiclient, id=ipaddressid)
            assert validateList(
                portableips)[0] == PASS, "IPs list validation failed"
            if str(portableips[0].state).lower() == state:
                ipInDesiredState = True
                break
            retriesCount -= 1
            # BUG FIX: previously slept even after the final attempt,
            # wasting a full minute on a guaranteed failure path.
            if retriesCount >= 0:
                time.sleep(60)
    except Exception as e:
        exceptionOccured = True
        exceptionMessage = e
        # BUG FIX: return the recorded exceptionMessage instead of the
        # raw loop variable `e` (same value, consistent with intent).
        return [exceptionOccured, ipInDesiredState, exceptionMessage]
    if not ipInDesiredState:
        exceptionMessage = "Ip should be in %s state, it is in %s" %\
            (state, portableips[0].state)
    return [False, ipInDesiredState, exceptionMessage]
def test_vm_nic_adapter_vmxnet3(self):
    """
    # 1. Register a template for VMware with nicAdapter vmxnet3
    # 2. Deploy a VM using this template
    # 3. Create an isolated network
    # 4. Add network to VM
    # 5. Verify that the NIC adapter for VM for both the nics
    #    is vmxnet3
    """
    # The vmxnet3 adapter is VMware-specific; skip everywhere else.
    if self.hypervisor.lower() not in ["vmware"]:
        self.skipTest("This test case is written specifically\
            for Vmware hypervisor")
    # Register a private template in the account with nic adapter vmxnet3.
    vmxnet3_template_data = self.testdata["configurableData"]["vmxnet3template"]
    template = Template.register(
        self.userapiclient,
        vmxnet3_template_data,
        zoneid=self.zone.id,
        account=self.account.name,
        domainid=self.account.domainid,
        details=[{"nicAdapter": vmxnet3_template_data["nicadapter"]}]
    )
    self.cleanup.append(template)
    template.download(self.apiclient)
    registered = Template.list(
        self.userapiclient,
        listall=True,
        id=template.id,
        templatefilter="self")
    self.assertEqual(
        validateList(registered)[0],
        PASS,
        "Templates list validation failed")
    self.testdata["virtual_machine"]["zoneid"] = self.zone.id
    self.testdata["virtual_machine"]["template"] = template.id
    vm = VirtualMachine.create(
        self.apiclient,
        self.testdata["virtual_machine"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id)
    second_network = Network.create(
        self.apiclient,
        self.testdata["isolated_network"],
        self.account.name,
        self.account.domainid,
        networkofferingid=self.isolated_network_offering.id)
    vm.add_nic(self.apiclient, second_network.id)
    # TODO: Add steps to check the Nic Adapter type in VCenter
    # using VCenter APIs
    return
# (stray non-code text removed by review: "请发表评论" — "please post a comment")