Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ffb79f3
feat: add handle and subscribe
LeoPatOZ Dec 3, 2025
3eb60b2
feat: add handle to scanner modes
LeoPatOZ Dec 3, 2025
4457c1d
feat: subscribe returns event subscription
LeoPatOZ Dec 3, 2025
e1eac9e
test: update to use new event subscription
LeoPatOZ Dec 3, 2025
ca78c76
feat: update examples to show new sub handler
LeoPatOZ Dec 3, 2025
ae0ea40
doc: update to show new handle
LeoPatOZ Dec 3, 2025
b113062
ref: merge into one stream file
LeoPatOZ Dec 3, 2025
c307014
doc: fix
LeoPatOZ Dec 3, 2025
1d110d6
Merge branch 'main' into start-typestate
LeoPatOZ Dec 3, 2025
95bfb72
fix: format
LeoPatOZ Dec 3, 2025
8b79a56
ref: handle to token
LeoPatOZ Dec 4, 2025
ffe2219
Merge branch 'main' into start-typestate
LeoPatOZ Dec 5, 2025
9d28c26
Merge branch 'main' into start-typestate
LeoPatOZ Dec 5, 2025
9b5f5a8
Merge branch 'main' into start-typestate
LeoPatOZ Dec 8, 2025
04b898d
Merge branch 'main' into start-typestate
LeoPatOZ Dec 11, 2025
7ee4006
fix: merge
LeoPatOZ Dec 11, 2025
920ab0e
Merge branch 'main' into start-typestate
LeoPatOZ Dec 15, 2025
5a0a1a4
fix: format
LeoPatOZ Dec 15, 2025
2961ac3
fix: import in lib.rs
LeoPatOZ Dec 15, 2025
1bab5cd
Merge branch 'main' into start-typestate
LeoPatOZ Dec 15, 2025
9a2d37f
Merge branch 'main' into start-typestate
LeoPatOZ Dec 16, 2025
b73aa5c
Merge branch 'main' into start-typestate
LeoPatOZ Dec 18, 2025
899366d
Merge branch 'main' into start-typestate
LeoPatOZ Dec 19, 2025
d4a7b69
fix: merge
LeoPatOZ Dec 19, 2025
43b8755
fix conflicts
0xNeshi Dec 26, 2025
b356238
move EventSubscription to root es mod
0xNeshi Dec 26, 2025
3ef6cd7
ScannerToken->StartProof
0xNeshi Dec 26, 2025
e2d6b9c
docs: update
0xNeshi Dec 26, 2025
937a3be
tests: update token->proof
0xNeshi Dec 26, 2025
245315d
test: revert import removal in historic/basic
0xNeshi Dec 26, 2025
d457282
remove builder
0xNeshi Dec 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,13 @@ async fn run_scanner(
.contract_address(contract)
.event(MyContract::SomeEvent::SIGNATURE);

let mut stream = scanner.subscribe(filter);
let subscription = scanner.subscribe(filter);

// Start the scanner
scanner.start().await?;
// Start the scanner and get the proof
let proof = scanner.start().await?;

// Access the stream using the proof
let mut stream = subscription.stream(&proof);

// Process messages from the stream
while let Some(message) = stream.next().await {
Expand Down Expand Up @@ -174,7 +177,7 @@ let scanner = EventScannerBuilder::sync()
.await?;
```

Invoking `scanner.start()` starts the scanner in the specified mode.
Invoking `scanner.start()` starts the scanner in the specified mode and returns a `StartProof` that must be passed to `subscription.stream()` to access the event stream. This compile-time guarantee ensures the scanner is started before attempting to read events.

### Defining Event Filters

Expand Down
7 changes: 5 additions & 2 deletions examples/historical_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ async fn main() -> anyhow::Result<()> {

let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?;

let mut stream = scanner.subscribe(increase_filter);
let subscription = scanner.subscribe(increase_filter);

scanner.start().await.expect("failed to start scanner");
let proof = scanner.start().await.expect("failed to start scanner");

// Access the stream using the proof (proves scanner is started)
let mut stream = subscription.stream(&proof);

while let Some(message) = stream.next().await {
match message {
Expand Down
7 changes: 5 additions & 2 deletions examples/latest_events_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,16 @@ async fn main() -> anyhow::Result<()> {

let mut scanner = EventScannerBuilder::latest(5).connect(robust_provider).await?;

let mut stream = scanner.subscribe(increase_filter);
let subscription = scanner.subscribe(increase_filter);

for _ in 0..8 {
_ = counter_contract.increase().send().await?;
}

scanner.start().await?;
let proof = scanner.start().await?;

// Access the stream using the proof (proves scanner is started)
let mut stream = subscription.stream(&proof);

while let Some(message) = stream.next().await {
match message {
Expand Down
7 changes: 5 additions & 2 deletions examples/live_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ async fn main() -> anyhow::Result<()> {

let mut scanner = EventScannerBuilder::live().connect(robust_provider).await?;

let mut stream = scanner.subscribe(increase_filter);
let subscription = scanner.subscribe(increase_filter);

scanner.start().await.expect("failed to start scanner");
let proof = scanner.start().await.expect("failed to start scanner");

// Access the stream using the proof (proves scanner is started)
let mut stream = subscription.stream(&proof);

_ = counter_contract.increase().send().await?;

Expand Down
7 changes: 5 additions & 2 deletions examples/sync_from_block_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,13 @@ async fn main() -> anyhow::Result<()> {

let mut scanner = EventScannerBuilder::sync().from_block(0).connect(robust_provider).await?;

let mut stream = scanner.subscribe(increase_filter);
let subscription = scanner.subscribe(increase_filter);

info!("Starting sync scanner...");
scanner.start().await.expect("failed to start scanner");
let proof = scanner.start().await.expect("failed to start scanner");

// Access the stream using the proof (proves scanner is started)
let mut stream = subscription.stream(&proof);

info!("Creating live events...");
for i in 0..2 {
Expand Down
7 changes: 5 additions & 2 deletions examples/sync_from_latest_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,16 @@ async fn main() -> anyhow::Result<()> {

let mut client = EventScannerBuilder::sync().from_latest(5).connect(robust_provider).await?;

let mut stream = client.subscribe(increase_filter);
let subscription = client.subscribe(increase_filter);

for _ in 0..10 {
_ = counter_contract.increase().send().await?;
}

client.start().await.expect("failed to start scanner");
let proof = client.start().await.expect("failed to start scanner");

// Access the stream using the proof (proves scanner is started)
let mut stream = subscription.stream(&proof);

// emit some events for live mode to pick up
_ = counter_contract.increase().send().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/event_scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ mod scanner;
pub use filter::EventFilter;
pub use message::{EventScannerResult, Message};
pub use scanner::{
DEFAULT_MAX_CONCURRENT_FETCHES, EventScanner, EventScannerBuilder, Historic, LatestEvents,
Live, SyncFromBlock, SyncFromLatestEvents, block_range_handler,
DEFAULT_MAX_CONCURRENT_FETCHES, EventScanner, EventScannerBuilder, EventSubscription, Historic,
LatestEvents, Live, StartProof, SyncFromBlock, SyncFromLatestEvents, block_range_handler,
};
9 changes: 6 additions & 3 deletions src/event_scanner/scanner/historic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use alloy::{
use super::block_range_handler::StreamHandler;
use crate::{
EventScannerBuilder, ScannerError,
event_scanner::scanner::{EventScanner, Historic, block_range_handler::BlockRangeHandler},
event_scanner::{
StartProof,
scanner::{EventScanner, Historic, block_range_handler::BlockRangeHandler},
},
robust_provider::IntoRobustProvider,
};

Expand Down Expand Up @@ -133,7 +136,7 @@ impl<N: Network> EventScanner<Historic, N> {
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
pub async fn start(self) -> Result<(), ScannerError> {
pub async fn start(self) -> Result<StartProof, ScannerError> {
info!(
from_block = ?self.config.from_block,
to_block = ?self.config.to_block,
Expand All @@ -159,7 +162,7 @@ impl<N: Network> EventScanner<Historic, N> {
handler.handle(stream).await;
});

Ok(())
Ok(StartProof::new())
}
}

Expand Down
14 changes: 11 additions & 3 deletions src/event_scanner/scanner/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use alloy::{
use crate::{
EventScannerBuilder, ScannerError,
event_scanner::{
EventScanner, LatestEvents,
EventScanner, LatestEvents, StartProof,
scanner::block_range_handler::{BlockRangeHandler, LatestEventsHandler},
},
robust_provider::IntoRobustProvider,
Expand Down Expand Up @@ -146,7 +146,15 @@ impl<N: Network> EventScanner<LatestEvents, N> {
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
/// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved.
pub async fn start(self) -> Result<(), ScannerError> {
pub async fn start(self) -> Result<StartProof, ScannerError> {
info!(
from_block = ?self.config.from_block,
to_block = ?self.config.to_block,
count = ?self.config.count,
listener_count = self.listeners.len(),
"Starting EventScanner in LatestEvents mode"
);

let stream = self
.block_range_scanner
.stream_rewind(self.config.from_block, self.config.to_block)
Expand All @@ -166,7 +174,7 @@ impl<N: Network> EventScanner<LatestEvents, N> {
handler.handle(stream).await;
});

Ok(())
Ok(StartProof::new())
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/event_scanner/scanner/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use alloy::network::Network;
use crate::{
EventScannerBuilder, ScannerError,
event_scanner::{
EventScanner,
EventScanner, StartProof,
scanner::{
Live,
block_range_handler::{BlockRangeHandler, StreamHandler},
Expand Down Expand Up @@ -72,7 +72,7 @@ impl<N: Network> EventScanner<Live, N> {
///
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
pub async fn start(self) -> Result<(), ScannerError> {
pub async fn start(self) -> Result<StartProof, ScannerError> {
info!(
block_confirmations = self.config.block_confirmations,
listener_count = self.listeners.len(),
Expand All @@ -93,7 +93,7 @@ impl<N: Network> EventScanner<Live, N> {
handler.handle(stream).await;
});

Ok(())
Ok(StartProof::new())
}
}

Expand Down
115 changes: 101 additions & 14 deletions src/event_scanner/scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use crate::{
BlockRangeScannerBuilder, EventFilter, ScannerError,
block_range_scanner::{BlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, RingBufferCapacity},
event_scanner::{EventScannerResult, listener::EventListener},
block_range_scanner::{
BlockRangeScanner, BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS,
RingBufferCapacity,
},
error::ScannerError,
event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener},
robust_provider::IntoRobustProvider,
};

Expand Down Expand Up @@ -191,9 +194,9 @@ impl EventScannerBuilder<Unspecified> {
/// let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?;
///
/// let filter = EventFilter::new().contract_address(contract_address);
/// let mut stream = scanner.subscribe(filter);
///
/// scanner.start().await?;
/// let subscription = scanner.subscribe(filter);
/// let proof = scanner.start().await?;
/// let mut stream = subscription.stream(&proof);
///
/// while let Some(Ok(Message::Data(logs))) = stream.next().await {
/// println!("Received {} logs", logs.len());
Expand Down Expand Up @@ -268,9 +271,9 @@ impl EventScannerBuilder<Unspecified> {
/// .await?;
///
/// let filter = EventFilter::new().contract_address(contract_address);
/// let mut stream = scanner.subscribe(filter);
///
/// scanner.start().await?;
/// let subscription = scanner.subscribe(filter);
/// let proof = scanner.start().await?;
/// let mut stream = subscription.stream(&proof);
///
/// while let Some(msg) = stream.next().await {
/// match msg {
Expand Down Expand Up @@ -358,9 +361,9 @@ impl EventScannerBuilder<Unspecified> {
/// let mut scanner = EventScannerBuilder::latest(10).connect(robust_provider).await?;
///
/// let filter = EventFilter::new().contract_address(contract_address);
/// let mut stream = scanner.subscribe(filter);
///
/// scanner.start().await?;
/// let subscription = scanner.subscribe(filter);
/// let proof = scanner.start().await?;
/// let mut stream = subscription.stream(&proof);
///
/// // Expect a single message with up to 10 logs, then the stream ends
/// while let Some(Ok(Message::Data(logs))) = stream.next().await {
Expand Down Expand Up @@ -568,6 +571,58 @@ impl<Mode> EventScannerBuilder<Mode> {
}
}

/// A subscription to scanner events that requires proof the scanner has started.
///
/// Created by [`EventScanner::subscribe()`](crate::EventScanner::subscribe), this type holds the
/// underlying stream but prevents access until [`stream()`](EventSubscription::stream) is called
/// with a valid [`StartProof`].
///
/// This pattern ensures at compile time that [`EventScanner::start()`](crate::EventScanner::start)
/// is called before attempting to read from the event stream.
///
/// # Example
///
/// ```ignore
/// let mut scanner = EventScannerBuilder::live().connect(provider).await?;
///
/// // Create subscription (cannot access stream yet)
/// let subscription = scanner.subscribe(filter);
///
/// // Start scanner and get proof
/// let proof = scanner.start().await?;
///
/// // Now access the stream with the proof
/// let mut stream = subscription.stream(&proof);
///
/// while let Some(msg) = stream.next().await {
/// // process events
/// }
/// ```
pub struct EventSubscription {
inner: ReceiverStream<EventScannerResult>,
}

impl EventSubscription {
/// Creates a new subscription wrapping the given stream.
pub(crate) fn new(inner: ReceiverStream<EventScannerResult>) -> Self {
Self { inner }
}

/// Access the event stream.
///
/// Requires a reference to a [`StartProof`] as proof that the scanner
/// has been started. The proof is obtained by calling
/// `EventScanner::start()`.
///
/// # Arguments
///
/// * `_proof` - Proof that the scanner has been started
#[must_use]
pub fn stream(self, _proof: &StartProof) -> ReceiverStream<EventScannerResult> {
self.inner
}
}

impl<Mode, N: Network> EventScanner<Mode, N> {
/// Returns the configured stream buffer capacity.
#[must_use]
Expand All @@ -593,11 +648,43 @@ impl<Mode, N: Network> EventScanner<Mode, N> {
///
/// For scanner to properly stream events, register all subscriptions before calling `start()`.
#[must_use]
pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<EventScannerResult> {
pub fn subscribe(&mut self, filter: EventFilter) -> EventSubscription {
let (sender, receiver) =
mpsc::channel::<EventScannerResult>(self.block_range_scanner.buffer_capacity());
self.listeners.push(EventListener { filter, sender });
ReceiverStream::new(receiver)
EventSubscription::new(ReceiverStream::new(receiver))
}
}

/// Proof that the scanner has been started.
///
/// This proof is returned by `EventScanner::start()` and must be passed to
/// [`EventSubscription::stream()`] to access the event stream. This ensures at compile
/// time that the scanner is started before attempting to read events.
///
/// # Example
///
/// ```ignore
/// let mut scanner = EventScannerBuilder::sync().from_block(0).connect(provider).await?;
/// let subscription = scanner.subscribe(filter);
///
/// // Start the scanner and get the proof
/// let proof = scanner.start().await?;
///
/// // Now we can access the stream
/// let mut stream = subscription.stream(&proof);
/// ```
#[derive(Debug, Clone)]
pub struct StartProof {
/// Private field prevents construction outside this crate
_private: (),
}

impl StartProof {
/// Creates a new start proof.
#[must_use]
pub(crate) fn new() -> Self {
Self { _private: () }
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/event_scanner/scanner/sync/from_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use alloy::{eips::BlockId, network::Network};
use crate::{
EventScannerBuilder, ScannerError,
event_scanner::{
EventScanner, SyncFromBlock,
EventScanner, StartProof, SyncFromBlock,
scanner::block_range_handler::{BlockRangeHandler, StreamHandler},
},
robust_provider::IntoRobustProvider,
Expand Down Expand Up @@ -75,7 +75,7 @@ impl<N: Network> EventScanner<SyncFromBlock, N> {
/// * [`ScannerError::Timeout`] - if an RPC call required for startup times out.
/// * [`ScannerError::RpcError`] - if an RPC call required for startup fails.
/// * [`ScannerError::BlockNotFound`] - if `from_block` cannot be resolved.
pub async fn start(self) -> Result<(), ScannerError> {
pub async fn start(self) -> Result<StartProof, ScannerError> {
info!(
from_block = ?self.config.from_block,
block_confirmations = self.config.block_confirmations,
Expand All @@ -101,7 +101,7 @@ impl<N: Network> EventScanner<SyncFromBlock, N> {
handler.handle(stream).await;
});

Ok(())
Ok(StartProof::new())
}
}

Expand Down
Loading