Airflow ti.xcom_pull() returns None


I have a connection to a local Postgres DB. In the first task, 'task_get_titanic_data', I fetch some rows from a table with the get_titanic_data function, and in the second task, 'task_process_titanic_data', I apply some changes to them with the process_titanic_data function.

import pandas as pd
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_titanic_data():
    sql_stmt = 'select * from titanic limit 100'
    pg_hook = PostgresHook(
        postgres_conn_id='postgres_db',
        schema='airflow_database'
    )
    pg_conn = pg_hook.get_conn()
    cursor = pg_conn.cursor()
    cursor.execute(sql_stmt)
    result = cursor.fetchall()
    return result


def process_titanic_data(ti):
    # print(ti.xcom_pull(task_ids=['get_titanic_data']))
    titanic = ti.xcom_pull(task_ids=['get_titanic_data'])
    if not titanic:
        raise Exception('No data')

    titanic = pd.DataFrame(data=titanic, columns=[
                                                    'PassengerId', 'Survived',
                                                    'Pclass', 'Name',
                                                    'Sex', 'Age',
                                                    'SibSp', 'Parch',
                                                    'Fare', 'Embarked'
                            ])

    titanic = titanic[(titanic.Age <= 16) & (titanic.Pclass == 1) & (titanic.Survived == 1)]
    titanic.to_csv(Variable.get('tmp_titanic_csv_location'), index=False)


with DAG(
        dag_id='postgres_db_dag',
        schedule_interval='@daily',
        start_date=datetime(year=2023, month=2, day=21),
        catchup=False
) as dag:
    task_get_titanic_data = PythonOperator(
        task_id='get_titanic_data',
        python_callable=get_titanic_data,
        # do_xcom_push=True
    )

    task_process_titanic_data = PythonOperator(
        task_id='process_titanic_data',
        python_callable=process_titanic_data
    )

When I run the first task on its own, everything is fine — the data is fetched.


The problem: when the second task runs, my Exception from the process_titanic_data function is raised:


  1. Why?
  2. How can I fix this?

I have no idea about the cause. All the Variables seem to be created and all the paths look OK.

I tried going inside the task instance to understand the cause, checked all Variables and paths, and googled as much as I could. I also saw and tried this. Nothing!


1 Answer


The "process_titanic_data" task is most likely pulling nothing from XCom because it is running concurrently with the "get_titanic_data" task. If you are not directly using the output of one task as an input to another (via the TaskFlow API or otherwise), you need to set the dependencies explicitly.

Adding task_get_titanic_data >> task_process_titanic_data (using the operator variables, not the task IDs) at the end of the DAG file, inside the with DAG(...) block, should do the trick.
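For reference, any one of these equivalent forms (using the operator variables from your DAG file) declares the same ordering — this is a fragment meant to sit inside the `with DAG(...)` block, after both operators are defined:

```python
# Any ONE of these lines is enough; they all make get_titanic_data
# run (and push its XCom) before process_titanic_data pulls it.
task_get_titanic_data >> task_process_titanic_data
# or, equivalently:
task_process_titanic_data << task_get_titanic_data
# or, using the method form:
task_get_titanic_data.set_downstream(task_process_titanic_data)
```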

Side note: one "gotcha" I also see is how you're pulling the XCom. If the task_ids arg is an iterable of task IDs (e.g. a list), Airflow assumes you want to retrieve multiple XComs, and the result will be a list of the retrieved XComs. Presumably you don't want a list of a list of records from the 'titanic' table. Try titanic = ti.xcom_pull(task_ids='get_titanic_data') instead.
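A quick pure-Python illustration of that nesting difference (no Airflow needed; the rows below are made-up placeholders shaped like cursor.fetchall() output, and the two variables stand in for the two xcom_pull call styles):

```python
# Hypothetical rows, shaped like cursor.fetchall() output from the
# 'titanic' table (a list of tuples).
rows = [(1, 0, 3, 'Braund', 'male', 22.0, 1, 0, 7.25, 'S'),
        (2, 1, 1, 'Cumings', 'female', 38.0, 1, 0, 71.28, 'C')]

# What ti.xcom_pull(task_ids=['get_titanic_data']) would give you:
# one element per task ID in the list, so the rows are wrapped again.
pulled_with_list = [rows]

# What ti.xcom_pull(task_ids='get_titanic_data') would give you:
# the pushed value itself.
pulled_with_str = rows

assert pulled_with_list[0] is rows   # extra level of nesting
assert pulled_with_str is rows       # the rows themselves
```

With the list form, pd.DataFrame(data=pulled_with_list, columns=[...ten names...]) would be built from one "row" that is itself the whole record list, not from the records, which is why the string form is what you want here.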