diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 08e8ac8fb1e..287fd4fd16b 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -176,7 +176,7 @@ fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) { /// Represents a single, perhaps persistent connection to a URL over which requests /// can be made using [`jsonrpsee`] primitives. -struct UrlClient { +pub struct UrlClient { url: Url, inner: UrlClientInner, } @@ -190,7 +190,7 @@ impl Debug for UrlClient { } impl UrlClient { - async fn new(url: Url, token: impl Into>) -> Result { + pub async fn new(url: Url, token: impl Into>) -> Result { const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves. let headers = match token.into() { Some(token) => HeaderMap::from_iter([( diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f5af9085516..0105683f98e 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -12,6 +12,7 @@ mod filter_list; pub mod json_validator; mod log_layer; mod metrics_layer; +mod parallel_batch_layer; mod request; mod segregation_layer; mod set_extension_layer; @@ -28,6 +29,7 @@ pub use filter_list::FilterList; use futures::FutureExt as _; use jsonrpsee::server::ServerConfig; use log_layer::LogLayer; +use parallel_batch_layer::ParallelBatchLayer; use reflect::Ctx; pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt}; pub use request::Request; @@ -573,18 +575,18 @@ pub async fn start_rpc( let methods: Arc> = Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect()); + let server_config = ServerConfig::builder() + .max_request_body_size(MAX_REQUEST_BODY_SIZE) + // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` + .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) + .max_connections(default_max_connections()) + .set_id_provider(RandomHexStringIdProvider::new()) + .build(); + let max_response_body_size = *MAX_RESPONSE_BODY_SIZE as usize; let per_conn = PerConnection { stop_handle: stop_handle.clone(), svc_builder: Server::builder() - .set_config( - ServerConfig::builder() - .max_request_body_size(MAX_REQUEST_BODY_SIZE) - // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` - .max_response_body_size(*MAX_RESPONSE_BODY_SIZE) - .max_connections(default_max_connections()) - .set_id_provider(RandomHexStringIdProvider::new()) - .build(), - ) + .set_config(server_config.clone()) .set_http_middleware( tower::ServiceBuilder::new() .option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new)) @@ -647,7 +649,9 @@ pub async fn start_rpc( keystore: keystore.clone(), }) .layer(LogLayer::default()) - .layer(MetricsLayer::default()); + .layer(MetricsLayer::default()) + // `ParallelBatchLayer` has to be the last layer + .layer(ParallelBatchLayer::new(max_response_body_size)); let mut jsonrpsee_svc = svc_builder .set_rpc_middleware(rpc_middleware) .build(methods, stop_handle); @@ -801,10 +805,18 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR mod tests { use super::*; use crate::{ - db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion, + db::MemoryDB, + networks::NetworkChain, + rpc::{client::UrlClient, common::ShiftingVersion}, tool::offline_server::server::offline_rpc_state, }; - use jsonrpsee::server::stop_channel; + use jsonrpsee::{ + core::{ + client::{BatchResponse, ClientT}, + params::BatchRequestBuilder, + }, + server::stop_channel, + }; use std::net::{Ipv4Addr, SocketAddr}; use tokio::task::JoinSet; @@ -913,6 +925,29 @@ mod tests { drop(client); + // Sending a batch request + let client = UrlClient::new( + format!("http://{}:{}/rpc/v1", rpc_address.ip(), rpc_address.port()) + .parse() + .unwrap(), + None, + ) + .await + .unwrap(); + let mut batch_request_builder = BatchRequestBuilder::new(); + let empty_payload: [(); 0] = []; + batch_request_builder + .insert("Filecoin.Version", empty_payload) + .unwrap(); + batch_request_builder + .insert("eth_chainId", empty_payload) + .unwrap(); + let batch_response: BatchResponse = + client.batch_request(batch_request_builder).await.unwrap(); + assert_eq!(batch_response.len(), 2); + assert_eq!(batch_response.num_successful_calls(), 2); + assert_eq!(batch_response.num_failed_calls(), 0); + // Gracefully shutdown the RPC server println!("sending shutdown signal"); shutdown_send.send(()).await.unwrap(); diff --git a/src/rpc/parallel_batch_layer.rs b/src/rpc/parallel_batch_layer.rs new file mode 100644 index 00000000000..1c55217c032 --- /dev/null +++ b/src/rpc/parallel_batch_layer.rs @@ -0,0 +1,110 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; +use jsonrpsee::{ + MethodResponse, + core::middleware::{Batch, BatchEntry, Notification}, + server::{BatchResponseBuilder, middleware::rpc::RpcServiceT}, +}; +use tower::Layer; + +/// Parallelize batch RPC requests that are processed in sequence by default +/// See +/// +/// Note that such parallelization is allowed as per the [`JSON-RPC` specification](https://www.jsonrpc.org/specification#:~:text=6%20Batch) +#[derive(Clone, derive_more::Constructor)] +pub(super) struct ParallelBatchLayer { + max_response_body_size: usize, +} + +impl Layer for ParallelBatchLayer { + type Service = ParallelBatchService; + + fn layer(&self, service: S) -> Self::Service { + ParallelBatchService { + service, + max_response_body_size: self.max_response_body_size, + } + } +} + +#[derive(Clone)] +pub(super) struct ParallelBatchService { + service: S, + max_response_body_size: usize, +} + +impl RpcServiceT for ParallelBatchService +where + S: RpcServiceT< + MethodResponse = MethodResponse, + NotificationResponse = MethodResponse, + BatchResponse = MethodResponse, + > + Send + + Sync + + 'static, +{ + type MethodResponse = S::MethodResponse; + type NotificationResponse = S::NotificationResponse; + type BatchResponse = S::BatchResponse; + + fn call<'a>( + &self, + req: jsonrpsee::types::Request<'a>, + ) -> impl Future + Send + 'a { + self.service.call(req) + } + + // Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151 + fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { + // Process batch in parallel instead of delegating to the inner service, which processes them sequentially. + let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size); + let mut got_notification = false; + // Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side + // See + let mut tasks = FuturesOrdered::new(); + for batch_entry in batch.into_iter() { + match batch_entry { + Ok(BatchEntry::Call(req)) => { + tasks.push_back(self.service.call(req).map(Some).boxed()); + } + Ok(BatchEntry::Notification(n)) => { + got_notification = true; + tasks.push_back(self.service.notification(n).map(|_| None).boxed()); + } + Err(err) => { + let (err, id) = err.into_parts(); + let rp = MethodResponse::error(id, err); + tasks.push_back(async move { Some(rp) }.boxed()); + } + } + } + + async move { + while let Some(r) = tasks.next().await { + if let Some(rp) = r + && let Err(err) = batch_rp.append(rp) + { + return err; + } + } + + // If the batch is empty and we got a notification, we return an empty response. + if batch_rp.is_empty() && got_notification { + MethodResponse::notification() + } + // An empty batch is regarded as an invalid request here. + else { + MethodResponse::from_batch(batch_rp.finish()) + } + } + } + + fn notification<'a>( + &self, + n: Notification<'a>, + ) -> impl Future + Send + 'a { + self.service.notification(n) + } +}