本文整理汇总了Python中twisted.trial.util._runSequentially函数的典型用法代码示例。如果您正苦于以下问题:Python _runSequentially函数的具体用法?Python _runSequentially怎么用?Python _runSequentially使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了_runSequentially函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_emptyList
def test_emptyList(self):
    """
    Given no callables at all, C{_runSequentially} hands back a Deferred
    that succeeds with an empty list of results.
    """
    noCallables = []
    outcome = util._runSequentially(noCallables)
    self.assertDeferredResult(outcome, self.assertEqual, [])
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:7,代码来源:test_util.py
示例2: test_singleAsynchronousSuccess
def test_singleAsynchronousSuccess(self):
    """
    A callable that returns a successful Deferred contributes that
    Deferred's result to the results list, paired with the SUCCESS flag.
    """
    def succeedsAsynchronously():
        return defer.succeed(None)

    outcome = util._runSequentially([succeedsAsynchronously])
    expected = [(defer.SUCCESS, None)]
    self.assertDeferredResult(outcome, self.assertEqual, expected)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:7,代码来源:test_util.py
示例3: test_callablesCalledInOrder
def test_callablesCalledInOrder(self):
    """
    The callables are invoked one at a time, in the order in which they
    were supplied.
    """
    calls = []
    pending = []

    def record(name):
        # Note the call, and hand back an unfired Deferred that the test
        # body fires by hand later on.
        waiting = defer.Deferred()
        calls.append(name)
        pending.append(waiting)
        return waiting

    def first():
        return record('foo')

    def second():
        return record('bar')

    d = util._runSequentially([first, second])

    # The second callable must not start until the Deferred returned by
    # the first one has fired, so only 'foo' is recorded at this point.
    self.assertEqual(calls, ['foo'])

    # Firing the most recent (here: the only) Deferred lets the second
    # callable run.
    pending[-1].callback(None)
    self.assertEqual(calls, ['foo', 'bar'])

    # Fire the Deferred created by the second callable as well, so no
    # created Deferred is left unfired when C{d} is returned.
    pending[-1].callback(None)
    return d
开发者ID:Almad,项目名称:twisted,代码行数:26,代码来源:test_util.py
示例4: test_singleSynchronousSuccess
def test_singleSynchronousSuccess(self):
    """
    A callable that simply returns a value (no Deferred involved) has that
    value placed in the results list, paired with the SUCCESS flag.
    """
    def returnsNone():
        return None

    outcome = util._runSequentially([returnsNone])
    expected = [(defer.SUCCESS, None)]
    self.assertDeferredResult(outcome, self.assertEqual, expected)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:8,代码来源:test_util.py
示例5: test_singleAsynchronousFailure
def test_singleAsynchronousFailure(self):
    """
    A callable that returns a failing Deferred has its failure included in
    the results list, paired with the FAILURE flag.
    """
    def failsAsynchronously():
        return defer.fail(ValueError('foo'))

    outcome = util._runSequentially([failsAsynchronously])

    def verify(entries):
        # Exactly one (flag, failure) pair is expected.
        (status, reason), = entries
        reason.trap(ValueError)
        self.assertEqual(reason.getErrorMessage(), 'foo')
        self.assertEqual(status, defer.FAILURE)

    self.assertDeferredResult(outcome, verify)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:12,代码来源:test_util.py
示例6: test_singleSynchronousFailure
def test_singleSynchronousFailure(self):
    """
    A callable that raises has a Failure wrapping the raised exception
    included in the results list, paired with the FAILURE flag.
    """
    def raisesSynchronously():
        return self.fail('foo')

    outcome = util._runSequentially([raisesSynchronously])

    def verify(entries):
        # Exactly one (flag, failure) pair is expected.
        (status, reason), = entries
        reason.trap(self.failureException)
        self.assertEqual(reason.getErrorMessage(), 'foo')
        self.assertEqual(status, defer.FAILURE)

    self.assertDeferredResult(outcome, verify)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:12,代码来源:test_util.py
示例7: _runCleanups
def _runCleanups(self):
    """
    Run the cleanups added with L{addCleanup}.

    Entries are popped off the end of C{self._cleanups} one by one, so the
    last entry stored is the first to be run, and C{self._cleanups} is left
    empty afterwards.

    @return: The C{Deferred} returned by C{util._runSequentially} for the
        collected cleanups.
    """
    def _bind(func, positional, keywords):
        # Freeze this cleanup's own arguments in a zero-argument callable;
        # going through a helper avoids the late-binding closure problem.
        def _invoke():
            return func(*positional, **keywords)
        return _invoke

    queued = []
    while self._cleanups:
        func, positional, keywords = self._cleanups.pop()
        queued.append(_bind(func, positional, keywords))
    return util._runSequentially(queued)
开发者ID:AlexanderHerlan,项目名称:syncpy,代码行数:13,代码来源:_asynctest.py
示例8: test_stopOnFirstError
def test_stopOnFirstError(self):
    """
    When C{stopOnFirstError} is passed to C{runSequentially}, none of the
    callables after the first one that raises are called.
    """
    def raisesSynchronously():
        return self.fail('foo')

    def returnsBar():
        return 'bar'

    outcome = util._runSequentially(
        [raisesSynchronously, returnsBar], stopOnFirstError=True)

    def verify(entries):
        # Only the failing callable should have produced an entry.
        (status, reason), = entries
        reason.trap(self.failureException)
        self.assertEqual(status, defer.FAILURE)
        self.assertEqual(reason.getErrorMessage(), 'foo')

    self.assertDeferredResult(outcome, verify)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:13,代码来源:test_util.py
示例9: test_continuesAfterError
def test_continuesAfterError(self):
    """
    An error raised by one callable does not prevent runSequentially from
    running the callables that follow it.
    """
    def raisesSynchronously():
        return self.fail('foo')

    def returnsBar():
        return 'bar'

    outcome = util._runSequentially([raisesSynchronously, returnsBar])

    def verify(entries):
        # One entry per callable: the failure first, then the success.
        (failedStatus, reason), (okStatus, value) = entries
        reason.trap(self.failureException)
        self.assertEqual(failedStatus, defer.FAILURE)
        self.assertEqual(reason.getErrorMessage(), 'foo')
        self.assertEqual(okStatus, defer.SUCCESS)
        self.assertEqual(value, 'bar')

    self.assertDeferredResult(outcome, verify)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:14,代码来源:test_util.py
示例10: test_stripFlags
def test_stripFlags(self):
    """
    When C{stripFlags} is passed to C{runSequentially}, the SUCCESS /
    FAILURE flags are left out of the output: the Deferred fires with a
    flat list holding just the results and failures.
    """
    def raisesSynchronously():
        return self.fail('foo')

    def returnsBar():
        return 'bar'

    outcome = util._runSequentially(
        [raisesSynchronously, returnsBar], stripFlags=True)

    def verify(entries):
        # No (flag, value) pairs here: the entries are the bare values.
        reason, value = entries
        reason.trap(self.failureException)
        self.assertEqual(reason.getErrorMessage(), 'foo')
        self.assertEqual(value, 'bar')

    return outcome.addCallback(verify)
开发者ID:Almad,项目名称:twisted,代码行数:15,代码来源:test_util.py
示例11: test_continuesAfterError
def test_continuesAfterError(self):
    """
    An error raised by one callable does not prevent runSequentially from
    running the callables that follow it.
    """
    def raisesSynchronously():
        return self.fail("foo")

    def returnsBar():
        return "bar"

    outcome = util._runSequentially([raisesSynchronously, returnsBar])

    def verify(entries):
        # One entry per callable: the failure first, then the success.
        (failedStatus, reason), (okStatus, value) = entries
        reason.trap(self.failureException)
        self.assertEqual(failedStatus, defer.FAILURE)
        self.assertEqual(reason.getErrorMessage(), "foo")
        self.assertEqual(okStatus, defer.SUCCESS)
        self.assertEqual(value, "bar")

    return outcome.addCallback(verify)
开发者ID:pombredanne,项目名称:toppatch,代码行数:16,代码来源:test_util.py
注:本文中的twisted.trial.util._runSequentially函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论