I wrote an aio_pika consumer task that is supposed to run forever in a FastAPI app. This task is part of a manager object which implements a pub/sub pattern:
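```python
import asyncio
import json
import logging

from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage

logger = logging.getLogger(__name__)
# `settings` (providing `amqp_url`) is the application's own config object, not shown here.


class MyManager(object):
    def __init__(self):
        # Future resolved by the next publish(); must be created while a loop is running.
        self.waiter = asyncio.Future()

    def publish(self, value):
        # Resolve the current waiter with (value, next_waiter) and install the next one.
        waiter, self.waiter = self.waiter, asyncio.Future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        # Follow the chain of futures, yielding every value published from now on.
        waiter = self.waiter
        while True:
            value, waiter = await waiter
            yield value

    __aiter__ = subscribe

    async def on_message(self, message: AbstractIncomingMessage) -> None:
        try:
            async with message.process():
                # do whatever deserialization of the received item
                item = json.loads(message.body)
                # share the item with subscribers
                self.publish(item)
        except Exception as e:
            logger.error(e, exc_info=True)

    async def run(self):
        connection = await connect_robust(
            settings.amqp_url,
            loop=asyncio.get_running_loop()
        )
        try:
            channel = await connection.channel()
            my_queue = await channel.get_queue('my-queue')
            await my_queue.consume(self.on_message)
            # Block forever; only cancellation (e.g. app shutdown) gets past this line.
            await asyncio.Future()
        finally:
            await connection.close()
```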
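To see how `publish`/`subscribe` fan values out, the future chain can be exercised without a broker. The sketch below is a standalone re-implementation of the same logic under a hypothetical class name, `FutureChainBroadcaster`; it uses `loop.create_future()` instead of `asyncio.Future()` but is otherwise equivalent. Each `publish` resolves the current future with `(value, next_future)`, so every subscriber that grabbed a future before the publish walks the same chain and receives every later value, even when several values are published before it gets to run.

```python
import asyncio


class FutureChainBroadcaster:
    """Same publish/subscribe logic as MyManager, without the AMQP part (hypothetical name)."""

    def __init__(self):
        self.waiter = asyncio.get_running_loop().create_future()

    def publish(self, value):
        # Resolve the current waiter with (value, next_waiter) and swap in the
        # fresh future that the next publish() will resolve.
        waiter, self.waiter = self.waiter, asyncio.get_running_loop().create_future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        waiter = self.waiter  # start at the current end of the chain
        while True:
            value, waiter = await waiter
            yield value


async def demo():
    b = FutureChainBroadcaster()
    received = {"a": [], "b": []}

    async def consume(name, n):
        # Read n values from the chain, then stop.
        async for value in b.subscribe():
            received[name].append(value)
            if len(received[name]) == n:
                return

    consumers = [asyncio.create_task(consume("a", 3)), asyncio.create_task(consume("b", 3))]
    await asyncio.sleep(0)  # let both subscribers grab the current waiter first
    for i in range(3):
        b.publish(i)  # published back to back, without yielding in between
    await asyncio.gather(*consumers)
    return received


print(asyncio.run(demo()))  # {'a': [0, 1, 2], 'b': [0, 1, 2]}
```

A subscriber only sees values published after it read `self.waiter`, and a slow subscriber keeps the whole unread tail of the chain alive in memory.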
This consumer task is created during startup of the FastAPI app:
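```python
import asyncio

# `app` is the FastAPI application; MyManager is the class above.
# Resolved with the MyManager instance once the event loop is running.
my_manager = asyncio.Future()

@app.on_event("startup")
async def on_startup():
    my_manager.set_result(MyManager())
    task = asyncio.create_task((await my_manager).run())
```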
Note that the manager is only instantiated during on_startup to ensure that there is an existing asyncio loop.
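As a diagnostic aid, here is a minimal sketch, not the original code, of starting the consumer so that it cannot vanish silently. It relies on two documented asyncio behaviors: the event loop keeps only weak references to tasks, so once the local `task` variable in `on_startup` goes out of scope nothing else is guaranteed to hold the task; and an exception in a task that is never awaited is only reported, if at all, when the task is garbage-collected. The names `background_tasks`, `start_background` and `_log_task_result` are hypothetical.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Strong references to long-running background tasks (hypothetical helper).
# The event loop only keeps weak references to tasks, so this set is what
# keeps a pending task alive.
background_tasks = set()


def _log_task_result(task):
    """Done-callback: drop the reference and log how the task ended."""
    background_tasks.discard(task)
    if task.cancelled():
        logger.info("background task %s was cancelled", task.get_name())
        return
    exc = task.exception()  # also marks the exception as retrieved
    if exc is not None:
        logger.error("background task %s crashed", task.get_name(), exc_info=exc)
    else:
        logger.warning("background task %s returned unexpectedly", task.get_name())


def start_background(coro, name=None):
    """Wrap `coro` in a task that stays referenced and logs its outcome."""
    task = asyncio.create_task(coro, name=name)
    background_tasks.add(task)
    task.add_done_callback(_log_task_result)
    return task
```

In `on_startup`, the consumer would then be started with `start_background((await my_manager).run(), name="amqp-consumer")`, so a crash or an unexpected return shows up in the logs instead of disappearing.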
Unfortunately, the task stops working after a few weeks or months. I was unable to log what event caused this. I am not sure whether the task crashes or whether the connection to the AMQP server drops without ever reconnecting, and I am not even sure how or where to catch and log the issue.
What could cause this issue, and how can I fix it?
As additional context, the manager is used in a Server-Sent Events route:
```python
import asyncio

from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse

router = APIRouter()


@router.get('/items')
async def items_stream(request: Request):
    async def event_publisher():
        try:
            # `my_manager` is the module-level future resolved in on_startup.
            aiter = (await my_manager).__aiter__()
            while True:
                task = asyncio.create_task(aiter.__anext__())
                event = await asyncio.shield(task)
                yield dict(data=event)
        except asyncio.CancelledError:
            print(f'Disconnected from client (via refresh/close) {request.client}')
            raise

    return EventSourceResponse(event_publisher())
```
The async iterator is shielded to prevent the issue described here.
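The effect of the `asyncio.shield` call in the route above can be checked in isolation: cancelling the coroutine that awaits the shield (for example when the SSE client disconnects) cancels only the awaiting side, while the shielded task keeps running to completion. The sketch below uses `asyncio.sleep` as a stand-in for `aiter.__anext__()`; `demo_shield` is a hypothetical name.

```python
import asyncio


async def demo_shield():
    # Stand-in for the shielded `aiter.__anext__()` task.
    inner = asyncio.create_task(asyncio.sleep(0.05, result="done"))

    async def waiter():
        return await asyncio.shield(inner)

    outer = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let `outer` start awaiting the shield
    outer.cancel()          # e.g. the client disconnects
    try:
        await outer
    except asyncio.CancelledError:
        pass
    # Only the awaiting side was cancelled; the shielded task finishes normally.
    return outer.cancelled(), inner.cancelled(), await inner


print(asyncio.run(demo_shield()))  # (True, False, 'done')
```

One consequence worth keeping in mind is that, after a disconnect, the shielded `__anext__` task stays pending until the next `publish` resolves it.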
While I did not clearly identify which exception was raised, I hardened the connection so that the task reconnects in case of error:
This has now been running continuously for a month without any permanent disconnection.