How to access a process-safe container in multiprocessing.Pool


I have the following set-up:

import multiprocessing
def heavy_work(args):
    queue, data, idx = args
    state = 0
    for elem in queue:
        #  decide how to process current data point, store collected information in 'state'
    else:
        queue.put(modify(data[idx], state))


def mp_heavy_work(data):
    queue = multiprocessing.Manager().Queue(len(data))
    pool = multiprocessing.Pool(processes=4)
    pool.map(heavy_work, ((queue, data, i) for i in range(len(data))))

The problem is that queues are not iterable, so line 5 (for elem in queue:) does not work. To decide how to handle a new data point, I need to know how the previous ones were modified, so every worker must be able to read the shared container it also writes to (currently the queue). I wanted to rely on a 'primitive' process-safe type instead of locks because most of the work happens inside the loop, so acquiring a lock each time a process enters it would make the multiprocessing redundant.

Is there a way?


There is 1 solution below


Why do you need it to be iterable? You can just use the normal queue.get().

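from queue import Empty  # Python 3; in Python 2 this is Queue.Empty

try:
    while True:
        elem = queue.get(False)  # non-blocking: raises Empty if nothing is available
        # do stuff with elem
except Empty:
    queue.put(modify(data[idx], state))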
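As a rough, runnable sketch of the non-blocking pattern above: the helper name drain is hypothetical, and a plain queue.Queue stands in for multiprocessing.Manager().Queue() on the assumption that the two expose the same get/put interface. Note that get() removes each item it returns, so anything one worker drains is no longer visible to the others unless it is put back.

```python
import queue  # standard-library module; provides queue.Queue and queue.Empty


def drain(q):
    """Collect every item currently available on q without blocking."""
    items = []
    try:
        while True:
            # block=False: raise queue.Empty immediately instead of waiting
            items.append(q.get(False))
    except queue.Empty:
        pass  # nothing left at the moment
    return items


# Assumption: queue.Queue is used here as a stand-in for a Manager().Queue()
q = queue.Queue()
for value in (1, 2, 3):
    q.put(value)

drained = drain(q)
print(drained)
```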

If you really want an iterable, you can wrap your queue object:

for elem in iter(queue.get, 'sentinel'):
    pass  # do stuff
else:
    pass  # do more stuff

This calls queue.get() on each iteration until it returns the 'sentinel' object, at which point the iterator raises StopIteration and the loop ends. This way you can end the iteration manually by putting the sentinel on the queue.
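To make the two-argument form of iter() concrete, here is a small sketch. The SENTINEL constant and the sample items are made up for illustration, and queue.Queue again stands in for the manager queue. Because queue.get blocks when called without arguments, this form waits for the sentinel rather than stopping on an empty queue.

```python
import queue

SENTINEL = "sentinel"  # hypothetical marker value; any unique object works

q = queue.Queue()  # stand-in for multiprocessing.Manager().Queue()
for item in ("a", "b", SENTINEL, "c"):
    q.put(item)

seen = []
# iter(callable, sentinel) calls q.get() repeatedly and stops as soon as
# the returned value equals SENTINEL (the sentinel itself is not yielded).
for elem in iter(q.get, SENTINEL):
    seen.append(elem)
else:
    # Runs because the loop finished without a break
    stopped_on_sentinel = True

leftover = q.get()  # items queued after the sentinel are still there
print(seen, leftover)
```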

If you want to run the else block when the queue is empty (without ending the iteration manually), you must rely on the Empty exception (queue.Empty in Python 3, Queue.Empty in Python 2):

from queue import Empty  # Python 3; in Python 2 this is Queue.Empty

try:
    for elem in iter(lambda: queue.get(False), 'sentinel'):
        pass  # do stuff
except Empty:
    pass  # do else stuff

You could even wrap this up in your own custom wrapper:

from queue import Empty  # Python 3; in Python 2 this is Queue.Empty

def wrap(queue):
    try:
        for elem in iter(lambda: queue.get(False), 'sentinel'):
            yield elem
        else:
            # executed if the 'sentinel' is received;
            # you may want to handle this specially (raise a custom exception?)
            pass
    except Empty:
        pass  # queue is empty: just end the iteration

...

    for elem in wrap(queue):
        # do stuff
    else:
        # do more stuff
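
For reference, a self-contained version of the wrapper idea might look like the sketch below. The name iter_available and the sample values are hypothetical, and queue.Queue is assumed to behave like the manager queue for get/put. The generator ends either when the sentinel arrives or when the queue is empty, so the for/else in the caller works as usual.

```python
import queue


def iter_available(q, sentinel="sentinel"):
    """Yield items from q until it is empty or the sentinel is received."""
    try:
        # Non-blocking get: raises queue.Empty once nothing is available
        for elem in iter(lambda: q.get(False), sentinel):
            yield elem
    except queue.Empty:
        return  # queue ran dry; end the iteration normally


q = queue.Queue()  # stand-in for multiprocessing.Manager().Queue()
for value in (10, 20, 30):
    q.put(value)

total = 0
for elem in iter_available(q):
    total += elem
else:
    finished = True  # reached because the loop was not broken out of

print(total, finished)
```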