I am creating an Iceberg table with the following code in an AWS Glue job:
df.writeTo(f'glue_catalog.{DATABASE_NAME}.{TABLE_NAME}') \
    .using('iceberg') \
    .tableProperty("location", TABLE_LOCATION) \
    .tableProperty("write.spark.accept-any-schema", "true") \
    .tableProperty("format-version", "2") \
    .createOrReplace()
The table is created, I can see it in Glue/Lake Formation, and I can query it in Athena.
I have another job that tries to upsert data using the following:
df_upsert.createOrReplaceTempView("upsert_items")
upsert_query = f"""
MERGE INTO glue_catalog.{DATABASE_NAME}.{TABLE_NAME} target
USING (SELECT * FROM upsert_items) updates
ON {join_condition}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_query)
The Glue job fails with:
AnalysisException: cannot resolve my_column in MERGE command given columns [updates.col1, updates.col2, ...
How can I merge in new data when columns may be missing or columns may have been added? I thought Iceberg would handle this by filling in NULL for the missing/new columns, since I set "write.spark.accept-any-schema" = true.
Running Spark version 3.3.0-amzn-1
AWS Glue v4
Iceberg v1.0.0
According to the documentation, this is not currently doable with
spark.sql("MERGE ...")
There is an open feature request issue to handle this.
One "non optimal" solution, is to detect if a column is found in source while not there yet in target and then do and
ALTER TABLE target ADD COLUMN
before the MERGE statement. ♂️♂️♂️
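Here is a minimal sketch of that workaround, assuming the question's names (DATABASE_NAME, TABLE_NAME, df_upsert, join_condition) are in scope. It also handles the opposite direction, which is what the AnalysisException above is actually complaining about: target columns missing from the source are filled with NULL so that UPDATE SET * / INSERT * can resolve every column. Deriving the column DDL with simpleString() is an assumption that holds for primitive types; nested/struct columns may need more care.

from pyspark.sql.functions import lit

target = f"glue_catalog.{DATABASE_NAME}.{TABLE_NAME}"
target_schema = spark.table(target).schema

# 1) Columns in the source but not yet in the target:
#    evolve the target schema before the MERGE.
target_cols = {f.name for f in target_schema.fields}
for field in df_upsert.schema.fields:
    if field.name not in target_cols:
        spark.sql(
            f"ALTER TABLE {target} "
            f"ADD COLUMN {field.name} {field.dataType.simpleString()}"
        )

# 2) Columns in the target but not in the source (the case behind the
#    AnalysisException above): fill with typed NULLs so UPDATE SET * /
#    INSERT * can resolve every target column.
source_cols = {f.name for f in df_upsert.schema.fields}
for field in target_schema.fields:
    if field.name not in source_cols:
        df_upsert = df_upsert.withColumn(
            field.name, lit(None).cast(field.dataType)
        )

df_upsert.createOrReplaceTempView("upsert_items")
spark.sql(f"""
    MERGE INTO {target} target
    USING (SELECT * FROM upsert_items) updates
    ON {join_condition}
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

As far as I can tell, write.spark.accept-any-schema only affects DataFrame writes (together with the mergeSchema writer option); it does not change how a SQL MERGE resolves columns, which is why setting the table property alone did not help here.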