Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

### Added

- [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` RPC method now supports the `pendingTransactions` subscription.

- [#6012](https://github.com/ChainSafe/forest/issues/6012): Stricter validation of address arguments in `forest-wallet` subcommands.

### Changed
Expand All @@ -41,6 +43,8 @@

### Fixed

- [#6941](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use the linked issue number instead of PR number in this changelog entry.

Line 46 should reference issue #6031 (the tracked objective) rather than PR #6941, to match the project’s changelog convention.

Suggested edit
-- [`#6941`](https://github.com/ChainSafe/forest/pull/6941): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset.
+- [`#6031`](https://github.com/ChainSafe/forest/issues/6031): The `eth_subscribe` `logs` subscription now emits one log object per notification instead of one array of logs per tipset.

Based on learnings: “In CHANGELOG.md entries, when both an issue and a PR exist for a change, reference the issue number… Use PR numbers only if there is no corresponding issue.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@CHANGELOG.md` at line 46, Replace the PR reference
"[`#6941`](https://github.com/ChainSafe/forest/pull/6941)" in the changelog entry
that reads "The `eth_subscribe` `logs` subscription now emits one log object per
notification instead of one array of logs per tipset." with the issue reference
"`#6031`" (e.g. "[`#6031`](https://github.com/ChainSafe/forest/issues/6031)") so the
entry references the tracked issue rather than the PR, keeping the rest of the
sentence unchanged.


## Forest v0.33.4 "Stray"

Mandatory release for mainnet node operators. It includes support for the _NV28 FireHorse_ network upgrade on mainnet, which is set to activate at epoch `6052800` (2026-05-27T14:00:00Z). It also includes a few improvements and fixes for the JSON-RPC server.
Expand Down
2 changes: 1 addition & 1 deletion src/message_pool/msgpool/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::message::SignedMessage;
pub(in crate::message_pool) const MPOOL_UPDATE_CHANNEL_CAPACITY: usize = 256;

/// A change to the pending pool.
#[allow(dead_code)] // TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941
#[derive(Clone, Debug)]
pub enum MpoolUpdate {
Add(SignedMessage),
#[allow(dead_code)]
Remove(SignedMessage),
}
1 change: 0 additions & 1 deletion src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ where

/// Subscribe to [`MpoolUpdate`] events for every insertion into and
/// removal from the pending pool.
#[allow(dead_code)] // surfaces the MpoolUpdate API for external subscribers.
pub fn subscribe_to_updates(&self) -> broadcast::Receiver<MpoolUpdate> {
self.pending.subscribe()
}
Expand Down
1 change: 0 additions & 1 deletion src/message_pool/msgpool/pending_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ impl PendingStore {

/// Subscribe to the [`MpoolUpdate`] stream. Returned receiver is
/// independent; dropping it does not affect other subscribers.
#[allow(dead_code)] // consumed by MessagePool::subscribe_to_updates / external subscribers.
pub fn subscribe(&self) -> broadcast::Receiver<MpoolUpdate> {
self.inner.events.subscribe()
}
Expand Down
78 changes: 0 additions & 78 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
use crate::message::{ChainMessage, SignedMessage};
use crate::prelude::*;
use crate::rpc::eth::{
Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders,
types::EthFilterSpec,
};
use crate::rpc::f3::F3ExportLatestSnapshot;
use crate::rpc::types::*;
use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError};
Expand Down Expand Up @@ -49,7 +45,6 @@ use tokio::sync::{
Mutex,
broadcast::{self, Receiver as Subscriber},
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

const HEAD_CHANNEL_CAPACITY: usize = 10;
Expand All @@ -72,79 +67,6 @@ pub const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200;
static CHAIN_EXPORT_LOCK: LazyLock<Mutex<Option<CancellationToken>>> =
LazyLock::new(|| Mutex::new(None));

/// Subscribes to head changes from the chain store and broadcasts new blocks.
///
/// # Notes
///
/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
/// allowing manual cleanup if needed.
pub(crate) fn new_heads(data: Ctx) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);

let mut head_changes_rx = data.chain_store().subscribe_head_changes();

let handle = tokio::spawn(async move {
while let Ok(changes) = head_changes_rx.recv().await {
for ts in changes.applies {
// Convert the tipset to an Ethereum block with full transaction info
// Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block
match EthBlock::from_filecoin_tipset(&data.state_manager, ts, TxInfo::Full).await {
Ok(block) => {
if let Err(e) = sender.send(ApiHeaders(block)) {
tracing::error!("Failed to send headers: {}", e);
return;
}
}
Err(e) => {
tracing::error!("Failed to convert tipset to eth block: {}", e);
}
}
}
}
});

(receiver, handle)
}

/// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs.
///
/// # Notes
///
/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
/// allowing manual cleanup if needed.
pub(crate) fn logs(
ctx: &Ctx,
filter: Option<EthFilterSpec>,
) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) {
let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);

let mut head_changes_rx = ctx.chain_store().subscribe_head_changes();

let ctx = ctx.clone();

let handle = tokio::spawn(async move {
while let Ok(changes) = head_changes_rx.recv().await {
for ts in changes.applies {
match eth_logs_with_filter(&ctx, &ts, filter.clone()).await {
Ok(logs) => {
if !logs.is_empty()
&& let Err(e) = sender.send(logs)
{
tracing::error!("Failed to send logs for tipset {}: {}", ts.key(), e);
break;
}
}
Err(e) => {
tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e);
}
}
}
}
});

(receiver, handle)
}

pub enum ChainGetFinalizedTipset {}
impl RpcMethod<0> for ChainGetFinalizedTipset {
const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet";
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3056,7 +3056,7 @@ fn eth_log_from_event(entries: &[EventEntry]) -> Option<(EthBytes, Vec<EthHash>)
Some((data, topics))
}

fn eth_tx_hash_from_signed_message(
pub(crate) fn eth_tx_hash_from_signed_message(
message: &SignedMessage,
eth_chain_id: EthChainIdType,
) -> anyhow::Result<EthHash> {
Expand Down
170 changes: 96 additions & 74 deletions src/rpc/methods/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,20 @@
//! ```
//!

use crate::rpc::eth::pubsub_trait::{
EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams,
use crate::blocks::Tipset;
use crate::message_pool::MpoolUpdate;
use crate::prelude::ShallowClone;
use crate::rpc::RPCState;
use crate::rpc::eth::pubsub_trait::{EthPubSubApiServer, SubscriptionKind, SubscriptionParams};
use crate::rpc::eth::types::{ApiHeaders, EthFilterSpec};
use crate::rpc::eth::{
Block as EthBlock, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message,
};
use crate::rpc::{RPCState, chain};
use jsonrpsee::PendingSubscriptionSink;
use jsonrpsee::core::{SubscriptionError, SubscriptionResult};
use crate::utils::broadcast::subscription_stream;
use futures::{Stream, StreamExt as _};
use jsonrpsee::core::SubscriptionResult;
use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink};
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError};

#[derive(derive_more::Constructor)]
pub struct EthPubSub {
Expand All @@ -82,93 +88,109 @@ impl EthPubSubApiServer for EthPubSub {
params: Option<SubscriptionParams>,
) -> SubscriptionResult {
let sink = pending.accept().await?;
let ctx = self.ctx.clone();

let ctx = self.ctx.shallow_clone();
match kind {
SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await,
SubscriptionKind::PendingTransactions => {
return Err(SubscriptionError::from(
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::error::METHOD_NOT_FOUND_CODE,
"pendingTransactions subscription not yet implemented",
None::<()>,
),
));
}
SubscriptionKind::NewHeads => spawn_new_heads(sink, ctx),
SubscriptionKind::PendingTransactions => spawn_pending_transactions(sink, ctx),
SubscriptionKind::Logs => {
let filter = params.and_then(|p| p.filter);
self.handle_logs_subscription(sink, ctx, filter).await
let filter = params.and_then(|p| p.filter).map(EthFilterSpec::from);
spawn_logs(sink, ctx, filter);
}
}

Ok(())
}
}

impl EthPubSub {
async fn handle_new_heads_subscription(
&self,
accepted_sink: jsonrpsee::SubscriptionSink,
ctx: Arc<RPCState>,
) {
let (subscriber, handle) = chain::new_heads(ctx);
tokio::spawn(async move {
handle_subscription(subscriber, accepted_sink, handle).await;
});
}
/// Stream of tipsets as they are applied to the chain head. Reverts are
/// ignored; lagged events are dropped (and logged) by [`subscription_stream`].
fn head_applied_tipsets(ctx: &Arc<RPCState>) -> impl Stream<Item = Tipset> + Send + use<> {
subscription_stream(ctx.chain_store().subscribe_head_changes())
.flat_map(|changes| futures::stream::iter(changes.applies))
}

async fn handle_logs_subscription(
&self,
accepted_sink: jsonrpsee::SubscriptionSink,
ctx: Arc<RPCState>,
filter_spec: Option<LogFilter>,
) {
let filter_spec = filter_spec.map(Into::into);
let (logs, handle) = chain::logs(&ctx, filter_spec);
tokio::spawn(async move {
handle_subscription(logs, accepted_sink, handle).await;
});
}
fn spawn_new_heads(sink: SubscriptionSink, ctx: Arc<RPCState>) {
let stream = head_applied_tipsets(&ctx)
.filter_map(move |ts| {
let state_mngr = ctx.state_manager.shallow_clone();
async move {
EthBlock::from_filecoin_tipset(&state_mngr, ts, TxInfo::Full)
.await
.inspect_err(|e| {
tracing::error!("Failed to convert tipset to eth block: {e:#}")
})
.ok()
.map(ApiHeaders)
}
})
.boxed();
tokio::spawn(pipe_stream_to_sink(stream, sink));
}

async fn handle_subscription<T>(
mut subscriber: Subscriber<T>,
sink: jsonrpsee::SubscriptionSink,
handle: tokio::task::JoinHandle<()>,
) where
T: serde::Serialize + Clone,
fn spawn_logs(sink: SubscriptionSink, ctx: Arc<RPCState>, filter: Option<EthFilterSpec>) {
let stream = head_applied_tipsets(&ctx)
.filter_map(move |ts| {
let ctx = ctx.shallow_clone();
let filter = filter.clone();
async move {
eth_logs_with_filter(&ctx, &ts, filter)
.await
.inspect_err(|e| {
tracing::error!("Failed to fetch logs for tipset {}: {e:#}", ts.key())
})
.ok()
}
})
.flat_map(futures::stream::iter)
.boxed();
tokio::spawn(pipe_stream_to_sink(stream, sink));
}

fn spawn_pending_transactions(sink: SubscriptionSink, ctx: Arc<RPCState>) {
let mpool_rx = ctx.mpool.subscribe_to_updates();
let eth_chain_id = ctx.chain_config().eth_chain_id;
let stream = subscription_stream(mpool_rx)
.filter_map(move |update| async move {
let MpoolUpdate::Add(msg) = update else {
return None;
};
eth_tx_hash_from_signed_message(&msg, eth_chain_id).ok()
})
.boxed();
tokio::spawn(pipe_stream_to_sink(stream, sink));
}

/// Forward stream items to the subscription sink until the sink is closed,
/// the client disconnects, or the upstream stream ends. The stream is
/// expected to absorb upstream backpressure (e.g. `Lagged`) on its own; this
/// helper only cares about the sink side.
async fn pipe_stream_to_sink<S, T>(mut stream: S, sink: SubscriptionSink)
where
S: Stream<Item = T> + Unpin + Send,
T: serde::Serialize + Send,
{
loop {
tokio::select! {
action = subscriber.recv() => {
match action {
Ok(v) => {
match jsonrpsee::SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &v) {
Ok(msg) => {
if let Err(e) = sink.send(msg).await {
tracing::error!("Failed to send message: {:?}", e);
break;
}
}
Err(e) => {
tracing::error!("Failed to serialize message: {:?}", e);
break;
}
}
}
Err(RecvError::Closed) => {
_ = sink.closed() => break,
maybe = stream.next() => {
let Some(item) = maybe else { break };
let msg = match jsonrpsee::SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&item,
) {
Ok(m) => m,
Err(e) => {
tracing::error!("Failed to serialize subscription message: {e:?}");
break;
}
Err(RecvError::Lagged(_)) => {
}
};
if let Err(e) = sink.send(msg).await {
tracing::debug!("Subscription sink send failed (client disconnected): {e:?}");
break;
}
}
_ = sink.closed() => {
break;
}
}
}
handle.abort();

tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id());
tracing::debug!("Subscription task ended (id: {:?})", sink.subscription_id());
}
Loading
Loading