I have created an Iceberg table as follows:
CREATE TABLE ars.results (
    "id" varchar,
    "input_md5" varchar,
    "output_md5" varchar,
    "log" varchar,
    "metric" varchar,
    "create_time" TIMESTAMP(6) WITH TIME ZONE,
    "update_time" TIMESTAMP(6) WITH TIME ZONE,
    "workflow_id_id" varchar,
    "error_details" varchar,
    "error_stage" varchar,
    "error_type" varchar
)
WITH (format = 'PARQUET', partitioning = ARRAY['hour(create_time)'])
Then I use Flink CDC to sync the MySQL data into Iceberg:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.checkpointing_mode import CheckpointingMode
def run():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1).enable_checkpointing(2000, CheckpointingMode.EXACTLY_ONCE)
    table_env = StreamTableEnvironment.create(env)

    # MySQL CDC source: produces a changelog stream (+I / -U / +U / -D rows)
    table_env.execute_sql("""
        CREATE TABLE src_result (
            `id` STRING,
            input_md5 STRING,
            output_md5 STRING,
            log STRING,
            metric STRING,
            create_time TIMESTAMP(6),
            update_time TIMESTAMP(6),
            workflow_id_id STRING,
            error_details STRING,
            error_stage STRING,
            error_type STRING,
            PRIMARY KEY (`id`) NOT ENFORCED
        ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = 'x',
            'port' = '3306',
            'username' = 'x',
            'password' = 'x',
            'database-name' = 'x',
            'table-name' = 'result'
        )
    """)

    # Iceberg catalog backed by the Hive metastore
    table_env.execute_sql("CREATE CATALOG iceberg WITH ("
                          "'type'='iceberg', "
                          "'catalog-type'='hive', "
                          "'uri'='thrift://x', "
                          "'warehouse'='tos://x', "
                          "'format-version'='2')")

    # Stream the changelog into Iceberg (three-part path: catalog.database.table);
    # wait() blocks so the streaming job keeps running when launched locally
    table_env.execute_sql(
        "INSERT INTO iceberg.ars.results SELECT * FROM src_result").wait()


if __name__ == '__main__':
    run()
Everything looks fine at first. But when I delete a row in MySQL, the row is not removed from Iceberg; instead I find 2 identical copies of it there. What could possibly be going wrong?
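This is how I see the duplicates (I query through Trino here; the id literal below is a placeholder for the primary key of the row I deleted, not a real value):

-- '<deleted-id>' is a placeholder for the deleted row's primary key
SELECT * FROM ars.results WHERE id = '<deleted-id>';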
Flink version: 1.16.2
Flink CDC version: flink-sql-connector-mysql-cdc-2.2.1
Database and its version: MySQL 5.7
I confirmed that my Iceberg table's format version is v2, which supports row-level updates and deletes. The update operation has the same issue: after updating a row in MySQL, there are 3 rows in Iceberg (2 copies of the original, 1 updated).
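For reference, this is how I checked the format version (a sketch assuming the Trino Iceberg connector, whose $properties metadata table exposes table properties; other engines expose this differently):

-- Sketch, assuming Trino's Iceberg connector and its $properties metadata table
SELECT value FROM ars."results$properties" WHERE key = 'format-version';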