I have a Kafka listener which can throw a JsonProcessingException and another custom exception (Lets say exception X) . I want kafka listener to retry only when a JsonProcessingException is thrown , and not when exception X is thrown.
To achieve this I passed on a retryTemplate to ConcurrentKafkaListenerContainerFactory. I used a SimpleRetryTemplate to mention Exceptions to be retried, however this does not work.
The listener is retrying on exception X as well.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContainerFactory(){
LOGGER.debug("Creating ConcurrentKafkaInsertionListner");
ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
listenerFactory.setConsumerFactory(consumerFactory());
listenerFactory.setRetryTemplate(kafkaListenerRetryTemplate());
listenerFactory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer((KafkaOperations<String, String>)kafkaTemplate())));
return listenerFactory;
}
private RetryTemplate kafkaListenerRetryTemplate() {
LOGGER.debug("Creating KafkaRetry Template");
RetryTemplate retryTemplate = new RetryTemplate();
/* here retry policy is used to set the number of attempts to retry and what exceptions you wanted to try and what you don't want to retry.*/
retryTemplate.setRetryPolicy(getSimpleRetryPolicy());
return retryTemplate;
}
private SimpleRetryPolicy getSimpleRetryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
LOGGER.debug("Creating Kafka listener retry policy");
//exceptionMap.put(Exception.class, false);
//exceptionMap.put(Throwable.class, false);
exceptionMap.put(JsonProcessingException.class, true);
exceptionMap.put(ExceptionX.class, false);
return new SimpleRetryPolicy(3,exceptionMap,true);
}
I am not sure what i am missing. I tried also setting traverCauses to false, and setting Exception.class and Throwable.class in the exceptionMap to false.
I am using spring boot ver 2.3.4
It is no longer necessary to use a
RetryTemplate
; you can now add aBackOff
andaddNotRetryableException()
to theSeekToCurrentErrorHandler
- in fact having both means you have nested retries.In order to disable retries in the STCEH, use a
FixedBackOff
with 0 retries. The defaultBackOff
is 9 retries (10 attempts) with no back off.