Convert xcom_pull list in the "in" clause of SQL query


I am using Apache Airflow with a DAG that has two tasks.

Task 1 fetches a list of ids with a SELECT query and pushes the result to XCom with xcom_push.

Task 2 needs to pull that list with xcom_pull, convert it to a comma-separated string, and use that string in the IN clause of an UPDATE query. However, I am unable to parse the list returned by xcom_pull using:

```python
",".join(map(str, "{{ ti.xcom_pull(key='file_ids_to_update') }}"))
```

How do I convert the list returned by xcom_pull into a comma-separated list of ids?

Here is my current code:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


def get_processed_files(ti):
    sql = "select id from files where status='DONE'"
    pg_hook = PostgresHook(postgres_conn_id="conn_id")
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(sql)
    files = cursor.fetchall()  # one tuple per row, e.g. [(1,), (2,)]
    ti.xcom_push(key="file_ids_to_update", value=files)


archive_file = PostgresOperator(task_id="archive_processed_file", postgres_conn_id="upflow",
                                sql="update files set update_date=now() where id in (%(list_of_ids)s)",
                                parameters={"list_of_ids": ",".join(map(str, "{{ti.xcom_pull(key='file_ids_to_update')}}"))})
```
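The reason this attempt fails can be reproduced in plain Python, without Airflow. The ",".join(...) call is ordinary Python, so it runs while the DAG file is being parsed, before Jinja renders anything. At that point its argument is just the literal template string, and map(str, ...) iterates over that string one character at a time. The commas then break up the {{ ... }} delimiters, so Jinja no longer sees a template at all:

```python
# At DAG-parse time the template is still an unrendered string.
template = "{{ti.xcom_pull(key='file_ids_to_update')}}"

# Iterating over a string yields its characters, so join() separates
# every single character with a comma.
list_of_ids = ",".join(map(str, template))

print(list_of_ids[:15])      # {,{,t,i,.,x,c,o
print("{{" in list_of_ids)   # False: the Jinja delimiters are gone
```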
Answer

The join has to happen inside the Jinja template, so that it runs at task run time on the value returned by xcom_pull. In your code it runs when the DAG file is parsed, on the characters of the template string itself. Move it into the template:

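```python
sql="update files set update_date=now() where id in ({{ ti.xcom_pull(key='file_ids_to_update') | map('first') | join(',') }})"
```

Three details matter in this line. The list was pushed with key='file_ids_to_update', so it has to be pulled by key, not with task_ids. cursor.fetchall() returns one tuple per row, such as (1,), so map('first') keeps the first column of each row, and Jinja's join filter converts each id to a string before joining. Finally, the result is rendered directly into sql instead of going through parameters: a bound parameter is sent as a single quoted string, which would turn the clause into id in ('1,2,3') rather than id in (1,2,3).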
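A template like this can be checked outside Airflow by rendering it with the jinja2 library. The sketch below assumes the rows were pushed exactly as cursor.fetchall() returned them (one tuple per row). FakeTI is a hypothetical stand-in for Airflow's task instance that only simulates a key lookup in xcom_pull; map('first') takes the first column of each row and Jinja's join filter stringifies each id:

```python
from jinja2 import Template


class FakeTI:
    """Hypothetical stand-in for Airflow's TaskInstance (local testing only)."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Only the key lookup is simulated; task_ids is ignored here.
        return self._xcoms[key]


# Sample rows shaped like cursor.fetchall() output: one tuple per row.
ti = FakeTI({"file_ids_to_update": [(1,), (2,), (3,)]})

sql_template = (
    "update files set update_date=now() where id in "
    "({{ ti.xcom_pull(key='file_ids_to_update') | map('first') | join(',') }})"
)

# Rendering is what Airflow does at task run time, on the real pulled value.
rendered = Template(sql_template).render(ti=ti)
print(rendered)
# update files set update_date=now() where id in (1,2,3)
```

Swapping in other sample rows is a quick way to see exactly what SQL the task would execute.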

Here is a minimal DAG you can use to debug and test the approach:

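```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

with DAG(
        'read_and_parse_xcom',
        start_date=datetime(2022, 8, 26)
) as dag:
    # The callable's return value is pushed to XCom under the key "return_value".
    task1 = PythonOperator(
        task_id="t1",
        python_callable=lambda: ['id1', 'id2', 'id3', 'id4']
    )

    # join() runs inside the template at run time, so this echoes id1,id2,id3,id4
    task2 = BashOperator(
        task_id="t2",
        bash_command="echo {{ ','.join(ti.xcom_pull(task_ids='t1')) }}"
    )

    task1 >> task2
```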
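If you would rather not build SQL text at all, another option (not part of the answer above) is to run the UPDATE from a Python callable and pass the ids as one bound parameter. This sketch assumes psycopg2 is the driver underneath PostgresHook; psycopg2 adapts a Python list to a PostgreSQL array, so id = ANY(%s) matches any id in the list. mark_files_updated and RecordingHook are hypothetical names. The hook is passed in so the function can be exercised with a stand-in; inside Airflow you might call it from a PythonOperator callable as mark_files_updated(ti.xcom_pull(key='file_ids_to_update'), PostgresHook(postgres_conn_id="upflow")):

```python
def mark_files_updated(rows, hook):
    """Set update_date=now() for every id in `rows`.

    rows: sequence of rows as pushed to XCom from cursor.fetchall();
          each row is a tuple or list whose first item is the id.
    hook: assumed to be an Airflow PostgresHook, or any object with a
          compatible run(sql, parameters=...) method.
    """
    # Keep only the first column of each row: [(1,), (2,)] -> [1, 2].
    ids = [row[0] for row in rows]
    if not ids:
        return  # nothing to update, so skip the database round-trip
    # The whole list is sent as a single array parameter; no string building.
    hook.run(
        "update files set update_date=now() where id = ANY(%s)",
        parameters=(ids,),
    )


class RecordingHook:
    """Hypothetical stand-in that records calls instead of touching a database."""

    def __init__(self):
        self.calls = []

    def run(self, sql, parameters=None):
        self.calls.append((sql, parameters))


hook = RecordingHook()
mark_files_updated([(1,), (2,), (3,)], hook)
print(hook.calls)
```

Binding the list as a parameter also keeps the ids out of the SQL text, so no quoting or escaping is involved.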