Prefect: loop over flows with two schedules


I have a specific ETL problem: the data comes from a paginated resource and can only be fetched in pages of fixed size, so each API call requests page 1, page 2, page 3, and so on. My plan is to transform and load each page as it arrives. The ETL of one page is therefore an etl_batch flow made of atomic (page) tasks, but I want to run the ETL over the whole paginated resource, up to the last available page. I thought this could be solved by a flow with a loop (run every hour, for example) over etl_batch flows (each run every couple of seconds). Based on the references below, I realized the model has to be cast as a flow containing a task that loops over (page) flows, each made of (page) tasks.

docs.prefect.io/core/advanced_tutorials/task-looping.html

stackoverflow.com/questions/68103561/looping-tasks-in-prefect
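To make the intended structure concrete, here is a plain-Python sketch with no Prefect involved. Everything in it is hypothetical: `RECORDS`, `PAGE_SIZE`, `fetch_page` and `transform` stand in for the real API and transformation, and it assumes that a page shorter than the page size marks the last page. `etl_batch` is the per-page extract/transform/load unit, and `etl_all_pages` is the outer loop that runs it until the resource is exhausted.

```python
from typing import List

# Hypothetical stand-in for the paginated API: 7 records served 3 per page.
RECORDS = list(range(1, 8))
PAGE_SIZE = 3


def fetch_page(page: int, page_size: int = PAGE_SIZE) -> List[int]:
    """Extract: return the records on a 1-based page (empty past the end)."""
    start = (page - 1) * page_size
    return RECORDS[start:start + page_size]


def transform(rows: List[int]) -> List[int]:
    """Transform: placeholder logic that doubles each record."""
    return [r * 2 for r in rows]


def etl_batch(page: int, sink: List[int]) -> bool:
    """ETL for a single page; returns True if more pages may follow."""
    rows = fetch_page(page)
    sink.extend(transform(rows))  # Load: append to an in-memory sink
    # Assumption: a short (or empty) page means the last page was reached.
    return len(rows) == PAGE_SIZE


def etl_all_pages() -> List[int]:
    """Outer loop: run etl_batch page by page until the last page."""
    sink: List[int] = []
    page = 1
    while etl_batch(page, sink):
        page += 1
    return sink


print(etl_all_pages())  # [2, 4, 6, 8, 10, 12, 14]
```

The key point is that the page number is state carried from one iteration to the next; that is what Prefect's task looping provides, and what a scheduled sub-flow does not.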

Here's the issue I encountered: if I set a schedule on the etl_batch flow, its state appears to be reset. If I comment out the etl_batch schedule (schedule_every_second), the loop works correctly, i.e. the page is incremented (see the snippet below). However, for this ETL pipeline it is important to run the etl_batch flow and its (page) tasks on a schedule, to avoid saturating the API.

I assume the problem is that the cache is cleared when the etl_batch flow runs on a schedule (I tried inserting cache_for= in various places, to no avail). Do you have any suggestions?

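```python
from datetime import timedelta

from prefect import task, Flow, context, Parameter
from prefect.schedules import IntervalSchedule
from prefect.engine.signals import LOOP

# Schedule for the outer "etl" flow: runs the whole pagination loop every 3 seconds.
schedule = IntervalSchedule(
    start_date=None,
    interval=timedelta(seconds=3),
)

# Intended schedule for the per-page "etl_batch" flow.
schedule_every_second = IntervalSchedule(
    start_date=None,
    interval=timedelta(seconds=1),
)


@task
def next_page(page):
    print(f" {page}")
    return page + 1


@task
def paginate():
    # On the first iteration there is no loop result yet, so start at page 1.
    current_page = context.get("task_loop_result", 1)
    with Flow('etl_batch',
              # schedule=schedule_every_second  # uncommenting this causes the problem
              ) as flow:
        x = Parameter('x')
        add = next_page(x)
    state = flow.run(parameters={'x': current_page})
    # state.result maps each task to its State; State.result is the task's return value.
    nr = state.result[add].result

    if nr > 3:
        return nr
    # Re-run this task, exposing nr as context["task_loop_result"] on the next pass.
    raise LOOP(result=nr)


if __name__ == "__main__":
    with Flow("etl", schedule=schedule) as etl_flow:
        r = paginate()
    f = etl_flow.run()
```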


Answer

You are trying to use both task looping and a scheduled flow to fetch from your external resource multiple times. You only need one of them, and it should be task looping, which lets you iterate a task n times. Reasons below.

Calling flow.run() on a scheduled flow runs indefinitely: your program never gets past the line where flow.run(parameters={'x': current_page}) is called. The surrounding task isn't looping at all; you are just running the same flow with the same x=current_page value over and over again, on a schedule.
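A toy model makes the "same value over and over" point visible. This is plain Python, not Prefect's scheduler code; `run_on_schedule` and its `max_runs` cap are made up for illustration, and the wait between ticks is omitted. The parameter is bound once, before the schedule starts, and nothing feeds one run's result into the next.

```python
import itertools
from typing import Callable, List, Optional


def next_page(page: int) -> int:
    """Same body as the question's next_page task, as a plain function."""
    return page + 1


def run_on_schedule(run_once: Callable[[], int], max_runs: Optional[int] = None) -> List[int]:
    """Toy model of a scheduled flow.run(): repeat the same work on every tick.

    With max_runs=None the loop never ends, so the call never returns,
    which mirrors the program getting stuck on flow.run().
    """
    results: List[int] = []
    ticks = itertools.count() if max_runs is None else range(max_runs)
    for _ in ticks:
        results.append(run_once())
    return results


current_page = 1
# Every tick sees page 1 and produces 2; the page never advances.
print(run_on_schedule(lambda: next_page(current_page), max_runs=3))  # [2, 2, 2]
```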

You should remove the schedule from your sub-flow and simply use time.sleep(1) to enforce a waiting period. Then you can use loop results to collect your paginated resources, as described
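The suggested pattern, reduced to plain Python so it runs without Prefect, might look like the sketch below. `fetch_page` is a hypothetical API call (pages 1 to 3 hold data, later pages return `None`), and the `loop_result` tuple plays the role that `context["task_loop_result"]` plays in a looping Prefect task: it carries the next page number and the pages collected so far into the next iteration. The `sleep` argument defaults to `time.sleep` and is injectable only so the sketch can be exercised quickly.

```python
import time
from typing import Callable, List, Optional, Tuple


def fetch_page(page: int) -> Optional[List[str]]:
    """Hypothetical API call: pages 1-3 hold data, later pages return None."""
    if page > 3:
        return None
    return [f"item-{page}-{i}" for i in range(2)]


def paginate(delay: float = 1.0,
             sleep: Callable[[float], None] = time.sleep) -> List[List[str]]:
    """Plain-Python analogue of a looping task that throttles with sleep."""
    # (next page number, pages collected so far), like a task loop result.
    loop_result: Tuple[int, List[List[str]]] = (1, [])
    while True:
        page, collected = loop_result
        rows = fetch_page(page)
        if rows is None:  # no more pages: stop looping and return everything
            return collected
        collected = collected + [rows]
        sleep(delay)  # wait between requests instead of scheduling a sub-flow
        loop_result = (page + 1, collected)  # analogous to raise LOOP(result=...)


pages = paginate(delay=0)
print(len(pages), pages[0])  # 3 ['item-1-0', 'item-1-1']
```

In a Prefect task, the `while True` body would become the task body, the final `return` would stay a `return`, and the reassignment of `loop_result` would become `raise LOOP(result=...)`.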