Why does Python multiprocessing take more time than serial code? How can I speed this up?

I was trying out the Python multiprocessing module. In the code below, the serial execution time is 0.09 seconds and the parallel execution time is 0.2 seconds. As I am getting no speedup, I think I might be going wrong somewhere.

import multiprocessing as mp
from random import uniform, randrange
import time

# m = mp.Manager()
out_queue = mp.Queue()

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals


def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    # print cals
    out_queue.put(cals)
    # print "Exec over"


def concurrency():
    # out_queue1 = mp.Queue()
    # out_queue2 = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p2.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p3.start()
    out_queue.get()

    p4.start()
    out_queue.get()

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

My system has four cores. Please let me know of ways I can speed up this code. Also, what are my options for parallel programming with Python (other than the multiprocessing module)?

Thanks and Regards

There are 2 best solutions below

out_queue.get() blocks until a result is available by default. So you are essentially starting a process and waiting for it to finish before starting the next process. Instead, start all the processes, then get all the results.

Example:

#!python2
import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    # time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over" 

Output:

1000000
250000
250000
250000
250000
Running time parallel:  3.54999995232  secs
Running time serial:    0.203000068665 secs

Note that parallel time is still slower. This is due to the overhead of starting 4 other Python processes. Your processing time for the whole job is only .2 seconds. The 3.5 seconds for parallel is mostly just starting up the processes. Note the commented out # time.sleep(3) above in flop(). Add that code in and the times are:

1000000
250000
250000
250000
250000
Running time parallel:  6.50900006294  secs
Running time serial:    0.203000068665 secs

The overall time only got 3 seconds slower (not 12) because the four sleeps were running in parallel. You need a lot more data to make parallel processing worthwhile.
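
The arithmetic behind "3 seconds, not 12" can be sketched as a toy model (Python 3 syntax). The function names and the idea of treating startup as one fixed cost are illustrative assumptions, not measurements: tasks run one after another cost the sum of their durations, while tasks run side by side cost roughly as much as the longest one.

```python
# Toy wall-clock model: assumes perfectly overlapping workers and a single
# fixed startup cost. Both are simplifications for illustration only.

def serial_wall_time(durations):
    """Tasks run back to back: total time is the sum of all durations."""
    return sum(durations)

def parallel_wall_time(durations, startup_overhead=0.0):
    """Tasks run side by side: total time is the slowest task plus overhead."""
    return startup_overhead + max(durations)

sleeps = [3.0, 3.0, 3.0, 3.0]          # one time.sleep(3) per process
print(serial_wall_time(sleeps))         # 12.0
print(parallel_wall_time(sleeps))       # 3.0
```

In this model the startup overhead is paid once regardless of the sleeps, which is why adding the sleep only shifts the parallel total by about one sleep duration.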

Here's a version where you can visually see how long it takes to start the processes. "here" is printed as each process begins to run flop(). An event is used to start all processes at the same time, and only the processing time is counted:

#!python2
import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue, start):
    print 'here'
    start.wait()
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    start = mp.Event()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue, start))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue, start))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue, start))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue, start))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    time.sleep(5) # Wait for processes to start.  See multiprocessing.Barrier in Python 3.3+ for a better solution.
    print "go"
    start.set()
    start_time = time.time()
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print "Running time parallel: ", time.time() - start_time, "secs"

    p1.join()
    p2.join()
    p3.join()
    p4.join()

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

Output:

1000000
here           # note these print about a second apart.
here
here
here
go
250000
250000
250000
250000
Running time parallel:  0.171999931335 secs
Running time serial:    0.203000068665 secs

Now, the processing time got faster. Not by a lot...probably due to the interprocess communication to get the results.
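
One way to get a feel for that communication cost: objects put on a multiprocessing.Queue are pickled by the sender and unpickled by the receiver, so timing a pickle round trip of one result chunk gives a rough estimate of the per-chunk cost (it ignores the pipe transfer itself). A minimal sketch in Python 3 syntax; the helper name pickle_round_trip_seconds is made up for illustration:

```python
import pickle
import time
from random import uniform

def pickle_round_trip_seconds(obj):
    """Serialise and deserialise obj once; return (elapsed seconds, copy)."""
    start = time.perf_counter()
    payload = pickle.dumps(obj)         # what the sending process has to do
    copy = pickle.loads(payload)        # what the receiving process has to do
    return time.perf_counter() - start, copy

# One result chunk of the same size as in the example above.
chunk = [uniform(1, 4) + 3.3 * 4.4 for _ in range(250000)]
seconds, copy = pickle_round_trip_seconds(chunk)
print("pickle round trip for %d floats: %.4f secs" % (len(chunk), seconds))
```

Comparing this number with the time of the arithmetic itself shows how much of the "parallel" run is spent just moving results around.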

Love is a passion . . . but can hurt a lot, if one's belief is just blind or naive to evidence

I love Python for its ease of use and its universality, yet getting towards HPC performance requires more: hardware-related insight and optimisation-tweaking effort also have to be put in.

@RupjitChakraborty as you may see in my answer below, the same result can be obtained in pure-[SERIAL] code ~50x faster than in your best case and ~100x faster than Mark's reported time. Feel free to re-test it on your hardware, so as to have the same platform for a somewhat more rigorous comparison of performance readings. Nevertheless, enjoy the hunt for performance! – user3666197 Dec 1 '17 at 13:39

If I may put a few cents into this never-ending hunt for performance:
- try to understand well both the original Amdahl's Law and its newer, overhead-strict re-formulation
- try to quantify the add-on overhead costs of process management
- try to quantify the add-on overhead costs of large data transfers ( a one-stop cost )
- try to avoid any and all potential (b)locking; some of it may be hidden "behind" the constructors you use
- try to avoid any processing-unrelated overhead costs of synchronisation and communication
- try to prevent CPU-core cache misses and to minimise coherence losses ( easy to say, hard to code: manually crafted code often beats a simple one-liner built on some highly abstracted syntax constructor, whose costs you cannot manage, because you can make better cache-related decisions yourself than a context-unaware, pre-fabricated, universal code transformation that knows nothing of your particular priorities )
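
As a rough numerical companion to the first bullet, the classic Amdahl's Law and one possible overhead-aware variant can be sketched as below (Python 3 syntax). The second formula is only an assumption about what an overhead-strict form might look like, with overheads expressed as a fraction of the original serial run time and added to the denominator; it is not taken from a specific reference.

```python
def amdahl_speedup(parallel_fraction, n_workers):
    """Classic Amdahl's Law: S = 1 / ((1 - p) + p / N)."""
    p = parallel_fraction
    return 1.0 / ((1.0 - p) + p / n_workers)

def amdahl_speedup_with_overhead(parallel_fraction, n_workers, overhead_fraction):
    """Illustrative variant: add-on overhead o (process setup, data transfer),
    given as a fraction of the serial run time, is paid on top of the work:
    S = 1 / ((1 - p) + o + p / N)."""
    p = parallel_fraction
    return 1.0 / ((1.0 - p) + overhead_fraction + p / n_workers)

# 95 % parallelisable work on 4 workers, no overhead.
print(amdahl_speedup(0.95, 4))
# Same work, but overheads cost as much as the whole serial run (o = 1.0):
# the "speedup" drops below 1, i.e. the parallel version is slower.
print(amdahl_speedup_with_overhead(0.95, 4, 1.0))
```

With overheads of that size, no number of workers brings the speedup above 1, which matches what the timings in the first answer show for a job this small.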


Want speedup?
Always systematically test individual factors in isolation:

For a brief view into the actual costs your code will pay ( in [us] ): never guess, test it.

Test-case A: measures process-management [SERIAL]-process-scheduling add-on costs
Test-case B: measures remote process memory allocation add-on costs
Test-case C: measures remote process [CONCURRENT]-process-scheduling computing costs
Test-case D: measures remote process workloads impact on [CONCURRENT] scheduling costs

For details, one may read further and re-use / improve the naive code templates in the chapter [ The Architecture, Resources and Process-scheduling facts that matter ].
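
A minimal, standard-library-only harness for this kind of isolated measurement might look like the sketch below (Python 3 syntax). It is not one of the Test-cases A to D themselves; the helper name time_us and the toy workload are illustrative assumptions. It repeats a call several times and keeps every duration in microseconds, so run-to-run noise stays visible.

```python
import time

def time_us(func, *args, repeats=5):
    """Call func(*args) `repeats` times; return a list of durations in [us]."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func(*args)
        durations.append((time.perf_counter() - start) * 1e6)
    return durations

def flop_no(rand_nos, a, b):
    # Same arithmetic as the question's flop_no(), written as a comprehension.
    return [r + a * b for r in rand_nos]

samples = time_us(flop_no, [1.5] * 100000, 3.3, 4.4)
print("min %.0f [us], max %.0f [us]" % (min(samples), max(samples)))
```

Timing one factor at a time this way (the pure computation here, then process start-up, then data transfer) makes it clear which cost dominates before any parallel code is written.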

As Mark has already warned, another cost in the overhead-strict Amdahl's Law speedup calculation comes from data transfers from the main process to each of the spawned subprocesses, where pure-[SERIAL] add-on overheads grow more than linearly with data volume, due to colliding access patterns, contention for physical resource capacity, signalling and (b)locking overheads on shared objects, and similar unavoidable hardware obstacles.

Before going any deeper into performance-tweaking options, one may propose an easy Test-case E for measuring this very class of add-on costs of memory-data transfers:

def a_FAT_DATA_XFER_COSTS_FUN( anIndeedFatPieceOfDATA ):
    """                                                 __doc__
    The intent of this FUN() is indeed to do nothing at all,
                             but to be able to benchmark
                             add-on overhead costs
                             raised by a need to transfer
                             some large amount of data
                             from a main()-process
                             to this FUN()-subprocess spawned.
    """
    return ( anIndeedFatPieceOfDATA[ 0]
           + anIndeedFatPieceOfDATA[-1]
             )

##############################################################
###  A NAIVE TEST BENCH
##############################################################
import joblib                                             # needed by the code under test below
from zmq import Stopwatch; aClk = Stopwatch()
JOBS_TO_SPAWN =  4         # TUNE:  1,  2,  4,   5,  10, ..
RUNS_TO_RUN   = 10         # TUNE: 10, 20, 50, 100, 200, 500, 1000, ..
SIZE_TO_XFER  = 1E+6       # TUNE: +6, +7, +8,  +9, +10, ..

DATA_TO_XFER  = [ 1 for _ in range( int( SIZE_TO_XFER ) ) ]

try:
     aClk.start()
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
     joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                      )( joblib.delayed( a_FAT_DATA_XFER_COSTS_FUN )
                                       ( a_FAT_DATA )
                                   for ( a_FAT_DATA )
                                   in  [       DATA_TO_XFER
                                         for _ in range( RUNS_TO_RUN )
                                         ]
                         )
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
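except Exception as anException:                          # do not hide failures silently
     print( "EXC:: {0!r}".format( anException ) )
finally:
     try:
         _ = aClk.stop()
     except Exception:
         _ = -1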

template = "CLK:: {0:_>24d} [us] @{1: >3d} run{2: >5d} RUNS ( {3: >12.3f} [Mi items] )"

print( template.format( _,
                        JOBS_TO_SPAWN,
                        RUNS_TO_RUN,
                        SIZE_TO_XFER / 1024. /1024.       # list items / 2**20, not bytes
                        )
       )

Please let me know of ways I can speedup this code.

  • learn about numba; it is definitely worth knowing this tool for performance boosting
  • learn about vectorisation of operations
  • after mastering these two, you might look into re-formulating already perfect code in Cython

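import numpy as np

a, b = 3.3, 4.4                                   # same constants as in the question
rVEC = np.random.uniform( 1, 4, int( 1E+6 ) )     # size must be an integer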

def flop_NaivePY( r, a, b ):
    return(       r+(a *b ) )

aClk.start(); _ = flop_NaivePY( rVEC, a, b ); aClk.stop()
4868L
4253L
4113L
4376L
4333L
4137L
4.~_____[ms] @ 1.000.000 FLOAT-OPS, COOL, RIGHT?

Yet this code is still awfully wrong if you are thinking about performance.

Let's turn on numpy in-place assignments, avoiding duplicate memory allocations and similar processing-inefficiencies:

def flop_InplaceNUMPY( r, a, b ):
       r += a * b
       return r

aClk.start(); _ = flop_InplaceNUMPY( rVEC, a, b ); aClk.stop()
2459L
2426L
2658L
2444L
2421L
2430L
2429L
4.??         @ 1.000.000 FLOAT-OPS, COOL, RIGHT? NOT AS SEEN NOW
2.~!____[ms] @ 1.000.000 FLOAT-OPS, HALF, BETTER!
                                          BUT
                                          ALSO TEST THE SCALING
                                          ONCE GONE OFF CACHE,
                                          THAT TEST GET SMELL OF A NEED
                                                              TO OPTIMISE
                                                              CODE DESIGN

Careful experimenters will soon find that, at larger sizes ( above ~1E+9 elements ), the naive code may even get the Python process killed, because its memory-allocation requests can no longer be satisfied.

All of this puts otherwise pure-[SERIAL] code on steroids without paying any add-on overhead costs, and uncle Gene Amdahl will reward, to the max, the process-scheduling and hardware-architecture knowledge and effort you invest during code design.

No better advice exists . . . except going into a pure clairvoyance business, where re-testing is never available