I am building a Kalman filter and smoother into an application and would like to use numba @njit with parallelization for high performance. Specifically, I have a panel of many time series (many "individuals") that can be filtered and smoothed independently, hence the desire for parallelization with prange. I am having an error when numba attempts to compile my code. Any help would be appreciated.
import numpy as np
from numba import prange, jit, njit
# Note on notation here: 't' means conditional on time t.
# 'tl1' means conditional on time t-1
# or time lagged one
@njit(parallel=False) # parallel=False for now
def kalman_filter(Y,Q,R,B,F,z0,sig110,sig120):
"""
Estimate Kalman filter and smoother given model parameter estimates. Uses numba and
parallelization for high performance.
Arguments
----------
Y: np.array
Array of measurements y_{i,t}. Dimension shall be TxN where
T is the number of time periods and N is the number of individuals.
Q: np.array
Array of volatilities of the state equation. Dimension shall
be TxN where T is the number of time periods and N is the
number of individuals.
R: np.array
Array of volatilities of the measurement equation. Dimension shall
be TxN where T is the number of time periods and N is the
number of individuals.
B: np.array
Array of drifts of the state equation. Dimension shall
be TxN where T is the number of time periods and N is the
number of individuals.
F: np.array
Array of persistence in the state equation. Dimension shall
be TxN where T is the number of time periods and N is the
number of individuals.
z0: np.array
Array of initial zeta by individual. Dimension shall be 2xN.
sig110: np.array
Array of initial MSE_{i,t}[0,0]. Dimensions TxN.
sig120: np.array
Array of initial MSE_{i,t}[0,1]. Dimensions TxN.
Returns
--------
zeta_t: np.array
Matrix of latent variable [z_{i,t}|t, z_{i,t-1}|t] estimated conditional on information at time t.
Dimension shall be TxNx2 where T is the number of time periods and N is the number of individuals.
zeta_tl1: np.array
Matrix of latent variable [z_{i,t}|t-1, z_{i,t-1}|t-1] estimated conditional on information at time t-1.
Dimension shall be TxNx2 where T is the number of time periods and N is the number of individuals.
mse_t: np.array
Matrix MSE_t conditional on time t. Dimension TxNx2x2
mse_tl1: np.array
Matrix MSE_t conditional on time t-1. Dimension TxNx2x2
zeta_smooth: np.array
Array [z_{i,t}|T, z_{i,t-1}|T]. Dimensions TxNx2.
mse_smooth: np.array
MSE_t conditional on time T for all i,t. Dimensions TxNx2x2.
"""
# infer dimensions
T,N = Y.shape
# iniitalize matrices
zeta_t = np.zeros((T,N,2))
zeta_tl1 = np.zeros((T,N,2))
mse_t = np.zeros((T,N,2,2))
mse_tl1 = np.zeros((T,N,2,2))
zeta_smooth = np.zeros((T,N,2))
mse_smooth = np.zeros((T,N,2,2))
# set H
H = np.array([[1.0,0.0]])
# loop through individuals
for i in prange(N):
#zeta_t[:,i,:], zeta_tl1[:,i,:], mse_t[:,i,:,:], mse_tl1[:,i,:,:], zeta_smooth[:,i,:], mse_smooth[:,i,:,:] = kalman_filter_individual(
# Y[:,i],Q[:,i],R[:,i],B[:,i],F[:,i],z0[:,i].T,sig110[i],sig120[i],smooth)
# initialize individual matrices
imse_t = np.zeros((T,2,2)) # MSE_{i,t}|t
imse_tl1 = np.zeros((T,2,2)) # MSE_{i,t}|t-1
izeta_t = np.zeros((T,2)) # zeta_{i,t}|t
izeta_tl1 = np.zeros((T,2)) # zeta_{i,t}|t-1
# initialize
imse_t[0,:,:] = np.array([[sig110[i],sig120[i]],[sig120[i],0]])
imse_tl1[0,:,:] = np.array([[sig110[i],sig120[i]],[sig120[i],0]])
izeta_t[0,0] = z0[0,i]
izeta_tl1[1,0] = z0[0,i]
# loop through time
for t in range(T):
Ft = np.array([[F[i,t],0],[1,0]])
gain = imse_tl1[t,:,:] @ H.T / (imse_tl1[t,0,0] + R[i,t])
izeta_t[t,:] = izeta_tl1[t,:] + gain.flatten() * (Y[i,t] - H @ izeta_tl1[t,:])
if t<T-1:
izeta_tl1[t+1,:] = ( Ft @ izeta_t[t,:] + np.array([[B[i,t],0]]) ).flatten()
imse_t[t,:,:] = imse_tl1[t,:,:] - gain @ H @ imse_tl1[t,:,:]
if t<T-1:
imse_tl1[t+1,:,:] = Ft @ imse_t[t,:,:] @ Ft.T + np.array([[Q[i,t],0],[0,0]])
# Kalman Smoother
# initialize indivdual output
imse_smooth = np.zeros((T,2,2))
izeta_smooth = np.zeros((T,2))
# calculate smoothed sequences
izeta_smooth[T-1,:] = izeta_t[T-1,:]
imse_smooth[T-1,:] = imse_t[T-1,:]
for tt in range(T-1):
t = (T-1) - tt # reverse range
Ft = np.array([[F[i,t],0],[1,0]])
J = imse_t[t,:] @ Ft.T @ np.linalg.inv(imse_tl1[t,:])
izeta_smooth[t,:] = izeta_t[t,:] + J @ (izeta_smooth[t+1,:] - izeta_tl1[t+1,:])
imse_smooth[t,:] = imse_t[t,:] + J @ (imse_smooth[t+1,:] - imse_tl1[t+1,:]) @ J.T
# put individual arrays into main arrays
zeta_t[:,i,:] = izeta_t
zeta_tl1[:,i,:] = izeta_tl1
mse_t[:,i,:,:] = imse_t
mse_tl1[:,i,:,:] = imse_tl1
zeta_smooth[:,i,:] = izeta_smooth
mse_smooth[:,i,:,:] = imse_smooth
return zeta_t, zeta_tl1, mse_t, mse_tl1, zeta_smooth, mse_smooth
Here is the error I get when numba attempts to compile:
C:\Users\damon\Documents\DamonDocuments\CareerEducation\lira\moat\generalized_kalman\pygenkalman\kalman_filter_numba.py:93: NumbaPerformanceWarning: '@' is faster on contiguous arrays, called on (array(float64, 2d, A), array(float64, 2d, F))
gain = imse_tl1[t,:,:] @ H.T / (imse_tl1[t,0,0] + R[i,t])
C:\Users\damon\Documents\DamonDocuments\CareerEducation\lira\moat\generalized_kalman\pygenkalman\kalman_filter_numba.py:97: NumbaPerformanceWarning: '@' is faster on contiguous arrays, called on (array(float64, 2d, C), array(float64, 2d, A))
imse_t[t,:,:] = imse_tl1[t,:,:] - gain @ H @ imse_tl1[t,:,:]
C:\Users\damon\Documents\DamonDocuments\CareerEducation\lira\moat\generalized_kalman\pygenkalman\kalman_filter_numba.py:99: NumbaPerformanceWarning: '@' is faster on contiguous arrays, called on (array(float64, 2d, C), array(float64, 2d, A))
imse_tl1[t+1,:,:] = Ft @ imse_t[t,:,:] @ Ft.T + np.array([[Q[i,t],0],[0,0]])
Traceback (most recent call last):
File "C:\Users\damon\anaconda3\lib\site-packages\spyder_kernels\py3compat.py", line 356, in compat_exec
exec(code, globals, locals)
File "c:\users\damon\documents\damondocuments\careereducation\lira\moat\generalized_kalman\pygenkalman\test.py", line 265, in <module>
test_results = run_tests(test_list,parallel=False,verbose=10)
File "c:\users\damon\documents\damondocuments\careereducation\lira\moat\generalized_kalman\pygenkalman\test.py", line 233, in run_tests
result = m.estimate_model(battery[test]['fcns'], battery[test]['X'], Y, battery[test]['init'],
File "C:\Users\damon\Documents\DamonDocuments\CareerEducation\lira\moat\generalized_kalman\pygenkalman\genkalman.py", line 90, in estimate_model
zeta_t, zeta_tl1, mse_t, mse_tl1, zeta_smooth, mse_smooth = kfn.kalman_filter(Y,Q,R,B,F,z0,sig110,sig120)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\dispatcher.py", line 487, in _compile_for_args
raise e
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\dispatcher.py", line 420, in _compile_for_args
return_val = self.compile(tuple(argtypes))
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\dispatcher.py", line 965, in compile
cres = self._compiler.compile(args, return_type)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\dispatcher.py", line 125, in compile
status, retval = self._compile_cached(args, return_type)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\dispatcher.py", line 139, in _compile_cached
retval = self._compile_core(args, return_type)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\dispatcher.py", line 152, in _compile_core
cres = compiler.compile_extra(self.targetdescr.typing_context,
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler.py", line 716, in compile_extra
return pipeline.compile_extra(func)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler.py", line 452, in compile_extra
return self._compile_bytecode()
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler.py", line 520, in _compile_bytecode
return self._compile_core()
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler.py", line 499, in _compile_core
raise e
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler.py", line 486, in _compile_core
pm.run(self.state)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler_machinery.py", line 368, in run
raise patched_exception
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler_machinery.py", line 356, in run
self._runPass(idx, pass_inst, state)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler_lock.py", line 35, in _acquire_compile_lock
return func(*args, **kwargs)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler_machinery.py", line 311, in _runPass
mutated |= check(pss.run_pass, internal_state)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\compiler_machinery.py", line 273, in check
mangled = func(compiler_state)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\typed_passes.py", line 394, in run_pass
lower.lower()
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 168, in lower
self.lower_normal_function(self.fndesc)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 222, in lower_normal_function
entry_block_tail = self.lower_function_body()
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 251, in lower_function_body
self.lower_block(block)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 265, in lower_block
self.lower_inst(inst)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 439, in lower_inst
val = self.lower_assign(ty, inst)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 626, in lower_assign
return self.lower_expr(ty, value)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 1322, in lower_expr
castvals = [self.context.cast(self.builder, val, fromty,
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\lowering.py", line 1322, in <listcomp>
castvals = [self.context.cast(self.builder, val, fromty,
File "C:\Users\damon\anaconda3\lib\site-packages\numba\core\base.py", line 702, in cast
return impl(self, builder, fromty, toty, val)
File "C:\Users\damon\anaconda3\lib\site-packages\numba\cpython\listobj.py", line 1137, in list_to_list
assert fromty.dtype == toty.dtype
AssertionError: Failed in nopython mode pipeline (step: native lowering)