Determining Memory Consumption with Python Multiprocessing and Shared Arrays


I've spent some time researching the Python multiprocessing module, its use of os.fork, and shared memory using Array in multiprocessing for a piece of code I'm writing.

The project itself boils down to this: I have several MxN arrays (let's suppose I have 3 arrays called A, B, and C) that I need to process to calculate a new MxN array (called D), where:

Dij = f(Aij, Bij, Cij)

The function f is such that standard vector operations cannot be applied. This task is what I believe is called "embarrassingly parallel". Given the overhead involved in multiprocessing, I am going to break the calculation of D into blocks. For example, if D were 8x8 and I had 4 processes, each process would be responsible for solving a 4x4 "chunk" of D.
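To make the chunking concrete, here is a minimal sketch of how an MxN index space could be tiled into rectangular blocks. The helper name block_slices and its row_blocks/col_blocks parameters are hypothetical and only for illustration; they are not part of my actual code.

```python
import numpy as np


def block_slices(m, n, row_blocks, col_blocks):
    """Return (row_slice, col_slice) pairs that tile an m x n array."""
    # Block edges along each axis; consecutive edges bound one block.
    row_edges = [int(e) for e in np.linspace(0, m, row_blocks + 1)]
    col_edges = [int(e) for e in np.linspace(0, n, col_blocks + 1)]
    return [
        (slice(row_edges[r], row_edges[r + 1]),
         slice(col_edges[c], col_edges[c + 1]))
        for r in range(row_blocks)
        for c in range(col_blocks)
    ]


if __name__ == "__main__":
    # The 8x8 case with 4 processes: a 2x2 grid of 4x4 blocks.
    D = np.zeros((8, 8))
    for k, (rows, cols) in enumerate(block_slices(8, 8, 2, 2)):
        D[rows, cols] = k  # label each block with the id of its owner
    print(D)
```

Each process would then receive one (row_slice, col_slice) pair and evaluate f only for the indices inside it.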

Now, the size of the arrays has the potential to be very big (on the order of several GB), so I want all arrays to use shared memory (even array D, which will have sub-processes writing to it). I believe I have a solution to the shared array issue using a modified version of what is presented here.

However, from an implementation perspective it'd be nice to place arrays A, B, and C into a dictionary. What is unclear to me is whether doing this will cause the arrays to be copied in memory when the reference counter for the dictionary is incremented within each sub-process.
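For what it's worth, within a single process a dictionary only stores a reference to the array object, so inserting the array does not duplicate its buffer. The sketch below checks just that much; it does not by itself show what happens to memory pages after a fork, which is the part I am unsure about. The variable names are illustrative.

```python
import ctypes
import multiprocessing as mp

import numpy as np

# Shared buffer of 6 doubles, viewed as a 2x3 numpy array (no copy is made).
raw = mp.RawArray(ctypes.c_double, 6)
source = np.frombuffer(raw, dtype=np.float64).reshape(2, 3)

# Storing the array in a dictionary stores a reference, not a copy.
data = {'source': source}
same_object = data['source'] is source
same_buffer = np.shares_memory(data['source'], source)

# A write through the dictionary entry lands in the shared buffer.
data['source'][0, 0] = 42.0
visible_in_raw = (raw[0] == 42.0)

if __name__ == "__main__":
    print(same_object, same_buffer, visible_in_raw)
```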

To try to answer this, I wrote a little test script (see below) and ran it under valgrind --tool=massif to track memory usage. However, I am not quite clear on how to interpret the results. Specifically, does each massif.out file (the number of files equals the number of sub-processes created by my test script plus 1) denote the memory used by that process (i.e. I need to sum them all to get the total memory usage), or do I only need to consider the massif.out associated with the parent process?

On a side note: one of my shared memory arrays has the sub-processes writing to it. I know that this should be avoided, especially since I am not using locks to ensure that only one sub-process writes to the array at any given time. Is this a problem? My thought is that since the order in which the array is filled is irrelevant, the calculation of any index is independent of any other index, and any given sub-process will never write to the same array index as any other process, there will not be any race conditions. Is this correct?
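The disjoint-write pattern described above can be sketched in isolation as follows. This is a minimal illustration, not the test script itself: the names fill_rows and run_demo are hypothetical, and it assumes Python 3 on a POSIX system so that the "fork" start method can be requested explicitly (on Python 2.7 under Linux, mp.Process forks by default).

```python
import ctypes
import multiprocessing as mp

import numpy as np


def fill_rows(raw, shape, start, stop):
    """Write rows [start, stop) of the shared array; no other process owns them."""
    sink = np.frombuffer(raw, dtype=np.float64).reshape(shape)
    for i in range(start, stop):
        sink[i, :] = i


def run_demo(m=8, n=4, n_procs=4):
    ctx = mp.get_context("fork")  # assumption: fork is available
    raw = ctx.RawArray(ctypes.c_double, m * n)  # unlocked shared buffer

    # Row boundaries; process k owns rows bounds[k] .. bounds[k + 1] - 1.
    bounds = [int(b) for b in np.linspace(0, m, n_procs + 1)]
    procs = [
        ctx.Process(target=fill_rows,
                    args=(raw, (m, n), bounds[k], bounds[k + 1]))
        for k in range(n_procs)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    # Read the result back in the parent through the same shared buffer.
    return np.frombuffer(raw, dtype=np.float64).reshape(m, n).copy()


if __name__ == "__main__":
    print(run_demo())
```

Because every row belongs to exactly one process, no two processes ever write the same element, which is the property my reasoning relies on.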

#! /usr/bin/env python

import multiprocessing as mp
import ctypes
import numpy as np
import time
import sys
import timeit

def shared_array(shape=None, lock=False):
    """
    Form a shared memory numpy array.

    https://stackoverflow.com/questions/5549190/is-shared-readonly-data-copied-to-different-processes-for-python-multiprocessing 
    """

    shared_array_base = mp.Array(ctypes.c_double, shape[0]*shape[1], lock=lock)

    # Create a locked or unlocked array
    if lock:

        shared_array = np.frombuffer(shared_array_base.get_obj())

    else:

        shared_array = np.frombuffer(shared_array_base)

    shared_array = shared_array.reshape(*shape)

    return shared_array

def worker(indices=None, queue=None, data=None):

    # Loop over each index and "crunch" some data
    for i in indices:

        time.sleep(0.01)

        if data is not None:

            data['sink'][i, :] = data['source'][i, :] + i

        # Place the ID of the completed index into the queue
        queue.put(i)

if __name__ == '__main__':

    # Set the start time
    begin = timeit.default_timer()

    # Size of arrays (m x n)
    m = 1000

    n = 1000

    # Number of Processors
    N = 2

    # Create a queue to use for tracking progress
    queue = mp.Queue()

    # Create dictionary and shared arrays
    data = dict()

    # Form the shared arrays: 'source' with a lock, 'sink' without one
    data['source'] = shared_array(shape=(m, n), lock=True)

    data['sink']   = shared_array(shape=(m, n), lock=False)

    # Create a list of the indices associated with the m direction
    indices = range(0, m)

    # Parse the indices list into range blocks; each process will get a block
    indices_blocks = [int(i) for i in np.linspace(0, m, N+1)]

    # Initialize a list for storing created sub-processes
    procs = []

    # Print the initialization time
    print('Time to initialize: {}'.format(timeit.default_timer() - begin))

    # Create and start each sub-process
    for i in range(1, N+1):

        # Start of the block
        start = indices_blocks[i-1]

        # End of the block
        end   = indices_blocks[i]

        # Create the sub-process
        procs.append(mp.Process(target=worker,
                                args=(indices[start:end], queue, data)))

        # Kill the sub-process if/when the parent is killed
        procs[-1].daemon=True

        # Start the sub-process
        procs[-1].start()


    # Initialize a list to store the indices that have been processed
    completed = []

    # Enter a loop that runs while any of the sub-processes are still alive
    while any(i.is_alive() for i in procs):

        # Read the queue, append completed indices, and print the progress
        while not queue.empty():

            done = queue.get()

            if done not in completed:

                completed.append(done)

            message = "\rCompleted {:.2%}".format(float(len(completed))/len(indices))

            sys.stdout.write(message)

            sys.stdout.flush()

    print('')

    # Join all the sub-processes
    for p in procs:

        p.join()    

    # Print the run time and the modified sink array
    print('Running time: {}'.format(timeit.default_timer() - begin))

    print(data['sink'])

Edit: It seems I've run into another issue; specifically, a value of n equal to 3 million results in the kernel killing the process (I assume it's due to a memory issue). This appears to be related to how shared_array() works (I can create np.zeros arrays of the same size without an issue). After playing with it a bit I get the traceback shown below. I'm not entirely sure what is causing the memory allocation error, but a quick Google search gives discussions about how mmap maps virtual address space, which I'm guessing is smaller than the amount of physical memory a machine has?

Traceback (most recent call last):
File "./shared_array.py", line 66, in <module>
    data['source'] = shared_array(shape=(m, n), lock=True)
File "./shared_array.py", line 17, in shared_array
    shared_array_base = mp.Array(ctypes.c_double, shape[0]*shape[1], lock=lock)
File "/usr/apps/python/lib/python2.7/multiprocessing/__init__.py", line 260, in Array
    return Array(typecode_or_type, size_or_initializer, **kwds)
File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 120, in Array
    obj = RawArray(typecode_or_type, size_or_initializer)
File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 88, in RawArray
    obj = _new_value(type_)
File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 68, in _new_value
    wrapper = heap.BufferWrapper(size)
File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 243, in __init__
    block = BufferWrapper._heap.malloc(size)
File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 223, in malloc
    (arena, start, stop) = self._malloc(size)
File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 120, in _malloc
    arena = Arena(length)
File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 82, in __init__
    self.buffer = mmap.mmap(-1, size)
mmap.error: [Errno 12] Cannot allocate memory
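For scale, here is the arithmetic on the request that fails, assuming m stays at 1000 as in the script and each element is an 8-byte c_double. The helper name array_bytes is hypothetical.

```python
def array_bytes(m, n, itemsize=8):
    """Bytes needed for an m x n array of elements of the given size."""
    return m * n * itemsize


m, n = 1000, 3000000  # assumption: m is unchanged from the test script

per_array = array_bytes(m, n)         # bytes for one shared array
per_array_gib = per_array / 2.0**30   # the same figure in GiB
both_arrays_gib = 2 * per_array_gib   # 'source' plus 'sink'

if __name__ == "__main__":
    print(per_array, per_array_gib, both_arrays_gib)
```

So each shared array asks mmap for 24,000,000,000 bytes (about 22.4 GiB) in one mapping, and the script creates two of them. My guess, which I have not verified, is that np.zeros appears to work only because the operating system hands out zeroed pages lazily, so physical memory is not committed until the array is actually written to.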