Migrating a flow from Prefect 1 to Prefect 2 with multithreading


Please, I need your help. I am looking for a way to migrate my fairly complex flow code from Prefect 1 to Prefect 2. The flow worked perfectly on the old Prefect version, but after I rewrote it for version 2 it only works correctly on local runs.

So, the initial code that works on Prefect 1 looks like this:

with Flow("load-somename-api", schedule=schedule) as flow:
    working_dir = "/home/…"
    flow.run_config = LocalRun(working_dir=working_dir)

    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=6)
    # Some parameters
    date_range_type = Parameter("p01_date_range_type", default="LAST_7_DAYS")
    start_date = Parameter("p02_start_date", default=None)
    end_date = Parameter("p03_end_date", default=None)
    campaign_list = Parameter("p04_campaign_list", default="ALL")

    date_range = get_dates(date_range_type, start_date, end_date)
    truncate_loading = truncate_table()
    campaigns_df = get_campaigns(date_range, campaign_list, truncate_loading)

# Here the map starts on 6 workers and the subtasks run in parallel, after "campaigns_df" completes
# There are about 300-400 subtasks
    campaign_info = get_campaigns_info.map(campaigns_df[0])

# After "campaign_info" completes, the next task should also start a parallel 6-thread load
# There are about 2 thousand subtasks
    profile_info = get_profiles_info.map(
        campaigns_df[1], upstream_tasks=[unmapped(campaign_info)]
    )
# Then we take a list with a lot of int values
    banners_list = get_banners_list(upstream_tasks=[profile_info])

# First we map the list and execute about 2-3 thousand subtasks
    banners_info = get_banners_info.map(banners_list)

# Second, we map the list in another task and execute the subtasks on 6 threads
    banners_stats = get_banners_stats.map(
        banners_list, unmapped(date_range), upstream_tasks=[unmapped(banners_info)]
    )
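For reference, the staged fan-out/fan-in pattern above (map a task over hundreds of items on 6 workers, wait for the stage to finish, then map the next stage) can be sketched in plain Python with `concurrent.futures`. This is only an illustration of the execution pattern, not Prefect API; the task bodies are placeholders:

```python
from concurrent.futures import ThreadPoolExecutor

def get_campaigns_info(campaign_id):
    # placeholder for the real API call
    return campaign_id * 2

def get_profiles_info(profile_id):
    # placeholder for the real API call
    return profile_id + 1

campaign_ids = list(range(10))
profile_ids = list(range(10))

with ThreadPoolExecutor(max_workers=6) as pool:
    # Stage 1: fan out over all campaigns; list() blocks until every
    # subtask finishes, mirroring the barrier between mapped stages.
    campaign_info = list(pool.map(get_campaigns_info, campaign_ids))
    # Stage 2 starts only after stage 1 has fully completed.
    profile_info = list(pool.map(get_profiles_info, profile_ids))
```

The key point is that each stage drains completely before the next begins, which is what the `upstream_tasks=[unmapped(...)]` dependencies enforce in the Prefect 1 flow.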

The code after the rewrite for Prefect 2:

@flow(name="load_somename_api", task_runner=DaskTaskRunner(
    cluster_kwargs={"n_workers": 3, "threads_per_worker": 2}
))
def load_somename_api_flow(params: InputParameters):

    date_range = get_dates(params.date_range_type, params.start_date, params.end_date)
    truncate_loading = truncate_table()
    campaigns_df = get_campaigns(date_range, params.campaign_list, truncate_loading)

# Here is the trouble, I think. Locally it creates a lot of (about 2-3k) tasks in the queue and executes them,
# 6 at a time in batches, and the whole flow completes fine.
# But on the server it only adds all the tasks to the queue and does not execute them,
# so when the next task starts the whole flow fails with an error
    campaigns_info = [get_campaigns_info.submit(i, wait_for=[campaigns_df]) for i in campaigns_df[0]]
    profile_info = [get_profiles_info.submit(j, wait_for=[campaigns_info]) for j in campaigns_df[1]]

    banners_list = get_banners_list(wait_for=[profile_info])

    for k in range(len(banners_list)):
        bn_lst = banners_list_slice(k, banners_list)
        for bn in bn_lst:
            banners_info = get_banners_info.submit(bn, wait_for=[bn_lst])
            get_banners_stats_tasks = get_banners_stats.submit(bn, unmapped(date_range), wait_for=[banners_info])
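One idea I have tried to express with the slicing loop above is to avoid piling thousands of pending submissions into the queue at once. In plain Python the bounded-batch version of that idea looks like this; the task functions and the batch size of 6 are illustrative stand-ins, not Prefect API:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def get_banners_info(banner_id):
    # placeholder for the real API call
    return {"id": banner_id}

def get_banners_stats(banner_id, date_range):
    # placeholder for the real API call
    return (banner_id, date_range)

banners_list = list(range(20))
date_range = ("2024-01-01", "2024-01-07")
stats = []

with ThreadPoolExecutor(max_workers=6) as pool:
    for start in range(0, len(banners_list), 6):
        batch = banners_list[start:start + 6]
        # submit one bounded batch of info subtasks...
        info_futures = [pool.submit(get_banners_info, b) for b in batch]
        # ...and block until it drains before enqueueing the next batch
        wait(info_futures)
        stats.extend(
            pool.submit(get_banners_stats, b, date_range).result() for b in batch
        )
```

Only a handful of futures are ever outstanding at a time, instead of all 2-3 thousand, which is the behavior I would like to reproduce with the Prefect 2 task runner.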

Log output when the flow fails with an error: it submits 1793 subtasks and then starts to run the next task.


Any idea how to make it work on a Prefect 2 server (not only locally) just like it worked on Prefect 1? Some tasks should be executed on 6 or more threads, not just added to the queue.

Thank you!
