Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ mod reorg_handler;
mod ring_buffer;
mod sync_handler;

use reorg_handler::ReorgHandler;
use reorg_handler::{DefaultReorgHandler, ReorgHandler};
pub use ring_buffer::RingBufferCapacity;

pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
Expand Down Expand Up @@ -331,7 +331,7 @@ impl<N: Network> Service<N> {

tokio::spawn(async move {
let mut reorg_handler =
ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);

common::stream_live_blocks(
range_start,
Expand Down Expand Up @@ -378,7 +378,7 @@ impl<N: Network> Service<N> {

tokio::spawn(async move {
let mut reorg_handler =
ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);

_ = common::stream_historical_range(
start_block_num,
Expand Down Expand Up @@ -432,7 +432,7 @@ impl<N: Network> Service<N> {

tokio::spawn(async move {
let mut reorg_handler =
ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);

Self::stream_rewind(from, to, max_block_range, &sender, &provider, &mut reorg_handler)
.await;
Expand All @@ -448,13 +448,13 @@ impl<N: Network> Service<N> {
/// # Errors
///
/// Returns an error if the stream fails
async fn stream_rewind(
async fn stream_rewind<R: ReorgHandler<N>>(
from: N::BlockResponse,
to: N::BlockResponse,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) {
let mut batch_count = 0;

Expand Down
23 changes: 12 additions & 11 deletions src/block_range_scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use alloy::{
use tracing::{debug, error, info, warn};

#[allow(clippy::too_many_arguments)]
pub(crate) async fn stream_live_blocks<N: Network>(
pub(crate) async fn stream_live_blocks<N: Network, R: ReorgHandler<N>>(
stream_start: BlockNumber,
subscription: RobustSubscription<N>,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
block_confirmations: u64,
max_block_range: u64,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
notify_after_first_block: bool,
) {
// Phase 1: Wait for first relevant block
Expand Down Expand Up @@ -122,14 +122,14 @@ fn skip_to_first_relevant_block<N: Network>(

/// Initializes the streaming state after receiving the first block
/// Returns None if the channel is closed
async fn initialize_live_streaming_state<N: Network>(
async fn initialize_live_streaming_state<N: Network, R: ReorgHandler<N>>(
first_block: N::HeaderResponse,
stream_start: BlockNumber,
block_confirmations: u64,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) -> Option<LiveStreamingState<N>> {
let incoming_block_num = first_block.number();
info!(block_number = incoming_block_num, "Received first block header");
Expand Down Expand Up @@ -162,6 +162,7 @@ async fn initialize_live_streaming_state<N: Network>(
async fn stream_blocks_continuously<
N: Network,
S: tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> + Unpin,
R: ReorgHandler<N>,
>(
stream: &mut S,
state: &mut LiveStreamingState<N>,
Expand All @@ -170,7 +171,7 @@ async fn stream_blocks_continuously<
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) {
while let Some(incoming_block) = stream.next().await {
let incoming_block = match incoming_block {
Expand Down Expand Up @@ -279,14 +280,14 @@ async fn handle_reorg_detected<N: Network>(

/// Streams the next batch of blocks up to `batch_end_num`.
/// Returns false if the channel is closed
async fn stream_next_batch<N: Network>(
async fn stream_next_batch<N: Network, R: ReorgHandler<N>>(
batch_end_num: BlockNumber,
state: &mut LiveStreamingState<N>,
stream_start: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) -> bool {
if batch_end_num < state.batch_start {
// No new confirmed blocks to stream yet
Expand Down Expand Up @@ -327,13 +328,13 @@ struct LiveStreamingState<N: Network> {
}

#[must_use]
pub(crate) async fn stream_historical_range<N: Network>(
pub(crate) async fn stream_historical_range<N: Network, R: ReorgHandler<N>>(
start: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) -> Option<()> {
info!("Getting finalized block number");
let finalized = match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await
Expand Down Expand Up @@ -390,14 +391,14 @@ pub(crate) async fn stream_historical_range<N: Network>(
}

/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
pub(crate) async fn stream_range_with_reorg_handling<N: Network, R: ReorgHandler<N>>(
min_common_ancestor: BlockNumber,
mut next_start_block: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) -> Option<N::BlockResponse> {
let mut batch_count = 0;

Expand Down
35 changes: 33 additions & 2 deletions src/block_range_scanner/reorg_handler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::Future;

use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
Expand All @@ -14,13 +16,33 @@ use crate::{

use super::ring_buffer::RingBuffer;

/// Trait for handling chain reorganizations.
pub(crate) trait ReorgHandler<N: Network>: Clone + Send {
/// Checks if a block was reorged and returns the common ancestor if found.
///
/// # Arguments
///
/// * `block` - The block to check for reorg.
///
/// # Returns
///
/// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block.
/// * `Ok(None)` - If no reorg was detected, returns `None`.
/// * `Err(e)` - If an error occurred while checking for reorg.
fn check(
&mut self,
block: &N::BlockResponse,
) -> impl Future<Output = Result<Option<N::BlockResponse>, ScannerError>> + Send;
}

/// Default implementation of [`ReorgHandler`] that uses an RPC provider.
#[derive(Clone)]
pub(crate) struct ReorgHandler<N: Network = Ethereum> {
pub(crate) struct DefaultReorgHandler<N: Network = Ethereum> {
provider: RobustProvider<N>,
buffer: RingBuffer<BlockHash>,
}

impl<N: Network> ReorgHandler<N> {
impl<N: Network> DefaultReorgHandler<N> {
pub fn new(provider: RobustProvider<N>, capacity: RingBufferCapacity) -> Self {
Self { provider, buffer: RingBuffer::new(capacity) }
}
Expand Down Expand Up @@ -133,3 +155,12 @@ impl<N: Network> ReorgHandler<N> {
Ok(Some(common_ancestor))
}
}

impl<N: Network> ReorgHandler<N> for DefaultReorgHandler<N> {
async fn check(
&mut self,
block: &N::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError> {
DefaultReorgHandler::check(self, block).await
}
}
17 changes: 10 additions & 7 deletions src/block_range_scanner/sync_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use tracing::{error, info};
use crate::{
Notification, ScannerError,
block_range_scanner::{
BlockScannerResult, common, reorg_handler::ReorgHandler, ring_buffer::RingBufferCapacity,
BlockScannerResult, common,
reorg_handler::{DefaultReorgHandler, ReorgHandler},
ring_buffer::RingBufferCapacity,
},
robust_provider::RobustProvider,
types::TryStream,
Expand All @@ -25,7 +27,7 @@ pub(crate) struct SyncHandler<N: Network> {
start_id: BlockId,
block_confirmations: u64,
sender: mpsc::Sender<BlockScannerResult>,
reorg_handler: ReorgHandler<N>,
reorg_handler: DefaultReorgHandler<N>,
}

impl<N: Network> SyncHandler<N> {
Expand All @@ -37,7 +39,8 @@ impl<N: Network> SyncHandler<N> {
past_blocks_storage_capacity: RingBufferCapacity,
sender: mpsc::Sender<BlockScannerResult>,
) -> Self {
let reorg_handler = ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
let reorg_handler =
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
Self { provider, max_block_range, start_id, block_confirmations, sender, reorg_handler }
}

Expand Down Expand Up @@ -153,14 +156,14 @@ impl<N: Network> SyncHandler<N> {

/// Catches up on historical blocks until we reach the chain tip
/// Returns the block number where live streaming should begin
async fn catchup_historical_blocks(
async fn catchup_historical_blocks<R: ReorgHandler<N>>(
mut start_block: BlockNumber,
mut confirmed_tip: BlockNumber,
block_confirmations: u64,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) -> Result<Option<BlockNumber>, ScannerError> {
while start_block < confirmed_tip {
if common::stream_historical_range(
Expand Down Expand Up @@ -189,13 +192,13 @@ impl<N: Network> SyncHandler<N> {
}

/// Subscribes to live blocks and begins streaming
async fn transition_to_live(
async fn transition_to_live<R: ReorgHandler<N>>(
start_block: BlockNumber,
block_confirmations: u64,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
reorg_handler: &mut R,
) {
let subscription = match provider.subscribe_blocks().await {
Ok(sub) => sub,
Expand Down