I wrote the following demo script to replicate the multiprocessing handling logic from a more complex case (from now on, the original code); the key parts should all be there.
The original code, running with Python 3.10.12 on Linux, had a memory leak when it was sometimes stopped by CTRL+C: htop showed stale processes still hogging RAM afterwards. After spending some time trying to fix it, I aimed at getting all processes terminated and checking their exit codes. Once every process reported exit code -15 in the printout after hitting CTRL+C (i.e. -SIGTERM, the expected code here since my handler responds to CTRL+C by calling terminate()), I thought the leak would stop, but it persists.
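For reference, here is the exit-code convention I relied on (a quick sanity-check sketch, not part of the demo):

import signal

# A negative multiprocessing.Process.exitcode is -N, where N is the number
# of the signal that killed the process:
#   -15 -> SIGTERM (sent by Process.terminate())
#    -2 -> SIGINT  (what CTRL+C sends to the foreground process group)
assert signal.SIGTERM == 15
assert signal.SIGINT == 2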
The demo script is the following:
import multiprocessing as mp
import time


class ProcessManager:
    def __init__(self) -> None:
        self.proc_map = {}
        self.map_procs()

    def map_procs(self):
        for i in range(1):
            self.proc_map[i] = mp.Process(target=self.parent_loop)
        print("mapped processes in orchestrator")

    def start_procs(self):
        # Start all processes
        for i, proc in self.proc_map.items():
            proc.start()
            print(f"Parent process {i} started.")
        # wait for processes to finish
        for _, proc in self.proc_map.items():
            try:
                proc.join()  # wait for the process to finish (it may or may not, depending on loops)
            except KeyboardInterrupt:
                for _, proc in self.proc_map.items():
                    print(f"Terminating process {proc}...")
                    proc.terminate()
                for _, proc in self.proc_map.items():
                    proc.join()  # wait for termination to finish

    def parent_loop(self):
        p = Parent()
        p.parent_loop()


class Parent:
    def __init__(self) -> None:
        # a Manager server process starts here; the manager object itself
        # is never kept or shut down explicitly
        self.queue = mp.Manager().Queue()
        self.child = ChildDaemon(self.queue)
        self.dummy_file_in_memory = self.load_big_file()
        self.slow_init_func(5)
        self.child.daemon_proc.start()

    def slow_init_func(self, t):
        print("slow init func")
        time.sleep(t)

    def load_big_file(self):
        # allocate a ~20 GiB zero-filled buffer to stand in for a big file
        file_size = 1024 * 1024 * 1024 * 20
        dummy_data = bytearray(file_size)
        return dummy_data

    def parent_loop(self):
        while True:
            print(f"parent loop: queue counter {self.queue.get()}")
            time.sleep(5)


class ChildDaemon:
    def __init__(self, queue) -> None:
        self.queue = queue
        self.daemon_proc = mp.Process(target=self.child_loop, args=(queue,))
        self.daemon_proc.daemon = True  # daemon processes are killed when their parent exits

    def child_loop(self, queue):
        counter = 0
        while True:
            print("child loop")
            time.sleep(1)
            try:
                queue.get_nowait()
            except:
                pass
            try:
                queue.put(counter, block=False)
            except:
                pass
            counter += 1


if __name__ == "__main__":
    mp.set_start_method('spawn')
    orch = ProcessManager()
    orch.start_procs()
    for key, proc in orch.proc_map.items():
        print("Checking exitcodes of supervisor processes...")
        print(f"{key} ({proc}) exitcode: {proc.exitcode}")
On macOS 14.1.2 with Python 3.9.12, when I stop it with CTRL+C, I get:
mapped processes in orchestrator
Parent process 0 started.
slow init func
child loop
child loop
parent loop: queue counter 0
child loop
child loop
child loop
child loop
parent loop: queue counter 4
child loop
child loop
^CTerminating process <Process name='Process-1' pid=13652 parent=13650 started>...
Process Process-1:2:
Traceback (most recent call last):
File "/Users/******/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Users/******/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Users/******/Projects/processes_leak/src.py", line 74, in child_loop
time.sleep(1)
KeyboardInterrupt
Checking exitcodes of supervisor processes...
0 (<Process name='Process-1' pid=13652 parent=13650 stopped exitcode=-SIGTERM>) exitcode: -15
htop shows this after the script ends (screenshot not reproduced here): stale Python processes are still alive and holding memory.
Notes
- My hope here is that the solution for this will translate to the original code as well.
- In the original code, I cannot use an executor, as I'm encountering some non-picklable objects, and I want to handle the process management in a similar way to the demo (see the sketch after this list for the pickling constraint).
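To illustrate the constraint, here is a minimal sketch (my own illustration, not from the original code): ProcessPoolExecutor pickles the submitted callable and its arguments, so a non-picklable callable such as a lambda makes the returned future fail:

import concurrent.futures

def main():
    with concurrent.futures.ProcessPoolExecutor() as ex:
        fut = ex.submit(lambda: 42)  # lambdas cannot be pickled
        try:
            fut.result()
        except Exception as e:
            # typically a PicklingError: Can't pickle <function <lambda> ...>
            print(type(e).__name__, e)

if __name__ == "__main__":
    main()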
Edit
Thanks to the comment by @relent95, I believe the issue lies in the mp.Manager() call: I can observe leftover Python processes even after the narrower script below has finished.
The question becomes how to correctly shut down the manager process (ideally without using a context manager, as the original project is more complex). More specifically, is that even needed, since the docs seem to say that manager processes will be shut down as soon as they are garbage collected or their parent process exits? A short demonstration of the manager's server process follows the script.
import multiprocessing as mp
import time


class ProcessManager:
    def __init__(self):
        self.organise()

    def organise(self):
        p = mp.Process(target=self.start_manager)
        p.start()
        try:
            p.join()
        except KeyboardInterrupt:
            print('pm exception')
            try:
                self.m.shutdown()
            except:
                pass
            p.terminate()
            p.join()

    def start_manager(self):
        self.m = mp.Manager()
        self.q = self.m.Queue()
        try:
            time.sleep(10)
        finally:
            print('start_manager exception')
            self.m.shutdown()


if __name__ == "__main__":
    pm = ProcessManager()
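To see the extra process that Manager() starts, here is a minimal sketch (my own illustration, not from the post) using multiprocessing.active_children():

import multiprocessing as mp
import time

if __name__ == "__main__":
    m = mp.Manager()             # starts a separate server process
    print(mp.active_children())  # the manager's process shows up here,
                                 # e.g. [<Process name='SyncManager-1' ...>]
    m.shutdown()                 # stop the server explicitly
    time.sleep(0.5)              # give it a moment to be reaped
    print(mp.active_children())  # [] once the server process is gone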
I figured out how to avoid leaving orphan processes when handling an exception alongside a Manager() child process.
The key seems to be not to call Process.terminate(). Regardless of whether Manager.shutdown() was called beforehand, terminate() does not do the cleanup I would expect and leaves orphan processes behind, as can be observed by running (and interrupting with CTRL+C) the code from the Edit section of my initial post.
This is clearly mentioned under terminate() in the 3.12.1 docs (though not in the same place in the 3.9 docs): descendant processes of the terminated process will not be terminated; they simply become orphaned.
The code below does not leave an orphan on macOS 14.1.2 with Python 3.9.12: