I'm writing a DAG for Airflow. My DAG consists of 4 tasks: start_task (DummyOperator) >> read_variables (PythonOperator) >> make_table (LivyOperator) >> end_task (DummyOperator). In read_variables I wrote a function that pushes some values to XCom:
def read_vars_func(ti=None):
    # Push every entry of the module-level args dict to XCom
    for key, val in args.items():
        ti.xcom_push(key=key, value=val)

read_variables = PythonOperator(
    task_id='read_vars',
    dag=dag,
    provide_context=True,
    python_callable=read_vars_func,
)
In the make_table task I read the values from the task instance:
make_table = LivyOperator(
    task_id='make_table',
    dag=dag,
    livy_conn_id="{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}",
    file="{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'script_path')}}",
    ...
)
When I run the DAG, I get an exception: The conn_id '{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}' isn't defined.
Why is the XCom value empty in the make_table task?
I have read the docs: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html
Here is the full code of the DAG:
import datetime as dt

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator
args = {
    'start_month': '20210901',
    'end_month': '20211001',
    'debug': 'True',
    'env': 'test',
}
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dt.datetime(2021, 9, 1),
    'email': ['my@email'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': dt.timedelta(minutes=20),
}
dag = DAG(
    dag_id='f_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    tags=['DIT'],
    description='Rides',
)
start_task = DummyOperator(task_id = 'start', dag=dag)
end_task = DummyOperator(task_id = 'end', dag=dag)
def read_vars_func(ti=None):
    args['livy_conn_id'] = 'livy_connection_id' if args['env'] == 'test' else 'livy_connection_id'
    args['script_path'] = '/data/src'
    # Push every (key, value) pair to XCom so downstream tasks can pull it
    for key, val in args.items():
        ti.xcom_push(key=key, value=val)
read_variables = PythonOperator(
    task_id='read_vars',
    dag=dag,
    provide_context=True,
    python_callable=read_vars_func,
)
make_table = LivyOperator(
    task_id='make_table',
    dag=dag,
    livy_conn_id="{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}",
    polling_interval=30,
    name="dit_{{task_instance.xcom_pull(task_ids='read_vars', key='start_month')}}",
    file="{}/exec_file.py".format("{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'script_path')}}"),
    args=[
        '--start_month', "{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'start_month')}}",
        '--end_month', "{{ task_instance.xcom_pull(task_ids = 'read_vars', key='end_month')}}",
    ],
)
start_task >> read_variables >> make_table >> end_task
Maybe the error isn't in XCom but in the connection definition? But livy_connection_id is in the list of connections.
Your XCom handling is fine. The issue lies elsewhere.
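In the LivyOperator docs, every constructor argument that is templated is marked as "(templated)". livy_conn_id is not. Your livy_conn_id="{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}" is taken literally; it is not rendered into livy_conn_id="livy_connection_id". Airflow therefore looks up a connection whose ID is that whole template string, which is exactly the error you see. Your file, name and args values do get rendered, because LivyOperator collects them into spark_params, which is a templated field.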
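To see why only some arguments are rendered, here is a toy, stdlib-only model of the mechanism. ToyOperator, ToyLivyOperator, ToyBetterLivyOperator and render_value are hypothetical stand-ins, not Airflow APIs: a regex replaces Jinja, and a plain dict stands in for the values read_vars pushes to XCom. The point it illustrates is that only attributes listed in template_fields are rendered; everything else is passed through as a literal string.

```python
import re
from typing import Any, Dict, Tuple

# Matches a simple "{{ name }}" placeholder (hypothetical; real Airflow uses full Jinja).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_value(value: Any, context: Dict[str, str]) -> Any:
    """Recursively substitute placeholders in strings, lists and dict values."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: context[m.group(1)], value)
    if isinstance(value, list):
        return [render_value(v, context) for v in value]
    if isinstance(value, dict):
        return {k: render_value(v, context) for k, v in value.items()}
    return value


class ToyOperator:
    """Hypothetical stand-in for BaseOperator: only template_fields get rendered."""

    template_fields: Tuple[str, ...] = ()

    def render_template_fields(self, context: Dict[str, str]) -> None:
        for name in self.template_fields:
            setattr(self, name, render_value(getattr(self, name), context))


class ToyLivyOperator(ToyOperator):
    """Mimics LivyOperator: file goes into spark_params, the conn id does not."""

    template_fields = ("spark_params",)

    def __init__(self, file: str, livy_conn_id: str) -> None:
        self.spark_params = {"file": file}
        self._livy_conn_id = livy_conn_id


class ToyBetterLivyOperator(ToyLivyOperator):
    # Extend the parent's template_fields so the conn id is rendered too.
    template_fields = (*ToyLivyOperator.template_fields, "_livy_conn_id")


# Pretend these values were pushed to XCom by read_vars.
context = {"livy_conn_id": "livy_connection_id", "script_path": "/data/src"}

plain = ToyLivyOperator(file="{{ script_path }}/exec_file.py", livy_conn_id="{{ livy_conn_id }}")
plain.render_template_fields(context)
print(plain.spark_params["file"])  # /data/src/exec_file.py
print(plain._livy_conn_id)  # {{ livy_conn_id }}  (left as a literal string)

better = ToyBetterLivyOperator(file="{{ script_path }}/exec_file.py", livy_conn_id="{{ livy_conn_id }}")
better.render_template_fields(context)
print(better._livy_conn_id)  # livy_connection_id
```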
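The Airflow documentation has a solution for this exact case: extending an operator to add a templated field. In your case, define a subclass, say BetterLivyOperator, that appends the attribute holding the connection ID to LivyOperator.template_fields.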
When you use BetterLivyOperator in your DAG, it should correctly evaluate the template with the XCom value.