Kafka listener: unable to prevent retry on a particular exception using SimpleRetryPolicy


I have a Kafka listener that can throw a JsonProcessingException and another custom exception (let's call it exception X). I want the listener to retry only when a JsonProcessingException is thrown, and not when exception X is thrown.

To achieve this, I passed a RetryTemplate to the ConcurrentKafkaListenerContainerFactory. I used a SimpleRetryPolicy to specify which exceptions should be retried, but this does not work.

The listener is retrying on exception X as well.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContainerFactory() {
    LOGGER.debug("Creating ConcurrentKafkaInsertionListner");
    ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory());
    listenerFactory.setRetryTemplate(kafkaListenerRetryTemplate());
    listenerFactory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer((KafkaOperations<String, String>) kafkaTemplate())));
    return listenerFactory;
}

private RetryTemplate kafkaListenerRetryTemplate() {
    LOGGER.debug("Creating KafkaRetry Template");
    RetryTemplate retryTemplate = new RetryTemplate();

    // The retry policy sets the number of attempts and which exceptions
    // should and should not be retried.
    retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

    return retryTemplate;
}

private SimpleRetryPolicy getSimpleRetryPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
    LOGGER.debug("Creating Kafka listener retry policy");
    //exceptionMap.put(Exception.class, false);
    //exceptionMap.put(Throwable.class, false);
    exceptionMap.put(JsonProcessingException.class, true);
    exceptionMap.put(ExceptionX.class, false);
    return new SimpleRetryPolicy(3, exceptionMap, true);
}

I am not sure what I am missing. I also tried setting traverseCauses to false, and mapping Exception.class and Throwable.class to false in the exceptionMap.

I am using Spring Boot version 2.3.4.


There is 1 best solution below


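It is no longer necessary to use a RetryTemplate; you can now configure a BackOff on the SeekToCurrentErrorHandler and call addNotRetryableException() on it. In fact, having both a RetryTemplate and the error handler means you have nested retries. That is most likely why exception X is still retried: the retry policy skips it, but the error handler retries every failed delivery by default.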
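As a rough sketch of that suggestion, the container factory could be configured along the following lines. This is a configuration fragment rather than a drop-in replacement: it assumes Spring Kafka 2.5.x (the line that Spring Boot 2.3.x manages), that `ConsumerFactory<String, String>` and `KafkaOperations<String, String>` beans already exist in the context, and that exception X is an unchecked exception. The nested `ExceptionX` class, the class name `KafkaListenerConfig`, and the back-off values are illustrative placeholders, not taken from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaListenerConfig {

    /** Hypothetical stand-in for the question's "exception X". */
    public static class ExceptionX extends RuntimeException {
        public ExceptionX(String message) {
            super(message);
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaOperations<String, String> kafkaTemplate) {

        // Illustrative values: 2 retries (3 attempts in total), 1 second apart.
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),
                new FixedBackOff(1000L, 2L));

        // ExceptionX skips the retries and goes straight to the recoverer.
        errorHandler.addNotRetryableException(ExceptionX.class);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // No setRetryTemplate(...) call, so the error handler is the only retry layer.
        factory.setErrorHandler(errorHandler);
        return factory;
    }
}
```

With this shape, a JsonProcessingException is retried according to the BackOff and then handed to the DeadLetterPublishingRecoverer, while ExceptionX is handed to the recoverer after the first failure.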

To disable retries in the SeekToCurrentErrorHandler (STCEH), use a FixedBackOff with 0 retries. The default BackOff is 9 retries (10 attempts) with no delay between attempts.
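To see how the attempt counts multiply when two retry layers are stacked, here is a small plain-Java simulation. It does not use Spring Kafka at all; `NestedRetryDemo`, `withRetries` and `countListenerCalls` are hypothetical names, and the model is simplified (an always-failing listener, no back-off delay, no recoverer). The inner layer plays the role of the RetryTemplate and the outer layer the role of the error handler.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class NestedRetryDemo {

    /** Runs the action up to maxAttempts times and rethrows the last failure. */
    static void withRetries(int maxAttempts, Runnable action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return; // success, stop retrying
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    /** Counts calls to an always-failing listener wrapped in two retry layers. */
    static int countListenerCalls(int innerAttempts, int outerAttempts) {
        AtomicInteger calls = new AtomicInteger();
        Runnable listener = () -> {
            calls.incrementAndGet();
            throw new IllegalStateException("simulated listener failure");
        };
        try {
            withRetries(outerAttempts, () -> withRetries(innerAttempts, listener));
        } catch (IllegalStateException exhausted) {
            // Both layers gave up; only the call count matters here.
        }
        return calls.get();
    }

    public static void main(String[] args) {
        // Inner policy of 3 attempts inside the default 10 outer attempts.
        System.out.println(countListenerCalls(3, 10)); // prints 30
        // Inner layer does not retry, outer layer still uses its default.
        System.out.println(countListenerCalls(1, 10)); // prints 10
        // Outer layer configured with 0 retries (a single attempt).
        System.out.println(countListenerCalls(3, 1));  // prints 3
    }
}
```

The second case mirrors the question's symptom: an exception that the inner policy refuses to retry is still delivered 10 times by the outer layer unless that layer is told not to retry it.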