# Source code for commonpower.data_forecasting.nn_forecasting.nn_forecasting

"""
NNForecaster class.
"""
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING

import torch
from numpy import concatenate, ndarray
from torch.utils.data import DataLoader

from commonpower.data_forecasting.base import DataSource, Forecaster
from commonpower.data_forecasting.nn_forecasting.data_splitting import (
    DatasetSplit,
    DataSplitType,
    SimpleFractionalSplit,
)
from commonpower.data_forecasting.nn_forecasting.dataset_wrappers import DatasetWrapper, NStepAhead
from commonpower.data_forecasting.nn_forecasting.models import NNModule
from commonpower.data_forecasting.nn_forecasting.transform import IdentityTransform, Transformation

if TYPE_CHECKING:
    from commonpower.data_forecasting.nn_forecasting.config import ParameterSpace


class NNForecaster(Forecaster):
    def __init__(
        self,
        model_class: NNModule.__class__,
        targets: list[str],
        frequency: timedelta = timedelta(hours=1),
        horizon: timedelta = timedelta(hours=12),
        # NOTE(review): these default instances are created once, when the function is
        # defined, and are therefore shared by every forecaster that does not pass its
        # own transformation. setup() calls fit() on them, so sharing is only harmless
        # if IdentityTransform.fit() keeps no state -- TODO confirm.
        feature_transform: Transformation = IdentityTransform(),
        target_transform: Transformation = IdentityTransform(),
    ):
        """
        Neural-Network-based Forecaster.

        All features of the data source (including targets) will be used as model inputs.
        We make the assumption that all features besides the targets are static in the sense
        that they are available across the entire forecast horizon (e.g. time features).
        This is necessary to apply the model iteratively.
        If this assumption cannot reasonably be made in practice, the model output must cover
        the entire horizon in one step.
        When the forecaster is deployed, we assume that the targets are the first "columns"
        of the data source.

        Args:
            model_class (NNModule.__class__): Model class.
            targets (list[str]): Target variables.
            frequency (timedelta, optional): Frequency of the data. Defaults to timedelta(hours=1).
            horizon (timedelta, optional): Forecast horizon. Defaults to timedelta(hours=12).
            feature_transform (Transformation, optional): Feature transformation.
                Defaults to IdentityTransform().
            target_transform (Transformation, optional): Target transformation.
                Defaults to IdentityTransform().
        """
        self.frequency = frequency
        self.horizon = horizon

        self.model_class = model_class
        self.targets = targets

        self.feature_transform = feature_transform
        self.target_transform = target_transform

        # These three are populated by setup() or with_model().
        self.model: NNModule | None = None
        self.model_output_steps: int | None = None  # time steps produced by one model call
        self.iteration_steps: int | None = None  # model calls needed to cover the horizon

    @property
    def look_back(self) -> timedelta:
        """
        Time span of past observations the model consumes.

        The model input window includes the current time step, hence the "- 1".
        Requires the model to be set (via setup() or with_model()).
        """
        return (self.model.input_shape[0] - 1) * self.frequency

    @property
    def input_range(self) -> tuple[timedelta, timedelta]:
        """
        Returns the min and max timedelta of observations which are required for the prediction.
        To indicate a timestamp before the current time, the timedelta must be negative.

        Returns:
            tuple[timedelta, timedelta]: (td before, td after)
        """
        return (-self.look_back, self.horizon)

    def with_model(self, model: NNModule) -> NNForecaster:
        """
        This can be called to pass an already trained model to the forecaster.
        We expect the transformations passed in the constructor to be already fitted.

        Args:
            model (NNModule): Forecast model.

        Returns:
            NNForecaster: This forecaster (to allow chaining).
        """
        self.model = model
        self.model_output_steps = self.model.output_shape[0]
        self.iteration_steps = (self.horizon // self.frequency) // self.model_output_steps
        return self

    def setup(
        self,
        data_source: DataSource,
        param_space: ParameterSpace,
    ) -> NNForecaster:
        """
        Setup the forecaster for training.
        This is usually called from the NNTrainer.
        This means anything passed to the setup method can be tuned.
        Here, we instantiate the model, check some model dimensions and fit the transformations.

        Args:
            data_source (DataSource): Data source.
            param_space (ParameterSpace): Parameter space for the forecaster.

        Returns:
            NNForecaster: The setup forecaster.

        Raises:
            AssertionError: If the model input/output shapes do not match the horizon,
                the number of targets, the number of features or the input range.
        """
        self.param_space = param_space
        self.model: NNModule = self.model_class(**param_space.model)

        variables = data_source.get_variables()
        # default features are all variables (including targets)
        self.features = variables

        horizon_steps = self.horizon // self.frequency

        assert horizon_steps % self.model.output_shape[0] == 0, "Model output shape does not match the horizon."
        assert self.model.output_shape[1] == len(
            self.targets
        ), "Model output shape does not match the number of target dimensions."
        assert self.model.input_shape[1] == len(
            self.features
        ), "Model input shape does not match the number of features."
        # With the look_back defined on this class the check below holds by construction;
        # it only becomes meaningful if look_back / input_range are overridden.
        assert (
            self.model.input_shape[0] == (-self.input_range[0]) // self.frequency + 1
        ), "Model input shape does not match the forecaster input range."

        # If the model output is lower than the number of prediction steps, we apply the model iteratively
        self.model_output_steps = self.model.output_shape[0]
        self.iteration_steps = horizon_steps // self.model_output_steps

        # Fit the transformations on the complete date range of the data source
        complete_data = data_source(*data_source.get_date_range())
        feature_idxs = [i for i, var in enumerate(variables) if var in self.features]
        self.target_idxs = [i for i, var in enumerate(variables) if var in self.targets]

        self.feature_transform.fit(complete_data[:, feature_idxs])
        self.target_transform.fit(complete_data[:, self.target_idxs])

        return self

    def _predict_window(self, window: torch.Tensor) -> ndarray:
        """Run the model on one input window and return the inverse-transformed prediction."""
        # No gradients are needed for inference; the result is converted to numpy anyway.
        with torch.no_grad():
            # The reshape is necessary because the model expects a batch dimension
            raw = self.model(window.reshape(1, *window.shape))
        # One row per predicted step, one column per target
        # (this used to be hard-coded to a single target column).
        raw = raw.reshape(self.model_output_steps, len(self.targets))
        return self.target_transform.inverse(raw.detach().cpu().numpy())

    def __call__(self, data: ndarray) -> ndarray:
        """
        Make a prediction.
        If the model prediction horizon (steps ahead) is less than the forecast horizon,
        we iteratively apply the model to make predictions covering the entire horizon.
        For this to work, the target variables must be the first columns of the input data.

        Args:
            data (ndarray): Input data. Expected shape: (N, n_features), where N must cover
                the model input window plus (iteration_steps - 1) * model_output_steps rows.

        Returns:
            ndarray: Forecasted values.
                Shape: (iteration_steps * model_output_steps, n_targets).

        Raises:
            AssertionError: If no model has been set.
        """
        assert self.model is not None, "Model is not set. Call setup() or with_model() first."

        window_len = self.model.input_shape[0]
        step = self.model_output_steps

        # Apply transformations and convert to a torch tensor.
        # torch.tensor() copies, so the in-place writes below never touch the caller's array.
        series: torch.Tensor = torch.tensor(self.feature_transform(data)).float()

        # Initial data is [-look_back, 0]
        latest = self._predict_window(series[:window_len, :])
        chunks = [latest]

        # For each further iteration step, we
        #   apply the target transformation to the previous prediction,
        #   step the window forward by the model output length,
        #   replace the targets at the end of the window with the prediction,
        #   make a new prediction (inverse-transformed).
        for t in range(1, self.iteration_steps):
            fed_back = torch.tensor(self.target_transform(latest)).float()
            # This slice is a *view* of `series`; writing into it is intentional so that
            # later, overlapping windows also see the fed-back predictions.
            window = series[t * step : window_len + t * step, :]
            # we assume target variables are the first columns
            window[-step:, : fed_back.shape[1]] = fed_back
            latest = self._predict_window(window)
            chunks.append(latest)

        return concatenate(chunks, axis=0)

    def get_train_val_loaders(
        self,
        data_source: DataSource,
        param_space: ParameterSpace,
        dataset_wrapper_class: DatasetWrapper.__class__ = NStepAhead,
        dataset_split_class: DatasetSplit.__class__ = SimpleFractionalSplit,
    ) -> tuple[DataLoader, DataLoader]:
        """
        Return training and validation data loaders.
        Both datasets are built identically except for the split type.
        This reads self.features, which is set by setup().

        Args:
            data_source (DataSource): Data source.
            param_space (ParameterSpace): Parameter space for the forecaster.
            dataset_wrapper_class (DatasetWrapper.__class__, optional): Dataset wrapper class.
                Defaults to NStepAhead.
            dataset_split_class (DatasetSplit.__class__, optional): Dataset split class.
                Defaults to SimpleFractionalSplit.

        Returns:
            tuple[DataLoader, DataLoader]: Training and validation data loaders.
        """

        def build_dataset(split_type: DataSplitType) -> DatasetWrapper:
            split = dataset_split_class(
                split_type,
                data_source,
                self.model,
                **param_space.dataset_split,
            )
            return dataset_wrapper_class(
                data_source,
                split,
                self.model,
                self.targets,
                self.features,
                self.feature_transform,
                self.target_transform,
                **param_space.dataset_wrapper,
            )

        train_dataset = build_dataset(DataSplitType.TRAIN)
        val_dataset = build_dataset(DataSplitType.VAL)

        train_loader = DataLoader(train_dataset, **param_space.data_loader)
        val_loader = DataLoader(val_dataset, **param_space.data_loader)

        return train_loader, val_loader