How to run a Dask Client when called from another script?


I have a pipeline that runs in Luigi. In one of its phases I perform a series of calculations on a DataFrame, and to speed this up I decided to use a local Dask cluster. When I run the code directly through Python or Jupyter, the cluster starts and everything works, but when it runs inside Luigi it gives the following message:

UserWarning: Failed to start diagnostic server on port 8787.

import pandas as pd
from dask.distributed import Client

def func(params):
  df = params.get('df')
  client = Client()  # starts a new local cluster on every call
  result = [client.submit(sample, row) for index, row in df.iterrows()]
  result = client.gather(result)
  new_df = pd.DataFrame(result)
  return new_df

df = func(params)
df.to_csv('...')

How can I solve this?

Best answer

This is untested code (I have no experience with Luigi).
How about the following code, as a separate module:

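import pandas as pd
from dask.distributed import Client

def func(params, client):
  df = params.get('df')
  # Submit one task per row; `sample` is the per-row function from the question.
  result = [client.submit(sample, row) for index, row in df.iterrows()]
  result = client.gather(result)
  new_df = pd.DataFrame(result)
  return new_df

if __name__ == "__main__":
    # Create the client only when the module is run as a script;
    # the context manager closes it when the block exits.
    with Client() as client:
        df_result = func(params, client)
        df_result.to_csv('...')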
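
The warning in the question typically appears when port 8787 is already taken, for example by a Client created earlier or by another process. As a minimal, untested sketch along the same lines as the answer, the helper below receives the client as an argument and creates the local cluster in exactly one place. The names `compute_rows` and `run_local` are hypothetical, `sample` is only a placeholder for the per-row function, and `dashboard_address=":0"` (a keyword that `Client` forwards to `LocalCluster`) asks Dask to pick a free dashboard port instead of 8787.

```python
def sample(row):
    # Placeholder for the per-row calculation (assumption, not the real function).
    return {"total": sum(row.values())}


def compute_rows(rows, client):
    """Submit one task per row and gather the results in submission order."""
    futures = [client.submit(sample, row) for row in rows]
    return client.gather(futures)


def run_local(rows):
    # Import lazily so that importing this module (e.g. from a Luigi task)
    # never starts a cluster as a side effect.
    from dask.distributed import Client

    # ":0" lets Dask choose a free dashboard port; None would disable the dashboard.
    with Client(dashboard_address=":0") as client:
        return compute_rows(rows, client)
```

From the calling code, something like `run_local(df.to_dict("records"))` could then be invoked once per task, and the results passed to `pd.DataFrame(...)` as in the original snippet. Keeping cluster creation inside `run_local` means the cluster is shut down when the `with` block exits, so repeated calls should not leave old dashboards holding the port.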