diff --git a/README.md b/README.md index 655d2342..f93df686 100644 --- a/README.md +++ b/README.md @@ -85,10 +85,13 @@ async fn run_scanner( .contract_address(contract) .event(MyContract::SomeEvent::SIGNATURE); - let mut stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - // Start the scanner - scanner.start().await?; + // Start the scanner and get the proof + let proof = scanner.start().await?; + + // Access the stream using the proof + let mut stream = subscription.stream(&proof); // Process messages from the stream while let Some(message) = stream.next().await { @@ -174,7 +177,7 @@ let scanner = EventScannerBuilder::sync() .await?; ``` -Invoking `scanner.start()` starts the scanner in the specified mode. +Invoking `scanner.start()` starts the scanner in the specified mode and returns a `StartProof` that must be passed to `subscription.stream()` to access the event stream. This compile-time guarantee ensures the scanner is started before attempting to read events. ### Defining Event Filters diff --git a/examples/historical_scanning/main.rs b/examples/historical_scanning/main.rs index 70f31d2f..44dfcf25 100644 --- a/examples/historical_scanning/main.rs +++ b/examples/historical_scanning/main.rs @@ -63,9 +63,12 @@ async fn main() -> anyhow::Result<()> { let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?; - let mut stream = scanner.subscribe(increase_filter); + let subscription = scanner.subscribe(increase_filter); - scanner.start().await.expect("failed to start scanner"); + let proof = scanner.start().await.expect("failed to start scanner"); + + // Access the stream using the proof (proves scanner is started) + let mut stream = subscription.stream(&proof); while let Some(message) = stream.next().await { match message { diff --git a/examples/latest_events_scanning/main.rs b/examples/latest_events_scanning/main.rs index 642837ec..8c8dbdf9 100644 --- a/examples/latest_events_scanning/main.rs +++ b/examples/latest_events_scanning/main.rs @@ -60,13 +60,16 @@ async fn main() -> anyhow::Result<()> { let mut scanner = EventScannerBuilder::latest(5).connect(robust_provider).await?; - let mut stream = scanner.subscribe(increase_filter); + let subscription = scanner.subscribe(increase_filter); for _ in 0..8 { _ = counter_contract.increase().send().await?; } - scanner.start().await?; + let proof = scanner.start().await?; + + // Access the stream using the proof (proves scanner is started) + let mut stream = subscription.stream(&proof); while let Some(message) = stream.next().await { match message { diff --git a/examples/live_scanning/main.rs b/examples/live_scanning/main.rs index cadcaabf..7fbbb384 100644 --- a/examples/live_scanning/main.rs +++ b/examples/live_scanning/main.rs @@ -61,9 +61,12 @@ async fn main() -> anyhow::Result<()> { let mut scanner = EventScannerBuilder::live().connect(robust_provider).await?; - let mut stream = scanner.subscribe(increase_filter); + let subscription = scanner.subscribe(increase_filter); - scanner.start().await.expect("failed to start scanner"); + let proof = scanner.start().await.expect("failed to start scanner"); + + // Access the stream using the proof (proves scanner is started) + let mut stream = subscription.stream(&proof); _ = counter_contract.increase().send().await?; diff --git a/examples/sync_from_block_scanning/main.rs b/examples/sync_from_block_scanning/main.rs index 30b60e27..d502c4bf 100644 --- a/examples/sync_from_block_scanning/main.rs +++ b/examples/sync_from_block_scanning/main.rs @@ -69,10 +69,13 @@ async fn main() -> anyhow::Result<()> { let mut scanner = EventScannerBuilder::sync().from_block(0).connect(robust_provider).await?; - let mut stream = scanner.subscribe(increase_filter); + let subscription = scanner.subscribe(increase_filter); info!("Starting sync scanner..."); - scanner.start().await.expect("failed to start scanner"); + let proof = scanner.start().await.expect("failed to start scanner"); + + // Access the stream using the proof (proves scanner is started) + let mut stream = subscription.stream(&proof); info!("Creating live events..."); for i in 0..2 { diff --git a/examples/sync_from_latest_scanning/main.rs b/examples/sync_from_latest_scanning/main.rs index c9227104..43350c04 100644 --- a/examples/sync_from_latest_scanning/main.rs +++ b/examples/sync_from_latest_scanning/main.rs @@ -61,13 +61,16 @@ async fn main() -> anyhow::Result<()> { let mut client = EventScannerBuilder::sync().from_latest(5).connect(robust_provider).await?; - let mut stream = client.subscribe(increase_filter); + let subscription = client.subscribe(increase_filter); for _ in 0..10 { _ = counter_contract.increase().send().await?; } - client.start().await.expect("failed to start scanner"); + let proof = client.start().await.expect("failed to start scanner"); + + // Access the stream using the proof (proves scanner is started) + let mut stream = subscription.stream(&proof); // emit some events for live mode to pick up _ = counter_contract.increase().send().await?; diff --git a/src/event_scanner/mod.rs b/src/event_scanner/mod.rs index 62c673b7..deaa51f8 100644 --- a/src/event_scanner/mod.rs +++ b/src/event_scanner/mod.rs @@ -16,6 +16,6 @@ mod scanner; pub use filter::EventFilter; pub use message::{EventScannerResult, Message}; pub use scanner::{ - DEFAULT_MAX_CONCURRENT_FETCHES, EventScanner, EventScannerBuilder, Historic, LatestEvents, - Live, SyncFromBlock, SyncFromLatestEvents, block_range_handler, + DEFAULT_MAX_CONCURRENT_FETCHES, EventScanner, EventScannerBuilder, EventSubscription, Historic, + LatestEvents, Live, StartProof, SyncFromBlock, SyncFromLatestEvents, block_range_handler, }; diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/scanner/historic.rs index ef2eb99c..73c82a4b 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/scanner/historic.rs @@ -7,7 +7,10 @@ use alloy::{ use super::block_range_handler::StreamHandler; use crate::{ EventScannerBuilder, ScannerError, - event_scanner::scanner::{EventScanner, Historic, block_range_handler::BlockRangeHandler}, + event_scanner::{ + StartProof, + scanner::{EventScanner, Historic, block_range_handler::BlockRangeHandler}, + }, robust_provider::IntoRobustProvider, }; @@ -133,7 +136,7 @@ impl EventScanner { /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. /// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved. - pub async fn start(self) -> Result<(), ScannerError> { + pub async fn start(self) -> Result { info!( from_block = ?self.config.from_block, to_block = ?self.config.to_block, @@ -159,7 +162,7 @@ impl EventScanner { handler.handle(stream).await; }); - Ok(()) + Ok(StartProof::new()) } } diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/scanner/latest.rs index 6fda05cc..b3720f41 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/scanner/latest.rs @@ -7,7 +7,7 @@ use alloy::{ use crate::{ EventScannerBuilder, ScannerError, event_scanner::{ - EventScanner, LatestEvents, + EventScanner, LatestEvents, StartProof, scanner::block_range_handler::{BlockRangeHandler, LatestEventsHandler}, }, robust_provider::IntoRobustProvider, @@ -146,7 +146,15 @@ impl EventScanner { /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. /// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved. - pub async fn start(self) -> Result<(), ScannerError> { + pub async fn start(self) -> Result { + info!( + from_block = ?self.config.from_block, + to_block = ?self.config.to_block, + count = ?self.config.count, + listener_count = self.listeners.len(), + "Starting EventScanner in LatestEvents mode" + ); + let stream = self .block_range_scanner .stream_rewind(self.config.from_block, self.config.to_block) @@ -166,7 +174,7 @@ impl EventScanner { handler.handle(stream).await; }); - Ok(()) + Ok(StartProof::new()) } } diff --git a/src/event_scanner/scanner/live.rs b/src/event_scanner/scanner/live.rs index aec4813c..e2571c96 100644 --- a/src/event_scanner/scanner/live.rs +++ b/src/event_scanner/scanner/live.rs @@ -3,7 +3,7 @@ use alloy::network::Network; use crate::{ EventScannerBuilder, ScannerError, event_scanner::{ - EventScanner, + EventScanner, StartProof, scanner::{ Live, block_range_handler::{BlockRangeHandler, StreamHandler}, @@ -72,7 +72,7 @@ impl EventScanner { /// /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. - pub async fn start(self) -> Result<(), ScannerError> { + pub async fn start(self) -> Result { info!( block_confirmations = self.config.block_confirmations, listener_count = self.listeners.len(), @@ -93,7 +93,7 @@ impl EventScanner { handler.handle(stream).await; }); - Ok(()) + Ok(StartProof::new()) } } diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 3c92cd5d..111c6503 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -39,9 +39,12 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use crate::{ - BlockRangeScannerBuilder, EventFilter, ScannerError, - block_range_scanner::{BlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, RingBufferCapacity}, - event_scanner::{EventScannerResult, listener::EventListener}, + block_range_scanner::{ + BlockRangeScanner, BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, + RingBufferCapacity, + }, + error::ScannerError, + event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener}, robust_provider::IntoRobustProvider, }; @@ -191,9 +194,9 @@ impl EventScannerBuilder { /// let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?; /// /// let filter = EventFilter::new().contract_address(contract_address); - /// let mut stream = scanner.subscribe(filter); - /// - /// scanner.start().await?; + /// let subscription = scanner.subscribe(filter); + /// let proof = scanner.start().await?; + /// let mut stream = subscription.stream(&proof); /// /// while let Some(Ok(Message::Data(logs))) = stream.next().await { /// println!("Received {} logs", logs.len()); @@ -268,9 +271,9 @@ impl EventScannerBuilder { /// .await?; /// /// let filter = EventFilter::new().contract_address(contract_address); - /// let mut stream = scanner.subscribe(filter); - /// - /// scanner.start().await?; + /// let subscription = scanner.subscribe(filter); + /// let proof = scanner.start().await?; + /// let mut stream = subscription.stream(&proof); /// /// while let Some(msg) = stream.next().await { /// match msg { @@ -358,9 +361,9 @@ impl EventScannerBuilder { /// let mut scanner = EventScannerBuilder::latest(10).connect(robust_provider).await?; /// /// let filter = EventFilter::new().contract_address(contract_address); - /// let mut stream = scanner.subscribe(filter); - /// - /// scanner.start().await?; + /// let subscription = scanner.subscribe(filter); + /// let proof = scanner.start().await?; + /// let mut stream = subscription.stream(&proof); /// /// // Expect a single message with up to 10 logs, then the stream ends /// while let Some(Ok(Message::Data(logs))) = stream.next().await { @@ -568,6 +571,58 @@ impl EventScannerBuilder { } } +/// A subscription to scanner events that requires proof the scanner has started. +/// +/// Created by [`EventScanner::subscribe()`](crate::EventScanner::subscribe), this type holds the +/// underlying stream but prevents access until [`stream()`](EventSubscription::stream) is called +/// with a valid [`StartProof`]. +/// +/// This pattern ensures at compile time that [`EventScanner::start()`](crate::EventScanner::start) +/// is called before attempting to read from the event stream. +/// +/// # Example +/// +/// ```ignore +/// let mut scanner = EventScannerBuilder::live().connect(provider).await?; +/// +/// // Create subscription (cannot access stream yet) +/// let subscription = scanner.subscribe(filter); +/// +/// // Start scanner and get proof +/// let proof = scanner.start().await?; +/// +/// // Now access the stream with the proof +/// let mut stream = subscription.stream(&proof); +/// +/// while let Some(msg) = stream.next().await { +/// // process events +/// } +/// ``` +pub struct EventSubscription { + inner: ReceiverStream, +} + +impl EventSubscription { + /// Creates a new subscription wrapping the given stream. + pub(crate) fn new(inner: ReceiverStream) -> Self { + Self { inner } + } + + /// Access the event stream. + /// + /// Requires a reference to a [`StartProof`] as proof that the scanner + /// has been started. The proof is obtained by calling + /// `EventScanner::start()`. + /// + /// # Arguments + /// + /// * `_proof` - Proof that the scanner has been started + #[must_use] + pub fn stream(self, _proof: &StartProof) -> ReceiverStream { + self.inner + } +} + impl EventScanner { /// Returns the configured stream buffer capacity. #[must_use] @@ -593,11 +648,43 @@ impl EventScanner { /// /// For scanner to properly stream events, register all subscriptions before calling `start()`. #[must_use] - pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream { + pub fn subscribe(&mut self, filter: EventFilter) -> EventSubscription { let (sender, receiver) = mpsc::channel::(self.block_range_scanner.buffer_capacity()); self.listeners.push(EventListener { filter, sender }); - ReceiverStream::new(receiver) + EventSubscription::new(ReceiverStream::new(receiver)) + } +} + +/// Proof that the scanner has been started. +/// +/// This proof is returned by `EventScanner::start()` and must be passed to +/// [`EventSubscription::stream()`] to access the event stream. This ensures at compile +/// time that the scanner is started before attempting to read events. +/// +/// # Example +/// +/// ```ignore +/// let mut scanner = EventScannerBuilder::sync().from_block(0).connect(provider).await?; +/// let subscription = scanner.subscribe(filter); +/// +/// // Start the scanner and get the proof +/// let proof = scanner.start().await?; +/// +/// // Now we can access the stream +/// let mut stream = subscription.stream(&proof); +/// ``` +#[derive(Debug, Clone)] +pub struct StartProof { + /// Private field prevents construction outside this crate + _private: (), +} + +impl StartProof { + /// Creates a new start proof. + #[must_use] + pub(crate) fn new() -> Self { + Self { _private: () } } } diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/scanner/sync/from_block.rs index a5d44922..d594f06f 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/scanner/sync/from_block.rs @@ -3,7 +3,7 @@ use alloy::{eips::BlockId, network::Network}; use crate::{ EventScannerBuilder, ScannerError, event_scanner::{ - EventScanner, SyncFromBlock, + EventScanner, StartProof, SyncFromBlock, scanner::block_range_handler::{BlockRangeHandler, StreamHandler}, }, robust_provider::IntoRobustProvider, @@ -75,7 +75,7 @@ impl EventScanner { /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. /// * [`ScannerError::BlockNotFound`] - if `from_block` cannot be resolved. - pub async fn start(self) -> Result<(), ScannerError> { + pub async fn start(self) -> Result { info!( from_block = ?self.config.from_block, block_confirmations = self.config.block_confirmations, @@ -101,7 +101,7 @@ impl EventScanner { handler.handle(stream).await; }); - Ok(()) + Ok(StartProof::new()) } } diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 9c87187f..24ffc74f 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -3,7 +3,7 @@ use alloy::{eips::BlockNumberOrTag, network::Network}; use crate::{ EventScannerBuilder, ScannerError, event_scanner::{ - EventScanner, + EventScanner, StartProof, scanner::{ SyncFromLatestEvents, block_range_handler::{BlockRangeHandler, LatestEventsHandler, StreamHandler}, @@ -74,7 +74,7 @@ impl EventScanner { /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. #[allow(clippy::missing_panics_doc)] - pub async fn start(self) -> Result<(), ScannerError> { + pub async fn start(self) -> Result { info!( event_count = self.config.count, block_confirmations = self.config.block_confirmations, @@ -157,7 +157,7 @@ impl EventScanner { debug!("SyncFromLatestEvents stream ended"); }); - Ok(()) + Ok(StartProof::new()) } } diff --git a/src/event_scanner/scanner/sync/mod.rs b/src/event_scanner/scanner/sync/mod.rs index d234b021..3032b34f 100644 --- a/src/event_scanner/scanner/sync/mod.rs +++ b/src/event_scanner/scanner/sync/mod.rs @@ -39,9 +39,10 @@ impl EventScannerBuilder { /// .await?; /// /// let filter = EventFilter::new().contract_address(contract_address); - /// let mut stream = scanner.subscribe(filter); + /// let subscription = scanner.subscribe(filter); /// - /// scanner.start().await?; + /// let proof = scanner.start().await?; + /// let mut stream = subscription.stream(&proof); /// /// while let Some(msg) = stream.next().await { /// match msg { @@ -153,9 +154,10 @@ impl EventScannerBuilder { /// .await?; /// /// let filter = EventFilter::new().contract_address(contract_address); - /// let mut stream = scanner.subscribe(filter); + /// let subscription = scanner.subscribe(filter); /// - /// scanner.start().await?; + /// let proof = scanner.start().await?; + /// let mut stream = subscription.stream(&proof); /// /// while let Some(msg) = stream.next().await { /// match msg { diff --git a/src/lib.rs b/src/lib.rs index dfa21481..a90c8426 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,6 +69,7 @@ pub use types::{Notification, ScannerMessage}; pub use event_scanner::{ DEFAULT_MAX_CONCURRENT_FETCHES, EventFilter, EventScanner, EventScannerBuilder, - EventScannerResult, Historic, LatestEvents, Live, Message, SyncFromBlock, SyncFromLatestEvents, + EventScannerResult, EventSubscription, Historic, LatestEvents, Live, Message, StartProof, + SyncFromBlock, SyncFromLatestEvents, block_range_handler::{BlockRangeHandler, LatestEventsHandler, StreamHandler}, }; diff --git a/src/macros/test_utils.rs b/src/macros/test_utils.rs index 3164b707..e5871668 100644 --- a/src/macros/test_utils.rs +++ b/src/macros/test_utils.rs @@ -98,7 +98,9 @@ macro_rules! assert_empty { /// async fn test_event_order() { /// // scanner setup... /// -/// let mut stream = scanner.subscribe(EventFilter::new().contract_address(contract_address)); +/// let subscription = scanner.subscribe(EventFilter::new().contract_address(contract_address)); +/// let handle = scanner.start().await.unwrap(); +/// let mut stream = subscription.stream(&handle); /// /// // Assert these two events are emitted in order /// assert_event_sequence!( diff --git a/tests/common/setup_scanner.rs b/tests/common/setup_scanner.rs index 9471f632..75398b55 100644 --- a/tests/common/setup_scanner.rs +++ b/tests/common/setup_scanner.rs @@ -6,10 +6,9 @@ use alloy::{ }; use alloy_node_bindings::AnvilInstance; use event_scanner::{ - EventFilter, EventScanner, EventScannerBuilder, EventScannerResult, Historic, LatestEvents, + EventFilter, EventScanner, EventScannerBuilder, EventSubscription, Historic, LatestEvents, Live, SyncFromBlock, SyncFromLatestEvents, robust_provider::RobustProvider, }; -use tokio_stream::wrappers::ReceiverStream; use crate::common::{ TestCounter::{self, CountIncreased}, @@ -24,7 +23,7 @@ where pub provider: RobustProvider, pub contract: TestCounter::TestCounterInstance

, pub scanner: S, - pub stream: ReceiverStream, + pub subscription: EventSubscription, #[allow(dead_code)] pub anvil: AnvilInstance, } @@ -68,9 +67,9 @@ pub async fn setup_live_scanner( .connect(provider.clone()) .await?; - let stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - Ok(ScannerSetup { provider, contract, scanner, stream, anvil }) + Ok(ScannerSetup { provider, contract, scanner, subscription, anvil }) } pub async fn setup_sync_scanner( @@ -87,9 +86,9 @@ pub async fn setup_sync_scanner( .connect(provider.clone()) .await?; - let stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - Ok(ScannerSetup { provider, contract, scanner, stream, anvil }) + Ok(ScannerSetup { provider, contract, scanner, subscription, anvil }) } pub async fn setup_sync_from_latest_scanner( @@ -106,9 +105,9 @@ pub async fn setup_sync_from_latest_scanner( .connect(provider.clone()) .await?; - let stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - Ok(ScannerSetup { provider, contract, scanner, stream, anvil }) + Ok(ScannerSetup { provider, contract, scanner, subscription, anvil }) } pub async fn setup_historic_scanner( @@ -124,9 +123,9 @@ pub async fn setup_historic_scanner( .connect(provider.clone()) .await?; - let stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - Ok(ScannerSetup { provider, contract, scanner, stream, anvil }) + Ok(ScannerSetup { provider, contract, scanner, subscription, anvil }) } pub async fn setup_latest_scanner( @@ -147,7 +146,7 @@ pub async fn setup_latest_scanner( let mut scanner = builder.connect(provider.clone()).await?; - let stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - Ok(ScannerSetup { provider, contract, scanner, stream, anvil }) + Ok(ScannerSetup { provider, contract, scanner, subscription, anvil }) } diff --git a/tests/historic/basic.rs b/tests/historic/basic.rs index 97d8ac3a..3afd5bb6 100644 --- a/tests/historic/basic.rs +++ b/tests/historic/basic.rs @@ -10,7 +10,7 @@ async fn processes_events_within_specified_historical_range() -> anyhow::Result< .await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; @@ -18,7 +18,8 @@ async fn processes_events_within_specified_historical_range() -> anyhow::Result< contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); assert_next!( stream, diff --git a/tests/latest_events/basic.rs b/tests/latest_events/basic.rs index 77ff571e..1bb84407 100644 --- a/tests/latest_events/basic.rs +++ b/tests/latest_events/basic.rs @@ -10,13 +10,14 @@ async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> { let setup = setup_latest_scanner(None, None, 5, None, None).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; for _ in 0..8 { contract.increase().send().await?.watch().await?; } - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); assert_next!( stream, @@ -39,14 +40,15 @@ async fn fewer_available_than_count_returns_all() -> anyhow::Result<()> { let setup = setup_latest_scanner(None, None, count, None, None).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; // Produce only 3 events contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); assert_next!( stream, @@ -66,9 +68,10 @@ async fn no_past_events_returns_empty() -> anyhow::Result<()> { let count = 5; let setup = setup_latest_scanner(None, None, count, None, None).await?; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); assert_next!(stream, Notification::NoPastLogsFound); assert_closed!(stream); @@ -97,9 +100,10 @@ async fn respects_range_subset() -> anyhow::Result<()> { let mut scanner_with_range = EventScannerBuilder::latest(10).from_block(start).to_block(end).connect(provider).await?; - let mut stream_with_range = scanner_with_range.subscribe(default_filter); + let subscription = scanner_with_range.subscribe(default_filter); - scanner_with_range.start().await?; + let proof = scanner_with_range.start().await?; + let mut stream_with_range = subscription.stream(&proof); assert_next!( stream_with_range, @@ -119,13 +123,13 @@ async fn multiple_listeners_to_same_event_receive_same_results() -> anyhow::Resu let setup = setup_latest_scanner(None, None, count, None, None).await?; let contract = setup.contract; let mut scanner = setup.scanner; - let mut stream1 = setup.stream; + let subscription1 = setup.subscription; // Add a second listener with the same filter let filter2 = EventFilter::new() .contract_address(*contract.address()) .event(TestCounter::CountIncreased::SIGNATURE); - let mut stream2 = scanner.subscribe(filter2); + let subscription2 = scanner.subscribe(filter2); // Produce 7 events contract.increase().send().await?.watch().await?; @@ -136,7 +140,9 @@ async fn multiple_listeners_to_same_event_receive_same_results() -> anyhow::Resu contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream1 = subscription1.stream(&proof); + let mut stream2 = subscription2.stream(&proof); let expected = &[ TestCounter::CountIncreased { newCount: U256::from(3) }, @@ -166,13 +172,13 @@ async fn different_filters_receive_different_results() -> anyhow::Result<()> { let filter_inc = EventFilter::new() .contract_address(*contract.address()) .event(TestCounter::CountIncreased::SIGNATURE); - let mut stream_inc = scanner.subscribe(filter_inc); + let subscription_inc = scanner.subscribe(filter_inc); // Second listener for CountDecreased let filter_dec = EventFilter::new() .contract_address(*contract.address()) .event(TestCounter::CountDecreased::SIGNATURE); - let mut stream_dec = scanner.subscribe(filter_dec); + let subscription_dec = scanner.subscribe(filter_dec); // Produce 5 increases, then 2 decreases contract.increase().send().await?.watch().await?; @@ -186,7 +192,9 @@ async fn different_filters_receive_different_results() -> anyhow::Result<()> { // Ask for latest 3 across the full range: each filtered listener should receive their own last // 3 events - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream_inc = subscription_inc.stream(&proof); + let mut stream_dec = subscription_dec.stream(&proof); assert_next!( stream_inc, @@ -216,13 +224,13 @@ async fn mixed_events_and_filters_return_correct_streams() -> anyhow::Result<()> let setup = setup_latest_scanner(None, None, count, None, None).await?; let contract = setup.contract; let mut scanner = setup.scanner; - let mut stream_inc = setup.stream; // CountIncreased by default + let subscription_inc = setup.subscription; // CountIncreased by default // Add a CountDecreased listener let filter_dec = EventFilter::new() .contract_address(*contract.address()) .event(TestCounter::CountDecreased::SIGNATURE); - let mut stream_dec = scanner.subscribe(filter_dec); + let subscription_dec = scanner.subscribe(filter_dec); contract.increase().send().await?.watch().await?; // inc(1) contract.increase().send().await?.watch().await?; // inc(2) @@ -230,7 +238,9 @@ async fn mixed_events_and_filters_return_correct_streams() -> anyhow::Result<()> contract.increase().send().await?.watch().await?; // inc(2) contract.decrease().send().await?.watch().await?; // dec(1) - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream_inc = subscription_inc.stream(&proof); + let mut stream_dec = subscription_dec.stream(&proof); assert_next!( stream_inc, @@ -264,7 +274,7 @@ async fn ignores_non_tracked_contract() -> anyhow::Result<()> { let contract_b = deploy_counter(provider.primary()).await?; // Listener only for contract A CountIncreased - let mut stream_a = setup.stream; + let subscription_a = setup.subscription; // Emit interleaved events from A and B: A(1), B(1), A(2), B(2), A(3) contract_a.increase().send().await?.watch().await?; @@ -273,7 +283,8 @@ async fn ignores_non_tracked_contract() -> anyhow::Result<()> { contract_b.increase().send().await?.watch().await?; // ignored by filter contract_a.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream_a = subscription_a.stream(&proof); assert_next!( stream_a, @@ -308,9 +319,10 @@ async fn large_gaps_and_empty_ranges() -> anyhow::Result<()> { let mut scanner_with_range = EventScannerBuilder::latest(5).from_block(start).to_block(end).connect(provider).await?; - let mut stream_with_range = scanner_with_range.subscribe(default_filter); + let subscription = scanner_with_range.subscribe(default_filter); - scanner_with_range.start().await?; + let proof = scanner_with_range.start().await?; + let mut stream_with_range = subscription.stream(&proof); assert_next!( stream_with_range, @@ -339,9 +351,10 @@ async fn boundary_range_single_block() -> anyhow::Result<()> { let mut scanner_with_range = EventScannerBuilder::latest(5).from_block(start).to_block(end).connect(provider).await?; - let mut stream_with_range = scanner_with_range.subscribe(default_filter); + let subscription = scanner_with_range.subscribe(default_filter); - scanner_with_range.start().await?; + let proof = scanner_with_range.start().await?; + let mut stream_with_range = subscription.stream(&proof); assert_next!(stream_with_range, &[TestCounter::CountIncreased { newCount: U256::from(2) }]); assert_closed!(stream_with_range); diff --git a/tests/latest_events/reorg.rs b/tests/latest_events/reorg.rs index f9fc853d..6d72e858 100644 --- a/tests/latest_events/reorg.rs +++ b/tests/latest_events/reorg.rs @@ -21,9 +21,10 @@ async fn reorged_logs_are_removed_from_stream() -> anyhow::Result<()> { let mut scanner = EventScannerBuilder::latest(5).max_block_range(2).connect(provider.clone()).await?; - let mut stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Trigger a reorg that removes the last 2 blocks (events 19 and 20) provider.primary().anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs: vec![] }).await?; @@ -53,7 +54,7 @@ async fn new_logs_in_reorged_blocks_are_included() -> anyhow::Result<()> { let provider = setup.provider; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; for _ in 0..8 { contract.increase().send().await?.watch().await?; @@ -62,7 +63,8 @@ async fn new_logs_in_reorged_blocks_are_included() -> anyhow::Result<()> { // Mine 2 empty blocks - max newCount is still 8 provider.primary().anvil_mine(Some(2), None).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Trigger a reorg that removes 2 empty blocks and adds 2 new events in replacement blocks. // Events 9 and 10 can only come from the reorged blocks. @@ -95,13 +97,14 @@ async fn rewind_continues_further_when_reorg_removes_logs() -> anyhow::Result<() let provider = setup.provider; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; for _ in 0..10 { contract.increase().send().await?.watch().await?; } - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); provider.primary().anvil_reorg(ReorgOptions { depth: 3, tx_block_pairs: vec![] }).await?; @@ -129,13 +132,14 @@ async fn deep_reorg_closes_stream_when_fewer_events_remain_than_requested() -> a let provider = setup.provider; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; for _ in 0..8 { contract.increase().send().await?.watch().await?; } - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Reorg removes 5 blocks - this removes events 4-8 // Only events 1-3 remain diff --git a/tests/live/basic.rs b/tests/live/basic.rs index 3f9154fc..2c1045cd 100644 --- a/tests/live/basic.rs +++ b/tests/live/basic.rs @@ -7,9 +7,10 @@ async fn basic_single_event_scanning() -> anyhow::Result<()> { let setup = setup_live_scanner(None, None, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); for _ in 0..5 { contract.increase().send().await?.watch().await?; @@ -36,15 +37,17 @@ async fn multiple_contracts_same_event_isolate_callbacks() -> anyhow::Result<()> let mut scanner = setup.scanner; let a = setup.contract; - let mut a_stream = setup.stream; + let a_subscription = setup.subscription; let b = deploy_counter(provider.primary().clone()).await?; let b_filter = EventFilter::new() .contract_address(*b.address()) .event(TestCounter::CountIncreased::SIGNATURE.to_owned()); - let mut b_stream = scanner.subscribe(b_filter); + let b_subscription = scanner.subscribe(b_filter); - scanner.start().await?; + let proof = scanner.start().await?; + let mut a_stream = a_subscription.stream(&proof); + let mut b_stream = b_subscription.stream(&proof); for _ in 0..3 { a.increase().send().await?.watch().await?; @@ -78,14 +81,16 @@ async fn multiple_events_same_contract() -> anyhow::Result<()> { let setup = setup_live_scanner(None, None, 0).await?; let mut scanner = setup.scanner; let contract = setup.contract; - let mut incr_stream = setup.stream; + let incr_subscription = setup.subscription; let decrease_filter = EventFilter::new() .contract_address(*contract.address()) .event(TestCounter::CountDecreased::SIGNATURE.to_owned()); - let mut decr_stream = scanner.subscribe(decrease_filter); + let decr_subscription = scanner.subscribe(decrease_filter); - scanner.start().await?; + let proof = scanner.start().await?; + let mut incr_stream = incr_subscription.stream(&proof); + let mut decr_stream = decr_subscription.stream(&proof); contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; @@ -122,9 +127,10 @@ async fn signature_matching_ignores_irrelevant_events() -> anyhow::Result<()> { .contract_address(*contract.address()) .event(TestCounter::CountDecreased::SIGNATURE.to_owned()); - let stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - scanner.start().await?; + let proof = scanner.start().await?; + let stream = subscription.stream(&proof); contract.increase().send().await?.watch().await?; @@ -142,9 +148,10 @@ async fn filters_malformed_signature_graceful() -> anyhow::Result<()> { let filter = EventFilter::new().contract_address(*contract.address()).event("invalid-sig".to_string()); - let stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - scanner.start().await?; + let proof = scanner.start().await?; + let stream = subscription.stream(&proof); contract.increase().send().await?.watch().await?; diff --git a/tests/live/optional_fields.rs b/tests/live/optional_fields.rs index e4204caa..6ac683bc 100644 --- a/tests/live/optional_fields.rs +++ b/tests/live/optional_fields.rs @@ -13,9 +13,10 @@ async fn track_all_events_from_contract() -> anyhow::Result<()> { // Create filter that tracks ALL events from a specific contract (no event signature specified) let filter = EventFilter::new().contract_address(contract_address); - let mut stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Generate both increase and decrease events for _ in 0..5 { @@ -52,9 +53,10 @@ async fn track_all_events_in_block_range() -> anyhow::Result<()> { // specified) let filter = EventFilter::new(); - let mut stream = scanner.subscribe(filter); + let subscription = scanner.subscribe(filter); - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Generate events from our contract for _ in 0..3 { @@ -82,10 +84,12 @@ async fn mixed_optional_and_required_filters() -> anyhow::Result<()> { // Filter for all events from all contracts let all_events_filter = EventFilter::new(); - let mut all_stream = scanner.subscribe(all_events_filter); - let contract_1_stream = setup.stream; + let all_subscription = scanner.subscribe(all_events_filter); + let contract_1_subscription = setup.subscription; - scanner.start().await?; + let proof = scanner.start().await?; + let mut all_stream = all_subscription.stream(&proof); + let contract_1_stream = contract_1_subscription.stream(&proof); // First increase the contract_2 newCount contract_2.increase().send().await?.watch().await?; diff --git a/tests/live/performance.rs b/tests/live/performance.rs index 1b676ebe..6280d31b 100644 --- a/tests/live/performance.rs +++ b/tests/live/performance.rs @@ -5,10 +5,11 @@ use crate::common::{LiveScannerSetup, TestCounter::CountIncreased, setup_live_sc #[tokio::test] async fn high_event_volume_no_loss() -> anyhow::Result<()> { - let LiveScannerSetup { contract, provider: _p, scanner, mut stream, anvil: _a } = + let LiveScannerSetup { contract, provider: _p, scanner, subscription, anvil: _a } = setup_live_scanner(None, None, 0).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); tokio::spawn(async move { for _ in 0..100 { diff --git a/tests/live/reorg.rs b/tests/live/reorg.rs index 3254a4e1..bbb0baff 100644 --- a/tests/live/reorg.rs +++ b/tests/live/reorg.rs @@ -12,10 +12,11 @@ use event_scanner::{ #[tokio::test] async fn rescans_events_within_same_block() -> anyhow::Result<()> { - let LiveScannerSetup { provider, contract, scanner, mut stream, anvil: _anvil } = + let LiveScannerSetup { provider, contract, scanner, subscription, anvil: _anvil } = setup_live_scanner(None, None, 0).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // emit initial events for _ in 0..5 { @@ -62,10 +63,11 @@ async fn rescans_events_within_same_block() -> anyhow::Result<()> { #[tokio::test] async fn rescans_events_with_ascending_blocks() -> anyhow::Result<()> { - let LiveScannerSetup { provider, contract, scanner, mut stream, anvil: _anvil } = + let LiveScannerSetup { provider, contract, scanner, subscription, anvil: _anvil } = setup_live_scanner(None, None, 0).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // emit initial events for _ in 0..5 { @@ -110,10 +112,11 @@ async fn rescans_events_with_ascending_blocks() -> anyhow::Result<()> { #[tokio::test] async fn depth_one() -> anyhow::Result<()> { - let LiveScannerSetup { provider, contract, scanner, mut stream, anvil: _anvil } = + let LiveScannerSetup { provider, contract, scanner, subscription, anvil: _anvil } = setup_live_scanner(None, None, 0).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // emit initial events for _ in 0..4 { @@ -148,10 +151,11 @@ async fn depth_one() -> anyhow::Result<()> { #[tokio::test] async fn depth_two() -> anyhow::Result<()> { - let LiveScannerSetup { provider, contract, scanner, mut stream, anvil: _anvil } = + let LiveScannerSetup { provider, contract, scanner, subscription, anvil: _anvil } = setup_live_scanner(None, None, 0).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // emit initial events for _ in 0..4 { @@ -187,10 +191,11 @@ async fn depth_two() -> anyhow::Result<()> { #[tokio::test] async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> { // any reorg ≤ 5 should be invisible to consumers - let LiveScannerSetup { provider, contract, scanner, stream, anvil: _anvil } = + let LiveScannerSetup { provider, contract, scanner, subscription, anvil: _anvil } = setup_live_scanner(None, None, 5).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let stream = subscription.stream(&proof); // mine some initial blocks provider.primary().anvil_mine(Some(10), None).await?; diff --git a/tests/sync/from_block.rs b/tests/sync/from_block.rs index 607fc4f9..ae08210c 100644 --- a/tests/sync/from_block.rs +++ b/tests/sync/from_block.rs @@ -13,14 +13,15 @@ async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> { let setup = setup_sync_scanner(None, None, BlockNumberOrTag::Earliest, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; // emit "historic" events contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // historical events assert_next!( @@ -57,10 +58,11 @@ async fn sync_from_future_block_waits_until_minted() -> anyhow::Result<()> { let setup = setup_sync_scanner(None, None, future_start_block, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let stream = setup.stream; + let subscription = setup.subscription; // Start the scanner in sync mode from the future block - scanner.start().await?; + let proof = scanner.start().await?; + let stream = subscription.stream(&proof); // Send 2 transactions that should not appear in the stream contract.increase().send().await?.watch().await?; @@ -85,7 +87,7 @@ async fn sync_from_future_block_waits_until_minted() -> anyhow::Result<()> { #[tokio::test] async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> { // any reorg ≤ 5 should be invisible to consumers - let SyncScannerSetup { provider, contract, scanner, mut stream, anvil: _anvil } = + let SyncScannerSetup { provider, contract, scanner, subscription, anvil: _anvil } = setup_sync_scanner(None, None, BlockNumberOrTag::Earliest, 5).await?; // mine some initial "historic" blocks @@ -93,7 +95,8 @@ async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> { contract.increase().send().await?.watch().await?; } - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // assert historic events are streamed in a batch assert_next!( diff --git a/tests/sync/from_latest.rs b/tests/sync/from_latest.rs index fd60445c..fe21f554 100644 --- a/tests/sync/from_latest.rs +++ b/tests/sync/from_latest.rs @@ -10,7 +10,7 @@ async fn happy_path_no_duplicates() -> anyhow::Result<()> { let setup = setup_sync_from_latest_scanner(None, None, 3, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; // Historical: produce 6 events total contract.increase().send().await?.watch().await?; @@ -21,7 +21,8 @@ async fn happy_path_no_duplicates() -> anyhow::Result<()> { contract.increase().send().await?.watch().await?; // Ask for the latest 3, then live - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Latest phase assert_next!( @@ -58,13 +59,14 @@ async fn fewer_historical_then_continues_live() -> anyhow::Result<()> { let setup = setup_sync_from_latest_scanner(None, None, 5, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; // Historical: only 2 available contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Latest phase returns all available assert_next!( @@ -100,7 +102,7 @@ async fn exact_historical_count_then_live() -> anyhow::Result<()> { let setup = setup_sync_from_latest_scanner(None, None, 4, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; // Historical: produce exactly 4 events contract.increase().send().await?.watch().await?; @@ -108,7 +110,8 @@ async fn exact_historical_count_then_live() -> anyhow::Result<()> { contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); assert_next!( stream, @@ -141,9 +144,10 @@ async fn no_historical_only_live_streams() -> anyhow::Result<()> { let setup = setup_sync_from_latest_scanner(None, None, 5, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Latest is empty assert_next!(stream, Notification::NoPastLogsFound); @@ -176,7 +180,7 @@ async fn block_gaps_do_not_affect_number_of_events_streamed() -> anyhow::Result< let provider = setup.provider; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; // Historical: emit 3, mine 1 empty block to form a clear boundary contract.increase().send().await?.watch().await?; @@ -188,7 +192,8 @@ async fn block_gaps_do_not_affect_number_of_events_streamed() -> anyhow::Result< provider.primary().anvil_mine(Some(1), None).await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Latest phase assert_next!( @@ -221,14 +226,15 @@ async fn waiting_on_live_logs_arriving() -> anyhow::Result<()> { let setup = setup_sync_from_latest_scanner(None, None, 3, 0).await?; let contract = setup.contract; let scanner = setup.scanner; - let mut stream = setup.stream; + let subscription = setup.subscription; // Historical: emit 3 contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - scanner.start().await?; + let proof = scanner.start().await?; + let mut stream = subscription.stream(&proof); // Latest phase assert_next!(