Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 62 additions & 18 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,11 +1032,13 @@ where
/// See the release notes for LDK 0.1 for more information on this requirement.
///
/// [`ChannelMonitor`]s which do not need to be persisted (i.e. were last written by LDK 0.1 or
/// later) will be loaded without persistence and this method will return
/// [`ChannelMonitorUpdateStatus::Completed`].
/// later) will be loaded without persistence and a [`MonitorEvent::Completed`] will be
/// immediately queued.
///
/// [`MonitorEvent::Completed`]: channelmonitor::MonitorEvent::Completed
pub fn load_existing_monitor(
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
) -> Result<ChannelMonitorUpdateStatus, ()> {
) -> Result<(), ()> {
if !monitor.written_by_0_1_or_later() {
return chain::Watch::watch_channel(self, channel_id, monitor);
}
Expand All @@ -1054,9 +1056,20 @@ where
if let Some(ref chain_source) = self.chain_source {
monitor.load_outputs_to_watch(chain_source, &self.logger);
}
// The monitor is already persisted, so generate MonitorEvent::Completed immediately.
let funding_txo = monitor.get_funding_txo();
let counterparty_node_id = monitor.get_counterparty_node_id();
let update_id = monitor.get_latest_update_id();
entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) });
self.pending_monitor_events.lock().unwrap().push((
funding_txo,
channel_id,
vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id: update_id }],
counterparty_node_id,
));
self.event_notifier.notify();

Ok(ChannelMonitorUpdateStatus::Completed)
Ok(())
}
}

Expand Down Expand Up @@ -1271,7 +1284,7 @@ where
{
fn watch_channel(
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
) -> Result<ChannelMonitorUpdateStatus, ()> {
) -> Result<(), ()> {
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(channel_id) {
Expand All @@ -1285,33 +1298,52 @@ where
let update_id = monitor.get_latest_update_id();
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
match persist_res {
let persist_completed = match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
log_info!(logger, "Persistence of new ChannelMonitor in progress",);
pending_monitor_updates.push(update_id);
false
},
ChannelMonitorUpdateStatus::Completed => {
log_info!(logger, "Persistence of new ChannelMonitor completed",);
true
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
}
};
if let Some(ref chain_source) = self.chain_source {
monitor.load_outputs_to_watch(chain_source, &self.logger);
}
// Capture monitor info before moving it into the map.
let funding_txo = monitor.get_funding_txo();
let counterparty_node_id = monitor.get_counterparty_node_id();
entry.insert(MonitorHolder {
monitor,
pending_monitor_updates: Mutex::new(pending_monitor_updates),
});
Ok(persist_res)
if persist_completed {
// Persist returned Completed, so generate MonitorEvent::Completed immediately.
// We can't call channel_monitor_updated here because we hold the monitors write
// lock. Instead, push directly to pending_monitor_events which is a separate Mutex.
self.pending_monitor_events.lock().unwrap().push((
funding_txo,
channel_id,
vec![MonitorEvent::Completed {
funding_txo,
channel_id,
monitor_update_id: update_id,
}],
counterparty_node_id,
));
self.event_notifier.notify();
}
Ok(())
}

fn update_channel(
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
) -> ChannelMonitorUpdateStatus {
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) {
// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
// versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
debug_assert_eq!(update.channel_id.unwrap(), channel_id);
Expand All @@ -1328,7 +1360,7 @@ where
#[cfg(debug_assertions)]
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
#[cfg(not(debug_assertions))]
ChannelMonitorUpdateStatus::InProgress
return;
},
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
Expand Down Expand Up @@ -1382,6 +1414,24 @@ where
"Persistence of ChannelMonitorUpdate id {:?} completed",
update_id,
);
// If no prior async updates are pending, we can immediately signal
// completion. Otherwise, completion of those prior updates via
// channel_monitor_updated will eventually generate the event (using
// monitor.get_latest_update_id() which covers this update too).
if !monitor_state.has_pending_updates(&pending_monitor_updates) {
let funding_txo = monitor.get_funding_txo();
self.pending_monitor_events.lock().unwrap().push((
funding_txo,
channel_id,
vec![MonitorEvent::Completed {
funding_txo,
channel_id,
monitor_update_id: monitor.get_latest_update_id(),
}],
monitor.get_counterparty_node_id(),
));
self.event_notifier.notify();
}
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
// Take the monitors lock for writing so that we poison it and any future
Expand Down Expand Up @@ -1413,12 +1463,6 @@ where
});
}
}

if update_res.is_err() {
ChannelMonitorUpdateStatus::InProgress
} else {
persist_res
}
},
}
}
Expand Down
47 changes: 23 additions & 24 deletions lightning/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub trait Confirm {

/// An enum representing the status of a channel monitor update persistence.
///
/// These are generally used as the return value for an implementation of [`Persist`] which is used
/// These are used as the return value for an implementation of [`Persist`] which is used
/// as the storage layer for a [`ChainMonitor`]. See the docs on [`Persist`] for a high-level
/// explanation of how to handle different cases.
///
Expand All @@ -234,7 +234,7 @@ pub enum ChannelMonitorUpdateStatus {
/// be available on restart even if the application crashes.
///
/// If you return this variant, you cannot later return [`InProgress`] from the same instance of
/// [`Persist`]/[`Watch`] without first restarting.
/// [`Persist`] without first restarting.
///
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
/// [`Persist`]: chainmonitor::Persist
Expand Down Expand Up @@ -264,7 +264,7 @@ pub enum ChannelMonitorUpdateStatus {
/// remaining cases are fixed, in rare cases, *using this feature may lead to funds loss*.
///
/// If you return this variant, you cannot later return [`Completed`] from the same instance of
/// [`Persist`]/[`Watch`] without first restarting.
/// [`Persist`] without first restarting.
///
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
/// [`Completed`]: ChannelMonitorUpdateStatus::Completed
Expand Down Expand Up @@ -293,7 +293,8 @@ pub enum ChannelMonitorUpdateStatus {
/// persisted to disk to ensure that the latest [`ChannelMonitor`] state can be reloaded if the
/// application crashes.
///
/// See method documentation and [`ChannelMonitorUpdateStatus`] for specific requirements.
/// Updates are always considered in-progress until completion is signaled asynchronously via
/// [`MonitorEvent::Completed`] in [`Watch::release_pending_monitor_events`].
pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
/// Watches a channel identified by `channel_id` using `monitor`.
///
Expand All @@ -312,36 +313,37 @@ pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
/// [`blocks_disconnected`]: channelmonitor::ChannelMonitor::blocks_disconnected
fn watch_channel(
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
) -> Result<ChannelMonitorUpdateStatus, ()>;
) -> Result<(), ()>;

/// Updates a channel identified by `channel_id` by applying `update` to its monitor.
///
/// Implementations must call [`ChannelMonitor::update_monitor`] with the given update. This
/// may fail (returning an `Err(())`), in which case this should return
/// [`ChannelMonitorUpdateStatus::InProgress`] (and the update should never complete). This
/// may fail (returning an `Err(())`), in which case the update should never complete. This
/// generally implies the channel has been closed (either by the funding outpoint being spent
/// on-chain or the [`ChannelMonitor`] having decided to do so and broadcasted a transaction),
/// and the [`ChannelManager`] state will be updated once it sees the funding spend on-chain.
///
/// In general, persistence failures should be retried after returning
/// [`ChannelMonitorUpdateStatus::InProgress`] and eventually complete. If a failure truly
/// cannot be retried, the node should shut down immediately after returning
/// [`ChannelMonitorUpdateStatus::UnrecoverableError`], see its documentation for more info.
/// The update is considered in-progress until a [`MonitorEvent::Completed`] is provided via
/// [`Watch::release_pending_monitor_events`]. While in-progress, the channel will be
/// "frozen", preventing us from revoking old states or submitting a new commitment
/// transaction to the counterparty.
///
/// Even when a channel has been "frozen", updates to the [`ChannelMonitor`] can continue to
/// occur (e.g. if an inbound HTLC which we forwarded was claimed upstream, resulting in us
/// attempting to claim it on this channel) and those updates must still be persisted.
///
/// In general, persistence failures should be retried in the background and eventually
/// complete. If a failure truly cannot be retried, the node should shut down.
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
fn update_channel(
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
) -> ChannelMonitorUpdateStatus;
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate);

/// Returns any monitor events since the last call. Subsequent calls must only return new
/// events.
///
/// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no
/// further events may be returned here until the [`ChannelMonitor`] has been fully persisted
/// to disk.
///
/// For details on asynchronous [`ChannelMonitor`] updating and returning
/// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`].
fn release_pending_monitor_events(
&self,
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>;
Expand All @@ -352,13 +354,11 @@ impl<ChannelSigner: EcdsaChannelSigner, T: Watch<ChannelSigner> + ?Sized, W: Der
{
fn watch_channel(
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
) -> Result<ChannelMonitorUpdateStatus, ()> {
) -> Result<(), ()> {
self.deref().watch_channel(channel_id, monitor)
}

fn update_channel(
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
) -> ChannelMonitorUpdateStatus {
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) {
self.deref().update_channel(channel_id, update)
}

Expand All @@ -384,9 +384,8 @@ impl<ChannelSigner: EcdsaChannelSigner, T: Watch<ChannelSigner> + ?Sized, W: Der
/// Note that use as part of a [`Watch`] implementation involves reentrancy. Therefore, the `Filter`
/// should not block on I/O. Implementations should instead queue the newly monitored data to be
/// processed later. Then, in order to block until the data has been processed, any [`Watch`]
/// invocation that has called the `Filter` must return [`InProgress`].
///
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
/// invocation that has called the `Filter` should delay its [`MonitorEvent::Completed`] until
/// processing finishes.
/// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
/// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
pub trait Filter {
Expand Down
2 changes: 0 additions & 2 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9193,8 +9193,6 @@ where
/// [`Self::monitor_updating_restored`] is called.
///
/// [`ChannelManager`]: super::channelmanager::ChannelManager
/// [`chain::Watch`]: crate::chain::Watch
/// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
fn monitor_updating_paused<L: Logger>(
&mut self, resend_raa: bool, resend_commitment: bool, resend_channel_ready: bool,
pending_forwards: Vec<(PendingHTLCInfo, u64)>,
Expand Down
Loading
Loading