diff --git a/hydromt_fiat/fiat.py b/hydromt_fiat/fiat.py index a4567380..79ce2e46 100644 --- a/hydromt_fiat/fiat.py +++ b/hydromt_fiat/fiat.py @@ -18,12 +18,11 @@ from hydromt_fiat import DATADIR from hydromt_fiat.config import Config from hydromt_fiat.workflows.exposure_vector import ExposureVector -from hydromt_fiat.workflows.hazard import * # TODO: when the hazard module is updated, explicitly mention the functions that are imported +from hydromt_fiat.workflows.hazard import create_lists, check_lists_size, read_maps, check_maps_metadata, check_maps_rp, check_map_uniqueness, create_risk_dataset from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex from hydromt_fiat.workflows.vulnerability import Vulnerability from hydromt_fiat.workflows.aggregation_areas import join_exposure_aggregation_areas - __all__ = ["FiatModel"] _logger = logging.getLogger(__name__) @@ -335,7 +334,8 @@ def setup_hazard( risk_output: bool = False, unit_conversion_factor: float = 1.0, ) -> None: - """Set up hazard maps. This component integrates multiple checks for the maps + """Set up hazard maps. This component integrates multiple checks for the hazard + maps. Parameters ---------- @@ -372,58 +372,24 @@ def setup_hazard( risk_output : bool, optional The parameter that defines if a risk analysis is required, by default False """ - # check parameters types and size, and existance of provided files of maps - - params = check_parameters_type(map_fn, map_type, rp, crs, nodata, var, chunks) - check_parameters_size(params) - check_files(params, self.root) - + # create lists of maps and their parameters to be able to iterate over them + params = create_lists(map_fn, map_type, rp, crs, nodata, var, chunks) + check_lists_size(params) + rp_list = [] map_name_lst = [] - # retrieve maps information from parameters and datacatalog - # load maps in memory and check them and save the with st_map function for idx, da_map_fn in enumerate(params["map_fn_lst"]): - da_map_fn, da_name, da_type = read_floodmaps(params, da_map_fn, idx) - - # load flood maps to memory - # da = load_floodmaps(self.data_catalog, self.region,da_map_fn,da_name,name_catalog) - # reading from path - if isinstance(da_map_fn, Path): - if da_map_fn.stem == "sfincs_map": - sfincs_root = os.path.dirname(da_map_fn) - sfincs_model = SfincsModel( - sfincs_root, mode="r", logger=self.logger - ) - sfincs_model.read_results() - # save sfincs map as GeoTIFF - # result_list = list(sfincs_model.results.keys()) - # sfincs_model.write_raster("results.zsmax", compress="LZW") - da = sfincs_model.results["zsmax"] - # da = da.squeeze('timemax').drop('timemax') - da = da.isel(timemax=0).drop("timemax") - - - else: - if not self.region.empty: - # da = self.data_catalog.get_rasterdataset( - # da_map_fn, geom=self.region - # ) - da = self.data_catalog.get_rasterdataset(da_map_fn) - else: - da = self.data_catalog.get_rasterdataset(da_map_fn) - # Convert to units of the exposure data if required - # reading from the datacatalog - else: - if not self.region.empty: - # da = self.data_catalog.get_rasterdataset( - # name_catalog, variables=da_name, geom=self.region - # ) - da = self.data_catalog.get_rasterdataset(map_fn, variables=da_name) - else: - da = self.data_catalog.get_rasterdataset(map_fn, variables=da_name) - if self.exposure.unit != da.units: - da = da * unit_conversion_factor + # read maps and retrieve their attributes + da_map_fn, da_name, da_type = read_maps(params, da_map_fn, idx) + + da = self.data_catalog.get_rasterdataset(da_map_fn, 
geom=self.region)
+
+            # Convert to units of the exposure data if required
+            if hasattr(self, "exposure") and self.exposure is not None:  # only convert when the unit information is available from the exposure dataset
+                if self.exposure.unit != da.units:
+                    da = da * unit_conversion_factor
+
             da.encoding["_FillValue"] = None
             da = da.raster.gdal_compliant()
@@ -433,84 +399,86 @@ def setup_hazard(
             # check maps return periods
             da_rp = check_maps_rp(params, da, da_name, idx, risk_output)

-            # chek if maps are unique
-            # TODO: check if providing methods like self.get_config can be used
-            # TODO: create a new funtion to check uniqueness trhough files names
-            # check_maps_uniquenes(self.get_config,self.staticmaps,params,da,da_map_fn,da_name,da_type,da_rp,idx)
+            if risk_output and da_map_fn.stem == "sfincs_map":
+                da_name = da_name + f"_{str(da_rp)}"

             post = f"(rp {da_rp})" if risk_output else ""
             self.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}")

             rp_list.append(da_rp)
-
-            # If a risk calculation is required and the map comes from sfincs, they
-            # have the same name so give another name
-            if risk_output and da_map_fn.stem == "sfincs_map":
-                da_name = da_name + f"_{str(da_rp)}"
             map_name_lst.append(da_name)
+
+            da.attrs = {
+                "returnperiod": str(da_rp),
+                "type": da_type,
+                "name": da_name,
+                "analysis": "event",
+            }
+
+            da = da.to_dataset(name=da_name)

             self.set_maps(da, da_name)

         check_map_uniqueness(map_name_lst)

-        # in case risk_output is required maps are put in a netcdf with a raster with
-        # an extra dimension 'rp' accounting for return period
-        # select first risk maps
+
+        # in case of risk analysis, create a single netcdf with one band per return period
         if risk_output:
-            list_keys = list(self.maps.keys())
-            first_map = self.maps[list_keys[0]].rename("risk_datarray")
-            list_keys.pop(0)
-
-            # add additional risk maps
-            for idx, x in enumerate(list_keys):
-                key_name = list_keys[idx]
-                layer = self.maps[key_name]
-                first_map = xr.concat([first_map, layer], dim="rp")
-
-            # convert to a dataset to be able to write attributes when writing the maps
-            # in the ouput folders. 
If datarray is provided attributes will not be - # shown in the output netcdf dataset - da = first_map.to_dataset(name="risk_maps") - da.attrs = { - "returnperiod": list(rp_list), - "type": params["map_type_lst"], - "name": map_name_lst, - "Analysis": "risk", + + da, sorted_rp, sorted_names = create_risk_dataset(params, rp_list, map_name_lst, self.maps) + + self.set_grid(da) + + self.grid.attrs = { + "rp": sorted_rp, + "type": params["map_type_lst"], #TODO: This parameter has to be changed in case that a list with different hazard types per map is provided + "name": sorted_names, + "analysis": "risk", } - # load merged map into self.maps - self.set_maps(da) + list_maps = list(self.maps.keys()) - # erase individual maps from self.maps keeping the merged map - for item in list_maps[:-1]: + for item in list_maps[:]: self.maps.pop(item) - self.set_config("hazard.return_periods", rp_list) + # set configuration .toml file + self.set_config("hazard.return_periods", + str(da_rp) if not risk_output else sorted_rp + ) - # the metadata of the hazard maps is saved in the configuration toml files - # this component was modified to provided the element [0] od the list - # in case multiple maps are required then remove [0] self.set_config( "hazard.file", [ str(Path("hazard") / (hazard_map + ".nc")) - for hazard_map in self.maps.keys() + for hazard_map in self.maps.keys() + ][0] if not risk_output else + [ + str(Path("hazard") / ("risk_map" + ".nc")) ][0], ) self.set_config( "hazard.crs", [ - "EPSG:" + str((self.maps[hazard_map].rio.crs.to_epsg())) + "EPSG:" + str((self.maps[hazard_map].raster.crs.to_epsg())) for hazard_map in self.maps.keys() - ][0], + ][0] if not risk_output else + [ + "EPSG:" + str((self.crs.to_epsg())) + ][0] + , ) self.set_config( - "hazard.elevation_reference", "dem" if da_type == "water_depth" else "datum" + "hazard.elevation_reference", + "dem" if da_type == "water_depth" else "datum" ) # Set the configurations for a multiband netcdf self.set_config( "hazard.settings.subset", - [(self.maps[hazard_map].name) for hazard_map in self.maps.keys()][0], + [ + (self.maps[hazard_map].name) + for hazard_map in self.maps.keys() + ][0] if not risk_output else sorted_rp, ) self.set_config( @@ -742,6 +710,8 @@ def write(self): self.write_config() if self.maps: self.write_maps(fn="hazard/{name}.nc") + if self.grid: + self.write_grid(fn="hazard/risk_map.nc") if self.geoms: self.write_geoms(fn="exposure/{name}.gpkg", driver="GPKG") if self._tables: diff --git a/hydromt_fiat/validation.py b/hydromt_fiat/validation.py index e1c9b20d..85bbfcdb 100644 --- a/hydromt_fiat/validation.py +++ b/hydromt_fiat/validation.py @@ -9,97 +9,6 @@ def check_dir_exist(dir, name=None): f"The directory indicated by the '{name}' parameter does not exist." 
) - -def check_file_exist(root, param_lst, name=None): - root = Path(root) - param_lst = [Path(p) for p in param_lst] - for param_idx, param in enumerate(param_lst): - if isinstance(param, dict): - fn_lst = list(param.values()) - else: - fn_lst = [param] - for fn_idx, fn in enumerate(fn_lst): - if not Path(fn).is_file(): - if root.joinpath(fn).is_file(): - if isinstance(param, dict): - param_lst[param_idx][ - list(param.keys())[fn_idx] - ] = root.joinpath(fn) - else: - param_lst[param_idx] = root.joinpath(fn) - else: - if isinstance(param, dict): - param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn) - else: - param_lst[param_idx] = Path(fn) - try: - if isinstance(param, dict): - assert isinstance( - param_lst[param_idx][list(param.keys())[fn_idx]], Path - ) - else: - assert isinstance(param_lst[param_idx], Path) - except AssertionError: - raise TypeError( - f"The file indicated by the '{name}' parameter does not" - f" exist in the directory '{root}'." - ) - - -# TODO: Improve this tool without calling model.get_congif(input_dir) -# def check_file_exist(get_config, root, param_lst, name=None, input_dir=None): -# root = Path(root) -# param_lst = [Path(p) for p in param_lst] -# for param_idx, param in enumerate(param_lst): -# if isinstance(param, dict): -# fn_lst = list(param.values()) -# else: -# fn_lst = [param] -# for fn_idx, fn in enumerate(fn_lst): -# if not Path(fn).is_file(): -# if root.joinpath(fn).is_file(): -# if isinstance(param, dict): -# param_lst[param_idx][ -# list(param.keys())[fn_idx] -# ] = root.joinpath(fn) -# else: -# param_lst[param_idx] = root.joinpath(fn) -# if input_dir is not None: -# if get_config(input_dir).joinpath(fn).is_file(): -# if isinstance(param, dict): -# param_lst[param_idx][ -# list(param.keys())[fn_idx] -# ] = get_config(input_dir).joinpath(fn) -# else: -# param_lst[param_idx] = get_config( -# input_dir -# ).joinpath(fn) -# else: -# if isinstance(param, dict): -# param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn) -# else: -# param_lst[param_idx] = Path(fn) -# try: -# if isinstance(param, dict): -# assert isinstance( -# param_lst[param_idx][list(param.keys())[fn_idx]], Path -# ) -# else: -# assert isinstance(param_lst[param_idx], Path) -# except AssertionError: -# if input_dir is None: -# raise TypeError( -# f"The file indicated by the '{name}' parameter does not" -# f" exist in the directory '{root}'." -# ) -# else: -# raise TypeError( -# f"The file indicated by the '{name}' parameter does not" -# f" exist in either of the directories '{root}' or " -# f"'{get_config(input_dir)}'." -# ) - - def check_uniqueness(map_name_lst): def check_duplicates(lst): unique_elements = set() @@ -114,51 +23,6 @@ def check_duplicates(lst): if check: raise ValueError(f"The filenames of the hazard maps should be unique.") - -# TODO: Improve this tool without calling model. Just checking the maps names -# def check_uniqueness(model, *args, file_type=None, filename=None): -# """ """ - -# args = list(args) -# if len(args) == 1 and "." in args[0]: -# args = args[0].split(".") + args[1:] -# branch = args.pop(-1) -# for key in args[::-1]: -# branch = {key: branch} - -# if model.get_config(args[0], args[1]): -# for key in model.staticmaps.data_vars: -# if filename == key: -# raise ValueError( -# f"The filenames of the {file_type} maps should be unique." 
-# ) -# if ( -# model.get_config(args[0], args[1], key) -# == list(branch[args[0]][args[1]].values())[0] -# ): -# raise ValueError(f"Each model input layers must be unique.") - - -def check_param_type(param, name=None, types=None): - """ """ - - if not isinstance(param, list): - raise TypeError( - f"The '{name}_lst' parameter should be a of {list}, received a " - f"{type(param)} instead." - ) - for i in param: - if not isinstance(i, types): - if isinstance(types, tuple): - types = " or ".join([str(j) for j in types]) - else: - types = types - raise TypeError( - f"The '{name}' parameter should be a of {types}, received a " - f"{type(i)} instead." - ) - - def get_param(param_lst, map_fn_lst, file_type, filename, i, param_name): """ """ diff --git a/hydromt_fiat/workflows/hazard.py b/hydromt_fiat/workflows/hazard.py index b814c647..39a362d9 100644 --- a/hydromt_fiat/workflows/hazard.py +++ b/hydromt_fiat/workflows/hazard.py @@ -14,7 +14,7 @@ import geopandas as gpd -def check_parameters_type( +def create_lists( map_fn: Union[str, Path, list[str], list[Path]], map_type: Union[str, list[str]], rp: Union[int, list[int], None] = None, @@ -23,7 +23,7 @@ def check_parameters_type( var: Union[str, list[str], None] = None, chunks: Union[int, str, list[int]] = "auto", ) -> dict: - """Check data type of parameters and save them as list items. + """Make list out of the parameters provided in the setup hazard maps. Parameters ---------- @@ -71,28 +71,28 @@ def check_parameters_type( params["nodata"] = nodata params["var"] = var - def validate_type(dictionary, param, name, types): - params_lst = [param] if isinstance(param, types) else param - check_param_type(params_lst, name=name, types=types) - dictionary[name + "_lst"] = params_lst + def check_list(param, name): + params_lst = [param] if not isinstance(param, list) else param + params[name + "_lst"] = params_lst return + + check_list(map_fn, name="map_fn") + check_list(map_type, name="map_type") - validate_type(params, map_fn, name="map_fn", types=(str, Path)) - validate_type(params, map_type, name="map_type", types=str) if chunks != "auto": - validate_type(params, chunks, name="chunks", types=(int, dict)) + check_list(chunks, name="chunks") if rp is not None: - validate_type(params, rp, name="rp", types=(float, int)) + check_list(rp, name="rp") if crs is not None: - validate_type(params, crs, name="crs", types=(int, str)) + check_list(crs, name="crs") if nodata is not None: - validate_type(params, nodata, name="nodata", types=(float, int)) + check_list(nodata, name="nodata") if var is not None: - validate_type(params, var, name="var", types=str) + check_list(var, name="var") return params -def check_parameters_size( +def check_lists_size( params: dict, ): """Check that list of parameters are of the same size in case multiple maps are @@ -170,29 +170,7 @@ def error_message(variable_list): ): error_message("var") - -def check_files( - params: dict, - root: str, -): - """Check if the provided files paths exists. I will raise an error in case the - flood maps does not exist - - Parameters - ---------- - params : dict - Dictionary with the parameters and list of parameters used in setup_hazard. - root : str - The root directory of the model. - """ - # load dictionary variables - map_fn_lst = params["map_fn_lst"] - - # Check if the hazard input files exist. 
- check_file_exist(root, param_lst=map_fn_lst, name="map_fn") - - -def read_floodmaps( +def read_maps( params: dict, da_map_fn: str, idx: int, @@ -230,24 +208,36 @@ def read_floodmaps( map_fn_lst = params["map_fn_lst"] map_type_lst = params["map_type_lst"] - # Check if it is a path or a name from the catalog + # check existance of path if os.path.exists(da_map_fn): da_map_fn = Path(da_map_fn) - da_name = da_map_fn.stem + da_name = da_map_fn.stem da_suffix = da_map_fn.suffix else: - da_name = da_map_fn + raise ValueError( + f"The map {da_map_fn} could not be found." + ) - da_type = get_param(map_type_lst, map_fn_lst, "hazard", da_name, idx, "map type") + # retrieve data type + da_type = get_param( + map_type_lst, + map_fn_lst, + "hazard", + da_name, + idx, + "map type" + ) - # Get the local hazard map. + # get chuck area for the map kwargs.update(chunks=chunks if chunks == "auto" else params["chunks_lst"][idx]) - if "da_suffix" in locals() and da_suffix == ".nc": + # check if we are providing a NetCDF file + if da_suffix == ".nc": if var is None: raise ValueError( "The 'var' parameter is required when reading NetCDF data." ) + # retrieve variable name from parameter lists da_var = get_param( params["var_lst"], map_fn_lst, @@ -261,120 +251,6 @@ def read_floodmaps( return da_map_fn, da_name, da_type -def load_floodmaps( - data_catalog: DataCatalog, - region: gpd.GeoDataFrame, - da_map_fn: Union[str, Path], - da_name: str, - name_catalog: str = "flood_maps", - **kwargs, -) -> xr.DataArray: - """Load flood maps in memory from datacatalog or a local path - - Parameters - ---------- - data_catalog : DataCatalog - Data catalog object from model. - region : gpd.GeoDataFrame - Region of the model. - da_map_fn : Union[str, Path] - Path as string or key name in datacatalog of the hazard a specific hazard - map idx. - da_name : str - File name of a specific hazard map. - name_catalog : str, optional - Name of data catalog item to take the flood maps from, by default "flood_maps". - - Returns - ------- - xr.DataArray - Hazard map to be loaded to the model's maps - """ - - # reading from path - if da_map_fn.stem: - if da_map_fn.stem == "sfincs_map": - sfincs_root = os.path.dirname(da_map_fn) - sfincs_model = SfincsModel(sfincs_root, mode="r") - sfincs_model.read_results() - # result_list = list(sfincs_model.results.keys()) - # sfincs_model.write_raster("results.zsmax", compress="LZW") - da = sfincs_model.results["zsmax"] - da.encoding["_FillValue"] = None - else: - if not region.empty: - da = data_catalog.get_rasterdataset(da_map_fn, geom=region, **kwargs) - else: - da = data_catalog.get_rasterdataset(da_map_fn, **kwargs) - # reading from the datacatalog - else: - if not region.empty: - da = data_catalog.get_rasterdataset( - name_catalog, variables=da_name, geom=region - ) - else: - da = data_catalog.get_rasterdataset(name_catalog, variables=da_name) - - return da - - -# def load_floodmaps( -# data_catalog: DataCatalog, -# region: gpd.GeoDataFrame, -# da_map_fn: Union[str, Path], -# da_name: str, -# name_catalog: str = "flood_maps", -# **kwargs, -# ) -> xr.DataArray: -# """Load flood maps in memory from datacatalog or a local path - -# Parameters -# ---------- -# data_catalog : DataCatalog -# Data catalog object from model. -# region : gpd.GeoDataFrame -# Region of the model. -# da_map_fn : Union[str, Path] -# Path as string or key name in datacatalog of the hazard a specific hazard -# map idx. -# da_name : str -# File name of a specific hazard map. 
-# name_catalog : str, optional -# Name of data catalog item to take the flood maps from, by default "flood_maps". - -# Returns -# ------- -# xr.DataArray -# Hazard map to be loaded to the model's maps -# """ - -# # reading from path -# if da_map_fn.stem: -# if da_map_fn.stem == "sfincs_map": -# sfincs_root = os.path.dirname(da_map_fn) -# sfincs_model = SfincsModel(sfincs_root, mode="r") -# sfincs_model.read_results() -# # result_list = list(sfincs_model.results.keys()) -# # sfincs_model.write_raster("results.zsmax", compress="LZW") -# da = sfincs_model.results["zsmax"] -# da.encoding["_FillValue"] = None -# else: -# if not region.empty: -# da = data_catalog.get_rasterdataset(da_map_fn, geom=region, **kwargs) -# else: -# da = data_catalog.get_rasterdataset(da_map_fn, **kwargs) -# # reading from the datacatalog -# else: -# if not region.empty: -# da = data_catalog.get_rasterdataset( -# name_catalog, variables=da_name, geom=region -# ) -# else: -# da = data_catalog.get_rasterdataset(name_catalog, variables=da_name) - -# return da - - def check_maps_metadata( maps: xr.Dataset, params: dict, @@ -429,7 +305,12 @@ def check_maps_metadata( # Set nodata and mask the nodata value. if nodata is not None: da_nodata = get_param( - params["nodata_lst"], map_fn_lst, "hazard", da_name, idx, "nodata" + params["nodata_lst"], + map_fn_lst, + "hazard", + da_name, + idx, + "nodata" ) da.raster.set_nodata(nodata=da_nodata) elif nodata is None and da.raster.nodata is None: @@ -499,11 +380,8 @@ def check_maps_rp( else: da_rp = None - if risk_output: - da = da.expand_dims({"rp": [da_rp]}, axis=0) - if risk_output and da_rp is None: - # Get (if possible) the return period from dataset names if the input parameter is None. + # get (if possible) the return period from dataset names if the input parameter is None. if "rp" in da_name.lower(): def fstrip(x): @@ -527,7 +405,7 @@ def fstrip(x): raise ValueError( "The hazard map must contain a return period in order to conduct a risk calculation." ) - + return da_rp @@ -544,156 +422,28 @@ def check_map_uniqueness( check_uniqueness(map_name_lst) -# old version of check_maps_uniquenes -# def check_maps_uniquenes( -# get_config, -# maps: xr.Dataset, -# params: dict, -# da: xr.DataArray, -# da_map_fn: Union[str, Path], -# da_name: str, -# da_type: str, -# da_rp: Union[int, float], -# idx: int, -# ): - -# chunks = params['chunks'] - -# # Add the hazard map to config and staticmaps. -# check_uniqueness( -# get_config, -# maps, -# "hazard", -# da_type, -# da_name, -# { -# "usage": True, -# "map_fn": da_map_fn, -# "map_type": da_type, -# "rp": da_rp, -# "crs": da.raster.crs, -# "nodata": da.raster.nodata, -# # "var": None if "var_lst" not in locals() else self.var_lst[idx], -# "var": None if not 'var_lst' in params else params['var_lst'][idx], -# "chunks": "auto" if chunks == "auto" else params['chunks_lst'][idx], -# }, -# file_type="hazard", -# filename=da_name, -# ) - - -def check_floodmaps( - get_config, - maps, - params, - da, - da_map_fn, - da_name, - da_type, - idx, - risk_output, - **kwargs, -): - map_fn_lst = params["map_fn_lst"] - chunks = params["chunks"] - crs = params["crs"] - nodata = params["nodata"] - - # Set the coordinate reference system. 
- if crs is not None: - da_crs = get_param( - params["crs_lst"], - map_fn_lst, - "hazard", - da_name, - idx, - "coordinate reference system", - ) - da_crs_str = da_crs if "EPSG" in da_crs else f"EPSG:{da_crs}" - da.raster.set_crs(da_crs_str) - elif crs is None and not da.raster.crs: - raise ValueError("The hazard map has no coordinate reference system assigned.") - - # Set nodata and mask the nodata value. - if nodata is not None: - da_nodata = get_param( - params["nodata_lst"], map_fn_lst, "hazard", da_name, idx, "nodata" - ) - da.raster.set_nodata(nodata=da_nodata) - elif nodata is None and da.raster.nodata is None: - raise ValueError("The hazard map has no nodata value assigned.") - - # Correct (if necessary) the grid orientation from the lower to the upper left corner. - # This check could not be implemented into the sfincs_map outputs. They require to be transformed to geotiff first - # if da_name != "sfincs_map": - if da.raster.res[1] > 0: - da = da.reindex({da.raster.y_dim: list(reversed(da.raster.ycoords))}) - - # Check if the obtained hazard map is identical. - if maps and not maps.raster.identical_grid(da): - raise ValueError("The hazard maps should have identical grids.") - # Get the return period input parameter. - if "rp_lst" in params: - da_rp = get_param( - params["rp_lst"], - map_fn_lst, - "hazard", - da_name, - idx, - "return period", - ) - else: - da_rp = None - - if risk_output: - da = da.expand_dims({"rp": [da_rp]}, axis=0) - - if risk_output and da_rp is None: - # Get (if possible) the return period from dataset names if the input parameter is None. - if "rp" in da_name.lower(): - - def fstrip(x): - return x in "0123456789." - - rp_str = "".join(filter(fstrip, da_name.lower().split("rp")[-1])).lstrip( - "0" - ) - - try: - assert isinstance( - literal_eval(rp_str) if rp_str else None, (int, float) - ) - da_rp = literal_eval(rp_str) - - except AssertionError: - raise ValueError( - f"Could not derive the return period for hazard map: {da_name}." - ) - else: - raise ValueError( - "The hazard map must contain a return period in order to conduct a risk calculation." - ) - - # Add the hazard map to config and staticmaps. 
- check_uniqueness( - get_config, +def create_risk_dataset( + params: dict, + rp_list: list, + map_name_lst: list, maps, - "hazard", - da_type, - da_name, - { - "usage": True, - "map_fn": da_map_fn, - "map_type": da_type, - "rp": da_rp, - "crs": da.raster.crs, - "nodata": da.raster.nodata, - # "var": None if "var_lst" not in locals() else self.var_lst[idx], - "var": None if "var_lst" not in params else params["var_lst"][idx], - "chunks": "auto" if chunks == "auto" else params["chunks_lst"][idx], - }, - file_type="hazard", - filename=da_name, - ) - return da_rp +): + # order return periods and maps + dict_rp_name = {} + for rp, name in zip(rp_list, map_name_lst): + dict_rp_name[rp] = name + sorted_rp = sorted(rp_list, reverse=False) + dict_rp_name = {key: dict_rp_name[key] for key in sorted_rp} + + sorted_maps = [] + sorted_names = [] + + for key, value in dict_rp_name.items(): + map_ordered = maps[value].rename(str(key)) + sorted_maps.append(map_ordered) + sorted_names.append(value) + + da = xr.merge(sorted_maps) + + return da, sorted_rp, sorted_names \ No newline at end of file diff --git a/tests/test_integrations_hazard.py b/tests/test_integrations_hazard.py index 53af79f6..72d931cf 100644 --- a/tests/test_integrations_hazard.py +++ b/tests/test_integrations_hazard.py @@ -1,110 +1,144 @@ from hydromt_fiat.fiat import FiatModel -from hydromt.config import configread from hydromt.log import setuplog from pathlib import Path import pytest import shutil -import os EXAMPLEDIR = Path( "P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database" ) _cases = { - "integration": { + "event_map": { "data_catalogue": EXAMPLEDIR / "fiat_catalog.yml", - "dir": "test_hazard", - "ini": EXAMPLEDIR / "test_hazard_unique.ini", + "dir": "test_event_map", + "configuration": { + "setup_hazard": { + "map_fn": [ + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\RP_1_maps.nc", + ], + "map_type": "water_depth", + "rp": None, + "crs": None, + "nodata": -99999, + "var": "zsmax", + "risk_output": False, + } + } }, -} - - -@pytest.mark.skip(reason="Hazard functions not yet finalized") -@pytest.mark.parametrize("case", list(_cases.keys())) -def test_hazard(case): - # Read model in examples folder. 
- root = EXAMPLEDIR.joinpath(_cases[case]["dir"]) - if root.exists: - shutil.rmtree(root) - - # uncomment to test event analysis from geotiff file - configuration = { + "risk_map": { + "data_catalogue": EXAMPLEDIR / "fiat_catalog.yml", + "dir": "test_risk_map", + "configuration": { "setup_hazard": { "map_fn": [ - "P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database/data/Hazard/Current_prob_event_set_combined_doNothing_withSeaWall_RP=1_max_flood_depth.tif" + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\RP_1_maps.nc", + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\RP_50_maps.nc", + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\RP_10_maps.nc", + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\RP_100_maps.nc", ], "map_type": "water_depth", "rp": None, "crs": None, "nodata": -99999, + "var": "zsmax", + "risk_output": True, + } + } + }, + + "event_map_geotiffs": { + "data_catalogue": EXAMPLEDIR / "fiat_catalog.yml", + "dir": "test_event_map_geotiffs", + "configuration": { + "setup_hazard": { + "map_fn": [ + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\Swell_Majuro_case_SW_slr_100_RP1_Final.tif", + ], + "map_type": "water_depth", + "rp": None, + "crs": None, + "nodata": None, "var": None, - "chunks": "auto", - "name_catalog": None, "risk_output": False, } } + }, - # uncomment to test risk analysis from geotiff file - # configuration = { - # "setup_hazard": { - # "map_fn": ["P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database/data/Hazard/Current_prob_event_set_combined_doNothing_withSeaWall_RP=1_max_flood_depth.tif", "P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database/data/Hazard/Current_prob_event_set_combined_doNothing_withSeaWall_RP=2_max_flood_depth.tif", "P:/11207949-dhs-phaseii-floodadapt/Model-builder/Delft-FIAT/local_test_database/data/Hazard/Current_prob_event_set_combined_doNothing_withSeaWall_RP=5_max_flood_depth.tif"], - # "map_type": "water_depth", - # "rp": None, - # "crs": None, - # "nodata": -99999, - # "var": None, - # "chunks": "auto", - # "name_catalog": None, - # "risk_output": True, - # } - # } - - # for these test data sfincs output data is required in local files - # uncomment to test event analysis from sfincs output - # mode = "single" - # map_path = Path("C:/Users/fuentesm/CISNE/Deltares/FloodAdapt/tests/test_database/charleston/output/simulations/current_extreme12ft_no_measures/overland") - - # uncomment to test risk analysis from sfincs outputs - # mode = "risk" - # map_path = Path("C:/Users/fuentesm/CISNE/Deltares/FloodAdapt/tests/test_database/charleston/output/simulations/current_test_set_no_measures/") + "risk_map_geotiffs": { + "data_catalogue": EXAMPLEDIR / "fiat_catalog.yml", + "dir": "test_risk_map_geotiffs", + "configuration": { + "setup_hazard": { + "map_fn": [ + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\Swell_Majuro_case_SW_slr_100_RP1_Final.tif", + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\Swell_Majuro_case_SW_slr_100_RP10_Final.tif", + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\Swell_Majuro_case_SW_slr_100_RP50_Final.tif", + ], + 
"map_type": "water_depth", + "rp": None, + "crs": None, + "nodata": None, + "var": None, + "risk_output": True, + } + } + }, - # map_fn = [] + "event_map_geotiff2": { + "data_catalogue": EXAMPLEDIR / "fiat_catalog.yml", + "dir": "test_event_map_geotiff2", + "configuration": { + "setup_hazard": { + "map_fn": [ + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\kingTide_SLR_max_flood_depth.tif", + ], + "map_type": "water_depth", + "rp": None, + "crs": None, + "nodata": None, + "var": None, + "risk_output": False, + } + } + }, - # if mode == "risk": - # # check for netcdf - # for file in os.listdir(str(map_path)): - # if file.endswith("_maps.nc"): - # map_fn.append(map_path.joinpath(file)) - # risk_output = True - # var = "risk_map" + "risk_map_geotiff2": { + "data_catalogue": EXAMPLEDIR / "fiat_catalog.yml", + "dir": "test_risk_map_geotiff2", + "configuration": { + "setup_hazard": { + "map_fn": [ + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\Current_prob_event_set_combined_doNothing_withSeaWall_RP=2_max_flood_depth.tif", + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\Current_prob_event_set_combined_doNothing_withSeaWall_RP=10_max_flood_depth.tif", + r"P:\11207949-dhs-phaseii-floodadapt\Model-builder\Delft-FIAT\local_test_database\test_RP_floodmaps\Current_prob_event_set_combined_doNothing_withSeaWall_RP=100_max_flood_depth.tif", + ], + "map_type": "water_depth", + "rp": None, + "crs": None, + "nodata": None, + "var": None, + "risk_output": True, + } + } + }, +} - # elif mode == "single": - # map_fn.append(map_path.joinpath("sfincs_map.nc")) - # risk_output = False - # var = "zsmax" - # configuration = { - # "setup_hazard": { - # "map_fn": map_fn, # absolute or relative (with respect to the configuration.ini) path to the hazard file - # "map_type": "water_depth", # description of the hazard file type - # "rp": None, # hazard return period in years, required for a risk calculation (optional) - # "crs": None, # coordinate reference system of the hazard file (optional) - # "nodata": -999, # value that is assigned as nodata (optional) - # "var": var, # hazard variable name in NetCDF input files (optional) - # "chunks": "auto", # chunk sizes along each dimension used to load the hazard file into a dask array (default is 'auto') (optional) - # "name_catalog": None, - # "risk_output": risk_output, - # } - # } +@pytest.mark.parametrize("case", list(_cases.keys())) +def test_hazard(case): + # Read model in examples folder. + root = EXAMPLEDIR.joinpath(_cases[case]["dir"]) + if root.exists(): + shutil.rmtree(root) logger = setuplog("hydromt_fiat", log_level=10) data_catalog_yml = str(_cases[case]["data_catalogue"]) fm = FiatModel(root=root, mode="w", data_libs=[data_catalog_yml], logger=logger) region = fm.data_catalog.get_geodataframe("region", variables=None) - # opt = configread(_cases[case]["ini"]) - # fm.build(region={"geom": region}, opt=opt) - fm.build(region={"geom": region}, opt=configuration) + + fm.build(region={"geom": region}, opt=_cases[case]["configuration"]) fm.write() # Check if the hazard folder exists