You could use a queue and a fixed number of tasks for reading, and write from the main task. The main task can use an event to find out that new data is available from the readers and and a shared dict to get it from them. For example (untested):
async def reader(q, id_to_data, data_ready):
while True:
id = await q.get()
data = await read_async(id)
id_to_data[id] = data
data_ready.set()
async def main():
q = asyncio.Queue()
for id in range(1000):
await q.put(id)
id_to_data = {}
data_ready = asyncio.Event()
readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
for _ in 3]
for id in range(1000):
while True:
# wait for the current ID to appear before writing
if id in id_to_data:
data = id_to_data.pop(id)
await data.write_async(f'{id}.csv')
break
# move on to the next ID
else:
# wait for new data and try again
await data_ready.wait()
data_ready.clear()
for r in readers:
r.cancel()
Using a separate queue for results instead of the event wouldn't work because a queue is unordered. A priority queue would fix that, bit it would still immediately return the lowest id currently available, whereas the writer needs the next id in order to process all ids in order.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…