本文整理汇总了Python中marvin.lib.common.list_volumes函数的典型用法代码示例。如果您正苦于以下问题:Python list_volumes函数的具体用法?Python list_volumes怎么用?Python list_volumes使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了list_volumes函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_04_snapshot_limit
def test_04_snapshot_limit(self):
"""Test snapshot limit in snapshot policies
"""
# Validate the following
# 1. Perform hourly recurring snapshot on the root disk of VM and keep
# the maxsnapshots as 1
# 2. listSnapshots should list the snapshot that was created
# snapshot folder in secondary storage should contain only one
# snapshot image(/secondary/snapshots/$accountid/$volumeid/)
# Get the Root disk of VM
volumes = list_volumes(self.apiclient, virtualmachineid=self.virtual_machine.id, type="ROOT", listall=True)
self.assertEqual(isinstance(volumes, list), True, "Check list response returns a valid list")
volume = volumes[0]
# Create a snapshot policy
recurring_snapshot = SnapshotPolicy.create(self.apiclient, volume.id, self.services["recurring_snapshot"])
self.cleanup.append(recurring_snapshot)
snapshot_policy = list_snapshot_policy(self.apiclient, id=recurring_snapshot.id, volumeid=volume.id)
self.assertEqual(isinstance(snapshot_policy, list), True, "Check list response returns a valid list")
self.assertNotEqual(snapshot_policy, None, "Check if result exists in list item call")
self.assertEqual(
snapshot_policy[0].id, recurring_snapshot.id, "Check recurring snapshot id in list resources call"
)
self.assertEqual(
snapshot_policy[0].maxsnaps,
self.services["recurring_snapshot"]["maxsnaps"],
"Check interval type in list resources call",
)
# Sleep for (maxsnaps+1) hours to verify
# only maxsnaps snapshots are retained
time.sleep((int(self.services["recurring_snapshot"]["maxsnaps"]) + 1) * 3600)
# Verify the snapshot was created or not
snapshots = list_snapshots(
self.apiclient,
volumeid=volume.id,
intervaltype=self.services["recurring_snapshot"]["intervaltype"],
snapshottype="RECURRING",
listall=True,
)
self.assertEqual(isinstance(snapshots, list), True, "Check list response returns a valid list")
self.assertEqual(
len(snapshots),
self.services["recurring_snapshot"]["maxsnaps"],
"Check maximum number of recurring snapshots retained",
)
snapshot = snapshots[0]
# Sleep to ensure that snapshot is reflected in sec storage
time.sleep(self.services["sleep"])
self.assertTrue(is_snapshot_on_nfs(self.apiclient, self.dbclient, self.config, self.zone.id, snapshot.id))
return
开发者ID:MissionCriticalCloudOldRepos,项目名称:cosmic-core,代码行数:56,代码来源:test_snapshot_limits.py
示例2: _check_and_get_cs_volume
def _check_and_get_cs_volume(self, volume_id, volume_name):
list_volumes_response = list_volumes(self.apiClient, id=volume_id)
sf_util.check_list(list_volumes_response, 1, self, TestVolumes._should_only_be_one_volume_in_list_err_msg)
cs_volume = list_volumes_response[0]
self._check_volume(cs_volume, volume_name)
return cs_volume
开发者ID:nvazquez,项目名称:cloudstack,代码行数:10,代码来源:TestVolumes.py
示例3: test_01_check_revert_snapshot
def test_01_check_revert_snapshot(self):
""" Test revert snapshot on XenServer
# 1. Deploy a VM.
# 2. Take VM snapshot.
# 3. Verify that volume snapshot fails with error
can not create volume snapshot for VM with VM-snapshot
"""
# Step 1
vm = VirtualMachine.create(
self.userapiclient,
self.testdata["small"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id,
)
volumes_cluster_list = list_volumes(
self.apiclient,
virtualmachineid=vm.id,
type='ROOT',
listall=True
)
volume_list_validation = validateList(volumes_cluster_list)
self.assertEqual(
volume_list_validation[0],
PASS,
"Event list validation failed due to %s" %
volume_list_validation[2]
)
root_volume = volumes_cluster_list[0]
#Step 2
vm_snap = VmSnapshot.create(self.apiclient,
vm.id)
self.assertEqual(
vm_snap.state,
"Ready",
"Check the snapshot of vm is ready!"
)
#Step 3
with self.assertRaises(Exception):
Snapshot.create(
self.apiclient,
root_volume.id)
return
开发者ID:Accelerite,项目名称:cloudstack,代码行数:55,代码来源:testpath_revert_snap.py
示例4: test_05_use_private_template_in_project
def test_05_use_private_template_in_project(self):
"""Test use of private template in a project
"""
# 1. Create a project
# 2. Verify that in order to use somebody's Private template for vm
# creation in the project, permission to use the template has to
# be granted to the Project (use API 'updateTemplatePermissions'
# with project id to achieve that).
try:
self.debug("Deploying VM for with public template: %s" % self.template.id)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id,
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")
virtual_machine_1.stop(self.apiclient)
# Get the Root disk of VM
volumes = list_volumes(self.apiclient, projectid=self.project.id, type="ROOT", listall=True)
self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid data")
volume = volumes[0]
self.debug("Creating template from volume: %s" % volume.id)
# Create a template from the ROOTDISK
template_1 = Template.create(self.userapiclient, self.services["template"], volumeid=volume.id)
self.cleanup.append(template_1)
# Verify Template state
self.assertEqual(template_1.isready, True, "Check Template is in ready state or not")
# Update template permissions to grant permission to project
self.debug(
"Updating template permissions:%s to grant access to project: %s" % (template_1.id, self.project.id)
)
template_1.updatePermissions(self.apiclient, op="add", projectids=self.project.id)
self.debug("Deploying VM for with privileged template: %s" % self.template.id)
virtual_machine_2 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=template_1.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id,
)
self.cleanup.append(virtual_machine_2)
# Verify VM state
self.assertEqual(virtual_machine_2.state, "Running", "Check VM state is Running or not")
except Exception as e:
self.fail("Exception occured: %s" % e)
return
开发者ID:maksimov,项目名称:cloudstack,代码行数:55,代码来源:test_project_resources.py
示例5: _check_volume_state
def _check_volume_state(api_client, volume_id, volume_state):
volume = list_volumes(
api_client,
id=volume_id,
listall=True
)[0]
if str(volume.state).lower() == volume_state.lower():
return True, ""
return False, "The volume is not in the '" + volume_state + "' state. State = " + str(volume.state)
开发者ID:PCextreme,项目名称:cloudstack,代码行数:11,代码来源:TestUploadDownload.py
示例6: test_01_storage_migrate_root_and_data_disks
def test_01_storage_migrate_root_and_data_disks(self):
src_host, dest_host = self._get_source_and_dest_hosts()
virtual_machine = VirtualMachine.create(
self.apiClient,
self.testdata[TestData.virtualMachine],
accountid=self.account.name,
zoneid=self.zone.id,
serviceofferingid=self.compute_offering_1.id,
templateid=self.template.id,
domainid=self.domain.id,
hostid=src_host.id,
startvm=True
)
self.cleanup.append(virtual_machine)
cs_root_volume = list_volumes(self.apiClient, listall=True, virtualmachineid=virtual_machine.id)[0]
sf_account_id = sf_util.get_sf_account_id(self.cs_api, self.account.id, self.primary_storage.id, self,
TestVMMigrationWithStorage._sf_account_id_should_be_non_zero_int_err_msg)
sf_volumes = sf_util.get_active_sf_volumes(self.sfe, sf_account_id)
sf_root_volume = sf_util.check_and_get_sf_volume(sf_volumes, cs_root_volume.name, self)
cs_data_volume = Volume.create(
self.apiClient,
self.testdata[TestData.volume_1],
account=self.account.name,
domainid=self.domain.id,
zoneid=self.zone.id,
diskofferingid=self.disk_offering_1.id
)
self.cleanup.append(cs_data_volume)
cs_data_volume = virtual_machine.attach_volume(
self.apiClient,
cs_data_volume
)
sf_volumes = sf_util.get_active_sf_volumes(self.sfe, sf_account_id)
sf_data_volume = sf_util.check_and_get_sf_volume(sf_volumes, cs_data_volume.name, self)
sf_root_volume, sf_data_volume = self._migrate_and_verify(virtual_machine, dest_host, cs_root_volume, cs_data_volume, sf_account_id,
sf_root_volume, sf_data_volume, self.xen_session_1, self.xen_session_2)
src_host, dest_host = dest_host, src_host
self._migrate_and_verify(virtual_machine, dest_host, cs_root_volume, cs_data_volume, sf_account_id, sf_root_volume, sf_data_volume,
self.xen_session_2, self.xen_session_1)
开发者ID:milamberspace,项目名称:cloudstack,代码行数:53,代码来源:TestVMMigrationWithStorage.py
示例7: test_02_snapshot_data_disk
def test_02_snapshot_data_disk(self):
"""Test Snapshot Data Disk
"""
if self.hypervisor.lower() in ['hyperv']:
self.skipTest("Snapshots feature is not supported on Hyper-V")
volume = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='DATADISK',
listall=True
)
self.assertEqual(
isinstance(volume, list),
True,
"Check list response returns a valid list"
)
self.debug("Creating a Snapshot from data volume: %s" % volume[0].id)
snapshot = Snapshot.create(
self.apiclient,
volume[0].id,
account=self.account.name,
domainid=self.account.domainid
)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.assertTrue(
is_snapshot_on_nfs(
self.apiclient,
self.dbclient,
self.config,
self.zone.id,
snapshot.id))
return
开发者ID:vaddanak,项目名称:challenges,代码行数:52,代码来源:test_snapshots.py
示例8: check_and_get_cs_volume
def check_and_get_cs_volume(obj_test, volume_id, volume_name, obj_assert):
list_volumes_response = list_volumes(
obj_test.apiClient,
id=volume_id
)
check_list(list_volumes_response, 1, obj_assert, "There should only be one volume in this list.")
cs_volume = list_volumes_response[0]
check_volume(obj_test, cs_volume, volume_name, obj_assert)
return cs_volume
开发者ID:PCextreme,项目名称:cloudstack,代码行数:13,代码来源:sf_util.py
示例9: test_05_snapshots_per_project
def test_05_snapshots_per_project(self):
"""Test Snapshot limit per project
"""
# Validate the following
# 1. set max no of snapshots per project to 1.
# 2. Create one snapshot in the project. Snapshot should be
# successfully created
# 5. Try to create another snapshot in this project. It should give
# user an appropriate error and an alert should be generated.
if self.hypervisor.lower() in ["hyperv"]:
raise self.skipTest("Snapshots feature is not supported on Hyper-V")
self.debug("Updating snapshot resource limits for project: %s" % self.project.id)
# Set usage_vm=1 for Account 1
update_resource_limit(self.apiclient, 3, max=1, projectid=self.project.id) # Snapshot
self.debug("Deploying VM for account: %s" % self.account.name)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id,
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")
# Get the Root disk of VM
volumes = list_volumes(
self.apiclient, virtualmachineid=virtual_machine_1.id, projectid=self.project.id, type="ROOT"
)
self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid data")
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a snapshot from the ROOTDISK
snapshot_1 = Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
self.cleanup.append(snapshot_1)
# list snapshots
snapshots = list_snapshots(self.apiclient, projectid=self.project.id)
self.debug("snapshots list: %s" % snapshots)
self.assertEqual(validateList(snapshots)[0], PASS, "Snapshots list validation failed")
self.assertEqual(len(snapshots), 1, "Snapshots list should have exactly one entity")
# Exception should be raised for second snapshot
with self.assertRaises(Exception):
Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
return
开发者ID:vaddanak,项目名称:challenges,代码行数:51,代码来源:test_project_limits.py
示例10: test_01_snapshot_data_disk
def test_01_snapshot_data_disk(self):
"""Test Snapshot Data Disk
"""
volume = list_volumes(
self.apiclient,
virtualmachineid=self.virtual_machine_with_disk.id,
type='DATADISK',
listall=True
)
self.assertEqual(
isinstance(volume, list),
True,
"Check list response returns a valid list"
)
self.debug("Creating a Snapshot from data volume: %s" % volume[0].id)
snapshot = Snapshot.create(
self.apiclient,
volume[0].id,
account=self.account.name,
domainid=self.account.domainid,
asyncbackup=True
)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list item call"
)
self.assertEqual(
snapshots[0].id,
snapshot.id,
"Check resource id in list resources call"
)
self.assertEqual(
snapshot.state,
"BackingUp",
"Check resource state in list resources call"
)
return
开发者ID:Accelerite,项目名称:cloudstack,代码行数:49,代码来源:test_separate_backup_from_snapshot.py
示例11: _validate_storage
def _validate_storage(self, storage, vm):
list_volumes_response = list_volumes(
self.apiClient, virtualmachineid=vm.id, listall=True)
self.assertNotEqual(
list_volumes_response, None,
"'list_volumes_response' should not be equal to 'None'.")
for volume in list_volumes_response:
if volume.type.upper() == "ROOT":
volumeData = volume
self.assertEqual(volume.storage, storage.name,
"Volume not created for VM " + str(vm.id))
#Verify in cloudstack
storage_pools_response = list_storage_pools(
self.apiClient, id=storage.id)
self.assertEqual(
int(volumeData.size),
int(storage_pools_response[0].disksizeused),
"Allocated disk sizes are not same in volumes and primary stoarge")
#Verify in datera
datera_primarystorage_name = "cloudstack-" + storage.id
for instance in self.datera_api.app_instances.list():
if instance['name'] == datera_primarystorage_name:
app_instance_response = instance
app_instance_response_disk = (
app_instance_response['storage_instances']
['storage-1']['volumes']['volume-1']
['capacity_in_use'] * 1073741824)
self.assertEqual(
int(volumeData.size),
int(app_instance_response_disk),
"App instance usage size is incorrect")
#Verify in xen server
for key, value in self.xen_session.xenapi.SR.get_all_records().items():
if value['name_description'] == storage.id:
xen_server_response = value
self.assertEqual(
int(xen_server_response['physical_utilisation']),
int(volumeData.size),
"Allocated disk sizes is incorrect in xenserver")
开发者ID:Datera,项目名称:cloudstack-driver,代码行数:45,代码来源:test_volumes.py
示例12: test_06_create_snapshots_in_project
def test_06_create_snapshots_in_project(self):
"""Test create snapshots in project
"""
# Validate the following
# 1. Create a project
# 2. Add some snapshots to the project
# 3. Verify snapshot created inside project can only be used in inside
# the project
self.debug("Deploying VM for Project: %s" % self.project.id)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id,
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")
# Get the Root disk of VM
volumes = list_volumes(self.apiclient, projectid=self.project.id, type="ROOT", listall=True)
self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid data")
self.debug("Creating snapshot from volume: %s" % volumes[0].id)
# Create a snapshot from the ROOTDISK
snapshot = Snapshot.create(self.apiclient, volumes[0].id, projectid=self.project.id)
self.cleanup.append(snapshot)
# Verify Snapshot state
self.assertEqual(
snapshot.state in ["BackedUp", "CreatedOnPrimary", "Allocated"],
True,
"Check Snapshot state is in one of the mentioned possible states, \
It is currently: %s"
% snapshot.state,
)
snapshots = Snapshot.list(self.apiclient, account=self.account.name, domainid=self.account.domainid)
self.assertEqual(snapshots, None, "Snapshots should not be available outside the project")
return
开发者ID:maksimov,项目名称:cloudstack,代码行数:41,代码来源:test_project_resources.py
示例13: test_04_public_template_use_in_project
def test_04_public_template_use_in_project(self):
"""Test Templates creation in projects
"""
# 1. Create a project
# 2. Verify Public templates can be used without any restriction
# 3. Verify that template created in project can be used in project
# without any restrictions
try:
self.debug("Deploying VM for with public template: %s" % self.template.id)
virtual_machine_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
projectid=self.project.id,
)
self.cleanup.append(virtual_machine_1)
# Verify VM state
self.assertEqual(virtual_machine_1.state, "Running", "Check VM state is Running or not")
virtual_machine_1.stop(self.apiclient)
# Get the Root disk of VM
volumes = list_volumes(self.apiclient, projectid=self.project.id, type="ROOT", listall=True)
self.assertEqual(isinstance(volumes, list), True, "Check for list volume response return valid data")
volume = volumes[0]
self.debug("Creating template from volume: %s" % volume.id)
# Create a template from the ROOTDISK
template_1 = Template.create(
self.apiclient, self.services["template"], volumeid=volume.id, projectid=self.project.id
)
self.cleanup.append(template_1)
# Verify Template state
self.assertEqual(template_1.isready, True, "Check Template is in ready state or not")
except Exception as e:
self.fail("Exception occured: %s" % e)
return
开发者ID:maksimov,项目名称:cloudstack,代码行数:38,代码来源:test_project_resources.py
示例14: create_vm
def create_vm(self,
account,
domain,
isRunning=False,
project =None,
limit =None,
pfrule =False,
lbrule =None,
natrule =None,
volume =None,
snapshot =False):
#TODO: Implemnt pfrule/lbrule/natrule
self.debug("Deploying instance in the account: %s" % account.name)
self.virtual_machine = VirtualMachine.create(self.apiclient,
self.services["virtual_machine"],
accountid=account.name,
domainid=domain.id,
serviceofferingid=self.service_offering.id,
mode=self.zone.networktype if pfrule else 'basic',
projectid=project.id if project else None)
self.debug("Deployed instance in account: %s" % account.name)
list_virtual_machines(self.apiclient,
id=self.virtual_machine.id)
if snapshot:
volumes = list_volumes(self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='ROOT',
listall=True)
self.snapshot = Snapshot.create(self.apiclient,
volumes[0].id,
account=account.name,
domainid=account.domainid)
if volume:
self.virtual_machine.attach_volume(self.apiclient,
volume)
if not isRunning:
self.virtual_machine.stop(self.apiclient)
self.cleanup.append(self.virtual_machine)
开发者ID:PCextreme,项目名称:cloudstack,代码行数:38,代码来源:test_assign_vm.py
示例15: test_12_move_across_subdomain_vm_volumes
def test_12_move_across_subdomain_vm_volumes(self):
"""Test as domain admin, stop a VM from subdomain1 and attempt to move it to subdomain2
"""
# Validate the following:
# 1. deploy VM in sub subdomain1 with volumes.
# 3. assignVirtualMachine to subdomain2
userapiclient = self.testClient.getUserApiClient(UserName=self.sdomain_account_user1['account'].name,
DomainName=self.sdomain_account_user1['domain'].name,
type=2)
self.create_vm(self.sdomain_account_user1['account'], self.sdomain_account_user1['domain'],volume=self.sdomain_account_user1['volume'])
self.assertRaises(Exception, self.virtual_machine.assign_virtual_machine, userapiclient, self.sdomain_account_user2['account'].name ,self.sdomain_account_user2['domain'].id)
# Check all volumes attached to same VM
list_volume_response = list_volumes(self.apiclient,
virtualmachineid=self.virtual_machine.id,
type='DATADISK',
listall=True)
self.assertEqual(isinstance(list_volume_response, list),
True,
"Check list volumes response for valid list")
self.assertNotEqual(list_volume_response[0].domainid, self.sdomain_account_user2['domain'].id, "Volume ownership not changed.")
self.virtual_machine.detach_volume(self.apiclient,
self.sdomain_account_user1['volume'])
开发者ID:PCextreme,项目名称:cloudstack,代码行数:23,代码来源:test_assign_vm.py
示例16: _get_root_volume
def _get_root_volume(self, vm):
list_volumes_response = list_volumes(
self.apiClient,
virtualmachineid=vm.id,
listall=True
)
self.assertNotEqual(
list_volumes_response,
None,
"'list_volumes_response' should not be equal to 'None'."
)
self.assertEqual(
len(list_volumes_response) > 0,
True,
"'len(list_volumes_response)' should be greater than 0."
)
for volume in list_volumes_response:
if volume.type.upper() == "ROOT":
return volume
self.assert_(False, "Unable to locate the ROOT volume of the VM with the following ID: " + str(vm.id))
开发者ID:Accelerite,项目名称:cloudstack,代码行数:24,代码来源:TestAddRemoveHosts.py
示例17: setUpClass
def setUpClass(cls):
cls.testClient = super(TestAccountSnapshotClean, cls).getClsTestClient()
cls.api_client = cls.testClient.getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
cls.services['mode'] = cls.zone.networktype
cls.hypervisor = cls.testClient.getHypervisorInfo()
if cls.hypervisor.lower() in ['lxc']:
raise unittest.SkipTest("snapshots are not supported on %s" % cls.hypervisor.lower())
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
)
cls.services["server"]["zoneid"] = cls.zone.id
cls.services["template"] = template.id
cls._cleanup = []
try:
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.services["account"] = cls.account.name
if cls.zone.localstorageenabled:
cls.services["service_offering"]["storagetype"] = "local"
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["server"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
# Get the Root disk of VM
volumes = list_volumes(
cls.api_client,
virtualmachineid=cls.virtual_machine.id,
type='ROOT',
listall=True
)
volume = volumes[0]
# Create a snapshot from the ROOTDISK
cls.snapshot = Snapshot.create(cls.api_client, volume.id)
except Exception, e:
cls.tearDownClass()
unittest.SkipTest("setupClass fails for %s" % cls.__name__)
raise e
开发者ID:Accelerite,项目名称:cloudstack,代码行数:64,代码来源:test_snapshot_gc.py
示例18: test_02_host_maintenance_mode_with_activities
#.........这里部分代码省略.........
network.id
))
self.debug("Creating PF rule for IP address: %s" %
public_ip.ipaddress.ipaddress)
NATRule.create(
self.apiclient,
virtual_machine,
self.services["natrule"],
ipaddressid=public_ip.ipaddress.id
)
self.debug("Creating LB rule on IP with NAT: %s" %
public_ip.ipaddress.ipaddress)
# Create Load Balancer rule on IP already having NAT rule
lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
ipaddressid=public_ip.ipaddress.id,
accountid=self.account.name
)
self.debug("Created LB rule with ID: %s" % lb_rule.id)
# Should be able to SSH VM
try:
self.debug("SSH into VM: %s" % virtual_machine.id)
virtual_machine.get_ssh_client(
ipaddress=public_ip.ipaddress.ipaddress)
except Exception as e:
self.fail("SSH Access failed for %s: %s" %
(virtual_machine.ipaddress, e)
)
# Get the Root disk of VM
volumes = list_volumes(
self.apiclient,
virtualmachineid=virtual_machine.id,
type='ROOT',
listall=True
)
volume = volumes[0]
self.debug(
"Root volume of VM(%s): %s" % (
virtual_machine.name,
volume.name
))
# Create a snapshot from the ROOTDISK
self.debug("Creating snapshot on ROOT volume: %s" % volume.name)
snapshot = Snapshot.create(self.apiclient, volumes[0].id)
self.debug("Snapshot created: ID - %s" % snapshot.id)
snapshots = list_snapshots(
self.apiclient,
id=snapshot.id,
listall=True
)
self.assertEqual(
isinstance(snapshots, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
snapshots,
None,
"Check if result exists in list snapshots call"
)
self.assertEqual(
开发者ID:CIETstudents,项目名称:cloudstack,代码行数:67,代码来源:test_high_availability.py
示例19: setUpClass
def setUpClass(cls):
testClient = super(TestConcurrentSnapshots, cls).getClsTestClient()
cls.apiclient = testClient.getApiClient()
cls.testdata = testClient.getParsedTestDataConfig()
cls.hypervisor = cls.testClient.getHypervisorInfo()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.apiclient)
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
cls.template = get_template(
cls.apiclient,
cls.zone.id,
cls.testdata["ostype"])
cls._cleanup = []
cls.vm_pool = []
cls.snapshotSupported = True
if cls.hypervisor.lower() in ["hyperv", "lxc"]:
cls.snapshotSupported = False
return
# Set sleep time as per Snapshot Recurring Policy - HOURLY
cls.sleep_time_for_hourly_policy = 60 * 60 * 1
cls.mgtSvrDetails = cls.config.__dict__["mgtSvr"][0].__dict__
try:
# Create an account
cls.account = Account.create(
cls.apiclient,
cls.testdata["account"],
domainid=cls.domain.id
)
cls._cleanup.append(cls.account)
# Create user api client of the account
cls.userapiclient = testClient.getUserApiClient(
UserName=cls.account.name,
DomainName=cls.account.domain
)
# Create Service offering
cls.service_offering = ServiceOffering.create(
cls.apiclient,
cls.testdata["service_offering"],
)
cls._cleanup.append(cls.service_offering)
for i in range(4):
cls.vm = VirtualMachine.create(
cls.apiclient,
cls.testdata["small"],
templateid=cls.template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id,
zoneid=cls.zone.id,
mode=cls.zone.networktype
)
cls.vm_pool.append(cls.vm)
cls._cleanup.append(cls.vm)
cls.checksum_pool = []
cls.root_pool = []
cls.snapshot_pool = []
cls.rec_policy_pool = []
for vm in cls.vm_pool:
root_volumes = list_volumes(
cls.apiclient,
virtualmachineid=vm.id,
type='ROOT',
listall=True
)
checksum_root = createChecksum(
cls.testdata,
vm,
root_volumes[0],
"rootdiskdevice")
cls.checksum_pool.append(checksum_root)
cls.root_pool.append(root_volumes[0])
try:
cls.pools = StoragePool.list(cls.apiclient, zoneid=cls.zone.id)
except Exception as e:
raise unittest.SkipTest(e)
except Exception as e:
cls.tearDownClass()
raise e
return
开发者ID:Accelerite,项目名称:cloudstack,代码行数:95,代码来源:testpath_volume_cuncurrent_snapshots.py
示例20: _get_updated_cs_volume
def _get_updated_cs_volume(self, cs_volume_id):
return list_volumes(self.apiClient, listall=True, id=cs_volume_id)[0]
开发者ID:milamberspace,项目名称:cloudstack,代码行数:2,代码来源:TestVMMigrationWithStorage.py
注:本文中的marvin.lib.common.list_volumes函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论