本文整理汇总了Python中test_stub.create_vm函数的典型用法代码示例。如果您正苦于以下问题:Python create_vm函数的具体用法?Python create_vm怎么用?Python create_vm使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了create_vm函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test
def test():
global vm, vm2, image_uuid
test_util.test_dsc('create VM with setting password')
for root_password in root_password_list:
test_util.test_dsc("root_password: \"%s\"" %(root_password))
vm = test_stub.create_vm(vm_name = 'c7-vm-no-sys-tag', image_name = "imageName_i_c7_no_tag")
backup_storage_list = test_lib.lib_get_backup_storage_list_by_vm(vm.vm)
for bs in backup_storage_list:
if bs.type == inventory.IMAGE_STORE_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.SFTP_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.CEPH_BACKUP_STORAGE_TYPE:
break
else:
vm.destroy()
test_util.test_skip('Not find image store type backup storage.')
vm.check()
#add tag to vm
tag_ops.create_system_tag('VmInstanceVO', vm.get_vm().uuid, "qemuga")
vm_ops.change_vm_password(vm.get_vm().uuid, "root", root_password)
#create image by the vm with tag
vm_root_volume_inv = test_lib.lib_get_root_volume(vm.get_vm())
root_volume_uuid = vm_root_volume_inv.uuid
image_option1 = test_util.ImageOption()
image_option1.set_root_volume_uuid(root_volume_uuid)
image_option1.set_name('add_tag_vm_to_image')
image_option1.set_format('qcow2')
image_option1.set_backup_storage_uuid_list([bs.uuid])
vm.stop()
image = img_ops.create_root_volume_template(image_option1)
#create vm by new image
vm2 = test_stub.create_vm(vm_name = 'c7-vm-add-tag-from-previous-vm', image_name = "add_tag_vm_to_image")
if not test_lib.lib_check_login_in_vm(vm2.get_vm(), "root", root_password):
test_util.test_fail("create vm with user:%s password: %s failed", "root", root_password)
vm_ops.change_vm_password(vm2.get_vm().uuid, "root", root_password)
image_uuid = image.uuid
if not image_uuid:
img_ops.delete_image(image_uuid)
img_ops.expunge_image(image_uuid)
test_util.test_pass('add system tag on a no system tag image test passed')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:54,代码来源:test_no_tag_add_tag_chg_vm_passwd_c7.py
示例2: test
def test():
from vncdotool import api
global vm1
global vm2
vm1 = test_stub.create_vm()
vm1.check()
console = test_lib.lib_get_vm_console_address(vm1.get_vm().uuid)
test_util.test_logger('[vm:] %s console is on %s:%s' % (vm1.get_vm().uuid, console.hostIp, console.port))
display = str(int(console.port)-5900)
test_lib.lib_set_vm_console_password(vm1.get_vm().uuid, password1)
test_util.test_logger('set [vm:] %s console with password %s' % (vm1.get_vm().uuid, password1))
vm1.reboot()
test_lib.lib_delete_vm_console_password(vm1.get_vm().uuid)
test_util.test_logger('delete [vm:] %s console password after reboot' % (vm1.get_vm().uuid))
vm1.reboot()
if not test_lib.lib_wait_target_up(console.hostIp, console.port, timeout=60):
test_util.test_fail('[vm:] %s console on %s:%s is not connectable' % (vm1.get_vm().uuid, console.hostIp, console.port))
try:
client = api.connect(console.hostIp+":"+display)
client.keyPress('k')
test_util.test_logger('[vm:] %s console on %s:%s is connectable without password' % (vm1.get_vm().uuid, console.hostIp, console.port))
except:
test_util.test_fail('[vm:] %s console on %s:%s is not connectable without password' % (vm1.get_vm().uuid, console.hostIp, console.port))
vm2 = test_stub.create_vm()
vm2.check()
console = test_lib.lib_get_vm_console_address(vm2.get_vm().uuid)
test_util.test_logger('[vm:] %s console is on %s:%s' % (vm2.get_vm().uuid, console.hostIp, console.port))
display = str(int(console.port)-5900)
test_lib.lib_set_vm_console_password(vm2.get_vm().uuid, password1)
test_util.test_logger('set [vm:] %s console with password %s' % (vm2.get_vm().uuid, password1))
test_lib.lib_delete_vm_console_password(vm2.get_vm().uuid)
test_util.test_logger('delete [vm:] %s console password without reboot' % (vm2.get_vm().uuid))
vm1.reboot()
if not test_lib.lib_wait_target_up(console.hostIp, console.port, timeout=60):
test_util.test_fail('[vm:] %s console on %s:%s is not connectable' % (vm2.get_vm().uuid, console.hostIp, console.port))
try:
client = api.connect(console.hostIp+":"+display)
client.keyPress('k')
test_util.test_logger('[vm:] %s console on %s:%s is connectable without password' % (vm2.get_vm().uuid, console.hostIp, console.port))
except:
test_util.test_fail('[vm:] %s console on %s:%s is not connectable without password' % (vm2.get_vm().uuid, console.hostIp, console.port))
vm1.destroy()
vm2.destroy()
test_util.test_pass('Delete VM Console Password Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:53,代码来源:test_delete_vm_console_password.py
示例3: test
def test():
'''
Test Description:
Test add volume with negative test.
Resource required:
2 test VMs with additional 1*10M data volume.
'''
test_util.test_dsc('Create test vm and check')
vm = test_stub.create_vm()
test_obj_dict.add_vm(vm)
vm.check()
test_util.test_dsc('Create volume and check')
volume = test_stub.create_volume()
test_obj_dict.add_volume(volume)
volume.check()
test_util.test_dsc('Attach volume and check')
volume.attach(vm)
volume.check()
test_util.test_dsc('Doing negative test. Try to reattach same volume to vm again.')
try:
volume.attach(vm)
except:
test_util.test_logger('Catch expected exception. [volume:] %s can not be attached to [vm:] %s twice.' % (volume.volume.uuid, vm.vm.uuid))
test_util.test_dsc('Doing negative test. Try to delete an attached data volume.')
vm2 = test_stub.create_vm()
test_obj_dict.add_vm(vm2)
test_util.test_dsc('Doing negative test. Try to attach an attached volume to 2nd vm.')
try:
volume.attach(vm2)
except:
test_util.test_logger('Catch expected exception. [volume:] %s can not be attached to another [vm:] %s, as it is already attached to [vm:] %s.' % (volume.volume.uuid, vm2.vm.uuid, vm.vm.uuid))
volume.check()
try:
volume.delete()
except:
test_util.test_fail('Catch wrong logic: [volume:] %s can not be deleted, when it is assigned to [vm:] %s' % (volume.volume.uuid, vm.vm.uuid))
test_obj_dict.rm_volume(volume)
vm.destroy()
test_obj_dict.rm_vm(vm)
vm2.destroy()
test_obj_dict.rm_vm(vm2)
volume.check()
test_util.test_pass('Data Volume Negative Test Success')
return True
test_util.test_fail('Catch wrong logic: [volume:] %s is attached to [vm:] %s again, although it is already attached to [vm:] %s .' % (volume.volume.uuid, vm2.vm.uuid, vm.vm.uuid))
test_util.test_fail('Catch wrong logic: [volume:] %s is attached to [vm:] %s twice.' % (volume.volume.uuid, vm.vm.uuid))
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:51,代码来源:test_add_volume_negative.py
示例4: test
def test():
global vm
vm = test_stub.create_vm()
vm.check()
vm.suspend()
vm.destroy()
vm.check()
vm = test_stub.create_vm()
vm.check()
vm.suspend()
vm.stop()
vm.check()
vm.destroy()
vm.check()
test_util.test_pass('Suspend VM Test Success')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:15,代码来源:test_suspended_vm_ops.py
示例5: test
def test():
global vm
test_util.test_dsc('create VM with setting password')
for root_password in root_password_list:
test_util.test_dsc("root_password: \"%s\"" %(root_password))
#vm = test_stub.create_vm(vm_name = 'c7-vm-no-sys-tag', image_name = "imageName_i_c7_no_tag", root_password=root_password)
vm = test_stub.create_vm(vm_name = 'c7-vm-no-sys-tag', image_name = "imageName_i_c7_no_tag")
backup_storage_list = test_lib.lib_get_backup_storage_list_by_vm(vm.vm)
for bs in backup_storage_list:
if bs.type == inventory.IMAGE_STORE_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.SFTP_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.CEPH_BACKUP_STORAGE_TYPE:
break
else:
vm.destroy()
test_util.test_skip('Not find image store type backup storage.')
#if not test_lib.lib_check_login_in_vm(vm.get_vm(), "root", root_password):
# test_util.test_fail("create vm with root password: %s failed", root_password)
# stop vm && change vm password
#vm.stop()
vm.check()
try:
vm_ops.change_vm_password(vm.get_vm().uuid, "root", root_password)
except Exception, e:
if "CreateSystemTag" in str(e):
test_util.test_pass("negative test of change a no system tag image passed.")
else:
test_util.test_fail("negative test failed with not expected log: %s", str(e))
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:34,代码来源:test_no_tag_crt_chg_vm_passwd_c7.py
示例6: test
def test():
global vm
# Create VM
vm = test_stub.create_vm()
vm.check()
# Create Virtual BMC
test_stub.create_vbmc(vm = vm, port = 6230)
# Create Chassis
chassis = os.environ.get('ipminame')
test_stub.create_chassis(chassis_name = chassis)
test_stub.hack_ks(port = 6230)
chassis_uuid = test_lib.lib_get_chassis_by_name(chassis).uuid
bare_operations.power_off_baremetal(chassis_uuid)
status = bare_operations.get_power_status(chassis_uuid)
if status.status == "Chassis Power is off":
bare_operations.power_reset_baremetal(chassis_uuid)
status = bare_operations.get_power_status(chassis_uuid)
if status.status != "Chassis Power is on":
test_util.test_fail('Failed to power reset chassis')
else:
test_util.test_fail('Failed to power off chassis')
test_stub.delete_vbmc(vm = vm)
bare_operations.delete_chassis(chassis_uuid)
vm.destroy()
test_util.test_pass('Test Power Reset Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:27,代码来源:test_power_reset.py
示例7: test
def test():
global vm
# Create VM
vm = test_stub.create_vm()
vm.check()
# Create Virtual BMC
test_stub.create_vbmc(vm = vm, port = 6230)
# Create Chassis
chassis = os.environ.get('ipminame')
test_stub.create_chassis(chassis_name = chassis)
# Update Chassis
chassis_uuid = test_lib.lib_get_chassis_by_name(chassis).uuid
ipmiaddress = os.environ.get('ipmiaddress')
ipmiuser = os.environ.get('ipmiusername')
ipmipasswd = os.environ.get('ipmipassword')
test_stub.delete_vbmc(vm = vm)
test_stub.create_vbmc(vm = vm, port = 6231)
bare_operations.update_chassis(chassis_uuid=chassis_uuid, address=ipmiaddress, username=ipmiuser, password=ipmipasswd, port=6231)
test_stub.hack_ks(port = 6231)
# First time Provision
bare_operations.provision_baremetal(chassis_uuid)
hwinfo = test_stub.check_hwinfo(chassis_uuid)
if not hwinfo:
test_util.test_fail('Fail to get hardware info during the first provision')
new_port = test_lib.lib_get_chassis_by_name(chassis).ipmiPort
if new_port != "6231":
test_util.test_fail("Update Chassis's Port failed: port=%s" % new_port)
test_stub.delete_vbmc(vm = vm)
bare_operations.delete_chassis(chassis_uuid)
vm.destroy()
test_util.test_pass('Update Chassis Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:31,代码来源:update_chassis.py
示例8: test
def test():
global vm
import signal
def handler(signum, frame):
raise Exception()
signal.signal(signal.SIGALRM, handler)
signal.alarm(30)
boot_option_picture = os.environ.get('bootOptionPicture')
vm = test_stub.create_vm()
console = test_lib.lib_get_vm_console_address(vm.get_vm().uuid)
test_util.test_logger('[vm:] %s console is on %s:%s' % (vm.get_vm().uuid, console.hostIp, console.port))
display = str(int(console.port)-5900)
client = api.connect(console.hostIp+":"+display)
time.sleep(2)
client.keyPress('esc')
#client.captureRegion('/root/boot.png',0,100,600,600)
client.expectRegion(boot_option_picture,0,100)
test_util.test_logger('[vm:] %s support boot option' % (vm.get_vm().uuid))
# except:
# test_util.test_fail('[vm:] %s is expected to support boot option' % (vm.get_vm().uuid))
vm.destroy()
test_util.test_pass('Support VM Boot Option Test Success')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:27,代码来源:test_support_vm_boot_option.py
示例9: test
def test():
global vm
test_util.test_dsc('create VM with setting password')
for root_password in root_password_list:
test_util.test_dsc("root_password: \"%s\"" %(root_password))
vm = test_stub.create_vm(vm_name = 'u13-vm', image_name = "imageName_i_u13", root_password=root_password)
backup_storage_list = test_lib.lib_get_backup_storage_list_by_vm(vm.vm)
for bs in backup_storage_list:
if bs.type == inventory.IMAGE_STORE_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.SFTP_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.CEPH_BACKUP_STORAGE_TYPE:
break
else:
vm.destroy()
test_util.test_skip('Not find image store type backup storage.')
if not test_lib.lib_check_login_in_vm(vm.get_vm(), "root", root_password):
test_util.test_fail("create vm with root password: %s failed", root_password)
vm.destroy()
vm.check()
vm.expunge()
vm.check()
test_util.test_pass('Set password when VM is creating is successful.')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:29,代码来源:test_crt_vm_passwd_u13.py
示例10: test
def test():
global vm
pxe_uuid = test_lib.lib_get_pxe_by_name(os.environ.get('pxename')).uuid
# Create VM
vm = test_stub.create_vm()
vm.check()
# Stop PXE
bare_operations.stop_pxe(pxe_uuid)
if test_lib.lib_get_pxe_by_name(os.environ.get('pxename')).status != "Stopped":
test_util.test_fail('Fail to stop PXE')
# Start PXE
bare_operations.start_pxe(pxe_uuid)
if test_lib.lib_get_pxe_by_name(os.environ.get('pxename')).status != "Running":
test_util.test_fail('Fail to start PXE')
# Create Virtual BMC
test_stub.create_vbmc(vm = vm, port = 6230)
# Create Chassis
chassis = os.environ.get('ipminame')
test_stub.create_chassis(chassis_name = chassis)
test_stub.hack_ks(port = 6230)
chassis_uuid = test_lib.lib_get_chassis_by_name(chassis).uuid
# First time Provision
bare_operations.provision_baremetal(chassis_uuid)
hwinfo = test_stub.check_hwinfo(chassis_uuid)
if not hwinfo:
test_util.test_fail('Fail to get hardware info during the first provision')
test_stub.delete_vbmc(vm = vm)
bare_operations.delete_chassis(chassis_uuid)
vm.destroy()
test_util.test_pass('Start/Stop PXE Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:30,代码来源:test_stop_pxe.py
示例11: test
def test():
global vm
pxe_uuid = test_lib.lib_get_pxe_by_name(os.environ.get('pxename')).uuid
# Create VM
vm = test_stub.create_vm()
vm.check()
# Create Virtual BMC
test_stub.create_vbmc(vm=vm, port=6230)
# Create Chassis
chassis = os.environ.get('ipminame')
test_stub.create_chassis(chassis_name=chassis)
test_stub.hack_ks(port=6230)
chassis_uuid = test_lib.lib_get_chassis_by_name(chassis).uuid
# First time Provision
bare_operations.provision_baremetal(chassis_uuid)
bare_operations.stop_pxe(pxe_uuid)
if not test_stub.verify_chassis_status(chassis_uuid, "PxeBootFailed"):
test_util.test_fail(
'Chassis failed to get PxeBootFailed after the first provision')
bare_operations.start_pxe(pxe_uuid)
if test_lib.lib_get_pxe_by_name(os.environ.get('pxename')).status != "Running":
test_util.test_fail('Fail to start PXE')
test_stub.delete_vbmc(vm=vm)
bare_operations.delete_chassis(chassis_uuid)
vm.destroy()
test_util.test_pass('Create chassis Test Success')
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:26,代码来源:test_fail_to_pxe.py
示例12: test
def test():
global test_obj_dict
test_util.test_dsc('Create test vm and check')
vm = test_stub.create_vm()
test_obj_dict.add_vm(vm)
vm.check()
test_util.test_dsc('Create volume and check')
volume = test_stub.create_volume()
test_obj_dict.add_volume(volume)
volume.check()
volume.attach(vm)
volume.check()
test_util.test_dsc('Reboot vm and check volume again.')
vm.reboot()
vm.check()
volume.check()
volume.detach()
volume.check()
volume.delete()
volume.check()
test_obj_dict.rm_volume(volume)
vm.destroy()
test_util.test_pass('Create Data Volume for VM Test Success')
开发者ID:chancelq,项目名称:zstack-woodpecker,代码行数:28,代码来源:test_add_volume_reboot_vm.py
示例13: test
def test():
global test_obj_dict
test_util.test_dsc("Create test vm and check")
vm = test_stub.create_vm()
test_obj_dict.add_vm(vm)
vm.check()
test_util.test_dsc("Create volume and check")
volume = test_stub.create_volume()
test_obj_dict.add_volume(volume)
volume.check()
test_util.test_dsc("Attach volume and check")
volume.attach(vm)
volume.check()
test_util.test_dsc("Detach volume and check")
volume.detach()
volume.check()
test_util.test_dsc("Delete volume and check")
volume.delete()
volume.check()
test_obj_dict.rm_volume(volume)
vm.destroy()
vm.check()
test_util.test_pass("Create Data Volume for VM Test Success")
开发者ID:lstfyt,项目名称:zstack-woodpecker,代码行数:30,代码来源:test_add_volume.py
示例14: test
def test():
global vm
global trigger
global media
global trigger_action
vm = test_stub.create_vm()
vm.check()
vm_ip = vm.get_vm().vmNics[0].ip
vm_uuid = vm.get_vm().uuid
vm_username = os.environ.get('Vm_Username')
vm_password = os.environ.get('Vm_Password')
vm_port = os.environ.get('Vm_Sshport')
test_item = "host.network.io"
resource_type = "HostVO"
vm_monitor_item = test_stub.get_monitor_item(resource_type)
if test_item not in vm_monitor_item:
test_util.test_fail('%s is not available for monitor' % test_item)
hosts = res_ops.get_resource(res_ops.HOST)
host = hosts[0]
#duration = 60
duration = 30
#expression = "host.network.io{direction=\"tx\"} > 2000"
expression = "host.network.io{direction=\"tx\"} > 100"
monitor_trigger = mon_ops.create_monitor_trigger(host.uuid, duration, expression)
send_email = test_stub.create_email_media()
media = send_email.uuid
trigger_action_name = "trigger_"+ ''.join(map(lambda xx:(hex(ord(xx))[2:]),os.urandom(8)))
trigger = monitor_trigger.uuid
receive_email = os.environ.get('receive_email')
monitor_trigger_action = mon_ops.create_email_monitor_trigger_action(trigger_action_name, send_email.uuid, trigger.split(), receive_email)
trigger_action = monitor_trigger_action.uuid
host.password = os.environ.get('hostPassword')
ssh_cmd = test_stub.ssh_cmd_line(host.managementIp, host.username, host.password, port=int(host.sshPort))
t = threading.Thread(target=test_stub.run_network_tx,args=(ssh_cmd,vm_ip,))
t.start()
time.sleep(50)
test_stub.kill(ssh_cmd)
status_problem, status_ok = test_stub.query_trigger_in_loop(trigger,50)
test_util.action_logger('Trigger old status: %s triggered. Trigger new status: %s recovered' % (status_problem, status_ok ))
if status_problem != 1 or status_ok != 1:
test_util.test_fail('%s Monitor Test failed, expected Problem or OK status not triggered' % test_item)
mail_list = test_stub.receive_email()
keywords = "fired"
mail_flag = test_stub.check_email(mail_list, keywords, trigger, host.uuid)
if mail_flag == 0:
test_util.test_fail('Failed to Get Target: %s for: %s Trigger Mail' % (host.uuid, test_item))
mon_ops.delete_monitor_trigger_action(trigger_action)
mon_ops.delete_monitor_trigger(trigger)
mon_ops.delete_email_media(media)
vm.destroy()
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:59,代码来源:alert_host_network_tx.py
示例15: create_vm_wrapper
def create_vm_wrapper(vm_name, image_name, root_password):
global vms
vm = test_stub.create_vm(vm_name=vm_name, image_name=image_name, root_password=root_password)
if not vm:
test_util.test_fail("failed to create vm")
else:
vms.append(vm)
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:8,代码来源:test_bath_crt_vm_passwd_c7.py
示例16: test
def test():
global vm, exist_users
test_util.test_dsc('Change unexisted user password test')
vm = test_stub.create_vm(vm_name = 'cknewusrvmpswd-u13-64', image_name = "imageName_i_u13")
vm.check()
backup_storage_list = test_lib.lib_get_backup_storage_list_by_vm(vm.vm)
for bs in backup_storage_list:
if bs.type == inventory.IMAGE_STORE_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.SFTP_BACKUP_STORAGE_TYPE:
break
if bs.type == inventory.CEPH_BACKUP_STORAGE_TYPE:
break
else:
vm.destroy()
test_util.test_skip('Not find image store type backup storage.')
for (usr,passwd) in zip(users, passwds):
if usr not in exist_users:
test_util.test_logger("un-existed user:%s change vm password" %(usr))
#if the user is not existed, it should report
#try:
# vm_ops.change_vm_password(vm.get_vm().uuid, usr, passwd, skip_stopped_vm = None, session_uuid = None)
#except Exception,e:
# test_util.test_logger("unexisted user change vm password exception is %s" %(str(e)))
# normal_failed_string = "not exist"
# if normal_failed_string in str(e):
# test_util.test_logger("unexisted user return correct, create a the user for it.")
#else:
# test_util.test_fail("user not exist in this OS, it should not raise exception, but return a failure.")
test_stub.create_user_in_vm(vm.get_vm(), usr, passwd)
exist_users.append(usr)
#When vm is running:
vm_ops.change_vm_password(vm.get_vm().uuid, usr, passwd, skip_stopped_vm = None, session_uuid = None)
if not test_lib.lib_check_login_in_vm(vm.get_vm(), usr, passwd):
test_util.test_fail("create vm with user:%s password: %s failed", usr, passwd)
#When vm is stopped:
#vm.stop()
vm_ops.change_vm_password(vm.get_vm().uuid, "root", test_stub.original_root_password)
#vm.start()
vm.check()
vm.destroy()
vm.check()
vm.expunge()
vm.check()
test_util.test_pass('Set password when VM is creating is successful.')
开发者ID:mrwangxc,项目名称:zstack-woodpecker,代码行数:58,代码来源:test_chg_unexist_usr_passwd_u13.py
示例17: test
def test():
global vm
vm = test_stub.create_vm()
vm.check()
vm.stop()
vm.check()
vm.destroy()
vm.check()
test_util.test_pass("Stop VM Test Success")
开发者ID:lstfyt,项目名称:zstack-woodpecker,代码行数:9,代码来源:test_stop_vm.py
示例18: test
def test():
global vm
vm = test_stub.create_vm()
vm.check()
vm.reboot()
vm.check()
vm.destroy()
vm.check()
test_util.test_pass('Reboot VM Test Success')
开发者ID:KevinDavidMitnick,项目名称:zstack-woodpecker,代码行数:9,代码来源:test_reboot_vm.py
示例19: test
def test():
global vm
global trigger
global media
global trigger_action
vm = test_stub.create_vm()
vm.check()
vm_ip = vm.get_vm().vmNics[0].ip
vm_uuid = vm.get_vm().uuid
vm_username = os.environ.get('Vm_Username')
vm_password = os.environ.get('Vm_Password')
vm_port = os.environ.get('Vm_Sshport')
test_item = "vm.disk.io"
resource_type = "VmInstanceVO"
vm_monitor_item = test_stub.get_monitor_item(resource_type)
if test_item not in vm_monitor_item:
test_util.test_fail('%s is not available for monitor' % test_item)
#duration = 60
duration = 20
#expression = "vm.disk.io{type=\"iops\", direction=\"write\"} > 100.0"
expression = "vm.disk.io{type=\"iops\", direction=\"write\"} > 10.0"
monitor_trigger = mon_ops.create_monitor_trigger(vm_uuid, duration, expression)
send_email = test_stub.create_email_media()
media = send_email.uuid
trigger_action_name = "trigger"+ ''.join(map(lambda xx:(hex(ord(xx))[2:]),os.urandom(8)))
trigger = monitor_trigger.uuid
receive_email = os.environ.get('receive_email')
monitor_trigger_action = mon_ops.create_email_monitor_trigger_action(trigger_action_name, send_email.uuid, trigger.split(), receive_email)
trigger_action = monitor_trigger_action.uuid
ssh_cmd = test_stub.ssh_cmd_line(vm_ip, vm_username, vm_password, vm_port)
rw = 'write'
t = threading.Thread(target=test_stub.run_disk_load1,args=(ssh_cmd, rw,))
t.start()
time.sleep(90)
test_stub.kill(ssh_cmd)
status_problem, status_ok = test_stub.query_trigger_in_loop(trigger,50)
test_util.action_logger('Trigger old status: %s triggered. Trigger new status: %s recovered' % (status_problem, status_ok ))
if status_problem != 1 or status_ok != 1:
test_util.test_fail('%s Monitor Test failed, expected Problem or OK status not triggered' % test_item)
mail_list = test_stub.receive_email()
keywords = "fired"
mail_flag = test_stub.check_email(mail_list, keywords, trigger, vm_uuid)
if mail_flag == 0:
test_util.test_fail('Failed to Get Target: %s for: %s Trigger Mail' % (vm_uuid, test_item))
mon_ops.delete_monitor_trigger_action(trigger_action)
mon_ops.delete_monitor_trigger(trigger)
mon_ops.delete_email_media(media)
vm.destroy()
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:56,代码来源:alert_vm_disk_write_iops.py
示例20: test
def test():
global vm
vm = test_stub.create_vm()
vm.check()
cmd = 'grep "^host_uuid" /etc/libvirt/libvirtd.conf'
host = test_lib.lib_find_host_by_vm(vm.get_vm())
if not test_lib.lib_execute_ssh_cmd(host, "root", "password", cmd):
test_util.test_fail("Check host_uuid in libvirtd.conf failed")
test_util.test_pass('Check host_uuid in libvirtd Success')
vm.destroy()
开发者ID:zstackorg,项目名称:zstack-woodpecker,代码行数:10,代码来源:test_check_libvirtd_conf.py
注:本文中的test_stub.create_vm函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论