From 2214c872b9d5e17207d4e9b24f8e21e5f6cabc47 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 19 May 2026 18:31:53 +0530 Subject: [PATCH 1/5] add pending transaction support for the Eth subscription API --- src/message_pool/msgpool/events.rs | 2 +- src/message_pool/msgpool/msg_pool.rs | 1 - src/rpc/methods/chain.rs | 34 ++++++++++++++++++++++++++-- src/rpc/methods/eth.rs | 2 +- src/rpc/methods/eth/pubsub.rs | 21 ++++++++++------- src/utils/broadcast/mod.rs | 17 +++++++++++++- 6 files changed, 63 insertions(+), 14 deletions(-) diff --git a/src/message_pool/msgpool/events.rs b/src/message_pool/msgpool/events.rs index 85151ff44dc3..8c44d61d2a39 100644 --- a/src/message_pool/msgpool/events.rs +++ b/src/message_pool/msgpool/events.rs @@ -8,9 +8,9 @@ use crate::message::SignedMessage; pub(in crate::message_pool) const MPOOL_UPDATE_CHANNEL_CAPACITY: usize = 256; /// A change to the pending pool. -#[allow(dead_code)] // TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941 #[derive(Clone, Debug)] pub enum MpoolUpdate { Add(SignedMessage), + #[allow(dead_code)] Remove(SignedMessage), } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 85f40f2f359a..0b1636a9eed4 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -430,7 +430,6 @@ where /// Subscribe to [`MpoolUpdate`] events for every insertion into and /// removal from the pending pool. - #[allow(dead_code)] // surfaces the MpoolUpdate API for external subscribers. pub fn subscribe_to_updates(&self) -> broadcast::Receiver { self.pending.subscribe() } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index f9f8f6d23bbd..e80f83addce2 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -17,10 +17,11 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; +use crate::message_pool::MpoolUpdate; use crate::prelude::*; use crate::rpc::eth::{ - Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, - types::EthFilterSpec, + Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, + types::ApiHeaders, types::EthFilterSpec, }; use crate::rpc::f3::F3ExportLatestSnapshot; use crate::rpc::types::*; @@ -29,11 +30,13 @@ use crate::shim::clock::ChainEpoch; use crate::shim::error::ExitCode; use crate::shim::executor::Receipt; use crate::shim::message::Message; +use crate::utils::broadcast::subscription_stream; use crate::utils::db::CborStoreExt as _; use crate::utils::io::VoidAsyncWriter; use crate::utils::misc::env::is_env_truthy; use anyhow::{Context as _, Result}; use enumflags2::{BitFlags, make_bitflags}; +use futures::StreamExt; use fvm_ipld_encoding::{CborStore, RawBytes}; use hex::ToHex; use ipld_core::ipld::Ipld; @@ -51,6 +54,7 @@ use tokio::sync::{ }; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use crate::rpc::eth::types::EthHash; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -145,6 +149,32 @@ pub(crate) fn logs( (receiver, handle) } +/// Subscribe to mpool changes and broadcast the new pending transaction (only newly Added transaction is broadcasted) +pub(crate) fn new_pending_txns(ctx: &Ctx) -> (Subscriber, JoinHandle<()>) { + let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); + let mut stream = subscription_stream(ctx.mpool.subscribe_to_updates()); + let eth_chain_id = ctx.chain_config().eth_chain_id; + + let handle = tokio::spawn(async move { + while let Some(update) = stream.next().await { + let MpoolUpdate::Add(msg) = update else { + continue; + }; + + let Ok(hash) = eth_tx_hash_from_signed_message(&msg, eth_chain_id) else { + continue; + }; + + if let Err(e) = sender.send(hash) { + tracing::error!("Failed to send pending txn hash: {}", e); + return; + } + } + }); + + (receiver, handle) +} + pub enum ChainGetFinalizedTipset {} impl RpcMethod<0> for ChainGetFinalizedTipset { const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet"; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 18908282a310..3f43aabcc90b 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -3056,7 +3056,7 @@ fn eth_log_from_event(entries: &[EventEntry]) -> Option<(EthBytes, Vec) Some((data, topics)) } -fn eth_tx_hash_from_signed_message( +pub fn eth_tx_hash_from_signed_message( message: &SignedMessage, eth_chain_id: EthChainIdType, ) -> anyhow::Result { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 8c81a284b0fe..8dc0c14c04f5 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -64,7 +64,7 @@ use crate::rpc::eth::pubsub_trait::{ }; use crate::rpc::{RPCState, chain}; use jsonrpsee::PendingSubscriptionSink; -use jsonrpsee::core::{SubscriptionError, SubscriptionResult}; +use jsonrpsee::core::SubscriptionResult; use std::sync::Arc; use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; @@ -87,13 +87,7 @@ impl EthPubSubApiServer for EthPubSub { match kind { SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, SubscriptionKind::PendingTransactions => { - return Err(SubscriptionError::from( - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, - "pendingTransactions subscription not yet implemented", - None::<()>, - ), - )); + self.handle_pending_txn_subscription(sink, ctx).await } SubscriptionKind::Logs => { let filter = params.and_then(|p| p.filter); @@ -129,6 +123,17 @@ impl EthPubSub { handle_subscription(logs, accepted_sink, handle).await; }); } + + async fn handle_pending_txn_subscription( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc, + ) { + let (pending_rx, handle) = chain::new_pending_txns(&ctx); + tokio::spawn(async move { + handle_subscription(pending_rx, accepted_sink, handle).await; + }); + } } async fn handle_subscription( diff --git a/src/utils/broadcast/mod.rs b/src/utils/broadcast/mod.rs index ee34a6de496a..5fb66ab148cf 100644 --- a/src/utils/broadcast/mod.rs +++ b/src/utils/broadcast/mod.rs @@ -4,9 +4,24 @@ #[cfg(test)] mod tests; -use futures::FutureExt as _; +use futures::{FutureExt as _, Stream, StreamExt as _}; +use std::pin::Pin; +use tokio::sync::broadcast::Receiver; +use tokio_stream::wrappers::BroadcastStream; /// Returns `true` if there are any active subscribers to the given broadcast channel. pub fn has_subscribers(tx: &tokio::sync::broadcast::Sender) -> bool { tx.closed().now_or_never().is_none() } + +/// Wraps a broadcast [`Receiver`] as a pinned [`Stream`] that skips `Lagged` +/// events and terminates on `Closed`. +/// +/// Use this in place of a manual `rx.recv()` loop so the lagged/closed handling +/// stays DRY while each caller retains ownership of its own state across +/// iterations (no per-event `Arc::clone`s). +pub fn subscription_stream( + rx: Receiver, +) -> Pin + Send>> { + Box::pin(BroadcastStream::new(rx).filter_map(|r| async move { r.ok() })) +} From 2291945c750c03ecfd01218a6554e156cf640a68 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 19 May 2026 21:34:14 +0530 Subject: [PATCH 2/5] refactor subsciption to use stream instead of extra channel --- src/rpc/methods/chain.rs | 108 ---------------------- src/rpc/methods/eth.rs | 2 +- src/rpc/methods/eth/pubsub.rs | 164 ++++++++++++++++++---------------- src/utils/broadcast/mod.rs | 17 ++-- src/utils/broadcast/tests.rs | 30 +++++++ 5 files changed, 131 insertions(+), 190 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index e80f83addce2..d153188ce9cd 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -17,12 +17,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; -use crate::message_pool::MpoolUpdate; use crate::prelude::*; -use crate::rpc::eth::{ - Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, - types::ApiHeaders, types::EthFilterSpec, -}; use crate::rpc::f3::F3ExportLatestSnapshot; use crate::rpc::types::*; use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError}; @@ -30,13 +25,11 @@ use crate::shim::clock::ChainEpoch; use crate::shim::error::ExitCode; use crate::shim::executor::Receipt; use crate::shim::message::Message; -use crate::utils::broadcast::subscription_stream; use crate::utils::db::CborStoreExt as _; use crate::utils::io::VoidAsyncWriter; use crate::utils::misc::env::is_env_truthy; use anyhow::{Context as _, Result}; use enumflags2::{BitFlags, make_bitflags}; -use futures::StreamExt; use fvm_ipld_encoding::{CborStore, RawBytes}; use hex::ToHex; use ipld_core::ipld::Ipld; @@ -52,9 +45,7 @@ use tokio::sync::{ Mutex, broadcast::{self, Receiver as Subscriber}, }; -use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use crate::rpc::eth::types::EthHash; const HEAD_CHANNEL_CAPACITY: usize = 10; @@ -76,105 +67,6 @@ pub const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200; static CHAIN_EXPORT_LOCK: LazyLock>> = LazyLock::new(|| Mutex::new(None)); -/// Subscribes to head changes from the chain store and broadcasts new blocks. -/// -/// # Notes -/// -/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, -/// allowing manual cleanup if needed. -pub(crate) fn new_heads(data: Ctx) -> (Subscriber, JoinHandle<()>) { - let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - - let mut head_changes_rx = data.chain_store().subscribe_head_changes(); - - let handle = tokio::spawn(async move { - while let Ok(changes) = head_changes_rx.recv().await { - for ts in changes.applies { - // Convert the tipset to an Ethereum block with full transaction info - // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block - match EthBlock::from_filecoin_tipset(&data.state_manager, ts, TxInfo::Full).await { - Ok(block) => { - if let Err(e) = sender.send(ApiHeaders(block)) { - tracing::error!("Failed to send headers: {}", e); - return; - } - } - Err(e) => { - tracing::error!("Failed to convert tipset to eth block: {}", e); - } - } - } - } - }); - - (receiver, handle) -} - -/// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs. -/// -/// # Notes -/// -/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, -/// allowing manual cleanup if needed. -pub(crate) fn logs( - ctx: &Ctx, - filter: Option, -) -> (Subscriber>, JoinHandle<()>) { - let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - - let mut head_changes_rx = ctx.chain_store().subscribe_head_changes(); - - let ctx = ctx.clone(); - - let handle = tokio::spawn(async move { - while let Ok(changes) = head_changes_rx.recv().await { - for ts in changes.applies { - match eth_logs_with_filter(&ctx, &ts, filter.clone()).await { - Ok(logs) => { - if !logs.is_empty() - && let Err(e) = sender.send(logs) - { - tracing::error!("Failed to send logs for tipset {}: {}", ts.key(), e); - break; - } - } - Err(e) => { - tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); - } - } - } - } - }); - - (receiver, handle) -} - -/// Subscribe to mpool changes and broadcast the new pending transaction (only newly Added transaction is broadcasted) -pub(crate) fn new_pending_txns(ctx: &Ctx) -> (Subscriber, JoinHandle<()>) { - let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - let mut stream = subscription_stream(ctx.mpool.subscribe_to_updates()); - let eth_chain_id = ctx.chain_config().eth_chain_id; - - let handle = tokio::spawn(async move { - while let Some(update) = stream.next().await { - let MpoolUpdate::Add(msg) = update else { - continue; - }; - - let Ok(hash) = eth_tx_hash_from_signed_message(&msg, eth_chain_id) else { - continue; - }; - - if let Err(e) = sender.send(hash) { - tracing::error!("Failed to send pending txn hash: {}", e); - return; - } - } - }); - - (receiver, handle) -} - pub enum ChainGetFinalizedTipset {} impl RpcMethod<0> for ChainGetFinalizedTipset { const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet"; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 3f43aabcc90b..275da79789c1 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -3056,7 +3056,7 @@ fn eth_log_from_event(entries: &[EventEntry]) -> Option<(EthBytes, Vec) Some((data, topics)) } -pub fn eth_tx_hash_from_signed_message( +pub(crate) fn eth_tx_hash_from_signed_message( message: &SignedMessage, eth_chain_id: EthChainIdType, ) -> anyhow::Result { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 8dc0c14c04f5..bbdb76e92d3e 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,14 +59,18 @@ //! ``` //! -use crate::rpc::eth::pubsub_trait::{ - EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams, +use crate::message_pool::MpoolUpdate; +use crate::rpc::RPCState; +use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; +use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec}; +use crate::rpc::eth::{ + Block as EthBlock, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, }; -use crate::rpc::{RPCState, chain}; -use jsonrpsee::PendingSubscriptionSink; +use crate::utils::broadcast::subscription_stream; +use futures::{Stream, StreamExt as _}; use jsonrpsee::core::SubscriptionResult; +use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink}; use std::sync::Arc; -use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; #[derive(derive_more::Constructor)] pub struct EthPubSub { @@ -85,13 +89,11 @@ impl EthPubSubApiServer for EthPubSub { let ctx = self.ctx.clone(); match kind { - SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, - SubscriptionKind::PendingTransactions => { - self.handle_pending_txn_subscription(sink, ctx).await - } + SubscriptionKind::NewHeads => spawn_new_heads(sink, ctx), + SubscriptionKind::PendingTransactions => spawn_pending_transactions(sink, ctx), SubscriptionKind::Logs => { - let filter = params.and_then(|p| p.filter); - self.handle_logs_subscription(sink, ctx, filter).await + let filter = params.and_then(|p| p.filter).map(EthFilterSpec::from); + spawn_logs(sink, ctx, filter); } } @@ -99,81 +101,93 @@ impl EthPubSubApiServer for EthPubSub { } } -impl EthPubSub { - async fn handle_new_heads_subscription( - &self, - accepted_sink: jsonrpsee::SubscriptionSink, - ctx: Arc, - ) { - let (subscriber, handle) = chain::new_heads(ctx); - tokio::spawn(async move { - handle_subscription(subscriber, accepted_sink, handle).await; - }); - } +fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { + let head_rx = ctx.chain_store().subscribe_head_changes(); + let stream = subscription_stream(head_rx) + .flat_map(|changes| futures::stream::iter(changes.applies)) + .filter_map(move |ts| { + let ctx = ctx.clone(); + async move { + match EthBlock::from_filecoin_tipset(&ctx.state_manager, ts, TxInfo::Full).await { + Ok(block) => Some(ApiHeaders(block)), + Err(e) => { + tracing::error!("Failed to convert tipset to eth block: {e:#}"); + None + } + } + } + }) + .boxed(); + tokio::spawn(pipe_stream_to_sink(stream, sink)); +} - async fn handle_logs_subscription( - &self, - accepted_sink: jsonrpsee::SubscriptionSink, - ctx: Arc, - filter_spec: Option, - ) { - let filter_spec = filter_spec.map(Into::into); - let (logs, handle) = chain::logs(&ctx, filter_spec); - tokio::spawn(async move { - handle_subscription(logs, accepted_sink, handle).await; - }); - } +fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { + let head_rx = ctx.chain_store().subscribe_head_changes(); + let stream = subscription_stream(head_rx) + .flat_map(|changes| futures::stream::iter(changes.applies)) + .filter_map(move |ts| { + let ctx = ctx.clone(); + let filter = filter.clone(); + async move { + match eth_logs_with_filter(&ctx, &ts, filter).await { + Ok(logs) if !logs.is_empty() => Some(logs), + Ok(_) => None, + Err(e) => { + tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key()); + None + } + } + } + }) + .boxed(); + tokio::spawn(pipe_stream_to_sink(stream, sink)); +} - async fn handle_pending_txn_subscription( - &self, - accepted_sink: jsonrpsee::SubscriptionSink, - ctx: Arc, - ) { - let (pending_rx, handle) = chain::new_pending_txns(&ctx); - tokio::spawn(async move { - handle_subscription(pending_rx, accepted_sink, handle).await; - }); - } +fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc) { + let mpool_rx = ctx.mpool.subscribe_to_updates(); + let eth_chain_id = ctx.chain_config().eth_chain_id; + let stream = subscription_stream(mpool_rx) + .filter_map(move |update| async move { + let MpoolUpdate::Add(msg) = update else { + return None; + }; + eth_tx_hash_from_signed_message(&msg, eth_chain_id).ok() + }) + .boxed(); + tokio::spawn(pipe_stream_to_sink(stream, sink)); } -async fn handle_subscription( - mut subscriber: Subscriber, - sink: jsonrpsee::SubscriptionSink, - handle: tokio::task::JoinHandle<()>, -) where - T: serde::Serialize + Clone, +/// Forward stream items to the subscription sink until the sink is closed, +/// the client disconnects, or the upstream stream ends. The stream is +/// expected to absorb upstream backpressure (e.g. `Lagged`) on its own; this +/// helper only cares about the sink side. +async fn pipe_stream_to_sink(mut stream: S, sink: SubscriptionSink) +where + S: Stream + Unpin + Send, + T: serde::Serialize + Send, { loop { tokio::select! { - action = subscriber.recv() => { - match action { - Ok(v) => { - match jsonrpsee::SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &v) { - Ok(msg) => { - if let Err(e) = sink.send(msg).await { - tracing::error!("Failed to send message: {:?}", e); - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize message: {:?}", e); - break; - } - } - } - Err(RecvError::Closed) => { + _ = sink.closed() => break, + maybe = stream.next() => { + let Some(item) = maybe else { break }; + let msg = match jsonrpsee::SubscriptionMessage::new( + sink.method_name(), + sink.subscription_id(), + &item, + ) { + Ok(m) => m, + Err(e) => { + tracing::error!("Failed to serialize subscription message: {e:?}"); break; } - Err(RecvError::Lagged(_)) => { - } + }; + if let Err(e) = sink.send(msg).await { + tracing::debug!("Subscription sink send failed (client disconnected): {e:?}"); + break; } } - _ = sink.closed() => { - break; - } } } - handle.abort(); - - tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id()); + tracing::debug!("Subscription task ended (id: {:?})", sink.subscription_id()); } diff --git a/src/utils/broadcast/mod.rs b/src/utils/broadcast/mod.rs index 5fb66ab148cf..56b83fcd66a7 100644 --- a/src/utils/broadcast/mod.rs +++ b/src/utils/broadcast/mod.rs @@ -8,6 +8,7 @@ use futures::{FutureExt as _, Stream, StreamExt as _}; use std::pin::Pin; use tokio::sync::broadcast::Receiver; use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; /// Returns `true` if there are any active subscribers to the given broadcast channel. pub fn has_subscribers(tx: &tokio::sync::broadcast::Sender) -> bool { @@ -15,13 +16,17 @@ pub fn has_subscribers(tx: &tokio::sync::broadcast::Sender) -> bool { } /// Wraps a broadcast [`Receiver`] as a pinned [`Stream`] that skips `Lagged` -/// events and terminates on `Closed`. -/// -/// Use this in place of a manual `rx.recv()` loop so the lagged/closed handling -/// stays DRY while each caller retains ownership of its own state across -/// iterations (no per-event `Arc::clone`s). +/// events (logging the skip count at warn level) and terminates on `Closed`. pub fn subscription_stream( rx: Receiver, ) -> Pin + Send>> { - Box::pin(BroadcastStream::new(rx).filter_map(|r| async move { r.ok() })) + Box::pin(BroadcastStream::new(rx).filter_map(|r| async move { + match r { + Ok(v) => Some(v), + Err(BroadcastStreamRecvError::Lagged(n)) => { + tracing::warn!("broadcast subscription lagged: dropped {n} events"); + None + } + } + })) } diff --git a/src/utils/broadcast/tests.rs b/src/utils/broadcast/tests.rs index 27e9af6f65f7..2f96aea03e49 100644 --- a/src/utils/broadcast/tests.rs +++ b/src/utils/broadcast/tests.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; +use futures::StreamExt as _; #[tokio::test] async fn test_has_subscribers() { @@ -23,3 +24,32 @@ async fn test_has_subscribers() { drop(rx3); assert!(!has_subscribers(&tx)); } + +#[tokio::test] +async fn subscription_stream_terminates_when_sender_dropped() { + let (tx, rx) = tokio::sync::broadcast::channel::(8); + let stream = subscription_stream(rx); + + tx.send(42).unwrap(); + drop(tx); + + let collected: Vec = stream.collect().await; + assert_eq!(collected, vec![42]); +} + +#[tokio::test] +async fn subscription_stream_skips_lagged_events_and_keeps_going() { + // capacity=2 ring buffer; sending 5 values forces the slow receiver to lag. + let (tx, rx) = tokio::sync::broadcast::channel::(2); + let stream = subscription_stream(rx); + + for i in 0..5u32 { + tx.send(i).unwrap(); + } + drop(tx); + + // After Lagged, tokio's BroadcastStream catches up to the buffered window. + // With capacity=2 the last two sends survive: 3 and 4. + let collected: Vec = stream.collect().await; + assert_eq!(collected, vec![3, 4]); +} From 0d4eca0331795ba6df851c1f90e5b287c9857984 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 21 May 2026 19:12:37 +0530 Subject: [PATCH 3/5] add tests --- src/rpc/methods/eth/pubsub.rs | 8 +- .../subcommands/api_cmd/stateful_tests.rs | 246 ++++++++++++++++++ 2 files changed, 250 insertions(+), 4 deletions(-) diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index bbdb76e92d3e..d50210e384e9 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -60,6 +60,7 @@ //! use crate::message_pool::MpoolUpdate; +use crate::prelude::ShallowClone; use crate::rpc::RPCState; use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams}; use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec}; @@ -86,8 +87,7 @@ impl EthPubSubApiServer for EthPubSub { params: Option, ) -> SubscriptionResult { let sink = pending.accept().await?; - let ctx = self.ctx.clone(); - + let ctx = self.ctx.shallow_clone(); match kind { SubscriptionKind::NewHeads => spawn_new_heads(sink, ctx), SubscriptionKind::PendingTransactions => spawn_pending_transactions(sink, ctx), @@ -106,7 +106,7 @@ fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { let stream = subscription_stream(head_rx) .flat_map(|changes| futures::stream::iter(changes.applies)) .filter_map(move |ts| { - let ctx = ctx.clone(); + let ctx = ctx.shallow_clone(); async move { match EthBlock::from_filecoin_tipset(&ctx.state_manager, ts, TxInfo::Full).await { Ok(block) => Some(ApiHeaders(block)), @@ -126,7 +126,7 @@ fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option anyhow:: Ok(cid) } +/// A client-side WebSocket stream to the node's JSON-RPC endpoint. +type EthSubStream = + tokio_tungstenite::WebSocketStream>; + +/// Open a WebSocket and start an `eth_subscribe` subscription of the given +/// `kind` (`"newHeads"`, `"pendingTransactions"` or `"logs"`), with optional +/// `filter` params (used by `logs`). Returns the live stream and the assigned +/// subscription id. +async fn open_eth_subscription( + client: &rpc::Client, + kind: &str, + filter: Option, +) -> anyhow::Result<(EthSubStream, serde_json::Value)> { + let mut url = client.base_url().clone(); + url.set_scheme("ws") + .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; + url.set_path("/rpc/v1"); + + // `eth_subscribe` requires only `Read` permission, so no auth token is + // needed (same as `Filecoin.ChainNotify`). + let (mut ws_stream, _) = connect_async(url.as_str()).await?; + + let mut params = vec![json!(kind)]; + params.extend(filter); + let request = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_subscribe", + "params": params, + }); + ws_stream + .send(WsMessage::Text(request.to_string().into())) + .await + .context("failed to send eth_subscribe request")?; + + // The acknowledgement carries the subscription id in `result`. + let subscription_id = loop { + let msg = match tokio::time::timeout(Duration::from_secs(30), ws_stream.next()).await { + Ok(Some(msg)) => msg, + Ok(None) => anyhow::bail!("WebSocket stream closed before eth_subscribe ack"), + Err(_) => anyhow::bail!("timeout waiting for eth_subscribe ack"), + }; + match msg { + Ok(WsMessage::Text(text)) => { + let json: serde_json::Value = serde_json::from_str(&text)?; + if let Some(error) = json.get("error") { + anyhow::bail!("eth_subscribe failed: {error}"); + } + if let Some(result) = json.get("result") { + break result.clone(); + } + } + Err(..) | Ok(WsMessage::Close(..)) => { + anyhow::bail!("WebSocket closed before eth_subscribe ack") + } + _ => {} + } + }; + + Ok((ws_stream, subscription_id)) +} + +/// Wait for the next `eth_subscription` notification on `subscription_id` and +/// return its `result` payload. +async fn next_subscription_payload( + ws_stream: &mut EthSubStream, + subscription_id: &serde_json::Value, + timeout: Duration, +) -> anyhow::Result { + loop { + let msg = match tokio::time::timeout(timeout, ws_stream.next()).await { + Ok(Some(msg)) => msg, + Ok(None) => anyhow::bail!("WebSocket stream closed"), + Err(_) => anyhow::bail!("timeout waiting for subscription notification"), + }; + match msg { + Ok(WsMessage::Text(text)) => { + let json: serde_json::Value = serde_json::from_str(&text)?; + if json.get("method").and_then(|m| m.as_str()) != Some("eth_subscription") { + continue; + } + let params = json + .get("params") + .context("subscription notification missing params")?; + anyhow::ensure!( + params.get("subscription") == Some(subscription_id), + "subscription id mismatch in notification" + ); + return params + .get("result") + .cloned() + .context("subscription notification missing result"); + } + Err(..) | Ok(WsMessage::Close(..)) => anyhow::bail!("WebSocket closed unexpectedly"), + _ => {} + } + } +} + +/// Cancel the subscription and close the WebSocket. Best-effort: the node also +/// drops the subscription when the socket closes. +async fn close_eth_subscription( + ws_stream: &mut EthSubStream, + subscription_id: &serde_json::Value, +) -> anyhow::Result<()> { + let request = json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "eth_unsubscribe", + "params": [subscription_id], + }); + ws_stream + .send(WsMessage::Text(request.to_string().into())) + .await + .context("failed to send eth_unsubscribe request")?; + ws_stream.close(None).await?; + Ok(()) +} + +fn eth_subscribe_new_heads() -> RpcTestScenario { + RpcTestScenario::basic(|client| async move { + let (mut ws_stream, subscription_id) = + open_eth_subscription(&client, "newHeads", None).await?; + + // A new head is published once per tipset (~30s on calibnet). + let payload = + next_subscription_payload(&mut ws_stream, &subscription_id, Duration::from_secs(180)) + .await; + + let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + + serde_json::from_value::(payload?) + .context("newHeads payload is not a valid Eth block header")?; + Ok(()) + }) +} + +fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + let (mut ws_stream, subscription_id) = + open_eth_subscription(&client, "pendingTransactions", None).await?; + + // The subscription is active, so the pending tx we push now is observable. + let cid = invoke_contract(&client, &tx).await?; + let tx_hash = client + .call(EthGetTransactionHashByCid::request((cid,))?) + .await? + .context("no Eth transaction hash for CID")?; + + // Watch the pending-tx hashes until our transaction shows up. + let outcome: anyhow::Result<()> = async { + loop { + let payload = next_subscription_payload( + &mut ws_stream, + &subscription_id, + Duration::from_secs(120), + ) + .await?; + let hash: EthHash = serde_json::from_value(payload) + .context("pendingTransactions payload is not an Eth hash")?; + if hash == tx_hash { + break; + } + } + Ok(()) + } + .await; + + let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + outcome + } + }) +} + +fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { + RpcTestScenario::basic(move |client| { + let tx = tx.clone(); + async move { + // Filter on the test contract's event topic (topic position 0) and + // any address — mirrors the `eth_getFilterLogs` test. + let want_topic = tx.topic.to_string(); + let filter = json!({ + "address": [], + "topics": [want_topic.clone()], + }); + let (mut ws_stream, subscription_id) = + open_eth_subscription(&client, "logs", Some(filter)).await?; + + // The subscription is active; emit the event on-chain. + invoke_contract(&client, &tx).await?; + + // Logs are delivered when the tipset holding the event is applied, + // which can take a few epochs (~30s each) on calibnet. + let outcome: anyhow::Result<()> = async { + loop { + let payload = next_subscription_payload( + &mut ws_stream, + &subscription_id, + Duration::from_secs(300), + ) + .await?; + // Validate the shape, then confirm our event topic is present. + let logs: Vec = serde_json::from_value(payload.clone()) + .context("logs payload is not a list of Eth logs")?; + anyhow::ensure!(!logs.is_empty(), "received an empty logs notification"); + let matched = payload + .as_array() + .into_iter() + .flatten() + .filter_map(|log| log.get("topics").and_then(|t| t.as_array())) + .flatten() + .any(|t| t.as_str() == Some(want_topic.as_str())); + if matched { + break; + } + } + Ok(()) + } + .await; + + let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + outcome + } + }) +} + fn create_eth_new_filter_test() -> RpcTestScenario { RpcTestScenario::basic(|client| async move { const BLOCK_RANGE: u64 = 200; @@ -672,5 +901,22 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { EthGetFilterLogs, EthUninstallFilter ), + with_methods!( + eth_subscribe_new_heads().name("eth_subscribe newHeads works"), + EthSubscribe, + EthUnsubscribe + ), + with_methods!( + eth_subscribe_pending_transactions(tx.clone()) + .name("eth_subscribe pendingTransactions works"), + EthSubscribe, + EthUnsubscribe, + EthGetTransactionHashByCid + ), + with_methods!( + eth_subscribe_logs(tx.clone()).name("eth_subscribe logs works"), + EthSubscribe, + EthUnsubscribe + ), ] } From 709de17870834b5b221db73f6c8d6715025e7bef Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 22 May 2026 15:45:52 +0530 Subject: [PATCH 4/5] refactor stateful tests and subscription methods --- CHANGELOG.md | 2 + src/message_pool/msgpool/pending_store.rs | 1 - src/rpc/methods/eth/pubsub.rs | 48 ++++--- .../subcommands/api_cmd/stateful_tests.rs | 129 +++++++++++------- 4 files changed, 108 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0b49e7e9787..69fbbc5a1328 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ ### Added +- [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` RPC method now supports the `pendingTransactions` subscription. + - [#6012](https://github.com/ChainSafe/forest/issues/6012): Stricter validation of address arguments in `forest-wallet` subcommands. ### Changed diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index 76c70202cd60..0c49f18357a2 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -121,7 +121,6 @@ impl PendingStore { /// Subscribe to the [`MpoolUpdate`] stream. Returned receiver is /// independent; dropping it does not affect other subscribers. - #[allow(dead_code)] // consumed by MessagePool::subscribe_to_updates / external subscribers. pub fn subscribe(&self) -> broadcast::Receiver { self.inner.events.subscribe() } diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index d50210e384e9..bee30892c225 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,6 +59,7 @@ //! ``` //! +use crate::blocks::Tipset; use crate::message_pool::MpoolUpdate; use crate::prelude::ShallowClone; use crate::rpc::RPCState; @@ -101,20 +102,25 @@ impl EthPubSubApiServer for EthPubSub { } } -fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { - let head_rx = ctx.chain_store().subscribe_head_changes(); - let stream = subscription_stream(head_rx) +/// Stream of tipsets as they are applied to the chain head. Reverts are +/// ignored; lagged events are dropped (and logged) by [`subscription_stream`]. +fn head_applied_tipsets(ctx: &Arc) -> impl Stream + Send + use<> { + subscription_stream(ctx.chain_store().subscribe_head_changes()) .flat_map(|changes| futures::stream::iter(changes.applies)) +} + +fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { + let stream = head_applied_tipsets(&ctx) .filter_map(move |ts| { - let ctx = ctx.shallow_clone(); + let state_mngr = ctx.state_manager.shallow_clone(); async move { - match EthBlock::from_filecoin_tipset(&ctx.state_manager, ts, TxInfo::Full).await { - Ok(block) => Some(ApiHeaders(block)), - Err(e) => { - tracing::error!("Failed to convert tipset to eth block: {e:#}"); - None - } - } + EthBlock::from_filecoin_tipset(&state_mngr, ts, TxInfo::Full) + .await + .inspect_err(|e| { + tracing::error!("Failed to convert tipset to eth block: {e:#}") + }) + .ok() + .map(ApiHeaders) } }) .boxed(); @@ -122,21 +128,19 @@ fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc) { } fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option) { - let head_rx = ctx.chain_store().subscribe_head_changes(); - let stream = subscription_stream(head_rx) - .flat_map(|changes| futures::stream::iter(changes.applies)) + let stream = head_applied_tipsets(&ctx) .filter_map(move |ts| { let ctx = ctx.shallow_clone(); let filter = filter.clone(); async move { - match eth_logs_with_filter(&ctx, &ts, filter).await { - Ok(logs) if !logs.is_empty() => Some(logs), - Ok(_) => None, - Err(e) => { - tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key()); - None - } - } + eth_logs_with_filter(&ctx, &ts, filter) + .await + .inspect_err(|e| { + tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key()) + }) + .ok() + // Skip tipsets with no matching logs — nothing to notify. + .filter(|logs| !logs.is_empty()) } }) .boxed(); diff --git a/src/tool/subcommands/api_cmd/stateful_tests.rs b/src/tool/subcommands/api_cmd/stateful_tests.rs index 64f7ef95bfd9..28ca445c6f95 100644 --- a/src/tool/subcommands/api_cmd/stateful_tests.rs +++ b/src/tool/subcommands/api_cmd/stateful_tests.rs @@ -3,8 +3,8 @@ use crate::eth::EVMMethod; use crate::message::SignedMessage; use crate::networks::calibnet::ETH_CHAIN_ID; -use crate::rpc::eth::EthLog; use crate::rpc::eth::EthUint64; +use crate::rpc::eth::pubsub_trait::SubscriptionKind; use crate::rpc::eth::types::*; use crate::rpc::types::ApiTipsetKey; use crate::rpc::{self, RpcMethod, prelude::*}; @@ -173,12 +173,25 @@ pub(super) async fn run_tests( Ok(()) } +/// A client-side WebSocket stream to the node's JSON-RPC endpoint. +type EthSubStream = + tokio_tungstenite::WebSocketStream>; + +/// Open a WebSocket to the node's JSON-RPC endpoint (`/rpc/v1`). +/// Returns the live stream. +async fn connect_ws(client: &rpc::Client) -> anyhow::Result { + let mut url = client.base_url().clone(); + url.set_scheme("ws") + .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; + url.set_path("/rpc/v1"); + let (ws_stream, _) = connect_async(url.as_str()).await?; + Ok(ws_stream) +} + #[allow(unreachable_code)] async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { async fn close_channel( - stream: &mut tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, + stream: &mut EthSubStream, id: &serde_json::Value, ) -> anyhow::Result<()> { let request = json!({ @@ -196,13 +209,7 @@ async fn next_tipset(client: &rpc::Client) -> anyhow::Result<()> { Ok(()) } - let mut url = client.base_url().clone(); - url.set_scheme("ws") - .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; - url.set_path("/rpc/v1"); - - // Note: The token is not required for the ChainNotify method. - let (mut ws_stream, _) = connect_async(url.as_str()).await?; + let mut ws_stream = connect_ws(client).await?; let request = json!({ "jsonrpc": "2.0", @@ -360,29 +367,17 @@ async fn invoke_contract(client: &rpc::Client, tx: &TestTransaction) -> anyhow:: Ok(cid) } -/// A client-side WebSocket stream to the node's JSON-RPC endpoint. -type EthSubStream = - tokio_tungstenite::WebSocketStream>; - /// Open a WebSocket and start an `eth_subscribe` subscription of the given -/// `kind` (`"newHeads"`, `"pendingTransactions"` or `"logs"`), with optional -/// `filter` params (used by `logs`). Returns the live stream and the assigned -/// subscription id. +/// `kind`, with optional `filter` params (used by `logs`). Returns the live +/// stream and the assigned subscription id. async fn open_eth_subscription( client: &rpc::Client, - kind: &str, + kind: SubscriptionKind, filter: Option, ) -> anyhow::Result<(EthSubStream, serde_json::Value)> { - let mut url = client.base_url().clone(); - url.set_scheme("ws") - .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; - url.set_path("/rpc/v1"); - - // `eth_subscribe` requires only `Read` permission, so no auth token is - // needed (same as `Filecoin.ChainNotify`). - let (mut ws_stream, _) = connect_async(url.as_str()).await?; + let mut ws_stream = connect_ws(client).await?; - let mut params = vec![json!(kind)]; + let mut params = vec![serde_json::to_value(kind)?]; params.extend(filter); let request = json!({ "jsonrpc": "2.0", @@ -481,8 +476,12 @@ async fn close_eth_subscription( fn eth_subscribe_new_heads() -> RpcTestScenario { RpcTestScenario::basic(|client| async move { + // Remember the chain head before subscribing so we can prove the head we + // receive is real, advancing — not stale or random data. + let start = client.call(EthBlockNumber::request(())?).await?; + let (mut ws_stream, subscription_id) = - open_eth_subscription(&client, "newHeads", None).await?; + open_eth_subscription(&client, SubscriptionKind::NewHeads, None).await?; // A new head is published once per tipset (~30s on calibnet). let payload = @@ -490,9 +489,24 @@ fn eth_subscribe_new_heads() -> RpcTestScenario { .await; let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; + let payload = payload?; - serde_json::from_value::(payload?) + // `newHeads` must yield a block-header object + anyhow::ensure!( + payload.is_object(), + "newHeads must yield a block-header object, got: {payload}" + ); + let header: ApiHeaders = serde_json::from_value(payload) .context("newHeads payload is not a valid Eth block header")?; + + // Identity: the header's number must be at or beyond the head seen at + // subscription time, proving it's a genuine fresh head. + anyhow::ensure!( + header.0.number.0 >= start.0 as i64, + "newHeads number {} precedes the head {} seen at subscription time", + header.0.number.0, + start.0 + ); Ok(()) }) } @@ -502,7 +516,7 @@ fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { let tx = tx.clone(); async move { let (mut ws_stream, subscription_id) = - open_eth_subscription(&client, "pendingTransactions", None).await?; + open_eth_subscription(&client, SubscriptionKind::PendingTransactions, None).await?; // The subscription is active, so the pending tx we push now is observable. let cid = invoke_contract(&client, &tx).await?; @@ -511,7 +525,7 @@ fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { .await? .context("no Eth transaction hash for CID")?; - // Watch the pending-tx hashes until our transaction shows up. + // Watch pending-tx hashes until *our* transaction shows up. let outcome: anyhow::Result<()> = async { loop { let payload = next_subscription_payload( @@ -520,8 +534,14 @@ fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { Duration::from_secs(120), ) .await?; + // a pending tx is a single hash string. + anyhow::ensure!( + payload.is_string(), + "pendingTransactions must yield a tx-hash string, got: {payload}" + ); let hash: EthHash = serde_json::from_value(payload) .context("pendingTransactions payload is not an Eth hash")?; + // Identity: it must be the exact tx we just submitted. if hash == tx_hash { break; } @@ -536,22 +556,32 @@ fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { }) } +/// Minimal typed view of an `EthLog` for the fields the log test asserts on. +#[derive(serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct LogView { + topics: Vec, + transaction_hash: EthHash, +} + fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { RpcTestScenario::basic(move |client| { let tx = tx.clone(); async move { - // Filter on the test contract's event topic (topic position 0) and - // any address — mirrors the `eth_getFilterLogs` test. - let want_topic = tx.topic.to_string(); let filter = json!({ "address": [], - "topics": [want_topic.clone()], + "topics": [tx.topic.to_string()], }); let (mut ws_stream, subscription_id) = - open_eth_subscription(&client, "logs", Some(filter)).await?; + open_eth_subscription(&client, SubscriptionKind::Logs, Some(filter)).await?; - // The subscription is active; emit the event on-chain. - invoke_contract(&client, &tx).await?; + // Emit the event on-chain and remember the exact tx that produced it, + // so we can confirm the log we receive is `ours` + let cid = invoke_contract(&client, &tx).await?; + let tx_hash = client + .call(EthGetTransactionHashByCid::request((cid,))?) + .await? + .context("no Eth transaction hash for CID")?; // Logs are delivered when the tipset holding the event is applied, // which can take a few epochs (~30s each) on calibnet. @@ -563,17 +593,17 @@ fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { Duration::from_secs(300), ) .await?; - // Validate the shape, then confirm our event topic is present. - let logs: Vec = serde_json::from_value(payload.clone()) + anyhow::ensure!( + payload.is_array(), + "logs must yield a JSON array, got: {payload}" + ); + let logs: Vec = serde_json::from_value(payload) .context("logs payload is not a list of Eth logs")?; anyhow::ensure!(!logs.is_empty(), "received an empty logs notification"); - let matched = payload - .as_array() - .into_iter() - .flatten() - .filter_map(|log| log.get("topics").and_then(|t| t.as_array())) - .flatten() - .any(|t| t.as_str() == Some(want_topic.as_str())); + // Identity: a log carrying our event topic and `our` tx hash. + let matched = logs.iter().any(|log| { + log.transaction_hash == tx_hash && log.topics.contains(&tx.topic) + }); if matched { break; } @@ -916,7 +946,8 @@ pub(super) async fn create_tests(tx: TestTransaction) -> Vec { with_methods!( eth_subscribe_logs(tx.clone()).name("eth_subscribe logs works"), EthSubscribe, - EthUnsubscribe + EthUnsubscribe, + EthGetTransactionHashByCid ), ] } From 05713594acc343a8a9c9ed52bebc33d278746f7d Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 22 May 2026 20:36:26 +0530 Subject: [PATCH 5/5] address ai comments --- CHANGELOG.md | 2 + src/rpc/methods/eth/pubsub.rs | 3 +- .../subcommands/api_cmd/stateful_tests.rs | 61 ++++++++++++------- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69fbbc5a1328..7ee49ab2bae5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,8 @@ ### Fixed +- [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset. + ## Forest v0.33.4 "Stray" Mandatory release for mainnet node operators. It includes support for the _NV28 FireHorse_ network upgrade on mainnet, which is set to activate at epoch `6052800` (2026-05-27T14:00:00Z). It also includes a few improvements and fixes for the JSON-RPC server. diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index bee30892c225..f0ae9e113895 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -139,10 +139,9 @@ fn spawn_logs(sink: SubscriptionSink, ctx: Arc, filter: Option anyhow::Result { let mut url = client.base_url().clone(); - url.set_scheme("ws") + let ws_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + scheme => anyhow::bail!("unsupported RPC URL scheme: {scheme}"), + }; + url.set_scheme(ws_scheme) .map_err(|_| anyhow::anyhow!("failed to set scheme"))?; url.set_path("/rpc/v1"); let (ws_stream, _) = connect_async(url.as_str()).await?; @@ -525,8 +530,8 @@ fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { .await? .context("no Eth transaction hash for CID")?; - // Watch pending-tx hashes until *our* transaction shows up. - let outcome: anyhow::Result<()> = async { + // Watch pending-tx hashes until `our` transaction shows up. + let watch = async { loop { let payload = next_subscription_payload( &mut ws_stream, @@ -541,14 +546,20 @@ fn eth_subscribe_pending_transactions(tx: TestTransaction) -> RpcTestScenario { ); let hash: EthHash = serde_json::from_value(payload) .context("pendingTransactions payload is not an Eth hash")?; - // Identity: it must be the exact tx we just submitted. - if hash == tx_hash { + // received hash must be `our` transaction hash. + if hash.eq(&tx_hash) { break; } } - Ok(()) - } - .await; + anyhow::Ok(()) + }; + let outcome = tokio::time::timeout(Duration::from_secs(120), watch) + .await + .unwrap_or_else(|_| { + Err(anyhow::anyhow!( + "timed out waiting for our pendingTransactions notification" + )) + }); let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; outcome @@ -584,8 +595,8 @@ fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { .context("no Eth transaction hash for CID")?; // Logs are delivered when the tipset holding the event is applied, - // which can take a few epochs (~30s each) on calibnet. - let outcome: anyhow::Result<()> = async { + // which can take a few epochs (~30s each) on calibnet + let watch = async { loop { let payload = next_subscription_payload( &mut ws_stream, @@ -593,24 +604,28 @@ fn eth_subscribe_logs(tx: TestTransaction) -> RpcTestScenario { Duration::from_secs(300), ) .await?; + // A logs notification is a single log object (one per log, + // matching geth/reth/Lotus) — not an array. anyhow::ensure!( - payload.is_array(), - "logs must yield a JSON array, got: {payload}" + payload.is_object(), + "logs must yield a single log object, got: {payload}" ); - let logs: Vec = serde_json::from_value(payload) - .context("logs payload is not a list of Eth logs")?; - anyhow::ensure!(!logs.is_empty(), "received an empty logs notification"); - // Identity: a log carrying our event topic and `our` tx hash. - let matched = logs.iter().any(|log| { - log.transaction_hash == tx_hash && log.topics.contains(&tx.topic) - }); - if matched { + let log: LogView = serde_json::from_value(payload) + .context("logs payload is not an Eth log")?; + // Identity: the log must carry our event topic and `our` tx hash. + if log.transaction_hash == tx_hash && log.topics.contains(&tx.topic) { break; } } - Ok(()) - } - .await; + anyhow::Ok(()) + }; + let outcome = tokio::time::timeout(Duration::from_secs(300), watch) + .await + .unwrap_or_else(|_| { + Err(anyhow::anyhow!( + "timed out waiting for our logs notification" + )) + }); let _ = close_eth_subscription(&mut ws_stream, &subscription_id).await; outcome