I have a few Iceberg v2 tables and a Flink job that reads them in streaming mode, transforms the data, and writes the result to another Iceberg table.
If the source tables are plain (no upsert), subscribing to them works well and the SQL query runs continuously.
But if the tables are defined with 'write.upsert.enabled'='true', the subscribing Flink SQL reads them only once and does not react to new snapshots. This happens even when the query sets a monitor interval and uses any of the incremental streaming strategies.
This Flink streaming query normally works:
```sql
INSERT INTO iceberg.target_packaging
SELECT
  usr.`user_id` AS `user_id`,
  usr.`adress` AS `address`,
  ord.`item_id` AS `item_id`,
  ....
FROM
  iceberg.source_users /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ usr
JOIN
  iceberg.source_orders /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ ord ON usr.`user_id` = ord.`user_id`;
```
The streaming join works well if the source Iceberg tables are defined like this:
```sql
CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('format-version'='2');
```
Example of the resulting table properties:

```
[current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]
```
But if the source tables are defined like this, the streaming join runs only once and then stops reacting to new snapshots. The job does not finish; it simply stops picking up changes from the sources and produces no new records.
```sql
CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('format-version'='2', 'write.upsert.enabled'='true');
```
Example of the resulting table properties:

```
[current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]
```
In my Flink job I simply define the connector and run the SQL join/insert; both the source and target tables already exist.
I also noticed that an SQL join stops streaming as soon as at least one of the joined tables has upsert enabled.
Looking at the documentation for both Iceberg and Flink, I can't find any indication that enabling upsert should change this behaviour. The only stated requirements are that a Flink monitoring (streaming) strategy and checkpointing are enabled, which they are. I'm using Flink 1.17.2 and Iceberg 1.4.2.
I got an answer from the Iceberg community on GitHub: basically, this is not supported.
According to the Spark documentation, the same limitation applies to Spark streaming reads.
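One way to see the difference between the two kinds of tables is to look at the operation recorded for each snapshot. My understanding, which is an assumption and not something stated in the GitHub answer, is that Flink's incremental streaming read only consumes `append` snapshots. In upsert mode the writer also emits equality delete files for the keyed rows, so each commit is recorded as an `overwrite` snapshot, which the incremental read skips. The sketch below assumes the Iceberg Flink connector in use supports metadata tables through the `$snapshots` suffix (as described in the Iceberg Flink documentation) and that `iceberg.source_users` resolves to the same table as in the queries above.

```sql
-- Sketch: list the operation type of each snapshot of the upsert-enabled table.
-- Assumes metadata-table support via the `$snapshots` suffix in the Iceberg Flink connector.
-- Batch mode so the query terminates after reading the current table metadata.
SET 'execution.runtime-mode' = 'batch';

SELECT committed_at, snapshot_id, operation
FROM iceberg.`source_users$snapshots`
ORDER BY committed_at;
```

If the upsert table's recent snapshots show `overwrite` while the plain table's snapshots show `append`, that would be consistent with the streaming read going silent after the initial scan.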