Populating an empty array within parallel processing

Let's say I have a function myfunc that takes inputs i and j, calculates their sum, and populates a passed-in array with the result. This is what the problem looks like:

import numpy as np
from functools import partial
from multiprocessing import Pool

def myfunc(i: int, j: int, some_array: np.ndarray): 
    ans = i+j
    some_array[i,j] = ans 


some_array = np.zeros(shape = (2,2))


execute = partial(myfunc, some_array = some_array)

for i in range(2): 
    for j in range(2): 
        execute(i,j)

print(some_array)

[[0. 1.]
 [1. 2.]]

Now, let's imagine I would like to parallelize this code. I do so in the following way:

pairs = [(i, j) for i in range(2) for j in range(2)]


with Pool() as p:
    p.starmap(execute, pairs)

This doesn't update the array each time execute is called with different args; the final array is all zeros. I suspected this might be because p.starmap only yields the list of results at the end, but since execute is called for each element of the iterable, it should still run some_array[i,j] = ans on every call.

Any ideas/help would be much appreciated.

1 Answer

The biggest issue here is that separate processes have separate memory, so when execute is called in a worker, it operates on a different copy of some_array. The copy of some_array in the main process is never updated, and the result is unchanged (all zeros). There are two ways around this: message passing and shared memory.

Most of the multiprocessing.Pool functions already have some sort of mechanism to return a value from the target function, which works by pickling the results and sending them back to the main process with a Queue. The benefit of this is that it's quite flexible and can handle many types of data (anything that can be pickled). The downside is that you then have to re-assemble the data in the main process, and there's quite a bit of overhead in the sending (slow). A short sketch of this route appears at the end of this answer.

The other solution is to ask the operating system to carve out a chunk of memory that both processes can access. This is quite fast, but it takes a little setup (and cleanup), and it is limited to data that can be represented by a binary buffer. Fortunately, numpy can create an array from an existing binary buffer (our shared memory block). I have a helper class I have tweaked over time to make the bookkeeping of the shared memory easier:

import numpy as np
from multiprocessing import shared_memory, Process

class Shared_Arr: #helper class to make shared_memory arrays easier
    def __init__(self, shape, dtype, shm=None):
        self.shape=shape
        self.dtype=dtype

        if shm is None:
            n_bytes = int(np.dtype(dtype).itemsize * np.prod(shape))
            self.shm = shared_memory.SharedMemory(create=True, size=n_bytes)
            self.owner = True
        else:
            self.shm = shm
            self.owner = False

        self.close = self.shm.close
        self.unlink = self.shm.unlink

        self.arr = np.ndarray(self.shape, self.dtype, buffer=self.shm.buf)

    def __reduce__(self): #make it picklable so it can be sent to a child process correctly (SharedMemory itself pickles by name and re-attaches in the child)
        return (self.__class__, (self.shape, self.dtype, self.shm))

    def __enter__(self): #context manager is mostly for cleanup so __enter__ is uninteresting
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close() #closes the memory-mapped file
        if self.owner:
            self.unlink() #tell the OS to delete the file

def populate_arr(shared, value):
    with shared: #without the context manager you could just manually call shared.close() when you're done with it
        shared.arr[:] = value

if __name__ == "__main__":
    with Shared_Arr([10], int) as shared:
        shared.arr[:] = 0 #np.ndarray may operate like np.empty? initialize to zero
        print(shared.arr) #before modification
        p = Process(target=populate_arr, args=(shared, 5))
        p.start()
        p.join()
        print(shared.arr) #after being modified in a separate process
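
To connect this back to the original question: a Shared_Arr instance pickles cleanly (via __reduce__ above), so it can be passed through Pool.starmap much like some_array was passed via partial. Here's a minimal sketch, assuming the helper class above is in scope (the function name fill_cell is mine, not from the original code):

import numpy as np
from functools import partial
from multiprocessing import Pool

def fill_cell(i, j, shared):
    shared.arr[i, j] = i + j #this write lands in the shared buffer, visible to the parent

if __name__ == "__main__":
    with Shared_Arr((2, 2), np.float64) as shared:
        shared.arr[:] = 0 #initialize, since the buffer contents are not guaranteed
        pairs = [(i, j) for i in range(2) for j in range(2)]
        with Pool() as p:
            p.starmap(partial(fill_cell, shared=shared), pairs)
        print(shared.arr) #[[0. 1.] [1. 2.]], computed by the workers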
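
For comparison, here is what the message-passing route mentioned at the top of this answer might look like for the same example. No shared memory is needed, because each result is pickled back to the parent, which does the re-assembly itself:

import numpy as np
from multiprocessing import Pool

def myfunc(i, j):
    return i, j, i + j #return the result (with its indices) instead of mutating shared state

if __name__ == "__main__":
    some_array = np.zeros(shape=(2, 2))
    pairs = [(i, j) for i in range(2) for j in range(2)]
    with Pool() as p:
        results = p.starmap(myfunc, pairs)
    for i, j, ans in results: #re-assemble in the main process
        some_array[i, j] = ans
    print(some_array)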