I have the following set-up:
```python
import multiprocessing

def heavy_work(args):
    queue, data, idx = args
    state = 0
    for elem in queue:
        # decide how to process the current data point,
        # store collected information in 'state'
    else:
        queue.put(modify(data[idx], state))

def mp_heavy_work(data):
    queue = multiprocessing.Manager().Queue(len(data))
    pool = multiprocessing.Pool(processes=4)
    pool.map(heavy_work, ((queue, data, i) for i in range(len(data))))
```
Problem is, queues are not iterable, so the `for elem in queue` line does not work. I need to know how previous data points were modified in order to decide how to handle new ones, so reading the shared write-access container (currently `queue`) is necessary. I wanted to rely on a 'primitive' process-safe type instead of locks, because the main effort happens inside the loop, so locking it each time a process enters would make the multiprocessing redundant.

Is there a way?
Why do you need it to be iterable? You can just use the normal `queue.get()`.

If you really want an iterable, you can wrap your queue object:
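A minimal sketch of such a wrapper (the class name `IterableQueue` and the choice of `None` as the sentinel are assumptions; `None` must never be a real work item, since an `object()` sentinel would not survive pickling across processes):

```python
import queue

class IterableQueue:
    """Wraps any queue with get()/put(): iterates by calling get()
    until the sentinel (None here, by assumption) appears."""
    _SENTINEL = None

    def __init__(self, q):
        self.q = q

    def __iter__(self):
        return self

    def __next__(self):
        item = self.q.get()
        if item is self._SENTINEL:
            raise StopIteration
        return item

    def close(self):
        # Manually "close" the queue by enqueuing the sentinel.
        self.q.put(self._SENTINEL)

q = queue.Queue()
for x in (1, 2, 3):
    q.put(x)
wrapped = IterableQueue(q)
wrapped.close()          # enqueue the sentinel so iteration ends
print(list(wrapped))     # -> [1, 2, 3]
```

The demo uses a plain `queue.Queue` for brevity; the wrapper only relies on `get()` and `put()`, which a `multiprocessing.Manager().Queue()` proxy also provides.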
This will call `queue.get()` on each iteration until it hits a sentinel object, then raise `StopIteration`. This way you can manually close the queue.

If you want to run the `else` block when the queue is empty (without ending the iteration manually), you must rely on the `Queue.Empty` exception. You could even wrap this up in your own custom wrapper: