diff --git a/Cargo.lock b/Cargo.lock index bb5b3399..404b5726 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2286,8 +2286,7 @@ dependencies = [ [[package]] name = "evmlib" version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1338c23c9ce1b4e54ff5cc65e53ce859095f121bfc742e474e1e1f2e03748000" +source = "git+https://github.com/WithAutonomi/evmlib?rev=a3be57fcb3bd4982bc93ad0b58116255d509db28#a3be57fcb3bd4982bc93ad0b58116255d509db28" dependencies = [ "alloy", "ant-merkle", diff --git a/Cargo.toml b/Cargo.toml index ae82e9a8..460ba157 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ saorsa-core = "0.22.0" saorsa-pqc = "0.5" # Payment verification - autonomi network lookup + EVM payment -evmlib = "0.8" +evmlib = { git = "https://github.com/WithAutonomi/evmlib", rev = "a3be57fcb3bd4982bc93ad0b58116255d509db28" } xor_name = "5" # Caching - LRU cache for verified XorNames diff --git a/src/ant_protocol/chunk.rs b/src/ant_protocol/chunk.rs index d8c0840a..046e459d 100644 --- a/src/ant_protocol/chunk.rs +++ b/src/ant_protocol/chunk.rs @@ -234,8 +234,11 @@ pub enum ChunkQuoteResponse { /// When `already_stored` is `true` the node already holds this chunk and no /// payment is required — the client should skip the pay-then-PUT cycle for /// this address. The quote is still included for informational purposes. + /// + /// The close group view is embedded inside the serialized `PaymentQuote` + /// and covered by the quote's ML-DSA-65 signature, so it cannot be forged. Success { - /// Serialized `PaymentQuote`. + /// Serialized `PaymentQuote` (includes the node's close group view). quote: Vec, /// `true` when the chunk already exists on this node (skip payment). already_stored: bool, diff --git a/src/devnet.rs b/src/devnet.rs index a702ce56..ca6f5d80 100644 --- a/src/devnet.rs +++ b/src/devnet.rs @@ -581,6 +581,7 @@ impl Devnet { evm: evm_config, cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS); @@ -594,6 +595,7 @@ impl Devnet { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, )) } @@ -635,6 +637,10 @@ impl Devnet { *node.state.write().await = NodeState::Running; if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Inject P2P node into protocol handler for close-group lookups. + if protocol.set_p2p_node(Arc::clone(p2p)).is_err() { + warn!("P2P node already set on protocol handler for devnet node {index}"); + } let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol); diff --git a/src/node.rs b/src/node.rs index 378474a4..8bfe05ce 100644 --- a/src/node.rs +++ b/src/node.rs @@ -118,11 +118,16 @@ impl NodeBuilder { None }; + // Wrap P2P node in Arc early so it can be shared with the protocol handler. + let p2p_node = Arc::new(p2p_node); + // Initialize ANT protocol handler for chunk storage and // wire the fresh-write channel so PUTs trigger replication. let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled { let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel(); - let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?; + let mut protocol = + Self::build_ant_protocol(&self.config, &identity, Some(Arc::clone(&p2p_node))) + .await?; protocol.set_fresh_write_sender(fresh_write_tx); (Some(Arc::new(protocol)), Some(fresh_write_rx)) } else { @@ -130,8 +135,6 @@ impl NodeBuilder { (None, None) }; - let p2p_arc = Arc::new(p2p_node); - // Initialize replication engine (if storage is enabled) let replication_engine = if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) { @@ -140,7 +143,7 @@ impl NodeBuilder { let payment_verifier_arc = protocol.payment_verifier_arc(); match ReplicationEngine::new( repl_config, - Arc::clone(&p2p_arc), + Arc::clone(&p2p_node), storage_arc, payment_verifier_arc, &self.config.root_dir, @@ -161,7 +164,7 @@ impl NodeBuilder { let node = RunningNode { config: self.config, - p2p_node: p2p_arc, + p2p_node, shutdown, events_tx, events_rx: Some(events_rx), @@ -352,6 +355,7 @@ impl NodeBuilder { async fn build_ant_protocol( config: &NodeConfig, identity: &NodeIdentity, + p2p_node: Option>, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { @@ -385,6 +389,7 @@ impl NodeBuilder { }, cache_capacity: config.payment.cache_capacity, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); let metrics_tracker = QuotingMetricsTracker::new(0); @@ -398,6 +403,7 @@ impl NodeBuilder { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + p2p_node, ); info!( diff --git a/src/payment/proof.rs b/src/payment/proof.rs index 0db0b5e0..2d5098d1 100644 --- a/src/payment/proof.rs +++ b/src/payment/proof.rs @@ -136,6 +136,7 @@ mod tests { timestamp: SystemTime::now(), price: Amount::from(1u64), rewards_address: RewardsAddress::new([1u8; 20]), + close_group: vec![], pub_key: vec![], signature: vec![], } diff --git a/src/payment/quote.rs b/src/payment/quote.rs index 4d6e616f..ca265dc8 100644 --- a/src/payment/quote.rs +++ b/src/payment/quote.rs @@ -120,6 +120,7 @@ impl QuoteGenerator { content: XorName, data_size: usize, data_type: u32, + close_group: Vec<[u8; 32]>, ) -> Result { let sign_fn = self .sign_fn @@ -134,9 +135,15 @@ impl QuoteGenerator { // Convert XorName to xor_name::XorName let xor_name = xor_name::XorName(content); - // Create bytes for signing (following autonomi's pattern) - let bytes = - PaymentQuote::bytes_for_signing(xor_name, timestamp, &price, &self.rewards_address); + // Create bytes for signing — includes close_group so it's + // cryptographically bound to this quote. + let bytes = PaymentQuote::bytes_for_signing( + xor_name, + timestamp, + &price, + &self.rewards_address, + &close_group, + ); // Sign the bytes let signature = sign_fn(&bytes); @@ -152,6 +159,7 @@ impl QuoteGenerator { price, pub_key: self.pub_key.clone(), rewards_address: self.rewards_address, + close_group, signature, }; @@ -437,7 +445,7 @@ mod tests { let generator = create_test_generator(); let content = [42u8; 32]; - let quote = generator.create_quote(content, 1024, 0); + let quote = generator.create_quote(content, 1024, 0, vec![]); assert!(quote.is_ok()); let quote = quote.expect("valid quote"); @@ -450,7 +458,7 @@ mod tests { let content = [42u8; 32]; let quote = generator - .create_quote(content, 1024, 0) + .create_quote(content, 1024, 0, vec![]) .expect("valid quote"); assert!(verify_quote_content("e, &content)); @@ -468,7 +476,7 @@ mod tests { assert!(!generator.can_sign()); let content = [42u8; 32]; - let result = generator.create_quote(content, 1024, 0); + let result = generator.create_quote(content, 1024, 0, vec![]); assert!(result.is_err()); } @@ -491,7 +499,7 @@ mod tests { let content = [7u8; 32]; let quote = generator - .create_quote(content, 2048, 0) + .create_quote(content, 2048, 0, vec![]) .expect("create quote"); // Valid signature should verify @@ -511,7 +519,7 @@ mod tests { let content = [42u8; 32]; let quote = generator - .create_quote(content, 1024, 0) + .create_quote(content, 1024, 0, vec![]) .expect("create quote"); // The dummy signer produces a 64-byte fake signature, not a valid @@ -556,9 +564,15 @@ mod tests { let content = [10u8; 32]; // All data types produce the same price (price depends on records_stored, not data_type) - let q0 = generator.create_quote(content, 1024, 0).expect("type 0"); - let q1 = generator.create_quote(content, 512, 1).expect("type 1"); - let q2 = generator.create_quote(content, 256, 2).expect("type 2"); + let q0 = generator + .create_quote(content, 1024, 0, vec![]) + .expect("type 0"); + let q1 = generator + .create_quote(content, 512, 1, vec![]) + .expect("type 1"); + let q2 = generator + .create_quote(content, 256, 2, vec![]) + .expect("type 2"); // All quotes should have a valid price (minimum floor of 1) assert!(q0.price >= Amount::from(1u64)); @@ -572,7 +586,9 @@ mod tests { let content = [11u8; 32]; // Price depends on records_stored, not data size - let quote = generator.create_quote(content, 0, 0).expect("zero size"); + let quote = generator + .create_quote(content, 0, 0, vec![]) + .expect("zero size"); assert!(quote.price >= Amount::from(1u64)); } @@ -583,7 +599,7 @@ mod tests { // Price depends on records_stored, not data size let quote = generator - .create_quote(content, 10_000_000, 0) + .create_quote(content, 10_000_000, 0, vec![]) .expect("large size"); assert!(quote.price >= Amount::from(1u64)); } @@ -595,6 +611,7 @@ mod tests { timestamp: SystemTime::now(), price: Amount::from(1u64), rewards_address: RewardsAddress::new([0u8; 20]), + close_group: vec![], pub_key: vec![], signature: vec![], }; diff --git a/src/payment/single_node.rs b/src/payment/single_node.rs index 433ecb5f..f5f0631f 100644 --- a/src/payment/single_node.rs +++ b/src/payment/single_node.rs @@ -258,6 +258,7 @@ mod tests { timestamp: SystemTime::now(), price: Amount::from(1u64), rewards_address: RewardsAddress::new([rewards_addr_seed; 20]), + close_group: vec![], pub_key: vec![], signature: vec![], } @@ -456,6 +457,7 @@ mod tests { timestamp: SystemTime::now(), price: Amount::from(*price), rewards_address: RewardsAddress::new([1u8; 20]), + close_group: vec![], pub_key: vec![], signature: vec![], }; @@ -566,6 +568,7 @@ mod tests { price: Amount::from(*price), #[allow(clippy::cast_possible_truncation)] // i is always < 7 rewards_address: RewardsAddress::new([i as u8 + 1; 20]), + close_group: vec![], pub_key: vec![], signature: vec![], }; @@ -639,6 +642,7 @@ mod tests { timestamp: SystemTime::now(), price, rewards_address: wallet.address(), + close_group: vec![], pub_key: vec![], signature: vec![], }; diff --git a/src/payment/verifier.rs b/src/payment/verifier.rs index b7fc98ff..87aedf05 100644 --- a/src/payment/verifier.rs +++ b/src/payment/verifier.rs @@ -20,6 +20,7 @@ use evmlib::RewardsAddress; use lru::LruCache; use parking_lot::Mutex; use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes; +use std::collections::HashSet; use std::num::NonZeroUsize; use std::time::SystemTime; use tracing::{debug, info}; @@ -77,6 +78,10 @@ pub struct PaymentVerifierConfig { /// Local node's rewards address. /// The verifier rejects payments that don't include this node as a recipient. pub local_rewards_address: RewardsAddress, + /// Local node's peer ID (32-byte BLAKE3 hash of ML-DSA-65 public key). + /// Used to build the full close group view (self + DHT peers) during + /// payment proof validation. + pub local_peer_id: [u8; 32], } /// Status returned by payment verification. @@ -201,6 +206,7 @@ impl PaymentVerifier { &self, xorname: &XorName, payment_proof: Option<&[u8]>, + local_close_group: &[[u8; 32]], ) -> Result { // First check if payment is required let status = self.check_payment_required(xorname); @@ -239,7 +245,8 @@ impl PaymentVerifier { debug!("Proof includes {} transaction hash(es)", tx_hashes.len()); } - self.verify_evm_payment(xorname, &payment).await?; + self.verify_evm_payment(xorname, &payment, local_close_group) + .await?; } None => { let tag = proof.first().copied().unwrap_or(0); @@ -304,7 +311,12 @@ impl PaymentVerifier { /// For unit tests that don't need on-chain verification, pre-populate /// the cache so `verify_payment` returns `CachedAsVerified` before /// reaching this method. - async fn verify_evm_payment(&self, xorname: &XorName, payment: &ProofOfPayment) -> Result<()> { + async fn verify_evm_payment( + &self, + xorname: &XorName, + payment: &ProofOfPayment, + local_close_group: &[[u8; 32]], + ) -> Result<()> { if tracing::enabled!(tracing::Level::DEBUG) { let xorname_hex = hex::encode(xorname); let quote_count = payment.peer_quotes.len(); @@ -316,6 +328,7 @@ impl PaymentVerifier { Self::validate_quote_timestamps(payment)?; Self::validate_peer_bindings(payment)?; self.validate_local_recipient(payment)?; + self.validate_close_group_membership(payment, local_close_group)?; // Verify quote signatures (CPU-bound, run off async runtime) let peer_quotes = payment.peer_quotes.clone(); @@ -672,12 +685,74 @@ impl PaymentVerifier { } Ok(()) } + + /// Verify that **every** peer in the payment proof is a known close group member. + /// + /// Builds the allowed set from: + /// 1. The current DHT close group (from the routing table) + /// 2. This node itself + /// 3. Peers listed in this node's own quote's `close_group` field + /// + /// Source (3) handles routing table churn: the close group may have changed + /// between quote issuance and the PUT arriving, but the node's own signed + /// quote captured its view at quote time, so those peers are still valid. + /// + /// Rejects the proof if ANY peer is unrecognized across all three sources. + /// Skipped when `local_close_group` is empty (unit tests without DHT). + fn validate_close_group_membership( + &self, + payment: &ProofOfPayment, + local_close_group: &[[u8; 32]], + ) -> Result<()> { + if local_close_group.is_empty() { + return Ok(()); + } + + // Build the allowed peer set: current DHT close group + this node. + let mut known_peers: HashSet<[u8; 32]> = local_close_group.iter().copied().collect(); + known_peers.insert(self.config.local_peer_id); + + // Also allow peers that were in this node's close group view at quote + // time. Find our own quote by matching the local rewards address, then + // add its signed close_group entries to the allowed set. + for (_, quote) in &payment.peer_quotes { + if quote.rewards_address == self.config.local_rewards_address { + for peer_id in "e.close_group { + known_peers.insert(*peer_id); + } + break; + } + } + + // Every proof peer must be in the allowed set. + for (_encoded_peer_id, quote) in &payment.peer_quotes { + let peer_id = peer_id_from_public_key_bytes("e.pub_key).map_err(|e| { + Error::Payment(format!("Invalid ML-DSA pub_key in proof quote: {e}")) + })?; + + if !known_peers.contains(peer_id.as_bytes()) { + return Err(Error::Payment(format!( + "Proof peer {} is not in the current close group", + peer_id.to_hex() + ))); + } + } + + debug!( + "Close group membership validated: all {} proof peers recognized", + payment.peer_quotes.len() + ); + Ok(()) + } } #[cfg(test)] #[allow(clippy::expect_used)] mod tests { use super::*; + use evmlib::EncodedPeerId; + use saorsa_core::MlDsa65; + use saorsa_pqc::pqc::MlDsaOperations; /// Create a verifier for unit tests. EVM is always on, but tests can /// pre-populate the cache to bypass on-chain verification. @@ -686,6 +761,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100, local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [1u8; 32], }; PaymentVerifier::new(config) } @@ -719,7 +795,7 @@ mod tests { let xorname = [1u8; 32]; // No proof provided => should return an error (EVM is always on) - let result = verifier.verify_payment(&xorname, None).await; + let result = verifier.verify_payment(&xorname, None, &[]).await; assert!( result.is_err(), "Expected Err without proof, got: {result:?}" @@ -735,7 +811,7 @@ mod tests { verifier.cache.insert(xorname); // Should succeed without payment (cached) - let result = verifier.verify_payment(&xorname, None).await; + let result = verifier.verify_payment(&xorname, None, &[]).await; assert!(result.is_ok()); assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified); } @@ -782,7 +858,9 @@ mod tests { // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1]; - let result = verifier.verify_payment(&xorname, Some(&small_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&small_proof), &[]) + .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -798,7 +876,9 @@ mod tests { // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1]; - let result = verifier.verify_payment(&xorname, Some(&large_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&large_proof), &[]) + .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -815,7 +895,7 @@ mod tests { // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES]; let result = verifier - .verify_payment(&xorname, Some(&boundary_proof)) + .verify_payment(&xorname, Some(&boundary_proof), &[]) .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); @@ -833,7 +913,7 @@ mod tests { // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES]; let result = verifier - .verify_payment(&xorname, Some(&boundary_proof)) + .verify_payment(&xorname, Some(&boundary_proof), &[]) .await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); @@ -851,7 +931,7 @@ mod tests { // Valid tag (0x01) but garbage payload — should fail deserialization let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE]; garbage.extend_from_slice(&[0xAB; 63]); - let result = verifier.verify_payment(&xorname, Some(&garbage)).await; + let result = verifier.verify_payment(&xorname, Some(&garbage), &[]).await; assert!(result.is_err()); let err_msg = format!("{}", result.expect_err("should fail")); assert!( @@ -906,7 +986,7 @@ mod tests { let v = verifier.clone(); handles.push(tokio::spawn(async move { let xorname = [i; 32]; - v.verify_payment(&xorname, None).await + v.verify_payment(&xorname, None, &[]).await })); } @@ -955,7 +1035,9 @@ mod tests { }); let content: XorName = [i; 32]; - let quote = generator.create_quote(content, 4096, 0).expect("quote"); + let quote = generator + .create_quote(content, 4096, 0, vec![]) + .expect("quote"); peer_quotes.push((EncodedPeerId::new(rand::random()), quote)); } @@ -1001,6 +1083,7 @@ mod tests { timestamp: SystemTime::now(), price: Amount::from(1u64), rewards_address: RewardsAddress::new([1u8; 20]), + close_group: vec![], pub_key: vec![0u8; 64], signature: vec![0u8; 64], }; @@ -1019,7 +1102,7 @@ mod tests { let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof"); let result = verifier - .verify_payment(&target_xorname, Some(&proof_bytes)) + .verify_payment(&target_xorname, Some(&proof_bytes), &[]) .await; assert!(result.is_err(), "Should reject mismatched content address"); @@ -1043,6 +1126,7 @@ mod tests { timestamp, price: Amount::from(1u64), rewards_address, + close_group: vec![], pub_key: vec![0u8; 64], signature: vec![0u8; 64], } @@ -1078,7 +1162,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject expired quote"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1107,7 +1193,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject future-timestamped quote"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1136,7 +1224,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; // Should NOT fail at timestamp check (will fail later at pub_key binding) let err_msg = format!("{}", result.expect_err("should fail at later check")); @@ -1165,7 +1255,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!( result.is_err(), @@ -1197,7 +1289,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; // Should NOT fail at timestamp check (will fail later at pub_key binding) let err_msg = format!("{}", result.expect_err("should fail at later check")); @@ -1227,6 +1321,7 @@ mod tests { }, cache_capacity: 100, local_rewards_address: local_addr, + local_peer_id: [0xAAu8; 32], }; let verifier = PaymentVerifier::new(config); @@ -1249,7 +1344,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject payment not addressed to us"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1286,7 +1383,9 @@ mod tests { } let proof_bytes = serialize_proof(peer_quotes); - let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&proof_bytes), &[]) + .await; assert!(result.is_err(), "Should reject wrong peer binding"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1314,7 +1413,7 @@ mod tests { merkle_garbage.extend_from_slice(&[0xAB; 63]); let result = verifier - .verify_payment(&xorname, Some(&merkle_garbage)) + .verify_payment(&xorname, Some(&merkle_garbage), &[]) .await; assert!( @@ -1362,7 +1461,9 @@ mod tests { // verify_payment should process it through the single-node path. // It will fail at quote validation (fake pub_key), but we verify // it passes the deserialization stage by checking the error type. - let result = verifier.verify_payment(&xorname, Some(&tagged_bytes)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_bytes), &[]) + .await; assert!(result.is_err(), "Should fail at quote validation stage"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1524,7 +1625,7 @@ mod tests { let wrong_xorname = [0xFFu8; 32]; let result = verifier - .verify_payment(&wrong_xorname, Some(&tagged_proof)) + .verify_payment(&wrong_xorname, Some(&tagged_proof), &[]) .await; assert!( @@ -1552,7 +1653,9 @@ mod tests { bad_proof.push(0x00); } - let result = verifier.verify_payment(&xorname, Some(&bad_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&bad_proof), &[]) + .await; assert!(result.is_err(), "Should reject malformed merkle body"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1604,6 +1707,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100, local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [1u8; 32], }; let verifier = PaymentVerifier::new(config); @@ -1719,7 +1823,7 @@ mod tests { let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize"); - let result = verifier.verify_payment(&xorname, Some(&tagged)).await; + let result = verifier.verify_payment(&xorname, Some(&tagged), &[]).await; assert!( result.is_err(), @@ -1749,7 +1853,7 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged)).await; + let result = verifier.verify_payment(&xorname, Some(&tagged), &[]).await; assert!( result.is_err(), @@ -1785,7 +1889,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!( result.is_err(), @@ -1820,7 +1926,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!(result.is_err(), "Should reject paid node address mismatch"); let err_msg = format!("{}", result.expect_err("should fail")); @@ -1850,7 +1958,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!( result.is_err(), @@ -1884,7 +1994,9 @@ mod tests { verifier.pool_cache.lock().put(pool_hash, info); } - let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await; + let result = verifier + .verify_payment(&xorname, Some(&tagged_proof), &[]) + .await; assert!( result.is_err(), @@ -1896,4 +2008,162 @@ mod tests { "Error should mention underpayment: {err_msg}" ); } + + // ========================================================================= + // Close-group membership validation tests + // ========================================================================= + + #[test] + fn test_close_group_all_peers_recognised_accepted() { + let ml_dsa = MlDsa65::new(); + let mut peer_quotes = Vec::new(); + let mut close_group_ids: Vec<[u8; 32]> = Vec::new(); + + // Generate CLOSE_GROUP_SIZE peers with real ML-DSA keys. + for _ in 0..CLOSE_GROUP_SIZE { + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + close_group_ids.push(*ant_peer_id.as_bytes()); + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + peer_quotes.push((encoded, quote)); + } + + let payment = ProofOfPayment { peer_quotes }; + + // Verifier whose local_peer_id is NOT one of the proof peers (but that's + // fine — it only needs to be in the known set, and we insert it). + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [0xBBu8; 32], + }; + let verifier = PaymentVerifier::new(config); + + let result = verifier.validate_close_group_membership(&payment, &close_group_ids); + assert!( + result.is_ok(), + "All proof peers are in close group — should accept: {result:?}" + ); + } + + #[test] + fn test_close_group_unknown_peer_rejected() { + let ml_dsa = MlDsa65::new(); + let mut peer_quotes = Vec::new(); + let mut close_group_ids: Vec<[u8; 32]> = Vec::new(); + + // Generate CLOSE_GROUP_SIZE peers; include all but the last in the + // close group so one peer is "unknown". + for i in 0..CLOSE_GROUP_SIZE { + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + + // Only add the first N-1 peers to the close group. + if i < CLOSE_GROUP_SIZE - 1 { + close_group_ids.push(*ant_peer_id.as_bytes()); + } + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + peer_quotes.push((encoded, quote)); + } + + let payment = ProofOfPayment { peer_quotes }; + + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: [0xBBu8; 32], + }; + let verifier = PaymentVerifier::new(config); + + let result = verifier.validate_close_group_membership(&payment, &close_group_ids); + assert!(result.is_err(), "One unknown peer — should reject"); + let err_msg = format!("{}", result.expect_err("should fail")); + assert!( + err_msg.contains("not in the current close group"), + "Error should mention close group: {err_msg}" + ); + } + + #[test] + fn test_close_group_empty_skips_validation() { + // With an empty close group (unit test / no DHT), validation is skipped. + let verifier = create_test_verifier(); + + let quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + let peer_quotes = vec![(EncodedPeerId::new(rand::random()), quote)]; + + let payment = ProofOfPayment { peer_quotes }; + + let result = verifier.validate_close_group_membership(&payment, &[]); + assert!( + result.is_ok(), + "Empty close group should skip validation: {result:?}" + ); + } + + #[test] + fn test_close_group_local_peer_is_implicitly_known() { + let ml_dsa = MlDsa65::new(); + + // Generate a single peer whose BLAKE3 ID we'll set as local_peer_id. + let (public_key, _) = ml_dsa.generate_keypair().expect("keygen"); + let pub_key_bytes = public_key.as_bytes().to_vec(); + let ant_peer_id = + peer_id_from_public_key_bytes(&pub_key_bytes).expect("peer id from pub key"); + + let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes); + let mut quote = make_fake_quote( + [0xAA; 32], + SystemTime::now(), + RewardsAddress::new([1u8; 20]), + ); + quote.pub_key = pub_key_bytes; + + let payment = ProofOfPayment { + peer_quotes: vec![(encoded, quote)], + }; + + // The local_peer_id matches the proof peer, and the close group + // contains at least one entry (so validation isn't skipped) but + // does NOT contain the proof peer — only local_peer_id does. + let config = PaymentVerifierConfig { + evm: EvmVerifierConfig::default(), + cache_capacity: 100, + local_rewards_address: RewardsAddress::new([1u8; 20]), + local_peer_id: *ant_peer_id.as_bytes(), + }; + let verifier = PaymentVerifier::new(config); + + // Close group has a dummy entry so validation isn't skipped. + let dummy_peer = [0xFFu8; 32]; + let result = verifier.validate_close_group_membership(&payment, &[dummy_peer]); + assert!( + result.is_ok(), + "Proof peer matches local_peer_id — should accept: {result:?}" + ); + } } diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 853362b6..f2171586 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -1059,7 +1059,7 @@ async fn handle_fresh_offer( // Gap 1: Validate PoP via PaymentVerifier. match payment_verifier - .verify_payment(&offer.key, Some(&offer.proof_of_payment)) + .verify_payment(&offer.key, Some(&offer.proof_of_payment), &[]) .await { Ok(status) if status.can_store() => { @@ -1174,7 +1174,7 @@ async fn handle_paid_notify( // Gap 1: Validate PoP via PaymentVerifier. match payment_verifier - .verify_payment(¬ify.key, Some(¬ify.proof_of_payment)) + .verify_payment(¬ify.key, Some(¬ify.proof_of_payment), &[]) .await { Ok(status) if status.can_store() => { diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 5eab7094..c2732e7d 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -30,8 +30,8 @@ use crate::ant_protocol::{ ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest, - MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK, - MAX_CHUNK_SIZE, + MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, CLOSE_GROUP_SIZE, + DATA_TYPE_CHUNK, MAX_CHUNK_SIZE, }; use crate::client::compute_address; use crate::error::{Error, Result}; @@ -39,7 +39,9 @@ use crate::payment::{PaymentVerifier, QuoteGenerator}; use crate::replication::fresh::FreshWriteEvent; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; +use saorsa_core::P2PNode; use std::sync::Arc; +use std::sync::OnceLock; use tokio::sync::mpsc; use tracing::{debug, info, warn}; @@ -57,6 +59,10 @@ pub struct AntProtocol { quote_generator: Arc, /// Channel for notifying the replication engine about newly-stored chunks. fresh_write_tx: Option>, + /// P2P node for local close-group lookups during quote and payment + /// validation. Initialised via the constructor or [`set_p2p_node`] when + /// the P2P layer starts after the protocol handler (devnet / test nodes). + p2p_node: OnceLock>, } impl AntProtocol { @@ -67,17 +73,61 @@ impl AntProtocol { /// * `storage` - LMDB storage for chunk persistence /// * `payment_verifier` - Payment verifier for validating payments /// * `quote_generator` - Quote generator for creating storage quotes + /// * `p2p_node` - P2P node for local close-group lookups (`None` in unit tests + /// or when the P2P layer is not yet started — see [`set_p2p_node`]) #[must_use] pub fn new( storage: Arc, payment_verifier: Arc, quote_generator: Arc, + p2p_node: Option>, ) -> Self { + let lock = OnceLock::new(); + if let Some(node) = p2p_node { + // Fresh OnceLock — set cannot fail. + let _ = lock.set(node); + } Self { storage, payment_verifier, quote_generator, fresh_write_tx: None, + p2p_node: lock, + } + } + + /// Inject the P2P node after construction. + /// + /// Used by devnet and test harnesses where the `P2PNode` is created after + /// the `AntProtocol` handler. + /// + /// # Errors + /// + /// Returns the rejected `Arc` if a node was already set. + pub fn set_p2p_node(&self, node: Arc) -> std::result::Result<(), Arc> { + self.p2p_node.set(node) + } + + /// Query the local routing table for the closest peers to `address`. + /// + /// Returns up to `CLOSE_GROUP_SIZE` peer IDs **excluding this node**. + /// The local node is intentionally omitted because `find_closest_nodes_local` + /// filters out self — the caller adds `local_peer_id` separately when + /// building the full close-group set for validation. + /// + /// We request `CLOSE_GROUP_SIZE` (not `CLOSE_GROUP_SIZE - 1`) because this + /// node may not be in the actual close group for the target address — asking + /// for fewer peers could exclude a legitimate member. + async fn local_close_group(&self, address: &[u8; 32]) -> Vec<[u8; 32]> { + match self.p2p_node.get() { + Some(p2p) => p2p + .dht() + .find_closest_nodes_local(address, CLOSE_GROUP_SIZE) + .await + .iter() + .map(|node| *node.peer_id.as_bytes()) + .collect(), + None => Vec::new(), } } @@ -131,7 +181,7 @@ impl AntProtocol { ChunkMessageBody::GetResponse(self.handle_get(req).await) } ChunkMessageBody::QuoteRequest(ref req) => { - ChunkMessageBody::QuoteResponse(self.handle_quote(req)) + ChunkMessageBody::QuoteResponse(self.handle_quote(req).await) } ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => { ChunkMessageBody::MerkleCandidateQuoteResponse( @@ -196,10 +246,17 @@ impl AntProtocol { Ok(false) => {} } - // 4. Verify payment + // 4. Look up local close group for this content address. + let local_close_group = self.local_close_group(&address).await; + + // 5. Verify payment (including close group membership check) let payment_result = self .payment_verifier - .verify_payment(&address, request.payment_proof.as_deref()) + .verify_payment( + &address, + request.payment_proof.as_deref(), + &local_close_group, + ) .await; match payment_result { @@ -216,7 +273,7 @@ impl AntProtocol { } } - // 5. Store chunk + // 6. Store chunk match self.storage.put(&address, &request.content).await { Ok(_) => { let content_len = request.content.len(); @@ -273,7 +330,7 @@ impl AntProtocol { } /// Handle a quote request. - fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { + async fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse { let addr_hex = hex::encode(request.address); let data_size = request.data_size; debug!("Handling quote request for {addr_hex} (size: {data_size})"); @@ -306,10 +363,14 @@ impl AntProtocol { }); } - match self - .quote_generator - .create_quote(request.address, data_size_usize, request.data_type) - { + let close_group = self.local_close_group(&request.address).await; + + match self.quote_generator.create_quote( + request.address, + data_size_usize, + request.data_type, + close_group, + ) { Ok(quote) => { // Serialize the quote match rmp_serde::to_vec("e) { @@ -455,6 +516,7 @@ mod tests { evm: EvmVerifierConfig::default(), cache_capacity: 100_000, local_rewards_address: rewards_address, + local_peer_id: [1u8; 32], }; let payment_verifier = Arc::new(PaymentVerifier::new(payment_config)); let metrics_tracker = QuotingMetricsTracker::new(100); @@ -473,7 +535,7 @@ mod tests { .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec()) }); - let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator)); + let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator), None); (protocol, temp_dir) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 07059777..6bad30b1 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -38,7 +38,7 @@ //! let storage = Arc::new(LmdbStorage::new(config).await?); //! //! // Create protocol handler -//! let protocol = AntProtocol::new(storage, Arc::new(payment_verifier), Arc::new(quote_generator)); +//! let protocol = AntProtocol::new(storage, Arc::new(payment_verifier), Arc::new(quote_generator), None); //! //! // Register with saorsa-core //! listener.register_protocol(protocol).await?; diff --git a/tests/e2e/data_types/chunk.rs b/tests/e2e/data_types/chunk.rs index b47d9c55..fd130b17 100644 --- a/tests/e2e/data_types/chunk.rs +++ b/tests/e2e/data_types/chunk.rs @@ -443,6 +443,7 @@ mod tests { evm: EvmVerifierConfig { network }, cache_capacity: 100, local_rewards_address: rewards_address, + local_peer_id: [0x01; 32], }); let metrics_tracker = QuotingMetricsTracker::new(100); let quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker); @@ -451,6 +452,7 @@ mod tests { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, ); Ok((protocol, temp_dir, testnet)) diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index c77f9326..f5b82c20 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -1090,6 +1090,7 @@ impl TestNetwork { }, cache_capacity: TEST_PAYMENT_CACHE_CAPACITY, local_rewards_address: rewards_address, + local_peer_id: *identity.peer_id().as_bytes(), }; let payment_verifier = PaymentVerifier::new(payment_config); @@ -1124,6 +1125,7 @@ impl TestNetwork { Arc::new(storage), Arc::new(payment_verifier), Arc::new(quote_generator), + None, )) } @@ -1166,6 +1168,13 @@ impl TestNetwork { // Start protocol handler that routes incoming P2P messages to AntProtocol if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) { + // Inject P2P node into protocol handler for close-group lookups. + protocol.set_p2p_node(Arc::clone(p2p)).map_err(|_| { + TestnetError::Startup(format!( + "P2P node already set on protocol handler for node {}", + node.index, + )) + })?; let mut events = p2p.subscribe_events(); let p2p_clone = Arc::clone(p2p); let protocol_clone = Arc::clone(protocol);