Dask: how to scatter data when doing a reduction


I am using Dask for a complicated operation. First I do a reduction which produces a moderately sized DataFrame (a few MB), which I then need to pass to each worker to calculate the final result, so my code looks a bit like this:

intermediate_result = ddf.reduction().compute()

final_result = ddf.reduction(
    chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)

However, I am getting a warning message that looks like this:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)

I have tried doing this:

intermediate_result = client.scatter(intermediate_result, broadcast=True)

But this isn't working: the function now sees a Future object rather than the datatype it is supposed to be. I can't seem to find any documentation on how to use scatter with reductions. Does anyone know how to do this? Or should I just ignore the warning message and pass the moderately sized DataFrame as I am?
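For context on why the function receives a Future: only APIs that know about futures, such as client.submit, replace a Future argument with its underlying data on the worker before calling your function. A minimal sketch (assuming an in-process cluster purely for illustration):

```python
import pandas as pd
from dask.distributed import Client

client = Client(processes=False)  # in-process cluster, just for the sketch

# Note: scattering a DataFrame returns one Future; scattering a plain
# list would instead scatter each element and return a list of futures.
df = pd.DataFrame({"x": range(100_000)})
big_future = client.scatter(df)

# client.submit dereferences Future arguments on the worker,
# so len() here receives the real DataFrame, not the Future:
n = client.submit(len, big_future).result()

client.close()
```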

1 answer

Actually, the best solution probably is not to scatter your materialised result, but to avoid computing it in the first place. You can simply remove the .compute(), so that all the calculation is done in one stage, with the results automatically moved to where you need them.

Alternatively, if you want to have a clear boundary between the stages, you can use

intermediate_result = ddf.reduction().persist()

which will kick off the reduction and store the result on the workers without pulling it to the client. You can choose whether or not to wait for this to finish before starting the next step.