Kafka MongoDB sink Connector exception handling


I have created a connector to sink data from Kafka to MongoDB. In some cases, bad data lands on my topic, and when the connector writes it to the database it fails with a duplicate key error because of the index I created on the collection.
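To illustrate the failure mode, here is a rough sketch of what happens on the MongoDB side (the field name order_id, the URI, and the exact writes are placeholders, not my real schema):

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

client = MongoClient("mongodb://localhost:27017")   # placeholder URI
coll = client["test"]["test_record"]

# Placeholder for the index I created on the collection.
coll.create_index("order_id", unique=True)

# The first record upserts fine, keyed by _id (ProvidedInKeyStrategy).
coll.update_one({"_id": "k1"}, {"$set": {"order_id": 42}}, upsert=True)

try:
    # A record with a different _id but the same indexed value hits the
    # E11000 duplicate key error that the connector task reports.
    coll.update_one({"_id": "k2"}, {"$set": {"order_id": 42}}, upsert=True)
except DuplicateKeyError as exc:
    print("duplicate key:", exc.details)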

In this case I want that record to be routed to the DLQ, but the connector is not moving it there.
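This is roughly how I check whether anything lands on dlq_sink (the bootstrap address and group id are placeholders; it uses the confluent_kafka client):

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "dlq-check",                # placeholder group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["dlq_sink"])

try:
    while True:
        msg = consumer.poll(5.0)
        if msg is None:
            break  # nothing (more) on the DLQ topic
        if msg.error():
            print("consumer error:", msg.error())
            continue
        # With errors.deadletterqueue.context.headers.enable=true, the failure
        # context (exception class/message, original topic, etc.) is in headers.
        print(msg.key(), msg.headers())
finally:
    consumer.close()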

This is my connector configuration; can anyone please help me with this?

{
  "name": "test_1",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "xxx",
    "database": "test",
    "collection": "test_record",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://xxx:8081",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
    "transforms": "hk",
    "transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
    "transforms.hk.field": "_id",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
    "write.method": "upsert",
    "errors.tolerance":"all",
    "errors.deadletterqueue.topic.name":"dlq_sink",
    "errors.deadletterqueue.context.headers.enable":true,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.timeout": 300000
  }
}
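For completeness, this is roughly how I apply the config to Kafka Connect (the Connect host/port and the file name are placeholders):

import json
import requests

CONNECT_URL = "http://localhost:8083"  # placeholder Connect REST endpoint

# Load the connector JSON shown above (placeholder file name).
with open("test_1.json") as f:
    connector = json.load(f)

# PUT /connectors/<name>/config creates the connector or updates its config.
resp = requests.put(
    f"{CONNECT_URL}/connectors/{connector['name']}/config",
    json=connector["config"],
)
resp.raise_for_status()
print(resp.json())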

Thanks,
