I am using Apache Airflow with a DAG that has 2 tasks.
Task 1 pulls a list of ids using a SELECT query and stores the result using xcom_push.
Task 2 needs to xcom_pull that list, convert it to a comma-separated string, and use that string in the IN clause of an UPDATE query. However, I am unable to parse the list returned by xcom_pull using
join(map(str, "{{xcom_pull(key='file_ids_to_update'}}"))
I am looking for help converting the list returned by xcom_pull into a comma-separated list of ids.
I want to use xcom_pull and turn its result into a comma-separated string:
def get_processed_files(ti):
    sql = "select id from files where status='DONE'"
    pg_hook = PostgresHook(postgres_conn_id="conn_id")
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(sql)
    files = cursor.fetchall()
    ti.xcom_push(key="file_ids_to_update", value=files)

archive_file = PostgresOperator(
    task_id="archive_processed_file",
    postgres_conn_id="upflow",
    sql="update files set update_date=now() where id in (%(list_of_ids)s)",
    parameters={"list_of_ids": ",".join(map(str, "{{ti.xcom_pull(key='file_ids_to_update')}}"))},
)
In fact, the join method should be inside your Jinja template, so that it is applied to the result of xcom_pull at run time and not to the string of your Jinja template.
Here is an example which can help you debug and easily test the method:
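As a minimal sketch of the difference, the plain-Python snippet below shows what the join in the question actually operates on, and what needs to happen at run time instead. The sample ids and the helper name ids_to_csv are made up for illustration. The filter chain in the last statement is a suggestion, not something from the question: it assumes the XCom value is a list of one-column rows (which is what fetchall() returns) and that the operator renders its sql field as a template.

```python
# Sample rows in the shape cursor.fetchall() returns: one tuple per row.
# (Made-up ids, for illustration only.)
rows = [(1,), (2,), (3,)]

# The question's expression runs when the DAG file is parsed. At that point
# the Jinja template is just a string, so join() iterates over its characters.
template = "{{ti.xcom_pull(key='file_ids_to_update')}}"
joined_at_parse_time = ",".join(map(str, template))
print(joined_at_parse_time[:9])  # {,{,t,i,.


# What has to happen at run time instead: take the first column of each row
# and join the values. ids_to_csv is a hypothetical helper name.
def ids_to_csv(rows):
    return ",".join(str(row[0]) for row in rows)


print(ids_to_csv(rows))  # 1,2,3

# The same two steps written as Jinja filters inside the templated sql string,
# so they are applied to the pulled value when the task runs (assumption: the
# XCom value is a list of one-element rows).
sql = (
    "update files set update_date=now() where id in ("
    "{{ ti.xcom_pull(key='file_ids_to_update') | map('first') | join(',') }}"
    ")"
)
```

With this approach the string would be passed as sql to the operator and the parameters argument dropped. Note that an empty list would render as "in ()", which is not valid SQL, so the DAG may need to guard against that case.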