diff --git a/changelog.d/1156.changed.md b/changelog.d/1156.changed.md new file mode 100644 index 000000000..ef0aafc45 --- /dev/null +++ b/changelog.d/1156.changed.md @@ -0,0 +1 @@ +- Replaced the in-repo tax-unit construction engine and rule helpers with a dependency on the standalone `microunit` package, re-pointing all call sites. Tax-unit output is unchanged. diff --git a/policyengine_us_data/datasets/acs/acs_to_cps_columns.py b/policyengine_us_data/datasets/acs/acs_to_cps_columns.py index ea3b5a372..72aaf9f59 100644 --- a/policyengine_us_data/datasets/acs/acs_to_cps_columns.py +++ b/policyengine_us_data/datasets/acs/acs_to_cps_columns.py @@ -1,6 +1,6 @@ """ Map ACS PUMS person records onto the CPS-like columns consumed by -``policyengine_us_data.datasets.cps.tax_unit_construction``. +``microunit.construct_tax_units``. Column contract: diff --git a/policyengine_us_data/datasets/acs/tax_unit_construction.py b/policyengine_us_data/datasets/acs/tax_unit_construction.py index ba447c731..d27635fa4 100644 --- a/policyengine_us_data/datasets/acs/tax_unit_construction.py +++ b/policyengine_us_data/datasets/acs/tax_unit_construction.py @@ -2,13 +2,11 @@ import pandas as pd +from microunit import POLICYENGINE_MODE, construct_tax_units + from policyengine_us_data.datasets.acs.acs_to_cps_columns import ( acs_person_to_cps_tax_unit_columns, ) -from policyengine_us_data.datasets.cps.tax_unit_construction import ( - POLICYENGINE_MODE, - construct_tax_units, -) def construct_tax_units_acs( diff --git a/policyengine_us_data/datasets/cps/census_cps.py b/policyengine_us_data/datasets/cps/census_cps.py index 37f85fb86..2288a2dea 100644 --- a/policyengine_us_data/datasets/cps/census_cps.py +++ b/policyengine_us_data/datasets/cps/census_cps.py @@ -5,9 +5,7 @@ from zipfile import ZipFile import pandas as pd from policyengine_us_data.storage import STORAGE_FOLDER -from policyengine_us_data.datasets.cps.tax_unit_construction import ( - construct_tax_units, -) +from microunit import construct_tax_units OPTIONAL_PERSON_COLUMNS = { diff --git a/policyengine_us_data/datasets/cps/tax_unit_construction.py b/policyengine_us_data/datasets/cps/tax_unit_construction.py deleted file mode 100644 index 4bd1ef023..000000000 --- a/policyengine_us_data/datasets/cps/tax_unit_construction.py +++ /dev/null @@ -1,890 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - -import numpy as np -import pandas as pd - -from policyengine_us_data.datasets.cps.tax_unit_rule_helpers import ( - REFERENCE_PERSON_CODES, - dependent_gross_income_limit, - qualifying_child_age_test, - reference_relationship_allows_qualifying_child, - reference_relationship_allows_qualifying_relative, - related_to_head_or_spouse as reference_related_to_head_or_spouse, -) - - -HEAD = "HEAD" -SPOUSE = "SPOUSE" -DEPENDENT = "DEPENDENT" - -POLICYENGINE_MODE = "policyengine" -CENSUS_DOCUMENTED_MODE = "census_documented" -SUPPORTED_TAX_UNIT_CONSTRUCTION_MODES = frozenset( - { - POLICYENGINE_MODE, - CENSUS_DOCUMENTED_MODE, - } -) -DISABILITY_FLAGS = ( - "PEDISDRS", - "PEDISEAR", - "PEDISEYE", - "PEDISOUT", - "PEDISPHY", - "PEDISREM", -) -_GROSS_INCOME_COLUMN = "_tax_unit_gross_income" -_CLAIMANT_INCOME_COLUMN = "_tax_unit_claimant_income" -_TOTAL_MONEY_INCOME_COLUMN = "_tax_unit_total_money_income" -_HAS_DISABILITY_COLUMN = "_tax_unit_has_disability" -_IS_FULL_TIME_STUDENT_COLUMN = "_tax_unit_is_full_time_student" - - -@dataclass(frozen=True) -class _HouseholdPerson: - index: int - household_id: int - line_no: int - age: int - relationship_code: int | None - marital_status: int - spouse_line: int | None - parent_lines: tuple[int, ...] - gross_income: float - claimant_income: float - total_money_income: float - is_full_time_student: bool - is_permanently_disabled: bool - - @property - def starts_base_unit(self) -> bool: - return self.age >= 18 or self.marital_status in {1, 2, 3, 4, 5, 6} - - @property - def married_spouse_present(self) -> bool: - return self.marital_status in {1, 2} and self.spouse_line is not None - - -@dataclass -class _BaseTaxUnit: - key: tuple - household_id: int - head_index: int - spouse_index: int | None = None - claimant_lines: tuple[int, ...] = () - claimant_income: float = 0.0 - total_money_income: float = 0.0 - head_age: int = 0 - - -@dataclass(frozen=True) -class _ClaimCandidate: - unit_key: tuple - priority: int - score: tuple[Any, ...] - - -def _to_optional_positive_int(value) -> int | None: - if pd.isna(value): - return None - value = int(value) - return value if value > 0 else None - - -def _to_optional_parent_line(value) -> int | None: - if pd.isna(value): - return None - value = int(value) - return value if value > 0 else None - - -def _numeric_array( - person: pd.DataFrame, - column: str, - *, - default: float = 0, -) -> np.ndarray: - if column not in person: - return np.full(len(person), default, dtype=float) - series = person[column] - if pd.api.types.is_numeric_dtype(series): - values = series.to_numpy(dtype=float, copy=False) - else: - values = pd.to_numeric(series, errors="coerce").to_numpy( - dtype=float, - copy=False, - ) - return np.nan_to_num(values, nan=default) - - -def _positive_series(person: pd.DataFrame, column: str) -> np.ndarray: - values = _numeric_array(person, column) - return np.maximum(values, 0) - - -def estimate_dependent_gross_income(person: pd.DataFrame) -> np.ndarray: - return ( - _positive_series(person, "WSAL_VAL") - + _positive_series(person, "SEMP_VAL") - + _positive_series(person, "FRSE_VAL") - + _positive_series(person, "INT_VAL") - + _positive_series(person, "DIV_VAL") - + _positive_series(person, "RNT_VAL") - + _positive_series(person, "CAP_VAL") - + _positive_series(person, "UC_VAL") - + _positive_series(person, "OI_VAL") - + _positive_series(person, "ANN_VAL") - + _positive_series(person, "PNSN_VAL") - ) - - -def _estimate_claimant_income(person: pd.DataFrame) -> np.ndarray: - return estimate_dependent_gross_income(person) + _positive_series(person, "SS_VAL") - - -def _has_disability(person: pd.DataFrame) -> np.ndarray: - has_disability = np.zeros(len(person), dtype=bool) - for flag in DISABILITY_FLAGS: - if flag in person: - has_disability |= _numeric_array(person, flag) == 1 - return has_disability - - -def _is_full_time_student(person: pd.DataFrame) -> np.ndarray: - enrolled_values = _numeric_array(person, "A_ENRLW") - full_time_values = _numeric_array(person, "A_FTPT") - school_level_values = _numeric_array(person, "A_HSCOL") - # Limit this to tax-unit construction: CPS TAX_ID behavior treats current - # high-school or college enrollment as strong student evidence for young - # adults even when the full-time flag is absent or part-time. - return ((enrolled_values == 1) & (full_time_values == 1)) | ( - (enrolled_values == 1) & np.isin(school_level_values, [1, 2]) - ) - - -def _precompute_tax_unit_inputs(person: pd.DataFrame) -> pd.DataFrame: - gross_income = estimate_dependent_gross_income(person) - person[_GROSS_INCOME_COLUMN] = gross_income - person[_CLAIMANT_INCOME_COLUMN] = gross_income + _positive_series(person, "SS_VAL") - person[_TOTAL_MONEY_INCOME_COLUMN] = ( - _numeric_array(person, "PTOTVAL") - if "PTOTVAL" in person - else person[_CLAIMANT_INCOME_COLUMN].to_numpy(dtype=float, copy=False) - ) - person[_HAS_DISABILITY_COLUMN] = _has_disability(person) - person[_IS_FULL_TIME_STUDENT_COLUMN] = _is_full_time_student(person) - return person - - -def _prepare_household_people( - household: pd.DataFrame, - household_id: int, -) -> list[_HouseholdPerson]: - gross_income = ( - household[_GROSS_INCOME_COLUMN].to_numpy(dtype=float, copy=False) - if _GROSS_INCOME_COLUMN in household - else estimate_dependent_gross_income(household) - ) - claimant_income = ( - household[_CLAIMANT_INCOME_COLUMN].to_numpy(dtype=float, copy=False) - if _CLAIMANT_INCOME_COLUMN in household - else _estimate_claimant_income(household) - ) - total_money_income = ( - household[_TOTAL_MONEY_INCOME_COLUMN].to_numpy(dtype=float, copy=False) - if _TOTAL_MONEY_INCOME_COLUMN in household - else _numeric_array(household, "PTOTVAL") - if "PTOTVAL" in household - else claimant_income.copy() - ) - has_disability = ( - household[_HAS_DISABILITY_COLUMN].to_numpy(dtype=bool, copy=False) - if _HAS_DISABILITY_COLUMN in household - else _has_disability(household) - ) - is_full_time_student = ( - household[_IS_FULL_TIME_STUDENT_COLUMN].to_numpy(dtype=bool, copy=False) - if _IS_FULL_TIME_STUDENT_COLUMN in household - else _is_full_time_student(household) - ) - people = [] - for row_number, (index, row) in enumerate(household.iterrows()): - line_no = int(row["A_LINENO"]) - parent_lines = tuple( - parent - for parent in ( - _to_optional_parent_line(row.get("PEPAR1", 0)), - _to_optional_parent_line(row.get("PEPAR2", 0)), - ) - if parent is not None - ) - relationship_code = row.get("A_EXPRRP") - if pd.isna(relationship_code): - relationship_code = None - else: - relationship_code = int(relationship_code) - people.append( - _HouseholdPerson( - index=index, - household_id=household_id, - line_no=line_no, - age=int(row["A_AGE"]), - relationship_code=relationship_code, - marital_status=int(row.get("A_MARITL", 7)), - spouse_line=_to_optional_positive_int(row.get("A_SPOUSE", 0)), - parent_lines=parent_lines, - gross_income=float(gross_income[row_number]), - claimant_income=float(claimant_income[row_number]), - total_money_income=float(total_money_income[row_number]), - is_full_time_student=bool(is_full_time_student[row_number]), - is_permanently_disabled=bool(has_disability[row_number]), - ) - ) - return people - - -def _choose_pair_head( - person_a: _HouseholdPerson, - person_b: _HouseholdPerson, -) -> tuple[_HouseholdPerson, _HouseholdPerson]: - if person_a.relationship_code in {code.value for code in REFERENCE_PERSON_CODES}: - return person_a, person_b - if person_b.relationship_code in {code.value for code in REFERENCE_PERSON_CODES}: - return person_b, person_a - if person_a.age != person_b.age: - return ( - (person_a, person_b) - if person_a.age > person_b.age - else (person_b, person_a) - ) - return ( - (person_a, person_b) - if person_a.line_no < person_b.line_no - else (person_b, person_a) - ) - - -def _build_base_tax_units( - people: list[_HouseholdPerson], -) -> tuple[dict[tuple, _BaseTaxUnit], dict[int, tuple], tuple | None]: - by_line = {person.line_no: person for person in people} - paired_indices: set[int] = set() - units: dict[tuple, _BaseTaxUnit] = {} - base_unit_by_person: dict[int, tuple] = {} - reference_unit_key: tuple | None = None - - married_pairs: set[tuple[int, int]] = set() - for person in people: - if not person.married_spouse_present: - continue - spouse = by_line.get(person.spouse_line) - if ( - spouse is None - or spouse.index == person.index - or not spouse.married_spouse_present - ): - continue - married_pairs.add(tuple(sorted((person.line_no, spouse.line_no)))) - - for line_a, line_b in sorted(married_pairs): - person_a = by_line[line_a] - person_b = by_line[line_b] - head, spouse = _choose_pair_head(person_a, person_b) - key = ("pair", min(line_a, line_b), max(line_a, line_b)) - unit = _BaseTaxUnit( - key=key, - household_id=head.household_id, - head_index=head.index, - spouse_index=spouse.index, - claimant_lines=(head.line_no, spouse.line_no), - claimant_income=head.claimant_income + spouse.claimant_income, - total_money_income=head.total_money_income + spouse.total_money_income, - head_age=head.age, - ) - units[key] = unit - paired_indices.update({head.index, spouse.index}) - base_unit_by_person[head.index] = key - base_unit_by_person[spouse.index] = key - if head.relationship_code in { - code.value for code in REFERENCE_PERSON_CODES - } or spouse.relationship_code in { - code.value for code in REFERENCE_PERSON_CODES - }: - reference_unit_key = key - - for person in people: - if person.index in paired_indices or not person.starts_base_unit: - continue - key = ("single", person.line_no) - units[key] = _BaseTaxUnit( - key=key, - household_id=person.household_id, - head_index=person.index, - claimant_lines=(person.line_no,), - claimant_income=person.claimant_income, - total_money_income=person.total_money_income, - head_age=person.age, - ) - base_unit_by_person[person.index] = key - if person.relationship_code in {code.value for code in REFERENCE_PERSON_CODES}: - reference_unit_key = key - - return units, base_unit_by_person, reference_unit_key - - -def _parent_candidate_units( - person: _HouseholdPerson, - base_units: dict[tuple, _BaseTaxUnit], - eligible_units: set[tuple], -) -> list[tuple]: - candidates = [] - for unit_key in eligible_units: - unit = base_units[unit_key] - if any( - parent_line in unit.claimant_lines for parent_line in person.parent_lines - ): - candidates.append(unit_key) - return candidates - - -def _reference_candidate_unit( - person: _HouseholdPerson, - reference_unit_key: tuple | None, - base_unit_key: tuple | None, - eligible_units: set[tuple], -) -> tuple | None: - if ( - reference_unit_key is None - or reference_unit_key == base_unit_key - or reference_unit_key not in eligible_units - ): - return None - return reference_unit_key - - -def _unit_income_score( - unit_key: tuple, - base_units: dict[tuple, _BaseTaxUnit], -) -> tuple[float, int, int]: - unit = base_units[unit_key] - return ( - unit.claimant_income, - unit.head_age, - -unit.claimant_lines[0], - ) - - -def _choose_best_candidate(candidates: list[_ClaimCandidate]) -> tuple | None: - if not candidates: - return None - return max( - candidates, - key=lambda candidate: (candidate.priority, candidate.score), - ).unit_key - - -def _choose_best_parent_unit_by_total_money_income( - candidate_units: list[tuple], - base_units: dict[tuple, _BaseTaxUnit], -) -> tuple | None: - if not candidate_units: - return None - return max( - candidate_units, - key=lambda key: ( - base_units[key].total_money_income, - base_units[key].claimant_income, - base_units[key].head_age, - -base_units[key].claimant_lines[0], - ), - ) - - -def _choose_main_filing_unit( - base_units: dict[tuple, _BaseTaxUnit], - reference_unit_key: tuple | None, -) -> tuple | None: - if reference_unit_key in base_units: - return reference_unit_key - if not base_units: - return None - return max( - base_units, - key=lambda key: ( - base_units[key].total_money_income, - base_units[key].claimant_income, - base_units[key].head_age, - -base_units[key].claimant_lines[0], - ), - ) - - -def _select_claimant_unit( - person: _HouseholdPerson, - year: int, - base_units: dict[tuple, _BaseTaxUnit], - base_unit_key: tuple | None, - reference_unit_key: tuple | None, - eligible_units: set[tuple], -) -> tuple | None: - parent_units = _parent_candidate_units(person, base_units, eligible_units) - age_eligible = qualifying_child_age_test( - age=person.age, - is_full_time_student=person.is_full_time_student, - is_permanently_disabled=person.is_permanently_disabled, - ) - - reference_unit = _reference_candidate_unit( - person, - reference_unit_key, - base_unit_key, - eligible_units, - ) - candidates: list[_ClaimCandidate] = [] - - if age_eligible: - candidates.extend( - _ClaimCandidate( - unit_key=unit_key, - priority=100, - score=_unit_income_score(unit_key, base_units), - ) - for unit_key in parent_units - ) - if ( - reference_unit is not None - and not person.starts_base_unit - and not person.parent_lines - and person.age < 15 - ): - candidates.append( - _ClaimCandidate( - unit_key=reference_unit, - priority=80, - score=_unit_income_score(reference_unit, base_units), - ) - ) - selected = _choose_best_candidate(candidates) - if selected is not None: - return selected - - if person.gross_income >= dependent_gross_income_limit(year): - return None - - if person.starts_base_unit: - return None - - candidates.extend( - _ClaimCandidate( - unit_key=unit_key, - priority=60, - score=_unit_income_score(unit_key, base_units), - ) - for unit_key in parent_units - ) - - if ( - reference_unit is not None - and ( - reference_relationship_allows_qualifying_relative(person.relationship_code) - or (not person.parent_lines and person.age < 15) - ) - and person.age < 15 - ): - candidates.append( - _ClaimCandidate( - unit_key=reference_unit, - priority=50, - score=_unit_income_score(reference_unit, base_units), - ) - ) - - return _choose_best_candidate(candidates) - - -def _determine_final_assignments_for_household_policyengine( - people: list[_HouseholdPerson], - year: int, -) -> tuple[dict[int, tuple], dict[int, str], dict[tuple, str], dict[int, bool]]: - base_units, base_unit_by_person, reference_unit_key = _build_base_tax_units(people) - person_by_index = {person.index: person for person in people} - - adult_claims: dict[int, tuple] = {} - adult_candidates = [ - person - for person in people - if person.starts_base_unit - and base_unit_by_person.get(person.index) in base_units - and base_units[base_unit_by_person[person.index]].spouse_index is None - ] - eligible_units = set(base_units) - for person in sorted(adult_candidates, key=lambda item: (item.age, item.line_no)): - unit_key = _select_claimant_unit( - person=person, - year=year, - base_units=base_units, - base_unit_key=base_unit_by_person.get(person.index), - reference_unit_key=reference_unit_key, - eligible_units=eligible_units, - ) - if unit_key is not None: - adult_claims[person.index] = unit_key - claimed_person_unit_key = base_unit_by_person.get(person.index) - if claimed_person_unit_key is not None: - eligible_units.discard(claimed_person_unit_key) - - def _resolve_surviving_unit(unit_key: tuple) -> tuple: - seen: set[tuple] = set() - current_unit_key = unit_key - while current_unit_key not in seen: - seen.add(current_unit_key) - unit = base_units[current_unit_key] - if unit.spouse_index is not None: - return current_unit_key - next_unit_key = adult_claims.get(unit.head_index) - if next_unit_key is None: - return current_unit_key - current_unit_key = next_unit_key - return current_unit_key - - adult_claims = { - person_index: _resolve_surviving_unit(unit_key) - for person_index, unit_key in adult_claims.items() - } - - surviving_units = { - unit_key - for unit_key, unit in base_units.items() - if unit.spouse_index is not None or unit.head_index not in adult_claims - } - - child_claims: dict[int, tuple] = {} - child_candidates = [ - person - for person in people - if not person.starts_base_unit and person.index not in adult_claims - ] - for person in sorted(child_candidates, key=lambda item: (item.age, item.line_no)): - unit_key = _select_claimant_unit( - person=person, - year=year, - base_units=base_units, - base_unit_key=base_unit_by_person.get(person.index), - reference_unit_key=reference_unit_key, - eligible_units=surviving_units, - ) - if unit_key is not None: - child_claims[person.index] = unit_key - - final_unit_key_by_person: dict[int, tuple] = {} - roles_by_person: dict[int, str] = {} - for unit_key, unit in base_units.items(): - if unit.spouse_index is not None: - final_unit_key_by_person[unit.head_index] = unit_key - final_unit_key_by_person[unit.spouse_index] = unit_key - roles_by_person[unit.head_index] = HEAD - roles_by_person[unit.spouse_index] = SPOUSE - continue - if unit.head_index in adult_claims: - continue - final_unit_key_by_person[unit.head_index] = unit_key - roles_by_person[unit.head_index] = HEAD - - for person_index, unit_key in adult_claims.items(): - final_unit_key_by_person[person_index] = unit_key - roles_by_person[person_index] = DEPENDENT - - for person_index, unit_key in child_claims.items(): - final_unit_key_by_person[person_index] = unit_key - roles_by_person[person_index] = DEPENDENT - - for person in people: - if person.index in final_unit_key_by_person: - continue - unit_key = ("single", person.line_no) - final_unit_key_by_person[person.index] = unit_key - roles_by_person[person.index] = HEAD - - related_to_head_or_spouse: dict[int, bool] = {} - head_spouse_lines_by_unit: dict[tuple, set[int]] = {} - for person_index, unit_key in final_unit_key_by_person.items(): - role = roles_by_person[person_index] - if role in {HEAD, SPOUSE}: - head_spouse_lines_by_unit.setdefault(unit_key, set()).add( - person_by_index[person_index].line_no - ) - - filing_status_by_unit: dict[tuple, str] = {} - unit_members: dict[tuple, list[_HouseholdPerson]] = {} - for person_index, unit_key in final_unit_key_by_person.items(): - unit_members.setdefault(unit_key, []).append(person_by_index[person_index]) - - for unit_key, members in unit_members.items(): - roles = {person.index: roles_by_person[person.index] for person in members} - has_spouse = any(role == SPOUSE for role in roles.values()) - head = next(person for person in members if roles[person.index] == HEAD) - claimant_lines = head_spouse_lines_by_unit.get(unit_key, {head.line_no}) - - for person in members: - if roles[person.index] in {HEAD, SPOUSE}: - related_to_head_or_spouse[person.index] = True - continue - related_to_head_or_spouse[person.index] = any( - parent_line in claimant_lines for parent_line in person.parent_lines - ) or reference_related_to_head_or_spouse(person.relationship_code) - - if has_spouse: - filing_status_by_unit[unit_key] = "JOINT" - continue - - has_qualifying_child = any( - roles[person.index] == DEPENDENT - and ( - any( - parent_line in claimant_lines for parent_line in person.parent_lines - ) - or reference_relationship_allows_qualifying_child( - person.relationship_code - ) - ) - and qualifying_child_age_test( - age=person.age, - is_full_time_student=person.is_full_time_student, - is_permanently_disabled=person.is_permanently_disabled, - ) - for person in members - ) - has_qualifying_relative = any( - roles[person.index] == DEPENDENT - and related_to_head_or_spouse[person.index] - and person.gross_income < dependent_gross_income_limit(year) - for person in members - ) - has_head_of_household_person = has_qualifying_child or has_qualifying_relative - - if head.marital_status == 4 and has_qualifying_child: - filing_status_by_unit[unit_key] = "SURVIVING_SPOUSE" - elif has_head_of_household_person and head.marital_status != 6: - filing_status_by_unit[unit_key] = "HEAD_OF_HOUSEHOLD" - elif has_head_of_household_person and head.marital_status == 6: - filing_status_by_unit[unit_key] = "HEAD_OF_HOUSEHOLD" - elif head.marital_status == 6: - filing_status_by_unit[unit_key] = "SEPARATE" - else: - filing_status_by_unit[unit_key] = "SINGLE" - - return ( - final_unit_key_by_person, - roles_by_person, - filing_status_by_unit, - related_to_head_or_spouse, - ) - - -def _determine_final_assignments_for_household_census_documented( - people: list[_HouseholdPerson], - year: int, -) -> tuple[dict[int, tuple], dict[int, str], dict[tuple, str], dict[int, bool]]: - del year - # Follow the publicly documented Census tax-model flow: married + dependents - # + others, qualifying-child-only parent-pointer claims, and under-15 - # no-parent fallback to the household's main filing unit. - base_units, _, reference_unit_key = _build_base_tax_units(people) - person_by_index = {person.index: person for person in people} - main_unit_key = _choose_main_filing_unit(base_units, reference_unit_key) - - final_unit_key_by_person: dict[int, tuple] = {} - roles_by_person: dict[int, str] = {} - - for unit_key, unit in base_units.items(): - final_unit_key_by_person[unit.head_index] = unit_key - roles_by_person[unit.head_index] = HEAD - if unit.spouse_index is not None: - final_unit_key_by_person[unit.spouse_index] = unit_key - roles_by_person[unit.spouse_index] = SPOUSE - - dependent_claims: dict[int, tuple] = {} - for person in sorted(people, key=lambda item: (item.age, item.line_no)): - if person.index in final_unit_key_by_person or person.married_spouse_present: - continue - - age_eligible = qualifying_child_age_test( - age=person.age, - is_full_time_student=person.is_full_time_student, - is_permanently_disabled=person.is_permanently_disabled, - ) - if person.parent_lines and age_eligible: - parent_units = [ - unit_key - for unit_key, unit in base_units.items() - if any( - parent_line in unit.claimant_lines - for parent_line in person.parent_lines - ) - ] - unit_key = _choose_best_parent_unit_by_total_money_income( - parent_units, - base_units, - ) - if unit_key is not None: - dependent_claims[person.index] = unit_key - continue - - if not person.parent_lines and person.age < 15 and main_unit_key is not None: - dependent_claims[person.index] = main_unit_key - - for person_index, unit_key in dependent_claims.items(): - final_unit_key_by_person[person_index] = unit_key - roles_by_person[person_index] = DEPENDENT - - for person in people: - if person.index in final_unit_key_by_person: - continue - unit_key = ("single", person.line_no) - final_unit_key_by_person[person.index] = unit_key - roles_by_person[person.index] = HEAD - - related_to_head_or_spouse: dict[int, bool] = {} - unit_members: dict[tuple, list[_HouseholdPerson]] = {} - head_spouse_lines_by_unit: dict[tuple, set[int]] = {} - for person_index, unit_key in final_unit_key_by_person.items(): - unit_members.setdefault(unit_key, []).append(person_by_index[person_index]) - if roles_by_person[person_index] in {HEAD, SPOUSE}: - head_spouse_lines_by_unit.setdefault(unit_key, set()).add( - person_by_index[person_index].line_no - ) - - filing_status_by_unit: dict[tuple, str] = {} - for unit_key, members in unit_members.items(): - roles = {person.index: roles_by_person[person.index] for person in members} - has_spouse = any(role == SPOUSE for role in roles.values()) - has_dependents = any(role == DEPENDENT for role in roles.values()) - claimant_lines = head_spouse_lines_by_unit.get(unit_key, set()) - - for person in members: - if roles[person.index] in {HEAD, SPOUSE}: - related_to_head_or_spouse[person.index] = True - continue - related_to_head_or_spouse[person.index] = any( - parent_line in claimant_lines for parent_line in person.parent_lines - ) or reference_related_to_head_or_spouse(person.relationship_code) - - if has_spouse: - filing_status_by_unit[unit_key] = "JOINT" - elif has_dependents: - filing_status_by_unit[unit_key] = "HEAD_OF_HOUSEHOLD" - else: - filing_status_by_unit[unit_key] = "SINGLE" - - return ( - final_unit_key_by_person, - roles_by_person, - filing_status_by_unit, - related_to_head_or_spouse, - ) - - -def construct_tax_units( - person: pd.DataFrame, - year: int, - mode: str = POLICYENGINE_MODE, -) -> tuple[pd.DataFrame, pd.DataFrame]: - required_columns = { - "PH_SEQ", - "A_LINENO", - "A_AGE", - "A_MARITL", - "A_SPOUSE", - "PEPAR1", - "PEPAR2", - "A_EXPRRP", - } - missing = sorted( - column for column in required_columns if column not in person.columns - ) - if missing: - raise KeyError( - "Missing required CPS columns for tax-unit construction: " - + ", ".join(missing) - ) - if mode not in SUPPORTED_TAX_UNIT_CONSTRUCTION_MODES: - raise ValueError( - "Unsupported tax-unit construction mode " - f"{mode!r}. Expected one of: " - + ", ".join(sorted(SUPPORTED_TAX_UNIT_CONSTRUCTION_MODES)) - ) - - original_index = person.index - person = _precompute_tax_unit_inputs(person.reset_index(drop=True)) - person_assignments = pd.DataFrame(index=original_index) - unit_key_records: list[tuple] = [] - unit_filing_records: list[str] = [] - - household_unit_key_by_row: dict[Any, tuple] = {} - household_role_by_row: dict[Any, str] = {} - household_related_flag_by_row: dict[Any, bool] = {} - - assignment_fn = ( - _determine_final_assignments_for_household_policyengine - if mode == POLICYENGINE_MODE - else _determine_final_assignments_for_household_census_documented - ) - - for household_id, household in person.groupby("PH_SEQ", sort=False): - household_people = _prepare_household_people(household, int(household_id)) - ( - unit_key_by_person, - roles_by_person, - filing_status_by_unit, - related_to_head_or_spouse, - ) = assignment_fn(household_people, year) - - for row_index in household.index: - unit_key = (int(household_id),) + tuple(unit_key_by_person[row_index]) - household_unit_key_by_row[row_index] = unit_key - household_role_by_row[row_index] = roles_by_person[row_index] - household_related_flag_by_row[row_index] = related_to_head_or_spouse[ - row_index - ] - - for unit_key, filing_status in filing_status_by_unit.items(): - unit_key_records.append((int(household_id),) + tuple(unit_key)) - unit_filing_records.append(filing_status) - - ordered_household_unit_keys = [ - household_unit_key_by_row[row_index] for row_index in person.index - ] - dense_unit_ids = { - unit_key: unit_id - for unit_id, unit_key in enumerate( - dict.fromkeys(ordered_household_unit_keys), - start=1, - ) - } - person_assignments["TAX_ID"] = np.array( - [dense_unit_ids[unit_key] for unit_key in ordered_household_unit_keys], - dtype=np.int64, - ) - person_assignments["tax_unit_role_input"] = np.array( - [household_role_by_row[row_index] for row_index in person.index] - ).astype("S") - person_assignments["is_related_to_head_or_spouse"] = np.array( - [household_related_flag_by_row[row_index] for row_index in person.index], - dtype=bool, - ) - - tax_unit = pd.DataFrame( - { - "TAX_ID": np.array( - [dense_unit_ids[unit_key] for unit_key in unit_key_records], - dtype=np.int64, - ), - "filing_status_input": np.array(unit_filing_records).astype("S"), - } - ).drop_duplicates("TAX_ID") - tax_unit = tax_unit.sort_values("TAX_ID").reset_index(drop=True) - - return person_assignments, tax_unit diff --git a/policyengine_us_data/datasets/cps/tax_unit_rule_helpers.py b/policyengine_us_data/datasets/cps/tax_unit_rule_helpers.py deleted file mode 100644 index a2ce2fcf0..000000000 --- a/policyengine_us_data/datasets/cps/tax_unit_rule_helpers.py +++ /dev/null @@ -1,143 +0,0 @@ -from __future__ import annotations - -from enum import IntEnum -from functools import lru_cache -from importlib import resources - -import yaml - -try: - from policyengine_us.tools.tax_unit_construction import ( - CPSRelationshipCode, - REFERENCE_PERSON_CODES, - REFERENCE_SPOUSE_CODES, - dependent_gross_income_limit, - qualifying_child_age_test, - reference_relationship_allows_qualifying_child, - reference_relationship_allows_qualifying_relative, - related_to_head_or_spouse, - ) -except ImportError: - # Temporary compatibility shim while policyengine-us-data can still run - # against released policyengine-us versions that do not yet expose the - # shared tax-unit helper module. Remove once the minimum dependency includes - # policyengine_us.tools.tax_unit_construction. - class CPSRelationshipCode(IntEnum): - REFERENCE_PERSON_WITH_RELATIVES = 1 - REFERENCE_PERSON_WITHOUT_RELATIVES = 2 - HUSBAND = 3 - WIFE = 4 - OWN_CHILD = 5 - GRANDCHILD = 7 - PARENT = 8 - SIBLING = 9 - OTHER_RELATIVE = 10 - FOSTER_CHILD = 11 - NONRELATIVE_WITH_RELATIVES = 12 - PARTNER_OR_ROOMMATE = 13 - NONRELATIVE_WITHOUT_RELATIVES = 14 - - REFERENCE_PERSON_CODES = frozenset( - { - CPSRelationshipCode.REFERENCE_PERSON_WITH_RELATIVES, - CPSRelationshipCode.REFERENCE_PERSON_WITHOUT_RELATIVES, - } - ) - - REFERENCE_SPOUSE_CODES = frozenset( - { - CPSRelationshipCode.HUSBAND, - CPSRelationshipCode.WIFE, - } - ) - - REFERENCE_QUALIFYING_CHILD_CODES = frozenset( - { - CPSRelationshipCode.OWN_CHILD, - CPSRelationshipCode.GRANDCHILD, - CPSRelationshipCode.SIBLING, - CPSRelationshipCode.FOSTER_CHILD, - } - ) - - REFERENCE_QUALIFYING_RELATIVE_CODES = frozenset( - { - CPSRelationshipCode.OWN_CHILD, - CPSRelationshipCode.GRANDCHILD, - CPSRelationshipCode.PARENT, - CPSRelationshipCode.SIBLING, - CPSRelationshipCode.OTHER_RELATIVE, - CPSRelationshipCode.FOSTER_CHILD, - } - ) - - def qualifying_child_age_test( - age: int | float, - is_full_time_student: bool = False, - is_permanently_disabled: bool = False, - non_student_age_limit: int = 19, - student_age_limit: int = 24, - ) -> bool: - if is_permanently_disabled: - return True - age_limit = student_age_limit if is_full_time_student else non_student_age_limit - return float(age) < age_limit - - def _relationship_from_code(relationship_code: int | None): - if relationship_code is None: - return None - try: - return CPSRelationshipCode(int(relationship_code)) - except ValueError: - return None - - def reference_relationship_allows_qualifying_child( - relationship_code: int | None, - ) -> bool: - relationship = _relationship_from_code(relationship_code) - return relationship in REFERENCE_QUALIFYING_CHILD_CODES - - def reference_relationship_allows_qualifying_relative( - relationship_code: int | None, - ) -> bool: - relationship = _relationship_from_code(relationship_code) - return relationship in REFERENCE_QUALIFYING_RELATIVE_CODES - - def related_to_head_or_spouse(relationship_code: int | None) -> bool: - relationship = _relationship_from_code(relationship_code) - return relationship in ( - REFERENCE_PERSON_CODES - | REFERENCE_SPOUSE_CODES - | REFERENCE_QUALIFYING_RELATIVE_CODES - ) - - @lru_cache(maxsize=None) - def dependent_gross_income_limit(year: int) -> float: - parameter_path = ( - resources.files("policyengine_us") - / "parameters" - / "gov" - / "irs" - / "income" - / "exemption" - / "amount.yaml" - ) - with parameter_path.open("r", encoding="utf-8") as f: - values = yaml.safe_load(f)["values"] - - def _period_year(period) -> int: - if hasattr(period, "year"): - return int(period.year) - return int(str(period)[:4]) - - applicable_years = sorted( - _period_year(period) for period in values if _period_year(period) <= year - ) - if not applicable_years: - raise ValueError(f"No dependent gross income limit configured for {year}.") - - selected_year = applicable_years[-1] - for period, entry in values.items(): - if _period_year(period) == selected_year: - return float(entry["value"]) - raise ValueError(f"No dependent gross income limit configured for {year}.") diff --git a/pyproject.toml b/pyproject.toml index f9dff0c7b..5d2557366 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,9 @@ classifiers = [ ] dependencies = [ "policyengine-us==1.715.3", + # Tax-unit construction engine, extracted verbatim from this repository into + # the standalone microunit package. See PolicyEngine/microunit. + "microunit>=0.1.0", # policyengine-core 3.26.1 is the current 3.26.x runtime and includes the fix for # PolicyEngine/policyengine-core#482 (user-set ETERNITY inputs lost # after _invalidate_all_caches) and is required by policyengine-us 1.682.1+. diff --git a/tests/unit/datasets/test_cps_tax_unit_construction.py b/tests/unit/datasets/test_cps_tax_unit_construction.py index efbe5ffd9..5cc82f74e 100644 --- a/tests/unit/datasets/test_cps_tax_unit_construction.py +++ b/tests/unit/datasets/test_cps_tax_unit_construction.py @@ -1,7 +1,18 @@ +"""Integration coverage for the CPS dataset's use of the microunit engine. + +The tax-unit construction *engine* itself is tested in the microunit package +(PolicyEngine/microunit), which is the canonical home for those rules. These +tests exercise this repository's own wiring into that engine: that +``CensusCPS._create_tax_unit_table`` calls ``microunit.construct_tax_units`` +with the dataset's time period and construction mode, writes the constructed +``TAX_ID`` back onto the person table, preserves the original Census identifiers +under ``CENSUS_TAX_ID``, and returns the per-unit table. +""" + import numpy as np import pandas as pd -from policyengine_us_data.datasets.cps.tax_unit_construction import construct_tax_units +from policyengine_us_data.datasets.cps.census_cps import CensusCPS_2024 def _person_fixture(**overrides): @@ -9,49 +20,20 @@ def _person_fixture(**overrides): defaults = { "PH_SEQ": np.ones(n, dtype=int), "A_LINENO": np.arange(1, n + 1, dtype=int), + "TAX_ID": np.arange(1, n + 1, dtype=int), "A_AGE": np.zeros(n, dtype=int), "A_MARITL": np.full(n, 7, dtype=int), "A_SPOUSE": np.zeros(n, dtype=int), - "PECOHAB": np.full(n, -1, dtype=int), "PEPAR1": np.full(n, -1, dtype=int), "PEPAR2": np.full(n, -1, dtype=int), "A_EXPRRP": np.full(n, 14, dtype=int), - "A_ENRLW": np.zeros(n, dtype=int), - "A_FTPT": np.zeros(n, dtype=int), - "A_HSCOL": np.zeros(n, dtype=int), "WSAL_VAL": np.zeros(n, dtype=float), - "SEMP_VAL": np.zeros(n, dtype=float), - "FRSE_VAL": np.zeros(n, dtype=float), - "INT_VAL": np.zeros(n, dtype=float), - "DIV_VAL": np.zeros(n, dtype=float), - "RNT_VAL": np.zeros(n, dtype=float), - "CAP_VAL": np.zeros(n, dtype=float), - "UC_VAL": np.zeros(n, dtype=float), - "OI_VAL": np.zeros(n, dtype=float), - "ANN_VAL": np.zeros(n, dtype=float), - "PNSN_VAL": np.zeros(n, dtype=float), - "PTOTVAL": np.zeros(n, dtype=float), - "SS_VAL": np.zeros(n, dtype=float), - "PEDISDRS": np.zeros(n, dtype=int), - "PEDISEAR": np.zeros(n, dtype=int), - "PEDISEYE": np.zeros(n, dtype=int), - "PEDISOUT": np.zeros(n, dtype=int), - "PEDISPHY": np.zeros(n, dtype=int), - "PEDISREM": np.zeros(n, dtype=int), } defaults.update(overrides) return pd.DataFrame(defaults) -def _decoded_roles(assignments: pd.DataFrame) -> list[str]: - return [value.decode() for value in assignments["tax_unit_role_input"].tolist()] - - -def _decoded_statuses(tax_unit: pd.DataFrame) -> list[str]: - return [value.decode() for value in tax_unit["filing_status_input"].tolist()] - - -def test_construct_tax_units_keeps_married_couple_and_child_together(): +def test_create_tax_unit_table_wires_microunit_and_writes_back_tax_id(): person = _person_fixture( A_AGE=[40, 38, 8], A_MARITL=[1, 1, 7], @@ -59,383 +41,48 @@ def test_construct_tax_units_keeps_married_couple_and_child_together(): A_EXPRRP=[1, 4, 5], PEPAR1=[-1, -1, 1], PEPAR2=[-1, -1, 2], + TAX_ID=[10, 10, 10], WSAL_VAL=[60_000, 20_000, 0], ) - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].nunique() == 1 - assert _decoded_roles(assignments) == ["HEAD", "SPOUSE", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["JOINT"] - - -def test_construct_tax_units_claims_low_income_full_time_student(): - person = _person_fixture( - A_AGE=[45, 20], - A_EXPRRP=[1, 5], - PEPAR1=[-1, 1], - A_ENRLW=[0, 1], - A_FTPT=[0, 1], - WSAL_VAL=[70_000, 3_000], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].nunique() == 1 - assert _decoded_roles(assignments) == ["HEAD", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["HEAD_OF_HOUSEHOLD"] - - -def test_construct_tax_units_claims_enrolled_young_adult_student(): - person = _person_fixture( - A_AGE=[45, 21], - A_EXPRRP=[1, 5], - PEPAR1=[-1, 1], - A_ENRLW=[0, 1], - A_FTPT=[0, 2], - A_HSCOL=[0, 2], - WSAL_VAL=[70_000, 12_000], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].nunique() == 1 - assert _decoded_roles(assignments) == ["HEAD", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["HEAD_OF_HOUSEHOLD"] - - -def test_construct_tax_units_leaves_low_income_nonstudent_adult_child_independent(): - person = _person_fixture( - A_AGE=[45, 22], - A_EXPRRP=[1, 5], - PEPAR1=[-1, 1], - A_ENRLW=[0, 0], - A_FTPT=[0, 0], - WSAL_VAL=[70_000, 2_000], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].nunique() == 2 - assert _decoded_roles(assignments) == ["HEAD", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["SINGLE", "SINGLE"] - - -def test_construct_tax_units_leaves_zero_income_nonstudent_young_adult_child_independent(): - person = _person_fixture( - A_AGE=[45, 22], - A_EXPRRP=[1, 5], - PEPAR1=[-1, 1], - A_ENRLW=[0, 0], - A_FTPT=[0, 0], - WSAL_VAL=[70_000, 0], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) + tax_unit_df = CensusCPS_2024()._create_tax_unit_table(person) - assert assignments["TAX_ID"].nunique() == 2 - assert _decoded_roles(assignments) == ["HEAD", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["SINGLE", "SINGLE"] + # The married couple plus their child collapse into a single constructed unit. + assert person["TAX_ID"].nunique() == 1 + assert tax_unit_df.columns.tolist() == ["TAX_ID"] + assert tax_unit_df["TAX_ID"].tolist() == [1] + # The original Census identifier is preserved for downstream comparison. + assert person["CENSUS_TAX_ID"].tolist() == [10, 10, 10] -def test_construct_tax_units_leaves_high_income_adult_child_independent(): +def test_create_tax_unit_table_splits_unrelated_adults(): person = _person_fixture( A_AGE=[45, 22], A_EXPRRP=[1, 5], PEPAR1=[-1, 1], + TAX_ID=[7, 7], WSAL_VAL=[70_000, 10_000], ) - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].nunique() == 2 - assert _decoded_roles(assignments) == ["HEAD", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["SINGLE", "SINGLE"] - - -def test_construct_tax_units_assigns_child_to_higher_income_separated_parent(): - person = _person_fixture( - A_AGE=[40, 38, 10], - A_MARITL=[6, 6, 7], - A_EXPRRP=[1, 13, 5], - PEPAR1=[-1, -1, 1], - PEPAR2=[-1, -1, 2], - WSAL_VAL=[50_000, 20_000, 0], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].nunique() == 2 - assert _decoded_roles(assignments) == ["HEAD", "HEAD", "DEPENDENT"] - child_unit = assignments.loc[2, "TAX_ID"] - assert child_unit == assignments.loc[0, "TAX_ID"] - assert sorted(_decoded_statuses(tax_unit)) == ["HEAD_OF_HOUSEHOLD", "SEPARATE"] - - -def test_construct_tax_units_can_roll_child_of_claimed_adult_up_to_grandparent(): - person = _person_fixture( - A_AGE=[70, 22, 4], - A_EXPRRP=[1, 5, 7], - PEPAR1=[-1, 1, 2], - A_ENRLW=[0, 1, 0], - A_FTPT=[0, 1, 0], - WSAL_VAL=[40_000, 2_000, 0], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].nunique() == 1 - assert _decoded_roles(assignments) == ["HEAD", "DEPENDENT", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["HEAD_OF_HOUSEHOLD"] - - -def test_construct_tax_units_handles_nonconsecutive_person_index(): - person = _person_fixture( - A_AGE=[40, 10], - A_EXPRRP=[1, 5], - PEPAR1=[-1, 1], - WSAL_VAL=[50_000, 0], - ) - person.index = [10, 20] - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments.index.tolist() == [10, 20] - assert assignments["TAX_ID"].tolist() == [1, 1] - assert _decoded_roles(assignments) == ["HEAD", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["HEAD_OF_HOUSEHOLD"] - - -def test_construct_tax_units_handles_duplicate_person_index_labels(): - person = _person_fixture( - PH_SEQ=[1, 2], - A_LINENO=[1, 1], - A_AGE=[40, 30], - A_EXPRRP=[1, 1], - WSAL_VAL=[50_000, 45_000], - ) - person.index = [0, 0] - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments.index.tolist() == [0, 0] - assert assignments["TAX_ID"].tolist() == [1, 2] - assert _decoded_roles(assignments) == ["HEAD", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["SINGLE", "SINGLE"] - - -def test_construct_tax_units_preserves_original_order_for_interleaved_households(): - person = _person_fixture( - PH_SEQ=[1, 2, 1, 2], - A_LINENO=[1, 1, 2, 2], - A_AGE=[40, 32, 8, 29], - A_EXPRRP=[1, 1, 5, 13], - PEPAR1=[-1, -1, 1, -1], - WSAL_VAL=[50_000, 45_000, 0, 35_000], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) + tax_unit_df = CensusCPS_2024()._create_tax_unit_table(person) - assert assignments["TAX_ID"].tolist() == [1, 2, 1, 3] - assert _decoded_roles(assignments) == ["HEAD", "HEAD", "DEPENDENT", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == [ - "HEAD_OF_HOUSEHOLD", - "SINGLE", - "SINGLE", - ] + # A high-income adult child cannot be claimed and forms an independent unit. + assert person["TAX_ID"].tolist() == [1, 2] + assert sorted(tax_unit_df["TAX_ID"].tolist()) == [1, 2] -def test_construct_tax_units_allows_missing_optional_evidence_columns(): +def test_create_tax_unit_table_respects_dataset_year(): + # 2024 dependent gross income limit is $5,050: $5,000 of income keeps the + # under-19 child claimable, exercising the year passed through to microunit. person = _person_fixture( - A_AGE=[40, 10], + A_AGE=[45, 17], A_EXPRRP=[1, 5], PEPAR1=[-1, 1], - ).drop( - columns=[ - "A_ENRLW", - "A_FTPT", - "A_HSCOL", - "PTOTVAL", - "PEDISDRS", - "PEDISEAR", - "PEDISEYE", - "PEDISOUT", - "PEDISPHY", - "PEDISREM", - ] - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].tolist() == [1, 1] - assert _decoded_roles(assignments) == ["HEAD", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["HEAD_OF_HOUSEHOLD"] - - -def test_construct_tax_units_collapses_transitive_adult_claim_chains(): - person = _person_fixture( - A_AGE=[46, 69, 43], - A_MARITL=[5, 5, 7], - A_EXPRRP=[1, 10, 12], - PEPAR1=[-1, -1, 2], - WSAL_VAL=[0, 0, 0], - SEMP_VAL=[120_000, 0, 0], - A_ENRLW=[0, 0, 0], - A_FTPT=[0, 0, 0], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].tolist() == [1, 2, 3] - assert _decoded_roles(assignments) == ["HEAD", "HEAD", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["SINGLE", "SINGLE", "SINGLE"] - - -def test_construct_tax_units_prevents_mutual_adult_claim_cycles(): - person = _person_fixture( - A_AGE=[39, 75, 42], - A_MARITL=[7, 5, 7], - A_EXPRRP=[1, 8, 13], - PEPAR1=[2, -1, -1], - PECOHAB=[3, -1, 1], - WSAL_VAL=[0, 0, 40_000], - INT_VAL=[13, 3, 3], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].tolist() == [1, 2, 3] - assert _decoded_roles(assignments) == ["HEAD", "HEAD", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["SINGLE", "SINGLE", "SINGLE"] - - -def test_construct_tax_units_does_not_claim_adult_child_with_children(): - person = _person_fixture( - A_AGE=[70, 42, 11], - A_EXPRRP=[1, 5, 7], - PEPAR1=[-1, 1, 2], - WSAL_VAL=[23_000, 0, 0], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].tolist() == [1, 2, 2] - assert _decoded_roles(assignments) == ["HEAD", "HEAD", "DEPENDENT"] - assert sorted(_decoded_statuses(tax_unit)) == ["HEAD_OF_HOUSEHOLD", "SINGLE"] - - -def test_construct_tax_units_keeps_older_grandchild_without_parent_pointer_separate(): - person = _person_fixture( - A_AGE=[64, 58, 16], - A_MARITL=[1, 1, 7], - A_SPOUSE=[2, 1, 0], - A_EXPRRP=[1, 4, 7], - WSAL_VAL=[0, 9_000, 0], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].tolist() == [1, 1, 2] - assert _decoded_roles(assignments) == ["HEAD", "SPOUSE", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["JOINT", "SINGLE"] - - -def test_construct_tax_units_claims_younger_grandchild_without_parent_pointer(): - person = _person_fixture( - A_AGE=[64, 58, 12], - A_MARITL=[1, 1, 7], - A_SPOUSE=[2, 1, 0], - A_EXPRRP=[1, 4, 7], - WSAL_VAL=[0, 9_000, 0], - ) - - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].tolist() == [1, 1, 1] - assert _decoded_roles(assignments) == ["HEAD", "SPOUSE", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["JOINT"] - - -def test_construct_tax_units_claims_under15_nonrelative_without_parent_pointer(): - person = _person_fixture( - A_AGE=[40, 12], - A_EXPRRP=[1, 14], - WSAL_VAL=[50_000, 0], + TAX_ID=[3, 3], + WSAL_VAL=[70_000, 5_000], ) - assignments, tax_unit = construct_tax_units(person, year=2024) - - assert assignments["TAX_ID"].tolist() == [1, 1] - assert _decoded_roles(assignments) == ["HEAD", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["SINGLE"] - - -def test_census_documented_claims_under15_without_parent_pointer_to_main_unit(): - person = _person_fixture( - A_AGE=[40, 12], - A_EXPRRP=[1, 14], - WSAL_VAL=[50_000, 0], - PTOTVAL=[50_000, 0], - ) - - assignments, tax_unit = construct_tax_units( - person, - year=2024, - mode="census_documented", - ) - - assert assignments["TAX_ID"].tolist() == [1, 1] - assert _decoded_roles(assignments) == ["HEAD", "DEPENDENT"] - assert _decoded_statuses(tax_unit) == ["HEAD_OF_HOUSEHOLD"] - - -def test_census_documented_leaves_age15_without_parent_pointer_independent(): - person = _person_fixture( - A_AGE=[40, 15], - A_EXPRRP=[1, 14], - WSAL_VAL=[50_000, 0], - PTOTVAL=[50_000, 0], - ) - - assignments, tax_unit = construct_tax_units( - person, - year=2024, - mode="census_documented", - ) - - assert assignments["TAX_ID"].tolist() == [1, 2] - assert _decoded_roles(assignments) == ["HEAD", "HEAD"] - assert sorted(_decoded_statuses(tax_unit)) == ["SINGLE", "SINGLE"] - - -def test_census_documented_uses_total_money_income_for_split_parents(): - person = _person_fixture( - A_AGE=[40, 38, 10], - A_MARITL=[7, 7, 7], - A_EXPRRP=[1, 13, 5], - PEPAR1=[-1, -1, 1], - PEPAR2=[-1, -1, 2], - WSAL_VAL=[0, 50_000, 0], - PTOTVAL=[30_000, 20_000, 0], - ) - - assignments, tax_unit = construct_tax_units( - person, - year=2024, - mode="census_documented", - ) - - assert assignments["TAX_ID"].tolist() == [1, 2, 1] - assert _decoded_roles(assignments) == ["HEAD", "HEAD", "DEPENDENT"] - assert sorted(_decoded_statuses(tax_unit)) == ["HEAD_OF_HOUSEHOLD", "SINGLE"] - - -def test_construct_tax_units_rejects_unknown_mode(): - person = _person_fixture(A_AGE=[40], A_EXPRRP=[1]) + tax_unit_df = CensusCPS_2024()._create_tax_unit_table(person) - try: - construct_tax_units(person, year=2024, mode="unknown") - except ValueError as error: - assert "Unsupported tax-unit construction mode" in str(error) - else: - raise AssertionError("Expected construct_tax_units to reject unknown modes") + assert person["TAX_ID"].tolist() == [1, 1] + assert tax_unit_df["TAX_ID"].tolist() == [1] diff --git a/uv.lock b/uv.lock index 66f1ed9d7..e52e758e9 100644 --- a/uv.lock +++ b/uv.lock @@ -1431,6 +1431,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/da/ed/274ec5f92ce49d367c09e0038cbb6adbe3c81941bf06b162cf9a2d8bdebd/microimpute-2.1.0-py3-none-any.whl", hash = "sha256:04463740c2091bbbe7552b9bd87bc3dd472902a3798b96b6c9924b8f9870c4dd", size = 127301, upload-time = "2026-05-21T17:53:53.212Z" }, ] +[[package]] +name = "microunit" +version = "0.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, + { name = "pandas" }, + { name = "pyyaml" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/58/c1/6a8a1a1f7e90e41295e813808f170c71f0d20d36c6203722fd682d0a3387/microunit-0.1.0.tar.gz", hash = "sha256:a1e90f525e0a1a3921a3ed62ce291620bd45242f829cbd7892253dfff307eeb3", size = 21638, upload-time = "2026-05-30T18:51:35.59Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f1/cf/a38de31d10b1029923daa7f9271a78c965deb94519627f6dd4d9c3fbf359/microunit-0.1.0-py3-none-any.whl", hash = "sha256:1652fd43b57fb6fc803089d0da0fc4d28948d9e7d5e742e3327afd376e0a3060", size = 23581, upload-time = "2026-05-30T18:51:34.376Z" }, +] + [[package]] name = "mistune" version = "3.2.0" @@ -2188,6 +2202,7 @@ dependencies = [ { name = "google-cloud-storage" }, { name = "microdf-python" }, { name = "microimpute" }, + { name = "microunit" }, { name = "openpyxl" }, { name = "pandas" }, { name = "pip-system-certs" }, @@ -2242,6 +2257,7 @@ requires-dist = [ { name = "l0-python", marker = "extra == 'l0'" }, { name = "microdf-python", specifier = ">=1.2.1" }, { name = "microimpute", specifier = ">=2.1.0" }, + { name = "microunit", specifier = ">=0.1.0" }, { name = "openpyxl", specifier = ">=3.1.5" }, { name = "pandas", specifier = ">=2.3.1" }, { name = "pip-system-certs", specifier = ">=3.0" }, diff --git a/validation/cps_tax_unit_outcome_validation.py b/validation/cps_tax_unit_outcome_validation.py index 5feec24c3..100ccd165 100644 --- a/validation/cps_tax_unit_outcome_validation.py +++ b/validation/cps_tax_unit_outcome_validation.py @@ -19,7 +19,7 @@ import pandas as pd from policyengine_core.data import Dataset -from policyengine_us_data.datasets.cps.tax_unit_construction import construct_tax_units +from microunit import construct_tax_units from policyengine_us_data.utils.soi import ( compare_soi_replication_to_soi, get_soi, diff --git a/validation/cps_tax_unit_validation.py b/validation/cps_tax_unit_validation.py index e4d7c0c9b..10e26fc0e 100644 --- a/validation/cps_tax_unit_validation.py +++ b/validation/cps_tax_unit_validation.py @@ -14,13 +14,11 @@ import pandas as pd -from policyengine_us_data.datasets.cps.tax_unit_construction import ( +from microunit import ( POLICYENGINE_MODE, SUPPORTED_TAX_UNIT_CONSTRUCTION_MODES, - construct_tax_units, -) -from policyengine_us_data.datasets.cps.tax_unit_rule_helpers import ( CPSRelationshipCode, + construct_tax_units, qualifying_child_age_test, )