I am a beginner in Kafka.
I have been trying to implement exponential retry for failed records in a Kafka consumer. Retries should take place after 1 minute, then 5 minutes, then 15 minutes, and then 30 minutes. If the record still fails after these 4 retries, I need to shut down the consumer.
I have done the following to implement it, but after 5 minutes (max.poll.interval.ms) the consumer rebalances. How can I complete all retry attempts (5 attempts in total in case of failure) and then shut down the consumer?
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(60000);
    backOffPolicy.setMultiplier(5);
    backOffPolicy.setMaxInterval(900000);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(retryPolicy());
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
}

private RetryPolicy retryPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
    exceptionMap.put(IllegalArgumentException.class, false);
    exceptionMap.put(RecoverableDataAccessException.class, true);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(4, exceptionMap, true);
    return simpleRetryPolicy;
}
max.poll.interval.ms is the upper limit on the time Kafka will wait between poll() calls from a consumer before considering it "dead". Once a consumer in a consumer group is considered dead, Kafka triggers a rebalance of that group. In your use case, you want to wait more than 51 minutes (1 + 5 + 15 + 30 minutes of retry delay, plus the actual processing time) before making the next poll. So you need to increase this property to a sufficiently large value (more than 51 minutes, expressed in milliseconds) so that Kafka does not assume the consumer has died.
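As a rough illustration of the arithmetic above, here is a minimal sketch that adds up the retry delays from the question and derives a value for max.poll.interval.ms. The class name PollIntervalSketch, its helper methods, and the 9-minute processing allowance are assumptions made for this example, not part of Kafka or Spring; only the property key max.poll.interval.ms is a real consumer setting.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

// Hypothetical helper class, for illustration only.
class PollIntervalSketch {

    // Retry delays requested in the question: 1, 5, 15 and 30 minutes.
    static final List<Duration> RETRY_DELAYS = List.of(
            Duration.ofMinutes(1), Duration.ofMinutes(5),
            Duration.ofMinutes(15), Duration.ofMinutes(30));

    // Sum of all back-off waits, i.e. the time spent sleeping between attempts.
    static Duration totalBackOff(List<Duration> delays) {
        Duration total = Duration.ZERO;
        for (Duration d : delays) {
            total = total.plus(d);
        }
        return total;
    }

    // Builds consumer properties whose max.poll.interval.ms covers the total
    // back-off time plus an assumed allowance for the processing attempts.
    static Properties consumerProps(Duration processingAllowance) {
        Duration limit = totalBackOff(RETRY_DELAYS).plus(processingAllowance);
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(limit.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // 51 minutes of back-off plus an assumed 9 minutes for processing.
        Properties props = consumerProps(Duration.ofMinutes(9));
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

The resulting Properties would be merged into whatever configuration the consumer is created with. The allowance should be chosen from how long a single processing attempt can realistically take, since a value that is too small brings the rebalance back.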