I am trying to create an SSE server Quart application that listens to notifications from a postgresql instance (and then sends an SSE message to registered EventSources).
Have gotten a simplified version of https://pgjones.gitlab.io/quart/tutorials/broadcast_tutorial.html to work, where
@app.route('/sse')
async def sse():
async def send_events():
while True:
await asyncio.sleep(60)
event = ServerSentEvent('heartbeat')
yield event.encode()
works for more than one client (tab) at the same time (declaring that route the EventSource in the client js.) . The complete example sends messages after a post to a different route though which isn't my goal.
Logically the app should connect once to the postgresql instance and listen for notifications and then those EventSources subscribed to /sse should periodically receive SSE messages.
Read https://www.psycopg.org/psycopg3/docs/advanced/async.html?highlight=async and tried
@app.before_serving
async def initialize():
app.db_conn = await psycopg.AsyncConnection.connect(connectionstring, autocommit=True)
curs = app.db_conn.cursor()
await curs.execute("LISTEN ow_insert_event")
together with
@app.route('/sse')
async def sse():
async def send_events():
while True:
gen = app.db_conn.notifies()
async for notify in gen:
event = ServerSentEvent(notify)
yield event.encode()
which works for one client once if two clients simultaneously; the second keeps listening and the first receives no more messages even if new notifications are received by the app. If there's just one client however it works continually, one notifies() generator and one consumer of said generator.
With two clients two generators are created but only one of them seems to catch notify's, maybe this is intended? I haven't seen anything in the psycopg3 docs about several notifies() async generators for the same connection (several sync generators and blocking for loops make less sense)
If I let before_serving create the async generator with .notifies() so that all clients at /sse use the same generator then the issue is -- correction -- not the same: "RuntimeError: anext(): asynchronous generator is already running" happens then. Further reading suggests it isn't possible to consume the same async generator in parallel: https://twistedmatrix.com/trac/ticket/9805
Would appreciate if could receive highlights of any obvious errors.
If there's only meant to be one notifies() async generator for one db connection with psycopg3 then the /sse route cannot consume this generator directly. (The "async for notify in generator:" doesn't work well in @app.before_serving as it prevents the app from starting)