diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index b95fa87744..cdd1e9b1c5 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -60,7 +60,8 @@ use dapi_grpc::platform::v0::{ get_recent_compacted_address_balance_changes_request, GetAddressesBranchStateRequest, GetRecentAddressBalanceChangesRequest, GetRecentCompactedAddressBalanceChangesRequest, Proof, }; -use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation}; +use dpp::address_funds::PlatformAddress; +use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation, Credits}; use dpp::prelude::AddressNonce; use dpp::version::PlatformVersion; use drive::drive::{Drive, RootTree}; @@ -72,7 +73,11 @@ use rs_dapi_client::{ DapiRequest, ExecutionError, ExecutionResponse, InnerInto, IntoInner, RequestSettings, }; use std::collections::HashMap; -use tracing::{debug, info, trace}; +use tracing::{debug, info, trace, warn}; + +/// One-shot warning threshold for the end-of-pass replay buffer +/// (`pending_unknown`). Cross-checked at the push site below. +const PENDING_UNKNOWN_WARN_THRESHOLD: usize = 1000; /// Server limit for compacted address balance changes per request. const COMPACTED_BATCH_LIMIT: usize = 25; @@ -457,9 +462,11 @@ async fn incremental_catch_up( result: &mut AddressSyncResult, settings: RequestSettings, ) -> Result<(), Error> { - // `key_to_tag` is already keyed by raw GroveDB bytes with - // `(tag, address)` values, so it can serve as the lookup directly. - let address_lookup = key_to_tag; + // Use the borrowed `key_to_tag` directly through the pass — only + // unknown-address replay (rare, end-of-pass) materializes any extra + // allocation. Buffered misses are bounded by the count of foreign / + // post-snapshot addresses in the response. + let mut pending_unknown: Vec = Vec::new(); let mut current_height = start_height; let mut observed_tip_height = start_height; @@ -614,39 +621,18 @@ async fn incremental_catch_up( result.metrics.compacted_entries_returned += entry_count; for entry in &entries { - for (platform_addr, credit_op) in &entry.changes { - let addr_bytes = platform_addr.to_bytes(); - if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { - let result_key = (tag, address); - let current_balance = result - .found - .get(&result_key) - .map(|f| f.balance) - .unwrap_or(0); - - let new_balance = match credit_op { - BlockAwareCreditOperation::SetCredits(credits) => *credits, - BlockAwareCreditOperation::AddToCreditsOperations(operations) => { - let total_to_add: u64 = operations - .iter() - .filter(|(height, _)| **height >= current_height) - .map(|(_, credits)| *credits) - .fold(0u64, |acc, c| acc.saturating_add(c)); - current_balance.saturating_add(total_to_add) - } - }; - - if new_balance != current_balance { - let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); - let funds = AddressFunds { - nonce, - balance: new_balance, - }; - result.found.insert(result_key, funds); - provider.on_address_found(tag, &address, funds).await; - } - } - } + apply_block_changes( + key_to_tag, + entry + .changes + .iter() + .map(|(a, op)| (a, BalanceOp::Compacted(op))), + current_height, + provider, + result, + &mut pending_unknown, + ) + .await; if entry.end_block_height.saturating_add(1) > current_height { current_height = entry.end_block_height.saturating_add(1); @@ -677,34 +663,18 @@ async fn incremental_catch_up( highest_recent_block = entry.block_height; } - for (platform_addr, credit_op) in &entry.changes { - let addr_bytes = platform_addr.to_bytes(); - if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { - let result_key = (tag, address); - let current_balance = result - .found - .get(&result_key) - .map(|f| f.balance) - .unwrap_or(0); - - let new_balance = match credit_op { - CreditOperation::SetCredits(credits) => *credits, - CreditOperation::AddToCredits(credits) => { - current_balance.saturating_add(*credits) - } - }; - - if new_balance != current_balance { - let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); - let funds = AddressFunds { - nonce, - balance: new_balance, - }; - result.found.insert(result_key, funds); - provider.on_address_found(tag, &address, funds).await; - } - } - } + apply_block_changes( + key_to_tag, + entry + .changes + .iter() + .map(|(a, op)| (a, BalanceOp::Recent(op))), + current_height, + provider, + result, + &mut pending_unknown, + ) + .await; if entry.block_height.saturating_add(1) > current_height { current_height = entry.block_height.saturating_add(1); @@ -712,6 +682,11 @@ async fn incremental_catch_up( } } + // Single end-of-pass recovery: foreign-wallet addresses fall out at + // the extras-intersection check, so no per-block refresh and no log + // flood on multi-wallet chains. + refresh_and_replay_unknown(key_to_tag, pending_unknown, provider, result).await; + result.new_sync_height = current_height.max(observed_tip_height); // Store the highest block from the recent entries so the next sync can // use RangeAfter(this_height) for compaction detection. @@ -723,6 +698,300 @@ async fn incremental_catch_up( Ok(()) } +// ── Address-balance change application ──────────────────────────────── + +/// A single borrowed address balance change, abstracting the recent +/// (`CreditOperation`) and compacted (`BlockAwareCreditOperation`) shapes so one +/// pure function can apply both phases identically. +#[derive(Clone, Copy)] +pub(crate) enum BalanceOp<'a> { + /// A recent (per-block) credit operation. + Recent(&'a CreditOperation), + /// A compacted (block-range) credit operation. + Compacted(&'a BlockAwareCreditOperation), +} + +/// Owned arity of [`BalanceOp`], used only to buffer a miss past the borrow of +/// its response entry so it can be replayed at end-of-pass. +#[derive(Clone)] +pub(crate) enum OwnedBalanceOp { + Recent(CreditOperation), + Compacted(BlockAwareCreditOperation), +} + +/// A buffered miss: raw GroveDB key, owned change, and the catch-up cursor +/// height at the original block (feeds the compacted height filter on replay; +/// ignored by `Recent`). +type PendingMiss = (Vec, OwnedBalanceOp, u64); + +/// Resolve the post-change balance from the current balance and the catch-up +/// cursor height. Compacted sums only operations at or after `current_height`; +/// recent applies a flat set/add. +fn apply_op(op: BalanceOp<'_>, current_balance: Credits, current_height: u64) -> Credits { + match op { + BalanceOp::Recent(op) => match op { + CreditOperation::SetCredits(credits) => *credits, + CreditOperation::AddToCredits(credits) => current_balance.saturating_add(*credits), + }, + BalanceOp::Compacted(op) => match op { + BlockAwareCreditOperation::SetCredits(credits) => *credits, + BlockAwareCreditOperation::AddToCreditsOperations(operations) => { + let total_to_add: u64 = operations + .iter() + .filter(|(height, _)| **height >= current_height) + .map(|(_, credits)| *credits) + .fold(0u64, |acc, c| acc.saturating_add(c)); + current_balance.saturating_add(total_to_add) + } + }, + } +} + +/// Borrow an owned op back into [`BalanceOp`] for replay. +fn borrow_op(op: &OwnedBalanceOp) -> BalanceOp<'_> { + match op { + OwnedBalanceOp::Recent(op) => BalanceOp::Recent(op), + OwnedBalanceOp::Compacted(op) => BalanceOp::Compacted(op), + } +} + +/// Apply one block's changes against the borrowed entry-time lookup, drive +/// `on_address_found` for every known address whose balance moved, and +/// append unknown-address changes to `pending_unknown` for a single +/// end-of-pass refresh + replay. The refresh is deliberately deferred so +/// foreign-wallet addresses on a shared chain do not trigger a per-block +/// provider poll. +async fn apply_block_changes<'a, P, I>( + address_lookup: &HashMap, (P::Tag, P::Address)>, + changes: I, + current_height: u64, + provider: &mut P, + result: &mut AddressSyncResult, + pending_unknown: &mut Vec, +) where + P: AddressProvider, + I: IntoIterator)>, +{ + let mut local_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); + + for (platform_addr, change) in changes { + let addr_bytes = platform_addr.to_bytes(); + if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { + let result_key = (tag, address); + let current_balance = result + .found + .get(&result_key) + .map(|f| f.balance) + .unwrap_or(0); + + let new_balance = apply_op(change, current_balance, current_height); + + if new_balance != current_balance { + // TODO: incremental RPCs carry only balance deltas, never + // nonces — addresses first seen here get nonce=0. Clients + // recover via `AddressInvalidNonceError.expected_nonce`; + // a proper fix would fetch authoritative `AddressFunds` + // or model `nonce` as `Option`. + let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); + let funds = AddressFunds { + nonce, + balance: new_balance, + }; + result.absent.remove(&result_key); + result.found.insert(result_key, funds); + local_applied.push((tag, address, funds)); + } + } else { + let owned = match change { + BalanceOp::Recent(op) => OwnedBalanceOp::Recent(*op), + BalanceOp::Compacted(op) => OwnedBalanceOp::Compacted(op.clone()), + }; + pending_unknown.push((addr_bytes, owned, current_height)); + // NOTE: this buffer is intentionally unbounded — premature optimization here + // would couple the catch-up loop to ad-hoc memory heuristics. We log a + // one-shot warning above a generous threshold so a future operator can + // observe whether this path actually exceeds 1000 buffered foreign-wallet + // changes in real workloads; if it does, the right fix is to follow the + // reviewer's mitigation (a) — store only Vec keys and re-derive replay + // changes after the refresh resolves them. See PR #3650 @thepastaclaw review. + if pending_unknown.len() == PENDING_UNKNOWN_WARN_THRESHOLD { + warn!( + "Address sync: pending_unknown buffer reached {} entries — \ + foreign-wallet balance changes are accumulating on a shared chain", + PENDING_UNKNOWN_WARN_THRESHOLD + ); + } + } + } + + for (tag, address, funds) in &local_applied { + provider.on_address_found(*tag, address, *funds).await; + } +} + +/// Maximum number of refresh+replay rounds. The loop iterates so a +/// `pending_addresses()` set that grows via `on_address_found`-triggered +/// gap extension can be picked up in the same pass; the cap guards +/// against any pathological livelock. +const REPLAY_REFRESH_MAX_ITERATIONS: usize = 3; + +/// End-of-pass recovery for addresses missing from the entry-time +/// snapshot. Re-polls `pending_addresses()`, builds a small `extras` map +/// of newly-derived addresses, and replays only the buffered changes +/// that match an `extras` entry. Foreign (other-wallet) addresses fall +/// out at the intersection check — no provider refresh storm, no log +/// flood. +/// +/// The refresh+replay is wrapped in a bounded loop so that +/// `on_address_found` callbacks fired during replay can trigger gap +/// extension on the provider and surface follow-on addresses +/// (e.g. address `A+1` that the provider only exposes after seeing `A` +/// was used). Iteration stops as soon as no new addresses are resolved +/// or the cap [`REPLAY_REFRESH_MAX_ITERATIONS`] is reached. +async fn refresh_and_replay_unknown( + key_to_tag: &HashMap, (P::Tag, P::Address)>, + pending_unknown: Vec, + provider: &mut P, + result: &mut AddressSyncResult, +) { + if pending_unknown.is_empty() { + return; + } + + // Build the set of unknown keys for a fast intersection probe. + let unknown_keys: std::collections::HashSet<&[u8]> = pending_unknown + .iter() + .map(|(key, _, _)| key.as_slice()) + .collect(); + + // Keys resolved across all iterations so we don't double-apply a + // delta if a follow-on iteration's `extras` still contains an + // already-replayed key. Owned bytes because the borrow checker won't + // let us keep `&[u8]` references into `pending_unknown` while we + // also borrow it for the inner loop. + let mut resolved_keys: std::collections::HashSet> = std::collections::HashSet::new(); + let mut total_replay_applied: usize = 0; + let mut hit_iteration_cap = false; + + for iteration in 0..REPLAY_REFRESH_MAX_ITERATIONS { + // Only addresses the provider can now produce AND that match a + // still-unresolved buffered miss are interesting — everything + // else is some other wallet's address and stays out of the + // lookup entirely. + let mut extras: HashMap, (P::Tag, P::Address)> = HashMap::new(); + for (tag, address) in provider.pending_addresses() { + let bytes = address.to_bytes(); + if unknown_keys.contains(bytes.as_slice()) + && !resolved_keys.contains(&bytes) + && !key_to_tag.contains_key(&bytes) + { + extras.insert(bytes, (tag, address)); + } + } + + if extras.is_empty() { + if iteration == 0 { + // Common case on a populated multi-wallet chain: every + // buffered unknown belongs to another wallet. + debug!( + "Address sync: {} platform-reported balance change(s) reference \ + address(es) not tracked by this wallet; ignoring", + pending_unknown.len() + ); + return; + } + // No new addresses surfaced this iteration — we're done. + break; + } + + // Replay only the entries whose key actually resolves in + // `extras` and hasn't been resolved in a prior iteration. Order + // is preserved (compacted first, then recent — same as the + // forward pass), so `AddToCredits` deltas accumulate correctly. + // The catch-up cursor per change is preserved so the compacted + // height filter still sees the same `current_height` it would + // have seen on the forward pass. + let mut iteration_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); + for (key, change, height) in &pending_unknown { + if resolved_keys.contains(key) { + continue; + } + let Some(&(tag, address)) = extras.get(key.as_slice()) else { + continue; + }; + let result_key = (tag, address); + let current_balance = result + .found + .get(&result_key) + .map(|f| f.balance) + .unwrap_or(0); + let new_balance = apply_op(borrow_op(change), current_balance, *height); + + if new_balance != current_balance { + // TODO: same synthesized nonce=0 gap as the forward pass. + let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); + let funds = AddressFunds { + nonce, + balance: new_balance, + }; + result.absent.remove(&result_key); + result.found.insert(result_key, funds); + iteration_applied.push((tag, address, funds)); + } + } + + // Mark every key whose entry resolved in `extras` as resolved + // this pass — even if no balance moved — so the next iteration + // doesn't reconsider it. + for (key, _, _) in &pending_unknown { + if extras.contains_key(key.as_slice()) { + resolved_keys.insert(key.clone()); + } + } + + let iteration_resolved = iteration_applied.len(); + total_replay_applied += iteration_resolved; + + // Fire callbacks for this iteration BEFORE the next refresh so + // that `on_address_found`-driven gap extension can expose the + // next batch of addresses to `pending_addresses()`. + for (tag, address, funds) in &iteration_applied { + provider.on_address_found(*tag, address, *funds).await; + } + + if iteration_resolved == 0 { + // `extras` was non-empty but every entry's delta was a + // no-op; nothing for gap extension to chew on. + break; + } + + if iteration + 1 == REPLAY_REFRESH_MAX_ITERATIONS { + hit_iteration_cap = true; + } + } + + if hit_iteration_cap { + debug!( + "Address sync: refresh+replay reached the {}-iteration cap; \ + any further gap-extension addresses will surface on the next sync", + REPLAY_REFRESH_MAX_ITERATIONS + ); + } + + let still_unknown = pending_unknown + .iter() + .filter(|(key, _, _)| !resolved_keys.contains(key)) + .count(); + if still_unknown > 0 { + debug!( + "Address sync: {} platform-reported balance change(s) reference \ + address(es) not tracked by this wallet (refresh recovered {} \ + other(s)); ignoring the untracked entries", + still_unknown, total_replay_applied + ); + } +} + /// Extract the highest block height from the recent tree boundaries in the proof. /// /// Returns: @@ -1381,4 +1650,539 @@ mod tests { "expected balance conversion error, got: {err:?}" ); } + + // ── End-of-pass refresh + replay regression guards ───────────────── + + use dpp::address_funds::PlatformAddress; + use dpp::balances::credits::BlockAwareCreditOperation; + + fn p2pkh(byte: u8) -> PlatformAddress { + PlatformAddress::P2pkh([byte; 20]) + } + + /// A provider that derives a fresh address mid-pass — so the + /// entry-time lookup misses it — gets the balance applied AND + /// `on_address_found` fired after the end-of-pass refresh. + #[tokio::test] + async fn apply_block_changes_recovers_post_snapshot_address() { + use async_trait::async_trait; + + struct GrowingProvider { + late: PlatformAddress, + found: Vec<(u32, PlatformAddress, AddressFunds)>, + } + + #[async_trait] + impl AddressProvider for GrowingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::once((7u32, self.late)) + } + + async fn on_address_found( + &mut self, + tag: Self::Tag, + address: &Self::Address, + funds: AddressFunds, + ) { + self.found.push((tag, *address, funds)); + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let late = p2pkh(0xCD); + + let lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + + let mut provider = GrowingProvider { + late, + found: Vec::new(), + }; + let mut result: AddressSyncResult = AddressSyncResult::new(); + let mut pending_unknown: Vec = Vec::new(); + + let op = BlockAwareCreditOperation::SetCredits(42_000); + let changes = [(&late, BalanceOp::Compacted(&op))]; + + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + + // Per-block apply must NOT touch the provider for unknowns — + // the refresh is deferred to end-of-pass. + assert!( + provider.found.is_empty(), + "no on_address_found before end-of-pass refresh" + ); + assert_eq!(pending_unknown.len(), 1, "miss is buffered for replay"); + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + + assert_eq!( + result.found.get(&(7u32, late)).map(|f| f.balance), + Some(42_000), + "post-snapshot address balance must be applied after refresh" + ); + assert!( + provider + .found + .iter() + .any(|(t, a, f)| *t == 7 && *a == late && f.balance == 42_000), + "on_address_found must fire for the recovered post-snapshot address" + ); + } + + /// A known address proven absent by the tree scan but re-discovered + /// by an incremental change is moved into `found` and pruned from + /// `absent`, keeping the two sets disjoint. + #[tokio::test] + async fn apply_block_changes_keeps_found_and_absent_disjoint_on_catch_up() { + use async_trait::async_trait; + + struct NoopProvider; + + #[async_trait] + impl AddressProvider for NoopProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::empty() + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let tag: u32 = 5; + let addr = p2pkh(0x99); + + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(addr.to_bytes(), (tag, addr)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + result.absent.insert((tag, addr)); + + let op = BlockAwareCreditOperation::SetCredits(7_777); + let changes = [(&addr, BalanceOp::Compacted(&op))]; + + let mut pending_unknown: Vec = Vec::new(); + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut NoopProvider, + &mut result, + &mut pending_unknown, + ) + .await; + + assert_eq!( + result.found.get(&(tag, addr)).map(|f| f.balance), + Some(7_777), + ); + assert!( + !result.absent.contains(&(tag, addr)), + "apply_block_changes must keep found/absent disjoint" + ); + assert!( + pending_unknown.is_empty(), + "no unknowns expected for a known address" + ); + } + + /// The end-of-pass refresh must not double-count a known address's + /// `AddToCredits` delta when it replays the unknown subset in the + /// same block (the replay must exclude already-applied addresses). + #[tokio::test] + async fn refresh_does_not_double_count_known_address_delta() { + use async_trait::async_trait; + + let known = p2pkh(0x11); + let late = p2pkh(0x22); + + struct GrowingProvider { + late: PlatformAddress, + } + + #[async_trait] + impl AddressProvider for GrowingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::once((9u32, self.late)) + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + // `known` is in the snapshot with a starting balance; `late` is + // not (post-snapshot) and forces the refresh + replay path. + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(known.to_bytes(), (3u32, known)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + result.found.insert( + (3u32, known), + AddressFunds { + nonce: 0, + balance: 1_000, + }, + ); + + let mut provider = GrowingProvider { late }; + + let known_op = BlockAwareCreditOperation::AddToCreditsOperations( + std::iter::once((0u64, 500u64)).collect(), + ); + let late_op = BlockAwareCreditOperation::SetCredits(7_000); + let changes = [ + (&known, BalanceOp::Compacted(&known_op)), + (&late, BalanceOp::Compacted(&late_op)), + ]; + + let mut pending_unknown: Vec = Vec::new(); + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + // Known address was applied immediately (no longer waits on the + // end-of-pass refresh). `late` is buffered for replay. + assert_eq!(pending_unknown.len(), 1); + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + + // Known delta applied exactly once: 1000 + 500 (NOT 1000 + 500 + + // 500). The replay must skip the already-applied known address — + // here that is guaranteed structurally because the replay only + // walks the buffered misses, not the full change set. + assert_eq!( + result.found.get(&(3u32, known)).map(|f| f.balance), + Some(1_500), + "known AddToCredits delta must apply exactly once across refresh" + ); + assert_eq!( + result.found.get(&(9u32, late)).map(|f| f.balance), + Some(7_000), + "post-snapshot address still recovered after refresh" + ); + } + + /// A foreign address (not in the lookup, never produced by the + /// provider) is silently ignored — no `on_address_found`, no + /// `result.found` insert, no `result.absent` mutation, and exactly + /// one provider refresh for the whole pass. + #[tokio::test] + async fn apply_block_changes_ignores_foreign_address_without_refresh_storm() { + use async_trait::async_trait; + + struct CountingNoopProvider { + pending_polls: std::sync::atomic::AtomicUsize, + found_calls: usize, + } + + #[async_trait] + impl AddressProvider for CountingNoopProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + self.pending_polls + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + std::iter::empty() + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + self.found_calls += 1; + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let mine = p2pkh(0x01); + let foreign_1 = p2pkh(0xF1); + let foreign_2 = p2pkh(0xF2); + let foreign_3 = p2pkh(0xF3); + + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(mine.to_bytes(), (1u32, mine)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + let mut provider = CountingNoopProvider { + pending_polls: std::sync::atomic::AtomicUsize::new(0), + found_calls: 0, + }; + let mut pending_unknown: Vec = Vec::new(); + + // Three separate "blocks" (representing the per-entry calls + // inside `incremental_catch_up`), every change but the first + // belongs to another wallet. + for (addr, credits) in [ + (&mine, 1_000), + (&foreign_1, 5_000), + (&foreign_2, 5_000), + (&foreign_3, 5_000), + ] { + let op = BlockAwareCreditOperation::SetCredits(credits); + let changes = [(addr, BalanceOp::Compacted(&op))]; + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + } + + // Per-block apply must NEVER refresh the provider — the refresh + // runs once, at end of pass. + assert_eq!( + provider + .pending_polls + .load(std::sync::atomic::Ordering::Relaxed), + 0, + "no per-block pending_addresses() polls — refresh is end-of-pass only" + ); + + // The end-of-pass refresh runs exactly once. + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + assert_eq!( + provider + .pending_polls + .load(std::sync::atomic::Ordering::Relaxed), + 1, + "end-of-pass refresh must poll the provider exactly once" + ); + + // Foreign addresses must not surface as `found` or fire callbacks. + assert_eq!( + result.found.len(), + 1, + "only the known address is in `found` (foreign addresses ignored)" + ); + assert_eq!( + result.found.get(&(1u32, mine)).map(|f| f.balance), + Some(1_000), + "known address applied" + ); + assert!( + !result + .found + .keys() + .any(|(_, a)| *a == foreign_1 || *a == foreign_2 || *a == foreign_3), + "no foreign address may be inserted into `result.found`" + ); + assert!( + result.absent.is_empty(), + "foreign addresses must not be marked `absent` either" + ); + assert_eq!( + provider.found_calls, 1, + "on_address_found fires only for the known address" + ); + + // `found` and `absent` stay disjoint. + for key in result.found.keys() { + assert!( + !result.absent.contains(key), + "found ∩ absent must be empty: {key:?} in both" + ); + } + } + + /// Two post-snapshot addresses A and A+1 where the provider only + /// exposes A initially and extends its gap to include A+1 from + /// inside `on_address_found(A, ...)`. The bounded-iteration replay + /// must pick up A+1 in a follow-on iteration instead of leaving + /// its buffered change silently dropped until the next sync. + #[tokio::test] + async fn refresh_loops_until_gap_extension_recovers_follow_on_address() { + use async_trait::async_trait; + + struct GapExtendingProvider { + a: PlatformAddress, + b: PlatformAddress, + // false until `on_address_found(a, ...)` mutates it — then + // `pending_addresses()` returns both A and B. + extended: bool, + found: Vec<(u32, PlatformAddress, AddressFunds)>, + } + + #[async_trait] + impl AddressProvider for GapExtendingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + // First call returns just A; once `on_address_found(A, …)` + // has flipped `extended`, subsequent calls also yield B. + // The recovery of B is what proves the loop ran more + // than once. + let initial = std::iter::once((10u32, self.a)); + let extended = self + .extended + .then(|| std::iter::once((11u32, self.b))) + .into_iter() + .flatten(); + initial.chain(extended) + } + + async fn on_address_found( + &mut self, + tag: Self::Tag, + address: &Self::Address, + funds: AddressFunds, + ) { + self.found.push((tag, *address, funds)); + // The hook that simulates HD-wallet gap extension: as + // soon as A is observed, expose A+1 as the next pending + // address. + if *address == self.a { + self.extended = true; + } + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let a = p2pkh(0xAA); + let b = p2pkh(0xBB); + + // Both A and B are post-snapshot — entry-time lookup is empty. + let lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + + let mut provider = GapExtendingProvider { + a, + b, + extended: false, + found: Vec::new(), + }; + let mut result: AddressSyncResult = AddressSyncResult::new(); + + // Buffer changes for both A and B as if `apply_block_changes` + // had already seen them and stashed them for end-of-pass replay. + let op_a = BlockAwareCreditOperation::SetCredits(1_111); + let op_b = BlockAwareCreditOperation::SetCredits(2_222); + let pending_unknown: Vec = vec![ + (a.to_bytes(), OwnedBalanceOp::Compacted(op_a), 0), + (b.to_bytes(), OwnedBalanceOp::Compacted(op_b), 0), + ]; + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + + // A surfaced on iteration 0, then `on_address_found(A,...)` + // flipped `extended = true`, so iteration 1 sees B and applies + // its balance too. + assert_eq!( + result.found.get(&(10u32, a)).map(|f| f.balance), + Some(1_111), + "A must be recovered by the first iteration" + ); + assert_eq!( + result.found.get(&(11u32, b)).map(|f| f.balance), + Some(2_222), + "B must be recovered by the bounded-iteration follow-up" + ); + assert!( + provider + .found + .iter() + .any(|(t, addr, f)| *t == 10 && *addr == a && f.balance == 1_111), + "on_address_found must fire for A" + ); + assert!( + provider + .found + .iter() + .any(|(t, addr, f)| *t == 11 && *addr == b && f.balance == 2_222), + "on_address_found must fire for B in the follow-on iteration" + ); + } }