diff --git a/build/reports/coverage.svg b/build/reports/coverage.svg index 1c7007c..7a18c7f 100644 --- a/build/reports/coverage.svg +++ b/build/reports/coverage.svg @@ -15,7 +15,7 @@ coverage coverage - 89% - 89% + 80% + 80% diff --git a/executables/bestest_hydronic_heat_pump/P_hp.py b/executables/bestest_hydronic_heat_pump/P_hp.py index 97bf7ec..2c4fd53 100644 --- a/executables/bestest_hydronic_heat_pump/P_hp.py +++ b/executables/bestest_hydronic_heat_pump/P_hp.py @@ -36,7 +36,7 @@ # It is recommended to rename features, so that they can be easily added to the input list # Create Training data -prep = PreprocessingSingleStep(inputs, output) +prep = PreprocessingSingleStep(inputs=inputs, output=output) # Process Training data td = prep.pipeline(file_path) diff --git a/executables/bestest_hydronic_heat_pump/P_hp_FeatureSelection.py b/executables/bestest_hydronic_heat_pump/P_hp_FeatureSelection.py index ab4b60e..300c567 100644 --- a/executables/bestest_hydronic_heat_pump/P_hp_FeatureSelection.py +++ b/executables/bestest_hydronic_heat_pump/P_hp_FeatureSelection.py @@ -33,7 +33,7 @@ # Generic Preprocessing Pipeline # Model is output model, so single step evaluation is chosen -prep = PreprocessingSingleStep(inputs, output) +prep = PreprocessingSingleStep(inputs=inputs, output=output) # Generic Model m = LinearRegressionModel() diff --git a/executables/bestest_hydronic_heat_pump/P_hp_modular.py b/executables/bestest_hydronic_heat_pump/P_hp_modular.py new file mode 100644 index 0000000..c61126f --- /dev/null +++ b/executables/bestest_hydronic_heat_pump/P_hp_modular.py @@ -0,0 +1,41 @@ +from physXAI.models.modular.modular_ann import ModularANN, ModularModel +from physXAI.preprocessing.preprocessing import PreprocessingSingleStep +from physXAI.preprocessing.constructed import Feature +from physXAI.models.ann.ann_design import ClassicalANNModel +from physXAI.utils.logging import Logger + + +""" +Creates modular models to predict the power of the heat pump using the Boptest data.
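+ +Illustrative sketch of the architecture built below (names taken from this script): the heat pump power is modeled by a ClassicalANN over four modular inputs, oveHeaPumY_u / Func(logistic), Func(logistic) ** 2, the ambient dry-bulb temperature, and the zone temperature, wrapped in a ModularANN that rescales its output to the scale of reaPHeaPum_y.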
+""" + +Logger.setup_logger(folder_name='P_hp_modular', override=True) + +file_path = r"data/bestest_hydronic_heat_pump/pid_data.csv" + +inputs = ['oveHeaPumY_u', 'Func(logistic)', 'weaSta_reaWeaTDryBul_y', 'reaTZon_y'] +output = 'reaPHeaPum_y' + +oveHeaPumY_u = Feature('oveHeaPumY_u') +func_logistic = Feature('Func(logistic)') +TDryBul = Feature('weaSta_reaWeaTDryBul_y') +TZon = Feature('reaTZon_y') + +prep = PreprocessingSingleStep(inputs=inputs, output=output) +td = prep.pipeline(file_path) + +"""Example usages of modular models""" +y = ModularModel( + model=ClassicalANNModel(), + inputs=[oveHeaPumY_u.input() / func_logistic.input(), func_logistic.input() ** 2, TDryBul.input(), TZon.input()] +) +m = ModularANN(architecture=y, rescale_output=True) + +# Training pipeline +model = m.pipeline(td) + + +# Log setup of preprocessing and model as json +Logger.log_setup(prep, m) +# Log training data as pickle +Logger.save_training_data(td) diff --git a/executables/bestest_hydronic_heat_pump/P_hp_pinn.py b/executables/bestest_hydronic_heat_pump/P_hp_pinn.py index 1e3ba2d..532796a 100644 --- a/executables/bestest_hydronic_heat_pump/P_hp_pinn.py +++ b/executables/bestest_hydronic_heat_pump/P_hp_pinn.py @@ -39,7 +39,7 @@ pinn.rename('pinn') # Create Training data -prep = PreprocessingSingleStep(inputs, output) +prep = PreprocessingSingleStep(inputs=inputs, output=output) # Process Training data td = prep.pipeline(file_path) diff --git a/executables/bestest_hydronic_heat_pump/TAir.py b/executables/bestest_hydronic_heat_pump/TAir.py index 405d335..4f68c0d 100644 --- a/executables/bestest_hydronic_heat_pump/TAir.py +++ b/executables/bestest_hydronic_heat_pump/TAir.py @@ -31,7 +31,7 @@ x3.lag(2) # oveHeaPumY_u_lag1, oveHeaPumY_u_lag2 # Create Training data -prep = PreprocessingSingleStep(inputs, output) +prep = PreprocessingSingleStep(inputs=inputs, output=output) # Process Training data td = prep.pipeline(file_path) diff --git a/executables/bestest_hydronic_heat_pump/TAir_FeatureSelection.py b/executables/bestest_hydronic_heat_pump/TAir_FeatureSelection.py index bddc953..c232408 100644 --- a/executables/bestest_hydronic_heat_pump/TAir_FeatureSelection.py +++ b/executables/bestest_hydronic_heat_pump/TAir_FeatureSelection.py @@ -34,7 +34,7 @@ # Generic Preprocessing Pipeline # Model is state model, so multi-step evaluation is choosen # See example TAir_evaluateMultiStep.py for more information -prep = PreprocessingMultiStep(inputs, output, 48, 0, init_features=['reaTZon_y'], +prep = PreprocessingMultiStep(inputs=inputs, output=output, label_width=48, warmup_width=0, init_features=['reaTZon_y'], overlapping_sequences=False, batch_size=1) # Generic Model diff --git a/executables/bestest_hydronic_heat_pump/TAir_evaluateMultiStep.py b/executables/bestest_hydronic_heat_pump/TAir_evaluateMultiStep.py index 5c5a1bc..44e1075 100644 --- a/executables/bestest_hydronic_heat_pump/TAir_evaluateMultiStep.py +++ b/executables/bestest_hydronic_heat_pump/TAir_evaluateMultiStep.py @@ -42,7 +42,7 @@ overlapping_sequence should be False to avoid duplicate labels for single step prediction batch_size should be 1 as batches are processes differently in single step models """ -prep = PreprocessingMultiStep(inputs, output, 48, 0, init_features=['reaTZon_y'], +prep = PreprocessingMultiStep(inputs=inputs, output=output, label_width=48, warmup_width=0, init_features=['reaTZon_y'], overlapping_sequences=False, batch_size=1) # Process Training data td = prep.pipeline(file_path) diff --git 
a/executables/bestest_hydronic_heat_pump/TAir_rnn.py b/executables/bestest_hydronic_heat_pump/TAir_rnn.py index 0a74b3b..5039d4c 100644 --- a/executables/bestest_hydronic_heat_pump/TAir_rnn.py +++ b/executables/bestest_hydronic_heat_pump/TAir_rnn.py @@ -29,7 +29,7 @@ warmup_width = 48 # Create Training data. For RNNs MultiStep training data is required -prep = PreprocessingMultiStep(inputs, output, label_width, warmup_width, init_features=inits) +prep = PreprocessingMultiStep(inputs=inputs, output=output, label_width=label_width, warmup_width=warmup_width, init_features=inits) # Process Training data td = prep.pipeline(file_path) diff --git a/physXAI/__init__.py b/physXAI/__init__.py index 8d1c8b6..e69de29 100644 --- a/physXAI/__init__.py +++ b/physXAI/__init__.py @@ -1 +0,0 @@ - diff --git a/physXAI/models/ann/__init__.py b/physXAI/models/ann/__init__.py index 8d1c8b6..e69de29 100644 --- a/physXAI/models/ann/__init__.py +++ b/physXAI/models/ann/__init__.py @@ -1 +0,0 @@ - diff --git a/physXAI/models/ann/ann_design.py b/physXAI/models/ann/ann_design.py index 4097ead..46fa9a0 100644 --- a/physXAI/models/ann/ann_design.py +++ b/physXAI/models/ann/ann_design.py @@ -1,3 +1,4 @@ +from logging import warning import os import time from abc import ABC, abstractmethod @@ -48,6 +49,9 @@ def __init__(self, batch_size: int = 32, epochs: int = 1000, learning_rate: floa self.random_seed: int = random_seed keras.utils.set_random_seed(random_seed) + self.model_config = dict() + + @abstractmethod def generate_model(self, **kwargs): """ @@ -234,12 +238,12 @@ def __init__(self, n_layers: int = 1, n_neurons: Union[int, list[int]] = 32, self.activation_function: Union[str, list[str]] = activation_function self.rescale_output: bool = rescale_output - self.model_config = { + self.model_config.update({ "n_layers": self.n_layers, "n_neurons": self.n_neurons, "activation_function": self.activation_function, "rescale_output": self.rescale_output, - } + }) def generate_model(self, **kwargs): """ @@ -290,12 +294,12 @@ def __init__(self, n_layers: int = 1, n_neurons: Union[int, list[int]] = 32, res self.n_neurons: Union[int, list[int]] = n_neurons self.rescale_output: bool = rescale_output - self.model_config = { + self.model_config.update({ "n_layers": self.n_layers, "n_neurons": self.n_neurons, "rescale_output": self.rescale_output, "random_state": random_seed - } + }) def generate_model(self, **kwargs): """ @@ -354,12 +358,12 @@ def __init__(self, n_layers: int = 1, n_neurons: Union[int, list[int]] = 32, res self.n_neurons: Union[int, list[int]] = n_neurons self.rescale_output: bool = rescale_output - self.model_config = { + self.model_config.update({ "n_layers": self.n_layers, "n_neurons": self.n_neurons, "rescale_output": self.rescale_output, "random_state": random_seed - } + }) def generate_model(self, **kwargs): """ @@ -419,14 +423,14 @@ def __init__(self, n_layers: int = 1, n_neurons: Union[int, list[int]] = 32, self.monotonies: dict[str, int] = monotonies self.activation_split: list[float] = activation_split - self.model_config = { + self.model_config.update({ "n_layers": self.n_layers, "n_neurons": self.n_neurons, "activation_function": self.activation_function, "rescale_output": self.rescale_output, "monotonicities": self.monotonies, "activation_split": activation_split, - } + }) def generate_model(self, **kwargs): """ @@ -494,14 +498,14 @@ def __init__(self, n_layers: int = 1, n_neurons: Union[int, list[int]] = 32, self.pinn_weights: list[float] = pinn_weights - self.model_config = { + 
self.model_config.update({ "n_layers": self.n_layers, "n_neurons": self.n_neurons, "activation_function": self.activation_function, "rescale_output": self.rescale_output, "monotonicities": self.monotonies, "activation_split": activation_split, - } + }) # Create pinn loss based on standard losses self.pinn_loss = multi_y_loss(keras.losses.MeanSquaredError(name='MSE'), self.pinn_weights, 'mse') @@ -626,7 +630,6 @@ def get_config(self) -> dict: }) return config - @register_model class RNNModel(MultiStepModel): """ diff --git a/physXAI/models/ann/configs/ann_model_configs.py b/physXAI/models/ann/configs/ann_model_configs.py index 53d89f1..437da02 100644 --- a/physXAI/models/ann/configs/ann_model_configs.py +++ b/physXAI/models/ann/configs/ann_model_configs.py @@ -4,10 +4,12 @@ class ClassicalANNConstruction_config(BaseModel): - n_layers: int = Field(..., gt=0) + n_layers: int = Field(..., ge=0) n_neurons: Union[int, list[int]] = 32 activation_function: Union[str, list[str]] = 'softplus' rescale_output: bool = True + normalize: bool = True + n_features: Optional[int] = None @field_validator('n_neurons') def validate_n_neurons(cls, v, info): @@ -33,6 +35,7 @@ def validate_activation(cls, v, info): class RBFConstruction_config(ClassicalANNConstruction_config): + n_layers: int = Field(..., ge=1, le=1) random_state: int = 42 rescale_mean: Optional[float] = Field( None, description="Mean value for z-score normalization of outputs" diff --git a/physXAI/models/ann/keras_models/__init__.py b/physXAI/models/ann/keras_models/__init__.py index 8d1c8b6..e69de29 100644 --- a/physXAI/models/ann/keras_models/__init__.py +++ b/physXAI/models/ann/keras_models/__init__.py @@ -1 +0,0 @@ - diff --git a/physXAI/models/ann/model_construction/ann_models.py b/physXAI/models/ann/model_construction/ann_models.py index 97c2563..87f3e82 100644 --- a/physXAI/models/ann/model_construction/ann_models.py +++ b/physXAI/models/ann/model_construction/ann_models.py @@ -1,5 +1,4 @@ import os - import numpy as np from physXAI.preprocessing.training_data import TrainingDataGeneric from physXAI.models.ann.configs.ann_model_configs import (ClassicalANNConstruction_config, @@ -35,7 +34,10 @@ def ClassicalANNConstruction(config: dict, td: TrainingDataGeneric): n_neurons = [n_neurons] * n_layers else: assert len(n_neurons) == n_layers - n_featues = td.X_train_single.shape[1] + if config['n_features'] is not None: + n_features = config['n_features'] + else: + n_features = td.X_train_single.shape[1] activation_function = config['activation_function'] # If activation_function is a single string, replicate it for all layers if isinstance(activation_function, str): @@ -43,20 +45,17 @@ def ClassicalANNConstruction(config: dict, td: TrainingDataGeneric): else: assert len(activation_function) == n_layers - # Rescaling for output layer - rescale_mean = float(np.mean(td.y_train_single)) - rescale_sigma = float(np.std(td.y_train_single, ddof=1)) - # Build artificial neural network as Sequential model = keras.Sequential() # Add input layer - model.add(keras.layers.Input(shape=(n_featues,))) + model.add(keras.layers.Input(shape=(n_features,))) # Add normalization layer - normalization = keras.layers.Normalization() - normalization.adapt(td.X_train_single) - model.add(normalization) + if config['normalize']: + normalization = keras.layers.Normalization() + normalization.adapt(td.X_train_single) + model.add(normalization) for i in range(0, n_layers): # For each layer add dense @@ -65,6 +64,9 @@ def ClassicalANNConstruction(config: dict, td: 
TrainingDataGeneric): model.add(keras.layers.Dense(1, activation='linear')) # Add rescaling if config['rescale_output']: + # Rescaling for output layer + rescale_mean = float(np.mean(td.y_train_single)) + rescale_sigma = float(np.std(td.y_train_single, ddof=1)) model.add(keras.layers.Rescaling(scale=rescale_sigma, offset=rescale_mean)) model.summary() @@ -98,7 +100,10 @@ def CMNNModelConstruction(config: dict, td: TrainingDataGeneric): n_neurons = [n_neurons] * n_layers else: assert len(n_neurons) == n_layers - n_featues = td.X_train_single.shape[1] + if config['n_features'] is not None: + n_features = config['n_features'] + else: + n_features = td.X_train_single.shape[1] activation_function = config['activation_function'] # If activation_function is a single string, replicate it for all layers if isinstance(activation_function, str): @@ -109,21 +114,20 @@ def CMNNModelConstruction(config: dict, td: TrainingDataGeneric): # Get monotonicity constraints mono = config['monotonicities'] if mono is None: - monotonicities = [0] * n_featues + monotonicities = [0] * n_features else: monotonicities = [0 if name not in mono.keys() else mono[name] for name in td.columns] - # Rescaling for output layer - rescale_mean = float(np.mean(td.y_train_single)) - rescale_sigma = float(np.std(td.y_train_single, ddof=1)) - # Add input layer - input_layer = keras.layers.Input(shape=(n_featues,)) + input_layer = keras.layers.Input(shape=(n_features,)) # Add normalization layer - normalization = keras.layers.Normalization() - normalization.adapt(td.X_train_single) - x = normalization(input_layer) + if config['normalize']: + normalization = keras.layers.Normalization() + normalization.adapt(td.X_train_single) + x = normalization(input_layer) + else: + x = input_layer # Add dense layer activation_split = config['activation_split'] @@ -169,6 +173,9 @@ def CMNNModelConstruction(config: dict, td: TrainingDataGeneric): # Add rescaling if config['rescale_output']: + # Rescaling for output layer + rescale_mean = float(np.mean(td.y_train_single)) + rescale_sigma = float(np.std(td.y_train_single, ddof=1)) x = keras.layers.Rescaling(scale=rescale_sigma, offset=rescale_mean)(x) # # Add min / max constraints diff --git a/physXAI/models/ann/model_construction/rbf_models.py b/physXAI/models/ann/model_construction/rbf_models.py index 1d2e4a4..2a20df1 100644 --- a/physXAI/models/ann/model_construction/rbf_models.py +++ b/physXAI/models/ann/model_construction/rbf_models.py @@ -1,11 +1,33 @@ import keras import numpy as np from sklearn.cluster import KMeans +from sklearn.neighbors import NearestNeighbors from physXAI.preprocessing.training_data import TrainingDataGeneric from physXAI.models.ann.configs.ann_model_configs import RBFConstruction_config from physXAI.models.ann.keras_models.keras_models import RBFLayer +def gamma_init(centers, overlap=0.05): + """Initialize gamma parameter for RBF layer based on centers and desired overlap. + + Args: + centers (np.ndarray): Array of shape (n_centers, n_features) representing the RBF centers. + overlap (float): Desired overlap factor between RBFs. Higher values lead to more overlap. 
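+ +    Returns: +        float: Gamma chosen so that exp(-gamma * d2) equals `overlap` at the median squared nearest-neighbor distance d2 between the centers. Worked example: for a median nearest-neighbor distance of 1.0 and overlap=0.5, gamma = -ln(0.5) / 1.0, about 0.693, so a neighboring RBF still responds with an activation of 0.5.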
+    """ +    nbrs = NearestNeighbors(n_neighbors=2).fit(centers) +    distances, _ = nbrs.kneighbors(centers) +    dist_sq = distances[:, 1] ** 2 +    avg_dist_sq = np.median(dist_sq) + +    if avg_dist_sq == 0: +        return 1.0  # Fallback + +    gamma = -np.log(overlap) / avg_dist_sq +    print(f"Calculated Gamma: {gamma}") +    return gamma + + + def RBFModelConstruction(config: dict, td: TrainingDataGeneric): """ Constructs a Radial Basis Function (RBF) Network model using Keras. @@ -29,59 +51,57 @@ config = RBFConstruction_config.model_validate(config).model_dump() # Get config - n_layers = config['n_layers'] n_neurons = config['n_neurons'] # If n_neurons is a single integer, replicate it for all layers - if isinstance(n_neurons, int): - n_neurons = [n_neurons] * n_layers - else: - assert len(n_neurons) == n_layers - n_featues = td.X_train_single.shape[1] - - # Rescaling for output layer - # Custom rescaling - if 'rescale_scale' in config.keys() or 'rescale_offset' in config.keys(): - raise ValueError( - "The 'rescale_scale' and 'rescale_offset' parameters are deprecated. " - "Scaling has changed from min/max to standardization (z-score normalization using mean=0, std=1). " - "Please use 'rescale_mean' and 'rescale_sigma' instead." - ) - if 'rescale_sigma' in config.keys() and config['rescale_sigma'] is not None: - if 'rescale_mean' in config.keys() and config['rescale_mean'] is not None: - rescale_mean = config['rescale_mean'] - else: - rescale_mean = 0 - rescale_sigma = config['rescale_sigma'] - # Standard rescaling + if isinstance(n_neurons, list): + n_neurons = n_neurons[0] + if config['n_features'] is not None: + n_features = config['n_features'] else: - rescale_mean = float(np.mean(td.y_train_single)) - rescale_sigma = float(np.std(td.y_train_single, ddof=1)) + n_features = td.X_train_single.shape[1] # Add input layer - input_layer = keras.layers.Input(shape=(n_featues,)) + input_layer = keras.layers.Input(shape=(n_features,)) # Add normalization layer - normalization = keras.layers.Normalization() - normalization.adapt(td.X_train_single) - x = normalization(input_layer) - - for i in range(0, n_layers): - # For each layer add RBF - - # Determine initial rbf centers - if i == 0: - # Apply KMeans Clustering for rbf centers - kmeans = KMeans(n_clusters=n_neurons[i], random_state=config['random_state'], n_init='auto') - kmeans.fit(normalization(td.X_train_single)) - initial_centers_kmeans = kmeans.cluster_centers_ - x = RBFLayer(n_neurons[i], initial_centers=initial_centers_kmeans, gamma=1)(x) - else: - x = RBFLayer(n_neurons[i], gamma=1)(x) + if config['normalize']: + normalization = keras.layers.Normalization() + normalization.adapt(td.X_train_single) + x = normalization(input_layer) + else: + x = input_layer + + kmeans = KMeans(n_clusters=n_neurons, random_state=config['random_state'], n_init='auto') + # Fit KMeans on the normalized inputs when a normalization layer is used; + # `normalization` is undefined when normalize=False, so fall back to the raw inputs + kmeans.fit(normalization(td.X_train_single).numpy() if config['normalize'] else td.X_train_single) + initial_centers_kmeans = kmeans.cluster_centers_ + + x = RBFLayer(n_neurons, + initial_centers=initial_centers_kmeans, + gamma=gamma_init(initial_centers_kmeans, overlap=0.5), + learnable_centers=False, + learnable_gamma=False)(x) # Add output layer - x = keras.layers.Dense(1, activation='linear')(x) + x = keras.layers.Dense(1, activation='linear', use_bias=False)(x) # Add rescaling if config['rescale_output']: + + # Rescaling for output layer + # Custom rescaling + # --- Sigma (Scale) --- + if 'rescale_sigma' in config and config['rescale_sigma'] is not None: + rescale_sigma =
config['rescale_sigma'] + else: + # Auto-calculate from data + rescale_sigma = float(np.std(td.y_train_single, ddof=1)) + # --- Mean (Offset) --- + # CASE A: Residual Mode -> Config must provide 0.0 + # CASE B: Direct Prediction -> Config is None, calculate from data + if 'rescale_mean' in config and config['rescale_mean'] is not None: + rescale_mean = config['rescale_mean'] + else: + rescale_mean = float(np.mean(td.y_train_single)) + x = keras.layers.Rescaling(scale=rescale_sigma, offset=rescale_mean)(x) model = keras.Model(inputs=input_layer, outputs=x) diff --git a/physXAI/models/ann/model_construction/residual_models.py b/physXAI/models/ann/model_construction/residual_models.py index 5f55509..70ca9ad 100644 --- a/physXAI/models/ann/model_construction/residual_models.py +++ b/physXAI/models/ann/model_construction/residual_models.py @@ -35,7 +35,7 @@ def LinResidualANNConstruction(config: dict, td: TrainingDataGeneric, lin_model: # Determine predictions of linear regression for rescaling y_train_pred = lin_model.predict(td.X_train_single) config['rescale_sigma'] = float(np.std(td.y_train_single - y_train_pred, ddof=1)) - config['rescale_mean'] = float(np.mean(td.y_train_single - y_train_pred)) + config['rescale_mean'] = 0 # Add linear regression as dense keras layer lin = keras.layers.Dense(1, activation='linear') diff --git a/physXAI/models/modular/__init__.py b/physXAI/models/modular/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/physXAI/models/modular/modular_ann.py b/physXAI/models/modular/modular_ann.py new file mode 100644 index 0000000..b8b7a42 --- /dev/null +++ b/physXAI/models/modular/modular_ann.py @@ -0,0 +1,499 @@ +import functools +from itertools import combinations +from abc import ABC, abstractmethod +import operator +import os +from pathlib import Path +from typing import Optional, Union +from copy import deepcopy + +import numpy as np +from physXAI.models.ann.keras_models.keras_models import NonNegPartial +from physXAI.models.modular.modular_expression import (ModularExpression, register_modular_expression, + get_modular_expressions_by_name) +from physXAI.models.ann.ann_design import ANNModel, CMNNModel, ClassicalANNModel +from physXAI.models.models import AbstractModel, LinearRegressionModel, register_model +from physXAI.preprocessing.training_data import TrainingDataGeneric +from physXAI.preprocessing.constructed import FeatureBase +from physXAI.utils.logging import Logger +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' +import keras +from keras import Sequential +from keras.src import Functional +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0' + + +@register_model +class ModularANN(ANNModel): + """ + A Modular Artificial Neural Network model that allows for custom architectures. + """ + + def __init__(self, architecture: ModularExpression, batch_size: int = 32, epochs: int = 1000, + learning_rate: float = 0.001, early_stopping_epochs: Optional[int] = 100, + random_seed: int = 42, rescale_output: bool = False, **kwargs): + """ + Initializes the ModularANN. + + Args: + architecture (ModularExpression): The modular architecture defining the model. + batch_size (int): Number of samples per gradient update. + epochs (int): Number of times to iterate over the entire training dataset. + learning_rate (float): Learning rate for the Adam optimizer. + early_stopping_epochs (int): Number of epochs with no improvement after which training will be stopped. + If None, early stopping is disabled. 
+ random_seed (int): Seed for random number generators to ensure reproducibility. + rescale_output (bool): Whether to rescale the output to output scale. + """ + + super().__init__(batch_size, epochs, learning_rate, early_stopping_epochs, random_seed) + self.architecture: ModularExpression = architecture + + self.rescale_output = rescale_output + + self.model_config.update({ + 'rescale_output': rescale_output, # the rest of the parameters are passed on to super + }) + + def generate_model(self, **kwargs): + """ + Generates the Keras model using the specified modular architecture. + """ + + td = kwargs['td'] + n_features = td.X_train_single.shape[1] + input_layer = keras.layers.Input(shape=(n_features,)) + x = self.architecture.construct(input_layer, td) + if self.rescale_output: + rescale_mean = float(np.mean(td.y_train_single)) + rescale_sigma = float(np.std(td.y_train_single, ddof=1)) + x = keras.layers.Rescaling(scale=rescale_sigma, offset=rescale_mean)(x) + model = keras.models.Model(inputs=input_layer, outputs=x) + model.summary() + + return model + + def get_config(self) -> dict: + config = super().get_config() + config.update({ + 'architecture': self.architecture.name, + 'rescale_output': self.rescale_output, + }) + return config + + @classmethod + def from_config(cls, config: dict) -> 'ModularANN': + + a = ModularExpression.get_existing_modular_expression(config['architecture']) + assert a is not None, (f"ModularExpression {config['architecture']} not found, make sure to construct required " + f"modular expressions before constructing {cls.__name__}.") + config['architecture'] = a + + return cls(**config) + + +class ModularAbstractModel(ModularExpression, ABC): + """ + Abstract Base Class for ModularExpressions having other ModularExpressions as inputs + Examples: ModularModel, ModularExistingModel, ModularLinear, ... + """ + def __init__(self, inputs: list[Union[ModularExpression, FeatureBase]], name: str): + super().__init__(name) + self.inputs = [inp if isinstance(inp, ModularExpression) else inp.input() for inp in inputs] + + @abstractmethod + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + pass + + def _get_config(self) -> dict: + c = super()._get_config() + c.update({ + 'inputs': [inp.name for inp in self.inputs], + }) + return c + + @classmethod + def _from_config(cls, item_config: dict, config: list[dict]) -> 'ModularAbstractModel': + """ + Creates a ModularAbstractModel instance (or its subclass) from a configuration dictionary. + Handles reconstruction of inputs. + + Args: + item_config (dict): Configuration dictionary. Must contain key 'inputs' with list of input names + config (list[dict]): The list with the configuration dictionaries of all modular expressions + + Returns: + ModularAbstractModel: An instance of the specific ModularAbstractModel subclass. + """ + + item_config['inputs'] = get_modular_expressions_by_name(item_config['inputs'], config) + return cls(**item_config) + + +@register_modular_expression +class ModularModel(ModularAbstractModel): + + allowed_models = [ClassicalANNModel, CMNNModel, LinearRegressionModel] + i = 0 + + def __init__(self, model: ANNModel, inputs: list[Union[ModularExpression, FeatureBase]], name: str = None, + nominal_range: tuple[float, float] = None): + if not any(isinstance(model, allowed) for allowed in self.allowed_models): + raise NotImplementedError(f"Currently {type(model)} is not supported. 
Allowed models are: {self.allowed_models}") + + if name is None: + name = f"ModularModel_{ModularModel.i}" + ModularModel.i += 1 + super().__init__(inputs, name) + + self.model = model + self.model.model_config.update({ + "normalize": False, + "rescale_output": False + }) + self._nominal_range = nominal_range + + if nominal_range is None: + self.rescale_output = False + elif nominal_range is not None and len(nominal_range) != 2: + raise ValueError(f"Modular Model: nominal_range must be a tuple of (min, max), but was {nominal_range}") + else: + self.rescale_output = True + self.nominal_mean = (nominal_range[1] + nominal_range[0]) / 2.0 + self.nominal_sigma = (nominal_range[1] - nominal_range[0]) / 4.0 # Assuming 4 sigma covers the range + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + if self.name in ModularExpression.models.keys(): + return ModularExpression.models[self.name] + else: + inps = list() + for x in self.inputs: + y = x.construct(input_layer, td) + inps.append(y) + self.model.model_config['n_features'] = len(inps) + td = deepcopy(td) + td.columns = [inp.name for inp in self.inputs] + if isinstance(self.model, LinearRegressionModel): + lr = ModularLinear(inputs=self.inputs, name=self.name + "_linear").construct(input_layer, td) + l = lr(keras.layers.Concatenate()(inps)) + else: + l = self.model.generate_model(td=td)(keras.layers.Concatenate()(inps)) + if self.rescale_output: + l = keras.layers.Rescaling(scale=self.nominal_sigma, offset=self.nominal_mean)(l) + ModularExpression.models[self.name] = l + return l + + def _get_config(self) -> dict: + c = super()._get_config() + c.update({ + 'model': self.model.get_config(), + 'nominal_range': self._nominal_range, + }) + return c + + @classmethod + def _from_config(cls, item_config: dict, config: list[dict]) -> 'ModularModel': + """ + Creates a ModularModel instance from a configuration dictionary. + Handles reconstruction of model (ANNModel) and inputs. + + Args: + item_config (dict): Configuration dictionary. Must contain configuration for model as well. + config (list[dict]): The list with the configuration dictionaries of all modular expressions + + Returns: + ModularModel: An instance of the specific ModularModel. 
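+ +        Note: instances are typically rebuilt in bulk via ModularExpression.from_config() on the logged JSON configuration rather than by calling this method directly.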
+ """ + + assert isinstance(item_config['model'], dict), (f"config must contain the configuration (dict) for the model " + f"but config['model'] is {item_config['model']}]") + m = AbstractModel.model_from_config(item_config['model']) + item_config['model'] = m + + item_config['inputs'] = get_modular_expressions_by_name(item_config['inputs'], config) + + return cls(**item_config) + + +@register_modular_expression +class ModularExistingModel(ModularAbstractModel): + + def __init__(self, model: Union[Sequential, Functional, str, Path], + original_inputs: list[Union[ModularExpression, FeatureBase]], trainable: bool, name: str = None): + if isinstance(model, str) or isinstance(model, Path): + self.model_path = model + model = keras.models.load_model(model) + self.model = model + + if name is None: + name = model.name + '_existing' + super().__init__(original_inputs, name) + + self.model.trainable = trainable + if not trainable: + for layer in self.model.layers: + layer.trainable = False + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + if self.name in ModularExpression.models.keys(): + return ModularExpression.models[self.name] + else: + inps = list() + for x in self.inputs: + y = x.construct(input_layer, td) + inps.append(y) + l = self.model(keras.layers.Concatenate()(inps)) + ModularExpression.models[self.name] = l + return l + + def _get_config(self) -> dict: + c = super()._get_config() + + # if model wasn't loaded from path originally, save it and store path + if not hasattr(self, 'model_path'): + self.model_path = Logger.get_model_savepath(save_name_model=self.model.name + '.keras') + self.model.save(self.model_path) + + c.update({ + 'model': self.model_path, + 'original_inputs': c['inputs'], + 'trainable': self.model.trainable + }) + c.__delitem__('inputs') # super config contains key 'inputs', here key must be original_inputs + return c + + @classmethod + def _from_config(cls, item_config: dict, config: list[dict]) -> 'ModularExistingModel': + """ + Creates a ModularExistingModel instance from a configuration dictionary. + Handles reconstruction of original_inputs. + + Args: + item_config (dict): Configuration dictionary + config (list[dict]): The list with the configuration dictionaries of all modular expressions + + Returns: + ModularExistingModel: An instance of the specific ModularExistingModel. 
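+ +        Note: in a logged config, 'model' holds the path to a saved .keras file, which __init__ reloads via keras.models.load_model.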
+        """ + +        item_config['original_inputs'] = get_modular_expressions_by_name(item_config['original_inputs'], config) + +        return cls(**item_config) + + +@register_modular_expression +class ModularLinear(ModularAbstractModel): +    i = 0 + +    def __init__(self, inputs: list[Union[ModularExpression, FeatureBase]], name: str = None, +                 nominal_range: tuple[float, float] = None): +        if name is None: +            name = f"ModularLinear_{ModularLinear.i}" +            ModularLinear.i += 1 +        super().__init__(inputs, name) +        self._nominal_range = nominal_range + +        if nominal_range is None: +            self.rescale_output = False +        elif nominal_range is not None and len(nominal_range) != 2: +            raise ValueError(f"Modular Model: nominal_range must be a tuple of (min, max), but was {nominal_range}") +        else: +            self.rescale_output = True +            self.nominal_mean = (nominal_range[1] + nominal_range[0]) / 2.0 +            self.nominal_sigma = (nominal_range[1] - nominal_range[0]) / 4.0  # Assuming 4 sigma covers the range + +    def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: +        if self.name in ModularExpression.models.keys(): +            return ModularExpression.models[self.name] +        else: +            inps = list() +            for x in self.inputs: +                y = x.construct(input_layer, td) +                inps.append(y) +            l = keras.layers.Dense(units=1, activation='linear')(keras.layers.Concatenate()(inps)) +            if self.rescale_output: +                l = keras.layers.Rescaling(scale=self.nominal_sigma, offset=self.nominal_mean)(l) +            ModularExpression.models[self.name] = l +            return l + +    def _get_config(self) -> dict: +        c = super()._get_config() +        c.update({ +            'nominal_range': self._nominal_range, +        }) +        return c + + +@register_modular_expression +class ModularMonotoneLinear(ModularAbstractModel): +    i = 0 + +    def __init__(self, inputs: list[Union[ModularExpression, FeatureBase]], name: str = None, +                 monotonicities: Optional[dict[str, int]] = None, nominal_range: tuple[float, float] = None): +        if name is None: +            name = f"ModularMonotoneLinear_{ModularMonotoneLinear.i}" +            ModularMonotoneLinear.i += 1 +        super().__init__(inputs, name) +        self._nominal_range = nominal_range + +        if monotonicities is None: +            monotonicities = [0] * len(self.inputs) +        else: +            monotonicities = [0 if inp.name not in monotonicities.keys() else monotonicities[inp.name] for inp in self.inputs] +        self.monotonicities = monotonicities + +        if nominal_range is None: +            self.rescale_output = False +        elif nominal_range is not None and len(nominal_range) != 2: +            raise ValueError(f"Modular Model: nominal_range must be a tuple of (min, max), but was {nominal_range}") +        else: +            self.rescale_output = True +            self.nominal_mean = (nominal_range[1] + nominal_range[0]) / 2.0 +            self.nominal_sigma = (nominal_range[1] - nominal_range[0]) / 4.0  # Assuming 4 sigma covers the range + +    def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: +        if self.name in ModularExpression.models.keys(): +            return ModularExpression.models[self.name] +        else: +            inps = list() +            for x in self.inputs: +                y = x.construct(input_layer, td) +                inps.append(y) +            l = keras.layers.Dense(units=1, activation='linear', kernel_constraint=NonNegPartial(self.monotonicities))(keras.layers.Concatenate()(inps)) +            if self.rescale_output: +                l = keras.layers.Rescaling(scale=self.nominal_sigma, offset=self.nominal_mean)(l) +            ModularExpression.models[self.name] = l +            return l + +    def _get_config(self) -> dict: +        c = super()._get_config() +        c.update({ +            'nominal_range': self._nominal_range, +            'monotonicities': {self.inputs[n].name:
self.monotonicities[n] for n in range(len(self.inputs))}, + }) + return c + + +@register_modular_expression +class ModularPolynomial(ModularAbstractModel): + i = 0 + + def __init__(self, inputs: list[Union[ModularExpression, FeatureBase]], degree: int = 2, interaction_degree: int = 1, + name: str = None, nominal_range: tuple[float, float] = None): + if name is None: + name = f"ModularPolynomial_{ModularPolynomial.i}" + ModularPolynomial.i += 1 + super().__init__(inputs, name) + assert degree >= 1, "Degree must be at least 1." + assert interaction_degree >= 1, "Interaction degree must be at least 1." + self.degree = degree + self.interaction_degree = interaction_degree + self._nominal_range = nominal_range + + if nominal_range is None: + self.rescale_output = False + elif nominal_range is not None and len(nominal_range) != 2: + raise ValueError(f"Modular Model: nominal_range must be a tuple of (min, max), but was {nominal_range}") + else: + self.rescale_output = True + self.nominal_mean = (nominal_range[1] + nominal_range[0]) / 2.0 + self.nominal_sigma = (nominal_range[1] - nominal_range[0]) / 4.0 # Assuming 4 sigma covers the range + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + if self.name in ModularExpression.models.keys(): + return ModularExpression.models[self.name] + else: + inps = list() + for x in self.inputs: + y = x.construct(input_layer, td) + inps.append(y) + + new_features = list(inps) + for feature in inps: + for d in range(2, self.degree + 1): + new_features.append(feature ** d) + for k in range(2, self.interaction_degree + 1): + for combo in combinations(inps, k): + interaction_term = functools.reduce(operator.mul, combo) + new_features.append(interaction_term) + + l = keras.layers.Dense(units=1, activation='linear')(keras.layers.Concatenate()(new_features)) + if self.rescale_output: + l = keras.layers.Rescaling(scale=self.nominal_sigma, offset=self.nominal_mean)(l) + ModularExpression.models[self.name] = l + return l + + def _get_config(self) -> dict: + c = super()._get_config() + c.update({ + 'degree': self.degree, + 'interaction_degree': self.interaction_degree, + 'nominal_range': self._nominal_range, + }) + return c + + +@register_modular_expression +class ModularAverage(ModularAbstractModel): + i = 0 + + def __init__(self, inputs: list[Union[ModularExpression, FeatureBase]], name: str = None): + if name is None: + name = f"ModularAverage_{ModularAverage.i}" + ModularAverage.i += 1 + super().__init__(inputs, name) + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + if self.name in ModularExpression.models.keys(): + return ModularExpression.models[self.name] + else: + inps = list() + for x in self.inputs: + y = x.construct(input_layer, td) + inps.append(y) + l = keras.layers.Average()(inps) + ModularExpression.models[self.name] = l + return l + + +@register_modular_expression +class ModularNormalization(ModularAbstractModel): + i = 0 + + def __init__(self, input: ModularExpression, name: str = None): + if name is None: + name = f"ModularNormalization_{ModularNormalization.i}" + ModularNormalization.i += 1 + super().__init__([input], name) + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + inp = self.inputs[0].construct(input_layer, td) + normalization = keras.layers.BatchNormalization() + l = normalization(inp) + return l + + def _get_config(self) -> dict: + c = super()._get_config() + c.update({ + 
'input': c['inputs'][0], + }) + c.__delitem__('inputs') # super config contains key 'inputs', here only single input + return c + + @classmethod + def _from_config(cls, item_config: dict, config: list[dict]) -> 'ModularNormalization': + """ + Creates a ModularNormalization instance from a configuration dictionary. + Handles reconstruction of single input. + + Args: + item_config (dict): Configuration dictionary + config (list[dict]): The list with the configuration dictionaries of all modular expressions + + Returns: + ModularNormalization: An instance of the specific ModularNormalization. + """ + + item_config['input'] = get_modular_expressions_by_name(item_config['input'], config)[0] + + return cls(**item_config) diff --git a/physXAI/models/modular/modular_expression.py b/physXAI/models/modular/modular_expression.py new file mode 100644 index 0000000..351ac96 --- /dev/null +++ b/physXAI/models/modular/modular_expression.py @@ -0,0 +1,403 @@ +from abc import ABC, abstractmethod +import os +from typing import Union, Type +from physXAI.models.ann.keras_models.keras_models import ConstantLayer, DivideLayer, InputSliceLayer, PowerLayer +from physXAI.preprocessing.training_data import TrainingDataGeneric +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' +import keras +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0' + + +class ModularExpression(ABC): + + feature_list = dict() + feature_list_normalized = dict() + trainable_parameters = dict() + models = dict() + modular_expression_list = list['ModularExpression']() + + def __init__(self, name: str): + self.name = name + ModularExpression.modular_expression_list.append(self) + + @staticmethod + def reset(): + ModularExpression.feature_list = dict() + ModularExpression.feature_list_normalized = dict() + ModularExpression.trainable_parameters = dict() + ModularExpression.models = dict() + ModularExpression.modular_expression_list = list() + + @abstractmethod + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + pass + + def __add__(self, other): + return ModularAdd(self, other) + + def __radd__(self, other): + return ModularAdd(other, self) + + def __sub__(self, other): + return ModularSub(self, other) + + def __rsub__(self, other): + return ModularSub(other, self) + + def __mul__(self, other): + return ModularMul(self, other) + + def __rmul__(self, other): + return ModularMul(other, self) + + def __truediv__(self, other): + return ModularTrueDiv(self, other) + + def __rtruediv__(self, other): + return ModularTrueDiv(other, self) + + def __pow__(self, other): + return ModularPow(self, other) + + def __rpow__(self, other): + return ModularPow(other, self) + + def rename(self, name: str): + self.name = name + + def _get_config(self) -> dict: + c = { + 'class_name': self.__class__.__name__, + 'name': self.name, + } + return c + + @classmethod + def _from_config(cls, item_config: dict, config: list[dict]) -> 'ModularExpression': + return cls(**item_config) + + @staticmethod + def get_config() -> list: + """ + Returns a list of configuration dictionaries for all managed modular expressions. + This list can be serialized (e.g., to JSON) to save the modular expression pipeline. + """ + + item_configs = [item._get_config() for item in ModularExpression.modular_expression_list] + return item_configs + + @staticmethod + def from_config(config: list): + """ + Reconstructs the modular expression engineering pipeline from a list of configuration dictionaries. 
+    Clears any existing modular expressions and populates `ModularExpression.modular_expression_list` with +    newly created modular expression objects based on the provided configurations. + +        Args: +            config (List[dict]): A list where each dictionary is the configuration +                                 for a single modular expression object. +        """ + +        ModularExpression.reset() +        for item_conf in config: +            f = ModularExpression.get_existing_modular_expression(item_conf['name']) +            if f is None: +                modular_expression_from_config(item_conf, config) + +    @staticmethod +    def get_existing_modular_expression(name: str) -> Union['ModularExpression', None]: +        """ +        Retrieves a modular expression object by its name from the managed list. + +        Args: +            name (str): The name of the modular expression to retrieve. + +        Returns: +            ModularExpression or None: The found modular expression object, or None if not found. +        """ +        for f in ModularExpression.modular_expression_list: +            if f.name == name: +                return f +        return None + + +def get_name(feature: Union[ModularExpression, int, float]) -> str: +    if isinstance(feature, ModularExpression): +        return feature.name +    else: +        return str(feature) + + +# --- Registry for ModularExpression Classes --- +# This registry maps class names (strings) to the actual class types (Type[ModularExpression]). +# It's used by `modular_expression_from_config` to dynamically create instances of the correct modular expression class. +CONSTRUCTED_CLASS_REGISTRY: dict[str, Type['ModularExpression']] = dict() + + +def modular_expression_from_config(item_conf: dict, config: list[dict]) -> 'ModularExpression': +    """ +    Factory function to create a modular expression object from its configuration dictionary. + +    Args: +        item_conf (dict): The configuration dictionary for a single modular expression. +                          Must contain 'class_name' and other necessary parameters. +        config (list[dict]): The list with the configuration dictionaries of all modular expressions + +    Returns: +        ModularExpression: An instance of the appropriate modular expression subclass. + +    Raises: +        KeyError: If 'class_name' is not in `item_conf` or if the class_name is not in `CONSTRUCTED_CLASS_REGISTRY`. +    """ +    class_name = item_conf['class_name'] +    modular_expression_class = CONSTRUCTED_CLASS_REGISTRY[class_name] +    item_conf.__delitem__('class_name') +    f1f = modular_expression_class._from_config(item_conf, config) +    return f1f + + +def get_modular_expressions_by_name(names: Union[str, list[str]], config: list[dict]) -> list[ModularExpression]: +    """ +    Retrieves modular expressions by their names if they have already been constructed, +    otherwise constructs the modular expression objects based on the given configuration. + +    Args: +        names (Union[str, list[str]]): single name (str) or list of names of the modular expressions to retrieve +        config (list[dict]): The list with the configuration dictionaries of all modular expressions + +    Returns: +        list[ModularExpression]: The retrieved or newly constructed modular expression objects, in the order of `names`.
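+ +    Example (illustrative names): get_modular_expressions_by_name(['x1', '(x1+x2)'], config) returns the already-built expression for 'x1' and constructs '(x1+x2)' from its entry in `config`.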
+ """ + + if isinstance(names, str): # convert str to list + names = [names] + + l = list[ModularExpression]() + for name in names: + me = ModularExpression.get_existing_modular_expression( + name) # if modular expression already constructed, retrieve it + + if me is None: # modular expression yet unconstructed + item_config = dict() + for item in config: # find config of modular expression to construct it + if item['name'] == name: + item_config = item + me = modular_expression_from_config(item_config, config) # construct modular expression + l.append(me) + return l + + +def register_modular_expression(cls): + """ + A class decorator that registers the decorated class in the CONSTRUCTED_CLASS_REGISTRY. + The class is registered using its __name__. + """ + if cls.__name__ in CONSTRUCTED_CLASS_REGISTRY: # pragma: no cover + print(f"Warning: Class '{cls.__name__}' is already registered. Overwriting.") # pragma: no cover + CONSTRUCTED_CLASS_REGISTRY[cls.__name__] = cls + return cls # Decorators must return the class (or a replacement) + + +@register_modular_expression +class ModularFeature(ModularExpression): + + def __init__(self, name: str, normalize: bool = True): + super().__init__(name) + self.normalize = normalize + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + if self.normalize and self.name in ModularExpression.feature_list_normalized.keys(): + return ModularExpression.feature_list_normalized[self.name] + elif not self.normalize and self.name in ModularExpression.feature_list.keys(): + return ModularExpression.feature_list[self.name] + else: + x = InputSliceLayer([td.columns.index(self.name)])(input_layer) + if self.normalize: + l = keras.layers.Normalization() + l.adapt(td.X_train_single[:, td.columns.index(self.name)].reshape(-1, 1)) + x = l(x) + ModularExpression.feature_list_normalized[self.name] = x + else: + ModularExpression.feature_list[self.name] = x + + return x + + def _get_config(self) -> dict: + c = super()._get_config() + c.update({ + 'normalize': self.normalize, + }) + return c + + +@register_modular_expression +class ModularTrainable(ModularExpression): + + i = 0 + + def __init__(self, name: str = None, initial_value: float = None, trainable: bool = True): + if name is None: + name = f"ModularTrainable_{ModularTrainable.i}" + ModularTrainable.i += 1 + super().__init__(name) + self.initial_value = initial_value + self.trainable = trainable + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + if self.name in ModularExpression.trainable_parameters.keys(): + return ModularExpression.trainable_parameters[self.name] + else: + l = ConstantLayer(trainable=self.trainable, weight_name=self.name, value=self.initial_value)(input_layer) + ModularExpression.trainable_parameters[self.name] = l + return l + + def _get_config(self) -> dict: + c = super()._get_config() + c.update({ + 'initial_value': self.initial_value, + 'trainable': self.trainable, + }) + return c + + +class ModularTwo(ModularExpression, ABC): + + def __init__(self, feature1: Union[ModularExpression, int, float], feature2: Union[ModularExpression, int, float], name: str): + super().__init__(name) + self.feature1 = feature1 + self.feature2 = feature2 + + def construct(self, input_layer: keras.layers.Input, td: TrainingDataGeneric) -> keras.layers.Layer: + if isinstance(self.feature1, (int, float)): + l1 = ConstantLayer(value=self.feature1)(input_layer) + else: + l1 = self.feature1.construct(input_layer, td) 
+ + if isinstance(self.feature2, (int, float)): + l2 = ConstantLayer(value=self.feature2)(input_layer) + else: + l2 = self.feature2.construct(input_layer, td) + + return self._construct(l1, l2) + + @abstractmethod + def _construct(self, layer1: keras.layers.Layer, layer2: keras.layers.Layer) -> keras.layers.Layer: + pass + + def _get_config(self) -> dict: + c = super()._get_config() + if isinstance(self.feature1, ModularExpression): + f1n = self.feature1.name + else: + f1n = self.feature1 + if isinstance(self.feature2, ModularExpression): + f2n = self.feature2.name + else: + f2n = self.feature2 + c.update({ + 'feature1': f1n, + 'feature2': f2n, + }) + return c + + @classmethod + def _from_config(cls, item_config: dict, config: list[dict]) -> 'ModularTwo': + """ + Creates a ModularTwo instance (or its subclass) from a configuration dictionary. + Handles reconstruction of operand modular expressions if they were ModularExpression objects. + + Args: + item_config (dict): Configuration dictionary. Must contain 'feature1' and 'feature2'. + config (list[dict]): The list with the configuration dictionaries of all modular expressions + + Returns: + ModularTwo: An instance of the specific ModularTwo subclass. + """ + + # Reconstruct feature 1 + if isinstance(item_config['feature1'], dict): + feature_conf = item_config['feature1'] + # Check if modular expression already exists + f1n = ModularExpression.get_existing_modular_expression(feature_conf['name']) + if f1n is None: + f1n = modular_expression_from_config(feature_conf, config) + elif isinstance(item_config['feature1'], str): + f1n = ModularExpression.get_existing_modular_expression(item_config['feature1']) + else: # feature is int or float + f1n = item_config['feature1'] + item_config['feature1'] = f1n + + # Reconstruct feature 2 + if isinstance(item_config['feature2'], dict): + feature_conf = item_config['feature2'] + # Check if modular expression already exists + f2n = ModularExpression.get_existing_modular_expression(feature_conf['name']) + if f2n is None: + f2n = modular_expression_from_config(feature_conf, config) + elif isinstance(item_config['feature2'], str): + f2n = ModularExpression.get_existing_modular_expression(item_config['feature2']) + else: # feature is int or float + f2n = item_config['feature2'] + item_config['feature2'] = f2n + + return cls(**item_config) + + +@register_modular_expression +class ModularAdd(ModularTwo): + + def __init__(self, feature1: Union[ModularExpression, int, float], feature2: Union[ModularExpression, int, float], name: str = None): + if name is None: + name = f"({get_name(feature1)}+{get_name(feature2)})" + super().__init__(feature1, feature2, name) + + def _construct(self, layer1: keras.layers.Layer, layer2: keras.layers.Layer) -> keras.layers.Layer: + return keras.layers.Add()([layer1, layer2]) + + +@register_modular_expression +class ModularSub(ModularTwo): + + def __init__(self, feature1: Union[ModularExpression, int, float], feature2: Union[ModularExpression, int, float], name: str = None): + if name is None: + name = f"({get_name(feature1)}-{get_name(feature2)})" + super().__init__(feature1, feature2, name) + + def _construct(self, layer1: keras.layers.Layer, layer2: keras.layers.Layer) -> keras.layers.Layer: + return keras.layers.Subtract()([layer1, layer2]) + + +@register_modular_expression +class ModularMul(ModularTwo): + + def __init__(self, feature1: Union[ModularExpression, int, float], feature2: Union[ModularExpression, int, float], name: str = None): + if name is None: + name = 
f"({get_name(feature1)}*{get_name(feature2)})" + super().__init__(feature1, feature2, name) + + def _construct(self, layer1: keras.layers.Layer, layer2: keras.layers.Layer) -> keras.layers.Layer: + return keras.layers.Multiply()([layer1, layer2]) + + +@register_modular_expression +class ModularTrueDiv(ModularTwo): + + def __init__(self, feature1: Union[ModularExpression, int, float], feature2: Union[ModularExpression, int, float], name: str = None): + if name is None: + name = f"({get_name(feature1)}/{get_name(feature2)})" + super().__init__(feature1, feature2, name) + + def _construct(self, layer1: keras.layers.Layer, layer2: keras.layers.Layer) -> keras.layers.Layer: + return DivideLayer()([layer1, layer2]) + + +@register_modular_expression +class ModularPow(ModularTwo): + + def __init__(self, feature1: Union[ModularExpression, int, float], feature2: Union[ModularExpression, int, float], name: str = None): + if name is None: + name = f"({get_name(feature1)}**{get_name(feature2)})" + super().__init__(feature1, feature2, name) + + def _construct(self, layer1: keras.layers.Layer, layer2: keras.layers.Layer) -> keras.layers.Layer: + return PowerLayer()([layer1, layer2]) diff --git a/physXAI/preprocessing/constructed.py b/physXAI/preprocessing/constructed.py index cd7e429..fc08b31 100644 --- a/physXAI/preprocessing/constructed.py +++ b/physXAI/preprocessing/constructed.py @@ -122,6 +122,10 @@ def get_config(self) -> dict: def from_config(cls, config: dict) -> 'FeatureBase': return cls(**config) + def input(self, normalize: bool = True): + from physXAI.models.modular.modular_expression import ModularFeature + return ModularFeature(self.feature, normalize=normalize) + # --- Registry for Feature Classes --- # This registry maps class names (strings) to the actual class types (Type[FeatureBase]). @@ -166,7 +170,9 @@ class Feature(FeatureBase): Represents a basic feature that is assumed to exist directly in the input DataFrame. Its `process` method simply retrieves the column by its name. """ - pass + def __init__(self, name: str, **kwargs): + super().__init__(name, **kwargs) + FeatureConstruction.add_input(self.feature) @register_feature @@ -195,6 +201,7 @@ def __init__(self, f: Union[FeatureBase, str], lag: int, name: str = None, **kwa name = f.feature + f'_lag{lag}' super().__init__(name) self.lag: int = lag + FeatureConstruction.add_input(self.feature) def process(self, df: DataFrame) -> Series: if self.feature not in df.columns: @@ -525,11 +532,13 @@ class FeatureConstruction: """ features = list[FeatureBase]() + inputs = list[str]() @staticmethod def reset(): """Clears all registered features and input names.""" FeatureConstruction.features = list[FeatureBase]() + FeatureConstruction.inputs = list[str]() @staticmethod def append(f: FeatureBase): @@ -543,6 +552,17 @@ def append(f: FeatureBase): if FeatureConstruction.get_feature(f.feature) is None: FeatureConstruction.features.append(f) + @staticmethod + def add_input(name: str): + """ + Adds a feature name to the list of input features. + + Args: + name (str): The name of the input feature to add. 
+ """ + if name not in FeatureConstruction.inputs: + FeatureConstruction.inputs.append(name) + @staticmethod def get_feature(name: str) -> Union[FeatureBase, None]: """ diff --git a/physXAI/preprocessing/preprocessing.py b/physXAI/preprocessing/preprocessing.py index e32dd97..75bd2c7 100644 --- a/physXAI/preprocessing/preprocessing.py +++ b/physXAI/preprocessing/preprocessing.py @@ -152,8 +152,8 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], shift: int ignore_nan (bool): If True, rows with NaN values will be dropped. If False, an error is raised if NaNs are present. Default is False. """ - super().__init__(inputs, output, shift, time_step, test_size, val_size, random_state, time_index_col, - csv_delimiter, csv_encoding, csv_header, csv_skiprows, ignore_nan) + super().__init__(inputs=inputs, output=output, shift=shift, time_step=time_step, test_size=test_size, val_size=val_size, random_state=random_state, time_index_col=time_index_col, + csv_delimiter=csv_delimiter, csv_encoding=csv_encoding, csv_header=csv_header, csv_skiprows=csv_skiprows, ignore_nan=ignore_nan) def process_data(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]: """ @@ -301,8 +301,8 @@ def __init__(self, inputs: list[str], output: Union[str, list[str]], label_width If None and warmup_width > 0, defaults to `inputs`. If None and warmup_width <= 0, defaults to empty list. """ - super().__init__(inputs, output, shift, time_step, test_size, val_size, random_state, time_index_col, - csv_delimiter, csv_encoding, csv_header, csv_skiprows) + super().__init__(inputs=inputs, output=output, shift=shift, time_step=time_step, test_size=test_size, val_size=val_size, random_state=random_state, time_index_col=time_index_col, + csv_delimiter=csv_delimiter, csv_encoding=csv_encoding, csv_header=csv_header, csv_skiprows=csv_skiprows) self.overlapping_sequences = overlapping_sequences diff --git a/physXAI/utils/logging.py b/physXAI/utils/logging.py index 7f873d7..aee4cbe 100644 --- a/physXAI/utils/logging.py +++ b/physXAI/utils/logging.py @@ -109,6 +109,7 @@ class Logger: base_path = 'stored_data' save_name_model: str = 'model' save_name_model_online_learning: str = 'model_ol' + save_name_modular_expression_config: str = 'modular_expression_config.json' _logger = None _override = False @@ -156,7 +157,7 @@ def setup_logger(folder_name: str = None, override: bool = False, base_path: str @staticmethod def log_setup(preprocessing=None, model=None, save_name_preprocessing=None, save_name_model=None, - save_name_constructed=None): + save_name_constructed=None, save_name_modular_expression=None): if Logger._logger is None: Logger.setup_logger() @@ -198,6 +199,19 @@ def log_setup(preprocessing=None, model=None, save_name_preprocessing=None, save with open(path, "w") as f: json.dump(model_dict, f, indent=4) + from physXAI.models.modular.modular_expression import ModularExpression + modular_expression_config = ModularExpression.get_config() + if len(modular_expression_config) > 0: + if save_name_modular_expression is None: + save_name_modular_expression = Logger.save_name_modular_expression_config + path = os.path.join(Logger._logger, save_name_modular_expression) + path = create_full_path(path) + Logger.override_question(path) + with open(path, "w") as f: + json.dump(modular_expression_config, f, indent=4) + + ModularExpression.reset() + @staticmethod def save_training_data(training_data, path: str = None): if Logger._logger is None: @@ -232,10 +246,19 @@ def save_training_data(training_data, path: str = None): 
diff --git a/unittests/modular/test_modular.py b/unittests/modular/test_modular.py
new file mode 100644
index 0000000..536a467
--- /dev/null
+++ b/unittests/modular/test_modular.py
@@ -0,0 +1,150 @@
+import os
+# Suppress TensorFlow info/warning output; must be set before TensorFlow is first imported to take effect
+os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
+import numpy as np
+import pandas as pd
+import json
+import copy
+from pathlib import Path
+from physXAI.models.modular.modular_expression import ModularTrainable, ModularExpression
+from physXAI.models.ann.ann_design import ClassicalANNModel, CMNNModel
+from physXAI.models.modular.modular_ann import (ModularANN, ModularAverage, ModularLinear, ModularModel,
+                                                ModularExistingModel, ModularMonotoneLinear, ModularPolynomial,
+                                                ModularNormalization)
+from physXAI.utils.logging import Logger
+from physXAI.preprocessing.constructed import Feature
+from physXAI.models.models import AbstractModel
+from physXAI.preprocessing.preprocessing import PreprocessingSingleStep
+
+base_path = os.path.join(Path(__file__).resolve().parent.parent.parent, 'stored_data')
+
+
+def test_generate_sample_csv(output_path: str = "data/sample_data.csv", num_rows: int = 1200, num_features: int = 4, seed: int = 42, value_range: tuple = (-100, 100)):
+    np.random.seed(seed)
+
+    columns = [f"x{i}" for i in range(1, num_features + 1)]
+
+    data = {}
+    for col in columns:
+        data[col] = np.random.uniform(value_range[0], value_range[1], num_rows)
+
+    data_with_index = {"": range(num_rows)}
+    data_with_index.update(data)
+
+    df = pd.DataFrame(data_with_index)
+
+    # Ensure output directory exists
+    output_file = Path(output_path)
+    output_file.parent.mkdir(parents=True, exist_ok=True)
+
+    df.to_csv(output_path, sep=";", index=False)
+
+    print(f"Sample CSV file generated at: {output_path}")
+
+
+def test_generate_sample_model(random_seed: int = 42, training_data_path: str = "data/sample_data.csv"):
+    Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_modular', override=True)
+
+    inputs = [f"x{i}" for i in range(1, 4)]
+    output = "x4"
+
+    features = list()
+    for inp in inputs:
+        features.append(Feature(inp))
+
+    prep = PreprocessingSingleStep(inputs=inputs, output=output, random_state=random_seed)
+    td = prep.pipeline(training_data_path)
+
+    # TODO: Flatten, BatchNorm, Cropping1D, Reshape, RBF
+
+    m1 = ModularModel(ClassicalANNModel(random_seed=random_seed), inputs=features)
+    m2 = ModularTrainable(initial_value=0.5)
+    mX = ModularTrainable(initial_value=5)
+    mY = ModularTrainable(initial_value=0.5)
+    m3 = mX + mY
+    m4 = mX - mY
+    m5 = mX * mY
+    m6 = mX / mY
+    m7 = mX ** mY
+    m8 = ModularAverage([mX, mY])
+
+    # Existing model
+    cmnn = CMNNModel(monotonies={'x1': 1, 'x2': -1, 'x3': 0}, activation_split=[1, 1, 1], epochs=50)
+    cmnn_model = cmnn.pipeline(td, save_model=False, plot=False)
+    me = ModularExistingModel(model=cmnn_model, original_inputs=features, trainable=False)
+
+    mml = ModularMonotoneLinear(inputs=[m3, m4], monotonicities={m3.name: 1, m4.name: -1})
+    mp = ModularPolynomial(inputs=[m5, m7, m8], degree=3)
+    mn = ModularNormalization(input=m2)
+
+    out = ModularLinear([
+        m1,
+        m6,
+        me,
+        mml,
+        mp,
+        mn,
+    ])
+
+    m = ModularANN(architecture=out, epochs=50, random_seed=random_seed)
+    model = m.pipeline(td, plot=True, save_model=True)
+
+    Logger.log_setup(preprocessing=prep, model=m)
+
+
+def test_read_setup(training_data_path: str = "data/sample_data.csv"):
+    Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_modular', override=True)
+
+    # Read setup
+    save_name_preprocessing = Logger.save_name_preprocessing
+    path = os.path.join(Logger._logger, save_name_preprocessing)
+    with open(path, "r") as f:
+        config_prep = json.load(f)
+    prep = PreprocessingSingleStep.from_config(config_prep)
+
+    save_name_modular_expression = Logger.save_name_modular_expression_config
+    path = os.path.join(Logger._logger, save_name_modular_expression)
+    with open(path, "r") as f:
+        modular_expression_config = json.load(f)
+    stored_config = copy.deepcopy(modular_expression_config)
+    ModularExpression.from_config(modular_expression_config)
+    assert check_lists_equal(stored_config, ModularExpression.get_config())
+
+    save_name_model = Logger.save_name_model_config
+    path = os.path.join(Logger._logger, save_name_model)
+    with open(path, "r") as f:
+        config_model = json.load(f)
+    stored_config = copy.deepcopy(config_model)
+    m = AbstractModel.model_from_config(config_model)
+    assert check_lists_equal(stored_config, m.get_config())
+
+    td = prep.pipeline(training_data_path)
+    model = m.pipeline(td, plot=True, save_model=True)
+
+
+def check_lists_equal(list1, list2):
+    """Check if all elements in list1 exist and are equal to those in list2."""
+
+    def make_hashable(d):
+        """Convert dictionary values to hashable types."""
+        if isinstance(d, dict):
+            return frozenset((k, make_hashable(v)) for k, v in d.items())
+        elif isinstance(d, list):
+            return tuple(make_hashable(i) for i in d)
+        elif hasattr(d, '__dict__'):  # Check if it's an object with attributes
+            return frozenset((key, make_hashable(value)) for key, value in d.__dict__.items())
+        else:
+            return d  # Return as is if it's already hashable
+
+    set1 = {make_hashable(d) for d in list1}
+    set2 = {make_hashable(d) for d in list2}
+
+    return set1 == set2
+
+
+if __name__ == "__main__":
+    test_generate_sample_csv()
+    test_generate_sample_model()
+    test_read_setup()
\ No newline at end of file
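A note on check_lists_equal: JSON round-trips do not guarantee key or list order, so the configs are compared as sets of hashable snapshots (nested dicts become frozensets, lists become tuples) rather than element by element. A quick illustration with made-up config entries:

    a = [{"cls": "ModularPow", "args": {"degree": 3}}]
    b = [{"args": {"degree": 3}, "cls": "ModularPow"}]
    assert check_lists_equal(a, b)  # key order is irrelevant once hashable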
diff --git a/unittests/test_coverage.py b/unittests/test_coverage.py
index e17b448..98da362 100644
--- a/unittests/test_coverage.py
+++ b/unittests/test_coverage.py
@@ -75,7 +75,7 @@ def test_preprocessing(monkeypatch, file_path, inputs_php, output_php):
     FeatureConstant(1, 'name')
 
     # Create & process Training data
-    prep = PreprocessingSingleStep(inputs_php, output_php)
+    prep = PreprocessingSingleStep(inputs=inputs_php, output=output_php)
     prep.pipeline(file_path)
 
 
 def test_preprocessing_multistep(file_path, inputs_tair, output_tair):
@@ -89,7 +89,7 @@
     x3.lag(2)  # oveHeaPumY_u_lag1, oveHeaPumY_u_lag2
 
     # EvaluateMultiStep: Prepare Preprocessing
-    prep = PreprocessingMultiStep(inputs_tair, output_tair, 6, 6, init_features=['reaTZon_y'],
+    prep = PreprocessingMultiStep(inputs=inputs_tair, output=output_tair, label_width=6, warmup_width=6, init_features=['reaTZon_y'],
                                   overlapping_sequences=False, batch_size=1)
     prep.pipeline(file_path)
 
@@ -98,7 +98,7 @@ def p_hp_data(file_path, inputs_php, output_php):
     # Setup up logger for saving
     Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
 
     # Create & process Training data
-    prep = PreprocessingSingleStep(inputs_php, output_php)
+    prep = PreprocessingSingleStep(inputs=inputs_php, output=output_php)
     td = prep.pipeline(file_path)
     return prep, td
 
@@ -111,7 +111,7 @@ def tair_data_delta(file_path, inputs_tair, output_tair):
     x2.lag(1)  # weaSta_reaWeaTDryBul_y_lag1
     x3 = Feature('oveHeaPumY_u')
     x3.lag(2)  # oveHeaPumY_u_lag1, oveHeaPumY_u_lag2
-    prep = PreprocessingMultiStep(inputs_tair, output_tair, 3, 0, init_features=['reaTZon_y'],
+    prep = PreprocessingMultiStep(inputs=inputs_tair, output=output_tair, label_width=3, warmup_width=0, init_features=['reaTZon_y'],
                                   overlapping_sequences=False, batch_size=1)
     td = prep.pipeline(file_path)
     return prep, td
@@ -125,7 +125,7 @@
     x2.lag(1)  # weaSta_reaWeaTDryBul_y_lag1
     x3 = Feature('oveHeaPumY_u')
     x3.lag(2)  # oveHeaPumY_u_lag1, oveHeaPumY_u_lag2
-    prep = PreprocessingMultiStep(inputs_tair, output_tair, 3, 0, init_features=['reaTZon_y'],
+    prep = PreprocessingMultiStep(inputs=inputs_tair, output=output_tair, label_width=3, warmup_width=0, init_features=['reaTZon_y'],
                                   overlapping_sequences=False, batch_size=1, val_size=0)
     td = prep.pipeline(file_path)
     return prep, td
@@ -140,7 +140,7 @@
     x2.lag(1)  # weaSta_reaWeaTDryBul_y_lag1
     x3 = Feature('oveHeaPumY_u')
     x3.lag(2)  # oveHeaPumY_u_lag1, oveHeaPumY_u_lag2
-    prep = PreprocessingMultiStep(inputs_tair, 'reaTZon_y', 3, 0, init_features=['reaTZon_y'],
+    prep = PreprocessingMultiStep(inputs=inputs_tair, output='reaTZon_y', label_width=3, warmup_width=0, init_features=['reaTZon_y'],
                                   overlapping_sequences=False, batch_size=1)
     td = prep.pipeline(file_path)
     return prep, td
@@ -150,7 +150,7 @@ def test_model_linReg(inputs_php, output_php, file_path):
     Logger.setup_logger(base_path=base_path, folder_name='unittests\\test_coverage', override=True)
 
     # Create & process Training data
-    prep = PreprocessingSingleStep(inputs_php, output_php, val_size=0)
+    prep = PreprocessingSingleStep(inputs=inputs_php, output=output_php, val_size=0)
     td = prep.pipeline(file_path)
 
     # Check Models
@@ -257,12 +257,12 @@ def test_model_pinn(inputs_php, output_php, file_path):
     pinn.rename('pinn')
 
     # PINN: Preprocessing
-    prep = PreprocessingSingleStep(inputs_php, output_php)
+    prep = PreprocessingSingleStep(inputs=inputs_php, output=output_php)
     td = prep.pipeline(file_path)
     m = PINNModel(pinn_weights=[1], epochs=1, n_neurons=4)
     m.pipeline(td, save_model=False, plot=False)
 
-    prep = PreprocessingSingleStep(inputs_php, output_php, val_size=0)
+    prep = PreprocessingSingleStep(inputs=inputs_php, output=output_php, val_size=0)
     td = prep.pipeline(file_path)
     m = PINNModel(pinn_weights=None, epochs=1, n_neurons=4)
     m.pipeline(td, save_model=True, plot=False)
@@ -281,7 +281,7 @@ def test_models_rnn(file_path):
     inputs = ['weaSta_reaWeaTDryBul_y', 'weaSta_reaWeaHDirNor_y', 'oveHeaPumY_u']
     inits = ['reaTZon_y']
     output = 'reaTZon_y'
-    prep = PreprocessingMultiStep(inputs, output, 4, 2, init_features=inits)
+    prep = PreprocessingMultiStep(inputs=inputs, output=output, label_width=4, warmup_width=2, init_features=inits)
     td = prep.pipeline(file_path)
 
     m = RNNModel(epochs=1, rnn_layer='LSTM', init_layer='dense')
@@ -302,7 +302,7 @@ def test_models_rnn(file_path):
     m = RNNModel(epochs=1, rnn_layer='RNN')
     m.pipeline(td, save_model=True, plot=False)
 
-    prep = PreprocessingMultiStep(inputs, output, 4, 0, val_size=0)
+    prep = PreprocessingMultiStep(inputs=inputs, output=output, label_width=4, warmup_width=0, val_size=0)
     td = prep.pipeline(file_path)
     m = RNNModel(epochs=1, rnn_layer='LSTM', early_stopping_epochs=None)
     m.pipeline(td, save_model=False, plot=False)
diff --git a/unittests/verify_installation.py b/unittests/verify_installation.py
index 93c768b..6dbddd9 100644
--- a/unittests/verify_installation.py
+++ b/unittests/verify_installation.py
@@ -21,7 +21,7 @@
 output = 'reaPHeaPum_y'
 
 # Create Training data
-prep = PreprocessingSingleStep(inputs, output)
+prep = PreprocessingSingleStep(inputs=inputs, output=output)
 
 # Process Training data
 td = prep.pipeline(file_path)
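The recurring migration in these tests: PreprocessingMultiStep's positional width arguments become explicit label_width/warmup_width keywords, so horizon and warm-up can no longer be silently swapped. For downstream scripts, the call now reads like this (values taken from test_preprocessing_multistep above; inputs_tair/output_tair are the test fixtures):

    from physXAI.preprocessing.preprocessing import PreprocessingMultiStep

    prep = PreprocessingMultiStep(
        inputs=inputs_tair,            # exogenous input columns
        output=output_tair,            # predicted state
        label_width=6,                 # prediction horizon in steps
        warmup_width=6,                # warm-up steps before the horizon
        init_features=['reaTZon_y'],
        overlapping_sequences=False,
        batch_size=1,
    )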