Flink: transforming binlog records into multiple DTOs, and how to write the transformation method in Flink


I am new to Kafka, Flink and TiDB. Assume I have three source MySQL tables, s_a, s_b and s_c, and want to collect their records into the target TiDB tables t_a and t_b in real time. The mapping rules are:

`s_a` --> `t_a`
`s_b` union `s_c` --> `t_b`, with some transformation (e.g., field remapping).

The solution I adopted is Kafka + Flink with a TiDB sink: binlog changes are published to a Kafka topic, and Flink consumes the topic and writes the transformed result to TiDB. My problems in the Flink code are:

  1. How can I easily convert the JSON string polled from Kafka (which carries the operation and table information) into the different kinds of DTO operations (e.g., insert/create for t_a or t_b)? I have found a tool called Debezium that can serve as a Kafka and Flink connector, but it looks like it requires the source table and the target table to be identical.

  2. How do I write the transformation VKDataMapper if I have multiple target tables? I have difficulty defining T, as it can be either the t_a DTO (Data Transfer Object) or the t_b DTO.

My existing sample code looks like this:

// The main routine.

StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.getExecutionEnvironment();

// consumer is a FlinkKafkaConsumer; TopicFilter returns true.
environment.addSource(consumer)
        .filter(new TopicFilter())
        .map(new VKDataMapper())
        .addSink(new TidbSink());

try {
    environment.execute();
} catch (Exception e) {
    log.error("exception", e);
}


public class VKDataMapper implements MapFunction<String, T> {

    @Override
    public T map(String value) throws Exception {
        // How can T represent both the t_a DTO and the t_b DTO?
        return null;
    }

}

There is 1 best solution below


Why not try Flink SQL? With it, you only need to create some tables in Flink and then define your jobs in SQL, for example:

insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;

See some examples at https://github.com/LittleFall/flink-tidb-rdw, and feel free to ask about anything that confuses you.
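
If you would rather stay with the DataStream API from the question, one possible way to give the mapper a single output type is a common supertype shared by both target rows. The following is only a minimal plain-Java sketch, assuming the change event has already been parsed into a source table name and a map of column values. All names here (BinlogRouter, TargetRow, TaRow, TbRow) and all column names (id, name, b_label, c_title) are hypothetical and do not come from the question or from any library.

```java
import java.util.Map;

// Hypothetical helper: routes an already-parsed change event to a DTO
// for the matching target table. Nothing here is part of Flink or TiDB.
final class BinlogRouter {
    private BinlogRouter() {}

    // Common supertype, so one mapper can return rows for either target table.
    interface TargetRow {
        String targetTable();
    }

    // Illustrative DTO for t_a (column names are assumptions).
    static final class TaRow implements TargetRow {
        final String id;
        final String name;
        TaRow(String id, String name) { this.id = id; this.name = name; }
        @Override public String targetTable() { return "t_a"; }
    }

    // Illustrative DTO for t_b; 'origin' records which source table fed the row.
    static final class TbRow implements TargetRow {
        final String id;
        final String label;
        final String origin;
        TbRow(String id, String label, String origin) {
            this.id = id; this.label = label; this.origin = origin;
        }
        @Override public String targetTable() { return "t_b"; }
    }

    // Applies the mapping rules: s_a -> t_a, s_b and s_c -> t_b with field remapping.
    static TargetRow route(String sourceTable, Map<String, String> fields) {
        switch (sourceTable) {
            case "s_a":
                return new TaRow(fields.get("id"), fields.get("name"));
            case "s_b":
                return new TbRow(fields.get("id"), fields.get("b_label"), "s_b");
            case "s_c":
                return new TbRow(fields.get("id"), fields.get("c_title"), "s_c");
            default:
                throw new IllegalArgumentException("Unmapped source table: " + sourceTable);
        }
    }
}
```

With something like this, VKDataMapper could be declared as MapFunction<String, TargetRow>, and the sink could check targetTable() to decide which table to write to. Parsing the JSON and the real column mapping are left out, since they depend on your binlog format.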