I have some async Python code I want to test, which include something like
async def task(self):
while True:
await self.thing.change()
...
self.run_callbacks()
In my test file, I'm starting the task in the setUp() function like so:
def setUp(self):
...
self.loop = asyncio.get_event_loop()
self.task = self.loop.create_task(self.object_under_test.task())
then wanting to verify that the callbacks ran:
def test_callback(self):
callback = mock.MagicMock()
self.object_under_test.add_callback('cb1', callback)
# ???
callback.assert_called_once()
Where I'm stuck is how to mock thing.change() in the task. It needs to block until I tell it to (at ???) at which point it will return. If I mock it after starting the task, then it won't make any difference as the task is already waiting for the un-mocked function to return. If I mock it prior to starting the task, then it will always return something and not block.
Any suggestions how I might achieve this?
I've worked out how to synchronise using the asyncio.Event object and as side effect on the mock: