242 changes: 121 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,5 +1,5 @@
[workspace.package]
version = "1.11.0"
version = "1.11.1"
edition = "2024"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
5 changes: 5 additions & 0 deletions crates/chain-state/src/deferred_trie.rs
@@ -312,6 +312,11 @@ impl DeferredTrieData {
/// Given that invariant, circular wait dependencies are impossible.
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
#[cfg(feature = "rayon")]
debug_assert!(
rayon::current_thread_index().is_none(),
"wait_cloned must not be called from a rayon worker thread"
);
let mut state = self.state.lock();
match &mut *state {
// If the deferred trie data is ready, return the cached result.
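The new `debug_assert!` guards against calling `wait_cloned` from inside the rayon pool, where blocking on the lock could starve the very workers computing the deferred trie data. A minimal, self-contained sketch (illustrative only, not part of this PR) of how `rayon::current_thread_index()` tells the two contexts apart:

```rust
// rayon::current_thread_index() returns None on ordinary threads and
// Some(index) inside a rayon worker. Blocking waits are only safe in the
// former case, which is exactly what the new debug_assert checks.
fn main() {
    // Main thread: not a pool worker, so a blocking wait would be fine here.
    assert!(rayon::current_thread_index().is_none());

    rayon::scope(|s| {
        s.spawn(|_| {
            // Inside the pool: a blocking wait here could deadlock if the
            // work being waited on is queued behind us on the same pool.
            assert!(rayon::current_thread_index().is_some());
        });
    });
}
```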
37 changes: 37 additions & 0 deletions crates/engine/primitives/src/config.rs
@@ -9,6 +9,10 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;

/// Default maximum number of entries the persistence pruner may delete per run.
/// Caps MDBX dirty page accumulation to ~400 MB (100k entries × ~4 KB pages).
pub const DEFAULT_PERSISTENCE_PRUNER_DELETE_LIMIT: usize = 100_000;

/// Returns the default number of storage worker threads based on available parallelism.
fn default_storage_worker_count() -> usize {
#[cfg(feature = "std")]
@@ -186,6 +190,13 @@ pub struct TreeConfig {
/// computation is spawned in parallel and whichever finishes first is used.
/// If `None`, the timeout fallback is disabled.
state_root_task_timeout: Option<Duration>,
/// Maximum number of entries the persistence pruner may delete in a single run.
/// Limits MDBX dirty page accumulation to prevent OOM during the first prune after startup.
persistence_pruner_delete_limit: usize,
/// Timeout for the persistence pruner per run. Prevents a single prune from blocking
/// block persistence for too long. Account and Storage History segments treat this as
/// a soft limit.
persistence_pruner_timeout: Option<Duration>,
}

impl Default for TreeConfig {
@@ -220,6 +231,8 @@ impl Default for TreeConfig {
sparse_trie_max_storage_tries: DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout: Some(DEFAULT_STATE_ROOT_TASK_TIMEOUT),
persistence_pruner_delete_limit: DEFAULT_PERSISTENCE_PRUNER_DELETE_LIMIT,
persistence_pruner_timeout: None,
}
}
}
@@ -286,6 +299,8 @@ impl TreeConfig {
sparse_trie_max_storage_tries,
disable_sparse_trie_cache_pruning: false,
state_root_task_timeout,
persistence_pruner_delete_limit: DEFAULT_PERSISTENCE_PRUNER_DELETE_LIMIT,
persistence_pruner_timeout: None,
}
}

@@ -656,4 +671,26 @@ impl TreeConfig {
self.state_root_task_timeout = timeout;
self
}

/// Returns the persistence pruner delete limit.
pub const fn persistence_pruner_delete_limit(&self) -> usize {
self.persistence_pruner_delete_limit
}

/// Setter for persistence pruner delete limit.
pub const fn with_persistence_pruner_delete_limit(mut self, limit: usize) -> Self {
self.persistence_pruner_delete_limit = limit;
self
}

/// Returns the persistence pruner timeout.
pub const fn persistence_pruner_timeout(&self) -> Option<Duration> {
self.persistence_pruner_timeout
}

/// Setter for persistence pruner timeout.
pub const fn with_persistence_pruner_timeout(mut self, timeout: Option<Duration>) -> Self {
self.persistence_pruner_timeout = timeout;
self
}
}
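A hypothetical usage sketch of the two new knobs; the getter and setters exist in this diff, while the surrounding function and values are illustrative only:

```rust
use std::time::Duration;

use reth_engine_primitives::TreeConfig;

fn example_config() -> TreeConfig {
    TreeConfig::default()
        // Halve the default 100_000-entry cap on deletions per pruner run.
        .with_persistence_pruner_delete_limit(50_000)
        // Bound each pruner run to 30s (a soft limit for history segments).
        .with_persistence_pruner_timeout(Some(Duration::from_secs(30)))
}
```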
11 changes: 9 additions & 2 deletions crates/engine/tree/src/persistence.rs
@@ -154,6 +154,11 @@ where
let start_time = Instant::now();

if let Some(last) = last_block {
// Commit block data immediately so persistence is not blocked by pruning.
// The pruner can take hours on the first run after startup (its
// previous_tip_block_number starts at None, triggering a full segment scan
// even when the pipeline already pruned). Running it in a separate transaction
// ensures blocks become durable on disk without waiting for pruning to finish.
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;

@@ -164,14 +169,16 @@
provider_rw.save_safe_block_number(safe)?;
}

provider_rw.commit()?;

if self.pruner.is_pruning_needed(last.number) {
debug!(target: "engine::persistence", block_num=?last.number, "Running pruner");
let prune_start = Instant::now();
let provider_rw = self.provider.database_provider_rw()?;
let _ = self.pruner.run_with_provider(&provider_rw, last.number)?;
provider_rw.commit()?;
self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
}

provider_rw.commit()?;
}

debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
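The reordering above splits one long-lived transaction into two: blocks commit first, then pruning runs in its own transaction. A simplified, self-contained sketch of the resulting shape; `Db`, `Tx`, `Block`, and the helpers are stand-ins, not reth APIs:

```rust
// Only the two-transaction shape mirrors the diff; all types are stubs.
struct Db;
struct Tx;
struct Block;

impl Db {
    fn begin_rw(&self) -> Tx {
        Tx
    }
}
impl Tx {
    fn save_blocks(&self, _blocks: &[Block]) {}
    fn prune_before(&self, _tip: u64) {}
    fn commit(self) {}
}

fn persist_then_prune(db: &Db, blocks: &[Block], tip: u64, pruning_needed: bool) {
    // Transaction 1: make the new blocks durable right away.
    let tx = db.begin_rw();
    tx.save_blocks(blocks);
    tx.commit();

    // Transaction 2: pruning may take a long time (hours on the first run
    // after startup); running it separately no longer delays durability.
    if pruning_needed {
        let tx = db.begin_rw();
        tx.prune_before(tip);
        tx.commit();
    }
}
```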
13 changes: 12 additions & 1 deletion crates/engine/tree/src/tree/mod.rs
@@ -1292,6 +1292,17 @@ where
} else if self.should_persist() {
let blocks_to_persist =
self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
if blocks_to_persist.is_empty() {
// This indicates the canonical head block is not in blocks_by_hash,
// which prevents the chain walk from collecting blocks to persist.
warn!(
target: "engine::tree",
canonical_head = ?self.state.tree_state.canonical_head(),
last_persisted = ?self.persistence_state.last_persisted_block,
"should_persist=true but no blocks found to persist; \
canonical head may not be in memory"
);
}
self.persist_blocks(blocks_to_persist);
}
}
@@ -1411,7 +1422,7 @@ where
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
tokio::task::spawn_blocking(move || {
let _ = overlay.get();
});
}
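The warm-up closure blocks while the overlay computes, so it moves from `rayon::spawn` to `tokio::task::spawn_blocking`, keeping rayon workers free for trie work (and consistent with the new `wait_cloned` assertion). A minimal sketch of the pattern, with `expensive_warmup` standing in for `overlay.get()`:

```rust
// spawn_blocking runs the closure on tokio's dedicated blocking pool, so
// neither async tasks nor rayon workers are tied up while it waits.
fn expensive_warmup() -> u64 {
    (0..1_000_000u64).sum() // placeholder for the real trie computation
}

#[tokio::main]
async fn main() {
    let handle = tokio::task::spawn_blocking(|| {
        let _ = expensive_warmup();
    });
    // The async runtime stays responsive while the warm-up runs.
    handle.await.expect("blocking task panicked");
}
```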
5 changes: 5 additions & 0 deletions crates/node/builder/src/launch/engine.rs
@@ -170,6 +170,11 @@ impl EngineNodeLauncher {
pruner_builder =
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
}
pruner_builder =
pruner_builder.delete_limit(engine_tree_config.persistence_pruner_delete_limit());
if let Some(timeout) = engine_tree_config.persistence_pruner_timeout() {
pruner_builder = pruner_builder.timeout(timeout);
}
let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
39 changes: 37 additions & 2 deletions crates/node/core/src/args/engine.rs
@@ -2,8 +2,8 @@

use clap::{builder::Resettable, Args};
use reth_engine_primitives::{
TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES,
DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_PERSISTENCE_PRUNER_DELETE_LIMIT,
DEFAULT_SPARSE_TRIE_MAX_STORAGE_TRIES, DEFAULT_SPARSE_TRIE_PRUNE_DEPTH,
};
use std::{sync::OnceLock, time::Duration};

@@ -400,6 +400,25 @@ pub struct EngineArgs {
default_value = DefaultEngineValues::get_global().state_root_task_timeout.as_deref().unwrap_or("1s"),
)]
pub state_root_task_timeout: Option<Duration>,

/// Maximum number of entries the persistence pruner may delete in a single run.
/// Limits MDBX dirty page accumulation to prevent OOM when the pruner runs for the
/// first time after startup on a large database.
///
/// Set to 0 to disable the limit (unlimited).
#[arg(long = "engine.persistence-pruner-delete-limit", default_value_t = DEFAULT_PERSISTENCE_PRUNER_DELETE_LIMIT)]
pub persistence_pruner_delete_limit: usize,

/// Timeout for the persistence pruner per run. Prevents a single prune from blocking
/// block persistence for too long.
///
/// CAUTION: Account and Storage History segments treat this as a soft limit.
///
/// Set to 0s to disable.
///
/// --engine.persistence-pruner-timeout 30s
#[arg(long = "engine.persistence-pruner-timeout", value_parser = humantime::parse_duration)]
pub persistence_pruner_timeout: Option<Duration>,
}

#[allow(deprecated)]
@@ -464,6 +483,8 @@ impl Default for EngineArgs {
state_root_task_timeout: state_root_task_timeout
.as_deref()
.map(|s| humantime::parse_duration(s).expect("valid default duration")),
persistence_pruner_delete_limit: DEFAULT_PERSISTENCE_PRUNER_DELETE_LIMIT,
persistence_pruner_timeout: None,
}
}
}
@@ -498,6 +519,14 @@ impl EngineArgs {
.with_sparse_trie_max_storage_tries(self.sparse_trie_max_storage_tries)
.with_disable_sparse_trie_cache_pruning(self.disable_sparse_trie_cache_pruning)
.with_state_root_task_timeout(self.state_root_task_timeout.filter(|d| !d.is_zero()))
.with_persistence_pruner_delete_limit(if self.persistence_pruner_delete_limit == 0 {
usize::MAX
} else {
self.persistence_pruner_delete_limit
})
.with_persistence_pruner_timeout(
self.persistence_pruner_timeout.filter(|d| !d.is_zero()),
)
}
}

@@ -553,6 +582,8 @@ mod tests {
sparse_trie_max_storage_tries: 100,
disable_sparse_trie_cache_pruning: true,
state_root_task_timeout: Some(Duration::from_secs(2)),
persistence_pruner_delete_limit: 100_000,
persistence_pruner_timeout: Some(Duration::from_secs(30)),
};

let parsed_args = CommandParser::<EngineArgs>::parse_from([
@@ -591,6 +622,10 @@
"--engine.disable-sparse-trie-cache-pruning",
"--engine.state-root-task-timeout",
"2s",
"--engine.persistence-pruner-delete-limit",
"100000",
"--engine.persistence-pruner-timeout",
"30s",
])
.args;

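A small sketch of the zero-value normalization performed in `tree_config` conversion above: a delete limit of 0 becomes unlimited (`usize::MAX`), and a zero timeout disables the timeout entirely. The function name here is illustrative:

```rust
use std::time::Duration;

// Mirrors the filter/branch logic in the diff; not a reth API.
fn normalize(limit: usize, timeout: Option<Duration>) -> (usize, Option<Duration>) {
    let limit = if limit == 0 { usize::MAX } else { limit };
    let timeout = timeout.filter(|d| !d.is_zero());
    (limit, timeout)
}

fn main() {
    assert_eq!(normalize(0, Some(Duration::ZERO)), (usize::MAX, None));
    assert_eq!(
        normalize(100_000, Some(Duration::from_secs(30))),
        (100_000, Some(Duration::from_secs(30)))
    );
}
```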
77 changes: 38 additions & 39 deletions crates/trie/sparse/src/parallel.rs
@@ -258,7 +258,7 @@ impl SparseTrie for ParallelSparseTrie {
#[cfg(feature = "std")]
// Reveal lower subtrie nodes in parallel
{
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tracing::Span;

// Capture the current span so it can be propagated to rayon worker threads
@@ -267,23 +267,39 @@ impl SparseTrie for ParallelSparseTrie {
// Capture reference to upper subtrie nodes for boundary leaf reachability checks
let upper_nodes = &self.upper_subtrie.nodes;

// Group the nodes by lower subtrie. This must be collected into a Vec in order for
// rayon's `zip` to be happy.
let node_groups: Vec<_> = lower_nodes
// Group the nodes by lower subtrie.
let results = lower_nodes
.chunk_by(|node_a, node_b| {
SparseSubtrieType::from_path(&node_a.path) ==
SparseSubtrieType::from_path(&node_b.path)
})
.collect();

// Take the lower subtries in the same order that the nodes were grouped into, so that
// the two can be zipped together. This also must be collected into a Vec for rayon's
// `zip` to be happy.
let lower_subtries: Vec<_> = node_groups
.iter()
// Filter out chunks for unreachable subtries.
.filter_map(|nodes| {
// NOTE: chunk_by won't produce empty groups
let node = &nodes[0];
let mut nodes = nodes
.iter()
.filter(|node| {
// For boundary leaves, check reachability from upper subtrie's parent
// branch.
if node.path.len() == UPPER_TRIE_MAX_DEPTH &&
!Self::is_boundary_leaf_reachable(
upper_nodes,
&node.path,
&node.node,
)
{
trace!(
target: "trie::parallel_sparse",
path = ?node.path,
"Boundary leaf not reachable from upper subtrie, skipping",
);
false
} else {
true
}
})
.peekable();

let node = nodes.peek()?;
let idx =
SparseSubtrieType::from_path(&node.path).lower_index().unwrap_or_else(
|| panic!("upper subtrie node {node:?} found amongst lower nodes"),
@@ -303,41 +319,24 @@ impl SparseTrie for ParallelSparseTrie {
// shortest path being revealed for each subtrie. Therefore we can reveal the
// subtrie itself using this path and retain correct behavior.
self.lower_subtries[idx].reveal(&node.path);
Some((idx, self.lower_subtries[idx].take_revealed().expect("just revealed")))
Some((
idx,
self.lower_subtries[idx].take_revealed().expect("just revealed"),
nodes,
))
})
.collect();

// Zip the lower subtries and their corresponding node groups, and reveal lower subtrie
// nodes in parallel
let results: Vec<_> = lower_subtries
.collect::<Vec<_>>()
.into_par_iter()
.zip(node_groups.into_par_iter())
.map(|((subtrie_idx, mut subtrie), nodes)| {
.map(|(subtrie_idx, mut subtrie, nodes)| {
// Enter the parent span to propagate context (e.g., hashed_address for storage
// tries) to the worker thread
let _guard = parent_span.enter();

// reserve space in the HashMap ahead of time; doing it on a node-by-node basis
// can cause multiple re-allocations as the hashmap grows.
subtrie.nodes.reserve(nodes.len());
subtrie.nodes.reserve(nodes.size_hint().1.unwrap_or(0));

for node in nodes {
// For boundary leaves, check reachability from upper subtrie's parent
// branch
if node.path.len() == UPPER_TRIE_MAX_DEPTH &&
!Self::is_boundary_leaf_reachable(
upper_nodes,
&node.path,
&node.node,
)
{
trace!(
target: "trie::parallel_sparse",
path = ?node.path,
"Boundary leaf not reachable from upper subtrie, skipping",
);
continue;
}
// Reveal each node in the subtrie, returning early on any errors
let res = subtrie.reveal_node(node.path, &node.node, node.masks);
if res.is_err() {
Expand All @@ -346,7 +345,7 @@ impl SparseTrie for ParallelSparseTrie {
}
(subtrie_idx, subtrie, Ok(()))
})
.collect();
.collect::<Vec<_>>();

// Put subtries back which were processed in the rayon pool, collecting the last
// seen error in the process and returning that.
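The refactor replaces the old collect-into-Vecs-and-`zip` pipeline with a single pass that filters unreachable boundary leaves while grouping, then hands the surviving groups to rayon. A toy, self-contained sketch of that shape (not the trie code itself):

```rust
use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    let items = [1, 2, 2, 3, 3, 3, 10, 10];

    // Group adjacent equal keys, dropping "unreachable" groups up front
    // instead of re-checking every element inside the parallel loop.
    let groups: Vec<&[i32]> = items
        .chunk_by(|a, b| a == b)
        .filter(|g| g[0] < 10)
        .collect();

    // Each surviving group is processed independently on the rayon pool;
    // no second Vec and no zip are needed to pair groups with their work.
    let sums: Vec<i32> = groups
        .into_par_iter()
        .map(|g| g.iter().sum::<i32>())
        .collect();

    assert_eq!(sums, vec![1, 4, 9]);
}
```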
2 changes: 1 addition & 1 deletion docs/vocs/vocs.config.ts
@@ -22,7 +22,7 @@ export default defineConfig({
},
{ text: 'GitHub', link: 'https://github.com/paradigmxyz/reth' },
{
text: 'v1.11.0',
text: 'v1.11.1',
items: [
{
text: 'Releases',