diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index cdd1e9b1c5..88d134791d 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -836,11 +836,11 @@ async fn apply_block_changes<'a, P, I>( const REPLAY_REFRESH_MAX_ITERATIONS: usize = 3; /// End-of-pass recovery for addresses missing from the entry-time -/// snapshot. Re-polls `pending_addresses()`, builds a small `extras` map -/// of newly-derived addresses, and replays only the buffered changes -/// that match an `extras` entry. Foreign (other-wallet) addresses fall -/// out at the intersection check — no provider refresh storm, no log -/// flood. +/// snapshot. Re-polls `pending_addresses()` until a pass makes no +/// progress, each time building a small `extras` map of newly-derived +/// addresses and replaying only the buffered changes that now resolve. +/// Foreign (other-wallet) addresses fall out cheaply at the intersection +/// check — no provider refresh storm, no log flood. /// /// The refresh+replay is wrapped in a bounded loop so that /// `on_address_found` callbacks fired during replay can trigger gap @@ -858,65 +858,52 @@ async fn refresh_and_replay_unknown( return; } - // Build the set of unknown keys for a fast intersection probe. - let unknown_keys: std::collections::HashSet<&[u8]> = pending_unknown - .iter() - .map(|(key, _, _)| key.as_slice()) - .collect(); - - // Keys resolved across all iterations so we don't double-apply a - // delta if a follow-on iteration's `extras` still contains an - // already-replayed key. Owned bytes because the borrow checker won't - // let us keep `&[u8]` references into `pending_unknown` while we - // also borrow it for the inner loop. - let mut resolved_keys: std::collections::HashSet> = std::collections::HashSet::new(); - let mut total_replay_applied: usize = 0; + let initial_pending_count = pending_unknown.len(); + let mut remaining = pending_unknown; + let mut recovered = 0usize; let mut hit_iteration_cap = false; for iteration in 0..REPLAY_REFRESH_MAX_ITERATIONS { + // Build the set of still-unknown keys for a fast intersection + // probe. + let unknown_keys: std::collections::HashSet<&[u8]> = + remaining.iter().map(|(key, _, _)| key.as_slice()).collect(); + // Only addresses the provider can now produce AND that match a - // still-unresolved buffered miss are interesting — everything - // else is some other wallet's address and stays out of the - // lookup entirely. + // buffered miss are interesting — everything else is some other + // wallet's address and stays out of the lookup entirely. let mut extras: HashMap, (P::Tag, P::Address)> = HashMap::new(); for (tag, address) in provider.pending_addresses() { let bytes = address.to_bytes(); - if unknown_keys.contains(bytes.as_slice()) - && !resolved_keys.contains(&bytes) - && !key_to_tag.contains_key(&bytes) - { + if unknown_keys.contains(bytes.as_slice()) && !key_to_tag.contains_key(&bytes) { extras.insert(bytes, (tag, address)); } } if extras.is_empty() { if iteration == 0 { - // Common case on a populated multi-wallet chain: every - // buffered unknown belongs to another wallet. debug!( "Address sync: {} platform-reported balance change(s) reference \ address(es) not tracked by this wallet; ignoring", - pending_unknown.len() + initial_pending_count ); return; } - // No new addresses surfaced this iteration — we're done. break; } // Replay only the entries whose key actually resolves in - // `extras` and hasn't been resolved in a prior iteration. Order - // is preserved (compacted first, then recent — same as the - // forward pass), so `AddToCredits` deltas accumulate correctly. - // The catch-up cursor per change is preserved so the compacted - // height filter still sees the same `current_height` it would - // have seen on the forward pass. - let mut iteration_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); - for (key, change, height) in &pending_unknown { - if resolved_keys.contains(key) { - continue; - } + // `extras`. Order is preserved (compacted first, then recent — + // same as the forward pass), so `AddToCredits` deltas accumulate + // correctly. The catch-up cursor per change is preserved so the + // compacted height filter still sees the same `current_height` + // it would have seen on the forward pass. + let pass_len = remaining.len(); + let mut replay_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); + let mut still_pending: Vec = Vec::with_capacity(pass_len); + for (key, change, height) in remaining.into_iter() { let Some(&(tag, address)) = extras.get(key.as_slice()) else { + still_pending.push((key, change, height)); continue; }; let result_key = (tag, address); @@ -925,7 +912,7 @@ async fn refresh_and_replay_unknown( .get(&result_key) .map(|f| f.balance) .unwrap_or(0); - let new_balance = apply_op(borrow_op(change), current_balance, *height); + let new_balance = apply_op(borrow_op(&change), current_balance, height); if new_balance != current_balance { // TODO: same synthesized nonce=0 gap as the forward pass. @@ -936,35 +923,28 @@ async fn refresh_and_replay_unknown( }; result.absent.remove(&result_key); result.found.insert(result_key, funds); - iteration_applied.push((tag, address, funds)); + replay_applied.push((tag, address, funds)); } } - // Mark every key whose entry resolved in `extras` as resolved - // this pass — even if no balance moved — so the next iteration - // doesn't reconsider it. - for (key, _, _) in &pending_unknown { - if extras.contains_key(key.as_slice()) { - resolved_keys.insert(key.clone()); - } - } - - let iteration_resolved = iteration_applied.len(); - total_replay_applied += iteration_resolved; + let resolved_this_pass = pass_len - still_pending.len(); + recovered += resolved_this_pass; - // Fire callbacks for this iteration BEFORE the next refresh so - // that `on_address_found`-driven gap extension can expose the - // next batch of addresses to `pending_addresses()`. - for (tag, address, funds) in &iteration_applied { + for (tag, address, funds) in &replay_applied { provider.on_address_found(*tag, address, *funds).await; } - if iteration_resolved == 0 { - // `extras` was non-empty but every entry's delta was a - // no-op; nothing for gap extension to chew on. + if resolved_this_pass == 0 { + remaining = still_pending; break; } + if still_pending.is_empty() { + return; + } + + remaining = still_pending; + if iteration + 1 == REPLAY_REFRESH_MAX_ITERATIONS { hit_iteration_cap = true; } @@ -978,16 +958,13 @@ async fn refresh_and_replay_unknown( ); } - let still_unknown = pending_unknown - .iter() - .filter(|(key, _, _)| !resolved_keys.contains(key)) - .count(); - if still_unknown > 0 { + if !remaining.is_empty() { debug!( "Address sync: {} platform-reported balance change(s) reference \ address(es) not tracked by this wallet (refresh recovered {} \ other(s)); ignoring the untracked entries", - still_unknown, total_replay_applied + remaining.len(), + recovered ); } } @@ -1928,6 +1905,143 @@ mod tests { ); } + /// If replaying one recovered post-snapshot address extends the + /// provider gap window to expose the next pending address, the same + /// end-of-pass refresh must keep iterating until both buffered + /// changes are applied in order. + #[tokio::test] + async fn refresh_replays_chained_post_snapshot_addresses() { + use async_trait::async_trait; + + struct ChainedProvider { + first: PlatformAddress, + second: PlatformAddress, + visible: usize, + pending_polls: std::cell::Cell, + found: Vec<(u32, PlatformAddress, AddressFunds)>, + } + + #[async_trait] + impl AddressProvider for ChainedProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + self.pending_polls.set( + self.pending_polls + .get() + .checked_add(1) + .expect("test poll counter should not overflow"), + ); + let mut pending = Vec::with_capacity(self.visible); + if self.visible >= 1 { + pending.push((11u32, self.first)); + } + if self.visible >= 2 { + pending.push((12u32, self.second)); + } + pending.into_iter() + } + + async fn on_address_found( + &mut self, + tag: Self::Tag, + address: &Self::Address, + funds: AddressFunds, + ) { + self.found.push((tag, *address, funds)); + if *address == self.first && self.visible == 1 { + self.visible = 2; + } + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let first = p2pkh(0x31); + let second = p2pkh(0x32); + let lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + + let mut provider = ChainedProvider { + first, + second, + visible: 1, + pending_polls: std::cell::Cell::new(0), + found: Vec::new(), + }; + let mut result: AddressSyncResult = AddressSyncResult::new(); + let mut pending_unknown: Vec = Vec::new(); + + let first_op = BlockAwareCreditOperation::SetCredits(10_000); + let second_op = BlockAwareCreditOperation::SetCredits(20_000); + let changes = [ + (&first, BalanceOp::Compacted(&first_op)), + (&second, BalanceOp::Compacted(&second_op)), + ]; + + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + + assert_eq!(pending_unknown.len(), 2, "both misses must be buffered"); + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + + assert_eq!( + result.found.get(&(11u32, first)).map(|f| f.balance), + Some(10_000), + "first recovered address must apply in the first replay pass" + ); + assert_eq!( + result.found.get(&(12u32, second)).map(|f| f.balance), + Some(20_000), + "second chained address must apply in the same end-of-pass refresh" + ); + assert_eq!( + provider.found, + vec![ + ( + 11u32, + first, + AddressFunds { + nonce: 0, + balance: 10_000, + }, + ), + ( + 12u32, + second, + AddressFunds { + nonce: 0, + balance: 20_000, + }, + ), + ], + "replay callbacks must preserve buffered change order across chained discovery" + ); + assert_eq!( + provider.pending_polls.get(), + 2, + "refresh should re-poll once after the first recovered address exposes the next" + ); + } + /// A foreign address (not in the lookup, never produced by the /// provider) is silently ignored — no `on_address_found`, no /// `result.found` insert, no `result.absent` mutation, and exactly