python - Read in parallel and write sequentially?

I have the following code, which reads and writes the data for each id sequentially.

async def main():
    id = 0
    while id < 1000:
        data = await read_async(id)
        await data.write_async(f'{id}.csv')
        id += 1
       

read_async() takes several minutes per call, while write_async() takes less than a minute. Now I want to:

  1. Run read_async(id) in parallel, but with at most 3 calls running at a time because of a memory limitation.
  2. Run write_async sequentially, i.e., write_async(n+1) cannot start before write_async(n) has finished.
Question from: https://stackoverflow.com/questions/65922160/read-in-parallel-and-write-sequentially


1 Answer


You could use a queue and a fixed number of tasks for reading, and write from the main task. The main task can use an event to find out that new data is available from the readers, and a shared dict to receive it from them. For example (untested):

import asyncio

async def reader(q, id_to_data, data_ready):
    while True:
        id = await q.get()
        data = await read_async(id)
        id_to_data[id] = data
        # tell the writer that new data has arrived
        data_ready.set()

async def main():
    # queue of all ids that still need to be read
    q = asyncio.Queue()
    for id in range(1000):
        await q.put(id)

    id_to_data = {}
    data_ready = asyncio.Event()
    # three reader tasks -> at most 3 reads in parallel
    readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
               for _ in range(3)]

    for id in range(1000):
        while True:
            if id in id_to_data:
                # the current id is ready: write it, then move on to the next
                data = id_to_data.pop(id)
                await data.write_async(f'{id}.csv')
                break
            else:
                # wait for new data and try again
                await data_ready.wait()
                data_ready.clear()

    # all ids written; stop the now-idle readers
    for r in readers:
        r.cancel()
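
To actually run this, you would drive main() from synchronous code with asyncio.run(main()), assuming read_async() is defined elsewhere.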

Using a separate queue for the results instead of the event wouldn't work, because results would be enqueued in completion order rather than id order. A priority queue would fix that, but it would still immediately return the lowest id currently available, whereas the writer needs the very next id in the sequence so that all ids are processed in order.
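
As an alternative sketch (also untested, assuming read_async() and write_async() behave as described in the question), you can get the same ordering without an explicit event by capping concurrency with asyncio.Semaphore and awaiting the read tasks in id order; awaiting them in order serializes the writes automatically:

import asyncio

async def main():
    # at most 3 read_async() calls run at the same time
    sem = asyncio.Semaphore(3)

    async def read_limited(id):
        async with sem:
            return await read_async(id)

    # create all read tasks up front; the semaphore throttles them
    tasks = [asyncio.create_task(read_limited(id)) for id in range(1000)]

    # awaiting the tasks in id order makes the writes sequential
    for id, task in enumerate(tasks):
        data = await task
        await data.write_async(f'{id}.csv')

asyncio.run(main())

Like the event-based version, this keeps the data of already-finished reads in memory until their turn to be written, so the limit of 3 applies to concurrent reads, not to buffered results.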

