Source code for commonpower.data_forecasting.data_sources

"""
Collection of data sources.
"""
from __future__ import annotations

from copy import copy
from datetime import datetime, timedelta
from typing import List, Optional

import numpy as np
import pandas as pd

from commonpower.data_forecasting.base import DataSource


class PandasDataSource(DataSource):
    def __init__(self, data: pd.DataFrame, frequency: timedelta = timedelta(hours=1)):
        """
        Data source backed by an in-memory pandas dataframe.

        Args:
            data (pd.DataFrame): Dataframe holding the data.
                Its index needs to be a datetime index.
            frequency (timedelta, optional): Time step of the data.
                Defaults to timedelta(hours=1).
        """
        self.data = data
        self.frequency = frequency

    def get_date_range(self) -> List[datetime]:
        """
        Returns the first and the last timestamp of the index as python datetimes.
        """
        boundaries = (self.data.index[0], self.data.index[-1])
        return [stamp.to_pydatetime() for stamp in boundaries]

    def get_variables(self) -> List[str]:
        """
        Returns the column labels of the underlying dataframe.
        """
        # NOTE: the labels are handed back as a NumPy array (not a plain list).
        column_labels = self.data.columns
        return column_labels.to_numpy()

    def get_limits(self) -> dict[str, tuple[float, float]]:
        """
        Returns the (minimum, maximum) value of every column, keyed by column label.
        """
        limits = {}
        for name in self.data.columns:
            series = self.data[name]
            limits[name] = (series.min(), series.max())
        return limits

    def apply_to_column(self, column: str, fcn: callable) -> PandasDataSource:
        """
        Transforms one column of the data cell by cell (via pandas apply()).
        The column is overwritten with the transformed values.

        Args:
            column (str): Column to manipulate.
            fcn (callable): Transformation to apply. It is called with a single
                argument, the value of a cell: fcn(x).

        Returns:
            DataSource: self
        """
        transformed = self.data[column].apply(fcn)
        self.data[column] = transformed
        return self

    def shift_time_series(self, shift_by: timedelta) -> DataSource:
        """
        Shifts the time series by a given timedelta.
        The shift is done in a rolling fashion: rows pushed past one end re-enter at the
        other, so the start and end timestamps do not change.
        Can be used to simulate more diverse data.

        Args:
            shift_by (timedelta): Time delta to shift by.
                Positive values shift into the "future", negative into the "past".

        Returns:
            DataSource: self
        """
        # number of whole rows to rotate by (truncated towards zero)
        n_rows = int(shift_by / self.frequency)
        frame = self.data
        rolled_values = np.roll(frame.values, shift=n_rows, axis=0)
        # the values rotate, index and column labels stay where they are
        self.data = pd.DataFrame(rolled_values, index=frame.index, columns=frame.columns)
        return self

    def create_time_features(self, month: bool = True, day: bool = True, hour: bool = True) -> PandasDataSource:
        """
        Adds time features derived from the datetime index as new columns.
        Each feature is encoded cyclically via a sin and a cos transformation.
        The created columns are (if enabled):
        month_sin, month_cos, day_sin, day_cos, hour_sin, hour_cos

        Args:
            month (bool, optional): If True, the month is added as a feature. Defaults to True.
            day (bool, optional): If True, the weekday is added as a feature. Defaults to True.
            hour (bool, optional): If True, the hour is added as a feature. Defaults to True.

        Returns:
            PandasDataSource: self
        """

        def to_circle(value: int, period: int) -> tuple[float, float]:
            """
            Maps a cyclic value (e.g., month, day, hour) onto (sin, cos)
            so that the cyclic information is preserved.
            """
            angle = 2 * np.pi * value / period
            return np.sin(angle), np.cos(angle)

        # (enabled, index attribute, cycle length, sin column, cos column)
        feature_specs = (
            (month, "month", 12, "month_sin", "month_cos"),
            (day, "weekday", 7, "day_sin", "day_cos"),
            (hour, "hour", 24, "hour_sin", "hour_cos"),
        )

        for enabled, index_attr, period, sin_col, cos_col in feature_specs:
            if not enabled:
                continue
            # the index attribute is only looked up for enabled features
            component = getattr(self.data.index, index_attr)
            sin_vals, cos_vals = zip(*component.map(lambda x, p=period: to_circle(x, p)))
            self.data[sin_col] = sin_vals
            self.data[cos_col] = cos_vals

        return self

    def __call__(self, from_time: datetime, to_time: datetime) -> np.ndarray:
        """
        Returns the rows selected by the label slice from_time:to_time as a NumPy array.
        """
        window = self.data.loc[from_time:to_time]
        return window.to_numpy()

    def __len__(self) -> int:
        """
        Returns the number of elements in the dataset.
        """
        return self.data.shape[0]
class CSVDataSource(PandasDataSource):
    def __init__(
        self,
        file_path: str,
        datetime_format: str = "%d.%m.%Y %H:%M",
        rename_dict: Optional[dict] = None,
        auto_drop: bool = False,
        resample: timedelta = timedelta(hours=1),
        aggregation_method: str = "mean",
        aggregation_alignment: str = "future",
        interpolation_method: str = "time",
        **csv_read_kwargs,
    ) -> None:
        """
        DataSource based on .csv data.
        It imports from a .csv file, does some preprocessing and stores it in an internal pandas data frame.

        Args:
            file_path (str): Path to the source .csv file.
            datetime_format (str, optional): Datetime format of the source .csv file.
                Specifically, this refers to the (required) column "t".
                Defaults to "%d.%m.%Y %H:%M".
            rename_dict (dict, optional): Dict to specify column renaming.
                Format: {"original name": "new name", ...}.
                Defaults to None, which is treated as an empty dict (no renaming).
            auto_drop (bool, optional): If set to True, all columns of the source data except
                those mentioned in rename_dict will be dropped. The time column "t" is always kept.
                Defaults to False.
            resample (timedelta, optional): Resamples the source data to this value.
                If the time interval of the source data is larger than the resample value,
                the missing samples are filled via interpolation_method.
                Defaults to timedelta(hours=1).
            aggregation_method (str, optional): Method by which to aggregate multiple
                source datapoints for one sample.
                Parameter to pandas Resampler.aggregate, e.g. 'mean', 'last', 'first', 'max'.
                Defaults to 'mean'.
            aggregation_alignment (str, optional): How to align the resampled windows.
                For easier use, aliases 'future' and 'past' are provided.
                Parameter to pandas resample(label=, closed=):
                'left' ('future') or 'right' ('past').
                Defaults to 'future'.
            interpolation_method (str, optional): Method by which to interpolate
                fewer source datapoints to more samples.
                Parameter to pandas interpolate().
                Defaults to 'time'.
            **csv_read_kwargs: Forwarded unchanged to pandas read_csv().

        Raises:
            AssertionError: If the (renamed) data has no column "t" or contains NaN entries.
        """
        # a None default avoids sharing one mutable dict between all calls
        rename_dict = {} if rename_dict is None else rename_dict

        self.frequency = resample
        self.datetime_format = datetime_format
        self.data = pd.read_csv(file_path, **csv_read_kwargs).rename(columns=rename_dict)

        assert "t" in self.data.columns, (
            "The data needs a time column called 't'. Alternatively, you can specify the name of the time column in"
            " the rename_dict"
        )
        assert not self.data.isnull().any().any(), "There were NaN entries in the data"

        if auto_drop is True:
            # automatically drop all columns that were not mentioned in rename_dict;
            # "t" is always retained because it is turned into the index right below
            # (previously it was dropped whenever it was not a rename target, which
            # made the subsequent access to the time column fail)
            columns_to_keep = list(rename_dict.values()) + ["t"]
            self.data = self.data.drop(columns=self.data.columns.difference(columns_to_keep))

        # make time column the index (duplicate timestamps are rejected by verify_integrity)
        self.data["t"] = self.data["t"].apply(lambda x: datetime.strptime(x, datetime_format))
        self.data = self.data.set_index("t", verify_integrity=True)

        # translate aliases for aggregation_alignment (left and right are hard to understand)
        aggregation_alignment = {'future': 'left', 'past': 'right'}.get(aggregation_alignment, aggregation_alignment)

        # resample data: aggregate where the source is denser, interpolate where it is sparser
        self.data = (
            self.data.resample(resample, label=aggregation_alignment, closed=aggregation_alignment)
            .agg(aggregation_method)
            .interpolate(interpolation_method)
        )
class ArrayDataSource(DataSource):
    def __init__(
        self,
        values_dict: dict,
        date_range: List[datetime],
        frequency: timedelta = timedelta(hours=1),
        start_date: Optional[datetime] = None,
    ):
        """
        DataSource which cycles through fixed arrays, yielding one array entry per timestep.

        Usage Example:
        ```
        looping_data = ArrayDataSource({
                "day_night": [0.] * 6 + [1.] * 12 + [0.] * 6,
                "weekends": [0.] * 24 * 5 + [1.] * 24 * 2,
                "prime_chaos": [0, 42, 73, 4, 5, 6, 7],
            },
            date_range=[pd.to_datetime('2024-01-01'), pd.to_datetime('2024-12-31')]  # 2024 starts with a Monday
        )
        ```

        Args:
            values_dict (dict): Dict mapping element names to the arrays that should be looped over.
            date_range (List[datetime]): Date range to simulate.
            frequency (timedelta, optional): Frequency of data to simulate. Defaults to timedelta(hours=1).
            start_date (datetime, optional): The datetime at which the arrays begin looping.
                Defaults to date_range[0]
        """
        super().__init__(frequency)
        self.date_range = date_range
        self.values_dict = values_dict
        # fall back to the start of the simulated range when no start date is given
        self.start_date = start_date if start_date else date_range[0]

    def get_variables(self) -> List[str]:
        """
        Returns the element names, i.e. the keys of the values dict.
        """
        return [*self.values_dict.keys()]

    def get_limits(self) -> dict[str, tuple[float, float]]:
        """
        Returns the (minimum, maximum) entry of every array, keyed by element name.
        """
        limits = {}
        for name, values in self.values_dict.items():
            limits[name] = (min(values), max(values))
        return limits

    def __call__(self, from_time: datetime, to_time: datetime) -> np.ndarray:
        """
        Returns one row per timestep between from_time and to_time (both included)
        and one column per element, in the order of the values dict.
        """
        # timestep index of from_time, counted from the point where the arrays start looping
        first_step = int((from_time - self.start_date) / self.frequency)
        # both interval ends are included, hence the + 1
        n_steps = int((to_time - from_time) / self.frequency) + 1

        out = np.empty([n_steps, len(self.values_dict)])
        step_offsets = np.arange(n_steps)

        for col, values in enumerate(self.values_dict.values()):
            period = len(values)
            # position inside one period for every requested step (wraps around)
            positions = (first_step % period + step_offsets) % period
            out[:, col] = np.asarray(values)[positions]

        return out

    def get_date_range(self) -> List[datetime]:
        """
        Returns the date range given at construction.
        """
        return self.date_range

    def __len__(self) -> int:
        """
        Returns the number of elements in the dataset.
        """
        # NOTE(review): counts whole steps between the two range ends, i.e. one less than the
        # number of rows __call__ yields for the same (inclusive) range - confirm this is intended
        span = self.date_range[1] - self.date_range[0]
        return int(span / self.frequency)
class CalendarDataSource(ArrayDataSource):
    def __init__(
        self,
        date_range: List[datetime],
        frequency: timedelta = timedelta(hours=1),
        seasons_only: bool = False,
    ):
        """
        DataSource which returns calendar information.
        It provides two elements: "is_weekend" (1 on Saturday/Sunday, else 0) and "season".

        Args:
            date_range (List[datetime]): Date range to simulate.
            frequency (timedelta, optional): Frequency of data to simulate. Defaults to timedelta(hours=1).
            seasons_only (bool, optional): If True, only the season information is returned
                (0: winter, 1: spring, 2: summer, 3: autumn), otherwise the exact month (1-12).
                Defaults to False.
        """
        # one timestamp per step over the whole simulated range
        timestamps = pd.date_range(start=date_range[0], end=date_range[1], freq=frequency)

        all_months = range(1, 13)
        if seasons_only:
            # Dec-Feb -> 0 (winter), Mar-May -> 1 (spring), Jun-Aug -> 2 (summer), Sep-Nov -> 3 (autumn)
            month_codes = {m: (m % 12) // 3 for m in all_months}
        else:
            # every month is represented by its own number
            month_codes = {m: m for m in all_months}

        def season_code(stamp) -> int:
            return month_codes[stamp.month]

        def weekend_flag(stamp) -> int:
            # dayofweek counts from Monday = 0, so 5 and 6 are Saturday and Sunday
            return int(stamp.dayofweek >= 5)

        weekend_arr = timestamps.map(weekend_flag).to_numpy()
        season_array = pd.Series(timestamps.map(season_code)).to_numpy()

        super().__init__(
            values_dict={
                "is_weekend": weekend_arr,
                "season": season_array,
            },
            date_range=date_range,
            frequency=frequency,
        )
class ConstantDataSource(DataSource):
    def __init__(self, values_dict: dict, date_range: List[datetime], frequency: timedelta = timedelta(hours=1)):
        """
        Dummy DataSource which yields the same values at every timestep.

        Args:
            values_dict (dict): Dict mapping element names to the constant value that should be returned.
            date_range (List[datetime]): Date range to simulate.
            frequency (timedelta, optional): Frequency of data to simulate. Defaults to timedelta(hours=1).
        """
        self.values_dict = values_dict
        self.date_range = date_range
        self.frequency = frequency

    def get_variables(self) -> List[str]:
        """
        Returns the element names, i.e. the keys of the values dict.
        """
        return [*self.values_dict.keys()]

    def get_limits(self) -> dict[str, tuple[float, float]]:
        """
        Returns (value, value) for every element, since lower and upper limit of a constant coincide.
        """
        limits = {}
        for name, constant in self.values_dict.items():
            limits[name] = (constant, constant)
        return limits

    def __call__(self, from_time: datetime, to_time: datetime) -> np.ndarray:
        """
        Returns one identical row per timestep between from_time and to_time (both included),
        with one column per element in the order of the values dict.
        """
        # both interval ends are included, hence the + 1
        n_steps = int((to_time - from_time) / self.frequency) + 1
        constants = list(self.values_dict.values())
        single_row = np.array(constants).reshape((1, -1))
        return np.repeat(single_row, n_steps, axis=0)

    def get_date_range(self) -> List[datetime]:
        """
        Returns the date range given at construction.
        """
        return self.date_range