Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 311 additions & 16 deletions src/microplex_us/pipelines/ecps_replacement_comparison.py

Large diffs are not rendered by default.

71 changes: 51 additions & 20 deletions src/microplex_us/pipelines/pe_native_calibration_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
import h5py
import numpy as np

from microplex_us.pipelines.pe_native_loss import loss_arrays_from_inputs
from microplex_us.pipelines.pe_native_optimization import (
_PE_NATIVE_BROAD_MATRIX_SCRIPT,
optimize_pe_native_loss_weights,
rewrite_policyengine_us_dataset_weights,
)
from microplex_us.pipelines.pe_native_scores import (
_DEFAULT_PE_NATIVE_BASELINE_CACHE_DIR,
_ENHANCED_CPS_BAD_TARGETS,
build_policyengine_us_data_subprocess_env,
compute_batch_us_pe_native_scores,
resolve_policyengine_us_data_repo_root,
validate_policyengine_us_data_runtime,
)

_DEFAULT_PE_NATIVE_BASELINE_CACHE_DIR = (
Path.home() / ".cache" / "microplex-us" / "pe-native-baseline"
)


Expand Down Expand Up @@ -260,18 +263,14 @@ def _extract_pe_native_loss_inputs(
policyengine_us_data_repo: str | Path | None,
policyengine_us_data_python: str | Path | None,
skip_tax_expenditure_targets: bool,
target_scope_filter: str | None,
) -> dict[str, Any]:
resolved_repo = resolve_policyengine_us_data_repo_root(policyengine_us_data_repo)
env = build_policyengine_us_data_subprocess_env(resolved_repo)
if policyengine_us_data_python is not None:
command = [str(Path(policyengine_us_data_python).expanduser())]
else:
command = ["uv", "run", "--project", str(resolved_repo), "python"]
validate_policyengine_us_data_runtime(
command,
repo_root=resolved_repo,
env=env,
)
_log("extracting PE-native loss matrix")
with TemporaryDirectory(prefix="microplex-us-pe-native-benchmark-") as temp_dir:
prefix = Path(temp_dir) / "pe_native_matrix"
Expand All @@ -287,6 +286,7 @@ def _extract_pe_native_loss_inputs(
str(Path(input_dataset_path).expanduser().resolve()),
"1" if skip_tax_expenditure_targets else "0",
str(prefix),
target_scope_filter or "",
],
cwd=resolved_repo,
env=env,
Expand All @@ -295,15 +295,35 @@ def _extract_pe_native_loss_inputs(
check=False,
)
if completed.returncode != 0:
detail = completed.stderr.strip() or completed.stdout.strip() or str(
completed.returncode
detail = (
completed.stderr.strip()
or completed.stdout.strip()
or str(completed.returncode)
)
raise RuntimeError(f"PE-native loss-matrix extraction failed: {detail}")
_log(f"extracted PE-native loss matrix in {perf_counter() - started_at:.1f}s")
return {
"scaled_matrix": np.load(prefix.with_suffix(".matrix.npy")),
"scaled_target": np.load(prefix.with_suffix(".target.npy")),
"initial_weights": np.load(prefix.with_suffix(".weights.npy")),
"unscaled_target": np.load(prefix.with_suffix(".target_unscaled.npy")),
"loss_denominator": np.load(prefix.with_suffix(".loss_denominator.npy")),
"loss_target_weight": np.load(
prefix.with_suffix(".loss_target_weight.npy")
),
"loss_bucket": np.load(
prefix.with_suffix(".loss_bucket.npy"), allow_pickle=True
),
"loss_unit": np.load(
prefix.with_suffix(".loss_unit.npy"), allow_pickle=True
),
"loss_scope": np.load(
prefix.with_suffix(".loss_scope.npy"), allow_pickle=True
),
"loss_family": np.load(
prefix.with_suffix(".loss_family.npy"), allow_pickle=True
),
"loss_epsilon": np.load(prefix.with_suffix(".loss_epsilon.npy")),
"metadata": json.loads(prefix.with_suffix(".meta.json").read_text()),
}

Expand All @@ -326,6 +346,7 @@ def build_policyengine_us_native_calibration_benchmark(
batch_households: int | None = None,
baseline_cache_dir: str | Path | None = _DEFAULT_PE_NATIVE_BASELINE_CACHE_DIR,
skip_tax_expenditure_targets: bool = False,
target_scope_filter: str | None = None,
force: bool = False,
) -> dict[str, Any]:
"""Run and score PE-native calibration variants against one baseline."""
Expand Down Expand Up @@ -369,6 +390,7 @@ def build_policyengine_us_native_calibration_benchmark(
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
skip_tax_expenditure_targets=skip_tax_expenditure_targets,
target_scope_filter=target_scope_filter,
)
if l2_penalties
else None
Expand All @@ -390,6 +412,7 @@ def build_policyengine_us_native_calibration_benchmark(
scaled_matrix=loss_inputs["scaled_matrix"],
scaled_target=loss_inputs["scaled_target"],
initial_weights=loss_inputs["initial_weights"],
loss_arrays=loss_arrays_from_inputs(loss_inputs),
budget=budget,
max_iter=max_iter,
l2_penalty=penalty,
Expand Down Expand Up @@ -420,9 +443,7 @@ def build_policyengine_us_native_calibration_benchmark(
"initial_weight_sum": float(summary["initial_weight_sum"]),
"optimized_weight_sum": float(summary["optimized_weight_sum"]),
"household_count": int(summary["household_count"]),
"positive_household_count": int(
summary["positive_household_count"]
),
"positive_household_count": int(summary["positive_household_count"]),
"budget": summary["budget"],
"converged": bool(summary["converged"]),
"iterations": int(summary["iterations"]),
Expand All @@ -436,7 +457,12 @@ def build_policyengine_us_native_calibration_benchmark(
"l2_penalty": penalty,
"target_total_weight": resolved_target_total_weight,
"target_total_weight_resolved_from": target_total_weight_resolved_from,
"optimizer_method": summary.get("method"),
"step_size": summary.get("step_size"),
"initial_step_size": summary.get("initial_step_size"),
"line_search_backtracking_steps": summary.get(
"line_search_backtracking_steps"
),
"history_interval": summary.get("history_interval"),
"loss_history": summary.get("loss_history", []),
"reused_existing_output": False,
Expand Down Expand Up @@ -479,9 +505,7 @@ def build_policyengine_us_native_calibration_benchmark(
period=period,
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
batch_households=batch_households,
baseline_cache_dir=baseline_cache_dir,
skip_tax_expenditure_targets=skip_tax_expenditure_targets,
target_scope_filter=target_scope_filter,
)
_log(f"scored variants in {perf_counter() - scoring_started_at:.1f}s")
scores_by_dataset = {
Expand Down Expand Up @@ -525,6 +549,7 @@ def build_policyengine_us_native_calibration_benchmark(
"baseline_dataset": str(baseline_path),
"output_dir": str(destination),
"skip_tax_expenditure_targets": bool(skip_tax_expenditure_targets),
"target_scope_filter": target_scope_filter,
"target_total_weight": resolved_target_total_weight,
"target_total_weight_resolved_from": target_total_weight_resolved_from,
"budget": None if budget is None else int(budget),
Expand All @@ -534,11 +559,7 @@ def build_policyengine_us_native_calibration_benchmark(
"baseline_enhanced_cps_native_loss": baseline_loss,
"best_variant_label": ranked_rows[0]["label"] if ranked_rows else None,
"best_variant_loss": (
float(
ranked_rows[0]["score_summary"][
"candidate_enhanced_cps_native_loss"
]
)
float(ranked_rows[0]["score_summary"]["candidate_enhanced_cps_native_loss"])
if ranked_rows
else None
),
Expand Down Expand Up @@ -633,6 +654,11 @@ def main(argv: list[str] | None = None) -> int:
"--skip-tax-expenditure-targets",
action="store_true",
)
parser.add_argument(
"--target-scope-filter",
choices=("national", "state"),
help="Restrict PE-native optimization/scoring to a target scope.",
)
parser.add_argument(
"--force",
action="store_true",
Expand Down Expand Up @@ -664,12 +690,17 @@ def main(argv: list[str] | None = None) -> int:
batch_households=args.batch_households,
baseline_cache_dir=args.baseline_cache_dir or None,
skip_tax_expenditure_targets=args.skip_tax_expenditure_targets,
target_scope_filter=args.target_scope_filter,
force=args.force,
)
print(str(written))
return 0


if __name__ == "__main__":
raise SystemExit(main())


__all__ = [
"CalibrationBenchmarkVariant",
"build_policyengine_us_native_calibration_benchmark",
Expand Down
Loading
Loading