I have a set of CPU bound and IO bound tasks, where each IO bound task needs to wait for a corresponding CPU bound task to finish. The IO bound tasks (waiting for an API call) are non-blocking but long-running. I don't want the next CPU bound task to wait for a non-blocking IO bound task to finish; instead, I'd like to start work on a new CPU bound task as soon as the previous one has finished.
In addition, I'd like to use a ProcessPoolExecutor to run a number of CPU bound tasks in parallel, while the IO bound tasks run without blocking, e.g. on the default event loop or in a separate ThreadPoolExecutor.
This code, which uses only the default event loop, has the expected behaviour: the CPU bound (blocking) tasks run sequentially for 10s and the IO bound (non-blocking) tasks run 'concurrently' for another 2s.
import asyncio
import time

TIME_CPU_BOUND_SECS = 1.0
TIME_IO_BOUND_SECS = 2.0


async def cpu_bound():
    time.sleep(TIME_CPU_BOUND_SECS)
    return "CPU"


async def io_bound_with_cpu_dependency(task: asyncio.Task):
    await task
    await asyncio.sleep(TIME_IO_BOUND_SECS)
    return "IO"


async def main():
    tasks = []

    t0 = time.time()
    for _ in range(10):
        cpu_task = asyncio.create_task(cpu_bound())
        io_task = asyncio.create_task(io_bound_with_cpu_dependency(cpu_task))
        tasks.append(io_task)
    t = time.time() - t0
    print(f"Time for scheduling is {t=:.3f}s.")

    t0 = time.time()
    for task in tasks:
        await task
    t = time.time() - t0
    print(f"Time for execution is {t=:.3f}s.")


if __name__ == "__main__":
    asyncio.run(main())
>>> Time for scheduling is t=0.001s.
>>> Time for execution is t=12.035s.
However, I can't manage to use two different executors when the IO bound tasks depend on (await) the result of their CPU bound task; e.g. the approach below fails:
async def main():
    loop = asyncio.get_running_loop()

    cpu_tasks = []
    io_tasks = []
    with (
        concurrent.futures.ProcessPoolExecutor() as process_pool,
        concurrent.futures.ThreadPoolExecutor(max_workers=5) as thread_pool,
    ):
        t0 = time.time()
        for _ in range(10):
            cpu_task = await loop.run_in_executor(process_pool, cpu_bound)
            io_task = await loop.run_in_executor(thread_pool, io_bound_with_cpu_dependency, cpu_task)
            cpu_tasks.append(cpu_task)
            io_tasks.append(io_task)
        t = time.time() - t0
        print(f"Time for scheduling is {t=:.3f}s.")

        t0 = time.time()
        [await task for task in cpu_tasks]
        [await task for task in io_tasks]
        t = time.time() - t0
        print(f"Time for execution is {t=:.3f}s.")
>>> Traceback (most recent call last):
  File "/Users/btschroer/work/git/selection/tmp/concurrency/foo.py", line 90, in <module>
    asyncio.run(main())
  File "/Users/btschroer/.miniforge3/envs/selection/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/btschroer/.miniforge3/envs/selection/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/Users/btschroer/work/git/selection/tmp/concurrency/foo.py", line 70, in main
    cpu_task = await loop.run_in_executor(process_pool, cpu_bound)
TypeError: cannot pickle 'coroutine' object
My understanding is that it is not possible to 'send' the awaitable produced by the process pool on to the thread pool.
What I don't understand is:
i) whether I have to immediately await each call to loop.run_in_executor, or whether I can first schedule work on an executor and only await it later (see the sketch below), and
ii) whether it is possible to implement the above within the Python asyncio framework at all.
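I think what is actually failing is the return value: calling an async def only creates a coroutine object, and the process pool cannot pickle that object to send it back to the main process, independent of the thread pool. A quick minimal check of my own seems consistent with this:

import pickle


async def cpu_bound():
    return "CPU"


coro = cpu_bound()   # calling an async def only creates a coroutine object
pickle.dumps(coro)   # raises TypeError: cannot pickle 'coroutine' object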
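To make (i) concrete, here is a minimal, untested sketch of the structure I have in mind. It assumes cpu_bound can be rewritten as a plain (non-async) function, here called cpu_bound_sync, so that only picklable objects cross the process boundary, and that the future returned by run_in_executor can simply be stored and awaited later:

import asyncio
import concurrent.futures
import time

TIME_CPU_BOUND_SECS = 1.0
TIME_IO_BOUND_SECS = 2.0


def cpu_bound_sync():
    # plain function instead of a coroutine, so only picklable objects
    # (the function reference and its return value) cross the process boundary
    time.sleep(TIME_CPU_BOUND_SECS)
    return "CPU"


async def io_bound_with_cpu_dependency(cpu_future):
    await cpu_future                         # wait for the process-pool result
    await asyncio.sleep(TIME_IO_BOUND_SECS)  # non-blocking IO on the event loop
    return "IO"


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as process_pool:
        io_tasks = []
        for _ in range(10):
            # run_in_executor returns an asyncio future immediately (no await here),
            # so the dependency is encoded up front without blocking the loop
            cpu_future = loop.run_in_executor(process_pool, cpu_bound_sync)
            io_tasks.append(asyncio.create_task(io_bound_with_cpu_dependency(cpu_future)))
        print(await asyncio.gather(*io_tasks))


if __name__ == "__main__":
    asyncio.run(main())

I'm not sure whether mixing create_task and run_in_executor like this is the intended way to encode the dependency, or whether it has hidden downsides.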
Using a ProcessPoolExecutor with e.g. 5 processes and a ThreadPoolExecutor with 5 workers, I'd expect the script to finish in ~5s: 1s + 1s for the two batches of 5 concurrent CPU bound tasks, with the first 5 I/O bound tasks finishing after 3s and the remaining 5 after 5s.
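Spelled out, the timeline I expect (assuming the second batch of I/O tasks has to wait for the 5 threads to free up) is roughly:

t=0s  CPU batch 1 (tasks 1-5) starts in the process pool
t=1s  CPU batch 1 done; I/O for tasks 1-5 starts; CPU batch 2 (tasks 6-10) starts
t=2s  CPU batch 2 done
t=3s  I/O for tasks 1-5 done; I/O for tasks 6-10 starts
t=5s  I/O for tasks 6-10 done, so ~5s in total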
What I'd ideally like to achieve is a structure in which I can schedule blocking and non-blocking tasks upfront, encode the dependencies between tasks at scheduling time, and then use separate executors to run the tasks concurrently, controlling the number of concurrent CPU bound tasks and the number of threads available for the I/O bound tasks independently.
Any suggestions on how to achieve this are much appreciated. My I/O bound code is long-running, non-blocking async code, so attempting this with asyncio seemed natural to me, but my mind isn't absolutely set on it. If there is another solution, I'd be open to modelling it differently, as long as I can leverage the non-blocking nature of the I/O bound tasks and it doesn't have massive infrastructure implications, e.g. needing to set up and maintain a database, a sophisticated message queue architecture, or containerisation and deployment on a Kubernetes cluster.