I have a simple Python backend using falcon and websockets. If a client makes a call to an endpoint (e.g., to submit data) all other connected clients are notified via their respective websocket connection, i.e., the backend makes a broadcast to all currently connected clients. In general, this works just fine. Here's the minimal script for the falcon app
import falcon
from db.dbmanager import DBManager
from ws.wsserver import WebSocketServer
from api.resources.liveqa import DemoResource
dbm = DBManager() # PostgreSQL connection pool; works fine with multiple workers
wss = WebSocketServer() # Works only with 1 worker
app = falcon.App()
demo_resource = DemoResource(dbm, wss)
app.add_route('/api/v1/demo', demo_resource)
And here is the code for the websockets server which I instantiate and pass the resource class:
import json
import asyncio
import websockets
import threading
class WebSocketServer:
def __init__(self):
self.clients = {}
self.start_server()
async def handler(self, ws, path):
session_id = path.split('/')[-1]
if session_id in self.clients:
self.clients[session_id].add(ws)
else:
self.clients[session_id] = {ws}
try:
async for msg in ws:
pass # The clients are not supposed to send anything
except websockets.ConnectionClosedError:
pass
finally:
self.clients[session_id].remove(ws)
async def send(self, client, msg):
await client.send(msg)
def broadcast(self, session_id, msg):
if session_id not in self.clients:
return
for client in self.clients[session_id]:
try:
asyncio.run(self.send(client, json.dumps(msg)))
except:
pass
def start_server(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.handler, host='111.111.111.111', port=5555)
asyncio.get_event_loop().run_until_complete(start_server)
threading.Thread(target=asyncio.get_event_loop().run_forever).start()
I use Gunicorn as server for the backend, and it works if I use just 1 worker. However, if I try --workers 2 I get the error that port 5555 is already in use. I guess this makes sense as each worker is trying to create a WebSocketServer instance using the same ip/port-pair.
What is the best / cleanest / most phytonic way to address this? I assume that I have to ensure that only one WebSocketServer instance is created. But how?
On a side note, I assume that a DBManager instance get created for each worker as well. While it doesn't throw an error as there can be multiple connections pools, I guess ensuring a single instance of DBManager is also the preferred way.
First of all, even running with one worker is potentially problematic, because Gunicorn is primarily a pre-forking server, and forking a process with threads is, in general, unsafe and may lead to unpredictable results.
One way to solve this is to use Gunicorn's server hooks to only start a thread (in this case a WebSocket server) in one of the workers, and only do that after forking. For instance,
This simplistic prototype is not infallible, as we should also handle server reloads, the worker getting killed, etc. Speaking of which, you should probably use another worker type than
syncfor potentially long running requests, or set a long timeout, because otherwise the worker can get killed, taking the WebSocket thread with it. Specifying a number of threads should automatically change your worker type intogthread.Note that here I implemented a custom Gunicorn application, but you could achieve the same effect by specifying hooks via a configuration file.
Another option is to use the ASGI flavour of Falcon, and implement even the WebSocket part inside your app:
Note that Gunicorn itself does not "speak" ASGI, so you would either need to use an ASGI app server, or use Gunicorn as a process manager for Uvicorn workers. For instance, assuming your file is called
test.py, you could run Uvicorn directly as:However, if you went the ASGI route, you would need to implement your responders as coroutine functions (
async def on_get(...)etc), or run your synchronous DB code in a threadpool executor.