Airflow PostgresOperator: unable to combine XCom and params in a Jinja template


I have a DAG that creates operators in a loop, like this:

with DAG(
    dag_id="injection_of_items_size",
    default_args=DEFAULT_ARGS,
    schedule_interval="@hourly",
) as dag:

    for item in ["CAR", "HOUSE"]:
        size_count = PythonOperator(
            task_id=f"{item}_size",
            python_callable=calculate_size,
        )

        
        import_size = PostgresOperator(
            task_id=f"{item}_import_size",
            postgres_conn_id=CONN_ID,
            sql="/import_size.sql",
            autocommit=True,
            params={"item": item}
        )

My PythonOperator returns an int value through XCom, which I want to insert into a table. My .sql file contains this:

INSERT INTO public.size_items(
    items, size
)
VALUES (
   '{{ params.item }}',
   '{{{{ ti.xcom_pull(task_ids="{{ params.item }}_size") }}}}'
);

I tried different formats to pull the XCom value but could not find one that works. Is this possible? If not, I can put the SQL directly in the sql parameter of the PostgresOperator, but for a large query that is not easy to read.
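
One direction that may work, sketched here without having been run against this setup: Jinja does not evaluate a `{{ ... }}` nested inside another expression, so the inner `{{ params.item }}` in the .sql file is treated as literal text inside the string. Since `params` is already a variable in the template context, the task id can instead be built inside a single expression with Jinja's `~` string-concatenation operator. The sketch assumes the task ids follow the `<item>_size` pattern from the DAG above and that the size task has already run and pushed its return value.

```sql
-- import_size.sql (sketch): one Jinja expression, no nested braces.
-- params.item ~ "_size" evaluates to e.g. "CAR_size" at render time.
INSERT INTO public.size_items(
    items, size
)
VALUES (
   '{{ params.item }}',
   '{{ ti.xcom_pull(task_ids=params.item ~ "_size") }}'
);
```

For the pull to return a value, the size task presumably has to be upstream of the import task in the loop (for example `size_count >> import_size`); otherwise `xcom_pull` would likely render as `None`.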
