Dask - how to efficiently execute the right number of tasks


I am trying to mask a dataframe on one column and then take the unique values of another column. A simplified version of the code I am using is shown below:

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd

data = np.random.randint(0,100,(1000,2))
ddf = dd.from_pandas(pd.DataFrame(data, columns = ['data','id']), npartitions = 2)

mask = ddf['data'] > 0
unique_false = ddf[~mask]['id'].unique()
unique_true = ddf[mask]['id'].unique()

results = dask.compute([unique_true, unique_false])

This quick example works fine. My real data consist of ~5000 columns, where one column is used to filter and another to get the unique ids. The data are stored in 200 parquet partitions; each partition weighs 9 MB on disk but takes ~5 GB once loaded into memory (checked with ddf.get_partition(0).compute().info()). Given that I have around 400 GB of RAM, I would assume I could hold around 80 partitions at once (probably fewer, given the overhead of the other operations). From the dashboard, however, I can see that Dask tries to execute all the tasks at once (the number of in-memory tasks stays the same no matter how many workers I use).
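
For completeness, one way to measure the in-memory size of a single partition (a minimal sketch assuming the same ddf as above, using pandas' memory_usage):

part = ddf.get_partition(0).compute()
# deep=True also counts object/string columns, not just the numeric buffers
print(part.memory_usage(deep=True).sum() / 1e9, 'GB')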

I wrote this to test the time it takes to process a partition:

import time

start = time.time()

df = ddf.get_partition(0).compute()

mask = df['data'] > 0
unique_true = df[mask]['id'].unique()
unique_false = df[~mask]['id'].unique()

print(time.time() - start)

It takes around 60 s and requires around 7 GB of RAM. If I started a process pool and ran only 50 partitions at a time, the whole job should take 4-5 minutes.
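
For clarity, this is roughly the process-pool approach I have in mind (a sketch only: the parquet file paths and column names are placeholders, not my real layout):

import glob
from concurrent.futures import ProcessPoolExecutor

import pandas as pd

def uniques_for_file(path):
    # each worker reads one parquet partition and reduces it independently
    df = pd.read_parquet(path, columns=['data', 'id'])
    mask = df['data'] > 0
    return df.loc[mask, 'id'].unique(), df.loc[~mask, 'id'].unique()

if __name__ == '__main__':
    files = sorted(glob.glob('data_parquet/part.*.parquet'))  # placeholder paths
    with ProcessPoolExecutor(max_workers=50) as pool:
        per_file = list(pool.map(uniques_for_file, files))
    # combine the per-partition results
    unique_true = pd.unique(pd.concat([pd.Series(t) for t, f in per_file]))
    unique_false = pd.unique(pd.concat([pd.Series(f) for t, f in per_file]))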

I know that, at its core, Dask does exactly what I did with the single partition above, so my question is: why does Dask try to execute all the tasks in parallel instead of a few at a time? Is there a way to limit task execution? And is that the real problem here, or am I missing something?

I found several questions here about limiting task execution; they all point to https://distributed.dask.org/en/latest/resources.html. However, I believe I should not have to force this behaviour and should instead let Dask do its best. I should also mention that Dask is able to run the code when I configure 5 single-threaded workers with 80 GB of RAM each (but it takes much longer than the process-pool approach mentioned above would).
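
For reference, this is roughly how that configuration is set up (a minimal sketch with dask.distributed; the values are just the ones mentioned above, not a recommendation):

from dask.distributed import Client, LocalCluster

# 5 single-threaded workers with an 80 GB memory limit each
cluster = LocalCluster(n_workers=5, threads_per_worker=1, memory_limit='80GB')
client = Client(cluster)

# then compute the same graph as in the first snippet
results = dask.compute([unique_true, unique_false])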

I am on Python 3.6.10 and Dask 2.17.2.
