FastAPI `run_in_threadpool` getting stuck


I have implemented all my routes using async and have followed all the guidelines from the FastAPI documentation.

Every route makes multiple DB calls. The DB client has no async support, so these are normal functions like this:

def db_fetch(query):
    # I take a few seconds to respond
    return 

To avoid blocking the event loop, I call them through fastapi.concurrency.run_in_threadpool.

Now the issue is that when a large number of requests come in, new requests get blocked. Even if I close the browser tab (cancelling the request), the entire app stays stuck until the older requests have been processed.

What am I doing wrong here?

I use uvicorn as my ASGI server and run in a Kubernetes cluster with 2 replicas.

A few suspects: am I spawning too many threads? Is it a bug in uvicorn? Not really sure!


There are 2 best solutions below


As you suspected, the issue is with the threads. Under the hood, FastAPI uses Starlette, which in turn uses AnyIO's to_thread.run_sync. As described here, too many threads can cause problems, and you can guard against that with a semaphore that sets an upper bound on the number of threads running at once. In code, that would read roughly like this:

# Core Library
from typing import TypeVar, Callable
from typing_extensions import ParamSpec
# Third party
from anyio import Semaphore
from starlette.concurrency import run_in_threadpool

# To avoid having too many threads running at once (which can happen when there
# are many concurrent requests), we limit them with a semaphore.
MAX_CONCURRENT_THREADS = 10
MAX_THREADS_GUARD = Semaphore(MAX_CONCURRENT_THREADS)
T = TypeVar("T")
P = ParamSpec("P")


async def run_async(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
    async with MAX_THREADS_GUARD:
        return await run_in_threadpool(func, *args, **kwargs)
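
To see the effect of such a guard in isolation, here is a minimal sketch that uses only the standard library (asyncio.Semaphore and asyncio.to_thread standing in for the AnyIO semaphore and run_in_threadpool above). The names slow_call, guarded and MAX_CONCURRENT are hypothetical and exist only for this demo. It records how many blocking calls are in flight at the same time.

```python
import asyncio
import threading
import time

MAX_CONCURRENT = 3  # hypothetical limit, chosen only for this demo

_lock = threading.Lock()
_active = 0  # blocking calls currently running
_peak = 0    # highest value _active has reached


def slow_call(i: int) -> int:
    """Blocking stand-in for a slow DB query; tracks peak concurrency."""
    global _active, _peak
    with _lock:
        _active += 1
        _peak = max(_peak, _active)
    time.sleep(0.05)
    with _lock:
        _active -= 1
    return i


async def guarded(sem: asyncio.Semaphore, i: int) -> int:
    # Wait for a free slot before handing the call to a worker thread.
    async with sem:
        return await asyncio.to_thread(slow_call, i)


async def main() -> tuple[list[int], int]:
    # The semaphore is created inside the running event loop.
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    results = await asyncio.gather(*(guarded(sem, i) for i in range(10)))
    return list(results), _peak


results, peak = asyncio.run(main())
print(results, peak)
```

All ten calls complete, but the recorded peak never exceeds MAX_CONCURRENT. Requests beyond the limit wait on the semaphore inside the event loop instead of each occupying a worker thread.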
1
On
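import threading
import time
from contextlib import asynccontextmanager

from anyio.to_thread import current_default_thread_limiter
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Raise the limit at startup, inside the running event loop. Calling
    # current_default_thread_limiter() at import time fails when no event
    # loop is running (for example, when the module is imported by a test).
    current_default_thread_limiter().total_tokens = 1000
    yield


app = FastAPI(lifespan=lifespan)


def test():
    # Blocking stand-in for a slow DB call
    print(threading.current_thread().native_id)
    time.sleep(10)
    return 'ok'


@app.get("/")
async def read_root():
    result = await run_in_threadpool(test)
    print(result)
    return {"Hello": "World"}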
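
Why a larger limit helps can be sketched with the standard library alone. This is an illustration under stated assumptions, not AnyIO's actual limiter: a ThreadPoolExecutor with a fixed max_workers stands in for the thread limit, and slow_call and timed_batch are hypothetical names. When there are more blocking calls than worker threads, the extra calls queue and the batch takes correspondingly longer.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_call() -> str:
    time.sleep(0.2)  # blocking stand-in for a slow DB query
    return "ok"


async def timed_batch(pool_size: int, n_requests: int) -> float:
    """Run n_requests blocking calls on pool_size threads; return seconds taken."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        start = time.perf_counter()
        await asyncio.gather(
            *(loop.run_in_executor(pool, slow_call) for _ in range(n_requests))
        )
        return time.perf_counter() - start


# Four "requests" on two threads run in two waves; on four threads, in one.
small = asyncio.run(timed_batch(pool_size=2, n_requests=4))
large = asyncio.run(timed_batch(pool_size=4, n_requests=4))
print(f"pool=2: {small:.2f}s, pool=4: {large:.2f}s")
```

The trade-off is that a very high limit allows that many threads (and, typically, DB connections) to be open at once, so the number should be chosen with the database's capacity in mind.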