I have the following DAG:
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.models import Variable
import pendulum
from datetime import datetime
# Timezone used to make the DAG's start_date timezone-aware.
local_tz = pendulum.timezone('US/Pacific')
DAG_ID = "demo"

# Fallback id for the inner TaskGroup, used while the "trigger_period"
# Variable has never been set. Without a fallback, Variable.get() raises
# KeyError while the file is being parsed and the DAG fails to import.
DEFAULT_TRIGGER_PERIOD = "default_period"

with DAG(
    dag_id=DAG_ID,
    default_args={
        "owner": "GFT Reporting Services",
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
    },
    default_view="graph",
    schedule_interval=None,  # no schedule: runs only when triggered
    start_date=datetime(2022, 1, 4, tzinfo=local_tz),
    tags=['demo'],
    catchup=False,
) as dag:

    def get_runtime_params(**kwargs):
        """Store the triggered run's period in the ``trigger_period`` Variable.

        Reads ``dag_run.conf['message']['period']`` from the task context and
        writes it, JSON-serialised, to the Airflow Variable ``trigger_period``.

        Args:
            **kwargs: Task context supplied by Airflow; must contain
                ``dag_run``.

        Returns:
            The literal string ``"Its Demo"``.

        Raises:
            KeyError: If ``dag_run`` is missing from the context, or the run
                conf has no ``message`` / ``period`` entry (assumes conf is
                a dict-like payload -- confirm against the trigger caller).
        """
        dag_run = kwargs['dag_run']  # the DagRun of the current execution
        message = dag_run.conf['message']
        period = message['period']
        Variable.set('trigger_period', value=period, serialize_json=True)
        return "Its Demo"

    def get_doing(**kwargs):
        """Print a fixed marker message; the task context is ignored."""
        print("doing Airflow")

    start = DummyOperator(task_id='begin_execution')
    end = DummyOperator(task_id='end_execution')

    with TaskGroup(group_id='demo_compute') as demo_compute:
        # Operators created inside ``with DAG(...)`` are attached to the DAG
        # automatically, so an explicit ``dag=dag`` argument is not needed.
        task1 = PythonOperator(
            task_id='get_params_to_update_config',
            python_callable=get_runtime_params,
        )

        # NOTE: this lookup runs each time the file is parsed, not when the
        # DAG run executes. The group id therefore reflects the value stored
        # by a *previous* run of ``task1``, never the conf of the run that is
        # currently being triggered.
        taskName = Variable.get(
            "trigger_period",
            default_var=DEFAULT_TRIGGER_PERIOD,
            deserialize_json=True,
        )

        # The Variable is JSON-deserialised and may not be a string (e.g. a
        # numeric period), while TaskGroup ids must be strings.
        with TaskGroup(group_id=str(taskName)) as inside_demo_compute:
            get_doingTask = PythonOperator(
                task_id='get_doing',
                python_callable=get_doing,
            )

        task1 >> get_doingTask

    start >> demo_compute >> end
As you can see, the default value of `taskName` is read from the Variable `trigger_period`. When the DAG is triggered and running, it still takes its value only from the `trigger_period` Variable, even though that Variable is updated with the value from `conf`.
Is there any way to avoid taking the default value of `taskName` from the Variable `trigger_period`, and to manage it within the DAG itself?
Any suggestions would be helpful.