Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 185 additions & 20 deletions src/microplex_us/data_sources/donor_surveys.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pathlib import Path
from textwrap import dedent

import h5py
import numpy as np
import pandas as pd
from microplex.core import (
Expand Down Expand Up @@ -104,7 +105,9 @@ def _descriptor_from_tables(
entity=EntityType.HOUSEHOLD,
key_column="household_id",
variable_names=household_variables,
weight_column="household_weight" if "household_weight" in households.columns else None,
weight_column="household_weight"
if "household_weight" in households.columns
else None,
period_column="year" if "year" in households.columns else None,
),
EntityObservation(
Expand Down Expand Up @@ -169,9 +172,7 @@ def _ensure_person_ids(persons: pd.DataFrame) -> pd.DataFrame:

if "household_id" in result.columns:
composite = (
result["household_id"].astype(str)
+ ":"
+ result["person_id"].astype(str)
result["household_id"].astype(str) + ":" + result["person_id"].astype(str)
)
if not composite.duplicated().any():
result["person_id"] = composite
Expand Down Expand Up @@ -212,7 +213,9 @@ def _sample_households_and_persons(
sampled_persons = persons[persons["household_id"].isin(keep)].copy()
return (
sampled_households.sort_values(["household_id"]).reset_index(drop=True),
sampled_persons.sort_values(["household_id", "person_id"]).reset_index(drop=True),
sampled_persons.sort_values(["household_id", "person_id"]).reset_index(
drop=True
),
)


Expand Down Expand Up @@ -520,6 +523,143 @@ def _build_persons():
)


def _decode_h5_values(values: np.ndarray) -> np.ndarray:
"""Decode fixed-width HDF5 byte strings to ordinary Python strings."""
if values.dtype.kind not in {"S", "O"}:
return values
return np.asarray(
[
value.decode() if isinstance(value, (bytes, bytearray)) else value
for value in values
]
)


def _load_policyengine_us_data_h5_dataset(
*,
filename: str,
policyengine_us_data_repo: str | Path | None,
cache_dir: Path | None,
) -> dict[str, np.ndarray]:
if policyengine_us_data_repo is None:
h5_path = _download_policyengine_us_data_file(
filename=filename,
cache_dir=cache_dir,
)
else:
repo_root = resolve_policyengine_us_data_repo_root(policyengine_us_data_repo)
h5_path = repo_root / "policyengine_us_data" / "storage" / filename
if not h5_path.exists():
raise FileNotFoundError(f"Missing PolicyEngine US-data H5 file: {h5_path}")
with h5py.File(h5_path, "r") as h5:
return {key: _decode_h5_values(np.asarray(h5[key])) for key in h5.keys()}


def _build_policyengine_dataset_tables_from_arrays(
*,
data: dict[str, np.ndarray],
dataset_loader: PEPolicyengineDatasetLoaderSpec,
year: int,
) -> DonorSurveyTables:
spec = asdict(dataset_loader)

def _numeric(values):
return pd.to_numeric(pd.Series(np.asarray(values)), errors="coerce").fillna(0.0)

def _boolean_float(values):
return pd.Series(np.asarray(values)).astype(bool).astype(float)

def _text(values):
return pd.Series(_decode_h5_values(np.asarray(values))).astype(str)

def _mapped_text(values, mapping):
return _text(values).map(mapping).fillna(0).astype(int)

def _load_fallback(keys):
for key in keys:
if key in data:
return pd.Series(np.asarray(data[key]))
raise KeyError(f"Missing fallback keys {keys} in dataset payload")

if spec["builder_kind"] == "household_rows":
household_index = pd.Index(data[spec["household_index_key"]])
person_households = pd.Index(data[spec["person_household_key"]])
household_to_row = pd.Series(
np.arange(len(household_index), dtype=np.int64),
index=household_index,
)
household_rows = household_to_row.loc[person_households].to_numpy()
persons = pd.DataFrame({"household_id": person_households.to_numpy()})
if spec["person_id_key"] is not None:
persons["person_id"] = np.asarray(data[spec["person_id_key"]])
for target, source in spec["direct_person_columns"].items():
persons[target] = _numeric(data[source])
for target, source in spec["boolean_person_columns"].items():
persons[target] = _boolean_float(data[source])
for target, source in spec["row_indexed_person_columns"].items():
persons[target] = _numeric(np.asarray(data[source])[household_rows])
for target, source in spec["mapped_row_person_columns"].items():
persons[target] = _mapped_text(
np.asarray(data[source])[household_rows],
spec["mapped_value_tables"][target],
)
elif spec["builder_kind"] == "single_person_households":
base_length = len(data[spec["length_source_key"]])
if spec["generated_household_ids"]:
household_ids = np.arange(base_length, dtype=np.int64) + 1
else:
household_ids = np.asarray(data[spec["household_index_key"]])
persons = pd.DataFrame({"household_id": household_ids})
if spec["person_id_from_household_id"]:
persons["person_id"] = persons["household_id"]
elif spec["person_id_key"] is not None:
persons["person_id"] = np.asarray(data[spec["person_id_key"]])
for target, source in spec["direct_person_columns"].items():
persons[target] = _numeric(data[source])
for target, source in spec["boolean_person_columns"].items():
persons[target] = _boolean_float(data[source])
else:
raise ValueError(
f"Unsupported dataset loader builder kind: {spec['builder_kind']}"
)

for target, keys in spec["fallback_person_columns"].items():
persons[target] = _numeric(_load_fallback(keys))
if spec["sex_from_boolean_source"] is not None:
source = spec["sex_from_boolean_source"]
source_values = pd.Series(persons[source]).astype(bool).to_numpy()
persons["sex"] = np.where(
source_values,
spec["sex_true_value"],
spec["sex_false_value"],
)
for target, source in spec["copy_person_columns"].items():
persons[target] = persons[source]
for target, value in spec["constant_person_columns"].items():
persons[target] = value
if spec["income_sum_columns"]:
persons["income"] = sum(
_numeric(persons[column]) for column in spec["income_sum_columns"]
)
for column in spec["int_person_columns"]:
if column in persons.columns:
persons[column] = (
pd.to_numeric(persons[column], errors="coerce").fillna(0).astype(int)
)
persons["year"] = int(year)
households = (
persons[["household_id", "state_fips", "tenure", "weight", "year"]]
.rename(columns={"weight": "household_weight"})
.drop_duplicates(subset=["household_id"])
)
return DonorSurveyTables(
households=households.sort_values(["household_id"]).reset_index(drop=True),
persons=persons.sort_values(["household_id", "person_id"]).reset_index(
drop=True
),
)


def _run_policyengine_dataset_loader(
*,
script: str,
Expand Down Expand Up @@ -568,7 +708,9 @@ def _run_policyengine_dataset_loader_from_spec(
) -> DonorSurveyTables:
dataset_loader = spec.dataset_loader
if dataset_loader is None:
raise ValueError(f"PE source-impute block '{spec.key}' is missing a dataset loader spec")
raise ValueError(
f"PE source-impute block '{spec.key}' is missing a dataset loader spec"
)
return _run_policyengine_dataset_loader(
script=_build_policyengine_dataset_loader_script(dataset_loader, year=year),
sample_n=sample_n,
Expand All @@ -589,27 +731,46 @@ def _default_acs_tables_loader(
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
) -> DonorSurveyTables:
_ = cache_dir
spec = get_pe_source_impute_block_spec("acs")
if int(year) != spec.default_year:
if int(year) != spec.default_year and policyengine_us_data_repo is None:
raise ValueError(
f"{spec.descriptor_name} provider currently supports year={spec.default_year} only"
f"{spec.descriptor_name} provider supports non-default years only "
"when policyengine_us_data_repo is provided"
)
if int(year) == spec.default_year:
tables = _run_policyengine_dataset_loader_from_spec(
spec=spec,
year=year,
sample_n=None if state_floor else sample_n,
random_seed=random_seed,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
)
else:
dataset_loader = spec.dataset_loader
if dataset_loader is None:
raise ValueError(
f"PE source-impute block '{spec.key}' is missing a dataset loader spec"
)
data = _load_policyengine_us_data_h5_dataset(
filename=f"acs_{int(year)}.h5",
policyengine_us_data_repo=policyengine_us_data_repo,
cache_dir=cache_dir,
)
tables = _build_policyengine_dataset_tables_from_arrays(
data=data,
dataset_loader=dataset_loader,
year=year,
)
tables = _run_policyengine_dataset_loader_from_spec(
spec=spec,
year=year,
sample_n=None if state_floor else sample_n,
random_seed=random_seed,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
)
households = (
tables.households.drop_duplicates(subset=["household_id"])
.sort_values(["household_id"])
.reset_index(drop=True)
)
persons = (
tables.persons[tables.persons["household_id"].isin(set(households["household_id"]))]
tables.persons[
tables.persons["household_id"].isin(set(households["household_id"]))
]
.sort_values(["household_id", "person_id"])
.reset_index(drop=True)
)
Expand Down Expand Up @@ -705,7 +866,9 @@ def _load_sipp_tables_from_spec(
) -> DonorSurveyTables:
raw_loader = spec.raw_loader
if raw_loader is None:
raise ValueError(f"PE source-impute block '{spec.key}' is missing a raw loader spec")
raise ValueError(
f"PE source-impute block '{spec.key}' is missing a raw loader spec"
)
if int(year) != spec.default_year:
raise ValueError(
f"{spec.descriptor_name} provider currently supports year={spec.default_year} only"
Expand All @@ -729,7 +892,9 @@ def _load_sipp_tables_from_spec(
df[variable] = values.astype(float)
for variable, contains in raw_loader.sum_columns_contains.items():
matched_columns = [column for column in df.columns if contains in column]
df[variable] = df[matched_columns].fillna(0).sum(axis=1) if matched_columns else 0.0
df[variable] = (
df[matched_columns].fillna(0).sum(axis=1) if matched_columns else 0.0
)
for variable, indicator in raw_loader.indicator_columns.items():
raw_values = pd.to_numeric(df[indicator.column], errors="coerce").fillna(0.0)
df[variable] = raw_values.eq(indicator.equals).astype(float)
Expand Down
2 changes: 1 addition & 1 deletion src/microplex_us/pipelines/pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def default_policyengine_us_data_rebuild_source_providers(
include_donor_surveys: bool = True,
include_sipp: bool | None = None,
include_scf: bool | None = None,
acs_year: int = 2022,
acs_year: int = 2024,
sipp_year: int = 2023,
scf_year: int = 2022,
donor_cache_dir: str | Path | None = None,
Expand Down
4 changes: 2 additions & 2 deletions src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1981,7 +1981,7 @@ def run_policyengine_us_data_rebuild_checkpoint(
include_donor_surveys: bool = True,
include_sipp: bool | None = None,
include_scf: bool | None = None,
acs_year: int = 2022,
acs_year: int = 2024,
sipp_year: int = 2023,
scf_year: int = 2022,
donor_cache_dir: str | Path | None = None,
Expand Down Expand Up @@ -2264,7 +2264,7 @@ def main(argv: list[str] | None = None) -> None:
parser.add_argument("--cps-source-year", type=int, default=2023)
parser.add_argument("--puf-target-year", type=int)
parser.add_argument("--puf-cps-reference-year", type=int)
parser.add_argument("--acs-year", type=int, default=2022)
parser.add_argument("--acs-year", type=int, default=2024)
parser.add_argument("--sipp-year", type=int, default=2023)
parser.add_argument("--scf-year", type=int, default=2022)
parser.add_argument("--cps-cache-dir")
Expand Down
9 changes: 9 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -5453,6 +5453,15 @@ def score(source: USMicroplexSourceInput) -> tuple[int, int, int, int]:
household_rows,
)

if self.config.puf_support_clone_enabled:
cps_candidates = [
source
for source in candidates
if self._is_cps_asec_scaffold_source(source.frame.source.name)
]
if cps_candidates:
return max(cps_candidates, key=score)

return max(candidates, key=score)

def _household_geography_coverage(
Expand Down
1 change: 1 addition & 0 deletions tests/pipelines/test_pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund
SOCIAL_SECURITY_SPLIT_STRATEGY_PE_QRF
)
assert isinstance(providers[2], ACSSourceProvider)
assert providers[2].year == 2024
assert isinstance(providers[3], SIPPSourceProvider)
assert providers[3].block == "tips"
assert isinstance(providers[4], SIPPSourceProvider)
Expand Down
Loading
Loading