Does calling seek on a consumer in a group set the offset for the whole group?


I have 3 consumers in the same group. I call seek() to set the offset on one partition on one consumer. Will that set the offset for all consumers in the group, or only the offset on that particular partition?

I want to be able to reset the group so that it starts over with all records in the log, but I will only call seek on one consumer.


There are 2 best solutions below

0
On

With 3 consumers in the same group, each partition is assigned to exactly one of them. Calling seek therefore affects only that partition, on the consumer that owns it; the other consumers and their partitions are not affected.
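To make the ownership point concrete, here is a minimal sketch of a toy in-memory model. It is not the Kafka client: the class GroupSeekModel and all of its methods are hypothetical names invented for illustration, and it assumes every partition has exactly one owner that tracks its own position.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model, NOT the Kafka client: all names here are hypothetical.
public class GroupSeekModel {
    // partition number -> name of the single group member that owns it
    private final Map<Integer, String> owner = new HashMap<>();
    // partition number -> next offset the owning member will read
    private final Map<Integer, Long> position = new HashMap<>();

    public void assign(int partition, String consumer, long startOffset) {
        owner.put(partition, consumer);
        position.put(partition, startOffset);
    }

    // Models seek(): moves the position of one partition, and only if the caller owns it.
    public void seek(String consumer, int partition, long offset) {
        if (!consumer.equals(owner.get(partition))) {
            throw new IllegalStateException(consumer + " does not own partition " + partition);
        }
        position.put(partition, offset);
    }

    // Rewinding the whole group means every owner seeks each of its own partitions.
    public void rewindAll() {
        for (Map.Entry<Integer, String> e : owner.entrySet()) {
            seek(e.getValue(), e.getKey(), 0L);
        }
    }

    public long position(int partition) {
        return position.get(partition);
    }

    public static void main(String[] args) {
        GroupSeekModel group = new GroupSeekModel();
        group.assign(0, "c1", 40L);
        group.assign(1, "c2", 55L);
        group.assign(2, "c3", 70L);

        group.seek("c1", 0, 0L); // only partition 0 moves
        System.out.println(group.position(0) + " " + group.position(1) + " " + group.position(2));

        group.rewindAll(); // every partition is rewound by its own owner
        System.out.println(group.position(0) + " " + group.position(1) + " " + group.position(2));
    }
}
```

In this model a single seek leaves partitions 1 and 2 where they were, which is why one call on one consumer cannot restart the whole group.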

2
On

seek() rewinds only the partition you pass to it. The old high-level consumer API had no way to do this, but the new consumer provides the method

public void seek(TopicPartition partition,
    long offset);

It moves the consumer's position to the given offset in the topic and partition identified by the provided TopicPartition.

If you want to reset consumption for the whole group, you have to call seek with a TopicPartition and an offset for every partition, each on the consumer that currently owns that partition.

Or you can use offsetsForTimes to look up offsets by timestamp and seek to them, repeating this for every partition the group consumes:

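// Assumes "import static java.time.temporal.ChronoUnit.MINUTES;" and that the partition is assigned to this consumer.
Map<TopicPartition, Long> query = new HashMap<>();
query.put(new TopicPartition("topic-name", 0), Instant.now().minus(10, MINUTES).toEpochMilli());
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
// offsetsForTimes maps a partition to null when no record has a timestamp at or after the requested one.
result.forEach((tp, found) -> { if (found != null) consumer.seek(tp, found.offset()); });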

If you are using the older high-level consumer, there is currently no API to reset offsets from the consumer. The only way is to stop all consumers and manually reset the offsets for that consumer group in ZooKeeper.

However, the 0.11.0 release added a tool (the --reset-offsets option of kafka-consumer-groups) that resets offsets for a group at different scopes, such as topics and partitions. You can find the details in KIP-122: https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling and in KAFKA-4743: https://issues.apache.org/jira/browse/KAFKA-4743