How to reduce time for multiprocessing in Python


I am trying to use multiprocessing in Python to reduce computation time, but after adding multiprocessing the overall computation became significantly slower. I have created 4 different processes and split the DataFrame into 4 smaller DataFrames, one as the input to each process. After timing each process, it seems the overhead cost is significant, and I was wondering if there is a way to reduce it.

I am using Windows 7 and Python 3.5, and my machine has 8 cores.

import datetime as dt
import multiprocessing
from functools import partial

import numpy as np
import pandas as pd


def doSomething(dataPassed, args):
    # df.apply hands each row over as the first positional argument,
    # so it must come before args (real work elided)
    ...


def parallelize_dataframe(df, nestedApply):
    df_split = np.array_split(df, 4)
    pool = multiprocessing.Pool(4)
    df = pool.map(nestedApply, df_split)
    print('finished with Simulation')
    time = float((dt.datetime.now() - startTime).total_seconds())

    pool.close()
    pool.join()


def nestedApply(df):
    func2 = partial(doSomething, args=())
    res = df.apply(func2, axis=1)
    # res = [output tables]
    return res


if __name__ == '__main__':
    startTime = dt.datetime.now()
    data = pd.read_sql_query(query, conn)
    parallelize_dataframe(data, nestedApply)
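To see where the time goes, the split and the pool.map call (which pickles every chunk over to the workers and the results back) can be timed separately. A minimal sketch of that measurement (measure and n_chunks are just illustrative names):

import multiprocessing
import time

import numpy as np


def measure(df, nestedApply, n_chunks=4):
    # time the chunk split and pool.map separately; pool.map includes
    # the cost of pickling each chunk to a worker and the results back
    t0 = time.perf_counter()
    df_split = np.array_split(df, n_chunks)
    t1 = time.perf_counter()
    with multiprocessing.Pool(n_chunks) as pool:
        results = pool.map(nestedApply, df_split)
    t2 = time.perf_counter()
    print('split: %.2fs, map incl. transfer: %.2fs' % (t1 - t0, t2 - t1))
    return results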
Accepted answer:

I would suggest using queues instead of handing your DataFrame over in chunks. Copying each chunk to a worker process takes a lot of resources and quite some time, and you could run out of memory if your DataFrame is really big. With queues you can also benefit from pandas' fast iterators. Here is my approach. The overhead shrinks as the workers get more complex; unfortunately, my workers here are far too simple to really show that, but the sleep simulates some complexity.

import pandas as pd
import multiprocessing as mp
import numpy as np
import time


def worker(in_queue, out_queue):
    # consume rows until the 'STOP' sentinel arrives
    for row in iter(in_queue.get, 'STOP'):
        value = (row[1] * row[2] / row[3]) + row[4]
        time.sleep(0.1)  # stand-in for a more expensive computation
        out_queue.put((row[0], value))

if __name__ == "__main__":
    # fill a DataFrame
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD'))

    in_queue = mp.Queue()
    out_queue = mp.Queue()

    # setup workers
    numProc = 2
    process = [mp.Process(target=worker,
                          args=(in_queue, out_queue)) for x in range(numProc)]

    # run processes
    for p in process:
        p.start()

    # iterator over rows
    it = df.itertuples()

    # fill the in_queue and drain the out_queue in lockstep:
    # keep feeding rows while no result is ready yet; put() would
    # block if the in_queue were bounded and currently full
    for i in range(len(df)):
        while out_queue.empty():
            # fill the queue
            try:
                row = next(it)
                in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True)  # row = (index, A, B, C, D) tuple
            except StopIteration:
                break
        row_data = out_queue.get()
        df.loc[row_data[0], "Result"] = row_data[1]

    # send one 'STOP' sentinel per worker
    for p in process:
        in_queue.put('STOP')

    # wait for processes to finish
    for p in process:
        p.join()
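After the joins, df holds the computed values in a new "Result" column, aligned by index. A quick sanity check against the equivalent vectorized formula, appended at the end of the __main__ block (purely illustrative):

    # every worker computed (A * B / C) + D per row
    expected = df['A'] * df['B'] / df['C'] + df['D']
    assert np.allclose(df['Result'], expected)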

With numProc = 2 it takes 50 s per loop; with numProc = 4 it is twice as fast.
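If the real per-row work is cheap, the queue traffic itself starts to dominate. One way to amortize it is to send batches of rows instead of single rows; here is a sketch of the same pattern with batching (batch_worker, feed_batches and the batch size of 256 are illustrative, not part of the code above):

def batch_worker(in_queue, out_queue):
    # each queue item is now a list of plain row tuples
    for batch in iter(in_queue.get, 'STOP'):
        out_queue.put([(row[0], (row[1] * row[2] / row[3]) + row[4])
                       for row in batch])


def feed_batches(df, in_queue, batch_size=256):
    # slice the itertuples stream into fixed-size batches;
    # tuple(row) turns the namedtuple into a plain tuple, which
    # pickles cleanly across process boundaries
    batch = []
    for row in df.itertuples():
        batch.append(tuple(row))
        if len(batch) == batch_size:
            in_queue.put(batch)
            batch = []
    if batch:
        in_queue.put(batch)

The main loop then collects one result list per batch instead of one value per row, which cuts the number of queue operations by the batch size.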