Adding a Column to Polars DataFrame with Efficient Computation for List Manipulation


I'm seeking assistance in efficiently adding a new column to a Polars DataFrame that contains lists. These lists have elements added and removed based on certain conditions.

Let me explain with an example.

I have a box that can hold at most 3 balls, each marked with an ID. Given one column containing the IDs and another indicating when balls are added to or removed from the box, I need a new column that lists the IDs of the balls currently in the box. When a ball is removed, it is always the most recently added one (LIFO).

Original DataFrame:

# Example dataset
data = {
    'id': [5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26],
    'ball in the box': [0, 0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 1, 2, 3, 2, 2, 1, 0, 0, 1, 2, 3]
}

┌─────┬───────────────────────────┐
│ id  ┆ delta balls in in the box │
│ --- ┆ ---                       │
│ i64 ┆ i64                       │
╞═════╪═══════════════════════════╡
│ 5   ┆ null                      │
│ 6   ┆ 0                         │
│ 7   ┆ 1                         │
│ 8   ┆ 1                         │
│ 9   ┆ 1                         │
│ 10  ┆ 0                         │
│ 11  ┆ 0                         │
│ 12  ┆ -1                        │
│ 13  ┆ -1                        │
│ 14  ┆ 0                         │
│ 15  ┆ -1                        │
│ 16  ┆ 1                         │
│ 17  ┆ 1                         │
│ 18  ┆ 1                         │
│ 19  ┆ -1                        │
│ 20  ┆ 0                         │
│ 21  ┆ -1                        │
│ 22  ┆ -1                        │
│ 23  ┆ 0                         │
│ 24  ┆ 1                         │
│ 25  ┆ 1                         │
│ 26  ┆ 1                         │
└─────┴───────────────────────────┘

Result DataFrame:

┌─────┬───────────────────────────┬──────────────────────────┐
│ id  ┆ delta balls in in the box ┆ balls id kept in the box │
│ --- ┆ ---                       ┆ ---                      │
│ i64 ┆ i64                       ┆ list[i64]                │
╞═════╪═══════════════════════════╪══════════════════════════╡
│ 5   ┆ null                      ┆ []                       │
│ 6   ┆ 0                         ┆ []                       │
│ 7   ┆ 1                         ┆ [7]                      │
│ 8   ┆ 1                         ┆ [7, 8]                   │
│ 9   ┆ 1                         ┆ [7, 8, 9]                │
│ 10  ┆ 0                         ┆ [7, 8, 9]                │
│ 11  ┆ 0                         ┆ [7, 8, 9]                │
│ 12  ┆ -1                        ┆ [7, 8]                   │
│ 13  ┆ -1                        ┆ [7]                      │
│ 14  ┆ 0                         ┆ [7]                      │
│ 15  ┆ -1                        ┆ []                       │
│ 16  ┆ 1                         ┆ [16]                     │
│ 17  ┆ 1                         ┆ [16, 17]                 │
│ 18  ┆ 1                         ┆ [16, 17, 18]             │
│ 19  ┆ -1                        ┆ [16, 17]                 │
│ 20  ┆ 0                         ┆ [16, 17]                 │
│ 21  ┆ -1                        ┆ [16]                     │
│ 22  ┆ -1                        ┆ []                       │
│ 23  ┆ 0                         ┆ []                       │
│ 24  ┆ 1                         ┆ [24]                     │
│ 25  ┆ 1                         ┆ [24, 25]                 │
│ 26  ┆ 1                         ┆ [24, 25, 26]             │
└─────┴───────────────────────────┴──────────────────────────┘

I've managed to achieve this using lambda functions, .iter_rows(), and other methods. However, the execution speed is inadequate for more complex DataFrames. Below, I present the code I used:

import polars as pl

def entries_list_generator(df_pl: pl.DataFrame, value_to_add_column_name: str, delta_entries_column_name: str):
    """
    This function returns a series where each cell contains the list of open positions.
    On each new addition, the cell should add the value of the new ball.
    On each new removal, the last entry from the list should be removed using the LIFO method.
    If no action is taken, the cell should retain the value of the previous cell.
    """
    
    balls_list = []
    
    for i, row in enumerate(df_pl.iter_rows(named=True)):
        if i == 0:
            balls_list.append([])  # Initialize the list for the first row
            continue
        temp = balls_list[-1].copy()  # Copy the list from the previous row
        
        # Read from the function's own argument (df_pl), not the global df
        delta = df_pl[i][delta_entries_column_name].item() - df_pl[i - 1][delta_entries_column_name].item()
        
        if delta > 0: # Add value to the list with a positive change
            temp.extend([df_pl[i][value_to_add_column_name].item()])
            balls_list.append(temp.copy())
        elif delta < 0: # Remove last entry (LIFO) with a negative change
            temp.pop()
            balls_list.append(temp)
        elif delta == 0: # Repeat the last value with no change
            balls_list.append(temp.copy())
        else: # Raise an error for unexpected results
            raise ValueError("Check here")
    
    return pl.Series(balls_list)

# Configure to display DataFrame
pl.Config(tbl_rows=100, set_tbl_cols=30, set_tbl_width_chars=900)

# Example dataset
data = {
    'id': [5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26],
    'ball in the box': [0, 0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 1, 2, 3, 2, 2, 1, 0, 0, 1, 2, 3]
}

df = pl.DataFrame(data)
df = df.with_columns(
    (pl.col('ball in the box') - pl.col('ball in the box').shift(1)).alias('delta balls in in the box')
)

# Column name for ball IDs
value_to_add_column_name = 'id'

# Column name for the number of balls in the box (the function computes the change itself)
delta_entries_column_name = 'ball in the box'

# Add the column for balls kept in the box
df = df.with_columns(
    entries_list_generator(df,
                           value_to_add_column_name=value_to_add_column_name,
                           delta_entries_column_name=delta_entries_column_name)
    .alias("balls id kept in the box")
)
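
For reference, the same rule can be written as a single pass over plain Python lists, which avoids indexing the DataFrame row by row. This is only a sketch for checking faster solutions against: the function name lifo_contents is made up here, and it assumes the box starts empty and that each step adds or removes at most one ball.

```python
def lifo_contents(ids, counts):
    """Return, for each row, the IDs currently in the box (LIFO removal)."""
    stack = []                          # IDs in the box, oldest first
    out = []
    prev = counts[0] if counts else 0   # assumption: no change on the first row
    for ball_id, count in zip(ids, counts):
        delta = count - prev
        if delta > 0:
            stack.append(ball_id)       # a ball was added: push its ID
        elif delta < 0:
            stack.pop()                 # a ball was removed: pop the newest ID
        out.append(list(stack))         # snapshot of the box for this row
        prev = count
    return out


ids = list(range(5, 27))
counts = [0, 0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 1, 2, 3, 2, 2, 1, 0, 0, 1, 2, 3]
result = lifo_contents(ids, counts)
print(result[4], result[7], result[10])   # [7, 8, 9] [7, 8] []
```

The output can be wrapped in pl.Series(result) to compare it with a vectorized solution.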


keraion On BEST ANSWER

Updated Answer


I've rewritten my original answer to improve performance and reduce the memory footprint.

  • Added lazy evaluation.
  • The box lists are now a dataframe that is joined with join_asof instead of being filtered.
  • lifo_level now returns a pl.Series, removing the need for the looped join.
  • list.set_difference is used to remove the null values from the concatenated list.

import polars as pl


def lifo_level(
    df_lifo: pl.LazyFrame,  # entries_list_generator passes a lazy frame
    level: int,
    value_to_add_column_name: str,
    delta_entries_column_name: str,
    delta_box_column_name: str,
):
    box_add_expr = (
        (pl.col(delta_entries_column_name) == level + 1)
        & (pl.col(delta_box_column_name) == 1)
    ).arg_true()
    box_drop_expr = (
        (
            (pl.col(delta_entries_column_name) == level)
            & (pl.col(delta_box_column_name) == -1)
        )
        .arg_true()
        .append(None)
    )

    df_boxes = df_lifo.select(
        box_add=box_add_expr,
        box_drop=box_drop_expr.implode().list.slice(0, box_add_expr.len()).explode(),
    ).set_sorted("box_add")

    df_lifo = df_lifo.join_asof(
        df_boxes, left_on="row_nr", right_on="box_add"
    ).with_columns(
        box_add=pl.when(
            pl.col("row_nr") < pl.coalesce("box_drop", pl.col("row_nr") + 1)
        ).then(pl.col("box_add"))
    )

    return (
        df_lifo.join(
            df_lifo.select("row_nr", value_to_add_column_name),  # use the parameter rather than a hard-coded "id"
            left_on="box_add",
            right_on="row_nr",
            how="left",
            suffix=f"_{level}",
        )
        .collect()
        .to_series(-1)
    )


def entries_list_generator(
    df_pl: pl.DataFrame,
    value_to_add_column_name: str,
    delta_entries_column_name: str,
    delta_box_column_name: str,
    levels: int,
):
    """
    This function returns a series where each cell contains the list of open positions.
    On each new addition, the cell should add the value of the new ball.
    On each new removal, the last entry from the list should be removed using the LIFO method.
    If no action is taken, the cell should retain the value of the previous cell.
    """

    df_pl = (
        df_pl.with_row_count()
        .with_columns(null_list=pl.concat_list(pl.lit(None).cast(pl.Int64)))
        .lazy()
    )

    df_pl = df_pl.select(
        pl.concat_list(
            [
                lifo_level(
                    df_pl,
                    n,
                    value_to_add_column_name,
                    delta_entries_column_name,
                    delta_box_column_name,
                )
                for n in range(levels)
            ]
        ).list.set_difference("null_list")
    ).collect()

    return df_pl.to_series(0)
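
The join_asof step in lifo_level pairs every row with the most recent "add" row at or before it. As a rough illustration of that backward match only, here is a plain-Python sketch that uses bisect instead of Polars; the helper name asof_backward is an assumption, and the row numbers are the level-0 adds from the question's example data.

```python
from bisect import bisect_right


def asof_backward(left_keys, right_keys):
    """For each left key, return the largest right key <= it, or None."""
    matches = []
    for key in left_keys:
        pos = bisect_right(right_keys, key)   # right_keys must be sorted
        matches.append(right_keys[pos - 1] if pos else None)
    return matches


# Rows where the first level gains a ball in the question's example.
box_add = [2, 11, 19]
row_nr = list(range(22))
matched = asof_backward(row_nr, box_add)
print(matched[:4])   # [None, None, 2, 2]
```

Note that row 10 still matches the add at row 2 here; in the answer, the when/then expression afterwards nulls out matches at or after the paired box_drop.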

Testing:

from random import randint
import polars as pl

# Column name for ball IDs
value_to_add_column_name = "id"

# Column names for the ball count and for its change
delta_entries_column_name = "ball_in_box"
delta_box_column_name = "delta_in_box"

# Test with 10 million rows
id = range(5, 10_000_005)
balls = [0]
for n in range(len(id) - 1):
    balls.append(max(0, min(3, balls[-1] + randint(-1, 1))))


df = pl.DataFrame(
    {
        "id": id,
        "ball_in_box": balls,
    }
).with_columns(
    pl.col(delta_entries_column_name).diff().alias(delta_box_column_name)
)

df = df.with_columns(
    entries_list_generator(
        df,
        value_to_add_column_name=value_to_add_column_name,
        delta_entries_column_name=delta_entries_column_name,
        delta_box_column_name=delta_box_column_name,
        levels=3,
    ).alias("balls id kept in the box")
)

Results:

shape: (10_000_000, 4)
┌──────────┬─────────────┬──────────────┬──────────────────────────────┐
│ id       ┆ ball_in_box ┆ delta_in_box ┆ balls id kept in the box     │
│ ---      ┆ ---         ┆ ---          ┆ ---                          │
│ i64      ┆ i64         ┆ i64          ┆ list[i64]                    │
╞══════════╪═════════════╪══════════════╪══════════════════════════════╡
│ 5        ┆ 0           ┆ null         ┆ []                           │
│ 6        ┆ 0           ┆ 0            ┆ []                           │
│ 7        ┆ 0           ┆ 0            ┆ []                           │
│ 8        ┆ 1           ┆ 1            ┆ [8]                          │
│ …        ┆ …           ┆ …            ┆ …                            │
│ 10000001 ┆ 3           ┆ 1            ┆ [9999982, 9999983, 10000001] │
│ 10000002 ┆ 2           ┆ -1           ┆ [9999982, 9999983]           │
│ 10000003 ┆ 2           ┆ 0            ┆ [9999982, 9999983]           │
│ 10000004 ┆ 3           ┆ 1            ┆ [9999982, 9999983, 10000004] │
└──────────┴─────────────┴──────────────┴──────────────────────────────┘
0:00:06.461064

Original Answer


With this approach, the lifo_level function builds out a set of ranges per level.

For example, level 1 will look like this:

┌────────┬─────┬─────────┬──────────┐
│ row_nr ┆ id  ┆ box_add ┆ box_drop │
│ ---    ┆ --- ┆ ---     ┆ ---      │
│ u32    ┆ i64 ┆ u32     ┆ u32      │
╞════════╪═════╪═════════╪══════════╡
│ 2      ┆ 7   ┆ 2       ┆ 10       │
│ 3      ┆ 8   ┆ 2       ┆ 10       │
│ 4      ┆ 9   ┆ 2       ┆ 10       │
│ 5      ┆ 10  ┆ 2       ┆ 10       │
│ 6      ┆ 11  ┆ 2       ┆ 10       │
│ 7      ┆ 12  ┆ 2       ┆ 10       │
│ 8      ┆ 13  ┆ 2       ┆ 10       │
│ 9      ┆ 14  ┆ 2       ┆ 10       │
│ 11     ┆ 16  ┆ 11      ┆ 17       │
│ 12     ┆ 17  ┆ 11      ┆ 17       │
│ 13     ┆ 18  ┆ 11      ┆ 17       │
│ 14     ┆ 19  ┆ 11      ┆ 17       │
│ 15     ┆ 20  ┆ 11      ┆ 17       │
│ 16     ┆ 21  ┆ 11      ┆ 17       │
│ 19     ┆ 24  ┆ 19      ┆ null     │
│ 20     ┆ 25  ┆ 19      ┆ null     │
│ 21     ┆ 26  ┆ 19      ┆ null     │
└────────┴─────┴─────────┴──────────┘

This tells us that level 1 is populated with the id found at the matching box_add row. For example, row_nr 2 has id 7, so rows 2 up to (but not including) 10 have 7 in the first position. A null box_drop is treated as an unbounded end.
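
To make the add/drop pairing concrete, here is a plain-Python sketch of how such ranges could be derived from the ball counts. It is not the Polars code from this answer: the function name level_ranges is hypothetical, levels are numbered from 0 as in the code, and it assumes each step changes the count by at most one.

```python
def level_ranges(counts, level):
    """Pair each 'add' row of a level with the 'drop' row that ends it."""
    # Change in count per row; the first row has no previous row.
    deltas = [None] + [b - a for a, b in zip(counts, counts[1:])]
    adds = [i for i, (c, d) in enumerate(zip(counts, deltas))
            if c == level + 1 and d == 1]
    drops = [i for i, (c, d) in enumerate(zip(counts, deltas))
             if c == level and d == -1]
    drops.append(None)              # open-ended final range, if there is one
    return list(zip(adds, drops))   # zip trims drops to the number of adds


counts = [0, 0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 1, 2, 3, 2, 2, 1, 0, 0, 1, 2, 3]
print(level_ranges(counts, 0))   # [(2, 10), (11, 17), (19, None)]
```

The pairs for level 0 match the box_add and box_drop columns in the table above.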

Repeat the lifo_level lookup for each level (3 in this case) and join the results. Afterwards, use concat_list with a filtered aggregation that drops nulls to get the resulting series.

import polars as pl

def entries_list_generator(
    df_pl: pl.DataFrame,
    value_to_add_column_name: str,
    delta_entries_column_name: str,
    delta_box_column_name: str,
    levels: int,
):
    """
    This function returns a series where each cell contains the list of open positions.
    On each new addition, the cell should add the value of the new ball.
    On each new removal, the last entry from the list should be removed using the LIFO method.
    If no action is taken, the cell should retain the value of the previous cell.
    """

    def lifo_level(
        df_lifo: pl.DataFrame,
        level: int,
        value_to_add_column_name: str,
        delta_entries_column_name: str,
        delta_box_column_name: str,
    ):
        box_add_expr = (
            (
                (pl.col(delta_entries_column_name) == level + 1)
                & (pl.col(delta_box_column_name) == 1)
            )
            .arg_true()
            .implode()
        )
        box_drop_expr = (
            (
                (pl.col(delta_entries_column_name) == level)
                & (pl.col(delta_box_column_name) == -1)
            )
            .arg_true()
            .append(None) # The two box expressions need the same number of results.
            .implode()
        )

        return (
            df_lifo.with_columns(
                box_add=box_add_expr,
                box_drop=box_drop_expr.list.slice(0, box_add_expr.list.lengths()), # Slice drop to the same length as add
            )
            .explode("box_add", "box_drop")
            .filter(
                pl.col("row_nr").is_between(
                    "box_add",
                    pl.coalesce("box_drop", pl.col("row_nr") + 1), # coalesce to row_nr + 1 for unbounded end
                    closed="left",
                )
            )
            .join(
                df_lifo,
                left_on="box_add",
                right_on="row_nr",
                how="inner",
                suffix=f"_{level}",
            )
            .select("row_nr", f"{value_to_add_column_name}_{level}")
        )

    df_pl = df_pl.with_row_count()

    # loop over each level, joining the results.
    for level in range(levels):
        df_pl = df_pl.join(
            df_pl.pipe(
                lifo_level,
                level,
                value_to_add_column_name,
                delta_entries_column_name,
                delta_box_column_name,
            ),
            on="row_nr",
            how="left",
        )

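    # concatenate the levels together, dropping the null values.
    # maintain_order=True keeps the rows in their original order, which groupby does not otherwise guarantee.
    return pl.concat_list(
        df_pl.groupby("row_nr", maintain_order=True)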
        .agg(pl.col(rf"^{value_to_add_column_name}_\d+$").drop_nulls())
        .drop("row_nr")
    )

df = pl.DataFrame({
    'id': [5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26],
    'ball_in_box': [0, 0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 1, 2, 3, 2, 2, 1, 0, 0, 1, 2, 3]
}).with_columns(
    (
        pl.col("ball_in_box") - pl.col("ball_in_box").shift(1)
    ).alias("box_delta")
)

df = df.with_columns(
    entries_list_generator(
        df,
        value_to_add_column_name="id",
        delta_entries_column_name="ball_in_box",
        delta_box_column_name="box_delta",
        levels=3,
    ).alias("balls_id_kept_in_the_box")
)

print(df)

The results:

shape: (22, 4)
┌─────┬─────────────┬──────────────┬──────────────────────────┐
│ id  ┆ ball_in_box ┆ box_delta    ┆ balls_id_kept_in_the_box │
│ --- ┆ ---         ┆ ---          ┆ ---                      │
│ i64 ┆ i64         ┆ i64          ┆ list[i64]                │
╞═════╪═════════════╪══════════════╪══════════════════════════╡
│ 5   ┆ 0           ┆ null         ┆ []                       │
│ 6   ┆ 0           ┆ 0            ┆ []                       │
│ 7   ┆ 1           ┆ 1            ┆ [7]                      │
│ 8   ┆ 2           ┆ 1            ┆ [7, 8]                   │
│ 9   ┆ 3           ┆ 1            ┆ [7, 8, 9]                │
│ 10  ┆ 3           ┆ 0            ┆ [7, 8, 9]                │
│ 11  ┆ 3           ┆ 0            ┆ [7, 8, 9]                │
│ 12  ┆ 2           ┆ -1           ┆ [7, 8]                   │
│ 13  ┆ 1           ┆ -1           ┆ [7]                      │
│ 14  ┆ 1           ┆ 0            ┆ [7]                      │
│ 15  ┆ 0           ┆ -1           ┆ []                       │
│ 16  ┆ 1           ┆ 1            ┆ [16]                     │
│ 17  ┆ 2           ┆ 1            ┆ [16, 17]                 │
│ 18  ┆ 3           ┆ 1            ┆ [16, 17, 18]             │
│ 19  ┆ 2           ┆ -1           ┆ [16, 17]                 │
│ 20  ┆ 2           ┆ 0            ┆ [16, 17]                 │
│ 21  ┆ 1           ┆ -1           ┆ [16]                     │
│ 22  ┆ 0           ┆ -1           ┆ []                       │
│ 23  ┆ 0           ┆ 0            ┆ []                       │
│ 24  ┆ 1           ┆ 1            ┆ [24]                     │
│ 25  ┆ 2           ┆ 1            ┆ [24, 25]                 │
│ 26  ┆ 3           ┆ 1            ┆ [24, 25, 26]             │
└─────┴─────────────┴──────────────┴──────────────────────────┘

Note that this should work even if the box is never completely emptied, provided each step adds or removes at most one ball.

Wayoshi On

EDIT: Not a full answer, sadly.

Thanks to pl.int_ranges, a fairly recent addition to Polars, I thought this had a straightforward answer...

df = pl.from_dict(data).rename({'ball in the box': 'ball_count'})

# 'ball in the box' from the question's data is renamed to 'ball_count' above
df.with_columns(
    delta=pl.col('ball_count').diff(),
    ball_box_ids=pl.int_ranges(
        pl.col('id') - pl.col('ball_count') + 1, pl.col('id') + 1
    ),
)

shape: (22, 4)
┌─────┬────────────┬───────┬──────────────┐
│ id  ┆ ball_count ┆ delta ┆ ball_box_ids │
│ --- ┆ ---        ┆ ---   ┆ ---          │
│ i64 ┆ i64        ┆ i64   ┆ list[i64]    │
╞═════╪════════════╪═══════╪══════════════╡
│ 5   ┆ 0          ┆ null  ┆ []           │
│ 6   ┆ 0          ┆ 0     ┆ []           │
│ 7   ┆ 1          ┆ 1     ┆ [7]          │
│ 8   ┆ 2          ┆ 1     ┆ [7, 8]       │
│ 9   ┆ 3          ┆ 1     ┆ [7, 8, 9]    │
│ 10  ┆ 3          ┆ 0     ┆ [8, 9, 10]   │
│ 11  ┆ 3          ┆ 0     ┆ [9, 10, 11]  │
│ 12  ┆ 2          ┆ -1    ┆ [11, 12]     │
│ 13  ┆ 1          ┆ -1    ┆ [13]         │
│ 14  ┆ 1          ┆ 0     ┆ [14]         │
│ 15  ┆ 0          ┆ -1    ┆ []           │
│ 16  ┆ 1          ┆ 1     ┆ [16]         │
│ 17  ┆ 2          ┆ 1     ┆ [16, 17]     │
│ 18  ┆ 3          ┆ 1     ┆ [16, 17, 18] │
│ 19  ┆ 2          ┆ -1    ┆ [18, 19]     │
│ 20  ┆ 2          ┆ 0     ┆ [19, 20]     │
│ 21  ┆ 1          ┆ -1    ┆ [21]         │
│ 22  ┆ 0          ┆ -1    ┆ []           │
│ 23  ┆ 0          ┆ 0     ┆ []           │
│ 24  ┆ 1          ┆ 1     ┆ [24]         │
│ 25  ┆ 2          ┆ 1     ┆ [24, 25]     │
│ 26  ┆ 3          ┆ 1     ┆ [24, 25, 26] │
└─────┴────────────┴───────┴──────────────┘

... But this is the FIFO (queue) way! Arrgh! I am still posting this partial answer in hopes someone can pick up where I left off.
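
A small plain-Python sketch of the difference, assuming consecutive integer IDs as in the example (the helper names window_ids and stack_ids are made up for illustration). The int_ranges formula always yields the last ball_count IDs, while the LIFO rule keeps the oldest ones:

```python
def window_ids(ball_id, count):
    """IDs produced by the int_ranges formula: the last `count` IDs."""
    return list(range(ball_id - count + 1, ball_id + 1))


def stack_ids(ids, counts):
    """Expected LIFO contents per row, assuming steps of at most one ball."""
    stack, out, prev = [], [], counts[0]
    for ball_id, count in zip(ids, counts):
        if count > prev:
            stack.append(ball_id)   # push the new ball
        elif count < prev:
            stack.pop()             # remove the most recently added ball
        out.append(list(stack))
        prev = count
    return out


ids = list(range(5, 27))
counts = [0, 0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 1, 2, 3, 2, 2, 1, 0, 0, 1, 2, 3]
expected = stack_ids(ids, counts)
mismatched = [i for i, c, e in zip(ids, counts, expected) if window_ids(i, c) != e]
print(window_ids(10, 3), expected[5])   # [8, 9, 10] [7, 8, 9]
print(mismatched)                       # [10, 11, 12, 13, 14, 19, 20, 21]
```

The two agree only while the box is filling up from empty; they diverge as soon as the count stays flat or drops.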

I tried looking at cumulative_eval etc., but I don't think it works directly. In general, when a column depends on the previously calculated row value, I don't think there's any way to make that efficient.

If the box, once it starts to empty, is always fully emptied before any more balls are added, I found after some experimenting that this works: adjust the int_ranges bounds computed above by a somewhat complicated expression:

x = (
    pl.col('ball_count').diff()  # the 'delta' column from above, recomputed because it was never assigned back to df
    .map_dict({1: 0, 0: 1, -1: 2})
    .cumsum()
    .over((pl.col('ball_count') == 0).rle_id())
)
df.with_columns(
    ball_box_ids=pl.int_ranges(
        pl.col('id') - pl.col('ball_count') - x + 1, pl.col('id') - x + 1
    )
)

But that is only a subset of the standard problem (and it breaks on changes of 2 or more, too).
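
To see what the offset x does, here is a plain-Python emulation of the adjusted bounds. It is a sketch under stated assumptions, not the Polars expression itself: the name corrected_ranges is hypothetical, IDs are assumed to be consecutive integers, each step changes the count by at most one, and the first row (where the delta is null in Polars) is simply treated as an empty box.

```python
def corrected_ranges(ids, counts):
    """Emulate the adjusted int_ranges bounds row by row."""
    penalty = {1: 0, 0: 1, -1: 2}   # same mapping as in map_dict above
    out, x, prev = [], 0, None
    for ball_id, count in zip(ids, counts):
        # A new run starts whenever (count == 0) flips, mirroring rle_id().
        if prev is None or (count == 0) != (prev == 0):
            x = 0
        if prev is not None:
            x += penalty[count - prev]   # running sum within the current run
        out.append(list(range(ball_id - count - x + 1, ball_id - x + 1)))
        prev = count
    return out


ids = list(range(5, 27))
counts = [0, 0, 1, 2, 3, 3, 3, 2, 1, 1, 0, 1, 2, 3, 2, 2, 1, 0, 0, 1, 2, 3]
fixed = corrected_ranges(ids, counts)
print(fixed[5], fixed[7], fixed[14])   # [7, 8, 9] [7, 8] [16, 17]

# A box that is refilled before being emptied: the last row should be [2, 5].
partial = corrected_ranges([1, 2, 3, 4, 5], [0, 1, 2, 1, 2])
print(partial[-1])                     # [2, 3]
```

The first call reproduces the expected LIFO lists for the question's data, while the second shows the limitation mentioned above: once a ball is added before the box has been emptied, the shifted range no longer matches the stack.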