I'm new to Dask distributed. I'm running a simple test to learn it and ran into a very strange situation. Here is my code:
import numpy as np
import pandas as pd
import dask.dataframe as dd
data_vol = 2000
index = pd.date_range("2021-09-01", periods=data_vol, freq="1h")
df = pd.DataFrame({"a": np.arange(data_vol), "b": ["abcaddbe"] * data_vol, 'time': index})
ddf = dd.from_pandas(df, npartitions=10)
df2 = pd.DataFrame({"c": np.arange(data_vol), "d": ["xyzopq"] * data_vol, 'time': reversed(index)})
ddf2 = dd.from_pandas(df2, npartitions=10)
ddf['timestamp'] = ddf.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))
ddf2['timestamp'] = ddf2.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))
def merge_onindex(ddf, ddf2):
    ret = ddf.merge(ddf2)
    ret["add"] = ret.a + ret.c + 1
    return ret
from dask.distributed import Client
import dask
dask.config.set({"dataframe.shuffle.method": "tasks"})
client = Client("tcp://172.17.0.2:8786")
ddf_st = client.scatter(ddf.set_index('timestamp'), broadcast=True)
ddf2_st = client.scatter(ddf2.set_index("timestamp"), broadcast=True)
dd_merge_res = client.submit(merge_onindex, ddf_st, ddf2_st)
## Future: merge_onindex status: finished, type: dask.dataframe.core.DataFrame, key: merge_onindex-da1eb54a93de0c19af3093b76230b9f6
dd_merge_res.result().to_csv("/jupyter/merge_single.csv", single_file=True)
Then I ran wc -l merge_single.csv: the file contained only a few hundred lines, and the line count varied on each run.
Here are some head lines:
,a,b,time,c,d,add
0,19,abcaddbe,2021-09-01 19:00:00,1980,xyzopq,2000
1,22,abcaddbe,2021-09-01 22:00:00,1977,xyzopq,2000
2,35,abcaddbe,2021-09-02 11:00:00,1964,xyzopq,2000
3,37,abcaddbe,2021-09-02 13:00:00,1962,xyzopq,2000
4,50,abcaddbe,2021-09-03 02:00:00,1949,xyzopq,2000
5,58,abcaddbe,2021-09-03 10:00:00,1941,xyzopq,2000
6,78,abcaddbe,2021-09-04 06:00:00,1921,xyzopq,2000
7,84,abcaddbe,2021-09-04 12:00:00,1915,xyzopq,2000
8,112,abcaddbe,2021-09-05 16:00:00,1887,xyzopq,2000
The lines that are present are correct, but many other lines are missing!
Thanks for any help!
My environment:
docker base image: python:3.8
dask: 2023.5.0
two Docker containers as workers and one as the scheduler; each has 3 CPUs.
The issue is most likely caused by calling scatter on entire Dask DataFrames and then submitting the merge as a single task. scatter is intended for concrete data (such as pandas DataFrames or NumPy arrays), not for lazy Dask collections: scattering a lazy DataFrame and then merging it inside a submitted task triggers a shuffle from within a worker, which can silently drop or duplicate partitions, so the resulting row count varies from run to run. To fix this, perform the merge directly on the Dask DataFrames and let the scheduler handle the distributed shuffle itself.