diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 201f5cf..dcca2ab 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -9,6 +9,7 @@ from collections import Counter from collections.abc import Iterable from dataclasses import asdict, dataclass, field +from functools import lru_cache from pathlib import Path from tempfile import TemporaryDirectory from types import FunctionType @@ -36,6 +37,7 @@ TimeStructure, ) from microplex.fusion import FusionPlan +from microplex.geography import GeographyQuery from microplex.hierarchical import TaxUnitOptimizer from microplex.synthesizer import Synthesizer from microplex.targets import TargetQuery, TargetSpec @@ -47,6 +49,10 @@ build_forbes_fixed_spine, residualize_targets_for_fixed_spine, ) +from microplex_us.geography import ( + BlockGeography, + normalize_us_county_fips, +) from microplex_us.pe_source_impute_engine import ( PE_SOURCE_IMPUTE_BLOCK_ENGINE, PESourceImputeBlockRunRequest, @@ -107,6 +113,174 @@ LOGGER = logging.getLogger(__name__) +@lru_cache(maxsize=1) +def _default_block_geography() -> BlockGeography: + return BlockGeography() + + +def _normalize_household_county_fips_series( + county_fips: pd.Series, + state_fips: pd.Series, +) -> pd.Series: + """Normalize CPS county fragments into PE's five-digit county FIPS values.""" + county_numeric = pd.to_numeric(county_fips, errors="coerce") + state_numeric = pd.to_numeric(state_fips, errors="coerce") + combined = county_numeric.copy() + county_fragment_mask = ( + county_numeric.notna() + & county_numeric.gt(0) + & county_numeric.lt(1000) + & state_numeric.notna() + & state_numeric.gt(0) + ) + combined.loc[county_fragment_mask] = ( + state_numeric.loc[county_fragment_mask].round().astype(int) * 1000 + + county_numeric.loc[county_fragment_mask].round().astype(int) + ) + normalized = combined.round().astype("Int64").astype("string").str.zfill(5) + invalid = combined.isna() | combined.le(0) + return normalized.mask(invalid).astype("string") + + +def _normalize_household_state_fips_series(state_fips: pd.Series) -> pd.Series: + numeric = pd.to_numeric(state_fips, errors="coerce") + normalized = numeric.round().astype("Int64").astype("string").str.zfill(2) + return normalized.mask(numeric.isna() | numeric.le(0)).astype("string") + + +def _congressional_district_geoid_from_cd_id( + cd_id: Any, + state_fips: Any, +) -> int: + try: + state = int(str(state_fips).strip()) + except (TypeError, ValueError): + return 0 + cd_text = str(cd_id).strip() + if not cd_text or cd_text.lower() in {"nan", "none", ""}: + return 0 + district_token = cd_text.split("-")[-1] + # eCPS normalizes at-large districts to 01: the raw Census codes "AL"/"ZZ" + # (at-large) and "98" (DC) map to district 0, which is then bumped to 1 + # (policyengine-us-data db/create_initial_strata.py). Microplex's crosswalk + # feeds the "-AL" token, but accept the raw Census forms too so the encoder + # stays faithful to the eCPS 436-CD universe regardless of input convention. + if district_token.upper() in {"AL", "ZZ"}: + district = 1 + else: + try: + district = int(district_token) + except ValueError: + return 0 + if district in (0, 98): + district = 1 + return state * 100 + district + + +def _attach_household_census_geographies( + households: pd.DataFrame, + *, + seed: int, + geography: BlockGeography | None = None, +) -> pd.DataFrame: + """Attach eCPS-contract block, tract, county, and CD geographies to households.""" + # Intermediate frames are indexed by row label and written back via .loc; + # a non-unique household-frame index makes those reindex operations ambiguous + # (ValueError: cannot reindex on an axis with duplicate labels). The caller + # consumes this result by merging on the household_id column, not the index, + # so collapsing to a fresh RangeIndex here is both safe and robust. + result = households.reset_index(drop=True) + for column, default in ( + ("block_geoid", ""), + ("tract_geoid", ""), + ("congressional_district_geoid", 0), + ): + if column not in result.columns: + result[column] = default + if result.empty or "state_fips" not in result.columns: + return result + + assigned_blocks = pd.Series(pd.NA, index=result.index, dtype="string") + state_values = _normalize_household_state_fips_series(result["state_fips"]) + county_values = ( + _normalize_household_county_fips_series(result["county_fips"], result["state_fips"]) + if "county_fips" in result.columns + else pd.Series(pd.NA, index=result.index, dtype="string") + ) + result["county_fips"] = county_values.fillna("00000") + + try: + block_geography = geography or _default_block_geography() + block_data = block_geography.data + except FileNotFoundError: + return result + + valid_counties = set(block_data["county_fips"].dropna().astype(str)) + county_mask = county_values.isin(valid_counties) + if county_mask.any(): + county_query = GeographyQuery( + partition_columns=("county_fips",), + partition_normalizers={"county_fips": normalize_us_county_fips}, + ) + county_assigner = block_geography.load_assigner(county_query) + county_frame = pd.DataFrame( + {"county_fips": county_values.loc[county_mask]}, + index=result.index[county_mask], + ) + assigned = county_assigner.assign( + county_frame, + random_state=seed, + ) + assigned_blocks.loc[assigned.index] = assigned["block_geoid"].astype("string") + + remaining_mask = assigned_blocks.isna() + state_mask = remaining_mask & state_values.notna() + if state_mask.any(): + state_frame = pd.DataFrame( + {"state_fips": state_values.loc[state_mask]}, + index=result.index[state_mask], + ) + assigned = block_geography.assign( + state_frame, + random_state=seed + 1, + ) + assigned_blocks.loc[assigned.index] = assigned["block_geoid"].astype("string") + + assigned_mask = assigned_blocks.notna() + if not assigned_mask.any(): + return result + + materialized = block_geography.materialize( + pd.DataFrame( + { + "_row_index": assigned_blocks.index[assigned_mask], + "block_geoid": assigned_blocks.loc[assigned_mask].astype(str), + }, + index=result.index[assigned_mask], + ), + columns=("state_fips", "county_fips", "tract_geoid", "cd_id"), + ) + row_index = materialized["_row_index"].to_numpy() + for column in ("block_geoid", "tract_geoid", "county_fips"): + if column in materialized.columns: + result.loc[row_index, column] = materialized[column].to_numpy() + result.loc[row_index, "state_fips"] = ( + pd.to_numeric(materialized["state_fips"], errors="coerce") + .fillna(0) + .astype(int) + .to_numpy() + ) + result.loc[row_index, "congressional_district_geoid"] = [ + _congressional_district_geoid_from_cd_id(cd_id, state_fips) + for cd_id, state_fips in zip( + materialized.get("cd_id", pd.Series(index=row_index)), + materialized["state_fips"], + strict=False, + ) + ] + return result + + def _root_logger_has_handlers() -> bool: return bool(logging.getLogger().handlers) @@ -2133,6 +2307,10 @@ def prepare_seed_data_from_source( hh["county_fips"] = 0 if "tenure" not in household_coverage or "tenure" not in hh.columns: hh["tenure"] = 0 + hh = _attach_household_census_geographies( + hh, + seed=self.config.random_seed, + ) required_person_defaults = { "age": 0, @@ -2145,13 +2323,31 @@ def prepare_seed_data_from_source( if column not in person_coverage or column not in persons_df.columns: persons_df[column] = default + household_seed_columns = [ + "household_id", + "state_fips", + "county_fips", + "hh_weight", + "tenure", + "block_geoid", + "tract_geoid", + "congressional_district_geoid", + ] seed_data = persons_df.merge( - hh[["household_id", "state_fips", "county_fips", "hh_weight", "tenure"]], + hh[[column for column in household_seed_columns if column in hh.columns]], on="household_id", how="left", suffixes=("", "__household"), ) - for column in ("state_fips", "county_fips", "hh_weight", "tenure"): + for column in ( + "state_fips", + "county_fips", + "hh_weight", + "tenure", + "block_geoid", + "tract_geoid", + "congressional_district_geoid", + ): household_column = f"{column}__household" if household_column not in seed_data.columns: continue @@ -2165,7 +2361,24 @@ def prepare_seed_data_from_source( seed_data["hh_weight"] = seed_data["hh_weight"].fillna(1.0).astype(float) seed_data["tenure"] = seed_data["tenure"].fillna(0).astype(int) seed_data["state_fips"] = seed_data["state_fips"].fillna(0).astype(int) - seed_data["county_fips"] = seed_data["county_fips"].fillna(0).astype(int) + seed_data["county_fips"] = ( + seed_data["county_fips"] + .map(normalize_us_county_fips) + .fillna("00000") + ) + if "block_geoid" in seed_data.columns: + seed_data["block_geoid"] = seed_data["block_geoid"].fillna("").astype(str) + if "tract_geoid" in seed_data.columns: + seed_data["tract_geoid"] = seed_data["tract_geoid"].fillna("").astype(str) + if "congressional_district_geoid" in seed_data.columns: + seed_data["congressional_district_geoid"] = ( + pd.to_numeric( + seed_data["congressional_district_geoid"], + errors="coerce", + ) + .fillna(0) + .astype(int) + ) seed_data["income"] = pd.to_numeric( seed_data["income"], errors="coerce" ).fillna(0.0) @@ -6022,6 +6235,10 @@ def _finalize_synthetic_population( result = synthetic.copy().reset_index(drop=True) for column, default in { "state_fips": 0, + "county_fips": "00000", + "block_geoid": "", + "tract_geoid": "", + "congressional_district_geoid": 0, "tenure": 0, "age": 0, "sex": 0, @@ -6066,6 +6283,9 @@ def _build_policyengine_households(self, persons: pd.DataFrame) -> pd.DataFrame: for column in ( "state_fips", "county_fips", + "block_geoid", + "tract_geoid", + "congressional_district_geoid", "tenure", "tenure_type", "state", diff --git a/src/microplex_us/policyengine/us.py b/src/microplex_us/policyengine/us.py index 5b60808..9623d81 100644 --- a/src/microplex_us/policyengine/us.py +++ b/src/microplex_us/policyengine/us.py @@ -351,6 +351,9 @@ class PolicyEngineUSVariableMaterializationResult: "tenure_type", "state_fips", "county_fips", + "block_geoid", + "tract_geoid", + "congressional_district_geoid", } | set(POLICYENGINE_US_TAKEUP_INPUT_VARIABLES) POLICYENGINE_US_EXPORT_COLUMN_ALIASES: dict[str, str] = { diff --git a/tests/pipelines/test_geoid_cd_encoding.py b/tests/pipelines/test_geoid_cd_encoding.py new file mode 100644 index 0000000..6b919b2 --- /dev/null +++ b/tests/pipelines/test_geoid_cd_encoding.py @@ -0,0 +1,181 @@ +"""Geoid carry-over guards for the CPS block-geography export (PR #129). + +Two regressions are pinned here, both surfaced while reconciling PR #129 +against the closed PR #130: + +1. ``congressional_district_geoid`` encoding must match the eCPS 436-CD + calibration universe, which encodes at-large districts (and DC) as district + ``01`` -- e.g. AK->201, WY->5601, DC->1101. The raw Census block crosswalk + carries at-large as ``00``/``98``; eCPS normalizes those to ``01`` in + ``policyengine-us-data db/create_initial_strata.py`` before writing the + targets DB and the dataset. ``_congressional_district_geoid_from_cd_id`` must + reproduce the ``01`` convention regardless of which input form it is fed. + +2. ``_attach_household_census_geographies`` must not assume a unique household + -frame index. It assigns blocks and writes them back via ``.loc[row_index]``; + a duplicate index previously raised + ``ValueError: cannot reindex on an axis with duplicate labels``. +""" + +import pandas as pd +import pytest + +from microplex_us.pipelines.us import ( + _attach_household_census_geographies, + _congressional_district_geoid_from_cd_id, + _default_block_geography, +) + +# state_fips for the 7 single-seat / at-large jurisdictions in the 119th Congress. +_AT_LARGE_STATES = { + "AK": 2, + "DE": 10, + "DC": 11, + "ND": 38, + "SD": 46, + "VT": 50, + "WY": 56, +} + + +def test_multi_district_encoding_is_ssdd(): + cases = { + ("CA-52", 6): 652, + ("NY-12", 36): 3612, + ("TX-38", 48): 4838, + ("FL-28", 12): 1228, + ("AL-02", 1): 102, + } + for (cd_id, state), expected in cases.items(): + assert _congressional_district_geoid_from_cd_id(cd_id, state) == expected + + +def test_at_large_uses_district_01_via_AL_token(): + # Microplex's crosswalk feeds the "-AL" token; every at-large state and + # DC must encode to district 01 (state*100 + 1), matching the eCPS universe. + for abbr, state in _AT_LARGE_STATES.items(): + assert ( + _congressional_district_geoid_from_cd_id(f"{abbr}-AL", state) + == state * 100 + 1 + ) + + +def test_raw_census_at_large_forms_normalize_to_01(): + # Hardening: even if a raw Census form leaks through (DC as "98", at-large as + # "ZZ" or "00"), the encoder must still produce district 01, never 1198/5600. + assert _congressional_district_geoid_from_cd_id("DC-98", 11) == 1101 + assert _congressional_district_geoid_from_cd_id("WY-ZZ", 56) == 5601 + assert _congressional_district_geoid_from_cd_id("AK-00", 2) == 201 + assert _congressional_district_geoid_from_cd_id("98", 11) == 1101 + + +def test_no_at_large_geoid_ends_in_00_or_98(): + # The invariant that distinguishes the eCPS universe from the raw crosswalk. + for abbr, state in _AT_LARGE_STATES.items(): + for token in (f"{abbr}-AL", f"{abbr}-00", f"{abbr}-98", f"{abbr}-ZZ"): + geoid = _congressional_district_geoid_from_cd_id(token, state) + assert geoid % 100 not in (0, 98), f"{token} -> {geoid}" + + +def test_invalid_inputs_return_zero(): + assert _congressional_district_geoid_from_cd_id("", 6) == 0 + assert _congressional_district_geoid_from_cd_id("nan", 6) == 0 + assert _congressional_district_geoid_from_cd_id("", 6) == 0 + assert _congressional_district_geoid_from_cd_id("CA-12", "not-a-state") == 0 + + +# --- duplicate-index robustness ------------------------------------------------- + + +class _StubAssigner: + def __init__(self, block_geoid: str): + self._block_geoid = block_geoid + + def assign(self, frame: pd.DataFrame, random_state: int = 0) -> pd.DataFrame: + out = frame.copy() + out["block_geoid"] = self._block_geoid + return out + + +class _StubBlockGeography: + """Minimal BlockGeography surface for exercising the assignment write-back. + + Returns a single deterministic CA block so the test focuses on index + handling, not the probabilistic draw. + """ + + _BLOCK = "060371000001000" + + def __init__(self): + self.data = pd.DataFrame({"county_fips": ["06037"]}) + + def load_assigner(self, query) -> _StubAssigner: # noqa: ANN001 - query unused + return _StubAssigner(self._BLOCK) + + def assign(self, frame: pd.DataFrame, random_state: int = 0) -> pd.DataFrame: + out = frame.copy() + out["block_geoid"] = self._BLOCK + return out + + def materialize(self, frame: pd.DataFrame, columns=()) -> pd.DataFrame: + out = frame.copy() + out["state_fips"] = "06" + out["county_fips"] = "06037" + out["tract_geoid"] = "06037100000" + out["cd_id"] = "CA-37" + return out + + +def test_attach_geographies_handles_duplicate_household_index(): + # Duplicate labels [0, 0, 1] previously broke the .loc[row_index] write-back. + households = pd.DataFrame( + { + "household_id": [10, 11, 12], + "state_fips": [6, 6, 6], + "county_fips": [37, 37, 37], # CPS fragment -> 06037 + }, + index=[0, 0, 1], + ) + result = _attach_household_census_geographies( + households, seed=0, geography=_StubBlockGeography() + ) + assert len(result) == 3 + assert (result["block_geoid"] == "060371000001000").all() + assert (result["tract_geoid"] == "06037100000").all() + # CA-37 -> 6 * 100 + 37 + assert (result["congressional_district_geoid"] == 637).all() + # household_id is preserved (consumed downstream via merge on this column). + assert sorted(result["household_id"]) == [10, 11, 12] + + +# --- live universe parity (skips when the crosswalk parquet is unavailable) ----- + + +def test_cd_encoder_reproduces_ecps_436_cd_universe(): + """Run the encoder over the real block crosswalk's distinct (state, cd_id). + + Verified during review to equal the eCPS calibration target universe in + policy_data.db exactly (436 CDs, at-large=01). Skips in environments without + the crosswalk parquet (e.g. CI). + """ + try: + data = _default_block_geography().data + except (FileNotFoundError, OSError): + pytest.skip("block crosswalk parquet not available") + if "cd_id" not in data.columns or "state_fips" not in data.columns: + pytest.skip("crosswalk lacks cd_id/state_fips columns") + + pairs = data[["state_fips", "cd_id"]].dropna().drop_duplicates() + geoids = { + _congressional_district_geoid_from_cd_id(cd_id, state) + for state, cd_id in zip(pairs["state_fips"], pairs["cd_id"], strict=False) + } + geoids.discard(0) + + assert len(geoids) == 436, f"expected 436-CD universe, got {len(geoids)}" + # at-large districts encode to 01, so nothing ends in 00. + assert not any(g % 100 == 0 for g in geoids) + for g in geoids: + state, district = divmod(g, 100) + assert 1 <= state <= 78, f"invalid state in {g}" + assert 1 <= district <= 53, f"invalid district in {g}" diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 5f5b5cd..1721f20 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -25,11 +25,13 @@ from microplex.targets import TargetAggregation, TargetQuery, TargetSpec import microplex_us.pipelines.us as us_pipeline_module +from microplex_us.geography import BlockGeography from microplex_us.pipelines.us import ( USMicroplexBuildConfig, USMicroplexBuildResult, USMicroplexPipeline, USMicroplexTargets, + _attach_household_census_geographies, _policyengine_target_loss_geography_key, _select_feasible_policyengine_calibration_constraints, _select_policyengine_deferred_stage_constraints, @@ -604,6 +606,44 @@ def persons(self): } ) + def test_attach_household_census_geographies_from_state_county(self): + geography = BlockGeography.from_data( + pd.DataFrame( + { + "geoid": ["060010201001000", "360610101001000"], + "state_fips": ["06", "36"], + "county": ["001", "061"], + "county_fips": ["06001", "36061"], + "tract": ["020100", "010100"], + "tract_geoid": ["06001020100", "36061010100"], + "cd_id": ["CA-01", "NY-12"], + "prob": [1.0, 1.0], + } + ) + ) + households = pd.DataFrame( + { + "household_id": [10, 20], + "state_fips": [6, 36], + "county_fips": [1, 61], + }, + index=[100, 200], + ) + + result = _attach_household_census_geographies( + households, + seed=0, + geography=geography, + ).sort_values("household_id") + + assert result["block_geoid"].tolist() == [ + "060010201001000", + "360610101001000", + ] + assert result["county_fips"].tolist() == ["06001", "36061"] + assert result["tract_geoid"].tolist() == ["06001020100", "36061010100"] + assert result["congressional_district_geoid"].tolist() == [601, 3612] + def test_prepare_seed_data(self, persons, households): pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) @@ -612,6 +652,9 @@ def test_prepare_seed_data(self, persons, households): assert len(seed) == len(persons) assert "state" in seed.columns assert "county_fips" in seed.columns + assert "block_geoid" in seed.columns + assert "tract_geoid" in seed.columns + assert "congressional_district_geoid" in seed.columns assert "age_group" in seed.columns assert "income_bracket" in seed.columns assert set(seed["state"]) == {"CA", "NY", "TX"} diff --git a/tests/policyengine/test_us.py b/tests/policyengine/test_us.py index d9565db..72fdf60 100644 --- a/tests/policyengine/test_us.py +++ b/tests/policyengine/test_us.py @@ -2107,7 +2107,11 @@ def __init__(self, entity): household_contract_inputs = ( "auto_loan_balance", "auto_loan_interest", + "block_geoid", + "congressional_district_geoid", + "county_fips", "tenure_type", + "tract_geoid", ) tax_unit_contract_inputs = ( "domestic_production_ald",