How to improve my concurrent processing time in this code?


I have programmed the following code in Python:

from concurrent.futures import ProcessPoolExecutor, \
                               ThreadPoolExecutor,   \
                               as_completed
import random
import time

def addS(v):
    # sequential reference: sum the list element by element and time it
    s = 0
    start = time.time()
    for i in range(0, len(v)):
        s = s + v[i]
    start1 = time.time()
    print("sec time ", start1 - start, " sum is ", s)

def dummyFun(l):
    # worker task: sum one chunk of the list
    s = 0
    for i in range(0, len(l)):
        s = s + l[i]
    return s

def main():
    workers = 4
    v = [random.randint(1, 101) for _ in range(1000000)]
    addS(v)
    dim = len(v) // (workers * 10)
    s = 0

    chunks = (v[k:k + dim] for k in range(0, len(v), dim))
    start = time.time()

    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(dummyFun, chunk) for chunk in chunks]
    # leaving the with-block waits for all submitted futures to finish

    start1 = time.time()

    for future in as_completed(futures):
        s = s + future.result()
    print("concurrent time ", start1 - start, " sum is ", s)

if __name__ == "__main__":
    main()

When I use the ProcessPoolExecutor(), my results are the following:

       sec time  0.06529831886291504  sum is  51004089
concurrent time  0.5757372379302979   sum is  51004089

I have also switched to ThreadPoolExecutor(), and the results are:

       sec time  0.06471061706542969  sum is  50981197
concurrent time  0.09298276901245117  sum is  50981197

Also, when I set the max_workers parameter to None, I got the following results:

       sec time  0.06425285339355469  sum is  50983899
concurrent time  0.09010934829711914  sum is  50983899 

How can I improve my concurrent time? The sequential version keeps being faster every time; is there a reason for this?


Q : "How can I improve my concurrent time?"

Avoid all process-instantiation costs and use memory-I/O-efficient, vectorised code instead, best combined with numpy broadcasting:

import numpy as np

s = np.random.randint( 1, 101, size = 1_000_000, dtype = np.int8 ).sum()

where hardware AVX performance can outweigh any other, software-only tricks.

For testing it is fair to always pre-set np.random.seed( seed = someHausNumero ) first; otherwise the tests cease to be repeatable and yield different results on every run, which serious, repeatable science never wants, does it?

:o)
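A minimal sketch of the fixed-seed setup described above ( the seed value is an arbitrary placeholder ):

import numpy as np

np.random.seed( seed = 20210314 )   # any fixed integer makes the run repeatable
s = np.random.randint( 1, 101, size = 1_000_000, dtype = np.int8 ).sum()
print( s )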


Thanks @user3666197, but I was wondering why my sequential time is shorter than the concurrent version. Maybe it is because list operations are optimized in Python? – Little 13 mins ago

With a small amount of items ( 1E6 is a small footprint of data ), the process instantiations and similar ...PoolExecutor() overheads introduce more costs than the split work will ever be able to repay, so you never even reach a break-even state ( you pay more than you ever get back ). Try the same with off-cache, memory-I/O-intensive sizes, well above say 1E10+ bytes ( given you have enough RAM / swap space, sure ).
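For illustration, a minimal sketch of the same split-sum with only one coarse chunk per worker, so the pool's submit/pickle overhead is paid just four times ( the worker count and chunk layout are assumptions carried over from the question ):

from concurrent.futures import ProcessPoolExecutor
import random
import time

def chunk_sum(chunk):
    return sum(chunk)

if __name__ == "__main__":
    workers = 4
    v = [random.randint(1, 101) for _ in range(1000000)]

    # one chunk per worker keeps the number of submit/pickle round-trips at a minimum
    dim = len(v) // workers
    chunks = [v[k:k + dim] for k in range(0, len(v), dim)]

    start = time.time()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        s = sum(executor.map(chunk_sum, chunks))
    print("coarse-chunk concurrent time ", time.time() - start, " sum is ", s)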

While the problem above is a "just"-[ CONCURRENT ] orchestration of the flow of processing tasks, you may like to read a few details on add-on overhead costs and atomicity of work, which both limit the maximum practically achievable speedup compared to a pure-[ SERIAL ] ( or sequential ) flow of work >>> https://stackoverflow.com/revisions/18374629/3
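As a rough, hand-wavy illustration of that overhead-strict view ( the numbers below are made-up placeholders, not measurements ):

def overhead_strict_speedup(T_serial, parallel_fraction, n_workers, overhead):
    # toy model: only parallel_fraction of T_serial can be split across n_workers,
    # and overhead is the total add-on cost ( process spawn + SER/comms/DES ) paid for doing so
    T_parallel = (T_serial * (1 - parallel_fraction)
                  + T_serial * parallel_fraction / n_workers
                  + overhead)
    return T_serial / T_parallel

# a 0.065 s job that is 95 % parallelisable still slows down
# once ~0.5 s of pool start-up and pickling overhead is added:
print(overhead_strict_speedup(0.065, 0.95, 4, 0.5))   # << 1, i.e. a net slowdown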

The nature of the add-on costs is the same: parameters need serialisation / communication / deserialisation ( SER/comms/DES ), results need SER/comms/DES on the way back, and all of that comes with memory-allocation and memory-I/O add-on costs - for which see this >>> https://stackoverflow.com/questions/4087280/approximate-cost-to-access-various-caches-and-main-memory/33065382?r=SearchResults&s=2|0.0000#33065382
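A quick way to see those SER/comms/DES costs on the question's own data is to time just the pickling round-trip of the list that gets shipped to the workers ( nothing else is measured here ):

import pickle
import random
import time

v = [random.randint(1, 101) for _ in range(1000000)]

start = time.time()
blob = pickle.dumps(v)    # what every submit() has to do with its argument
_ = pickle.loads(blob)    # what the worker has to undo before it can start working
print("SER/DES round-trip ", time.time() - start, " s for ", len(blob), " bytes")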