From 3cad6af6979fb7b72379050c3e6a46329994b1eb Mon Sep 17 00:00:00 2001 From: Abeeujah Date: Sat, 20 Jun 2026 21:04:37 +0100 Subject: [PATCH] Avoid heap-allocating background processor futures Replace Box::pin with core::pin::pin! in process_events_async now that MSRV is 1.75. This eliminates a heap allocation per task on every loop iteration by pinning the futures directly to the stack. To satisfy lifetime and Joiner bounds, the loop logic was refactored to run synchronous timer checks first, using flags to conditionally execute the stack-pinned futures. Existing eager polling and early-break semantics are preserved. --- lightning-background-processor/src/lib.rs | 171 ++++++++++++---------- 1 file changed, 90 insertions(+), 81 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 8ab20d5a1f3..f2b3cdd1831 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1120,21 +1120,14 @@ where None => {}, } - // We capture pending_operation_count inside the persistence branch to - // avoid a race: ChannelManager handlers queue deferred monitor ops - // before the persistence flag is set. Capturing outside would let us - // observe pending ops while the flag is still unset, causing us to - // flush monitor writes without persisting the ChannelManager. - // Declared before futures so it outlives the Joiner (drop order). - let pending_monitor_writes; - let mut futures = Joiner::new(); - if channel_manager.get_cm().get_and_clear_needs_persistence() { - pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); - log_trace!(logger, "Persisting ChannelManager..."); + let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); + let mut cm_fut = core::pin::pin!(async { + if needs_cm_persist { + // Capture the monitor operations pending before we persist the ChannelManager. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); - let fut = async { kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1147,22 +1140,24 @@ where // Flush monitor operations that were pending before we persisted. New updates // that arrived after are left for the next iteration. chain_monitor.get_cm().flush(pending_monitor_writes, &logger); - Ok(()) - }; - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - let mut fut = Box::pin(fut); - - // Because persisting the ChannelManager is important to avoid accidental - // force-closures, go ahead and poll the future once before we do slightly more - // CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping - // below. This will get it moving but won't block us for too long if the underlying - // future is actually async. + } + Ok(()) + }); + + // Because persisting the ChannelManager is important to avoid accidental force-closures, + // go ahead and poll the future once before we do slightly more CPU-intensive tasks in the + // form of NetworkGraph pruning or scorer time-stepping below. This will get it moving but + // won't block us for too long if the underlying future is actually async. We stash the + // outcome and feed it into the `Joiner` once it is constructed. + if needs_cm_persist { + log_trace!(logger, "Persisting ChannelManager..."); + use core::future::Future; let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(&mut fut).poll(&mut ctx) { + match cm_fut.as_mut().poll(&mut ctx) { task::Poll::Ready(res) => futures.set_a_res(res), - task::Poll::Pending => futures.set_a(fut), + task::Poll::Pending => futures.set_a(cm_fut), } log_trace!(logger, "Done persisting ChannelManager."); @@ -1210,7 +1205,8 @@ where GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, _ => prune_timer_elapsed, }; - if should_prune { + + let network_graph_to_persist = if should_prune { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = gossip_sync.prunable_network_graph() { if let Some(duration_since_epoch) = fetch_time() { @@ -1222,28 +1218,15 @@ where log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); log_trace!(logger, "Persisting network graph."); } - let fut = async { - if let Err(e) = kv_store - .write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - network_graph.encode(), - ) - .await - { - log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); - } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_b(Box::pin(fut)); have_pruned = true; + Some(network_graph) + } else { + None } - } + } else { + None + }; if !have_decayed_scorer { if let Some(ref scorer) = scorer { if let Some(duration_since_epoch) = fetch_time() { @@ -1253,7 +1236,9 @@ where } have_decayed_scorer = true; } - match check_and_reset_sleeper(&mut last_scorer_persist_call, || { + // Step the scorer forward synchronously here, deferring the actual write to the + // future built below. + let persist_scorer = match check_and_reset_sleeper(&mut last_scorer_persist_call, || { sleeper(SCORER_PERSIST_TIMER) }) { Some(false) => { @@ -1264,7 +1249,46 @@ where } else { log_trace!(logger, "Persisting scorer"); } - let fut = async { + true + } else { + false + } + }, + Some(true) => break, + None => false, + }; + let persist_sweeper = + match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { + Some(false) => { + log_trace!(logger, "Regenerating sweeper spends if necessary"); + true + }, + Some(true) => break, + None => false, + }; + + let network_graph_fut = core::pin::pin!(async { + if let Some(network_graph) = network_graph_to_persist { + if let Err(e) = kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + network_graph.encode(), + ) + .await + { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + } + Ok(()) + }); + futures.set_b(network_graph_fut); + + let scorer_fut = + core::pin::pin!(async { + if persist_scorer { + if let Some(ref scorer) = scorer { if let Err(e) = kv_store .write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1274,43 +1298,26 @@ where ) .await { - log_error!( - logger, - "Error: Failed to persist scorer, check your disk and permissions {}", - e - ); + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_c(Box::pin(fut)); + } } - }, - Some(true) => break, - None => {}, - } - match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { - Some(false) => { - log_trace!(logger, "Regenerating sweeper spends if necessary"); - if let Some(ref sweeper) = sweeper { - let fut = async { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; - - Ok(()) - }; + Ok(()) + }); + futures.set_c(scorer_fut); - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_d(Box::pin(fut)); + let sweeper_fut = core::pin::pin!(async { + if persist_sweeper { + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; } - }, - Some(true) => break, - None => {}, - } + } + Ok(()) + }); + futures.set_d(sweeper_fut); - if let Some(liquidity_manager) = liquidity_manager.as_ref() { - let fut = async { + let lm_fut = core::pin::pin!(async { + if let Some(liquidity_manager) = liquidity_manager.as_ref() { liquidity_manager .get_lm() .persist() @@ -1324,9 +1331,11 @@ where log_error!(logger, "Persisting LiquidityManager failed: {}", e); e }) - }; - futures.set_e(Box::pin(fut)); - } + } else { + Ok(()) + } + }); + futures.set_e(lm_fut); // Run persistence tasks in parallel and exit if any of them returns an error. for res in futures.await {