I use aiokafka to consume from, filter message fields and produce messages back to kafka. I run 4 async consumers which put messages to async queue. Then single process consumes that queue and produces to async output_queue. Multiple produces consume from async output_queue and send back to kafka.
I wanted to achieve solution so I would have:
MANY consumers >> processor >> MANY producers.
I would like to solve problem with consumers/producers first before I focus on processor.
The problem I experience is that the code produces slowly like 50 messages per second. I have a stream of 100k messages so I must have some bug in the code.
How can I fix it?
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
BROKERS = [
"BROKER0:PORT",
"BROKER1:PORT",
"BROKER2:PORT",
]
GROUP_ID = "group_id"
TOPIC_INPUT = "topic_input"
TOPIC_OUTPUT = "topic_output"
async def consume(queue):
consumer = AIOKafkaConsumer(
TOPIC_INPUT,
bootstrap_servers=BROKERS,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id=GROUP_ID,
auto_offset_reset="latest"
)
await consumer.start()
try:
async for message in consumer:
processed_message = {
"timestamp": message.timestamp,
"col1": message.value["col1"],
"col2": message.value["col2"],
"col3": message.value["col3"],
}
await queue.put(processed_message)
finally:
await consumer.stop()
async def process_message(message):
print(message)
return message
async def process_messages(queue, output_queue):
while True:
message = await queue.get()
processed_message = await process_message(message)
await output_queue.put(processed_message)
queue.task_done()
# async def produce(output_queue):
# producer = AIOKafkaProducer(
# bootstrap_servers=BROKERS,
# value_serializer=lambda m: json.dumps(m).encode('utf-8')
# )
# await producer.start()
# try:
# while True:
# message = await output_queue.get()
# print(message)
# await producer.send_and_wait(TOPIC_OUTPUT, message)
# output_queue.task_done()
# finally:
# await producer.stop()
async def main():
queue = asyncio.Queue(maxsize=1000000)
output_queue = asyncio.Queue(maxsize=1000000)
consumers = [asyncio.create_task(consume(queue)) for i in range(4)]
# producers = [asyncio.create_task(produce(output_queue)) for i in range(3)]
process_task = asyncio.create_task(process_messages(queue, output_queue))
# await asyncio.gather(*consumers, *producers, process_task)
await asyncio.gather(*consumers, process_task)
if __name__ == '__main__':
asyncio.run(main())