I'm currently working on a project that listens to every API request, performs checks on it, and sends it back. Unfortunately, the API traffic overloaded both my single-threaded asyncio version and my multithreading attempts, yet CPU usage never even reached 20%.
So my idea is to use multiprocessing and fully load the CPU (70-90%). The best way (I thought) was to use `concurrent.futures.ProcessPoolExecutor` or `multiprocessing.Pool`, since I could simply set `max_workers` and `max_tasks_per_child` and, with the power of `asyncio`, handle as many requests as the CPU could possibly handle. But I failed.
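A minimal sketch of the setup described above, assuming Python 3.11+ (where `ProcessPoolExecutor` accepts `max_tasks_per_child`, which is incompatible with the `fork` start method). The asyncio event loop stays in the main process and hands CPU-bound work to a capped process pool through `loop.run_in_executor()`. `cpu_check`, `handle_requests`, and the payload shape are hypothetical stand-ins, not part of the project; the function sent to the pool is module-level so it can be pickled by reference.

```python
import asyncio
import concurrent.futures
import multiprocessing
import os


def cpu_check(payload: dict) -> dict:
    """Hypothetical CPU-bound check. Module-level, so the pool can pickle it by name."""
    text = payload.get("text", "")
    return {"id": payload.get("id"), "length": len(text), "upper": sum(c.isupper() for c in text)}


async def handle_requests(executor: concurrent.futures.Executor, payloads: list[dict]) -> list[dict]:
    """Send each payload to the executor without blocking the event loop."""
    loop = asyncio.get_running_loop()
    pending = [loop.run_in_executor(executor, cpu_check, payload) for payload in payloads]
    return list(await asyncio.gather(*pending))


async def main() -> None:
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=os.cpu_count(),                       # at most one worker per CPU core
        max_tasks_per_child=100,                          # replace each worker after 100 tasks (3.11+)
        mp_context=multiprocessing.get_context("spawn"),  # the start method used in this project
    ) as executor:
        results = await handle_requests(executor, [{"id": i, "text": "Hello"} for i in range(4)])
    print(results)


if __name__ == "__main__":  # required with "spawn": every child re-imports this module
    asyncio.run(main())
```

`handle_requests` accepts any `Executor`, so the same coroutine can be exercised with a `ThreadPoolExecutor` when no extra processes are wanted.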
My current problem is process creation inside my custom Queue system. To be exact, I don't know how to create an isolated process that would:
- spawn an `asyncio` loop
- run an async function (with a 3 s timeout) that uses `asyncio.TaskGroup()`
- run all tasks
- return data
Basically, I want to create a copy of the required check functions (the Queue system provides them), run them in an isolated process, and return the data.
Here is the `add_queue` method:
```python
async def add_queue(self, function: str, id: int | None, **kwargs) -> None:
    functions: tuple | None = self.filter.get(function)
    process_executor: syscon.concurrent.futures.ProcessPoolExecutor = self.CONNECTOR.process_executor
    if not functions:
        self.logger.log(level="DEBUG", message=f"@Queue | No match found for {function}.")
    elif id is None:  # the parameter is `id`; `guild_id` is not defined in this method
        self.logger.log(level="DEBUG", message=f"@Queue | No ID found in {function} request.")
    elif process_executor is None:
        self.logger.log(level="ERROR", message="ProcessPoolExecutor does NOT exist!")
    else:
        try:
            self.logger.log(level="DEBUG", message="@Queue | Submitting task to ProcessPoolExecutor.")
            # use the executor fetched and checked above
            process_executor.submit(self.process_event_runner, functions=functions, id=id, **kwargs)
        except Exception as error:
            self.logger.log(level="ERROR", message=f"@Queue | {type(error).__name__}: {error}")
```
And here is the method that gets called:
```python
import asyncio


def process_event_runner(self, functions: tuple, id: int, **inputedargs) -> None:
    self.logger.log(level="DEBUG", message="@Queue.process_event_runner | Runner executing.")
    event_loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
    try:
        event_loop.run_until_complete(asyncio.wait_for(async_task_worker(functions=functions, loop=event_loop, id=id, **inputedargs), timeout=3))
    except asyncio.TimeoutError:
        self.logger.log(level="WARNING", message="@Queue.process_event_runner | Task timed out. Terminating.")
    except asyncio.CancelledError:
        self.logger.log(level="WARNING", message="@Queue.process_event_runner | Task cancelled. Terminating.")
    except ExceptionGroup as groupError:
        self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | Exception Group Error: {', '.join([f'SubE {num}: {exception}' for num, exception in enumerate(groupError.exceptions, 1)])}")
    except Exception as error:
        self.logger.log(level="ERROR", message=f"@Queue.process_event_runner | {type(error).__name__}: {error}")
    finally:
        event_loop.close()  # release the loop's resources once the run is over


async def async_task_worker(functions, loop: asyncio.AbstractEventLoop, id: int, **inputedargs):
    asyncio.set_event_loop(loop)
    async with asyncio.TaskGroup() as group:
        # one task per check function; the group waits for all of them to finish
        tasks: list[asyncio.Task] = [group.create_task(func(**inputedargs)) for func in functions]
    return True
```
Both of these methods are inside a class named `Queue`.
After running this, nothing happened. I found out that `.submit()` did not execute the provided function; instead, it executed `main.py`, which only contains the system start-up. The Queue system lives in `src.core.transmitters.queue`.
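Part of why "nothing happened" is easy to miss: `Executor.submit()` returns a `Future`, and an exception raised in the worker, or while pickling the call and its arguments, is stored on that `Future` instead of being printed. A minimal sketch of surfacing those errors with `Future.add_done_callback()`; the logger name `"queue"` and the helpers `_report` and `submit_with_logging` are hypothetical.

```python
import concurrent.futures
import logging

logger = logging.getLogger("queue")  # hypothetical logger name


def _report(future: concurrent.futures.Future) -> None:
    """Done-callback: log exceptions that would otherwise stay hidden on the Future."""
    if future.cancelled():
        logger.warning("@Queue | Task was cancelled before it ran.")
        return
    error = future.exception()  # does not block: the future is already done here
    if error is not None:
        logger.error("@Queue | %s: %s", type(error).__name__, error)


def submit_with_logging(executor: concurrent.futures.Executor, fn, /, *args, **kwargs) -> concurrent.futures.Future:
    """Submit a call and attach the error-reporting callback to its Future."""
    future = executor.submit(fn, *args, **kwargs)
    future.add_done_callback(_report)
    return future
```

The callback runs once the call finishes (or immediately, if it already has), so a failing submission shows up in the log even though nobody calls `.result()`.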
How can I create isolated processes that each get a copy of the runner function (`process_event_runner`) and of all the other functions defined in the `functions` tuple, and run them? Is there still a way to limit the number of workers and to use `asyncio`? Is it even possible?
If you have any questions about how the whole system works, feel free to ask in the comments and I will do my best to answer them.
As Michael asked in the comments, here is the full traceback of the circular import error I get when trying to import the specified function (`process_event_runner`) in `main.py` before `if __name__ == "__main__":`:
```
Traceback (most recent call last):
  File "d:\Projects\RedSecurity\main.py", line 1, in <module>
    from src.core.transmitters.queue import process_event_runner
  File "d:\Projects\RedSecurity\src\core\transmitters\queue.py", line 3, in <module>
    import src.sysConnector as syscon
  File "d:\Projects\RedSecurity\src\sysConnector.py", line 9, in <module>
    from src.core.transmitters.queue import Queue
ImportError: cannot import name 'Queue' from partially initialized module 'src.core.transmitters.queue' (most likely due to a circular import) (d:\Projects\RedSecurity\src\core\transmitters\queue.py)
```
The loader sits after the `if __name__ == "__main__":` guard, and the start method is set to `spawn`, like this:
```python
from src.core.transmitters.queue import process_event_runner

if __name__ == "__main__":
    import discord, sys, aiohttp, os, json, platform, glob, concurrent.futures, multiprocessing
    sys.dont_write_bytecode = True
    from discord.ext import commands
    import src.system.colors as colors
    import src.sysConnector as syscon
    import src.setup as setup

    class Bot(commands.AutoShardedBot):
        #enabling all intents and setting up command_prefix
        #initializing bot (intents, shards, prefix..)
        def __init__(self, connector: syscon.SysConnector, setup: setup.Setup) -> None:
            super().__init__(command_prefix="> ", intents=discord.Intents.all(), shard_count=1, help_command=None, max_messages=10000)
            ...
```
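Regarding the circular import: one common way to avoid it is to keep the function the pool pickles in a small module that imports nothing from the project at load time, and to resolve the real check functions by their module path inside the worker. A hedged sketch using `importlib.import_module()`; `resolve`, `call_by_name`, and the `"module:function"` string format are hypothetical, and `math:sqrt` below only stands in for a real check function.

```python
import importlib
from typing import Any, Callable


def resolve(dotted: str) -> Callable[..., Any]:
    """Turn "package.module:function" into the function object.

    The import happens when this is called (inside the worker), not when the
    module defining `resolve` is imported, so importing this module does not
    pull in any of the project's modules.
    """
    module_name, _, attr = dotted.partition(":")
    if not attr:
        raise ValueError(f"expected 'module:function', got {dotted!r}")
    return getattr(importlib.import_module(module_name), attr)


def call_by_name(dotted: str, *args: Any, **kwargs: Any) -> Any:
    """Pool entry point: only the string and the arguments need to be pickled."""
    return resolve(dotted)(*args, **kwargs)
```

With this, `main.py` would only need a call such as `pool.submit(call_by_name, "some.module:some_check", payload)` (placeholder names), without a top-level import of `src.core.transmitters.queue`.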