
feat(object-model): Add support for user-provided NetCDF forcing files #626

Open
wants to merge 8 commits into main
42 changes: 31 additions & 11 deletions flood_adapt/adapter/sfincs_adapter.py
@@ -36,6 +36,7 @@
RainfallConstant,
RainfallCSV,
RainfallMeteo,
RainfallNetCDF,
RainfallSynthetic,
RainfallTrack,
)
@@ -52,6 +53,7 @@
from flood_adapt.object_model.hazard.forcing.wind import (
WindConstant,
WindMeteo,
WindNetCDF,
WindSynthetic,
WindTrack,
)
@@ -339,6 +341,10 @@ def add_projection(self, projection: IProjection):
)

### GETTERS ###
def get_model_time(self) -> TimeModel:
t0, t1 = self._model.get_model_time()
return TimeModel(start_time=t0, end_time=t1)

def get_model_root(self) -> Path:
return Path(self._model.root)

@@ -909,8 +915,7 @@ def _add_forcing_wind(
const_dir : float, optional
direction of time-invariant wind forcing [deg], by default None
"""
t0, t1 = self._model.get_model_time()

time_frame = self.get_model_time()
if isinstance(wind, WindConstant):
# HydroMT function: set wind forcing from constant magnitude and direction
self._model.setup_wind_forcing(
@@ -919,7 +924,7 @@
direction=wind.direction.value,
)
elif isinstance(wind, WindSynthetic):
df = wind.to_dataframe(time_frame=TimeModel(start_time=t0, end_time=t1))
df = wind.to_dataframe(time_frame=time_frame)
df["mag"] *= us.UnitfulVelocity(
value=1.0, units=Settings().unit_system.velocity
).convert(us.UnitTypesVelocity.mps)
@@ -932,7 +937,7 @@
timeseries=tmp_path, magnitude=None, direction=None
)
elif isinstance(wind, WindMeteo):
ds = MeteoHandler().read(TimeModel(start_time=t0, end_time=t1))
ds = MeteoHandler().read(time_frame)
# data already in metric units so no conversion needed

# HydroMT function: set wind forcing from grid
@@ -942,6 +947,14 @@
raise ValueError("No path to rainfall track file provided.")
# data already in metric units so no conversion needed
self._add_forcing_spw(wind.path)
elif isinstance(wind, WindNetCDF):
ds = wind.read()
# Slicing to time_frame is not needed; hydromt-sfincs handles it
conversion = us.UnitfulVelocity(value=1.0, units=wind.unit).convert(
us.UnitTypesVelocity.mps
)
ds["wind10_u"] *= conversion  # scale only the wind components; press_msl is not a velocity
ds["wind10_v"] *= conversion
self._model.setup_wind_forcing_from_grid(wind=ds)
else:
self.logger.warning(
f"Unsupported wind forcing type: {wind.__class__.__name__}"
@@ -958,8 +971,7 @@ def _add_forcing_rain(self, rainfall: IRainfall):
const_intensity : float, optional
time-invariant precipitation intensity [mm_hr], by default None
"""
t0, t1 = self._model.get_model_time()
time_frame = TimeModel(start_time=t0, end_time=t1)
time_frame = self.get_model_time()
if isinstance(rainfall, RainfallConstant):
self._model.setup_precip_forcing(
timeseries=None,
@@ -984,13 +996,22 @@ def _add_forcing_rain(self, rainfall: IRainfall):
self._model.setup_precip_forcing(timeseries=tmp_path)
elif isinstance(rainfall, RainfallMeteo):
ds = MeteoHandler().read(time_frame)
# data already in metric units so no conversion needed
# MeteoHandler always returns metric units, so no conversion is needed
ds["precip"] *= self._current_scenario.event.attrs.rainfall_multiplier
self._model.setup_precip_forcing_from_grid(precip=ds, aggregate=False)
elif isinstance(rainfall, RainfallTrack):
if rainfall.path is None:
raise ValueError("No path to rainfall track file provided.")
# data already in metric units so no conversion needed
self._add_forcing_spw(rainfall.path)
elif isinstance(rainfall, RainfallNetCDF):
ds = rainfall.read()
# Slicing to time_frame is not needed; hydromt-sfincs handles it
conversion = us.UnitfulIntensity(value=1.0, units=rainfall.unit).convert(
us.UnitTypesIntensity.mm_hr
)
ds *= self._current_scenario.event.attrs.rainfall_multiplier * conversion
self._model.setup_precip_forcing_from_grid(precip=ds, aggregate=False)
else:
self.logger.warning(
f"Unsupported rainfall forcing type: {rainfall.__class__.__name__}"
@@ -1015,8 +1036,7 @@ def _add_forcing_discharge(self, forcing: IDischarge):
)

def _add_forcing_waterlevels(self, forcing: IWaterlevel):
t0, t1 = self._model.get_model_time()
time_frame = TimeModel(start_time=t0, end_time=t1)
time_frame = self.get_model_time()
if isinstance(forcing, WaterlevelSynthetic):
df_ts = forcing.to_dataframe(time_frame=time_frame)
conversion = us.UnitfulLength(
@@ -1222,8 +1242,8 @@ def _set_single_river_forcing(self, discharge: IDischarge):
return

self.logger.info(f"Setting discharge forcing for river: {discharge.river.name}")
t0, t1 = self._model.get_model_time()
time_frame = TimeModel(start_time=t0, end_time=t1)

time_frame = self.get_model_time()
model_rivers = self._read_river_locations()

# Check that the river is defined in the model and that the coordinates match
11 changes: 5 additions & 6 deletions flood_adapt/dbs_classes/database.py
@@ -7,11 +7,11 @@
import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from cht_cyclones.tropical_cyclone import TropicalCyclone
from geopandas import GeoDataFrame
from plotly.express import line
from plotly.express.colors import sample_colorscale
from xarray import open_dataarray, open_dataset

from flood_adapt.dbs_classes.dbs_benefit import DbsBenefit
from flood_adapt.dbs_classes.dbs_event import DbsEvent
@@ -484,17 +484,16 @@ def get_max_water_level(
"Flooding",
"max_water_level_map.nc",
)
map = open_dataarray(map_path)

zsmax = map.to_numpy()

with xr.open_dataarray(map_path) as map:
zsmax = map.to_numpy()
else:
file_path = self.scenarios.output_path.joinpath(
scenario_name,
"Flooding",
f"RP_{return_period:04d}_maps.nc",
)
zsmax = open_dataset(file_path)["risk_map"][:, :].to_numpy().T
with xr.open_dataset(file_path) as ds:
zsmax = ds["risk_map"][:, :].to_numpy().T
return zsmax

def get_fiat_footprints(self, scenario_name: str) -> GeoDataFrame:
2 changes: 1 addition & 1 deletion flood_adapt/object_model/hazard/forcing/discharge.py
@@ -58,7 +58,7 @@ def to_dataframe(self, time_frame: TimeModel) -> pd.DataFrame:

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir)
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
40 changes: 40 additions & 0 deletions flood_adapt/object_model/hazard/forcing/netcdf.py
@@ -0,0 +1,40 @@
import numpy as np
import pandas as pd
import xarray as xr


def validate_netcdf_forcing(
ds: xr.Dataset, required_vars: tuple[str, ...], required_coords: tuple[str, ...]
) -> xr.Dataset:
"""Validate a forcing dataset by checking for required variables and coordinates."""
# Check variables
_required_vars = set(required_vars)
if not _required_vars.issubset(ds.data_vars):
missing_vars = _required_vars - set(ds.data_vars)
raise ValueError(
f"Missing required variables for netcdf forcing: {missing_vars}"
)

# Check coordinates
_required_coords = set(required_coords)
if not _required_coords.issubset(ds.coords):
missing_coords = _required_coords - set(ds.coords)
raise ValueError(
f"Missing required coordinates for netcdf forcing: {missing_coords}"
)

# Check time step
ts = pd.to_timedelta(np.diff(ds.time).mean())
if ts < pd.to_timedelta("1H"):
raise ValueError(
f"SFINCS NetCDF forcing time step cannot be less than 1 hour: {ts}"
)

for var in ds.data_vars:
# Check order of dimensions
if ds[var].dims != required_coords:
raise ValueError(
f"Order of dimensions for variable {var} must be {required_coords}"
)
return ds
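
For context, a user-supplied file only has to satisfy the checks above. Below is a minimal sketch (not part of this PR) of a precipitation file that would pass validate_netcdf_forcing; the grid, values, and file name are illustrative, and the only hard requirements are the variable and coordinate names, the ("time", "lat", "lon") dimension order, and a time step of at least 1 hour.

import numpy as np
import pandas as pd
import xarray as xr

times = pd.date_range("2025-01-01", periods=24, freq=pd.Timedelta(hours=1))  # hourly step satisfies the >= 1 hour check
lat = np.linspace(32.0, 33.0, 5)
lon = np.linspace(-80.0, -79.0, 5)

ds = xr.Dataset(
    data_vars={
        # dimension order must match required_coords exactly
        "precip": (("time", "lat", "lon"), np.zeros((len(times), len(lat), len(lon)))),
    },
    coords={"time": times, "lat": lat, "lon": lon},
)
ds.to_netcdf("rainfall_forcing.nc")  # hypothetical file name
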
30 changes: 28 additions & 2 deletions flood_adapt/object_model/hazard/forcing/rainfall.py
@@ -4,8 +4,10 @@
from typing import Optional

import pandas as pd
import xarray as xr
from pydantic import Field

from flood_adapt.object_model.hazard.forcing.netcdf import validate_netcdf_forcing
from flood_adapt.object_model.hazard.forcing.timeseries import (
CSVTimeseries,
SyntheticTimeseries,
@@ -60,7 +62,7 @@ class RainfallTrack(IRainfall):

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir)
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
@@ -81,9 +83,33 @@ def to_dataframe(self, time_frame: TimeModel) -> pd.DataFrame:

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir)
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(self.path, output_dir)
self.path = output_dir / self.path.name


class RainfallNetCDF(IRainfall):
source: ForcingSource = ForcingSource.NETCDF
unit: us.UnitTypesIntensity = us.UnitTypesIntensity.mm_hr

path: Path

def read(self) -> xr.Dataset:
required_vars = ("precip",)
required_coords = ("time", "lat", "lon")
with xr.open_dataset(self.path) as ds:
validated_ds = validate_netcdf_forcing(ds, required_vars, required_coords)
return validated_ds

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(self.path, output_dir)
self.path = output_dir / self.path.name
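
A hedged usage sketch of the new class, assuming only path needs to be supplied and reusing the hypothetical rainfall_forcing.nc from the netcdf.py example above; the default unit is mm/hr, and the SFINCS adapter applies the unit conversion and the event's rainfall multiplier before calling setup_precip_forcing_from_grid.

from pathlib import Path

from flood_adapt.object_model.hazard.forcing.rainfall import RainfallNetCDF

forcing = RainfallNetCDF(path=Path("rainfall_forcing.nc"))  # hypothetical user file
ds = forcing.read()  # opens the file and runs validate_netcdf_forcing
forcing.save_additional(output_dir="some_event_dir")  # copies the file and repoints self.path (directory name is illustrative)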

2 changes: 1 addition & 1 deletion flood_adapt/object_model/hazard/forcing/waterlevels.py
@@ -103,7 +103,7 @@ def to_dataframe(self, time_frame: TimeModel) -> pd.DataFrame:

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir)
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
30 changes: 28 additions & 2 deletions flood_adapt/object_model/hazard/forcing/wind.py
@@ -4,8 +4,10 @@
from typing import Optional

import pandas as pd
import xarray as xr
from pydantic import Field

from flood_adapt.object_model.hazard.forcing.netcdf import validate_netcdf_forcing
from flood_adapt.object_model.hazard.forcing.timeseries import SyntheticTimeseries
from flood_adapt.object_model.hazard.interface.forcing import (
ForcingSource,
@@ -75,7 +77,7 @@ class WindTrack(IWind):

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir)
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
@@ -94,7 +96,7 @@ def to_dataframe(self, time_frame: TimeModel) -> pd.DataFrame:

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir)
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
@@ -104,3 +106,27 @@ def save_additional(self, output_dir: Path | str | os.PathLike) -> None:

class WindMeteo(IWind):
source: ForcingSource = ForcingSource.METEO


class WindNetCDF(IWind):
source: ForcingSource = ForcingSource.NETCDF
unit: us.UnitTypesVelocity = us.UnitTypesVelocity.mps

path: Path

def read(self) -> xr.Dataset:
required_vars = ("wind10_v", "wind10_u", "press_msl")
required_coords = ("time", "lat", "lon")
with xr.open_dataset(self.path) as ds:
validated_ds = validate_netcdf_forcing(ds, required_vars, required_coords)
return validated_ds

def save_additional(self, output_dir: Path | str | os.PathLike) -> None:
if self.path:
output_dir = Path(output_dir).resolve()
if self.path == output_dir / self.path.name:
return
output_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(self.path, output_dir)
self.path = output_dir / self.path.name
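
WindNetCDF follows the same pattern but requires the three variables checked in read(). A sketch of a compliant file and its use follows; values are illustrative, winds are assumed to be in m/s (the class default) and press_msl in Pa, which the PR itself does not prescribe beyond the variable names.

from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr

from flood_adapt.object_model.hazard.forcing.wind import WindNetCDF

times = pd.date_range("2025-01-01", periods=24, freq=pd.Timedelta(hours=1))
lat = np.linspace(32.0, 33.0, 5)
lon = np.linspace(-80.0, -79.0, 5)
dims = ("time", "lat", "lon")
shape = (len(times), len(lat), len(lon))

ds = xr.Dataset(
    data_vars={
        "wind10_u": (dims, np.full(shape, 5.0)),        # eastward wind component
        "wind10_v": (dims, np.full(shape, -2.0)),       # northward wind component
        "press_msl": (dims, np.full(shape, 101325.0)),  # mean sea level pressure
    },
    coords={"time": times, "lat": lat, "lon": lon},
)
ds.to_netcdf("wind_forcing.nc")  # hypothetical file name

wind = WindNetCDF(path=Path("wind_forcing.nc"))  # default unit is m/s
validated = wind.read()  # raises ValueError if variables, coordinates, dimension order, or time step are off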

3 changes: 2 additions & 1 deletion flood_adapt/object_model/hazard/interface/forcing.py
@@ -41,7 +41,8 @@ class ForcingSource(str, Enum):

MODEL = "MODEL" # 'our' hindcast/ sfincs offshore model
TRACK = "TRACK" # 'our' hindcast/ sfincs offshore model + (shifted) hurricane
CSV = "CSV" # user imported data
CSV = "CSV" # user provided csv file
NETCDF = "NETCDF" # user provided netcdf file

SYNTHETIC = "SYNTHETIC" # synthetic data
CONSTANT = "CONSTANT" # synthetic data
2 changes: 1 addition & 1 deletion flood_adapt/object_model/hazard/interface/models.py
@@ -12,7 +12,7 @@
class TimeModel(BaseModel):
start_time: datetime = REFERENCE_TIME
end_time: datetime = REFERENCE_TIME + timedelta(days=1)
time_step: timedelta = timedelta(seconds=10)
time_step: timedelta = timedelta(minutes=10)

@field_validator("start_time", "end_time", mode="before")
@classmethod