Splitting tasks using Prefect

1.3k Views Asked by At

I need to create a batch that can process tasks with this workflow:

                                             | task 4
                                  | task 3 ->| task 4
                       | task 2 ->           | task 4
                           
                                  | task 3 ->| task 4
    input ->  task 1 ->
                       | task 2 -> ... 
  • Task #1 processes input data and returns a list of lists.
  • Task #2 receives the list from task #1 and also returns a list of lists.
  • Task #3 receives the list from task #2 and also returns a list of lists.
  • Task #4 receives the list from task #4 and processes data in the list.

For example, task #1 returns [[],[],[],[]]. It means flow must run 4 task #2 in parallel. Each task #2 returns [[],[],[]]. Now we must have 4x3 task #3. Then task #3 returns [[],[]]. Finally flow must run 4x3x2 tasks #4.

Is it possible to do using Prefect Flow? I tried to use mapping functionality but it seems does not support such complicated workflow schema (or perhaps I do not use it properly).

with Flow('test') as flow:
   res1 = task1()
   res2 = task2.map(res1)
   res3 = task3.map(res2)
   res4 = task4.map(res3)

When I run the flow task1 returns the correct number of lists. Then flow creates 4 task2 and each of them returns the list of three lists. But instead of creating 12 task3, the flow creates only 4 of them. Each task3 receives the list of 4 lists as it was created with task1 instead of 1 list from task2.

Any ideas about how can I create such a workflow?

0

There are 0 best solutions below