Debezium JDBC sink connector fails with 'No database selected' error

I have a Debezium MySqlConnector configured like this:

    "config": { 
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "tasks.max": "1", 
        "database.hostname": "172.17.0.1", 
        "database.port": "33060", 
        "database.user": "root", 
        "database.password": "123456", 
        "database.server.id": "3", 
        "database.serverTimezone": "UTC", 
        "snapshot.mode": "schema_only", 
        "poll.interval.ms": "5000", 
        "topic.prefix": "sdxlife", 
        "database.exclude.list": "information_schema,mysql,performance_schema,sys", 
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092,kafka2:9092,kafka3:9092", 
        "key.converter.apicurio.registry.auto-register": "true", 
        "key.converter.apicurio.registry.find-latest": "true", 
        "value.converter.apicurio.registry.auto-register": "true", 
        "value.converter.apicurio.registry.find-latest": "true",  
        "schema.history.internal.kafka.topic": "schemahistory._schemas", 
        "topic.creation.default.replication.factor": 3, 
        "topic.creation.default.partitions": 1, 
        "topic.creation.default.cleanup.policy": "compact", 
        "topic.creation.default.compression.type": "lz4" 
    } 

It captures all change events from multiple databases in the MySQL instance and pushes them to Kafka under the topic prefix "sdxlife". I then created a JdbcSinkConnector to sync the data to another MySQL instance:

    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://root:[email protected]:6033/",
        "connection.username": "root",
        "connection.password": "123456",
        "delete.enabled": "true",
        "auto.create": "true",
        "auto.evolve": "true",
        "primary.key.mode": "record_key",
        "pk.mode":"record_key",
        "insert.mode": "upsert",
        "schema.evolution": "basic",
        "database.time_zone": "UTC",
        "topics":"",
        "topics.regex": "sdxlife\\.(.*)",
        "table.name.format": "${topic}",
        "key.converter.schema.registry.url":"http://apicurio-registry:8080/apis/registry/v2",
        "value.converter.schema.registry.url": "http://apicurio-registry:8080/apis/registry/v2",
        "value.converter.apicurio.registry.url":"http://apicurio-registry:8080/apis/registry/v2",
        "key.converter.apicurio.registry.url":"http://apicurio-registry:8080/apis/registry/v2",
        "key.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",
        "key.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer",
        "value.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer",
        "value.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",
        "transforms": "unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
    }

The sink connector fails with an SQL error:

Caused by: org.hibernate.exception.GenericJDBCException: JDBC exception executing SQL [CREATE TABLE sdxlife_auth_auth_user 
...
Caused by: java.sql.SQLException: No database selected

I can see that the sink connector has replaced the periods (.) in the topic name with underscores (_) to form the table name. The topic name is:

sdxlife.auth.auth_user
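
To make the mapping I am observing concrete, here is a minimal Python sketch of what the sink appears to do with the topic name. This is only my reading of the error message, not the connector's actual code: `table_name_for` is a hypothetical helper, and the assumption is that every period is replaced with an underscore before the result is substituted into `table.name.format`.

```python
def table_name_for(topic: str, table_name_format: str = "${topic}") -> str:
    """Hypothetical helper approximating the table name the sink seems to derive.

    Assumption (based only on the error above): periods in the topic name
    become underscores, then the result replaces the ${topic} placeholder.
    """
    sanitized = topic.replace(".", "_")
    return table_name_format.replace("${topic}", sanitized)


if __name__ == "__main__":
    # Topic produced by the source connector for database "auth", table "auth_user"
    print(table_name_for("sdxlife.auth.auth_user"))  # prints sdxlife_auth_auth_user
```

If that reading is right, the `CREATE TABLE` is issued for an unqualified table name (`sdxlife_auth_auth_user`), and since my `connection.url` ends in `/` without naming a database, that may be why MySQL reports "No database selected".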

How can I make it work?
