Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/microplex_us/data_sources/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

# Default cache directory
DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex"
CPS_ASEC_PROCESSED_CACHE_VERSION = "20260412pe_export_ssn"
CPS_ASEC_PROCESSED_CACHE_VERSION = "20260601_ecps_spm_takeup_inputs"

# CPS ASEC data URLs by year
CPS_URLS = {
Expand Down Expand Up @@ -96,6 +96,8 @@
"PMED_VAL": "other_medical_expenses",
"PEMCPREM": "medicare_part_b_premiums",
"WICYN": "_receives_wic",
"SPM_CAPHOUSESUB": "_spm_capped_housing_subsidy",
"SPM_ENGVAL": "spm_unit_energy_subsidy",
# Identifiers
"PH_SEQ": "household_id",
"GESTFIPS": "state_fips",
Expand Down Expand Up @@ -175,6 +177,7 @@
"social_security_retirement",
"social_security_survivors",
"social_security_dependents",
"spm_unit_energy_subsidy",
)

PERSON_ZERO_DEFAULT_VALUE_COLUMNS = (
Expand All @@ -190,6 +193,7 @@
"social_security_retirement",
"social_security_survivors",
"social_security_dependents",
"spm_unit_energy_subsidy",
)

PERSON_CACHE_REQUIRED_COLUMNS = (
Expand Down Expand Up @@ -1321,6 +1325,26 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame:
).drop("_receives_wic")
elif "_receives_wic" in result.columns:
result = result.drop("_receives_wic")
if (
"_spm_capped_housing_subsidy" in result.columns
and "receives_housing_assistance" not in result.columns
):
result = result.with_columns(
(pl.col("_spm_capped_housing_subsidy") > 0).alias(
"receives_housing_assistance"
)
)
if (
"receives_housing_assistance" in result.columns
and "takes_up_housing_assistance_if_eligible" not in result.columns
):
result = result.with_columns(
pl.col("receives_housing_assistance").alias(
"takes_up_housing_assistance_if_eligible"
)
)
if "_spm_capped_housing_subsidy" in result.columns:
result = result.drop("_spm_capped_housing_subsidy")
for value_column in PERSON_ZERO_DEFAULT_VALUE_COLUMNS:
if value_column not in result.columns:
result = result.with_columns(pl.lit(0.0).alias(value_column))
Expand All @@ -1333,6 +1357,13 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame:
):
if bool_column in result.columns:
result = result.with_columns((pl.col(bool_column) == 1).alias(bool_column))
if (
"has_medicare" in result.columns
and "takes_up_medicare_if_eligible" not in result.columns
):
result = result.with_columns(
pl.col("has_medicare").alias("takes_up_medicare_if_eligible")
)
for col in PERSON_NONNEGATIVE_VALUE_COLUMNS:
if col in result.columns:
result = result.with_columns(
Expand Down
2 changes: 2 additions & 0 deletions src/microplex_us/microdata_roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class PolicyEngineUSVariableRole(Enum):
"takes_up_early_head_start_if_eligible",
"takes_up_eitc",
"takes_up_head_start_if_eligible",
"takes_up_housing_assistance_if_eligible",
"takes_up_medicaid_if_eligible",
"takes_up_medicare_if_eligible",
"takes_up_snap_if_eligible",
"takes_up_ssi_if_eligible",
"takes_up_tanf_if_eligible",
Expand Down
26 changes: 26 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -4135,6 +4135,7 @@ def build_policyengine_entity_tables(
persons = self._assign_family_and_spm_units(persons)
families = self._collapse_group_table(persons, "family_id")
spm_units = self._collapse_group_table(persons, "spm_unit_id")
spm_units = self._attach_spm_unit_source_columns(persons, spm_units)
if "tenure_type" in persons.columns:
spm_tenure = (
persons.groupby("spm_unit_id", as_index=False)["tenure_type"]
Expand Down Expand Up @@ -7982,6 +7983,31 @@ def _collapse_group_table(
.astype({id_column: np.int64, "household_id": np.int64})
)

def _attach_spm_unit_source_columns(
self,
persons: pd.DataFrame,
spm_units: pd.DataFrame,
) -> pd.DataFrame:
"""Attach observed SPM-unit inputs carried on CPS person rows."""
if "spm_unit_id" not in persons.columns:
return spm_units

aggregation_by_column = {
"receives_housing_assistance": "max",
"takes_up_housing_assistance_if_eligible": "max",
"spm_unit_energy_subsidy": "first",
}
aggregations = {
column: aggregation
for column, aggregation in aggregation_by_column.items()
if column in persons.columns and column not in spm_units.columns
}
if not aggregations:
return spm_units

source_values = persons.groupby("spm_unit_id", as_index=False).agg(aggregations)
return spm_units.merge(source_values, on="spm_unit_id", how="left")

def _normalize_relationship_to_head(self, persons: pd.DataFrame) -> pd.Series:
family_normalized: pd.Series | None = None
if "family_relationship" in persons.columns:
Expand Down
4 changes: 4 additions & 0 deletions src/microplex_us/policyengine/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class PolicyEngineUSVariableMaterializationResult:
"child_support_received",
"charitable_cash_donations",
"charitable_non_cash_donations",
"receives_housing_assistance",
"receives_wic",
"cps_race",
"disability_benefits",
Expand Down Expand Up @@ -538,11 +539,13 @@ class PolicyEngineUSVariableMaterializationResult:
"snap_reported": "spm_unit",
"spm_unit_broadband_subsidy_reported": "spm_unit",
"spm_unit_capped_housing_subsidy_reported": "spm_unit",
"spm_unit_energy_subsidy": "spm_unit",
"spm_unit_energy_subsidy_reported": "spm_unit",
"spm_unit_federal_tax_reported": "spm_unit",
"spm_unit_payroll_tax_reported": "spm_unit",
"spm_unit_state_tax_reported": "spm_unit",
"spm_unit_wic_reported": "spm_unit",
"takes_up_housing_assistance_if_eligible": "spm_unit",
"tanf_reported": "person",
"taxpayer_id_type": "person",
}
Expand All @@ -564,6 +567,7 @@ class PolicyEngineUSVariableMaterializationResult:
# inputs for these fallback formulas.
"fsla_overtime_premium",
"meets_ssi_disability_criteria",
"spm_unit_energy_subsidy",
# social_security_retirement is a storable INPUT in the pinned pe-us
# (no formula), reconstructed from the CPS SS_VAL/RESNSS split. Some
# pe-us versions add a fallback formula; listing it here keeps the
Expand Down
30 changes: 30 additions & 0 deletions tests/pipelines/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,36 @@ def test_build_policyengine_entity_tables_preserves_household_contract_inputs(
"RENTER",
]

def test_build_policyengine_entity_tables_preserves_spm_source_inputs(
    self,
):
    """SPM-unit inputs carried on person rows must reach the SPM-unit table.

    The fixture has two SPM units: unit 100 (household 10) with two members,
    only one of whom is flagged for housing assistance, and unit 200
    (household 20) with a single unflagged member.
    """
    person_rows = pd.DataFrame(
        {
            "person_id": [1, 2, 3],
            "household_id": [10, 10, 20],
            "spm_unit_id": [100, 100, 200],
            "weight": [1.0, 1.0, 2.0],
            "age": [45, 12, 70],
            "income": [60_000.0, 0.0, 25_000.0],
            "relationship_to_head": [0, 2, 0],
            "receives_housing_assistance": [False, True, False],
            "takes_up_housing_assistance_if_eligible": [False, True, False],
            "spm_unit_energy_subsidy": [90.0, 90.0, 0.0],
        }
    )
    builder = USMicroplexPipeline(USMicroplexBuildConfig())

    entity_tables = builder.build_policyengine_entity_tables(person_rows)
    # Sort by household so row order matches the expectations below.
    units = entity_tables.spm_units.sort_values("household_id").reset_index(
        drop=True
    )

    assert len(units) == 2

    # One flagged member is enough to flag the whole unit.
    expected_by_column = {
        "receives_housing_assistance": [True, False],
        "takes_up_housing_assistance_if_eligible": [True, False],
        "spm_unit_energy_subsidy": [90.0, 0.0],
    }
    for column, expected in expected_by_column.items():
        assert units[column].tolist() == expected

def test_build_policyengine_entity_tables_recomputes_child_count_contract_inputs(
self,
):
Expand Down
31 changes: 25 additions & 6 deletions tests/policyengine/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -2115,7 +2115,14 @@ def __init__(self, entity):
"unrecaptured_section_1250_gain",
"unreported_payroll_tax",
)
spm_unit_contract_inputs = ("spm_unit_tenure_type",)
spm_unit_contract_inputs = (
"receives_housing_assistance",
"spm_unit_tenure_type",
)
legacy_spm_unit_contract_inputs = (
"spm_unit_energy_subsidy",
"takes_up_housing_assistance_if_eligible",
)
legacy_person_contract_inputs = ("count_under_18", "count_under_6")

class FakeSystem:
Expand All @@ -2126,7 +2133,13 @@ class FakeSystem:
for name in household_contract_inputs
},
**{name: FakeVariable("tax_unit") for name in tax_unit_contract_inputs},
**{name: FakeVariable("spm_unit") for name in spm_unit_contract_inputs},
**{
name: FakeVariable("spm_unit")
for name in (
*spm_unit_contract_inputs,
*legacy_spm_unit_contract_inputs,
)
},
"self_employed_health_insurance_ald": FakeVariable("tax_unit"),
"self_employed_pension_contribution_ald": FakeVariable("tax_unit"),
}
Expand Down Expand Up @@ -2160,7 +2173,9 @@ class FakeSystem:
{
"spm_unit_id": [1000],
"household_id": [10],
**{name: ["RENTER"] for name in spm_unit_contract_inputs},
"receives_housing_assistance": [True],
"spm_unit_tenure_type": ["RENTER"],
**{name: [1.0] for name in legacy_spm_unit_contract_inputs},
}
),
)
Expand All @@ -2180,9 +2195,13 @@ class FakeSystem:
assert export_maps["tax_unit"] == {
name: name for name in tax_unit_contract_inputs
}
assert {name: name for name in spm_unit_contract_inputs}.items() <= export_maps[
"spm_unit"
].items()
assert {
name: name
for name in (
*spm_unit_contract_inputs,
*legacy_spm_unit_contract_inputs,
)
}.items() <= export_maps["spm_unit"].items()

def test_build_policyengine_us_export_variable_maps_blocks_computed_direct_overrides(
self,
Expand Down
6 changes: 6 additions & 0 deletions tests/test_cps_source_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path):
"MCARE": [1, 2],
"MCAID": [2, 1],
"WICYN": [1, 2],
"SPM_CAPHOUSESUB": [700, 0],
"SPM_ENGVAL": [90, -1],
"PHIP_VAL": [900, -1],
"POTC_VAL": [120, -1],
"PMED_VAL": [450, -1],
Expand All @@ -461,8 +463,12 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path):
assert persons["social_security_survivors"].tolist() == [0, 0]
assert persons["social_security_dependents"].tolist() == [0, 0]
assert persons["has_medicare"].tolist() == [True, False]
assert persons["takes_up_medicare_if_eligible"].tolist() == [True, False]
assert persons["has_medicaid"].tolist() == [False, True]
assert persons["receives_wic"].tolist() == [True, False]
assert persons["receives_housing_assistance"].tolist() == [True, False]
assert persons["takes_up_housing_assistance_if_eligible"].tolist() == [True, False]
assert persons["spm_unit_energy_subsidy"].tolist() == [90, 0]
assert persons["health_insurance_premiums_without_medicare_part_b"].tolist() == [900, 0]
assert persons["over_the_counter_health_expenses"].tolist() == [120, 0]
assert persons["other_medical_expenses"].tolist() == [450, 0]
Expand Down
Loading