Iceberg copies instead of deleting when using Flink CDC to sync data from MySQL to Iceberg


I have created an iceberg table as follows:

CREATE TABLE ars.results(
  "id" varchar, 
  "input_md5" varchar, 
  "output_md5" varchar, 
  "log" varchar, 
  "metric" varchar, 
  "create_time" TIMESTAMP(6) WITH TIME ZONE, 
  "update_time" TIMESTAMP(6) WITH TIME ZONE, 
  "workflow_id_id" varchar, 
  "error_details" varchar, 
  "error_stage" varchar, 
  "error_type" varchar
)
WITH ( format = 'PARQUET',partitioning = ARRAY['hour(create_time)'])
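Row-level deletes require Iceberg format version 2, so it can help to pin the format version explicitly at table creation instead of relying on the default. A hedged sketch of the same DDL with the version pinned, assuming the table is created through the Trino Iceberg connector, where the property is named format_version:

```sql
-- Sketch (assumption: Trino Iceberg connector, whose table property
-- for the spec version is format_version). Same columns as above.
CREATE TABLE ars.results(
  "id" varchar,
  "input_md5" varchar,
  "output_md5" varchar,
  "log" varchar,
  "metric" varchar,
  "create_time" TIMESTAMP(6) WITH TIME ZONE,
  "update_time" TIMESTAMP(6) WITH TIME ZONE,
  "workflow_id_id" varchar,
  "error_details" varchar,
  "error_stage" varchar,
  "error_type" varchar
)
WITH (
  format = 'PARQUET',
  format_version = 2,                           -- v2 enables row-level deletes
  partitioning = ARRAY['hour(create_time)']
)
```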

Then, I use Flink CDC to sync MySQL data to Iceberg.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.checkpointing_mode import CheckpointingMode

def run():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1).enable_checkpointing(2000, CheckpointingMode.EXACTLY_ONCE)
    
    table_env = StreamTableEnvironment.create(env)

    table_env.execute_sql("""
        CREATE TABLE src_result (
            `id` STRING,
            input_md5 STRING,
            output_md5 STRING,
            log STRING,
            metric STRING,
            create_time TIMESTAMP(6),      
            update_time TIMESTAMP(6),
            workflow_id_id STRING,
            error_details STRING,
            error_stage STRING,
            error_type STRING,
            PRIMARY KEY (`id`) NOT ENFORCED
        ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = 'x',
            'port' = '3306',
            'username' = 'x',
            'password' = 'x',
            'database-name' = 'x',
            'table-name' = 'result'
        );
    """)

    table_env.execute_sql("CREATE CATALOG iceberg WITH ("
                      "'type'='iceberg', "
                      "'catalog-type'='hive', "
                      "'uri'='thrift://x',"
                      "'warehouse'='tos://x',"
                      "'format-version'='2')")
    
    
    table_env.execute_sql("""INSERT INTO iceberg.results select * from src_result;""")

Everything looks fine at first. But when I delete a row in MySQL, I find 2 identical rows in Iceberg instead of the row being removed. What could be going wrong?

Flink version 1.16.2

Flink CDC version flink-sql-connector-mysql-cdc-2.2.1

Database and its version MySQL 5.7

I confirmed my Iceberg table format version is v2, which allows row-level updates and deletes. The update operation has the same issue: after updating a row in MySQL, there are 3 rows in Iceberg (2 copies of the original, 1 updated).
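For context, format v2 on its own only makes the table *capable* of holding delete files; the Flink Iceberg sink additionally needs upsert mode on the write path before CDC delete/update rows are turned into equality deletes rather than plain appends. A minimal sketch of what that might look like, assuming the Iceberg Flink connector's 'upsert-enabled' write option, that 'id' is the table's identifier field, and a fully-qualified iceberg.ars.results path (both the path and the hint are assumptions, not taken from the code above):

```sql
-- Sketch (assumption): enable upsert mode via a dynamic table option hint,
-- so -D/-U changelog rows become equality deletes instead of appended rows.
INSERT INTO iceberg.ars.results /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM src_result;
```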
