diff --git a/src/microplex_us/geography.py b/src/microplex_us/geography.py index 6e89ee2..1f22b5f 100644 --- a/src/microplex_us/geography.py +++ b/src/microplex_us/geography.py @@ -26,6 +26,14 @@ TRACT_GEOID_LEN = STATE_LEN + COUNTY_LEN + TRACT_LEN BLOCK_GEOID_LEN = STATE_LEN + COUNTY_LEN + TRACT_LEN + BLOCK_LEN +# At-large (single-district) states use district number 1 in the integer +# ``congressional_district_geoid`` (SSDD) convention. This matches the +# enhanced-CPS calibration targets exactly: AK->201, DE->1001, DC->1101, +# ND->3801, SD->4601, VT->5001, WY->5601. The block crosswalk encodes these +# districts with the ``"AL"`` suffix (e.g. ``"DC-AL"``). +AT_LARGE_DISTRICT_NUMBER = 1 +AT_LARGE_DISTRICT_SUFFIX = "AL" + PACKAGE_ROOT = Path(__file__).resolve().parents[2] DEFAULT_DATA_DIR_CANDIDATES = ( PACKAGE_ROOT / "data", @@ -106,6 +114,10 @@ "72": "PR", } +US_STATE_FIPS_BY_ABBR: dict[str, int] = { + abbr: int(fips) for fips, abbr in US_STATE_ABBR_BY_FIPS.items() +} + def load_block_probabilities(path: str | Path | None = None) -> pd.DataFrame: """Load US Census block probabilities from parquet.""" @@ -437,6 +449,257 @@ def derive_geographies( return result +def cd_id_to_congressional_district_geoid(cd_id: Any) -> int | None: + """Convert a block-crosswalk ``cd_id`` to the PE-US integer GEOID. + + The block probabilities crosswalk stores congressional districts as + ``"-"`` strings, e.g. ``"CA-01"`` or the at-large + sentinel ``"DC-AL"``. PolicyEngine-US (and the enhanced-CPS calibration + targets) store ``congressional_district_geoid`` as an ``SSDD`` integer: + ``state_fips * 100 + district_number``. + + At-large (single-district) states are encoded as district ``1`` to match the + enhanced-CPS calibration-target universe exactly (e.g. ``"AK-AL"`` -> 201, + ``"DC-AL"`` -> 1101, ``"WY-AL"`` -> 5601). Returns ``None`` for missing or + unparseable values rather than fabricating a geoid. + """ + if cd_id is None: + return None + try: + if pd.isna(cd_id): + return None + except (TypeError, ValueError): + pass + text = str(cd_id).strip() + if "-" not in text: + return None + abbr, district = text.rsplit("-", 1) + state_fips = US_STATE_FIPS_BY_ABBR.get(abbr.strip().upper()) + if state_fips is None: + return None + district = district.strip().upper() + if district == AT_LARGE_DISTRICT_SUFFIX: + district_number = AT_LARGE_DISTRICT_NUMBER + elif district.isdigit(): + district_number = int(district) + else: + return None + return state_fips * 100 + district_number + + +def cd_id_series_to_congressional_district_geoid(values: pd.Series) -> pd.Series: + """Vectorized :func:`cd_id_to_congressional_district_geoid` over a Series. + + Returns a nullable ``Int64`` Series so unmapped districts surface as ```` + rather than silently becoming a real geoid. + """ + mapping = { + cd_id: cd_id_to_congressional_district_geoid(cd_id) + for cd_id in pd.Series(values).dropna().astype(str).unique() + } + return pd.Series(values).astype("string").map(mapping).astype("Int64") + + +def assign_household_block_geography( + households: pd.DataFrame, + *, + block_geography: BlockGeography | None = None, + block_data: pd.DataFrame | None = None, + state_column: str = "state_fips", + county_column: str = "county_fips", + cd_geoid_column: str = "congressional_district_geoid", + random_state: int | None = None, +) -> pd.DataFrame: + """Attach Census block geography (``block_geoid``/``tract_geoid``/CD) to households. + + Each household is assigned a real 15-digit Census ``block_geoid`` via a + population-weighted draw from the block-probabilities crosswalk, mirroring the + enhanced-CPS block-assignment method. ``tract_geoid`` is then the true + 11-character prefix of ``block_geoid`` and ``congressional_district_geoid`` is + looked up from the block's crosswalk ``cd_id``. + + The block draw is partitioned using the most specific geography available for + each household, in priority order: + + 1. ``county_fips`` when present and matching a crosswalk county (the CPS source + county is the finest real signal), + 2. then an existing ``congressional_district_geoid`` (eCPS-style, preserves a + known CD), + 3. then ``state_fips``. + + Households whose partition has no matching blocks keep empty/NA geography rather + than receiving a fabricated geoid. + + Returns a copy of ``households`` with ``block_geoid`` (str), ``tract_geoid`` + (str), and ``congressional_district_geoid`` (Int64) columns added or updated. + Existing ``state_fips``/``county_fips`` columns are left untouched. + """ + if state_column not in households.columns: + raise ValueError( + f"Household table must contain '{state_column}' to assign block geography" + ) + + geography = block_geography + if geography is None: + geography = ( + BlockGeography.from_data(block_data) + if block_data is not None + else BlockGeography(lazy_load=False) + ) + + crosswalk = geography.data + cd_id_by_geoid = dict(zip(crosswalk["geoid"].astype(str), crosswalk["cd_id"])) + + result = households.copy() + n = len(result) + block_geoids = pd.Series([pd.NA] * n, index=result.index, dtype="string") + unresolved = pd.Series([True] * n, index=result.index) + rng_seed = random_state + + # --- Partition 1: county_fips (most specific CPS-source geography) --------- + if county_column in result.columns: + county_values = result[county_column].map(normalize_us_county_fips) + county_blocks = _county_block_index(crosswalk) + if county_blocks: + resolvable = ( + unresolved + & county_values.notna() + & county_values.isin(county_blocks.keys()) + ) + for county_fips, group in result.loc[resolvable].groupby( + county_values.loc[resolvable] + ): + frame = county_blocks.get(str(county_fips)) + if frame is None or frame.empty: + continue + block_geoids.loc[group.index] = _weighted_block_sample( + frame, len(group), rng_seed + ) + unresolved.loc[group.index] = False + + # --- Partition 2: existing congressional_district_geoid (eCPS-style) ------- + if unresolved.any() and cd_geoid_column in result.columns: + existing_cd = pd.to_numeric(result[cd_geoid_column], errors="coerce") + cd_blocks = _cd_geoid_block_index(crosswalk) + if cd_blocks: + resolvable = unresolved & existing_cd.notna() + for cd_geoid, group in result.loc[resolvable].groupby( + existing_cd.loc[resolvable].astype(int) + ): + frame = cd_blocks.get(int(cd_geoid)) + if frame is None or frame.empty: + continue + block_geoids.loc[group.index] = _weighted_block_sample( + frame, len(group), rng_seed + ) + unresolved.loc[group.index] = False + + # --- Partition 3: state_fips (always-available fallback) ------------------- + if unresolved.any(): + state_values = _normalize_us_state_fips_series(result[state_column]) + state_blocks = _state_block_index(crosswalk) + resolvable = unresolved & state_values.notna() + for state_fips, group in result.loc[resolvable].groupby( + state_values.loc[resolvable] + ): + frame = state_blocks.get(str(state_fips)) + if frame is None or frame.empty: + continue + block_geoids.loc[group.index] = _weighted_block_sample( + frame, len(group), rng_seed + ) + unresolved.loc[group.index] = False + + result["block_geoid"] = block_geoids.fillna("").astype("string") + tract = block_geoids.str[:TRACT_GEOID_LEN] + result["tract_geoid"] = tract.fillna("").astype("string") + cd_id_series = block_geoids.map(cd_id_by_geoid) + # Unresolved households fall back to the policyengine-us defaults: empty + # block/tract strings and CD geoid 0 (PE-US ``congressional_district_geoid`` + # default_value), so the export stays a clean integer array. In practice the + # state-partition fallback resolves every household with a valid state FIPS. + cd_geoids = cd_id_series_to_congressional_district_geoid(cd_id_series) + result[cd_geoid_column] = cd_geoids.fillna(0).astype("int64") + return result + + +def _block_sample_columns(crosswalk: pd.DataFrame) -> list[str]: + return [ + column + for column in ("geoid", "prob", "population", "national_prob") + if column in crosswalk.columns + ] + + +def _county_block_index(crosswalk: pd.DataFrame) -> dict[str, pd.DataFrame]: + if "county_fips" in crosswalk.columns: + county = crosswalk["county_fips"].astype(str).str.zfill(COUNTY_GEOID_LEN) + elif {"state_fips", "county"}.issubset(crosswalk.columns): + county = crosswalk["state_fips"].astype(str).str.zfill(STATE_GEOID_LEN) + ( + crosswalk["county"].astype(str).str.zfill(COUNTY_LEN) + ) + else: + return {} + columns = _block_sample_columns(crosswalk) + return { + str(county_fips): group[columns].copy() + for county_fips, group in crosswalk.groupby(county, sort=False) + } + + +def _state_block_index(crosswalk: pd.DataFrame) -> dict[str, pd.DataFrame]: + state = crosswalk["state_fips"].astype(str).str.zfill(STATE_GEOID_LEN) + columns = _block_sample_columns(crosswalk) + return { + str(state_fips): group[columns].copy() + for state_fips, group in crosswalk.groupby(state, sort=False) + } + + +def _cd_geoid_block_index(crosswalk: pd.DataFrame) -> dict[int, pd.DataFrame]: + if "cd_id" not in crosswalk.columns: + return {} + columns = _block_sample_columns(crosswalk) + frames: dict[int, list[pd.DataFrame]] = {} + for cd_id, group in crosswalk.groupby("cd_id", sort=False): + geoid = cd_id_to_congressional_district_geoid(cd_id) + if geoid is None: + continue + frames.setdefault(int(geoid), []).append(group[columns]) + return { + geoid: pd.concat(parts, ignore_index=True) for geoid, parts in frames.items() + } + + +def _weighted_block_sample( + frame: pd.DataFrame, + n: int, + random_state: int | None, +) -> np.ndarray: + """Draw ``n`` block GEOIDs weighted by block population/probability.""" + geoids = frame["geoid"].astype(str).to_numpy() + weights = _block_sampling_weights(frame) + rng = np.random.default_rng(random_state) + indices = rng.choice(len(geoids), size=n, replace=True, p=weights) + return geoids[indices] + + +def _block_sampling_weights(frame: pd.DataFrame) -> np.ndarray: + if "prob" in frame.columns: + weights = pd.to_numeric(frame["prob"], errors="coerce").to_numpy(dtype=float) + elif "population" in frame.columns: + weights = pd.to_numeric( + frame["population"], errors="coerce" + ).to_numpy(dtype=float) + else: + weights = np.ones(len(frame), dtype=float) + weights = np.where(np.isfinite(weights) & (weights > 0), weights, 0.0) + total = weights.sum() + if total <= 0: + return np.full(len(frame), 1.0 / len(frame)) + return weights / total + + class BlockGeography(GeographyProvider): """US atomic-geography provider backed by Census blocks.""" @@ -759,5 +1022,12 @@ def __repr__(self) -> str: "state_nonmetro_spm_area_code", "add_spm_metro_area_geography", "derive_geographies", + "cd_id_to_congressional_district_geoid", + "cd_id_series_to_congressional_district_geoid", + "assign_household_block_geography", + "AT_LARGE_DISTRICT_NUMBER", + "AT_LARGE_DISTRICT_SUFFIX", + "US_STATE_ABBR_BY_FIPS", + "US_STATE_FIPS_BY_ABBR", "BlockGeography", ] diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 201f5cf..5594086 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -1600,6 +1600,18 @@ class USMicroplexBuildConfig: """Normalized Forbes fixed-spine records to append after calibration.""" forbes_fixed_spine_snapshot_id: str = "forbes-us-top-tail" forbes_fixed_spine_replicates_per_unit: int = 10 + policyengine_assign_block_geography: bool = True + """Assign Census block geography to exported households. + + When enabled (default), the PolicyEngine entity-table build draws a real + 15-digit ``block_geoid`` for each household from the population-weighted + block-probabilities crosswalk, then derives ``tract_geoid`` (the 11-char + block prefix) and the integer ``congressional_district_geoid``. This mirrors + the enhanced-CPS block-assignment method so the export carries the same three + GEOID leaves. Silently skipped when the crosswalk data is unavailable. + """ + block_probabilities_path: str | Path | None = None + """Optional override path to the block-probabilities crosswalk parquet.""" def __post_init__(self) -> None: if ( @@ -6082,8 +6094,67 @@ def _build_policyengine_households(self, persons: pd.DataFrame) -> pd.DataFrame: .agg(aggregations) .rename(columns={"weight": "household_weight"}) ) + households = self._assign_household_block_geography(households) return households + def _assign_household_block_geography( + self, + households: pd.DataFrame, + ) -> pd.DataFrame: + """Attach block_geoid/tract_geoid/congressional_district_geoid to households. + + Draws a real Census block per household from the population-weighted + crosswalk (partitioned by the household's CPS county when disclosed, then by + an existing congressional district, then by state), derives + ``tract_geoid = block_geoid[:11]`` and the integer + ``congressional_district_geoid``. Mirrors the enhanced-CPS block assignment so + the export carries the same GEOID leaves. No-op when disabled or when + ``state_fips`` / the crosswalk data is unavailable — geoids are never + fabricated. + """ + if not self.config.policyengine_assign_block_geography: + return households + if "state_fips" not in households.columns or households.empty: + return households + try: + from microplex_us.geography import ( + BLOCK_GEOID_LEN, + BlockGeography, + assign_household_block_geography, + default_runtime_block_probabilities_path, + ) + except ImportError: + return households + + configured_path = self.config.block_probabilities_path + crosswalk_path = ( + Path(configured_path) + if configured_path is not None + else default_runtime_block_probabilities_path() + ) + if crosswalk_path is None or not Path(crosswalk_path).exists(): + _emit_us_pipeline_progress( + "block_geography_skipped", + reason="crosswalk_unavailable", + path=str(crosswalk_path) if crosswalk_path is not None else None, + ) + return households + + geography = BlockGeography(crosswalk_path, lazy_load=False) + assigned = assign_household_block_geography( + households, + block_geography=geography, + random_state=self.config.random_seed, + ) + _emit_us_pipeline_progress( + "block_geography_assigned", + households=int(len(assigned)), + assigned_blocks=int( + assigned["block_geoid"].astype(str).str.len().eq(BLOCK_GEOID_LEN).sum() + ), + ) + return assigned + def _build_policyengine_tax_units( self, persons: pd.DataFrame, diff --git a/src/microplex_us/policyengine/us.py b/src/microplex_us/policyengine/us.py index 5b60808..6672710 100644 --- a/src/microplex_us/policyengine/us.py +++ b/src/microplex_us/policyengine/us.py @@ -351,6 +351,14 @@ class PolicyEngineUSVariableMaterializationResult: "tenure_type", "state_fips", "county_fips", + # Census block geography GEOID leaves, mirroring the enhanced-CPS export. + # All three are storable INPUT variables in policyengine-us (no formula): + # block_geoid/tract_geoid are strings, congressional_district_geoid is the + # integer SSDD GEOID. tract_geoid is exported as the true 11-char prefix of + # block_geoid; congressional_district_geoid is derived from the block's CD. + "block_geoid", + "tract_geoid", + "congressional_district_geoid", } | set(POLICYENGINE_US_TAKEUP_INPUT_VARIABLES) POLICYENGINE_US_EXPORT_COLUMN_ALIASES: dict[str, str] = { diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 5f5b5cd..3d074e3 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -3,6 +3,7 @@ import json import logging import sqlite3 +from pathlib import Path from types import SimpleNamespace import h5py @@ -807,6 +808,67 @@ def test_build_policyengine_entity_tables(self, persons, households): {"SINGLE", "JOINT", "SEPARATE", "HEAD_OF_HOUSEHOLD", "SURVIVING_SPOUSE"} ) + def test_build_policyengine_entity_tables_assigns_block_geography(self): + from microplex_us.geography import default_runtime_block_probabilities_path + + crosswalk_path = default_runtime_block_probabilities_path() + if crosswalk_path is None or not Path(crosswalk_path).exists(): + pytest.skip("Block probabilities crosswalk not available") + + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + population = pd.DataFrame( + { + "person_id": [1, 2, 3, 4], + "household_id": [10, 10, 20, 30], + "weight": [1.0, 1.0, 2.0, 3.0], + "age": [45, 12, 70, 33], + "income": [60_000.0, 0.0, 25_000.0, 40_000.0], + "relationship_to_head": [0, 2, 0, 0], + "state_fips": [6, 6, 36, 17], + "county_fips": ["06037", "06037", "36061", "17031"], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + households = tables.households.sort_values("household_id").reset_index( + drop=True + ) + + for leaf in ("block_geoid", "tract_geoid", "congressional_district_geoid"): + assert leaf in households.columns + + block = households["block_geoid"].astype(str) + tract = households["tract_geoid"].astype(str) + # Real 15-digit blocks; tract is the true 11-char prefix. + assert (block.str.len() == 15).all() + assert (tract == block.str[:11]).all() + # County-partitioned draw lands each household in its CPS county. + assert (block.str[:5] == households["county_fips"].astype(str)).all() + # CD GEOID resolves and is consistent with the household state. + cd = households["congressional_district_geoid"] + assert cd.notna().all() + assert ((cd // 100).astype(int) == households["state_fips"].astype(int)).all() + + def test_build_policyengine_entity_tables_can_disable_block_geography(self): + config = USMicroplexBuildConfig(policyengine_assign_block_geography=False) + pipeline = USMicroplexPipeline(config) + population = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 20], + "weight": [1.0, 2.0], + "age": [45, 70], + "income": [60_000.0, 25_000.0], + "relationship_to_head": [0, 0], + "state_fips": [6, 36], + "county_fips": ["06037", "36061"], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + assert "block_geoid" not in tables.households.columns + assert "tract_geoid" not in tables.households.columns + def test_build_policyengine_entity_tables_preserves_household_contract_inputs( self, ): diff --git a/tests/policyengine/test_geoid_export.py b/tests/policyengine/test_geoid_export.py new file mode 100644 index 0000000..15f6d5e --- /dev/null +++ b/tests/policyengine/test_geoid_export.py @@ -0,0 +1,416 @@ +"""Tests for Census block GEOID export wiring (G5). + +Covers the three GEOID leaves the enhanced-CPS export carries: +``block_geoid``, ``tract_geoid``, and ``congressional_district_geoid``. + +The contract being verified end-to-end: + +* ``block_geoid`` is a real 15-digit Census block GEOID drawn from the + population-weighted crosswalk (never fabricated). +* ``tract_geoid == block_geoid[:11]`` exactly. +* ``congressional_district_geoid`` is the integer ``SSDD`` GEOID derived from + the block's crosswalk ``cd_id`` (at-large states use district ``1``, matching + the enhanced-CPS calibration-target universe). +* All three are members of ``SAFE_POLICYENGINE_US_EXPORT_VARIABLES`` and survive + the PolicyEngine-US export map / forbidden-column guard. +""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from microplex_us.geography import ( + AT_LARGE_DISTRICT_NUMBER, + BLOCK_GEOID_LEN, + TRACT_GEOID_LEN, + US_STATE_ABBR_BY_FIPS, + BlockGeography, + assign_household_block_geography, + cd_id_series_to_congressional_district_geoid, + cd_id_to_congressional_district_geoid, + default_runtime_block_probabilities_path, +) +from microplex_us.policyengine.us import ( + SAFE_POLICYENGINE_US_EXPORT_VARIABLES, + PolicyEngineUSEntityTableBundle, + build_policyengine_us_export_variable_maps, + build_policyengine_us_time_period_arrays, + resolve_policyengine_excluded_export_variables, + write_policyengine_us_time_period_dataset, +) + +GEOID_LEAVES = ("block_geoid", "tract_geoid", "congressional_district_geoid") + + +def _runtime_crosswalk_path() -> Path | None: + return default_runtime_block_probabilities_path() + + +def _require_crosswalk() -> BlockGeography: + path = _runtime_crosswalk_path() + if path is None or not Path(path).exists(): + pytest.skip("Block probabilities crosswalk not available") + return BlockGeography(path, lazy_load=False) + + +# --------------------------------------------------------------------------- +# congressional_district_geoid integer conversion +# --------------------------------------------------------------------------- + + +class TestCDIdConversion: + def test_standard_districts_use_ssdd(self) -> None: + # state_fips * 100 + district, matching the enhanced-CPS targets. + assert cd_id_to_congressional_district_geoid("CA-01") == 601 + assert cd_id_to_congressional_district_geoid("CA-52") == 652 + assert cd_id_to_congressional_district_geoid("NC-01") == 3701 + assert cd_id_to_congressional_district_geoid("AL-02") == 102 + assert cd_id_to_congressional_district_geoid("NY-12") == 3612 + + def test_at_large_states_use_district_one(self) -> None: + # At-large encoding matches the enhanced-CPS calibration targets exactly. + assert cd_id_to_congressional_district_geoid("AK-AL") == 201 + assert cd_id_to_congressional_district_geoid("DE-AL") == 1001 + assert cd_id_to_congressional_district_geoid("DC-AL") == 1101 + assert cd_id_to_congressional_district_geoid("ND-AL") == 3801 + assert cd_id_to_congressional_district_geoid("SD-AL") == 4601 + assert cd_id_to_congressional_district_geoid("VT-AL") == 5001 + assert cd_id_to_congressional_district_geoid("WY-AL") == 5601 + assert AT_LARGE_DISTRICT_NUMBER == 1 + + def test_unparseable_returns_none(self) -> None: + assert cd_id_to_congressional_district_geoid(None) is None + assert cd_id_to_congressional_district_geoid(float("nan")) is None + assert cd_id_to_congressional_district_geoid("") is None + assert cd_id_to_congressional_district_geoid("ZZ-01") is None + assert cd_id_to_congressional_district_geoid("06013") is None + + def test_state_component_recovers_fips(self) -> None: + # geoid // 100 must recover the state FIPS for every state. + for fips, abbr in US_STATE_ABBR_BY_FIPS.items(): + geoid = cd_id_to_congressional_district_geoid(f"{abbr}-01") + assert geoid is not None + assert geoid // 100 == int(fips) + + def test_series_conversion_is_nullable_int(self) -> None: + series = pd.Series(["CA-01", "DC-AL", None, "ZZ-99"]) + result = cd_id_series_to_congressional_district_geoid(series) + assert str(result.dtype) == "Int64" + assert result.tolist()[:2] == [601, 1101] + assert pd.isna(result.iloc[2]) + assert pd.isna(result.iloc[3]) + + +# --------------------------------------------------------------------------- +# Household block-geography assignment (real crosswalk) +# --------------------------------------------------------------------------- + + +class TestHouseholdBlockAssignment: + def test_state_partitioned_draw_produces_valid_geoids(self) -> None: + geography = _require_crosswalk() + households = pd.DataFrame( + { + "household_id": [1, 2, 3, 4, 5, 6], + "state_fips": [6, 36, 48, 1, 11, 56], + } + ) + out = assign_household_block_geography( + households, block_geography=geography, random_state=42 + ) + + for leaf in GEOID_LEAVES: + assert leaf in out.columns + + block = out["block_geoid"].astype(str) + tract = out["tract_geoid"].astype(str) + cd = out["congressional_district_geoid"] + + # block_geoid: real 15-digit GEOIDs. + assert (block.str.len() == BLOCK_GEOID_LEN).all() + assert block.str.isdigit().all() + + # tract_geoid is the TRUE 11-char prefix of block_geoid. + assert (tract == block.str[:TRACT_GEOID_LEN]).all() + assert (tract.str.len() == TRACT_GEOID_LEN).all() + + # block state prefix matches the requested household state. + assert (block.str[:2].astype(int) == out["state_fips"].astype(int)).all() + + # congressional_district_geoid resolves for every household and is + # consistent with the household state. + assert cd.notna().all() + # assign_household_block_geography fills unresolved CDs with 0 and exports + # a plain int64 column (matching PE-US's integer congressional_district_geoid). + assert cd.dtype.kind in {"i", "u"} + assert ((cd // 100).astype(int) == out["state_fips"].astype(int)).all() + # 3- or 4-digit SSDD values only. + assert cd.astype(int).astype(str).str.len().isin([3, 4]).all() + + def test_at_large_state_resolves_to_district_one(self) -> None: + geography = _require_crosswalk() + households = pd.DataFrame({"household_id": [1], "state_fips": [11]}) # DC + out = assign_household_block_geography( + households, block_geography=geography, random_state=0 + ) + assert int(out["congressional_district_geoid"].iloc[0]) == 1101 + + def test_county_partitioned_draw_lands_in_county(self) -> None: + geography = _require_crosswalk() + # Disclosed CPS counties: LA County, NY County (Manhattan), Cook County. + households = pd.DataFrame( + { + "household_id": [1, 2, 3], + "state_fips": [6, 36, 17], + "county_fips": ["06037", "36061", "17031"], + } + ) + out = assign_household_block_geography( + households, block_geography=geography, random_state=42 + ) + block = out["block_geoid"].astype(str) + tract = out["tract_geoid"].astype(str) + # Block's 5-char county prefix matches the household's CPS county. + assert (block.str[:5] == out["county_fips"]).all() + assert (tract == block.str[:TRACT_GEOID_LEN]).all() + assert out["congressional_district_geoid"].notna().all() + + def test_suppressed_county_falls_back_to_state(self) -> None: + geography = _require_crosswalk() + # CPS suppresses county for confidentiality in many records (GTCO == 0). + households = pd.DataFrame( + { + "household_id": [1, 2], + "state_fips": [6, 48], + "county_fips": ["00000", "0"], + } + ) + out = assign_household_block_geography( + households, block_geography=geography, random_state=1 + ) + block = out["block_geoid"].astype(str) + assert (block.str.len() == BLOCK_GEOID_LEN).all() + # Still lands in the correct state via the state-partition fallback. + assert (block.str[:2].astype(int) == out["state_fips"].astype(int)).all() + + def test_cd_partitioned_draw_preserves_existing_cd(self) -> None: + geography = _require_crosswalk() + households = pd.DataFrame( + { + "household_id": [10, 11, 12], + "state_fips": [6, 6, 37], + "congressional_district_geoid": [649, 612, 3701], + } + ) + out = assign_household_block_geography( + households, block_geography=geography, random_state=7 + ) + # CD preserved exactly; block drawn from WITHIN that CD. + assert out["congressional_district_geoid"].tolist() == [649, 612, 3701] + block = out["block_geoid"].astype(str) + tract = out["tract_geoid"].astype(str) + assert (block.str.len() == BLOCK_GEOID_LEN).all() + assert (tract == block.str[:TRACT_GEOID_LEN]).all() + + def test_reproducible_with_seed(self) -> None: + geography = _require_crosswalk() + households = pd.DataFrame( + {"household_id": list(range(20)), "state_fips": [6] * 20} + ) + first = assign_household_block_geography( + households, block_geography=geography, random_state=123 + ) + second = assign_household_block_geography( + households, block_geography=geography, random_state=123 + ) + pd.testing.assert_series_equal(first["block_geoid"], second["block_geoid"]) + + def test_does_not_clobber_state_or_county(self) -> None: + geography = _require_crosswalk() + households = pd.DataFrame( + { + "household_id": [1, 2], + "state_fips": [6, 36], + "county_fips": ["06037", "36061"], + } + ) + out = assign_household_block_geography( + households, block_geography=geography, random_state=1 + ) + assert out["state_fips"].tolist() == [6, 36] + assert out["county_fips"].tolist() == ["06037", "36061"] + + def test_missing_state_column_raises(self) -> None: + geography = _require_crosswalk() + with pytest.raises(ValueError, match="state_fips"): + assign_household_block_geography( + pd.DataFrame({"household_id": [1]}), block_geography=geography + ) + + +# --------------------------------------------------------------------------- +# Allowlist membership + export wiring (no PolicyEngine-US dependency) +# --------------------------------------------------------------------------- + + +class _FakeEntity: + def __init__(self, key: str) -> None: + self.key = key + + +class _FakeVariable: + def __init__(self, entity: _FakeEntity, formulas: dict | None = None) -> None: + self.entity = entity + self.formulas = formulas or {} + + +class _FakeSystem: + """Mimics the storable-INPUT geo variables in policyengine-us.""" + + variables = { + "employment_income": _FakeVariable(_FakeEntity("person")), + "state_fips": _FakeVariable(_FakeEntity("household")), + "county_fips": _FakeVariable(_FakeEntity("household")), + "block_geoid": _FakeVariable(_FakeEntity("household")), + "tract_geoid": _FakeVariable(_FakeEntity("household")), + "congressional_district_geoid": _FakeVariable(_FakeEntity("household")), + } + + +class TestGeoidExportWiring: + def test_all_three_leaves_in_allowlist(self) -> None: + for leaf in GEOID_LEAVES: + assert leaf in SAFE_POLICYENGINE_US_EXPORT_VARIABLES + + def _tables(self) -> PolicyEngineUSEntityTableBundle: + households = pd.DataFrame( + { + "household_id": [0, 1, 2], + "household_weight": [100.0, 200.0, 300.0], + "state_fips": [6, 36, 11], + "county_fips": ["06037", "36061", "11001"], + "block_geoid": [ + "060372073021001", + "360610001001000", + "110010091022000", + ], + "tract_geoid": ["06037207302", "36061000100", "11001009102"], + "congressional_district_geoid": [630, 3612, 1101], + } + ) + persons = pd.DataFrame( + { + "person_id": [0, 1, 2, 3], + "household_id": [0, 0, 1, 2], + "age": [40, 10, 30, 55], + "is_household_head": [True, False, True, True], + } + ) + return PolicyEngineUSEntityTableBundle(households=households, persons=persons) + + def test_export_map_includes_geoids(self) -> None: + tables = self._tables() + export_maps = build_policyengine_us_export_variable_maps( + tables, tax_benefit_system=_FakeSystem() + ) + for leaf in GEOID_LEAVES: + assert export_maps["household"].get(leaf) == leaf + + def test_geoids_not_excluded_by_guard(self) -> None: + tables = self._tables() + export_maps = build_policyengine_us_export_variable_maps( + tables, tax_benefit_system=_FakeSystem() + ) + excluded = resolve_policyengine_excluded_export_variables( + _FakeSystem(), + sorted({t for m in export_maps.values() for t in m.values()}), + ) + for leaf in GEOID_LEAVES: + assert leaf not in excluded + + def test_written_h5_carries_geoids_with_tract_prefix(self, tmp_path: Path) -> None: + h5py = pytest.importorskip("h5py") + tables = self._tables() + export_maps = build_policyengine_us_export_variable_maps( + tables, tax_benefit_system=_FakeSystem() + ) + excluded = resolve_policyengine_excluded_export_variables( + _FakeSystem(), + sorted({t for m in export_maps.values() for t in m.values()}), + ) + arrays = build_policyengine_us_time_period_arrays( + tables, + period=2024, + household_variable_map=export_maps["household"], + person_variable_map=export_maps["person"], + tax_unit_variable_map=export_maps["tax_unit"], + spm_unit_variable_map=export_maps["spm_unit"], + family_variable_map=export_maps["family"], + ) + out = tmp_path / "pe.h5" + write_policyengine_us_time_period_dataset( + arrays, out, excluded_variables=excluded + ) + with h5py.File(out, "r") as handle: + for leaf in GEOID_LEAVES: + assert leaf in handle, f"{leaf} missing from H5" + block = [ + value.decode() if isinstance(value, bytes) else value + for value in np.asarray(handle["block_geoid"]["2024"]).tolist() + ] + tract = [ + value.decode() if isinstance(value, bytes) else value + for value in np.asarray(handle["tract_geoid"]["2024"]).tolist() + ] + cd = np.asarray(handle["congressional_district_geoid"]["2024"]) + + assert all(len(b) == BLOCK_GEOID_LEN for b in block) + assert all(t == b[:TRACT_GEOID_LEN] for t, b in zip(tract, block)) + assert cd.dtype.kind in {"i", "u"} + assert cd.tolist() == [630, 3612, 1101] + + +# --------------------------------------------------------------------------- +# Cross-check against the enhanced-CPS calibration-target CD universe +# --------------------------------------------------------------------------- + + +def _ecps_targets_db() -> Path | None: + candidate = Path( + "/Users/maxghenis/PolicyEngine/policyengine-us-data/" + "policyengine_us_data/storage/calibration/policy_data.db" + ) + return candidate if candidate.exists() else None + + +class TestEnhancedCPSParity: + def test_cd_universe_matches_enhanced_cps_targets(self) -> None: + db_path = _ecps_targets_db() + if db_path is None: + pytest.skip("Enhanced-CPS calibration-target DB not available") + geography = _require_crosswalk() + + connection = sqlite3.connect(db_path) + try: + rows = connection.execute( + "SELECT DISTINCT value FROM stratum_constraints " + "WHERE constraint_variable = 'congressional_district_geoid'" + ).fetchall() + finally: + connection.close() + target_cds = {int(row[0]) for row in rows} + + crosswalk_cds = { + cd_id_to_congressional_district_geoid(cd_id) + for cd_id in geography.data["cd_id"].dropna().unique() + } + crosswalk_cds.discard(None) + + # Every block-crosswalk CD maps to a real enhanced-CPS target CD and the + # universes coincide exactly (at-large encoding agrees). + assert crosswalk_cds == target_cds