
python multiprocessing .join() deadlock depends on worker function

I am using Python's multiprocessing library to spawn 4 Process() objects to parallelize a CPU-intensive task. The task (inspiration and code from this great article) is to compute the prime factors of every integer in a list.

main.py:

import random
import multiprocessing
import sys

num_inputs  = 4000
num_procs   = 4
proc_inputs = num_inputs/num_procs
input_list  = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs        = []
for p_i in xrange(num_procs):
  print "Process [%d]"%p_i
  proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
  print " - num inputs: [%d]"%len(proc_list)

  # Using target=worker1 HANGS on join
  p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
  # Using target=worker2 RETURNS with success
  #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

  procs.append(p)
  p.start()

for p in procs:
  print "joining ", p, output_queue.qsize(), output_queue.full()
  p.join()
  print "joined  ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []
while not output_queue.empty():
    ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)

Observations:

  • If the target for each process is the function worker1, then for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting for the spawned processes to terminate, and never returns.
  • If the target for each process is the function worker2, the code works just fine on the same input list and the main thread returns.

This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former puts individual lists on the Queue whereas the latter puts a single list of lists per process.

Why is there a deadlock with the worker1 target but not with worker2? Shouldn't both (or neither) exceed the multiprocessing Queue's maxsize limit of 32767?


worker1 vs worker2:

def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''  
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []

    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)
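
For reference, the question does not include factorize_naive (it comes from the linked article). A minimal trial-division version in the same spirit, written here as a sketch in the question's Python 2 style and not necessarily identical to the article's code, would be:

def factorize_naive(n):
    '''Naive trial-division factorization (sketch; stands in for the
    article's factorize_naive, which is not reproduced in the question).'''
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)   # whatever remains is prime
    return factors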

There are a lot of similar questions on SO, but I believe the core of this question is clearly distinct from all of them.


1 Answer


The docs warn about this:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

While a Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading inter-process pipes. A process cannot end normally before those memory buffers are flushed. Your worker1() puts a lot more items on the queue than your worker2(), and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.

As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you've discovered, whether it's necessary to do so depends in an undefined way on how many items have been put on the queue by each worker process.
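
Applied to the code in the question, a minimal sketch of that fix (keeping the question's Python 2 style, and assuming worker1 puts exactly one item per input, so num_inputs items arrive in total) is to drain the queue first and only then join:

# Consume every result before joining; .get() blocks until an item arrives.
ret_vals = []
for _ in xrange(num_inputs):
    ret_vals.append(output_queue.get())

# Safe to join now: the workers' queue buffers have been flushed.
for p in procs:
    p.join()

print "Processing complete."
print len(ret_vals)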

