Skip to content

Commit

Permalink
Replace usage of arr_to_attr_dict with label in sql query
Browse files Browse the repository at this point in the history
  • Loading branch information
margrietpalm committed Dec 9, 2024
1 parent 43a27cf commit d950630
Showing 1 changed file with 60 additions and 145 deletions.
205 changes: 60 additions & 145 deletions threedigrid_builder/interface/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from contextlib import contextmanager
from enum import Enum
from functools import lru_cache
from typing import Callable, ContextManager, Dict, List, Optional, Union
from typing import Callable, ContextManager, Dict, List, Union

import numpy as np
import shapely
Expand Down Expand Up @@ -94,20 +94,6 @@ def _set_initialization_type(
dct[type_field] = None


def arr_to_attr_dict(
arr: np.ndarray, rename_dict: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
"""
Convert structured array to dict with optional rename of the keys
"""
if rename_dict is None:
rename_dict = {}
return {
rename_dict[name] if name in rename_dict else name: arr[name]
for name in arr.dtype.names
}


def map_cross_section_definition(
objects: List[Union[CrossSectionLocations, Pipes, Weirs, Orifices, Culverts]],
definition_map: Dict[str, Dict[int, int]],
Expand Down Expand Up @@ -425,12 +411,12 @@ def get_channels(self) -> Channels:
"""Return Channels"""
cols = [
models.Channel.geom.label("the_geom"),
models.Channel.calculation_point_distance,
models.Channel.calculation_point_distance.label("dist_calc_points"),
models.Channel.id,
models.Channel.code,
models.Channel.connection_node_id_start,
models.Channel.connection_node_id_end,
models.Channel.exchange_type,
models.Channel.connection_node_id_start.label("connection_node_start_id"),
models.Channel.connection_node_id_end.label("connection_node_end_id"),
models.Channel.exchange_type.label("calculation_type"),
models.Channel.display_name,
models.Channel.exchange_thickness,
models.Channel.hydraulic_conductivity_out,
Expand All @@ -442,22 +428,12 @@ def get_channels(self) -> Channels:

arr["the_geom"] = self.reproject(arr["the_geom"])
# map "old" calculation types (100, 101, 102, 105) to (0, 1, 2, 5)
arr["exchange_type"][arr["exchange_type"] >= 100] -= 100
arr["calculation_type"][arr["calculation_type"] >= 100] -= 100
arr["hydraulic_conductivity_out"] /= DAY_IN_SECONDS
arr["hydraulic_conductivity_in"] /= DAY_IN_SECONDS

attr_dict = arr_to_attr_dict(
arr,
{
"exchange_type": "calculation_type",
"calculation_point_distance": "dist_calc_points",
"connection_node_id_start": "connection_node_start_id",
"connection_node_id_end": "connection_node_end_id",
},
)

# transform to a Channels object
return Channels(**attr_dict)
return Channels(**{name: arr[name] for name in arr.dtype.names})

def get_connection_nodes(self) -> ConnectionNodes:
"""Return ConnectionNodes"""
Expand All @@ -466,12 +442,12 @@ def get_connection_nodes(self) -> ConnectionNodes:
models.ConnectionNode.id,
models.ConnectionNode.code,
models.ConnectionNode.storage_area,
models.ConnectionNode.initial_water_level,
models.ConnectionNode.exchange_type,
models.ConnectionNode.initial_water_level.label("initial_waterlevel"),
models.ConnectionNode.exchange_type.label("calculation_type"),
models.ConnectionNode.bottom_level,
models.ConnectionNode.exchange_level,
models.ConnectionNode.visualisation,
models.ConnectionNode.manhole_surface_level,
models.ConnectionNode.exchange_level.label("drain_level"),
models.ConnectionNode.visualisation.label("manhole_indicator"),
models.ConnectionNode.manhole_surface_level.label("surface_level"),
models.ConnectionNode.display_name,
models.ConnectionNode.exchange_thickness,
models.ConnectionNode.hydraulic_conductivity_out,
Expand All @@ -486,22 +462,11 @@ def get_connection_nodes(self) -> ConnectionNodes:
arr["the_geom"] = self.reproject(arr["the_geom"])

# replace -9999.0 with NaN in initial_water_level
arr["initial_water_level"][arr["initial_water_level"] == -9999.0] = np.nan
arr["initial_waterlevel"][arr["initial_waterlevel"] == -9999.0] = np.nan
arr["hydraulic_conductivity_out"] /= DAY_IN_SECONDS
arr["hydraulic_conductivity_in"] /= DAY_IN_SECONDS

attr_dict = arr_to_attr_dict(
arr,
{
"initial_water_level": "initial_waterlevel",
"exchange_type": "calculation_type",
"exchange_level": "drain_level",
"visualisation": "manhole_indicator",
"manhole_surface_level": "surface_level",
},
)

return ConnectionNodes(**attr_dict)
return ConnectionNodes(**{name: arr[name] for name in arr.dtype.names})

def get_cross_section_definition_for_table(self, table) -> np.ndarray:
with self.get_session() as session:
Expand Down Expand Up @@ -597,12 +562,12 @@ def get_culverts(self) -> Culverts:
models.Culvert.id,
models.Culvert.code,
models.Culvert.geom.label("the_geom"),
models.Culvert.calculation_point_distance,
models.Culvert.connection_node_id_start,
models.Culvert.connection_node_id_end,
models.Culvert.exchange_type,
models.Culvert.invert_level_start,
models.Culvert.invert_level_end,
models.Culvert.calculation_point_distance.label("dist_calc_points"),
models.Culvert.connection_node_id_start.label("connection_node_start_id"),
models.Culvert.connection_node_id_end.label("connection_node_end_id"),
models.Culvert.exchange_type.label("calculation_type"),
models.Culvert.invert_level_start.label("invert_level_start_point"),
models.Culvert.invert_level_end.label("invert_level_end_point"),
models.Culvert.discharge_coefficient_negative,
models.Culvert.discharge_coefficient_positive,
models.Culvert.display_name,
Expand Down Expand Up @@ -642,24 +607,14 @@ def get_culverts(self) -> Culverts:
arr["friction_type"][arr["friction_type"] == 4] = 2

# When no calculation type is provides we default to isolated
arr["exchange_type"][arr["exchange_type"] == -9999] = LineType.LINE_1D_ISOLATED
arr["calculation_type"][
arr["calculation_type"] == -9999
] = LineType.LINE_1D_ISOLATED
# map "old" calculation types (100, 101, 102, 105) to (0, 1, 2, 5)
arr["exchange_type"][arr["exchange_type"] >= 100] -= 100

attr_dict = arr_to_attr_dict(
arr,
{
"exchange_type": "calculation_type",
"calculation_point_distance": "dist_calc_points",
"invert_level_start": "invert_level_start_point",
"invert_level_end": "invert_level_end_point",
"connection_node_id_start": "connection_node_start_id",
"connection_node_id_end": "connection_node_end_id",
},
)
arr["calculation_type"][arr["calculation_type"] >= 100] -= 100

# transform to a CrossSectionLocations object
return Culverts(**attr_dict)
# transform to a Culverts object
return Culverts(**{name: arr[name] for name in arr.dtype.names})

def get_exchange_lines(self) -> ExchangeLines:
with self.get_session() as session:
Expand Down Expand Up @@ -687,7 +642,7 @@ def get_grid_refinements(self) -> GridRefinements:
models.GridRefinementLine.id,
models.GridRefinementLine.code,
models.GridRefinementLine.display_name,
models.GridRefinementLine.grid_level,
models.GridRefinementLine.grid_level.label("refinement_level"),
)
.filter(
models.GridRefinementLine.geom.isnot(None),
Expand All @@ -702,7 +657,7 @@ def get_grid_refinements(self) -> GridRefinements:
models.GridRefinementArea.id,
models.GridRefinementArea.code,
models.GridRefinementArea.display_name,
models.GridRefinementArea.grid_level,
models.GridRefinementArea.grid_level.label("refinement_level"),
)
.filter(
models.GridRefinementArea.geom.isnot(None),
Expand All @@ -715,10 +670,9 @@ def get_grid_refinements(self) -> GridRefinements:

# reproject
arr["the_geom"] = self.reproject(arr["the_geom"])
arr["id"] = np.arange(len(arr["grid_level"]))
arr["id"] = np.arange(len(arr["refinement_level"]))

attr_dict = arr_to_attr_dict(arr, {"grid_level": "refinement_level"})
return GridRefinements(**attr_dict)
return GridRefinements(**{name: arr[name] for name in arr.dtype.names})

def get_dem_average_areas(self) -> DemAverageAreas:
"""Return DemAverageAreas"""
Expand Down Expand Up @@ -751,16 +705,16 @@ def get_obstacles(self) -> Obstacles:

# reproject
arr["the_geom"] = self.reproject(arr["the_geom"])
# transform to a Channels object
# transform to a Obstacles object
return Obstacles(**{name: arr[name] for name in arr.dtype.names})

def get_orifices(self) -> Orifices:
"""Return Orifices"""
cols = [
models.Orifice.id,
models.Orifice.code,
models.Orifice.connection_node_id_start,
models.Orifice.connection_node_id_end,
models.Orifice.connection_node_id_start.label("connection_node_start_id"),
models.Orifice.connection_node_id_end.label("connection_node_end_id"),
models.Orifice.crest_level,
models.Orifice.crest_type,
models.Orifice.discharge_coefficient_negative,
Expand Down Expand Up @@ -797,34 +751,26 @@ def get_orifices(self) -> Orifices:
)
# map friction_type 4 to friction_type 2 to match crosssectionlocation enum
arr["friction_type"][arr["friction_type"] == 4] = 2
attr_dict = arr_to_attr_dict(
arr,
{
"exchange_type": "calculation_type",
"calculation_point_distance": "dist_calc_points",
"connection_node_id_start": "connection_node_start_id",
"connection_node_id_end": "connection_node_end_id",
},
)
return Orifices(**attr_dict)

return Orifices(**{name: arr[name] for name in arr.dtype.names})

def get_pipes(self) -> Pipes:
"""Return Pipes"""
cols = [
models.Pipe.id,
models.Pipe.code,
models.Pipe.sewerage_type,
models.Pipe.exchange_type,
models.Pipe.invert_level_start,
models.Pipe.invert_level_end,
models.Pipe.calculation_point_distance,
models.Pipe.connection_node_id_start,
models.Pipe.connection_node_id_end,
models.Pipe.exchange_type.label("calculation_type"),
models.Pipe.invert_level_start.label("invert_level_start_point"),
models.Pipe.invert_level_end.label("invert_level_end_point"),
models.Pipe.calculation_point_distance.label("dist_calc_points"),
models.Pipe.connection_node_id_start.label("connection_node_start_id"),
models.Pipe.connection_node_id_end.label("connection_node_end_id"),
models.Pipe.display_name,
models.Pipe.exchange_thickness,
models.Pipe.hydraulic_conductivity_out,
models.Pipe.hydraulic_conductivity_in,
models.Pipe.material_id,
models.Pipe.material_id.label("material"),
case(
{
models.Pipe.friction_value.isnot(None)
Expand Down Expand Up @@ -855,21 +801,8 @@ def get_pipes(self) -> Pipes:
arr["hydraulic_conductivity_out"] /= DAY_IN_SECONDS
arr["hydraulic_conductivity_in"] /= DAY_IN_SECONDS

attr_dict = arr_to_attr_dict(
arr,
{
"exchange_type": "calculation_type",
"calculation_point_distance": "dist_calc_points",
"material_id": "material",
"invert_level_start": "invert_level_start_point",
"invert_level_end": "invert_level_end_point",
"connection_node_id_start": "connection_node_start_id",
"connection_node_id_end": "connection_node_end_id",
},
)

# transform to a Pipes object
return Pipes(**attr_dict)
return Pipes(**{name: arr[name] for name in arr.dtype.names})

def get_pumps(self) -> Pumps:
with self.get_session() as session:
Expand All @@ -878,8 +811,10 @@ def get_pumps(self) -> Pumps:
models.Pump.id,
models.Pump.code,
models.Pump.capacity,
models.Pump.connection_node_id,
models.PumpMap.connection_node_id_end,
models.Pump.connection_node_id.label("connection_node_start_id"),
models.PumpMap.connection_node_id_end.label(
"connection_node_end_id"
),
models.Pump.type_,
models.Pump.start_level,
models.Pump.lower_stop_level,
Expand All @@ -894,24 +829,16 @@ def get_pumps(self) -> Pumps:
# Pump capacity is entered as L/s but we need m3/s.
arr["capacity"] = arr["capacity"] / 1000

attr_dict = arr_to_attr_dict(
arr,
{
"connection_node_id": "connection_node_start_id",
"connection_node_id_end": "connection_node_end_id",
},
)

# transform to a Pumps object
return Pumps(**attr_dict)
return Pumps(**{name: arr[name] for name in arr.dtype.names})

def get_weirs(self) -> Weirs:
"""Return Weirs"""
cols = [
models.Weir.id,
models.Weir.code,
models.Weir.connection_node_id_start,
models.Weir.connection_node_id_end,
models.Weir.connection_node_id_start.label("connection_node_start_id"),
models.Weir.connection_node_id_end.label("connection_node_end_id"),
models.Weir.crest_level,
models.Weir.crest_type,
models.Weir.discharge_coefficient_negative,
Expand Down Expand Up @@ -944,16 +871,8 @@ def get_weirs(self) -> Weirs:
)
# map friction_type 4 to friction_type 2 to match crosssectionlocation enum
arr["friction_type"][arr["friction_type"] == 4] = 2
attr_dict = arr_to_attr_dict(
arr,
{
"exchange_type": "calculation_type",
"calculation_point_distance": "dist_calc_points",
"connection_node_id_start": "connection_node_start_id",
"connection_node_id_end": "connection_node_end_id",
},
)
return Weirs(**attr_dict)

return Weirs(**{name: arr[name] for name in arr.dtype.names})

def get_windshieldings(self) -> Windshieldings:
with self.get_session() as session:
Expand Down Expand Up @@ -991,8 +910,10 @@ def get_potential_breaches(self) -> PotentialBreaches:

if self.get_version() >= 212:
cols += [
models.PotentialBreach.initial_exchange_level,
models.PotentialBreach.final_exchange_level,
models.PotentialBreach.initial_exchange_level.label("exchange_level"),
models.PotentialBreach.final_exchange_level.label(
"maximum_breach_depth"
),
models.PotentialBreach.levee_material,
]

Expand All @@ -1007,17 +928,11 @@ def get_potential_breaches(self) -> PotentialBreaches:
arr["the_geom"] = self.reproject(arr["the_geom"])
# derive maximum_breach_depth from initial and final exchange level
# and overwrite final_exchange_level because adding a field is more work
arr["final_exchange_level"] = (
arr["initial_exchange_level"] - arr["final_exchange_level"]
arr["maximum_breach_depth"] = (
arr["exchange_level"] - arr["maximum_breach_depth"]
)
attr_dict = arr_to_attr_dict(
arr,
{
"initial_exchange_level": "exchange_level",
"final_exchange_level": "maximum_breach_depth",
},
)
return PotentialBreaches(**attr_dict)

return PotentialBreaches(**{name: arr[name] for name in arr.dtype.names})


# Constructing a Transformer takes quite long, so we use caching here. The
Expand Down

0 comments on commit d950630

Please sign in to comment.