Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ fn trace_params(params: impl jsonrpsee::core::traits::ToRpcParams) {

/// Represents a single, perhaps persistent connection to a URL over which requests
/// can be made using [`jsonrpsee`] primitives.
struct UrlClient {
pub struct UrlClient {
url: Url,
inner: UrlClientInner,
}
Expand All @@ -190,7 +190,7 @@ impl Debug for UrlClient {
}

impl UrlClient {
async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
pub async fn new(url: Url, token: impl Into<Option<String>>) -> Result<Self, ClientError> {
const ONE_DAY: Duration = Duration::from_secs(24 * 3600); // we handle timeouts ourselves.
let headers = match token.into() {
Some(token) => HeaderMap::from_iter([(
Expand Down
59 changes: 47 additions & 12 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod filter_list;
pub mod json_validator;
mod log_layer;
mod metrics_layer;
mod parallel_batch_layer;
mod request;
mod segregation_layer;
mod set_extension_layer;
Expand All @@ -28,6 +29,7 @@ pub use filter_list::FilterList;
use futures::FutureExt as _;
use jsonrpsee::server::ServerConfig;
use log_layer::LogLayer;
use parallel_batch_layer::ParallelBatchLayer;
use reflect::Ctx;
pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt};
pub use request::Request;
Expand Down Expand Up @@ -573,18 +575,18 @@ pub async fn start_rpc(
let methods: Arc<HashMap<ApiPaths, Methods>> =
Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());

let server_config = ServerConfig::builder()
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_response_body_size(*MAX_RESPONSE_BODY_SIZE)
.max_connections(default_max_connections())
.set_id_provider(RandomHexStringIdProvider::new())
.build();
let max_response_body_size = *MAX_RESPONSE_BODY_SIZE as usize;
let per_conn = PerConnection {
stop_handle: stop_handle.clone(),
svc_builder: Server::builder()
.set_config(
ServerConfig::builder()
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_response_body_size(*MAX_RESPONSE_BODY_SIZE)
.max_connections(default_max_connections())
.set_id_provider(RandomHexStringIdProvider::new())
.build(),
)
.set_config(server_config.clone())
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(COMPRESS_MIN_BODY_SIZE.map(CompressionLayer::new))
Expand Down Expand Up @@ -647,7 +649,9 @@ pub async fn start_rpc(
keystore: keystore.clone(),
})
.layer(LogLayer::default())
.layer(MetricsLayer::default());
.layer(MetricsLayer::default())
// `ParallelBatchLayer` has to be the last layer
.layer(ParallelBatchLayer::new(max_response_body_size));
let mut jsonrpsee_svc = svc_builder
.set_rpc_middleware(rpc_middleware)
.build(methods, stop_handle);
Expand Down Expand Up @@ -801,10 +805,18 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR
mod tests {
use super::*;
use crate::{
db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
db::MemoryDB,
networks::NetworkChain,
rpc::{client::UrlClient, common::ShiftingVersion},
tool::offline_server::server::offline_rpc_state,
};
use jsonrpsee::server::stop_channel;
use jsonrpsee::{
core::{
client::{BatchResponse, ClientT},
params::BatchRequestBuilder,
},
server::stop_channel,
};
use std::net::{Ipv4Addr, SocketAddr};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -913,6 +925,29 @@ mod tests {

drop(client);

// Sending a batch request
let client = UrlClient::new(
format!("http://{}:{}/rpc/v1", rpc_address.ip(), rpc_address.port())
.parse()
.unwrap(),
None,
)
.await
.unwrap();
let mut batch_request_builder = BatchRequestBuilder::new();
let empty_payload: [(); 0] = [];
batch_request_builder
.insert("Filecoin.Version", empty_payload)
.unwrap();
batch_request_builder
.insert("eth_chainId", empty_payload)
.unwrap();
let batch_response: BatchResponse<serde_json::Value> =
client.batch_request(batch_request_builder).await.unwrap();
assert_eq!(batch_response.len(), 2);
assert_eq!(batch_response.num_successful_calls(), 2);
assert_eq!(batch_response.num_failed_calls(), 0);

// Gracefully shutdown the RPC server
println!("sending shutdown signal");
shutdown_send.send(()).await.unwrap();
Expand Down
110 changes: 110 additions & 0 deletions src/rpc/parallel_batch_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
use jsonrpsee::{
MethodResponse,
core::middleware::{Batch, BatchEntry, Notification},
server::{BatchResponseBuilder, middleware::rpc::RpcServiceT},
};
use tower::Layer;

/// Parallelize batch RPC requests that are processed in sequence by default
/// See <https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L157>
///
/// Note that such parallelization is allowed as per the [`JSON-RPC` specification](https://www.jsonrpc.org/specification#:~:text=6%20Batch)
#[derive(Clone, derive_more::Constructor)]
pub(super) struct ParallelBatchLayer {
max_response_body_size: usize,
}

impl<S> Layer<S> for ParallelBatchLayer {
type Service = ParallelBatchService<S>;

fn layer(&self, service: S) -> Self::Service {
ParallelBatchService {
service,
max_response_body_size: self.max_response_body_size,
}
}
}

#[derive(Clone)]
pub(super) struct ParallelBatchService<S> {
service: S,
max_response_body_size: usize,
}

impl<S> RpcServiceT for ParallelBatchService<S>
where
S: RpcServiceT<
MethodResponse = MethodResponse,
NotificationResponse = MethodResponse,
BatchResponse = MethodResponse,
> + Send
+ Sync
+ 'static,
{
type MethodResponse = S::MethodResponse;
type NotificationResponse = S::NotificationResponse;
type BatchResponse = S::BatchResponse;

fn call<'a>(
&self,
req: jsonrpsee::types::Request<'a>,
) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
self.service.call(req)
}

// Parallelized version of https://github.com/paritytech/jsonrpsee/blob/v0.26.0/server/src/middleware/rpc.rs#L151
fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
// Process batch in parallel instead of delegating to the inner service, which processes them sequentially.
let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
let mut got_notification = false;
// Although it's not neccesary to perserve the order in response, we do it to avoid potential bugs on client side
// See <https://www.jsonrpc.org/specification#:~:text=6%20Batch>
let mut tasks = FuturesOrdered::new();
for batch_entry in batch.into_iter() {
match batch_entry {
Ok(BatchEntry::Call(req)) => {
tasks.push_back(self.service.call(req).map(Some).boxed());
}
Ok(BatchEntry::Notification(n)) => {
got_notification = true;
tasks.push_back(self.service.notification(n).map(|_| None).boxed());
}
Err(err) => {
let (err, id) = err.into_parts();
let rp = MethodResponse::error(id, err);
tasks.push_back(async move { Some(rp) }.boxed());
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async move {
while let Some(r) = tasks.next().await {
if let Some(rp) = r
&& let Err(err) = batch_rp.append(rp)
{
return err;
}
}

// If the batch is empty and we got a notification, we return an empty response.
if batch_rp.is_empty() && got_notification {
MethodResponse::notification()
}
// An empty batch is regarded as an invalid request here.
else {
MethodResponse::from_batch(batch_rp.finish())
}
}
}

fn notification<'a>(
&self,
n: Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
self.service.notification(n)
}
}
Loading