Why is there so much data loss after dask.dataframe to_csv?


I'm new to Dask distributed. I'm running a simple test to learn it and ran into a very strange situation. Here is my code:

import numpy as np
import pandas as pd

import dask.dataframe as dd

data_vol = 2000
index = pd.date_range("2021-09-01", periods=data_vol, freq="1h")
df = pd.DataFrame({"a": np.arange(data_vol), "b": ["abcaddbe"] * data_vol, 'time': index})
ddf = dd.from_pandas(df, npartitions=10)

df2 = pd.DataFrame({"c": np.arange(data_vol), "d": ["xyzopq"] * data_vol, 'time': reversed(index)})
ddf2 = dd.from_pandas(df2, npartitions=10)

ddf['timestamp'] = ddf.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))
ddf2['timestamp'] = ddf2.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))

def merge_onindex(ddf, ddf2):
    ret = ddf.merge(ddf2)
    ret["add"] = ret.a + ret.c + 1
    return ret


from dask.distributed import Client
import dask

dask.config.set({"dataframe.shuffle.method": "tasks"})
client = Client("tcp://172.17.0.2:8786")

ddf_st = client.scatter(ddf.set_index('timestamp'), broadcast=True)
ddf2_st = client.scatter(ddf2.set_index("timestamp"), broadcast=True)

dd_merge_res = client.submit(merge_onindex, ddf_st, ddf2_st)
## Future: merge_onindex status: finished, type: dask.dataframe.core.DataFrame, key: merge_onindex-da1eb54a93de0c19af3093b76230b9f6

dd_merge_res.result().to_csv("/jupyter/merge_single.csv", single_file=True)

Then I ran wc -l merge_single.csv: there were only a few hundred lines, and the line count varied every time I ran the job.

Here are the first few lines:

,a,b,time,c,d,add
0,19,abcaddbe,2021-09-01 19:00:00,1980,xyzopq,2000
1,22,abcaddbe,2021-09-01 22:00:00,1977,xyzopq,2000
2,35,abcaddbe,2021-09-02 11:00:00,1964,xyzopq,2000
3,37,abcaddbe,2021-09-02 13:00:00,1962,xyzopq,2000
4,50,abcaddbe,2021-09-03 02:00:00,1949,xyzopq,2000
5,58,abcaddbe,2021-09-03 10:00:00,1941,xyzopq,2000
6,78,abcaddbe,2021-09-04 06:00:00,1921,xyzopq,2000
7,84,abcaddbe,2021-09-04 12:00:00,1915,xyzopq,2000
8,112,abcaddbe,2021-09-05 16:00:00,1887,xyzopq,2000

The lines that are present are correct, but many other lines are missing!

Thanks for any help!

My environment:

docker base image: python:3.8
dask: 2023.5.0
Two Docker containers as workers and one as the master; each has 3 CPUs.

There are 2 answers below.


The issue you are encountering is most likely caused by scattering the entire Dask DataFrame and then applying the merge inside a submitted task. That combination can interfere with the shuffle, so some data may be lost or duplicated. To avoid the problem, perform the merge directly on the Dask DataFrames without scatter; Dask will handle the distributed computation internally.

import numpy as np
import pandas as pd

import dask.dataframe as dd

data_vol = 2000
index = pd.date_range("2021-09-01", periods=data_vol, freq="1h")
df = pd.DataFrame({"a": np.arange(data_vol), "b": ["abcaddbe"] * data_vol, 'time': index})
ddf = dd.from_pandas(df, npartitions=10)

df2 = pd.DataFrame({"c": np.arange(data_vol), "d": ["xyzopq"] * data_vol, 'time': reversed(index)})
ddf2 = dd.from_pandas(df2, npartitions=10)

ddf['timestamp'] = ddf.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))
ddf2['timestamp'] = ddf2.time.apply(lambda x: int(x.timestamp()), meta=('time', 'int64'))

def merge_onindex(ddf, ddf2):
    ret = ddf.merge(ddf2)
    ret["add"] = ret.a + ret.c + 1
    return ret

ddf = ddf.set_index('timestamp')
ddf2 = ddf2.set_index("timestamp")

# Apply the merge function to each pair of aligned partitions
dd_merge_res = ddf.map_partitions(merge_onindex, ddf2)

from dask.distributed import Client

client = Client("tcp://172.17.0.2:8786")

dd_merge_res.to_csv("/jupyter/merge_single_custom.csv", single_file=True)
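
The code above goes through map_partitions; the "merge directly on the Dask DataFrames" idea from the first paragraph can also be expressed as a plain function call on the lazy frames. This is only a minimal sketch, reusing the ddf, ddf2 (already indexed by timestamp), merge_onindex, and client defined above; the output path is illustrative:

# A sketch of the direct variant: call the merge function on the lazy
# Dask DataFrames and let the scheduler run every partition itself.
direct_res = merge_onindex(ddf, ddf2)
direct_res.to_csv("/jupyter/merge_single_direct.csv", single_file=True)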

After some thought, I found one solution:

# Submit the merge once for every pair of partitions
f = [client.submit(merge_onindex, d1, d2)
     for d1, d2 in zip(ddf_st.result().partitions, ddf2_st.result().partitions)]

# Write each merged partition to its own CSV file
[r.result().to_csv(f"/jupyter/merge_partitions/{i}.csv", single_file=True)
 for i, r in enumerate(f)]

No doubt the dask.dataframe is composed of many partitions, but client.submit ended up processing only some of them, not all; you have to include every partition explicitly.
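
For what it's worth, here is a quick way to check whether the per-partition files add up, like the wc -l check in the question. This is just an illustrative snippet, assuming the output directory used above:

import glob
import pandas as pd

# Count the data rows across all per-partition CSVs and compare the total
# with the 2000 rows that went into each input frame.
total = sum(
    len(pd.read_csv(path))
    for path in sorted(glob.glob("/jupyter/merge_partitions/*.csv"))
)
print(total)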

I think there must be nicer and more elegant ways to handle this problem.

Please share one if you have it!