I am using Debezium to stream data from Postgres to Kafka. I used the same source connector config for several Postgres instances, and only one of them does not work. It seems that the cause lies with the source database instance rather than the connector. Here is what I have observed so far; does anyone have any idea what is going on?
- The log message "Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart" keeps showing up, and no data is snapshotted or streamed.
- The Postgres instance is on the same internal network as the Debezium instance and no firewall rules are in the way, so I don't think it's a network or connectivity issue. The wal_level is, of course, set to logical.
- When I start the connector, it creates a transaction that holds a ShareLock instead of an AccessShareLock as on the working instances. I'm not sure whether this causes the problem, but it is one of the differences I spotted (see the queries after this list).
- Another strange thing: on the working instances, when I remove the connector, the backend_pid serving the corresponding replication slot is terminated as well. On the failing instance, the backend_pid on the replication slot keeps running after I remove the connector, and I have to terminate it myself.
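For reference, this is roughly how I compare the failing instance with the working ones. It is only a sketch: the slot name db_user_dev comes from the connector config below, and debezium_user is a placeholder for the masked database.user.

-- Replication slot state: is it active, which backend serves it, how far it has flushed
SELECT slot_name, plugin, slot_type, active, active_pid,
       restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'db_user_dev';

-- Lock modes held by the connector's sessions; this is where I compare
-- ShareLock vs. AccessShareLock (replace debezium_user with the real database.user)
SELECT a.pid, a.backend_type, l.locktype, l.mode,
       l.relation::regclass AS relation, l.granted
FROM pg_stat_activity a
JOIN pg_locks l ON l.pid = a.pid
WHERE a.usename = 'debezium_user';

-- How I clean up the leftover backend on the failing instance after removing the
-- connector (not needed on the working instances, where it exits on its own)
SELECT pg_terminate_backend(active_pid)
FROM pg_replication_slots
WHERE slot_name = 'db_user_dev' AND active_pid IS NOT NULL;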
The connector config is as follows:
{
  "name": "user-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "topic.prefix": "sink-db-user-dev",
    "metrics.prefix": "sink-db-user-dev",
    "publication.name": "db_user_dev_publication",
    "publication.autocreate.mode": "all_tables",
    "slot.name": "db_user_dev",
    "snapshot.mode": "never",
    "database.server.name": "postgres",
    "database.hostname": "****",
    "database.port": "****",
    "database.user": "****",
    "database.password": "****",
    "database.server.id": "1",
    "database.dbname": "****",
    "snapshot.lock.timeout.ms": "60000",
    "snapshot.include.collection.list": "****",
    "table.include.list": "****s",
    "schema.include.list": "public",
    "heartbeat.interval.ms": "5000",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "max.request.size": "1048576000",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "plugin.name": "pgoutput",
    "time.precision.mode": "connect",
    "max.batch.size": "40960",
    "max.queue.size": "163840",
    "offset.flush.timeout.ms": "60000",
    "offset.flush.interval.ms": "10000",
    "snapshot.fetch.size": "51200"
  }
}
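Since the connector is supposed to auto-create the publication (publication.autocreate.mode = all_tables) and wal_level must be logical, I also sanity-check both directly on the failing instance with something like the following (publication name taken from the config above):

SHOW wal_level;   -- expect 'logical'

SELECT * FROM pg_publication
WHERE pubname = 'db_user_dev_publication';

SELECT * FROM pg_publication_tables
WHERE pubname = 'db_user_dev_publication';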
[Screenshot: the transaction created with ShareLock]
- I believe it's due to the source database's configuration, since the same connector config works fine against the other databases, but I just can't figure out the exact reason.