FastAPI protecting shared variable with multiprocessing lock

80 Views Asked by At

My objective is a stateful app that is optimized for latency, even though this goes against the twelve-factor app pattern. I therefore need a state manager that can safely handle concurrent access and race conditions, at least to a certain extent. Take a look at the example below:

import copy
from fastapi import FastAPI
from multiprocessing import Lock


app = FastAPI()
# Creates the lock on app.state and publishes StateManager.global_instance.
# NOTE(review): this call needs StateManager to be defined already, but the
# class appears later in this module -- move the call below the class
# definition.
StateManager._bootstrap(app)

@app.get("/path1")
async def handler1():
    """Initialise the ``"some-state"`` counter to 0 if unset, then return it.

    Returns:
        ``{"status": <current counter value>}``.
    """
    manager = StateManager.global_instance
    key = "some-state"
    current = manager.get(key)
    if current is None:
        # First hit: seed the counter.
        manager.set(key, 0)
    # Read back through the manager so the response reflects the stored value.
    return {"status": manager.get(key)}

@app.get("/path2")
async def handler2():
    """Increment the ``"some-state"`` counter and return its new value.

    A counter that has not been initialised yet (``get`` returns ``None``)
    is treated as 0, so the first increment no longer fails with a
    ``TypeError`` on ``None += 1``.

    Returns:
        ``{"status": <counter value read back after the update>}``.
    """
    states = StateManager.global_instance
    counter = states.get("some-state")
    if counter is None:
        counter = 0
    # get() and set() each take the lock on their own, so this
    # read-modify-write is not a single critical section.
    states.set("some-state", counter + 1)
    return {"status": states.get("some-state")}

class StateManager:
    """Lock-guarded key/value facade over ``app.state``.

    Values are deep-copied on the way in and on the way out, so callers never
    share a mutable object with the stored state. Every access holds
    ``app.state.StateManagerLock``, which ``_bootstrap`` creates.

    NOTE(review): the lock and the state belong to the process that ran
    ``_bootstrap``. Whether separate server worker processes share them
    depends on how those workers are started -- confirm before relying on
    cross-worker atomicity.
    """

    def __init__(self, app):
        """Bind the manager to ``app.state``.

        Args:
            app: Application object exposing a ``state`` attribute namespace
                that already carries ``StateManagerLock``.
        """
        self._state = app.state  # starlette shared state

    def get(self, key):
        """Return a deep copy of the value stored under ``key``.

        Args:
            key: Attribute name on the shared state.

        Returns:
            A deep copy of the stored value, or ``None`` if ``key`` is unset.
        """
        with self._state.StateManagerLock:
            # Only the lookup is guarded: an AttributeError raised while
            # copying must not be mistaken for a missing key.
            try:
                stored = getattr(self._state, key)
            except AttributeError:
                return None
            # Copy while still holding the lock so the snapshot is consistent.
            return copy.deepcopy(stored)

    def set(self, key: str, value: object):
        """Store a deep copy of ``value`` under ``key``.

        Args:
            key: Attribute name on the shared state.
            value: Object to store; it is deep-copied before being stored.

        Returns:
            ``self``, so calls can be chained.
        """
        # ``with`` guarantees the lock is released even if deepcopy raises.
        with self._state.StateManagerLock:
            setattr(self._state, key, copy.deepcopy(value))
        return self

    @classmethod
    def _bootstrap(cls, app):
        """Create the lock on ``app.state`` and publish ``cls.global_instance``.

        Args:
            app: Application object exposing a ``state`` attribute namespace.
        """
        app.state.StateManagerLock = Lock()
        cls.global_instance = cls(app=app)
        

Am I right to assume that app.state.StateManagerLock is one and the same lock for every worker when this script is run by uvicorn with --workers set to more than 1? That is to say, is set atomic, and does the get method take a read lock that is common to the whole uvicorn worker pool?

0

There are 0 best solutions below