I am working on a project where I need to process a large dataset (50 GB) with limited memory (12 GB). I have a Pandas function that groups trades and computes various statistics, but due to memory constraints I want to convert it to PyArrow, which I understand can be more memory-efficient. Here is the original Pandas function:
import pandas as pd

def prepare_summary_report(trades_multidx_df):
    tc_trades_grp = trades_multidx_df.groupby(level=trades_multidx_df.index.names)
    grp_profit_trades = trades_multidx_df[trades_multidx_df['PnL'] > 0].groupby(level=trades_multidx_df.index.names)
    grp_loss_trades = trades_multidx_df[trades_multidx_df['PnL'] <= 0].groupby(level=trades_multidx_df.index.names)
    grp_absolute_profit = grp_profit_trades['PnL'].sum()
    grp_abs_profit_cnt = grp_profit_trades['PnL'].count()
    grp_absolute_loss = grp_loss_trades['PnL'].sum()
    grp_abs_loss_cnt = grp_loss_trades['PnL'].count()
    grp_net_pnl = tc_trades_grp['PnL'].sum()
    summary_df = pd.DataFrame(index=tc_trades_grp.indices.keys())
    trade_cnt = tc_trades_grp['PnL'].count()
    # More accurate to have start and end date based on candles/input from main
    summary_df['dt_from'] = tc_trades_grp['Date'].min()
    summary_df['dt_to'] = tc_trades_grp['Date'].max()
    # ... more stats ...
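For reference, the input is a DataFrame with a MultiIndex. A minimal example of the shape I pass in (the level and column names here are illustrative; my real index has six levels and the frame has many more columns):

import pandas as pd

# Toy input: the real data has a six-level MultiIndex and more columns
idx = pd.MultiIndex.from_tuples(
    [('AAPL', 'momentum'), ('AAPL', 'momentum'), ('MSFT', 'meanrev')],
    names=['Symbol', 'Strategy'],
)
trades_multidx_df = pd.DataFrame(
    {'PnL': [10.0, -4.0, 2.5],
     'Date': pd.to_datetime(['2023-01-02', '2023-01-03', '2023-01-02'])},
    index=idx,
)
prepare_summary_report(trades_multidx_df)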
I am looking for a way to convert this function to PyArrow, with every operation performed inside PyArrow (no Pandas at any point). The priority is to process the dataset in a memory-efficient way and avoid out-of-memory errors.
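For context, I load the table via memory mapping, roughly like this (file name and format are just an example; I use an uncompressed Arrow IPC/Feather file so the buffers can stay on disk):

import pyarrow as pa

# Memory-map an uncompressed Arrow IPC (Feather v2) file; read_all()
# references the mapped buffers instead of copying everything into RAM
with pa.memory_map('trades.arrow', 'rb') as source:
    trades_multidx_table = pa.ipc.open_file(source).read_all()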
This was my initial attempt against that memory-mapped table, which fails partway through:
import pyarrow as pa
import pyarrow.compute as pc

def prepare_summary_report(trades_multidx_table):
    # Use the specified grouping method; the last six columns hold the
    # former MultiIndex levels
    index_names = trades_multidx_table.schema.names[-6:]
    grouped_table = pa.TableGroupBy(trades_multidx_table, keys=index_names)
    # Create boolean masks based on PnL values
    positive_pnl_filter = pc.greater(trades_multidx_table.column('PnL'), 0)
    negative_pnl_filter = pc.less_equal(trades_multidx_table.column('PnL'), 0)
    # ... works up to here ...
    # FAILS in the next step, while doing .filter(positive_pnl_filter);
    # see the minimal repro after the function
    # Filter the table based on PnL values, then group and aggregate
    grp_absolute_profit = trades_multidx_table.filter(positive_pnl_filter).group_by(index_names).aggregate([('PnL', 'sum')]).column('PnL_sum')
    grp_abs_profit_cnt = trades_multidx_table.filter(positive_pnl_filter).group_by(index_names).aggregate([('PnL', 'count')]).column('PnL_count')
    grp_absolute_loss = trades_multidx_table.filter(negative_pnl_filter).group_by(index_names).aggregate([('PnL', 'sum')]).column('PnL_sum')
    grp_abs_loss_cnt = trades_multidx_table.filter(negative_pnl_filter).group_by(index_names).aggregate([('PnL', 'count')]).column('PnL_count')
    grp_net_pnl = trades_multidx_table.group_by(index_names).aggregate([('PnL', 'sum')]).column('PnL_sum')
    # Create the summary table
    summary_table = pa.table({
        'grp_absolute_profit': grp_absolute_profit,
        'grp_abs_profit_cnt': grp_abs_profit_cnt,
        'grp_absolute_loss': grp_absolute_loss,
        'grp_abs_loss_cnt': grp_abs_loss_cnt,
        'grp_net_pnl': grp_net_pnl,
        'dt_from': trades_multidx_table.group_by(index_names).aggregate([('Date', 'min')]).column('Date_min'),
        'dt_to': trades_multidx_table.group_by(index_names).aggregate([('Date', 'max')]).column('Date_max'),
    })
    return summary_table
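The same operations succeed on a small in-memory table, which makes me think the failure is memory-related rather than an API misuse. A minimal repro with a made-up toy table:

import pyarrow as pa
import pyarrow.compute as pc

# Toy stand-in for the real table; at this scale everything works
t = pa.table({
    'PnL': [10.0, -4.0, 2.5],
    'Symbol': ['AAPL', 'AAPL', 'MSFT'],
})
mask = pc.greater(t.column('PnL'), 0)
filtered = t.filter(mask)  # fine here, runs out of memory on the 50 GB table
print(filtered.group_by(['Symbol']).aggregate([('PnL', 'sum')]))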
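I also considered streaming the file batch by batch instead of filtering the whole table at once, sketched below, but then each batch only yields partial group aggregates that still have to be merged, and I am not sure how to do that cleanly in pure PyArrow:

import pyarrow as pa

# Read the memory-mapped IPC file one record batch at a time so only a
# slice of the data is materialized; group keys can span batches, though,
# so the per-batch aggregates would still need a combine step
with pa.memory_map('trades.arrow', 'rb') as source:
    reader = pa.ipc.open_file(source)
    for i in range(reader.num_record_batches):
        batch = reader.get_batch(i)
        # ... per-batch filter + aggregate would go here ...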