pandas multiprocessing - using global objects in all processes


I need to run a function in several processes, where the work is split according to the table data (by month) and the function itself receives dataframes.

I hope someone can show the correct way to call multiprocessing in this case.

The program will work something like this:

I have a function with data manipulation which needs to receive three dataframes (df1, df2, df3).

def manipulation(df1,df2,df3):

    """
    reading data from a data frame and doing all sorts of data manipulation 
    for multiprocessing
    """

    return df

This function returns one dataframe as its result, which I want to concatenate with the results from all the other processes.

Processes and data should be divided by month (a field in the dataframes):

Months = pd.DatetimeIndex(data['Date']).month.drop_duplicates().tolist()
df1 = df1[pd.DatetimeIndex(df1['Date']).month == m]
df2 = df2[pd.DatetimeIndex(df2['Date']).month == m]
df3 = df3[pd.DatetimeIndex(df3['Date']).month == m]

I have a couple of ideas but do not know how to implement them correctly:

  1. use starmap and pass the function a list of tuples of dataframes as arg:

    arg = [
        (df1[pd.DatetimeIndex(df1['Date']).month == 1],
         df2[pd.DatetimeIndex(df2['Date']).month == 1],
         df3[pd.DatetimeIndex(df3['Date']).month == 1]),
        (df1[pd.DatetimeIndex(df1['Date']).month == 2],
         df2[pd.DatetimeIndex(df2['Date']).month == 2],
         df3[pd.DatetimeIndex(df3['Date']).month == 2]),
        .....
    ]
    
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    results = pool.starmap(manipulation, arg)
    pool.close()
    pool.join()
    results_df = pd.concat(results)
    

    So my question here is how to create such a list, and whether the work will actually be split by month in this case?
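To make idea 1 concrete, here is a minimal runnable sketch under stated assumptions: synthetic df1/df2/df3 with a `Date` column, and a placeholder `manipulation()` that simply joins the three monthly slices column-wise (the real data manipulation would go there). The per-month tuples are built in a list comprehension and unpacked by `starmap`:

```python
import multiprocessing as mp

import pandas as pd

# Synthetic example data; in the real code df1, df2, df3 come from elsewhere.
dates = pd.to_datetime(["2023-01-05", "2023-01-20", "2023-02-03", "2023-02-14"])
df1 = pd.DataFrame({"Date": dates, "x": [1, 2, 3, 4]})
df2 = pd.DataFrame({"Date": dates, "y": [10, 20, 30, 40]})
df3 = pd.DataFrame({"Date": dates, "z": [100, 200, 300, 400]})

def manipulation(d1, d2, d3):
    # Placeholder for the real data manipulation: here it simply joins
    # the three monthly slices column-wise.
    return pd.concat(
        [d1.reset_index(drop=True),
         d2.drop(columns="Date").reset_index(drop=True),
         d3.drop(columns="Date").reset_index(drop=True)],
        axis=1,
    )

Months = pd.DatetimeIndex(df1["Date"]).month.drop_duplicates().tolist()

# One (df1_slice, df2_slice, df3_slice) tuple per month; starmap unpacks
# each tuple into the three positional arguments of manipulation().
arg = [
    (df1[pd.DatetimeIndex(df1["Date"]).month == m],
     df2[pd.DatetimeIndex(df2["Date"]).month == m],
     df3[pd.DatetimeIndex(df3["Date"]).month == m])
    for m in Months
]

if __name__ == "__main__":
    with mp.Pool(processes=max(1, mp.cpu_count() - 1)) as pool:
        results = pool.starmap(manipulation, arg)
    results_df = pd.concat(results, ignore_index=True)
```

Passing tuples (not lists of lists) mirrors how `starmap` unpacks the three positional arguments; each worker receives pickled copies of its monthly slices, so the split by month happens before the processes start.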

  2. create processes in a loop and join them:

    processes = []
    for index, month in enumerate(Months):
        p = multiprocessing.Process(target=manipulation, args=(month,))
        processes.append(p)
        p.start()
    
    # Wait for all processes to finish
    for p in processes:
        p.join()

But how do I pass the dataframes (df1, df2, df3) to each process in this case?
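For idea 2, the monthly slices can go into `args` like any other argument, but a `Process` target cannot return a value to the parent, so one common pattern is to also pass a `multiprocessing.Queue` that collects the result. A minimal sketch, again with synthetic data and a placeholder `manipulation()`:

```python
import multiprocessing as mp

import pandas as pd

# Synthetic example data (one row per month, purely for illustration).
dates = pd.to_datetime(["2023-01-05", "2023-02-03"])
df1 = pd.DataFrame({"Date": dates, "x": [1, 2]})
df2 = pd.DataFrame({"Date": dates, "y": [10, 20]})
df3 = pd.DataFrame({"Date": dates, "z": [100, 200]})

def manipulation(d1, d2, d3, queue):
    # A Process target cannot "return" to the parent, so the result
    # dataframe is put on a shared queue instead.
    out = pd.concat(
        [d1.reset_index(drop=True),
         d2.drop(columns="Date").reset_index(drop=True),
         d3.drop(columns="Date").reset_index(drop=True)],
        axis=1,
    )
    queue.put(out)

if __name__ == "__main__":
    Months = pd.DatetimeIndex(df1["Date"]).month.drop_duplicates().tolist()
    queue = mp.Queue()
    processes = []
    for m in Months:
        p = mp.Process(
            target=manipulation,
            args=(df1[pd.DatetimeIndex(df1["Date"]).month == m],
                  df2[pd.DatetimeIndex(df2["Date"]).month == m],
                  df3[pd.DatetimeIndex(df3["Date"]).month == m],
                  queue),
        )
        processes.append(p)
        p.start()

    # Drain the queue before join(): a child can block on exit while
    # its queued data has not yet been consumed.
    results = [queue.get() for _ in processes]
    for p in processes:
        p.join()
    results_df = pd.concat(results, ignore_index=True)
```

Note that the queue is drained before `join()`; for this fan-out/collect pattern a `Pool` is usually simpler, since it handles the return values for you.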


There is 1 answer below.

Answer from Daneel Ank:

This example should suit your case well:

import multiprocessing
import pandas as pd

def my_func(df):
    # df is one month's group of rows
    df1 = pd.read...
    df2 = pd.read...
    df3 = pd.read...
    # modify df here
    return df1

if __name__ == "__main__":
    df = pd.DataFrame({'month': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool(processes=(multiprocessing.cpu_count() - 1)) as pool:
        # one group of rows per month; each group is sent to a worker
        groups = (g for _, g in df.groupby("month"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            print()
            print(res)
            out.append(res)
    final_df = pd.concat(out)
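Assuming all three real frames cover the same set of months, the `groupby` pattern from this answer can be extended to three dataframes by zipping the per-month groups. A hypothetical sketch (`monthly_groups` and the placeholder `manipulation` are illustrative names, with synthetic data):

```python
import multiprocessing as mp

import pandas as pd

# Synthetic example data; the real df1, df2, df3 come from elsewhere.
dates = pd.to_datetime(["2023-01-05", "2023-02-03", "2023-02-14"])
df1 = pd.DataFrame({"Date": dates, "x": [1, 2, 3]})
df2 = pd.DataFrame({"Date": dates, "y": [10, 20, 30]})
df3 = pd.DataFrame({"Date": dates, "z": [100, 200, 300]})

def manipulation(d1, d2, d3):
    # Placeholder for the real data manipulation.
    return pd.concat(
        [d1.reset_index(drop=True),
         d2.drop(columns="Date").reset_index(drop=True),
         d3.drop(columns="Date").reset_index(drop=True)],
        axis=1,
    )

def monthly_groups(df):
    # groupby sorts the groups by key, so zipping the three generators
    # pairs up the same month across all three frames.
    return (g for _, g in df.groupby(pd.DatetimeIndex(df["Date"]).month))

if __name__ == "__main__":
    triples = list(zip(monthly_groups(df1), monthly_groups(df2), monthly_groups(df3)))
    with mp.Pool(processes=max(1, mp.cpu_count() - 1)) as pool:
        results = pool.starmap(manipulation, triples)
    final_df = pd.concat(results, ignore_index=True)
```

This only works if every month present in df1 is also present in df2 and df3; otherwise the zipped groups would silently pair up different months.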