Data Manipulation in PyArrow for Large Datasets


I am working on a project where I need to process a large dataset (50 GB) with limited available memory (12 GB). I have a function written in Pandas that performs grouping and calculates various statistics. However, due to memory constraints, I am looking to convert this function to use PyArrow, as I've heard it can be more memory-efficient. Here's the original Pandas function:

import pandas as pd

def prepare_summary_report(trades_multidx_df):
    tc_trades_grp = trades_multidx_df.groupby(level=trades_multidx_df.index.names)
    grp_profit_trades = trades_multidx_df[trades_multidx_df['PnL'] > 0].groupby(level=trades_multidx_df.index.names)
    grp_loss_trades = trades_multidx_df[trades_multidx_df['PnL'] <= 0].groupby(level=trades_multidx_df.index.names)

    grp_absoulte_profit = grp_profit_trades['PnL'].sum()
    grp_abs_profit_cnt = grp_profit_trades['PnL'].count()
    grp_absoulte_loss = grp_loss_trades['PnL'].sum()
    grp_abs_loss_cnt = grp_loss_trades['PnL'].count()
    grp_net_pnl = tc_trades_grp['PnL'].sum()

    summary_df = pd.DataFrame(index=tc_trades_grp.indices.keys())

    trade_cnt = tc_trades_grp['PnL'].count()
    # More accurate to have start and end dates based on candles/input from main
    summary_df['dt_from'] = tc_trades_grp['Date'].min()
    summary_df['dt_to'] = tc_trades_grp['Date'].max()
    # ......... more stats ......

I am looking for a way to convert this function so that all operations are performed entirely within PyArrow (no Pandas). The priority is to handle the dataset in a memory-efficient way and avoid out-of-memory errors.

This was my initial failed mmap attempt:

import pyarrow as pa
import pyarrow.compute as pc

def prepare_summary_report(trades_multidx_table):
    # Use the specified grouping method
    index_names = trades_multidx_table.schema.names[-6:]
    grouped_table = pa.TableGroupBy(trades_multidx_table, keys=index_names)

    # Create filters based on PnL values
    positive_pnl_filter = pc.greater(trades_multidx_table.column('PnL'), 0)
    negative_pnl_filter = pc.less_equal(trades_multidx_table.column('PnL'), 0)

    # .... works till here ....
    # FAILS in the next step, while doing .filter(positive_pnl_filter)

    # Filter the table based on PnL values, then group and aggregate
    grp_absoulte_profit = (trades_multidx_table.filter(positive_pnl_filter)
                           .group_by(index_names)
                           .aggregate([('PnL', 'sum')])
                           .column('PnL_sum'))
    grp_abs_profit_cnt = (trades_multidx_table.filter(positive_pnl_filter)
                          .group_by(index_names)
                          .aggregate([('PnL', 'count')])
                          .column('PnL_count'))
    grp_absoulte_loss = (trades_multidx_table.filter(negative_pnl_filter)
                         .group_by(index_names)
                         .aggregate([('PnL', 'sum')])
                         .column('PnL_sum'))
    grp_abs_loss_cnt = (trades_multidx_table.filter(negative_pnl_filter)
                        .group_by(index_names)
                        .aggregate([('PnL', 'count')])
                        .column('PnL_count'))
    grp_net_pnl = (trades_multidx_table.group_by(index_names)
                   .aggregate([('PnL', 'sum')])
                   .column('PnL_sum'))

    # Create the summary table
    summary_table = pa.table({
        'grp_absoulte_profit': grp_absoulte_profit,
        'grp_abs_profit_cnt': grp_abs_profit_cnt,
        'grp_absoulte_loss': grp_absoulte_loss,
        'grp_abs_loss_cnt': grp_abs_loss_cnt,
        'grp_net_pnl': grp_net_pnl,
        'dt_from': trades_multidx_table.group_by(index_names)
                       .aggregate([('Date', 'min')]).column('Date_min'),
        'dt_to': trades_multidx_table.group_by(index_names)
                     .aggregate([('Date', 'max')]).column('Date_max'),
    })

    return summary_table
