I am using Dask for a complicated operation. First I do a reduction which produces a moderately sized df (a few MBs), which I then need to pass to each worker to calculate the final result. My code looks a bit like this:
intermediate_result = ddf.reduction().compute()
final_result = ddf.reduction(
    chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)
However, I am getting a warning message that looks like this:
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s)
I have tried doing this:
intermediate_result = client.scatter(intermediate_result, broadcast=True)
But this isn't working, as the function now sees it as a Future object and not the datatype it is supposed to be. I can't seem to find any documentation on how to use scatter with reductions. Does anyone know how to do this? Or should I just ignore the warning message and pass the moderately sized df as I am?
Actually, the best solution is probably not to scatter your materialised result, but to avoid computing it in the first place. You can simply remove the .compute(), which will mean all the calculation gets done in one stage, with the results automatically moved to where you need them.

Alternatively, if you want to have a clear boundary between the stages, you can use

intermediate_result = ddf.reduction().persist()
which will kick off the reduction and store it on workers without pulling it to the client. You can choose to wait on this to finish before the next step or not.
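Put together, the second variant might look roughly like this. This is only a sketch reusing the schematic ddf, function and reduction calls from the question, and it assumes (as suggested above) that the persisted result can be passed through chunk_kwargs just like the concrete value was; wait comes from dask.distributed and is only needed if you want to block between the two stages:

from dask.distributed import wait

# start the first reduction and keep the result on the workers,
# instead of pulling it back to the client with .compute()
intermediate_result = ddf.reduction().persist()

# optional: block until the intermediate result has finished computing
wait(intermediate_result)

# pass the persisted result on just as before; dask moves the data
# to whichever workers need it
final_result = ddf.reduction(
    chunk=function, chunk_kwargs={"intermediate_result": intermediate_result}
)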