Apache Airflow: XCom for a variable task id


I am currently working on a project that involves copying data from an S3 stage location into tables managed by Snowflake. It works fine when I only have one table to copy data to (since I only need one task group and can therefore hard-code the group name). With two or more tables, I run into an issue with XCom: I don't know how to get each job to pass its own table's column list to the SQL script as a parameter.

I first tried passing the task id as a parameter, but then nothing was returned by the SQL file. I then tried to hard-code the task id as a constant value, but Airflow complains that there are duplicate tasks with the same id.

What I am looking for is a way to use XCom to get the list of columns for each table (generated in the get_value step) and pass it to the SQL file as a parameter, so the SQL file can select that list of columns from the table.

My Python code is as follows:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.task_group import TaskGroup

tableList = ["table_a", "table_b"]

@dag(start_date=datetime(2023, 1, 1), schedule=None)
def test_dag():

    @task
    def get_value(table: str):
        ######################################################
        # Code which returns the column names within the table
        ######################################################
        return {
            "id_value": "ID: {}".format(table),
            "date": datetime.today(),
            "columns": cols,
        }

    for tab in tableList:
        tg_id = "tg_{}".format(tab)
        with TaskGroup(group_id=tg_id) as tg:
            val = get_value(tab)
            insertTable = SnowflakeOperator(
                task_id="copy_snowflake_{}".format(tab),
                snowflake_conn_id="conn",
                sql="insertTableTemplate.sql",
                params={
                    ##############################################################
                    # code to get the xcom and parameters from the get_value step
                    ##############################################################
                },
            )
            val >> insertTable

My SQL script, insertTableTemplate.sql:

copy into {{ params.table }}
from (
  select {{ params.columns }}
  from @stage/path
)
file_format = (format_name = '{{ params.format }}')
pattern = '{{ params.pattern }}';
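For illustration, here is a minimal, hypothetical sketch in plain Python (the helper name `build_copy_sql` and the stage path are mine, not Airflow's or the question's) of the string the Jinja template ultimately renders once the per-table column list is substituted in:

```python
# Hypothetical helper (not part of Airflow): shows the statement the Jinja
# template produces after params are substituted for one table.
def build_copy_sql(table, columns, fmt, pattern):
    # A list of column names becomes the comma-separated SELECT list,
    # which is what {{ params.columns }} needs to render as.
    col_list = ", ".join(columns)
    return (
        f"copy into {table}\n"
        f"from (\n"
        f"  select {col_list}\n"
        f"  from @stage/path\n"
        f")\n"
        f"file_format = (format_name = '{fmt}')\n"
        f"pattern = '{pattern}';"
    )

print(build_copy_sql("table_a", ["id", "name", "created_at"], "my_csv_format", ".*[.]csv"))
```

This is also why passing a Python list straight through can fail: the template needs a comma-separated string (or a list that renders as one), so joining the columns before handing them to `params` is the safer move.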

There is 1 answer below.


If the XCom value is formatted the same as what you need to pass into the params, you might be able to use this:

from airflow import XComArg
...
insertTable = SnowflakeOperator(
    task_id="copy_snowflake_{}".format(tab),
    snowflake_conn_id="conn",
    sql="insertTableTemplate.sql",
    # XComArg takes the upstream operator/task object and an XCom key
    params=XComArg(<get_value_task>, "return_value"),
)

XComArg acts as a placeholder until the XCom is populated. I use it a lot with dynamic task mapping, but I think it will work in this situation too.
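As a toy illustration of that placeholder behaviour (stdlib Python mimicking the idea, not Airflow's actual `XComArg` implementation), the reference is created before the value exists and only looked up when it is resolved:

```python
# Toy illustration (not Airflow internals): a lazy reference that is resolved
# only when the upstream value exists, similar in spirit to XComArg.
class LazyRef:
    def __init__(self, store, task_id, key="return_value"):
        self.store = store
        self.task_id = task_id
        self.key = key

    def resolve(self):
        # Look the value up at "execution" time, not at DAG-definition time
        return self.store[(self.task_id, self.key)]

xcom_store = {}
# Defined while the DAG is being built, before get_value has run
ref = LazyRef(xcom_store, "tg_table_a.get_value")
# Later, the upstream task "pushes" its return value
xcom_store[("tg_table_a.get_value", "return_value")] = {"columns": ["id", "name"]}
print(ref.resolve()["columns"])
```

In Airflow the scheduler performs the resolution step for you when the downstream task runs, which is why the duplicate-task-id problem from the question disappears: each task group's `get_value` produces its own XComArg.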