I'm wondering how I can return a Dask DataFrame from map_partitions,
instead of a pd.DataFrame, in order to avoid memory issues.
Input DataFrame
id | name | pet_id
---------------------
1 Charlie pet_1
2 Max pet_2
3 Buddy pet_3
4 Oscar pet_4
Expected output from map_partitions
pet_id | name | date | is_healthy
------------------------------------------
pet_1 Charlie 11-20-2018 False
pet_1 Charlie 02-17-2020 True
pet_1 Charlie 04-30-2020 True
pet_2 Max 10-17-2020 True
pet_3 Buddy 01-20-2020 True
pet_3 Buddy 12-12-2020 False
pet_4 Oscar 08-24-2019 True
I already wrote the following function, and it works if I return a pd.DataFrame. But if I return a dask.dataframe, an *** AssertionError
is raised:
def get_pets_appointments(df):
    dask_ddf = None
    for k, pet_id in df["pet_id"].iteritems():
        _resp = pets.get_pet_appointments(pet_id)  # http POST call
        tmp_df = pd.DataFrame(_resp)
        if dask_ddf is None:
            # First iteration, initialize Dask dataframe
            dask_ddf = dd.from_pandas(tmp_df, npartitions=1)
            continue
        # Work with Dask dataframe in order to avoid memory issues
        dask_ddf = dd.concat([dask_ddf, tmp_df])
    # this line works fine
    # return dask_ddf.compute()
    # this is raising AssertionError
    return dask_ddf
And I'm invoking the function as follows:
pets_app_df = pets_df.map_partitions(get_pets_appointments)
Short answer: no (sorry)
The purpose of map_partitions is to act on each of the constituent pandas dataframes of a dask dataframe. The expectation is that you will be making a new dataframe with the same number of partitions as the original. It sounds like you want to split each partition into many partitions; you could do this by calling .repartition beforehand.

However, I am surprised by two things:
- both of the dataframes you show here fit in memory, so how would turning them into a dask dataframe help you?
- repeatedly concatenating (actually, appending) isn't a great model; it builds up the graph one row at a time.