# Person-table columns that the donor uprating loop always skips: identifiers,
# weights, the survey year, and demographic / household-structure fields.
DONOR_UPRATING_EXCLUDED_COLUMNS = {
    "person_id",
    "household_id",
    "weight",
    "year",
    "age",
    "sex",
    "is_female",
    "is_male",
    "is_married",
    "is_household_head",
    "cps_race",
    "state_fips",
    "tenure",
    "tenure_type",
    "own_children_in_household",
    "count_under_18",
    "count_under_6",
    "household_size",
}

# Maps a donor-table column name to the ``Variable`` row of the uprating
# factor table used to scale it. Columns missing from this mapping are looked
# up under their own name; columns with no matching row are left unscaled.
# NOTE(review): several non-vehicle SCF debt columns reuse the
# ``household_vehicles_debt`` factor, presumably as a generic debt-growth
# proxy; confirm this is intended.
DONOR_UPRATING_FACTOR_ALIASES = {
    "employment_income": "employment_income_before_lsr",
    "income": "employment_income_before_lsr",
    "interest_dividend_income": "taxable_interest_income",
    "social_security_pension_income": "social_security_retirement",
    "scf_certificates_of_deposit": "bank_account_assets",
    "scf_savings_bonds": "bond_assets",
    "scf_retirement_assets": "net_worth",
    "scf_cash_value_life_insurance": "net_worth",
    "scf_other_managed_assets": "stock_assets",
    "scf_other_financial_assets": "net_worth",
    "scf_primary_residence_value": "home_equity",
    "scf_other_residential_real_estate": "household_other_real_estate_value",
    "scf_nonresidential_real_estate_equity": "household_other_real_estate_equity",
    "scf_business_equity": "household_business_assets_equity",
    "scf_other_nonfinancial_assets": "net_worth",
    "scf_mortgage_debt": "first_home_mortgage_balance",
    "scf_other_residential_debt": "household_other_real_estate_debt",
    "scf_other_lines_of_credit": "household_vehicles_debt",
    "scf_credit_card_debt": "household_vehicles_debt",
    "scf_vehicle_installment_debt": "household_vehicles_debt",
    "scf_student_loan_debt": "household_vehicles_debt",
    "scf_other_installment_debt": "household_vehicles_debt",
    "scf_other_debt": "household_vehicles_debt",
}

# Values of ``spec.survey_name`` whose donor tables are uprated to the target
# year; tables from any other survey are returned unchanged.
TARGET_YEAR_UPRATED_SURVEYS = {"sipp", "scf"}
def _load_policyengine_uprating_factors(
    *,
    policyengine_us_data_repo: str | Path | None,
    cache_dir: Path | None,
) -> pd.DataFrame:
    """Read PolicyEngine US-data ``uprating_factors.csv`` indexed by ``Variable``.

    Args:
        policyengine_us_data_repo: Local US-data checkout. When ``None`` the
            file is fetched via ``_download_policyengine_us_data_file``.
        cache_dir: Cache directory forwarded to the download helper.

    Returns:
        The factor table with the ``Variable`` column as its index.

    Raises:
        FileNotFoundError: If the resolved CSV path does not exist.
    """
    if policyengine_us_data_repo is None:
        factors_path = _download_policyengine_us_data_file(
            filename="uprating_factors.csv",
            cache_dir=cache_dir,
        )
    else:
        repo_root = resolve_policyengine_us_data_repo_root(policyengine_us_data_repo)
        factors_path = (
            repo_root / "policyengine_us_data" / "storage" / "uprating_factors.csv"
        )
    if not factors_path.exists():
        raise FileNotFoundError(
            f"Missing PolicyEngine US-data uprating factors: {factors_path}"
        )
    return pd.read_csv(factors_path).set_index("Variable")


def _uprate_donor_tables_to_target_year(
    tables: DonorSurveyTables,
    *,
    spec: PESourceImputeBlockSpec,
    source_year: int,
    target_year: int | None,
    policyengine_us_data_repo: str | Path | None,
    cache_dir: str | Path | None,
) -> DonorSurveyTables:
    """Scale donor person amounts from ``source_year`` to ``target_year``.

    The tables are returned untouched when no target year is given, when the
    target equals the source year, or when ``spec.survey_name`` is not in
    ``TARGET_YEAR_UPRATED_SURVEYS``. Otherwise every person column that is not
    excluded and that resolves (directly or through
    ``DONOR_UPRATING_FACTOR_ALIASES``) to a row of the factor table is
    multiplied by ``factor[target_year] / factor[source_year]``, and the
    ``year`` column of both tables is set to the target year.

    Args:
        tables: Donor household and person tables; not mutated.
        spec: Block spec whose ``survey_name`` decides whether to uprate.
        source_year: Year the donor amounts are expressed in.
        target_year: Year to uprate to, or ``None`` to skip uprating.
        policyengine_us_data_repo: Optional local US-data checkout.
        cache_dir: Optional cache directory for the downloaded factor file.

    Returns:
        New ``DonorSurveyTables`` with uprated copies, or ``tables`` itself
        when no uprating applies.

    Raises:
        ValueError: If the factor table lacks either year column, or a needed
            factor is zero at the source year or is not a finite number.
        FileNotFoundError: If the factor CSV cannot be found.
    """
    # Function-scope import: only the factor validation below needs it.
    import math

    if (
        target_year is None
        or int(target_year) == int(source_year)
        or spec.survey_name not in TARGET_YEAR_UPRATED_SURVEYS
    ):
        return tables

    resolved_cache_dir = None if cache_dir is None else Path(cache_dir)
    factors = _load_policyengine_uprating_factors(
        policyengine_us_data_repo=policyengine_us_data_repo,
        cache_dir=resolved_cache_dir,
    )
    # Year columns come from the CSV header, so they are strings.
    start_column = str(int(source_year))
    end_column = str(int(target_year))
    if start_column not in factors.columns or end_column not in factors.columns:
        raise ValueError(
            "PolicyEngine US-data uprating factors do not cover "
            f"{source_year}->{target_year}"
        )

    persons = tables.persons.copy()
    for column in persons.columns:
        if column in DONOR_UPRATING_EXCLUDED_COLUMNS:
            continue
        factor_name = DONOR_UPRATING_FACTOR_ALIASES.get(column, column)
        if factor_name not in factors.index:
            continue
        start = float(factors.loc[factor_name, start_column])
        end = float(factors.loc[factor_name, end_column])
        if start == 0:
            raise ValueError(f"Zero uprating base for {factor_name} in {source_year}")
        # A blank or infinite factor would otherwise turn the whole column
        # into NaN/inf without any error.
        if not (math.isfinite(start) and math.isfinite(end)):
            raise ValueError(
                f"Non-finite uprating factor for {factor_name} "
                f"({source_year}->{target_year})"
            )
        # Unparseable and missing amounts are treated as zero before scaling.
        persons[column] = pd.to_numeric(persons[column], errors="coerce").fillna(
            0.0
        ) * (end / start)

    if "year" in persons.columns:
        persons["year"] = int(target_year)
    households = tables.households.copy()
    if "year" in households.columns:
        households["year"] = int(target_year)
    return DonorSurveyTables(households=households, persons=persons)
"policyengine_us_data_repo", + self.policyengine_us_data_repo, + ) tables = loader( year=year, sample_n=provider_filters.get("sample_n"), random_seed=int(provider_filters.get("random_seed", 0)), state_floor=provider_filters.get("state_floor"), state_age_floor=provider_filters.get("state_age_floor"), - cache_dir=provider_filters.get("cache_dir", self.cache_dir), - policyengine_us_data_repo=provider_filters.get( - "policyengine_us_data_repo", - self.policyengine_us_data_repo, - ), + cache_dir=cache_dir, + policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=provider_filters.get( "policyengine_us_data_python", self.policyengine_us_data_python, ), ) + tables = _uprate_donor_tables_to_target_year( + tables, + spec=self.spec, + source_year=year, + target_year=resolved_target_year, + policyengine_us_data_repo=policyengine_us_data_repo, + cache_dir=cache_dir, + ) frame = _build_observation_frame( households=tables.households, persons=tables.persons, @@ -1080,6 +1220,7 @@ def __init__( loader: DonorSurveyTablesLoader | None = None, policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, + target_year: int | None = None, ) -> None: super().__init__( spec=get_pe_source_impute_block_spec("acs"), @@ -1088,6 +1229,7 @@ def __init__( loader=loader, policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=policyengine_us_data_python, + target_year=target_year, ) @@ -1102,6 +1244,9 @@ def __init__( cache_dir: str | Path | None = None, shareability: Shareability = Shareability.PUBLIC, loader: DonorSurveyTablesLoader | None = None, + policyengine_us_data_repo: str | Path | None = None, + policyengine_us_data_python: str | Path | None = None, + target_year: int | None = None, ) -> None: self.block = block super().__init__( @@ -1110,6 +1255,9 @@ def __init__( cache_dir=cache_dir, shareability=shareability, loader=loader, + policyengine_us_data_repo=policyengine_us_data_repo, + 
policyengine_us_data_python=policyengine_us_data_python, + target_year=target_year, ) @@ -1123,6 +1271,9 @@ def __init__( cache_dir: str | Path | None = None, shareability: Shareability = Shareability.PUBLIC, loader: DonorSurveyTablesLoader | None = None, + policyengine_us_data_repo: str | Path | None = None, + policyengine_us_data_python: str | Path | None = None, + target_year: int | None = None, ) -> None: super().__init__( block="tips", @@ -1130,6 +1281,9 @@ def __init__( cache_dir=cache_dir, shareability=shareability, loader=loader, + policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_data_python=policyengine_us_data_python, + target_year=target_year, ) @@ -1143,6 +1297,9 @@ def __init__( cache_dir: str | Path | None = None, shareability: Shareability = Shareability.PUBLIC, loader: DonorSurveyTablesLoader | None = None, + policyengine_us_data_repo: str | Path | None = None, + policyengine_us_data_python: str | Path | None = None, + target_year: int | None = None, ) -> None: super().__init__( block="assets", @@ -1150,6 +1307,9 @@ def __init__( cache_dir=cache_dir, shareability=shareability, loader=loader, + policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_data_python=policyengine_us_data_python, + target_year=target_year, ) @@ -1164,6 +1324,7 @@ def __init__( loader: DonorSurveyTablesLoader | None = None, policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, + target_year: int | None = None, ) -> None: super().__init__( spec=get_pe_source_impute_block_spec("scf"), @@ -1172,4 +1333,5 @@ def __init__( loader=loader, policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=policyengine_us_data_python, + target_year=target_year, ) diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild.py b/src/microplex_us/pipelines/pe_us_data_rebuild.py index 6be44f4..d09e95d 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild.py +++ 
b/src/microplex_us/pipelines/pe_us_data_rebuild.py @@ -131,6 +131,7 @@ def default_policyengine_us_data_rebuild_source_providers( cps_cache = None if cps_cache_dir is None else Path(cps_cache_dir) puf_cache = None if puf_cache_dir is None else Path(puf_cache_dir) donor_cache = None if donor_cache_dir is None else Path(donor_cache_dir) + donor_target_year = int(puf_target_year) providers: list[SourceProvider] = [ CPSASECSourceProvider( year=int(cps_source_year), @@ -176,11 +177,17 @@ def default_policyengine_us_data_rebuild_source_providers( block="tips", year=int(sipp_year), cache_dir=donor_cache, + policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_data_python=policyengine_us_data_python, + target_year=donor_target_year, ), SIPPSourceProvider( block="assets", year=int(sipp_year), cache_dir=donor_cache, + policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_data_python=policyengine_us_data_python, + target_year=donor_target_year, ), ] ) @@ -190,6 +197,7 @@ def default_policyengine_us_data_rebuild_source_providers( year=int(scf_year), policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=policyengine_us_data_python, + target_year=donor_target_year, ) ) return tuple(providers) diff --git a/tests/pipelines/test_pe_us_data_rebuild.py b/tests/pipelines/test_pe_us_data_rebuild.py index ca5ac9c..aacfe69 100644 --- a/tests/pipelines/test_pe_us_data_rebuild.py +++ b/tests/pipelines/test_pe_us_data_rebuild.py @@ -141,9 +141,14 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund assert providers[2].year == 2024 assert isinstance(providers[3], SIPPSourceProvider) assert providers[3].block == "tips" + assert providers[3].target_year == 2024 + assert providers[3].policyengine_us_data_python == "/tmp/pe-python" assert isinstance(providers[4], SIPPSourceProvider) assert providers[4].block == "assets" + assert providers[4].target_year == 2024 + assert 
providers[4].policyengine_us_data_python == "/tmp/pe-python" assert isinstance(providers[5], SCFSourceProvider) + assert providers[5].target_year == 2024 def test_default_policyengine_us_data_rebuild_source_providers_keeps_acs_when_donor_surveys_disabled() -> ( @@ -177,9 +182,12 @@ def test_default_policyengine_us_data_rebuild_source_providers_can_include_donor assert isinstance(providers[2], ACSSourceProvider) assert isinstance(providers[3], SIPPSourceProvider) assert providers[3].block == "tips" + assert providers[3].target_year == 2024 assert isinstance(providers[4], SIPPSourceProvider) assert providers[4].block == "assets" + assert providers[4].target_year == 2024 assert isinstance(providers[5], SCFSourceProvider) + assert providers[5].target_year == 2024 def test_build_policyengine_us_data_rebuild_pipeline_returns_configured_pipeline() -> ( diff --git a/tests/test_donor_survey_source_providers.py b/tests/test_donor_survey_source_providers.py index 06d72fe..218121e 100644 --- a/tests/test_donor_survey_source_providers.py +++ b/tests/test_donor_survey_source_providers.py @@ -4,6 +4,7 @@ import h5py import pandas as pd +import pytest from microplex.core import EntityType import microplex_us.data_sources.donor_surveys as donor_surveys @@ -275,6 +276,25 @@ def _scf_tables(**_kwargs) -> DonorSurveyTables: return DonorSurveyTables(households=households, persons=persons) +def _write_uprating_factors( + repo_root, + rows: dict[str, tuple[float, float, float]], +) -> None: + storage_dir = repo_root / "policyengine_us_data" / "storage" + storage_dir.mkdir(parents=True) + pd.DataFrame( + [ + { + "Variable": variable, + "2022": values[0], + "2023": values[1], + "2024": values[2], + } + for variable, values in rows.items() + ] + ).to_csv(storage_dir / "uprating_factors.csv", index=False) + + def test_acs_source_provider_builds_observation_frame_from_injected_loader() -> None: provider = ACSSourceProvider(loader=_acs_tables) @@ -497,6 +517,100 @@ def 
def test_sipp_provider_uprates_amounts_to_target_year(tmp_path) -> None:
    """SIPP tips amounts are scaled by the 2024 factors and rows restamped."""
    _write_uprating_factors(
        tmp_path,
        {
            "employment_income_before_lsr": (1.0, 1.0, 1.1),
            "tip_income": (1.0, 1.0, 1.25),
        },
    )

    provider = SIPPSourceProvider(
        block="tips",
        loader=_sipp_tips_tables,
        policyengine_us_data_repo=tmp_path,
        target_year=2024,
    )

    frame = provider.load_frame()
    households = frame.tables[EntityType.HOUSEHOLD]
    persons = frame.tables[EntityType.PERSON]

    # The source name keeps the survey year; only the table rows move to 2024.
    assert frame.source.name == "sipp_tips_2023"
    assert households["year"].tolist() == [2024, 2024]
    assert persons["year"].tolist() == [2024, 2024, 2024]
    # Scaled amounts are products of floats, so compare approximately.
    assert persons["employment_income"].tolist() == pytest.approx(
        [44_000.0, 0.0, 27_500.0]
    )
    assert persons["income"].tolist() == pytest.approx([44_000.0, 0.0, 27_500.0])
    assert persons["tip_income"].tolist() == pytest.approx([1_125.0, 0.0, 312.5])
    # Weights are excluded from uprating and must come through untouched.
    assert persons["weight"].tolist() == [80.0, 80.0, 90.0]


def test_sipp_asset_provider_uprates_liquid_assets_to_target_year(tmp_path) -> None:
    """SIPP asset amounts are each scaled by their own 2024 factor."""
    _write_uprating_factors(
        tmp_path,
        {
            "employment_income_before_lsr": (1.0, 1.0, 1.1),
            "bank_account_assets": (1.0, 1.0, 1.2),
            "stock_assets": (1.0, 1.0, 1.3),
            "bond_assets": (1.0, 1.0, 1.4),
        },
    )

    provider = SIPPSourceProvider(
        block="assets",
        loader=_sipp_assets_tables,
        policyengine_us_data_repo=tmp_path,
        target_year=2024,
    )

    persons = provider.load_frame().tables[EntityType.PERSON]

    # Scaled amounts are products of floats, so compare approximately.
    assert persons["employment_income"].tolist() == pytest.approx([44_000.0, 27_500.0])
    assert persons["income"].tolist() == pytest.approx([44_000.0, 27_500.0])
    assert persons["bank_account_assets"].tolist() == pytest.approx(
        [3_000.0, 12_000.0]
    )
    assert persons["stock_assets"].tolist() == pytest.approx([0.0, 5_200.0])
    assert persons["bond_assets"].tolist() == pytest.approx([0.0, 2_100.0])


def test_scf_provider_uprates_amounts_to_target_year(tmp_path) -> None:
    """SCF amounts are scaled via direct and aliased factors; rows restamped."""
    _write_uprating_factors(
        tmp_path,
        {
            "employment_income_before_lsr": (1.0, 1.0, 1.1),
            "taxable_interest_income": (1.0, 1.0, 2.0),
            "social_security_retirement": (1.0, 1.0, 1.5),
            "net_worth": (1.0, 1.0, 1.2),
            "auto_loan_balance": (1.0, 1.0, 1.3),
            "auto_loan_interest": (1.0, 1.0, 1.4),
        },
    )

    provider = SCFSourceProvider(
        loader=_scf_tables,
        policyengine_us_data_repo=tmp_path,
        target_year=2024,
    )

    frame = provider.load_frame()
    households = frame.tables[EntityType.HOUSEHOLD]
    persons = frame.tables[EntityType.PERSON]

    # The source name keeps the survey year; only the table rows move to 2024.
    assert frame.source.name == "scf_2022"
    assert households["year"].tolist() == [2024, 2024]
    assert persons["year"].tolist() == [2024, 2024]
    # Scaled amounts are products of floats, so compare approximately.
    assert persons["employment_income"].tolist() == pytest.approx([82_500.0, 0.0])
    assert persons["income"].tolist() == pytest.approx([82_500.0, 0.0])
    assert persons["interest_dividend_income"].tolist() == pytest.approx(
        [2_400.0, 800.0]
    )
    assert persons["social_security_pension_income"].tolist() == pytest.approx(
        [0.0, 27_000.0]
    )
    assert persons["net_worth"].tolist() == pytest.approx([420_000.0, 216_000.0])
    assert persons["auto_loan_balance"].tolist() == pytest.approx([10_400.0, 0.0])
    assert persons["auto_loan_interest"].tolist() == pytest.approx([770.0, 0.0])
    # Weights are excluded from uprating and must come through untouched.
    assert persons["weight"].tolist() == [10.0, 12.0]