diff --git a/linopy/io.py b/linopy/io.py index 4dc4dc02..a3b08eef 100644 --- a/linopy/io.py +++ b/linopy/io.py @@ -5,6 +5,7 @@ from __future__ import annotations +import copy as _copy import json import logging import shutil @@ -422,16 +423,26 @@ def sos_to_file( for name in names: var = m.variables[name] - sos_type = var.attrs[SOS_TYPE_ATTR] - sos_dim = var.attrs[SOS_DIM_ATTR] + sos_type = int(var.attrs[SOS_TYPE_ATTR]) # type: ignore[call-overload] + sos_dim = str(var.attrs[SOS_DIM_ATTR]) other_dims = [dim for dim in var.labels.dims if dim != sos_dim] for var_slice in var.iterate_slices(slice_size, other_dims): ds = var_slice.labels.to_dataset() - ds["sos_labels"] = ds["labels"].isel({sos_dim: 0}) + # Per-set id: max of labels along the SOS dim. Real labels are + # non-negative and globally unique, so max yields a valid, + # unique-per-set id whenever the set has any unmasked slot. + # Fully-masked sets get id -1 and are filtered out below. + ds["sos_labels"] = ds["labels"].max(sos_dim) ds["weights"] = ds.coords[sos_dim] df = to_polars(ds) + # Drop masked member rows so the LP file never emits `x-1`, and + # drop any rows belonging to a fully-masked set (sos_labels == -1). + df = df.filter((pl.col("labels") != -1) & (pl.col("sos_labels") != -1)) + if df.is_empty(): + continue + df = df.group_by("sos_labels").agg( pl.concat_str( *print_variable(pl.col("labels")), pl.lit(":"), pl.col("weights") @@ -591,8 +602,6 @@ def to_file( """ Write out a model to a lp or mps file. """ - m._check_sos_unmasked() - if fn is None: fn = Path(m.get_problem_file()) if isinstance(fn, str): @@ -845,7 +854,19 @@ def to_netcdf(m: Model, *args: Any, **kwargs: Any) -> None: Arguments passed to ``xarray.Dataset.to_netcdf``. **kwargs : TYPE Keyword arguments passed to ``xarray.Dataset.to_netcdf``. + + Raises + ------ + RuntimeError + If the model has an active SOS reformulation. Call + :meth:`Model.undo_sos_reformulation` before serializing. """ + if m._sos_reformulation_state is not None: + raise RuntimeError( + "Cannot serialize a model with an active SOS reformulation. " + "Call `model.undo_sos_reformulation()` first to restore the " + "original SOS form before saving." + ) def with_prefix(ds: xr.Dataset, prefix: str) -> xr.Dataset: to_rename = set([*ds.dims, *ds.coords, *ds]) @@ -1117,6 +1138,9 @@ def copy(m: Model, include_solution: bool = False, deep: bool = True) -> Model: if include_solution or attr not in SOLVE_STATE_ATTRS: setattr(new_model, attr, getattr(m, attr)) + if m._sos_reformulation_state is not None: + new_model._sos_reformulation_state = _copy.deepcopy(m._sos_reformulation_state) + return new_model diff --git a/linopy/model.py b/linopy/model.py index 03fd9479..d853b793 100644 --- a/linopy/model.py +++ b/linopy/model.py @@ -90,6 +90,7 @@ available_solvers, ) from linopy.sos_reformulation import ( + SOSReformulationResult, reformulate_sos_constraints, undo_sos_reformulation, ) @@ -240,6 +241,7 @@ class Model: "_relaxed_registry", "_piecewise_formulations", "_solver", + "_sos_reformulation_state", "__weakref__", ) @@ -310,6 +312,7 @@ def __init__( gettempdir() if solver_dir is None else solver_dir ) self._solver: solvers.Solver | None = None + self._sos_reformulation_state: SOSReformulationResult | None = None @property def solver(self) -> solvers.Solver | None: @@ -1221,33 +1224,43 @@ def remove_sos_constraints(self, variable: Variable) -> None: reformulate_sos_constraints = reformulate_sos_constraints - def _check_sos_unmasked(self) -> None: + def apply_sos_reformulation(self) -> None: """ - Reject the model if any SOS variable has masked entries. + Reformulate SOS constraints into binary + linear form, in place. - The SOS plumbing (both direct-API solvers and the LP file writer) treats - linopy variable labels as solver column indices / names, which breaks as - soon as a label is ``-1`` (linopy's ``FILL_VALUE["labels"]`` for masked - slots). The downstream symptoms are solver-specific — ``IndexError`` on - gurobipy, ``?404 Invalid column number`` on xpress, parse errors on - xpress/cplex LP readers, silent SOS-set corruption on gurobi's LP reader. + The reformulation token is stored on the model so it can be reverted + with :meth:`undo_sos_reformulation`. This is the stateful counterpart + to :func:`linopy.sos_reformulation.reformulate_sos_constraints`, where + the caller owns the token. - Surface a single clear error until #688 lands the proper fix. + Raises + ------ + RuntimeError + If a reformulation has already been applied and not undone. """ - if not self.variables.sos: - return - affected = [ - name - for name in self.variables.sos - if (self.variables[name].labels.values == -1).any() - ] - if affected: - raise NotImplementedError( - f"SOS constraints on masked variables are not yet supported " - f"(affected: {affected}; " - "see https://github.com/PyPSA/linopy/issues/688). " - "Pass reformulate_sos=True as a workaround." + if self._sos_reformulation_state is not None: + raise RuntimeError( + "SOS reformulation has already been applied to this model. " + "Call `undo_sos_reformulation()` before applying again." ) + self._sos_reformulation_state = reformulate_sos_constraints(self) + + def undo_sos_reformulation(self) -> None: + """ + Revert a previously applied SOS reformulation. + + Raises + ------ + RuntimeError + If no reformulation is currently applied. + """ + if self._sos_reformulation_state is None: + raise RuntimeError( + "No SOS reformulation is currently applied to this model." + ) + state = self._sos_reformulation_state + self._sos_reformulation_state = None + undo_sos_reformulation(self, state) def remove_objective(self) -> None: """ @@ -1642,12 +1655,6 @@ def solve( sanitize_zeros=sanitize_zeros, sanitize_infinities=sanitize_infinities ) - if self.objective.expression.empty: - raise ValueError( - "No objective has been set on the model. Use `m.add_objective(...)` " - "first (e.g. `m.add_objective(0 * x)` for a pure feasibility problem)." - ) - # check io_api if io_api is not None and io_api not in IO_APIS: raise ValueError( @@ -1655,6 +1662,16 @@ def solve( ) if remote is not None: + # The remote branch short-circuits before reaching Solver.solve(), + # which is where the empty-objective check normally fires. Replicate + # it here. This duplication becomes obsolete once OETC is folded + # into the Solver pipeline (see PyPSA/linopy#683). + if self.objective.expression.empty: + raise ValueError( + "No objective has been set on the model. Use " + "`m.add_objective(...)` first (e.g. `m.add_objective(0 * x)` " + "for a pure feasibility problem)." + ) if isinstance(remote, OetcHandler): solved = remote.solve_on_oetc( self, solver_name=solver_name, **solver_options @@ -1720,26 +1737,13 @@ def solve( else: solution_fn = self.get_solution_file() - if sanitize_zeros: - self.constraints.sanitize_zeros() - - if sanitize_infinities: - self.constraints.sanitize_infinities() - - if self.is_quadratic and not solver_class.supports( - SolverFeature.QUADRATIC_OBJECTIVE - ): - raise ValueError( - f"Solver {solver_name} does not support quadratic problems." - ) - if reformulate_sos not in (True, False, "auto"): raise ValueError( f"Invalid value for reformulate_sos: {reformulate_sos!r}. " "Must be True, False, or 'auto'." ) - sos_reform_result = None + applied_sos_reformulation_here = False if self.variables.sos: supports_sos = solver_class.supports(SolverFeature.SOS_CONSTRAINTS) should_reformulate = reformulate_sos is True or ( @@ -1748,67 +1752,90 @@ def solve( if should_reformulate: logger.info(f"Reformulating SOS constraints for solver {solver_name}") - sos_reform_result = reformulate_sos_constraints(self) - elif reformulate_sos is False and not supports_sos: - raise ValueError( - f"Solver {solver_name} does not support SOS constraints. " - "Use reformulate_sos=True or 'auto', or a solver that supports SOS." - ) - - if self.variables.semi_continuous: - if not solver_class.supports(SolverFeature.SEMI_CONTINUOUS_VARIABLES): - raise ValueError( - f"Solver {solver_name} does not support semi-continuous variables. " - "Use a solver that supports them (gurobi, cplex, highs)." - ) + self.apply_sos_reformulation() + applied_sos_reformulation_here = True + # If SOS is present and the solver doesn't support it (and the user + # didn't ask for reformulation), Solver._build() will raise. try: - self.solver = None # closes any previous solver - if io_api == "direct": - if set_names is None: - set_names = self.set_names_in_solver_io - build_kwargs: dict[str, Any] = { - "explicit_coordinate_names": explicit_coordinate_names, - "set_names": set_names, - "log_fn": to_path(log_fn), - } - if env is not None: - build_kwargs["env"] = env - else: - build_kwargs = { - "explicit_coordinate_names": explicit_coordinate_names, - "slice_size": slice_size, - "progress": progress, - "problem_fn": to_path(problem_fn), - } - self.solver = solver = solvers.Solver.from_name( - solver_name, - model=self, - io_api=io_api, - options=solver_options, - **build_kwargs, - ) - if io_api != "direct": - problem_fn = solver._problem_fn - result = solver.solve( - solution_fn=to_path(solution_fn), - log_fn=to_path(log_fn), - warmstart_fn=to_path(warmstart_fn), - basis_fn=to_path(basis_fn), - env=env, - ) - finally: - for fn in (problem_fn, solution_fn): - if fn is not None and (os.path.exists(fn) and not keep_files): - os.remove(fn) + if sanitize_zeros: + self.constraints.sanitize_zeros() + if sanitize_infinities: + self.constraints.sanitize_infinities() + + try: + self.solver = None # closes any previous solver + if io_api == "direct": + if set_names is None: + set_names = self.set_names_in_solver_io + build_kwargs: dict[str, Any] = { + "explicit_coordinate_names": explicit_coordinate_names, + "set_names": set_names, + "log_fn": to_path(log_fn), + } + if env is not None: + build_kwargs["env"] = env + else: + build_kwargs = { + "explicit_coordinate_names": explicit_coordinate_names, + "slice_size": slice_size, + "progress": progress, + "problem_fn": to_path(problem_fn), + } + self.solver = solver = solvers.Solver.from_name( + solver_name, + model=self, + io_api=io_api, + options=solver_options, + **build_kwargs, + ) + if io_api != "direct": + problem_fn = solver._problem_fn + result = solver.solve( + solution_fn=to_path(solution_fn), + log_fn=to_path(log_fn), + warmstart_fn=to_path(warmstart_fn), + basis_fn=to_path(basis_fn), + env=env, + ) + finally: + for fn in (problem_fn, solution_fn): + if fn is not None and (os.path.exists(fn) and not keep_files): + os.remove(fn) - try: return self.assign_result(result) finally: - if sos_reform_result is not None: - undo_sos_reformulation(self, sos_reform_result) + if applied_sos_reformulation_here: + self.undo_sos_reformulation() + + def assign_result( + self, + result: Result, + solver: solvers.Solver | None = None, + ) -> tuple[str, str]: + """ + Write a solver Result back onto the model. + + Copies primal / dual values onto variables / constraints, sets + :attr:`status`, :attr:`termination_condition`, and + :attr:`objective.value`. When ``solver`` is provided, also stores it on + ``self.solver`` so post-solve introspection (``model.solver_model``, + ``compute_infeasibilities()``) works. + + Parameters + ---------- + result : Result + The :class:`linopy.constants.Result` returned by + :meth:`linopy.solvers.Solver.solve`. + solver : Solver, optional + The solver instance that produced the result. Pass it on the + low-level ``Solver.from_name(...).solve()`` path to attach it as + ``self.solver`` for post-solve introspection. ``Model.solve()`` + attaches the solver itself and does not pass this argument. + """ + if solver is not None: + self.solver = solver - def assign_result(self, result: Result) -> tuple[str, str]: result.info() if result.solution is not None: diff --git a/linopy/solvers.py b/linopy/solvers.py index 9466db0f..7b11b6ee 100644 --- a/linopy/solvers.py +++ b/linopy/solvers.py @@ -19,7 +19,7 @@ import warnings from abc import ABC from collections import namedtuple -from collections.abc import Callable, Generator, Iterator, Sequence +from collections.abc import Callable, Generator, Iterable, Iterator, Sequence from dataclasses import dataclass, field from enum import Enum, auto from importlib.metadata import PackageNotFoundError @@ -106,6 +106,75 @@ def _solution_from_labels( return values_to_lookup_array(np.asarray(values, dtype=float), labels, size=size) +def _sos_set_positions( + labels: np.ndarray, weights: np.ndarray, label_to_pos: np.ndarray +) -> tuple[list[int], list[float]]: + """ + Convert a SOS set's linopy labels to solver column positions. + + Direct-API solvers (gurobi, xpress) accept SOS members as 0-based column + positions in the solver's variable array, which corresponds to the active + (non-masked) variable order — i.e., the order of + ``model.variables.label_index.vlabels``. Masked entries (label ``-1``) are + dropped along with their weights. + + Parameters + ---------- + labels : np.ndarray + Flat array of linopy labels for the SOS members. + weights : np.ndarray + Matching weights; same length as ``labels``. + label_to_pos : np.ndarray + ``model.variables.label_index.label_to_pos`` — lookup of label → + active-variable position. + + Returns + ------- + tuple[list[int], list[float]] + Solver column positions and matching weights, with masked entries + removed. + """ + mask = labels != -1 + return ( + label_to_pos[labels[mask]].tolist(), + weights[mask].tolist(), + ) + + +def _iter_sos_sets(model: Model) -> Iterator[tuple[int, list[int], list[float]]]: + """ + Yield ``(sos_type, positions, weights)`` per active SOS set in ``model``. + + Iterates 1D SOS variables as a single set and multi-dim SOS variables as + one set per non-SOS-dim coordinate. Masked members are dropped, surviving + linopy labels are resolved to solver column positions via + ``_sos_set_positions``, and empty sets are skipped. + + Shared between direct-API solvers (Gurobi, Xpress). Each solver only + differs in the vendor ``addSOS`` call. + """ + label_to_pos = model.variables.label_index.label_to_pos + for var_name in model.variables.sos: + var = model.variables.sos[var_name] + sos_type = int(var.attrs[SOS_TYPE_ATTR]) # type: ignore[call-overload] + sos_dim = str(var.attrs[SOS_DIM_ATTR]) + others = [d for d in var.labels.dims if d != sos_dim] + + if not others: + sets: Iterable[xr.DataArray] = [var.labels] + else: + stacked = var.labels.stack(_sos_group=others) + sets = (s.unstack("_sos_group") for _, s in stacked.groupby("_sos_group")) + + for s in sets: + s = s.squeeze() + labels = s.values.flatten() + weights = s.coords[sos_dim].values + positions, kept_weights = _sos_set_positions(labels, weights, label_to_pos) + if positions: + yield sos_type, positions, kept_weights + + class SolverFeature(Enum): """Enumeration of all solver capabilities tracked by linopy.""" @@ -504,15 +573,51 @@ def from_model( return instance def _build(self, **build_kwargs: Any) -> None: - """Dispatch to direct or file build based on ``io_api``.""" + """ + Dispatch to direct or file build based on ``io_api``. + + The Solver never mutates ``self.model``. Constraint sanitization + (``model.constraints.sanitize_zeros()`` / + ``.sanitize_infinities()``) and SOS reformulation + (``model.apply_sos_reformulation()``) are Model-level operations + the caller applies first; this builder consumes whatever shape it + is handed. + """ if self.model is None: raise RuntimeError("Solver has no model attached; cannot build.") - self.model._check_sos_unmasked() + self._validate_model() if self.io_api == "direct": self._build_direct(**build_kwargs) else: self._build_file(**build_kwargs) + def _validate_model(self) -> None: + """Pre-build checks on whether this solver can handle ``self.model``.""" + model = self.model + assert model is not None + solver_name = self.solver_name.value + cls = type(self) + + if model.is_quadratic and not cls.supports(SolverFeature.QUADRATIC_OBJECTIVE): + raise ValueError( + f"Solver {solver_name} does not support quadratic problems." + ) + + if model.variables.semi_continuous and not cls.supports( + SolverFeature.SEMI_CONTINUOUS_VARIABLES + ): + raise ValueError( + f"Solver {solver_name} does not support semi-continuous variables. " + "Use a solver that supports them (gurobi, cplex, highs)." + ) + + if model.variables.sos and not cls.supports(SolverFeature.SOS_CONSTRAINTS): + raise ValueError( + f"Solver {solver_name} does not support SOS constraints. " + "Reformulate first via `Model.solve(reformulate_sos=True)` or " + "`model.apply_sos_reformulation()`, or use a solver that supports SOS." + ) + def _build_direct(self, **build_kwargs: Any) -> None: """Build the native solver model from ``self.model``. Override per-solver.""" raise NotImplementedError( @@ -553,7 +658,30 @@ def _build_file(self, **build_kwargs: Any) -> None: self._cache_model_sizes(model) def solve(self, **run_kwargs: Any) -> Result: - """Run the prepared solver and return a :class:`Result`.""" + """ + Run the prepared solver and return a :class:`Result`. + + The canonical low-level pattern is:: + + solver = Solver.from_name("gurobi", model, io_api="direct") + result = solver.solve() + model.assign_result(result, solver=solver) + + Passing ``solver=`` to :meth:`Model.assign_result` wires + ``model.solver`` so post-solve helpers like + :meth:`Model.compute_infeasibilities` keep working. + + Raises + ------ + ValueError + If the attached model has no objective set. Submit-time check + shared by both ``Model.solve()`` and direct-Solver callers. + """ + if self.model is not None and self.model.objective.expression.empty: + raise ValueError( + "No objective has been set on the model. Use `m.add_objective(...)` " + "first (e.g. `m.add_objective(0 * x)` for a pure feasibility problem)." + ) if self.io_api == "direct" or self.solver_model is not None: return self._run_direct(**run_kwargs) if self._problem_fn is not None: @@ -1520,25 +1648,8 @@ def _build_solver_model( names = print_constraints(M.clabels) c.setAttr("ConstrName", names) - if model.variables.sos: - for var_name in model.variables.sos: - var = model.variables.sos[var_name] - sos_type: int = var.attrs[SOS_TYPE_ATTR] # type: ignore[assignment] - sos_dim: str = var.attrs[SOS_DIM_ATTR] # type: ignore[assignment] - - def add_sos(s: xr.DataArray, sos_type: int, sos_dim: str) -> None: - s = s.squeeze() - indices = s.values.flatten().tolist() - weights = s.coords[sos_dim].values.tolist() - gm.addSOS(sos_type, x[indices].tolist(), weights) - - others = [dim for dim in var.labels.dims if dim != sos_dim] - if not others: - add_sos(var.labels, sos_type, sos_dim) - else: - stacked = var.labels.stack(_sos_group=others) - for _, s in stacked.groupby("_sos_group"): - add_sos(s.unstack("_sos_group"), sos_type, sos_dim) + for sos_type, positions, weights in _iter_sos_sets(model): + gm.addSOS(sos_type, x[positions].tolist(), weights) gm.update() return gm @@ -2162,25 +2273,8 @@ def _build_solver_model( if cnames: problem.addnames(xpress_Namespaces.ROW, cnames, 0, len(cnames) - 1) - if model.variables.sos: - for var_name in model.variables.sos: - var = model.variables.sos[var_name] - sos_type: int = var.attrs[SOS_TYPE_ATTR] # type: ignore[assignment] - sos_dim: str = var.attrs[SOS_DIM_ATTR] # type: ignore[assignment] - - def add_sos(s: xr.DataArray, sos_type: int, sos_dim: str) -> None: - s = s.squeeze() - indices = s.values.flatten().tolist() - weights = s.coords[sos_dim].values.tolist() - problem.addSOS(indices, weights, type=sos_type) - - others = [dim for dim in var.labels.dims if dim != sos_dim] - if not others: - add_sos(var.labels, sos_type, sos_dim) - else: - stacked = var.labels.stack(_sos_group=others) - for _, s in stacked.groupby("_sos_group"): - add_sos(s.unstack("_sos_group"), sos_type, sos_dim) + for sos_type, positions, weights in _iter_sos_sets(model): + problem.addSOS(positions, weights, type=sos_type) return problem diff --git a/linopy/sos_reformulation.py b/linopy/sos_reformulation.py index 8ccb7613..0c677216 100644 --- a/linopy/sos_reformulation.py +++ b/linopy/sos_reformulation.py @@ -233,8 +233,10 @@ def reformulate_sos_constraints( 1. If custom big_m was specified in add_sos_constraints(), use that 2. Otherwise, use the variable bounds (tightest valid Big-M) - Note: This permanently mutates the model. To solve with automatic - undo, use ``model.solve(reformulate_sos=True)`` instead. + Note: This permanently mutates the model and returns a token the caller + owns. For a stateful, reversible API use ``model.apply_sos_reformulation()`` + / ``model.undo_sos_reformulation()``; for automatic undo around a single + solve use ``model.solve(reformulate_sos=True)``. Parameters ---------- diff --git a/test/test_piecewise_constraints.py b/test/test_piecewise_constraints.py index 3c91a88e..c44af394 100644 --- a/test/test_piecewise_constraints.py +++ b/test/test_piecewise_constraints.py @@ -41,7 +41,11 @@ SEGMENT_DIM, ) from linopy.piecewise import _slopes_to_points -from linopy.solver_capabilities import SolverFeature, get_available_solvers_with_feature +from linopy.solver_capabilities import ( + SolverFeature, + get_available_solvers_with_feature, + solver_supports, +) if TYPE_CHECKING: from linopy.piecewise import BreaksLike, _PwlInputs @@ -52,6 +56,13 @@ _sos2_solvers = get_available_solvers_with_feature( SolverFeature.SOS_CONSTRAINTS, available_solvers ) +_sos2_direct_solvers = sorted( + s for s in _sos2_solvers if solver_supports(s, SolverFeature.DIRECT_API) +) +_SOS_PATHS = [ + *[pytest.param(s, "direct", id=f"{s}-direct") for s in _sos2_direct_solvers], + *[pytest.param(s, "lp", id=f"{s}-lp") for s in sorted(_sos2_solvers)], +] _any_solvers = [ s for s in ["highs", "gurobi", "glpk", "cplex"] if s in available_solvers ] @@ -2313,23 +2324,37 @@ def test_lp_per_entity_nan_padding( Per-entity NaN-padded breakpoints with method='lp': padded segments must be masked out so they don't create spurious ``y ≤ 0`` constraints (bug-2 regression). - - ``method='sos2'`` would emit a masked SOS lambda variable, which the - native SOS path doesn't yet support (#688) — exercised separately in - :py:meth:`test_sos2_per_entity_nan_padding_errors`. """ m = nan_padded_pwl_model("lp") m.solve() # f_b(10) on chord (5,10)→(15,15) is 12.5 assert abs(float(m.solution.sel({"entity": "b"})["y"]) - 12.5) < 1e-3 - def test_sos2_per_entity_nan_padding_errors( - self, nan_padded_pwl_model: Callable[[Method], Model] + @pytest.mark.skipif(not _SOS_PATHS, reason="No SOS-capable solver installed") + @pytest.mark.parametrize(("solver", "io_api"), _SOS_PATHS) + def test_sos2_per_entity_nan_padding( + self, + nan_padded_pwl_model: Callable[[Method], Model], + solver: str, + io_api: str, ) -> None: - """Masked SOS lambdas hit the #688 guard at solve time.""" + """ + Per-entity NaN-padded breakpoints with method='sos2': the SOS + lambda variable's masked entries must flow through both the + direct API (via label→position resolution) and the LP writer + (via masked-member filtering) so the solve returns the same + answer as ``method='lp'``. Regression for #688. + + Parametrized across every SOS-capable solver × io_api so the + bug surfaces no matter which backend handles the SOS section + (gurobi-lp masked the bug on master by silently dropping + unknown ``x-1`` members; cplex-lp and gurobi-direct surfaced + it as a parse / OOB error). + """ m = nan_padded_pwl_model("sos2") - with pytest.raises(NotImplementedError, match="masked"): - m.solve() + m.solve(solver_name=solver, io_api=io_api) + # f_b(10) on chord (5,10)→(15,15) is 12.5 — same oracle as lp variant + assert abs(float(m.solution.sel({"entity": "b"})["y"]) - 12.5) < 1e-3 def test_lp_rejects_decreasing_x_concave_ge(self) -> None: """ diff --git a/test/test_solvers.py b/test/test_solvers.py index db894137..1b6bd9a9 100644 --- a/test/test_solvers.py +++ b/test/test_solvers.py @@ -23,6 +23,14 @@ from linopy.solvers import _installed_version_in +@pytest.fixture +def lp_only_solver() -> str: + for name in ("glpk", "cbc"): + if name in solvers.available_solvers: + return name + pytest.skip("Need an LP-only solver (glpk or cbc) installed") + + @pytest.fixture def simple_model() -> Model: m = Model(chunk=None) @@ -464,3 +472,95 @@ def test_xpress_gpu_feature_reflects_installed_version() -> None: assert solvers.Xpress.supports( SolverFeature.GPU_ACCELERATION ) == _installed_version_in("xpress", ">=9.8.0") + + +class TestValidateModelOnBuild: + """Solver._build() runs solver-feature checks regardless of entry point.""" + + def test_quadratic_without_qp_support_raises(self, lp_only_solver: str) -> None: + m = Model() + x = m.add_variables(name="x", lower=0, upper=10) + m.add_objective(x * x, sense="min") + + with pytest.raises(ValueError, match="does not support quadratic"): + solvers.Solver.from_name(lp_only_solver, m, io_api="lp") + + def test_semi_continuous_without_support_raises(self, lp_only_solver: str) -> None: + m = Model() + x = m.add_variables(name="x", lower=1, upper=10, semi_continuous=True) + m.add_objective(x) + + with pytest.raises(ValueError, match="does not support semi-continuous"): + solvers.Solver.from_name(lp_only_solver, m, io_api="lp") + + @pytest.mark.skipif( + "highs" not in solvers.available_solvers, reason="HiGHS not installed" + ) + def test_solve_without_objective_raises(self) -> None: + m = Model() + m.add_variables(name="x", lower=0, upper=10) + # No objective added — both entry points should raise the same error. + with pytest.raises(ValueError, match="No objective has been set"): + solvers.Solver.from_name("highs", m, io_api="lp").solve() + with pytest.raises(ValueError, match="No objective has been set"): + m.solve("highs") + + +class TestSolverDoesNotMutateModel: + """Solver.from_model() must not mutate model state (sanitize stays Model-level).""" + + @pytest.mark.skipif( + "highs" not in solvers.available_solvers, reason="HiGHS not installed" + ) + def test_from_model_leaves_constraints_untouched(self) -> None: + m = Model() + x = m.add_variables(name="x", lower=0, upper=10) + # Constraint with a near-zero coefficient — would be sanitized away if + # the Solver path were sanitizing on build. + m.add_constraints(1e-12 * x + x >= 0, name="c") + m.add_objective(x) + + before = m.constraints["c"].coeffs.values.copy() + solvers.Solver.from_name("highs", m, io_api="lp") + after = m.constraints["c"].coeffs.values + + assert np.allclose(before, after, equal_nan=True), ( + "Solver.from_model() must not mutate model constraints. " + "Sanitization is a Model-level primitive; call " + "model.constraints.sanitize_zeros() / .sanitize_infinities() " + "explicitly before building." + ) + + +class TestAssignResultWiring: + """assign_result(result, solver=...) populates model.solver.""" + + @pytest.mark.skipif( + "highs" not in solvers.available_solvers, reason="HiGHS not installed" + ) + def test_assign_result_with_solver_wires_model_solver(self) -> None: + m = Model() + x = m.add_variables(name="x", lower=0, upper=10) + m.add_objective(x, sense="min") + + assert m.solver is None + solver = solvers.Solver.from_name("highs", m, io_api="lp") + result = solver.solve() + m.assign_result(result, solver=solver) + + assert m.solver is solver + assert m.solver_model is solver.solver_model + + @pytest.mark.skipif( + "highs" not in solvers.available_solvers, reason="HiGHS not installed" + ) + def test_assign_result_without_solver_kwarg_leaves_solver_unset(self) -> None: + m = Model() + x = m.add_variables(name="x", lower=0, upper=10) + m.add_objective(x, sense="min") + + solver = solvers.Solver.from_name("highs", m, io_api="lp") + result = solver.solve() + m.assign_result(result) # no solver kwarg + + assert m.solver is None diff --git a/test/test_sos_constraints.py b/test/test_sos_constraints.py index 30b2d767..8160d524 100644 --- a/test/test_sos_constraints.py +++ b/test/test_sos_constraints.py @@ -8,19 +8,6 @@ import xarray as xr from linopy import Model, available_solvers -from linopy.solver_capabilities import ( - SolverFeature, - get_available_solvers_with_feature, - solver_supports, -) - -_direct_sos_solvers = [ - s - for s in get_available_solvers_with_feature( - SolverFeature.SOS_CONSTRAINTS, available_solvers - ) - if solver_supports(s, SolverFeature.DIRECT_API) -] def test_add_sos_constraints_registers_variable() -> None: @@ -209,66 +196,6 @@ def test_qp_sos1_xpress_direct() -> None: assert np.isclose(m.objective.value, -25) -@pytest.fixture -def masked_sos_model() -> Model: - """Tiny model with a single masked SOS1 variable.""" - m = Model() - coords = pd.Index([0, 1, 2, 3], name="i") - mask = pd.Series([True, True, False, True], index=coords) - var = m.add_variables(lower=0, upper=1, coords=[coords], mask=mask, name="sos_var") - m.add_sos_constraints(var, sos_type=1, sos_dim="i") - m.add_objective(-var.sum()) - return m - - -@pytest.mark.parametrize("solver_name", _direct_sos_solvers) -def test_direct_api_raises_on_masked_sos( - solver_name: str, masked_sos_model: Model -) -> None: - with pytest.raises(NotImplementedError, match="masked"): - masked_sos_model.solve(solver_name=solver_name, io_api="direct") - - -def test_lp_writer_raises_on_masked_sos( - masked_sos_model: Model, tmp_path: Path -) -> None: - with pytest.raises(NotImplementedError, match="masked"): - masked_sos_model.to_file(tmp_path / "sos.lp", io_api="lp") - - -@pytest.mark.parametrize( - "solver_name", - [ - pytest.param( - "gurobi", - marks=pytest.mark.skipif( - "gurobi" not in available_solvers, reason="Gurobi not installed" - ), - ), - pytest.param( - "highs", - marks=pytest.mark.skipif( - "highs" not in available_solvers, reason="HiGHS not installed" - ), - ), - ], -) -def test_reformulate_sos_true_solves_masked_sos( - solver_name: str, masked_sos_model: Model -) -> None: - """The documented workaround for the masked-SOS bug actually solves.""" - masked_sos_model.solve(solver_name=solver_name, reformulate_sos=True) - sol = masked_sos_model.variables["sos_var"].solution.values - # SOS1 over 3 unmasked entries, max sum, each in [0, 1]: - # one entry == 1, others == 0, masked stays NaN. - assert masked_sos_model.objective.value is not None - assert np.isclose(masked_sos_model.objective.value, -1.0) - assert np.isnan(sol[2]) - nonzero = np.flatnonzero(~np.isnan(sol) & (sol > 1e-6)) - assert len(nonzero) == 1 - assert np.isclose(sol[nonzero[0]], 1.0) - - @pytest.mark.skipif("gurobi" not in available_solvers, reason="Gurobi not installed") def test_reformulate_sos_true_reformulates_on_native_solver(tmp_path: Path) -> None: """ @@ -316,7 +243,7 @@ def test_unsupported_solver_raises_error() -> None: m.solve(solver_name=solver) -def test_to_highspy_raises_not_implemented() -> None: +def test_to_highspy_raises_when_sos_present() -> None: pytest.importorskip("highspy") m = Model() @@ -324,8 +251,5 @@ def test_to_highspy_raises_not_implemented() -> None: build = m.add_variables(coords=[locations], name="build", binary=True) m.add_sos_constraints(build, sos_type=1, sos_dim="locations") - with pytest.raises( - NotImplementedError, - match="SOS constraints are not supported by the HiGHS direct API", - ): + with pytest.raises(ValueError, match="does not support SOS constraints"): m.to_highspy() diff --git a/test/test_sos_masked.py b/test/test_sos_masked.py new file mode 100644 index 00000000..d816ef1c --- /dev/null +++ b/test/test_sos_masked.py @@ -0,0 +1,313 @@ +""" +Regression coverage for SOS constraints on masked variables (#688). + +The bug being pinned here has two related failure modes: + +1. **Position-vs-label**: direct-API builds (gurobi, xpress) pass linopy variable + labels straight to vendor ``addSOS`` as if they were 0-based column positions + in the active-variable array. They only happen to coincide when no variable + in the model is masked anywhere. + +2. **LP file emits ``x-1``**: the LP writer iterates raw label arrays and emits + names like ``x-1`` for masked SOS entries, which LP parsers either reject + outright or (gurobi LP reader) silently corrupt into wrong SOS sets. + +The fixture asymmetric-coefficient design plus three-layer oracle (status, +objective, element-wise solution) ensures any wrong indexing surfaces as a +visible failure rather than a permutation-equivalent silent pass. +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import Literal + +import numpy as np +import pandas as pd +import pytest +import xarray as xr + +from linopy import Model, available_solvers +from linopy.solver_capabilities import SolverFeature, solver_supports + +# --------------------------------------------------------------------------- +# Capability-derived solver / io_api parametrization +# --------------------------------------------------------------------------- + +SOS_DIRECT = sorted( + s + for s in available_solvers + if solver_supports(s, SolverFeature.SOS_CONSTRAINTS) + and solver_supports(s, SolverFeature.DIRECT_API) +) +SOS_FILE = sorted( + s for s in available_solvers if solver_supports(s, SolverFeature.SOS_CONSTRAINTS) +) +SOS_PATHS = [ + *[pytest.param(s, "direct", id=f"{s}-direct") for s in SOS_DIRECT], + *[pytest.param(s, "lp", id=f"{s}-lp") for s in SOS_FILE], +] + +# --------------------------------------------------------------------------- +# Analytical optimum (matches solver semantics: list-position adjacency for SOS2) +# --------------------------------------------------------------------------- + + +def _optimize_sos_set( + active_i: list[int], coefs: dict[int, float], sos_type: int +) -> tuple[float, dict[int, float]]: + """ + Closed-form optimum for one SOS set with binary [0,1] members. + + ``active_i`` is the sorted list of active (unmasked) member indices in the + SOS dimension. ``coefs`` maps each active index to its objective coefficient + (minimization). For SOS2, adjacency is list-position adjacency, matching the + semantics of gurobi/xpress ``addSOS``. + """ + if not active_i: + return 0.0, {} + + best_obj = 0.0 + best_sol: dict[int, float] = {} + + # singletons + for i in active_i: + if coefs[i] < best_obj: + best_obj = coefs[i] + best_sol = {i: 1.0} + + if sos_type == 2: + # adjacent pairs in the (sorted-by-weight) list + for k in range(len(active_i) - 1): + i1, i2 = active_i[k], active_i[k + 1] + pair_obj = coefs[i1] + coefs[i2] + if pair_obj < best_obj: + best_obj = pair_obj + best_sol = {i1: 1.0, i2: 1.0} + + return best_obj, best_sol + + +# --------------------------------------------------------------------------- +# Fixture +# --------------------------------------------------------------------------- + +MaskOnSos = Literal[None, "sos_dim", "non_sos_dim", "both_dims"] + + +@pytest.fixture +def sos_masked_model() -> Callable[..., tuple[Model, float, np.ndarray]]: # noqa: E501 + """ + Factory producing SOS{1,2} models with controllable mask placement. + + Objective coefficients along the SOS dim are ``[-1, -2, -3, -4]``, + asymmetric to break permutation symmetry — wrong indexing then produces an + observably different objective AND solution. + + Returns ``(model, expected_obj, expected_sol)``. ``expected_sol`` is shaped + like ``sos_var.solution`` (with ``NaN`` where the mask removes a slot). + """ + + def _build( + sos_type: Literal[1, 2] = 1, + sos_var_2d: bool = False, + mask_on_sos: MaskOnSos = None, + mask_on_other: bool = False, + ) -> tuple[Model, float, np.ndarray]: + if not sos_var_2d and mask_on_sos in ("non_sos_dim", "both_dims"): + raise ValueError(f"mask_on_sos={mask_on_sos!r} requires sos_var_2d=True") + + m = Model() + + # Optional unrelated masked variable: shifts label->position mapping + # for all subsequent variables, exposing the position-vs-label bug. + if mask_on_other: + ck = pd.Index([0, 1, 2, 3], name="k") + m.add_variables( + lower=0, + upper=1, + coords=[ck], + mask=pd.Series([False, True, True, True], index=ck), + name="other", + ) + + ci = pd.Index([0, 1, 2, 3], name="i") + cj = pd.Index([0, 1], name="j") + + # Construct sos_var mask + if mask_on_sos is None: + sos_mask = None + elif mask_on_sos == "sos_dim": + mask_i = np.array([True, True, False, True]) + if sos_var_2d: + sos_mask = xr.DataArray( + np.broadcast_to(mask_i[:, None], (4, 2)).copy(), + coords=[ci, cj], + dims=["i", "j"], + ) + else: + sos_mask = pd.Series(mask_i, index=ci) + elif mask_on_sos == "non_sos_dim": + assert sos_var_2d + mask_j = np.array([False, True]) + sos_mask = xr.DataArray( + np.broadcast_to(mask_j[None, :], (4, 2)).copy(), + coords=[ci, cj], + dims=["i", "j"], + ) + elif mask_on_sos == "both_dims": + assert sos_var_2d + mask_i = np.array([True, True, False, True]) + mask_j = np.array([False, True]) + combined = mask_i[:, None] & mask_j[None, :] + sos_mask = xr.DataArray(combined, coords=[ci, cj], dims=["i", "j"]) + else: + raise ValueError(f"unknown mask_on_sos={mask_on_sos!r}") + + sos_coords = [ci, cj] if sos_var_2d else [ci] + sos_var = m.add_variables( + lower=0, + upper=1, + coords=sos_coords, + mask=sos_mask, + name="sos_var", + ) + m.add_sos_constraints(sos_var, sos_type=sos_type, sos_dim="i") + + # Asymmetric coefficients along the SOS dim; broadcast across j in 2D + coefs_i = np.array([-1.0, -2.0, -3.0, -4.0]) + if sos_var_2d: + coefs = xr.DataArray( + np.broadcast_to(coefs_i[:, None], (4, 2)).copy(), + coords=[ci, cj], + dims=["i", "j"], + ) + else: + coefs = xr.DataArray(coefs_i, coords=[ci], dims=["i"]) + m.add_objective(sos_var * coefs) + + # ------------------------------------------------------------------ + # Compute expected_obj and expected_sol from the same mask logic + # ------------------------------------------------------------------ + coefs_dict = {i: float(coefs_i[i]) for i in range(4)} + + # active_per_j[j] = sorted list of active i for SOS set at j (or for + # the single 1D set we use j=None as a sentinel) + if sos_var_2d: + # Reconstruct the 2D mask (default to all True if none) + if sos_mask is None: + mask_arr = np.ones((4, 2), dtype=bool) + else: + mask_arr = np.asarray(sos_mask.values, dtype=bool) + active_per_j = { + j: [i for i in range(4) if mask_arr[i, j]] for j in range(2) + } + else: + if sos_mask is None: + active = list(range(4)) + else: + active = [i for i in range(4) if bool(sos_mask.iloc[i])] + active_per_j = {None: active} + + expected_obj = 0.0 + # Build expected_sol with the right shape and NaN-fill masked slots + if sos_var_2d: + expected_sol = np.full((4, 2), 0.0) + if sos_mask is not None: + mask_arr = np.asarray(sos_mask.values, dtype=bool) + expected_sol[~mask_arr] = np.nan + else: + expected_sol = np.full(4, 0.0) + if sos_mask is not None: + for i in range(4): + if not bool(sos_mask.iloc[i]): + expected_sol[i] = np.nan + + for j_key, active in active_per_j.items(): + obj_j, sol_j = _optimize_sos_set(active, coefs_dict, sos_type) + expected_obj += obj_j + for i, value in sol_j.items(): + if sos_var_2d: + expected_sol[i, j_key] = value + else: + expected_sol[i] = value + + return m, expected_obj, expected_sol + + return _build + + +# --------------------------------------------------------------------------- +# Test matrix: 11 fixture configs × 2 SOS types × (solver, io_api) +# --------------------------------------------------------------------------- + +# Each entry: (sos_var_2d, mask_on_sos, mask_on_other) +FIXTURE_CONFIGS = [ + pytest.param(False, None, False, id="1d-no_mask"), + pytest.param(False, "sos_dim", False, id="1d-mask_sos"), + pytest.param(False, None, True, id="1d-mask_other"), + pytest.param(False, "sos_dim", True, id="1d-mask_both"), + pytest.param(True, None, False, id="2d-no_mask"), + pytest.param(True, "sos_dim", False, id="2d-mask_sos_dim"), + pytest.param(True, "non_sos_dim", False, id="2d-mask_non_sos_dim"), + pytest.param(True, "both_dims", False, id="2d-mask_both_dims"), + pytest.param(True, "sos_dim", True, id="2d-mask_sos_dim+other"), + pytest.param(True, "non_sos_dim", True, id="2d-mask_non_sos_dim+other"), + pytest.param(True, "both_dims", True, id="2d-mask_both_dims+other"), +] + + +@pytest.mark.skipif(not SOS_PATHS, reason="No SOS-capable solver installed") +@pytest.mark.parametrize("sos_type", [1, 2]) +@pytest.mark.parametrize(("solver", "io_api"), SOS_PATHS) +@pytest.mark.parametrize( + ("sos_var_2d", "mask_on_sos", "mask_on_other"), FIXTURE_CONFIGS +) +def test_sos_with_masked_variables( + sos_masked_model: Callable[..., tuple[Model, float, np.ndarray]], + solver: str, + io_api: str, + sos_type: int, + sos_var_2d: bool, + mask_on_sos: MaskOnSos, + mask_on_other: bool, +) -> None: + """ + Three-oracle test: status + objective + element-wise solution. + + Asymmetric objective + element-wise solution check ensures we catch: + - direct-path OOB raises (status != ok) + - LP parser rejections (status != ok) + - silent SOS-set corruption (objective and/or solution differ) + """ + m, expected_obj, expected_sol = sos_masked_model( + sos_type=sos_type, + sos_var_2d=sos_var_2d, + mask_on_sos=mask_on_sos, + mask_on_other=mask_on_other, + ) + m.solve(solver_name=solver, io_api=io_api) + + # Oracle 1: did the solve succeed? + assert m.status == "ok", ( + f"solver={solver} io_api={io_api} status={m.status!r} " + f"termination={m.termination_condition!r}" + ) + + # Oracle 2: is the objective at the analytical optimum? + assert m.objective.value is not None + assert m.objective.value == pytest.approx(expected_obj, abs=1e-5) + + # Oracle 3: are the right slots at the right values? + actual_sol = m.variables["sos_var"].solution.values + np.testing.assert_allclose( + actual_sol, + expected_sol, + atol=1e-5, + equal_nan=True, + err_msg=( + f"sos_var.solution mismatch for solver={solver} io_api={io_api} " + f"sos_type={sos_type} sos_var_2d={sos_var_2d} " + f"mask_on_sos={mask_on_sos!r} mask_on_other={mask_on_other}" + ), + ) diff --git a/test/test_sos_reformulation.py b/test/test_sos_reformulation.py index 24ba62b3..6b68d2d3 100644 --- a/test/test_sos_reformulation.py +++ b/test/test_sos_reformulation.py @@ -3,6 +3,8 @@ from __future__ import annotations import logging +from collections.abc import Callable +from pathlib import Path import numpy as np import pandas as pd @@ -312,10 +314,210 @@ def test_reformulate_inplace(self) -> None: assert "_sos_reform_x_y" in m.variables +class TestApplyUndoSOSReformulation: + """Tests for Model.apply_sos_reformulation / undo_sos_reformulation.""" + + @staticmethod + def _build_sos1_model() -> Model: + m = Model() + idx = pd.Index([0, 1, 2], name="i") + x = m.add_variables(lower=0, upper=1, coords=[idx], name="x") + m.add_sos_constraints(x, sos_type=1, sos_dim="i") + return m + + def test_apply_stashes_state(self) -> None: + m = self._build_sos1_model() + assert m._sos_reformulation_state is None + + m.apply_sos_reformulation() + + assert m._sos_reformulation_state is not None + assert m._sos_reformulation_state.reformulated == ["x"] + assert len(list(m.variables.sos)) == 0 + assert "_sos_reform_x_y" in m.variables + + def test_undo_restores_and_clears_state(self) -> None: + m = self._build_sos1_model() + m.apply_sos_reformulation() + + m.undo_sos_reformulation() + + assert m._sos_reformulation_state is None + assert list(m.variables.sos) == ["x"] + assert "_sos_reform_x_y" not in m.variables + + def test_double_apply_raises(self) -> None: + m = self._build_sos1_model() + m.apply_sos_reformulation() + + with pytest.raises(RuntimeError, match="already been applied"): + m.apply_sos_reformulation() + + def test_undo_without_apply_raises(self) -> None: + m = self._build_sos1_model() + + with pytest.raises(RuntimeError, match="No SOS reformulation"): + m.undo_sos_reformulation() + + @pytest.mark.parametrize( + "copy_fn", + [ + pytest.param(lambda m: m.copy(), id="model.copy()"), + pytest.param(lambda m: __import__("copy").copy(m), id="copy.copy(model)"), + pytest.param( + lambda m: __import__("copy").deepcopy(m), id="copy.deepcopy(model)" + ), + ], + ) + def test_copy_persists_state_and_undo_works_on_copy( + self, copy_fn: Callable[[Model], Model] + ) -> None: + m = self._build_sos1_model() + m.apply_sos_reformulation() + + c = copy_fn(m) + + # State is carried over but is an independent object + assert c._sos_reformulation_state is not None + assert c._sos_reformulation_state is not m._sos_reformulation_state + # Aux vars/cons exist on the copy (they were copied as part of the + # reformulated model state) + assert "_sos_reform_x_y" in c.variables + assert "_sos_reform_x_upper" in c.constraints + assert "_sos_reform_x_card" in c.constraints + # SOS attrs are not on the copy's "x" yet (still in reformulated form) + assert "x" not in list(c.variables.sos) + + # Undo on the copy fully restores the original SOS form + c.undo_sos_reformulation() + assert c._sos_reformulation_state is None + assert list(c.variables.sos) == ["x"] + assert "_sos_reform_x_y" not in c.variables + assert "_sos_reform_x_upper" not in c.constraints + assert "_sos_reform_x_card" not in c.constraints + + # Original is entirely unaffected + assert m._sos_reformulation_state is not None + assert "_sos_reform_x_y" in m.variables + assert len(list(m.variables.sos)) == 0 + + def test_to_netcdf_raises_when_state_active(self, tmp_path: Path) -> None: + m = self._build_sos1_model() + m.apply_sos_reformulation() + + with pytest.raises(RuntimeError, match="active SOS reformulation"): + m.to_netcdf(tmp_path / "m.nc") + + def test_to_netcdf_works_after_undo(self, tmp_path: Path) -> None: + m = self._build_sos1_model() + m.apply_sos_reformulation() + m.undo_sos_reformulation() + + m.to_netcdf(tmp_path / "m.nc") # should not raise + + +@pytest.mark.skipif("highs" not in available_solvers, reason="HiGHS not installed") +class TestSolverPathSOSCheck: + """Solver._build() must raise on SOS-bearing model with non-SOS solver.""" + + def test_solver_from_name_raises_without_reformulation(self) -> None: + from linopy import solvers + + m = Model() + idx = pd.Index([0, 1, 2], name="i") + x = m.add_variables(lower=0, upper=1, coords=[idx], name="x") + m.add_sos_constraints(x, sos_type=1, sos_dim="i") + m.add_objective(x.sum(), sense="max") + + with pytest.raises(ValueError, match="does not support SOS"): + solvers.Solver.from_name("highs", m, io_api="lp") + + +@pytest.mark.skipif("highs" not in available_solvers, reason="HiGHS not installed") +class TestSolveAutoUndoOnFailure: + """Model.solve must auto-undo SOS reformulation when build/solve raises.""" + + def test_state_restored_when_build_raises( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + from linopy import solvers + + m = Model() + idx = pd.Index([0, 1, 2], name="i") + x = m.add_variables(lower=0, upper=1, coords=[idx], name="x") + m.add_sos_constraints(x, sos_type=1, sos_dim="i") + m.add_objective(x.sum(), sense="max") + + def boom(*args: object, **kwargs: object) -> None: + raise RuntimeError("simulated build failure") + + monkeypatch.setattr(solvers.Solver, "from_name", boom) + + with pytest.raises(RuntimeError, match="simulated build failure"): + m.solve(solver_name="highs", reformulate_sos=True) + + assert m._sos_reformulation_state is None + assert list(m.variables.sos) == ["x"] + assert "_sos_reform_x_y" not in m.variables + + # A subsequent real solve must not hit "already applied" + monkeypatch.undo() + m.solve(solver_name="highs", reformulate_sos=True) + + @pytest.mark.skipif("highs" not in available_solvers, reason="HiGHS not installed") class TestSolveWithReformulation: """Tests for solving with SOS reformulation.""" + @pytest.mark.parametrize( + "solver_name", + [ + pytest.param( + "gurobi", + marks=pytest.mark.skipif( + "gurobi" not in available_solvers, reason="Gurobi not installed" + ), + ), + pytest.param( + "highs", + marks=pytest.mark.skipif( + "highs" not in available_solvers, reason="HiGHS not installed" + ), + ), + ], + ) + def test_reformulate_handles_masked_sos_variables(self, solver_name: str) -> None: + """ + ``reformulate_sos=True`` must handle SOS variables with masked entries. + + Exercises the reformulation pipeline (``apply_sos_reformulation`` → + binary + linking constraints → solve → ``undo``) on a model whose SOS + variable has a masked slot. Parametrized to cover both the native-SOS + case (gurobi: reformulation runs anyway under ``reformulate_sos=True``, + per #689) and the no-native-SOS case (highs: reformulation is the only + way to solve). + """ + m = Model() + coords = pd.Index([0, 1, 2, 3], name="i") + mask = pd.Series([True, True, False, True], index=coords) + var = m.add_variables( + lower=0, upper=1, coords=[coords], mask=mask, name="sos_var" + ) + m.add_sos_constraints(var, sos_type=1, sos_dim="i") + m.add_objective(-var.sum()) + + m.solve(solver_name=solver_name, reformulate_sos=True) + + sol = m.variables["sos_var"].solution.values + # SOS1 over 3 unmasked entries, all in [0, 1], obj = -sum: + # one slot at 1, others at 0, masked stays NaN. + assert m.objective.value is not None + assert np.isclose(m.objective.value, -1.0) + assert np.isnan(sol[2]) + nonzero = np.flatnonzero(~np.isnan(sol) & (sol > 1e-6)) + assert len(nonzero) == 1 + assert np.isclose(sol[nonzero[0]], 1.0) + def test_sos1_maximize_with_highs(self) -> None: """Test SOS1 maximize problem with HiGHS using reformulation.""" m = Model()