User-defined types for groupby apply with p2p shuffling


It seems that p2p shuffling is not supported for a Dask dataframe that contains columns of user-defined types. The tasks-based shuffle does not have this limitation, though. Here’s an example:

import dask
import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd

class Foo:
    def __init__(self, x):
        self.x = x

def make_df():
    return pd.DataFrame([[1, Foo(1)], [2, Foo(2)], [3, Foo(3)]], columns=['A', 'B'])

cl = Client()
ddf = dd.from_delayed(dask.delayed(make_df)(), meta={'A': int, 'B': object})
cl.gather(cl.compute(ddf.groupby('A').apply(lambda df: len(df), meta=(None, int))))

Here ddf contains a column B holding Foo objects, and as a result I can’t run groupby().apply() on ddf with p2p shuffling.

I tried changing the meta for B from object to Foo, but it still doesn’t work. Is there currently a way to do this?
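For reference, pandas itself stores instances of an arbitrary class such as Foo in a column of dtype object, so object is the dtype that describes column B in meta. A quick check using only pandas:

```python
import pandas as pd


class Foo:
    def __init__(self, x):
        self.x = x


df = pd.DataFrame([[1, Foo(1)], [2, Foo(2)]], columns=["A", "B"])

# Arbitrary Python objects end up in an object-dtype column.
print(df.dtypes)
```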

Edit: it works fine if the computation is done locally, and it only fails on a cluster.
