Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 223 additions & 3 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections import Counter
from collections.abc import Iterable
from dataclasses import asdict, dataclass, field
from functools import lru_cache
from pathlib import Path
from tempfile import TemporaryDirectory
from types import FunctionType
Expand Down Expand Up @@ -36,6 +37,7 @@
TimeStructure,
)
from microplex.fusion import FusionPlan
from microplex.geography import GeographyQuery
from microplex.hierarchical import TaxUnitOptimizer
from microplex.synthesizer import Synthesizer
from microplex.targets import TargetQuery, TargetSpec
Expand All @@ -47,6 +49,10 @@
build_forbes_fixed_spine,
residualize_targets_for_fixed_spine,
)
from microplex_us.geography import (
BlockGeography,
normalize_us_county_fips,
)
from microplex_us.pe_source_impute_engine import (
PE_SOURCE_IMPUTE_BLOCK_ENGINE,
PESourceImputeBlockRunRequest,
Expand Down Expand Up @@ -107,6 +113,174 @@
LOGGER = logging.getLogger(__name__)


@lru_cache(maxsize=1)
def _default_block_geography() -> BlockGeography:
    """Return the shared default ``BlockGeography`` instance.

    The single-entry ``lru_cache`` means the object is constructed on the
    first call and the same instance is handed back on every later call.
    """
    shared_geography = BlockGeography()
    return shared_geography


def _normalize_household_county_fips_series(
    county_fips: pd.Series,
    state_fips: pd.Series,
) -> pd.Series:
    """Normalize CPS county fragments into PE's five-digit county FIPS values.

    Args:
        county_fips: County codes, either three-digit county fragments
            (1..999) or already-combined state+county codes. Values that
            cannot be parsed as numbers are treated as missing.
        state_fips: State codes aligned with ``county_fips``; used to build
            the full code for rows that only carry a county fragment.

    Returns:
        A ``string``-dtype Series of zero-padded five-character codes, with
        ``<NA>`` wherever the rounded code is missing, non-positive, or
        larger than 99999.
    """
    # Round first so every later comparison sees the same whole number that
    # ends up being formatted (e.g. 0.4 is treated as 0, i.e. invalid).
    county_numeric = pd.to_numeric(county_fips, errors="coerce").round()
    state_numeric = pd.to_numeric(state_fips, errors="coerce").round()

    # A "fragment" is a bare county code that still needs its state prefix.
    county_fragment_mask = (
        county_numeric.notna()
        & county_numeric.gt(0)
        & county_numeric.lt(1000)
        & state_numeric.notna()
        & state_numeric.gt(0)
    )
    combined = county_numeric.copy()
    combined.loc[county_fragment_mask] = (
        state_numeric.loc[county_fragment_mask].astype(int) * 1000
        + county_numeric.loc[county_fragment_mask].astype(int)
    )

    # Anything outside 1..99999 cannot be a five-digit FIPS code. Masking
    # before the integer cast also keeps non-finite values from raising.
    invalid = combined.isna() | combined.le(0) | combined.gt(99_999)
    normalized = (
        combined.mask(invalid).astype("Int64").astype("string").str.zfill(5)
    )
    return normalized.astype("string")


def _normalize_household_state_fips_series(state_fips: pd.Series) -> pd.Series:
    """Normalize state codes into zero-padded two-character FIPS strings.

    Args:
        state_fips: State codes in any numeric-like form. Values that cannot
            be parsed as numbers are treated as missing.

    Returns:
        A ``string``-dtype Series of two-character codes, with ``<NA>``
        wherever the rounded code is missing, non-positive, or above 99.
    """
    # Round before validating so the check sees the value that is formatted.
    numeric = pd.to_numeric(state_fips, errors="coerce").round()
    invalid = numeric.isna() | numeric.le(0) | numeric.gt(99)
    # Mask before the integer cast so non-finite values cannot raise.
    normalized = (
        numeric.mask(invalid).astype("Int64").astype("string").str.zfill(2)
    )
    return normalized.astype("string")


def _congressional_district_geoid_from_cd_id(
    cd_id: Any,
    state_fips: Any,
) -> int:
    """Encode a congressional-district id as ``state_fips * 100 + district``.

    Args:
        cd_id: District identifier; the text after the last ``-`` is taken as
            the district token (for example ``"CA-12"`` or ``"AK-AL"``).
        state_fips: State FIPS code as an int, a zero-padded string, or an
            integer-valued float such as ``6.0``.

    Returns:
        The integer GEOID, or ``0`` when the state is not a whole number in
        1..99, the id is blank/missing, or the district token is not numeric.
    """
    try:
        # Parse via float so integer-valued floats ("6.0") are accepted.
        state_value = float(str(state_fips).strip())
    except (TypeError, ValueError):
        return 0
    # NaN/inf fail is_integer(); a non-positive or three-digit state would
    # otherwise leak out as a bogus GEOID.
    if not state_value.is_integer() or not 1 <= state_value <= 99:
        return 0
    state = int(state_value)

    cd_text = str(cd_id).strip()
    if not cd_text or cd_text.lower() in {"nan", "none", "<na>"}:
        return 0
    district_token = cd_text.split("-")[-1]
    # eCPS normalizes at-large districts to 01: the raw Census codes "AL"/"ZZ"
    # (at-large) and "98" (DC) map to district 0, which is then bumped to 1
    # (policyengine-us-data db/create_initial_strata.py). Microplex's crosswalk
    # feeds the "-AL" token, but accept the raw Census forms too so the encoder
    # stays faithful to the eCPS 436-CD universe regardless of input convention.
    if district_token.upper() in {"AL", "ZZ"}:
        district = 1
    else:
        try:
            district = int(district_token)
        except ValueError:
            return 0
        if district in (0, 98):
            district = 1
    return state * 100 + district


def _attach_household_census_geographies(
    households: pd.DataFrame,
    *,
    seed: int,
    geography: BlockGeography | None = None,
) -> pd.DataFrame:
    """Attach eCPS-contract block, tract, county, and CD geographies to households.

    Blocks are assigned by county where the household's normalized county code
    appears in the block data, and by state for the remaining rows. Tract,
    county, state, and congressional-district values are then read back from
    the assigned block.

    Args:
        households: Household-level frame. It is not modified; a copy with a
            fresh ``RangeIndex`` is returned.
        seed: Random seed for the county-level assignment; ``seed + 1`` is
            used for the state-level fallback assignment.
        geography: Block geography to use. When ``None``, the cached default
            instance is used.

    Returns:
        The household frame with ``block_geoid``, ``tract_geoid`` and
        ``congressional_district_geoid`` columns present. If the frame is
        empty, has no ``state_fips`` column, or the block data file is not
        found, those columns keep their existing/default values.
    """
    # Intermediate frames are indexed by row label and written back via .loc;
    # a non-unique household-frame index makes those reindex operations ambiguous
    # (ValueError: cannot reindex on an axis with duplicate labels). The caller
    # consumes this result by merging on the household_id column, not the index,
    # so collapsing to a fresh RangeIndex here is both safe and robust.
    result = households.reset_index(drop=True)
    for column, default in (
        ("block_geoid", ""),
        ("tract_geoid", ""),
        ("congressional_district_geoid", 0),
    ):
        if column not in result.columns:
            result[column] = default
    if result.empty or "state_fips" not in result.columns:
        return result

    assigned_blocks = pd.Series(pd.NA, index=result.index, dtype="string")
    state_values = _normalize_household_state_fips_series(result["state_fips"])
    county_values = (
        _normalize_household_county_fips_series(
            result["county_fips"], result["state_fips"]
        )
        if "county_fips" in result.columns
        else pd.Series(pd.NA, index=result.index, dtype="string")
    )
    result["county_fips"] = county_values.fillna("00000")

    try:
        # Compare against None explicitly: an injected geography must be used
        # even if the object happens to evaluate as falsy.
        block_geography = (
            geography if geography is not None else _default_block_geography()
        )
        block_data = block_geography.data
    except FileNotFoundError:
        # Best-effort: without block data the defaults are kept, but say so
        # instead of returning silently.
        logging.getLogger(__name__).warning(
            "Block geography data not found; household block, tract, and "
            "congressional-district geographies were left at their defaults."
        )
        return result

    # First pass: assign blocks within the household's own county when that
    # county exists in the block data.
    valid_counties = set(block_data["county_fips"].dropna().astype(str))
    county_mask = county_values.isin(valid_counties)
    if county_mask.any():
        county_query = GeographyQuery(
            partition_columns=("county_fips",),
            partition_normalizers={"county_fips": normalize_us_county_fips},
        )
        county_assigner = block_geography.load_assigner(county_query)
        county_frame = pd.DataFrame(
            {"county_fips": county_values.loc[county_mask]},
            index=result.index[county_mask],
        )
        assigned = county_assigner.assign(
            county_frame,
            random_state=seed,
        )
        assigned_blocks.loc[assigned.index] = assigned["block_geoid"].astype("string")

    # Second pass: rows still without a block fall back to state-level
    # assignment, using a different seed from the county pass.
    remaining_mask = assigned_blocks.isna()
    state_mask = remaining_mask & state_values.notna()
    if state_mask.any():
        state_frame = pd.DataFrame(
            {"state_fips": state_values.loc[state_mask]},
            index=result.index[state_mask],
        )
        assigned = block_geography.assign(
            state_frame,
            random_state=seed + 1,
        )
        assigned_blocks.loc[assigned.index] = assigned["block_geoid"].astype("string")

    assigned_mask = assigned_blocks.notna()
    if not assigned_mask.any():
        return result

    # Carry the original row label through materialization in a column so the
    # derived geographies can be written back to the right rows.
    materialized = block_geography.materialize(
        pd.DataFrame(
            {
                "_row_index": assigned_blocks.index[assigned_mask],
                "block_geoid": assigned_blocks.loc[assigned_mask].astype(str),
            },
            index=result.index[assigned_mask],
        ),
        columns=("state_fips", "county_fips", "tract_geoid", "cd_id"),
    )
    row_index = materialized["_row_index"].to_numpy()
    for column in ("block_geoid", "tract_geoid", "county_fips"):
        if column in materialized.columns:
            result.loc[row_index, column] = materialized[column].to_numpy()
    result.loc[row_index, "state_fips"] = (
        pd.to_numeric(materialized["state_fips"], errors="coerce")
        .fillna(0)
        .astype(int)
        .to_numpy()
    )
    # If the materialized frame has no cd_id column, fall back to an all-missing
    # Series so every affected row encodes to 0.
    result.loc[row_index, "congressional_district_geoid"] = [
        _congressional_district_geoid_from_cd_id(cd_id, state_fips)
        for cd_id, state_fips in zip(
            materialized.get("cd_id", pd.Series(index=row_index, dtype="object")),
            materialized["state_fips"],
            strict=False,
        )
    ]
    return result


def _root_logger_has_handlers() -> bool:
    """Report whether the root logger currently has any handlers attached."""
    root_logger = logging.getLogger()
    if root_logger.handlers:
        return True
    return False

Expand Down Expand Up @@ -2133,6 +2307,10 @@ def prepare_seed_data_from_source(
hh["county_fips"] = 0
if "tenure" not in household_coverage or "tenure" not in hh.columns:
hh["tenure"] = 0
hh = _attach_household_census_geographies(
hh,
seed=self.config.random_seed,
)

required_person_defaults = {
"age": 0,
Expand All @@ -2145,13 +2323,31 @@ def prepare_seed_data_from_source(
if column not in person_coverage or column not in persons_df.columns:
persons_df[column] = default

household_seed_columns = [
"household_id",
"state_fips",
"county_fips",
"hh_weight",
"tenure",
"block_geoid",
"tract_geoid",
"congressional_district_geoid",
]
seed_data = persons_df.merge(
hh[["household_id", "state_fips", "county_fips", "hh_weight", "tenure"]],
hh[[column for column in household_seed_columns if column in hh.columns]],
on="household_id",
how="left",
suffixes=("", "__household"),
)
for column in ("state_fips", "county_fips", "hh_weight", "tenure"):
for column in (
"state_fips",
"county_fips",
"hh_weight",
"tenure",
"block_geoid",
"tract_geoid",
"congressional_district_geoid",
):
household_column = f"{column}__household"
if household_column not in seed_data.columns:
continue
Expand All @@ -2165,7 +2361,24 @@ def prepare_seed_data_from_source(
seed_data["hh_weight"] = seed_data["hh_weight"].fillna(1.0).astype(float)
seed_data["tenure"] = seed_data["tenure"].fillna(0).astype(int)
seed_data["state_fips"] = seed_data["state_fips"].fillna(0).astype(int)
seed_data["county_fips"] = seed_data["county_fips"].fillna(0).astype(int)
seed_data["county_fips"] = (
seed_data["county_fips"]
.map(normalize_us_county_fips)
.fillna("00000")
)
if "block_geoid" in seed_data.columns:
seed_data["block_geoid"] = seed_data["block_geoid"].fillna("").astype(str)
if "tract_geoid" in seed_data.columns:
seed_data["tract_geoid"] = seed_data["tract_geoid"].fillna("").astype(str)
if "congressional_district_geoid" in seed_data.columns:
seed_data["congressional_district_geoid"] = (
pd.to_numeric(
seed_data["congressional_district_geoid"],
errors="coerce",
)
.fillna(0)
.astype(int)
)
seed_data["income"] = pd.to_numeric(
seed_data["income"], errors="coerce"
).fillna(0.0)
Expand Down Expand Up @@ -6022,6 +6235,10 @@ def _finalize_synthetic_population(
result = synthetic.copy().reset_index(drop=True)
for column, default in {
"state_fips": 0,
"county_fips": "00000",
"block_geoid": "",
"tract_geoid": "",
"congressional_district_geoid": 0,
"tenure": 0,
"age": 0,
"sex": 0,
Expand Down Expand Up @@ -6066,6 +6283,9 @@ def _build_policyengine_households(self, persons: pd.DataFrame) -> pd.DataFrame:
for column in (
"state_fips",
"county_fips",
"block_geoid",
"tract_geoid",
"congressional_district_geoid",
"tenure",
"tenure_type",
"state",
Expand Down
3 changes: 3 additions & 0 deletions src/microplex_us/policyengine/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ class PolicyEngineUSVariableMaterializationResult:
"tenure_type",
"state_fips",
"county_fips",
"block_geoid",
"tract_geoid",
"congressional_district_geoid",
} | set(POLICYENGINE_US_TAKEUP_INPUT_VARIABLES)

POLICYENGINE_US_EXPORT_COLUMN_ALIASES: dict[str, str] = {
Expand Down
Loading
Loading