Indexing array by its value to its nearest non-integer step?

113 Views Asked by At

My apologies if I haven't phrased this properly. I have an array of ~10^7 monotonically increasing elements:

example_array = np.array([10, 10.5, 13, 15, 20, 35, 37.1, 40, 50])

I'd like to find the indexes such that indexing the original array yields closest to linear spacing between elements (without explicitly interpolating). In other words, I'd like the indexes which resample the example_array with closest to linear spacing between elements. So for example_array and a spacing of 10.5:

output_index = np.array([1, 4, 5, 7, 8])
print(example_array[output_index]) # [10.5, 20. , 35. , 40. , 50. ]

How do I achieve this quickly with numpy operations?

I would have thought that taking the modulo is the first step:

In [1]: import numpy as np

In [2]: example_array = np.array([10, 10.5, 13, 15, 20, 35, 37.1, 40, 50])

In [3]: spacing = 10.5

In [4]: example_array/spacing

Out[4]: array([0.95238095, 1.        , 1.23809524, 1.42857143, 1.9047619 ,
       3.33333333, 3.53333333, 3.80952381, 4.76190476])

In [5]: example_array%spacing

Out[5]: array([10. ,  0. ,  2.5,  4.5,  9.5,  3.5,  5.6,  8.5,  8. ])

But I can't seem to make the distinction here. effectively I want the indexes wherever Out[4] is closest to each integer, once; which is (1, 1.9047619, 3.33333333, 3.80952381, 4.76190476).


::EDIT:: As pointed out I had missed one, inserted that now. Also similar in some way to Finding closest values in two numpy arrays


Here's my testing, credit to AJ Biffl for the answer! If you're wondering why I didn't run nearest_integers2D with rand_array, it's because it leads to a killed process.

In [1]: import numpy as np

In [2]: def nearest_integers2D(arr, spacing):
   ...:     nmax = int(arr[-1]/spacing)
   ...:     range_arr = np.arange(1, nmax+1)
   ...:     difference = np.abs(arr[:, np.newaxis]/spacing - range_arr)
   ...:     return np.argmin(difference, axis=0)
   ...: 

In [3]: def nearest_integers_sort_fast(arr, spacing):
   ...:     nmin = int(arr[0]//spacing) + 1
   ...:     nmax = int(arr[-1]//spacing) + 1
   ...:     idx = np.zeros(nmax - nmin + 1, dtype = int)
   ...:     n = nmin
   ...:     dprev = spacing
   ...:     xprev = -nmin*spacing
   ...:     for i, x in enumerate(arr):
   ...:         d = abs(n*spacing - x)
   ...:         if d > dprev:
   ...:             idx[n-nmin] = i-1
   ...:             if n < len(arr):
   ...:                 n += 1
   ...:                 d = abs(n*spacing - x)
   ...:                 dprev += abs(n*spacing - xprev)
   ...:         else:
   ...:             dprev = d
   ...:         xprev = x
   ...:     idx[-1] = len(arr)-1
   ...:     return idx
   ...: 

In [4]: example_array = np.array([10, 10.5, 13, 15, 20, 35, 37.1, 40, 50])

In [5]: rand_array = np.cumsum(np.random.random(1_000_000))

In [6]: %timeit nearest_integers2D(example_array, 10.5)
9.21 µs ± 9.22 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

In [7]: %timeit nearest_integers_sort_fast(example_array, 10.5)
7.88 µs ± 63.4 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

In [8]: %timeit nearest_integers_sort_fast(rand_array, 10.5)
373 ms ± 3.22 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
2

There are 2 best solutions below

2
On BEST ANSWER

I'm not sure a vectorized solution will be preferable here. You're trying to minimize the distance between elements in your array and integer multiples of the spacing parameter, but every integer needs exactly one corresponding solution, so you can't set a global threshold to find "close" entries and the only way to minimize against multiple points at once is to create a 2d array of size (N,n), where N is the number of entries in your original array and n is the number of points you're trying to minimize against, and then minimize each row of the 2d array. This will quickly blow up your memory and is not recommended. That method looks something like this:

def nearest_integers_vec(arr, spacing):
    nmin = int(arr[0]//spacing)
    nmax = int(arr[-1]//spacing)+1
    arr2 = np.tile(arr, (nmax-nmin, 1))
    arr3 = abs(arr2 - np.array([np.arange(nmin+1, nmax+1)*spacing]).T)
    return np.argmin(arr3, axis = 1)

In [1]: idx = nearest_integers_vec(example_array, spacing)
In [2]: example_array[idx]
Out[2]: array([10.5, 20. , 35. , 40. , 50. ])

The preferred method would be to forego vectorization entirely and calculate the indices by hand and add them to a list. There are many ways to do this.

Naive Loops

Look through the entire array every time for the closest element

Related question: Find nearest value in numpy array

Option 1: map()

def nearest_integers_map(arr, spacing):
    nmin = int(arr[0]//spacing) + 1
    nmax = int(arr[-1]//spacing) + 1
    return list(map(lambda i: np.argmin(abs(arr - i*spacing)), range(nmin, nmax+1)))

Option 2: List Comprehension

def nearest_integers_lc(arr, spacing):
    nmin = int(arr[0]//spacing) + 1
    nmax = int(arr[-1]//spacing) + 1
    return [np.argmin(abs(arr - i*spacing)) for i in range(nmin, nmax+1)]

These are the simplest to implement, and both of these methods take a similar amount of time to complete, but that time is still probably far too long for an array of your desired size. So we move on.

Sorted Loop

Use the face that the data is pre-sorted to check only a few elements at a time for the next minimum

There's a lot of ways to do this, but the simplest conceptually is

def nearest_integers_sort(arr, spacing):
    nmin = int(arr[0]//spacing) + 1
    nmax = int(arr[-1]//spacing) + 1
    idx = np.zeros(nmax - nmin + 1, dtype = int)
    n = nmin
    N = len(arr)
    dprev = spacing
    i = 0
    while(i < N):
        d = abs(arr[i] - n*spacing)
        if d > dprev:
            idx[n-nmin] = i-1
            n += 1
            dprev = abs(arr[i-1] - n*spacing)
        else:
            dprev = d
            i += 1
    idx[-1] = N-1
    return idx

This function keeps track of how far each element is from the next integer multiple of spacing, and when it finds a local minimum it writes it to the array and moves on.

You can also use enumerate() instead to let C do the array indexing/loop iteration in return for a bit more speed, but it requires a few more variables to keep track of all the moving pieces:

def nearest_integers_sort_fast(arr, spacing):
    nmin = int(arr[0]//spacing) + 1
    nmax = int(arr[-1]//spacing) + 1
    idx = np.zeros(nmax - nmin + 1, dtype = int)
    n = nmin
    dprev = spacing
    xprev = -nmin*spacing
    for i, x in enumerate(arr):
        d = abs(n*spacing - x)
        if d > dprev:
            idx[n-nmin] = i-1
            if n < len(arr):
                n += 1
                d = abs(n*spacing - x)
                dprev += abs(n*spacing - xprev)
        else:
            dprev = d
        xprev = x
    idx[-1] = len(arr)-1
    return idx

There are certainly further optimizations you could make, but either of these last two will take a reasonable amount of time to complete.

0
On

This problem doesn't seem so straightforward.

I had a try using graph theory. We can first compute all differences, subtract half the expected step and compute the modulo step. This will give you a list of all potential jumps from one value to another in multiples of your step.

Then you can build a graph from that and try to find a path:

example_array = np.array([10, 10.5, 13, 15, 20, 35, 40, 50])

# define expected step
N = 10.5

# compute pairwise differences
d = abs(example_array[:, None] - example_array)

# subtracting mid-step and modulo step
# this enables to have intervals close to a multiple of the step
# as a value close to N/2
d2 = (d-N/2)%N

# identify intervals close to a multiple of the step
# but greater than 0
# using a relative tolerance of 15%
# and absolute tolerance of 0.5 for values close to each other
# tweak this as desired
m = np.where(np.isclose(d, 0, atol=0.5),
             False, np.isclose(d2, N//2, rtol=0.15)
            )

# building the graph
import networkx as nx
G = nx.from_numpy_array(np.triu(m), create_using=nx.DiGraph)
G.remove_edges_from(nx.selfloop_edges(G))

# getting longest path
nx.dag_longest_path(G)

Output: [0, 4, 6, 7]

The graph:

enter image description here