From 2a55a2676a688d8a8af4445568bdea6e6b699dae Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 12:12:32 -0400 Subject: [PATCH 01/20] Split up some tests that have many variants Helps when debugging to know which variants failed. --- lightning/src/ln/monitor_tests.rs | 60 +++++++++++++++++++++++++++++-- lightning/src/ln/payment_tests.rs | 26 +++++++++++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index efd2084a38e..1a68e5165a8 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -3801,27 +3801,83 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b } #[test] -fn test_lost_timeout_monitor_events() { +fn test_lost_timeout_monitor_events_a() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_b() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_c() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_d() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_e() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_f() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_g() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_h() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, false); +} +#[test] +fn test_lost_timeout_monitor_events_i() { 
do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, false); +} +#[test] +fn test_lost_timeout_monitor_events_j() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, false); - +} +#[test] +fn test_lost_timeout_monitor_events_k() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_l() { do_test_lost_timeout_monitor_events(CommitmentType::RevokedCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_m() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_n() { do_test_lost_timeout_monitor_events(CommitmentType::PreviousCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_o() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_p() { do_test_lost_timeout_monitor_events(CommitmentType::LatestCounterparty, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_q() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_r() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithoutLastHTLC, true, true); +} +#[test] +fn test_lost_timeout_monitor_events_s() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, false, true); +} +#[test] +fn test_lost_timeout_monitor_events_t() { do_test_lost_timeout_monitor_events(CommitmentType::LocalWithLastHTLC, true, true); } diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 7d198d2d70d..d405567473d 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -1423,15 +1423,39 @@ fn do_test_dup_htlc_onchain_doesnt_fail_on_reload( } #[test] -fn test_dup_htlc_onchain_doesnt_fail_on_reload() { +fn 
test_dup_htlc_onchain_doesnt_fail_on_reload_a() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_b() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_c() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_d() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_e() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_f() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, false, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_g() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, true); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_h() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, true, false); +} +#[test] +fn test_dup_htlc_onchain_doesnt_fail_on_reload_i() { do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, false, false); } From 0430d7e9091eecff11592717f70a19399ace0905 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 19 Mar 2026 16:08:46 -0400 Subject: [PATCH 02/20] Remove unnecessary pending_monitor_events clone --- lightning/src/chain/channelmonitor.rs | 31 +++++++++++++-------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 1eb1484d07d..ec33db6be56 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -68,8 +68,8 @@ use crate::util::byte_utils; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; use crate::util::ser::{ - MaybeReadable, Readable, ReadableArgs, 
RequiredWrapper, UpgradableRequired, Writeable, Writer, - U48, + Iterable, MaybeReadable, Readable, ReadableArgs, RequiredWrapper, UpgradableRequired, + Writeable, Writer, U48, }; #[allow(unused_imports)] @@ -1719,24 +1719,23 @@ pub(crate) fn write_chanmon_internal( channel_monitor.lockdown_from_offchain.write(writer)?; channel_monitor.holder_tx_signed.write(writer)?; - // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` for backwards compatibility. - let pending_monitor_events = - match channel_monitor.pending_monitor_events.iter().find(|ev| match ev { - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) { - Some(MonitorEvent::HolderForceClosedWithInfo { outpoint, .. }) => { - let mut pending_monitor_events = channel_monitor.pending_monitor_events.clone(); - pending_monitor_events.push(MonitorEvent::HolderForceClosed(*outpoint)); - pending_monitor_events - }, - _ => channel_monitor.pending_monitor_events.clone(), - }; + // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` + // for backwards compatibility. + let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. 
} = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events = Iterable( + channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), + ); write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required_vec), + (5, pending_monitor_events, required), (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), From e3b1672d2fb20871ae3935dfad0e4535944f5f5c Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 15:26:43 -0400 Subject: [PATCH 03/20] Add persistent_monitor_events flag to monitors/manager Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplifies things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. As a first step towards this, here we persist a flag in the ChannelManager and ChannelMonitors indicating whether this new feature is enabled. 
It will be used in upcoming commits to maintain compatibility and create an upgrade/downgrade path between LDK versions. --- lightning/src/chain/channelmonitor.rs | 21 ++++++++++++++++++++ lightning/src/ln/channelmanager.rs | 28 +++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index ec33db6be56..98687b0bf10 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1281,6 +1281,12 @@ pub(crate) struct ChannelMonitorImpl { // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec, + /// When set, monitor events are retained until explicitly acked rather than cleared on read. + /// + /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on + /// startup, and make the monitor responsible for both off- and on-chain payment resolution. Will + /// be always set once support for this feature is complete. + persistent_events_enabled: bool, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1732,8 +1738,12 @@ pub(crate) fn write_chanmon_internal( channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), ); + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. 
+ let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(()); + write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), (5, pending_monitor_events, required), (7, channel_monitor.funding_spend_seen, required), @@ -1937,6 +1947,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + persistent_events_enabled: false, pending_events: Vec::new(), is_processing_pending_events: false, @@ -6693,8 +6704,10 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut alternative_funding_confirmed = None; let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); + let mut persistent_events_enabled: Option<()> = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), + (2, persistent_events_enabled, option), (3, htlcs_resolved_on_chain, optional_vec), (5, pending_monitor_events, optional_vec), (7, funding_spend_seen, option), @@ -6716,6 +6729,13 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), }); + + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_events_enabled.is_some() { + // This feature isn't supported yet so error if the writer expected it to be. + return Err(DecodeError::InvalidValue) + } + // Note that `payment_preimages_with_info` was added (and is always written) in LDK 0.1, so // we can use it to determine if this monitor was last written by LDK 0.1 or later. 
let written_by_0_1_or_later = payment_preimages_with_info.is_some(); @@ -6864,6 +6884,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP payment_preimages, pending_monitor_events: pending_monitor_events.unwrap(), + persistent_events_enabled: persistent_events_enabled.is_some(), pending_events, is_processing_pending_events: false, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 30eb7f85d71..d7f13e156be 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2952,6 +2952,15 @@ pub struct ChannelManager< /// offer they resolve to to the given one. pub testing_dnssec_proof_offer_resolution_override: Mutex>, + /// When set, monitors will repeatedly provide an event back to the `ChannelManager` on restart + /// until the event is explicitly acknowledged as processed. + /// + /// Allows us to reconstruct pending HTLC state by replaying monitor events on startup, rather + /// than from complexly polling and reconciling Channel{Monitor} APIs, as well as move the + /// responsibility of off-chain payment resolution from the Channel to the monitor. Will be + /// always set once support is complete. + persistent_monitor_events: bool, + #[cfg(test)] pub(super) entropy_source: ES, #[cfg(not(test))] @@ -3641,6 +3650,8 @@ impl< logger, + persistent_monitor_events: false, + #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), } @@ -18189,6 +18200,9 @@ impl< } } + // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. 
+ let persistent_monitor_events = self.persistent_monitor_events.then_some(()); + write_tlv_fields!(writer, { (1, pending_outbound_payments_no_retry, required), (2, pending_intercepted_htlcs, option), @@ -18201,6 +18215,7 @@ impl< (9, htlc_purposes, required_vec), (10, legacy_in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), + (12, persistent_monitor_events, option), (13, htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_opt, option), (15, self.inbound_payment_id_secret, required), @@ -18300,6 +18315,7 @@ pub(super) struct ChannelManagerData { forward_htlcs_legacy: HashMap>, pending_intercepted_htlcs_legacy: HashMap, decode_update_add_htlcs_legacy: HashMap>, + persistent_monitor_events: bool, // The `ChannelManager` version that was written. version: u8, } @@ -18485,6 +18501,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> let mut inbound_payment_id_secret = None; let mut peer_storage_dir: Option)>> = None; let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); + let mut persistent_monitor_events: Option<()> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs_legacy, option), @@ -18497,6 +18514,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (9, claimable_htlc_purposes, optional_vec), (10, legacy_in_flight_monitor_updates, option), (11, probing_cookie_secret, option), + (12, persistent_monitor_events, option), (13, amountless_claimable_htlc_onion_fields, optional_vec), (14, decode_update_add_htlcs_legacy, option), (15, inbound_payment_id_secret, option), @@ -18505,6 +18523,12 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)), }); + #[cfg(not(any(feature = "_test_utils", test)))] + if persistent_monitor_events.is_some() { + // This feature isn't supported yet so error if the writer expected it to be. 
+ return Err(DecodeError::InvalidValue); + } + // Merge legacy pending_outbound_payments fields into a single HashMap. // Priority: pending_outbound_payments (TLV 3) > pending_outbound_payments_no_retry (TLV 1) // > pending_outbound_payments_compat (non-TLV legacy) @@ -18621,6 +18645,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> peer_storage_dir: peer_storage_dir.unwrap_or_default(), async_receive_offer_cache, version, + persistent_monitor_events: persistent_monitor_events.is_some(), }) } } @@ -18922,6 +18947,7 @@ impl< mut in_flight_monitor_updates, peer_storage_dir, async_receive_offer_cache, + persistent_monitor_events, version: _version, } = data; @@ -20185,6 +20211,8 @@ impl< logger: args.logger, config: RwLock::new(args.config), + persistent_monitor_events, + #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), }; From 76a984bbef03606a47ad154d8d0947421e12c37f Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 15:08:54 -0400 Subject: [PATCH 04/20] Add helper to push monitor events Cleans up the next commit --- lightning/src/chain/chainmonitor.rs | 53 +++++++-------------------- lightning/src/chain/channelmonitor.rs | 26 +++++++++---- 2 files changed, 32 insertions(+), 47 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 07d835dc785..8f54c1a85e8 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -367,9 +367,6 @@ pub struct ChainMonitor< fee_estimator: F, persister: P, _entropy_source: ES, - /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly - /// from the user and not from a [`ChannelMonitor`]. - pending_monitor_events: Mutex, PublicKey)>>, /// The best block height seen, used as a proxy for the passage of time. 
highest_chain_height: AtomicUsize, @@ -437,7 +434,6 @@ where logger, fee_estimator: feeest, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::clone(&event_notifier), persister: AsyncPersister { persister, event_notifier }, @@ -656,7 +652,6 @@ where fee_estimator: feeest, persister, _entropy_source, - pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), event_notifier: Arc::new(Notifier::new()), pending_send_only_events: Mutex::new(Vec::new()), @@ -801,16 +796,11 @@ where return Ok(()); } let funding_txo = monitor_data.monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( + monitor_data.monitor.push_monitor_event(MonitorEvent::Completed { funding_txo, channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor_data.monitor.get_latest_update_id(), - }], - monitor_data.monitor.get_counterparty_node_id(), - )); + monitor_update_id: monitor_data.monitor.get_latest_update_id(), + }); self.event_notifier.notify(); Ok(()) @@ -823,14 +813,11 @@ where pub fn force_channel_monitor_updated(&self, channel_id: ChannelId, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); let monitor = &monitors.get(&channel_id).unwrap().monitor; - let counterparty_node_id = monitor.get_counterparty_node_id(); - let funding_txo = monitor.get_funding_txo(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), channel_id, - vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id }], - counterparty_node_id, - )); + monitor_update_id, + }); self.event_notifier.notify(); } @@ -1265,21 +1252,13 @@ where // The channel is post-close (funding spend seen, lockdown, or // holder tx signed). 
Return InProgress so ChannelManager freezes // the channel until the force-close MonitorEvents are processed. - // Push a Completed event into pending_monitor_events so it gets - // picked up after the per-monitor events in the next - // release_pending_monitor_events call. - let funding_txo = monitor.get_funding_txo(); - let channel_id = monitor.channel_id(); - self.pending_monitor_events.lock().unwrap().push(( - funding_txo, - channel_id, - vec![MonitorEvent::Completed { - funding_txo, - channel_id, - monitor_update_id: monitor.get_latest_update_id(), - }], - monitor.get_counterparty_node_id(), - )); + // Push a Completed event into the monitor so it gets picked up + // in the next release_pending_monitor_events call. + monitor.push_monitor_event(MonitorEvent::Completed { + funding_txo: monitor.get_funding_txo(), + channel_id: monitor.channel_id(), + monitor_update_id: monitor.get_latest_update_id(), + }); log_debug!( logger, "Deferring completion of ChannelMonitorUpdate id {:?} (channel is post-close)", @@ -1664,10 +1643,6 @@ where )); } } - // Drain pending_monitor_events (which includes deferred post-close - // completions) after per-monitor events so that force-close - // MonitorEvents are processed by ChannelManager first. - pending_monitor_events.extend(self.pending_monitor_events.lock().unwrap().split_off(0)); pending_monitor_events } } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 98687b0bf10..85e885c1185 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -183,6 +183,10 @@ impl Readable for ChannelMonitorUpdate { } } +fn push_monitor_event(pending_monitor_events: &mut Vec, event: MonitorEvent) { + pending_monitor_events.push(event); +} + /// An event to be processed by the ChannelManager. 
#[derive(Clone, PartialEq, Eq)] pub enum MonitorEvent { @@ -226,8 +230,6 @@ pub enum MonitorEvent { }, } impl_writeable_tlv_based_enum_upgradable_legacy!(MonitorEvent, - // Note that Completed is currently never serialized to disk as it is generated only in - // ChainMonitor. (0, Completed) => { (0, funding_txo, required), (2, monitor_update_id, required), @@ -2165,6 +2167,10 @@ impl ChannelMonitor { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } + pub(super) fn push_monitor_event(&self, event: MonitorEvent) { + self.inner.lock().unwrap().push_monitor_event(event); + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3890,7 +3896,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - self.pending_monitor_events.push(event); + push_monitor_event(&mut self.pending_monitor_events, event); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4551,6 +4557,10 @@ impl ChannelMonitorImpl { &self.outputs_to_watch } + fn push_monitor_event(&mut self, event: MonitorEvent) { + push_monitor_event(&mut self.pending_monitor_events, event); + } + fn get_and_clear_pending_monitor_events(&mut self) -> Vec { let mut ret = Vec::new(); mem::swap(&mut ret, &mut self.pending_monitor_events); @@ -5610,7 +5620,7 @@ impl ChannelMonitorImpl { ); log_info!(logger, "Channel closed by funding output spend in txid {txid}"); if !self.funding_spend_seen { - self.pending_monitor_events.push(MonitorEvent::CommitmentTxConfirmed(())); + self.push_monitor_event(MonitorEvent::CommitmentTxConfirmed(())); } self.funding_spend_seen = true; @@ -5785,7 +5795,7 @@ impl ChannelMonitorImpl { log_debug!(logger, "HTLC {} failure update in {} has got enough confirmations to be passed upstream", &payment_hash, entry.txid); - 
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + self.push_monitor_event(MonitorEvent::HTLCEvent(HTLCUpdate { payment_hash, payment_preimage: None, source, @@ -5895,7 +5905,7 @@ impl ChannelMonitorImpl { log_error!(logger, "Failing back HTLC {} upstream to preserve the \ channel as the forward HTLC hasn't resolved and our backward HTLC \ expires soon at {}", log_bytes!(htlc.payment_hash.0), inbound_htlc_expiry); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source: source.clone(), payment_preimage: None, payment_hash: htlc.payment_hash, @@ -6312,7 +6322,7 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, @@ -6336,7 +6346,7 @@ impl ChannelMonitorImpl { }, }); self.counterparty_fulfilled_htlcs.insert(SentHTLCId::from_source(&source), payment_preimage); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + push_monitor_event(&mut self.pending_monitor_events, MonitorEvent::HTLCEvent(HTLCUpdate { source, payment_preimage: Some(payment_preimage), payment_hash, From 52599075a430703013d1e14faa93519553d7a440 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 17:18:26 -0400 Subject: [PATCH 05/20] Rename pending_monitor_events to _legacy This field will be deprecated in upcoming commits when we start persisting MonitorEvent ids alongside the MonitorEvents. 
--- lightning/src/chain/channelmonitor.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 85e885c1185..758905ffdec 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1736,7 +1736,7 @@ pub(crate) fn write_chanmon_internal( None } }); - let pending_monitor_events = Iterable( + let pending_monitor_events_legacy = Iterable( channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), ); @@ -1747,7 +1747,7 @@ pub(crate) fn write_chanmon_internal( (1, channel_monitor.funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required), + (5, pending_monitor_events_legacy, required), (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -6644,16 +6644,16 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } - let pending_monitor_events_len: u64 = Readable::read(reader)?; - let mut pending_monitor_events = Some( - Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); - for _ in 0..pending_monitor_events_len { + let pending_monitor_events_legacy_len: u64 = Readable::read(reader)?; + let mut pending_monitor_events_legacy = Some( + Vec::with_capacity(cmp::min(pending_monitor_events_legacy_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)))); + for _ in 0..pending_monitor_events_legacy_len { let ev = match ::read(reader)? 
{ 0 => MonitorEvent::HTLCEvent(Readable::read(reader)?), 1 => MonitorEvent::HolderForceClosed(outpoint), _ => return Err(DecodeError::InvalidValue) }; - pending_monitor_events.as_mut().unwrap().push(ev); + pending_monitor_events_legacy.as_mut().unwrap().push(ev); } let pending_events_len: u64 = Readable::read(reader)?; @@ -6719,7 +6719,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (1, funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, htlcs_resolved_on_chain, optional_vec), - (5, pending_monitor_events, optional_vec), + (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), (11, confirmed_commitment_tx_counterparty_output, option), @@ -6768,11 +6768,11 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both // events, we can remove the `HolderForceClosed` event and just keep the `HolderForceClosedWithInfo`. - if let Some(ref mut pending_monitor_events) = pending_monitor_events { - if pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && - pending_monitor_events.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. })) + if let Some(ref mut evs) = pending_monitor_events_legacy { + if evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosed(_))) && + evs.iter().any(|e| matches!(e, MonitorEvent::HolderForceClosedWithInfo { .. 
})) { - pending_monitor_events.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); + evs.retain(|e| !matches!(e, MonitorEvent::HolderForceClosed(_))); } } @@ -6893,7 +6893,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events.unwrap(), + pending_monitor_events: pending_monitor_events_legacy.unwrap(), persistent_events_enabled: persistent_events_enabled.is_some(), pending_events, is_processing_pending_events: false, From 48438df7f2e43bca98e637004d8f4673e117124c Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 13:37:38 -0400 Subject: [PATCH 06/20] Add chain::Watch ack_monitor_event API Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplifies things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we add an as-yet-unused API to chain::Watch to allow the ChannelManager to tell a ChannelMonitor that a MonitorEvent has been irrevocably processed and can be deleted. 
--- fuzz/src/chanmon_consistency.rs | 5 +++++ lightning/src/chain/chainmonitor.rs | 22 ++++++++++++++++++++++ lightning/src/chain/channelmonitor.rs | 6 ++++++ lightning/src/chain/mod.rs | 14 ++++++++++++++ lightning/src/util/test_utils.rs | 6 +++++- 5 files changed, 52 insertions(+), 1 deletion(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index e4fd3475024..6ad1fa7c150 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,6 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; +use lightning::chain::chainmonitor::MonitorEventSource; use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; use lightning::chain::transaction::OutPoint; use lightning::chain::{ @@ -366,6 +367,10 @@ impl chain::Watch for TestChainMonitor { ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } struct KeyProvider { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 8f54c1a85e8..1b4a2166271 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -67,6 +67,21 @@ use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// Identifies the source of a [`MonitorEvent`] for acknowledgment via +/// [`chain::Watch::ack_monitor_event`] once the event has been processed. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct MonitorEventSource { + /// The event ID assigned by the [`ChannelMonitor`]. + pub event_id: u64, + /// The channel from which the [`MonitorEvent`] originated. 
+ pub channel_id: ChannelId, +} + +impl_writeable_tlv_based!(MonitorEventSource, { + (1, event_id, required), + (3, channel_id, required), +}); + /// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. enum PendingMonitorOp { /// A new monitor to insert and persist. @@ -1645,6 +1660,13 @@ where } pending_monitor_events } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + let monitors = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitors.get(&source.channel_id) { + monitor_state.monitor.ack_monitor_event(source.event_id); + } + } } impl< diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 758905ffdec..f09cb5dfdd9 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -2171,6 +2171,12 @@ impl ChannelMonitor { self.inner.lock().unwrap().push_monitor_event(event); } + /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. + /// Generally called by [`chain::Watch::ack_monitor_event`]. + pub fn ack_monitor_event(&self, _event_id: u64) { + // TODO: once events have ids, remove the corresponding event here + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. 
/// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 99e184d8fda..fd61d492509 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,6 +18,7 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; @@ -341,6 +342,15 @@ pub trait Watch { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + + /// Acknowledges and removes a [`MonitorEvent`] previously returned by + /// [`Watch::release_pending_monitor_events`] by its event ID. + /// + /// Once acknowledged, the event will no longer be returned by future calls to + /// [`Watch::release_pending_monitor_events`] and will not be replayed on restart. + /// + /// Events may be acknowledged in any order. 
+ fn ack_monitor_event(&self, source: MonitorEventSource); } impl + ?Sized, W: Deref> @@ -363,6 +373,10 @@ impl + ?Sized, W: Der ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { self.deref().release_pending_monitor_events() } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.deref().ack_monitor_event(source) + } } /// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index abcc24adf8d..6a68f42254e 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -15,7 +15,7 @@ use crate::chain::chaininterface; #[cfg(any(test, feature = "_externalize_tests"))] use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW; use crate::chain::chaininterface::{ConfirmationTarget, TransactionType}; -use crate::chain::chainmonitor::{ChainMonitor, Persist}; +use crate::chain::chainmonitor::{ChainMonitor, MonitorEventSource, Persist}; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, }; @@ -733,6 +733,10 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { } return self.chain_monitor.release_pending_monitor_events(); } + + fn ack_monitor_event(&self, source: MonitorEventSource) { + self.chain_monitor.ack_monitor_event(source); + } } #[cfg(any(test, feature = "_externalize_tests"))] From 868fd0c7130d56f6abaafde19870a5b4696d4094 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 18 Mar 2026 17:24:04 -0400 Subject: [PATCH 07/20] Add monitor event ids Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. 
Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This simplifies things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To allow the ChannelManager to ack specific monitor events once they are resolved in upcoming commits, here we give each MonitorEvent a corresponding unique id. It's implemented in such a way that we can delete legacy monitor event code in the future when the new persistent monitor events flag is enabled by default. --- fuzz/src/chanmon_consistency.rs | 2 +- lightning/src/chain/chainmonitor.rs | 2 +- lightning/src/chain/channelmonitor.rs | 200 +++++++++++++----- lightning/src/chain/mod.rs | 8 +- lightning/src/ln/chanmon_update_fail_tests.rs | 6 +- lightning/src/ln/channelmanager.rs | 2 +- lightning/src/util/test_utils.rs | 6 +- 7 files changed, 161 insertions(+), 65 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 6ad1fa7c150..ef84b7ded3c 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -364,7 +364,7 @@ impl chain::Watch for TestChainMonitor { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { return self.chain_monitor.release_pending_monitor_events(); } diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 1b4a2166271..6e2fa4050a9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ 
b/lightning/src/chain/chainmonitor.rs @@ -1638,7 +1638,7 @@ where fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { let _ = self.channel_monitor_updated(channel_id, update_id); } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index f09cb5dfdd9..2457c752cfd 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -183,8 +183,13 @@ impl Readable for ChannelMonitorUpdate { } } -fn push_monitor_event(pending_monitor_events: &mut Vec, event: MonitorEvent) { - pending_monitor_events.push(event); +fn push_monitor_event( + pending_monitor_events: &mut Vec<(u64, MonitorEvent)>, event: MonitorEvent, + next_monitor_event_id: &mut u64, +) { + let id = *next_monitor_event_id; + *next_monitor_event_id += 1; + pending_monitor_events.push((id, event)); } /// An event to be processed by the ChannelManager. @@ -1282,13 +1287,14 @@ pub(crate) struct ChannelMonitorImpl { // Note that because the `event_lock` in `ChainMonitor` is only taken in // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. - pending_monitor_events: Vec, + pending_monitor_events: Vec<(u64, MonitorEvent)>, /// When set, monitor events are retained until explicitly acked rather than cleared on read. /// /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on /// startup, and make the monitor responsible for both off- and on-chain payment resolution. Will /// be always set once support for this feature is complete. 
persistent_events_enabled: bool, + next_monitor_event_id: u64, pub(super) pending_events: Vec, pub(super) is_processing_pending_events: bool, @@ -1669,32 +1675,38 @@ pub(crate) fn write_chanmon_internal( writer.write_all(&payment_preimage.0[..])?; } - writer.write_all( - &(channel_monitor - .pending_monitor_events - .iter() - .filter(|ev| match ev { - MonitorEvent::HTLCEvent(_) => true, - MonitorEvent::HolderForceClosed(_) => true, - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) - .count() as u64) - .to_be_bytes(), - )?; - for event in channel_monitor.pending_monitor_events.iter() { - match event { - MonitorEvent::HTLCEvent(upd) => { - 0u8.write(writer)?; - upd.write(writer)?; - }, - MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, - // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep - // backwards compatibility, we write a `HolderForceClosed` event along with the - // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. - MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, - _ => {}, // Covered in the TLV writes below + if !channel_monitor.persistent_events_enabled { + writer.write_all( + &(channel_monitor + .pending_monitor_events + .iter() + .filter(|(_, ev)| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::HolderForceClosed(_) => true, + MonitorEvent::HolderForceClosedWithInfo { .. } => true, + _ => false, + }) + .count() as u64) + .to_be_bytes(), + )?; + for (_, event) in channel_monitor.pending_monitor_events.iter() { + match event { + MonitorEvent::HTLCEvent(upd) => { + 0u8.write(writer)?; + upd.write(writer)?; + }, + MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, + // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep + // backwards compatibility, we write a `HolderForceClosed` event along with the + // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. 
+ MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below + } } + } else { + // If `persistent_events_enabled` is set, we'll write the events with their event ids in the + // TLV section below. + writer.write_all(&(0u64).to_be_bytes())?; } writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?; @@ -1729,25 +1741,40 @@ pub(crate) fn write_chanmon_internal( // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` // for backwards compatibility. - let holder_force_closed_compat = channel_monitor.pending_monitor_events.iter().find_map(|ev| { - if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { - Some(MonitorEvent::HolderForceClosed(*outpoint)) - } else { - None - } - }); - let pending_monitor_events_legacy = Iterable( - channel_monitor.pending_monitor_events.iter().chain(holder_force_closed_compat.as_ref()), - ); + let holder_force_closed_compat = + channel_monitor.pending_monitor_events.iter().find_map(|(_, ev)| { + if let MonitorEvent::HolderForceClosedWithInfo { outpoint, .. } = ev { + Some(MonitorEvent::HolderForceClosed(*outpoint)) + } else { + None + } + }); + let pending_monitor_events_legacy = if !channel_monitor.persistent_events_enabled { + Some(Iterable( + channel_monitor + .pending_monitor_events + .iter() + .map(|(_, ev)| ev) + .chain(holder_force_closed_compat.as_ref()), + )) + } else { + None + }; // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. 
let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(()); + let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { + Some(Iterable(channel_monitor.pending_monitor_events.iter())) + } else { + None + }; write_tlv_fields!(writer, { (1, channel_monitor.funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, channel_monitor.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events_legacy, required), + (4, pending_mon_evs_with_ids, option), + (5, pending_monitor_events_legacy, option), (7, channel_monitor.funding_spend_seen, required), (9, channel_monitor.counterparty_node_id, required), (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), @@ -1766,6 +1793,7 @@ pub(crate) fn write_chanmon_internal( (34, channel_monitor.alternative_funding_confirmed, option), (35, channel_monitor.is_manual_broadcast, required), (37, channel_monitor.funding_seen_onchain, required), + (39, channel_monitor.next_monitor_event_id, required), }); Ok(()) @@ -1950,6 +1978,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), persistent_events_enabled: false, + next_monitor_event_id: 0, pending_events: Vec::new(), is_processing_pending_events: false, @@ -2163,7 +2192,7 @@ impl ChannelMonitor { /// Get the list of HTLCs who's status has been updated on chain. This should be called by /// ChannelManager via [`chain::Watch::release_pending_monitor_events`]. - pub fn get_and_clear_pending_monitor_events(&self) -> Vec { + pub fn get_and_clear_pending_monitor_events(&self) -> Vec<(u64, MonitorEvent)> { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } @@ -2177,6 +2206,20 @@ impl ChannelMonitor { // TODO: once events have ids, remove the corresponding event here } + /// Copies [`MonitorEvent`] state from `other` into `self`. 
+ /// Used in tests to align transient runtime state before equality comparison after a + /// serialization round-trip. + #[cfg(any(test, feature = "_test_utils"))] + pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { + let (pending, next_id) = { + let other_inner = other.inner.lock().unwrap(); + (other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id) + }; + let mut self_inner = self.inner.lock().unwrap(); + self_inner.pending_monitor_events = pending; + self_inner.next_monitor_event_id = next_id; + } + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] @@ -3902,7 +3945,7 @@ impl ChannelMonitorImpl { outpoint: funding_outpoint, channel_id: self.channel_id, }; - push_monitor_event(&mut self.pending_monitor_events, event); + push_monitor_event(&mut self.pending_monitor_events, event, &mut self.next_monitor_event_id); } // Although we aren't signing the transaction directly here, the transaction will be signed @@ -4493,12 +4536,16 @@ impl ChannelMonitorImpl { "Failing HTLC from late counterparty commitment update immediately \ (funding spend already confirmed)" ); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { - payment_hash, - payment_preimage: None, - source: source.clone(), - htlc_value_satoshis, - })); + push_monitor_event( + &mut self.pending_monitor_events, + MonitorEvent::HTLCEvent(HTLCUpdate { + payment_hash, + payment_preimage: None, + source: source.clone(), + htlc_value_satoshis, + }), + &mut self.next_monitor_event_id, + ); self.htlcs_resolved_on_chain.push(IrrevocablyResolvedHTLC { commitment_tx_output_idx: None, resolving_txid: Some(confirmed_txid), @@ -4564,10 +4611,14 @@ impl ChannelMonitorImpl { } fn push_monitor_event(&mut self, event: MonitorEvent) { - push_monitor_event(&mut self.pending_monitor_events, event); + push_monitor_event( + &mut 
self.pending_monitor_events, + event, + &mut self.next_monitor_event_id, + ); } - fn get_and_clear_pending_monitor_events(&mut self) -> Vec { + fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { let mut ret = Vec::new(); mem::swap(&mut ret, &mut self.pending_monitor_events); ret @@ -5898,7 +5949,7 @@ impl ChannelMonitorImpl { continue; } let duplicate_event = self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == *source } else { false }); if duplicate_event { @@ -5916,7 +5967,7 @@ impl ChannelMonitorImpl { payment_preimage: None, payment_hash: htlc.payment_hash, htlc_value_satoshis: Some(htlc.amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } } @@ -6315,7 +6366,7 @@ impl ChannelMonitorImpl { if let Some((source, payment_hash, amount_msat)) = payment_data { if accepted_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid: tx.compute_txid(), height, @@ -6333,11 +6384,11 @@ impl ChannelMonitorImpl { payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else if offered_preimage_claim { if !self.pending_monitor_events.iter().any( - |update| if let &MonitorEvent::HTLCEvent(ref upd) = update { + |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { @@ -6357,7 +6408,7 @@ impl ChannelMonitorImpl { payment_preimage: Some(payment_preimage), payment_hash, htlc_value_satoshis: 
Some(amount_msat / 1000), - })); + }), &mut self.next_monitor_event_id); } } else { self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { @@ -6721,10 +6772,13 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); let mut persistent_events_enabled: Option<()> = None; + let mut next_monitor_event_id: Option = None; + let mut pending_mon_evs_with_ids: Option> = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), (2, persistent_events_enabled, option), (3, htlcs_resolved_on_chain, optional_vec), + (4, pending_mon_evs_with_ids, optional_vec), (5, pending_monitor_events_legacy, optional_vec), (7, funding_spend_seen, option), (9, counterparty_node_id, option), @@ -6744,6 +6798,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (34, alternative_funding_confirmed, option), (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), + (39, next_monitor_event_id, option), }); #[cfg(not(any(feature = "_test_utils", test)))] @@ -6782,6 +6837,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } + // If persistent events are enabled, use the events with their persisted IDs from TLV 4. + // Otherwise, use the legacy events from TLV 5 and assign sequential IDs. 
+ let (next_monitor_event_id, pending_monitor_events): (u64, Vec<(u64, MonitorEvent)>) = + if persistent_events_enabled.is_some() { + let evs = pending_mon_evs_with_ids.unwrap_or_default() + .into_iter().map(|ev| (ev.0, ev.1)).collect(); + (next_monitor_event_id.unwrap_or(0), evs) + } else if let Some(events) = pending_monitor_events_legacy { + let next_id = next_monitor_event_id.unwrap_or(events.len() as u64); + let evs = events.into_iter().enumerate() + .map(|(i, ev)| (i as u64, ev)).collect(); + (next_id, evs) + } else { + (next_monitor_event_id.unwrap_or(0), Vec::new()) + }; + let channel_parameters = channel_parameters.unwrap_or_else(|| { onchain_tx_handler.channel_parameters().clone() }); @@ -6899,8 +6970,9 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP current_holder_commitment_number, payment_preimages, - pending_monitor_events: pending_monitor_events_legacy.unwrap(), + pending_monitor_events, persistent_events_enabled: persistent_events_enabled.is_some(), + next_monitor_event_id, pending_events, is_processing_pending_events: false, @@ -6949,6 +7021,22 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } +/// Deserialization wrapper for reading a `(u64, MonitorEvent)`. +/// Necessary because we can't deserialize a (Readable, MaybeReadable) tuple due to trait +/// conflicts. 
+struct ReadableIdMonitorEvent(u64, MonitorEvent); + +impl MaybeReadable for ReadableIdMonitorEvent { + fn read(reader: &mut R) -> Result, DecodeError> { + let id: u64 = Readable::read(reader)?; + let event_opt: Option = MaybeReadable::read(reader)?; + match event_opt { + Some(ev) => Ok(Some(ReadableIdMonitorEvent(id, ev))), + None => Ok(None), + } + } +} + #[cfg(test)] pub(super) fn dummy_monitor( channel_id: ChannelId, wrap_signer: impl FnOnce(crate::sign::InMemorySigner) -> S, diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index fd61d492509..9d61bf5aa10 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -333,6 +333,10 @@ pub trait Watch { /// Returns any monitor events since the last call. Subsequent calls must only return new /// events. /// + /// Each event comes with a corresponding id. Once the event is processed, call + /// [`Watch::ack_monitor_event`] with the corresponding id and channel id. Unacknowledged events + /// will be re-provided by this method after startup. + /// /// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no /// further events may be returned here until the [`ChannelMonitor`] has been fully persisted /// to disk. @@ -341,7 +345,7 @@ pub trait Watch { /// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`]. fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)>; + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)>; /// Acknowledges and removes a [`MonitorEvent`] previously returned by /// [`Watch::release_pending_monitor_events`] by its event ID. 
@@ -370,7 +374,7 @@ impl + ?Sized, W: Der fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { self.deref().release_pending_monitor_events() } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 0d8a4a020f0..30507928aba 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -5004,7 +5004,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but // separately. @@ -5052,7 +5052,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. })); + assert!(matches!(completed_persist[0].2[0].1, MonitorEvent::Completed { .. })); // Finally, test two async `ChanelMonitorUpdate`s in flight at once, completing them // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both @@ -5098,7 +5098,7 @@ fn native_async_persist() { let completed_persist = async_chain_monitor.release_pending_monitor_events(); assert_eq!(completed_persist.len(), 1); assert_eq!(completed_persist[0].2.len(), 1); - if let MonitorEvent::Completed { monitor_update_id, .. } = &completed_persist[0].2[0] { + if let (_, MonitorEvent::Completed { monitor_update_id, .. 
}) = &completed_persist[0].2[0] { assert_eq!(*monitor_update_id, 4); } else { panic!(); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d7f13e156be..d8ff799cd7e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -13493,7 +13493,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for monitor_event in monitor_events.drain(..) { + for (_event_id, monitor_event) in monitor_events.drain(..) { match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 6a68f42254e..9ad9aec8bf0 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -648,6 +648,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { ) .unwrap() .1; + new_monitor.copy_monitor_event_state(&monitor); assert!(new_monitor == monitor); self.latest_monitor_update_id .lock() @@ -709,6 +710,9 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // it so it doesn't leak into the rest of the test. let failed_back = monitor.inner.lock().unwrap().failed_back_htlc_ids.clone(); new_monitor.inner.lock().unwrap().failed_back_htlc_ids = failed_back; + // The deserialized monitor will reset the monitor event state, so copy it from the live + // monitor before comparing. 
+ new_monitor.copy_monitor_event_state(&monitor); if let Some(chan_id) = self.expect_monitor_round_trip_fail.lock().unwrap().take() { assert_eq!(chan_id, channel_id); assert!(new_monitor != *monitor); @@ -722,7 +726,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + ) -> Vec<(OutPoint, ChannelId, Vec<(u64, MonitorEvent)>, PublicKey)> { // Auto-flush pending operations so that the ChannelManager can pick up monitor // completion events. When not in deferred mode the queue is empty so this only // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) From ac5b59a223132f2a283b552ae191c8c63ae916d4 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:42:19 -0400 Subject: [PATCH 08/20] Add MonitorUpdateCompletionAction::AckMonitorEvents Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. 
Here we add a MonitorUpdateCompletionAction that will allow the ChannelManager to tell a monitor that an event is complete, upon completion of a particular ChannelMonitorUpdate. For example, this will be used when an HTLC is irrevocably failed backwards post-channel-close, to delete the corresponding MonitorEvent::HTLCEvent. --- lightning/src/ln/channelmanager.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d8ff799cd7e..5856d30f5f5 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -42,6 +42,7 @@ use crate::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, MonitorEvent, WithChannelMonitor, ANTI_REORG_DELAY, CLTV_CLAIM_BUFFER, HTLC_FAIL_BACK_BUFFER, @@ -1492,6 +1493,10 @@ pub(crate) enum MonitorUpdateCompletionAction { blocking_action: RAAMonitorUpdateBlockingAction, downstream_channel_id: ChannelId, }, + /// Indicates that one or more [`MonitorEvent`]s should be acknowledged via + /// [`chain::Watch::ack_monitor_event`] once the associated [`ChannelMonitorUpdate`] has been + /// durably persisted. + AckMonitorEvents { monitor_events_to_ack: Vec }, } impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, @@ -1513,6 +1518,9 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, event, upgradable_option), (1, downstream_counterparty_and_funding_outpoint, upgradable_required), }, + (3, AckMonitorEvents) => { + (0, monitor_events_to_ack, required_vec), + }, ); /// Result of attempting to resume a channel after a monitor update completes while locks are held. @@ -10345,6 +10353,11 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ Some(blocking_action), ); }, + MonitorUpdateCompletionAction::AckMonitorEvents { monitor_events_to_ack } => { + for source in monitor_events_to_ack { + self.chain_monitor.ack_monitor_event(source); + } + }, } } From 12295140e0db9aa0a7372ec467694b680335f493 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 13:45:21 -0400 Subject: [PATCH 09/20] Ack all monitor events besides HTLCUpdate Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we take care of the monitor events that are trivial to ACK, since all except HTLCEvents can be ACK'd immediately after the event is handled and don't need to be re-processed on startup. --- lightning/src/ln/channelmanager.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 5856d30f5f5..3b574f9729d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -13506,7 +13506,8 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) { - for (_event_id, monitor_event) in monitor_events.drain(..) { + for (event_id, monitor_event) in monitor_events.drain(..) { + let monitor_event_source = MonitorEventSource { event_id, channel_id }; match monitor_event { MonitorEvent::HTLCEvent(htlc_update) => { let logger = WithContext::from( @@ -13589,6 +13590,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::CommitmentTxConfirmed(_) => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -13610,6 +13614,9 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ failed_channels.push((Err(e), counterparty_node_id)); } } + // Channel close monitor events do not need to be replayed on startup because we + // already check the monitors to see if the channel is closed. + self.chain_monitor.ack_monitor_event(monitor_event_source); }, MonitorEvent::Completed { channel_id, monitor_update_id, .. } => { self.channel_monitor_updated( @@ -13617,6 +13624,7 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ Some(monitor_update_id), &counterparty_node_id, ); + self.chain_monitor.ack_monitor_event(monitor_event_source); }, } } From 4af1b4cf1c1e5099d6a322dc9ca3a490f97d79d4 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 14:44:59 -0400 Subject: [PATCH 10/20] Fix replay of monitor events for outbounds Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. If an outbound payment resolved on-chain and we get a monitor event for it, we want to mark that monitor event as resolved once the user has processed a PaymentSent or a PaymentFailed event. 
--- lightning/src/ln/channelmanager.rs | 33 +++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 3b574f9729d..d206787186c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1545,12 +1545,29 @@ enum PostMonitorUpdateChanResume { }, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Eq)] pub(crate) struct PaymentCompleteUpdate { counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, channel_id: ChannelId, htlc_id: SentHTLCId, + /// The id of the [`MonitorEvent`] that triggered this update. + /// + /// Used when the [`Event`] corresponding to this update's [`EventCompletionAction`] has been + /// processed by the user, to remove the [`MonitorEvent`] from the [`ChannelMonitor`] and prevent + /// it from being re-provided on startup. + monitor_event_id: Option, +} + +impl PartialEq for PaymentCompleteUpdate { + fn eq(&self, other: &Self) -> bool { + self.counterparty_node_id == other.counterparty_node_id + && self.channel_funding_outpoint == other.channel_funding_outpoint + && self.channel_id == other.channel_id + && self.htlc_id == other.htlc_id + // monitor_event_id is excluded: it's metadata about the event source, + // not part of the semantic identity of the payment resolution. + } } impl_writeable_tlv_based!(PaymentCompleteUpdate, { @@ -1558,6 +1575,7 @@ impl_writeable_tlv_based!(PaymentCompleteUpdate, { (3, counterparty_node_id, required), (5, channel_id, required), (7, htlc_id, required), + (9, monitor_event_id, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -9965,6 +9983,7 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ next_channel_counterparty_node_id: PublicKey, next_channel_outpoint: OutPoint, next_channel_id: ChannelId, next_user_channel_id: Option, attribution_data: Option, send_timestamp: Option, + monitor_event_id: Option, ) { let startup_replay = !self.background_events_processed_since_startup.load(Ordering::Acquire); @@ -9983,6 +10002,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ channel_funding_outpoint: next_channel_outpoint, channel_id: next_channel_id, htlc_id, + monitor_event_id, }; Some(EventCompletionAction::ReleasePaymentCompleteChannelMonitorUpdate(release)) } else { @@ -12611,6 +12631,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Some(next_user_channel_id), msg.attribution_data, send_timestamp, + None, ); Ok(()) @@ -13536,6 +13557,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ None, None, None, + Some(event_id), ); } else { log_trace!(logger, "Failing HTLC from our monitor"); @@ -13548,6 +13570,7 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ channel_funding_outpoint: funding_outpoint, channel_id, htlc_id: SentHTLCId::from_source(&htlc_update.source), + monitor_event_id: Some(event_id), }); self.fail_htlc_backwards_internal( &htlc_update.source, @@ -15325,8 +15348,13 @@ impl< channel_funding_outpoint, channel_id, htlc_id, + monitor_event_id, }, ) => { + if let Some(event_id) = monitor_event_id { + self.chain_monitor + .ack_monitor_event(MonitorEventSource { channel_id, event_id }); + } let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock = per_peer_state .get(&counterparty_node_id) @@ -19730,6 +19758,7 @@ impl< channel_funding_outpoint: monitor.get_funding_txo(), channel_id: monitor.channel_id(), htlc_id, + monitor_event_id: None, }; let mut compl_action = Some( EventCompletionAction::ReleasePaymentCompleteChannelMonitorUpdate(update) @@ -19809,6 +19838,7 @@ impl< channel_funding_outpoint: monitor.get_funding_txo(), channel_id: monitor.channel_id(), htlc_id: SentHTLCId::from_source(&htlc_source), + monitor_event_id: None, }); failed_htlcs.push(( @@ -20566,6 +20596,7 @@ impl< downstream_user_channel_id, None, None, + None, ); } From 26522cc18bbc0cb95e64e9f6725e579b4e40be2b Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 15:05:54 -0400 Subject: [PATCH 11/20] Fix replay of preimage monitor events for fwds Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. 
Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. If the outbound edge of a forwarded payment is resolved on-chain via preimage, and we get a monitor event for it, we want to mark that monitor event resolved once the preimage is durably persisted in the inbound edge. Hence, we add the monitor event resolution to the completion of the inbound edge's preimage monitor update. --- lightning/src/ln/channelmanager.rs | 41 ++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index d206787186c..90321e4d923 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1475,6 +1475,9 @@ pub(crate) enum MonitorUpdateCompletionAction { EmitEventOptionAndFreeOtherChannel { event: Option, downstream_counterparty_and_funding_outpoint: EventUnblockedChannel, + /// If this action originated from a [`MonitorEvent`], the source to acknowledge once the + /// action is handled. + monitor_event_source: Option, }, /// Indicates we should immediately resume the operation of another channel, unless there is /// some other reason why the channel is blocked. 
In practice this simply means immediately @@ -1492,6 +1495,9 @@ pub(crate) enum MonitorUpdateCompletionAction { downstream_counterparty_node_id: PublicKey, blocking_action: RAAMonitorUpdateBlockingAction, downstream_channel_id: ChannelId, + /// If this action originated from a [`MonitorEvent`], the source to acknowledge once the + /// action is handled. + monitor_event_source: Option, }, /// Indicates that one or more [`MonitorEvent`]s should be acknowledged via /// [`chain::Watch::ack_monitor_event`] once the associated [`ChannelMonitorUpdate`] has been @@ -1510,6 +1516,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, downstream_counterparty_node_id, required), (4, blocking_action, upgradable_required), (5, downstream_channel_id, required), + (7, monitor_event_source, option), }, (2, EmitEventOptionAndFreeOtherChannel) => { // LDK prior to 0.3 required this field. It will not be present for trampoline payments @@ -1517,6 +1524,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, // are in the process of being resolved. 
(0, event, upgradable_option), (1, downstream_counterparty_and_funding_outpoint, upgradable_required), + (3, monitor_event_source, option), }, (3, AckMonitorEvents) => { (0, monitor_events_to_ack, required_vec), @@ -9535,6 +9543,7 @@ impl< startup_replay: bool, next_channel_counterparty_node_id: PublicKey, next_channel_outpoint: OutPoint, next_channel_id: ChannelId, hop_data: HTLCPreviousHopData, attribution_data: Option, send_timestamp: Option, + monitor_event_source: Option, ) { let _prev_channel_id = hop_data.channel_id; let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); @@ -9624,6 +9633,7 @@ impl< downstream_counterparty_node_id: chan_to_release.counterparty_node_id, downstream_channel_id: chan_to_release.channel_id, blocking_action: chan_to_release.blocking_action, + monitor_event_source, }), None, ) @@ -9639,6 +9649,7 @@ impl< Some(MonitorUpdateCompletionAction::EmitEventOptionAndFreeOtherChannel { event, downstream_counterparty_and_funding_outpoint: chan_to_release, + monitor_event_source, }), None, ) @@ -9823,8 +9834,12 @@ impl< downstream_counterparty_node_id: node_id, blocking_action: blocker, downstream_channel_id: channel_id, + monitor_event_source, } = action { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { let mut peer_state = peer_state_mtx.lock().unwrap(); if let Some(blockers) = peer_state @@ -10083,6 +10098,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ hop_data, attribution_data, send_timestamp, + monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: next_channel_id }), ); }, HTLCSource::TrampolineForward { previous_hop_data, .. } => { @@ -10126,6 +10143,22 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ current_previous_hop_data, attribution_data.clone(), send_timestamp, + // Only pass the monitor event ID for the first hop. Once one inbound channel durably + // has the preimage, the outbound monitor event can be acked. The preimage remains in + // the outbound monitor's storage and will be replayed to all remaining inbound hops on + // restart via pending_claims_to_replay. + // + // Note that we plan to move away from this restart replay in the future, and instead + // on restart push temporary monitor events, reusing existing MonitorEvent handling + // logic to do the multi-claim. + if i == 0 { + monitor_event_id.map(|id| MonitorEventSource { + event_id: id, + channel_id: next_channel_id, + }) + } else { + None + }, ); } }, @@ -10352,7 +10385,11 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ MonitorUpdateCompletionAction::EmitEventOptionAndFreeOtherChannel { event, downstream_counterparty_and_funding_outpoint, + monitor_event_source, } => { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let Some(event) = event { self.pending_events.lock().unwrap().push_back((event, None)); } @@ -10366,7 +10403,11 @@ This indicates a bug inside LDK. 
Please report this error at https://github.com/ downstream_counterparty_node_id, downstream_channel_id, blocking_action, + monitor_event_source, } => { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } self.handle_monitor_update_release( downstream_counterparty_node_id, downstream_channel_id, From 001fb53e7f41be7c665a4c19fab0e1e6ad2c1f58 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:03:46 -0400 Subject: [PATCH 12/20] Track monitor event id in HTLCForwardInfo::Fail* Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To ensure we can resolve HTLC monitor events for forward failures, we need to pipe the event id through the HTLC failure pipeline, starting from when we first handle the monitor event and call fail_htlc_backwards. To get it from there to the next stage, here we store the monitor event id in the manager's HTLCForwardInfo::Fail*. 
In upcoming commits we will eventually get to the point of acking the monitor event when the HTLC is irrevocably removed from the inbound edge via monitor update. --- lightning/src/ln/channelmanager.rs | 61 +++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 90321e4d923..afb7c9d43c8 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -497,8 +497,21 @@ impl PendingAddHTLCInfo { #[cfg_attr(test, derive(Clone, Debug, PartialEq))] pub(super) enum HTLCForwardInfo { AddHTLC(PendingAddHTLCInfo), - FailHTLC { htlc_id: u64, err_packet: msgs::OnionErrorPacket }, - FailMalformedHTLC { htlc_id: u64, failure_code: u16, sha256_of_onion: [u8; 32] }, + FailHTLC { + htlc_id: u64, + err_packet: msgs::OnionErrorPacket, + /// A pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this failed + /// forward, once the failure is durably persisted on the inbound edge. + monitor_event_source: Option, + }, + FailMalformedHTLC { + htlc_id: u64, + failure_code: u16, + sha256_of_onion: [u8; 32], + /// A pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this failed + /// forward, once the failure is durably persisted on the inbound edge. 
+ monitor_event_source: Option, + }, } /// Whether this blinded HTLC is being failed backwards by the introduction node or a blinded node, @@ -7626,12 +7639,14 @@ impl< HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC { htlc_id: fail_htlc.htlc_id, err_packet: fail_htlc.into(), + monitor_event_source: None, }, HTLCFailureMsg::Malformed(fail_malformed_htlc) => { HTLCForwardInfo::FailMalformedHTLC { htlc_id: fail_malformed_htlc.htlc_id, sha256_of_onion: fail_malformed_htlc.sha256_of_onion, failure_code: fail_malformed_htlc.failure_code.into(), + monitor_event_source: None, } }, }; @@ -8161,7 +8176,7 @@ impl< } None }, - HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => { + HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet, .. } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -8181,7 +8196,12 @@ impl< break; } }, - HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + HTLCForwardInfo::FailMalformedHTLC { + htlc_id, + failure_code, + sha256_of_onion, + .. + } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -9219,6 +9239,10 @@ impl< onion_error ); + let monitor_event_source = from_monitor_update_completion.as_ref().and_then(|u| { + u.monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: u.channel_id }) + }); push_forward_htlcs_failure( *prev_outbound_scid_alias, get_htlc_forward_failure( @@ -9228,6 +9252,7 @@ impl< trampoline_shared_secret, phantom_shared_secret, *htlc_id, + monitor_event_source, ), ); @@ -9263,7 +9288,12 @@ impl< // necessarily want to fail all of our incoming HTLCs back yet. We may have other // outgoing HTLCs that need to resolve first. This will be tracked in our // pending_outbound_payments in a followup. 
- for current_hop_data in previous_hop_data { + let trampoline_monitor_event_source = + from_monitor_update_completion.as_ref().and_then(|u| { + u.monitor_event_id + .map(|id| MonitorEventSource { event_id: id, channel_id: u.channel_id }) + }); + for (i, current_hop_data) in previous_hop_data.iter().enumerate() { let HTLCPreviousHopData { prev_outbound_scid_alias, htlc_id, @@ -9290,6 +9320,7 @@ impl< &incoming_trampoline_shared_secret, &None, *htlc_id, + if i == 0 { trampoline_monitor_event_source } else { None }, ), ); } @@ -14273,6 +14304,7 @@ fn get_htlc_forward_failure( blinded_failure: &Option, onion_error: &HTLCFailReason, incoming_packet_shared_secret: &[u8; 32], trampoline_shared_secret: &Option<[u8; 32]>, phantom_shared_secret: &Option<[u8; 32]>, htlc_id: u64, + monitor_event_source: Option, ) -> HTLCForwardInfo { // TODO: Correctly wrap the error packet twice if failing back a trampoline + phantom HTLC. let secondary_shared_secret = trampoline_shared_secret.or(*phantom_shared_secret); @@ -14284,19 +14316,20 @@ fn get_htlc_forward_failure( incoming_packet_shared_secret, &secondary_shared_secret, ); - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } + HTLCForwardInfo::FailHTLC { htlc_id, err_packet, monitor_event_source } }, Some(BlindedFailure::FromBlindedNode) => HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code: LocalHTLCFailureReason::InvalidOnionBlinding.failure_code(), sha256_of_onion: [0; 32], + monitor_event_source, }, None => { let err_packet = onion_error.get_encrypted_failure_packet( incoming_packet_shared_secret, &secondary_shared_secret, ); - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } + HTLCForwardInfo::FailHTLC { htlc_id, err_packet, monitor_event_source } }, } } @@ -17966,15 +17999,21 @@ impl Writeable for HTLCForwardInfo { 0u8.write(w)?; info.write(w)?; }, - Self::FailHTLC { htlc_id, err_packet } => { + Self::FailHTLC { htlc_id, err_packet, monitor_event_source } => { FAIL_HTLC_VARIANT_ID.write(w)?; write_tlv_fields!(w, { 
(0, htlc_id, required), (2, err_packet.data, required), (5, err_packet.attribution_data, option), + (7, monitor_event_source, option), }); }, - Self::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + Self::FailMalformedHTLC { + htlc_id, + failure_code, + sha256_of_onion, + monitor_event_source, + } => { // Since this variant was added in 0.0.119, write this as `::FailHTLC` with an empty error // packet so older versions have something to fail back with, but serialize the real data as // optional TLVs for the benefit of newer versions. @@ -17984,6 +18023,7 @@ impl Writeable for HTLCForwardInfo { (1, failure_code, required), (2, Vec::::new(), required), (3, sha256_of_onion, required), + (7, monitor_event_source, option), }); }, } @@ -18004,6 +18044,7 @@ impl Readable for HTLCForwardInfo { (2, err_packet, required), (3, sha256_of_onion, option), (5, attribution_data, option), + (7, monitor_event_source, option), }); if let Some(failure_code) = malformed_htlc_failure_code { if attribution_data.is_some() { @@ -18013,6 +18054,7 @@ impl Readable for HTLCForwardInfo { htlc_id: _init_tlv_based_struct_field!(htlc_id, required), failure_code, sha256_of_onion: sha256_of_onion.ok_or(DecodeError::InvalidValue)?, + monitor_event_source, } } else { Self::FailHTLC { @@ -18021,6 +18063,7 @@ impl Readable for HTLCForwardInfo { data: _init_tlv_based_struct_field!(err_packet, required), attribution_data: _init_tlv_based_struct_field!(attribution_data, option), }, + monitor_event_source, } } }, From d2a9e09a65e70e116d50bf83dd3ba712793d2a70 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:29:47 -0400 Subject: [PATCH 13/20] Add monitor_event_source to holding cell HTLC fails Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. 
This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To ensure we can resolve HTLC monitor events for forward failures, we need to pipe the event id through the HTLC failure pipeline. We started this process in the previous commit, and here we continue by storing the monitor event id in the Channel's holding cell HTLC failures. In upcoming commits we will eventually get to the point of acking the monitor event when the HTLC is irrevocably removed from the inbound edge via monitor update. 
--- lightning/src/ln/channel.rs | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 8b05d984e30..ad398d3be9c 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -30,6 +30,7 @@ use crate::blinded_path::message::BlindedMessagePath; use crate::chain::chaininterface::{ ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; +use crate::chain::chainmonitor::MonitorEventSource; use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, CommitmentHTLCData, LATENCY_GRACE_PERIOD_BLOCKS, @@ -551,11 +552,21 @@ enum HTLCUpdateAwaitingACK { FailHTLC { htlc_id: u64, err_packet: msgs::OnionErrorPacket, + /// Contains a pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this + /// failed forward, once the failure is durably persisted in this channel (the inbound edge). + /// + /// [`MonitorEvent`]: crate::chain::channelmonitor::MonitorEvent + monitor_event_source: Option, }, FailMalformedHTLC { htlc_id: u64, failure_code: u16, sha256_of_onion: [u8; 32], + /// Contains a pointer to a [`MonitorEvent`] that can be removed from the outbound edge of this + /// failed forward, once the failure is durably persisted in this channel (the inbound edge). 
+ /// + /// [`MonitorEvent`]: crate::chain::channelmonitor::MonitorEvent + monitor_event_source: Option, }, } @@ -6566,7 +6577,7 @@ impl FailHTLCContents for msgs::OnionErrorPacket { InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(self)) } fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { - HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self } + HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self, monitor_event_source: None } } } impl FailHTLCContents for ([u8; 32], u16) { @@ -6590,6 +6601,7 @@ impl FailHTLCContents for ([u8; 32], u16) { htlc_id, sha256_of_onion: self.0, failure_code: self.1, + monitor_event_source: None, } } } @@ -8437,7 +8449,7 @@ where monitor_update.updates.append(&mut additional_monitor_update.updates); None }, - &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => Some( + &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet, .. } => Some( self.fail_htlc(htlc_id, err_packet.clone(), false, logger) .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ), @@ -8445,6 +8457,7 @@ where htlc_id, failure_code, sha256_of_onion, + .. } => Some( self.fail_htlc(htlc_id, (sha256_of_onion, failure_code), false, logger) .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), @@ -15091,7 +15104,7 @@ impl Writeable for FundedChannel { // Store the attribution data for later writing. holding_cell_attribution_data.push(attribution_data.as_ref()); }, - &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet } => { + &HTLCUpdateAwaitingACK::FailHTLC { ref htlc_id, ref err_packet, .. } => { 2u8.write(writer)?; htlc_id.write(writer)?; err_packet.data.write(writer)?; @@ -15103,6 +15116,7 @@ impl Writeable for FundedChannel { htlc_id, failure_code, sha256_of_onion, + .. } => { // We don't want to break downgrading by adding a new variant, so write a dummy // `::FailHTLC` variant and write the real malformed error as an optional TLV. 
@@ -15542,6 +15556,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> data: Readable::read(reader)?, attribution_data: None, }, + monitor_event_source: None, }, _ => return Err(DecodeError::InvalidValue), }); @@ -15996,7 +16011,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> let htlc_idx = holding_cell_htlc_updates .iter() .position(|htlc| { - if let HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet } = htlc { + if let HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet, .. } = htlc { let matches = *htlc_id == malformed_htlc_id; if matches { debug_assert!(err_packet.data.is_empty()) @@ -16011,6 +16026,7 @@ impl<'a, 'b, 'c, ES: EntropySource, SP: SignerProvider> htlc_id: malformed_htlc_id, failure_code, sha256_of_onion, + monitor_event_source: None, }; let _ = core::mem::replace(&mut holding_cell_htlc_updates[htlc_idx], malformed_htlc); @@ -17036,12 +17052,14 @@ mod tests { |htlc_id, attribution_data| HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: msgs::OnionErrorPacket { data: vec![42], attribution_data }, + monitor_event_source: None, }; let dummy_holding_cell_malformed_htlc = |htlc_id| HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code: LocalHTLCFailureReason::InvalidOnionBlinding.failure_code(), sha256_of_onion: [0; 32], + monitor_event_source: None, }; let mut holding_cell_htlc_updates = Vec::with_capacity(12); for i in 0..16 { From b8bb43b2830da5ca74f47e836358d53b871c6bae Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 16:37:55 -0400 Subject: [PATCH 14/20] Pipe monitor event source HTLCForwardInfo -> holding cell htlcs Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. 
Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. To ensure we can resolve HTLC monitor events for forward failures, we need to pipe the event id through the HTLC failure pipeline. Here we pipe the monitor event source from the manager to the channel, so it can be put in the channel's holding cell. In upcoming commits we will eventually get to the point of acking the monitor event when the HTLC is irrevocably removed from the inbound edge via monitor update. 
--- lightning/src/ln/channel.rs | 77 +++++++++++++++++++++--------- lightning/src/ln/channelmanager.rs | 15 ++++-- 2 files changed, 67 insertions(+), 25 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index ad398d3be9c..dd7fad72e9b 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -6561,7 +6561,9 @@ trait FailHTLCContents { type Message: FailHTLCMessageName; fn to_message(self, htlc_id: u64, channel_id: ChannelId) -> Self::Message; fn to_inbound_htlc_state(self) -> InboundHTLCState; - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK; + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK; } impl FailHTLCContents for msgs::OnionErrorPacket { type Message = msgs::UpdateFailHTLC; @@ -6576,8 +6578,10 @@ impl FailHTLCContents for msgs::OnionErrorPacket { fn to_inbound_htlc_state(self) -> InboundHTLCState { InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::FailRelay(self)) } - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { - HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self, monitor_event_source: None } + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK { + HTLCUpdateAwaitingACK::FailHTLC { htlc_id, err_packet: self, monitor_event_source } } } impl FailHTLCContents for ([u8; 32], u16) { @@ -6596,12 +6600,14 @@ impl FailHTLCContents for ([u8; 32], u16) { failure_code: self.1, }) } - fn to_htlc_update_awaiting_ack(self, htlc_id: u64) -> HTLCUpdateAwaitingACK { + fn to_htlc_update_awaiting_ack( + self, htlc_id: u64, monitor_event_source: Option, + ) -> HTLCUpdateAwaitingACK { HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, sha256_of_onion: self.0, failure_code: self.1, - monitor_event_source: None, + monitor_event_source, } } } @@ -7343,9 +7349,10 @@ where /// Returns `Err` (always with [`ChannelError::Ignore`]) 
if the HTLC could not be failed (e.g. /// if it was already resolved). Otherwise returns `Ok`. pub fn queue_fail_htlc( - &mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L, + &mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, + monitor_event_source: Option, logger: &L, ) -> Result<(), ChannelError> { - self.fail_htlc(htlc_id_arg, err_packet, true, logger) + self.fail_htlc(htlc_id_arg, err_packet, true, monitor_event_source, logger) .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) } @@ -7354,10 +7361,17 @@ where /// /// See [`Self::queue_fail_htlc`] for more info. pub fn queue_fail_malformed_htlc( - &mut self, htlc_id_arg: u64, failure_code: u16, sha256_of_onion: [u8; 32], logger: &L, + &mut self, htlc_id_arg: u64, failure_code: u16, sha256_of_onion: [u8; 32], + monitor_event_source: Option, logger: &L, ) -> Result<(), ChannelError> { - self.fail_htlc(htlc_id_arg, (sha256_of_onion, failure_code), true, logger) - .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) + self.fail_htlc( + htlc_id_arg, + (sha256_of_onion, failure_code), + true, + monitor_event_source, + logger, + ) + .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?")) } /// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g. 
@@ -7365,7 +7379,7 @@ where #[rustfmt::skip] fn fail_htlc( &mut self, htlc_id_arg: u64, err_contents: E, mut force_holding_cell: bool, - logger: &L + monitor_event_source: Option, logger: &L ) -> Result, ChannelError> { if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { panic!("Was asked to fail an HTLC when channel was not in an operational state"); @@ -7402,17 +7416,20 @@ where // Now update local state: if force_holding_cell { - for pending_update in self.context.holding_cell_htlc_updates.iter() { + for pending_update in self.context.holding_cell_htlc_updates.iter_mut() { match pending_update { - &HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => { + &mut HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => { if htlc_id_arg == htlc_id { return Err(ChannelError::Ignore(format!("HTLC {} was already claimed!", htlc_id))); } }, - &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, .. } | - &HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, .. } => + &mut HTLCUpdateAwaitingACK::FailHTLC { htlc_id, monitor_event_source: ref mut src, .. } | + &mut HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, monitor_event_source: ref mut src, .. } => { if htlc_id_arg == htlc_id { + if src.is_none() { + *src = monitor_event_source; + } return Err(ChannelError::Ignore(format!("HTLC {} was already pending failure", htlc_id))); } }, @@ -7420,7 +7437,7 @@ where } } log_trace!(logger, "Placing failure for HTLC ID {} in holding cell.", htlc_id_arg); - self.context.holding_cell_htlc_updates.push(err_contents.to_htlc_update_awaiting_ack(htlc_id_arg)); + self.context.holding_cell_htlc_updates.push(err_contents.to_htlc_update_awaiting_ack(htlc_id_arg, monitor_event_source)); return Ok(None); } @@ -8449,18 +8466,34 @@ where monitor_update.updates.append(&mut additional_monitor_update.updates); None }, - &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet, .. 
} => Some( - self.fail_htlc(htlc_id, err_packet.clone(), false, logger) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), + &HTLCUpdateAwaitingACK::FailHTLC { + htlc_id, + ref err_packet, + monitor_event_source, + } => Some( + self.fail_htlc( + htlc_id, + err_packet.clone(), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ), &HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion, - .. + monitor_event_source, } => Some( - self.fail_htlc(htlc_id, (sha256_of_onion, failure_code), false, logger) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), + self.fail_htlc( + htlc_id, + (sha256_of_onion, failure_code), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ), }; if let Some(res) = fail_htlc_res { diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index afb7c9d43c8..7aaeeef59e6 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -8176,7 +8176,7 @@ impl< } None }, - HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet, .. } => { + HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet, monitor_event_source } => { if let Some(chan) = peer_state .channel_by_id .get_mut(&forward_chan_id) @@ -8184,7 +8184,15 @@ impl< { let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); - Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id)) + Some(( + chan.queue_fail_htlc( + htlc_id, + err_packet.clone(), + monitor_event_source, + &&logger, + ), + htlc_id, + )) } else { self.forwarding_channel_not_found( core::iter::once(forward_info).chain(draining_pending_forwards), @@ -8200,7 +8208,7 @@ impl< htlc_id, failure_code, sha256_of_onion, - .. 
+ monitor_event_source, } => { if let Some(chan) = peer_state .channel_by_id @@ -8213,6 +8221,7 @@ impl< htlc_id, failure_code, sha256_of_onion, + monitor_event_source, &&logger, ); Some((res, htlc_id)) From 9a9716d52d7064116a8b98df2dda946576303e5b Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 16 Mar 2026 17:40:44 -0400 Subject: [PATCH 15/20] Ack monitor events on check_free_peer_holding_cells Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures once the monitor update that marks them as failed on the inbound edge is complete. --- lightning/src/ln/channel.rs | 62 +++++++++++++++++++----------- lightning/src/ln/channelmanager.rs | 11 +++++- 2 files changed, 50 insertions(+), 23 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index dd7fad72e9b..fc283373a0c 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -8331,7 +8331,7 @@ where /// returns `(None, Vec::new())`. 
pub fn maybe_free_holding_cell_htlcs( &mut self, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, - ) -> (Option, Vec<(HTLCSource, PaymentHash)>) { + ) -> (Option<(ChannelMonitorUpdate, Vec)>, Vec<(HTLCSource, PaymentHash)>) { if matches!(self.context.channel_state, ChannelState::ChannelReady(_)) && self.context.channel_state.can_generate_new_commitment() { @@ -8345,7 +8345,7 @@ where /// for our counterparty. fn free_holding_cell_htlcs( &mut self, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, - ) -> (Option, Vec<(HTLCSource, PaymentHash)>) { + ) -> (Option<(ChannelMonitorUpdate, Vec)>, Vec<(HTLCSource, PaymentHash)>) { assert!(matches!(self.context.channel_state, ChannelState::ChannelReady(_))); assert!(!self.context.channel_state.is_monitor_update_in_progress()); assert!(!self.context.channel_state.is_quiescent()); @@ -8375,6 +8375,7 @@ where let mut update_fulfill_count = 0; let mut update_fail_count = 0; let mut htlcs_to_fail = Vec::new(); + let mut monitor_events_to_ack = Vec::new(); for htlc_update in htlc_updates.drain(..) 
{ // Note that this *can* fail, though it should be due to rather-rare conditions on // fee races with adding too many outputs which push our total payments just over @@ -8470,31 +8471,41 @@ where htlc_id, ref err_packet, monitor_event_source, - } => Some( - self.fail_htlc( - htlc_id, - err_packet.clone(), - false, - monitor_event_source, - logger, + } => { + if let Some(source) = monitor_event_source { + monitor_events_to_ack.push(source); + } + Some( + self.fail_htlc( + htlc_id, + err_packet.clone(), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), - ), + }, &HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion, monitor_event_source, - } => Some( - self.fail_htlc( - htlc_id, - (sha256_of_onion, failure_code), - false, - monitor_event_source, - logger, + } => { + if let Some(source) = monitor_event_source { + monitor_events_to_ack.push(source); + } + Some( + self.fail_htlc( + htlc_id, + (sha256_of_onion, failure_code), + false, + monitor_event_source, + logger, + ) + .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), ) - .map(|fail_msg_opt| fail_msg_opt.map(|_| ())), - ), + }, }; if let Some(res) = fail_htlc_res { match res { @@ -8546,7 +8557,11 @@ where Vec::new(), logger, ); - (self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail) + ( + self.push_ret_blockable_mon_update(monitor_update) + .map(|upd| (upd, monitor_events_to_ack)), + htlcs_to_fail, + ) } else { (None, Vec::new()) } @@ -8899,7 +8914,10 @@ where self.context.monitor_pending_update_adds.append(&mut pending_update_adds); match self.maybe_free_holding_cell_htlcs(fee_estimator, logger) { - (Some(mut additional_update), htlcs_to_fail) => { + // TODO: Thread monitor_events_to_ack through the revoke_and_ack return + // value so the ChannelManager can attach an AckMonitorEvents completion + // action to this monitor update. 
+ (Some((mut additional_update, _monitor_events_to_ack)), htlcs_to_fail) => { // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. self.context.latest_monitor_update_id = monitor_update.update_id; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 7aaeeef59e6..13774239f28 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -13782,7 +13782,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ ); if monitor_opt.is_some() || !holding_cell_failed_htlcs.is_empty() { let update_res = monitor_opt - .map(|monitor_update| { + .map(|(monitor_update, monitor_events_to_ack)| { + if !monitor_events_to_ack.is_empty() { + peer_state + .monitor_update_blocked_actions + .entry(*chan_id) + .or_default() + .push(MonitorUpdateCompletionAction::AckMonitorEvents { + monitor_events_to_ack, + }); + } self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, &mut peer_state.monitor_update_blocked_actions, From 0f1a4a0c7b91c51e88944c09f72a41133aef2b64 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 10:17:48 -0400 Subject: [PATCH 16/20] Ack monitor events on revoke_and_ack Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). 
In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures once the monitor update that marks them as failed on the inbound edge is complete, when the HTLC was irrevocably failed via revoke_and_ack. --- lightning/src/ln/channel.rs | 16 ++++++++++------ lightning/src/ln/channelmanager.rs | 11 ++++++++++- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index fc283373a0c..db60411e6f0 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -8587,7 +8587,7 @@ where ( Vec<(HTLCSource, PaymentHash)>, Vec<(StaticInvoice, BlindedMessagePath)>, - Option, + Option<(ChannelMonitorUpdate, Vec)>, ), ChannelError, > { @@ -8898,6 +8898,7 @@ where } else { "Blocked" }; + let mut monitor_events_to_ack = Vec::new(); macro_rules! return_with_htlcs_to_fail { ($htlcs_to_fail: expr) => { if !release_monitor { @@ -8906,7 +8907,12 @@ where .push(PendingChannelMonitorUpdate { update: monitor_update }); return Ok(($htlcs_to_fail, static_invoices, None)); } else { - return Ok(($htlcs_to_fail, static_invoices, Some(monitor_update))); + let events_to_ack = core::mem::take(&mut monitor_events_to_ack); + return Ok(( + $htlcs_to_fail, + static_invoices, + Some((monitor_update, events_to_ack)), + )); } }; } @@ -8914,10 +8920,8 @@ where self.context.monitor_pending_update_adds.append(&mut pending_update_adds); match self.maybe_free_holding_cell_htlcs(fee_estimator, logger) { - // TODO: Thread monitor_events_to_ack through the revoke_and_ack return - // value so the ChannelManager can attach an AckMonitorEvents completion - // action to this monitor update. 
- (Some((mut additional_update, _monitor_events_to_ack)), htlcs_to_fail) => { + (Some((mut additional_update, holding_cell_monitor_events_to_ack)), htlcs_to_fail) => { + monitor_events_to_ack = holding_cell_monitor_events_to_ack; // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. self.context.latest_monitor_update_id = monitor_update.update_id; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 13774239f28..4ba22324a44 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -12981,7 +12981,16 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ *counterparty_node_id); let (htlcs_to_fail, static_invoices, monitor_update_opt) = try_channel_entry!(self, peer_state, chan.revoke_and_ack(&msg, &self.fee_estimator, &&logger, mon_update_blocked), chan_entry); - if let Some(monitor_update) = monitor_update_opt { + if let Some((monitor_update, monitor_events_to_ack)) = monitor_update_opt { + if !monitor_events_to_ack.is_empty() { + peer_state + .monitor_update_blocked_actions + .entry(msg.channel_id) + .or_default() + .push(MonitorUpdateCompletionAction::AckMonitorEvents { + monitor_events_to_ack, + }); + } let funding_txo = funding_txo_opt .expect("Funding outpoint must have been set for RAA handling to succeed"); if let Some(data) = self.handle_new_monitor_update( From 1896811197fede18a531c1d013b44cffb30dbcf3 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 10:21:44 -0400 Subject: [PATCH 17/20] Ack monitor event when inbound edge is closed Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. 
Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures if we try to fail backwards and discover the inbound edge is closed. If that's the case, there's nothing for us to do as the HTLC will be resolved via timeout by the inbound edge counterparty. --- lightning/src/ln/channelmanager.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4ba22324a44..f3f8ea73c42 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -7971,11 +7971,15 @@ impl< continue; } }, - HTLCForwardInfo::FailHTLC { .. } | HTLCForwardInfo::FailMalformedHTLC { .. } => { + HTLCForwardInfo::FailHTLC { monitor_event_source, .. } + | HTLCForwardInfo::FailMalformedHTLC { monitor_event_source, .. } => { // Channel went away before we could fail it. This implies // the channel is now on chain and our counterparty is // trying to broadcast the HTLC-Timeout, but that's their // problem, not ours.
+ if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } }, } } From 147388ec2e316560e9145bebb05a519bf2aad17e Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 10:30:12 -0400 Subject: [PATCH 18/20] Ack monitor events when failing-to-fail-backwards Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we build on recent commits by ACK'ing monitor events for forward failures if we go to fail on the inbound edge and get an error when queueing the backwards failure. If we get an error in this case it means the failure is duplicate or the channel is closed, and in either case it's fine to mark the event as resolved. 
--- lightning/src/ln/channelmanager.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f3f8ea73c42..dfba7ebf581 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -8196,6 +8196,7 @@ impl< &&logger, ), htlc_id, + monitor_event_source, )) } else { self.forwarding_channel_not_found( @@ -8228,7 +8229,7 @@ impl< monitor_event_source, &&logger, ); - Some((res, htlc_id)) + Some((res, htlc_id, monitor_event_source)) } else { self.forwarding_channel_not_found( core::iter::once(forward_info).chain(draining_pending_forwards), @@ -8241,8 +8242,12 @@ impl< } }, }; - if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res { + if let Some((queue_fail_htlc_res, htlc_id, monitor_event_source)) = queue_fail_htlc_res + { if let Err(e) = queue_fail_htlc_res { + if let Some(source) = monitor_event_source { + self.chain_monitor.ack_monitor_event(source); + } if let ChannelError::Ignore(msg) = e { if let Some(chan) = peer_state .channel_by_id From a34f3792e2dc1b6f549f455dccab1b97023ae66e Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Mon, 23 Mar 2026 15:39:33 -0400 Subject: [PATCH 19/20] Ack monitor events from blocked updates in Channel Previously, we would drop the list of monitor events that could be ack'd after a ChannelMonitorUpdate completed, if that monitor update was generated within the Channel and ended up in the Channel's blocked-updates queue. 
--- lightning/src/ln/channel.rs | 53 ++++++++++++++++++++++-------- lightning/src/ln/channelmanager.rs | 23 ++++++++++--- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index db60411e6f0..7541a3c3394 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -1485,10 +1485,13 @@ pub(crate) const CHANNEL_ANNOUNCEMENT_PROPAGATION_DELAY: u32 = 144; #[derive(Debug)] struct PendingChannelMonitorUpdate { update: ChannelMonitorUpdate, + /// `MonitorEvent`s that can be ack'd after `update` is durably persisted. + post_update_ackable_events: Vec, } impl_writeable_tlv_based!(PendingChannelMonitorUpdate, { (0, update, required), + (1, post_update_ackable_events, optional_vec), }); /// A payment channel with a counterparty throughout its life-cycle, encapsulating negotiation and @@ -7325,9 +7328,10 @@ where if !update_blocked { debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set"); let update = self.build_commitment_no_status_check(logger); - self.context - .blocked_monitor_updates - .push(PendingChannelMonitorUpdate { update }); + self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate { + update, + post_update_ackable_events: Vec::new(), + }); } } @@ -8558,8 +8562,10 @@ where logger, ); ( - self.push_ret_blockable_mon_update(monitor_update) - .map(|upd| (upd, monitor_events_to_ack)), + self.push_ret_blockable_mon_update_with_event_sources( + monitor_update, + monitor_events_to_ack, + ), htlcs_to_fail, ) } else { @@ -8901,13 +8907,14 @@ where let mut monitor_events_to_ack = Vec::new(); macro_rules! 
return_with_htlcs_to_fail { ($htlcs_to_fail: expr) => { + let events_to_ack = core::mem::take(&mut monitor_events_to_ack); if !release_monitor { - self.context - .blocked_monitor_updates - .push(PendingChannelMonitorUpdate { update: monitor_update }); + self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate { + update: monitor_update, + post_update_ackable_events: events_to_ack, + }); return Ok(($htlcs_to_fail, static_invoices, None)); } else { - let events_to_ack = core::mem::take(&mut monitor_events_to_ack); return Ok(( $htlcs_to_fail, static_invoices, @@ -11001,12 +11008,16 @@ where /// Returns the next blocked monitor update, if one exists, and a bool which indicates a /// further blocked monitor update exists after the next. - pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> { + pub fn unblock_next_blocked_monitor_update( + &mut self, + ) -> Option<(ChannelMonitorUpdate, Vec, bool)> { if self.context.blocked_monitor_updates.is_empty() { return None; } + let pending = self.context.blocked_monitor_updates.remove(0); Some(( - self.context.blocked_monitor_updates.remove(0).update, + pending.update, + pending.post_update_ackable_events, !self.context.blocked_monitor_updates.is_empty(), )) } @@ -11016,14 +11027,24 @@ where #[rustfmt::skip] fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> Option { + self.push_ret_blockable_mon_update_with_event_sources(update, Vec::new()) + .map(|(upd, _)| upd) + } + + /// Similar to `push_ret_blockable_mon_update`, but allows including a list of + /// `MonitorEventSource`s that can be ack'd after the `update` is durably persisted. 
+ fn push_ret_blockable_mon_update_with_event_sources( + &mut self, update: ChannelMonitorUpdate, monitor_event_sources: Vec, + ) -> Option<(ChannelMonitorUpdate, Vec)> { let release_monitor = self.context.blocked_monitor_updates.is_empty(); if !release_monitor { self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate { update, + post_update_ackable_events: monitor_event_sources, }); None } else { - Some(update) + Some((update, monitor_event_sources)) } } @@ -11032,19 +11053,23 @@ where /// here after logging them. pub fn on_startup_drop_completed_blocked_mon_updates_through( &mut self, logger: &L, loaded_mon_update_id: u64, - ) { - self.context.blocked_monitor_updates.retain(|update| { + ) -> Vec { + let mut monitor_events_to_ack = Vec::new(); + self.context.blocked_monitor_updates.retain_mut(|update| { if update.update.update_id <= loaded_mon_update_id { log_info!( logger, "Dropping completed ChannelMonitorUpdate id {} due to a stale ChannelManager", update.update.update_id, ); + monitor_events_to_ack + .extend(core::mem::take(&mut update.post_update_ackable_events)); false } else { true } }); + monitor_events_to_ack } pub fn blocked_monitor_updates_pending(&self) -> usize { diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index dfba7ebf581..2ad321065dd 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -15392,9 +15392,18 @@ impl< channel_id) { if let Some(chan) = chan_entry.get_mut().as_funded_mut() { let channel_funding_outpoint = chan.funding_outpoint(); - if let Some((monitor_update, further_update_exists)) = chan.unblock_next_blocked_monitor_update() { + if let Some((monitor_update, monitor_events_to_ack, further_update_exists)) = chan.unblock_next_blocked_monitor_update() { log_debug!(logger, "Unlocking monitor updating and updating monitor", ); + if !monitor_events_to_ack.is_empty() { + peer_state + .monitor_update_blocked_actions + .entry(channel_id) + .or_default() 
+ .push(MonitorUpdateCompletionAction::AckMonitorEvents { + monitor_events_to_ack, + }); + } let post_update_data = self.handle_new_monitor_update( &mut peer_state.in_flight_monitor_updates, &mut peer_state.monitor_update_blocked_actions, @@ -19270,10 +19279,14 @@ impl< } } } else { - channel.on_startup_drop_completed_blocked_mon_updates_through( - &logger, - monitor.get_latest_update_id(), - ); + let events_to_ack = channel + .on_startup_drop_completed_blocked_mon_updates_through( + &logger, + monitor.get_latest_update_id(), + ); + for source in events_to_ack { + args.chain_monitor.ack_monitor_event(source); + } log_info!(logger, "Successfully loaded at update_id {} against monitor at update id {} with {} blocked updates", channel.context.get_latest_monitor_update_id(), monitor.get_latest_update_id(), channel.blocked_monitor_updates_pending()); From 63fc6982040977f7dd2669c6ad412af2d72d99dc Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 17 Mar 2026 15:31:39 -0400 Subject: [PATCH 20/20] Support persistent monitor events Currently, the resolution of HTLCs (and decisions on when HTLCs can be forwarded) is the responsibility of Channel objects (a part of ChannelManager) until the channel is closed, and then the ChannelMonitor thereafter. This leads to some complexity around race conditions for HTLCs right around channel closure. Additionally, there is lots of complexity reconstructing the state of all HTLCs in the ChannelManager deserialization/loading logic. Instead, we want to do all resolution in ChannelMonitors (in response to ChannelMonitorUpdates) and pass them back to ChannelManager in the form of MonitorEvents (similar to how HTLCs are resolved after channels are closed). In order to have reliable resolution, we'll need to keep MonitorEvents around in the ChannelMonitor until the ChannelManager has finished processing them. 
This will simplify things - on restart instead of examining the set of HTLCs in monitors we can simply replay all the pending MonitorEvents. Here we complete work that was built on recent prior commits and actually start re-providing monitor events on startup if they went un-acked during runtime. This isn't actually supported in prod yet, so this new code will run randomly in tests, to ensure we still support the old paths. --- lightning/src/chain/channelmonitor.rs | 53 +++++++++++++++++++++------ lightning/src/ln/channelmanager.rs | 27 +++++++++++++- lightning/src/ln/monitor_tests.rs | 3 ++ lightning/src/util/config.rs | 6 +++ 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 2457c752cfd..0021523b569 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1288,6 +1288,11 @@ pub(crate) struct ChannelMonitorImpl { // block/transaction-connected events and *not* during block/transaction-disconnected events, // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec<(u64, MonitorEvent)>, + // `MonitorEvent`s that have been provided to the `ChannelManager` via + // [`ChannelMonitor::get_and_clear_pending_monitor_events`] and are awaiting + // [`ChannelMonitor::ack_monitor_event`] for removal. If an event in this queue is not ack'd, it + // will be re-provided to the `ChannelManager` on startup. + provided_monitor_events: Vec<(u64, MonitorEvent)>, /// When set, monitor events are retained until explicitly acked rather than cleared on read. /// /// Allows the ChannelManager to reconstruct pending HTLC state by replaying monitor events on @@ -1764,7 +1769,12 @@ pub(crate) fn write_chanmon_internal( // Only write `persistent_events_enabled` if it's set to true, as it's an even TLV. 
let persistent_events_enabled = channel_monitor.persistent_events_enabled.then_some(()); let pending_mon_evs_with_ids = if persistent_events_enabled.is_some() { - Some(Iterable(channel_monitor.pending_monitor_events.iter())) + Some(Iterable( + channel_monitor + .provided_monitor_events + .iter() + .chain(channel_monitor.pending_monitor_events.iter()), + )) } else { None }; @@ -1977,6 +1987,7 @@ impl ChannelMonitor { payment_preimages: new_hash_map(), pending_monitor_events: Vec::new(), + provided_monitor_events: Vec::new(), persistent_events_enabled: false, next_monitor_event_id: 0, pending_events: Vec::new(), @@ -2202,8 +2213,15 @@ impl ChannelMonitor { /// Removes a [`MonitorEvent`] by its event ID, acknowledging that it has been processed. /// Generally called by [`chain::Watch::ack_monitor_event`]. - pub fn ack_monitor_event(&self, _event_id: u64) { - // TODO: once events have ids, remove the corresponding event here + pub fn ack_monitor_event(&self, event_id: u64) { + let inner = &mut *self.inner.lock().unwrap(); + inner.provided_monitor_events.retain(|(id, _)| *id != event_id); + } + + /// Enables persistent monitor events mode. When enabled, monitor events are retained until + /// explicitly acked rather than cleared on read. + pub(crate) fn set_persistent_events_enabled(&self, enabled: bool) { + self.inner.lock().unwrap().persistent_events_enabled = enabled; } /// Copies [`MonitorEvent`] state from `other` into `self`. @@ -2211,11 +2229,16 @@ impl ChannelMonitor { /// serialization round-trip. 
#[cfg(any(test, feature = "_test_utils"))] pub fn copy_monitor_event_state(&self, other: &ChannelMonitor) { - let (pending, next_id) = { + let (provided, pending, next_id) = { let other_inner = other.inner.lock().unwrap(); - (other_inner.pending_monitor_events.clone(), other_inner.next_monitor_event_id) + ( + other_inner.provided_monitor_events.clone(), + other_inner.pending_monitor_events.clone(), + other_inner.next_monitor_event_id, + ) }; let mut self_inner = self.inner.lock().unwrap(); + self_inner.provided_monitor_events = provided; self_inner.pending_monitor_events = pending; self_inner.next_monitor_event_id = next_id; } @@ -4619,9 +4642,16 @@ impl ChannelMonitorImpl { } fn get_and_clear_pending_monitor_events(&mut self) -> Vec<(u64, MonitorEvent)> { - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut self.pending_monitor_events); - ret + if self.persistent_events_enabled { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + self.provided_monitor_events.extend(ret.iter().cloned()); + ret + } else { + let mut ret = Vec::new(); + mem::swap(&mut ret, &mut self.pending_monitor_events); + ret + } } /// Gets the set of events that are repeated regularly (e.g. those which RBF bump @@ -5948,8 +5978,8 @@ impl ChannelMonitorImpl { if inbound_htlc_expiry > max_expiry_height { continue; } - let duplicate_event = self.pending_monitor_events.iter().any( - |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { + let duplicate_event = self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()) + .any(|(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == *source } else { false }); if duplicate_event { @@ -6365,7 +6395,7 @@ impl ChannelMonitorImpl { // HTLC resolution backwards to and figure out whether we learned a preimage from it. 
if let Some((source, payment_hash, amount_msat)) = payment_data { if accepted_preimage_claim { - if !self.pending_monitor_events.iter().any( + if !self.pending_monitor_events.iter().chain(self.provided_monitor_events.iter()).any( |(_, update)| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) { self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid: tx.compute_txid(), @@ -6971,6 +7001,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP payment_preimages, pending_monitor_events, + provided_monitor_events: Vec::new(), persistent_events_enabled: persistent_events_enabled.is_some(), next_monitor_event_id, pending_events, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 2ad321065dd..32b0941889c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3641,6 +3641,8 @@ impl< our_network_pubkey, current_timestamp, expanded_inbound_key, node_signer.get_receive_auth_key(), secp_ctx.clone(), message_router, logger.clone(), ); + #[cfg(test)] + let override_persistent_monitor_events = config.override_persistent_monitor_events; ChannelManager { config: RwLock::new(config), @@ -3697,7 +3699,27 @@ impl< logger, - persistent_monitor_events: false, + persistent_monitor_events: { + #[cfg(not(test))] + { false } + #[cfg(test)] + { + override_persistent_monitor_events.unwrap_or_else(|| { + use core::hash::{BuildHasher, Hasher}; + match std::env::var("LDK_TEST_PERSISTENT_MON_EVENTS") { + Ok(val) => match val.as_str() { + "1" => true, + "0" => false, + _ => panic!("LDK_TEST_PERSISTENT_MON_EVENTS must be 0 or 1, got: {}", val), + }, + Err(_) => { + let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish(); + rand_val % 2 == 0 + }, + } + }) + } + }, #[cfg(feature = "_test_utils")] testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), @@ -11721,6 +11743,7 @@ This 
indicates a bug inside LDK. Please report this error at https://github.com/ fail_chan!("Already had channel with the new channel_id"); }, hash_map::Entry::Vacant(e) => { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { // There's no problem signing a counterparty's funding transaction if our monitor @@ -11891,6 +11914,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ match chan .funding_signed(&msg, best_block, &self.signer_provider, &self.logger) .and_then(|(funded_chan, monitor)| { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); self.chain_monitor .watch_channel(funded_chan.context.channel_id(), monitor) .map_err(|()| { @@ -12806,6 +12830,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(chan) = chan.as_funded_mut() { if let Some(monitor) = monitor_opt { + monitor.set_persistent_events_enabled(self.persistent_monitor_events); let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index 1a68e5165a8..4c97df2f42e 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -3592,6 +3592,9 @@ fn do_test_lost_timeout_monitor_events(confirm_tx: CommitmentType, dust_htlcs: b let mut cfg = test_default_channel_config(); cfg.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; + // This test specifically tests lost monitor events, which requires the legacy + // (non-persistent) monitor event behavior. 
+	cfg.override_persistent_monitor_events = Some(false); let cfgs = [Some(cfg.clone()), Some(cfg.clone()), Some(cfg.clone())]; let chanmon_cfgs = create_chanmon_cfgs(3); diff --git a/lightning/src/util/config.rs b/lightning/src/util/config.rs index e4158910b9a..6b0f7942423 100644 --- a/lightning/src/util/config.rs +++ b/lightning/src/util/config.rs @@ -1103,6 +1103,10 @@ pub struct UserConfig { /// /// [`ChannelManager::splice_channel`]: crate::ln::channelmanager::ChannelManager::splice_channel pub reject_inbound_splices: bool, + /// If set to `Some`, overrides the random selection of whether to use persistent monitor + /// events. Only available in tests. + #[cfg(test)] + pub override_persistent_monitor_events: Option<bool>, } impl Default for UserConfig { @@ -1119,6 +1123,8 @@ impl Default for UserConfig { enable_htlc_hold: false, hold_outbound_htlcs_at_next_hop: false, reject_inbound_splices: true, + #[cfg(test)] + override_persistent_monitor_events: None, } } }