I need to create a DAG whose tasks are generated through dynamic task mapping, with follow-up actions that depend on the final state of each mapped task. For example, I want a DAG structured as shown in the following picture:
Task A returns a list of dicts. Each task B_1, ..., B_n processes one dict from that list, so the number of B tasks equals the length of the list returned by task A. Each task C_1, ..., C_n must be triggered when its corresponding upstream task B_1, ..., B_n finishes in the success state. Each task D_1, ..., D_n must be triggered when its corresponding upstream task B_1, ..., B_n finishes in the failed state.
While building the DAG, I got confused by the mix of concepts such as task_group, branch tasks, and so on.
My code looks like this:
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule

@task
def A_task() -> list:
    # A() returns [{"arg1": "foo", "arg2": ["bar", "baz", ...]}]
    return A()

@task
def B_task(arg1: str, arg2: list) -> None:
    B(arg1, arg2)

@task(trigger_rule=TriggerRule.ALL_SUCCESS)
def C_task() -> None:
    print("success")

@task(trigger_rule=TriggerRule.ONE_FAILED)
def D_task() -> None:
    print("failed")

data = A_task()
# One B_task instance is mapped per dict in data; C_task and D_task are not mapped.
B_task.expand_kwargs(data) >> [C_task(), D_task()]
I tried this code, but it doesn't do what I wanted.
For example, a D task should be created and triggered for each B task, based on that B task's state. With this code, however, when one of the B tasks fails, the D task runs only once.
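To make the difference concrete, here is a small plain-Python model of the two behaviours. It is not Airflow code; it only imitates how I understand the trigger rules to be evaluated, and the names `b_states`, `unmapped_downstream`, and `per_instance_downstream` are made up for this illustration.

```python
from typing import Dict, List

# Hypothetical final states of the mapped B instances, in map-index order.
b_states: List[str] = ["success", "failed", "success"]


def unmapped_downstream(states: List[str]) -> Dict[str, int]:
    """Model a single C and a single D placed after all mapped B instances."""
    # ALL_SUCCESS: C runs only if every B instance succeeded.
    c_runs = 1 if all(s == "success" for s in states) else 0
    # ONE_FAILED: D runs once if at least one B instance failed.
    d_runs = 1 if any(s == "failed" for s in states) else 0
    return {"C": c_runs, "D": d_runs}


def per_instance_downstream(states: List[str]) -> List[str]:
    """Model the layout I want: B_i is followed by C_i on success, D_i on failure."""
    return [
        f"C_{i}" if s == "success" else f"D_{i}"
        for i, s in enumerate(states, start=1)
    ]


print(unmapped_downstream(b_states))      # {'C': 0, 'D': 1}
print(per_instance_downstream(b_states))  # ['C_1', 'D_2', 'C_3']
```

The first function matches what I observe with the code above (D runs once, C is skipped), while the second is the per-item result I am trying to get.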
from airflow.decorators import task, task_group
from airflow.utils.trigger_rule import TriggerRule

@task
def A_task() -> list:
    # A() returns [{"arg1": "foo", "arg2": ["bar", "baz", ...]}]
    return A()

@task_group
def B_task_group(arg1, arg2):
    @task
    def B_task(arg1, arg2):
        B(arg1, arg2)

    @task(trigger_rule=TriggerRule.ALL_SUCCESS)
    def C_task() -> None:
        print("success")

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def D_task() -> None:
        print("failed")

    @task
    def E_task() -> None:
        print("after C, print this")

    # Note: F_task is not defined in this snippet.
    B_task(arg1, arg2) >> C_task() >> E_task() >> F_task()
    B_task(arg1, arg2) >> D_task() >> F_task()

A_task() >> B_task_group()
I also tried this code, which uses task_group, but the graph looks like this:
So I have no idea how to configure the tasks to get the DAG I described. I'd appreciate any help.