I have code which uses Python requests to kick off a task that runs in a worker started with rq. (Actually, the GET request results in one task which itself starts a second task, but this complexity shouldn't affect things, so I've left it out of the code below.) I already have a test which uses rq's SimpleWorker class to make the code run synchronously, and it works fine. Now I'm adding requests_ratelimiter to the second task, and I want to be sure it's behaving correctly. I think I need to mock the time.sleep() function used by the rate limiter, but I can't figure out how to patch it.
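routes.py
@app.route("/do_work/", methods=["GET"])
def do_work():
    # Enqueue the worker function by its dotted import path
    rq_job = my_queue.enqueue("my_app.worker.do_work", job_timeout=3600)
    return {"job_id": rq_job.id}  # illustrative response; the original snippet omits it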
worker.py
from requests_ratelimiter import LimiterSession
from rq.decorators import job

@job('my_queue', connection=redis_conn, timeout=3600, result_ttl=24 * 60 * 60)
def do_work():
    # Allow at most one outgoing request per second
    session = LimiterSession(per_second=1)
    r = session.get(WORK_URL)
test.py
import requests_mock
from rq import SimpleWorker

def test_get(client):
    # call the Flask function to kick off the task
    client.get("/do_work/")

    with requests_mock.Mocker() as m:
        # mock the return value of the requests.get() call in the worker
        response_success = {"result": "All good"}
        m.get(WORK_URL, json=response_success)

        worker = SimpleWorker([my_queue], connection=redis_conn)
        worker.work(burst=True)  # Work until the queue is empty
A test in requests_ratelimiter patches the sleep function using a target path of 'pyrate_limiter.limit_context_decorator.sleep', but that doesn't work for me because I'm not declaring pyrate_limiter at all. I've tried mocking the time function and then passing that into the LimiterSession, and that sort of works:
worker.py
from time import time

from requests_ratelimiter import LimiterSession
from rq.decorators import job

@job('my_queue', connection=redis_conn, timeout=3600, result_ttl=24 * 60 * 60)
def do_work():
    # Pass the clock in explicitly so the test can patch my_app.worker.time
    session = LimiterSession(per_second=1, time_function=time)
    r = session.get(WORK_URL)
test.py
from unittest.mock import patch

import requests_mock
from rq import SimpleWorker

def test_get(client):
    # call the Flask function to kick off the task
    client.get("/do_work/")

    with patch("my_app.worker.time", return_value=None) as mock_time:
        with requests_mock.Mocker() as m:
            response_success = {"result": "All good"}
            m.get(WORK_URL, json=response_success)

            worker = SimpleWorker([my_queue], connection=redis_conn)
            worker.work(burst=True)  # Work until the queue is empty

        assert mock_time.call_count == 1
However, I then see time called many more times than sleep would be, so I don't get the information I need from it. And patching my_app.worker.time.sleep results in the error:
AttributeError: does not have the attribute 'sleep'
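The likely cause is that `from time import time` binds the name `time` in worker.py to the function, not the module, so there is no `sleep` attribute underneath it to patch. A minimal sketch of that difference using only the standard library (the names `time_module`, `error`, and `called` are illustrative, not from the original code):

```python
import time as time_module
from time import time
from unittest.mock import patch

error = ""
try:
    # `time` here is the built-in function, which has no `sleep` attribute
    with patch.object(time, "sleep"):
        pass
except AttributeError as exc:
    error = str(exc)

# Patching the attribute on the module object itself works
with patch.object(time_module, "sleep", return_value=None) as mock_sleep:
    time_module.sleep(5)  # returns immediately, no real delay
called = mock_sleep.call_count
```

Even a successful patch on the `time` module would only help if the rate limiter looks up `sleep` through that module at call time; if the library imported `sleep` into its own namespace, the patch has to target that namespace instead.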
I have also tried patching pyrate_limiter as the requests_ratelimiter testing code does:
with patch(
    "my_app.worker.requests_ratelimiter.pyrate_limiter.limit_context_decorator.sleep", return_value=None
) as mock_sleep:
But this fails with:
ModuleNotFoundError: No module named 'my_app.worker.requests_ratelimiter'; 'my_app.worker' is not a package
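For context, `patch()` treats its target string as an absolute import path: everything before the last dot must resolve to a module or object, and the final component is the attribute to replace. Prefixing the path with `my_app.worker` therefore asks Python to find `requests_ratelimiter` inside the worker module, which does not exist. The sketch below illustrates the working form with a hypothetical stand-in module, `fake_limiter`, that imports `sleep` into its own namespace; it is not the real pyrate_limiter code.

```python
import sys
import types
from unittest.mock import patch

# Hypothetical stand-in for a third-party module that does `from time import sleep`
limiter = types.ModuleType("fake_limiter")
exec(
    "from time import sleep\n"
    "def wait(seconds):\n"
    "    sleep(seconds)\n",
    limiter.__dict__,
)
sys.modules["fake_limiter"] = limiter

# Patch the name where it is looked up: <module path>.<attribute>
with patch("fake_limiter.sleep", return_value=None) as mock_sleep:
    limiter.wait(30)  # returns immediately because sleep is mocked

recorded_args = mock_sleep.call_args.args
```

The test module never imports `fake_limiter.sleep` itself; the path only has to be importable in the test process.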
How can I test and make sure the rate limiter is engaging properly?
The solution was indeed to use 'pyrate_limiter.limit_context_decorator.sleep', despite the fact that I wasn't importing it. When I did that and made the mock return None, I discovered that sleep() was being called tens of thousands of times because it's in a while loop.
So in the end, I also needed to use freezegun and a side effect on my mock_sleep to get the behavior I wanted. Now time is frozen, but sleep() jumps the test clock forward synchronously and instantly by the number of seconds passed as an argument.
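The idea can be sketched without any third-party packages. The example below is not the freezegun-based test itself: it swaps in a small hand-written `FakeClock` for freezegun, and `fetch_all` is a hypothetical stand-in for the rate limiter's wait loop. With freezegun, the side effect would instead advance the frozen clock, for example through the `tick()` method of the object returned by `freeze_time`.

```python
import time
from unittest.mock import patch


class FakeClock:
    """A frozen clock that only moves forward when sleep() is called."""

    def __init__(self, start=0.0):
        self.now = start

    def time(self):
        return self.now

    def sleep(self, seconds):
        # Jump forward instantly instead of blocking
        self.now += seconds


def fetch_all(n, min_interval=1.0):
    """Hypothetical rate-limited loop: waits in a while loop for the next slot."""
    next_allowed = time.time()
    for _ in range(n):
        while time.time() < next_allowed:
            time.sleep(next_allowed - time.time())
        next_allowed = time.time() + min_interval


clock = FakeClock()
with patch("time.time", side_effect=clock.time), \
        patch("time.sleep", side_effect=clock.sleep) as mock_sleep:
    fetch_all(3)

# Sum the durations requested across all sleep() calls
total_slept = sum(call.args[0] for call in mock_sleep.call_args_list)
```

Because the mocked `sleep()` advances the clock, the while loop exits after one call per wait rather than spinning, and the test can assert on both the number of sleeps and the total simulated delay.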