I want to compare different areas of a 2 dimensional array $A$ with a given array $b$ of a smaller size. Since I have to do it a lot of times, it is crucial that this is performed very fast. I have a solution that works fine, but I hoped it could be done nicer and faster.
In detail:
Let's say we have a big array and a small array. I loop over all possible 'patches' within the big array that have the same size as the small array and compare this patches with the given small array.
def get_best_fit(big_array, small_array):
# we assume the small array is square
patch_size = small_array.shape[0]
min_value = np.inf
for x in range(patch_size, big_array.shape[0] - patch_size):
for y in range(patch_size, big_array.shape[1] - patch_size):
p = get_patch_term(x, y, patch_size, big_array)
tmp = some_metric(p, small_array)
if min_value > tmp:
min_value = tmp
min_patch = p
return min_patch, min_value
In order to get the patches I got this direct array access implementation:
def get_patch_term(x, y, patch_size, data):
"""
a patch has the size (patch_size)^^2
"""
patch = data[(x - (patch_size-1)/2): (x + (patch_size-1)/2 + 1),
(y - (patch_size-1)/2): (y + (patch_size-1)/2 + 1)]
return patch
I guess that this is the most crucial task and can be performed faster but I am not sure about it.
I had a look into Cython but maybe I did it wrong, I am not really familiar with it.
My first attempt was a direct translation into cython:
def get_patch_term_fast(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
And this seems to be faster (see below) but I thought that a parallel approach should be better, so I came up with this
def get_patch_term_fast_parallel(Py_ssize_t x_i, Py_ssize_t y_i,
Py_ssize_t patch_size,
np.ndarray[DTYPE_t, ndim=2] big_array):
assert big_array.dtype == DTYPE
patch_size = (patch_size - 1)/2
assert big_array.dtype == DTYPE
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef np.ndarray[DTYPE_t, ndim=1] patch = np.empty(np.power((2 * patch_size) + 1, 2))
with nogil, parallel():
for x in prange(x_i - patch_size, x_i + patch_size + 1):
for y in prange(y_i - patch_size, y_i + patch_size + 1):
patch[((x - (x_i - patch_size)) * (2 * patch_size + 1)) + (y - (y_i - patch_size))] = big_array[x, y]
#cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
return patch
Which is, unfortunately, not faster. For testing I used:
A = np.array(range(1200), dtype=np.float).reshape(30, 40)
b = np.array([41, 42, 81, 84]).reshape(2, 2)
x = 7
y = 7
print(timeit.timeit(lambda:get_patch_term_fast(x, y, 3, A), number=300))
print(timeit.timeit(lambda:get_patch_term_fast_parallel(x, y, 3, A).reshape(3,3), number=300))
print(timeit.timeit(lambda:get_patch_term(x, y, 3, A), number=300))
Which gives
0.0008792859734967351
0.0029909340664744377
0.0029337930027395487
So, my first question is, is it possible to do it faster? The second question would be, why is the parallel approach not faster than the original numpy implementation?
Edit:
I tried to further parallelize the get_best_fit function but unfortunately I get a lot of errors stating that I can not assign a Python object without gil.
Here is the code:
def get_best_fit_fast(np.ndarray[DTYPE_t, ndim=2] big_array,
np.ndarray[DTYPE_t, ndim=2] small_array):
# we assume the small array is square
cdef Py_ssize_t patch_size = small_array.shape[0]
cdef Py_ssize_t x
cdef Py_ssize_t y
cdef Py_ssize_t x_range = big_array.shape[0] - patch_size
cdef Py_ssize_t y_range = big_array.shape[1] - patch_size
cdef np.ndarray[DTYPE_t, ndim=2] p
cdef np.ndarray[DTYPE_t, ndim=2] weights = np.empty((x_range - patch_size)*(y_range - patch_size)).reshape((x_range - patch_size), (y_range - patch_size))
with nogil, parallel():
for x in prange(patch_size, x_range):
for y in prange(patch_size, y_range):
p = get_patch_term_fast(x, y, patch_size, big_array)
weights[x - patch_size, y - patch_size] = np.linalg.norm(np.abs(p - small_array))
return np.min(weights)
PS: I omitted the part of returning the smallest patch...
initialy posted another answer based on pattern matching (carried away by the title), post another answer
use one loop to loop over both arrays (big and small) and apply partial correlation metric (or whatever) without slicing lists all the time:
as a sidenote, observe the fact (depending on metric of course) that once one patch has been completed, the next patch (either one row down or one column right) shares much with the previous patch, only its initial and final rows (or columns) change thus the new correlation can be computed faster from the previous correlation by subtracting previous row and adding new row (or vice-versa). Since metric function is not given is added as observation.