Apache Airflow: Create reusable custom TaskGroup


I am trying to create a custom TaskGroup class to replace the existing SubDAGs in our Airflow pipelines.

I want to pass the dag object explicitly instead of using a context manager. I followed this tutorial to create the custom class.

Let's have a look at my custom TaskGroup code:

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task

class TestTaskGroup(TaskGroup):
    def __init__(self, dag,  group_id="TestTaskGroup", *args, **kwargs):
        super().__init__(dag=dag,group_id=group_id, *args, **kwargs)
        self.dag = dag

        @task(task_group=self)
        def emptyTask():            
            EmptyOperator(task_id="emptytask", dag=self.dag)
            

        @task(task_group=self)
        def pythonTask():            
            PythonOperator(task_id="pythontask", dag=self.dag, python_callable=lambda:print("Hello World"))

        emptyTask() >> pythonTask()

In testdag.py:


from taskgroups.testtaskgroup import TestTaskGroup
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="testdag",
    start_date = datetime(2024,2,26),
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,
    default_args={
        "owner":"hemant.sah",
    }
) 

sometask = EmptyOperator(task_id="sometask", dag=dag)
grouptask = TestTaskGroup(dag=dag)

sometask >> grouptask

The above code gives me this error:

airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(_PythonDecoratedOperator): TestTaskGroup.emptyTask>, <Task(_PythonDecoratedOperator): TestTaskGroup.pythonTask>]

What I tried:

I tried moving TestTaskGroup inside a context manager. It works that way, but I want to understand why it does not work when I pass a dag object as TestTaskGroup(dag=dag).

with DAG() as dag:
    sometask = EmptyOperator(task_id="sometask")
    grouptask = TestTaskGroup() # in this way I am not passing dag object and it works
    sometask >> grouptask

I could not find a way to pass the dag object without using a context manager. I prefer passing it explicitly because we have many DAGs, and this approach would reduce the code changes needed.

There is 1 best solution below

Jarek Potiuk

I believe your TaskGroup is badly indented. You should define the tasks in the class and not in the constructor (that would be my guess).
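
On the "why" part of the question: the error message says the two decorated tasks have no DAG, even though the group itself was given one. A plausible reading is that a task takes its DAG only from an explicit dag argument or from the active `with DAG(...)` context, and does not inherit it from the task group it is placed in. The sketch below is a toy model of that lookup in plain Python, not Airflow code: the classes Dag, Group and Task and the error text are hypothetical stand-ins, and the lookup rule itself is an assumption inferred from the error in the question.

```python
class Dag:
    """Toy stand-in for a DAG that tracks the 'current' DAG inside a with-block."""
    _current = None  # set only while a `with Dag(...)` block is active

    def __init__(self, dag_id):
        self.dag_id = dag_id

    def __enter__(self):
        Dag._current = self
        return self

    def __exit__(self, *exc):
        Dag._current = None
        return False


class Group:
    """Toy stand-in for a TaskGroup; it remembers the DAG it was given."""

    def __init__(self, group_id, dag=None):
        self.group_id = group_id
        self.dag = dag or Dag._current


class Task:
    """Toy task: its DAG comes from `dag=` or the current context only."""

    def __init__(self, task_id, dag=None, group=None):
        self.task_id = task_id
        # Assumption being modelled: the group's own dag is NOT consulted.
        self.dag = dag or Dag._current
        self.group = group

    def __rshift__(self, other):
        # Wiring two tasks needs at least one of them to know its DAG.
        if self.dag is None and other.dag is None:
            raise RuntimeError("tasks don't have DAGs yet")
        self.dag = other.dag = self.dag or other.dag
        return other


dag = Dag("testdag")
group = Group("TestTaskGroup", dag=dag)

# Case 1: as in the question, only the group is passed to each task.
a = Task("emptyTask", group=group)
b = Task("pythonTask", group=group)
try:
    a >> b
    case1 = "ok"
except RuntimeError as err:
    case1 = str(err)

# Case 2: the dag is also passed to each task explicitly.
c = Task("emptyTask", dag=dag, group=group)
d = Task("pythonTask", dag=dag, group=group)
c >> d

# Case 3: everything is created inside the context manager.
with Dag("testdag2") as dag2:
    group2 = Group("TestTaskGroup")
    e = Task("emptyTask", group=group2)
    f = Task("pythonTask", group=group2)
    e >> f
```

If the model matches what Airflow does, the equivalent change in the real class would be to hand the dag to each task as well as the group, for example `@task(task_group=self, dag=dag)`, or to create `EmptyOperator` and `PythonOperator` directly in the constructor with both `dag=dag` and `task_group=self`. Neither variant was tested here against a specific Airflow version. Separately, note that instantiating an operator inside a `@task`-decorated function only builds an object when that task runs; it does not add the operator to the DAG.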