I used to use concurrent.futures.ThreadPoolExecutor in my projects, running my synchronous functions in separate threads with it.
Nowadays I mostly write asynchronous code with asyncio, but I've run into a problem that I can't find a solution for.
Let's say I have an async function
async def do_job(a, b, c, d):
    if a < b:
        raise CustomException("A is smaller than b")
    else:
        return True, "Work done"
(this is just an example; the real function is much larger, and raising custom exceptions in certain cases is a mandatory part of it)
Assume that I need to run 100 instances of this function with at most N of them running concurrently. I also want to receive results as the tasks complete, without waiting for all of them to finish (along with the parameters each task was started with), handle errors, and in some cases add new tasks to the pool.
ThreadPoolExecutor allowed me to achieve this (I'm not sure about adding new tasks to the pool, because I didn't need that at the time), but I can't find anything in asyncio that meets my needs.
My example using ThreadPoolExecutor
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    # Map each future back to the parameter it was submitted with
    futures = {executor.submit(exec_f, a, b): a for a in param_list}
    for future in concurrent.futures.as_completed(futures):
        param_a = futures[future]
        try:
            result = future.result()
            # Process result
        except Exception as ex:
            # Process exception
            pass
Tell me, please, am I missing something and this can be implemented with asyncio, or should I give up on the idea and combine asyncio with concurrent.futures.ThreadPoolExecutor (which doesn't seem quite right)?
You can control max_threads in asyncio using a Semaphore, and you can add new tasks just by calling create_task(). For example:
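Here is a minimal sketch of that approach; the run_limited wrapper, the concrete parameter values, and the "retry" condition are illustrative placeholders, not part of the original question.

import asyncio


class CustomException(Exception):
    pass


async def do_job(a, b, c, d):
    if a < b:
        raise CustomException("A is smaller than b")
    else:
        return True, "Work done"


async def main():
    max_concurrency = 10                      # plays the role of max_threads
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_limited(a, b, c, d):
        # At most max_concurrency coroutines are inside this block at once
        async with semaphore:
            return await do_job(a, b, c, d)

    param_list = range(100)
    # Map each task back to the parameter it was started with,
    # like the futures dict in the ThreadPoolExecutor version
    tasks = {asyncio.create_task(run_limited(a, 50, None, None)): a
             for a in param_list}

    pending = set(tasks)
    while pending:
        # FIRST_COMPLETED hands back results as they arrive,
        # similar to concurrent.futures.as_completed
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            param_a = tasks[task]
            try:
                result = task.result()
                print(f"a={param_a}: {result}")       # Process result
            except CustomException as ex:
                print(f"a={param_a} failed: {ex}")    # Process exception
                # Add a new task to the pool in some cases
                if param_a < 5:                       # arbitrary condition
                    new_a = param_a + 100
                    new_task = asyncio.create_task(
                        run_limited(new_a, 50, None, None))
                    tasks[new_task] = new_a
                    pending.add(new_task)


asyncio.run(main())

asyncio.wait with return_when=FIRST_COMPLETED plays the role of concurrent.futures.as_completed here, and because new tasks can be added to the pending set between iterations, the pool can grow while it is being drained.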