We run DAGs on Astronomer 9.4.0 with Airflow version 2.7.2+astro.3.
I have a DAG with a KubernetesPodOperator task that runs a Golang application. Inside the Golang application I write data to XCom through /airflow/xcom/return.json. When I run the DAG, the KubernetesPodOperator task stores the data in XCom successfully, and I can see it in the Airflow XCom UI.
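For readers who have not used this mechanism: with do_xcom_push=True, the operator reads /airflow/xcom/return.json from the pod, and the file has to contain valid JSON. My application does this in Go; the Python below is only a rough equivalent sketch. The helper name write_xcom_return and the value "2024-02-02" are made up for illustration.

```python
import json
import tempfile
from pathlib import Path

# Path that KubernetesPodOperator reads when do_xcom_push=True.
XCOM_RETURN_PATH = "/airflow/xcom/return.json"


def write_xcom_return(value, path=XCOM_RETURN_PATH):
    """Serialize `value` as JSON into the XCom return file (hypothetical helper)."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(value))
    return target


# Demo against a temporary directory so the sketch also runs outside a pod.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "airflow" / "xcom" / "return.json"
    written = write_xcom_return("2024-02-02", demo_path)
    round_tripped = json.loads(written.read_text())
```

Inside the real pod the default path would be used, so the call would simply be write_xcom_return(value).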
Now the same Golang application needs to read back the data it stored in XCom previously. But it is not able to pull the data from XCom; it prints None instead. I am trying to pass the data to the Golang application via the env_vars of the KubernetesPodOperator.
My DAG with the KubernetesPodOperator looks like this:
from datetime import datetime

from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

namespace = conf.get("Kubernetes", "NAMESPACE")

default_args = {
    'owner': 'myself',
    'start_date': datetime(2024, 2, 2, 0, 0),
    'retries': 3,
    'provide_context': True,
}

dag = DAG(
    'redshift_test',
    description='DAG to retrieve data from redshift',
    default_args=default_args,
    schedule_interval='*/15 * * * *',
    catchup=False,
    tags=[
        "redshift-operations"
    ],
    max_active_runs=1,
)

redshift_task = KubernetesPodOperator(
    dag=dag,
    namespace=namespace,
    image="<private ecr repo url>",
    cmds=["./redshift-task", "redshift-check"],
    arguments=[],
    name='redshift_task',
    task_id='redshift_task',
    get_logs=True,
    env_vars={
        'LOG_LEVEL': 'info',
        # Pull the XCom value that this same task pushed and pass it to the pod.
        'MAX_DATE': "{{ti.xcom_pull(task_ids='redshift_task', key='return_value')}}",
    },
    depends_on_past=False,
    # Store the contents of /airflow/xcom/return.json as the task's XCom.
    do_xcom_push=True,
)
I also tried using kwargs and context to pull the data from XCom and pass it through the env_vars of the KubernetesPodOperator:
env_vars={
    'MAX_DATE': "{{context['ti'].xcom_pull(task_ids='redshift_task', key='return_value')}}"
}
env_vars={
    'MAX_DATE': "{{kwargs['ti'].xcom_pull(task_ids='redshift_task', key='return_value')}}"
}
But these do not work either.
I was able to solve this later. By default, ti.xcom_pull only pulls data from the current DAG run. In my case I want the data that was pushed by the previous run, so I need to pass include_prior_dates=True to xcom_pull. After adding it, the task was able to pull the data from the last run.
Something like this:
'MAX_DATE': "{{ti.xcom_pull(task_ids='redshift_task', key='return_value', include_prior_dates=True)}}",
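To make the difference concrete, here is a small toy model in plain Python. It is not Airflow's implementation, just a sketch of the lookup behaviour as I understand it: without include_prior_dates only the current run is searched, and with it the most recent value up to the current run is returned. The function pull_xcom, the row layout, and the sample dates are all made up for illustration.

```python
from datetime import datetime


def pull_xcom(store, task_id, current_date, key="return_value",
              include_prior_dates=False):
    """Toy lookup: return the newest matching value, or None if nothing matches."""
    rows = [r for r in store if r["task_id"] == task_id and r["key"] == key]
    if include_prior_dates:
        # Search the current run and every earlier run.
        rows = [r for r in rows if r["logical_date"] <= current_date]
    else:
        # Search the current run only.
        rows = [r for r in rows if r["logical_date"] == current_date]
    if not rows:
        return None
    return max(rows, key=lambda r: r["logical_date"])["value"]


# One value pushed by the previous run (15 minutes earlier); nothing yet for the current run.
previous_run = datetime(2024, 2, 2, 10, 0)
current_run = datetime(2024, 2, 2, 10, 15)
store = [
    {"task_id": "redshift_task", "key": "return_value",
     "logical_date": previous_run, "value": "2024-02-01"},
]

default_pull = pull_xcom(store, "redshift_task", current_run)
prior_pull = pull_xcom(store, "redshift_task", current_run,
                       include_prior_dates=True)
print(default_pull)  # None
print(prior_pull)    # 2024-02-01
```

This also shows why the very first run still gets None even with include_prior_dates=True: there is no earlier value to find, so the Golang application has to handle the literal string None in MAX_DATE.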