I have built a flow which implicitly skips running a given task if a kwarg is empty.
I use something like this within the task function for the skipping logic:

    from prefect.engine import signals

    # Inside the task function: skip when no processors were passed
    if len(kwargs.get('processors', Hierarchy())) == 0:
        raise signals.SKIP('skipping task', result=Prediction())

I want to write some unit tests to make sure that the final state of this task is Skipped. What is the easiest way to get the state of an individual task?
I can see from the docs how to get it for a flow, but not for a task.
Update
To add to Chris's response, I used his first proposed option (running the whole flow). Because my flow is defined outside of the tests, I wrote a simple function that returns the set of names of the tasks that were skipped; in the test, this set is compared against the tasks that should have been skipped:
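
    def get_skipped_tasks(flow_state):
        """Return the names of all tasks whose final state is Skipped."""
        # flow_state.result maps each Task to its final State
        return {task.name for task, state in flow_state.result.items() if state.is_skipped()}
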
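Because the helper only touches `flow_state.result` (a mapping of tasks to states), each task's `.name`, and each state's `.is_skipped()`, it can be sanity-checked without running Prefect at all. The sketch below uses hypothetical stand-in classes (`FakeTask`, `FakeState`, `FakeFlowState`; none of these are part of Prefect) and shows the comparison against an expected set of skipped task names:

```python
from dataclasses import dataclass, field


def get_skipped_tasks(flow_state):
    """Return the names of all tasks whose final state is Skipped."""
    return {task.name for task, state in flow_state.result.items() if state.is_skipped()}


# Hypothetical stand-ins exposing only the attributes the helper uses.
@dataclass(frozen=True)  # frozen -> hashable, so it can be a dict key like a Task
class FakeTask:
    name: str


@dataclass
class FakeState:
    skipped: bool

    def is_skipped(self):
        return self.skipped


@dataclass
class FakeFlowState:
    result: dict = field(default_factory=dict)


flow_state = FakeFlowState(result={
    FakeTask("load"): FakeState(skipped=False),
    FakeTask("process"): FakeState(skipped=True),
    FakeTask("report"): FakeState(skipped=True),
})

# In a real test, list the tasks you expect to have been skipped
expected_skipped = {"process", "report"}
print(get_skipped_tasks(flow_state) == expected_skipped)
```

Comparing sets rather than lists keeps the assertion independent of the order in which the flow returns its task states.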
There are a few ways to do this, which I'll include here for completeness; for my examples I'll use this basic flow:
Run the whole flow
When running interactively, you can always run the whole flow and access individual task states from the parent flow-run state. Note that when you "call" a task (e.g., `is_even(random_number)`), a copy is created, so you need to keep track of these copies correctly.
Run a piece of the flow with mocked data
When running interactively, you can also pass a dictionary mapping tasks to states that the runner will respect; these states can optionally carry result data.
Use a TaskRunner
Lastly, if you want to run state-based tests on this task alone, you can use a `TaskRunner`. This gets a little more complicated because you have to recreate the upstream dependencies using `Edge`s.