Keep original __TypeId__ header in DLT, retry topic

28 Views Asked by At

everyone I use spring boot: 3.2.1, kafka: 3.1.1

  1. I send message to kafka topic topic-main with header: TypeId = "SomeClass"

  2. Service rises some error

  3. After max-retries count works DeadLetterPublishingRecoverer which sends message to retry-topic: topic-retry

    public TopicPartition getTopicPartitionByRetryPolicy(String retryTopic, String dltTopic, ConsumerRecord consumerRecord) {

        var retries = consumerRecord.headers().lastHeader("retries");
        if (retries == null) {
          retries = new RecordHeader("retries", new byte[] {1});
          consumerRecord.headers().add(retries);
        } else {
          retries.value()[0]++;
        }
        if (retries.value()[0] > backOffProperties.getRetryCount()) {
          log.error("Publishing to DLT topic");
          return new TopicPartition(dltTopic, consumerRecord.partition());
        } else {
          log.error("Publishing to RETRY topic");
          return new TopicPartition(retryTopic, consumerRecord.partition());
        }   }
    

In retry topic I receive event with header: TypeId = "java.lang.String"

During debugging I see this header in array bytes into consumerRecord, however after produce event header value becomes "java.lang.String"

How should I fix this behaviour ? Thanks in advance

0

There are 0 best solutions below