Which Partition to Use When Performing an Offset Seek Using AIOKafkaConsumer?

When making an AIOKafkaConsumer start reading messages from a specific offset, starting_offset, how do we know which partition to use?

I am trying to use the AIOKafkaConsumer.seek method, but it requires a TopicPartition to be specified.

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def main():
    topic = "test"
    starting_offset = 3
    
    # Publish some messages
    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()
    for i in range(10):
        await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))

    # Start consuming from a specific offset
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    consumer.seek(None, starting_offset)

    while True:
        message = await consumer.getone()
        print("message:", message.value)


if __name__ == "__main__":
    asyncio.run(main())

There is 1 best solution below

Does your topic have only one partition? If so, use partition 0 (partition numbers start at 0), i.e. TopicPartition(topic, 0). Otherwise, there is no straightforward answer.

Each partition has its own independent sequence of offsets. You could seek every partition to the same offset, but that offset is not guaranteed to exist in all of them, and you would first need to loop over the partition numbers (see partitions_for_topic(topic)) and seek each one individually.
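
A minimal sketch of that per-partition seek, under a few assumptions: the consumer has no group_id, so it is assigned every partition of the topic; the broker address is the one from the question; and an offset that does not exist in a partition is clamped into that partition's valid range. The clamping is my own choice for illustration, not something the consumer does for you. The names clamp_offset and consume_from are hypothetical helpers. The loop runs over consumer.assignment() rather than building TopicPartition objects from partitions_for_topic(topic), because seek only accepts partitions that are currently assigned.

```python
def clamp_offset(desired: int, beginning: int, end: int) -> int:
    """Pull `desired` into the range [beginning, end] of one partition."""
    return max(beginning, min(desired, end))


async def consume_from(topic: str, starting_offset: int,
                       bootstrap_servers: str = "localhost:29092") -> None:
    # Imported lazily so clamp_offset can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer

    # Assumption: no group_id, so this consumer gets all partitions of `topic`.
    consumer = AIOKafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    await consumer.start()
    try:
        partitions = list(consumer.assignment())  # TopicPartition objects
        beginning = await consumer.beginning_offsets(partitions)
        end = await consumer.end_offsets(partitions)

        # Offsets are per partition, so each partition is sought individually.
        for tp in partitions:
            target = clamp_offset(starting_offset, beginning[tp], end[tp])
            consumer.seek(tp, target)

        while True:
            message = await consumer.getone()
            print(message.partition, message.offset, message.value)
    finally:
        await consumer.stop()
```

It could be run with asyncio.run(consume_from("test", 3)). For a topic with a single partition, the loop reduces to one call equivalent to consumer.seek(TopicPartition(topic, 0), starting_offset).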