6 changes: 6 additions & 0 deletions policyengine_us_data/calibration/source_impute.py
@@ -761,13 +761,19 @@ def _impute_org(
if "self_employment_income" in cps_df.columns
else None
)
n_persons = len(data["person_id"][time_period])
predictions = predict_org_features(
receiver,
self_employment_income=self_employment_income,
)

for var in ORG_IMPUTED_VARIABLES:
values = predictions[var].values
if len(values) != n_persons:
raise ValueError(
f"ORG prediction for '{var}' has {len(values)} entries "
f"but dataset has {n_persons} persons"
)
if var in ORG_BOOL_VARIABLES:
data[var] = {time_period: values.astype(bool)}
else:
48 changes: 19 additions & 29 deletions policyengine_us_data/datasets/cps/cps.py
@@ -24,6 +24,7 @@
    build_org_receiver_frame,
    predict_org_features,
)
from policyengine_us_data.utils.downsample import downsample_dataset_arrays
from policyengine_us_data.utils.randomness import seeded_rng


@@ -102,38 +103,16 @@ def generate(self):
    def downsample(self, frac: float):
        from policyengine_us import Microsimulation

        # Store original dtypes before modifying
        original_data: dict = self.load_dataset()
        original_dtypes = {key: original_data[key].dtype for key in original_data}
        sim = Microsimulation(dataset=self)
        sim.subsample(frac=frac)

        for key in original_data:
            if key not in sim.tax_benefit_system.variables:
                logging.warning(
                    f"Attempting to downsample the variable {key} but failing because it is not in the given country package."
                )
                continue
            values = sim.calculate(key).values

            # Preserve the original dtype if possible
            if (
                key in original_dtypes
                and hasattr(values, "dtype")
                and values.dtype != original_dtypes[key]
            ):
                try:
                    original_data[key] = values.astype(original_dtypes[key])
                except:
                    # If conversion fails, log it but continue
                    logging.warning(
                        f"Could not convert {key} back to {original_dtypes[key]}"
                    )
                    original_data[key] = values
            else:
                original_data[key] = values

        self.save_dataset(original_data)
        self.save_dataset(
            downsample_dataset_arrays(
                original_data=original_data,
                sim=sim,
                dataset_name=self.name,
            )
        )


    def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame):
@@ -1838,6 +1817,7 @@ def add_tips(self, cps: h5py.File):

def add_org_labor_market_inputs(cps: h5py.File) -> None:
    """Impute ORG-derived wage and union inputs onto CPS persons."""
    n_persons = len(np.asarray(cps["age"]))
    household_ids = np.asarray(cps["household_id"], dtype=np.int64)
    person_household_ids = np.asarray(
        cps["person_household_id"],
@@ -1864,6 +1844,11 @@ def add_org_labor_market_inputs(cps: h5py.File) -> None:
        employment_income=cps["employment_income"],
        weekly_hours_worked=cps["weekly_hours_worked"],
    )
    if len(receiver) != n_persons:
        raise ValueError(
            f"ORG receiver frame has {len(receiver)} rows but CPS has "
            f"{n_persons} persons"
        )
    self_employment_income = np.asarray(
        cps.get(
            "self_employment_income",
@@ -1878,6 +1863,11 @@ def add_org_labor_market_inputs(cps: h5py.File) -> None:

    for variable in ORG_IMPUTED_VARIABLES:
        values = predictions[variable].values
        if len(values) != n_persons:
            raise ValueError(
                f"ORG prediction for '{variable}' has {len(values)} entries "
                f"but CPS has {n_persons} persons"
            )
        if variable in ORG_BOOL_VARIABLES:
            cps[variable] = values.astype(bool)
        else:
5 changes: 5 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
@@ -418,6 +418,11 @@ def _splice_cps_only_predictions(
        )

        n_half = entity_half_lengths.get(entity_key, len(data[var][time_period]) // 2)
        if len(pred_values) != n_half:
            raise ValueError(
                f"Stage-2 prediction for '{var}' has {len(pred_values)} "
                f"entries but expected {n_half} (half of {entity_key})"
            )
        values = data[var][time_period]
        # First half: keep original CPS values.
        # Second half: replace with QRF predictions.
13 changes: 12 additions & 1 deletion policyengine_us_data/datasets/org/org.py
@@ -461,13 +461,24 @@ def predict_org_features(
    if missing:
        raise ValueError(f"ORG receiver frame missing required columns: {missing}")

    n_receiver = len(receiver)
    predictions = get_org_model().predict(X_test=receiver[ORG_PREDICTORS])
    if len(predictions) != n_receiver:
        raise ValueError(
            f"ORG QRF returned {len(predictions)} rows but receiver has "
            f"{n_receiver} rows; predictions must match receiver length"
        )
    predictions["is_union_member_or_covered"] = _predict_union_coverage_from_bls_tables(
        receiver,
        self_employment_income=self_employment_income,
    )
    return apply_org_domain_constraints(
    result = apply_org_domain_constraints(
        predictions=predictions,
        receiver=receiver,
        self_employment_income=self_employment_income,
    )
    if len(result) != n_receiver:
        raise ValueError(
            f"ORG post-processing changed row count from {n_receiver} to {len(result)}"
        )
    return result
36 changes: 9 additions & 27 deletions policyengine_us_data/datasets/scf/scf.py
@@ -13,6 +13,8 @@
from typing import Type
from filelock import FileLock

from policyengine_us_data.utils.downsample import downsample_dataset_arrays


class SCF(Dataset):
"""Dataset containing processed Survey of Consumer Finances data."""
@@ -115,36 +117,16 @@ def downsample(self, frac: float):
"""
from policyengine_us import Microsimulation

# Store original dtypes before modifying
original_data: dict = self.load_dataset()
original_dtypes = {key: original_data[key].dtype for key in original_data}

sim = Microsimulation(dataset=self)
sim.subsample(frac=frac)

for key in original_data:
if key not in sim.tax_benefit_system.variables:
continue
values = sim.calculate(key).values

# Preserve the original dtype if possible
if (
key in original_dtypes
and hasattr(values, "dtype")
and values.dtype != original_dtypes[key]
):
try:
original_data[key] = values.astype(original_dtypes[key])
except:
# If conversion fails, log it but continue
print(
f"Warning: Could not convert {key} back to {original_dtypes[key]}"
)
original_data[key] = values
else:
original_data[key] = values

self.save_dataset(original_data)
self.save_dataset(
downsample_dataset_arrays(
original_data=original_data,
sim=sim,
dataset_name=self.name,
)
)

def _lock(self) -> FileLock:
return FileLock(f"{self.file_path}.lock", timeout=600)
103 changes: 103 additions & 0 deletions policyengine_us_data/tests/test_downsample.py
@@ -0,0 +1,103 @@
from types import SimpleNamespace

import numpy as np
import pytest

from policyengine_us_data.utils.downsample import downsample_dataset_arrays


class _FakeArrayResult:
    def __init__(self, values):
        self.values = values


class _FakeMicrosimulation:
    def __init__(self, variable_entities, calculated_values):
        self.tax_benefit_system = SimpleNamespace(
            variables={
                variable_name: SimpleNamespace(entity=SimpleNamespace(key=entity_key))
                for variable_name, entity_key in variable_entities.items()
            }
        )
        self._calculated_values = calculated_values

    def calculate(self, variable_name):
        return _FakeArrayResult(self._calculated_values[variable_name])


def test_downsample_dataset_arrays_preserves_original_dtypes():
    original_data = {
        "person_id": np.array([101, 102], dtype=np.int32),
        "household_id": np.array([201], dtype=np.int32),
        "employment_income": np.array([100.0, 200.0], dtype=np.float32),
    }
    sim = _FakeMicrosimulation(
        variable_entities={
            "person_id": "person",
            "household_id": "household",
            "employment_income": "person",
        },
        calculated_values={
            "person_id": np.array([101], dtype=np.int64),
            "household_id": np.array([201], dtype=np.int64),
            "employment_income": np.array([150.0], dtype=np.float64),
        },
    )

    resampled = downsample_dataset_arrays(
        original_data=original_data,
        sim=sim,
        dataset_name="cps",
    )

    assert resampled["person_id"].dtype == np.int32
    assert resampled["household_id"].dtype == np.int32
    assert resampled["employment_income"].dtype == np.float32
    np.testing.assert_array_equal(
        resampled["employment_income"], np.array([150.0], dtype=np.float32)
    )


def test_downsample_dataset_arrays_fails_closed_on_unknown_variables():
    original_data = {
        "person_id": np.array([101, 102], dtype=np.int32),
        "hourly_wage": np.array([25.0, 30.0], dtype=np.float32),
    }
    sim = _FakeMicrosimulation(
        variable_entities={"person_id": "person"},
        calculated_values={"person_id": np.array([101], dtype=np.int64)},
    )

    with pytest.raises(ValueError, match="out of sync"):
        downsample_dataset_arrays(
            original_data=original_data,
            sim=sim,
            dataset_name="cps",
        )


def test_downsample_dataset_arrays_rejects_entity_length_mismatches():
    original_data = {
        "person_id": np.array([101, 102], dtype=np.int32),
        "household_id": np.array([201], dtype=np.int32),
        "employment_income": np.array([100.0, 200.0], dtype=np.float32),
    }
    sim = _FakeMicrosimulation(
        variable_entities={
            "person_id": "person",
            "household_id": "household",
            "employment_income": "person",
        },
        calculated_values={
            "person_id": np.array([101], dtype=np.int64),
            "household_id": np.array([201], dtype=np.int64),
            "employment_income": np.array([150.0, 250.0], dtype=np.float64),
        },
    )

    with pytest.raises(ValueError, match="entity lengths are inconsistent"):
        downsample_dataset_arrays(
            original_data=original_data,
            sim=sim,
            dataset_name="cps",
        )
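
Note: the shared helper itself, policyengine_us_data/utils/downsample.py, is imported by both downsample overrides and by these tests but is not among the rendered hunks. A minimal sketch consistent with the call sites and the tests above — the exact error messages and internals here are assumptions, not the merged implementation:

import numpy as np


def downsample_dataset_arrays(original_data: dict, sim, dataset_name: str) -> dict:
    """Rebuild dataset arrays from a subsampled simulation, preserving dtypes."""
    variables = sim.tax_benefit_system.variables

    # Fail closed: a stored variable the country package no longer knows
    # about means the dataset and the package are out of sync.
    unknown = sorted(key for key in original_data if key not in variables)
    if unknown:
        raise ValueError(
            f"Dataset '{dataset_name}' is out of sync with the country "
            f"package; unknown variables: {unknown}"
        )

    resampled = {}
    entity_lengths: dict = {}
    for key, original_array in original_data.items():
        values = np.asarray(sim.calculate(key).values)

        # All variables of one entity must come back with the same length.
        entity_key = variables[key].entity.key
        expected = entity_lengths.setdefault(entity_key, len(values))
        if len(values) != expected:
            raise ValueError(
                f"Downsampled '{dataset_name}' entity lengths are inconsistent: "
                f"'{key}' has {len(values)} rows but other '{entity_key}' "
                f"variables have {expected}"
            )

        # Cast back to the dtype the dataset originally stored.
        resampled[key] = values.astype(original_array.dtype)
    return resampled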