From 4e03073c750834ae6ca170f14b644d620b99be63 Mon Sep 17 00:00:00 2001 From: yug49 <148035793+yug49@users.noreply.github.com> Date: Sat, 6 Dec 2025 07:01:51 +0530 Subject: [PATCH] refactor: reorg handler trait --- src/block_range_scanner.rs | 12 ++++---- src/block_range_scanner/common.rs | 23 ++++++++-------- src/block_range_scanner/reorg_handler.rs | 35 ++++++++++++++++++++++-- src/block_range_scanner/sync_handler.rs | 17 +++++++----- 4 files changed, 61 insertions(+), 26 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 1c0cfb9b..93c202ce 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -86,7 +86,7 @@ mod reorg_handler; mod ring_buffer; mod sync_handler; -use reorg_handler::ReorgHandler; +use reorg_handler::{DefaultReorgHandler, ReorgHandler}; pub use ring_buffer::RingBufferCapacity; pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000; @@ -331,7 +331,7 @@ impl Service { tokio::spawn(async move { let mut reorg_handler = - ReorgHandler::new(provider.clone(), past_blocks_storage_capacity); + DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity); common::stream_live_blocks( range_start, @@ -378,7 +378,7 @@ impl Service { tokio::spawn(async move { let mut reorg_handler = - ReorgHandler::new(provider.clone(), past_blocks_storage_capacity); + DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity); _ = common::stream_historical_range( start_block_num, @@ -432,7 +432,7 @@ impl Service { tokio::spawn(async move { let mut reorg_handler = - ReorgHandler::new(provider.clone(), past_blocks_storage_capacity); + DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity); Self::stream_rewind(from, to, max_block_range, &sender, &provider, &mut reorg_handler) .await; @@ -448,13 +448,13 @@ impl Service { /// # Errors /// /// Returns an error if the stream fails - async fn stream_rewind( + async fn stream_rewind>( from: N::BlockResponse, to: N::BlockResponse, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) { let mut batch_count = 0; diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 6140c853..6eaada5e 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -16,14 +16,14 @@ use alloy::{ use tracing::{debug, error, info, warn}; #[allow(clippy::too_many_arguments)] -pub(crate) async fn stream_live_blocks( +pub(crate) async fn stream_live_blocks>( stream_start: BlockNumber, subscription: RobustSubscription, sender: &mpsc::Sender, provider: &RobustProvider, block_confirmations: u64, max_block_range: u64, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, notify_after_first_block: bool, ) { // Phase 1: Wait for first relevant block @@ -122,14 +122,14 @@ fn skip_to_first_relevant_block( /// Initializes the streaming state after receiving the first block /// Returns None if the channel is closed -async fn initialize_live_streaming_state( +async fn initialize_live_streaming_state>( first_block: N::HeaderResponse, stream_start: BlockNumber, block_confirmations: u64, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) -> Option> { let incoming_block_num = first_block.number(); info!(block_number = incoming_block_num, "Received first block header"); @@ -162,6 +162,7 @@ async fn initialize_live_streaming_state( async fn stream_blocks_continuously< N: Network, S: tokio_stream::Stream> + Unpin, + R: ReorgHandler, >( stream: &mut S, state: &mut LiveStreamingState, @@ -170,7 +171,7 @@ async fn stream_blocks_continuously< max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) { while let Some(incoming_block) = stream.next().await { let incoming_block = match incoming_block { @@ -279,14 +280,14 @@ async fn handle_reorg_detected( /// Streams the next batch of blocks up to `batch_end_num`. /// Returns false if the channel is closed -async fn stream_next_batch( +async fn stream_next_batch>( batch_end_num: BlockNumber, state: &mut LiveStreamingState, stream_start: BlockNumber, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) -> bool { if batch_end_num < state.batch_start { // No new confirmed blocks to stream yet @@ -327,13 +328,13 @@ struct LiveStreamingState { } #[must_use] -pub(crate) async fn stream_historical_range( +pub(crate) async fn stream_historical_range>( start: BlockNumber, end: BlockNumber, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) -> Option<()> { info!("Getting finalized block number"); let finalized = match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await @@ -390,14 +391,14 @@ pub(crate) async fn stream_historical_range( } /// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks. -pub(crate) async fn stream_range_with_reorg_handling( +pub(crate) async fn stream_range_with_reorg_handling>( min_common_ancestor: BlockNumber, mut next_start_block: BlockNumber, end: BlockNumber, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) -> Option { let mut batch_count = 0; diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs index 9fd12614..fedb397b 100644 --- a/src/block_range_scanner/reorg_handler.rs +++ b/src/block_range_scanner/reorg_handler.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use alloy::{ consensus::BlockHeader, eips::BlockNumberOrTag, @@ -14,13 +16,33 @@ use crate::{ use super::ring_buffer::RingBuffer; +/// Trait for handling chain reorganizations. +pub(crate) trait ReorgHandler: Clone + Send { + /// Checks if a block was reorged and returns the common ancestor if found. + /// + /// # Arguments + /// + /// * `block` - The block to check for reorg. + /// + /// # Returns + /// + /// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block. + /// * `Ok(None)` - If no reorg was detected, returns `None`. + /// * `Err(e)` - If an error occurred while checking for reorg. + fn check( + &mut self, + block: &N::BlockResponse, + ) -> impl Future, ScannerError>> + Send; +} + +/// Default implementation of [`ReorgHandler`] that uses an RPC provider. #[derive(Clone)] -pub(crate) struct ReorgHandler { +pub(crate) struct DefaultReorgHandler { provider: RobustProvider, buffer: RingBuffer, } -impl ReorgHandler { +impl DefaultReorgHandler { pub fn new(provider: RobustProvider, capacity: RingBufferCapacity) -> Self { Self { provider, buffer: RingBuffer::new(capacity) } } @@ -133,3 +155,12 @@ impl ReorgHandler { Ok(Some(common_ancestor)) } } + +impl ReorgHandler for DefaultReorgHandler { + async fn check( + &mut self, + block: &N::BlockResponse, + ) -> Result, ScannerError> { + DefaultReorgHandler::check(self, block).await + } +} diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs index 3c0e371c..d84cb4af 100644 --- a/src/block_range_scanner/sync_handler.rs +++ b/src/block_range_scanner/sync_handler.rs @@ -5,7 +5,9 @@ use tracing::{error, info}; use crate::{ Notification, ScannerError, block_range_scanner::{ - BlockScannerResult, common, reorg_handler::ReorgHandler, ring_buffer::RingBufferCapacity, + BlockScannerResult, common, + reorg_handler::{DefaultReorgHandler, ReorgHandler}, + ring_buffer::RingBufferCapacity, }, robust_provider::RobustProvider, types::TryStream, @@ -25,7 +27,7 @@ pub(crate) struct SyncHandler { start_id: BlockId, block_confirmations: u64, sender: mpsc::Sender, - reorg_handler: ReorgHandler, + reorg_handler: DefaultReorgHandler, } impl SyncHandler { @@ -37,7 +39,8 @@ impl SyncHandler { past_blocks_storage_capacity: RingBufferCapacity, sender: mpsc::Sender, ) -> Self { - let reorg_handler = ReorgHandler::new(provider.clone(), past_blocks_storage_capacity); + let reorg_handler = + DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity); Self { provider, max_block_range, start_id, block_confirmations, sender, reorg_handler } } @@ -153,14 +156,14 @@ impl SyncHandler { /// Catches up on historical blocks until we reach the chain tip /// Returns the block number where live streaming should begin - async fn catchup_historical_blocks( + async fn catchup_historical_blocks>( mut start_block: BlockNumber, mut confirmed_tip: BlockNumber, block_confirmations: u64, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) -> Result, ScannerError> { while start_block < confirmed_tip { if common::stream_historical_range( @@ -189,13 +192,13 @@ impl SyncHandler { } /// Subscribes to live blocks and begins streaming - async fn transition_to_live( + async fn transition_to_live>( start_block: BlockNumber, block_confirmations: u64, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, - reorg_handler: &mut ReorgHandler, + reorg_handler: &mut R, ) { let subscription = match provider.subscribe_blocks().await { Ok(sub) => sub,