I need to create a DAG whose tasks are generated through dynamic task mapping, with subsequent actions that depend on the state of each mapped task. For example, I want a DAG with the structure shown in the following picture:

[Image: desired DAG structure]

Task A returns a list of dicts. Each task B_1, ..., B_n processes one of those dicts, so the number of B tasks equals the length of the list returned by task A. Each task C_i must be triggered when its upstream task B_i finishes successfully, and each task D_i must be triggered when its upstream task B_i fails.
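
To make the intended semantics concrete, here is a plain-Python model of the behavior described above (it does not use Airflow). A() and B() are hypothetical stand-ins for the real functions, and run_per_item is an illustrative helper, not an Airflow API: each dict becomes the keyword arguments of its own B_i, mirroring what expand_kwargs does, and only the matching C_i or D_i follows.

```python
from __future__ import annotations

from typing import Callable


# Hypothetical stand-ins for the real A() and B(); not part of Airflow.
def A() -> list[dict]:
    return [
        {"arg1": "foo", "arg2": ["bar", "baz"]},
        {"arg1": "qux", "arg2": []},  # empty arg2 makes B() fail below
    ]


def B(arg1: str, arg2: list) -> None:
    if not arg2:
        raise ValueError(f"nothing to process for {arg1}")


def run_per_item(items: list[dict], b: Callable[..., None]) -> list[tuple[str, str]]:
    """Return (B_i, follow-up task) pairs: C_i if B_i succeeded, D_i if it failed."""
    route = []
    for i, kwargs in enumerate(items, start=1):
        try:
            b(**kwargs)  # each dict becomes B_i's keyword arguments
            route.append((f"B_{i}", f"C_{i}"))  # success -> only C_i runs
        except Exception:
            route.append((f"B_{i}", f"D_{i}"))  # failure -> only D_i runs
    return route


print(run_per_item(A(), B))  # [('B_1', 'C_1'), ('B_2', 'D_2')]
```

Each item gets its own success/failure follow-up, independent of how the other items turned out.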

While building the DAG, I got confused by the mix of concepts such as task groups and branching tasks.

My code looks like this:

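    from airflow.decorators import task
    from airflow.utils.trigger_rule import TriggerRule

    @task
    def A_task() -> list:
        # A() returns [{"arg1": "foo", "arg2": ["bar", "baz", ...]}]
        return A()

    @task
    def B_task(arg1: str, arg2: list) -> None:
        # One mapped B_task instance is created per dict returned by A_task
        B(arg1, arg2)

    @task(trigger_rule=TriggerRule.ALL_SUCCESS)
    def C_task() -> None:
        print("success")

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def D_task() -> None:
        print("failed")

    # C_task and D_task are not mapped: each is a single task downstream of all B_task instances
    data = A_task()
    B_task.expand_kwargs(data) >> [C_task(), D_task()]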

I tried this code, but it isn't what I wanted.

[Image: graph produced by the code above]

For example, a separate D task should be created and triggered based on the status of each B task. But with this code, when one of the B tasks fails, the single D task runs only once.
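
A likely explanation of the single D run, sketched in plain Python (no Airflow): because C_task and D_task are not mapped, each is one task whose trigger rule is evaluated over all B_task instances together, whereas the desired DAG evaluates the rule against one B_i at a time. all_success, one_failed, and evaluate are hypothetical helpers that only approximate the documented meaning of TriggerRule.ALL_SUCCESS and TriggerRule.ONE_FAILED.

```python
from __future__ import annotations


# Illustrative stand-ins for the meaning of two Airflow trigger rules;
# these are plain functions, not Airflow code.
def all_success(upstream_states: list[str]) -> bool:
    return all(state == "success" for state in upstream_states)


def one_failed(upstream_states: list[str]) -> bool:
    return any(state == "failed" for state in upstream_states)


def evaluate(b_states: list[str]) -> tuple[dict[str, bool], list[dict[str, bool]]]:
    # Current code: C_task and D_task are unmapped, so each is a single task
    # whose trigger rule sees every mapped B_task instance at once.
    collective = {"C_task": all_success(b_states), "D_task": one_failed(b_states)}
    # Desired: one C_i / D_i per map index, each looking only at its own B_i.
    per_index = [
        {"C": all_success([state]), "D": one_failed([state])} for state in b_states
    ]
    return collective, per_index


collective, per_index = evaluate(["success", "failed", "success"])
print(collective)  # {'C_task': False, 'D_task': True}
print(per_index)  # [{'C': True, 'D': False}, {'C': False, 'D': True}, {'C': True, 'D': False}]
```

In the model, the collective evaluation yields one D decision for the whole set of B states, while the per-index evaluation yields a D decision only for the item that failed.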

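    from airflow.decorators import task, task_group
    from airflow.utils.trigger_rule import TriggerRule

    @task
    def A_task() -> list:
        # A() returns [{"arg1": "foo", "arg2": ["bar", "baz", ...]}]
        return A()

    @task_group
    def B_task_group(arg1, arg2):
        @task
        def B_task(arg1, arg2):
            B(arg1, arg2)

        @task(trigger_rule=TriggerRule.ALL_SUCCESS)
        def C_task() -> None:
            print("success")

        @task(trigger_rule=TriggerRule.ONE_FAILED)
        def D_task() -> None:
            print("failed")

        @task
        def E_task() -> None:
            print("after C, print this")

        # F_task is not defined in this snippet.
        # Each call below creates a new task instance, so B_task and F_task
        # are each instantiated twice (the second copy gets a suffixed task_id such as B_task__1).
        B_task(arg1, arg2) >> C_task() >> E_task() >> F_task()
        B_task(arg1, arg2) >> D_task() >> F_task()

    A_task() >> B_task_group()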

I also tried this code using a task group, but the resulting graph looks like this:

[Image: graph produced by the task group code]

So I have no idea how to configure the tasks to get the DAG I described. I'd appreciate any help.
