I have a connection to a local Postgres DB. I'm trying to take some rows from a table in the task 'task_get_titanic_data' with the get_titanic_data function, and then apply some changes in a second task, task_process_titanic_data, with the process_titanic_data function.
```python
import pandas as pd
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_titanic_data():
    sql_stmt = 'select * from titanic limit 100'
    pg_hook = PostgresHook(
        postgres_conn_id='postgres_db',
        schema='airflow_database'
    )
    pg_conn = pg_hook.get_conn()
    cursor = pg_conn.cursor()
    cursor.execute(sql_stmt)
    result = cursor.fetchall()
    return result


def process_titanic_data(ti):
    # print(ti.xcom_pull(task_ids=['get_titanic_data']))
    titanic = ti.xcom_pull(task_ids=['get_titanic_data'])
    if not titanic:
        raise Exception('No data')
    titanic = pd.DataFrame(data=titanic, columns=[
        'PassengerId', 'Survived',
        'Pclass', 'Name',
        'Sex', 'Age',
        'SibSp', 'Parch',
        'Fare', 'Embarked'
    ])
    titanic = titanic[(titanic.Age <= 16) & (titanic.Pclass == 1) & (titanic.Survived == 1)]
    titanic.to_csv(Variable.get('tmp_titanic_csv_location'), index=False)


with DAG(
    dag_id='postgres_db_dag',
    schedule_interval='@daily',
    start_date=datetime(year=2023, month=2, day=21),
    catchup=False
) as dag:
    task_get_titanic_data = PythonOperator(
        task_id='get_titanic_data',
        python_callable=get_titanic_data,
        # do_xcom_push=True
    )
    task_process_titanic_data = PythonOperator(
        task_id='process_titanic_data',
        python_callable=process_titanic_data
    )
```
When I fetch data with the first task, all is OK.
The problem is: when I start the second task, my Exception is raised from the process_titanic_data function:

- Why?
- How can I fix this?

I have no idea about the cause. It seems like all Variables are created and all paths are OK.
I tried going inside the task instance to understand the cause... checked all Variables and paths... Googled it as much as I could... saw and tried this. Nothing!
The "process_titanic_data" is most likely pulling nothing from XCom because it is running concurrently with the "get_titanic_data" task. If you are not directly using the output of a task directly as an input for another (via TaskFlow API or otherwise), you need to explicitly set the dependencies.
Adding
```python
task_get_titanic_data >> task_process_titanic_data
```
at the end of the DAG file should do the trick.
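For reference, a minimal sketch of how the bottom of your DAG file could look with the dependency set (reusing the imports and callables you already define above):

```python
with DAG(
    dag_id='postgres_db_dag',
    schedule_interval='@daily',
    start_date=datetime(year=2023, month=2, day=21),
    catchup=False
) as dag:
    task_get_titanic_data = PythonOperator(
        task_id='get_titanic_data',
        python_callable=get_titanic_data,
    )
    task_process_titanic_data = PythonOperator(
        task_id='process_titanic_data',
        python_callable=process_titanic_data,
    )

    # Run get_titanic_data to completion (pushing its result to XCom)
    # before process_titanic_data starts pulling from XCom.
    task_get_titanic_data >> task_process_titanic_data
```

With the TaskFlow API this dependency would be inferred automatically by passing one task's return value into the next; with classic PythonOperator tasks, `>>` (or `set_downstream`) has to be spelled out.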
Side note, one "gotcha" I also see is how you're pulling the XComs. If the `task_ids` arg is an iterable of task IDs (e.g. a list), Airflow assumes you want to retrieve multiple XComs, and the fetched result will be a list of those XComs. Presumably you don't want a list of a list of records from the 'titanic' table. Try using

```python
titanic = ti.xcom_pull(task_ids='get_titanic_data')
```

instead.
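To make the two shapes concrete, here is a minimal sketch of both call forms inside process_titanic_data (the comments describe the shapes under the behavior above):

```python
def process_titanic_data(ti):
    # String task_id: xcom_pull returns that task's XCom value itself,
    # i.e. the list of row tuples returned by get_titanic_data.
    titanic = ti.xcom_pull(task_ids='get_titanic_data')

    # List of task_ids: xcom_pull returns a list of XCom values, one per
    # task ID, so the same rows arrive wrapped in an extra list here.
    wrapped = ti.xcom_pull(task_ids=['get_titanic_data'])
    rows = wrapped[0]  # the same data as `titanic` above
```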