From 03cd047c6f59bcc3d66bf64c2849d2c314b51a26 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 1 Jun 2026 11:06:06 -0400 Subject: [PATCH] Add pre-calibration eCPS export schema checks --- src/microplex_us/__init__.py | 2 + .../pipelines/check_export_columns.py | 61 ++- .../pe_us_data_rebuild_checkpoint.py | 27 +- src/microplex_us/pipelines/us.py | 80 +++- src/microplex_us/policyengine/__init__.py | 2 + src/microplex_us/policyengine/us.py | 350 +++++++++++++++++- tests/pipelines/test_check_export_columns.py | 26 ++ tests/policyengine/test_us.py | 166 ++++++++- 8 files changed, 680 insertions(+), 34 deletions(-) diff --git a/src/microplex_us/__init__.py b/src/microplex_us/__init__.py index 52ccee6..ada081a 100644 --- a/src/microplex_us/__init__.py +++ b/src/microplex_us/__init__.py @@ -156,6 +156,7 @@ "PolicyEngineUSTargetEvaluation", "PolicyEngineUSTargetEvaluationReport", "PolicyEngineUSVariableBinding", + "build_policyengine_us_export_column_names", "build_policyengine_us_time_period_arrays", "compare_policyengine_us_target_query_to_baseline", "compile_policyengine_us_household_linear_constraints", @@ -346,6 +347,7 @@ def __getattr__(name: str) -> Any: "PolicyEngineUSTargetEvaluation", "PolicyEngineUSTargetEvaluationReport", "PolicyEngineUSVariableBinding", + "build_policyengine_us_export_column_names", "build_policyengine_us_time_period_arrays", "compare_policyengine_us_target_query_to_baseline", "compile_policyengine_us_household_linear_constraints", diff --git a/src/microplex_us/pipelines/check_export_columns.py b/src/microplex_us/pipelines/check_export_columns.py index aa8cf64..cbc6534 100644 --- a/src/microplex_us/pipelines/check_export_columns.py +++ b/src/microplex_us/pipelines/check_export_columns.py @@ -30,6 +30,8 @@ python -m microplex_us.pipelines.check_export_columns export.h5 python -m microplex_us.pipelines.check_export_columns \\ --columns-json columns.json + python -m microplex_us.pipelines.check_export_columns \\ + --entity-tables checkpoints/post-imputation python -m microplex_us.pipelines.check_export_columns export.h5 \\ --contract custom_contract.json @@ -149,6 +151,29 @@ def _columns_from_json(json_path: Path) -> set[str]: return {str(name).split("/")[0] for name in names} +def _columns_from_entity_tables( + entity_tables_path: Path, + *, + direct_override_variables: tuple[str, ...] = (), +) -> set[str]: + """Return export column names from a saved PE entity-table checkpoint. + + This is the pre-calibration path: post-imputation entity tables already + determine the final H5 schema, while calibration only changes weights. + Imports stay deferred so the JSON/H5 fast paths do not import Microplex. + """ + from microplex_us.policyengine.us import ( + build_policyengine_us_export_column_names, + load_us_pipeline_checkpoint, + ) + + tables, _metadata = load_us_pipeline_checkpoint(entity_tables_path) + return build_policyengine_us_export_column_names( + tables, + direct_override_variables=direct_override_variables, + ) + + def _bullet_lines(items: list[str]) -> list[str]: """Render a list as indented bullets, or a placeholder if empty.""" if not items: @@ -206,6 +231,25 @@ def main(argv: list[str] | None = None) -> int: "H5 (the no-data CI path). Mutually exclusive with h5path." ), ) + parser.add_argument( + "--entity-tables", + metavar="DIR", + help=( + "Path to a saved PolicyEngine entity-table checkpoint/stage " + "directory (for example checkpoints/post-imputation). Checks " + "the export schema before microsimulation/calibration/H5." + ), + ) + parser.add_argument( + "--direct-override-variable", + action="append", + default=[], + metavar="VARIABLE", + help=( + "PolicyEngine formula variable intentionally exported from source " + "data. Repeat for each override used by the build." + ), + ) parser.add_argument( "--contract", metavar="FILE", @@ -214,8 +258,15 @@ def main(argv: list[str] | None = None) -> int: ) args = parser.parse_args(argv) - if bool(args.h5path) == bool(args.columns_json): - parser.error("provide exactly one of an H5 path or --columns-json.") + selected_inputs = [ + bool(args.h5path), + bool(args.columns_json), + bool(args.entity_tables), + ] + if sum(selected_inputs) != 1: + parser.error( + "provide exactly one of an H5 path, --columns-json, or --entity-tables." + ) contract = load_contract(Path(args.contract)) required = set(contract["required"]) @@ -226,6 +277,12 @@ def main(argv: list[str] | None = None) -> int: if args.columns_json: source = args.columns_json present = _columns_from_json(Path(args.columns_json)) + elif args.entity_tables: + source = args.entity_tables + present = _columns_from_entity_tables( + Path(args.entity_tables), + direct_override_variables=tuple(args.direct_override_variable), + ) else: source = args.h5path present = _columns_from_h5(Path(args.h5path)) diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py index 1b5e437..f8a345b 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py @@ -1111,7 +1111,9 @@ def _attach_checkpoint_registry_and_index( run_registry_metadata: dict[str, Any] | None, ) -> tuple[Path | None, Path | None]: if ( - manifest.get("calibration", {}).get("full_oracle_capped_mean_abs_relative_error") + manifest.get("calibration", {}).get( + "full_oracle_capped_mean_abs_relative_error" + ) is None and manifest.get("calibration", {}).get("full_oracle_mean_abs_relative_error") is None @@ -1119,7 +1121,10 @@ def _attach_checkpoint_registry_and_index( and "policyengine_native_scores" not in manifest ): return None, None - if "policyengine_harness" not in manifest and "policyengine_native_scores" not in manifest: + if ( + "policyengine_harness" not in manifest + and "policyengine_native_scores" not in manifest + ): resolved_harness_payload = None else: resolved_harness_payload = ( @@ -1637,7 +1642,9 @@ def attach_policyengine_us_data_rebuild_checkpoint_evidence( _refresh_checkpoint_data_flow_snapshot( artifact_root, manifest, - extra_outputs=(native_audit_path.name,) if native_audit_path is not None else (), + extra_outputs=(native_audit_path.name,) + if native_audit_path is not None + else (), ) _write_json_atomically(manifest_path, manifest) return PEUSDataRebuildCheckpointEvidenceResult( @@ -2176,6 +2183,16 @@ def main(argv: list[str] | None = None) -> None: "to skip the ~11 h synthesis stage." ), ) + parser.add_argument( + "--policyengine-export-column-contract-path", + type=str, + default=None, + help=( + "If set, check the eCPS export-column contract from the " + "post-imputation PE entity tables before microsimulation and " + "calibration." + ), + ) parser.add_argument( "--pipeline-checkpoint-save-post-microsim-path", type=str, @@ -2223,6 +2240,10 @@ def main(argv: list[str] | None = None) -> None: config_overrides["pipeline_checkpoint_save_post_imputation_path"] = ( args.pipeline_checkpoint_save_post_imputation_path ) + if args.policyengine_export_column_contract_path is not None: + config_overrides["policyengine_export_column_contract_path"] = ( + args.policyengine_export_column_contract_path + ) if args.pipeline_checkpoint_save_post_microsim_path is not None: config_overrides["pipeline_checkpoint_save_post_microsim_path"] = ( args.pipeline_checkpoint_save_post_microsim_path diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index bc8c1a1..2f965d9 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -58,6 +58,13 @@ PESourceImputeBlockRunRequest, PESourceImputeConditionedBlockRunRequest, ) +from microplex_us.pipelines.check_export_columns import ( + _format_report as _format_export_column_report, +) +from microplex_us.pipelines.check_export_columns import ( + compute_column_diff, + load_contract, +) from microplex_us.pipelines.donor_imputers import ( ColumnwiseQRFDonorImputer, RegimeAwareDonorImputer, @@ -83,6 +90,7 @@ PolicyEngineUSMicrosimulationAdapter, PolicyEngineUSQuantityTarget, PolicyEngineUSVariableBinding, + build_policyengine_us_export_column_names, build_policyengine_us_export_variable_maps, build_policyengine_us_time_period_arrays, compile_supported_policyengine_us_household_linear_constraints, @@ -136,10 +144,11 @@ def _normalize_household_county_fips_series( & state_numeric.notna() & state_numeric.gt(0) ) - combined.loc[county_fragment_mask] = ( - state_numeric.loc[county_fragment_mask].round().astype(int) * 1000 - + county_numeric.loc[county_fragment_mask].round().astype(int) - ) + combined.loc[county_fragment_mask] = state_numeric.loc[ + county_fragment_mask + ].round().astype(int) * 1000 + county_numeric.loc[ + county_fragment_mask + ].round().astype(int) normalized = combined.round().astype("Int64").astype("string").str.zfill(5) invalid = combined.isna() | combined.le(0) return normalized.mask(invalid).astype("string") @@ -206,7 +215,9 @@ def _attach_household_census_geographies( assigned_blocks = pd.Series(pd.NA, index=result.index, dtype="string") state_values = _normalize_household_state_fips_series(result["state_fips"]) county_values = ( - _normalize_household_county_fips_series(result["county_fips"], result["state_fips"]) + _normalize_household_county_fips_series( + result["county_fips"], result["state_fips"] + ) if "county_fips" in result.columns else pd.Series(pd.NA, index=result.index, dtype="string") ) @@ -1697,6 +1708,14 @@ class USMicroplexBuildConfig: policyengine_baseline_dataset: str | None = None policyengine_dataset_year: int | None = None policyengine_direct_override_variables: tuple[str, ...] = () + policyengine_export_column_contract_path: str | Path | None = None + """Optional eCPS export-column contract checked before calibration. + + When set, the pipeline verifies the final H5 column surface from the + post-imputation PE entity tables, then fails before microsimulation or + calibration if required columns are missing or forbidden columns would be + exported. + """ policyengine_prefer_existing_tax_unit_ids: bool = True policyengine_quantity_targets: tuple[PolicyEngineUSQuantityTarget, ...] = () policyengine_targets_db: str | None = None @@ -2120,6 +2139,10 @@ def build_from_frames( "US microplex build: post-imputation checkpoint saved", path=str(self.config.pipeline_checkpoint_save_post_imputation_path), ) + self._check_policyengine_export_column_contract( + synthetic_tables, + stage="pre_calibration", + ) _emit_us_pipeline_progress( "US microplex build: policyengine calibration start", backend=self.config.calibration_backend, @@ -2365,9 +2388,7 @@ def prepare_seed_data_from_source( seed_data["tenure"] = seed_data["tenure"].fillna(0).astype(int) seed_data["state_fips"] = seed_data["state_fips"].fillna(0).astype(int) seed_data["county_fips"] = ( - seed_data["county_fips"] - .map(normalize_us_county_fips) - .fillna("00000") + seed_data["county_fips"].map(normalize_us_county_fips).fillna("00000") ) if "block_geoid" in seed_data.columns: seed_data["block_geoid"] = seed_data["block_geoid"].fillna("").astype(str) @@ -3918,6 +3939,49 @@ def _append_stage_summary( warnings.warn(message, stacklevel=2) return updated_tables, calibrated_persons, summary + def _check_policyengine_export_column_contract( + self, + tables: PolicyEngineUSEntityTableBundle, + *, + stage: str, + ) -> None: + contract_path = self.config.policyengine_export_column_contract_path + if contract_path is None: + return + + tax_benefit_system = self._resolve_policyengine_tax_benefit_system() + contract = load_contract(Path(contract_path)) + present = build_policyengine_us_export_column_names( + tables, + tax_benefit_system=tax_benefit_system, + direct_override_variables=self.config.policyengine_direct_override_variables, + ) + diff = compute_column_diff( + present, + required=set(contract["required"]), + forbidden=set(contract["forbidden"]), + optional=set(contract["ecps_internal_optional"]), + excluded=set(contract.get("formula_owned_excluded", [])), + ) + _emit_us_pipeline_progress( + "US microplex build: policyengine export columns check complete", + stage=stage, + status="pass" if diff.ok else "fail", + columns_present=int(len(present)), + missing_required=int(len(diff.missing_required)), + forbidden_present=int(len(diff.forbidden_present)), + ) + if diff.ok: + return + report = _format_export_column_report( + diff, + source=f"{stage}:{contract_path}", + n_present=len(present), + n_required=len(contract["required"]), + n_forbidden=len(contract["forbidden"]), + ) + raise ValueError(report) + def _build_forbes_fixed_spine(self) -> ForbesFixedSpine | None: path = self.config.forbes_fixed_spine_records_path if path is None: diff --git a/src/microplex_us/policyengine/__init__.py b/src/microplex_us/policyengine/__init__.py index 87d7c1f..2eaf058 100644 --- a/src/microplex_us/policyengine/__init__.py +++ b/src/microplex_us/policyengine/__init__.py @@ -31,6 +31,7 @@ PolicyEngineUSMicrosimulationAdapter, PolicyEngineUSQuantityTarget, PolicyEngineUSVariableBinding, + build_policyengine_us_export_column_names, build_policyengine_us_time_period_arrays, compile_policyengine_us_household_linear_constraints, compute_policyengine_us_definition_hash, @@ -72,6 +73,7 @@ "PolicyEngineUSMicrosimulationAdapter", "PolicyEngineUSQuantityTarget", "PolicyEngineUSVariableBinding", + "build_policyengine_us_export_column_names", "build_policyengine_us_time_period_arrays", "compile_policyengine_us_household_linear_constraints", "compute_policyengine_us_definition_hash", diff --git a/src/microplex_us/policyengine/us.py b/src/microplex_us/policyengine/us.py index 0b374f3..92519b1 100644 --- a/src/microplex_us/policyengine/us.py +++ b/src/microplex_us/policyengine/us.py @@ -33,6 +33,16 @@ "congressional_district_geoid", } +NYC_FULL_COUNTY_FIPS: frozenset[int] = frozenset( + { + 36005, # Bronx + 36047, # Kings (Brooklyn) + 36061, # New York (Manhattan) + 36081, # Queens + 36085, # Richmond (Staten Island) + } +) + @dataclass(frozen=True) class PolicyEngineUSConstraint: @@ -430,9 +440,31 @@ class PolicyEngineUSVariableMaterializationResult: } | set(POLICYENGINE_US_TAKEUP_INPUT_VARIABLES) POLICYENGINE_US_EXPORT_COLUMN_ALIASES: dict[str, str] = { + # policyengine-us #8507 made monthly_hours_worked derive from + # hours_worked_last_week. Microplex source frames still often carry the + # annualized eCPS-compatible ``hours_worked`` name, so expose it through + # the persisted leaf input when no explicit last-week field is present. + "hours_worked": "hours_worked_last_week", "race": "cps_race", } +POLICYENGINE_US_STRUCTURAL_EXPORT_COLUMNS: frozenset[str] = frozenset( + { + "household_id", + "person_id", + "person_household_id", + "household_weight", + "tax_unit_id", + "person_tax_unit_id", + "spm_unit_id", + "person_spm_unit_id", + "family_id", + "person_family_id", + "marital_unit_id", + "person_marital_unit_id", + } +) + POLICYENGINE_US_EXPORT_DEFAULTS: dict[str, Any] = { "auto_loan_balance": 0.0, # American Opportunity Tax Credit factual eligibility inputs. The @@ -481,7 +513,9 @@ class PolicyEngineUSVariableMaterializationResult: "first_home_mortgage_interest": 0.0, "first_home_mortgage_origination_year": 0, "free_school_meals_reported": 0.0, + "fsla_overtime_premium": 0.0, "has_champva_health_coverage_at_interview": False, + "has_itin": True, "has_indian_health_service_coverage_at_interview": False, "has_marketplace_health_coverage_at_interview": False, "has_medicaid_health_coverage_at_interview": False, @@ -489,6 +523,7 @@ class PolicyEngineUSVariableMaterializationResult: "has_non_marketplace_direct_purchase_health_coverage_at_interview": False, "has_other_means_tested_health_coverage_at_interview": False, "has_tricare_health_coverage_at_interview": False, + "has_tin": True, "has_valid_ssn": True, "has_va_health_coverage_at_interview": False, "home_mortgage_interest": 0, @@ -498,6 +533,7 @@ class PolicyEngineUSVariableMaterializationResult: "household_vehicles_value": 0, "immigration_status_str": "CITIZEN", "investment_interest_expense": 0, + "in_nyc": False, "is_computer_scientist": False, "is_executive_administrative_professional": False, "is_farmer_fisher": False, @@ -516,6 +552,7 @@ class PolicyEngineUSVariableMaterializationResult: "is_union_member_or_covered": False, "is_wic_at_nutritional_risk": True, "keogh_distributions": 0, + "meets_ssi_disability_criteria": False, "net_worth": 0, "other_health_insurance_premiums": 0, "other_type_retirement_account_distributions": 0, @@ -1710,9 +1747,7 @@ def _contract_forbidden_export_columns() -> frozenset[str]: gate. """ contract_path = ( - Path(__file__).resolve().parents[1] - / "pipelines" - / "ecps_export_contract.json" + Path(__file__).resolve().parents[1] / "pipelines" / "ecps_export_contract.json" ) payload = json.loads(contract_path.read_text()) return frozenset(str(name) for name in payload.get("forbidden", ())) @@ -3001,11 +3036,12 @@ def build_policyengine_us_export_variable_maps( variable_metadata, direct_override_variables=direct_override_variables, ) + household_table = _with_policyengine_household_export_derivatives(tables.households) person_table = _with_policyengine_person_export_derivatives(tables.persons) table_specs = ( ( "household", - tables.households, + household_table, {"household_id", "household_weight", "weight"}, ), ("person", person_table, {"person_id", "household_id"}), @@ -3023,6 +3059,47 @@ def build_policyengine_us_export_variable_maps( return export_maps +def build_policyengine_us_export_column_names( + tables: PolicyEngineUSEntityTableBundle, + *, + tax_benefit_system: Any | None = None, + simulation_cls: Any | None = None, + direct_override_variables: tuple[str, ...] = (), +) -> set[str]: + """Return the final PE-US H5 column names without materializing arrays. + + This is the schema-only counterpart to + :func:`build_policyengine_us_time_period_arrays` + + :func:`write_policyengine_us_time_period_dataset`. It lets slow builds + verify the eCPS export contract from the saved post-imputation entity + tables, before microsimulation/calibration changes weights. + """ + if tables.persons is None: + raise ValueError("PolicyEngine US export requires a person table") + if tax_benefit_system is None: + tax_benefit_system = _resolve_policyengine_us_tax_benefit_system( + simulation_cls=simulation_cls + ) + export_maps = build_policyengine_us_export_variable_maps( + tables, + tax_benefit_system=tax_benefit_system, + direct_override_variables=direct_override_variables, + ) + exported_inputs = { + target + for variable_map in export_maps.values() + for target in variable_map.values() + } + excluded_variables = resolve_policyengine_excluded_export_variables( + tax_benefit_system, + sorted(exported_inputs), + direct_override_variables=direct_override_variables, + ) + return ( + exported_inputs - excluded_variables + ) | POLICYENGINE_US_STRUCTURAL_EXPORT_COLUMNS + + def build_policyengine_us_time_period_arrays( tables: PolicyEngineUSEntityTableBundle, *, @@ -3042,12 +3119,16 @@ def build_policyengine_us_time_period_arrays( raise ValueError("PolicyEngine US export requires a person table") period_key = str(period) + household_table = _with_policyengine_household_export_derivatives(tables.households) households = _prepare_household_export_table( - tables.households, + household_table, household_id_column=household_id_column, household_weight_column=household_weight_column, ) - person_table = _with_policyengine_person_export_derivatives(tables.persons) + person_table = _with_policyengine_person_export_derivatives( + tables.persons, + period=int(period), + ) persons = _prepare_person_export_table( person_table, person_id_column=person_id_column, @@ -3406,6 +3487,42 @@ def _normalize_h5_value(values: Any) -> np.ndarray: return array +def _with_policyengine_household_export_derivatives( + households: pd.DataFrame | None, +) -> pd.DataFrame | None: + """Attach eCPS persisted household fields derivable from table columns.""" + if households is None or "in_nyc" in households.columns: + return households + if "county_fips" not in households.columns: + return households + + household_table = households.copy() + county_numeric = pd.to_numeric(household_table["county_fips"], errors="coerce") + if "state_fips" in household_table.columns: + state_numeric = pd.to_numeric(household_table["state_fips"], errors="coerce") + else: + state_numeric = pd.Series(np.nan, index=household_table.index) + full_county_fips = county_numeric.copy() + county_fragment = ( + county_numeric.notna() + & county_numeric.gt(0) + & county_numeric.lt(1000) + & state_numeric.notna() + ) + full_county_fips.loc[county_fragment] = state_numeric.loc[ + county_fragment + ].round().astype(int) * 1000 + county_numeric.loc[county_fragment].round().astype( + int + ) + household_table["in_nyc"] = ( + full_county_fips.round() + .astype("Int64") + .isin(NYC_FULL_COUNTY_FIPS) + .fillna(False) + ) + return household_table + + def _prepare_household_export_table( households: pd.DataFrame, *, @@ -3463,21 +3580,226 @@ def _prepare_person_export_table( def _with_policyengine_person_export_derivatives( persons: pd.DataFrame | None, + *, + period: int | None = None, ) -> pd.DataFrame | None: - if persons is None or "is_household_head" in persons.columns: - return persons - if "relationship_to_head" not in persons.columns: + if persons is None: return persons person_table = persons.copy() - relationship = pd.to_numeric( - person_table["relationship_to_head"], - errors="coerce", - ) - person_table["is_household_head"] = relationship.eq(0).fillna(False) + if ( + "is_household_head" not in person_table.columns + and "relationship_to_head" in person_table.columns + ): + relationship = pd.to_numeric( + person_table["relationship_to_head"], + errors="coerce", + ) + person_table["is_household_head"] = relationship.eq(0).fillna(False) + + if ( + "hours_worked_last_week" not in person_table.columns + and "hours_worked" in person_table.columns + ): + person_table["hours_worked_last_week"] = pd.to_numeric( + person_table["hours_worked"], + errors="coerce", + ).fillna(0.0) + if "has_tin" not in person_table.columns: + person_table["has_tin"] = _derive_has_tin_for_export(person_table) + if "has_itin" not in person_table.columns: + person_table["has_itin"] = person_table["has_tin"].astype(bool) + if "meets_ssi_disability_criteria" not in person_table.columns: + person_table["meets_ssi_disability_criteria"] = ( + _derive_meets_ssi_disability_criteria_for_export(person_table) + ) + if "fsla_overtime_premium" not in person_table.columns: + fsla_premium = _derive_flsa_overtime_premium_for_export( + person_table, + period=period, + ) + if fsla_premium is not None: + person_table["fsla_overtime_premium"] = fsla_premium return person_table +def _derive_has_tin_for_export(persons: pd.DataFrame) -> pd.Series: + """Mirror PE-US has_tin default while honoring MP's SSN-card type signal.""" + if "ssn_card_type" not in persons.columns: + return pd.Series(True, index=persons.index, dtype=bool) + ssn_card_type = persons["ssn_card_type"].astype("string").str.upper() + return ssn_card_type.ne("NONE").fillna(True) + + +def _derive_meets_ssi_disability_criteria_for_export( + persons: pd.DataFrame, +) -> pd.Series: + """Approximate eCPS's persisted SSI disability input from available signals.""" + signals: list[pd.Series] = [] + bool_columns = ( + "is_disabled", + "difficulty_seeing", + "difficulty_hearing", + "difficulty_walking_or_climbing_stairs", + "difficulty_dressing_or_bathing", + "difficulty_doing_errands", + "difficulty_remembering_or_making_decisions", + ) + for column in bool_columns: + if column in persons.columns: + signals.append(_truthy_series(persons[column], index=persons.index)) + amount_columns = ( + "ssi", + "ssi_reported", + "disability_benefits", + "social_security_disability", + ) + for column in amount_columns: + if column in persons.columns: + signals.append( + pd.to_numeric(persons[column], errors="coerce").fillna(0.0).gt(0) + ) + if not signals: + return pd.Series(False, index=persons.index, dtype=bool) + result = signals[0].copy() + for signal in signals[1:]: + result |= signal + return result.fillna(False).astype(bool) + + +def _truthy_series(values: pd.Series, *, index: pd.Index) -> pd.Series: + if values.dtype == bool: + return values.fillna(False) + if pd.api.types.is_numeric_dtype(values): + return pd.to_numeric(values, errors="coerce").fillna(0.0).gt(0) + return ( + values.astype("string") + .str.upper() + .isin({"TRUE", "T", "YES", "Y", "1"}) + .reindex(index, fill_value=False) + ) + + +def _derive_flsa_overtime_premium_for_export( + persons: pd.DataFrame, + *, + period: int | None, +) -> pd.Series | None: + """Derive the data-backed FLSA overtime proxy when ORG inputs are present.""" + required_columns = { + "employment_income", + "hours_worked_last_week", + "weeks_worked", + "is_paid_hourly", + "has_never_worked", + "is_military", + "is_executive_administrative_professional", + "is_farmer_fisher", + "is_computer_scientist", + } + if not required_columns.issubset(persons.columns): + return None + + ( + hce_salary_threshold, + salary_basis_threshold, + computer_salary_threshold, + hours_threshold, + rate_multiplier, + ) = _flsa_overtime_policy_for_export(period or 2024) + + employment_income = ( + pd.to_numeric(persons["employment_income"], errors="coerce") + .fillna(0.0) + .clip(lower=0) + ) + hours_worked_last_week = ( + pd.to_numeric(persons["hours_worked_last_week"], errors="coerce") + .fillna(0.0) + .clip(lower=0) + ) + weeks_worked = ( + pd.to_numeric(persons["weeks_worked"], errors="coerce") + .fillna(0.0) + .clip(lower=0) + ) + overtime_hours = (hours_worked_last_week - hours_threshold).clip(lower=0) + straight_time_equivalent_hours = ( + hours_worked_last_week.clip(upper=hours_threshold) + + overtime_hours * rate_multiplier + ) + premium_share = pd.Series(0.0, index=persons.index, dtype=float) + positive_hours = straight_time_equivalent_hours.gt(0) + premium_share.loc[positive_hours] = ( + (rate_multiplier - 1) * overtime_hours.loc[positive_hours] + ) / straight_time_equivalent_hours.loc[positive_hours] + + is_paid_hourly = _truthy_series(persons["is_paid_hourly"], index=persons.index) + has_never_worked = _truthy_series(persons["has_never_worked"], index=persons.index) + is_military = _truthy_series(persons["is_military"], index=persons.index) + is_eap = _truthy_series( + persons["is_executive_administrative_professional"], + index=persons.index, + ) + is_farmer_fisher = _truthy_series(persons["is_farmer_fisher"], index=persons.index) + is_computer_scientist = _truthy_series( + persons["is_computer_scientist"], + index=persons.index, + ) + + salary_threshold = pd.Series( + hce_salary_threshold, + index=persons.index, + dtype=float, + ) + salary_threshold.loc[is_computer_scientist] = min( + computer_salary_threshold, + hce_salary_threshold, + ) + salary_threshold.loc[is_eap | is_farmer_fisher] = min( + salary_basis_threshold, + hce_salary_threshold, + ) + always_exempt = has_never_worked | is_military + salary_threshold.loc[always_exempt] = 0 + is_exempt = always_exempt | ( + employment_income.ge(salary_threshold) & ~is_paid_hourly + ) + eligible = ~is_exempt & weeks_worked.gt(0) + premium = pd.Series(0.0, index=persons.index, dtype=float) + premium.loc[eligible] = ( + employment_income.loc[eligible] * premium_share.loc[eligible] + ) + return premium.clip(lower=0, upper=employment_income).astype(np.float32) + + +@lru_cache(maxsize=8) +def _flsa_overtime_policy_for_export( + period: int, +) -> tuple[float, float, float, float, float]: + """Return eCPS-compatible FLSA overtime thresholds for one year.""" + try: + import policyengine_us + from policyengine_us.model_api import WEEKS_IN_YEAR + + system = getattr(policyengine_us.system, "system", policyengine_us.system) + overtime = system.parameters( + f"{int(period)}-01-01" + ).gov.irs.income.exemption.overtime + hours_threshold = float(overtime.hours_threshold) + return ( + float(overtime.hce_salary_threshold), + float(overtime.salary_basis_threshold) * float(WEEKS_IN_YEAR), + float(overtime.computer_salary_threshold) + * hours_threshold + * float(WEEKS_IN_YEAR), + hours_threshold, + float(overtime.rate_multiplier), + ) + except Exception: + return (132_964.0, 35_568.0, 57_470.4, 40.0, 1.5) + + def _resolve_person_group_ids( *, group_name: str, diff --git a/tests/pipelines/test_check_export_columns.py b/tests/pipelines/test_check_export_columns.py index 58b580d..34fb68a 100644 --- a/tests/pipelines/test_check_export_columns.py +++ b/tests/pipelines/test_check_export_columns.py @@ -150,6 +150,32 @@ def test_main_h5_path_accepts_flat_datasets(tmp_path, contract_path): assert rc == 0 +def test_main_entity_tables_path_uses_schema_columns( + tmp_path, contract_path, monkeypatch +): + checkpoint_dir = tmp_path / "post-imputation" + + def fake_columns(path, *, direct_override_variables): + assert path == checkpoint_dir + assert direct_override_variables == ("non_sch_d_capital_gains",) + return {"age", "snap", "employment_income"} + + monkeypatch.setattr(cec, "_columns_from_entity_tables", fake_columns) + + rc = main( + [ + "--entity-tables", + str(checkpoint_dir), + "--direct-override-variable", + "non_sch_d_capital_gains", + "--contract", + str(contract_path), + ] + ) + + assert rc == 0 + + def test_main_requires_exactly_one_input(tmp_path, contract_path): # Neither input -> argparse error (SystemExit code 2). with pytest.raises(SystemExit) as exc: diff --git a/tests/policyengine/test_us.py b/tests/policyengine/test_us.py index 03b7bee..002d772 100644 --- a/tests/policyengine/test_us.py +++ b/tests/policyengine/test_us.py @@ -32,6 +32,7 @@ PolicyEngineUSStratum, PolicyEngineUSTargetValidationError, PolicyEngineUSVariableBinding, + build_policyengine_us_export_column_names, build_policyengine_us_export_variable_maps, build_policyengine_us_time_period_arrays, compile_policyengine_us_household_linear_constraints, @@ -1800,6 +1801,73 @@ def test_builds_structural_time_period_arrays_from_entity_tables(self): np.testing.assert_array_equal(arrays["age"]["2024"], np.array([34, 12, 45])) np.testing.assert_allclose(arrays["snap"]["2024"], np.array([1200.0, 300.0])) + def test_export_column_names_match_written_time_period_arrays(self): + class FakeEntity: + def __init__(self, key): + self.key = key + + class FakeVariable: + def __init__(self, entity): + self.entity = FakeEntity(entity) + + class FakeSystem: + variables = { + "age": FakeVariable("person"), + "snap": FakeVariable("spm_unit"), + "state_code": FakeVariable("household"), + } + + tables = PolicyEngineUSEntityTableBundle( + households=pd.DataFrame( + { + "household_id": [10], + "household_weight": [1.5], + "state_code": ["CA"], + } + ), + persons=pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 10], + "tax_unit_id": [100, 100], + "spm_unit_id": [1000, 1000], + "family_id": [5000, 5000], + "marital_unit_id": [7000, 7000], + "age": [34, 12], + } + ), + tax_units=pd.DataFrame({"tax_unit_id": [100], "household_id": [10]}), + spm_units=pd.DataFrame( + {"spm_unit_id": [1000], "household_id": [10], "snap": [1200.0]} + ), + families=pd.DataFrame({"family_id": [5000], "household_id": [10]}), + marital_units=pd.DataFrame( + {"marital_unit_id": [7000], "household_id": [10]} + ), + ) + + export_maps = build_policyengine_us_export_variable_maps( + tables, + tax_benefit_system=FakeSystem(), + ) + arrays = build_policyengine_us_time_period_arrays( + tables, + period=2024, + household_variable_map=export_maps["household"], + person_variable_map=export_maps["person"], + spm_unit_variable_map=export_maps["spm_unit"], + ) + columns = build_policyengine_us_export_column_names( + tables, + tax_benefit_system=FakeSystem(), + ) + excluded = resolve_policyengine_excluded_export_variables( + FakeSystem(), + sorted(arrays), + ) + + assert columns == set(arrays) - excluded + def test_derives_household_head_export_from_relationship_to_head(self): tables = PolicyEngineUSEntityTableBundle( households=pd.DataFrame( @@ -2410,7 +2478,7 @@ def test_default_policyengine_us_export_surface_avoids_formula_aggregates(self): assert "health_savings_account_ald" in SAFE_POLICYENGINE_US_EXPORT_VARIABLES assert "filing_status" not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES assert "rent" not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES - assert "social_security_retirement" not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES + assert "social_security_retirement" in SAFE_POLICYENGINE_US_EXPORT_VARIABLES assert ( "social_security_retirement_reported" not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES @@ -2424,20 +2492,17 @@ def test_default_policyengine_us_export_surface_avoids_formula_aggregates(self): ) assert ( "traditional_ira_contributions_desired" - not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES + in SAFE_POLICYENGINE_US_EXPORT_VARIABLES ) assert "roth_ira_contributions" not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES - assert ( - "roth_ira_contributions_desired" - not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES - ) + assert "roth_ira_contributions_desired" in SAFE_POLICYENGINE_US_EXPORT_VARIABLES assert ( "self_employed_pension_contributions" not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES ) assert ( "self_employed_pension_contributions_desired" - not in SAFE_POLICYENGINE_US_EXPORT_VARIABLES + in SAFE_POLICYENGINE_US_EXPORT_VARIABLES ) assert "non_sch_d_capital_gains" in SAFE_POLICYENGINE_US_EXPORT_VARIABLES assert "receives_wic" in SAFE_POLICYENGINE_US_EXPORT_VARIABLES @@ -2663,6 +2728,93 @@ class FakeSystem: "takes_up_snap_if_eligible": "takes_up_snap_if_eligible", }.items() <= export_maps["spm_unit"].items() + def test_time_period_arrays_derive_ecps_persisted_computed_inputs(self): + class FakeEntity: + def __init__(self, key): + self.key = key + + class FakeVariable: + def __init__(self, entity, formulas=None): + self.entity = FakeEntity(entity) + self.formulas = formulas or {} + + class FakeSystem: + variables = { + "fsla_overtime_premium": FakeVariable("person"), + "has_itin": FakeVariable("person", formulas={"2024": object()}), + "has_tin": FakeVariable("person", formulas={"2024": object()}), + "hours_worked_last_week": FakeVariable("person"), + "in_nyc": FakeVariable("household", formulas={"2024": object()}), + "meets_ssi_disability_criteria": FakeVariable("person"), + } + + tables = PolicyEngineUSEntityTableBundle( + households=pd.DataFrame( + { + "household_id": [10, 11], + "household_weight": [1.0, 2.0], + "state_fips": [36, 6], + "county_fips": [61, 1], + } + ), + persons=pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 11], + "ssn_card_type": ["NONE", "CITIZEN"], + "age": [40, 70], + "difficulty_hearing": [True, False], + "ssi": [0.0, 0.0], + "employment_income": [0.0, 55_000.0], + "hours_worked": [0.0, 50.0], + "weeks_worked": [0.0, 52.0], + "is_paid_hourly": [False, True], + "has_never_worked": [False, False], + "is_military": [False, False], + "is_executive_administrative_professional": [False, False], + "is_farmer_fisher": [False, False], + "is_computer_scientist": [False, False], + } + ), + ) + + export_maps = build_policyengine_us_export_variable_maps( + tables, + tax_benefit_system=FakeSystem(), + ) + arrays = build_policyengine_us_time_period_arrays( + tables, + period=2024, + household_variable_map=export_maps["household"], + person_variable_map=export_maps["person"], + ) + + columns = build_policyengine_us_export_column_names( + tables, + tax_benefit_system=FakeSystem(), + ) + + assert arrays["in_nyc"]["2024"].tolist() == [True, False] + assert arrays["has_tin"]["2024"].tolist() == [False, True] + assert arrays["has_itin"]["2024"].tolist() == [False, True] + assert arrays["hours_worked_last_week"]["2024"].tolist() == [0.0, 50.0] + assert arrays["meets_ssi_disability_criteria"]["2024"].tolist() == [ + True, + False, + ] + np.testing.assert_allclose( + arrays["fsla_overtime_premium"]["2024"], + np.array([0.0, 5_000.0], dtype=np.float32), + ) + assert { + "fsla_overtime_premium", + "has_itin", + "has_tin", + "hours_worked_last_week", + "in_nyc", + "meets_ssi_disability_criteria", + }.issubset(columns) + def test_projects_frame_and_writes_time_period_dataset(self, tmp_path): frame = pd.DataFrame( {