In my case I am registering consumers on aio-pika:
async def on_message(message: AbstractIncomingMessage) -> None:
async with message.process():
print(f" [x] Received message {message!r}")
print(f" Message body is: {message.body!r}")
async def main() -> None:
# Perform connection
connection = await connect_robust("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
"task_queue",
durable=True,
)
# Start listening the queue with name 'task_queue'
await queue.consume(on_message)
# Prevent process from exiting
await anyio.sleep_forever()
if __name__ == "__main__":
anyio.run(main)
I can't see any problem, but if feels like a hack and that should have another way, a proper way.