How to get the final state of a task in a local Prefect Core run?


I have built a flow which implicitly skips running a given task if a kwarg is empty.

Inside the task function I use something like this for the skip logic:

if len(kwargs.get('processors', Hierarchy())) == 0:
    raise signals.SKIP('skipping task', result=Prediction())

I want to build some unit tests to make sure that the final state of said task is skipped. What is the easiest way to get state at a task level?

The docs show how to get the state of a flow, but not of an individual task.

Update

To add to Chris's response: I used the first option he proposed (running the whole flow). As my flow is defined outside of the tests, I wrote a simple function that returns the set of names of the tasks that were skipped. In the test, this set is compared against the tasks that should have been skipped:

def get_skipped_tasks(flow_state):
    return {task.name for task, state in flow_state.result.items() if state.is_skipped()}
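
The comparison described above could look roughly like the sketch below. To keep it runnable without Prefect installed, it uses `FakeTask`, `FakeState`, and `fake_flow_state`, which are hypothetical stand-ins invented for this illustration and are not Prefect classes. In a real test, `flow_state` would come from `flow.run()`. The task names `load` and `predict` are likewise made up.

```python
from types import SimpleNamespace


def get_skipped_tasks(flow_state):
    """Return the names of all tasks whose final state is skipped."""
    return {task.name for task, state in flow_state.result.items() if state.is_skipped()}


class FakeState:
    """Hypothetical stand-in for a task state (not a Prefect class)."""

    def __init__(self, skipped):
        self._skipped = skipped

    def is_skipped(self):
        return self._skipped


class FakeTask:
    """Hypothetical stand-in for a task; only `name` is needed here."""

    def __init__(self, name):
        self.name = name


# Mimic the shape of a flow run state: `result` maps task -> final task state.
fake_flow_state = SimpleNamespace(
    result={
        FakeTask("load"): FakeState(skipped=False),
        FakeTask("predict"): FakeState(skipped=True),
    }
)

# Names of the tasks that are expected to be skipped in this scenario.
expected_skipped = {"predict"}
skipped = get_skipped_tasks(fake_flow_state)
assert skipped == expected_skipped
```

Comparing sets rather than lists keeps the test independent of the order in which task states appear in `flow_state.result`.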

Answer

There are a few ways that I'll include here for completeness; for my example I'll use this basic flow:

from prefect import task, Flow
from prefect.engine.signals import SKIP
import random


@task
def random_number():
    return random.randint(0, 100)

@task
def is_even(num):
    if num % 2:
        raise SKIP("odd number")
    return True

with Flow("dummy") as flow:
    even_task = is_even(random_number)

Run the whole flow

When running interactively you can always run the whole flow and access individual task states from the parent flow run state; note that when you "call" a task (e.g., is_even(random_number)) a copy is created, so you need to track these copies correctly.

flow_state = flow.run()

assert flow_state.result[even_task].is_skipped() # for example

Run a piece of the flow with mocked data

When running interactively you can also pass a dictionary of task -> state that the runner will respect; these states can optionally be provided data:

from prefect.engine.state import Success

mocked_state = Success(result=2)


flow_state = flow.run(task_states={random_number: mocked_state})
assert not flow_state.result[even_task].is_skipped()

Use a TaskRunner

Lastly, if you want to run state-based tests on this task alone you can use a TaskRunner. This gets a little more complicated because you have to recreate the upstream dependencies using Edges.

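from prefect.edge import Edge
from prefect.engine.state import Success
from prefect.engine.task_runner import TaskRunner

# same mocked upstream state as in the previous example
mocked_state = Success(result=2)

runner = TaskRunner(task=even_task)
# the edge key must match the argument name of is_even
edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)

task_state = runner.run(upstream_states={edge: mocked_state})
assert not task_state.is_skipped()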