Multiple Retries Happening in Kafka

467 views

I have used RetryTemplate along with DeadLetterPublishingRecoverer and SeekToCurrentErrorHandler in my Kafka consumer config, but the retries do not stop and exceed the configured limit.

public class KafkaConsumerConfig {

    @Value("${employee_profile_events_topic}")
    private String kafkaTopicName;

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaURL;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public ConsumerFactory<String, EmployeeDTO> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURL);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopicName);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(EmployeeDTO.class));
    }

    @Bean(name = "employeeProfileChangeKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, EmployeeDTO> employeeProfileChangeKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EmployeeDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setStatefulRetry(true);
        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(errorHandler(deadLetterPublishingRecoverer(kafkaTemplate))
        );
        return factory;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(500);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        return retryTemplate;
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, String> kafkaTemplate) {
        return new DeadLetterPublishingRecoverer(kafkaTemplate);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(1000L, 4))
    }
}

Everything else has been configured properly. Even after the configured number of retries is exhausted, the record is not being published to the dead-letter topic by the DeadLetterPublishingRecoverer.

1

There is 1 best solution below

1
Aryak Deshpande On

This works for me:

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Value("${error.topic.suffix}")
    private String errorTopicSuffix;

    @Value("${retry.attempts}")
    private int maxAttempts;

    @Value("${retry.interval}")
    private long interval;

    @Autowired
    private ConsumerUtil consumerUtil;

    /**
     * Handles the recovery of a message that has been retried and still could
     * not be processed: the record is handed to the resolver lambda, which
     * decides the destination (and can run custom recovery logic).
     **/
    public DeadLetterPublishingRecoverer publishingRecoverer() {

        return new DeadLetterPublishingRecoverer(template, (message, exception) -> {

            log.error("Exception still persists after exhaustive retries. Trying to recover failed message.");

            // your logic here

        });

    }

    /**
     * Builds the error handler invoked when an exception escapes the consumer
     * processing code. Configures which exceptions are retried, the back-off
     * between attempts, and the recoverer to call once retries are exhausted.
     **/
    public DefaultErrorHandler errorHandler() {

        /** if these exceptions occur in the consumer, the record will NOT be retried **/
        //List<Class<? extends Exception>> exceptionsToIgnore = List.of(JsonMappingException.class);

        /** if these exceptions occur in the consumer, the record WILL be retried **/
        List<Class<? extends Exception>> exceptionsToRetry = List.of(ResourceAccessException.class);

        /** number of retries and the interval between two retry attempts **/
        var fixedBackOff = new FixedBackOff(interval, maxAttempts);

        /**
         * An exponential back-off is available too — swap it in for fixedBackOff
         * below if needed. (Kept as a commented example rather than dead code:
         * the original constructed it and never used it.)
         *
         * var expBackoff = new ExponentialBackOffWithMaxRetries(maxAttempts);
         * expBackoff.setInitialInterval(interval);
         **/

        var errorHandler = new DefaultErrorHandler(publishingRecoverer(), fixedBackOff);

        /** listener that monitors each retry attempt **/
        errorHandler.setRetryListeners((consumerRecord, ex, deliveryAttempt) -> {

            log.error("Failed record in retry listener exception. Delivery attempt : {}", deliveryAttempt);

            if (deliveryAttempt > maxAttempts) {
                log.error("RETRIES EXHAUSTED.");

                // your logic here
            }
        });

        // exceptionsToIgnore.forEach(errorHandler::addNotRetryableExceptions);
        exceptionsToRetry.forEach(errorHandler::addRetryableExceptions);
        return errorHandler;
    }

    /**
     * Listener container factory configured from Spring Boot's defaults, with
     * manual offset commits and the retry/dead-letter error handler attached.
     **/
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);

        /** overriding the commit mode from auto-commit to manual **/
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);

        /** useful for parallel processing; could be set equal to the partition count **/
        //factory.setConcurrency(3);

        /** attach the error handler for handling exceptions in the consumer **/
        factory.setCommonErrorHandler(errorHandler());
        return factory;
    }

}

Also, the error handler used here (DefaultErrorHandler with setCommonErrorHandler) is available from Spring Kafka 2.8 onwards, where it replaced SeekToCurrentErrorHandler. Try upgrading if the above code does not compile, and it should work fine.