diff --git a/avaframe/ana4Stats/probAna.py b/avaframe/ana4Stats/probAna.py index 18afc1dee..2108fb7d3 100644 --- a/avaframe/ana4Stats/probAna.py +++ b/avaframe/ana4Stats/probAna.py @@ -24,7 +24,6 @@ from avaframe.out3Plot import statsPlots as sP from avaframe.in1Data import getInput as gI - # create local logger # change log level in calling module to DEBUG to see log messages log = logging.getLogger(__name__) @@ -203,17 +202,17 @@ def updateCfgRange(cfg, cfgProb, varName, varDict): if valVariation == "": valVariation = "-" parValue = ( - variationType - + "$" - + valSteps - + "$" - + valVariation - + "$" - + cfgDist["GENERAL"]["minMaxInterval"] - + "$" - + cfgDist["GENERAL"]["buildType"] - + "$" - + cfgDist["GENERAL"]["support"] + variationType + + "$" + + valSteps + + "$" + + valVariation + + "$" + + cfgDist["GENERAL"]["minMaxInterval"] + + "$" + + cfgDist["GENERAL"]["buildType"] + + "$" + + cfgDist["GENERAL"]["support"] ) # if variation using percent elif variationType.lower() == "percent": @@ -225,9 +224,9 @@ def updateCfgRange(cfg, cfgProb, varName, varDict): parValue = valVariation + "$" + valSteps if "ci" in valVariation: message = ( - "Variation Type: range - variationValue is %s not a valid option - only \ - scalar value allowed or consider variationType rangefromci" - % valVariation + "Variation Type: range - variationValue is %s not a valid option - only \ + scalar value allowed or consider variationType rangefromci" + % valVariation ) log.error(message) raise AssertionError(message) @@ -236,9 +235,9 @@ def updateCfgRange(cfg, cfgProb, varName, varDict): parValue = valVariation + "$" + valSteps else: message = ( - "Variation Type: %s - not a valid option, options are: percent, range, \ - normaldistribution, rangefromci" - % variationType + "Variation Type: %s - not a valid option, options are: percent, range, \ + normaldistribution, rangefromci" + % variationType ) log.error(message) raise AssertionError(message) @@ -272,9 +271,9 @@ def 
updateCfgRange(cfg, cfgProb, varName, varDict): valValues = np.linspace(float(valStart), float(valStop), int(valSteps)) else: message = ( - "Variation Type: %s - not a valid option, options are: percent, range, \ - normaldistribution, rangefromci" - % variationType + "Variation Type: %s - not a valid option, options are: percent, range, \ + normaldistribution, rangefromci" + % variationType ) log.error(message) raise AssertionError(message) @@ -616,8 +615,8 @@ def makeDictFromVars(cfg): if (len(varParList) == len(varValues) == len(cfg[lengthsPar].split("|")) == len(varTypes)) is False: message = ( - "For every parameter in varParList a variationValue, %s and variationType needs to be provided" - % lengthsPar + "For every parameter in varParList a variationValue, %s and variationType needs to be provided" + % lengthsPar ) log.error(message) raise AssertionError(message) @@ -801,13 +800,14 @@ def createSampleWithVariationStandardParameters(cfgProb, cfgStart, varParList, v "values": sampleWBounds, "typeList": cfgProb["PROBRUN"]["varParType"].split("|"), "thFromIni": "", + "bounds": np.column_stack((lowerBounds, upperBounds)).tolist() } return paramValuesD def createSampleWithVariationForThParameters( - avaDir, cfgProb, cfgStart, varParList, valVariationValue, varType, thReadFromShp + avaDir, cfgProb, cfgStart, varParList, valVariationValue, varType, thReadFromShp ): """Create a sample of parameters for a desired parameter variation, and fetch thickness values from shp file and perform variation for each feature within @@ -907,23 +907,23 @@ def createSampleWithVariationForThParameters( # set lower and upper bounds depending on varType (percent, range, rangefromci) lowerBounds[fullVarType == "percent"] = varValList[fullVarType == "percent"] - varValList[ fullVarType == "percent" - ] * (fullValVar[fullVarType == "percent"] / 100.0) + ] * (fullValVar[fullVarType == "percent"] / 100.0) upperBounds[fullVarType == "percent"] = varValList[fullVarType == "percent"] + 
varValList[ fullVarType == "percent" - ] * (fullValVar[fullVarType == "percent"] / 100.0) + ] * (fullValVar[fullVarType == "percent"] / 100.0) lowerBounds[fullVarType == "range"] = ( - varValList[fullVarType == "range"] - fullValVar[fullVarType == "range"] + varValList[fullVarType == "range"] - fullValVar[fullVarType == "range"] ) upperBounds[fullVarType == "range"] = ( - varValList[fullVarType == "range"] + fullValVar[fullVarType == "range"] + varValList[fullVarType == "range"] + fullValVar[fullVarType == "range"] ) lowerBounds[fullVarType == "rangefromci"] = ( - varValList[fullVarType == "rangefromci"] - ciValues[fullVarType == "rangefromci"] + varValList[fullVarType == "rangefromci"] - ciValues[fullVarType == "rangefromci"] ) upperBounds[fullVarType == "rangefromci"] = ( - varValList[fullVarType == "rangefromci"] + ciValues[fullVarType == "rangefromci"] + varValList[fullVarType == "rangefromci"] + ciValues[fullVarType == "rangefromci"] ) # create a sample of parameter values using scipy latin hypercube or morris sampling @@ -1102,9 +1102,10 @@ def createCfgFiles(paramValuesDList, comMod, cfg, cfgPath=""): cfgStart[section][par] = str(pVal[index]) else: cfgStart["GENERAL"][par] = str(pVal[index]) - if modName.lower() in ["com1dfa", "com5snowslide", "com6rockavalanche"]: + if modName.lower() in ["com1dfa", "com5snowslide", "com6rockavalanche", 'com8motpsa']: cfgStart["VISUALISATION"]["scenario"] = str(count1) cfgStart["INPUT"]["thFromIni"] = paramValuesD["thFromIni"] + cfgStart["VISUALISATION"]["sampleMethod"] = cfg['PROBRUN']['sampleMethod'] if "releaseScenario" in paramValuesD.keys(): cfgStart["INPUT"]["releaseScenario"] = paramValuesD["releaseScenario"] cfgF = pathlib.Path(cfgPath, ("%d_%sCfg.ini" % (countS, modName))) diff --git a/avaframe/ana4Stats/probAnaCfg.ini b/avaframe/ana4Stats/probAnaCfg.ini index 1859b48b4..bdddd6e8a 100644 --- a/avaframe/ana4Stats/probAnaCfg.ini +++ b/avaframe/ana4Stats/probAnaCfg.ini @@ -40,7 +40,7 @@ samplingStrategy = 1 # 
#++++++VARIATION INFO FOR DRAW SAMPLES FROM FULL SET OF VARIATIONS # type of parameters that shall be varied -separated by | (options: float) varParType = float|float -# factor used to create the number of samples, if morris number of samples depends on number of varied variables and number of trajectories, for now use nSample as number of trajectories +# factor used to create the number of samples, if morris: number of samples depends on number of varied variables and number of trajectories, for now use nSample as number of trajectories, n >=2 for morris nSample = 40 # sample method used to create sample (options: latin, morris) sampleMethod = latin @@ -71,13 +71,13 @@ frictModel = samosAT [com4FlowPy_com4FlowPy_override] -# use default com1DFA config as base configuration (True) and override following parameters +# use default com4FlowPy config as base configuration (True) and override following parameters # if False and local is available use local defaultConfig = True [com8MoTPSA_com8MoTPSA_override] -# use default com1DFA config as base configuration (True) and override following parameters +# use default com8MoTPSA config as base configuration (True) and override following parameters # if False and local is available use local defaultConfig = True diff --git a/avaframe/ana6Optimisation/README_ana6.md b/avaframe/ana6Optimisation/README_ana6.md new file mode 100644 index 000000000..bca9db16c --- /dev/null +++ b/avaframe/ana6Optimisation/README_ana6.md @@ -0,0 +1,140 @@ +# ana6 – Sensitivity Analysis & Optimisation + +The `ana6Optimisation` module provides tools for performing Morris sensitivity analysis and parameter optimisation within the AvaFrame workflow. It supports input parameter ranking, convergence analysis of sensitivity indices and surrogate-based optimisation strategies. The module can be used either sequentially (Morris analysis followed by optimisation) or independently for direct optimisation. 
+ +--- + +## Module Structure + +The module contains the following files: + +- `runMorrisSA.py` (configuration: `runMorrisSACfg.ini`) +- `runPlotMorrisConvergence.py` (uses `runMorrisSACfg.ini`) +- `runOptimisation.py` (configuration: `runOptimisationCfg.ini`) +- `optimisationUtils.py` + +--- + +## Workflow +### Reference Data and Working Directory + +All scripts must be executed within the directory: `avaframe/ana6Optimisation` + +In `avaframeCfg.ini`, the avalanche reference directory (`avalancheDir`) must include the prefix `../`, for example: `../data/avaFleisskar` + +This ensures correct relative path resolution within the AvaFrame project structure. + +To compute goodness-of-fit metrics between reference and simulation results and to perform AIMEC analysis, the following reference data must be provided in: `avaframe/data/<avalancheName>/Inputs` + +The required folder structure is: +Folder: +- **LINES** + Contains the AIMEC path. + +- **POLYGONS** + Contains the crop shape and defines the maximal extent of the runout area that is used for calculating areal indicators. + +- **REFDATA** + Defines the runout area of the reference event. + +- **REL** + Defines the release area of the avalanche event. + +File: +- **Digital Elevation Model (DEM)** + Must be placed directly in the `Inputs` directory and must cover the entire affected area. + +More Details here: https://docs.avaframe.org/en/latest/moduleCom1DFA.html + +___ + +### Morris Sensitivity Analysis (MorrisSA) + +The Morris sensitivity analysis provides a ranking of input parameters based on their influence on the model response.
+ +Before running `runMorrisSA.py`, the following step is required: + +- Execute `runAna4ProbAnaCom8MoTPSA` +- In `probAnaCfg.ini`: + - Set the sampling method to `'morris'` + - Define the number of Morris trajectories (`nSample`) + - Select the input parameters and define their variation bounds + +This step generates the required simulations and stores the sampled parameters and their bounds in a pickle file. + +**Afterwards:** + +- Run `runMorrisSA.py` + - Configure settings via `runMorrisSACfg.ini` + - The `MORRIS_CONVERGENCE` setting can be ignored for standard sensitivity analysis + +**Outputs:** + +- Pickle file containing: + - Ranked input parameters + - Morris sensitivity indices + - Parameter bounds +- Visualisation plots of the sensitivity results + +--- + +### Morris Convergence Analysis + +The convergence analysis evaluates how the Morris sensitivity indices stabilise with increasing numbers of trajectories. Its purpose is to determine the minimum number of trajectories that yields robust results. + +**Requirements:** + +- Run `runAna4ProbAnaCom8MoTPSA` multiple times with different numbers of Morris trajectories +- Rename Output folders afterwards with the following naming convention: OutputsR<n> + + +where `<n>` corresponds to the number of trajectories + +This process is computationally expensive, as it requires a large number of simulations. + +**Execution:** + +- Run `runPlotMorrisConvergence.py` + +**Output:** + +- Convergence plots of Morris sensitivity indices + +--- + +### Optimisation + +The optimisation process identifies the set of input parameters that yields the best agreement between simulation results and a defined reference. "Best" is defined by the objective function implemented in the optimisation routine.
+ +Optimisation can be performed in two ways: + +**With prior Morris analysis:** + - Parameter ranking is available + - Parameter bounds are already defined + - Execute `runOptimisation.py` with scenario 1 in `runOptimisationCfg.ini` + +**Without prior Morris analysis:** + - Execute `runAna4ProbAnaCom8MoTPSA.py` to generate some initial samples (for surrogate) + - In `probAnaCfg.ini`: + - Set the sampling method to `'latin'` + - Define the number of model runs (`nSample`) + - Select the input parameters and define their variation bounds + - Execute `runOptimisation.py` with scenario 2 in `runOptimisationCfg.ini` + +**Two optimisation strategies are implemented:** + +- Surrogate-based non-sequential optimisation +- Surrogate-based Bayesian (sequential) optimisation + +**Outputs:** + +- Optimal parameter set +- Visualisation plots of the optimisation results and progress + +--- + +## Notes + +- Performing Morris sensitivity analysis before optimisation is recommended to reduce the parameter space. +- Convergence analysis significantly increases computational cost. +- All workflows are controlled via `.ini` configuration files.
\ No newline at end of file diff --git a/avaframe/ana6Optimisation/optimisationUtils.py b/avaframe/ana6Optimisation/optimisationUtils.py new file mode 100644 index 000000000..c7bcec60e --- /dev/null +++ b/avaframe/ana6Optimisation/optimisationUtils.py @@ -0,0 +1,892 @@ +import numpy as np +import pathlib +import pickle +import pandas as pd +import configparser +import os +import time +from scipy.stats import norm, qmc +from datetime import datetime +import re + +from sklearn.model_selection import KFold, cross_validate +from sklearn.preprocessing import StandardScaler +from sklearn.pipeline import Pipeline +from sklearn.gaussian_process import GaussianProcessRegressor +from sklearn.gaussian_process.kernels import Matern, WhiteKernel, ConstantKernel + +from avaframe.in3Utils import cfgUtils, initializeProject, logUtils +from avaframe.com8MoTPSA import com8MoTPSA +from avaframe.ana4Stats import probAna +from avaframe.runScripts.runPlotAreaRefDiffs import runPlotAreaRefDiffs +import avaframe.out3Plot.outAna6Plots as saveResults + + +def calcArealIndicatorsAndAimec(cfgOpt, avalancheDir, ana3AIMEC): + """ + Calculate areal indicators between reference polygon and simulation and AIMEC analysis and save data in ana3AIMEC and out1Peak. + + Parameters + ---------- + cfgOpt : configparser.ConfigParser + Global configuration object. Must contain section "GENERAL" + with keys: + - resType + - thresholdValueSimulation + - modName + avalancheDir : str + Directory containing the directory of the reference avalanche + ana3AIMEC : module + AIMEC analysis module providing `fullAimecAnalysis()`. + + """ + # ToDo: to reduce comp. 
cost: run AIMEC and calcArealIndicators only for new simulations + # Areal indicators + resType = cfgOpt["GENERAL"]["resType"] + thresholdValueSimulation = float(cfgOpt["GENERAL"]["thresholdValueSimulation"]) + modName = cfgOpt["GENERAL"]["modName"] + runPlotAreaRefDiffs(resType, thresholdValueSimulation, modName) + + # AIMEC + cfgAIMEC = cfgUtils.getModuleConfig(ana3AIMEC) + rasterTransfo, resAnalysisDF, plotDict, _, pathDict = ana3AIMEC.fullAimecAnalysis(avalancheDir, cfgAIMEC) + + +def readParamSetDF(inDir, varParList): + """ + Read parameter sets from .ini files in a directory and build a DataFrame. + + Parameters + ---------- + inDir : str or pathlib.Path + Path to directory containing .ini files. + varParList : list of str + List of parameter names to extract values from each .ini file. + + Returns + ------- + paramSetDF : pandas.DataFrame + DataFrame with simName, parameterSet and order as columns + """ + + # List to hold all parameters sets + paramSet = [] + order = [] + sampleMethods = [] + filenames = [] + + # Loop over all files in the folder + for filename in os.listdir(inDir): + if filename.endswith('.ini') and 'sourceConfiguration' not in filename: + filepath = os.path.join(inDir, filename) + + # Load the .ini file + config = configparser.ConfigParser() + config.read(filepath) + + if 'VISUALISATION' in config.sections(): + # config is inifile + index = config['VISUALISATION']['scenario'] + + if 'VISUALISATION' in config.sections(): + # config is inifile + index = config['VISUALISATION']['scenario'] + if 'sampleMethod' in config['VISUALISATION']: + sampleMethod = config['VISUALISATION']['sampleMethod'] + else: + sampleMethod = np.nan + + row = [] # row contains 1 row + for param in varParList: + section = probAna.fetchParameterSection(config, param) + value = config[section][param] + value = float(value) + row.append(value) + + order.append(index) + sampleMethods.append(sampleMethod) + paramSet.append(row) # rows contains all rows + 
filenames.append(os.path.splitext(filename)[0]) + + # Convert to pandas DF + paramSetDF = pd.DataFrame({ + 'simName': filenames, + 'parameterSet': paramSet, # [row for row in paramSet], # Wrap each row as a list + 'order': pd.to_numeric(order), # convert to int + 'sampleMethods': sampleMethods + }) + return paramSetDF + + +def readArealIndicators(inDir): + """ + Read areal indicator results from a pickle file and convert to a DataFrame. + + Parameters + ---------- + inDir : str or pathlib.Path + Path to pickle file containing indicator results. + + Returns + ------- + indicatorsDF : pandas.DataFrame + DataFrame with simName and areal indicators, + """ + with open(inDir, "rb") as f: + all_results = pickle.load(f) + + indicatorsDF = pd.DataFrame(all_results) + return indicatorsDF + + +def addLossMetrics(df, referenceDF, cfgOpt): + """ + Compute evaluation metrics (recall, precision, F1, Tversky score) and a combined weighted Loss (or optimisation) + variable from a given DataFrame. + + The metrics are based on area (number of pixel would also be possible). Invalid values + (division by zero) are replaced with 0. + + Parameters + ---------- + df : pandas.DataFrame + Input DataFrame with at least the columns + ``TP_SimRef_area``, ``FP_SimRef_area``, ``FN_SimRef_area``. + referenceDF: pandas.Dataframe + Dataframe with information of the reference in AIMEC e.g. 
reference_sRunout of the polygon + cfgOpt: configparser.ConfigParser + Config parser of the runOptimisationCfg.ini file + + Returns + ------- + df : pandas.DataFrame + Same DataFrame with additional columns: + - ``recall`` : float + - ``precision`` : float + - ``f1_score`` : float + - ``tversky_score`` : float + - ``optimisationVariable`` : float + """ + # Decide if loss function is based on ncells or area + basedOn = '_area' + TP = df[f"TP_SimRef{basedOn}"] + FP = df[f"FP_SimRef{basedOn}"] + FN = df[f"FN_SimRef{basedOn}"] + + # Recall = TP / (TP + FN) + denomRecall = TP + FN + df["recall"] = np.where(denomRecall != 0, TP / denomRecall, 0.0) + # Precision = TP / (TP + FP) + denomPrecision = TP + FP + df["precision"] = np.where(denomPrecision != 0, TP / denomPrecision, 0.0) + # F1 Score + denomF1 = df['precision'] + df['recall'] + df["f1_score"] = np.where(denomF1 != 0, 2.0 * df['precision'] * df['recall'] / denomF1, 0.0) + + # Tversky score = TP / (TP + alpha * FP + beta * FN), gives penalty to overshoot --> alpha + alpha = float(cfgOpt['LOSS_PARAMETERS']['tverskyAlpha']) + beta = float(cfgOpt['LOSS_PARAMETERS']['tverskyBeta']) + + denomTversky = TP + alpha * FP + beta * FN + df["tversky_score"] = np.where(denomTversky != 0, TP / denomTversky, 0.0) + # Subtract 1 to ensure that 0 are good values and 1 bad + df['1-tversky'] = 1 - df["tversky_score"] + + # Runout + # RMSE divided with Runout length of Reference + sRunoutRef = referenceDF['reference_sRunout'].values + runoutRMSE = df['runoutLineDiff_poly_RMSE'] + df['runoutRMSENormalised'] = runoutRMSE / sRunoutRef # 0 is good, 1 is bad + + # Weights + wTversky = float(cfgOpt['LOSS_PARAMETERS']['weightTversky']) + wRunout = float(cfgOpt['LOSS_PARAMETERS']['weightRunout']) + + df['optimisationVariable'] = ( + wRunout * df['runoutRMSENormalised'].fillna(1) + wTversky * df['1-tversky'].fillna(1)) + return df + + +def buildFinalDF(avalancheDir, varParList, cfgOpt): + """ + Build the final merged DataFrame for a given 
avalanche. + + Combines parameter sets, AIMEC results, and areal indicators into one DataFrame, + then computes evaluation metrics 'via addLossMetrics'. + + Parameters + ---------- + avalancheDir : str + Path of avalanche directory + varParList: list of str + List of parameter names that are varied + cfgOpt: configparser.ConfigParser + Config parser of the runOptimisationCfg.ini file + + Returns + ------- + finalDF : pandas.DataFrame + Final DataFrame containing: + - ``simName`` + - ``parameterSet`` + - ``order`` + - Areal indicator columns + - Evaluation metrics (recall, precision, f1_score, tversky_score, optimisationVariable) + """ + + avaName = avalancheDir.split('/')[-1] + + # Folder where ini files from simulations are + inDir = pathlib.Path(avalancheDir, 'Outputs/com8MoTPSA/configurationFiles') + # Read parameterSetDF + paramSetDF = readParamSetDF(inDir, varParList) + + # Dataframe from AIMEC + df_aimec = pd.read_csv( + avalancheDir + '/Outputs/ana3AIMEC/com8MoTPSA/Results_' + avaName + '_ppr_lim_1_w_600resAnalysisDF.csv') + # Get data of the reference in AIMEC + referenceDF = pd.read_csv(f"{avalancheDir}/Outputs/ana3AIMEC/com8MoTPSA/referenceDF.csv") + + # Read areal indicators + arealIndicatorDir = pathlib.Path(avalancheDir, 'Outputs', 'out1Peak', 'arealIndicators.pkl') + indicatorsDF = readArealIndicators(arealIndicatorDir) + + # remove rows ending with '_L1' + indicatorsDF = indicatorsDF.loc[~indicatorsDF["simName"].str.endswith("_L1")].copy() + # remove trailing '_L2' for merging on simName + indicatorsDF["simName"] = (indicatorsDF["simName"].str.replace(r"_L2$", "", regex=True)) + + # Merge df's + df_merged = pd.merge(paramSetDF, df_aimec, on='simName', how='inner') + df_merged = df_merged.merge(indicatorsDF, on="simName", how="left") + + # Add optimisation variables + finalDF = addLossMetrics(df_merged, referenceDF, cfgOpt) + return finalDF + + +def createDFParameterLoss(df, paramSelected): + """ + Create DataFrames linking selected parameters with 
the loss function. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame that contains the selected parameters and their values as well as the loss function. + paramSelected : list of str + Subset of parameters to include in the output DataFrames. + + Returns + ------- + paramLossDF : pandas.DataFrame + DataFrame with one column per selected parameter and an additional + ``Loss`` column with the raw values of ``optimisationVariable``. + paramLossDFScaled : pandas.DataFrame + Same as ``paramLossDF`` but with the selected parameters normalised + to the range [0, 1] using min–max scaling. + """ + paramLossDF = df[paramSelected].copy() + paramLossDFScaled = (paramLossDF - paramLossDF.min()) / (paramLossDF.max() - paramLossDF.min()) # normalise + paramLossDF['Loss'] = df[ + 'optimisationVariable'] + paramLossDFScaled['Loss'] = df[ + 'optimisationVariable'] + return paramLossDF, paramLossDFScaled + + +def fitSurrogate(df, cfgOpt): + """ + Prepare data and initialize surrogate models for loss prediction. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame containing input parameters as columns and a + column named 'Loss' as target variable. + cfgOpt: configparser.ConfigParser + Config parser of the runOptimisationCfg.ini file. + + Returns + ------- + X : numpy.ndarray + Feature matrix of shape (n_samples, n_features). + y : numpy.ndarray + Target vector of shape (n_samples,). + gp_pipe : sklearn.pipeline.Pipeline + Pipeline consisting of feature standardization and a + Gaussian Process regressor with Matern kernel. 
+ """ + # Prepare X, y + y_col = 'Loss' + X = df.drop(columns=[y_col]).to_numpy(dtype=float) + y = df[y_col].to_numpy(dtype=float).reshape(-1) + n_features = X.shape[1] + + # GP kernel with Matern-Covariance + matern_nu = float(cfgOpt['OPTIMISATION']['matern_nu']) + kernel = ( + ConstantKernel(1.0, (1e-3, 1e3)) # Output variance tells how strong Y varies + * Matern(length_scale=np.ones(n_features), + length_scale_bounds=(1e-2, 1e2), # in z (variance) space + nu=matern_nu) + + WhiteKernel(noise_level=1e-4, noise_level_bounds=(1e-8, 1e-1)) + ) + gp = GaussianProcessRegressor( + kernel=kernel, + alpha=1e-8, + normalize_y=True, + n_restarts_optimizer=10, + random_state=0, + ) + + # Pipelines (feature scaling + model) + gp_pipe = Pipeline([("x_scaler", StandardScaler()), ("model", gp)]) + return X, y, gp_pipe + + +def KFoldCV(X, y, pipe, cfgOpt, outDir, avaName, pipeName): + """ + Perform k-fold cross-validation for a regression pipeline. + + Parameters + ---------- + X : numpy.ndarray + Feature matrix of shape (n_samples, n_features). + y : numpy.ndarray + Target vector of shape (n_samples,). + pipe : sklearn.pipeline.Pipeline + Regression pipeline to be evaluated. + cfgOpt: configparser.ConfigParser + Config parser of the runOptimisationCfg.ini file. + outDir : pathlib.Path + File path where the generated image will be saved. + avaName : str + Name of the avalanche. Used for naming the output figure. + pipeName : str + Name of the pipeline, used for formatted console output. + + Returns + ------- + scores : dict + Dictionary containing cross-validation results as returned + by sklearn.model_selection.cross_validate. 
+ """ + # Get optType + optType = cfgOpt['OPTIMISATION']['optType'] + # For losses, sklearn uses "neg_*" because higher-is-better internally + rmse_scorer = "neg_root_mean_squared_error" + mae_scorer = "neg_mean_absolute_error" + r2_scorer = "r2" + k = int(cfgOpt['OPTIMISATION']['k']) + cv = KFold(n_splits=k, shuffle=True, random_state=0) + + scores = cross_validate( + pipe, X, y, cv=cv, + scoring={"rmse": rmse_scorer, "mae": mae_scorer, "r2": r2_scorer}, + return_train_score=True, + error_score="raise" # fail fast if something else is wrong + ) + + # NOTE: rmse/mae were returned as NEGATIVE numbers because the higher is better internally + neg_cols = ["test_rmse", "test_mae", "train_rmse", "train_mae"] + for c in neg_cols: + scores[c] = -scores[c] + # get smoothness parameter nu + matern_nu = float(cfgOpt['OPTIMISATION']['matern_nu']) + row = { + "experiment_name": pipeName, + "n_samples": X.shape[0], + "kernel": f"Matern {matern_nu}", + "noise_model": "WhiteKernel", + } + + for split in ("test", "train"): + for m in ("rmse", "mae", "r2"): + arr = scores[f"{split}_{m}"] + row[f"{split} {m} mean"] = arr.mean() + row[f"{split} {m} std"] = arr.std() + + print(f"\n{pipeName}, {k}-fold CV:") + for split in ("test", "train"): + print(f" {split.capitalize()} metrics:") + for m in ("rmse", "mae", "r2"): + arr = scores[f"{split}_{m}"] + print(f" {m.upper():<4}: {arr.mean():.4g} ± {arr.std():.4g}") + + df = pd.DataFrame([row]) + # Include date, format: YYYYMMDD + date = datetime.now().strftime("%Y%m%d") + base = os.path.join(outDir, f"{avaName}_{k}FoldCV_{optType}") + df.to_csv(base + f'_{date}.csv', mode="a", header=not os.path.exists(base + f'_{date}.csv'), + index=False) # checks if header exists + + # Save results as image + saveResults.saveKFoldCVPrintImage(scores, pipeName, k, base + f'_Matern_{matern_nu}_Kernel_{date}.png') + return scores + + +def optimiseNonSeq(pipe, cfgOpt, paramBounds): + """ + Perform non-sequential surrogate-based optimization using + Latin 
Hypercube sampling. + + Parameters + ---------- + pipe : sklearn.pipeline.Pipeline + Trained surrogate model pipeline used to predict loss + and uncertainty. + cfgOpt : configparser.ConfigParser + Config parser of the runOptimisationCfg.ini file. + paramBounds : dict + Dictionary mapping parameter names to (min, max) bounds. + + Returns + ------- + topNStat : pandas.DataFrame + DataFrame containing statistics of the top N surrogate + candidates with the lowest predicted loss. + """ + paramSelected = list(paramBounds.keys()) + bounds = np.array(list(paramBounds.values()), dtype=float) # shape (d,2) + d = bounds.shape[0] + + # Create LH samples + seed = int(cfgOpt['OPTIMISATION']['seed']) + n_lhs = int(cfgOpt['OPTIMISATION']['n_lhs']) + sampler = qmc.LatinHypercube(d=d, seed=seed) + sample = sampler.random(n=n_lhs) + X0 = qmc.scale(sample, bounds[:, 0], bounds[:, 1]) + + # Prediction of the loss with GP-Model + mu, sigma = pipe.predict(X0, return_std=True) + # Convert X0 to pandas df for analyze function + df_candidates = pd.DataFrame(X0, columns=paramSelected) + n_top_samples = int(cfgOpt['OPTIMISATION']['n_surrogate_top']) + topNStat, _ = analyzeTopCandidates(df_candidates, mu, sigma, paramSelected, N=n_top_samples) + return topNStat + + +def analyzeTopCandidates(df_candidates, mu, sigma, param_cols, N=5): + """ + Analyze the top N surrogate candidates. + + Parameters + ---------- + df_candidates : pandas.DataFrame + Candidate parameter sets. + mu : numpy.ndarray + Predicted loss values. + sigma : numpy.ndarray + Predicted uncertainty values. + param_cols : list of str + Parameter column names. + N : int, optional + Number of best candidates to analyze. + + Returns + ------- + stats : dict + Summary statistics for the top N and best candidate. + topNData : pandas.DataFrame + Top N candidates with predicted mu and sigma. 
+ """ + + # Top N points + idx_topN = np.argsort(mu)[:N] + topNData = df_candidates.iloc[idx_topN].copy() + topNData["mu"] = mu[idx_topN] + topNData["sigma"] = sigma[idx_topN] + + mean_params = topNData[param_cols].mean() + std_params = topNData[param_cols].std() + mean_mu = topNData["mu"].mean() + std_mu = topNData["mu"].std() + mean_sigma = topNData["sigma"].mean() + std_sigma = topNData["sigma"].std() + + print(f"\n🔍 Mittelwerte ± Std (Top {N}):") + for p in param_cols: + m, s = mean_params[p], std_params[p] + perc = (s / m * 100) if m != 0 else np.nan + print(f" {p:30s}: {m:.6f} ± {s:.6f} ({perc:.1f}%)") + perc_mu = (std_mu / mean_mu * 100) if mean_mu != 0 else np.nan + perc_sigma = (std_sigma / mean_sigma * 100) if mean_sigma != 0 else np.nan + print(f"📉 mu: {mean_mu:.4f} ± {std_mu:.4f} ({perc_mu:.1f}%)") + print(f"📊 sigma: {mean_sigma:.4f} ± {std_sigma:.4f} ({perc_sigma:.1f}%)") + + # Best single point + idx_best = np.argmin(mu) + best_params = df_candidates.iloc[idx_best].copy() + best_loss = mu[idx_best] + best_sigma = sigma[idx_best] + + print("\n🔍 Best single Parameter combination:") + for p in param_cols: + print(f" {p:30s}: {best_params[p]:.4f}") + print(f"📉 mu: {best_loss:.4f}") + print(f"📊 sigma: {best_sigma:.4f}") + + return { + f"TopNBest": { + "mean_params": mean_params, + "std_params": std_params, + "mean_mu": mean_mu, + "std_mu": std_mu, + "mean_sigma": mean_sigma, + "std_sigma": std_sigma, + }, + "Best": { + "params": best_params, + "mu": best_loss, + "sigma": best_sigma, + } + }, topNData + + +def expectedImprovement(mu, sigma, f_best, xi=0): + """ + Compute the Expected Improvement (EI) acquisition function + for minimization problems. + Formula taken from: + https://ekamperi.github.io/machine%20learning/2021/06/11/acquisition-functions.html + + Parameters + ---------- + mu : numpy.ndarray + Predicted mean values of the surrogate model. + sigma : numpy.ndarray + Predicted standard deviations of the surrogate model. 
+ f_best : float + Best observed objective value so far. + xi : float, optional + Exploration parameter controlling exploitation–exploration + trade-off. + + Returns + ------- + ei : numpy.ndarray + Expected Improvement values for each candidate. + """ + sigma = np.maximum(sigma, 1e-12) # numeric safety + imp = f_best - mu - xi # minimization, that's why the sign is different, xi for finetunig exploitation + + Z = imp / sigma + ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z) + ei[sigma <= 1e-12] = 0.0 # set EI to zero where sigma is 1e-12 + return ei + + +def lowerConfidenceBound(mu, sigma, k=0.0): + """ + Compute the Lower Confidence Bound (LCB) acquisition function. + Formula taken from: + https://ekamperi.github.io/machine%20learning/2021/06/11/acquisition-functions.html + + Parameters + ---------- + mu : numpy.ndarray + Predicted mean values of the surrogate model. + sigma : numpy.ndarray + Predicted standard deviations of the surrogate model. + k : float, optional + Exploration parameter controlling the trade-off between + exploitation and exploration. + + Returns + ------- + lcb : numpy.ndarray + Lower Confidence Bound values for each candidate. + """ + return -mu + k * sigma + + +def EINextPoint(pipe, y, paramBounds, cfgOpt): + """ + Propose the next evaluation point using Expected Improvement (EI) or lower confidence bound (LCB). + + This function generates a Latin Hypercube sample (LHS) of candidate points within the + parameter bounds, evaluates a surrogate model (``pipe``) on those candidates, and + selects the candidate that maximises EI or LCB for a *minimisation* + problem. + + Parameters + ---------- + pipe : sklearn.pipeline.Pipeline + Trained surrogate model pipeline used to predict loss + and uncertainty. + y : array-like + Observed objective values (loss) from previous evaluations. + paramBounds : dict + Dictionary mapping parameter names to (lower, upper) bounds. 
+ cfgOpt : configparser.ConfigParser + Config parser of the runOptimisationCfg.ini file. + + Returns + ------- + xBest : numpy.ndarray + Vector of the values of the selected parameters that maximises EI among the LHS candidates. + xBestDict : dict + Mapping of parameter name to selected value for ``xBest``. + best_ei : float + Maximum EI value among the candidate set. + best_lcb : float + Maximum LCB value among the candidate set (computed for reference/diagnostics). + """ + paramSelected = list(paramBounds.keys()) + bounds = np.array(list(paramBounds.values()), dtype=float) # shape (d,2) + d = bounds.shape[0] + f_best = np.nanmin(y) + + # Create LH samples + seed = int(cfgOpt['OPTIMISATION']['seed']) + n_lhs = int(cfgOpt['OPTIMISATION']['n_lhs']) + sampler = qmc.LatinHypercube(d=d, seed=seed) + sample = sampler.random(n=n_lhs) + X0 = qmc.scale(sample, bounds[:, 0], bounds[:, 1]) + + # Predict with pipe + mu, sigma = pipe.predict(X0, return_std=True) + + print(mu.mean(), mu.max(), sigma.mean(), sigma.max()) + + # EI or LCB for minimization + ei = expectedImprovement(mu, sigma, f_best) + lcb = lowerConfidenceBound(mu, sigma) + + xBest = X0[np.argmax(ei)].copy() + # xBest = X0[np.argmax(lcb)].copy() + xBestDict = {feat: float(val) for feat, val in zip(paramSelected, xBest)} + + return xBest, xBestDict, np.max(ei), np.max(lcb) + + +def runCom8MoTPSA(avalancheDir, xBestDict, cfgMain, i=0, optimisationType=None): + """ + Based on the default runCom8MoTPSA function in com8MoTPSA/runCom8MoTPSA.py file, but overrides parameter values in + the module configuration using the values provided in ``xBestDict``. It also assigns a unique visualisation scenario + identifier and records the sampling method for traceability. + + Parameters + ---------- + avalancheDir: str + Path to avalanche directory. + xBestDict : dict + Mapping of parameter name to selected value for ``xBest``. + cfgMain : configparser.ConfigParser + General configuration for avaframe. 
+ i : int, optional + Counter for identifying the number of iterations. + optimisationType: str, optional + Name of the optimisation type, sequential or non-sequential. + + Returns + --------- + simName: str + Name of the simulation. + """ + # Time the whole routine + startTime = time.time() + + # log file name; leave empty to use default runLog.log + logName = 'runCom8MoTPSA' + + # Start logging + log = logUtils.initiateLogger(avalancheDir, logName) + log.info('MAIN SCRIPT') + log.info('Current avalanche: %s', avalancheDir) + # ---------------- + # Clean input directory(ies) of old work and output files + # If you just created the ``avalancheDir`` this one should be clean but if you + # already did some calculations you might want to clean it:: + initializeProject.cleanSingleAvaDir(avalancheDir, deleteOutput=False) + # Get module config + cfgCom8MoTPSA = cfgUtils.getModuleConfig(com8MoTPSA, toPrint=False) + + # overwrite com8MoTPSACfg with xBest values + for param, val in xBestDict.items(): + # print(param, val) + section = probAna.fetchParameterSection(cfgCom8MoTPSA, param) + cfgCom8MoTPSA[section][param] = str(val) + # give visualisation unique scenario for identifying later + cfgCom8MoTPSA['VISUALISATION']['scenario'] = str(i) + if optimisationType == 'nonSeq': + cfgCom8MoTPSA["VISUALISATION"]["sampleMethod"] = 'nonSeq' + else: + cfgCom8MoTPSA["VISUALISATION"]["sampleMethod"] = 'EI/LCB' + + # ---------------- + # Run psa + simName = com8MoTPSA.com8MoTPSAMain(cfgMain, cfgInfo=cfgCom8MoTPSA, returnSimName=True) + # Print time needed + endTime = time.time() + log.info('Took %6.1f seconds to calculate.' % (endTime - startTime)) + + return simName + + +def findSimName(finalDF, paramValue, atol=1e-6): + """ + Return the simName in finalDF whose parameter columns match + the given paramValue within a numerical tolerance. + + Parameters + ---------- + finalDF : pandas.DataFrame + Must contain column simName and parameter columns given in paramValue. 
+ paramValue : dict or iterable of (name, value) + Parameter values to match. + atol : float, optional + Absolute tolerance for float comparison (default: 1e-6). + + Returns + ------- + str + The first matching simName. + """ + mask = np.ones(len(finalDF), dtype=bool) + for col, val in dict(paramValue).items(): + s = pd.to_numeric(finalDF[col], errors="coerce") + mask &= np.isclose(s, float(val), atol=atol, rtol=0) + + matches = finalDF.loc[mask, "simName"] + return matches.iloc[0] + + +def loadVariationData(cfgOpt, outDir, avaDir): + """ + Load parameter bounds and selected parameters for optimisation. Two execution modes are supported, controlled via + cfgOpt['PARAM_BOUNDS']['scenario']: + + Scenario 1 (Morris pre-run): + - A Morris sensitivity analysis has already been executed. + - Ranked parameters and their bounds are loaded from + 'sa_parameters_bounds.pkl'. + - The top-N most influential parameters are selected for optimisation. + + Scenario 2 (Manual definition): + - No prior Morris screening. + - Parameter names and corresponding bounds are loaded from a previously saved pickle file generated by + ``runAna4ProbAnaCom8MoTPSA.py``. The parameter variation is therefore not defined within this function, but is + determined by the configuration specified in ``probAnaCfg.ini``. The ``probAnaCfg.ini`` file contains the + settings used to generate the initial sample set, including parameter ranges and variation rules. + + + Parameters + ---------- + cfgOpt: configparser.ConfigParser + Configuration object containing the section 'PARAM_BOUNDS'. + outDir: pathlib.Path + Directory containing the Morris output file + ('sa_parameters_bounds.pkl'). + avaDir: str + Directory of the avalanche. + + Returns + ------- + paramBounds : dict + Dictionary mapping parameter names to (min, max) tuples. + paramSelected : list + List of selected parameter names used for optimisation. 
+ """ + # Read scenario flag + scenario = int(cfgOpt['PARAM_BOUNDS']['scenario']) + + avaName = avaDir.split('/')[-1] + + # Scenario 1: Morris run prior + if scenario == 1: + # Load SA data and define how many parameters should be optimized (variation bounds included) + SiDFSort = pd.read_pickle(outDir / f"{avaName}_sortedSAResultsWithBounds.pkl") + topN = int(cfgOpt['PARAM_BOUNDS']['topN']) + paramSelected = list(SiDFSort['Parameter'][:topN]) + paramBounds = dict(zip(paramSelected, SiDFSort['bounds'])) + + # Scenario 2: Morris is not run prior + else: + # Load variation data with bounds from pickle file + inDir2 = pathlib.Path(avaDir, 'Outputs', "ana4Stats") + paramValuesD = pd.read_pickle(inDir2 / "paramValuesD.pickle") + paramSelected = paramValuesD['names'] + paramBounds = { + name: (float(bounds[0]), float(bounds[1])) + for name, bounds in zip(paramValuesD["names"], paramValuesD["bounds"]) + } + return paramBounds, paramSelected + + +def loadMorrisConvergenceData(cfgMorrisSA, avalancheDir, avaName): + """ + Load Morris sensitivity analysis results for convergence plotting. + + Parameters + ---------- + cfgMorrisSA : configparser.ConfigParser + Morris configuration object containing section + 'MORRIS_CONVERGENCE' and 'GENERAL'. + avalancheDir : str or pathlib.Path + Root avalanche directory. + avaName : str + Avalanche name used in filename construction. + + Returns + ------- + SA_dfs : list[pandas.DataFrame] + Loaded Morris result DataFrames in the order defined in the ini file. + r_vals : list[int] + Corresponding number of trajectories (r) extracted from folder names. + outputs : list[str] + Output folder names as defined in the ini file. + outDir: pathlib.Path + Path where results are saved to. + reference_r : int + The number of trajectories used as reference for ordering the parameters. 
+ """ + + section = cfgMorrisSA["MORRIS_CONVERGENCE"] + + outputs = [] + r_vals = [] + + # Read outputs in ini order + for key, value in section.items(): + if key.lower().startswith("outputs"): + outputs.append(value) + + match = re.search(r"R(\d+)", value) + if match: + r_vals.append(int(match.group(1))) + else: + raise ValueError( + f"Could not extract r-value from folder name '{value}'" + ) + + # Build module path + resultDir = cfgMorrisSA["GENERAL"]["resultDir"] # e.g. Outputs/ana6MorrisSA + moduleDir = pathlib.Path(resultDir).name # e.g. ana6MorrisSA + comModuleName = cfgMorrisSA["GENERAL"]["modName"] + + filename = f"{avaName}_sortedSAResultsWithBounds.pkl" + + # Load DataFrames + selectedOutput = cfgMorrisSA['MORRIS_CONVERGENCE']['referenceOutput'] + + # Get reference trajectory number, important for determining the order + match = re.search(r"R(\d+)", selectedOutput) + if match: + reference_r = int(match.group(1)) + else: + raise ValueError( + f"Could not extract r-value from referenceOutput '{selectedOutput}'" + ) + + selectedDir = None + SA_dfs = [] + + for out_name in outputs: + outDir = pathlib.Path(avalancheDir, out_name, moduleDir, comModuleName) + if out_name == selectedOutput: + selectedDir = outDir + with open(outDir / filename, "rb") as fi: + SA_dfs.append(pickle.load(fi)) + + return SA_dfs, r_vals, outputs, selectedDir, reference_r diff --git a/avaframe/ana6Optimisation/runMorrisSA.py b/avaframe/ana6Optimisation/runMorrisSA.py new file mode 100644 index 000000000..13b030fc8 --- /dev/null +++ b/avaframe/ana6Optimisation/runMorrisSA.py @@ -0,0 +1,97 @@ +""" +Run script for the Morris sensitivity analysis. For usage read README_ana6.md. 
+""" +import sys +import numpy as np +import pandas as pd +import pathlib +from datetime import datetime +from SALib.analyze import morris as morris_analyze + +from avaframe.in3Utils import cfgUtils +from avaframe.in3Utils import fileHandlerUtils as fU +from avaframe.ana3AIMEC import ana3AIMEC +import avaframe.out3Plot.outAna6Plots as saveResults +import optimisationUtils + +# Get module config +module = sys.modules[__name__] +cfgMorrisSA = cfgUtils.getModuleConfig(module, toPrint=False) + +# Load avalanche directory from general configuration file +cfgMain = cfgUtils.getGeneralConfig() +avalancheDir = cfgMain['MAIN']['avalancheDir'] +avaName = pathlib.Path(avalancheDir).name + +# Calculate Areal indicators and AIMEC and save the results in Outputs/ana3AIMEC and Outputs/out1Peak +optimisationUtils.calcArealIndicatorsAndAimec(cfgMorrisSA, avalancheDir, ana3AIMEC) + +# Load variation data with bounds from pickle file +inDir = pathlib.Path(avalancheDir, 'Outputs', "ana4Stats") +paramValuesD = pd.read_pickle(inDir / "paramValuesD.pickle") +varParList = paramValuesD['names'] +# +# Read and merge results from parameter sets (simulation data), areal indicators and AIMEC +finalDF = optimisationUtils.buildFinalDF(avalancheDir, varParList, cfgMorrisSA) + +# Use only morris samples +morrisDF = finalDF[finalDF['sampleMethods'] == 'morris'].copy(deep=True) +# Set order as index +morrisDF.set_index('order', inplace=True) +# Order df based on order (which is the index) +morrisDF.sort_index(inplace=True) + +# Define input for SA +problem = { + 'num_vars': len(varParList), + 'names': varParList, + 'bounds': paramValuesD['bounds'] +} +samples = np.vstack(morrisDF['parameterSet'].values).astype(float) +Y = morrisDF['optimisationVariable'].values + +# Perform SA +Si = morris_analyze.analyze( + problem, + samples, + Y, + conf_level=float(cfgMorrisSA['MORRIS SA']['conf_level']), + num_levels=int(cfgMorrisSA['MORRIS SA']['num_levels']) +) + +# Rank Parameters +SiData = { + "Parameter": 
Si['names'], + "mu_star": Si['mu_star'], + "sigma": Si['sigma'], + "mu_star_conf": Si['mu_star_conf']} + +# Convert to dataframe +SiDF = pd.DataFrame(SiData) + +# Create folder for saving the results of the analysis, if not already existing +resultDir = cfgMorrisSA['GENERAL']['resultDir'] +comModuleName = cfgMorrisSA['GENERAL']['modName'] +outDir = pathlib.Path(avalancheDir, resultDir, comModuleName) +fU.makeADir(outDir) + +saveResults.barplotSA(SiDF, avaName, outDir) +saveResults.scatterplotSA(SiDF, avaName, outDir) +saveResults.scatterplotUncertaintySA(SiDF, avaName, outDir) + +# Sort SA results +SiDFSort = SiDF.sort_values("mu_star", ascending=False).reset_index(drop=True) +# Append bounds to SiDFSort +paramBounds = dict(zip(problem["names"], problem["bounds"])) +SiDFSort["bounds"] = SiDFSort["Parameter"].map(paramBounds) +# Save as Pickle for Optimization +SiDFSort.to_pickle(outDir / f"{avaName}_sortedSAResultsWithBounds.pkl") + +# Create df with parameters and the loss function for summary statistics +paramLossDF, paramLossScaledDF = optimisationUtils.createDFParameterLoss(morrisDF, SiDFSort['Parameter']) +N = int(cfgMorrisSA['MORRIS SA']['N']) +paramLossSubsetDF = paramLossDF.sort_values(by='Loss', ascending=True)[:N] +# Save mean values of best input parameters as csv +date = datetime.now().strftime("%Y%m%d") +csvPath = f"{outDir}/{avaName}_MorrisBEST{N}Simulations_{date}.csv" +paramLossSubsetDF.describe().to_csv(csvPath) diff --git a/avaframe/ana6Optimisation/runMorrisSACfg.ini b/avaframe/ana6Optimisation/runMorrisSACfg.ini new file mode 100644 index 000000000..eb94c9b68 --- /dev/null +++ b/avaframe/ana6Optimisation/runMorrisSACfg.ini @@ -0,0 +1,46 @@ +### Config File - This file contains the main settings the morris SA + +# Sidenote: (1) when running runMorrisSA.py the working directory needs to be in the ana6Optimisation folder +# (2) avaDirectory in avaframeCfg.ini need ../ as prefix + +# SA ... 
sensitivity analysis + + +[GENERAL] +# USER input for running and plotting a comparison of simulation result to reference polygon +resType = ppr +thresholdValueSimulation = 1 +modName = com8MoTPSA +# in this folder the results will be saved, avalancheDir + this folder +resultDir = Outputs/ana6MorrisSA + +[LOSS_PARAMETERS] +# alpha gives penalty if sim. overshoot the ref. (FP) +# beta gives penalty if sim. comes short compared to the ref. (FN) +tverskyAlpha = 2 +tverskyBeta = 1 +# Loss function is combination of TverskyScore * weightTversky + RunoutNormalised * weightRunout +weightTversky = 0.25 +weightRunout = 0.75 + +[MORRIS SA] +# confidence level and number of levels used for SA +conf_level = 0.95 +num_levels = 6 +# number of best morris samples for statistics +N = 10 + + +[MORRIS_CONVERGENCE] +# for morris convergence plot, morris needs to be run with different morris trajectories prior +# define Output folder where results of morris analyses with different trajectories (R) are saved, naming: OutputsRNumber +Outputs1 = OutputsR5 +Outputs2 = OutputsR10 +Outputs3 = OutputsR20 +Outputs4 = OutputsR40 + +# get order of parameters from this output, also save result of convergence into child folders of this parent folder +referenceOutput = OutputsR10 + + + diff --git a/avaframe/ana6Optimisation/runOptimisation.py b/avaframe/ana6Optimisation/runOptimisation.py new file mode 100644 index 000000000..b0ec5266d --- /dev/null +++ b/avaframe/ana6Optimisation/runOptimisation.py @@ -0,0 +1,152 @@ +""" +Run script for the optimization process. For usage read README_ana6.md. 
+""" +import sys +import pathlib +import pickle + +from avaframe.in3Utils import cfgUtils +from avaframe.in3Utils import fileHandlerUtils as fU +from avaframe.ana3AIMEC import ana3AIMEC +import avaframe.out3Plot.outAna6Plots as saveResults +import optimisationUtils + +# Get module config +module = sys.modules[__name__] +cfgOpt = cfgUtils.getModuleConfig(module, toPrint=False) + +# Load avalanche directory from general configuration file +cfgMain = cfgUtils.getGeneralConfig() +avalancheDir = cfgMain['MAIN']['avalancheDir'] +avaName = pathlib.Path(avalancheDir).name + +# Create folder for saving the results of the analysis, if not already existing +resultDirOpt = cfgOpt['GENERAL']['resultDir'] +comModuleName = cfgOpt['GENERAL']['modName'] +outDir = pathlib.Path(avalancheDir, resultDirOpt, comModuleName) +fU.makeADir(outDir) + +# Get config from morris for path to morris results +cfgDir = 'runMorrisSA.ini' +cfgMorrisSA = cfgUtils.getModuleConfig(pathlib.Path(cfgDir), toPrint=False) +resultDirMorris = cfgMorrisSA['GENERAL']['resultDir'] +inDir = pathlib.Path(avalancheDir, resultDirMorris, comModuleName) + +# Load variation parameters and their bounds +paramBounds, paramSelected = optimisationUtils.loadVariationData(cfgOpt, inDir, avalancheDir) + +# Calculate Areal indicators and AIMEC and save the results in Outputs/ana3AIMEC and Outputs/out1Peak +optimisationUtils.calcArealIndicatorsAndAimec(cfgOpt, avalancheDir, ana3AIMEC) + +# Read and merge results from parameter sets (simulation data), areal indicators and AIMEC +finalDF = optimisationUtils.buildFinalDF(avalancheDir, paramSelected, cfgOpt) + +# ---------------------------------------------------------------------------------------------------------------------- +optimisationType = cfgOpt['OPTIMISATION']['optType'] +if optimisationType == 'nonseq': + csv_path = outDir / f"{avaName}_BestOrCurrentSimulation_NonSeq.csv" + # Save sim with currently best y + saveResults.saveBestorCurrentModelrun(finalDF, paramSelected, 
csv_path=csv_path) + + # Create df with most important parameters and the loss function + emulatorDF, emulatorScaledDF = optimisationUtils.createDFParameterLoss(finalDF, paramSelected) + + # Create surrogate + X, y, gp_pipe = optimisationUtils.fitSurrogate(emulatorDF, cfgOpt) # X,y are features of emulatorDF + + # K fold cross validation + optimisationUtils.KFoldCV(X, y, gp_pipe, cfgOpt, outDir, avaName, "Gaussian Process Matern Kernel") + + # Fit final pipline + gp_pipe.fit(X, y) + + # Optimize non-sequential (only use pipe once to find best param) + topNStat = optimisationUtils.optimiseNonSeq(gp_pipe, cfgOpt, paramBounds) + + # Run com8 with mean parameters of best N surrogate evaluations + simNameMean = optimisationUtils.runCom8MoTPSA(avalancheDir, topNStat['TopNBest']['mean_params'], cfgMain, + optimisationType='nonSeq') + # Run com8 with parameters of best surrogate evaluations + simNameBest = optimisationUtils.runCom8MoTPSA(avalancheDir, topNStat['Best']['params'], cfgMain, + optimisationType='nonSeq') + # SimName could be None if sim is already available, if so get the name from finalDF + if simNameMean is None: + simNameMean = optimisationUtils.findSimName(finalDF, topNStat["TopNBest"]["mean_params"], atol=1e-6) + if simNameBest is None: + simNameBest = optimisationUtils.findSimName(finalDF, topNStat["Best"]["params"], atol=1e-6) + + # Calculate Areal indicators and AIMEC and save the results in Outputs/ana3AIMEC and Outputs/out1Peak + optimisationUtils.calcArealIndicatorsAndAimec(cfgOpt, avalancheDir, ana3AIMEC) + + # Read and merge results from parameter sets (simulation data), areal indicators and AIMEC + finalDF = optimisationUtils.buildFinalDF(avalancheDir, paramSelected, cfgOpt) + + # Create image of table + saveResults.saveTopCandidates(finalDF, paramSelected, cfgOpt, topNStat, + out_path=outDir / f"{avaName}_StatisticsBestParameterValues_NonSeqAnalysis.png", + title=f"{avaName}, NonSeq-Analysis: Best Surrogate vs Best Model Run", + 
simNameMean=simNameMean, simNameBest=simNameBest) + + # Save latest real sim + saveResults.saveBestorCurrentModelrun(finalDF, paramSelected, simName=simNameMean, csv_path=csv_path) + +# ---------------------------------------------------------------------------------------------------------------------- +elif optimisationType == 'seq': + csv_path = outDir / f"{avaName}_BestOrCurrentSimulation_BO.csv" + # Save sim with currently best y + saveResults.saveBestorCurrentModelrun(finalDF, paramSelected, csv_path=csv_path) + + eiThreshold = float(cfgOpt['OPTIMISATION']['eiThreshold']) + nGoodSims = float(cfgOpt['OPTIMISATION']['numberOfGoodSimulations']) + countGoodSims = 0 + bo_max_iterations = int(cfgOpt['OPTIMISATION']['bo_max_iterations']) + + for i in range(bo_max_iterations): + # Create df with most important parameters and the loss function + emulatorDF, emulatorScaledDF = optimisationUtils.createDFParameterLoss(finalDF, paramSelected) + # Train surrogate + X, y, gp_pipe = optimisationUtils.fitSurrogate(emulatorDF, cfgOpt) # X,y are features of emulatorDF + + # K fold cross validation + optimisationUtils.KFoldCV(X, y, gp_pipe, cfgOpt, outDir, avaName, "Gaussian Process Matern Kernel") + + # Fit final pipline + gp_pipe.fit(X, y) + + # Get next input parameters with EI + xBest, xBestDict, ei, lcb = optimisationUtils.EINextPoint(gp_pipe, y, paramBounds, cfgOpt) + + # Run com8 with best x + simName = optimisationUtils.runCom8MoTPSA(avalancheDir, xBestDict, cfgMain, i, optimisationType='seq') + + # Calculate Areal indicators and AIMEC and save the results in Outputs/ana3AIMEC and Outputs/out1Peak + optimisationUtils.calcArealIndicatorsAndAimec(cfgOpt, avalancheDir, ana3AIMEC) + + # Read and merge results from parameter sets (simulation data), areal indicators and AIMEC + finalDF = optimisationUtils.buildFinalDF(avalancheDir, paramSelected, cfgOpt) + + # Save latest sim + saveResults.saveBestorCurrentModelrun(finalDF, paramSelected, ei, lcb, simName, + csv_path=csv_path) 
+ # If ei is smaller than threshold, the simulation is counted as 'good', if number of good simulations is + # reached, optimization stops + if ei < eiThreshold: + countGoodSims = countGoodSims + 1 + if countGoodSims >= nGoodSims: + break + + saveResults.saveTopCandidates(finalDF, paramSelected, cfgOpt, + out_path=outDir / f"{avaName}_StatisticsBestParameterValues_BOAnalysis.png", + title=f"{avaName}, Seq-Analysis: Best Model Runs") + + # Save BO plots + n_top_samples = int(cfgOpt['OPTIMISATION']['n_model_top']) + emulatorDF, emulatorScaledDF = optimisationUtils.createDFParameterLoss(finalDF, paramSelected) + + saveResults.BOConvergencePlot(finalDF, avaName, outDir) + saveResults.BOBoxplot(emulatorDF, avaName, outDir, N=n_top_samples) + saveResults.BOBoxplotNormalised(emulatorDF, paramBounds, avaName, outDir, N=n_top_samples) + +# Save pickle file of finalDF +with open(outDir / f"{avaName}_finalDF.pickle", "wb") as fi: + pickle.dump(finalDF, fi) diff --git a/avaframe/ana6Optimisation/runOptimisationCfg.ini b/avaframe/ana6Optimisation/runOptimisationCfg.ini new file mode 100644 index 000000000..be8a3dbd4 --- /dev/null +++ b/avaframe/ana6Optimisation/runOptimisationCfg.ini @@ -0,0 +1,60 @@ +### Config File - This file contains the main settings for the optimisation process + +# Sidenote: (1) when running runOptimisation.py the working directory needs to be in the ana6Optimisation folder +# (2) avaDirectory in avaframeCfg.ini need ../ as prefix + +# BO ... bayesian optimisation +# ei ... 
expected improvement + + +[GENERAL] +# USER input for running and plotting a comparison of simulation result to reference polygon +resType = ppr +thresholdValueSimulation = 1 +modName = com8MoTPSA +# in this folder the results will be saved, avalancheDir + this folder +resultDir = Outputs/ana6Optimisation + + +[PARAM_BOUNDS] +# 2 scenarios: choose 1 or 2 +scenario = 2 +#(1): morris is run prior, then dataframe of ranked input parameters is already saved by runMorris.py as pickle file, user only needs to determine how many input parameters to use for optimisation +topN = 6 +# (2): morris is not run prior, input parameters and their bounds are read from a previously saved pickle file generated by ``runAna4ProbAnaCom8MoTPSA.py``. The parameter variation is determined by the configuration specified in ``probAnaCfg.ini``. + + +[LOSS_PARAMETERS] +# alpha gives penalty if sim. overshoot the ref. (FP) +# beta gives penalty if sim. comes short compared to the ref. (FN) +tverskyAlpha = 2 +tverskyBeta = 1 + +weightTversky = 0.25 +weightRunout = 0.75 + + +[OPTIMISATION] +# Type of optimisation: seq or nonseq +optType = seq +# define the smoothness of the matern kernel of the surrogate, e.g.: 0.5, 1.5, 2.5 +matern_nu = 2.5 +# define k for k-fold cross validation +k = 5 +# sampling seed for generation of Latin Hypercube samples used for loss-prediction with surrogate +seed = 12345 +# number of LH samples +n_lhs = 1000000 +# determine the number N of surrogate's best simulations used for statistics +n_surrogate_top = 100 +# determine the number N of models best simulations used for statistics (n < available model runs) +n_model_top = 6 + +# settings for seq: +# if ei is smaller than threshold, the simulation is counted as 'good', if number of good simulations is reached, optimisation stops +eiThreshold = 0.01 +numberOfGoodSimulations = 5 +bo_max_iterations = 2 + + + diff --git a/avaframe/ana6Optimisation/runPlotMorrisConvergence.py 
b/avaframe/ana6Optimisation/runPlotMorrisConvergence.py new file mode 100644 index 000000000..eca869a52 --- /dev/null +++ b/avaframe/ana6Optimisation/runPlotMorrisConvergence.py @@ -0,0 +1,30 @@ +""" +Run script for the Morris convergence plot. For usage read README_ana6.md. +""" +import pathlib + +import avaframe.out3Plot.outAna6Plots as saveResults +from avaframe.in3Utils import cfgUtils +import optimisationUtils + +# Load avalanche directory from general configuration file +cfgMain = cfgUtils.getGeneralConfig() +avalancheDir = cfgMain['MAIN']['avalancheDir'] +avaName = pathlib.Path(avalancheDir).name + +# Load morris config file +cfgDir = 'runMorrisSA.ini' +cfgMorrisSA = cfgUtils.getModuleConfig(pathlib.Path(cfgDir), toPrint=False) + +# Load Morris sensitivity analysis results for convergence plot +SA_dfs, r_vals, outputs, outDir, reference_r = optimisationUtils.loadMorrisConvergenceData(cfgMorrisSA, avalancheDir, + avaName) + +# Top 7 parameters +saveResults.plotMorrisConvergence(SA_dfs, r_vals, reference_r, k=7, + outpath=outDir / f'{avaName}_MorrisSAConvergencePlotTop7.png', + title=f"{avaName} Convergence plot for top 7 parameters") +# All parameters +saveResults.plotMorrisConvergence(SA_dfs, r_vals, reference_r, k=None, + outpath=outDir / f'{avaName}_MorrisSAConvergencePlotAll.png', + title=f"{avaName} Convergence plot for all parameters") diff --git a/avaframe/com1DFA/com1DFATools.py b/avaframe/com1DFA/com1DFATools.py index 5ba11a87a..aba363f48 100644 --- a/avaframe/com1DFA/com1DFATools.py +++ b/avaframe/com1DFA/com1DFATools.py @@ -346,6 +346,7 @@ def initializeInputs(avalancheDir, cleanRemeshedRasters, module=com1DFA): simDFExisting, simNameExisting = cfgUtils.readConfigurationInfoFromDone( avalancheDir, specDir="", + modName=modName ) # fetch input data - dem, release-, entrainment- and resistance areas (and secondary release areas) diff --git a/avaframe/com8MoTPSA/com8MoTPSA.py b/avaframe/com8MoTPSA/com8MoTPSA.py index 033e7d4ec..679636ca5 100644 --- 
a/avaframe/com8MoTPSA/com8MoTPSA.py +++ b/avaframe/com8MoTPSA/com8MoTPSA.py @@ -18,12 +18,13 @@ import avaframe.in3Utils.fileHandlerUtils as fU from avaframe.out1Peak import outPlotAllPeak as oP import avaframe.in3Utils.MoTUtils as mT +from avaframe.in3Utils.initializeProject import _checkForFolderAndDelete # create local logger log = logging.getLogger(__name__) -def com8MoTPSAMain(cfgMain, cfgInfo=None): +def com8MoTPSAMain(cfgMain, cfgInfo=None, returnSimName=None): """Run the full MoT-PSA workflow: generate configs, run simulations in parallel, postprocess. Parameters @@ -32,6 +33,8 @@ def com8MoTPSAMain(cfgMain, cfgInfo=None): main AvaFrame configuration (avalancheDir, nCPU, plot flags) cfgInfo : dict or None, optional override configuration info passed to MoTGenerateConfigs + returnSimName : any, optional + if not None, return the first simDict key after running """ # Get all necessary information from the configuration files currentModule = sys.modules[__name__] @@ -53,22 +56,56 @@ def com8MoTPSAMain(cfgMain, cfgInfo=None): log.info("--- STARTING (potential) PARALLEL PART ----") - # Get number of CPU Cores wanted - nCPU = cfgUtils.getNumberOfProcesses(cfgMain, len(rcfFiles)) - - # Create parallel pool and run - # with multiprocessing.Pool(processes=nCPU) as pool: - with Pool(processes=nCPU) as pool: - results = pool.map(com8MoTPSATask, rcfFiles) - pool.close() - pool.join() - - timeNeeded = "%.2f" % (time.time() - startTime) - log.info("Overall (parallel) com8MoTPSA computation took: %s s " % timeNeeded) - log.info("--- ENDING (potential) PARALLEL PART ----") + # Split into chunks to postprocess and clean up work dirs incrementally + chunkSize = 8 + if len(rcfFiles) > chunkSize: + for i in range(0, len(rcfFiles), chunkSize): + rcfFilesChunk = rcfFiles[i:i + chunkSize] + simNamesChunk = [p.stem for p in rcfFilesChunk] + + nCPU = cfgUtils.getNumberOfProcesses(cfgMain, len(rcfFilesChunk)) + + if bool(simNamesChunk): + with Pool(processes=nCPU) as pool: + results 
= pool.map(com8MoTPSATask, rcfFilesChunk) + pool.close() + pool.join() + + timeNeeded = "%.2f" % (time.time() - startTime) + log.info("Overall (parallel) com8MoTPSA computation took: %s s " % timeNeeded) + log.info("--- ENDING (potential) PARALLEL PART ----") + + # Postprocess the simulations + com8MoTPSAPostprocess(simNamesChunk, cfgMain, inputSimFiles) + + # Delete folder in Work directory after postprocessing to reduce memory costs + avaDir = cfgMain["MAIN"]["avalancheDir"] + for sim in simNamesChunk: + folderName = "Work/com8MoTPSA/" + sim + _checkForFolderAndDelete(avaDir, folderName) + else: + log.warning("There is no simulation to be performed for releaseScenario") + else: + nCPU = cfgUtils.getNumberOfProcesses(cfgMain, len(rcfFiles)) + + simNames = [p.stem for p in rcfFiles] + if bool(simNames): + with Pool(processes=nCPU) as pool: + results = pool.map(com8MoTPSATask, rcfFiles) + pool.close() + pool.join() + + timeNeeded = "%.2f" % (time.time() - startTime) + log.info("Overall (parallel) com8MoTPSA computation took: %s s " % timeNeeded) + log.info("--- ENDING (potential) PARALLEL PART ----") + + # Postprocess the simulations + com8MoTPSAPostprocess(simNames, cfgMain, inputSimFiles) + else: + log.warning("There is no simulation to be performed for releaseScenario") - # Postprocess the simulations - com8MoTPSAPostprocess(simDict, cfgMain, inputSimFiles) + if returnSimName is not None and simDict: + return next(iter(simDict)) def copyRawToLayerPeakFiles(workDir, outputDirPeakFile): @@ -106,7 +143,7 @@ def copyRawToLayerPeakFiles(workDir, outputDirPeakFile): shutil.copy2(source, target) -def com8MoTPSAPostprocess(simDict, cfgMain, inputSimFiles): +def com8MoTPSAPostprocess(simNames, cfgMain, inputSimFiles): """Postprocess MoT-PSA results: rename outputs to L1/L2 peak files, generate plots and reports. 
For each simulation, copies DataTime.txt and renames raw MoT-PSA output files @@ -114,8 +151,8 @@ def com8MoTPSAPostprocess(simDict, cfgMain, inputSimFiles): Parameters ---------- - simDict : dict - simulation dictionary + simNames : list + list of simulation name strings cfgMain : configparser.ConfigParser main AvaFrame configuration (avalancheDir, plot flags) inputSimFiles : dict @@ -128,7 +165,7 @@ def com8MoTPSAPostprocess(simDict, cfgMain, inputSimFiles): outputDirPeakFile = pathlib.Path(avalancheDir) / "Outputs" / "com8MoTPSA" / "peakFiles" fU.makeADir(outputDirPeakFile) - for key in simDict: + for key in simNames: workDir = pathlib.Path(avalancheDir) / "Work" / "com8MoTPSA" / str(key) # Copy DataTime.txt @@ -137,6 +174,13 @@ def com8MoTPSAPostprocess(simDict, cfgMain, inputSimFiles): copyRawToLayerPeakFiles(workDir, outputDirPeakFile) + # Write config indicator files to track completed simulations + configFileName = "%s.ini" % key + for saveDir in ["configurationFilesDone", "configurationFilesLatest"]: + configDir = pathlib.Path(avalancheDir, "Outputs", "com8MoTPSA", "configurationFiles", saveDir) + with open((configDir / configFileName), "w") as fi: + fi.write("see directory configurationFiles for info on config") + # create plots and report modName = __name__.split(".")[-1] reportDir = pathlib.Path(avalancheDir, "Outputs", modName, "reports") diff --git a/avaframe/com8MoTPSA/com8MoTPSACfg.ini b/avaframe/com8MoTPSA/com8MoTPSACfg.ini index 6c5a76d08..cf0a65075 100644 --- a/avaframe/com8MoTPSA/com8MoTPSACfg.ini +++ b/avaframe/com8MoTPSA/com8MoTPSACfg.ini @@ -222,3 +222,7 @@ Initial CFL number (-) = 0.8 # peak files and plots are exported, option to turn off exports when exportData is set to False # this affects export of peak files and also generation of peak file plots exportData = True + +[VISUALISATION] +# scenario name - can be used for plotting +scenario = diff --git a/avaframe/in3Utils/cfgUtils.py b/avaframe/in3Utils/cfgUtils.py index 
dafd133ba..0f4ab5aac 100644 --- a/avaframe/in3Utils/cfgUtils.py +++ b/avaframe/in3Utils/cfgUtils.py @@ -958,7 +958,7 @@ def setStrnanToNan(simDF, simDFTest, name): return simDF -def readConfigurationInfoFromDone(avaDir, specDir="", latest=False): +def readConfigurationInfoFromDone(avaDir, specDir="", latest=False, modName=''): """Check avaName/Outputs/com1DFA/configurationFilesDone and pass names of all files found in this directory and create corresponding simDF this is useful if e.g. no allConfigurations.csv has @@ -976,6 +976,8 @@ def readConfigurationInfoFromDone(avaDir, specDir="", latest=False): path to a directory where simulation configuration files directory called configurationFiles can be found - optional latest: bool if True check for files found in avaName/Outputs/com1DFA/configurationFilesLatest + modName: str + name of the module to be used for task (optional) Returns -------- @@ -989,7 +991,10 @@ def readConfigurationInfoFromDone(avaDir, specDir="", latest=False): if specDir != "": inDir = pathlib.Path(specDir, "configurationFiles") else: - inDir = pathlib.Path(avaDir, "Outputs", "com1DFA", "configurationFiles") + if modName == 'com8MoTPSA': + inDir = pathlib.Path(avaDir, "Outputs", "com8MoTPSA", "configurationFiles") + else: + inDir = pathlib.Path(avaDir, "Outputs", "com1DFA", "configurationFiles") # search inDir/configurationFilesDone or inDir/configurationFilesLatest (depending on latest flag) for already existing sims if latest: @@ -1007,13 +1012,23 @@ def readConfigurationInfoFromDone(avaDir, specDir="", latest=False): simDF = None else: # create simDF (dataFrame with one row per simulation of configuration files found in configDir) - simDF = createConfigurationInfo( - avaDir, - comModule="com1DFA", - standardCfg="", - writeCSV=False, - specDir=specDir, - simNameList=simNameExisting, + if modName == 'com8MoTPSA': + simDF = createConfigurationInfo( + avaDir, + comModule="com8MoTPSA", + standardCfg="", + writeCSV=False, + specDir=specDir, + 
simNameList=simNameExisting, + ) + else: + simDF = createConfigurationInfo( + avaDir, + comModule="com1DFA", + standardCfg="", + writeCSV=False, + specDir=specDir, + simNameList=simNameExisting, ) # check for allConfigurationsInfo to find computation info and add to info fetched from ini files diff --git a/avaframe/out3Plot/outAna6Plots.py b/avaframe/out3Plot/outAna6Plots.py new file mode 100644 index 000000000..02e03024e --- /dev/null +++ b/avaframe/out3Plot/outAna6Plots.py @@ -0,0 +1,891 @@ +import numpy as np +import pandas as pd +import pathlib +import matplotlib.pyplot as plt +from adjustText import adjust_text +from datetime import datetime + + +def barplotSA(SiDF, avaName, outDir): + """ + Create a bar plot of Morris sensitivity results. + + Bars show μ* as percentage of the total sensitivity, with bar width + proportional to σ. A vertical dashed line is drawn where the cumulative μ* + first reaches 80%. + + Parameters + ---------- + SiDF : pandas.DataFrame + DataFrame with at least the following columns: + - ``Parameter`` : str, parameter names + - ``mu_star`` : float, mean absolute elementary effect + - ``mu_star_conf`` : float, optional, confidence interval of μ* + - ``sigma`` : float, standard deviation of elementary effects + avaName : str + Avalanche name, used in the saved filename. + outDir : str or pathlib.Path + Directory where the plot is saved. 
+ """ + + # 1) Sort by mu_star (descending) + df = SiDF.sort_values("mu_star", ascending=False).reset_index(drop=True) + + # 2) Normalize μ* so the percentages sum to 100 + total_mu = df["mu_star"].sum() + mu_pct = 100 * df["mu_star"] / total_mu + + # scale the confidence interval to the same percentage units + if "mu_star_conf" in df: + mu_pct_conf = 100 * df["mu_star_conf"] / total_mu + else: + mu_pct_conf = None + + # 3) Map σ to bar widths + wmin, wmax = 0.3, 0.9 + rng = df["sigma"].max() - df["sigma"].min() + sigma_norm = (df["sigma"] - df["sigma"].min()) / (rng if rng else 1) + bar_widths = wmin + (wmax - wmin) * sigma_norm + + # 4) Plot + x = np.arange(len(df)) + fig, ax = plt.subplots(figsize=(12, 6)) + bars = ax.bar( + x, mu_pct, width=bar_widths, edgecolor="black", + capsize=3 if mu_pct_conf is not None else 0 + ) + + ax.set_xticks(x, df["Parameter"], rotation=60, ha="right") + ax.set_ylabel("μ* (% of total)") + ax.set_title("Sensitivity: μ* (percent of total) with σ mapped to bar width") + + # Show value labels + for xi, yi in zip(x, mu_pct): + ax.text(xi, yi, f"{yi:.1f}%", ha="center", va="bottom", fontsize=9) + + # 7) Save figure + # Include date, format: YYYYMMDD + date = datetime.now().strftime("%Y%m%d") + figName = f"{outDir}/{avaName}_paramRanking_{date}.png" + plt.savefig(figName, dpi=300, bbox_inches="tight") + + +def scatterplotSA(SiDF, avaName, outDir): + """ + Create a scatter plot of Morris sensitivity results (μ* vs σ). + + Parameters + ---------- + SiDF : pandas.DataFrame + DataFrame with at least the following columns: + - ``Parameter`` : str, parameter names + - ``mu_star`` : float, mean absolute elementary effect + - ``sigma`` : float, standard deviation of elementary effects + avaName : str + Avalanche name, used in the saved filename. + outDir : str or pathlib.Path + Directory where the plot is saved. 
+ """ + + # Scatter Plot + plt.figure(figsize=(12, 7)) + plt.scatter(SiDF['mu_star'], SiDF['sigma'], color='blue') + + # Annotate name to the points + for i, txt in enumerate(SiDF['Parameter']): + plt.annotate(txt, (SiDF['mu_star'][i], SiDF['sigma'][i]), fontsize=9, xytext=(5, 5), textcoords='offset points') + + # Label and title + plt.xlabel('mu_star (Einflussstärke)', fontsize=12) + plt.ylabel('sigma (Nichtlinearität / Interaktionen)', fontsize=12) + plt.title('Morris Sensitivitätsanalyse: mu_star vs sigma', fontsize=14) + plt.grid(True) + + # Save figure + # Include date, format: YYYYMMDD + date = datetime.now().strftime("%Y%m%d") + figName = f"{outDir}/{avaName}_Scatterplot_{date}.png" + plt.savefig(figName, dpi=300, bbox_inches="tight") + + +def scatterplotUncertaintySA(SiDF, avaName, outDir): + """ + Create a scatter plot of Morris sensitivity results with uncertainty. + Plots μ* vs σ with horizontal error bars given by ``mu_star_conf``. + + Parameters + ---------- + SiDF : pandas.DataFrame + DataFrame with at least the following columns: + - ``Parameter`` : str, parameter names + - ``mu_star`` : float, mean absolute elementary effect + - ``mu_star_conf`` : float, confidence interval of μ* + - ``sigma`` : float, standard deviation of elementary effects + avaName : str + Avalanche name, used in the saved filename. + outDir : str or pathlib.Path + Directory where the plot is saved. 
+ """ + # Plot with error bars + plt.figure(figsize=(12, 7)) + plt.errorbar( + SiDF['mu_star'], SiDF['sigma'], + xerr=SiDF['mu_star_conf'], + fmt='o', color='blue', ecolor='gray', elinewidth=1.5, capsize=4 + ) + + # Annotations with adjustText + texts = [plt.text(SiDF['mu_star'][i], SiDF['sigma'][i], SiDF['Parameter'][i], fontsize=9) for i in range(len(SiDF))] + adjust_text(texts, arrowprops=dict(arrowstyle='->', color='gray', lw=0.5)) + + # Axes and layout + plt.xlabel('mu_star (Einflussstärke)', fontsize=12) + plt.ylabel('sigma (Nichtlinearität / Interaktionen)', fontsize=12) + plt.title('Morris Sensitivitätsanalyse: mu_star vs sigma (mit Unsicherheit)', fontsize=14) + + # Save figure + # Include date, format: YYYYMMDD + date = datetime.now().strftime("%Y%m%d") + figName = f"{outDir}/{avaName}_ScatterplotUncertainty_{date}.png" + plt.savefig(figName, dpi=300, bbox_inches="tight") + + +def BOConvergencePlot(finalDF, avaName, outDir): + """ + This function visualises the evolution of the optimisation variable + (loss) across different sampling phases: + + 1. Latin hypercube sampling + 2. Bayesian optimisation (EI/LCB) + 3. Optional non-sequential surrogate-based sampling + + Parameters + ---------- + finalDF : pandas.DataFrame + DataFrame containing simulation results. + + avaName : str + Name of the avalanche. Used for naming the output figure. + + outDir : pathlib.Path + Directory where the convergence plot will be saved. 
+ + """ + # Color palette + c_best = '#4e79a7' + c_bo = '#59a14f' + c_lhs = '#76b7b2' + c_nonseq = '#e15759' + + df = finalDF.dropna( + subset=["sampleMethods", "order", "optimisationVariable"] + ).copy() + + # Split by sampling method + latin = df[df["sampleMethods"] == "latin"].sort_values("order") + bo = df[df["sampleMethods"] == "EI/LCB"].sort_values("order") + nonseq = df[df["sampleMethods"] == "nonSeq"].sort_values("order") + + # Iteration axis + current_offset = 0 + + if not latin.empty: + latin["iter"] = latin["order"] + current_offset = latin["iter"].max() + 1 + + if not bo.empty: + bo["iter"] = bo["order"] + current_offset + current_offset = bo["iter"].max() + 1 + + if not nonseq.empty: + nonseq = nonseq.sort_index().copy() + nonseq["iter"] = np.arange(current_offset, current_offset + len(nonseq)) + current_offset = nonseq["iter"].max() + 1 + + # Combine for best-so-far + all_parts = [latin, bo] + if not nonseq.empty: + all_parts.append(nonseq) + + all_df = pd.concat(all_parts).sort_values("iter") + all_df["best_so_far"] = all_df["optimisationVariable"].cummin() + + # Plot + fig, ax = plt.subplots(figsize=(9, 5)) + + if not latin.empty: + ax.scatter( + latin["iter"], + latin["optimisationVariable"], + s=35, + alpha=0.7, + color=c_lhs, + label="Latin hypercube" + ) + + if not bo.empty: + ax.scatter( + bo["iter"], + bo["optimisationVariable"], + s=40, + alpha=0.85, + color=c_bo, + label="Bayesian optimisation (EI/LCB)" + ) + + if not nonseq.empty: + ax.scatter( + nonseq["iter"], + nonseq["optimisationVariable"], + s=40, + alpha=0.85, + color=c_nonseq, + label="Non-sequential sampling" + ) + + ax.plot( + all_df["iter"], + all_df["best_so_far"], + linewidth=1.5, + color=c_best, + label="Best-so-far" + ) + + # Add separator lines between sampling phases (if applicable) + if not latin.empty: + ax.axvline(latin["iter"].max() + 0.5, linestyle="--", linewidth=1, color="black") + if not bo.empty: + ax.axvline(bo["iter"].max() + 0.5, linestyle="--", linewidth=1, 
color="black") + + ax.set_xlabel("Iteration") + ax.set_ylabel("Optimisation variable (loss)") + ax.set_title("Convergence: Latin hypercube → Bayesian optimisation") + ax.legend(frameon=False, loc='best') + + fig.tight_layout() + + # Save figure + date = datetime.now().strftime("%Y%m%d") + figName = f"{outDir}/{avaName}_BOConvergence_{date}.png" + fig.savefig(figName, dpi=300, bbox_inches="tight") + plt.close(fig) + + +def BOBoxplot(paramLossDF, avaName, outDir, N=10): + """ + Create boxplots of the top-N parameter sets based on loss. + + This function selects the N best-performing simulations (lowest loss) + and visualises the distribution of their parameter values using boxplots. + Each parameter is plotted in a separate subplot. + + Parameters + ---------- + paramLossDF : pandas.DataFrame + DataFrame containing model parameters and corresponding loss values. + + avaName : str + Name of the avalanche. Used for naming the output figure. + + outDir : pathlib.Path + Directory where the boxplot figure will be saved. + + N : int, optional + Number of best-performing simulations (lowest loss) to include. + Default is 10. + """ + df_best = paramLossDF.nsmallest(N, "Loss") + param_cols = paramLossDF.columns.drop('Loss') + + fig, axes = plt.subplots(2, 4, figsize=(10, 6)) + axes = axes.flatten() + for ax, col in zip(axes, param_cols): + ax.boxplot(df_best[col]) + ax.set_xticklabels([col]) + + # If less than 8 parameters, remove empty axes + for ax in axes[len(param_cols):]: + ax.axis("off") + plt.tight_layout() + + # Save figure + # Include date, format: YYYYMMDD + date = datetime.now().strftime("%Y%m%d") + figName = f"{outDir}/{avaName}_BOBoxplot_{date}.png" + plt.savefig(figName, dpi=300, bbox_inches="tight") + + +def BOBoxplotNormalised(paramLossDF, paramBounds, avaName, outDir, N=10): + """ + Create normalized boxplots of parameters for the top-N simulations. 
+ + This function selects the N best-performing simulations (lowest loss) + and visualises the distribution of their parameter values after + min–max normalization based on predefined parameter bounds. + + Parameters + ---------- + paramLossDF : pandas.DataFrame + DataFrame containing model parameters and corresponding loss values. + + paramBounds : dict + Dictionary mapping parameter names to their (min, max) bounds: + {parameter_name: (lower_bound, upper_bound)} + + Bounds are used for min–max normalization. + + avaName : str + Name of the avalanche. Used for naming the output figure. + + outDir : pathlib.Path + Directory where the normalized boxplot figure will be saved. + + N : int, optional + Number of best-performing simulations (lowest loss) to include. + Default is 10. + """ + df_best = paramLossDF.nsmallest(N, "Loss") + param_cols = paramLossDF.columns.drop('Loss') + + # normalize using parameter bounds + data = [ + (df_best[c] - paramBounds[c][0]) / (paramBounds[c][1] - paramBounds[c][0]) + for c in param_cols + ] + + fig, ax = plt.subplots(figsize=(15, 7)) + ax.boxplot(data) + ax.set_xticks(range(1, len(param_cols) + 1)) + ax.set_xticklabels(param_cols, rotation=45, ha="right", fontsize=14) + ax.set_ylabel("Normalized value (min–max)", fontsize=14) + ax.set_title(f"Normalized parameter distributions (best {N})", fontsize=16) + ax.tick_params(axis="y", labelsize=12) + fig.tight_layout() + + # Save figure + # Include date, format: YYYYMMDD + date = datetime.now().strftime("%Y%m%d") + figName = f"{outDir}/{avaName}_BOBoxplotNormalised_{date}.png" + plt.savefig(figName, dpi=300, bbox_inches="tight") + + +def saveKFoldCVPrintImage(scores, pipeName, k, out_path): + """ + Save a summary of K-fold cross-validation results as an image. The output image contains metrics for both + training and test sets. 
+ + Reported metrics per split: + - RMSE (Root Mean Squared Error) + - MAE (Mean Absolute Error) + - R² (Coefficient of determination) + + Parameters + ---------- + scores : dict + Dictionary containing cross-validation results. Expected keys: + - "train_rmse", "train_mae", "train_r2" + - "test_rmse", "test_mae", "test_r2" + + pipeName : str + Name of the model or pipeline. Included in the figure header. + + k : int + Number of folds used in cross-validation. + + out_path : str or pathlib.Path + File path where the generated image will be saved. + """ + lines = [f"{pipeName}, {k}-fold CV:\n"] + for split in ("test", "train"): + lines.append(f"{split.capitalize()} metrics:") + for m in ("rmse", "mae", "r2"): + arr = scores[f"{split}_{m}"] + lines.append(f" {m.upper():<4}: {arr.mean():.4g} ± {arr.std():.4g}") + lines.append("") + + text = "\n".join(lines) + + fig = plt.figure(figsize=(6, 4)) + plt.axis("off") + plt.text(0.01, 0.99, text, va="top", family="monospace") + + plt.savefig(out_path, dpi=300, bbox_inches="tight") + plt.close(fig) + + +def saveBestorCurrentModelrun(finalDF, paramSelected, ei=None, lcb=None, simName=None, csv_path='dummy.csv'): + """ + Save either the best model run (based on optimisationVariable) or a + specified simulation (simName) to a CSV file. + + Parameters + ---------- + finalDF : pandas.DataFrame + containing all simulation results and optimization metrics. + paramSelected : list of str + List of parameter names that should be includedvin the exported output. + ei : float, optional + Expected Improvement value (used in Bayesian optimization). + If None, the column will be created with None. + lcb : float, optional + Lower Confidence Bound value (used in Bayesian optimization). + If None, the column will be created with None. + simName : str, optional + If provided, the row corresponding to this simulation name is saved. + If None, the row with the minimal optimisationVariable is saved. 
+ csv_path : str or pathlib.Path, optional + Path to the CSV file. + + Notes + ----- + Only a subset of relevant columns (including selected parameters) + is written to the output file. + """ + # Subset df, save only important entries + cols_keep = ['simName', 'sampleMethods', 'order', 'Simulation time (s)', 'Minimum time step (s)', + 'Initial CFL number (-)', 'TP_SimRef_cells', 'TP_SimRef_area', 'FP_SimRef_cells', 'FP_SimRef_area', + 'FN_SimRef_cells', 'FN_SimRef_area', 'recall', 'precision', 'f1_score', 'tversky_score', '1-tversky', + 'runoutRMSENormalised', 'optimisationVariable'] + columns_keep = cols_keep[:6] + [p for p in paramSelected if p not in cols_keep] + cols_keep[6:] + df = finalDF[columns_keep] + + if simName is not None: + row = df.loc[df['simName'] == simName].copy() + else: + idx = df['optimisationVariable'].idxmin() + row = df.loc[[idx]].copy() + + # Ensure the optional columns always exist + if ei is not None: + row["ei"] = ei + else: + row['ei'] = None + if lcb is not None: + row["lcb"] = lcb + else: + row['lcb'] = None + # Write to csv + path = pathlib.Path(csv_path) + row.to_csv(path, mode="a", index=False, header=not path.exists()) + + +def saveTopCandidates(finalDF, paramSelected, cfgOpt, results_dict=None, out_path="analysisTable.png", title=None, + simNameMean=None, simNameBest=None): + """ + Create result table(s) for surrogate/model top candidates and save as PNG and CSV. + + Behavior + -------- + - If `results_dict` is provided: creates up to two tables + 1) Surrogate summary (TopNBest mean/std + surrogate single best in "best" column) + Optionally appends a row "optimisationVariable" where: + - "mean" is filled from `simNameMean` (lookup in finalDF) + - "best" is filled from `simNameBest` (lookup in finalDF) + 2) Model summary (Top N + single best), where N is read from cfgOpt['OPTIMISATION']['n_model_top'] + - If `results_dict` is None: creates only the model table (2). 
+ + Outputs + ------- + - PNG at `out_path` + - CSV next to the PNG with suffix `_tables.csv` (tidy format with a `table` column) + + Parameters + ---------- + finalDF : pandas.DataFrame + Must contain column "optimisationVariable". + If simNameMean/simNameBest is used, must contain column "simName". + Must contain parameter columns listed in `paramSelected`. + paramSelected : list[str] + Parameter column names to summarize for the model table. + cfgOpt : configparser.ConfigParser-like + Needs: + - cfgOpt['OPTIMISATION']['n_surrogate_top'] + - cfgOpt['OPTIMISATION']['n_model_top'] + results_dict : dict | None, optional + If provided, expected keys: + - "TopNBest": dict with "mean_params", "std_params", "mean_mu", "std_mu", "mean_sigma", "std_sigma" + - "Best": dict with "params", "mu", "sigma" + out_path : str | pathlib.Path, optional + Path to save PNG figure. + title : str | None, optional + Optional global title for the PNG figure. + simNameMean : str | None, optional + Simulation name whose model optimisationVariable is placed into the surrogate table row + "optimisationVariable" under the "mean" column. + simNameBest : str | None, optional + Simulation name whose model optimisationVariable is placed into the surrogate table row + "optimisationVariable" under the "best" column. + + Returns + ------- + pathlib.Path + Path to the saved PNG figure. 
+ """ + tables = [] + + # ========================================================== + # Surrogate table (optional) + # ========================================================== + if results_dict is not None: + top = results_dict["TopNBest"] + + # --- normalize mean/std params to Series --- + mean_params = top["mean_params"] + std_params = top["std_params"] + + # If params are list/tuple of (name,value) pairs, convert to dict first + if not isinstance(mean_params, dict): + mean_params = dict(mean_params) + if not isinstance(std_params, dict): + std_params = dict(std_params) + + df_top = pd.DataFrame( + { + "mean": pd.Series(mean_params, dtype="float64"), + "std": pd.Series(std_params, dtype="float64"), + } + ) + df_top["relStd [%]"] = relstd(df_top["std"], df_top["mean"]) + + extra = pd.DataFrame( + { + "mean": [top["mean_mu"], top["mean_sigma"]], + "std": [top["std_mu"], top["std_sigma"]], + }, + index=["optimisationVariable_Surrogate", "sigma"], + ) + extra["relStd [%]"] = relstd(extra["std"], extra["mean"]) + + df_top = pd.concat([df_top, extra], axis=0) + + # --- create/attach "best" column from surrogate Best --- + best = results_dict["Best"] + best_params = best["params"] + if not isinstance(best_params, dict): + best_params = dict(best_params) + + best_series = pd.Series(best_params, dtype="float64") + best_series.loc["optimisationVariable_Surrogate"] = float(best["mu"]) + best_series.loc["sigma"] = float(best["sigma"]) + df_top["best"] = best_series + + # --- add model optimisationVariable row: mean and/or best --- + def _get_optvar_for_sim(sim_name, label): + if "simName" not in finalDF.columns: + raise ValueError(f"{label} set but finalDF has no 'simName' column.") + sel = finalDF.loc[finalDF["simName"] == sim_name, "optimisationVariable"] + if sel.empty: + raise ValueError(f"{label}='{sim_name}' not found in finalDF['simName'].") + return float(pd.to_numeric(sel.iloc[0], errors="coerce")) + + if simNameMean is not None or simNameBest is not None: + # 
Ensure row exists + if "optimisationVariable" not in df_top.index: + df_top.loc["optimisationVariable", ["mean", "std", "relStd [%]", "best"]] = [np.nan, np.nan, np.nan, + np.nan] + + if simNameMean is not None: + df_top.loc["optimisationVariable", ["mean", "std", "relStd [%]"]] = [ + _get_optvar_for_sim(simNameMean, "simNameMean"), + np.nan, + np.nan, + ] + if simNameBest is not None: + df_top.loc["optimisationVariable", "best"] = _get_optvar_for_sim(simNameBest, "simNameBest") + + n_surrogate_top = int(cfgOpt["OPTIMISATION"]["n_surrogate_top"]) + mean_tag = f" ({simNameMean})" if simNameMean else "" + best_tag = f" ({simNameBest})" if simNameBest else "" + + t1 = ( + f"Surrogate: Mean of Top {n_surrogate_top} Best{mean_tag} " + f"+ Single Best{best_tag}" + ) + tables.append((fmt_df(df_top), t1)) + + # ========================================================== + # Model table (always) + # ========================================================== + if "optimisationVariable" not in finalDF.columns: + raise ValueError("finalDF must contain column 'optimisationVariable'.") + + best_idx = finalDF["optimisationVariable"].idxmin() + + n_model_top = int(cfgOpt["OPTIMISATION"]["n_model_top"]) + topN = finalDF.nsmallest(n_model_top, "optimisationVariable") + + df_topN = summary_table(topN, paramSelected, best_row=finalDF.loc[best_idx]) + + opt_mean = pd.to_numeric(topN["optimisationVariable"], errors="coerce").mean() + opt_std = pd.to_numeric(topN["optimisationVariable"], errors="coerce").std() + + df_topN.loc["optimisationVariable", ["mean", "std", "relStd [%]", "best"]] = [ + opt_mean, + opt_std, + relstd(opt_std, opt_mean), + pd.to_numeric(finalDF.loc[best_idx, "optimisationVariable"], errors="coerce"), + ] + + best_sim = finalDF.at[best_idx, "simName"] if "simName" in finalDF.columns else "" + t2 = f"Model: Mean of Top {n_model_top} Best + Single Best{f' ({best_sim})' if best_sim else ''}" + tables.append((fmt_df(df_topN), t2)) + + # 
========================================================== + # Plot + # ========================================================== + out_path = pathlib.Path(out_path) + out_path.parent.mkdir(parents=True, exist_ok=True) + + fig_h = 1.2 + 0.38 * sum(len(df) for df, _ in tables) + fig, axes = plt.subplots(len(tables), 1, figsize=(10, fig_h)) + axes = [axes] if len(tables) == 1 else axes + + if title: + fig.suptitle(title, fontsize=14, y=0.99) + + for ax, (df_disp, t) in zip(axes, tables): + ax.axis("off") + tbl = ax.table( + cellText=df_disp.values, + rowLabels=df_disp.index.tolist(), + colLabels=df_disp.columns.tolist(), + loc="center", + ) + tbl.auto_set_font_size(False) + tbl.set_fontsize(9) + tbl.scale(1, 1.2) + ax.set_title(t, fontsize=12, pad=10) + + plt.tight_layout(rect=(0, 0, 1, 0.98)) + fig.savefig(out_path, dpi=200, bbox_inches="tight") + plt.close(fig) + + # ========================================================== + # CSV export (tidy) + # ========================================================== + csv_path = out_path.with_name(f"{out_path.stem}_tables.csv") + frames = [] + for df_disp, t in tables: + tmp = df_disp.copy().reset_index(names=["row"]) + tmp.insert(0, "table", t) + frames.append(tmp) + pd.concat(frames, ignore_index=True).to_csv(csv_path, index=False) + + return out_path + + +def formatSig(x): + """ + Format numbers for display in tables (significant digits, sci notation for small values). + - NaN or non-numeric values → "" + - 0 → "0" + - |x| < 1e-3 → scientific notation with 2 decimal places + - |x| < 100 → 2 significant digits + - |x| ≥ 100 → rounded integer + + Parameters + ---------- + x : Any + Value to be formatted. If conversion to float fails or the value + is NaN, an empty string is returned. + + Returns + ------- + str + Formatted string representation suitable for compact table output. 
+ """ + # --- Number formatting --- + try: + x = float(x) + except (TypeError, ValueError): + return "" + if pd.isna(x): + return "" + if x == 0.0: + return "0" + if abs(x) < 1e-3: + return f"{x:.2e}" # small numbers in scientific notation + elif abs(x) < 100: + return f"{x:.2g}" # 2 significant digits + else: + return str(int(round(x))) + + +def fmt_df(df): + """ + Apply formatSig to common summary columns for prettier display/export. + + Parameters + ---------- + df : pandas.DataFrame + Input DataFrame containing summary statistics. + + Returns + ------- + pandas.DataFrame + A copy of the input DataFrame with selected columns formatted + as strings for display purposes. + + """ + g = df.copy() + for c in ["mean", "std", "relStd [%]", "best"]: + if c in g.columns: + g[c] = g[c].map(formatSig) + return g + + +def relstd(std, mean): + """ + Relative std in percent; returns NaN if mean is 0 or NaN. + + Parameters + ---------- + std : array-like or scalar + Standard deviation values. + + mean : array-like or scalar + Mean values corresponding to `std`. + + Returns + ------- + numpy.ndarray + Relative standard deviation in percent. Returns NaN where: + - mean is 0 + - mean is NaN + - conversion to numeric fails + """ + mean = pd.to_numeric(mean, errors="coerce") + std = pd.to_numeric(std, errors="coerce") + return np.where((mean == 0) | pd.isna(mean), np.nan, std / mean * 100.0) + + +def summary_table(df, cols, best_row=None): + """ + Compute summary statistics (mean, standard deviation, relative standard deviation, + and optional best values) for selected numeric columns. + + The function converts the specified columns to numeric (coercing errors to NaN), + computes column-wise statistics, and returns them in a compact summary table. + + Parameters + ---------- + df : pandas.DataFrame + Input DataFrame containing the data to summarise. + + cols : list-like + List of column names for which summary statistics should be computed. 
+ + best_row : pandas.Series or dict-like, optional + Row containing reference or "best" parameter values. If provided, + the values corresponding to `cols` are included in the output under + the column "best". If None, the "best" column is filled with NaN. + + Returns + ------- + pandas.DataFrame + DataFrame indexed by `cols` containing: + - "mean" : Column-wise mean + - "std" : Column-wise standard deviation + - "relStd [%]" : Relative standard deviation in percent + - "best" : Reference/best values (if provided) + """ + X = df[cols].apply(pd.to_numeric, errors="coerce") + out = pd.DataFrame({"mean": X.mean(), "std": X.std()}) + out["relStd [%]"] = relstd(out["std"], out["mean"]) + if best_row is not None: + out["best"] = pd.to_numeric(best_row[cols], errors="coerce").values + else: + out["best"] = np.nan + return out + + +def plotMorrisConvergence(SA_dfs, r_values, reference_r, k=None, outpath=None, title=None): + """ + Plot convergence of Morris mu_star sensitivities. + + Parameters + ---------- + SA_dfs : list of pandas.DataFrame + Morris result DataFrames. + Must contain columns: 'mu_star', 'Parameter' + r_values : list[int] + Number of trajectories corresponding to SA_dfs. + reference_r : int + Trajectory count r that defines the reference dataset (parameter ranking/order). + k : int or None, optional + Number of top parameters to plot. If None, all parameters are plotted. + outpath : str or pathlib.Path, optional + If given, figure is saved to this path. + title : str, optional + Plot title. + Notes + -------- + - This code was written with AI. 
+ + """ + + # -------------------------------------------------- + # Normalize (in-place safe) + ALIGN BY PARAMETER NAME + # -------------------------------------------------- + SA_dfs_aligned = [] + for df in SA_dfs: + d = df.copy() + if "pct" not in d.columns: + d["pct"] = d["mu_star"] / d["mu_star"].sum() * 100 + # Key fix: align all dataframes by the unique parameter name + d = d.set_index("Parameter") + SA_dfs_aligned.append(d) + + if reference_r not in r_values: + raise ValueError(f"reference_r={reference_r} not found in r_values") + + ref_idx = r_values.index(reference_r) + SA_ref = SA_dfs_aligned[ref_idx] + + # -------------------------------------------------- + # Select parameters (REFERENCE DEFINES ORDER) + # -------------------------------------------------- + if k is None: + top_params = SA_ref.index.tolist() + else: + top_params = SA_ref["pct"].nlargest(k).index.tolist() + + labels = top_params # already parameter names + + # -------------------------------------------------- + # Stack results (REINDEX EACH DF TO REFERENCE ORDER) + # -------------------------------------------------- + Y = np.column_stack([ + d.reindex(top_params)["pct"].to_numpy() + for d in SA_dfs_aligned + ]) + + # -------------------------------------------------- + # Plot + # -------------------------------------------------- + fig, ax = plt.subplots(figsize=(12, 8)) + ax.grid(True, ls="--", lw=0.8, color="gray", alpha=0.6, zorder=0) + + for label, y in zip(labels, Y): + ax.plot( + r_values, + y, + marker="o", + lw=1.5, + label=label + ) + + ax.set_xticks(r_values, [f"r = {r}" for r in r_values]) + ax.set_xlabel("Number of Morris trajectories (r)", fontsize=16) + ax.set_ylabel("Mu* sensitivity (%)", fontsize=16) + + if title is None: + title = "Morris sensitivity convergence" + ax.set_title(title, fontsize=18) + + ax.tick_params(axis="both", labelsize=14) + ax.legend(fontsize=11) + fig.tight_layout() + + if outpath is not None: + fig.savefig(outpath, dpi=300, bbox_inches="tight") 
diff --git a/avaframe/runScripts/runPlotAreaRefDiffs.py b/avaframe/runScripts/runPlotAreaRefDiffs.py index 9afb82ebf..57bbb703f 100644 --- a/avaframe/runScripts/runPlotAreaRefDiffs.py +++ b/avaframe/runScripts/runPlotAreaRefDiffs.py @@ -6,6 +6,8 @@ # importing general python modules import pathlib import numpy as np +import pickle +import re # Local imports from avaframe.in3Utils import cfgUtils @@ -19,100 +21,62 @@ import avaframe.com1DFA.DFAtools as DFAtls -################USER Input############# -resType = "ppr" -thresholdValueSimulation = 0.9 -modName = "com1DFA" -############################################################ - -# Load avalanche directory from general configuration file -cfgMain = cfgUtils.getGeneralConfig() -avalancheDir = cfgMain["MAIN"]["avalancheDir"] -outDir = pathlib.Path(avalancheDir, "Outputs", "out1Peak") -fU.makeADir(outDir) - -# Start logging -logName = "plotAreaDiff_%s" % (resType) -# Start logging -log = logUtils.initiateLogger(avalancheDir, logName) -log.info("MAIN SCRIPT") -log.info("Current avalanche: %s", avalancheDir) - - -# initialize DEM from avalancheDir (used to perform simulations) -# TODO: if meshCellSize was changed - use actual simulation DEM -dem = gI.readDEM(avalancheDir) -# get normal vector of the grid mesh -dem = gT.getNormalMesh(dem, num=1) -# get real Area -dem = DFAtls.getAreaMesh(dem, 1) -dem["originalHeader"] = dem["header"] - -# read reference data set -inDir = pathlib.Path(avalancheDir, "Inputs") -referenceFile, availableFile, _ = gI.getAndCheckInputFiles( - inDir, "REFDATA", "POLY", fileExt="shp", fileSuffix="POLY" -) -# convert polygon to raster with value 1 inside polygon and 0 outside the polygon -referenceLine = shpConv.readLine(referenceFile, "reference", dem) -referenceLine = gT.prepareArea(referenceLine, dem, np.sqrt(2), combine=True, checkOverlap=False) - -# if available zoom into area provided by crop shp file in Inputs/CROPSHAPE -cropFile, cropInfo, _ = gI.getAndCheckInputFiles( - inDir, 
"POLYGONS", "cropFile", fileExt="shp", fileSuffix="_cropshape" -) -if cropInfo: - cropLine = shpConv.readLine(cropFile, "cropFile", dem) - cropLine = gT.prepareArea(cropLine, dem, np.sqrt(2), combine=True, checkOverlap=False) - -if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche", "com8MoTPSA", "com9MoTVoellmy"]: - # load dataFrame for all configurations of simulations in avalancheDir - simDF = cfgUtils.createConfigurationInfo(avalancheDir) - # create data frame that lists all available simulations and path to their result type result files - inputsDF, resTypeList = fU.makeSimFromResDF(avalancheDir, "com1DFA") - # merge parameters as columns to dataDF for matching simNames - dataDF = inputsDF.merge(simDF, left_on="simName", right_on="simName") - - ## loop over all simulations and load desired resType - for index, row in dataDF.iterrows(): - simFile = row[resType] - simData = IOf.readRaster(simFile) - - # compute referenceMask and simulationMask and true positive, false positive and false neg. 
def runPlotAreaRefDiffs(resType, thresholdValueSimulation, modName):
    """Compare simulation peak results to a reference polygon and plot area differences.

    Rasterizes the reference polygon from Inputs/REFDATA, derives TP/FP/FN masks
    against each simulation's result raster and writes comparison plots to
    Outputs/out1Peak. For modules without a configuration DataFrame, a summary of
    the areal indicators is additionally pickled to Outputs/out1Peak/arealIndicators.pkl.

    Parameters
    ----------
    resType : str
        result type to analyse (e.g. 'ppr'); also used to locate peak files
    thresholdValueSimulation : float
        threshold applied to the simulation result raster to define the affected area
    modName : str
        computational module name; decides how result files and configurations are fetched
    """

    # Load avalanche directory from general configuration file
    cfgMain = cfgUtils.getGeneralConfig()
    avalancheDir = cfgMain["MAIN"]["avalancheDir"]
    outDir = pathlib.Path(avalancheDir, "Outputs", "out1Peak")
    fU.makeADir(outDir)

    # Start logging
    logName = "plotAreaDiff_%s" % resType
    log = logUtils.initiateLogger(avalancheDir, logName)
    log.info("MAIN SCRIPT")
    log.info("Current avalanche: %s", avalancheDir)

    # initialize DEM from avalancheDir (used to perform simulations)
    # TODO: if meshCellSize was changed - use actual simulation DEM
    dem = gI.readDEM(avalancheDir)
    # get normal vector of the grid mesh
    dem = gT.getNormalMesh(dem, num=1)
    # get real area of each mesh cell
    dem = DFAtls.getAreaMesh(dem, 1)
    dem["originalHeader"] = dem["header"]

    # read reference data set
    inDir = pathlib.Path(avalancheDir, "Inputs")
    referenceFile, availableFile, _ = gI.getAndCheckInputFiles(
        inDir, "REFDATA", "POLY", fileExt="shp", fileSuffix="POLY"
    )
    # convert polygon to raster with value 1 inside polygon and 0 outside the polygon
    referenceLine = shpConv.readLine(referenceFile, "reference", dem)
    referenceLine = gT.prepareArea(referenceLine, dem, np.sqrt(2), combine=True, checkOverlap=False)

    # if available zoom into area provided by crop shp file in Inputs/POLYGONS
    cropFile, cropInfo, _ = gI.getAndCheckInputFiles(
        inDir, "POLYGONS", "cropFile", fileExt="shp", fileSuffix="_cropshape"
    )
    # FIX: initialize cropLine so computeAreaDiff calls below do not raise a
    # NameError when no crop shapefile is available
    cropLine = None
    if cropInfo:
        cropLine = shpConv.readLine(cropFile, "cropFile", dem)
        cropLine = gT.prepareArea(cropLine, dem, np.sqrt(2), combine=True, checkOverlap=False)
    # NOTE(review): assumes computeAreaDiff treats cropToArea=None as "no cropping" - confirm
    cropArea = cropLine["rasterData"] if cropInfo else None

    if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche"]:
        # load dataFrame for all configurations of simulations in avalancheDir
        simDF = cfgUtils.createConfigurationInfo(avalancheDir)
        # create data frame that lists all available simulations and path to their result type result files
        inputsDF, resTypeList = fU.makeSimFromResDF(avalancheDir, "com1DFA")
        # merge parameters as columns to dataDF for matching simNames
        dataDF = inputsDF.merge(simDF, left_on="simName", right_on="simName")

        # loop over all simulations and load desired resType
        for index, row in dataDF.iterrows():
            simFile = row[resType]
            simData = IOf.readRaster(simFile)

            # compute referenceMask and simulationMask and true positive, false positive
            # and false negative arrays; thresholdValueReference is set to 0.9 as when
            # converting the polygon to a raster, values inside the polygon are 1 and outside 0
            refMask, compMask, indicatorDict = oPD.computeAreaDiff(
                referenceLine["rasterData"],
                simData["rasterData"],
                0.9,
                thresholdValueSimulation,
                dem,
                cropToArea=cropArea,
            )

            # plot differences
            oPD.plotAreaDiff(
                referenceLine["rasterData"],
                refMask,
                simData["rasterData"],
                compMask,
                resType,
                simData["header"],
                thresholdValueSimulation,
                outDir,
                indicatorDict,
                row["simName"],
                cropFile=cropFile,
            )
    else:
        # load all result files directly from the module's peakFiles directory
        resultDir = pathlib.Path(avalancheDir, "Outputs", modName, "peakFiles")
        peakFilesList = list(resultDir.glob("*_%s.tif" % resType)) + list(resultDir.glob("*_%s.asc" % resType))

        allResults = []
        for pF in peakFilesList:
            simData = IOf.readRaster(pF)
            simName = pF.stem

            # compute referenceMask and simulationMask and true positive, false positive
            # and false negative arrays (thresholdValueReference = 0.9, see above)
            refMask, compMask, indicatorDict = oPD.computeAreaDiff(
                referenceLine["rasterData"],
                simData["rasterData"],
                0.9,
                thresholdValueSimulation,
                dem,
                cropToArea=cropArea,
            )

            # plot differences
            oPD.plotAreaDiff(
                referenceLine["rasterData"],
                refMask,
                simData["rasterData"],
                compMask,
                resType,
                simData["header"],
                thresholdValueSimulation,
                outDir,
                indicatorDict,
                simName,
                cropFile=cropFile,
            )

            allResults.append({
                "sim_name": simName,
                "res_type": resType,
                "threshold": thresholdValueSimulation,
                "indicator_dict": indicatorDict,
            })

        # Save summary of TP/FP/FN indicators as pickle
        rows = []
        for entry in allResults:
            # FIX: strip the actual result-type suffix instead of the hard-coded
            # "_ppr", so the summary simName is correct for any resType
            cleanName = re.sub(r"_%s$" % re.escape(entry["res_type"]), "", entry["sim_name"])
            indicators = entry["indicator_dict"]
            row = {"simName": cleanName}

            for key, subdict in indicators.items():
                shortKey = (
                    key.replace("truePositive", "TP_SimRef")
                    .replace("falsePositive", "FP_SimRef")
                    .replace("falseNegative", "FN_SimRef")
                )
                row["%s_cells" % shortKey] = subdict.get("nCells", None)
                row["%s_area" % shortKey] = subdict.get("areaSum", None)

            rows.append(row)

        with open(outDir / "arealIndicators.pkl", "wb") as f:
            pickle.dump(rows, f)


if __name__ == "__main__":
    ################ USER Input #############
    resType = "ppr"
    thresholdValueSimulation = 1
    modName = "com8MoTPSA"

    runPlotAreaRefDiffs(resType, thresholdValueSimulation, modName)