Using the current version of dask ('0.7.5', github: [a1]), I am able to perform partitioned calculations on data too large for memory by means of the dask.dataframe API. But for a large DataFrame that was stored as a record in bcolz ('0.12.1', github: [a2]) I get an IndexError when doing this:
import numpy as np
import bcolz
import dask.dataframe as dd

ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))

# some calculations: pack column pairs into complex-valued series
z1 = (df_dd[['c0', 'c1']] * np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']] * np.r_[1, 1j]).sum(axis=1)
df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)

# actual computation
df_dd_out.compute()
The error was (abbreviated traceback output):
# ...
File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds
Actually, the error only occurs on the dd.concat step. Something like

out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()

works.
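What I did to narrow it down was to compute the pieces separately (a sketch; the divisions check is only my own guess at where concat might trip up, not something the traceback confirms):

# each intermediate computes fine on its own, so only the
# dd.concat step is failing
z1.compute()
z2.compute()

# guess: axis=1 concatenation has to line up partitions, so the
# divisions of the two series may be worth comparing
print(z1.divisions)
print(z2.divisions)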
But the error also shows up in some cases when parts of the data are read into memory, at least for more than one partition (npartitions > 1) and specific data sizes:

import pandas as pd

ctable_mem_b = ctable[:int(1E7)]  # larger in-memory copy
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
                             npartitions=10)
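Running the same calculation over this in-memory copy reproduces it (a sketch, assuming the same columns c0..c3 as above):

z1_b = (df_dd_mem_b[['c0', 'c1']] * np.r_[1, 1j]).sum(axis=1)
z2_b = (df_dd_mem_b[['c2', 'c3']] * np.r_[1, 1j]).sum(axis=1)
# fails with the same IndexError for npartitions > 1 at this data size
dd.concat([z1_b.to_frame('z1'), z2_b.to_frame('z2')], axis=1).compute()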
See the full testing code in _test_dask_error.py and the full output with tracebacks in _test_out.txt.
At that step I stopped my investigation, because I have no clue how to trace this error in async.py down to its root cause. I will certainly report this as a bug (unless there is a hint that it is a user/usage error). But: how do I do the debugging to find the root cause?
[a1]: https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5
[a2]: https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a
Taken from the FAQ of the dask documentation:
Q: How do I debug my program when using dask?
If you want to dive down with a Python debugger, a common cause of frustration is the asynchronous schedulers which, because they run your code on different workers, are unable to provide access to the Python debugger. Fortunately you can change to a synchronous scheduler like dask.get or dask.async.get_sync by providing a get= keyword to the compute method.

Both dask.async.get_sync and dask.get will provide traceback traversals. dask.async.get_sync uses the same machinery of the async schedulers but with only one worker. dask.get is dead-simple but does not cache data and so can be slow for some workloads.
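Applied to the code in your question, that would look something like this (a sketch; either scheduler should surface the real traceback):

import dask
from dask.async import get_sync

# run everything in a single thread so the original traceback is
# preserved and a debugger (pdb) can inspect the failing frame
df_dd_out.compute(get=get_sync)

# the dead-simple scheduler also works, just without caching
df_dd_out.compute(get=dask.get)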
Comment: I'm curious to see what the issue is. If the cause is not immediately obvious after using the method above, then I recommend raising an issue on the dask issue tracker.