Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 167 additions & 5 deletions src/microplex_us/data_sources/donor_surveys.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,55 @@
"year",
)

DONOR_UPRATING_EXCLUDED_COLUMNS = {
"person_id",
"household_id",
"weight",
"year",
"age",
"sex",
"is_female",
"is_male",
"is_married",
"is_household_head",
"cps_race",
"state_fips",
"tenure",
"tenure_type",
"own_children_in_household",
"count_under_18",
"count_under_6",
"household_size",
}

DONOR_UPRATING_FACTOR_ALIASES = {
"employment_income": "employment_income_before_lsr",
"income": "employment_income_before_lsr",
"interest_dividend_income": "taxable_interest_income",
"social_security_pension_income": "social_security_retirement",
"scf_certificates_of_deposit": "bank_account_assets",
"scf_savings_bonds": "bond_assets",
"scf_retirement_assets": "net_worth",
"scf_cash_value_life_insurance": "net_worth",
"scf_other_managed_assets": "stock_assets",
"scf_other_financial_assets": "net_worth",
"scf_primary_residence_value": "home_equity",
"scf_other_residential_real_estate": "household_other_real_estate_value",
"scf_nonresidential_real_estate_equity": "household_other_real_estate_equity",
"scf_business_equity": "household_business_assets_equity",
"scf_other_nonfinancial_assets": "net_worth",
"scf_mortgage_debt": "first_home_mortgage_balance",
"scf_other_residential_debt": "household_other_real_estate_debt",
"scf_other_lines_of_credit": "household_vehicles_debt",
"scf_credit_card_debt": "household_vehicles_debt",
"scf_vehicle_installment_debt": "household_vehicles_debt",
"scf_student_loan_debt": "household_vehicles_debt",
"scf_other_installment_debt": "household_vehicles_debt",
"scf_other_debt": "household_vehicles_debt",
}

TARGET_YEAR_UPRATED_SURVEYS = {"sipp", "scf"}


@dataclass(frozen=True)
class DonorSurveyTables:
Expand Down Expand Up @@ -843,6 +892,80 @@ def _download_policyengine_us_data_file(
return Path(downloaded)


def _load_policyengine_uprating_factors(
*,
policyengine_us_data_repo: str | Path | None,
cache_dir: Path | None,
) -> pd.DataFrame:
if policyengine_us_data_repo is None:
factors_path = _download_policyengine_us_data_file(
filename="uprating_factors.csv",
cache_dir=cache_dir,
)
else:
repo_root = resolve_policyengine_us_data_repo_root(policyengine_us_data_repo)
factors_path = (
repo_root / "policyengine_us_data" / "storage" / "uprating_factors.csv"
)
if not factors_path.exists():
raise FileNotFoundError(
f"Missing PolicyEngine US-data uprating factors: {factors_path}"
)
return pd.read_csv(factors_path).set_index("Variable")


def _uprate_donor_tables_to_target_year(
tables: DonorSurveyTables,
*,
spec: PESourceImputeBlockSpec,
source_year: int,
target_year: int | None,
policyengine_us_data_repo: str | Path | None,
cache_dir: str | Path | None,
) -> DonorSurveyTables:
if (
target_year is None
or int(target_year) == int(source_year)
or spec.survey_name not in TARGET_YEAR_UPRATED_SURVEYS
):
return tables

resolved_cache_dir = None if cache_dir is None else Path(cache_dir)
factors = _load_policyengine_uprating_factors(
policyengine_us_data_repo=policyengine_us_data_repo,
cache_dir=resolved_cache_dir,
)
start_column = str(int(source_year))
end_column = str(int(target_year))
if start_column not in factors.columns or end_column not in factors.columns:
raise ValueError(
"PolicyEngine US-data uprating factors do not cover "
f"{source_year}->{target_year}"
)

persons = tables.persons.copy()
for column in persons.columns:
if column in DONOR_UPRATING_EXCLUDED_COLUMNS:
continue
factor_name = DONOR_UPRATING_FACTOR_ALIASES.get(column, column)
if factor_name not in factors.index:
continue
start = float(factors.loc[factor_name, start_column])
end = float(factors.loc[factor_name, end_column])
if start == 0:
raise ValueError(f"Zero uprating base for {factor_name} in {source_year}")
persons[column] = pd.to_numeric(persons[column], errors="coerce").fillna(
0.0
) * (end / start)

if "year" in persons.columns:
persons["year"] = int(target_year)
households = tables.households.copy()
if "year" in households.columns:
households["year"] = int(target_year)
return DonorSurveyTables(households=households, persons=persons)


def _build_joined_raw_identifier(
frame: pd.DataFrame,
*,
Expand Down Expand Up @@ -914,6 +1037,9 @@ def _load_sipp_tables_from_spec(
df[variable] = df[source_variable]

df = apply_pe_source_impute_loader_postprocess(df, spec)
for variable, source_variable in raw_loader.copy_columns.items():
if source_variable in df.columns:
df[variable] = df[source_variable]
households = (
df[["household_id", "weight", "state_fips", "tenure", "year"]]
.rename(columns={"weight": "household_weight"})
Expand Down Expand Up @@ -1018,9 +1144,11 @@ def __init__(
loader: DonorSurveyTablesLoader | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
target_year: int | None = None,
) -> None:
self.spec = spec
self.year = int(spec.default_year if year is None else year)
self.target_year = None if target_year is None else int(target_year)
self.cache_dir = None if cache_dir is None else Path(cache_dir)
self.shareability = shareability
self.loader = loader
Expand All @@ -1042,22 +1170,34 @@ def load_frame(self, query: SourceQuery | None = None) -> ObservationFrame:
provider_filters = query.provider_filters
loader = self.loader or _default_loader_for_spec(self.spec)
year = int(provider_filters.get("year", self.year))
target_year = provider_filters.get("target_year", self.target_year)
resolved_target_year = None if target_year is None else int(target_year)
cache_dir = provider_filters.get("cache_dir", self.cache_dir)
policyengine_us_data_repo = provider_filters.get(
"policyengine_us_data_repo",
self.policyengine_us_data_repo,
)
tables = loader(
year=year,
sample_n=provider_filters.get("sample_n"),
random_seed=int(provider_filters.get("random_seed", 0)),
state_floor=provider_filters.get("state_floor"),
state_age_floor=provider_filters.get("state_age_floor"),
cache_dir=provider_filters.get("cache_dir", self.cache_dir),
policyengine_us_data_repo=provider_filters.get(
"policyengine_us_data_repo",
self.policyengine_us_data_repo,
),
cache_dir=cache_dir,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=provider_filters.get(
"policyengine_us_data_python",
self.policyengine_us_data_python,
),
)
tables = _uprate_donor_tables_to_target_year(
tables,
spec=self.spec,
source_year=year,
target_year=resolved_target_year,
policyengine_us_data_repo=policyengine_us_data_repo,
cache_dir=cache_dir,
)
frame = _build_observation_frame(
households=tables.households,
persons=tables.persons,
Expand All @@ -1080,6 +1220,7 @@ def __init__(
loader: DonorSurveyTablesLoader | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
target_year: int | None = None,
) -> None:
super().__init__(
spec=get_pe_source_impute_block_spec("acs"),
Expand All @@ -1088,6 +1229,7 @@ def __init__(
loader=loader,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=target_year,
)


Expand All @@ -1102,6 +1244,9 @@ def __init__(
cache_dir: str | Path | None = None,
shareability: Shareability = Shareability.PUBLIC,
loader: DonorSurveyTablesLoader | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
target_year: int | None = None,
) -> None:
self.block = block
super().__init__(
Expand All @@ -1110,6 +1255,9 @@ def __init__(
cache_dir=cache_dir,
shareability=shareability,
loader=loader,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=target_year,
)


Expand All @@ -1123,13 +1271,19 @@ def __init__(
cache_dir: str | Path | None = None,
shareability: Shareability = Shareability.PUBLIC,
loader: DonorSurveyTablesLoader | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
target_year: int | None = None,
) -> None:
super().__init__(
block="tips",
year=year,
cache_dir=cache_dir,
shareability=shareability,
loader=loader,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=target_year,
)


Expand All @@ -1143,13 +1297,19 @@ def __init__(
cache_dir: str | Path | None = None,
shareability: Shareability = Shareability.PUBLIC,
loader: DonorSurveyTablesLoader | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
target_year: int | None = None,
) -> None:
super().__init__(
block="assets",
year=year,
cache_dir=cache_dir,
shareability=shareability,
loader=loader,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=target_year,
)


Expand All @@ -1164,6 +1324,7 @@ def __init__(
loader: DonorSurveyTablesLoader | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
target_year: int | None = None,
) -> None:
super().__init__(
spec=get_pe_source_impute_block_spec("scf"),
Expand All @@ -1172,4 +1333,5 @@ def __init__(
loader=loader,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=target_year,
)
8 changes: 8 additions & 0 deletions src/microplex_us/pipelines/pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def default_policyengine_us_data_rebuild_source_providers(
cps_cache = None if cps_cache_dir is None else Path(cps_cache_dir)
puf_cache = None if puf_cache_dir is None else Path(puf_cache_dir)
donor_cache = None if donor_cache_dir is None else Path(donor_cache_dir)
donor_target_year = int(puf_target_year)
providers: list[SourceProvider] = [
CPSASECSourceProvider(
year=int(cps_source_year),
Expand Down Expand Up @@ -176,11 +177,17 @@ def default_policyengine_us_data_rebuild_source_providers(
block="tips",
year=int(sipp_year),
cache_dir=donor_cache,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=donor_target_year,
),
SIPPSourceProvider(
block="assets",
year=int(sipp_year),
cache_dir=donor_cache,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=donor_target_year,
),
]
)
Expand All @@ -190,6 +197,7 @@ def default_policyengine_us_data_rebuild_source_providers(
year=int(scf_year),
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
target_year=donor_target_year,
)
)
return tuple(providers)
Expand Down
8 changes: 8 additions & 0 deletions tests/pipelines/test_pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,14 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund
assert providers[2].year == 2024
assert isinstance(providers[3], SIPPSourceProvider)
assert providers[3].block == "tips"
assert providers[3].target_year == 2024
assert providers[3].policyengine_us_data_python == "/tmp/pe-python"
assert isinstance(providers[4], SIPPSourceProvider)
assert providers[4].block == "assets"
assert providers[4].target_year == 2024
assert providers[4].policyengine_us_data_python == "/tmp/pe-python"
assert isinstance(providers[5], SCFSourceProvider)
assert providers[5].target_year == 2024


def test_default_policyengine_us_data_rebuild_source_providers_keeps_acs_when_donor_surveys_disabled() -> (
Expand Down Expand Up @@ -177,9 +182,12 @@ def test_default_policyengine_us_data_rebuild_source_providers_can_include_donor
assert isinstance(providers[2], ACSSourceProvider)
assert isinstance(providers[3], SIPPSourceProvider)
assert providers[3].block == "tips"
assert providers[3].target_year == 2024
assert isinstance(providers[4], SIPPSourceProvider)
assert providers[4].block == "assets"
assert providers[4].target_year == 2024
assert isinstance(providers[5], SCFSourceProvider)
assert providers[5].target_year == 2024


def test_build_policyengine_us_data_rebuild_pipeline_returns_configured_pipeline() -> (
Expand Down
Loading
Loading