P.S. Started an issue https://github.com/robinhood/faust/issues/702
Developing Faust-app:
from concurrent.futures import ProcessPoolExecutor, as_completed
import faust
app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')
@app.task()
async def check():
# 3 is amount of different folders where archives are laced
with ProcessPoolExecutor(max_workers=3) as executor:
fs = [executor.submit(handle, directory) for directory in ['dir1', 'dir2', 'dir3']]
for future in as_completed(fs):
future.result()
def handle(directory):
# finding archives in directory
# unpacking 7z with mdb-files
# converting mdb tables to csv
# reading csv to dataframe
# some data manipulating
# and at last sending dataframe records to kafka
f = sink.send_soon(value={'ts': 1234567890, 'count': 10}) # always in pending status
Faced a problem when method sink.send_soon returns FutureMessage(asyncio.Future, Awaitable[RecordMetadata]) which is always in pending status.
This is the situation when future inside another future.
Note. Function handle should be sync because one cannot pass async function to ProcessPollExecutor. Method send_soon is sync method. According to this example https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47 awaiting is not necessarily.
If there any way to handle pending future?
Also tried this:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import faust
loop = asyncio.get_event_loop()
app = faust.App('my-app-name', broker='kafka://localhost:9092', loop=loop)
sink = app.topic('topic')
@app.task()
async def check():
tasks = []
with ProcessPoolExecutor(max_workers=3) as executor:
for dir_ in ['dir1', 'dir2', 'dir3']:
task = asyncio.create_task(run_dir_handling(executor, dir_))
tasks.append(task)
await asyncio.gather(*tasks)
async def run_dir_handling(executor, dir_):
print('running blocking')
await loop.run_in_executor(executor, handle, dir_)
def handle(directory):
print('Handle directory')
# finding archives in directory
# unpacking 7z with mdb-files
# converting mdb tables to csv
# reading csv to dataframe
# some data manipulating
# and at last sending dataframe records to kafka
# `send_soon` is not non-`async def but `send` is async
# async `soon` cannot be implemented because of
# `await loop.run_in_executor(executor, handle, dir_) TypeError: cannot pickle 'coroutine' object` error
f = sink.send_soon(value={'ts': 1234567890, 'count': 10, 'dir': directory})
print(f) # always <FutureMessage pending>
But it didn't work too.
It seems loop is not even have a chance to run send_soon method.
question from:
https://stackoverflow.com/questions/65850427/future-inside-future-always-pending 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…