Iceberg schema not merging missing columns


I am creating an Iceberg table using the following code in an AWS glue job:

df.writeTo(f'glue_catalog.{DATABASE_NAME}.{TABLE_NAME}') \
    .using('iceberg') \
    .tableProperty("location", TABLE_LOCATION) \
    .tableProperty("write.spark.accept-any-schema", "true") \
    .tableProperty("format-version", "2") \
    .createOrReplace()

The table is created and I can see it in Glue/LF and I can query it in Athena.

I have another job that is trying to upsert data using the following:

df_upsert.createOrReplaceTempView("upsert_items")
upsert_query = f"""
MERGE INTO glue_catalog.{DATABASE_NAME}.{TABLE_NAME} target
USING (SELECT * FROM upsert_items) updates
ON {join_condition}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_query)

The GlueJob fails and says:

AnalysisException: cannot resolve my_column in MERGE command given columns [updates.col1, updates.col2, ...

How can I merge in new data when columns may be missing or added? I thought Iceberg would handle this by filling in NULL for the missing/new columns because I set "write.spark.accept-any-schema" = "true".

Running Spark version 3.3.0-amzn-1
AWS GlueJob v4
Iceberg v1.0.0

1 Answer

According to the Iceberg documentation:

The writer must enable the mergeSchema option.

data.writeTo("prod.db.sample").option("mergeSchema","true").append()

This option cannot currently be set when using spark.sql("MERGE ...").

There is an open "feature request" issue to handle this.

One non-optimal workaround is to detect any column that exists in the source but not yet in the target, run an ALTER TABLE target ADD COLUMN for each one, and only then run the MERGE statement.
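As a rough sketch of that workaround, the column-diff step can be done by comparing the target table's column list against the source DataFrame's schema and generating one ALTER TABLE statement per missing column. The function and variable names below are illustrative, not part of any Iceberg or Glue API:

```python
# Sketch: generate ALTER TABLE ... ADD COLUMN statements for every column
# present in the source schema but missing from the target table.
# All names here (missing_column_ddl, target_cols, source_schema) are
# hypothetical helpers, not Iceberg/Spark APIs.

def missing_column_ddl(target_table, target_cols, source_schema):
    """target_cols: list of column names already in the target table.
    source_schema: dict mapping source column name -> SQL type string.
    Returns a list of ALTER TABLE statements to run before the MERGE."""
    existing = set(target_cols)
    return [
        f"ALTER TABLE {target_table} ADD COLUMN {name} {sql_type}"
        for name, sql_type in source_schema.items()
        if name not in existing
    ]
```

In a Glue job you would presumably derive target_cols from spark.table(f"glue_catalog.{DATABASE_NAME}.{TABLE_NAME}").columns, build source_schema from df_upsert.schema (e.g. each field's name and simpleString type), execute each generated statement with spark.sql, and then run the MERGE as before.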