Python Process Pool signal handling


I have a multiprocessing.Pool running tasks that I want to exit gracefully on termination, by handling the SIGTERM signal.

This is my code example (Python 3.9):

import os
import signal
import time
from multiprocessing import Pool


class SigTermException(Exception):
    pass


def sigtermhandler(signum, frame):
    raise SigTermException('sigterm')


def f():
    print(os.getpid())
    try:
        while True:
            print("loop")
            time.sleep(5)
    except SigTermException:
        print("Received SIGTERM")


def main():
    signal.signal(signal.SIGTERM, sigtermhandler)
    pool = Pool()

    pool.apply_async(f)
    print("wait 5")
    time.sleep(5)
    print("Terminating")
    pool.terminate()
    print("Joining")
    pool.join()
    print("Exiting")


if __name__ == '__main__':
    main()

I was expecting it to print:

...
Terminating
Received SIGTERM
Joining
Exiting

However, it never seems to get past pool.terminate().

Here's an example run:

wait 5
92363
loop
Terminating
loop
Received SIGTERM

Running ps, I see the following:

  92362 pts/0    S+     0:00  |   |   \_ python signal_pool.py
  92363 pts/0    S+     0:00  |   |       \_ python signal_pool.py

So it looks like the child process is still 'alive'.

I also tested the solution mentioned here, to no avail.

Any hints or help would be appreciated.

Your worker function, f, runs forever, yet your main process sleeps for just 5 seconds and then calls terminate on the pool, which kills any running tasks. This contradicts your goal of having your tasks exit gracefully on receiving a SIGTERM: as the code stands, they never exit gracefully even in the absence of a SIGTERM.
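One common way to let a task finish on its own when SIGTERM arrives is to have the handler merely set a flag that the loop polls, rather than raising an exception at an arbitrary point in the loop. Below is a minimal single-process sketch, assuming a POSIX system; request_stop, stop_requested and the max_loops/delay parameters are hypothetical names for illustration, not part of the code above. In a Pool, the handler would have to be installed in the worker process, and Python only runs signal handlers in a process's main thread.

```python
import os
import signal
import time

# Hypothetical flag: set by the SIGTERM handler, polled by the worker loop.
stop_requested = False


def request_stop(signum, frame):
    """SIGTERM handler that only records the request; it raises nothing."""
    global stop_requested
    stop_requested = True


def f(max_loops=None, delay=0.1):
    """Loop until a stop is requested (or max_loops is reached), then return."""
    loops = 0
    while not stop_requested:
        if max_loops is not None and loops >= max_loops:
            break
        print("loop")
        loops += 1
        time.sleep(delay)
    # Returning normally means a caller such as AsyncResult.get() receives a value.
    return "stopped by SIGTERM" if stop_requested else "finished normally"


if __name__ == "__main__":
    signal.signal(signal.SIGTERM, request_stop)
    print(f(max_loops=3))                 # no SIGTERM yet: 3 loops, then returns
    os.kill(os.getpid(), signal.SIGTERM)  # stands in for `kill -15 <pid>` (POSIX)
    print(f())                            # flag gets set: returns almost immediately
```

Because the handler never raises, the loop always stops at a well-defined point (the `while` check), so any cleanup placed after the loop is guaranteed to run.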

So I think the main process should wait as long as necessary for the submitted task or tasks to complete; that is the usual situation, right? Also, when I tried this and issued a kill -15 command, the worker function alone handled the signal and it was never passed to the main process, perhaps because the main process was just blocked waiting for the submitted task to complete. I therefore did not need a try/except block in the main process.
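A variation on the same idea, sketched here as an assumption rather than something tested in the answer above: install the handler only in the workers through Pool's initializer argument, so the main process keeps the default SIGTERM behaviour. This also keeps pool.terminate() away from the handler. On POSIX, terminate() stops workers by sending them SIGTERM and then joins them, so a handler that turns SIGTERM into an exception that f catches lets the worker survive the call, which is consistent with the hang described in the question. The init_worker name and the simulate_kill_after parameter are hypothetical; the latter makes the worker signal itself as a stand-in for an external kill -15. The sketch assumes POSIX and that it is run as a script file.

```python
import os
import signal
import time
from multiprocessing import Pool


class SigTermException(Exception):
    pass


def sigtermhandler(signum, frame):
    raise SigTermException('sigterm')


def init_worker():
    # Runs once in each worker process as the pool starts it, so only the
    # workers get this handler; the main process keeps the default action.
    signal.signal(signal.SIGTERM, sigtermhandler)


def f(simulate_kill_after=2):
    # simulate_kill_after (hypothetical demo hook): after that many loops the
    # worker sends SIGTERM to itself, standing in for `kill -15 <worker pid>`.
    print(os.getpid())
    loops = 0
    try:
        while True:
            print("loop")
            loops += 1
            if loops == simulate_kill_after:
                os.kill(os.getpid(), signal.SIGTERM)
            time.sleep(0.1)
    except SigTermException:
        print("Received SIGTERM")
        return "Received SIGTERM"


def main():
    pool = Pool(processes=1, initializer=init_worker)
    async_result = pool.apply_async(f)
    result = async_result.get(timeout=30)  # wait for the task, bounded for the demo
    # close()/join() rather than terminate(): terminate() also sends SIGTERM,
    # which this handler would turn into an exception inside the workers.
    pool.close()
    pool.join()
    print("Exiting")
    return result


if __name__ == '__main__':
    main()
```

Note that `with Pool(...) as pool:` calls terminate() on exit, so the same caveat applies to the context-manager form.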

import os
import signal
import time
from multiprocessing import Pool


class SigTermException(Exception):
    pass


def sigtermhandler(signum, frame):
    raise SigTermException('sigterm')


def f():
    print(os.getpid())
    try:
        while True:
            print("loop")
            time.sleep(5)
    except SigTermException:
        print("Received SIGTERM")


def main():
    signal.signal(signal.SIGTERM, sigtermhandler)
    pool = Pool()
    async_result = pool.apply_async(f)
    print("waiting for task to complete ...")
    async_result.get() # wait for task to complete
    pool.close()
    print("Joining")
    pool.join()
    print("Exiting")


if __name__ == '__main__':
    main()

Printed:

waiting for task to complete ...
98
loop
Received SIGTERM
Joining
Exiting

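If you don't need the task's return value, you can also just close and join the pool (reusing sigtermhandler and f from above):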

def main():
    signal.signal(signal.SIGTERM, sigtermhandler)
    pool = Pool()
    pool.apply_async(f)
    print("waiting for all tasks to complete ...")
    pool.close()
    pool.join()
    print("Exiting")
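Because this shorter version never calls get(), whatever f returns, and any exception it raises, is silently discarded. If you still want to observe outcomes without blocking on each result, apply_async accepts callback and error_callback arguments that are invoked in the main process. A minimal sketch, with square as a hypothetical stand-in task that fails on negative input:

```python
from multiprocessing import Pool


def square(x):
    # Hypothetical stand-in task: fails on negative input.
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x * x


def main():
    results, errors = [], []
    pool = Pool(processes=2)
    for x in (2, 3, -1):
        # Callbacks run in a helper thread of the main process, not in a worker.
        pool.apply_async(square, (x,), callback=results.append,
                         error_callback=errors.append)
    pool.close()  # no more tasks will be submitted
    pool.join()   # returns once every submitted task has been handled
    print("results:", sorted(results))
    print("errors:", errors)
    return sorted(results), errors


if __name__ == '__main__':
    main()
```

Callbacks should stay short, since they run on the pool's single result-handling thread and block delivery of other results while they execute.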