Re-raise python exceptions raised by threads in the main python process

84 Views Asked by At

I have seen many questions similar to this one on this platform, but none of the answers quite resolve my current issue.

I have a program which sends/receives messages over a serial connection. Messages can be received at any time, so once the connection is established, I spawn two threads: one to read messages off the port and queue them, and one to handle the messages received. These threads must run indefinitely until the connection is closed. By default, python's behavior for dealing with exceptions in threads is to just print the exception to terminal and kill the thread. Obviously this is not going to work for me, and it seems the best solution so far is to somehow raise the exception in the main thread to show that the listener or processor thread had an exception and died.

How should I properly catch exceptions that occur within indefinitely-running IO threads in my program?

I have tried overriding threading.excepthook with a custom function to re-raise the exception, but this does not work for me, since that function still gets called from within the thread raising the exception, not the main thread.

I have attached a super simple code snippet to represent very generally what the code is currently doing.

import threading
from time import sleep
import random

keep_thread_going = True


def thread_func(delay):
    while keep_thread_going:
        sleep(delay)
        # Simulate an exeption randomly occuring
        if random.randint(0, 5) == 0:
            raise Exception("The thread happened to throw an exception")


def except_hook_test(args):
    print("Custom exception handing...")
    raise args.exc_value  # What do I do here to raise this exception in the main thread?


def main():
    threading.excepthook = except_hook_test
    thread_obj = threading.Thread(target=thread_func, args=[1], daemon=True)
    thread_obj.start()
    # Simulate doing other stuff while the thread is running
    sleep(5)
    # End the thread
    global keep_thread_going
    keep_thread_going = False
    thread_obj.join()
    print("Done")


main()
3

There are 3 best solutions below

4
Mark Tolonen On

Exceptions can be sent back to the main thread via a queue. Capture the exception and send it back to the main thread to be re-raised. If you want to make it clear that the main thread is raising the exception, you can use raise exception from original_exception syntax:

import threading
import queue

def thread_func(q):
    try:
        raise ValueError('some value error')
    except Exception as e:
        q.put((threading.current_thread().name, e))

if __name__ == '__main__':
    q = queue.Queue()
    thread_obj = threading.Thread(target=thread_func, name='Worker', args=(q, ))
    thread_obj.start()
    try:
        tname, e = q.get(timeout=2)
        raise RuntimeError(f'Exception raised from {tname}') from e
    except queue.Empty as e:
        pass
    thread_obj.join()
    print('Done')

Output:

Traceback (most recent call last):
  File "C:\test.py", line 6, in thread_func
    raise ValueError('some value error')
ValueError: some value error

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\test.py", line 16, in <module>
    raise RuntimeError(f'Exception raised from {tname}') from e
RuntimeError: Exception raised from Worker

The exception can be serialized through a multiprocessing.Queue as well, but needs a little more work to get the stack trace:

import multiprocessing as mp
import traceback

def thread_func(q):
    try:
        raise ValueError('some value error')
    except Exception as e:
        q.put((mp.current_process().name, traceback.format_exception(e)))

if __name__ == '__main__':
    q = mp.Queue()
    thread_obj = mp.Process(target=thread_func, name='Worker', args=(q, ))
    thread_obj.start()
    try:
        tname, msg = q.get(timeout=2)
        print(f'Exception raised from {tname}:')
        print('  '+'  '.join(msg))
    except queue.Empty as e:
        pass
    thread_obj.join()
    print('Done')

Output:

Exception raised from Worker:
  Traceback (most recent call last):
    File "C:\test.py", line 6, in thread_func
    raise ValueError('some value error')
  ValueError: some value error

Done
0
Hai Vu On

I played with threading.excepthook and found that even if I raise an exception within the hook, the exception does not propagate to the main thread. A work-around is to create a global EXCEPTION object and watch for that to change:

import logging
import random
import threading
from time import sleep

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(threadName)s: %(message)s",
)

keep_thread_going = True
EXCEPTION = None


def thread_func(delay):
    while keep_thread_going:
        sleep(delay)
        # Simulate an exeption randomly occuring
        value = random.randint(0, 5)
        logging.debug("value=%r", value)
        if value == 0:
            raise ValueError("The thread happened to throw an exception")


def except_hook_test(args):
    global keep_thread_going
    global EXCEPTION

    logging.debug("Custom exception handing...")
    keep_thread_going = False
    EXCEPTION = args.exc_value


def main():
    global keep_thread_going
    global EXCEPTION

    threading.excepthook = except_hook_test

    thread_obj = threading.Thread(
        target=thread_func, args=[1], daemon=True, name="thread_func"
    )
    thread_obj.start()

    # Loop to wait
    logging.debug("Waiting...")
    wait_duration_in_seconds = 5
    for _ in range(wait_duration_in_seconds * 2):
        if EXCEPTION is not None:
            break
        sleep(0.5)

    # Handle Exception
    if EXCEPTION is None:
        logging.debug("No exception encountered")
    else:
        logging.debug("Exception encountered: %s", EXCEPTION)


if __name__ == "__main__":
    main()

Sample output when an exception occurs:

2024-01-23 17:20:30,721 DEBUG MainThread: Waiting...
2024-01-23 17:20:31,726 DEBUG thread_func: value=2
2024-01-23 17:20:32,732 DEBUG thread_func: value=3
2024-01-23 17:20:33,738 DEBUG thread_func: value=0
2024-01-23 17:20:33,738 DEBUG thread_func: Custom exception handing...
2024-01-23 17:20:33,745 DEBUG MainThread: Exception encountered: The thread happened to throw an exception

Sample output when no exception occurs:

2024-01-23 17:15:05,911 DEBUG MainThread: Waiting...
2024-01-23 17:15:06,916 DEBUG thread_func: value=5
2024-01-23 17:15:07,917 DEBUG thread_func: value=5
2024-01-23 17:15:08,919 DEBUG thread_func: value=3
2024-01-23 17:15:09,922 DEBUG thread_func: value=4
2024-01-23 17:15:10,924 DEBUG thread_func: value=4
2024-01-23 17:15:10,950 DEBUG MainThread: No exception encountered

Notes

  • In except_hook_test, instead of raising, I set the global EXCEPTION and keep_thread_going to appropriate values when an exception occurs.
  • In the main thread, instead of a simple sleep function call, I have a loop that check every half second to see if the global EXCEPTION changed.
  • The loop will end either when time is up or when an exception occurred
  • When the loop ends, I will check to see if an exception occurred.
  • When dealing with thread, I found logging works better than print because the former is thread-safe and offers more details (such as which thread the message come from)
0
Booboo On

I believe the most straightforward way of doing this is by using a multithreading pool of size 1 (since we are only submitting a single "task"), which allows exceptions to be passed back. In the following code I am actually catching the exception that is being thrown in the main thread for demo purposes, but you do not need to enclose the statement result = async_result.get()) in a try block, in which case the main thread will throw the exception that thread_func has thrown.

from multiprocessing.pool import ThreadPool
from time import sleep
import random

keep_task_going = True

def thread_func(delay):
    while keep_task_going:
        sleep(delay)
        # Simulate an exeption randomly occuring
        if random.randint(0, 5) == 0:
            raise Exception("The thread happened to throw an exception")


def main():
    with ThreadPool(1) as pool:
        async_result = pool.apply_async(thread_func, args=(1,))
        # Simulate doing other stuff while the thread is running
        sleep(5)
        # End the submitted task:
        global keep_task_going
        keep_task_going = False
        try:
            # This will throw an exception if the thread_func
            # throws one. Here we are catching it just for demo purposes
            # but we do not need to enclose the following
            # in a try block if we do not want to catch it:
            result = async_result.get()
        except Exception as e:
            print('Caught exception:', e)
        else:
            print('Got result:', result)


if __name__ == '__main__':
    main()

Update

Instead of using a multithreding pool, you can use the following run_in_thread function. The wait time has been decrease to 3 seconds to give the thread a chance to complete normally:

import threading
import queue

def run_in_thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
    """The arguments to this function are identical to the arguments to the
    threading.Thread intiializer. This function, however, returns
    an "asynchronous result" object similar to what would be returned
    by a call to multiprocessing.pool.ThreadPool.apply_async."""

    if target is None:
        raise ValueError('The target argumnent must be a callable')

    result_queue = queue.SimpleQueue()

    def worker(*args, **kwargs):
        try:
            result = target(*args, **kwargs)
        except Exception as e:
            result = e
        result_queue.put(result)


    t = threading.Thread(group, worker, name, args, kwargs, daemon=daemon)
    t.start()

    class Result:
        @staticmethod
        def get():
            result = result_queue.get()
            t.join()
            if isinstance(result, Exception):
                raise result
            return result

        @staticmethod
        def ready():
            return not t.is_alive()

        @staticmethod
        def wait(timeout=None):
            return t.join(timeout)

    return Result


from time import sleep
import random

keep_thread_going = True

def thread_func(delay):
    while keep_thread_going:
        sleep(delay)
        # Simulate an exeption randomly occuring
        if random.randint(0, 5) == 0:
            raise Exception("The thread happened to throw an exception")
    return True

def main():
    async_result = run_in_thread(target=thread_func, args=[1], daemon=True)
    # Simulate doing other stuff while the thread is running
    #sleep(5)
    # Wait for 3 seconds or for the thread to terminate.
    # This give the thread a chance to complete normally:
    async_result.wait(3)
    if not async_result.ready():
        # The thread is still running
        # End the thread
        global keep_thread_going
        keep_thread_going = False
    # Get return value from thread_func or re-raise
    # any exception thrown in thread_func:
    print(async_result.get())
    print("Done")

main()

Sometimes the code will print:

True
Done

And sometimes it will print:

Traceback (most recent call last):
  File "C:\Booboo\test\test.py", line 68, in <module>
    main()
  File "C:\Booboo\test\test.py", line 65, in main
    print(async_result.get())
  File "C:\Booboo\test\test.py", line 24, in get
    raise result
  File "C:\Booboo\test\test.py", line 9, in worker
    result = callable(*args, **kwargs)
  File "C:\Booboo\test\test.py", line 48, in thread_func
    raise Exception("The thread happened to throw an exception")
Exception: The thread happened to throw an exception