Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/impute-cps-clone-features.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Donor-impute race, Hispanic status, sex, and occupation-based CPS features onto the PUF clone half of the extended CPS so subgroup analyses and overtime-eligibility inputs better align with PUF-imputed incomes.
289 changes: 273 additions & 16 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import pandas as pd
from policyengine_core.data import Dataset

from policyengine_us_data.datasets.cps.cps import * # noqa: F403
from policyengine_us_data.datasets.puf import * # noqa: F403
from policyengine_us_data.datasets.cps.cps import CPS, CPS_2024, CPS_2024_Full
from policyengine_us_data.datasets.puf import PUF, PUF_2024
from policyengine_us_data.storage import STORAGE_FOLDER
from policyengine_us_data.utils.retirement_limits import (
get_retirement_limits,
Expand All @@ -16,10 +16,75 @@

logger = logging.getLogger(__name__)

# CPS-only categorical features to donor-impute onto the PUF clone half.
# These drive subgroup analysis and occupation-based logic, so naive donor
# duplication dilutes the relationship between the clone's PUF-imputed
# income and its CPS-side demographic/occupation labels.
CPS_CLONE_FEATURE_VARIABLES = [
    "is_male",
    "cps_race",
    "is_hispanic",
    # Occupation recode; also feeds the derived overtime-exemption inputs
    # built in ``_derive_overtime_occupation_inputs``.
    "detailed_occupation_recode",
]

# Predictors used to rematch CPS features onto the PUF clone half.
# These are all available on the CPS half and on the doubled extended CPS.
CPS_CLONE_FEATURE_PREDICTORS = [
    "age",
    "state_fips",
    "tax_unit_is_joint",
    "tax_unit_count_dependents",
    # Tax-unit role flags: donor matching in ``_impute_clone_cps_features``
    # is stratified on the (head, spouse, dependent) combination.
    "is_tax_unit_head",
    "is_tax_unit_spouse",
    "is_tax_unit_dependent",
    # Income-side predictors, taken from the clone half's imputed values
    # where stored in the doubled dataset.
    "employment_income",
    "self_employment_income",
    "social_security",
]

_OVERTIME_OCCUPATION_CODES = {
"has_never_worked": 53,
"is_military": 52,
"is_computer_scientist": 8,
"is_farmer_fisher": 41,
}
_EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES = np.array(
[
1,
2,
3,
5,
7,
9,
10,
11,
12,
13,
14,
15,
16,
18,
19,
20,
21,
22,
24,
25,
27,
28,
29,
30,
32,
33,
34,
],
dtype=np.int16,
)

# CPS-only variables that should be QRF-imputed for the PUF clone half
# instead of naively duplicated from the CPS donor. These are
# income-correlated variables that exist only in the CPS; demographics,
# IDs, weights, and random seeds are fine to duplicate.
# instead of naively duplicated from the CPS donor. Most demographics,
# IDs, weights, and random seeds are fine to duplicate; the categorical
# clone features above are rematched separately.
CPS_ONLY_IMPUTED_VARIABLES = [
# Retirement distributions
"taxable_401k_distributions",
Expand Down Expand Up @@ -98,6 +163,186 @@
]


def _clone_half_person_values(data: dict, variable: str, time_period: int):
"""Return clone-half values for ``variable`` mapped to person rows."""
if variable not in data:
return None

values = data[variable][time_period]
n_persons = len(data["person_id"][time_period])
n_persons_half = n_persons // 2
if len(values) == n_persons:
return np.asarray(values[n_persons_half:])

entity_mappings = [
("household_id", "person_household_id"),
("tax_unit_id", "person_tax_unit_id"),
("spm_unit_id", "person_spm_unit_id"),
("family_id", "person_family_id"),
]
for entity_id_var, person_entity_id_var in entity_mappings:
if entity_id_var not in data or person_entity_id_var not in data:
continue
entity_ids = data[entity_id_var][time_period]
if len(values) != len(entity_ids):
continue
entity_half = len(entity_ids) // 2
clone_entity_ids = entity_ids[entity_half:]
clone_person_entity_ids = data[person_entity_id_var][time_period][n_persons_half:]
value_map = dict(zip(clone_entity_ids, values[entity_half:]))
return np.array([value_map[idx] for idx in clone_person_entity_ids])

return None


def _build_clone_test_frame(
    cps_sim,
    data: dict,
    time_period: int,
    predictors: list[str],
) -> pd.DataFrame:
    """Build clone-half predictor data with available doubled-dataset overrides."""
    frame = cps_sim.calculate_dataframe(predictors).copy()
    for name in predictors:
        override = _clone_half_person_values(data, name, time_period)
        # Only override when the stored clone-half values line up with the
        # simulated person rows; otherwise keep the CPS-sim values.
        if override is None or len(override) != len(frame):
            continue
        frame[name] = override
    return frame[predictors]


def _prepare_knn_matrix(
    df: pd.DataFrame,
    reference: pd.DataFrame | None = None,
) -> np.ndarray:
    """Normalise mixed-scale donor-matching predictors for kNN.

    Standardises with the statistics of ``reference`` when given (e.g.
    normalise the test split with the training split's mean/std),
    otherwise with ``df``'s own statistics.
    """

    def _transform(frame: pd.DataFrame) -> pd.DataFrame:
        out = frame.astype(float).copy()
        # Compress heavy-tailed income predictors before standardising.
        for column in CPS_STAGE2_INCOME_PREDICTORS:
            if column in out:
                out[column] = np.arcsinh(out[column])
        return out

    X = _transform(df)
    ref = X if reference is None else _transform(reference)

    centre = ref.mean()
    # Guard against zero-variance columns dividing by zero.
    scale = ref.std(ddof=0).replace(0, 1)
    scaled = (X - centre) / scale
    return np.nan_to_num(scaled.to_numpy(dtype=np.float32), nan=0.0)


def _derive_overtime_occupation_inputs(
    occupation_codes: np.ndarray,
) -> pd.DataFrame:
    """Derive occupation-based overtime-exemption inputs from POCCU2."""
    # Round before comparing: donor-matched codes may arrive as floats.
    codes = np.rint(occupation_codes).astype(np.int16, copy=False)
    frame = pd.DataFrame(
        {
            name: codes == code
            for name, code in _OVERTIME_OCCUPATION_CODES.items()
        }
    )
    frame["is_executive_administrative_professional"] = np.isin(
        codes,
        _EXECUTIVE_ADMINISTRATIVE_PROFESSIONAL_CODES,
    )
    return frame


def _impute_clone_cps_features(
    data: dict,
    time_period: int,
    dataset_path: str,
) -> pd.DataFrame:
    """Rematch CPS demographic/occupation features for the clone half.

    Trains a 1-nearest-neighbour donor matcher on the base CPS
    simulation, builds clone-half predictor rows from the doubled
    dataset, and matches each clone person to a CPS donor within the
    same tax-unit role (head/spouse/dependent) combination.

    Returns a DataFrame with one row per clone-half person holding the
    matched feature variables, plus occupation-derived
    overtime-exemption inputs when the occupation recode is available.
    """
    # Lazy imports: heavy dependencies only needed by this stage.
    from policyengine_us import Microsimulation
    from sklearn.neighbors import NearestNeighbors

    cps_sim = Microsimulation(dataset=dataset_path)
    X_train = cps_sim.calculate_dataframe(
        CPS_CLONE_FEATURE_PREDICTORS + CPS_CLONE_FEATURE_VARIABLES
    )
    available_outputs = [
        variable for variable in CPS_CLONE_FEATURE_VARIABLES if variable in X_train.columns
    ]
    if not available_outputs:
        # Nothing to rematch: return an empty frame sized to the clone half.
        n_half = len(data["person_id"][time_period]) // 2
        return pd.DataFrame(index=np.arange(n_half))

    X_test = _build_clone_test_frame(
        cps_sim,
        data,
        time_period,
        CPS_CLONE_FEATURE_PREDICTORS,
    )
    del cps_sim  # drop the simulation reference; no longer needed

    # Stratify matching by tax-unit role so, e.g., dependents only draw
    # features from CPS dependents. Flags are rounded in case the
    # simulation returns them as floats.
    train_roles = (
        X_train[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]]
        .round()
        .astype(int)
        .apply(tuple, axis=1)
    )
    test_roles = (
        X_test[["is_tax_unit_head", "is_tax_unit_spouse", "is_tax_unit_dependent"]]
        .round()
        .astype(int)
        .apply(tuple, axis=1)
    )

    predictions = pd.DataFrame(index=X_test.index, columns=available_outputs)
    for role in test_roles.unique():
        test_mask = test_roles == role
        train_mask = train_roles == role
        if not train_mask.any():
            # No CPS donors share this role combination; fall back to the
            # full training pool rather than leaving rows unmatched.
            train_mask = pd.Series(True, index=X_train.index)

        train_predictors = X_train.loc[train_mask, CPS_CLONE_FEATURE_PREDICTORS]
        test_predictors = X_test.loc[test_mask, CPS_CLONE_FEATURE_PREDICTORS]
        # Normalise both sides with the training split's statistics.
        train_matrix = _prepare_knn_matrix(train_predictors)
        test_matrix = _prepare_knn_matrix(test_predictors, reference=train_predictors)

        # Single nearest neighbour: each clone person copies all feature
        # values from exactly one CPS donor.
        matcher = NearestNeighbors(n_neighbors=1)
        matcher.fit(train_matrix)
        donor_indices = matcher.kneighbors(
            test_matrix,
            return_distance=False,
        ).ravel()
        donor_outputs = (
            X_train.loc[train_mask, available_outputs]
            .iloc[donor_indices]
            .reset_index(drop=True)
        )
        predictions.loc[test_mask, available_outputs] = donor_outputs.to_numpy()

    if "detailed_occupation_recode" in predictions:
        # Keep the derived overtime-exemption inputs consistent with the
        # rematched occupation codes.
        occupation_codes = predictions["detailed_occupation_recode"].astype(float).to_numpy()
        for column, values in _derive_overtime_occupation_inputs(occupation_codes).items():
            predictions[column] = values

    return predictions


def _splice_clone_feature_predictions(
data: dict,
predictions: pd.DataFrame,
time_period: int,
) -> dict:
"""Replace clone-half person-level feature variables with donor matches."""
n_half = len(data["person_id"][time_period]) // 2
for variable in predictions.columns:
if variable not in data:
continue
values = data[variable][time_period]
new_values = np.array(values, copy=True)
pred_values = predictions[variable].to_numpy()
if np.issubdtype(new_values.dtype, np.bool_):
pred_values = pred_values.astype(bool, copy=False)
else:
pred_values = pred_values.astype(new_values.dtype, copy=False)
new_values[n_half:] = pred_values
data[variable] = {time_period: new_values}
return data


def _impute_cps_only_variables(
data: dict,
time_period: int,
Expand Down Expand Up @@ -161,17 +406,16 @@ def _impute_cps_only_variables(
missing_outputs,
)

# Build PUF clone test data: demographics from CPS sim (PUF clones
# share demographics with their CPS donors), income from the
# PUF-imputed values in the second half of the doubled data.
n_persons_half = len(data["person_id"][time_period]) // 2
X_test = cps_sim.calculate_dataframe(CPS_STAGE2_DEMOGRAPHIC_PREDICTORS)
# Build PUF clone test data from the clone half itself, falling back to
# the CPS sim for formula variables that are not stored in the dataset.
X_test = _build_clone_test_frame(
cps_sim,
data,
time_period,
all_predictors,
)
del cps_sim

for var in CPS_STAGE2_INCOME_PREDICTORS:
# Income comes from PUF imputation in the second half.
X_test[var] = data[var][time_period][n_persons_half:]

logger.info(
"Stage-2 CPS-only imputation: %d outputs, "
"training on %d CPS persons, predicting for %d PUF clones",
Expand Down Expand Up @@ -427,11 +671,24 @@ def generate(self):
dataset_path=str(self.cps.file_path),
)

# Stage 2: QRF-impute CPS-only variables for PUF clones.
# Stage 2a: donor-impute CPS feature variables for PUF clones.
logger.info("Stage-2a: rematching CPS features for PUF clones")
clone_feature_predictions = _impute_clone_cps_features(
data=new_data,
time_period=self.time_period,
dataset_path=str(self.cps.file_path),
)
new_data = _splice_clone_feature_predictions(
data=new_data,
predictions=clone_feature_predictions,
time_period=self.time_period,
)

# Stage 2b: QRF-impute CPS-only continuous variables for PUF clones.
# Train on CPS data using demographics + PUF-imputed income
# as predictors, so the PUF clone half gets values consistent
# with its imputed income rather than naive donor duplication.
logger.info("Stage-2: imputing CPS-only variables for PUF clones")
logger.info("Stage-2b: imputing CPS-only variables for PUF clones")
cps_only_predictions = _impute_cps_only_variables(
data=new_data,
time_period=self.time_period,
Expand Down
Loading