From 102e8a04decf4069d245bf5817f0595ffe274791 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 26 Feb 2026 09:03:17 +0100 Subject: [PATCH 1/8] Clarify that each pending monitor update ID must be marked complete The previous wording implied that persisting a full ChannelMonitor would automatically resolve all pending updates. Reword to make clear that each update ID still needs to be individually marked complete via channel_monitor_updated, even after a full monitor persistence. Co-Authored-By: Claude Opus 4.6 --- lightning/src/chain/chainmonitor.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7db1b697c2b..74e5e03d07f 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -83,8 +83,10 @@ use core::sync::atomic::{AtomicUsize, Ordering}; /// the background with [`ChainMonitor::list_pending_monitor_updates`] and /// [`ChainMonitor::get_monitor`]. /// -/// Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can -/// be marked as complete via [`ChainMonitor::channel_monitor_updated`]. +/// Each pending update must be individually marked as complete by calling +/// [`ChainMonitor::channel_monitor_updated`] with the corresponding update ID. Note that +/// persisting a full [`ChannelMonitor`] covers all prior updates, but each update ID still +/// needs to be marked complete separately. /// /// If at some point no further progress can be made towards persisting the pending updates, the /// node should simply shut down. From 705d1fe3dd10bd13d77abe522fb23b1f78b998e1 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 25 Feb 2026 09:01:13 +0100 Subject: [PATCH 2/8] Extract shared dummy_monitor helper to test_utils Extract the ChannelMonitor construction boilerplate from channelmonitor test functions into a reusable dummy_monitor helper in test_utils.rs, generic over the signer type. 
Co-Authored-By: Claude Opus 4.6 --- lightning/src/chain/channelmonitor.rs | 97 ++------------------------- lightning/src/ln/chan_utils.rs | 2 +- lightning/src/util/test_utils.rs | 70 +++++++++++++++++++ 3 files changed, 76 insertions(+), 93 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index a8d055a9c5b..eb763de1d5c 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -6769,23 +6769,16 @@ mod tests { weight_revoked_received_htlc, WEIGHT_REVOKED_OUTPUT, }; use crate::chain::transaction::OutPoint; - use crate::chain::{BestBlock, Confirm}; + use crate::chain::Confirm; use crate::io; - use crate::ln::chan_utils::{ - self, ChannelPublicKeys, ChannelTransactionParameters, - CounterpartyChannelTransactionParameters, HTLCOutputInCommitment, - HolderCommitmentTransaction, - }; + use crate::ln::chan_utils::{self, HTLCOutputInCommitment, HolderCommitmentTransaction}; use crate::ln::channel_keys::{ - DelayedPaymentBasepoint, DelayedPaymentKey, HtlcBasepoint, RevocationBasepoint, - RevocationKey, + DelayedPaymentBasepoint, DelayedPaymentKey, RevocationBasepoint, RevocationKey, }; use crate::ln::channelmanager::{HTLCSource, PaymentId}; use crate::ln::functional_test_utils::*; use crate::ln::outbound_payment::RecipientOnionFields; - use crate::ln::script::ShutdownScript; use crate::ln::types::ChannelId; - use crate::sign::{ChannelSigner, InMemorySigner}; use crate::sync::Arc; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::{PaymentHash, PaymentPreimage}; @@ -6955,51 +6948,11 @@ mod tests { } } - let keys = InMemorySigner::new( - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - true, - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - [41; 32], - [0; 32], - [0; 32], - ); - - let 
counterparty_pubkeys = ChannelPublicKeys { - funding_pubkey: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[44; 32]).unwrap()), - revocation_basepoint: RevocationBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[45; 32]).unwrap())), - payment_point: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[46; 32]).unwrap()), - delayed_payment_basepoint: DelayedPaymentBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[47; 32]).unwrap())), - htlc_basepoint: HtlcBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[48; 32]).unwrap())) - }; let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; let channel_id = ChannelId::v1_from_funding_outpoint(funding_outpoint); - let channel_parameters = ChannelTransactionParameters { - holder_pubkeys: keys.pubkeys(&secp_ctx), - holder_selected_contest_delay: 66, - is_outbound_from_holder: true, - counterparty_parameters: Some(CounterpartyChannelTransactionParameters { - pubkeys: counterparty_pubkeys, - selected_contest_delay: 67, - }), - funding_outpoint: Some(funding_outpoint), - splice_parent_funding_txid: None, - channel_type_features: ChannelTypeFeatures::only_static_remote_key(), - channel_value_satoshis: 0, - }; // Prune with one old state and a holder commitment tx holding a few overlaps with the // old state. 
- let shutdown_pubkey = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(shutdown_pubkey); - let best_block = BestBlock::from_network(Network::Testnet); - let monitor = ChannelMonitor::new( - Secp256k1::new(), keys, Some(shutdown_script.into_inner()), 0, &ScriptBuf::new(), - &channel_parameters, true, 0, HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), - best_block, dummy_key, channel_id, false, - ); + let monitor = crate::util::test_utils::dummy_monitor(channel_id, |keys| keys); let nondust_htlcs = preimages_slice_to_htlcs!(preimages[0..10]); let dummy_commitment_tx = HolderCommitmentTransaction::dummy(0, funding_outpoint, nondust_htlcs); @@ -7218,49 +7171,9 @@ mod tests { let dummy_key = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let keys = InMemorySigner::new( - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - true, - SecretKey::from_slice(&[41; 32]).unwrap(), - SecretKey::from_slice(&[41; 32]).unwrap(), - [41; 32], - [0; 32], - [0; 32], - ); - - let counterparty_pubkeys = ChannelPublicKeys { - funding_pubkey: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[44; 32]).unwrap()), - revocation_basepoint: RevocationBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[45; 32]).unwrap())), - payment_point: PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[46; 32]).unwrap()), - delayed_payment_basepoint: DelayedPaymentBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[47; 32]).unwrap())), - htlc_basepoint: HtlcBasepoint::from(PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[48; 32]).unwrap())), - }; let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; let channel_id 
= ChannelId::v1_from_funding_outpoint(funding_outpoint); - let channel_parameters = ChannelTransactionParameters { - holder_pubkeys: keys.pubkeys(&secp_ctx), - holder_selected_contest_delay: 66, - is_outbound_from_holder: true, - counterparty_parameters: Some(CounterpartyChannelTransactionParameters { - pubkeys: counterparty_pubkeys, - selected_contest_delay: 67, - }), - funding_outpoint: Some(funding_outpoint), - splice_parent_funding_txid: None, - channel_type_features: ChannelTypeFeatures::only_static_remote_key(), - channel_value_satoshis: 0, - }; - let shutdown_pubkey = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); - let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(shutdown_pubkey); - let best_block = BestBlock::from_network(Network::Testnet); - let monitor = ChannelMonitor::new( - Secp256k1::new(), keys, Some(shutdown_script.into_inner()), 0, &ScriptBuf::new(), - &channel_parameters, true, 0, HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), - best_block, dummy_key, channel_id, false, - ); + let monitor = crate::util::test_utils::dummy_monitor(channel_id, |keys| keys); let chan_id = monitor.inner.lock().unwrap().channel_id(); let payment_hash = PaymentHash([1; 32]); diff --git a/lightning/src/ln/chan_utils.rs b/lightning/src/ln/chan_utils.rs index 4bb8ffac9ef..020e57d6c41 100644 --- a/lightning/src/ln/chan_utils.rs +++ b/lightning/src/ln/chan_utils.rs @@ -1344,7 +1344,7 @@ impl_writeable_tlv_based!(HolderCommitmentTransaction, { }); impl HolderCommitmentTransaction { - #[cfg(test)] + #[cfg(any(test, feature = "_test_utils"))] #[rustfmt::skip] pub fn dummy(channel_value_satoshis: u64, funding_outpoint: chain::transaction::OutPoint, nondust_htlcs: Vec) -> Self { let secp_ctx = Secp256k1::new(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 22be4367c7a..c74f2ca8223 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ 
-23,6 +23,11 @@ use crate::chain::transaction::OutPoint; use crate::chain::WatchedOutput; #[cfg(any(test, feature = "_externalize_tests"))] use crate::ln::chan_utils::CommitmentTransaction; +use crate::ln::chan_utils::{ + ChannelPublicKeys, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, + HolderCommitmentTransaction, +}; +use crate::ln::channel_keys::{DelayedPaymentBasepoint, HtlcBasepoint, RevocationBasepoint}; use crate::ln::channel_state::ChannelDetails; use crate::ln::channelmanager; use crate::ln::inbound_payment::ExpandedKey; @@ -44,9 +49,11 @@ use crate::routing::router::{ }; use crate::routing::scoring::{ChannelUsage, ScoreLookUp, ScoreUpdate}; use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult}; +use crate::sign::InMemorySigner; use crate::sign::{self, ReceiveAuthKey}; use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; +use crate::types::features::ChannelTypeFeatures; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use crate::util::async_poll::MaybeSend; use crate::util::config::UserConfig; @@ -2336,3 +2343,66 @@ impl WalletSourceSync for TestWalletSource { self.sign_tx(tx).map_err(|_| ()) } } + +/// Creates a minimal `ChannelMonitor` for testing purposes. +/// +/// The `wrap_signer` closure converts the raw `InMemorySigner` into the desired signer type +/// (e.g. wrapping it in `TestChannelSigner` or passing it through unchanged). 
+pub fn dummy_monitor( + channel_id: ChannelId, wrap_signer: impl FnOnce(InMemorySigner) -> S, +) -> crate::chain::channelmonitor::ChannelMonitor { + let secp_ctx = Secp256k1::new(); + let dummy_key = + PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let keys = InMemorySigner::new( + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + true, + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + [41; 32], + [0; 32], + [0; 32], + ); + let counterparty_pubkeys = ChannelPublicKeys { + funding_pubkey: dummy_key, + revocation_basepoint: RevocationBasepoint::from(dummy_key), + payment_point: dummy_key, + delayed_payment_basepoint: DelayedPaymentBasepoint::from(dummy_key), + htlc_basepoint: HtlcBasepoint::from(dummy_key), + }; + let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; + let channel_parameters = ChannelTransactionParameters { + holder_pubkeys: keys.pubkeys(&secp_ctx), + holder_selected_contest_delay: 66, + is_outbound_from_holder: true, + counterparty_parameters: Some(CounterpartyChannelTransactionParameters { + pubkeys: counterparty_pubkeys, + selected_contest_delay: 67, + }), + funding_outpoint: Some(funding_outpoint), + splice_parent_funding_txid: None, + channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_value_satoshis: 0, + }; + let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(dummy_key); + let best_block = crate::chain::BestBlock::from_network(Network::Testnet); + let signer = wrap_signer(keys); + ChannelMonitor::new( + secp_ctx, + signer, + Some(shutdown_script.into_inner()), + 0, + &ScriptBuf::new(), + &channel_parameters, + true, + 0, + HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), + best_block, + dummy_key, + channel_id, + false, + ) +} From 
c6153db78c0ecee268bdd4cd783d954b2f05b49e Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 13:55:24 +0100 Subject: [PATCH 3/8] Extract watch_channel_internal/update_channel_internal from Watch impl Pure refactor: move the bodies of Watch::watch_channel and Watch::update_channel into methods on ChainMonitor, and have the Watch trait methods delegate to them. This prepares for adding deferred mode where the Watch methods will conditionally queue operations instead of executing them immediately. Co-Authored-By: Claude Opus 4.6 --- lightning/src/chain/chainmonitor.rs | 300 +++++++++++++++------------- 1 file changed, 156 insertions(+), 144 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 74e5e03d07f..17f79528b07 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1060,6 +1060,160 @@ where Ok(ChannelMonitorUpdateStatus::Completed) } + + fn watch_channel_internal( + &self, channel_id: ChannelId, monitor: ChannelMonitor, + ) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + let mut monitors = self.monitors.write().unwrap(); + let entry = match monitors.entry(channel_id) { + hash_map::Entry::Occupied(_) => { + log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); + return Err(()); + }, + hash_map::Entry::Vacant(e) => e, + }; + log_trace!(logger, "Got new ChannelMonitor"); + let update_id = monitor.get_latest_update_id(); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!(logger, "Persistence of new ChannelMonitor in progress",); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!(logger, "Persistence of new ChannelMonitor completed",); + }, + 
ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + if let Some(ref chain_source) = self.chain_source { + monitor.load_outputs_to_watch(chain_source, &self.logger); + } + entry.insert(MonitorHolder { + monitor, + pending_monitor_updates: Mutex::new(pending_monitor_updates), + }); + Ok(persist_res) + } + + fn update_channel_internal( + &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + debug_assert_eq!(update.channel_id.unwrap(), channel_id); + // Update the monitor that watches the channel referred to by the given outpoint. + let monitors = self.monitors.read().unwrap(); + match monitors.get(&channel_id) { + None => { + let logger = WithContext::from(&self.logger, None, Some(channel_id), None); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); + + // We should never ever trigger this from within ChannelManager. Technically a + // user could use this object with some proxying in between which makes this + // possible, but in tests and fuzzing, this should be a panic. 
+ #[cfg(debug_assertions)] + panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress + }, + Some(monitor_state) => { + let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); + + // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we + // have well-ordered updates from the users' point of view. See the + // `pending_monitor_updates` docs for more. + let mut pending_monitor_updates = + monitor_state.pending_monitor_updates.lock().unwrap(); + let update_res = monitor.update_monitor( + update, + &self.broadcaster, + &self.fee_estimator, + &self.logger, + ); + + let update_id = update.update_id; + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(logger, "Failed to update ChannelMonitor. 
Going ahead and persisting the entire ChannelMonitor"); + self.persister.update_persisted_channel( + monitor.persistence_key(), + None, + monitor, + ) + } else { + self.persister.update_persisted_channel( + monitor.persistence_key(), + Some(update), + monitor, + ) + }; + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + pending_monitor_updates.push(update_id); + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} in progress", + update_id, + ); + }, + ChannelMonitorUpdateStatus::Completed => { + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} completed", + update_id, + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + + // We may need to start monitoring for any alternative funding transactions. 
+ if let Some(ref chain_source) = self.chain_source { + for (funding_outpoint, funding_script) in + update.internal_renegotiated_funding_data() + { + log_trace!( + logger, + "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", + funding_outpoint + ); + chain_source.register_tx(&funding_outpoint.txid, &funding_script); + chain_source.register_output(WatchedOutput { + block_hash: None, + outpoint: funding_outpoint, + script_pubkey: funding_script, + }); + } + } + + if update_res.is_err() { + ChannelMonitorUpdateStatus::InProgress + } else { + persist_res + } + }, + } + } } impl< @@ -1274,155 +1428,13 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - let mut monitors = self.monitors.write().unwrap(); - let entry = match monitors.entry(channel_id) { - hash_map::Entry::Occupied(_) => { - log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); - return Err(()); - }, - hash_map::Entry::Vacant(e) => e, - }; - log_trace!(logger, "Got new ChannelMonitor"); - let update_id = monitor.get_latest_update_id(); - let mut pending_monitor_updates = Vec::new(); - let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - log_info!(logger, "Persistence of new ChannelMonitor in progress",); - pending_monitor_updates.push(update_id); - }, - ChannelMonitorUpdateStatus::Completed => { - log_info!(logger, "Persistence of new ChannelMonitor completed",); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source, &self.logger); - } - entry.insert(MonitorHolder { - monitor, - pending_monitor_updates: Mutex::new(pending_monitor_updates), - }); - Ok(persist_res) + self.watch_channel_internal(channel_id, monitor) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those - // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. - debug_assert_eq!(update.channel_id.unwrap(), channel_id); - // Update the monitor that watches the channel referred to by the given outpoint. - let monitors = self.monitors.read().unwrap(); - match monitors.get(&channel_id) { - None => { - let logger = WithContext::from(&self.logger, None, Some(channel_id), None); - log_error!(logger, "Failed to update channel monitor: no such monitor registered"); - - // We should never ever trigger this from within ChannelManager. Technically a - // user could use this object with some proxying in between which makes this - // possible, but in tests and fuzzing, this should be a panic. - #[cfg(debug_assertions)] - panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress - }, - Some(monitor_state) => { - let monitor = &monitor_state.monitor; - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); - - // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we - // have well-ordered updates from the users' point of view. See the - // `pending_monitor_updates` docs for more. 
- let mut pending_monitor_updates = - monitor_state.pending_monitor_updates.lock().unwrap(); - let update_res = monitor.update_monitor( - update, - &self.broadcaster, - &self.fee_estimator, - &self.logger, - ); - - let update_id = update.update_id; - let persist_res = if update_res.is_err() { - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. Therefore, we should persist the updated monitor despite the error. - // We don't want to persist a `monitor_update` which results in a failure to apply later - // while reading `channel_monitor` with updates from storage. Instead, we should persist - // the entire `channel_monitor` here. - log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); - self.persister.update_persisted_channel( - monitor.persistence_key(), - None, - monitor, - ) - } else { - self.persister.update_persisted_channel( - monitor.persistence_key(), - Some(update), - monitor, - ) - }; - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - pending_monitor_updates.push(update_id); - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} in progress", - update_id, - ); - }, - ChannelMonitorUpdateStatus::Completed => { - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} completed", - update_id, - ); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(pending_monitor_updates); - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - - // We may need to start monitoring for any alternative funding transactions. 
- if let Some(ref chain_source) = self.chain_source { - for (funding_outpoint, funding_script) in - update.internal_renegotiated_funding_data() - { - log_trace!( - logger, - "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", - funding_outpoint - ); - chain_source.register_tx(&funding_outpoint.txid, &funding_script); - chain_source.register_output(WatchedOutput { - block_hash: None, - outpoint: funding_outpoint, - script_pubkey: funding_script, - }); - } - } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } - }, - } + self.update_channel_internal(channel_id, update) } fn release_pending_monitor_events( From c58fd174f0c3a420eb2d28c6d02f7ef59e5a8da8 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:46:35 +0100 Subject: [PATCH 4/8] Add deferred bool to ChainMonitor Add a `deferred` parameter to `ChainMonitor::new` and `ChainMonitor::new_async_beta`. When set to true, the Watch trait methods (watch_channel and update_channel) will unimplemented!() for now. All existing callers pass false to preserve current behavior. 
Co-Authored-By: Claude Opus 4.6 --- fuzz/src/chanmon_consistency.rs | 1 + fuzz/src/full_stack.rs | 1 + fuzz/src/lsps_message.rs | 1 + lightning/src/chain/chainmonitor.rs | 21 +++++++++++++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 1 + lightning/src/ln/channelmanager.rs | 4 ++-- lightning/src/util/test_utils.rs | 1 + 7 files changed, 24 insertions(+), 6 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 476362324ad..c4389d72b9a 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -282,6 +282,7 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), + false, )), logger, keys, diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 03d5e48a014..57fad600fac 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -603,6 +603,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let network = Network::Bitcoin; diff --git a/fuzz/src/lsps_message.rs b/fuzz/src/lsps_message.rs index 8371d1c5fc7..a4c4108a6cc 100644 --- a/fuzz/src/lsps_message.rs +++ b/fuzz/src/lsps_message.rs @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 17f79528b07..99f792fc531 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -373,6 +373,9 @@ pub struct ChainMonitor< #[cfg(peer_storage)] our_peerstorage_encryption_key: PeerStorageKey, + + /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. 
+ deferred: bool, } impl< @@ -399,7 +402,7 @@ where pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, - _our_peerstorage_encryption_key: PeerStorageKey, + _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { let event_notifier = Arc::new(Notifier::new()); Self { @@ -416,6 +419,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } } @@ -605,7 +609,7 @@ where /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub fn new( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P, - _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, + _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { Self { monitors: RwLock::new(new_hash_map()), @@ -621,6 +625,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } @@ -1428,13 +1433,21 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - self.watch_channel_internal(channel_id, monitor) + if !self.deferred { + return self.watch_channel_internal(channel_id, monitor); + } + + unimplemented!(); } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - self.update_channel_internal(channel_id, update) + if !self.deferred { + return self.update_channel_internal(channel_id, update); + } + + unimplemented!(); } fn release_pending_monitor_events( diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index cd32d219b93..03632913383 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4927,6 +4927,7 @@ fn 
native_async_persist() { native_async_persister, Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, ); // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed` diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6bf04cd62a4..c8c03c40e28 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -21242,7 +21242,7 @@ pub mod bench { let seed_a = [1u8; 32]; let keys_manager_a = KeysManager::new(&seed_a, 42, 42, true); - let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key()); + let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key(), false); let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &message_router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), @@ -21252,7 +21252,7 @@ pub mod bench { let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); let seed_b = [2u8; 32]; let keys_manager_b = KeysManager::new(&seed_b, 42, 42, true); - let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key()); + let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key(), false); let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &message_router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), diff --git a/lightning/src/util/test_utils.rs 
b/lightning/src/util/test_utils.rs index c74f2ca8223..a6790d4a352 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -543,6 +543,7 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), + false, ), keys_manager, expect_channel_force_closed: Mutex::new(None), From d25dd01056186f64a82af945d46995101c1ed25b Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:48:31 +0100 Subject: [PATCH 5/8] Implement deferred monitor write queueing and flushing Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward - ensuring monitor writes happen in the correct order relative to manager persistence. 
Key changes: - Add PendingMonitorOp enum and pending_ops queue to ChainMonitor - Implement flush() and pending_operation_count() public methods - Integrate flush calls in BackgroundProcessor (both sync and async) - Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility - Add create_node_cfgs_deferred for deferred-mode test networks - Add unit tests for queue/flush mechanics and full payment flow Co-Authored-By: Claude Opus 4.6 --- lightning-background-processor/src/lib.rs | 81 +++++- lightning/src/chain/chainmonitor.rs | 316 +++++++++++++++++++++- lightning/src/ln/functional_test_utils.rs | 44 ++- lightning/src/util/test_utils.rs | 59 +++- 4 files changed, 485 insertions(+), 15 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index da415c70a32..33f5c83b971 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1120,7 +1120,15 @@ where let mut futures = Joiner::new(); + // We capture pending_operation_count inside the persistence branch to + // avoid a race: ChannelManager handlers queue deferred monitor ops + // before the persistence flag is set. Capturing outside would let us + // observe pending ops while the flag is still unset, causing us to + // flush monitor writes without persisting the ChannelManager. + let mut pending_monitor_writes = 0; + if channel_manager.get_cm().get_and_clear_needs_persistence() { + pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); log_trace!(logger, "Persisting ChannelManager..."); let fut = async { @@ -1317,6 +1325,10 @@ where res?; } + // Flush monitor operations that were pending before we persisted. New updates + // that arrived after are left for the next iteration. 
+ chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1373,6 +1385,7 @@ where // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1381,6 +1394,10 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush monitor operations that were pending before final persistence. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store .write( @@ -1684,7 +1701,17 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + + // We capture pending_operation_count inside the persistence + // branch to avoid a race: ChannelManager handlers queue + // deferred monitor ops before the persistence flag is set. + // Capturing outside would let us observe pending ops while the + // flag is still unset, causing us to flush monitor writes + // without persisting the ChannelManager. + let mut pending_monitor_writes = 0; + if channel_manager.get_cm().get_and_clear_needs_persistence() { + pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1695,6 +1722,10 @@ impl BackgroundProcessor { log_trace!(logger, "Done persisting ChannelManager."); } + // Flush monitor operations that were pending before we persisted. New + // updates that arrived after are left for the next iteration. 
+ chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(liquidity_manager) = liquidity_manager.as_ref() { log_trace!(logger, "Persisting LiquidityManager..."); let _ = liquidity_manager.get_lm().persist().map_err(|e| { @@ -1809,12 +1840,17 @@ impl BackgroundProcessor { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush monitor operations that were pending before final persistence. + chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1896,9 +1932,10 @@ mod tests { use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::{Amount, ScriptBuf, Txid}; use core::sync::atomic::{AtomicBool, Ordering}; + use lightning::chain::chainmonitor; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::OutPoint; - use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; + use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::{Event, PathFailure, ReplayEvent}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ @@ -2444,6 +2481,7 @@ mod tests { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + true, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -2567,6 +2605,8 @@ mod tests { (persist_dir, nodes) } + /// Opens a channel between two nodes without a running `BackgroundProcessor`, + /// so deferred monitor 
operations are flushed manually at each step. macro_rules! open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ begin_open_channel!($node_a, $node_b, $channel_value); @@ -2582,12 +2622,19 @@ mod tests { tx.clone(), ) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no + // deferred op is queued and FundingCreated is available immediately. let msg_a = get_event_msg!( $node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id() ); $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a); + // Flush node_b's new monitor (watch_channel) so it releases the + // FundingSigned message. + $node_b + .chain_monitor + .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger); get_event!($node_b, Event::ChannelPending); let msg_b = get_event_msg!( $node_b, @@ -2595,6 +2642,11 @@ mod tests { $node_a.node.get_our_node_id() ); $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b); + // Flush node_a's new monitor (watch_channel) queued by + // handle_funding_signed. + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger); get_event!($node_a, Event::ChannelPending); tx }}; @@ -2720,6 +2772,20 @@ mod tests { confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); } + /// Waits until the background processor has flushed all pending deferred monitor + /// operations for the given node. Panics if the pending count does not reach zero + /// within `EVENT_DEADLINE`. 
+ fn wait_for_flushed(chain_monitor: &ChainMonitor) { + let start = std::time::Instant::now(); + while chain_monitor.pending_operation_count() > 0 { + assert!( + start.elapsed() < EVENT_DEADLINE, + "Pending monitor operations were not flushed within deadline" + ); + std::thread::sleep(Duration::from_millis(10)); + } + } + #[test] fn test_background_processor() { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with @@ -3060,11 +3126,21 @@ mod tests { .node .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone()) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no deferred op is + // queued and the FundingCreated message is available immediately. let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id); nodes[1].node.handle_funding_created(node_0_id, &msg_0); + // Node 1 has no bg processor, flush its new monitor (watch_channel) manually so + // events and FundingSigned are released. + nodes[1] + .chain_monitor + .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger); get_event!(nodes[1], Event::ChannelPending); let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id); nodes[0].node.handle_funding_signed(node_1_id, &msg_1); + // Wait for the bg processor to flush the new monitor (watch_channel) queued by + // handle_funding_signed. + wait_for_flushed(&nodes[0].chain_monitor); channel_pending_recv .recv_timeout(EVENT_DEADLINE) .expect("ChannelPending not handled within deadline"); @@ -3125,6 +3201,9 @@ mod tests { error_message.to_string(), ) .unwrap(); + // Wait for the bg processor to flush the monitor update triggered by force close + // so the commitment tx is broadcast. 
+ wait_for_flushed(&nodes[0].chain_monitor); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 99f792fc531..c3811c5f42a 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -60,12 +60,21 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; +use alloc::collections::VecDeque; use alloc::sync::Arc; #[cfg(peer_storage)] use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. +enum PendingMonitorOp { + /// A new monitor to insert and persist. + NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor }, + /// An update to apply and persist. + Update { channel_id: ChannelId, update: ChannelMonitorUpdate }, +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// @@ -376,6 +385,8 @@ pub struct ChainMonitor< /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. deferred: bool, + /// Queued monitor operations awaiting flush. Unused when `deferred` is `false`. + pending_ops: Mutex>>, } impl< @@ -398,6 +409,18 @@ where /// /// Note that async monitor updating is considered beta, and bugs may be triggered by its use. /// + /// When `deferred` is `true`, [`chain::Watch::watch_channel`] and + /// [`chain::Watch::update_channel`] calls are not executed immediately. Instead, they are + /// queued internally and must be flushed by the caller via [`Self::flush`]. 
Use + /// [`Self::pending_operation_count`] to check how many operations are queued, then call + /// [`Self::flush`] to process them. This allows the caller to ensure that the + /// [`ChannelManager`] is persisted before its associated monitors, avoiding the risk of + /// force closures from a crash between monitor and channel manager persistence. + /// + /// When `deferred` is `false`, monitor operations are executed inline as usual. + /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + /// /// This is not exported to bindings users as async is not supported outside of Rust. pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, @@ -420,6 +443,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } } @@ -604,6 +628,16 @@ where /// is obtained by the [`ChannelManager`] through [`NodeSigner`] to decrypt peer backups. /// Using an inconsistent or incorrect key will result in the inability to decrypt previously encrypted backups. /// + /// When `deferred` is `true`, [`chain::Watch::watch_channel`] and + /// [`chain::Watch::update_channel`] calls are not executed immediately. Instead, they are + /// queued internally and must be flushed by the caller via [`Self::flush`]. Use + /// [`Self::pending_operation_count`] to check how many operations are queued, then call + /// [`Self::flush`] to process them. This allows the caller to ensure that the + /// [`ChannelManager`] is persisted before its associated monitors, avoiding the risk of + /// force closures from a crash between monitor and channel manager persistence. + /// + /// When `deferred` is `false`, monitor operations are executed inline as usual. 
+ /// /// [`NodeSigner`]: crate::sign::NodeSigner /// [`NodeSigner::get_peer_storage_key`]: crate::sign::NodeSigner::get_peer_storage_key /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager @@ -626,6 +660,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } @@ -1219,6 +1254,87 @@ where }, } } + + /// Returns the number of pending monitor operations queued for later execution. + /// + /// When the `ChainMonitor` is constructed with `deferred` set to `true`, + /// [`chain::Watch::watch_channel`] and [`chain::Watch::update_channel`] calls are queued + /// instead of being executed immediately. Call this method to determine how many operations + /// are waiting, then pass the result to [`Self::flush`] to process them. + pub fn pending_operation_count(&self) -> usize { + self.pending_ops.lock().unwrap().len() + } + + /// Flushes the first `count` pending monitor operations that were queued while the + /// `ChainMonitor` operates in deferred mode. `count` must not exceed the number of + /// pending operations returned by [`Self::pending_operation_count`]. + /// + /// A typical usage pattern is to call [`Self::pending_operation_count`], persist the + /// [`ChannelManager`], then pass the count to this method to flush the queued operations. 
+ /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + pub fn flush(&self, count: usize, logger: &L) { + if count > 0 { + log_info!(logger, "Flushing up to {} monitor operations", count); + } + for _ in 0..count { + let mut queue = self.pending_ops.lock().unwrap(); + let op = match queue.pop_front() { + Some(op) => op, + None => { + debug_assert!(false, "flush count exceeded queue length"); + return; + }, + }; + + let (channel_id, update_id, status) = match op { + PendingMonitorOp::NewMonitor { channel_id, monitor } => { + let logger = WithChannelMonitor::from(logger, &monitor, None); + let update_id = monitor.get_latest_update_id(); + log_trace!(logger, "Flushing new monitor"); + // Hold `pending_ops` across the internal call so that + // `watch_channel` (which checks `monitors` + `pending_ops` + // atomically) cannot race with this insertion. + match self.watch_channel_internal(channel_id, monitor) { + Ok(status) => { + drop(queue); + (channel_id, update_id, status) + }, + Err(()) => { + // `watch_channel` checks both `pending_ops` and `monitors` + // for duplicates before queueing, so this is unreachable. + unreachable!(); + }, + } + }, + PendingMonitorOp::Update { channel_id, update } => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + log_trace!(logger, "Flushing monitor update {}", update.update_id); + // Release `pending_ops` before the internal call so that + // concurrent `update_channel` queuing is not blocked. 
+ drop(queue); + let update_id = update.update_id; + let status = self.update_channel_internal(channel_id, &update); + (channel_id, update_id, status) + }, + }; + + match status { + ChannelMonitorUpdateStatus::Completed => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + if let Err(e) = self.channel_monitor_updated(channel_id, update_id) { + log_error!(logger, "channel_monitor_updated failed: {:?}", e); + } + }, + ChannelMonitorUpdateStatus::InProgress => {}, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Neither watch_channel_internal nor update_channel_internal + // return UnrecoverableError. + unreachable!(); + }, + } + } + } } impl< @@ -1437,7 +1553,22 @@ where return self.watch_channel_internal(channel_id, monitor); } - unimplemented!(); + // Atomically check for duplicates in both the pending queue and the + // flushed monitor set. + let mut pending_ops = self.pending_ops.lock().unwrap(); + let monitors = self.monitors.read().unwrap(); + if monitors.contains_key(&channel_id) { + return Err(()); + } + let already_pending = pending_ops.iter().any(|op| match op { + PendingMonitorOp::NewMonitor { channel_id: id, .. 
} => *id == channel_id, + _ => false, + }); + if already_pending { + return Err(()); + } + pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor }); + Ok(ChannelMonitorUpdateStatus::InProgress) } fn update_channel( @@ -1447,7 +1578,9 @@ where return self.update_channel_internal(channel_id, update); } - unimplemented!(); + let mut pending_ops = self.pending_ops.lock().unwrap(); + pending_ops.push_back(PendingMonitorOp::Update { channel_id, update: update.clone() }); + ChannelMonitorUpdateStatus::InProgress } fn release_pending_monitor_events( @@ -1577,12 +1710,22 @@ where #[cfg(test)] mod tests { - use crate::chain::channelmonitor::ANTI_REORG_DELAY; + use super::ChainMonitor; + use crate::chain::channelmonitor::{ChannelMonitorUpdate, ANTI_REORG_DELAY}; use crate::chain::{ChannelMonitorUpdateStatus, Watch}; use crate::events::{ClosureReason, Event}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent}; + use crate::ln::types::ChannelId; + use crate::sign::NodeSigner; + use crate::util::dyn_signer::DynSigner; + use crate::util::test_channel_signer::TestChannelSigner; + use crate::util::test_utils::{ + TestBroadcaster, TestChainSource, TestFeeEstimator, TestKeysInterface, TestLogger, + TestPersister, + }; use crate::{expect_payment_path_successful, get_event_msg}; + use bitcoin::Network; const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5; @@ -1840,4 +1983,171 @@ mod tests { }) .is_err()); } + + /// Concrete `ChainMonitor` type wired to the standard test utilities in deferred mode. + type TestDeferredChainMonitor<'a> = ChainMonitor< + TestChannelSigner, + &'a TestChainSource, + &'a TestBroadcaster, + &'a TestFeeEstimator, + &'a TestLogger, + &'a TestPersister, + &'a TestKeysInterface, + >; + + /// Creates a minimal `ChannelMonitorUpdate` with no actual update steps. 
+ fn dummy_update(update_id: u64, channel_id: ChannelId) -> ChannelMonitorUpdate { + ChannelMonitorUpdate { updates: vec![], update_id, channel_id: Some(channel_id) } + } + + fn create_deferred_chain_monitor<'a>( + chain_source: &'a TestChainSource, broadcaster: &'a TestBroadcaster, + logger: &'a TestLogger, fee_est: &'a TestFeeEstimator, persister: &'a TestPersister, + keys: &'a TestKeysInterface, + ) -> TestDeferredChainMonitor<'a> { + ChainMonitor::new( + Some(chain_source), + broadcaster, + logger, + fee_est, + persister, + keys, + keys.get_peer_storage_key(), + true, + ) + } + + /// Tests queueing and flushing of both `watch_channel` and `update_channel` operations + /// when `ChainMonitor` is in deferred mode, verifying that operations flow through to + /// `Persist` and that `channel_monitor_updated` is called on `Completed` status. + #[test] + fn test_queue_and_flush() { + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_est = TestFeeEstimator::new(253); + let logger = TestLogger::new(); + let persister = TestPersister::new(); + let chain_source = TestChainSource::new(Network::Testnet); + let keys = TestKeysInterface::new(&[0; 32], Network::Testnet); + let deferred = create_deferred_chain_monitor( + &chain_source, + &broadcaster, + &logger, + &fee_est, + &persister, + &keys, + ); + + // Queue starts empty. + assert_eq!(deferred.pending_operation_count(), 0); + + // Queue a watch_channel, verifying InProgress status. + let chan = ChannelId::from_bytes([1u8; 32]); + let monitor = crate::util::test_utils::dummy_monitor(chan, |keys| { + TestChannelSigner::new(DynSigner::new(keys)) + }); + let status = Watch::watch_channel(&deferred, chan, monitor); + assert_eq!(status, Ok(ChannelMonitorUpdateStatus::InProgress)); + assert_eq!(deferred.pending_operation_count(), 1); + + // Nothing persisted yet — operations are only queued. + assert!(persister.new_channel_persistences.lock().unwrap().is_empty()); + + // Queue two updates after the watch. 
Update IDs must be sequential (starting + // from 1 since the initial monitor has update_id 0). + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(1, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(2, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!(deferred.pending_operation_count(), 3); + + // Flush 2 of 3: persist_new_channel returns Completed (triggers + // channel_monitor_updated), update_persisted_channel returns InProgress (does not). + persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + deferred.flush(2, &&logger); + + assert_eq!(deferred.pending_operation_count(), 1); + + // persist_new_channel was called for the watch. + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), 1); + + // Because persist_new_channel returned Completed, channel_monitor_updated was called, + // so update_id 0 should no longer be pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(!pending_for_chan.contains(&0)); + + // update_persisted_channel was called for update_id 1, and because it returned + // InProgress, update_id 1 remains pending. + let monitor_name = deferred.get_monitor(chan).unwrap().persistence_key(); + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&1)); + assert!(pending_for_chan.contains(&1)); + + // Flush remaining: update_persisted_channel returns Completed (default), triggers + // channel_monitor_updated. + deferred.flush(1, &&logger); + assert_eq!(deferred.pending_operation_count(), 0); + + // update_persisted_channel was called for update_id 2. 
+ assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&2)); + + // update_id 1 is still pending from the InProgress earlier, but update_id 2 was + // completed in this flush so it is no longer pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(pending_for_chan.contains(&1)); + assert!(!pending_for_chan.contains(&2)); + + // Flushing an empty queue is a no-op. + let persist_count_before = persister.new_channel_persistences.lock().unwrap().len(); + deferred.flush(0, &&logger); + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), persist_count_before); + } + + /// Tests that `ChainMonitor` in deferred mode properly defers `watch_channel` and + /// `update_channel` operations, verifying correctness through a complete channel open + /// and payment flow. Operations are auto-flushed via the `TestChainMonitor` + /// `release_pending_monitor_events` helper. + #[test] + fn test_deferred_monitor_payment() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chain_monitor_a = &nodes[0].chain_monitor.chain_monitor; + let chain_monitor_b = &nodes[1].chain_monitor.chain_monitor; + + create_announced_chan_between_nodes(&nodes, 0, 1); + + let (preimage, _hash, ..) 
= route_payment(&nodes[0], &[&nodes[1]], 10_000); + claim_payment(&nodes[0], &[&nodes[1]], preimage); + + assert_eq!(chain_monitor_a.list_monitors().len(), 1); + assert_eq!(chain_monitor_b.list_monitors().len(), 1); + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + assert_eq!(chain_monitor_b.pending_operation_count(), 0); + } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 2d971c3a100..a1805f6c1c8 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -4564,6 +4564,7 @@ pub fn create_chanmon_cfgs_internal( fn create_node_cfgs_internal<'a, F>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, + deferred: bool, ) -> Vec> where F: Fn( @@ -4576,14 +4577,25 @@ where for i in 0..node_count { let cfg = &chanmon_cfgs[i]; let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &cfg.logger)); - let chain_monitor = test_utils::TestChainMonitor::new( - Some(&cfg.chain_source), - &cfg.tx_broadcaster, - &cfg.logger, - &cfg.fee_estimator, - persisters[i], - &cfg.keys_manager, - ); + let chain_monitor = if deferred { + test_utils::TestChainMonitor::new_deferred( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + } else { + test_utils::TestChainMonitor::new( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + }; let seed = [i as u8; 32]; nodes.push(NodeCfg { @@ -4620,6 +4632,20 @@ pub fn create_node_cfgs<'a>( chanmon_cfgs, persisters, test_utils::TestMessageRouter::new_default, + false, + ) +} + +pub fn create_node_cfgs_deferred<'a>( + node_count: usize, chanmon_cfgs: &'a Vec, +) -> Vec> { + let persisters = chanmon_cfgs.iter().map(|c| &c.persister).collect(); + create_node_cfgs_internal( + node_count, + chanmon_cfgs, + 
persisters, + test_utils::TestMessageRouter::new_default, + true, ) } @@ -4632,6 +4658,7 @@ pub fn create_node_cfgs_with_persisters<'a>( chanmon_cfgs, persisters, test_utils::TestMessageRouter::new_default, + false, ) } @@ -4644,6 +4671,7 @@ pub fn create_node_cfgs_with_node_id_message_router<'a>( chanmon_cfgs, persisters, test_utils::TestMessageRouter::new_node_id_router, + false, ) } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index a6790d4a352..83c46631a8f 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -515,6 +515,7 @@ pub struct TestChainMonitor<'a> { &'a TestKeysInterface, >, pub keys_manager: &'a TestKeysInterface, + pub logger: &'a TestLogger, /// If this is set to Some(), the next update_channel call (not watch_channel) must be a /// ChannelForceClosed event for the given channel_id with should_broadcast set to the given /// boolean. @@ -530,6 +531,38 @@ impl<'a> TestChainMonitor<'a> { chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + false, + ) + } + + pub fn new_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + true, + ) + } + + fn with_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, deferred: bool, ) -> Self { Self { 
added_monitors: Mutex::new(Vec::new()), @@ -543,9 +576,10 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), - false, + deferred, ), keys_manager, + logger, expect_channel_force_closed: Mutex::new(None), expect_monitor_round_trip_fail: Mutex::new(None), #[cfg(feature = "std")] @@ -553,6 +587,10 @@ impl<'a> TestChainMonitor<'a> { } } + pub fn pending_operation_count(&self) -> usize { + self.chain_monitor.pending_operation_count() + } + pub fn complete_sole_pending_chan_update(&self, channel_id: &ChannelId) { let (_, latest_update) = self.latest_monitor_update_id.lock().unwrap().get(channel_id).unwrap().clone(); @@ -683,6 +721,12 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + // Auto-flush pending operations so that the ChannelManager can pick up monitor + // completion events. When not in deferred mode the queue is empty so this only + // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) + // work with deferred chain monitors. + let count = self.chain_monitor.pending_operation_count(); + self.chain_monitor.flush(count, &self.logger); return self.chain_monitor.release_pending_monitor_events(); } } @@ -842,6 +886,8 @@ pub struct TestPersister { /// The queue of update statuses we'll return. If none are queued, ::Completed will always be /// returned. pub update_rets: Mutex>, + /// When we get a persist_new_channel call, we push the monitor name here. + pub new_channel_persistences: Mutex>, /// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the /// [`ChannelMonitor::get_latest_update_id`] here. 
pub offchain_monitor_updates: Mutex>>, @@ -852,9 +898,15 @@ pub struct TestPersister { impl TestPersister { pub fn new() -> Self { let update_rets = Mutex::new(VecDeque::new()); + let new_channel_persistences = Mutex::new(Vec::new()); let offchain_monitor_updates = Mutex::new(new_hash_map()); let chain_sync_monitor_persistences = Mutex::new(VecDeque::new()); - Self { update_rets, offchain_monitor_updates, chain_sync_monitor_persistences } + Self { + update_rets, + new_channel_persistences, + offchain_monitor_updates, + chain_sync_monitor_persistences, + } } /// Queue an update status to return. @@ -864,8 +916,9 @@ impl TestPersister { } impl Persist for TestPersister { fn persist_new_channel( - &self, _monitor_name: MonitorName, _data: &ChannelMonitor, + &self, monitor_name: MonitorName, _data: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { + self.new_channel_persistences.lock().unwrap().push(monitor_name); if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() { return update_ret; } From 1e37fd68cdd3d01b5d8b4f1fb66aee1cdc0136d2 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 4 Mar 2026 12:36:31 +0100 Subject: [PATCH 6/8] Fail HTLCs from late counterparty commitment updates after funding spend When a ChannelMonitorUpdate containing a new counterparty commitment is dispatched (e.g. via deferred writes) before a channel force-closes but only applied to the in-memory monitor after the commitment transaction has already confirmed on-chain, the outbound HTLCs in that update must be failed back. Add fail_htlcs_from_update_after_funding_spend to ChannelMonitorImpl which detects this race condition during update_monitor. When a LatestCounterpartyCommitmentTXInfo or LatestCounterpartyCommitment update is applied and the funding output has already been spent, the function creates OnchainEvent::HTLCUpdate entries for HTLCs that are not present as non-dust outputs in the confirmed commitment transaction. 
These entries mature after ANTI_REORG_DELAY blocks, giving time for the peer to potentially broadcast the newer commitment. HTLCs that appear as non-dust outputs in the confirmed commitment (whether counterparty or holder) are skipped, as they will be resolved on-chain via the normal HTLC timeout/success path. HTLCs already fulfilled by the counterparty (tracked in counterparty_fulfilled_htlcs) are also skipped. Duplicate failures from previously-known counterparty commitments are handled gracefully by the ChannelManager. AI tools were used in preparing this commit. --- lightning/src/chain/channelmonitor.rs | 136 ++++++++++++ lightning/src/ln/chanmon_update_fail_tests.rs | 202 ++++++++++++++++++ lightning/src/util/test_utils.rs | 11 +- 3 files changed, 347 insertions(+), 2 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index eb763de1d5c..7b7267b9fbb 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -4290,6 +4290,41 @@ impl ChannelMonitorImpl { self.latest_update_id = updates.update_id; + // If a counterparty commitment update was applied while the funding output has already + // been spent on-chain, fail back the outbound HTLCs from the update. This handles the + // race where a monitor update is dispatched before the channel force-closes but only + // applied after the commitment transaction confirms. + for update in updates.updates.iter() { + let htlcs: Vec<(&HTLCSource, PaymentHash, u64)> = match update { + ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTXInfo { + htlc_outputs, .. 
+ } => htlc_outputs + .iter() + .filter_map(|(htlc, source)| { + source.as_ref().map(|s| (&**s, htlc.payment_hash, htlc.amount_msat)) + }) + .collect(), + ChannelMonitorUpdateStep::LatestCounterpartyCommitment { + commitment_txs, htlc_data, + } => { + let nondust = commitment_txs[0] + .nondust_htlcs() + .iter() + .filter(|htlc| !htlc.offered) + .zip(htlc_data.nondust_htlc_sources.iter()) + .map(|(htlc, source)| (source, htlc.payment_hash, htlc.amount_msat)); + let dust = htlc_data.dust_htlcs.iter().filter_map(|(htlc, source)| { + source.as_ref().map(|s| (s, htlc.payment_hash, htlc.amount_msat)) + }); + nondust.chain(dust).collect() + }, + _ => continue, + }; + if !htlcs.is_empty() { + self.fail_htlcs_from_update_after_funding_spend(&htlcs, logger); + } + } + // Refuse updates after we've detected a spend onchain (or if the channel was otherwise // closed), but only if the update isn't the kind of update we expect to see after channel // closure. @@ -4336,6 +4371,107 @@ impl ChannelMonitorImpl { self.funding_spend_seen || self.lockdown_from_offchain || self.holder_tx_signed } + /// Given outbound HTLCs from a counterparty commitment update, checks if the funding output + /// has been spent on-chain. If so, creates `OnchainEvent::HTLCUpdate` entries to fail back + /// those HTLCs. + /// + /// This handles the race where a `ChannelMonitorUpdate` with a new counterparty commitment + /// is dispatched (e.g., via deferred writes) before the channel force-closes, but only + /// applied to the in-memory monitor after the commitment transaction has already confirmed. + fn fail_htlcs_from_update_after_funding_spend( + &mut self, htlcs: &[(&HTLCSource, PaymentHash, u64)], logger: &WithContext, + ) { + // Determine the confirmed spending txid, either from the fully confirmed field or + // from a pending FundingSpendConfirmation event still awaiting ANTI_REORG_DELAY. 
+ // There is at most one FundingSpendConfirmation at a time (splice promotions happen + // via AlternativeFundingConfirmation on a separate funding scope). + let spending_txid = if let Some(txid) = self.funding_spend_confirmed { + txid + } else if let Some(txid) = + self.onchain_events_awaiting_threshold_conf.iter().find_map(|event| { + if let OnchainEvent::FundingSpendConfirmation { .. } = &event.event { + Some(event.txid) + } else { + None + } + }) { + txid + } else { + return; + }; + + // Collect the confirmed commitment's non-dust HTLCs so we can skip HTLCs that have + // on-chain outputs (those will be resolved via the normal HTLC timeout/success path). + let confirmed_commitment_htlcs: Vec<_> = if let Some(htlc_list) = + self.funding.counterparty_claimable_outpoints.get(&spending_txid) + { + htlc_list.iter().map(|(htlc, _)| htlc.clone()).collect() + } else { + // Holder commitment confirmed. Check both current and previous since we + // don't track which holder commitment txid was broadcast. + let funding = &self.funding; + let current_txid = funding.current_holder_commitment_tx.trust().txid(); + if current_txid == spending_txid { + holder_commitment_htlcs!(self, CURRENT).cloned().collect() + } else if funding + .prev_holder_commitment_tx + .as_ref() + .is_some_and(|tx| tx.trust().txid() == spending_txid) + { + holder_commitment_htlcs!(self, PREV) + .map(|iter| iter.cloned().collect()) + .unwrap_or_default() + } else { + Vec::new() + } + }; + + // We use best_block.height so that entries wait ANTI_REORG_DELAY blocks before + // maturing. Since we don't know whether `commitment_signed` reached the peer, + // they may still broadcast the newer commitment, and we need to allow time for + // that before treating these HTLCs as failed. + // + // HTLCs that were already failed via `fail_unbroadcast_htlcs` (from a + // previously-known counterparty commitment) may produce duplicate entries here. 
+ // This is safe because the `ChannelManager` handles duplicate `HTLCEvent`s + // gracefully. + let entry_height = self.best_block.height; + for &(source, payment_hash, amount_msat) in htlcs { + // Skip HTLCs that appear as non-dust outputs in the confirmed commitment. + let in_confirmed_commitment = confirmed_commitment_htlcs.iter().any(|htlc| { + htlc.transaction_output_index.is_some() + && htlc.payment_hash == payment_hash + && htlc.amount_msat == amount_msat + }); + if in_confirmed_commitment { + continue; + } + if self.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).is_some() { + continue; + } + let entry = OnchainEventEntry { + txid: spending_txid, + transaction: None, + height: entry_height, + block_hash: None, + event: OnchainEvent::HTLCUpdate { + source: source.clone(), + payment_hash, + htlc_value_satoshis: Some(amount_msat / 1000), + commitment_tx_output_idx: None, + }, + }; + let logger = WithContext::from(logger, None, None, Some(payment_hash)); + log_trace!( + logger, + "Failing HTLC from late counterparty commitment update, \ + waiting for confirmation (at height {})", + entry.confirmation_threshold() + ); + self.onchain_events_awaiting_threshold_conf.push(entry); + } + } + fn get_latest_update_id(&self) -> u64 { self.latest_update_id } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 03632913383..24aa33532e3 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -48,6 +48,7 @@ use crate::util::test_utils; use crate::prelude::*; use crate::sync::{Arc, Mutex}; use bitcoin::hashes::Hash; +use core::sync::atomic::Ordering; #[test] fn test_monitor_and_persister_update_fail() { @@ -5171,3 +5172,204 @@ fn test_mpp_claim_to_holding_cell() { expect_payment_claimable!(nodes[3], paymnt_hash_2, payment_secret_2, 400_000); claim_payment(&nodes[2], &[&nodes[3]], preimage_2); } + +fn 
do_test_late_counterparty_commitment_update_after_funding_spend(fully_confirmed: bool) { + // Tests that when a ChannelMonitorUpdate containing a new counterparty commitment (with an + // outbound HTLC) is applied to a monitor that has already seen the funding output spent + // on-chain, the HTLC is properly failed back. + // + // This exercises the race condition where: + // 1. A sends an HTLC to B, creating a monitor update with LatestCounterpartyCommitmentTXInfo + // 2. In deferred-write mode, this update is queued but not applied to the in-memory monitor + // 3. B's commitment transaction (without the HTLC) is broadcast and confirmed + // 4. The queued update is flushed, applying the counterparty commitment to the monitor + // 5. The monitor detects the funding spend and fails the HTLC + // + // When `fully_confirmed` is true, ANTI_REORG_DELAY has fully passed before the flush, so + // funding_spend_confirmed is set. Otherwise, the FundingSpendConfirmation entry is still + // pending in onchain_events_awaiting_threshold_conf. + + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let node_b_id = nodes[1].node.get_our_node_id(); + + let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2; + + // Get B's commitment transaction before any HTLCs are added. This is the transaction that + // will be mined on-chain, simulating B broadcasting while A's monitor update is pending. + let bs_commitment_tx = get_local_commitment_txn!(nodes[1], chan_id); + assert_eq!(bs_commitment_tx.len(), 1); + + // Pause auto-flush on A so that the monitor update from send_payment is queued but NOT + // applied to the in-memory monitor. + nodes[0].chain_monitor.pause_flush.store(true, Ordering::Release); + + // Send a payment from A to B. 
The ChannelManager creates a LatestCounterpartyCommitmentTXInfo + // monitor update, but in deferred mode with pause_flush it remains queued. + let (route, payment_hash, _, payment_secret) = + get_route_and_payment_hash!(nodes[0], nodes[1], 1_000_000); + let payment_id = PaymentId(payment_hash.0); + nodes[0] + .node + .send_payment_with_route( + route, + payment_hash, + RecipientOnionFields::secret_only(payment_secret, 1_000_000), + payment_id, + ) + .unwrap(); + check_added_monitors(&nodes[0], 1); + + // Mine B's (old) commitment transaction on A and advance blocks. When fully_confirmed, + // advance past ANTI_REORG_DELAY so FundingSpendConfirmation is consumed and + // funding_spend_confirmed is set. Otherwise, stop one block short so the entry remains + // in onchain_events_awaiting_threshold_conf. + mine_transaction(&nodes[0], &bs_commitment_tx[0]); + let extra_blocks = if fully_confirmed { ANTI_REORG_DELAY - 1 } else { ANTI_REORG_DELAY - 2 }; + connect_blocks(&nodes[0], extra_blocks); + + if fully_confirmed { + // The channel close event, error message, and ChannelForceClosed monitor update were + // generated during block connection. Consume them before flushing. + check_closed_event( + &nodes[0], + 1, + ClosureReason::CommitmentTxConfirmed, + &[node_b_id], + 100000, + ); + check_closed_broadcast(&nodes[0], 1, true); + check_added_monitors(&nodes[0], 1); + } + + // Flush the queued monitor updates. This applies the LatestCounterpartyCommitmentTXInfo + // (and ChannelForceClosed) to the monitor, which triggers fail_htlcs_from_update_after_ + // funding_spend to create OnchainEvent::HTLCUpdate entries for the HTLC. 
+ nodes[0].chain_monitor.pause_flush.store(false, Ordering::Release); + let pending_count = nodes[0].chain_monitor.chain_monitor.pending_operation_count(); + nodes[0].chain_monitor.chain_monitor.flush(pending_count, &nodes[0].logger); + + if !fully_confirmed { + // The channel close event, error message, and ChannelForceClosed monitor update were + // generated during block connection. + check_closed_event( + &nodes[0], + 1, + ClosureReason::CommitmentTxConfirmed, + &[node_b_id], + 100000, + ); + check_closed_broadcast(&nodes[0], 1, true); + check_added_monitors(&nodes[0], 1); + } + + // Advance ANTI_REORG_DELAY blocks so the OnchainEvent::HTLCUpdate entries (created at + // best_block.height during the flush) mature into MonitorEvent::HTLCEvent. + connect_blocks(&nodes[0], ANTI_REORG_DELAY); + + // The ChannelManager processes the MonitorEvent::HTLCEvent and fails the payment. + expect_payment_failed_conditions( + &nodes[0], + payment_hash, + false, + PaymentFailedConditions::new(), + ); + // The payment failure generates a ReleasePaymentComplete monitor update. + check_added_monitors(&nodes[0], 1); +} + +#[test] +fn test_late_counterparty_commitment_update_after_funding_spend() { + do_test_late_counterparty_commitment_update_after_funding_spend(false); +} + +#[test] +fn test_late_counterparty_commitment_update_after_funding_spend_fully_confirmed() { + do_test_late_counterparty_commitment_update_after_funding_spend(true); +} + +#[test] +fn test_late_counterparty_commitment_update_after_holder_commitment_spend() { + // Tests that when the confirmed spending transaction is a holder commitment, HTLCs that + // have non-dust outputs in the holder commitment are NOT failed by + // fail_htlcs_from_update_after_funding_spend (they'll be resolved on-chain via + // HTLC-timeout), while HTLCs only present in the late counterparty commitment update ARE + // failed. + // + // Setup: + // 1. 
Route HTLC X from A to B (fully committed in both holder and counterparty commitments) + // 2. Grab A's holder commitment (which contains HTLC X) + // 3. Pause flush, then send HTLC Y from A to B (counterparty commitment update is queued) + // 4. Mine A's holder commitment (contains X but not Y) + // 5. Flush the queued update (contains both X and Y) + // 6. Verify: X is not failed by our code (on-chain output), Y is failed + + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let node_b_id = nodes[1].node.get_our_node_id(); + + let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2; + + // Route HTLC X fully (committed in both commitments). + let (_, _payment_hash_x, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); + + // Get A's holder commitment which now contains HTLC X. + let as_commitment_tx = get_local_commitment_txn!(nodes[0], chan_id); + assert_eq!(as_commitment_tx.len(), 1); + // Verify the commitment tx has at least 3 outputs (to_local, to_remote, HTLC X). + assert!(as_commitment_tx[0].output.len() >= 3); + + // Pause flush so the next monitor update is queued. + nodes[0].chain_monitor.pause_flush.store(true, Ordering::Release); + + // Send HTLC Y. The LatestCounterpartyCommitmentTXInfo (containing both X and Y) is queued. + let (route, payment_hash_y, _, payment_secret_y) = + get_route_and_payment_hash!(nodes[0], nodes[1], 2_000_000); + let payment_id_y = PaymentId(payment_hash_y.0); + nodes[0] + .node + .send_payment_with_route( + route, + payment_hash_y, + RecipientOnionFields::secret_only(payment_secret_y, 2_000_000), + payment_id_y, + ) + .unwrap(); + check_added_monitors(&nodes[0], 1); + + // Mine A's holder commitment (contains X but not Y). 
+ mine_transaction(&nodes[0], &as_commitment_tx[0]); + connect_blocks(&nodes[0], ANTI_REORG_DELAY - 2); + + // Flush the queued monitor updates. + nodes[0].chain_monitor.pause_flush.store(false, Ordering::Release); + let pending_count = nodes[0].chain_monitor.chain_monitor.pending_operation_count(); + nodes[0].chain_monitor.chain_monitor.flush(pending_count, &nodes[0].logger); + + check_closed_event(&nodes[0], 1, ClosureReason::CommitmentTxConfirmed, &[node_b_id], 100000); + check_closed_broadcast(&nodes[0], 1, true); + check_added_monitors(&nodes[0], 1); + + // Advance ANTI_REORG_DELAY blocks so OnchainEvent::HTLCUpdate entries mature. + connect_blocks(&nodes[0], ANTI_REORG_DELAY); + + // Only HTLC Y should be failed by our code. HTLC X has an on-chain output in the holder + // commitment and will be resolved via the HTLC-timeout path. + expect_payment_failed_conditions( + &nodes[0], + payment_hash_y, + false, + PaymentFailedConditions::new(), + ); + check_added_monitors(&nodes[0], 1); + + // Verify HTLC X was NOT failed (no payment failure event for it at this point). + // It will be resolved later via the on-chain HTLC-timeout claim. + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); +} diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 83c46631a8f..33097ec33c5 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -525,6 +525,10 @@ pub struct TestChainMonitor<'a> { pub expect_monitor_round_trip_fail: Mutex>, #[cfg(feature = "std")] pub write_blocker: Mutex>>, + /// When set to `true`, `release_pending_monitor_events` will not auto-flush pending + /// deferred operations. This allows tests to control exactly when queued monitor updates + /// are applied to the in-memory monitor. 
+ pub pause_flush: AtomicBool, } impl<'a> TestChainMonitor<'a> { pub fn new( @@ -584,6 +588,7 @@ impl<'a> TestChainMonitor<'a> { expect_monitor_round_trip_fail: Mutex::new(None), #[cfg(feature = "std")] write_blocker: Mutex::new(None), + pause_flush: AtomicBool::new(false), } } @@ -725,8 +730,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // completion events. When not in deferred mode the queue is empty so this only // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) // work with deferred chain monitors. - let count = self.chain_monitor.pending_operation_count(); - self.chain_monitor.flush(count, &self.logger); + if !self.pause_flush.load(Ordering::Acquire) { + let count = self.chain_monitor.pending_operation_count(); + self.chain_monitor.flush(count, &self.logger); + } return self.chain_monitor.release_pending_monitor_events(); } } From d45d07de9cbc29c7e56abc692f48de9b7d1732a8 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 4 Mar 2026 17:38:31 +0100 Subject: [PATCH 7/8] f: fixes --- lightning/src/chain/channelmonitor.rs | 68 +++++++++++++++------------ 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 7b7267b9fbb..8cf17d5f6c8 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -4295,15 +4295,17 @@ impl ChannelMonitorImpl { // race where a monitor update is dispatched before the channel force-closes but only // applied after the commitment transaction confirms. for update in updates.updates.iter() { - let htlcs: Vec<(&HTLCSource, PaymentHash, u64)> = match update { + match update { ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTXInfo { htlc_outputs, .. 
- } => htlc_outputs - .iter() - .filter_map(|(htlc, source)| { - source.as_ref().map(|s| (&**s, htlc.payment_hash, htlc.amount_msat)) - }) - .collect(), + } => { + self.fail_htlcs_from_update_after_funding_spend( + htlc_outputs.iter().filter_map(|(htlc, source)| { + source.as_ref().map(|s| (&**s, htlc.payment_hash, htlc.amount_msat)) + }), + logger, + ); + }, ChannelMonitorUpdateStep::LatestCounterpartyCommitment { commitment_txs, htlc_data, } => { @@ -4316,12 +4318,12 @@ impl ChannelMonitorImpl { let dust = htlc_data.dust_htlcs.iter().filter_map(|(htlc, source)| { source.as_ref().map(|s| (s, htlc.payment_hash, htlc.amount_msat)) }); - nondust.chain(dust).collect() + self.fail_htlcs_from_update_after_funding_spend( + nondust.chain(dust), + logger, + ); }, - _ => continue, - }; - if !htlcs.is_empty() { - self.fail_htlcs_from_update_after_funding_spend(&htlcs, logger); + _ => {}, } } @@ -4378,8 +4380,9 @@ impl ChannelMonitorImpl { /// This handles the race where a `ChannelMonitorUpdate` with a new counterparty commitment /// is dispatched (e.g., via deferred writes) before the channel force-closes, but only /// applied to the in-memory monitor after the commitment transaction has already confirmed. - fn fail_htlcs_from_update_after_funding_spend( - &mut self, htlcs: &[(&HTLCSource, PaymentHash, u64)], logger: &WithContext, + fn fail_htlcs_from_update_after_funding_spend<'a, L: Logger>( + &mut self, htlcs: impl Iterator<Item = (&'a HTLCSource, PaymentHash, u64)>, + logger: &WithContext, ) { // Determine the confirmed spending txid, either from the fully confirmed field or // from a pending FundingSpendConfirmation event still awaiting ANTI_REORG_DELAY. @@ -4400,26 +4403,37 @@ impl ChannelMonitorImpl { return; }; - // Collect the confirmed commitment's non-dust HTLCs so we can skip HTLCs that have - // on-chain outputs (those will be resolved via the normal HTLC timeout/success path).
- let confirmed_commitment_htlcs: Vec<_> = if let Some(htlc_list) = + // Collect sources for non-dust outbound HTLCs in the confirmed commitment so we + // can skip them (those will be resolved via the normal HTLC timeout/success path). + let confirmed_nondust_sources: Vec<&HTLCSource> = if let Some(htlc_list) = self.funding.counterparty_claimable_outpoints.get(&spending_txid) { - htlc_list.iter().map(|(htlc, _)| htlc.clone()).collect() + // Counterparty commitment: our outbound HTLCs have sources. + htlc_list + .iter() + .filter_map(|(htlc, source)| { + htlc.transaction_output_index.and(source.as_ref().map(|s| s.as_ref())) + }) + .collect() } else { // Holder commitment confirmed. Check both current and previous since we // don't track which holder commitment txid was broadcast. + let nondust_source = |(htlc, source): (&HTLCOutputInCommitment, _)| { + htlc.transaction_output_index.and(source) + }; let funding = &self.funding; let current_txid = funding.current_holder_commitment_tx.trust().txid(); if current_txid == spending_txid { - holder_commitment_htlcs!(self, CURRENT).cloned().collect() + holder_commitment_htlcs!(self, CURRENT_WITH_SOURCES) + .filter_map(nondust_source) + .collect() } else if funding .prev_holder_commitment_tx .as_ref() .is_some_and(|tx| tx.trust().txid() == spending_txid) { - holder_commitment_htlcs!(self, PREV) - .map(|iter| iter.cloned().collect()) + holder_commitment_htlcs!(self, PREV_WITH_SOURCES) + .map(|iter| iter.filter_map(nondust_source).collect()) .unwrap_or_default() } else { Vec::new() @@ -4436,14 +4450,10 @@ impl ChannelMonitorImpl { // This is safe because the `ChannelManager` handles duplicate `HTLCEvent`s // gracefully. let entry_height = self.best_block.height; - for &(source, payment_hash, amount_msat) in htlcs { - // Skip HTLCs that appear as non-dust outputs in the confirmed commitment. 
- let in_confirmed_commitment = confirmed_commitment_htlcs.iter().any(|htlc| { - htlc.transaction_output_index.is_some() - && htlc.payment_hash == payment_hash - && htlc.amount_msat == amount_msat - }); - if in_confirmed_commitment { + for (source, payment_hash, amount_msat) in htlcs { + // Skip HTLCs that have non-dust outputs in the confirmed commitment + // (those will be resolved via the normal HTLC timeout/success path). + if confirmed_nondust_sources.contains(&source) { continue; } if self.counterparty_fulfilled_htlcs.get(&SentHTLCId::from_source(source)).is_some() { From 3bca4faf581626ebc7caf0bdac7d11255cd7894f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 5 Mar 2026 09:46:39 +0100 Subject: [PATCH 8/8] f: update comment --- lightning/src/chain/channelmonitor.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 8cf17d5f6c8..2f1ed454b9e 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -4440,10 +4440,12 @@ impl ChannelMonitorImpl { } }; - // We use best_block.height so that entries wait ANTI_REORG_DELAY blocks before - // maturing. Since we don't know whether `commitment_signed` reached the peer, - // they may still broadcast the newer commitment, and we need to allow time for - // that before treating these HTLCs as failed. + // We push entries to `onchain_events_awaiting_threshold_conf` rather than failing + // immediately as a safeguard. With deferred writes, `commitment_signed` is held + // back until the monitor update completes, so the peer should not have received + // it yet. However, waiting ANTI_REORG_DELAY blocks is defensive and also handles + // reorgs correctly (entries are removed by `blocks_disconnected` if the funding + // spend is reorged out, and the monitor now has the latest commitment stored). 
// // HTLCs that were already failed via `fail_unbroadcast_htlcs` (from a // previously-known counterparty commitment) may produce duplicate entries here.