How to replicate all changes from a source to a destination database using Debezium and the Confluent JDBC sink connector running on Docker

Below is my Dockerfile, which adds the Kafka Connect JDBC connector and the MySQL driver to the Debezium Connect image:

FROM debezium/connect:1.3
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION=8.0.20
ARG KAFKA_JDBC_VERSION=5.5.0
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
    | tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-${MYSQL_DRIVER_VERSION}/mysql-connector-java-${MYSQL_DRIVER_VERSION}.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

I build the image with:

docker build . --tag kafka-connect-sink

Below is my source database (Debezium MySQL) connector configuration:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory"
    }
}'

Below are my destination database sink connector configurations, one per table:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.customers",
        "table.name.format": "pk.customers",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink-addresses",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.addresses",
        "table.name.format": "pk.addresses",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}'

With this configuration I need a separate sink connector for each topic, but I have 100+ tables to replicate to the destination database. Is there any way to do this with a single JSON configuration that subscribes to all topics?

Best answer

You can use the topics (or topics.regex) property to define the list of topics to consume, and either the table.name.format property of the JDBC sink connector or the RegexRouter SMT (or both) to override the destination table names:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink-addresses",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.addresses,dbserver1.inventory.customers",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "",
        "pk.mode": "record_key",
        "transforms": "route,unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",

        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "pk.$3"
    }
}'
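
For 100+ tables, listing every topic by hand gets unwieldy, so a variant with topics.regex may be more practical. The sketch below is only an illustration under some assumptions: the file name sink-all-tables.json, the connector name inventory-sink-all, and the helper route_topic are made up for this example, and the pattern assumes every topic to replicate is named dbserver1.inventory.<table>. A sink connector takes either topics or topics.regex, not both. The helper only emulates the RegexRouter mapping locally with sed so you can preview the resulting table names; it is not part of Kafka Connect.

```shell
#!/bin/sh
# Hypothetical helper: preview the table name RegexRouter would produce.
# RegexRouter renames a topic only when the whole name matches the regex,
# which the ^...$ anchors emulate here. Non-matching names pass through.
route_topic() {
    printf '%s\n' "$1" | sed -E 's/^([^.]+)\.([^.]+)\.([^.]+)$/pk.\3/'
}

# Write the sink config to a file (file and connector names are assumptions).
# "topics.regex" replaces "topics"; everything else mirrors the config above.
cat > sink-all-tables.json <<'EOF'
{
    "name": "inventory-sink-all",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics.regex": "dbserver1\\.inventory\\..*",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "",
        "pk.mode": "record_key",
        "transforms": "route,unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "pk.$3"
    }
}
EOF

# Preview the topic-to-table mapping for two of the topics.
route_topic dbserver1.inventory.customers   # prints pk.customers
route_topic dbserver1.inventory.addresses   # prints pk.addresses
```

The file can then be registered the same way as the other connectors, for example with curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d @sink-all-tables.json. With pk.mode set to record_key and pk.fields left empty, the sink takes the key columns from each record key, so this approach presumably only works for source tables that have a primary key.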