Prefect - Unable to Persist Dask Dataframe to Dask Client

I'm fairly new to Prefect and struggling with using Dask persistence within a Prefect flow. I'm not sure if this is an issue with my code or a limitation within Prefect.

I made the following trivial example that illustrates where I'm running into issues. My real use case is far more complex, but I'm hoping this example concisely captures the core of my problem. I want to persist a Dask DataFrame to the Dask client provided by the Prefect DaskTaskRunner, then pass that persisted dataframe to multiple downstream tasks.

"""
Example of persisting a Dask DataFrame with Prefect. DOES NOT work.
"""

import dask.dataframe as dd
import pandas as pd
from prefect import task, flow
from prefect_dask import get_dask_client, DaskTaskRunner
from dask.distributed import wait

@task
def load_ddf_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and attempts to persist it to the Dask client. This does not work.
    """

    with get_dask_client():
        data = [
            [1, 2, 3], [4, 5, 6], [7, 8, 9],
            [11, 12, 13], [14, 15, 16], [17, 18, 19],
            [21, 22, 23], [24, 25, 26], [27, 28, 29],
            [31, 32, 33], [34, 35, 36], [37, 38, 39],
            [41, 42, 43], [44, 45, 46], [47, 48, 49],
            [51, 52, 53], [54, 55, 56], [57, 58, 59],
            [61, 62, 63], [64, 65, 66], [67, 68, 69],
            [71, 72, 73], [74, 75, 76], [77, 78, 79],
            [81, 82, 83], [84, 85, 86], [87, 88, 89],
            [91, 92, 93], [94, 95, 96], [97, 98, 99],
        ]

        cols = ['A', 'B', 'C']

        df = pd.DataFrame(data=data, columns=cols)
        ddf = dd.from_pandas(df, npartitions=1)
        ddf = ddf.persist()
        wait(ddf)

    return ddf

@task
def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_ddf * 2

@task
def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_ddf * 3

@flow(task_runner=DaskTaskRunner())
def example_flow():
    """
    Example Prefect Flow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    ddf_A = load_ddf_A.submit()
    ddf_B = calc_ddf_B.submit(ddf_A)
    ddf_C = calc_ddf_C.submit(ddf_A)

    print('ddf_B is: \n')
    print(ddf_B.result().head(10))
    print('\nddf_C is: \n')
    print(ddf_C.result().head(10))

if __name__ == '__main__':
    example_flow()

This approach fails when trying to persist the dataframe. I receive the following error message:

14:22:22.878 | ERROR   | Task run 'load_ddf_A-0' - Crash detected! Execution was cancelled by the runtime environment.
2023-08-28 14:22:22,922 - distributed.worker - WARNING - Compute Failed
Key:       load_ddf_A-0-72b3fab73f2c45c8a3bdc15afffee986-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x000001AFC5275310>, 'task_run': TaskRun(id=UUID('72b3fab7-3f2c-45c8-a3bd-c15afffee986'), name='load_ddf_A-0', flow_run_id=UUID('613a493a-d2cc-4ae1-ad5a-a339f8588ffd'), task_key='__main__.load_ddf_A', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('38834eaa-1282-4121-8e8a-8a4b736d767f'), task_inputs={}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 8, 28, 21, 22, 17, 768737, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=13275), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {}, 'wait_for': None,
Exception: 'CancelledError()'

The following code implements the same logic, but with pure Dask. It works as expected.


"""
Example of persisting a Dask DataFrame without Prefect. DOES work.
"""

import dask.dataframe as dd
import pandas as pd
from dask.distributed import wait, LocalCluster, Client

def load_ddf_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and persists it to the Dask client. This does work.
    """

    data = [
        [1, 2, 3], [4, 5, 6], [7, 8, 9],
        [11, 12, 13], [14, 15, 16], [17, 18, 19],
        [21, 22, 23], [24, 25, 26], [27, 28, 29],
        [31, 32, 33], [34, 35, 36], [37, 38, 39],
        [41, 42, 43], [44, 45, 46], [47, 48, 49],
        [51, 52, 53], [54, 55, 56], [57, 58, 59],
        [61, 62, 63], [64, 65, 66], [67, 68, 69],
        [71, 72, 73], [74, 75, 76], [77, 78, 79],
        [81, 82, 83], [84, 85, 86], [87, 88, 89],
        [91, 92, 93], [94, 95, 96], [97, 98, 99],
    ]

    cols = ['A', 'B', 'C']

    df = pd.DataFrame(data=data, columns=cols)
    ddf = dd.from_pandas(df, npartitions=1)
    ddf = ddf.persist()
    wait(ddf)

    return ddf

def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_ddf * 2

def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_ddf * 3

def example_flow():
    """
    Example Dask workflow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    ddf_A = load_ddf_A()
    ddf_B = calc_ddf_B(ddf_A)
    ddf_C = calc_ddf_C(ddf_A)

    print('ddf_B is: \n')
    print(ddf_B.head(10))
    print('\nddf_C is: \n')
    print(ddf_C.head(10))

if __name__ == '__main__':
    cluster = LocalCluster()
    client = Client(cluster)
    example_flow()

Any suggestions on where I'm going wrong with Prefect would be much appreciated!

1 Answer

giob12:

You are submitting the load_ddf_A task to Dask, and inside that task you call Dask's .persist() on your dataframe. The task's return value is therefore a dataframe backed by unresolved Dask futures, and I'm not sure Prefect handles that correctly when it serializes task results and passes them to downstream tasks.
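
To see what that means in practice, here is a quick sketch outside Prefect (illustrative only, assuming a recent dask/distributed) showing that .persist() leaves the dataframe backed by futures tied to a specific client:

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, futures_of

if __name__ == '__main__':
    client = Client(processes=False)  # throwaway local client, for illustration only

    ddf = dd.from_pandas(pd.DataFrame({'A': range(6)}), npartitions=2)
    persisted = ddf.persist()

    # After persist(), the dataframe's task graph is replaced by Futures
    # pinned to this client's cluster; the result is only usable while
    # those futures stay alive.
    print(futures_of(persisted))

    client.close()
    # Once the client goes away (analogous to get_dask_client's context
    # exiting before the result is consumed elsewhere), the futures are
    # released, which is consistent with the CancelledError above.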

Indeed, if you call .compute() instead of .persist(), the flow completes correctly, but I understand that is not ideal.
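
For concreteness, here is a minimal sketch of that workaround applied to the example above (untested here; it assumes the same prefect_dask setup as the question). The idea is to materialize the result with .compute() so that a plain pandas DataFrame, rather than a future-backed Dask DataFrame, crosses the task boundary:

import dask.dataframe as dd
import pandas as pd
from prefect import task, flow
from prefect_dask import get_dask_client, DaskTaskRunner

@task
def load_df_A() -> pd.DataFrame:
    """Builds the sample data, then materializes it with .compute()."""
    df = pd.DataFrame(data=[[1, 2, 3], [4, 5, 6], [7, 8, 9]],
                      columns=['A', 'B', 'C'])
    ddf = dd.from_pandas(df, npartitions=1)
    with get_dask_client():
        # .compute() resolves everything on the cluster and returns a
        # concrete pandas DataFrame, which Prefect can serialize safely.
        return ddf.compute()

@task
def calc_df_B(source_df: pd.DataFrame) -> pd.DataFrame:
    return source_df * 2

@task
def calc_df_C(source_df: pd.DataFrame) -> pd.DataFrame:
    return source_df * 3

@flow(task_runner=DaskTaskRunner())
def example_flow():
    df_A = load_df_A.submit()
    df_B = calc_df_B.submit(df_A)
    df_C = calc_df_C.submit(df_A)
    print(df_B.result().head(10))
    print(df_C.result().head(10))

if __name__ == '__main__':
    example_flow()

The trade-off is that the data no longer stays distributed on the cluster between tasks; each downstream task receives a full in-memory pandas copy, which is exactly why this is not ideal for large dataframes.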