diff --git a/Makefile b/Makefile index 602afe3d8..496a12a44 100644 --- a/Makefile +++ b/Makefile @@ -188,7 +188,8 @@ calibrate-modal-national: modal run modal_app/remote_calibration_runner.py::main \ --branch $(BRANCH) --gpu $(NATIONAL_GPU) \ --epochs $(NATIONAL_EPOCHS) \ - --push-results --national + --push-results --national \ + --target-config policyengine_us_data/calibration/target_config_national.yaml calibrate-both: $(MAKE) calibrate-modal & $(MAKE) calibrate-modal-national & wait diff --git a/changelog.d/acs-2024-donor.changed.md b/changelog.d/acs-2024-donor.changed.md new file mode 100644 index 000000000..451a80503 --- /dev/null +++ b/changelog.d/acs-2024-donor.changed.md @@ -0,0 +1 @@ +Use 2024 ACS 1-year PUMS as the rent and property-tax donor source for source imputation, while keeping the historical 2022 ACS dataset available. diff --git a/changelog.d/scf-2024-source-impute.changed.md b/changelog.d/scf-2024-source-impute.changed.md new file mode 100644 index 000000000..c968d2b9a --- /dev/null +++ b/changelog.d/scf-2024-source-impute.changed.md @@ -0,0 +1 @@ +Age SCF donor values from 2022 to 2024 during source imputation, align household `net_worth` to the 2024 national total, and make the 2024 source-imputed CPS the tested net-worth dataset. diff --git a/changelog.d/source-imputed-cps-datasets.changed.md b/changelog.d/source-imputed-cps-datasets.changed.md new file mode 100644 index 000000000..d0bc29634 --- /dev/null +++ b/changelog.d/source-imputed-cps-datasets.changed.md @@ -0,0 +1 @@ +Add dataset classes for the stratified and source-imputed stratified extended CPS artifacts, and align `EnhancedCPS_2024` with the canonical source-imputed calibration input. diff --git a/docs/calibration.md b/docs/calibration.md index fa9f9ac2e..a6d169b6e 100644 --- a/docs/calibration.md +++ b/docs/calibration.md @@ -182,12 +182,16 @@ weights = fit_l0_weights( ## Target Config -The target config controls which targets reach the optimizer. 
It uses a YAML exclusion list: +The target config controls which targets reach the optimizer. It can use a YAML inclusion list, an exclusion list, or both: ```yaml -exclude: +include: + - variable: net_worth + geo_level: national - variable: rent geo_level: national + +exclude: - variable: eitc geo_level: district - variable: snap @@ -195,7 +199,7 @@ exclude: domain_variable: snap # optional: further narrow the match ``` -Each rule drops rows from the calibration matrix where **all** specified fields match. Unrecognized variables silently match nothing. +A row matches a rule when **all** of the rule's specified fields match. `include` keeps only matching rows; `exclude` drops matching rows. If both are present, `include` is applied first and `exclude` then removes rows from that set. Unrecognized variables silently match nothing. ### Fields @@ -207,12 +211,11 @@ Each rule drops rows from the calibration matrix where **all** specified fields ### Default config -The checked-in config at `policyengine_us_data/calibration/target_config.yaml` reproduces the junkyard notebook's 22 excluded target groups. It drops: +The default training config at `policyengine_us_data/calibration/target_config.yaml` is include-based. It defines the shared target subset used by local calibration and does not include national `net_worth`. -- **13 national-level variables**: alimony, charitable deduction, child support, interest deduction, medical expense deduction, net worth, person count, real estate taxes, rent, social security dependents/survivors -- **9 district-level variables**: ACA PTC, EITC, income tax before credits, medical expense deduction, net capital gains, rental income, tax unit count, partnership/S-corp income, taxable social security +The national calibration preset uses `policyengine_us_data/calibration/target_config_national.yaml`, which is the same include-based target set plus national `net_worth`. -Applying this config reduces targets from ~37K to ~21K, matching the junkyard's target selection. 
+The checked-in backup config at `policyengine_us_data/calibration/target_config_full.yaml` preserves the earlier junkyard-style exclusion list for reference, except that national `net_worth` is no longer excluded. ### Writing a custom config diff --git a/modal_app/README.md b/modal_app/README.md index 876f3610e..90e545dce 100644 --- a/modal_app/README.md +++ b/modal_app/README.md @@ -185,7 +185,7 @@ Loads pre-built matrices from Modal volume, fits L0-regularized weights on GPU. | **Modal CLI (national preset)** | `make calibrate-modal-national BRANCH=` | | **Both presets** | `make calibrate-both BRANCH=` | -`make calibrate-modal` passes `--prebuilt-matrices --push-results` automatically. `make calibrate-modal-national` adds `--national`, which sets λ_L0=1e-4 for a smaller ~50K-record output. `make calibrate-both` runs both in parallel. +`make calibrate-modal` passes `--prebuilt-matrices --push-results` automatically. `make calibrate-modal-national` adds `--national`, which sets λ_L0=1e-4 for a smaller ~50K-record output, and passes `--target-config policyengine_us_data/calibration/target_config_national.yaml`, which currently adds national `net_worth` to the shared target set. `make calibrate-both` runs both in parallel. 
Full example: ``` @@ -194,7 +194,7 @@ modal run modal_app/remote_calibration_runner.py::main \ --gpu T4 --epochs 1000 \ --beta 0.65 --lambda-l0 1e-6 --lambda-l2 1e-8 \ --log-freq 500 \ - --target-config policyengine_us_data/calibration/target_config.yaml \ + --target-config policyengine_us_data/calibration/target_config_national.yaml \ --prebuilt-matrices --push-results ``` diff --git a/modal_app/data_build.py b/modal_app/data_build.py index 20314e4d8..43e01a32b 100644 --- a/modal_app/data_build.py +++ b/modal_app/data_build.py @@ -36,7 +36,7 @@ "policyengine_us_data/storage/uprating_factors.csv" ), "policyengine_us_data/datasets/acs/acs.py": ( - "policyengine_us_data/storage/acs_2022.h5" + "policyengine_us_data/storage/acs_2024.h5" ), "policyengine_us_data/datasets/puf/irs_puf.py": ( "policyengine_us_data/storage/irs_puf_2015.h5" diff --git a/policyengine_us_data/calibration/create_source_imputed_cps.py b/policyengine_us_data/calibration/create_source_imputed_cps.py index 68dd876ac..164d47241 100644 --- a/policyengine_us_data/calibration/create_source_imputed_cps.py +++ b/policyengine_us_data/calibration/create_source_imputed_cps.py @@ -10,9 +10,9 @@ import logging import sys -from pathlib import Path import h5py +import numpy as np from policyengine_us_data.storage import STORAGE_FOLDER @@ -22,15 +22,30 @@ OUTPUT_PATH = str(STORAGE_FOLDER / "source_imputed_stratified_extended_cps_2024.h5") +def _resolve_household_state_fips(sim, n_records: int, seed: int, use_existing_state_fips: bool): + if use_existing_state_fips: + try: + existing_states = sim.calculate("state_fips", map_to="household").values + if len(existing_states) == n_records: + logger.info("Using existing household state_fips from input dataset") + return np.asarray(existing_states, dtype=np.int32) + except Exception as exc: + logger.info("Existing state_fips unavailable, assigning random geography: %s", exc) + + from policyengine_us_data.calibration.clone_and_assign import assign_random_geography + + 
geography = assign_random_geography(n_records=n_records, n_clones=1, seed=seed) + return geography.state_fips[:n_records].astype(np.int32) + + def create_source_imputed_cps( input_path: str = INPUT_PATH, output_path: str = OUTPUT_PATH, seed: int = 42, + use_existing_state_fips: bool = False, + time_period: int | None = None, ): from policyengine_us import Microsimulation - from policyengine_us_data.calibration.clone_and_assign import ( - assign_random_geography, - ) from policyengine_us_data.calibration.source_impute import ( impute_source_variables, ) @@ -38,17 +53,17 @@ def create_source_imputed_cps( logger.info("Loading dataset from %s", input_path) sim = Microsimulation(dataset=input_path) n_records = len(sim.calculate("household_id", map_to="household").values) - - raw_keys = sim.dataset.load_dataset()["household_id"] - if isinstance(raw_keys, dict): - time_period = int(next(iter(raw_keys))) - else: - time_period = 2024 + if time_period is None: + time_period = int(sim.default_calculation_period) logger.info("Loaded %d households, time_period=%d", n_records, time_period) - geography = assign_random_geography(n_records=n_records, n_clones=1, seed=seed) - base_states = geography.state_fips[:n_records] + base_states = _resolve_household_state_fips( + sim, + n_records=n_records, + seed=seed, + use_existing_state_fips=use_existing_state_fips, + ) raw_data = sim.dataset.load_dataset() data_dict = {} diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index 25c7975ad..dc7ff60bc 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -89,6 +89,26 @@ "social_security_pension_income", ] +SCF_DONOR_YEAR = 2022 + +SCF_DONOR_UPRATING_MAP = { + "employment_income": ("employment_income",), + "interest_dividend_income": ( + "taxable_interest_income", + "tax_exempt_interest_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + 
), + "social_security_pension_income": ("social_security",), + "net_worth": ("net_worth",), + "auto_loan_balance": ("auto_loan_balance",), + "auto_loan_interest": ("auto_loan_interest",), +} + +NET_WORTH_TOTAL_TARGETS = { + 2024: 160e12, +} + TENURE_TYPE_MAP = { "OWNED_WITH_MORTGAGE": 1, @@ -111,6 +131,63 @@ def _encode_tenure_type(df: pd.DataFrame) -> pd.DataFrame: return df +def _uprating_ratio(variable_names: tuple[str, ...], from_year: int, to_year: int) -> float: + """Return a donor-to-recipient uprating ratio. + + Uses the mean ratio across available variable-specific uprating series and + falls back to CPI-U when no series is available. + """ + from policyengine_us.system import system + + ratios: list[float] = [] + for variable_name in variable_names: + variable = system.variables.get(variable_name) + if variable is None or variable.uprating is None: + continue + parameter = system.parameters.get_child(variable.uprating) + ratios.append(float(parameter(to_year) / parameter(from_year))) + + if ratios: + return float(np.mean(ratios)) + + cpi = system.parameters.get_child("gov.bls.cpi.cpi_u") + return float(cpi(to_year) / cpi(from_year)) + + +def _uprate_scf_donor_frame( + donor: pd.DataFrame, + *, + from_year: int, + to_year: int, +) -> pd.DataFrame: + """Uprate SCF donor money columns from donor year to recipient year.""" + if from_year == to_year: + return donor + + donor = donor.copy() + for column, variable_names in SCF_DONOR_UPRATING_MAP.items(): + if column not in donor.columns: + continue + donor[column] = donor[column].astype(np.float32) * _uprating_ratio( + variable_names, from_year, to_year + ) + return donor + + +def _align_weighted_total( + values: np.ndarray, + weights: np.ndarray, + target_total: float, +) -> np.ndarray: + """Scale values so their weighted total matches a target.""" + current_total = float(np.dot(values.astype(np.float64), weights.astype(np.float64))) + if current_total <= 0 or target_total <= 0: + return values + return 
(values.astype(np.float64) * (target_total / current_total)).astype( + np.float32 + ) + + def impute_source_variables( data: Dict[str, Dict[int, np.ndarray]], state_fips: np.ndarray, @@ -195,6 +272,183 @@ def _build_cps_receiver( return df +def _household_values_from_data( + data: Dict[str, Dict[int, np.ndarray]], + variable: str, + time_period: int, + household_ids: np.ndarray, + person_household_ids: np.ndarray | None, + *, + how: str = "sum", + default: float = 0.0, +) -> np.ndarray: + """Map a variable to one household-level value per household.""" + values = data.get(variable, {}).get(time_period) + if values is None: + return np.full(len(household_ids), default, dtype=np.float32) + + values = np.asarray(values) + household_ids = np.asarray(household_ids) + + if len(values) == len(household_ids): + return values.astype(np.float32) + + if person_household_ids is None or len(values) != len(person_household_ids): + return np.full(len(household_ids), default, dtype=np.float32) + + frame = pd.DataFrame( + { + "household_id": person_household_ids, + "value": values, + } + ) + if how == "first": + grouped = frame.groupby("household_id", sort=False)["value"].first() + elif how == "max": + grouped = frame.groupby("household_id", sort=False)["value"].max() + else: + grouped = frame.groupby("household_id", sort=False)["value"].sum() + + return ( + grouped.reindex(household_ids, fill_value=default).to_numpy(dtype=np.float32) + ) + + +def _build_household_scf_receiver( + data: Dict[str, Dict[int, np.ndarray]], + time_period: int, +) -> pd.DataFrame: + """Build a household-level receiver frame for SCF wealth imputation.""" + household_ids = np.asarray(data["household_id"][time_period]) + person_household_ids = data.get("person_household_id", {}).get(time_period) + if person_household_ids is not None: + person_household_ids = np.asarray(person_household_ids) + + receiver = pd.DataFrame({"household_id": household_ids}) + + receiver["age"] = _household_values_from_data( + 
data, + "age", + time_period, + household_ids, + person_household_ids, + how="first", + ) + + if "is_female" in data: + receiver["is_female"] = _household_values_from_data( + data, + "is_female", + time_period, + household_ids, + person_household_ids, + how="first", + ) + elif "is_male" in data: + receiver["is_female"] = 1.0 - _household_values_from_data( + data, + "is_male", + time_period, + household_ids, + person_household_ids, + how="first", + ) + else: + receiver["is_female"] = 0.0 + + receiver["cps_race"] = _household_values_from_data( + data, + "cps_race", + time_period, + household_ids, + person_household_ids, + how="first", + ) + receiver["is_married"] = _household_values_from_data( + data, + "is_married", + time_period, + household_ids, + person_household_ids, + how="max", + ) + receiver["own_children_in_household"] = _household_values_from_data( + data, + "own_children_in_household", + time_period, + household_ids, + person_household_ids, + how="max", + ) + receiver["employment_income"] = _household_values_from_data( + data, + "employment_income", + time_period, + household_ids, + person_household_ids, + how="sum", + ) + + if "interest_dividend_income" in data: + interest_dividend_income = _household_values_from_data( + data, + "interest_dividend_income", + time_period, + household_ids, + person_household_ids, + how="sum", + ) + else: + interest_dividend_income = np.zeros(len(household_ids), dtype=np.float32) + for variable in [ + "taxable_interest_income", + "tax_exempt_interest_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + ]: + interest_dividend_income += _household_values_from_data( + data, + variable, + time_period, + household_ids, + person_household_ids, + how="sum", + ) + receiver["interest_dividend_income"] = interest_dividend_income + + if "social_security_pension_income" in data: + social_security_pension_income = _household_values_from_data( + data, + "social_security_pension_income", + time_period, + 
household_ids, + person_household_ids, + how="sum", + ) + else: + social_security_pension_income = np.zeros( + len(household_ids), dtype=np.float32 + ) + for variable in [ + "tax_exempt_private_pension_income", + "taxable_private_pension_income", + "social_security_retirement", + "social_security", + "pension_income", + ]: + social_security_pension_income += _household_values_from_data( + data, + variable, + time_period, + household_ids, + person_household_ids, + how="sum", + ) + receiver["social_security_pension_income"] = social_security_pension_income + + return receiver + + def _get_variable_entity(variable_name: str) -> str: """Return the entity key for a PE variable.""" from policyengine_us import CountryTaxBenefitSystem @@ -256,9 +510,9 @@ def _impute_acs( from microimpute.models.qrf import QRF from policyengine_us import Microsimulation - from policyengine_us_data.datasets.acs.acs import ACS_2022 + from policyengine_us_data.datasets.acs.acs import ACS_2024 - acs = Microsimulation(dataset=ACS_2022) + acs = Microsimulation(dataset=ACS_2024) predictors = ACS_PREDICTORS + ["state_fips"] acs_df = acs.calculate_dataframe(ACS_PREDICTORS + ACS_IMPUTED_VARIABLES) @@ -602,57 +856,13 @@ def _impute_scf( if weights is not None: donor["wgt"] = weights donor = donor.dropna(subset=scf_predictors) + donor = _uprate_scf_donor_frame( + donor, + from_year=SCF_DONOR_YEAR, + to_year=time_period, + ) donor = donor.sample(frac=0.5, random_state=42).reset_index(drop=True) - - pe_vars = [ - "age", - "is_male", - "employment_income", - ] - cps_df = _build_cps_receiver(data, time_period, dataset_path, pe_vars) - - if "is_male" in cps_df.columns: - cps_df["is_female"] = (~cps_df["is_male"].astype(bool)).astype(np.float32) - else: - cps_df["is_female"] = 0.0 - - for var in [ - "cps_race", - "is_married", - "own_children_in_household", - ]: - if var in data: - cps_df[var] = data[var][time_period].astype(np.float32) - else: - cps_df[var] = 0.0 - - for var in [ - 
"taxable_interest_income", - "tax_exempt_interest_income", - "qualified_dividend_income", - "non_qualified_dividend_income", - ]: - if var in data: - cps_df[var] = data[var][time_period].astype(np.float32) - cps_df["interest_dividend_income"] = ( - cps_df.get("taxable_interest_income", 0) - + cps_df.get("tax_exempt_interest_income", 0) - + cps_df.get("qualified_dividend_income", 0) - + cps_df.get("non_qualified_dividend_income", 0) - ).astype(np.float32) - - for var in [ - "tax_exempt_private_pension_income", - "taxable_private_pension_income", - "social_security_retirement", - ]: - if var in data: - cps_df[var] = data[var][time_period].astype(np.float32) - cps_df["social_security_pension_income"] = ( - cps_df.get("tax_exempt_private_pension_income", 0) - + cps_df.get("taxable_private_pension_income", 0) - + cps_df.get("social_security_retirement", 0) - ).astype(np.float32) + cps_df = _build_household_scf_receiver(data, time_period) qrf = QRF() logger.info( @@ -670,30 +880,42 @@ def _impute_scf( ) preds = fitted.predict(X_test=cps_df) - hh_ids = data["household_id"][time_period] - person_hh_ids = data.get("person_household_id", {}).get(time_period) - for var in available_vars: - person_vals = preds[var].values + household_vals = preds[var].values.astype(np.float32) entity = _get_variable_entity(var) - if entity == "household" and person_hh_ids is not None: - hh_vals = np.zeros(len(hh_ids), dtype=np.float32) - hh_to_idx = {int(hid): i for i, hid in enumerate(hh_ids)} - seen = set() - for p_idx, p_hh in enumerate(person_hh_ids): - hh_key = int(p_hh) - if hh_key not in seen: - seen.add(hh_key) - hh_vals[hh_to_idx[hh_key]] = person_vals[p_idx] - data[var] = {time_period: hh_vals} + if entity == "household": + if var == "net_worth": + target_total = NET_WORTH_TOTAL_TARGETS.get(time_period) + household_weights = data.get("household_weight", {}).get(time_period) + if target_total is not None and household_weights is not None: + household_vals = _align_weighted_total( + 
household_vals, + household_weights.astype(np.float32), + target_total, + ) + logger.info( + " %s: aligned household total to %.3e", + var, + target_total, + ) + data[var] = {time_period: household_vals} logger.info( - " %s: person(%d) -> household(%d)", + " %s: household(%d)", var, - len(person_vals), - len(hh_vals), + len(household_vals), ) else: - data[var] = {time_period: person_vals} + person_hh_ids = data.get("person_household_id", {}).get(time_period) + if person_hh_ids is None: + data[var] = {time_period: household_vals} + else: + hh_ids = data["household_id"][time_period] + hh_to_value = dict(zip(hh_ids, household_vals)) + person_vals = np.array( + [hh_to_value[int(hid)] for hid in person_hh_ids], + dtype=np.float32, + ) + data[var] = {time_period: person_vals} del fitted, preds gc.collect() diff --git a/policyengine_us_data/calibration/target_config_full.yaml b/policyengine_us_data/calibration/target_config_full.yaml index 1e1e287dd..2a7f91e9b 100644 --- a/policyengine_us_data/calibration/target_config_full.yaml +++ b/policyengine_us_data/calibration/target_config_full.yaml @@ -18,8 +18,6 @@ exclude: geo_level: national - variable: medical_expense_deduction geo_level: national - - variable: net_worth - geo_level: national - variable: person_count geo_level: national - variable: real_estate_taxes diff --git a/policyengine_us_data/calibration/target_config_national.yaml b/policyengine_us_data/calibration/target_config_national.yaml new file mode 100644 index 000000000..08ff76567 --- /dev/null +++ b/policyengine_us_data/calibration/target_config_national.yaml @@ -0,0 +1,233 @@ +include: + # === DISTRICT — age demographics === + - variable: person_count + geo_level: district + domain_variable: age + + # === DISTRICT — count targets === + - variable: person_count + geo_level: district + domain_variable: adjusted_gross_income + - variable: household_count + geo_level: district + + # === DISTRICT — dollar targets (needed_w 7-41, compatible) === + - variable: 
real_estate_taxes + geo_level: district + - variable: self_employment_income + geo_level: district + - variable: taxable_pension_income + geo_level: district + # DISABLED: refundable_ctc formula doesn't gate on tax_unit_is_filer; + # non-filer values inflate totals beyond IRS SOI targets. + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: refundable_ctc + # geo_level: district + - variable: unemployment_compensation + geo_level: district + + # === DISTRICT — ACA PTC === + # DISABLED: aca_ptc formula doesn't gate on tax_unit_is_filer; + # non-filer values inflate totals beyond IRS SOI targets. + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: aca_ptc + # geo_level: district + # - variable: tax_unit_count + # geo_level: district + # domain_variable: aca_ptc + + # === STATE === + - variable: person_count + geo_level: state + domain_variable: medicaid_enrolled + - variable: person_count + geo_level: state + domain_variable: is_pregnant + - variable: snap + geo_level: state + + # === NATIONAL — aggregate dollar targets === + - variable: adjusted_gross_income + geo_level: national + - variable: child_support_expense + geo_level: national + - variable: child_support_received + geo_level: national + # DISABLED: eitc formula doesn't gate on tax_unit_is_filer; + # non-filer values inflate totals beyond IRS SOI targets. 
+ # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: eitc + # geo_level: national + - variable: health_insurance_premiums_without_medicare_part_b + geo_level: national + - variable: medicaid + geo_level: national + - variable: medicare_part_b_premiums + geo_level: national + - variable: net_worth + geo_level: national + - variable: other_medical_expenses + geo_level: national + - variable: over_the_counter_health_expenses + geo_level: national + - variable: qualified_business_income_deduction + geo_level: national + - variable: rent + geo_level: national + - variable: salt_deduction + geo_level: national + - variable: snap + geo_level: national + - variable: social_security + geo_level: national + - variable: social_security_disability + geo_level: national + - variable: social_security_retirement + geo_level: national + - variable: spm_unit_capped_housing_subsidy + geo_level: national + - variable: spm_unit_capped_work_childcare_expenses + geo_level: national + - variable: ssi + geo_level: national + - variable: tanf + geo_level: national + - variable: tip_income + geo_level: national + - variable: unemployment_compensation + geo_level: national + + # === NATIONAL — IRS SOI domain-constrained dollar targets === + # DISABLED: aca_ptc formula doesn't gate on tax_unit_is_filer + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: aca_ptc + # geo_level: national + # domain_variable: aca_ptc + - variable: dividend_income + geo_level: national + domain_variable: dividend_income + # DISABLED: eitc formula doesn't gate on tax_unit_is_filer + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: eitc + # geo_level: national + # domain_variable: eitc_child_count + - variable: income_tax_positive + geo_level: national + - variable: income_tax_before_credits + geo_level: national + domain_variable: income_tax_before_credits + - variable: net_capital_gains + geo_level: national + 
domain_variable: net_capital_gains + - variable: qualified_business_income_deduction + geo_level: national + domain_variable: qualified_business_income_deduction + - variable: qualified_dividend_income + geo_level: national + domain_variable: qualified_dividend_income + # DISABLED: refundable_ctc formula doesn't gate on tax_unit_is_filer + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: refundable_ctc + # geo_level: national + # domain_variable: refundable_ctc + - variable: rental_income + geo_level: national + domain_variable: rental_income + - variable: salt + geo_level: national + domain_variable: salt + - variable: self_employment_income + geo_level: national + domain_variable: self_employment_income + - variable: tax_exempt_interest_income + geo_level: national + domain_variable: tax_exempt_interest_income + - variable: tax_unit_partnership_s_corp_income + geo_level: national + domain_variable: tax_unit_partnership_s_corp_income + - variable: taxable_interest_income + geo_level: national + domain_variable: taxable_interest_income + - variable: taxable_ira_distributions + geo_level: national + domain_variable: taxable_ira_distributions + - variable: taxable_pension_income + geo_level: national + domain_variable: taxable_pension_income + - variable: taxable_social_security + geo_level: national + domain_variable: taxable_social_security + - variable: unemployment_compensation + geo_level: national + domain_variable: unemployment_compensation + + # === NATIONAL — IRS SOI filer count targets === + # DISABLED: aca_ptc inflated by non-filers + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: tax_unit_count + # geo_level: national + # domain_variable: aca_ptc + - variable: tax_unit_count + geo_level: national + domain_variable: dividend_income + # DISABLED: eitc inflated by non-filers + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: tax_unit_count + # geo_level: 
national + # domain_variable: eitc_child_count + - variable: tax_unit_count + geo_level: national + domain_variable: income_tax + - variable: tax_unit_count + geo_level: national + domain_variable: income_tax_before_credits + - variable: tax_unit_count + geo_level: national + domain_variable: medical_expense_deduction + - variable: tax_unit_count + geo_level: national + domain_variable: net_capital_gains + - variable: tax_unit_count + geo_level: national + domain_variable: qualified_business_income_deduction + - variable: tax_unit_count + geo_level: national + domain_variable: qualified_dividend_income + - variable: tax_unit_count + geo_level: national + domain_variable: real_estate_taxes + # DISABLED: refundable_ctc inflated by non-filers + # See https://github.com/PolicyEngine/policyengine-us/issues/7748 + # - variable: tax_unit_count + # geo_level: national + # domain_variable: refundable_ctc + - variable: tax_unit_count + geo_level: national + domain_variable: rental_income + - variable: tax_unit_count + geo_level: national + domain_variable: salt + - variable: tax_unit_count + geo_level: national + domain_variable: self_employment_income + - variable: tax_unit_count + geo_level: national + domain_variable: tax_exempt_interest_income + - variable: tax_unit_count + geo_level: national + domain_variable: tax_unit_partnership_s_corp_income + - variable: tax_unit_count + geo_level: national + domain_variable: taxable_interest_income + - variable: tax_unit_count + geo_level: national + domain_variable: taxable_ira_distributions + - variable: tax_unit_count + geo_level: national + domain_variable: taxable_pension_income + - variable: tax_unit_count + geo_level: national + domain_variable: taxable_social_security + - variable: tax_unit_count + geo_level: national + domain_variable: unemployment_compensation diff --git a/policyengine_us_data/datasets/acs/README.md b/policyengine_us_data/datasets/acs/README.md index 633e04e02..1b941b1d4 100644 --- 
a/policyengine_us_data/datasets/acs/README.md +++ b/policyengine_us_data/datasets/acs/README.md @@ -1,6 +1,6 @@ -2022 ACS 1 Year Data Dictionary: -https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2022.pdf +2024 ACS 1 Year Data Dictionary: +https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2024.pdf User Guide: -https://www2.census.gov/programs-surveys/acs/tech_docs/pums/2022ACS_PUMS_User_Guide.pdf +https://www2.census.gov/programs-surveys/acs/tech_docs/pums/2024ACS_PUMS_User_Guide.pdf PUMS Documentation: https://www.census.gov/programs-surveys/acs/microdata/documentation.html diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 11d1ef738..79158b305 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -1,7 +1,6 @@ -import logging from policyengine_core.data import Dataset import h5py -from policyengine_us_data.datasets.acs.census_acs import CensusACS_2022 +from policyengine_us_data.datasets.acs.census_acs import CensusACS_2022, CensusACS_2024 from policyengine_us_data.storage import STORAGE_FOLDER from pandas import DataFrame import numpy as np @@ -108,5 +107,13 @@ class ACS_2022(ACS): url = "release://PolicyEngine/policyengine-us-data/1.13.0/acs_2022.h5" +class ACS_2024(ACS): + name = "acs_2024" + label = "ACS 2024" + time_period = 2024 + file_path = STORAGE_FOLDER / "acs_2024.h5" + census_acs = CensusACS_2024 + + if __name__ == "__main__": - ACS_2022().generate() + ACS_2024().generate() diff --git a/policyengine_us_data/datasets/acs/census_acs.py b/policyengine_us_data/datasets/acs/census_acs.py index 7bd28bd61..a63bdb639 100644 --- a/policyengine_us_data/datasets/acs/census_acs.py +++ b/policyengine_us_data/datasets/acs/census_acs.py @@ -53,12 +53,15 @@ "TAXAMT", # Property taxes ] +HOUSEHOLD_COLUMN_ALIASES = { + "STATE": "ST", +} + class CensusACS(Dataset): data_format = 
Dataset.TABLES def generate(self) -> None: - spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{self.time_period}_pu.dta" person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_pus.zip" household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_hus.zip" @@ -85,17 +88,19 @@ def process_household_data( f.write(chunk) f.seek(0) zf = ZipFile(f) + usecols = set(columns) | set(HOUSEHOLD_COLUMN_ALIASES) a = pd.read_csv( zf.open(prefix + "a.csv"), - usecols=columns, + usecols=lambda c: c in usecols, dtype={"SERIALNO": str}, ) b = pd.read_csv( zf.open(prefix + "b.csv"), - usecols=columns, + usecols=lambda c: c in usecols, dtype={"SERIALNO": str}, ) res = pd.concat([a, b]).fillna(0) + res = res.rename(columns=HOUSEHOLD_COLUMN_ALIASES) res.columns = res.columns.str.upper() # Ensure correct data types @@ -198,3 +203,10 @@ class CensusACS_2022(CensusACS): name = "census_acs_2022.h5" file_path = STORAGE_FOLDER / "census_acs_2022.h5" time_period = 2022 + + +class CensusACS_2024(CensusACS): + label = "Census ACS (2024)" + name = "census_acs_2024.h5" + file_path = STORAGE_FOLDER / "census_acs_2024.h5" + time_period = 2024 diff --git a/policyengine_us_data/datasets/cps/__init__.py b/policyengine_us_data/datasets/cps/__init__.py index 2411ca43b..ca3122eeb 100644 --- a/policyengine_us_data/datasets/cps/__init__.py +++ b/policyengine_us_data/datasets/cps/__init__.py @@ -1,3 +1,4 @@ -from .cps import * -from .extended_cps import * -from .enhanced_cps import * +from .cps import * # noqa: F403 +from .extended_cps import * # noqa: F403 +from .source_imputed_cps import * # noqa: F403 +from .enhanced_cps import * # noqa: F403 diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 418d73963..cfe42bace 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -6,13 
+6,11 @@ from pandas import DataFrame, Series import numpy as np import pandas as pd -import os import yaml from typing import Type from policyengine_us_data.utils.uprating import ( create_policyengine_uprating_factors_table, ) -from microimpute.models.qrf import QRF import logging from policyengine_us_data.parameters import load_take_up_rate from policyengine_us_data.utils.randomness import seeded_rng @@ -70,12 +68,6 @@ def generate(self): add_spm_variables(self, cps, spm_unit) logging.info("Adding household variables") add_household_variables(cps, household) - logging.info("Adding rent") - add_rent(self, cps, person, household) - logging.info("Adding tips") - add_tips(self, cps) - logging.info("Adding auto loan balance, interest and wealth") - add_auto_loan_interest_and_net_worth(self, cps) logging.info("Added all variables") raw_data.close() @@ -124,70 +116,6 @@ def downsample(self, frac: float): self.save_dataset(original_data) - -def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): - cps["tenure_type"] = household.H_TENURE.map( - { - 0: "NONE", - 1: "OWNED_WITH_MORTGAGE", - 2: "RENTED", - 3: "NONE", - } - ).astype("S") - self.save_dataset(cps) - - from policyengine_us_data.datasets.acs.acs import ACS_2022 - from policyengine_us import Microsimulation - - acs = Microsimulation(dataset=ACS_2022) - cps_sim = Microsimulation(dataset=self) - - PREDICTORS = [ - "is_household_head", - "age", - "is_male", - "tenure_type", - "employment_income", - "self_employment_income", - "social_security", - "pension_income", - "state_code_str", - "household_size", - ] - IMPUTATIONS = ["rent", "real_estate_taxes"] - train_df = acs.calculate_dataframe(PREDICTORS + IMPUTATIONS) - train_df.tenure_type = train_df.tenure_type.map( - { - "OWNED_OUTRIGHT": "OWNED_WITH_MORTGAGE", - }, - na_action="ignore", - ).fillna(train_df.tenure_type) - train_df = train_df[train_df.is_household_head].sample(10_000) - inference_df = cps_sim.calculate_dataframe(PREDICTORS) - mask 
= inference_df.is_household_head.values - inference_df = inference_df[mask] - - qrf = QRF() - logging.info("Training imputation model for rent and real estate taxes.") - fitted_model = qrf.fit( - X_train=train_df, - predictors=PREDICTORS, - imputed_variables=IMPUTATIONS, - ) - logging.info("Imputing rent and real estate taxes.") - imputed_values = fitted_model.predict(X_test=inference_df) - logging.info("Imputation complete.") - cps["rent"] = np.zeros_like(cps["age"]) - cps["rent"][mask] = imputed_values["rent"] - # Assume zero housing assistance since - cps["pre_subsidy_rent"] = cps["rent"] - cps["housing_assistance"] = np.zeros_like( - cps["spm_unit_capped_housing_subsidy_reported"] - ) - cps["real_estate_taxes"] = np.zeros_like(cps["age"]) - cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"] - - def add_takeup(self): data = self.load_dataset() @@ -1748,82 +1676,6 @@ def _update_documentation_with_numbers(log_df, docs_dir): print(f"Documentation updated with population numbers: {doc_path}") - -def add_tips(self, cps: h5py.File): - self.save_dataset(cps) - from policyengine_us import Microsimulation - - sim = Microsimulation(dataset=self) - cps = sim.calculate_dataframe( - [ - "person_id", - "household_id", - "employment_income", - "age", - "household_weight", - "is_female", - ], - 2025, - ) - cps = pd.DataFrame(cps) - - # Get is_married from raw CPS data (A_MARITL codes: 1,2 = married) - # Note: is_married in policyengine-us is Family-level, but we need - # person-level for imputation models - raw_data = self.raw_cps(require=True).load() - raw_person = raw_data["person"] - cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values - raw_data.close() - - cps["is_under_18"] = cps.age < 18 - cps["is_under_6"] = cps.age < 6 - cps["count_under_18"] = ( - cps.groupby("household_id")["is_under_18"] - .sum() - .loc[cps.household_id.values] - .values - ) - cps["count_under_6"] = ( - cps.groupby("household_id")["is_under_6"] - .sum() - 
.loc[cps.household_id.values] - .values - ) - cps = pd.DataFrame(cps) - - # Impute tips - - from policyengine_us_data.datasets.sipp import get_tip_model - - model = get_tip_model() - - cps["tip_income"] = model.predict( - X_test=cps, - mean_quantile=0.5, - ).tip_income.values - - # Impute liquid assets from SIPP (bank accounts, stocks, bonds) - - from policyengine_us_data.datasets.sipp import get_asset_model - - asset_model = get_asset_model() - - asset_predictions = asset_model.predict( - X_test=cps, - mean_quantile=0.5, - ) - cps["bank_account_assets"] = asset_predictions.bank_account_assets.values - cps["stock_assets"] = asset_predictions.stock_assets.values - cps["bond_assets"] = asset_predictions.bond_assets.values - - # Drop temporary columns used only for imputation - # is_married is person-level here but policyengine-us defines it at Family - # level, so we must not save it - cps = cps.drop(columns=["is_married", "is_under_18", "is_under_6"], errors="ignore") - - self.save_dataset(cps) - - def add_overtime_occupation(cps: h5py.File, person: DataFrame) -> None: """Add occupation categories relevant to overtime eligibility calculations. 
Based on: @@ -1867,291 +1719,6 @@ def add_overtime_occupation(cps: h5py.File, person: DataFrame) -> None: ] ) - -def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None: - """ "Add auto loan balance, interest and net_worth variable.""" - self.save_dataset(cps) - cps_data = self.load_dataset() - - # Access raw CPS for additional variables - raw_data_instance = self.raw_cps(require=True) - raw_data = raw_data_instance.load() - person_data = raw_data.person - - # Preprocess the CPS for imputation - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) - agg_data["interest_dividend_income"] = np.sum( - [ - agg_data["taxable_interest_income"], - agg_data["tax_exempt_interest_income"], - agg_data["qualified_dividend_income"], - agg_data["non_qualified_dividend_income"], - ], - axis=0, - ) - agg_data["social_security_pension_income"] = np.sum( - [ - agg_data["tax_exempt_private_pension_income"], - agg_data["taxable_private_pension_income"], - agg_data["social_security_retirement"], - ], - axis=0, - ) - - agg = ( - agg_data.groupby("person_household_id")[ - [ - "employment_income", - "interest_dividend_income", - "social_security_pension_income", - ] - ] - .sum() - .rename( - columns={ - "employment_income": "household_employment_income", - "interest_dividend_income": "household_interest_dividend_income", - "social_security_pension_income": "household_social_security_pension_income", - } - ) - .reset_index() - ) - - def create_scf_reference_person_mask(cps_data, raw_person_data): - """ - Create a boolean mask identifying SCF-style reference persons. 
- - SCF Reference Person Definition: - - Single adult in household without a couple - - In households with couples: male in mixed-sex couple OR older person in same-sex couple - """ - all_persons_data = pd.DataFrame( - { - "person_household_id": cps_data["person_household_id"], - "age": cps_data["age"], - } - ) - - # Add sex variable (PESEX=2 means female in CPS) - all_persons_data["is_female"] = (raw_person_data.A_SEX == 2).values - - # Add marital status (A_MARITL codes: 1,2 = married with spouse present/absent) - all_persons_data["is_married"] = raw_person_data.A_MARITL.isin([1, 2]).values - - # Define adults as age 18+ - all_persons_data["is_adult"] = all_persons_data["age"] >= 18 - - # Count adults per household - adults_per_household = ( - all_persons_data[all_persons_data["is_adult"]] - .groupby("person_household_id") - .size() - .reset_index(name="n_adults") - ) - all_persons_data = all_persons_data.merge( - adults_per_household, on="person_household_id", how="left" - ) - - # Identify couple households (households with exactly 2 married adults) - married_adults_per_household = ( - all_persons_data[ - (all_persons_data["is_adult"]) & (all_persons_data["is_married"]) - ] - .groupby("person_household_id") - .size() - ) - - couple_households = married_adults_per_household[ - (married_adults_per_household == 2) - & (all_persons_data.groupby("person_household_id")["n_adults"].first() == 2) - ].index - - all_persons_data["is_couple_household"] = all_persons_data[ - "person_household_id" - ].isin(couple_households) - - def determine_reference_person(group): - """Determine reference person for a household group.""" - adults = group[group["is_adult"]] - - if len(adults) == 0: - # No adults - select the oldest person regardless of age - reference_idx = group["age"].idxmax() - result = pd.Series([False] * len(group), index=group.index) - result[reference_idx] = True - return result - - elif len(adults) == 1: - # Only one adult - they are the reference person - result = 
pd.Series([False] * len(group), index=group.index) - result[adults.index[0]] = True - return result - - elif group["is_couple_household"].iloc[0] and len(adults) == 2: - # Couple household with 2 adults - couple_adults = adults.copy() - - # Check if same-sex couple - if couple_adults["is_female"].nunique() == 1: - # Same-sex couple - choose older person - reference_idx = couple_adults["age"].idxmax() - else: - # Mixed-sex couple - choose male (is_female = False) - male_adults = couple_adults[~couple_adults["is_female"]] - if len(male_adults) > 0: - reference_idx = male_adults.index[0] - else: - # Fallback to older person - reference_idx = couple_adults["age"].idxmax() - - result = pd.Series([False] * len(group), index=group.index) - result[reference_idx] = True - return result - - else: - # Multiple adults but not a couple household - # Use the oldest adult as reference person - reference_idx = adults["age"].idxmax() - result = pd.Series([False] * len(group), index=group.index) - result[reference_idx] = True - return result - - # Apply the reference person logic to each household - all_persons_data["is_scf_reference_person"] = ( - all_persons_data.groupby("person_household_id") - .apply(determine_reference_person, include_groups=False) - .reset_index(level=0, drop=True) - ) - - return all_persons_data["is_scf_reference_person"].values - - mask = create_scf_reference_person_mask(cps_data, person_data) - mask_len = mask.shape[0] - - cps_data = { - var: data[mask] if data.shape[0] == mask_len else data - for var, data in cps_data.items() - } - - CPS_RACE_MAPPING = { - 1: 1, # White only -> WHITE - 2: 2, # Black only -> BLACK/AFRICAN-AMERICAN - 3: 5, # American Indian, Alaskan Native only -> OTHER - 4: 4, # Asian only -> ASIAN - 5: 5, # Hawaiian/Pacific Islander only -> OTHER - 6: 5, # White-Black -> OTHER - 7: 5, # White-AI -> OTHER - 8: 5, # White-Asian -> OTHER - 9: 3, # White-HP -> HISPANIC - 10: 5, # Black-AI -> OTHER - 11: 5, # Black-Asian -> OTHER - 12: 3, # 
Black-HP -> HISPANIC - 13: 5, # AI-Asian -> OTHER - 14: 5, # AI-HP -> OTHER - 15: 3, # Asian-HP -> HISPANIC - 16: 5, # White-Black-AI -> OTHER - 17: 5, # White-Black-Asian -> OTHER - 18: 5, # White-Black-HP -> OTHER - 19: 5, # White-AI-Asian -> OTHER - 20: 5, # White-AI-HP -> OTHER - 21: 5, # White-Asian-HP -> OTHER - 22: 5, # Black-AI-Asian -> OTHER - 23: 5, # White-Black-AI-Asian -> OTHER - 24: 5, # White-AI-Asian-HP -> OTHER - 25: 5, # Other 3 race comb. -> OTHER - 26: 5, # Other 4 or 5 race comb. -> OTHER - } - - # Apply the mapping to recode the race values - cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) - - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) - - receiver_data = receiver_data.merge( - agg[ - [ - "person_household_id", - "household_employment_income", - "household_interest_dividend_income", - "household_social_security_pension_income", - ] - ], - on="person_household_id", - how="left", - ) - receiver_data.drop("employment_income", axis=1, inplace=True) - - receiver_data.rename( - columns={ - "household_employment_income": "employment_income", - "household_interest_dividend_income": "interest_dividend_income", - "household_social_security_pension_income": "social_security_pension_income", - }, - inplace=True, - ) - - # Add is_married variable for household heads based on raw person data - reference_persons = person_data[mask] - receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values - - # Impute auto loan balance from the SCF - from policyengine_us_data.datasets.scf.scf import SCF_2022 - - scf_dataset = SCF_2022() - scf_data = scf_dataset.load_dataset() - scf_data = pd.DataFrame({key: scf_data[key] for key in scf_data.keys()}) - - PREDICTORS = [ - "age", - "is_female", - "cps_race", - 
"is_married", - "own_children_in_household", - "employment_income", - "interest_dividend_income", - "social_security_pension_income", - ] - IMPUTED_VARIABLES = ["networth", "auto_loan_balance", "auto_loan_interest"] - weights = ["wgt"] - - donor_data = scf_data[PREDICTORS + IMPUTED_VARIABLES + weights].copy() - - from microimpute.models.qrf import QRF - import logging - import os - - # Set root logger level - log_level = os.getenv("PYTHON_LOG_LEVEL", "WARNING") - - # Specifically target the microimpute logger - logging.getLogger("microimpute").setLevel(getattr(logging, log_level)) - - qrf_model = QRF() - donor_data = donor_data.sample(frac=0.5, random_state=42).reset_index(drop=True) - fitted_model = qrf_model.fit( - X_train=donor_data, - predictors=PREDICTORS, - imputed_variables=IMPUTED_VARIABLES, - weight_col=weights[0], - tune_hyperparameters=False, - ) - imputations = fitted_model.predict(X_test=receiver_data) - - for var in IMPUTED_VARIABLES: - cps[var] = imputations[var] - - cps["net_worth"] = cps["networth"] - del cps["networth"] - - self.save_dataset(cps) - - class CPS_2019(CPS): name = "cps_2019" label = "CPS 2019" diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index eb841488c..5ebf028ee 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -1,10 +1,7 @@ from policyengine_core.data import Dataset import pandas as pd from policyengine_us_data.utils import ( - pe_to_soi, - get_soi, build_loss_matrix, - fmt, HardConcrete, print_reweighting_diagnostics, set_seeds, @@ -14,10 +11,9 @@ from tqdm import trange from typing import Type from policyengine_us_data.storage import STORAGE_FOLDER -from policyengine_us_data.datasets.cps.extended_cps import ( - ExtendedCPS_2024, - ExtendedCPS_2024_Half, - CPS_2024, +from policyengine_us_data.datasets.cps.cps import CPS_2024 +from policyengine_us_data.datasets.cps.source_imputed_cps import 
( + SourceImputedStratifiedExtendedCPS_2024, ) import logging @@ -88,7 +84,7 @@ def loss(weights): optimizer.zero_grad() masked = torch.exp(weights) * gates() l_main = loss(masked) - l = l_main + l0_lambda * gates.get_penalty() + total_loss = l_main + l0_lambda * gates.get_penalty() if (log_path is not None) and (i % 10 == 0): gates.eval() estimates = (torch.exp(weights) * gates()) @ loss_matrix @@ -112,10 +108,12 @@ def loss(weights): if (log_path is not None) and (i % 1000 == 0): performance.to_csv(log_path, index=False) if start_loss is None: - start_loss = l.item() - loss_rel_change = (l.item() - start_loss) / start_loss - l.backward() - iterator.set_postfix({"loss": l.item(), "loss_rel_change": loss_rel_change}) + start_loss = total_loss.item() + loss_rel_change = (total_loss.item() - start_loss) / start_loss + total_loss.backward() + iterator.set_postfix( + {"loss": total_loss.item(), "loss_rel_change": loss_rel_change} + ) optimizer.step() if log_path is not None: performance.to_csv(log_path, index=False) @@ -249,7 +247,7 @@ def generate(self): class EnhancedCPS_2024(EnhancedCPS): - input_dataset = ExtendedCPS_2024_Half + input_dataset = SourceImputedStratifiedExtendedCPS_2024 start_year = 2024 end_year = 2024 name = "enhanced_cps_2024" diff --git a/policyengine_us_data/datasets/cps/source_imputed_cps.py b/policyengine_us_data/datasets/cps/source_imputed_cps.py new file mode 100644 index 000000000..2863a60cc --- /dev/null +++ b/policyengine_us_data/datasets/cps/source_imputed_cps.py @@ -0,0 +1,83 @@ +from typing import Type + +from policyengine_core.data import Dataset + +from policyengine_us_data.calibration.create_source_imputed_cps import ( + create_source_imputed_cps, +) +from policyengine_us_data.calibration.create_stratified_cps import ( + create_stratified_cps_dataset, +) +from policyengine_us_data.datasets.cps.cps import CPS_2024 +from policyengine_us_data.datasets.cps.extended_cps import ExtendedCPS_2024 +from policyengine_us_data.storage import 
STORAGE_FOLDER + + +class StratifiedExtendedCPS(Dataset): + data_format = Dataset.TIME_PERIOD_ARRAYS + base_dataset: Type[Dataset] + target_households = 30_000 + high_income_percentile = 99 + oversample_poor = False + seed = None + + def generate(self): + self.base_dataset(require=True) + create_stratified_cps_dataset( + target_households=self.target_households, + high_income_percentile=self.high_income_percentile, + oversample_poor=self.oversample_poor, + seed=self.seed, + base_dataset=str(self.base_dataset.file_path), + output_path=str(self.file_path), + ) + + +class StratifiedExtendedCPS_2024(StratifiedExtendedCPS): + base_dataset = ExtendedCPS_2024 + name = "stratified_extended_cps_2024" + label = "Stratified Extended CPS (2024)" + file_path = STORAGE_FOLDER / "stratified_extended_cps_2024.h5" + time_period = 2024 + + +class SourceImputedDataset(Dataset): + data_format = Dataset.TIME_PERIOD_ARRAYS + input_dataset: Type[Dataset] + seed = 42 + use_existing_state_fips = False + + def generate(self): + self.input_dataset(require=True) + create_source_imputed_cps( + input_path=str(self.input_dataset.file_path), + output_path=str(self.file_path), + seed=self.seed, + use_existing_state_fips=self.use_existing_state_fips, + time_period=self.time_period, + ) + + +class SourceImputedCPS(SourceImputedDataset): + use_existing_state_fips = True + + +class SourceImputedCPS_2024(SourceImputedCPS): + input_dataset = CPS_2024 + name = "source_imputed_cps_2024" + label = "Source-Imputed CPS (2024)" + file_path = STORAGE_FOLDER / "source_imputed_cps_2024.h5" + time_period = 2024 + + +class SourceImputedStratifiedExtendedCPS(SourceImputedDataset): + pass + + +class SourceImputedStratifiedExtendedCPS_2024(SourceImputedStratifiedExtendedCPS): + input_dataset = StratifiedExtendedCPS_2024 + name = "source_imputed_stratified_extended_cps_2024" + label = "Source-Imputed Stratified Extended CPS (2024)" + file_path = STORAGE_FOLDER / "source_imputed_stratified_extended_cps_2024.h5" + url = 
"hf://policyengine/policyengine-us-data/calibration/source_imputed_stratified_extended_cps.h5" + time_period = 2024 diff --git a/policyengine_us_data/tests/test_calibration/test_source_impute.py b/policyengine_us_data/tests/test_calibration/test_source_impute.py index 517a559ef..dd0527034 100644 --- a/policyengine_us_data/tests/test_calibration/test_source_impute.py +++ b/policyengine_us_data/tests/test_calibration/test_source_impute.py @@ -4,20 +4,27 @@ """ import numpy as np +import pandas as pd from policyengine_us_data.calibration.source_impute import ( ACS_IMPUTED_VARIABLES, ACS_PREDICTORS, ALL_SOURCE_VARIABLES, + NET_WORTH_TOTAL_TARGETS, SCF_IMPUTED_VARIABLES, SCF_PREDICTORS, + SCF_DONOR_UPRATING_MAP, SIPP_ASSETS_PREDICTORS, SIPP_IMPUTED_VARIABLES, SIPP_TIPS_PREDICTORS, + _build_household_scf_receiver, + _household_values_from_data, + _align_weighted_total, _impute_acs, _impute_scf, _impute_sipp, _person_state_fips, + _uprate_scf_donor_frame, impute_source_variables, ) @@ -41,6 +48,30 @@ def _make_data_dict(n_persons=20, time_period=2024): "employment_income": { time_period: rng.uniform(0, 100000, n_persons).astype(np.float32), }, + "taxable_interest_income": { + time_period: rng.uniform(0, 5000, n_persons).astype(np.float32), + }, + "qualified_dividend_income": { + time_period: rng.uniform(0, 4000, n_persons).astype(np.float32), + }, + "taxable_private_pension_income": { + time_period: rng.uniform(0, 6000, n_persons).astype(np.float32), + }, + "social_security_retirement": { + time_period: rng.uniform(0, 8000, n_persons).astype(np.float32), + }, + "is_male": { + time_period: rng.integers(0, 2, n_persons).astype(np.float32), + }, + "cps_race": { + time_period: rng.integers(1, 5, n_persons).astype(np.float32), + }, + "is_married": { + time_period: rng.integers(0, 2, n_persons).astype(np.float32), + }, + "own_children_in_household": { + time_period: rng.integers(0, 3, n_persons).astype(np.float32), + }, "rent": {time_period: np.zeros(n_persons)}, 
"real_estate_taxes": {time_period: np.zeros(n_persons)}, "tip_income": {time_period: np.zeros(n_persons)}, @@ -75,6 +106,20 @@ def test_all_source_variables_defined(self): ) assert ALL_SOURCE_VARIABLES == expected + def test_scf_uprating_map_covers_scf_money_columns(self): + expected = { + "employment_income", + "interest_dividend_income", + "social_security_pension_income", + "net_worth", + "auto_loan_balance", + "auto_loan_interest", + } + assert expected == set(SCF_DONOR_UPRATING_MAP) + + def test_net_worth_total_targets_defined_for_2024(self): + assert NET_WORTH_TOTAL_TARGETS[2024] == 160e12 + class TestPredictorLists: def test_acs_uses_state(self): @@ -196,6 +241,135 @@ def test_fallback_unequal_sizes(self): assert len(result) == 5 +class TestHouseholdReceiverHelpers: + def test_household_values_from_data_aggregates_person_arrays(self): + data = { + "household_id": {2024: np.array([10, 20])}, + "person_household_id": {2024: np.array([10, 10, 20, 20])}, + "employment_income": {2024: np.array([1, 2, 3, 4], dtype=np.float32)}, + "age": {2024: np.array([30, 31, 40, 41], dtype=np.float32)}, + } + + summed = _household_values_from_data( + data, + "employment_income", + 2024, + data["household_id"][2024], + data["person_household_id"][2024], + how="sum", + ) + first = _household_values_from_data( + data, + "age", + 2024, + data["household_id"][2024], + data["person_household_id"][2024], + how="first", + ) + + np.testing.assert_array_equal(summed, np.array([3, 7], dtype=np.float32)) + np.testing.assert_array_equal(first, np.array([30, 40], dtype=np.float32)) + + def test_build_household_scf_receiver_uses_household_level_predictors(self): + data = { + "household_id": {2024: np.array([10, 20])}, + "person_household_id": {2024: np.array([10, 10, 20, 20])}, + "age": {2024: np.array([30, 31, 40, 41], dtype=np.float32)}, + "is_male": {2024: np.array([1, 0, 0, 0], dtype=np.float32)}, + "cps_race": {2024: np.array([1, 1, 3, 3], dtype=np.float32)}, + "is_married": {2024: 
np.array([1, 1, 0, 0], dtype=np.float32)}, + "own_children_in_household": { + 2024: np.array([2, 2, 1, 1], dtype=np.float32) + }, + "employment_income": { + 2024: np.array([10_000, 20_000, 30_000, 40_000], dtype=np.float32) + }, + "taxable_interest_income": { + 2024: np.array([100, 150, 200, 250], dtype=np.float32) + }, + "qualified_dividend_income": { + 2024: np.array([50, 50, 60, 60], dtype=np.float32) + }, + "taxable_private_pension_income": { + 2024: np.array([500, 500, 700, 700], dtype=np.float32) + }, + "social_security_retirement": { + 2024: np.array([250, 250, 300, 300], dtype=np.float32) + }, + } + + receiver = _build_household_scf_receiver(data, 2024) + + np.testing.assert_array_equal(receiver["household_id"], np.array([10, 20])) + np.testing.assert_array_equal( + receiver["employment_income"], + np.array([30_000, 70_000], dtype=np.float32), + ) + np.testing.assert_array_equal( + receiver["interest_dividend_income"], + np.array([350, 570], dtype=np.float32), + ) + np.testing.assert_array_equal( + receiver["social_security_pension_income"], + np.array([1_500, 2_000], dtype=np.float32), + ) + np.testing.assert_array_equal( + receiver["is_female"], + np.array([0, 1], dtype=np.float32), + ) + + +class TestScfDonorUprating: + def test_align_weighted_total_hits_target(self): + values = np.array([10.0, 30.0], dtype=np.float32) + weights = np.array([2.0, 1.0], dtype=np.float32) + + aligned = _align_weighted_total(values, weights, target_total=100.0) + + assert np.isclose(np.dot(aligned, weights), 100.0) + + def test_uprate_scf_donor_frame_noops_same_year(self): + donor = pd.DataFrame( + { + "employment_income": [10_000.0], + "net_worth": [50_000.0], + "wgt": [1.0], + } + ) + + result = _uprate_scf_donor_frame(donor, from_year=2022, to_year=2022) + + pd.testing.assert_frame_equal(result, donor) + + def test_uprate_scf_donor_frame_changes_monetary_columns(self): + donor = pd.DataFrame( + { + "employment_income": [10_000.0], + "interest_dividend_income": [2_000.0], 
+ "social_security_pension_income": [3_000.0], + "net_worth": [50_000.0], + "auto_loan_balance": [12_000.0], + "auto_loan_interest": [900.0], + "age": [55.0], + "wgt": [1.0], + } + ) + + result = _uprate_scf_donor_frame(donor, from_year=2022, to_year=2024) + + for column in [ + "employment_income", + "interest_dividend_income", + "social_security_pension_income", + "net_worth", + "auto_loan_balance", + "auto_loan_interest", + ]: + assert result[column].iloc[0] > donor[column].iloc[0] + assert result["age"].iloc[0] == donor["age"].iloc[0] + assert result["wgt"].iloc[0] == donor["wgt"].iloc[0] + + class TestSubfunctions: def test_impute_acs_exists(self): assert callable(_impute_acs) diff --git a/policyengine_us_data/tests/test_calibration/test_target_config.py b/policyengine_us_data/tests/test_calibration/test_target_config.py index 377d3a640..15c803c36 100644 --- a/policyengine_us_data/tests/test_calibration/test_target_config.py +++ b/policyengine_us_data/tests/test_calibration/test_target_config.py @@ -1,5 +1,7 @@ """Tests for target config filtering in unified calibration.""" +from pathlib import Path + import numpy as np import pandas as pd import pytest @@ -134,6 +136,32 @@ def test_load_empty_config(self, tmp_path): config = load_target_config(str(config_file)) assert config["exclude"] == [] + def test_default_training_config_excludes_national_net_worth(self): + config = load_target_config( + str( + Path(__file__).resolve().parents[2] + / "calibration" + / "target_config.yaml" + ) + ) + assert { + "variable": "net_worth", + "geo_level": "national", + } not in config["include"] + + def test_national_training_config_includes_national_net_worth(self): + config = load_target_config( + str( + Path(__file__).resolve().parents[2] + / "calibration" + / "target_config_national.yaml" + ) + ) + assert { + "variable": "net_worth", + "geo_level": "national", + } in config["include"] + class TestCalibrationPackageRoundTrip: def test_round_trip(self, sample_targets, 
tmp_path): diff --git a/policyengine_us_data/tests/test_datasets/test_acs.py b/policyengine_us_data/tests/test_datasets/test_acs.py index 5c0d61221..88c728097 100644 --- a/policyengine_us_data/tests/test_datasets/test_acs.py +++ b/policyengine_us_data/tests/test_datasets/test_acs.py @@ -1,13 +1,13 @@ import pytest -from policyengine_us import Microsimulation -@pytest.mark.parametrize("year", [2022]) +@pytest.mark.parametrize("year", [2022, 2024]) def test_acs_generates(year: int): - from policyengine_us_data.datasets.acs.acs import ACS_2022 + from policyengine_us_data.datasets.acs.acs import ACS_2022, ACS_2024 dataset_by_year = { 2022: ACS_2022, + 2024: ACS_2024, } dataset = dataset_by_year[year]() diff --git a/policyengine_us_data/tests/test_datasets/test_cps.py b/policyengine_us_data/tests/test_datasets/test_cps.py index f03469393..d7eace450 100644 --- a/policyengine_us_data/tests/test_datasets/test_cps.py +++ b/policyengine_us_data/tests/test_datasets/test_cps.py @@ -1,13 +1,12 @@ -import pytest import numpy as np -def test_cps_has_auto_loan_interest(): - from policyengine_us_data.datasets.cps import CPS_2024 +def test_source_imputed_cps_has_auto_loan_interest(): + from policyengine_us_data.datasets.cps import SourceImputedCPS_2024 from policyengine_us import Microsimulation - sim = Microsimulation(dataset=CPS_2024) - # Ensure we impute around $85 billion in overtime premium with 25% error bounds. + sim = Microsimulation(dataset=SourceImputedCPS_2024) + # Ensure we impute around $85 billion in auto loan interest with 40% bounds. 
AUTO_LOAN_INTEREST_TARGET = 85e9 AUTO_LOAN_BALANCE_TARGET = 1550e9 RELATIVE_TOLERANCE = 0.4 @@ -36,11 +35,11 @@ def test_cps_has_fsla_overtime_premium(): ) -def test_cps_has_net_worth(): - from policyengine_us_data.datasets.cps import CPS_2022 +def test_source_imputed_cps_has_net_worth(): + from policyengine_us_data.datasets.cps import SourceImputedCPS_2024 from policyengine_us import Microsimulation - sim = Microsimulation(dataset=CPS_2022) + sim = Microsimulation(dataset=SourceImputedCPS_2024) # Ensure we impute around 160 trillion in net worth with 25% error bounds. # https://fred.stlouisfed.org/series/BOGZ1FL192090005Q NET_WORTH_TARGET = 160e12 diff --git a/policyengine_us_data/tests/test_datasets/test_cps_pipeline.py b/policyengine_us_data/tests/test_datasets/test_cps_pipeline.py new file mode 100644 index 000000000..41f155b70 --- /dev/null +++ b/policyengine_us_data/tests/test_datasets/test_cps_pipeline.py @@ -0,0 +1,34 @@ +from pathlib import Path + + +def test_source_imputed_dataset_matches_calibration_artifact_paths(): + from policyengine_us_data.calibration.create_source_imputed_cps import ( + INPUT_PATH, + OUTPUT_PATH, + ) + from policyengine_us_data.datasets.cps import ( + SourceImputedStratifiedExtendedCPS_2024, + StratifiedExtendedCPS_2024, + ) + + assert StratifiedExtendedCPS_2024.file_path == Path(INPUT_PATH) + assert SourceImputedStratifiedExtendedCPS_2024.file_path == Path(OUTPUT_PATH) + assert ( + SourceImputedStratifiedExtendedCPS_2024.input_dataset + is StratifiedExtendedCPS_2024 + ) + + +def test_source_imputed_cps_uses_base_cps_input(): + from policyengine_us_data.datasets.cps import CPS_2024, SourceImputedCPS_2024 + + assert SourceImputedCPS_2024.input_dataset is CPS_2024 + + +def test_enhanced_cps_uses_source_imputed_stratified_input(): + from policyengine_us_data.datasets.cps import ( + EnhancedCPS_2024, + SourceImputedStratifiedExtendedCPS_2024, + ) + + assert EnhancedCPS_2024.input_dataset is SourceImputedStratifiedExtendedCPS_2024