I am trying to set up a Debezium sink connector for PostgreSQL. The architecture is: a Python script sends events to a Kafka topic (I can see them inside the topic), and a Debezium sink connector should forward those events to a PostgreSQL database. The database already exists with a dedicated table.

The error I am getting is this:

debezium | 2024-03-11 13:53:44,431 ERROR  ||  WorkerSinkTask{id=location-db-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
debezium | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
debezium |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
debezium |      at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:533)
debezium |      at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
debezium |      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
debezium |      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
debezium |      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
debezium |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
debezium |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
debezium |      at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
debezium |      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
debezium |      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
debezium |      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium |      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium |      at java.base/java.lang.Thread.run(Thread.java:829)
debezium | Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
debezium |      at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:337)
debezium |      at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
debezium |      at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:533)
debezium |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
debezium |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
debezium |      ... 14 more
debezium | 2024-03-11 13:53:44,432 INFO   ||  Closing session.   [io.debezium.connector.jdbc.JdbcChangeEventSink]

This is my event, which I sent from a Python script to Kafka:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "optional": "false", "field": "id" },
      { "type": "string", "optional": "false", "field": "type" },
      { "type": "string", "optional": "false", "field": "event_name" },
      { "type": "string", "optional": "false", "field": "event_date" },
      { "type": "string", "optional": "false", "field": "location" }
    ],
    "payload": {
      "id": "1",
      "type": "EventAdvertisement",
      "event_name": "EventABC",
      "event_date": "20230928",
      "location": "VenueXYZ"
    }
  }
}
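For context, the event above is produced roughly like this (a minimal sketch, not my full script; the `KafkaProducer` call is shown as a comment so the snippet is self-contained, and the topic name matches the connector config below):

```python
import json

# Build the exact event structure shown above. Note that "payload" is
# nested inside "schema" here, which is what the script currently sends.
fields = [
    {"type": "string", "optional": "false", "field": name}
    for name in ("id", "type", "event_name", "event_date", "location")
]
event = {
    "schema": {
        "type": "struct",
        "fields": fields,
        "payload": {
            "id": "1",
            "type": "EventAdvertisement",
            "event_name": "EventABC",
            "event_date": "20230928",
            "location": "VenueXYZ",
        },
    }
}

# Serialize to the bytes that go on the wire.
value = json.dumps(event).encode("utf-8")

# Sent with kafka-python (assumed), e.g.:
# producer = KafkaProducer(bootstrap_servers="kafka:9092")
# producer.send("dataqualitymeasurement", value=value)
```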

Here are screenshots of the Kafka topic: [screenshots omitted]

Here is my Debezium connector configuration:

{
  "name": "location-db-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",

    "connection.url": "jdbc:postgresql://in_postgres:5432/in_postgres_db",
    "connection.username": "in_postgres_user",
    "connection.password": "in_postgres_password",

    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic",
    "database.time_zone": "UTC",
    "topics": "dataqualitymeasurement",
    "auto.create": "true"
  }
}
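For completeness, I register this configuration with the Kafka Connect REST API (a sketch of the registration step; I'm assuming Connect listens on the default port 8083 on the `debezium` host and that the JSON above is saved as `location-db-connector.json`):

```shell
# POST the connector config to Kafka Connect's REST API (default port 8083).
curl -X POST http://debezium:8083/connectors \
  -H "Content-Type: application/json" \
  -d @location-db-connector.json

# Check the connector and task status afterwards:
curl http://debezium:8083/connectors/location-db-connector/status
```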

On the Internet I found a possible solution, which is to add these converter parameters:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",  
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"

Here is the link to the answer: Kafka Connect JDBC Sink Connector giving WorkerSinkTask ERROR

If I add them, I still get the same error. Can anyone please help me sink my data into PostgreSQL? Thanks in advance.
