Forecasting with WindowSummarizer and exogenous features


The WindowSummarizer captures time series characteristics within a specified rolling window. I tried to modify an example I found in the documentation, but this functionality does not seem to work with forecasters that actually use the exogenous features.

Here's a minimal working example based on the documentation:

import pandas as pd
from sktime.forecasting.base import ForecastingHorizon
from sktime.transformations.series.impute import Imputer
from sktime.datasets import load_airline, load_longley
from sktime.forecasting.arima import AutoARIMA
from sktime.forecasting.naive import NaiveForecaster
from sktime.forecasting.model_selection import temporal_train_test_split
from sktime.forecasting.compose import ForecastingPipeline
from sktime.transformations.series.window_summarizer import WindowSummarizer
y, X = load_longley()
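y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)
# absolute forecasting horizon covering the test period
fh = ForecastingHorizon(y_test.index, is_relative=False)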

kwargs = {
    "lag_config": {
        "mean": ["mean", [[3, 0], [4, 0]]],
    }
}
Z_train = pd.concat([X_train, y_train], axis=1)
Z_test = pd.concat([X_test, y_test], axis=1)
pipe = ForecastingPipeline(
    steps=[
        ("ws", WindowSummarizer(**kwargs, n_jobs=1, target_cols=["GNP"])),
        ("imputer", Imputer("mean")),
        ("forecaster", NaiveForecaster(strategy="drift")),
    ]
)
pipe_return = pipe.fit(y_train, Z_train)
y_pred = pipe_return.predict(fh=fh, X=Z_test) # this works

If we change the forecaster to one that uses the engineered features, it no longer works:

pipe = ForecastingPipeline(
    steps=[
        ("ws", WindowSummarizer(**kwargs, n_jobs=1, target_cols=["GNP"])),
        ("imputer", Imputer("mean")),
        ("forecaster", AutoARIMA()),
    ]
)
pipe.fit(y_train, X=Z_train)
pipe.predict(fh=fh, X=Z_test)  # this throws an error

My suspicion was that this is caused by the lack of continuity between Z_train and Z_test: the windows for Z_test seem to be computed without the training rows that precede them. The second issue is the Imputer. I don't think it works the way it should: after fitting, it should retain the values used to fill empty fields.

ws = pipe.steps_[0][1]
imp = pipe.steps_[1][1]
imp._transform(ws._transform(Z_test)) 

gives

       GNP_mean_3_0  GNP_mean_4_0  GNPDEFL   UNEMP   ARMED       POP   TOTEMP
1959  501159.333333           NaN    112.6  3813.0  2552.0  123366.0  68655.0
1960  501159.333333           NaN    114.2  3931.0  2514.0  125368.0  69564.0
1961  501159.333333           NaN    115.7  4806.0  2572.0  127852.0  69331.0
1962  501159.333333           NaN    116.9  4007.0  2827.0  130081.0  70551.0
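
A small pandas sketch may help to see where these numbers could come from. It assumes that a [3, 0] window behaves like a three-period mean over the rows strictly before the current one (an assumption inferred from the output above, not taken from the sktime source), that the imputer fills gaps with the mean of whatever column it is handed, and it uses only the four GNP values of the Longley data that fall into the test period.

```python
import pandas as pd

# GNP values for the test years (assumed here to match the Longley data)
gnp_test = pd.Series(
    [482704.0, 502601.0, 518173.0, 554894.0],
    index=[1959, 1960, 1961, 1962],
    name="GNP",
)

# Rolling means over the rows strictly before the current one.
# Only the test rows are visible, so most windows are incomplete.
mean_3 = gnp_test.shift(1).rolling(3).mean()
mean_4 = gnp_test.shift(1).rolling(4).mean()

# Mean imputation computed on the transformed test data itself
filled_3 = mean_3.fillna(mean_3.mean())
filled_4 = mean_4.fillna(mean_4.mean())

print(pd.DataFrame({"GNP_mean_3_0": filled_3, "GNP_mean_4_0": filled_4}))
```

Under these assumptions only the 1962 row has a complete three-period window, and its value is copied into the other rows by the mean fill. The four-period column has no complete window at all, so there is nothing to impute from and it stays NaN, which matches the output above.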

There is 1 solution below


NEW

Library versions .10 and newer have changed the behavior of WindowSummarizer, so the example from the question should now work without issues.

OLD

I think I have a workaround. It is not the most elegant solution, but it gets the job done. I've modified the WindowSummarizer so that it either saves the minimal X window required to calculate all aggregations or saves all seen records of X (the default option).

Whenever .transform is called, the summarizer updates the stored window and recalculates the aggregations (correctly!). For simplicity, I'm focusing here only on the summarizer and a simpler dataset.

def update_X(self,X):
    if self.target_cols is None:
        cols = X.columns
    else:
        cols = self.target_cols
    X_window = self.X_window
    X_window = pd.concat([X_window,X[cols]],axis=0)
    X_window = X_window.groupby(X_window.index).first()
    # would remember only last #min_window rows
    # self.X_window = X_window.iloc[-self.min_window:]
    # would remember all rows
    self.X_window = X_window

def window_size(windows):
    # windows is either a single [a, b] pair or a list of such pairs
    if all(isinstance(i, list) for i in windows):
        # several windows: the largest one determines how much history is needed
        return max(map(sum, windows))
    return sum(windows)
        
class WS(WindowSummarizer):
    def __init__(
        self,
        lag_config,
        n_jobs=-1,
        target_cols=None,
        truncate=None,
        ):

        self.lag_config = lag_config
        self.n_jobs = n_jobs
        self.target_cols = target_cols
        self.truncate = truncate
        self._converter_store_X = dict()
        
        # calculates the minimal window required to calculate the window summaries in lag_config
        self.min_window = max([window_size(x[1]) for key,x in lag_config.items()])
        # empty data frame for data window
        self.X_window = pd.DataFrame()
        
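        # run the parent constructor with the same parameters
        super().__init__(
            lag_config=lag_config,
            n_jobs=n_jobs,
            target_cols=target_cols,
            truncate=truncate,
        )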
        
    def _fit(self, X, y=None):
        update_X(self,X)
        return super()._fit(X, y)
        
    def _transform(self, X, y=None):
        X_window = pd.concat([self.X_window,X],axis=0)
        X_window = X_window.groupby(X_window.index).first()
        X_transformed = super()._transform(X_window, y)
        update_X(self,X)
        return X_transformed.loc[X.index]
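
As a quick check of the min_window arithmetic used above, here is a minimal standalone sketch. It repeats the window_size helper in a compact form and applies it to the lag_config from the small test; nothing in it depends on sktime.

```python
def window_size(windows):
    """Rows of history implied by one [a, b] pair or a list of such pairs."""
    if all(isinstance(w, list) for w in windows):
        return max(sum(w) for w in windows)
    return sum(windows)


lag_config = {
    "mean": ["mean", [[3, 1], [4, 1]]],
}

# Each entry is [function_name, windows]; take the largest requirement.
min_window = max(window_size(spec[1]) for spec in lag_config.values())
print(min_window)  # 5, because [4, 1] sums to 5 and [3, 1] only to 4
```

With this config, keeping only the last five rows of X would be enough if the commented-out truncation in update_X were enabled.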

        

Here's a small test:

y = load_airline()
y_train, y_test = temporal_train_test_split(y.iloc[:10])
fh = ForecastingHorizon(y_test.index, is_relative=False)

kwargs = {
    "lag_config": {
        "mean": ["mean", [[3, 1], [4, 1]]],
    }
}

ws = WS(**kwargs, n_jobs=1)
ws.fit(pd.DataFrame(y_train),y_train)
ws.transform(pd.DataFrame(y_test))

         Number of airline passengers_mean_3_1  Number of airline passengers_mean_4_1
1949-08                             128.333333                                 129.25
1949-09                             134.666667                                 133.25
1949-10                             143.666667                                 138.00
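
The values above can be cross-checked with plain pandas. The sketch below is not the sktime implementation. It assumes that the first ten airline observations are the ones hard-coded in the list, and that a [3, 1] window corresponds to a three-period mean ending two rows before the current one (an assumption inferred from the printed numbers). The summarize helper and its column names are made up for this illustration.

```python
import pandas as pd

# First ten monthly airline passenger counts (assumed values, 1949-01 to 1949-10)
values = [112, 118, 132, 129, 121, 135, 148, 148, 136, 119]
index = pd.period_range("1949-01", periods=10, freq="M")
y = pd.Series(values, index=index, dtype=float)
y_train, y_test = y.iloc[:7], y.iloc[7:]


def summarize(series):
    # Hypothetical helper: means over windows that end two rows back
    lagged = series.shift(2)
    return pd.DataFrame(
        {
            "mean_3_1": lagged.rolling(3).mean(),
            "mean_4_1": lagged.rolling(4).mean(),
        }
    )


# Without any stored history the test rows cannot fill a single window.
test_only = summarize(y_test)

# With the training rows prepended, every test row has a complete window.
with_history = summarize(pd.concat([y_train, y_test])).loc[y_test.index]
print(with_history)
```

Here test_only contains only NaN, while with_history gives the same three rows as the WS output, which is the effect the stored X_window is meant to achieve.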