From 56ca15b40df1ffe08279bd92d44092fde13479ad Mon Sep 17 00:00:00 2001
From: Max Ghenis
Date: Sun, 31 May 2026 23:00:41 -0400
Subject: [PATCH] Add CPS-derived income copies: survivor/educational/financial
 assistance (G7)

Three person-level income leaves the Enhanced CPS exports as direct copies of
raw ASEC fields, which Microplex produced none of (raw fields unmapped + leaves
not allow-listed, so they never reached the H5):

- survivor_benefits <- SRVS_VAL
- educational_assistance <- ED_VAL
- financial_assistance <- FIN_VAL

Mirror eCPS policyengine_us_data/datasets/cps/cps.py:1493-1495 exactly (direct
copies). Map the three raw fields in PERSON_VARIABLES so _process_persons
renames them onto the pe-us input leaves, and add the leaves to
SAFE_POLICYENGINE_US_EXPORT_VARIABLES. All three are confirmed storable INPUTS
(no formula) in the pinned policyengine-us, so this exports real source-data
values, never fakes.

Closes 3 of the 47 missing required export-contract columns.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
NOTE(review): everything from here down to the first "diff --git" line is
reviewer commentary, not commit-message text.
* This copy was re-flowed from a whitespace-collapsed extract. Line breaks
  follow the hunk headers' line counts (6 -> 10, 6 -> 10, 0 -> 114); the
  leading indentation inside the cps.py and us.py hunks is assumed to be four
  spaces -- TODO confirm against the target files, or apply with
  `git apply --ignore-whitespace`.
* The From: and Co-Authored-By: lines carry no e-mail address in this copy;
  presumably it was stripped during extraction -- restore it from the
  original commit if needed.
* In the new test module `_FIELD_FOR` is assigned below
  `test_process_persons_copies_raw_fields_to_leaves`, which reads it. The name
  is looked up when the test is called, so this ordering works.
* The test module imports `polars` and `microplex_us`; both must be
  importable to run it, with pytest or through its `__main__` runner.

 src/microplex_us/data_sources/cps.py          |   4 +
 src/microplex_us/policyengine/us.py           |   4 +
 .../test_cps_derived_income_copies.py         | 114 ++++++++++++++++++
 3 files changed, 122 insertions(+)
 create mode 100644 tests/data_sources/test_cps_derived_income_copies.py

diff --git a/src/microplex_us/data_sources/cps.py b/src/microplex_us/data_sources/cps.py
index 63e13e3..54bde64 100644
--- a/src/microplex_us/data_sources/cps.py
+++ b/src/microplex_us/data_sources/cps.py
@@ -85,6 +85,10 @@
     "SS_VAL": "social_security",
     "SSI_VAL": "ssi",
     "UC_VAL": "unemployment_compensation",
+    # CPS-derived direct income copies (mirror eCPS cps.py:1493-1495).
+    "SRVS_VAL": "survivor_benefits",
+    "ED_VAL": "educational_assistance",
+    "FIN_VAL": "financial_assistance",
     "PTOTVAL": "total_person_income",
     "OI_OFF": "_other_income_code",
     "OI_VAL": "_other_income_value",
diff --git a/src/microplex_us/policyengine/us.py b/src/microplex_us/policyengine/us.py
index 70fd055..3183f4e 100644
--- a/src/microplex_us/policyengine/us.py
+++ b/src/microplex_us/policyengine/us.py
@@ -325,6 +325,10 @@ class PolicyEngineUSVariableMaterializationResult:
     "roth_401k_contributions_desired",
     "traditional_ira_contributions_desired",
     "roth_ira_contributions_desired",
+    # CPS-derived direct income copies (eCPS cps.py:1493-1495).
+    "survivor_benefits",
+    "educational_assistance",
+    "financial_assistance",
     "stock_assets",
     "taxable_ira_distributions",
     "tip_income",
diff --git a/tests/data_sources/test_cps_derived_income_copies.py b/tests/data_sources/test_cps_derived_income_copies.py
new file mode 100644
index 0000000..2a66e02
--- /dev/null
+++ b/tests/data_sources/test_cps_derived_income_copies.py
@@ -0,0 +1,114 @@
+"""Tests for the CPS-derived direct income copies (G7 export-parity gap).
+
+The Enhanced CPS exports three person-level income leaves as direct copies of
+raw ASEC fields (``policyengine_us_data/datasets/cps/cps.py:1493-1495``):
+
+- ``survivor_benefits`` <- ``SRVS_VAL``
+- ``educational_assistance`` <- ``ED_VAL``
+- ``financial_assistance`` <- ``FIN_VAL``
+
+Microplex produced none of them: the raw fields were not mapped in
+``PERSON_VARIABLES`` and the leaves were absent from the export allowlist, so
+they never reached the H5. These tests exercise the real ``_process_persons``
+(no stubbing) to prove the rename happens, plus assert allowlist membership and
+that no alias remaps the leaves.
+"""
+
+import polars as pl
+
+from microplex_us.data_sources.cps import PERSON_VARIABLES, _process_persons
+
+_COPIES = {
+    "SRVS_VAL": "survivor_benefits",
+    "ED_VAL": "educational_assistance",
+    "FIN_VAL": "financial_assistance",
+}
+
+
+def _raw_person_frame(rows: list[dict]) -> pl.DataFrame:
+    """Raw CPS-style person frame carrying the income-copy fields.
+
+    Census column names are used because ``_process_persons`` selects/renames
+    via ``PERSON_VARIABLES``.
+    """
+    n = len(rows)
+    return pl.DataFrame(
+        {
+            "PH_SEQ": [1] * n,
+            "A_LINENO": list(range(1, n + 1)),
+            "A_FNLWGT": [100.0] * n,
+            "A_AGE": [row.get("age", 40) for row in rows],
+            "SRVS_VAL": [row.get("srvs", 0.0) for row in rows],
+            "ED_VAL": [row.get("ed", 0.0) for row in rows],
+            "FIN_VAL": [row.get("fin", 0.0) for row in rows],
+        }
+    )
+
+
+def test_person_variables_maps_the_three_raw_fields():
+    for census, leaf in _COPIES.items():
+        assert PERSON_VARIABLES.get(census) == leaf
+
+
+def test_process_persons_copies_raw_fields_to_leaves():
+    """The raw ASEC values are copied verbatim onto the pe-us input leaves."""
+    rows = [
+        {"srvs": 12_000.0, "ed": 0.0, "fin": 0.0},
+        {"srvs": 0.0, "ed": 5_000.0, "fin": 0.0},
+        {"srvs": 0.0, "ed": 0.0, "fin": 3_200.0},
+        {"srvs": 800.0, "ed": 1_100.0, "fin": 450.0},
+        {"srvs": 0.0, "ed": 0.0, "fin": 0.0},  # non-recipient
+    ]
+    result = _process_persons(_raw_person_frame(rows), 2023)
+
+    for census, leaf in _COPIES.items():
+        assert leaf in result.columns, f"{leaf} not produced"
+        got = result[leaf].to_list()
+        expected = [row.get(_FIELD_FOR[census], 0.0) for row in rows]
+        assert got == expected, f"{leaf}: {got} != {expected}"
+
+
+_FIELD_FOR = {"SRVS_VAL": "srvs", "ED_VAL": "ed", "FIN_VAL": "fin"}
+
+
+def test_copies_are_non_degenerate():
+    """Each leaf carries distinct nonzero values, not a constant/zero fill."""
+    rows = [
+        {"srvs": 9_000.0, "ed": 2_000.0, "fin": 1_500.0},
+        {"srvs": 21_000.0, "ed": 6_500.0, "fin": 4_000.0},
+        {"srvs": 0.0, "ed": 0.0, "fin": 0.0},
+    ]
+    result = _process_persons(_raw_person_frame(rows), 2023)
+    for leaf in _COPIES.values():
+        values = [v for v in result[leaf].to_list() if v > 0]
+        assert len(values) >= 2, f"{leaf} should be positive for several records"
+        assert len(set(values)) >= 2, f"{leaf} should not be a single constant"
+
+
+def test_copies_in_export_allowlist_and_not_aliased():
+    from microplex_us.policyengine.us import (
+        POLICYENGINE_US_EXPORT_COLUMN_ALIASES,
+        SAFE_POLICYENGINE_US_EXPORT_VARIABLES,
+    )
+
+    for leaf in _COPIES.values():
+        assert leaf in SAFE_POLICYENGINE_US_EXPORT_VARIABLES
+        assert POLICYENGINE_US_EXPORT_COLUMN_ALIASES.get(leaf) is None
+
+
+if __name__ == "__main__":
+    import traceback
+
+    funcs = [v for k, v in sorted(globals().items()) if k.startswith("test_")]
+    passed = failed = 0
+    for fn in funcs:
+        try:
+            fn()
+            print(f"PASS {fn.__name__}")
+            passed += 1
+        except Exception:  # noqa: BLE001
+            print(f"FAIL {fn.__name__}")
+            traceback.print_exc()
+            failed += 1
+    print(f"SUMMARY passed={passed} failed={failed}")
+    raise SystemExit(1 if failed else 0)