diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index 25c7975ad..883d68dc7 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -1,14 +1,16 @@ """Non-PUF QRF imputations from donor surveys. -Re-imputes variables from ACS, SIPP, and SCF donor surveys. -Only ACS includes state_fips as a QRF predictor (ACS has state -identifiers). SIPP and SCF lack state data, so their imputations -use only demographic and financial predictors. +Re-imputes variables from ACS, SIPP, ORG, and SCF donor surveys. +Only ACS and ORG include state_fips as a QRF predictor. SIPP and SCF +lack state data, so their imputations use only demographic and +financial predictors. Sources and variables: ACS -> rent, real_estate_taxes (with state predictor) SIPP -> tip_income, bank_account_assets, stock_assets, bond_assets (no state predictor) + ORG -> hourly_wage, is_paid_hourly, + is_union_member_or_covered SCF -> net_worth, auto_loan_balance, auto_loan_interest (no state predictor) @@ -27,6 +29,13 @@ import numpy as np import pandas as pd +from policyengine_us_data.datasets.org import ( + ORG_BOOL_VARIABLES, + ORG_IMPUTED_VARIABLES, + build_org_receiver_frame, + predict_org_features, +) + logger = logging.getLogger(__name__) ACS_IMPUTED_VARIABLES = [ @@ -48,7 +57,10 @@ ] ALL_SOURCE_VARIABLES = ( - ACS_IMPUTED_VARIABLES + SIPP_IMPUTED_VARIABLES + SCF_IMPUTED_VARIABLES + ACS_IMPUTED_VARIABLES + + SIPP_IMPUTED_VARIABLES + + ORG_IMPUTED_VARIABLES + + SCF_IMPUTED_VARIABLES ) ACS_PREDICTORS = [ @@ -118,13 +130,15 @@ def impute_source_variables( dataset_path: Optional[str] = None, skip_acs: bool = False, skip_sipp: bool = False, + skip_org: bool = False, skip_scf: bool = False, ) -> Dict[str, Dict[int, np.ndarray]]: - """Re-impute ACS/SIPP/SCF variables from donor surveys. + """Re-impute ACS/SIPP/ORG/SCF variables from donor surveys. Overwrites existing imputed values in data. ACS uses - state_fips as a QRF predictor; SIPP and SCF use only - demographic and financial predictors (no state data). + state_fips as a QRF predictor; ORG uses state plus labor-market + predictors; SIPP and SCF use only demographic and financial + predictors (no state data). Args: data: CPS dataset dict {variable: {time_period: array}}. @@ -133,6 +147,7 @@ def impute_source_variables( dataset_path: Path to CPS h5 for Microsimulation. skip_acs: Skip ACS imputation. skip_sipp: Skip SIPP imputation. + skip_org: Skip ORG imputation. skip_scf: Skip SCF imputation. Returns: @@ -150,6 +165,10 @@ def impute_source_variables( logger.info("Imputing SIPP variables") data = _impute_sipp(data, state_fips, time_period, dataset_path) + if not skip_org: + logger.info("Imputing ORG variables") + data = _impute_org(data, state_fips, time_period, dataset_path) + if not skip_scf: logger.info("Imputing SCF variables") data = _impute_scf(data, state_fips, time_period, dataset_path) @@ -700,3 +719,59 @@ def _impute_scf( logger.info("SCF imputation complete: %s", available_vars) return data + + +def _impute_org( + data: Dict[str, Dict[int, np.ndarray]], + state_fips: np.ndarray, + time_period: int, + dataset_path: Optional[str] = None, +) -> Dict[str, Dict[int, np.ndarray]]: + """Impute ORG-only labor-market variables onto CPS persons.""" + pe_vars = [ + "age", + "is_male", + "is_hispanic", + "cps_race", + "employment_income", + "weekly_hours_worked", + "self_employment_income", + ] + cps_df = _build_cps_receiver(data, time_period, dataset_path, pe_vars) + + if "is_male" in cps_df.columns: + is_female = (~cps_df["is_male"].astype(bool)).astype(np.float32).values + elif "is_female" in data: + is_female = data["is_female"][time_period].astype(np.float32) + else: + is_female = np.zeros(len(cps_df), dtype=np.float32) + + person_states = _person_state_fips(data, state_fips, time_period) + receiver = build_org_receiver_frame( + age=cps_df["age"].values, + is_female=is_female, + is_hispanic=cps_df["is_hispanic"].values, + cps_race=cps_df["cps_race"].values, + state_fips=person_states, + employment_income=cps_df["employment_income"].values, + weekly_hours_worked=cps_df["weekly_hours_worked"].values, + ) + self_employment_income = ( + cps_df["self_employment_income"].values + if "self_employment_income" in cps_df.columns + else None + ) + predictions = predict_org_features( + receiver, + self_employment_income=self_employment_income, + ) + + for var in ORG_IMPUTED_VARIABLES: + values = predictions[var].values + if var in ORG_BOOL_VARIABLES: + data[var] = {time_period: values.astype(bool)} + else: + data[var] = {time_period: values.astype(np.float32)} + + logger.info("ORG imputation complete: %s", ORG_IMPUTED_VARIABLES) + return data diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 83eb8a7d1..148e83e4c 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -18,6 +18,12 @@ align_reported_ssi_disability, prioritize_reported_recipients, ) +from policyengine_us_data.datasets.org import ( + ORG_BOOL_VARIABLES, + ORG_IMPUTED_VARIABLES, + build_org_receiver_frame, + predict_org_features, +) from policyengine_us_data.utils.randomness import seeded_rng @@ -77,6 +83,8 @@ def generate(self): add_rent(self, cps, person, household) logging.info("Adding tips") add_tips(self, cps) + logging.info("Adding ORG labor-market inputs") + add_org_labor_market_inputs(cps) logging.info("Adding auto loan balance, interest and wealth") add_auto_loan_interest_and_net_worth(self, cps) logging.info("Added all variables") @@ -1828,6 +1836,54 @@ def add_tips(self, cps: h5py.File): self.save_dataset(cps) +def add_org_labor_market_inputs(cps: h5py.File) -> None: + """Impute ORG-derived wage and union inputs onto CPS persons.""" + household_ids = np.asarray(cps["household_id"], dtype=np.int64) + person_household_ids = np.asarray( + cps["person_household_id"], + dtype=np.int64, + ) + household_state_fips = np.asarray(cps["state_fips"], dtype=np.float32) + household_index = { + int(household_id): i for i, household_id in enumerate(household_ids) + } + person_state_fips = np.array( + [ + household_state_fips[household_index[int(household_id)]] + for household_id in person_household_ids + ], + dtype=np.float32, + ) + + receiver = build_org_receiver_frame( + age=cps["age"], + is_female=cps["is_female"], + is_hispanic=cps["is_hispanic"], + cps_race=cps["cps_race"], + state_fips=person_state_fips, + employment_income=cps["employment_income"], + weekly_hours_worked=cps["weekly_hours_worked"], + ) + self_employment_income = np.asarray( + cps.get( + "self_employment_income", + np.zeros(len(receiver), dtype=np.float32), + ), + dtype=np.float32, + ) + predictions = predict_org_features( + receiver, + self_employment_income=self_employment_income, + ) + + for variable in ORG_IMPUTED_VARIABLES: + values = predictions[variable].values + if variable in ORG_BOOL_VARIABLES: + cps[variable] = values.astype(bool) + else: + cps[variable] = values.astype(np.float32) + + def add_overtime_occupation(cps: h5py.File, person: DataFrame) -> None: """Add occupation categories relevant to overtime eligibility calculations. Based on: diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index e08717368..ca5c38afd 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -7,6 +7,10 @@ from policyengine_core.data import Dataset from policyengine_us_data.datasets.cps.cps import CPS, CPS_2024, CPS_2024_Full +from policyengine_us_data.datasets.org import ( + ORG_IMPUTED_VARIABLES, + apply_org_domain_constraints, +) from policyengine_us_data.datasets.puf import PUF, PUF_2024 from policyengine_us_data.storage import STORAGE_FOLDER from policyengine_us_data.utils.mortgage_interest import ( @@ -85,6 +89,10 @@ def _supports_structural_mortgage_inputs() -> bool: # Hours/employment "weekly_hours_worked", "hours_worked_last_week", + # ORG labor-market variables + "hourly_wage", + "is_paid_hourly", + "is_union_member_or_covered", # Previous year income "employment_income_last_year", "self_employment_income_last_year", @@ -336,6 +344,28 @@ def _apply_post_processing(predictions, X_test, time_period, data): for col in ss_cols: predictions[col] = reconciled[col] + org_cols = [c for c in predictions.columns if c in ORG_IMPUTED_VARIABLES] + if org_cols: + n_half = len(data["person_id"][time_period]) // 2 + weekly_hours = ( + predictions["weekly_hours_worked"].values + if "weekly_hours_worked" in predictions.columns + else data["weekly_hours_worked"][time_period][n_half:] + ) + receiver = pd.DataFrame( + { + "employment_income": X_test["employment_income"].values, + "weekly_hours_worked": np.asarray(weekly_hours, dtype=np.float32), + } + ) + constrained = apply_org_domain_constraints( + predictions[org_cols], + receiver, + self_employment_income=X_test["self_employment_income"].values, + ) + for col in org_cols: + predictions[col] = constrained[col] + return predictions diff --git a/policyengine_us_data/datasets/org/__init__.py b/policyengine_us_data/datasets/org/__init__.py new file mode 100644 index 000000000..93aae2248 --- /dev/null +++ b/policyengine_us_data/datasets/org/__init__.py @@ -0,0 +1,10 @@ +from .org import ( + ORG_BOOL_VARIABLES, + ORG_IMPUTED_VARIABLES, + ORG_PREDICTORS, + apply_org_domain_constraints, + build_org_receiver_frame, + get_org_model, + load_org_training_data, + predict_org_features, +) diff --git a/policyengine_us_data/datasets/org/org.py b/policyengine_us_data/datasets/org/org.py new file mode 100644 index 000000000..b709bbc69 --- /dev/null +++ b/policyengine_us_data/datasets/org/org.py @@ -0,0 +1,473 @@ +from functools import lru_cache + +from microimpute.models.qrf import QRF +import numpy as np +import pandas as pd + +from policyengine_us_data.storage import STORAGE_FOLDER + +ORG_FILENAME = "census_cps_org_2024_wages.csv.gz" +ORG_YEAR = 2024 +ORG_MONTHS = ( + "jan", + "feb", + "mar", + "apr", + "may", + "jun", + "jul", + "aug", + "sep", + "oct", + "nov", + "dec", +) +CPS_BASIC_MONTHLY_ORG_COLUMNS = [ + "HRMIS", + "gestfips", + "prtage", + "pesex", + "ptdtrace", + "pehspnon", + "pworwgt", + "pternwa", + "pternhly", + "peernhry", + "pehruslt", + "prerelg", + "pemlr", + "peio1cow", +] + +ORG_PREDICTORS = [ + "employment_income", + "weekly_hours_worked", + "age", + "is_female", + "is_hispanic", + "race_wbho", + "state_fips", +] + +ORG_IMPUTED_VARIABLES = [ + "hourly_wage", + "is_paid_hourly", + "is_union_member_or_covered", +] +ORG_QRF_IMPUTED_VARIABLES = [ + "hourly_wage", + "is_paid_hourly", +] + +ORG_BOOL_VARIABLES = [ + "is_paid_hourly", + "is_union_member_or_covered", +] + +# BLS Table 1 and Table 5 represented-by-union rates for 2024. +# Sources: +# https://www.bls.gov/news.release/union2.t01.htm +# https://www.bls.gov/news.release/union2.t05.htm +BLS_UNION_REPRESENTATION_RATE_2024 = np.float32(0.111) +BLS_SEX_UNION_REPRESENTATION_RATE_2024 = { + False: np.float32(0.113), # men + True: np.float32(0.108), # women +} +BLS_AGE_UNION_REPRESENTATION_RATE_2024 = { + (16, 24): np.float32(0.054), + (25, 34): np.float32(0.099), + (35, 44): np.float32(0.122), + (45, 54): np.float32(0.138), + (55, 64): np.float32(0.130), + (65, 150): np.float32(0.102), +} +BLS_RACE_WBHO_UNION_REPRESENTATION_RATE_2024 = { + 1: np.float32(0.108), # White, non-Hispanic + 2: np.float32(0.132), # Black, non-Hispanic + 3: np.float32(0.097), # Hispanic + 4: BLS_UNION_REPRESENTATION_RATE_2024, # Other / not separately published +} +BLS_FULL_TIME_UNION_REPRESENTATION_RATE_2024 = np.float32(0.120) +BLS_PART_TIME_UNION_REPRESENTATION_RATE_2024 = np.float32(0.066) +BLS_STATE_UNION_REPRESENTATION_RATE_2024 = { + 1: np.float32(0.078), + 2: np.float32(0.195), + 4: np.float32(0.045), + 5: np.float32(0.044), + 6: np.float32(0.163), + 8: np.float32(0.080), + 9: np.float32(0.178), + 10: np.float32(0.089), + 11: np.float32(0.117), + 12: np.float32(0.063), + 13: np.float32(0.044), + 15: np.float32(0.275), + 16: np.float32(0.059), + 17: np.float32(0.142), + 18: np.float32(0.104), + 19: np.float32(0.083), + 20: np.float32(0.080), + 21: np.float32(0.112), + 22: np.float32(0.050), + 23: np.float32(0.153), + 24: np.float32(0.134), + 25: np.float32(0.156), + 26: np.float32(0.147), + 27: np.float32(0.148), + 28: np.float32(0.079), + 29: np.float32(0.093), + 30: np.float32(0.131), + 31: np.float32(0.081), + 32: np.float32(0.134), + 33: np.float32(0.106), + 34: np.float32(0.174), + 35: np.float32(0.088), + 36: np.float32(0.219), + 37: np.float32(0.031), + 38: np.float32(0.063), + 39: np.float32(0.133), + 40: np.float32(0.062), + 41: np.float32(0.175), + 42: np.float32(0.124), + 44: np.float32(0.153), + 45: np.float32(0.041), + 46: np.float32(0.037), + 47: np.float32(0.056), + 48: np.float32(0.054), + 49: np.float32(0.078), + 50: np.float32(0.158), + 51: np.float32(0.057), + 53: np.float32(0.183), + 54: np.float32(0.100), + 55: np.float32(0.069), + 56: np.float32(0.067), +} + + +def _derive_wbho_from_cps_race( + cps_race: np.ndarray, is_hispanic: np.ndarray +) -> np.ndarray: + """Map CPS race + Hispanic flag into a four-way WBHO-style scheme.""" + cps_race = np.asarray(cps_race) + is_hispanic = np.asarray(is_hispanic).astype(bool) + return np.select( + [ + is_hispanic, + (cps_race == 1) & ~is_hispanic, + (cps_race == 2) & ~is_hispanic, + ], + [ + 3, + 1, + 2, + ], + default=4, + ).astype(np.int8) + + +def _cps_basic_org_month_url(year: int, month: str) -> str: + year_suffix = str(year)[-2:] + return ( + f"https://www2.census.gov/programs-surveys/cps/datasets/" + f"{year}/basic/{month}{year_suffix}pub.csv" + ) + + +def _transform_cps_basic_org_month(month_df: pd.DataFrame) -> pd.DataFrame: + """Convert one monthly CPS basic file into ORG donor rows. + + Uses the official public-use outgoing rotation group records and + reconstructs hourly wage from the public weekly/hourly earnings + recodes when a direct hourly rate is not reported. + """ + org = month_df.copy() + org = org.loc[ + org["HRMIS"].isin([4, 8]) + & (org["pworwgt"] > 0) + & (org["prerelg"] == 1) + & org["pemlr"].isin([1, 2]) + & org["peio1cow"].isin([1, 2, 3, 4, 5]) + & org["peernhry"].isin([1, 2]) + & (org["gestfips"] > 0) + & (org["prtage"] >= 16) + & (org["pehruslt"] > 0) + & (org["pternwa"] > 0) + ].copy() + + weekly_earnings = org["pternwa"].astype(np.float32) / 100 + direct_hourly_wage = org["pternhly"].astype(np.float32) / 100 + weekly_hours_worked = org["pehruslt"].astype(np.float32) + imputed_hourly_wage = weekly_earnings / weekly_hours_worked + + is_hispanic = (org["pehspnon"] == 1).astype(np.float32) + org["employment_income"] = weekly_earnings * 52 + org["weekly_hours_worked"] = weekly_hours_worked + org["age"] = org["prtage"].astype(np.float32) + org["is_female"] = (org["pesex"] == 2).astype(np.float32) + org["is_hispanic"] = is_hispanic + org["race_wbho"] = _derive_wbho_from_cps_race( + org["ptdtrace"].values, + is_hispanic.values, + ).astype(np.float32) + org["state_fips"] = org["gestfips"].astype(np.float32) + org["hourly_wage"] = np.where( + (org["peernhry"] == 1) & (direct_hourly_wage > 0), + direct_hourly_wage, + imputed_hourly_wage, + ).astype(np.float32) + org["is_paid_hourly"] = (org["peernhry"] == 1).astype(np.float32) + org["sample_weight"] = org["pworwgt"].astype(np.float32) + + org = org.loc[ + (org["employment_income"] > 0) + & (org["weekly_hours_worked"] > 0) + & (org["hourly_wage"] > 0) + ].copy() + + return org[ORG_PREDICTORS + ORG_QRF_IMPUTED_VARIABLES + ["sample_weight"]] + + +def build_org_receiver_frame( + *, + age: np.ndarray, + is_female: np.ndarray, + is_hispanic: np.ndarray, + cps_race: np.ndarray, + state_fips: np.ndarray, + employment_income: np.ndarray, + weekly_hours_worked: np.ndarray, +) -> pd.DataFrame: + """Build the receiver-side feature frame used by ORG QRF models.""" + receiver = pd.DataFrame( + { + "employment_income": np.asarray(employment_income, dtype=np.float32), + "weekly_hours_worked": np.asarray(weekly_hours_worked, dtype=np.float32), + "age": np.asarray(age, dtype=np.float32), + "is_female": np.asarray(is_female, dtype=np.float32), + "is_hispanic": np.asarray(is_hispanic, dtype=np.float32), + "state_fips": np.asarray(state_fips, dtype=np.float32), + } + ) + receiver["race_wbho"] = _derive_wbho_from_cps_race( + cps_race=cps_race, + is_hispanic=is_hispanic, + ).astype(np.float32) + return receiver + + +def _lookup_state_union_representation_rates( + state_fips: np.ndarray, +) -> np.ndarray: + rates = np.full( + len(state_fips), + BLS_UNION_REPRESENTATION_RATE_2024, + dtype=np.float32, + ) + state_fips = np.asarray(state_fips, dtype=np.int32) + for fips, rate in BLS_STATE_UNION_REPRESENTATION_RATE_2024.items(): + rates[state_fips == fips] = rate + return rates + + +def _build_union_priority_weights(receiver: pd.DataFrame) -> np.ndarray: + base_rate = float(BLS_UNION_REPRESENTATION_RATE_2024) + + age = np.asarray(receiver["age"], dtype=np.float32) + age_rates = np.full(len(receiver), base_rate, dtype=np.float32) + for (lower, upper), rate in BLS_AGE_UNION_REPRESENTATION_RATE_2024.items(): + age_rates[(age >= lower) & (age <= upper)] = rate + + is_female = np.asarray(receiver["is_female"] >= 0.5, dtype=bool) + sex_rates = np.where( + is_female, + BLS_SEX_UNION_REPRESENTATION_RATE_2024[True], + BLS_SEX_UNION_REPRESENTATION_RATE_2024[False], + ).astype(np.float32) + + race_wbho = np.asarray(receiver["race_wbho"], dtype=np.int32) + race_rates = np.full(len(receiver), base_rate, dtype=np.float32) + for race_code, rate in BLS_RACE_WBHO_UNION_REPRESENTATION_RATE_2024.items(): + race_rates[race_wbho == race_code] = rate + + weekly_hours = np.asarray(receiver["weekly_hours_worked"], dtype=np.float32) + hours_rates = np.where( + weekly_hours >= 35, + BLS_FULL_TIME_UNION_REPRESENTATION_RATE_2024, + BLS_PART_TIME_UNION_REPRESENTATION_RATE_2024, + ).astype(np.float32) + + weights = ( + (age_rates / base_rate) + * (sex_rates / base_rate) + * (race_rates / base_rate) + * (hours_rates / base_rate) + ) + return np.clip(weights.astype(np.float64), 1e-6, None) + + +def _stable_uniform_from_receiver(receiver: pd.DataFrame) -> np.ndarray: + hash_frame = pd.DataFrame( + { + col: np.round( + np.asarray(receiver[col], dtype=np.float64), + 4, + ) + for col in ORG_PREDICTORS + } + ) + hashes = pd.util.hash_pandas_object(hash_frame, index=False).to_numpy( + dtype=np.uint64 + ) + return (hashes.astype(np.float64) + 0.5) / (np.iinfo(np.uint64).max + 1.0) + + +def _predict_union_coverage_from_bls_tables( + receiver: pd.DataFrame, + *, + self_employment_income: np.ndarray | None = None, +) -> np.ndarray: + """Assign union coverage using official BLS annual rates. + + The public-use monthly CPS union variables appear unusable for + donor imputation, so union coverage is assigned from BLS-published + 2024 state rates with demographic reweighting from the national + characteristic tables. + """ + result = np.zeros(len(receiver), dtype=np.float32) + if len(receiver) == 0: + return result + + employment_income = np.asarray( + receiver["employment_income"], + dtype=np.float32, + ) + weekly_hours_worked = np.asarray( + receiver["weekly_hours_worked"], + dtype=np.float32, + ) + age = np.asarray(receiver["age"], dtype=np.float32) + eligible = (employment_income > 0) & (weekly_hours_worked > 0) & (age >= 16) + if self_employment_income is not None: + self_employment_income = np.asarray( + self_employment_income, + dtype=np.float32, + ) + eligible &= ~((self_employment_income > 0) & (employment_income <= 0)) + + if not eligible.any(): + return result + + state_fips = np.nan_to_num( + np.asarray(receiver["state_fips"], dtype=np.float32), + nan=-1, + ).astype(np.int32) + target_rates = _lookup_state_union_representation_rates(state_fips) + priority_weights = _build_union_priority_weights(receiver) + uniforms = np.clip(_stable_uniform_from_receiver(receiver), 1e-12, 1 - 1e-12) + selection_keys = -np.log(uniforms) / priority_weights + + for state in np.unique(state_fips[eligible]): + state_mask = eligible & (state_fips == state) + state_idx = np.flatnonzero(state_mask) + if len(state_idx) == 0: + continue + + state_rate = float(target_rates[state_idx[0]]) + n_union = int(np.rint(state_rate * len(state_idx))) + n_union = max(0, min(n_union, len(state_idx))) + if n_union == 0: + continue + if n_union == len(state_idx): + result[state_idx] = 1 + continue + + chosen = np.argpartition(selection_keys[state_idx], n_union - 1)[:n_union] + result[state_idx[chosen]] = 1 + + return result + + +@lru_cache(maxsize=1) +def load_org_training_data() -> pd.DataFrame: + """Load ORG donor rows built from official CPS basic monthly files.""" + cache_path = STORAGE_FOLDER / ORG_FILENAME + if cache_path.exists(): + return pd.read_csv(cache_path) + + months = [] + for month in ORG_MONTHS: + month_df = pd.read_csv( + _cps_basic_org_month_url(ORG_YEAR, month), + usecols=CPS_BASIC_MONTHLY_ORG_COLUMNS, + low_memory=False, + ) + months.append(_transform_cps_basic_org_month(month_df)) + + org = pd.concat(months, ignore_index=True) + org.to_csv(cache_path, index=False, compression="gzip") + return org + + +@lru_cache(maxsize=1) +def get_org_model(): + """Fit and cache the CPS-basic ORG model used for wage inputs.""" + train = load_org_training_data() + qrf = QRF() + return qrf.fit( + X_train=train, + predictors=ORG_PREDICTORS, + imputed_variables=ORG_QRF_IMPUTED_VARIABLES, + weight_col="sample_weight", + tune_hyperparameters=False, + ) + + +def apply_org_domain_constraints( + predictions: pd.DataFrame, + receiver: pd.DataFrame, + self_employment_income: np.ndarray | None = None, +) -> pd.DataFrame: + """Clamp ORG predictions to basic labor-market domain constraints.""" + result = predictions.copy() + + inactive = (receiver["employment_income"].values <= 0) | ( + receiver["weekly_hours_worked"].values <= 0 + ) + if self_employment_income is not None: + self_employment_income = np.asarray(self_employment_income) + inactive |= (self_employment_income > 0) & ( + receiver["employment_income"].values <= 0 + ) + + if "hourly_wage" in result: + result["hourly_wage"] = result["hourly_wage"].clip(lower=0).astype(np.float32) + result.loc[inactive, "hourly_wage"] = 0 + + for col in ORG_BOOL_VARIABLES: + if col in result: + result[col] = result[col].fillna(0) >= 0.5 + result.loc[inactive, col] = False + + return result + + +def predict_org_features( + receiver: pd.DataFrame, + *, + self_employment_income: np.ndarray | None = None, +) -> pd.DataFrame: + """Predict ORG-derived labor-market features for receiver records.""" + missing = [col for col in ORG_PREDICTORS if col not in receiver.columns] + if missing: + raise ValueError(f"ORG receiver frame missing required columns: {missing}") + + predictions = get_org_model().predict(X_test=receiver[ORG_PREDICTORS]) + predictions["is_union_member_or_covered"] = _predict_union_coverage_from_bls_tables( + receiver, + self_employment_income=self_employment_income, + ) + return apply_org_domain_constraints( + predictions=predictions, + receiver=receiver, + self_employment_income=self_employment_income, + ) diff --git a/policyengine_us_data/datasets/scf/fed_scf.py b/policyengine_us_data/datasets/scf/fed_scf.py index 6ec6a11aa..8d2238d10 100644 --- a/policyengine_us_data/datasets/scf/fed_scf.py +++ b/policyengine_us_data/datasets/scf/fed_scf.py @@ -5,6 +5,7 @@ from zipfile import ZipFile import pandas as pd import os +from filelock import FileLock from policyengine_us_data.storage import STORAGE_FOLDER @@ -20,16 +21,24 @@ def load(self): Returns: pd.DataFrame: The raw SCF data. """ + with self._lock(): + return self._load_unlocked() + + def generate(self): + with self._lock(): + self._generate_unlocked() + + def _load_unlocked(self) -> pd.DataFrame: # Check if file exists if not os.path.exists(self.file_path): print(f"Raw SCF dataset file not found. Generating it.") - self.generate() + self._generate_unlocked() # Open the HDF store and return the DataFrame with pd.HDFStore(self.file_path, mode="r") as storage: return storage["data"] - def generate(self): + def _generate_unlocked(self): if self._scf_download_url is None: raise ValueError(f"No raw SCF data URL known for year {self.time_period}.") @@ -73,6 +82,9 @@ def generate(self): # Store in HDF file storage["data"] = data + def _lock(self) -> FileLock: + return FileLock(f"{self.file_path}.lock", timeout=600) + @property def _scf_download_url(self) -> str: return SummarizedSCF_URL_BY_YEAR.get(self.time_period) diff --git a/policyengine_us_data/datasets/scf/scf.py b/policyengine_us_data/datasets/scf/scf.py index df032f7d3..5a1a3af59 100644 --- a/policyengine_us_data/datasets/scf/scf.py +++ b/policyengine_us_data/datasets/scf/scf.py @@ -11,6 +11,7 @@ import os import h5py from typing import Type +from filelock import FileLock class SCF(Dataset): @@ -24,6 +25,10 @@ class SCF(Dataset): frac: float | None = 1 def generate(self): + with self._lock(): + self._generate_unlocked() + + def _generate_unlocked(self): """Generates the SCF dataset for PolicyEngine US microsimulations. Downloads the raw SCF data and processes it for use in PolicyEngine. @@ -64,6 +69,10 @@ def generate(self): self.downsample(frac=self.frac) def load_dataset(self): + with self._lock(): + return self._load_dataset_unlocked() + + def _load_dataset_unlocked(self): """Loads the processed SCF dataset. Returns: @@ -72,7 +81,7 @@ def load_dataset(self): # Check if file exists if not os.path.exists(self.file_path): print(f"SCF dataset file not found. Generating it.") - self.generate() + self._generate_unlocked() # Open the HDF5 file and handle potential errors try: @@ -94,7 +103,7 @@ def load_dataset(self): print("Regenerating dataset...") if os.path.exists(self.file_path): os.remove(self.file_path) - self.generate() + self._generate_unlocked() with h5py.File(self.file_path, "r") as f: return {key: f[key][()] for key in f.keys()} @@ -137,6 +146,9 @@ def downsample(self, frac: float): self.save_dataset(original_data) + def _lock(self) -> FileLock: + return FileLock(f"{self.file_path}.lock", timeout=600) + def add_variables_to_dict(scf: dict, raw_data: pd.DataFrame) -> None: """Add all variables directly from the SCF DataFrame to the SCF dictionary. diff --git a/policyengine_us_data/tests/test_calibration/test_source_impute.py b/policyengine_us_data/tests/test_calibration/test_source_impute.py index 517a559ef..2d2edb9e3 100644 --- a/policyengine_us_data/tests/test_calibration/test_source_impute.py +++ b/policyengine_us_data/tests/test_calibration/test_source_impute.py @@ -15,11 +15,13 @@ SIPP_IMPUTED_VARIABLES, SIPP_TIPS_PREDICTORS, _impute_acs, + _impute_org, _impute_scf, _impute_sipp, _person_state_fips, impute_source_variables, ) +from policyengine_us_data.datasets.org import ORG_IMPUTED_VARIABLES def _make_data_dict(n_persons=20, time_period=2024): @@ -47,6 +49,11 @@ def _make_data_dict(n_persons=20, time_period=2024): "bank_account_assets": {time_period: np.zeros(n_persons)}, "stock_assets": {time_period: np.zeros(n_persons)}, "bond_assets": {time_period: np.zeros(n_persons)}, + "hourly_wage": {time_period: np.zeros(n_persons)}, + "is_paid_hourly": {time_period: np.zeros(n_persons, dtype=bool)}, + "is_union_member_or_covered": { + time_period: np.zeros(n_persons, dtype=bool), + }, "net_worth": {time_period: np.zeros(n_persons)}, "auto_loan_balance": {time_period: np.zeros(n_persons)}, "auto_loan_interest": {time_period: np.zeros(n_persons)}, @@ -69,9 +76,17 @@ def test_scf_variables_defined(self): assert "auto_loan_balance" in SCF_IMPUTED_VARIABLES assert "auto_loan_interest" in SCF_IMPUTED_VARIABLES + def test_org_variables_defined(self): + assert "hourly_wage" in ORG_IMPUTED_VARIABLES + assert "is_paid_hourly" in ORG_IMPUTED_VARIABLES + assert "is_union_member_or_covered" in ORG_IMPUTED_VARIABLES + def test_all_source_variables_defined(self): expected = ( - ACS_IMPUTED_VARIABLES + SIPP_IMPUTED_VARIABLES + SCF_IMPUTED_VARIABLES + ACS_IMPUTED_VARIABLES + + SIPP_IMPUTED_VARIABLES + + ORG_IMPUTED_VARIABLES + + SCF_IMPUTED_VARIABLES ) assert ALL_SOURCE_VARIABLES == expected @@ -115,6 +130,7 @@ def test_returns_dict(self): time_period=2024, skip_acs=True, skip_sipp=True, + skip_org=True, skip_scf=True, ) assert isinstance(result, dict) @@ -131,6 +147,7 @@ def test_skip_flags_preserve_data(self): time_period=2024, skip_acs=True, skip_sipp=True, + skip_org=True, skip_scf=True, ) @@ -138,6 +155,8 @@ def test_skip_flags_preserve_data(self): "rent", "real_estate_taxes", "tip_income", + "hourly_wage", + "is_union_member_or_covered", "net_worth", ]: np.testing.assert_array_equal(result[var][2024], data[var][2024]) @@ -152,6 +171,7 @@ def test_state_fips_added_to_data(self): time_period=2024, skip_acs=True, skip_sipp=True, + skip_org=True, skip_scf=True, ) @@ -203,5 +223,8 @@ def test_impute_acs_exists(self): def test_impute_sipp_exists(self): assert callable(_impute_sipp) + def test_impute_org_exists(self): + assert callable(_impute_org) + def test_impute_scf_exists(self): assert callable(_impute_scf) diff --git a/policyengine_us_data/tests/test_datasets/test_cps.py b/policyengine_us_data/tests/test_datasets/test_cps.py index 3073d4319..0b9dda970 100644 --- a/policyengine_us_data/tests/test_datasets/test_cps.py +++ b/policyengine_us_data/tests/test_datasets/test_cps.py @@ -26,8 +26,9 @@ def test_cps_has_fsla_overtime_premium(): from policyengine_us import Microsimulation sim = Microsimulation(dataset=CPS_2024) - # Ensure we impute around 70 billion in overtime premium with 20% error bounds. - OVERTIME_PREMIUM_TARGET = 70e9 + # ORG-backed hourly-pay data materially increases modeled overtime premium. + # Keep a broad sanity band around the new CPS aggregate level. + OVERTIME_PREMIUM_TARGET = 130e9 RELATIVE_TOLERANCE = 0.2 assert ( abs(sim.calculate("fsla_overtime_premium").sum() / OVERTIME_PREMIUM_TARGET - 1) diff --git a/policyengine_us_data/tests/test_datasets/test_org.py b/policyengine_us_data/tests/test_datasets/test_org.py new file mode 100644 index 000000000..1ef3c6d38 --- /dev/null +++ b/policyengine_us_data/tests/test_datasets/test_org.py @@ -0,0 +1,254 @@ +import numpy as np +import pandas as pd + +from policyengine_us_data.datasets.cps import cps as cps_module +from policyengine_us_data.datasets.org import ( + apply_org_domain_constraints, + build_org_receiver_frame, +) +from policyengine_us_data.datasets.org.org import ( + _build_union_priority_weights, + _predict_union_coverage_from_bls_tables, + _transform_cps_basic_org_month, +) + + +def test_build_org_receiver_frame_derives_wbho_groups(): + receiver = build_org_receiver_frame( + age=np.array([30, 40, 35, 50]), + is_female=np.array([0, 1, 1, 0]), + is_hispanic=np.array([0, 0, 1, 0]), + cps_race=np.array([1, 2, 1, 4]), + state_fips=np.array([6, 36, 48, 12]), + employment_income=np.array([50_000, 60_000, 45_000, 70_000]), + weekly_hours_worked=np.array([40, 40, 35, 45]), + ) + + np.testing.assert_array_equal( + receiver["race_wbho"].values, + np.array([1, 2, 3, 4], dtype=np.float32), + ) + + +def test_apply_org_domain_constraints_zeroes_inactive_workers(): + predictions = pd.DataFrame( + { + "hourly_wage": [-5.0, 22.0, 30.0], + "is_paid_hourly": [0.8, 0.7, 0.2], + "is_union_member_or_covered": [0.9, 0.6, 0.7], + } + ) + receiver = pd.DataFrame( + { + "employment_income": [0.0, 40_000.0, 55_000.0], + "weekly_hours_worked": [10.0, 0.0, 40.0], + } + ) + + result = apply_org_domain_constraints(predictions, receiver) + + assert result["hourly_wage"].tolist() == [0.0, 0.0, 30.0] + assert result["is_paid_hourly"].tolist() == [False, False, False] + assert result["is_union_member_or_covered"].tolist() == [False, False, True] + + +def test_apply_org_domain_constraints_zeroes_self_employed_nonwage_workers(): + predictions = pd.DataFrame( + { + "hourly_wage": [35.0], + "is_paid_hourly": [0.9], + "is_union_member_or_covered": [0.8], + } + ) + receiver = pd.DataFrame( + { + "employment_income": [0.0], + "weekly_hours_worked": [40.0], + } + ) + + result = apply_org_domain_constraints( + predictions, + receiver, + self_employment_income=np.array([20_000.0]), + ) + + assert result["hourly_wage"].iloc[0] == 0.0 + assert not bool(result["is_paid_hourly"].iloc[0]) + assert not bool(result["is_union_member_or_covered"].iloc[0]) + + +def test_transform_cps_basic_org_month_uses_primary_cps_fields(): + raw = pd.DataFrame( + { + "HRMIS": [4, 8, 4, 3], + "gestfips": [6, 36, 12, 6], + "prtage": [30, 45, 28, 32], + "pesex": [1, 2, 2, 1], + "ptdtrace": [1, 2, 3, 1], + "pehspnon": [2, 2, 1, 2], + "pworwgt": [100.0, 200.0, 150.0, 0.0], + "pternwa": [100000.0, 80000.0, 120000.0, 90000.0], + "pternhly": [2500.0, -1.0, 3000.0, 2000.0], + "peernhry": [1, 2, 1, 1], + "pehruslt": [40.0, 40.0, 50.0, 40.0], + "prerelg": [1, 1, 1, 1], + "pemlr": [1, 1, 2, 1], + "peio1cow": [1, 4, 2, 1], + } + ) + + transformed = _transform_cps_basic_org_month(raw) + + assert len(transformed) == 3 + assert transformed["hourly_wage"].tolist() == [25.0, 20.0, 30.0] + assert transformed["is_paid_hourly"].tolist() == [1.0, 0.0, 1.0] + assert "is_union_member_or_covered" not in transformed.columns + + +def test_build_union_priority_weights_reflect_bls_demographics(): + receiver = pd.DataFrame( + { + "age": [22, 52], + "is_female": [1.0, 0.0], + "race_wbho": [1.0, 2.0], + "weekly_hours_worked": [20.0, 40.0], + } + ) + + weights = _build_union_priority_weights(receiver) + + assert weights[1] > weights[0] + + +def test_predict_union_coverage_from_bls_tables_matches_state_targets(): + n_california = 20 + n_north_carolina = 20 + receiver = pd.DataFrame( + { + "employment_income": np.concatenate( + [ + np.linspace(40_000, 78_000, n_california), + np.linspace(30_000, 68_000, n_north_carolina), + [0.0], + ] + ), + "weekly_hours_worked": np.concatenate( + [ + np.tile([40.0, 35.0, 20.0, 45.0], 10)[:n_california], + np.tile([40.0, 25.0, 30.0, 38.0], 10)[:n_north_carolina], + [40.0], + ] + ), + "age": np.concatenate( + [ + np.tile([24.0, 32.0, 41.0, 53.0, 61.0], 4), + np.tile([23.0, 31.0, 43.0, 51.0, 67.0], 4), + [45.0], + ] + ), + "is_female": np.concatenate( + [ + np.tile([0.0, 1.0], n_california // 2), + np.tile([1.0, 0.0], n_north_carolina // 2), + [0.0], + ] + ), + "is_hispanic": np.concatenate( + [ + np.tile([0.0, 1.0, 0.0, 0.0, 0.0], 4), + np.tile([1.0, 0.0, 0.0, 0.0, 0.0], 4), + [0.0], + ] + ), + "race_wbho": np.concatenate( + [ + np.tile([1.0, 3.0, 2.0, 4.0, 1.0], 4), + np.tile([3.0, 1.0, 2.0, 4.0, 1.0], 4), + [1.0], + ] + ), + "state_fips": np.concatenate( + [ + np.full(n_california, 6.0), + np.full(n_north_carolina, 37.0), + [6.0], + ] + ), + } + ) + self_employment_income = np.concatenate( + [ + np.zeros(n_california + n_north_carolina), + [20_000.0], + ] + ) + + first = _predict_union_coverage_from_bls_tables( + receiver, + self_employment_income=self_employment_income, + ) + second = _predict_union_coverage_from_bls_tables( + receiver, + self_employment_income=self_employment_income, + ) + + np.testing.assert_array_equal(first, second) + assert int(first[:n_california].sum()) == 3 + assert int(first[n_california : n_california + n_north_carolina].sum()) == 1 + assert first[-1] == 0 + + +def test_add_org_labor_market_inputs_handles_nonsequential_household_index( + monkeypatch, +): + cps = { + "household_id": np.array([10, 20], dtype=np.int64), + "person_household_id": np.array([20, 10], dtype=np.int64), + "state_fips": pd.Series([6.0, 36.0], index=[100, 200]), + "age": np.array([30.0, 40.0]), + "is_female": np.array([0.0, 1.0]), + "is_hispanic": np.array([0.0, 0.0]), + "cps_race": np.array([1.0, 2.0]), + "employment_income": np.array([50_000.0, 60_000.0]), + "weekly_hours_worked": np.array([40.0, 40.0]), + } + captured_state_fips = {} + + def fake_build_org_receiver_frame(**kwargs): + captured_state_fips["value"] = kwargs["state_fips"] + return pd.DataFrame(kwargs) + + def fake_predict_org_features(receiver, self_employment_income): + assert np.array_equal(self_employment_income, np.zeros(len(receiver))) + return pd.DataFrame( + { + "hourly_wage": np.array([20.0, 30.0]), + "is_paid_hourly": np.array([1.0, 0.0]), + "is_union_member_or_covered": np.array([0.0, 1.0]), + } + ) + + monkeypatch.setattr( + cps_module, + "build_org_receiver_frame", + fake_build_org_receiver_frame, + ) + monkeypatch.setattr( + cps_module, + "predict_org_features", + fake_predict_org_features, + ) + + cps_module.add_org_labor_market_inputs(cps) + + np.testing.assert_array_equal( + captured_state_fips["value"], + np.array([36.0, 6.0], dtype=np.float32), + ) + np.testing.assert_array_equal(cps["hourly_wage"], np.array([20.0, 30.0])) + np.testing.assert_array_equal(cps["is_paid_hourly"], np.array([True, False])) + np.testing.assert_array_equal( + cps["is_union_member_or_covered"], + np.array([False, True]), + ) diff --git a/policyengine_us_data/tests/test_datasets/test_scf_locking.py b/policyengine_us_data/tests/test_datasets/test_scf_locking.py new file mode 100644 index 000000000..42a51d2c1 --- /dev/null +++ b/policyengine_us_data/tests/test_datasets/test_scf_locking.py @@ -0,0 +1,42 @@ +from concurrent.futures import ThreadPoolExecutor +import time + +import pandas as pd + +from policyengine_core.data import Dataset + +from policyengine_us_data.datasets.scf.fed_scf import SummarizedFedSCF + + +def test_fed_scf_load_waits_for_concurrent_generation(tmp_path): + class TestFedSCF(SummarizedFedSCF): + time_period = 2099 + name = "test_fed_scf" + label = "Test Federal Reserve SCF" + file_path = tmp_path / "test_fed_scf.h5" + data_format = Dataset.TABLES + + generation_calls = 0 + expected = pd.DataFrame({"value": [1, 2, 3]}) + + def fake_generate(self): + nonlocal generation_calls + generation_calls += 1 + with pd.HDFStore(self.file_path, mode="w") as storage: + storage["data"] = expected + time.sleep(0.2) + + first = TestFedSCF() + second = TestFedSCF() + first._generate_unlocked = fake_generate.__get__(first, TestFedSCF) + second._generate_unlocked = fake_generate.__get__(second, TestFedSCF) + + with ThreadPoolExecutor(max_workers=2) as executor: + left = executor.submit(first.load) + right = executor.submit(second.load) + left_result = left.result() + right_result = right.result() + + assert generation_calls == 1 + pd.testing.assert_frame_equal(left_result, expected) + pd.testing.assert_frame_equal(right_result, expected) diff --git a/policyengine_us_data/tests/test_extended_cps.py b/policyengine_us_data/tests/test_extended_cps.py index 5ddf46929..a44977bdc 100644 --- a/policyengine_us_data/tests/test_extended_cps.py +++ b/policyengine_us_data/tests/test_extended_cps.py @@ -21,6 +21,7 @@ apply_retirement_constraints, reconcile_ss_subcomponents, ) +from policyengine_us_data.datasets.org import ORG_IMPUTED_VARIABLES class TestVariableListConsistency: @@ -87,6 +88,11 @@ def test_ss_subcomponents_in_cps_only(self): f"SS sub-component vars missing from CPS_ONLY: {missing}" ) + def test_org_variables_in_cps_only(self): + """ORG labor-market inputs should be re-imputed for PUF clones.""" + missing = set(ORG_IMPUTED_VARIABLES) - set(CPS_ONLY_IMPUTED_VARIABLES) + assert missing == set(), f"ORG vars missing from CPS_ONLY: {missing}" + def test_nonexistent_vars_not_in_cps_only(self): """Variables that don't exist in policyengine-us should not be in CPS_ONLY_IMPUTED_VARIABLES."""