How do I terminate such long running CPU-bound computations within a method?
The approach you tried doesn't work because the futures returned by ProcessPoolExecutor
are not cancellable. Although asyncio's run_in_executor
tries to propagate the cancellation, it is simply ignored by Future.cancel
once the task starts executing.
There is no fundamental reason for that. Unlike threads, processes can be safely terminated, so it would be perfectly possible for ProcessPoolExecutor.submit
to return a future whose cancel
terminated the corresponding process. Asyncio coroutines have defined cancellation semantics and would automatically make use of it. Unfortunately, ProcessPoolExecutor.submit
returns a regular concurrent.futures.Future
, which assumes the lowest common denominator and treats a running future as untouchable.
As a result, to cancel tasks executed in subprocesses, one must circumvent the ProcessPoolExecutor
altogether and manage one's own processes. The challenge is how to do this without reimplementing half of multiprocessing
. One option offered by the standard library is to (ab)use multiprocessing.Pool
for this purpose, because it supports reliable shutdown of worker processes. A CancellablePool
could work as follows:
- Instead of spawning a fixed number of processes, spawn a fixed number of 1-worker pools.
- Assign tasks to pools from an asyncio coroutine. If the coroutine is canceled while waiting for the task to finish in the other process, terminate the single-process pool and create a new one.
- Since everything is coordinated from the single asyncio thread, don't worry about race conditions such as accidentally killing a process which has already started executing another task. (This would need to be prevented if one were to support cancellation in
ProcessPoolExecutor
.)
Here is a sample implementation of that idea:
import asyncio
import multiprocessing
class CancellablePool:
def __init__(self, max_workers=3):
self._free = {self._new_pool() for _ in range(max_workers)}
self._working = set()
self._change = asyncio.Event()
def _new_pool(self):
return multiprocessing.Pool(1)
async def apply(self, fn, *args):
"""
Like multiprocessing.Pool.apply_async, but:
* is an asyncio coroutine
* terminates the process if cancelled
"""
while not self._free:
await self._change.wait()
self._change.clear()
pool = usable_pool = self._free.pop()
self._working.add(pool)
loop = asyncio.get_event_loop()
fut = loop.create_future()
def _on_done(obj):
loop.call_soon_threadsafe(fut.set_result, obj)
def _on_err(err):
loop.call_soon_threadsafe(fut.set_exception, err)
pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)
try:
return await fut
except asyncio.CancelledError:
pool.terminate()
usable_pool = self._new_pool()
finally:
self._working.remove(pool)
self._free.add(usable_pool)
self._change.set()
def shutdown(self):
for p in self._working | self._free:
p.terminate()
self._free.clear()
A minimalistic test case showing cancellation:
def really_long_process():
print("I am a really long computation.....")
large_val = 9729379273492397293479237492734 ** 344323
print("I finally computed this large value: {}".format(large_val))
async def main():
loop = asyncio.get_event_loop()
pool = CancellablePool()
tasks = [loop.create_task(pool.apply(really_long_process))
for _ in range(5)]
for t in tasks:
try:
await asyncio.wait_for(t, 1)
except asyncio.TimeoutError:
print('task timed out and cancelled')
pool.shutdown()
asyncio.get_event_loop().run_until_complete(main())
Note how the CPU usage never exceeds 3 cores, and how it starts dropping near the end of the test, indicating that the processes are being terminated as expected.
To apply it to the code from the question, make self._lmz_executor
an instance of CancellablePool
and change self._loop.run_in_executor(...)
to self._loop.create_task(self._lmz_executor.apply(...))
.