diff --git a/CHANGELOG.md b/CHANGELOG.md index e0b49e7e9787..7ee49ab2bae5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ ### Added +- [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` RPC method now supports the `pendingTransactions` subscription. + - [#6012](https://github.com/ChainSafe/forest/issues/6012): Stricter validation of address arguments in `forest-wallet` subcommands. ### Changed @@ -41,6 +43,8 @@ ### Fixed +- [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset. + ## Forest v0.33.4 "Stray" Mandatory release for mainnet node operators. It includes support for the _NV28 FireHorse_ network upgrade on mainnet, which is set to activate at epoch `6052800` (2026-05-27T14:00:00Z). It also includes a few improvements and fixes for the JSON-RPC server. diff --git a/src/message_pool/msgpool/events.rs b/src/message_pool/msgpool/events.rs index 85151ff44dc3..8c44d61d2a39 100644 --- a/src/message_pool/msgpool/events.rs +++ b/src/message_pool/msgpool/events.rs @@ -8,9 +8,9 @@ use crate::message::SignedMessage; pub(in crate::message_pool) const MPOOL_UPDATE_CHANNEL_CAPACITY: usize = 256; /// A change to the pending pool. -#[allow(dead_code)] // TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941 #[derive(Clone, Debug)] pub enum MpoolUpdate { Add(SignedMessage), + #[allow(dead_code)] Remove(SignedMessage), } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 85f40f2f359a..0b1636a9eed4 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -430,7 +430,6 @@ where /// Subscribe to [`MpoolUpdate`] events for every insertion into and /// removal from the pending pool. - #[allow(dead_code)] // surfaces the MpoolUpdate API for external subscribers. pub fn subscribe_to_updates(&self) -> broadcast::Receiver { self.pending.subscribe() } diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index 76c70202cd60..0c49f18357a2 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -121,7 +121,6 @@ impl PendingStore { /// Subscribe to the [`MpoolUpdate`] stream. Returned receiver is /// independent; dropping it does not affect other subscribers. - #[allow(dead_code)] // consumed by MessagePool::subscribe_to_updates / external subscribers. pub fn subscribe(&self) -> broadcast::Receiver { self.inner.events.subscribe() } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index f9f8f6d23bbd..d153188ce9cd 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -18,10 +18,6 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; use crate::prelude::*; -use crate::rpc::eth::{ - Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, - types::EthFilterSpec, -}; use crate::rpc::f3::F3ExportLatestSnapshot; use crate::rpc::types::*; use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError}; @@ -49,7 +45,6 @@ use tokio::sync::{ Mutex, broadcast::{self, Receiver as Subscriber}, }; -use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -72,79 +67,6 @@ pub const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200; static CHAIN_EXPORT_LOCK: LazyLock>> = LazyLock::new(|| Mutex::new(None)); -/// Subscribes to head changes from the chain store and broadcasts new blocks. -/// -/// # Notes -/// -/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, -/// allowing manual cleanup if needed. -pub(crate) fn new_heads(data: Ctx) -> (Subscriber, JoinHandle<()>) { - let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - - let mut head_changes_rx = data.chain_store().subscribe_head_changes(); - - let handle = tokio::spawn(async move { - while let Ok(changes) = head_changes_rx.recv().await { - for ts in changes.applies { - // Convert the tipset to an Ethereum block with full transaction info - // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block - match EthBlock::from_filecoin_tipset(&data.state_manager, ts, TxInfo::Full).await { - Ok(block) => { - if let Err(e) = sender.send(ApiHeaders(block)) { - tracing::error!("Failed to send headers: {}", e); - return; - } - } - Err(e) => { - tracing::error!("Failed to convert tipset to eth block: {}", e); - } - } - } - } - }); - - (receiver, handle) -} - -/// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs. -/// -/// # Notes -/// -/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, -/// allowing manual cleanup if needed. -pub(crate) fn logs( - ctx: &Ctx, - filter: Option, -) -> (Subscriber>, JoinHandle<()>) { - let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - - let mut head_changes_rx = ctx.chain_store().subscribe_head_changes(); - - let ctx = ctx.clone(); - - let handle = tokio::spawn(async move { - while let Ok(changes) = head_changes_rx.recv().await { - for ts in changes.applies { - match eth_logs_with_filter(&ctx, &ts, filter.clone()).await { - Ok(logs) => { - if !logs.is_empty() - && let Err(e) = sender.send(logs) - { - tracing::error!("Failed to send logs for tipset {}: {}", ts.key(), e); - break; - } - } - Err(e) => { - tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); - } - } - } - } - }); - - (receiver, handle) -} - pub enum ChainGetFinalizedTipset {} impl RpcMethod<0> for ChainGetFinalizedTipset { const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet"; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 18908282a310..275da79789c1 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -3056,7 +3056,7 @@ fn eth_log_from_event(entries: &[EventEntry]) -> Option<(EthBytes, Vec) Some((data, topics)) } -fn eth_tx_hash_from_signed_message( +pub(crate) fn eth_tx_hash_from_signed_message( message: &SignedMessage, eth_chain_id: EthChainIdType, ) -> anyhow::Result { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 8c81a284b0fe..f0ae9e113895 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,14 +59,20 @@ //! ``` //! -use crate::rpc::eth::pubsub_trait::{ - EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams, +use crate::blocks::Tipset; +use crate::message_pool::MpoolUpdate; +use crate::prelude::ShallowClone; +use crate::rpc::RPCState; +use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; +use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec}; +use crate::rpc::eth::{ + Block as EthBlock, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, }; -use crate::rpc::{RPCState, chain}; -use jsonrpsee::PendingSubscriptionSink; -use jsonrpsee::core::{SubscriptionError, SubscriptionResult}; +use crate::utils::broadcast::subscription_stream; +use futures::{Stream, StreamExt as _}; +use jsonrpsee::core::SubscriptionResult; +use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink}; use std::sync::Arc; -use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; #[derive(derive_more::Constructor)] pub struct EthPubSub { @@ -82,22 +88,13 @@ impl EthPubSubApiServer for EthPubSub { params: Option, ) -> SubscriptionResult { let sink = pending.accept().await?; - let ctx = self.ctx.clone(); - + let ctx = self.ctx.shallow_clone(); match kind { - SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, - SubscriptionKind::PendingTransactions => { - return Err(SubscriptionError::from( - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, - "pendingTransactions subscription not yet implemented", - None::<()>, - ), - )); - } + SubscriptionKind::NewHeads => spawn_new_heads(sink, ctx), + SubscriptionKind::PendingTransactions => spawn_pending_transactions(sink, ctx), SubscriptionKind::Logs => { - let filter = params.and_then(|p| p.filter); - self.handle_logs_subscription(sink, ctx, filter).await + let filter = params.and_then(|p| p.filter).map(EthFilterSpec::from); + spawn_logs(sink, ctx, filter); } } @@ -105,70 +102,95 @@ impl EthPubSubApiServer for EthPubSub { } } -impl EthPubSub { - async fn handle_new_heads_subscription( - &self, - accepted_sink: jsonrpsee::SubscriptionSink, - ctx: Arc, - ) { - let (subscriber, handle) = chain::new_heads(ctx); - tokio::spawn(async move { - handle_subscription(subscriber, accepted_sink, handle).await; - }); - } +/// Stream of tipsets as they are applied to the chain head. Reverts are +/// ignored; lagged events are dropped (and logged) by [`subscription_stream`]. +fn head_applied_tipsets(ctx: &Arc) -> impl Stream + Send + use<> { + subscription_stream(ctx.chain_store().subscribe_head_changes()) + .flat_map(|changes| futures::stream::iter(changes.applies)) +} - async fn handle_logs_subscription( - &self, - accepted_sink: jsonrpsee::SubscriptionSink, - ctx: Arc, - filter_spec: Option, - ) { - let filter_spec = filter_spec.map(Into::into); - let (logs, handle) = chain::logs(&ctx, filter_spec); - tokio::spawn(async move { - handle_subscription(logs, accepted_sink, handle).await; - }); - } +fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { + let stream = head_applied_tipsets(&ctx) + .filter_map(move |ts| { + let state_mngr = ctx.state_manager.shallow_clone(); + async move { + EthBlock::from_filecoin_tipset(&state_mngr, ts, TxInfo::Full) + .await + .inspect_err(|e| { + tracing::error!("Failed to convert tipset to eth block: {e:#}") + }) + .ok() + .map(ApiHeaders) + } + }) + .boxed(); + tokio::spawn(pipe_stream_to_sink(stream, sink)); } -async fn handle_subscription( - mut subscriber: Subscriber, - sink: jsonrpsee::SubscriptionSink, - handle: tokio::task::JoinHandle<()>, -) where - T: serde::Serialize + Clone, +fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { + let stream = head_applied_tipsets(&ctx) + .filter_map(move |ts| { + let ctx = ctx.shallow_clone(); + let filter = filter.clone(); + async move { + eth_logs_with_filter(&ctx, &ts, filter) + .await + .inspect_err(|e| { + tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key()) + }) + .ok() + } + }) + .flat_map(futures::stream::iter) + .boxed(); + tokio::spawn(pipe_stream_to_sink(stream, sink)); +} + +fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { + let mpool_rx = ctx.mpool.subscribe_to_updates(); + let eth_chain_id = ctx.chain_config().eth_chain_id; + let stream = subscription_stream(mpool_rx) + .filter_map(move |update| async move { + let MpoolUpdate::Add(msg) = update else { + return None; + }; + eth_tx_hash_from_signed_message(&msg, eth_chain_id).ok() + }) + .boxed(); + tokio::spawn(pipe_stream_to_sink(stream, sink)); +} + +/// Forward stream items to the subscription sink until the sink is closed, +/// the client disconnects, or the upstream stream ends. The stream is +/// expected to absorb upstream backpressure (e.g. `Lagged`) on its own; this +/// helper only cares about the sink side. +async fn pipe_stream_to_sink(mut stream: S, sink: SubscriptionSink) +where + S: Stream + Unpin + Send, + T: serde::Serialize + Send, { loop { tokio::select! { - action = subscriber.recv() => { - match action { - Ok(v) => { - match jsonrpsee::SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &v) { - Ok(msg) => { - if let Err(e) = sink.send(msg).await { - tracing::error!("Failed to send message: {:?}", e); - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize message: {:?}", e); - break; - } - } - } - Err(RecvError::Closed) => { + _ = sink.closed() => break, + maybe = stream.next() => { + let Some(item) = maybe else { break }; + let msg = match jsonrpsee::SubscriptionMessage::new( + sink.method_name(), + sink.subscription_id(), + &item, + ) { + Ok(m) => m, + Err(e) => { + tracing::error!("Failed to serialize subscription message: {e:?}"); break; } - Err(RecvError::Lagged(_)) => { - } + }; + if let Err(e) = sink.send(msg).await { + tracing::debug!("Subscription sink send failed (client disconnected): {e:?}"); + break; } } - _ = sink.closed() => { - break; - } } } - handle.abort(); - - tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id()); + tracing::debug!("Subscription task ended (id: {:?})", sink.subscription_id()); } diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 7feed17c22e2..8df22c542646 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -4,6 +4,7 @@ use crate::eth::EVMMethod; use crate::message::SignedMessage; use crate::networks::calibnet::ETH_CHAIN_ID; use crate::rpc::eth::EthUint64; +use crate::rpc::eth::pubsub_trait::SubscriptionKind; use crate::rpc::eth::types::*; use crate::rpc::types::ApiTipsetKey; use crate::rpc::{self, RpcMethod, prelude::*}; @@ -172,12 +173,30 @@ pub(super) async fn run_tests( Ok(()) } +/// A client-side WebSocket stream to the node's JSON-RPC endpoint. +type EthSubStream = + tokio_tungstenite::WebSocketStream>; + +/// Open a WebSocket to the node's JSON-RPC endpoint (`/rpc/v1`). +/// Returns the live stream. +async fn connect_ws(client: &rpc::Client) -> anyhow::Result { + let mut url = client.base_url().clone(); + let ws_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + scheme => anyhow::bail!("unsupported RPC URL scheme: {scheme}"), + }; + url.set_scheme(ws_scheme) + .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; + url.set_path("/rpc/v1"); + let (ws_stream, _) = connect_async(url.as_str()).await?; + Ok(ws_stream) +} + #[allow(unreachable_code)] async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { async fn close_channel( - stream: &mut tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, + stream: &mut EthSubStream, id: &serde_json::Value, ) -> anyhow::Result<()> { let request = json!({ @@ -195,13 +214,7 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { Ok(()) } - let mut url = client.base_url().clone(); - url.set_scheme("ws") - .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; - url.set_path("/rpc/v1"); - - // Note: The token is not required for the ChainNotify method. - let (mut ws_stream, _) = connect_async(url.as_str()).await?; + let mut ws_stream = connect_ws(client).await?; let request = json!({ "jsonrpc": "2.0", @@ -359,6 +372,267 @@ async fn invoke_contract(client: &rpc::Client, tx: &TestTransaction) -> anyhow:: Ok(cid) } +/// Open a WebSocket and start an `eth_subscribe` subscription of the given +/// `kind`, with optional `filter` params (used by `logs`). Returns the live +/// stream and the assigned subscription id. +async fn open_eth_subscription( + client: &rpc::Client, + kind: SubscriptionKind, + filter: Option, +) -> anyhow::Result<(EthSubStream, serde_json::Value)> { + let mut ws_stream = connect_ws(client).await?; + + let mut params = vec![serde_json::to_value(kind)?]; + params.extend(filter); + let request = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_subscribe", + "params": params, + }); + ws_stream + .send(WsMessage::Text(request.to_string().into())) + .await + .context("failed to send eth_subscribe request")?; + + // The acknowledgement carries the subscription id in `result`. + let subscription_id = loop { + let msg = match tokio::time::timeout(Duration::from_secs(30), ws_stream.next()).await { + Ok(Some(msg)) => msg, + Ok(None) => anyhow::bail!("WebSocket stream closed before eth_subscribe ack"), + Err(_) => anyhow::bail!("timeout waiting for eth_subscribe ack"), + }; + match msg { + Ok(WsMessage::Text(text)) => { + let json: serde_json::Value = serde_json::from_str(&text)?; + if let Some(error) = json.get("error") { + anyhow::bail!("eth_subscribe failed: {error}"); + } + if let Some(result) = json.get("result") { + break result.clone(); + } + } + Err(..) | Ok(WsMessage::Close(..)) => { + anyhow::bail!("WebSocket closed before eth_subscribe ack") + } + _ => {} + } + }; + + Ok((ws_stream, subscription_id)) +} + +/// Wait for the next `eth_subscription` notification on `subscription_id` and +/// return its `result` payload. +async fn next_subscription_payload( + ws_stream: &mut EthSubStream, + subscription_id: &serde_json::Value, + timeout: Duration, +) -> anyhow::Result { + loop { + let msg = match tokio::time::timeout(timeout, ws_stream.next()).await { + Ok(Some(msg)) => msg, + Ok(None) => anyhow::bail!("WebSocket stream closed"), + Err(_) => anyhow::bail!("timeout waiting for subscription notification"), + }; + match msg { + Ok(WsMessage::Text(text)) => { + let json: serde_json::Value = serde_json::from_str(&text)?; + if json.get("method").and_then(|m| m.as_str()) != Some("eth_subscription") { + continue; + } + let params = json + .get("params") + .context("subscription notification missing params")?; + anyhow::ensure!( + params.get("subscription") == Some(subscription_id), + "subscription id mismatch in notification" + ); + return params + .get("result") + .cloned() + .context("subscription notification missing result"); + } + Err(..) | Ok(WsMessage::Close(..)) => anyhow::bail!("WebSocket closed unexpectedly"), + _ => {} + } + } +} + +/// Cancel the subscription and close the WebSocket. Best-effort: the node also +/// drops the subscription when the socket closes. +async fn close_eth_subscription( + ws_stream: &mut EthSubStream, + subscription_id: &serde_json::Value, +) -> anyhow::Result<()> { + let request = json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "eth_unsubscribe", + "params": [subscription_id], + }); + ws_stream + .send(WsMessage::Text(request.to_string().into())) + .await + .context("failed to send eth_unsubscribe request")?; + ws_stream.close(None).await?; + Ok(()) +} + +fn eth_subscribe_new_heads() -> RpcTestScenario { + RpcTestScenario::basic(|client| async move { + // Remember the chain head before subscribing so we can prove the head we + // receive is real, advancing — not stale or random data. + let start = client.call(EthBlockNumber::request(())?).await?; + + let (mut ws_stream, subscription_id) = + open_eth_subscription(&client, SubscriptionKind::NewHeads, None).await?; + + // A new head is published once per tipset (~30s on calibnet). + let payload = + next_subscription_payload(&mut ws_stream, &subscription_id, Duration::from_secs(180)) + .await; + + let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + let payload = payload?; + + // `newHeads` must yield a block-header object + anyhow::ensure!( + payload.is_object(), + "newHeads must yield a block-header object, got: {payload}" + ); + let header: ApiHeaders = serde_json::from_value(payload) + .context("newHeads payload is not a valid Eth block header")?; + + // Identity: the header's number must be at or beyond the head seen at + // subscription time, proving it's a genuine fresh head. + anyhow::ensure!( + header.0.number.0 >= start.0 as i64, + "newHeads number {} precedes the head {} seen at subscription time", + header.0.number.0, + start.0 + ); + Ok(()) + }) +} + +fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + let (mut ws_stream, subscription_id) = + open_eth_subscription(&client, SubscriptionKind::PendingTransactions, None).await?; + + // The subscription is active, so the pending tx we push now is observable. + let cid = invoke_contract(&client, &tx).await?; + let tx_hash = client + .call(EthGetTransactionHashByCid::request((cid,))?) + .await? + .context("no Eth transaction hash for CID")?; + + // Watch pending-tx hashes until `our` transaction shows up. + let watch = async { + loop { + let payload = next_subscription_payload( + &mut ws_stream, + &subscription_id, + Duration::from_secs(120), + ) + .await?; + // a pending tx is a single hash string. + anyhow::ensure!( + payload.is_string(), + "pendingTransactions must yield a tx-hash string, got: {payload}" + ); + let hash: EthHash = serde_json::from_value(payload) + .context("pendingTransactions payload is not an Eth hash")?; + // received hash must be `our` transaction hash. + if hash.eq(&tx_hash) { + break; + } + } + anyhow::Ok(()) + }; + let outcome = tokio::time::timeout(Duration::from_secs(120), watch) + .await + .unwrap_or_else(|_| { + Err(anyhow::anyhow!( + "timed out waiting for our pendingTransactions notification" + )) + }); + + let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + outcome + } + }) +} + +/// Minimal typed view of an `EthLog` for the fields the log test asserts on. +#[derive(serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct LogView { + topics: Vec, + transaction_hash: EthHash, +} + +fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + let filter = json!({ + "address": [], + "topics": [tx.topic.to_string()], + }); + let (mut ws_stream, subscription_id) = + open_eth_subscription(&client, SubscriptionKind::Logs, Some(filter)).await?; + + // Emit the event on-chain and remember the exact tx that produced it, + // so we can confirm the log we receive is `ours` + let cid = invoke_contract(&client, &tx).await?; + let tx_hash = client + .call(EthGetTransactionHashByCid::request((cid,))?) + .await? + .context("no Eth transaction hash for CID")?; + + // Logs are delivered when the tipset holding the event is applied, + // which can take a few epochs (~30s each) on calibnet + let watch = async { + loop { + let payload = next_subscription_payload( + &mut ws_stream, + &subscription_id, + Duration::from_secs(300), + ) + .await?; + // A logs notification is a single log object (one per log, + // matching geth/reth/Lotus) — not an array. + anyhow::ensure!( + payload.is_object(), + "logs must yield a single log object, got: {payload}" + ); + let log: LogView = serde_json::from_value(payload) + .context("logs payload is not an Eth log")?; + // Identity: the log must carry our event topic and `our` tx hash. + if log.transaction_hash == tx_hash && log.topics.contains(&tx.topic) { + break; + } + } + anyhow::Ok(()) + }; + let outcome = tokio::time::timeout(Duration::from_secs(300), watch) + .await + .unwrap_or_else(|_| { + Err(anyhow::anyhow!( + "timed out waiting for our logs notification" + )) + }); + + let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + outcome + } + }) +} + fn create_eth_new_filter_test() -> RpcTestScenario { RpcTestScenario::basic(|client| async move { const BLOCK_RANGE: u64 = 200; @@ -672,5 +946,23 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetFilterLogs, EthUninstallFilter ), + with_methods!( + eth_subscribe_new_heads().name("eth_subscribe newHeads works"), + EthSubscribe, + EthUnsubscribe + ), + with_methods!( + eth_subscribe_pending_transactions(tx.clone()) + .name("eth_subscribe pendingTransactions works"), + EthSubscribe, + EthUnsubscribe, + EthGetTransactionHashByCid + ), + with_methods!( + eth_subscribe_logs(tx.clone()).name("eth_subscribe logs works"), + EthSubscribe, + EthUnsubscribe, + EthGetTransactionHashByCid + ), ] } diff --git a/src/utils/broadcast/mod.rs b/src/utils/broadcast/mod.rs index ee34a6de496a..56b83fcd66a7 100644 --- a/src/utils/broadcast/mod.rs +++ b/src/utils/broadcast/mod.rs @@ -4,9 +4,29 @@ #[cfg(test)] mod tests; -use futures::FutureExt as _; +use futures::{FutureExt as _, Stream, StreamExt as _}; +use std::pin::Pin; +use tokio::sync::broadcast::Receiver; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; /// Returns `true` if there are any active subscribers to the given broadcast channel. pub fn has_subscribers(tx: &tokio::sync::broadcast::Sender) -> bool { tx.closed().now_or_never().is_none() } + +/// Wraps a broadcast [`Receiver`] as a pinned [`Stream`] that skips `Lagged` +/// events (logging the skip count at warn level) and terminates on `Closed`. +pub fn subscription_stream( + rx: Receiver, +) -> Pin + Send>> { + Box::pin(BroadcastStream::new(rx).filter_map(|r| async move { + match r { + Ok(v) => Some(v), + Err(BroadcastStreamRecvError::Lagged(n)) => { + tracing::warn!("broadcast subscription lagged: dropped {n} events"); + None + } + } + })) +} diff --git a/src/utils/broadcast/tests.rs b/src/utils/broadcast/tests.rs index 27e9af6f65f7..2f96aea03e49 100644 --- a/src/utils/broadcast/tests.rs +++ b/src/utils/broadcast/tests.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; +use futures::StreamExt as _; #[tokio::test] async fn test_has_subscribers() { @@ -23,3 +24,32 @@ async fn test_has_subscribers() { drop(rx3); assert!(!has_subscribers(&tx)); } + +#[tokio::test] +async fn subscription_stream_terminates_when_sender_dropped() { + let (tx, rx) = tokio::sync::broadcast::channel::(8); + let stream = subscription_stream(rx); + + tx.send(42).unwrap(); + drop(tx); + + let collected: Vec = stream.collect().await; + assert_eq!(collected, vec![42]); +} + +#[tokio::test] +async fn subscription_stream_skips_lagged_events_and_keeps_going() { + // capacity=2 ring buffer; sending 5 values forces the slow receiver to lag. + let (tx, rx) = tokio::sync::broadcast::channel::(2); + let stream = subscription_stream(rx); + + for i in 0..5u32 { + tx.send(i).unwrap(); + } + drop(tx); + + // After Lagged, tokio's BroadcastStream catches up to the buffered window. + // With capacity=2 the last two sends survive: 3 and 4. + let collected: Vec = stream.collect().await; + assert_eq!(collected, vec![3, 4]); +}