diff --git a/README.md b/README.md index ccd4bcaf..eb08028d 100644 --- a/README.md +++ b/README.md @@ -430,6 +430,23 @@ write_vrt('mosaic.vrt', ['tile1.tif', 'tile2.tif']) # generate VRT ------- +### **Multi-Criteria Decision Analysis (MCDA)** + +| Name | Description | Source | NumPy xr.DataArray | Dask xr.DataArray | CuPy GPU xr.DataArray | Dask GPU xr.DataArray | +|:----------:|:------------|:------:|:----------------------:|:--------------------:|:-------------------:|:------:| +| [Standardize](xrspatial/mcda/standardize.py) | Converts criterion rasters to 0-1 suitability scale (linear, sigmoidal, gaussian, triangular, piecewise, categorical) | Standard | ✅️ | ✅️ | ✅️ | ✅️ | +| [AHP Weights](xrspatial/mcda/weights.py) | Derives criterion weights from pairwise comparisons using the Saaty eigenvector method with consistency ratio | Saaty 1980 | ✅️ | ✅️ | ✅️ | ✅️ | +| [Rank Weights](xrspatial/mcda/weights.py) | Derives weights from a rank ordering (ROC, rank sum, reciprocal) | Standard | ✅️ | ✅️ | ✅️ | ✅️ | +| [WLC](xrspatial/mcda/combine.py) | Weighted Linear Combination (fully compensatory weighted sum) | Malczewski 2006 | ✅️ | ✅️ | ✅️ | ✅️ | +| [WPM](xrspatial/mcda/combine.py) | Weighted Product Model (multiplicative, penalizes low scores) | Standard | ✅️ | ✅️ | ✅️ | ✅️ | +| [OWA](xrspatial/mcda/combine.py) | Ordered Weighted Averaging with tunable risk attitude | Yager 1988 | ✅️ | ✅️ | ✅️ | ✅️ | +| [Fuzzy Overlay](xrspatial/mcda/combine.py) | Combines criteria using fuzzy set operators (AND, OR, sum, product, gamma) | Eastman 1999 | ✅️ | ✅️ | ✅️ | ✅️ | +| [Boolean Overlay](xrspatial/mcda/combine.py) | Combines binary criterion masks using AND/OR logic | Standard | ✅️ | ✅️ | ✅️ | ✅️ | +| [Constrain](xrspatial/mcda/constrain.py) | Masks exclusion zones from a suitability surface | Standard | ✅️ | ✅️ | ✅️ | ✅️ | +| [Sensitivity](xrspatial/mcda/sensitivity.py) | Assesses weight stability via one-at-a-time or Monte Carlo perturbation | Standard | ✅️ | ✅️ | ✅️ | ✅️ | 
+ +------- + ### **Pathfinding** | Name | Description | Source | NumPy xr.DataArray | Dask xr.DataArray | CuPy GPU xr.DataArray | Dask GPU xr.DataArray | diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst index 9de20872..792cb0b7 100644 --- a/docs/source/reference/index.rst +++ b/docs/source/reference/index.rst @@ -15,6 +15,7 @@ Reference focal hydrology interpolation + mcda morphology multispectral pathfinding diff --git a/docs/source/reference/mcda.rst b/docs/source/reference/mcda.rst new file mode 100644 index 00000000..77bfbcef --- /dev/null +++ b/docs/source/reference/mcda.rst @@ -0,0 +1,45 @@ +.. _reference.mcda: + +********************************************* +Multi-Criteria Decision Analysis (MCDA) +********************************************* + +Standardize +=========== +.. autosummary:: + :toctree: _autosummary + + xrspatial.mcda.standardize.standardize + +Weights +======= +.. autosummary:: + :toctree: _autosummary + + xrspatial.mcda.weights.ahp_weights + xrspatial.mcda.weights.rank_weights + +Combination +=========== +.. autosummary:: + :toctree: _autosummary + + xrspatial.mcda.combine.wlc + xrspatial.mcda.combine.wpm + xrspatial.mcda.combine.owa + xrspatial.mcda.combine.fuzzy_overlay + xrspatial.mcda.combine.boolean_overlay + +Constraints +=========== +.. autosummary:: + :toctree: _autosummary + + xrspatial.mcda.constrain.constrain + +Sensitivity +=========== +.. 
autosummary:: + :toctree: _autosummary + + xrspatial.mcda.sensitivity.sensitivity diff --git a/examples/user_guide/35_MCDA.ipynb b/examples/user_guide/35_MCDA.ipynb new file mode 100644 index 00000000..99bf2f55 --- /dev/null +++ b/examples/user_guide/35_MCDA.ipynb @@ -0,0 +1,514 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Multi-Criteria Decision Analysis (MCDA)\n", + "\n", + "MCDA on rasters treats each pixel as a decision alternative, each raster layer as a criterion,\n", + "and produces a single composite surface where every pixel gets a score.\n", + "Common applications: site suitability, hazard mapping, conservation prioritization.\n", + "\n", + "The workflow has four stages:\n", + "\n", + "```\n", + "raw criteria -> standardize (0-1) -> weight -> combine -> suitability surface\n", + " |\n", + " constraints -> mask exclusion zones\n", + " |\n", + " sensitivity -> stability check\n", + "```\n", + "\n", + "This notebook walks through each stage using synthetic terrain data." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline\n", + "import numpy as np\n", + "import xarray as xr\n", + "import matplotlib.pyplot as plt\n", + "\n", + "from xrspatial import generate_terrain, slope, aspect\n", + "from xrspatial.proximity import proximity\n", + "from xrspatial.mcda import (\n", + " standardize, ahp_weights, rank_weights,\n", + " wlc, wpm, owa, fuzzy_overlay, boolean_overlay,\n", + " constrain, sensitivity,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Generate synthetic criterion layers\n", + "\n", + "We'll create a terrain surface and derive slope, aspect, and a\n", + "synthetic \"distance to road\" layer from it." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "canvas = xr.DataArray(np.zeros((300, 300)), dims=['y', 'x'])\n", + "terrain = generate_terrain(canvas=canvas)\n", + "\n", + "slp = slope(terrain)\n", + "asp = aspect(terrain)\n", + "\n", + "# Synthetic distance-to-road: place a few \"road\" pixels and compute proximity\n", + "rng = np.random.default_rng(1030)\n", + "road_mask = np.zeros((300, 300), dtype=np.float64)\n", + "road_mask[150, :] = 1 # horizontal road\n", + "road_mask[:, 100] = 1 # vertical road\n", + "road_raster = xr.DataArray(road_mask, dims=['y', 'x'])\n", + "dist_road = proximity(road_raster)\n", + "\n", + "# Water mask for constraints\n", + "water = terrain < float(np.nanpercentile(terrain.values, 10))\n", + "\n", + "fig, axes = plt.subplots(2, 2, figsize=(12, 10))\n", + "terrain.plot.imshow(ax=axes[0, 0], cmap='terrain', add_colorbar=True)\n", + "axes[0, 0].set_title('Elevation')\n", + "axes[0, 0].set_axis_off()\n", + "\n", + "slp.plot.imshow(ax=axes[0, 1], cmap='YlOrRd', add_colorbar=True)\n", + "axes[0, 1].set_title('Slope (degrees)')\n", + "axes[0, 1].set_axis_off()\n", + "\n", + "dist_road.plot.imshow(ax=axes[1, 0], cmap='Blues_r', add_colorbar=True)\n", + "axes[1, 0].set_title('Distance to road')\n", + "axes[1, 0].set_axis_off()\n", + "\n", + "water.plot.imshow(ax=axes[1, 1], cmap='Blues', add_colorbar=False)\n", + "axes[1, 1].set_title('Water mask (exclusion zone)')\n", + "axes[1, 1].set_axis_off()\n", + "\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Stage 1: Standardize criteria to 0-1\n", + "\n", + "Each criterion uses a different value function depending on how\n", + "its raw values relate to suitability:\n", + "\n", + "- **Slope**: lower is better (linear)\n", + "- **Distance to road**: closer is better (sigmoidal decay)\n", + "- **Aspect**: south-facing (180 deg) is ideal (gaussian)" + ] + }, + { + "cell_type": 
"code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "slope_std = standardize(slp, method='linear',\n", + " direction='lower_is_better', bounds=(0, 45))\n", + "\n", + "dist_std = standardize(dist_road, method='sigmoidal',\n", + " midpoint=80, spread=-0.05)\n", + "\n", + "aspect_std = standardize(asp, method='gaussian',\n", + " mean=180, std=60)\n", + "\n", + "fig, axes = plt.subplots(1, 3, figsize=(16, 4))\n", + "for ax, data, title in zip(axes,\n", + " [slope_std, dist_std, aspect_std],\n", + " ['Slope (linear, lower better)',\n", + " 'Distance (sigmoidal decay)',\n", + " 'Aspect (gaussian, south ideal)']):\n", + " data.plot.imshow(ax=ax, cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + " ax.set_title(title)\n", + " ax.set_axis_off()\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Other standardization methods\n", + "\n", + "The `standardize` function supports six methods. Here are the\n", + "remaining three applied to elevation:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "elev_min = float(np.nanmin(terrain.values))\n", + "elev_max = float(np.nanmax(terrain.values))\n", + "elev_mid = (elev_min + elev_max) / 2\n", + "\n", + "tri_std = standardize(terrain, method='triangular',\n", + " low=elev_min, peak=elev_mid, high=elev_max)\n", + "\n", + "pw_std = standardize(terrain, method='piecewise',\n", + " breakpoints=[elev_min, elev_mid * 0.5,\n", + " elev_mid, elev_mid * 1.5, elev_max],\n", + " values=[0.0, 0.3, 1.0, 0.6, 0.0])\n", + "\n", + "# Categorical: bin elevation into 4 classes\n", + "bins = np.linspace(elev_min, elev_max, 5)\n", + "elev_class = np.digitize(terrain.values, bins).astype(np.float64)\n", + "elev_class_da = xr.DataArray(elev_class, dims=['y', 'x'])\n", + "cat_std = standardize(elev_class_da, method='categorical',\n", + " mapping={1: 0.9, 2: 0.7, 3: 0.3, 4: 0.1, 5: 0.0})\n", + 
"\n", + "fig, axes = plt.subplots(1, 3, figsize=(16, 4))\n", + "for ax, data, title in zip(axes,\n", + " [tri_std, pw_std, cat_std],\n", + " ['Triangular', 'Piecewise', 'Categorical']):\n", + " data.plot.imshow(ax=ax, cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + " ax.set_title(title)\n", + " ax.set_axis_off()\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Stage 2: Derive weights\n", + "\n", + "### AHP (Analytical Hierarchy Process)\n", + "\n", + "Provide pairwise comparisons on a 1-9 scale. AHP computes weights\n", + "via the principal eigenvector and flags inconsistent judgments\n", + "(consistency ratio > 0.10)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "weights_ahp, consistency = ahp_weights(\n", + " criteria=['slope', 'distance', 'aspect'],\n", + " comparisons={\n", + " ('slope', 'distance'): 3, # slope 3x more important\n", + " ('slope', 'aspect'): 5, # slope 5x more important\n", + " ('distance', 'aspect'): 2, # distance 2x more important\n", + " }\n", + ")\n", + "\n", + "print('AHP weights:')\n", + "for k, v in weights_ahp.items():\n", + " print(f' {k}: {v:.3f}')\n", + "print(f'\\nConsistency ratio: {consistency.ratio:.4f}')\n", + "print(f'Consistent (CR < 0.10): {consistency.is_consistent}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Rank-order weights\n", + "\n", + "When you can rank criteria but can't quantify pairwise comparisons,\n", + "use `rank_weights` with ROC (rank-order centroid), rank sum, or\n", + "reciprocal methods." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for method in ['roc', 'rs', 'rr']:\n", + " w = rank_weights(['slope', 'distance', 'aspect'], method=method)\n", + " print(f'{method}: ' + ', '.join(f'{k}={v:.3f}' for k, v in w.items()))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Stage 3: Combine layers\n", + "\n", + "Bundle standardized criteria into an `xr.Dataset` and apply\n", + "different combination methods." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "criteria = xr.Dataset({\n", + " 'slope': slope_std,\n", + " 'distance': dist_std,\n", + " 'aspect': aspect_std,\n", + "})\n", + "\n", + "# Use the AHP-derived weights\n", + "suit_wlc = wlc(criteria, weights_ahp)\n", + "suit_wpm = wpm(criteria, weights_ahp)\n", + "\n", + "fig, axes = plt.subplots(1, 2, figsize=(12, 5))\n", + "suit_wlc.plot.imshow(ax=axes[0], cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + "axes[0].set_title('WLC (Weighted Linear Combination)')\n", + "axes[0].set_axis_off()\n", + "\n", + "suit_wpm.plot.imshow(ax=axes[1], cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + "axes[1].set_title('WPM (Weighted Product Model)')\n", + "axes[1].set_axis_off()\n", + "\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### OWA: risk attitude via order weights\n", + "\n", + "OWA adds a second set of weights applied by rank position at each pixel.\n", + "Weighting the lowest-scoring criterion produces conservative (AND-like)\n", + "results; weighting the highest produces optimistic (OR-like) results." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Conservative: worst criterion matters most\n", + "suit_conservative = owa(criteria, weights_ahp,\n", + " order_weights=[0.1, 0.3, 0.6])\n", + "\n", + "# Optimistic: best criterion matters most\n", + "suit_optimistic = owa(criteria, weights_ahp,\n", + " order_weights=[0.6, 0.3, 0.1])\n", + "\n", + "fig, axes = plt.subplots(1, 2, figsize=(12, 5))\n", + "suit_conservative.plot.imshow(ax=axes[0], cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + "axes[0].set_title('OWA conservative (risk-averse)')\n", + "axes[0].set_axis_off()\n", + "\n", + "suit_optimistic.plot.imshow(ax=axes[1], cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + "axes[1].set_title('OWA optimistic (risk-tolerant)')\n", + "axes[1].set_axis_off()\n", + "\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Fuzzy overlay\n", + "\n", + "No explicit weights. Combine using fuzzy set operators." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig, axes = plt.subplots(2, 3, figsize=(16, 9))\n", + "\n", + "for ax, op, title in zip(axes.flat,\n", + " ['and', 'or', 'product', 'sum', 'gamma'],\n", + " ['AND (min)', 'OR (max)', 'Product',\n", + " 'Algebraic Sum', 'Gamma (0.9)']):\n", + " kw = {'gamma': 0.9} if op == 'gamma' else {}\n", + " result = fuzzy_overlay(criteria, operator=op, **kw)\n", + " result.plot.imshow(ax=ax, cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + " ax.set_title(title)\n", + " ax.set_axis_off()\n", + "\n", + "axes[1, 2].set_visible(False)\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Boolean overlay\n", + "\n", + "Binary suitability from hard thresholds." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "suitable = boolean_overlay({\n", + " 'gentle_slope': slp < 15,\n", + " 'near_road': dist_road < 80,\n", + " 'not_water': ~water,\n", + "}, operator='and')\n", + "\n", + "fig, ax = plt.subplots(figsize=(6, 5))\n", + "suitable.astype(float).plot.imshow(ax=ax, cmap='Greens',\n", + " add_colorbar=False)\n", + "ax.set_title('Boolean overlay (slope<15 AND near road AND not water)')\n", + "ax.set_axis_off()\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Stage 4: Constraints and sensitivity\n", + "\n", + "### Apply constraints\n", + "\n", + "Mask out water areas from the WLC suitability surface." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "suit_constrained = constrain(suit_wlc, exclude=[water])\n", + "\n", + "fig, axes = plt.subplots(1, 2, figsize=(12, 5))\n", + "suit_wlc.plot.imshow(ax=axes[0], cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + "axes[0].set_title('Before constraints')\n", + "axes[0].set_axis_off()\n", + "\n", + "suit_constrained.plot.imshow(ax=axes[1], cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + "axes[1].set_title('After masking water')\n", + "axes[1].set_axis_off()\n", + "\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Sensitivity analysis\n", + "\n", + "Check whether the results change when weights shift by a few percent.\n", + "High sensitivity means the output depends more on weight choices\n", + "than on spatial data." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# One-at-a-time: how much does each criterion's weight perturbation affect scores?\n", + "sens_oat = sensitivity(criteria, weights_ahp,\n", + " method='one_at_a_time', delta=0.05)\n", + "\n", + "fig, axes = plt.subplots(1, 3, figsize=(16, 4))\n", + "for ax, var in zip(axes, sens_oat.data_vars):\n", + " sens_oat[var].plot.imshow(ax=ax, cmap='YlOrRd', add_colorbar=True)\n", + " ax.set_title(f'Sensitivity: {var}')\n", + " ax.set_axis_off()\n", + "plt.tight_layout()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Monte Carlo: overall stability across random weight vectors\n", + "sens_mc = sensitivity(criteria, weights_ahp,\n", + " method='monte_carlo', n_samples=200)\n", + "\n", + "fig, ax = plt.subplots(figsize=(6, 5))\n", + "sens_mc.plot.imshow(ax=ax, cmap='YlOrRd', add_colorbar=True,\n", + " cbar_kwargs={'label': 'Coefficient of variation'})\n", + "ax.set_title('Monte Carlo sensitivity (200 samples)')\n", + "ax.set_axis_off()\n", + "plt.tight_layout()\n", + "\n", + "print(f'Mean CV: {float(sens_mc.mean()):.4f}')\n", + "print(f'Max CV: {float(sens_mc.max()):.4f}')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Comparison of combination methods\n", + "\n", + "Side-by-side view of all methods on the same criteria and weights." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results = {\n", + " 'WLC': suit_wlc,\n", + " 'WPM': suit_wpm,\n", + " 'OWA (conservative)': suit_conservative,\n", + " 'OWA (optimistic)': suit_optimistic,\n", + " 'Fuzzy AND': fuzzy_overlay(criteria, operator='and'),\n", + " 'Fuzzy Gamma 0.9': fuzzy_overlay(criteria, operator='gamma', gamma=0.9),\n", + "}\n", + "\n", + "fig, axes = plt.subplots(2, 3, figsize=(16, 9))\n", + "for ax, (title, data) in zip(axes.flat, results.items()):\n", + " data.plot.imshow(ax=ax, cmap='RdYlGn', vmin=0, vmax=1,\n", + " add_colorbar=True)\n", + " ax.set_title(title)\n", + " ax.set_axis_off()\n", + "plt.tight_layout()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/xrspatial/__init__.py b/xrspatial/__init__.py index e34f97f8..0edf4e73 100644 --- a/xrspatial/__init__.py +++ b/xrspatial/__init__.py @@ -125,6 +125,7 @@ from xrspatial.reproject import merge # noqa from xrspatial.reproject import reproject # noqa +import xrspatial.mcda # noqa: F401 — exposes xrspatial.mcda subpackage import xrspatial.accessor # noqa: F401 — registers .xrs accessors diff --git a/xrspatial/mcda/__init__.py b/xrspatial/mcda/__init__.py new file mode 100644 index 00000000..73912c5e --- /dev/null +++ b/xrspatial/mcda/__init__.py @@ -0,0 +1,34 @@ +"""Multi-Criteria Decision Analysis (MCDA) for raster suitability modeling. + +Provides four stages of spatial MCDA: + +1. **Standardize** -- convert criterion layers to a common 0-1 scale +2. **Weight** -- derive criterion weights (AHP, rank-order, or direct) +3. **Combine** -- merge layers into a composite suitability surface +4. 
**Validate** -- constrain exclusion zones and check weight stability +""" + +from xrspatial.mcda.standardize import standardize +from xrspatial.mcda.weights import ahp_weights, rank_weights +from xrspatial.mcda.combine import ( + boolean_overlay, + fuzzy_overlay, + owa, + wlc, + wpm, +) +from xrspatial.mcda.constrain import constrain +from xrspatial.mcda.sensitivity import sensitivity + +__all__ = [ + "standardize", + "ahp_weights", + "rank_weights", + "wlc", + "wpm", + "owa", + "fuzzy_overlay", + "boolean_overlay", + "constrain", + "sensitivity", +] diff --git a/xrspatial/mcda/combine.py b/xrspatial/mcda/combine.py new file mode 100644 index 00000000..52fef03a --- /dev/null +++ b/xrspatial/mcda/combine.py @@ -0,0 +1,332 @@ +"""Combination methods for MCDA criterion layers. + +All functions take an ``xr.Dataset`` of standardized (0-1) criterion +layers and return a single composite ``xr.DataArray``. +""" + +from __future__ import annotations + +import numpy as np +import xarray as xr + + +def _validate_criteria(criteria: xr.Dataset) -> None: + if not isinstance(criteria, xr.Dataset): + raise TypeError( + f"criteria must be an xr.Dataset, got {type(criteria).__name__}" + ) + if len(criteria.data_vars) == 0: + raise ValueError("criteria Dataset has no variables") + + +def _validate_weights( + weights: dict[str, float], criteria: xr.Dataset +) -> None: + criteria_names = set(criteria.data_vars) + weight_names = set(weights) + missing = criteria_names - weight_names + if missing: + raise ValueError(f"Missing weights for criteria: {missing}") + extra = weight_names - criteria_names + if extra: + raise ValueError( + f"Weights contain keys not in criteria Dataset: {extra}" + ) + total = sum(weights.values()) + if abs(total - 1.0) > 0.01: + raise ValueError( + f"Weights must sum to ~1.0, got {total:.4f}" + ) + + +def wlc( + criteria: xr.Dataset, + weights: dict[str, float], + name: str = "wlc", +) -> xr.DataArray: + """Weighted Linear Combination. 
+ + Fully compensatory: per-pixel ``sum(w_i * x_i)``. + + Parameters + ---------- + criteria : xr.Dataset + Standardized criterion layers (0-1). + weights : dict + ``{criterion_name: weight}``, must sum to ~1.0. + name : str + Name of the output DataArray. + + Returns + ------- + xr.DataArray + Composite suitability surface. + """ + _validate_criteria(criteria) + _validate_weights(weights, criteria) + + result = None + for var_name in criteria.data_vars: + w = weights[var_name] + term = criteria[var_name] * w + result = term if result is None else result + term + + result.name = name + return result + + +def wpm( + criteria: xr.Dataset, + weights: dict[str, float], + name: str = "wpm", +) -> xr.DataArray: + """Weighted Product Model. + + Multiplicative combination: per-pixel ``product(x_i ^ w_i)``. + More sensitive to low scores than WLC. + + Parameters + ---------- + criteria : xr.Dataset + Standardized criterion layers (0-1). + weights : dict + ``{criterion_name: weight}``, must sum to ~1.0. + name : str + Name of the output DataArray. + + Returns + ------- + xr.DataArray + Composite suitability surface. + """ + _validate_criteria(criteria) + _validate_weights(weights, criteria) + + result = None + for var_name in criteria.data_vars: + w = weights[var_name] + term = criteria[var_name] ** w + result = term if result is None else result * term + + result.name = name + return result + + +def owa( + criteria: xr.Dataset, + criterion_weights: dict[str, float], + order_weights: list[float], + name: str = "owa", +) -> xr.DataArray: + """Ordered Weighted Averaging. + + Generalizes WLC by adding position-based order weights that control + risk attitude. Order weights are applied to criterion values sorted + by magnitude at each pixel, not by criterion identity. + + Parameters + ---------- + criteria : xr.Dataset + Standardized criterion layers (0-1). + criterion_weights : dict + ``{criterion_name: weight}``, must sum to ~1.0. 
+ order_weights : list of float + Weights applied by rank position (index 0 = highest value). + Must have the same length as the number of criteria and + sum to ~1.0. + name : str + Name of the output DataArray. + + Returns + ------- + xr.DataArray + Composite suitability surface. + """ + _validate_criteria(criteria) + _validate_weights(criterion_weights, criteria) + + n = len(criteria.data_vars) + if len(order_weights) != n: + raise ValueError( + f"order_weights length ({len(order_weights)}) must match " + f"number of criteria ({n})" + ) + ow_sum = sum(order_weights) + if abs(ow_sum - 1.0) > 0.01: + raise ValueError( + f"order_weights must sum to ~1.0, got {ow_sum:.4f}" + ) + + order_weights_arr = np.array(order_weights, dtype=np.float64) + + # First apply criterion weights + weighted_layers = [] + for var_name in criteria.data_vars: + w = criterion_weights[var_name] + weighted_layers.append(criteria[var_name] * w * n) + + # Stack, sort descending along criterion axis, apply order weights + stacked = xr.concat(weighted_layers, dim="__mcda_criterion") + sorted_data = _sort_descending(stacked.data, axis=0) + + # Apply order weights along the criterion axis + # Reshape order weights for broadcasting + shape = [n] + [1] * (sorted_data.ndim - 1) + + try: + import dask.array as da + if isinstance(sorted_data, da.Array): + ow = da.from_array(order_weights_arr.reshape(shape), + chunks=-1) + else: + ow = order_weights_arr.reshape(shape) + except ImportError: + ow = order_weights_arr.reshape(shape) + + result_data = (sorted_data * ow).sum(axis=0) + + # Pick dims/coords from first data var + first_var = list(criteria.data_vars)[0] + template = criteria[first_var] + return xr.DataArray( + result_data, name=name, dims=template.dims, + coords=template.coords, attrs=template.attrs, + ) + + +def _sort_descending(data, axis): + """Sort array descending along axis, supporting dask.""" + try: + import dask.array as da + if isinstance(data, da.Array): + # Negate, sort, negate back to 
get descending order + return -da.sort(-data, axis=axis) + except ImportError: + pass + + try: + import cupy + if isinstance(data, cupy.ndarray): + return -cupy.sort(-data, axis=axis) + except ImportError: + pass + + return -np.sort(-data, axis=axis) + + +def fuzzy_overlay( + criteria: xr.Dataset, + operator: str = "and", + gamma: float = 0.9, + name: str = "fuzzy_overlay", +) -> xr.DataArray: + """Combine criteria using fuzzy set operators. + + No explicit weights. All criteria contribute equally through the + chosen fuzzy operator. + + Parameters + ---------- + criteria : xr.Dataset + Standardized criterion layers (0-1). + operator : str + Fuzzy operator: ``"and"`` (min), ``"or"`` (max), + ``"sum"``, ``"product"``, or ``"gamma"``. + gamma : float + Gamma parameter for the ``"gamma"`` operator (0 = product, + 1 = sum). Only used when ``operator="gamma"``. + name : str + Name of the output DataArray. + + Returns + ------- + xr.DataArray + Composite suitability surface. + """ + _validate_criteria(criteria) + + var_names = list(criteria.data_vars) + layers = [criteria[v] for v in var_names] + + if operator == "and": + result = layers[0] + for layer in layers[1:]: + result = np.minimum(result, layer) + elif operator == "or": + result = layers[0] + for layer in layers[1:]: + result = np.maximum(result, layer) + elif operator == "product": + result = layers[0] + for layer in layers[1:]: + result = result * layer + elif operator == "sum": + # Fuzzy algebraic sum: 1 - product(1 - x_i) + complement = layers[0] * 0.0 + 1.0 + for layer in layers: + complement = complement * (1.0 - layer) + result = 1.0 - complement + elif operator == "gamma": + if not 0 <= gamma <= 1: + raise ValueError(f"gamma must be in [0, 1], got {gamma}") + # product part + prod = layers[0] + for layer in layers[1:]: + prod = prod * layer + # sum part + complement = layers[0] * 0.0 + 1.0 + for layer in layers: + complement = complement * (1.0 - layer) + fuzzy_sum = 1.0 - complement + result = (fuzzy_sum 
** gamma) * (prod ** (1.0 - gamma)) + else: + raise ValueError( + f"Unknown operator {operator!r}. Choose from " + "'and', 'or', 'sum', 'product', 'gamma'" + ) + + result.name = name + return result + + +def boolean_overlay( + criteria: dict[str, xr.DataArray], + operator: str = "and", + name: str = "boolean_overlay", +) -> xr.DataArray: + """Combine binary (boolean) criterion masks. + + Parameters + ---------- + criteria : dict of str to xr.DataArray + Named boolean/binary criterion masks. + operator : str + ``"and"`` (intersection) or ``"or"`` (union). + name : str + Name of the output DataArray. + + Returns + ------- + xr.DataArray + Binary suitability mask. + """ + if not criteria: + raise ValueError("criteria dict is empty") + + # Cast to bool to handle numeric input (0/1, float thresholds) + layers = [v.astype(bool) for v in criteria.values()] + if operator == "and": + result = layers[0] + for layer in layers[1:]: + result = result & layer + elif operator == "or": + result = layers[0] + for layer in layers[1:]: + result = result | layer + else: + raise ValueError( + f"Unknown operator {operator!r}. Choose from 'and', 'or'" + ) + + result.name = name + return result diff --git a/xrspatial/mcda/constrain.py b/xrspatial/mcda/constrain.py new file mode 100644 index 00000000..59275ffe --- /dev/null +++ b/xrspatial/mcda/constrain.py @@ -0,0 +1,43 @@ +"""Constraint masking for MCDA suitability surfaces.""" + +from __future__ import annotations + +import numpy as np +import xarray as xr + + +def constrain( + suitability: xr.DataArray, + exclude: list[xr.DataArray], + fill: float = np.nan, + name: str | None = None, +) -> xr.DataArray: + """Mask out areas that are categorically unsuitable. + + Pixels where any exclusion mask is True (nonzero) are set to *fill*. + + Parameters + ---------- + suitability : xr.DataArray + Input suitability surface. + exclude : list of xr.DataArray + Boolean or binary masks. True/nonzero marks excluded areas. 
+ fill : float + Value to assign to excluded pixels. Default ``np.nan``. + name : str, optional + Name of the output DataArray. + + Returns + ------- + xr.DataArray + Constrained suitability surface. + """ + if name is None: + name = suitability.name + + result = suitability.copy() + for mask in exclude: + result = xr.where(mask, fill, result) + + result.name = name + return result diff --git a/xrspatial/mcda/sensitivity.py b/xrspatial/mcda/sensitivity.py new file mode 100644 index 00000000..d55f91f3 --- /dev/null +++ b/xrspatial/mcda/sensitivity.py @@ -0,0 +1,187 @@ +"""Sensitivity analysis for MCDA weight stability.""" + +from __future__ import annotations + +import numpy as np +import xarray as xr + +from xrspatial.mcda.combine import wlc, wpm + + +def sensitivity( + criteria: xr.Dataset, + weights: dict[str, float], + method: str = "one_at_a_time", + combine_method: str = "wlc", + delta: float = 0.05, + n_samples: int = 1000, + seed: int = 42, + name: str = "sensitivity", +) -> xr.DataArray | xr.Dataset: + """Assess how weight changes affect the suitability surface. + + Parameters + ---------- + criteria : xr.Dataset + Standardized criterion layers. + weights : dict + Base weights (must sum to ~1.0). + method : str + ``"one_at_a_time"`` or ``"monte_carlo"``. + combine_method : str + Combination function: ``"wlc"`` or ``"wpm"``. + delta : float + Perturbation magnitude for one-at-a-time (default 0.05). + n_samples : int + Number of random weight vectors for Monte Carlo (default 1000). + seed : int + Random seed for Monte Carlo reproducibility. + name : str + Name prefix for output arrays. + + Returns + ------- + xr.Dataset or xr.DataArray + For ``"one_at_a_time"``: Dataset with per-criterion DataArrays + showing the absolute score difference under perturbation. + For ``"monte_carlo"``: DataArray of per-pixel coefficient of + variation across random weight samples. 
+ """ + combine_fn = {"wlc": wlc, "wpm": wpm}.get(combine_method) + if combine_fn is None: + raise ValueError( + f"Unknown combine_method {combine_method!r}. " + "Choose 'wlc' or 'wpm'" + ) + + if method == "one_at_a_time": + return _oat(criteria, weights, combine_fn, delta, name) + elif method == "monte_carlo": + return _monte_carlo( + criteria, weights, combine_fn, n_samples, seed, name + ) + else: + raise ValueError( + f"Unknown method {method!r}. " + "Choose 'one_at_a_time' or 'monte_carlo'" + ) + + +def _perturb_weights( + weights: dict[str, float], target: str, delta: float +) -> tuple[dict[str, float], dict[str, float]]: + """Create +delta and -delta weight variants for one criterion. + + The target criterion's weight shifts by delta; remaining weights + are rescaled proportionally to maintain a sum of 1.0. + """ + names = list(weights.keys()) + base_w = weights[target] + others = [k for k in names if k != target] + other_sum = sum(weights[k] for k in others) + + # Single criterion: weight must stay at 1.0, perturbation is a no-op + if not others: + copy = dict(weights) + return copy, dict(copy) + + results = [] + for sign in (+1, -1): + new_target = max(0.0, min(1.0, base_w + sign * delta)) + remainder = 1.0 - new_target + perturbed = {} + for k in names: + if k == target: + perturbed[k] = new_target + else: + if other_sum > 0: + perturbed[k] = weights[k] / other_sum * remainder + else: + perturbed[k] = remainder / len(others) + results.append(perturbed) + + return results[0], results[1] + + +def _oat(criteria, weights, combine_fn, delta, name): + """One-at-a-time sensitivity: perturb each weight +/- delta.""" + result_vars = {} + + for criterion in weights: + w_plus, w_minus = _perturb_weights(weights, criterion, delta) + score_plus = combine_fn(criteria, w_plus) + score_minus = combine_fn(criteria, w_minus) + diff = abs(score_plus - score_minus) + diff.name = f"{name}_{criterion}" + result_vars[criterion] = diff + + return xr.Dataset(result_vars) + + +def 
_is_dask_dataset(criteria): + """Check if any variable in the Dataset is backed by dask.""" + try: + import dask.array as da + return any( + isinstance(criteria[v].data, da.Array) + for v in criteria.data_vars + ) + except ImportError: + return False + + +def _monte_carlo(criteria, weights, combine_fn, n_samples, seed, name): + """Monte Carlo sensitivity: random weight vectors, measure CV.""" + rng = np.random.default_rng(seed) + n_criteria = len(weights) + criteria_names = list(weights.keys()) + + # Generate random weight vectors using Dirichlet distribution + # Use base weights as concentration parameters (scaled up) + alpha = np.array([weights[k] for k in criteria_names]) * n_criteria * 5 + alpha = np.maximum(alpha, 0.5) # floor to avoid zero concentrations + + first_var = list(criteria.data_vars)[0] + template = criteria[first_var] + + use_dask = _is_dask_dataset(criteria) + + # Accumulate running mean and M2 for Welford's online algorithm + # to avoid storing all n_samples surfaces in memory + running_mean = template.values * 0.0 if not use_dask else np.zeros(template.shape) + running_m2 = running_mean.copy() + + # For dask inputs: compute each score eagerly to avoid graph + # explosion (each lazy iteration would chain ~35 graph nodes; + # 1000 iterations = 35000 chained tasks with no parallelism). + # The criterion layers are small enough per-pixel that the + # bottleneck is the combination, not I/O. 
+ if use_dask: + criteria_np = criteria.compute() + else: + criteria_np = criteria + + for i in range(n_samples): + raw = rng.dirichlet(alpha) + sample_weights = { + k: float(raw[j]) for j, k in enumerate(criteria_names) + } + score = combine_fn(criteria_np, sample_weights) + score_vals = score.values + + delta_val = score_vals - running_mean + running_mean = running_mean + delta_val / (i + 1) + delta2 = score_vals - running_mean + running_m2 = running_m2 + delta_val * delta2 + + variance = running_m2 / n_samples + std_dev = np.sqrt(variance) + # Coefficient of variation + with np.errstate(divide="ignore", invalid="ignore"): + cv_vals = np.where(running_mean != 0, std_dev / np.abs(running_mean), 0.0) + + result = xr.DataArray( + cv_vals, name=name, dims=template.dims, + coords=template.coords, attrs=template.attrs, + ) + return result diff --git a/xrspatial/mcda/standardize.py b/xrspatial/mcda/standardize.py new file mode 100644 index 00000000..d33b1fd7 --- /dev/null +++ b/xrspatial/mcda/standardize.py @@ -0,0 +1,308 @@ +"""Value-function standardization for MCDA criterion layers. + +Converts raw criterion rasters from native units to a common 0-1 +suitability scale using configurable value functions. +""" + +from __future__ import annotations + +import numpy as np +import xarray as xr + + +def standardize( + agg: xr.DataArray, + method: str = "linear", + *, + direction: str = "higher_is_better", + bounds: tuple[float, float] | None = None, + midpoint: float = 0.0, + spread: float = 1.0, + mean: float = 0.0, + std: float = 1.0, + low: float = 0.0, + peak: float = 0.5, + high: float = 1.0, + breakpoints: list[float] | None = None, + values: list[float] | None = None, + mapping: dict | None = None, + name: str | None = None, +) -> xr.DataArray: + """Standardize a criterion raster to 0-1 suitability scale. + + Parameters + ---------- + agg : xr.DataArray + Input criterion raster (numpy, cupy, dask+numpy, or dask+cupy). + method : str + Value function type. 
One of ``"linear"``, ``"sigmoidal"``, + ``"gaussian"``, ``"triangular"``, ``"piecewise"``, or + ``"categorical"``. + direction : str + For monotonic methods (``"linear"``): ``"higher_is_better"`` or + ``"lower_is_better"``. + bounds : tuple of float, optional + ``(low, high)`` for linear scaling. Values outside bounds are + clipped. If *None*, uses data min/max. + midpoint : float + Inflection point for sigmoidal method. + spread : float + Steepness for sigmoidal method. Negative values produce a + decreasing sigmoid. + mean : float + Center of the bell curve for gaussian method. + std : float + Width of the bell curve for gaussian method. Must be > 0. + low, peak, high : float + Vertices of the triangle for triangular method. + breakpoints : list of float, optional + X-coordinates for piecewise linear interpolation. + values : list of float, optional + Corresponding suitability values for each breakpoint. + mapping : dict, optional + ``{class_value: suitability}`` lookup for categorical method. + name : str, optional + Name of the output DataArray. Defaults to the input name. + + Returns + ------- + xr.DataArray + Standardized raster with values in [0, 1]. NaN values pass through. + """ + if name is None: + name = agg.name + + _methods = { + "linear": _linear, + "sigmoidal": _sigmoidal, + "gaussian": _gaussian, + "triangular": _triangular, + "piecewise": _piecewise, + "categorical": _categorical, + } + if method not in _methods: + raise ValueError( + f"Unknown method {method!r}. 
Choose from {list(_methods)}" + ) + + func = _methods[method] + + if method == "linear": + data = func(agg.data, direction=direction, bounds=bounds) + elif method == "sigmoidal": + data = func(agg.data, midpoint=midpoint, spread=spread) + elif method == "gaussian": + if std <= 0: + raise ValueError(f"std must be > 0, got {std}") + data = func(agg.data, mean=mean, std=std) + elif method == "triangular": + if not (low <= peak <= high): + raise ValueError( + f"Need low <= peak <= high, got {low}, {peak}, {high}" + ) + data = func(agg.data, low=low, peak=peak, high=high) + elif method == "piecewise": + if breakpoints is None or values is None: + raise ValueError( + "piecewise method requires breakpoints and values" + ) + if len(breakpoints) != len(values): + raise ValueError( + "breakpoints and values must have the same length" + ) + if len(breakpoints) < 2: + raise ValueError("piecewise requires at least 2 breakpoints") + if len(set(breakpoints)) != len(breakpoints): + raise ValueError( + "piecewise breakpoints must be unique (duplicates found)" + ) + data = func(agg.data, breakpoints=breakpoints, values=values) + elif method == "categorical": + if mapping is None: + raise ValueError("categorical method requires mapping dict") + data = func(agg.data, mapping=mapping) + + return xr.DataArray( + data, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs, + ) + + +def _is_dask(data): + """Return True if data is a dask array.""" + try: + import dask.array as da + return isinstance(data, da.Array) + except ImportError: + return False + + +def _get_xp(data): + """Return the array module for the underlying data. + + For dask arrays (numpy- or cupy-backed), returns numpy because + numpy ufuncs dispatch correctly to dask through __array_function__. + Only returns cupy for plain cupy arrays. 
+ """ + # Dask arrays (regardless of backend) use numpy dispatch + if _is_dask(data): + return np + + try: + import cupy + if isinstance(data, cupy.ndarray): + return cupy + except ImportError: + pass + + return np + + +def _linear(data, *, direction, bounds): + xp = _get_xp(data) + + if bounds is not None: + lo, hi = float(bounds[0]), float(bounds[1]) + if lo > hi: + raise ValueError( + f"bounds[0] must be <= bounds[1], got ({lo}, {hi})" + ) + else: + # Compute from data, ignoring NaN + import warnings + if _is_dask(data): + import dask + import dask.array as da + lo, hi = dask.compute(da.nanmin(data), da.nanmax(data)) + lo, hi = float(lo), float(hi) + else: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", RuntimeWarning) + lo = float(np.nanmin(data)) + hi = float(np.nanmax(data)) + + rng = hi - lo + if rng == 0: + # Constant surface -- return 0.5 + out = xp.where(xp.isfinite(data), 0.5, xp.nan) + return out + + # Clip and scale to 0-1 + clipped = xp.clip(data, lo, hi) + result = (clipped - lo) / rng + + if direction == "lower_is_better": + result = 1.0 - result + elif direction != "higher_is_better": + raise ValueError( + f"direction must be 'higher_is_better' or 'lower_is_better', " + f"got {direction!r}" + ) + + # Preserve NaN + result = xp.where(xp.isfinite(data), result, xp.nan) + return result + + +def _sigmoidal(data, *, midpoint, spread): + xp = _get_xp(data) + import warnings + exponent = -spread * (data - midpoint) + # Clamp exponent to avoid overflow in exp(); the sigmoid is + # effectively 0 or 1 beyond ~±700 anyway. 
+ exponent = xp.clip(exponent, -500.0, 500.0) + result = 1.0 / (1.0 + xp.exp(exponent)) + result = xp.where(xp.isfinite(data), result, xp.nan) + return result + + +def _gaussian(data, *, mean, std): + xp = _get_xp(data) + result = xp.exp(-0.5 * ((data - mean) / std) ** 2) + result = xp.where(xp.isfinite(data), result, xp.nan) + return result + + +def _triangular(data, *, low, peak, high): + xp = _get_xp(data) + + if low == peak == high: + # Fully degenerate: 1.0 at the exact point, 0 elsewhere + result = xp.where(data == peak, 1.0, 0.0) + result = xp.where(xp.isfinite(data), result, xp.nan) + return result + + if peak == low: + # No rising edge: 1.0 at peak, linear fall to 0 at high, + # 0 below low. + result = (high - data) / (high - low) + result = xp.clip(result, 0.0, 1.0) + result = xp.where(data < low, 0.0, result) + elif peak == high: + # No falling edge: 0 at low, linear rise to 1.0 at peak, + # 0 above high. + result = (data - low) / (high - low) + result = xp.clip(result, 0.0, 1.0) + result = xp.where(data > high, 0.0, result) + else: + # Standard triangle: min of rising and falling edges + rising = (data - low) / (peak - low) + falling = (high - data) / (high - peak) + result = xp.minimum(rising, falling) + result = xp.clip(result, 0.0, 1.0) + + result = xp.where(xp.isfinite(data), result, xp.nan) + return result + + +def _piecewise(data, *, breakpoints, values): + xp = _get_xp(data) + bp = np.asarray(breakpoints, dtype=np.float64) + vl = np.asarray(values, dtype=np.float64) + + # Sort by breakpoint + order = np.argsort(bp) + bp = bp[order] + vl = vl[order] + + if _is_dask(data): + import dask.array as da + + def _interp_block(block): + # Ensure block is numpy for np.interp (handles cupy chunks) + arr = np.asarray(block) + return np.interp(arr, bp, vl) + + result = da.map_blocks(_interp_block, data, dtype=np.float64) + result = da.where(da.isfinite(data), result, np.nan) + return result + + result = xp.interp(data, bp, vl) + result = 
xp.where(xp.isfinite(data), result, xp.nan) + return result + + +def _categorical(data, *, mapping): + xp = _get_xp(data) + + # Build output: default NaN, fill from mapping + keys = np.array(list(mapping.keys()), dtype=np.float64) + vals = np.array(list(mapping.values()), dtype=np.float64) + + if _is_dask(data): + import dask.array as da + + def _apply_mapping(block): + # Ensure block is numpy (handles cupy chunks) + arr = np.asarray(block) + out = np.full(arr.shape, np.nan, dtype=np.float64) + for k, v in zip(keys, vals): + out[arr == k] = v + return out + + result = da.map_blocks(_apply_mapping, data, dtype=np.float64) + return result + + out = xp.full(data.shape, np.nan, dtype=np.float64) + for k, v in zip(keys, vals): + out[data == k] = v + return out diff --git a/xrspatial/mcda/weights.py b/xrspatial/mcda/weights.py new file mode 100644 index 00000000..c79dc359 --- /dev/null +++ b/xrspatial/mcda/weights.py @@ -0,0 +1,173 @@ +"""Weight derivation methods for MCDA. + +Provides AHP (Analytical Hierarchy Process) and rank-order weighting. +These operate on small metadata (criteria names and comparisons), not +on raster data, so they always use numpy regardless of backend. +""" + +from __future__ import annotations + +import warnings +from dataclasses import dataclass + +import numpy as np + + +# Random consistency index (Saaty) for matrices of size 1..15 +_RI = [0.0, 0.0, 0.58, 0.90, 1.12, 1.24, 1.32, 1.41, 1.45, 1.49, + 1.51, 1.48, 1.56, 1.57, 1.59] + + +@dataclass +class ConsistencyResult: + """AHP consistency check results.""" + ratio: float + index: float + is_consistent: bool + lambda_max: float + + +def ahp_weights( + criteria: list[str], + comparisons: dict[tuple[str, str], float], +) -> tuple[dict[str, float], ConsistencyResult]: + """Derive criterion weights from pairwise comparisons using AHP. + + Uses the standard Saaty eigenvector method. Input pairwise + comparisons on a 1-9 scale (or reciprocals). 
The function builds + the full comparison matrix, computes the principal eigenvector for + weights, and derives the consistency ratio. + + Parameters + ---------- + criteria : list of str + Criterion names in order. + comparisons : dict + Pairwise comparisons as ``{(criterion_a, criterion_b): value}``. + Only provide each pair once; the reciprocal is inferred. + Values follow the Saaty scale (1 = equal, 9 = extreme preference + of a over b, 1/9 = extreme preference of b over a). + + Returns + ------- + weights : dict of str to float + Normalized weights summing to 1.0. + consistency : ConsistencyResult + Consistency ratio and related metrics. ``is_consistent`` is + True when ratio < 0.10. + + Raises + ------ + ValueError + If criteria list has fewer than 2 items or comparisons are + incomplete. + """ + n = len(criteria) + if n < 2: + raise ValueError("Need at least 2 criteria") + if len(set(criteria)) != n: + raise ValueError("Duplicate criterion names are not allowed") + + expected = n * (n - 1) // 2 + if len(comparisons) < expected: + warnings.warn( + f"Only {len(comparisons)} of {expected} pairwise comparisons " + f"provided for {n} criteria. 
Missing pairs default to 1 " + f"(equal importance).", + UserWarning, + stacklevel=2, + ) + + idx = {name: i for i, name in enumerate(criteria)} + matrix = np.ones((n, n), dtype=np.float64) + + for (a, b), val in comparisons.items(): + if a not in idx: + raise ValueError(f"Unknown criterion {a!r}") + if b not in idx: + raise ValueError(f"Unknown criterion {b!r}") + if a == b: + raise ValueError( + f"Self-comparison ({a!r}, {b!r}) is not allowed; " + f"diagonal entries are always 1" + ) + if val <= 0: + raise ValueError( + f"Comparison value must be positive, got {val} " + f"for ({a!r}, {b!r})" + ) + i, j = idx[a], idx[b] + matrix[i, j] = val + matrix[j, i] = 1.0 / val + + # Principal eigenvector + eigenvalues, eigenvectors = np.linalg.eig(matrix) + # Find the largest real eigenvalue + real_parts = eigenvalues.real + max_idx = np.argmax(real_parts) + lambda_max = float(real_parts[max_idx]) + + # Corresponding eigenvector (take real part) + raw_weights = eigenvectors[:, max_idx].real + raw_weights = np.abs(raw_weights) + normalized = raw_weights / raw_weights.sum() + + # Consistency + ci = (lambda_max - n) / (n - 1) if n > 1 else 0.0 + ri = _RI[n - 1] if n <= len(_RI) else _RI[-1] + cr = ci / ri if ri > 0 else 0.0 + + weights = {name: float(normalized[i]) for i, name in enumerate(criteria)} + consistency = ConsistencyResult( + ratio=cr, + index=ci, + is_consistent=(cr < 0.10), + lambda_max=lambda_max, + ) + return weights, consistency + + +def rank_weights( + ranking: list[str], + method: str = "roc", +) -> dict[str, float]: + """Derive weights from a rank ordering of criteria. + + Parameters + ---------- + ranking : list of str + Criteria ordered from most to least important. + method : str + Weighting scheme: ``"roc"`` (rank-order centroid), + ``"rs"`` (rank sum), or ``"rr"`` (reciprocal of ranks). + + Returns + ------- + weights : dict of str to float + Normalized weights summing to 1.0. 
+ """ + n = len(ranking) + if n < 1: + raise ValueError("Need at least 1 criterion in ranking") + if len(set(ranking)) != n: + raise ValueError("Duplicate criterion names are not allowed") + + if method == "roc": + # ROC: w_i = (1/n) * sum(1/k for k in range(i+1, n+1)) + raw = np.array([ + sum(1.0 / k for k in range(i + 1, n + 1)) / n + for i in range(n) + ]) + elif method == "rs": + # Rank sum: w_i = (n - rank + 1) / sum + raw = np.array([n - i for i in range(n)], dtype=np.float64) + elif method == "rr": + # Reciprocal of rank: w_i = (1/rank) / sum + raw = np.array([1.0 / (i + 1) for i in range(n)]) + else: + raise ValueError( + f"Unknown method {method!r}. Choose from 'roc', 'rs', 'rr'" + ) + + normalized = raw / raw.sum() + return {name: float(normalized[i]) for i, name in enumerate(ranking)} diff --git a/xrspatial/tests/test_mcda.py b/xrspatial/tests/test_mcda.py new file mode 100644 index 00000000..ad8bf656 --- /dev/null +++ b/xrspatial/tests/test_mcda.py @@ -0,0 +1,1712 @@ +"""Tests for the xrspatial.mcda module (#1030).""" + +from __future__ import annotations + +import numpy as np +import pytest +import xarray as xr + +from xrspatial.mcda import ( + ahp_weights, + boolean_overlay, + constrain, + fuzzy_overlay, + owa, + rank_weights, + sensitivity, + standardize, + wlc, + wpm, +) + +try: + import dask.array as da + HAS_DASK = True +except ImportError: + HAS_DASK = False + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def criterion_raster(): + """Simple 4x4 raster with known values and one NaN.""" + data = np.array([ + [0.0, 10.0, 20.0, 30.0], + [40.0, 50.0, 60.0, 70.0], + [80.0, 90.0, np.nan, 100.0], + [5.0, 15.0, 25.0, 45.0], + ], dtype=np.float64) + return xr.DataArray(data, dims=["y", "x"]) + + +@pytest.fixture +def criteria_dataset(): + """Dataset with three standardized criteria for combination tests.""" + 
np.random.seed(1030) + slope = xr.DataArray( + np.array([[0.2, 0.8, 0.5], + [0.9, 0.1, 0.6], + [0.4, 0.7, 0.3]], dtype=np.float64), + dims=["y", "x"], name="slope", + ) + distance = xr.DataArray( + np.array([[0.5, 0.3, 0.7], + [0.2, 0.9, 0.4], + [0.8, 0.1, 0.6]], dtype=np.float64), + dims=["y", "x"], name="distance", + ) + ndvi = xr.DataArray( + np.array([[0.6, 0.4, 0.8], + [0.3, 0.7, 0.5], + [0.9, 0.2, 0.1]], dtype=np.float64), + dims=["y", "x"], name="ndvi", + ) + return xr.Dataset({"slope": slope, "distance": distance, "ndvi": ndvi}) + + +@pytest.fixture +def weights_3(): + return {"slope": 0.4, "distance": 0.35, "ndvi": 0.25} + + +# =========================================================================== +# standardize +# =========================================================================== + +class TestStandardizeLinear: + def test_higher_is_better(self, criterion_raster): + result = standardize( + criterion_raster, method="linear", + direction="higher_is_better", bounds=(0, 100), + ) + # 0 -> 0.0, 100 -> 1.0 + assert float(result.values[0, 0]) == pytest.approx(0.0) + assert float(result.values[2, 3]) == pytest.approx(1.0) + assert float(result.values[1, 2]) == pytest.approx(0.6) + + def test_lower_is_better(self, criterion_raster): + result = standardize( + criterion_raster, method="linear", + direction="lower_is_better", bounds=(0, 100), + ) + # 0 -> 1.0, 100 -> 0.0 + assert float(result.values[0, 0]) == pytest.approx(1.0) + assert float(result.values[2, 3]) == pytest.approx(0.0) + + def test_nan_preserved(self, criterion_raster): + result = standardize( + criterion_raster, method="linear", bounds=(0, 100), + ) + assert np.isnan(result.values[2, 2]) + + def test_no_bounds_uses_data_range(self, criterion_raster): + result = standardize(criterion_raster, method="linear") + finite = criterion_raster.values[np.isfinite(criterion_raster.values)] + # min value -> 0, max value -> 1 + min_idx = np.unravel_index( + np.nanargmin(criterion_raster.values), 
criterion_raster.shape + ) + max_idx = np.unravel_index( + np.nanargmax(criterion_raster.values), criterion_raster.shape + ) + assert float(result.values[min_idx]) == pytest.approx(0.0) + assert float(result.values[max_idx]) == pytest.approx(1.0) + + def test_constant_surface(self): + data = np.full((3, 3), 5.0) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize(agg, method="linear", bounds=(5, 5)) + # Should return 0.5 for constant + assert np.all(result.values == pytest.approx(0.5)) + + def test_clipping(self): + data = np.array([[0.0, 50.0, 200.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize(agg, method="linear", bounds=(0, 100)) + assert float(result.values[0, 2]) == pytest.approx(1.0) + + def test_invalid_direction(self, criterion_raster): + with pytest.raises(ValueError, match="direction"): + standardize( + criterion_raster, method="linear", direction="bad" + ) + + +class TestStandardizeSigmoidal: + def test_basic(self): + data = np.array([[0.0, 5000.0, 10000.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize( + agg, method="sigmoidal", midpoint=5000, spread=-0.001, + ) + # At midpoint, sigmoid = 0.5 + assert float(result.values[0, 1]) == pytest.approx(0.5) + # Negative spread: higher values -> lower score + assert float(result.values[0, 0]) > float(result.values[0, 2]) + + def test_nan_preserved(self): + data = np.array([[np.nan, 5000.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize(agg, method="sigmoidal", midpoint=5000, spread=0.001) + assert np.isnan(result.values[0, 0]) + + +class TestStandardizeGaussian: + def test_peak_at_mean(self): + data = np.array([[0.0, 0.65, 1.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize( + agg, method="gaussian", mean=0.65, std=0.15, + ) + # At mean, gaussian = 1.0 + assert float(result.values[0, 1]) == pytest.approx(1.0) + # Away from mean, values 
decrease + assert float(result.values[0, 0]) < 1.0 + assert float(result.values[0, 2]) < 1.0 + + def test_invalid_std(self): + agg = xr.DataArray(np.array([[1.0]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="std must be > 0"): + standardize(agg, method="gaussian", mean=0, std=0) + + +class TestStandardizeTriangular: + def test_basic(self): + data = np.array([[10.0, 22.0, 30.0, 35.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize( + agg, method="triangular", low=15, peak=22, high=30, + ) + # At peak -> 1.0 + assert float(result.values[0, 1]) == pytest.approx(1.0) + # At high -> 0.0 + assert float(result.values[0, 2]) == pytest.approx(0.0) + # Below low -> 0.0 + assert float(result.values[0, 0]) == pytest.approx(0.0) + # Above high -> 0.0 + assert float(result.values[0, 3]) == pytest.approx(0.0) + + def test_invalid_vertices(self): + agg = xr.DataArray(np.array([[1.0]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="low <= peak <= high"): + standardize(agg, method="triangular", low=30, peak=20, high=10) + + +class TestStandardizePiecewise: + def test_basic(self): + data = np.array([[0.0, 100.0, 500.0, 1000.0, 2000.0]], + dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize( + agg, method="piecewise", + breakpoints=[0, 100, 500, 1000, 2000], + values=[0.0, 0.3, 1.0, 0.6, 0.0], + ) + assert float(result.values[0, 0]) == pytest.approx(0.0) + assert float(result.values[0, 1]) == pytest.approx(0.3) + assert float(result.values[0, 2]) == pytest.approx(1.0) + assert float(result.values[0, 3]) == pytest.approx(0.6) + assert float(result.values[0, 4]) == pytest.approx(0.0) + + def test_missing_params(self): + agg = xr.DataArray(np.array([[1.0]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="breakpoints and values"): + standardize(agg, method="piecewise") + + def test_length_mismatch(self): + agg = xr.DataArray(np.array([[1.0]]), dims=["y", "x"]) + with 
pytest.raises(ValueError, match="same length"): + standardize( + agg, method="piecewise", + breakpoints=[0, 1, 2], values=[0.0, 1.0], + ) + + +class TestStandardizeCategorical: + def test_basic(self): + data = np.array([[1, 2, 3, 4]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize( + agg, method="categorical", + mapping={1: 0.9, 2: 0.7, 3: 0.4, 4: 0.1}, + ) + assert float(result.values[0, 0]) == pytest.approx(0.9) + assert float(result.values[0, 1]) == pytest.approx(0.7) + assert float(result.values[0, 2]) == pytest.approx(0.4) + assert float(result.values[0, 3]) == pytest.approx(0.1) + + def test_unmapped_values_nan(self): + data = np.array([[1, 99]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize( + agg, method="categorical", mapping={1: 0.5}, + ) + assert float(result.values[0, 0]) == pytest.approx(0.5) + assert np.isnan(result.values[0, 1]) + + def test_missing_mapping(self): + agg = xr.DataArray(np.array([[1.0]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="mapping"): + standardize(agg, method="categorical") + + +class TestStandardizeTriangularDegenerate: + """Degenerate triangles where peak coincides with low or high.""" + + def test_peak_equals_low(self): + """Right triangle: 1.0 at peak=low, falling to 0 at high.""" + data = np.array([[10.0, 15.0, 20.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize(agg, method="triangular", low=10, peak=10, high=20) + assert float(result.values[0, 0]) == pytest.approx(1.0) + assert float(result.values[0, 1]) == pytest.approx(0.5) + assert float(result.values[0, 2]) == pytest.approx(0.0) + + def test_peak_equals_high(self): + """Left triangle: 0 at low, rising to 1.0 at peak=high.""" + data = np.array([[10.0, 15.0, 20.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize(agg, method="triangular", low=10, peak=20, high=20) + assert float(result.values[0, 0]) == 
pytest.approx(0.0) + assert float(result.values[0, 1]) == pytest.approx(0.5) + assert float(result.values[0, 2]) == pytest.approx(1.0) + + def test_all_equal(self): + """Fully degenerate: low=peak=high -> 1.0 at that point, 0 elsewhere.""" + data = np.array([[5.0, 10.0, 15.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + result = standardize(agg, method="triangular", low=10, peak=10, high=10) + assert float(result.values[0, 0]) == pytest.approx(0.0) + assert float(result.values[0, 1]) == pytest.approx(1.0) + assert float(result.values[0, 2]) == pytest.approx(0.0) + + +class TestStandardizeSigmoidalOverflow: + """Sigmoid must not produce warnings or inf for extreme values.""" + + def test_extreme_values_no_warning(self): + import warnings + data = np.array([[0.0, 1e6, -1e6]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + with warnings.catch_warnings(): + warnings.simplefilter("error") + result = standardize( + agg, method="sigmoidal", midpoint=0, spread=1.0, + ) + assert float(result.values[0, 0]) == pytest.approx(0.5) + assert float(result.values[0, 1]) == pytest.approx(1.0) + assert float(result.values[0, 2]) == pytest.approx(0.0, abs=1e-100) + + def test_large_spread_no_warning(self): + import warnings + data = np.array([[0.0, 100.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + with warnings.catch_warnings(): + warnings.simplefilter("error") + result = standardize( + agg, method="sigmoidal", midpoint=50, spread=100.0, + ) + assert np.all(np.isfinite(result.values)) + + +class TestStandardizeAllNaN: + """All-NaN input should return all NaN without raising.""" + + def test_linear_all_nan_with_bounds(self): + agg = xr.DataArray(np.full((3, 3), np.nan), dims=["y", "x"]) + result = standardize(agg, method="linear", bounds=(0, 100)) + assert np.all(np.isnan(result.values)) + + def test_linear_all_nan_no_bounds(self): + """nanmin/nanmax on all-NaN should not raise.""" + import warnings + agg = 
xr.DataArray(np.full((2, 2), np.nan), dims=["y", "x"]) + with warnings.catch_warnings(): + warnings.simplefilter("error", RuntimeWarning) + # Should NOT raise RuntimeWarning + result = standardize(agg, method="linear") + assert np.all(np.isnan(result.values)) + + def test_gaussian_all_nan(self): + agg = xr.DataArray(np.full((2, 2), np.nan), dims=["y", "x"]) + result = standardize(agg, method="gaussian", mean=0, std=1) + assert np.all(np.isnan(result.values)) + + +class TestStandardizeUnknownMethod: + def test_unknown_method(self): + agg = xr.DataArray(np.array([[1.0]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="Unknown method"): + standardize(agg, method="magic") + + +@pytest.mark.skipif(not HAS_DASK, reason="Requires dask") +class TestStandardizeDask: + def test_linear_dask(self, criterion_raster): + dask_raster = criterion_raster.chunk({"y": 2, "x": 2}) + result = standardize( + dask_raster, method="linear", bounds=(0, 100), + ) + assert hasattr(result.data, 'compute') + computed = result.compute() + numpy_result = standardize( + criterion_raster, method="linear", bounds=(0, 100), + ) + np.testing.assert_allclose( + computed.values, numpy_result.values, equal_nan=True, + ) + + def test_sigmoidal_dask(self): + data = np.array([[0.0, 5000.0, 10000.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]).chunk({"x": 2}) + result = standardize( + agg, method="sigmoidal", midpoint=5000, spread=-0.001, + ) + assert hasattr(result.data, 'compute') + computed = result.compute() + assert float(computed.values[0, 1]) == pytest.approx(0.5) + + def test_categorical_dask(self): + data = np.array([[1, 2, 3]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]).chunk({"x": 2}) + result = standardize( + agg, method="categorical", mapping={1: 0.9, 2: 0.5, 3: 0.1}, + ) + computed = result.compute() + assert float(computed.values[0, 0]) == pytest.approx(0.9) + + def test_piecewise_dask(self): + data = np.array([[0.0, 50.0, 100.0]], 
dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]).chunk({"x": 2}) + result = standardize( + agg, method="piecewise", + breakpoints=[0, 50, 100], values=[0.0, 1.0, 0.5], + ) + computed = result.compute() + assert float(computed.values[0, 1]) == pytest.approx(1.0) + + +# =========================================================================== +# weights +# =========================================================================== + +class TestAHPWeights: + def test_equal_comparisons(self): + """All criteria equally important -> equal weights.""" + w, c = ahp_weights( + criteria=["a", "b", "c"], + comparisons={("a", "b"): 1, ("a", "c"): 1, ("b", "c"): 1}, + ) + assert w["a"] == pytest.approx(1 / 3, abs=0.01) + assert w["b"] == pytest.approx(1 / 3, abs=0.01) + assert w["c"] == pytest.approx(1 / 3, abs=0.01) + assert c.is_consistent + + def test_dominant_criterion(self): + """One criterion strongly preferred -> gets highest weight.""" + w, c = ahp_weights( + criteria=["a", "b", "c"], + comparisons={("a", "b"): 9, ("a", "c"): 9, ("b", "c"): 1}, + ) + assert w["a"] > w["b"] + assert w["a"] > w["c"] + assert sum(w.values()) == pytest.approx(1.0) + + def test_consistency_ratio(self): + """Consistent judgments should have CR < 0.10.""" + w, c = ahp_weights( + criteria=["slope", "distance", "ndvi"], + comparisons={ + ("slope", "distance"): 3, + ("slope", "ndvi"): 2, + ("distance", "ndvi"): 1 / 2, + }, + ) + assert c.ratio < 0.10 + assert c.is_consistent + + def test_weights_sum_to_one(self): + w, _ = ahp_weights( + criteria=["a", "b", "c", "d"], + comparisons={ + ("a", "b"): 3, ("a", "c"): 5, ("a", "d"): 2, + ("b", "c"): 2, ("b", "d"): 1, + ("c", "d"): 1 / 3, + }, + ) + assert sum(w.values()) == pytest.approx(1.0) + + def test_unknown_criterion_raises(self): + with pytest.raises(ValueError, match="Unknown criterion"): + ahp_weights( + criteria=["a", "b"], + comparisons={("a", "z"): 2}, + ) + + def test_too_few_criteria_raises(self): + with 
pytest.raises(ValueError, match="at least 2"): + ahp_weights(criteria=["a"], comparisons={}) + + +class TestRankWeights: + def test_roc(self): + w = rank_weights(["a", "b", "c"], method="roc") + # First-ranked should have highest weight + assert w["a"] > w["b"] > w["c"] + assert sum(w.values()) == pytest.approx(1.0) + + def test_rs(self): + w = rank_weights(["a", "b", "c"], method="rs") + assert w["a"] > w["b"] > w["c"] + assert sum(w.values()) == pytest.approx(1.0) + + def test_rr(self): + w = rank_weights(["a", "b", "c"], method="rr") + assert w["a"] > w["b"] > w["c"] + assert sum(w.values()) == pytest.approx(1.0) + + def test_single_criterion(self): + w = rank_weights(["only"], method="roc") + assert w["only"] == pytest.approx(1.0) + + def test_unknown_method(self): + with pytest.raises(ValueError, match="Unknown method"): + rank_weights(["a", "b"], method="bad") + + +# =========================================================================== +# combine +# =========================================================================== + +class TestWLC: + def test_basic(self, criteria_dataset, weights_3): + result = wlc(criteria_dataset, weights_3) + # Manual check for [0,0]: 0.2*0.4 + 0.5*0.35 + 0.6*0.25 = 0.405 + assert float(result.values[0, 0]) == pytest.approx(0.405) + + def test_output_shape(self, criteria_dataset, weights_3): + result = wlc(criteria_dataset, weights_3) + assert result.shape == (3, 3) + + def test_missing_weight_raises(self, criteria_dataset): + with pytest.raises(ValueError, match="Missing weights"): + wlc(criteria_dataset, {"slope": 0.5, "distance": 0.5}) + + def test_weights_not_summing_raises(self, criteria_dataset): + with pytest.raises(ValueError, match="sum to"): + wlc(criteria_dataset, {"slope": 0.5, "distance": 0.5, "ndvi": 0.5}) + + def test_empty_dataset_raises(self): + ds = xr.Dataset() + with pytest.raises(ValueError, match="no variables"): + wlc(ds, {}) + + def test_not_dataset_raises(self): + da = 
xr.DataArray(np.array([[1.0]]), dims=["y", "x"]) + with pytest.raises(TypeError, match="xr.Dataset"): + wlc(da, {"a": 1.0}) + + +class TestWPM: + def test_basic(self, criteria_dataset, weights_3): + result = wpm(criteria_dataset, weights_3) + # Manual check for [0,0]: 0.2^0.4 * 0.5^0.35 * 0.6^0.25 + expected = 0.2 ** 0.4 * 0.5 ** 0.35 * 0.6 ** 0.25 + assert float(result.values[0, 0]) == pytest.approx(expected) + + def test_zero_value_kills_product(self): + """A zero criterion value should produce zero output.""" + ds = xr.Dataset({ + "a": xr.DataArray(np.array([[0.0]]), dims=["y", "x"]), + "b": xr.DataArray(np.array([[0.8]]), dims=["y", "x"]), + }) + result = wpm(ds, {"a": 0.5, "b": 0.5}) + assert float(result.values[0, 0]) == pytest.approx(0.0) + + +class TestWLCNaN: + def test_nan_propagates(self): + """NaN in any criterion should propagate to the output.""" + ds = xr.Dataset({ + "a": xr.DataArray( + np.array([[0.5, np.nan]], dtype=np.float64), dims=["y", "x"], + ), + "b": xr.DataArray( + np.array([[0.7, 0.4]], dtype=np.float64), dims=["y", "x"], + ), + }) + result = wlc(ds, {"a": 0.6, "b": 0.4}) + assert float(result.values[0, 0]) == pytest.approx(0.58) + assert np.isnan(result.values[0, 1]) + + +class TestOWA: + def test_uniform_order_weights_equals_wlc_exact( + self, criteria_dataset, weights_3 + ): + """Uniform order weights should EXACTLY equal WLC.""" + n = 3 + ow = [1.0 / n] * n + owa_result = owa(criteria_dataset, weights_3, ow) + wlc_result = wlc(criteria_dataset, weights_3) + np.testing.assert_allclose( + owa_result.values, wlc_result.values, atol=1e-14, + ) + + def test_wrong_order_weights_length_raises( + self, criteria_dataset, weights_3 + ): + with pytest.raises(ValueError, match="order_weights length"): + owa(criteria_dataset, weights_3, [0.5, 0.5]) + + def test_order_weights_not_summing_raises( + self, criteria_dataset, weights_3 + ): + with pytest.raises(ValueError, match="order_weights must sum"): + owa(criteria_dataset, weights_3, [0.5, 0.5, 
0.5]) + + +class TestFuzzyOverlay: + def test_and_is_min(self, criteria_dataset): + result = fuzzy_overlay(criteria_dataset, operator="and") + # [0,0]: min(0.2, 0.5, 0.6) = 0.2 + assert float(result.values[0, 0]) == pytest.approx(0.2) + + def test_or_is_max(self, criteria_dataset): + result = fuzzy_overlay(criteria_dataset, operator="or") + # [0,0]: max(0.2, 0.5, 0.6) = 0.6 + assert float(result.values[0, 0]) == pytest.approx(0.6) + + def test_product(self, criteria_dataset): + result = fuzzy_overlay(criteria_dataset, operator="product") + # [0,0]: 0.2 * 0.5 * 0.6 = 0.06 + assert float(result.values[0, 0]) == pytest.approx(0.06) + + def test_sum(self, criteria_dataset): + result = fuzzy_overlay(criteria_dataset, operator="sum") + # [0,0]: 1 - (1-0.2)*(1-0.5)*(1-0.6) = 1 - 0.8*0.5*0.4 = 1 - 0.16 = 0.84 + assert float(result.values[0, 0]) == pytest.approx(0.84) + + def test_gamma(self, criteria_dataset): + result = fuzzy_overlay( + criteria_dataset, operator="gamma", gamma=0.5, + ) + # gamma=0.5: (fuzzy_sum^0.5) * (product^0.5) + prod = 0.2 * 0.5 * 0.6 + fsum = 1 - (0.8 * 0.5 * 0.4) + expected = (fsum ** 0.5) * (prod ** 0.5) + assert float(result.values[0, 0]) == pytest.approx(expected) + + def test_gamma_0_equals_product(self, criteria_dataset): + gamma0 = fuzzy_overlay( + criteria_dataset, operator="gamma", gamma=0.0, + ) + product = fuzzy_overlay(criteria_dataset, operator="product") + np.testing.assert_allclose(gamma0.values, product.values, atol=1e-10) + + def test_gamma_1_equals_sum(self, criteria_dataset): + gamma1 = fuzzy_overlay( + criteria_dataset, operator="gamma", gamma=1.0, + ) + fsum = fuzzy_overlay(criteria_dataset, operator="sum") + np.testing.assert_allclose(gamma1.values, fsum.values, atol=1e-10) + + def test_invalid_operator_raises(self, criteria_dataset): + with pytest.raises(ValueError, match="Unknown operator"): + fuzzy_overlay(criteria_dataset, operator="xor") + + def test_invalid_gamma_raises(self, criteria_dataset): + with 
pytest.raises(ValueError, match="gamma must be in"): + fuzzy_overlay(criteria_dataset, operator="gamma", gamma=1.5) + + +class TestBooleanOverlay: + def test_and(self): + a = xr.DataArray( + np.array([[True, True, False]]), dims=["y", "x"], + ) + b = xr.DataArray( + np.array([[True, False, False]]), dims=["y", "x"], + ) + result = boolean_overlay({"a": a, "b": b}, operator="and") + np.testing.assert_array_equal( + result.values, [[True, False, False]], + ) + + def test_or(self): + a = xr.DataArray( + np.array([[True, True, False]]), dims=["y", "x"], + ) + b = xr.DataArray( + np.array([[True, False, False]]), dims=["y", "x"], + ) + result = boolean_overlay({"a": a, "b": b}, operator="or") + np.testing.assert_array_equal( + result.values, [[True, True, False]], + ) + + def test_empty_raises(self): + with pytest.raises(ValueError, match="empty"): + boolean_overlay({}) + + def test_invalid_operator_raises(self): + a = xr.DataArray(np.array([[True]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="Unknown operator"): + boolean_overlay({"a": a}, operator="xor") + + +# =========================================================================== +# constrain +# =========================================================================== + +class TestConstrain: + def test_basic(self): + suit = xr.DataArray( + np.array([[0.8, 0.6, 0.9], + [0.5, 0.7, 0.3]], dtype=np.float64), + dims=["y", "x"], + ) + water = xr.DataArray( + np.array([[False, False, True], + [False, False, False]]), + dims=["y", "x"], + ) + result = constrain(suit, exclude=[water]) + assert np.isnan(result.values[0, 2]) + assert float(result.values[0, 0]) == pytest.approx(0.8) + + def test_multiple_masks(self): + suit = xr.DataArray( + np.array([[0.8, 0.6, 0.9]], dtype=np.float64), + dims=["y", "x"], + ) + mask1 = xr.DataArray( + np.array([[True, False, False]]), dims=["y", "x"], + ) + mask2 = xr.DataArray( + np.array([[False, True, False]]), dims=["y", "x"], + ) + result = constrain(suit, 
exclude=[mask1, mask2]) + assert np.isnan(result.values[0, 0]) + assert np.isnan(result.values[0, 1]) + assert float(result.values[0, 2]) == pytest.approx(0.9) + + def test_custom_fill(self): + suit = xr.DataArray( + np.array([[0.8, 0.6]], dtype=np.float64), + dims=["y", "x"], + ) + mask = xr.DataArray( + np.array([[True, False]]), dims=["y", "x"], + ) + result = constrain(suit, exclude=[mask], fill=-1.0) + assert float(result.values[0, 0]) == pytest.approx(-1.0) + + +# =========================================================================== +# sensitivity +# =========================================================================== + +class TestSensitivityOAT: + def test_returns_dataset(self, criteria_dataset, weights_3): + result = sensitivity( + criteria_dataset, weights_3, + method="one_at_a_time", delta=0.05, + ) + assert isinstance(result, xr.Dataset) + assert set(result.data_vars) == {"slope", "distance", "ndvi"} + + def test_zero_delta_gives_zero_sensitivity( + self, criteria_dataset, weights_3 + ): + result = sensitivity( + criteria_dataset, weights_3, + method="one_at_a_time", delta=0.0, + ) + for var in result.data_vars: + np.testing.assert_allclose( + result[var].values, 0.0, atol=1e-10, + ) + + +class TestSensitivityMonteCarlo: + def test_returns_dataarray(self, criteria_dataset, weights_3): + result = sensitivity( + criteria_dataset, weights_3, + method="monte_carlo", n_samples=50, + ) + assert isinstance(result, xr.DataArray) + assert result.shape == (3, 3) + + def test_cv_non_negative(self, criteria_dataset, weights_3): + result = sensitivity( + criteria_dataset, weights_3, + method="monte_carlo", n_samples=50, + ) + assert np.all(result.values >= 0) + + def test_unknown_method_raises(self, criteria_dataset, weights_3): + with pytest.raises(ValueError, match="Unknown method"): + sensitivity(criteria_dataset, weights_3, method="bad") + + def test_unknown_combine_raises(self, criteria_dataset, weights_3): + with pytest.raises(ValueError, 
match="Unknown combine_method"): + sensitivity( + criteria_dataset, weights_3, combine_method="bad", + ) + + +@pytest.mark.skipif(not HAS_DASK, reason="Requires dask") +class TestSensitivityDask: + def test_monte_carlo_dask_returns_numpy(self): + """MC with dask input should compute eagerly, not build huge graph.""" + ds = xr.Dataset({ + "a": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10, "x": 10}), + "b": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10, "x": 10}), + }) + result = sensitivity( + ds, {"a": 0.6, "b": 0.4}, + method="monte_carlo", n_samples=30, + ) + # Result should be eagerly computed numpy, not dask + assert isinstance(result.data, np.ndarray) + assert result.shape == (20, 20) + assert np.all(result.values >= 0) + + +# =========================================================================== +# Edge cases +# =========================================================================== + +class TestSinglePixelRaster: + """All functions should work on a 1x1 raster.""" + + def test_standardize_methods(self): + single = xr.DataArray(np.array([[50.0]]), dims=["y", "x"]) + assert float(standardize( + single, method="linear", bounds=(0, 100), + ).values[0, 0]) == pytest.approx(0.5) + assert float(standardize( + single, method="sigmoidal", midpoint=50, spread=0.1, + ).values[0, 0]) == pytest.approx(0.5) + assert float(standardize( + single, method="gaussian", mean=50, std=10, + ).values[0, 0]) == pytest.approx(1.0) + assert float(standardize( + single, method="triangular", low=0, peak=50, high=100, + ).values[0, 0]) == pytest.approx(1.0) + assert float(standardize( + single, method="piecewise", + breakpoints=[0, 50, 100], values=[0.0, 1.0, 0.5], + ).values[0, 0]) == pytest.approx(1.0) + assert float(standardize( + single, method="categorical", mapping={50: 0.8}, + ).values[0, 0]) == pytest.approx(0.8) + + def test_combine_single_pixel(self): + ds = xr.Dataset({ + "a": xr.DataArray(np.array([[0.5]]), 
dims=["y", "x"]), + "b": xr.DataArray(np.array([[0.7]]), dims=["y", "x"]), + }) + w = {"a": 0.6, "b": 0.4} + assert float(wlc(ds, w).values[0, 0]) == pytest.approx(0.58) + expected_wpm = 0.5 ** 0.6 * 0.7 ** 0.4 + assert float(wpm(ds, w).values[0, 0]) == pytest.approx(expected_wpm) + + def test_full_pipeline_single_pixel(self): + raw = xr.DataArray(np.array([[50.0]]), dims=["y", "x"]) + std = standardize(raw, method="linear", bounds=(0, 100)) + ds = xr.Dataset({"a": std, "b": std}) + result = wlc(ds, {"a": 0.6, "b": 0.4}) + result = constrain( + result, + exclude=[xr.DataArray(np.array([[False]]), dims=["y", "x"])], + ) + assert result.shape == (1, 1) + assert float(result.values[0, 0]) == pytest.approx(0.5) + + +class TestNaNPropagation: + """NaN in any criterion should propagate through all combine methods.""" + + @pytest.fixture + def ds_with_nan(self): + return xr.Dataset({ + "a": xr.DataArray( + np.array([[0.5, np.nan, 0.3]], dtype=np.float64), + dims=["y", "x"], + ), + "b": xr.DataArray( + np.array([[0.7, 0.4, np.nan]], dtype=np.float64), + dims=["y", "x"], + ), + }) + + def test_wlc_nan(self, ds_with_nan): + r = wlc(ds_with_nan, {"a": 0.6, "b": 0.4}) + assert np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isnan(r.values[0, 2]) + + def test_wpm_nan(self, ds_with_nan): + r = wpm(ds_with_nan, {"a": 0.6, "b": 0.4}) + assert np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isnan(r.values[0, 2]) + + def test_fuzzy_and_nan(self, ds_with_nan): + r = fuzzy_overlay(ds_with_nan, operator="and") + assert np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isnan(r.values[0, 2]) + + def test_fuzzy_or_nan(self, ds_with_nan): + r = fuzzy_overlay(ds_with_nan, operator="or") + assert np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isnan(r.values[0, 2]) + + def test_fuzzy_product_nan(self, ds_with_nan): + r = fuzzy_overlay(ds_with_nan, operator="product") + assert 
np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isnan(r.values[0, 2]) + + def test_fuzzy_sum_nan(self, ds_with_nan): + r = fuzzy_overlay(ds_with_nan, operator="sum") + assert np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isnan(r.values[0, 2]) + + def test_fuzzy_gamma_nan(self, ds_with_nan): + r = fuzzy_overlay(ds_with_nan, operator="gamma", gamma=0.9) + assert np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isnan(r.values[0, 2]) + + def test_owa_nan(self): + ds = xr.Dataset({ + "a": xr.DataArray( + np.array([[0.5, np.nan]], dtype=np.float64), + dims=["y", "x"], + ), + "b": xr.DataArray( + np.array([[0.7, 0.4]], dtype=np.float64), + dims=["y", "x"], + ), + "c": xr.DataArray( + np.array([[0.3, 0.6]], dtype=np.float64), + dims=["y", "x"], + ), + }) + w = {"a": 0.4, "b": 0.35, "c": 0.25} + r = owa(ds, w, [0.5, 0.3, 0.2]) + assert np.isfinite(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + + +class TestPiecewiseExtrapolation: + """np.interp clamps values outside the breakpoint range.""" + + def test_below_range_clamps_to_first_value(self): + data = np.array([[-100.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize( + agg, method="piecewise", + breakpoints=[0, 100], values=[0.2, 0.9], + ) + assert float(r.values[0, 0]) == pytest.approx(0.2) + + def test_above_range_clamps_to_last_value(self): + data = np.array([[500.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize( + agg, method="piecewise", + breakpoints=[0, 100], values=[0.2, 0.9], + ) + assert float(r.values[0, 0]) == pytest.approx(0.9) + + +class TestCategoricalPrecision: + """Categorical uses exact equality; near-miss float values are NaN.""" + + def test_exact_float_match(self): + data = np.array([[1.0, 2.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize( + agg, method="categorical", mapping={1: 0.9, 2: 0.5}, + ) + assert 
float(r.values[0, 0]) == pytest.approx(0.9) + assert float(r.values[0, 1]) == pytest.approx(0.5) + + def test_float_near_miss_is_nan(self): + """Floating point near-miss should not match.""" + data = np.array([[1.0000000001]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize( + agg, method="categorical", mapping={1: 0.9}, + ) + assert np.isnan(r.values[0, 0]) + + def test_integer_data(self): + """Integer data should match float keys via implicit cast.""" + data = np.array([[1, 2, 3]], dtype=np.int32) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize( + agg, method="categorical", + mapping={1: 0.9, 2: 0.7, 3: 0.4}, + ) + assert float(r.values[0, 0]) == pytest.approx(0.9) + assert float(r.values[0, 1]) == pytest.approx(0.7) + assert float(r.values[0, 2]) == pytest.approx(0.4) + + +class TestAHPIncomplete: + """AHP with fewer than n(n-1)/2 comparisons should warn.""" + + def test_warns_on_incomplete(self): + with pytest.warns(UserWarning, match="Only 1 of 3"): + ahp_weights( + criteria=["a", "b", "c"], + comparisons={("a", "b"): 3}, + ) + + def test_no_warning_when_complete(self): + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("error", UserWarning) + ahp_weights( + criteria=["a", "b", "c"], + comparisons={ + ("a", "b"): 3, + ("a", "c"): 5, + ("b", "c"): 2, + }, + ) + + def test_missing_defaults_to_equal(self): + """Missing comparisons default to 1 (equal importance).""" + w_full, _ = ahp_weights( + criteria=["a", "b"], + comparisons={("a", "b"): 1}, + ) + with pytest.warns(UserWarning): + w_missing, _ = ahp_weights( + criteria=["a", "b", "c"], + comparisons={("a", "b"): 1}, + ) + # b and c should be approximately equal since their pairwise + # comparison defaults to 1 + assert abs(w_missing["b"] - w_missing["c"]) < 0.05 + + +class TestMismatchedCoords: + """Criteria with partially overlapping coords produce NaN outside overlap.""" + + def test_partial_overlap(self): + a = xr.DataArray( + 
np.array([[0.5, 0.3], [0.8, 0.2]], dtype=np.float64), + dims=["y", "x"], + coords={"y": [0, 1], "x": [0, 1]}, + ) + b = xr.DataArray( + np.array([[0.7, 0.4], [0.1, 0.9]], dtype=np.float64), + dims=["y", "x"], + coords={"y": [1, 2], "x": [0, 1]}, + ) + ds = xr.Dataset({"a": a, "b": b}) + result = wlc(ds, {"a": 0.6, "b": 0.4}) + # y=0: only a has data, b is NaN -> result NaN + assert np.isnan(result.sel(y=0, x=0).values) + # y=1: both have data -> valid + expected = 0.8 * 0.6 + 0.7 * 0.4 + assert float(result.sel(y=1, x=0).values) == pytest.approx(expected) + # y=2: only b has data, a is NaN -> result NaN + assert np.isnan(result.sel(y=2, x=0).values) + + +@pytest.mark.skipif(not HAS_DASK, reason="Requires dask") +class TestDaskChunkAlignment: + """Dask operations should preserve chunks and produce correct results.""" + + def test_standardize_preserves_chunks(self): + data = np.random.rand(20, 20) + agg = xr.DataArray(data, dims=["y", "x"]).chunk({"y": 10, "x": 10}) + for method in ["linear", "sigmoidal", "gaussian"]: + kw = {"bounds": (0, 1)} if method == "linear" else {} + if method == "sigmoidal": + kw = {"midpoint": 0.5, "spread": 1.0} + elif method == "gaussian": + kw = {"mean": 0.5, "std": 0.2} + result = standardize(agg, method=method, **kw) + assert hasattr(result.data, "compute"), f"{method} lost dask" + np_result = standardize( + xr.DataArray(data, dims=["y", "x"]), + method=method, **kw, + ) + np.testing.assert_allclose( + result.compute().values, np_result.values, + equal_nan=True, atol=1e-14, + ) + + def test_wlc_dask(self): + ds = xr.Dataset({ + "a": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10}), + "b": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10}), + }) + w = {"a": 0.6, "b": 0.4} + result = wlc(ds, w) + assert hasattr(result.data, "compute") + ds_np = ds.compute() + np_result = wlc(ds_np, w) + np.testing.assert_allclose( + result.compute().values, np_result.values, atol=1e-14, + ) + + def 
test_fuzzy_overlay_dask(self): + ds = xr.Dataset({ + "a": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10}), + "b": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10}), + }) + for op in ["and", "or", "product", "sum"]: + result = fuzzy_overlay(ds, operator=op) + ds_np = ds.compute() + np_result = fuzzy_overlay(ds_np, operator=op) + np.testing.assert_allclose( + result.compute().values, np_result.values, atol=1e-14, + err_msg=f"fuzzy {op} mismatch", + ) + + def test_oat_sensitivity_dask(self): + ds = xr.Dataset({ + "a": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10}), + "b": xr.DataArray( + np.random.rand(20, 20), dims=["y", "x"], + ).chunk({"y": 10}), + }) + w = {"a": 0.6, "b": 0.4} + result = sensitivity(ds, w, method="one_at_a_time", delta=0.05) + assert isinstance(result, xr.Dataset) + for var in result.data_vars: + assert np.all(np.isfinite(result[var].compute().values)) + + +class TestWPMEdgeCases: + def test_all_ones(self): + """All criteria at 1.0 should produce 1.0 regardless of weights.""" + ds = xr.Dataset({ + "a": xr.DataArray(np.array([[1.0]]), dims=["y", "x"]), + "b": xr.DataArray(np.array([[1.0]]), dims=["y", "x"]), + }) + r = wpm(ds, {"a": 0.3, "b": 0.7}) + assert float(r.values[0, 0]) == pytest.approx(1.0) + + def test_all_zeros(self): + """All criteria at 0.0 should produce 0.0.""" + ds = xr.Dataset({ + "a": xr.DataArray(np.array([[0.0]]), dims=["y", "x"]), + "b": xr.DataArray(np.array([[0.0]]), dims=["y", "x"]), + }) + r = wpm(ds, {"a": 0.5, "b": 0.5}) + assert float(r.values[0, 0]) == pytest.approx(0.0) + + +class TestConstrainEdgeCases: + def test_empty_exclude_list(self): + """Empty exclude list should return input unchanged.""" + suit = xr.DataArray( + np.array([[0.8, 0.6]], dtype=np.float64), + dims=["y", "x"], + ) + result = constrain(suit, exclude=[]) + np.testing.assert_array_equal(result.values, suit.values) + + def test_all_excluded(self): + """All pixels 
excluded should be fill value.""" + suit = xr.DataArray( + np.array([[0.8, 0.6]], dtype=np.float64), + dims=["y", "x"], + ) + mask = xr.DataArray( + np.array([[True, True]]), dims=["y", "x"], + ) + result = constrain(suit, exclude=[mask]) + assert np.all(np.isnan(result.values)) + + +# =========================================================================== +# Input validation edge cases +# =========================================================================== + +class TestInvertedBounds: + def test_raises(self): + agg = xr.DataArray(np.array([[50.0]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="bounds\\[0\\] must be <= bounds\\[1\\]"): + standardize(agg, method="linear", bounds=(100, 0)) + + +class TestPiecewiseDuplicateBreakpoints: + def test_raises(self): + agg = xr.DataArray(np.array([[5.0]]), dims=["y", "x"]) + with pytest.raises(ValueError, match="unique"): + standardize( + agg, method="piecewise", + breakpoints=[0, 5, 5, 10], values=[0, 0.3, 0.7, 1], + ) + + +class TestAHPValidation: + def test_self_comparison_raises(self): + with pytest.raises(ValueError, match="Self-comparison"): + ahp_weights( + criteria=["a", "b"], + comparisons={("a", "a"): 5, ("a", "b"): 3}, + ) + + def test_zero_comparison_raises(self): + with pytest.raises(ValueError, match="must be positive"): + ahp_weights( + criteria=["a", "b"], + comparisons={("a", "b"): 0}, + ) + + def test_negative_comparison_raises(self): + with pytest.raises(ValueError, match="must be positive"): + ahp_weights( + criteria=["a", "b"], + comparisons={("a", "b"): -3}, + ) + + +class TestExtraWeights: + def test_extra_keys_raises(self): + ds = xr.Dataset({ + "a": xr.DataArray(np.array([[0.5]]), dims=["y", "x"]), + }) + with pytest.raises(ValueError, match="not in criteria"): + wlc(ds, {"a": 0.5, "b": 0.3, "c": 0.2}) + + def test_extra_keys_wpm_raises(self): + ds = xr.Dataset({ + "a": xr.DataArray(np.array([[0.5]]), dims=["y", "x"]), + }) + with pytest.raises(ValueError, match="not in 
criteria"): + wpm(ds, {"a": 0.5, "b": 0.5}) + + +class TestBooleanOverlayFloat: + """Float data should be cast to bool (nonzero = True).""" + + def test_float_and(self): + a = xr.DataArray( + np.array([[0.5, 0.0, 1.0]]), dims=["y", "x"], + ) + b = xr.DataArray( + np.array([[0.3, 0.0, 1.0]]), dims=["y", "x"], + ) + result = boolean_overlay({"a": a, "b": b}, operator="and") + np.testing.assert_array_equal( + result.values, [[True, False, True]], + ) + + def test_float_or(self): + a = xr.DataArray( + np.array([[0.5, 0.0, 0.0]]), dims=["y", "x"], + ) + b = xr.DataArray( + np.array([[0.0, 0.0, 1.0]]), dims=["y", "x"], + ) + result = boolean_overlay({"a": a, "b": b}, operator="or") + np.testing.assert_array_equal( + result.values, [[True, False, True]], + ) + + +# =========================================================================== +# Inf handling +# =========================================================================== + +class TestInfValues: + """Inf values should be treated as non-finite (mapped to NaN).""" + + def test_linear_inf(self): + data = np.array([[np.inf, -np.inf, 50.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize(agg, method="linear", bounds=(0, 100)) + assert np.isnan(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert float(r.values[0, 2]) == pytest.approx(0.5) + + def test_sigmoidal_inf(self): + data = np.array([[np.inf, -np.inf, 50.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize(agg, method="sigmoidal", midpoint=50, spread=0.1) + assert np.isnan(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert np.isfinite(r.values[0, 2]) + + def test_gaussian_inf(self): + data = np.array([[np.inf, -np.inf, 50.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize(agg, method="gaussian", mean=50, std=10) + assert np.isnan(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert float(r.values[0, 2]) == pytest.approx(1.0) + + def 
test_triangular_inf(self): + data = np.array([[np.inf, -np.inf, 50.0]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize(agg, method="triangular", low=0, peak=50, high=100) + assert np.isnan(r.values[0, 0]) + assert np.isnan(r.values[0, 1]) + assert float(r.values[0, 2]) == pytest.approx(1.0) + + +# =========================================================================== +# Sensitivity edge cases +# =========================================================================== + +class TestSensitivityWeightClamping: + """Perturbed weights should be clamped to [0, 1].""" + + def test_clamp_to_zero(self): + """Weight near 0 perturbed negative should clamp to 0.""" + from xrspatial.mcda.sensitivity import _perturb_weights + + w = {"a": 0.02, "b": 0.58, "c": 0.40} + _, w_minus = _perturb_weights(w, "a", 0.05) + assert w_minus["a"] == pytest.approx(0.0) + assert sum(w_minus.values()) == pytest.approx(1.0) + + def test_clamp_to_one(self): + """Weight near 1 perturbed up should clamp to 1.""" + from xrspatial.mcda.sensitivity import _perturb_weights + + w = {"a": 0.98, "b": 0.02} + w_plus, _ = _perturb_weights(w, "a", 0.05) + assert w_plus["a"] == pytest.approx(1.0) + assert w_plus["b"] == pytest.approx(0.0) + assert sum(w_plus.values()) == pytest.approx(1.0) + + def test_perturbed_weights_sum_to_one(self): + """All perturbed weight sets must sum to 1.0.""" + from xrspatial.mcda.sensitivity import _perturb_weights + + w = {"a": 0.4, "b": 0.35, "c": 0.25} + for target in w: + for delta in [0.01, 0.05, 0.10, 0.20]: + wp, wm = _perturb_weights(w, target, delta) + assert sum(wp.values()) == pytest.approx(1.0, abs=1e-10) + assert sum(wm.values()) == pytest.approx(1.0, abs=1e-10) + + +class TestMonteCarloReproducibility: + """Same seed must produce identical results.""" + + def test_same_seed_same_result(self): + ds = xr.Dataset({ + "a": xr.DataArray( + np.random.rand(10, 10), dims=["y", "x"], + ), + "b": xr.DataArray( + np.random.rand(10, 10), 
dims=["y", "x"], + ), + }) + w = {"a": 0.6, "b": 0.4} + r1 = sensitivity(ds, w, method="monte_carlo", n_samples=30, seed=42) + r2 = sensitivity(ds, w, method="monte_carlo", n_samples=30, seed=42) + np.testing.assert_array_equal(r1.values, r2.values) + + def test_different_seed_different_result(self): + ds = xr.Dataset({ + "a": xr.DataArray( + np.random.rand(10, 10), dims=["y", "x"], + ), + "b": xr.DataArray( + np.random.rand(10, 10), dims=["y", "x"], + ), + }) + w = {"a": 0.6, "b": 0.4} + r1 = sensitivity(ds, w, method="monte_carlo", n_samples=30, seed=42) + r2 = sensitivity(ds, w, method="monte_carlo", n_samples=30, seed=99) + assert not np.array_equal(r1.values, r2.values) + + +# =========================================================================== +# Behavioral edge cases +# =========================================================================== + +class TestGaussianSmallStd: + """Very small std approaches a delta function.""" + + def test_tiny_std(self): + data = np.array([[49.9, 50.0, 50.1]], dtype=np.float64) + agg = xr.DataArray(data, dims=["y", "x"]) + r = standardize(agg, method="gaussian", mean=50, std=1e-10) + assert float(r.values[0, 0]) == pytest.approx(0.0, abs=1e-100) + assert float(r.values[0, 1]) == pytest.approx(1.0) + assert float(r.values[0, 2]) == pytest.approx(0.0, abs=1e-100) + + +class TestOWARiskAttitude: + """Conservative OWA <= optimistic OWA for mixed-quality pixels.""" + + def test_conservative_le_optimistic(self): + ds = xr.Dataset({ + "a": xr.DataArray( + np.array([[0.9]]), dims=["y", "x"], + ), + "b": xr.DataArray( + np.array([[0.1]]), dims=["y", "x"], + ), + "c": xr.DataArray( + np.array([[0.5]]), dims=["y", "x"], + ), + }) + w = {"a": 1 / 3, "b": 1 / 3, "c": 1 / 3} + conservative = owa(ds, w, [0.1, 0.2, 0.7]) + optimistic = owa(ds, w, [0.7, 0.2, 0.1]) + assert float(conservative.values[0, 0]) <= float( + optimistic.values[0, 0] + ) + + +class TestRankWeightsManyCriteria: + """Rank weights with many criteria should still 
sum to 1.0
    and decrease monotonically."""

    def test_20_criteria_roc(self):
        names = [f"c{i}" for i in range(20)]
        w = rank_weights(names, method="roc")  # roc = rank-order centroid
        vals = list(w.values())
        assert sum(vals) == pytest.approx(1.0)
        assert all(vals[i] >= vals[i + 1] for i in range(19))

    def test_20_criteria_rs(self):
        names = [f"c{i}" for i in range(20)]
        w = rank_weights(names, method="rs")  # rs = rank sum
        vals = list(w.values())
        assert sum(vals) == pytest.approx(1.0)
        assert all(vals[i] >= vals[i + 1] for i in range(19))

    def test_20_criteria_rr(self):
        names = [f"c{i}" for i in range(20)]
        w = rank_weights(names, method="rr")  # rr = rank reciprocal
        vals = list(w.values())
        assert sum(vals) == pytest.approx(1.0)
        assert all(vals[i] >= vals[i + 1] for i in range(19))


class TestConstantSurface:
    """All-identical finite values with no explicit bounds."""

    def test_linear_constant_no_bounds(self):
        # Zero data range with no bounds: the expected result is the
        # 0.5 midpoint everywhere (asserted below), not NaN or an error.
        data = np.full((3, 3), 42.0)
        agg = xr.DataArray(data, dims=["y", "x"])
        r = standardize(agg, method="linear")
        np.testing.assert_allclose(r.values, 0.5)


class TestConstrainOverlappingMasks:
    """Multiple masks excluding the same pixel should still work."""

    def test_overlapping(self):
        suit = xr.DataArray(
            np.array([[0.8, 0.6, 0.9]], dtype=np.float64),
            dims=["y", "x"],
        )
        mask1 = xr.DataArray(
            np.array([[True, True, False]]), dims=["y", "x"],
        )
        mask2 = xr.DataArray(
            np.array([[True, False, False]]), dims=["y", "x"],
        )
        result = constrain(suit, exclude=[mask1, mask2])
        assert np.isnan(result.values[0, 0])  # excluded by both
        assert np.isnan(result.values[0, 1])  # excluded by mask1
        assert float(result.values[0, 2]) == pytest.approx(0.9)


# ===========================================================================
# Degenerate input shapes
# ===========================================================================

class TestSingleCriterionDataset:
    """Dataset with a single variable should work for all combine methods."""

    @pytest.fixture
    def single_ds(self):
        # One criterion "a" on a 1x3 grid; values chosen so a weight-1.0
        # combination should reproduce them exactly.
        return xr.Dataset({
            "a": xr.DataArray(
                np.array([[0.3, 0.7, 0.5]], dtype=np.float64),
                dims=["y", "x"],
            ),
        })

    def test_wlc_single(self, single_ds):
        # Single criterion with weight 1.0 -> identity passthrough.
        r = wlc(single_ds, {"a": 1.0})
        np.testing.assert_allclose(r.values, [[0.3, 0.7, 0.5]])

    def test_wpm_single(self, single_ds):
        r = wpm(single_ds, {"a": 1.0})
        np.testing.assert_allclose(r.values, [[0.3, 0.7, 0.5]])

    def test_owa_single(self, single_ds):
        r = owa(single_ds, {"a": 1.0}, [1.0])
        np.testing.assert_allclose(r.values, [[0.3, 0.7, 0.5]])

    def test_fuzzy_all_operators_single(self, single_ds):
        # With a single layer, every fuzzy operator should be the identity.
        for op in ["and", "or", "product", "sum", "gamma"]:
            kw = {"gamma": 0.9} if op == "gamma" else {}
            r = fuzzy_overlay(single_ds, operator=op, **kw)
            np.testing.assert_allclose(
                r.values, [[0.3, 0.7, 0.5]], err_msg=f"fuzzy {op}",
            )


class TestBooleanOverlayNaN:
    """NaN cast to bool is True -- document this gotcha."""

    def test_nan_treated_as_true(self):
        a = xr.DataArray(
            np.array([[True, False]]), dims=["y", "x"],
        )
        b = xr.DataArray(
            np.array([[np.nan, np.nan]]), dims=["y", "x"],
        )
        # bool(NaN) = True, so NaN pixels are treated as True
        r_and = boolean_overlay({"a": a, "b": b}, operator="and")
        np.testing.assert_array_equal(r_and.values, [[True, False]])
        r_or = boolean_overlay({"a": a, "b": b}, operator="or")
        np.testing.assert_array_equal(r_or.values, [[True, True]])


class TestSensitivitySingleCriterion:
    """Single criterion: perturbation is a no-op, sensitivity is zero."""

    def test_oat_single_criterion(self):
        ds = xr.Dataset({
            "a": xr.DataArray(
                np.array([[0.5, 0.8]], dtype=np.float64),
                dims=["y", "x"],
            ),
        })
        r = sensitivity(ds, {"a": 1.0}, method="one_at_a_time", delta=0.05)
        assert isinstance(r, xr.Dataset)
        np.testing.assert_allclose(r["a"].values, 0.0, atol=1e-15)

    def test_mc_single_criterion(self):
        ds = xr.Dataset({
            "a": xr.DataArray(
                np.array([[0.5, 0.8]], dtype=np.float64),
                dims=["y", "x"],
            ),
        })
        r = sensitivity(
            ds, {"a": 1.0}, method="monte_carlo", n_samples=20,
        )
        # Single criterion -> all weight vectors are [1.0] -> zero variance
        np.testing.assert_allclose(r.values, 0.0, atol=1e-10)


class TestSensitivityWithWPM:
    """OAT and MC should work with WPM combine method."""

    def test_oat_wpm(self):
        ds = xr.Dataset({
            "a": xr.DataArray(
                np.array([[0.5, 0.8]], dtype=np.float64),
                dims=["y", "x"],
            ),
            "b": xr.DataArray(
                np.array([[0.3, 0.9]], dtype=np.float64),
                dims=["y", "x"],
            ),
        })
        r = sensitivity(
            ds, {"a": 0.6, "b": 0.4},
            method="one_at_a_time", combine_method="wpm", delta=0.05,
        )
        # Only invariants are asserted (finite, non-negative), since the
        # exact WPM sensitivity values are not pinned down by this test.
        assert isinstance(r, xr.Dataset)
        for var in r.data_vars:
            assert np.all(np.isfinite(r[var].values))
            assert np.all(r[var].values >= 0)

    def test_mc_wpm(self):
        ds = xr.Dataset({
            "a": xr.DataArray(
                np.array([[0.5, 0.8]], dtype=np.float64),
                dims=["y", "x"],
            ),
            "b": xr.DataArray(
                np.array([[0.3, 0.9]], dtype=np.float64),
                dims=["y", "x"],
            ),
        })
        r = sensitivity(
            ds, {"a": 0.6, "b": 0.4},
            method="monte_carlo", combine_method="wpm", n_samples=20,
        )
        assert np.all(np.isfinite(r.values))
        assert np.all(r.values >= 0)


class TestAHPManyCriteria:
    """AHP with >15 criteria uses last _RI value as fallback."""

    def test_16_criteria_all_equal(self):
        names = [f"c{i}" for i in range(16)]
        # Build the full upper triangle of pairwise comparisons, all equal
        # importance (value 1), i.e. a perfectly consistent matrix.
        comps = {}
        for i in range(16):
            for j in range(i + 1, 16):
                comps[(names[i], names[j])] = 1
        w, c = ahp_weights(names, comps)
        assert sum(w.values()) == pytest.approx(1.0)
        # All equal comparisons -> all equal weights
        for v in w.values():
            assert v == pytest.approx(1 / 16, abs=0.001)
        assert c.is_consistent


class TestDuplicateCriteriaNames:
    """Duplicate names in criteria lists should raise."""

    def test_ahp_duplicates_raise(self):
        with pytest.raises(ValueError, match="Duplicate"):
            ahp_weights(
                criteria=["a", "a", "b"],
                comparisons={("a", "b"): 3},
            )

    def test_rank_weights_duplicates_raise(self):
        with pytest.raises(ValueError, match="Duplicate"):
            rank_weights(["a", "a", "b"])


class TestConstrainIntMask:
    """Integer masks should work (nonzero = excluded)."""

    def test_int_mask(self):
        suit = xr.DataArray(
            np.array([[0.8, 0.6, 0.9]], dtype=np.float64),
            dims=["y", "x"],
        )
        mask = xr.DataArray(
            np.array([[1, 0, 0]], dtype=np.int32), dims=["y", "x"],
        )
        r = constrain(suit, exclude=[mask])
        assert np.isnan(r.values[0, 0])  # nonzero pixel masked out
        assert float(r.values[0, 1]) == pytest.approx(0.6)
        assert float(r.values[0, 2]) == pytest.approx(0.9)


class TestMonteCarloSingleSample:
    """n_samples=1 produces zero variance (CV=0 everywhere)."""

    def test_single_sample_zero_cv(self):
        # Unseeded random data is fine here: the assertion (zero variance
        # with one sample) holds for any input values.
        ds = xr.Dataset({
            "a": xr.DataArray(
                np.random.rand(5, 5), dims=["y", "x"],
            ),
            "b": xr.DataArray(
                np.random.rand(5, 5), dims=["y", "x"],
            ),
        })
        r = sensitivity(
            ds, {"a": 0.6, "b": 0.4},
            method="monte_carlo", n_samples=1,
        )
        np.testing.assert_allclose(r.values, 0.0)


class TestThreeDimensionalInput:
    """3D DataArrays should work with explicit bounds."""

    def test_3d_linear(self):
        data = np.random.rand(3, 10, 10)
        agg = xr.DataArray(data, dims=["band", "y", "x"])
        r = standardize(agg, method="linear", bounds=(0, 1))
        assert r.shape == (3, 10, 10)  # shape (incl. band dim) preserved
        # All values should be in [0, 1] since data is in [0, 1)
        assert np.nanmin(r.values) >= 0.0
        assert np.nanmax(r.values) <= 1.0

    def test_3d_gaussian(self):
        data = np.random.rand(2, 5, 5)
        agg = xr.DataArray(data, dims=["band", "y", "x"])
        r = standardize(agg, method="gaussian", mean=0.5, std=0.2)
        assert r.shape == (2, 5, 5)
        assert np.all(np.isfinite(r.values))


# ===========================================================================
# Integration: end-to-end workflow
# ===========================================================================

class TestEndToEnd:
    def test_site_suitability_workflow(self):
        """Full MCDA workflow: standardize -> weight -> combine -> constrain."""
        np.random.seed(1030)
        shape = (10, 10)
        slope = xr.DataArray(
            np.random.uniform(0, 45, shape), dims=["y", "x"],
        )
        distance = xr.DataArray(
            np.random.uniform(0, 10000, shape), dims=["y", "x"],
        )
        ndvi = xr.DataArray(
            np.random.uniform(0, 1, shape), dims=["y", "x"],
        )
        # ~10% of pixels flagged as water (the exclusion constraint).
        water = xr.DataArray(
            np.random.random(shape) > 0.9, dims=["y", "x"],
        )

        # Standardize
        slope_std = standardize(
            slope, method="linear",
            direction="lower_is_better", bounds=(0, 45),
        )
        dist_std = standardize(
            distance, method="sigmoidal", midpoint=5000, spread=-0.001,
        )
        ndvi_std = standardize(
            ndvi, method="gaussian", mean=0.65, std=0.15,
        )

        # Combine
        criteria = xr.Dataset({
            "slope": slope_std,
            "distance": dist_std,
            "ndvi": ndvi_std,
        })
        weights = {"slope": 0.4, "distance": 0.35, "ndvi": 0.25}
        suitability = wlc(criteria, weights)

        # Constrain
        suitability = constrain(suitability, exclude=[water])

        # Check output
        assert suitability.shape == shape
        # Water pixels should be NaN
        assert np.isnan(suitability.values[water.values]).all()
        # Non-water pixels should be in [0, 1]
        valid = suitability.values[~water.values & np.isfinite(suitability.values)]
        assert valid.min() >= 0.0
        assert valid.max() <= 1.0