pytest issues with a session-scoped fixture and asyncio

I have multiple test files, each of which has an async fixture that looks like this:


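import asyncio

import pytest


# One event loop per test module.
@pytest.fixture(scope="module")
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


# Module-scoped async fixture that runs on the module's event loop.
@pytest.fixture(scope="module")
async def some_fixture():
    return await make_fixture()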

I'm using xdist for parallelization. In addition I have this decorator:

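import asyncio
import functools

import toolz


@toolz.curry
def throttle(limit, f):
    # The semaphore is created once, when the decorator is applied.
    semaphore = asyncio.Semaphore(limit)

    @functools.wraps(f)
    async def wrapped(*args, **kwargs):
        # Allow at most `limit` concurrent calls to f.
        async with semaphore:
            return await f(*args, **kwargs)

    return wrapped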

and I have a function that uses it:

@throttle(10)
async def f():  # must be a coroutine function, because wrapped() awaits it
    ...

Now f is being called from multiple test files, and I'm getting an exception telling me that I can't use the semaphore from different event loops.

I tried moving to a session-level event loop fixture:



@pytest.fixture(scope="session", autouse=True)
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

But this only gave me:

ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'module' scoped request object, involved factories

Is it even possible to have xdist + async fixture + semaphore working together?

Best answer

I eventually got it to work with the following conftest.py:

import asyncio

import pytest


# Session-scoped override, so every test and fixture in the session shares one loop.
@pytest.fixture(scope="session")
def event_loop():
    return asyncio.get_event_loop()
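
A complementary approach, which the answer above does not use, is to make the decorator itself tolerant of multiple event loops. It would then not matter which loop each test module runs on. The following is only a minimal sketch under a few assumptions. It replaces `toolz.curry` from the question with a plain decorator factory so that the example is self-contained. It creates the semaphore lazily inside the running loop instead of at decoration time. It also assumes that the loop objects can be weakly referenced, which holds for the standard library loops but has not been checked for third-party loop implementations.

```python
import asyncio
import functools
import weakref


def throttle(limit):
    """Limit concurrent calls of an async function, separately for each event loop."""

    def decorator(f):
        # One semaphore per event loop, created lazily on first use.
        # Weak keys let an entry disappear once its loop is garbage collected.
        semaphores = weakref.WeakKeyDictionary()

        @functools.wraps(f)
        async def wrapped(*args, **kwargs):
            loop = asyncio.get_running_loop()
            semaphore = semaphores.get(loop)
            if semaphore is None:
                semaphore = semaphores[loop] = asyncio.Semaphore(limit)
            async with semaphore:
                return await f(*args, **kwargs)

        return wrapped

    return decorator
```

With this variant the limit applies per event loop rather than globally. Under xdist each worker is a separate process anyway, so a limit of 10 would mean at most 10 concurrent calls per loop in each worker, not 10 across the whole test run.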