I need to run a function in multiple processes, where each process works on one slice of the table data (one month) and the function itself receives dataframes.
I hope someone can show the correct way to call multiprocessing in this case.
The program works roughly like this:
I have a function that manipulates data and needs to receive three dataframes (df1, df2, df3):
def manipulation(df1, df2, df3):
    """
    Read data from the dataframes and do all sorts of data manipulation
    in a worker process.
    """
    return df
This function returns one dataframe as its result, and I want to concatenate the results from all processes.
The processes and data should be split by month (a 'Date' field in the dataframes):
Months = pd.DatetimeIndex(data['Date']).month.drop_duplicates().tolist()
df1 = df1[pd.DatetimeIndex(df1['Date']).month == m]
df2 = df2[pd.DatetimeIndex(df2['Date']).month == m]
df3 = df3[pd.DatetimeIndex(df3['Date']).month == m]
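To make the split concrete, here is a minimal sketch of building one tuple of month-slices per month; the small toy dataframes and their values are assumptions standing in for the real data:

```python
import pandas as pd

# Toy dataframes standing in for the real df1/df2/df3 (assumed to share a 'Date' column)
df1 = pd.DataFrame({'Date': pd.to_datetime(['2023-01-05', '2023-02-10']), 'a': [1, 2]})
df2 = pd.DataFrame({'Date': pd.to_datetime(['2023-01-15', '2023-02-20']), 'b': [3, 4]})
df3 = pd.DataFrame({'Date': pd.to_datetime(['2023-01-25', '2023-02-28']), 'c': [5, 6]})

months = pd.DatetimeIndex(df1['Date']).month.drop_duplicates().tolist()

# One (df1_slice, df2_slice, df3_slice) tuple per month
args = [
    (df1[pd.DatetimeIndex(df1['Date']).month == m],
     df2[pd.DatetimeIndex(df2['Date']).month == m],
     df3[pd.DatetimeIndex(df3['Date']).month == m])
    for m in months
]
```

Each element of `args` is one complete set of inputs for a single call to `manipulation`.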
I have a couple of ideas but do not know how to implement them correctly:
use starmap and pass the function a list of tuples of dataframes as the argument:
arg = [
    [df1[pd.DatetimeIndex(df1['Date']).month == 1],
     df2[pd.DatetimeIndex(df2['Date']).month == 1],
     df3[pd.DatetimeIndex(df3['Date']).month == 1]],
    [df1[pd.DatetimeIndex(df1['Date']).month == 2],
     df2[pd.DatetimeIndex(df2['Date']).month == 2],
     df3[pd.DatetimeIndex(df3['Date']).month == 2]],
    .....
]
pool = mp.Pool(processes=(mp.cpu_count() - 1))
results = pool.starmap(manipulation, arg)
pool.close()
pool.join()
results_df = pd.concat(results)
So my question here is: how do I build such a list, and will the work actually be split by month in this case?
create processes in a loop and join them:
processes = []
for index, month in enumerate(Months):
    p = multiprocessing.Process(target=manipulation, args=(month,))
    processes.append(p)
    p.start()

# Wait for all processes to finish
for p in processes:
    p.join()
But how do I pass the dataframes (df1, df2, df3) to each process in this case?
This example suits well: