Why does dask.dataframe compute() raise an IndexError in specific cases? How do I find the cause of an async error?


When using the current version of dask ('0.7.5', github: [a1]) I was able to perform partitioned calculations on data too large for memory by means of the dask.dataframe API. But for a large DataFrame that was stored as a record array in bcolz ('0.12.1', github: [a2]) I got an IndexError when doing this:

import numpy as np
import dask.dataframe as dd
import bcolz

ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))

# some calculations: combine pairs of real columns into complex-valued series
z1 = (df_dd[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)

df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)

# actual computation
df_dd_out.compute()

The error was (abbreviated traceback output):

# ...      
    File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
        raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds

Actually, the error only occurred when the dd.concat step was involved. Something like

out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()

was working.

But the error also appeared in some cases when parts of the data were read into memory, at least for more than one partition (npartitions > 1) and specific data sizes.

import pandas as pd

ctable_mem_b = ctable[:int(1E7)]  # larger in-memory copy as a record array
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
                             npartitions=10)
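
The output frame for this in-memory copy, df_dd_out_mem_b, which is referenced further below, is presumably built in the same way as df_dd_out above; here is a sketch of that reconstruction (an assumption on my part, the exact code is in the test file referenced below):

# same calculation as above, but on the in-memory copy; this reconstruction
# of df_dd_out_mem_b is an assumption based on the first snippet
z1_b = (df_dd_mem_b[['c0', 'c1']] * np.r_[1, 1j]).sum(axis=1)
z2_b = (df_dd_mem_b[['c2', 'c3']] * np.r_[1, 1j]).sum(axis=1)
df_dd_out_mem_b = dd.concat([z1_b.to_frame('z1'), z2_b.to_frame('z2')], axis=1)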

See the full testing code in _test_dask_error.py and the full output with tracebacks in _test_out.txt.
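
Since those files are not reproduced here, a rough sketch of how a comparable test.bcolz ctable could be generated (the column names c0..c3 come from the snippets above; size, dtypes and content of the real data are assumptions):

import numpy as np
import pandas as pd
import bcolz

# hypothetical stand-in data with four float columns c0..c3; the real
# test data was much larger (at least 1E7 rows)
n = int(2E6)
df = pd.DataFrame(np.random.randn(n, 4), columns=['c0', 'c1', 'c2', 'c3'])
bcolz.ctable.fromdataframe(df, rootdir='test.bcolz')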

Actually, at that point I stopped my investigation, because I had no clue how to debug this error in async.py down to the root cause. I will certainly report this as a bug (unless there is a hint that it is a user/usage error). But: how do I do the debugging to find the root cause?

[a1]: https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5

[a2]: https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a


There are 2 best solutions below


Taken from the FAQ of the dask documentation:

Q: How do I debug my program when using dask?

If you want to dive down with a Python debugger a common cause of frustration is the asynchronous schedulers which, because they run your code on different workers, are unable to provide access to the Python debugger. Fortunately you can change to a synchronous scheduler like dask.get or dask.async.get_sync by providing a get= keyword to the compute method:

my_array.compute(get=dask.async.get_sync)

Both dask.async.get_sync and dask.get will provide traceback traversals. dask.async.get_sync uses the same machinery of the async schedulers but with only one worker. dask.get is dead-simple but does not cache data and so can be slow for some workloads.
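
Applied to the example from the question, a sketch of how this could be combined with pdb's post-mortem mode to land in the failing frame (df_dd_out is the frame from the question; the post-mortem part is my own addition, not from the FAQ):

import sys
import pdb
import dask.async  # note: 'async' became a reserved word in Python 3.7; this works on the Python 3.5 used here

try:
    # synchronous, single-worker scheduler: the exception is raised in this process
    df_dd_out.compute(get=dask.async.get_sync)
except IndexError:
    # open the debugger at the frame that raised, e.g. dataframe/core.py:_loc
    pdb.post_mortem(sys.exc_info()[2])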

Comment

I'm curious to see what the issue is. If the cause is not immediately obvious after using the method above then I recommend raising an issue on the dask issue tracker.


After using

df_dd_mem_b.compute(get=dask.async.get_sync)

as suggested above, it became clear that the error

#...
    File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 1637, in _loc
        result = df.loc[start:stop]

happened because _loc is not given exact index boundaries and the stop label is out of bounds for the partition. The divisions are

df_dd_out_mem_b.divisions

Out: (0, 1000000, 2000000, 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 9999999)

and in the interrupted task a call like

df.loc[1000000:2000000]

was made, although the last index label of that partition is 1999999.

The problem is that the pandas.DataFrame.loc documentation states: "Allowed inputs are: [...] A slice object with labels 'a':'f' (note that contrary to usual python slices, both the start and the stop are included!)" (taken from the documentation of the stable version, 0.17.1). Apparently for small sizes no out-of-bounds error is raised when the stop label does not exist, but for large sizes (i > ~1E6) I got an IndexError with this test:

df = pd.DataFrame({0: range(i)}).loc[0:i]
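
A sketch of the same probe wrapped in a loop over several sizes, to locate the threshold (the ~1E6 boundary is the observation from above and may well differ between pandas versions and platforms):

import pandas as pd

# probe at which size .loc with an out-of-bounds stop label starts to raise
for i in (10, 1000, int(1E5), int(1E6), int(2E6)):
    df = pd.DataFrame({0: range(i)})
    try:
        df.loc[0:i]  # stop label i does not exist, the last label is i - 1
        print(i, 'no error')
    except IndexError:
        print(i, 'IndexError: index out of bounds')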

With pd.DataFrame.iloc this inconsistent behaviour does not seem to be an issue, according to the documentation: ".iloc will raise IndexError if a requested indexer is out-of-bounds, except slice indexers which allow out-of-bounds indexing.", and indeed short tests showed no such out-of-bounds error here:

df = pd.DataFrame({0: range(i)}).iloc[0:i]

This is most likely not a proper fix for the given dask problem, because _loc is written more generically; but in the end _loc is only used for specific calls which essentially amount to

result = df.loc[slice(*df.index[[0, -1]])]
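
As a toy illustration of that call (my own small frame, not dask internals): slicing from the first to the last index label selects the whole partition, because .loc includes the stop label, so using the actual index bounds avoids the out-of-bounds stop.

import pandas as pd

df = pd.DataFrame({'c0': [1.0, 2.0, 3.0]}, index=[10, 20, 30])
result = df.loc[slice(*df.index[[0, -1]])]  # equivalent to df.loc[10:30]
assert result.equals(df)  # whole frame returned, stop label 30 included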