I've created a program that processes a stream of data coming from a device and writes it to a websocket for consumption by a web app. The library I've written to read from said device and yield its computed values is a synchronous (ergo blocking) library. I don't want to re-write it in async, so I am using the asyncio run_in_executor function to run it in a thread (to be precise: I'm using tornado because the program will also accept web requests).
While the code works, I frequently get errors saying "Future exception was never retrieved". The exception is a TimeoutError related to the code that runs the blocking function in an executor (error below the code block).
Note that I could not properly run the function in the executor without setting the asyncio event loop policy to tornado.platform.asyncio.AnyThreadEventLoopPolicy(). Without it, I constantly get the error "there is no current event loop in ThreadExecutor", and I have not found a way around it.
import asyncio
from functools import partial

import tornado.ioloop
import tornado.platform.asyncio
import tornado.websocket

# run_client_forever is the blocking function from the synchronous device library (defined elsewhere).


class ClientHandler(tornado.websocket.WebSocketHandler):
    clients = set()

    def __init__(self, *args, **kwargs):
        self.config = kwargs.pop("config")
        super().__init__(*args, **kwargs)
        self.started = False

    def on_message(self, message):
        if message == "data":
            ClientHandler.clients.add(self)
            self.start_client()

    def write_data(self, message: str):
        for client in ClientHandler.clients:
            client.write_message(message)

    def start_client(self):
        if self.started:
            return
        asyncio.set_event_loop_policy(tornado.platform.asyncio.AnyThreadEventLoopPolicy())
        sync_func = partial(run_client_forever, self.config)
        loop = tornado.ioloop.IOLoop.current()
        loop.run_in_executor(None, sync_func)
        self.started = True
ERROR (abridged):
ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=TimeoutError('timed out') created at /usr/lib/python3.11/asyncio/base_events.py:427>
Traceback:
  ...
  File "/path/src/cps_demo/web_stream_handler.py", line 118, in start_client
    loop.run_in_executor(None, sync_func)
  File "/path/venv/lib/python3.11/site-packages/tornado/platform/asyncio.py", line 266, in run_in_executor
    return self.asyncio_loop.run_in_executor(executor, func, *args)
  File "/usr/lib/python3.11/asyncio/base_events.py", line 828, in run_in_executor
    return futures.wrap_future(
  File "/usr/lib/python3.11/asyncio/futures.py", line 417, in wrap_future
    new_future = loop.create_future()
  File "/usr/lib/python3.11/asyncio/base_events.py", line 427, in create_future
    return futures.Future(loop=self)
TimeoutError: timed out
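As far as I understand it, asyncio logs "Future exception was never retrieved" when a future that finished with an exception is discarded without anyone awaiting it or calling exception() on it. The future returned by run_in_executor in start_client is never inspected, which matches that pattern. Below is a minimal sketch of one way to surface the error instead, using plain asyncio rather than Tornado so it stays self-contained. The names blocking_client, report_failure and seen_errors are hypothetical, and the function simply raises to imitate the timeout.

```python
import asyncio
import logging
from functools import partial

log = logging.getLogger("executor_demo")
seen_errors = []  # kept only so the outcome of the sketch can be inspected


def blocking_client(config):
    # Hypothetical stand-in for the blocking device reader; it raises
    # immediately to imitate the timeout seen in the log.
    raise TimeoutError("timed out")


def report_failure(future):
    # Calling exception() marks the error as retrieved, so asyncio has
    # nothing left to complain about when the future is discarded.
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        seen_errors.append(exc)
        log.error("blocking client stopped: %r", exc)


async def main():
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, partial(blocking_client, {"host": "device"}))
    future.add_done_callback(report_failure)
    # Wait for the worker thread here only so the demo can exit cleanly;
    # a long-running server would just keep serving requests.
    await asyncio.wait([future])


asyncio.run(main())
```

This only makes the failure visible in a controlled way; it does not address why the blocking function timed out in the first place.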
I figured it out. The exception would only be raised when a client disconnected and re-connected.
Basically, `started` is an instance variable, but since Tornado creates a new instance of the handler on every connection, `started` was set to `False` again, causing the client to start a second time.
Since the device only supports a single TCP connection at a time, this second client eventually timed out, causing the exception.
Making `started` a class variable along with `clients` solved the problem.
I should probably wrap this function in a Singleton.
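The difference between the two kinds of attribute can be sketched without Tornado. The Handler class here is a hypothetical stand-in for the WebSocket handler, and start_client only returns whether it would have launched the blocking reader, so this is an illustration of the idea rather than the actual fix as applied.

```python
class Handler:
    # Hypothetical stand-in for the WebSocket handler: one instance per connection.
    clients = set()
    started = False  # class attribute, shared by every instance

    def start_client(self):
        # Check and set the flag on the class, not on self, so that a new
        # instance created for a reconnecting client sees the earlier start.
        if Handler.started:
            return False
        Handler.started = True
        # The blocking device reader would be handed to the executor here.
        return True


first = Handler()
second = Handler()  # simulates the handler created when a client reconnects
results = [first.start_client(), second.start_client()]
print(results)  # [True, False]
```

Assigning through `self.started = True` would instead create a new instance attribute that shadows the class attribute, which would quietly bring the original bug back. A class-level flag like this may already be enough without a full Singleton, since Tornado calls the handler methods on the event loop thread.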