ThreadPoolExecutor asyncio analogue?


In my projects I used to use `concurrent.futures.ThreadPoolExecutor` to run synchronous functions in separate threads.

Nowadays I mostly write asynchronous code with asyncio, but I've run into a problem I can't find an asyncio solution for.

Let's say I have an async function:

```python
class CustomException(Exception):
    pass


async def do_job(a, b, c, d):
    if a < b:
        raise CustomException("A is smaller than b")
    else:
        return True, "Work done"
```

(This is just an example. The real function is much larger, and raising custom exceptions in certain cases is a mandatory part of it.)

Suppose I need to run 100 instances of this function, with at most N running at the same time. I also want to receive each result as soon as its task finishes, without waiting for all tasks to complete, together with the parameters that task was run with. I also need to handle errors and, in some cases, add new instances to the pool.

`ThreadPoolExecutor` let me do this (I'm not sure about adding new tasks to the pool, since I didn't need that back then), but I can't find anything in asyncio that meets my needs.

Here is my version using `ThreadPoolExecutor`:

```python
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    futures = {executor.submit(exec_f, a, b): a for a in param_list}

    for future in concurrent.futures.as_completed(futures):
        param_a = futures[future]
        try:
            result = future.result()
            # Process result
        except Exception as ex:
            # Process exception
            pass
```

Am I missing something, and can this be implemented with asyncio? Or should I give up on the idea and combine asyncio with `concurrent.futures.ThreadPoolExecutor` (which doesn't seem quite right)?

Answer by master_Alish

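In asyncio you can limit how many jobs run at once (the role `max_threads` plays for the thread pool) with an `asyncio.Semaphore`; here the limit applies to coroutines running concurrently in one thread rather than to threads. New tasks can be added at any time with `asyncio.create_task()`.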

For example:

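```python
import asyncio
from asyncio import Semaphore

# Example values; in the question these come from the caller.
max_threads = 10  # maximum number of jobs running at once
param_list = [(a, 5) for a in range(100)]


async def do_job(a, b):
    if a < b:
        raise Exception("A is smaller than b")
    else:
        return True, "Work done"


async def task_wrapper(semaphore: Semaphore, *args):
    # Wait for a free slot; at most max_threads jobs run concurrently.
    async with semaphore:
        try:
            result = await do_job(*args)
        except Exception as ex:
            # Handle the error here; return it together with the parameters.
            return args, ex
        return args, result


async def main():
    semaphore = Semaphore(max_threads)
    tasks = []
    for params in param_list:
        task = asyncio.create_task(task_wrapper(semaphore, *params))
        tasks.append(task)

    # gather waits for all tasks and returns results in submission order.
    for params, outcome in await asyncio.gather(*tasks):
        print(params, outcome)


if __name__ == "__main__":
    asyncio.run(main())
```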