Aiokafka consumers process events in parallel


I'm just getting started with Kafka. I have a k8s cluster in which I want to deploy event listeners. When I have a single listener running, everything works fine, but with several pods the same event is processed in parallel by each of them. I would like each event to be processed only once. How can I achieve this?

My listener code:

import asyncio, settings, json
from aiokafka import AIOKafkaConsumer


# table_create_event / table_delete_event are handler coroutines defined
# elsewhere in the project (not shown in the question).
event_handler = {
    "table_create": table_create_event,
    "table_delete": table_delete_event,
}

consumer_config = [
    {
        "name": "main consumer1",
        "topics": ["storage_create", "storage_update", "storage_delete",
                   "table_create", "table_update", "table_delete",
                   "field_create", "field_update", "field_delete",
                   "value_create", "value_update", "value_delete"],
        "group_id": "cms_events"
    },
    {
        "name": "main consumer2",
        "topics": ["storage_create", "storage_update", "storage_delete",
                   "table_create", "table_update", "table_delete",
                   "field_create", "field_update", "field_delete",
                   "value_create", "value_update", "value_delete"],
        "group_id": "cms_events"
    }
]


async def consume(topics, group_id):
    consumer = AIOKafkaConsumer(
        *topics,
        bootstrap_servers='localhost:9092',
        group_id=group_id,
        auto_offset_reset="earliest",
        metadata_max_age_ms=30000,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(
                "{}:{:d}:{:d}: key={} value={} timestamp_ms={}".format(
                    msg.topic, msg.partition, msg.offset, msg.key, msg.value,
                    msg.timestamp)
            )
            topic = msg.topic
            encode_event_body = msg.value
            decode_event_body = json.loads(encode_event_body)
            try:
                await event_handler[topic](decode_event_body)
            except Exception as exc:
                print(exc)
    finally:
        await consumer.stop()


async def main():
    await asyncio.gather(*[
            consume(topics=consumer.get("topics"), group_id=consumer.get("group_id"))
            for consumer in consumer_config
        ]
    )


if __name__ == "__main__":
    asyncio.run(main())

1 Answer


For each event to be processed only once, the different consumers must belong to the same consumer group, i.e. share the same group_id. Kafka then assigns each partition of a topic to exactly one consumer in the group, so a given message is delivered to only one group member.
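Whether extra pods can share the load therefore depends on how many partitions each topic has. You can inspect that with the stock Kafka CLI (a sketch; the topic name and the `--bootstrap-server` address are assumptions, adjust them to your cluster):

```shell
# Print the partition count and leader/replica layout for one topic.
kafka-topics.sh --describe --topic table_create --bootstrap-server localhost:9092
```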

It follows from the above that the number of consumers in a given consumer group that actively receive messages is at most the number of partitions in the consumed topic; any consumers beyond that sit idle.
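The consumers-vs-partitions rule can be illustrated with a tiny sketch. This is not aiokafka's real partition assignor, just a round-robin illustration of how partitions spread over group members, and of why a group with more consumers than partitions leaves some consumers idle:

```python
# Illustrative round-robin assignment of a topic's partitions to the
# consumers in one group (a sketch, not Kafka's actual assignor).
def assign_partitions(partitions, consumers):
    """Map each consumer id to the list of partitions it would own."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# A topic with 3 partitions and 2 pods in the same group: both pods work.
print(assign_partitions([0, 1, 2], ["pod-a", "pod-b"]))
# → {'pod-a': [0, 2], 'pod-b': [1]}

# With 4 pods and only 3 partitions, one pod gets nothing to consume.
print(assign_partitions([0, 1, 2], ["pod-a", "pod-b", "pod-c", "pod-d"]))
```

So if you want all of your pods to share the work, make sure each topic has at least as many partitions as you run pods.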