Duplicate message coming to @Dlthandler multiple times even after acknowledging msg

27 Views Asked by At

I am trying to test @RetryableTopic feature of Kafka where after retrying the message 3 times , I would like to push it to Kafka SQS. I am getting call from retry thread 4 times and call to DltHandler 2 times .As far as I know,in Spring Retry, the @DltHandler annotation is used to handle messages that have failed after all retry attempts have been exhausted and have been moved to the Dead Letter Queue (DLQ) so I am expecting that call to processMessage() only once. I am tracking call to consumer() method with thread name to identify which thread is calling this method. Not sure which part I am missing ?

@Bean(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY)
public KafkaTemplate<String, String> kafkaTemplateForDlt() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public RetryTopicConfiguration myRetryTopic(@Qualifier(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY)KafkaTemplate<String, String> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}
}

      
@Slf4j
@Component
public class ProductEventConsumer {
@Autowired
private ProductServiceImpl productServiceImpl;

@Autowired ObjectMapper objectMapper;

@Value("${aws.sqsDLQ}")
private String productdlq;

@Autowired
private SqsTemplate sqsTemplate;

@RetryableTopic(
          backoff = @Backoff(delayExpression = "10000", multiplierExpression = "0"), 
          attempts = "3", 
          kafkaTemplate = ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY,
          include = {SocketTimeoutException.class,ArithmeticException.class})
@KafkaListener(id=ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_ID, idIsGroup=false,
        topics="#{'${spring.kafka.product-topic}'}",containerFactory=ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_FACTORY)
public void consumer(ConsumerRecord<String,String> consumerRecord, Acknowledgment ack) {
    try{
        log.info("START:Received request via kafka:{} thread:{}",consumerRecord.value()
                ,Thread.currentThread().getName());
        int result = 10 / 0;
        ack.acknowledge();
    }catch( JsonProcessingException e) {
        log.error("END:Exception occured while saving item:{}",e.getMessage());
    }
}

@DltHandler
public void processMessage(ConsumerRecord<String,String> consumerRecord, Acknowledgment ack) {
    try{
        log.error("START:Pushing message to SQS DLQ:{}",consumerRecord.key());
        sqsTemplate.send(sqsSendOptions -> sqsSendOptions.queue(productdlq).payload(consumerRecord.value()));
    }catch(Exception e) {
        log.error("END:Failure while pushing msg to sqs dlq:{} key:{}",e.getMessage(),consumerRecord.key());
    }
    finally {
        ack.acknowledge();
    }
}    

}
1

There are 1 best solutions below

0
Lorenzo Von Matterhorn On

Please have a look at this example. I believe that branch so-78201070 does what you are looking for:

return RetryTopicConfigurationBuilder.newInstance()
        .retryTopicSuffix("-" + appName + "-retry")
        .maxAttempts(3) // original processing + 2x retry
        .fixedBackOff(1000L) // 1000ms delay between retries
        .suffixTopicsWithIndexValues()
        .dltSuffix("-" + appName + ".dlt")
        .includeTopic(topicToInclude)
        .dltHandlerMethod("eventListener", "processDltEvent")
        .create(template);

The test retryAndDlt sends an event that gets retried twice and is then sent to the DLQ/DLT.

Hope it helps.