Unable to .get() from multiprocessing.Queue

I'm building a web application that processes ~60,000 (and growing) large files, performs some analysis on each, and returns a "best guess" that needs to be verified by a user. The files will be filtered by category to avoid loading every file, but I'm still left with scenarios where I might have to process 1000+ files at a time.

These are large files that can take up to 8-9 seconds each to process. In a 1000+ file situation it is impractical to have a user wait 8 seconds between reviews, or 2+ hours while the files are processed beforehand.

To overcome this, I've decided to use multiprocessing to spawn several workers, each of which picks files from an input queue, processes them and puts the results on an output queue. Another method polls the output queue and streams each item to the client as soon as it becomes available.

This works well until, part of the way through, the queue arbitrarily stops returning items. We're using gevent with Django and uwsgi in our environment, and I'm aware that creating child processes via multiprocessing in the context of gevent leaves the child with an undesired event loop state: greenlets spawned before forking are duplicated in the child. Therefore, I've decided to use gipc to handle the child processes.

An example of my code (I cannot post my actual code):

import multiprocessing
import gipc
from item import Item

MAX_WORKERS = 10

class ProcessFiles(object):

    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.file_count = 0

    def query_for_results(self):
        # Query db for records of files to process.
        # Return results and set self.file_count equal to
        # the number of records returned.
        pass

    # The subprocess.
    def worker(self):
        # Chisel away at the input queue until no items remain.
        while True:
            if self.no_items_remain():
                return

            item = self.input_queue.get()
            item.process()
            self.output_queue.put(item)

    def start(self):
        # Get results and store in Queue for processing
        results = self.query_for_results()
        for result in results:
            item = Item(result)
            self.input_queue.put(item)

        # Spawn workers to process files.
        for _ in xrange(MAX_WORKERS):
            process = gipc.start_process(self.worker)

        # Poll for items to send to client.
        return self.get_processed_items()

    def get_processed_items(self):

        # Wait for the output queue to hold at least 1 item.
        # When an item becomes available, yield it to client.
        count = 0
        while count != self.file_count:
            #item = self._get_processed_item()
            # Debugging:
            try:
                item = self.output_queue.get(timeout=1)
            except:
                print '\tError fetching processed item. Retrying...'
                continue

            if item:
                print 'QUEUE COUNT: {}'.format(self.output_queue.qsize())
                count += 1
                yield item
        yield 'end'

I expect the output to show the current count of the queue after processing and yielding an item:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
...
QUEUE COUNT: 4
QUEUE COUNT: 3
QUEUE COUNT: 2
QUEUE COUNT: 1

However, the script only manages to yield a few items before failing:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    ...

My question is: What exactly is happening? Why can't I get from the queue? How can I return the item I expect and avoid this?

There is 1 best solution below

What is the actual exception that is being thrown when you can't get an item? You're blindly catching every exception that could be thrown. In addition, why not just use get without a timeout? You immediately try again without doing anything else, so you might as well make the call to get block until an item is ready.
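To make the point about the bare except concrete, here is a minimal sketch of a polling loop that retries only on a genuine timeout and lets every other exception surface. The function name `drain` and the `max_misses` parameter are hypothetical, introduced only for illustration. The import fallback is there because the question's code is Python 2, where the module is named `Queue`. A `multiprocessing.Queue` raises the same `Empty` exception on timeout, so the loop should apply to it unchanged.

```python
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2, as used in the question


def drain(output_queue, expected_count, timeout=1.0, max_misses=5):
    """Yield expected_count items from output_queue.

    Only queue.Empty (a plain timeout) is retried. Anything else,
    such as an OSError from a closed pipe, propagates to the caller
    so the real failure is visible instead of being retried forever.
    """
    received = 0
    misses = 0
    while received < expected_count:
        try:
            item = output_queue.get(timeout=timeout)
        except queue.Empty:
            misses += 1
            if misses >= max_misses:
                # Give up instead of spinning silently.
                raise RuntimeError(
                    "no item after %d consecutive timeouts" % misses)
            continue
        misses = 0
        received += 1
        yield item
```

With this shape, a broken queue shows up as a traceback naming the actual exception, rather than as an endless stream of "Retrying..." messages.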

With regard to the problem itself, I think gipc is closing the pipes associated with your queue and thus breaking the queue. I expect an OSError is being thrown rather than queue.Empty. See this bug report for details.

As an alternative, you could use a process pool. Create the pool before any gevent code runs, so you don't have to worry about the event loop issue, then submit jobs to the pool using imap_unordered and you should be fine.

Your start function would look something like:

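def start(self):
    results = self.query_for_results()
    # imap_unordered requires chunksize >= 1, so guard against
    # having fewer results than processes in the pool.
    chunksize = max(1, len(results) // self.num_procs_in_pool)
    return self.pool.imap_unordered(self.worker, results,
        chunksize=chunksize)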

@staticmethod
def worker(item):
    item.process()
    return item
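For reference, the pool approach can be sketched as a self-contained script. This is an illustration under assumptions, not the asker's real code: the `Item` class here is a hypothetical stand-in that just squares a number, and `process_all` is a made-up helper name. For brevity the pool is created inside the helper. In the actual application it would presumably be created once at startup, before gevent is involved, as described above.

```python
import multiprocessing


class Item(object):
    """Hypothetical stand-in for the question's Item class."""

    def __init__(self, value):
        self.value = value
        self.result = None

    def process(self):
        # Placeholder for the expensive per-file analysis.
        self.result = self.value * self.value


def worker(item):
    # Runs in a pool process; the processed item is pickled back.
    item.process()
    return item


def process_all(items, num_procs=2):
    pool = multiprocessing.Pool(processes=num_procs)
    try:
        # chunksize must be at least 1.
        chunksize = max(1, len(items) // num_procs)
        # Results arrive in completion order, not submission order.
        return list(pool.imap_unordered(worker, items, chunksize=chunksize))
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    done = process_all([Item(n) for n in range(8)])
    print(sorted(item.result for item in done))
```

One design point to weigh: imap_unordered hands results back one chunk at a time, so a large chunksize means the first item only appears once a whole chunk has been processed. If streaming the first results to the user quickly matters more than reducing inter-process overhead, a small chunksize (even 1) may be the better fit.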