How to speed up (parallelize) a grouped row-wise rolling mean calculation?


I am calculating a grouped row-wise moving average on a large data set. However, the process takes too long on a single thread. How can I speed it up efficiently?

Please find a reproducible example below:

dataframe = pd.DataFrame({'id': range(2),
                   'group_id': range(2),
                   'Date_1_F1': [1,2], 
                   'Date_2_F1': [2,4], 
                   'Date_3_F1': [3, 6],
                   'Date_4_F1': [4,8], 
                   'Date_1_F2': [2,11], 
                   'Date_2_F2': [6, 13], 
                   'Date_3-F2': [10, 15],
                   'Date_4_F2': [14, 17]})
dataframe

   id  group_id  Date_1_F1  ...  Date_2_F2  Date_3-F2  Date_4_F2
0   0         0          1  ...          6         10         14
1   1         1          2  ...         13         15         17

I have a function that returns the (row-wise) smoothed version of the dataset.

def smooth_ts(dataframe, ma_parameter = 2):

    dataframe = (dataframe
                 .set_index(["id", "group_id"])
                 # group columns by their feature suffix (e.g. "F1", "F2")
                 .groupby(lambda x: x.split("_")[-1], axis = 1, group_keys=False)
                 # rolling mean across the date columns of each feature
                 .apply(lambda x: x.rolling(ma_parameter, axis = 1)
                        .mean()
                        .dropna(axis=1, how='all')))
    
    dataframe.reset_index(inplace = True)
    
    return dataframe

smoothed_df = smooth_ts(dataframe)

Thank you very much


Accepted answer (ko3):

You could (1) melt your data frame with pd.melt, (2) create the grouping variable from the column names, (3) sort, group, and aggregate with rolling(2).mean(). Then you can use df.pivot to bring the data back into the required wide shape. In this approach there is a single apply step, which can be parallelized using swifter. Here is an example:

import pandas as pd
import numpy as np

import swifter

dataframe = pd.DataFrame({'id': range(2),
                   'group_id': range(2),
                   'Date_1_F1': [1,2], 
                   'Date_2_F1': [2,4], 
                   'Date_3_F1': [3, 6],
                   'Date_4_F1': [4,8], 
                   'Date_1_F2': [2,11], 
                   'Date_2_F2': [6, 13], 
                   'Date_3-F2': [10, 15],
                   'Date_4_F2': [14, 17]})

df_melted = pd.melt(dataframe, id_vars=['id', 'group_id'])
# Uncomment the next line (and comment out the one after it) to parallelize the apply step with swifter
# df_melted['groups'] = df_melted['variable'].str.split('_').swifter.apply(lambda v: v[-1])
df_melted['groups'] = df_melted['variable'].str.split('_').apply(lambda v: v[-1])

df_melted = df_melted.sort_values(['id', 'group_id', 'groups'])

df_tmp = df_melted.copy()
# rolling mean over the date columns within each id/group_id/feature group
df_tmp['rolling_val'] = df_tmp.groupby(['id', 'group_id', 'groups'])['value'].rolling(2).mean().values
# pivot back to wide format and drop the all-NaN first-window columns
(df_tmp
 .pivot(index=['id', 'group_id'], columns='variable', values='rolling_val')
 .dropna(axis=1)
 .reset_index()
 .rename_axis(None, axis=1))
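The advantage of the long format is that the rolling mean becomes an ordinary row-wise groupby-rolling rather than an axis=1 apply; the only apply left is extracting the group label from the column name, which is exactly the step swifter can spread over several workers.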

If you want to stick to your original approach, you can speed it up with the Pool object from the multiprocessing library, which maps a function over an iterable in parallel.

import pandas as pd
import numpy as np
from multiprocessing import Pool


dataframe = pd.DataFrame({'id': range(2),
                   'group_id': range(2),
                   'Date_1_F1': [1,2], 
                   'Date_2_F1': [2,4], 
                   'Date_3_F1': [3, 6],
                   'Date_4_F1': [4,8], 
                   'Date_1_F2': [2,11], 
                   'Date_2_F2': [6, 13], 
                   'Date_3-F2': [10, 15],
                   'Date_4_F2': [14, 17]})
dataframe

def smooth_ts(dataframe, ma_parameter = 2):

    dataframe = (dataframe
                 .set_index(["id", "group_id"])
                 .groupby(lambda x: x.split("_")[-1], axis = 1, group_keys=False)
                 .apply(lambda x: x.rolling(ma_parameter, axis = 1)
                        .mean()
                        .dropna(axis=1, how='all')))
    
    dataframe.reset_index(inplace = True)
    
    return dataframe

# Split the unique ids into 2 chunks; the number of splits determines the number of chunks
id_chunks = np.array_split(dataframe.id.unique(), 2)
# List of chunked data frames, one per id chunk
df_chunks = [dataframe[dataframe['id'].isin(i)] for i in id_chunks]
# Apply smooth_ts to each chunk in its own process; two workers are enough here
# because there are only two chunks - use more workers for more chunks/cores
with Pool(2) as p:
    dfs_chunks = p.map(smooth_ts, df_chunks)
pd.concat(dfs_chunks).reset_index(drop=True)
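
A minimal variant of the chunked approach above (not part of the original answer), assuming the dataframe and smooth_ts defined earlier: it adds the if __name__ == '__main__': guard that multiprocessing needs on spawn-based platforms such as Windows, derives the number of chunks from the available cores, and uses functools.partial to forward a non-default ma_parameter through Pool.map.

import numpy as np
import pandas as pd
from functools import partial
from multiprocessing import Pool, cpu_count

if __name__ == '__main__':
    # do not start more workers than there are chunks of ids
    n_workers = min(cpu_count(), dataframe['id'].nunique())
    id_chunks = np.array_split(dataframe['id'].unique(), n_workers)
    df_chunks = [dataframe[dataframe['id'].isin(ids)] for ids in id_chunks]
    # partial fixes ma_parameter so Pool.map can call smooth_ts with a single argument
    with Pool(n_workers) as p:
        dfs_chunks = p.map(partial(smooth_ts, ma_parameter=2), df_chunks)
    result = pd.concat(dfs_chunks).reset_index(drop=True)
    print(result)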