RAPIDS: How to use one dataframe in a UDF called with apply_rows of another dataframe?


For each row in dataframe A, I need to query dataframe B: select the rows of B whose column b1 (B.b1) falls inside the range defined by that row's A.a1 and A.a2, and assign the combined values to column A.a3.

In pandas that would be something like:

A.a3 = B[(B.b1 > A.a1) & (B.b1 < A.a2)]['b2'].values

I tried passing the second dataframe as a function parameter of the UDF, but got this error:

ValueError: Cannot determine Numba type of <class 'cudf.core.dataframe.DataFrame'>
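
Roughly, the attempt looked like this (illustrative names, using the toy frames defined below; the key point is the second dataframe being passed through kwargs):

import numpy as np
import cudf

def pick_nvtx_kernel(start, end, nvtx_idx, nvtx):
    # nvtx arrives as a cudf.DataFrame, which Numba cannot type
    for i, (s, e) in enumerate(zip(start, end)):
        nvtx_idx[i] = 0.0

gevents = cudf.from_pandas(toyevents)
gnvtx = cudf.from_pandas(toynvtx)

# raises: ValueError: Cannot determine Numba type of <class 'cudf.core.dataframe.DataFrame'>
gevents.apply_rows(pick_nvtx_kernel,
                   incols=['start', 'end'],
                   outcols={'nvtx_idx': np.float64},
                   kwargs={'nvtx': gnvtx})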

Below is a working Python sample using Pandas.

import pandas as pd

toyevents = pd.DataFrame.from_dict({'end': {0: 8.748356416,
         1: 8.752231441000001,
         2: 8.756627850000001,
         3: 8.760818359,
         4: 8.765967569,
         5: 8.77041589,
         6: 8.774226174,
         7: 8.776358813,
         8: 8.77866835,
         9: 8.780719302000001},
 'name_id': {0: 18452.0,
             1: 20586.0,
             2: 20491.0,
             3: 20610.0,
             4: 20589.0,
             5: 20589.0,
             6: 19165.0,
             7: 20589.0,
             8: 20586.0,
             9: 19064.0},
 'start': {0: 8.748299848,
           1: 8.752229263,
           2: 8.756596980000001,
           3: 8.760816603,
           4: 8.765957310000001,
           5: 8.770381615,
           6: 8.77414259,
           7: 8.776349745000001,
           8: 8.778666861000001,
           9: 8.780674982}})

toynvtx = pd.DataFrame.from_dict({'NvtxEvent.Text': {0: 'Iteration 32',
                    1: 'FWD pass',
                    2: 'Prediction and loss',
                    3: 'BWD pass',
                    4: 'Optimizer update'},
 'end': {0: 8.802574018000001,
         1: 8.771325765,
         2: 8.771688249,
         3: 8.792846429,
         4: 8.802333183},
 'start': {0: 8.744061385,
           1: 8.747272157000001,
           2: 8.771329333,
           3: 8.771691628000001,
           4: 8.792851876}})

# Search NVTX ranges encompassing [start,end] range.
def pickNVTX(r,nvtx):
    start = r['start']
    end = r['end']
    start_early = nvtx[nvtx['start'] <= start]
    end_later = start_early[start_early['end'] >= end]
    return ','.join(end_later['NvtxEvent.Text'])

# Method 1. Using apply()
toyevents.loc[:, 'nvtx'] = toyevents.apply(pickNVTX, nvtx=toynvtx, axis=1)

# Method 2. Using iterrows()
for i, row in toyevents.iterrows():
    toyevents.loc[i, 'nvtx'] = ','.join(
        toynvtx[(toynvtx.start <= row.start)
                & (toynvtx.end >= row.end)]['NvtxEvent.Text'].values)

1 Answer (accepted):

You would probably want to use an inequality (conditional) join for this kind of problem. This is not currently supported in pandas, cuDF, or BlazingSQL.

If your data isn’t enormous, you can do this with a combination of a cross join, boolean mask, and groupby collect_list. A UDF would probably also work if you provide the second dataframe's columns as arguments so you can index into them and loop (but this will get messy and inefficient).
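
For completeness, here is a rough sketch of that UDF-style route: a raw Numba CUDA kernel instead of apply_rows, with the second table's columns passed in as device arrays. The kernel, the nvtx_idx column, and the keep-only-one-match output are illustrative assumptions, not something cuDF provides for you.

import cudf
import cupy as cp
from numba import cuda

@cuda.jit
def match_nvtx(ev_start, ev_end, nv_start, nv_end, out_idx):
    i = cuda.grid(1)
    if i < ev_start.shape[0]:
        out_idx[i] = -1
        for j in range(nv_start.shape[0]):
            if nv_start[j] <= ev_start[i] and nv_end[j] >= ev_end[i]:
                # keeps only the last matching range; collecting every match
                # per row needs a different output layout (or the cross join below)
                out_idx[i] = j

gev = cudf.from_pandas(toyevents)
gnv = cudf.from_pandas(toynvtx)

out_idx = cp.full(len(gev), -1, dtype=cp.int64)
threads = 128
blocks = (len(gev) + threads - 1) // threads
match_nvtx[blocks, threads](
    gev['start'].values, gev['end'].values,   # .values gives CuPy device arrays
    gnv['start'].values, gnv['end'].values,
    out_idx)

# map the matched index back to the NVTX text
gev['nvtx_idx'] = out_idx
gnv = gnv.reset_index().rename(columns={'index': 'nvtx_idx'})
gev = gev.merge(gnv[['nvtx_idx', 'NvtxEvent.Text']], on='nvtx_idx', how='left')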

The output of your example is:

        end  name_id     start                   nvtx
0  8.748356  18452.0  8.748300  Iteration 32,FWD pass
1  8.752231  20586.0  8.752229  Iteration 32,FWD pass
2  8.756628  20491.0  8.756597  Iteration 32,FWD pass
3  8.760818  20610.0  8.760817  Iteration 32,FWD pass
4  8.765968  20589.0  8.765957  Iteration 32,FWD pass
5  8.770416  20589.0  8.770382  Iteration 32,FWD pass
6  8.774226  19165.0  8.774143  Iteration 32,BWD pass
7  8.776359  20589.0  8.776350  Iteration 32,BWD pass
8  8.778668  20586.0  8.778667  Iteration 32,BWD pass
9  8.780719  19064.0  8.780675  Iteration 32,BWD pass

The following code would provide the same output, with a List column rather than a string column.

import cudf

# put the example data on the GPU
toyevents = cudf.from_pandas(toyevents)
toynvtx = cudf.from_pandas(toynvtx)

# cross join
toyevents['key'] = 1
toynvtx['key'] = 1
merged = toyevents.merge(toynvtx, how="outer", on="key")
del merged["key"]

# filter
mask = (merged.start_y <= merged.start_x) & (merged.end_y >= merged.end_x)
del merged["start_y"], merged["end_y"]

# collect list
merged[mask].groupby(["end_x", "name_id", "start_x"])["NvtxEvent.Text"].agg(list)
end_x     name_id  start_x 
8.748356  18452.0  8.748300    [Iteration 32, FWD pass]
8.752231  20586.0  8.752229    [Iteration 32, FWD pass]
8.756628  20491.0  8.756597    [Iteration 32, FWD pass]
8.760818  20610.0  8.760817    [Iteration 32, FWD pass]
8.765968  20589.0  8.765957    [Iteration 32, FWD pass]
8.770416  20589.0  8.770382    [Iteration 32, FWD pass]
8.774226  19165.0  8.774143    [Iteration 32, BWD pass]
8.776359  20589.0  8.776350    [Iteration 32, BWD pass]
8.778668  20586.0  8.778667    [Iteration 32, BWD pass]
8.780719  19064.0  8.780675    [Iteration 32, BWD pass]
Name: NvtxEvent.Text, dtype: list
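
If you want the collected labels attached back onto toyevents as in your pandas version, something along these lines should work (the float key columns merge exactly because they come straight from toyevents; the final comma-joined string goes through pandas, where joining list elements is simplest):

# name the aggregation and move the group keys back into columns
nvtx_lists = (merged[mask]
              .groupby(["end_x", "name_id", "start_x"])["NvtxEvent.Text"]
              .agg(list)
              .reset_index()
              .rename(columns={"start_x": "start", "end_x": "end",
                               "NvtxEvent.Text": "nvtx"}))

# attach the list column to the events table
del toyevents["key"]
toyevents = toyevents.merge(nvtx_lists, on=["start", "end", "name_id"], how="left")

# optional: back to pandas with comma-joined strings, matching the output above
result = toyevents.to_pandas()
result["nvtx"] = result["nvtx"].str.join(",")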