I'm running a large number of CPU-bound computations in separate processes and want to perform some callback as each job finishes. To do this I'm combining `asyncio` with a `concurrent.futures.ProcessPoolExecutor`, and I'm setting a callback on each `Future` via `add_done_callback`:
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import time
import random


def run_job(job):
    """Simulate a CPU-bound job by sleeping for a random number of seconds."""
    length = random.randrange(1, 10)
    time.sleep(length)
    return f"slept for {length} s"


def on_complete(future, job):
    # Called by the event loop once the job's asyncio future is done.
    print(f"Finished computing job {job}: {future.result()}")


async def run_jobs(executor, jobs):
    """Run computations in separate processes and update results asynchronously."""
    loop = asyncio.get_running_loop()
    tasks = []
    print(f"Scheduling {len(jobs)} jobs")
    for job in jobs:
        # Submit the job to the process pool; this returns an asyncio future.
        future = loop.run_in_executor(
            executor,
            run_job,
            job,
        )
        future.add_done_callback(
            partial(on_complete, job=job)
        )
        tasks.append(future)
    await asyncio.wait(tasks)


if __name__ == "__main__":
    jobs = [str(jobnum) for jobnum in range(1, 10)]
    with ProcessPoolExecutor() as executor:
        asyncio.run(run_jobs(executor, jobs))
```
This works well, except that if the computation is interrupted, e.g. by a `KeyboardInterrupt`, then stderr gets spammed with messages from each subprocess saying `RuntimeError: Event loop is closed`. This answer to this question suggests using the Python 3.7+ `asyncio.run` function, since it handles clean-up after interrupts, but this appears not to work for jobs running in separate processes.
The traceback from one of the `RuntimeError`s emitted after a `KeyboardInterrupt` points to a possible cause:
```
exception calling callback for <Future at 0x7fd4f679ea60 state=finished returned str>
Traceback (most recent call last):
  File "/path/to/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
...other stuff...
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
```
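The shape of this traceback can be reproduced without any processes. The sketch below is a simplified model, not asyncio's actual code: a bare `concurrent.futures.Future` (standing in for the one the process pool creates) gets a done-callback that forwards its result to an already-closed event loop with `call_soon_threadsafe`. As far as I can tell from the CPython 3.8 source, `loop.run_in_executor` links the executor's future to the loop through a similar internal callback (via `asyncio.wrap_future`), so a callback like this would exist even without any `add_done_callback` of my own. The names `forward_to_loop` and `ListHandler` are hypothetical, introduced only for this illustration.

```python
import asyncio
import concurrent.futures
import logging

# concurrent.futures reports exceptions raised by done-callbacks through this
# logger instead of re-raising them in the thread that completed the future.
cf_logger = logging.getLogger("concurrent.futures")


class ListHandler(logging.Handler):
    """Hypothetical helper: collects log records instead of printing them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
cf_logger.addHandler(handler)

# An event loop that has already been closed, as after asyncio.run() returns.
loop = asyncio.new_event_loop()
loop.close()


def forward_to_loop(cf_future):
    # Simplified stand-in for the callback asyncio attaches when it wraps an
    # executor future: hand the result over to the loop's thread.
    loop.call_soon_threadsafe(print, cf_future.result())


# A bare concurrent.futures.Future stands in for the one the process pool creates.
cf_future = concurrent.futures.Future()
cf_future.add_done_callback(forward_to_loop)

# Completing the future invokes the callback; the RuntimeError it raises is
# caught and logged ("exception calling callback for ...") rather than propagated.
cf_future.set_result("slept for 3 s")

cf_logger.removeHandler(handler)
record = handler.records[0]
print(record.getMessage())
print(repr(record.exc_info[1]))
```

Because the exception is logged rather than raised, each job that finishes after the loop is closed would produce its own message on stderr, which matches the one-message-per-job pattern.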
My guess is that when `asyncio.run` cancels the running tasks while handling the interrupt, the callbacks get called within each `Future`'s separate process, but `asyncio.run` closes the event loop before these callbacks run, since it isn't blocked by the callback execution.
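One part of this guess can be checked directly: which objects `asyncio.run` actually cancels before it closes the loop. The minimal sketch below (with `main` as a hypothetical coroutine) suggests that its clean-up cancels outstanding *tasks* but leaves plain `asyncio.Future` objects, which is what `loop.run_in_executor` returns, pending. If the same holds for the futures in `run_jobs` (as far as I can tell, `asyncio.wait` does not cancel the futures it waits on when it is itself cancelled), any job that finishes after the loop is closed would try to deliver its result to a closed loop.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    # A task: asyncio.run() cancels still-pending tasks during its clean-up.
    task = asyncio.create_task(asyncio.sleep(3600))
    # A plain future, like the one loop.run_in_executor() returns.
    # It is not a task, so asyncio.run() does not cancel it.
    plain_future = loop.create_future()
    return loop, task, plain_future


loop, task, plain_future = asyncio.run(main())

print("task cancelled:", task.cancelled())
print("plain future done:", plain_future.done())
print("loop closed:", loop.is_closed())
```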
EDIT: I commented out the `add_done_callback` block where the futures get created, and this doesn't change the outcome, suggesting the problem has nothing to do with the callbacks after all and is instead the same problem as seen here.