diff --git a/src/policyengine/outputs/__init__.py b/src/policyengine/outputs/__init__.py index cac44e5..8e64e51 100644 --- a/src/policyengine/outputs/__init__.py +++ b/src/policyengine/outputs/__init__.py @@ -4,6 +4,14 @@ ChangeAggregate, ChangeAggregateType, ) +from policyengine.outputs.congressional_district_impact import ( + CongressionalDistrictImpact, + compute_us_congressional_district_impacts, +) +from policyengine.outputs.constituency_impact import ( + ConstituencyImpact, + compute_uk_constituency_impacts, +) from policyengine.outputs.decile_impact import ( DecileImpact, calculate_decile_impacts, @@ -15,6 +23,14 @@ calculate_uk_inequality, calculate_us_inequality, ) +from policyengine.outputs.intra_decile_impact import ( + IntraDecileImpact, + compute_intra_decile_impacts, +) +from policyengine.outputs.local_authority_impact import ( + LocalAuthorityImpact, + compute_uk_local_authority_impacts, +) from policyengine.outputs.poverty import ( UK_POVERTY_VARIABLES, US_POVERTY_VARIABLES, @@ -34,6 +50,8 @@ "ChangeAggregateType", "DecileImpact", "calculate_decile_impacts", + "IntraDecileImpact", + "compute_intra_decile_impacts", "Poverty", "UKPovertyType", "USPovertyType", @@ -46,4 +64,10 @@ "US_INEQUALITY_INCOME_VARIABLE", "calculate_uk_inequality", "calculate_us_inequality", + "CongressionalDistrictImpact", + "compute_us_congressional_district_impacts", + "ConstituencyImpact", + "compute_uk_constituency_impacts", + "LocalAuthorityImpact", + "compute_uk_local_authority_impacts", ] diff --git a/src/policyengine/outputs/congressional_district_impact.py b/src/policyengine/outputs/congressional_district_impact.py new file mode 100644 index 0000000..1f7f524 --- /dev/null +++ b/src/policyengine/outputs/congressional_district_impact.py @@ -0,0 +1,100 @@ +"""Congressional district impact output class for US policy reforms.""" + +from typing import TYPE_CHECKING + +import numpy as np +from pydantic import ConfigDict + +from policyengine.core import Output + +if TYPE_CHECKING: + from policyengine.core.simulation import Simulation + + +class CongressionalDistrictImpact(Output): + """Per-congressional-district income change from a policy reform. + + Groups households by congressional_district_geoid (integer SSDD format + where SS = state FIPS, DD = district number) and computes weighted + average and relative household income changes per district. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + baseline_simulation: "Simulation" + reform_simulation: "Simulation" + + # Results populated by run() + district_results: list[dict] | None = None + + def run(self) -> None: + """Group households by geoid and compute per-district metrics.""" + baseline_hh = self.baseline_simulation.output_dataset.data.household + reform_hh = self.reform_simulation.output_dataset.data.household + + geoids = baseline_hh["congressional_district_geoid"].values + baseline_income = baseline_hh["household_net_income"].values + reform_income = reform_hh["household_net_income"].values + weights = baseline_hh["household_weight"].values + + # Only include valid geoids (positive integers) + unique_geoids = np.unique(geoids[geoids > 0]) + + results: list[dict] = [] + for geoid in unique_geoids: + mask = geoids == geoid + w = weights[mask] + total_weight = float(w.sum()) + if total_weight == 0: + continue + + b_inc = baseline_income[mask] + r_inc = reform_income[mask] + + weighted_baseline = float((b_inc * w).sum()) + weighted_reform = float((r_inc * w).sum()) + + avg_change = (weighted_reform - weighted_baseline) / total_weight + rel_change = ( + (weighted_reform / weighted_baseline - 1.0) + if weighted_baseline != 0 + else 0.0 + ) + + geoid_int = int(geoid) + state_fips = geoid_int // 100 + district_number = geoid_int % 100 + + results.append( + { + "district_geoid": geoid_int, + "state_fips": state_fips, + "district_number": district_number, + "average_household_income_change": float(avg_change), + "relative_household_income_change": float(rel_change), + "population": total_weight, + } + ) + + self.district_results = results + + +def compute_us_congressional_district_impacts( + baseline_simulation: "Simulation", + reform_simulation: "Simulation", +) -> CongressionalDistrictImpact: + """Compute per-congressional-district income changes. + + Args: + baseline_simulation: Completed baseline simulation. + reform_simulation: Completed reform simulation. + + Returns: + CongressionalDistrictImpact with district_results populated. + """ + impact = CongressionalDistrictImpact.model_construct( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + ) + impact.run() + return impact diff --git a/src/policyengine/outputs/constituency_impact.py b/src/policyengine/outputs/constituency_impact.py new file mode 100644 index 0000000..5cee7f4 --- /dev/null +++ b/src/policyengine/outputs/constituency_impact.py @@ -0,0 +1,126 @@ +"""UK parliamentary constituency impact output class. + +Computes per-constituency income changes using pre-computed weight matrices. +Each constituency has a row in the weight matrix (shape: 650 x N_households) +that reweights all households to represent that constituency's demographics. +""" + +from typing import TYPE_CHECKING + +import h5py +import numpy as np +import pandas as pd +from pydantic import ConfigDict + +from policyengine.core import Output + +if TYPE_CHECKING: + from policyengine.core.simulation import Simulation + + +class ConstituencyImpact(Output): + """Per-parliamentary-constituency income change from a UK policy reform. + + Uses pre-computed weight matrices from GCS to reweight households + for each of 650 constituencies, then computes weighted average and + relative household income changes. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + baseline_simulation: "Simulation" + reform_simulation: "Simulation" + weight_matrix_path: str + constituency_csv_path: str + year: str = "2025" + + # Results populated by run() + constituency_results: list[dict] | None = None + + def run(self) -> None: + """Load weight matrix and compute per-constituency metrics.""" + # Load constituency metadata (code, name, x, y) + constituency_df = pd.read_csv(self.constituency_csv_path) + + # Load weight matrix: shape (N_constituencies, N_households) + with h5py.File(self.weight_matrix_path, "r") as f: + weight_matrix = f[self.year][...] + + # Get household income arrays from output datasets + baseline_hh = self.baseline_simulation.output_dataset.data.household + reform_hh = self.reform_simulation.output_dataset.data.household + + baseline_income = baseline_hh["household_net_income"].values + reform_income = reform_hh["household_net_income"].values + + results: list[dict] = [] + for i in range(len(constituency_df)): + row = constituency_df.iloc[i] + code = str(row["code"]) + name = str(row["name"]) + x = int(row["x"]) + y = int(row["y"]) + w = weight_matrix[i] + + total_weight = float(np.sum(w)) + if total_weight == 0: + continue + + weighted_baseline = float(np.sum(baseline_income * w)) + weighted_reform = float(np.sum(reform_income * w)) + + # Count of weighted households + count = float(np.sum(w > 0)) + if count == 0: + continue + + avg_change = (weighted_reform - weighted_baseline) / total_weight + rel_change = ( + (weighted_reform / weighted_baseline - 1.0) + if weighted_baseline != 0 + else 0.0 + ) + + results.append( + { + "constituency_code": code, + "constituency_name": name, + "x": x, + "y": y, + "average_household_income_change": float(avg_change), + "relative_household_income_change": float(rel_change), + "population": total_weight, + } + ) + + self.constituency_results = results + + +def compute_uk_constituency_impacts( + baseline_simulation: "Simulation", + reform_simulation: "Simulation", + weight_matrix_path: str, + constituency_csv_path: str, + year: str = "2025", +) -> ConstituencyImpact: + """Compute per-constituency income changes for UK. + + Args: + baseline_simulation: Completed baseline simulation. + reform_simulation: Completed reform simulation. + weight_matrix_path: Path to parliamentary_constituency_weights.h5. + constituency_csv_path: Path to constituencies_2024.csv. + year: Year key in the H5 file (default "2025"). + + Returns: + ConstituencyImpact with constituency_results populated. + """ + impact = ConstituencyImpact.model_construct( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + weight_matrix_path=weight_matrix_path, + constituency_csv_path=constituency_csv_path, + year=year, + ) + impact.run() + return impact diff --git a/src/policyengine/outputs/decile_impact.py b/src/policyengine/outputs/decile_impact.py index f58c631..d767e0f 100644 --- a/src/policyengine/outputs/decile_impact.py +++ b/src/policyengine/outputs/decile_impact.py @@ -16,6 +16,7 @@ class DecileImpact(Output): baseline_simulation: Simulation reform_simulation: Simulation income_variable: str = "equiv_hbai_household_net_income" + decile_variable: str | None = None # If set, use pre-computed grouping variable entity: str | None = None decile: int quantiles: int = 10 @@ -68,16 +69,19 @@ def run(self): baseline_income = baseline_data[self.income_variable] reform_income = reform_data[self.income_variable] - # Calculate deciles based on baseline income - decile_series = ( - pd.qcut( - baseline_income, - self.quantiles, - labels=False, - duplicates="drop", + # Calculate deciles: use pre-computed variable or qcut + if self.decile_variable: + decile_series = baseline_data[self.decile_variable] + else: + decile_series = ( + pd.qcut( + baseline_income, + self.quantiles, + labels=False, + duplicates="drop", + ) + + 1 ) - + 1 - ) # Calculate changes absolute_change = reform_income - baseline_income diff --git a/src/policyengine/outputs/intra_decile_impact.py b/src/policyengine/outputs/intra_decile_impact.py new file mode 100644 index 0000000..52ab692 --- /dev/null +++ b/src/policyengine/outputs/intra_decile_impact.py @@ -0,0 +1,180 @@ +"""Intra-decile impact output. + +Computes the distribution of income change categories within each decile. +Each row represents one decile (1-10) or the overall average (decile=0), +with five proportion columns summing to ~1.0. + +The five categories classify households by their percentage income change: + - lose_more_than_5pct: change <= -5% + - lose_less_than_5pct: -5% < change <= -0.1% + - no_change: -0.1% < change <= 0.1% + - gain_less_than_5pct: 0.1% < change <= 5% + - gain_more_than_5pct: change > 5% + +Proportions are people-weighted (using household_count_people * +household_weight) so they reflect the share of people, not households. +""" + +import numpy as np +import pandas as pd +from pydantic import ConfigDict + +from policyengine.core import Output, OutputCollection, Simulation + +# The 5-category thresholds +BOUNDS = [-np.inf, -0.05, -1e-3, 1e-3, 0.05, np.inf] +CATEGORY_NAMES = [ + "lose_more_than_5pct", + "lose_less_than_5pct", + "no_change", + "gain_less_than_5pct", + "gain_more_than_5pct", +] + + +class IntraDecileImpact(Output): + """Single decile's intra-decile impact — proportion of people in each + income change category.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + baseline_simulation: Simulation + reform_simulation: Simulation + income_variable: str = "household_net_income" + decile_variable: str | None = None # If set, use pre-computed grouping + entity: str = "household" + decile: int # 1-10 for individual deciles + quantiles: int = 10 + + # Results populated by run() + lose_more_than_5pct: float | None = None + lose_less_than_5pct: float | None = None + no_change: float | None = None + gain_less_than_5pct: float | None = None + gain_more_than_5pct: float | None = None + + def run(self): + """Calculate intra-decile proportions for this specific decile.""" + baseline_data = getattr( + self.baseline_simulation.output_dataset.data, self.entity + ) + reform_data = getattr( + self.reform_simulation.output_dataset.data, self.entity + ) + + baseline_income = baseline_data[self.income_variable].values + reform_income = reform_data[self.income_variable].values + + # Determine decile grouping + if self.decile_variable: + decile_series = baseline_data[self.decile_variable].values + else: + decile_series = ( + pd.qcut( + baseline_income, + self.quantiles, + labels=False, + duplicates="drop", + ) + + 1 + ) + + # People-weighted counts + weights = baseline_data[f"{self.entity}_weight"].values + if self.entity == "household": + people_count = baseline_data["household_count_people"].values + people = people_count * weights + else: + people = weights + + # Compute percentage income change + capped_baseline = np.maximum(baseline_income, 1.0) + income_change = (reform_income - baseline_income) / capped_baseline + + in_decile = decile_series == self.decile + people_in_decile = float(np.sum(people[in_decile])) + + if people_in_decile == 0: + self.lose_more_than_5pct = 0.0 + self.lose_less_than_5pct = 0.0 + self.no_change = 1.0 + self.gain_less_than_5pct = 0.0 + self.gain_more_than_5pct = 0.0 + return + + proportions = [] + for lower, upper in zip(BOUNDS[:-1], BOUNDS[1:]): + in_category = (income_change > lower) & (income_change <= upper) + in_both = in_decile & in_category + proportions.append(float(np.sum(people[in_both]) / people_in_decile)) + + self.lose_more_than_5pct = proportions[0] + self.lose_less_than_5pct = proportions[1] + self.no_change = proportions[2] + self.gain_less_than_5pct = proportions[3] + self.gain_more_than_5pct = proportions[4] + + +def compute_intra_decile_impacts( + baseline_simulation: Simulation, + reform_simulation: Simulation, + income_variable: str = "household_net_income", + decile_variable: str | None = None, + entity: str = "household", + quantiles: int = 10, +) -> OutputCollection[IntraDecileImpact]: + """Compute intra-decile proportions for all deciles + overall average. + + Returns: + OutputCollection containing list of IntraDecileImpact objects + (deciles 1-N plus overall average at decile=0) and DataFrame. + """ + results = [] + for decile in range(1, quantiles + 1): + impact = IntraDecileImpact.model_construct( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + income_variable=income_variable, + decile_variable=decile_variable, + entity=entity, + decile=decile, + quantiles=quantiles, + ) + impact.run() + results.append(impact) + + # Overall average (decile=0): arithmetic mean of decile proportions + overall = IntraDecileImpact.model_construct( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + income_variable=income_variable, + decile_variable=decile_variable, + entity=entity, + decile=0, + quantiles=quantiles, + lose_more_than_5pct=sum(r.lose_more_than_5pct for r in results) / quantiles, + lose_less_than_5pct=sum(r.lose_less_than_5pct for r in results) / quantiles, + no_change=sum(r.no_change for r in results) / quantiles, + gain_less_than_5pct=sum(r.gain_less_than_5pct for r in results) / quantiles, + gain_more_than_5pct=sum(r.gain_more_than_5pct for r in results) / quantiles, + ) + results.append(overall) + + # Create DataFrame + df = pd.DataFrame( + [ + { + "baseline_simulation_id": r.baseline_simulation.id, + "reform_simulation_id": r.reform_simulation.id, + "decile": r.decile, + "lose_more_than_5pct": r.lose_more_than_5pct, + "lose_less_than_5pct": r.lose_less_than_5pct, + "no_change": r.no_change, + "gain_less_than_5pct": r.gain_less_than_5pct, + "gain_more_than_5pct": r.gain_more_than_5pct, + } + for r in results + ] + ) + + return OutputCollection(outputs=results, dataframe=df) diff --git a/src/policyengine/outputs/local_authority_impact.py b/src/policyengine/outputs/local_authority_impact.py new file mode 100644 index 0000000..fc91f3e --- /dev/null +++ b/src/policyengine/outputs/local_authority_impact.py @@ -0,0 +1,125 @@ +"""UK local authority impact output class. + +Computes per-local-authority income changes using pre-computed weight matrices. +Each local authority has a row in the weight matrix (shape: 360 x N_households) +that reweights all households to represent that local authority's demographics. +""" + +from typing import TYPE_CHECKING + +import h5py +import numpy as np +import pandas as pd +from pydantic import ConfigDict + +from policyengine.core import Output + +if TYPE_CHECKING: + from policyengine.core.simulation import Simulation + + +class LocalAuthorityImpact(Output): + """Per-local-authority income change from a UK policy reform. + + Uses pre-computed weight matrices from GCS to reweight households + for each of 360 local authorities, then computes weighted average and + relative household income changes. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + baseline_simulation: "Simulation" + reform_simulation: "Simulation" + weight_matrix_path: str + local_authority_csv_path: str + year: str = "2025" + + # Results populated by run() + local_authority_results: list[dict] | None = None + + def run(self) -> None: + """Load weight matrix and compute per-local-authority metrics.""" + # Load local authority metadata (code, x, y, name) + la_df = pd.read_csv(self.local_authority_csv_path) + + # Load weight matrix: shape (N_local_authorities, N_households) + with h5py.File(self.weight_matrix_path, "r") as f: + weight_matrix = f[self.year][...] + + # Get household income arrays from output datasets + baseline_hh = self.baseline_simulation.output_dataset.data.household + reform_hh = self.reform_simulation.output_dataset.data.household + + baseline_income = baseline_hh["household_net_income"].values + reform_income = reform_hh["household_net_income"].values + + results: list[dict] = [] + for i in range(len(la_df)): + row = la_df.iloc[i] + code = str(row["code"]) + name = str(row["name"]) + x = int(row["x"]) + y = int(row["y"]) + w = weight_matrix[i] + + total_weight = float(np.sum(w)) + if total_weight == 0: + continue + + weighted_baseline = float(np.sum(baseline_income * w)) + weighted_reform = float(np.sum(reform_income * w)) + + count = float(np.sum(w > 0)) + if count == 0: + continue + + avg_change = (weighted_reform - weighted_baseline) / total_weight + rel_change = ( + (weighted_reform / weighted_baseline - 1.0) + if weighted_baseline != 0 + else 0.0 + ) + + results.append( + { + "local_authority_code": code, + "local_authority_name": name, + "x": x, + "y": y, + "average_household_income_change": float(avg_change), + "relative_household_income_change": float(rel_change), + "population": total_weight, + } + ) + + self.local_authority_results = results + + +def compute_uk_local_authority_impacts( + baseline_simulation: "Simulation", + reform_simulation: "Simulation", + weight_matrix_path: str, + local_authority_csv_path: str, + year: str = "2025", +) -> LocalAuthorityImpact: + """Compute per-local-authority income changes for UK. + + Args: + baseline_simulation: Completed baseline simulation. + reform_simulation: Completed reform simulation. + weight_matrix_path: Path to local_authority_weights.h5. + local_authority_csv_path: Path to local_authorities_2021.csv. + year: Year key in the H5 file (default "2025"). + + Returns: + LocalAuthorityImpact with local_authority_results populated. + """ + impact = LocalAuthorityImpact.model_construct( + baseline_simulation=baseline_simulation, + reform_simulation=reform_simulation, + weight_matrix_path=weight_matrix_path, + local_authority_csv_path=local_authority_csv_path, + year=year, + ) + impact.run() + return impact diff --git a/src/policyengine/tax_benefit_models/uk/model.py b/src/policyengine/tax_benefit_models/uk/model.py index cb2337e..504ae49 100644 --- a/src/policyengine/tax_benefit_models/uk/model.py +++ b/src/policyengine/tax_benefit_models/uk/model.py @@ -107,6 +107,7 @@ class PolicyEngineUKLatest(TaxBenefitModelVersion): # Income measures "household_net_income", "household_income_decile", + "household_wealth_decile", "hbai_household_net_income", "equiv_hbai_household_net_income", "household_market_income", diff --git a/src/policyengine/tax_benefit_models/us/model.py b/src/policyengine/tax_benefit_models/us/model.py index 7eddf54..b80a4d3 100644 --- a/src/policyengine/tax_benefit_models/us/model.py +++ b/src/policyengine/tax_benefit_models/us/model.py @@ -113,6 +113,7 @@ class PolicyEngineUSLatest(TaxBenefitModelVersion): "household_benefits", "household_tax", "household_market_income", + "congressional_district_geoid", ], } diff --git a/tests/test_congressional_district_impact.py b/tests/test_congressional_district_impact.py new file mode 100644 index 0000000..576e539 --- /dev/null +++ b/tests/test_congressional_district_impact.py @@ -0,0 +1,131 @@ +"""Unit tests for CongressionalDistrictImpact output class.""" + +from unittest.mock import MagicMock + +import pandas as pd +from microdf import MicroDataFrame + +from policyengine.outputs.congressional_district_impact import ( + compute_us_congressional_district_impacts, +) + + +def _make_sim(household_data: dict) -> MagicMock: + """Create a mock Simulation with household-level data.""" + hh_df = MicroDataFrame( + pd.DataFrame(household_data), + weights="household_weight", + ) + sim = MagicMock() + sim.output_dataset.data.household = hh_df + sim.id = "test-sim" + return sim + + +def test_basic_district_grouping(): + """Two districts with known incomes produce correct per-district metrics.""" + baseline = _make_sim( + { + "congressional_district_geoid": [101, 101, 202], + "household_net_income": [50000.0, 60000.0, 40000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + reform = _make_sim( + { + "congressional_district_geoid": [101, 101, 202], + "household_net_income": [52000.0, 62000.0, 42000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + + impact = compute_us_congressional_district_impacts(baseline, reform) + + assert impact.district_results is not None + assert len(impact.district_results) == 2 + + by_geoid = {r["district_geoid"]: r for r in impact.district_results} + + # District 101: baseline avg = 55000, reform avg = 57000, change = 2000 + d101 = by_geoid[101] + assert d101["state_fips"] == 1 + assert d101["district_number"] == 1 + assert abs(d101["average_household_income_change"] - 2000.0) < 1e-6 + assert d101["population"] == 2.0 + + # District 202: baseline = 40000, reform = 42000, change = 2000 + d202 = by_geoid[202] + assert d202["state_fips"] == 2 + assert d202["district_number"] == 2 + assert abs(d202["average_household_income_change"] - 2000.0) < 1e-6 + + +def test_negative_geoid_excluded(): + """Households with geoid <= 0 are excluded from results.""" + baseline = _make_sim( + { + "congressional_district_geoid": [-1, 0, 101], + "household_net_income": [50000.0, 50000.0, 50000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + reform = _make_sim( + { + "congressional_district_geoid": [-1, 0, 101], + "household_net_income": [55000.0, 55000.0, 55000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + + impact = compute_us_congressional_district_impacts(baseline, reform) + + assert len(impact.district_results) == 1 + assert impact.district_results[0]["district_geoid"] == 101 + + +def test_zero_weight_district_skipped(): + """Districts where all households have zero weight produce no result.""" + baseline = _make_sim( + { + "congressional_district_geoid": [101, 202], + "household_net_income": [50000.0, 50000.0], + "household_weight": [1.0, 0.0], + } + ) + reform = _make_sim( + { + "congressional_district_geoid": [101, 202], + "household_net_income": [55000.0, 55000.0], + "household_weight": [1.0, 0.0], + } + ) + + impact = compute_us_congressional_district_impacts(baseline, reform) + + assert len(impact.district_results) == 1 + assert impact.district_results[0]["district_geoid"] == 101 + + +def test_weighted_average_change(): + """Weighted average income change is computed correctly.""" + baseline = _make_sim( + { + "congressional_district_geoid": [101, 101], + "household_net_income": [40000.0, 80000.0], + "household_weight": [3.0, 1.0], + } + ) + reform = _make_sim( + { + "congressional_district_geoid": [101, 101], + "household_net_income": [42000.0, 82000.0], + "household_weight": [3.0, 1.0], + } + ) + + impact = compute_us_congressional_district_impacts(baseline, reform) + + d = impact.district_results[0] + # Weighted avg change: (3*2000 + 1*2000) / 4 = 2000 + assert abs(d["average_household_income_change"] - 2000.0) < 1e-6 + assert d["population"] == 4.0 diff --git a/tests/test_constituency_impact.py b/tests/test_constituency_impact.py new file mode 100644 index 0000000..fbe2c81 --- /dev/null +++ b/tests/test_constituency_impact.py @@ -0,0 +1,151 @@ +"""Unit tests for ConstituencyImpact output class.""" + +import os +import tempfile +from unittest.mock import MagicMock + +import h5py +import numpy as np +import pandas as pd +from microdf import MicroDataFrame + +from policyengine.outputs.constituency_impact import ( + compute_uk_constituency_impacts, +) + + +def _make_sim(household_data: dict) -> MagicMock: + """Create a mock Simulation with household-level data.""" + hh_df = MicroDataFrame( + pd.DataFrame(household_data), + weights="household_weight", + ) + sim = MagicMock() + sim.output_dataset.data.household = hh_df + sim.id = "test-sim" + return sim + + +def _make_weight_matrix_and_csv(tmpdir, n_constituencies, n_households, weights, csv_rows): + """Create a temp H5 weight matrix and CSV metadata file.""" + h5_path = os.path.join(tmpdir, "weights.h5") + with h5py.File(h5_path, "w") as f: + f.create_dataset("2025", data=np.array(weights, dtype=np.float64)) + + csv_path = os.path.join(tmpdir, "constituencies.csv") + pd.DataFrame(csv_rows).to_csv(csv_path, index=False) + + return h5_path, csv_path + + +def test_basic_constituency_reweighting(): + """Two constituencies with known weight matrices produce correct metrics.""" + n_hh = 3 + baseline = _make_sim( + { + "household_net_income": [50000.0, 60000.0, 40000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [52000.0, 62000.0, 42000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + + # Constituency 0 weights: [2, 0, 1] → weighted baseline = 2*50k + 0 + 1*40k = 140k + # Constituency 1 weights: [0, 3, 0] → weighted baseline = 0 + 3*60k + 0 = 180k + weight_matrix = [[2.0, 0.0, 1.0], [0.0, 3.0, 0.0]] + csv_rows = [ + {"code": "C001", "name": "Constituency A", "x": 10, "y": 20}, + {"code": "C002", "name": "Constituency B", "x": 30, "y": 40}, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + h5_path, csv_path = _make_weight_matrix_and_csv( + tmpdir, 2, n_hh, weight_matrix, csv_rows + ) + impact = compute_uk_constituency_impacts( + baseline, reform, h5_path, csv_path + ) + + assert impact.constituency_results is not None + assert len(impact.constituency_results) == 2 + + by_code = {r["constituency_code"]: r for r in impact.constituency_results} + + c1 = by_code["C001"] + # Weighted change: (2*2000 + 0 + 1*2000) / 3 = 2000 + assert abs(c1["average_household_income_change"] - 2000.0) < 1e-6 + assert c1["constituency_name"] == "Constituency A" + assert c1["x"] == 10 + assert c1["y"] == 20 + assert c1["population"] == 3.0 + + c2 = by_code["C002"] + # Weighted change: (0 + 3*2000 + 0) / 3 = 2000 + assert abs(c2["average_household_income_change"] - 2000.0) < 1e-6 + + +def test_zero_weight_constituency_skipped(): + """A constituency with all-zero weights produces no result.""" + baseline = _make_sim( + { + "household_net_income": [50000.0, 60000.0], + "household_weight": [1.0, 1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [55000.0, 65000.0], + "household_weight": [1.0, 1.0], + } + ) + + weight_matrix = [[1.0, 1.0], [0.0, 0.0]] + csv_rows = [ + {"code": "C001", "name": "A", "x": 0, "y": 0}, + {"code": "C002", "name": "B", "x": 0, "y": 0}, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + h5_path, csv_path = _make_weight_matrix_and_csv( + tmpdir, 2, 2, weight_matrix, csv_rows + ) + impact = compute_uk_constituency_impacts( + baseline, reform, h5_path, csv_path + ) + + assert len(impact.constituency_results) == 1 + assert impact.constituency_results[0]["constituency_code"] == "C001" + + +def test_relative_change(): + """Relative household income change is computed correctly.""" + baseline = _make_sim( + { + "household_net_income": [100000.0], + "household_weight": [1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [110000.0], + "household_weight": [1.0], + } + ) + + weight_matrix = [[1.0]] + csv_rows = [{"code": "C001", "name": "A", "x": 0, "y": 0}] + + with tempfile.TemporaryDirectory() as tmpdir: + h5_path, csv_path = _make_weight_matrix_and_csv( + tmpdir, 1, 1, weight_matrix, csv_rows + ) + impact = compute_uk_constituency_impacts( + baseline, reform, h5_path, csv_path + ) + + # 10% increase + assert abs(impact.constituency_results[0]["relative_household_income_change"] - 0.1) < 1e-6 diff --git a/tests/test_intra_decile_impact.py b/tests/test_intra_decile_impact.py new file mode 100644 index 0000000..de9a807 --- /dev/null +++ b/tests/test_intra_decile_impact.py @@ -0,0 +1,265 @@ +"""Unit tests for IntraDecileImpact and DecileImpact with decile_variable.""" + +from unittest.mock import MagicMock + +import numpy as np +import pandas as pd +from microdf import MicroDataFrame + +from policyengine.outputs.decile_impact import DecileImpact +from policyengine.outputs.intra_decile_impact import ( + compute_intra_decile_impacts, +) + + +def _make_variable_mock(name: str, entity: str) -> MagicMock: + """Create a mock Variable with name and entity attributes.""" + var = MagicMock() + var.name = name + var.entity = entity + return var + + +def _make_sim(household_data: dict, variables: list | None = None) -> MagicMock: + """Create a mock Simulation with household-level data.""" + hh_df = MicroDataFrame( + pd.DataFrame(household_data), + weights="household_weight", + ) + sim = MagicMock() + sim.output_dataset.data.household = hh_df + sim.id = "test-sim" + if variables is not None: + sim.tax_benefit_model_version.variables = variables + return sim + + +# --------------------------------------------------------------------------- +# IntraDecileImpact tests +# --------------------------------------------------------------------------- + + +def test_intra_decile_no_change(): + """When baseline == reform, all households fall in no_change category.""" + n = 100 + incomes = np.linspace(10000, 100000, n) + baseline = _make_sim( + { + "household_net_income": incomes, + "household_weight": np.ones(n), + "household_count_people": np.full(n, 2.0), + } + ) + reform = _make_sim( + { + "household_net_income": incomes, + "household_weight": np.ones(n), + "household_count_people": np.full(n, 2.0), + } + ) + + results = compute_intra_decile_impacts( + baseline_simulation=baseline, + reform_simulation=reform, + income_variable="household_net_income", + entity="household", + ) + + # 11 rows: deciles 1-10 + overall (decile=0) + assert len(results.outputs) == 11 + + for r in results.outputs: + assert r.no_change == 1.0 or abs(r.no_change - 1.0) < 1e-9 + assert r.lose_more_than_5pct == 0.0 or abs(r.lose_more_than_5pct) < 1e-9 + assert r.gain_more_than_5pct == 0.0 or abs(r.gain_more_than_5pct) < 1e-9 + + +def test_intra_decile_all_large_gain(): + """When everyone gains >5%, all fall in gain_more_than_5pct category.""" + n = 100 + incomes = np.linspace(10000, 100000, n) + reform_incomes = incomes * 1.10 # 10% gain + baseline = _make_sim( + { + "household_net_income": incomes, + "household_weight": np.ones(n), + "household_count_people": np.ones(n), + } + ) + reform = _make_sim( + { + "household_net_income": reform_incomes, + "household_weight": np.ones(n), + "household_count_people": np.ones(n), + } + ) + + results = compute_intra_decile_impacts( + baseline_simulation=baseline, + reform_simulation=reform, + income_variable="household_net_income", + entity="household", + ) + + for r in results.outputs: + assert r.gain_more_than_5pct == 1.0 or abs(r.gain_more_than_5pct - 1.0) < 1e-9 + assert r.no_change == 0.0 or abs(r.no_change) < 1e-9 + + +def test_intra_decile_overall_is_mean_of_deciles(): + """The overall row (decile=0) is the arithmetic mean of deciles 1-10.""" + n = 100 + incomes = np.linspace(10000, 100000, n) + # Give a small gain so results aren't trivially all in one bucket + reform_incomes = incomes * 1.02 # 2% gain (falls in gain_less_than_5pct) + baseline = _make_sim( + { + "household_net_income": incomes, + "household_weight": np.ones(n), + "household_count_people": np.ones(n), + } + ) + reform = _make_sim( + { + "household_net_income": reform_incomes, + "household_weight": np.ones(n), + "household_count_people": np.ones(n), + } + ) + + results = compute_intra_decile_impacts( + baseline_simulation=baseline, + reform_simulation=reform, + income_variable="household_net_income", + entity="household", + ) + + decile_rows = [r for r in results.outputs if r.decile != 0] + overall = next(r for r in results.outputs if r.decile == 0) + + assert len(decile_rows) == 10 + + expected_gain = sum(r.gain_less_than_5pct for r in decile_rows) / 10 + assert abs(overall.gain_less_than_5pct - expected_gain) < 1e-9 + + expected_no_change = sum(r.no_change for r in decile_rows) / 10 + assert abs(overall.no_change - expected_no_change) < 1e-9 + + +def test_intra_decile_with_decile_variable(): + """Using a pre-computed decile_variable groups by that variable.""" + baseline = _make_sim( + { + "household_net_income": [50000.0, 60000.0, 70000.0, 80000.0], + "household_weight": [1.0, 1.0, 1.0, 1.0], + "household_count_people": [2.0, 2.0, 2.0, 2.0], + "household_wealth_decile": [1, 1, 2, 2], + } + ) + reform = _make_sim( + { + "household_net_income": [55000.0, 66000.0, 77000.0, 88000.0], + "household_weight": [1.0, 1.0, 1.0, 1.0], + "household_count_people": [2.0, 2.0, 2.0, 2.0], + "household_wealth_decile": [1, 1, 2, 2], + } + ) + + # Only 2 deciles present, so use quantiles=2 + results = compute_intra_decile_impacts( + baseline_simulation=baseline, + reform_simulation=reform, + income_variable="household_net_income", + decile_variable="household_wealth_decile", + entity="household", + quantiles=2, + ) + + # 3 rows: decile 1, decile 2, overall (decile=0) + assert len(results.outputs) == 3 + + decile_1 = next(r for r in results.outputs if r.decile == 1) + decile_2 = next(r for r in results.outputs if r.decile == 2) + + # All gains are 10% → all in gain_more_than_5pct + assert decile_1.gain_more_than_5pct == 1.0 or abs(decile_1.gain_more_than_5pct - 1.0) < 1e-9 + assert decile_2.gain_more_than_5pct == 1.0 or abs(decile_2.gain_more_than_5pct - 1.0) < 1e-9 + + +# --------------------------------------------------------------------------- +# DecileImpact with decile_variable tests +# --------------------------------------------------------------------------- + + +def test_decile_impact_with_decile_variable(): + """DecileImpact uses pre-computed grouping when decile_variable is set.""" + variables = [_make_variable_mock("household_net_income", "household")] + baseline = _make_sim( + { + "household_net_income": [40000.0, 60000.0, 80000.0, 100000.0], + "household_weight": [1.0, 1.0, 1.0, 1.0], + "household_wealth_decile": [1, 1, 2, 2], + }, + variables=variables, + ) + reform = _make_sim( + { + "household_net_income": [42000.0, 62000.0, 82000.0, 102000.0], + "household_weight": [1.0, 1.0, 1.0, 1.0], + "household_wealth_decile": [1, 1, 2, 2], + }, + variables=variables, + ) + + di = DecileImpact.model_construct( + baseline_simulation=baseline, + reform_simulation=reform, + income_variable="household_net_income", + decile_variable="household_wealth_decile", + entity="household", + decile=1, + ) + di.run() + + # Decile 1: households with income 40k, 60k → baseline mean = 50k + assert abs(di.baseline_mean - 50000.0) < 1e-6 + assert abs(di.reform_mean - 52000.0) < 1e-6 + assert abs(di.absolute_change - 2000.0) < 1e-6 + + +def test_decile_impact_qcut_default(): + """Without decile_variable, DecileImpact uses qcut (default behavior).""" + n = 100 + incomes = np.linspace(10000, 100000, n) + reform_incomes = incomes + 1000 + variables = [_make_variable_mock("household_net_income", "household")] + + baseline = _make_sim( + { + "household_net_income": incomes, + "household_weight": np.ones(n), + }, + variables=variables, + ) + reform = _make_sim( + { + "household_net_income": reform_incomes, + "household_weight": np.ones(n), + }, + variables=variables, + ) + + di = DecileImpact.model_construct( + baseline_simulation=baseline, + reform_simulation=reform, + income_variable="household_net_income", + entity="household", + decile=1, + ) + di.run() + + # Decile 1 should be the lowest-income group + # With uniform spacing, each decile has ~10 households + assert di.baseline_mean is not None + assert di.absolute_change is not None + assert abs(di.absolute_change - 1000.0) < 1e-6 diff --git a/tests/test_local_authority_impact.py b/tests/test_local_authority_impact.py new file mode 100644 index 0000000..cbbcbe1 --- /dev/null +++ b/tests/test_local_authority_impact.py @@ -0,0 +1,116 @@ +"""Unit tests for LocalAuthorityImpact output class.""" + +import os +import tempfile +from unittest.mock import MagicMock + +import h5py +import numpy as np +import pandas as pd +from microdf import MicroDataFrame + +from policyengine.outputs.local_authority_impact import ( + compute_uk_local_authority_impacts, +) + + +def _make_sim(household_data: dict) -> MagicMock: + """Create a mock Simulation with household-level data.""" + hh_df = MicroDataFrame( + pd.DataFrame(household_data), + weights="household_weight", + ) + sim = MagicMock() + sim.output_dataset.data.household = hh_df + sim.id = "test-sim" + return sim + + +def _make_weight_matrix_and_csv(tmpdir, weights, csv_rows): + """Create a temp H5 weight matrix and CSV metadata file.""" + h5_path = os.path.join(tmpdir, "la_weights.h5") + with h5py.File(h5_path, "w") as f: + f.create_dataset("2025", data=np.array(weights, dtype=np.float64)) + + csv_path = os.path.join(tmpdir, "local_authorities.csv") + pd.DataFrame(csv_rows).to_csv(csv_path, index=False) + + return h5_path, csv_path + + +def test_basic_local_authority_reweighting(): + """Two LAs with known weight matrices produce correct metrics.""" + baseline = _make_sim( + { + "household_net_income": [50000.0, 60000.0, 40000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [53000.0, 63000.0, 43000.0], + "household_weight": [1.0, 1.0, 1.0], + } + ) + + weight_matrix = [[1.0, 1.0, 0.0], [0.0, 1.0, 2.0]] + csv_rows = [ + {"code": "LA001", "name": "Authority A", "x": 5, "y": 15}, + {"code": "LA002", "name": "Authority B", "x": 25, "y": 35}, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + h5_path, csv_path = _make_weight_matrix_and_csv( + tmpdir, weight_matrix, csv_rows + ) + impact = compute_uk_local_authority_impacts( + baseline, reform, h5_path, csv_path + ) + + assert impact.local_authority_results is not None + assert len(impact.local_authority_results) == 2 + + by_code = {r["local_authority_code"]: r for r in impact.local_authority_results} + + la1 = by_code["LA001"] + # Weighted change: (1*3000 + 1*3000) / 2 = 3000 + assert abs(la1["average_household_income_change"] - 3000.0) < 1e-6 + assert la1["local_authority_name"] == "Authority A" + assert la1["population"] == 2.0 + + la2 = by_code["LA002"] + # Weighted change: (0 + 1*3000 + 2*3000) / 3 = 3000 + assert abs(la2["average_household_income_change"] - 3000.0) < 1e-6 + + +def test_zero_weight_la_skipped(): + """A local authority with all-zero weights produces no result.""" + baseline = _make_sim( + { + "household_net_income": [50000.0], + "household_weight": [1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [55000.0], + "household_weight": [1.0], + } + ) + + weight_matrix = [[1.0], [0.0]] + csv_rows = [ + {"code": "LA001", "name": "A", "x": 0, "y": 0}, + {"code": "LA002", "name": "B", "x": 0, "y": 0}, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + h5_path, csv_path = _make_weight_matrix_and_csv( + tmpdir, weight_matrix, csv_rows + ) + impact = compute_uk_local_authority_impacts( + baseline, reform, h5_path, csv_path + ) + + assert len(impact.local_authority_results) == 1 + assert impact.local_authority_results[0]["local_authority_code"] == "LA001"