Airflow task instance xcom is empty in next task


I am writing a DAG for Airflow. My DAG consists of 4 tasks: start_task (DummyOperator) >> read_variables (PythonOperator) >> make_table (LivyOperator) >> end_task (DummyOperator). In read_variables I wrote a function that pushes some values to XCom:

def read_vars_func(ti=None):
    # Push every entry of args to XCom under its own key
    for key, val in args.items():
        ti.xcom_push(key=key, value=val)


read_variables = PythonOperator(
    task_id = 'read_vars',
    dag = dag,
    provide_context = True,
    python_callable = read_vars_func
)

In the make_table task I read the values from task_instance:

make_table = LivyOperator(
    task_id = 'make_table',
    dag = dag,
    livy_conn_id = "{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}",
    file = "{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'script_path')}}"
...
)

When I run the DAG, I get an exception: The conn_id '{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}' isn't defined.

I have looked at the XComs in the tasks [screenshot: xcom in tasks].

Why is the XCom empty in the make_table task?

I have read the docs: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html

Here is the full DAG code:

import datetime as dt
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator


args={ 'start_month': '20210901',
       'end_month': '20211001', 
       'debug': 'True', 
       'env': 'test'
     } 

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': dt.datetime(2021, 9, 1),
    'email': ['my@email'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': dt.timedelta(minutes = 20),
}

dag = DAG(
    dag_id = 'f_dag',
    default_args = default_args,
    schedule_interval = None,
    catchup = False,
    tags = ['DIT'],
    description = 'Rides',
)

start_task = DummyOperator(task_id = 'start', dag=dag)
end_task = DummyOperator(task_id = 'end', dag=dag)

def read_vars_func(ti=None):
    args['livy_conn_id'] = 'livy_connection_id' if args['env'] == 'test' else 'livy_connection_id'
    args['script_path'] = '/data/src'
    # Push every entry of args to XCom under its own key
    for key, val in args.items():
        ti.xcom_push(key=key, value=val)

read_variables = PythonOperator(
    task_id = 'read_vars',
    dag = dag,
    provide_context = True,
    python_callable = read_vars_func
)

make_table = LivyOperator(
    task_id = 'make_table',
    dag = dag,
    livy_conn_id = "{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}",
    polling_interval = 30,
    name = "dit_{{task_instance.xcom_pull(task_ids='read_vars', key='start_month')}}",
    file = "{}/exec_file.py".format("{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'script_path')}}"),
    args = [
        '--start_month', "{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'start_month')}}",
        '--end_month', "{{ task_instance.xcom_pull(task_ids = 'read_vars', key='end_month')}}",
    ],
)

start_task>>read_variables>>make_table>>end_task


Maybe the error isn't in XCom but in the connection definitions? But livy_connection_id is in the connection list [screenshot].


Answer


Your XCom handling is fine. The issue lies elsewhere.

In the LivyOperator docs, every constructor argument that is templated is explicitly marked as such. livy_conn_id is not, so Airflow never renders Jinja in it. Your:

livy_conn_id = "{{ task_instance.xcom_pull(task_ids = 'read_vars', key = 'livy_conn_id')}}"

is passed through literally; it is not rendered into:

livy_conn_id = "livy_connection_id"
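To make the mechanism concrete, here is a toy, Airflow-free sketch. `ToyOperator`, its `render_template_fields` method and the `{key}` placeholder syntax are hypothetical stand-ins: real Airflow renders Jinja through `BaseOperator`, and `str.format_map` only imitates that here. The rule it illustrates is the same, though: only attributes named in `template_fields` are rendered, and everything else reaches the operator verbatim.

```python
from typing import Dict, Sequence


class ToyOperator:
    """Hypothetical stand-in for an Airflow operator (not Airflow's API)."""

    # Only these attribute names are rendered before execution.
    template_fields: Sequence[str] = ("file",)

    def __init__(self, conn_id: str, file: str) -> None:
        self.conn_id = conn_id
        self.file = file

    def render_template_fields(self, context: Dict[str, str]) -> None:
        # Render each listed attribute in place; str.format_map stands in for Jinja.
        for name in self.template_fields:
            setattr(self, name, getattr(self, name).format_map(context))


# Values the upstream task would have pushed to XCom.
xcom_values = {"livy_conn_id": "livy_connection_id", "script_path": "/data/src"}

op = ToyOperator(conn_id="{livy_conn_id}", file="{script_path}/exec_file.py")
op.render_template_fields(xcom_values)

print(op.file)     # /data/src/exec_file.py  (rendered: "file" is templated)
print(op.conn_id)  # {livy_conn_id}  (left literal: "conn_id" is not templated)
```

The literal `{livy_conn_id}` left in `conn_id` plays the same role as the unrendered `{{ ... }}` string that ends up in the "conn_id isn't defined" error.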

The Airflow documentation covers this exact case: extend the operator to add a templated field. In your case this should work:

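from typing import Sequence

from airflow.providers.apache.livy.operators.livy import LivyOperator


class BetterLivyOperator(LivyOperator):
    # Keep the fields LivyOperator already renders and add livy_conn_id
    template_fields: Sequence[str] = (*LivyOperator.template_fields, "livy_conn_id")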

When you use BetterLivyOperator in your DAG, the template should be rendered with the XCom value.
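The same toy model shows why the subclass works: `(*Parent.template_fields, "name")` builds a new tuple that keeps every field the parent already templated and appends one more. `ToyOperator` and `BetterToyOperator` are hypothetical stand-ins, not Airflow classes. One caveat to check against your installed provider: the name in `template_fields` must match the attribute the operator actually stores on `self`, which is not necessarily the constructor argument name. In at least some releases of the apache-livy provider, LivyOperator keeps the connection id in `self._livy_conn_id`; if that is true for your version, the name to append is `"_livy_conn_id"`.

```python
from typing import Dict, Sequence


class ToyOperator:
    """Hypothetical stand-in for an operator whose conn_id is not templated."""

    template_fields: Sequence[str] = ("file",)

    def __init__(self, conn_id: str, file: str) -> None:
        self.conn_id = conn_id
        self.file = file

    def render_template_fields(self, context: Dict[str, str]) -> None:
        # str.format_map stands in for Jinja rendering of each listed attribute.
        for name in self.template_fields:
            setattr(self, name, getattr(self, name).format_map(context))


class BetterToyOperator(ToyOperator):
    # Keep the parent's templated fields and add conn_id,
    # mirroring the BetterLivyOperator pattern.
    template_fields: Sequence[str] = (*ToyOperator.template_fields, "conn_id")


xcom_values = {"livy_conn_id": "livy_connection_id", "script_path": "/data/src"}

op = BetterToyOperator(conn_id="{livy_conn_id}", file="{script_path}/exec_file.py")
op.render_template_fields(xcom_values)

print(BetterToyOperator.template_fields)  # ('file', 'conn_id')
print(op.conn_id)  # livy_connection_id
print(op.file)     # /data/src/exec_file.py
```

Extending the tuple instead of replacing it matters: writing `template_fields = ("conn_id",)` would silently stop rendering the fields the parent already templated.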