From 98683bf8271f7d8cf267ce0e246813b44bef2d3f Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Tue, 2 Jun 2026 14:48:48 -0400 Subject: [PATCH] Use ACS 2024 donor without changing CPS scaffold --- .../data_sources/donor_surveys.py | 205 ++++++++++++++++-- .../pipelines/pe_us_data_rebuild.py | 2 +- .../pe_us_data_rebuild_checkpoint.py | 4 +- src/microplex_us/pipelines/us.py | 9 + tests/pipelines/test_pe_us_data_rebuild.py | 1 + tests/pipelines/test_us.py | 129 +++++++++++ tests/test_donor_survey_source_providers.py | 70 +++++- 7 files changed, 390 insertions(+), 30 deletions(-) diff --git a/src/microplex_us/data_sources/donor_surveys.py b/src/microplex_us/data_sources/donor_surveys.py index 573692b..7e2759a 100644 --- a/src/microplex_us/data_sources/donor_surveys.py +++ b/src/microplex_us/data_sources/donor_surveys.py @@ -11,6 +11,7 @@ from pathlib import Path from textwrap import dedent +import h5py import numpy as np import pandas as pd from microplex.core import ( @@ -104,7 +105,9 @@ def _descriptor_from_tables( entity=EntityType.HOUSEHOLD, key_column="household_id", variable_names=household_variables, - weight_column="household_weight" if "household_weight" in households.columns else None, + weight_column="household_weight" + if "household_weight" in households.columns + else None, period_column="year" if "year" in households.columns else None, ), EntityObservation( @@ -169,9 +172,7 @@ def _ensure_person_ids(persons: pd.DataFrame) -> pd.DataFrame: if "household_id" in result.columns: composite = ( - result["household_id"].astype(str) - + ":" - + result["person_id"].astype(str) + result["household_id"].astype(str) + ":" + result["person_id"].astype(str) ) if not composite.duplicated().any(): result["person_id"] = composite @@ -212,7 +213,9 @@ def _sample_households_and_persons( sampled_persons = persons[persons["household_id"].isin(keep)].copy() return ( sampled_households.sort_values(["household_id"]).reset_index(drop=True), - sampled_persons.sort_values(["household_id", "person_id"]).reset_index(drop=True), + sampled_persons.sort_values(["household_id", "person_id"]).reset_index( + drop=True + ), ) @@ -520,6 +523,143 @@ def _build_persons(): ) +def _decode_h5_values(values: np.ndarray) -> np.ndarray: + """Decode fixed-width HDF5 byte strings to ordinary Python strings.""" + if values.dtype.kind not in {"S", "O"}: + return values + return np.asarray( + [ + value.decode() if isinstance(value, (bytes, bytearray)) else value + for value in values + ] + ) + + +def _load_policyengine_us_data_h5_dataset( + *, + filename: str, + policyengine_us_data_repo: str | Path | None, + cache_dir: Path | None, +) -> dict[str, np.ndarray]: + if policyengine_us_data_repo is None: + h5_path = _download_policyengine_us_data_file( + filename=filename, + cache_dir=cache_dir, + ) + else: + repo_root = resolve_policyengine_us_data_repo_root(policyengine_us_data_repo) + h5_path = repo_root / "policyengine_us_data" / "storage" / filename + if not h5_path.exists(): + raise FileNotFoundError(f"Missing PolicyEngine US-data H5 file: {h5_path}") + with h5py.File(h5_path, "r") as h5: + return {key: _decode_h5_values(np.asarray(h5[key])) for key in h5.keys()} + + +def _build_policyengine_dataset_tables_from_arrays( + *, + data: dict[str, np.ndarray], + dataset_loader: PEPolicyengineDatasetLoaderSpec, + year: int, +) -> DonorSurveyTables: + spec = asdict(dataset_loader) + + def _numeric(values): + return pd.to_numeric(pd.Series(np.asarray(values)), errors="coerce").fillna(0.0) + + def _boolean_float(values): + return pd.Series(np.asarray(values)).astype(bool).astype(float) + + def _text(values): + return pd.Series(_decode_h5_values(np.asarray(values))).astype(str) + + def _mapped_text(values, mapping): + return _text(values).map(mapping).fillna(0).astype(int) + + def _load_fallback(keys): + for key in keys: + if key in data: + return pd.Series(np.asarray(data[key])) + raise KeyError(f"Missing fallback keys {keys} in dataset payload") + + if spec["builder_kind"] == "household_rows": + household_index = pd.Index(data[spec["household_index_key"]]) + person_households = pd.Index(data[spec["person_household_key"]]) + household_to_row = pd.Series( + np.arange(len(household_index), dtype=np.int64), + index=household_index, + ) + household_rows = household_to_row.loc[person_households].to_numpy() + persons = pd.DataFrame({"household_id": person_households.to_numpy()}) + if spec["person_id_key"] is not None: + persons["person_id"] = np.asarray(data[spec["person_id_key"]]) + for target, source in spec["direct_person_columns"].items(): + persons[target] = _numeric(data[source]) + for target, source in spec["boolean_person_columns"].items(): + persons[target] = _boolean_float(data[source]) + for target, source in spec["row_indexed_person_columns"].items(): + persons[target] = _numeric(np.asarray(data[source])[household_rows]) + for target, source in spec["mapped_row_person_columns"].items(): + persons[target] = _mapped_text( + np.asarray(data[source])[household_rows], + spec["mapped_value_tables"][target], + ) + elif spec["builder_kind"] == "single_person_households": + base_length = len(data[spec["length_source_key"]]) + if spec["generated_household_ids"]: + household_ids = np.arange(base_length, dtype=np.int64) + 1 + else: + household_ids = np.asarray(data[spec["household_index_key"]]) + persons = pd.DataFrame({"household_id": household_ids}) + if spec["person_id_from_household_id"]: + persons["person_id"] = persons["household_id"] + elif spec["person_id_key"] is not None: + persons["person_id"] = np.asarray(data[spec["person_id_key"]]) + for target, source in spec["direct_person_columns"].items(): + persons[target] = _numeric(data[source]) + for target, source in spec["boolean_person_columns"].items(): + persons[target] = _boolean_float(data[source]) + else: + raise ValueError( + f"Unsupported dataset loader builder kind: {spec['builder_kind']}" + ) + + for target, keys in spec["fallback_person_columns"].items(): + persons[target] = _numeric(_load_fallback(keys)) + if spec["sex_from_boolean_source"] is not None: + source = spec["sex_from_boolean_source"] + source_values = pd.Series(persons[source]).astype(bool).to_numpy() + persons["sex"] = np.where( + source_values, + spec["sex_true_value"], + spec["sex_false_value"], + ) + for target, source in spec["copy_person_columns"].items(): + persons[target] = persons[source] + for target, value in spec["constant_person_columns"].items(): + persons[target] = value + if spec["income_sum_columns"]: + persons["income"] = sum( + _numeric(persons[column]) for column in spec["income_sum_columns"] + ) + for column in spec["int_person_columns"]: + if column in persons.columns: + persons[column] = ( + pd.to_numeric(persons[column], errors="coerce").fillna(0).astype(int) + ) + persons["year"] = int(year) + households = ( + persons[["household_id", "state_fips", "tenure", "weight", "year"]] + .rename(columns={"weight": "household_weight"}) + .drop_duplicates(subset=["household_id"]) + ) + return DonorSurveyTables( + households=households.sort_values(["household_id"]).reset_index(drop=True), + persons=persons.sort_values(["household_id", "person_id"]).reset_index( + drop=True + ), + ) + + def _run_policyengine_dataset_loader( *, script: str, @@ -568,7 +708,9 @@ def _run_policyengine_dataset_loader_from_spec( ) -> DonorSurveyTables: dataset_loader = spec.dataset_loader if dataset_loader is None: - raise ValueError(f"PE source-impute block '{spec.key}' is missing a dataset loader spec") + raise ValueError( + f"PE source-impute block '{spec.key}' is missing a dataset loader spec" + ) return _run_policyengine_dataset_loader( script=_build_policyengine_dataset_loader_script(dataset_loader, year=year), sample_n=sample_n, @@ -589,27 +731,46 @@ def _default_acs_tables_loader( policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, ) -> DonorSurveyTables: - _ = cache_dir spec = get_pe_source_impute_block_spec("acs") - if int(year) != spec.default_year: + if int(year) != spec.default_year and policyengine_us_data_repo is None: raise ValueError( - f"{spec.descriptor_name} provider currently supports year={spec.default_year} only" + f"{spec.descriptor_name} provider supports non-default years only " + "when policyengine_us_data_repo is provided" + ) + if int(year) == spec.default_year: + tables = _run_policyengine_dataset_loader_from_spec( + spec=spec, + year=year, + sample_n=None if state_floor else sample_n, + random_seed=random_seed, + policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_data_python=policyengine_us_data_python, + ) + else: + dataset_loader = spec.dataset_loader + if dataset_loader is None: + raise ValueError( + f"PE source-impute block '{spec.key}' is missing a dataset loader spec" + ) + data = _load_policyengine_us_data_h5_dataset( + filename=f"acs_{int(year)}.h5", + policyengine_us_data_repo=policyengine_us_data_repo, + cache_dir=cache_dir, + ) + tables = _build_policyengine_dataset_tables_from_arrays( + data=data, + dataset_loader=dataset_loader, + year=year, ) - tables = _run_policyengine_dataset_loader_from_spec( - spec=spec, - year=year, - sample_n=None if state_floor else sample_n, - random_seed=random_seed, - policyengine_us_data_repo=policyengine_us_data_repo, - policyengine_us_data_python=policyengine_us_data_python, - ) households = ( tables.households.drop_duplicates(subset=["household_id"]) .sort_values(["household_id"]) .reset_index(drop=True) ) persons = ( - tables.persons[tables.persons["household_id"].isin(set(households["household_id"]))] + tables.persons[ + tables.persons["household_id"].isin(set(households["household_id"])) + ] .sort_values(["household_id", "person_id"]) .reset_index(drop=True) ) @@ -705,7 +866,9 @@ def _load_sipp_tables_from_spec( ) -> DonorSurveyTables: raw_loader = spec.raw_loader if raw_loader is None: - raise ValueError(f"PE source-impute block '{spec.key}' is missing a raw loader spec") + raise ValueError( + f"PE source-impute block '{spec.key}' is missing a raw loader spec" + ) if int(year) != spec.default_year: raise ValueError( f"{spec.descriptor_name} provider currently supports year={spec.default_year} only" @@ -729,7 +892,9 @@ def _load_sipp_tables_from_spec( df[variable] = values.astype(float) for variable, contains in raw_loader.sum_columns_contains.items(): matched_columns = [column for column in df.columns if contains in column] - df[variable] = df[matched_columns].fillna(0).sum(axis=1) if matched_columns else 0.0 + df[variable] = ( + df[matched_columns].fillna(0).sum(axis=1) if matched_columns else 0.0 + ) for variable, indicator in raw_loader.indicator_columns.items(): raw_values = pd.to_numeric(df[indicator.column], errors="coerce").fillna(0.0) df[variable] = raw_values.eq(indicator.equals).astype(float) diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild.py b/src/microplex_us/pipelines/pe_us_data_rebuild.py index 468ce8d..6be44f4 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild.py @@ -107,7 +107,7 @@ def default_policyengine_us_data_rebuild_source_providers( include_donor_surveys: bool = True, include_sipp: bool | None = None, include_scf: bool | None = None, - acs_year: int = 2022, + acs_year: int = 2024, sipp_year: int = 2023, scf_year: int = 2022, donor_cache_dir: str | Path | None = None, diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py index 56295ce..a37f993 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py @@ -1981,7 +1981,7 @@ def run_policyengine_us_data_rebuild_checkpoint( include_donor_surveys: bool = True, include_sipp: bool | None = None, include_scf: bool | None = None, - acs_year: int = 2022, + acs_year: int = 2024, sipp_year: int = 2023, scf_year: int = 2022, donor_cache_dir: str | Path | None = None, @@ -2264,7 +2264,7 @@ def main(argv: list[str] | None = None) -> None: parser.add_argument("--cps-source-year", type=int, default=2023) parser.add_argument("--puf-target-year", type=int) parser.add_argument("--puf-cps-reference-year", type=int) - parser.add_argument("--acs-year", type=int, default=2022) + parser.add_argument("--acs-year", type=int, default=2024) parser.add_argument("--sipp-year", type=int, default=2023) parser.add_argument("--scf-year", type=int, default=2022) parser.add_argument("--cps-cache-dir") diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 5db2b43..2195cfa 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -5453,6 +5453,15 @@ def score(source: USMicroplexSourceInput) -> tuple[int, int, int, int]: household_rows, ) + if self.config.puf_support_clone_enabled: + cps_candidates = [ + source + for source in candidates + if self._is_cps_asec_scaffold_source(source.frame.source.name) + ] + if cps_candidates: + return max(cps_candidates, key=score) + return max(candidates, key=score) def _household_geography_coverage( diff --git a/tests/pipelines/test_pe_us_data_rebuild.py b/tests/pipelines/test_pe_us_data_rebuild.py index 335be0c..ca5ac9c 100644 --- a/tests/pipelines/test_pe_us_data_rebuild.py +++ b/tests/pipelines/test_pe_us_data_rebuild.py @@ -138,6 +138,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund SOCIAL_SECURITY_SPLIT_STRATEGY_PE_QRF ) assert isinstance(providers[2], ACSSourceProvider) + assert providers[2].year == 2024 assert isinstance(providers[3], SIPPSourceProvider) assert providers[3].block == "tips" assert isinstance(providers[4], SIPPSourceProvider) diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 8d275e4..4d14d8c 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -7288,6 +7288,135 @@ def test_build_from_frames_prefers_scaffold_with_valid_geography(self): assert result.source_frame.source.name == "cps_like" assert result.seed_data["state_fips"].tolist() == [6, 36] + def test_select_scaffold_prefers_cps_when_puf_support_clone_enabled(self): + cps_households = pd.DataFrame( + { + "household_id": [1, 2], + "hh_weight": [100.0, 120.0], + "state_fips": [6, 36], + "tenure": [1, 2], + } + ) + cps_persons = pd.DataFrame( + { + "person_id": [10, 20], + "household_id": [1, 2], + "age": [45, 19], + "sex": [1, 2], + "education": [3, 2], + "employment_status": [1, 0], + "income": [60_000.0, 12_000.0], + } + ) + acs_households = pd.DataFrame( + { + "household_id": [101, 102, 103], + "hh_weight": [90.0, 110.0, 130.0], + "state_fips": [6, 36, 48], + "tenure": [1, 2, 1], + "rent": [1_000.0, 1_500.0, 900.0], + "real_estate_taxes": [0.0, 2_000.0, 3_000.0], + } + ) + acs_persons = pd.DataFrame( + { + "person_id": [1001, 1002, 1003], + "household_id": [101, 102, 103], + "age": [44, 21, 62], + "sex": [1, 2, 1], + "education": [3, 2, 4], + "employment_status": [1, 0, 1], + "income": [58_000.0, 13_000.0, 74_000.0], + "extra_person_var": [9.0, 8.0, 7.0], + } + ) + + def frame( + name: str, + households: pd.DataFrame, + persons: pd.DataFrame, + household_variables: tuple[str, ...], + person_variables: tuple[str, ...], + ) -> ObservationFrame: + return ObservationFrame( + source=SourceDescriptor( + name=name, + shareability=Shareability.PUBLIC, + time_structure=TimeStructure.REPEATED_CROSS_SECTION, + observations=( + EntityObservation( + entity=EntityType.HOUSEHOLD, + key_column="household_id", + variable_names=household_variables, + weight_column="hh_weight", + ), + EntityObservation( + entity=EntityType.PERSON, + key_column="person_id", + variable_names=person_variables, + ), + ), + ), + tables={ + EntityType.HOUSEHOLD: households, + EntityType.PERSON: persons, + }, + relationships=( + EntityRelationship( + parent_entity=EntityType.HOUSEHOLD, + child_entity=EntityType.PERSON, + parent_key="household_id", + child_key="household_id", + cardinality=RelationshipCardinality.ONE_TO_MANY, + ), + ), + ) + + cps_frame = frame( + "cps_asec_2025", + cps_households, + cps_persons, + ("state_fips", "tenure"), + ( + "household_id", + "age", + "sex", + "education", + "employment_status", + "income", + ), + ) + acs_frame = frame( + "acs_2022", + acs_households, + acs_persons, + ("state_fips", "tenure", "rent", "real_estate_taxes"), + ( + "household_id", + "age", + "sex", + "education", + "employment_status", + "income", + "extra_person_var", + ), + ) + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + puf_support_clone_enabled=True, + synthesis_backend="seed", + calibration_backend="entropy", + ) + ) + source_inputs = [ + pipeline.prepare_source_input(cps_frame), + pipeline.prepare_source_input(acs_frame), + ] + + selected = pipeline._select_scaffold_source(source_inputs) + + assert selected.frame.source.name == "cps_asec_2025" + def test_build_from_frames_prefers_scaffold_with_state_program_proxies(self): proxy_households = pd.DataFrame( { diff --git a/tests/test_donor_survey_source_providers.py b/tests/test_donor_survey_source_providers.py index 343cabd..06d72fe 100644 --- a/tests/test_donor_survey_source_providers.py +++ b/tests/test_donor_survey_source_providers.py @@ -2,6 +2,7 @@ from __future__ import annotations +import h5py import pandas as pd from microplex.core import EntityType @@ -163,7 +164,9 @@ def _flaky_sample(self, *args, **kwargs): ) assert len(sampled_households) == 2 - assert set(sampled_persons["household_id"]) == set(sampled_households["household_id"]) + assert set(sampled_persons["household_id"]) == set( + sampled_households["household_id"] + ) def test_sample_households_and_persons_state_floor_preserves_states() -> None: @@ -191,10 +194,14 @@ def test_sample_households_and_persons_state_floor_preserves_states() -> None: assert len(sampled_households) == 3 assert sampled_households["state_fips"].nunique() == 3 - assert set(sampled_persons["household_id"]) == set(sampled_households["household_id"]) + assert set(sampled_persons["household_id"]) == set( + sampled_households["household_id"] + ) -def test_sample_households_and_persons_state_age_floor_preserves_age_band_coverage() -> None: +def test_sample_households_and_persons_state_age_floor_preserves_age_band_coverage() -> ( + None +): households = pd.DataFrame( { "household_id": ["h1", "h2", "h3", "h4"], @@ -284,7 +291,9 @@ def test_acs_source_provider_uses_manifest_backed_dataset_loader( ) -> None: captured: dict[str, object] = {} - def _fake_loader(*, spec, year, sample_n, random_seed, **_kwargs) -> DonorSurveyTables: + def _fake_loader( + *, spec, year, sample_n, random_seed, **_kwargs + ) -> DonorSurveyTables: captured["spec"] = spec captured["year"] = year captured["sample_n"] = sample_n @@ -307,6 +316,47 @@ def _fake_loader(*, spec, year, sample_n, random_seed, **_kwargs) -> DonorSurvey assert captured["random_seed"] == 0 +def test_acs_source_provider_can_load_newer_storage_h5( + tmp_path, +) -> None: + storage_dir = tmp_path / "policyengine_us_data" / "storage" + storage_dir.mkdir(parents=True) + h5_path = storage_dir / "acs_2024.h5" + with h5py.File(h5_path, "w") as h5: + h5.create_dataset("household_id", data=[1, 2]) + h5.create_dataset("person_household_id", data=[1, 1, 2]) + h5.create_dataset("person_id", data=[11, 12, 21]) + h5.create_dataset("age", data=[45, 12, 68]) + h5.create_dataset("is_male", data=[True, False, False]) + h5.create_dataset("is_household_head", data=[True, False, True]) + h5.create_dataset("state_fips", data=[6, 36]) + h5.create_dataset( + "tenure_type", + data=[b"OWNED_WITH_MORTGAGE", b"RENTED"], + ) + h5.create_dataset("employment_income", data=[50_000.0, 0.0, 12_000.0]) + h5.create_dataset("self_employment_income", data=[5_000.0, 0.0, 0.0]) + h5.create_dataset("social_security", data=[0.0, 0.0, 20_000.0]) + h5.create_dataset( + "taxable_private_pension_income", + data=[0.0, 0.0, 15_000.0], + ) + h5.create_dataset("rent", data=[1_200.0, 0.0, 950.0]) + h5.create_dataset("real_estate_taxes", data=[3_000.0, 0.0, 0.0]) + h5.create_dataset("household_weight", data=[100.0, 120.0]) + + frame = ACSSourceProvider( + year=2024, policyengine_us_data_repo=tmp_path + ).load_frame() + + assert frame.source.name == "acs_2024" + households = frame.tables[EntityType.HOUSEHOLD] + persons = frame.tables[EntityType.PERSON] + assert households["household_weight"].tolist() == [100.0, 120.0] + assert persons["rent"].tolist() == [1_200.0, 0.0, 950.0] + assert persons["tenure"].tolist() == [1, 1, 2] + + def test_acs_source_provider_forwards_state_age_floor_query_filter() -> None: captured: dict[str, object] = {} @@ -333,7 +383,9 @@ def _loader(**kwargs) -> DonorSurveyTables: def test_acs_source_provider_deduplicates_households_from_dataset_loader( monkeypatch, ) -> None: - def _fake_loader(*, spec, year, sample_n, random_seed, **_kwargs) -> DonorSurveyTables: + def _fake_loader( + *, spec, year, sample_n, random_seed, **_kwargs + ) -> DonorSurveyTables: households = pd.DataFrame( { "household_id": [1, 1, 2], @@ -380,7 +432,9 @@ def _fake_loader(*, spec, year, sample_n, random_seed, **_kwargs) -> DonorSurvey def test_acs_source_provider_makes_duplicate_person_ids_household_scoped( monkeypatch, ) -> None: - def _fake_loader(*, spec, year, sample_n, random_seed, **_kwargs) -> DonorSurveyTables: + def _fake_loader( + *, spec, year, sample_n, random_seed, **_kwargs + ) -> DonorSurveyTables: households = pd.DataFrame( { "household_id": [1, 2], @@ -448,7 +502,9 @@ def test_scf_source_provider_uses_manifest_backed_dataset_loader( ) -> None: captured: dict[str, object] = {} - def _fake_loader(*, spec, year, sample_n, random_seed, **_kwargs) -> DonorSurveyTables: + def _fake_loader( + *, spec, year, sample_n, random_seed, **_kwargs + ) -> DonorSurveyTables: captured["spec"] = spec captured["year"] = year captured["sample_n"] = sample_n