I am trying to mask a DataFrame and then apply a unique operation on one column. A simplified version of the code I am using is reported below:
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd

data = np.random.randint(0, 100, (1000, 2))
ddf = dd.from_pandas(pd.DataFrame(data, columns=['data', 'id']), npartitions=2)

# Split the ids by the mask and take the unique values of each group
mask = ddf['data'] > 0
unique_false = ddf[~mask]['id'].unique()
unique_true = ddf[mask]['id'].unique()

results = dask.compute([unique_true, unique_false])
This quick example works fine. My real data are composed of ~5000 columns, where one column is used to filter and one is used to get the unique ids. The data are stored in 200 parquet partitions; each partition weighs 9MB on disk, but when loaded into memory (ddf.get_partition(0).compute().info()) it weighs ~5GB. Given that I have around 400GB of RAM, I would assume that I can load around 80 partitions at once (maybe fewer, given the overhead of other operations). However, from the dashboard I can see that Dask tries to execute all tasks at once (the number of in-memory tasks is always the same, no matter how many workers I use).
I wrote this to test the time it takes to process a partition:
import time

start = time.time()
df = ddf.get_partition(0).compute()  # load one partition into pandas
mask = df['data'] > 0
unique_true = df[mask]['id'].unique()
unique_false = df[~mask]['id'].unique()
print(time.time() - start)
It takes around 60s and requires around 7GB of RAM. If I started a ProcessPool and ran only 50 partitions at a time, the whole job would take 4-5 minutes.
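For context, this is the kind of process-pool baseline I have in mind (a sketch only; the parquet path and helper name are illustrative, and 50 workers at ~7GB each should stay under my 400GB of RAM):

import glob
from concurrent.futures import ProcessPoolExecutor

import pandas as pd

def uniques_for_partition(path):
    # Load only the two columns needed; the path layout is illustrative
    df = pd.read_parquet(path, columns=['data', 'id'])
    mask = df['data'] > 0
    return set(df.loc[mask, 'id'].unique()), set(df.loc[~mask, 'id'].unique())

if __name__ == '__main__':
    paths = sorted(glob.glob('data.parquet/part.*.parquet'))  # 200 partition files
    unique_true, unique_false = set(), set()
    with ProcessPoolExecutor(max_workers=50) as pool:  # 50 partitions at a time
        for true_ids, false_ids in pool.map(uniques_for_partition, paths):
            unique_true |= true_ids
            unique_false |= false_ids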
I know that at its core Dask does exactly what I did with the single partition, so my question is: why does Dask try to execute all tasks in parallel instead of a limited number at a time? Is there a way to limit task execution? And is this the real problem here, or am I missing something?
I found several questions here about limiting task execution. They all point to https://distributed.dask.org/en/latest/resources.html. However, I believe I should not have to force this behavior and should instead let Dask do its best. I should also mention that Dask is able to run the code when I set 5 single-threaded workers with 80GB of RAM each (but it takes much longer than the process-pool approach I mentioned above).
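For completeness, this is roughly how I set up the configuration that does complete (a sketch; Client(...) with these arguments starts a LocalCluster with the given worker settings):

from dask.distributed import Client

# 5 single-threaded workers with 80GB each -- the only setup that finished
# for me, although much slower than the process-pool estimate above
client = Client(n_workers=5, threads_per_worker=1, memory_limit='80GB')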
I am on Python 3.6.10 and Dask 2.17.2.