Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/microplex_us/data_sources/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

# Default cache directory
DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex"
CPS_ASEC_PROCESSED_CACHE_VERSION = "20260601_ecps_age80_84"
CPS_ASEC_PROCESSED_CACHE_VERSION = "20260602_childcare_input"

# CPS ASEC data URLs by year
CPS_URLS = {
Expand Down Expand Up @@ -244,6 +244,7 @@
"social_security_survivors",
"social_security_dependents",
"receives_wic",
"spm_unit_pre_subsidy_childcare_expenses",
)

PERSON_CPS_DISABILITY_COLUMNS = (
Expand Down
6 changes: 6 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -10208,6 +10208,12 @@ def has_any(*columns: str) -> bool:
else:
result["is_hispanic"] = hispanic.fillna(0).astype(int).ne(0)

if has_any("pre_subsidy_rent", "rent"):
result["pre_subsidy_rent"] = first_nonzero_or_present(
"pre_subsidy_rent",
"rent",
).clip(lower=0.0)

marital_status = (
pd.to_numeric(result["marital_status"], errors="coerce")
if "marital_status" in result.columns
Expand Down
5 changes: 5 additions & 0 deletions src/microplex_us/policyengine/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ class PolicyEngineUSVariableMaterializationResult:
# the persisted leaf input when no explicit last-week field is present.
"hours_worked": "hours_worked_last_week",
"race": "cps_race",
# PE-US computes ``rent`` from the persisted source input
# ``pre_subsidy_rent``. Microplex's ACS donor block restores the
# user-facing ``rent`` column, so export it through the storable leaf input
# rather than dropping it as formula-owned.
"rent": "pre_subsidy_rent",
}

POLICYENGINE_US_STRUCTURAL_EXPORT_COLUMNS: frozenset[str] = frozenset(
Expand Down
21 changes: 21 additions & 0 deletions tests/pipelines/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -4790,6 +4790,27 @@ def test_augment_policyengine_person_inputs_materializes_non_sch_d_capital_gains

assert augmented["non_sch_d_capital_gains"].tolist() == [250.0]

def test_augment_policyengine_person_inputs_aliases_rent_to_pre_subsidy_rent(
    self,
):
    """Check that augmentation fills ``pre_subsidy_rent`` from ``rent``.

    The three fixture rows cover, in order: a zero ``pre_subsidy_rent``
    next to a positive ``rent``, a positive ``pre_subsidy_rent`` next to a
    zero ``rent``, and a missing ``pre_subsidy_rent`` next to a positive
    ``rent``. In every case the positive value is the one expected in the
    augmented ``pre_subsidy_rent`` column.
    """
    person_frame = pd.DataFrame(
        {
            "rent": [14_400.0, 0.0, 9_600.0],
            "pre_subsidy_rent": [0.0, 7_200.0, None],
            "age": [45, 70, 12],
            "sex": [1, 2, 1],
        }
    )
    expected_rent = [14_400.0, 7_200.0, 9_600.0]

    builder = USMicroplexPipeline(USMicroplexBuildConfig())
    result = builder._augment_policyengine_person_inputs(person_frame)

    assert result["pre_subsidy_rent"].tolist() == expected_rent

def test_augment_policyengine_person_inputs_zeros_part_b_without_medicare(
self,
):
Expand Down
49 changes: 49 additions & 0 deletions tests/policyengine/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -2644,6 +2644,55 @@ class FakeSystem:
"is_hispanic": "is_hispanic",
}.items() <= export_maps["person"].items()

def test_build_policyengine_us_export_variable_maps_aliases_rent_to_pre_subsidy_rent(
    self,
):
    """Check that a person-level ``rent`` column exports as ``pre_subsidy_rent``.

    The fake tax-benefit system declares ``rent`` with a formula and
    ``pre_subsidy_rent`` without one. The person table carries only
    ``rent``; the export map must point it at ``pre_subsidy_rent`` and the
    period arrays must hold the original rent values under that name.
    """

    # Minimal stand-ins exposing only the attributes set up for this test.
    class FakeEntity:
        def __init__(self, key):
            self.key = key

    class FakeVariable:
        def __init__(self, entity, formulas=None):
            self.entity = FakeEntity(entity)
            self.formulas = formulas if formulas else {}

    class FakeSystem:
        variables = {
            "pre_subsidy_rent": FakeVariable("person"),
            "rent": FakeVariable("person", formulas={"2024": object()}),
        }

    household_frame = pd.DataFrame(
        {
            "household_id": [10, 20],
            "household_weight": [1.0, 1.0],
        }
    )
    person_frame = pd.DataFrame(
        {
            "person_id": [1, 2],
            "household_id": [10, 20],
            "rent": [14_400.0, 0.0],
        }
    )
    bundle = PolicyEngineUSEntityTableBundle(
        households=household_frame,
        persons=person_frame,
    )

    export_maps = build_policyengine_us_export_variable_maps(
        bundle,
        tax_benefit_system=FakeSystem(),
    )
    person_map = export_maps["person"]
    period_arrays = build_policyengine_us_time_period_arrays(
        bundle,
        period=2024,
        person_variable_map=person_map,
    )

    # The alias must be the only route: no identity mapping for either name.
    assert person_map["rent"] == "pre_subsidy_rent"
    assert "pre_subsidy_rent" not in person_map
    assert "rent" not in person_map.values()
    assert period_arrays["pre_subsidy_rent"]["2024"].tolist() == [14_400.0, 0.0]

def test_build_policyengine_us_export_variable_maps_includes_absent_export_defaults(
self,
):
Expand Down
86 changes: 65 additions & 21 deletions tests/test_cps_source_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ def test_cps_parquet_source_provider_loads_observation_frame(tmp_path):

assert isinstance(provider, SourceProvider)
assert set(frame.tables) == {EntityType.HOUSEHOLD, EntityType.PERSON}
assert frame.tables[EntityType.PERSON]["person_id"].tolist() == ["1:1", "1:2", "2:1"]
assert frame.tables[EntityType.PERSON]["person_id"].tolist() == [
"1:1",
"1:2",
"2:1",
]
assert frame.tables[EntityType.HOUSEHOLD]["year"].tolist() == [2024, 2024]
assert frame.source.archetype is SourceArchetype.HOUSEHOLD_INCOME

Expand Down Expand Up @@ -110,7 +114,11 @@ def test_cps_parquet_source_provider_derives_tax_unit_roles_from_tax_id(tmp_path

provider = CPSASECParquetSourceProvider(data_dir=tmp_path, year=2024)
frame = provider.load_frame(SourceQuery(period=2024))
result = frame.tables[EntityType.PERSON].sort_values("person_number").reset_index(drop=True)
result = (
frame.tables[EntityType.PERSON]
.sort_values("person_number")
.reset_index(drop=True)
)

assert result["tax_unit_id"].tolist() == [100, 100, 100, 101]
assert result["tax_unit_is_joint"].tolist() == [1.0, 1.0, 1.0, 0.0]
Expand Down Expand Up @@ -271,7 +279,9 @@ def test_cps_parquet_source_provider_sampling_respects_household_weights(tmp_pat
assert frame.tables[EntityType.PERSON]["household_id"].tolist() == [3]


def test_cps_parquet_source_provider_applies_generic_atomic_variable_semantics(tmp_path):
def test_cps_parquet_source_provider_applies_generic_atomic_variable_semantics(
tmp_path,
):
households = pd.DataFrame(
{
"household_id": [1],
Expand Down Expand Up @@ -391,6 +401,7 @@ def test_load_cps_asec_caches_household_geography_on_persons(tmp_path):
assert "social_security_survivors" in first.persons.columns
assert "social_security_dependents" in first.persons.columns
assert "receives_wic" in first.persons.columns
assert "spm_unit_pre_subsidy_childcare_expenses" in first.persons.columns
assert "has_marketplace_health_coverage" in first.persons.columns
assert "has_esi" in first.persons.columns
assert "tax_unit_id" in first.persons.columns
Expand All @@ -409,7 +420,16 @@ def test_load_cps_asec_caches_household_geography_on_persons(tmp_path):
assert cached_persons["social_security_survivors"].to_list() == [0.0, 0.0, 0.0]
assert cached_persons["social_security_dependents"].to_list() == [0.0, 0.0, 0.0]
assert cached_persons["receives_wic"].to_list() == [True, False, False]
assert cached_persons["has_marketplace_health_coverage"].to_list() == [True, False, False]
assert cached_persons["spm_unit_pre_subsidy_childcare_expenses"].to_list() == [
0.0,
0.0,
0.0,
]
assert cached_persons["has_marketplace_health_coverage"].to_list() == [
True,
False,
False,
]
assert cached_persons["has_esi"].to_list() == [False, True, False]
assert cached_persons["tax_unit_id"].to_list() == [100, 100, 200]
assert cached_persons["spm_unit_id"].to_list() == [10, 10, 20]
Expand Down Expand Up @@ -458,7 +478,9 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path):
archive.writestr("pppub23.csv", person_rows.to_csv(index=False))

dataset = load_cps_asec(year=2023, cache_dir=tmp_path, download=False)
persons = dataset.persons.to_pandas().sort_values("person_number").reset_index(drop=True)
persons = (
dataset.persons.to_pandas().sort_values("person_number").reset_index(drop=True)
)

assert persons["alimony_income"].tolist() == [1200, 0]
assert persons["child_support_received"].tolist() == [300, 0]
Expand All @@ -476,11 +498,15 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path):
assert persons["takes_up_housing_assistance_if_eligible"].tolist() == [True, False]
assert persons["spm_unit_energy_subsidy"].tolist() == [90, 0]
assert persons["spm_unit_pre_subsidy_childcare_expenses"].tolist() == [1500, 0]
assert persons["health_insurance_premiums_without_medicare_part_b"].tolist() == [900, 0]
assert persons["health_insurance_premiums_without_medicare_part_b"].tolist() == [
900,
0,
]
assert persons["over_the_counter_health_expenses"].tolist() == [120, 0]
assert persons["other_medical_expenses"].tolist() == [450, 0]
assert persons["medicare_part_b_premiums"].tolist() == [600, 0]


def test_load_cps_asec_derives_survivor_and_dependent_social_security(tmp_path):
person_rows = pd.DataFrame(
{
Expand All @@ -497,7 +523,9 @@ def test_load_cps_asec_derives_survivor_and_dependent_social_security(tmp_path):
archive.writestr("pppub23.csv", person_rows.to_csv(index=False))

dataset = load_cps_asec(year=2023, cache_dir=tmp_path, download=False)
persons = dataset.persons.to_pandas().sort_values("person_number").reset_index(drop=True)
persons = (
dataset.persons.to_pandas().sort_values("person_number").reset_index(drop=True)
)

assert persons["social_security_survivors"].tolist() == [1000.0, 1100.0, 0.0, 0.0]
assert persons["social_security_dependents"].tolist() == [0.0, 0.0, 1200.0, 1300.0]
Expand Down Expand Up @@ -535,6 +563,7 @@ def test_cps_source_provider_repeat_loads_are_deterministic_for_cached_processed
"other_medical_expenses": [0.0] * 5,
"over_the_counter_health_expenses": [0.0] * 5,
"medicare_part_b_premiums": [0.0] * 5,
"spm_unit_pre_subsidy_childcare_expenses": [0.0] * 5,
"year": [2023, 2023, 2023, 2023, 2023],
}
)
Expand All @@ -553,13 +582,21 @@ def test_cps_source_provider_repeat_loads_are_deterministic_for_cached_processed
first_persons = first.tables[EntityType.PERSON]
second_persons = second.tables[EntityType.PERSON]

assert first_households["household_id"].tolist() == second_households["household_id"].tolist()
assert (
first_households["household_id"].tolist()
== second_households["household_id"].tolist()
)
assert first_persons["person_id"].tolist() == second_persons["person_id"].tolist()
assert first_households["household_weight"].tolist() == second_households["household_weight"].tolist()
assert (
first_households["household_weight"].tolist()
== second_households["household_weight"].tolist()
)
assert first_persons["weight"].tolist() == second_persons["weight"].tolist()


def test_load_cps_asec_rebuilds_stale_processed_cache_without_pe_presim_inputs(tmp_path):
def test_load_cps_asec_rebuilds_stale_processed_cache_without_pe_presim_inputs(
tmp_path,
):
stale_processed = pl.DataFrame(
{
"household_id": [1, 1, 2],
Expand Down Expand Up @@ -623,16 +660,19 @@ def test_load_cps_asec_rebuilds_stale_processed_cache_without_pe_presim_inputs(t
assert dataset.persons["cps_race"].to_list() == [4, 4, 1]
assert dataset.persons["is_hispanic"].to_list() == [False, True, False]
assert dataset.persons["is_disabled"].to_list() == [False, True, False]
assert dataset.persons["has_marketplace_health_coverage"].to_list() == [True, False, False]
assert dataset.persons["has_marketplace_health_coverage"].to_list() == [
True,
False,
False,
]
assert dataset.persons["has_esi"].to_list() == [False, True, False]
assert dataset.persons["alimony_income"].to_list() == [1200, 0, 0]
assert dataset.persons["child_support_received"].to_list() == [300, 0, 0]
assert dataset.persons["child_support_expense"].to_list() == [700, 0, 0]
assert dataset.persons["disability_benefits"].to_list() == [550, 0, 0]
assert (
dataset.persons["health_insurance_premiums_without_medicare_part_b"].to_list()
== [900, 0, 0]
)
assert dataset.persons[
"health_insurance_premiums_without_medicare_part_b"
].to_list() == [900, 0, 0]
assert dataset.persons["other_medical_expenses"].to_list() == [450, 0, 0]
assert dataset.persons["over_the_counter_health_expenses"].to_list() == [120, 0, 0]
assert dataset.persons["medicare_part_b_premiums"].to_list() == [600, 0, 0]
Expand Down Expand Up @@ -661,9 +701,7 @@ def test_cps_sampling_falls_back_to_uniform_when_weighted_sampling_is_infeasible

def flaky_sample(self, *args, **kwargs):
if kwargs.get("weights") is not None:
raise ValueError(
"Weighted sampling cannot be achieved with replace=False."
)
raise ValueError("Weighted sampling cannot be achieved with replace=False.")
return original_sample(self, *args, **kwargs)

monkeypatch.setattr(pd.DataFrame, "sample", flaky_sample)
Expand Down Expand Up @@ -710,10 +748,14 @@ def test_sample_households_and_persons_state_floor_preserves_state_coverage() ->

assert len(sampled_households) == 3
assert sampled_households["state_fips"].nunique() == 3
assert set(sampled_persons["household_id"]) == set(sampled_households["household_id"])
assert set(sampled_persons["household_id"]) == set(
sampled_households["household_id"]
)


def test_sample_households_and_persons_state_age_floor_preserves_age_band_coverage() -> None:
def test_sample_households_and_persons_state_age_floor_preserves_age_band_coverage() -> (
None
):
households = pd.DataFrame(
{
"household_id": [1, 2, 3, 4, 5, 6],
Expand Down Expand Up @@ -759,4 +801,6 @@ def test_sample_households_and_persons_state_age_floor_preserves_age_band_covera

assert len(sampled_households) == 4
assert observed_keys.issubset(sampled_keys)
assert set(sampled_persons["household_id"]) == set(sampled_households["household_id"])
assert set(sampled_persons["household_id"]) == set(
sampled_households["household_id"]
)
Loading