I am new to asyncio in Python and am having trouble writing a test for code that uses python-websockets (and asyncio). I want a client that connects to a websocket and sends a message once connected.
Here is my client class:
import json

import websockets


class Client:
    def __init__(self, interval):
        self.interval = interval

    async def connect(self):
        async with websockets.connect("wss://test.com", ping_interval=self.interval) as websocket:
            await self._send_message(websocket)

    async def _send_message(self, websocket):
        message = {"messageId": "499"}
        await websocket.send(json.dumps(message))
And here is my test:
import json
import unittest
from unittest.mock import AsyncMock, patch

from client import Client


class ConnectionTest(unittest.IsolatedAsyncioTestCase):
    async def test_message_is_sent(self):
        websocket_client = Client(interval=4)
        mock_websocket = AsyncMock()
        with patch("client.websockets.connect", return_value=mock_websocket):
            await websocket_client.connect()
        message = {"messageId": "499"}
        mock_websocket.send.assert_awaited_once_with(json.dumps(message))
I'm getting the error:
E AssertionError: Expected send to have been awaited once. Awaited 0 times.
Because the context manager is being replaced with an AsyncMock(), I assumed I would be able to see that send was awaited and then assert on it.
Am I misunderstanding how this is supposed to be used?
It looks like you are patching the wrong target.
Try to patch websockets.connect instead of client.websockets.connect:
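If changing the patch target does not make the assertion pass, another thing worth checking is which object `async with` actually binds. With `unittest.mock`, `async with mock_websocket as websocket` binds the result of awaiting `mock_websocket.__aenter__()`, which by default is a separate child mock, so `send` would be recorded there rather than on `mock_websocket` itself. Below is a minimal sketch using only the standard library. `send_message` and the `connect` argument are hypothetical stand-ins for `Client.connect()` and the patched `websockets.connect`, so it runs without the websockets package installed:

```python
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock


async def send_message(connect):
    # Hypothetical stand-in for Client.connect(); `connect` plays the role
    # of the patched websockets.connect.
    async with connect("wss://test.com", ping_interval=4) as websocket:
        await websocket.send(json.dumps({"messageId": "499"}))


async def main():
    mock_websocket = AsyncMock()
    # `async with` binds the result of __aenter__(), which by default is a
    # separate child mock. Point it back at mock_websocket so the send()
    # call is recorded on the object the test asserts against.
    mock_websocket.__aenter__.return_value = mock_websocket
    connect = MagicMock(return_value=mock_websocket)

    await send_message(connect)

    mock_websocket.send.assert_awaited_once_with(json.dumps({"messageId": "499"}))
    return mock_websocket, connect


mock_websocket, connect = asyncio.run(main())
```

In the test from the question, the equivalent change would be to add the `__aenter__.return_value` line right after `mock_websocket = AsyncMock()`, or to assert on `mock_websocket.__aenter__.return_value.send` instead.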