diff --git a/src/microplex_us/data_sources/cps.py b/src/microplex_us/data_sources/cps.py index 457b366..aff0d7c 100644 --- a/src/microplex_us/data_sources/cps.py +++ b/src/microplex_us/data_sources/cps.py @@ -37,7 +37,7 @@ # Default cache directory DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex" -CPS_ASEC_PROCESSED_CACHE_VERSION = "20260412pe_export_ssn" +CPS_ASEC_PROCESSED_CACHE_VERSION = "20260601_ecps_spm_takeup_inputs" # CPS ASEC data URLs by year CPS_URLS = { @@ -96,6 +96,8 @@ "PMED_VAL": "other_medical_expenses", "PEMCPREM": "medicare_part_b_premiums", "WICYN": "_receives_wic", + "SPM_CAPHOUSESUB": "_spm_capped_housing_subsidy", + "SPM_ENGVAL": "spm_unit_energy_subsidy", # Identifiers "PH_SEQ": "household_id", "GESTFIPS": "state_fips", @@ -175,6 +177,7 @@ "social_security_retirement", "social_security_survivors", "social_security_dependents", + "spm_unit_energy_subsidy", ) PERSON_ZERO_DEFAULT_VALUE_COLUMNS = ( @@ -190,6 +193,7 @@ "social_security_retirement", "social_security_survivors", "social_security_dependents", + "spm_unit_energy_subsidy", ) PERSON_CACHE_REQUIRED_COLUMNS = ( @@ -1321,6 +1325,26 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: ).drop("_receives_wic") elif "_receives_wic" in result.columns: result = result.drop("_receives_wic") + if ( + "_spm_capped_housing_subsidy" in result.columns + and "receives_housing_assistance" not in result.columns + ): + result = result.with_columns( + (pl.col("_spm_capped_housing_subsidy") > 0).alias( + "receives_housing_assistance" + ) + ) + if ( + "receives_housing_assistance" in result.columns + and "takes_up_housing_assistance_if_eligible" not in result.columns + ): + result = result.with_columns( + pl.col("receives_housing_assistance").alias( + "takes_up_housing_assistance_if_eligible" + ) + ) + if "_spm_capped_housing_subsidy" in result.columns: + result = result.drop("_spm_capped_housing_subsidy") for value_column in PERSON_ZERO_DEFAULT_VALUE_COLUMNS: if value_column not in result.columns: result = result.with_columns(pl.lit(0.0).alias(value_column)) @@ -1333,6 +1357,13 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: ): if bool_column in result.columns: result = result.with_columns((pl.col(bool_column) == 1).alias(bool_column)) + if ( + "has_medicare" in result.columns + and "takes_up_medicare_if_eligible" not in result.columns + ): + result = result.with_columns( + pl.col("has_medicare").alias("takes_up_medicare_if_eligible") + ) for col in PERSON_NONNEGATIVE_VALUE_COLUMNS: if col in result.columns: result = result.with_columns( diff --git a/src/microplex_us/microdata_roles.py b/src/microplex_us/microdata_roles.py index 07c45e9..3f4519c 100644 --- a/src/microplex_us/microdata_roles.py +++ b/src/microplex_us/microdata_roles.py @@ -54,7 +54,9 @@ class PolicyEngineUSVariableRole(Enum): "takes_up_early_head_start_if_eligible", "takes_up_eitc", "takes_up_head_start_if_eligible", + "takes_up_housing_assistance_if_eligible", "takes_up_medicaid_if_eligible", + "takes_up_medicare_if_eligible", "takes_up_snap_if_eligible", "takes_up_ssi_if_eligible", "takes_up_tanf_if_eligible", diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 201f5cf..2a0e6e7 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -4135,6 +4135,7 @@ def build_policyengine_entity_tables( persons = self._assign_family_and_spm_units(persons) families = self._collapse_group_table(persons, "family_id") spm_units = self._collapse_group_table(persons, "spm_unit_id") + spm_units = self._attach_spm_unit_source_columns(persons, spm_units) if "tenure_type" in persons.columns: spm_tenure = ( persons.groupby("spm_unit_id", as_index=False)["tenure_type"] @@ -7982,6 +7983,31 @@ def _collapse_group_table( .astype({id_column: np.int64, "household_id": np.int64}) ) + def _attach_spm_unit_source_columns( + self, + persons: pd.DataFrame, + spm_units: pd.DataFrame, + ) -> pd.DataFrame: + """Attach observed SPM-unit inputs carried on CPS person rows.""" + if "spm_unit_id" not in persons.columns: + return spm_units + + aggregation_by_column = { + "receives_housing_assistance": "max", + "takes_up_housing_assistance_if_eligible": "max", + "spm_unit_energy_subsidy": "first", + } + aggregations = { + column: aggregation + for column, aggregation in aggregation_by_column.items() + if column in persons.columns and column not in spm_units.columns + } + if not aggregations: + return spm_units + + source_values = persons.groupby("spm_unit_id", as_index=False).agg(aggregations) + return spm_units.merge(source_values, on="spm_unit_id", how="left") + def _normalize_relationship_to_head(self, persons: pd.DataFrame) -> pd.Series: family_normalized: pd.Series | None = None if "family_relationship" in persons.columns: diff --git a/src/microplex_us/policyengine/us.py b/src/microplex_us/policyengine/us.py index 5b60808..a33aeb7 100644 --- a/src/microplex_us/policyengine/us.py +++ b/src/microplex_us/policyengine/us.py @@ -284,6 +284,7 @@ class PolicyEngineUSVariableMaterializationResult: "child_support_received", "charitable_cash_donations", "charitable_non_cash_donations", + "receives_housing_assistance", "receives_wic", "cps_race", "disability_benefits", @@ -538,11 +539,13 @@ class PolicyEngineUSVariableMaterializationResult: "snap_reported": "spm_unit", "spm_unit_broadband_subsidy_reported": "spm_unit", "spm_unit_capped_housing_subsidy_reported": "spm_unit", + "spm_unit_energy_subsidy": "spm_unit", "spm_unit_energy_subsidy_reported": "spm_unit", "spm_unit_federal_tax_reported": "spm_unit", "spm_unit_payroll_tax_reported": "spm_unit", "spm_unit_state_tax_reported": "spm_unit", "spm_unit_wic_reported": "spm_unit", + "takes_up_housing_assistance_if_eligible": "spm_unit", "tanf_reported": "person", "taxpayer_id_type": "person", } @@ -564,6 +567,7 @@ class PolicyEngineUSVariableMaterializationResult: # inputs for these fallback formulas. "fsla_overtime_premium", "meets_ssi_disability_criteria", + "spm_unit_energy_subsidy", # social_security_retirement is a storable INPUT in the pinned pe-us # (no formula), reconstructed from the CPS SS_VAL/RESNSS split. Some # pe-us versions add a fallback formula; listing it here keeps the diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 5f5b5cd..10d7942 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -847,6 +847,36 @@ def test_build_policyengine_entity_tables_preserves_household_contract_inputs( "RENTER", ] + def test_build_policyengine_entity_tables_preserves_spm_source_inputs( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + population = pd.DataFrame( + { + "person_id": [1, 2, 3], + "household_id": [10, 10, 20], + "spm_unit_id": [100, 100, 200], + "weight": [1.0, 1.0, 2.0], + "age": [45, 12, 70], + "income": [60_000.0, 0.0, 25_000.0], + "relationship_to_head": [0, 2, 0], + "receives_housing_assistance": [False, True, False], + "takes_up_housing_assistance_if_eligible": [False, True, False], + "spm_unit_energy_subsidy": [90.0, 90.0, 0.0], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + spm_units = tables.spm_units.sort_values("household_id").reset_index(drop=True) + + assert len(spm_units) == 2 + assert spm_units["receives_housing_assistance"].tolist() == [True, False] + assert spm_units["takes_up_housing_assistance_if_eligible"].tolist() == [ + True, + False, + ] + assert spm_units["spm_unit_energy_subsidy"].tolist() == [90.0, 0.0] + def test_build_policyengine_entity_tables_recomputes_child_count_contract_inputs( self, ): diff --git a/tests/policyengine/test_us.py b/tests/policyengine/test_us.py index d9565db..60f2a0d 100644 --- a/tests/policyengine/test_us.py +++ b/tests/policyengine/test_us.py @@ -2115,7 +2115,14 @@ def __init__(self, entity): "unrecaptured_section_1250_gain", "unreported_payroll_tax", ) - spm_unit_contract_inputs = ("spm_unit_tenure_type",) + spm_unit_contract_inputs = ( + "receives_housing_assistance", + "spm_unit_tenure_type", + ) + legacy_spm_unit_contract_inputs = ( + "spm_unit_energy_subsidy", + "takes_up_housing_assistance_if_eligible", + ) legacy_person_contract_inputs = ("count_under_18", "count_under_6") class FakeSystem: @@ -2126,7 +2133,13 @@ class FakeSystem: for name in household_contract_inputs }, **{name: FakeVariable("tax_unit") for name in tax_unit_contract_inputs}, - **{name: FakeVariable("spm_unit") for name in spm_unit_contract_inputs}, + **{ + name: FakeVariable("spm_unit") + for name in ( + *spm_unit_contract_inputs, + *legacy_spm_unit_contract_inputs, + ) + }, "self_employed_health_insurance_ald": FakeVariable("tax_unit"), "self_employed_pension_contribution_ald": FakeVariable("tax_unit"), } @@ -2160,7 +2173,9 @@ class FakeSystem: { "spm_unit_id": [1000], "household_id": [10], - **{name: ["RENTER"] for name in spm_unit_contract_inputs}, + "receives_housing_assistance": [True], + "spm_unit_tenure_type": ["RENTER"], + **{name: [1.0] for name in legacy_spm_unit_contract_inputs}, } ), ) @@ -2180,9 +2195,13 @@ class FakeSystem: assert export_maps["tax_unit"] == { name: name for name in tax_unit_contract_inputs } - assert {name: name for name in spm_unit_contract_inputs}.items() <= export_maps[ - "spm_unit" - ].items() + assert { + name: name + for name in ( + *spm_unit_contract_inputs, + *legacy_spm_unit_contract_inputs, + ) + }.items() <= export_maps["spm_unit"].items() def test_build_policyengine_us_export_variable_maps_blocks_computed_direct_overrides( self, diff --git a/tests/test_cps_source_provider.py b/tests/test_cps_source_provider.py index aa8545a..e5c8b97 100644 --- a/tests/test_cps_source_provider.py +++ b/tests/test_cps_source_provider.py @@ -440,6 +440,8 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path): "MCARE": [1, 2], "MCAID": [2, 1], "WICYN": [1, 2], + "SPM_CAPHOUSESUB": [700, 0], + "SPM_ENGVAL": [90, -1], "PHIP_VAL": [900, -1], "POTC_VAL": [120, -1], "PMED_VAL": [450, -1], @@ -461,8 +463,12 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path): assert persons["social_security_survivors"].tolist() == [0, 0] assert persons["social_security_dependents"].tolist() == [0, 0] assert persons["has_medicare"].tolist() == [True, False] + assert persons["takes_up_medicare_if_eligible"].tolist() == [True, False] assert persons["has_medicaid"].tolist() == [False, True] assert persons["receives_wic"].tolist() == [True, False] + assert persons["receives_housing_assistance"].tolist() == [True, False] + assert persons["takes_up_housing_assistance_if_eligible"].tolist() == [True, False] + assert persons["spm_unit_energy_subsidy"].tolist() == [90, 0] assert persons["health_insurance_premiums_without_medicare_part_b"].tolist() == [900, 0] assert persons["over_the_counter_health_expenses"].tolist() == [120, 0] assert persons["other_medical_expenses"].tolist() == [450, 0]