本文整理汇总了Python中tornado.test.util.exec_test函数的典型用法代码示例。如果您正苦于以下问题:Python exec_test函数的具体用法?Python exec_test怎么用?Python exec_test使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了exec_test函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_asyncio_adapter
def test_asyncio_adapter(self):
    """Drive native coroutines that await a Tornado coroutine on both loops.

    Three native coroutines are created through ``exec_test``: one awaits
    the Tornado coroutine directly, one wraps it in ``to_asyncio_future``,
    and one applies the adapter two levels away from the Tornado
    coroutine. Each is expected to produce 42 when run through
    ``self.io_loop.run_sync`` and through
    ``asyncio.get_event_loop().run_until_complete``.
    """
    # This test demonstrates that when using the asyncio coroutine
    # runner (i.e. run_until_complete), the to_asyncio_future
    # adapter is needed. No adapter is needed in the other direction,
    # as demonstrated by other tests in the package.
    # NOTE(review): the assertions at the end expect the adapter-free
    # coroutine to work under run_until_complete as well, so the claim
    # above that the adapter "is needed" looks stale -- confirm.
    @gen.coroutine
    def tornado_coroutine():
        yield gen.Task(self.io_loop.add_callback)
        raise gen.Return(42)

    native_coroutine_without_adapter = exec_test(globals(), locals(), """
    async def native_coroutine_without_adapter():
        return await tornado_coroutine()
    """)["native_coroutine_without_adapter"]

    native_coroutine_with_adapter = exec_test(globals(), locals(), """
    async def native_coroutine_with_adapter():
        return await to_asyncio_future(tornado_coroutine())
    """)["native_coroutine_with_adapter"]

    # Use the adapter, but two degrees from the tornado coroutine.
    # (locals() already contains native_coroutine_without_adapter here.)
    native_coroutine_with_adapter2 = exec_test(globals(), locals(), """
    async def native_coroutine_with_adapter2():
        return await to_asyncio_future(native_coroutine_without_adapter())
    """)["native_coroutine_with_adapter2"]

    # Tornado supports native coroutines both with and without adapters
    self.assertEqual(
        self.io_loop.run_sync(native_coroutine_without_adapter),
        42)
    self.assertEqual(
        self.io_loop.run_sync(native_coroutine_with_adapter),
        42)
    self.assertEqual(
        self.io_loop.run_sync(native_coroutine_with_adapter2),
        42)

    # Asyncio only supports coroutines that yield asyncio-compatible
    # Futures (which our Future is since 5.0).
    self.assertEqual(
        asyncio.get_event_loop().run_until_complete(
            native_coroutine_without_adapter()),
        42)
    self.assertEqual(
        asyncio.get_event_loop().run_until_complete(
            native_coroutine_with_adapter()),
        42)
    self.assertEqual(
        asyncio.get_event_loop().run_until_complete(
            native_coroutine_with_adapter2()),
        42)
开发者ID:alexdxy,项目名称:tornado,代码行数:50,代码来源:asyncio_test.py
示例2: test_native_coroutine
def test_native_coroutine(self):
    """Call a ``@gen_test``-decorated native coroutine built by ``exec_test``.

    The generated ``test`` function sets ``self.finished = True``; it is
    invoked directly with this test case as its ``self`` argument.
    """
    namespace = exec_test(globals(), locals(), """
    @gen_test
    async def test(self):
        self.finished = True
    """)

    namespace['test'](self)
开发者ID:conn4575,项目名称:tornado,代码行数:8,代码来源:testing_test.py
示例3: test_native_coroutine
def test_native_coroutine(self):
    """Run a native coroutine that awaits a ``gen.coroutine`` via ``run_sync``.

    ``f1`` is a decorated generator that yields ``gen.moment``; ``f2`` is a
    native coroutine, created through ``exec_test``, that awaits ``f1()``.
    """
    @gen.coroutine
    def f1():
        yield gen.moment

    # f1 reaches the generated code through the locals() passed here.
    namespace = exec_test(globals(), locals(), """
    async def f2():
        await f1()
    """)

    self.io_loop.run_sync(namespace['f2'])
开发者ID:jacklicn,项目名称:tornado,代码行数:10,代码来源:ioloop_test.py
示例4: test_async_return
def test_async_return(self):
    """Check that a ``gen.coroutine`` can finish with ``return 42``.

    The coroutine ``f`` is built by ``exec_test``; it yields once on an
    IOLoop callback and then returns a value with a plain ``return``.
    """
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    namespace = exec_test(globals(), locals(), """
    @gen.coroutine
    def f():
        yield gen.Task(self.io_loop.add_callback)
        return 42
    """)

    result = yield namespace['f']()
    self.assertEqual(result, 42)
    self.finished = True
开发者ID:netmosquito,项目名称:tornado,代码行数:10,代码来源:gen_test.py
示例5: test_async_with_timeout
def test_async_with_timeout(self):
    """Pass a native coroutine object to ``gen.with_timeout``.

    ``f1`` returns 42 immediately; the one-hour timeout is never meant to
    fire, the test only checks that the result comes through unchanged.
    """
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    namespace = exec_test(globals(), locals(), """
    async def f1():
        return 42
    """)

    result = yield gen.with_timeout(datetime.timedelta(hours=1),
                                    namespace['f1']())
    self.assertEqual(result, 42)
    self.finished = True
开发者ID:alexdxy,项目名称:tornado,代码行数:10,代码来源:gen_test.py
示例6: test_native_body_producer_chunked
def test_native_body_producer_chunked(self):
    """POST a body produced in two chunks by a native coroutine.

    The ``body_producer`` coroutine writes ``b'1234'``, waits for one
    IOLoop callback, then writes ``b'5678'``. The response from
    ``/echo_post`` is expected to carry the concatenated bytes.
    """
    namespace = exec_test(globals(), locals(), """
    async def body_producer(write):
        await write(b'1234')
        await gen.Task(IOLoop.current().add_callback)
        await write(b'5678')
    """)

    response = self.fetch("/echo_post", method="POST",
                          body_producer=namespace["body_producer"])
    response.rethrow()
    self.assertEqual(response.body, b"12345678")
开发者ID:ArthurGarnier,项目名称:SickRage,代码行数:11,代码来源:simple_httpclient_test.py
示例7: test_exception_logging_native_coro
def test_exception_logging_native_coro(self):
    """The IOLoop examines exceptions from awaitables and logs them."""
    # The callback schedules self.stop first so that self.wait() below
    # returns even though the coroutine then fails with ZeroDivisionError.
    namespace = exec_test(globals(), locals(), """
    async def callback():
        self.io_loop.add_callback(self.stop)
        1 / 0
    """)

    with NullContext():
        self.io_loop.add_callback(namespace["callback"])
        with ExpectLog(app_log, "Exception in callback"):
            self.wait()
开发者ID:heewa,项目名称:tornado,代码行数:11,代码来源:ioloop_test.py
示例8: test_async_await
def test_async_await(self):
    """Await a ``gen.Task`` from a native coroutine and yield that coroutine."""
    # This test verifies that an async function can await a
    # yield-based gen.coroutine, and that a gen.coroutine
    # (the test method itself) can yield an async function.
    # NOTE(review): the decorator that would make this method a
    # gen.coroutine (e.g. @gen_test) is not visible in this excerpt.
    namespace = exec_test(globals(), locals(), """
    async def f():
        await gen.Task(self.io_loop.add_callback)
        return 42
    """)

    result = yield namespace['f']()
    self.assertEqual(result, 42)
    self.finished = True
开发者ID:netmosquito,项目名称:tornado,代码行数:12,代码来源:gen_test.py
示例9: test_asyncio_yield_from
def test_asyncio_yield_from(self):
    """Use ``yield from`` on an asyncio executor future in a ``gen.coroutine``."""
    # Test that we can use asyncio coroutines with 'yield from'
    # instead of asyncio.async(). This requires python 3.3 syntax.
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    namespace = exec_test(globals(), locals(), """
    @gen.coroutine
    def f():
        event_loop = asyncio.get_event_loop()
        x = yield from event_loop.run_in_executor(None, lambda: 42)
        return x
    """)

    result = yield namespace['f']()
    self.assertEqual(result, 42)
开发者ID:102hailan,项目名称:tornado,代码行数:12,代码来源:asyncio_test.py
示例10: test_asyncio_sleep_zero
def test_asyncio_sleep_zero(self):
    """Yield a native coroutine that awaits ``asyncio.sleep(0)``."""
    # asyncio.sleep(0) turns into a special case (equivalent to
    # `yield None`)
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    namespace = exec_test(globals(), locals(), """
    async def f():
        import asyncio
        await asyncio.sleep(0)
        return 42
    """)

    result = yield namespace['f']()
    self.assertEqual(result, 42)
    self.finished = True
开发者ID:alexdxy,项目名称:tornado,代码行数:12,代码来源:gen_test.py
示例11: test_native_body_producer_chunked
def test_native_body_producer_chunked(self):
    """POST a body produced in two chunks by a native coroutine.

    The ``body_producer`` coroutine writes ``b'1234'``, awaits
    ``asyncio.sleep(0)``, then writes ``b'5678'``. The response from
    ``/echo_post`` is expected to carry the concatenated bytes.
    """
    namespace = exec_test(globals(), locals(), """
    async def body_producer(write):
        await write(b'1234')
        import asyncio
        await asyncio.sleep(0)
        await write(b'5678')
    """)

    response = self.fetch("/echo_post", method="POST",
                          body_producer=namespace["body_producer"])
    response.rethrow()
    self.assertEqual(response.body, b"12345678")
开发者ID:dkdenza,项目名称:tornado,代码行数:12,代码来源:simple_httpclient_test.py
示例12: test_context_manager_async_await
def test_context_manager_async_await(self):
    """Enter a ``locks.Semaphore`` with ``async with`` and check its release.

    Inside the block the bound value must be ``None``; afterwards an
    ``acquire()`` call must return an already-completed future.
    """
    # Repeat the above test using 'async with'.
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    sem = locks.Semaphore()

    namespace = exec_test(globals(), locals(), """
    async def f():
        async with sem as yielded:
            self.assertTrue(yielded is None)
    """)
    yield namespace['f']()

    # Semaphore was released and can be acquired again.
    self.assertTrue(sem.acquire().done())
开发者ID:102hailan,项目名称:tornado,代码行数:13,代码来源:locks_test.py
示例13: test_native_coroutine_timeout
def test_native_coroutine_timeout(self):
    """Expect ``ioloop.TimeoutError`` when a ``gen_test`` timeout is exceeded.

    The generated test sleeps for 1 second under ``@gen_test(timeout=0.1)``.
    Reaching ``self.fail`` means no exception was raised.
    """
    # Set a short timeout and exceed it.
    namespace = exec_test(globals(), locals(), """
    @gen_test(timeout=0.1)
    async def test(self):
        await gen.sleep(1)
    """)

    try:
        namespace['test'](self)
        self.fail("did not get expected exception")
    except ioloop.TimeoutError:
        self.finished = True
开发者ID:conn4575,项目名称:tornado,代码行数:13,代码来源:testing_test.py
示例14: test_async_early_return
def test_async_early_return(self):
    """Return from a ``gen.coroutine`` before its only ``yield`` is reached."""
    # A yield statement exists but is not executed, which means
    # this function "returns" via an exception. This exception
    # doesn't happen before the exception handling is set up.
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    namespace = exec_test(globals(), locals(), """
    @gen.coroutine
    def f():
        if True:
            return 42
        yield gen.Task(self.io_loop.add_callback)
    """)

    result = yield namespace['f']()
    self.assertEqual(result, 42)
    self.finished = True
开发者ID:netmosquito,项目名称:tornado,代码行数:14,代码来源:gen_test.py
示例15: test_async_await_mixed_multi
def test_async_await_mixed_multi(self):
    """Yield a list mixing a native coroutine and a ``gen.coroutine``.

    ``f1`` (native, returns 42) and ``f2`` (decorated generator, returns
    43) are yielded together; the results must come back as ``[42, 43]``.
    """
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    namespace = exec_test(globals(), locals(), """
    async def f1():
        await gen.Task(self.io_loop.add_callback)
        return 42
    """)

    @gen.coroutine
    def f2():
        yield gen.Task(self.io_loop.add_callback)
        raise gen.Return(43)

    results = yield [namespace['f1'](), f2()]
    self.assertEqual(results, [42, 43])
    self.finished = True
开发者ID:netmosquito,项目名称:tornado,代码行数:15,代码来源:gen_test.py
示例16: test_async_for
def test_async_for(self):
    """Iterate a ``queues.Queue`` with ``async for`` inside a native coroutine.

    The queue is pre-filled with 0..4; the coroutine collects items and
    returns once it has seen 4, which ends the otherwise open-ended loop.
    """
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    q = queues.Queue()
    for i in range(5):
        q.put(i)

    namespace = exec_test(globals(), locals(), """
    async def f():
        results = []
        async for i in q:
            results.append(i)
            if i == 4:
                return results
    """)

    results = yield namespace['f']()
    self.assertEqual(results, list(range(5)))
开发者ID:102hailan,项目名称:tornado,代码行数:15,代码来源:queues_test.py
示例17: test_acquire_fifo_async_with
def test_acquire_fifo_async_with(self):
    """Check that ``async with lock`` waiters record their indices in order.

    The lock is taken up front, ``N`` coroutines are started while it is
    held, and after ``release()`` the recorded ``history`` must equal
    ``list(range(N))``.
    """
    # Repeat the above test using `async with lock:`
    # instead of `with (yield lock.acquire()):`.
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    lock = locks.Lock()
    self.assertTrue(lock.acquire().done())
    N = 5
    history = []

    namespace = exec_test(globals(), locals(), """
    async def f(idx):
        async with lock:
            history.append(idx)
    """)

    futures = [namespace['f'](i) for i in range(N)]
    lock.release()
    yield futures
    self.assertEqual(list(range(N)), history)
开发者ID:102hailan,项目名称:tornado,代码行数:17,代码来源:locks_test.py
示例18: test_async_await
def test_async_await(self):
    """Await the result of a ``run_on_executor`` method in a native coroutine.

    ``Object.f`` is decorated with ``@run_on_executor()`` and returns 42;
    the generated coroutine awaits ``o.f()`` and passes the answer back.
    """
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    class Object(object):
        def __init__(self):
            # Single-worker pool stored as ``executor`` on the instance.
            self.executor = futures.thread.ThreadPoolExecutor(1)

        @run_on_executor()
        def f(self):
            return 42

    o = Object()
    namespace = exec_test(globals(), locals(), """
    async def f():
        answer = await o.f()
        return answer
    """)

    result = yield namespace['f']()
    self.assertEqual(result, 42)
开发者ID:conn4575,项目名称:tornado,代码行数:17,代码来源:concurrent_test.py
示例19: test_undecorated_coroutine
def test_undecorated_coroutine(self):
    """Run an undecorated ``async def`` test and expect one reported error.

    A throwaway ``AsyncTestCase`` subclass with a bare ``async def
    test_coro`` is run against a fresh ``unittest.TestResult``. Exactly
    one error must be recorded and its text must contain
    ``"should be decorated"``.
    """
    namespace = exec_test(globals(), locals(), """
    class Test(AsyncTestCase):
        async def test_coro(self):
            pass
    """)

    test_class = namespace['Test']
    test = test_class('test_coro')
    result = unittest.TestResult()

    # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        test.run(result)

    self.assertEqual(len(result.errors), 1)
    self.assertIn("should be decorated", result.errors[0][1])
开发者ID:conn4575,项目名称:tornado,代码行数:18,代码来源:testing_test.py
示例20: test_handle_stream_native_coroutine
def test_handle_stream_native_coroutine(self):
    """Serve one connection from a ``TCPServer`` with an ``async def`` handler.

    The generated ``TestServer.handle_stream`` writes ``b'data'`` and
    closes the stream; a client connects to the bound port and must read
    exactly those bytes before the connection closes.
    """
    # handle_stream may be a native coroutine.
    # NOTE(review): this body uses ``yield``, so the method is itself a
    # generator; it presumably relies on a runner decorator such as
    # @gen_test that is not visible in this excerpt -- confirm.
    namespace = exec_test(globals(), locals(), """
    class TestServer(TCPServer):
        async def handle_stream(self, stream, address):
            stream.write(b'data')
            stream.close()
    """)

    sock, port = bind_unused_port()
    server = namespace['TestServer']()
    server.add_socket(sock)

    client = IOStream(socket.socket())
    yield client.connect(('localhost', port))
    result = yield client.read_until_close()
    self.assertEqual(result, b'data')

    server.stop()
    client.close()
开发者ID:heewa,项目名称:tornado,代码行数:19,代码来源:tcpserver_test.py
注:本文中的tornado.test.util.exec_test函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论