I have a dask/boost_histogram question. My code is structured as follows:
I have a class defined in some script:
```python
import dask
import boost_histogram as bh

class MyHist:
    def __init__(....):
        self.bh = None

    def make_hist(...):
        axis = bh.axis.Regular(....)

    @dask.delayed
    def fill_hist(self, data):
        self.bh.fill(data)
```
and in another script I want to fill multiple histograms in parallel with dask. The data are awkward arrays that I read from input, and for that I do something like:
```python
from dask.distributed import Client

cl = Client()

histos = [MyHist(..), MyHist(another...)]
fill_results = []
for i, file in enumerate(files):
    data = dask.delayed(open_file(file))
    for myhist in histos:
        if i == 0:
            myhist.make_hist()
        fill_results.append(myhist.fill_hist(data))

dask.compute(*fill_results)
```
If I then try to call

```python
for j, h in enumerate(histos):
    print(h.bh)
```
I get empty histograms. However, if I print the boost histogram inside the `fill_hist` function, the histograms seem to be filled.
Does the dask computation create a deep copy (or something similar) of the `MyHist` object to perform the computation, and hence fill the `bh` associated with that copy? Or am I doing something wrong here?
=====================================================================

Update:
I see a similar or worse wall-time when using dask to read and fill than when using sequential code. This is the case whether I use my original code or the suggested answer. For an example that doesn't use an intermediate class, I've written the following code:
```python
import dask
import awkward as ak
import boost_histogram as bh
from copy import copy

files = get_input_file_paths('myprocess')

@dask.delayed
def make_a_var(jet_pt):
    jets_pt = copy(jet_pt)
    jets_pt = ak.mask(jets_pt, ak.count(jets_pt, axis=1) >= 1)
    return jets_pt[:, 0] * 1e-3

@dask.delayed
def make_and_fill(data, axes):
    h = bh.Histogram(*axes, storage=bh.storage.Weight())
    h.fill(data)
    return h

batch_size = 4
results = []
for i in range(0, len(files), batch_size):
    batch = []
    for j, file in enumerate(files[i:i + batch_size]):
        data = dask.delayed(read_file(file))
        var = data['jet_pt']
        new_var = make_a_var(var)
        new_var = new_var.to_numpy()  # needed because bh breaks for masked ak arrays
        new_var = new_var.compressed()
        for k in range(10):
            axes = (bh.axis.Regular(25, 0, 250),)
            h = make_and_fill(new_var, axes)
            batch.append(h)
    results.append(batch)

dask.compute(*results)
```
It takes a similar wall-time (~7 s) to run this code sequentially and with dask for `k in range(10)`. For `k in range(100)` the parallel code takes 15 s and the sequential code 21 s, which is not as big an improvement as I would have expected.
=====================================================================

Answer:

I believe Jim's comment is correct about the source of the problem; I'll also offer a solution that I think will help:
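To see why the local histograms stay empty: when the distributed scheduler ships a task to a worker, the object is serialized, so any mutation happens on a deserialized copy. The effect can be reproduced with a plain pickle round-trip; `Holder` is a hypothetical stand-in for `MyHist`:

```python
import pickle

class Holder:
    """Hypothetical stand-in for MyHist: holds mutable state."""
    def __init__(self):
        self.data = []

h = Holder()

# This round-trip mimics what dask.distributed does when it sends `h`
# to a worker as part of a task:
h_on_worker = pickle.loads(pickle.dumps(h))
h_on_worker.data.append(1)   # the "fill" mutates the worker's copy

print(h.data)            # []  -- the local object was never touched
print(h_on_worker.data)  # [1] -- only the copy was filled
```

This is why returning the filled histogram from the task (rather than mutating `self`) is the reliable pattern.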
I think the definition of your class makes it difficult to work correctly with dask; that is, you will probably have an easier time if your `fill_hist` method were actually a free function. Also, in your loop you are calling `dask.delayed` on an already-`delayed` method, which is likely not what you want to do. My suggestion would be to go with a free function.
This pattern is very similar to how `dask-histogram` works on its backend (and `dask-histogram` has support for `dask-awkward`!).