From 3fe5102f80e4e1383a838cc91af78464f828bdf2 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 19 May 2026 22:10:34 +0200 Subject: [PATCH] Add epsilon-insensitive calibration policy --- changelog.d/1053.added.md | 1 + modal_app/pipeline.py | 16 +- modal_app/remote_calibration_runner.py | 20 + modal_app/step_manifests/state.py | 2 +- .../calibration/signatures.py | 17 + .../calibration/target_policy.py | 462 ++++++++++++++++++ .../calibration/target_policy.yaml | 31 ++ .../calibration/unified_calibration.py | 335 ++++++++++++- .../stage_contracts/calibration_package.py | 14 + .../calibration_package_schema.py | 45 ++ pyproject.toml | 2 +- scripts/run_epsilon_calibration_demo.py | 126 +++++ tests/unit/calibration/test_target_policy.py | 129 +++++ .../calibration/test_unified_calibration.py | 132 ++++- .../calibration_package_stage_contract.py | 20 + ...test_calibration_package_stage_contract.py | 12 + tests/unit/test_pipeline.py | 5 + tests/unit/test_remote_calibration_runner.py | 8 + uv.lock | 10 +- 19 files changed, 1362 insertions(+), 25 deletions(-) create mode 100644 changelog.d/1053.added.md create mode 100644 policyengine_us_data/calibration/target_policy.py create mode 100644 policyengine_us_data/calibration/target_policy.yaml create mode 100644 scripts/run_epsilon_calibration_demo.py create mode 100644 tests/unit/calibration/test_target_policy.py diff --git a/changelog.d/1053.added.md b/changelog.d/1053.added.md new file mode 100644 index 000000000..ab107d758 --- /dev/null +++ b/changelog.d/1053.added.md @@ -0,0 +1 @@ +Add epsilon-insensitive calibration target tolerances, target-policy artifacts, and hard-fail versus warning enforcement for calibration diagnostics. diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index 39a437808..c683771c6 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -162,6 +162,7 @@ def _calibration_package_parameters( workers: int, n_clones: int, target_config: str | None, + target_policy: str | None, skip_county: bool, chunked_matrix: bool, chunk_size: int, @@ -174,6 +175,7 @@ def _calibration_package_parameters( "workers": workers if not chunked_matrix else None, "n_clones": n_clones, "target_config": target_config, + "target_policy": target_policy, "skip_county": skip_county, "chunked_matrix": bool(chunked_matrix), "chunk_size": chunk_size if chunked_matrix else None, @@ -281,6 +283,8 @@ def archive_diagnostics( "log": f"{prefix}unified_diagnostics.csv", "cal_log": f"{prefix}calibration_log.csv", "config": f"{prefix}unified_run_config.json", + "target_policy": f"{prefix}calibration_target_policy.jsonl", + "target_policy_summary": (f"{prefix}calibration_target_policy_summary.json"), } for key, filename in file_map.items(): @@ -1242,6 +1246,7 @@ def run_pipeline( workers=num_workers, n_clones=n_clones, target_config=None, + target_policy="policyengine_us_data/calibration/target_policy.yaml", skip_county=True, chunked_matrix=chunked_matrix, chunk_size=chunk_size, @@ -1302,7 +1307,12 @@ def run_pipeline( completed_package_manifest = _complete_step_manifest( active_step_manifest, outputs=collect_artifacts( - [_artifacts_dir(run_id) / "calibration_package.pkl"], + [ + _artifacts_dir(run_id) / "calibration_package.pkl", + _artifacts_dir(run_id) / "calibration_target_policy.jsonl", + _artifacts_dir(run_id) + / "calibration_target_policy_summary.json", + ], missing_ok=True, ), vol=pipeline_volume, @@ -1321,19 +1331,23 @@ def run_pipeline( "gpu": gpu, "epochs": epochs, "target_config": "policyengine_us_data/calibration/target_config.yaml", + "target_policy": "policyengine_us_data/calibration/target_policy.yaml", "beta": 0.65, "lambda_l0": 1e-7, "lambda_l2": 1e-8, "log_freq": 100, + "loss_type": "relative_epsilon", } national_fit_parameters = { "gpu": national_gpu, "epochs": national_epochs, "target_config": "policyengine_us_data/calibration/target_config.yaml", + "target_policy": "policyengine_us_data/calibration/target_policy.yaml", "beta": 0.65, "lambda_l0": NATIONAL_FIT_LAMBDA_L0, "lambda_l2": 1e-12, "log_freq": 100, + "loss_type": "relative_epsilon", "skip_national": skip_national, } regional_fit_reuse = _step_reusable( diff --git a/modal_app/remote_calibration_runner.py b/modal_app/remote_calibration_runner.py index 7a198a0ed..a1204a846 100644 --- a/modal_app/remote_calibration_runner.py +++ b/modal_app/remote_calibration_runner.py @@ -99,6 +99,8 @@ def _collect_outputs(cal_lines): log_path = None cal_log_path = None config_path = None + target_policy_path = None + target_policy_summary_path = None for line in cal_lines: if "OUTPUT_PATH:" in line: output_path = line.split("OUTPUT_PATH:")[1].strip() @@ -110,6 +112,12 @@ def _collect_outputs(cal_lines): cal_log_path = line.split("CAL_LOG_PATH:")[1].strip() elif "LOG_PATH:" in line: log_path = line.split("LOG_PATH:")[1].strip() + elif "TARGET_POLICY_PATH:" in line: + target_policy_path = line.split("TARGET_POLICY_PATH:")[1].strip() + elif "TARGET_POLICY_SUMMARY_PATH:" in line: + target_policy_summary_path = line.split("TARGET_POLICY_SUMMARY_PATH:")[ + 1 + ].strip() with open(output_path, "rb") as f: weights_bytes = f.read() @@ -134,12 +142,24 @@ def _collect_outputs(cal_lines): with open(config_path, "rb") as f: config_bytes = f.read() + target_policy_bytes = None + if target_policy_path: + with open(target_policy_path, "rb") as f: + target_policy_bytes = f.read() + + target_policy_summary_bytes = None + if target_policy_summary_path: + with open(target_policy_summary_path, "rb") as f: + target_policy_summary_bytes = f.read() + return { "weights": weights_bytes, "geography": geography_bytes, "log": log_bytes, "cal_log": cal_log_bytes, "config": config_bytes, + "target_policy": target_policy_bytes, + "target_policy_summary": target_policy_summary_bytes, } diff --git a/modal_app/step_manifests/state.py b/modal_app/step_manifests/state.py index 0132d5d5d..a39d135c1 100644 --- a/modal_app/step_manifests/state.py +++ b/modal_app/step_manifests/state.py @@ -215,7 +215,7 @@ def artifact_identities(paths: dict[str, str | Path]) -> dict: def collect_diagnostics(run_id: str) -> list[ArtifactReference]: return collect_directory_artifacts( run_dir(run_id) / "diagnostics", - patterns=("*.csv", "*.json", "*.txt"), + patterns=("*.csv", "*.json", "*.jsonl", "*.txt"), role="diagnostic", ) diff --git a/policyengine_us_data/calibration/signatures.py b/policyengine_us_data/calibration/signatures.py index a161ecebe..d62eefcbe 100644 --- a/policyengine_us_data/calibration/signatures.py +++ b/policyengine_us_data/calibration/signatures.py @@ -108,6 +108,10 @@ def build_checkpoint_signature( lambda_l2: float, learning_rate: float, target_groups: np.ndarray | None = None, + target_weights: np.ndarray | None = None, + target_tolerances: np.ndarray | None = None, + target_scales: np.ndarray | None = None, + calibration_loss_type: str = "relative", ) -> dict: """Build a compact signature to validate calibration checkpoint resume.""" targets_arr = np.asarray(targets, dtype=np.float64) @@ -116,6 +120,9 @@ def build_checkpoint_signature( if target_groups is None else np.asarray(target_groups, dtype=np.int64) ) + target_weights_arr = _optional_float_signature_array(target_weights) + target_tolerances_arr = _optional_float_signature_array(target_tolerances) + target_scales_arr = _optional_float_signature_array(target_scales) return { "n_features": int(X_sparse.shape[1]), "n_targets": int(len(targets_arr)), @@ -123,6 +130,10 @@ def build_checkpoint_signature( "target_names_sha256": hash_string_list(target_names), "targets_sha256": hashlib.sha256(targets_arr.tobytes()).hexdigest(), "target_groups_sha256": hash_numpy_array(target_groups_arr), + "target_weights_sha256": hash_numpy_array(target_weights_arr), + "target_tolerances_sha256": hash_numpy_array(target_tolerances_arr), + "target_scales_sha256": hash_numpy_array(target_scales_arr), + "calibration_loss_type": str(calibration_loss_type), "lambda_l0": float(lambda_l0), "beta": float(beta), "lambda_l2": float(lambda_l2), @@ -130,6 +141,12 @@ def build_checkpoint_signature( } +def _optional_float_signature_array(values: np.ndarray | None) -> np.ndarray: + if values is None: + return np.array([], dtype=np.float64) + return np.asarray(values, dtype=np.float64) + + def checkpoint_signature_mismatches( expected: dict, actual: dict, diff --git a/policyengine_us_data/calibration/target_policy.py b/policyengine_us_data/calibration/target_policy.py new file mode 100644 index 000000000..a0898ac50 --- /dev/null +++ b/policyengine_us_data/calibration/target_policy.py @@ -0,0 +1,462 @@ +"""Calibration target tolerance policy resolution and artifacts.""" + +from __future__ import annotations + +import json +from collections.abc import Mapping, Sequence +from pathlib import Path +from typing import Any + +import numpy as np +import pandas as pd + +TARGET_POLICY_SCHEMA_VERSION = "1" +TARGET_POLICY_ARTIFACT = "calibration_target_policy.jsonl" +TARGET_POLICY_SUMMARY_ARTIFACT = "calibration_target_policy_summary.json" +DEFAULT_TARGET_POLICY_PATH = Path(__file__).resolve().parent / "target_policy.yaml" +VALID_ENFORCEMENT = frozenset({"fail", "warn", "diagnostic_only"}) + +COUNT_VARIABLES = frozenset( + { + "household_count", + "person_count", + "spm_unit_count", + "tax_unit_count", + } +) +ACA_TOKENS = ("aca", "marketplace", "selected_marketplace_plan") + + +def load_target_policy_config(path: str | Path | None = None) -> dict[str, Any]: + """Load target policy YAML, defaulting to the bundled policy.""" + + import yaml + + policy_path = Path(path) if path is not None else DEFAULT_TARGET_POLICY_PATH + with policy_path.open() as file: + config = yaml.safe_load(file) or {} + if not isinstance(config, dict): + raise ValueError("target policy config must be a mapping") + return config + + +def build_target_policy( + targets_df: pd.DataFrame, + *, + target_names: Sequence[str] | None = None, + config: Mapping[str, Any] | None = None, + row_sums: np.ndarray | None = None, +) -> pd.DataFrame: + """Return one resolved tolerance-policy row per calibration target.""" + + config = dict(config or {}) + defaults = _policy_defaults(config.get("defaults", {})) + rules = list(config.get("rules", [])) + if not isinstance(rules, list): + raise ValueError("target policy rules must be a list") + + rows: list[dict[str, Any]] = [] + for index, target in targets_df.reset_index(drop=True).iterrows(): + rule_id = "default" + policy = _default_policy_for_target(target, defaults) + for raw_rule in rules: + if not isinstance(raw_rule, Mapping): + raise ValueError("target policy rule must be a mapping") + if _rule_matches(target, raw_rule.get("match", {})): + policy.update(_rule_updates(raw_rule)) + rule_id = str(raw_rule.get("id", rule_id)) + + if row_sums is not None and float(row_sums[index]) <= 0: + policy["enforcement"] = "diagnostic_only" + policy["loss_weight"] = 0.0 + + enforcement = str(policy["enforcement"]) + if enforcement not in VALID_ENFORCEMENT: + raise ValueError( + f"target policy enforcement must be one of {sorted(VALID_ENFORCEMENT)}" + ) + tolerance_pct = float(policy["tolerance_pct"]) + loss_weight = float(policy["loss_weight"]) + scale_floor = float(policy["scale_floor"]) + if tolerance_pct < 0: + raise ValueError("target policy tolerance_pct must be non-negative") + if loss_weight < 0: + raise ValueError("target policy loss_weight must be non-negative") + if scale_floor <= 0: + raise ValueError("target policy scale_floor must be positive") + + row = { + "target_index": int(index), + "target": ( + str(target_names[index]) + if target_names is not None and index < len(target_names) + else None + ), + "variable": _target_string(target, "variable"), + "geo_level": _target_string(target, "geo_level"), + "geographic_id": _target_string(target, "geographic_id"), + "domain_variable": _target_string(target, "domain_variable"), + "enforcement": enforcement, + "priority": str(policy["priority"]), + "tolerance_pct": tolerance_pct, + "tolerance": tolerance_pct / 100.0, + "scale_floor": scale_floor, + "loss_weight": 0.0 if enforcement == "diagnostic_only" else loss_weight, + "policy_rule_id": rule_id, + "policy_group_key": _policy_group_key(target, policy), + "loss_enabled": enforcement != "diagnostic_only" and loss_weight > 0, + "schema_version": TARGET_POLICY_SCHEMA_VERSION, + } + rows.append(row) + + return pd.DataFrame(rows) + + +def target_policy_arrays( + target_policy: pd.DataFrame, + targets: np.ndarray, +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """Return L0-ready target weights, tolerances, and relative-error scales.""" + + if len(target_policy) != len(targets): + raise ValueError( + "target policy length must match target vector length: " + f"{len(target_policy)} != {len(targets)}" + ) + targets_arr = np.asarray(targets, dtype=np.float64) + target_weights = target_policy["loss_weight"].to_numpy(dtype=np.float64) + tolerances = target_policy["tolerance"].to_numpy(dtype=np.float64) + scale_floors = target_policy["scale_floor"].to_numpy(dtype=np.float64) + scales = np.maximum(np.abs(targets_arr), scale_floors) + return target_weights, tolerances, scales + + +def annotate_diagnostics_with_policy( + diagnostics: pd.DataFrame, + target_policy: pd.DataFrame | None, +) -> pd.DataFrame: + """Attach policy columns and final validation status to diagnostics.""" + + annotated = diagnostics.copy() + if target_policy is None: + annotated["enforcement"] = "warn" + annotated["priority"] = "P3" + annotated["tolerance_pct"] = np.inf + annotated["tolerance"] = np.inf + annotated["scale_floor"] = 1.0 + annotated["loss_weight"] = 1.0 + annotated["policy_rule_id"] = "legacy" + annotated["policy_group_key"] = "legacy" + annotated["loss_enabled"] = True + else: + if len(target_policy) != len(annotated): + raise ValueError( + "target policy length must match diagnostics length: " + f"{len(target_policy)} != {len(annotated)}" + ) + policy_columns = [ + "enforcement", + "priority", + "tolerance_pct", + "tolerance", + "scale_floor", + "loss_weight", + "policy_rule_id", + "policy_group_key", + "loss_enabled", + ] + for column in policy_columns: + annotated[column] = target_policy[column].to_numpy() + + annotated["excess_abs_rel_error"] = np.maximum( + annotated["abs_rel_error"].to_numpy(dtype=np.float64) + - annotated["tolerance"].to_numpy(dtype=np.float64), + 0.0, + ) + annotated["within_tolerance"] = annotated["excess_abs_rel_error"] <= 0 + annotated["validation_status"] = np.select( + [ + ~annotated["achievable"].astype(bool), + annotated["within_tolerance"], + annotated["enforcement"] == "fail", + annotated["enforcement"] == "warn", + ], + ["diagnostic_only", "pass", "fail", "warn"], + default="diagnostic_only", + ) + return annotated + + +def enforce_target_tolerances(diagnostics: pd.DataFrame) -> None: + """Raise if any achievable hard-fail target exceeds its tolerance.""" + + failed = diagnostics[ + (diagnostics["validation_status"] == "fail") + & diagnostics["achievable"].astype(bool) + ] + if failed.empty: + return + preview = failed.sort_values("excess_abs_rel_error", ascending=False).head(10) + rows = [ + f"{row.target}: abs_rel_error={row.abs_rel_error:.4%}, " + f"tolerance={row.tolerance:.4%}" + for row in preview.itertuples() + ] + raise ValueError( + "Calibration hard-fail targets exceeded tolerance: " + "; ".join(rows) + ) + + +def summarize_target_policy(policy: pd.DataFrame) -> dict[str, Any]: + """Return a compact JSON summary of a resolved target policy table.""" + + enforcement_counts = ( + policy["enforcement"].value_counts().sort_index().astype(int).to_dict() + ) + priority_counts = ( + policy["priority"].value_counts().sort_index().astype(int).to_dict() + ) + tolerance_rows = ( + policy[ + [ + "priority", + "enforcement", + "tolerance_pct", + "scale_floor", + "loss_weight", + "policy_rule_id", + ] + ] + .drop_duplicates() + .sort_values(["priority", "enforcement", "policy_rule_id"]) + .to_dict(orient="records") + ) + return { + "schema_version": TARGET_POLICY_SCHEMA_VERSION, + "n_targets": int(len(policy)), + "enforcement_counts": enforcement_counts, + "priority_counts": priority_counts, + "tolerances": tolerance_rows, + } + + +def write_target_policy_artifacts( + policy: pd.DataFrame, + output_dir: str | Path, + *, + prefix: str = "", +) -> tuple[Path, Path]: + """Write target policy JSONL and summary JSON artifacts.""" + + directory = Path(output_dir) + directory.mkdir(parents=True, exist_ok=True) + jsonl_path = directory / f"{prefix}{TARGET_POLICY_ARTIFACT}" + summary_path = directory / f"{prefix}{TARGET_POLICY_SUMMARY_ARTIFACT}" + records = policy.where(pd.notnull(policy), None).to_dict(orient="records") + with jsonl_path.open("w") as file: + for record in records: + file.write(json.dumps(record, sort_keys=True) + "\n") + summary_path.write_text( + json.dumps(summarize_target_policy(policy), indent=2, sort_keys=True) + "\n" + ) + return jsonl_path, summary_path + + +def _policy_defaults(raw_defaults: Mapping[str, Any]) -> dict[str, Any]: + defaults = dict(raw_defaults or {}) + return { + "count_scale_floors": { + "national": float(defaults.get("count_scale_floor_national", 100_000.0)), + "state": float(defaults.get("count_scale_floor_state", 10_000.0)), + "district": float(defaults.get("count_scale_floor_district", 1_000.0)), + }, + "dollar_scale_floors": { + "national": float( + defaults.get("dollar_scale_floor_national", 250_000_000.0) + ), + "state": float(defaults.get("dollar_scale_floor_state", 25_000_000.0)), + "district": float(defaults.get("dollar_scale_floor_district", 1_000_000.0)), + }, + "net_worth_scale_floor": float( + defaults.get("net_worth_scale_floor", 1_000_000_000.0) + ), + } + + +def _default_policy_for_target( + target: pd.Series, + defaults: Mapping[str, Any], +) -> dict[str, Any]: + variable = _target_string(target, "variable") + geo_level = _target_string(target, "geo_level") or "national" + domain_variable = _target_string(target, "domain_variable") + + if variable == "household_count" and not domain_variable: + return _policy( + priority="P0", + enforcement="fail", + tolerance_pct=_geo_tolerance(geo_level, 0.25, 0.5, 1.0), + scale_floor=_scale_floor(variable, geo_level, defaults), + loss_weight=40.0, + ) + if variable == "person_count" and not domain_variable: + return _policy( + priority="P0", + enforcement="fail", + tolerance_pct=_geo_tolerance(geo_level, 0.25, 0.5, 1.0), + scale_floor=_scale_floor(variable, geo_level, defaults), + loss_weight=40.0, + ) + if variable == "person_count" and domain_variable == "age": + return _policy( + priority="P1", + enforcement="warn", + tolerance_pct=_geo_tolerance(geo_level, 1.0, 2.0, 3.0), + scale_floor=_scale_floor(variable, geo_level, defaults), + loss_weight=15.0, + ) + if _is_aca_target(variable, domain_variable): + return _policy( + priority="P2", + enforcement="warn", + tolerance_pct=7.5, + scale_floor=_scale_floor(variable, geo_level, defaults), + loss_weight=6.0, + ) + if _is_count_variable(variable): + return _policy( + priority="P2", + enforcement="warn", + tolerance_pct=_geo_tolerance(geo_level, 5.0, 5.0, 7.5), + scale_floor=_scale_floor(variable, geo_level, defaults), + loss_weight=6.0, + ) + return _policy( + priority="P3", + enforcement="warn", + tolerance_pct=_geo_tolerance(geo_level, 5.0, 5.0, 10.0), + scale_floor=_scale_floor(variable, geo_level, defaults), + loss_weight=3.0, + ) + + +def _policy( + *, + priority: str, + enforcement: str, + tolerance_pct: float, + scale_floor: float, + loss_weight: float, +) -> dict[str, Any]: + return { + "priority": priority, + "enforcement": enforcement, + "tolerance_pct": float(tolerance_pct), + "scale_floor": float(scale_floor), + "loss_weight": float(loss_weight), + } + + +def _rule_matches(target: pd.Series, raw_match: Any) -> bool: + if raw_match is None: + raw_match = {} + if not isinstance(raw_match, Mapping): + raise ValueError("target policy rule match must be a mapping") + for key in ("variable", "geo_level", "domain_variable"): + if key in raw_match and not _matches_value( + _target_string(target, key), + raw_match[key], + ): + return False + if "domain_variable_contains_any" in raw_match: + domain_variable = _target_string(target, "domain_variable") + tokens = _as_sequence(raw_match["domain_variable_contains_any"]) + if not any(str(token) in domain_variable for token in tokens): + return False + return True + + +def _rule_updates(rule: Mapping[str, Any]) -> dict[str, Any]: + allowed = { + "priority", + "enforcement", + "tolerance_pct", + "scale_floor", + "loss_weight", + } + updates = {key: rule[key] for key in allowed if key in rule} + return updates + + +def _matches_value(value: str, expected: Any) -> bool: + return value in {str(item) for item in _as_sequence(expected)} + + +def _as_sequence(value: Any) -> Sequence[Any]: + if isinstance(value, list | tuple | set): + return tuple(value) + return (value,) + + +def _target_string(target: pd.Series, key: str) -> str: + value = target.get(key, "") + if value is None: + return "" + try: + if pd.isna(value): + return "" + except (TypeError, ValueError): + pass + return str(value) + + +def _policy_group_key(target: pd.Series, policy: Mapping[str, Any]) -> str: + return "|".join( + [ + str(policy["priority"]), + str(policy["enforcement"]), + _target_string(target, "geo_level"), + _target_string(target, "variable"), + _target_string(target, "domain_variable"), + ] + ) + + +def _geo_tolerance( + geo_level: str, + national: float, + state: float, + district: float, +) -> float: + if geo_level == "district": + return district + if geo_level == "state": + return state + return national + + +def _scale_floor( + variable: str, + geo_level: str, + defaults: Mapping[str, Any], +) -> float: + if variable == "net_worth": + return float(defaults["net_worth_scale_floor"]) + if _is_count_variable(variable): + floors = defaults["count_scale_floors"] + else: + floors = defaults["dollar_scale_floors"] + if geo_level == "district": + return float(floors["district"]) + if geo_level == "state": + return float(floors["state"]) + return float(floors["national"]) + + +def _is_count_variable(variable: str) -> bool: + return variable in COUNT_VARIABLES or variable.endswith("_count") + + +def _is_aca_target(variable: str, domain_variable: str) -> bool: + text = f"{variable},{domain_variable}".lower() + return any(token in text for token in ACA_TOKENS) diff --git a/policyengine_us_data/calibration/target_policy.yaml b/policyengine_us_data/calibration/target_policy.yaml new file mode 100644 index 000000000..02e4c06a9 --- /dev/null +++ b/policyengine_us_data/calibration/target_policy.yaml @@ -0,0 +1,31 @@ +schema_version: "1" + +defaults: + count_scale_floor_district: 1000 + count_scale_floor_state: 10000 + count_scale_floor_national: 100000 + dollar_scale_floor_district: 1000000 + dollar_scale_floor_state: 25000000 + dollar_scale_floor_national: 250000000 + net_worth_scale_floor: 1000000000 + +rules: + - id: aca_warning_targets + match: + domain_variable_contains_any: + - aca + - marketplace + - selected_marketplace_plan + priority: P2 + enforcement: warn + tolerance_pct: 7.5 + loss_weight: 6.0 + + - id: aca_variable_warning_targets + match: + variable: + - aca_ptc + priority: P2 + enforcement: warn + tolerance_pct: 7.5 + loss_weight: 6.0 diff --git a/policyengine_us_data/calibration/unified_calibration.py b/policyengine_us_data/calibration/unified_calibration.py index dcce949c1..f9e2c8ee9 100644 --- a/policyengine_us_data/calibration/unified_calibration.py +++ b/policyengine_us_data/calibration/unified_calibration.py @@ -41,6 +41,18 @@ build_checkpoint_signature, checkpoint_signature_mismatches, ) +from policyengine_us_data.calibration.target_policy import ( + DEFAULT_TARGET_POLICY_PATH, + TARGET_POLICY_ARTIFACT, + TARGET_POLICY_SUMMARY_ARTIFACT, + annotate_diagnostics_with_policy, + build_target_policy, + enforce_target_tolerances, + load_target_policy_config, + summarize_target_policy, + target_policy_arrays, + write_target_policy_artifacts, +) from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.stage_contracts.calibration_package import ( CalibrationPackageParameters, @@ -83,6 +95,7 @@ def _calibration_package_contract_parameters( workers: int, n_clones: int, target_config_path: str | None, + target_policy_path: str | None, skip_county: bool, skip_source_impute: bool, skip_takeup_rerandomize: bool, @@ -97,6 +110,7 @@ def _calibration_package_contract_parameters( workers=workers, n_clones=n_clones, target_config_path=target_config_path, + target_policy_path=target_policy_path, skip_county=skip_county, skip_source_impute=skip_source_impute, skip_takeup_rerandomize=skip_takeup_rerandomize, @@ -321,6 +335,14 @@ def parse_args(argv=None): "--all-active-targets is set." ), ) + parser.add_argument( + "--target-policy", + default=None, + help=( + "Path to calibration target tolerance policy YAML. " + "Defaults to calibration/target_policy.yaml." + ), + ) parser.add_argument( "--all-active-targets", action="store_true", @@ -553,6 +575,80 @@ def apply_target_config_to_targets( return filtered_df +def add_derived_population_anchor_targets( + targets_df: "pd.DataFrame", + X_sparse, + target_names: list, +) -> tuple["pd.DataFrame", object, list]: + """Append total person-count anchors derived from age-bin rows. + + District/state/national person counts are sometimes represented only as + age-bin person_count rows. The calibration policy needs an explicit total + person-count row so population undercounts are hard-fail targets. + """ + + if X_sparse is None or len(targets_df) == 0: + return targets_df, X_sparse, target_names + required = {"variable", "domain_variable", "geo_level", "geographic_id", "value"} + if not required.issubset(targets_df.columns): + return targets_df, X_sparse, target_names + + age_mask = (targets_df["variable"] == "person_count") & ( + targets_df["domain_variable"].fillna("") == "age" + ) + age_rows = targets_df[age_mask] + if age_rows.empty: + return targets_df, X_sparse, target_names + + import scipy.sparse as sparse + + existing_totals = targets_df[ + (targets_df["variable"] == "person_count") + & (targets_df["domain_variable"].fillna("") == "") + ] + existing_keys = set( + zip( + existing_totals["geo_level"].astype(str), + existing_totals["geographic_id"].astype(str), + strict=False, + ) + ) + new_rows = [] + new_matrix_rows = [] + new_names = list(target_names) + for (geo_level, geographic_id), group in age_rows.groupby( + ["geo_level", "geographic_id"], + sort=False, + ): + key = (str(geo_level), str(geographic_id)) + if key in existing_keys: + continue + source = group.iloc[0].copy() + source["domain_variable"] = "" + source["value"] = float(group["value"].sum()) + if "target_id" in source.index: + source["target_id"] = f"derived:person_count:{geo_level}:{geographic_id}" + if "target_name" in source.index: + source["target_name"] = f"derived_person_count_{geo_level}_{geographic_id}" + source["derived_target"] = True + source["derived_from"] = "person_count_age" + new_rows.append(source) + row_indices = group.index.to_numpy(dtype=np.int64) + new_matrix_rows.append(sparse.csr_matrix(X_sparse[row_indices, :].sum(axis=0))) + new_names.append(f"derived_person_count_{geo_level}_{geographic_id}") + + if not new_rows: + return targets_df, X_sparse, target_names + + expanded_targets = pd.concat( + [targets_df.reset_index(drop=True), pd.DataFrame(new_rows)], + ignore_index=True, + ) + expanded_X = sparse.vstack([X_sparse, *new_matrix_rows], format="csr") + logger.info("Added %d derived total person-count target(s)", len(new_rows)) + return expanded_targets, expanded_X, new_names + + def save_calibration_package( path: str, X_sparse, @@ -562,6 +658,7 @@ def save_calibration_package( initial_weights: np.ndarray = None, cd_geoid: np.ndarray = None, block_geoid: np.ndarray = None, + target_policy_df: "pd.DataFrame" = None, ) -> None: """Save calibration package to pickle. @@ -574,6 +671,7 @@ def save_calibration_package( initial_weights: Pre-computed initial weight array. cd_geoid: CD GEOID array from geography assignment. block_geoid: Block GEOID array from geography assignment. + target_policy_df: Resolved per-target tolerance policy. """ import pickle @@ -585,6 +683,7 @@ def save_calibration_package( "initial_weights": initial_weights, "cd_geoid": cd_geoid, "block_geoid": block_geoid, + "target_policy_df": target_policy_df, } Path(path).parent.mkdir(parents=True, exist_ok=True) with open(path, "wb") as f: @@ -768,6 +867,64 @@ def compute_initial_weights( return initial_weights +def _relative_errors_for_reporting( + *, + y_pred: np.ndarray, + targets: np.ndarray, + target_scales: np.ndarray | None, +) -> np.ndarray: + """Return relative errors using explicit scales when provided.""" + + if target_scales is not None: + return (y_pred - targets) / target_scales + return np.where( + np.abs(targets) > 0, + (y_pred - targets) / np.abs(targets), + 0.0, + ) + + +def _target_loss_for_reporting( + *, + rel_err: float, + target_weight: float, + target_tolerance: float, + loss_type: str, +) -> float: + """Return the scalar data-loss contribution used in progress logs.""" + + if loss_type == "relative_epsilon": + excess = max(abs(float(rel_err)) - float(target_tolerance), 0.0) + return float(target_weight) * excess**2 + return float(target_weight) * float(rel_err) ** 2 + + +def _calibration_data_loss_for_reporting( + *, + rel_errs: np.ndarray, + target_weights: np.ndarray | None, + target_tolerances: np.ndarray | None, + loss_type: str, +) -> float: + """Return the total data loss shown in epoch progress logs.""" + + weights = ( + np.ones(len(rel_errs), dtype=np.float64) + if target_weights is None + else np.asarray(target_weights, dtype=np.float64) + ) + if loss_type == "relative_epsilon": + tolerances = ( + np.zeros(len(rel_errs), dtype=np.float64) + if target_tolerances is None + else np.asarray(target_tolerances, dtype=np.float64) + ) + rel_terms = np.maximum(np.abs(rel_errs) - tolerances, 0.0) + else: + rel_terms = rel_errs + return float(np.sum(weights * rel_terms**2)) + + @pipeline_node( PipelineNode( id="fit_model", @@ -802,6 +959,10 @@ def fit_l0_weights( targets_df: "pd.DataFrame" = None, achievable: np.ndarray = None, target_groups: Optional[np.ndarray] = None, + target_weights: Optional[np.ndarray] = None, + target_tolerances: Optional[np.ndarray] = None, + target_scales: Optional[np.ndarray] = None, + calibration_loss_type: str = "relative", resume_from: str = None, checkpoint_path: str = None, ) -> np.ndarray: @@ -826,6 +987,10 @@ def fit_l0_weights( targets_df: Targets DataFrame, used to compute initial_weights when not provided. target_groups: Optional group ID per target row for balanced loss. + target_weights: Optional per-target loss weights. + target_tolerances: Optional relative-error tolerance per target. + target_scales: Optional relative-error scale per target. + calibration_loss_type: L0 data loss type. resume_from: Path to a `.checkpoint.pt` file or `.npy` weights file to continue fitting from. checkpoint_path: Where to save resumable fit checkpoints. @@ -857,6 +1022,10 @@ def fit_l0_weights( lambda_l2=lambda_l2, learning_rate=learning_rate, target_groups=target_groups, + target_weights=target_weights, + target_tolerances=target_tolerances, + target_scales=target_scales, + calibration_loss_type=calibration_loss_type, ) checkpoint_state_dict = None start_epoch = 0 @@ -987,11 +1156,14 @@ def _flushed_print(*args, **kwargs): M=X_sparse, y=targets, target_groups=target_groups, + target_weights=target_weights, + target_tolerances=target_tolerances, + target_scales=target_scales, lambda_l0=lambda_l0, lambda_l2=lambda_l2, lr=learning_rate, epochs=chunk, - loss_type="relative", + loss_type=calibration_loss_type, verbose=False, ) model.log_weight_jitter_sd = 0.0 @@ -1015,14 +1187,19 @@ def _flushed_print(*args, **kwargs): nz = len(active_w) sparsity = (1 - nz / n_total) * 100 - rel_errs = np.where( - np.abs(targets) > 0, - (y_pred - targets) / np.abs(targets), - 0.0, + rel_errs = _relative_errors_for_reporting( + y_pred=y_pred, + targets=targets, + target_scales=target_scales, ) mean_err = np.mean(np.abs(rel_errs)) max_err = np.max(np.abs(rel_errs)) - total_loss = np.sum(rel_errs**2) + total_loss = _calibration_data_loss_for_reporting( + rel_errs=rel_errs, + target_weights=target_weights, + target_tolerances=target_tolerances, + loss_type=calibration_loss_type, + ) if nz > 0: w_tiny = (active_w < 0.01).sum() @@ -1061,10 +1238,24 @@ def _flushed_print(*args, **kwargs): est = y_pred[i] tgt = targets[i] err = est - tgt - rel_err = err / tgt if tgt != 0 else 0 + if target_scales is not None: + rel_err = err / target_scales[i] + else: + rel_err = err / tgt if tgt != 0 else 0 abs_err = abs(err) rel_abs = abs(rel_err) - loss = rel_err**2 + loss = _target_loss_for_reporting( + rel_err=rel_err, + target_weight=( + target_weights[i] if target_weights is not None else 1.0 + ), + target_tolerance=( + target_tolerances[i] + if target_tolerances is not None + else 0.0 + ), + loss_type=calibration_loss_type, + ) f.write( f'"{target_names[i]}",' f"{est},{tgt},{absolute_epoch}," @@ -1086,11 +1277,14 @@ def _flushed_print(*args, **kwargs): M=X_sparse, y=targets, target_groups=target_groups, + target_weights=target_weights, + target_tolerances=target_tolerances, + target_scales=target_scales, lambda_l0=lambda_l0, lambda_l2=lambda_l2, lr=learning_rate, epochs=epochs, - loss_type="relative", + loss_type=calibration_loss_type, verbose=True, verbose_freq=verbose_freq, ) @@ -1146,6 +1340,7 @@ def compute_diagnostics( X_sparse, targets_df, target_names: list, + target_policy_df: "pd.DataFrame" = None, ) -> "pd.DataFrame": import pandas as pd @@ -1158,7 +1353,7 @@ def compute_diagnostics( (estimates - true_values) / np.abs(true_values), 0.0, ) - return pd.DataFrame( + diagnostics = pd.DataFrame( { "target": target_names, "true_value": true_values, @@ -1168,6 +1363,7 @@ def compute_diagnostics( "achievable": row_sums > 0, } ) + return annotate_diagnostics_with_policy(diagnostics, target_policy_df) def _raw_time_period_array( @@ -1261,6 +1457,8 @@ def _extract_forbes_state_fips_overrides( "calibration_weights.npy", "unified_diagnostics.csv", "unified_run_config.json", + TARGET_POLICY_ARTIFACT, + TARGET_POLICY_SUMMARY_ARTIFACT, ], validation_commands=[ "uv run pytest tests/unit/calibration/test_unified_calibration.py" @@ -1282,6 +1480,9 @@ def run_calibration( skip_county: bool = True, target_config: dict = None, target_config_path: str = None, + target_policy_config: dict = None, + target_policy_path: str = None, + target_policy_output_dir: str = None, build_only: bool = False, package_path: str = None, package_output_path: str = None, @@ -1319,6 +1520,9 @@ def run_calibration( skip_source_impute: Skip ACS/SIPP/SCF imputations. target_config: Parsed target config dict. target_config_path: Path to target config, for provenance. + target_policy_config: Parsed target tolerance policy config. + target_policy_path: Path to target policy YAML, for provenance. + target_policy_output_dir: Directory for target policy artifacts. build_only: If True, save package and skip fitting. package_path: Load pre-built package (skip build). package_output_path: Where to save calibration package. @@ -1357,14 +1561,29 @@ def run_calibration( targets_df, X_sparse, target_names = apply_target_config( targets_df, X_sparse, target_names, target_config ) + targets_df, X_sparse, target_names = add_derived_population_anchor_targets( + targets_df, + X_sparse, + target_names, + ) initial_weights = package.get("initial_weights") targets = targets_df["value"].values + row_sums = np.array(X_sparse.sum(axis=1)).flatten() + pkg_achievable = row_sums > 0 + target_policy_df = build_target_policy( + targets_df, + target_names=target_names, + config=target_policy_config, + row_sums=row_sums, + ) + target_weights, target_tolerances, target_scales = target_policy_arrays( + target_policy_df, + targets, + ) # Temporarily disable grouped target loss until target precedence # and tolerance handling can make grouped fitting safe. target_groups = None - row_sums = np.array(X_sparse.sum(axis=1)).flatten() - pkg_achievable = row_sums > 0 weights = fit_l0_weights( X_sparse=X_sparse, targets=targets, @@ -1381,6 +1600,10 @@ def run_calibration( targets_df=targets_df, achievable=pkg_achievable, target_groups=target_groups, + target_weights=target_weights, + target_tolerances=target_tolerances, + target_scales=target_scales, + calibration_loss_type="relative_epsilon", resume_from=resume_from, checkpoint_path=checkpoint_path, ) @@ -1392,6 +1615,7 @@ def run_calibration( "cd_geoid": package.get("cd_geoid"), "block_geoid": package.get("block_geoid"), "base_n_records": package["metadata"].get("base_n_records"), + "target_policy_df": target_policy_df, } return ( weights, @@ -1586,6 +1810,30 @@ def run_calibration( X_sparse.shape, X_sparse.nnz, ) + targets_df, X_sparse, target_names = add_derived_population_anchor_targets( + targets_df, + X_sparse, + target_names, + ) + row_sums = np.array(X_sparse.sum(axis=1)).flatten() + target_policy_df = build_target_policy( + targets_df, + target_names=target_names, + config=target_policy_config, + row_sums=row_sums, + ) + target_policy_summary = summarize_target_policy(target_policy_df) + target_policy_artifacts: tuple[Path, Path] | None = None + if target_policy_output_dir is not None: + target_policy_artifacts = write_target_policy_artifacts( + target_policy_df, + target_policy_output_dir, + ) + logger.info( + "Target policy artifacts saved to %s and %s", + target_policy_artifacts[0], + target_policy_artifacts[1], + ) # Step 6b: Save the calibration package. By default this is the # minimal package selected by target_config.yaml; use @@ -1599,6 +1847,9 @@ def run_calibration( "seed": seed, "created_at": _utc_now_isoformat(), "target_config_path": target_config_path, + "target_policy_path": target_policy_path, + "target_policy_schema_version": target_policy_summary["schema_version"], + "target_policy_summary": target_policy_summary, "package_scope": "minimal" if target_config else "all_active_targets", "matrix_builder": "chunked" if chunked_matrix else "precompute", "chunk_size": chunk_size if chunked_matrix else None, @@ -1613,6 +1864,10 @@ def run_calibration( metadata["target_config_sha256"] = compute_file_checksum( Path(target_config_path) ) + if target_policy_path: + metadata["target_policy_sha256"] = compute_file_checksum( + Path(target_policy_path) + ) initial_weights = compute_initial_weights(X_sparse, targets_df) if package_output_path: @@ -1624,6 +1879,7 @@ def run_calibration( "initial_weights": initial_weights, "cd_geoid": geography.cd_geoid, "block_geoid": geography.block_geoid, + "target_policy_df": target_policy_df, } save_calibration_package( package_output_path, @@ -1634,6 +1890,7 @@ def run_calibration( initial_weights=initial_weights, cd_geoid=geography.cd_geoid, block_geoid=geography.block_geoid, + target_policy_df=target_policy_df, ) from policyengine_us_data.stage_contracts.calibration_package import ( validate_calibration_package_contract, @@ -1650,6 +1907,7 @@ def run_calibration( workers=workers, n_clones=n_clones, target_config_path=target_config_path, + target_policy_path=target_policy_path, skip_county=skip_county, skip_source_impute=skip_source_impute, skip_takeup_rerandomize=skip_takeup_rerandomize, @@ -1684,6 +1942,7 @@ def run_calibration( "target_names": target_names, "metadata": metadata, "initial_weights": initial_weights, + "target_policy_df": target_policy_df, } result = validate_package(package) print(format_report(result)) @@ -1692,6 +1951,8 @@ def run_calibration( "block_geoid": geography.block_geoid, "base_n_records": n_records, "dataset_for_matrix": dataset_for_matrix, + "target_policy_df": target_policy_df, + "target_policy_artifacts": target_policy_artifacts, } return ( None, @@ -1703,11 +1964,14 @@ def run_calibration( # Step 7: L0 calibration targets = targets_df["value"].values + target_weights, target_tolerances, target_scales = target_policy_arrays( + target_policy_df, + targets, + ) # Temporarily disable grouped target loss until target precedence # and tolerance handling can make grouped fitting safe. target_groups = None - row_sums = np.array(X_sparse.sum(axis=1)).flatten() achievable = row_sums > 0 logger.info( "Achievable: %d / %d targets", @@ -1731,6 +1995,10 @@ def run_calibration( targets_df=targets_df, achievable=achievable, target_groups=target_groups, + target_weights=target_weights, + target_tolerances=target_tolerances, + target_scales=target_scales, + calibration_loss_type="relative_epsilon", resume_from=resume_from, checkpoint_path=checkpoint_path, ) @@ -1748,6 +2016,8 @@ def run_calibration( "household_ids": getattr(builder, "household_ids", None), "precomputed_rates": getattr(builder, "precomputed_rates", None), "affected_target_info": getattr(builder, "affected_target_info", None), + "target_policy_df": target_policy_df, + "target_policy_artifacts": target_policy_artifacts, } return ( weights, @@ -1816,6 +2086,8 @@ def main(argv=None): if not args.all_active_targets: target_config_path = args.target_config or str(DEFAULT_TARGET_CONFIG_PATH) target_config = load_target_config(target_config_path) + target_policy_path = args.target_policy or str(DEFAULT_TARGET_POLICY_PATH) + target_policy_config = load_target_policy_config(target_policy_path) package_output_path = args.package_output if args.build_only and not package_output_path: @@ -1824,6 +2096,11 @@ def main(argv=None): ) output_dir = Path(output_path).parent + target_policy_output_dir = ( + str(Path(package_output_path).parent) + if args.build_only and package_output_path + else str(output_dir) + ) cal_log_path = None if args.log_freq is not None: cal_log_path = str(output_dir / "calibration_log.csv") @@ -1851,6 +2128,9 @@ def main(argv=None): skip_county=not args.county_level, target_config=target_config, target_config_path=target_config_path, + target_policy_config=target_policy_config, + target_policy_path=target_policy_path, + target_policy_output_dir=target_policy_output_dir, build_only=args.build_only, package_path=args.package_path, package_output_path=package_output_path, @@ -1876,15 +2156,33 @@ def main(argv=None): if source_imputed and source_imputed != dataset_path: print(f"SOURCE_IMPUTED_PATH:{source_imputed}") + target_policy_df = geography_info.get("target_policy_df") + target_policy_artifacts = geography_info.get("target_policy_artifacts") + if target_policy_df is not None and target_policy_artifacts is None: + target_policy_artifacts = write_target_policy_artifacts( + target_policy_df, + target_policy_output_dir, + ) + if target_policy_artifacts is not None: + print(f"TARGET_POLICY_PATH:{target_policy_artifacts[0]}") + print(f"TARGET_POLICY_SUMMARY_PATH:{target_policy_artifacts[1]}") + if weights is None: logger.info("Build-only complete. Package saved.") return # Diagnostics (raw weights match X_sparse column layout) output_dir = Path(output_path).parent - diag_df = compute_diagnostics(weights, X_sparse, targets_df, target_names) + diag_df = compute_diagnostics( + weights, + X_sparse, + targets_df, + target_names, + target_policy_df=target_policy_df, + ) diag_path = output_dir / "unified_diagnostics.csv" diag_df.to_csv(diag_path, index=False) + enforce_target_tolerances(diag_df) ach = diag_df[diag_df.achievable] err_pct = ach.abs_rel_error * 100 @@ -1988,6 +2286,15 @@ def _sha256(filepath): "domain_variables": domain_variables, "hierarchical_domains": hierarchical_domains, "target_config": args.target_config, + "target_policy": target_policy_path, + "target_policy_artifacts": { + TARGET_POLICY_ARTIFACT: _sha256(target_policy_artifacts[0]) + if target_policy_artifacts is not None + else None, + TARGET_POLICY_SUMMARY_ARTIFACT: _sha256(target_policy_artifacts[1]) + if target_policy_artifacts is not None + else None, + }, "n_targets": len(targets_df), "n_records": X_sparse.shape[1], "weight_format": weight_format, diff --git a/policyengine_us_data/stage_contracts/calibration_package.py b/policyengine_us_data/stage_contracts/calibration_package.py index dc0385321..4fee2afed 100644 --- a/policyengine_us_data/stage_contracts/calibration_package.py +++ b/policyengine_us_data/stage_contracts/calibration_package.py @@ -161,6 +161,20 @@ def summarize_calibration_package( metadata, "target_config_sha256", ), + target_policy_path=_optional_metadata_string( + metadata, + "target_policy_path", + ), + target_policy_sha256=_optional_metadata_string( + metadata, + "target_policy_sha256", + ), + target_policy_schema_version=_optional_metadata_string( + metadata, + "target_policy_schema_version", + ), + target_policy_row_count=_optional_len(package.get("target_policy_df")), + has_target_policy=package.get("target_policy_df") is not None, n_clones=_optional_metadata_int(metadata, "n_clones"), seed=_optional_metadata_int(metadata, "seed"), base_n_records=_optional_metadata_int(metadata, "base_n_records"), diff --git a/policyengine_us_data/stage_contracts/calibration_package_schema.py b/policyengine_us_data/stage_contracts/calibration_package_schema.py index 06030812e..5905b33b0 100644 --- a/policyengine_us_data/stage_contracts/calibration_package_schema.py +++ b/policyengine_us_data/stage_contracts/calibration_package_schema.py @@ -39,6 +39,7 @@ "skip_source_impute", "skip_takeup_rerandomize", "target_config", + "target_policy", "workers", } ) @@ -65,6 +66,11 @@ "seed", "target_config_path", "target_config_sha256", + "target_policy_path", + "target_policy_sha256", + "target_policy_schema_version", + "target_policy_row_count", + "has_target_policy", "target_name_count", } ) @@ -207,6 +213,7 @@ class CalibrationPackageParameters: workers: int | None n_clones: int target_config: str | None + target_policy: str | None skip_county: bool skip_source_impute: bool skip_takeup_rerandomize: bool @@ -230,6 +237,8 @@ def __post_init__(self) -> None: _validate_bool(self.parallel_matrix, "parallel_matrix") if self.target_config is not None and not isinstance(self.target_config, str): raise ValueError("target_config must be a string or None") + if self.target_policy is not None and not isinstance(self.target_policy, str): + raise ValueError("target_policy must be a string or None") if self.chunked_matrix: if self.workers is not None: raise ValueError("workers must be None when chunked_matrix is true") @@ -258,6 +267,7 @@ def from_runtime_args( workers: int, n_clones: int, target_config_path: str | None, + target_policy_path: str | None, skip_county: bool, skip_source_impute: bool, skip_takeup_rerandomize: bool, @@ -273,6 +283,7 @@ def from_runtime_args( workers=workers if not chunked_matrix else None, n_clones=n_clones, target_config=target_config_path, + target_policy=target_policy_path, skip_county=skip_county, skip_source_impute=skip_source_impute, skip_takeup_rerandomize=skip_takeup_rerandomize, @@ -300,6 +311,7 @@ def from_dict( workers=_optional_int_field(data, "workers"), n_clones=_required_int_field(data, "n_clones"), target_config=_optional_string_field(data, "target_config"), + target_policy=_optional_string_field(data, "target_policy"), skip_county=_required_bool_field(data, "skip_county"), skip_source_impute=_required_bool_field(data, "skip_source_impute"), skip_takeup_rerandomize=_required_bool_field( @@ -325,6 +337,7 @@ def to_dict(self) -> dict[str, Any]: "skip_source_impute": self.skip_source_impute, "skip_takeup_rerandomize": self.skip_takeup_rerandomize, "target_config": self.target_config, + "target_policy": self.target_policy, "workers": self.workers, } @@ -343,6 +356,11 @@ class CalibrationPackageSummary: db_sha256: str | None target_config_path: str | None target_config_sha256: str | None + target_policy_path: str | None + target_policy_sha256: str | None + target_policy_schema_version: str | None + target_policy_row_count: int | None + has_target_policy: bool n_clones: int | None seed: int | None base_n_records: int | None @@ -369,6 +387,10 @@ def __post_init__(self) -> None: _validate_optional_non_negative_int(self.n_clones, "n_clones") _validate_optional_non_negative_int(self.seed, "seed") _validate_optional_non_negative_int(self.base_n_records, "base_n_records") + _validate_optional_non_negative_int( + self.target_policy_row_count, + "target_policy_row_count", + ) _validate_optional_non_negative_int(self.chunk_size, "chunk_size") _validate_optional_non_negative_int(self.cd_geoid_length, "cd_geoid_length") _validate_optional_non_negative_int( @@ -376,6 +398,7 @@ def __post_init__(self) -> None: "block_geoid_length", ) _validate_bool(self.has_initial_weights, "has_initial_weights") + _validate_bool(self.has_target_policy, "has_target_policy") _validate_bool(self.has_cd_geoid, "has_cd_geoid") _validate_bool(self.has_block_geoid, "has_block_geoid") for key in ( @@ -383,6 +406,9 @@ def __post_init__(self) -> None: "db_sha256", "target_config_path", "target_config_sha256", + "target_policy_path", + "target_policy_sha256", + "target_policy_schema_version", "package_scope", "matrix_builder", "chunk_dir", @@ -416,6 +442,20 @@ def from_dict(cls, data: Mapping[str, Any]) -> "CalibrationPackageSummary": data, "target_config_sha256", ), + target_policy_path=_optional_string_field(data, "target_policy_path"), + target_policy_sha256=_optional_string_field( + data, + "target_policy_sha256", + ), + target_policy_schema_version=_optional_string_field( + data, + "target_policy_schema_version", + ), + target_policy_row_count=_optional_int_field( + data, + "target_policy_row_count", + ), + has_target_policy=_required_bool_field(data, "has_target_policy"), n_clones=_optional_int_field(data, "n_clones"), seed=_optional_int_field(data, "seed"), base_n_records=_optional_int_field(data, "base_n_records"), @@ -455,6 +495,11 @@ def to_dict(self) -> dict[str, Any]: "seed": self.seed, "target_config_path": self.target_config_path, "target_config_sha256": self.target_config_sha256, + "target_policy_path": self.target_policy_path, + "target_policy_sha256": self.target_policy_sha256, + "target_policy_schema_version": self.target_policy_schema_version, + "target_policy_row_count": self.target_policy_row_count, + "has_target_policy": self.has_target_policy, "target_name_count": self.target_name_count, } diff --git a/pyproject.toml b/pyproject.toml index 24b8c385d..64ecdedd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ calibration = [ "samplics", ] l0 = [ - "l0-python", + "l0-python @ git+https://github.com/PolicyEngine/L0.git@codex/epsilon-insensitive-calibration-loss", ] [dependency-groups] diff --git a/scripts/run_epsilon_calibration_demo.py b/scripts/run_epsilon_calibration_demo.py new file mode 100644 index 000000000..18581c401 --- /dev/null +++ b/scripts/run_epsilon_calibration_demo.py @@ -0,0 +1,126 @@ +"""Run a small synthetic epsilon-insensitive calibration demonstration. + +This is intentionally outside the default test suite. It gives reviewers a +cheap way to compare grouped relative loss against the proposed +epsilon-insensitive policy loss on a fixture with high-cardinality soft targets +and a small set of hard population anchors. +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +import numpy as np +from scipy import sparse as sp + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument( + "--output", + default="epsilon_calibration_demo_summary.json", + help="JSON summary path.", + ) + parser.add_argument("--epochs", type=int, default=300) + parser.add_argument("--seed", type=int, default=42) + return parser.parse_args() + + +def main() -> None: + try: + from l0.calibration import SparseCalibrationWeights + except ImportError as exc: + raise SystemExit( + "Install the l0 extra or pin the L0 feature branch before running " + "this demo." + ) from exc + + args = parse_args() + rng = np.random.default_rng(args.seed) + n_features = 160 + n_soft = 80 + n_targets = 2 + n_soft + + weights_true = rng.lognormal(mean=1.0, sigma=0.5, size=n_features) + matrix = rng.gamma(shape=2.0, scale=1.0, size=(n_targets, n_features)) + matrix[0, :] = 1.0 + matrix[1, :] = rng.uniform(0.0, 1.0, size=n_features) > 0.45 + targets = matrix @ weights_true + + target_groups = np.array([0, 1] + [2] * n_soft) + initial_weights = np.full(n_features, targets[0] / n_features) + M = sp.csr_matrix(matrix) + + grouped = SparseCalibrationWeights( + n_features=n_features, + init_keep_prob=0.95, + init_weights=initial_weights, + log_weight_jitter_sd=0.05, + seed=args.seed, + ) + grouped.fit( + M, + targets, + lambda_l0=1e-3, + lr=0.1, + epochs=args.epochs, + loss_type="relative", + target_groups=target_groups, + verbose=False, + ) + + epsilon = SparseCalibrationWeights( + n_features=n_features, + init_keep_prob=0.95, + init_weights=initial_weights, + log_weight_jitter_sd=0.05, + seed=args.seed, + ) + target_weights = np.array([40.0, 40.0] + [2.0] * n_soft) + target_tolerances = np.array([0.005, 0.005] + [0.10] * n_soft) + target_scales = np.maximum( + np.abs(targets), np.array([100.0, 100.0] + [1.0] * n_soft) + ) + epsilon.fit( + M, + targets, + lambda_l0=1e-3, + lr=0.1, + epochs=args.epochs, + loss_type="relative_epsilon", + target_weights=target_weights, + target_tolerances=target_tolerances, + target_scales=target_scales, + verbose=False, + ) + + result = { + "grouped_relative": _summarize(grouped, M, targets, target_tolerances), + "epsilon_insensitive": _summarize(epsilon, M, targets, target_tolerances), + "hard_anchor_tolerance": 0.005, + "soft_target_tolerance": 0.10, + "epochs": args.epochs, + "seed": args.seed, + } + output_path = Path(args.output) + output_path.write_text(json.dumps(result, indent=2, sort_keys=True) + "\n") + print(f"Wrote {output_path}") + + +def _summarize(model, M, targets: np.ndarray, tolerances: np.ndarray) -> dict: + prediction = model.predict(M).detach().cpu().numpy() + rel_errors = np.abs((prediction - targets) / np.maximum(np.abs(targets), 1.0)) + weights = model.get_weights(deterministic=True).detach().cpu().numpy() + return { + "hard_anchor_max_abs_rel_error": float(rel_errors[:2].max()), + "soft_target_mean_abs_rel_error": float(rel_errors[2:].mean()), + "share_targets_within_tolerance": float((rel_errors <= tolerances).mean()), + "weight_sum": float(weights.sum()), + "nonzero_weights": int((weights > 0).sum()), + } + + +if __name__ == "__main__": + main() diff --git a/tests/unit/calibration/test_target_policy.py b/tests/unit/calibration/test_target_policy.py new file mode 100644 index 000000000..b1a2fffa1 --- /dev/null +++ b/tests/unit/calibration/test_target_policy.py @@ -0,0 +1,129 @@ +import json + +import numpy as np +import pandas as pd + +from policyengine_us_data.calibration.target_policy import ( + TARGET_POLICY_ARTIFACT, + TARGET_POLICY_SUMMARY_ARTIFACT, + annotate_diagnostics_with_policy, + build_target_policy, + enforce_target_tolerances, + load_target_policy_config, + target_policy_arrays, + write_target_policy_artifacts, +) + + +def _targets_df(): + return pd.DataFrame( + { + "variable": [ + "household_count", + "person_count", + "aca_ptc", + "tax_unit_count", + "snap", + ], + "geo_level": ["district", "state", "district", "state", "national"], + "geographic_id": ["0101", "01", "0101", "01", "00"], + "domain_variable": ["", "age", "", "aca_ptc", ""], + "value": [1_000.0, 10_000.0, 50_000.0, 100_000.0, 10_000_000.0], + } + ) + + +def test_build_target_policy_assigns_initial_tolerances(): + policy = build_target_policy( + _targets_df(), + target_names=["hh", "age", "aca_dollars", "aca_count", "snap"], + config=load_target_policy_config(), + row_sums=np.array([1.0, 1.0, 1.0, 1.0, 0.0]), + ) + + assert policy.loc[0, "enforcement"] == "fail" + assert policy.loc[0, "priority"] == "P0" + assert policy.loc[0, "tolerance_pct"] == 1.0 + assert policy.loc[0, "loss_weight"] == 40.0 + + assert policy.loc[1, "enforcement"] == "warn" + assert policy.loc[1, "priority"] == "P1" + assert policy.loc[1, "tolerance_pct"] == 2.0 + + assert policy.loc[2, "enforcement"] == "warn" + assert policy.loc[2, "policy_rule_id"] == "aca_variable_warning_targets" + assert policy.loc[2, "tolerance_pct"] == 7.5 + + assert policy.loc[3, "enforcement"] == "warn" + assert policy.loc[3, "policy_rule_id"] == "aca_warning_targets" + assert policy.loc[3, "tolerance_pct"] == 7.5 + + assert policy.loc[4, "enforcement"] == "diagnostic_only" + assert policy.loc[4, "loss_weight"] == 0.0 + assert not bool(policy.loc[4, "loss_enabled"]) + + +def test_target_policy_arrays_apply_scale_floors(): + targets = np.array([100.0, 20_000.0, 500.0, 1_000.0, 10_000_000.0]) + policy = build_target_policy( + _targets_df(), + config=load_target_policy_config(), + row_sums=np.ones(5), + ) + + weights, tolerances, scales = target_policy_arrays(policy, targets) + + assert weights[0] == 40.0 + assert tolerances[0] == 0.01 + assert scales[0] == 1_000.0 + assert scales[1] == 20_000.0 + assert scales[2] == 1_000_000.0 + + +def test_diagnostics_policy_enforces_only_hard_failures(): + policy = build_target_policy( + _targets_df().iloc[:4], + config=load_target_policy_config(), + row_sums=np.ones(4), + ) + diagnostics = pd.DataFrame( + { + "target": ["hh", "age", "aca_dollars", "aca_count"], + "true_value": [100.0, 100.0, 100.0, 100.0], + "estimate": [102.0, 103.0, 120.0, 110.0], + "rel_error": [0.02, 0.03, 0.20, 0.10], + "abs_rel_error": [0.02, 0.03, 0.20, 0.10], + "achievable": [True, True, True, True], + } + ) + + annotated = annotate_diagnostics_with_policy(diagnostics, policy) + + assert annotated.loc[0, "validation_status"] == "fail" + assert annotated.loc[1, "validation_status"] == "warn" + assert annotated.loc[2, "validation_status"] == "warn" + assert annotated.loc[3, "validation_status"] == "warn" + + try: + enforce_target_tolerances(annotated) + except ValueError as exc: + assert "hh" in str(exc) + else: + raise AssertionError("hard-fail target miss should raise") + + +def test_write_target_policy_artifacts(tmp_path): + policy = build_target_policy( + _targets_df().iloc[:2], + config=load_target_policy_config(), + row_sums=np.ones(2), + ) + + jsonl_path, summary_path = write_target_policy_artifacts(policy, tmp_path) + + assert jsonl_path.name == TARGET_POLICY_ARTIFACT + assert summary_path.name == TARGET_POLICY_SUMMARY_ARTIFACT + records = [json.loads(line) for line in jsonl_path.read_text().splitlines()] + summary = json.loads(summary_path.read_text()) + assert records[0]["enforcement"] == "fail" + assert summary["enforcement_counts"] == {"fail": 1, "warn": 1} diff --git a/tests/unit/calibration/test_unified_calibration.py b/tests/unit/calibration/test_unified_calibration.py index 41022a17b..b9a5e8fdb 100644 --- a/tests/unit/calibration/test_unified_calibration.py +++ b/tests/unit/calibration/test_unified_calibration.py @@ -713,6 +713,14 @@ def test_target_config_flag(self): args = parse_args(["--target-config", "config.yaml"]) assert args.target_config == "config.yaml" + def test_target_policy_flag(self): + from policyengine_us_data.calibration.unified_calibration import ( + parse_args, + ) + + args = parse_args(["--target-policy", "policy.yaml"]) + assert args.target_policy == "policy.yaml" + def test_all_active_targets_flag(self): from policyengine_us_data.calibration.unified_calibration import ( parse_args, @@ -800,6 +808,79 @@ def test_resume_flags(self): assert args_default.checkpoint_output is None +class TestDerivedPopulationAnchors: + def test_adds_total_person_count_from_age_bins(self): + import pandas as pd + from policyengine_us_data.calibration.unified_calibration import ( + add_derived_population_anchor_targets, + ) + + targets_df = pd.DataFrame( + { + "variable": ["person_count", "person_count", "household_count"], + "domain_variable": ["age", "age", ""], + "geo_level": ["district", "district", "district"], + "geographic_id": ["0101", "0101", "0101"], + "value": [100.0, 150.0, 90.0], + } + ) + X_sparse = sp.csr_matrix( + np.array( + [ + [1.0, 0.0, 2.0], + [0.0, 3.0, 0.0], + [1.0, 1.0, 1.0], + ], + dtype=np.float32, + ) + ) + + expanded_df, expanded_X, names = add_derived_population_anchor_targets( + targets_df, + X_sparse, + ["age_a", "age_b", "hh"], + ) + + assert len(expanded_df) == 4 + anchor = expanded_df.iloc[-1] + assert anchor["variable"] == "person_count" + assert anchor["domain_variable"] == "" + assert anchor["value"] == 250.0 + assert bool(anchor["derived_target"]) + np.testing.assert_array_equal( + expanded_X[-1].toarray(), + np.array([[1.0, 3.0, 2.0]]), + ) + assert names[-1] == "derived_person_count_district_0101" + + def test_does_not_duplicate_existing_total_person_count(self): + import pandas as pd + from policyengine_us_data.calibration.unified_calibration import ( + add_derived_population_anchor_targets, + ) + + targets_df = pd.DataFrame( + { + "variable": ["person_count", "person_count"], + "domain_variable": ["age", ""], + "geo_level": ["district", "district"], + "geographic_id": ["0101", "0101"], + "value": [100.0, 100.0], + } + ) + X_sparse = sp.csr_matrix(np.eye(2, dtype=np.float32)) + + expanded_df, expanded_X, names = add_derived_population_anchor_targets( + targets_df, + X_sparse, + ["age", "total"], + ) + + assert len(expanded_df) == 2 + assert expanded_X.shape == (2, 2) + assert names == ["age", "total"] + + class FakeSparseCalibrationWeights: fit_calls = [] @@ -840,8 +921,19 @@ def fit( verbose=False, verbose_freq=1, target_groups=None, + target_weights=None, + target_tolerances=None, + target_scales=None, ): - type(self).fit_calls.append({"target_groups": target_groups}) + type(self).fit_calls.append( + { + "target_groups": target_groups, + "target_weights": target_weights, + "target_tolerances": target_tolerances, + "target_scales": target_scales, + "loss_type": loss_type, + } + ) increment = float(epochs) + (self.alpha / 10.0) self.weights = self.weights + increment self.alpha = self.alpha + (10.0 * float(epochs)) @@ -931,6 +1023,44 @@ def test_passes_target_groups_to_logged_l0_fit(self, tmp_path): target_groups, ) + def test_passes_epsilon_policy_arrays_to_l0_model(self, tmp_path): + from policyengine_us_data.calibration.unified_calibration import ( + fit_l0_weights, + ) + + target_weights = np.array([40.0, 6.0], dtype=np.float64) + target_tolerances = np.array([0.01, 0.075], dtype=np.float64) + target_scales = np.array([1_000.0, 10_000.0], dtype=np.float64) + FakeSparseCalibrationWeights.fit_calls = [] + + with patch( + "l0.calibration.SparseCalibrationWeights", + FakeSparseCalibrationWeights, + ): + fit_l0_weights( + X_sparse=sp.csr_matrix(np.eye(2, dtype=np.float32)), + targets=np.array([1.0, 2.0], dtype=np.float64), + lambda_l0=1e-4, + epochs=1, + device="cpu", + target_names=["target_a", "target_b"], + initial_weights=np.array([1.0, 2.0], dtype=np.float64), + log_path=str(tmp_path / "calibration_log.csv"), + target_weights=target_weights, + target_tolerances=target_tolerances, + target_scales=target_scales, + calibration_loss_type="relative_epsilon", + ) + + fit_call = FakeSparseCalibrationWeights.fit_calls[-1] + assert fit_call["loss_type"] == "relative_epsilon" + np.testing.assert_array_equal(fit_call["target_weights"], target_weights) + np.testing.assert_array_equal( + fit_call["target_tolerances"], + target_tolerances, + ) + np.testing.assert_array_equal(fit_call["target_scales"], target_scales) + class TestFitResume: def _fit_kwargs(self, tmp_path): diff --git a/tests/unit/fixtures/calibration_package_stage_contract.py b/tests/unit/fixtures/calibration_package_stage_contract.py index f11640ee9..e8ad85f51 100644 --- a/tests/unit/fixtures/calibration_package_stage_contract.py +++ b/tests/unit/fixtures/calibration_package_stage_contract.py @@ -22,6 +22,7 @@ CALIBRATION_COMPLETED_AT = "2026-05-08T12:02:00Z" CALIBRATION_DURATION_S = 120.0 TARGET_CONFIG_PATH = "policyengine_us_data/calibration/target_config.yaml" +TARGET_POLICY_PATH = "policyengine_us_data/calibration/target_policy.yaml" CALIBRATION_BLOCK_GEOIDS = ("010010001", "010010002", "020010001") CALIBRATION_CD_GEOIDS = ("0101", "0102", "0201") @@ -67,6 +68,9 @@ def calibration_package_payload() -> dict[str, Any]: "db_sha256": "sha256:db", "target_config_path": TARGET_CONFIG_PATH, "target_config_sha256": "sha256:target-config", + "target_policy_path": TARGET_POLICY_PATH, + "target_policy_sha256": "sha256:target-policy", + "target_policy_schema_version": "1", "n_clones": 3, "seed": 42, "base_n_records": 1, @@ -80,6 +84,21 @@ def calibration_package_payload() -> dict[str, Any]: "initial_weights": np.array([1.0, 1.0, 1.0]), "cd_geoid": np.array(CALIBRATION_CD_GEOIDS), "block_geoid": np.array(CALIBRATION_BLOCK_GEOIDS), + "target_policy_df": pd.DataFrame( + { + "target_index": [0, 1], + "enforcement": ["fail", "warn"], + "priority": ["P0", "P2"], + "tolerance_pct": [1.0, 5.0], + "tolerance": [0.01, 0.05], + "scale_floor": [1_000.0, 25_000_000.0], + "loss_weight": [40.0, 6.0], + "policy_rule_id": ["default", "default"], + "policy_group_key": ["P0|fail", "P2|warn"], + "loss_enabled": [True, True], + "schema_version": ["1", "1"], + } + ), } @@ -148,6 +167,7 @@ def calibration_package_parameters() -> dict[str, Any]: "workers": None, "n_clones": 3, "target_config": TARGET_CONFIG_PATH, + "target_policy": TARGET_POLICY_PATH, "skip_county": True, "skip_source_impute": True, "skip_takeup_rerandomize": False, diff --git a/tests/unit/test_calibration_package_stage_contract.py b/tests/unit/test_calibration_package_stage_contract.py index f00f646da..8f6bbf1e2 100644 --- a/tests/unit/test_calibration_package_stage_contract.py +++ b/tests/unit/test_calibration_package_stage_contract.py @@ -1,5 +1,6 @@ from tests.unit.fixtures.calibration_package_stage_contract import ( TARGET_CONFIG_PATH, + TARGET_POLICY_PATH, calibration_package_contract, calibration_package_parameters, calibration_package_payload, @@ -60,6 +61,7 @@ def test_calibration_package_parameters_parse_runtime_args(): workers=8, n_clones=430, target_config_path=TARGET_CONFIG_PATH, + target_policy_path=TARGET_POLICY_PATH, skip_county=True, skip_source_impute=True, skip_takeup_rerandomize=False, @@ -79,6 +81,7 @@ def test_calibration_package_parameters_parse_runtime_args(): "skip_source_impute": True, "skip_takeup_rerandomize": False, "target_config": TARGET_CONFIG_PATH, + "target_policy": TARGET_POLICY_PATH, "workers": None, } @@ -89,6 +92,7 @@ def test_calibration_package_parameters_reject_inconsistent_chunk_shape(): workers=8, n_clones=430, target_config=None, + target_policy=None, skip_county=True, skip_source_impute=True, skip_takeup_rerandomize=False, @@ -204,6 +208,13 @@ def test_calibration_package_contract_records_matrix_summary(tmp_path): assert summary["n_targets"] == 2 assert summary["target_name_count"] == 2 assert summary["target_config_sha256"] == "sha256:target-config" + assert summary["target_policy_path"] == ( + "policyengine_us_data/calibration/target_policy.yaml" + ) + assert summary["target_policy_sha256"] == "sha256:target-policy" + assert summary["target_policy_schema_version"] == "1" + assert summary["target_policy_row_count"] == 2 + assert summary["has_target_policy"] is True assert summary["n_clones"] == 3 assert summary["seed"] == 42 assert summary["matrix_builder"] == "chunked" @@ -308,6 +319,7 @@ def test_calibration_package_summary_omits_bulky_payloads(): assert "initial_weights" not in summary assert "cd_geoid" not in summary assert "block_geoid" not in summary + assert "target_policy_df" not in summary def test_calibration_package_geography_summary_rejects_mismatched_arrays(): diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 69f67bb82..a7980dd97 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -35,6 +35,7 @@ def test_calibration_package_parameters_track_matrix_mode(): workers=50, n_clones=430, target_config=None, + target_policy="policyengine_us_data/calibration/target_policy.yaml", skip_county=True, chunked_matrix=True, chunk_size=10_000, @@ -45,6 +46,9 @@ def test_calibration_package_parameters_track_matrix_mode(): assert params["chunked_matrix"] is True assert "workers" not in params assert params["chunk_size"] == 10_000 + assert ( + params["target_policy"] == "policyengine_us_data/calibration/target_policy.yaml" + ) assert params["parallel_matrix"] is True assert params["num_matrix_workers"] == 25 @@ -54,6 +58,7 @@ def test_calibration_package_parameters_ignore_unused_matrix_options(): workers=50, n_clones=430, target_config=None, + target_policy="policyengine_us_data/calibration/target_policy.yaml", skip_county=True, chunked_matrix=False, chunk_size=10_000, diff --git a/tests/unit/test_remote_calibration_runner.py b/tests/unit/test_remote_calibration_runner.py index 77053dc78..10d5ba33e 100644 --- a/tests/unit/test_remote_calibration_runner.py +++ b/tests/unit/test_remote_calibration_runner.py @@ -58,6 +58,8 @@ def test_collect_outputs_returns_pipeline_artifact_bytes(tmp_path): log_path = tmp_path / "diag.csv" cal_log = tmp_path / "calibration.csv" config = tmp_path / "config.json" + target_policy = tmp_path / "target_policy.jsonl" + target_policy_summary = tmp_path / "target_policy_summary.json" paths_and_bytes = { weights: b"weights", @@ -65,6 +67,8 @@ def test_collect_outputs_returns_pipeline_artifact_bytes(tmp_path): log_path: b"log", cal_log: b"cal-log", config: b"config", + target_policy: b"policy", + target_policy_summary: b"policy-summary", } for path, content in paths_and_bytes.items(): path.write_bytes(content) @@ -76,6 +80,8 @@ def test_collect_outputs_returns_pipeline_artifact_bytes(tmp_path): f"LOG_PATH:{log_path}", f"CAL_LOG_PATH:{cal_log}", f"CONFIG_PATH:{config}", + f"TARGET_POLICY_PATH:{target_policy}", + f"TARGET_POLICY_SUMMARY_PATH:{target_policy_summary}", ] ) @@ -85,6 +91,8 @@ def test_collect_outputs_returns_pipeline_artifact_bytes(tmp_path): "log": b"log", "cal_log": b"cal-log", "config": b"config", + "target_policy": b"policy", + "target_policy_summary": b"policy-summary", } diff --git a/uv.lock b/uv.lock index f8701581c..1ebebab42 100644 --- a/uv.lock +++ b/uv.lock @@ -1226,17 +1226,13 @@ wheels = [ [[package]] name = "l0-python" -version = "0.5.0" -source = { registry = "https://pypi.org/simple" } +version = "0.6.2" +source = { git = "https://github.com/PolicyEngine/L0.git?rev=codex%2Fepsilon-insensitive-calibration-loss#0342225921b2c78679e1e57eef1941670352973d" } dependencies = [ { name = "numpy" }, { name = "scipy" }, { name = "torch" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/cf/6b/4a9ca6d1eb9828c526947fffb2ee2a1d02eec330f04cd53af301a05fde0a/l0_python-0.5.0.tar.gz", hash = "sha256:9b6b1751e142702e21ed866e40d8ab47304a26a5455998620a0eb798f4c7f599", size = 36320, upload-time = "2026-01-21T13:55:53.365Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/78/80/33ccae8af3fe55a81d33569d9241a29cecde17ab34fdff214804e81fa353/l0_python-0.5.0-py3-none-any.whl", hash = "sha256:9c8f4532426b927a97f4722b1c5114147adb09365100623effb49c0021345881", size = 23590, upload-time = "2026-01-21T13:55:52.406Z" }, -] [[package]] name = "lark" @@ -2197,7 +2193,7 @@ dev = [ requires-dist = [ { name = "google-auth", specifier = ">=2.0.0" }, { name = "google-cloud-storage", specifier = ">=2.0.0" }, - { name = "l0-python", marker = "extra == 'l0'" }, + { name = "l0-python", marker = "extra == 'l0'", git = "https://github.com/PolicyEngine/L0.git?rev=codex%2Fepsilon-insensitive-calibration-loss" }, { name = "microdf-python", specifier = ">=1.2.1" }, { name = "microimpute", specifier = ">=1.15.1" }, { name = "openpyxl", specifier = ">=3.1.5" },