From e0aa12c941a2bfe332eab1b0a16ed1d5d721fe7d Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Fri, 27 Mar 2026 09:58:07 -0400 Subject: [PATCH 1/9] Donor-impute CPS clone features --- .../impute-cps-clone-features.changed.md | 1 + .../datasets/cps/extended_cps.py | 285 +++++++++++++++++- .../tests/test_extended_cps.py | 168 +++++++++++ 3 files changed, 439 insertions(+), 15 deletions(-) create mode 100644 changelog.d/impute-cps-clone-features.changed.md diff --git a/changelog.d/impute-cps-clone-features.changed.md b/changelog.d/impute-cps-clone-features.changed.md new file mode 100644 index 000000000..981900a05 --- /dev/null +++ b/changelog.d/impute-cps-clone-features.changed.md @@ -0,0 +1 @@ +Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup and overtime-eligibility inputs better align with PUF-imputed incomes. diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index ca5c38afd..0bdfca947 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -26,15 +26,78 @@ logger = logging.getLogger(__name__) - def _supports_structural_mortgage_inputs() -> bool: return has_policyengine_us_variables(*STRUCTURAL_MORTGAGE_VARIABLES) +# CPS-only categorical features to donor-impute onto the PUF clone half. +# These drive subgroup analysis and occupation-based logic, so naive donor +# duplication dilutes the relationship between the clone's PUF-imputed +# income and its CPS-side demographic/occupation labels. +CPS_CLONE_FEATURE_VARIABLES = [ + "is_male", + "cps_race", + "is_hispanic", + "detailed_occupation_recode", +] + +# Predictors used to rematch CPS features onto the PUF clone half. +# These are all available on the CPS half and on the doubled extended CPS. +CPS_CLONE_FEATURE_PREDICTORS = [ + "age", + "state_fips", + "tax_unit_is_joint", + "tax_unit_count_dependents", + "is_tax_unit_head", + "is_tax_unit_spouse", + "is_tax_unit_dependent", + "employment_income", + "self_employment_income", + "social_security", +] + +_OVERTIME_OCCUPATION_CODES = { + "has_never_worked": 53, + "is_military": 52, + "is_computer_scientist": 8, + "is_farmer_fisher": 41, +} +_EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES = np.array( + [ + 1, + 2, + 3, + 5, + 7, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 18, + 19, + 20, + 21, + 22, + 24, + 25, + 27, + 28, + 29, + 30, + 32, + 33, + 34, + ], + dtype=np.int16, +) # CPS-only variables that should be QRF-imputed for the PUF clone half -# instead of naively duplicated from the CPS donor. These are -# income-correlated variables that exist only in the CPS; demographics, -# IDs, weights, and random seeds are fine to duplicate. +# instead of naively duplicated from the CPS donor. Most demographics, +# IDs, weights, and random seeds are fine to duplicate; the categorical +# clone features above are rematched separately. CPS_ONLY_IMPUTED_VARIABLES = [ # Retirement distributions "taxable_401k_distributions", @@ -117,6 +180,186 @@ def _supports_structural_mortgage_inputs() -> bool: ] +def _clone_half_person_values(data: dict, variable: str, time_period: int): + """Return clone-half values for ``variable`` mapped to person rows.""" + if variable not in data: + return None + + values = data[variable][time_period] + n_persons = len(data["person_id"][time_period]) + n_persons_half = n_persons // 2 + if len(values) == n_persons: + return np.asarray(values[n_persons_half:]) + + entity_mappings = [ + ("household_id", "person_household_id"), + ("tax_unit_id", "person_tax_unit_id"), + ("spm_unit_id", "person_spm_unit_id"), + ("family_id", "person_family_id"), + ] + for entity_id_var, person_entity_id_var in entity_mappings: + if entity_id_var not in data or person_entity_id_var not in data: + continue + entity_ids = data[entity_id_var][time_period] + if len(values) != len(entity_ids): + continue + entity_half = len(entity_ids) // 2 + clone_entity_ids = entity_ids[entity_half:] + clone_person_entity_ids = data[person_entity_id_var][time_period][n_persons_half:] + value_map = dict(zip(clone_entity_ids, values[entity_half:])) + return np.array([value_map[idx] for idx in clone_person_entity_ids]) + + return None + + +def _build_clone_test_frame( + cps_sim, + data: dict, + time_period: int, + predictors: list[str], +) -> pd.DataFrame: + """Build clone-half predictor data with available doubled-dataset overrides.""" + X_test = cps_sim.calculate_dataframe(predictors).copy() + for predictor in predictors: + clone_values = _clone_half_person_values(data, predictor, time_period) + if clone_values is not None and len(clone_values) == len(X_test): + X_test[predictor] = clone_values + return X_test[predictors] + + +def _prepare_knn_matrix( + df: pd.DataFrame, + reference: pd.DataFrame | None = None, +) -> np.ndarray: + """Normalise mixed-scale donor-matching predictors for kNN.""" + X = df.astype(float).copy() + for income_var in CPS_STAGE2_INCOME_PREDICTORS: + if income_var in X: + X[income_var] = np.arcsinh(X[income_var]) + + ref = X if reference is None else reference.astype(float).copy() + for income_var in CPS_STAGE2_INCOME_PREDICTORS: + if income_var in ref: + ref[income_var] = np.arcsinh(ref[income_var]) + + means = ref.mean() + stds = ref.std(ddof=0).replace(0, 1) + normalised = (X - means) / stds + return np.nan_to_num(normalised.to_numpy(dtype=np.float32), nan=0.0) + + +def _derive_overtime_occupation_inputs( + occupation_codes: np.ndarray, +) -> pd.DataFrame: + """Derive occupation-based overtime-exemption inputs from POCCU2.""" + occupation_codes = np.rint(occupation_codes).astype(np.int16, copy=False) + derived = { + name: occupation_codes == code + for name, code in _OVERTIME_OCCUPATION_CODES.items() + } + derived["is_executive_administrative_professional"] = np.isin( + occupation_codes, + _EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES, + ) + return pd.DataFrame(derived) + + +def _impute_clone_cps_features( + data: dict, + time_period: int, + dataset_path: str, +) -> pd.DataFrame: + """Rematch CPS demographic/occupation features for the clone half.""" + from policyengine_us import Microsimulation + from sklearn.neighbors import NearestNeighbors + + cps_sim = Microsimulation(dataset=dataset_path) + X_train = cps_sim.calculate_dataframe( + CPS_CLONE_FEATURE_PREDICTORS + CPS_CLONE_FEATURE_VARIABLES + ) + available_outputs = [ + variable for variable in CPS_CLONE_FEATURE_VARIABLES if variable in X_train.columns + ] + if not available_outputs: + n_half = len(data["person_id"][time_period]) // 2 + return pd.DataFrame(index=np.arange(n_half)) + + X_test = _build_clone_test_frame( + cps_sim, + data, + time_period, + CPS_CLONE_FEATURE_PREDICTORS, + ) + del cps_sim + + train_roles = ( + X_train[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]] + .round() + .astype(int) + .apply(tuple, axis=1) + ) + test_roles = ( + X_test[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]] + .round() + .astype(int) + .apply(tuple, axis=1) + ) + + predictions = pd.DataFrame(index=X_test.index, columns=available_outputs) + for role in test_roles.unique(): + test_mask = test_roles == role + train_mask = train_roles == role + if not train_mask.any(): + train_mask = pd.Series(True, index=X_train.index) + + train_predictors = X_train.loc[train_mask, CPS_CLONE_FEATURE_PREDICTORS] + test_predictors = X_test.loc[test_mask, CPS_CLONE_FEATURE_PREDICTORS] + train_matrix = _prepare_knn_matrix(train_predictors) + test_matrix = _prepare_knn_matrix(test_predictors, reference=train_predictors) + + matcher = NearestNeighbors(n_neighbors=1) + matcher.fit(train_matrix) + donor_indices = matcher.kneighbors( + test_matrix, + return_distance=False, + ).ravel() + donor_outputs = ( + X_train.loc[train_mask, available_outputs] + .iloc[donor_indices] + .reset_index(drop=True) + ) + predictions.loc[test_mask, available_outputs] = donor_outputs.to_numpy() + + if "detailed_occupation_recode" in predictions: + occupation_codes = predictions["detailed_occupation_recode"].astype(float).to_numpy() + for column, values in _derive_overtime_occupation_inputs(occupation_codes).items(): + predictions[column] = values + + return predictions + + +def _splice_clone_feature_predictions( + data: dict, + predictions: pd.DataFrame, + time_period: int, +) -> dict: + """Replace clone-half person-level feature variables with donor matches.""" + n_half = len(data["person_id"][time_period]) // 2 + for variable in predictions.columns: + if variable not in data: + continue + values = data[variable][time_period] + new_values = np.array(values, copy=True) + pred_values = predictions[variable].to_numpy() + if np.issubdtype(new_values.dtype, np.bool_): + pred_values = pred_values.astype(bool, copy=False) + else: + pred_values = pred_values.astype(new_values.dtype, copy=False) + new_values[n_half:] = pred_values + data[variable] = {time_period: new_values} + return data + + def _impute_cps_only_variables( data: dict, time_period: int, @@ -180,17 +423,16 @@ def _impute_cps_only_variables( missing_outputs, ) - # Build PUF clone test data: demographics from CPS sim (PUF clones - # share demographics with their CPS donors), income from the - # PUF-imputed values in the second half of the doubled data. - n_persons_half = len(data["person_id"][time_period]) // 2 - X_test = cps_sim.calculate_dataframe(CPS_STAGE2_DEMOGRAPHIC_PREDICTORS) + # Build PUF clone test data from the clone half itself, falling back to + # the CPS sim for formula variables that are not stored in the dataset. + X_test = _build_clone_test_frame( + cps_sim, + data, + time_period, + all_predictors, + ) del cps_sim - for var in CPS_STAGE2_INCOME_PREDICTORS: - # Income comes from PUF imputation in the second half. - X_test[var] = data[var][time_period][n_persons_half:] - logger.info( "Stage-2 CPS-only imputation: %d outputs, " "training on %d CPS persons, predicting for %d PUF clones", @@ -468,11 +710,24 @@ def generate(self): dataset_path=str(self.cps.file_path), ) - # Stage 2: QRF-impute CPS-only variables for PUF clones. + # Stage 2a: donor-impute CPS feature variables for PUF clones. + logger.info("Stage-2a: rematching CPS features for PUF clones") + clone_feature_predictions = _impute_clone_cps_features( + data=new_data, + time_period=self.time_period, + dataset_path=str(self.cps.file_path), + ) + new_data = _splice_clone_feature_predictions( + data=new_data, + predictions=clone_feature_predictions, + time_period=self.time_period, + ) + + # Stage 2b: QRF-impute CPS-only continuous variables for PUF clones. # Train on CPS data using demographics + PUF-imputed income # as predictors, so the PUF clone half gets values consistent # with its imputed income rather than naive donor duplication. - logger.info("Stage-2: imputing CPS-only variables for PUF clones") + logger.info("Stage-2b: imputing CPS-only variables for PUF clones") cps_only_predictions = _impute_cps_only_variables( data=new_data, time_period=self.time_period, diff --git a/policyengine_us_data/tests/test_extended_cps.py b/policyengine_us_data/tests/test_extended_cps.py index a44977bdc..0ea153d93 100644 --- a/policyengine_us_data/tests/test_extended_cps.py +++ b/policyengine_us_data/tests/test_extended_cps.py @@ -16,8 +16,13 @@ OVERRIDDEN_IMPUTED_VARIABLES, ) from policyengine_us_data.datasets.cps.extended_cps import ( + CPS_CLONE_FEATURE_VARIABLES, CPS_ONLY_IMPUTED_VARIABLES, + CPS_CLONE_FEATURE_PREDICTORS, CPS_STAGE2_INCOME_PREDICTORS, + _build_clone_test_frame, + _derive_overtime_occupation_inputs, + _impute_clone_cps_features, apply_retirement_constraints, reconcile_ss_subcomponents, ) @@ -36,6 +41,12 @@ def test_no_overlap_overridden_and_cps_only(self): overlap = set(OVERRIDDEN_IMPUTED_VARIABLES) & set(CPS_ONLY_IMPUTED_VARIABLES) assert overlap == set(), f"Variables in both OVERRIDDEN and CPS_ONLY: {overlap}" + def test_no_overlap_clone_features_and_cps_only(self): + overlap = set(CPS_CLONE_FEATURE_VARIABLES) & set(CPS_ONLY_IMPUTED_VARIABLES) + assert overlap == set(), ( + f"Variables in both clone-feature and CPS_ONLY lists: {overlap}" + ) + def test_overridden_is_subset_of_imputed(self): not_in_imputed = set(OVERRIDDEN_IMPUTED_VARIABLES) - set(IMPUTED_VARIABLES) assert not_in_imputed == set(), ( @@ -328,3 +339,160 @@ def test_single_call_vs_separate_calls_differ(self, correlated_training_data): assert corr_seq > corr_indep, ( f"Sequential corr ({corr_seq:.3f}) should exceed independent corr ({corr_indep:.3f})" ) + + +class TestCloneFeatureImputation: + def test_build_clone_test_frame_overrides_person_and_household_features(self): + class FakeMicrosimulation: + def calculate_dataframe(self, columns): + base = pd.DataFrame( + { + "age": [30, 40], + "state_fips": [6, 36], + "tax_unit_is_joint": [0, 1], + "tax_unit_count_dependents": [0, 2], + "is_tax_unit_head": [1, 1], + "is_tax_unit_spouse": [0, 0], + "is_tax_unit_dependent": [0, 0], + "employment_income": [20_000, 35_000], + "self_employment_income": [0, 0], + "social_security": [0, 0], + } + ) + return base[columns] + + tp = 2024 + data = { + "person_id": {tp: np.array([1, 2, 101, 102])}, + "household_id": {tp: np.array([1, 2, 101, 102])}, + "person_household_id": {tp: np.array([1, 2, 101, 102])}, + "age": {tp: np.array([30, 40, 50, 60], dtype=np.float32)}, + "employment_income": { + tp: np.array([20_000, 35_000, 90_000, 150_000], dtype=np.float32) + }, + "self_employment_income": { + tp: np.zeros(4, dtype=np.float32) + }, + "social_security": {tp: np.zeros(4, dtype=np.float32)}, + "is_tax_unit_head": {tp: np.ones(4, dtype=bool)}, + "is_tax_unit_spouse": {tp: np.zeros(4, dtype=bool)}, + "is_tax_unit_dependent": {tp: np.zeros(4, dtype=bool)}, + "state_fips": {tp: np.array([6, 36, 12, 48], dtype=np.int16)}, + } + + result = _build_clone_test_frame( + FakeMicrosimulation(), + data, + tp, + CPS_CLONE_FEATURE_PREDICTORS, + ) + + assert result["age"].tolist() == [50, 60] + assert result["employment_income"].tolist() == [90_000, 150_000] + assert result["state_fips"].tolist() == [12, 48] + assert result["tax_unit_is_joint"].tolist() == [0, 1] + + def test_derive_overtime_occupation_inputs(self): + derived = _derive_overtime_occupation_inputs( + np.array([53, 52, 8, 41, 1, 99]) + ) + + assert derived["has_never_worked"].tolist() == [ + True, + False, + False, + False, + False, + False, + ] + assert derived["is_military"].tolist() == [ + False, + True, + False, + False, + False, + False, + ] + assert derived["is_computer_scientist"].tolist() == [ + False, + False, + True, + False, + False, + False, + ] + assert derived["is_farmer_fisher"].tolist() == [ + False, + False, + False, + True, + False, + False, + ] + assert derived["is_executive_administrative_professional"].tolist() == [ + False, + False, + False, + False, + True, + False, + ] + + def test_clone_feature_imputation_rematches_outputs_and_derives_flags( + self, monkeypatch + ): + import policyengine_us + + train = pd.DataFrame( + { + "age": [45, 17], + "state_fips": [1, 1], + "tax_unit_is_joint": [0, 0], + "tax_unit_count_dependents": [0, 1], + "is_tax_unit_head": [1, 0], + "is_tax_unit_spouse": [0, 0], + "is_tax_unit_dependent": [0, 1], + "employment_income": [95_000, 0], + "self_employment_income": [0, 0], + "social_security": [0, 0], + "is_male": [1, 0], + "cps_race": [2, 1], + "is_hispanic": [0, 1], + "detailed_occupation_recode": [8, 41], + } + ) + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = dataset + + def calculate_dataframe(self, columns): + return train[columns] + + monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) + + tp = 2024 + data = { + "person_id": {tp: np.array([1, 2, 101, 102])}, + "household_id": {tp: np.array([1, 2, 101, 102])}, + "person_household_id": {tp: np.array([1, 2, 101, 102])}, + "age": {tp: np.array([45, 17, 46, 17], dtype=np.float32)}, + "state_fips": {tp: np.array([1, 1, 1, 1], dtype=np.int16)}, + "tax_unit_is_joint": {tp: np.zeros(4, dtype=np.float32)}, + "tax_unit_count_dependents": {tp: np.array([0, 1, 0, 1], dtype=np.float32)}, + "is_tax_unit_head": {tp: np.array([1, 0, 1, 0], dtype=bool)}, + "is_tax_unit_spouse": {tp: np.zeros(4, dtype=bool)}, + "is_tax_unit_dependent": {tp: np.array([0, 1, 0, 1], dtype=bool)}, + "employment_income": {tp: np.array([95_000, 0, 97_000, 0], dtype=np.float32)}, + "self_employment_income": {tp: np.zeros(4, dtype=np.float32)}, + "social_security": {tp: np.zeros(4, dtype=np.float32)}, + } + + result = _impute_clone_cps_features(data, tp, "unused") + + assert result["detailed_occupation_recode"].tolist() == [8, 41] + assert result["is_male"].tolist() == [1, 0] + assert result["cps_race"].tolist() == [2, 1] + assert result["is_hispanic"].tolist() == [0, 1] + assert result["is_computer_scientist"].tolist() == [True, False] + assert result["is_farmer_fisher"].tolist() == [False, True] From e5d528ba933f487659011f7f1f502506160b1f5e Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 28 Mar 2026 09:16:15 -0400 Subject: [PATCH 2/9] Trigger upstream PR CI From 06f6df2f9e3fa9cf03a18c482be23a0a51a365b0 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 28 Mar 2026 09:18:22 -0400 Subject: [PATCH 3/9] Refresh changelog fragment wording --- changelog.d/impute-cps-clone-features.changed.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/impute-cps-clone-features.changed.md b/changelog.d/impute-cps-clone-features.changed.md index 981900a05..47883bf2b 100644 --- a/changelog.d/impute-cps-clone-features.changed.md +++ b/changelog.d/impute-cps-clone-features.changed.md @@ -1 +1 @@ -Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup and overtime-eligibility inputs better align with PUF-imputed incomes. +Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup analyses and overtime-eligibility inputs better align with PUF-imputed incomes. From c827028651b3778559e35fbb7755c84033600508 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 28 Mar 2026 09:37:07 -0400 Subject: [PATCH 4/9] Format rebased CPS clone imputation changes --- .../datasets/cps/extended_cps.py | 18 ++++++++++++++---- .../tests/test_extended_cps.py | 12 +++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 0bdfca947..8c4881713 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -26,9 +26,11 @@ logger = logging.getLogger(__name__) + def _supports_structural_mortgage_inputs() -> bool: return has_policyengine_us_variables(*STRUCTURAL_MORTGAGE_VARIABLES) + # CPS-only categorical features to donor-impute onto the PUF clone half. # These drive subgroup analysis and occupation-based logic, so naive donor # duplication dilutes the relationship between the clone's PUF-imputed @@ -205,7 +207,9 @@ def _clone_half_person_values(data: dict, variable: str, time_period: int): continue entity_half = len(entity_ids) // 2 clone_entity_ids = entity_ids[entity_half:] - clone_person_entity_ids = data[person_entity_id_var][time_period][n_persons_half:] + clone_person_entity_ids = data[person_entity_id_var][time_period][ + n_persons_half: + ] value_map = dict(zip(clone_entity_ids, values[entity_half:])) return np.array([value_map[idx] for idx in clone_person_entity_ids]) @@ -278,7 +282,9 @@ def _impute_clone_cps_features( CPS_CLONE_FEATURE_PREDICTORS + CPS_CLONE_FEATURE_VARIABLES ) available_outputs = [ - variable for variable in CPS_CLONE_FEATURE_VARIABLES if variable in X_train.columns + variable + for variable in CPS_CLONE_FEATURE_VARIABLES + if variable in X_train.columns ] if not available_outputs: n_half = len(data["person_id"][time_period]) // 2 @@ -331,8 +337,12 @@ def _impute_clone_cps_features( predictions.loc[test_mask, available_outputs] = donor_outputs.to_numpy() if "detailed_occupation_recode" in predictions: - occupation_codes = predictions["detailed_occupation_recode"].astype(float).to_numpy() - for column, values in _derive_overtime_occupation_inputs(occupation_codes).items(): + occupation_codes = ( + predictions["detailed_occupation_recode"].astype(float).to_numpy() + ) + for column, values in _derive_overtime_occupation_inputs( + occupation_codes + ).items(): predictions[column] = values return predictions diff --git a/policyengine_us_data/tests/test_extended_cps.py b/policyengine_us_data/tests/test_extended_cps.py index 0ea153d93..91d2abfa7 100644 --- a/policyengine_us_data/tests/test_extended_cps.py +++ b/policyengine_us_data/tests/test_extended_cps.py @@ -370,9 +370,7 @@ def calculate_dataframe(self, columns): "employment_income": { tp: np.array([20_000, 35_000, 90_000, 150_000], dtype=np.float32) }, - "self_employment_income": { - tp: np.zeros(4, dtype=np.float32) - }, + "self_employment_income": {tp: np.zeros(4, dtype=np.float32)}, "social_security": {tp: np.zeros(4, dtype=np.float32)}, "is_tax_unit_head": {tp: np.ones(4, dtype=bool)}, "is_tax_unit_spouse": {tp: np.zeros(4, dtype=bool)}, @@ -393,9 +391,7 @@ def calculate_dataframe(self, columns): assert result["tax_unit_is_joint"].tolist() == [0, 1] def test_derive_overtime_occupation_inputs(self): - derived = _derive_overtime_occupation_inputs( - np.array([53, 52, 8, 41, 1, 99]) - ) + derived = _derive_overtime_occupation_inputs(np.array([53, 52, 8, 41, 1, 99])) assert derived["has_never_worked"].tolist() == [ True, @@ -483,7 +479,9 @@ def calculate_dataframe(self, columns): "is_tax_unit_head": {tp: np.array([1, 0, 1, 0], dtype=bool)}, "is_tax_unit_spouse": {tp: np.zeros(4, dtype=bool)}, "is_tax_unit_dependent": {tp: np.array([0, 1, 0, 1], dtype=bool)}, - "employment_income": {tp: np.array([95_000, 0, 97_000, 0], dtype=np.float32)}, + "employment_income": { + tp: np.array([95_000, 0, 97_000, 0], dtype=np.float32) + }, "self_employment_income": {tp: np.zeros(4, dtype=np.float32)}, "social_security": {tp: np.zeros(4, dtype=np.float32)}, } From b32dbeecf201c9f3f9b4c909b99f6c1420221944 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sun, 29 Mar 2026 07:07:44 -0400 Subject: [PATCH 5/9] Add Treasury tipped occupation codes to CPS data --- policyengine_us_data/datasets/cps/cps.py | 6 ++ .../datasets/cps/extended_cps.py | 2 + .../datasets/cps/tipped_occupation.py | 84 +++++++++++++++++++ .../tests/test_extended_cps.py | 15 ++++ 4 files changed, 107 insertions(+) create mode 100644 policyengine_us_data/datasets/cps/tipped_occupation.py diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 148e83e4c..5503913ec 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -25,6 +25,9 @@ predict_org_features, ) from policyengine_us_data.utils.randomness import seeded_rng +from policyengine_us_data.datasets.cps.tipped_occupation import ( + derive_treasury_tipped_occupation_code, +) class CPS(Dataset): @@ -487,6 +490,9 @@ def children_per_parent(col: str) -> pd.DataFrame: cps["is_full_time_college_student"] = person.A_HSCOL == 2 cps["detailed_occupation_recode"] = person.POCCU2 + cps["treasury_tipped_occupation_code"] = derive_treasury_tipped_occupation_code( + person.PEIOOCC + ) add_overtime_occupation(cps, person) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 8c4881713..7cf4be11e 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -41,6 +41,8 @@ def _supports_structural_mortgage_inputs() -> bool: "is_hispanic", "detailed_occupation_recode", ] +if has_policyengine_us_variables("treasury_tipped_occupation_code"): + CPS_CLONE_FEATURE_VARIABLES.append("treasury_tipped_occupation_code") # Predictors used to rematch CPS features onto the PUF clone half. # These are all available on the CPS half and on the doubled extended CPS. diff --git a/policyengine_us_data/datasets/cps/tipped_occupation.py b/policyengine_us_data/datasets/cps/tipped_occupation.py new file mode 100644 index 000000000..76d523529 --- /dev/null +++ b/policyengine_us_data/datasets/cps/tipped_occupation.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import numpy as np +import pandas as pd + +# Derived by joining: +# 1. Treasury Tipped Occupation Codes (TTOCs) and related 2018 SOC codes from +# the IRS "Occupations that customarily and regularly received tips on or +# before December 31, 2024" list / IRB 2025-42. +# 2. The Census Bureau 2018 occupation code list crosswalk from 2018 Census +# occupation code to 2018 SOC code. +# +# A few IRS SOC entries correspond to multiple TTOCs. For those collisions we +# pick one representative TTOC because the current policyengine-us logic only +# needs to distinguish listed occupations (TTOC > 0) from unlisted ones. The +# more detailed approximation work belongs here in policyengine-us-data, not in +# policyengine-us. +CENSUS_OCCUPATION_CODE_TO_TTOC = { + 725: 502, + 2350: 507, + 2633: 502, + 2752: 206, + 2755: 207, + 2770: 208, + 2910: 503, + 3602: 501, + 3630: 602, + 4000: 105, + 4010: 106, + 4030: 106, + 4040: 101, + 4055: 107, + 4110: 102, + 4120: 103, + 4130: 104, + 4140: 108, + 4150: 109, + 4160: 106, + 4230: 304, + 4251: 402, + 4350: 506, + 4420: 210, + 4500: 603, + 4510: 603, + 4521: 605, + 4522: 601, + 4600: 508, + 4621: 607, + 4655: 501, + 5130: 203, + 5300: 303, + 6355: 403, + 6442: 404, + 7120: 401, + 7200: 409, + 7315: 405, + 7320: 406, + 7340: 401, + 7540: 408, + 7610: 401, + 7800: 110, + 8510: 401, + 9122: 806, + 9141: 803, + 9142: 802, + 9350: 801, + 9610: 805, + 9620: 809, +} + + +def derive_treasury_tipped_occupation_code( + census_occupation_codes: pd.Series | np.ndarray, +) -> np.ndarray: + """Map CPS PEIOOCC detailed occupation codes to Treasury tipped codes.""" + + values = pd.Series(census_occupation_codes, copy=False) + values = pd.to_numeric(values, errors="coerce").fillna(-1).astype(int) + return ( + values.map(CENSUS_OCCUPATION_CODE_TO_TTOC) + .fillna(0) + .astype(np.int16) + .to_numpy() + ) diff --git a/policyengine_us_data/tests/test_extended_cps.py b/policyengine_us_data/tests/test_extended_cps.py index 91d2abfa7..e25b6d802 100644 --- a/policyengine_us_data/tests/test_extended_cps.py +++ b/policyengine_us_data/tests/test_extended_cps.py @@ -26,6 +26,9 @@ apply_retirement_constraints, reconcile_ss_subcomponents, ) +from policyengine_us_data.datasets.cps.tipped_occupation import ( + derive_treasury_tipped_occupation_code, +) from policyengine_us_data.datasets.org import ORG_IMPUTED_VARIABLES @@ -207,6 +210,15 @@ def test_se_pension_zeroed_without_se_income( ).all(), "SE pension should be zero without SE income" +class TestTreasuryTippedOccupationCode: + def test_derive_treasury_tipped_occupation_code(self): + derived = derive_treasury_tipped_occupation_code( + np.array([4040, 4110, 4230, 2770, -1, 9999]) + ) + + assert derived.tolist() == [101, 102, 304, 208, 0, 0] + + class TestSSReconciliation: """Post-processing SS normalization ensures sub-components sum to total.""" @@ -455,6 +467,7 @@ def test_clone_feature_imputation_rematches_outputs_and_derives_flags( "cps_race": [2, 1], "is_hispanic": [0, 1], "detailed_occupation_recode": [8, 41], + "treasury_tipped_occupation_code": [101, 304], } ) @@ -492,5 +505,7 @@ def calculate_dataframe(self, columns): assert result["is_male"].tolist() == [1, 0] assert result["cps_race"].tolist() == [2, 1] assert result["is_hispanic"].tolist() == [0, 1] + if "treasury_tipped_occupation_code" in result.columns: + assert result["treasury_tipped_occupation_code"].tolist() == [101, 304] assert result["is_computer_scientist"].tolist() == [True, False] assert result["is_farmer_fisher"].tolist() == [False, True] From 6e95a61a5297aec91e06a65921b2f5965b835542 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sun, 29 Mar 2026 07:09:52 -0400 Subject: [PATCH 6/9] Format tipped occupation crosswalk helper --- policyengine_us_data/datasets/cps/tipped_occupation.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/policyengine_us_data/datasets/cps/tipped_occupation.py b/policyengine_us_data/datasets/cps/tipped_occupation.py index 76d523529..74cc122b9 100644 --- a/policyengine_us_data/datasets/cps/tipped_occupation.py +++ b/policyengine_us_data/datasets/cps/tipped_occupation.py @@ -77,8 +77,5 @@ def derive_treasury_tipped_occupation_code( values = pd.Series(census_occupation_codes, copy=False) values = pd.to_numeric(values, errors="coerce").fillna(-1).astype(int) return ( - values.map(CENSUS_OCCUPATION_CODE_TO_TTOC) - .fillna(0) - .astype(np.int16) - .to_numpy() + values.map(CENSUS_OCCUPATION_CODE_TO_TTOC).fillna(0).astype(np.int16).to_numpy() ) From 2a71b0452c9f01fb12ef3da9bb7af44f13af2a2e Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sun, 29 Mar 2026 18:54:19 -0400 Subject: [PATCH 7/9] Use tipped occupation status in SIPP tip imputation --- .../calibration/source_impute.py | 22 ++++++++++++++ policyengine_us_data/datasets/cps/cps.py | 4 +++ .../datasets/cps/tipped_occupation.py | 29 ++++++++++++++++++ policyengine_us_data/datasets/sipp/sipp.py | 30 ++++++++++++++----- .../test_calibration/test_source_impute.py | 27 +++++++++++++++++ 5 files changed, 105 insertions(+), 7 deletions(-) diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index 883d68dc7..f3ea5d292 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -28,6 +28,10 @@ import numpy as np import pandas as pd +from policyengine_us_data.datasets.cps.tipped_occupation import ( + derive_any_treasury_tipped_occupation_code, + derive_is_tipped_occupation, +) from policyengine_us_data.datasets.org import ( ORG_BOOL_VARIABLES, @@ -80,6 +84,7 @@ "age", "count_under_18", "count_under_6", + "is_tipped_occupation", ] SIPP_ASSETS_PREDICTORS = [ @@ -109,6 +114,8 @@ "NONE": 0, } +SIPP_JOB_OCCUPATION_COLUMNS = [f"TJB{i}_OCC" for i in range(1, 8)] + def _encode_tenure_type(df: pd.DataFrame) -> pd.DataFrame: """Convert tenure_type enum strings to numeric codes.""" @@ -381,6 +388,14 @@ def _impute_sipp( sipp_df["age"] = sipp_df.TAGE sipp_df["household_weight"] = sipp_df.WPFINWGT sipp_df["household_id"] = sipp_df.SSUID + sipp_df["treasury_tipped_occupation_code"] = ( + derive_any_treasury_tipped_occupation_code( + sipp_df[SIPP_JOB_OCCUPATION_COLUMNS] + ) + ) + sipp_df["is_tipped_occupation"] = derive_is_tipped_occupation( + sipp_df["treasury_tipped_occupation_code"] + ) sipp_df["is_under_18"] = sipp_df.TAGE < 18 sipp_df["is_under_6"] = sipp_df.TAGE < 6 @@ -398,6 +413,7 @@ def _impute_sipp( "count_under_18", "count_under_6", "age", + "is_tipped_occupation", "household_weight", ] tip_train = sipp_df[tip_cols].dropna() @@ -428,6 +444,12 @@ def _impute_sipp( else: cps_tip_df["count_under_18"] = 0.0 cps_tip_df["count_under_6"] = 0.0 + if "treasury_tipped_occupation_code" in data: + cps_tip_df["is_tipped_occupation"] = derive_is_tipped_occupation( + data["treasury_tipped_occupation_code"][time_period] + ).astype(np.float32) + else: + cps_tip_df["is_tipped_occupation"] = 0.0 qrf = QRF() logger.info( diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 5503913ec..4f55348c9 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -27,6 +27,7 @@ from policyengine_us_data.utils.randomness import seeded_rng from policyengine_us_data.datasets.cps.tipped_occupation import ( derive_treasury_tipped_occupation_code, + derive_is_tipped_occupation, ) @@ -1808,6 +1809,9 @@ def add_tips(self, cps: h5py.File): .values ) cps = pd.DataFrame(cps) + cps["is_tipped_occupation"] = derive_is_tipped_occupation( + cps["treasury_tipped_occupation_code"] + ) # Impute tips diff --git a/policyengine_us_data/datasets/cps/tipped_occupation.py b/policyengine_us_data/datasets/cps/tipped_occupation.py index 74cc122b9..52584972f 100644 --- a/policyengine_us_data/datasets/cps/tipped_occupation.py +++ b/policyengine_us_data/datasets/cps/tipped_occupation.py @@ -79,3 +79,32 @@ def derive_treasury_tipped_occupation_code( return ( values.map(CENSUS_OCCUPATION_CODE_TO_TTOC).fillna(0).astype(np.int16).to_numpy() ) + + +def derive_any_treasury_tipped_occupation_code( + occupation_columns: pd.DataFrame, +) -> np.ndarray: + """Collapse multiple job occupation columns to one person-level tipped code.""" + + if occupation_columns.shape[1] == 0: + return np.zeros(len(occupation_columns), dtype=np.int16) + + mapped_columns = [ + derive_treasury_tipped_occupation_code(occupation_columns[column]) + for column in occupation_columns.columns + ] + return np.column_stack(mapped_columns).max(axis=1).astype(np.int16) + + +def derive_is_tipped_occupation( + treasury_tipped_occupation_codes: pd.Series | np.ndarray, +) -> np.ndarray: + """Return a boolean indicator for whether any Treasury tipped code is present.""" + + return ( + pd.Series(treasury_tipped_occupation_codes, copy=False) + .fillna(0) + .astype(np.int16) + .gt(0) + .to_numpy() + ) diff --git a/policyengine_us_data/datasets/sipp/sipp.py b/policyengine_us_data/datasets/sipp/sipp.py index ca62b9f41..2b8ead951 100644 --- a/policyengine_us_data/datasets/sipp/sipp.py +++ b/policyengine_us_data/datasets/sipp/sipp.py @@ -4,6 +4,20 @@ from policyengine_us_data.storage import STORAGE_FOLDER import pickle from huggingface_hub import hf_hub_download +from policyengine_us_data.datasets.cps.tipped_occupation import ( + derive_any_treasury_tipped_occupation_code, + derive_is_tipped_occupation, +) + + +SIPP_JOB_OCCUPATION_COLUMNS = [f"TJB{i}_OCC" for i in range(1, 8)] +TIP_MODEL_PREDICTORS = [ + "employment_income", + "age", + "count_under_18", + "count_under_6", + "is_tipped_occupation", +] def train_tip_model(): @@ -79,6 +93,12 @@ def train_tip_model(): df["household_weight"] = df.WPFINWGT df["household_id"] = df.SSUID df["age"] = df.TAGE + df["treasury_tipped_occupation_code"] = derive_any_treasury_tipped_occupation_code( + df[SIPP_JOB_OCCUPATION_COLUMNS] + ) + df["is_tipped_occupation"] = derive_is_tipped_occupation( + df["treasury_tipped_occupation_code"] + ) sipp = df[ [ @@ -88,6 +108,7 @@ def train_tip_model(): "count_under_18", "count_under_6", "age", + "is_tipped_occupation", "household_weight", ] ] @@ -107,12 +128,7 @@ def train_tip_model(): model = model.fit( X_train=sipp, - predictors=[ - "employment_income", - "age", - "count_under_18", - "count_under_6", - ], + predictors=TIP_MODEL_PREDICTORS, imputed_variables=["tip_income"], ) @@ -120,7 +136,7 @@ def train_tip_model(): def get_tip_model() -> QRF: - model_path = STORAGE_FOLDER / "tips.pkl" + model_path = STORAGE_FOLDER / "tips_tipped_occ_v2.pkl" if not model_path.exists(): model = train_tip_model() diff --git a/policyengine_us_data/tests/test_calibration/test_source_impute.py b/policyengine_us_data/tests/test_calibration/test_source_impute.py index 2d2edb9e3..704fd103f 100644 --- a/policyengine_us_data/tests/test_calibration/test_source_impute.py +++ b/policyengine_us_data/tests/test_calibration/test_source_impute.py @@ -4,6 +4,7 @@ """ import numpy as np +import pandas as pd from policyengine_us_data.calibration.source_impute import ( ACS_IMPUTED_VARIABLES, @@ -21,6 +22,10 @@ _person_state_fips, impute_source_variables, ) +from policyengine_us_data.datasets.cps.tipped_occupation import ( + derive_any_treasury_tipped_occupation_code, + derive_is_tipped_occupation, +) from policyengine_us_data.datasets.org import ORG_IMPUTED_VARIABLES @@ -43,6 +48,9 @@ def _make_data_dict(n_persons=20, time_period=2024): "employment_income": { time_period: rng.uniform(0, 100000, n_persons).astype(np.float32), }, + "treasury_tipped_occupation_code": { + time_period: np.zeros(n_persons, dtype=np.int16), + }, "rent": {time_period: np.zeros(n_persons)}, "real_estate_taxes": {time_period: np.zeros(n_persons)}, "tip_income": {time_period: np.zeros(n_persons)}, @@ -100,6 +108,9 @@ def test_acs_uses_state(self): def test_sipp_tips_has_income(self): assert "employment_income" in SIPP_TIPS_PREDICTORS + def test_sipp_tips_uses_tipped_occupation_status(self): + assert "is_tipped_occupation" in SIPP_TIPS_PREDICTORS + def test_sipp_assets_has_income(self): assert "employment_income" in SIPP_ASSETS_PREDICTORS @@ -228,3 +239,19 @@ def test_impute_org_exists(self): def test_impute_scf_exists(self): assert callable(_impute_scf) + + +class TestTippedOccupationHelpers: + def test_derive_any_treasury_tipped_occupation_code(self): + occupations = pd.DataFrame( + { + "TJB1_OCC": [4040, 1021, np.nan], + "TJB2_OCC": [np.nan, 4110, 9620], + } + ) + derived = derive_any_treasury_tipped_occupation_code(occupations) + np.testing.assert_array_equal(derived, np.array([101, 102, 809])) + + def test_derive_is_tipped_occupation(self): + derived = derive_is_tipped_occupation(np.array([0, 101, 809])) + np.testing.assert_array_equal(derived, np.array([False, True, True])) From 3993c5c0b39f176acfc4251a0a820c997c80e21c Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sun, 29 Mar 2026 23:52:14 -0400 Subject: [PATCH 8/9] Format source impute after tip predictor update --- policyengine_us_data/calibration/source_impute.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index f3ea5d292..cc2c43e3e 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -389,9 +389,7 @@ def _impute_sipp( sipp_df["household_weight"] = sipp_df.WPFINWGT sipp_df["household_id"] = sipp_df.SSUID sipp_df["treasury_tipped_occupation_code"] = ( - derive_any_treasury_tipped_occupation_code( - sipp_df[SIPP_JOB_OCCUPATION_COLUMNS] - ) + derive_any_treasury_tipped_occupation_code(sipp_df[SIPP_JOB_OCCUPATION_COLUMNS]) ) sipp_df["is_tipped_occupation"] = derive_is_tipped_occupation( sipp_df["treasury_tipped_occupation_code"] From 78c02f18ce50e958b719e9c8e3cd5e0fc5f2d330 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 30 Mar 2026 16:19:20 -0400 Subject: [PATCH 9/9] Fix CPS tip imputation build path --- policyengine_us_data/datasets/cps/cps.py | 6 +- .../test_datasets/test_cps_generation.py | 80 +++++++++++++++++++ 2 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 policyengine_us_data/tests/test_datasets/test_cps_generation.py diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 4f55348c9..73ff9badc 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -1792,6 +1792,9 @@ def add_tips(self, cps: h5py.File): raw_data = self.raw_cps(require=True).load() raw_person = raw_data["person"] cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values + cps["is_tipped_occupation"] = derive_is_tipped_occupation( + derive_treasury_tipped_occupation_code(raw_person.PEIOOCC) + ) raw_data.close() cps["is_under_18"] = cps.age < 18 @@ -1809,9 +1812,6 @@ def add_tips(self, cps: h5py.File): .values ) cps = pd.DataFrame(cps) - cps["is_tipped_occupation"] = derive_is_tipped_occupation( - cps["treasury_tipped_occupation_code"] - ) # Impute tips diff --git a/policyengine_us_data/tests/test_datasets/test_cps_generation.py b/policyengine_us_data/tests/test_datasets/test_cps_generation.py new file mode 100644 index 000000000..5f9dd6e4e --- /dev/null +++ b/policyengine_us_data/tests/test_datasets/test_cps_generation.py @@ -0,0 +1,80 @@ +import pandas as pd + + +def test_add_tips_derives_tipped_status_from_raw_cps(monkeypatch): + import policyengine_us + import policyengine_us_data.datasets.sipp as sipp_module + from policyengine_us_data.datasets.cps.cps import add_tips + + class FakeRawData: + def __init__(self): + self.person = pd.DataFrame( + { + "A_MARITL": [1, 3], + "PEIOOCC": [4040, 9999], + } + ) + + def __getitem__(self, key): + if key == "person": + return self.person + raise KeyError(key) + + def close(self): + pass + + class FakeRawCPS: + def __call__(self, require=True): + return self + + def load(self): + return FakeRawData() + + class FakeDataset: + def __init__(self): + self.raw_cps = FakeRawCPS() + self.saved_dataset = None + + def save_dataset(self, data): + self.saved_dataset = data + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = dataset + + def calculate_dataframe(self, columns, year): + base = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 20], + "employment_income": [25_000, 30_000], + "age": [30, 45], + "household_weight": [1.0, 1.0], + "is_female": [False, True], + } + ) + return base[columns] + + class FakeTipModel: + def predict(self, X_test, mean_quantile): + assert X_test["is_tipped_occupation"].tolist() == [True, False] + return pd.DataFrame({"tip_income": [100.0, 0.0]}) + + class FakeAssetModel: + def predict(self, X_test, mean_quantile): + return pd.DataFrame( + { + "bank_account_assets": [0.0, 0.0], + "stock_assets": [0.0, 0.0], + "bond_assets": [0.0, 0.0], + } + ) + + monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) + monkeypatch.setattr(sipp_module, "get_tip_model", lambda: FakeTipModel()) + monkeypatch.setattr(sipp_module, "get_asset_model", lambda: FakeAssetModel()) + + dataset = FakeDataset() + add_tips(dataset, {}) + + assert dataset.saved_dataset["tip_income"].tolist() == [100.0, 0.0]