I'm using spring-kafka
version 1.1.3
to consume messages from a topic. Auto commit is set to true
and max.poll.records
to 10
in the consumer config. session.timeout.ms
is negotiated to 10 seconds with the server.
Upon receiving a message I persist part of it to the database. My database tends to be quite slow sometimes, which results in a session timeout by the kafka listener:
Auto offset commit failed for group mygroup: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Since I can't increase the session timeout on the server and the max.poll.records
is already down at 10, I'd like to be able to wrap my database call in a transaction, which would rollback in case of a kafka session timeout.
Is this possible and how can I accomplish this?
Unfortunately I wasn't able to find a solution in the docs.
Decided to upgrade to Kafka 0.11 since it adds transaction support (see Release Notes).