How to implement retry and recover logic with Spring Reactive Kafka

1k Views Asked by At

We are using the https://github.com/reactor/reactor-kafka project for implementing Spring Reactive Kafka. But we want to utilize Kafka retry and recover logic with reactive Kafka. Can anyone provide some sample code?

1

There are 1 best solutions below

2
On

Since you are using spring ecosystem for retry and recovery you can use spring-retry looks at there documentation spring -retry. There are enough references available on web.

A sample example below class is consuming messages from kafka topic and processing.

The method consuming is marked Retryable, so in case there is exception processing it will retry and if retry doesn't succeed then the corresponding recovery method will be called.

public class KafkaListener{
  
 
  @KafkaListener(topic="books-topic", id ="group-1")
  @Retryable(maxAttempts = 3, value = Exception.class))
  public void consuming(String message){
   //  To do message processing 
   //  Whenever there is exception thrown from this method
   //   - it will retry 3 times in total
   //   - Even after retry we get exception then it will be handed of to below 
   //     recover method recoverConsuming 
  
   }

   @Recover
   public void recoverConsuming(Exception exception, String message){
     // Recovery logic 
     // you can implement your recovery scenario
    }
  
 }