diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8891089c44df0..5ff1ad000ca3d 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -11547,6 +11547,7 @@ dependencies = [ "auto_impl", "bincode 2.0.1", "bytes", + "crossbeam-channel", "derive_more", "eyre", "metrics", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 6c1c681aa84ae..77c4df23e71e6 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -637,6 +637,7 @@ url = { version = "2.5.8", default-features = false } vergen = "9.1.0" vergen-git2 = "9.1.0" byteorder = "1" +crossbeam-channel = "0.5.13" # allocators jemalloc_pprof = { version = "0.8", default-features = false } diff --git a/rust/op-reth/crates/cli/src/commands/op_proofs/prune.rs b/rust/op-reth/crates/cli/src/commands/op_proofs/prune.rs index 986a600ebfa5f..9c90661583a9e 100644 --- a/rust/op-reth/crates/cli/src/commands/op_proofs/prune.rs +++ b/rust/op-reth/crates/cli/src/commands/op_proofs/prune.rs @@ -85,8 +85,8 @@ impl> PruneCommand { storage, provider_factory, self.proofs_history_window, - self.proofs_history_prune_batch_size, ); + let pruner = pruner.with_batch_size(self.proofs_history_prune_batch_size); pruner.run(); }}; } diff --git a/rust/op-reth/crates/exex/src/lib.rs b/rust/op-reth/crates/exex/src/lib.rs index a02c2aff165f3..5d84c6b67eb3f 100644 --- a/rust/op-reth/crates/exex/src/lib.rs +++ b/rust/op-reth/crates/exex/src/lib.rs @@ -15,11 +15,10 @@ use reth_execution_types::Chain; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::{FullNodeComponents, NodePrimitives, NodeTypes}; use reth_optimism_trie::{ - OpProofStoragePrunerTask, OpProofsStorage, OpProofsProviderRO, OpProofsStore, - live::LiveTrieCollector, + live::LiveTrieCollector, OpProofsProviderRO, OpProofStoragePruner, OpProofsStore, }; use reth_provider::{BlockNumReader, BlockReader, TransactionVariant}; -use reth_trie::{HashedPostStateSorted, SortedTrieData, updates::TrieUpdatesSorted}; +use reth_trie::{updates::TrieUpdatesSorted, 
HashedPostStateSorted, SortedTrieData}; use std::{sync::Arc, time::Duration}; use tokio::{sync::watch, task, time}; use tracing::{debug, error, info}; @@ -41,9 +40,6 @@ const SYNC_IDLE_SLEEP_SECS: u64 = 5; /// Default proofs history window: 1 month of blocks at 2s block time const DEFAULT_PROOFS_HISTORY_WINDOW: u64 = 1_296_000; -/// Default interval between proof-storage prune runs. Default is 15 seconds. -const DEFAULT_PRUNE_INTERVAL: Duration = Duration::from_secs(15); - /// Default verification interval: disabled const DEFAULT_VERIFICATION_INTERVAL: u64 = 0; // disabled @@ -54,9 +50,8 @@ where Node: FullNodeComponents, { ctx: ExExContext, - storage: OpProofsStorage, + storage: Storage, proofs_history_window: u64, - proofs_history_prune_interval: Duration, verification_interval: u64, } @@ -65,12 +60,11 @@ where Node: FullNodeComponents, { /// Create a new builder with required parameters and defaults. - pub const fn new(ctx: ExExContext, storage: OpProofsStorage) -> Self { + pub const fn new(ctx: ExExContext, storage: Storage) -> Self { Self { ctx, storage, proofs_history_window: DEFAULT_PROOFS_HISTORY_WINDOW, - proofs_history_prune_interval: DEFAULT_PRUNE_INTERVAL, verification_interval: DEFAULT_VERIFICATION_INTERVAL, } } @@ -81,12 +75,6 @@ where self } - /// Sets the interval between proof-storage prune runs. - pub const fn with_proofs_history_prune_interval(mut self, interval: Duration) -> Self { - self.proofs_history_prune_interval = interval; - self - } - /// Sets the verification interval. 
pub const fn with_verification_interval(mut self, interval: u64) -> Self { self.verification_interval = interval; @@ -99,7 +87,6 @@ where ctx: self.ctx, storage: self.storage, proofs_history_window: self.proofs_history_window, - proofs_history_prune_interval: self.proofs_history_prune_interval, verification_interval: self.verification_interval, } } @@ -124,8 +111,8 @@ where /// use reth_node_builder::{NodeBuilder, NodeConfig}; /// use reth_optimism_chainspec::BASE_MAINNET; /// use reth_optimism_exex::OpProofsExEx; -/// use reth_optimism_node::{OpNode, args::RollupArgs}; -/// use reth_optimism_trie::{InMemoryProofsStorage, OpProofsStorage, db::MdbxProofsStorage}; +/// use reth_optimism_node::{args::RollupArgs, OpNode}; +/// use reth_optimism_trie::{db::MdbxProofsStorageV2, InMemoryProofsStorage, OpProofsStorage}; /// use reth_provider::providers::BlockchainProvider; /// use std::{sync::Arc, time::Duration}; /// @@ -142,8 +129,8 @@ where /// # let temp_dir = tempfile::tempdir().expect("Failed to create temp dir"); /// # let storage_path = temp_dir.path().join("proofs_storage"); /// -/// # let storage: OpProofsStorage> = Arc::new( -/// # MdbxProofsStorage::new(&storage_path).expect("Failed to create MdbxProofsStorage"), +/// # let storage: OpProofsStorage> = Arc::new( +/// # MdbxProofsStorageV2::new(&storage_path).expect("Failed to create MdbxProofsStorageV2"), /// # ).into(); /// /// let storage_exec = storage.clone(); @@ -180,12 +167,10 @@ where /// events. ctx: ExExContext, /// The type of storage DB. - storage: OpProofsStorage, + storage: Storage, /// The window to span blocks for proofs history. Value is the number of blocks, received as /// cli arg. proofs_history_window: u64, - /// Interval between proof-storage prune runs - proofs_history_prune_interval: Duration, /// Verification interval: perform full block execution every N blocks for data integrity. /// If 0, verification is disabled (always use fast path when available). 
/// If 1, verification is always enabled (always execute blocks). @@ -197,14 +182,14 @@ where Node: FullNodeComponents, { /// Create a new `OpProofsExEx` instance. - pub fn new(ctx: ExExContext, storage: OpProofsStorage) -> Self { + pub fn new(ctx: ExExContext, storage: Storage) -> Self { OpProofsExExBuilder::new(ctx, storage).build() } /// Create a new builder for `OpProofsExEx`. pub const fn builder( ctx: ExExContext, - storage: OpProofsStorage, + storage: Storage, ) -> OpProofsExExBuilder { OpProofsExExBuilder::new(ctx, storage) } @@ -219,23 +204,21 @@ where /// Main execution loop for the ExEx pub async fn run(mut self) -> eyre::Result<()> { self.ensure_initialized()?; - let sync_target_tx = self.spawn_sync_task(); - let prune_task = OpProofStoragePrunerTask::new( + let pruner = OpProofStoragePruner::new( self.storage.clone(), self.ctx.provider().clone(), self.proofs_history_window, - self.proofs_history_prune_interval, ); - self.ctx - .task_executor() - .spawn_with_graceful_shutdown_signal(|signal| Box::pin(prune_task.run(signal))); - let collector = LiveTrieCollector::new( + let collector = Arc::new(LiveTrieCollector::new( self.ctx.evm_config().clone(), self.ctx.provider().clone(), - &self.storage, - ); + self.storage.clone(), + pruner, + )); + + let sync_target_tx = self.spawn_sync_task(collector.clone()); while let Some(notification) = self.ctx.notifications.try_next().await? { self.handle_notification(notification, &collector, &sync_target_tx)?; @@ -257,7 +240,7 @@ where } }; - let latest_block_number: u64 = match provider_ro.get_latest_block_number()? { + let latest_block_number = match provider_ro.get_latest_block_number()? { Some((n, _)) => n, None => { return Err(eyre::eyre!( @@ -277,41 +260,26 @@ where "Configuration requires pruning {} blocks, which exceeds the safety threshold of {}. \ Huge prune operations can stall the node. 
\ Please run 'op-reth proofs prune' manually before starting the node.", - blocks_to_prune, - MAX_PRUNE_BLOCKS_STARTUP + blocks_to_prune, + MAX_PRUNE_BLOCKS_STARTUP )); } } - // Need to update the earliest block metric on startup as this is not called frequently and - // can show outdated info. When metrics are disabled, this is a no-op. - #[cfg(feature = "metrics")] - { - self.storage - .metrics() - .block_metrics() - .earliest_number - .set(earliest_block_number as f64); - } - Ok(()) } /// Spawn the background sync task and return the target sender - fn spawn_sync_task(&self) -> watch::Sender { + fn spawn_sync_task( + &self, + collector: Arc>, + ) -> watch::Sender { let (sync_target_tx, sync_target_rx) = watch::channel(0u64); - - let task_storage = self.storage.clone(); let task_provider = self.ctx.provider().clone(); - let task_evm_config = self.ctx.evm_config().clone(); - self.ctx.task_executor().spawn_critical_task( "optimism::exex::proofs_storage_sync_loop", async move { - let storage = task_storage.clone(); - let task_collector = - LiveTrieCollector::new(task_evm_config, task_provider.clone(), &storage); - Self::sync_loop(sync_target_rx, task_storage, task_provider, &task_collector).await; + Self::sync_loop(sync_target_rx, task_provider, &collector).await; }, ); @@ -321,22 +289,19 @@ where /// Background sync loop that processes blocks up to the target async fn sync_loop( mut sync_target_rx: watch::Receiver, - storage: OpProofsStorage, provider: Node::Provider, - collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>, + collector: &LiveTrieCollector, ) { debug!(target: "optimism::exex", "Starting proofs storage sync loop"); loop { let target = *sync_target_rx.borrow_and_update(); - let latest = match storage.provider_ro().and_then(|p| p.get_latest_block_number()) { - Ok(Some((n, _))) => n, - Ok(None) => { - error!(target: "optimism::exex", "No blocks stored in proofs storage during sync loop"); - continue; - } + let latest = match 
collector.get_tip_block_number() { + Ok(n) => n, Err(e) => { - error!(target: "optimism::exex", error = ?e, "Failed to get latest block"); + error!(target: "optimism::exex", error = ?e, "Failed to get collector tip block"); + // If we can't read the tip, simple retry backoff or continue + time::sleep(Duration::from_secs(1)).await; continue; } }; @@ -364,7 +329,7 @@ where start: u64, target: u64, provider: &Node::Provider, - collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>, + collector: &LiveTrieCollector, batch_size: usize, ) -> eyre::Result<()> { let end = (start + batch_size as u64).min(target); @@ -389,12 +354,12 @@ where fn handle_notification( &self, notification: ExExNotification, - collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>, + collector: &LiveTrieCollector, sync_target_tx: &watch::Sender, ) -> eyre::Result<()> { - let latest_stored = match self.storage.provider_ro()?.get_latest_block_number()? { - Some((n, _)) => n, - None => { + let latest_stored = match collector.get_tip_block_number() { + Ok(n) => n, + Err(_) => { return Err(eyre::eyre!("No blocks stored in proofs storage")); } }; @@ -422,7 +387,7 @@ where &self, new: Arc>, latest_stored: u64, - collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>, + collector: &LiveTrieCollector, sync_target_tx: &watch::Sender, ) -> eyre::Result<()> { debug!( @@ -485,7 +450,7 @@ where &self, block_number: u64, chain: &Chain, - collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>, + collector: &LiveTrieCollector, ) -> eyre::Result<()> { // Check if this block should be verified via full execution let should_verify = self.verification_interval > 0 && @@ -554,7 +519,7 @@ where old: Arc>, new: Arc>, latest_stored: u64, - collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>, + collector: &LiveTrieCollector, ) -> eyre::Result<()> { info!( old_block_number = old.tip().number(), @@ -614,7 +579,7 @@ where &self, old: Arc>, 
latest_stored: u64, - collector: &LiveTrieCollector<'_, Node::Evm, Node::Provider, Storage>, + collector: &LiveTrieCollector, ) -> eyre::Result<()> { info!( target: "optimism::exex", @@ -642,21 +607,16 @@ where mod tests { use super::*; use alloy_consensus::private::alloy_primitives::B256; - use alloy_eips::{BlockNumHash, NumHash, eip1898::BlockWithParent}; + use alloy_eips::{eip1898::BlockWithParent, BlockNumHash, NumHash}; use reth_db::test_utils::tempdir_path; use reth_ethereum_primitives::{Block, Receipt}; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_optimism_trie::{ - BlockStateDiff, OpProofsStorage, OpProofsProviderRO, OpProofsProviderRw, OpProofsStore, - db::MdbxProofsStorage, + db::MdbxProofsStorageV2, BlockStateDiff, OpProofsProviderRO, OpProofsProviderRw, OpProofsStore, }; - - fn get_latest(proofs: &OpProofsStorage) -> Option<(u64, B256)> { - proofs.provider_ro().expect("provider_ro").get_latest_block_number().expect("get latest") - } use reth_primitives_traits::RecoveredBlock; - use reth_trie::{HashedPostStateSorted, LazyTrieData, updates::TrieUpdatesSorted}; - use std::{collections::BTreeMap, default::Default, sync::Arc, time::Duration}; + use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData}; + use std::{collections::BTreeMap, default::Default, sync::Arc}; // ------------------------------------------------------------------------- // Helpers: deterministic blocks and deterministic Chain with precomputed updates @@ -717,25 +677,23 @@ mod tests { } // Init_storage to the genesis block - fn init_storage(storage: OpProofsStorage) { + fn init_storage(storage: S) { let genesis_block = NumHash::new(0, b256(0x00)); - let provider_rw = storage.provider_rw().expect("provider_rw"); - provider_rw - .set_earliest_block_number(genesis_block.number, genesis_block.hash) + let rw = storage.provider_rw().expect("provider rw"); + rw.set_earliest_block_number(genesis_block.number, genesis_block.hash) .expect("set earliest"); - 
provider_rw - .store_trie_updates( - BlockWithParent::new(genesis_block.hash, genesis_block), - BlockStateDiff::default(), - ) - .expect("store trie update"); - provider_rw.commit().expect("commit"); + rw.store_trie_updates( + BlockWithParent::new(genesis_block.hash, genesis_block), + BlockStateDiff::default(), + ) + .expect("store trie update"); + rw.commit().expect("commit"); } // Initialize exex with config fn build_test_exex( ctx: ExExContext, - storage: OpProofsStorage, + storage: Store, ) -> OpProofsExEx where NodeT: FullNodeComponents, @@ -743,7 +701,6 @@ mod tests { { OpProofsExEx::builder(ctx, storage) .with_proofs_history_window(20) - .with_proofs_history_prune_interval(Duration::from_secs(3600)) .with_verification_interval(1000) .build() } @@ -752,20 +709,27 @@ mod tests { async fn handle_notification_chain_committed() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner = OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); - let exex = build_test_exex(ctx, proofs.clone()); + store.clone(), + pruner, + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); + let exex = build_test_exex(ctx, store.clone()); // Notification: chain committed 1..5 let new_chain = Arc::new(mk_chain_with_updates(1, 1, None)); @@ -775,7 +739,8 @@ mod tests { exex.handle_notification(notif, &collector, &sync_target_tx).expect("handle chain commit"); - let latest = get_latest(&proofs).expect("ok").0; + 
collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 1); } @@ -783,21 +748,28 @@ mod tests { async fn handle_notification_chain_committed_skips_already_processed() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner = OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); + store.clone(), + pruner + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); let (sync_target_tx, _) = tokio::sync::watch::channel(0u64); // Process blocks 1..5 sequentially to trigger real-time path (synchronous) @@ -808,14 +780,16 @@ mod tests { .expect("handle chain commit"); } - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 5); // Try to handle already processed notification let new_chain = Arc::new(mk_chain_with_updates(5, 5, Some(hash_for_num(10)))); let notif = ExExNotification::ChainCommitted { new: new_chain }; exex.handle_notification(notif, &collector, &sync_target_tx).expect("handle chain commit"); - let latest = get_latest(&proofs).expect("ok"); + collector.wait_for_persistence(); + let latest = 
store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok"); assert_eq!(latest.0, 5); assert_eq!(latest.1, hash_for_num(5)); // block was not updated } @@ -824,21 +798,28 @@ mod tests { async fn handle_notification_chain_reorged() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner = OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); + store.clone(), + pruner, + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); let (sync_target_tx, _) = tokio::sync::watch::channel(0u64); @@ -849,7 +830,8 @@ mod tests { .expect("handle chain commit"); } - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 10); // Now the tip is 10, and we want to reorg from block 6..12 @@ -861,7 +843,8 @@ mod tests { exex.handle_notification(notif, &collector, &sync_target_tx) .expect("handle chain re-orged"); - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 12); } @@ -869,21 +852,28 @@ mod tests { async fn 
handle_notification_chain_reorged_skips_beyond_stored_blocks() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner = OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); + store.clone(), + pruner, + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); let (sync_target_tx, _) = tokio::sync::watch::channel(0u64); @@ -895,7 +885,8 @@ mod tests { .expect("handle chain commit"); } - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 10); // Now the tip is 10, and we want to reorg from block 12..15 @@ -907,7 +898,8 @@ mod tests { exex.handle_notification(notif, &collector, &sync_target_tx) .expect("handle chain re-orged"); - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 10); } @@ -915,21 +907,28 @@ mod tests { async fn handle_notification_chain_reverted() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = 
Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner = OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); + store.clone(), + pruner, + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); let (sync_target_tx, _) = tokio::sync::watch::channel(0u64); @@ -941,7 +940,8 @@ mod tests { .expect("handle chain commit"); } - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 10); // Now the tip is 10, and we want to revert from block 9..10 @@ -952,7 +952,8 @@ mod tests { exex.handle_notification(notif, &collector, &sync_target_tx) .expect("handle chain reverted"); - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 8); } @@ -960,21 +961,28 @@ mod tests { async fn handle_notification_chain_reverted_skips_beyond_stored_blocks() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner 
= OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); + store.clone(), + pruner, + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); let (sync_target_tx, _) = tokio::sync::watch::channel(0u64); @@ -986,7 +994,8 @@ mod tests { .expect("handle chain commit"); } - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 5); // Now the tip is 10, and we want to revert from block 9..10 @@ -997,7 +1006,8 @@ mod tests { exex.handle_notification(notif, &collector, &sync_target_tx) .expect("handle chain reverted"); - let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get latest block").expect("ok").0; assert_eq!(latest, 5); } @@ -1005,13 +1015,12 @@ mod tests { async fn ensure_initialized_errors_on_storage_not_initialized() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); let _ = exex.ensure_initialized().expect_err("should return error"); } @@ -1019,14 +1028,13 @@ mod tests { async fn ensure_initialized_errors_when_prune_exceeds_threshold() { // MDBX proofs storage let dir = 
tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); for i in 1..1100 { - let p = proofs.provider_rw().expect("provider_rw"); - p.store_trie_updates( + let rw = store.provider_rw().expect("provider rw"); + rw.store_trie_updates( BlockWithParent::new( hash_for_num(i - 1), BlockNumHash::new(i, hash_for_num(i)), @@ -1034,13 +1042,13 @@ mod tests { BlockStateDiff::default(), ) .expect("store trie update"); - p.commit().expect("commit"); + rw.commit().expect("commit"); } let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); let _ = exex.ensure_initialized().expect_err("should return error"); } @@ -1048,15 +1056,14 @@ mod tests { async fn ensure_initialized_succeeds() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); exex.ensure_initialized().expect("should not return error"); } @@ -1064,19 +1071,26 @@ mod tests { async fn handle_notification_errors_on_empty_storage() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); let 
(ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner = OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); + store.clone(), + pruner, + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); - let exex = build_test_exex(ctx, proofs.clone()); + let exex = build_test_exex(ctx, store.clone()); // Any notification will do let new_chain = Arc::new(mk_chain_with_updates(1, 5, None)); @@ -1091,20 +1105,28 @@ mod tests { async fn handle_notification_schedules_async_on_gap() { // MDBX proofs storage let dir = tempdir_path(); - let store = Arc::new(MdbxProofsStorage::new(dir.as_path()).expect("env")); - let proofs: OpProofsStorage> = store.clone().into(); + let store = Arc::new(MdbxProofsStorageV2::new(dir.as_path()).expect("env")); - init_storage(proofs.clone()); + init_storage(store.clone()); let (ctx, _handle) = reth_exex_test_utils::test_exex_context().await.expect("exex test context"); + let pruner = OpProofStoragePruner::new( + store.clone(), + ctx.components.provider.clone(), + 20, + ); let collector = LiveTrieCollector::new( ctx.components.components.evm_config.clone(), ctx.components.provider.clone(), - &proofs, - ); - let exex = build_test_exex(ctx, proofs.clone()); + store.clone(), + pruner, + ) + .with_persistence_threshold(1) + .with_backpressure_threshold(2); + + let exex = build_test_exex(ctx, store.clone()); // Notification: chain committed 5..10 (Blocks 1,2,3,4 are missing from storage) let new_chain = Arc::new(mk_chain_with_updates(5, 10, None)); @@ -1128,7 +1150,8 @@ mod tests { // Because we didn't spawn the actual worker thread in this test, storage should still be at // 0. This proves the 'handle_notification' returned instantly without doing the // heavy lifting. 
- let latest = get_latest(&proofs).expect("ok").0; + collector.wait_for_persistence(); + let latest = store.provider_ro().expect("provider ro").get_latest_block_number().expect("get").expect("ok").0; assert_eq!(latest, 0, "Main thread should not have processed the blocks synchronously"); } -} +} \ No newline at end of file diff --git a/rust/op-reth/crates/node/src/proof_history.rs b/rust/op-reth/crates/node/src/proof_history.rs index 2f1461e9a46d4..5ec017ecdee73 100644 --- a/rust/op-reth/crates/node/src/proof_history.rs +++ b/rust/op-reth/crates/node/src/proof_history.rs @@ -28,7 +28,6 @@ pub async fn launch_node_with_proof_history( let RollupArgs { proofs_history, proofs_history_window, - proofs_history_prune_interval, proofs_history_verification_interval, proofs_history_storage_version, .. @@ -64,7 +63,6 @@ pub async fn launch_node_with_proof_history( .install_exex("proofs-history", async move |exex_context| { Ok(OpProofsExEx::builder(exex_context, storage_exec) .with_proofs_history_window(proofs_history_window) - .with_proofs_history_prune_interval(proofs_history_prune_interval) .with_verification_interval(proofs_history_verification_interval) .build() .run() diff --git a/rust/op-reth/crates/trie/Cargo.toml b/rust/op-reth/crates/trie/Cargo.toml index 402fe690d49b2..3007f8cbfc713 100644 --- a/rust/op-reth/crates/trie/Cargo.toml +++ b/rust/op-reth/crates/trie/Cargo.toml @@ -52,6 +52,7 @@ auto_impl.workspace = true eyre = { workspace = true, optional = true } strum.workspace = true tracing.workspace = true +crossbeam-channel.workspace = true derive_more.workspace = true [dev-dependencies] diff --git a/rust/op-reth/crates/trie/src/error.rs b/rust/op-reth/crates/trie/src/error.rs index 1bf07a73b8b63..6d7180e736abe 100644 --- a/rust/op-reth/crates/trie/src/error.rs +++ b/rust/op-reth/crates/trie/src/error.rs @@ -105,6 +105,9 @@ pub enum OpProofsStorageError { Please clear proofs data and retry initialization." 
)] InitializeStorageInconsistentState, + /// Other error with a message. + #[error("{0}")] + Other(String), } impl From for OpProofsStorageError { diff --git a/rust/op-reth/crates/trie/src/lib.rs b/rust/op-reth/crates/trie/src/lib.rs index 5573253527f51..561ef6c9f675b 100644 --- a/rust/op-reth/crates/trie/src/lib.rs +++ b/rust/op-reth/crates/trie/src/lib.rs @@ -47,6 +47,7 @@ pub type OpProofsStorage = S; pub mod proof; pub mod provider; +pub mod overlay_provider; pub mod live; @@ -65,3 +66,6 @@ pub use prune::{ OpProofStoragePruner, OpProofStoragePrunerResult, OpProofStoragePrunerTask, PrunerError, PrunerOutput, }; + +pub mod state; +pub mod persistence; diff --git a/rust/op-reth/crates/trie/src/live.rs b/rust/op-reth/crates/trie/src/live.rs index 04701654c3bb0..1f29050042166 100644 --- a/rust/op-reth/crates/trie/src/live.rs +++ b/rust/op-reth/crates/trie/src/live.rs @@ -1,95 +1,147 @@ //! Live trie collector for external proofs storage. use crate::{ - BlockStateDiff, OpProofsStorage, OpProofsStorageError, OpProofsStore, - api::{OperationDurations, OpProofsProviderRO, OpProofsProviderRw}, - provider::OpProofsStateProviderRef, + provider::OpProofsStateProviderRef, state::LiveTrieState, BlockStateDiff, + OpProofsStorageError, OpProofsStore, OpProofsProviderRO, persistence::{LiveTriePersistenceHandle, PersistenceStatus}, + OpProofStoragePruner, }; -use alloy_eips::{BlockNumHash, NumHash, eip1898::BlockWithParent}; -use derive_more::Constructor; -use reth_evm::{ConfigureEvm, execute::Executor}; +#[cfg(feature = "metrics")] +use crate::metrics::LiveMetrics; +use alloy_eips::{eip1898::BlockWithParent, NumHash}; +use crossbeam_channel::{bounded, RecvTimeoutError}; +use reth_evm::{execute::Executor, ConfigureEvm}; use reth_primitives_traits::{AlloyBlockHeader, BlockTy, RecoveredBlock}; use reth_provider::{ - DatabaseProviderFactory, HashedPostStateProvider, StateProviderFactory, StateReader, - StateRootProvider, + BlockHashReader, DatabaseProviderFactory, 
HashedPostStateProvider, StateProviderFactory, StateReader, StateRootProvider }; use reth_revm::database::StateProviderDatabase; -use reth_trie_common::{HashedPostStateSorted, updates::TrieUpdatesSorted}; -use std::{sync::Arc, time::Instant}; -use tracing::info; +use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted}; +use std::{sync::Arc, time::{Duration, Instant}}; +use tracing::{error, info}; + +/// Default number of blocks to keep in memory before persisting. +pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 5; + +/// Default number of blocks where we block execution to allow persistence to catch up. +pub const DEFAULT_BACKPRESSURE_THRESHOLD: u64 = 10; + +/// Default timeout for waiting on a persistence save operation (in seconds). +pub const DEFAULT_PERSISTENCE_TIMEOUT_SECS: u64 = 60; /// Live trie collector for external proofs storage. -#[derive(Debug, Constructor)] -pub struct LiveTrieCollector<'tx, Evm, Provider, PreimageStore> +#[derive(Debug)] +pub struct LiveTrieCollector where Evm: ConfigureEvm, Provider: StateReader + DatabaseProviderFactory + StateProviderFactory, { evm_config: Evm, provider: Provider, - storage: &'tx OpProofsStorage, + storage: Store, + memory: LiveTrieState, + + /// Number of blocks to keep in memory before persisting. + persistence_threshold: u64, + /// Number of blocks to keep in memory limit (backpressure). + backpressure_threshold: u64, + persistence_handle: LiveTriePersistenceHandle, + /// Tracks if a background persistence task is currently running. 
+ persistence_status: Arc, + + #[cfg(feature = "metrics")] + metrics: LiveMetrics, } -impl<'tx, Evm, Provider, Store> LiveTrieCollector<'tx, Evm, Provider, Store> +impl LiveTrieCollector where Evm: ConfigureEvm, - Provider: StateReader + DatabaseProviderFactory + StateProviderFactory, - Store: 'tx + OpProofsStore + Clone + 'static, + Provider: BlockHashReader + StateReader + DatabaseProviderFactory + StateProviderFactory + Clone + 'static, + Store: OpProofsStore + Clone + 'static, { - /// Execute a block and store the updates in the storage. + /// Create a new live trie collector. + pub fn new( + evm_config: Evm, + provider: Provider, + storage: Store, + pruner: OpProofStoragePruner, + ) -> Self { + let persistence_handle = LiveTriePersistenceHandle::spawn(pruner, storage.clone()); + Self { + evm_config, + provider, + storage, + memory: LiveTrieState::new(), + + persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, + backpressure_threshold: DEFAULT_BACKPRESSURE_THRESHOLD, + persistence_handle, + persistence_status: Arc::new(PersistenceStatus::new()), + + #[cfg(feature = "metrics")] + metrics: LiveMetrics::new_with_labels(&[] as &[(&str, &str)]), + } + } + + /// Set the persistence threshold (number of blocks to keep in memory before persisting). + pub fn with_persistence_threshold(mut self, threshold: u64) -> Self { + self.persistence_threshold = threshold; + self + } + + /// Set the backpressure threshold (number of blocks before execution blocks). + pub fn with_backpressure_threshold(mut self, threshold: u64) -> Self { + self.backpressure_threshold = threshold; + self + } + + /// Execute a block and store the updates in the in-memory buffer. 
pub fn execute_and_store_block_updates( &self, block: &RecoveredBlock>, ) -> Result<(), OpProofsStorageError> { - let mut operation_durations = OperationDurations::default(); - let start = Instant::now(); - // ensure that we have the state of the parent block - let provider_ro = self.storage.provider_ro()?; - let (Some((earliest, _)), Some((latest, _))) = - (provider_ro.get_earliest_block_number()?, provider_ro.get_latest_block_number()?) - else { - return Err(OpProofsStorageError::NoBlocksFound); - }; + // Check if we have the parent state + let tip = self.get_tip()?; let parent_block_number = block.number() - 1; - if parent_block_number < earliest { - return Err(OpProofsStorageError::UnknownParent); - } - if parent_block_number > latest { - return Err(OpProofsStorageError::MissingParentBlock { + if block.parent_hash() != tip.hash { + return Err(OpProofsStorageError::OutOfOrder { block_number: block.number(), - parent_block_number, - latest_block_number: latest, + parent_block_hash: block.parent_hash(), + latest_block_hash: tip.hash, }); } let block_ref = BlockWithParent::new(block.parent_hash(), NumHash::new(block.number(), block.hash())); - // TODO: should we check block hash here? - - let state_provider = OpProofsStateProviderRef::new( + let inner_provider = OpProofsStateProviderRef::new( self.provider.state_by_block_hash(block.parent_hash())?, self.storage.provider_ro()?, parent_block_number, ); + // 2. Wrap it with memory overlay using LiveTrieState + // This gathers all buffered blocks required to build the state on top of disk + let state_provider = self.memory.state_provider(block.parent_hash(), inner_provider); + + // 3. Execute block let db = StateProviderDatabase::new(&state_provider); let block_executor = self.evm_config.batch_executor(db); let execution_result = block_executor.execute(&(*block).clone())?; - operation_durations.execution_duration_seconds = start.elapsed(); + let execution_duration = start.elapsed(); + // 4. 
Calculate state root let hashed_state = state_provider.hashed_post_state(&execution_result.state); let (state_root, trie_updates) = state_provider.state_root_with_updates(hashed_state.clone())?; - operation_durations.state_root_duration_seconds = - start.elapsed() - operation_durations.execution_duration_seconds; + let state_root_duration = start.elapsed() - execution_duration; + // 5. Verify root if state_root != block.state_root() { return Err(OpProofsStorageError::StateRootMismatch { block_number: block.number(), @@ -98,35 +150,35 @@ where }); } - let provider_rw = self.storage.provider_rw()?; - let update_result = provider_rw.store_trie_updates( + // 6. Store Diff to Memory + self.memory.insert( block_ref, BlockStateDiff { sorted_trie_updates: trie_updates.into_sorted(), sorted_post_state: hashed_state.into_sorted(), }, - )?; - provider_rw.commit()?; + ); - operation_durations.total_duration_seconds = start.elapsed(); - operation_durations.write_duration_seconds = operation_durations.total_duration_seconds - - operation_durations.state_root_duration_seconds - - operation_durations.execution_duration_seconds; + let total_duration = start.elapsed(); #[cfg(feature = "metrics")] { - let block_metrics = self.storage.metrics().block_metrics(); - block_metrics.record_operation_durations(&operation_durations); - block_metrics.increment_write_counts(&update_result); + self.metrics.total_duration_seconds.record(total_duration); + self.metrics.execution_duration_seconds.record(execution_duration); + self.metrics.state_root_duration_seconds.record(state_root_duration); } info!( block_number = block.number(), - ?operation_durations, - ?update_result, - "Block executed and trie updates stored successfully", + ?total_duration, + ?execution_duration, + ?state_root_duration, + "Block executed and trie updates buffered successfully", ); + // Trigger persistence + self.advance_persistence()?; + Ok(()) } @@ -138,31 +190,37 @@ where sorted_post_state: HashedPostStateSorted, ) -> 
Result<(), OpProofsStorageError> { let start = Instant::now(); - let mut operation_durations = OperationDurations::default(); - let provider_rw = self.storage.provider_rw()?; - let storage_result = - provider_rw.store_trie_updates(block, BlockStateDiff { sorted_trie_updates, sorted_post_state })?; - provider_rw.commit()?; + // Check if we have the parent state + let tip = self.get_tip()?; + + if block.parent != tip.hash { + return Err(OpProofsStorageError::OutOfOrder { + block_number: block.block.number, + parent_block_hash: block.parent, + latest_block_hash: tip.hash, + }); + } - let write_duration = start.elapsed(); - operation_durations.total_duration_seconds = write_duration; - operation_durations.write_duration_seconds = write_duration; + self.memory.insert( + block, + BlockStateDiff { sorted_trie_updates, sorted_post_state }, + ); + + let total_duration = start.elapsed(); #[cfg(feature = "metrics")] - { - let block_metrics = self.storage.metrics().block_metrics(); - block_metrics.record_operation_durations(&operation_durations); - block_metrics.increment_write_counts(&storage_result); - } + self.metrics.total_duration_seconds.record(total_duration); info!( block_number = block.block.number, - ?operation_durations, - ?storage_result, - "Trie updates stored successfully", + ?total_duration, + "Trie updates buffered successfully", ); + // Trigger persistence check + self.advance_persistence()?; + Ok(()) } @@ -185,50 +243,266 @@ where } let start = Instant::now(); - let mut operation_durations = OperationDurations::default(); let first = &block_updates[0].0; - let latest_common_block = - BlockNumHash::new(first.block.number.saturating_sub(1), first.parent); - let mut block_trie_updates: Vec<(BlockWithParent, BlockStateDiff)> = - Vec::with_capacity(block_updates.len()); + // The common ancestor is one block before the first diverging block. 
+ let common_ancestor_number = first.block.number.saturating_sub(1); + + info!( + target: "live-trie", + reorg_depth = block_updates.len(), + common_ancestor = common_ancestor_number, + "Handling reorg: unwinding and buffering new path" + ); + let unwind_start = Instant::now(); + // 1. Unwind Persistence (Disk) + // `unwind_history` on the store removes starting from `to.block.number` (inclusive), + // so we pass `first` (the first diverging block) to preserve the common ancestor. + self.unwind_persistence(*first)?; + + // 2. Unwind Memory + // Remove `first` and everything after it, mirroring `unwind_persistence(*first)`. + self.memory.unwind(first.block.number); + let unwind_duration = unwind_start.elapsed(); + + // 3. Store new blocks in In-Memory Buffer + // Just insert them. They become the new tip. for (block, trie_updates, hashed_state) in &block_updates { - block_trie_updates.push(( + self.memory.insert( *block, BlockStateDiff { sorted_trie_updates: (**trie_updates).clone(), sorted_post_state: (**hashed_state).clone(), }, - )); + ); } - let provider_rw = self.storage.provider_rw()?; - provider_rw.replace_updates(latest_common_block, block_trie_updates)?; - provider_rw.commit()?; - let write_duration = start.elapsed(); - operation_durations.total_duration_seconds = write_duration; - operation_durations.write_duration_seconds = write_duration; + let total_duration = start.elapsed(); #[cfg(feature = "metrics")] { - let block_metrics = self.storage.metrics().block_metrics(); - block_metrics.record_operation_durations(&operation_durations); + self.metrics.total_duration_seconds.record(total_duration); + self.metrics.unwind_duration_seconds.record(unwind_duration); } info!( start_block_number = block_updates.first().map(|(b, _, _)| b.block.number), end_block_number = block_updates.last().map(|(b, _, _)| b.block.number), - ?operation_durations, - "Trie updates rewound and stored successfully", + ?total_duration, + ?unwind_duration, + "Trie updates rewound and 
buffered successfully", ); + + // Check if we need to flush (this might happen if the reorg introduced many blocks) + self.advance_persistence()?; + Ok(()) } - /// Remove account, storage and trie updates from historical storage for all blocks from - /// the specified block (inclusive). + /// Remove account, storage and trie updates from history starting from `to` (inclusive). + /// + /// After this call, state up to `to.block.number - 1` is preserved; `to` itself and + /// all later blocks are removed from both disk and in-memory buffer. pub fn unwind_history(&self, to: BlockWithParent) -> Result<(), OpProofsStorageError> { - let provider_rw = self.storage.provider_rw()?; - provider_rw.unwind_history(to)?; - provider_rw.commit() + info!(target: "live-trie", to_block = to.block.number, "Unwinding history"); + + // 1. Unwind Persistence (Disk) + // `unwind_history` on the store removes `to.block.number..=latest` and sets + // latest to `to.block.number - 1`, so `to` itself is removed. + self.unwind_persistence(to)?; + + // 2. Unwind Memory + // Mirror disk: remove `to` and everything after it. + self.memory.unwind(to.block.number); + + Ok(()) } -} + + /// Returns the (number, hash) of the true tip of the collector. + fn get_tip(&self) -> Result { + let memory_inner = self.memory.inner(); + let numbers = memory_inner.numbers.read(); + + // Check memory first + if let Some((&highest_num, &highest_hash)) = numbers.iter().next_back() { + return Ok(NumHash::new(highest_num, highest_hash)); + } + + // Fallback to storage + self.storage + .provider_ro()? + .get_latest_block_number()? + .map(|(n, h)| NumHash::new(n, h)) + .ok_or(OpProofsStorageError::NoBlocksFound) + } + + /// Returns the block number of the true tip of the collector. + /// + /// This resolves to the highest block in the memory buffer, or falls back to + /// the storage tip if the buffer is empty. 
+ pub fn get_tip_block_number(&self) -> Result { + let memory_inner = self.memory.inner(); + let numbers = memory_inner.numbers.read(); + + // Check memory first + if let Some(&highest) = numbers.keys().next_back() { + return Ok(highest); + } + + // Fallback to storage + self.storage + .provider_ro()? + .get_latest_block_number()? + .map(|(n, _)| n) + .ok_or_else(|| OpProofsStorageError::NoBlocksFound) + } + + /// Blocks the current thread until any in-progress background persistence completes. + pub fn wait_for_persistence(&self) { + self.persistence_status.wait_until_idle(); + } + + /// Checks the persistence threshold and triggers persistence if necessary. + /// + /// - If buffer >= backpressure: Blocks current thread until persistence frees up space. + /// - If buffer >= persistence: Triggers background persistence if not already running. + pub fn advance_persistence(&self) -> Result<(), OpProofsStorageError> { + let current_size = { + self.memory.inner().numbers.read().len() as u64 + }; + + // 1. Backpressure Check (Blocking) + // If we are over the limit, we MUST wait for the background task to clear some space. + if current_size >= self.backpressure_threshold { + if self.persistence_status.is_running() { + info!( + target: "live-trie", + current_size, + threshold = self.backpressure_threshold, + "Backpressure triggered: Blocking execution until persistence completes" + ); + + self.persistence_status.wait_until_idle(); + + info!(target: "live-trie", "Backpressure released: Persistence task completed"); + } + } + + // 2. Persistence Trigger Check (Async) + // We re-check the size because if we waited, the memory was pruned. 
+ let current_size = { + self.memory.inner().numbers.read().len() as u64 + }; + + if current_size >= self.persistence_threshold { + if self.persistence_status.mark_running() { + // Snapshot blocks to persist + let blocks_to_persist = self.get_blocks_to_persist(); + + if blocks_to_persist.is_empty() { + self.persistence_status.mark_idle(); + return Ok(()); + } + + info!( + target: "live-trie", + current_size, + count = blocks_to_persist.len(), + start_block = blocks_to_persist.first().map(|arc| arc.0.block.number), + end_block = blocks_to_persist.last().map(|arc| arc.0.block.number), + threshold = self.persistence_threshold, + "Persistence threshold reached: Spawning background persistence task" + ); + + // Clone data for the background thread + let persistence_handle = self.persistence_handle.clone(); + let persistence_status = self.persistence_status.clone(); + let memory = self.memory.clone(); + + std::thread::spawn(move || { + let result = Self::persist_blocks_background(persistence_handle, blocks_to_persist); + + match result { + Ok(Some(last_persisted)) => { + info!( + target: "live-trie", + block_number = last_persisted, + "Background persistence successful, pruning memory" + ); + memory.prune_before(last_persisted + 1); + } + Ok(None) => {} + Err(e) => { + error!(target: "live-trie", ?e, "Background persistence failed"); + } + } + + // Notify completion + persistence_status.mark_idle(); + }); + } + } + + Ok(()) + } + + /// Helper to perform persistence interaction in a background thread. 
+ fn persist_blocks_background( + handle: LiveTriePersistenceHandle, + blocks: Vec>, + ) -> Result, OpProofsStorageError> { + let (tx, rx) = bounded(1); + handle.save_updates(blocks, tx)?; + + match rx.recv_timeout(Duration::from_secs(DEFAULT_PERSISTENCE_TIMEOUT_SECS)) { + Ok(res) => Ok(res), + Err(RecvTimeoutError::Timeout) => { + Err(OpProofsStorageError::Other("Persistence timeout".into())) + } + Err(RecvTimeoutError::Disconnected) => { + Err(OpProofsStorageError::Other("Persistence service disconnected".into())) + } + } + } + + /// Returns all buffered blocks to persist, ordered from Oldest to Newest. + /// + /// Returns `Arc`s to avoid deep-cloning `BlockStateDiff` on the caller thread. + /// The persistence thread will unwrap or clone as needed. + fn get_blocks_to_persist(&self) -> Vec> { + let memory_inner = self.memory.inner(); + let numbers = memory_inner.numbers.read(); + let blocks = memory_inner.blocks.read(); + + let mut blocks_to_persist = Vec::with_capacity(numbers.len()); + + // BTreeMap is sorted by keys (block numbers), ensuring implicit Oldest -> Newest order. + for hash in numbers.values() { + if let Some(state) = blocks.get(hash) { + blocks_to_persist.push(Arc::clone(state)); + } + } + blocks_to_persist + } + + /// Helper to send unwind command to persistence service and wait for completion. 
+ fn unwind_persistence(&self, to: BlockWithParent) -> Result<(), OpProofsStorageError> { + // Wait for any ongoing persistence to finish to avoid race conditions + if self.persistence_status.is_running() { + info!(target: "live-trie", "Unwind waiting for background persistence..."); + self.persistence_status.wait_until_idle(); + } + + let (tx, rx) = bounded(1); + self.persistence_handle.unwind(to, tx)?; + + match rx.recv_timeout(Duration::from_secs(DEFAULT_PERSISTENCE_TIMEOUT_SECS)) { + Ok(Ok(())) => Ok(()), + Ok(Err(reason)) => Err(OpProofsStorageError::Other( + format!("Unwind failed in persistence service: {reason}") + )), + Err(RecvTimeoutError::Timeout) => Err(OpProofsStorageError::Other("Unwind timeout".into())), + Err(RecvTimeoutError::Disconnected) => Err(OpProofsStorageError::Other("Persistence service disconnected".into())), + } + } +} \ No newline at end of file diff --git a/rust/op-reth/crates/trie/src/metrics.rs b/rust/op-reth/crates/trie/src/metrics.rs index cdec4bae42cb3..f9948fe596ad1 100644 --- a/rust/op-reth/crates/trie/src/metrics.rs +++ b/rust/op-reth/crates/trie/src/metrics.rs @@ -1,15 +1,14 @@ //! Storage wrapper that records metrics for all operations. 
use crate::{ - BlockStateDiff, OpProofsStorageResult, OpProofsStore, api::{ - InitialStateAnchor, OperationDurations, OpProofsInitProvider, OpProofsProviderRO, - OpProofsProviderRw, WriteCounts, + InitialStateAnchor, OpProofsInitProvider, OpProofsProviderRO, OpProofsProviderRw, + WriteCounts, }, - cursor, + cursor, BlockStateDiff, OpProofsStorageResult, OpProofsStore, }; -use alloy_eips::{BlockNumHash, eip1898::BlockWithParent}; -use alloy_primitives::{B256, U256, map::HashMap}; +use alloy_eips::{eip1898::BlockWithParent, BlockNumHash}; +use alloy_primitives::{map::HashMap, B256, U256}; use derive_more::Constructor; use metrics::{Counter, Gauge, Histogram}; use reth_db::DatabaseError; @@ -29,7 +28,7 @@ use std::{ use strum::{EnumCount, EnumIter, IntoEnumIterator}; /// Alias for [`OpProofsStorageWithMetrics`]. -pub type OpProofsStorage = OpProofsStorageWithMetrics; +pub type OpProofsStorage = OpProofsStoreWithMetrics; /// Alias for [`TrieCursor`](cursor::OpProofsTrieCursor) with metrics layer. pub type OpProofsTrieCursor = cursor::OpProofsTrieCursor>; @@ -87,13 +86,23 @@ impl StorageOperation { } } +/// Metrics tracking the range of blocks available for proof generation. +#[derive(Metrics, Clone)] +#[metrics(scope = "optimism_trie.proof_window")] +pub struct ProofWindowMetrics { + /// Earliest block number available in the proof window. + pub earliest: Gauge, + /// Latest block number available in the proof window. + pub latest: Gauge, +} + /// Metrics for storage operations. 
#[derive(Debug)] pub struct StorageMetrics { /// Cache of operation metrics handles, keyed by (operation, context) operations: HashMap, - /// Block-level metrics - block_metrics: BlockMetrics, + /// Proof window metrics + pub proof_window: ProofWindowMetrics, } impl StorageMetrics { @@ -101,7 +110,7 @@ impl StorageMetrics { pub fn new() -> Self { Self { operations: Self::generate_operation_handles(), - block_metrics: BlockMetrics::new_with_labels(&[] as &[(&str, &str)]), + proof_window: ProofWindowMetrics::new_with_labels(&[] as &[(&str, &str)]), } } @@ -120,7 +129,11 @@ impl StorageMetrics { /// Record a storage operation with timing. pub fn record_operation(&self, operation: StorageOperation, f: impl FnOnce() -> R) -> R { - if let Some(metrics) = self.operations.get(&operation) { metrics.record(f) } else { f() } + if let Some(metrics) = self.operations.get(&operation) { + metrics.record(f) + } else { + f() + } } /// Record a storage operation with timing (async version). @@ -139,11 +152,6 @@ impl StorageMetrics { result } - /// Get block metrics for recording high-level timing. - pub const fn block_metrics(&self) -> &BlockMetrics { - &self.block_metrics - } - /// Record a pre-measured duration for an operation. pub fn record_duration(&self, operation: StorageOperation, duration: Duration) { if let Some(metrics) = self.operations.get(&operation) { @@ -201,18 +209,32 @@ impl OperationMetrics { } } -/// High-level block processing metrics. +/// High-level live block processing metrics. 
#[derive(Metrics, Clone)] -#[metrics(scope = "optimism_trie.block")] -pub struct BlockMetrics { +#[metrics(scope = "optimism_trie.live")] +pub struct LiveMetrics { /// Total time to process a block (end-to-end) in seconds pub total_duration_seconds: Histogram, /// Time spent executing the block (EVM) in seconds pub execution_duration_seconds: Histogram, /// Time spent calculating state root in seconds pub state_root_duration_seconds: Histogram, + /// Time spent unwinding persistence and memory during a reorg in seconds + pub unwind_duration_seconds: Histogram, +} + +/// Metrics tracking items written to persistent storage. +#[derive(Metrics, Clone)] +#[metrics(scope = "optimism_trie.persistence")] +pub struct PersistenceMetrics { + /// Time spent opening a read-write transaction in seconds + pub open_tx_duration_seconds: Histogram, /// Time spent writing trie updates to storage in seconds pub write_duration_seconds: Histogram, + /// Time spent pruning old state in seconds + pub prune_duration_seconds: Histogram, + /// Time spent committing the transaction in seconds + pub commit_duration_seconds: Histogram, /// Number of trie updates written pub account_trie_updates_written_total: Counter, /// Number of storage trie updates written @@ -221,21 +243,9 @@ pub struct BlockMetrics { pub hashed_accounts_written_total: Counter, /// Number of hashed storages written pub hashed_storages_written_total: Counter, - /// Earliest block number that the proofs storage has stored. - pub earliest_number: Gauge, - /// Latest block number that the proofs storage has stored. - pub latest_number: Gauge, } -impl BlockMetrics { - /// Record operation durations for the processing of a block. 
- pub fn record_operation_durations(&self, durations: &OperationDurations) { - self.total_duration_seconds.record(durations.total_duration_seconds); - self.execution_duration_seconds.record(durations.execution_duration_seconds); - self.state_root_duration_seconds.record(durations.state_root_duration_seconds); - self.write_duration_seconds.record(durations.write_duration_seconds); - } - +impl PersistenceMetrics { /// Increment write counts of historical trie updates for a single block. pub fn increment_write_counts(&self, counts: &WriteCounts) { self.account_trie_updates_written_total @@ -336,12 +346,12 @@ impl HashedStorageCursor for OpProofsHashedCursorWithMet /// Wrapper around [`OpProofsStore`] type that records metrics for all operations. #[derive(Debug, Clone)] -pub struct OpProofsStorageWithMetrics { +pub struct OpProofsStoreWithMetrics { storage: S, metrics: Arc, } -impl OpProofsStorageWithMetrics { +impl OpProofsStoreWithMetrics { /// Initializes new [`StorageMetrics`] and wraps given storage instance. 
pub fn new(storage: S) -> Self { Self { storage, metrics: Arc::new(StorageMetrics::default()) } @@ -358,7 +368,7 @@ impl OpProofsStorageWithMetrics { } } -impl OpProofsStore for OpProofsStorageWithMetrics +impl OpProofsStore for OpProofsStoreWithMetrics where S: OpProofsStore, { @@ -415,7 +425,7 @@ impl OpProofsProviderRO for OpProofsProviderROWithMetrics fn get_earliest_block_number(&self) -> OpProofsStorageResult> { let result = self.provider.get_earliest_block_number()?; if let Some((number, _)) = result { - self.metrics.block_metrics.earliest_number.set(number as f64); + self.metrics.proof_window.earliest.set(number as f64); } Ok(result) } @@ -424,7 +434,7 @@ impl OpProofsProviderRO for OpProofsProviderROWithMetrics fn get_latest_block_number(&self) -> OpProofsStorageResult> { let result = self.provider.get_latest_block_number()?; if let Some((number, _)) = result { - self.metrics.block_metrics.latest_number.set(number as f64); + self.metrics.proof_window.latest.set(number as f64); } Ok(result) } @@ -498,7 +508,7 @@ impl OpProofsProviderRO for OpProofsProviderRwWithMetrics fn get_earliest_block_number(&self) -> OpProofsStorageResult> { let result = self.provider.get_earliest_block_number()?; if let Some((number, _)) = result { - self.metrics.block_metrics.earliest_number.set(number as f64); + self.metrics.proof_window.earliest.set(number as f64); } Ok(result) } @@ -507,7 +517,7 @@ impl OpProofsProviderRO for OpProofsProviderRwWithMetrics fn get_latest_block_number(&self) -> OpProofsStorageResult> { let result = self.provider.get_latest_block_number()?; if let Some((number, _)) = result { - self.metrics.block_metrics.latest_number.set(number as f64); + self.metrics.proof_window.latest.set(number as f64); } Ok(result) } @@ -564,7 +574,7 @@ impl OpProofsProviderRw for OpProofsProviderRwWithMetrics block_state_diff: BlockStateDiff, ) -> OpProofsStorageResult { let result = self.provider.store_trie_updates(block_ref, block_state_diff)?; - 
self.metrics.block_metrics.latest_number.set(block_ref.block.number as f64); + self.metrics.proof_window.latest.set(block_ref.block.number as f64); Ok(result) } @@ -575,7 +585,7 @@ impl OpProofsProviderRw for OpProofsProviderRwWithMetrics ) -> OpProofsStorageResult { let result = self.provider.store_trie_updates_batch(updates.clone())?; if let Some((latest_block_ref, _)) = updates.last() { - self.metrics.block_metrics.latest_number.set(latest_block_ref.block.number as f64); + self.metrics.proof_window.latest.set(latest_block_ref.block.number as f64); } Ok(result) } @@ -585,7 +595,7 @@ impl OpProofsProviderRw for OpProofsProviderRwWithMetrics &self, new_earliest_block_ref: BlockWithParent, ) -> OpProofsStorageResult { - self.metrics.block_metrics.earliest_number.set(new_earliest_block_ref.block.number as f64); + self.metrics.proof_window.earliest.set(new_earliest_block_ref.block.number as f64); self.provider.prune_earliest_state(new_earliest_block_ref) } @@ -609,7 +619,7 @@ impl OpProofsProviderRw for OpProofsProviderRwWithMetrics block_number: u64, hash: B256, ) -> OpProofsStorageResult<()> { - self.metrics.block_metrics.earliest_number.set(block_number as f64); + self.metrics.proof_window.earliest.set(block_number as f64); self.provider.set_earliest_block_number(block_number, hash) } @@ -646,6 +656,8 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM let start = Instant::now(); let result = self.provider.store_account_branches(account_nodes); let duration = start.elapsed(); + + // Record per-item duration if count > 0 { self.metrics.record_duration_per_item( StorageOperation::StoreAccountBranch, @@ -653,6 +665,7 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM count, ); } + result } @@ -666,6 +679,8 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM let start = Instant::now(); let result = self.provider.store_storage_branches(hashed_address, storage_nodes); let duration = start.elapsed(); + + // Record per-item duration if count > 0 { 
self.metrics.record_duration_per_item( StorageOperation::StoreStorageBranch, @@ -673,6 +688,7 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM count, ); } + result } @@ -685,6 +701,8 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM let start = Instant::now(); let result = self.provider.store_hashed_accounts(accounts); let duration = start.elapsed(); + + // Record per-item duration if count > 0 { self.metrics.record_duration_per_item( StorageOperation::StoreHashedAccount, @@ -692,6 +710,7 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM count, ); } + result } @@ -705,6 +724,8 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM let start = Instant::now(); let result = self.provider.store_hashed_storages(hashed_address, storages); let duration = start.elapsed(); + + // Record per-item duration if count > 0 { self.metrics.record_duration_per_item( StorageOperation::StoreHashedStorage, @@ -712,13 +733,14 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM count, ); } + result } #[inline] fn commit_initial_state(&self) -> OpProofsStorageResult { let block = self.provider.commit_initial_state()?; - self.metrics.block_metrics.earliest_number.set(block.number as f64); + self.metrics.proof_window.earliest.set(block.number as f64); Ok(block) } @@ -728,7 +750,7 @@ impl OpProofsInitProvider for OpProofsInitProviderWithM } } -impl From for OpProofsStorageWithMetrics +impl From for OpProofsStoreWithMetrics where S: OpProofsStore + Clone + 'static, { diff --git a/rust/op-reth/crates/trie/src/overlay_provider.rs b/rust/op-reth/crates/trie/src/overlay_provider.rs new file mode 100644 index 0000000000000..a160a644fc5a5 --- /dev/null +++ b/rust/op-reth/crates/trie/src/overlay_provider.rs @@ -0,0 +1,281 @@ +//! 
Overlay Provider for external proofs storage + +use crate::{api::OpProofsProviderRO, provider::OpProofsStateProviderRef, BlockStateDiff}; +use alloy_eips::eip1898::BlockWithParent; +use alloy_primitives::{keccak256, Address, BlockNumber, Bytes, StorageValue, B256}; +use reth_primitives_traits::{Account, Bytecode}; +use reth_provider::{ + AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider, + StateProvider, StateRootProvider, StorageRootProvider, ProviderResult +}; +use reth_revm::db::BundleState; +use reth_trie::{ + updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof, + MultiProofTargets, StorageMultiProof, TrieInput, +}; +use std::{fmt::Debug, sync::{Arc, OnceLock}}; + +/// A state provider that overlays in-memory buffered blocks on top of the persistent proofs storage. +#[derive(Debug)] +pub struct MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO, +{ + inner: OpProofsStateProviderRef<'a, P>, + /// Ordered list of buffered blocks (Oldest to Newest). + memory: Vec>, + trie_input: OnceLock, +} + +impl<'a, P> MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + /// Create a new overlay provider. + /// + /// `memory` should be strictly ordered from oldest to newest. + pub fn new( + inner: OpProofsStateProviderRef<'a, P>, + memory: Vec>, + ) -> Self { + Self { inner, memory, trie_input: OnceLock::new() } + } + + /// Aggregates trie updates from the memory buffer. + fn trie_input(&self) -> &TrieInput { + self.trie_input.get_or_init(|| { + let mut input = TrieInput::default(); + // Iterate over buffered blocks to collect all trie nodes and state updates. + // memory is expected to be ordered Oldest -> Newest. 
+ for state in &self.memory { + let diff = &state.1; + input.nodes.extend_from_sorted(&diff.sorted_trie_updates); + input.state.extend_from_sorted(&diff.sorted_post_state); + } + input + }) + } + + /// Merges the overlay storage for the given address with the provided storage. + fn merged_hashed_storage(&self, address: Address, storage: HashedStorage) -> HashedStorage { + let hashed_address = keccak256(address); + // Start with the overlay storage from our trie input cache + let state = &self.trie_input().state; + let mut overlay = state.storages.get(&hashed_address).cloned().unwrap_or_default(); + + overlay.extend(&storage); + overlay + } +} + +impl<'a, P> AccountReader for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + fn basic_account(&self, address: &Address) -> ProviderResult> { + let hashed_address = keccak256(address); + // Check buffer via trie_input cache + if let Some(account) = self.trie_input().state.accounts.get(&hashed_address) { + return Ok(account.clone()); + } + self.inner.basic_account(address) + } +} + +impl<'a, P> StateProvider for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + fn storage(&self, address: Address, storage_key: B256) -> ProviderResult> { + let hashed_slot = keccak256(storage_key); + self.storage_by_hashed_key(address, hashed_slot) + } + + fn storage_by_hashed_key( + &self, + address: Address, + hashed_key: B256, + ) -> ProviderResult> { + let hashed_address = keccak256(address); + + // Check buffer via trie_input cache + let state = &self.trie_input().state; + + // Check for storage updates or wipes in the overlay + if let Some(account_storage) = state.storages.get(&hashed_address) { + // Check specific slot + if let Some(value) = account_storage.storage.get(&hashed_key) { + return Ok(Some(*value)); + } + // If the whole storage was wiped in the overlay (e.g. 
reused address), we return 0 + if account_storage.wiped { + return Ok(Some(StorageValue::ZERO)); + } + } + + // Check if account was destroyed in the overlay (implicit storage wipe) + if let Some(account) = state.accounts.get(&hashed_address) { + if account.is_none() { + return Ok(Some(StorageValue::ZERO)); + } + } + + self.inner.storage_by_hashed_key(address, hashed_key) + } +} + +impl<'a, P> BytecodeReader for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult> { + // HashedPostStateSorted does not store bytecode, so we cannot look it up in the overlay. + // We fallback strictly to the inner provider. + self.inner.bytecode_by_hash(code_hash) + } +} + +impl<'a, P> StateRootProvider for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + fn state_root(&self, state: HashedPostState) -> ProviderResult { + self.state_root_from_nodes(TrieInput::from_state(state)) + } + + fn state_root_from_nodes(&self, mut input: TrieInput) -> ProviderResult { + // Combine updates from the buffer (overlay) with the current input + input.prepend_self(self.trie_input().clone()); + + // Delegate to inner to calculate root against disk + overlay + self.inner.state_root_from_nodes(input) + } + + fn state_root_with_updates( + &self, + state: HashedPostState, + ) -> ProviderResult<(B256, TrieUpdates)> { + self.state_root_from_nodes_with_updates(TrieInput::from_state(state)) + } + + fn state_root_from_nodes_with_updates( + &self, + mut input: TrieInput, + ) -> ProviderResult<(B256, TrieUpdates)> { + input.prepend_self(self.trie_input().clone()); + self.inner.state_root_from_nodes_with_updates(input) + } +} + +impl<'a, P> StorageRootProvider for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + fn storage_root( + &self, + address: Address, + hashed_storage: HashedStorage, + ) -> ProviderResult { + let merged = 
self.merged_hashed_storage(address, hashed_storage); + self.inner.storage_root(address, merged) + } + + fn storage_proof( + &self, + address: Address, + slot: B256, + hashed_storage: HashedStorage, + ) -> ProviderResult { + let merged = self.merged_hashed_storage(address, hashed_storage); + self.inner.storage_proof(address, slot, merged) + } + + fn storage_multiproof( + &self, + address: Address, + slots: &[B256], + hashed_storage: HashedStorage, + ) -> ProviderResult { + let merged = self.merged_hashed_storage(address, hashed_storage); + self.inner.storage_multiproof(address, slots, merged) + } +} + +impl<'a, P> StateProofProvider for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + fn proof( + &self, + mut input: TrieInput, + address: Address, + slots: &[B256], + ) -> ProviderResult { + input.prepend_self(self.trie_input().clone()); + self.inner.proof(input, address, slots) + } + + fn multiproof( + &self, + mut input: TrieInput, + targets: MultiProofTargets, + ) -> ProviderResult { + input.prepend_self(self.trie_input().clone()); + self.inner.multiproof(input, targets) + } + + fn witness( + &self, + mut input: TrieInput, + target: HashedPostState, + ) -> ProviderResult> { + input.prepend_self(self.trie_input().clone()); + self.inner.witness(input, target) + } +} + +impl<'a, P> BlockHashReader for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO, +{ + fn block_hash(&self, number: BlockNumber) -> ProviderResult> { + // Iterate backwards (Newest to Oldest) to find most recent definition + for state in self.memory.iter().rev() { + if state.0.block.number == number { + return Ok(Some(state.0.block.hash)); + } + } + self.inner.block_hash(number) + } + + fn canonical_hashes_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> ProviderResult> { + let mut hashes = self.inner.canonical_hashes_range(start, end)?; + + // Overlay with in-memory blocks + for state in &self.memory { + let num = 
state.0.block.number; + if num >= start && num < end { + let idx = (num - start) as usize; + if idx < hashes.len() { + hashes[idx] = state.0.block.hash; + } else if idx == hashes.len() { + hashes.push(state.0.block.hash); + } + } + } + Ok(hashes) + } +} + +impl<'a, P> HashedPostStateProvider for MemoryOverlayOpProofsStateProviderRef<'a, P> +where + P: OpProofsProviderRO + Clone, +{ + fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState { + self.inner.hashed_post_state(bundle_state) + } +} diff --git a/rust/op-reth/crates/trie/src/persistence.rs b/rust/op-reth/crates/trie/src/persistence.rs new file mode 100644 index 0000000000000..e177984fa7d8f --- /dev/null +++ b/rust/op-reth/crates/trie/src/persistence.rs @@ -0,0 +1,309 @@ +//! Persistence implementation for external proof + +use crate::{ + api::{OpProofsProviderRw, WriteCounts}, + prune::OpProofStoragePruner, + BlockStateDiff, OpProofsStore, OpProofsStorageError, +}; +#[cfg(feature = "metrics")] +use crate::metrics::PersistenceMetrics; +use alloy_eips::eip1898::BlockWithParent; +use reth_provider::BlockHashReader; +use crossbeam_channel::{Receiver, Sender}; +use parking_lot::{Mutex, Condvar}; +use std::{sync::Arc, thread, time::Instant}; +use tracing::{debug, error, info}; + +/// Thread-safe tracker for whether a background persistence task is running. +#[derive(Debug)] +pub struct PersistenceStatus { + is_running: Mutex, + done: Condvar, +} + +impl Default for PersistenceStatus { + fn default() -> Self { + Self::new() + } +} + +impl PersistenceStatus { + /// Create a new idle status. + pub fn new() -> Self { + Self { + is_running: Mutex::new(false), + done: Condvar::new(), + } + } + + /// Returns `true` if a persistence task is currently running. + pub fn is_running(&self) -> bool { + *self.is_running.lock() + } + + /// Mark persistence as running. Returns `true` if it was previously idle + /// (i.e. this call "won" the race), `false` if already running. 
+ pub fn mark_running(&self) -> bool { + let mut running = self.is_running.lock(); + if *running { + false + } else { + *running = true; + true + } + } + + /// Mark persistence as idle and wake all waiters. + pub fn mark_idle(&self) { + let mut running = self.is_running.lock(); + *running = false; + self.done.notify_all(); + } + + /// Block the calling thread until persistence is idle. + pub fn wait_until_idle(&self) { + let mut running = self.is_running.lock(); + while *running { + self.done.wait(&mut running); + } + } +} + +/// Messages sent to the persistence service. +#[derive(Debug)] +pub enum LiveTriePersistenceAction { + /// Save a batch of trie updates to storage. + /// + /// Contains: + /// 1. The list of blocks and their diffs (ordered Oldest -> Newest). + /// 2. A response channel to return the highest block number persisted (for pruning). + SaveUpdates(Vec>, Sender>), + /// Unwind history to the specified block (inclusive). + /// All history strictly after this block is removed. + Unwind(BlockWithParent, Sender>), +} + +/// A handle to communicate with the Live Trie persistence service. +#[derive(Debug, Clone)] +pub struct LiveTriePersistenceHandle { + sender: Sender, +} + +impl LiveTriePersistenceHandle { + /// Create a new handle. + pub fn new(sender: Sender) -> Self { + Self { sender } + } + + /// Spawn the service in a new thread and return a handle. + pub fn spawn(pruner: OpProofStoragePruner, storage: S) -> Self + where + S: OpProofsStore + Clone + 'static, + H: BlockHashReader + Send + Sync + 'static, + { + let (tx, rx) = crossbeam_channel::bounded(2); + let service = LiveTriePersistenceService::new(pruner, storage, rx); + + thread::Builder::new() + .name("Live Trie Persistence".into()) + .spawn(move || service.run()) + .expect("failed to spawn live trie persistence thread"); + + Self::new(tx) + } + + /// Send a save request. + /// + /// Returns an error if the persistence service has stopped. 
+ pub fn save_updates( + &self, + updates: Vec>, + response_tx: Sender>, + ) -> Result<(), OpProofsStorageError> { + self.sender.send(LiveTriePersistenceAction::SaveUpdates(updates, response_tx)) + .map_err(|_| OpProofsStorageError::Other("Persistence service disconnected".into())) + } + + /// Send an unwind request. + /// + /// Returns an error if the persistence service has stopped. + pub fn unwind( + &self, + to: BlockWithParent, + response_tx: Sender>, + ) -> Result<(), OpProofsStorageError> { + self.sender.send(LiveTriePersistenceAction::Unwind(to, response_tx)) + .map_err(|_| OpProofsStorageError::Other("Persistence service disconnected".into())) + } +} + +/// Service that runs in a background thread to persist trie updates. +#[derive(Debug)] +pub struct LiveTriePersistenceService { + /// Pruner that also owns the storage backend and block hash reader. + pruner: OpProofStoragePruner, + storage: S, + incoming: Receiver, + + #[cfg(feature = "metrics")] + metrics: PersistenceMetrics, +} + +impl LiveTriePersistenceService { + /// Create a new persistence service. + pub fn new(pruner: OpProofStoragePruner, storage: S, incoming: Receiver) -> Self { + Self { + pruner, + storage, + incoming, + + #[cfg(feature = "metrics")] + metrics: PersistenceMetrics::new_with_labels(&[] as &[(&str, &str)]), + } + } + + /// Main loop for the service. + /// Listens for incoming actions and processes them sequentially. 
+ pub fn run(self) { + debug!(target: "live-trie::persistence", "Service started"); + + while let Ok(action) = self.incoming.recv() { + match action { + LiveTriePersistenceAction::Unwind(to, reply_tx) => { + self.on_unwind(to, reply_tx); + } + LiveTriePersistenceAction::SaveUpdates(updates, reply_tx) => { + self.on_save_updates(updates, reply_tx); + } + } + } + debug!(target: "live-trie::persistence", "Service shutting down"); + } + + fn on_save_updates( + &self, + arc_updates: Vec>, + reply_tx: Sender>, + ) { + if arc_updates.is_empty() { + let _ = reply_tx.send(None); + return; + } + + let start = Instant::now(); + let count = arc_updates.len(); + let first = arc_updates.first().map(|arc| arc.0.block.number); + let last = arc_updates.last().map(|arc| arc.0.block.number); + debug!(target: "live-trie::persistence", ?count, ?first, ?last, "Writing batch to storage"); + + // Convert from Arc to owned on the persistence thread (not the caller thread) + // to avoid blocking block execution with deep clones. + let updates: Vec<(BlockWithParent, BlockStateDiff)> = arc_updates + .into_iter() + .map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone())) + .collect(); + + // Store updates and prune in a single transaction + let provider_rw_start = Instant::now(); + let result = self.storage.provider_rw().and_then(|provider| { + let open_tx_duration = provider_rw_start.elapsed(); + + // 1. Store the new block updates (without pruning — pass None) + let write_start = Instant::now(); + let res = provider.store_trie_updates_batch(updates)?; + let write_duration = write_start.elapsed(); + + // 2. 
Prune old state using the pruner on the same transaction + let prune_start = Instant::now(); + let prune_result = self.pruner.prune_with_provider(&provider); + let prune_duration = prune_start.elapsed(); + + match &prune_result { + Ok(output) => { + if *output != Default::default() { + info!( + target: "live-trie::persistence", + ?output, + "Pruning complete within save transaction" + ); + } + } + Err(e) => { + error!(target: "live-trie::persistence", ?e, "Pruning failed during save, aborting transaction"); + } + } + + // 3. Abort the entire transaction if pruning failed + prune_result.map_err(|e| crate::OpProofsStorageError::Other(e.to_string()))?; + + // 4. Commit both store and prune atomically + let commit_start = Instant::now(); + provider.commit()?; + let commit_duration = commit_start.elapsed(); + + Ok((res, open_tx_duration, write_duration, prune_duration, commit_duration)) + }); + + let (successful_last, total_write_count, open_tx_duration, write_duration, prune_duration, commit_duration) = match result { + Ok((counts, otd, wd, pd, cd)) => (last, counts, Some(otd), Some(wd), Some(pd), Some(cd)), + Err(e) => { + error!(target: "live-trie::persistence", ?e, "Failed to persist batch trie updates"); + (None, WriteCounts::default(), None, None, None, None) + } + }; + + #[cfg(feature = "metrics")] + { + self.metrics.increment_write_counts(&total_write_count); + if let Some(d) = open_tx_duration { + self.metrics.open_tx_duration_seconds.record(d); + } + if let Some(d) = write_duration { + self.metrics.write_duration_seconds.record(d); + } + if let Some(d) = prune_duration { + self.metrics.prune_duration_seconds.record(d); + } + if let Some(d) = commit_duration { + self.metrics.commit_duration_seconds.record(d); + } + } + + let duration = start.elapsed(); + info!( + target: "live-trie::persistence", + ?successful_last, + ?duration, + ?open_tx_duration, + ?write_duration, + ?prune_duration, + ?commit_duration, + ?total_write_count, + blocks_count = count, + "Batch 
write complete" + ); + let _ = reply_tx.send(successful_last); + } + + fn on_unwind( + &self, + to: BlockWithParent, + reply_tx: Sender>, + ) { + debug!(target: "live-trie::persistence", to_block = ?to.block.number, "Unwinding storage"); + let result = self.storage.provider_rw().and_then(|provider| { + provider.unwind_history(to)?; + provider.commit() + }); + match result { + Ok(_) => { + debug!(target: "live-trie::persistence", "Unwind successful"); + let _ = reply_tx.send(Ok(())); + } + Err(e) => { + error!(target: "live-trie::persistence", ?e, "Unwind failed"); + let _ = reply_tx.send(Err(e.to_string())); + } + } + } +} diff --git a/rust/op-reth/crates/trie/src/prune/pruner.rs b/rust/op-reth/crates/trie/src/prune/pruner.rs index db4ca0bb7cc51..8189ee30b0773 100644 --- a/rust/op-reth/crates/trie/src/prune/pruner.rs +++ b/rust/op-reth/crates/trie/src/prune/pruner.rs @@ -1,21 +1,24 @@ #[cfg(feature = "metrics")] use crate::prune::metrics::Metrics; use crate::{ - OpProofsStore, - api::{OpProofsProviderRO, OpProofsProviderRw}, prune::error::{OpProofStoragePrunerResult, PrunerError, PrunerOutput}, + OpProofsProviderRO, OpProofsProviderRw, OpProofsStore, }; -use alloy_eips::{BlockNumHash, eip1898::BlockWithParent}; + +use alloy_eips::{eip1898::BlockWithParent, BlockNumHash}; use reth_provider::BlockHashReader; use std::cmp; use tokio::time::Instant; use tracing::{error, info, trace}; +/// Default batch size for pruning operations. +const DEFAULT_PRUNE_BATCH_SIZE: u64 = 50; + /// Prunes the proof storage by calling `prune_earliest_state` on the storage provider. 
#[derive(Debug)] -pub struct OpProofStoragePruner { - // Database provider for the prune - provider: P, +pub struct OpProofStoragePruner { + /// Storage backend for the prune + store: S, /// Reader to fetch block hash by block number block_hash_reader: H, /// Keep at least these many recent blocks @@ -28,43 +31,51 @@ pub struct OpProofStoragePruner { metrics: Metrics, } -impl OpProofStoragePruner { +impl OpProofStoragePruner { /// Create a new pruner. pub fn new( - provider: P, + store: S, block_hash_reader: H, min_block_interval: u64, - prune_batch_size: u64, ) -> Self { Self { - provider, + store, block_hash_reader, min_block_interval, - prune_batch_size, + prune_batch_size: DEFAULT_PRUNE_BATCH_SIZE, #[cfg(feature = "metrics")] metrics: Metrics::default(), } } + + /// Set the batch size for pruning operations. The pruner will prune + /// at most this many blocks in one database transaction. + pub fn with_batch_size(mut self, prune_batch_size: u64) -> Self { + self.prune_batch_size = prune_batch_size; + self + } } -impl OpProofStoragePruner +impl OpProofStoragePruner where - P: OpProofsStore, + S: OpProofsStore, H: BlockHashReader, { fn run_inner(&self) -> OpProofStoragePrunerResult { - let provider_ro = self.provider.provider_ro()?; + let provider_ro = self.store.provider_ro()?; let latest_block_opt = provider_ro.get_latest_block_number()?; if latest_block_opt.is_none() { trace!(target: "trie::pruner", "No latest blocks in the proof storage"); - return Ok(PrunerOutput::default()); + return Ok(PrunerOutput::default()) } let earliest_block_opt = provider_ro.get_earliest_block_number()?; if earliest_block_opt.is_none() { trace!(target: "trie::pruner", "No earliest blocks in the proof storage"); - return Ok(PrunerOutput::default()); + return Ok(PrunerOutput::default()) } + // Drop the read-only provider before starting write transactions + drop(provider_ro); let latest_block = latest_block_opt.unwrap().0; let earliest_block = earliest_block_opt.unwrap().0; @@ -72,7 
+83,7 @@ where let interval = latest_block.saturating_sub(earliest_block); if interval <= self.min_block_interval { trace!(target: "trie::pruner", "Nothing to prune"); - return Ok(PrunerOutput::default()); + return Ok(PrunerOutput::default()) } // at this point `latest_block` is always greater than `min_block_interval` @@ -92,28 +103,93 @@ where ..Default::default() }; - // Prune in batches + // Prune in batches, committing each batch separately to avoid + // holding a large write transaction for the entire prune window. while current_earliest_block < target_earliest_block { - // Calculate the end of this batch let batch_end_block = cmp::min(current_earliest_block + self.prune_batch_size, target_earliest_block); - let batch_output = self.prune_batch(current_earliest_block, batch_end_block)?; + let provider_rw = self.store.provider_rw()?; + let batch_output = + self.prune_batch_on_provider(&provider_rw, current_earliest_block, batch_end_block)?; + provider_rw.commit()?; prune_output.extend_ref(batch_output); + current_earliest_block = batch_end_block; + } + + Ok(prune_output) + } + + /// Prune proof storage using the given write provider, without committing. + /// + /// This allows callers to batch pruning with other write operations (e.g., storing + /// new block updates) in a single database transaction. The caller is responsible + /// for committing the transaction. 
+ pub fn prune_with_provider( + &self, + provider_rw: &RW, + ) -> OpProofStoragePrunerResult { + let latest_block_opt = provider_rw.get_latest_block_number()?; + if latest_block_opt.is_none() { + trace!(target: "trie::pruner", "No latest blocks in the proof storage"); + return Ok(PrunerOutput::default()) + } + + let earliest_block_opt = provider_rw.get_earliest_block_number()?; + if earliest_block_opt.is_none() { + trace!(target: "trie::pruner", "No earliest blocks in the proof storage"); + return Ok(PrunerOutput::default()) + } + + let latest_block = latest_block_opt.unwrap().0; + let earliest_block = earliest_block_opt.unwrap().0; + + let interval = latest_block.saturating_sub(earliest_block); + if interval <= self.min_block_interval { + trace!(target: "trie::pruner", "Nothing to prune"); + return Ok(PrunerOutput::default()) + } + + let target_earliest_block = latest_block - self.min_block_interval; + + info!( + target: "trie::pruner", + from_block = earliest_block, + to_block = target_earliest_block, + "Starting pruning proof storage (in-tx)", + ); + + let mut current_earliest_block = earliest_block; + let mut prune_output = PrunerOutput { + start_block: earliest_block, + end_block: target_earliest_block, + ..Default::default() + }; + + while current_earliest_block < target_earliest_block { + let batch_end_block = + cmp::min(current_earliest_block + self.prune_batch_size, target_earliest_block); - // Update loop state + let batch_output = + self.prune_batch_on_provider(provider_rw, current_earliest_block, batch_end_block)?; + + prune_output.extend_ref(batch_output); current_earliest_block = batch_end_block; } Ok(prune_output) } - /// Prunes a single batch of blocks. - fn prune_batch(&self, start_block: u64, end_block: u64) -> Result { + /// Execute a single prune batch on the given write provider without committing. 
+ fn prune_batch_on_provider( + &self, + provider_rw: &RW, + start_block: u64, + end_block: u64, + ) -> Result { let batch_start_time = Instant::now(); - // Fetch block hashes for the new earliest block of this batch let new_earliest_block_hash = self .block_hash_reader .block_hash(end_block) @@ -141,22 +217,16 @@ where })? .ok_or(PrunerError::BlockNotFound(parent_block_num))?; - batch_start_time.elapsed(); - let block_with_parent = BlockWithParent { parent: parent_block_hash, block: BlockNumHash { number: end_block, hash: new_earliest_block_hash }, }; - // Commit this batch - let provider_rw = self.provider.provider_rw()?; let write_counts = provider_rw.prune_earliest_state(block_with_parent)?; - provider_rw.commit()?; let duration = batch_start_time.elapsed(); let batch_output = PrunerOutput { duration, start_block, end_block, write_counts }; - // Record metrics for this batch #[cfg(feature = "metrics")] self.metrics.record_prune_result(batch_output.clone()); @@ -182,17 +252,17 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{BlockStateDiff, OpProofsStore, api::{OpProofsProviderRO, OpProofsProviderRw}, db::MdbxProofsStorage}; + use crate::{db::MdbxProofsStorageV2, BlockStateDiff, OpProofsStorage}; use alloy_eips::{BlockHashOrNumber, NumHash}; - use alloy_primitives::{B256, BlockNumber, U256}; + use alloy_primitives::{BlockNumber, B256, U256}; use mockall::mock; use reth_primitives_traits::Account; use reth_storage_errors::provider::ProviderResult; use reth_trie::{ - BranchNodeCompact, HashedPostState, HashedStorage, Nibbles, hashed_cursor::HashedCursor, trie_cursor::TrieCursor, updates::{StorageTrieUpdates, TrieUpdates, TrieUpdatesSorted}, + BranchNodeCompact, HashedPostState, HashedStorage, Nibbles, }; use std::sync::Arc; use tempfile::TempDir; @@ -222,26 +292,6 @@ mod tests { keccak256(n.to_be_bytes()) } - fn store_block(store: &Arc, block: BlockWithParent, diff: BlockStateDiff) { - let p = store.provider_rw().unwrap(); - 
p.store_trie_updates(block, diff).unwrap(); - p.commit().unwrap(); - } - - fn set_earliest(store: &Arc, num: u64, hash: B256) { - let p = store.provider_rw().unwrap(); - p.set_earliest_block_number(num, hash).unwrap(); - p.commit().unwrap(); - } - - fn get_earliest(store: &Arc) -> Option<(u64, B256)> { - store.provider_ro().unwrap().get_earliest_block_number().unwrap() - } - - fn get_latest(store: &Arc) -> Option<(u64, B256)> { - store.provider_ro().unwrap().get_latest_block_number().unwrap() - } - /// Build a block-with-parent for number `n` with deterministic hash. fn block(n: u64, parent: B256) -> BlockWithParent { BlockWithParent::new(parent, NumHash::new(n, b256(n))) @@ -251,13 +301,14 @@ mod tests { async fn run_inner_and_and_verify_updated_state() { // --- env/store --- let dir = TempDir::new().unwrap(); - let store: Arc = - Arc::new(MdbxProofsStorage::new(dir.path()).expect("env")); + #[allow(clippy::useless_conversion)] + let store: OpProofsStorage> = + Arc::new(MdbxProofsStorageV2::new(dir.path()).expect("env")).into(); { - let p = store.provider_rw().unwrap(); - p.set_earliest_block_number(0, B256::ZERO).expect("set earliest"); - p.commit().unwrap(); + let provider = store.provider_rw().expect("provider_rw"); + provider.set_earliest_block_number(0, B256::ZERO).expect("set earliest"); + provider.commit().expect("commit"); } // --- entities --- @@ -321,7 +372,9 @@ mod tests { sorted_post_state: d_post_state.into_sorted(), sorted_trie_updates: d_trie_updates.into_sorted(), }; - store_block(&store, b1, d); + let provider = store.provider_rw().expect("provider_rw"); + provider.store_trie_updates(b1, d).expect("b1"); + provider.commit().expect("commit"); parent = b256(1); } @@ -354,7 +407,9 @@ mod tests { sorted_post_state: d_post_state.into_sorted(), sorted_trie_updates: d_trie_updates.into_sorted(), }; - store_block(&store, b2, d); + let provider = store.provider_rw().expect("provider_rw"); + provider.store_trie_updates(b2, d).expect("b2"); + 
provider.commit().expect("commit"); parent = b256(2); } @@ -380,7 +435,9 @@ mod tests { sorted_post_state: d_post_state.into_sorted(), sorted_trie_updates: d_trie_updates.into_sorted(), }; - store_block(&store, b3, d); + let provider = store.provider_rw().expect("provider_rw"); + provider.store_trie_updates(b3, d).expect("b3"); + provider.commit().expect("commit"); parent = b256(3); } @@ -407,7 +464,9 @@ mod tests { sorted_post_state: d_post_state.into_sorted(), sorted_trie_updates: d_trie_updates.into_sorted(), }; - store_block(&store, b4, d); + let provider = store.provider_rw().expect("provider_rw"); + provider.store_trie_updates(b4, d).expect("b4"); + provider.commit().expect("commit"); parent = b256(4); } @@ -430,13 +489,16 @@ mod tests { sorted_post_state: d_post_state.into_sorted(), sorted_trie_updates: TrieUpdatesSorted::default(), }; - store_block(&store, b5, d); + let provider = store.provider_rw().expect("provider_rw"); + provider.store_trie_updates(b5, d).expect("b5"); + provider.commit().expect("commit"); } // sanity: earliest=0, latest=5 { - let e = get_earliest(&store).expect("some"); - let l = get_latest(&store).expect("some"); + let provider = store.provider_ro().expect("provider_ro"); + let e = provider.get_earliest_block_number().expect("earliest").expect("some"); + let l = provider.get_latest_block_number().expect("latest").expect("some"); assert_eq!(e.0, 0); assert_eq!(l.0, 5); } @@ -454,15 +516,16 @@ mod tests { .withf(move |block_num| *block_num == 3) .returning(move |_| Ok(Some(b256(3)))); - let pruner = OpProofStoragePruner::new(store.clone(), block_hash_reader, 1, 1000); + let pruner = OpProofStoragePruner::new(store.clone(), block_hash_reader, 1); let out = pruner.run_inner().expect("pruner ok"); assert_eq!(out.start_block, 0); assert_eq!(out.end_block, 4, "pruned up to 4 (inclusive); new earliest is 4"); // proof window moved: earliest=4, latest=5 { - let e = get_earliest(&store).expect("some"); - let l = 
get_latest(&store).expect("some"); + let provider = store.provider_ro().expect("provider_ro"); + let e = provider.get_earliest_block_number().expect("earliest").expect("some"); + let l = provider.get_latest_block_number().expect("latest").expect("some"); assert_eq!(e.0, 4); assert_eq!(e.1, b256(4)); assert_eq!(l.0, 5); @@ -470,11 +533,11 @@ mod tests { } // --- DB checks - let provider_ro = store.provider_ro().expect("provider_ro"); - let mut acc_cur = provider_ro.account_hashed_cursor(4).expect("acc cur"); - let mut stor_cur = provider_ro.storage_hashed_cursor(stor_addr, 4).expect("stor cur"); - let mut acc_trie_cur = provider_ro.account_trie_cursor(4).expect("acc trie cur"); - let mut stor_trie_cur = provider_ro.storage_trie_cursor(stor_addr, 4).expect("stor trie cur"); + let provider = store.provider_ro().expect("provider_ro"); + let mut acc_cur = provider.account_hashed_cursor(4).expect("acc cur"); + let mut stor_cur = provider.storage_hashed_cursor(stor_addr, 4).expect("stor cur"); + let mut acc_trie_cur = provider.account_trie_cursor(4).expect("acc trie cur"); + let mut stor_trie_cur = provider.storage_trie_cursor(stor_addr, 4).expect("stor trie cur"); // Check these histories have been removed let pruned_hashed_account = a1; @@ -544,17 +607,20 @@ mod tests { #[tokio::test] async fn run_inner_where_latest_block_is_none() { let dir = TempDir::new().unwrap(); - let store: Arc = - Arc::new(MdbxProofsStorage::new(dir.path()).expect("env")); + let store: OpProofsStorage> = + OpProofsStorage::from(Arc::new(MdbxProofsStorageV2::new(dir.path()).expect("env"))); - let earliest = get_earliest(&store); - let latest = get_latest(&store); - println!("{earliest:?} {latest:?}"); - assert!(earliest.is_none()); - assert!(latest.is_none()); + { + let provider = store.provider_ro().unwrap(); + let earliest = provider.get_earliest_block_number().unwrap(); + let latest = provider.get_latest_block_number().unwrap(); + println!("{:?} {:?}", earliest, latest); + 
assert!(earliest.is_none()); + assert!(latest.is_none()); + } let block_hash_reader = MockBlockHashReader::new(); - let pruner = OpProofStoragePruner::new(store, block_hash_reader, 10, 1000); + let pruner = OpProofStoragePruner::new(store, block_hash_reader, 10); let out = pruner.run_inner().expect("ok"); assert_eq!(out, PrunerOutput::default(), "should early-return default output"); } @@ -565,19 +631,28 @@ mod tests { use crate::BlockStateDiff; let dir = TempDir::new().unwrap(); - let store: Arc = - Arc::new(MdbxProofsStorage::new(dir.path()).expect("env")); + let store: OpProofsStorage> = + OpProofsStorage::from(Arc::new(MdbxProofsStorageV2::new(dir.path()).expect("env"))); // Write a single block to set *latest* only. - store_block(&store, block(3, B256::ZERO), BlockStateDiff::default()); + { + let provider = store.provider_rw().expect("provider_rw"); + provider + .store_trie_updates(block(3, B256::ZERO), BlockStateDiff::default()) + .expect("store b1"); + provider.commit().expect("commit"); + } - let earliest = get_earliest(&store); - let latest = get_latest(&store); - assert!(earliest.is_none(), "earliest must remain None"); - assert_eq!(latest.unwrap().0, 3); + { + let provider = store.provider_ro().unwrap(); + let earliest = provider.get_earliest_block_number().unwrap(); + let latest = provider.get_latest_block_number().unwrap(); + assert!(earliest.is_none(), "earliest must remain None"); + assert_eq!(latest.unwrap().0, 3); + } let block_hash_reader = MockBlockHashReader::new(); - let pruner = OpProofStoragePruner::new(store, block_hash_reader, 1, 1000); + let pruner = OpProofStoragePruner::new(store, block_hash_reader, 1); let out = pruner.run_inner().expect("ok"); assert_eq!(out, PrunerOutput::default(), "should early-return default output"); } @@ -588,28 +663,35 @@ mod tests { use crate::BlockStateDiff; let dir = TempDir::new().unwrap(); - let store: Arc = - Arc::new(MdbxProofsStorage::new(dir.path()).expect("env")); + let store: OpProofsStorage> = + 
OpProofsStorage::from(Arc::new(MdbxProofsStorageV2::new(dir.path()).expect("env"))); // Set earliest=4 explicitly let earliest_num = 4u64; let h4 = b256(4); - set_earliest(&store, earliest_num, h4); + { + let provider = store.provider_rw().expect("provider_rw"); + provider.set_earliest_block_number(earliest_num, h4).expect("set earliest"); - // Set latest=5 by storing block 5 - let b5 = block(5, h4); - store_block(&store, b5, BlockStateDiff::default()); + // Set latest=5 by storing block 5 + let b5 = block(5, h4); + provider.store_trie_updates(b5, BlockStateDiff::default()).expect("store b5"); + provider.commit().expect("commit"); + } // Sanity: earliest=4, latest=5 => interval=1 - let e = get_earliest(&store).unwrap(); - let l = get_latest(&store).unwrap(); - assert_eq!(e.0, 4); - assert_eq!(l.0, 5); + { + let provider = store.provider_ro().unwrap(); + let e = provider.get_earliest_block_number().unwrap().unwrap(); + let l = provider.get_latest_block_number().unwrap().unwrap(); + assert_eq!(e.0, 4); + assert_eq!(l.0, 5); + } // Require min_block_interval=2 (or greater) so interval < min let block_hash_reader = MockBlockHashReader::new(); - let pruner = OpProofStoragePruner::new(store, block_hash_reader, 2, 1000); + let pruner = OpProofStoragePruner::new(store, block_hash_reader, 2); let out = pruner.run_inner().expect("ok"); assert_eq!(out, PrunerOutput::default(), "no pruning should occur"); } -} +} \ No newline at end of file diff --git a/rust/op-reth/crates/trie/src/prune/task.rs b/rust/op-reth/crates/trie/src/prune/task.rs index 9f5b90a72b389..b7dd6e54b858e 100644 --- a/rust/op-reth/crates/trie/src/prune/task.rs +++ b/rust/op-reth/crates/trie/src/prune/task.rs @@ -1,4 +1,4 @@ -use crate::{OpProofsStore, prune::OpProofStoragePruner}; +use crate::{prune::OpProofStoragePruner, OpProofsStore}; use reth_provider::BlockHashReader; use reth_tasks::shutdown::GracefulShutdown; use tokio::{ @@ -7,30 +7,28 @@ use tokio::{ }; use tracing::info; -const PRUNE_BATCH_SIZE: 
u64 = 200; - /// Periodic pruner task: constructs the pruner and runs it every interval. #[derive(Debug)] -pub struct OpProofStoragePrunerTask { - pruner: OpProofStoragePruner, +pub struct OpProofStoragePrunerTask { + pruner: OpProofStoragePruner, min_block_interval: u64, task_run_interval: Duration, } -impl OpProofStoragePrunerTask +impl OpProofStoragePrunerTask where - P: OpProofsStore, + S: OpProofsStore, H: BlockHashReader, { /// Initialize a new [`OpProofStoragePrunerTask`] pub fn new( - provider: P, + store: S, hash_reader: H, min_block_interval: u64, task_run_interval: Duration, ) -> Self { let pruner = - OpProofStoragePruner::new(provider, hash_reader, min_block_interval, PRUNE_BATCH_SIZE); + OpProofStoragePruner::new(store, hash_reader, min_block_interval); Self { pruner, min_block_interval, task_run_interval } } diff --git a/rust/op-reth/crates/trie/src/state.rs b/rust/op-reth/crates/trie/src/state.rs new file mode 100644 index 0000000000000..cd2f9ca32485b --- /dev/null +++ b/rust/op-reth/crates/trie/src/state.rs @@ -0,0 +1,213 @@ +//! State management for the live trie collector. + +use crate::{ + overlay_provider::MemoryOverlayOpProofsStateProviderRef, provider::OpProofsStateProviderRef, + BlockStateDiff, OpProofsProviderRO, +}; +use alloy_eips::eip1898::BlockWithParent; +use alloy_primitives::{map::HashMap, B256}; +use parking_lot::RwLock; +use std::{collections::BTreeMap, sync::Arc}; + +/// Buffer for holding blocks waiting to be persisted. +/// +/// This acts as the in-memory "tip" of the chain for the trie calculator. +#[derive(Debug, Default)] +pub(crate) struct InMemoryState { + /// All blocks that are not on disk yet. + pub(crate) blocks: RwLock>>, + /// Mapping of block numbers to block hashes. + pub(crate) numbers: RwLock>, +} + +impl InMemoryState { + /// Create a new empty in-memory state. 
+ pub(crate) fn new() -> Self { + Self { + blocks: RwLock::new(HashMap::default()), + numbers: RwLock::new(BTreeMap::new()), + } + } + + /// Insert a block into the buffer. + pub(crate) fn insert(&self, block: BlockWithParent, diff: BlockStateDiff) { + let hash = block.block.hash; + let number = block.block.number; + let state = Arc::new((block, diff)); + + // Write locks + let mut blocks = self.blocks.write(); + let mut numbers = self.numbers.write(); + + blocks.insert(hash, state); + numbers.insert(number, hash); + } + + /// Returns the number of buffered blocks. + pub(crate) fn len(&self) -> usize { + self.blocks.read().len() + } + + /// Returns true if the buffer is empty. + pub(crate) fn is_empty(&self) -> bool { + self.blocks.read().is_empty() + } + + /// Clear the buffer. + pub(crate) fn clear(&self) { + let mut blocks = self.blocks.write(); + let mut numbers = self.numbers.write(); + blocks.clear(); + numbers.clear(); + } + + /// Prunes blocks from the buffer that are strictly before the given block number. + pub(crate) fn prune_before(&self, number: u64) { + let mut blocks = self.blocks.write(); + let mut numbers = self.numbers.write(); + + // Identify block numbers to remove + let mut to_remove = Vec::new(); + // Use BTreeMap's ordered nature + for (&num, &hash) in numbers.iter() { + if num < number { + to_remove.push((num, hash)); + } else { + break; + } + } + + for (num, hash) in to_remove { + numbers.remove(&num); + blocks.remove(&hash); + } + } + + /// Removes blocks starting from `from` (inclusive) through the tip. + /// + /// Mirrors the disk `unwind_history(to)` semantics where `to.block.number` is the + /// first block removed. After this call, only blocks with number < `from` remain. 
+    pub(crate) fn unwind(&self, from: u64) {
+        let mut blocks = self.blocks.write();
+        let mut numbers = self.numbers.write();
+
+        let mut to_remove = Vec::new();
+        for (&num, &hash) in numbers.iter().rev() {
+            if num >= from {
+                to_remove.push((num, hash));
+            } else {
+                break;
+            }
+        }
+
+        for (num, hash) in to_remove {
+            numbers.remove(&num);
+            blocks.remove(&hash);
+        }
+    }
+
+    /// Returns the state for a given block hash.
+    pub(crate) fn state_by_hash(&self, hash: B256) -> Option<Arc<(BlockWithParent, BlockStateDiff)>> {
+        self.blocks.read().get(&hash).cloned()
+    }
+
+    /// Returns the hash for a specific block number
+    pub(crate) fn hash_by_number(&self, number: u64) -> Option<B256> {
+        self.numbers.read().get(&number).copied()
+    }
+
+    /// Returns the state for a given block number.
+    pub(crate) fn state_by_number(&self, number: u64) -> Option<Arc<(BlockWithParent, BlockStateDiff)>> {
+        let hash = self.hash_by_number(number)?;
+        self.state_by_hash(hash)
+    }
+}
+
+/// Manager for the in-memory state of the live trie.
+#[derive(Debug, Clone, Default)]
+pub struct LiveTrieState {
+    inner: Arc<InMemoryState>,
+}
+
+impl LiveTrieState {
+    /// Create a new live trie state manager.
+    pub fn new() -> Self {
+        Self {
+            inner: Arc::new(InMemoryState::new()),
+        }
+    }
+
+    /// Insert a block into the buffer.
+    pub fn insert(&self, block: BlockWithParent, diff: BlockStateDiff) {
+        self.inner.insert(block, diff);
+    }
+
+    /// Returns the number of buffered blocks.
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    /// Returns true if the buffer is empty.
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Clear the buffer.
+    pub fn clear(&self) {
+        self.inner.clear();
+    }
+
+    /// Return a reference to the inner in-memory state.
+    pub(crate) fn inner(&self) -> Arc<InMemoryState> {
+        self.inner.clone()
+    }
+
+    /// Prunes blocks from the buffer that are strictly before the given block number.
+    pub fn prune_before(&self, number: u64) {
+        self.inner.prune_before(number);
+    }
+
+    /// Removes blocks starting from `from` (inclusive) through the tip.
+    pub fn unwind(&self, from: u64) {
+        self.inner.unwind(from);
+    }
+
+    /// Returns the state for a given block hash.
+    pub fn state_by_hash(&self, hash: B256) -> Option<Arc<(BlockWithParent, BlockStateDiff)>> {
+        self.inner.state_by_hash(hash)
+    }
+
+    /// Returns the state for a given block number.
+    pub fn state_by_number(&self, number: u64) -> Option<Arc<(BlockWithParent, BlockStateDiff)>> {
+        self.inner.state_by_number(number)
+    }
+
+    /// Return state provider with reference to in-memory blocks that overlay storage state.
+    ///
+    /// This retrieves the chain of blocks ending at `hash` from the in-memory buffer,
+    /// providing a view that includes both the buffered changes and the underlying disk state.
+    pub fn state_provider<'a, P>(
+        &self,
+        hash: B256,
+        inner: OpProofsStateProviderRef<'a, P>,
+    ) -> MemoryOverlayOpProofsStateProviderRef<'a, P>
+    where
+        P: OpProofsProviderRO + Clone,
+    {
+        let mut in_memory = Vec::new();
+        let blocks = self.inner.blocks.read();
+
+        // Trace back from the requested hash until no parent is found in memory.
+        let mut current_hash = hash;
+        while let Some(state) = blocks.get(&current_hash) {
+            in_memory.push(state.clone());
+            current_hash = state.0.parent;
+        }
+
+        // The vector is currently Newest -> Oldest. Reverse it to Oldest -> Newest
+        // as expected by the overlay provider for correct replay.
+ in_memory.reverse(); + + MemoryOverlayOpProofsStateProviderRef::new(inner, in_memory) + } +} diff --git a/rust/op-reth/crates/trie/tests/live.rs b/rust/op-reth/crates/trie/tests/live.rs index 967653505d118..aa2eadf1d0634 100644 --- a/rust/op-reth/crates/trie/tests/live.rs +++ b/rust/op-reth/crates/trie/tests/live.rs @@ -12,8 +12,8 @@ use reth_evm::{ConfigureEvm, execute::Executor}; use reth_evm_ethereum::EthEvmConfig; use reth_node_api::{NodePrimitives, NodeTypesWithDB}; use reth_optimism_trie::{ - MdbxProofsStorage, MdbxProofsStorageV2, OpProofsStorage, OpProofsStorageError, - OpProofsStore, initialize::InitializationJob, live::LiveTrieCollector, + MdbxProofsStorage, MdbxProofsStorageV2, OpProofStoragePruner, OpProofsStorage, + OpProofsStorageError, OpProofsStore, initialize::InitializationJob, live::LiveTrieCollector, }; use reth_primitives_traits::{Block as _, RecoveredBlock}; use reth_provider::{ @@ -272,8 +272,14 @@ where initialization_job.run(last_block_number, last_block_hash)?; } - // Execute blocks after initialization using live collector + // Execute blocks after initialization using live collector. + // A single collector is shared across all blocks so the in-memory buffer accumulates + // state between iterations (the new async-persistence architecture requires this). 
let evm_config = EthEvmConfig::ethereum(chain_spec.clone()); + let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let pruner = OpProofStoragePruner::new(storage.clone(), blockchain_db.clone(), 1000); + let live_trie_collector = + LiveTrieCollector::new(evm_config, blockchain_db, storage.clone(), pruner); for (idx, block_spec) in scenario.blocks_after_initialization.iter().enumerate() { let block_number = last_block_number + idx as u64 + 1; @@ -289,11 +295,6 @@ where // Execute the block to get the correct state root let execution_output = execute_block(&mut block, &provider_factory, &chain_spec)?; - // Create a fresh blockchain provider to ensure it sees all committed blocks - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; - let live_trie_collector = - LiveTrieCollector::new(evm_config.clone(), blockchain_db, &storage); - // Use the live collector to execute and store trie updates live_trie_collector.execute_and_store_block_updates(&block)?; @@ -303,6 +304,9 @@ where last_block_hash = block.hash(); } + // Drain any pending in-memory blocks to disk before returning. + live_trie_collector.wait_for_persistence(); + Ok(()) } @@ -388,14 +392,20 @@ where ); let blockchain_db = BlockchainProvider::new(provider_factory).unwrap(); + #[allow(clippy::useless_conversion)] let storage_wrapped: OpProofsStorage = storage.into(); - let collector = - LiveTrieCollector::new(EthEvmConfig::ethereum(chain_spec.clone()), blockchain_db, &storage_wrapped); + let pruner = OpProofStoragePruner::new(storage_wrapped.clone(), blockchain_db.clone(), 1000); + let collector = LiveTrieCollector::new( + EthEvmConfig::ethereum(chain_spec.clone()), + blockchain_db, + storage_wrapped, + pruner, + ); - // EXPECT: MissingParentBlock + // EXPECT: OutOfOrder (parent hash doesn't match the current tip) let err = collector.execute_and_store_block_updates(&incorrect_block).unwrap_err(); - assert!(matches!(err, OpProofsStorageError::MissingParentBlock { .. 
}));
+    assert!(matches!(err, OpProofsStorageError::OutOfOrder { .. }));
 
     Ok(())
 }
 
@@ -433,9 +443,15 @@ where
     // Generate a second block normally
     let blockchain_db = BlockchainProvider::new(provider_factory.clone()).unwrap();
+    #[allow(clippy::useless_conversion)]
     let storage_wrapped: OpProofsStorage<Arc<MdbxProofsStorage>> = storage.into();
-    let collector =
-        LiveTrieCollector::new(EthEvmConfig::ethereum(chain_spec.clone()), blockchain_db, &storage_wrapped);
+    let pruner = OpProofStoragePruner::new(storage_wrapped.clone(), blockchain_db.clone(), 1000);
+    let collector = LiveTrieCollector::new(
+        EthEvmConfig::ethereum(chain_spec.clone()),
+        blockchain_db,
+        storage_wrapped,
+        pruner,
+    );
 
     // Create the next block
     let mut nonce_counter = 0;