Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ reth-node-types.workspace = true
reth-network.workspace = true
reth-network-api.workspace = true
reth-network-p2p.workspace = true
reth-provider.workspace = true
reth-revm.workspace = true
reth-rpc-api.workspace = true
reth-rpc-eth-api.workspace = true
Expand Down Expand Up @@ -86,7 +87,6 @@ reth-db = { workspace = true, optional = true, features = ["test-utils"] }
reth-e2e-test-utils = { workspace = true, optional = true }
reth-engine-local = { workspace = true, optional = true }
reth-fs-util = { workspace = true, optional = true }
reth-provider = { workspace = true, optional = true }
reth-rpc-layer = { workspace = true, optional = true }
reth-rpc-server-types = { workspace = true, optional = true }
reth-storage-api = { workspace = true, optional = true }
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/add_ons/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ where
let remote_source = RemoteBlockSourceAddOn::new(
remote_block_source_config,
rollup_manager_handle.clone(),
rpc_handle.provider().clone(),
)
.await?;
ctx.node
Expand Down
82 changes: 59 additions & 23 deletions crates/node/src/add_ons/remote_block_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

use crate::args::RemoteBlockSourceArgs;
use alloy_primitives::Signature;
use alloy_provider::{Provider, ProviderBuilder};
use alloy_provider::{Provider, ProviderBuilder, RootProvider};
use alloy_rpc_client::RpcClient;
use alloy_transport::layers::RetryBackoffLayer;
use futures::StreamExt;
use reth_network_api::{FullNetwork, PeerId};
use reth_provider::BlockReader;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tasks::shutdown::Shutdown;
use reth_tokio_util::EventStream;
Expand All @@ -26,7 +27,12 @@ where
/// Configuration for the remote block source.
config: RemoteBlockSourceArgs,
/// Handle to the chain orchestrator for sending commands.
handle: ChainOrchestratorHandle<N>,
orchestrator_handle: ChainOrchestratorHandle<N>,
/// An event stream for listening to chain orchestrator events, used to wait for block build
/// completion.
events: EventStream<ChainOrchestratorEvent>,
/// A provider for the remote node, used to fetch blocks and block information.
remote: RootProvider<Scroll>,
/// Tracks the last block number we imported from remote.
/// This is different from local head because we build blocks on top of imports.
last_imported_block: u64,
Expand All @@ -40,40 +46,72 @@ where
pub async fn new(
config: RemoteBlockSourceArgs,
handle: ChainOrchestratorHandle<N>,
provider: impl BlockReader,
) -> eyre::Result<Self> {
let last_imported_block = handle.status().await?.l2.fcs.head_block_info().number;
Ok(Self { config, handle, last_imported_block })
}

/// Runs the remote block source until shutdown.
pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> {
let Some(url) = self.config.url.clone() else {
// Build remote provider with retry layer.
let Some(url) = config.url.clone() else {
tracing::error!(target: "scroll::remote_source", "URL required when remote-source is enabled");
return Err(eyre::eyre!("URL required when remote-source is enabled"));
};

// Build remote provider with retry layer
let retry_layer = RetryBackoffLayer::new(10, 100, 330);
let client = RpcClient::builder().layer(retry_layer).http(url);
let remote = ProviderBuilder::<_, _, Scroll>::default().connect_client(client);

// Get event listener for waiting on block completion
let mut event_stream = match self.handle.get_event_listener().await {
let events = match handle.get_event_listener().await {
Ok(stream) => stream,
Err(e) => {
tracing::error!(target: "scroll::remote_source", ?e, "Failed to get event listener");
return Err(eyre::eyre!(e));
}
};

// Determine the last imported block by finding the highest common block
// between the local chain and the remote node.
let local_head = provider.best_block_number()?;
let remote_head = remote.get_block_number().await?;

let last_imported_block;
let mut search = local_head.min(remote_head);
loop {
if search == 0 {
// Genesis is always a common block (same chain spec assumed).
last_imported_block = 0;
break;
}
let local_hash = provider.block_hash(search)?;
let remote_block = remote.get_block_by_number(search.into()).await?;
match (local_hash, remote_block) {
(Some(lh), Some(rb)) if lh == rb.header.hash => {
last_imported_block = search;
break;
}
_ => {
search = search.saturating_sub(1);
}
}
}
tracing::info!(
target: "scroll::remote_source",
last_imported_block,
local_head,
remote_head,
"Determined highest common block with remote"
);

Ok(Self { config, orchestrator_handle: handle, events, remote, last_imported_block })
}

/// Runs the remote block source until shutdown.
pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> {
let mut poll_interval = interval(Duration::from_millis(self.config.poll_interval_ms));

loop {
tokio::select! {
biased;
_guard = &mut shutdown => break,
_ = poll_interval.tick() => {
if let Err(e) = self.follow_and_build(&remote, &mut event_stream).await {
if let Err(e) = self.follow_and_build().await {
tracing::error!(target: "scroll::remote_source", ?e, "Sync error");
}
}
Expand All @@ -84,14 +122,11 @@ where
}

/// Follows the remote node and builds blocks on top of imported blocks.
async fn follow_and_build<P: Provider<Scroll>>(
&mut self,
remote: &P,
event_stream: &mut EventStream<ChainOrchestratorEvent>,
) -> eyre::Result<()> {
async fn follow_and_build(&mut self) -> eyre::Result<()> {
loop {
// Get remote head
let remote_block = remote
let remote_block = self
.remote
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
.full()
.await?
Expand All @@ -117,7 +152,8 @@ where

// Fetch and import the next block from remote
let next_block_num = self.last_imported_block + 1;
let block = remote
let block = self
.remote
.get_block_by_number(next_block_num.into())
.full()
.await?
Expand All @@ -134,7 +170,7 @@ where

// Import the block (this will cause a reorg if we had a locally built block at this
// height)
let chain_import = match self.handle.import_block(block_with_peer).await {
let chain_import = match self.orchestrator_handle.import_block(block_with_peer).await {
Ok(Ok(chain_import)) => {
self.last_imported_block = next_block_num;
chain_import
Expand All @@ -155,12 +191,12 @@ where
}

// Trigger block building on top of the imported block
self.handle.build_block();
self.orchestrator_handle.build_block();

// Wait for BlockSequenced event
tracing::debug!(target: "scroll::remote_source", "Waiting for block to be built...");
loop {
match event_stream.next().await {
match self.events.next().await {
Some(ChainOrchestratorEvent::BlockSequenced(block)) => {
tracing::info!(target: "scroll::remote_source",
block_number = block.header.number,
Expand Down
14 changes: 10 additions & 4 deletions crates/node/src/test_utils/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct TestFixture {
pub anvil: Option<anvil::NodeHandle>,
/// The configuration for the nodes.
pub config: ScrollRollupNodeConfig,
/// Whether this fixture has a remote source node (always the last node).
pub has_remote_source_node: bool,
}

impl Debug for TestFixture {
Expand All @@ -70,6 +72,7 @@ impl Debug for TestFixture {
.field("wallet", &"<Mutex<Wallet>>")
.field("chain_spec", &self.chain_spec)
.field("anvil", &self.anvil.is_some())
.field("has_remote_source_node", &self.has_remote_source_node)
.field("_tasks", &"<TaskManager>")
.finish()
}
Expand Down Expand Up @@ -703,7 +706,7 @@ impl TestFixtureBuilder {
None
};

let (mut nodes, dbs, wallet) = setup_engine(
let (mut nodes, mut dbs, wallet) = setup_engine(
self.config.clone(),
self.num_nodes,
chain_spec.clone(),
Expand All @@ -728,7 +731,7 @@ impl TestFixtureBuilder {
// Use a fast poll interval for tests
remote_config.remote_block_source_args.poll_interval_ms = 100;

let (mut remote_nodes, _, _) = setup_engine(
let (mut remote_nodes, remote_dbs, _) = setup_engine(
remote_config,
1,
chain_spec.clone(),
Expand All @@ -739,13 +742,15 @@ impl TestFixtureBuilder {
.await?;

nodes.push(remote_nodes.pop().unwrap());
dbs.extend(remote_dbs);
}

let mut node_handles = Vec::with_capacity(nodes.len());
let nodes_len = nodes.len();
let mut node_handles = Vec::with_capacity(nodes_len);
for (index, node) in nodes.into_iter().enumerate() {
let typ = if self.config.sequencer_args.sequencer_enabled && index == 0 {
NodeType::Sequencer
} else if self.config.remote_block_source_args.enabled && index == node_handles.len() {
} else if self.has_remote_source_node && index == nodes_len - 1 {
NodeType::RemoteSource
} else {
NodeType::Follower
Expand All @@ -760,6 +765,7 @@ impl TestFixtureBuilder {
chain_spec,
anvil,
config: self.config,
has_remote_source_node: self.has_remote_source_node,
})
}

Expand Down
39 changes: 34 additions & 5 deletions crates/node/src/test_utils/reboot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,43 @@ impl TestFixture {

tracing::info!("Starting node at index {} (reusing database)", node_index);

// Detect whether the node being restarted is the remote source (always the last index).
let is_remote = self.has_remote_source_node && node_index == self.nodes.len() - 1;

let (config, reboot_node_idx) = if is_remote {
// Reconstruct the remote config from the base config.
// Node 0 (sequencer) must still be running so we can read its RPC port.
let sequencer_port = self.nodes[0]
.as_ref()
.expect("sequencer must be running to restart remote source")
.node
.rpc_url()
.port()
.expect("sequencer RPC port must be set");
let sequencer_url: reqwest::Url =
format!("http://localhost:{}", sequencer_port).parse()?;

let mut remote_config = self.config.clone();
remote_config.sequencer_args.sequencer_enabled = true;
remote_config.sequencer_args.auto_start = false;
remote_config.remote_block_source_args.enabled = true;
remote_config.remote_block_source_args.url = Some(sequencer_url);
remote_config.remote_block_source_args.poll_interval_ms = 100;

// Use reboot_node_idx=0 so setup_engine does NOT disable sequencer_enabled.
(remote_config, 0usize)
} else {
(self.config.clone(), node_index)
};

// Create node instance with existing database
let (mut new_nodes, _, _) = setup_engine(
self.config.clone(),
config,
1,
self.chain_spec.clone(),
true,
false,
Some((node_index, self.dbs[node_index].clone())),
Some((reboot_node_idx, self.dbs[node_index].clone())),
)
.await?;

Expand All @@ -109,10 +138,10 @@ impl TestFixture {
}

let new_node = new_nodes.remove(0);
let typ = if self.config.sequencer_args.sequencer_enabled && node_index == 0 {
crate::test_utils::fixture::NodeType::Sequencer
} else if self.config.remote_block_source_args.enabled && node_index == self.nodes.len() {
let typ = if is_remote {
crate::test_utils::fixture::NodeType::RemoteSource
} else if self.config.sequencer_args.sequencer_enabled && node_index == 0 {
crate::test_utils::fixture::NodeType::Sequencer
} else {
crate::test_utils::fixture::NodeType::Follower
};
Expand Down
53 changes: 53 additions & 0 deletions crates/node/tests/remote_block_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,56 @@ async fn test_remote_block_source() -> eyre::Result<()> {

Ok(())
}

/// Test that the remote block source correctly determines its resume point on restart.
///
/// The remote source's local chain has blocks 1-3 (imported from sequencer) plus
/// block 4 (built locally). The sequencer goes on to produce blocks 4-6. On restart,
/// the highest-common-block walk must identify block 3 (locally-built block 4 diverges
/// from sequencer's block 4) and import only blocks 4-6.
///
/// If the detection were broken (e.g. always returning 0), the remote source would try
/// to re-import blocks 1-6, producing 6 `BlockSequenced` events before reaching blocks
/// 5, 6, 7. This test asserts exactly three events in the correct order, confirming
/// the resume point is block 3.
#[allow(clippy::large_stack_frames)]
#[tokio::test]
async fn test_remote_block_source_resumes_from_correct_head() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

let mut fixture = TestFixture::builder().sequencer().remote_source_node().build().await?;

fixture.l1().sync().await?;

// Sequencer produces blocks 1-3; remote source imports each and builds on top.
// After this phase the remote source local chain is: 1, 2, 3 (sequencer) + 4 (local).
for i in 1..=3u64 {
fixture.build_block().expect_block_number(i).build_and_await_block().await?;
fixture.expect_event_on(1).block_sequenced(i + 1).await?;
}

// Shut down the remote source node (index 1).
fixture.shutdown_node(1).await?;

// Sequencer produces blocks 4-6 while the remote source is offline.
for i in 4..=6u64 {
fixture.build_block().expect_block_number(i).build_and_await_block().await?;
}

// Restart the remote source.
// Expected detection: local_head=4, remote_head=6, min=4.
// Block 4: local hash (locally built) ≠ remote hash (sequencer's) → walk back.
// Block 3: local hash == remote hash → last_imported_block = 3.
// The add-on should therefore import blocks 4, 5, 6 and build 5, 6, 7 on top.
fixture.start_node(1).await?;

// Synchronise L1 state on the restarted remote source node.
fixture.l1().for_node(1).sync().await?;

// Verify the remote source catches up with the 3 missed sequencer blocks.
fixture.expect_event_on(1).block_sequenced(5).await?;
fixture.expect_event_on(1).block_sequenced(6).await?;
fixture.expect_event_on(1).block_sequenced(7).await?;

Ok(())
}
Loading