Say that I have a topic my-topic and a group my-group, and that my clients use the confluent-kafka Python package. Consumers are configured with "auto.offset.reset": "earliest" to ensure that all messages are processed at least once. Now say I accidentally add 10,000 malformed messages to my-topic during early development. As a one-time administrative action, I want to seek my-group's offsets to the end of each partition so that I never see those messages again. I don't care if extra messages slip into the topic in the mean time, and I end up skipping more messages than necessary. I also don't care about whether other groups see those malformed messages. Is this kind of "fast-forwarding" possible to do at the group level, maybe with AdminClient? Or am I stuck with shutting down the existing consumers, writing a script to create a consumer in the my-group group, consuming messages and commiting offsets until those messages are gone, closing that consumer, and rebooting my real consuming process?


You can refer to this example of setting the offset directly on the partitions:

For example:

import confluent_kafka


if __name__ == '__main__':
    c = confluent_kafka.Consumer({
        "bootstrap.servers": "eden:9092", 

    def my_assign (consumer, partitions):
        for p in partitions:
            p.offset = NEW_OFFSET
        print('assign', partitions)

    c.subscribe(["my-topic"], on_assign=my_assign)

    while True:
        m = c.poll(1)
        if m is None:

        if m.error() is None:
            print('Received message', m)
