Test async functions with python-rq


I am writing tests for a FastAPI app using pytest. I am using pytest_asyncio to test async functions.

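from fastapi import Depends, FastAPI
from rq import Queue

app = FastAPI()

def get_queue():
    # Dependency that provides the RQ queue used by the endpoint
    return Queue()

async def task_runner():
    ...

@app.post('/run-task')
async def run_task(queue: Queue = Depends(get_queue)):
    ...
    queue.enqueue(task_runner)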

In the test file I have the following:

def get_test_queue():
    return Queue(is_async=False)

app.dependency_overrides[get_queue] = get_test_queue

@pytest.mark.asyncio
async def test_run_task(client):
    await client.post('/run-task')
    # Assertions (requires task to be complete)

I set is_async=False, as the RQ docs suggest for testing, so that the job runs synchronously when it is enqueued.

When I run the tests I get the following error:

RuntimeError: Cannot run the event loop while another loop is running
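The same error can be reproduced with plain asyncio, which may help narrow things down. The sketch below assumes that a synchronous job runner handles a coroutine job by creating a new event loop and calling run_until_complete on it; run_job_synchronously is a hypothetical stand-in for that behaviour, not RQ's actual code. Because the endpoint is already running inside an event loop, starting a second loop in the same thread fails.

```python
import asyncio


async def task_runner():
    return "done"


def run_job_synchronously(func):
    # Hypothetical stand-in for a synchronous job runner: call the job and,
    # if it returned a coroutine, drive it on a brand-new event loop.
    result = func()
    if asyncio.iscoroutine(result):
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(result)
        except RuntimeError:
            # Close the coroutine so Python does not warn that it was never awaited.
            result.close()
            raise
        finally:
            loop.close()
    return result


async def endpoint():
    # Mimics the async route: the job is executed while a loop is already running.
    try:
        return run_job_synchronously(task_runner)
    except RuntimeError as exc:
        return str(exc)


message = asyncio.run(endpoint())
print(message)
```

If the printed message matches the one from the test run, the failure most likely comes from a nested event loop rather than from pytest_asyncio itself.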

Has anyone faced a similar issue or found a workaround for this problem?

I tried patching the _execute method of the rq.job.Job class, but I am not familiar with the asyncio library.
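One direction that might be worth trying instead of patching internals is to run the coroutine on its own event loop in a separate thread, so it never collides with the loop that is already running. This is only a sketch under assumptions: run_coroutine_blocking and task_runner_sync are hypothetical helper names, and the call blocks the calling loop until the task finishes, which is probably acceptable in a test but not in production code.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_coroutine_blocking(coro):
    # Run the coroutine to completion on a fresh event loop in a worker thread.
    # The worker thread has no running loop, so asyncio.run is allowed there.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def task_runner():
    await asyncio.sleep(0)
    return "done"


def task_runner_sync():
    # Hypothetical synchronous wrapper that could be enqueued in tests
    # in place of the coroutine function.
    return run_coroutine_blocking(task_runner())


async def endpoint():
    # Mimics calling the job from inside an already running event loop.
    return task_runner_sync()


result = asyncio.run(endpoint())
print(result)
```

Enqueuing a plain function such as task_runner_sync means the queue only ever sees a regular return value, so it has no reason to start a loop of its own.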
