From 0bf020bc80f66ec34fb9baa51cceaebfc39c2709 Mon Sep 17 00:00:00 2001 From: frisitano Date: Wed, 18 Feb 2026 19:43:56 +0400 Subject: [PATCH] remote source startup refactor --- crates/node/Cargo.toml | 2 +- crates/node/src/add_ons/mod.rs | 1 + .../node/src/add_ons/remote_block_source.rs | 82 +++++++++++++------ crates/node/src/test_utils/fixture.rs | 14 +++- crates/node/src/test_utils/reboot.rs | 39 +++++++-- crates/node/tests/remote_block_source.rs | 53 ++++++++++++ 6 files changed, 158 insertions(+), 33 deletions(-) diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 3847bc19..8f8f9d3c 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -56,6 +56,7 @@ reth-node-types.workspace = true reth-network.workspace = true reth-network-api.workspace = true reth-network-p2p.workspace = true +reth-provider.workspace = true reth-revm.workspace = true reth-rpc-api.workspace = true reth-rpc-eth-api.workspace = true @@ -86,7 +87,6 @@ reth-db = { workspace = true, optional = true, features = ["test-utils"] } reth-e2e-test-utils = { workspace = true, optional = true } reth-engine-local = { workspace = true, optional = true } reth-fs-util = { workspace = true, optional = true } -reth-provider = { workspace = true, optional = true } reth-rpc-layer = { workspace = true, optional = true } reth-rpc-server-types = { workspace = true, optional = true } reth-storage-api = { workspace = true, optional = true } diff --git a/crates/node/src/add_ons/mod.rs b/crates/node/src/add_ons/mod.rs index cda0e7f8..50b9410d 100644 --- a/crates/node/src/add_ons/mod.rs +++ b/crates/node/src/add_ons/mod.rs @@ -171,6 +171,7 @@ where let remote_source = RemoteBlockSourceAddOn::new( remote_block_source_config, rollup_manager_handle.clone(), + rpc_handle.provider().clone(), ) .await?; ctx.node diff --git a/crates/node/src/add_ons/remote_block_source.rs b/crates/node/src/add_ons/remote_block_source.rs index 29802aa7..a6adbfe0 100644 --- a/crates/node/src/add_ons/remote_block_source.rs +++ b/crates/node/src/add_ons/remote_block_source.rs @@ -3,11 +3,12 @@ use crate::args::RemoteBlockSourceArgs; use alloy_primitives::Signature; -use alloy_provider::{Provider, ProviderBuilder}; +use alloy_provider::{Provider, ProviderBuilder, RootProvider}; use alloy_rpc_client::RpcClient; use alloy_transport::layers::RetryBackoffLayer; use futures::StreamExt; use reth_network_api::{FullNetwork, PeerId}; +use reth_provider::BlockReader; use reth_scroll_node::ScrollNetworkPrimitives; use reth_tasks::shutdown::Shutdown; use reth_tokio_util::EventStream; @@ -26,7 +27,12 @@ where /// Configuration for the remote block source. config: RemoteBlockSourceArgs, /// Handle to the chain orchestrator for sending commands. - handle: ChainOrchestratorHandle, + orchestrator_handle: ChainOrchestratorHandle, + /// An event stream for listening to chain orchestrator events, used to wait for block build + /// completion. + events: EventStream, + /// A provider for the remote node, used to fetch blocks and block information. + remote: RootProvider, /// Tracks the last block number we imported from remote. /// This is different from local head because we build blocks on top of imports. last_imported_block: u64, @@ -40,25 +46,19 @@ where pub async fn new( config: RemoteBlockSourceArgs, handle: ChainOrchestratorHandle, + provider: impl BlockReader, ) -> eyre::Result { - let last_imported_block = handle.status().await?.l2.fcs.head_block_info().number; - Ok(Self { config, handle, last_imported_block }) - } - - /// Runs the remote block source until shutdown. - pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> { - let Some(url) = self.config.url.clone() else { + // Build remote provider with retry layer. + let Some(url) = config.url.clone() else { tracing::error!(target: "scroll::remote_source", "URL required when remote-source is enabled"); return Err(eyre::eyre!("URL required when remote-source is enabled")); }; - - // Build remote provider with retry layer let retry_layer = RetryBackoffLayer::new(10, 100, 330); let client = RpcClient::builder().layer(retry_layer).http(url); let remote = ProviderBuilder::<_, _, Scroll>::default().connect_client(client); // Get event listener for waiting on block completion - let mut event_stream = match self.handle.get_event_listener().await { + let events = match handle.get_event_listener().await { Ok(stream) => stream, Err(e) => { tracing::error!(target: "scroll::remote_source", ?e, "Failed to get event listener"); @@ -66,6 +66,44 @@ where } }; + // Determine the last imported block by finding the highest common block + // between the local chain and the remote node. + let local_head = provider.best_block_number()?; + let remote_head = remote.get_block_number().await?; + + let last_imported_block; + let mut search = local_head.min(remote_head); + loop { + if search == 0 { + // Genesis is always a common block (same chain spec assumed). + last_imported_block = 0; + break; + } + let local_hash = provider.block_hash(search)?; + let remote_block = remote.get_block_by_number(search.into()).await?; + match (local_hash, remote_block) { + (Some(lh), Some(rb)) if lh == rb.header.hash => { + last_imported_block = search; + break; + } + _ => { + search = search.saturating_sub(1); + } + } + } + tracing::info!( + target: "scroll::remote_source", + last_imported_block, + local_head, + remote_head, + "Determined highest common block with remote" + ); + + Ok(Self { config, orchestrator_handle: handle, events, remote, last_imported_block }) + } + + /// Runs the remote block source until shutdown. + pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> { let mut poll_interval = interval(Duration::from_millis(self.config.poll_interval_ms)); loop { @@ -73,7 +111,7 @@ where biased; _guard = &mut shutdown => break, _ = poll_interval.tick() => { - if let Err(e) = self.follow_and_build(&remote, &mut event_stream).await { + if let Err(e) = self.follow_and_build().await { tracing::error!(target: "scroll::remote_source", ?e, "Sync error"); } } @@ -84,14 +122,11 @@ where } /// Follows the remote node and builds blocks on top of imported blocks. - async fn follow_and_build>( - &mut self, - remote: &P, - event_stream: &mut EventStream, - ) -> eyre::Result<()> { + async fn follow_and_build(&mut self) -> eyre::Result<()> { loop { // Get remote head - let remote_block = remote + let remote_block = self + .remote .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest) .full() .await? @@ -117,7 +152,8 @@ where // Fetch and import the next block from remote let next_block_num = self.last_imported_block + 1; - let block = remote + let block = self + .remote .get_block_by_number(next_block_num.into()) .full() .await? @@ -134,7 +170,7 @@ where // Import the block (this will cause a reorg if we had a locally built block at this // height) - let chain_import = match self.handle.import_block(block_with_peer).await { + let chain_import = match self.orchestrator_handle.import_block(block_with_peer).await { Ok(Ok(chain_import)) => { self.last_imported_block = next_block_num; chain_import @@ -155,12 +191,12 @@ where } // Trigger block building on top of the imported block - self.handle.build_block(); + self.orchestrator_handle.build_block(); // Wait for BlockSequenced event tracing::debug!(target: "scroll::remote_source", "Waiting for block to be built..."); loop { - match event_stream.next().await { + match self.events.next().await { Some(ChainOrchestratorEvent::BlockSequenced(block)) => { tracing::info!(target: "scroll::remote_source", block_number = block.header.number, diff --git a/crates/node/src/test_utils/fixture.rs b/crates/node/src/test_utils/fixture.rs index 85a83d6f..58d2eb4c 100644 --- a/crates/node/src/test_utils/fixture.rs +++ b/crates/node/src/test_utils/fixture.rs @@ -61,6 +61,8 @@ pub struct TestFixture { pub anvil: Option, /// The configuration for the nodes. pub config: ScrollRollupNodeConfig, + /// Whether this fixture has a remote source node (always the last node). + pub has_remote_source_node: bool, } impl Debug for TestFixture { @@ -70,6 +72,7 @@ impl Debug for TestFixture { .field("wallet", &">") .field("chain_spec", &self.chain_spec) .field("anvil", &self.anvil.is_some()) + .field("has_remote_source_node", &self.has_remote_source_node) .field("_tasks", &"") .finish() } @@ -703,7 +706,7 @@ impl TestFixtureBuilder { None }; - let (mut nodes, dbs, wallet) = setup_engine( + let (mut nodes, mut dbs, wallet) = setup_engine( self.config.clone(), self.num_nodes, chain_spec.clone(), @@ -728,7 +731,7 @@ impl TestFixtureBuilder { // Use a fast poll interval for tests remote_config.remote_block_source_args.poll_interval_ms = 100; - let (mut remote_nodes, _, _) = setup_engine( + let (mut remote_nodes, remote_dbs, _) = setup_engine( remote_config, 1, chain_spec.clone(), @@ -739,13 +742,15 @@ impl TestFixtureBuilder { .await?; nodes.push(remote_nodes.pop().unwrap()); + dbs.extend(remote_dbs); } - let mut node_handles = Vec::with_capacity(nodes.len()); + let nodes_len = nodes.len(); + let mut node_handles = Vec::with_capacity(nodes_len); for (index, node) in nodes.into_iter().enumerate() { let typ = if self.config.sequencer_args.sequencer_enabled && index == 0 { NodeType::Sequencer - } else if self.config.remote_block_source_args.enabled && index == node_handles.len() { + } else if self.has_remote_source_node && index == nodes_len - 1 { NodeType::RemoteSource } else { NodeType::Follower @@ -760,6 +765,7 @@ impl TestFixtureBuilder { chain_spec, anvil, config: self.config, + has_remote_source_node: self.has_remote_source_node, }) } diff --git a/crates/node/src/test_utils/reboot.rs b/crates/node/src/test_utils/reboot.rs index 698753fa..cb9591aa 100644 --- a/crates/node/src/test_utils/reboot.rs +++ b/crates/node/src/test_utils/reboot.rs @@ -93,14 +93,43 @@ impl TestFixture { tracing::info!("Starting node at index {} (reusing database)", node_index); + // Detect whether the node being restarted is the remote source (always the last index). + let is_remote = self.has_remote_source_node && node_index == self.nodes.len() - 1; + + let (config, reboot_node_idx) = if is_remote { + // Reconstruct the remote config from the base config. + // Node 0 (sequencer) must still be running so we can read its RPC port. + let sequencer_port = self.nodes[0] + .as_ref() + .expect("sequencer must be running to restart remote source") + .node + .rpc_url() + .port() + .expect("sequencer RPC port must be set"); + let sequencer_url: reqwest::Url = + format!("http://localhost:{}", sequencer_port).parse()?; + + let mut remote_config = self.config.clone(); + remote_config.sequencer_args.sequencer_enabled = true; + remote_config.sequencer_args.auto_start = false; + remote_config.remote_block_source_args.enabled = true; + remote_config.remote_block_source_args.url = Some(sequencer_url); + remote_config.remote_block_source_args.poll_interval_ms = 100; + + // Use reboot_node_idx=0 so setup_engine does NOT disable sequencer_enabled. + (remote_config, 0usize) + } else { + (self.config.clone(), node_index) + }; + // Create node instance with existing database let (mut new_nodes, _, _) = setup_engine( - self.config.clone(), + config, 1, self.chain_spec.clone(), true, false, - Some((node_index, self.dbs[node_index].clone())), + Some((reboot_node_idx, self.dbs[node_index].clone())), ) .await?; @@ -109,10 +138,10 @@ impl TestFixture { } let new_node = new_nodes.remove(0); - let typ = if self.config.sequencer_args.sequencer_enabled && node_index == 0 { - crate::test_utils::fixture::NodeType::Sequencer - } else if self.config.remote_block_source_args.enabled && node_index == self.nodes.len() { + let typ = if is_remote { crate::test_utils::fixture::NodeType::RemoteSource + } else if self.config.sequencer_args.sequencer_enabled && node_index == 0 { + crate::test_utils::fixture::NodeType::Sequencer } else { crate::test_utils::fixture::NodeType::Follower }; diff --git a/crates/node/tests/remote_block_source.rs b/crates/node/tests/remote_block_source.rs index eb6cf734..0b3cb014 100644 --- a/crates/node/tests/remote_block_source.rs +++ b/crates/node/tests/remote_block_source.rs @@ -23,3 +23,56 @@ async fn test_remote_block_source() -> eyre::Result<()> { Ok(()) } + +/// Test that the remote block source correctly determines its resume point on restart. +/// +/// The remote source's local chain has blocks 1-3 (imported from sequencer) plus +/// block 4 (built locally). The sequencer goes on to produce blocks 4-6. On restart, +/// the highest-common-block walk must identify block 3 (locally-built block 4 diverges +/// from sequencer's block 4) and import only blocks 4-6. +/// +/// If the detection were broken (e.g. always returning 0), the remote source would try +/// to re-import blocks 1-6, producing 6 `BlockSequenced` events before reaching blocks +/// 5, 6, 7. This test asserts exactly three events in the correct order, confirming +/// the resume point is block 3. +#[allow(clippy::large_stack_frames)] +#[tokio::test] +async fn test_remote_block_source_resumes_from_correct_head() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + let mut fixture = TestFixture::builder().sequencer().remote_source_node().build().await?; + + fixture.l1().sync().await?; + + // Sequencer produces blocks 1-3; remote source imports each and builds on top. + // After this phase the remote source local chain is: 1, 2, 3 (sequencer) + 4 (local). + for i in 1..=3u64 { + fixture.build_block().expect_block_number(i).build_and_await_block().await?; + fixture.expect_event_on(1).block_sequenced(i + 1).await?; + } + + // Shut down the remote source node (index 1). + fixture.shutdown_node(1).await?; + + // Sequencer produces blocks 4-6 while the remote source is offline. + for i in 4..=6u64 { + fixture.build_block().expect_block_number(i).build_and_await_block().await?; + } + + // Restart the remote source. + // Expected detection: local_head=4, remote_head=6, min=4. + // Block 4: local hash (locally built) ≠ remote hash (sequencer's) → walk back. + // Block 3: local hash == remote hash → last_imported_block = 3. + // The add-on should therefore import blocks 4, 5, 6 and build 5, 6, 7 on top. + fixture.start_node(1).await?; + + // Synchronise L1 state on the restarted remote source node. + fixture.l1().for_node(1).sync().await?; + + // Verify the remote source catches up with the 3 missed sequencer blocks. + fixture.expect_event_on(1).block_sequenced(5).await?; + fixture.expect_event_on(1).block_sequenced(6).await?; + fixture.expect_event_on(1).block_sequenced(7).await?; + + Ok(()) +}