Skip to content

perf(webapp): parallelize Phase 2 streaming batch-item ingest (TRI-10273)#3777

Open
matt-aitken wants to merge 2 commits into
mainfrom
feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408
Open

perf(webapp): parallelize Phase 2 streaming batch-item ingest (TRI-10273)#3777
matt-aitken wants to merge 2 commits into
mainfrom
feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408

Conversation

@matt-aitken

Copy link
Copy Markdown
Member

Problem

Phase 2 of the v3 streaming batch API (POST /api/v3/batches/:batchId/items) processed streamed items strictly sequentially. For a batch of many large payloads — each offloaded to object storage inline — this serialized N object-store round-trips inside a single request, exceeding Node's default server.requestTimeout (300s). The webapp returned 408, which the SDK reads as 408 terminated and retries 5×, turning a slow ingest into a ~26-minute failure (BatchTriggerError: Failed to stream items ... 408 terminated).

Closes TRI-10273 — https://linear.app/triggerdotdev/issue/TRI-10273

Fix

Ingest now runs through p-map over the NDJSON async iterable with bounded concurrency (STREAMING_BATCH_INGEST_CONCURRENCY, default 10):

  • p-map pulls lazily from the stream — at most concurrency items are read/in-flight at once, so peak memory is bounded to roughly concurrency × STREAMING_BATCH_ITEM_MAXIMUM_SIZE and request-body backpressure is preserved.
  • Set the env to 1 for fully sequential ingestion (escape hatch).
  • The default lives only in env.server.ts; the service takes a required number.

Why this is safe (ordering/idempotency unchanged)

  • Ordering derives from each item's index (enqueue timestamp = batch.createdAt + index), not enqueue order.
  • Dedup is atomic per index in enqueueBatchItem.
  • The NDJSON parser now stamps oversized-item markers with their emit position, removing the consumer's sequential lastIndex assumption (the only order-dependent bit).
  • The count-check + conditional seal path is untouched.

Tests

  • 100-item batch ingested concurrently → all enqueued + sealed, correct counts
  • in-flight processing never exceeds the configured concurrency (real instrumented payload processor)
  • concurrent dedup on Phase 2 retry (pre-enqueued half re-streamed)
  • emit-position marker indexing (parser unit test)
  • Full existing sealing/idempotency suite still green — 42/42 pass; webapp typecheck clean.

Follow-ups (not in this PR)

  • SDK pre-offload of large item payloads (send application/store refs instead of raw blobs) to remove object-store work from the request hot path and shrink the request body — bigger, protocol-level change.
  • Optional server.requestTimeout bump as a safety net.

🤖 Generated with Claude Code

@changeset-bot

changeset-bot Bot commented May 29, 2026

Copy link
Copy Markdown

⚠️ No Changeset found

Latest commit: 999ccad

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai

coderabbitai Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: ba4b9733-81e3-4ac6-89bd-1652ff0270d4

📥 Commits

Reviewing files that changed from the base of the PR and between de3489f and 999ccad.

📒 Files selected for processing (6)
  • .server-changes/parallel-batch-item-ingest.md
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/routes/api.v3.batches.$batchId.items.ts
  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
  • apps/webapp/test/engine/streamBatchItems.test.ts
  • docs/self-hosting/env/webapp.mdx
✅ Files skipped from review due to trivial changes (3)
  • apps/webapp/app/routes/api.v3.batches.$batchId.items.ts
  • .server-changes/parallel-batch-item-ingest.md
  • docs/self-hosting/env/webapp.mdx
🚧 Files skipped from review as they are similar to previous changes (3)
  • apps/webapp/app/env.server.ts
  • apps/webapp/test/engine/streamBatchItems.test.ts
  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📜 Recent review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (5, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (2, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (6, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (8, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (7, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (3, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (10, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (9, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (1, 10)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (4, 10)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e-webapp / 🧪 E2E Tests: Webapp
  • GitHub Check: 🛡️ E2E Auth Tests (full)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Build and publish previews

Walkthrough

This PR changes Phase 2 streaming batch ingest to process NDJSON items with bounded concurrency (p-map) instead of strict sequential offload+enqueue. It adds STREAMING_BATCH_INGEST_CONCURRENCY (default 10) and wires it into the API route, injects an optional payloadProcessor for tests, extracts per-item logic into a concurrent-safe #processItem, and updates the NDJSON parser to track emit positions and backfill oversized-item indices when extraction fails. Ordering, per-index deduplication, sealing, and idempotency semantics are preserved. Tests cover concurrency bounds, deduplication, oversized-index validation, and parser backfill behavior.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main change: parallelizing Phase 2 streaming batch-item ingestion with the issue reference (TRI-10273).
Description check ✅ Passed The description covers the problem, fix, safety rationale, testing, and follow-ups comprehensively. It includes a problem statement, solution explanation, testing details, and aligns with the template structure despite not using the exact template format.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint install failed: dependency version conflict. Check your lock file or package.json.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@devin-ai-integration devin-ai-integration Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 4 additional findings.

Open in Devin Review

coderabbitai[bot]

This comment was marked as resolved.

@mintlify

mintlify Bot commented May 29, 2026

Copy link
Copy Markdown
Contributor

Preview deployment for your docs. Learn more about Mintlify Previews.

Project Status Preview Updated (UTC)
trigger 🟢 Ready View Preview May 29, 2026, 6:09 PM

💡 Tip: Enable Workflows to automatically generate PRs for you.

@pkg-pr-new

pkg-pr-new Bot commented Jun 6, 2026

Copy link
Copy Markdown

Open in StackBlitz

@trigger.dev/build

npm i https://pkg.pr.new/@trigger.dev/build@999ccad

trigger.dev

npm i https://pkg.pr.new/trigger.dev@999ccad

@trigger.dev/core

npm i https://pkg.pr.new/@trigger.dev/core@999ccad

@trigger.dev/plugins

npm i https://pkg.pr.new/@trigger.dev/plugins@999ccad

@trigger.dev/python

npm i https://pkg.pr.new/@trigger.dev/python@999ccad

@trigger.dev/react-hooks

npm i https://pkg.pr.new/@trigger.dev/react-hooks@999ccad

@trigger.dev/redis-worker

npm i https://pkg.pr.new/@trigger.dev/redis-worker@999ccad

@trigger.dev/rsc

npm i https://pkg.pr.new/@trigger.dev/rsc@999ccad

@trigger.dev/schema-to-json

npm i https://pkg.pr.new/@trigger.dev/schema-to-json@999ccad

@trigger.dev/sdk

npm i https://pkg.pr.new/@trigger.dev/sdk@999ccad

commit: 999ccad

matt-aitken and others added 2 commits June 7, 2026 16:51
…273)

Phase 2 of the v3 streaming batch API (POST /api/v3/batches/:batchId/items)
processed streamed items strictly sequentially. For batches of many large
payloads — each offloaded to object storage inline — this serialized N object-store
round-trips inside one request, blowing past Node's default 300s server.requestTimeout.
The webapp then returned 408, which the SDK reads as "408 terminated" and retries 5x,
turning a slow ingest into a ~26-minute failure.

Ingest now runs through p-map over the NDJSON async iterable with bounded concurrency
(STREAMING_BATCH_INGEST_CONCURRENCY, default 10). p-map pulls lazily, so at most
`concurrency` items are read/in-flight at once — bounding peak memory to roughly
concurrency x STREAMING_BATCH_ITEM_MAXIMUM_SIZE while preserving stream backpressure.
Set the env to 1 for fully sequential ingestion.

Safe by construction: run order derives from each item's index (enqueue timestamp =
batch.createdAt + index), and enqueueBatchItem dedups atomically per index — neither
depends on processing order. The NDJSON parser now stamps oversized-item markers with
their emit position, removing the consumer's sequential lastIndex assumption. The
count-check + conditional seal path is unchanged.

Tests: bounded-concurrency ingest of a 100-item batch, in-flight cap assertion,
concurrent dedup on Phase 2 retry, and emit-position marker indexing. Full existing
sealing/idempotency suite still green (42/42).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Enforce positive STREAMING_BATCH_INGEST_CONCURRENCY in the env schema
  (.int().positive()) — p-map requires concurrency >= 1, so 0/negative would
  throw at runtime.
- Apply the same out-of-range index guard to oversized-item markers as normal
  items, so an oversized item with index >= runCount returns a 4xx instead of
  creating a stray pre-failed run. Covered by a new test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@matt-aitken matt-aitken force-pushed the feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408 branch from de3489f to 999ccad Compare June 8, 2026 10:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant