I have a pandas data frame and want to apply a costly operation to each group, so I want to parallelize this task using dask. The initial data frame should be broadcast to all workers. However, the computation just fails with:
```
<Future: error, key: iterated_costly_function-4aff5e66b6af1c073dc2cfd0d2dbb6f3>
<Future: error, key: iterated_costly_function-74d26e42c758a8cc177047d7a0f49ff4>
```
Here is the code:
```python
import pandas as pd
from IPython.display import display  # built into Jupyter; imported here so it also runs as a script

df = pd.DataFrame({'foo':[1,2,3,4,5,6], 'bar':['a', 'a', 'b', 'b', 'a', 'b']})
display(df)

unique_values = df.bar.unique()
print(unique_values)

for v in unique_values:
    subset_df = df[df.bar == v]
    display(subset_df)
```
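For reference, the same split into one subset per value of `bar` can also be written with pandas' own `groupby`, the usual idiom for "one operation per group". This is a minimal sketch that uses `print` instead of the notebook-only `display`; note that `groupby` sorts the keys by default, whereas `unique()` keeps the order of first appearance (for this data both give `a`, then `b`).

```python
import pandas as pd

df = pd.DataFrame({'foo': [1, 2, 3, 4, 5, 6],
                   'bar': ['a', 'a', 'b', 'b', 'a', 'b']})

# groupby yields (key, sub-frame) pairs, one per distinct value of 'bar',
# so no separate boolean mask has to be built for each value
subsets = {v: subset_df for v, subset_df in df.groupby('bar')}

for v, subset_df in subsets.items():
    print(v)
    print(subset_df)
```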
Now the same loop with dask:
```python
import pandas as pd
from tqdm import tqdm
tqdm.pandas()
from time import sleep
from dask.distributed import Client, progress
from dask.distributed import wait, as_completed
from dask.distributed import Variable
from dask import delayed

# https://stackoverflow.com/questions/49406987/how-do-we-choose-nthreads-and-nprocs-per-worker-in-dask-distributed
client = Client()  # threads_per_worker=8, n_workers=2)
client

remote_df = client.scatter(df, broadcast=True)
global_var = Variable(name="remote_data")
global_var.set(remote_df)

def iterated_costly_function(v):
    df = global_var.get()
    subset_df = df[df.bar == v]
    #subset_df = apply_some_costly_function(subset_df, x=1, y=2, z=3)
    # not implemented here for sake of simplicity
    sleep(3)
    return subset_df  #.values # make it return something

futures = client.map(iterated_costly_function, unique_values)
wait(futures)
for f in tqdm(futures):
    print(f)
```
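Printing a future, as the loop in the question does, only shows its status and key, not what went wrong on the worker. The sketch below uses a deliberately failing function (`fails`, hypothetical, standing in for the question's task) and `Client(processes=False)` (an assumption, to keep everything in one process) to show how `Future.status` and `Future.exception()` expose the worker-side error; calling `.result()` instead would re-raise it locally.

```python
from dask.distributed import Client, wait


def fails():
    # deliberately raise so the future ends up in the "error" state
    return 1 / 0


with Client(processes=False) as client:
    future = client.submit(fails)
    wait([future])              # returns once the task has finished or failed
    status = future.status      # "error" for a failed task
    exc = future.exception()    # the exception object raised on the worker
    print(status, repr(exc))
```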
What is wrong in the way I try to access the broadcast variable?
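I would write your function so that it retrieves the `Variable` inside the task using its name, rather than passing it in the closure (you could have passed the name string as an argument). Also, because the variable was set to the future returned by `client.scatter`, `.get()` hands back that future rather than the data frame, so you need to call `.result()` on it to get its value, i.e. `df = Variable("remote_data").get().result()` in place of `df = global_var.get()`.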
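A minimal, self-contained sketch of the fix described in the answer. Assumptions not in the original: `Client(processes=False)` so the example runs in a single process, `sleep(0.1)` standing in for the costly work, `client.gather` collecting the results instead of printing futures, and a hypothetical `var_name` parameter illustrating the "pass the name string as an argument" option.

```python
from time import sleep

import pandas as pd
from dask.distributed import Client, Variable


def iterated_costly_function(v, var_name="remote_data"):
    # Look the Variable up by name inside the task instead of capturing it in
    # the closure; .get() returns the stored Future, .result() its value.
    df = Variable(var_name).get().result()
    subset_df = df[df.bar == v]
    sleep(0.1)  # stand-in for the costly per-group work
    return subset_df


df = pd.DataFrame({"foo": [1, 2, 3, 4, 5, 6],
                   "bar": ["a", "a", "b", "b", "a", "b"]})

# processes=False keeps the cluster in this process (assumption for the sketch)
with Client(processes=False) as client:
    remote_df = client.scatter(df, broadcast=True)
    global_var = Variable(name="remote_data")
    global_var.set(remote_df)
    # var_name is forwarded to iterated_costly_function as a keyword argument
    futures = client.map(iterated_costly_function, df.bar.unique(),
                         var_name="remote_data")
    results = client.gather(futures)

for r in results:
    print(r)
```

Passing the name (a plain string) rather than the `Variable` object keeps the task arguments trivially serializable, and each task resolves the shared data frame on whichever worker it runs.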