I have a large NumPy array on my local machine that I want to parallelize with Dask.array on a cluster:
import numpy as np
x = np.random.random((1000, 1000, 1000))
However, when I use dask.array I find that my scheduler starts taking up a lot of RAM. Why is this? Shouldn't this data go to the workers?
import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))
from dask.distributed import Client
client = Client(...)
x = x.persist()
Whenever you persist or compute a Dask collection, that data goes to the scheduler and from there to the workers. If you want to bypass storing data on the scheduler then you will have to learn how to move data yourself with scatter. You have a few options:
Don't load data on your client machine
The best approach is to include loading data as part of your computation rather than do it locally.
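For the random array in the question a before/after sketch might look like the following (da.random.random is dask's equivalent of np.random.random; for real data you would instead use a dask-aware loader or dask.delayed around your own reading function):

Before:

import numpy as np
import dask.array as da
x = np.random.random((1000, 1000, 1000))      # created on the client machine
x = da.from_array(x, chunks=(100, 100, 100))  # still a local copy

After:

import dask.array as da
x = da.random.random((1000, 1000, 1000), chunks=(100, 100, 100))  # created chunk-by-chunk on the workers
x = x.persist()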
More information about creating dask arrays is here: http://dask.pydata.org/en/latest/array-creation.html
Scatter then chunk
You can scatter your NumPy array directly to a worker:
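A minimal sketch, reusing the Client(...) placeholder from the question (the chunk sizes are just examples):

import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client(...)                       # connect to your cluster as in the question
x = np.random.random((1000, 1000, 1000))

future = client.scatter(x)                 # send the whole array to one worker
x = da.from_delayed(future, shape=x.shape, dtype=x.dtype)  # wrap it as a one-chunk dask array
x = x.rechunk((100, 100, 100))             # split it up on the cluster
x = x.persist()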
This moves your data directly to a worker and chunks it from there. This is nice because it bypasses the scheduler. However, you are now at risk of data loss if your workers start to fail; this only matters if you are on a massively parallel system.
This is also somewhat inefficient because all of your data is on one worker rather than spread out. You might call client.rebalance, or read on.
Chunk then scatter
You can chunk your data locally with a local scheduler, then scatter out to the cluster.
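A sketch of that idea; the graph-manipulation step at the end is the delicate part and may differ across dask versions, so treat this as an outline rather than a recipe:

import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client(...)                              # as in the question

x_np = np.random.random((1000, 1000, 1000))
x = da.from_array(x_np, chunks=(100, 100, 100))
x = x.persist(scheduler='threads')                # chunk locally, bypassing the cluster

futures = client.scatter(dict(x.dask))            # ship each chunk straight to the workers
x = da.Array(futures, x.name, x.chunks, x.dtype)  # rebuild the dask array around the futures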
Stay local
Alternatively you can continue to use dask locally, either with the threaded scheduler, or with the distributed scheduler using only the local process.
This will stop needless copying of data between your local process, the scheduler, and the workers. They are now all in your same local process.
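For example, a sketch with the same array as above (Client(processes=False) starts the scheduler and workers inside your local process):

import numpy as np
import dask.array as da

x_np = np.random.random((1000, 1000, 1000))
x = da.from_array(x_np, chunks=(100, 100, 100))

result = x.mean().compute(scheduler='threads')    # plain threaded scheduler, everything in-process

# or, if you want the distributed features without copying data around:
from dask.distributed import Client
client = Client(processes=False)                  # scheduler and workers live in this process
x = x.persist()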
See also: How to efficiently submit tasks with large arguments in Dask distributed? for a task-based version of this answer