I am new to Kafka, Flink, and TiDB.
Assume I have three source MySQL tables `s_a`, `s_b`, and `s_c`, and I want to collect records into the target TiDB tables `t_a` and `t_b` in real time. The mapping rules are:
`s_a` --> `t_a`
`s_b` union `s_c` --> `t_b`, with some transformation (e.g., field remapping).
The solution I adopted is Kafka + Flink with a TiDB sink: binlog changes are published to a Kafka topic, Flink consumes the topic, and the transformed result is written to TiDB. My problems in the Flink code are:
1. How can I easily restore the JSON string polled from Kafka (which carries the operation and table information) into different kinds of DTO operations (e.g., insert/create for `t_a` or `t_b`)? I have found a tool called Debezium as a Kafka & Flink connector, but it looks like it requires equality between the source table and the target table.
2. How do I write the transformation `VKDataMapper` if I have multiple target tables? I have difficulty defining the `T`, as it can be a `t_a` DTO (Data Transfer Object) or a `t_b` DTO.
My existing sample code looks like this:
// The main routine.
StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.getExecutionEnvironment();

// consumer is a FlinkKafkaConsumer; TopicFilter always returns true.
environment.addSource(consumer)
        .filter(new TopicFilter())
        .map(new VKDataMapper())
        .addSink(new TidbSink());

try {
    environment.execute();
} catch (Exception e) {
    log.error("exception {}", e);
}

public class VKDataMapper implements MapFunction<String, T> {

    @Override
    public T map(String value) throws Exception {
        // How can T represent both the `t_a` DTO and the `t_b` DTO?
        return null;
    }
}
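For illustration, one shape I can imagine is a common marker interface that both DTOs implement, with the mapper choosing the concrete class from the table name in the binlog JSON. The names below (`RowDto`, `TaDto`, `TbDto`) and the JSON field paths ("table", "data") are placeholders I made up; the real paths depend on whether the topic carries Debezium- or Canal-style JSON:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical marker interface so the MapFunction can emit either DTO.
public interface RowDto {}

public class TaDto implements RowDto { /* fields of t_a */ }
public class TbDto implements RowDto { /* fields of t_b, after remapping */ }

public class VKDataMapper implements MapFunction<String, RowDto> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public RowDto map(String value) throws Exception {
        JsonNode root = MAPPER.readTree(value);
        // Placeholder paths: adjust to the actual binlog JSON layout.
        String table = root.get("table").asText();
        JsonNode data = root.get("data");
        if ("s_a".equals(table)) {
            return MAPPER.treeToValue(data, TaDto.class);
        }
        // s_b and s_c both end up in t_b after field remapping.
        return MAPPER.treeToValue(data, TbDto.class);
    }
}

With this, the sink would still need to check the concrete type of `RowDto` to decide which TiDB table to write to, which is part of what I am unsure about.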
Why not try Flink SQL? That way, you only need to create a few tables in Flink and then define your tasks through SQL, for example:
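A rough sketch (the schemas, topic names, and connection options below are placeholders for your setup, so adjust them to your actual environment and Flink version):

-- Kafka source table fed by the binlog (canal-json is one supported changelog format).
CREATE TABLE s_a (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 's_a_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset'
);
-- s_b and s_c would be declared the same way, each on its own topic.

-- TiDB sink table via the JDBC connector (TiDB speaks the MySQL protocol).
CREATE TABLE t_a (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://tidb:4000/test',
    'table-name' = 't_a',
    'username' = 'root',
    'password' = ''
);
-- t_b would be declared the same way.

-- Each task then becomes an INSERT; the union and field remapping are just a query.
INSERT INTO t_a SELECT id, name FROM s_a;

INSERT INTO t_b
SELECT id, some_field AS remapped_field FROM s_b
UNION ALL
SELECT id, other_field AS remapped_field FROM s_c;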
See some examples at https://github.com/LittleFall/flink-tidb-rdw, and feel free to ask about anything that confuses you.