What's the best way to implement a single consumer, multiple producer scenario using Python?

849 Views Asked by At

I have a Python program that spawns multiple producer threads and then has a loop that waits for a Queue object to have something in it. It looks something like this

for t in threads:
    t.start()
while len(threads):
    if not queue.empty():
        response = queue.get()
        # handle response
        queue.task_done()
    else:
        sleep(1)
    threads = [t for t in threads if t.is_alive()]

There has to be a more elegant way to do this. I have looked into all of the synchronization objects the threading module provides, but I don't see how any of them could be applied.

FYI, the code I have works for what I am trying to do. I am a strong believer in not fixing something that isn't broken, but I just feel like there is a better way to do this that a better programmer would have done in the first place.

2

There are 2 best solutions below

0
On

You can use a weakref, to test, if the thread is still alive:

import weakref

def consumer(queue, threads):
    while threads:
        try:
            response = queue.get(timeout=1)
            # handle response
            queue.task_done()
        except Empty:
            pass

threads = weakref.WeakSet()
for i in range(10):
    t = threading.Thread(...)
    t.start()
    threads.add(t)
del t  # remove all references to threads

consumer(queue, threads)
0
On

@ Daniel : weakref is a cool trick. Here is an alternate method that just uses the queue with an added 'termination policy'.

You will need to ensure each producer's thread target functions always put a final 'termination message' to the queue, essentially a None after they've finished producing. The consumer just waits until the appropriate number of terminations (1 per producer thread) have been received and exits the loop. This way you don't have to check if the threads have ended and there is really only one point of communication: the queue. However, if there is an exception in the consumer, the producer threads should probably be in "Daemon" mode so that they don't hold up the process while they wait for the consumer queue to be .. well, consumed.

You have to make sure that the termination message is ALWAYS sent for every producer, in some kind of try-finally indent. Otherwise you'll have to handle a timeout in the except Empty of the consumer.

import functools
def consumer(queue,num_threads_remaining):
    next_message=functools.partial(iter,functools.partial(queue.get,timeout=1),None)
    while num_threads_remaining:
        try:
            for response in next_message():
                # handle response.. hopefully exception-protected
                queue.task_done()
            # we got a None termination message
            num_threads_remaining -= 1
        except Empty: pass # handle some other check when idling?