diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7db1b697c2b..9ee6ebb529b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1032,11 +1032,13 @@ where /// See the release notes for LDK 0.1 for more information on this requirement. /// /// [`ChannelMonitor`]s which do not need to be persisted (i.e. were last written by LDK 0.1 or - /// later) will be loaded without persistence and this method will return - /// [`ChannelMonitorUpdateStatus::Completed`]. + /// later) will be loaded without persistence and a [`MonitorEvent::Completed`] will be + /// immediately queued. + /// + /// [`MonitorEvent::Completed`]: channelmonitor::MonitorEvent::Completed pub fn load_existing_monitor( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result { + ) -> Result<(), ()> { if !monitor.written_by_0_1_or_later() { return chain::Watch::watch_channel(self, channel_id, monitor); } @@ -1054,9 +1056,20 @@ where if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source, &self.logger); } + // The monitor is already persisted, so generate MonitorEvent::Completed immediately. + let funding_txo = monitor.get_funding_txo(); + let counterparty_node_id = monitor.get_counterparty_node_id(); + let update_id = monitor.get_latest_update_id(); entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) }); + self.pending_monitor_events.lock().unwrap().push(( + funding_txo, + channel_id, + vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id: update_id }], + counterparty_node_id, + )); + self.event_notifier.notify(); - Ok(ChannelMonitorUpdateStatus::Completed) + Ok(()) } } @@ -1271,7 +1284,7 @@ where { fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result { + ) -> Result<(), ()> { let logger = WithChannelMonitor::from(&self.logger, &monitor, None); let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(channel_id) { @@ -1285,33 +1298,52 @@ where let update_id = monitor.get_latest_update_id(); let mut pending_monitor_updates = Vec::new(); let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { + let persist_completed = match persist_res { ChannelMonitorUpdateStatus::InProgress => { log_info!(logger, "Persistence of new ChannelMonitor in progress",); pending_monitor_updates.push(update_id); + false }, ChannelMonitorUpdateStatus::Completed => { log_info!(logger, "Persistence of new ChannelMonitor completed",); + true }, ChannelMonitorUpdateStatus::UnrecoverableError => { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; log_error!(logger, "{}", err_str); panic!("{}", err_str); }, - } + }; if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source, &self.logger); } + // Capture monitor info before moving it into the map. + let funding_txo = monitor.get_funding_txo(); + let counterparty_node_id = monitor.get_counterparty_node_id(); entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates), }); - Ok(persist_res) + if persist_completed { + // Persist returned Completed, so generate MonitorEvent::Completed immediately. + // We can't call channel_monitor_updated here because we hold the monitors write + // lock. Instead, push directly to pending_monitor_events which is a separate Mutex. + self.pending_monitor_events.lock().unwrap().push(( + funding_txo, + channel_id, + vec![MonitorEvent::Completed { + funding_txo, + channel_id, + monitor_update_id: update_id, + }], + counterparty_node_id, + )); + self.event_notifier.notify(); + } + Ok(()) } - fn update_channel( - &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { + fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) { // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. debug_assert_eq!(update.channel_id.unwrap(), channel_id); @@ -1328,7 +1360,7 @@ where #[cfg(debug_assertions)] panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress + return; }, Some(monitor_state) => { let monitor = &monitor_state.monitor; @@ -1382,6 +1414,24 @@ where "Persistence of ChannelMonitorUpdate id {:?} completed", update_id, ); + // If no prior async updates are pending, we can immediately signal + // completion. Otherwise, completion of those prior updates via + // channel_monitor_updated will eventually generate the event (using + // monitor.get_latest_update_id() which covers this update too). + if !monitor_state.has_pending_updates(&pending_monitor_updates) { + let funding_txo = monitor.get_funding_txo(); + self.pending_monitor_events.lock().unwrap().push(( + funding_txo, + channel_id, + vec![MonitorEvent::Completed { + funding_txo, + channel_id, + monitor_update_id: monitor.get_latest_update_id(), + }], + monitor.get_counterparty_node_id(), + )); + self.event_notifier.notify(); + } }, ChannelMonitorUpdateStatus::UnrecoverableError => { // Take the monitors lock for writing so that we poison it and any future @@ -1413,12 +1463,6 @@ where }); } } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } }, } } diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index bc47f1b1db6..62a0847447b 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -207,7 +207,7 @@ pub trait Confirm { /// An enum representing the status of a channel monitor update persistence. /// -/// These are generally used as the return value for an implementation of [`Persist`] which is used +/// These are used as the return value for an implementation of [`Persist`] which is used /// as the storage layer for a [`ChainMonitor`]. See the docs on [`Persist`] for a high-level /// explanation of how to handle different cases. /// @@ -234,7 +234,7 @@ pub enum ChannelMonitorUpdateStatus { /// be available on restart even if the application crashes. /// /// If you return this variant, you cannot later return [`InProgress`] from the same instance of - /// [`Persist`]/[`Watch`] without first restarting. + /// [`Persist`] without first restarting. /// /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress /// [`Persist`]: chainmonitor::Persist @@ -264,7 +264,7 @@ pub enum ChannelMonitorUpdateStatus { /// remaining cases are fixed, in rare cases, *using this feature may lead to funds loss*. /// /// If you return this variant, you cannot later return [`Completed`] from the same instance of - /// [`Persist`]/[`Watch`] without first restarting. + /// [`Persist`] without first restarting. /// /// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress /// [`Completed`]: ChannelMonitorUpdateStatus::Completed @@ -293,7 +293,8 @@ pub enum ChannelMonitorUpdateStatus { /// persisted to disk to ensure that the latest [`ChannelMonitor`] state can be reloaded if the /// application crashes. /// -/// See method documentation and [`ChannelMonitorUpdateStatus`] for specific requirements. +/// Updates are always considered in-progress until completion is signaled asynchronously via +/// [`MonitorEvent::Completed`] in [`Watch::release_pending_monitor_events`]. pub trait Watch { /// Watches a channel identified by `channel_id` using `monitor`. /// @@ -312,26 +313,30 @@ pub trait Watch { /// [`blocks_disconnected`]: channelmonitor::ChannelMonitor::blocks_disconnected fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result; + ) -> Result<(), ()>; /// Updates a channel identified by `channel_id` by applying `update` to its monitor. /// /// Implementations must call [`ChannelMonitor::update_monitor`] with the given update. This - /// may fail (returning an `Err(())`), in which case this should return - /// [`ChannelMonitorUpdateStatus::InProgress`] (and the update should never complete). This + /// may fail (returning an `Err(())`), in which case the update should never complete. This /// generally implies the channel has been closed (either by the funding outpoint being spent /// on-chain or the [`ChannelMonitor`] having decided to do so and broadcasted a transaction), /// and the [`ChannelManager`] state will be updated once it sees the funding spend on-chain. /// - /// In general, persistence failures should be retried after returning - /// [`ChannelMonitorUpdateStatus::InProgress`] and eventually complete. If a failure truly - /// cannot be retried, the node should shut down immediately after returning - /// [`ChannelMonitorUpdateStatus::UnrecoverableError`], see its documentation for more info. + /// The update is considered in-progress until a [`MonitorEvent::Completed`] is provided via + /// [`Watch::release_pending_monitor_events`]. While in-progress, the channel will be + /// "frozen", preventing us from revoking old states or submitting a new commitment + /// transaction to the counterparty. + /// + /// Even when a channel has been "frozen", updates to the [`ChannelMonitor`] can continue to + /// occur (e.g. if an inbound HTLC which we forwarded was claimed upstream, resulting in us + /// attempting to claim it on this channel) and those updates must still be persisted. + /// + /// In general, persistence failures should be retried in the background and eventually + /// complete. If a failure truly cannot be retried, the node should shut down. /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager - fn update_channel( - &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus; + fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate); /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. @@ -339,9 +344,6 @@ pub trait Watch { /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted /// to disk. - /// - /// For details on asynchronous [`ChannelMonitor`] updating and returning - /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; @@ -352,13 +354,11 @@ impl + ?Sized, W: Der { fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, - ) -> Result { + ) -> Result<(), ()> { self.deref().watch_channel(channel_id, monitor) } - fn update_channel( - &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { + fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) { self.deref().update_channel(channel_id, update) } @@ -384,9 +384,8 @@ impl + ?Sized, W: Der /// Note that use as part of a [`Watch`] implementation involves reentrancy. Therefore, the `Filter` /// should not block on I/O. Implementations should instead queue the newly monitored data to be /// processed later. Then, in order to block until the data has been processed, any [`Watch`] -/// invocation that has called the `Filter` must return [`InProgress`]. -/// -/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress +/// invocation that has called the `Filter` should delay its [`MonitorEvent::Completed`] until +/// processing finishes. /// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki /// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki pub trait Filter { diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 15aa1daecfe..65feca076d4 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -9193,8 +9193,6 @@ where /// [`Self::monitor_updating_restored`] is called. /// /// [`ChannelManager`]: super::channelmanager::ChannelManager - /// [`chain::Watch`]: crate::chain::Watch - /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress fn monitor_updating_paused( &mut self, resend_raa: bool, resend_commitment: bool, resend_channel_ready: bool, pending_forwards: Vec<(PendingHTLCInfo, u64)>, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6bf04cd62a4..461205a39d2 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -48,7 +48,7 @@ use crate::chain::channelmonitor::{ LATENCY_GRACE_PERIOD_BLOCKS, MAX_BLOCKS_FOR_CONF, }; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Confirm, Watch}; +use crate::chain::{BestBlock, Confirm, Watch}; use crate::events::{ self, ClosureReason, Event, EventHandler, EventsProvider, HTLCHandlingFailureType, InboundChannelFunds, PaymentFailureReason, ReplayEvent, @@ -2793,13 +2793,6 @@ pub struct ChannelManager< #[cfg(any(test, feature = "_test_utils"))] pub(super) per_peer_state: FairRwLock>>>, - /// We only support using one of [`ChannelMonitorUpdateStatus::InProgress`] and - /// [`ChannelMonitorUpdateStatus::Completed`] without restarting. Because the API does not - /// otherwise directly enforce this, we enforce it in non-test builds here by storing which one - /// is in use. - #[cfg(not(any(test, feature = "_externalize_tests")))] - monitor_update_type: AtomicUsize, - /// The set of events which we need to give to the user to handle. In some cases an event may /// require some further action after the user handles it (currently only blocking a monitor /// update from being handed to the user to ensure the included changes to the channel state @@ -3202,12 +3195,7 @@ pub struct PhantomRouteHints { } /// The return type of [`ChannelManager::check_free_peer_holding_cells`] -type FreeHoldingCellsResult = Vec<( - ChannelId, - PublicKey, - Option, - Vec<(HTLCSource, PaymentHash)>, -)>; +type FreeHoldingCellsResult = Vec<(ChannelId, PublicKey, Vec<(HTLCSource, PaymentHash)>)>; macro_rules! insert_short_channel_id { ($short_to_chan_info: ident, $channel: expr) => {{ @@ -3541,9 +3529,6 @@ impl< per_peer_state: FairRwLock::new(new_hash_map()), - #[cfg(not(any(test, feature = "_externalize_tests")))] - monitor_update_type: AtomicUsize::new(0), - pending_events: Mutex::new(VecDeque::new()), pending_events_processor: AtomicBool::new(false), pending_htlc_forwards_processor: AtomicBool::new(false), @@ -3964,19 +3949,12 @@ impl< // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt.take() { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo_opt.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } else { let reason = ClosureReason::LocallyCoopClosedUnfundedChannel; @@ -4096,19 +4074,12 @@ impl< match peer_state.channel_by_id.entry(channel_id) { hash_map::Entry::Occupied(mut chan_entry) => { if let Some(chan) = chan_entry.get_mut().as_funded_mut() { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); return; } else { debug_assert!(false, "We shouldn't have an update for a non-funded channel"); @@ -4117,18 +4088,13 @@ impl< hash_map::Entry::Vacant(_) => {}, } - if let Some(actions) = self.handle_post_close_monitor_update( + self.handle_new_monitor_update_locked_actions_handled_by_caller( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, + channel_id, funding_txo, - monitor_update, counterparty_node_id, - channel_id, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(actions); - } + monitor_update, + ); } /// When a channel is removed, two things need to happen: @@ -5327,30 +5293,12 @@ impl< ); match break_channel_entry!(self, peer_state, send_res, chan_entry) { Some(monitor_update) => { - let (update_completed, completion_data) = self - .handle_new_monitor_update_with_status( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - chan, - funding_txo, - monitor_update, - ); - if let Some(data) = completion_data { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } - if !update_completed { - // Note that MonitorUpdateInProgress here indicates (per function - // docs) that we will resend the commitment update once monitor - // updating completes. Therefore, we must return an error - // indicating that it is unsafe to retry the payment wholesale, - // which we do in the send_payment check for - // MonitorUpdateInProgress, below. - return Err(APIError::MonitorUpdateInProgress); - } + self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + chan, + funding_txo, + monitor_update, + ); }, None => {}, } @@ -5433,7 +5381,7 @@ impl< /// /// Additionally, in the scenario where we begin the process of sending a payment, but crash /// before `send_payment` returns (or prior to [`ChannelMonitorUpdate`] persistence if you're - /// using [`ChannelMonitorUpdateStatus::InProgress`]), the payment may be lost on restart. See + /// using asynchronous [`chain::Watch`] persistence), the payment may be lost on restart. See /// [`ChannelManager::list_recent_payments`] for more information. /// /// Routes are automatically found using the [`Router`] provided on startup. To fix a route for a @@ -5444,7 +5392,6 @@ impl< /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`UpdateHTLCs`]: MessageSendEvent::UpdateHTLCs /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events - /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress pub fn send_payment( &self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry, @@ -6645,9 +6592,7 @@ impl< &self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, transaction: Transaction, ) -> Result<(), APIError> { let mut funding_tx_signed_result = Ok(()); - let mut monitor_update_result: Option< - Result, - > = None; + let mut monitor_update_result: Option = None; PersistenceNotifierGuard::optionally_notify(self, || { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6752,18 +6697,12 @@ impl< match counterparty_initial_commitment_signed_result { Some(Ok(Some(monitor_update))) => { let funding_txo = funded_chan.funding.get_funding_txo(); - if let Some(post_update_data) = self - .handle_new_monitor_update( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - funded_chan, - funding_txo.unwrap(), - monitor_update, - ) { - monitor_update_result = Some(Ok(post_update_data)); - } + self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + funded_chan, + funding_txo.unwrap(), + monitor_update, + ); }, Some(Err(err)) => { let (drop, err) = self.locked_handle_funded_force_close( @@ -6776,7 +6715,7 @@ impl< chan_entry.remove_entry(); } - monitor_update_result = Some(Err(err)); + monitor_update_result = Some(err); }, Some(Ok(None)) | None => {}, } @@ -6800,15 +6739,8 @@ impl< mem::drop(peer_state_lock); mem::drop(per_peer_state); - if let Some(monitor_update_result) = monitor_update_result { - match monitor_update_result { - Ok(post_update_data) => { - self.handle_post_monitor_update_chan_resume(post_update_data); - }, - Err(_) => { - let _ = self.handle_error(monitor_update_result, *counterparty_node_id); - }, - } + if let Some(err) = monitor_update_result { + let _ = self.handle_error::<()>(Err(err), *counterparty_node_id); } NotifyOption::DoPersist @@ -9359,19 +9291,12 @@ impl< .or_insert_with(Vec::new) .push(raa_blocker); } - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, prev_hop.funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); }, UpdateFulfillCommitFetch::DuplicateClaim {} => { let (action_opt, raa_blocker_opt) = completion_action(None, true); @@ -9532,18 +9457,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ .push(action); } - if let Some(actions) = self.handle_post_close_monitor_update( + self.handle_new_monitor_update_locked_actions_handled_by_caller( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, + chan_id, prev_hop.funding_txo, - preimage_update, prev_hop.counterparty_node_id, - chan_id, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(actions); - } + preimage_update, + ); } fn finalize_claims(&self, sources: Vec<(HTLCSource, Option)>) { @@ -10027,19 +9947,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ /// Applies a [`ChannelMonitorUpdate`] to the channel monitor. /// /// Monitor updates must be applied while holding the same lock under which they were generated - /// to ensure correct ordering. However, completion handling requires releasing those locks. - /// This method applies the update immediately (while locks are held) and returns whether the - /// update completed, allowing the caller to handle completion separately after releasing locks. + /// to ensure correct ordering. The update is always considered in-progress until completion is + /// signaled asynchronously via [`MonitorEvent::Completed`]. /// - /// Returns a tuple of `(update_completed, all_updates_completed)`: - /// - `update_completed`: whether this specific monitor update finished persisting - /// - `all_updates_completed`: whether all in-flight updates for this channel are now complete + /// [`MonitorEvent::Completed`]: chain::channelmonitor::MonitorEvent::Completed fn handle_new_monitor_update_locked_actions_handled_by_caller( &self, in_flight_monitor_updates: &mut BTreeMap)>, channel_id: ChannelId, funding_txo: OutPoint, counterparty_node_id: PublicKey, new_update: ChannelMonitorUpdate, - ) -> (bool, bool) { + ) { let in_flight_updates = &mut in_flight_monitor_updates .entry(channel_id) .or_insert_with(|| (funding_txo, Vec::new())) @@ -10054,15 +9971,13 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }); if self.background_events_processed_since_startup.load(Ordering::Acquire) { - let update_res = - self.chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]); + self.chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]); let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None); - let update_completed = self.handle_monitor_update_res(update_res, logger); - if update_completed { - let _ = in_flight_updates.remove(update_idx); - } - (update_completed, update_completed && in_flight_updates.is_empty()) + log_debug!( + logger, + "ChannelMonitor update in flight, holding messages until the update completes.", + ); } else { // We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we // fail to persist it. This is a fairly safe assumption, however, since anything we do @@ -10082,167 +9997,42 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ // We could work around that with some effort, but its simpler to just track updates // twice. self.pending_background_events.lock().unwrap().push(event); - (false, false) } } - /// Handles a monitor update for a closed channel, returning optionally the completion actions - /// to process after locks are released. + /// Logs that the initial monitor persistence is in-progress. Completion is signaled + /// asynchronously via [`MonitorEvent::Completed`]. /// - /// Returns `Some` if all in-flight updates are complete. - fn handle_post_close_monitor_update( - &self, - in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - funding_txo: OutPoint, update: ChannelMonitorUpdate, counterparty_node_id: PublicKey, - channel_id: ChannelId, - ) -> Option> { - let (_update_completed, all_updates_complete) = self - .handle_new_monitor_update_locked_actions_handled_by_caller( - in_flight_monitor_updates, - channel_id, - funding_txo, - counterparty_node_id, - update, - ); - if all_updates_complete { - Some(monitor_update_blocked_actions.remove(&channel_id).unwrap_or(Vec::new())) - } else { - None - } - } - - /// Returns whether the monitor update is completed, `false` if the update is in-progress. - fn handle_monitor_update_res( - &self, update_res: ChannelMonitorUpdateStatus, logger: LG, - ) -> bool { + /// [`MonitorEvent::Completed`]: chain::channelmonitor::MonitorEvent::Completed + fn handle_initial_monitor(&self, chan: &FundedChannel) { debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire)); - match update_res { - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - ChannelMonitorUpdateStatus::InProgress => { - #[cfg(not(any(test, feature = "_externalize_tests")))] - if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 { - panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); - } - log_debug!( - logger, - "ChannelMonitor update in flight, holding messages until the update completes.", - ); - false - }, - ChannelMonitorUpdateStatus::Completed => { - #[cfg(not(any(test, feature = "_externalize_tests")))] - if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 { - panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); - } - true - }, - } - } - - /// Handles the initial monitor persistence, returning optionally data to process after locks - /// are released. - /// - /// Note: This method takes individual fields from `PeerState` rather than the whole struct - /// to avoid borrow checker issues when the channel is borrowed from `peer_state.channel_by_id`. - fn handle_initial_monitor( - &self, - in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - pending_msg_events: &mut Vec, is_connected: bool, - chan: &mut FundedChannel, update_res: ChannelMonitorUpdateStatus, - ) -> Option { let logger = WithChannelContext::from(&self.logger, &chan.context, None); - let update_completed = self.handle_monitor_update_res(update_res, logger); - if update_completed { - Some(self.try_resume_channel_post_monitor_update( - in_flight_monitor_updates, - monitor_update_blocked_actions, - pending_msg_events, - is_connected, - chan, - )) - } else { - None - } + log_debug!( + logger, + "ChannelMonitor update in flight, holding messages until the update completes.", + ); } - /// Applies a new monitor update and attempts to resume the channel if all updates are complete. - /// - /// Returns [`PostMonitorUpdateChanResume`] if all in-flight updates are complete, which should - /// be passed to [`Self::handle_post_monitor_update_chan_resume`] after releasing locks. + /// Applies a new monitor update, sending it to the [`Watch`] implementation. /// /// Note: This method takes individual fields from [`PeerState`] rather than the whole struct /// to avoid borrow checker issues when the channel is borrowed from `peer_state.channel_by_id`. + /// + /// [`Watch`]: chain::Watch fn handle_new_monitor_update( &self, in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - pending_msg_events: &mut Vec, is_connected: bool, chan: &mut FundedChannel, funding_txo: OutPoint, update: ChannelMonitorUpdate, - ) -> Option { - self.handle_new_monitor_update_with_status( + ) { + let chan_id = chan.context.channel_id(); + let counterparty_node_id = chan.context.get_counterparty_node_id(); + self.handle_new_monitor_update_locked_actions_handled_by_caller( in_flight_monitor_updates, - monitor_update_blocked_actions, - pending_msg_events, - is_connected, - chan, + chan_id, funding_txo, + counterparty_node_id, update, - ) - .1 - } - - /// Like [`Self::handle_new_monitor_update`], but also returns whether this specific update - /// completed (as opposed to being in-progress). - fn handle_new_monitor_update_with_status( - &self, - in_flight_monitor_updates: &mut BTreeMap)>, - monitor_update_blocked_actions: &mut BTreeMap< - ChannelId, - Vec, - >, - pending_msg_events: &mut Vec, is_connected: bool, - chan: &mut FundedChannel, funding_txo: OutPoint, update: ChannelMonitorUpdate, - ) -> (bool, Option) { - let chan_id = chan.context.channel_id(); - let counterparty_node_id = chan.context.get_counterparty_node_id(); - - let (update_completed, all_updates_complete) = self - .handle_new_monitor_update_locked_actions_handled_by_caller( - in_flight_monitor_updates, - chan_id, - funding_txo, - counterparty_node_id, - update, - ); - - let completion_data = if all_updates_complete { - Some(self.try_resume_channel_post_monitor_update( - in_flight_monitor_updates, - monitor_update_blocked_actions, - pending_msg_events, - is_connected, - chan, - )) - } else { - None - }; - - (update_completed, completion_data) + ); } /// Attempts to resume a channel after a monitor update completes, while locks are still held. @@ -11227,7 +11017,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }, hash_map::Entry::Vacant(e) => { let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); - if let Ok(persist_state) = monitor_res { + if monitor_res.is_ok() { // There's no problem signing a counterparty's funding transaction if our monitor // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't // accepted payment from yet. We do, however, need to wait to send our channel_ready @@ -11240,18 +11030,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } if let Some(funded_chan) = e.insert(Channel::from(chan)).as_funded_mut() { - if let Some(data) = self.handle_initial_monitor( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - funded_chan, - persist_state, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + self.handle_initial_monitor(funded_chan); } else { unreachable!("This must be a funded channel as we just inserted it."); } @@ -11406,22 +11185,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ funded_chan.unset_funding_info(); ChannelError::close("Channel ID was a duplicate".to_owned()) }) - .map(|persist_status| (funded_chan, persist_status)) + .map(|()| funded_chan) }) { - Ok((funded_chan, persist_status)) => { - if let Some(data) = self.handle_initial_monitor( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - funded_chan, - persist_status, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + Ok(funded_chan) => { + self.handle_initial_monitor(funded_chan); Ok(()) }, Err(e) => try_channel_entry!(self, peer_state, Err(e), chan_entry), @@ -11969,19 +11737,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } // Update the monitor with the shutdown script if necessary. if let Some(monitor_update) = monitor_update_opt { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo_opt.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } }, None => { @@ -12310,19 +12071,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(monitor) = monitor_opt { let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); - if let Ok(persist_state) = monitor_res { - if let Some(data) = self.handle_initial_monitor( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - chan, - persist_state, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + if monitor_res.is_ok() { + self.handle_initial_monitor(chan); } else { let logger = WithChannelContext::from(&self.logger, &chan.context, None); @@ -12333,19 +12083,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ try_channel_entry!(self, peer_state, Err(err), chan_entry) } } else if let Some(monitor_update) = monitor_update_opt { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } Ok(()) @@ -12376,19 +12119,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ ); if let Some(monitor_update) = monitor_update_opt { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo.unwrap(), monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } Ok(()) @@ -12455,6 +12191,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }) } + #[cfg(any(test, feature = "_test_utils"))] + pub(crate) fn test_process_pending_monitor_events(&self) { + let _read_guard = self.total_consistency_lock.read().unwrap(); + self.process_pending_monitor_events(); + } + #[cfg(any(test, feature = "_test_utils"))] pub(crate) fn test_raa_monitor_updates_held( &self, counterparty_node_id: PublicKey, channel_id: ChannelId, @@ -12495,19 +12237,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(monitor_update) = monitor_update_opt { let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } (htlcs_to_fail, static_invoices) } else { @@ -12986,19 +12721,12 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } if let Some(monitor_update) = splice_promotion.monitor_update { - if let Some(data) = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, splice_promotion.funding_txo, monitor_update, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_post_monitor_update_chan_resume(data); - } + ); } } } else { @@ -13153,11 +12881,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread ); - for (chan_id, cp_node_id, post_update_data, failed_htlcs) in result { - if let Some(data) = post_update_data { - self.handle_post_monitor_update_chan_resume(data); - } - + for (chan_id, cp_node_id, failed_htlcs) in result { self.fail_holding_cell_htlcs(failed_htlcs, chan_id, &cp_node_id); self.needs_persist_flag.store(true, Ordering::Release); self.event_persist_notifier.notify(); @@ -13188,21 +12912,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ &&WithChannelContext::from(&self.logger, &chan.context, None), ); if monitor_opt.is_some() || !holding_cell_failed_htlcs.is_empty() { - let update_res = monitor_opt - .map(|monitor_update| { - self.handle_new_monitor_update( - &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, - chan, - chan.funding.get_funding_txo().unwrap(), - monitor_update, - ) - }) - .flatten(); + if let Some(monitor_update) = monitor_opt { + self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + chan, + chan.funding.get_funding_txo().unwrap(), + monitor_update, + ); + } let cp_node_id = chan.context.get_counterparty_node_id(); - updates.push((*chan_id, cp_node_id, update_res, holding_cell_failed_htlcs)); + updates.push((*chan_id, cp_node_id, holding_cell_failed_htlcs)); } } updates @@ -14736,11 +14455,8 @@ impl< if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() { log_debug!(logger, "Unlocking monitor updating and updating monitor", ); - let post_update_data = self.handle_new_monitor_update( + self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, - &mut peer_state.pending_msg_events, - peer_state.is_connected, chan, channel_funding_outpoint, monitor_update, @@ -14750,10 +14466,6 @@ impl< mem::drop(peer_state_lck); mem::drop(per_peer_state); - if let Some(data) = post_update_data { - self.handle_post_monitor_update_chan_resume(data); - } - self.handle_holding_cell_free_result(holding_cell_res); if further_update_exists { @@ -14833,18 +14545,13 @@ impl< }; self.pending_background_events.lock().unwrap().push(event); } else { - if let Some(actions) = self.handle_post_close_monitor_update( + self.handle_new_monitor_update_locked_actions_handled_by_caller( &mut peer_state.in_flight_monitor_updates, - &mut peer_state.monitor_update_blocked_actions, + channel_id, channel_funding_outpoint, - update, counterparty_node_id, - channel_id, - ) { - mem::drop(peer_state_lock); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions(actions); - } + update, + ); } }, } @@ -19699,9 +19406,6 @@ impl< per_peer_state: FairRwLock::new(per_peer_state), - #[cfg(not(any(test, feature = "_externalize_tests")))] - monitor_update_type: AtomicUsize::new(0), - pending_events: Mutex::new(pending_events_read), pending_events_processor: AtomicBool::new(false), pending_htlc_forwards_processor: AtomicBool::new(false), diff --git a/lightning/src/util/errors.rs b/lightning/src/util/errors.rs index cd72d60327f..a98a607640b 100644 --- a/lightning/src/util/errors.rs +++ b/lightning/src/util/errors.rs @@ -52,13 +52,11 @@ pub enum APIError { err: String, }, /// An attempt to call [`chain::Watch::watch_channel`]/[`chain::Watch::update_channel`] - /// returned a [`ChannelMonitorUpdateStatus::InProgress`] indicating the persistence of a - /// monitor update is awaiting async resolution. Once it resolves the attempted action should - /// complete automatically. + /// indicates the persistence of a monitor update is awaiting async resolution. Once it + /// resolves the attempted action should complete automatically. /// /// [`chain::Watch::watch_channel`]: crate::chain::Watch::watch_channel /// [`chain::Watch::update_channel`]: crate::chain::Watch::update_channel - /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress MonitorUpdateInProgress, /// [`SignerProvider::get_shutdown_scriptpubkey`] returned a shutdown scriptpubkey incompatible /// with the channel counterparty as negotiated in [`InitFeatures`].