From 7a6e3bbf5f9ec217e5ad63843f3587bccc53451f Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Fri, 27 Mar 2026 09:58:07 -0400 Subject: [PATCH 1/3] Donor-impute CPS clone features --- .../impute-cps-clone-features.changed.md | 1 + .../datasets/cps/extended_cps.py | 289 +++++++++++++++++- .../tests/test_extended_cps.py | 168 ++++++++++ 3 files changed, 442 insertions(+), 16 deletions(-) create mode 100644 changelog.d/impute-cps-clone-features.changed.md diff --git a/changelog.d/impute-cps-clone-features.changed.md b/changelog.d/impute-cps-clone-features.changed.md new file mode 100644 index 000000000..981900a05 --- /dev/null +++ b/changelog.d/impute-cps-clone-features.changed.md @@ -0,0 +1 @@ +Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup and overtime-eligibility inputs better align with PUF-imputed incomes. diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index f38d57463..5146aea82 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -6,8 +6,8 @@ import pandas as pd from policyengine_core.data import Dataset -from policyengine_us_data.datasets.cps.cps import * # noqa: F403 -from policyengine_us_data.datasets.puf import * # noqa: F403 +from policyengine_us_data.datasets.cps.cps import CPS, CPS_2024, CPS_2024_Full +from policyengine_us_data.datasets.puf import PUF, PUF_2024 from policyengine_us_data.storage import STORAGE_FOLDER from policyengine_us_data.utils.retirement_limits import ( get_retirement_limits, @@ -16,10 +16,75 @@ logger = logging.getLogger(__name__) +# CPS-only categorical features to donor-impute onto the PUF clone half. +# These drive subgroup analysis and occupation-based logic, so naive donor +# duplication dilutes the relationship between the clone's PUF-imputed +# income and its CPS-side demographic/occupation labels. +CPS_CLONE_FEATURE_VARIABLES = [ + "is_male", + "cps_race", + "is_hispanic", + "detailed_occupation_recode", +] + +# Predictors used to rematch CPS features onto the PUF clone half. +# These are all available on the CPS half and on the doubled extended CPS. +CPS_CLONE_FEATURE_PREDICTORS = [ + "age", + "state_fips", + "tax_unit_is_joint", + "tax_unit_count_dependents", + "is_tax_unit_head", + "is_tax_unit_spouse", + "is_tax_unit_dependent", + "employment_income", + "self_employment_income", + "social_security", +] + +_OVERTIME_OCCUPATION_CODES = { + "has_never_worked": 53, + "is_military": 52, + "is_computer_scientist": 8, + "is_farmer_fisher": 41, +} +_EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES = np.array( + [ + 1, + 2, + 3, + 5, + 7, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 18, + 19, + 20, + 21, + 22, + 24, + 25, + 27, + 28, + 29, + 30, + 32, + 33, + 34, + ], + dtype=np.int16, +) + # CPS-only variables that should be QRF-imputed for the PUF clone half -# instead of naively duplicated from the CPS donor. These are -# income-correlated variables that exist only in the CPS; demographics, -# IDs, weights, and random seeds are fine to duplicate. +# instead of naively duplicated from the CPS donor. Most demographics, +# IDs, weights, and random seeds are fine to duplicate; the categorical +# clone features above are rematched separately. CPS_ONLY_IMPUTED_VARIABLES = [ # Retirement distributions "taxable_401k_distributions", @@ -98,6 +163,186 @@ ] +def _clone_half_person_values(data: dict, variable: str, time_period: int): + """Return clone-half values for ``variable`` mapped to person rows.""" + if variable not in data: + return None + + values = data[variable][time_period] + n_persons = len(data["person_id"][time_period]) + n_persons_half = n_persons // 2 + if len(values) == n_persons: + return np.asarray(values[n_persons_half:]) + + entity_mappings = [ + ("household_id", "person_household_id"), + ("tax_unit_id", "person_tax_unit_id"), + ("spm_unit_id", "person_spm_unit_id"), + ("family_id", "person_family_id"), + ] + for entity_id_var, person_entity_id_var in entity_mappings: + if entity_id_var not in data or person_entity_id_var not in data: + continue + entity_ids = data[entity_id_var][time_period] + if len(values) != len(entity_ids): + continue + entity_half = len(entity_ids) // 2 + clone_entity_ids = entity_ids[entity_half:] + clone_person_entity_ids = data[person_entity_id_var][time_period][n_persons_half:] + value_map = dict(zip(clone_entity_ids, values[entity_half:])) + return np.array([value_map[idx] for idx in clone_person_entity_ids]) + + return None + + +def _build_clone_test_frame( + cps_sim, + data: dict, + time_period: int, + predictors: list[str], +) -> pd.DataFrame: + """Build clone-half predictor data with available doubled-dataset overrides.""" + X_test = cps_sim.calculate_dataframe(predictors).copy() + for predictor in predictors: + clone_values = _clone_half_person_values(data, predictor, time_period) + if clone_values is not None and len(clone_values) == len(X_test): + X_test[predictor] = clone_values + return X_test[predictors] + + +def _prepare_knn_matrix( + df: pd.DataFrame, + reference: pd.DataFrame | None = None, +) -> np.ndarray: + """Normalise mixed-scale donor-matching predictors for kNN.""" + X = df.astype(float).copy() + for income_var in CPS_STAGE2_INCOME_PREDICTORS: + if income_var in X: + X[income_var] = np.arcsinh(X[income_var]) + + ref = X if reference is None else reference.astype(float).copy() + for income_var in CPS_STAGE2_INCOME_PREDICTORS: + if income_var in ref: + ref[income_var] = np.arcsinh(ref[income_var]) + + means = ref.mean() + stds = ref.std(ddof=0).replace(0, 1) + normalised = (X - means) / stds + return np.nan_to_num(normalised.to_numpy(dtype=np.float32), nan=0.0) + + +def _derive_overtime_occupation_inputs( + occupation_codes: np.ndarray, +) -> pd.DataFrame: + """Derive occupation-based overtime-exemption inputs from POCCU2.""" + occupation_codes = np.rint(occupation_codes).astype(np.int16, copy=False) + derived = { + name: occupation_codes == code + for name, code in _OVERTIME_OCCUPATION_CODES.items() + } + derived["is_executive_administrative_professional"] = np.isin( + occupation_codes, + _EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES, + ) + return pd.DataFrame(derived) + + +def _impute_clone_cps_features( + data: dict, + time_period: int, + dataset_path: str, +) -> pd.DataFrame: + """Rematch CPS demographic/occupation features for the clone half.""" + from policyengine_us import Microsimulation + from sklearn.neighbors import NearestNeighbors + + cps_sim = Microsimulation(dataset=dataset_path) + X_train = cps_sim.calculate_dataframe( + CPS_CLONE_FEATURE_PREDICTORS + CPS_CLONE_FEATURE_VARIABLES + ) + available_outputs = [ + variable for variable in CPS_CLONE_FEATURE_VARIABLES if variable in X_train.columns + ] + if not available_outputs: + n_half = len(data["person_id"][time_period]) // 2 + return pd.DataFrame(index=np.arange(n_half)) + + X_test = _build_clone_test_frame( + cps_sim, + data, + time_period, + CPS_CLONE_FEATURE_PREDICTORS, + ) + del cps_sim + + train_roles = ( + X_train[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]] + .round() + .astype(int) + .apply(tuple, axis=1) + ) + test_roles = ( + X_test[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]] + .round() + .astype(int) + .apply(tuple, axis=1) + ) + + predictions = pd.DataFrame(index=X_test.index, columns=available_outputs) + for role in test_roles.unique(): + test_mask = test_roles == role + train_mask = train_roles == role + if not train_mask.any(): + train_mask = pd.Series(True, index=X_train.index) + + train_predictors = X_train.loc[train_mask, CPS_CLONE_FEATURE_PREDICTORS] + test_predictors = X_test.loc[test_mask, CPS_CLONE_FEATURE_PREDICTORS] + train_matrix = _prepare_knn_matrix(train_predictors) + test_matrix = _prepare_knn_matrix(test_predictors, reference=train_predictors) + + matcher = NearestNeighbors(n_neighbors=1) + matcher.fit(train_matrix) + donor_indices = matcher.kneighbors( + test_matrix, + return_distance=False, + ).ravel() + donor_outputs = ( + X_train.loc[train_mask, available_outputs] + .iloc[donor_indices] + .reset_index(drop=True) + ) + predictions.loc[test_mask, available_outputs] = donor_outputs.to_numpy() + + if "detailed_occupation_recode" in predictions: + occupation_codes = predictions["detailed_occupation_recode"].astype(float).to_numpy() + for column, values in _derive_overtime_occupation_inputs(occupation_codes).items(): + predictions[column] = values + + return predictions + + +def _splice_clone_feature_predictions( + data: dict, + predictions: pd.DataFrame, + time_period: int, +) -> dict: + """Replace clone-half person-level feature variables with donor matches.""" + n_half = len(data["person_id"][time_period]) // 2 + for variable in predictions.columns: + if variable not in data: + continue + values = data[variable][time_period] + new_values = np.array(values, copy=True) + pred_values = predictions[variable].to_numpy() + if np.issubdtype(new_values.dtype, np.bool_): + pred_values = pred_values.astype(bool, copy=False) + else: + pred_values = pred_values.astype(new_values.dtype, copy=False) + new_values[n_half:] = pred_values + data[variable] = {time_period: new_values} + return data + + def _impute_cps_only_variables( data: dict, time_period: int, @@ -161,17 +406,16 @@ def _impute_cps_only_variables( missing_outputs, ) - # Build PUF clone test data: demographics from CPS sim (PUF clones - # share demographics with their CPS donors), income from the - # PUF-imputed values in the second half of the doubled data. - n_persons_half = len(data["person_id"][time_period]) // 2 - X_test = cps_sim.calculate_dataframe(CPS_STAGE2_DEMOGRAPHIC_PREDICTORS) + # Build PUF clone test data from the clone half itself, falling back to + # the CPS sim for formula variables that are not stored in the dataset. + X_test = _build_clone_test_frame( + cps_sim, + data, + time_period, + all_predictors, + ) del cps_sim - for var in CPS_STAGE2_INCOME_PREDICTORS: - # Income comes from PUF imputation in the second half. - X_test[var] = data[var][time_period][n_persons_half:] - logger.info( "Stage-2 CPS-only imputation: %d outputs, " "training on %d CPS persons, predicting for %d PUF clones", @@ -427,11 +671,24 @@ def generate(self): dataset_path=str(self.cps.file_path), ) - # Stage 2: QRF-impute CPS-only variables for PUF clones. + # Stage 2a: donor-impute CPS feature variables for PUF clones. + logger.info("Stage-2a: rematching CPS features for PUF clones") + clone_feature_predictions = _impute_clone_cps_features( + data=new_data, + time_period=self.time_period, + dataset_path=str(self.cps.file_path), + ) + new_data = _splice_clone_feature_predictions( + data=new_data, + predictions=clone_feature_predictions, + time_period=self.time_period, + ) + + # Stage 2b: QRF-impute CPS-only continuous variables for PUF clones. # Train on CPS data using demographics + PUF-imputed income # as predictors, so the PUF clone half gets values consistent # with its imputed income rather than naive donor duplication. - logger.info("Stage-2: imputing CPS-only variables for PUF clones") + logger.info("Stage-2b: imputing CPS-only variables for PUF clones") cps_only_predictions = _impute_cps_only_variables( data=new_data, time_period=self.time_period, diff --git a/policyengine_us_data/tests/test_extended_cps.py b/policyengine_us_data/tests/test_extended_cps.py index 5ddf46929..0576259f5 100644 --- a/policyengine_us_data/tests/test_extended_cps.py +++ b/policyengine_us_data/tests/test_extended_cps.py @@ -16,8 +16,13 @@ OVERRIDDEN_IMPUTED_VARIABLES, ) from policyengine_us_data.datasets.cps.extended_cps import ( + CPS_CLONE_FEATURE_VARIABLES, CPS_ONLY_IMPUTED_VARIABLES, + CPS_CLONE_FEATURE_PREDICTORS, CPS_STAGE2_INCOME_PREDICTORS, + _build_clone_test_frame, + _derive_overtime_occupation_inputs, + _impute_clone_cps_features, apply_retirement_constraints, reconcile_ss_subcomponents, ) @@ -35,6 +40,12 @@ def test_no_overlap_overridden_and_cps_only(self): overlap = set(OVERRIDDEN_IMPUTED_VARIABLES) & set(CPS_ONLY_IMPUTED_VARIABLES) assert overlap == set(), f"Variables in both OVERRIDDEN and CPS_ONLY: {overlap}" + def test_no_overlap_clone_features_and_cps_only(self): + overlap = set(CPS_CLONE_FEATURE_VARIABLES) & set(CPS_ONLY_IMPUTED_VARIABLES) + assert overlap == set(), ( + f"Variables in both clone-feature and CPS_ONLY lists: {overlap}" + ) + def test_overridden_is_subset_of_imputed(self): not_in_imputed = set(OVERRIDDEN_IMPUTED_VARIABLES) - set(IMPUTED_VARIABLES) assert not_in_imputed == set(), ( @@ -322,3 +333,160 @@ def test_single_call_vs_separate_calls_differ(self, correlated_training_data): assert corr_seq > corr_indep, ( f"Sequential corr ({corr_seq:.3f}) should exceed independent corr ({corr_indep:.3f})" ) + + +class TestCloneFeatureImputation: + def test_build_clone_test_frame_overrides_person_and_household_features(self): + class FakeMicrosimulation: + def calculate_dataframe(self, columns): + base = pd.DataFrame( + { + "age": [30, 40], + "state_fips": [6, 36], + "tax_unit_is_joint": [0, 1], + "tax_unit_count_dependents": [0, 2], + "is_tax_unit_head": [1, 1], + "is_tax_unit_spouse": [0, 0], + "is_tax_unit_dependent": [0, 0], + "employment_income": [20_000, 35_000], + "self_employment_income": [0, 0], + "social_security": [0, 0], + } + ) + return base[columns] + + tp = 2024 + data = { + "person_id": {tp: np.array([1, 2, 101, 102])}, + "household_id": {tp: np.array([1, 2, 101, 102])}, + "person_household_id": {tp: np.array([1, 2, 101, 102])}, + "age": {tp: np.array([30, 40, 50, 60], dtype=np.float32)}, + "employment_income": { + tp: np.array([20_000, 35_000, 90_000, 150_000], dtype=np.float32) + }, + "self_employment_income": { + tp: np.zeros(4, dtype=np.float32) + }, + "social_security": {tp: np.zeros(4, dtype=np.float32)}, + "is_tax_unit_head": {tp: np.ones(4, dtype=bool)}, + "is_tax_unit_spouse": {tp: np.zeros(4, dtype=bool)}, + "is_tax_unit_dependent": {tp: np.zeros(4, dtype=bool)}, + "state_fips": {tp: np.array([6, 36, 12, 48], dtype=np.int16)}, + } + + result = _build_clone_test_frame( + FakeMicrosimulation(), + data, + tp, + CPS_CLONE_FEATURE_PREDICTORS, + ) + + assert result["age"].tolist() == [50, 60] + assert result["employment_income"].tolist() == [90_000, 150_000] + assert result["state_fips"].tolist() == [12, 48] + assert result["tax_unit_is_joint"].tolist() == [0, 1] + + def test_derive_overtime_occupation_inputs(self): + derived = _derive_overtime_occupation_inputs( + np.array([53, 52, 8, 41, 1, 99]) + ) + + assert derived["has_never_worked"].tolist() == [ + True, + False, + False, + False, + False, + False, + ] + assert derived["is_military"].tolist() == [ + False, + True, + False, + False, + False, + False, + ] + assert derived["is_computer_scientist"].tolist() == [ + False, + False, + True, + False, + False, + False, + ] + assert derived["is_farmer_fisher"].tolist() == [ + False, + False, + False, + True, + False, + False, + ] + assert derived["is_executive_administrative_professional"].tolist() == [ + False, + False, + False, + False, + True, + False, + ] + + def test_clone_feature_imputation_rematches_outputs_and_derives_flags( + self, monkeypatch + ): + import policyengine_us + + train = pd.DataFrame( + { + "age": [45, 17], + "state_fips": [1, 1], + "tax_unit_is_joint": [0, 0], + "tax_unit_count_dependents": [0, 1], + "is_tax_unit_head": [1, 0], + "is_tax_unit_spouse": [0, 0], + "is_tax_unit_dependent": [0, 1], + "employment_income": [95_000, 0], + "self_employment_income": [0, 0], + "social_security": [0, 0], + "is_male": [1, 0], + "cps_race": [2, 1], + "is_hispanic": [0, 1], + "detailed_occupation_recode": [8, 41], + } + ) + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = dataset + + def calculate_dataframe(self, columns): + return train[columns] + + monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) + + tp = 2024 + data = { + "person_id": {tp: np.array([1, 2, 101, 102])}, + "household_id": {tp: np.array([1, 2, 101, 102])}, + "person_household_id": {tp: np.array([1, 2, 101, 102])}, + "age": {tp: np.array([45, 17, 46, 17], dtype=np.float32)}, + "state_fips": {tp: np.array([1, 1, 1, 1], dtype=np.int16)}, + "tax_unit_is_joint": {tp: np.zeros(4, dtype=np.float32)}, + "tax_unit_count_dependents": {tp: np.array([0, 1, 0, 1], dtype=np.float32)}, + "is_tax_unit_head": {tp: np.array([1, 0, 1, 0], dtype=bool)}, + "is_tax_unit_spouse": {tp: np.zeros(4, dtype=bool)}, + "is_tax_unit_dependent": {tp: np.array([0, 1, 0, 1], dtype=bool)}, + "employment_income": {tp: np.array([95_000, 0, 97_000, 0], dtype=np.float32)}, + "self_employment_income": {tp: np.zeros(4, dtype=np.float32)}, + "social_security": {tp: np.zeros(4, dtype=np.float32)}, + } + + result = _impute_clone_cps_features(data, tp, "unused") + + assert result["detailed_occupation_recode"].tolist() == [8, 41] + assert result["is_male"].tolist() == [1, 0] + assert result["cps_race"].tolist() == [2, 1] + assert result["is_hispanic"].tolist() == [0, 1] + assert result["is_computer_scientist"].tolist() == [True, False] + assert result["is_farmer_fisher"].tolist() == [False, True] From 1a4aa89532ca5b8b68012d3846a53fd1b80c14a3 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 28 Mar 2026 09:16:15 -0400 Subject: [PATCH 2/3] Trigger upstream PR CI From f0a2e5e2c67ccb773f7c2a631858f5acabaafda0 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 28 Mar 2026 09:18:22 -0400 Subject: [PATCH 3/3] Refresh changelog fragment wording --- changelog.d/impute-cps-clone-features.changed.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/impute-cps-clone-features.changed.md b/changelog.d/impute-cps-clone-features.changed.md index 981900a05..47883bf2b 100644 --- a/changelog.d/impute-cps-clone-features.changed.md +++ b/changelog.d/impute-cps-clone-features.changed.md @@ -1 +1 @@ -Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup and overtime-eligibility inputs better align with PUF-imputed incomes. +Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup analyses and overtime-eligibility inputs better align with PUF-imputed incomes.