Problem : aiokafka consumer starving the fastapi endpoint, due to which our kubernetes liveness probes are failing and any other service calling the exposed endpoints are getting timed out.
Details :
There is a kafka consumer, which starts during fastapi startup event, and keep on listening to the particular topic.
And then there is fastapi endpoint which serves the request.
When there are lot messages in kafka topic partion, kafka consumer starving the eventloop and the requests served by fastapi endpoints are timing out.
How can we solve this problem?
#all the imports
consumer = None
consumer_task = None
log = None
def get_application():
#initialize fastapi app and with different routes and do some stuff
return app
app = get_application()
@app.on_event("startup")
async def startup_event():
#initialize consumer
await initialize()
# start consuming
await consume()
@app.on_event("shutdown")
async def shutdown_event():
#close consumer
async def initialize():
#initilize
# get cluster layout and join group
await consumer.start()
await consumer.seek_to_committed()
async def consume():
global consumer_task
loop = asyncio.get_event_loop()
consumer_task = loop.create_task(send_consumer_message(consumer))
async def send_consumer_message(consumer):
try:
# consume messages
async for msg in consumer:
#do message processing
except Exception as e:
log.info(f"message consuming failed withe error: {repr(e)}")
finally:
log.warning("stopping consumer")
await consumer.stop()