diff --git a/.github/workflows/ecps-eval.yaml b/.github/workflows/ecps-eval.yaml new file mode 100644 index 0000000..485b9f3 --- /dev/null +++ b/.github/workflows/ecps-eval.yaml @@ -0,0 +1,151 @@ +name: eCPS-replacement eval + +# Runs the *sound* Microplex-vs-eCPS comparison automatically so it no longer +# has to be driven by hand. The heavy lifting lives in scripts/run_ecps_eval.py +# (so this YAML stays thin and the gate logic is unit-tested); this workflow +# only wires up triggers, credentials, the runner, and artifact upload. +# +# Compute choice: at matched-N (~41k households) the comparison ran ~20-30 min +# on CPU locally, so a standard ubuntu-latest runner with a generous timeout is +# used for simplicity. Building a *candidate* from scratch is GPU-heavy and is +# intentionally out of scope here; if that is ever needed, a Modal path would be +# required (see the stubbed modal-build job at the bottom). + +on: + workflow_dispatch: + inputs: + candidate: + description: "Candidate Microplex H5 (local path, http(s):// URI, or hf://repo/file)" + required: true + type: string + baseline_source: + description: "Baseline eCPS source (local path, or 'latest' for latest published eCPS)" + required: false + default: "latest published eCPS" + type: string + schedule: + # Weekly, Mondays at 09:00 UTC. + - cron: "0 9 * * 1" + +permissions: + contents: read + +concurrency: + group: ecps-eval-${{ github.ref }} + cancel-in-progress: false + +jobs: + ecps-eval: + runs-on: ubuntu-latest + # The comparison ran ~20-30 min at matched-N locally; allow generous slack + # for download + cold caches. + timeout-minutes: 180 + defaults: + run: + working-directory: microplex-us + steps: + - name: Check out microplex-us + uses: actions/checkout@v4 + with: + path: microplex-us + + - name: Check out core microplex + uses: actions/checkout@v4 + with: + repository: PolicyEngine/microplex + ref: main + path: microplex + + - name: Check out microunit + uses: actions/checkout@v4 + with: + repository: PolicyEngine/microunit + ref: main + path: microunit + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.14" + + - name: Set up uv + uses: astral-sh/setup-uv@v6 + with: + version: "0.11.14" + working-directory: microplex-us + + - name: Resolve inputs + id: inputs + env: + # On the weekly schedule there is no candidate input; fall back to the + # latest published Enhanced CPS so the eval still runs end to end. + CANDIDATE_INPUT: ${{ github.event.inputs.candidate }} + BASELINE_INPUT: ${{ github.event.inputs.baseline_source }} + run: | + candidate="${CANDIDATE_INPUT:-hf://policyengine/policyengine-us-data/enhanced_cps_2024.h5}" + baseline="${BASELINE_INPUT:-latest published eCPS}" + echo "candidate=$candidate" >> "$GITHUB_OUTPUT" + echo "baseline=$baseline" >> "$GITHUB_OUTPUT" + + - name: Run sound eCPS-replacement eval + env: + HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }} + run: | + uv run --extra dev --extra policyengine \ + --with huggingface_hub \ + --with-editable ../microplex \ + python scripts/run_ecps_eval.py \ + --candidate "${{ steps.inputs.outputs.candidate }}" \ + --baseline-source "${{ steps.inputs.outputs.baseline }}" \ + --work-dir eval_work \ + --output-dir comparison_output + + - name: Upload comparison artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: ecps-replacement-comparison + # Paths are relative to the job working-directory (microplex-us). + path: microplex-us/comparison_output/** + if-no-files-found: warn + retention-days: 30 + + # --------------------------------------------------------------------------- # + # STUB: full candidate BUILD (GPU) via Modal. + # + # The job above benchmarks an *existing* candidate H5 on CPU. If a candidate + # ever needs to be BUILT from scratch (GPU-heavy synthesis/calibration), that + # work would run on Modal rather than a GitHub runner. This job is a + # deliberately disabled placeholder showing exactly where that plugs in, + # mirroring the Modal auth pattern in policyengine-us-data + # (.github/workflows/local_area_publish.yaml and reusable_test.yaml: + # MODAL_TOKEN_ID / MODAL_TOKEN_SECRET, `pip install modal`, `modal run ...`). + # + # To enable: flip `if: false` to a real condition, add the MODAL_TOKEN_ID / + # MODAL_TOKEN_SECRET repo secrets, point the final step at the Modal entry + # point that builds the candidate, then feed its output H5 into ecps-eval. + # --------------------------------------------------------------------------- # + modal-build: + if: false # disabled placeholder; see comment above + runs-on: ubuntu-latest + timeout-minutes: 480 + permissions: + contents: read + env: + HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }} + MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }} + MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }} + steps: + - name: Check out repo + uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.13" + - name: Install Modal CLI + run: pip install modal + - name: Build candidate on Modal (GPU) + run: | + echo "STUB: invoke the Modal candidate build here, e.g." + echo " modal run modal_app/build_candidate.py --output candidate.h5" + exit 1 diff --git a/scripts/run_ecps_eval.py b/scripts/run_ecps_eval.py new file mode 100644 index 0000000..61cc5ff --- /dev/null +++ b/scripts/run_ecps_eval.py @@ -0,0 +1,486 @@ +"""Run the sound eCPS-replacement comparison end to end (for CI). + +This is the orchestration layer the ``ecps-eval`` GitHub Actions workflow shells +out to, so the workflow YAML stays thin and the logic stays unit-tested. It: + +1. **Resolves the baseline** Enhanced CPS - an explicit local path, or (by + default) the latest published artifact on the Hugging Face Hub + (``policyengine/policyengine-us-data``). Its clone-diagnostics sidecar is + resolved alongside it. +2. **Resolves the candidate** Microplex H5 - a local path or a downloadable URI. +3. **Runs the clone-floor baseline gate** (the core lesson). Before spending + ~20-30 minutes benchmarking, refuse to benchmark against a degraded (or + unverifiable) baseline. See + :mod:`microplex_us.pipelines.ecps_clone_floor`. +4. **Runs the comparison** via the + ``microplex-us-ecps-replacement-comparison`` console script with the sound + flags. +5. **Emits a GitHub Step Summary** with matched N, both losses, train/holdout + losses, ``candidate_beats_baseline``, every soundness gate, and the + honest-reporting caveat (#113). + +Set ``DRYRUN=1`` to print the comparison command that *would* run without +executing it (and without downloading anything that needs credentials). + +This script intentionally does **not** build a candidate dataset. A full +candidate build is GPU-heavy and would run on Modal; see the workflow for where +that path would plug in. + +Usage (typical CI invocation):: + + uv run python scripts/run_ecps_eval.py \\ + --candidate hf://policyengine/policyengine-us-data/enhanced_cps_2024.h5 \\ + --baseline-source latest \\ + --output-dir comparison_output +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import subprocess +import sys +from pathlib import Path +from typing import Any + +# Allow running both as an installed module and as a bare script from a checkout +# where ``src/`` is not yet importable. +try: + from microplex_us.pipelines.ecps_clone_floor import ( + DEFAULT_CLONE_FLOOR, + CloneFloorGateResult, + evaluate_clone_floor_gate, + ) +except ModuleNotFoundError: # pragma: no cover - exercised only outside tests + sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + from microplex_us.pipelines.ecps_clone_floor import ( + DEFAULT_CLONE_FLOOR, + CloneFloorGateResult, + evaluate_clone_floor_gate, + ) + +logger = logging.getLogger("run_ecps_eval") + +# The published baseline artifact and its diagnostics sidecar. +HF_DATA_REPO = "policyengine/policyengine-us-data" +BASELINE_H5_FILENAME = "enhanced_cps_2024.h5" +BASELINE_DIAGNOSTICS_FILENAME = "enhanced_cps_2024.clone_diagnostics.json" + +# Default soundness flags for the comparison (kept in one place). +DEFAULT_PERIOD = 2024 +DEFAULT_MATCHED_SAMPLE_METHOD = "uniform" +DEFAULT_HOLDOUT_TARGET_FRACTION = 0.2 +DEFAULT_OPTIMIZER_MAX_ITER = 200 + +COMPARISON_CONSOLE_SCRIPT = "microplex-us-ecps-replacement-comparison" +RESULT_JSON_FILENAME = "sound_ecps_replacement_comparison.json" + +# The honest-reporting caveat, surfaced verbatim in the step summary. +HONEST_REPORTING_CAVEAT = ( + "candidate_beats_baseline=true is only meaningful if the baseline passed " + "the clone-floor gate AND the candidate does not share tax-unit " + "construction code with the baseline (see microplex-us #113 re: microunit " + "convergence)." +) + +# Values of DRYRUN that mean "off". +_FALSE_VALUES = ("", "0", "false", "no", "off") + + +def _dryrun_enabled() -> bool: + """Return ``True`` if ``DRYRUN`` is set to a truthy value.""" + return os.environ.get("DRYRUN", "").strip().lower() not in _FALSE_VALUES + + +def _is_published_baseline(baseline_source: str | None) -> bool: + """Return ``True`` if the baseline should be pulled from Hugging Face.""" + if baseline_source is None: + return True + return baseline_source.strip().lower() in ( + "latest", + "latest published ecps", + ) + + +def resolve_baseline( + baseline_source: str | None, + work_dir: Path, +) -> tuple[Path, Path]: + """Resolve the baseline H5 and its clone-diagnostics sidecar. + + Parameters + ---------- + baseline_source: + Either a local path to an Enhanced CPS H5, or ``None`` / ``"latest"`` / + ``"latest published eCPS"`` to pull the latest published artifact. + work_dir: + Directory to download artifacts into when pulling from the Hub. + + Returns + ------- + tuple[Path, Path] + ``(baseline_h5, baseline_diagnostics_json)``. + + Notes + ----- + In ``DRYRUN`` mode nothing is downloaded; the *expected* local paths are + returned so the caller can show the command without touching the network or + needing a Hugging Face token. + """ + if not _is_published_baseline(baseline_source): + assert baseline_source is not None # narrowed by _is_published_baseline + h5 = Path(baseline_source).expanduser() + diagnostics = h5.with_name(BASELINE_DIAGNOSTICS_FILENAME) + return h5, diagnostics + + work_dir.mkdir(parents=True, exist_ok=True) + expected_h5 = work_dir / BASELINE_H5_FILENAME + expected_diag = work_dir / BASELINE_DIAGNOSTICS_FILENAME + + if _dryrun_enabled(): + logger.info( + "[dryrun] would download %s and %s from %s", + BASELINE_H5_FILENAME, + BASELINE_DIAGNOSTICS_FILENAME, + HF_DATA_REPO, + ) + return expected_h5, expected_diag + + from huggingface_hub import hf_hub_download + + token = os.environ.get("HUGGING_FACE_TOKEN") + h5 = Path( + hf_hub_download( + repo_id=HF_DATA_REPO, + repo_type="dataset", + filename=BASELINE_H5_FILENAME, + local_dir=str(work_dir), + token=token, + ) + ) + # The diagnostics sidecar may not be published yet; download it if present, + # otherwise leave the expected path so the gate fails closed on absence. + try: + diagnostics = Path( + hf_hub_download( + repo_id=HF_DATA_REPO, + repo_type="dataset", + filename=BASELINE_DIAGNOSTICS_FILENAME, + local_dir=str(work_dir), + token=token, + ) + ) + except Exception as exc: # noqa: BLE001 - any failure means "treat as absent" + logger.warning( + "could not fetch %s (%s); clone-floor gate will fail closed", + BASELINE_DIAGNOSTICS_FILENAME, + exc, + ) + diagnostics = expected_diag + + return h5, diagnostics + + +def resolve_candidate(candidate: str, work_dir: Path) -> Path: + """Resolve the candidate H5 to a local path. + + A local path is returned as-is. An ``http(s)://`` or ``hf://`` URI is + downloaded into ``work_dir`` (skipped in ``DRYRUN`` mode). + """ + if candidate.startswith(("http://", "https://")): + target = work_dir / Path(candidate.split("?", 1)[0]).name + if _dryrun_enabled(): + logger.info("[dryrun] would download candidate from %s", candidate) + return target + work_dir.mkdir(parents=True, exist_ok=True) + from urllib.request import urlretrieve + + urlretrieve(candidate, target) # noqa: S310 - trusted CI input + return target + + if candidate.startswith("hf://"): + # Form: hf:/// (dataset repo assumed). + remainder = candidate[len("hf://") :] + repo_id, _, filename = remainder.rpartition("/") + target = work_dir / Path(filename).name + if _dryrun_enabled(): + logger.info( + "[dryrun] would download candidate %s from hf dataset %s", + filename, + repo_id, + ) + return target + work_dir.mkdir(parents=True, exist_ok=True) + from huggingface_hub import hf_hub_download + + return Path( + hf_hub_download( + repo_id=repo_id, + repo_type="dataset", + filename=filename, + local_dir=str(work_dir), + token=os.environ.get("HUGGING_FACE_TOKEN"), + ) + ) + + return Path(candidate).expanduser() + + +def build_comparison_command( + candidate: Path, + baseline: Path, + output_dir: Path, + period: int = DEFAULT_PERIOD, + matched_sample_method: str = DEFAULT_MATCHED_SAMPLE_METHOD, + holdout_target_fraction: float = DEFAULT_HOLDOUT_TARGET_FRACTION, + optimizer_max_iter: int = DEFAULT_OPTIMIZER_MAX_ITER, + policyengine_us_data_repo: str | None = None, + policyengine_us_data_python: str | None = None, +) -> list[str]: + """Assemble the comparison console-script command with the sound flags.""" + command = [ + COMPARISON_CONSOLE_SCRIPT, + "--candidate-dataset", + str(candidate), + "--baseline-dataset", + str(baseline), + "--output-dir", + str(output_dir), + "--period", + str(period), + "--matched-sample-method", + matched_sample_method, + "--holdout-target-fraction", + str(holdout_target_fraction), + "--optimizer-max-iter", + str(optimizer_max_iter), + "--force", + ] + if policyengine_us_data_repo: + command += ["--policyengine-us-data-repo", policyengine_us_data_repo] + if policyengine_us_data_python: + command += ["--policyengine-us-data-python", policyengine_us_data_python] + return command + + +def render_step_summary( + gate: CloneFloorGateResult, + result: dict[str, Any] | None, +) -> str: + """Render the full GitHub Step Summary as Markdown. + + Includes matched N, both losses, train/holdout losses, + ``candidate_beats_baseline``, every soundness gate, the clone-floor gate + outcome, and the honest-reporting caveat. A pure function so the exact + reporting text can be asserted in tests. + """ + lines: list[str] = ["## Sound eCPS-replacement comparison", ""] + + # Clone-floor baseline gate. + floor_status = "PASS" if gate.passed else "FAIL" + lines.append(f"### Clone-floor baseline gate: {floor_status}") + lines.append("") + lines.append(f"- {gate.message}") + lines.append("") + + if result is None: + lines.append( + "> Comparison was **not run** because the clone-floor gate failed." + ) + lines.append("") + else: + summary = result.get("summary", {}) + beats = summary.get("candidate_beats_baseline") + candidate_loss = summary.get("candidate_enhanced_cps_native_loss") + baseline_loss = summary.get("baseline_enhanced_cps_native_loss") + lines.extend( + [ + "### Results", + "", + f"- Matched N (households): {summary.get('matched_household_count')}", + f"- Baseline (eCPS) loss: {baseline_loss}", + f"- Candidate (MP) loss: {candidate_loss}", + f"- Baseline (eCPS) train loss: {summary.get('baseline_train_loss')}", + f"- Candidate (MP) train loss: {summary.get('candidate_train_loss')}", + f"- Baseline (eCPS) holdout loss: {summary.get('baseline_holdout_loss')}", + f"- Candidate (MP) holdout loss: {summary.get('candidate_holdout_loss')}", + f"- Holdout target fraction: {summary.get('holdout_target_fraction')}", + f"- candidate_beats_baseline: **{beats}**", + ] + ) + gates = _soundness_gates(summary) + if gates: + lines.append("- Soundness gates:") + for name, passed in gates.items(): + lines.append(f" - {name}: {'PASS' if passed else 'FAIL'}") + lines.append("") + + lines.append("### Honest-reporting caveat") + lines.append("") + lines.append(f"> {HONEST_REPORTING_CAVEAT}") + lines.append("") + return "\n".join(lines) + + +def _soundness_gates(summary: dict[str, Any]) -> dict[str, Any]: + """Pull the comparison's soundness-gate booleans out of its summary. + + The comparison reports these as individual keys in ``summary``; surface the + ones present so the step summary lists exactly what the run enforced. + """ + gate_keys = ( + "matched_household_count", + "symmetric_refit", + "score_candidate_only", + "refit_objective_matches_scoring", + "ecps_refit_recovery_passed", + ) + return {key: summary[key] for key in gate_keys if key in summary} + + +def _write_step_summary(text: str) -> None: + """Append ``text`` to ``$GITHUB_STEP_SUMMARY`` if set; always log it.""" + logger.info("\n%s", text) + summary_path = os.environ.get("GITHUB_STEP_SUMMARY") + if summary_path: + with open(summary_path, "a", encoding="utf-8") as handle: + handle.write(text + "\n") + + +def _load_result(output_dir: Path) -> dict[str, Any]: + """Load the comparison result JSON from ``output_dir``.""" + result_path = output_dir / RESULT_JSON_FILENAME + return json.loads(result_path.read_text()) + + +def run(args: argparse.Namespace) -> int: + """Resolve inputs, run the gate, run the comparison, emit the summary.""" + logging.basicConfig(level=logging.INFO, format="%(message)s") + work_dir = Path(args.work_dir) + output_dir = Path(args.output_dir) + dryrun = _dryrun_enabled() + + baseline_h5, baseline_diag = resolve_baseline(args.baseline_source, work_dir) + candidate_h5 = resolve_candidate(args.candidate, work_dir) + + logger.info("Baseline H5: %s", baseline_h5) + logger.info("Baseline diagnostics: %s", baseline_diag) + logger.info("Candidate H5: %s", candidate_h5) + + command = build_comparison_command( + candidate=candidate_h5, + baseline=baseline_h5, + output_dir=output_dir, + period=args.period, + matched_sample_method=args.matched_sample_method, + holdout_target_fraction=args.holdout_target_fraction, + optimizer_max_iter=args.optimizer_max_iter, + policyengine_us_data_repo=args.policyengine_us_data_repo, + policyengine_us_data_python=args.policyengine_us_data_python, + ) + + if dryrun: + # Show the clone-floor gate decision if the sidecar happens to exist + # locally, then print the command that would run and exit. + gate = evaluate_clone_floor_gate(baseline_diag, floor=args.clone_floor) + logger.info("[dryrun] clone-floor gate: %s", gate.message) + logger.info("[dryrun] would run:\n %s", " ".join(command)) + return 0 + + # --- Clone-floor baseline gate (fail closed) --------------------------- # + gate = evaluate_clone_floor_gate(baseline_diag, floor=args.clone_floor) + if not gate.passed: + logger.error("CLONE-FLOOR GATE FAILED: %s", gate.message) + _write_step_summary(render_step_summary(gate, result=None)) + return 1 + logger.info("Clone-floor gate passed: %s", gate.message) + + # --- Run the comparison ------------------------------------------------ # + logger.info("Running comparison:\n %s", " ".join(command)) + completed = subprocess.run(command, check=False) + if completed.returncode != 0: + logger.error("comparison exited %s", completed.returncode) + return completed.returncode + + result = _load_result(output_dir) + _write_step_summary(render_step_summary(gate, result=result)) + return 0 + + +def build_arg_parser() -> argparse.ArgumentParser: + """Build the argument parser for the runner.""" + parser = argparse.ArgumentParser( + description=( + "Resolve baseline + candidate, run the clone-floor gate, run the " + "sound eCPS-replacement comparison, and emit a GitHub Step Summary." + ) + ) + parser.add_argument( + "--candidate", + required=True, + help=("Candidate Microplex H5: local path, http(s):// URI, or hf://repo/file."), + ) + parser.add_argument( + "--baseline-source", + default=None, + help=( + "Baseline source: a local Enhanced CPS H5 path, or 'latest' " + "(default) to pull the latest published eCPS from Hugging Face." + ), + ) + parser.add_argument( + "--work-dir", + default="eval_work", + help="Directory for downloaded artifacts.", + ) + parser.add_argument( + "--output-dir", + default="comparison_output", + help="Directory for comparison artifacts.", + ) + parser.add_argument("--period", type=int, default=DEFAULT_PERIOD) + parser.add_argument( + "--matched-sample-method", + choices=("uniform", "weight_proportional", "pps", "largest_weight"), + default=DEFAULT_MATCHED_SAMPLE_METHOD, + ) + parser.add_argument( + "--holdout-target-fraction", + type=float, + default=DEFAULT_HOLDOUT_TARGET_FRACTION, + ) + parser.add_argument( + "--optimizer-max-iter", + type=int, + default=DEFAULT_OPTIMIZER_MAX_ITER, + ) + parser.add_argument( + "--clone-floor", + type=float, + default=DEFAULT_CLONE_FLOOR, + help="Minimum acceptable baseline clone weight share (default 0.05).", + ) + parser.add_argument( + "--policyengine-us-data-repo", + default=None, + help="Optional path/URL passed through to the comparison.", + ) + parser.add_argument( + "--policyengine-us-data-python", + default=None, + help="Optional interpreter path passed through to the comparison.", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + """CLI entry point for the runner.""" + parser = build_arg_parser() + args = parser.parse_args(argv) + return run(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/microplex_us/pipelines/ecps_clone_floor.py b/src/microplex_us/pipelines/ecps_clone_floor.py new file mode 100644 index 0000000..be7efd8 --- /dev/null +++ b/src/microplex_us/pipelines/ecps_clone_floor.py @@ -0,0 +1,300 @@ +"""Clone-floor baseline gate for the eCPS-replacement eval. + +Before the CI eval spends ~20-30 minutes benchmarking a candidate against a +baseline Enhanced CPS, it must confirm the baseline is actually worth measuring +against. A degraded baseline - one whose clone (PUF-support / donor-replay) +records have collapsed back toward the bare CPS-ASEC weights - makes any +``candidate_beats_baseline`` result meaningless: the candidate is then "beating" +a broken artifact, not a real eCPS. + +This module reads the baseline's ``enhanced_cps_2024.clone_diagnostics.json`` +sidecar and decides whether the clone household-weight share clears a floor +(default 5%). It is the single, unit-tested decision point the workflow shells +out to, so the gate logic is testable without running the heavy comparison. + +The sidecar schema is intentionally tolerant. It accepts, in priority order: + +1. A top-level ``clone_household_weight_share`` (or any of the equivalent keys + the repo already uses elsewhere, e.g. ``mp300k_artifact_gates`` reads + ``clone_household_weight_share`` / ``puf_clone_household_weight_share`` / + ``support_household_weight_share``). +2. The same keys nested under a ``summary`` object. +3. A ``sources`` list (or dict) of per-source rows; the clone share is then the + summed ``household_weight_share`` of every row whose ``source_class`` / + ``source_name`` marks it as a clone/support/donor-replay source (matching + the existing convention in ``mp300k_artifact_gates``). + +Missing or malformed sidecars fail the gate closed: an unverifiable baseline is +treated exactly like a degraded one, never silently benchmarked. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path + +# Default minimum acceptable clone household-weight share. Below this, the +# baseline eCPS has decayed toward the un-enhanced CPS and must not be used as a +# benchmark target. +DEFAULT_CLONE_FLOOR = 0.05 + +# Keys that, anywhere we look, carry a precomputed clone/support weight share. +# Ordered by specificity; the first present numeric value wins. +_CLONE_SHARE_KEYS = ( + "clone_household_weight_share", + "puf_clone_household_weight_share", + "support_household_weight_share", + "puf_support_household_weight_share", + "clone_weight_share", + "support_weight_share", +) + +# Per-source-row keys that hold that row's household weight share. +_ENTRY_SHARE_KEYS = ("household_weight_share", "weight_share", "share") + +# Tokens that mark a source row as a clone / support / donor-replay source. +_CLONE_SOURCE_TOKENS = ("clone", "support", "donor_replay") + +# Tokens that explicitly mark a source row as NOT a clone (a real base source). +_NON_CLONE_SOURCE_TOKENS = ("fixed", "forbes") + + +@dataclass(frozen=True) +class CloneFloorGateResult: + """Outcome of the clone-floor baseline gate. + + Attributes + ---------- + passed: + ``True`` only when the baseline is trustworthy enough to benchmark + against: a readable sidecar whose clone weight share meets the floor. + message: + Human-readable explanation - loud and specific on failure so CI logs + make the reason obvious. + clone_weight_share: + The observed clone household-weight share, or ``None`` when it could + not be read. + floor: + The floor the share was checked against. + """ + + passed: bool + message: str + clone_weight_share: float | None + floor: float + + +def _coerce_float(value: object) -> float | None: + """Return ``value`` as a float, or ``None`` if it is not finite/numeric.""" + if value is None or isinstance(value, bool): + return None + try: + result = float(value) # type: ignore[arg-type] + except (TypeError, ValueError): + return None + if result != result or result in (float("inf"), float("-inf")): + return None + return result + + +def _first_share_in_mapping(mapping: dict[str, object]) -> float | None: + """Return the first recognized precomputed clone-share value in a mapping.""" + for key in _CLONE_SHARE_KEYS: + if key in mapping: + share = _coerce_float(mapping[key]) + if share is not None: + return share + return None + + +def _source_rows(payload: dict[str, object]) -> list[dict[str, object]]: + """Extract per-source rows from any of the supported container keys.""" + for key in ("sources", "source_classes", "source_weight_shares"): + value = payload.get(key) + if isinstance(value, list): + return [row for row in value if isinstance(row, dict)] + if isinstance(value, dict): + rows: list[dict[str, object]] = [] + for name, row in value.items(): + if isinstance(row, dict): + rows.append({"source_name": name, **row}) + return rows + return [] + + +def _row_is_clone(row: dict[str, object]) -> bool: + """Return ``True`` if a source row represents a clone/support source.""" + source_class = str( + row.get("source_class") + or row.get("class") + or row.get("kind") + or row.get("category") + or "" + ).lower() + source_name = str(row.get("source_name") or row.get("name") or "").lower() + haystack = f"{source_class} {source_name}" + if any(token in haystack for token in _NON_CLONE_SOURCE_TOKENS): + return False + return any(token in haystack for token in _CLONE_SOURCE_TOKENS) + + +def _row_share(row: dict[str, object]) -> float | None: + """Return a source row's household weight share, if present.""" + for key in _ENTRY_SHARE_KEYS: + if key in row: + share = _coerce_float(row[key]) + if share is not None: + return share + return None + + +def extract_clone_weight_share(payload: dict[str, object]) -> float | None: + """Extract the clone household-weight share from a diagnostics payload. + + Tries, in order: top-level precomputed keys, the same keys nested under a + ``summary`` object, then summing the shares of clone source rows. + + Returns ``None`` if no clone share can be determined (the caller then fails + the gate closed). + """ + direct = _first_share_in_mapping(payload) + if direct is not None: + return direct + + summary = payload.get("summary") + if isinstance(summary, dict): + nested = _first_share_in_mapping(summary) + if nested is not None: + return nested + + rows = _source_rows(payload) + if rows: + clone_shares = [ + share + for row in rows + if _row_is_clone(row) and (share := _row_share(row)) is not None + ] + if clone_shares: + return float(sum(clone_shares)) + + return None + + +def load_clone_diagnostics(path: Path) -> dict[str, object]: + """Load and parse a clone-diagnostics JSON sidecar. + + Raises + ------ + FileNotFoundError + If the sidecar does not exist. + ValueError + If the sidecar is not valid JSON or is not a JSON object. + """ + if not path.exists(): + raise FileNotFoundError(path) + try: + payload = json.loads(path.read_text()) + except json.JSONDecodeError as exc: + raise ValueError(f"{path} is not valid JSON: {exc}") from exc + if not isinstance(payload, dict): + raise ValueError(f"{path} must contain a JSON object") + return payload + + +def evaluate_clone_floor_gate( + path: Path, + floor: float = DEFAULT_CLONE_FLOOR, +) -> CloneFloorGateResult: + """Evaluate the clone-floor baseline gate for a diagnostics sidecar. + + Folds together the three cases CI must distinguish: + + * **Healthy** - the sidecar exists, parses, exposes a clone share, and that + share meets ``floor``. The gate passes. + * **Degraded** - the sidecar exists and exposes a clone share, but it has + fallen below ``floor``. The baseline has decayed back toward the bare CPS + (see #113); the gate fails *loudly*. + * **Missing or malformed** - the sidecar is absent, unparseable, or exposes + no clone share. We cannot prove the baseline is healthy, so we *fail + closed*: an unverifiable baseline is treated exactly like a bad one. + + Parameters + ---------- + path: + Path to the baseline's ``*.clone_diagnostics.json`` sidecar. + floor: + Minimum acceptable clone household-weight share (default 5%). + + Returns + ------- + CloneFloorGateResult + Structured outcome with a loud message on failure. + """ + try: + payload = load_clone_diagnostics(path) + except FileNotFoundError: + return CloneFloorGateResult( + passed=False, + message=( + f"baseline eCPS clone diagnostics not found at {path} - " + "refusing to benchmark against an unverifiable baseline " + "(fail closed; see #113)" + ), + clone_weight_share=None, + floor=floor, + ) + except ValueError as exc: + return CloneFloorGateResult( + passed=False, + message=( + f"baseline eCPS clone diagnostics at {path} are malformed " + f"({exc}) - refusing to benchmark against an unverifiable " + "baseline (fail closed; see #113)" + ), + clone_weight_share=None, + floor=floor, + ) + + share = extract_clone_weight_share(payload) + if share is None: + return CloneFloorGateResult( + passed=False, + message=( + f"baseline eCPS clone diagnostics at {path} expose no clone " + "household-weight share - refusing to benchmark against an " + "unverifiable baseline (fail closed; see #113)" + ), + clone_weight_share=None, + floor=floor, + ) + + if share >= floor: + return CloneFloorGateResult( + passed=True, + message=( + f"baseline eCPS clone share {share:.1%} >= {floor:.1%} floor " + "- baseline is healthy" + ), + clone_weight_share=share, + floor=floor, + ) + + return CloneFloorGateResult( + passed=False, + message=( + f"baseline eCPS degraded; clone share {share:.1%} < {floor:.1%} " + "floor - refusing to benchmark against a bad baseline (see #113)" + ), + clone_weight_share=share, + floor=floor, + ) + + +__all__ = [ + "DEFAULT_CLONE_FLOOR", + "CloneFloorGateResult", + "evaluate_clone_floor_gate", + "extract_clone_weight_share", + "load_clone_diagnostics", +] diff --git a/tests/pipelines/test_ecps_clone_floor.py b/tests/pipelines/test_ecps_clone_floor.py new file mode 100644 index 0000000..6041c15 --- /dev/null +++ b/tests/pipelines/test_ecps_clone_floor.py @@ -0,0 +1,171 @@ +"""Tests for the clone-floor baseline gate. + +The gate is the CI decision point that refuses to benchmark a candidate against +a degraded or unverifiable baseline Enhanced CPS. These tests pin its behaviour +with tiny fixtures: a healthy share passes, a degraded share fails loudly, and a +missing / malformed sidecar fails closed. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +from microplex_us.pipelines.ecps_clone_floor import ( + CloneFloorGateResult, + evaluate_clone_floor_gate, + extract_clone_weight_share, +) + + +def _write(path: Path, payload: dict) -> None: + path.write_text(json.dumps(payload)) + + +# --------------------------------------------------------------------------- # +# Happy / degraded paths +# --------------------------------------------------------------------------- # +def test_gate_passes_above_floor(tmp_path: Path) -> None: + """A healthy 10% clone share passes the default 5% floor.""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write(p, {"clone_household_weight_share": 0.10}) + result = evaluate_clone_floor_gate(p) + assert isinstance(result, CloneFloorGateResult) + assert result.passed is True + assert result.clone_weight_share == 0.10 + assert "healthy" in result.message + + +def test_gate_fails_below_floor_loudly(tmp_path: Path) -> None: + """A degraded 2% clone share fails loudly with a specific message.""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write(p, {"clone_household_weight_share": 0.02}) + result = evaluate_clone_floor_gate(p) + assert result.passed is False + assert result.clone_weight_share == 0.02 + assert "degraded" in result.message + assert "2.0%" in result.message + assert "refusing to benchmark" in result.message + + +def test_gate_exactly_at_floor_passes(tmp_path: Path) -> None: + """A share exactly at the floor passes (>= comparison).""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write(p, {"clone_household_weight_share": 0.05}) + assert evaluate_clone_floor_gate(p).passed is True + + +# --------------------------------------------------------------------------- # +# Fail-closed paths +# --------------------------------------------------------------------------- # +def test_gate_fails_closed_when_file_missing(tmp_path: Path) -> None: + """A missing sidecar fails closed (refuse, don't silently benchmark).""" + p = tmp_path / "does_not_exist.clone_diagnostics.json" + result = evaluate_clone_floor_gate(p) + assert result.passed is False + assert result.clone_weight_share is None + assert "not found" in result.message + assert "fail closed" in result.message + + +def test_gate_fails_closed_when_field_missing(tmp_path: Path) -> None: + """A sidecar exposing no clone share fails closed, not silently. + + Documented choice: a missing clone-share field is treated like a bad + baseline (fail closed), never as an implicit pass. + """ + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write(p, {"total_households": 41000, "some_other_field": 1}) + result = evaluate_clone_floor_gate(p) + assert result.passed is False + assert result.clone_weight_share is None + assert "no clone" in result.message + assert "fail closed" in result.message + + +def test_gate_fails_closed_when_json_malformed(tmp_path: Path) -> None: + """An unparseable sidecar fails closed with a malformed message.""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + p.write_text("{not valid json") + result = evaluate_clone_floor_gate(p) + assert result.passed is False + assert result.clone_weight_share is None + assert "malformed" in result.message + assert "fail closed" in result.message + + +def test_gate_fails_closed_when_share_non_numeric(tmp_path: Path) -> None: + """A non-numeric clone-share value is treated as absent (fail closed).""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write(p, {"clone_household_weight_share": "lots"}) + result = evaluate_clone_floor_gate(p) + assert result.passed is False + assert result.clone_weight_share is None + + +# --------------------------------------------------------------------------- # +# Custom floor + schema flexibility +# --------------------------------------------------------------------------- # +def test_gate_respects_custom_floor(tmp_path: Path) -> None: + """A 4% share passes a 3% floor but fails the default 5% floor.""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write(p, {"clone_household_weight_share": 0.04}) + assert evaluate_clone_floor_gate(p, floor=0.03).passed is True + assert evaluate_clone_floor_gate(p, floor=0.05).passed is False + + +def test_gate_reads_share_from_summary(tmp_path: Path) -> None: + """The share may live under a nested 'summary' object.""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write(p, {"summary": {"support_household_weight_share": 0.08}}) + result = evaluate_clone_floor_gate(p) + assert result.passed is True + assert result.clone_weight_share == 0.08 + + +def test_extract_share_sums_clone_source_rows() -> None: + """When only a sources list is present, clone rows are summed.""" + payload = { + "sources": [ + { + "source_name": "cps_asec", + "source_class": "base", + "household_weight_share": 0.90, + }, + { + "source_name": "irs_soi_puf_support_clone", + "source_class": "puf_support", + "household_weight_share": 0.07, + }, + { + "source_name": "donor_replay_psid", + "source_class": "donor_replay", + "household_weight_share": 0.03, + }, + { + "source_name": "forbes_400_fixed", + "source_class": "fixed", + "household_weight_share": 0.001, + }, + ] + } + # 0.07 + 0.03 = 0.10; base and forbes/fixed are excluded. + assert extract_clone_weight_share(payload) == 0.10 + + +def test_gate_passes_via_summed_clone_source_rows(tmp_path: Path) -> None: + """End-to-end: a sources-list sidecar with 10% clone share passes.""" + p = tmp_path / "enhanced_cps_2024.clone_diagnostics.json" + _write( + p, + { + "sources": [ + {"source_name": "cps_asec", "household_weight_share": 0.90}, + { + "source_name": "puf_support_clone", + "household_weight_share": 0.10, + }, + ] + }, + ) + assert evaluate_clone_floor_gate(p).passed is True diff --git a/tests/test_run_ecps_eval.py b/tests/test_run_ecps_eval.py new file mode 100644 index 0000000..cd183ca --- /dev/null +++ b/tests/test_run_ecps_eval.py @@ -0,0 +1,301 @@ +"""Tests for the CI runner script (scripts/run_ecps_eval.py). + +These validate the orchestration logic *without* running the heavy eval: +command assembly, baseline/candidate resolution, DRYRUN behaviour, step-summary +rendering (including the honest-reporting caveat), and the clone-floor gate +short-circuit. The actual comparison subprocess is monkeypatched. +""" + +from __future__ import annotations + +import importlib.util +import json +from pathlib import Path +from types import ModuleType +from typing import Any + +import pytest + +from microplex_us.pipelines.ecps_clone_floor import CloneFloorGateResult + + +def _load_runner() -> ModuleType: + """Import scripts/run_ecps_eval.py as a module by path.""" + script = Path(__file__).resolve().parent.parent / "scripts" / "run_ecps_eval.py" + spec = importlib.util.spec_from_file_location("run_ecps_eval", script) + assert spec is not None and spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +runner = _load_runner() + + +def _write_diag(path: Path, share: float) -> None: + path.write_text(json.dumps({"clone_household_weight_share": share})) + + +# --------------------------------------------------------------------------- # +# Command assembly +# --------------------------------------------------------------------------- # +def test_build_comparison_command_has_sound_flags(tmp_path: Path) -> None: + cmd = runner.build_comparison_command( + candidate=tmp_path / "cand.h5", + baseline=tmp_path / "base.h5", + output_dir=tmp_path / "out", + ) + assert cmd[0] == runner.COMPARISON_CONSOLE_SCRIPT + joined = " ".join(cmd) + assert "--candidate-dataset" in joined + assert "--baseline-dataset" in joined + assert "--matched-sample-method uniform" in joined + assert "--holdout-target-fraction 0.2" in joined + assert "--optimizer-max-iter 200" in joined + assert "--period 2024" in joined + assert "--force" in joined + + +def test_build_comparison_command_passes_through_pe_data_repo(tmp_path: Path) -> None: + cmd = runner.build_comparison_command( + candidate=tmp_path / "cand.h5", + baseline=tmp_path / "base.h5", + output_dir=tmp_path / "out", + policyengine_us_data_repo="/some/repo", + policyengine_us_data_python="/some/python", + ) + joined = " ".join(cmd) + assert "--policyengine-us-data-repo /some/repo" in joined + assert "--policyengine-us-data-python /some/python" in joined + + +# --------------------------------------------------------------------------- # +# Baseline / candidate resolution +# --------------------------------------------------------------------------- # +def test_resolve_baseline_local_path(tmp_path: Path) -> None: + h5 = tmp_path / "enhanced_cps_2024.h5" + h5.write_text("x") + resolved_h5, resolved_diag = runner.resolve_baseline(str(h5), tmp_path) + assert resolved_h5 == h5 + assert resolved_diag.name == runner.BASELINE_DIAGNOSTICS_FILENAME + assert resolved_diag.parent == h5.parent + + +def test_resolve_baseline_latest_is_dryrun_safe( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """'latest' under DRYRUN returns expected paths without downloading.""" + monkeypatch.setenv("DRYRUN", "1") + h5, diag = runner.resolve_baseline("latest published eCPS", tmp_path) + assert h5.name == runner.BASELINE_H5_FILENAME + assert diag.name == runner.BASELINE_DIAGNOSTICS_FILENAME + + +def test_resolve_candidate_local_path(tmp_path: Path) -> None: + cand = tmp_path / "candidate.h5" + assert runner.resolve_candidate(str(cand), tmp_path) == cand + + +def test_resolve_candidate_http_dryrun( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("DRYRUN", "1") + target = runner.resolve_candidate("https://example.com/data/candidate.h5", tmp_path) + assert target == tmp_path / "candidate.h5" + + +def test_resolve_candidate_hf_dryrun( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setenv("DRYRUN", "1") + target = runner.resolve_candidate( + "hf://policyengine/policyengine-us-data/enhanced_cps_2024.h5", tmp_path + ) + assert target == tmp_path / "enhanced_cps_2024.h5" + + +# --------------------------------------------------------------------------- # +# Step-summary rendering +# --------------------------------------------------------------------------- # +def _summary_result() -> dict[str, Any]: + return { + "summary": { + "matched_household_count": 41000, + "baseline_enhanced_cps_native_loss": 1.0, + "candidate_enhanced_cps_native_loss": 0.5, + "baseline_train_loss": 0.9, + "candidate_train_loss": 0.4, + "baseline_holdout_loss": 0.95, + "candidate_holdout_loss": 0.45, + "holdout_target_fraction": 0.2, + "candidate_beats_baseline": True, + "symmetric_refit": True, + "score_candidate_only": False, + "refit_objective_matches_scoring": True, + "ecps_refit_recovery_passed": True, + } + } + + +def test_render_summary_with_result_includes_everything() -> None: + gate = CloneFloorGateResult( + passed=True, + message="baseline eCPS clone share 42.0% >= 5.0% floor - healthy", + clone_weight_share=0.42, + floor=0.05, + ) + text = runner.render_step_summary(gate, _summary_result()) + assert "Matched N (households): 41000" in text + assert "Baseline (eCPS) loss: 1.0" in text + assert "Candidate (MP) loss: 0.5" in text + assert "train loss: 0.9" in text + assert "train loss: 0.4" in text + assert "holdout loss: 0.95" in text + assert "holdout loss: 0.45" in text + assert "candidate_beats_baseline: **True**" in text + assert "symmetric_refit: PASS" in text + assert "ecps_refit_recovery_passed: PASS" in text + assert "Clone-floor baseline gate: PASS" in text + # Honest-reporting caveat must appear verbatim. + assert runner.HONEST_REPORTING_CAVEAT in text + assert "#113" in text + + +def test_render_summary_gate_failed_says_not_run() -> None: + gate = CloneFloorGateResult( + passed=False, + message="baseline eCPS degraded; clone share 2.0% < 5.0% floor", + clone_weight_share=0.02, + floor=0.05, + ) + text = runner.render_step_summary(gate, result=None) + assert "Clone-floor baseline gate: FAIL" in text + assert "was **not run**" in text + # Caveat is always present. + assert runner.HONEST_REPORTING_CAVEAT in text + + +# --------------------------------------------------------------------------- # +# End-to-end orchestration (subprocess monkeypatched) +# --------------------------------------------------------------------------- # +def test_run_dryrun_prints_command_and_runs_nothing( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """DRYRUN prints the would-run command and never calls subprocess.""" + monkeypatch.setenv("DRYRUN", "1") + + def _fail_run(*args: object, **kwargs: object) -> None: + raise AssertionError("subprocess.run must not be called in DRYRUN") + + monkeypatch.setattr(runner.subprocess, "run", _fail_run) + + cand = tmp_path / "candidate.h5" + base = tmp_path / "enhanced_cps_2024.h5" + args = runner.build_arg_parser().parse_args( + [ + "--candidate", + str(cand), + "--baseline-source", + str(base), + "--work-dir", + str(tmp_path / "work"), + "--output-dir", + str(tmp_path / "out"), + ] + ) + with caplog.at_level("INFO"): + rc = runner.run(args) + assert rc == 0 + assert "would run" in caplog.text + assert runner.COMPARISON_CONSOLE_SCRIPT in caplog.text + + +def test_run_gate_failure_short_circuits( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """A degraded baseline makes run() return 1 without invoking the eval.""" + monkeypatch.delenv("DRYRUN", raising=False) + + def _fail_run(*args: object, **kwargs: object) -> None: + raise AssertionError("comparison must not run when the gate fails") + + monkeypatch.setattr(runner.subprocess, "run", _fail_run) + + cand = tmp_path / "candidate.h5" + cand.write_text("x") + base = tmp_path / "enhanced_cps_2024.h5" + base.write_text("x") + _write_diag(base.with_name(runner.BASELINE_DIAGNOSTICS_FILENAME), share=0.02) + + summary_file = tmp_path / "summary.md" + monkeypatch.setenv("GITHUB_STEP_SUMMARY", str(summary_file)) + + args = runner.build_arg_parser().parse_args( + [ + "--candidate", + str(cand), + "--baseline-source", + str(base), + "--work-dir", + str(tmp_path / "work"), + "--output-dir", + str(tmp_path / "out"), + ] + ) + rc = runner.run(args) + assert rc == 1 + written = summary_file.read_text() + assert "Clone-floor baseline gate: FAIL" in written + assert "degraded" in written + + +def test_run_happy_path_invokes_eval_and_summarizes( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """A healthy baseline runs the eval (mocked) and writes the summary.""" + monkeypatch.delenv("DRYRUN", raising=False) + + output_dir = tmp_path / "out" + + class _Completed: + returncode = 0 + + def _fake_run(cmd: list[str], **kwargs: object) -> _Completed: + # Simulate the comparison writing its result JSON. + output_dir.mkdir(parents=True, exist_ok=True) + (output_dir / runner.RESULT_JSON_FILENAME).write_text( + json.dumps(_summary_result()) + ) + return _Completed() + + monkeypatch.setattr(runner.subprocess, "run", _fake_run) + + cand = tmp_path / "candidate.h5" + cand.write_text("x") + base = tmp_path / "enhanced_cps_2024.h5" + base.write_text("x") + _write_diag(base.with_name(runner.BASELINE_DIAGNOSTICS_FILENAME), share=0.42) + + summary_file = tmp_path / "summary.md" + monkeypatch.setenv("GITHUB_STEP_SUMMARY", str(summary_file)) + + args = runner.build_arg_parser().parse_args( + [ + "--candidate", + str(cand), + "--baseline-source", + str(base), + "--work-dir", + str(tmp_path / "work"), + "--output-dir", + str(output_dir), + ] + ) + rc = runner.run(args) + assert rc == 0 + written = summary_file.read_text() + assert "Clone-floor baseline gate: PASS" in written + assert "candidate_beats_baseline: **True**" in written + assert runner.HONEST_REPORTING_CAVEAT in written