diff --git a/src/microplex_us/pipelines/ecps_replacement_comparison.py b/src/microplex_us/pipelines/ecps_replacement_comparison.py index a59b167..6c0b22c 100644 --- a/src/microplex_us/pipelines/ecps_replacement_comparison.py +++ b/src/microplex_us/pipelines/ecps_replacement_comparison.py @@ -192,14 +192,12 @@ def build_sound_ecps_replacement_comparison( and candidate_score_error <= score_consistency_tol and baseline_score_error <= score_consistency_tol ) - ecps_refit_recovery_passed = ( - baseline_refit["optimized_full_loss"] + ecps_refit_recovery_passed = baseline_refit[ + "optimized_full_loss" + ] <= baseline_refit["initial_full_loss"] + score_consistency_tol and ( + baseline_score_loss is None + or baseline_score_loss <= baseline_refit["initial_full_loss"] + score_consistency_tol - and ( - baseline_score_loss is None - or baseline_score_loss - <= baseline_refit["initial_full_loss"] + score_consistency_tol - ) ) protected_family_losses = _protected_family_losses( @@ -232,6 +230,10 @@ def build_sound_ecps_replacement_comparison( support_audit_summary = ( _support_audit_summary(support_audit) if support_audit is not None else None ) + candidate_optimizer_summary = dict(candidate_refit["optimizer_summary"]) + baseline_optimizer_summary = dict(baseline_refit["optimizer_summary"]) + candidate_refit_progress = _refit_progress_summary(candidate_refit["loss_curve"]) + baseline_refit_progress = _refit_progress_summary(baseline_refit["loss_curve"]) score_summary.update( { @@ -252,6 +254,30 @@ def build_sound_ecps_replacement_comparison( "baseline_score_abs_error": baseline_score_error, "candidate_refit_config": refit_config, "baseline_refit_config": refit_config, + "candidate_refit_converged": bool( + candidate_optimizer_summary.get("converged", False) + ), + "baseline_refit_converged": bool( + baseline_optimizer_summary.get("converged", False) + ), + "candidate_refit_iterations": int( + candidate_optimizer_summary.get("iterations", 0) + ), + "baseline_refit_iterations": int( + baseline_optimizer_summary.get("iterations", 0) + ), + "candidate_refit_train_loss_improvement_last_step": ( + candidate_refit_progress["train_loss_improvement_last_step"] + ), + "baseline_refit_train_loss_improvement_last_step": ( + baseline_refit_progress["train_loss_improvement_last_step"] + ), + "candidate_refit_train_loss_improvement_last_20_steps": ( + candidate_refit_progress["train_loss_improvement_last_20_steps"] + ), + "baseline_refit_train_loss_improvement_last_20_steps": ( + baseline_refit_progress["train_loss_improvement_last_20_steps"] + ), "symmetric_refit": True, "score_candidate_only": False, "refit_objective_matches_scoring": objective_identity_passed, @@ -282,6 +308,12 @@ def build_sound_ecps_replacement_comparison( "score_candidate_only": False, "refit_objective_matches_scoring": objective_identity_passed, "ecps_refit_recovery_passed": ecps_refit_recovery_passed, + "candidate_refit_converged": bool( + candidate_optimizer_summary.get("converged", False) + ), + "baseline_refit_converged": bool( + baseline_optimizer_summary.get("converged", False) + ), "holdout_target_fraction": float(holdout_target_fraction), "holdout_targets": int(holdout_mask.sum()), "protected_family_losses": protected_family_losses, @@ -324,7 +356,9 @@ def build_sound_ecps_replacement_comparison( "train_targets": int((~holdout_mask).sum()), "holdout_targets": int(holdout_mask.sum()), "holdout_target_names": [ - name for name, holdout in zip(target_names, holdout_mask, strict=True) if holdout + name + for name, holdout in zip(target_names, holdout_mask, strict=True) + if holdout ], }, "refit_config": refit_config, @@ -386,7 +420,9 @@ def _write_matched_dataset( force: bool, ) -> None: if output_path.exists() and not force: - raise FileExistsError(f"{output_path} already exists; pass --force to replace it") + raise FileExistsError( + f"{output_path} already exists; pass --force to replace it" + ) _write_matched_policyengine_us_baseline_dataset( input_path, output_path, @@ -438,9 +474,7 @@ def _entity_structure_summary( period_key, ) if person_ids.shape[0] != person_household_ids.shape[0]: - raise ValueError( - f"{path} person_id and person_household_id lengths differ" - ) + raise ValueError(f"{path} person_id and person_household_id lengths differ") household_count = int(household_ids.shape[0]) summary: dict[str, Any] = { @@ -500,8 +534,7 @@ def _entity_membership_summary( ) if person_entity_ids.shape[0] != person_household_ids.shape[0]: raise ValueError( - f"{dataset_path} person_{entity}_id and person_household_id " - "lengths differ" + f"{dataset_path} person_{entity}_id and person_household_id lengths differ" ) unique_entity_ids = np.unique(entity_ids) duplicate_unit_id_count = int(entity_ids.shape[0] - unique_entity_ids.shape[0]) @@ -602,8 +635,10 @@ def _extract_pe_native_loss_inputs( check=False, ) if completed.returncode != 0: - detail = completed.stderr.strip() or completed.stdout.strip() or str( - completed.returncode + detail = ( + completed.stderr.strip() + or completed.stdout.strip() + or str(completed.returncode) ) raise RuntimeError(f"PE-native loss-matrix extraction failed: {detail}") return { @@ -761,6 +796,24 @@ def _objective(matrix: np.ndarray, target: np.ndarray, weights: np.ndarray) -> f return float(np.dot(residual, residual)) +def _refit_progress_summary( + loss_curve: list[dict[str, Any]], +) -> dict[str, float | None]: + if len(loss_curve) < 2: + return { + "train_loss_improvement_last_step": None, + "train_loss_improvement_last_20_steps": None, + } + last_train_loss = float(loss_curve[-1]["train_loss"]) + previous_train_loss = float(loss_curve[-2]["train_loss"]) + lookback_index = max(0, len(loss_curve) - 21) + lookback_train_loss = float(loss_curve[lookback_index]["train_loss"]) + return { + "train_loss_improvement_last_step": previous_train_loss - last_train_loss, + "train_loss_improvement_last_20_steps": lookback_train_loss - last_train_loss, + } + + def _protected_family_losses( *, target_names: list[str], @@ -771,7 +824,6 @@ def _protected_family_losses( ) -> dict[str, dict[str, float | int]]: candidate_terms = _loss_terms(candidate_inputs, candidate_weights) baseline_terms = _loss_terms(baseline_inputs, baseline_weights) - n_targets = float(len(target_names)) rows: dict[str, dict[str, float | int]] = {} for family, patterns in _PROTECTED_TARGET_PATTERNS.items(): indices = [ @@ -781,8 +833,8 @@ def _protected_family_losses( ] if not indices: continue - candidate_loss = float(candidate_terms[indices].sum() / n_targets) - baseline_loss = float(baseline_terms[indices].sum() / n_targets) + candidate_loss = float(candidate_terms[indices].sum()) + baseline_loss = float(baseline_terms[indices].sum()) rows[family] = { "n_targets": int(len(indices)), "candidate_loss": candidate_loss, @@ -861,6 +913,12 @@ def _target_loss_diagnostics( "baseline_relative_error": float( baseline_values["relative_error"][index] ), + "candidate_shifted_residual_ratio": float( + candidate_values["relative_error"][index] + ), + "baseline_shifted_residual_ratio": float( + baseline_values["relative_error"][index] + ), "candidate_loss_term": candidate_loss, "baseline_loss_term": baseline_loss, "loss_delta": float(loss_delta), @@ -921,9 +979,9 @@ def _target_value_diagnostics( target[native_mask] = np.asarray(unscaled_target, dtype=np.float64)[ native_mask ] - estimate[native_mask] = scaled_estimate[native_mask] / scaling_array[ - native_mask - ] + estimate[native_mask] = ( + scaled_estimate[native_mask] / scaling_array[native_mask] + ) value_scale[native_mask] = "native" if target.shape != estimate.shape: raise ValueError("target and estimate shapes differ") @@ -945,7 +1003,6 @@ def _target_family_breakdown( families: dict[str, list[dict[str, Any]]] = {} for row in target_rows: families.setdefault(str(row["family"]), []).append(row) - denominator = float(total_targets) if total_targets else 1.0 breakdown = [] for family, rows in sorted(families.items()): candidate_loss = sum(float(row["candidate_loss_term"]) for row in rows) @@ -954,19 +1011,13 @@ def _target_family_breakdown( { "family": family, "n_targets": int(len(rows)), - "train_targets": int( - sum(1 for row in rows if row["split"] == "train") - ), + "train_targets": int(sum(1 for row in rows if row["split"] == "train")), "holdout_targets": int( sum(1 for row in rows if row["split"] == "holdout") ), - "candidate_loss_contribution": float( - candidate_loss / denominator - ), - "baseline_loss_contribution": float(baseline_loss / denominator), - "loss_delta": float( - (candidate_loss - baseline_loss) / denominator - ), + "candidate_loss_contribution": float(candidate_loss), + "baseline_loss_contribution": float(baseline_loss), + "loss_delta": float(candidate_loss - baseline_loss), "candidate_wins": int( sum(1 for row in rows if row["winner"] == "candidate") ), @@ -976,7 +1027,9 @@ def _target_family_breakdown( "ties": int(sum(1 for row in rows if row["winner"] == "tie")), } ) - return sorted(breakdown, key=lambda row: abs(float(row["loss_delta"])), reverse=True) + return sorted( + breakdown, key=lambda row: abs(float(row["loss_delta"])), reverse=True + ) def _support_audit_summary(support_audit: dict[str, Any]) -> dict[str, Any]: @@ -1008,10 +1061,27 @@ def _support_audit_summary(support_audit: dict[str, Any]) -> dict[str, Any]: "top_medicare_part_b_by_age_gaps": _sort_rows_by_abs_delta( list(comparisons.get("medicare_part_b_premiums_by_age_delta") or ()), "weighted_positive_delta", + drop_zero=True, ), "top_aca_ptc_spending_gaps": _sort_rows_by_abs_delta( list(comparisons.get("state_aca_ptc_spending_top_gaps") or ()), "weighted_aca_ptc_delta", + drop_zero=True, + ), + "top_state_marketplace_enrollment_gaps": _sort_rows_by_abs_delta( + list(comparisons.get("state_marketplace_enrollment_top_gaps") or ()), + "weighted_marketplace_enrollment_delta", + drop_zero=True, + ), + "top_state_age_bucket_gaps": _sort_rows_by_abs_delta( + list(comparisons.get("state_age_bucket_top_gaps") or ()), + "weighted_count_delta", + drop_zero=True, + ), + "top_mfs_high_agi_gaps": _sort_rows_by_abs_delta( + list(comparisons.get("mfs_high_agi_delta") or ()), + "weighted_count_delta", + drop_zero=True, ), } @@ -1021,12 +1091,20 @@ def _sort_rows_by_abs_delta( delta_key: str, *, limit: int = 10, + drop_zero: bool = False, ) -> list[dict[str, Any]]: - return sorted( + sorted_rows = sorted( rows, key=lambda row: abs(float(row.get(delta_key, 0.0))), reverse=True, - )[:limit] + ) + if drop_zero: + sorted_rows = [ + row + for row in sorted_rows + if not np.isclose(float(row.get(delta_key, 0.0)), 0.0) + ] + return sorted_rows[:limit] def _loss_terms(loss_inputs: dict[str, Any], weights: np.ndarray) -> np.ndarray: @@ -1042,10 +1120,7 @@ def _target_matches_protected_family( patterns: tuple[str, ...], ) -> bool: normalized = ( - target_name.lower() - .replace("-", "_") - .replace(" ", "_") - .replace("/", "_") + target_name.lower().replace("-", "_").replace(" ", "_").replace("/", "_") ) if family == "wages" and ( "self_employment" in normalized or "business_income" in normalized diff --git a/src/microplex_us/pipelines/pe_native_optimization.py b/src/microplex_us/pipelines/pe_native_optimization.py index 163933c..e12d820 100644 --- a/src/microplex_us/pipelines/pe_native_optimization.py +++ b/src/microplex_us/pipelines/pe_native_optimization.py @@ -166,7 +166,7 @@ def _project_to_simplex(values: np.ndarray, total: float) -> np.ndarray: return values.copy() clipped = np.maximum(values.astype(np.float64, copy=False), 0.0) current_sum = float(clipped.sum()) - if np.isclose(current_sum, total): + if np.isclose(current_sum, total, rtol=0.0, atol=1e-6): return clipped if total <= 0.0: return np.zeros_like(clipped) @@ -246,7 +246,9 @@ def optimize_pe_native_loss_weights( initial_weight_sum = float(weights0.sum()) total_weight = ( - float(target_total_weight) if target_total_weight is not None else initial_weight_sum + float(target_total_weight) + if target_total_weight is not None + else initial_weight_sum ) weights = _project_to_budget_simplex(weights0, total_weight, budget) initial_reference = weights.copy() @@ -358,7 +360,9 @@ def rewrite_policyengine_us_dataset_weights( with h5py.File(output, "r+") as handle: household_ids = handle["household_id"][period_key][:] if len(household_ids) != len(weights): - raise ValueError("household_weights length does not match household_id array") + raise ValueError( + "household_weights length does not match household_id array" + ) household_map = { int(household_id): float(weight) for household_id, weight in zip(household_ids, weights, strict=True) @@ -368,7 +372,10 @@ def rewrite_policyengine_us_dataset_weights( if "person_weight" in handle and "person_household_id" in handle: person_households = handle["person_household_id"][period_key][:] person_weights = np.array( - [household_map[int(household_id)] for household_id in person_households], + [ + household_map[int(household_id)] + for household_id in person_households + ], dtype=np.float32, ) handle["person_weight"][period_key][...] = person_weights @@ -448,8 +455,10 @@ def optimize_policyengine_us_native_loss_dataset( check=False, ) if completed.returncode != 0: - detail = completed.stderr.strip() or completed.stdout.strip() or str( - completed.returncode + detail = ( + completed.stderr.strip() + or completed.stdout.strip() + or str(completed.returncode) ) raise RuntimeError(f"PE-native loss-matrix extraction failed: {detail}") diff --git a/tests/pipelines/test_ecps_replacement_comparison.py b/tests/pipelines/test_ecps_replacement_comparison.py index 626a339..d3e0f6c 100644 --- a/tests/pipelines/test_ecps_replacement_comparison.py +++ b/tests/pipelines/test_ecps_replacement_comparison.py @@ -50,9 +50,7 @@ def _write_minimal_policyengine_dataset( "family_id": {str(period): np.asarray([1000, 2000])}, "person_family_id": {str(period): np.asarray([1000, 1000, 2000])}, "marital_unit_id": {str(period): np.asarray([10000, 10001, 20000])}, - "person_marital_unit_id": { - str(period): np.asarray([10000, 10001, 20000]) - }, + "person_marital_unit_id": {str(period): np.asarray([10000, 10001, 20000])}, } return write_policyengine_us_time_period_dataset(arrays, path) @@ -184,6 +182,25 @@ def _fake_support_audit(**_kwargs) -> dict[str, object]: "weighted_aca_ptc_delta": -10.0, } ], + "state_marketplace_enrollment_top_gaps": [ + { + "state": "NY", + "weighted_marketplace_enrollment_delta": 9.0, + } + ], + "state_age_bucket_top_gaps": [ + { + "state": "TX", + "age_bucket": "80_to_84", + "weighted_count_delta": -8.0, + } + ], + "mfs_high_agi_delta": [ + { + "agi_bin": "500k_to_1m", + "weighted_count_delta": 7.0, + } + ], }, } @@ -212,8 +229,83 @@ def test_protected_family_losses_match_pe_native_labels_with_spaces(): ) assert rows["wages"]["n_targets"] == 1 + assert rows["wages"]["candidate_loss"] == pytest.approx(1.0) + assert rows["wages"]["baseline_loss"] == pytest.approx(0.0) + assert rows["wages"]["loss_delta"] == pytest.approx(1.0) assert rows["capital_gains"]["n_targets"] == 1 + assert rows["capital_gains"]["candidate_loss"] == pytest.approx(1.0) assert rows["household_net_income"]["n_targets"] == 1 + assert rows["household_net_income"]["candidate_loss"] == pytest.approx(1.0) + + +def test_target_family_breakdown_uses_headline_loss_scale(): + rows = [ + { + "family": "national_irs_other", + "split": "train", + "candidate_loss_term": 0.20, + "baseline_loss_term": 0.05, + "winner": "baseline", + }, + { + "family": "national_irs_other", + "split": "holdout", + "candidate_loss_term": 0.10, + "baseline_loss_term": 0.02, + "winner": "baseline", + }, + { + "family": "state_snap_cost", + "split": "train", + "candidate_loss_term": 0.01, + "baseline_loss_term": 0.02, + "winner": "candidate", + }, + ] + + breakdown = ecps._target_family_breakdown(rows, total_targets=100) + + irs_row = next(row for row in breakdown if row["family"] == "national_irs_other") + assert irs_row["candidate_loss_contribution"] == pytest.approx(0.30) + assert irs_row["baseline_loss_contribution"] == pytest.approx(0.07) + assert irs_row["loss_delta"] == pytest.approx(0.23) + + +def test_support_audit_summary_drops_zero_only_secondary_panels(): + summary = ecps._support_audit_summary( + { + "comparisons": { + "state_marketplace_enrollment_top_gaps": [ + { + "state": "CA", + "weighted_marketplace_enrollment_delta": 0.0, + } + ], + "state_age_bucket_top_gaps": [ + { + "state": "NY", + "weighted_count_delta": -3.0, + }, + { + "state": "CA", + "weighted_count_delta": 0.0, + }, + ], + "mfs_high_agi_delta": [ + { + "agi_bin": "500k_to_1m", + "weighted_count_delta": 0.0, + } + ], + } + } + ) + + assert summary["top_state_marketplace_enrollment_gaps"] == [] + assert summary["top_state_age_bucket_gaps"] == [ + {"state": "NY", "weighted_count_delta": -3.0} + ] + assert summary["top_mfs_high_agi_gaps"] == [] def _artifact_manifest(artifact_dir: Path, baseline_dataset: Path) -> None: @@ -314,9 +406,18 @@ def test_sound_ecps_replacement_comparison_satisfies_gate_contract( assert summary["score_candidate_only"] is False assert summary["refit_objective_matches_scoring"] is True assert summary["ecps_refit_recovery_passed"] is True - assert summary["candidate_enhanced_cps_native_loss"] < summary[ - "baseline_enhanced_cps_native_loss" - ] + assert isinstance(summary["candidate_refit_converged"], bool) + assert isinstance(summary["baseline_refit_converged"], bool) + assert summary["candidate_refit_iterations"] > 0 + assert summary["baseline_refit_iterations"] > 0 + assert ( + summary["candidate_refit_train_loss_improvement_last_step"] is None + or summary["candidate_refit_train_loss_improvement_last_step"] >= -1e-12 + ) + assert ( + summary["candidate_enhanced_cps_native_loss"] + < summary["baseline_enhanced_cps_native_loss"] + ) assert summary["holdout_targets"] > 0 assert set(summary["protected_family_losses"]) == { "ssi", @@ -333,12 +434,9 @@ def test_sound_ecps_replacement_comparison_satisfies_gate_contract( assert summary["protected_family_losses"]["wages"]["n_targets"] == 1 target_diagnostics = payload["target_diagnostics"] assert target_diagnostics["summary"]["n_targets"] == len(_TARGET_NAMES) - assert ( - target_diagnostics["summary"]["candidate_wins"] - + target_diagnostics["summary"]["baseline_wins"] - + target_diagnostics["summary"]["ties"] - == len(_TARGET_NAMES) - ) + assert target_diagnostics["summary"]["candidate_wins"] + target_diagnostics[ + "summary" + ]["baseline_wins"] + target_diagnostics["summary"]["ties"] == len(_TARGET_NAMES) assert target_diagnostics["summary"]["train_targets"] > 0 assert target_diagnostics["summary"]["holdout_targets"] > 0 assert target_diagnostics["top_regressions"] @@ -351,9 +449,12 @@ def test_sound_ecps_replacement_comparison_satisfies_gate_contract( assert "baseline_estimate" in first_target assert "candidate_relative_error" in first_target assert "baseline_relative_error" in first_target - assert { - row["split"] for row in target_diagnostics["targets"] - } == {"train", "holdout"} + assert "candidate_shifted_residual_ratio" in first_target + assert "baseline_shifted_residual_ratio" in first_target + assert {row["split"] for row in target_diagnostics["targets"]} == { + "train", + "holdout", + } assert target_diagnostics["family_breakdown"] support_summary = payload["summary"]["support_audit"] assert support_summary["top_filing_status_gaps"][0]["filing_status"] == ( @@ -366,6 +467,17 @@ def test_sound_ecps_replacement_comparison_satisfies_gate_contract( == "10_to_19" ) assert support_summary["top_aca_ptc_spending_gaps"][0]["state"] == "CA" + assert support_summary["top_state_marketplace_enrollment_gaps"][0]["state"] == "NY" + assert support_summary["top_state_age_bucket_gaps"][0]["state"] == "TX" + assert support_summary["top_mfs_high_agi_gaps"][0]["agi_bin"] == "500k_to_1m" + assert ( + payload["comparison_contract"]["candidate_refit_converged"] + == summary["candidate_refit_converged"] + ) + assert ( + payload["comparison_contract"]["baseline_refit_converged"] + == summary["baseline_refit_converged"] + ) structure = payload["entity_structure"]["candidate_matched"] assert structure["household_count"] == 2 assert structure["person_count"] == 3 diff --git a/tests/pipelines/test_pe_native_optimization.py b/tests/pipelines/test_pe_native_optimization.py index 0b3469c..e15ce86 100644 --- a/tests/pipelines/test_pe_native_optimization.py +++ b/tests/pipelines/test_pe_native_optimization.py @@ -114,6 +114,17 @@ def test_optimize_pe_native_loss_weights_respects_target_total_weight(): assert summary["optimized_loss"] < summary["initial_loss"] +def test_project_to_simplex_does_not_accept_relative_weight_sum_drift(): + """Population-sized sums still need exact projection, not np.isclose rtol drift.""" + total = 153_767_768.0 + values = np.asarray([153_768_768.0, 0.0], dtype=np.float64) + + projected = pe_opt._project_to_simplex(values, total) + + assert np.isclose(projected.sum(), total, rtol=0.0, atol=1e-6) + assert projected[0] == total + + def test_optimize_pe_native_loss_weights_rejects_objective_increasing_steps( monkeypatch, ): @@ -155,7 +166,9 @@ def test_rewrite_policyengine_us_dataset_weights_updates_group_weights(tmp_path: assert rewritten == output_path.resolve() with h5py.File(output_path, "r") as handle: - assert np.allclose(handle["household_weight"]["2024"][:], np.asarray([7.0, 3.0])) + assert np.allclose( + handle["household_weight"]["2024"][:], np.asarray([7.0, 3.0]) + ) assert np.allclose( handle["person_weight"]["2024"][:], np.asarray([7.0, 7.0, 3.0]), @@ -163,7 +176,9 @@ def test_rewrite_policyengine_us_dataset_weights_updates_group_weights(tmp_path: assert np.allclose(handle["tax_unit_weight"]["2024"][:], np.asarray([7.0, 3.0])) -def test_optimize_policyengine_us_native_loss_dataset_rewrites_dataset(tmp_path: Path, monkeypatch): +def test_optimize_policyengine_us_native_loss_dataset_rewrites_dataset( + tmp_path: Path, monkeypatch +): source_path = _build_stub_dataset(tmp_path / "input.h5") output_path = tmp_path / "optimized.h5"