python cuDF groupby apply with ordered data


I have some ordered data with a hierarchy of events. Each column holds an event id that is unique only with respect to the event above it in the hierarchy, similar to how each day number is unique within a month and each month number is unique within a year. I want to make the lowest level unique within the highest level, like numbering every day in a year from 1 to 365. My use case is not specific to days, months, and years.

Before:

| ID | EVENT_1 | EVENT_2 | EVENT_3 |
| -- | ------- | ------- | ------- |
|  1 |       1 |       1 |       1 |
|  1 |       1 |       1 |       2 |
|  1 |       1 |       1 |       3 |
|  1 |       1 |       2 |       1 |
|  1 |       1 |       2 |       2 |
|  1 |       1 |       3 |       1 |
|  1 |       1 |       3 |       2 |
|  1 |       2 |       1 |       1 |
|  1 |       2 |       1 |       2 |

After:

| ID | EVENT_1 | EVENT_2 | EVENT_3 | EVENT_3A |
| -- | ------- | ------- | ------- | -------- |
|  1 |       1 |       1 |       1 |        1 |
|  1 |       1 |       1 |       2 |        2 |
|  1 |       1 |       1 |       3 |        3 |
|  1 |       1 |       2 |       1 |        4 |
|  1 |       1 |       2 |       2 |        5 |
|  1 |       1 |       3 |       1 |        6 |
|  1 |       1 |       3 |       2 |        7 |
|  1 |       2 |       1 |       1 |        1 |
|  1 |       2 |       1 |       2 |        2 |

The goal is to get a column where, for each ID, there is an EVENT_3A such that EVENT_3A is the order in which EVENT_3 happens with respect to EVENT_1 (as if there were no EVENT_2). Additionally, there are many IDs for which this must be calculated independently. Right now I am doing this on the CPU, but it takes a long time, so I would like to switch to doing it on a GPU.

My main idea is to do a groupby('ID').apply_grouped() or groupby('ID').agg(), but I do not know what to put in the apply_grouped() or agg() functions. I was doing this before with dask on the CPU, which was more intuitive because the grouped DataFrame was passed directly to the apply() function. It seems that in cuDF I have to pass incols, and I cannot figure out how to treat those as a DataFrame.
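For reference, this is roughly the pattern I had on the CPU (a simplified sketch; the function name and exact numbering logic here are illustrative, not my production code):

```python
import pandas as pd

df = pd.DataFrame({
    "ID":      [1, 1, 1, 1, 1, 1, 1, 1, 1],
    "EVENT_1": [1, 1, 1, 1, 1, 1, 1, 2, 2],
    "EVENT_2": [1, 1, 1, 2, 2, 3, 3, 1, 1],
    "EVENT_3": [1, 2, 3, 1, 2, 1, 2, 1, 2],
})

def number_events(group):
    # Within one ID, number the rows consecutively per EVENT_1,
    # ignoring EVENT_2; this produces the EVENT_3A column I want.
    group = group.copy()
    group["EVENT_3A"] = group.groupby("EVENT_1").cumcount() + 1
    return group

# With dask/pandas the whole grouped DataFrame is handed to apply(),
# which is the part I cannot see how to reproduce with cuDF's incols.
result = df.groupby("ID").apply(number_events)
```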

There are approximately 5,000 IDs, so ideally each grouped ID would be processed by its own GPU core, but I am not sure whether it can work like that, since I am new to GPU programming.

Any suggestions or solutions are helpful, thank you.


Best Answer

> The goal is to get a column where, for each ID, there is an EVENT_3A such that EVENT_3A is the order in which EVENT_3 happens with respect to EVENT_1 (as if there were no EVENT_2).

What you are describing is a groupby cumulative count operation with [ID, EVENT_1] as the keys. It's not yet implemented in cuDF, so you would want to use a user-defined function.
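For orientation, in pandas the operation would be a one-liner (this same comparison is run on larger data at the end of this answer):

```python
# pandas equivalent of the desired operation
df.groupby(["ID", "EVENT_1"]).cumcount() + 1
```

The cuDF UDF version of the same thing follows.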

Your setup:

```python
import cudf
from numba import cuda
import numpy as np

data = {
    "ID":      [1, 1, 1, 1, 1, 1, 1, 1, 1],
    "EVENT_1": [1, 1, 1, 1, 1, 1, 1, 2, 2],
    "EVENT_2": [1, 1, 1, 2, 2, 3, 3, 1, 1],
    "EVENT_3": [1, 2, 3, 1, 2, 1, 2, 1, 2],
}

gdf = cudf.DataFrame(data)
print(gdf)
```

```
   ID  EVENT_1  EVENT_2  EVENT_3
0   1        1        1        1
1   1        1        1        2
2   1        1        1        3
3   1        1        2        1
4   1        1        2        2
5   1        1        3        1
6   1        1        3        2
7   1        2        1        1
8   1        2        1        2
```

We can and should use apply_grouped here. I encourage you to look at the documentation to fully understand what's going on, but at a high level we use each row's position within its group as its count. We pass the EVENT_3 column, so we need to make sure the column name and the function argument match.

```python
def cumcount(EVENT_3, cumcount):
    # Threads stride across the group's rows; each row's position i
    # within the group becomes its cumulative count.
    for i in range(cuda.threadIdx.x, len(EVENT_3), cuda.blockDim.x):
        cumcount[i] = i + 1  # since your example counts start at 1 rather than 0


results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(
    cumcount,
    incols=["EVENT_3"],
    outcols=dict(cumcount=np.int32),
)
```

```python
print(results.sort_index())  # restore the original row order, for demonstration
```

```
   ID  EVENT_1  EVENT_2  EVENT_3  cumcount
0   1        1        1        1         1
1   1        1        1        2         2
2   1        1        1        3         3
3   1        1        2        1         4
4   1        1        2        2         5
5   1        1        3        1         6
6   1        1        3        2         7
7   1        2        1        1         1
8   1        2        1        2         2
```
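
If you want the column to be called EVENT_3A as in the question's desired output, renaming is all that's left (a small finishing touch, not part of the core solution):

```python
# rename the UDF output column to match the question's schema
results = results.sort_index().rename(columns={"cumcount": "EVENT_3A"})
```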

As a sanity check, you can verify that these results match pandas on larger data.

```python
import pandas as pd

n_ids = 5000
n_rows = 10000000

df = pd.DataFrame({
    "ID": np.random.choice(range(n_ids), n_rows),
    "EVENT_1": np.random.choice(range(500), n_rows),
    "EVENT_2": np.random.choice(range(500), n_rows),
    "EVENT_3": np.random.choice(range(n_ids), n_rows)
})
```

```python
gdf = cudf.from_pandas(df)
results = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(
    cumcount,
    incols=["EVENT_3"],
    outcols=dict(cumcount=np.int32),
)
results = results.sort_index()

pdf_res = df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1
print(pdf_res.astype("int32").equals(results["cumcount"].to_pandas()))
```

```
True
```

Please note that df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1 in pandas is likely quite fast if you have fewer than about 1 million rows and a reasonable number of groups, as groupby cumcount is fairly efficient. With that said, the cuDF UDF will be much faster at scale.
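
If you want to see where that crossover falls on your own hardware, a rough timing comparison like the one below works (a sketch only: numbers vary by GPU, and the first apply_grouped call includes JIT compilation, so warm up first):

```python
import time

# Warm-up call so the UDF's JIT compilation is not counted in the timing.
_ = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(
    cumcount, incols=["EVENT_3"], outcols=dict(cumcount=np.int32)
)

t0 = time.perf_counter()
_ = df.groupby(["ID", "EVENT_1"]).EVENT_3.cumcount() + 1
print(f"pandas:   {time.perf_counter() - t0:.3f}s")

t0 = time.perf_counter()
_ = gdf.groupby(["ID", "EVENT_1"]).apply_grouped(
    cumcount, incols=["EVENT_3"], outcols=dict(cumcount=np.int32)
)
print(f"cuDF UDF: {time.perf_counter() - t0:.3f}s")
```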