When making an AIOKafkaConsumer start reading messages from a specific offset, starting_offset, how do we know which partition to use?
I am trying to use the AIOKafkaConsumer.seek method, but it requires a TopicPartition to be specified.
import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def main():
    topic = "test"
    starting_offset = 3

    # Publish some messages
    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()
    for i in range(10):
        await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))

    # Start consuming from a specific offset
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    consumer.seek(None, starting_offset)
    while True:
        message = await consumer.getone()
        print("message:", message.value)


if __name__ == "__main__":
    asyncio.run(main())
Does your topic have only one partition? If so, use partition 0 (partition numbers start at zero), i.e. seek on TopicPartition(topic, 0).

Otherwise, there is no straightforward answer. Each partition has its own independent offsets. You could seek all partitions to the same offset, but that offset is not guaranteed to exist in every partition, and you would first need to loop over the partition numbers (see partitions_for_topic(topic)) and seek each one individually.
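
As a rough sketch of seeking each partition individually: the code below builds a TopicPartition for every partition number returned by partitions_for_topic(topic), then seeks each one. The helper names clamp_offset and seek_all are hypothetical, not part of aiokafka. Clamping the requested offset into the range reported by beginning_offsets and end_offsets is just one possible policy for partitions where the offset does not exist. It also assumes the consumer has no group_id, so all partitions of the subscribed topic are assigned to it once start() returns.

```python
import asyncio


def clamp_offset(requested, beginning, end):
    """Clamp a requested offset into a partition's valid range [beginning, end]."""
    return max(beginning, min(requested, end))


async def seek_all(consumer, partitions, starting_offset):
    """Seek each partition to starting_offset, clamped to what the partition holds."""
    beginning = await consumer.beginning_offsets(partitions)
    end = await consumer.end_offsets(partitions)
    for tp in partitions:
        consumer.seek(tp, clamp_offset(starting_offset, beginning[tp], end[tp]))


async def main():
    # Imported here so the helpers above can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer, TopicPartition

    topic = "test"
    starting_offset = 3

    # Assumption: no group_id, so every partition of the topic is assigned to us.
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    try:
        partitions = [
            TopicPartition(topic, p)
            for p in sorted(consumer.partitions_for_topic(topic))
        ]
        await seek_all(consumer, partitions, starting_offset)
        while True:
            message = await consumer.getone()
            print("message:", message.partition, message.offset, message.value)
    finally:
        await consumer.stop()


# To try it against a running broker: asyncio.run(main())
```

With a single-partition topic this reduces to one seek on TopicPartition(topic, 0). Note that the same numeric offset refers to unrelated messages in different partitions, so this only makes sense if "offset N in every partition" is really what you want.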