From 35925f8e5e78d50a6473eea4f77186a9ab971b2f Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Fri, 15 May 2026 14:18:17 +0200 Subject: [PATCH 1/8] fix(rs-sdk): address-sync no longer silently discards balance changes for post-snapshot addresses (Found-025) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `incremental_catch_up` built its `key_to_tag` lookup once from a single pre-RPC `provider.pending_addresses()` snapshot and passed it by immutable reference into both apply loops. The `if let Some(..) = address_lookup.get(..)` predicate had no `else`, so any balance change the platform returned for an address derived *after* the snapshot was dropped with no log, metric, or error — `result.found` never got it and `on_address_found` was never called. Under concurrent multi-identity funding the derive-fund-sync interleave is routine, which is why e2e gates TK-001/007/013/014 and id_005 flaked here. Extract the two inline apply loops into a pure `pub(crate) apply_address_changes` seam (no Sdk, no network, no async) that returns applied updates plus the addresses absent from the snapshot. The new `apply_block_changes` re-polls `pending_addresses()` when an unknown address appears (mirroring the tree-scan refresh) and replays only the previously-unknown subset, so a fresh receive address is recovered and known-address `AddToCredits` deltas are never double-counted. An address still unknown after the refresh is logged at `warn` — observable, never silently dropped. Known-address behavior is byte-for-byte identical. Adds three deterministic `#[cfg(test)]` regression guards on the pure seam (no proof/Sdk needed): unknown-address surfacing, post-snapshot recovery through the refresh, and delta double-count safety. All three fail on the pre-fix silent-discard logic and pass post-fix. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../rs-sdk/src/platform/address_sync/mod.rs | 552 +++++++++++++++--- 1 file changed, 487 insertions(+), 65 deletions(-) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index b95fa877443..583f07fff54 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -60,7 +60,8 @@ use dapi_grpc::platform::v0::{ get_recent_compacted_address_balance_changes_request, GetAddressesBranchStateRequest, GetRecentAddressBalanceChangesRequest, GetRecentCompactedAddressBalanceChangesRequest, Proof, }; -use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation}; +use dpp::address_funds::PlatformAddress; +use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation, Credits}; use dpp::prelude::AddressNonce; use dpp::version::PlatformVersion; use drive::drive::{Drive, RootTree}; @@ -72,7 +73,7 @@ use rs_dapi_client::{ DapiRequest, ExecutionError, ExecutionResponse, InnerInto, IntoInner, RequestSettings, }; use std::collections::HashMap; -use tracing::{debug, info, trace}; +use tracing::{debug, info, trace, warn}; /// Server limit for compacted address balance changes per request. const COMPACTED_BATCH_LIMIT: usize = 25; @@ -458,8 +459,11 @@ async fn incremental_catch_up( settings: RequestSettings, ) -> Result<(), Error> { // `key_to_tag` is already keyed by raw GroveDB bytes with - // `(tag, address)` values, so it can serve as the lookup directly. - let address_lookup = key_to_tag; + // `(tag, address)` values. Found-025: take an owned, refreshable copy + // so a balance change for an address derived *after* the entry-time + // snapshot can still be resolved by re-polling `pending_addresses()` + // mid-pass (mirrors `after_branch_iteration`'s tree-scan refresh). + let mut address_lookup: HashMap, (P::Tag, P::Address)> = key_to_tag.clone(); let mut current_height = start_height; let mut observed_tip_height = start_height; @@ -614,39 +618,17 @@ async fn incremental_catch_up( result.metrics.compacted_entries_returned += entry_count; for entry in &entries { - for (platform_addr, credit_op) in &entry.changes { - let addr_bytes = platform_addr.to_bytes(); - if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { - let result_key = (tag, address); - let current_balance = result - .found - .get(&result_key) - .map(|f| f.balance) - .unwrap_or(0); - - let new_balance = match credit_op { - BlockAwareCreditOperation::SetCredits(credits) => *credits, - BlockAwareCreditOperation::AddToCreditsOperations(operations) => { - let total_to_add: u64 = operations - .iter() - .filter(|(height, _)| **height >= current_height) - .map(|(_, credits)| *credits) - .fold(0u64, |acc, c| acc.saturating_add(c)); - current_balance.saturating_add(total_to_add) - } - }; - - if new_balance != current_balance { - let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); - let funds = AddressFunds { - nonce, - balance: new_balance, - }; - result.found.insert(result_key, funds); - provider.on_address_found(tag, &address, funds).await; - } - } - } + apply_block_changes( + &mut address_lookup, + entry + .changes + .iter() + .map(|(a, op)| (a, AddressBalanceChange::Compacted(op))), + current_height, + provider, + result, + ) + .await; if entry.end_block_height.saturating_add(1) > current_height { current_height = entry.end_block_height.saturating_add(1); @@ -677,34 +659,17 @@ async fn incremental_catch_up( highest_recent_block = entry.block_height; } - for (platform_addr, credit_op) in &entry.changes { - let addr_bytes = platform_addr.to_bytes(); - if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { - let result_key = (tag, address); - let current_balance = result - .found - .get(&result_key) - .map(|f| f.balance) - .unwrap_or(0); - - let new_balance = match credit_op { - CreditOperation::SetCredits(credits) => *credits, - CreditOperation::AddToCredits(credits) => { - current_balance.saturating_add(*credits) - } - }; - - if new_balance != current_balance { - let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); - let funds = AddressFunds { - nonce, - balance: new_balance, - }; - result.found.insert(result_key, funds); - provider.on_address_found(tag, &address, funds).await; - } - } - } + apply_block_changes( + &mut address_lookup, + entry + .changes + .iter() + .map(|(a, op)| (a, AddressBalanceChange::Recent(op))), + current_height, + provider, + result, + ) + .await; if entry.block_height.saturating_add(1) > current_height { current_height = entry.block_height.saturating_add(1); @@ -723,6 +688,198 @@ async fn incremental_catch_up( Ok(()) } +// ── Pure changes-application seam (Found-025) ───────────────────────── + +/// A single address balance change, abstracting the recent (`CreditOperation`) +/// and compacted (`BlockAwareCreditOperation`) shapes so one pure function can +/// apply both phases identically. +#[derive(Clone, Copy)] +pub(crate) enum AddressBalanceChange<'a> { + /// A recent (per-block) credit operation. + Recent(&'a CreditOperation), + /// A compacted (block-range) credit operation. + Compacted(&'a BlockAwareCreditOperation), +} + +impl AddressBalanceChange<'_> { + /// Resolve the post-change balance given the address's current balance and + /// the catch-up cursor height. Mirrors the two original inline loops + /// exactly (compacted height-filtered sum vs. recent flat add). + fn new_balance(&self, current_balance: Credits, current_height: u64) -> Credits { + match self { + AddressBalanceChange::Recent(op) => match op { + CreditOperation::SetCredits(credits) => *credits, + CreditOperation::AddToCredits(credits) => current_balance.saturating_add(*credits), + }, + AddressBalanceChange::Compacted(op) => match op { + BlockAwareCreditOperation::SetCredits(credits) => *credits, + BlockAwareCreditOperation::AddToCreditsOperations(operations) => { + let total_to_add: u64 = operations + .iter() + .filter(|(height, _)| **height >= current_height) + .map(|(_, credits)| *credits) + .fold(0u64, |acc, c| acc.saturating_add(c)); + current_balance.saturating_add(total_to_add) + } + }, + } + } +} + +/// Outcome of applying one block's address balance changes. +/// +/// Carries the applied updates (so the caller can drive the async +/// `on_address_found` callback outside this pure function) and — the +/// Found-025 fix — the addresses the platform reported a change for but +/// that were absent from the lookup snapshot, so they are never silently +/// discarded. +pub(crate) struct AppliedAddressChanges { + /// `(tag, address, funds)` triples whose balance actually moved. + pub applied: Vec<(Tag, Address, AddressFunds)>, + /// Raw GroveDB key bytes the platform returned a change for but which + /// were not in `address_lookup` (post-snapshot / unregistered). + pub unknown: Vec>, +} + +/// Apply one block's worth of address balance changes against the lookup. +/// +/// Pure: no `Sdk`, no network, no async. Updates `result.found` for every +/// changed known address and returns the applied triples plus any unknown +/// addresses (Found-025: the unknown set makes a post-snapshot address +/// observable instead of silently dropped). +/// +/// `current_height` is the catch-up cursor used by the compacted height +/// filter; it is ignored for recent changes. +pub(crate) fn apply_address_changes<'a, Tag, Address, I>( + address_lookup: &HashMap, (Tag, Address)>, + changes: I, + current_height: u64, + result: &mut AddressSyncResult, +) -> AppliedAddressChanges +where + Tag: Copy + Ord, + Address: AddressToBytes, + I: IntoIterator)>, +{ + let mut applied = Vec::new(); + let mut unknown = Vec::new(); + + for (platform_addr, change) in changes { + let addr_bytes = platform_addr.to_bytes(); + if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { + let result_key = (tag, address); + let current_balance = result + .found + .get(&result_key) + .map(|f| f.balance) + .unwrap_or(0); + + let new_balance = change.new_balance(current_balance, current_height); + + if new_balance != current_balance { + let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); + let funds = AddressFunds { + nonce, + balance: new_balance, + }; + result.found.insert(result_key, funds); + applied.push((tag, address, funds)); + } + } else { + // Found-025: the platform returned a chain-confirmed balance + // change for an address absent from the pre-RPC snapshot. The + // old code dropped it silently (no else). Surface it so the + // caller can refresh the lookup and re-apply instead. + unknown.push(addr_bytes); + } + } + + AppliedAddressChanges { applied, unknown } +} + +/// Apply one block's changes, drive the provider's `on_address_found` +/// callbacks, and — Found-025 — recover addresses missing from the +/// snapshot by re-polling `pending_addresses()` and applying only the +/// previously-unknown changes. +/// +/// `changes` is collected once so the unknown subset can be replayed +/// after a refresh. Known addresses behave exactly as before: they are +/// applied in the first pass and excluded from the replay, so a delta +/// (`AddToCredits`) is never double-counted. An address the platform +/// reported but that is still unknown after the refresh is logged at +/// `warn` (observable, never silently dropped). +async fn apply_block_changes<'a, P, I>( + address_lookup: &mut HashMap, (P::Tag, P::Address)>, + changes: I, + current_height: u64, + provider: &mut P, + result: &mut AddressSyncResult, +) where + P: AddressProvider, + I: IntoIterator)>, +{ + let changes: Vec<(&PlatformAddress, AddressBalanceChange<'_>)> = changes.into_iter().collect(); + + let outcome = apply_address_changes( + address_lookup, + changes.iter().map(|(a, c)| (*a, *c)), + current_height, + result, + ); + for (tag, address, funds) in &outcome.applied { + provider.on_address_found(*tag, address, *funds).await; + } + + if outcome.unknown.is_empty() { + return; + } + + // Found-025: the platform returned chain-confirmed balance changes for + // addresses absent from the entry-time snapshot. Re-poll the provider + // (a fresh receive address may have been derived mid-pass), then apply + // ONLY the previously-unknown subset so already-applied known + // addresses are not re-processed (delta double-count safe). + let before = address_lookup.len(); + for (tag, address) in provider.pending_addresses() { + address_lookup + .entry(address.to_bytes()) + .or_insert((tag, address)); + } + + if address_lookup.len() == before { + warn!( + "Address sync: {} platform-reported balance change(s) reference address(es) \ + absent from the provider snapshot and the refresh found no new addresses; \ + they will be resolved on the next full sync (Found-025)", + outcome.unknown.len() + ); + return; + } + + let unknown: std::collections::HashSet<&[u8]> = + outcome.unknown.iter().map(|b| b.as_slice()).collect(); + let replay = apply_address_changes( + address_lookup, + changes + .iter() + .filter(|(a, _)| unknown.contains(a.to_bytes().as_slice())) + .map(|(a, c)| (*a, *c)), + current_height, + result, + ); + for (tag, address, funds) in &replay.applied { + provider.on_address_found(*tag, address, *funds).await; + } + if !replay.unknown.is_empty() { + warn!( + "Address sync: {} platform-reported balance change(s) still reference \ + address(es) absent from the provider snapshot after refresh; they \ + will be resolved on the next full sync (Found-025)", + replay.unknown.len() + ); + } +} + /// Extract the highest block height from the recent tree boundaries in the proof. /// /// Returns: @@ -1381,4 +1538,269 @@ mod tests { "expected balance conversion error, got: {err:?}" ); } + + // ── Found-025 regression guards ────────────────────────────────── + // + // Found-025: `incremental_catch_up` built `key_to_tag` once from a + // single pre-RPC `pending_addresses()` snapshot and the apply loops + // had no `else` on the lookup miss — a balance change the platform + // returned for an address derived *after* the snapshot was silently + // dropped (no log, no metric, `result.found` never got it). These + // tests pin the corrected behavior via the pure `pub(crate)` seam, + // no `Sdk`/proof/network needed. Pre-fix logic had no `unknown` + // channel and no provider refresh, so both tests below would fail + // on the old code (the post-snapshot address would never surface). + + use dpp::address_funds::PlatformAddress; + use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation}; + + fn p2pkh(byte: u8) -> PlatformAddress { + PlatformAddress::P2pkh([byte; 20]) + } + + /// Pure-seam guard: a balance change for an address absent from the + /// stale lookup is surfaced via `unknown` (never silently dropped), + /// while a known address still applies exactly as before. + #[test] + fn found_025_apply_address_changes_surfaces_unknown_address() { + let known = p2pkh(0xAA); + let post_snapshot = p2pkh(0xBB); + + // Stale snapshot: contains `known`, MISSING `post_snapshot` + // (derived after the snapshot was taken). + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(known.to_bytes(), (1u32, known)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + + let known_op = CreditOperation::SetCredits(5_000); + let post_op = CreditOperation::SetCredits(9_000); + let changes = [ + (&known, AddressBalanceChange::Recent(&known_op)), + (&post_snapshot, AddressBalanceChange::Recent(&post_op)), + ]; + + let outcome = apply_address_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut result, + ); + + // Known address: applied exactly as the old inline loop did. + assert_eq!( + result.found.get(&(1u32, known)).map(|f| f.balance), + Some(5_000), + "known address must still apply identically (no regression)" + ); + assert_eq!( + outcome.applied, + vec![( + 1u32, + known, + AddressFunds { + nonce: 0, + balance: 5_000 + } + )] + ); + + // Found-025: the post-snapshot address is NOT silently dropped — + // it is reported in `unknown`. Pre-fix there was no `unknown` + // channel at all, so this assertion could not even be written + // and the change vanished without a trace. + assert_eq!( + outcome.unknown, + vec![post_snapshot.to_bytes()], + "post-snapshot address must be observable, not silently discarded" + ); + assert!( + !result.found.contains_key(&(2u32, post_snapshot)), + "unresolved address must not be applied with a guessed tag" + ); + } + + /// End-to-end guard through `apply_block_changes`: a provider that + /// derives a fresh address mid-pass (so the entry-time lookup misses + /// it) gets the balance applied AND `on_address_found` fired after + /// the in-pass refresh. This is the exact Found-025 scenario. + #[tokio::test] + async fn found_025_apply_block_changes_recovers_post_snapshot_address() { + use async_trait::async_trait; + + struct GrowingProvider { + // The address was derived mid-pass — *after* the entry-time + // snapshot (the empty `lookup` passed in) but before the + // Found-025 refresh poll, so `pending_addresses()` yields it. + late: PlatformAddress, + found: Vec<(u32, PlatformAddress, AddressFunds)>, + } + + #[async_trait] + impl AddressProvider for GrowingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::once((7u32, self.late)) + } + + async fn on_address_found( + &mut self, + tag: Self::Tag, + address: &Self::Address, + funds: AddressFunds, + ) { + self.found.push((tag, *address, funds)); + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let late = p2pkh(0xCD); + + // Entry-time snapshot (built before the RPC) is empty — exactly + // the Found-025 race window: `late` was derived after this. + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + + let mut provider = GrowingProvider { + late, + found: Vec::new(), + }; + let mut result: AddressSyncResult = AddressSyncResult::new(); + + // The platform returns a chain-confirmed balance for `late`. + let op = BlockAwareCreditOperation::SetCredits(42_000); + let changes = [(&late, AddressBalanceChange::Compacted(&op))]; + + apply_block_changes( + &mut lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + ) + .await; + + // Pre-fix: `late` absent from the snapshot → silently dropped: + // `result.found` empty, `on_address_found` never called. + // Post-fix: the in-pass refresh picks `late` up and the change + // is applied and surfaced. + assert_eq!( + result.found.get(&(7u32, late)).map(|f| f.balance), + Some(42_000), + "post-snapshot address balance must be applied after refresh (Found-025)" + ); + assert!( + provider + .found + .iter() + .any(|(t, a, f)| *t == 7 && *a == late && f.balance == 42_000), + "on_address_found must fire for the recovered post-snapshot address" + ); + } + + /// The Found-025 refresh must not double-count a known address's + /// `AddToCredits` delta when it replays the unknown subset in the + /// same block (the replay must exclude already-applied addresses). + #[tokio::test] + async fn found_025_known_delta_not_double_counted_on_refresh() { + use async_trait::async_trait; + + let known = p2pkh(0x11); + let late = p2pkh(0x22); + + struct GrowingProvider { + late: PlatformAddress, + } + + #[async_trait] + impl AddressProvider for GrowingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::once((9u32, self.late)) + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + // `known` is in the snapshot with a starting balance; `late` is + // not (post-snapshot) and forces the refresh + replay path. + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(known.to_bytes(), (3u32, known)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + result.found.insert( + (3u32, known), + AddressFunds { + nonce: 0, + balance: 1_000, + }, + ); + + let mut provider = GrowingProvider { late }; + + // Same block: a +500 delta for the known address, and a set for + // the post-snapshot address (the latter triggers the replay). + let known_op = BlockAwareCreditOperation::AddToCreditsOperations( + std::iter::once((0u64, 500u64)).collect(), + ); + let late_op = BlockAwareCreditOperation::SetCredits(7_000); + let changes = [ + (&known, AddressBalanceChange::Compacted(&known_op)), + (&late, AddressBalanceChange::Compacted(&late_op)), + ]; + + apply_block_changes( + &mut lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + ) + .await; + + // Known delta applied exactly once: 1000 + 500 (NOT 1000 + 500 + + // 500). The replay must skip the already-applied known address. + assert_eq!( + result.found.get(&(3u32, known)).map(|f| f.balance), + Some(1_500), + "known AddToCredits delta must apply exactly once across refresh" + ); + assert_eq!( + result.found.get(&(9u32, late)).map(|f| f.balance), + Some(7_000), + "post-snapshot address still recovered after refresh" + ); + } } From aef52d9522d89cec2ea9e7eeff6db289253cf8a2 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 21 May 2026 11:14:13 +0200 Subject: [PATCH 2/8] fix(rs-sdk): keep address-sync found/absent disjoint on catch-up (CMT-001) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The tree scan can prove an address absent (`result.absent.insert(...)` at L171 / L223) and a later chain-confirmed balance change in the catch-up phase can legitimately re-discover that same address. The catch-up path only wrote to `result.found`, leaving the stale `absent` marker in place — the same `(tag, address)` then appeared in both sets and the final `AddressSyncResult` was internally inconsistent. Drop the stale `absent` entry before inserting into `found`. Symmetric direction (`found → absent`) doesn't exist: no `result.found.remove` call is reachable from any code path, and "absent" means proven-not-in- tree (not "balance is 0"), so a found-with-balance-0 stays in `found`. Adds two regression tests: one at the `apply_address_changes` seam and one at the `apply_block_changes` level, both asserting `found` and `absent` are disjoint after the catch-up rediscovers a previously-absent address. Addresses CodeRabbit https://github.com/dashpay/platform/pull/3650#discussion_r3264233458 🤖 Co-authored by [Claudius the Magnificent](https://github.com/lklimek/claudius) AI Agent --- .../rs-sdk/src/platform/address_sync/mod.rs | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index 583f07fff54..12d36747473 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -782,6 +782,11 @@ where nonce, balance: new_balance, }; + // CMT-001: an address proven absent by the tree scan + // may legitimately become found here on a chain-confirmed + // balance change. Drop the stale `absent` marker so + // `found` and `absent` stay disjoint. + result.absent.remove(&result_key); result.found.insert(result_key, funds); applied.push((tag, address, funds)); } @@ -1710,6 +1715,136 @@ mod tests { ); } + /// CMT-001 (coderabbitai): an address marked `absent` by the tree + /// scan and then re-discovered as `found` during the catch-up phase + /// must leave the two sets disjoint. Pre-fix the catch-up only + /// inserted into `found` and never pruned `absent`, so the same + /// `(tag, address)` could appear in both — internally inconsistent. + #[test] + fn cmt_001_catch_up_prunes_absent_when_address_is_rediscovered() { + let tag: u32 = 1; + let addr = p2pkh(0x42); + + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(addr.to_bytes(), (tag, addr)); + + // Tree scan proved the address absent (matches the path at L171 + // / L223 in this file). + let mut result: AddressSyncResult = AddressSyncResult::new(); + result.absent.insert((tag, addr)); + + // Catch-up: the platform reports a chain-confirmed balance for + // the same address that the tree scan had marked absent. + let op = CreditOperation::SetCredits(12_345); + let changes = [(&addr, AddressBalanceChange::Recent(&op))]; + + let outcome = apply_address_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut result, + ); + + assert_eq!( + outcome.applied, + vec![( + tag, + addr, + AddressFunds { + nonce: 0, + balance: 12_345, + } + )] + ); + assert_eq!( + result.found.get(&(tag, addr)).map(|f| f.balance), + Some(12_345), + "rediscovered address must be in `found`" + ); + assert!( + !result.absent.contains(&(tag, addr)), + "rediscovered address must be pruned from `absent` (CMT-001)" + ); + + // Stronger invariant: the two sets are globally disjoint. + for key in result.found.keys() { + assert!( + !result.absent.contains(key), + "found ∩ absent must be empty (CMT-001): {key:?} in both" + ); + } + } + + /// End-to-end CMT-001 guard via `apply_block_changes`: the same + /// invariant must hold after the full per-block apply path runs + /// (including the Found-025 refresh/replay branch). + #[tokio::test] + async fn cmt_001_apply_block_changes_keeps_found_and_absent_disjoint() { + use async_trait::async_trait; + + struct NoopProvider; + + #[async_trait] + impl AddressProvider for NoopProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::empty() + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let tag: u32 = 5; + let addr = p2pkh(0x99); + + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(addr.to_bytes(), (tag, addr)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + result.absent.insert((tag, addr)); + + let op = BlockAwareCreditOperation::SetCredits(7_777); + let changes = [(&addr, AddressBalanceChange::Compacted(&op))]; + + apply_block_changes( + &mut lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut NoopProvider, + &mut result, + ) + .await; + + assert_eq!( + result.found.get(&(tag, addr)).map(|f| f.balance), + Some(7_777), + ); + assert!( + !result.absent.contains(&(tag, addr)), + "apply_block_changes must keep found/absent disjoint (CMT-001)" + ); + } + /// The Found-025 refresh must not double-count a known address's /// `AddToCredits` delta when it replays the unknown subset in the /// same block (the replay must exclude already-applied addresses). From 84518fcb8e56d6cd2d19650dd9d7c4f6293305f5 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 21 May 2026 13:00:56 +0200 Subject: [PATCH 3/8] fix(rs-sdk): batch foreign-address refresh + drop hot-path allocations (CMT-001/CMT-004/CMT-005/CMT-006/CMT-007) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-fix `apply_block_changes` refreshed `provider.pending_addresses()` once per block and emitted a `warn!` claiming "addresses will be resolved on the next full sync (Found-025)" for every snapshot miss. `GetRecentAddressBalanceChanges{,Compacted}RequestV0` carry no per-wallet filter, so on a populated multi-wallet chain every other wallet's address tripped that branch — refresh storm + warn-log flood on the operator's box for legitimately-not-mine addresses. This rewrites the catch-up flow to do exactly one refresh per call: * `apply_block_changes` now takes a borrowed `&HashMap<...>` lookup and appends each miss to a caller-provided `Vec`. It still applies all known-address hits synchronously so the per-block cursor / delta semantics are preserved. * `incremental_catch_up` threads one `pending_unknown` buffer through both phases (compacted then recent) and calls a new `refresh_and_replay_unknown` exactly once at end-of-pass. * `refresh_and_replay_unknown` polls `pending_addresses()` once, keeps only those whose key actually matches a buffered miss (CMT-006: a precise intersection check replaces the imprecise `len() == before` proxy), and replays only those entries. Foreign addresses fall out at the intersection step — no allocation beyond the buffered misses themselves. * Foreign-address logging is demoted from `warn!` to `debug!` and the message no longer claims "next full sync will resolve it" (CMT-007 — no sync resolves an address the wallet never derived). * The unconditional `address_lookup = key_to_tag.clone()` is gone (CMT-005), as is the unconditional per-block `Vec` materialization (CMT-004) — only buffered misses allocate, and `key_to_tag` is shared read-only through both apply phases. Also adds a `TODO(CMT-002)` at the two nonce=0 synthesis sites (per-block apply + replay): the incremental RPCs carry no nonces, so addresses first surfaced through this path persist with `nonce=0` and clients must rely on `AddressInvalidNonceError.expected_nonce` to recover. Resolution wants either authoritative `AddressFunds` fetch for recovered addresses or modelling `nonce` as `Option` — deferred out of this PR (cross-cutting change). The pre-refactor `pub(crate) fn apply_address_changes` / `AppliedAddressChanges` seam is removed — the only callers were two unit tests with redundant invariants now covered by the end-to-end `apply_block_changes` + `refresh_and_replay_unknown` regression guards. Existing CMT-001 / Found-025 regression tests (recovers post-snapshot address, no double-counted delta, found ∩ absent disjoint) updated to call the new two-step API; all pass. CMT-003 regression guard (foreign addresses ignored, no refresh storm) follows in the next commit. Addresses thepastaclaw review at https://github.com/dashpay/platform/pull/3650#pullrequestreview-4335687567 🤖 Co-authored by [Claudius the Magnificent](https://github.com/lklimek/claudius) AI Agent --- .../rs-sdk/src/platform/address_sync/mod.rs | 469 ++++++++---------- 1 file changed, 199 insertions(+), 270 deletions(-) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index 12d36747473..ca0b7addbd3 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -73,7 +73,7 @@ use rs_dapi_client::{ DapiRequest, ExecutionError, ExecutionResponse, InnerInto, IntoInner, RequestSettings, }; use std::collections::HashMap; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, trace}; /// Server limit for compacted address balance changes per request. const COMPACTED_BATCH_LIMIT: usize = 25; @@ -458,12 +458,11 @@ async fn incremental_catch_up( result: &mut AddressSyncResult, settings: RequestSettings, ) -> Result<(), Error> { - // `key_to_tag` is already keyed by raw GroveDB bytes with - // `(tag, address)` values. Found-025: take an owned, refreshable copy - // so a balance change for an address derived *after* the entry-time - // snapshot can still be resolved by re-polling `pending_addresses()` - // mid-pass (mirrors `after_branch_iteration`'s tree-scan refresh). - let mut address_lookup: HashMap, (P::Tag, P::Address)> = key_to_tag.clone(); + // Use the borrowed `key_to_tag` directly through the pass — only + // unknown-address replay (rare, end-of-pass) materializes any extra + // allocation. Buffered misses are bounded by the count of foreign / + // post-snapshot addresses in the response. + let mut pending_unknown: Vec = Vec::new(); let mut current_height = start_height; let mut observed_tip_height = start_height; @@ -619,7 +618,7 @@ async fn incremental_catch_up( for entry in &entries { apply_block_changes( - &mut address_lookup, + key_to_tag, entry .changes .iter() @@ -627,6 +626,7 @@ async fn incremental_catch_up( current_height, provider, result, + &mut pending_unknown, ) .await; @@ -660,7 +660,7 @@ async fn incremental_catch_up( } apply_block_changes( - &mut address_lookup, + key_to_tag, entry .changes .iter() @@ -668,6 +668,7 @@ async fn incremental_catch_up( current_height, provider, result, + &mut pending_unknown, ) .await; @@ -677,6 +678,11 @@ async fn incremental_catch_up( } } + // Single end-of-pass Found-025 recovery (CMT-001/CMT-006/CMT-007): + // foreign-wallet addresses fall out cheaply at the extras-intersection + // check, so no refresh storm and no warn-log flood on multi-wallet chains. + refresh_and_replay_unknown(key_to_tag, pending_unknown, provider, result).await; + result.new_sync_height = current_height.max(observed_tip_height); // Store the highest block from the recent entries so the next sync can // use RangeAfter(this_height) for compaction detection. @@ -724,45 +730,69 @@ impl AddressBalanceChange<'_> { }, } } + + /// Owned snapshot of the change for end-of-pass replay. Cheap for + /// `Recent` (the inner op is `Copy`); clones the operations vector for + /// `Compacted`. Only called for unknown addresses. + fn into_owned(self) -> OwnedAddressBalanceChange { + match self { + AddressBalanceChange::Recent(op) => OwnedAddressBalanceChange::Recent(*op), + AddressBalanceChange::Compacted(op) => OwnedAddressBalanceChange::Compacted(op.clone()), + } + } } -/// Outcome of applying one block's address balance changes. -/// -/// Carries the applied updates (so the caller can drive the async -/// `on_address_found` callback outside this pure function) and — the -/// Found-025 fix — the addresses the platform reported a change for but -/// that were absent from the lookup snapshot, so they are never silently -/// discarded. -pub(crate) struct AppliedAddressChanges { - /// `(tag, address, funds)` triples whose balance actually moved. - pub applied: Vec<(Tag, Address, AddressFunds)>, - /// Raw GroveDB key bytes the platform returned a change for but which - /// were not in `address_lookup` (post-snapshot / unregistered). - pub unknown: Vec>, +/// Owned counterpart of [`AddressBalanceChange`] so unknown-address changes +/// can outlive the per-block iterator and be replayed at end-of-pass. +#[derive(Clone)] +pub(crate) enum OwnedAddressBalanceChange { + Recent(CreditOperation), + Compacted(BlockAwareCreditOperation), } -/// Apply one block's worth of address balance changes against the lookup. -/// -/// Pure: no `Sdk`, no network, no async. Updates `result.found` for every -/// changed known address and returns the applied triples plus any unknown -/// addresses (Found-025: the unknown set makes a post-snapshot address -/// observable instead of silently dropped). +impl OwnedAddressBalanceChange { + fn as_borrowed(&self) -> AddressBalanceChange<'_> { + match self { + OwnedAddressBalanceChange::Recent(op) => AddressBalanceChange::Recent(op), + OwnedAddressBalanceChange::Compacted(op) => AddressBalanceChange::Compacted(op), + } + } +} + +/// A single change for an address that wasn't in the entry-time snapshot. +/// Buffered across the catch-up pass and replayed once at the end after a +/// single `pending_addresses()` refresh (Found-025, CMT-001). +pub(crate) struct PendingUnknownChange { + /// Raw GroveDB key bytes — joined against the refreshed lookup. + key: Vec, + /// Owned change so the underlying response entries can be dropped. + change: OwnedAddressBalanceChange, + /// Catch-up cursor at the time of the original block — feeds the + /// compacted height filter on replay. Ignored by `Recent`. + current_height: u64, +} + +/// Apply one block's changes against the borrowed entry-time lookup, drive +/// `on_address_found` for every known address whose balance moved, and +/// append unknown-address changes to `pending_unknown` for a single +/// end-of-pass refresh + replay (CMT-001). /// -/// `current_height` is the catch-up cursor used by the compacted height -/// filter; it is ignored for recent changes. -pub(crate) fn apply_address_changes<'a, Tag, Address, I>( - address_lookup: &HashMap, (Tag, Address)>, +/// Pre-refactor this function refreshed the provider per-block; on +/// populated multi-wallet chains that meant a refresh storm + warn-log +/// flood, because every other wallet's address looked "unknown". The +/// refresh now runs exactly once at the end of `incremental_catch_up`. +async fn apply_block_changes<'a, P, I>( + address_lookup: &HashMap, (P::Tag, P::Address)>, changes: I, current_height: u64, - result: &mut AddressSyncResult, -) -> AppliedAddressChanges -where - Tag: Copy + Ord, - Address: AddressToBytes, + provider: &mut P, + result: &mut AddressSyncResult, + pending_unknown: &mut Vec, +) where + P: AddressProvider, I: IntoIterator)>, { - let mut applied = Vec::new(); - let mut unknown = Vec::new(); + let mut local_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); for (platform_addr, change) in changes { let addr_bytes = platform_addr.to_bytes(); @@ -777,110 +807,121 @@ where let new_balance = change.new_balance(current_balance, current_height); if new_balance != current_balance { + // TODO(CMT-002): synthesized nonce=0 (see same TODO in + // `apply_address_changes` for full rationale). let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); let funds = AddressFunds { nonce, balance: new_balance, }; - // CMT-001: an address proven absent by the tree scan - // may legitimately become found here on a chain-confirmed - // balance change. Drop the stale `absent` marker so - // `found` and `absent` stay disjoint. result.absent.remove(&result_key); result.found.insert(result_key, funds); - applied.push((tag, address, funds)); + local_applied.push((tag, address, funds)); } } else { - // Found-025: the platform returned a chain-confirmed balance - // change for an address absent from the pre-RPC snapshot. The - // old code dropped it silently (no else). Surface it so the - // caller can refresh the lookup and re-apply instead. - unknown.push(addr_bytes); + pending_unknown.push(PendingUnknownChange { + key: addr_bytes, + change: change.into_owned(), + current_height, + }); } } - AppliedAddressChanges { applied, unknown } + for (tag, address, funds) in &local_applied { + provider.on_address_found(*tag, address, *funds).await; + } } -/// Apply one block's changes, drive the provider's `on_address_found` -/// callbacks, and — Found-025 — recover addresses missing from the -/// snapshot by re-polling `pending_addresses()` and applying only the -/// previously-unknown changes. -/// -/// `changes` is collected once so the unknown subset can be replayed -/// after a refresh. Known addresses behave exactly as before: they are -/// applied in the first pass and excluded from the replay, so a delta -/// (`AddToCredits`) is never double-counted. An address the platform -/// reported but that is still unknown after the refresh is logged at -/// `warn` (observable, never silently dropped). -async fn apply_block_changes<'a, P, I>( - address_lookup: &mut HashMap, (P::Tag, P::Address)>, - changes: I, - current_height: u64, +/// End-of-pass Found-025 recovery. Re-polls `pending_addresses()` exactly +/// once, builds a small `extras` map of newly-derived addresses, and +/// replays only the buffered changes that match an extras entry. Foreign +/// (other-wallet) addresses fall out cheaply at the intersection check — +/// no refresh storm, no warn-log flood (CMT-001/CMT-006/CMT-007). +async fn refresh_and_replay_unknown( + key_to_tag: &HashMap, (P::Tag, P::Address)>, + pending_unknown: Vec, provider: &mut P, result: &mut AddressSyncResult, -) where - P: AddressProvider, - I: IntoIterator)>, -{ - let changes: Vec<(&PlatformAddress, AddressBalanceChange<'_>)> = changes.into_iter().collect(); - - let outcome = apply_address_changes( - address_lookup, - changes.iter().map(|(a, c)| (*a, *c)), - current_height, - result, - ); - for (tag, address, funds) in &outcome.applied { - provider.on_address_found(*tag, address, *funds).await; - } - - if outcome.unknown.is_empty() { +) { + if pending_unknown.is_empty() { return; } - // Found-025: the platform returned chain-confirmed balance changes for - // addresses absent from the entry-time snapshot. Re-poll the provider - // (a fresh receive address may have been derived mid-pass), then apply - // ONLY the previously-unknown subset so already-applied known - // addresses are not re-processed (delta double-count safe). - let before = address_lookup.len(); + // Build the set of unknown keys for a fast intersection probe. + let unknown_keys: std::collections::HashSet<&[u8]> = + pending_unknown.iter().map(|p| p.key.as_slice()).collect(); + + // Only addresses the provider can now produce AND that match a + // buffered miss are interesting — everything else is some other + // wallet's address and stays out of the lookup entirely. + let mut extras: HashMap, (P::Tag, P::Address)> = HashMap::new(); for (tag, address) in provider.pending_addresses() { - address_lookup - .entry(address.to_bytes()) - .or_insert((tag, address)); + let bytes = address.to_bytes(); + if unknown_keys.contains(bytes.as_slice()) && !key_to_tag.contains_key(&bytes) { + extras.insert(bytes, (tag, address)); + } } - if address_lookup.len() == before { - warn!( - "Address sync: {} platform-reported balance change(s) reference address(es) \ - absent from the provider snapshot and the refresh found no new addresses; \ - they will be resolved on the next full sync (Found-025)", - outcome.unknown.len() + if extras.is_empty() { + // Common case on a populated multi-wallet chain: every buffered + // unknown belongs to another wallet. Demoted to `debug` so it + // does not flood operator logs (CMT-007). + debug!( + "Address sync: {} platform-reported balance change(s) reference \ + address(es) not tracked by this wallet; ignoring (Found-025)", + pending_unknown.len() ); return; } - let unknown: std::collections::HashSet<&[u8]> = - outcome.unknown.iter().map(|b| b.as_slice()).collect(); - let replay = apply_address_changes( - address_lookup, - changes - .iter() - .filter(|(a, _)| unknown.contains(a.to_bytes().as_slice())) - .map(|(a, c)| (*a, *c)), - current_height, - result, - ); - for (tag, address, funds) in &replay.applied { + // Replay only the entries whose key actually resolves in `extras`. + // Order is preserved (compacted first, then recent — same as the + // forward pass), so `AddToCredits` deltas accumulate correctly. The + // catch-up cursor per change is preserved so the compacted height + // filter still sees the same `current_height` it would have seen on + // the forward pass. + let mut replay_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); + let mut still_unknown: usize = 0; + for pending in &pending_unknown { + let Some(&(tag, address)) = extras.get(pending.key.as_slice()) else { + still_unknown += 1; + continue; + }; + let result_key = (tag, address); + let current_balance = result + .found + .get(&result_key) + .map(|f| f.balance) + .unwrap_or(0); + let new_balance = pending + .change + .as_borrowed() + .new_balance(current_balance, pending.current_height); + + if new_balance != current_balance { + // TODO(CMT-002): same synthesized nonce=0 gap as above. + let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); + let funds = AddressFunds { + nonce, + balance: new_balance, + }; + result.absent.remove(&result_key); + result.found.insert(result_key, funds); + replay_applied.push((tag, address, funds)); + } + } + + for (tag, address, funds) in &replay_applied { provider.on_address_found(*tag, address, *funds).await; } - if !replay.unknown.is_empty() { - warn!( - "Address sync: {} platform-reported balance change(s) still reference \ - address(es) absent from the provider snapshot after refresh; they \ - will be resolved on the next full sync (Found-025)", - replay.unknown.len() + + if still_unknown > 0 { + debug!( + "Address sync: {} platform-reported balance change(s) reference \ + address(es) not tracked by this wallet (refresh recovered {} \ + other(s)); ignoring the untracked entries (Found-025)", + still_unknown, + replay_applied.len() ); } } @@ -1546,97 +1587,28 @@ mod tests { // ── Found-025 regression guards ────────────────────────────────── // - // Found-025: `incremental_catch_up` built `key_to_tag` once from a - // single pre-RPC `pending_addresses()` snapshot and the apply loops - // had no `else` on the lookup miss — a balance change the platform - // returned for an address derived *after* the snapshot was silently - // dropped (no log, no metric, `result.found` never got it). These - // tests pin the corrected behavior via the pure `pub(crate)` seam, - // no `Sdk`/proof/network needed. Pre-fix logic had no `unknown` - // channel and no provider refresh, so both tests below would fail - // on the old code (the post-snapshot address would never surface). + // Found-025: `incremental_catch_up` originally had no `else` on the + // lookup miss — a balance change the platform returned for an address + // derived *after* the entry-time snapshot was silently dropped. The + // guards below pin the corrected end-of-pass refresh + replay shape. use dpp::address_funds::PlatformAddress; - use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation}; + use dpp::balances::credits::BlockAwareCreditOperation; fn p2pkh(byte: u8) -> PlatformAddress { PlatformAddress::P2pkh([byte; 20]) } - /// Pure-seam guard: a balance change for an address absent from the - /// stale lookup is surfaced via `unknown` (never silently dropped), - /// while a known address still applies exactly as before. - #[test] - fn found_025_apply_address_changes_surfaces_unknown_address() { - let known = p2pkh(0xAA); - let post_snapshot = p2pkh(0xBB); - - // Stale snapshot: contains `known`, MISSING `post_snapshot` - // (derived after the snapshot was taken). - let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); - lookup.insert(known.to_bytes(), (1u32, known)); - - let mut result: AddressSyncResult = AddressSyncResult::new(); - - let known_op = CreditOperation::SetCredits(5_000); - let post_op = CreditOperation::SetCredits(9_000); - let changes = [ - (&known, AddressBalanceChange::Recent(&known_op)), - (&post_snapshot, AddressBalanceChange::Recent(&post_op)), - ]; - - let outcome = apply_address_changes( - &lookup, - changes.iter().map(|(a, c)| (*a, *c)), - 0, - &mut result, - ); - - // Known address: applied exactly as the old inline loop did. - assert_eq!( - result.found.get(&(1u32, known)).map(|f| f.balance), - Some(5_000), - "known address must still apply identically (no regression)" - ); - assert_eq!( - outcome.applied, - vec![( - 1u32, - known, - AddressFunds { - nonce: 0, - balance: 5_000 - } - )] - ); - - // Found-025: the post-snapshot address is NOT silently dropped — - // it is reported in `unknown`. Pre-fix there was no `unknown` - // channel at all, so this assertion could not even be written - // and the change vanished without a trace. - assert_eq!( - outcome.unknown, - vec![post_snapshot.to_bytes()], - "post-snapshot address must be observable, not silently discarded" - ); - assert!( - !result.found.contains_key(&(2u32, post_snapshot)), - "unresolved address must not be applied with a guessed tag" - ); - } - - /// End-to-end guard through `apply_block_changes`: a provider that - /// derives a fresh address mid-pass (so the entry-time lookup misses - /// it) gets the balance applied AND `on_address_found` fired after - /// the in-pass refresh. This is the exact Found-025 scenario. + /// End-to-end guard: a provider that derives a fresh address mid-pass + /// (so the entry-time lookup misses it) gets the balance applied AND + /// `on_address_found` fired after the end-of-pass Found-025 refresh. + /// Mirrors `incremental_catch_up`: per-block apply buffers misses, + /// `refresh_and_replay_unknown` runs once at the end. #[tokio::test] async fn found_025_apply_block_changes_recovers_post_snapshot_address() { use async_trait::async_trait; struct GrowingProvider { - // The address was derived mid-pass — *after* the entry-time - // snapshot (the empty `lookup` passed in) but before the - // Found-025 refresh poll, so `pending_addresses()` yields it. late: PlatformAddress, found: Vec<(u32, PlatformAddress, AddressFunds)>, } @@ -1674,33 +1646,38 @@ mod tests { let late = p2pkh(0xCD); - // Entry-time snapshot (built before the RPC) is empty — exactly - // the Found-025 race window: `late` was derived after this. - let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + let lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); let mut provider = GrowingProvider { late, found: Vec::new(), }; let mut result: AddressSyncResult = AddressSyncResult::new(); + let mut pending_unknown: Vec = Vec::new(); - // The platform returns a chain-confirmed balance for `late`. let op = BlockAwareCreditOperation::SetCredits(42_000); let changes = [(&late, AddressBalanceChange::Compacted(&op))]; apply_block_changes( - &mut lookup, + &lookup, changes.iter().map(|(a, c)| (*a, *c)), 0, &mut provider, &mut result, + &mut pending_unknown, ) .await; - // Pre-fix: `late` absent from the snapshot → silently dropped: - // `result.found` empty, `on_address_found` never called. - // Post-fix: the in-pass refresh picks `late` up and the change - // is applied and surfaced. + // Per-block apply must NOT touch the provider for unknowns — + // the refresh is deferred to end-of-pass (CMT-001). + assert!( + provider.found.is_empty(), + "no on_address_found before end-of-pass refresh" + ); + assert_eq!(pending_unknown.len(), 1, "miss is buffered for replay"); + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + assert_eq!( result.found.get(&(7u32, late)).map(|f| f.balance), Some(42_000), @@ -1715,69 +1692,8 @@ mod tests { ); } - /// CMT-001 (coderabbitai): an address marked `absent` by the tree - /// scan and then re-discovered as `found` during the catch-up phase - /// must leave the two sets disjoint. Pre-fix the catch-up only - /// inserted into `found` and never pruned `absent`, so the same - /// `(tag, address)` could appear in both — internally inconsistent. - #[test] - fn cmt_001_catch_up_prunes_absent_when_address_is_rediscovered() { - let tag: u32 = 1; - let addr = p2pkh(0x42); - - let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); - lookup.insert(addr.to_bytes(), (tag, addr)); - - // Tree scan proved the address absent (matches the path at L171 - // / L223 in this file). - let mut result: AddressSyncResult = AddressSyncResult::new(); - result.absent.insert((tag, addr)); - - // Catch-up: the platform reports a chain-confirmed balance for - // the same address that the tree scan had marked absent. - let op = CreditOperation::SetCredits(12_345); - let changes = [(&addr, AddressBalanceChange::Recent(&op))]; - - let outcome = apply_address_changes( - &lookup, - changes.iter().map(|(a, c)| (*a, *c)), - 0, - &mut result, - ); - - assert_eq!( - outcome.applied, - vec![( - tag, - addr, - AddressFunds { - nonce: 0, - balance: 12_345, - } - )] - ); - assert_eq!( - result.found.get(&(tag, addr)).map(|f| f.balance), - Some(12_345), - "rediscovered address must be in `found`" - ); - assert!( - !result.absent.contains(&(tag, addr)), - "rediscovered address must be pruned from `absent` (CMT-001)" - ); - - // Stronger invariant: the two sets are globally disjoint. - for key in result.found.keys() { - assert!( - !result.absent.contains(key), - "found ∩ absent must be empty (CMT-001): {key:?} in both" - ); - } - } - /// End-to-end CMT-001 guard via `apply_block_changes`: the same - /// invariant must hold after the full per-block apply path runs - /// (including the Found-025 refresh/replay branch). + /// invariant must hold after the full per-block apply path runs. #[tokio::test] async fn cmt_001_apply_block_changes_keeps_found_and_absent_disjoint() { use async_trait::async_trait; @@ -1826,12 +1742,14 @@ mod tests { let op = BlockAwareCreditOperation::SetCredits(7_777); let changes = [(&addr, AddressBalanceChange::Compacted(&op))]; + let mut pending_unknown: Vec = Vec::new(); apply_block_changes( - &mut lookup, + &lookup, changes.iter().map(|(a, c)| (*a, *c)), 0, &mut NoopProvider, &mut result, + &mut pending_unknown, ) .await; @@ -1843,6 +1761,10 @@ mod tests { !result.absent.contains(&(tag, addr)), "apply_block_changes must keep found/absent disjoint (CMT-001)" ); + assert!( + pending_unknown.is_empty(), + "no unknowns expected for a known address" + ); } /// The Found-025 refresh must not double-count a known address's @@ -1905,8 +1827,6 @@ mod tests { let mut provider = GrowingProvider { late }; - // Same block: a +500 delta for the known address, and a set for - // the post-snapshot address (the latter triggers the replay). let known_op = BlockAwareCreditOperation::AddToCreditsOperations( std::iter::once((0u64, 500u64)).collect(), ); @@ -1916,17 +1836,26 @@ mod tests { (&late, AddressBalanceChange::Compacted(&late_op)), ]; + let mut pending_unknown: Vec = Vec::new(); apply_block_changes( - &mut lookup, + &lookup, changes.iter().map(|(a, c)| (*a, *c)), 0, &mut provider, &mut result, + &mut pending_unknown, ) .await; + // Known address was applied immediately (no longer waits on the + // end-of-pass refresh). `late` is buffered for replay. + assert_eq!(pending_unknown.len(), 1); + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; // Known delta applied exactly once: 1000 + 500 (NOT 1000 + 500 + - // 500). The replay must skip the already-applied known address. + // 500). The replay must skip the already-applied known address — + // here that is guaranteed structurally because the replay only + // walks the buffered misses, not the full change set. assert_eq!( result.found.get(&(3u32, known)).map(|f| f.balance), Some(1_500), From 9eede09565d23cbc2dea456bb403797d3cf0618d Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 21 May 2026 13:01:27 +0200 Subject: [PATCH 4/8] test(rs-sdk): regression guard for foreign-address handling in apply_block_changes (CMT-003) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pins the CMT-001 fix from the previous commit at the test surface: four "blocks" worth of changes — one for a known wallet address, three for foreign-wallet addresses — must result in (a) zero per-block `pending_addresses()` polls (the refresh is end-of-pass only), (b) exactly one `pending_addresses()` poll from `refresh_and_replay_unknown`, (c) no foreign address in `result.found`, (d) no foreign address in `result.absent`, (e) `on_address_found` fired exactly once (for the known address), and (f) `found` and `absent` still globally disjoint. Pre-refactor this test would have failed on the poll count (per-block refresh storm) and would have stuffed each foreign address through the `unknown` channel + warn-log path. Now it pins the corrected shape on the dominant production case (multi-wallet shared chain). Addresses thepastaclaw https://github.com/dashpay/platform/pull/3650#pullrequestreview-4335687567 🤖 Co-authored by [Claudius the Magnificent](https://github.com/lklimek/claudius) AI Agent --- .../rs-sdk/src/platform/address_sync/mod.rs | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index ca0b7addbd3..4a46d370eac 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -1867,4 +1867,151 @@ mod tests { "post-snapshot address still recovered after refresh" ); } + + // ── CMT-003 regression guards ──────────────────────────────────── + // + // CMT-003 (thepastaclaw): a foreign-wallet address (one the provider + // has never derived and never will) must NOT trigger a refresh storm, + // a warn-log flood, or any insertion into `result.found`. The + // pre-refactor code refreshed per-block and logged at `warn` for + // every cross-wallet address — these guards pin the corrected + // end-of-pass behavior. + + /// A foreign address (not in the lookup, never produced by the + /// provider) is silently ignored — no `on_address_found`, no + /// `result.found` insert, no `result.absent` mutation. + #[tokio::test] + async fn cmt_003_foreign_address_is_ignored_without_refresh_storm() { + use async_trait::async_trait; + + struct CountingNoopProvider { + pending_polls: std::sync::atomic::AtomicUsize, + found_calls: usize, + } + + #[async_trait] + impl AddressProvider for CountingNoopProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + self.pending_polls + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + std::iter::empty() + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + self.found_calls += 1; + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let mine = p2pkh(0x01); + let foreign_1 = p2pkh(0xF1); + let foreign_2 = p2pkh(0xF2); + let foreign_3 = p2pkh(0xF3); + + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(mine.to_bytes(), (1u32, mine)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + let mut provider = CountingNoopProvider { + pending_polls: std::sync::atomic::AtomicUsize::new(0), + found_calls: 0, + }; + let mut pending_unknown: Vec = Vec::new(); + + // Three separate "blocks" (representing the per-entry calls + // inside `incremental_catch_up`), every change but the first + // belongs to another wallet. + for (addr, credits) in [ + (&mine, 1_000), + (&foreign_1, 5_000), + (&foreign_2, 5_000), + (&foreign_3, 5_000), + ] { + let op = BlockAwareCreditOperation::SetCredits(credits); + let changes = [(addr, AddressBalanceChange::Compacted(&op))]; + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + } + + // CMT-001 contract: per-block apply must NEVER refresh the + // provider — the refresh runs once, at end of pass. + assert_eq!( + provider + .pending_polls + .load(std::sync::atomic::Ordering::Relaxed), + 0, + "no per-block pending_addresses() polls — refresh is end-of-pass only" + ); + + // The end-of-pass refresh runs exactly once. + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + assert_eq!( + provider + .pending_polls + .load(std::sync::atomic::Ordering::Relaxed), + 1, + "end-of-pass refresh must poll the provider exactly once" + ); + + // Foreign addresses must not surface as `found` or fire callbacks. + assert_eq!( + result.found.len(), + 1, + "only the known address is in `found` (foreign addresses ignored)" + ); + assert_eq!( + result.found.get(&(1u32, mine)).map(|f| f.balance), + Some(1_000), + "known address applied" + ); + assert!( + !result + .found + .keys() + .any(|(_, a)| *a == foreign_1 || *a == foreign_2 || *a == foreign_3), + "no foreign address may be inserted into `result.found`" + ); + assert!( + result.absent.is_empty(), + "foreign addresses must not be marked `absent` either" + ); + assert_eq!( + provider.found_calls, 1, + "on_address_found fires only for the known address" + ); + + // Found ∩ Absent stays disjoint (CMT-001 invariant). + for key in result.found.keys() { + assert!( + !result.absent.contains(key), + "found ∩ absent must be empty (CMT-001): {key:?} in both" + ); + } + } } From 64c19797f08299eb592b284ee882113bd209495c Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 21 May 2026 13:12:15 +0200 Subject: [PATCH 5/8] refactor(rs-sdk): drop internal triage symbols from address-sync code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `CMT-NNN` and `Found-025` were Claudius-internal review tracking labels, not public issue identifiers. External readers won't recognize them, and per project conventions code should describe present-state behaviour directly — the legitimate place for review-thread references is the commit history, not the source. This pass renames test functions, rewrites inline / doc comments, drops the labels from `debug!` log strings, and de-prefixes the two `TODO(CMT-002)` markers (the technical content is preserved; the ticket label is gone). Test names now describe what they assert: * `found_025_apply_block_changes_recovers_post_snapshot_address` → `apply_block_changes_recovers_post_snapshot_address` * `cmt_001_apply_block_changes_keeps_found_and_absent_disjoint` → `apply_block_changes_keeps_found_and_absent_disjoint_on_catch_up` * `found_025_known_delta_not_double_counted_on_refresh` → `refresh_does_not_double_count_known_address_delta` * `cmt_003_foreign_address_is_ignored_without_refresh_storm` → `apply_block_changes_ignores_foreign_address_without_refresh_storm` Verified by `grep -rnE 'CMT-[0-9]+|Found-025|Found025|found_025|cmt_[0-9]+' packages/rs-sdk/src/` — 28 hits before, 0 after. 🤖 Co-authored by [Claudius the Magnificent](https://github.com/lklimek/claudius) AI Agent --- .../rs-sdk/src/platform/address_sync/mod.rs | 102 ++++++++---------- 1 file changed, 44 insertions(+), 58 deletions(-) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index 4a46d370eac..cd5ec73dc58 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -678,9 +678,9 @@ async fn incremental_catch_up( } } - // Single end-of-pass Found-025 recovery (CMT-001/CMT-006/CMT-007): - // foreign-wallet addresses fall out cheaply at the extras-intersection - // check, so no refresh storm and no warn-log flood on multi-wallet chains. + // Single end-of-pass recovery: foreign-wallet addresses fall out at + // the extras-intersection check, so no per-block refresh and no log + // flood on multi-wallet chains. refresh_and_replay_unknown(key_to_tag, pending_unknown, provider, result).await; result.new_sync_height = current_height.max(observed_tip_height); @@ -694,7 +694,7 @@ async fn incremental_catch_up( Ok(()) } -// ── Pure changes-application seam (Found-025) ───────────────────────── +// ── Address-balance change application ──────────────────────────────── /// A single address balance change, abstracting the recent (`CreditOperation`) /// and compacted (`BlockAwareCreditOperation`) shapes so one pure function can @@ -761,7 +761,7 @@ impl OwnedAddressBalanceChange { /// A single change for an address that wasn't in the entry-time snapshot. /// Buffered across the catch-up pass and replayed once at the end after a -/// single `pending_addresses()` refresh (Found-025, CMT-001). +/// single `pending_addresses()` refresh. pub(crate) struct PendingUnknownChange { /// Raw GroveDB key bytes — joined against the refreshed lookup. key: Vec, @@ -775,12 +775,9 @@ pub(crate) struct PendingUnknownChange { /// Apply one block's changes against the borrowed entry-time lookup, drive /// `on_address_found` for every known address whose balance moved, and /// append unknown-address changes to `pending_unknown` for a single -/// end-of-pass refresh + replay (CMT-001). -/// -/// Pre-refactor this function refreshed the provider per-block; on -/// populated multi-wallet chains that meant a refresh storm + warn-log -/// flood, because every other wallet's address looked "unknown". The -/// refresh now runs exactly once at the end of `incremental_catch_up`. +/// end-of-pass refresh + replay. The refresh is deliberately deferred so +/// foreign-wallet addresses on a shared chain do not trigger a per-block +/// provider poll. async fn apply_block_changes<'a, P, I>( address_lookup: &HashMap, (P::Tag, P::Address)>, changes: I, @@ -807,8 +804,11 @@ async fn apply_block_changes<'a, P, I>( let new_balance = change.new_balance(current_balance, current_height); if new_balance != current_balance { - // TODO(CMT-002): synthesized nonce=0 (see same TODO in - // `apply_address_changes` for full rationale). + // TODO: incremental RPCs carry only balance deltas, never + // nonces — addresses first seen here get nonce=0. Clients + // recover via `AddressInvalidNonceError.expected_nonce`; + // a proper fix would fetch authoritative `AddressFunds` + // or model `nonce` as `Option`. let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); let funds = AddressFunds { nonce, @@ -832,11 +832,12 @@ async fn apply_block_changes<'a, P, I>( } } -/// End-of-pass Found-025 recovery. Re-polls `pending_addresses()` exactly -/// once, builds a small `extras` map of newly-derived addresses, and -/// replays only the buffered changes that match an extras entry. Foreign -/// (other-wallet) addresses fall out cheaply at the intersection check — -/// no refresh storm, no warn-log flood (CMT-001/CMT-006/CMT-007). +/// End-of-pass recovery for addresses missing from the entry-time +/// snapshot. Re-polls `pending_addresses()` exactly once, builds a small +/// `extras` map of newly-derived addresses, and replays only the buffered +/// changes that match an `extras` entry. Foreign (other-wallet) addresses +/// fall out at the intersection check — no provider refresh storm, no +/// log flood. async fn refresh_and_replay_unknown( key_to_tag: &HashMap, (P::Tag, P::Address)>, pending_unknown: Vec, @@ -864,11 +865,10 @@ async fn refresh_and_replay_unknown( if extras.is_empty() { // Common case on a populated multi-wallet chain: every buffered - // unknown belongs to another wallet. Demoted to `debug` so it - // does not flood operator logs (CMT-007). + // unknown belongs to another wallet. debug!( "Address sync: {} platform-reported balance change(s) reference \ - address(es) not tracked by this wallet; ignoring (Found-025)", + address(es) not tracked by this wallet; ignoring", pending_unknown.len() ); return; @@ -899,7 +899,7 @@ async fn refresh_and_replay_unknown( .new_balance(current_balance, pending.current_height); if new_balance != current_balance { - // TODO(CMT-002): same synthesized nonce=0 gap as above. + // TODO: same synthesized nonce=0 gap as the forward pass. let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); let funds = AddressFunds { nonce, @@ -919,7 +919,7 @@ async fn refresh_and_replay_unknown( debug!( "Address sync: {} platform-reported balance change(s) reference \ address(es) not tracked by this wallet (refresh recovered {} \ - other(s)); ignoring the untracked entries (Found-025)", + other(s)); ignoring the untracked entries", still_unknown, replay_applied.len() ); @@ -1585,12 +1585,7 @@ mod tests { ); } - // ── Found-025 regression guards ────────────────────────────────── - // - // Found-025: `incremental_catch_up` originally had no `else` on the - // lookup miss — a balance change the platform returned for an address - // derived *after* the entry-time snapshot was silently dropped. The - // guards below pin the corrected end-of-pass refresh + replay shape. + // ── End-of-pass refresh + replay regression guards ───────────────── use dpp::address_funds::PlatformAddress; use dpp::balances::credits::BlockAwareCreditOperation; @@ -1599,13 +1594,11 @@ mod tests { PlatformAddress::P2pkh([byte; 20]) } - /// End-to-end guard: a provider that derives a fresh address mid-pass - /// (so the entry-time lookup misses it) gets the balance applied AND - /// `on_address_found` fired after the end-of-pass Found-025 refresh. - /// Mirrors `incremental_catch_up`: per-block apply buffers misses, - /// `refresh_and_replay_unknown` runs once at the end. + /// A provider that derives a fresh address mid-pass — so the + /// entry-time lookup misses it — gets the balance applied AND + /// `on_address_found` fired after the end-of-pass refresh. #[tokio::test] - async fn found_025_apply_block_changes_recovers_post_snapshot_address() { + async fn apply_block_changes_recovers_post_snapshot_address() { use async_trait::async_trait; struct GrowingProvider { @@ -1669,7 +1662,7 @@ mod tests { .await; // Per-block apply must NOT touch the provider for unknowns — - // the refresh is deferred to end-of-pass (CMT-001). + // the refresh is deferred to end-of-pass. assert!( provider.found.is_empty(), "no on_address_found before end-of-pass refresh" @@ -1681,7 +1674,7 @@ mod tests { assert_eq!( result.found.get(&(7u32, late)).map(|f| f.balance), Some(42_000), - "post-snapshot address balance must be applied after refresh (Found-025)" + "post-snapshot address balance must be applied after refresh" ); assert!( provider @@ -1692,10 +1685,11 @@ mod tests { ); } - /// End-to-end CMT-001 guard via `apply_block_changes`: the same - /// invariant must hold after the full per-block apply path runs. + /// A known address proven absent by the tree scan but re-discovered + /// by an incremental change is moved into `found` and pruned from + /// `absent`, keeping the two sets disjoint. #[tokio::test] - async fn cmt_001_apply_block_changes_keeps_found_and_absent_disjoint() { + async fn apply_block_changes_keeps_found_and_absent_disjoint_on_catch_up() { use async_trait::async_trait; struct NoopProvider; @@ -1759,7 +1753,7 @@ mod tests { ); assert!( !result.absent.contains(&(tag, addr)), - "apply_block_changes must keep found/absent disjoint (CMT-001)" + "apply_block_changes must keep found/absent disjoint" ); assert!( pending_unknown.is_empty(), @@ -1767,11 +1761,11 @@ mod tests { ); } - /// The Found-025 refresh must not double-count a known address's + /// The end-of-pass refresh must not double-count a known address's /// `AddToCredits` delta when it replays the unknown subset in the /// same block (the replay must exclude already-applied addresses). #[tokio::test] - async fn found_025_known_delta_not_double_counted_on_refresh() { + async fn refresh_does_not_double_count_known_address_delta() { use async_trait::async_trait; let known = p2pkh(0x11); @@ -1868,20 +1862,12 @@ mod tests { ); } - // ── CMT-003 regression guards ──────────────────────────────────── - // - // CMT-003 (thepastaclaw): a foreign-wallet address (one the provider - // has never derived and never will) must NOT trigger a refresh storm, - // a warn-log flood, or any insertion into `result.found`. The - // pre-refactor code refreshed per-block and logged at `warn` for - // every cross-wallet address — these guards pin the corrected - // end-of-pass behavior. - /// A foreign address (not in the lookup, never produced by the /// provider) is silently ignored — no `on_address_found`, no - /// `result.found` insert, no `result.absent` mutation. + /// `result.found` insert, no `result.absent` mutation, and exactly + /// one provider refresh for the whole pass. #[tokio::test] - async fn cmt_003_foreign_address_is_ignored_without_refresh_storm() { + async fn apply_block_changes_ignores_foreign_address_without_refresh_storm() { use async_trait::async_trait; struct CountingNoopProvider { @@ -1959,8 +1945,8 @@ mod tests { .await; } - // CMT-001 contract: per-block apply must NEVER refresh the - // provider — the refresh runs once, at end of pass. + // Per-block apply must NEVER refresh the provider — the refresh + // runs once, at end of pass. assert_eq!( provider .pending_polls @@ -2006,11 +1992,11 @@ mod tests { "on_address_found fires only for the known address" ); - // Found ∩ Absent stays disjoint (CMT-001 invariant). + // `found` and `absent` stay disjoint. for key in result.found.keys() { assert!( !result.absent.contains(key), - "found ∩ absent must be empty (CMT-001): {key:?} in both" + "found ∩ absent must be empty: {key:?} in both" ); } } From a1e456fc250ea406cf20b1fac36ce20f4b7c9a9d Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 25 May 2026 14:52:53 +0200 Subject: [PATCH 6/8] perf(sdk): warn when pending_unknown buffer exceeds 1000 records (PR #3650 CMT-003) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Crikey, foreign-wallet changes on a busy shared chain can pile up in the end-of-pass replay buffer without anyone noticing. We're not chasing premature optimization here — just dropping a one-shot tracing::warn the first time the buffer crosses 1000 entries in a pass, so a future operator can actually see whether this path needs the reviewer's mitigation (a) treatment (keys-only buffer + re-derive on refresh). Additive only — no logic changes, no behavioural shift, no new test. Co-Authored-By: Claude Opus 4.6 --- .../rs-sdk/src/platform/address_sync/mod.rs | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index cd5ec73dc58..048c6e371fb 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -73,7 +73,11 @@ use rs_dapi_client::{ DapiRequest, ExecutionError, ExecutionResponse, InnerInto, IntoInner, RequestSettings, }; use std::collections::HashMap; -use tracing::{debug, info, trace}; +use tracing::{debug, info, trace, warn}; + +/// One-shot warning threshold for the end-of-pass replay buffer +/// (`pending_unknown`). Cross-checked at the push site below. +const PENDING_UNKNOWN_WARN_THRESHOLD: usize = 1000; /// Server limit for compacted address balance changes per request. const COMPACTED_BATCH_LIMIT: usize = 25; @@ -824,6 +828,20 @@ async fn apply_block_changes<'a, P, I>( change: change.into_owned(), current_height, }); + // NOTE: this buffer is intentionally unbounded — premature optimization here + // would couple the catch-up loop to ad-hoc memory heuristics. We log a + // one-shot warning above a generous threshold so a future operator can + // observe whether this path actually exceeds 1000 buffered foreign-wallet + // changes in real workloads; if it does, the right fix is to follow the + // reviewer's mitigation (a) — store only Vec keys and re-derive replay + // changes after the refresh resolves them. See PR #3650 @thepastaclaw review. + if pending_unknown.len() == PENDING_UNKNOWN_WARN_THRESHOLD { + warn!( + "Address sync: pending_unknown buffer reached {} entries — \ + foreign-wallet balance changes are accumulating on a shared chain", + PENDING_UNKNOWN_WARN_THRESHOLD + ); + } } } From 875a55d8fb0ca0c9ec879bc00b66c44b87e578b0 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Mon, 25 May 2026 14:56:05 +0200 Subject: [PATCH 7/8] fix(sdk): bounded-iteration replay catches same-pass gap-extension addresses (PR #3650 CMT-001) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streuth — the single-pass refresh was leaving freshly-gap-extended addresses on the cutting room floor. If `pending_unknown` buffered both A and A+1, and the provider only exposed A+1 *after* `on_address_found(A, …)` triggered gap extension, the previous implementation polled `pending_addresses()` once, missed A+1, and silently dropped its buffered balance until the next sync. Wrap the refresh+replay in a bounded for-loop (cap 3 iterations, debug! when the cap fires). Each iteration: 1. Re-reads `pending_addresses()`, builds `extras` from the still-unresolved intersection. 2. Replays only matching unresolved buffered entries. 3. Fires `on_address_found` callbacks BEFORE the next refresh so gap extension actually has a chance to surface follow-on addresses to the next poll. 4. Tracks resolved keys so subsequent iterations skip already-applied entries (no double-counted deltas). Adds a regression test (`refresh_loops_until_gap_extension_recovers_follow_on_address`) that pins the gap-extension scenario with a mock provider whose pending set grows from `[A]` to `[A, B]` mid-replay; the test asserts both A and B land in `result.found` with `on_address_found` firing for each. Co-Authored-By: Claude Opus 4.6 --- .../rs-sdk/src/platform/address_sync/mod.rs | 319 ++++++++++++++---- 1 file changed, 258 insertions(+), 61 deletions(-) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index 048c6e371fb..99552ce9681 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -850,12 +850,25 @@ async fn apply_block_changes<'a, P, I>( } } +/// Maximum number of refresh+replay rounds. The loop iterates so a +/// `pending_addresses()` set that grows via `on_address_found`-triggered +/// gap extension can be picked up in the same pass; the cap guards +/// against any pathological livelock. +const REPLAY_REFRESH_MAX_ITERATIONS: usize = 3; + /// End-of-pass recovery for addresses missing from the entry-time -/// snapshot. Re-polls `pending_addresses()` exactly once, builds a small -/// `extras` map of newly-derived addresses, and replays only the buffered -/// changes that match an `extras` entry. Foreign (other-wallet) addresses -/// fall out at the intersection check — no provider refresh storm, no -/// log flood. +/// snapshot. Re-polls `pending_addresses()`, builds a small `extras` map +/// of newly-derived addresses, and replays only the buffered changes +/// that match an `extras` entry. Foreign (other-wallet) addresses fall +/// out at the intersection check — no provider refresh storm, no log +/// flood. +/// +/// The refresh+replay is wrapped in a bounded loop so that +/// `on_address_found` callbacks fired during replay can trigger gap +/// extension on the provider and surface follow-on addresses +/// (e.g. address `A+1` that the provider only exposes after seeing `A` +/// was used). Iteration stops as soon as no new addresses are resolved +/// or the cap [`REPLAY_REFRESH_MAX_ITERATIONS`] is reached. async fn refresh_and_replay_unknown( key_to_tag: &HashMap, (P::Tag, P::Address)>, pending_unknown: Vec, @@ -870,76 +883,133 @@ async fn refresh_and_replay_unknown( let unknown_keys: std::collections::HashSet<&[u8]> = pending_unknown.iter().map(|p| p.key.as_slice()).collect(); - // Only addresses the provider can now produce AND that match a - // buffered miss are interesting — everything else is some other - // wallet's address and stays out of the lookup entirely. - let mut extras: HashMap, (P::Tag, P::Address)> = HashMap::new(); - for (tag, address) in provider.pending_addresses() { - let bytes = address.to_bytes(); - if unknown_keys.contains(bytes.as_slice()) && !key_to_tag.contains_key(&bytes) { - extras.insert(bytes, (tag, address)); + // Keys resolved across all iterations so we don't double-apply a + // delta if a follow-on iteration's `extras` still contains an + // already-replayed key. Owned bytes because the borrow checker won't + // let us keep `&[u8]` references into `pending_unknown` while we + // also borrow it for the inner loop. + let mut resolved_keys: std::collections::HashSet> = std::collections::HashSet::new(); + let mut total_replay_applied: usize = 0; + let mut hit_iteration_cap = false; + + for iteration in 0..REPLAY_REFRESH_MAX_ITERATIONS { + // Only addresses the provider can now produce AND that match a + // still-unresolved buffered miss are interesting — everything + // else is some other wallet's address and stays out of the + // lookup entirely. + let mut extras: HashMap, (P::Tag, P::Address)> = HashMap::new(); + for (tag, address) in provider.pending_addresses() { + let bytes = address.to_bytes(); + if unknown_keys.contains(bytes.as_slice()) + && !resolved_keys.contains(&bytes) + && !key_to_tag.contains_key(&bytes) + { + extras.insert(bytes, (tag, address)); + } } - } - if extras.is_empty() { - // Common case on a populated multi-wallet chain: every buffered - // unknown belongs to another wallet. - debug!( - "Address sync: {} platform-reported balance change(s) reference \ - address(es) not tracked by this wallet; ignoring", - pending_unknown.len() - ); - return; - } + if extras.is_empty() { + if iteration == 0 { + // Common case on a populated multi-wallet chain: every + // buffered unknown belongs to another wallet. + debug!( + "Address sync: {} platform-reported balance change(s) reference \ + address(es) not tracked by this wallet; ignoring", + pending_unknown.len() + ); + return; + } + // No new addresses surfaced this iteration — we're done. + break; + } - // Replay only the entries whose key actually resolves in `extras`. - // Order is preserved (compacted first, then recent — same as the - // forward pass), so `AddToCredits` deltas accumulate correctly. The - // catch-up cursor per change is preserved so the compacted height - // filter still sees the same `current_height` it would have seen on - // the forward pass. - let mut replay_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); - let mut still_unknown: usize = 0; - for pending in &pending_unknown { - let Some(&(tag, address)) = extras.get(pending.key.as_slice()) else { - still_unknown += 1; - continue; - }; - let result_key = (tag, address); - let current_balance = result - .found - .get(&result_key) - .map(|f| f.balance) - .unwrap_or(0); - let new_balance = pending - .change - .as_borrowed() - .new_balance(current_balance, pending.current_height); - - if new_balance != current_balance { - // TODO: same synthesized nonce=0 gap as the forward pass. - let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); - let funds = AddressFunds { - nonce, - balance: new_balance, + // Replay only the entries whose key actually resolves in + // `extras` and hasn't been resolved in a prior iteration. Order + // is preserved (compacted first, then recent — same as the + // forward pass), so `AddToCredits` deltas accumulate correctly. + // The catch-up cursor per change is preserved so the compacted + // height filter still sees the same `current_height` it would + // have seen on the forward pass. + let mut iteration_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); + for pending in &pending_unknown { + if resolved_keys.contains(&pending.key) { + continue; + } + let Some(&(tag, address)) = extras.get(pending.key.as_slice()) else { + continue; }; - result.absent.remove(&result_key); - result.found.insert(result_key, funds); - replay_applied.push((tag, address, funds)); + let result_key = (tag, address); + let current_balance = result + .found + .get(&result_key) + .map(|f| f.balance) + .unwrap_or(0); + let new_balance = pending + .change + .as_borrowed() + .new_balance(current_balance, pending.current_height); + + if new_balance != current_balance { + // TODO: same synthesized nonce=0 gap as the forward pass. + let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); + let funds = AddressFunds { + nonce, + balance: new_balance, + }; + result.absent.remove(&result_key); + result.found.insert(result_key, funds); + iteration_applied.push((tag, address, funds)); + } + } + + // Mark every key whose entry resolved in `extras` as resolved + // this pass — even if no balance moved — so the next iteration + // doesn't reconsider it. + for pending in &pending_unknown { + if extras.contains_key(pending.key.as_slice()) { + resolved_keys.insert(pending.key.clone()); + } + } + + let iteration_resolved = iteration_applied.len(); + total_replay_applied += iteration_resolved; + + // Fire callbacks for this iteration BEFORE the next refresh so + // that `on_address_found`-driven gap extension can expose the + // next batch of addresses to `pending_addresses()`. + for (tag, address, funds) in &iteration_applied { + provider.on_address_found(*tag, address, *funds).await; + } + + if iteration_resolved == 0 { + // `extras` was non-empty but every entry's delta was a + // no-op; nothing for gap extension to chew on. + break; + } + + if iteration + 1 == REPLAY_REFRESH_MAX_ITERATIONS { + hit_iteration_cap = true; } } - for (tag, address, funds) in &replay_applied { - provider.on_address_found(*tag, address, *funds).await; + if hit_iteration_cap { + debug!( + "Address sync: refresh+replay reached the {}-iteration cap; \ + any further gap-extension addresses will surface on the next sync", + REPLAY_REFRESH_MAX_ITERATIONS + ); } + let still_unknown = pending_unknown + .iter() + .filter(|p| !resolved_keys.contains(&p.key)) + .count(); if still_unknown > 0 { debug!( "Address sync: {} platform-reported balance change(s) reference \ address(es) not tracked by this wallet (refresh recovered {} \ other(s)); ignoring the untracked entries", - still_unknown, - replay_applied.len() + still_unknown, total_replay_applied ); } } @@ -2018,4 +2088,131 @@ mod tests { ); } } + + /// Two post-snapshot addresses A and A+1 where the provider only + /// exposes A initially and extends its gap to include A+1 from + /// inside `on_address_found(A, ...)`. The bounded-iteration replay + /// must pick up A+1 in a follow-on iteration instead of leaving + /// its buffered change silently dropped until the next sync. + #[tokio::test] + async fn refresh_loops_until_gap_extension_recovers_follow_on_address() { + use async_trait::async_trait; + + struct GapExtendingProvider { + a: PlatformAddress, + b: PlatformAddress, + // false until `on_address_found(a, ...)` mutates it — then + // `pending_addresses()` returns both A and B. + extended: bool, + found: Vec<(u32, PlatformAddress, AddressFunds)>, + } + + #[async_trait] + impl AddressProvider for GapExtendingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + // First call returns just A; once `on_address_found(A, …)` + // has flipped `extended`, subsequent calls also yield B. + // The recovery of B is what proves the loop ran more + // than once. + let initial = std::iter::once((10u32, self.a)); + let extended = self + .extended + .then(|| std::iter::once((11u32, self.b))) + .into_iter() + .flatten(); + initial.chain(extended) + } + + async fn on_address_found( + &mut self, + tag: Self::Tag, + address: &Self::Address, + funds: AddressFunds, + ) { + self.found.push((tag, *address, funds)); + // The hook that simulates HD-wallet gap extension: as + // soon as A is observed, expose A+1 as the next pending + // address. + if *address == self.a { + self.extended = true; + } + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let a = p2pkh(0xAA); + let b = p2pkh(0xBB); + + // Both A and B are post-snapshot — entry-time lookup is empty. + let lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + + let mut provider = GapExtendingProvider { + a, + b, + extended: false, + found: Vec::new(), + }; + let mut result: AddressSyncResult = AddressSyncResult::new(); + + // Buffer changes for both A and B as if `apply_block_changes` + // had already seen them and stashed them for end-of-pass replay. + let op_a = BlockAwareCreditOperation::SetCredits(1_111); + let op_b = BlockAwareCreditOperation::SetCredits(2_222); + let pending_unknown = vec![ + PendingUnknownChange { + key: a.to_bytes(), + change: OwnedAddressBalanceChange::Compacted(op_a), + current_height: 0, + }, + PendingUnknownChange { + key: b.to_bytes(), + change: OwnedAddressBalanceChange::Compacted(op_b), + current_height: 0, + }, + ]; + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + + // A surfaced on iteration 0, then `on_address_found(A,...)` + // flipped `extended = true`, so iteration 1 sees B and applies + // its balance too. + assert_eq!( + result.found.get(&(10u32, a)).map(|f| f.balance), + Some(1_111), + "A must be recovered by the first iteration" + ); + assert_eq!( + result.found.get(&(11u32, b)).map(|f| f.balance), + Some(2_222), + "B must be recovered by the bounded-iteration follow-up" + ); + assert!( + provider + .found + .iter() + .any(|(t, addr, f)| *t == 10 && *addr == a && f.balance == 1_111), + "on_address_found must fire for A" + ); + assert!( + provider + .found + .iter() + .any(|(t, addr, f)| *t == 11 && *addr == b && f.balance == 2_222), + "on_address_found must fire for B in the follow-on iteration" + ); + } } From 000eeba8f57a6523b2aa7dd27d0d350e1cfd5c9b Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Fri, 29 May 2026 12:35:14 +0200 Subject: [PATCH 8/8] refactor(sdk): collapse address-sync change types from 3 structs to 1 enum + alias Pure refactor of the Found-025 change-application seam in address_sync. The three named types carried no behavior the borrow split actually required two distinct named types for: - Rename AddressBalanceChange<'a> -> BalanceOp<'a> (the genuine 2-variant borrowed sum type; still Clone+Copy, still threads the hot apply path clone-free). - Move new_balance() off the enum into free fn apply_op(); the compacted height-filtered sum and recent flat add/set are preserved byte-for-byte. - Replace OwnedAddressBalanceChange + its into_owned()/as_borrowed() methods with a minimal OwnedBalanceOp enum (owned arity needed only to buffer a miss past its response-entry borrow), an inline clone at the cold buffer site, and a free borrow_op() for replay. - Replace struct PendingUnknownChange with `type PendingMiss = (Vec, OwnedBalanceOp, u64)` and positional access. Net: zero hand-written structs for this concept (2 enums + 1 alias + 2 free fns). Hot apply path keeps zero new clones; the only clone stays on the cold miss buffer path, exactly where into_owned() cloned before. All Found-025 + CMT-001 tests pass unchanged in logic (constructor syntax only). cargo check / clippy --all-features --tests clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../rs-sdk/src/platform/address_sync/mod.rs | 178 ++++++++---------- 1 file changed, 74 insertions(+), 104 deletions(-) diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index 99552ce9681..cdd1e9b1c5d 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -466,7 +466,7 @@ async fn incremental_catch_up( // unknown-address replay (rare, end-of-pass) materializes any extra // allocation. Buffered misses are bounded by the count of foreign / // post-snapshot addresses in the response. - let mut pending_unknown: Vec = Vec::new(); + let mut pending_unknown: Vec = Vec::new(); let mut current_height = start_height; let mut observed_tip_height = start_height; @@ -626,7 +626,7 @@ async fn incremental_catch_up( entry .changes .iter() - .map(|(a, op)| (a, AddressBalanceChange::Compacted(op))), + .map(|(a, op)| (a, BalanceOp::Compacted(op))), current_height, provider, result, @@ -668,7 +668,7 @@ async fn incremental_catch_up( entry .changes .iter() - .map(|(a, op)| (a, AddressBalanceChange::Recent(op))), + .map(|(a, op)| (a, BalanceOp::Recent(op))), current_height, provider, result, @@ -700,80 +700,59 @@ async fn incremental_catch_up( // ── Address-balance change application ──────────────────────────────── -/// A single address balance change, abstracting the recent (`CreditOperation`) -/// and compacted (`BlockAwareCreditOperation`) shapes so one pure function can -/// apply both phases identically. +/// A single borrowed address balance change, abstracting the recent +/// (`CreditOperation`) and compacted (`BlockAwareCreditOperation`) shapes so one +/// pure function can apply both phases identically. #[derive(Clone, Copy)] -pub(crate) enum AddressBalanceChange<'a> { +pub(crate) enum BalanceOp<'a> { /// A recent (per-block) credit operation. Recent(&'a CreditOperation), /// A compacted (block-range) credit operation. Compacted(&'a BlockAwareCreditOperation), } -impl AddressBalanceChange<'_> { - /// Resolve the post-change balance given the address's current balance and - /// the catch-up cursor height. Mirrors the two original inline loops - /// exactly (compacted height-filtered sum vs. recent flat add). - fn new_balance(&self, current_balance: Credits, current_height: u64) -> Credits { - match self { - AddressBalanceChange::Recent(op) => match op { - CreditOperation::SetCredits(credits) => *credits, - CreditOperation::AddToCredits(credits) => current_balance.saturating_add(*credits), - }, - AddressBalanceChange::Compacted(op) => match op { - BlockAwareCreditOperation::SetCredits(credits) => *credits, - BlockAwareCreditOperation::AddToCreditsOperations(operations) => { - let total_to_add: u64 = operations - .iter() - .filter(|(height, _)| **height >= current_height) - .map(|(_, credits)| *credits) - .fold(0u64, |acc, c| acc.saturating_add(c)); - current_balance.saturating_add(total_to_add) - } - }, - } - } - - /// Owned snapshot of the change for end-of-pass replay. Cheap for - /// `Recent` (the inner op is `Copy`); clones the operations vector for - /// `Compacted`. Only called for unknown addresses. - fn into_owned(self) -> OwnedAddressBalanceChange { - match self { - AddressBalanceChange::Recent(op) => OwnedAddressBalanceChange::Recent(*op), - AddressBalanceChange::Compacted(op) => OwnedAddressBalanceChange::Compacted(op.clone()), - } - } -} - -/// Owned counterpart of [`AddressBalanceChange`] so unknown-address changes -/// can outlive the per-block iterator and be replayed at end-of-pass. +/// Owned arity of [`BalanceOp`], used only to buffer a miss past the borrow of +/// its response entry so it can be replayed at end-of-pass. #[derive(Clone)] -pub(crate) enum OwnedAddressBalanceChange { +pub(crate) enum OwnedBalanceOp { Recent(CreditOperation), Compacted(BlockAwareCreditOperation), } -impl OwnedAddressBalanceChange { - fn as_borrowed(&self) -> AddressBalanceChange<'_> { - match self { - OwnedAddressBalanceChange::Recent(op) => AddressBalanceChange::Recent(op), - OwnedAddressBalanceChange::Compacted(op) => AddressBalanceChange::Compacted(op), - } +/// A buffered miss: raw GroveDB key, owned change, and the catch-up cursor +/// height at the original block (feeds the compacted height filter on replay; +/// ignored by `Recent`). +type PendingMiss = (Vec, OwnedBalanceOp, u64); + +/// Resolve the post-change balance from the current balance and the catch-up +/// cursor height. Compacted sums only operations at or after `current_height`; +/// recent applies a flat set/add. +fn apply_op(op: BalanceOp<'_>, current_balance: Credits, current_height: u64) -> Credits { + match op { + BalanceOp::Recent(op) => match op { + CreditOperation::SetCredits(credits) => *credits, + CreditOperation::AddToCredits(credits) => current_balance.saturating_add(*credits), + }, + BalanceOp::Compacted(op) => match op { + BlockAwareCreditOperation::SetCredits(credits) => *credits, + BlockAwareCreditOperation::AddToCreditsOperations(operations) => { + let total_to_add: u64 = operations + .iter() + .filter(|(height, _)| **height >= current_height) + .map(|(_, credits)| *credits) + .fold(0u64, |acc, c| acc.saturating_add(c)); + current_balance.saturating_add(total_to_add) + } + }, } } -/// A single change for an address that wasn't in the entry-time snapshot. -/// Buffered across the catch-up pass and replayed once at the end after a -/// single `pending_addresses()` refresh. -pub(crate) struct PendingUnknownChange { - /// Raw GroveDB key bytes — joined against the refreshed lookup. - key: Vec, - /// Owned change so the underlying response entries can be dropped. - change: OwnedAddressBalanceChange, - /// Catch-up cursor at the time of the original block — feeds the - /// compacted height filter on replay. Ignored by `Recent`. - current_height: u64, +/// Borrow an owned op back into [`BalanceOp`] for replay. +fn borrow_op(op: &OwnedBalanceOp) -> BalanceOp<'_> { + match op { + OwnedBalanceOp::Recent(op) => BalanceOp::Recent(op), + OwnedBalanceOp::Compacted(op) => BalanceOp::Compacted(op), + } } /// Apply one block's changes against the borrowed entry-time lookup, drive @@ -788,10 +767,10 @@ async fn apply_block_changes<'a, P, I>( current_height: u64, provider: &mut P, result: &mut AddressSyncResult, - pending_unknown: &mut Vec, + pending_unknown: &mut Vec, ) where P: AddressProvider, - I: IntoIterator)>, + I: IntoIterator)>, { let mut local_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); @@ -805,7 +784,7 @@ async fn apply_block_changes<'a, P, I>( .map(|f| f.balance) .unwrap_or(0); - let new_balance = change.new_balance(current_balance, current_height); + let new_balance = apply_op(change, current_balance, current_height); if new_balance != current_balance { // TODO: incremental RPCs carry only balance deltas, never @@ -823,11 +802,11 @@ async fn apply_block_changes<'a, P, I>( local_applied.push((tag, address, funds)); } } else { - pending_unknown.push(PendingUnknownChange { - key: addr_bytes, - change: change.into_owned(), - current_height, - }); + let owned = match change { + BalanceOp::Recent(op) => OwnedBalanceOp::Recent(*op), + BalanceOp::Compacted(op) => OwnedBalanceOp::Compacted(op.clone()), + }; + pending_unknown.push((addr_bytes, owned, current_height)); // NOTE: this buffer is intentionally unbounded — premature optimization here // would couple the catch-up loop to ad-hoc memory heuristics. We log a // one-shot warning above a generous threshold so a future operator can @@ -871,7 +850,7 @@ const REPLAY_REFRESH_MAX_ITERATIONS: usize = 3; /// or the cap [`REPLAY_REFRESH_MAX_ITERATIONS`] is reached. async fn refresh_and_replay_unknown( key_to_tag: &HashMap, (P::Tag, P::Address)>, - pending_unknown: Vec, + pending_unknown: Vec, provider: &mut P, result: &mut AddressSyncResult, ) { @@ -880,8 +859,10 @@ async fn refresh_and_replay_unknown( } // Build the set of unknown keys for a fast intersection probe. - let unknown_keys: std::collections::HashSet<&[u8]> = - pending_unknown.iter().map(|p| p.key.as_slice()).collect(); + let unknown_keys: std::collections::HashSet<&[u8]> = pending_unknown + .iter() + .map(|(key, _, _)| key.as_slice()) + .collect(); // Keys resolved across all iterations so we don't double-apply a // delta if a follow-on iteration's `extras` still contains an @@ -931,11 +912,11 @@ async fn refresh_and_replay_unknown( // height filter still sees the same `current_height` it would // have seen on the forward pass. let mut iteration_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); - for pending in &pending_unknown { - if resolved_keys.contains(&pending.key) { + for (key, change, height) in &pending_unknown { + if resolved_keys.contains(key) { continue; } - let Some(&(tag, address)) = extras.get(pending.key.as_slice()) else { + let Some(&(tag, address)) = extras.get(key.as_slice()) else { continue; }; let result_key = (tag, address); @@ -944,10 +925,7 @@ async fn refresh_and_replay_unknown( .get(&result_key) .map(|f| f.balance) .unwrap_or(0); - let new_balance = pending - .change - .as_borrowed() - .new_balance(current_balance, pending.current_height); + let new_balance = apply_op(borrow_op(change), current_balance, *height); if new_balance != current_balance { // TODO: same synthesized nonce=0 gap as the forward pass. @@ -965,9 +943,9 @@ async fn refresh_and_replay_unknown( // Mark every key whose entry resolved in `extras` as resolved // this pass — even if no balance moved — so the next iteration // doesn't reconsider it. - for pending in &pending_unknown { - if extras.contains_key(pending.key.as_slice()) { - resolved_keys.insert(pending.key.clone()); + for (key, _, _) in &pending_unknown { + if extras.contains_key(key.as_slice()) { + resolved_keys.insert(key.clone()); } } @@ -1002,7 +980,7 @@ async fn refresh_and_replay_unknown( let still_unknown = pending_unknown .iter() - .filter(|p| !resolved_keys.contains(&p.key)) + .filter(|(key, _, _)| !resolved_keys.contains(key)) .count(); if still_unknown > 0 { debug!( @@ -1734,10 +1712,10 @@ mod tests { found: Vec::new(), }; let mut result: AddressSyncResult = AddressSyncResult::new(); - let mut pending_unknown: Vec = Vec::new(); + let mut pending_unknown: Vec = Vec::new(); let op = BlockAwareCreditOperation::SetCredits(42_000); - let changes = [(&late, AddressBalanceChange::Compacted(&op))]; + let changes = [(&late, BalanceOp::Compacted(&op))]; apply_block_changes( &lookup, @@ -1822,9 +1800,9 @@ mod tests { result.absent.insert((tag, addr)); let op = BlockAwareCreditOperation::SetCredits(7_777); - let changes = [(&addr, AddressBalanceChange::Compacted(&op))]; + let changes = [(&addr, BalanceOp::Compacted(&op))]; - let mut pending_unknown: Vec = Vec::new(); + let mut pending_unknown: Vec = Vec::new(); apply_block_changes( &lookup, changes.iter().map(|(a, c)| (*a, *c)), @@ -1914,11 +1892,11 @@ mod tests { ); let late_op = BlockAwareCreditOperation::SetCredits(7_000); let changes = [ - (&known, AddressBalanceChange::Compacted(&known_op)), - (&late, AddressBalanceChange::Compacted(&late_op)), + (&known, BalanceOp::Compacted(&known_op)), + (&late, BalanceOp::Compacted(&late_op)), ]; - let mut pending_unknown: Vec = Vec::new(); + let mut pending_unknown: Vec = Vec::new(); apply_block_changes( &lookup, changes.iter().map(|(a, c)| (*a, *c)), @@ -2009,7 +1987,7 @@ mod tests { pending_polls: std::sync::atomic::AtomicUsize::new(0), found_calls: 0, }; - let mut pending_unknown: Vec = Vec::new(); + let mut pending_unknown: Vec = Vec::new(); // Three separate "blocks" (representing the per-entry calls // inside `incremental_catch_up`), every change but the first @@ -2021,7 +1999,7 @@ mod tests { (&foreign_3, 5_000), ] { let op = BlockAwareCreditOperation::SetCredits(credits); - let changes = [(addr, AddressBalanceChange::Compacted(&op))]; + let changes = [(addr, BalanceOp::Compacted(&op))]; apply_block_changes( &lookup, changes.iter().map(|(a, c)| (*a, *c)), @@ -2172,17 +2150,9 @@ mod tests { // had already seen them and stashed them for end-of-pass replay. let op_a = BlockAwareCreditOperation::SetCredits(1_111); let op_b = BlockAwareCreditOperation::SetCredits(2_222); - let pending_unknown = vec![ - PendingUnknownChange { - key: a.to_bytes(), - change: OwnedAddressBalanceChange::Compacted(op_a), - current_height: 0, - }, - PendingUnknownChange { - key: b.to_bytes(), - change: OwnedAddressBalanceChange::Compacted(op_b), - current_height: 0, - }, + let pending_unknown: Vec = vec![ + (a.to_bytes(), OwnedBalanceOp::Compacted(op_a), 0), + (b.to_bytes(), OwnedBalanceOp::Compacted(op_b), 0), ]; refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await;