Debezium Kafka Connect for Postgresql receiving unexpected update message received

244 Views Asked by At

I am using debezium kafka connect to stream postgresql database CDC events to kafka cluster. Currently, I am on version 1.9.0 but want to upgrade to version 2.4.0

We are using Kafka outbox pattern to stream database records to Kafka queues.

I am able to upgrade to 2.4.0 but getting a strange warning

 Unexpected update message received Struct{id=1} and ignored   [io.debezium.transforms.outbox.EventRouterDelegate]

Below is my configuration

{
    "value.converter.delegate.converter.type.schemas.enable": "false",
    "database.password": "*****",
    "database.user": "postgres",
    "publication.name": "dbz_outbox_pub",
    "slot.name": "debezium2",
    "topic.creation.default.cleanup.policy": "compact",
    "heartbeat.topic.prefix": "__debezium-heartbeat",
    "tasks.max": "1",
    "transforms": "outbox",
    "topic.creation.default.partitions": "1",
    "heartbeat.interval.ms": "60000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "transforms.outbox.table.field.event.key": "aggregateid",
    "publication.autocreate.mode": "filtered",
    "topic.creation.default.replication.factor": "1",
    "heartbeat.action.query": "CREATE TABLE IF NOT EXISTS debezium_heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP WITH TIME ZONE);\nINSERT INTO debezium_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;",
    "plugin.name": "pgoutput",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "table.include.list": "public.kafkaoutbox,public.debezium_heartbeat",
    "topic.creation.default.compression.type": "uncompressed",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "database.dbname": "superipdev_cellb",
    "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
    "database.port": "5432",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "tombstones.on.delete": "false",
    "key.converter.delegate.converter.type.schemas.enable": "false",
    "event.processing.failure.handling.mode": "warn",
    "schema.include.list": "",
    "snapshot.mode": "never",
    "database.hostname": "host.docker.internal",
    "topic.prefix": "cell_b",
    "database.server.name": "cell_b",
    "name": "debezium2"
}

has anyone come across this before? Any help would be highly appreciated.

1

There are 1 best solutions below

2
On

Seems that you are doing an update on the outbox table. The SMT EventRouterDelegate on Debezium will ignore all operations except INSERT as you are not supposed to update the records from the outbox table. Code for this is here