Kafka: Recovering from a CommitFailedException


I have an issue where my commit fails because the time between poll() calls is too long. I don't know why this happens: there were no messages, the consumer was simply reading and committing on an empty queue, and my poll interval is set to hours. Then, when it hits read() again, it doesn't rebalance for some reason. However, this only happens when my code is running on Bluemix. Locally, when I reproduce the exception, the next read() causes a rebalance.

What's the proper way to recover from a CommitFailedException? Should I close() and recreate my consumer? Or is calling read() supposed to rebalance and let me continue?



Best answer

The commitSync() method automatically retries retriable errors (with no time limit in the 0.11 client; Kafka 2.0 and later bound this by default.api.timeout.ms), so if you are getting a CommitFailedException, it is not a retriable condition and calling commit again is unlikely to help. You are getting this exception because your consumer has been kicked out of the consumer group.

If you were using commitAsync() to commit offsets, retries are not automatic, and you may receive a RetriableCommitFailedException indicating a potentially transient error, in which case you can retry the commit manually. That doesn't sound like what is happening in your case, but I include it for completeness.
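As a minimal sketch of that manual retry (assuming kafka-clients 0.11 or later; the class AsyncCommitRetry and its methods are hypothetical), the callback below re-sends a commitAsync() only when the error is a RetriableCommitFailedException and no newer commit has been issued in the meantime. The sequence-number guard is a common pattern, not something the Kafka API does for you; it stops a late retry from overwriting newer offsets with older ones.

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;

public final class AsyncCommitRetry {
    // Incremented for every new commit; a retry is sent only if no newer commit followed it.
    private final AtomicLong commitSeq = new AtomicLong();

    /** Retry only transient failures, and only if this is still the latest commit attempt. */
    static boolean shouldRetry(Exception error, long attemptSeq, long latestSeq) {
        return error instanceof RetriableCommitFailedException && attemptSeq == latestSeq;
    }

    /** Hypothetical helper: call from the polling thread after processing a batch. */
    public void commit(Consumer<?, ?> consumer) {
        final long seq = commitSeq.incrementAndGet();
        consumer.commitAsync((offsets, error) -> {
            if (error == null) {
                return; // commit succeeded
            }
            if (shouldRetry(error, seq, commitSeq.get())) {
                // Transient failure: retry the same offsets once; a null callback
                // makes the client fall back to its default (logging) callback.
                consumer.commitAsync(offsets, null);
            } else {
                System.err.println("Commit failed, not retrying: " + error);
            }
        });
    }
}
```

Call commit(consumer) from the same thread that calls poll(); the commit callback is also invoked on that thread, during a later poll() or commit call.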

Once your consumer has been kicked out of the group and you get this CommitFailedException, you can just keep calling poll() until the rebalance is complete and you are admitted back into the consumer group (potentially with a different set of partitions than before), and consumption will continue.
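A minimal sketch of that recovery, assuming kafka-clients 2.0 or later (for poll(Duration)) and manual commits. The broker address, group id, topic name and the tryCommit helper are placeholders. The CommitFailedException is caught and logged rather than retried, and the loop simply keeps polling so the consumer rejoins the group.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class RecoveringConsumerLoop {

    /** Commits synchronously; returns false instead of throwing if the consumer was evicted. */
    static boolean tryCommit(Consumer<?, ?> consumer) {
        try {
            consumer.commitSync();
            return true;
        } catch (CommitFailedException e) {
            // Not retriable: the group already rebalanced without this consumer.
            // Retrying the commit won't help; the next poll() rejoins the group.
            System.err.println("Commit failed; rejoining on next poll(): " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // hypothetical group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // hypothetical topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
                tryCommit(consumer); // on failure, just keep polling
            }
        }
    }
}
```

Because the failed commit is lost, records processed since the last successful commit may be delivered again, possibly to a different consumer, so processing should tolerate duplicates.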

If your application is not tolerant of the partitions (and therefore the keys) it receives changing mid-stream, you should implement a rebalance listener, which is called whenever the partition assignment changes. See http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
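A minimal sketch of such a listener, assuming you track processed offsets yourself. The class name CommitOnRevokeListener and its markProcessed() helper are hypothetical. It commits pending offsets in onPartitionsRevoked(), which runs inside poll() before the partitions are handed to another member, and onPartitionsAssigned() is where any per-partition or per-key state would be reset.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/** Hypothetical listener: commits processed offsets before partitions are revoked. */
public final class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    // Next offset to read for each partition, filled in by the poll loop.
    private final Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();

    public CommitOnRevokeListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    /** Record that `offset` was handled; Kafka expects the offset of the next record to read. */
    public void markProcessed(TopicPartition partition, long offset) {
        pending.put(partition, new OffsetAndMetadata(offset + 1));
    }

    Map<TopicPartition, OffsetAndMetadata> pendingOffsets() {
        return pending;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Last chance to commit while this consumer still owns the partitions.
        try {
            if (!pending.isEmpty()) {
                consumer.commitSync(pending);
            }
        } catch (CommitFailedException e) {
            // Already evicted: another member may own these partitions by now.
            System.err.println("Commit on revoke failed: " + e.getMessage());
        }
        // Drop tracked offsets for partitions we are giving up.
        pending.keySet().removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // The partitions (and therefore keys) routed here may have changed;
        // reset any per-partition or per-key state before processing resumes.
        System.out.println("Assigned partitions: " + partitions);
    }
}
```

Register it with consumer.subscribe(Collections.singletonList("example-topic"), listener) and call markProcessed() after handling each record. If the consumer has already been evicted, as in the question, the commit inside the listener can itself fail, which this sketch logs and ignores.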

If you are just trying to work around the fact that committed offsets expire after 24 hours (the default offsets.retention.minutes before Kafka 2.0), you will need to commit at least once per day to keep the offsets from expiring, in addition to calling poll() periodically to stay in the consumer group.
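A minimal sketch of committing on a timer even when nothing arrives, assuming manual commits. PeriodicCommitter and the one-hour interval in the usage note are hypothetical choices, picked to sit well inside a 24-hour retention. commitSync() with no arguments commits the current position of each assigned partition, so the stored offsets are refreshed even if the last poll() returned no records.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;

/** Hypothetical helper: re-commits current positions at a fixed interval. */
public final class PeriodicCommitter {
    private final long intervalMs;
    private long lastCommitMs;

    public PeriodicCommitter(Duration interval, long nowMs) {
        this.intervalMs = interval.toMillis();
        this.lastCommitMs = nowMs;
    }

    /** True once at least intervalMs has passed since the last commit. */
    static boolean commitDue(long nowMs, long lastCommitMs, long intervalMs) {
        return nowMs - lastCommitMs >= intervalMs;
    }

    /** Call once per loop iteration, right after poll(), even if it returned no records. */
    public void maybeCommit(Consumer<?, ?> consumer, long nowMs) {
        if (commitDue(nowMs, lastCommitMs, intervalMs)) {
            consumer.commitSync(); // commits the consumed position of every assigned partition
            lastCommitMs = nowMs;
        }
    }
}
```

In the poll loop, create it once with new PeriodicCommitter(Duration.ofHours(1), System.currentTimeMillis()) and call committer.maybeCommit(consumer, System.currentTimeMillis()) after each poll(). The loop itself still has to call poll() often enough to stay in the group.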

Other answer

@kyl I believe that with the default Kafka Java client settings, the consumer sends a heartbeat every 3 s (heartbeat.interval.ms) and the session timeout is 10 s (session.timeout.ms), so your consumer should stay in the group without being removed and triggering a rebalance. What message was included with your CommitFailedException? I'm assuming the commit is failing because you've been kicked out.
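For reference, a minimal sketch that makes those settings explicit; the class name GroupTimeouts is hypothetical. The values are the defaults of the 0.11-era client (session.timeout.ms defaults to 45000 from Kafka 3.0). max.poll.interval.ms is included because it is the limit behind the "time between poll() calls" eviction, which the background heartbeat does not prevent.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class GroupTimeouts {
    /** Consumer properties that spell out the group-membership timeouts. */
    static Properties timeoutProps() {
        Properties props = new Properties();
        // A background thread sends a heartbeat to the coordinator every 3 s.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        // If no heartbeat arrives within 10 s, the coordinator evicts the consumer.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        // Separately, the consumer leaves the group if poll() is not called within
        // this window, even while heartbeats continue. 300000 (5 min) is the default.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        return props;
    }

    public static void main(String[] args) {
        timeoutProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```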

A few other questions on this:

  1. Do you have multiple consumers coming and going, and/or do you intentionally want to use consumer groups rather than a single consumer?

  2. What do you mean by "my poll interval is set to hours"?

  3. What do you mean by "committing on an empty queue"?

Can you share a snippet of your consumer loop code? That might help explain what you're doing.