本文整理汇总了Python中vdsm.common.response.error函数的典型用法代码示例。如果您正苦于以下问题:Python error函数的具体用法?Python error怎么用?Python error使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了error函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: unregister
def unregister(uuids):
try:
uuids = [str(uuid.UUID(s)) for s in uuids]
except ValueError as e:
logging.warning("Attempt to unregister invalid uuid %s: %s" %
(uuids, e))
return response.error("secretBadRequestErr")
con = libvirtconnection.get()
try:
for sec_uuid in uuids:
logging.info("Unregistering secret %r", sec_uuid)
try:
virsecret = con.secretLookupByUUIDString(sec_uuid)
except libvirt.libvirtError as e:
if e.get_error_code() != libvirt.VIR_ERR_NO_SECRET:
raise
logging.debug("No such secret %r", sec_uuid)
else:
virsecret.undefine()
except libvirt.libvirtError as e:
logging.error("Could not unregister secrets: %s", e)
return response.error("secretUnregisterErr")
return response.success()
开发者ID:nirs,项目名称:vdsm,代码行数:25,代码来源:secret.py
示例2: test_diskreplicatefinish_transient_disk
def test_diskreplicatefinish_transient_disk():
src_drive = make_drive(src_drive_conf,
storage.DRIVE_SHARED_TYPE.TRANSIENT)
_vm = FakeVm([src_drive])
result = _vm.diskReplicateFinish(src_drive_conf, dst_drive_conf)
assert result == response.error("transientErr")
开发者ID:oVirt,项目名称:vdsm,代码行数:7,代码来源:diskreplicate_test.py
示例3: _recover
def _recover(self, message):
if not response.is_error(self.status):
self.status = response.error('migrateErr')
self.log.error(message)
if not self.hibernating and self._destServer is not None:
if self._vm.post_copy == PostCopyPhase.RUNNING:
# We can't recover a VM after a failed post-copy migration.
# And the destination takes care of the situation itself.
self._vm.handle_failed_post_copy(clean_vm=True)
return
try:
self._destServer.destroy(self._vm.id)
except Exception:
self.log.exception("Failed to destroy remote VM")
# if the guest was stopped before migration, we need to cont it
if self.hibernating:
self._vm.cont(ignoreStatus=True)
if self._enableGuestEvents:
self._vm.guestAgent.events.after_hibernation_failure()
elif self._enableGuestEvents:
self._vm.guestAgent.events.after_migration_failure()
# either way, migration has finished
if self._recovery:
self._vm.set_last_status(vmstatus.UP, vmstatus.MIGRATION_SOURCE)
self._recovery = False
else:
self._vm.lastStatus = vmstatus.UP
self._started = False
self._vm.send_status_event()
开发者ID:EdDev,项目名称:vdsm,代码行数:29,代码来源:migration.py
示例4: test_diskreplicatefinish_job_not_found
def test_diskreplicatefinish_job_not_found():
src_drive = make_drive(src_drive_conf)
_vm = FakeVm([src_drive])
# Passing an empty dict so 'cur' and 'end will not be found
_vm._dom = FakeDomain({})
result = _vm.diskReplicateFinish(src_drive_conf, dst_drive_conf)
assert result == response.error("replicaErr")
开发者ID:oVirt,项目名称:vdsm,代码行数:9,代码来源:diskreplicate_test.py
示例5: teardownImage
def teardownImage(self, domainId, poolId, imageId):
if imageId == TEARDOWN_ERROR_IMAGE_ID:
return response.error('teardownError')
imagepath = _vol_path(self._image_path_base, domainId, poolId, imageId)
resultpath = _vol_path(self._image_path_base, domainId, poolId,
imageId, ext='.res')
os.rename(imagepath, resultpath)
return response.success()
开发者ID:EdDev,项目名称:vdsm,代码行数:9,代码来源:seal_job_test.py
示例6: _regular_run
def _regular_run(self):
self.log.debug("Starting migration source thread")
self._recovery = False
self._update_outgoing_limit()
try:
startTime = time.time()
machineParams = self._setupRemoteMachineParams()
self._setupVdsConnection()
self._prepareGuest()
while not self._started:
try:
self.log.info("Migration semaphore: acquiring")
with SourceThread.ongoingMigrations:
self.log.info("Migration semaphore: acquired")
timeout = config.getint(
'vars', 'guest_lifecycle_event_reply_timeout')
if self.hibernating:
self._vm.guestAgent.events.before_hibernation(
wait_timeout=timeout)
elif self._enableGuestEvents:
self._vm.guestAgent.events.before_migration(
wait_timeout=timeout)
if self._migrationCanceledEvt.is_set():
self._raiseAbortError()
self.log.debug("migration semaphore acquired "
"after %d seconds",
time.time() - startTime)
migrationParams = {
'dst': self._dst,
'mode': self._mode,
'method': METHOD_ONLINE,
'dstparams': self._dstparams,
'dstqemu': self._dstqemu,
}
self._startUnderlyingMigration(
time.time(), migrationParams, machineParams
)
self._finishSuccessfully(machineParams)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_OPERATION_ABORTED:
self.status = response.error(
'migCancelErr', message='Migration canceled')
raise
except MigrationLimitExceeded:
retry_timeout = config.getint('vars',
'migration_retry_timeout')
self.log.debug("Migration destination busy. Initiating "
"retry in %d seconds.", retry_timeout)
self._migrationCanceledEvt.wait(retry_timeout)
except MigrationDestinationSetupError as e:
self._recover(str(e))
# we know what happened, no need to dump hollow stack trace
except Exception as e:
self._recover(str(e))
self.log.exception("Failed to migrate")
开发者ID:nirs,项目名称:vdsm,代码行数:56,代码来源:migration.py
示例7: register
def register(secrets, clear=False):
try:
secrets = [Secret(params) for params in secrets]
except ValueError as e:
logging.warning("Attempt to register invalid secret: %s", e)
return response.error("secretBadRequestErr")
con = libvirtconnection.get()
try:
for secret in secrets:
logging.info("Registering secret %s", secret)
secret.register(con)
if clear:
uuids = frozenset(sec.uuid for sec in secrets)
for virsecret in con.listAllSecrets():
if virsecret.UUIDString() not in uuids and _is_ovirt_secret(virsecret):
virsecret.undefine()
except libvirt.libvirtError as e:
logging.error("Could not register secret %s: %s", secret, e)
return response.error("secretRegisterErr")
return response.success()
开发者ID:nirs,项目名称:vdsm,代码行数:22,代码来源:secret.py
示例8: test_blockjobabort_failed
def test_blockjobabort_failed(monkeypatch):
def raising_blockjobabort():
raise Exception('blockJobAbort failed')
src_drive = make_drive(src_drive_conf)
dst_drive = make_drive(dst_drive_conf)
_vm = FakeVm([src_drive, dst_drive])
_vm._dom = FakeDomain({'cur': 1, 'end': 1})
monkeypatch.setattr(FakeDomain, 'blockJobAbort', raising_blockjobabort)
result = _vm.diskReplicateFinish(src_drive_conf, dst_drive_conf)
assert result == response.error("changeDisk")
开发者ID:oVirt,项目名称:vdsm,代码行数:14,代码来源:diskreplicate_test.py
示例9: testSetNumberOfVcpusFailed
def testSetNumberOfVcpusFailed(self, virt_error, vdsm_error,
error_message):
def _fail(*args):
raise_libvirt_error(virt_error, error_message)
with MonkeyPatchScope([(hooks, 'before_set_num_of_cpus',
lambda: None)]):
with fake.VM() as testvm:
dom = fake.Domain()
dom.setVcpusFlags = _fail
testvm._dom = dom
res = testvm.setNumberOfCpus(4) # random value
self.assertEqual(res, response.error(vdsm_error))
开发者ID:oVirt,项目名称:vdsm,代码行数:15,代码来源:vmoperations_test.py
示例10: start
def start(self):
# are there any available methods for power-down?
if self.chain.callbacks:
# flag for successful power-down event detection
# this flag is common for both shutdown and reboot workflows
# because we want to exit the CallbackChain in case either
# of them happens
self.event.clear()
self.chain.start()
return response.success(message=self.returnMsg)
else:
# No tools, no ACPI
return response.error(
'exist',
message='VM without ACPI or active oVirt guest agent. '
'Try Forced Shutdown.')
开发者ID:EdDev,项目名称:vdsm,代码行数:17,代码来源:vmpowerdown.py
示例11: test_unregister_libvirt_error
def test_unregister_libvirt_error(self):
def fail(uuid):
raise vmfakecon.Error(libvirt.VIR_ERR_INTERNAL_ERROR)
self.connection.secretLookupByUUIDString = fail
res = secret.unregister([str(uuid.uuid4())])
self.assertEqual(res, response.error("secretUnregisterErr"))
开发者ID:EdDev,项目名称:vdsm,代码行数:6,代码来源:vmsecret_test.py
示例12: test_unregister_validation
def test_unregister_validation(self):
res = secret.unregister(["this-is-not-a-uuid"])
self.assertEqual(res, response.error("secretBadRequestErr"))
开发者ID:EdDev,项目名称:vdsm,代码行数:3,代码来源:vmsecret_test.py
示例13: test_register_libvirt_error
def test_register_libvirt_error(self):
def fail(xml):
raise vmfakecon.Error(libvirt.VIR_ERR_INTERNAL_ERROR)
self.connection.secretDefineXML = fail
res = secret.register([make_secret()])
self.assertEqual(res, response.error("secretRegisterErr"))
开发者ID:EdDev,项目名称:vdsm,代码行数:6,代码来源:vmsecret_test.py
示例14: test_register_validation
def test_register_validation(self):
res = secret.register([{"invalid": "secret"}])
self.assertEqual(res, response.error("secretBadRequestErr"))
开发者ID:EdDev,项目名称:vdsm,代码行数:3,代码来源:vmsecret_test.py
示例15: test_abort_from_invalid_state
def test_abort_from_invalid_state(self, status, err):
job = TestingJob(status)
jobs.add(job)
res = jobs.abort(job.id)
self.assertEqual(response.error(err), res)
开发者ID:EdDev,项目名称:vdsm,代码行数:5,代码来源:jobs_test.py
示例16: test_delete_unknown_job
def test_delete_unknown_job(self):
self.assertEqual(response.error(jobs.NoSuchJob.name),
jobs.delete('foo'))
开发者ID:EdDev,项目名称:vdsm,代码行数:3,代码来源:jobs_test.py
示例17: test_abort_not_supported
def test_abort_not_supported(self):
job = jobs.Job(str(uuid.uuid4()))
job._status = jobs.STATUS.RUNNING
jobs.add(job)
self.assertEqual(response.error(jobs.AbortNotSupported.name),
jobs.abort(job.id))
开发者ID:EdDev,项目名称:vdsm,代码行数:6,代码来源:jobs_test.py
示例18: test_has_volume_leases
def test_has_volume_leases():
_vm = FakeVm([make_drive(lease_drive_conf)])
result = _vm.diskReplicateFinish(lease_drive_conf, dst_drive_conf)
assert result == response.error('noimpl')
开发者ID:oVirt,项目名称:vdsm,代码行数:4,代码来源:diskreplicate_test.py
示例19: test_lookup_error
def test_lookup_error():
_vm = FakeVm()
result = _vm.diskReplicateFinish(src_drive_conf, dst_drive_conf)
assert result == response.error('imageErr')
开发者ID:oVirt,项目名称:vdsm,代码行数:5,代码来源:diskreplicate_test.py
示例20: test_delete_active_job
def test_delete_active_job(self, status):
job = TestingJob(status)
jobs.add(job)
self.assertEqual(response.error(jobs.JobNotDone.name),
jobs.delete(job.id))
开发者ID:EdDev,项目名称:vdsm,代码行数:5,代码来源:jobs_test.py
注:本文中的vdsm.common.response.error函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论