Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
389 views
in Technique[技术] by (71.8m points)

parallel processing - Why python multiprocessing takes more time than serial code? How to speedup this?

I was trying out the Python multiprocessing module. In the code below the serial execution time 0.09 seconds and the parallel execution time is 0.2 seconds. As I am getting no speedup, I think I might be going wrong somewhere

import multiprocessing as mp
from random import uniform, randrange
import time

# m = mp.Manager()
out_queue = mp.Queue()

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals


def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    # print cals
    out_queue.put(cals)
    # print "Exec over"


def concurrency():
    # out_queue1 = mp.Queue()
    # out_queue2 = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    out_queue.get()
    # print "
Final:", len(out_queue.get())
    p2.start()
    out_queue.get()
    # print "
Final:", len(out_queue.get())
    p3.start()
    out_queue.get()

    p4.start()
    out_queue.get()

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

My system has four cores. Please let me know of ways I can speedup this code. Also, what are my options for parallel programming with python(other than the multiprocessing module).

Thanks and Regards

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

out_queue.get() blocks until a result is available by default. So you are essentially starting a process and waiting for it to finish before starting the next process. Instead, start all the processes, then get all the results.

Example:

    #!python2
    import multiprocessing as mp
    from random import uniform, randrange
    import time

    def flop_no(rand_nos, a, b):
        cals = []
        for r in rand_nos:
            cals.append(r + a * b)
        return cals

    def flop(val, a, b, out_queue):
        cals = []
        for v in val:
            cals.append(v + a * b)
        out_queue.put(cals)
        # time.sleep(3)

    def concurrency():
        out_queue = mp.Queue()
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        print len(rand_nos)
        # for i in range(5):
        start_time = time.time()
        p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
        p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
        p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
        p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))

        p1.start()
        p2.start()
        p3.start()
        p4.start()

        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())

        p1.join()
        p2.join()
        p3.join()
        p4.join()

        print "Running time parallel: ", time.time() - start_time, "secs"

    def no_concurrency():
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        start_time = time.time()
        cals = flop_no(rand_nos, a, b)
        print "Running time serial: ", time.time() - start_time, "secs"

    if __name__ == '__main__':
        concurrency()
        no_concurrency()
        # print "Program over" 

Output:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  3.54999995232  secs
    Running time serial:    0.203000068665 secs

Note that parallel time is still slower.  This is due to the overhead of starting 4 other Python processes.  Your processing time for the whole job is only .2 seconds.  The 3.5 seconds for parallel is mostly just starting up the processes.  Note the commented out `# time.sleep(3)` above in `flop()`.  Add that code in and the times are:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  6.50900006294  secs
    Running time serial:    0.203000068665 secs

The overall time only got 3 seconds faster (not 12) because they were running in parallel.  You need a lot more data to make parallel processing worthwhile.

Here's a version where you can visually see how long it takes to start the processes.  "here" is printed as each process begins to run `flop()`.  An event is used to start all threads at the same time, and only the processing time is counted:

#!python2

import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue, start):
    print 'here'
    start.wait()
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    start = mp.Event()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue, start))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue, start))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue, start))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue, start))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    time.sleep(5) # Wait for processes to start.  See Barrier in Python 3.2+ for a better solution.
    print "go"
    start.set()
    start_time = time.time()
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print "Running time parallel: ", time.time() - start_time, "secs"

    p1.join()
    p2.join()
    p3.join()
    p4.join()

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

Output:

1000000
here           # note these print about a second apart.
here
here
here
go
250000
250000
250000
250000
Running time parallel:  0.171999931335 secs
Running time serial:    0.203000068665 secs

Now, the processing time got faster. Not by a lot...probably due to the interprocess communication to get the results.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...