My objective is to have a stateful app that is optimized for latency, albeit going against the 12-factor pattern. Thus, I need a state manager that can safely handle concurrency/race conditions to a certain extent. Take a look at the example below:
import copy
from fastapi import FastAPI
from multiprocessing import Lock
app = FastAPI()
# Install the lock on app.state and create StateManager.global_instance.
# NOTE(review): this runs at import time, so the StateManager class has to be
# defined (or imported) before this line is reached, and _bootstrap must be
# callable on the class itself.
StateManager._bootstrap(app)
@app.get("/path1")
async def handler1():
    """Initialise the "some-state" counter to 0 if it is unset, then return it."""
    manager = StateManager.global_instance
    not_initialised = manager.get("some-state") is None
    if not_initialised:
        manager.set("some-state", 0)
    current = manager.get("some-state")
    return {"status": current}
@app.get("/path2")
async def handler2():
    """Increment the "some-state" counter and return its stored value.

    A counter that has not been initialised yet is treated as 0, so the
    first call returns 1 instead of failing on ``None += 1``.
    """
    states = StateManager.global_instance
    counter = states.get("some-state")
    if counter is None:
        # get() returns None for a key that was never set.
        counter = 0
    # get() and set() take the lock separately, so the lock does not cover
    # this read-modify-write as a whole. There is no await between the two
    # calls in this coroutine.
    # NOTE(review): anything running outside this event loop (threads, other
    # processes) could still interleave here - confirm the deployment model.
    counter += 1
    states.set("some-state", counter)
    return {"status": states.get("some-state")}
class StateManager:
    """Lock-guarded key/value facade over ``app.state``.

    Values are deep-copied on the way in and on the way out, so callers
    never hold a reference to the stored object. Every ``get`` and ``set``
    runs while holding ``app.state.StateManagerLock``, which ``_bootstrap``
    creates.

    NOTE(review): the lock is a ``multiprocessing.Lock`` created when
    ``_bootstrap`` runs in the importing process. Presumably each uvicorn
    worker process imports the module itself and therefore gets its own lock
    and its own ``app.state`` - confirm against the uvicorn docs before
    relying on this for cross-worker synchronisation.
    NOTE(review): acquiring the lock is a blocking call; inside ``async``
    handlers it blocks the event loop while waiting - confirm this is
    acceptable.
    """

    def __init__(self, app):
        """Bind the manager to ``app.state``."""
        self._state = app.state  # starlette shared state

    def get(self, key):
        """Return a deep copy of the value stored under *key*, or None if unset."""
        with self._state.StateManagerLock:
            try:
                stored = getattr(self._state, key)
            except AttributeError:
                # Only a missing key maps to None; errors raised while
                # copying the value are allowed to propagate.
                return None
            return copy.deepcopy(stored)

    def set(self, key: str, value: object):
        """Store a deep copy of *value* under *key* and return self for chaining."""
        # The context manager releases the lock even if deepcopy raises.
        with self._state.StateManagerLock:
            setattr(self._state, key, copy.deepcopy(value))
        return self

    @classmethod
    def _bootstrap(cls, app):
        """Create the lock on ``app.state`` and publish ``cls.global_instance``."""
        app.state.StateManagerLock = Lock()
        cls.global_instance = cls(app=app)
Am I right to assume that `state.StateManagerLock` is the same lock that is used when this script is executed with uvicorn with `--workers` set to >1? That is to say, is `set` atomic, while the `get` method takes a read lock that is common to the uvicorn worker pool?