Skip to content

Use lazy stream accessors for provider runtime events#1746

Merged
juliusmarminge merged 7 commits intomainfrom
feature/lazy-stream-accessors
Apr 5, 2026
Merged

Use lazy stream accessors for provider runtime events#1746
juliusmarminge merged 7 commits intomainfrom
feature/lazy-stream-accessors

Conversation

@juliusmarminge
Copy link
Copy Markdown
Member

@juliusmarminge juliusmarminge commented Apr 4, 2026

Summary

  • Switched provider/runtime stream fields to lazy getters so each access creates a fresh Stream over the underlying queue or pubsub.
  • Updated orchestration and provider adapters to use the new accessor pattern for runtime events, receipts, and settings changes.
  • Adjusted affected tests to construct streams through getters instead of storing a single eager stream instance.

Testing

  • Not run (PR content only).
  • Relevant checks for this change: bun fmt, bun lint, bun typecheck, and bun run test.

Note

Medium Risk
Touches core provider runtime event fanout and orchestration test synchronization, so regressions could cause missed/duplicated events or flaky integration tests. Changes are scoped and mostly behavioral around streaming/subscription semantics and concurrency safety.

Overview
Switches runtime Stream fields to lazy getters so each subscription gets a fresh Stream instance (provider adapters, provider/orchestration test harnesses, and server settings test stubs).

Simplifies ProviderService runtime event ingestion by removing the intermediate queue/worker loop and processing adapter streamEvents directly while still publishing to the shared PubSub and metrics.

Splits RuntimeReceiptBus into prod vs test implementations: RuntimeReceiptBusLive becomes a no-op (no retention/broadcast), while RuntimeReceiptBusTest provides a PubSub-backed stream renamed to streamEventsForTest, and integration harnesses/tests are updated accordingly.

Fixes a concurrency race in EventNdjsonLogger by making per-segment writer creation/failure tracking atomic via SynchronizedRef, with a new test covering concurrent first writes.

Reviewed by Cursor Bugbot for commit bfd64ce. Bugbot is set up for automated code reviews on this repo. Configure here.

Note

Use lazy stream getters for provider runtime events to fix race conditions

  • Converts streamEvents in ClaudeAdapter, CodexAdapter, and test harnesses from direct property assignments to lazy getters returning Stream.fromPubSub/Stream.fromQueue, ensuring a fresh stream is created per subscriber rather than at construction time.
  • Removes the intermediate Queue and worker in ProviderService.ts; adapter.streamEvents is now consumed directly via Stream.runForEach.
  • Splits RuntimeReceiptBus into RuntimeReceiptBusLive (no-op publish, empty stream) and RuntimeReceiptBusTest (PubSub-backed), and renames the subscription accessor from stream to streamEventsForTest.
  • Fixes concurrent first-write races in EventNdjsonLogger.ts by replacing mutable Maps with a SynchronizedRef, serializing writer creation per segment.
  • Integration tests that rely on real time are switched from it.effect to it.live, and collectEventsDuring adds a 50ms pre-action sleep to let subscriptions establish before events are emitted.

Macroscope summarized bfd64ce.

- Return fresh streams from adapter, bus, and test doubles
- Avoid reusing single stream instances across subscribers
@github-actions github-actions bot added vouch:trusted PR author is trusted by repo permissions or the VOUCHED list. size:S 10-29 changed lines (additions + deletions). labels Apr 4, 2026
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 4, 2026

Important

Review skipped

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 3c1c9866-662f-467a-9323-e0c5dadf8a1a

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/lazy-stream-accessors

Comment @coderabbitai help to get the list of available commands and usage tips.

macroscopeapp[bot]
macroscopeapp bot previously approved these changes Apr 4, 2026
@macroscopeapp
Copy link
Copy Markdown
Contributor

macroscopeapp bot commented Apr 4, 2026

Approvability

Verdict: Approved

Mechanical refactoring converting stream properties to lazy getters, with intentional optimizations: production RuntimeReceiptBus becomes a no-op (these receipts are test-only), and an unnecessary intermediate queue is removed from ProviderService. The author is the primary contributor to all modified files. Includes a concurrency fix for EventNdjsonLogger with appropriate test coverage.

You can customize Macroscope's approvability policy. Learn more.

- Remove the intermediate runtime event queue
- Fork adapter streams straight into event handling
@macroscopeapp macroscopeapp bot dismissed their stale review April 4, 2026 23:57

Dismissing prior approval to re-evaluate 51d60d7

- Keep `RuntimeReceiptBusLive` lazy
- Switch the orchestration harness to `RuntimeReceiptBusTestLive`
- Co-authored-by: codex <codex@users.noreply.github.com>
@github-actions github-actions bot added size:M 30-99 changed lines (additions + deletions). and removed size:S 10-29 changed lines (additions + deletions). labels Apr 5, 2026
Copy link
Copy Markdown
Contributor

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Concurrent adapter fibers race on shared logger state
    • Added a Semaphore(1) mutex around resolveThreadWriter so that concurrent adapter fibers serialize writer creation, eliminating the TOCTOU race that could create duplicate writers and leak file handles.

Create PR

Or push these changes by commenting:

@cursor push afb58bbef9
Preview (afb58bbef9)
diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts
--- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts
+++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts
@@ -10,7 +10,7 @@
 
 import type { ThreadId } from "@t3tools/contracts";
 import { RotatingFileSink } from "@t3tools/shared/logging";
-import { Effect, Exit, Logger, Scope } from "effect";
+import { Effect, Exit, Logger, Scope, Semaphore } from "effect";
 
 import { toSafeThreadAttachmentSegment } from "../../attachmentStore.ts";
 
@@ -195,33 +195,35 @@
 
   const threadWriters = new Map<string, ThreadWriter>();
   const failedSegments = new Set<string>();
+  const writerMutex = yield* Semaphore.make(1);
 
-  const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* (
-    threadSegment: string,
-  ): Effect.fn.Return<ThreadWriter | undefined> {
-    if (failedSegments.has(threadSegment)) {
-      return undefined;
-    }
-    const existing = threadWriters.get(threadSegment);
-    if (existing) {
-      return existing;
-    }
+  const resolveThreadWriter = (threadSegment: string) =>
+    writerMutex.withPermits(1)(
+      Effect.gen(function* () {
+        if (failedSegments.has(threadSegment)) {
+          return undefined;
+        }
+        const existing = threadWriters.get(threadSegment);
+        if (existing) {
+          return existing;
+        }
 
-    const writer = yield* makeThreadWriter({
-      filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
-      maxBytes,
-      maxFiles,
-      batchWindowMs,
-      streamLabel,
-    });
-    if (!writer) {
-      failedSegments.add(threadSegment);
-      return undefined;
-    }
+        const writer = yield* makeThreadWriter({
+          filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
+          maxBytes,
+          maxFiles,
+          batchWindowMs,
+          streamLabel,
+        });
+        if (!writer) {
+          failedSegments.add(threadSegment);
+          return undefined;
+        }
 
-    threadWriters.set(threadSegment, writer);
-    return writer;
-  });
+        threadWriters.set(threadSegment, writer);
+        return writer;
+      }).pipe(Effect.withSpan("resolveThreadWriter")),
+    );
 
   const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) {
     const threadSegment = resolveThreadSegment(threadId);

You can send follow-ups to the cloud agent here.

Reviewed by Cursor Bugbot for commit f5a208a. Configure here.

juliusmarminge and others added 2 commits April 4, 2026 17:13
- Expose `streamEventsForTest` for harnesses
- Keep the live bus publish-only

Co-authored-by: codex <codex@users.noreply.github.com>
- prevent duplicate writer creation on concurrent first writes
- add regression test
@github-actions github-actions bot added size:L 100-499 changed lines (additions + deletions). and removed size:M 30-99 changed lines (additions + deletions). labels Apr 5, 2026
- Wait for pubsub subscriptions before emitting turn events
- Close NDJSON writers under synchronized state
- Avoid eager pull conversion for runtime event streams
- Mark integration specs as live and wait for subscription setup
@juliusmarminge juliusmarminge merged commit 740d7a3 into main Apr 5, 2026
12 checks passed
@juliusmarminge juliusmarminge deleted the feature/lazy-stream-accessors branch April 5, 2026 06:22
aaditagrawal pushed a commit to aaditagrawal/t3code that referenced this pull request Apr 5, 2026
Co-authored-by: codex <codex@users.noreply.github.com>
aaditagrawal added a commit to aaditagrawal/t3code that referenced this pull request Apr 5, 2026
…eam-accessors

Merge upstream pingdotgg#1746: Use lazy stream accessors for provider runtime events
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L 100-499 changed lines (additions + deletions). vouch:trusted PR author is trusted by repo permissions or the VOUCHED list.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant