Use lazy stream accessors for provider runtime events#1746
Use lazy stream accessors for provider runtime events#1746juliusmarminge merged 7 commits intomainfrom
Conversation
- Return fresh streams from adapter, bus, and test doubles - Avoid reusing single stream instances across subscribers
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
ApprovabilityVerdict: Approved Mechanical refactoring converting stream properties to lazy getters, with intentional optimizations: production RuntimeReceiptBus becomes a no-op (these receipts are test-only), and an unnecessary intermediate queue is removed from ProviderService. The author is the primary contributor to all modified files. Includes a concurrency fix for EventNdjsonLogger with appropriate test coverage. You can customize Macroscope's approvability policy. Learn more. |
- Remove the intermediate runtime event queue - Fork adapter streams straight into event handling
Dismissing prior approval to re-evaluate 51d60d7
- Keep `RuntimeReceiptBusLive` lazy - Switch the orchestration harness to `RuntimeReceiptBusTestLive` - Co-authored-by: codex <codex@users.noreply.github.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Concurrent adapter fibers race on shared logger state
- Added a Semaphore(1) mutex around resolveThreadWriter so that concurrent adapter fibers serialize writer creation, eliminating the TOCTOU race that could create duplicate writers and leak file handles.
Or push these changes by commenting:
@cursor push afb58bbef9
Preview (afb58bbef9)
diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts
--- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts
+++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts
@@ -10,7 +10,7 @@
import type { ThreadId } from "@t3tools/contracts";
import { RotatingFileSink } from "@t3tools/shared/logging";
-import { Effect, Exit, Logger, Scope } from "effect";
+import { Effect, Exit, Logger, Scope, Semaphore } from "effect";
import { toSafeThreadAttachmentSegment } from "../../attachmentStore.ts";
@@ -195,33 +195,35 @@
const threadWriters = new Map<string, ThreadWriter>();
const failedSegments = new Set<string>();
+ const writerMutex = yield* Semaphore.make(1);
- const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* (
- threadSegment: string,
- ): Effect.fn.Return<ThreadWriter | undefined> {
- if (failedSegments.has(threadSegment)) {
- return undefined;
- }
- const existing = threadWriters.get(threadSegment);
- if (existing) {
- return existing;
- }
+ const resolveThreadWriter = (threadSegment: string) =>
+ writerMutex.withPermits(1)(
+ Effect.gen(function* () {
+ if (failedSegments.has(threadSegment)) {
+ return undefined;
+ }
+ const existing = threadWriters.get(threadSegment);
+ if (existing) {
+ return existing;
+ }
- const writer = yield* makeThreadWriter({
- filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
- maxBytes,
- maxFiles,
- batchWindowMs,
- streamLabel,
- });
- if (!writer) {
- failedSegments.add(threadSegment);
- return undefined;
- }
+ const writer = yield* makeThreadWriter({
+ filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
+ maxBytes,
+ maxFiles,
+ batchWindowMs,
+ streamLabel,
+ });
+ if (!writer) {
+ failedSegments.add(threadSegment);
+ return undefined;
+ }
- threadWriters.set(threadSegment, writer);
- return writer;
- });
+ threadWriters.set(threadSegment, writer);
+ return writer;
+ }).pipe(Effect.withSpan("resolveThreadWriter")),
+ );
const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) {
const threadSegment = resolveThreadSegment(threadId);You can send follow-ups to the cloud agent here.
Reviewed by Cursor Bugbot for commit f5a208a. Configure here.
- Expose `streamEventsForTest` for harnesses - Keep the live bus publish-only Co-authored-by: codex <codex@users.noreply.github.com>
- prevent duplicate writer creation on concurrent first writes - add regression test
- Wait for pubsub subscriptions before emitting turn events - Close NDJSON writers under synchronized state
- Avoid eager pull conversion for runtime event streams - Mark integration specs as live and wait for subscription setup
Co-authored-by: codex <codex@users.noreply.github.com>
…eam-accessors Merge upstream pingdotgg#1746: Use lazy stream accessors for provider runtime events


Summary
Streamover the underlying queue or pubsub.Testing
bun fmt,bun lint,bun typecheck, andbun run test.Note
Medium Risk
Touches core provider runtime event fanout and orchestration test synchronization, so regressions could cause missed/duplicated events or flaky integration tests. Changes are scoped and mostly behavioral around streaming/subscription semantics and concurrency safety.
Overview
Switches runtime
Streamfields to lazy getters so each subscription gets a freshStreaminstance (provider adapters, provider/orchestration test harnesses, and server settings test stubs).Simplifies ProviderService runtime event ingestion by removing the intermediate queue/worker loop and processing adapter
streamEventsdirectly while still publishing to the shared PubSub and metrics.Splits
RuntimeReceiptBusinto prod vs test implementations:RuntimeReceiptBusLivebecomes a no-op (no retention/broadcast), whileRuntimeReceiptBusTestprovides a PubSub-backed stream renamed tostreamEventsForTest, and integration harnesses/tests are updated accordingly.Fixes a concurrency race in
EventNdjsonLoggerby making per-segment writer creation/failure tracking atomic viaSynchronizedRef, with a new test covering concurrent first writes.Reviewed by Cursor Bugbot for commit bfd64ce. Bugbot is set up for automated code reviews on this repo. Configure here.
Note
Use lazy stream getters for provider runtime events to fix race conditions
streamEventsinClaudeAdapter,CodexAdapter, and test harnesses from direct property assignments to lazy getters returningStream.fromPubSub/Stream.fromQueue, ensuring a fresh stream is created per subscriber rather than at construction time.Queueand worker inProviderService.ts;adapter.streamEventsis now consumed directly viaStream.runForEach.RuntimeReceiptBusintoRuntimeReceiptBusLive(no-op publish, empty stream) andRuntimeReceiptBusTest(PubSub-backed), and renames the subscription accessor fromstreamtostreamEventsForTest.EventNdjsonLogger.tsby replacing mutable Maps with aSynchronizedRef, serializing writer creation per segment.it.effecttoit.live, andcollectEventsDuringadds a 50ms pre-action sleep to let subscriptions establish before events are emitted.Macroscope summarized bfd64ce.