Python parallelism with concurrent.futures.ProcessPoolExecutor (or multiprocessing.Pool) and AsyncIO


Currently, I'm working on a project that listens to every API request, performs checks on it, and sends a response back. Sadly, the API managed to overload both my single-threaded AsyncIO attempt and my multi-threading attempt, yet the CPU never reached even 20% load.

So my idea is to use multiprocessing and fully load the CPU (70-90%). The best way, I thought, was concurrent.futures.ProcessPoolExecutor or multiprocessing.Pool: I could simply set max_workers and max_tasks_per_child and, combined with asyncio, handle as many requests as the CPU could possibly manage. But I failed.
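For illustration, this is the kind of setup I have in mind (a minimal sketch assuming Python 3.11+ for max_tasks_per_child; cpu_bound_check is a placeholder name, not part of my real code):

import asyncio
import concurrent.futures

def cpu_bound_check(payload: dict) -> dict:   # placeholder worker; module-level,
    return {"id": payload["id"], "ok": True}  # so spawn can pickle a reference to it

async def main() -> None:
    loop = asyncio.get_running_loop()
    # max_tasks_per_child recycles workers after N tasks; it requires
    # the "spawn" start method (Python 3.11+)
    with concurrent.futures.ProcessPoolExecutor(max_workers=4, max_tasks_per_child=50) as pool:
        # run_in_executor bridges the pool into asyncio: the result is awaitable
        result = await loop.run_in_executor(pool, cpu_bound_check, {"id": 1})
        print(result)

if __name__ == "__main__":   # required with spawn, or children re-run this module
    asyncio.run(main())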

My current problem is with process creation inside my custom Queue system. Specifically, I don't know how to create an isolated process that would:

  • spawn an asyncio loop
  • run async function (with 3s timeout) that has asyncio.TaskGroup()
  • run all tasks
  • return data

Basically, I want to create a copy of the required check functions (the Queue system provides them), run them in the isolated process, and return the data.
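Roughly, the per-process entry point I am picturing looks like this sketch (check_a, check_b, _run_checks, and worker_entry are made-up names; asyncio.TaskGroup needs Python 3.11+):

import asyncio

async def check_a(**kwargs): return "a"   # stand-ins for the real check functions
async def check_b(**kwargs): return "b"

async def _run_checks(functions: tuple, **kwargs):
    async with asyncio.TaskGroup() as group:
        tasks = [group.create_task(func(**kwargs)) for func in functions]
    # the group guarantees every task has finished once the block exits
    return [task.result() for task in tasks]

def worker_entry(functions: tuple, **kwargs):
    # asyncio.run() spawns a fresh loop in this child process and closes it afterwards
    return asyncio.run(asyncio.wait_for(_run_checks(functions, **kwargs), timeout=3))

worker_entry is what would get submitted to the pool, e.g. pool.submit(worker_entry, (check_a, check_b)).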

Here is the add_queue method:

async def add_queue(self, function: str, id: int | None, **kwargs) -> None:
    functions: tuple | None = self.filter.get(function)
    process_executor: syscon.concurrent.futures.ProcessPoolExecutor = self.CONNECTOR.process_executor

    if not functions:
        self.logger.log(level="DEBUG", message=f"@Queue | No match found for {function}.")
    elif id is None:
        self.logger.log(level="DEBUG", message=f"@Queue | No ID found in {function} request.")
    elif process_executor is None:
        self.logger.log(level="ERROR", message="ProcessPoolExecutor does NOT exist!")
    else:
        try:
            self.logger.log(level="DEBUG", message=f"@Queue | Submitting task to ProcessPoolExecutor.")
            self.process_executor.submit(self.process_event_runner, functions=functions, id=id, **kwargs) 
        except Exception as error:
            self.logger.log(level="ERROR", message=f"@Queue | {type(error).__name__}: {error}")

and the method that gets called:

def process_event_runner(self, functions: tuple, id: int, **inputedargs) -> None:
    # defined before use: in my original ordering, async_task_worker was
    # referenced on the run_until_complete line before its def statement had
    # executed, which raises UnboundLocalError at runtime
    async def async_task_worker(functions: tuple, id: int, **inputedargs) -> bool:
        async with asyncio.TaskGroup() as group:
            tasks: list[asyncio.Task] = [group.create_task(func(**inputedargs)) for func in functions]
        return True

    self.logger.log(level="DEBUG", message="@Queue.process_event_runner | Runner executing.")
    event_loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
    try:
        event_loop.run_until_complete(asyncio.wait_for(async_task_worker(functions=functions, id=id, **inputedargs), timeout=3))
    except asyncio.TimeoutError:
        self.logger.log(level="WARNING", message="@Queue.process_event_runner | Task timed out. Terminating.")
    except asyncio.CancelledError:
        self.logger.log(level="WARNING", message="@Queue.process_event_runner | Task cancelled. Terminating.")
    except ExceptionGroup as groupError:
        self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | Exception Group Error: {', '.join([f'SubE {num}: {exception}' for num, exception in enumerate(groupError.exceptions, 1)])}")
    except Exception as error:
        self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | {type(error).__name__}: {error}")
    finally:
        event_loop.close()  # the per-process loop is never reused

Both of these methods are inside a class named Queue.

After running this, nothing happened. I figured out that .submit() did not execute the provided function; instead it re-executed main.py, which only contains the whole system start-up. The Queue system lives in src.core.transmitters.queue.
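For reference: with the spawn start method, every worker process re-imports the __main__ module before running the submitted callable (which is pickled by reference), so any top-level code in main.py runs again in each child. A minimal sketch of the guard that avoids this (start_system is a placeholder name):

# main.py
from src.core.transmitters.queue import Queue  # imports only define things

def start_system() -> None:  # placeholder for the real start-up code
    ...

if __name__ == "__main__":  # spawned children re-import main.py but skip this block
    start_system()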

How can I create isolated processes that receive a copy of the runner function (process_event_runner) and of all the functions in the functions tuple, and run them, while still letting me limit the number of workers and use asyncio? Is it even possible?

For any questions about how the whole system works, feel free to ask in the comments and I will do my best to answer.

As asked by Michael in the comments, here is the full traceback of the circular import error that I get when importing the specified function (process_event_runner) in main.py before if __name__ == "__main__":

Traceback (most recent call last):
  File "d:\Projects\RedSecurity\main.py", line 1, in <module>
    from src.core.transmitters.queue import process_event_runner
  File "d:\Projects\RedSecurity\src\core\transmitters\queue.py", line 3, in <module>
    import src.sysConnector as syscon
  File "d:\Projects\RedSecurity\src\sysConnector.py", line 9, in <module>
    from src.core.transmitters.queue import Queue
ImportError: cannot import name 'Queue' from partially initialized module 'src.core.transmitters.queue' (most likely due to a circular import) (d:\Projects\RedSecurity\src\core\transmitters\queue.py)
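The cycle appears to be main.py → queue.py → sysConnector.py → queue.py. A minimal sketch of one way it could be broken, assuming sysConnector only needs Queue at call time rather than at import time (get_queue_class is a made-up name):

# src/sysConnector.py (sketch; the real module has more in it)
def get_queue_class():
    # deferred import: runs only after queue.py has fully initialized,
    # which breaks the module-level import cycle
    from src.core.transmitters.queue import Queue
    return Queue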

The loader is after the if __name__ == "__main__": guard and the start method is set to spawn, like this:

from src.core.transmitters.queue import process_event_runner

if __name__ == "__main__":
    import discord, sys, aiohttp, os, json, platform, glob, concurrent.futures, multiprocessing
    sys.dont_write_bytecode = True
    from discord.ext import commands
    import src.system.colors as colors
    import src.sysConnector as syscon
    import src.setup as setup
    
    class Bot(commands.AutoShardedBot):
        #enabling all intents and setting up command_prefix
        #initializing bot (intents, shards, prefix..)
        def __init__(self, connector: syscon.SysConnector, setup: setup.Setup) -> None:
            super().__init__(command_prefix="> ", intents=discord.Intents.all(), shard_count=1, help_command=None, max_messages=10000)
            ...