I have two daemon processes running. One feeds data into a shared multiprocessing.Queue, and the other takes data from this queue and submits it to a multiprocessing pool via apply_async. The resulting AsyncResult is put on another multiprocessing.Queue.
Some example code:
import multiprocessing
from multiprocessing import Process
import random
import time

def my_method(i):
    return i * i

class DataFeeder:
    input_queue = None

    @staticmethod
    def stream_to_queue(iq):
        if DataFeeder.input_queue is None:
            DataFeeder.input_queue = iq
        while True:
            time.sleep(1)
            dat = random.choice(range(0, 50))
            print(f"feeding {dat}")
            DataFeeder.input_queue.put(dat)

class DataEater:
    input_queue = None
    results_queue = None
    pool = None

    @staticmethod
    def eat(iq, rq, p):
        if DataEater.input_queue is None:
            DataEater.input_queue = iq
        if DataEater.results_queue is None:
            DataEater.results_queue = rq
        if DataEater.pool is None:
            DataEater.pool = p
        while True:
            time.sleep(0.1)
            # NB: the positional argument is `block`, not a timeout,
            # so this call actually blocks until data arrives
            dat = DataEater.input_queue.get(0.1)
            print(f"eating {dat}")
            # fails here, see the traceback below
            async_result = DataEater.pool.apply_async(my_method, (dat,))
            print(f"async_result {async_result}")
            DataEater.results_queue.put_nowait(async_result)

if __name__ == '__main__':
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        pool = m.Pool(8)
        dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
        dfp.start()
        dep = Process(target=DataEater.eat, args=(input_q, output_q, pool), daemon=True)
        dep.start()
        dep.join()
        dfp.join()
The resulting stacktrace is as follows:
Process Process-3:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/chhunb/PycharmProjects/micromanager_server/umanager_server/multiproctest.py", line 54, in eat
    async_result = DataEater.pool.apply_async(my_method, (dat,))
  File "<string>", line 2, in apply_async
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/managers.py", line 816, in _callmethod
    proxytype = self._manager._registry[token.typeid][-1]
AttributeError: 'NoneType' object has no attribute '_registry'
I've tried spawning the pool locally in the eat() method, so that it is created from within the new process, as sketched below. That won't work, because the result of apply_async is an AsyncResult which has to be shared with other processes through the results_queue.
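Roughly, a minimal sketch of that attempt (the function name eat_with_local_pool is made up for illustration; it also requires the eater process to be non-daemonic, since daemonic processes may not create children):

import multiprocessing

def my_method(i):
    return i * i

def eat_with_local_pool(iq, rq):
    pool = multiprocessing.Pool(8)  # local pool, owned by this process
    while True:
        dat = iq.get()
        async_result = pool.apply_async(my_method, (dat,))  # works locally
        # Fails here instead: a plain AsyncResult wraps thread primitives
        # and cannot be pickled, so it cannot be put on a shared queue.
        rq.put_nowait(async_result)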
I've tried completely ignoring the DataFeeder by hard-coding some value into the apply_async input. Same error.
This open Python issue looks very similar, if not identical, to what I'm doing: https://github.com/python/cpython/issues/80100 The issue is still open, so does that mean it was never resolved in later versions of Python?
Do I have to consider a completely different design pattern to get this functionality?
The following is a hackish approach which was tested only with CPython 3.11 on Windows. It may break on other Python implementations, other Python versions, or other operating systems.
Moreover, it currently transfers the authentication key of the manager's server between processes (which is considered unsafe), but if necessary this could be changed to use a fixed key.
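A minimal sketch of the idea, assuming the root cause visible in the traceback: when the PoolProxy is unpickled in the child process, its private _manager attribute is set to None (the manager object itself is never transferred with the proxy), so apply_async cannot look up the proxy type for its AsyncResult. The workaround is to pass the manager server's address and authentication key to the child, connect a fresh SyncManager there, and reattach it to the proxy. The standalone eat() signature below is my own rewrite of DataEater.eat; the reliance on the private attributes _manager and _authkey is what makes this hackish.

import multiprocessing
from multiprocessing import Process
from multiprocessing.managers import SyncManager

def my_method(i):
    return i * i

def eat(iq, rq, p, manager_address, manager_authkey):
    # Let this process authenticate with the manager's server.
    multiprocessing.current_process().authkey = manager_authkey
    # Connect a fresh SyncManager to the already-running server ...
    mgr = SyncManager(address=manager_address, authkey=manager_authkey)
    mgr.connect()
    # ... and reattach it to the pool proxy, whose _manager attribute
    # became None when the proxy was unpickled in this process.
    p._manager = mgr
    while True:
        dat = iq.get()  # block until data arrives
        async_result = p.apply_async(my_method, (dat,))
        rq.put_nowait(async_result)  # an AsyncResult *proxy*, safe to share

if __name__ == '__main__':
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        pool = m.Pool(8)
        # bytes(...) because the manager's AuthenticationString refuses to
        # be pickled; shipping the raw key like this is the unsafe part
        # mentioned above.
        dep = Process(target=eat,
                      args=(input_q, output_q, pool, m.address, bytes(m._authkey)),
                      daemon=True)
        dep.start()
        for i in range(5):
            input_q.put(i)
        for _ in range(5):
            print(output_q.get().get())  # prints 0, 1, 4, 9, 16

A consumer that later takes the AsyncResult proxy off output_q only needs the same authkey to reach the server; its get() returns a plain value, so no _manager patching should be needed on that side.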