I am trying to create a custom TaskGroup class to replace the existing SubDAGs in our Airflow pipelines.
I want to pass the dag object explicitly instead of using the DAG context manager. I followed this tutorial to create the custom class.
Here is my custom TaskGroup code (taskgroups/testtaskgroup.py):
```python
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task


class TestTaskGroup(TaskGroup):
    def __init__(self, dag, group_id="TestTaskGroup", *args, **kwargs):
        super().__init__(dag=dag, group_id=group_id, *args, **kwargs)
        self.dag = dag

        @task(task_group=self)
        def emptyTask():
            EmptyOperator(task_id="emptytask", dag=self.dag)

        @task(task_group=self)
        def pythonTask():
            PythonOperator(task_id="pythontask", dag=self.dag, python_callable=lambda: print("Hello World"))

        emptyTask() >> pythonTask()
```
In testdag.py:
```python
from taskgroups.testtaskgroup import TestTaskGroup
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="testdag",
    start_date=datetime(2024, 2, 26),
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,
    default_args={
        "owner": "hemant.sah",
    },
)

sometask = EmptyOperator(task_id="sometask", dag=dag)
grouptask = TestTaskGroup(dag=dag)
sometask >> grouptask
```
The code above fails with this error:
```
airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(_PythonDecoratedOperator): TestTaskGroup.emptyTask>, <Task(_PythonDecoratedOperator): TestTaskGroup.pythonTask>]
```
What I tried:
I tried moving TestTaskGroup inside the DAG context manager, and that works, but I want to understand why it does not work when I pass the dag object as TestTaskGroup(dag=dag):
```python
with DAG() as dag:  # DAG arguments omitted for brevity
    sometask = EmptyOperator(task_id="sometask")
    grouptask = TestTaskGroup()  # here I am not passing the dag object, and it works
    sometask >> grouptask
```
I could not find a way to pass the dag object without using the context manager. I prefer passing it explicitly because we have many DAGs, and this approach would minimize code changes.
I believe your TaskGroup is badly indented: you should define the tasks in the class, not in the constructor (that is my guess, anyway).