def add_piecewise_constraints(
    self,
    expr: Variable | LinearExpression | dict[str, Variable | LinearExpression],
    breakpoints: DataArray,
    link_dim: str | None = None,
    dim: str = DEFAULT_BREAKPOINT_DIM,
    mask: DataArray | None = None,
    name: str | None = None,
    skip_nan_check: bool = False,
) -> Constraint:
    """
    Add a piecewise linear constraint using SOS2 formulation.

    This method creates a piecewise linear constraint that links one or more
    variables/expressions together via a set of breakpoints. It uses the SOS2
    (Special Ordered Set of type 2) formulation with lambda (interpolation)
    variables.

    The SOS2 formulation ensures that at most two adjacent lambda variables
    can be non-zero, effectively selecting a segment of the piecewise linear
    function.

    Parameters
    ----------
    expr : Variable, LinearExpression, or dict of these
        The variable(s) or expression(s) to be linked by the piecewise constraint.
        - If a single Variable/LinearExpression is passed, the breakpoints
          directly specify the piecewise points for that expression.
        - If a dict is passed, the keys must match coordinates in `link_dim`
          of the breakpoints, allowing multiple expressions to be linked.
    breakpoints : xr.DataArray
        The breakpoint values defining the piecewise linear function.
        Must have `dim` as one of its dimensions. If `expr` is a dict,
        must also have `link_dim` dimension with coordinates matching the
        dict keys.
    link_dim : str, optional
        The dimension in breakpoints that links to different expressions.
        Required when `expr` is a dict. If None and `expr` is a dict,
        will attempt to auto-detect from breakpoints dimensions.
    dim : str, default "breakpoint"
        The dimension in breakpoints that represents the breakpoint index.
        This dimension's coordinates must be numeric (used as SOS2 weights).
    mask : xr.DataArray, optional
        Boolean mask indicating which piecewise constraints are valid.
        If None, auto-detected from NaN values in breakpoints (unless
        skip_nan_check is True).
    name : str, optional
        Base name for the generated variables and constraints.
        If None, auto-generates names like "pwl0", "pwl1", etc.
    skip_nan_check : bool, default False
        If True, skip automatic NaN detection in breakpoints. Use this
        when you know breakpoints contain no NaN values for better performance.

    Returns
    -------
    Constraint
        The convexity constraint (sum of lambda = 1). Lambda variables
        and other constraints can be accessed via:
        - `model.variables[f"{name}_lambda"]`
        - `model.constraints[f"{name}_convex"]`
        - `model.constraints[f"{name}_link"]`

    Raises
    ------
    ValueError
        If expr is not a Variable, LinearExpression, or dict of these.
        If breakpoints doesn't have the required dim dimension, or the
        dim dimension has no coordinates.
        If link_dim cannot be auto-detected when expr is a dict.
        If link_dim coordinates don't match dict keys.
        If dim coordinates are not numeric.

    Examples
    --------
    Single variable piecewise constraint:

    >>> m = Model()
    >>> x = m.add_variables(name="x")
    >>> breakpoints = xr.DataArray([0, 10, 50, 100], dims=["bp"])
    >>> _ = m.add_piecewise_constraints(x, breakpoints, dim="bp")

    Using an expression:

    >>> m = Model()
    >>> x = m.add_variables(name="x")
    >>> y = m.add_variables(name="y")
    >>> breakpoints = xr.DataArray([0, 10, 50, 100], dims=["bp"])
    >>> _ = m.add_piecewise_constraints(x + y, breakpoints, dim="bp")

    Multiple linked variables (e.g., power-efficiency curve):

    >>> m = Model()
    >>> generators = ["gen1", "gen2"]
    >>> power = m.add_variables(coords=[generators], name="power")
    >>> efficiency = m.add_variables(coords=[generators], name="efficiency")
    >>> breakpoints = xr.DataArray(
    ...     [[0, 50, 100], [0.8, 0.95, 0.9]],
    ...     coords={"var": ["power", "efficiency"], "bp": [0, 1, 2]},
    ... )
    >>> _ = m.add_piecewise_constraints(
    ...     {"power": power, "efficiency": efficiency},
    ...     breakpoints,
    ...     link_dim="var",
    ...     dim="bp",
    ... )

    Notes
    -----
    The piecewise linear constraint is formulated using SOS2 variables:

    1. Lambda variables λ_i with bounds [0, 1] are created for each breakpoint
    2. SOS2 constraint ensures at most two adjacent λ_i can be non-zero
    3. Convexity constraint: Σ λ_i = 1
    4. Linking constraints: expr = Σ λ_i × breakpoint_i (for each expression)
    """
    # --- Input validation ---
    if dim not in breakpoints.dims:
        raise ValueError(
            f"breakpoints must have dimension '{dim}', "
            f"but only has dimensions {list(breakpoints.dims)}"
        )

    # Guard against a dimension without coordinates: accessing
    # breakpoints.coords[dim] below would otherwise raise an opaque KeyError.
    if dim not in breakpoints.coords:
        raise ValueError(
            f"Breakpoint dimension '{dim}' must carry numeric coordinates "
            "to serve as SOS2 weights, but it has no coordinates."
        )

    if not pd.api.types.is_numeric_dtype(breakpoints.coords[dim]):
        raise ValueError(
            f"Breakpoint dimension '{dim}' must have numeric coordinates "
            f"for SOS2 weights, but got {breakpoints.coords[dim].dtype}"
        )

    # --- Generate names using counter ---
    if name is None:
        name = f"pwl{self._pwlCounter}"
        self._pwlCounter += 1

    lambda_name = f"{name}{PWL_LAMBDA_SUFFIX}"
    convex_name = f"{name}{PWL_CONVEX_SUFFIX}"
    link_name = f"{name}{PWL_LINK_SUFFIX}"

    # --- Determine lambda coordinates, mask, and target expression ---
    is_single = isinstance(expr, Variable | LinearExpression)
    is_dict = isinstance(expr, dict)

    if not is_single and not is_dict:
        raise ValueError(
            f"'expr' must be a Variable, LinearExpression, or dict of these, "
            f"got {type(expr)}"
        )

    if is_single:
        # Single expression case
        assert isinstance(expr, Variable | LinearExpression)
        target_expr = self._to_linexpr(expr)
        # Build lambda coordinates from breakpoints dimensions
        lambda_coords = [
            pd.Index(breakpoints.coords[d].values, name=d) for d in breakpoints.dims
        ]
        lambda_mask = self._compute_pwl_mask(mask, breakpoints, skip_nan_check)

    else:
        # Dict case - need to validate link_dim and build stacked expression
        assert isinstance(expr, dict)
        expr_dict: dict[str, Variable | LinearExpression] = expr
        expr_keys = set(expr_dict.keys())

        # Auto-detect or validate link_dim
        link_dim = self._resolve_pwl_link_dim(link_dim, breakpoints, dim, expr_keys)

        # Build lambda coordinates (exclude link_dim)
        lambda_coords = [
            pd.Index(breakpoints.coords[d].values, name=d)
            for d in breakpoints.dims
            if d != link_dim
        ]

        # Compute mask. A breakpoint column is only usable if it is defined
        # for *every* linked expression: a NaN for any one of them would put
        # NaN coefficients into that expression's linking constraint. Hence
        # reduce with .all() over link_dim (not .any()). A user-supplied mask
        # may already be defined on the lambda dimensions only, in which case
        # there is nothing to reduce.
        base_mask = self._compute_pwl_mask(mask, breakpoints, skip_nan_check)
        if base_mask is not None and link_dim in base_mask.dims:
            lambda_mask = base_mask.all(dim=link_dim)
        else:
            lambda_mask = base_mask

        # Build stacked expression from dict
        target_expr = self._build_stacked_expr(expr_dict, breakpoints, link_dim)

    # --- Common: Create lambda, SOS2, convexity, and linking constraints ---
    lambda_var = self.add_variables(
        lower=0, upper=1, coords=lambda_coords, name=lambda_name, mask=lambda_mask
    )

    self.add_sos_constraints(lambda_var, sos_type=2, sos_dim=dim)

    convex_con = self.add_constraints(
        lambda_var.sum(dim=dim) == 1, name=convex_name
    )

    # expr = Σ λ_i × breakpoint_i; in the dict case the link_dim of
    # breakpoints broadcasts against the stacked target expression, yielding
    # one linking constraint per linked expression.
    weighted_sum = (lambda_var * breakpoints).sum(dim=dim)
    self.add_constraints(target_expr == weighted_sum, name=link_name)

    return convex_con

def _to_linexpr(self, expr: Variable | LinearExpression) -> LinearExpression:
    """Convert a Variable or LinearExpression to a LinearExpression."""
    if isinstance(expr, LinearExpression):
        return expr
    return expr.to_linexpr()

def _compute_pwl_mask(
    self,
    mask: DataArray | None,
    breakpoints: DataArray,
    skip_nan_check: bool,
) -> DataArray | None:
    """
    Compute the validity mask for a piecewise constraint.

    A user-provided mask always wins; otherwise NaN breakpoints are masked
    out, unless the caller opted out of the NaN scan via skip_nan_check.
    """
    if mask is not None:
        return mask
    if skip_nan_check:
        return None
    return ~breakpoints.isnull()

def _resolve_pwl_link_dim(
    self,
    link_dim: str | None,
    breakpoints: DataArray,
    dim: str,
    expr_keys: set[str],
) -> str:
    """
    Auto-detect or validate link_dim for the dict case.

    Auto-detection picks the first dimension (other than `dim`) whose
    coordinate values, as strings, exactly match the expression keys.
    """
    if link_dim is None:
        for d in breakpoints.dims:
            if d == dim:
                continue
            coords_set = set(str(c) for c in breakpoints.coords[d].values)
            if coords_set == expr_keys:
                return str(d)
        raise ValueError(
            "Could not auto-detect link_dim. Please specify it explicitly. "
            f"Breakpoint dimensions: {list(breakpoints.dims)}, "
            f"expression keys: {list(expr_keys)}"
        )

    if link_dim not in breakpoints.dims:
        raise ValueError(
            f"link_dim '{link_dim}' not found in breakpoints dimensions "
            f"{list(breakpoints.dims)}"
        )
    coords_set = set(str(c) for c in breakpoints.coords[link_dim].values)
    if coords_set != expr_keys:
        raise ValueError(
            f"link_dim '{link_dim}' coordinates {coords_set} "
            f"don't match expression keys {expr_keys}"
        )
    return link_dim

def _build_stacked_expr(
    self,
    expr_dict: dict[str, Variable | LinearExpression],
    breakpoints: DataArray,
    link_dim: str,
) -> LinearExpression:
    """
    Build a stacked LinearExpression from a dict of Variables/Expressions.

    Each dict entry is expanded with a singleton link_dim coordinate and the
    results are concatenated, so the stacked expression aligns with the
    breakpoints along link_dim.
    """
    link_coords = list(breakpoints.coords[link_dim].values)

    # Collect expression data and stack
    expr_data_list = []
    for k in link_coords:
        e = expr_dict[str(k)]
        linexpr = self._to_linexpr(e)
        expr_data_list.append(linexpr.data.expand_dims({link_dim: [k]}))

    # Concatenate along link_dim
    stacked_data = xr.concat(expr_data_list, dim=link_dim)
    return LinearExpression(stacked_data, self)
xr.DataArray( + [0, 10, 50, 100], dims=["bp"], coords={"bp": [0, 1, 2, 3]} + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Check lambda variables were created + assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables + + # Check constraints were created + assert f"pwl0{PWL_CONVEX_SUFFIX}" in m.constraints + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + # Check SOS2 constraint was added + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert lambda_var.attrs.get("sos_type") == 2 + assert lambda_var.attrs.get("sos_dim") == "bp" + + def test_single_variable_with_coords(self) -> None: + """Test piecewise constraint with a variable that has coordinates.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + x = m.add_variables(coords=[generators], name="x") + + bp_coords = [0, 1, 2] + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 30, 80]], + dims=["generator", "bp"], + coords={"generator": generators, "bp": bp_coords}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Lambda should have both generator and bp dimensions + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert "generator" in lambda_var.dims + assert "bp" in lambda_var.dims + + +class TestDictOfVariables: + """Tests for dict of variables (multiple linked variables).""" + + def test_dict_of_variables(self) -> None: + """Test piecewise constraint with multiple linked variables.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["var", "bp"], + coords={"var": ["power", "efficiency"], "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + # Check single linking constraint was created for all variables + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_dict_with_coordinates(self) -> None: + 
"""Test dict of variables with additional coordinates.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + power = m.add_variables(coords=[generators], name="power") + efficiency = m.add_variables(coords=[generators], name="efficiency") + + breakpoints = xr.DataArray( + [[[0, 50, 100], [0.8, 0.95, 0.9]], [[0, 30, 80], [0.75, 0.9, 0.85]]], + dims=["generator", "var", "bp"], + coords={ + "generator": generators, + "var": ["power", "efficiency"], + "bp": [0, 1, 2], + }, + ) + + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + # Lambda should have generator and bp dimensions (not var) + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert "generator" in lambda_var.dims + assert "bp" in lambda_var.dims + assert "var" not in lambda_var.dims + + +class TestAutoDetectLinkDim: + """Tests for auto-detection of link_dim.""" + + def test_auto_detect_link_dim(self) -> None: + """Test that link_dim is auto-detected from breakpoints.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["var", "bp"], + coords={"var": ["power", "efficiency"], "bp": [0, 1, 2]}, + ) + + # Should auto-detect link_dim="var" + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + dim="bp", + ) + + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_auto_detect_fails_with_no_match(self) -> None: + """Test that auto-detection fails when no dimension matches keys.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + # Dimension 'wrong' doesn't match variable keys + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["wrong", "bp"], + coords={"wrong": ["a", "b"], "bp": [0, 1, 2]}, + ) + + with pytest.raises(ValueError, match="Could not 
auto-detect link_dim"): + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + dim="bp", + ) + + +class TestMasking: + """Tests for masking functionality.""" + + def test_nan_masking(self) -> None: + """Test that NaN values in breakpoints create masked constraints.""" + m = Model() + x = m.add_variables(name="x") + + # Third breakpoint is NaN + breakpoints = xr.DataArray( + [0, 10, np.nan, 100], + dims=["bp"], + coords={"bp": [0, 1, 2, 3]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Lambda for NaN breakpoint should be masked + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + # Check that at least some labels are valid + assert (lambda_var.labels != -1).any() + + def test_explicit_mask(self) -> None: + """Test user-provided mask.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + x = m.add_variables(coords=[generators], name="x") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 30, 80]], + dims=["generator", "bp"], + coords={"generator": generators, "bp": [0, 1, 2]}, + ) + + # Mask out gen2 + mask = xr.DataArray( + [[True, True, True], [False, False, False]], + dims=["generator", "bp"], + coords={"generator": generators, "bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", mask=mask) + + # Should still create variables and constraints + assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables + + def test_skip_nan_check(self) -> None: + """Test skip_nan_check parameter for performance.""" + m = Model() + x = m.add_variables(name="x") + + # Breakpoints with no NaNs + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + # Should work with skip_nan_check=True + m.add_piecewise_constraints(x, breakpoints, dim="bp", skip_nan_check=True) + + # All lambda variables should be valid (no masking) + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert (lambda_var.labels != -1).all() + + +class TestMultiDimensional: + 
"""Tests for multi-dimensional piecewise constraints.""" + + def test_multi_dimensional(self) -> None: + """Test piecewise constraint with multiple loop dimensions.""" + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + timesteps = pd.Index([0, 1, 2], name="time") + x = m.add_variables(coords=[generators, timesteps], name="x") + + rng = np.random.default_rng(42) + breakpoints = xr.DataArray( + rng.random((2, 3, 4)) * 100, + dims=["generator", "time", "bp"], + coords={"generator": generators, "time": timesteps, "bp": [0, 1, 2, 3]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Lambda should have all dimensions + lambda_var = m.variables[f"pwl0{PWL_LAMBDA_SUFFIX}"] + assert "generator" in lambda_var.dims + assert "time" in lambda_var.dims + assert "bp" in lambda_var.dims + + +class TestValidationErrors: + """Tests for input validation.""" + + def test_invalid_vars_type(self) -> None: + """Test error when expr is not Variable, LinearExpression, or dict.""" + m = Model() + + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + with pytest.raises( + ValueError, match="must be a Variable, LinearExpression, or dict" + ): + m.add_piecewise_constraints("invalid", breakpoints, dim="bp") # type: ignore + + def test_missing_dim(self) -> None: + """Test error when breakpoints don't have the required dim.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray([0, 10, 50], dims=["wrong"]) + + with pytest.raises(ValueError, match="must have dimension"): + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + def test_non_numeric_dim(self) -> None: + """Test error when dim coordinates are not numeric.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0, 10, 50], + dims=["bp"], + coords={"bp": ["a", "b", "c"]}, # Non-numeric + ) + + with pytest.raises(ValueError, match="numeric coordinates"): + m.add_piecewise_constraints(x, breakpoints, 
dim="bp") + + def test_expression_support(self) -> None: + """Test that LinearExpression is supported as input.""" + m = Model() + x = m.add_variables(name="x") + y = m.add_variables(name="y") + + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + # Should work with a LinearExpression + m.add_piecewise_constraints(x + y, breakpoints, dim="bp") + + # Check constraints were created + assert f"pwl0{PWL_LINK_SUFFIX}" in m.constraints + + def test_link_dim_not_in_breakpoints(self) -> None: + """Test error when link_dim is not in breakpoints.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray([0, 50, 100], dims=["bp"], coords={"bp": [0, 1, 2]}) + + with pytest.raises(ValueError, match="not found in breakpoints dimensions"): + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + def test_link_dim_coords_mismatch(self) -> None: + """Test error when link_dim coords don't match dict keys.""" + m = Model() + power = m.add_variables(name="power") + efficiency = m.add_variables(name="efficiency") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0.8, 0.95, 0.9]], + dims=["var", "bp"], + coords={"var": ["wrong1", "wrong2"], "bp": [0, 1, 2]}, + ) + + with pytest.raises(ValueError, match="don't match expression keys"): + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + +class TestNameGeneration: + """Tests for automatic name generation.""" + + def test_auto_name_generation(self) -> None: + """Test that names are auto-generated correctly.""" + m = Model() + x = m.add_variables(name="x") + y = m.add_variables(name="y") + + bp1 = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + bp2 = xr.DataArray([0, 20, 80], dims=["bp"], coords={"bp": [0, 1, 2]}) + + m.add_piecewise_constraints(x, bp1, dim="bp") + 
m.add_piecewise_constraints(y, bp2, dim="bp") + + assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables + assert f"pwl1{PWL_LAMBDA_SUFFIX}" in m.variables + + def test_custom_name(self) -> None: + """Test using a custom name.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray([0, 10, 50], dims=["bp"], coords={"bp": [0, 1, 2]}) + + m.add_piecewise_constraints(x, breakpoints, dim="bp", name="my_pwl") + + assert f"my_pwl{PWL_LAMBDA_SUFFIX}" in m.variables + assert f"my_pwl{PWL_CONVEX_SUFFIX}" in m.constraints + assert f"my_pwl{PWL_LINK_SUFFIX}" in m.constraints + + +class TestLPFileOutput: + """Tests for LP file output with piecewise constraints.""" + + def test_piecewise_written_to_lp(self, tmp_path: Path) -> None: + """Test that piecewise constraints are properly written to LP file.""" + m = Model() + x = m.add_variables(name="x") + + breakpoints = xr.DataArray( + [0.0, 10.0, 50.0], + dims=["bp"], + coords={"bp": [0, 1, 2]}, + ) + + m.add_piecewise_constraints(x, breakpoints, dim="bp") + + # Add a simple objective to make it a valid LP + m.add_objective(x) + + fn = tmp_path / "pwl.lp" + m.to_file(fn, io_api="lp") + content = fn.read_text() + + # Should contain SOS2 section + assert "\nsos\n" in content.lower() + assert "s2" in content.lower() + + +@pytest.mark.skipif("gurobi" not in available_solvers, reason="Gurobi not installed") +class TestSolverIntegration: + """Integration tests with Gurobi solver.""" + + def test_solve_single_variable(self) -> None: + """Test solving a model with piecewise constraint.""" + gurobipy = pytest.importorskip("gurobipy") + + m = Model() + # Variable that should be between 0 and 100 + x = m.add_variables(lower=0, upper=100, name="x") + + # Piecewise linear cost function: cost = f(x) + # f(0) = 0, f(50) = 10, f(100) = 50 + cost = m.add_variables(name="cost") + + breakpoints = xr.DataArray( + [[0, 50, 100], [0, 10, 50]], + dims=["var", "bp"], + coords={"var": ["x", "cost"], "bp": [0, 1, 2]}, + ) + + 
m.add_piecewise_constraints( + {"x": x, "cost": cost}, breakpoints, link_dim="var", dim="bp" + ) + + # Minimize cost, but need x >= 50 to make it interesting + m.add_constraints(x >= 50, name="x_min") + m.add_objective(cost) + + try: + status, cond = m.solve(solver_name="gurobi", io_api="direct") + except gurobipy.GurobiError as exc: + pytest.skip(f"Gurobi environment unavailable: {exc}") + + assert status == "ok" + # At x=50, cost should be 10 + assert np.isclose(x.solution.values, 50, atol=1e-5) + assert np.isclose(cost.solution.values, 10, atol=1e-5) + + def test_solve_efficiency_curve(self) -> None: + """Test solving with a realistic efficiency curve.""" + gurobipy = pytest.importorskip("gurobipy") + + m = Model() + power = m.add_variables(lower=0, upper=100, name="power") + efficiency = m.add_variables(name="efficiency") + + # Efficiency curve: starts low, peaks, then decreases + # power: 0 25 50 75 100 + # efficiency: 0.7 0.85 0.95 0.9 0.8 + breakpoints = xr.DataArray( + [[0, 25, 50, 75, 100], [0.7, 0.85, 0.95, 0.9, 0.8]], + dims=["var", "bp"], + coords={"var": ["power", "efficiency"], "bp": [0, 1, 2, 3, 4]}, + ) + + m.add_piecewise_constraints( + {"power": power, "efficiency": efficiency}, + breakpoints, + link_dim="var", + dim="bp", + ) + + # Maximize efficiency + m.add_objective(efficiency, sense="max") + + try: + status, cond = m.solve(solver_name="gurobi", io_api="direct") + except gurobipy.GurobiError as exc: + pytest.skip(f"Gurobi environment unavailable: {exc}") + + assert status == "ok" + # Maximum efficiency is at power=50 + assert np.isclose(power.solution.values, 50, atol=1e-5) + assert np.isclose(efficiency.solution.values, 0.95, atol=1e-5) + + def test_solve_multi_generator(self) -> None: + """Test with multiple generators each with different curves.""" + gurobipy = pytest.importorskip("gurobipy") + + m = Model() + generators = pd.Index(["gen1", "gen2"], name="generator") + power = m.add_variables(lower=0, upper=100, coords=[generators], 
name="power") + cost = m.add_variables(coords=[generators], name="cost") + + # Different cost curves for each generator + # gen1: cheaper at low power, expensive at high + # gen2: more expensive at low power, cheaper at high + breakpoints = xr.DataArray( + [ + [[0, 50, 100], [0, 5, 30]], # gen1: power, cost + [[0, 50, 100], [0, 15, 20]], # gen2: power, cost + ], + dims=["generator", "var", "bp"], + coords={ + "generator": generators, + "var": ["power", "cost"], + "bp": [0, 1, 2], + }, + ) + + m.add_piecewise_constraints( + {"power": power, "cost": cost}, breakpoints, link_dim="var", dim="bp" + ) + + # Need total power of 120 + m.add_constraints(power.sum() >= 120, name="demand") + + # Minimize total cost + m.add_objective(cost.sum()) + + try: + status, cond = m.solve(solver_name="gurobi", io_api="direct") + except gurobipy.GurobiError as exc: + pytest.skip(f"Gurobi environment unavailable: {exc}") + + assert status == "ok" + # gen1 should provide ~50 (cheap up to 50), gen2 provides rest + total_power = power.solution.sum().values + assert np.isclose(total_power, 120, atol=1e-5)