From f91c4fe1f8fc8992bdde28b9aead92071978d8ef Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 27 Feb 2026 12:09:20 +0100 Subject: [PATCH] Remove Completed from Watch trait, making updates always-async The Watch trait previously shared ChannelMonitorUpdateStatus (with Completed, InProgress, UnrecoverableError variants) with the Persist trait. This meant ChannelManager had to handle both synchronous completion and asynchronous completion, including a runtime check (monitor_update_type atomic) to ensure they weren't mixed. This commit makes the Watch trait always-async: watch_channel returns Result<(), ()> and update_channel returns (). ChainMonitor maps Persist::Completed onto an immediately-queued MonitorEvent::Completed so the channel unblocks on the next event processing round. When Persist returns Completed but prior async updates are still pending, no event is emitted since the prior updates' eventual completion via channel_monitor_updated will cover this update too. This allows removing from ChannelManager: - The WatchUpdateStatus enum (and former ChannelMonitorUpdateStatus usage in Watch) - The monitor_update_type atomic and its mode-mixing checks - handle_monitor_update_res (was just a log, inlined) - handle_post_close_monitor_update (trivial wrapper, inlined) - handle_new_monitor_update_with_status (sync completion path) AI tools were used in preparing this commit. --- lightning/src/chain/chainmonitor.rs | 80 ++++- lightning/src/chain/mod.rs | 47 ++- lightning/src/ln/channel.rs | 2 - lightning/src/ln/channelmanager.rs | 492 ++++++---------------------- lightning/src/util/errors.rs | 6 +- 5 files changed, 185 insertions(+), 442 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7db1b697c2b..9ee6ebb529b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1032,11 +1032,13 @@ where /// See the release notes for LDK 0.1 for more information on this requirement. /// /// [`ChannelMonitor`]s which do not need to be persisted (i.e. were last written by LDK 0.1 or - /// later) will be loaded without persistence and this method will return - /// [`ChannelMonitorUpdateStatus::Completed`]. + /// later) will be loaded without persistence and a [`MonitorEvent::Completed`] will be + /// immediately queued. + /// + /// [`MonitorEvent::Completed`]: channelmonitor::MonitorEvent::Completed pub fn load_existing_monitor( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result { + ) -> Result<(), ()> { if !monitor.written_by_0_1_or_later() { return chain::Watch::watch_channel(self, channel_id, monitor); } @@ -1054,9 +1056,20 @@ where if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source, &self.logger); } + // The monitor is already persisted, so generate MonitorEvent::Completed immediately. + let funding_txo = monitor.get_funding_txo(); + let counterparty_node_id = monitor.get_counterparty_node_id(); + let update_id = monitor.get_latest_update_id(); entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) }); + self.pending_monitor_events.lock().unwrap().push(( + funding_txo, + channel_id, + vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id: update_id }], + counterparty_node_id, + )); + self.event_notifier.notify(); - Ok(ChannelMonitorUpdateStatus::Completed) + Ok(()) } } @@ -1271,7 +1284,7 @@ where { fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result { + ) -> Result<(), ()> { let logger = WithChannelMonitor::from(&self.logger, &monitor, None); let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(channel_id) { @@ -1285,33 +1298,52 @@ where let update_id = monitor.get_latest_update_id(); let mut pending_monitor_updates = Vec::new(); let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { + let persist_completed = match persist_res { ChannelMonitorUpdateStatus::InProgress => { log_info!(logger, "Persistence of new ChannelMonitor in progress",); pending_monitor_updates.push(update_id); + false }, ChannelMonitorUpdateStatus::Completed => { log_info!(logger, "Persistence of new ChannelMonitor completed",); + true }, ChannelMonitorUpdateStatus::UnrecoverableError => { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; log_error!(logger, "{}", err_str); panic!("{}", err_str); }, - } + }; if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source, &self.logger); } + // Capture monitor info before moving it into the map. + let funding_txo = monitor.get_funding_txo(); + let counterparty_node_id = monitor.get_counterparty_node_id(); entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates), }); - Ok(persist_res) + if persist_completed { + // Persist returned Completed, so generate MonitorEvent::Completed immediately. + // We can't call channel_monitor_updated here because we hold the monitors write + // lock. Instead, push directly to pending_monitor_events which is a separate Mutex. + self.pending_monitor_events.lock().unwrap().push(( + funding_txo, + channel_id, + vec![MonitorEvent::Completed { + funding_txo, + channel_id, + monitor_update_id: update_id, + }], + counterparty_node_id, + )); + self.event_notifier.notify(); + } + Ok(()) } - fn update_channel( - &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { + fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) { // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. debug_assert_eq!(update.channel_id.unwrap(), channel_id); @@ -1328,7 +1360,7 @@ where #[cfg(debug_assertions)] panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress + return; }, Some(monitor_state) => { let monitor = &monitor_state.monitor; @@ -1382,6 +1414,24 @@ where "Persistence of ChannelMonitorUpdate id {:?} completed", update_id, ); + // If no prior async updates are pending, we can immediately signal + // completion. Otherwise, completion of those prior updates via + // channel_monitor_updated will eventually generate the event (using + // monitor.get_latest_update_id() which covers this update too). + if !monitor_state.has_pending_updates(&pending_monitor_updates) { + let funding_txo = monitor.get_funding_txo(); + self.pending_monitor_events.lock().unwrap().push(( + funding_txo, + channel_id, + vec![MonitorEvent::Completed { + funding_txo, + channel_id, + monitor_update_id: monitor.get_latest_update_id(), + }], + monitor.get_counterparty_node_id(), + )); + self.event_notifier.notify(); + } }, ChannelMonitorUpdateStatus::UnrecoverableError => { // Take the monitors lock for writing so that we poison it and any future @@ -1413,12 +1463,6 @@ where }); } } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } }, } } diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index bc47f1b1db6..62a0847447b 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -207,7 +207,7 @@ pub trait Confirm { /// An enum representing the status of a channel monitor update persistence. /// -/// These are generally used as the return value for an implementation of [`Persist`] which is used +/// These are used as the return value for an implementation of [`Persist`] which is used /// as the storage layer for a [`ChainMonitor`]. See the docs on [`Persist`] for a high-level /// explanation of how to handle different cases. /// @@ -234,7 +234,7 @@ pub enum ChannelMonitorUpdateStatus { /// be available on restart even if the application crashes. /// /// If you return this variant, you cannot later return [`InProgress`] from the same instance of - /// [`Persist`]/[`Watch`] without first restarting. + /// [`Persist`] without first restarting. /// /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress /// [`Persist`]: chainmonitor::Persist @@ -264,7 +264,7 @@ pub enum ChannelMonitorUpdateStatus { /// remaining cases are fixed, in rare cases, *using this feature may lead to funds loss*. /// /// If you return this variant, you cannot later return [`Completed`] from the same instance of - /// [`Persist`]/[`Watch`] without first restarting. + /// [`Persist`] without first restarting. /// /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress /// [`Completed`]: ChannelMonitorUpdateStatus::Completed @@ -293,7 +293,8 @@ pub enum ChannelMonitorUpdateStatus { /// persisted to disk to ensure that the latest [`ChannelMonitor`] state can be reloaded if the /// application crashes. /// -/// See method documentation and [`ChannelMonitorUpdateStatus`] for specific requirements. +/// Updates are always considered in-progress until completion is signaled asynchronously via +/// [`MonitorEvent::Completed`] in [`Watch::release_pending_monitor_events`]. pub trait Watch { /// Watches a channel identified by `channel_id` using `monitor`. /// @@ -312,26 +313,30 @@ pub trait Watch { /// [`blocks_disconnected`]: channelmonitor::ChannelMonitor::blocks_disconnected fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result; + ) -> Result<(), ()>; /// Updates a channel identified by `channel_id` by applying `update` to its monitor. /// /// Implementations must call [`ChannelMonitor::update_monitor`] with the given update. This - /// may fail (returning an `Err(())`), in which case this should return - /// [`ChannelMonitorUpdateStatus::InProgress`] (and the update should never complete). This + /// may fail (returning an `Err(())`), in which case the update should never complete. This /// generally implies the channel has been closed (either by the funding outpoint being spent /// on-chain or the [`ChannelMonitor`] having decided to do so and broadcasted a transaction), /// and the [`ChannelManager`] state will be updated once it sees the funding spend on-chain. /// - /// In general, persistence failures should be retried after returning - /// [`ChannelMonitorUpdateStatus::InProgress`] and eventually complete. If a failure truly - /// cannot be retried, the node should shut down immediately after returning - /// [`ChannelMonitorUpdateStatus::UnrecoverableError`], see its documentation for more info. + /// The update is considered in-progress until a [`MonitorEvent::Completed`] is provided via + /// [`Watch::release_pending_monitor_events`]. While in-progress, the channel will be + /// "frozen", preventing us from revoking old states or submitting a new commitment + /// transaction to the counterparty. + /// + /// Even when a channel has been "frozen", updates to the [`ChannelMonitor`] can continue to + /// occur (e.g. if an inbound HTLC which we forwarded was claimed upstream, resulting in us + /// attempting to claim it on this channel) and those updates must still be persisted. + /// + /// In general, persistence failures should be retried in the background and eventually + /// complete. If a failure truly cannot be retried, the node should shut down. /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager - fn update_channel( - &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus; + fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate); /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. @@ -339,9 +344,6 @@ pub trait Watch { /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted /// to disk. - /// - /// For details on asynchronous [`ChannelMonitor`] updating and returning - /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; @@ -352,13 +354,11 @@ impl + ?Sized, W: Der { fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result { + ) -> Result<(), ()> { self.deref().watch_channel(channel_id, monitor) } - fn update_channel( - &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { + fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) { self.deref().update_channel(channel_id, update) } @@ -384,9 +384,8 @@ impl + ?Sized, W: Der /// Note that use as part of a [`Watch`] implementation involves reentrancy. Therefore, the `Filter` /// should not block on I/O. Implementations should instead queue the newly monitored data to be /// processed later. Then, in order to block until the data has been processed, any [`Watch`] -/// invocation that has called the `Filter` must return [`InProgress`]. -/// -/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress +/// invocation that has called the `Filter` should delay its [`MonitorEvent::Completed`] until +/// processing finishes. /// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki /// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki pub trait Filter { diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 15aa1daecfe..65feca076d4 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -9193,8 +9193,6 @@ where /// [`Self::monitor_updating_restored`] is called. /// /// [`ChannelManager`]: super::channelmanager::ChannelManager - /// [`chain::Watch`]: crate::chain::Watch - /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress fn monitor_updating_paused( &mut self, resend_raa: bool, resend_commitment: bool, resend_channel_ready: bool, pending_forwards: Vec<(PendingHTLCInfo, u64)>, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6bf04cd62a4..461205a39d2 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -48,7 +48,7 @@ use crate::chain::channelmonitor::{ LATENCY_GRACE_PERIOD_BLOCKS, MAX_BLOCKS_FOR_CONF, }; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Confirm, Watch}; +use crate::chain::{BestBlock, Confirm, Watch}; use crate::events::{ self, ClosureReason, Event, EventHandler, EventsProvider, HTLCHandlingFailureType, InboundChannelFunds, PaymentFailureReason, ReplayEvent, @@ -2793,13 +2793,6 @@ pub struct ChannelManager< #[cfg(any(test, feature = "_test_utils"))] pub(super) per_peer_state: FairRwLock>>>, - /// We only support using one of [`ChannelMonitorUpdateStatus::InProgress`] and - /// [`ChannelMonitorUpdateStatus::Completed`] without restarting. Because the API does not - /// otherwise directly enforce this, we enforce it in non-test builds here by storing which one - /// is in use. - #[cfg(not(any(test, feature = "_externalize_tests")))] - monitor_update_type: AtomicUsize, - /// The set of events which we need to give to the user to handle. In some cases an event may /// require some further action after the user handles it (currently only blocking a monitor /// update from being handed to the user to ensure the included changes to the channel state @@ -3202,12 +3195,7 @@ pub struct PhantomRouteHints { } /// The return type of [`ChannelManager::check_free_peer_holding_cells`] -type FreeHoldingCellsResult = Vec<( - ChannelId, - PublicKey, - Option, - Vec<(HTLCSource, PaymentHash)>, -)>; +type FreeHoldingCellsResult = Vec<(ChannelId, PublicKey, Vec<(HTLCSource, PaymentHash)>)>; macro_rules! insert_short_channel_id { ($short_to_chan_info: ident, $channel: expr) => {{ @@ -3541,9 +3529,6 @@ impl< per_peer_state: FairRwLock::new(new_hash_map()), - #[cfg(not(any(test, feature = "_externalize_tests")))] - monitor_update_type: AtomicUsize::new(0), - pending_events: Mutex::new(VecDeque::new()), pending_events_processor: AtomicBool::new(false), pending_htlc_forwards_processor: AtomicBool::new(false), @@ -3964,19 +3949,12 @@ impl< // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo_opt.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } else { let reason = ClosureReason::LocallyCoopClosedUnfundedChannel; @@ -4096,19 +4074,12 @@ impl< match peer_state.channel_by_id.entry(channel_id) { hash_map::Entry::Occupied(mut chan_entry) => { if let Some(chan) = chan_entry.get_mut().as_funded_mut() { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); return; } else { debug_assert!(false, "We shouldn't have an update for a non-funded channel"); @@ -4117,18 +4088,13 @@ impl< hash_map::Entry::Vacant(_) => {}, } - if let Some(actions) = self.handle_post_close_monitor_update( + self.handle_new_monitor_update_locked_actions_handled_by_caller( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, + channel_id, funding_txo, - monitor_update, counterparty_node_id, - channel_id, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(actions); - } + monitor_update, + ); } /// When a channel is removed, two things need to happen: @@ -5327,30 +5293,12 @@ impl< ); match break_channel_entry!(self, peer_state, send_res, chan_entry) { Some(monitor_update) => { - let (update_completed, completion_data) = self - .handle_new_monitor_update_with_status( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - chan, - funding_txo, - monitor_update, - ); - if let Some(data) = completion_data { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } - if !update_completed { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); - } + self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + chan, + funding_txo, + monitor_update, + ); }, None => {}, } @@ -5433,7 +5381,7 @@ impl< /// /// Additionally, in the scenario where we begin the process of sending a payment, but crash /// before `send_payment` returns (or prior to [`ChannelMonitorUpdate`] persistence if you're - /// using [`ChannelMonitorUpdateStatus::InProgress`]), the payment may be lost on restart. See + /// using asynchronous [`chain::Watch`] persistence), the payment may be lost on restart. See /// [`ChannelManager::list_recent_payments`] for more information. /// /// Routes are automatically found using the [`Router`] provided on startup. To fix a route for a @@ -5444,7 +5392,6 @@ impl< /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`UpdateHTLCs`]: MessageSendEvent::UpdateHTLCs /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events - /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress pub fn send_payment( &self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry, @@ -6645,9 +6592,7 @@ impl< &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, transaction: Transaction, ) -> Result<(), APIError> { let mut funding_tx_signed_result = Ok(()); - let mut monitor_update_result: Option< - Result, - > = None; + let mut monitor_update_result: Option = None; PersistenceNotifierGuard::optionally_notify(self, || { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6752,18 +6697,12 @@ impl< match counterparty_initial_commitment_signed_result { Some(Ok(Some(monitor_update))) => { let funding_txo = funded_chan.funding.get_funding_txo(); - if let Some(post_update_data) = self - .handle_new_monitor_update( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - funded_chan, - funding_txo.unwrap(), - monitor_update, - ) { - monitor_update_result = Some(Ok(post_update_data)); - } + self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + funded_chan, + funding_txo.unwrap(), + monitor_update, + ); }, Some(Err(err)) => { let (drop, err) = self.locked_handle_funded_force_close( @@ -6776,7 +6715,7 @@ impl< chan_entry.remove_entry(); } - monitor_update_result = Some(Err(err)); + monitor_update_result = Some(err); }, Some(Ok(None)) | None => {}, } @@ -6800,15 +6739,8 @@ impl< mem::drop(peer_state_lock); mem::drop(per_peer_state); - if let Some(monitor_update_result) = monitor_update_result { - match monitor_update_result { - Ok(post_update_data) => { - self.handle_post_monitor_update_chan_resume(post_update_data); - }, - Err(_) => { - let _ = self.handle_error(monitor_update_result, *counterparty_node_id); - }, - } + if let Some(err) = monitor_update_result { + let _ = self.handle_error::<()>(Err(err), *counterparty_node_id); } NotifyOption::DoPersist @@ -9359,19 +9291,12 @@ impl< .or_insert_with(Vec::new) .push(raa_blocker); } - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, prev_hop.funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); }, UpdateFulfillCommitFetch::DuplicateClaim {} => { let (action_opt, raa_blocker_opt) = completion_action(None, true); @@ -9532,18 +9457,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ .push(action); } - if let Some(actions) = self.handle_post_close_monitor_update( + self.handle_new_monitor_update_locked_actions_handled_by_caller( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, + chan_id, prev_hop.funding_txo, - preimage_update, prev_hop.counterparty_node_id, - chan_id, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(actions); - } + preimage_update, + ); } fn finalize_claims(&self, sources: Vec<(HTLCSource, Option)>) { @@ -10027,19 +9947,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ /// Applies a [`ChannelMonitorUpdate`] to the channel monitor. /// /// Monitor updates must be applied while holding the same lock under which they were generated - /// to ensure correct ordering. However, completion handling requires releasing those locks. - /// This method applies the update immediately (while locks are held) and returns whether the - /// update completed, allowing the caller to handle completion separately after releasing locks. + /// to ensure correct ordering. The update is always considered in-progress until completion is + /// signaled asynchronously via [`MonitorEvent::Completed`]. /// - /// Returns a tuple of `(update_completed, all_updates_completed)`: - /// - `update_completed`: whether this specific monitor update finished persisting - /// - `all_updates_completed`: whether all in-flight updates for this channel are now complete + /// [`MonitorEvent::Completed`]: chain::channelmonitor::MonitorEvent::Completed fn handle_new_monitor_update_locked_actions_handled_by_caller( &self, in_flight_monitor_updates: &mut BTreeMap)>, channel_id: ChannelId, funding_txo: OutPoint, counterparty_node_id: PublicKey, new_update: ChannelMonitorUpdate, - ) -> (bool, bool) { + ) { let in_flight_updates = &mut in_flight_monitor_updates .entry(channel_id) .or_insert_with(|| (funding_txo, Vec::new())) @@ -10054,15 +9971,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }); if self.background_events_processed_since_startup.load(Ordering::Acquire) { - let update_res = - self.chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]); + self.chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]); let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None); - let update_completed = self.handle_monitor_update_res(update_res, logger); - if update_completed { - let _ = in_flight_updates.remove(update_idx); - } - (update_completed, update_completed && in_flight_updates.is_empty()) + log_debug!( + logger, + "ChannelMonitor update in flight, holding messages until the update completes.", + ); } else { // We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we // fail to persist it. This is a fairly safe assumption, however, since anything we do @@ -10082,167 +9997,42 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ // We could work around that with some effort, but its simpler to just track updates // twice. self.pending_background_events.lock().unwrap().push(event); - (false, false) } } - /// Handles a monitor update for a closed channel, returning optionally the completion actions - /// to process after locks are released. + /// Logs that the initial monitor persistence is in-progress. Completion is signaled + /// asynchronously via [`MonitorEvent::Completed`]. /// - /// Returns `Some` if all in-flight updates are complete. - fn handle_post_close_monitor_update( - &self, - in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - funding_txo: OutPoint, update: ChannelMonitorUpdate, counterparty_node_id: PublicKey, - channel_id: ChannelId, - ) -> Option> { - let (_update_completed, all_updates_complete) = self - .handle_new_monitor_update_locked_actions_handled_by_caller( - in_flight_monitor_updates, - channel_id, - funding_txo, - counterparty_node_id, - update, - ); - if all_updates_complete { - Some(monitor_update_blocked_actions.remove(&channel_id).unwrap_or(Vec::new())) - } else { - None - } - } - - /// Returns whether the monitor update is completed, `false` if the update is in-progress. - fn handle_monitor_update_res( - &self, update_res: ChannelMonitorUpdateStatus, logger: LG, - ) -> bool { + /// [`MonitorEvent::Completed`]: chain::channelmonitor::MonitorEvent::Completed + fn handle_initial_monitor(&self, chan: &FundedChannel) { debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire)); - match update_res { - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - ChannelMonitorUpdateStatus::InProgress => { - #[cfg(not(any(test, feature = "_externalize_tests")))] - if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 { - panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); - } - log_debug!( - logger, - "ChannelMonitor update in flight, holding messages until the update completes.", - ); - false - }, - ChannelMonitorUpdateStatus::Completed => { - #[cfg(not(any(test, feature = "_externalize_tests")))] - if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 { - panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); - } - true - }, - } - } - - /// Handles the initial monitor persistence, returning optionally data to process after locks - /// are released. - /// - /// Note: This method takes individual fields from `PeerState` rather than the whole struct - /// to avoid borrow checker issues when the channel is borrowed from `peer_state.channel_by_id`. - fn handle_initial_monitor( - &self, - in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - pending_msg_events: &mut Vec, is_connected: bool, - chan: &mut FundedChannel, update_res: ChannelMonitorUpdateStatus, - ) -> Option { let logger = WithChannelContext::from(&self.logger, &chan.context, None); - let update_completed = self.handle_monitor_update_res(update_res, logger); - if update_completed { - Some(self.try_resume_channel_post_monitor_update( - in_flight_monitor_updates, - monitor_update_blocked_actions, - pending_msg_events, - is_connected, - chan, - )) - } else { - None - } + log_debug!( + logger, + "ChannelMonitor update in flight, holding messages until the update completes.", + ); } - /// Applies a new monitor update and attempts to resume the channel if all updates are complete. - /// - /// Returns [`PostMonitorUpdateChanResume`] if all in-flight updates are complete, which should - /// be passed to [`Self::handle_post_monitor_update_chan_resume`] after releasing locks. + /// Applies a new monitor update, sending it to the [`Watch`] implementation. /// /// Note: This method takes individual fields from [`PeerState`] rather than the whole struct /// to avoid borrow checker issues when the channel is borrowed from `peer_state.channel_by_id`. + /// + /// [`Watch`]: chain::Watch fn handle_new_monitor_update( &self, in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - pending_msg_events: &mut Vec, is_connected: bool, chan: &mut FundedChannel, funding_txo: OutPoint, update: ChannelMonitorUpdate, - ) -> Option { - self.handle_new_monitor_update_with_status( + ) { + let chan_id = chan.context.channel_id(); + let counterparty_node_id = chan.context.get_counterparty_node_id(); + self.handle_new_monitor_update_locked_actions_handled_by_caller( in_flight_monitor_updates, - monitor_update_blocked_actions, - pending_msg_events, - is_connected, - chan, + chan_id, funding_txo, + counterparty_node_id, update, - ) - .1 - } - - /// Like [`Self::handle_new_monitor_update`], but also returns whether this specific update - /// completed (as opposed to being in-progress). - fn handle_new_monitor_update_with_status( - &self, - in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - pending_msg_events: &mut Vec, is_connected: bool, - chan: &mut FundedChannel, funding_txo: OutPoint, update: ChannelMonitorUpdate, - ) -> (bool, Option) { - let chan_id = chan.context.channel_id(); - let counterparty_node_id = chan.context.get_counterparty_node_id(); - - let (update_completed, all_updates_complete) = self - .handle_new_monitor_update_locked_actions_handled_by_caller( - in_flight_monitor_updates, - chan_id, - funding_txo, - counterparty_node_id, - update, - ); - - let completion_data = if all_updates_complete { - Some(self.try_resume_channel_post_monitor_update( - in_flight_monitor_updates, - monitor_update_blocked_actions, - pending_msg_events, - is_connected, - chan, - )) - } else { - None - }; - - (update_completed, completion_data) + ); } /// Attempts to resume a channel after a monitor update completes, while locks are still held. @@ -11227,7 +11017,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }, hash_map::Entry::Vacant(e) => { let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); - if let Ok(persist_state) = monitor_res { + if monitor_res.is_ok() { // There's no problem signing a counterparty's funding transaction if our monitor // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't // accepted payment from yet. We do, however, need to wait to send our channel_ready @@ -11240,18 +11030,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } if let Some(funded_chan) = e.insert(Channel::from(chan)).as_funded_mut() { - if let Some(data) = self.handle_initial_monitor( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - funded_chan, - persist_state, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + self.handle_initial_monitor(funded_chan); } else { unreachable!("This must be a funded channel as we just inserted it."); } @@ -11406,22 +11185,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ funded_chan.unset_funding_info(); ChannelError::close("Channel ID was a duplicate".to_owned()) }) - .map(|persist_status| (funded_chan, persist_status)) + .map(|()| funded_chan) }) { - Ok((funded_chan, persist_status)) => { - if let Some(data) = self.handle_initial_monitor( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - funded_chan, - persist_status, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + Ok(funded_chan) => { + self.handle_initial_monitor(funded_chan); Ok(()) }, Err(e) => try_channel_entry!(self, peer_state, Err(e), chan_entry), @@ -11969,19 +11737,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo_opt.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } }, None => { @@ -12310,19 +12071,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(monitor) = monitor_opt { let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); - if let Ok(persist_state) = monitor_res { - if let Some(data) = self.handle_initial_monitor( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - chan, - persist_state, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + if monitor_res.is_ok() { + self.handle_initial_monitor(chan); } else { let logger = WithChannelContext::from(&self.logger, &chan.context, None); @@ -12333,19 +12083,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ try_channel_entry!(self, peer_state, Err(err), chan_entry) } } else if let Some(monitor_update) = monitor_update_opt { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } Ok(()) @@ -12376,19 +12119,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ ); if let Some(monitor_update) = monitor_update_opt { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } Ok(()) @@ -12455,6 +12191,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }) } + #[cfg(any(test, feature = "_test_utils"))] + pub(crate) fn test_process_pending_monitor_events(&self) { + let _read_guard = self.total_consistency_lock.read().unwrap(); + self.process_pending_monitor_events(); + } + #[cfg(any(test, feature = "_test_utils"))] pub(crate) fn test_raa_monitor_updates_held( &self, counterparty_node_id: PublicKey, channel_id: ChannelId, @@ -12495,19 +12237,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(monitor_update) = monitor_update_opt { let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } (htlcs_to_fail, static_invoices) } else { @@ -12986,19 +12721,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } if let Some(monitor_update) = splice_promotion.monitor_update { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, splice_promotion.funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } } else { @@ -13153,11 +12881,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread ); - for (chan_id, cp_node_id, post_update_data, failed_htlcs) in result { - if let Some(data) = post_update_data { - self.handle_post_monitor_update_chan_resume(data); - } - + for (chan_id, cp_node_id, failed_htlcs) in result { self.fail_holding_cell_htlcs(failed_htlcs, chan_id, &cp_node_id); self.needs_persist_flag.store(true, Ordering::Release); self.event_persist_notifier.notify(); @@ -13188,21 +12912,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ &&WithChannelContext::from(&self.logger, &chan.context, None), ); if monitor_opt.is_some() || !holding_cell_failed_htlcs.is_empty() { - let update_res = monitor_opt - .map(|monitor_update| { - self.handle_new_monitor_update( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - chan, - chan.funding.get_funding_txo().unwrap(), - monitor_update, - ) - }) - .flatten(); + if let Some(monitor_update) = monitor_opt { + self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + chan, + chan.funding.get_funding_txo().unwrap(), + monitor_update, + ); + } let cp_node_id = chan.context.get_counterparty_node_id(); - updates.push((*chan_id, cp_node_id, update_res, holding_cell_failed_htlcs)); + updates.push((*chan_id, cp_node_id, holding_cell_failed_htlcs)); } } updates @@ -14736,11 +14455,8 @@ impl< if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() { log_debug!(logger, "Unlocking monitor updating and updating monitor", ); - let post_update_data = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, channel_funding_outpoint, monitor_update, @@ -14750,10 +14466,6 @@ impl< mem::drop(peer_state_lck); mem::drop(per_peer_state); - if let Some(data) = post_update_data { - self.handle_post_monitor_update_chan_resume(data); - } - self.handle_holding_cell_free_result(holding_cell_res); if further_update_exists { @@ -14833,18 +14545,13 @@ impl< }; self.pending_background_events.lock().unwrap().push(event); } else { - if let Some(actions) = self.handle_post_close_monitor_update( + self.handle_new_monitor_update_locked_actions_handled_by_caller( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, + channel_id, channel_funding_outpoint, - update, counterparty_node_id, - channel_id, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(actions); - } + update, + ); } }, } @@ -19699,9 +19406,6 @@ impl< per_peer_state: FairRwLock::new(per_peer_state), - #[cfg(not(any(test, feature = "_externalize_tests")))] - monitor_update_type: AtomicUsize::new(0), - pending_events: Mutex::new(pending_events_read), pending_events_processor: AtomicBool::new(false), pending_htlc_forwards_processor: AtomicBool::new(false), diff --git a/lightning/src/util/errors.rs b/lightning/src/util/errors.rs index cd72d60327f..a98a607640b 100644 --- a/lightning/src/util/errors.rs +++ b/lightning/src/util/errors.rs @@ -52,13 +52,11 @@ pub enum APIError { err: String, }, /// An attempt to call [`chain::Watch::watch_channel`]/[`chain::Watch::update_channel`] - /// returned a [`ChannelMonitorUpdateStatus::InProgress`] indicating the persistence of a - /// monitor update is awaiting async resolution. Once it resolves the attempted action should - /// complete automatically. + /// indicates the persistence of a monitor update is awaiting async resolution. Once it + /// resolves the attempted action should complete automatically. /// /// [`chain::Watch::watch_channel`]: crate::chain::Watch::watch_channel /// [`chain::Watch::update_channel`]: crate::chain::Watch::update_channel - /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress MonitorUpdateInProgress, /// [`SignerProvider::get_shutdown_scriptpubkey`] returned a shutdown scriptpubkey incompatible /// with the channel counterparty as negotiated in [`InitFeatures`].