fix: process batch RPC request in parallel #7093
Walkthrough
Adds ParallelBatchLayer middleware that executes JSON-RPC batch entries concurrently using ordered futures and a size-limited BatchResponseBuilder, exposes a max_response_body_size configuration, and wires the layer into the RPC server stack after MetricsLayer using a shared ServerConfig.
Changes
Parallel Batch Processing
Sequence Diagram
sequenceDiagram
participant Client
participant ParallelBatchService
participant InnerService
participant BatchResponseBuilder
Client->>ParallelBatchService: send batch (array of entries)
ParallelBatchService->>InnerService: spawn per-entry tasks (bounded by semaphore)
InnerService-->>ParallelBatchService: per-entry MethodResponse | None (notification)
ParallelBatchService->>BatchResponseBuilder: append call responses in original order
ParallelBatchService-->>Client: return batch response or notification-only response
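The ordering guarantee in the diagram can be illustrated with a minimal sketch. It is a thread-based analogue written against the standard library only, not the PR's async implementation: `Entry`, `handle`, and `process_batch` are hypothetical names, and joining handles in submission order stands in for what an ordered futures queue provides.

```rust
use std::thread;

/// Hypothetical stand-in for one parsed batch entry (not a type from the PR).
#[derive(Debug)]
enum Entry {
    /// A call expects a response.
    Call(u64),
    /// A notification produces no response.
    Notification,
    /// An entry that failed to parse and is answered with an error.
    Invalid,
}

/// Hypothetical handler: `Some(response)` for calls and invalid entries,
/// `None` for notifications.
fn handle(entry: &Entry) -> Option<String> {
    match entry {
        Entry::Call(id) => Some(format!("result for id {id}")),
        Entry::Notification => None,
        Entry::Invalid => Some("error: invalid request".to_string()),
    }
}

/// Runs every entry on its own scoped thread, then joins the handles in
/// submission order, so the output order matches the input order regardless
/// of which thread finishes first. Notifications are skipped in the output.
fn process_batch(entries: &[Entry]) -> Vec<String> {
    thread::scope(|scope| {
        let handles: Vec<_> = entries
            .iter()
            .map(|entry| scope.spawn(move || handle(entry)))
            .collect();
        handles
            .into_iter()
            .filter_map(|h| h.join().expect("handler panicked"))
            .collect()
    })
}
```

Calling `process_batch` with a call, a notification, an invalid entry, and a second call yields three responses in the original order; a batch containing only notifications yields an empty list.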
};
use tower::Layer;

// State-less jsonrpcsee layer for measuring RPC metrics
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/rpc/parallel_batch_layer.rs`:
- Around line 58-85: The loop in parallel_batch_layer::batch schedules every
BatchEntry immediately via tasks (FuturesOrdered) which can create unbounded
in-flight work; add a configurable concurrency cap (e.g., a semaphore or a
bounded buffer) referenced from the struct (introduce a field like
concurrency_limit or semaphore) and acquire a permit before spawning each task
for self.service.call(...) or service.notification(...), releasing the permit
when the spawned future completes, so you still push the task into tasks
(FuturesOrdered) for response assembly but never exceed the configured in-flight
count; ensure Err branch (MethodResponse::error(...)) still gets pushed
immediately without consuming a permit if you prefer, or respect the same cap to
be consistent.
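A minimal sketch of the concurrency cap described above, assuming a blocking, thread-based setting rather than the async one in the PR. The `Semaphore` type here is a hand-rolled illustration built from `Mutex` and `Condvar`, and `peak_in_flight` is a hypothetical helper that measures how many jobs ever run at once.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore built from std primitives (illustrative only).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `jobs` units of work with at most `limit` in flight and returns the
/// highest number of jobs observed running at the same time.
/// `limit` must be non-zero, otherwise no job can ever start.
fn peak_in_flight(jobs: usize, limit: usize) -> usize {
    let semaphore = Semaphore::new(limit);
    let running = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    thread::scope(|scope| {
        for _ in 0..jobs {
            scope.spawn(|| {
                semaphore.acquire();
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // stand-in for an RPC call
                running.fetch_sub(1, Ordering::SeqCst);
                semaphore.release();
            });
        }
    });
    peak.load(Ordering::SeqCst)
}
```

Every job is still scheduled up front, mirroring the idea of pushing all tasks into the ordered queue, but the permit is taken before the work starts and returned when it finishes, so the observed peak never exceeds `limit`.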
51d0b77 to e9030aa
e9030aa to df7758b
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/rpc/parallel_batch_layer.rs (1)
85-123: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Avoid returning on `batch_rp.append(...)` error while `tasks` (`FuturesOrdered`) still owns in-flight RPC futures.
The early `return err` drops `tasks`, and dropping a `FuturesOrdered` cancels/drops all futures it still manages; since call/notification futures are running under the concurrency semaphore, some entries may be aborted mid-flight and only the overflow error is returned to the client. Drain/poll the remaining `tasks` until empty (ignoring/short-circuiting further `append`s as needed) before returning the error.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/rpc/parallel_batch_layer.rs` around lines 85 - 123, The loop currently returns early when batch_rp.append(...) fails, which drops tasks (the FuturesOrdered) and cancels in-flight call/notification futures; instead, capture the append error into a local Option<ErrType> (or a flag) when MethodResponse append fails, but do not return immediately—continue polling/draining tasks (the variable tasks) until it is exhausted so semaphore-backed futures (service.call/service.notification) can complete or be polled to completion, and after the while loop returns the captured error if any; update the async block containing tasks.next().await to implement this drain-and-lazy-error-return behavior while still creating the error MethodResponse via MethodResponse::error(...) for Err(BatchEntry) cases.
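The drain-and-lazy-error pattern suggested in this comment can be sketched in isolation. This is an assumption-laden illustration, not the PR's code: `LimitedBuilder` and `TooLarge` are hypothetical stand-ins for the size-limited response builder and its overflow error, and a plain iterator stands in for the stream of completed tasks.

```rust
/// Hypothetical size-limited response builder (not the jsonrpsee type).
struct LimitedBuilder {
    body: String,
    max_len: usize,
}

/// Hypothetical overflow error.
#[derive(Debug, PartialEq)]
struct TooLarge;

impl LimitedBuilder {
    fn new(max_len: usize) -> Self {
        Self { body: String::new(), max_len }
    }

    /// Appends a response, or fails if the body would exceed the limit.
    fn append(&mut self, response: &str) -> Result<(), TooLarge> {
        if self.body.len() + response.len() > self.max_len {
            return Err(TooLarge);
        }
        self.body.push_str(response);
        Ok(())
    }
}

/// Consumes every pending result even after the builder overflows.
/// The first overflow is remembered and returned only once the input is
/// exhausted; later appends are skipped. The second tuple field counts how
/// many results were drained.
fn drain_all<I>(pending: I, max_len: usize) -> (Result<String, TooLarge>, usize)
where
    I: Iterator<Item = String>,
{
    let mut builder = LimitedBuilder::new(max_len);
    let mut overflow = None;
    let mut completed = 0;
    for response in pending {
        completed += 1;
        if overflow.is_none() {
            if let Err(e) = builder.append(&response) {
                overflow = Some(e);
            }
        }
    }
    let outcome = match overflow {
        Some(e) => Err(e),
        None => Ok(builder.body),
    };
    (outcome, completed)
}
```

The trade-off is that draining keeps work running after the response is already known to be an error; whether that is preferable to cancelling the remaining entries depends on whether those entries have side effects that should not be interrupted.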
const MAX_CONCURRENCY_ENV: &str = "FOREST_RPC_BATCH_MAX_CONCURRENCY";
static MAX_CONCURRENCY: LazyLock<NonZeroUsize> = LazyLock::new(|| {
    std::env::var(MAX_CONCURRENCY_ENV)
        .ok()
        .and_then(|i| i.parse().ok())
        .inspect(|i| {
            tracing::info!(
                "Max RPC batch concurrency is set to {i} by {MAX_CONCURRENCY_ENV}"
            )
        })
        .unwrap_or(nonzero!(8usize))
});
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENCY.get()));
let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
Share the limiter across batch invocations, or label it as per-batch.
Each batch() call creates a fresh Semaphore, so FOREST_RPC_BATCH_MAX_CONCURRENCY=8 still allows 8 × concurrent_batches in-flight RPCs. If this knob is meant to protect the node globally, it needs to live on ParallelBatchLayer/ParallelBatchService and be cloned, not rebuilt per request.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/rpc/parallel_batch_layer.rs` around lines 67 - 80, The semaphore is being
created inside batch() so each request gets a fresh limiter; move the limiter
out to be a shared field on the long-lived type (e.g., add a field like
semaphore: Arc<Semaphore> on ParallelBatchLayer or ParallelBatchService and
initialize it once using MAX_CONCURRENCY / MAX_CONCURRENCY_ENV during
construction) and then clone that Arc inside batch() instead of creating a new
Semaphore there; alternatively, if the intent is per-batch limiting, rename the
local variable and MAX_CONCURRENCY_ENV to make that explicit (e.g., per-batch
max) so callers know it is not global.
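To make the difference between the two placements concrete, here is a small sketch of the arithmetic, with names that are illustrative rather than taken from the PR. `parse_limit` mirrors the parse-or-default shape of the snippet above (an unset, unparsable, or zero value falls back to 8, since `NonZeroUsize` refuses to parse `0`), and `worst_case_in_flight` gives the upper bound on simultaneously running RPCs for each limiter scope.

```rust
use std::num::NonZeroUsize;

/// Where the limiter lives (illustrative names, not from the PR).
enum LimiterScope {
    /// A fresh limiter is created for each batch.
    PerBatch,
    /// One limiter is shared by all batches.
    Shared,
}

/// Parses an optional raw setting, falling back to 8 when the value is
/// missing, not a number, or zero.
fn parse_limit(raw: Option<&str>) -> NonZeroUsize {
    raw.and_then(|s| s.parse().ok())
        .unwrap_or(NonZeroUsize::new(8).expect("8 is non-zero"))
}

/// Upper bound on RPCs that can be in flight at once.
fn worst_case_in_flight(
    limit: NonZeroUsize,
    concurrent_batches: usize,
    scope: LimiterScope,
) -> usize {
    match scope {
        // Each batch brings its own permits, so the bounds add up.
        LimiterScope::PerBatch => limit.get().saturating_mul(concurrent_batches),
        // All batches compete for the same permits.
        LimiterScope::Shared => {
            if concurrent_batches == 0 {
                0
            } else {
                limit.get()
            }
        }
    }
}
```

With a limit of 8 and 5 concurrent batches, the per-batch placement allows up to 40 in-flight RPCs, while a shared limiter caps the total at 8.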
This is intended for now. Will make the change if testing proves we do need a global Semaphore
@hanabi1224, understood — the per-batch concurrency limit is intentional for now, acting as an intra-batch throttle rather than a global one. That's a reasonable starting point. Would you like me to open a follow-up issue to track the potential migration to a shared semaphore once you have performance data?
✏️ Learnings added
Learnt from: hanabi1224
Repo: ChainSafe/forest PR: 7093
File: src/rpc/parallel_batch_layer.rs:67-80
Timestamp: 2026-05-21T17:26:37.270Z
Learning: In `src/rpc/parallel_batch_layer.rs` (ChainSafe/forest), the `Semaphore` inside `ParallelBatchService::batch()` is intentionally created per-batch call to limit concurrency within a single batch. It is NOT intended to be a global limiter across all concurrent batches. The `FOREST_RPC_BATCH_MAX_CONCURRENCY` environment variable (default 8) therefore controls per-batch parallelism, not node-wide RPC concurrency. A migration to a shared/global semaphore is deferred until testing proves it necessary.
Summary of changes
Changes introduced in this pull request:
Reference issue to close (if applicable)
Closes #7092