Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 270 additions & 0 deletions src/microplex_us/geography.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
TRACT_GEOID_LEN = STATE_LEN + COUNTY_LEN + TRACT_LEN
BLOCK_GEOID_LEN = STATE_LEN + COUNTY_LEN + TRACT_LEN + BLOCK_LEN

# At-large (single-district) states use district number 1 in the integer
# ``congressional_district_geoid`` (SSDD) convention. This matches the
# enhanced-CPS calibration targets exactly: AK->201, DE->1001, DC->1101,
# ND->3801, SD->4601, VT->5001, WY->5601. The block crosswalk encodes these
# districts with the ``"AL"`` suffix (e.g. ``"DC-AL"``).
AT_LARGE_DISTRICT_NUMBER = 1
AT_LARGE_DISTRICT_SUFFIX = "AL"

PACKAGE_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_DATA_DIR_CANDIDATES = (
PACKAGE_ROOT / "data",
Expand Down Expand Up @@ -106,6 +114,10 @@
"72": "PR",
}

US_STATE_FIPS_BY_ABBR: dict[str, int] = {
abbr: int(fips) for fips, abbr in US_STATE_ABBR_BY_FIPS.items()
}


def load_block_probabilities(path: str | Path | None = None) -> pd.DataFrame:
"""Load US Census block probabilities from parquet."""
Expand Down Expand Up @@ -437,6 +449,257 @@ def derive_geographies(
return result


def cd_id_to_congressional_district_geoid(cd_id: Any) -> int | None:
"""Convert a block-crosswalk ``cd_id`` to the PE-US integer GEOID.

The block probabilities crosswalk stores congressional districts as
``"<state_abbr>-<district>"`` strings, e.g. ``"CA-01"`` or the at-large
sentinel ``"DC-AL"``. PolicyEngine-US (and the enhanced-CPS calibration
targets) store ``congressional_district_geoid`` as an ``SSDD`` integer:
``state_fips * 100 + district_number``.

At-large (single-district) states are encoded as district ``1`` to match the
enhanced-CPS calibration-target universe exactly (e.g. ``"AK-AL"`` -> 201,
``"DC-AL"`` -> 1101, ``"WY-AL"`` -> 5601). Returns ``None`` for missing or
unparseable values rather than fabricating a geoid.
"""
if cd_id is None:
return None
try:
if pd.isna(cd_id):
return None
except (TypeError, ValueError):
pass
text = str(cd_id).strip()
if "-" not in text:
return None
abbr, district = text.rsplit("-", 1)
state_fips = US_STATE_FIPS_BY_ABBR.get(abbr.strip().upper())
if state_fips is None:
return None
district = district.strip().upper()
if district == AT_LARGE_DISTRICT_SUFFIX:
district_number = AT_LARGE_DISTRICT_NUMBER
elif district.isdigit():
district_number = int(district)
else:
return None
return state_fips * 100 + district_number


def cd_id_series_to_congressional_district_geoid(values: pd.Series) -> pd.Series:
"""Vectorized :func:`cd_id_to_congressional_district_geoid` over a Series.

Returns a nullable ``Int64`` Series so unmapped districts surface as ``<NA>``
rather than silently becoming a real geoid.
"""
mapping = {
cd_id: cd_id_to_congressional_district_geoid(cd_id)
for cd_id in pd.Series(values).dropna().astype(str).unique()
}
return pd.Series(values).astype("string").map(mapping).astype("Int64")


def assign_household_block_geography(
households: pd.DataFrame,
*,
block_geography: BlockGeography | None = None,
block_data: pd.DataFrame | None = None,
state_column: str = "state_fips",
county_column: str = "county_fips",
cd_geoid_column: str = "congressional_district_geoid",
random_state: int | None = None,
) -> pd.DataFrame:
"""Attach Census block geography (``block_geoid``/``tract_geoid``/CD) to households.

Each household is assigned a real 15-digit Census ``block_geoid`` via a
population-weighted draw from the block-probabilities crosswalk, mirroring the
enhanced-CPS block-assignment method. ``tract_geoid`` is then the true
11-character prefix of ``block_geoid`` and ``congressional_district_geoid`` is
looked up from the block's crosswalk ``cd_id``.

The block draw is partitioned using the most specific geography available for
each household, in priority order:

1. ``county_fips`` when present and matching a crosswalk county (the CPS source
county is the finest real signal),
2. then an existing ``congressional_district_geoid`` (eCPS-style, preserves a
known CD),
3. then ``state_fips``.

Households whose partition has no matching blocks keep empty/NA geography rather
than receiving a fabricated geoid.

Returns a copy of ``households`` with ``block_geoid`` (str), ``tract_geoid``
(str), and ``congressional_district_geoid`` (Int64) columns added or updated.
Existing ``state_fips``/``county_fips`` columns are left untouched.
"""
if state_column not in households.columns:
raise ValueError(
f"Household table must contain '{state_column}' to assign block geography"
)

geography = block_geography
if geography is None:
geography = (
BlockGeography.from_data(block_data)
if block_data is not None
else BlockGeography(lazy_load=False)
)

crosswalk = geography.data
cd_id_by_geoid = dict(zip(crosswalk["geoid"].astype(str), crosswalk["cd_id"]))

result = households.copy()
n = len(result)
block_geoids = pd.Series([pd.NA] * n, index=result.index, dtype="string")
unresolved = pd.Series([True] * n, index=result.index)
rng_seed = random_state

# --- Partition 1: county_fips (most specific CPS-source geography) ---------
if county_column in result.columns:
county_values = result[county_column].map(normalize_us_county_fips)
county_blocks = _county_block_index(crosswalk)
if county_blocks:
resolvable = (
unresolved
& county_values.notna()
& county_values.isin(county_blocks.keys())
)
for county_fips, group in result.loc[resolvable].groupby(
county_values.loc[resolvable]
):
frame = county_blocks.get(str(county_fips))
if frame is None or frame.empty:
continue
block_geoids.loc[group.index] = _weighted_block_sample(
frame, len(group), rng_seed
)
unresolved.loc[group.index] = False

# --- Partition 2: existing congressional_district_geoid (eCPS-style) -------
if unresolved.any() and cd_geoid_column in result.columns:
existing_cd = pd.to_numeric(result[cd_geoid_column], errors="coerce")
cd_blocks = _cd_geoid_block_index(crosswalk)
if cd_blocks:
resolvable = unresolved & existing_cd.notna()
for cd_geoid, group in result.loc[resolvable].groupby(
existing_cd.loc[resolvable].astype(int)
):
frame = cd_blocks.get(int(cd_geoid))
if frame is None or frame.empty:
continue
block_geoids.loc[group.index] = _weighted_block_sample(
frame, len(group), rng_seed
)
unresolved.loc[group.index] = False

# --- Partition 3: state_fips (always-available fallback) -------------------
if unresolved.any():
state_values = _normalize_us_state_fips_series(result[state_column])
state_blocks = _state_block_index(crosswalk)
resolvable = unresolved & state_values.notna()
for state_fips, group in result.loc[resolvable].groupby(
state_values.loc[resolvable]
):
frame = state_blocks.get(str(state_fips))
if frame is None or frame.empty:
continue
block_geoids.loc[group.index] = _weighted_block_sample(
frame, len(group), rng_seed
)
unresolved.loc[group.index] = False

result["block_geoid"] = block_geoids.fillna("").astype("string")
tract = block_geoids.str[:TRACT_GEOID_LEN]
result["tract_geoid"] = tract.fillna("").astype("string")
cd_id_series = block_geoids.map(cd_id_by_geoid)
# Unresolved households fall back to the policyengine-us defaults: empty
# block/tract strings and CD geoid 0 (PE-US ``congressional_district_geoid``
# default_value), so the export stays a clean integer array. In practice the
# state-partition fallback resolves every household with a valid state FIPS.
cd_geoids = cd_id_series_to_congressional_district_geoid(cd_id_series)
result[cd_geoid_column] = cd_geoids.fillna(0).astype("int64")
return result


def _block_sample_columns(crosswalk: pd.DataFrame) -> list[str]:
return [
column
for column in ("geoid", "prob", "population", "national_prob")
if column in crosswalk.columns
]


def _county_block_index(crosswalk: pd.DataFrame) -> dict[str, pd.DataFrame]:
if "county_fips" in crosswalk.columns:
county = crosswalk["county_fips"].astype(str).str.zfill(COUNTY_GEOID_LEN)
elif {"state_fips", "county"}.issubset(crosswalk.columns):
county = crosswalk["state_fips"].astype(str).str.zfill(STATE_GEOID_LEN) + (
crosswalk["county"].astype(str).str.zfill(COUNTY_LEN)
)
else:
return {}
columns = _block_sample_columns(crosswalk)
return {
str(county_fips): group[columns].copy()
for county_fips, group in crosswalk.groupby(county, sort=False)
}


def _state_block_index(crosswalk: pd.DataFrame) -> dict[str, pd.DataFrame]:
state = crosswalk["state_fips"].astype(str).str.zfill(STATE_GEOID_LEN)
columns = _block_sample_columns(crosswalk)
return {
str(state_fips): group[columns].copy()
for state_fips, group in crosswalk.groupby(state, sort=False)
}


def _cd_geoid_block_index(crosswalk: pd.DataFrame) -> dict[int, pd.DataFrame]:
if "cd_id" not in crosswalk.columns:
return {}
columns = _block_sample_columns(crosswalk)
frames: dict[int, list[pd.DataFrame]] = {}
for cd_id, group in crosswalk.groupby("cd_id", sort=False):
geoid = cd_id_to_congressional_district_geoid(cd_id)
if geoid is None:
continue
frames.setdefault(int(geoid), []).append(group[columns])
return {
geoid: pd.concat(parts, ignore_index=True) for geoid, parts in frames.items()
}


def _weighted_block_sample(
frame: pd.DataFrame,
n: int,
random_state: int | None,
) -> np.ndarray:
"""Draw ``n`` block GEOIDs weighted by block population/probability."""
geoids = frame["geoid"].astype(str).to_numpy()
weights = _block_sampling_weights(frame)
rng = np.random.default_rng(random_state)
indices = rng.choice(len(geoids), size=n, replace=True, p=weights)
return geoids[indices]


def _block_sampling_weights(frame: pd.DataFrame) -> np.ndarray:
if "prob" in frame.columns:
weights = pd.to_numeric(frame["prob"], errors="coerce").to_numpy(dtype=float)
elif "population" in frame.columns:
weights = pd.to_numeric(
frame["population"], errors="coerce"
).to_numpy(dtype=float)
else:
weights = np.ones(len(frame), dtype=float)
weights = np.where(np.isfinite(weights) & (weights > 0), weights, 0.0)
total = weights.sum()
if total <= 0:
return np.full(len(frame), 1.0 / len(frame))
return weights / total


class BlockGeography(GeographyProvider):
"""US atomic-geography provider backed by Census blocks."""

Expand Down Expand Up @@ -759,5 +1022,12 @@ def __repr__(self) -> str:
"state_nonmetro_spm_area_code",
"add_spm_metro_area_geography",
"derive_geographies",
"cd_id_to_congressional_district_geoid",
"cd_id_series_to_congressional_district_geoid",
"assign_household_block_geography",
"AT_LARGE_DISTRICT_NUMBER",
"AT_LARGE_DISTRICT_SUFFIX",
"US_STATE_ABBR_BY_FIPS",
"US_STATE_FIPS_BY_ABBR",
"BlockGeography",
]
71 changes: 71 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,18 @@ class USMicroplexBuildConfig:
"""Normalized Forbes fixed-spine records to append after calibration."""
forbes_fixed_spine_snapshot_id: str = "forbes-us-top-tail"
forbes_fixed_spine_replicates_per_unit: int = 10
policyengine_assign_block_geography: bool = True
"""Assign Census block geography to exported households.

When enabled (default), the PolicyEngine entity-table build draws a real
15-digit ``block_geoid`` for each household from the population-weighted
block-probabilities crosswalk, then derives ``tract_geoid`` (the 11-char
block prefix) and the integer ``congressional_district_geoid``. This mirrors
the enhanced-CPS block-assignment method so the export carries the same three
GEOID leaves. Silently skipped when the crosswalk data is unavailable.
"""
block_probabilities_path: str | Path | None = None
"""Optional override path to the block-probabilities crosswalk parquet."""

def __post_init__(self) -> None:
if (
Expand Down Expand Up @@ -6082,8 +6094,67 @@ def _build_policyengine_households(self, persons: pd.DataFrame) -> pd.DataFrame:
.agg(aggregations)
.rename(columns={"weight": "household_weight"})
)
households = self._assign_household_block_geography(households)
return households

def _assign_household_block_geography(
self,
households: pd.DataFrame,
) -> pd.DataFrame:
"""Attach block_geoid/tract_geoid/congressional_district_geoid to households.

Draws a real Census block per household from the population-weighted
crosswalk (partitioned by the household's CPS county when disclosed, then by
an existing congressional district, then by state), derives
``tract_geoid = block_geoid[:11]`` and the integer
``congressional_district_geoid``. Mirrors the enhanced-CPS block assignment so
the export carries the same GEOID leaves. No-op when disabled or when
``state_fips`` / the crosswalk data is unavailable — geoids are never
fabricated.
"""
if not self.config.policyengine_assign_block_geography:
return households
if "state_fips" not in households.columns or households.empty:
return households
try:
from microplex_us.geography import (
BLOCK_GEOID_LEN,
BlockGeography,
assign_household_block_geography,
default_runtime_block_probabilities_path,
)
except ImportError:
return households

configured_path = self.config.block_probabilities_path
crosswalk_path = (
Path(configured_path)
if configured_path is not None
else default_runtime_block_probabilities_path()
)
if crosswalk_path is None or not Path(crosswalk_path).exists():
_emit_us_pipeline_progress(
"block_geography_skipped",
reason="crosswalk_unavailable",
path=str(crosswalk_path) if crosswalk_path is not None else None,
)
return households

geography = BlockGeography(crosswalk_path, lazy_load=False)
assigned = assign_household_block_geography(
households,
block_geography=geography,
random_state=self.config.random_seed,
)
_emit_us_pipeline_progress(
"block_geography_assigned",
households=int(len(assigned)),
assigned_blocks=int(
assigned["block_geoid"].astype(str).str.len().eq(BLOCK_GEOID_LEN).sum()
),
)
return assigned

def _build_policyengine_tax_units(
self,
persons: pd.DataFrame,
Expand Down
8 changes: 8 additions & 0 deletions src/microplex_us/policyengine/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ class PolicyEngineUSVariableMaterializationResult:
"tenure_type",
"state_fips",
"county_fips",
# Census block geography GEOID leaves, mirroring the enhanced-CPS export.
# All three are storable INPUT variables in policyengine-us (no formula):
# block_geoid/tract_geoid are strings, congressional_district_geoid is the
# integer SSDD GEOID. tract_geoid is exported as the true 11-char prefix of
# block_geoid; congressional_district_geoid is derived from the block's CD.
"block_geoid",
"tract_geoid",
"congressional_district_geoid",
} | set(POLICYENGINE_US_TAKEUP_INPUT_VARIABLES)

POLICYENGINE_US_EXPORT_COLUMN_ALIASES: dict[str, str] = {
Expand Down
Loading
Loading