Using `dask` to fill `boost_histogram`s stored in a class in parallel


I have a dask + boost_histogram question. My code is structured as follows:

I have a class defined in some script:

import dask
import boost_histogram as bh

class MyHist:
    def __init__(self, ...):
        self.bh = None

    def make_hist(self, ...):
        axis = bh.axis.Regular(...)
        self.bh = bh.Histogram(axis)

    @dask.delayed
    def fill_hist(self, data):
        self.bh.fill(data)

In another script I want to fill multiple histograms in parallel with dask. The data are awkward arrays that I read from input files; for that I do something like:

from dask.distributed import Client

cl = Client()
histos = [MyHist(...), MyHist(another...)]
fill_results = []
for i, file in enumerate(files):
    data = dask.delayed(open_file(file))
    for myhist in histos:
        if i == 0:
            myhist.make_hist()
        fill_results.append(myhist.fill_hist(data))
dask.compute(*fill_results)

If I then try to call

for j, h in enumerate(histos):
        print(h.bh) 

I get empty histograms. However, if I print the boost histogram inside the fill_hist function, the histograms appear to be filled.

Does the dask computation create a deep copy (or something similar) of the MyHist object to perform the computation, and hence fill the bh associated with that copy? Or am I doing something wrong here?
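
To illustrate what I mean, here is a minimal toy sketch (Counter is just a stand-in for MyHist, not my actual code):

import dask
from dask.distributed import Client

class Counter:
    def __init__(self):
        self.n = 0

    def bump(self):
        # mutates whichever copy of the object this method runs on
        self.n += 1
        return self.n

cl = Client()
c = Counter()
task = dask.delayed(c.bump)()  # the bound method (and c with it) gets pickled and shipped to a worker
print(dask.compute(task))      # (1,) -- the worker's copy was incremented
print(c.n)                     # 0  -- the local object is untouched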

=====================================================================
Update:

I see a similar or worse wall time when using dask to read and fill than when using sequential code. This is the case whether I use my code or the suggested answer. As an example that doesn't use an intermediate class, I've written the following:

from copy import copy

import awkward as ak
import boost_histogram as bh
import dask

files = get_input_file_paths('myprocess')

@dask.delayed
def make_a_var(jet_pt):
    jets_pt = copy(jet_pt)
    jets_pt = ak.mask(jets_pt, ak.count(jets_pt, axis=1) >= 1)
    return jets_pt[:, 0]*1e-3

@dask.delayed
def make_and_fill(data, axes):
    h = bh.Histogram(*axes, storage=bh.storage.Weight())
    h.fill(data)
    return h

batch_size = 4
results = []
for i in range(0, len(files), batch_size):
    batch = []
    for j, file in enumerate(files[i:i+batch_size]):
        data = dask.delayed(read_file(file))
        var = data['jet_pt']
        new_var = make_a_var(var)
        new_var = new_var.to_numpy()  # needed because bh breaks for masked ak arrays
        new_var = new_var.compressed()
        for k in range(10):
            axes = (bh.axis.Regular(25, 0, 250), )
            h = make_and_fill(new_var, axes)
            batch.append(h)
    results.append(batch)
dask.compute(*results)

With k in range(10), this code takes a similar wall time (~7 s) with dask as it does sequentially. With k in range(100), the parallel code takes 15 s and the sequential code 21 s, which is not as big an improvement as I would have expected.
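
One thing I'm unsure about: since Python evaluates read_file(file) before dask.delayed ever sees it, I believe data = dask.delayed(read_file(file)) actually performs the read eagerly in the driver loop, so the I/O itself would still be sequential. A version that defers the read would decorate the function instead (a sketch, with read_file as above):

@dask.delayed
def read_file(fname):
    ...  # same body as before, but now the read runs on a worker

data = read_file(file)  # builds a task; no I/O happens here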

=====================================================================
Answer:

I believe Jim's comment is correct about the source of the problem; I'll also offer a solution that I think may be helpful:

I think the way your class is defined makes it difficult to work correctly with dask; that is, you will probably have an easier time if your fill_hist method were a free function. Also, in your loop you are calling dask.delayed on an already-delayed method, which is likely not what you want:

fill_results.append(dask.delayed(myhist.fill_hist(data)))
#                                       ^^^^^^^^^
#                                 already delayed method
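
If you did want to keep the method-based version, here is a sketch of the two ways to delay only once (pick one, not both); the object-copy issue discussed above still applies either way:

# either rely on the @dask.delayed decorator alone:
fill_results.append(myhist.fill_hist(data))

# ...or drop the decorator and wrap the bound method at the call site:
fill_results.append(dask.delayed(myhist.fill_hist)(data))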

My suggestion would be to go with a free function:

import dask
import boost_histogram as bh

@dask.delayed
def fill_hist(data, axes, storage=None):
    storage = storage or bh.storage.Double()
    h = bh.Histogram(*axes, storage=storage)
    h.fill(data)
    return h

@dask.delayed
def open_file(fname):
    data = some_function_to_get_data(fname)
    return data

axes = (bh.axis.Regular(100, -10, 10),)  # tuple with a single axis
tasks = []
for f in files:
    data = open_file(f)
    hist = fill_hist(data=data, axes=axes)
    tasks.append(hist)

(results,) = dask.compute(tasks)  # dask.compute returns a tuple, so unpack the list of histograms
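
If you want one histogram per distribution rather than one per file, the per-file results can also be merged lazily, since boost-histogram objects with identical axes support addition (a sketch reusing the tasks list from above; merge_hists is a name introduced here, not a library function):

@dask.delayed
def merge_hists(hists):
    # sum the per-file histograms into a single one
    total = hists[0].copy()
    for h in hists[1:]:
        total += h
    return total

(total,) = dask.compute(merge_hists(tasks))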

This pattern is very similar to how dask-histogram works on its backend (and dask-histogram has support for dask-awkward!).