refactor: extract capability lookup into PeerPool helpers #509

xdustinface wants to merge 1 commit into v0.42-dev.

Conversation
📝 Walkthrough

Replaces per-peer scans with PeerPool service queries for capability-aware peer selection; adds `peer_with_service()` and `peers_with_service()` helpers.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Manager
    participant PeerPool
    participant Peer
    participant Network
    Manager->>PeerPool: peer_with_service(flags) / peers_with_service(flags)
    PeerPool->>Peer: read-lock & check has_service(flags)
    Peer-->>PeerPool: matching peer(s) (addr, Arc<RwLock<Peer>>)
    PeerPool-->>Manager: return peer(s)
    Manager->>Peer: select peer / update current_sync_peer / send message
    Peer->>Network: transmit message
    Network-->>Manager: ack or error
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 3 passed
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           v0.42-dev     #509   +/-  ##
=============================================
- Coverage      66.90%   66.85%   -0.06%
=============================================
  Files            313      313
  Lines          64757    64890     +133
=============================================
+ Hits           43325    43379      +54
- Misses         21432    21511      +79
```
This pull request uses carry-forward flags.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dash-spv/src/network/manager.rs (1)
994-1012: ⚠️ Potential issue | 🟠 Major

Split `GetHeaders` and `GetHeaders2` selection rules.

These branches currently share the same fallback logic, but `send_message_to_peer()` only has a safe downgrade path for `GetHeaders`. An explicit `GetHeaders2` can still be routed to a peer in `headers2_disabled` or, after the fallback, to a peer outside the headers2-capable set. `GetHeaders2` should require an eligible peer and return a protocol error otherwise; `current_sync_peer` should also be filtered through `headers2_disabled` before reuse.

Also applies to: 1078-1089
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/network/manager.rs` around lines 994 - 1012, The GetHeaders2 selection must only pick peers that support headers2 and must not fall back to non-headers2 peers; update the branch that runs when check_headers2 is true to (1) filter the reused current_sync_peer through the headers2_disabled set and ensure peer.read().await.supports_headers2() before reusing it, (2) when no such peer is found do not call peer_with_service or pick peers[0] but instead return a protocol error (similar to other error paths) so send_message_to_peer() will never be given a GetHeaders2 to a headers2-disabled peer, and (3) make the identical change in the second occurrence around the 1078-1089 region; use the symbols current_sync_peer, headers2_disabled, supports_headers2(), peer_with_service(ServiceFlags::NODE_HEADERS_COMPRESSED) and send_message_to_peer() to locate and modify the logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dash-spv/src/network/manager.rs`:
- Around line 983-987: The snapshot "peers" can be stale between calling
self.pool.peer_with_service(flags) and later peers.find(...), causing "Selected
peer not found"; after obtaining the address from
self.pool.peer_with_service(flags) (the selected_peer variable) re-resolve the
peer from the current pool instead of using the old peers snapshot — e.g.,
replace the peers.find(...) call with a fresh lookup on the pool (call the pool
method that returns a peer by address, e.g.,
self.pool.get_peer_by_address(address) or self.pool.peer(&address).await) and
use that result, keeping the selection branch that logs "Selected peer ..." but
using the freshly-resolved peer object for further work.
In `@dash-spv/src/network/pool.rs`:
- Around line 153-177: Add unit tests for the new service-selection helpers
peer_with_service() and peers_with_service(): under #[cfg(test)] add async
#[tokio::test] cases that (1) verify no-match returns None/empty, (2) verify
first-match returns the first matching SocketAddr, and (3) verify multiple
matches are all returned by peers_with_service(). Create a Pool, populate its
peers map via pool.peers.write().await with Arc<RwLock<Peer>> entries that
advertise specific ServiceFlags, then call peer_with_service(flags) and
peers_with_service(flags) and assert expected results; place tests next to the
implementation in the same file. Ensure you exercise both functions and cover
no-match, single-match, and multi-match scenarios.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 76aae646-b9b2-466a-967e-43c2ba68b7a8
📒 Files selected for processing (2)
dash-spv/src/network/manager.rs, dash-spv/src/network/pool.rs
Force-pushed from a43d564 to bbaf01d (Compare)
Actionable comments posted: 1
♻️ Duplicate comments (1)
dash-spv/src/network/manager.rs (1)
983-987: ⚠️ Potential issue | 🟡 Minor

Resolve the selected peer from the live pool right before send.

`peer_with_service()` runs after the snapshot at Line 968, so it can pick a peer that is missing from `peers`. That still makes the later `find(...)` fail with `Selected peer not found` even though a valid peer exists.

Suggested fix:

```diff
-        let (addr, peer) = peers
-            .iter()
-            .find(|(a, _)| *a == selected_peer)
-            .ok_or_else(|| NetworkError::ConnectionFailed("Selected peer not found".to_string()))?;
-
-        self.send_message_to_peer(addr, peer, message).await
+        let peer = self.pool.get_peer(&selected_peer).await.ok_or_else(|| {
+            NetworkError::ConnectionFailed(
+                "Selected peer disconnected before send".to_string(),
+            )
+        })?;
+
+        self.send_message_to_peer(&selected_peer, &peer, message).await
```

Also applies to: 1009-1013, 1053-1059
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/network/manager.rs` around lines 983 - 987, The selected peer can be stale because peer_with_service() was called before taking the snapshot used by peers.find(...); to fix, re-resolve the peer from the live pool immediately before using it: call self.pool.peer_with_service(flags).await (or the equivalent live lookup) right before the peers.find(...) / send path and use that fresh address to locate the Peer object so the later find(...) will succeed; apply this change for the other similar blocks that use peer_with_service() earlier (the blocks referenced around peer selection at the other occurrences).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dash-spv/src/network/pool.rs`:
- Around line 153-180: The pool read lock is held while awaiting per-peer
RwLocks in peer_with_service and peers_with_service; to fix, first take a
short-lived snapshot of the peer addresses and Arcs (e.g. collect
Vec<(SocketAddr, Arc<RwLock<Peer>>)>) while holding self.peers.read().await,
then drop that read guard and only afterwards iterate the snapshot and await
peer.read().await to call has_service; update both peer_with_service and
peers_with_service to follow this pattern (use Arc::clone or peer.clone() when
building the snapshot so the Arcs outlive the released pool lock).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bb2447ee-7c31-44cd-a8f6-1e7b2339e449
📒 Files selected for processing (4)
dash-spv/src/network/manager.rs, dash-spv/src/network/peer.rs, dash-spv/src/network/pool.rs, dash-spv/src/test_utils/network.rs
This PR has merge conflicts with the base branch. Please rebase or merge the base branch into your branch to resolve them.
Force-pushed from bbaf01d to 36be3dd (Compare)
```rust
#[cfg(test)]
impl PeerPool {
    async fn insert_peer_with_services(&self, addr: SocketAddr, flags: ServiceFlags) {
        let mut peer = Peer::dummy(addr);
```
move this into the tests module
Why? It's test-only already, and moving it into the actual tests module just adds another layer of indentation. I think it's cleaner to have a separation here by having a test impl block.
```rust
NetworkMessage::GetCFHeaders(_) | NetworkMessage::GetCFilters(_) => {
    Some(ServiceFlags::COMPACT_FILTERS)
}
_ => None,
```
Since you are touching this logic, couldn't we improve it by returning `ServiceFlags::NONE`? That way we remove the unnecessary `Option` and `if let` branch. Feel free to do it if you want; if not, I will take a look into it.
I have some other PRs coming in this area.
```rust
#[tokio::test]
async fn test_peer_with_service() {
    let pool = PeerPool::new();
```
Remove one of the `test_peer_with_service` functions and merge any missing cases into the remaining one.
dash-spv/src/network/pool.rs (Outdated)
```rust
#[tokio::test]
async fn test_service_lookup_with_combined_flags() {
    let pool = PeerPool::new();
```
You could also move this logic into the `test_peer_with_service` function.

I would like to see one function testing all these paths. It's as easy as: after testing an empty pool, in one block add all the dummy peers we need, in another block create the flags we want to request, and then check that we get the right peers. We would end up with less code and the same test quality.

Btw, you are missing one case that I consider relevant: `ServiceFlags::NONE` should return the entire pool.
Addresses ZocoLini review comment on PR #509 File: dash-spv/src/network/pool.rs
Force-pushed from 36be3dd to 22e4161 (Compare)
xdustinface
left a comment
@ZocoLini Agree with merging the tests. See the other comments.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dash-spv/src/network/manager.rs (1)
1081-1092: ⚠️ Potential issue | 🟠 Major

Avoid fallback-to-all for explicit `GetHeaders2` in distributed mode.

In this branch, `GetHeaders` and `GetHeaders2` share the same logic and fall back to all peers when no headers2-capable peers are found. For explicit `GetHeaders2`, this can dispatch to peers without `NODE_HEADERS_COMPRESSED`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/network/manager.rs` around lines 1081 - 1092, Separate the handling for NetworkMessage::GetHeaders(_) and NetworkMessage::GetHeaders2(_) so that GetHeaders2 does not fall back to the full peers list when no headers2-capable peers exist; specifically, after building headers2_peers (and filtering with headers2_disabled), if the message is GetHeaders2 and headers2_peers.is_empty() return (headers2_peers, false) (i.e., keep the empty list) instead of returning (peers.clone(), false), while preserving the existing fallback-to-all behavior only for GetHeaders.
♻️ Duplicate comments (2)
dash-spv/src/network/manager.rs (1)
1052-1056: ⚠️ Potential issue | 🟡 Minor

Re-resolve selected peer from current pool before send.

At Line 1052, `selected_peer` can be chosen from a fresher pool view (`peer_with_service`) than the earlier `peers` snapshot, so `.find(...)` can fail with `"Selected peer not found"` even when a valid peer exists. This was previously raised and still applies.

💡 Suggested fix

```diff
-        let (addr, peer) = peers
-            .iter()
-            .find(|(a, _)| *a == selected_peer)
-            .ok_or_else(|| NetworkError::ConnectionFailed("Selected peer not found".to_string()))?;
-
-        self.send_message_to_peer(addr, peer, message).await
+        let peer = self
+            .pool
+            .get_peer(&selected_peer)
+            .await
+            .ok_or_else(|| {
+                NetworkError::ConnectionFailed(
+                    "Selected peer disconnected before send".to_string(),
+                )
+            })?;
+
+        self.send_message_to_peer(&selected_peer, &peer, message).await
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/network/manager.rs` around lines 1052 - 1056, selected_peer may have been chosen from a newer view (peer_with_service) than the earlier peers snapshot, so the .find on peers can fail; update the send path to re-resolve the selected_peer against the current pool before returning NetworkError::ConnectionFailed. Concretely: after selecting selected_peer attempt to look it up in the freshest peers collection and if not found fall back to resolving from the peer_with_service selection or refresh the peers list and retry; only return NetworkError::ConnectionFailed when the peer truly cannot be resolved from the current pool.

dash-spv/src/network/pool.rs (1)
149-176: ⚠️ Potential issue | 🟠 Major

Release pool read lock before awaiting per-peer locks.

These helpers hold `self.peers.read().await` while awaiting `peer.read().await`. A slow/busy peer can pin the pool read lock and delay add/remove operations.

💡 Suggested refactor

```diff
     pub(crate) async fn peer_with_service(
         &self,
         flags: ServiceFlags,
     ) -> Option<(SocketAddr, Arc<RwLock<Peer>>)> {
-        let peers = self.peers.read().await;
-        for (addr, peer) in peers.iter() {
-            if peer.read().await.has_service(flags) {
-                return Some((*addr, Arc::clone(peer)));
-            }
-        }
+        let peers: Vec<_> = self
+            .peers
+            .read()
+            .await
+            .iter()
+            .map(|(addr, peer)| (*addr, Arc::clone(peer)))
+            .collect();
+        for (addr, peer) in peers {
+            if peer.read().await.has_service(flags) {
+                return Some((addr, peer));
+            }
+        }
         None
     }
```

```bash
#!/bin/bash
# Verify lock scope includes await on peer locks in service helpers.
rg -n -C4 'peer_with_service|peers_with_service|let peers = self\.peers\.read\(\)\.await|peer\.read\(\)\.await' dash-spv/src/network/pool.rs
```
Verify each finding against the current code and only fix it if needed. In `@dash-spv/src/network/pool.rs` around lines 149 - 176, The helpers peer_with_service and peers_with_service currently hold self.peers.read().await while awaiting peer.read().await which can block pool modifications; fix by narrowing the read lock scope: inside each function take the read guard only long enough to clone the needed addresses and Arc<RwLock<Peer>> references into a local Vec, drop the guard, then iterate that local Vec and await peer.read().await to check has_service(flags); for peer_with_service stop on first match and return its cloned SocketAddr and Arc, for peers_with_service collect all matches—this ensures the pool read lock is released before any per-peer awaits.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@dash-spv/src/network/manager.rs`:
- Around line 973-978: The match that sets required_service must also handle
NetworkMessage::GetHeaders2(_) and require the NODE_HEADERS_COMPRESSED
capability; update the match on message (where required_service is assigned) to
include a branch mapping NetworkMessage::GetHeaders2(_) =>
Some(ServiceFlags::NODE_HEADERS_COMPRESSED) so single-peer selection will only
send GetHeaders2 to peers that advertise that capability.
In `@dash-spv/src/network/pool.rs`:
- Around line 273-275: The test currently assumes
peer_with_service(compact_filters) returns addr2 but iteration over the internal
HashMap is unordered; instead update the assertion to validate that the returned
found_addr corresponds to a peer that has the COMPACT_FILTERS bit set (and
optionally does not violate other expected service constraints) rather than
checking equality to addr2; locate the call to peer_with_service and replace the
assert_eq!(found_addr, addr2) with an assertion that looks up the peer by
found_addr in the pool (or inspects the returned tuple) and confirms
(peer.services & COMPACT_FILTERS) != 0 (or similarly checks service flags
against COMPACT_FILTERS and NODE_HEADERS_COMPRESSED as appropriate) so the test
no longer depends on HashMap iteration order.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 198ae04c-07c3-4480-b4ca-9401f44ccd2a
📒 Files selected for processing (4)
dash-spv/src/network/manager.rs, dash-spv/src/network/peer.rs, dash-spv/src/network/pool.rs, dash-spv/src/test_utils/network.rs
- Add `peer_with_service()` and `peers_with_service()` on `PeerPool` to replace repeated "iterate peers, check service flag" loops in message routing.
- Add unit tests.
- Generalize the match/log/error pattern for required-service peer selection in `send_to_single_peer` so new service requirements only need a single match arm.
Force-pushed from 22e4161 to e5f3bc6 (Compare)
- `peer_with_service()` and `peers_with_service()` on `PeerPool` to replace repeated "iterate peers, check service flag" loops in message routing.
- Generalize the required-service peer selection in `send_to_single_peer` so new service requirements only need a single match arm.

Summary by CodeRabbit
Improvements
Tests