I am using a RetryTemplate together with a DeadLetterPublishingRecoverer and a SeekToCurrentErrorHandler in my Kafka consumer config, but the retries do not stop at the configured limit.
public class KafkaConsumerConfig {

    @Value("${employee_profile_events_topic}")
    private String kafkaTopicName;

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaURL;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public ConsumerFactory<String, EmployeeDTO> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURL);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaTopicName);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(EmployeeDTO.class));
    }

    @Bean(name = "employeeProfileChangeKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, EmployeeDTO> employeeProfileChangeKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EmployeeDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setStatefulRetry(true);
        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(errorHandler(deadLetterPublishingRecoverer(kafkaTemplate)));
        return factory;
    }

    @Bean
    public RetryTemplate retryTemplate() {
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(500);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        return retryTemplate;
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, String> kafkaTemplate) {
        return new DeadLetterPublishingRecoverer(kafkaTemplate);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(1000L, 4));
    }
}
Everything else is configured properly. Even after the configured number of retries, the record is not published by the DeadLetterPublishingRecoverer.
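To make the two configured limits concrete: `SimpleRetryPolicy(3)` allows three attempts in total, while the second argument of `FixedBackOff(1000L, 4)` is the number of retries after the first delivery, so the error handler on its own would recover a record only after its fifth failed delivery. The plain-Java model below mimics just that counting. `ErrorHandlerModel`, `deliverUntilRecovered`, and the in-memory dead-letter list are hypothetical stand-ins rather than Spring Kafka classes, and the model deliberately ignores how the retry template and stateful retry interact with the handler.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "redeliver until the back-off is exhausted, then recover". Not Spring code. */
final class ErrorHandlerModel {

    /** Number of times the (always failing) listener was invoked. */
    int listenerCalls = 0;

    /** Records handed to the recoverer, standing in for a dead-letter topic. */
    final List<String> deadLetters = new ArrayList<>();

    /**
     * Delivers one record to a listener that always throws.
     * maxRetries plays the role of the second FixedBackOff argument:
     * the number of retries allowed after the initial delivery.
     */
    void deliverUntilRecovered(String record, long maxRetries) {
        long retriesUsed = 0;
        while (true) {
            listenerCalls++;              // the listener runs and fails
            if (retriesUsed < maxRetries) {
                retriesUsed++;            // back-off still has retries left: redeliver
            } else {
                deadLetters.add(record);  // back-off exhausted: recover instead of retrying
                return;
            }
        }
    }

    public static void main(String[] args) {
        ErrorHandlerModel model = new ErrorHandlerModel();
        model.deliverUntilRecovered("employee-42", 4);
        System.out.println(model.listenerCalls + " deliveries, dead letters: " + model.deadLetters);
    }
}
```

With four retries the model makes five deliveries before the record reaches the dead-letter list, which is the count to compare against what the consumer logs actually show.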
This works for me:
Also, the error handler used here is compatible with Spring 2.6.x onwards. If the code above does not work on your version, try upgrading.
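A minimal sketch of one setup along these lines, assuming spring-kafka 2.8 or later (the line that Spring Boot 2.6.x manages), where `DefaultErrorHandler` takes over from `SeekToCurrentErrorHandler`. The class name `RetryingConsumerConfig`, the bean names, and the `Object`-typed generics are placeholders. It also assumes a `KafkaTemplate` bean whose serializers can handle the failed record's key and value, and an existing dead-letter topic. Unrelated settings such as the ack mode are left out.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class RetryingConsumerConfig { // hypothetical class name

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // By default the recoverer publishes to "<original topic>.DLT".
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // 1 s between deliveries, 4 retries after the first attempt (5 deliveries in total).
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 4L));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> retryingListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory, DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // No RetryTemplate or stateful retry here: the error handler alone bounds the retries.
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```

Keeping a single retry mechanism makes the limit easier to reason about, since only the `FixedBackOff` decides when the record is handed to the recoverer.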