Set group's offsets to latest with Python's confluent-kafka (one-time admin action)

Say that I have a topic my-topic and a group my-group, and that my clients use the confluent-kafka Python package. Consumers are configured with "auto.offset.reset": "earliest" to ensure that all messages are processed at least once. Now say I accidentally add 10,000 malformed messages to my-topic during early development. As a one-time administrative action, I want to seek my-group's offsets to the end of each partition so that I never see those messages again. I don't care if extra messages slip into the topic in the meantime and I end up skipping more messages than necessary. I also don't care whether other groups see those malformed messages. Is this kind of "fast-forwarding" possible at the group level, maybe with AdminClient? Or am I stuck with shutting down the existing consumers, writing a script that creates a consumer in the my-group group, consuming messages and committing offsets until those messages are gone, closing that consumer, and restarting my real consuming process?

There is 1 best solution below

You can refer to this example of setting the offset directly on the partitions: https://github.com/confluentinc/confluent-kafka-python/issues/145#issuecomment-284843254

For example:

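import confluent_kafka

# Offset to start from on every assigned partition. To skip to the end of
# each partition instead, use confluent_kafka.OFFSET_END here.
NEW_OFFSET = 666

if __name__ == '__main__':
    c = confluent_kafka.Consumer({
        "bootstrap.servers": "eden:9092",
        "group.id": "my-group",
    })

    def my_assign(consumer, partitions):
        # Override the starting offset before accepting the assignment.
        for p in partitions:
            p.offset = NEW_OFFSET
        print('assign', partitions)
        consumer.assign(partitions)

    c.subscribe(["my-topic"], on_assign=my_assign)

    try:
        while True:
            m = c.poll(1)
            if m is None:
                continue

            if m.error() is None:
                print('Received message', m)
            else:
                print('Error', m.error())
    except KeyboardInterrupt:
        # Stop with Ctrl+C.
        pass
    finally:
        # Leave the group cleanly (final offsets are committed when
        # auto-commit is enabled, which is the default).
        c.close()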
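
If you would rather not consume anything at all, a one-off script can look up the current end offset of each partition and commit it for the group directly. The following is only a sketch under a few assumptions: the function name fast_forward_group is made up for illustration, the broker address is the same placeholder as above, and I believe the group's regular consumers need to be stopped while it runs, since the broker may reject commits from a client that is not a member of an active group. It relies on Consumer.list_topics, Consumer.get_watermark_offsets and a synchronous Consumer.commit.

```python
def fast_forward_group(conf, topic, timeout=10.0):
    """Commit the current end offset of every partition of `topic` for the
    group named in conf["group.id"], without consuming any messages.

    Returns the committed offsets as {partition_id: offset}.
    """
    # Imported here so the function can be defined without the client installed.
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer(conf)
    try:
        # Topic metadata tells us which partitions exist.
        metadata = consumer.list_topics(topic, timeout=timeout)
        topic_metadata = metadata.topics[topic]
        if topic_metadata.error is not None:
            raise RuntimeError(f"cannot describe {topic}: {topic_metadata.error}")

        to_commit = []
        for partition_id in sorted(topic_metadata.partitions):
            tp = TopicPartition(topic, partition_id)
            # The high watermark is the offset the next produced message gets,
            # so committing it skips everything currently in the partition.
            _low, high = consumer.get_watermark_offsets(tp, timeout=timeout)
            to_commit.append(TopicPartition(topic, partition_id, high))

        # A synchronous commit returns the committed partitions, each with
        # an `error` attribute that is None on success.
        committed = consumer.commit(offsets=to_commit, asynchronous=False)
        failed = [tp for tp in committed if tp.error is not None]
        if failed:
            raise RuntimeError(f"commit failed for: {failed}")
        return {tp.partition: tp.offset for tp in committed}
    finally:
        consumer.close()
```

It would be called once, for example as fast_forward_group({"bootstrap.servers": "eden:9092", "group.id": "my-group"}, "my-topic"), after which the real consumers can be started again and should resume from the committed end offsets.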