Kafka Listener rollback transaction on session timeout


I'm using spring-kafka version 1.1.3 to consume messages from a topic. Auto commit is set to true and max.poll.records to 10 in the consumer config. session.timeout.ms is negotiated to 10 seconds with the server.

Upon receiving a message I persist part of it to the database. My database is sometimes quite slow, which results in a session timeout in the Kafka listener:

Auto offset commit failed for group mygroup: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
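To make the numbers concrete, here is a minimal sketch of the time budget implied by the settings above. It assumes, as the error message describes, that the whole batch returned by one poll() must be processed before the session timeout expires. The class and method names are hypothetical and not part of any Kafka API.

```java
// Hypothetical helper, for illustration only; not part of Kafka or spring-kafka.
class PollBudget {

    /** Average processing time available per record, in milliseconds. */
    static long perRecordBudgetMs(long sessionTimeoutMs, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        return sessionTimeoutMs / maxPollRecords;
    }

    /** True if a full batch, at the given worst-case time per record, finishes before the timeout. */
    static boolean batchFitsInSession(long worstCaseRecordMs, int maxPollRecords, long sessionTimeoutMs) {
        return worstCaseRecordMs * maxPollRecords < sessionTimeoutMs;
    }
}
```

With session.timeout.ms at 10000 and max.poll.records at 10, perRecordBudgetMs gives 1000 ms per record on average, so a database write that occasionally takes longer than about a second per message can be enough to trigger the rebalance.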

Since I can't increase the session timeout on the server and max.poll.records is already down to 10, I'd like to wrap my database call in a transaction that would roll back in case of a Kafka session timeout.

Is this possible and how can I accomplish this?

Unfortunately I wasn't able to find a solution in the docs.


There are 2 best solutions below

Best answer

I decided to upgrade to Kafka 0.11, since it adds transaction support (see Release Notes).

Second answer

You should consider upgrading to Spring Kafka 1.2 and Kafka 0.10.x. Older Apache Kafka clients have a flaw with the heartbeat: it is sent only from within poll(), so a slow listener delays it. With autoCommit and a slow listener you therefore end up with unexpected rebalancing, and you are on your own with that problem. The version of Spring Kafka you are using has logic like this:

// if the container is set to auto-commit, then execute in the
// same thread
// otherwise send to the buffering queue
if (this.autoCommit) {
    invokeListener(records);
}
else {
    if (sendToListener(records)) {
        if (this.assignedPartitions != null) {
            // avoid group management rebalance due to a slow
            // consumer
            this.consumer.pause(this.assignedPartitions);
            this.paused = true;
            this.unsent = records;
        }
    }
}

So consider switching off autoCommit (enable.auto.commit=false) and relying on the built-in pause feature, which is turned on by default.
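A minimal sketch of consumer properties with auto commit switched off might look like the following. The property keys are standard Kafka consumer settings, and the values for max.poll.records and session.timeout.ms mirror the ones mentioned in the question. The class name, method name, and the choice of String deserializers are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class ConsumerSettings {

    /** Builds consumer properties with auto commit disabled, so the container manages offsets. */
    static Map<String, Object> manualCommitProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Let the listener container commit offsets instead of the Kafka client.
        props.put("enable.auto.commit", false);
        // Values taken from the question.
        props.put("max.poll.records", 10);
        props.put("session.timeout.ms", 10000);
        // Assumed String keys and values; adjust to the actual message format.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

A map like this is the kind of argument typically passed to spring-kafka's DefaultKafkaConsumerFactory. With auto commit off, the container takes the else branch shown in the snippet above and pauses the consumer while the listener is busy.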