From df7758be183951fa8664f57731f65bc7cce88f25 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 00:30:18 +0800 Subject: [PATCH 1/5] fix: process batch RPC request in parallel --- src/rpc/mod.rs | 24 ++++--- src/rpc/parallel_batch_layer.rs | 118 ++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 10 deletions(-) create mode 100644 src/rpc/parallel_batch_layer.rs diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f5af90855161..cf4f9f4ed2a4 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -12,6 +12,7 @@ mod filter_list; pub mod json_validator; mod log_layer; mod metrics_layer; +mod parallel_batch_layer; mod request; mod segregation_layer; mod set_extension_layer; @@ -28,6 +29,7 @@ pub use filter_list::FilterList; use futures::FutureExt as _; use jsonrpsee::server::ServerConfig; use log_layer::LogLayer; +use parallel_batch_layer::ParallelBatchLayer; use reflect::Ctx; pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt}; pub use request::Request; @@ -573,18 +575,18 @@ pub async fn start_rpc( let methods: Arc> = Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect()); + let server_config = ServerConfig::builder() + .max_request_body_size(MAX_REQUEST_BODY_SIZE) + // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` + .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) + .max_connections(default_max_connections()) + .set_id_provider(RandomHexStringIdProvider::new()) + .build(); + let max_response_body_size = *MAX_RESPONSE_BODY_SIZE as usize; let per_conn = PerConnection { stop_handle: stop_handle.clone(), svc_builder: Server::builder() - .set_config( - ServerConfig::builder() - .max_request_body_size(MAX_REQUEST_BODY_SIZE) - // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` - .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) - .max_connections(default_max_connections()) - .set_id_provider(RandomHexStringIdProvider::new()) - .build(), - ) + .set_config(server_config.clone()) .set_http_middleware( tower::ServiceBuilder::new() .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new)) @@ -647,7 +649,9 @@ pub async fn start_rpc( keystore: keystore.clone(), }) .layer(LogLayer::default()) - .layer(MetricsLayer::default()); + .layer(MetricsLayer::default()) + // `ParallelBatchLayer` has to be the last layer + .layer(ParallelBatchLayer::new(max_response_body_size)); let mut jsonrpsee_svc = svc_builder .set_rpc_middleware(rpc_middleware) .build(methods, stop_handle); diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs new file mode 100644 index 000000000000..329f97a9a00b --- /dev/null +++ b/src/rpc/parallel_batch_layer.rs @@ -0,0 +1,118 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::prelude::*; +use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; +use jsonrpsee::{ + MethodResponse, + core::middleware::{Batch, BatchEntry, Notification}, + server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, +}; +use tower::Layer; + +/// Parallelize batch RPC requests that are processed in sequence by default +/// See +/// +/// Note that such parallelization is allowed as per the [`JSON-RPC` specification](https://www.jsonrpc.org/specification#:~:text=6%20Batch) +#[derive(Clone, derive_more::Constructor)] +pub(super) struct ParallelBatchLayer { + max_response_body_size: usize, +} + +impl Layer for ParallelBatchLayer { + type Service = ParallelBatchService; + + fn layer(&self, service: S) -> Self::Service { + ParallelBatchService { + service: service.into(), + max_response_body_size: self.max_response_body_size, + } + } +} + +#[derive(Clone)] +pub(super) struct ParallelBatchService { + service: Arc, + max_response_body_size: usize, +} + +impl RpcServiceT for ParallelBatchService +where + S: RpcServiceT< + MethodResponse = MethodResponse, + NotificationResponse = MethodResponse, + BatchResponse = MethodResponse, + > + Send + + Sync + + 'static, +{ + type MethodResponse = S::MethodResponse; + type NotificationResponse = S::NotificationResponse; + type BatchResponse = S::BatchResponse; + + fn call<'a>( + &self, + req: jsonrpsee::types::Request<'a>, + ) -> impl Future + Send + 'a { + self.service.call(req) + } + + // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 + fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { + // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. + let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); + let mut got_notification = false; + // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side + // See + let mut tasks = FuturesOrdered::new(); + for batch_entry in batch.into_iter() { + match batch_entry { + Ok(BatchEntry::Call(req)) => { + tasks.push_back(self.service.call(req).map(Some).boxed()); + } + Ok(BatchEntry::Notification(n)) => { + got_notification = true; + let service = self.service.shallow_clone(); + tasks.push_back( + async move { + service.notification(n).await; + None + } + .boxed(), + ); + } + Err(err) => { + let (err, id) = err.into_parts(); + let rp = MethodResponse::error(id, err); + tasks.push_back(async move { Some(rp) }.boxed()); + } + } + } + + async move { + while let Some(r) = tasks.next().await { + if let Some(rp) = r + && let Err(err) = batch_rp.append(rp) + { + return err; + } + } + + // If the batch is empty and we got a notification, we return an empty response. + if batch_rp.is_empty() && got_notification { + MethodResponse::notification() + } + // An empty batch is regarded as an invalid request here. + else { + MethodResponse::from_batch(batch_rp.finish()) + } + } + } + + fn notification<'a>( + &self, + n: Notification<'a>, + ) -> impl Future + Send + 'a { + self.service.notification(n) + } +} From bf427dfaf554f5eae177dc6d28008a2dedbc797b Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 01:09:48 +0800 Subject: [PATCH 2/5] FOREST_RPC_BATCH_MAX_CONCURRENCY --- docs/docs/users/reference/env_variables.md | 1 + src/rpc/parallel_batch_layer.rs | 29 ++++++++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index fa0fba485157..a7e59646fe26 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -69,6 +69,7 @@ process. | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS` | positive integer | 32 | 32 | Maximum number of inbound chain exchange requests Forest will service concurrently. Excess requests are rejected with a `GoAway` response | | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER` | positive integer | 4 | 4 | Per-peer cap on concurrent inbound chain exchange requests. Excess requests from a single peer are rejected with a `GoAway` response | | `FOREST_MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES` | positive integer (bytes) | 10485760 (10 MiB) | 10485760 | Cap on the encoded byte size of a chain exchange response Forest serves to peers. Building stops as soon as the running encoded size would exceed this cap and the response is returned with `PartialResponse` status | +| `FOREST_RPC_BATCH_MAX_CONCURRENCY` | positive integer | 8 | 8 | max number of entries in an RPC batch request that can be processed in parallel | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs index 329f97a9a00b..69ee292ab901 100644 --- a/src/rpc/parallel_batch_layer.rs +++ b/src/rpc/parallel_batch_layer.rs @@ -1,6 +1,8 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use std::{num::NonZeroUsize, sync::LazyLock}; + use crate::prelude::*; use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; use jsonrpsee::{ @@ -8,6 +10,8 @@ use jsonrpsee::{ core::middleware::{Batch, BatchEntry, Notification}, server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, }; +use nonzero_ext::nonzero; +use tokio::sync::Semaphore; use tower::Layer; /// Parallelize batch RPC requests that are processed in sequence by default @@ -60,21 +64,42 @@ where // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. + const MAX_CONCURRENCY_ENV: &str = "FOREST_RPC_BATCH_MAX_CONCURRENCY"; + static MAX_CONCURRENCY: LazyLock = LazyLock::new(|| { + std::env::var(MAX_CONCURRENCY_ENV) + .ok() + .and_then(|i| i.parse().ok()) + .inspect(|i| { + tracing::info!( + "Max RPC batch concurrency is set to {i} by {MAX_CONCURRENCY_ENV}" + ) + }) + .unwrap_or(nonzero!(8usize)) + }); + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENCY.get())); let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); let mut got_notification = false; // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side // See let mut tasks = FuturesOrdered::new(); for batch_entry in batch.into_iter() { + let service = self.service.shallow_clone(); + let semaphore = semaphore.shallow_clone(); match batch_entry { Ok(BatchEntry::Call(req)) => { - tasks.push_back(self.service.call(req).map(Some).boxed()); + tasks.push_back( + async move { + let _permit = semaphore.acquire().await; + service.call(req).map(Some).await + } + .boxed(), + ); } Ok(BatchEntry::Notification(n)) => { got_notification = true; - let service = self.service.shallow_clone(); tasks.push_back( async move { + let _permit = semaphore.acquire().await; service.notification(n).await; None } From 5628ce04385d42c0b48363c8faf1d89e9d8005a9 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 01:11:23 +0800 Subject: [PATCH 3/5] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0b49e7e9787..35d28e00c6b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,8 @@ - [`#7066`](https://github.com/ChainSafe/forest/pull/7066): Disable JSON-RPC HTTP response compression by default. Set `FOREST_RPC_COMPRESS_MIN_BODY_SIZE` to a non-negative value (e.g. `1024`) to re-enable gzip compression of responses above that size. +- [`#7093`](https://github.com/ChainSafe/forest/pull/7093): Parallelize RPC batch request processing. + ### Removed ### Fixed From c86a3309e6d146862caa601c9f7246e8c3ae4d6e Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 01:57:22 +0800 Subject: [PATCH 4/5] cover batch in unit test --- src/rpc/client.rs | 4 ++-- src/rpc/mod.rs | 35 +++++++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 08e8ac8fb1e0..287fd4fd16bf 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -176,7 +176,7 @@ fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) { /// Represents a single, perhaps persistent connection to a URL over which requests /// can be made using [`jsonrpsee`] primitives. -struct UrlClient { +pub struct UrlClient { url: Url, inner: UrlClientInner, } @@ -190,7 +190,7 @@ impl Debug for UrlClient { } impl UrlClient { - async fn new(url: Url, token: impl Into>) -> Result { + pub async fn new(url: Url, token: impl Into>) -> Result { const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves. let headers = match token.into() { Some(token) => HeaderMap::from_iter([( diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index cf4f9f4ed2a4..0105683f98e5 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -805,10 +805,18 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR mod tests { use super::*; use crate::{ - db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion, + db::MemoryDB, + networks::NetworkChain, + rpc::{client::UrlClient, common::ShiftingVersion}, tool::offline_server::server::offline_rpc_state, }; - use jsonrpsee::server::stop_channel; + use jsonrpsee::{ + core::{ + client::{BatchResponse, ClientT}, + params::BatchRequestBuilder, + }, + server::stop_channel, + }; use std::net::{Ipv4Addr, SocketAddr}; use tokio::task::JoinSet; @@ -917,6 +925,29 @@ mod tests { drop(client); + // Sending a batch request + let client = UrlClient::new( + format!("http://{}:{}/rpc/v1", rpc_address.ip(), rpc_address.port()) + .parse() + .unwrap(), + None, + ) + .await + .unwrap(); + let mut batch_request_builder = BatchRequestBuilder::new(); + let empty_payload: [(); 0] = []; + batch_request_builder + .insert("Filecoin.Version", empty_payload) + .unwrap(); + batch_request_builder + .insert("eth_chainId", empty_payload) + .unwrap(); + let batch_response: BatchResponse = + client.batch_request(batch_request_builder).await.unwrap(); + assert_eq!(batch_response.len(), 2); + assert_eq!(batch_response.num_successful_calls(), 2); + assert_eq!(batch_response.num_failed_calls(), 0); + // Gracefully shutdown the RPC server println!("sending shutdown signal"); shutdown_send.send(()).await.unwrap(); From d274cea8130fc5028f23d2bd5ec91596ec0c3aa7 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 22 May 2026 20:56:44 +0800 Subject: [PATCH 5/5] no FOREST_RPC_BATCH_MAX_CONCURRENCY --- CHANGELOG.md | 2 -- docs/docs/users/reference/env_variables.md | 1 - src/rpc/parallel_batch_layer.rs | 41 +++------------------- 3 files changed, 4 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35d28e00c6b5..e0b49e7e9787 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,8 +37,6 @@ - [`#7066`](https://github.com/ChainSafe/forest/pull/7066): Disable JSON-RPC HTTP response compression by default. Set `FOREST_RPC_COMPRESS_MIN_BODY_SIZE` to a non-negative value (e.g. `1024`) to re-enable gzip compression of responses above that size. -- [`#7093`](https://github.com/ChainSafe/forest/pull/7093): Parallelize RPC batch request processing. - ### Removed ### Fixed diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index a7e59646fe26..fa0fba485157 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -69,7 +69,6 @@ process. | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS` | positive integer | 32 | 32 | Maximum number of inbound chain exchange requests Forest will service concurrently. Excess requests are rejected with a `GoAway` response | | `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER` | positive integer | 4 | 4 | Per-peer cap on concurrent inbound chain exchange requests. Excess requests from a single peer are rejected with a `GoAway` response | | `FOREST_MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES` | positive integer (bytes) | 10485760 (10 MiB) | 10485760 | Cap on the encoded byte size of a chain exchange response Forest serves to peers. Building stops as soon as the running encoded size would exceed this cap and the response is returned with `PartialResponse` status | -| `FOREST_RPC_BATCH_MAX_CONCURRENCY` | positive integer | 8 | 8 | max number of entries in an RPC batch request that can be processed in parallel | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs index 69ee292ab901..1c55217c0322 100644 --- a/src/rpc/parallel_batch_layer.rs +++ b/src/rpc/parallel_batch_layer.rs @@ -1,17 +1,12 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::{num::NonZeroUsize, sync::LazyLock}; - -use crate::prelude::*; use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; use jsonrpsee::{ MethodResponse, core::middleware::{Batch, BatchEntry, Notification}, server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, }; -use nonzero_ext::nonzero; -use tokio::sync::Semaphore; use tower::Layer; /// Parallelize batch RPC requests that are processed in sequence by default @@ -28,7 +23,7 @@ impl Layer for ParallelBatchLayer { fn layer(&self, service: S) -> Self::Service { ParallelBatchService { - service: service.into(), + service, max_response_body_size: self.max_response_body_size, } } @@ -36,7 +31,7 @@ impl Layer for ParallelBatchLayer { #[derive(Clone)] pub(super) struct ParallelBatchService { - service: Arc, + service: S, max_response_body_size: usize, } @@ -64,47 +59,19 @@ where // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. - const MAX_CONCURRENCY_ENV: &str = "FOREST_RPC_BATCH_MAX_CONCURRENCY"; - static MAX_CONCURRENCY: LazyLock = LazyLock::new(|| { - std::env::var(MAX_CONCURRENCY_ENV) - .ok() - .and_then(|i| i.parse().ok()) - .inspect(|i| { - tracing::info!( - "Max RPC batch concurrency is set to {i} by {MAX_CONCURRENCY_ENV}" - ) - }) - .unwrap_or(nonzero!(8usize)) - }); - let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENCY.get())); let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); let mut got_notification = false; // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side // See let mut tasks = FuturesOrdered::new(); for batch_entry in batch.into_iter() { - let service = self.service.shallow_clone(); - let semaphore = semaphore.shallow_clone(); match batch_entry { Ok(BatchEntry::Call(req)) => { - tasks.push_back( - async move { - let _permit = semaphore.acquire().await; - service.call(req).map(Some).await - } - .boxed(), - ); + tasks.push_back(self.service.call(req).map(Some).boxed()); } Ok(BatchEntry::Notification(n)) => { got_notification = true; - tasks.push_back( - async move { - let _permit = semaphore.acquire().await; - service.notification(n).await; - None - } - .boxed(), - ); + tasks.push_back(self.service.notification(n).map(|_| None).boxed()); } Err(err) => { let (err, id) = err.into_parts();