diff --git a/build/reports/coverage.svg b/build/reports/coverage.svg
index 1c7007c..b3e8ba0 100644
--- a/build/reports/coverage.svg
+++ b/build/reports/coverage.svg
@@ -15,7 +15,7 @@
coverage
coverage
- 89%
- 89%
+ 88%
+ 88%
diff --git a/executables/bestest_hydronic_heat_pump/different_sampling_methods.py b/executables/bestest_hydronic_heat_pump/different_sampling_methods.py
new file mode 100644
index 0000000..77816d2
--- /dev/null
+++ b/executables/bestest_hydronic_heat_pump/different_sampling_methods.py
@@ -0,0 +1,89 @@
+from physXAI.models.ann.ann_design import ClassicalANNModel
+from physXAI.preprocessing.preprocessing import PreprocessingSingleStep
+from physXAI.preprocessing.constructed import Feature, FeatureExp
+from physXAI.utils.logging import Logger
+
+"""
+This script demonstrates the usage of different sampling methods. It is not physically meaningful.
+
+When creating a Feature, a sampling method can be specified.
+For constructed features, no sampling method needs to be specified; it is assigned automatically based on their corresponding base feature(s).
+
+sampling_method (Union[str, int]): Determines which time step of the input data is used to predict the output.
+ - if None: Feature.get_default_sampling_method() is used
+ - if 'current' or 0: Current time step will be used for prediction.
+ - if 'previous' or 1: Previous time step will be used for prediction.
+ - if 'mean_over_interval': Mean between current and previous time step will be used.
+
+ Specify default sampling method using Feature.set_default_sampling_method().
+ If no default sampling method is specified by the user, 'previous' is used as default.
+
+Constructed features must be built from features with sampling methods that apply the same time shift. Therefore,
+constructed features can either be based on features which solely have the sampling method 'current' (no time shift
+applied) or on features which have one of the sampling methods ['previous', 'mean_over_interval'] (time shift of one
+unit applied).
+"""
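+# Illustrative (not executed here): combining features whose sampling methods apply different
+# time shifts is rejected, e.g.
+#   Feature('a', sampling_method='current') + Feature('b', sampling_method='previous')
+# raises an error, while combining 'previous' with 'mean_over_interval' is allowed and
+# resolves to 'mean_over_interval'.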
+
+Feature.set_default_sampling_method(0)
+
+# Set up logger for saving
+Logger.setup_logger(folder_name='different_sampling_methods_ann', override=True)
+
+# File path to data
+file_path = r"data/bestest_hydronic_heat_pump/pid_data.csv"
+
+# List of input features. Can include names of constructed features and lagged inputs
+inputs = ['reaTZon_y', 'reaTZon_y_lag1', 'reaTZon_y_lag2', 'weaSta_reaWeaTDryBul_y', 'weaSta_reaWeaTDryBul_y_lag1',
+ Feature('weaSta_reaWeaHDirNor_y', sampling_method='mean_over_interval'), 'oveHeaPumY_u',
+ 'oveHeaPumY_u_lag1', 'oveHeaPumY_u_lag2']
+# Output feature. Can include names of constructed features as well
+output = ['Change(T_air)']
+
+"""
+The constructed features are automatically added to the data via 'physXAI/preprocessing/constructed.py'.
+Lagged inputs can be created directly from the base feature.
+"""
+# create lags of reaTZon_y: reaTZon_y_lag1, reaTZon_y_lag2
+x1 = Feature('reaTZon_y', sampling_method='previous')
+lx1 = x1.lag(2)  # all lags of reaTZon_y automatically inherit the sampling method 'previous'
+
+# create lag of weaSta_reaWeaTDryBul_y: weaSta_reaWeaTDryBul_y_lag1
+x2 = Feature('weaSta_reaWeaTDryBul_y')
+lx2 = x2.lag(1)
+
+# create lag of oveHeaPumY_u: oveHeaPumY_u_lag1, oveHeaPumY_u_lag2
+x3 = Feature('oveHeaPumY_u', sampling_method='mean_over_interval')
+x3.lag(2)
+
+# dummy Features
+y = x1 + lx1[0]
+z = y + x1
+z.rename('example_feature_two')  # since z is a constructed feature based on x1, its sampling_method will be 'previous'
+e = FeatureExp(x1 - 273.15, 'exp')  # shift x1 by -273.15, otherwise exp() produces extremely large values
+
+# x1 and x3 have sampling methods 'previous' and 'mean_over_interval'.
+# Since both of them apply a time shift of one, they can be combined in constructed features
+a = x1 + x3
+
+inputs.extend([z, e, a]) # add dummy features to inputs
+
+# construct output
+change_tair = x1 - lx1[0]
+change_tair.rename('Change(T_air)')
+
+# Create Training data
+# Time step defines the target sampling: if the original data is sampled in 15 min intervals, it is resampled to 1 h
+# intervals for time_step=4. Hence, if the sampling method of an input is defined as 'mean_over_interval', the mean
+# over the last hour is taken as input
+prep = PreprocessingSingleStep(inputs, output, time_step=4)
+
+# Process Training data
+td = prep.pipeline(file_path)
+
+# Build & train Classical ANN
+m = ClassicalANNModel(epochs=50)
+model = m.pipeline(td)
+
+# Log setup of preprocessing and model as json
+Logger.log_setup(prep, m)
+# Log training data as pickle
+Logger.save_training_data(td)
diff --git a/physXAI/preprocessing/constructed.py b/physXAI/preprocessing/constructed.py
index cd7e429..945eea9 100644
--- a/physXAI/preprocessing/constructed.py
+++ b/physXAI/preprocessing/constructed.py
@@ -1,7 +1,9 @@
from abc import ABC, abstractmethod
-from typing import Type, Union
+from typing import Optional, Type, Union
import numpy as np
from pandas import DataFrame, Series
+import warnings
+from physXAI.preprocessing.sampling import return_valid_sampling_method
class FeatureBase(ABC):
@@ -11,20 +13,45 @@ class FeatureBase(ABC):
in a Pandas DataFrame. It supports arithmetic operations to combine features.
"""
- def __init__(self, name: str, **kwargs):
+ def __init__(self, name: str, sampling_method: Optional[Union[str, int]] = None, **kwargs):
"""
Initializes a FeatureBase instance.
Args:
name (str): The name of the feature. This will be the column name in the DataFrame.
+            sampling_method (Optional[Union[str, int]]): Determines which time step of the input data is used to predict the output.
+ - if None: Feature._default_sampling_method is used
+ - if 'current' or 0: Current time step will be used for prediction.
+ - if 'previous' or 1: Previous time step will be used for prediction.
+ - if 'mean_over_interval': Mean between current and previous time step will be used.
**kwargs: Catches any additional keyword arguments.
"""
self.feature: str = name
+ self._sampling_method = None
+ self.set_sampling_method(sampling_method)
# Automatically registers the newly created feature instance with the FeatureConstruction manager
FeatureConstruction.append(self)
+ def get_sampling_method(self) -> str:
+        """Returns the feature's sampling method."""
+ return self._sampling_method
+
+    def set_sampling_method(self, val: Optional[Union[str, int]] = None):
+        """
+        Sets the feature's sampling method. If None is given, Feature._default_sampling_method is used.
+ Available methods:
+ - 'current' or 0: Current time step will be used for prediction.
+ - 'previous' or 1: Previous time step will be used for prediction.
+ - 'mean_over_interval': Mean between current and previous time step will be used.
+ """
+
+ if val is None:
+ self._sampling_method = Feature.get_default_sampling_method()
+ else:
+ self._sampling_method = return_valid_sampling_method(val)
+
def rename(self, name: str):
"""
Renames the feature.
@@ -103,7 +130,8 @@ def lag(self, lag: int, previous: bool = True):
FeatureLag object for the specified lag_value.
Returns:
- FeatureLag or List[FeatureLag]: A single lagged feature or a list of lagged features.
+ FeatureLag or List[FeatureLag]: A single lagged feature or a list of lagged features, each with the same
+ sampling method as their corresponding base feature.
"""
if previous and lag > 1:
@@ -115,8 +143,11 @@ def lag(self, lag: int, previous: bool = True):
return FeatureLag(self, lag)
def get_config(self) -> dict:
- return {'class_name': self.__class__.__name__,
- 'name': self.feature}
+ return {
+ 'class_name': self.__class__.__name__,
+ 'name': self.feature,
+ 'sampling_method': self.get_sampling_method(),
+ }
@classmethod
def from_config(cls, config: dict) -> 'FeatureBase':
@@ -156,6 +187,8 @@ def feature_from_config(item_conf: dict) -> 'FeatureBase':
"""
class_name = item_conf['class_name']
feature_class = CONSTRUCTED_CLASS_REGISTRY[class_name]
+ if 'sampling_method' in item_conf.keys() and item_conf['sampling_method'] == '_':
+ item_conf['ignore_sampling_for_output'] = True
f1f = feature_class.from_config(item_conf)
return f1f
@@ -166,7 +199,82 @@ class Feature(FeatureBase):
Represents a basic feature that is assumed to exist directly in the input DataFrame.
Its `process` method simply retrieves the column by its name.
"""
- pass
+
+ _default_sampling_method = 'previous'
+
+ @classmethod
+ def get_default_sampling_method(cls):
+ return Feature._default_sampling_method
+
+ @classmethod
+ def set_default_sampling_method(cls, val: Union[str, int]):
+ """
+ Sets the default sampling method for all features that do not have a custom sampling method. Available methods:
+ - 'current' or 0: Current time step will be used for prediction.
+ - 'previous' or 1: Previous time step will be used for prediction.
+ - 'mean_over_interval': Mean between current and previous time step will be used.
+ """
+ Feature._default_sampling_method = return_valid_sampling_method(val)
+
+
+def get_sampling_from_base_feature(base_features: Union[FeatureBase, list[FeatureBase]], **kwargs) -> tuple[str, dict]:
+ """
+ Returns the appropriate sampling_method for a constructed feature based on its base feature(s). A constructed
+    feature must be built from features with sampling methods that apply the same time shift. Therefore, constructed
+    features can either be based on features which solely have the sampling method 'current' (no time shift applied) or
+    on features which have one of the sampling methods ['previous', 'mean_over_interval'] (time shift of one unit applied).
+
+ Args:
+ base_features (Union[FeatureBase, list[FeatureBase]]): single base feature or list of max. two base features
+ **kwargs: additional keyword arguments. If sampling_method is given in kwargs as well, its validity is checked
+
+ Returns:
+ sampling_method (str): sampling method
+        kwargs (dict): the remaining kwargs, with the key 'sampling_method' removed if it was present
+ """
+
+ if not isinstance(base_features, list):
+ base_features = [base_features]
+
+ assert len(base_features) <= 2, f'Expected a maximum of two features, got {len(base_features)} instead'
+
+ sampling = []
+ for f in base_features:
+ if isinstance(f, FeatureBase):
+ sampling.append(f.get_sampling_method())
+ elif isinstance(f, (int, float)): # FeatureTwo can be built with int or float values
+ continue
+ else:
+ raise ValueError(f"Expected type [FeatureBase, int, float], got type {type(f)} instead")
+
+ sampling = list(set(sampling))
+
+ if len(sampling) == 1:
+ sampling_method = sampling[0]
+ else:
+ if 'current' in sampling: # 'current' together with other sampling methods
+            raise ValueError(f"Sampling method 'current' cannot be combined with time-shifted sampling methods, "
+                             f"got sampling method(s): {sampling}")
+ else: # 'previous' together with 'mean_over_interval'
+ sampling_method = 'mean_over_interval'
+
+ if 'sampling_method' in kwargs.keys():
+ if 'ignore_sampling_for_output' in kwargs.keys() and kwargs['ignore_sampling_for_output']:
+ # necessary for feature construction from config
+ sampling_method = '_'
+ else:
+ message = (f"Constructed features must be built from features with sampling methods that apply the same "
+                       f"time shift. Therefore, constructed features can either be based on features which solely "
+                       f"have the sampling method 'current' (no time shift applied) or on features which have one of "
+                       f"the sampling methods ['previous', 'mean_over_interval'] (time shift of one unit applied).\n"
+ f"Sampling method of base feature(s) is '{sampling_method}' but in kwargs "
+ f"'{return_valid_sampling_method(kwargs['sampling_method'])}' was given as sampling method.")
+ if sampling_method == 'current':
+ assert return_valid_sampling_method(kwargs['sampling_method']) == sampling_method, message
+ else:
+ assert return_valid_sampling_method(kwargs['sampling_method']) in ['previous', 'mean_over_interval'], message
+        del kwargs['sampling_method']  # constructor must not receive the same keyword argument twice
+
+ return sampling_method, kwargs
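+
+# Illustrative resolution (sketch), assuming f_prev and f_mean are Features with sampling
+# methods 'previous' and 'mean_over_interval', and f_cur uses 'current':
+#   get_sampling_from_base_feature([f_prev, f_mean])  ->  ('mean_over_interval', {})
+#   get_sampling_from_base_feature([f_prev, f_cur])   ->  raises ValueError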
@register_feature
@@ -189,11 +297,17 @@ def __init__(self, f: Union[FeatureBase, str], lag: int, name: str = None, **kwa
"""
if isinstance(f, FeatureBase):
self.origf: str = f.feature
+ if name is None:
+ name = f.feature + f'_lag{lag}'
else:
self.origf: str = f
- if name is None:
- name = f.feature + f'_lag{lag}'
- super().__init__(name)
+ if name is None:
+ name = f + f'_lag{lag}'
+
+ # lags must have the same sampling_method as their base feature
+ sampling_method, kwargs = get_sampling_from_base_feature(FeatureConstruction.get_feature(self.origf), **kwargs)
+
+ super().__init__(name, sampling_method=sampling_method, **kwargs)
self.lag: int = lag
def process(self, df: DataFrame) -> Series:
@@ -213,8 +327,8 @@ class FeatureTwo(FeatureBase, ABC):
Examples: FeatureAdd (f1 + f2), FeatureSub (f1 - f2).
"""
- def __init__(self, feature1: Union[FeatureBase, int, float], feature2: Union[FeatureBase, int, float], name: str = None,
- **kwargs):
+ def __init__(self, feature1: Union[FeatureBase, int, float], feature2: Union[FeatureBase, int, float],
+ name: str = None, **kwargs):
"""
Initializes a FeatureTwo instance.
@@ -236,7 +350,10 @@ def __init__(self, feature1: Union[FeatureBase, int, float], feature2: Union[Fea
f2n = str(feature2)
if name is None:
name = self.name(f1n, f2n)
- super().__init__(name)
+
+ # constructed features must have the same sampling_method as their base features
+ sampling_method, kwargs = get_sampling_from_base_feature([feature1, feature2], **kwargs)
+ super().__init__(name, sampling_method=sampling_method, **kwargs)
self.feature1 = feature1
self.feature2 = feature2
@@ -414,7 +531,9 @@ def __init__(self, f1: FeatureBase, name: str = None, **kwargs):
self.f1: FeatureBase = f1
if name is None:
name = 'exp(' + f1.feature + ')'
- super().__init__(name)
+ # constructed features must have the same sampling_method as their base features
+ sampling_method, kwargs = get_sampling_from_base_feature(f1, **kwargs)
+ super().__init__(name, sampling_method=sampling_method, **kwargs)
def process(self, df: DataFrame) -> Series:
if self.feature not in df.columns:
@@ -444,7 +563,9 @@ def __init__(self, f1: FeatureBase, name: str = None, **kwargs):
self.f1: FeatureBase = f1
if name is None:
name = 'sin(' + f1.feature + ')'
- super().__init__(name)
+ # constructed features must have the same sampling_method as their base features
+ sampling_method, kwargs = get_sampling_from_base_feature(f1, **kwargs)
+ super().__init__(name, sampling_method=sampling_method, **kwargs)
def process(self, df: DataFrame) -> Series:
if self.feature not in df.columns:
@@ -474,7 +595,9 @@ def __init__(self, f1: FeatureBase, name: str = None, **kwargs):
self.f1: FeatureBase = f1
if name is None:
name = 'cos(' + f1.feature + ')'
- super().__init__(name)
+ # constructed features must have the same sampling_method as their base features
+ sampling_method, kwargs = get_sampling_from_base_feature(f1, **kwargs)
+ super().__init__(name, sampling_method=sampling_method, **kwargs)
def process(self, df: DataFrame) -> Series:
if self.feature not in df.columns:
@@ -504,7 +627,10 @@ class FeatureConstant(FeatureBase):
def __init__(self, c: float, name: str, **kwargs):
self.c = c
- super().__init__(name)
+ if 'sampling_method' in kwargs.keys():
+ warnings.warn(f"Using 'sampling_method' for {self.__class__.__name__} does not have any effect.",
+ UserWarning)
+ super().__init__(name, **kwargs)
def process(self, df: DataFrame) -> Series:
if self.feature not in df.columns:
@@ -528,8 +654,9 @@ class FeatureConstruction:
@staticmethod
def reset():
- """Clears all registered features and input names."""
+        """Clears all registered features and input names. Furthermore, resets the default sampling method."""
FeatureConstruction.features = list[FeatureBase]()
+ Feature.set_default_sampling_method('previous')
@staticmethod
def append(f: FeatureBase):
@@ -560,7 +687,106 @@ def get_feature(name: str) -> Union[FeatureBase, None]:
return None
@staticmethod
- def process(df: DataFrame):
+ def get_features_including_lagged_features(l: list[str] = None) -> list[str]:
+ """
+        returns a list of the names of all FeatureLag features and of all FeatureTwo features that contain at least
+        one (possibly nested) FeatureLag,
+        - restricted to the given list of names, or
+        - searched over all registered features if no list is given
+
+ Args:
+ l (list[str]): list of feature names to search in
+
+ Returns:
+ list[str]: the list of lag-based features
+ """
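+        # Illustrative (sketch): with x = Feature('a'), lx = x.lag(1) and s = x + lx,
+        # this returns the names of lx and s, but not the name of the plain Feature x.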
+
+        # if no list is given, search over the names of all registered features
+        if not l:
+            l = [f.feature for f in FeatureConstruction.features]
+
+ def recursive_search(feature):
+ """Recursively checks for lagged features"""
+ if isinstance(feature, FeatureLag):
+ return True
+
+ elif isinstance(feature, FeatureTwo):
+ # Check both sub-features recursively
+ return recursive_search(feature.feature1) or recursive_search(feature.feature2)
+
+ return False
+
+ res = list()
+ for f in FeatureConstruction.features:
+ if isinstance(f, FeatureLag) and (f.feature in l):
+ res.append(f.feature) # name of the feature
+
+ elif isinstance(f, FeatureTwo) and (f.feature in l):
+ # Use recursive search to check for nested lagged features
+ if recursive_search(f.feature1) or recursive_search(f.feature2):
+ res.append(f.feature)
+
+ return res
+
+ @staticmethod
+ def get_constructed_features(l: list[str] = None) -> list[str]:
+ """
+ returns a list of the names of all constructed features (features that have a type other than 'Feature')
+ - within the given list or
+ - of all constructed features if list is None
+
+ Args:
+ l (list[str]): list of feature names to search in
+
+ Returns:
+ list[str]: the list of the names of the constructed features
+ """
+
+        # if no list is given, search over the names of all registered features
+        if not l:
+            l = [f.feature for f in FeatureConstruction.features]
+
+ res = list()
+ for f in FeatureConstruction.features:
+ if not isinstance(f, Feature) and (f.feature in l):
+ res.append(f.feature) # name of the feature
+
+ return res
+
+ @staticmethod
+ def create_features(inputs: list[Union[str, FeatureBase]], no_sampling_method: bool = False) -> list[str]:
+ """
+ Creates a Feature for all inputs that are not yet created as features
+
+ Args:
+ inputs (list(Union[str, FeatureBase])): List of column names or Features to be used as input features.
+            no_sampling_method (bool): deactivates the sampling_method (used for outputs), default = False.
+                If deactivated, the sampling_method is set to '_'.
+
+ Returns:
+ list[str]: list of column names of all input features
+ """
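+        # Illustrative (sketch): create_features(['a', Feature('b')]) returns ['a', 'b'] and
+        # registers a new Feature('a') only if no feature with that name exists yet.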
+
+ input_str = list()
+
+ for inp in inputs:
+ if isinstance(inp, FeatureBase):
+ input_str.append(inp.feature) # get name of feature (which is used as column name)
+ if no_sampling_method:
+ inp.set_sampling_method('_')
+ elif isinstance(inp, str):
+ input_str.append(inp)
+ # check if a Feature with the given name (inp) was already created, otherwise create it
+ if not any(inp == f.feature for f in FeatureConstruction.features):
+ Feature(name=inp)
+ if no_sampling_method:
+ FeatureConstruction.get_feature(inp).set_sampling_method('_')
+ else:
+ raise TypeError(f"Only inputs with types 'str' or 'FeatureBase' allowed, got type {type(inp)} instead")
+
+ return input_str
+
+ @staticmethod
+ def process(df: DataFrame, feature_names: list[str] = None):
"""
Processes the input DataFrame by applying all registered feature transformations in order.
Each feature's `process` method is called, which typically adds a new column to `df`
@@ -568,10 +794,16 @@ def process(df: DataFrame):
Args:
df (DataFrame): The DataFrame to process and add features to.
+            feature_names (list[str]): optional parameter to process only the features listed in feature_names
"""
- for f in FeatureConstruction.features:
- f.process(df)
+ if feature_names is None:
+ for f in FeatureConstruction.features:
+ f.process(df)
+ else:
+ for f in FeatureConstruction.features:
+ if f.feature in feature_names:
+ f.process(df)
@staticmethod
def get_config() -> list:
diff --git a/physXAI/preprocessing/preprocessing.py b/physXAI/preprocessing/preprocessing.py
index e32dd97..7e5d613 100644
--- a/physXAI/preprocessing/preprocessing.py
+++ b/physXAI/preprocessing/preprocessing.py
@@ -3,9 +3,11 @@
from typing import Optional, Union
import numpy as np
import pandas as pd
+import warnings
from sklearn.model_selection import train_test_split
-from physXAI.preprocessing.constructed import FeatureConstruction
+from physXAI.preprocessing.constructed import FeatureConstruction, FeatureBase, Feature, FeatureTwo
from physXAI.preprocessing.training_data import TrainingData, TrainingDataMultiStep, TrainingDataGeneric
+from physXAI.preprocessing.sampling import Sampling
from physXAI.utils.logging import get_full_path
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import keras
@@ -17,19 +19,17 @@ class PreprocessingData(ABC):
Abstract Preprocessing Class
"""
- def __init__(self, inputs: list[str], output: Union[str, list[str]], shift: int = 1,
+ def __init__(self, inputs: list[Union[str, FeatureBase]], output: Union[str, list[str]],
time_step: Optional[Union[int, float]] = None,
test_size: float = 0.1, val_size: float = 0.1, random_state: int = 42,
time_index_col: Union[str, float] = 0, csv_delimiter: str = ';', csv_encoding: str = 'latin1',
- csv_header: int = 0, csv_skiprows: Union[int, list[int]] = [], ignore_nan: bool = False):
+ csv_header: int = 0, csv_skiprows: Union[int, list[int]] = [], ignore_nan: bool = False, **kwargs):
"""
Initializes the Preprocessing instance.
Args:
- inputs (List[str]): List of column names to be used as input features.
+ inputs (List[Union[str, FeatureBase]]): List of column names or Features to be used as input features.
output (Union[str, List[str]]): Column name(s) for the target variable(s).
- shift (int): The number of time steps to shift the target variable for forecasting.
- A shift of one means predicting the next time step.
time_step (Optional[Union[int, float]]): Optional time step sampling. If None, sampling of data is used.
test_size (float): Proportion of the dataset to allocate to the test set.
val_size (float): Proportion of the dataset to allocate to the validation set.
@@ -47,11 +47,12 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], shift: int
self.csv_header = csv_header
self.csv_skiprows = csv_skiprows
- self.inputs: list[str] = inputs
+ self.inputs: list[str] = FeatureConstruction.create_features(inputs)
if isinstance(output, str):
output = [output]
- self.output: list[str] = output
- self.shift: int = shift
+ # outputs shouldn't have any sampling method
+ self.output: list[str] = FeatureConstruction.create_features(output, no_sampling_method=True)
+
self.time_step = time_step
# Training, validation and test size should be equal to 1
@@ -91,8 +92,6 @@ def load_data(self, file_path: str) -> pd.DataFrame:
else:
assert self.time_step % time_step == 0, (f"Value Error: Given time step {self.time_step} is not a multiple "
f"of data time step: {time_step}.")
- filtering = (df.index - df.index[0]) % self.time_step == 0
- df = df[filtering]
return df
@@ -116,7 +115,18 @@ def get_config(self) -> dict:
@classmethod
@abstractmethod
def from_config(cls, config: dict) -> 'PreprocessingData':
- pass
+
+ if "__class_name__" in config.keys():
+ if config['__class_name__'] == 'PreprocessingSingleStep':
+ return PreprocessingSingleStep.from_config(config)
+ elif config['__class_name__'] == 'PreprocessingMultiStep':
+ return PreprocessingMultiStep.from_config(config)
+ else:
+ raise ValueError(
+                    f"config does not contain a valid '__class_name__'. config['__class_name__'] is "
+                    f"{config['__class_name__']}, but only 'PreprocessingSingleStep' or 'PreprocessingMultiStep' are allowed.")
+ else:
+            raise ValueError("No valid config given: config does not contain the key '__class_name__'.")
class PreprocessingSingleStep(PreprocessingData):
@@ -127,7 +137,7 @@ class PreprocessingSingleStep(PreprocessingData):
validation, and test sets.
"""
- def __init__(self, inputs: list[str], output: Union[str, list[str]], shift: int = 1,
+ def __init__(self, inputs: list[Union[str, FeatureBase]], output: Union[str, list[str]],
time_step: Optional[Union[int, float]] = None,
test_size: float = 0.1, val_size: float = 0.1, random_state: int = 42,
time_index_col: Union[str, float] = 0, csv_delimiter: str = ';', csv_encoding: str = 'latin1',
@@ -136,10 +146,8 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], shift: int
Initializes the PreprocessingSingleStep instance.
Args:
- inputs (List[str]): List of column names to be used as input features.
+ inputs (List[Union[str, FeatureBase]]): List of column names or Features to be used as input features.
output (Union[str, List[str]]): Column name(s) for the target variable(s).
- shift (int): The number of time steps to shift the target variable for forecasting.
- A shift of one means predicting the next time step.
time_step (Optional[Union[int, float]]): Optional time step sampling. If None, sampling of data is used.
test_size (float): Proportion of the dataset to allocate to the test set.
val_size (float): Proportion of the dataset to allocate to the validation set.
@@ -152,16 +160,30 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], shift: int
ignore_nan (bool): If True, rows with NaN values will be dropped. If False, an error is raised if NaNs are present. Default is False.
"""
- super().__init__(inputs, output, shift, time_step, test_size, val_size, random_state, time_index_col,
- csv_delimiter, csv_encoding, csv_header, csv_skiprows, ignore_nan)
+ if 'shift' in kwargs.keys():
+            warnings.warn("shift parameter is deprecated for SingleStep models and replaced by sampling_method, "
+                          "an attribute of each Feature. This allows specifying individual 'shifts' for each Feature / "
+                          "input. A default sampling method can be specified via "
+                          "Feature.set_default_sampling_method().", DeprecationWarning)
+            warnings.warn(f"shift parameter was given as shift={kwargs['shift']}. Setting "
+                          f"Feature.set_default_sampling_method(shift) and overriding possible individual sampling "
+                          f"methods of all Features. If this is not intended, remove the shift parameter when "
+                          f"initializing the PreprocessingSingleStep object!", DeprecationWarning)
+ Feature.set_default_sampling_method(kwargs['shift'])
+ for f in FeatureConstruction.features:
+ f.set_sampling_method(kwargs['shift'])
+
+ super().__init__(inputs, output, time_step, test_size, val_size, random_state, time_index_col,
+ csv_delimiter, csv_encoding, csv_header, csv_skiprows, ignore_nan, **kwargs)
def process_data(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Processes the loaded DataFrame:
- 1. Applies feature constructions defined in `FeatureConstruction`.
- 2. Selects relevant input and output columns.
- 3. Handles missing values by dropping rows.
- 4. Shifts the target variable(s) `y` for forecasting.
+
+ 1. Selects relevant input and output columns.
+ 2. Handles missing values by dropping rows.
+ 3. Applies the defined sampling method on each (unconstructed) input variable.
+ 4. Applies feature constructions defined in `FeatureConstruction`.
Args:
df (pd.DataFrame): The input DataFrame.
@@ -171,27 +193,46 @@ def process_data(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
and target (y) DataFrames.
"""
- # Applies feature constructions defined in `FeatureConstruction`.
- FeatureConstruction.process(df)
+ # extract the names of all constructed features
+ constructed_inputs = FeatureConstruction.get_constructed_features(self.inputs)
+ constructed_outputs = FeatureConstruction.get_constructed_features(self.output)
- df = df[self.inputs + [out for out in self.output if out not in self.inputs]]
+ # Only apply sampling method to those features which are not constructed features
+ # but whose data is taken directly from the data frame
+ inputs_without_constructed = [inp for inp in self.inputs if inp not in constructed_inputs]
+ output_without_constructed = [out for out in self.output if out not in constructed_outputs]
- # Nan handling
+ df = df[inputs_without_constructed + output_without_constructed]
+
+ # Nan handling in first and last rows
non_nan_rows = df.notna().all(axis=1)
first_valid_index = non_nan_rows.idxmax() if non_nan_rows.any() else None
last_valid_index = non_nan_rows.iloc[::-1].idxmax() if non_nan_rows.any() else None
df = df.loc[first_valid_index:last_valid_index]
- if df.isnull().values.any():
- if self.ignore_nan:
- df.dropna(inplace=True)
- else:
- raise ValueError("Data Error: The TrainingData contains NaN values in intermediate rows. If this is intended, set ignore_nan=True in PreprocessingSingleStep.")
- X = df[self.inputs]
- y = df[self.output].shift(-self.shift)
- if self.shift > 0: # pragma: no cover
- y = y.iloc[:-self.shift]
- X = X.iloc[:-self.shift]
+ sampler = Sampling(inputs_without_constructed, output_without_constructed, self.time_step, self.ignore_nan)
+ # sample input data
+ X = sampler.sample_unconstructed_inputs(df)
+
+ # sample output data
+ if len(output_without_constructed) != 0: # at least one non-constructed output feature
+ y = sampler.sample_unconstructed_outputs(df)
+ res_df = pd.concat([X, y], axis=1)
+ else: # only constructed outputs
+ res_df = X
+
+ # Applies feature constructions defined in `FeatureConstruction`
+ FeatureConstruction.process(res_df, feature_names=constructed_inputs + constructed_outputs)
+
+ if len(constructed_outputs) != 0:
+ # correct shifting of constructed outputs if any
+ res_df = sampler.sample_constructed_outputs(res_df, constructed_outputs)
+
+ # drop NaNs occurring due to creation of lags (constructed feature)
+ res_df.dropna(inplace=True)
+
+ X = res_df[self.inputs]
+ y = res_df[self.output]
return X, y
@@ -249,7 +290,6 @@ def get_config(self) -> dict:
'__class_name__': self.__class__.__name__,
'inputs': self.inputs,
'output': self.output,
- 'shift': self.shift,
'test_size': self.test_size,
'val_size': self.val_size,
'random_state': self.random_state,
@@ -262,7 +302,7 @@ def from_config(cls, config: dict) -> 'PreprocessingSingleStep':
return cls(**config)
-class PreprocessingMultiStep (PreprocessingData):
+class PreprocessingMultiStep(PreprocessingData):
"""
Handles preprocessing for multi-step forecasting models, typically RNNs.
This involves creating windowed datasets suitable for sequence models,
@@ -274,12 +314,12 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], label_width
test_size: float = 0.1, val_size: float = 0.1, random_state: int = 42,
time_index_col: Union[str, float] = 0, csv_delimiter: str = ';', csv_encoding: str = 'latin1',
csv_header: int = 0, csv_skiprows: Union[int, list[int]] = [],
- overlapping_sequences: bool = True, batch_size=32, init_features: list[str] = None,**kwargs):
+ overlapping_sequences: bool = True, batch_size=32, init_features: list[str] = None, **kwargs):
"""
Initializes the PreprocessingMultiStep instance.
Args:
- inputs (List[str]): Column names for input features to the main RNN.
+ inputs (List[Union[str, FeatureBase]]): List of column names or Features that are input features to the main RNN.
output (Union[str, List[str]]): Column name(s) for target variable(s).
label_width (int): Number of time steps in the output (label) sequence.
warmup_width (int): Number of time steps in the warmup sequence (for RNN state initialization).
@@ -301,7 +341,7 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], label_width
If None and warmup_width > 0, defaults to `inputs`.
If None and warmup_width <= 0, defaults to empty list.
"""
- super().__init__(inputs, output, shift, time_step, test_size, val_size, random_state, time_index_col,
+ super().__init__(inputs, output, time_step, test_size, val_size, random_state, time_index_col,
csv_delimiter, csv_encoding, csv_header, csv_skiprows)
self.overlapping_sequences = overlapping_sequences
@@ -315,11 +355,11 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], label_width
keras.utils.set_random_seed(random_state)
# Determine necessary parameters for window creation
- self.features: list[str] = (inputs + self.output +
- [f for f in self.init_features if f not in inputs and f not in self.output])
+ self.features: list[str] = (self.inputs + self.output +
+ [f for f in self.init_features if f not in self.inputs and f not in self.output])
self.column_indices: dict[str, int] = {name: i for i, name in enumerate(self.features)}
- self.warmup_columns_input: list[str] = list(set(self.init_features) & set(inputs))
- self.warmup_columns_labels: list[str] = list(set(self.init_features) - set(inputs))
+ self.warmup_columns_input: list[str] = list(set(self.init_features) & set(self.inputs))
+ self.warmup_columns_labels: list[str] = list(set(self.init_features) - set(self.inputs))
self.label_width: int = label_width
self.warmup_width: int = warmup_width
@@ -352,6 +392,10 @@ def process_data(self, df: pd.DataFrame) -> TrainingDataMultiStep:
TrainingDataMultiStep: Container with tf.data.Dataset objects.
"""
+ # filter data
+ sampler = Sampling(unconstructed_inputs=[], unconstructed_outputs=[], time_step=self.time_step)
+ df = sampler.sample_df_according_to_timestep(df)
+
# Applies feature constructions defined in `FeatureConstruction`.
FeatureConstruction.process(df)
diff --git a/physXAI/preprocessing/sampling.py b/physXAI/preprocessing/sampling.py
new file mode 100644
index 0000000..4426c42
--- /dev/null
+++ b/physXAI/preprocessing/sampling.py
@@ -0,0 +1,218 @@
+from typing import Union, Iterable
+import pandas as pd
+import numpy as np
+import itertools
+
+
+def return_valid_sampling_method(v: Union[int, str]):
+ """ check the validity of the given sampling method and return a string if value is int """
+
+ if not isinstance(v, (int, str)):
+ raise TypeError(f'Type of sampling method not supported. Type is {type(v)}, must be int or str.')
+
+ if v in ['current', 0]:
+ return 'current'
+ elif v in ['previous', 1]:
+ return 'previous'
+ elif v in ['mean_over_interval', '_']:
+ return v
+ else:
+ raise ValueError(
+ f"Value of sampling method not supported, value is: {v}. Sampling method must be 'current' "
+ f"(or 0 if sampling_method is int), 'previous' (or 1 if sampling_method is int) or 'mean_over_interval'. "
+ f"In case of deactivated sampling (for outputs), sampling_method must be '_'.")
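+
+# Illustrative normalization (sketch):
+#   return_valid_sampling_method(0)  -> 'current'
+#   return_valid_sampling_method(1)  -> 'previous'
+#   return_valid_sampling_method('mean_over_interval')  -> 'mean_over_interval'
+#   return_valid_sampling_method('x')  -> ValueError;  0.5 (a float)  -> TypeError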
+
+
+class Sampling:
+ def __init__(self, unconstructed_inputs: list[str], unconstructed_outputs: list[str], time_step: Union[int, float],
+ ignore_nan: bool = False):
+ """
+ A class providing methods for sampling
+
+ Args:
+ unconstructed_inputs (list[str]): names of unconstructed (!) input features
+ unconstructed_outputs (list[str]): names of unconstructed (!) output features
+ time_step (Union[int, float]): sampling interval, multiple of sampling of data
+ ignore_nan: If True, intermediate rows with NaN values will be dropped.
+ If False, an error is raised if NaNs are present in intermediate rows after processing.
+ Default is False.
+ """
+ self.inputs = unconstructed_inputs
+ self.outputs = unconstructed_outputs
+ self.time_step = time_step
+ self.ignore_nan = ignore_nan
+
+ def sample_df_according_to_timestep(self, df: pd.DataFrame) -> pd.DataFrame:
+ """
+ samples given data frame to the new grid defined by time_step
+
+ Args:
+ df: pandas DataFrame
+ Returns:
+ pd.DataFrame: DataFrame with the new sampling grid
+ """
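+        # Illustrative (sketch): for an index of [0, 1, 2, 3, 4] and time_step=2,
+        # the rows at [0, 2, 4] are kept.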
+ filtering = (df.index - df.index[0]) % self.time_step == 0
+ df = df[filtering]
+ return df
+
+ def previous_or_mean_in_sampling_methods(self) -> list[bool]:
+ """
+ checks if any input uses the sampling methods 'previous' or 'mean_over_interval'
+
+ Returns:
+ list[bool]: list of bool stating if the sampling method of an input is prev./mean (True) or not (False)
+ (list in the order of self.inputs)
+ """
+ # no import on module level possible due to circular import
+ from physXAI.preprocessing.preprocessing import FeatureConstruction
+
+ arr = []
+ for fn in self.inputs:
+ sm = FeatureConstruction.get_feature(fn).get_sampling_method()
+ arr.append(sm in ['previous', 'mean_over_interval'])
+ return arr
+
+ def sample_unconstructed_inputs(self, df: pd.DataFrame) -> pd.DataFrame:
+ """
+ extracts the unconstructed inputs from the given DataFrame, applies their corresponding sampling method and
+ samples them to the target grid
+
+ Args:
+ df (pd.DataFrame): data
+ Returns:
+ pd.DataFrame: DataFrame (X) that solely contains all unconstructed inputs (with the correct sampling)
+ """
+
+ # no import on module level possible due to circular import
+ from physXAI.preprocessing.preprocessing import FeatureConstruction
+
+ # extract inputs from DataFrame and get target sampling grid
+ X = df[self.inputs].copy()
+ target_grid = self.sample_df_according_to_timestep(df).index
+
+ # different inputs can have different sampling methods
+ res = []
+ features_without_constructed = [FeatureConstruction.get_feature(inp) for inp in self.inputs]
+ for f in features_without_constructed:
+            # process inputs with sampling method 'mean_over_interval' first, since X cannot be sampled
+            # down to the target time steps before the intermediate values have been folded into the mean
+ if f.get_sampling_method() == 'mean_over_interval':
+ res.append(get_mean_over_interval(X[[f.feature]], target_grid))
+
+ # sample X to target grid
+ X = self.sample_df_according_to_timestep(X)
+ # process inputs with sampling methods 'current' and 'previous'
+ for f in features_without_constructed:
+ _x = X[[f.feature]]
+ if f.get_sampling_method() == 'current':
+ # no transformation needed
+ res.append(_x)
+ elif f.get_sampling_method() == 'previous':
+ # shift by 1
+ _x = _x.shift(1)
+ _x = _x.iloc[1:]
+ res.append(_x)
+ elif f.get_sampling_method() == 'mean_over_interval':
+ continue
+ else:
+ raise NotImplementedError(f"Sampling method '{f.get_sampling_method()}' not implemented.")
+ # concatenate sampled input data
+ X = pd.concat(res, axis=1)
+ X = X.sort_index(ascending=True)
+
+ # Sampling methods 'previous' and 'mean_over_interval' reduce available data points by 1.
+ previous_or_mean = self.previous_or_mean_in_sampling_methods()
+ if any(previous_or_mean):
+ # if at least one of the features uses 'current' as sampling method, shorten X
+ if not all(previous_or_mean):
+ X = X.iloc[1:]
+
+ # check for NaNs
+ if X.isnull().values.any():
+ if self.ignore_nan:
+ X.dropna(inplace=True)
+ else:
+ raise ValueError(
+ "Data Error: The input data contains NaN values in intermediate rows. If this is intended, set "
+ "ignore_nan=True in PreprocessingSingleStep.")
+ return X
+
+ def sample_unconstructed_outputs(self, df: pd.DataFrame) -> pd.DataFrame:
+ """
+ extracts the unconstructed outputs from the given DataFrame and samples them to the target grid
+
+ Args:
+ df (pd.DataFrame): data
+ Returns:
+ pd.DataFrame: DataFrame (y) that solely contains all unconstructed outputs
+ """
+ y = df[self.outputs].copy()
+ y = self.sample_df_according_to_timestep(y)
+
+ # Sampling methods 'previous' and 'mean_over_interval' reduce available data points by 1.
+ # synchronize length of X and y
+ if any(self.previous_or_mean_in_sampling_methods()):
+ y = y.iloc[1:]
+
+ # check for NaNs
+ if y.isnull().values.any():
+ if self.ignore_nan:
+ y.dropna(inplace=True)
+ else:
+ raise ValueError(
+ "Data Error: The output data contains NaN values in intermediate rows. If this is intended,"
+ "set ignore_nan=True in PreprocessingSingleStep.")
+ return y
+
+ def sample_constructed_outputs(self, df: pd.DataFrame, constructed_outputs: list[str]) -> pd.DataFrame:
+ """
+        Corrects the shift of constructed outputs if they are based on input features with sampling method 'previous'
+        or 'mean_over_interval'. Since the inputs are shifted before the constructed features are created, the
+        constructed output has to be shifted back to invert / neutralize the shift previously applied to the inputs.
+
+ Args:
+ df (pd.DataFrame): data including constructed features
+ constructed_outputs (list[str]): names of constructed output features
+ Returns:
+ pd.DataFrame: modified DataFrame (df)
+ """
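+        # Illustrative timeline (sketch): with 'previous' sampling, row t of the frame
+        # holds x(t-1), so a constructed output such as x - x_lag1 evaluated on that
+        # frame carries the value belonging to step t-1; the shift(-1) below moves it
+        # back so that row t again holds the target for step t.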
+ # no import on module level possible due to circular import
+ from physXAI.preprocessing.preprocessing import FeatureConstruction, FeatureTwo
+
+ if any(self.previous_or_mean_in_sampling_methods()):
+ methods = ['previous', 'mean_over_interval']
+ for out in constructed_outputs:
+ out_feature = FeatureConstruction.get_feature(out)
+ if isinstance(out_feature, FeatureTwo):
+                    # correct the shift only if the output is based on input features with the aforementioned sampling methods
+ if (out_feature.feature1.get_sampling_method() in methods or
+ out_feature.feature2.get_sampling_method() in methods):
+ df[out_feature.feature] = df[out_feature.feature].shift(-1)
+ else: # constructed feature that doesn't consist of two features (FeatureExp, ...)
+                    # correct the shift only if the output is based on input features with the aforementioned sampling methods
+ if out_feature.f1.get_sampling_method() in methods:
+ df[out_feature.feature] = df[out_feature.feature].shift(-1)
+ return df
+
+
+def get_mean_over_interval(x: pd.DataFrame, target_grid: pd.Index) -> pd.DataFrame:
+ """samples and returns x on target grid taking the mean over the interval (between the grid indices)"""
+
+ def pairwise(iterable: Iterable):
+ "s -> (s0,s1), (s1,s2), (s2, s3), ..."
+ a, b = itertools.tee(iterable)
+ next(b, None)
+ return zip(a, b)
+
+ original_grid = np.array(x.index)
+ results = []
+ for i, j in pairwise(target_grid):
+ slicer = np.logical_and(original_grid >= i, original_grid < j)
+ d = {'Index': j}
+ for inp in x.columns:
+ d[inp] = x[inp][slicer].mean()
+ results.append(d)
+
+ x = pd.DataFrame(results).set_index('Index')
+
+ return x
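+
+
+# Illustrative (sketch): with x indexed at [0, 1, 2, 3, 4] and target_grid [0, 2, 4],
+# the result has rows at [2, 4]: row 2 holds the mean of the values at 0 and 1, and
+# row 4 holds the mean of the values at 2 and 3 (each interval is labeled by its
+# right endpoint; the first target index is consumed by the pairwise iteration).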
diff --git a/physXAI/utils/logging.py b/physXAI/utils/logging.py
index 7f873d7..61334b2 100644
--- a/physXAI/utils/logging.py
+++ b/physXAI/utils/logging.py
@@ -142,7 +142,7 @@ def setup_logger(folder_name: str = None, override: bool = False, base_path: str
if base_path is None:
base_path = Logger.base_path
if folder_name is None:
- folder_name = datetime.now().strftime("%d.%m.%y %H:%M:%S")
+ folder_name = datetime.now().strftime("%y-%m-%d %H.%M.%S")
folder_name = os.path.join(base_path, folder_name)
else:
folder_name = os.path.join(base_path, folder_name)
diff --git a/unittests/test_coverage.py b/unittests/test_coverage.py
index e17b448..de712b0 100644
--- a/unittests/test_coverage.py
+++ b/unittests/test_coverage.py
@@ -2,19 +2,19 @@
import os
import pathlib
from unittest.mock import patch
+from unittest import TestCase
import keras
import pytest
######################################################################################################################
from physXAI.utils.logging import Logger, get_parent_working_directory
from physXAI.preprocessing.preprocessing import PreprocessingSingleStep, PreprocessingMultiStep, \
PreprocessingData
-from physXAI.preprocessing.constructed import Feature, FeatureConstruction, FeatureConstant
+from physXAI.preprocessing.constructed import Feature, FeatureConstruction, FeatureConstant, FeatureExp, FeatureLag
from physXAI.feature_selection.recursive_feature_elimination import recursive_feature_elimination_pipeline
from physXAI.models.models import LinearRegressionModel, AbstractModel
from physXAI.models.ann.ann_design import ClassicalANNModel, CMNNModel, LinANNModel, PINNModel, RNNModel, \
RBFModel
-
base_path = os.path.join(pathlib.Path(__file__).resolve().parent.parent, 'stored_data')
@@ -24,29 +24,42 @@ def disable_plotly_show():
with patch('plotly.graph_objects.Figure.show'):
yield
+
@pytest.fixture(scope='module')
def file_path():
return os.path.join(pathlib.Path(__file__).resolve().parent.parent, "data/bestest_hydronic_heat_pump/pid_data.csv")
+
@pytest.fixture(scope='module')
def inputs_php():
return ['oveHeaPumY_u', 'Func(logistic)', 'weaSta_reaWeaTDryBul_y', 'reaTZon_y']
+
@pytest.fixture(scope='module')
def inputs_tair():
return ['reaTZon_y', 'weaSta_reaWeaTDryBul_y', 'oveHeaPumY_u', 'oveHeaPumY_u_lag1']
+
+@pytest.fixture(scope='module')
+def inputs_tair_extended():
+ return ['reaTZon_y', 'reaTZon_y_lag1', 'reaTZon_y_lag2', 'weaSta_reaWeaTDryBul_y', 'weaSta_reaWeaTDryBul_y_lag1',
+ Feature('weaSta_reaWeaHDirNor_y', sampling_method='mean_over_interval'), 'oveHeaPumY_u',
+ 'oveHeaPumY_u_lag1', 'oveHeaPumY_u_lag2']
+
@pytest.fixture(scope='module')
def output_php():
return 'reaPHeaPum_y'
+
@pytest.fixture(scope='module')
def output_tair():
return 'Change(T_zone)'
+
def test_path_setup():
get_parent_working_directory()
+
def test_preprocessing(monkeypatch, file_path, inputs_php, output_php):
monkeypatch.setattr('builtins.input', lambda _: "Y")
@@ -78,6 +91,9 @@ def test_preprocessing(monkeypatch, file_path, inputs_php, output_php):
prep = PreprocessingSingleStep(inputs_php, output_php)
prep.pipeline(file_path)
+ FeatureConstruction.reset()
+
+
def test_preprocessing_multistep(file_path, inputs_tair, output_tair):
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -93,6 +109,128 @@ def test_preprocessing_multistep(file_path, inputs_tair, output_tair):
overlapping_sequences=False, batch_size=1)
prep.pipeline(file_path)
+ FeatureConstruction.reset()
+
+
+class TestSamplingMethodsFaults(TestCase):
+
+ # test case: unsupported str given as sampling method
+ def test_unsupported_str(self):
+ with self.assertRaises(ValueError):
+ Feature.set_default_sampling_method('test')
+
+ # test case: unsupported type given for sampling method
+ def test_unsupported_type(self):
+ with self.assertRaises(TypeError):
+ Feature.set_default_sampling_method(['current'])
+
+ # test case: lags of the same input have mismatching sampling methods
+ def test_lag_with_mismatching_sampling_methods(self):
+
+ # allowed
+ x = Feature('test_correct', sampling_method='mean_over_interval')
+ x2 = FeatureLag(x, lag=2, sampling_method='mean_over_interval')
+ e = FeatureExp(x, sampling_method='previous')
+
+ with pytest.warns(UserWarning):
+ c = FeatureConstant(c=100, name='test_const', sampling_method=1)
+
+ # not allowed
+ y = Feature('test_fault', sampling_method='current')
+ with self.assertRaises(AssertionError):
+ FeatureLag(y, lag=1, sampling_method='previous')
+ FeatureConstruction.reset()
+
+
+def test_sampling_method_use_default(file_path, inputs_tair, output_tair):
+ """test case: use default sampling when no default is specified by user"""
+
+ # when not overriding default sampling method, 'previous' is used
+
+ x = Feature('oveHeaPumY_u')
+ x.lag(1)
+
+ # Create & process Training data
+ prep = PreprocessingSingleStep(inputs_tair, output_tair)
+ td = prep.pipeline(file_path)
+
+ for inp in inputs_tair:
+ f = FeatureConstruction.get_feature(inp)
+ assert f.get_sampling_method() == 'previous'
+
+ FeatureConstruction.reset()
+
+
+def test_sampling_method_str(file_path, inputs_tair, output_tair):
+ """test case: set default using str (setting default with int is done in test_different_sampling_methods)"""
+
+ Feature.set_default_sampling_method('mean_over_interval')
+
+ x = Feature('oveHeaPumY_u')
+ x.lag(1)
+
+ # Create & process Training data
+ prep = PreprocessingSingleStep(inputs_tair, output_tair, time_step=4)
+ td = prep.pipeline(file_path)
+
+ for inp in inputs_tair:
+ f = FeatureConstruction.get_feature(inp)
+ assert f.get_sampling_method() == 'mean_over_interval'
+
+ FeatureConstruction.reset()
+
+
+def test_different_sampling_methods(file_path, inputs_tair_extended):
+ """test case: different sampling methods given"""
+
+ # set default
+ Feature.set_default_sampling_method(0)
+
+ # Create lags
+ x1 = Feature('reaTZon_y', sampling_method='previous')
+ lx1 = x1.lag(2) # reaTZon_y_lag1, reaTZon_y_lag2
+ x2 = Feature('weaSta_reaWeaTDryBul_y')
+ lx2 = x2.lag(1) # weaSta_reaWeaTDryBul_y_lag1
+ x3 = Feature('oveHeaPumY_u', sampling_method='mean_over_interval')
+ x3.lag(2) # oveHeaPumY_u_lag1, oveHeaPumY_u_lag2
+
+ # dummy Features
+ y = x1 + lx1[0]
+ z = y + x1
+ z.rename('test_feature_two')
+    e = FeatureExp(x1 - 273.15, 'exp', sampling_method=1)  # shift x1 by -273.15, otherwise exp() produces extremely large values
+
+ # x1 and x3 have sampling methods 'previous' and 'mean_over_interval'.
+ # Since both of them apply a time shift of one, they can be combined in constructed features
+ a = x1 + x3
+ a.rename('test_add')
+
+ inputs_tair_extended.extend([z, e, a])
+
+ # output
+ change_tair = x1 - lx1[0]
+ change_tair.rename('Change(T_air)')
+
+ # Create & process Training data
+ prep = PreprocessingSingleStep(inputs_tair_extended, [change_tair], time_step=4)
+ td = prep.pipeline(file_path)
+
+ # Build & train Classical ANN
+ m = ClassicalANNModel(epochs=1)
+ model = m.pipeline(td)
+
+ # check correct sampling_method specification
+ assert x1.get_sampling_method() == 'previous' and lx1[1].get_sampling_method() == 'previous'
+ assert x2.get_sampling_method() == 'current' and lx2.get_sampling_method() == 'current'
+ assert FeatureConstruction.get_feature('weaSta_reaWeaHDirNor_y').get_sampling_method() == 'mean_over_interval'
+ assert FeatureConstruction.get_feature('test_feature_two').get_sampling_method() == 'previous'
+ assert FeatureConstruction.get_feature('test_add').get_sampling_method() == 'mean_over_interval'
+ assert e.get_sampling_method() == 'previous'
+ assert change_tair.get_sampling_method() == '_'
+
+ FeatureConstruction.reset()
+
+
@pytest.fixture(scope='module')
def p_hp_data(file_path, inputs_php, output_php):
# Setup up logger for saving
@@ -102,6 +240,7 @@ def p_hp_data(file_path, inputs_php, output_php):
td = prep.pipeline(file_path)
return prep, td
+
@pytest.fixture(scope='module')
def tair_data_delta(file_path, inputs_tair, output_tair):
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -116,6 +255,7 @@ def tair_data_delta(file_path, inputs_tair, output_tair):
td = prep.pipeline(file_path)
return prep, td
+
@pytest.fixture(scope='module')
def tair_data_noval(file_path, inputs_tair, output_tair):
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -145,6 +285,7 @@ def tair_data_total(file_path, inputs_tair, output_tair):
td = prep.pipeline(file_path)
return prep, td
+
def test_model_linReg(inputs_php, output_php, file_path):
# Setup up logger for saving
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -161,6 +302,7 @@ def test_model_linReg(inputs_php, output_php, file_path):
Logger.log_setup(prep, m, save_name_model='model_linReg.json')
Logger.save_training_data(td, path=os.path.join(Logger._logger, 'training_data2'))
+
def test_model_ann(p_hp_data, inputs_php, output_php, file_path):
# Setup up logger for saving
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -179,6 +321,53 @@ def test_model_ann(p_hp_data, inputs_php, output_php, file_path):
Logger.log_setup(None, m)
Logger.save_training_data(td)
+
+def test_deprecated_shift(p_hp_data, inputs_php, output_php, file_path):
+
+    # Set up logger for saving
+ Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
+
+ # Create & process Training data
+ with pytest.warns(DeprecationWarning):
+ prep = PreprocessingSingleStep(inputs_php, output_php, shift=0) # deprecated shift given in preprocessing
+ td = prep.pipeline(file_path)
+
+ m = ClassicalANNModel(epochs=1, n_neurons=[4, 4], n_layers=2, activation_function=['softplus', 'softplus'],
+ early_stopping_epochs=None, rescale_output=False)
+ m.pipeline(td)
+
+ m.epochs = 1
+ m.online_pipeline(td, os.path.join(Logger._logger, 'model.keras'))
+
+ assert Feature.get_default_sampling_method() == 'current'
+ Feature.set_default_sampling_method('previous') # reset default sampling
+
+ # from config
+ config_prep = {
+ "__class_name__": "PreprocessingSingleStep",
+ "inputs": [
+ "oveHeaPumY_u",
+ "Func(logistic)",
+ "weaSta_reaWeaTDryBul_y",
+ "reaTZon_y"
+ ],
+ "output": [
+ "reaPHeaPum_y"
+ ],
+ "shift": 0, # deprecated shift
+ "test_size": 0.1,
+ "val_size": 0.1,
+ "random_state": 42,
+ "time_step": 1.0,
+ }
+ with pytest.warns(DeprecationWarning):
+ a = PreprocessingData.from_config(config_prep)
+ assert isinstance(a, PreprocessingSingleStep)
+ assert Feature.get_default_sampling_method() == 'current'
+
+ FeatureConstruction.reset()
+
+
def test_model_cmnn(p_hp_data, inputs_php, output_php, file_path):
# Setup up logger for saving
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -217,6 +406,7 @@ def test_model_cmnn(p_hp_data, inputs_php, output_php, file_path):
Logger.log_setup(prep, m)
Logger.save_training_data(td)
+
def test_model_linANN(p_hp_data, inputs_php, output_php, file_path):
# Setup up logger for saving
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -242,6 +432,7 @@ def test_model_linANN(p_hp_data, inputs_php, output_php, file_path):
Logger.log_setup(prep, m)
Logger.save_training_data(td)
+
def test_model_pinn(inputs_php, output_php, file_path):
# Setup up logger for saving
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -274,6 +465,7 @@ def test_model_pinn(inputs_php, output_php, file_path):
Logger.log_setup(prep, m)
Logger.save_training_data(td)
+
def test_models_rnn(file_path):
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -286,7 +478,7 @@ def test_models_rnn(file_path):
m = RNNModel(epochs=1, rnn_layer='LSTM', init_layer='dense')
m.pipeline(td, os.path.join(Logger._logger, 'model2.keras'))
- Logger.log_setup(td, m, 'preprocessing_config2.json',
+ Logger.log_setup(prep, m, 'preprocessing_config2.json',
save_name_constructed='constructed_config2.json')
Logger.save_training_data(td)
@@ -309,6 +501,9 @@ def test_models_rnn(file_path):
m = RNNModel(epochs=1, rnn_layer='RNN', early_stopping_epochs=None)
m.pipeline(td, save_model=False, plot=False)
+ FeatureConstruction.reset()
+
+
def test_read_setup():
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -318,19 +513,24 @@ def test_read_setup():
path = os.path.join(Logger._logger, save_name_preprocessing)
with open(path, "r") as f:
config_prep = json.load(f)
- PreprocessingData.from_config(config_prep)
+ a = PreprocessingData.from_config(config_prep)
+ assert isinstance(a, PreprocessingSingleStep)
+ FeatureConstruction.reset()
save_name_preprocessing = 'preprocessing_config2.json'
path = os.path.join(Logger._logger, save_name_preprocessing)
with open(path, "r") as f:
config_prep = json.load(f)
- PreprocessingData.from_config(config_prep)
+ b = PreprocessingData.from_config(config_prep)
+ assert isinstance(b, PreprocessingMultiStep)
+ FeatureConstruction.reset()
save_name_constructed = Logger.save_name_constructed
path = os.path.join(Logger._logger, save_name_constructed)
with open(path, "r") as f:
config_constructed = json.load(f)
FeatureConstruction.from_config(config_constructed)
+ FeatureConstruction.reset()
save_name_model = Logger.save_name_model_config
path = os.path.join(Logger._logger, save_name_model)
@@ -344,12 +544,14 @@ def test_read_setup():
config_model = json.load(f)
AbstractModel.model_from_config(config_model)
-def test_feature_selection(monkeypatch, p_hp_data, file_path):
+
+def test_feature_selection(monkeypatch, inputs_php, output_php, file_path):
# Setup up logger for saving
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
monkeypatch.setattr('builtins.input', lambda _: "2")
- prep = p_hp_data[0]
+ # Create Training data
+ prep = PreprocessingSingleStep(inputs_php, output_php)
m = LinearRegressionModel()
@@ -359,6 +561,7 @@ def test_feature_selection(monkeypatch, p_hp_data, file_path):
recursive_feature_elimination_pipeline(file_path, prep, m, ascending_lag_order=True,
fixed_inputs=['weaSta_reaWeaTDryBul_y', 'oveHeaPumY_u'])
+
def test_feature_selection_multi(monkeypatch, tair_data_delta, tair_data_noval ,tair_data_total, file_path):
# Setup up logger for saving
Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
@@ -389,5 +592,6 @@ def test_feature_selection_multi(monkeypatch, tair_data_delta, tair_data_noval ,
m = ClassicalANNModel(epochs=1, n_neurons=4)
recursive_feature_elimination_pipeline(file_path, prep2, m, use_multi_step_error=False)
m.pipeline(td2, save_model=False, plot=False)
- Logger.log_setup(prep, None)
- Logger.save_training_data(td, path=os.path.join(Logger._logger, 'training_data2.json'))
\ No newline at end of file
+ Logger.log_setup(prep, None, save_name_preprocessing='preprocessing_feature-selection-multi.json',
+ save_name_constructed='constructed_config_feature-selection-multi.json')
+ Logger.save_training_data(td, path=os.path.join(Logger._logger, 'training_data2.json'))