diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index b3e6085a9..a56380330 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -38,6 +38,7 @@ from policyengine_us_data.utils.takeup import ( SIMPLE_TAKEUP_VARS, apply_block_takeup_to_arrays, + any_person_flag_by_entity, ) CHECKPOINT_FILE = Path("completed_states.txt") @@ -551,6 +552,17 @@ def build_h5( } hh_state_fips = clone_geo["state_fips"].astype(np.int32) original_hh_ids = household_ids[active_hh].astype(np.int64) + reported_anchors = {} + if "has_marketplace_health_coverage_at_interview" in data: + reported_anchors["takes_up_aca_if_eligible"] = any_person_flag_by_entity( + data["person_tax_unit_id"][time_period], + data["tax_unit_id"][time_period], + data["has_marketplace_health_coverage_at_interview"][time_period], + ) + if "has_medicaid_health_coverage_at_interview" in data: + reported_anchors["takes_up_medicaid_if_eligible"] = data[ + "has_medicaid_health_coverage_at_interview" + ][time_period].astype(bool) takeup_results = apply_block_takeup_to_arrays( hh_blocks=active_blocks, @@ -561,6 +573,7 @@ def build_h5( entity_counts=entity_counts, time_period=time_period, takeup_filter=takeup_filter, + reported_anchors=reported_anchors, ) for var_name, bools in takeup_results.items(): data[var_name] = {time_period: bools} diff --git a/policyengine_us_data/calibration/unified_matrix_builder.py b/policyengine_us_data/calibration/unified_matrix_builder.py index 1917e1a14..eda95a583 100644 --- a/policyengine_us_data/calibration/unified_matrix_builder.py +++ b/policyengine_us_data/calibration/unified_matrix_builder.py @@ -14,6 +14,7 @@ from collections import defaultdict from typing import Dict, List, Optional, Tuple +import h5py import numpy as np import pandas as pd from scipy import sparse @@ -668,6 +669,7 @@ def _process_single_clone( entity_hh_idx_map = sd.get("entity_hh_idx_map", {}) entity_to_person_idx = sd.get("entity_to_person_idx", {}) precomputed_rates = sd.get("precomputed_rates", {}) + reported_takeup_anchors = sd.get("reported_takeup_anchors", {}) # Slice geography for this clone clone_states = geo_states[col_start:col_end] @@ -789,6 +791,7 @@ def _process_single_clone( ent_blocks, ent_hh_ids, ent_ci, + reported_mask=reported_takeup_anchors.get(takeup_var), ) ent_values = (ent_eligible * ent_takeup).astype(np.float32) @@ -2107,6 +2110,7 @@ def build_matrix( from policyengine_us_data.utils.takeup import ( TAKEUP_AFFECTED_TARGETS, compute_block_takeup_for_entities, + any_person_flag_by_entity, ) from policyengine_us_data.parameters import ( load_take_up_rate, @@ -2135,6 +2139,33 @@ def build_matrix( "person": person_hh_indices, } + reported_takeup_anchors = {} + with h5py.File(self.dataset_path, "r") as f: + period_key = str(self.time_period) + if ( + "has_marketplace_health_coverage_at_interview" in f + and period_key in f["has_marketplace_health_coverage_at_interview"] + ): + person_marketplace = f[ + "has_marketplace_health_coverage_at_interview" + ][period_key][...].astype(bool) + person_tax_unit_ids = f["person_tax_unit_id"][period_key][...] + tax_unit_ids = f["tax_unit_id"][period_key][...] + reported_takeup_anchors["takes_up_aca_if_eligible"] = ( + any_person_flag_by_entity( + person_tax_unit_ids, + tax_unit_ids, + person_marketplace, + ) + ) + if ( + "has_medicaid_health_coverage_at_interview" in f + and period_key in f["has_medicaid_health_coverage_at_interview"] + ): + reported_takeup_anchors["takes_up_medicaid_if_eligible"] = f[ + "has_medicaid_health_coverage_at_interview" + ][period_key][...].astype(bool) + entity_to_person_idx = {} for entity_level in ("spm_unit", "tax_unit"): ent_ids = sim.calculate( @@ -2175,6 +2206,7 @@ def build_matrix( self.household_ids = household_ids self.precomputed_rates = precomputed_rates self.affected_target_info = affected_target_info + self.reported_takeup_anchors = reported_takeup_anchors # 5d. Clone loop from pathlib import Path @@ -2224,6 +2256,7 @@ def build_matrix( shared_data["entity_hh_idx_map"] = entity_hh_idx_map shared_data["entity_to_person_idx"] = entity_to_person_idx shared_data["precomputed_rates"] = precomputed_rates + shared_data["reported_takeup_anchors"] = reported_takeup_anchors logger.info( "Starting parallel clone processing: %d clones, %d workers", @@ -2427,6 +2460,7 @@ def build_matrix( ent_blocks, ent_hh_ids, ent_ci, + reported_mask=reported_takeup_anchors.get(takeup_var), ) ent_values = (ent_eligible * ent_takeup).astype(np.float32) diff --git a/policyengine_us_data/datasets/cps/census_cps.py b/policyengine_us_data/datasets/cps/census_cps.py index 042fefe56..fb97b651b 100644 --- a/policyengine_us_data/datasets/cps/census_cps.py +++ b/policyengine_us_data/datasets/cps/census_cps.py @@ -236,7 +236,24 @@ class CensusCPS_2018(CensusCPS): "A_AGE", "A_SEX", "PEDISEYE", + "NOW_COV", + "NOW_DIR", "NOW_MRK", + "NOW_MRKS", + "NOW_MRKUN", + "NOW_NONM", + "NOW_PRIV", + "NOW_PUB", + "NOW_GRP", + "NOW_CAID", + "NOW_MCAID", + "NOW_PCHIP", + "NOW_OTHMT", + "NOW_MCARE", + "NOW_MIL", + "NOW_CHAMPVA", + "NOW_VACARE", + "NOW_IHSFLG", "WSAL_VAL", "INT_VAL", "SEMP_VAL", @@ -294,7 +311,6 @@ class CensusCPS_2018(CensusCPS): "PMED_VAL", "PEMCPREM", "PRCITSHP", - "NOW_GRP", "POCCU2", "PEINUSYR", "MCARE", diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 8d1189aa2..068775323 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -26,6 +26,58 @@ ) from policyengine_us_data.utils.downsample import downsample_dataset_arrays from policyengine_us_data.utils.randomness import seeded_rng +from policyengine_us_data.utils.takeup import ( + any_person_flag_by_entity, + assign_takeup_with_reported_anchors, +) + + +CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = { + "reported_has_direct_purchase_health_coverage_at_interview": "NOW_DIR", + "reported_has_marketplace_health_coverage_at_interview": "NOW_MRK", + "reported_has_subsidized_marketplace_health_coverage_at_interview": "NOW_MRKS", + "reported_has_unsubsidized_marketplace_health_coverage_at_interview": "NOW_MRKUN", + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview": ( + "NOW_NONM" + ), + "reported_has_employer_sponsored_health_coverage_at_interview": "NOW_GRP", + "reported_has_medicare_health_coverage_at_interview": "NOW_MCARE", + "reported_has_medicaid_health_coverage_at_interview": "NOW_CAID", + "reported_has_means_tested_health_coverage_at_interview": "NOW_MCAID", + "reported_has_chip_health_coverage_at_interview": "NOW_PCHIP", + "reported_has_other_means_tested_health_coverage_at_interview": "NOW_OTHMT", + "reported_has_tricare_health_coverage_at_interview": "NOW_MIL", + "reported_has_champva_health_coverage_at_interview": "NOW_CHAMPVA", + "reported_has_va_health_coverage_at_interview": "NOW_VACARE", + "reported_has_indian_health_service_coverage_at_interview": "NOW_IHSFLG", +} + +CURRENT_HEALTH_COVERAGE_RULE_INPUT_ALIAS_MAP = { + "has_marketplace_health_coverage_at_interview": ( + "reported_has_marketplace_health_coverage_at_interview" + ), + "has_non_marketplace_direct_purchase_health_coverage_at_interview": ( + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview" + ), + "has_medicaid_health_coverage_at_interview": ( + "reported_has_medicaid_health_coverage_at_interview" + ), + "has_other_means_tested_health_coverage_at_interview": ( + "reported_has_other_means_tested_health_coverage_at_interview" + ), + "has_tricare_health_coverage_at_interview": ( + "reported_has_tricare_health_coverage_at_interview" + ), + "has_champva_health_coverage_at_interview": ( + "reported_has_champva_health_coverage_at_interview" + ), + "has_va_health_coverage_at_interview": ( + "reported_has_va_health_coverage_at_interview" + ), + "has_indian_health_service_coverage_at_interview": ( + "reported_has_indian_health_service_coverage_at_interview" + ), +} class CPS(Dataset): @@ -231,7 +283,16 @@ def add_takeup(self): # ACA rng = seeded_rng("takes_up_aca_if_eligible") - data["takes_up_aca_if_eligible"] = rng.random(n_tax_units) < aca_rate + reported_marketplace_by_tax_unit = any_person_flag_by_entity( + data["person_tax_unit_id"], + data["tax_unit_id"], + data["has_marketplace_health_coverage_at_interview"], + ) + data["takes_up_aca_if_eligible"] = assign_takeup_with_reported_anchors( + rng.random(n_tax_units), + aca_rate, + reported_mask=reported_marketplace_by_tax_unit, + ) # Medicaid: state-specific rates state_codes = baseline.calculate("state_code_str").values @@ -243,8 +304,11 @@ def add_takeup(self): [medicaid_rates_by_state.get(s, 0.93) for s in person_states] ) rng = seeded_rng("takes_up_medicaid_if_eligible") - data["takes_up_medicaid_if_eligible"] = ( - rng.random(n_persons) < medicaid_rate_by_person + data["takes_up_medicaid_if_eligible"] = assign_takeup_with_reported_anchors( + rng.random(n_persons), + medicaid_rate_by_person, + reported_mask=data["has_medicaid_health_coverage_at_interview"], + group_keys=person_states, ) # Head Start @@ -453,9 +517,44 @@ def children_per_parent(col: str) -> pd.DataFrame: ) cps["own_children_in_household"] = tmp.children.fillna(0) - cps["has_marketplace_health_coverage"] = person.NOW_MRK == 1 + for variable, cps_column in CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP.items(): + cps[variable] = person[cps_column] == 1 + + for ( + variable, + reported_variable, + ) in CURRENT_HEALTH_COVERAGE_RULE_INPUT_ALIAS_MAP.items(): + cps[variable] = cps[reported_variable] - cps["has_esi"] = person.NOW_GRP == 1 + cps["reported_has_private_health_coverage_at_interview"] = person.NOW_PRIV == 1 + cps["reported_has_public_health_coverage_at_interview"] = person.NOW_PUB == 1 + cps["reported_is_insured_at_interview"] = person.NOW_COV == 1 + cps["reported_is_uninsured_at_interview"] = person.NOW_COV != 1 + + coverage_families = np.column_stack( + [ + cps["reported_has_employer_sponsored_health_coverage_at_interview"], + cps["reported_has_marketplace_health_coverage_at_interview"], + cps[ + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview" + ], + cps["reported_has_medicare_health_coverage_at_interview"], + cps["reported_has_means_tested_health_coverage_at_interview"], + cps["reported_has_tricare_health_coverage_at_interview"], + cps["reported_has_champva_health_coverage_at_interview"], + cps["reported_has_va_health_coverage_at_interview"], + cps["reported_has_indian_health_service_coverage_at_interview"], + ] + ) + cps["reported_has_multiple_health_coverage_at_interview"] = ( + coverage_families.sum(axis=1) > 1 + ) + + # Legacy aliases retained for compatibility until rules-side names catch up. + cps["has_marketplace_health_coverage"] = cps[ + "reported_has_marketplace_health_coverage_at_interview" + ] + cps["has_esi"] = cps["reported_has_employer_sponsored_health_coverage_at_interview"] cps["cps_race"] = person.PRDTRACE cps["is_hispanic"] = person.PRDTHSP != 0 diff --git a/policyengine_us_data/utils/takeup.py b/policyengine_us_data/utils/takeup.py index b8db8c90a..0561e6f59 100644 --- a/policyengine_us_data/utils/takeup.py +++ b/policyengine_us_data/utils/takeup.py @@ -140,6 +140,90 @@ } +def any_person_flag_by_entity( + person_entity_ids: np.ndarray, + entity_ids: np.ndarray, + person_mask: np.ndarray, +) -> np.ndarray: + """Aggregate a person-level boolean to any-covered at entity level.""" + person_entity_ids = np.asarray(person_entity_ids) + entity_ids = np.asarray(entity_ids) + person_mask = np.asarray(person_mask, dtype=bool) + if len(person_entity_ids) != len(person_mask): + raise ValueError("person_entity_ids and person_mask must align") + if not person_mask.any(): + return np.zeros(len(entity_ids), dtype=bool) + flagged_ids = np.unique(person_entity_ids[person_mask]) + return np.isin(entity_ids, flagged_ids) + + +def assign_takeup_with_reported_anchors( + draws: np.ndarray, + rates, + reported_mask: Optional[np.ndarray] = None, + group_keys: Optional[np.ndarray] = None, +) -> np.ndarray: + """Apply the SSI/SNAP-style reported-first takeup pattern. + + Reported recipients are always assigned takeup=True. Remaining + non-reporters are filled probabilistically to reach the target count + implied by the rate, either globally or within each ``group_keys`` + group. + """ + draws = np.asarray(draws, dtype=np.float64) + if np.isscalar(rates): + rates_arr = np.full(len(draws), float(rates), dtype=np.float64) + else: + rates_arr = np.asarray(rates, dtype=np.float64) + if len(rates_arr) != len(draws): + raise ValueError("rates and draws must align") + + baseline = draws < rates_arr + if reported_mask is None: + return baseline + + reported_mask = np.asarray(reported_mask, dtype=bool) + if len(reported_mask) != len(draws): + raise ValueError("reported_mask and draws must align") + + result = reported_mask.copy() + + if group_keys is None: + unique_rates = np.unique(rates_arr) + if len(unique_rates) != 1: + raise ValueError("group_keys required when rates vary by entity") + target_count = int(unique_rates[0] * len(draws)) + non_reporters = ~reported_mask + remaining_needed = max(0, target_count - int(reported_mask.sum())) + adjusted_rate = ( + remaining_needed / int(non_reporters.sum()) if non_reporters.any() else 0 + ) + result |= non_reporters & (draws < adjusted_rate) + return result + + group_keys = np.asarray(group_keys) + if len(group_keys) != len(draws): + raise ValueError("group_keys and draws must align") + + for key in np.unique(group_keys): + group_mask = group_keys == key + group_rates = np.unique(rates_arr[group_mask]) + if len(group_rates) != 1: + raise ValueError("Each takeup group must have a single rate") + target_count = int(group_rates[0] * int(group_mask.sum())) + group_reported = reported_mask[group_mask] + remaining_needed = max(0, target_count - int(group_reported.sum())) + group_non_reporters = group_mask & ~reported_mask + adjusted_rate = ( + remaining_needed / int(group_non_reporters.sum()) + if group_non_reporters.any() + else 0 + ) + result[group_non_reporters] = draws[group_non_reporters] < adjusted_rate + + return result + + def _resolve_rate( rate_or_dict, state_fips: int, @@ -160,6 +244,7 @@ def compute_block_takeup_for_entities( entity_blocks: np.ndarray, entity_hh_ids: np.ndarray, entity_clone_indices: np.ndarray, + reported_mask: Optional[np.ndarray] = None, ) -> np.ndarray: """Compute boolean takeup via clone-seeded draws. @@ -178,6 +263,9 @@ def compute_block_takeup_for_entities( matrix builder (single clone), a scalar broadcast via np.full. For the H5 builder (all clones), a per-entity array. + reported_mask: Optional boolean mask forcing takeup=True for + reported recipients before filling remaining takeup to the + target rate. Returns: Boolean array of shape (n_entities,). @@ -185,6 +273,7 @@ def compute_block_takeup_for_entities( n = len(entity_blocks) draws = np.zeros(n, dtype=np.float64) rates = np.ones(n, dtype=np.float64) + state_fips = np.zeros(n, dtype=np.int32) # Resolve rates from block state FIPS for block in np.unique(entity_blocks): @@ -192,7 +281,9 @@ def compute_block_takeup_for_entities( continue blk_mask = entity_blocks == block sf = int(str(block)[:2]) - rates[blk_mask] = _resolve_rate(rate_or_dict, sf) + rate = _resolve_rate(rate_or_dict, sf) + rates[blk_mask] = rate + state_fips[blk_mask] = sf # Draw per (hh_id, clone_idx) pair for hh_id in np.unique(entity_hh_ids): @@ -203,7 +294,13 @@ def compute_block_takeup_for_entities( rng = seeded_rng(var_name, salt=f"{int(hh_id)}:{int(ci)}") draws[ci_mask] = rng.random(n_ent) - return draws < rates + group_keys = state_fips if isinstance(rate_or_dict, dict) else None + return assign_takeup_with_reported_anchors( + draws, + rates, + reported_mask=reported_mask, + group_keys=group_keys, + ) def apply_block_takeup_to_arrays( @@ -216,6 +313,7 @@ def apply_block_takeup_to_arrays( time_period: int, takeup_filter: List[str] = None, precomputed_rates: Optional[Dict[str, Any]] = None, + reported_anchors: Optional[Dict[str, np.ndarray]] = None, ) -> Dict[str, np.ndarray]: """Compute takeup draws from raw arrays. @@ -247,6 +345,7 @@ def apply_block_takeup_to_arrays( """ filter_set = set(takeup_filter) if takeup_filter is not None else None result = {} + reported_anchors = reported_anchors or {} for spec in SIMPLE_TAKEUP_VARS: var_name = spec["variable"] @@ -267,12 +366,16 @@ def apply_block_takeup_to_arrays( rate_or_dict = precomputed_rates[rate_key] else: rate_or_dict = load_take_up_rate(rate_key, time_period) + reported_mask = reported_anchors.get(var_name) + if reported_mask is not None and len(reported_mask) != n_ent: + raise ValueError(f"reported anchor for {var_name} has wrong length") bools = compute_block_takeup_for_entities( var_name, rate_or_dict, ent_blocks, ent_hh_ids, ent_clone_indices, + reported_mask=reported_mask, ) result[var_name] = bools diff --git a/tests/integration/test_cps.py b/tests/integration/test_cps.py index f0ecc9afd..55eb6464b 100644 --- a/tests/integration/test_cps.py +++ b/tests/integration/test_cps.py @@ -2,6 +2,7 @@ import numpy as np import pytest +import pandas as pd @pytest.fixture(scope="module") @@ -12,6 +13,86 @@ def cps_sim(): return Microsimulation(dataset=CPS_2024) +def test_add_personal_variables_maps_current_health_coverage_flags(): + from policyengine_us_data.datasets.cps.cps import add_personal_variables + + person = pd.DataFrame( + { + "A_AGE": [30, 45, 28], + "A_SEX": [2, 1, 2], + "PEDISEYE": [0, 1, 0], + "PEDISDRS": [0, 0, 0], + "PEDISEAR": [0, 0, 0], + "PEDISOUT": [0, 0, 0], + "PEDISPHY": [0, 0, 0], + "PEDISREM": [0, 0, 0], + "PEPAR1": [0, 0, 0], + "PEPAR2": [0, 0, 0], + "PH_SEQ": [1, 1, 2], + "A_LINENO": [1, 2, 1], + "NOW_COV": [1, 1, 0], + "NOW_DIR": [1, 0, 0], + "NOW_MRK": [1, 0, 0], + "NOW_MRKS": [1, 0, 0], + "NOW_MRKUN": [0, 0, 0], + "NOW_NONM": [0, 0, 0], + "NOW_PRIV": [1, 0, 0], + "NOW_PUB": [0, 1, 0], + "NOW_GRP": [0, 1, 0], + "NOW_CAID": [0, 0, 0], + "NOW_MCAID": [0, 1, 0], + "NOW_PCHIP": [0, 0, 0], + "NOW_OTHMT": [0, 1, 0], + "NOW_MCARE": [0, 0, 0], + "NOW_MIL": [0, 0, 0], + "NOW_CHAMPVA": [0, 0, 0], + "NOW_VACARE": [0, 0, 0], + "NOW_IHSFLG": [0, 0, 0], + "PRDTRACE": [1, 2, 3], + "PRDTHSP": [0, 1, 0], + "A_MARITL": [1, 4, 1], + "A_HSCOL": [0, 2, 0], + "POCCU2": [39, 52, 29], + } + ) + cps = {} + + add_personal_variables(cps, person) + + np.testing.assert_array_equal( + cps["reported_has_marketplace_health_coverage_at_interview"], + [True, False, False], + ) + np.testing.assert_array_equal( + cps["has_marketplace_health_coverage_at_interview"], + [True, False, False], + ) + np.testing.assert_array_equal( + cps["has_other_means_tested_health_coverage_at_interview"], + [False, True, False], + ) + np.testing.assert_array_equal( + cps["has_medicaid_health_coverage_at_interview"], + [False, False, False], + ) + np.testing.assert_array_equal( + cps["reported_has_means_tested_health_coverage_at_interview"], + [False, True, False], + ) + np.testing.assert_array_equal( + cps["reported_is_uninsured_at_interview"], + [False, False, True], + ) + np.testing.assert_array_equal( + cps["reported_has_multiple_health_coverage_at_interview"], + [False, True, False], + ) + np.testing.assert_array_equal( + cps["has_marketplace_health_coverage"], [True, False, False] + ) + np.testing.assert_array_equal(cps["has_esi"], [False, True, False]) + + # ── Sanity checks ───────────────────────────────────────────── diff --git a/tests/unit/calibration/test_unified_calibration.py b/tests/unit/calibration/test_unified_calibration.py index abb8f0a40..2b70e1f04 100644 --- a/tests/unit/calibration/test_unified_calibration.py +++ b/tests/unit/calibration/test_unified_calibration.py @@ -195,6 +195,22 @@ def test_different_blocks_different_result(self): differs = any(not np.array_equal(r1[v], r2[v]) for v in r1) assert differs + def test_reported_anchors_feed_through_for_aca(self): + args = self._make_arrays(4, 1, 1, 1) + result = apply_block_takeup_to_arrays( + *args, + time_period=2024, + takeup_filter=["takes_up_aca_if_eligible"], + precomputed_rates={"aca": 0.25}, + reported_anchors={ + "takes_up_aca_if_eligible": np.array([True, False, False, False]) + }, + ) + np.testing.assert_array_equal( + result["takes_up_aca_if_eligible"], + [True, False, False, False], + ) + class TestResolveRate: """Verify _resolve_rate handles scalar and dict rates.""" diff --git a/tests/unit/test_stochastic_variables.py b/tests/unit/test_stochastic_variables.py index 1f2602c29..3ec3e47ab 100644 --- a/tests/unit/test_stochastic_variables.py +++ b/tests/unit/test_stochastic_variables.py @@ -2,6 +2,10 @@ import numpy as np from policyengine_us_data.parameters import load_take_up_rate +from policyengine_us_data.utils.takeup import ( + any_person_flag_by_entity, + assign_takeup_with_reported_anchors, +) from policyengine_us_data.utils.randomness import ( _stable_string_hash, seeded_rng, @@ -147,3 +151,39 @@ def test_state_specific_medicaid_proportions(self): for state, expected_rate in [("UT", 0.53), ("CO", 0.99)]: take_up = draws[:10_000] < expected_rate assert abs(take_up.mean() - expected_rate) < 0.05 + + +class TestReportedTakeupAnchors: + def test_global_anchor_preserves_reported_and_fills_remaining(self): + draws = np.array([0.9, 0.2, 0.6, 0.9]) + reported = np.array([True, False, False, False]) + result = assign_takeup_with_reported_anchors( + draws, + 0.5, + reported_mask=reported, + ) + np.testing.assert_array_equal(result, [True, True, False, False]) + + def test_grouped_anchor_applies_within_each_group(self): + draws = np.array([0.9, 0.2, 0.1, 0.9]) + rates = np.array([0.5, 0.5, 0.5, 0.5]) + reported = np.array([True, False, False, False]) + groups = np.array(["A", "A", "B", "B"]) + result = assign_takeup_with_reported_anchors( + draws, + rates, + reported_mask=reported, + group_keys=groups, + ) + np.testing.assert_array_equal(result, [True, False, True, False]) + + def test_any_person_flag_by_entity_aggregates_correctly(self): + person_tax_unit_ids = np.array([10, 10, 20, 30]) + tax_unit_ids = np.array([10, 20, 30]) + person_marketplace = np.array([False, True, False, True]) + result = any_person_flag_by_entity( + person_tax_unit_ids, + tax_unit_ids, + person_marketplace, + ) + np.testing.assert_array_equal(result, [True, False, True])