I'm trying to make Debezium CDC production-ready. One problem I came across is that after a connection loss (e.g. a restart of the source DB), the connector fails to reconnect:
WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
io.debezium.DebeziumException: Couldn't obtain encoding for database
- Could anyone recommend an out-of-the-box solution for this?
- One idea was to build a Docker image on top of the official Debezium image that adds a script which periodically calls GET localhost:8083/connectors/XX/status, checks body.tasks[0].state, and triggers POST localhost:8083/connectors/XX/restart when the task has failed. However, calling /restart does not seem to work.
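The polling idea above could be sketched roughly as follows, in Python using only the standard library. This is a minimal sketch under assumptions, not a tested solution: the base URL and the XX connector name come from the post, the function names and the 30-second interval are made up for illustration, and it restarts each failed task through Kafka Connect's per-task endpoint (POST /connectors/{name}/tasks/{id}/restart) rather than the connector-level /restart, which, as far as I know, restarts only the connector instance and not its tasks by default.

```python
import json
import time
import urllib.request

CONNECT_URL = "http://localhost:8083"  # REST endpoint from the post
CONNECTOR = "XX"                       # placeholder connector name from the post


def failed_task_ids(status: dict) -> list:
    """Return the ids of all tasks whose state is FAILED in a /status body."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def task_restart_url(base: str, connector: str, task_id: int) -> str:
    """Build the per-task restart URL of the Kafka Connect REST API."""
    return f"{base}/connectors/{connector}/tasks/{task_id}/restart"


def check_once(base: str = CONNECT_URL, connector: str = CONNECTOR) -> list:
    """Fetch the status once and POST a restart for every failed task."""
    with urllib.request.urlopen(f"{base}/connectors/{connector}/status", timeout=10) as resp:
        status = json.load(resp)
    failed = failed_task_ids(status)
    for task_id in failed:
        req = urllib.request.Request(task_restart_url(base, connector, task_id), method="POST")
        urllib.request.urlopen(req, timeout=10).close()
    return failed


def main(interval_seconds: int = 30) -> None:
    """Poll forever; the interval is an arbitrary choice for this sketch."""
    while True:
        try:
            restarted = check_once()
            if restarted:
                print(f"restart requested for tasks {restarted}")
        except OSError as exc:  # urllib's URLError/HTTPError are OSError subclasses
            print(f"Connect REST API not reachable or returned an error: {exc}")
        time.sleep(interval_seconds)

# To start the loop, call main().
```

If the database is still down when a task is restarted, the task will presumably fail again and simply be picked up on the next poll.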
So far, I have only found a very old thread, "Debezium source task fails to reconnect to postgresql DB when DB container is re-created", which was not very helpful. Has anything changed since?
Thank you for any ideas.
Environment:
- quay.io/debezium/connect:2.5.1.Final
- Postgres 15
- no Kubernetes or any other automation mechanism
- connector config:
{
  "name": "xx-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.37.51",
    "plugin.name": "pgoutput",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "debezium",
    "database.dbname": "xx",
    "schema.include.list": "xx",
    "table.include.list": "xx.x",
    "database.server.name": "xx-db3",
    "topic.prefix": "debezium.testdb",
    "snapshot.mode": "never",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
- connector /status output. BTW, why is the connector reported as RUNNING when its task is FAILED?
{
"name": "inventory-connector",
"connector": {
"state": "RUNNING",
"worker_id": "172.17.0.5:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "172.17.0.5:8083",
"trace": "io.debezium.DebeziumException: Couldn't obtain encoding for database xx\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:577)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:86)\n\tat io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)\n\tat io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.postgresql.util.PSQLException: Connection to 192.168.37.51:5432 refused. 
Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:342)\n\tat org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)\n\tat org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:263)\n\tat org.postgresql.Driver.makeConnection(Driver.java:443)\n\tat org.postgresql.Driver.connect(Driver.java:297)\n\tat io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:243)\n\tat io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:875)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:870)\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:574)\n\t... 14 more\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)\n\tat java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.postgresql.core.PGStream.createSocket(PGStream.java:243)\n\tat org.postgresql.core.PGStream.<init>(PGStream.java:98)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:132)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:258)\n\t... 23 more\n"
}
],
"type": "source"
}
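On the RUNNING question: as far as I understand, Kafka Connect reports the state of the connector instance and the state of each task separately, so connector.state can stay RUNNING while tasks[0].state is FAILED. A health check therefore has to look at both. A minimal Python sketch of that check follows; the function name effective_state and the DEGRADED label are hypothetical, and the sample is the status above with the trace omitted.

```python
def effective_state(status: dict) -> str:
    """Collapse a /status body into one state.

    FAILED if the connector or any task is FAILED, RUNNING only if
    everything is RUNNING, otherwise DEGRADED (a made-up label for
    states such as PAUSED or UNASSIGNED).
    """
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    if "FAILED" in states:
        return "FAILED"
    if all(state == "RUNNING" for state in states):
        return "RUNNING"
    return "DEGRADED"


# Status from the post, with the long "trace" field left out.
sample = {
    "name": "inventory-connector",
    "connector": {"state": "RUNNING", "worker_id": "172.17.0.5:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "172.17.0.5:8083"}],
    "type": "source",
}

print(effective_state(sample))  # prints FAILED
```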
Calling /restart does not work, but a combination of /stop and /resume does. Because I find this a bit tricky, question 1 is still relevant.
Here is the script: