Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/microplex_us/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
"PolicyEngineUSTargetEvaluation",
"PolicyEngineUSTargetEvaluationReport",
"PolicyEngineUSVariableBinding",
"build_policyengine_us_export_column_names",
"build_policyengine_us_time_period_arrays",
"compare_policyengine_us_target_query_to_baseline",
"compile_policyengine_us_household_linear_constraints",
Expand Down Expand Up @@ -346,6 +347,7 @@ def __getattr__(name: str) -> Any:
"PolicyEngineUSTargetEvaluation",
"PolicyEngineUSTargetEvaluationReport",
"PolicyEngineUSVariableBinding",
"build_policyengine_us_export_column_names",
"build_policyengine_us_time_period_arrays",
"compare_policyengine_us_target_query_to_baseline",
"compile_policyengine_us_household_linear_constraints",
Expand Down
61 changes: 59 additions & 2 deletions src/microplex_us/pipelines/check_export_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
python -m microplex_us.pipelines.check_export_columns export.h5
python -m microplex_us.pipelines.check_export_columns \\
--columns-json columns.json
python -m microplex_us.pipelines.check_export_columns \\
--entity-tables checkpoints/post-imputation
python -m microplex_us.pipelines.check_export_columns export.h5 \\
--contract custom_contract.json

Expand Down Expand Up @@ -149,6 +151,29 @@ def _columns_from_json(json_path: Path) -> set[str]:
return {str(name).split("/")[0] for name in names}


def _columns_from_entity_tables(
entity_tables_path: Path,
*,
direct_override_variables: tuple[str, ...] = (),
) -> set[str]:
"""Return export column names from a saved PE entity-table checkpoint.

This is the pre-calibration path: post-imputation entity tables already
determine the final H5 schema, while calibration only changes weights.
Imports stay deferred so the JSON/H5 fast paths do not import Microplex.
"""
from microplex_us.policyengine.us import (
build_policyengine_us_export_column_names,
load_us_pipeline_checkpoint,
)

tables, _metadata = load_us_pipeline_checkpoint(entity_tables_path)
return build_policyengine_us_export_column_names(
tables,
direct_override_variables=direct_override_variables,
)


def _bullet_lines(items: list[str]) -> list[str]:
"""Render a list as indented bullets, or a placeholder if empty."""
if not items:
Expand Down Expand Up @@ -206,6 +231,25 @@ def main(argv: list[str] | None = None) -> int:
"H5 (the no-data CI path). Mutually exclusive with h5path."
),
)
parser.add_argument(
"--entity-tables",
metavar="DIR",
help=(
"Path to a saved PolicyEngine entity-table checkpoint/stage "
"directory (for example checkpoints/post-imputation). Checks "
"the export schema before microsimulation/calibration/H5."
),
)
parser.add_argument(
"--direct-override-variable",
action="append",
default=[],
metavar="VARIABLE",
help=(
"PolicyEngine formula variable intentionally exported from source "
"data. Repeat for each override used by the build."
),
)
parser.add_argument(
"--contract",
metavar="FILE",
Expand All @@ -214,8 +258,15 @@ def main(argv: list[str] | None = None) -> int:
)
args = parser.parse_args(argv)

if bool(args.h5path) == bool(args.columns_json):
parser.error("provide exactly one of an H5 path or --columns-json.")
selected_inputs = [
bool(args.h5path),
bool(args.columns_json),
bool(args.entity_tables),
]
if sum(selected_inputs) != 1:
parser.error(
"provide exactly one of an H5 path, --columns-json, or --entity-tables."
)

contract = load_contract(Path(args.contract))
required = set(contract["required"])
Expand All @@ -226,6 +277,12 @@ def main(argv: list[str] | None = None) -> int:
if args.columns_json:
source = args.columns_json
present = _columns_from_json(Path(args.columns_json))
elif args.entity_tables:
source = args.entity_tables
present = _columns_from_entity_tables(
Path(args.entity_tables),
direct_override_variables=tuple(args.direct_override_variable),
)
else:
source = args.h5path
present = _columns_from_h5(Path(args.h5path))
Expand Down
27 changes: 24 additions & 3 deletions src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,15 +1111,20 @@ def _attach_checkpoint_registry_and_index(
run_registry_metadata: dict[str, Any] | None,
) -> tuple[Path | None, Path | None]:
if (
manifest.get("calibration", {}).get("full_oracle_capped_mean_abs_relative_error")
manifest.get("calibration", {}).get(
"full_oracle_capped_mean_abs_relative_error"
)
is None
and manifest.get("calibration", {}).get("full_oracle_mean_abs_relative_error")
is None
and "policyengine_harness" not in manifest
and "policyengine_native_scores" not in manifest
):
return None, None
if "policyengine_harness" not in manifest and "policyengine_native_scores" not in manifest:
if (
"policyengine_harness" not in manifest
and "policyengine_native_scores" not in manifest
):
resolved_harness_payload = None
else:
resolved_harness_payload = (
Expand Down Expand Up @@ -1637,7 +1642,9 @@ def attach_policyengine_us_data_rebuild_checkpoint_evidence(
_refresh_checkpoint_data_flow_snapshot(
artifact_root,
manifest,
extra_outputs=(native_audit_path.name,) if native_audit_path is not None else (),
extra_outputs=(native_audit_path.name,)
if native_audit_path is not None
else (),
)
_write_json_atomically(manifest_path, manifest)
return PEUSDataRebuildCheckpointEvidenceResult(
Expand Down Expand Up @@ -2176,6 +2183,16 @@ def main(argv: list[str] | None = None) -> None:
"to skip the ~11 h synthesis stage."
),
)
parser.add_argument(
"--policyengine-export-column-contract-path",
type=str,
default=None,
help=(
"If set, check the eCPS export-column contract from the "
"post-imputation PE entity tables before microsimulation and "
"calibration."
),
)
parser.add_argument(
"--pipeline-checkpoint-save-post-microsim-path",
type=str,
Expand Down Expand Up @@ -2223,6 +2240,10 @@ def main(argv: list[str] | None = None) -> None:
config_overrides["pipeline_checkpoint_save_post_imputation_path"] = (
args.pipeline_checkpoint_save_post_imputation_path
)
if args.policyengine_export_column_contract_path is not None:
config_overrides["policyengine_export_column_contract_path"] = (
args.policyengine_export_column_contract_path
)
if args.pipeline_checkpoint_save_post_microsim_path is not None:
config_overrides["pipeline_checkpoint_save_post_microsim_path"] = (
args.pipeline_checkpoint_save_post_microsim_path
Expand Down
80 changes: 72 additions & 8 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@
PESourceImputeBlockRunRequest,
PESourceImputeConditionedBlockRunRequest,
)
from microplex_us.pipelines.check_export_columns import (
_format_report as _format_export_column_report,
)
from microplex_us.pipelines.check_export_columns import (
compute_column_diff,
load_contract,
)
from microplex_us.pipelines.donor_imputers import (
ColumnwiseQRFDonorImputer,
RegimeAwareDonorImputer,
Expand All @@ -83,6 +90,7 @@
PolicyEngineUSMicrosimulationAdapter,
PolicyEngineUSQuantityTarget,
PolicyEngineUSVariableBinding,
build_policyengine_us_export_column_names,
build_policyengine_us_export_variable_maps,
build_policyengine_us_time_period_arrays,
compile_supported_policyengine_us_household_linear_constraints,
Expand Down Expand Up @@ -136,10 +144,11 @@ def _normalize_household_county_fips_series(
& state_numeric.notna()
& state_numeric.gt(0)
)
combined.loc[county_fragment_mask] = (
state_numeric.loc[county_fragment_mask].round().astype(int) * 1000
+ county_numeric.loc[county_fragment_mask].round().astype(int)
)
combined.loc[county_fragment_mask] = state_numeric.loc[
county_fragment_mask
].round().astype(int) * 1000 + county_numeric.loc[
county_fragment_mask
].round().astype(int)
normalized = combined.round().astype("Int64").astype("string").str.zfill(5)
invalid = combined.isna() | combined.le(0)
return normalized.mask(invalid).astype("string")
Expand Down Expand Up @@ -206,7 +215,9 @@ def _attach_household_census_geographies(
assigned_blocks = pd.Series(pd.NA, index=result.index, dtype="string")
state_values = _normalize_household_state_fips_series(result["state_fips"])
county_values = (
_normalize_household_county_fips_series(result["county_fips"], result["state_fips"])
_normalize_household_county_fips_series(
result["county_fips"], result["state_fips"]
)
if "county_fips" in result.columns
else pd.Series(pd.NA, index=result.index, dtype="string")
)
Expand Down Expand Up @@ -1697,6 +1708,14 @@ class USMicroplexBuildConfig:
policyengine_baseline_dataset: str | None = None
policyengine_dataset_year: int | None = None
policyengine_direct_override_variables: tuple[str, ...] = ()
policyengine_export_column_contract_path: str | Path | None = None
"""Optional eCPS export-column contract checked before calibration.

When set, the pipeline verifies the final H5 column surface from the
post-imputation PE entity tables, then fails before microsimulation or
calibration if required columns are missing or forbidden columns would be
exported.
"""
policyengine_prefer_existing_tax_unit_ids: bool = True
policyengine_quantity_targets: tuple[PolicyEngineUSQuantityTarget, ...] = ()
policyengine_targets_db: str | None = None
Expand Down Expand Up @@ -2120,6 +2139,10 @@ def build_from_frames(
"US microplex build: post-imputation checkpoint saved",
path=str(self.config.pipeline_checkpoint_save_post_imputation_path),
)
self._check_policyengine_export_column_contract(
synthetic_tables,
stage="pre_calibration",
)
_emit_us_pipeline_progress(
"US microplex build: policyengine calibration start",
backend=self.config.calibration_backend,
Expand Down Expand Up @@ -2365,9 +2388,7 @@ def prepare_seed_data_from_source(
seed_data["tenure"] = seed_data["tenure"].fillna(0).astype(int)
seed_data["state_fips"] = seed_data["state_fips"].fillna(0).astype(int)
seed_data["county_fips"] = (
seed_data["county_fips"]
.map(normalize_us_county_fips)
.fillna("00000")
seed_data["county_fips"].map(normalize_us_county_fips).fillna("00000")
)
if "block_geoid" in seed_data.columns:
seed_data["block_geoid"] = seed_data["block_geoid"].fillna("").astype(str)
Expand Down Expand Up @@ -3918,6 +3939,49 @@ def _append_stage_summary(
warnings.warn(message, stacklevel=2)
return updated_tables, calibrated_persons, summary

def _check_policyengine_export_column_contract(
self,
tables: PolicyEngineUSEntityTableBundle,
*,
stage: str,
) -> None:
contract_path = self.config.policyengine_export_column_contract_path
if contract_path is None:
return

tax_benefit_system = self._resolve_policyengine_tax_benefit_system()
contract = load_contract(Path(contract_path))
present = build_policyengine_us_export_column_names(
tables,
tax_benefit_system=tax_benefit_system,
direct_override_variables=self.config.policyengine_direct_override_variables,
)
diff = compute_column_diff(
present,
required=set(contract["required"]),
forbidden=set(contract["forbidden"]),
optional=set(contract["ecps_internal_optional"]),
excluded=set(contract.get("formula_owned_excluded", [])),
)
_emit_us_pipeline_progress(
"US microplex build: policyengine export columns check complete",
stage=stage,
status="pass" if diff.ok else "fail",
columns_present=int(len(present)),
missing_required=int(len(diff.missing_required)),
forbidden_present=int(len(diff.forbidden_present)),
)
if diff.ok:
return
report = _format_export_column_report(
diff,
source=f"{stage}:{contract_path}",
n_present=len(present),
n_required=len(contract["required"]),
n_forbidden=len(contract["forbidden"]),
)
raise ValueError(report)

def _build_forbes_fixed_spine(self) -> ForbesFixedSpine | None:
path = self.config.forbes_fixed_spine_records_path
if path is None:
Expand Down
2 changes: 2 additions & 0 deletions src/microplex_us/policyengine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
PolicyEngineUSMicrosimulationAdapter,
PolicyEngineUSQuantityTarget,
PolicyEngineUSVariableBinding,
build_policyengine_us_export_column_names,
build_policyengine_us_time_period_arrays,
compile_policyengine_us_household_linear_constraints,
compute_policyengine_us_definition_hash,
Expand Down Expand Up @@ -72,6 +73,7 @@
"PolicyEngineUSMicrosimulationAdapter",
"PolicyEngineUSQuantityTarget",
"PolicyEngineUSVariableBinding",
"build_policyengine_us_export_column_names",
"build_policyengine_us_time_period_arrays",
"compile_policyengine_us_household_linear_constraints",
"compute_policyengine_us_definition_hash",
Expand Down
Loading
Loading