Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

python - How to use asyncio.run_coroutine_threadsafe correctly?

In short, the problem is that the future returned by asyncio.run_coroutine_threadsafe blocks when I call future.result().

The problem is also documented in the following question with (currently) no satisfactory answer: Future from asyncio.run_coroutine_threadsafe hangs forever?

What I'm trying to achieve is to call async code from sync code, where the sync code is actually itself wrapped in async code with an existing running event loop (to make things more concrete: it's a Jupyter notebook).

I want to submit async tasks from the nested sync code to the existing 'outer' event loop and wait for their results within the nested sync code. Implied constraint: I do not want to run those tasks on a new event loop (for multiple reasons).

Since it's not possible to just 'await' an async result from sync code without blocking, and asyncio.run would create a new event loop, I thought using a separate thread would somehow help.
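
To make that constraint concrete, here is a minimal sketch of what happens when sync code tries asyncio.run while a loop is already running in the same thread. The names inner, sync_helper and outer are hypothetical and exist only for this illustration.

```python
import asyncio


async def inner():
    """Hypothetical coroutine, used only for this illustration."""
    return 42


def sync_helper():
    """Sync code that is called while an event loop is running in this thread."""
    coro = inner()
    try:
        return asyncio.run(coro)
    except RuntimeError as exc:
        # asyncio.run refuses to start when a loop is already running here.
        coro.close()  # close the unused coroutine to avoid a "never awaited" warning
        return exc


async def outer():
    # The loop started by asyncio.run(outer()) is running at this point.
    return sync_helper()


outcome = asyncio.run(outer())
print(type(outcome).__name__, outcome)
```

So in this situation asyncio.run is not merely undesirable, it fails outright with a RuntimeError, which is why another mechanism is needed.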

From the documentation description, asyncio.run_coroutine_threadsafe sounds like the perfect candidate.

But it's still blocking...

Below is the full snippet, with a timeout when calling the future's result.

How can I get this code to work correctly?

import asyncio
from concurrent.futures import ThreadPoolExecutor


async def gather_coroutines(*coroutines):
    return await asyncio.gather(*coroutines)


def run_th_safe(loop, coroutines):
    future = asyncio.run_coroutine_threadsafe(gather_coroutines(*coroutines), loop)
    res = future.result(timeout=3)      # **** BLOCKING *****
    return res


def async2sync(*coroutines):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(gather_coroutines(*coroutines))

    # BELOW DOESN'T WORK BECAUSE run_th_safe IS BLOCKING
    with ThreadPoolExecutor(max_workers=1) as ex:
        thread_future = ex.submit(run_th_safe, loop, coroutines)
        return thread_future.result()


# Testing
async def some_async_task(n):
    """Some async function to test"""
    print('Task running with n =', n)
    await asyncio.sleep(n/10)
    print('Inside coro', n)
    return list(range(n))


async def main_async():
    coro3 = some_async_task(30)
    coro1 = some_async_task(10)
    coro2 = some_async_task(20)
    results = async2sync(coro3, coro1, coro2)
    return results


def main_sync():
    coro3 = some_async_task(30)
    coro1 = some_async_task(10)
    coro2 = some_async_task(20)
    results = async2sync(coro3, coro1, coro2)
    return results


if __name__ == '__main__':
    # Testing functionality with asyncio.run()
    # This works
    print(main_sync())

    # Testing functionality with an outer loop (asyncio.run) and nested asyncio.run_coroutine_threadsafe
    # **DOESN'T WORK**
    print(asyncio.run(main_async()))
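
A likely explanation for the hang: in main_async, async2sync runs on the event loop's own thread, and thread_future.result() blocks that thread. The loop therefore never gets a chance to run the coroutine that run_coroutine_threadsafe scheduled on it, so future.result(timeout=3) in the worker thread times out. The sketch below shows one way around this, under the assumption that the outer async code is able to await the sync call. It moves the sync code to a worker thread with loop.run_in_executor and awaits it, which keeps the loop thread free. The name sync_code is hypothetical, and the sleeps are shortened compared to the snippet above.

```python
import asyncio


async def some_async_task(n):
    """Same shape as the test coroutine above, with shorter sleeps."""
    await asyncio.sleep(n / 100)
    return list(range(n))


async def gather_coroutines(*coroutines):
    return await asyncio.gather(*coroutines)


def sync_code(loop):
    """Plain sync code; it must run in a thread other than the loop's thread."""
    coros = [some_async_task(3), some_async_task(1), some_async_task(2)]
    future = asyncio.run_coroutine_threadsafe(gather_coroutines(*coros), loop)
    # Blocking here is fine: only this worker thread waits, the loop keeps running.
    return future.result(timeout=3)


async def main_async():
    loop = asyncio.get_running_loop()
    # Awaiting the executor future hands control back to the loop, so the loop
    # stays free to run the coroutines submitted by sync_code.
    return await loop.run_in_executor(None, sync_code, loop)


results = asyncio.run(main_async())
print(results)  # [[0, 1, 2], [0], [0, 1]]
```

The tasks still run on the existing outer loop and no second event loop is created. The trade-off is that the call chain leading to the sync code must contain an await somewhere; sync code that is called directly on the loop's thread cannot block on that same loop without stalling it.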

question from:https://stackoverflow.com/questions/65910442/how-to-use-asyncio-run-coroutine-threadsafe-correctly
