From 8d3243ce1b3576284dc260b958afa3db2c1fba23 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 1 Jun 2026 18:07:28 -0400 Subject: [PATCH] Add PUF support clone semantics for eCPS parity --- src/microplex_us/data_sources/puf.py | 573 +++++++++++++++--- .../pipelines/pe_us_data_rebuild.py | 2 + src/microplex_us/pipelines/us.py | 549 +++++++++++++++++ tests/pipelines/test_pe_us_data_rebuild.py | 29 +- tests/pipelines/test_us.py | 368 ++++++++++- tests/test_puf_source_provider.py | 262 +++++++- 6 files changed, 1683 insertions(+), 100 deletions(-) diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index aacfae6..5747991 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -7,6 +7,7 @@ from __future__ import annotations import pickle +import re import subprocess import sys import tempfile @@ -49,6 +50,7 @@ try: from huggingface_hub import hf_hub_download + HF_AVAILABLE = True except ImportError: HF_AVAILABLE = False @@ -98,6 +100,32 @@ "capital_gains_losses": "E01000", "partnership_and_s_corp_losses": "E26270", } +PUF_AGGREGATE_RECIDS = (999996, 999997, 999998, 999999) +PUF_SYNTHETIC_RECID_START = 1_000_000 +PUF_AGGREGATE_SCREENED_FIELDS = ( + "E00200", # Wages + "P23250", # Long-term capital gains + "P22250", # Short-term capital gains + "E00650", # Qualified dividends + "E00300", # Taxable interest + "E26270", # Partnership / S-corp + "E00900", # Business income + "E02100", # Farm income + "E00400", # Tax-exempt interest + "E00600", # Ordinary dividends +) +PUF_AGGREGATE_BUCKET_BOUNDS = { + 999996: (-np.inf, 0.0), + 999997: (0.0, 10_000_000.0), + 999998: (10_000_000.0, 100_000_000.0), + 999999: (100_000_000.0, np.inf), +} +PUF_AGGREGATE_STRUCTURAL_COLUMNS = ("MARS", "XTOT", "DSI", "EIC") +_PUF_AMOUNT_COLUMN_PATTERN = re.compile(r"^(?:[EPT]\d+|S\d{5})$") +_PUF_AGGREGATE_AGI_CAP_100M_PLUS = 1_250_000_000.0 +_PUF_AGGREGATE_MAX_AGI_DOMINANCE = 0.20 +_PUF_AGGREGATE_SELECTION_POWER = 24 +_PUF_NUMERIC_TOL = 1e-9 PE_PUF_REMAINING_RAW_COLUMNS = ( "E03500", "E00800", @@ -364,6 +392,7 @@ def predict(self, X_test: pd.DataFrame) -> pd.DataFrame: "XTOT", ) + def download_puf(cache_dir: Path | None = None) -> Path: """Download PUF from HuggingFace. @@ -420,19 +449,377 @@ def _resolve_policyengine_repo_local_puf_paths( puf_path = candidate_dir / "puf_2015.csv" demographics_path = candidate_dir / "demographics_2015.csv" if puf_path.exists(): - return puf_path, ( - demographics_path if demographics_path.exists() else None - ) + return puf_path, (demographics_path if demographics_path.exists() else None) return None +def _puf_aggregate_amount_columns(columns: pd.Index | list[str]) -> list[str]: + return [column for column in columns if _PUF_AMOUNT_COLUMN_PATTERN.match(column)] + + +def _puf_aggregate_bucket_mask(df: pd.DataFrame, recid: int) -> pd.Series: + if "E00100" not in df.columns: + return pd.Series(True, index=df.index) + agi = pd.to_numeric(df["E00100"], errors="coerce").fillna(0.0) + lower, upper = PUF_AGGREGATE_BUCKET_BOUNDS.get(recid, (-np.inf, np.inf)) + return agi.ge(lower) & agi.lt(upper) + + +def _puf_aggregate_eligibility_scores( + df: pd.DataFrame, + reference: pd.DataFrame | None = None, +) -> pd.Series: + reference_frame = df if reference is None else reference + present_fields = [ + field + for field in PUF_AGGREGATE_SCREENED_FIELDS + if field in df.columns and field in reference_frame.columns + ] + if not present_fields: + return pd.Series(0.0, index=df.index, dtype=float) + + scores = np.zeros(len(df), dtype=float) + for raw_field in present_fields: + values = pd.to_numeric(df[raw_field], errors="coerce").fillna(0.0) + reference_values = pd.to_numeric( + reference_frame[raw_field], errors="coerce" + ).fillna(0.0) + field_scores = np.zeros(len(df), dtype=float) + + positive_mask = values.gt(0) + reference_positive = np.sort( + reference_values[reference_values.gt(0)].to_numpy() + ) + if bool(positive_mask.any()) and len(reference_positive) > 0: + field_scores[positive_mask.to_numpy()] = np.searchsorted( + reference_positive, + values[positive_mask].to_numpy(), + side="right", + ) / len(reference_positive) + + negative_mask = values.lt(0) + reference_negative = np.sort( + (-reference_values[reference_values.lt(0)]).to_numpy() + ) + if bool(negative_mask.any()) and len(reference_negative) > 0: + negative_scores = np.searchsorted( + reference_negative, + (-values[negative_mask]).to_numpy(), + side="right", + ) / len(reference_negative) + field_scores[negative_mask.to_numpy()] = np.maximum( + field_scores[negative_mask.to_numpy()], + negative_scores, + ) + + scores = np.maximum(scores, field_scores) + return pd.Series(scores, index=df.index, dtype=float) + + +def _choose_puf_aggregate_synthetic_count(pop_weight: float) -> int: + total_weight = max(1, int(round(pop_weight))) + target_count = max(20, round(pop_weight / 10)) + return int(min(40, total_weight, target_count)) + + +def _assign_puf_aggregate_weights( + pop_weight: float, + n_records: int, + rng: np.random.Generator, +) -> np.ndarray: + total_weight = max(1, int(round(pop_weight))) + n_records = max(1, min(int(n_records), total_weight)) + weights = np.ones(n_records, dtype=int) + remainder = total_weight - n_records + if remainder > 0: + base_extra = remainder // n_records + weights += base_extra + leftover = remainder - base_extra * n_records + if leftover: + weights[rng.choice(n_records, size=leftover, replace=False)] += 1 + return weights + + +def _project_puf_weighted_sum_to_bounds( + values: np.ndarray, + weights: np.ndarray, + target_total: float, + lower: np.ndarray, + upper: np.ndarray, + max_iter: int = 50, +) -> np.ndarray: + projected = np.clip(values.astype(float), lower, upper) + + for _ in range(max_iter): + residual = float(target_total - np.dot(projected, weights)) + if abs(residual) <= 1e-6: + return projected + + slack = upper - projected if residual > 0 else projected - lower + free = slack > _PUF_NUMERIC_TOL + if not bool(free.any()): + break + + basis = np.abs(projected[free]) + if basis.sum() <= _PUF_NUMERIC_TOL: + basis = np.ones(free.sum(), dtype=float) + denom = float(np.dot(weights[free], basis)) + if denom <= _PUF_NUMERIC_TOL: + basis = np.ones(free.sum(), dtype=float) + denom = float(weights[free].sum()) + + delta = residual * basis / denom + if residual > 0: + delta = np.minimum(delta, slack[free]) + else: + delta = -np.minimum(-delta, slack[free]) + projected[free] += delta + projected = np.clip(projected, lower, upper) + + return projected + + +def _allocate_puf_weighted_values( + base_values: np.ndarray, + weights: np.ndarray, + target_total: float, + lower: np.ndarray | float | None = None, + upper: np.ndarray | float | None = None, +) -> np.ndarray: + base_values = np.asarray(base_values, dtype=float) + weights = np.asarray(weights, dtype=float) + n = len(base_values) + if abs(target_total) <= 1e-6: + return np.zeros(n, dtype=float) + + if target_total > 0 and np.any(base_values > 0): + active = base_values > 0 + elif target_total < 0 and np.any(base_values < 0): + active = base_values < 0 + elif np.any(np.abs(base_values) > _PUF_NUMERIC_TOL): + active = np.abs(base_values) > _PUF_NUMERIC_TOL + else: + active = np.ones(n, dtype=bool) + + allocated = np.zeros(n, dtype=float) + magnitudes = np.abs(base_values[active]) + if magnitudes.sum() <= _PUF_NUMERIC_TOL: + magnitudes = np.ones(active.sum(), dtype=float) + denom = float(np.dot(weights[active], magnitudes)) + if denom <= _PUF_NUMERIC_TOL: + magnitudes = np.ones(active.sum(), dtype=float) + denom = float(weights[active].sum()) + + allocated[active] = np.sign(target_total) * magnitudes * abs(target_total) / denom + if lower is None and upper is None: + return allocated + + if lower is None: + lower_array = np.full(n, -np.inf, dtype=float) + elif np.isscalar(lower): + lower_array = np.full(n, float(lower), dtype=float) + else: + lower_array = np.asarray(lower, dtype=float) + + if upper is None: + upper_array = np.full(n, np.inf, dtype=float) + elif np.isscalar(upper): + upper_array = np.full(n, float(upper), dtype=float) + else: + upper_array = np.asarray(upper, dtype=float) + + return _project_puf_weighted_sum_to_bounds( + allocated, + weights, + target_total, + lower_array, + upper_array, + ) + + +def _allocate_puf_aggregate_agi( + donor_agi: np.ndarray, + weights: np.ndarray, + recid: int, + target_total: float, +) -> np.ndarray: + donor_agi = np.asarray(donor_agi, dtype=float) + weights = np.asarray(weights, dtype=float) + dominance_cap = _PUF_AGGREGATE_MAX_AGI_DOMINANCE * abs(target_total) / weights + n = len(donor_agi) + + if recid == 999996: + lower = -dominance_cap + upper = np.zeros(n, dtype=float) + else: + bucket_lower, bucket_upper = PUF_AGGREGATE_BUCKET_BOUNDS[recid] + if np.isinf(bucket_upper): + bucket_upper = _PUF_AGGREGATE_AGI_CAP_100M_PLUS + lower = np.full(n, max(float(bucket_lower), 0.0), dtype=float) + upper = np.minimum(np.full(n, float(bucket_upper), dtype=float), dominance_cap) + + return _allocate_puf_weighted_values( + base_values=np.abs(donor_agi), + weights=weights, + target_total=target_total, + lower=lower, + upper=upper, + ) + + +def _sample_puf_aggregate_donors( + donor_bucket: pd.DataFrame, + donor_scores: pd.Series, + target_mean_agi: float, + n_records: int, + rng: np.random.Generator, +) -> pd.DataFrame: + scores = donor_scores.loc[donor_bucket.index].to_numpy(dtype=float) + score_mass = np.clip(scores, 1e-6, None) ** _PUF_AGGREGATE_SELECTION_POWER + donor_abs_agi = np.abs(donor_bucket["E00100"].to_numpy(dtype=float)) + target_abs_agi = max(abs(float(target_mean_agi)), 1.0) + agi_distance = np.abs(np.log1p(donor_abs_agi) - np.log1p(target_abs_agi)) + probabilities = score_mass * np.sqrt(1.0 / (1.0 + agi_distance)) + if not np.isfinite(probabilities).all() or probabilities.sum() <= 0: + probabilities = np.ones(len(donor_bucket), dtype=float) + probabilities = probabilities / probabilities.sum() + + selected_index = rng.choice( + donor_bucket.index.to_numpy(), + size=n_records, + replace=len(donor_bucket) < n_records, + p=probabilities, + ) + return donor_bucket.loc[selected_index].reset_index(drop=True).copy() + + +def _disaggregate_puf_aggregate_bucket( + recid: int, + row: pd.Series, + regular: pd.DataFrame, + amount_columns: list[str], + donor_scores: pd.Series, + next_recid: int, + rng: np.random.Generator, +) -> pd.DataFrame: + pop_weight = float(row["S006"]) / 100.0 + target_mean_agi = float(row["E00100"]) + target_total_agi = pop_weight * target_mean_agi + donor_bucket = regular[_puf_aggregate_bucket_mask(regular, recid)].copy() + if donor_bucket.empty: + donor_bucket = regular.copy() + + total_weight = max(1, int(round(pop_weight))) + n_records = min(_choose_puf_aggregate_synthetic_count(pop_weight), total_weight) + synthetic_weights = _assign_puf_aggregate_weights( + pop_weight, n_records, rng + ).astype(float) + selected = _sample_puf_aggregate_donors( + donor_bucket=donor_bucket, + donor_scores=donor_scores, + target_mean_agi=target_mean_agi, + n_records=n_records, + rng=rng, + ) + selected = selected.astype( + {column: float for column in amount_columns if column in selected.columns} + ) + + synthetic = selected.copy() + synthetic["RECID"] = np.arange(next_recid, next_recid + n_records, dtype=int) + synthetic["S006"] = (synthetic_weights.astype(int) * 100).astype(int) + + for column in PUF_AGGREGATE_STRUCTURAL_COLUMNS: + if column in synthetic.columns: + synthetic[column] = selected[column].round().astype(int) + if "MARS" in synthetic.columns and "XTOT" in synthetic.columns: + joint_mask = synthetic["MARS"] == 2 + synthetic.loc[joint_mask, "XTOT"] = np.maximum( + synthetic.loc[joint_mask, "XTOT"], + 2, + ) + synthetic["XTOT"] = synthetic["XTOT"].clip(lower=0, upper=5).astype(int) + + synthetic["E00100"] = _allocate_puf_aggregate_agi( + donor_agi=selected["E00100"].to_numpy(dtype=float), + weights=synthetic_weights, + recid=recid, + target_total=target_total_agi, + ) + for column in amount_columns: + if column == "E00100": + continue + target_value = float(row.get(column, 0.0)) + if not np.isfinite(target_value): + target_value = 0.0 + synthetic[column] = _allocate_puf_weighted_values( + base_values=selected[column].to_numpy(dtype=float), + weights=synthetic_weights, + target_total=pop_weight * target_value, + ) + return synthetic + + +def disaggregate_puf_aggregate_records( + puf: pd.DataFrame, + *, + seed: int = 42, +) -> pd.DataFrame: + """Replace IRS aggregate PUF rows with calibrated synthetic donor records.""" + if not {"RECID", "MARS", "S006", "E00100"}.issubset(puf.columns): + return puf + + recid = pd.to_numeric(puf["RECID"], errors="coerce").astype("Int64") + aggregate_mask = recid.isin(PUF_AGGREGATE_RECIDS) | pd.to_numeric( + puf["MARS"], errors="coerce" + ).eq(0) + if not bool(aggregate_mask.any()): + return puf + + regular = puf.loc[~aggregate_mask].copy() + if regular.empty: + return puf.loc[~pd.to_numeric(puf["MARS"], errors="coerce").eq(0)].copy() + + aggregate_rows = puf.loc[aggregate_mask].copy() + amount_columns = _puf_aggregate_amount_columns(puf.columns) + donor_scores = _puf_aggregate_eligibility_scores(regular) + rng = np.random.default_rng(seed) + next_recid = PUF_SYNTHETIC_RECID_START + synthetic_rows: list[pd.DataFrame] = [] + + for _, row in aggregate_rows.sort_values("RECID").iterrows(): + row_recid = int(row["RECID"]) + if row_recid not in PUF_AGGREGATE_BUCKET_BOUNDS: + continue + synthetic = _disaggregate_puf_aggregate_bucket( + recid=row_recid, + row=row, + regular=regular, + amount_columns=amount_columns, + donor_scores=donor_scores, + next_recid=next_recid, + rng=rng, + ) + next_recid += len(synthetic) + synthetic_rows.append(synthetic[puf.columns]) + + if not synthetic_rows: + return regular.reset_index(drop=True) + + synthetic_frame = pd.concat(synthetic_rows, ignore_index=True) + print( + f"Disaggregated {int(aggregate_mask.sum())} aggregate PUF records into " + f"{len(synthetic_frame)} synthetic records" + ) + return pd.concat([regular, synthetic_frame], ignore_index=True) + + def load_puf_raw(puf_path: Path, demographics_path: Path | None = None) -> pd.DataFrame: """Load raw PUF data from CSV.""" print(f"Loading PUF from {puf_path}...") puf = pd.read_csv(puf_path) - # Filter out aggregate records (MARS=0) - puf = puf[puf["MARS"] != 0].copy() + puf = disaggregate_puf_aggregate_records(puf) print(f" Raw records: {len(puf):,}") @@ -458,8 +845,7 @@ def _normalize_puf_uprating_mode(mode: str | None) -> str: } if resolved not in allowed: raise ValueError( - "puf uprating mode must be one of " - f"{sorted(allowed)}; got {mode!r}" + f"puf uprating mode must be one of {sorted(allowed)}; got {mode!r}" ) return resolved @@ -492,9 +878,7 @@ def _resolve_pe_uprating_factors_path( policyengine_us_data_repo: str | Path | None = None, ) -> Path: if policyengine_us_data_repo is None: - raise ValueError( - "PE forward uprating requires policyengine_us_data_repo" - ) + raise ValueError("PE forward uprating requires policyengine_us_data_repo") resolved = ( Path(policyengine_us_data_repo) / "policyengine_us_data" @@ -523,7 +907,9 @@ def _get_pe_soi_aggregate( *, is_count: bool, ) -> float: - lookup_variable = "count" if variable == "adjusted_gross_income" and is_count else variable + lookup_variable = ( + "count" if variable == "adjusted_gross_income" and is_count else variable + ) rows = soi_table[ (soi_table["Variable"] == lookup_variable) & (soi_table["Year"] == year) @@ -668,9 +1054,7 @@ def uprate_mapped_puf_with_pe_factors( start_column = str(from_year) end_column = str(to_year) if start_column not in factors.columns or end_column not in factors.columns: - raise ValueError( - f"PE uprating factors do not cover {from_year} -> {to_year}" - ) + raise ValueError(f"PE uprating factors do not cover {from_year} -> {to_year}") factor_lookup = factors.set_index("Variable") result = puf.copy() for column in result.columns: @@ -686,18 +1070,16 @@ def uprate_mapped_puf_with_pe_factors( "qualified_dividend_income", "non_qualified_dividend_income", }.issubset(result.columns): - result["ordinary_dividend_income"] = ( - result["qualified_dividend_income"].fillna(0.0) - + result["non_qualified_dividend_income"].fillna(0.0) - ) + result["ordinary_dividend_income"] = result["qualified_dividend_income"].fillna( + 0.0 + ) + result["non_qualified_dividend_income"].fillna(0.0) if { "taxable_pension_income", "tax_exempt_pension_income", }.issubset(result.columns): - result["total_pension_income"] = ( - result["taxable_pension_income"].fillna(0.0) - + result["tax_exempt_pension_income"].fillna(0.0) - ) + result["total_pension_income"] = result["taxable_pension_income"].fillna( + 0.0 + ) + result["tax_exempt_pension_income"].fillna(0.0) return result @@ -719,7 +1101,9 @@ def _impute_missing_puf_demographics(puf: pd.DataFrame) -> pd.DataFrame: return puf train = ( - puf.loc[observed_mask, [*PUF_DEMOGRAPHIC_PREDICTORS, *PUF_DEMOGRAPHIC_VARIABLES]] + puf.loc[ + observed_mask, [*PUF_DEMOGRAPHIC_PREDICTORS, *PUF_DEMOGRAPHIC_VARIABLES] + ] .copy() .fillna(0) ) @@ -785,14 +1169,13 @@ def map_puf_variables( # Preserve rental losses as negative values so downstream PE targets can # recover rent-and-royalty loss cells. - result["rental_income"] = ( - result.get("rental_income_positive", 0).fillna(0) + - -result.get("rental_income_negative", 0).fillna(0) - ) + result["rental_income"] = result.get("rental_income_positive", 0).fillna( + 0 + ) + -result.get("rental_income_negative", 0).fillna(0) if {"E00600", "E00650"}.issubset(set(puf.columns)): - result["non_qualified_dividend_income"] = ( - puf["E00600"].fillna(0) - puf["E00650"].fillna(0) - ) + result["non_qualified_dividend_income"] = puf["E00600"].fillna(0) - puf[ + "E00650" + ].fillna(0) if {"E26190", "E26180", "E25980", "E25960"}.issubset(set(puf.columns)): s_corp_income = puf["E26190"].fillna(0) - puf["E26180"].fillna(0) partnership_income = puf["E25980"].fillna(0) - puf["E25960"].fillna(0) @@ -827,9 +1210,9 @@ def map_puf_variables( if {"E26390", "E26400"}.issubset(set(puf.columns)): result["estate_income"] = puf["E26390"].fillna(0) - puf["E26400"].fillna(0) if {"E01500", "E01700"}.issubset(set(puf.columns)): - result["tax_exempt_pension_income"] = ( - puf["E01500"].fillna(0) - puf["E01700"].fillna(0) - ) + result["tax_exempt_pension_income"] = puf["E01500"].fillna(0) - puf[ + "E01700" + ].fillna(0) medical_expense_floor = result.get("medical_expense_agi_floor") if medical_expense_floor is not None: for variable, fraction in MEDICAL_EXPENSE_CATEGORY_BREAKDOWNS.items(): @@ -843,8 +1226,14 @@ def map_puf_variables( 4: "HEAD_OF_HOUSEHOLD", 5: "SURVIVING_SPOUSE", } - result["filing_status"] = result["filing_status_code"].map(filing_status_map).fillna("UNKNOWN") - filing_status_code = pd.to_numeric(result["filing_status_code"], errors="coerce").fillna(0).astype(int) + result["filing_status"] = ( + result["filing_status_code"].map(filing_status_map).fillna("UNKNOWN") + ) + filing_status_code = ( + pd.to_numeric(result["filing_status_code"], errors="coerce") + .fillna(0) + .astype(int) + ) result["is_surviving_spouse"] = filing_status_code.eq(5) # Add age from demographics if available @@ -901,7 +1290,12 @@ def map_puf_variables( ) try: predictions = model.fitted_model.predict(X_test=predictor_frame) - except (FileNotFoundError, ImportError, ValueError, subprocess.CalledProcessError): + except ( + FileNotFoundError, + ImportError, + ValueError, + subprocess.CalledProcessError, + ): if require_pre_tax_contribution_model: raise model = None @@ -985,9 +1379,11 @@ def _decode_puf_filer_age( if lower is None: return int(resolved_fallback) if upper is None or upper <= lower: + if age_code == 7: + if rng is not None: + return int(rng.integers(low=lower, high=90, endpoint=False)) + return int(lower + (90 - lower) / 2) return int(lower) - if rng is not None: - return int(rng.integers(low=lower, high=upper, endpoint=False)) return int(lower + (upper - lower) / 2) @@ -1079,7 +1475,9 @@ def _is_puf_numeric_split_column(df: pd.DataFrame, column: str) -> bool: return pd.api.types.is_numeric_dtype(df[column]) -def uprate_puf(df: pd.DataFrame, from_year: int = 2015, to_year: int = 2024) -> pd.DataFrame: +def uprate_puf( + df: pd.DataFrame, from_year: int = 2015, to_year: int = 2024 +) -> pd.DataFrame: """Uprate PUF income variables from one year to another. Uses SOI-based growth factors. @@ -1124,7 +1522,9 @@ def _social_security_age_bucket(ages: pd.Series) -> pd.Series: def _normalize_social_security_split_strategy(strategy: str | None) -> str: - resolved = (strategy or SOCIAL_SECURITY_SPLIT_STRATEGY_GROUPED_SHARE).strip().lower() + resolved = ( + (strategy or SOCIAL_SECURITY_SPLIT_STRATEGY_GROUPED_SHARE).strip().lower() + ) allowed = { SOCIAL_SECURITY_SPLIT_STRATEGY_GROUPED_SHARE, SOCIAL_SECURITY_SPLIT_STRATEGY_PE_QRF, @@ -1145,7 +1545,9 @@ def _build_pe_style_social_security_predictor_frame( if "age" in frame.columns: result["age"] = pd.to_numeric(frame["age"], errors="coerce").astype(float) if "is_male" in frame.columns: - result["is_male"] = pd.to_numeric(frame["is_male"], errors="coerce").astype(float) + result["is_male"] = pd.to_numeric(frame["is_male"], errors="coerce").astype( + float + ) elif "sex" in frame.columns: sex = pd.to_numeric(frame["sex"], errors="coerce") result["is_male"] = pd.Series( @@ -1290,9 +1692,7 @@ def _ensure_policyengine_us_data_repo_on_sys_path( return repo_root = Path(policyengine_us_data_repo).expanduser().resolve() if not repo_root.exists(): - raise ValueError( - f"PolicyEngine US-data repo does not exist: {repo_root}" - ) + raise ValueError(f"PolicyEngine US-data repo does not exist: {repo_root}") repo_root_str = str(repo_root) if repo_root_str not in sys.path: sys.path.insert(0, repo_root_str) @@ -1388,18 +1788,14 @@ def _load_pe_extended_cps_pre_tax_training_frame( h5["employment_income"][sorted(h5["employment_income"].keys())[-1]], dtype=float, ), - "age": np.asarray( - h5["age"][str(int(training_year))], dtype=float - ) + "age": np.asarray(h5["age"][str(int(training_year))], dtype=float) if str(int(training_year)) in h5["age"] else np.asarray( h5["age"][sorted(h5["age"].keys())[-1]], dtype=float, ), "is_male": 1.0 - - np.asarray( - h5["is_female"][str(int(training_year))], dtype=float - ) + - np.asarray(h5["is_female"][str(int(training_year))], dtype=float) if str(int(training_year)) in h5["is_female"] else 1.0 - np.asarray( @@ -1477,7 +1873,9 @@ def _default_pe_style_puf_pre_tax_contribution_model( [*predictors, "household_weight", "pre_tax_contributions"] ) train = cps_df.loc[:, [*predictors, "pre_tax_contributions"]].copy() - train = train.apply(lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0)) + train = train.apply( + lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0) + ) qrf = QRF(log_level="WARNING", memory_efficient=True) fitted_model = qrf.fit( @@ -1497,9 +1895,11 @@ def _strategy_social_security_share_model_loader( strategy: str, ) -> Callable[[int, Path | None], SocialSecurityShareModel]: if strategy == SOCIAL_SECURITY_SPLIT_STRATEGY_PE_QRF: - return lambda year, cache_dir: _default_pe_style_puf_social_security_share_model( - cps_reference_year=year, - cache_dir=cache_dir, + return lambda year, cache_dir: ( + _default_pe_style_puf_social_security_share_model( + cps_reference_year=year, + cache_dir=cache_dir, + ) ) if strategy == SOCIAL_SECURITY_SPLIT_STRATEGY_AGE_HEURISTIC: return lambda year, cache_dir: _age_heuristic_puf_social_security_share_model() @@ -1632,16 +2032,12 @@ def _add_derived_income_columns(df: pd.DataFrame) -> pd.DataFrame: result["interest_income"] = taxable_interest_income result["dividend_income"] = ordinary_dividend_income - result["capital_gains"] = ( - short_term_capital_gains - + long_term_capital_gains - ) + result["capital_gains"] = short_term_capital_gains + long_term_capital_gains result["pension_income"] = taxable_pension_income result["social_security"] = gross_social_security - result["social_security_retirement"] = ( - gross_social_security.where(ages >= MINIMUM_SOCIAL_SECURITY_RETIREMENT_AGE, 0.0) - .astype(float) - ) + result["social_security_retirement"] = gross_social_security.where( + ages >= MINIMUM_SOCIAL_SECURITY_RETIREMENT_AGE, 0.0 + ).astype(float) result["income"] = ( employment_income + self_employment_income @@ -1688,13 +2084,20 @@ def expand_to_persons(df: pd.DataFrame) -> pd.DataFrame: This enables stacking with CPS person-level data. """ records = [] - split_columns = [column for column in df.columns if _is_puf_numeric_split_column(df, column)] + split_columns = [ + column for column in df.columns if _is_puf_numeric_split_column(df, column) + ] pe_rng = np.random.default_rng(PE_PUF_PERSON_EXPANSION_RANDOM_SEED) + pe_age_rng = np.random.default_rng(PE_PUF_PERSON_EXPANSION_RANDOM_SEED + 1) for idx, row in df.iterrows(): filing_status = row.get("filing_status", "SINGLE") - exemptions = int(pd.to_numeric(row.get("exemptions_count", 1), errors="coerce") or 1) - has_pe_demographics = "_puf_agerange" in row.index and not pd.isna(row.get("_puf_agerange")) + exemptions = int( + pd.to_numeric(row.get("exemptions_count", 1), errors="coerce") or 1 + ) + has_pe_demographics = "_puf_agerange" in row.index and not pd.isna( + row.get("_puf_agerange") + ) tax_unit_id = row.get("_puf_recid") if tax_unit_id is None or pd.isna(tax_unit_id): tax_unit_id = idx @@ -1705,12 +2108,15 @@ def expand_to_persons(df: pd.DataFrame) -> pd.DataFrame: head["is_head"] = 1 head["is_spouse"] = 0 head["is_dependent"] = 0 - head["person_id"] = f"{pe_tax_unit_id}:1" if has_pe_demographics else f"{idx}_head" + head["person_id"] = ( + f"{pe_tax_unit_id}:1" if has_pe_demographics else f"{idx}_head" + ) head["tax_unit_id"] = pe_tax_unit_id if has_pe_demographics else str(idx) if has_pe_demographics: head["age"] = _decode_puf_filer_age( row.get("_puf_agerange"), fallback=row.get("age", 40.0), + rng=pe_age_rng, ) if pd.notna(row.get("_puf_gender")): head["is_male"] = float(int(row.get("_puf_gender")) == 1) @@ -1722,7 +2128,9 @@ def expand_to_persons(df: pd.DataFrame) -> pd.DataFrame: spouse["is_head"] = 0 spouse["is_spouse"] = 1 spouse["is_dependent"] = 0 - spouse["person_id"] = f"{pe_tax_unit_id}:2" if has_pe_demographics else f"{idx}_spouse" + spouse["person_id"] = ( + f"{pe_tax_unit_id}:2" if has_pe_demographics else f"{idx}_spouse" + ) spouse["tax_unit_id"] = pe_tax_unit_id if has_pe_demographics else str(idx) spouse["is_surviving_spouse"] = False @@ -1730,6 +2138,7 @@ def expand_to_persons(df: pd.DataFrame) -> pd.DataFrame: spouse["age"] = _decode_puf_filer_age( row.get("_puf_agerange"), fallback=row.get("age", 40.0), + rng=pe_age_rng, ) if pd.notna(row.get("_puf_gender")): spouse["is_male"] = _puf_spouse_is_male( @@ -1737,7 +2146,9 @@ def expand_to_persons(df: pd.DataFrame) -> pd.DataFrame: ) head_share = _puf_joint_head_share(row, rng=pe_rng) for column in split_columns: - amount = float(pd.to_numeric(row.get(column), errors="coerce") or 0.0) + amount = float( + pd.to_numeric(row.get(column), errors="coerce") or 0.0 + ) head[column] = amount * head_share spouse[column] = amount * (1.0 - head_share) else: @@ -1765,7 +2176,9 @@ def expand_to_persons(df: pd.DataFrame) -> pd.DataFrame: records.append(dependent) result = pd.DataFrame(records).reset_index(drop=True) - helper_columns = [column for column in result.columns if column in PUF_DEMOGRAPHIC_HELPER_COLUMNS] + helper_columns = [ + column for column in result.columns if column in PUF_DEMOGRAPHIC_HELPER_COLUMNS + ] if helper_columns: result = result.drop(columns=helper_columns) result = _add_derived_income_columns(result) @@ -1929,7 +2342,10 @@ def _sample_tax_units( .fillna(0.0) .clip(lower=0.0) ) - if candidate_weights.sum() > 0.0 and int((candidate_weights > 0.0).sum()) >= sample_n: + if ( + candidate_weights.sum() > 0.0 + and int((candidate_weights > 0.0).sum()) >= sample_n + ): sample_weights = candidate_weights try: return tax_units.sample( @@ -2005,7 +2421,9 @@ def _build_puf_tax_units( tax_units["tenure"] = 0 tax_units["household_weight"] = tax_units["weight"].astype(float) tax_units = _add_derived_income_columns(tax_units) - is_male = tax_units.get("is_male", pd.Series(np.nan, index=tax_units.index)).fillna(0) + is_male = tax_units.get("is_male", pd.Series(np.nan, index=tax_units.index)).fillna( + 0 + ) tax_units["sex"] = np.where(is_male > 0, 1, np.where(is_male == 0, 2, 0)) tax_units["education"] = 0 return tax_units @@ -2132,7 +2550,9 @@ class PUFSourceProvider: Callable[[int, Path | None], SocialSecurityShareModel] | None ) = None _descriptor_cache: SourceDescriptor | None = None - _social_security_share_model_cache: dict[tuple[int, str], SocialSecurityShareModel] = field( + _social_security_share_model_cache: dict[ + tuple[int, str], SocialSecurityShareModel + ] = field( default_factory=dict, init=False, repr=False, @@ -2311,9 +2731,12 @@ def _load_social_security_share_model( print("\nIncome variable sums:") income_vars = [ - "employment_income", "self_employment_income", - "long_term_capital_gains", "partnership_s_corp_income", - "gross_social_security", "taxable_pension_income", + "employment_income", + "self_employment_income", + "long_term_capital_gains", + "partnership_s_corp_income", + "gross_social_security", + "taxable_pension_income", ] for var in income_vars: if var in df.columns: diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild.py b/src/microplex_us/pipelines/pe_us_data_rebuild.py index 791cb61..0d1f137 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild.py @@ -62,6 +62,7 @@ def to_dict(self) -> dict[str, Any]: "stages": [stage.to_dict() for stage in self.stages], } + def default_policyengine_us_data_rebuild_config( **overrides: Any, ) -> USMicroplexBuildConfig: @@ -81,6 +82,7 @@ def default_policyengine_us_data_rebuild_config( donor_imputer_condition_selection="pe_prespecified", donor_imputer_qrf_zero_threshold=0.05, donor_imputer_excluded_variables=(), + puf_support_clone_enabled=True, prefer_cached_cps_asec_source=False, policyengine_direct_override_variables=( "health_savings_account_ald", diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 2f965d9..87e8d83 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -123,6 +123,132 @@ LOGGER = logging.getLogger(__name__) +PUF_SUPPORT_CLONE_FLAG_COLUMN = "person_is_puf_clone" + +PUF_SUPPORT_CLONE_IMPUTED_VARIABLES: tuple[str, ...] = ( + "employment_income", + "partnership_s_corp_income", + "social_security", + "taxable_pension_income", + "interest_deduction", + "tax_exempt_pension_income", + "long_term_capital_gains", + "unreimbursed_business_employee_expenses", + "pre_tax_contributions", + "taxable_ira_distributions", + "self_employment_income", + "w2_wages_from_qualified_business", + "unadjusted_basis_qualified_property", + "business_is_sstb", + "short_term_capital_gains", + "qualified_dividend_income", + "charitable_cash_donations", + "self_employed_pension_contribution_ald", + "unrecaptured_section_1250_gain", + "taxable_unemployment_compensation", + "taxable_interest_income", + "domestic_production_ald", + "self_employed_health_insurance_ald", + "rental_income", + "non_qualified_dividend_income", + "cdcc_relevant_expenses", + "tax_exempt_interest_income", + "salt_refund_income", + "foreign_tax_credit", + "estate_income", + "charitable_non_cash_donations", + "american_opportunity_credit", + "miscellaneous_income", + "alimony_expense", + "farm_income", + "partnership_se_income", + "alimony_income", + "health_savings_account_ald", + "non_sch_d_capital_gains", + "general_business_credit", + "energy_efficient_home_improvement_credit", + "traditional_ira_contributions", + "amt_foreign_tax_credit", + "excess_withheld_payroll_tax", + "savers_credit", + "student_loan_interest", + "investment_income_elected_form_4952", + "early_withdrawal_penalty", + "prior_year_minimum_tax_credit", + "farm_rent_income", + "qualified_tuition_expenses", + "educator_expense", + "long_term_capital_gains_on_collectibles", + "other_credits", + "casualty_loss", + "unreported_payroll_tax", + "recapture_of_investment_credit", + "deductible_mortgage_interest", + "qualified_reit_and_ptp_income", + "qualified_bdc_income", + "farm_operations_income", + "estate_income_would_be_qualified", + "farm_operations_income_would_be_qualified", + "farm_rent_income_would_be_qualified", + "partnership_s_corp_income_would_be_qualified", + "rental_income_would_be_qualified", + "self_employment_income_would_be_qualified", +) + +PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES: tuple[str, ...] = ( + "partnership_s_corp_income", + "interest_deduction", + "unreimbursed_business_employee_expenses", + "pre_tax_contributions", + "w2_wages_from_qualified_business", + "unadjusted_basis_qualified_property", + "business_is_sstb", + "charitable_cash_donations", + "self_employed_pension_contribution_ald", + "unrecaptured_section_1250_gain", + "taxable_unemployment_compensation", + "domestic_production_ald", + "self_employed_health_insurance_ald", + "cdcc_relevant_expenses", + "salt_refund_income", + "foreign_tax_credit", + "estate_income", + "charitable_non_cash_donations", + "american_opportunity_credit", + "miscellaneous_income", + "alimony_expense", + "health_savings_account_ald", + "non_sch_d_capital_gains", + "general_business_credit", + "energy_efficient_home_improvement_credit", + "amt_foreign_tax_credit", + "excess_withheld_payroll_tax", + "savers_credit", + "student_loan_interest", + "investment_income_elected_form_4952", + "early_withdrawal_penalty", + "prior_year_minimum_tax_credit", + "farm_rent_income", + "qualified_tuition_expenses", + "educator_expense", + "long_term_capital_gains_on_collectibles", + "other_credits", + "casualty_loss", + "unreported_payroll_tax", + "recapture_of_investment_credit", + "deductible_mortgage_interest", + "qualified_reit_and_ptp_income", + "qualified_bdc_income", + "farm_operations_income", + "estate_income_would_be_qualified", + "farm_operations_income_would_be_qualified", + "farm_rent_income_would_be_qualified", + "partnership_s_corp_income_would_be_qualified", + "rental_income_would_be_qualified", +) + +PUF_SUPPORT_CLONE_SPECIAL_VARIABLES: tuple[str, ...] = ("weeks_unemployed",) + @lru_cache(maxsize=1) def _default_block_geography() -> BlockGeography: @@ -1684,6 +1810,19 @@ class USMicroplexBuildConfig: donor_imputer_max_condition_vars: int | None = 8 donor_imputer_excluded_variables: tuple[str, ...] = ("filing_status_code",) donor_imputer_authoritative_override_variables: tuple[str, ...] = () + puf_support_clone_enabled: bool = False + puf_support_clone_source_prefixes: tuple[str, ...] = ("irs_soi_puf",) + puf_support_clone_zero_initial_weight: bool = True + puf_support_clone_flag_column: str = PUF_SUPPORT_CLONE_FLAG_COLUMN + puf_support_clone_prior_weight_share: float = 0.05 + puf_support_clone_overlap_variables: tuple[str, ...] = ( + PUF_SUPPORT_CLONE_IMPUTED_VARIABLES + + PUF_SUPPORT_CLONE_SPECIAL_VARIABLES + + ("wage_income", "dividend_income", "capital_gains") + ) + puf_support_clone_both_halves_override_variables: tuple[str, ...] = ( + PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES + ) dependent_tax_leaf_soft_cap_multiplier: float | None = None dependent_tax_leaf_soft_cap_base_variables: tuple[str, ...] = ( "employment_income", @@ -1798,6 +1937,27 @@ class USMicroplexBuildConfig: forbes_fixed_spine_replicates_per_unit: int = 10 def __post_init__(self) -> None: + if self.puf_support_clone_enabled: + if self.synthesis_backend != "seed": + raise ValueError( + "puf_support_clone_enabled requires synthesis_backend='seed' " + "until post-synthesis clone construction is implemented" + ) + if self.policyengine_selection_household_budget is not None: + raise ValueError( + "puf_support_clone_enabled cannot be combined with " + "policyengine_selection_household_budget until selector " + "clone activation is implemented" + ) + if not self.puf_support_clone_source_prefixes: + raise ValueError( + "puf_support_clone_source_prefixes must not be empty when " + "puf_support_clone_enabled is true" + ) + if not (0.0 <= self.puf_support_clone_prior_weight_share < 1.0): + raise ValueError( + "puf_support_clone_prior_weight_share must be in [0, 1)" + ) if ( self.policyengine_calibration_rescale_to_input_weight_sum and self.policyengine_calibration_rescale_to_target_total_weight @@ -2098,6 +2258,13 @@ def build_from_frames( "donor_conditioning_diagnostics": donor_integration.get( "conditioning_diagnostics", [] ), + "processed_donor_source_order": donor_integration.get( + "processed_donor_source_order", [] + ), + "puf_clone_source_order": donor_integration.get( + "puf_clone_source_order", [] + ), + "puf_support_clone": donor_integration.get("puf_support_clone_summary"), "donor_excluded_variables": list( self.config.donor_imputer_excluded_variables ), @@ -3198,6 +3365,127 @@ def _policyengine_selection_optimizer_kwargs( ) return kwargs + def _puf_clone_household_summary( + self, + tables: PolicyEngineUSEntityTableBundle, + ) -> dict[str, Any]: + flag_column = self.config.puf_support_clone_flag_column + if tables.persons is None or flag_column not in tables.persons.columns: + return { + "available": False, + "clone_household_count": 0, + "mixed_flag_household_count": 0, + } + persons = tables.persons + if "household_id" not in persons.columns: + return { + "available": False, + "reason": "missing_person_household_id", + "clone_household_count": 0, + "mixed_flag_household_count": 0, + } + flags = pd.to_numeric(persons[flag_column], errors="coerce").fillna(0.0) + grouped = flags.groupby(persons["household_id"], sort=False) + flag_min = grouped.min() + flag_max = grouped.max() + clone_household_ids = flag_min.index[(flag_min > 0.5) & (flag_max > 0.5)] + mixed_household_ids = flag_min.index[(flag_min <= 0.5) & (flag_max > 0.5)] + activated_count = 0 + weight_sum = 0.0 + weight_share = 0.0 + if "household_id" in tables.households.columns: + households = tables.households + weights = pd.to_numeric( + households.get("household_weight", 0.0), + errors="coerce", + ).fillna(0.0) + household_weights = pd.Series( + weights.to_numpy(dtype=float), + index=households["household_id"].to_numpy(), + dtype=float, + ) + clone_weights = household_weights.reindex(clone_household_ids).fillna(0.0) + activated_count = int((clone_weights > 0.0).sum()) + weight_sum = float(clone_weights.sum()) + total_weight = float(weights.sum()) + weight_share = float(weight_sum / total_weight) if total_weight else 0.0 + clone_household_id_values = [ + value.item() if hasattr(value, "item") else value + for value in clone_household_ids.to_list() + ] + return { + "available": True, + "flag_column": flag_column, + "clone_household_count": int(len(clone_household_ids)), + "mixed_flag_household_count": int(len(mixed_household_ids)), + "activated_household_count": activated_count, + "household_weight_sum": weight_sum, + "household_weight_share": weight_share, + "clone_household_ids": clone_household_id_values, + } + + def _initialize_puf_clone_calibration_weights( + self, + tables: PolicyEngineUSEntityTableBundle, + ) -> tuple[PolicyEngineUSEntityTableBundle, dict[str, Any]]: + if not self.config.puf_support_clone_enabled: + return tables, {"applied": False} + summary = self._puf_clone_household_summary(tables) + if not summary.get("available"): + return tables, {"applied": False, **summary} + if summary.get("mixed_flag_household_count", 0): + raise ValueError( + "PUF support clone household diagnostics found mixed original/clone " + "person flags within a household" + ) + if self.config.calibration_backend == "none": + return tables, { + "applied": False, + "reason": "calibration_backend_none", + **summary, + } + clone_household_ids = set(summary.get("clone_household_ids", [])) + if not clone_household_ids or "household_id" not in tables.households.columns: + return tables, {"applied": False, **summary} + households = tables.households.copy() + weights = pd.to_numeric( + households["household_weight"], + errors="coerce", + ).fillna(0.0) + clone_mask = households["household_id"].isin(clone_household_ids) + share = float(self.config.puf_support_clone_prior_weight_share) + clone_count = int(clone_mask.sum()) + original_weight_sum = float(weights.loc[~clone_mask].sum()) + clone_prior_total = ( + original_weight_sum * share / (1.0 - share) + if share > 0.0 and original_weight_sum > 0.0 and clone_count + else 0.0 + ) + clone_prior_weight = ( + clone_prior_total / clone_count + if clone_count and clone_prior_total + else 0.0 + ) + if clone_prior_weight > 0.0: + households.loc[clone_mask, "household_weight"] = clone_prior_weight + updated_tables = PolicyEngineUSEntityTableBundle( + households=households, + persons=tables.persons, + tax_units=tables.tax_units, + spm_units=tables.spm_units, + families=tables.families, + marital_units=tables.marital_units, + ) + return updated_tables, { + "applied": bool(clone_prior_weight > 0.0), + "clone_prior_weight_share": share, + "clone_prior_total_weight": clone_prior_total, + "clone_prior_household_weight": clone_prior_weight, + "clone_household_count": clone_count, + "pre_clone_weight_sum": float(weights.loc[clone_mask].sum()), + "pre_clone_original_weight_sum": original_weight_sum, + } + def calibrate_policyengine_tables( self, tables: PolicyEngineUSEntityTableBundle, @@ -3245,6 +3533,9 @@ def calibrate_policyengine_tables( "US microplex build: post-microsim checkpoint saved", path=str(self.config.pipeline_checkpoint_save_post_microsim_path), ) + tables, puf_clone_calibration_initialization = ( + self._initialize_puf_clone_calibration_weights(tables) + ) preselection_supported_targets = list(supported_targets) target_planning_household_count = len(tables.households) if not supported_targets: @@ -3893,6 +4184,13 @@ def _append_stage_summary( "active_solve_capped_mean_abs_relative_error": oracle_loss["active_solve"][ "capped_mean_abs_relative_error" ], + "puf_support_clone": { + "enabled": bool(self.config.puf_support_clone_enabled), + "calibration_initialization": puf_clone_calibration_initialization, + "final_household_diagnostics": self._puf_clone_household_summary( + updated_tables + ), + }, } if selection_summary is not None: summary["selection"] = selection_summary @@ -4938,6 +5236,183 @@ def _household_geography_coverage( state_fips = pd.to_numeric(households["state_fips"], errors="coerce").fillna(0) return int((state_fips > 0).sum()) + def _is_puf_support_clone_source(self, source_name: str) -> bool: + return any( + source_name.startswith(prefix) + for prefix in self.config.puf_support_clone_source_prefixes + ) + + def _is_cps_asec_scaffold_source(self, source_name: str) -> bool: + return source_name.startswith(("cps", "cps_asec")) + + def _ordered_donor_inputs_for_puf_support_clone( + self, + *, + scaffold_input: USMicroplexSourceInput, + donor_inputs: list[USMicroplexSourceInput], + ) -> tuple[list[USMicroplexSourceInput], list[str], list[str]]: + """Return PUF-first donor inputs and clone source order for clone mode.""" + input_order = [donor.frame.source.name for donor in donor_inputs] + if not self.config.puf_support_clone_enabled: + return donor_inputs, input_order, [] + + scaffold_name = scaffold_input.frame.source.name + if self._is_puf_support_clone_source(scaffold_name): + raise ValueError( + "puf_support_clone_enabled requires the PUF source to be a donor, " + f"but selected scaffold source is {scaffold_name!r}" + ) + if not self._is_cps_asec_scaffold_source(scaffold_name): + raise ValueError( + "puf_support_clone_enabled requires a CPS/ASEC-shaped scaffold; " + f"selected scaffold source is {scaffold_name!r}" + ) + + puf_donors = [ + donor + for donor in donor_inputs + if self._is_puf_support_clone_source(donor.frame.source.name) + ] + if not puf_donors: + raise ValueError( + "puf_support_clone_enabled requires exactly one PUF donor source, " + "but none matched puf_support_clone_source_prefixes" + ) + if len(puf_donors) > 1: + raise ValueError( + "puf_support_clone_enabled requires an unambiguous PUF donor source; " + "matched " + ", ".join(donor.frame.source.name for donor in puf_donors) + ) + + non_puf_donors = [ + donor + for donor in donor_inputs + if not self._is_puf_support_clone_source(donor.frame.source.name) + ] + ordered = [*puf_donors, *non_puf_donors] + return ( + ordered, + [donor.frame.source.name for donor in ordered], + [donor.frame.source.name for donor in puf_donors], + ) + + def _prepare_puf_support_clone_frame(self, original: pd.DataFrame) -> pd.DataFrame: + """Create a zero-stored-weight PUF clone frame from CPS support rows.""" + clone = original.copy() + structural_id_columns = {"person_id", *ENTITY_ID_COLUMNS.values()} + for column in sorted(structural_id_columns & set(clone.columns)): + series = clone[column] + if pd.api.types.is_numeric_dtype(series): + numeric = pd.to_numeric(series, errors="coerce") + finite = numeric[np.isfinite(numeric)] + offset = int(finite.max()) + 1 if not finite.empty else len(clone) + clone[column] = numeric.fillna(-1).astype(np.int64) + int(offset) + else: + clone[column] = series.astype(str) + "__puf_clone" + if self.config.puf_support_clone_zero_initial_weight: + for column in clone.columns: + if column == "weight" or "_weight" in column: + clone[column] = 0.0 + clone[self.config.puf_support_clone_flag_column] = 1.0 + return clone + + def _finalize_puf_support_clone_frame( + self, + *, + original: pd.DataFrame, + imputed_clone: pd.DataFrame, + donor_source_name: str, + integrated_variables: list[str], + preclone_columns: set[str], + donor_seed_columns: set[str], + donor_observed: set[str], + ) -> tuple[pd.DataFrame, dict[str, Any]]: + """Concatenate original CPS support and its PUF-imputed support clone.""" + flag_column = self.config.puf_support_clone_flag_column + original = original.copy() + clone = imputed_clone.copy() + original[flag_column] = 0.0 + clone[flag_column] = 1.0 + + integrated_set = set(integrated_variables) + both_halves_override = ( + integrated_set + & set(self.config.puf_support_clone_both_halves_override_variables) + & preclone_columns + ) + for variable in sorted(both_halves_override): + if variable in original.columns and variable in clone.columns: + original[variable] = clone[variable].to_numpy(copy=True) + + generated_entity_id_columns = sorted( + set(ENTITY_ID_COLUMNS.values()) & (set(clone.columns) - preclone_columns) + ) + if generated_entity_id_columns: + clone = clone.drop(columns=generated_entity_id_columns) + + for column in sorted(set(clone.columns) - set(original.columns)): + original[column] = 0.0 + for column in sorted(set(original.columns) - set(clone.columns)): + clone[column] = original[column].to_numpy(copy=True) + original = original.loc[:, clone.columns] + + combined = pd.concat([original, clone], ignore_index=True, sort=False) + combined = combined.reset_index(drop=True) + + overlap_variables = sorted(integrated_set & preclone_columns) + donor_only_variables = sorted(integrated_set - preclone_columns) + ecps_surface = ( + set(PUF_SUPPORT_CLONE_IMPUTED_VARIABLES) + | set(PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES) + | set(PUF_SUPPORT_CLONE_SPECIAL_VARIABLES) + ) + included_surface = sorted(ecps_surface & integrated_set) + excluded_surface: dict[str, str] = {} + for variable in sorted(ecps_surface - set(included_surface)): + if variable not in donor_observed and variable not in donor_seed_columns: + reason = "missing_puf_source_column" + elif variable in self.config.donor_imputer_excluded_variables: + reason = "excluded_by_config" + elif variable not in preclone_columns: + reason = "not_present_before_clone" + else: + reason = "not_selected_for_imputation" + excluded_surface[variable] = reason + + clone_weight_sum = 0.0 + for column in ("household_weight", "hh_weight", "weight"): + if column in clone.columns: + clone_weight_sum = float( + pd.to_numeric(clone[column], errors="coerce").fillna(0.0).sum() + ) + break + + summary = { + "enabled": True, + "donor_source_name": donor_source_name, + "original_row_count": int(len(original)), + "clone_row_count": int(len(clone)), + "final_row_count": int(len(combined)), + "clone_initial_weight_sum": clone_weight_sum, + "integrated_variable_count": int(len(integrated_set)), + "clone_overlap_variable_count": int(len(overlap_variables)), + "clone_donor_only_variable_count": int(len(donor_only_variables)), + "overlap_variables": overlap_variables, + "donor_only_variables": donor_only_variables, + "both_halves_override_variables": sorted(both_halves_override), + "dropped_generated_entity_id_columns": generated_entity_id_columns, + "variable_surface": { + "ecps_imputed_variables": list(PUF_SUPPORT_CLONE_IMPUTED_VARIABLES), + "ecps_overridden_variables": list( + PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES + ), + "ecps_special_variables": list(PUF_SUPPORT_CLONE_SPECIAL_VARIABLES), + "included_variables": included_surface, + "excluded_variables": excluded_surface, + }, + } + return combined, summary + def _integrate_donor_sources( self, seed_data: pd.DataFrame, @@ -4948,6 +5423,13 @@ def _integrate_donor_sources( current = seed_data.copy() integrated_variables: list[str] = [] conditioning_diagnostics: list[dict[str, Any]] = [] + donor_inputs, processed_donor_source_order, puf_clone_source_order = ( + self._ordered_donor_inputs_for_puf_support_clone( + scaffold_input=scaffold_input, + donor_inputs=donor_inputs, + ) + ) + puf_support_clone_summary: dict[str, Any] | None = None scaffold_observed = prune_redundant_variables( scaffold_input.fusion_plan.variables_for(EntityType.HOUSEHOLD) | scaffold_input.fusion_plan.variables_for(EntityType.PERSON) @@ -4975,10 +5457,27 @@ def _integrate_donor_sources( donor_sources=len(donor_inputs), seed_rows=len(current), condition_selection=self.config.donor_imputer_condition_selection, + puf_support_clone_enabled=self.config.puf_support_clone_enabled, ) for donor_input in donor_inputs: donor_source_name = donor_input.frame.source.name + is_puf_support_clone_source = ( + self.config.puf_support_clone_enabled + and self._is_puf_support_clone_source(donor_source_name) + ) + source_original_current: pd.DataFrame | None = None + source_preclone_columns: set[str] = set(current.columns) + source_integrated_variables: list[str] = [] + if is_puf_support_clone_source: + source_original_current = current.copy() + current = self._prepare_puf_support_clone_frame(source_original_current) + _emit_us_pipeline_progress( + "US microplex donor integration: puf support clone prepared", + donor_source=donor_source_name, + original_rows=len(source_original_current), + clone_rows=len(current), + ) _emit_us_pipeline_progress( "US microplex donor integration: source start", donor_source=donor_source_name, @@ -5040,8 +5539,32 @@ def _integrate_donor_sources( and donor_input.frame.source.is_authoritative_for(variable) and self._is_compatible_donor_target(donor_seed[variable]) ) + if is_puf_support_clone_source: + puf_clone_overlap_vars = sorted( + variable + for variable in set(self.config.puf_support_clone_overlap_variables) + if variable not in excluded + and variable not in self.config.donor_imputer_excluded_variables + and variable in scaffold_observed + and variable in donor_observed + and variable in current.columns + and variable in donor_seed.columns + and variable in numeric_current + and variable in numeric_donor + and donor_input.frame.source.is_authoritative_for(variable) + and self._is_compatible_donor_target(donor_seed[variable]) + ) + donor_override_vars = sorted( + set(donor_override_vars) | set(puf_clone_overlap_vars) + ) donor_target_vars = sorted(set(donor_only_vars) | set(donor_override_vars)) if not shared_vars or not donor_target_vars: + if is_puf_support_clone_source: + raise ValueError( + "PUF support clone donor produced no imputation targets; " + f"shared_vars={len(shared_vars)}, " + f"donor_target_vars={len(donor_target_vars)}" + ) _emit_us_pipeline_progress( "US microplex donor integration: source skipped", donor_source=donor_source_name, @@ -5317,6 +5840,7 @@ def _integrate_donor_sources( ) current = result.updated_frame integrated_variables.extend(result.integrated_variables) + source_integrated_variables.extend(result.integrated_variables) _emit_us_pipeline_progress( "US microplex donor integration: block complete", donor_source=donor_source_name, @@ -5466,6 +5990,7 @@ def _integrate_donor_sources( ) current = result.updated_frame integrated_variables.extend(result.integrated_variables) + source_integrated_variables.extend(result.integrated_variables) _emit_us_pipeline_progress( "US microplex donor integration: block complete", donor_source=donor_source_name, @@ -5473,10 +5998,34 @@ def _integrate_donor_sources( integrated_vars=len(result.integrated_variables), ) + if is_puf_support_clone_source: + if source_original_current is None: + raise AssertionError("PUF support clone original frame missing") + current, puf_support_clone_summary = ( + self._finalize_puf_support_clone_frame( + original=source_original_current, + imputed_clone=current, + donor_source_name=donor_source_name, + integrated_variables=source_integrated_variables, + preclone_columns=source_preclone_columns, + donor_seed_columns=set(donor_seed.columns), + donor_observed=donor_observed, + ) + ) + _emit_us_pipeline_progress( + "US microplex donor integration: puf support clone complete", + donor_source=donor_source_name, + rows=len(current), + integrated_vars=len(source_integrated_variables), + ) + return { "seed_data": current, "integrated_variables": sorted(set(integrated_variables)), "conditioning_diagnostics": conditioning_diagnostics, + "processed_donor_source_order": processed_donor_source_order, + "puf_clone_source_order": puf_clone_source_order, + "puf_support_clone_summary": puf_support_clone_summary, } def _apply_dependent_tax_leaf_soft_caps( diff --git a/tests/pipelines/test_pe_us_data_rebuild.py b/tests/pipelines/test_pe_us_data_rebuild.py index 01cf675..85ba167 100644 --- a/tests/pipelines/test_pe_us_data_rebuild.py +++ b/tests/pipelines/test_pe_us_data_rebuild.py @@ -27,7 +27,9 @@ ) -def test_default_policyengine_us_data_rebuild_program_has_expected_core_stages() -> None: +def test_default_policyengine_us_data_rebuild_program_has_expected_core_stages() -> ( + None +): program = default_policyengine_us_data_rebuild_program() assert program.program_id == "pe-us-data-rebuild-v1" @@ -88,6 +90,7 @@ def test_default_policyengine_us_data_rebuild_config_uses_incumbent_defaults() - assert config.donor_imputer_backend == "qrf" assert config.donor_imputer_condition_selection == "pe_prespecified" assert config.donor_imputer_excluded_variables == () + assert config.puf_support_clone_enabled is True assert config.policyengine_direct_override_variables == ( "health_savings_account_ald", "non_sch_d_capital_gains", @@ -97,7 +100,9 @@ def test_default_policyengine_us_data_rebuild_config_uses_incumbent_defaults() - assert config.cps_asec_source_year == 2022 -def test_default_policyengine_us_data_rebuild_config_respects_calibration_support_override() -> None: +def test_default_policyengine_us_data_rebuild_config_respects_calibration_support_override() -> ( + None +): config = default_policyengine_us_data_rebuild_config( policyengine_calibration_min_active_households=5 ) @@ -105,7 +110,9 @@ def test_default_policyengine_us_data_rebuild_config_respects_calibration_suppor assert config.policyengine_calibration_min_active_households == 5 -def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bundle() -> None: +def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bundle() -> ( + None +): providers = default_policyengine_us_data_rebuild_source_providers( cps_source_year=2022, puf_target_year=2024, @@ -138,7 +145,9 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund assert isinstance(providers[5], SCFSourceProvider) -def test_default_policyengine_us_data_rebuild_source_providers_can_disable_donor_surveys() -> None: +def test_default_policyengine_us_data_rebuild_source_providers_can_disable_donor_surveys() -> ( + None +): providers = default_policyengine_us_data_rebuild_source_providers( include_donor_surveys=False, cps_download=False, @@ -149,7 +158,9 @@ def test_default_policyengine_us_data_rebuild_source_providers_can_disable_donor assert isinstance(providers[1], PUFSourceProvider) -def test_default_policyengine_us_data_rebuild_source_providers_can_include_donor_surveys() -> None: +def test_default_policyengine_us_data_rebuild_source_providers_can_include_donor_surveys() -> ( + None +): providers = default_policyengine_us_data_rebuild_source_providers( include_donor_surveys=True, cps_download=False, @@ -166,7 +177,9 @@ def test_default_policyengine_us_data_rebuild_source_providers_can_include_donor assert isinstance(providers[5], SCFSourceProvider) -def test_default_policyengine_us_data_rebuild_source_providers_can_disable_only_acs() -> None: +def test_default_policyengine_us_data_rebuild_source_providers_can_disable_only_acs() -> ( + None +): providers = default_policyengine_us_data_rebuild_source_providers( include_donor_surveys=True, include_acs=False, @@ -184,7 +197,9 @@ def test_default_policyengine_us_data_rebuild_source_providers_can_disable_only_ assert not any(isinstance(provider, ACSSourceProvider) for provider in providers) -def test_build_policyengine_us_data_rebuild_pipeline_returns_configured_pipeline() -> None: +def test_build_policyengine_us_data_rebuild_pipeline_returns_configured_pipeline() -> ( + None +): pipeline = build_policyengine_us_data_rebuild_pipeline( random_seed=321, calibration_max_iter=77, diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 7caac8e..cca02c0 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -325,6 +325,113 @@ def test_can_opt_into_authoritative_donor_overrides(self): "rental_income", ) + def test_puf_support_clone_requires_seed_backend_and_no_household_selection(self): + with pytest.raises(ValueError, match="synthesis_backend='seed'"): + USMicroplexBuildConfig(puf_support_clone_enabled=True) + + with pytest.raises(ValueError, match="policyengine_selection_household_budget"): + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + policyengine_selection_household_budget=10, + ) + + def test_initialize_puf_support_clone_calibration_weights_reserves_clone_share( + self, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + puf_support_clone_prior_weight_share=0.05, + ) + ) + tables = PolicyEngineUSEntityTableBundle( + households=pd.DataFrame( + { + "household_id": ["h1", "h2", "h1__puf_clone", "h2__puf_clone"], + "household_weight": [100.0, 200.0, 0.0, 0.0], + } + ), + persons=pd.DataFrame( + { + "person_id": [1, 2, 3, 4], + "household_id": [ + "h1", + "h2", + "h1__puf_clone", + "h2__puf_clone", + ], + "person_is_puf_clone": [0.0, 0.0, 1.0, 1.0], + "weight": [100.0, 200.0, 0.0, 0.0], + } + ), + tax_units=pd.DataFrame(), + spm_units=pd.DataFrame(), + families=pd.DataFrame(), + marital_units=pd.DataFrame(), + ) + + updated_tables, summary = pipeline._initialize_puf_clone_calibration_weights( + tables + ) + + assert summary["applied"] is True + assert summary["clone_household_count"] == 2 + assert summary["clone_prior_weight_share"] == pytest.approx(0.05) + assert summary["pre_clone_weight_sum"] == 0.0 + assert summary["pre_clone_original_weight_sum"] == pytest.approx(300.0) + assert summary["clone_prior_total_weight"] == pytest.approx(300.0 * 0.05 / 0.95) + assert summary["clone_prior_household_weight"] == pytest.approx( + 300.0 * 0.05 / 0.95 / 2 + ) + assert updated_tables.households["household_weight"].tolist() == [ + pytest.approx(100.0), + pytest.approx(200.0), + pytest.approx(300.0 * 0.05 / 0.95 / 2), + pytest.approx(300.0 * 0.05 / 0.95 / 2), + ] + assert updated_tables.persons["weight"].tolist() == [100.0, 200.0, 0.0, 0.0] + + def test_initialize_puf_support_clone_calibration_weights_skips_no_calibration( + self, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + calibration_backend="none", + puf_support_clone_enabled=True, + ) + ) + tables = PolicyEngineUSEntityTableBundle( + households=pd.DataFrame( + { + "household_id": [1, 2], + "household_weight": [100.0, 0.0], + } + ), + persons=pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [1, 2], + "person_is_puf_clone": [0.0, 1.0], + "weight": [100.0, 0.0], + } + ), + tax_units=pd.DataFrame(), + spm_units=pd.DataFrame(), + families=pd.DataFrame(), + marital_units=pd.DataFrame(), + ) + + updated_tables, summary = pipeline._initialize_puf_clone_calibration_weights( + tables + ) + + assert summary["applied"] is False + assert summary["reason"] == "calibration_backend_none" + assert updated_tables.households["household_weight"].tolist() == [100.0, 0.0] + def test_rejects_conflicting_policyengine_weight_rescale_modes(self): with pytest.raises(ValueError, match="mutually exclusive"): USMicroplexBuildConfig( @@ -3201,6 +3308,261 @@ def generate(self, frame, seed=None): 500.0, ] + def test_integrate_donor_sources_appends_puf_support_clone_before_later_donors( + self, monkeypatch + ): + generated_lengths: list[tuple[tuple[str, ...], int]] = [] + + class FakeSynthesizer: + def __init__(self, *, target_vars, condition_vars, **kwargs): + _ = condition_vars, kwargs + self.target_vars = tuple(target_vars) + + def fit(self, *args, **kwargs): + _ = args, kwargs + + def generate(self, frame, seed=None): + _ = seed + generated_lengths.append((self.target_vars, len(frame))) + result = frame.copy() + for target in self.target_vars: + result[target] = np.linspace(1.0, float(len(result)), len(result)) + return result + + monkeypatch.setattr("microplex_us.pipelines.us.Synthesizer", FakeSynthesizer) + + cps_households = pd.DataFrame( + { + "household_id": [1, 2], + "hh_weight": [100.0, 200.0], + "state_fips": [6, 36], + "tenure": [1, 2], + } + ) + cps_persons = pd.DataFrame( + { + "person_id": [10, 20], + "household_id": [1, 2], + "age": [45, 62], + "sex": [1, 2], + "education": [3, 4], + "employment_status": [1, 0], + "income": [60_000.0, 12_000.0], + "self_employment_income": [75.0, 50.0], + "taxpayer_id_type": [1, 2], + } + ) + puf_households = pd.DataFrame( + { + "household_id": [101, 102], + "hh_weight": [80.0, 90.0], + "state_fips": [6, 36], + "tenure": [1, 2], + } + ) + puf_persons = pd.DataFrame( + { + "person_id": [1001, 1002], + "household_id": [101, 102], + "age": [44, 61], + "sex": [1, 2], + "education": [3, 4], + "employment_status": [1, 0], + "income": [58_000.0, 13_000.0], + "self_employment_income": [-250.0, 500.0], + "taxable_interest_income": [10.0, 20.0], + "state_income_tax_paid": [400.0, 50.0], + } + ) + sipp_households = pd.DataFrame( + { + "household_id": [201, 202], + "hh_weight": [70.0, 75.0], + "state_fips": [6, 36], + "tenure": [1, 2], + } + ) + sipp_persons = pd.DataFrame( + { + "person_id": [2001, 2002], + "household_id": [201, 202], + "age": [45, 62], + "sex": [1, 2], + "education": [3, 4], + "employment_status": [1, 0], + "income": [59_000.0, 14_000.0], + "ssi_reported": [0.0, 100.0], + } + ) + + def frame_for(name, households, persons, capabilities): + return ObservationFrame( + source=SourceDescriptor( + name=name, + shareability=Shareability.PUBLIC + if name.startswith("cps") + else Shareability.RESTRICTED, + time_structure=TimeStructure.REPEATED_CROSS_SECTION, + observations=( + EntityObservation( + entity=EntityType.HOUSEHOLD, + key_column="household_id", + variable_names=("state_fips", "tenure"), + weight_column="hh_weight", + ), + EntityObservation( + entity=EntityType.PERSON, + key_column="person_id", + variable_names=tuple( + column + for column in persons.columns + if column != "person_id" + ), + ), + ), + variable_capabilities={ + variable: SourceVariableCapability( + authoritative=True, + usable_as_condition=True, + ) + for variable in capabilities + }, + ), + tables={ + EntityType.HOUSEHOLD: households, + EntityType.PERSON: persons, + }, + relationships=( + EntityRelationship( + parent_entity=EntityType.HOUSEHOLD, + child_entity=EntityType.PERSON, + parent_key="household_id", + child_key="household_id", + cardinality=RelationshipCardinality.ONE_TO_MANY, + ), + ), + ) + + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + n_synthetic=4, + synthesis_backend="seed", + puf_support_clone_enabled=True, + puf_support_clone_overlap_variables=("self_employment_income",), + puf_support_clone_both_halves_override_variables=(), + ) + ) + cps_input = pipeline.prepare_source_input( + frame_for( + "cps_asec_test", cps_households, cps_persons, ("taxpayer_id_type",) + ) + ) + puf_input = pipeline.prepare_source_input( + frame_for( + "irs_soi_puf_2024", + puf_households, + puf_persons, + ( + "self_employment_income", + "taxable_interest_income", + "state_income_tax_paid", + ), + ) + ) + sipp_input = pipeline.prepare_source_input( + frame_for("sipp_2023", sipp_households, sipp_persons, ("ssi_reported",)) + ) + seed_data = pipeline.prepare_seed_data_from_source(cps_input) + + integration = pipeline._integrate_donor_sources( + seed_data, + scaffold_input=cps_input, + donor_inputs=[sipp_input, puf_input], + ) + result = integration["seed_data"] + + assert integration["processed_donor_source_order"] == [ + "irs_soi_puf_2024", + "sipp_2023", + ] + assert integration["puf_clone_source_order"] == ["irs_soi_puf_2024"] + assert result["person_is_puf_clone"].tolist() == [0.0, 0.0, 1.0, 1.0] + assert result["hh_weight"].tolist() == [100.0, 200.0, 0.0, 0.0] + assert result["self_employment_income"].iloc[:2].tolist() == [75.0, 50.0] + assert result["self_employment_income"].iloc[2:].tolist() == [-250.0, 500.0] + assert result["taxpayer_id_type"].tolist() == [1, 2, 1, 2] + assert result["taxable_interest_income"].iloc[:2].tolist() == [0.0, 0.0] + assert result["taxable_interest_income"].iloc[2:].tolist() == [10.0, 20.0] + assert "state_income_tax_paid" in result.columns + assert "tax_unit_id" not in result.columns + assert integration["puf_support_clone_summary"][ + "dropped_generated_entity_id_columns" + ] == ["tax_unit_id"] + assert result.index.tolist() == [0, 1, 2, 3] + assert generated_lengths[-1] == (("ssi_reported",), 4) + assert "ssi_reported" in result.columns + + def test_integrate_donor_sources_puf_support_clone_validates_scaffold_and_donor( + self, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + ) + ) + frame = ObservationFrame( + source=SourceDescriptor( + name="cps_asec_test", + shareability=Shareability.PUBLIC, + time_structure=TimeStructure.REPEATED_CROSS_SECTION, + observations=( + EntityObservation( + entity=EntityType.HOUSEHOLD, + key_column="household_id", + variable_names=("state_fips",), + weight_column="hh_weight", + ), + EntityObservation( + entity=EntityType.PERSON, + key_column="person_id", + variable_names=("household_id", "age", "income"), + ), + ), + ), + tables={ + EntityType.HOUSEHOLD: pd.DataFrame( + {"household_id": [1], "hh_weight": [1.0], "state_fips": [6]} + ), + EntityType.PERSON: pd.DataFrame( + { + "person_id": [1], + "household_id": [1], + "age": [40], + "income": [1.0], + } + ), + }, + relationships=( + EntityRelationship( + parent_entity=EntityType.HOUSEHOLD, + child_entity=EntityType.PERSON, + parent_key="household_id", + child_key="household_id", + cardinality=RelationshipCardinality.ONE_TO_MANY, + ), + ), + ) + cps_input = pipeline.prepare_source_input(frame) + seed_data = pipeline.prepare_seed_data_from_source(cps_input) + + with pytest.raises(ValueError, match="requires exactly one PUF donor"): + pipeline._integrate_donor_sources( + seed_data, + scaffold_input=cps_input, + donor_inputs=[], + ) + def test_integrate_donor_sources_zeroes_minor_employment_income_after_authoritative_override( self, monkeypatch ): @@ -3763,7 +4125,11 @@ def test_export_policyengine_dataset(self, persons, households, tmp_path): with h5py.File(output_path, "r") as handle: assert "county_fips" in handle exported_counties = handle["county_fips"]["2024"][()] - assert set(np.asarray(exported_counties).tolist()) == {6037, 36061, 48201} + normalized_counties = { + str(value.decode() if isinstance(value, bytes) else value).zfill(5) + for value in np.asarray(exported_counties).tolist() + } + assert normalized_counties == {"06037", "36061", "48201"} def test_export_policyengine_dataset_passes_direct_overrides( self, diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index b4d975b..f29bf05 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -89,6 +89,175 @@ def fit(self, *, X_train, predictors, imputed_variables, n_jobs): return calls +def test_load_puf_raw_disaggregates_aggregate_records(tmp_path): + regular_rows = [ + { + "RECID": recid, + "MARS": 2 if recid % 2 == 0 else 1, + "XTOT": 2 if recid % 2 == 0 else 1, + "DSI": 0, + "EIC": 0, + "S006": 100, + "E00100": 100_000_000 + recid * 1_000_000, + "E00200": 2_000_000 + recid * 10_000, + "P23250": 60_000_000 + recid * 100_000, + } + for recid in range(1, 25) + ] + aggregate = { + "RECID": 999999, + "MARS": 0, + "XTOT": 0, + "DSI": 0, + "EIC": 0, + "S006": 50_000, + "E00100": 300_000_000, + "E00200": 10_000_000, + "P23250": 250_000_000, + } + puf_path = tmp_path / "puf.csv" + pd.DataFrame([*regular_rows, aggregate]).to_csv(puf_path, index=False) + + result = puf_module.load_puf_raw(puf_path) + synthetic = result[result["RECID"] >= puf_module.PUF_SYNTHETIC_RECID_START] + synthetic_weights = synthetic["S006"] / 100 + aggregate_weight = aggregate["S006"] / 100 + + assert 999999 not in set(result["RECID"]) + assert not result["MARS"].eq(0).any() + assert len(synthetic) == 40 + assert synthetic_weights.sum() == pytest.approx(aggregate_weight) + for column in ("E00100", "E00200", "P23250"): + assert (synthetic[column] * synthetic_weights).sum() == pytest.approx( + aggregate[column] * aggregate_weight + ) + + +def test_load_puf_raw_disaggregates_small_aggregate_records_with_positive_weights( + tmp_path, +): + regular_rows = [ + { + "RECID": recid, + "MARS": 2 if recid % 2 == 0 else 1, + "XTOT": 2 if recid % 2 == 0 else 1, + "DSI": 0, + "EIC": 0, + "S006": 100, + "E00100": 100_000_000 + recid * 1_000_000, + "E00200": 2_000_000 + recid * 10_000, + "P23250": 60_000_000 + recid * 100_000, + } + for recid in range(1, 25) + ] + aggregate = { + "RECID": 999999, + "MARS": 0, + "XTOT": 0, + "DSI": 0, + "EIC": 0, + "S006": 3_000, + "E00100": 300_000_000, + "E00200": 10_000_000, + "P23250": 250_000_000, + } + puf_path = tmp_path / "puf.csv" + pd.DataFrame([*regular_rows, aggregate]).to_csv(puf_path, index=False) + + result = puf_module.load_puf_raw(puf_path) + synthetic = result[result["RECID"] >= puf_module.PUF_SYNTHETIC_RECID_START] + synthetic_weights = synthetic["S006"] / 100 + aggregate_weight = aggregate["S006"] / 100 + + assert len(synthetic) == 20 + assert synthetic_weights.min() > 0 + assert synthetic_weights.sum() == pytest.approx(aggregate_weight) + for column in ("E00100", "E00200", "P23250"): + assert (synthetic[column] * synthetic_weights).sum() == pytest.approx( + aggregate[column] * aggregate_weight + ) + + +def test_load_puf_raw_disaggregates_all_aggregate_records_preserving_top_tail_totals( + tmp_path, +): + columns_to_preserve = ( + "E00100", + "P23250", + "P22250", + "E00600", + "E00650", + "E00300", + "E00400", + ) + bucket_agi = { + 999996: -500_000, + 999997: 2_000_000, + 999998: 25_000_000, + 999999: 250_000_000, + } + regular_rows = [] + recid = 1 + for bucket_recid, agi in bucket_agi.items(): + for offset in range(30): + regular_rows.append( + { + "RECID": recid, + "MARS": 2 if offset % 3 == 0 else 1, + "XTOT": 2 if offset % 3 == 0 else 1, + "DSI": 0, + "EIC": 0, + "S006": 100, + "E00100": agi + offset * max(abs(agi) * 0.01, 1_000), + "P23250": abs(agi) * 0.30 + offset * 10_000, + "P22250": abs(agi) * 0.03 + offset * 1_000, + "E00600": abs(agi) * 0.05 + offset * 500, + "E00650": abs(agi) * 0.03 + offset * 300, + "E00300": abs(agi) * 0.01 + offset * 100, + "E00400": abs(agi) * 0.005 + offset * 50, + } + ) + recid += 1 + + aggregate_rows = [] + for index, (bucket_recid, agi) in enumerate(bucket_agi.items(), start=1): + aggregate_rows.append( + { + "RECID": bucket_recid, + "MARS": 0, + "XTOT": 0, + "DSI": 0, + "EIC": 0, + "S006": 20_000 + index * 100, + "E00100": agi, + "P23250": abs(agi) * 0.40, + "P22250": abs(agi) * 0.04, + "E00600": abs(agi) * 0.08, + "E00650": abs(agi) * 0.05, + "E00300": abs(agi) * 0.015, + "E00400": abs(agi) * 0.008, + } + ) + + puf_path = tmp_path / "puf.csv" + source = pd.DataFrame([*regular_rows, *aggregate_rows]) + source.to_csv(puf_path, index=False) + + result = puf_module.load_puf_raw(puf_path) + synthetic = result[result["RECID"] >= puf_module.PUF_SYNTHETIC_RECID_START] + synthetic_weights = synthetic["S006"] / 100 + aggregate = pd.DataFrame(aggregate_rows) + aggregate_weights = aggregate["S006"] / 100 + + assert not set(puf_module.PUF_AGGREGATE_RECIDS) & set(result["RECID"]) + assert len(synthetic) >= 80 + assert synthetic_weights.sum() == pytest.approx(aggregate_weights.sum()) + for column in columns_to_preserve: + expected = (aggregate[column] * aggregate_weights).sum() + observed = (synthetic[column] * synthetic_weights).sum() + assert observed == pytest.approx(expected) + + def _write_minimal_soi_csv(path): def row(variable, year, is_count, value): return { @@ -196,7 +365,9 @@ def test_expand_to_persons_derives_retirement_social_security_for_older_records( } ) - persons = expand_to_persons(tax_units).sort_values("household_id").reset_index(drop=True) + persons = ( + expand_to_persons(tax_units).sort_values("household_id").reset_index(drop=True) + ) assert persons["social_security"].tolist() == [40.0, 25.0] assert persons["social_security_retirement"].tolist() == [40.0, 0.0] @@ -398,6 +569,7 @@ def test_uprate_mapped_puf_with_pe_factors_uses_aliases_and_recomputes(tmp_path) assert result["tax_exempt_pension_income"].tolist() == pytest.approx([5.4]) assert result["total_pension_income"].tolist() == pytest.approx([17.3]) + def test_puf_source_provider_pe_soi_mode_uses_raw_uprating(tmp_path): repo_root = tmp_path / "pe-us-data" storage = repo_root / "policyengine_us_data" / "storage" @@ -482,12 +654,22 @@ def test_expand_to_persons_uses_pe_demographic_helpers_when_present(): } ) - persons = expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) - persons_repeat = expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) + persons = ( + expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) + ) + persons_repeat = ( + expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) + ) pd.testing.assert_frame_equal(persons, persons_repeat) - assert persons["person_id"].tolist() == ["101:1", "101:2", "202:1", "202:3", "202:4"] + assert persons["person_id"].tolist() == [ + "101:1", + "101:2", + "202:1", + "202:3", + "202:4", + ] assert persons["tax_unit_id"].tolist() == ["101", "101", "202", "202", "202"] head = persons.loc[persons["person_id"] == "101:1"].iloc[0] @@ -515,6 +697,34 @@ def test_expand_to_persons_uses_pe_demographic_helpers_when_present(): assert dependent_2["is_male"] == 0.0 +def test_expand_to_persons_spreads_open_ended_puf_filer_age_band(): + tax_units = pd.DataFrame( + { + "filing_status": ["SINGLE"] * 10, + "weight": [1.0] * 10, + "household_id": [f"household-{i}" for i in range(10)], + "exemptions_count": [1] * 10, + "_puf_recid": list(range(1_001, 1_011)), + "_puf_agerange": [7] * 10, + "year": [2024] * 10, + } + ) + + persons = ( + expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) + ) + persons_repeat = ( + expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) + ) + + pd.testing.assert_frame_equal(persons, persons_repeat) + ages = persons["age"].tolist() + assert min(ages) >= 80 + assert max(ages) < 90 + assert len(set(ages)) > 1 + assert ages.count(80) < len(ages) + + def test_expand_to_persons_clears_status_flags_for_non_head_members(): tax_units = pd.DataFrame( { @@ -531,7 +741,9 @@ def test_expand_to_persons_clears_status_flags_for_non_head_members(): } ) - persons = expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) + persons = ( + expand_to_persons(tax_units).sort_values("person_id").reset_index(drop=True) + ) assert persons["is_surviving_spouse"].tolist() == [True, False, False] @@ -644,7 +856,9 @@ def test_puf_source_provider_marks_placeholder_and_derived_variables_in_capabili assert descriptor.allows_conditioning_on("age") -def test_puf_source_provider_does_not_duplicate_joint_tax_unit_financial_income(tmp_path): +def test_puf_source_provider_does_not_duplicate_joint_tax_unit_financial_income( + tmp_path, +): puf = pd.DataFrame( { "RECID": [101], @@ -930,9 +1144,15 @@ def _run(args, *, check, cwd, env): with out_path.open("wb") as handle: pickle.dump(pd.DataFrame({"pre_tax_contributions": [4321.0]}), handle) - monkeypatch.setattr(puf_module, "resolve_policyengine_us_data_repo_root", _resolve_repo) - monkeypatch.setattr(puf_module, "resolve_policyengine_us_data_python", _resolve_python) - monkeypatch.setattr(puf_module, "build_policyengine_us_data_subprocess_env", _build_env) + monkeypatch.setattr( + puf_module, "resolve_policyengine_us_data_repo_root", _resolve_repo + ) + monkeypatch.setattr( + puf_module, "resolve_policyengine_us_data_python", _resolve_python + ) + monkeypatch.setattr( + puf_module, "build_policyengine_us_data_subprocess_env", _build_env + ) monkeypatch.setattr(puf_module.subprocess, "run", _run) monkeypatch.setattr( puf_module, @@ -1042,14 +1262,18 @@ def test_impute_missing_puf_demographics_uses_qrf_predictions(monkeypatch): assert imputed.loc[101, "GENDER"] == 2 -def test_download_puf_prefers_existing_local_files_without_hub_lookup(tmp_path, monkeypatch): +def test_download_puf_prefers_existing_local_files_without_hub_lookup( + tmp_path, monkeypatch +): puf_path = tmp_path / "puf_2015.csv" demographics_path = tmp_path / "demographics_2015.csv" puf_path.write_text("RECID,MARS\n1,1\n") demographics_path.write_text("RECID\n1\n") def fail_download(*args, **kwargs): - raise AssertionError("hf_hub_download should not be called when local files exist") + raise AssertionError( + "hf_hub_download should not be called when local files exist" + ) monkeypatch.setattr(puf_module, "hf_hub_download", fail_download, raising=False) @@ -1083,7 +1307,9 @@ def test_puf_source_provider_prefers_policyengine_repo_local_raw_files( ) def fail_loader(*args, **kwargs): - raise AssertionError("remote/cache loader should not run when repo-local PUF exists") + raise AssertionError( + "remote/cache loader should not run when repo-local PUF exists" + ) provider = PUFSourceProvider( target_year=2015, @@ -1148,8 +1374,12 @@ def test_puf_source_provider_age_imputation_is_reproducible_with_same_seed(tmp_p first = provider.load_frame(query) second = provider.load_frame(query) - first_persons = first.tables[EntityType.PERSON].sort_values("person_id").reset_index(drop=True) - second_persons = second.tables[EntityType.PERSON].sort_values("person_id").reset_index(drop=True) + first_persons = ( + first.tables[EntityType.PERSON].sort_values("person_id").reset_index(drop=True) + ) + second_persons = ( + second.tables[EntityType.PERSON].sort_values("person_id").reset_index(drop=True) + ) assert first_persons["age"].tolist() == second_persons["age"].tolist() @@ -1168,9 +1398,7 @@ def test_puf_sampling_falls_back_to_uniform_when_weighted_sampling_is_infeasible def flaky_sample(self, *args, **kwargs): if kwargs.get("weights") is not None: - raise ValueError( - "Weighted sampling cannot be achieved with replace=False." - ) + raise ValueError("Weighted sampling cannot be achieved with replace=False.") return original_sample(self, *args, **kwargs) monkeypatch.setattr(pd.DataFrame, "sample", flaky_sample)