I am currently working on a project that copies data from an S3 stage location into tables managed by Snowflake. It works fine when there is only one table to copy data into, because then I only need one task group and can hard-code the group name. With two or more tables, I run into an issue with XCom: I don't know how to get each job to pass the list of columns for its own table to the SQL script as a parameter. I first tried passing the task ID as a parameter, but then nothing was returned by the SQL file. I then tried hard-coding the task ID as a constant value, but then Airflow complains that there are duplicate tasks with the same ID.

What I am looking for is a way to use XCom to get the list of columns for each table (generated in the get_value step) and pass it to the SQL file as a parameter, so the SQL file can select that list of columns from the table.
My Python code is as follows:
```python
from datetime import datetime

from airflow.decorators import task
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.task_group import TaskGroup

tableList = ["table_a", "table_b"]


def test_dag():
    @task
    def get_value(table: str):
        ######################################################
        #Code which returns the column names within the table#
        ######################################################
        return {
            "id_value": "ID: tg_{}".format(table),
            "date": datetime.today(),
            "columns": cols,
        }

    for tab in tableList:
        tg_id = "tg_{}".format(tab)
        with TaskGroup(group_id=tg_id) as tg:
            val = get_value(tab)
            insertTable = SnowflakeOperator(
                task_id="copy_snowflake_{}".format(tab),
                snowflake_conn_id="conn",
                sql="insertTableTemplate.sql",
                params={
                    ############################################################
                    # code to get the xcom and parameters from the get_value step
                    ############################################################
                },
            )
            val >> insertTable
```
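One likely reason both attempts failed: by default a TaskGroup prefixes the ids of the tasks created inside it with the group id (`prefix_group_id=True`), and a `@task` function's task id defaults to the function name. So the `get_value` task for `table_a` is actually `tg_table_a.get_value`; a bare `get_value` passed to `xcom_pull` matches no task, and any task id that does not vary per table is created once per loop iteration, which is what the duplicate-ID error reports. The small pure-Python sketch below only spells out the ids the loop above would produce; the helper names `qualified_task_id` and `task_ids_for` are hypothetical and not part of Airflow.

```python
# Hypothetical helper mirroring Airflow's default TaskGroup naming
# (prefix_group_id=True): "<group_id>.<task_id>".
def qualified_task_id(group_id: str, task_id: str) -> str:
    return "{}.{}".format(group_id, task_id)


def task_ids_for(tables):
    """Return, per table, the task ids the DAG loop would create."""
    ids = {}
    for tab in tables:
        tg_id = "tg_{}".format(tab)
        ids[tab] = {
            # @task-decorated functions use the function name as task_id
            "get_value": qualified_task_id(tg_id, "get_value"),
            "copy": qualified_task_id(tg_id, "copy_snowflake_{}".format(tab)),
        }
    return ids


if __name__ == "__main__":
    for tab, ids in task_ids_for(["table_a", "table_b"]).items():
        print(tab, ids)
```

Because every id carries the group prefix, the same `get_value` function can be reused in each group without clashes, and the prefixed id is what a per-table `xcom_pull` needs.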
My SQL script, insertTableTemplate.sql:
```sql
copy into table {{ params.table }}
(
select {{ params.columns }}
from stage.path
(file_format => '{{params.format}}', pattern => '{{params.pattern}}')
);
```
If the XCom value is formatted the same as what you need to pass into the params, you might be able to use this:

XComArg acts as a placeholder until the XCom is populated. I use it a lot with dynamic task mapping, but I think it will work in this situation too.
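One caveat, as far as I know: Airflow resolves an XComArg only when it is passed to one of the operator's templated fields (`template_fields`), and `params` is not one of them, so an XComArg placed inside `params` may not be replaced with the value. A different approach that sidesteps this, sketched here under assumptions: keep only static per-table strings in `params`, including the fully qualified id of that group's `get_value` task, and let the SQL template pull the XCom itself with `ti.xcom_pull(...)`, since `ti` is available in the Jinja template context. The helper `build_copy_params`, the `value_task_id` key, and the example format/pattern values are hypothetical; the SQL keeps the question's template and changes only the `select` line. It also assumes `columns` is a list of column names and that `get_value` is not declared with `multiple_outputs=True`, so the whole dict is stored under the default XCom key.

```python
# Hypothetical per-table values for SnowflakeOperator(params=...).
def build_copy_params(table: str, file_format: str, pattern: str) -> dict:
    group_id = "tg_{}".format(table)
    return {
        "table": table,
        "format": file_format,
        "pattern": pattern,
        # Fully qualified id of the upstream @task inside this table's TaskGroup
        "value_task_id": "{}.get_value".format(group_id),
    }


# Possible content for insertTableTemplate.sql (assumption: "columns" is a list,
# so the Jinja join filter turns it into "col_a, col_b, ...").
# The xcom_pull runs when the task's template is rendered, not at DAG parse time.
INSERT_TABLE_TEMPLATE = """\
copy into table {{ params.table }}
(
select {{ ti.xcom_pull(task_ids=params.value_task_id)['columns'] | join(', ') }}
from stage.path
(file_format => '{{params.format}}', pattern => '{{params.pattern}}')
);
"""

if __name__ == "__main__":
    print(build_copy_params("table_a", "my_format", ".*[.]csv"))
```

Inside the loop this would be passed as `params=build_copy_params(tab, ..., ...)`. Because the pull happens inside the template rather than through an XComArg, it does not create a task dependency on its own, so `val >> insertTable` is still needed to make sure `get_value` runs first.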