Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 90 additions & 81 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,21 +1120,14 @@ where
None => {},
}

// We capture pending_operation_count inside the persistence branch to
// avoid a race: ChannelManager handlers queue deferred monitor ops
// before the persistence flag is set. Capturing outside would let us
// observe pending ops while the flag is still unset, causing us to
// flush monitor writes without persisting the ChannelManager.
// Declared before futures so it outlives the Joiner (drop order).
let pending_monitor_writes;

let mut futures = Joiner::new();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
log_trace!(logger, "Persisting ChannelManager...");
let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence();
let mut cm_fut = core::pin::pin!(async {
if needs_cm_persist {
// Capture the monitor operations pending before we persist the ChannelManager.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();

let fut = async {
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1147,22 +1140,24 @@ where
// Flush monitor operations that were pending before we persisted. New updates
// that arrived after are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
Ok(())
};
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
let mut fut = Box::pin(fut);

// Because persisting the ChannelManager is important to avoid accidental
// force-closures, go ahead and poll the future once before we do slightly more
// CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
// below. This will get it moving but won't block us for too long if the underlying
// future is actually async.
}
Ok(())
});

// Because persisting the ChannelManager is important to avoid accidental force-closures,
// go ahead and poll the future once before we do slightly more CPU-intensive tasks in the
// form of NetworkGraph pruning or scorer time-stepping below. This will get it moving but
// won't block us for too long if the underlying future is actually async. We stash the
// outcome and feed it into the `Joiner` once it is constructed.
if needs_cm_persist {
log_trace!(logger, "Persisting ChannelManager...");

use core::future::Future;
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
match cm_fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(res) => futures.set_a_res(res),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe there's any reason to change this, it would probably be simpler to keep the code structured the way it was.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've simplified it, although, I couldn't fully restore the original now that the futures are stack-pinned, Joiner holds Pin<&mut> references into them and so must be declared after all of them.

Meaning it doesn't exist yet at the eager-poll site, so the outcome has to be stashed and fed in once the Joiner is constructed. Open to a cleaner approach if you have any recommendations.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I had only tested a smaller patch and was hoping it would move up, but at least this reduces patch size marginally:

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index b2b0c8d9b2..f2b3cdd183 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1122,2 +1122,4 @@ where

+               let mut futures = Joiner::new();
+
                let needs_cm_persist = channel_manager.get_cm().get_and_clear_needs_persistence();
@@ -1149,3 +1151,2 @@ where
                // outcome and feed it into the `Joiner` once it is constructed.
-               let mut cm_persist_res = None;
                if needs_cm_persist {
@@ -1156,4 +1157,5 @@ where
                        let mut ctx = task::Context::from_waker(&mut waker);
-                       if let task::Poll::Ready(res) = cm_fut.as_mut().poll(&mut ctx) {
-                               cm_persist_res = Some(res);
+                       match cm_fut.as_mut().poll(&mut ctx) {
+                               task::Poll::Ready(res) => futures.set_a_res(res),
+                               task::Poll::Pending => futures.set_a(cm_fut),
                        }
@@ -1283,2 +1285,4 @@ where
                });
+               futures.set_b(network_graph_fut);
+
                let scorer_fut =
@@ -1302,2 +1306,4 @@ where
                        });
+               futures.set_c(scorer_fut);
+
                let sweeper_fut = core::pin::pin!(async {
@@ -1310,2 +1316,4 @@ where
                });
+               futures.set_d(sweeper_fut);
+
                let lm_fut = core::pin::pin!(async {
@@ -1329,12 +1337,2 @@ where
                });
-
-               let mut futures = Joiner::new();
-               match cm_persist_res {
-                       Some(res) => futures.set_a_res(res),
-                       None if needs_cm_persist => futures.set_a(cm_fut),
-                       None => {},
-               }
-               futures.set_b(network_graph_fut);
-               futures.set_c(scorer_fut);
-               futures.set_d(sweeper_fut);
                futures.set_e(lm_fut);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thanks, this works 👍

task::Poll::Pending => futures.set_a(fut),
task::Poll::Pending => futures.set_a(cm_fut),
}

log_trace!(logger, "Done persisting ChannelManager.");
Expand Down Expand Up @@ -1210,7 +1205,8 @@ where
GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
_ => prune_timer_elapsed,
};
if should_prune {

let network_graph_to_persist = if should_prune {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = gossip_sync.prunable_network_graph() {
if let Some(duration_since_epoch) = fetch_time() {
Expand All @@ -1222,28 +1218,15 @@ where
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
log_trace!(logger, "Persisting network graph.");
}
let fut = async {
if let Err(e) = kv_store
.write(
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
network_graph.encode(),
)
.await
{
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
}

Ok(())
};

// TODO: Once our MSRV is 1.68 we should be able to drop the Box
futures.set_b(Box::pin(fut));

have_pruned = true;
Some(network_graph)
} else {
None
}
}
} else {
None
};
if !have_decayed_scorer {
if let Some(ref scorer) = scorer {
if let Some(duration_since_epoch) = fetch_time() {
Expand All @@ -1253,7 +1236,9 @@ where
}
have_decayed_scorer = true;
}
match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
// Step the scorer forward synchronously here, deferring the actual write to the
// future built below.
let persist_scorer = match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
sleeper(SCORER_PERSIST_TIMER)
}) {
Some(false) => {
Expand All @@ -1264,7 +1249,46 @@ where
} else {
log_trace!(logger, "Persisting scorer");
}
let fut = async {
true
} else {
false
}
},
Some(true) => break,
None => false,
};
let persist_sweeper =
match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
Some(false) => {
log_trace!(logger, "Regenerating sweeper spends if necessary");
true
},
Some(true) => break,
None => false,
};

let network_graph_fut = core::pin::pin!(async {
if let Some(network_graph) = network_graph_to_persist {
if let Err(e) = kv_store
.write(
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
network_graph.encode(),
)
.await
{
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
}
}
Ok(())
});
futures.set_b(network_graph_fut);

let scorer_fut =
core::pin::pin!(async {
if persist_scorer {
if let Some(ref scorer) = scorer {
if let Err(e) = kv_store
.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1274,43 +1298,26 @@ where
)
.await
{
log_error!(
logger,
"Error: Failed to persist scorer, check your disk and permissions {}",
e
);
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
}

Ok(())
};

// TODO: Once our MSRV is 1.68 we should be able to drop the Box
futures.set_c(Box::pin(fut));
}
}
},
Some(true) => break,
None => {},
}
match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
Some(false) => {
log_trace!(logger, "Regenerating sweeper spends if necessary");
if let Some(ref sweeper) = sweeper {
let fut = async {
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;

Ok(())
};
Ok(())
});
futures.set_c(scorer_fut);

// TODO: Once our MSRV is 1.68 we should be able to drop the Box
futures.set_d(Box::pin(fut));
let sweeper_fut = core::pin::pin!(async {
if persist_sweeper {
if let Some(ref sweeper) = sweeper {
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
}
},
Some(true) => break,
None => {},
}
}
Ok(())
});
futures.set_d(sweeper_fut);

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
let fut = async {
let lm_fut = core::pin::pin!(async {
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
liquidity_manager
.get_lm()
.persist()
Expand All @@ -1324,9 +1331,11 @@ where
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
e
})
};
futures.set_e(Box::pin(fut));
}
} else {
Ok(())
}
});
futures.set_e(lm_fut);

// Run persistence tasks in parallel and exit if any of them returns an error.
for res in futures.await {
Expand Down
Loading