python aiokafka many consumers to many producers

508 Views Asked by At

I use aiokafka to consume from, filter message fields and produce messages back to kafka. I run 4 async consumers which put messages to async queue. Then single process consumes that queue and produces to async output_queue. Multiple produces consume from async output_queue and send back to kafka.

I wanted to achieve solution so I would have:

MANY consumers >> processor >> MANY producers.

I would like to solve problem with consumers/producers first before I focus on processor.

The problem I experience is that the code produces slowly like 50 messages per second. I have a stream of 100k messages so I must have some bug in the code.

How can I fix it?

import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json

BROKERS = [
    "BROKER0:PORT",
    "BROKER1:PORT",
    "BROKER2:PORT",
]

GROUP_ID = "group_id"
TOPIC_INPUT = "topic_input"
TOPIC_OUTPUT = "topic_output"


async def consume(queue):
    consumer = AIOKafkaConsumer(
        TOPIC_INPUT,
        bootstrap_servers=BROKERS,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        group_id=GROUP_ID,
        auto_offset_reset="latest"
    )
    await consumer.start()
    try:
        async for message in consumer:
            processed_message = {
                "timestamp": message.timestamp,
                "col1": message.value["col1"],
                "col2": message.value["col2"],
                "col3": message.value["col3"],
            }
            await queue.put(processed_message)
    finally:
        await consumer.stop()


async def process_message(message):
    print(message)
    return message


async def process_messages(queue, output_queue):
    while True:
        message = await queue.get()
        processed_message = await process_message(message)
        await output_queue.put(processed_message)
        queue.task_done()


# async def produce(output_queue):
#     producer = AIOKafkaProducer(
#         bootstrap_servers=BROKERS,
#         value_serializer=lambda m: json.dumps(m).encode('utf-8')
#     )
#     await producer.start()
#     try:
#         while True:
#             message = await output_queue.get()
#             print(message)
#             await producer.send_and_wait(TOPIC_OUTPUT, message)
#             output_queue.task_done()
#     finally:
#         await producer.stop()


async def main():
    queue = asyncio.Queue(maxsize=1000000)
    output_queue = asyncio.Queue(maxsize=1000000)

    consumers = [asyncio.create_task(consume(queue)) for i in range(4)]
    # producers = [asyncio.create_task(produce(output_queue)) for i in range(3)]
    process_task = asyncio.create_task(process_messages(queue, output_queue))

    # await asyncio.gather(*consumers, *producers, process_task)
    await asyncio.gather(*consumers, process_task)



if __name__ == '__main__':
    asyncio.run(main())
0

There are 0 best solutions below