I've spent some time researching the Python multiprocessing module, its use of os.fork, and shared memory using multiprocessing.Array for a piece of code I'm writing.
The project itself boils down to this: I have several MxN arrays (let's suppose I have 3 arrays called A, B, and C) that I need to process to calculate a new MxN array (called D), where:
D_ij = f(A_ij, B_ij, C_ij)
The function f is such that standard vector operations cannot be applied. This task is, I believe, what is called "embarrassingly parallel". Given the overhead involved in multiprocessing, I am going to break the calculation of D into blocks. For example, if D were 8x8 and I had 4 processes, each process would be responsible for solving a 4x4 "chunk" of D.
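For concreteness, a toy version of the calculation and the chunking I have in mind looks like the sketch below (the f here is just a stand-in; my real function cannot be written with vector operations):

import numpy as np

# Stand-in for the real element-wise function; the actual f cannot be
# expressed with standard vector operations.
def f(a, b, c):
    return a + b * c

m, n = 8, 8
A = np.random.rand(m, n)
B = np.random.rand(m, n)
C = np.random.rand(m, n)
D = np.zeros((m, n))

# Four 4x4 chunks; in the real code each chunk would be handed to one process.
for row0 in (0, 4):
    for col0 in (0, 4):
        for i in range(row0, row0 + 4):
            for j in range(col0, col0 + 4):
                D[i, j] = f(A[i, j], B[i, j], C[i, j])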
Now, the size of the arrays has the potential to be very big (on the order of several GB), so I want all arrays to use shared memory (even array D, which will have sub-processes writing to it). I believe I have a solution to the shared array issue using a modified version of what is presented here.
However, from an implementation perspective it would be nice to place arrays A, B, and C into a dictionary. What is unclear to me is whether doing this will cause the arrays to be copied in memory when the reference counter for the dictionary is incremented within each sub-process.
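To make the question concrete, this is roughly the check I have in mind (the make_shared() helper, the sentinel value, and the tiny array size are only for illustration): a child process writes into an array held in the dictionary, and the parent reads it back after joining. If passing the dictionary caused the underlying buffers to be copied, the parent would not see the write.

import multiprocessing as mp
import ctypes
import numpy as np

def make_shared(shape):
    # Unlocked shared buffer wrapped as a numpy array (same idea as shared_array() below)
    base = mp.Array(ctypes.c_double, shape[0] * shape[1], lock=False)
    return np.frombuffer(base).reshape(*shape)

def child(data):
    # Write a sentinel into the array held inside the dictionary
    data['sink'][0, 0] = 123.0

if __name__ == '__main__':
    data = {'sink': make_shared((4, 4))}
    p = mp.Process(target=child, args=(data,))
    p.start()
    p.join()
    # Prints 123.0 if the underlying buffer is genuinely shared (i.e. not copied)
    print data['sink'][0, 0]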
To try and answer this, I wrote a little test script (see below) and tried running it using valgrind --tool=massif to track memory usage. However, I am not quite clear how to interpret the results. Specifically, whether each massif.out file (where the number of files equals the number of sub-processes created by my test script + 1) reports the memory used by that process alone (i.e. I need to sum them all up to get the total memory usage), or whether I only need to consider the massif.out associated with the parent process.
On a side note: one of my shared memory arrays has the sub-processes writing to it. I know this should be avoided, especially since I am not using locks to limit writes to one sub-process at a time. Is this a problem? My thought is that since the order in which the array is filled is irrelevant, the calculation of any index is independent of any other index, and any given sub-process will never write to the same array index as any other process, there will not be any race conditions. Is this correct?
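For what it's worth, the partition I intend is along whole rows, so the index sets handed to the different sub-processes never overlap; a minimal illustration of that split (with made-up sizes) is:

import numpy as np

m = 8   # rows of D (made-up size)
N = 4   # number of processes

# Block edges along the row (m) direction
edges = [int(i) for i in np.linspace(0, m, N + 1)]

# Each process gets its own, non-overlapping block of rows
blocks = [range(edges[k], edges[k + 1]) for k in range(N)]
print blocks   # [[0, 1], [2, 3], [4, 5], [6, 7]]

The full test script follows.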
#! /usr/bin/env python
import multiprocessing as mp
import ctypes
import numpy as np
import time
import sys
import timeit


def shared_array(shape=None, lock=False):
    """
    Form a shared memory numpy array.

    https://stackoverflow.com/questions/5549190/is-shared-readonly-data-copied-to-different-processes-for-python-multiprocessing
    """
    shared_array_base = mp.Array(ctypes.c_double, shape[0]*shape[1], lock=lock)

    # Create a locked or unlocked array
    if lock:
        shared_array = np.frombuffer(shared_array_base.get_obj())
    else:
        shared_array = np.frombuffer(shared_array_base)
    shared_array = shared_array.reshape(*shape)
    return shared_array


def worker(indices=None, queue=None, data=None):
    # Loop over each index and "crunch" some data
    for i in indices:
        time.sleep(0.01)
        if data is not None:
            data['sink'][i, :] = data['source'][i, :] + i

        # Place the ID of the completed index into the queue
        queue.put(i)


if __name__ == '__main__':
    # Set the start time
    begin = timeit.default_timer()

    # Size of arrays (m x n)
    m = 1000
    n = 1000

    # Number of processes
    N = 2

    # Create a queue to use for tracking progress
    queue = mp.Queue()

    # Create the dictionary holding the shared arrays
    data = dict()

    # Form the shared arrays (source with a lock, sink without)
    data['source'] = shared_array(shape=(m, n), lock=True)
    data['sink'] = shared_array(shape=(m, n), lock=False)

    # Create a list of the indices associated with the m direction
    indices = range(0, m)

    # Parse the indices list into range blocks; each process will get a block
    indices_blocks = [int(i) for i in np.linspace(0, m, N+1)]

    # Initialize a list for storing created sub-processes
    procs = []

    # Print the initialization timestamp
    print 'Time to initialize: {}'.format(timeit.default_timer() - begin)

    # Create and start each sub-process
    for i in range(1, N+1):

        # Start of the block
        start = indices_blocks[i-1]

        # End of the block
        end = indices_blocks[i]

        # Create the sub-process
        procs.append(mp.Process(target=worker,
                                args=(indices[start:end], queue, data)))

        # Kill the sub-process if/when the parent is killed
        procs[-1].daemon = True

        # Start the sub-process
        procs[-1].start()

    # Initialize a list to store the indices that have been processed
    completed = []

    # Enter a loop that runs while any of the sub-processes are still alive
    while any(i.is_alive() for i in procs):

        # Read the queue, append completed indices, and print the progress
        while not queue.empty():
            done = queue.get()
            if done not in completed:
                completed.append(done)
            message = "\rCompleted {:.2%}".format(float(len(completed))/len(indices))
            sys.stdout.write(message)
            sys.stdout.flush()
    print ''

    # Join all the sub-processes
    for p in procs:
        p.join()

    # Print the run time and the modified sink array
    print 'Running time: {}'.format(timeit.default_timer() - begin)
    print data['sink']
Edit: It seems I've run into another issue; specifically, a value of n equal to 3 million results in the kernel killing the process (I assume it's due to a memory issue). This appears to be related to how shared_array() works (I can create np.zeros arrays of the same size without an issue). After playing with it a bit, I get the traceback shown below. I'm not entirely sure what is causing the memory allocation error, but a quick Google search turns up discussions about how mmap maps virtual address space, which I'm guessing is smaller than the amount of physical memory a machine has?
Traceback (most recent call last):
  File "./shared_array.py", line 66, in <module>
    data['source'] = shared_array(shape=(m, n), lock=True)
  File "./shared_array.py", line 17, in shared_array
    shared_array_base = mp.Array(ctypes.c_double, shape[0]*shape[1], lock=lock)
  File "/usr/apps/python/lib/python2.7/multiprocessing/__init__.py", line 260, in Array
    return Array(typecode_or_type, size_or_initializer, **kwds)
  File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 120, in Array
    obj = RawArray(typecode_or_type, size_or_initializer)
  File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 88, in RawArray
    obj = _new_value(type_)
  File "/usr/apps/python/lib/python2.7/multiprocessing/sharedctypes.py", line 68, in _new_value
    wrapper = heap.BufferWrapper(size)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 243, in __init__
    block = BufferWrapper._heap.malloc(size)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 223, in malloc
    (arena, start, stop) = self._malloc(size)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 120, in _malloc
    arena = Arena(length)
  File "/usr/apps/python/lib/python2.7/multiprocessing/heap.py", line 82, in __init__
    self.buffer = mmap.mmap(-1, size)
mmap.error: [Errno 12] Cannot allocate memory
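For reference, my rough arithmetic for the failing case (m = 1000, n = 3 million, C doubles at 8 bytes per element) is:

m = 1000
n = 3000000
bytes_per_double = 8

size_bytes = m * n * bytes_per_double
print '{:.1f} GB per array'.format(size_bytes / 1e9)                          # 24.0 GB
print '{:.1f} GB for source and sink together'.format(2 * size_bytes / 1e9)   # 48.0 GB

So the mmap.mmap(-1, size) call at the bottom of the traceback is being asked for a roughly 24 GB anonymous mapping per array, which I suspect is simply more than this machine can provide.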