I have translated the algorithm of the Generalized Goertzel technique in Python from the Matlab code that can be found here.
I have trouble using it on real data and only on real data: generating a testing "synthetic" signal with 5 sin components the Goertzel returns correct frequencies, obviously with better accuracy than FFT; both the techniques are aligned.
However, if I provide market data on N samples the FFT gives the lowest frequency f = 1/N as expected; Goertzel returns all frequencies higher than 1. The time frame of the data is 1 hour, but the data is unlabeled with timestamps, it could be also seconds, so my expectation is that the two ways of calculating the frequency transform should return, apart from different accuracies, the same harmonics on the frequency domain.
Why am I getting the lowest frequency in one method but greater than 1 using another method with real data?
import numpy as np
def goertzel_general_shortened(x, indvec, maxes_tollerance = 100):
# Check input arguments
if len(indvec) < 1:
raise ValueError('Not enough input arguments')
if not isinstance(x, np.ndarray) or x.size == 0:
raise ValueError('X must be a nonempty numpy array')
if not isinstance(indvec, np.ndarray) or indvec.size == 0:
raise ValueError('INDVEC must be a nonempty numpy array')
if np.iscomplex(indvec).any():
raise ValueError('INDVEC must contain real numbers')
lx = len(x)
x = x.reshape(lx, 1) # forcing x to be a column vector
# Initialization
no_freq = len(indvec)
y = np.zeros((no_freq,), dtype=complex)
# Computation via second-order system
for cnt_freq in range(no_freq):
# Precompute constants
pik_term = 2 * np.pi * indvec[cnt_freq] / lx
cos_pik_term2 = 2 * np.cos(pik_term)
cc = np.exp(-1j * pik_term) # complex constant
# State variables
s0 = 0
s1 = 0
s2 = 0
# Main loop
for ind in range(lx - 1):
s0 = x[ind] + cos_pik_term2 * s1 - s2
s2 = s1
s1 = s0
# Final computations
s0 = x[lx - 1] + cos_pik_term2 * s1 - s2
y[cnt_freq] = s0 - s1 * cc
# Complex multiplication substituting the last iteration
# and correcting the phase for potentially non-integer valued
# frequencies at the same time
y[cnt_freq] = y[cnt_freq] * np.exp(-1j * pik_term * (lx - 1))
return y
Here are the charts of the FFT and Goertzel transform for the synthetic testing 5 components signal
here the Goertzel one
the original frequencies were
signal_frequencies = [30.5, 47.4, 80.8, 120.7, 133]
Instead, if I try to download market data
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
and try to transform the data['Close'] of SPY, this is what I get with the FFT transform with N = 800 samples
and the rebuilt signal on the first 2 components (not so good)
and this is what I get with the Goertzel transform
Note that the first peaks on FFT are below 0.005, for Goertzel above 1.
This is the way in which I tested the FFT on SPY data
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots
def analyze_and_plot(data, num_samples, start_date, num_harmonics):
num_harmonics = num_harmonics *1
# Seleziona i dati nell'intervallo specificato
original_data = data
data = data[data.index >= start_date]
# Estrai i campioni desiderati
data = data.head(num_samples)
# Calcola la FFT dei valori "Close"
fft_result = np.fft.fft(data["Close"].values)
frequency_range = np.fft.fftfreq(len(fft_result))
print("Frequencies: ")
print(frequency_range)
print("N Frequencies: ")
print(len(frequency_range))
print("First frequencies magnitude: ")
print(np.abs(fft_result[0:num_harmonics]))
# Trova le armoniche dominanti
# top_harmonics = np.argsort(np.abs(fft_result))[::-1][:num_harmonics]
top_harmonics = np.argsort(np.abs(fft_result[0:400]))[::-1][1:(num_harmonics + 1)] # skip first one
print("Top harmonics: ")
print(top_harmonics)
# top_harmonics = [1, 4]#, 8, 5, 9]
# Creazione del grafico per lo spettro
spectrum_trace = go.Scatter(x=frequency_range, y=np.abs(fft_result), mode='lines', name='FFT Spectrum')
fig_spectrum = go.Figure(spectrum_trace)
fig_spectrum.update_layout(title="FFT Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))
# Calcola la ricostruzione basata sulle prime N armoniche
reconstructed_signal = np.zeros(len(data))
time = np.linspace(0, num_samples, num_samples, endpoint=False)
# print('time')
# print(time)
for harmonic_index in top_harmonics[:num_harmonics]:
amplitude = np.abs(fft_result[harmonic_index]) #.real
phase = np.angle(fft_result[harmonic_index])
frequency = frequency_range[harmonic_index]
reconstructed_signal += amplitude * np.cos(2 * np.pi * frequency * time + phase)
# print('first reconstructed_signal len')
# print(len(reconstructed_signal))
zeros = np.zeros(len(original_data) - 2*len(data))
reconstructed_signal = np.concatenate((reconstructed_signal, reconstructed_signal), axis = 0)
# print('second reconstructed_signal len')
# print(len(reconstructed_signal))
reconstructed_signal = np.concatenate((reconstructed_signal, zeros), axis = 0)
original_data['reconstructed_signal'] = reconstructed_signal
# print('reconstructed_signal len')
# print(len(reconstructed_signal))
# print('original_data len')
# print(len(original_data))
# print('reconstructed_signal[300:320]')
# print(reconstructed_signal[290:320])
# print('original_data[300:320]')
# print(original_data[290:320][['Close', 'reconstructed_signal']])
# reconstructed_signal = np.fft.ifft(fft_result[top_harmonics[:num_harmonics]])
# print('reconstructed_signal')
# print(reconstructed_signal)
# Converte i valori complessi in valori reali per la ricostruzione
# reconstructed_signal_real = reconstructed_signal.real
# Creazione del secondo grafico con due subplot
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))
# Aggiungi il grafico di "Close" originale al primo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)
# Aggiungi il grafico della ricostruzione al secondo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)
# Aggiorna il layout del secondo grafico
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_yaxes(title_text="Value", row=2, col=1)
# Aggiorna il layout generale
fig.update_layout(title="Close Analysis and Reconstruction")
# Visualizza il grafico dello spettro
fig_spectrum.show()
# fig.update_layout(xaxis = dict(type="category"))
# Aggiorna il layout dell'asse X per includere tutti i dati
# fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)
fig.update_xaxes(type="category", row=1, col=1)
fig.update_xaxes(type="category", row=2, col=1)
# Visualizza il secondo grafico con i subplot
fig.show()
# Esempio di utilizzo
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)
as well as the test of SPY data on Goertzel
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots
def analyze_and_plot(data, num_samples, start_date, num_harmonics):
# Seleziona i dati nell'intervallo specificato
original_data = data
data = data[data.index >= start_date]
# Estrai i campioni desiderati
data = data.head(num_samples)
# Frequenze desiderate
frequency_range = np.arange(0, 20, 0.001)
# Calcola lo spettro delle frequenze utilizzando la funzione Goertzel
transform = goertzel_general_shortened(data['Close'].values, frequency_range)
harmonics_amplitudes = np.abs(transform)
frequency_range = frequency_range
# Creazione del grafico per lo spettro
spectrum_trace = go.Scatter(x=frequency_range, y=harmonics_amplitudes, mode='lines', name='FFT Spectrum')
fig_spectrum = go.Figure(spectrum_trace)
fig_spectrum.update_layout(title="Frequency Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))
# Visualizza il grafico dello spettro
fig_spectrum.show()
peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
peak_frequencies = frequency_range[peaks_indexes]
peak_amplitudes = harmonics_amplitudes[peaks_indexes]
print('peaks_indexes')
print(peaks_indexes[0:30])
print('peak_frequencies')
print(peak_frequencies[0:30])
print('peak_amplitudes')
print(peak_amplitudes[0:30])
lower_freq_sort_peak_indexes = np.sort(peaks_indexes)[0:num_harmonics] # lower indexes <--> lower frequencies
higher_amplitudes_sort_peak_indexes = peaks_indexes[np.argsort(harmonics_amplitudes[peaks_indexes])[::-1]][0:num_harmonics]
print('higher_amplitudes_sort_peak_indexes')
print(higher_amplitudes_sort_peak_indexes[0:10])
# used_indexes = lower_freq_sort_peak_indexes
used_indexes = higher_amplitudes_sort_peak_indexes
# Creazione del segnale ricostruito utilizzando i picchi
time = np.linspace(0, num_samples, num_samples, endpoint=False)
reconstructed_signal = np.zeros(len(time), dtype=float)
print('num_samples')
print(num_samples)
print('time[0:20]')
print(time[0:20])
print('reconstructed_signal')
print(reconstructed_signal[0:10])
for index in used_indexes:
phase = np.angle(transform[index])
amplitude = np.abs(transform[index])
frequency = frequency_range[index]
print('phase')
print(phase)
print('amplitude')
print(amplitude)
print('frequency')
print(frequency)
reconstructed_signal += amplitude * np.sin(2 * np.pi * frequency * time + phase)
# Estrai la parte reale del segnale ricostruito
reconstructed_signal_real = reconstructed_signal
print('reconstructed_signal[1]')
print(reconstructed_signal[1])
print('reconstructed_signal.shape')
print(reconstructed_signal.shape)
zeros = np.zeros(len(original_data) - 2*num_samples)
reconstructed_signal_real = np.concatenate((reconstructed_signal_real, reconstructed_signal_real), axis = 0)
print('reconstructed_signal_real.shape')
print(reconstructed_signal_real.shape)
reconstructed_signal_real = np.concatenate((reconstructed_signal_real, zeros), axis = 0)
print('reconstructed_signal_real.shape')
print(reconstructed_signal_real.shape)
original_data['reconstructed_signal'] = reconstructed_signal_real
# Creazione del secondo grafico con due subplot
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))
# Aggiungi il grafico di "Close" originale al primo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)
# Aggiungi il grafico della ricostruzione al secondo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)
# Aggiorna il layout del secondo grafico
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_yaxes(title_text="Value", row=2, col=1)
# Aggiorna il layout generale
fig.update_layout(title="Close Analysis and Reconstruction")
# fig.update_layout(xaxis = dict(type="category"))
# Aggiorna il layout dell'asse X per includere tutti i dati
# fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)
fig.update_xaxes(type="category", row=1, col=1)
fig.update_xaxes(type="category", row=2, col=1)
# Visualizza il secondo grafico con i subplot
fig.show()
# Esempio di utilizzo
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)
Edit 30/09/2023
I tried to normalize the SPY data as suggested in the answers, but the problem is still there, here is the resulting chart
I finally found the point. The Goertzel transform basically counts how many times each harmonic stays inside the sampling period returning, in addition, its amplitude and phase. To get the frequency it is necessary to divide by the number of samples, something that probably is implicitly done by the standard FFT libraries. The example with 5 "synthetic" sin waves turned out to be similar between FTT and Goertzel because was sampled in 1 second, so, for example, a 50Hz harmonic had exactly 50 sin waves in one full circle angle that madethe Goertzel transform resulting to be correct too.
The correction is in the following added second line of code