I have a debezium MySqlconnector which looks like this:
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.17.0.1",
"database.port": "33060",
"database.user": "root",
"database.password": "123456",
"database.server.id": "3",
"database.serverTimezone": "UTC",
"snapshot.mode": "schema_only",
"poll.interval.ms": "5000",
"topic.prefix": "sdxlife",
"database.exclude.list": "information_schema,mysql,performance_schema,sys",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092,kafka2:9092,kafka3:9092",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
"value.converter.apicurio.registry.auto-register": "true",
"value.converter.apicurio.registry.find-latest": "true",
"schema.history.internal.kafka.topic": "schemahistory._schemas",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4"
}
it capture all the change events in multiple databases in the mysql and push to kafka with a topic prefix called "sdxlife". Then I create another JdbcSinkConnector to sync data to another mysql engine:
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://root:[email protected]:6033/",
"connection.username": "root",
"connection.password": "123456",
"delete.enabled": "true",
"auto.create": "true",
"auto.evolve": "true",
"primary.key.mode": "record_key",
"pk.mode":"record_key",
"insert.mode": "upsert",
"schema.evolution": "basic",
"database.time_zone": "UTC",
"topics":"",
"topics.regex": "sdxlife\\\\.(.\*)",
"table.name.format": "${topic}",
"key.converter.schema.registry.url":"http://apicurio-registry:8080/apis/registry/v2",
"value.converter.schema.registry.url": "http://apicurio-registry:8080/apis/registry/v2",
"value.converter.apicurio.registry.url":"http://apicurio-registry:8080/apis/registry/v2",
"key.converter.apicurio.registry.url":"http://apicurio-registry:8080/apis/registry/v2",
"key.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",
"key.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer",
"value.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer",
"value.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",
"transforms": "unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
}
the sink connector doesn't working due to SQL error.
Caused by: org.hibernate.exception.GenericJDBCException: JDBC exception executing SQL [CREATE TABLE sdxlife_auth_auth_user
...
Caused by: java.sql.SQLException: No database selected
I can see the sink connector has replaced the period(.) with underscore(_) in the topic name . the topic name is
sdxlife.auth.auth_user
how can i make it work?