Dask - return a dask.dataframe from a map_partitions call


I'm wondering how I can return a Dask DataFrame (instead of a pd.DataFrame) from a map_partitions call, in order to avoid memory issues.

Input Dataframe

id  | name   | pet_id
---------------------
1    Charlie  pet_1
2    Max      pet_2
3    Buddy    pet_3
4    Oscar    pet_4

expected output from map_partitions

pet_id | name    |    date    | is_healthy
------------------------------------------
pet_1    Charlie   11-20-2018   False
pet_1    Charlie   02-17-2020   True
pet_1    Charlie   04-30-2020   True
pet_2    Max       10-17-2020   True
pet_3    Buddy     01-20-2020   True
pet_3    Buddy     12-12-2020   False
pet_4    Oscar     08-24-2019   True

I already wrote the following function, and it works if I return a pd.DataFrame. But if I return a dask.dataframe, an AssertionError is raised.

import pandas as pd
import dask.dataframe as dd

def get_pets_appointments(df):
    dask_ddf = None
    for _, pet_id in df["pet_id"].items():
        _resp = pets.get_pet_appointments(pet_id)  # HTTP POST call
        tmp_df = pd.DataFrame(_resp)
        if dask_ddf is None:
            # First iteration: initialize the Dask dataframe
            dask_ddf = dd.from_pandas(tmp_df, npartitions=1)
            continue
        # Append to the Dask dataframe in order to avoid memory issues
        dask_ddf = dd.concat([dask_ddf, tmp_df])
    # this line works fine:
    # return dask_ddf.compute()

    # this raises an AssertionError:
    return dask_ddf

And I'm invoking the function as follows:

pets_app_df = pets_df.map_partitions(get_pets_appointments)

1 Answer


Short answer: no (sorry)

The purpose of map_partitions is to act on each of the constituent pandas dataframes of a Dask dataframe. The expectation is that you will produce a new dataframe with the same number of partitions as the original. It sounds like you want to split each partition into many partitions; you could do this by calling .repartition beforehand.

However, I am surprised by this:

dask_ddf = dd.from_pandas(tmp_df, npartitions=1)
...
dask_ddf = dd.concat([dask_ddf, tmp_df])

Both of the dataframes you pass here are already in memory, so how would wrapping them in a dask dataframe help you? Repeatedly concatenating (actually, appending) also isn't a great pattern.
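The usual alternative to appending in a loop is to collect the pieces in a list and concatenate once at the end, which avoids copying the growing frame on every iteration. A tiny sketch, with dummy records standing in for the HTTP responses:

```python
import pandas as pd

# Dummy records standing in for the per-pet HTTP responses
records = [{"pet_id": f"pet_{i}", "is_healthy": i % 2 == 0} for i in range(4)]

# Collect one small frame per response...
frames = [pd.DataFrame([rec]) for rec in records]

# ...then concatenate exactly once, instead of inside the loop
result = pd.concat(frames, ignore_index=True)
```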