From 2a4fd5d66fae39628c99f5953bbc2cf9a94255d5 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Mon, 1 Jun 2026 21:16:05 -0400 Subject: [PATCH 1/3] Publish PE-native target diagnostics sidecar --- src/microplex_us/pipelines/__init__.py | 1 + src/microplex_us/pipelines/artifacts.py | 2 + .../pipelines/backfill_pe_native_audit.py | 31 +++- src/microplex_us/pipelines/experiments.py | 15 ++ .../pipelines/pe_native_scores.py | 117 +++++++++++- .../pe_us_data_rebuild_checkpoint.py | 174 +++++++++++++++++- src/microplex_us/pipelines/stage_contracts.py | 20 ++ src/microplex_us/pipelines/stage_run.py | 9 + src/microplex_us/pipelines/stage_status.py | 1 + .../pipelines/stage_validation_evidence.py | 1 + .../test_backfill_pe_native_audit.py | 37 ++++ tests/pipelines/test_pe_native_scores.py | 52 ++++++ .../test_pe_us_data_rebuild_checkpoint.py | 51 +++++ 13 files changed, 497 insertions(+), 14 deletions(-) diff --git a/src/microplex_us/pipelines/__init__.py b/src/microplex_us/pipelines/__init__.py index 353faa9..057de2a 100644 --- a/src/microplex_us/pipelines/__init__.py +++ b/src/microplex_us/pipelines/__init__.py @@ -120,6 +120,7 @@ def _exports(module: str, names: tuple[str, ...]) -> dict[str, str]: "microplex_us.pipelines.pe_native_scores", ( "PolicyEngineUSEnhancedCPSNativeScores", + "build_us_pe_native_target_diagnostics_payload", "compare_us_pe_native_target_deltas", "compute_batch_us_pe_native_scores", "compute_policyengine_us_enhanced_cps_native_scores", diff --git a/src/microplex_us/pipelines/artifacts.py b/src/microplex_us/pipelines/artifacts.py index 39c2871..61af296 100644 --- a/src/microplex_us/pipelines/artifacts.py +++ b/src/microplex_us/pipelines/artifacts.py @@ -98,6 +98,7 @@ class USMicroplexArtifactPaths: policyengine_harness: Path | None = None policyengine_native_scores: Path | None = None policyengine_native_audit: Path | None = None + policyengine_native_target_diagnostics: Path | None = None child_tax_unit_agi_drift: Path | None = None capital_gains_lots: Path | None = None source_weight_diagnostics: Path | None = None @@ -1286,6 +1287,7 @@ def save_us_microplex_artifacts( policyengine_harness=policyengine_harness_path, policyengine_native_scores=policyengine_native_scores_path, policyengine_native_audit=None, + policyengine_native_target_diagnostics=None, child_tax_unit_agi_drift=child_tax_unit_agi_drift_path, capital_gains_lots=capital_gains_lots_path, source_weight_diagnostics=source_weight_diagnostics_path, diff --git a/src/microplex_us/pipelines/backfill_pe_native_audit.py b/src/microplex_us/pipelines/backfill_pe_native_audit.py index ba549c8..9f343de 100644 --- a/src/microplex_us/pipelines/backfill_pe_native_audit.py +++ b/src/microplex_us/pipelines/backfill_pe_native_audit.py @@ -13,6 +13,7 @@ discover_us_candidate_artifact_dirs, ) from microplex_us.pipelines.pe_native_scores import ( + build_us_pe_native_target_diagnostics_payload, compute_batch_us_pe_native_support_audits, compute_batch_us_pe_native_target_deltas, ) @@ -245,10 +246,38 @@ def _write_native_audit_payload_to_bundle( artifacts["policyengine_native_audit"] = str( native_audit_path.relative_to(bundle_dir) ) + extra_outputs = [native_audit_path.name] + target_delta_payload = payload.get("targetDelta") + if isinstance(target_delta_payload, dict): + target_diagnostics = build_us_pe_native_target_diagnostics_payload( + period=int(payload.get("period") or 2024), + from_label="policyengine-us-data", + to_label="microplex-us", + policyengine_targets_db_path=dict(manifest.get("config", {})).get( + "policyengine_targets_db" + ), + target_delta_payload=target_delta_payload, + ) + target_diagnostics_path = resolve_us_stage_artifact_contract_path( + bundle_dir, + "09_validation_benchmarking", + "policyengine_native_target_diagnostics", + ) + target_diagnostics_path.write_text( + json.dumps(target_diagnostics, indent=2, sort_keys=True) + ) + artifacts["policyengine_native_target_diagnostics"] = str( + target_diagnostics_path.relative_to(bundle_dir) + ) + extra_outputs.append(target_diagnostics_path.name) manifest["artifacts"] = artifacts manifest["policyengine_native_audit"] = dict(payload.get("verdictHints", {})) - _refresh_checkpoint_data_flow_snapshot(bundle_dir, manifest) + _refresh_checkpoint_data_flow_snapshot( + bundle_dir, + manifest, + extra_outputs=tuple(extra_outputs), + ) assert_valid_benchmark_artifact_manifest( manifest, artifact_dir=bundle_dir, diff --git a/src/microplex_us/pipelines/experiments.py b/src/microplex_us/pipelines/experiments.py index 4436475..b4ba6ae 100644 --- a/src/microplex_us/pipelines/experiments.py +++ b/src/microplex_us/pipelines/experiments.py @@ -292,6 +292,11 @@ def to_dict(self) -> dict[str, Any]: if self.artifact_paths.policyengine_native_audit is not None else None ), + "policyengine_native_target_diagnostics": ( + str(self.artifact_paths.policyengine_native_target_diagnostics) + if self.artifact_paths.policyengine_native_target_diagnostics is not None + else None + ), "capital_gains_lots": ( str(self.artifact_paths.capital_gains_lots) if self.artifact_paths.capital_gains_lots is not None @@ -380,6 +385,12 @@ def from_dict(cls, payload: dict[str, Any]) -> USMicroplexExperimentResult: if artifact_paths.get("policyengine_native_audit") is not None else None ), + policyengine_native_target_diagnostics=( + Path(artifact_paths["policyengine_native_target_diagnostics"]) + if artifact_paths.get("policyengine_native_target_diagnostics") + is not None + else None + ), capital_gains_lots=( Path(artifact_paths["capital_gains_lots"]) if artifact_paths.get("capital_gains_lots") is not None @@ -819,6 +830,10 @@ def _refresh_experiment_artifact_paths( artifact_root, artifacts.get("policyengine_native_audit"), ), + policyengine_native_target_diagnostics=_resolve_optional_result_artifact_path( + artifact_root, + artifacts.get("policyengine_native_target_diagnostics"), + ), run_registry=Path(run_registry_path), run_index_db=run_index_path, ) diff --git a/src/microplex_us/pipelines/pe_native_scores.py b/src/microplex_us/pipelines/pe_native_scores.py index 21a8df4..90ddb7e 100644 --- a/src/microplex_us/pipelines/pe_native_scores.py +++ b/src/microplex_us/pipelines/pe_native_scores.py @@ -3213,19 +3213,67 @@ def write_us_pe_native_target_diagnostics( ) -> Path: """Write the full PE-native per-target diagnostic dataset to disk.""" - payload = compare_us_pe_native_target_deltas( + payload = build_us_pe_native_target_diagnostics_payload( from_dataset_path=from_dataset_path, to_dataset_path=to_dataset_path, period=period, top_k=top_k, + from_label=from_label, + to_label=to_label, policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=policyengine_us_data_python, + policyengine_targets_db_path=policyengine_targets_db_path, + ) + destination = Path(output_path) + destination.parent.mkdir(parents=True, exist_ok=True) + destination.write_text(json.dumps(payload, indent=2, sort_keys=True)) + return destination + + +def build_us_pe_native_target_diagnostics_payload( + *, + from_dataset_path: str | Path | None = None, + to_dataset_path: str | Path | None = None, + period: int = 2024, + top_k: int = 50, + from_label: str = "policyengine-us-data", + to_label: str = "microplex-us", + policyengine_us_data_repo: str | Path | None = None, + policyengine_us_data_python: str | Path | None = None, + policyengine_targets_db_path: str | Path | None = None, + target_delta_payload: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build the full PE-native per-target diagnostic payload. + + When ``target_delta_payload`` is supplied, the caller is responsible for + ensuring it compares the same baseline/candidate datasets and period. + """ + + payload = ( + dict(target_delta_payload) + if target_delta_payload is not None + else compare_us_pe_native_target_deltas( + from_dataset_path=_required_dataset_path( + from_dataset_path, + label="from_dataset_path", + ), + to_dataset_path=_required_dataset_path( + to_dataset_path, + label="to_dataset_path", + ), + period=period, + top_k=top_k, + policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_data_python=policyengine_us_data_python, + ) ) payload["diagnostic_schema_version"] = 1 payload["dataset_labels"] = { "from": from_label, "to": to_label, } + payload.setdefault("baseline_dataset", payload.get("from_dataset")) + payload.setdefault("candidate_dataset", payload.get("to_dataset")) target_db_path = ( Path(policyengine_targets_db_path).expanduser() if policyengine_targets_db_path is not None @@ -3236,10 +3284,69 @@ def write_us_pe_native_target_diagnostics( target_db_path=target_db_path, period=period, ) - destination = Path(output_path) - destination.parent.mkdir(parents=True, exist_ok=True) - destination.write_text(json.dumps(payload, indent=2, sort_keys=True)) - return destination + _add_policyengine_target_diagnostic_aliases(payload) + return payload + + +def _required_dataset_path(value: str | Path | None, *, label: str) -> str | Path: + if value is None: + raise ValueError( + f"{label} is required when target_delta_payload is not supplied" + ) + return value + + +def _add_policyengine_target_diagnostic_aliases(payload: dict[str, Any]) -> None: + """Add dashboard-friendly aliases while preserving the native delta schema.""" + + baseline_dataset = payload.get("from_dataset") + candidate_dataset = payload.get("to_dataset") + baseline_label = payload.get("dataset_labels", {}).get("from") + candidate_label = payload.get("dataset_labels", {}).get("to") + for row in payload.get("targets", ()): + if not isinstance(row, dict): + continue + target_value = row.get("target_value") + from_estimate = row.get("from_estimate") + to_estimate = row.get("to_estimate") + from_absolute_error = _absolute_error_or_none(from_estimate, target_value) + to_absolute_error = _absolute_error_or_none(to_estimate, target_value) + row.setdefault("target_id", row.get("policyengine_target_id") or row.get("target_name")) + row.setdefault("baseline_dataset", baseline_dataset) + row.setdefault("candidate_dataset", candidate_dataset) + row.setdefault("baseline_label", baseline_label) + row.setdefault("candidate_label", candidate_label) + row.setdefault("us_data_aggregate", from_estimate) + row.setdefault("microplex_aggregate", to_estimate) + row.setdefault("us_data_absolute_error", from_absolute_error) + row.setdefault("microplex_absolute_error", to_absolute_error) + row.setdefault("us_data_relative_error", row.get("from_rel_error")) + row.setdefault("microplex_relative_error", row.get("to_rel_error")) + if from_absolute_error is not None and to_absolute_error is not None: + row.setdefault( + "delta_absolute_error", + to_absolute_error - from_absolute_error, + ) + row.setdefault( + "delta_relative_error", + _delta_or_none(row.get("to_rel_error"), row.get("from_rel_error")), + ) + row.setdefault("loss_contribution", row.get("to_weighted_term")) + row.setdefault("family", row.get("target_family")) + row.setdefault("in_loss", True) + row.setdefault("supported_by_microplex", True) + + +def _absolute_error_or_none(value: Any, target: Any) -> float | None: + if value is None or target is None: + return None + return abs(float(value) - float(target)) + + +def _delta_or_none(value: Any, baseline: Any) -> float | None: + if value is None or baseline is None: + return None + return float(value) - float(baseline) def main(argv: list[str] | None = None) -> int: diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py index 8c7e681..faa6875 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py @@ -54,8 +54,10 @@ select_us_microplex_frontier_entry, ) from microplex_us.pipelines.stage_contracts import ( + canonicalize_us_pipeline_stage_id, resolve_us_stage_artifact_contract_path, ) +from microplex_us.pipelines.stage_metrics import stage_metrics from microplex_us.pipelines.stage_run import ( USStageInputOverride, parse_us_stage_input_override, @@ -114,6 +116,8 @@ class PEUSDataRebuildCheckpointResult: parity_payload: dict[str, Any] native_audit_path: Path | None = None native_audit_payload: dict[str, Any] | None = None + native_target_diagnostics_path: Path | None = None + native_target_diagnostics_payload: dict[str, Any] | None = None imputation_ablation_path: Path | None = None imputation_ablation_payload: dict[str, Any] | None = None @@ -130,6 +134,8 @@ class PEUSDataRebuildCheckpointEvidenceResult: parity_payload: dict[str, Any] native_audit_path: Path | None = None native_audit_payload: dict[str, Any] | None = None + native_target_diagnostics_path: Path | None = None + native_target_diagnostics_payload: dict[str, Any] | None = None imputation_ablation_path: Path | None = None imputation_ablation_payload: dict[str, Any] | None = None @@ -1016,10 +1022,20 @@ def _refresh_checkpoint_data_flow_snapshot( "checkpoint_extra_outputs", list(extra_outputs), ) - updated_manifest = write_us_stage_run_manifests_from_artifact_manifest( - artifact_root, - manifest, - ) + try: + updated_manifest = write_us_stage_run_manifests_from_artifact_manifest( + artifact_root, + manifest, + ) + except ValueError as exc: + manifest.setdefault("diagnostics", {})["checkpoint_stage_refresh_error"] = ( + f"{type(exc).__name__}: {exc}" + ) + return _patch_checkpoint_data_flow_snapshot_outputs( + artifact_root, + manifest=manifest, + extra_outputs=extra_outputs, + ) manifest.clear() manifest.update(updated_manifest) snapshot_path = resolve_us_stage_artifact_contract_path( @@ -1027,9 +1043,89 @@ def _refresh_checkpoint_data_flow_snapshot( "08_dataset_assembly", "data_flow_snapshot", ) + if extra_outputs: + return _patch_checkpoint_data_flow_snapshot_outputs( + artifact_root, + manifest=manifest, + extra_outputs=extra_outputs, + ) return snapshot_path if snapshot_path.exists() else None +def _patch_checkpoint_data_flow_snapshot_outputs( + artifact_root: Path, + *, + manifest: dict[str, Any], + extra_outputs: tuple[str, ...], +) -> Path | None: + snapshot_path = resolve_us_stage_artifact_contract_path( + artifact_root, + "08_dataset_assembly", + "data_flow_snapshot", + ) + if not snapshot_path.exists(): + return None + snapshot = json.loads(snapshot_path.read_text()) + stages = snapshot.get("stages") + if not isinstance(stages, list): + return snapshot_path + validation_stage = None + for stage in stages: + if not isinstance(stage, dict): + continue + stage_id = str(stage.get("id", "")) + if canonicalize_us_pipeline_stage_id(stage_id) == "09_validation_benchmarking": + validation_stage = stage + stage["id"] = "09_validation_benchmarking" + break + if validation_stage is None: + validation_stage = { + "id": "09_validation_benchmarking", + "outputs": [], + "metrics": [], + "status": "ready", + } + stages.append(validation_stage) + existing_outputs = list(validation_stage.get("outputs") or ()) + validation_stage["outputs"] = list( + dict.fromkeys( + [ + *existing_outputs, + *_checkpoint_validation_output_names(manifest), + *extra_outputs, + ] + ) + ) + if not validation_stage.get("metrics"): + validation_stage["metrics"] = stage_metrics( + "09_validation_benchmarking", + manifest=manifest, + ) + if extra_outputs: + validation_stage["status"] = "ready" + else: + validation_stage.setdefault("status", "ready") + _write_json_atomically(snapshot_path, snapshot) + return snapshot_path + + +def _checkpoint_validation_output_names(manifest: dict[str, Any]) -> tuple[str, ...]: + artifacts = dict(manifest.get("artifacts", {})) + ordered_keys = ( + "policyengine_harness", + "policyengine_native_scores", + "imputation_ablation", + "policyengine_native_audit", + "policyengine_native_target_diagnostics", + "child_tax_unit_agi_drift", + ) + return tuple( + str(artifacts[key]) + for key in ordered_keys + if isinstance(artifacts.get(key), str) + ) + + def _attach_checkpoint_registry_and_index( artifact_root: Path, manifest: dict[str, Any], @@ -1250,6 +1346,12 @@ def _load_checkpoint_versioned_artifacts( "policyengine_native_audit", stage_id="09_validation_benchmarking", ), + policyengine_native_target_diagnostics=_resolve_saved_stage_artifact_path( + artifact_root, + artifacts, + "policyengine_native_target_diagnostics", + stage_id="09_validation_benchmarking", + ), child_tax_unit_agi_drift=_resolve_saved_stage_artifact_path( artifact_root, artifacts, @@ -1432,7 +1534,10 @@ def attach_policyengine_us_data_rebuild_checkpoint_evidence( ) -> PEUSDataRebuildCheckpointEvidenceResult: """Attach PE comparison evidence to an already-saved rebuild artifact.""" - from microplex_us.pipelines.pe_native_scores import compute_us_pe_native_scores + from microplex_us.pipelines.pe_native_scores import ( + build_us_pe_native_target_diagnostics_payload, + compute_us_pe_native_scores, + ) from microplex_us.policyengine.harness import evaluate_policyengine_us_harness from microplex_us.policyengine.us import load_policyengine_us_entity_tables @@ -1636,6 +1741,8 @@ def attach_policyengine_us_data_rebuild_checkpoint_evidence( ) native_audit_path: Path | None = None native_audit_payload: dict[str, Any] | None = None + native_target_diagnostics_path: Path | None = None + native_target_diagnostics_payload: dict[str, Any] | None = None if compute_native_audit and artifacts.get("policyengine_native_scores") is not None: native_audit_payload = build_policyengine_us_data_rebuild_native_audit( artifact_root, @@ -1655,13 +1762,47 @@ def attach_policyengine_us_data_rebuild_checkpoint_evidence( manifest["policyengine_native_audit"] = dict( native_audit_payload.get("verdictHints", {}) ) + target_delta_payload = native_audit_payload.get("targetDelta") + if isinstance(target_delta_payload, dict): + native_target_diagnostics_payload = ( + build_us_pe_native_target_diagnostics_payload( + period=( + config.get("policyengine_dataset_year") + or config.get("policyengine_target_period") + or 2024 + ), + from_label="policyengine-us-data", + to_label="microplex-us", + policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_data_python=policyengine_us_data_python, + policyengine_targets_db_path=config.get("policyengine_targets_db"), + target_delta_payload=target_delta_payload, + ) + ) + native_target_diagnostics_path = resolve_us_stage_artifact_contract_path( + artifact_root, + "09_validation_benchmarking", + "policyengine_native_target_diagnostics", + ) + _write_json_atomically( + native_target_diagnostics_path, + native_target_diagnostics_payload, + ) + artifacts["policyengine_native_target_diagnostics"] = ( + native_target_diagnostics_path.name + ) manifest["artifacts"] = artifacts _refresh_checkpoint_data_flow_snapshot( artifact_root, manifest, - extra_outputs=(native_audit_path.name,) - if native_audit_path is not None - else (), + extra_outputs=tuple( + path.name + for path in ( + native_audit_path, + native_target_diagnostics_path, + ) + if path is not None + ), ) _write_json_atomically(manifest_path, manifest) return PEUSDataRebuildCheckpointEvidenceResult( @@ -1673,6 +1814,8 @@ def attach_policyengine_us_data_rebuild_checkpoint_evidence( parity_payload=parity_payload, native_audit_path=native_audit_path, native_audit_payload=native_audit_payload, + native_target_diagnostics_path=native_target_diagnostics_path, + native_target_diagnostics_payload=native_target_diagnostics_payload, imputation_ablation_path=imputation_ablation_path, imputation_ablation_payload=imputation_ablation_payload, ) @@ -2045,6 +2188,11 @@ def run_policyengine_us_data_rebuild_checkpoint( "PE-US-data rebuild checkpoint: evidence complete", parity_path=evidence.parity_path, native_audit_path=evidence.native_audit_path, + native_target_diagnostics_path=getattr( + evidence, + "native_target_diagnostics_path", + None, + ), imputation_ablation_path=evidence.imputation_ablation_path, ) refreshed_artifacts = _load_checkpoint_versioned_artifacts( @@ -2065,6 +2213,16 @@ def run_policyengine_us_data_rebuild_checkpoint( parity_payload=evidence.parity_payload, native_audit_path=evidence.native_audit_path, native_audit_payload=evidence.native_audit_payload, + native_target_diagnostics_path=getattr( + evidence, + "native_target_diagnostics_path", + None, + ), + native_target_diagnostics_payload=getattr( + evidence, + "native_target_diagnostics_payload", + None, + ), imputation_ablation_path=evidence.imputation_ablation_path, imputation_ablation_payload=evidence.imputation_ablation_payload, ) diff --git a/src/microplex_us/pipelines/stage_contracts.py b/src/microplex_us/pipelines/stage_contracts.py index d9fdc5d..d4bee0c 100644 --- a/src/microplex_us/pipelines/stage_contracts.py +++ b/src/microplex_us/pipelines/stage_contracts.py @@ -830,6 +830,12 @@ def default_us_pipeline_stage_contracts() -> tuple[USPipelineStageContract, ...] "Stage-local calibration summary.", stage_id="07_calibration", ), + _stage_output_resource( + "target_ledger", + "Calibration target ledger summary.", + stage_id="07_calibration", + required=False, + ), ), artifacts=( USStageArtifactContract( @@ -1071,6 +1077,12 @@ def default_us_pipeline_stage_contracts() -> tuple[USPipelineStageContract, ...] stage_id="09_validation_benchmarking", required=False, ), + _artifact_resource( + "policyengine_native_target_diagnostics", + "Full PE-US-data native per-target diagnostics payload.", + stage_id="09_validation_benchmarking", + required=False, + ), ), artifacts=( USStageArtifactContract( @@ -1097,6 +1109,14 @@ def default_us_pipeline_stage_contracts() -> tuple[USPipelineStageContract, ...] format="json", hash_mode="file_sha256", ), + USStageArtifactContract( + key="policyengine_native_target_diagnostics", + description="Full PE-US-data native per-target diagnostics payload.", + path_hint="pe_native_target_diagnostics.json", + resume_role="diagnostic", + format="json", + hash_mode="file_sha256", + ), USStageArtifactContract( key="imputation_ablation", description="Imputation ablation benchmark payload.", diff --git a/src/microplex_us/pipelines/stage_run.py b/src/microplex_us/pipelines/stage_run.py index 5f2ee44..e558c65 100644 --- a/src/microplex_us/pipelines/stage_run.py +++ b/src/microplex_us/pipelines/stage_run.py @@ -346,6 +346,7 @@ class USValidationBenchmarkingOutputs(USStageOutputManifest): policyengine_harness: USArtifactRef | None = None policyengine_native_scores: USArtifactRef | None = None policyengine_native_audit: USArtifactRef | None = None + policyengine_native_target_diagnostics: USArtifactRef | None = None imputation_ablation: USArtifactRef | None = None child_tax_unit_agi_drift: USArtifactRef | None = None @@ -999,6 +1000,13 @@ def build_us_stage_output_manifests_from_artifact_manifest( "09_validation_benchmarking", category="diagnostic", ), + policyengine_native_target_diagnostics=_artifact_ref( + root, + artifacts, + "policyengine_native_target_diagnostics", + "09_validation_benchmarking", + category="diagnostic", + ), imputation_ablation=_artifact_ref( root, artifacts, @@ -1323,6 +1331,7 @@ def _manifest_benchmark_summary(manifest: Mapping[str, Any]) -> dict[str, Any]: "policyengine_harness", "policyengine_native_scores", "policyengine_native_audit", + "policyengine_native_target_diagnostics", "imputation_ablation", ): value = manifest.get(key) diff --git a/src/microplex_us/pipelines/stage_status.py b/src/microplex_us/pipelines/stage_status.py index 410f57d..852f758 100644 --- a/src/microplex_us/pipelines/stage_status.py +++ b/src/microplex_us/pipelines/stage_status.py @@ -93,6 +93,7 @@ def stage_status( "policyengine_harness", "policyengine_native_scores", "policyengine_native_audit", + "policyengine_native_target_diagnostics", "imputation_ablation", ) evidence_index_keys = ("validation_evidence",) diff --git a/src/microplex_us/pipelines/stage_validation_evidence.py b/src/microplex_us/pipelines/stage_validation_evidence.py index 4b90e7b..195f287 100644 --- a/src/microplex_us/pipelines/stage_validation_evidence.py +++ b/src/microplex_us/pipelines/stage_validation_evidence.py @@ -29,6 +29,7 @@ def build_us_validation_evidence_manifest( "policyengine_harness", "policyengine_native_scores", "policyengine_native_audit", + "policyengine_native_target_diagnostics", "imputation_ablation", "child_tax_unit_agi_drift", ) diff --git a/tests/pipelines/test_backfill_pe_native_audit.py b/tests/pipelines/test_backfill_pe_native_audit.py index cf1e9c6..2203d6f 100644 --- a/tests/pipelines/test_backfill_pe_native_audit.py +++ b/tests/pipelines/test_backfill_pe_native_audit.py @@ -101,6 +101,34 @@ def test_backfill_us_pe_native_audit_root_updates_manifest_and_snapshot( "microplex_us.pipelines.backfill_pe_native_audit.build_policyengine_us_data_rebuild_native_audit", lambda *args, **kwargs: { "artifactId": "run-1", + "period": 2024, + "targetDelta": { + "metric": "enhanced_cps_native_loss_target_delta", + "period": 2024, + "from_dataset": str((tmp_path / "baseline.h5").resolve()), + "to_dataset": str((bundle_dir / "policyengine_us.h5").resolve()), + "summary": {"n_targets": 1, "to_win_rate": 1.0}, + "family_summaries": [{"target_family": "national_irs_other"}], + "scope_summaries": [{"target_scope": "national"}], + "targets": [ + { + "target_name": "nation/irs/example", + "target_family": "national_irs_other", + "target_scope": "national", + "winner": "to", + "weighted_term_delta": -1.0, + "from_weighted_term": 2.0, + "to_weighted_term": 1.0, + "target_value": 100.0, + "from_estimate": 90.0, + "to_estimate": 95.0, + "from_rel_error": 0.2, + "to_rel_error": 0.1, + } + ], + "top_regressions": [], + "top_improvements": [], + }, "verdictHints": { "largestRegressingFamily": "national_irs_other", "productionImputationVariant": "structured_pe_conditioning", @@ -118,6 +146,14 @@ def test_backfill_us_pe_native_audit_root_updates_manifest_and_snapshot( updated_manifest["artifacts"]["policyengine_native_audit"] == "pe_us_data_rebuild_native_audit.json" ) + assert ( + updated_manifest["artifacts"]["policyengine_native_target_diagnostics"] + == "pe_native_target_diagnostics.json" + ) + target_diagnostics = json.loads( + (bundle_dir / "pe_native_target_diagnostics.json").read_text() + ) + assert target_diagnostics["targets"][0]["delta_absolute_error"] == -5.0 assert ( updated_manifest["policyengine_native_audit"][ "productionImputationVariantIsSupportWinner" @@ -133,6 +169,7 @@ def test_backfill_us_pe_native_audit_root_updates_manifest_and_snapshot( assert benchmark["outputs"] == [ "policyengine_native_scores.json", "pe_us_data_rebuild_native_audit.json", + "pe_native_target_diagnostics.json", ] diff --git a/tests/pipelines/test_pe_native_scores.py b/tests/pipelines/test_pe_native_scores.py index e9311d6..c8c9fdc 100644 --- a/tests/pipelines/test_pe_native_scores.py +++ b/tests/pipelines/test_pe_native_scores.py @@ -11,6 +11,7 @@ annotate_pe_native_target_db_matches, build_policyengine_us_data_pythonpath, build_policyengine_us_data_subprocess_env, + build_us_pe_native_target_diagnostics_payload, compare_us_pe_native_target_deltas, compute_batch_us_pe_native_scores, compute_batch_us_pe_native_support_audits, @@ -560,6 +561,57 @@ def test_write_us_pe_native_target_diagnostics_persists_full_payload( assert payload["target_db_summary"]["unparsed"] == 1 +def test_build_us_pe_native_target_diagnostics_payload_adds_public_aliases( + tmp_path, +) -> None: + payload = build_us_pe_native_target_diagnostics_payload( + target_delta_payload={ + "metric": "enhanced_cps_native_loss_target_delta", + "period": 2024, + "from_dataset": "/tmp/enhanced_cps_2024.h5", + "to_dataset": "/tmp/policyengine_us.h5", + "summary": {"n_targets": 1}, + "targets": [ + { + "target_name": "nation/irs/example", + "target_family": "national_irs_other", + "target_scope": "national", + "winner": "to", + "weighted_term_delta": -1.0, + "from_weighted_term": 2.0, + "to_weighted_term": 1.0, + "target_value": 100.0, + "from_estimate": 90.0, + "to_estimate": 95.0, + "from_rel_error": 0.2, + "to_rel_error": 0.1, + } + ], + "top_regressions": [], + "top_improvements": [], + }, + from_label="policyengine-us-data", + to_label="microplex-us", + policyengine_targets_db_path=tmp_path / "missing.db", + ) + + row = payload["targets"][0] + assert payload["diagnostic_schema_version"] == 1 + assert payload["baseline_dataset"] == "/tmp/enhanced_cps_2024.h5" + assert payload["candidate_dataset"] == "/tmp/policyengine_us.h5" + assert row["target_id"] == "nation/irs/example" + assert row["us_data_aggregate"] == 90.0 + assert row["microplex_aggregate"] == 95.0 + assert row["us_data_absolute_error"] == 10.0 + assert row["microplex_absolute_error"] == 5.0 + assert row["delta_absolute_error"] == -5.0 + assert round(row["delta_relative_error"], 10) == -0.1 + assert row["loss_contribution"] == 1.0 + assert row["family"] == "national_irs_other" + assert row["in_loss"] is True + assert row["supported_by_microplex"] is True + + def test_compute_batch_us_pe_native_target_deltas_wraps_multiple_candidates( monkeypatch, tmp_path, diff --git a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py index 6557317..0eddeae 100644 --- a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py +++ b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py @@ -966,6 +966,34 @@ def test_attach_policyengine_us_data_rebuild_checkpoint_evidence_updates_manifes ) native_audit_payload = { "artifactId": "artifact", + "period": 2024, + "targetDelta": { + "metric": "enhanced_cps_native_loss_target_delta", + "period": 2024, + "from_dataset": "/tmp/enhanced_cps_2024.h5", + "to_dataset": "/tmp/policyengine_us.h5", + "summary": {"n_targets": 1, "to_win_rate": 1.0}, + "family_summaries": [{"target_family": "national_irs_other"}], + "scope_summaries": [{"target_scope": "national"}], + "targets": [ + { + "target_name": "nation/irs/example", + "target_family": "national_irs_other", + "target_scope": "national", + "winner": "to", + "weighted_term_delta": -1.0, + "from_weighted_term": 2.0, + "to_weighted_term": 1.0, + "target_value": 100.0, + "from_estimate": 90.0, + "to_estimate": 95.0, + "from_rel_error": 0.2, + "to_rel_error": 0.1, + } + ], + "top_regressions": [], + "top_improvements": [], + }, "verdictHints": { "productionImputationVariantIsMaeWinner": True, "productionImputationVariantIsSupportWinner": True, @@ -1004,11 +1032,19 @@ def test_attach_policyengine_us_data_rebuild_checkpoint_evidence_updates_manifes result.native_audit_path == artifact_dir / "pe_us_data_rebuild_native_audit.json" ) + assert ( + result.native_target_diagnostics_path + == artifact_dir / "pe_native_target_diagnostics.json" + ) assert result.native_audit_payload == native_audit_payload + assert result.native_target_diagnostics_payload is not None assert result.imputation_ablation_path == artifact_dir / "imputation_ablation.json" written_native_audit = json.loads( (artifact_dir / "pe_us_data_rebuild_native_audit.json").read_text() ) + written_target_diagnostics = json.loads( + (artifact_dir / "pe_native_target_diagnostics.json").read_text() + ) assert ( written_manifest["artifacts"]["policyengine_harness"] == "policyengine_harness.json" @@ -1021,6 +1057,10 @@ def test_attach_policyengine_us_data_rebuild_checkpoint_evidence_updates_manifes written_manifest["artifacts"]["policyengine_native_audit"] == "pe_us_data_rebuild_native_audit.json" ) + assert ( + written_manifest["artifacts"]["policyengine_native_target_diagnostics"] + == "pe_native_target_diagnostics.json" + ) assert ( written_manifest["artifacts"]["imputation_ablation"] == "imputation_ablation.json" @@ -1049,6 +1089,16 @@ def test_attach_policyengine_us_data_rebuild_checkpoint_evidence_updates_manifes written_native_audit["verdictHints"]["productionImputationVariantIsMaeWinner"] is True ) + assert written_target_diagnostics["diagnostic_schema_version"] == 1 + assert written_target_diagnostics["dataset_labels"] == { + "from": "policyengine-us-data", + "to": "microplex-us", + } + first_target = written_target_diagnostics["targets"][0] + assert first_target["target_id"] == "nation/irs/example" + assert first_target["us_data_absolute_error"] == 10.0 + assert first_target["microplex_absolute_error"] == 5.0 + assert first_target["delta_absolute_error"] == -5.0 assert written_manifest["run_registry"]["artifact_id"] == "artifact" assert written_manifest["run_index"]["artifact_id"] == "artifact" assert (tmp_path / "run_index.duckdb").exists() @@ -1063,6 +1113,7 @@ def test_attach_policyengine_us_data_rebuild_checkpoint_evidence_updates_manifes "policyengine_native_scores.json", "imputation_ablation.json", "pe_us_data_rebuild_native_audit.json", + "pe_native_target_diagnostics.json", ] assert {metric["label"]: metric["value"] for metric in benchmark_stage["metrics"]}[ "Capped full oracle loss" From 3817aba29d8b8bb4c58d52c8fbbd58f1c621b903 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Tue, 2 Jun 2026 00:39:56 -0400 Subject: [PATCH 2/3] Tighten PE-native target diagnostics schema --- .../pipelines/backfill_pe_native_audit.py | 2 + .../pipelines/pe_native_scores.py | 262 +++++++++++++++--- .../pe_us_data_rebuild_checkpoint.py | 6 + .../test_backfill_pe_native_audit.py | 3 + tests/pipelines/test_pe_native_scores.py | 32 ++- .../test_pe_us_data_rebuild_checkpoint.py | 3 + 6 files changed, 274 insertions(+), 34 deletions(-) diff --git a/src/microplex_us/pipelines/backfill_pe_native_audit.py b/src/microplex_us/pipelines/backfill_pe_native_audit.py index 9f343de..d0673a9 100644 --- a/src/microplex_us/pipelines/backfill_pe_native_audit.py +++ b/src/microplex_us/pipelines/backfill_pe_native_audit.py @@ -257,6 +257,8 @@ def _write_native_audit_payload_to_bundle( "policyengine_targets_db" ), target_delta_payload=target_delta_payload, + artifact_id=str(payload.get("artifactId") or bundle_dir.name), + run_id=str(payload.get("artifactId") or bundle_dir.name), ) target_diagnostics_path = resolve_us_stage_artifact_contract_path( bundle_dir, diff --git a/src/microplex_us/pipelines/pe_native_scores.py b/src/microplex_us/pipelines/pe_native_scores.py index 90ddb7e..d41de16 100644 --- a/src/microplex_us/pipelines/pe_native_scores.py +++ b/src/microplex_us/pipelines/pe_native_scores.py @@ -2940,6 +2940,8 @@ def annotate_pe_native_target_db_matches( "policyengine_target_period": match["period"], "policyengine_target_value": match["value"], "policyengine_target_source": match["source"], + "policyengine_target_geo_level": match["geo_level"], + "policyengine_target_geographic_id": match["geographic_id"], "policyengine_target_domain_variable": match["domain_variable"], "policyengine_target_constraints": match["constraints"], } @@ -3210,6 +3212,8 @@ def write_us_pe_native_target_diagnostics( policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, policyengine_targets_db_path: str | Path | None = None, + artifact_id: str | None = None, + run_id: str | None = None, ) -> Path: """Write the full PE-native per-target diagnostic dataset to disk.""" @@ -3223,6 +3227,8 @@ def write_us_pe_native_target_diagnostics( policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=policyengine_us_data_python, policyengine_targets_db_path=policyengine_targets_db_path, + artifact_id=artifact_id, + run_id=run_id, ) destination = Path(output_path) destination.parent.mkdir(parents=True, exist_ok=True) @@ -3242,6 +3248,8 @@ def build_us_pe_native_target_diagnostics_payload( policyengine_us_data_python: str | Path | None = None, policyengine_targets_db_path: str | Path | None = None, target_delta_payload: dict[str, Any] | None = None, + artifact_id: str | None = None, + run_id: str | None = None, ) -> dict[str, Any]: """Build the full PE-native per-target diagnostic payload. @@ -3272,6 +3280,14 @@ def build_us_pe_native_target_diagnostics_payload( "from": from_label, "to": to_label, } + resolved_artifact_id = _first_present( + artifact_id, + payload.get("artifact_id"), + payload.get("artifactId"), + ) + resolved_run_id = _first_present(run_id, payload.get("run_id"), payload.get("runId")) + payload.setdefault("artifact_id", resolved_artifact_id) + payload.setdefault("run_id", resolved_run_id) payload.setdefault("baseline_dataset", payload.get("from_dataset")) payload.setdefault("candidate_dataset", payload.get("to_dataset")) target_db_path = ( @@ -3299,42 +3315,222 @@ def _required_dataset_path(value: str | Path | None, *, label: str) -> str | Pat def _add_policyengine_target_diagnostic_aliases(payload: dict[str, Any]) -> None: """Add dashboard-friendly aliases while preserving the native delta schema.""" - baseline_dataset = payload.get("from_dataset") - candidate_dataset = payload.get("to_dataset") - baseline_label = payload.get("dataset_labels", {}).get("from") - candidate_label = payload.get("dataset_labels", {}).get("to") + context = _PolicyEngineTargetDiagnosticAliasContext.from_payload(payload) + targets_by_name: dict[str, dict[str, Any]] = {} for row in payload.get("targets", ()): if not isinstance(row, dict): continue - target_value = row.get("target_value") - from_estimate = row.get("from_estimate") - to_estimate = row.get("to_estimate") - from_absolute_error = _absolute_error_or_none(from_estimate, target_value) - to_absolute_error = _absolute_error_or_none(to_estimate, target_value) - row.setdefault("target_id", row.get("policyengine_target_id") or row.get("target_name")) - row.setdefault("baseline_dataset", baseline_dataset) - row.setdefault("candidate_dataset", candidate_dataset) - row.setdefault("baseline_label", baseline_label) - row.setdefault("candidate_label", candidate_label) - row.setdefault("us_data_aggregate", from_estimate) - row.setdefault("microplex_aggregate", to_estimate) - row.setdefault("us_data_absolute_error", from_absolute_error) - row.setdefault("microplex_absolute_error", to_absolute_error) - row.setdefault("us_data_relative_error", row.get("from_rel_error")) - row.setdefault("microplex_relative_error", row.get("to_rel_error")) - if from_absolute_error is not None and to_absolute_error is not None: - row.setdefault( - "delta_absolute_error", - to_absolute_error - from_absolute_error, - ) + _add_policyengine_target_diagnostic_aliases_to_row(row, context) + target_name = row.get("target_name") + if target_name is not None: + targets_by_name[str(target_name)] = row + + for list_name in ("top_improvements", "top_regressions"): + for row in payload.get(list_name) or []: + if not isinstance(row, dict): + continue + full_row = targets_by_name.get(str(row.get("target_name", ""))) + if full_row is not None: + for key, value in full_row.items(): + row.setdefault(key, value) + _add_policyengine_target_diagnostic_aliases_to_row(row, context) + + +@dataclass(frozen=True) +class _PolicyEngineTargetDiagnosticAliasContext: + baseline_dataset: Any + candidate_dataset: Any + baseline_label: Any + candidate_label: Any + period: Any + artifact_id: Any + run_id: Any + + @classmethod + def from_payload( + cls, + payload: dict[str, Any], + ) -> _PolicyEngineTargetDiagnosticAliasContext: + return cls( + baseline_dataset=payload.get("from_dataset"), + candidate_dataset=payload.get("to_dataset"), + baseline_label=payload.get("dataset_labels", {}).get("from"), + candidate_label=payload.get("dataset_labels", {}).get("to"), + period=payload.get("period"), + artifact_id=payload.get("artifact_id") or payload.get("artifactId"), + run_id=payload.get("run_id") or payload.get("runId"), + ) + + +def _add_policyengine_target_diagnostic_aliases_to_row( + row: dict[str, Any], + context: _PolicyEngineTargetDiagnosticAliasContext, +) -> None: + expected_target = _expected_policyengine_target(row) + target_name = str(row.get("target_name") or "") + target_value = row.get("target_value") + from_estimate = row.get("from_estimate") + to_estimate = row.get("to_estimate") + from_absolute_error = _absolute_error_or_none(from_estimate, target_value) + to_absolute_error = _absolute_error_or_none(to_estimate, target_value) + row.setdefault( + "target_id", + row.get("policyengine_target_id") or row.get("target_name"), + ) + row.setdefault( + "period", + _first_present(row.get("policyengine_target_period"), context.period), + ) + row.setdefault( + "variable", + _first_present( + row.get("policyengine_target_variable"), + expected_target.get("variable"), + ), + ) + row.setdefault( + "geo_level", + _first_present( + row.get("policyengine_target_geo_level"), + expected_target.get("geo_level"), + _infer_target_geo_level(target_name, row), + ), + ) + row.setdefault( + "geography", + _first_present( + row.get("policyengine_target_geographic_id"), + expected_target.get("geographic_id"), + _infer_target_geography(target_name, row), + ), + ) + row.setdefault("state", _infer_target_state(target_name, row)) + row.setdefault( + "entity", + _infer_policyengine_target_entity(target_name, row, expected_target), + ) + row.setdefault("artifact_id", context.artifact_id) + row.setdefault("run_id", context.run_id) + row.setdefault("baseline_dataset", context.baseline_dataset) + row.setdefault("candidate_dataset", context.candidate_dataset) + row.setdefault("baseline_label", context.baseline_label) + row.setdefault("candidate_label", context.candidate_label) + row.setdefault("us_data_aggregate", from_estimate) + row.setdefault("microplex_aggregate", to_estimate) + row.setdefault("us_data_absolute_error", from_absolute_error) + row.setdefault("microplex_absolute_error", to_absolute_error) + row.setdefault("us_data_relative_error", row.get("from_rel_error")) + row.setdefault("microplex_relative_error", row.get("to_rel_error")) + if from_absolute_error is not None and to_absolute_error is not None: row.setdefault( - "delta_relative_error", - _delta_or_none(row.get("to_rel_error"), row.get("from_rel_error")), + "delta_absolute_error", + to_absolute_error - from_absolute_error, ) - row.setdefault("loss_contribution", row.get("to_weighted_term")) - row.setdefault("family", row.get("target_family")) - row.setdefault("in_loss", True) - row.setdefault("supported_by_microplex", True) + row.setdefault( + "delta_relative_error", + _delta_or_none(row.get("to_rel_error"), row.get("from_rel_error")), + ) + row.setdefault("us_data_loss_contribution", row.get("from_weighted_term")) + row.setdefault( + "policyengine_us_data_loss_contribution", + row.get("from_weighted_term"), + ) + row.setdefault("baseline_loss_contribution", row.get("from_weighted_term")) + row.setdefault("microplex_loss_contribution", row.get("to_weighted_term")) + row.setdefault("candidate_loss_contribution", row.get("to_weighted_term")) + row.setdefault("loss_contribution", row.get("to_weighted_term")) + row.setdefault("loss_contribution_delta", row.get("weighted_term_delta")) + row.setdefault("family", row.get("target_family")) + row.setdefault("in_loss", True) + row.setdefault("supported_by_microplex", True) + + +def _expected_policyengine_target(row: dict[str, Any]) -> dict[str, Any]: + expected = row.get("policyengine_target_expected") + return expected if isinstance(expected, dict) else {} + + +def _first_present(*values: Any) -> Any: + for value in values: + if value is not None: + return value + return None + + +def _target_name_parts(target_name: str) -> list[str]: + return [part for part in target_name.split("/") if part] + + +def _infer_target_geo_level(target_name: str, row: dict[str, Any]) -> str | None: + scope = row.get("target_scope") + if scope in {"national", "state"}: + return str(scope) + parts = _target_name_parts(target_name) + if not parts: + return None + if parts[0] == "nation": + return "national" + if parts[0] == "state": + return "state" + return None + + +def _infer_target_geography(target_name: str, row: dict[str, Any]) -> str | None: + geo_level = _infer_target_geo_level(target_name, row) + if geo_level == "national": + return "US" + state = _infer_target_state(target_name, row) + return state + + +def _infer_target_state(target_name: str, row: dict[str, Any]) -> str | None: + geography = _first_present( + row.get("policyengine_target_geographic_id"), + _expected_policyengine_target(row).get("geographic_id"), + ) + if isinstance(geography, str) and geography != "US": + return geography + parts = _target_name_parts(target_name) + if len(parts) >= 2 and parts[0] == "state": + return parts[1] + return None + + +def _infer_policyengine_target_entity( + target_name: str, + row: dict[str, Any], + expected_target: dict[str, Any], +) -> str | None: + variable = _first_present( + row.get("policyengine_target_variable"), + expected_target.get("variable"), + row.get("variable"), + ) + domain_variable = _first_present( + row.get("policyengine_target_domain_variable"), + expected_target.get("domain_variable"), + ) + if _contains_entity_hint("tax_unit", variable, domain_variable): + return "tax_unit" + if _contains_entity_hint("spm_unit", variable, domain_variable): + return "spm_unit" + if _contains_entity_hint("household", variable, domain_variable): + return "household" + + parts = _target_name_parts(target_name) + if "irs" in parts or "jct" in parts: + return "tax_unit" + if "census" in parts: + return "person" + if "snap-hhs" in target_name: + return "household" + if "spm-unit" in target_name: + return "spm_unit" + return None + + +def _contains_entity_hint(entity: str, *values: Any) -> bool: + return any(entity in str(value) for value in values if value is not None) def _absolute_error_or_none(value: Any, target: Any) -> float | None: @@ -3392,6 +3588,8 @@ def main_target_diagnostics(argv: list[str] | None = None) -> int: parser.add_argument("--policyengine-us-data-python") parser.add_argument("--policyengine-us-data-repo") parser.add_argument("--policyengine-targets-db") + parser.add_argument("--artifact-id") + parser.add_argument("--run-id") args = parser.parse_args(argv) path = write_us_pe_native_target_diagnostics( @@ -3405,6 +3603,8 @@ def main_target_diagnostics(argv: list[str] | None = None) -> int: policyengine_us_data_python=args.policyengine_us_data_python, policyengine_us_data_repo=args.policyengine_us_data_repo, policyengine_targets_db_path=args.policyengine_targets_db, + artifact_id=args.artifact_id, + run_id=args.run_id, ) print(str(path)) return 0 diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py index faa6875..bcb7e11 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py @@ -1777,6 +1777,12 @@ def attach_policyengine_us_data_rebuild_checkpoint_evidence( policyengine_us_data_python=policyengine_us_data_python, policyengine_targets_db_path=config.get("policyengine_targets_db"), target_delta_payload=target_delta_payload, + artifact_id=str( + native_audit_payload.get("artifactId") or artifact_root.name + ), + run_id=str( + native_audit_payload.get("artifactId") or artifact_root.name + ), ) ) native_target_diagnostics_path = resolve_us_stage_artifact_contract_path( diff --git a/tests/pipelines/test_backfill_pe_native_audit.py b/tests/pipelines/test_backfill_pe_native_audit.py index 2203d6f..84308ed 100644 --- a/tests/pipelines/test_backfill_pe_native_audit.py +++ b/tests/pipelines/test_backfill_pe_native_audit.py @@ -153,6 +153,9 @@ def test_backfill_us_pe_native_audit_root_updates_manifest_and_snapshot( target_diagnostics = json.loads( (bundle_dir / "pe_native_target_diagnostics.json").read_text() ) + assert target_diagnostics["artifact_id"] == "run-1" + assert target_diagnostics["run_id"] == "run-1" + assert target_diagnostics["targets"][0]["artifact_id"] == "run-1" assert target_diagnostics["targets"][0]["delta_absolute_error"] == -5.0 assert ( updated_manifest["policyengine_native_audit"][ diff --git a/tests/pipelines/test_pe_native_scores.py b/tests/pipelines/test_pe_native_scores.py index c8c9fdc..db558bf 100644 --- a/tests/pipelines/test_pe_native_scores.py +++ b/tests/pipelines/test_pe_native_scores.py @@ -498,6 +498,8 @@ def test_annotate_pe_native_target_db_matches_marks_matches_and_gaps( assert payload["targets"][0]["policyengine_target_match"] == "matched" assert payload["targets"][0]["policyengine_target_id"] == 123 + assert payload["targets"][0]["policyengine_target_geo_level"] == "national" + assert payload["targets"][0]["policyengine_target_geographic_id"] == "US" assert payload["targets"][1]["policyengine_target_match"] == "legacy_only" assert payload["targets"][1]["policyengine_target_expected"]["variable"] == ( "tax_unit_count" @@ -573,7 +575,7 @@ def test_build_us_pe_native_target_diagnostics_payload_adds_public_aliases( "summary": {"n_targets": 1}, "targets": [ { - "target_name": "nation/irs/example", + "target_name": "nation/irs/eitc/returns/c2_0_1k", "target_family": "national_irs_other", "target_scope": "national", "winner": "to", @@ -587,19 +589,34 @@ def test_build_us_pe_native_target_diagnostics_payload_adds_public_aliases( "to_rel_error": 0.1, } ], - "top_regressions": [], + "top_regressions": [ + { + "target_name": "nation/irs/eitc/returns/c2_0_1k", + "weighted_term_delta": -1.0, + } + ], "top_improvements": [], }, from_label="policyengine-us-data", to_label="microplex-us", policyengine_targets_db_path=tmp_path / "missing.db", + artifact_id="artifact-1", + run_id="run-1", ) row = payload["targets"][0] assert payload["diagnostic_schema_version"] == 1 assert payload["baseline_dataset"] == "/tmp/enhanced_cps_2024.h5" assert payload["candidate_dataset"] == "/tmp/policyengine_us.h5" - assert row["target_id"] == "nation/irs/example" + assert row["target_id"] == "nation/irs/eitc/returns/c2_0_1k" + assert row["period"] == 2024 + assert row["variable"] == "tax_unit_count" + assert row["geo_level"] == "national" + assert row["geography"] == "US" + assert row["state"] is None + assert row["entity"] == "tax_unit" + assert row["artifact_id"] == "artifact-1" + assert row["run_id"] == "run-1" assert row["us_data_aggregate"] == 90.0 assert row["microplex_aggregate"] == 95.0 assert row["us_data_absolute_error"] == 10.0 @@ -607,9 +624,18 @@ def test_build_us_pe_native_target_diagnostics_payload_adds_public_aliases( assert row["delta_absolute_error"] == -5.0 assert round(row["delta_relative_error"], 10) == -0.1 assert row["loss_contribution"] == 1.0 + assert row["microplex_loss_contribution"] == 1.0 + assert row["candidate_loss_contribution"] == 1.0 + assert row["us_data_loss_contribution"] == 2.0 + assert row["policyengine_us_data_loss_contribution"] == 2.0 + assert row["baseline_loss_contribution"] == 2.0 + assert row["loss_contribution_delta"] == -1.0 assert row["family"] == "national_irs_other" assert row["in_loss"] is True assert row["supported_by_microplex"] is True + top_row = payload["top_regressions"][0] + assert top_row["microplex_aggregate"] == 95.0 + assert top_row["artifact_id"] == "artifact-1" def test_compute_batch_us_pe_native_target_deltas_wraps_multiple_candidates( diff --git a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py index 0eddeae..a78cee0 100644 --- a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py +++ b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py @@ -1045,6 +1045,9 @@ def test_attach_policyengine_us_data_rebuild_checkpoint_evidence_updates_manifes written_target_diagnostics = json.loads( (artifact_dir / "pe_native_target_diagnostics.json").read_text() ) + assert written_target_diagnostics["artifact_id"] == "artifact" + assert written_target_diagnostics["run_id"] == "artifact" + assert written_target_diagnostics["targets"][0]["artifact_id"] == "artifact" assert ( written_manifest["artifacts"]["policyengine_harness"] == "policyengine_harness.json" From 598e7d1e9fd0dc30eb15621f76b10f337a51d85b Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Tue, 2 Jun 2026 10:19:40 -0400 Subject: [PATCH 3/3] Infer legacy PE-native target metadata --- .../pipelines/pe_native_scores.py | 165 +++++++++++++++++- tests/pipelines/test_pe_native_scores.py | 67 +++++++ 2 files changed, 228 insertions(+), 4 deletions(-) diff --git a/src/microplex_us/pipelines/pe_native_scores.py b/src/microplex_us/pipelines/pe_native_scores.py index d41de16..e91e372 100644 --- a/src/microplex_us/pipelines/pe_native_scores.py +++ b/src/microplex_us/pipelines/pe_native_scores.py @@ -3386,6 +3386,7 @@ def _add_policyengine_target_diagnostic_aliases_to_row( _first_present( row.get("policyengine_target_variable"), expected_target.get("variable"), + _infer_target_variable(target_name, row), ), ) row.setdefault( @@ -3440,7 +3441,7 @@ def _add_policyengine_target_diagnostic_aliases_to_row( row.setdefault("candidate_loss_contribution", row.get("to_weighted_term")) row.setdefault("loss_contribution", row.get("to_weighted_term")) row.setdefault("loss_contribution_delta", row.get("weighted_term_delta")) - row.setdefault("family", row.get("target_family")) + row.setdefault("family", _infer_target_family(target_name, row)) row.setdefault("in_loss", True) row.setdefault("supported_by_microplex", True) @@ -3461,6 +3462,103 @@ def _target_name_parts(target_name: str) -> list[str]: return [part for part in target_name.split("/") if part] +def _infer_target_variable(target_name: str, row: dict[str, Any]) -> str | None: + parts = _target_name_parts(target_name) + if not parts: + return None + + if target_name.endswith("/snap-cost"): + return "snap_cost" + if target_name.endswith("/snap-hhs"): + return "snap_households" + + if parts[0] == "nation": + return _infer_national_target_variable(parts) + if parts[0] == "state": + return _infer_state_target_variable(parts) + + family = row.get("target_family") + return str(family) if family not in {None, "other"} else None + + +def _infer_target_family(target_name: str, row: dict[str, Any]) -> str | None: + family = row.get("target_family") + if family not in {None, "other"}: + return str(family) + if target_name.startswith("state/irs/aca_spending/"): + return "state_aca_spending" + if target_name.startswith("state/irs/aca_enrollment/"): + return "state_aca_enrollment" + if target_name.endswith("/snap-cost"): + return "state_snap_cost" + if target_name.endswith("/snap-hhs"): + return "state_snap_households" + return str(family) if family is not None else None + + +def _infer_national_target_variable(parts: list[str]) -> str | None: + if len(parts) < 2: + return None + source = parts[1] + if source == "irs" and len(parts) >= 3: + metric = parts[2] + if metric == "adjusted gross income": + return "adjusted_gross_income" + if metric == "count": + return "tax_unit_count" + return _slugify_target_token(metric) + if source == "census" and len(parts) >= 3: + metric = parts[2] + if metric.startswith("agi_in_spm_threshold_decile_"): + return "agi_in_spm_threshold_decile" + if metric.startswith("count_in_spm_threshold_decile_"): + return "count_in_spm_threshold_decile" + if metric == "population_by_age": + return "population" + return _slugify_target_token(metric) + if source == "gov" and len(parts) >= 3: + return _slugify_target_token(parts[2]) + if source == "cbo" and len(parts) >= 3: + if parts[2] == "income_by_source" and len(parts) >= 4: + return _slugify_target_token(parts[3]) + return _slugify_target_token(parts[2]) + if source in {"soi", "hhs"} and len(parts) >= 3: + return _slugify_target_token(parts[2]) + if source in {"jct", "net_worth", "ssa"}: + return source + return _slugify_target_token(source) + + +def _infer_state_target_variable(parts: list[str]) -> str | None: + if len(parts) < 2: + return None + source_or_state = parts[1] + if source_or_state == "irs" and len(parts) >= 3: + return _slugify_target_token(parts[2]) + if source_or_state == "census" and len(parts) >= 3: + metric = parts[2] + if metric == "population_by_state": + return "population" + if metric == "population_under_5_by_state": + return "population_under_5" + return _slugify_target_token(metric) + if source_or_state == "real_estate_taxes": + return "real_estate_taxes" + if _looks_like_state_code(source_or_state) and len(parts) >= 3: + return _slugify_target_token(parts[2]) + return _slugify_target_token(source_or_state) + + +def _slugify_target_token(value: str) -> str: + return ( + value.strip() + .lower() + .replace(" ", "_") + .replace("-", "_") + .replace("/", "_") + ) + + def _infer_target_geo_level(target_name: str, row: dict[str, Any]) -> str | None: scope = row.get("target_scope") if scope in {"national", "state"}: @@ -3489,13 +3587,29 @@ def _infer_target_state(target_name: str, row: dict[str, Any]) -> str | None: _expected_policyengine_target(row).get("geographic_id"), ) if isinstance(geography, str) and geography != "US": - return geography + return _normalize_state_code(geography) parts = _target_name_parts(target_name) - if len(parts) >= 2 and parts[0] == "state": - return parts[1] + if parts and parts[0] == "state": + for token in parts[1:]: + if _looks_like_state_code(token): + return _normalize_state_code(token) + if parts and _looks_like_state_fips_id(parts[0]): + return parts[0] return None +def _looks_like_state_code(value: str) -> bool: + return len(value) == 2 and value.isalpha() + + +def _looks_like_state_fips_id(value: str) -> bool: + return len(value) == 4 and value.startswith("US") and value[2:].isdigit() + + +def _normalize_state_code(value: str) -> str: + return value.upper() if _looks_like_state_code(value) else value + + def _infer_policyengine_target_entity( target_name: str, row: dict[str, Any], @@ -3520,10 +3634,53 @@ def _infer_policyengine_target_entity( parts = _target_name_parts(target_name) if "irs" in parts or "jct" in parts: return "tax_unit" + if "soi" in parts: + return "tax_unit" + if "cbo" in parts: + if "snap" in parts: + return "household" + if "ssi" in parts or "social_security" in parts: + return "person" + return "tax_unit" + if "hhs" in parts: + return "person" if "census" in parts: return "person" + family = row.get("target_family") + if family in { + "state_agi_distribution", + "national_irs_other", + "national_tax_expenditures", + "state_aca_enrollment", + "state_aca_spending", + }: + return "tax_unit" + if family in { + "state_age_distribution", + "state_population", + "state_population_under_5", + "national_population_by_age", + "national_infants", + "national_census_other", + "national_ssa", + }: + return "person" + if family in { + "national_spm_threshold_agi", + "national_spm_threshold_count", + }: + return "spm_unit" + if family in { + "state_real_estate_taxes", + "national_net_worth", + }: + return "household" if "snap-hhs" in target_name: return "household" + if "snap-cost" in target_name: + return "household" + if _contains_entity_hint("aca", variable, target_name): + return "tax_unit" if "spm-unit" in target_name: return "spm_unit" return None diff --git a/tests/pipelines/test_pe_native_scores.py b/tests/pipelines/test_pe_native_scores.py index db558bf..cc92c83 100644 --- a/tests/pipelines/test_pe_native_scores.py +++ b/tests/pipelines/test_pe_native_scores.py @@ -638,6 +638,73 @@ def test_build_us_pe_native_target_diagnostics_payload_adds_public_aliases( assert top_row["artifact_id"] == "artifact-1" +def test_build_us_pe_native_target_diagnostics_payload_infers_legacy_metadata( + tmp_path, +) -> None: + def row(target_name: str, family: str, scope: str = "state") -> dict[str, object]: + return { + "target_name": target_name, + "target_family": family, + "target_scope": scope, + "winner": "to", + "weighted_term_delta": -1.0, + "from_weighted_term": 2.0, + "to_weighted_term": 1.0, + "target_value": 100.0, + "from_estimate": 90.0, + "to_estimate": 95.0, + "from_rel_error": 0.2, + "to_rel_error": 0.1, + } + + payload = build_us_pe_native_target_diagnostics_payload( + target_delta_payload={ + "metric": "enhanced_cps_native_loss_target_delta", + "period": 2024, + "from_dataset": "/tmp/enhanced_cps_2024.h5", + "to_dataset": "/tmp/policyengine_us.h5", + "summary": {"n_targets": 3}, + "targets": [ + row("state/irs/aca_spending/ak", "other"), + row("US39/snap-hhs", "state_snap_households"), + row("nation/irs/count/count/AGI in 20k-25k/taxable/All", "national_irs_other", "national"), + row("state/census/age/AZ/75-79", "state_age_distribution"), + row("nation/cbo/income_by_source/qualified_dividend_income", "other", "national"), + row("nation/soi/filer_count/agi_1m_2m", "other", "national"), + row("nation/hhs/medicaid_enrollment", "other", "national"), + ], + "top_regressions": [], + "top_improvements": [], + }, + policyengine_targets_db_path=tmp_path / "missing.db", + ) + + aca, snap, irs_count, state_age, cbo_income, soi_count, hhs_medicaid = payload[ + "targets" + ] + assert aca["variable"] == "aca_spending" + assert aca["geography"] == "AK" + assert aca["state"] == "AK" + assert aca["entity"] == "tax_unit" + assert aca["family"] == "state_aca_spending" + assert snap["variable"] == "snap_households" + assert snap["geography"] == "US39" + assert snap["state"] == "US39" + assert snap["entity"] == "household" + assert irs_count["variable"] == "tax_unit_count" + assert irs_count["geography"] == "US" + assert irs_count["entity"] == "tax_unit" + assert state_age["variable"] == "age" + assert state_age["geography"] == "AZ" + assert state_age["entity"] == "person" + assert cbo_income["variable"] == "qualified_dividend_income" + assert cbo_income["entity"] == "tax_unit" + assert soi_count["variable"] == "filer_count" + assert soi_count["entity"] == "tax_unit" + assert hhs_medicaid["variable"] == "medicaid_enrollment" + assert hhs_medicaid["entity"] == "person" + + def test_compute_batch_us_pe_native_target_deltas_wraps_multiple_candidates( monkeypatch, tmp_path,