I want to submit functions with Dask that have large (gigabyte-scale) arguments. What is the best way to do this? I want to run this function many times with different (small) parameters.
Example (bad)
This uses the concurrent.futures interface. We could use the dask.delayed interface just as easily.
import numpy as np
from dask.distributed import Client

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass

c = Client()
futures = [c.submit(f, x, param) for param in params]
But this is slower than I would expect or results in memory errors.
OK, so what's wrong here is that each task contains the numpy array x, which is large. For each of the 100 tasks that we submit we need to serialize x, send it up to the scheduler, send it over to the worker, and so on. Instead, we'll send the array up to the cluster once:
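A minimal sketch of that step, using Client.scatter with the c and x from the example above (the variable name future is chosen to match the text below):

future = c.scatter(x)  # ships x to the cluster once and returns a Future pointing to it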
Now future is a token that points to an array x that lives on the cluster. We can submit tasks that refer to this remote future instead of the numpy array on our local client. This is now much faster, and it lets Dask control data movement more effectively.
Scatter data to all workers
If you expect that the array x will eventually need to move to all workers, you may want to broadcast it at the start.
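For instance, a sketch using the broadcast keyword of Client.scatter:

future = c.scatter(x, broadcast=True)  # replicate x to every worker up front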
Use Dask Delayed
Futures work fine with dask.delayed as well. There is no performance benefit here, but some people prefer this interface:
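A possible sketch, passing the scattered future into the delayed calls and computing them through the same client c (the name lazy_values is just illustrative):

import dask

lazy_values = [dask.delayed(f)(future, param) for param in params]  # build lazy tasks
futures = c.compute(lazy_values)  # one future per lazy value, computed on the cluster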