I have a tornado
application which needs to run a blocking function on ProcessPoolExecutor
. This blocking function employs a library which emits incremental results via blinker
events. I'd like to collect these events and send them back to my tornado
app as they occur.
At first, tornado
seemed ideal for this use case because its asynchronous. I thought I could simply pass a tornado.queues.Queue
object to the function to be run on the pool and then put()
events onto this queue as part of my blinker
event callback.
However, reading the docs of tornado.queues.Queue
, I learned they are not managed across processes like multiprocessing.Queue
and are not thread safe.
Is there a way to retrieve these events from the pool
as they occur? Should I wrap multiprocessing.Queue
so it produces Futures
? That seems unlikely to work as I doubt the internals of multiprocessing
are compatible with tornado
.
[EDIT] There are some good clues here: https://gist.github.com/hoffrocket/8050711
You can do it more simply than that. Here's a coroutine that submits four slow function calls to subprocesses and awaits them:
It outputs:
You can see that the coroutine receives results in the order they complete, not the order they were submitted: future number 1 resolves after future number 2, because future number 1 slept longer. convert_yielded transforms the Futures returned by ProcessPoolExecutor into Tornado-compatible Futures that can be awaited in a coroutine.
Each future resolves to the value returned by calculate_slowly: in this case it's the same number that was passed into calculate_slowly, and the same number of seconds as calculate_slowly sleeps.
To include this in a RequestHandler, try something like this:
You can observe if you
curl localhost:8888
that the server responds incrementally to the client request.