Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 83 additions & 8 deletions policyengine_us_data/calibration/source_impute.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""Non-PUF QRF imputations from donor surveys.

Re-imputes variables from ACS, SIPP, and SCF donor surveys.
Only ACS includes state_fips as a QRF predictor (ACS has state
identifiers). SIPP and SCF lack state data, so their imputations
use only demographic and financial predictors.
Re-imputes variables from ACS, SIPP, ORG, and SCF donor surveys.
Only ACS and ORG include state_fips as a QRF predictor. SIPP and SCF
lack state data, so their imputations use only demographic and
financial predictors.

Sources and variables:
ACS -> rent, real_estate_taxes (with state predictor)
SIPP -> tip_income, bank_account_assets, stock_assets,
bond_assets (no state predictor)
ORG -> hourly_wage, is_paid_hourly,
is_union_member_or_covered (with state predictor)
SCF -> net_worth, auto_loan_balance, auto_loan_interest
(no state predictor)

Expand All @@ -27,6 +29,13 @@
import numpy as np
import pandas as pd

from policyengine_us_data.datasets.org import (
ORG_BOOL_VARIABLES,
ORG_IMPUTED_VARIABLES,
build_org_receiver_frame,
predict_org_features,
)

logger = logging.getLogger(__name__)

ACS_IMPUTED_VARIABLES = [
Expand All @@ -48,7 +57,10 @@
]

ALL_SOURCE_VARIABLES = (
ACS_IMPUTED_VARIABLES + SIPP_IMPUTED_VARIABLES + SCF_IMPUTED_VARIABLES
ACS_IMPUTED_VARIABLES
+ SIPP_IMPUTED_VARIABLES
+ ORG_IMPUTED_VARIABLES
+ SCF_IMPUTED_VARIABLES
)

ACS_PREDICTORS = [
Expand Down Expand Up @@ -118,13 +130,15 @@ def impute_source_variables(
dataset_path: Optional[str] = None,
skip_acs: bool = False,
skip_sipp: bool = False,
skip_org: bool = False,
skip_scf: bool = False,
) -> Dict[str, Dict[int, np.ndarray]]:
"""Re-impute ACS/SIPP/SCF variables from donor surveys.
"""Re-impute ACS/SIPP/ORG/SCF variables from donor surveys.

Overwrites existing imputed values in data. ACS uses
state_fips as a QRF predictor; SIPP and SCF use only
demographic and financial predictors (no state data).
state_fips as a QRF predictor; ORG uses state plus labor-market
predictors; SIPP and SCF use only demographic and financial
predictors (no state data).

Args:
data: CPS dataset dict {variable: {time_period: array}}.
Expand All @@ -133,6 +147,7 @@ def impute_source_variables(
dataset_path: Path to CPS h5 for Microsimulation.
skip_acs: Skip ACS imputation.
skip_sipp: Skip SIPP imputation.
skip_org: Skip ORG imputation.
skip_scf: Skip SCF imputation.

Returns:
Expand All @@ -150,6 +165,10 @@ def impute_source_variables(
logger.info("Imputing SIPP variables")
data = _impute_sipp(data, state_fips, time_period, dataset_path)

if not skip_org:
logger.info("Imputing ORG variables")
data = _impute_org(data, state_fips, time_period, dataset_path)

if not skip_scf:
logger.info("Imputing SCF variables")
data = _impute_scf(data, state_fips, time_period, dataset_path)
Expand Down Expand Up @@ -700,3 +719,59 @@ def _impute_scf(

logger.info("SCF imputation complete: %s", available_vars)
return data


def _impute_org(
    data: Dict[str, Dict[int, np.ndarray]],
    state_fips: np.ndarray,
    time_period: int,
    dataset_path: Optional[str] = None,
) -> Dict[str, Dict[int, np.ndarray]]:
    """Write ORG labor-market predictions into ``data`` for CPS persons.

    A receiver frame is assembled from CPS demographic, earnings and hours
    columns plus a person-level state array, passed to
    ``predict_org_features``, and every variable listed in
    ``ORG_IMPUTED_VARIABLES`` is then stored back into ``data``.

    Args:
        data: CPS dataset dict {variable: {time_period: array}}.
        state_fips: State FIPS array, forwarded to ``_person_state_fips``
            to obtain the per-person state values.
        time_period: Period key used both to read inputs and to store the
            imputed outputs.
        dataset_path: Forwarded to ``_build_cps_receiver``.

    Returns:
        The same ``data`` dict, with each ORG variable replaced by a
        single-period entry ``{time_period: values}``.
    """
    receiver_columns = [
        "age",
        "is_male",
        "is_hispanic",
        "cps_race",
        "employment_income",
        "weekly_hours_worked",
        "self_employment_income",
    ]
    cps_frame = _build_cps_receiver(
        data, time_period, dataset_path, receiver_columns
    )

    # The ORG receiver takes a female indicator. Prefer deriving it from
    # is_male, then fall back to a stored is_female, and finally to all
    # zeros when neither is available.
    if "is_male" in cps_frame.columns:
        male_flags = cps_frame["is_male"].astype(bool).values
        is_female = np.logical_not(male_flags).astype(np.float32)
    elif "is_female" in data:
        is_female = data["is_female"][time_period].astype(np.float32)
    else:
        is_female = np.zeros(len(cps_frame), dtype=np.float32)

    state_per_person = _person_state_fips(data, state_fips, time_period)
    org_receiver = build_org_receiver_frame(
        age=cps_frame["age"].values,
        is_female=is_female,
        is_hispanic=cps_frame["is_hispanic"].values,
        cps_race=cps_frame["cps_race"].values,
        state_fips=state_per_person,
        employment_income=cps_frame["employment_income"].values,
        weekly_hours_worked=cps_frame["weekly_hours_worked"].values,
    )

    # Self-employment income is optional: pass None when the receiver
    # frame does not carry the column.
    self_employment_income = None
    if "self_employment_income" in cps_frame.columns:
        self_employment_income = cps_frame["self_employment_income"].values

    org_predictions = predict_org_features(
        org_receiver,
        self_employment_income=self_employment_income,
    )

    # Boolean ORG variables are stored as bool, everything else as float32.
    for name in ORG_IMPUTED_VARIABLES:
        target_dtype = bool if name in ORG_BOOL_VARIABLES else np.float32
        data[name] = {
            time_period: org_predictions[name].values.astype(target_dtype)
        }

    logger.info("ORG imputation complete: %s", ORG_IMPUTED_VARIABLES)
    return data
56 changes: 56 additions & 0 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
align_reported_ssi_disability,
prioritize_reported_recipients,
)
from policyengine_us_data.datasets.org import (
ORG_BOOL_VARIABLES,
ORG_IMPUTED_VARIABLES,
build_org_receiver_frame,
predict_org_features,
)
from policyengine_us_data.utils.randomness import seeded_rng


Expand Down Expand Up @@ -77,6 +83,8 @@ def generate(self):
add_rent(self, cps, person, household)
logging.info("Adding tips")
add_tips(self, cps)
logging.info("Adding ORG labor-market inputs")
add_org_labor_market_inputs(cps)
logging.info("Adding auto loan balance, interest and wealth")
add_auto_loan_interest_and_net_worth(self, cps)
logging.info("Added all variables")
Expand Down Expand Up @@ -1828,6 +1836,54 @@ def add_tips(self, cps: h5py.File):
self.save_dataset(cps)


def add_org_labor_market_inputs(cps: h5py.File) -> None:
    """Impute ORG-derived wage and union inputs onto CPS persons.

    Each person is assigned the ``state_fips`` of their household, matched
    through ``person_household_id`` -> ``household_id``. A receiver frame
    is then built, ``predict_org_features`` is run on it, and every
    variable in ``ORG_IMPUTED_VARIABLES`` is written back into ``cps``.

    Args:
        cps: Mapping-like CPS store. It is read for the household and
            person inputs and assigned the imputed ORG variables in place.
    """
    hh_ids = np.asarray(cps["household_id"], dtype=np.int64)
    person_hh_ids = np.asarray(cps["person_household_id"], dtype=np.int64)
    hh_state_fips = np.asarray(cps["state_fips"], dtype=np.float32)

    # Household id -> row position in the household-level arrays.
    row_of_household = dict(zip(hh_ids.tolist(), range(len(hh_ids))))
    person_state_fips = np.array(
        [
            hh_state_fips[row_of_household[hh_id]]
            for hh_id in person_hh_ids.tolist()
        ],
        dtype=np.float32,
    )

    org_receiver = build_org_receiver_frame(
        age=cps["age"],
        is_female=cps["is_female"],
        is_hispanic=cps["is_hispanic"],
        cps_race=cps["cps_race"],
        state_fips=person_state_fips,
        employment_income=cps["employment_income"],
        weekly_hours_worked=cps["weekly_hours_worked"],
    )

    # Use zeros for self-employment income when the store has no such key.
    zero_fill = np.zeros(len(org_receiver), dtype=np.float32)
    self_employment_income = np.asarray(
        cps.get("self_employment_income", zero_fill),
        dtype=np.float32,
    )

    org_predictions = predict_org_features(
        org_receiver,
        self_employment_income=self_employment_income,
    )

    # Boolean ORG variables are stored as bool, everything else as float32.
    for name in ORG_IMPUTED_VARIABLES:
        target_dtype = bool if name in ORG_BOOL_VARIABLES else np.float32
        cps[name] = org_predictions[name].values.astype(target_dtype)


def add_overtime_occupation(cps: h5py.File, person: DataFrame) -> None:
"""Add occupation categories relevant to overtime eligibility calculations.
Based on:
Expand Down
30 changes: 30 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from policyengine_core.data import Dataset

from policyengine_us_data.datasets.cps.cps import CPS, CPS_2024, CPS_2024_Full
from policyengine_us_data.datasets.org import (
ORG_IMPUTED_VARIABLES,
apply_org_domain_constraints,
)
from policyengine_us_data.datasets.puf import PUF, PUF_2024
from policyengine_us_data.storage import STORAGE_FOLDER
from policyengine_us_data.utils.mortgage_interest import (
Expand Down Expand Up @@ -85,6 +89,10 @@ def _supports_structural_mortgage_inputs() -> bool:
# Hours/employment
"weekly_hours_worked",
"hours_worked_last_week",
# ORG labor-market variables
"hourly_wage",
"is_paid_hourly",
"is_union_member_or_covered",
# Previous year income
"employment_income_last_year",
"self_employment_income_last_year",
Expand Down Expand Up @@ -336,6 +344,28 @@ def _apply_post_processing(predictions, X_test, time_period, data):
for col in ss_cols:
predictions[col] = reconciled[col]

org_cols = [c for c in predictions.columns if c in ORG_IMPUTED_VARIABLES]
if org_cols:
n_half = len(data["person_id"][time_period]) // 2
weekly_hours = (
predictions["weekly_hours_worked"].values
if "weekly_hours_worked" in predictions.columns
else data["weekly_hours_worked"][time_period][n_half:]
)
receiver = pd.DataFrame(
{
"employment_income": X_test["employment_income"].values,
"weekly_hours_worked": np.asarray(weekly_hours, dtype=np.float32),
}
)
constrained = apply_org_domain_constraints(
predictions[org_cols],
receiver,
self_employment_income=X_test["self_employment_income"].values,
)
for col in org_cols:
predictions[col] = constrained[col]

return predictions


Expand Down
10 changes: 10 additions & 0 deletions policyengine_us_data/datasets/org/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .org import (
ORG_BOOL_VARIABLES,
ORG_IMPUTED_VARIABLES,
ORG_PREDICTORS,
apply_org_domain_constraints,
build_org_receiver_frame,
get_org_model,
load_org_training_data,
predict_org_features,
)
Loading
Loading