python fastapi, aiokafka starvation problem

631 Views Asked by At

Problem : aiokafka consumer starving the fastapi endpoint, due to which our kubernetes liveness probes are failing and any other service calling the exposed endpoints are getting timed out.

Details :

There is a kafka consumer, which starts during fastapi startup event, and keep on listening to the particular topic.

And then there is fastapi endpoint which serves the request.

When there are lot messages in kafka topic partion, kafka consumer starving the eventloop and the requests served by fastapi endpoints are timing out.

How can we solve this problem?

#all the imports



consumer = None
consumer_task = None

log = None

def get_application():
    #initialize fastapi app and with different routes and do some stuff
    return app

app = get_application()

@app.on_event("startup")
async def startup_event():
    #initialize consumer
    await initialize()
    # start consuming
    await consume()

@app.on_event("shutdown")
async def shutdown_event():
    #close consumer

async def initialize():
    #initilize
    # get cluster layout and join group
    await consumer.start()
    await consumer.seek_to_committed()

async def consume():
    global consumer_task
    loop = asyncio.get_event_loop()
    consumer_task = loop.create_task(send_consumer_message(consumer))


async def send_consumer_message(consumer):
    try:
        # consume messages
        async for msg in consumer:
            #do message processing
    except Exception as e:
        log.info(f"message consuming failed withe error: {repr(e)}")
    finally:
        log.warning("stopping consumer")
        await consumer.stop()
0

There are 0 best solutions below