Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 180 additions & 66 deletions packages/rs-sdk/src/platform/address_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,11 +836,11 @@ async fn apply_block_changes<'a, P, I>(
const REPLAY_REFRESH_MAX_ITERATIONS: usize = 3;

/// End-of-pass recovery for addresses missing from the entry-time
/// snapshot. Re-polls `pending_addresses()`, builds a small `extras` map
/// of newly-derived addresses, and replays only the buffered changes
/// that match an `extras` entry. Foreign (other-wallet) addresses fall
/// out at the intersection check — no provider refresh storm, no log
/// flood.
/// snapshot. Re-polls `pending_addresses()` until a pass makes no
/// progress, each time building a small `extras` map of newly-derived
/// addresses and replaying only the buffered changes that now resolve.
/// Foreign (other-wallet) addresses fall out cheaply at the intersection
/// check — no provider refresh storm, no log flood.
///
/// The refresh+replay is wrapped in a bounded loop so that
/// `on_address_found` callbacks fired during replay can trigger gap
Expand All @@ -858,65 +858,52 @@ async fn refresh_and_replay_unknown<P: AddressProvider>(
return;
}

// Build the set of unknown keys for a fast intersection probe.
let unknown_keys: std::collections::HashSet<&[u8]> = pending_unknown
.iter()
.map(|(key, _, _)| key.as_slice())
.collect();

// Keys resolved across all iterations so we don't double-apply a
// delta if a follow-on iteration's `extras` still contains an
// already-replayed key. Owned bytes because the borrow checker won't
// let us keep `&[u8]` references into `pending_unknown` while we
// also borrow it for the inner loop.
let mut resolved_keys: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
let mut total_replay_applied: usize = 0;
let initial_pending_count = pending_unknown.len();
let mut remaining = pending_unknown;
let mut recovered = 0usize;
let mut hit_iteration_cap = false;

for iteration in 0..REPLAY_REFRESH_MAX_ITERATIONS {
// Build the set of still-unknown keys for a fast intersection
// probe.
let unknown_keys: std::collections::HashSet<&[u8]> =
remaining.iter().map(|(key, _, _)| key.as_slice()).collect();

// Only addresses the provider can now produce AND that match a
// still-unresolved buffered miss are interesting — everything
// else is some other wallet's address and stays out of the
// lookup entirely.
// buffered miss are interesting — everything else is some other
// wallet's address and stays out of the lookup entirely.
let mut extras: HashMap<Vec<u8>, (P::Tag, P::Address)> = HashMap::new();
for (tag, address) in provider.pending_addresses() {
let bytes = address.to_bytes();
if unknown_keys.contains(bytes.as_slice())
&& !resolved_keys.contains(&bytes)
&& !key_to_tag.contains_key(&bytes)
{
if unknown_keys.contains(bytes.as_slice()) && !key_to_tag.contains_key(&bytes) {
extras.insert(bytes, (tag, address));
}
}

if extras.is_empty() {
if iteration == 0 {
// Common case on a populated multi-wallet chain: every
// buffered unknown belongs to another wallet.
debug!(
"Address sync: {} platform-reported balance change(s) reference \
address(es) not tracked by this wallet; ignoring",
pending_unknown.len()
initial_pending_count
);
return;
}
// No new addresses surfaced this iteration — we're done.
break;
}

// Replay only the entries whose key actually resolves in
// `extras` and hasn't been resolved in a prior iteration. Order
// is preserved (compacted first, then recent — same as the
// forward pass), so `AddToCredits` deltas accumulate correctly.
// The catch-up cursor per change is preserved so the compacted
// height filter still sees the same `current_height` it would
// have seen on the forward pass.
let mut iteration_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new();
for (key, change, height) in &pending_unknown {
if resolved_keys.contains(key) {
continue;
}
// `extras`. Order is preserved (compacted first, then recent —
// same as the forward pass), so `AddToCredits` deltas accumulate
// correctly. The catch-up cursor per change is preserved so the
// compacted height filter still sees the same `current_height`
// it would have seen on the forward pass.
let pass_len = remaining.len();
let mut replay_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new();
let mut still_pending: Vec<PendingMiss> = Vec::with_capacity(pass_len);
for (key, change, height) in remaining.into_iter() {
let Some(&(tag, address)) = extras.get(key.as_slice()) else {
still_pending.push((key, change, height));
continue;
};
let result_key = (tag, address);
Expand All @@ -925,7 +912,7 @@ async fn refresh_and_replay_unknown<P: AddressProvider>(
.get(&result_key)
.map(|f| f.balance)
.unwrap_or(0);
let new_balance = apply_op(borrow_op(change), current_balance, *height);
let new_balance = apply_op(borrow_op(&change), current_balance, height);

if new_balance != current_balance {
// TODO: same synthesized nonce=0 gap as the forward pass.
Expand All @@ -936,35 +923,28 @@ async fn refresh_and_replay_unknown<P: AddressProvider>(
};
result.absent.remove(&result_key);
result.found.insert(result_key, funds);
iteration_applied.push((tag, address, funds));
replay_applied.push((tag, address, funds));
}
}

// Mark every key whose entry resolved in `extras` as resolved
// this pass — even if no balance moved — so the next iteration
// doesn't reconsider it.
for (key, _, _) in &pending_unknown {
if extras.contains_key(key.as_slice()) {
resolved_keys.insert(key.clone());
}
}

let iteration_resolved = iteration_applied.len();
total_replay_applied += iteration_resolved;
let resolved_this_pass = pass_len - still_pending.len();
recovered += resolved_this_pass;

// Fire callbacks for this iteration BEFORE the next refresh so
// that `on_address_found`-driven gap extension can expose the
// next batch of addresses to `pending_addresses()`.
for (tag, address, funds) in &iteration_applied {
for (tag, address, funds) in &replay_applied {
provider.on_address_found(*tag, address, *funds).await;
}

if iteration_resolved == 0 {
// `extras` was non-empty but every entry's delta was a
// no-op; nothing for gap extension to chew on.
if resolved_this_pass == 0 {
remaining = still_pending;
break;
}

if still_pending.is_empty() {
return;
}

remaining = still_pending;

if iteration + 1 == REPLAY_REFRESH_MAX_ITERATIONS {
hit_iteration_cap = true;
}
Expand All @@ -978,16 +958,13 @@ async fn refresh_and_replay_unknown<P: AddressProvider>(
);
}

let still_unknown = pending_unknown
.iter()
.filter(|(key, _, _)| !resolved_keys.contains(key))
.count();
if still_unknown > 0 {
if !remaining.is_empty() {
debug!(
"Address sync: {} platform-reported balance change(s) reference \
address(es) not tracked by this wallet (refresh recovered {} \
other(s)); ignoring the untracked entries",
still_unknown, total_replay_applied
remaining.len(),
recovered
);
}
}
Expand Down Expand Up @@ -1928,6 +1905,143 @@ mod tests {
);
}

/// If replaying one recovered post-snapshot address extends the
/// provider gap window to expose the next pending address, the same
/// end-of-pass refresh must keep iterating until both buffered
/// changes are applied in order.
#[tokio::test]
async fn refresh_replays_chained_post_snapshot_addresses() {
use async_trait::async_trait;

struct ChainedProvider {
first: PlatformAddress,
second: PlatformAddress,
visible: usize,
pending_polls: std::cell::Cell<usize>,
found: Vec<(u32, PlatformAddress, AddressFunds)>,
}

#[async_trait]
impl AddressProvider for ChainedProvider {
type Tag = u32;
type Address = PlatformAddress;

fn gap_limit(&self) -> AddressIndex {
0
}

fn pending_addresses(&self) -> impl Iterator<Item = (Self::Tag, Self::Address)> + '_ {
self.pending_polls.set(
self.pending_polls
.get()
.checked_add(1)
.expect("test poll counter should not overflow"),
);
let mut pending = Vec::with_capacity(self.visible);
if self.visible >= 1 {
pending.push((11u32, self.first));
}
if self.visible >= 2 {
pending.push((12u32, self.second));
}
pending.into_iter()
}

async fn on_address_found(
&mut self,
tag: Self::Tag,
address: &Self::Address,
funds: AddressFunds,
) {
self.found.push((tag, *address, funds));
if *address == self.first && self.visible == 1 {
self.visible = 2;
}
}

async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {}

fn current_balances(
&self,
) -> impl Iterator<Item = (Self::Tag, Self::Address, AddressFunds)> + '_ {
std::iter::empty()
}
}

let first = p2pkh(0x31);
let second = p2pkh(0x32);
let lookup: HashMap<Vec<u8>, (u32, PlatformAddress)> = HashMap::new();

let mut provider = ChainedProvider {
first,
second,
visible: 1,
pending_polls: std::cell::Cell::new(0),
found: Vec::new(),
};
let mut result: AddressSyncResult<u32, PlatformAddress> = AddressSyncResult::new();
let mut pending_unknown: Vec<PendingMiss> = Vec::new();

let first_op = BlockAwareCreditOperation::SetCredits(10_000);
let second_op = BlockAwareCreditOperation::SetCredits(20_000);
let changes = [
(&first, BalanceOp::Compacted(&first_op)),
(&second, BalanceOp::Compacted(&second_op)),
];

apply_block_changes(
&lookup,
changes.iter().map(|(a, c)| (*a, *c)),
0,
&mut provider,
&mut result,
&mut pending_unknown,
)
.await;

assert_eq!(pending_unknown.len(), 2, "both misses must be buffered");

refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await;

assert_eq!(
result.found.get(&(11u32, first)).map(|f| f.balance),
Some(10_000),
"first recovered address must apply in the first replay pass"
);
assert_eq!(
result.found.get(&(12u32, second)).map(|f| f.balance),
Some(20_000),
"second chained address must apply in the same end-of-pass refresh"
);
assert_eq!(
provider.found,
vec![
(
11u32,
first,
AddressFunds {
nonce: 0,
balance: 10_000,
},
),
(
12u32,
second,
AddressFunds {
nonce: 0,
balance: 20_000,
},
),
],
"replay callbacks must preserve buffered change order across chained discovery"
);
assert_eq!(
provider.pending_polls.get(),
2,
"refresh should re-poll once after the first recovered address exposes the next"
);
}

/// A foreign address (not in the lookup, never produced by the
/// provider) is silently ignored — no `on_address_found`, no
/// `result.found` insert, no `result.absent` mutation, and exactly
Expand Down
Loading