I'm running some simple tests with Dask distributed and Datashader, but I'm running into some problems that I haven't been able to solve, nor have I been able to understand why they happen.

The data I’m working with consists of 1.7 billion rows with 97 columns each, split across 64 Parquet files. My test code is the following, in which I simply plot two columns of the data in a scatter plot, following the example code at the bottom of https://datashader.org/user_guide/Performance.html :

import dask.dataframe as dd
import datashader as ds
from datashader import transfer_functions as tf

def plot(file_path):
    dask_df = dd.read_parquet(file_path, engine='pyarrow')
    cvs = ds.Canvas(plot_width=600, plot_height=300)
    agg = cvs.points(dask_df, 'x', 'y')  # aggregate points onto a 600x300 grid
    img = tf.shade(agg, cmap=['lightblue', 'darkblue'])
    return img

# one plot task per parquet file, run on the workers
futures = [dask_client.submit(plot, f) for f in files_paths]
result = [f.result() for f in futures]  # array with one plot per file

The problems are the following:

First, my workers load far too much data into memory. For example, I've run the previous code with just one worker and one file. Even though the file is 11 GB, the Dask dashboard shows around 50 GB loaded into memory. The only solution I have found is to change the following line, explicitly selecting a small subset of the columns:

def plot(file_path):
    dask_df = dd.read_parquet(file_path, columns=['x','y',...], engine='pyarrow')  # read only the columns the plot needs
    …

Although this works (and makes sense, since I'm only using two columns for the plot), it's still unclear to me why the workers use that much memory.

The second problem is that, even though I have configured ~/.config/dask/distributed.yaml so that workers spill to disk at 70% memory usage, my workers keep crashing because they run out of memory:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
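
For reference, these are the relevant keys in ~/.config/dask/distributed.yaml. The 0.70 spill threshold is the one I set; the other fractions are, as far as I understand, the library defaults:

distributed:
  worker:
    memory:
      target: 0.60     # start spilling managed data to disk
      spill: 0.70      # spill based on total process memory
      pause: 0.80      # pause accepting new tasks
      terminate: 0.95  # the nanny restarts the worker (the warning above)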

Finally, when I plot all the points, reading only 5 columns with columns=['x','y','a','b','c'], I'm getting unreasonably slow times. Despite the files being split across 8 disks to speed up I/O, and despite working with 8 cores (8 workers), it takes 5 minutes to plot the 1.7 billion points.
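
For completeness, the pattern at the bottom of the performance guide I linked reads everything into a single dask dataframe and persists it across the cluster, rather than submitting one plot task per file; I mention it in case the difference matters. A rough sketch of that approach applied to my data (assuming the distributed client from above is active):

import dask.dataframe as dd
import datashader as ds
from datashader import transfer_functions as tf

# Read only the two needed columns from all 64 files and keep them in
# distributed RAM, so the aggregation doesn't re-read from disk.
dask_df = dd.read_parquet(files_paths, columns=['x', 'y'], engine='pyarrow')
dask_df = dask_df.persist()

cvs = ds.Canvas(plot_width=600, plot_height=300)
agg = cvs.points(dask_df, 'x', 'y')
img = tf.shade(agg, cmap=['lightblue', 'darkblue'])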

I'm using dask 2.18.0, distributed 2.19.0, datashader 0.10.0, and Python 3.7.7.

I've been struggling with this for a whole week, so any advice would be highly appreciated. Please feel free to ask for any other information that may be missing.

Answer:

> Although this works (and makes sense, since I'm only using two columns for the plot), it's still unclear to me why the workers use that much memory.

Parquet is a relatively efficient format. For example, your data may be compressed on disk but is uncompressed once loaded into Pandas, or the Pandas string dtype might be causing some bloat (Pandas stores strings as Python objects, which are large).
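
A quick way to check is to compare the on-disk size with Pandas' deep memory usage for a single file (assuming one file fits in RAM on some machine; the path here is hypothetical):

import os
import pandas as pd

path = 'part.0.parquet'  # hypothetical: one of the 64 files
on_disk = os.path.getsize(path)

df = pd.read_parquet(path, engine='pyarrow')
in_memory = df.memory_usage(deep=True).sum()  # deep=True counts Python string objects

print(f'on disk:   {on_disk / 1e9:.1f} GB')
print(f'in memory: {in_memory / 1e9:.1f} GB')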

> The second problem is that, even though I have configured ~/.config/dask/distributed.yaml so that workers spill to disk at 70% memory usage, my workers keep crashing because they run out of memory:

I'm not sure what to tell you on this one. Dask can't stop Python functions from running out of RAM: spilling can only move data that Dask itself manages between tasks, not memory allocated inside a running task (such as Datashader's aggregation). I would check in with the datashader folks, but I would expect their code to be pretty tight.
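
What you can control is the per-worker memory budget the nanny enforces; the 95% in your warning is the terminate fraction applied to that limit. A minimal sketch, assuming a local cluster:

from dask.distributed import Client, LocalCluster

# Hypothetical sizing: 8 single-threaded workers, each capped at 16 GB.
# The nanny's "95% memory budget" is 0.95 * memory_limit, per worker.
cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit='16GB')
client = Client(cluster)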

> Finally, when I plot all the points, reading only 5 columns with columns=['x','y','a','b','c'], I'm getting unreasonably slow times. Despite the files being split across 8 disks to speed up I/O, and despite working with 8 cores (8 workers), it takes 5 minutes to plot the 1.7 billion points.

It's hard to diagnose performance issues over Stack Overflow. I recommend following the guidance here: https://docs.dask.org/en/latest/understanding-performance.html
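
If you want something concrete to attach, distributed can write an HTML profile of the whole run (I believe performance_report is available in your 2.19.0); a minimal sketch wrapping your existing calls:

from dask.distributed import performance_report

# Records the task stream, worker utilization, and bandwidth for
# everything computed inside the block.
with performance_report(filename='datashader-report.html'):
    futures = [dask_client.submit(plot, f) for f in files_paths]
    result = [f.result() for f in futures]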