diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 8ab20d5a1f3..f2b3cdd1831 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1120,21 +1120,14 @@ where None => {}, } - // We capture pending_operation_count inside the persistence branch to - // avoid a race: ChannelManager handlers queue deferred monitor ops - // before the persistence flag is set. Capturing outside would let us - // observe pending ops while the flag is still unset, causing us to - // flush monitor writes without persisting the ChannelManager. - // Declared before futures so it outlives the Joiner (drop order). - let pending_monitor_writes; - let mut futures = Joiner::new(); - if channel_manager.get_cm().get_and_clear_needs_persistence() { - pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); - log_trace!(logger, "Persisting ChannelManager..."); + let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); + let mut cm_fut = core::pin::pin!(async { + if needs_cm_persist { + // Capture the monitor operations pending before we persist the ChannelManager. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); - let fut = async { kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1147,22 +1140,24 @@ where // Flush monitor operations that were pending before we persisted. New updates // that arrived after are left for the next iteration. chain_monitor.get_cm().flush(pending_monitor_writes, &logger); - Ok(()) - }; - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - let mut fut = Box::pin(fut); - - // Because persisting the ChannelManager is important to avoid accidental - // force-closures, go ahead and poll the future once before we do slightly more - // CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping - // below. This will get it moving but won't block us for too long if the underlying - // future is actually async. + } + Ok(()) + }); + + // Because persisting the ChannelManager is important to avoid accidental force-closures, + // go ahead and poll the future once before we do slightly more CPU-intensive tasks in the + // form of NetworkGraph pruning or scorer time-stepping below. This will get it moving but + // won't block us for too long if the underlying future is actually async. We stash the + // outcome and feed it into the `Joiner` once it is constructed. + if needs_cm_persist { + log_trace!(logger, "Persisting ChannelManager..."); + use core::future::Future; let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(&mut fut).poll(&mut ctx) { + match cm_fut.as_mut().poll(&mut ctx) { task::Poll::Ready(res) => futures.set_a_res(res), - task::Poll::Pending => futures.set_a(fut), + task::Poll::Pending => futures.set_a(cm_fut), } log_trace!(logger, "Done persisting ChannelManager."); @@ -1210,7 +1205,8 @@ where GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, _ => prune_timer_elapsed, }; - if should_prune { + + let network_graph_to_persist = if should_prune { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = gossip_sync.prunable_network_graph() { if let Some(duration_since_epoch) = fetch_time() { @@ -1222,28 +1218,15 @@ where log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); log_trace!(logger, "Persisting network graph."); } - let fut = async { - if let Err(e) = kv_store - .write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - network_graph.encode(), - ) - .await - { - log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); - } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_b(Box::pin(fut)); have_pruned = true; + Some(network_graph) + } else { + None } - } + } else { + None + }; if !have_decayed_scorer { if let Some(ref scorer) = scorer { if let Some(duration_since_epoch) = fetch_time() { @@ -1253,7 +1236,9 @@ where } have_decayed_scorer = true; } - match check_and_reset_sleeper(&mut last_scorer_persist_call, || { + // Step the scorer forward synchronously here, deferring the actual write to the + // future built below. + let persist_scorer = match check_and_reset_sleeper(&mut last_scorer_persist_call, || { sleeper(SCORER_PERSIST_TIMER) }) { Some(false) => { @@ -1264,7 +1249,46 @@ where } else { log_trace!(logger, "Persisting scorer"); } - let fut = async { + true + } else { + false + } + }, + Some(true) => break, + None => false, + }; + let persist_sweeper = + match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { + Some(false) => { + log_trace!(logger, "Regenerating sweeper spends if necessary"); + true + }, + Some(true) => break, + None => false, + }; + + let network_graph_fut = core::pin::pin!(async { + if let Some(network_graph) = network_graph_to_persist { + if let Err(e) = kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + network_graph.encode(), + ) + .await + { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + } + Ok(()) + }); + futures.set_b(network_graph_fut); + + let scorer_fut = + core::pin::pin!(async { + if persist_scorer { + if let Some(ref scorer) = scorer { if let Err(e) = kv_store .write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1274,43 +1298,26 @@ where ) .await { - log_error!( - logger, - "Error: Failed to persist scorer, check your disk and permissions {}", - e - ); + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); } - - Ok(()) - }; - - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_c(Box::pin(fut)); + } } - }, - Some(true) => break, - None => {}, - } - match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) { - Some(false) => { - log_trace!(logger, "Regenerating sweeper spends if necessary"); - if let Some(ref sweeper) = sweeper { - let fut = async { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; - - Ok(()) - }; + Ok(()) + }); + futures.set_c(scorer_fut); - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - futures.set_d(Box::pin(fut)); + let sweeper_fut = core::pin::pin!(async { + if persist_sweeper { + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; } - }, - Some(true) => break, - None => {}, - } + } + Ok(()) + }); + futures.set_d(sweeper_fut); - if let Some(liquidity_manager) = liquidity_manager.as_ref() { - let fut = async { + let lm_fut = core::pin::pin!(async { + if let Some(liquidity_manager) = liquidity_manager.as_ref() { liquidity_manager .get_lm() .persist() @@ -1324,9 +1331,11 @@ where log_error!(logger, "Persisting LiquidityManager failed: {}", e); e }) - }; - futures.set_e(Box::pin(fut)); - } + } else { + Ok(()) + } + }); + futures.set_e(lm_fut); // Run persistence tasks in parallel and exit if any of them returns an error. for res in futures.await {