Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share

python - Future inside future always pending

P.S. I have opened an issue about this: https://github.com/robinhood/faust/issues/702

I am developing a Faust app:

from concurrent.futures import ProcessPoolExecutor, as_completed

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')


@app.task()
async def check():
    # 3 is the number of different directories where archives are placed
    with ProcessPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(handle, directory) for directory in ['dir1', 'dir2', 'dir3']]
        for future in as_completed(fs):
            future.result()


def handle(directory):
    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10})  # always in pending status

I have run into a problem: the method `sink.send_soon` returns a `FutureMessage(asyncio.Future, Awaitable[RecordMetadata])` that is always in pending status.

This is a case of a future created inside another future.

Note: the function `handle` must be sync, because an async function cannot be passed to `ProcessPoolExecutor`. The method `send_soon` is a sync method. According to this example https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47, awaiting it is not necessary.
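The constraint can be seen without Faust at all: a coroutine object is not picklable, so a worker function handed to `ProcessPoolExecutor` must be a plain `def`. A minimal demonstration of the `TypeError` mentioned above:

```python
import pickle


async def produce(value):
    return value


# Calling an async def returns a coroutine object; trying to pickle it
# (which ProcessPoolExecutor must do to ship work to a child process)
# raises TypeError.
coro = produce(42)
try:
    pickle.dumps(coro)
    error = None
except TypeError as exc:
    error = str(exc)
coro.close()  # avoid the "coroutine was never awaited" warning
print(error)  # cannot pickle 'coroutine' object
```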

Is there any way to handle the pending future?

I also tried this:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092', loop=loop)
sink = app.topic('topic')


@app.task()
async def check():
    tasks = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    print('running blocking')
    await loop.run_in_executor(executor, handle, dir_)


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka

    # `send_soon` is a plain sync method, while `send` is async.
    # An async `handle` cannot be used here, because
    #   `await loop.run_in_executor(executor, handle, dir_)` fails with
    #   `TypeError: cannot pickle 'coroutine' object`
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10, 'dir': directory})
    print(f)  # always <FutureMessage pending>

But it didn't work either.

It seems the event loop never even gets a chance to run the `send_soon` call.
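One way to see the symptom: `send_soon` only enqueues the message, and the returned `FutureMessage` is resolved later by the app's producer running on the event loop. In a worker process there is no such producer loop driving it, so it stays pending, just like any `asyncio.Future` that nothing ever resolves. A minimal sketch of that behavior, using a plain `asyncio.Future` as a stand-in (this is an illustration of the mechanism, not Faust's actual internals):

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Nothing on this loop ever calls fut.set_result(), so waiting on it
    # just times out: the future stays pending forever, much like a
    # FutureMessage created in a process where no producer resolves it.
    done, pending = await asyncio.wait({fut}, timeout=0.1)
    print(fut)
    return fut


future = asyncio.run(main())
```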

Question from: https://stackoverflow.com/questions/65850427/future-inside-future-always-pending


1 Answer


I changed the code structure to this:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic1')


@app.task()
async def check():
    tasks = []

    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    directory = await loop.run_in_executor(executor, handle, dir_)
    await sink.send(value={'dir': directory})


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka

    return directory
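When `handle` produces many records per directory, the same pattern extends naturally: return the records as plain (picklable) data from the child process and send each one from the async side. A minimal runnable sketch of that pattern, with the Faust `sink.send` call replaced by a list append so it runs stand-alone (the record fields mirror the code above; the stand-in collection and the `'fork'` context are assumptions made so this single-file example works outside a Faust app):

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def handle(directory):
    # The CPU-bound work (unpacking 7z, converting mdb to csv,
    # dataframe manipulation) runs in the child process; only
    # picklable plain data is returned to the parent process.
    return [{'ts': 1234567890, 'count': 10, 'dir': directory}]


async def main():
    loop = asyncio.get_running_loop()
    # 'fork' keeps this single-file example importable by the
    # worker processes; a real Faust app would configure this
    # as appropriate for its platform.
    ctx = multiprocessing.get_context('fork')
    sent = []
    with ProcessPoolExecutor(max_workers=3, mp_context=ctx) as executor:
        futures = [loop.run_in_executor(executor, handle, d)
                   for d in ('dir1', 'dir2', 'dir3')]
        for records in await asyncio.gather(*futures):
            for record in records:
                # In the Faust app this line would be:
                #   await sink.send(value=record)
                sent.append(record)
    return sent


results = asyncio.run(main())
print(len(results))  # 3
```

The key design point is the same as in the answer above: all Kafka interaction stays on the event loop in the parent process, and the worker processes only exchange plain data with it.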
