Below is my Dockerfile for the Kafka Connect JDBC connector and the MySQL driver:
FROM debezium/connect:1.3
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION=8.0.20
ARG KAFKA_JDBC_VERSION=5.5.0
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
    | tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-${MYSQL_DRIVER_VERSION}/mysql-connector-java-${MYSQL_DRIVER_VERSION}.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
The image is built with:
docker build . --tag kafka-connect-sink
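Once the container is running, a quick sanity check (assuming the Connect REST API is reachable at 192.168.99.102:8083, as in the connector registrations below) is to confirm the JDBC sink plugin was loaded:
curl -s 192.168.99.102:8083/connector-plugins
# the output should list io.confluent.connect.jdbc.JdbcSinkConnector
# alongside io.debezium.connector.mysql.MySqlConnector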
Below is the JSON for my source connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}'
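After registering the connector, its state can be verified through the standard Connect REST status endpoint (a sketch, using the connector name from above):
curl -s 192.168.99.102:8083/connectors/inventory-connector/status
# a healthy connector reports "state": "RUNNING" for the connector and its task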
Below is the JSON for my destination (sink) connector for the customers table:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
  "name": "inventory-connector-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
    "connection.user": "pavan",
    "connection.password": "root",
    "topics": "dbserver1.inventory.customers",
    "table.name.format": "pk.customers",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'
And a second, nearly identical sink connector for the addresses table:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
  "name": "inventory-connector-sink-addresses",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
    "connection.user": "pavan",
    "connection.password": "root",
    "topics": "dbserver1.inventory.addresses",
    "table.name.format": "pk.addresses",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'
With this configuration I have to create a separate sink connector for each topic, but the problem is that I have 100+ tables to replicate into the destination database. Is there any way to do this with a single JSON configuration, so that one connector subscribes to all the topics?
You can use the topics (or topics.regex) property to define the list of topics to consume, and the table.name.format property of the JDBC Sink connector or the RegexRouter SMT (or combine them) to override the destination table names:
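For example, a single sink along the lines of the following sketch (the connector name and regexes are my own; it also assumes every replicated table has an id primary-key column, since pk.fields is shared across all topics) consumes every dbserver1.inventory.* topic and routes each one to the matching pk.<table> destination table:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
  "name": "inventory-connector-sink-all",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
    "connection.user": "pavan",
    "connection.password": "root",
    "topics.regex": "dbserver1\\.inventory\\..*",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "dbserver1\\.inventory\\.(.*)",
    "transforms.route.replacement": "pk.$1"
  }
}'
RegexRouter rewrites each record's topic name (dbserver1.inventory.customers becomes pk.customers, and so on) before the sink resolves the table name, so the default table.name.format of ${topic} yields the same pk.customers and pk.addresses tables as the per-table connectors above.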