Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/microplex_us/pipelines/pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any

from microplex_us.vintages import MP_2024

if TYPE_CHECKING:
from microplex.core import SourceProvider

Expand Down Expand Up @@ -95,10 +97,10 @@ def default_policyengine_us_data_rebuild_config(

def default_policyengine_us_data_rebuild_source_providers(
*,
cps_source_year: int = 2023,
cps_source_year: int = MP_2024.cps_asec.release,
cps_cache_dir: str | Path | None = None,
cps_download: bool = True,
puf_target_year: int = 2024,
puf_target_year: int = MP_2024.model_year,
puf_cps_reference_year: int | None = None,
puf_cache_dir: str | Path | None = None,
puf_path: str | Path | None = None,
Expand All @@ -107,9 +109,9 @@ def default_policyengine_us_data_rebuild_source_providers(
include_donor_surveys: bool = True,
include_sipp: bool | None = None,
include_scf: bool | None = None,
acs_year: int = 2024,
sipp_year: int = 2023,
scf_year: int = 2022,
acs_year: int = MP_2024.acs.release,
sipp_year: int = MP_2024.sipp.release,
scf_year: int = MP_2024.scf.release,
donor_cache_dir: str | Path | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
Expand Down
19 changes: 11 additions & 8 deletions src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
write_us_stage_run_manifests_from_artifact_manifest,
)
from microplex_us.variables import prune_redundant_variables
from microplex_us.vintages import MP_2024

if TYPE_CHECKING:
from microplex.core import SourceProvider
Expand Down Expand Up @@ -1969,7 +1970,7 @@ def run_policyengine_us_data_rebuild_checkpoint(
config_overrides: dict[str, Any] | None = None,
providers: tuple[SourceProvider, ...] | list[SourceProvider] | None = None,
queries: dict[str, SourceQuery] | None = None,
cps_source_year: int = 2023,
cps_source_year: int = MP_2024.cps_asec.release,
cps_cache_dir: str | Path | None = None,
cps_download: bool = True,
puf_target_year: int | None = None,
Expand All @@ -1981,9 +1982,9 @@ def run_policyengine_us_data_rebuild_checkpoint(
include_donor_surveys: bool = True,
include_sipp: bool | None = None,
include_scf: bool | None = None,
acs_year: int = 2024,
sipp_year: int = 2023,
scf_year: int = 2022,
acs_year: int = MP_2024.acs.release,
sipp_year: int = MP_2024.sipp.release,
scf_year: int = MP_2024.scf.release,
donor_cache_dir: str | Path | None = None,
policyengine_us_data_repo: str | Path | None = None,
policyengine_us_data_python: str | Path | None = None,
Expand Down Expand Up @@ -2261,12 +2262,14 @@ def main(argv: list[str] | None = None) -> None:
"variables. See docs/next-run-plan.md."
),
)
parser.add_argument("--cps-source-year", type=int, default=2023)
parser.add_argument(
"--cps-source-year", type=int, default=MP_2024.cps_asec.release
)
parser.add_argument("--puf-target-year", type=int)
parser.add_argument("--puf-cps-reference-year", type=int)
parser.add_argument("--acs-year", type=int, default=2024)
parser.add_argument("--sipp-year", type=int, default=2023)
parser.add_argument("--scf-year", type=int, default=2022)
parser.add_argument("--acs-year", type=int, default=MP_2024.acs.release)
parser.add_argument("--sipp-year", type=int, default=MP_2024.sipp.release)
parser.add_argument("--scf-year", type=int, default=MP_2024.scf.release)
parser.add_argument("--cps-cache-dir")
parser.add_argument("--puf-cache-dir")
parser.add_argument("--donor-cache-dir")
Expand Down
183 changes: 183 additions & 0 deletions src/microplex_us/vintages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
"""Single source of truth for the source vintages a built dataset uses.

A :class:`DatasetProfile` declares, in ONE place, the model year a dataset
represents and the exact source *release* feeding each input, plus how that
release's dollars reach the model year (used natively, or aged with a
component-specific factor family).

Build code reads vintages from a profile instead of per-call literal defaults,
so a stale year cannot hide in a function signature, a CLI default, or a
forgotten shell flag: the value is defined once and the safe path is the only
path. (The motivating bug: ``cps_source_year`` defaulted to 2023 -- income year
2022 -- while every production build overrode it to 2025; the stale literal sat
undetected in two function signatures and a CLI default because nothing failed.)

The coherence checks here are the spec the build must satisfy: every source must
reach ``model_year`` -- either it is native to that year (``income_year ==
model_year``) or it declares an ``age_to == model_year`` aging step. A source
that does not yet reach the model year must declare a ``gap_reason`` so the gap
is explicit rather than silent. A future build-time gate verifies a produced
artifact against the active profile; this module guarantees the *profile itself*
is internally consistent.
"""

from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Release:
    """One source release and how its dollars reach a model year.

    Attributes:
        release: the survey/file release actually loaded (e.g. CPS ASEC 2025).
        income_year: the calendar/income year that release represents. CPS ASEC
            survey year ``Y`` covers income year ``Y - 1`` (ASEC 2025 -> 2024);
            most other sources have ``release == income_year``.
        age_to: when set, dollar variables are aged from ``income_year`` to this
            year using ``factors``; when ``None`` the release is used on its
            native basis.
        factors: label of the component-specific growth-factor family used when
            aging (e.g. ``"soi"``). Required iff ``age_to`` is set, and must be
            a non-blank label; the build binds the label to the actual factor
            implementation.
        gap_reason: explicit, temporary acknowledgement that this source does not
            yet reach the model year (e.g. aging not wired). Lets a profile stay
            honest about a known gap without silently passing coherence.

    Raises:
        ValueError: if ``age_to`` and ``factors`` are not set together, if
            ``factors`` or ``gap_reason`` is blank, or if ``age_to`` is earlier
            than ``income_year``.
    """

    release: int
    income_year: int
    age_to: int | None = None
    factors: str | None = None
    gap_reason: str | None = None

    def __post_init__(self) -> None:
        """Validate that the aging and gap declarations are self-consistent."""
        if self.age_to is not None and self.factors is None:
            raise ValueError(
                f"Release(release={self.release}) sets age_to={self.age_to} but "
                "no `factors` family to age with."
            )
        if self.age_to is None and self.factors is not None:
            raise ValueError(
                f"Release(release={self.release}) sets factors={self.factors!r} "
                "but no `age_to` year to age toward."
            )
        # A blank label would satisfy the "factors is set" pairing above while
        # naming no factor family at all; reject it the same way a blank
        # gap_reason is rejected below.
        if self.factors is not None and not self.factors.strip():
            raise ValueError(
                f"Release(release={self.release}) has an empty factors label; "
                "name the growth-factor family used to reach age_to."
            )
        if self.age_to is not None and self.age_to < self.income_year:
            raise ValueError(
                f"Release(release={self.release}) ages backward: age_to="
                f"{self.age_to} < income_year={self.income_year}."
            )
        if self.gap_reason is not None and not self.gap_reason.strip():
            raise ValueError(
                f"Release(release={self.release}) has an empty gap_reason; use None "
                "for no declared gap or give a real explanation."
            )

    @property
    def effective_year(self) -> int:
        """The model year this release's dollars land on after any aging."""
        return self.age_to if self.age_to is not None else self.income_year


@dataclass(frozen=True)
class DatasetProfile:
    """Vintage definition of a single built dataset.

    ``model_year`` names the year the dataset represents. Construction fails
    with ``ValueError`` unless each source either lands on that year or carries
    a ``gap_reason``.
    """

    name: str
    model_year: int
    cps_asec: Release
    puf: Release
    acs: Release
    sipp: Release
    scf: Release

    def sources(self) -> dict[str, Release]:
        """Return every source release keyed by field name, in declaration order."""
        source_fields = ("cps_asec", "puf", "acs", "sipp", "scf")
        return {field_name: getattr(self, field_name) for field_name in source_fields}

    def incoherent_sources(self) -> dict[str, str]:
        """Explain each source that misses ``model_year`` without a declared gap.

        Sources carrying a ``gap_reason`` are never reported here; an empty
        mapping means the profile is coherent.
        """
        target = self.model_year
        return {
            label: (
                f"reaches {src.effective_year} (release {src.release}, "
                f"income {src.income_year}, age_to {src.age_to}); "
                f"model_year is {target}"
            )
            for label, src in self.sources().items()
            if src.gap_reason is None and src.effective_year != target
        }

    def declared_gaps(self) -> dict[str, str]:
        """Return the stated reason for each source with an acknowledged gap."""
        gaps: dict[str, str] = {}
        for label, src in self.sources().items():
            reason = src.gap_reason
            if reason is None:
                continue
            gaps[label] = reason
        return gaps

    def __post_init__(self) -> None:
        """Reject the profile when any source is incoherent and undeclared."""
        failures = self.incoherent_sources()
        if not failures:
            return
        detail = "; ".join(f"{label}: {why}" for label, why in failures.items())
        raise ValueError(
            f"DatasetProfile {self.name!r} is incoherent: every source must "
            f"reach model_year {self.model_year} or declare a gap_reason. {detail}"
        )


# The current Microplex eCPS-replacement target: a 2024 base dataset that
# replaces ``enhanced_cps_2024``. Source releases match what the production build
# loads today; the aging declarations are the spec the build satisfies (PUF ages
# via SOI factors; SIPP/SCF aging to 2024 landed in #185). ACS is the one
# declared gap: the donor still loads its 2022 release un-aged, which is recorded
# through ``gap_reason`` rather than treated as reaching the model year.
MP_2024 = DatasetProfile(
    name="mp_2024",
    model_year=2024,
    # CPS ASEC survey year 2025 == income/calendar year 2024: native 2024 spine.
    cps_asec=Release(release=2025, income_year=2024),
    # Public-use PUF base is 2015 (latest released); aged to 2024 via SOI factors.
    puf=Release(release=2015, income_year=2015, age_to=2024, factors="soi"),
    # ACS donor is pinned to the 2022 release (manifest block ACS_2022,
    # default_year=2022) and is not in TARGET_YEAR_UPRATED_SURVEYS, so it is not
    # aged to the model year. The provider default had drifted to 2024 while the
    # loader stayed at 2022; declare the real release and flag the gap so the
    # ACS-2024 migration is explicit rather than silently assumed done.
    acs=Release(
        release=2022,
        income_year=2022,
        gap_reason=(
            "ACS donor loads the 2022 release (manifest ACS_2022) and is not aged "
            "to the model year; reconcile when ACS moves to the 2024 release."
        ),
    ),
    # SIPP/SCF donors aged from their latest releases to 2024.
    sipp=Release(release=2023, income_year=2023, age_to=2024, factors="pe_growfactors"),
    scf=Release(release=2022, income_year=2022, age_to=2024, factors="pe_growfactors"),
)


# Registry of every known dataset profile, keyed by profile name.
PROFILES: dict[str, DatasetProfile] = {MP_2024.name: MP_2024}


def get_profile(name: str) -> DatasetProfile:
    """Look up a dataset profile by name.

    Raises:
        KeyError: if ``name`` is not registered; the message lists the known
            profile names in sorted order.
    """
    if name in PROFILES:
        return PROFILES[name]
    known = ", ".join(sorted(PROFILES))
    # ``from None`` keeps the traceback free of any surrounding exception
    # context, so callers only see the descriptive message.
    raise KeyError(
        f"Unknown dataset profile {name!r}; known profiles: {known}"
    ) from None
38 changes: 37 additions & 1 deletion tests/pipelines/test_pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund
SOCIAL_SECURITY_SPLIT_STRATEGY_PE_QRF
)
assert isinstance(providers[2], ACSSourceProvider)
assert providers[2].year == 2024
# ACS is pinned to its 2022 release (manifest ACS_2022, not aged); the default
# derives from MP_2024.acs.release. The prior 2024 here disagreed with the
# loader -- see the declared ACS gap in the vintage profile.
assert providers[2].year == 2022
assert isinstance(providers[3], SIPPSourceProvider)
assert providers[3].block == "tips"
assert providers[3].target_year == 2024
Expand Down Expand Up @@ -203,3 +206,36 @@ def test_build_policyengine_us_data_rebuild_pipeline_returns_configured_pipeline
assert pipeline.config.calibration_max_iter == 77
assert pipeline.config.synthesis_backend == "seed"
assert pipeline.config.calibration_backend == "entropy"


def test_source_provider_year_defaults_derive_from_mp_2024_profile() -> None:
    """Year defaults in both rebuild entry points must come from ``MP_2024``."""
    import inspect

    from microplex_us.vintages import MP_2024

    # Release-year parameters shared by both signatures, with the profile value
    # each default has to equal. Deriving them from the single-source-of-truth
    # profile means a stale literal cannot silently return.
    release_by_param = {
        "cps_source_year": MP_2024.cps_asec.release,
        "acs_year": MP_2024.acs.release,
        "sipp_year": MP_2024.sipp.release,
        "scf_year": MP_2024.scf.release,
    }

    provider_signature = inspect.signature(
        default_policyengine_us_data_rebuild_source_providers
    )
    # The CPS spine default is the profile's ASEC release (2025 = income year
    # 2024), not the 2023 that used to require a CLI override to be correct.
    assert release_by_param["cps_source_year"] == 2025
    for param_name, expected in release_by_param.items():
        assert provider_signature.parameters[param_name].default == expected
    assert (
        provider_signature.parameters["puf_target_year"].default
        == MP_2024.model_year
    )

    # The checkpoint signature is the second of the three sites the stale literal
    # used to live in; guard it too so a revert there cannot pass silently.
    from microplex_us.pipelines.pe_us_data_rebuild_checkpoint import (
        run_policyengine_us_data_rebuild_checkpoint,
    )

    checkpoint_signature = inspect.signature(
        run_policyengine_us_data_rebuild_checkpoint
    )
    for param_name, expected in release_by_param.items():
        assert checkpoint_signature.parameters[param_name].default == expected
Loading
Loading