From dea701c7a91dcd71549d307de591bedef09df24f Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 16:24:26 -0700 Subject: [PATCH 1/7] Use lazy stream accessors for provider runtime events - Return fresh streams from adapter, bus, and test doubles - Avoid reusing single stream instances across subscribers --- .../server/src/orchestration/Layers/CheckpointReactor.test.ts | 4 +++- .../src/orchestration/Layers/ProviderCommandReactor.test.ts | 4 +++- .../src/orchestration/Layers/ProviderRuntimeIngestion.test.ts | 4 +++- apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts | 4 +++- apps/server/src/provider/Layers/ClaudeAdapter.ts | 4 +++- apps/server/src/provider/Layers/CodexAdapter.ts | 4 +++- apps/server/src/provider/Layers/ProviderRegistry.test.ts | 4 +++- apps/server/src/provider/Layers/ProviderService.test.ts | 4 +++- 8 files changed, 24 insertions(+), 8 deletions(-) diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index ab9f633e02..72adb175f9 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -96,7 +96,9 @@ function createProviderServiceHarness( listSessions, getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), rollbackConversation, - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const emit = (event: LegacyProviderRuntimeEvent): void => { diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index ca3dc04517..fe6cb9caf5 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -208,7 +208,9 @@ describe("ProviderCommandReactor", () => { sessionModelSwitch: input?.sessionModelSwitch ?? "in-session", }), rollbackConversation: () => unsupported(), - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const orchestrationLayer = OrchestrationEngineLive.pipe( diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 6c27e1010c..85f4d966e3 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -98,7 +98,9 @@ function createProviderServiceHarness() { listSessions: () => Effect.succeed([...runtimeSessions]), getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), rollbackConversation: () => unsupported(), - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const setSession = (session: ProviderSession): void => { diff --git a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts index 56c526c08b..e314b5df69 100644 --- a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts +++ b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts @@ -11,7 +11,9 @@ const makeRuntimeReceiptBus = Effect.gen(function* () { return { publish: (receipt) => PubSub.publish(pubSub, receipt).pipe(Effect.asVoid), - stream: Stream.fromPubSub(pubSub), + get stream() { + return Stream.fromPubSub(pubSub); + }, } satisfies RuntimeReceiptBusShape; }); diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index d99e2ad203..9f2eeb014e 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -3054,7 +3054,9 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( listSessions, hasSession, stopAll, - streamEvents: Stream.fromQueue(runtimeEventQueue), + get streamEvents() { + return Stream.fromQueue(runtimeEventQueue); + }, } satisfies ClaudeAdapterShape; }); diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index cee6bca6ed..8b9f3b59e7 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -1631,7 +1631,9 @@ const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( listSessions, hasSession, stopAll, - streamEvents: Stream.fromQueue(runtimeEventQueue), + get streamEvents() { + return Stream.fromQueue(runtimeEventQueue); + }, } satisfies CodexAdapterShape; }); diff --git a/apps/server/src/provider/Layers/ProviderRegistry.test.ts b/apps/server/src/provider/Layers/ProviderRegistry.test.ts index 116c008d67..ca27371b61 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.test.ts @@ -115,7 +115,9 @@ function makeMutableServerSettingsService( yield* PubSub.publish(changes, next); return next; }), - streamChanges: Stream.fromPubSub(changes), + get streamChanges() { + return Stream.fromPubSub(changes); + }, } satisfies ServerSettingsShape; }); } diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index cd6c81405b..fc3c9cf25c 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -191,7 +191,9 @@ function makeFakeCodexAdapter(provider: ProviderKind = "codex") { readThread, rollbackThread, stopAll, - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const emit = (event: LegacyProviderRuntimeEvent): void => { From 51d60d7118fca9088799719dafad42cd0827a886 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 16:56:55 -0700 Subject: [PATCH 2/7] Process provider stream events directly - Remove the intermediate runtime event queue - Fork adapter streams straight into event handling --- apps/server/src/provider/Layers/ProviderService.ts | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index da2c293fac..85fe9fbc32 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -22,7 +22,7 @@ import { type ProviderRuntimeEvent, type ProviderSession, } from "@t3tools/contracts"; -import { Effect, Layer, Option, PubSub, Queue, Schema, SchemaIssue, Stream } from "effect"; +import { Effect, Layer, Option, PubSub, Schema, SchemaIssue, Stream } from "effect"; import { increment, @@ -156,7 +156,6 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( const registry = yield* ProviderAdapterRegistry; const directory = yield* ProviderSessionDirectory; - const runtimeEventQueue = yield* Queue.unbounded(); const runtimeEventPubSub = yield* PubSub.unbounded(); const publishRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => @@ -194,15 +193,8 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( eventType: event.type, }).pipe(Effect.andThen(publishRuntimeEvent(event))); - const worker = Effect.forever( - Queue.take(runtimeEventQueue).pipe(Effect.flatMap(processRuntimeEvent)), - ); - yield* Effect.forkScoped(worker); - yield* Effect.forEach(adapters, (adapter) => - Stream.runForEach(adapter.streamEvents, (event) => - Queue.offer(runtimeEventQueue, event).pipe(Effect.asVoid), - ).pipe(Effect.forkScoped), + Stream.runForEach(adapter.streamEvents, processRuntimeEvent).pipe(Effect.forkScoped), ).pipe(Effect.asVoid); const recoverSessionForThread = Effect.fn("recoverSessionForThread")(function* (input: { From f5a208abc09b161404b77d1a9ba9a68a10c33e8d Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 17:02:22 -0700 Subject: [PATCH 3/7] Use dedicated receipt bus layer for integration tests - Keep `RuntimeReceiptBusLive` lazy - Switch the orchestration harness to `RuntimeReceiptBusTestLive` - Co-authored-by: codex --- .../integration/OrchestrationEngineHarness.integration.ts | 4 ++-- apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts | 8 +++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 152ed1d608..2a7a0b48a9 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -48,7 +48,7 @@ import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointRea import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts"; import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts"; -import { RuntimeReceiptBusLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts"; +import { RuntimeReceiptBusTestLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts"; import { OrchestrationReactorLive } from "../src/orchestration/Layers/OrchestrationReactor.ts"; import { ProviderCommandReactorLive } from "../src/orchestration/Layers/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionLive } from "../src/orchestration/Layers/ProviderRuntimeIngestion.ts"; @@ -297,7 +297,7 @@ export const makeOrchestrationIntegrationHarness = ( ProjectionPendingApprovalRepositoryLive, checkpointStoreLayer, providerLayer, - RuntimeReceiptBusLive, + RuntimeReceiptBusTestLive, ); const serverSettingsLayer = ServerSettingsService.layerTest(); const runtimeIngestionLayer = ProviderRuntimeIngestionLive.pipe( diff --git a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts index e314b5df69..155dcbe48e 100644 --- a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts +++ b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts @@ -6,7 +6,12 @@ import { type OrchestrationRuntimeReceipt, } from "../Services/RuntimeReceiptBus.ts"; -const makeRuntimeReceiptBus = Effect.gen(function* () { +const makeRuntimeReceiptBus = Effect.succeed({ + publish: () => Effect.void, + stream: Stream.empty, +} satisfies RuntimeReceiptBusShape); + +const makeRuntimeReceiptBusTest = Effect.gen(function* () { const pubSub = yield* PubSub.unbounded(); return { @@ -18,3 +23,4 @@ const makeRuntimeReceiptBus = Effect.gen(function* () { }); export const RuntimeReceiptBusLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBus); +export const RuntimeReceiptBusTestLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBusTest); From 2b1791ca817d250188b7c8fc4d8d4b2074ee681e Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 17:13:07 -0700 Subject: [PATCH 4/7] Rename runtime receipt bus test stream accessor - Expose `streamEventsForTest` for harnesses - Keep the live bus publish-only Co-authored-by: codex --- .../OrchestrationEngineHarness.integration.ts | 6 +++--- .../orchestration/Layers/RuntimeReceiptBus.ts | 16 +++++++++++++--- .../Services/RuntimeReceiptBus.ts | 18 +++++++++++++++++- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 2a7a0b48a9..87c81f08c8 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -48,7 +48,7 @@ import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointRea import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts"; import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts"; -import { RuntimeReceiptBusTestLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts"; +import { RuntimeReceiptBusTest } from "../src/orchestration/Layers/RuntimeReceiptBus.ts"; import { OrchestrationReactorLive } from "../src/orchestration/Layers/OrchestrationReactor.ts"; import { ProviderCommandReactorLive } from "../src/orchestration/Layers/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionLive } from "../src/orchestration/Layers/ProviderRuntimeIngestion.ts"; @@ -297,7 +297,7 @@ export const makeOrchestrationIntegrationHarness = ( ProjectionPendingApprovalRepositoryLive, checkpointStoreLayer, providerLayer, - RuntimeReceiptBusTestLive, + RuntimeReceiptBusTest, ); const serverSettingsLayer = ServerSettingsService.layerTest(); const runtimeIngestionLayer = ProviderRuntimeIngestionLive.pipe( @@ -376,7 +376,7 @@ export const makeOrchestrationIntegrationHarness = ( runtime.runPromise(reactor.start().pipe(Scope.provide(scope))), ).pipe(Effect.orDie); const receiptHistory = yield* Ref.make>([]); - yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) => + yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) => Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid), ).pipe(Effect.forkIn(scope)); yield* Effect.sleep(10); diff --git a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts index 155dcbe48e..5c3148c943 100644 --- a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts +++ b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts @@ -1,3 +1,13 @@ +/** + * RuntimeReceiptBus layers. + * + * `RuntimeReceiptBusLive` is the production default and intentionally does not + * retain or broadcast receipts. `RuntimeReceiptBusTest` installs the in-memory + * PubSub-backed implementation used by integration tests that need to await + * checkpoint-reactor milestones precisely. + * + * @module RuntimeReceiptBus + */ import { Effect, Layer, PubSub, Stream } from "effect"; import { @@ -8,7 +18,7 @@ import { const makeRuntimeReceiptBus = Effect.succeed({ publish: () => Effect.void, - stream: Stream.empty, + streamEventsForTest: Stream.empty, } satisfies RuntimeReceiptBusShape); const makeRuntimeReceiptBusTest = Effect.gen(function* () { @@ -16,11 +26,11 @@ const makeRuntimeReceiptBusTest = Effect.gen(function* () { return { publish: (receipt) => PubSub.publish(pubSub, receipt).pipe(Effect.asVoid), - get stream() { + get streamEventsForTest() { return Stream.fromPubSub(pubSub); }, } satisfies RuntimeReceiptBusShape; }); export const RuntimeReceiptBusLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBus); -export const RuntimeReceiptBusTestLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBusTest); +export const RuntimeReceiptBusTest = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBusTest); diff --git a/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts b/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts index a08ff91685..bf5c94f3a0 100644 --- a/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts +++ b/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts @@ -1,3 +1,19 @@ +/** + * RuntimeReceiptBus - Internal checkpoint-reactor synchronization receipts. + * + * This service exists to expose short-lived orchestration milestones that are + * useful in tests and harnesses but are not part of the production runtime + * event model. `CheckpointReactor` publishes receipts such as baseline capture, + * diff finalization, and turn-processing quiescence so integration tests can + * wait for those exact points without inferring them indirectly from persisted + * state. + * + * Production code should only call `publish`. Test code may subscribe via + * `streamEventsForTest`, which is intentionally named to make the intended + * usage explicit. + * + * @module RuntimeReceiptBus + */ import { CheckpointRef, IsoDateTime, NonNegativeInt, ThreadId, TurnId } from "@t3tools/contracts"; import { Schema, ServiceMap } from "effect"; import type { Effect, Stream } from "effect"; @@ -40,7 +56,7 @@ export type OrchestrationRuntimeReceipt = typeof OrchestrationRuntimeReceipt.Typ export interface RuntimeReceiptBusShape { readonly publish: (receipt: OrchestrationRuntimeReceipt) => Effect.Effect; - readonly stream: Stream.Stream; + readonly streamEventsForTest: Stream.Stream; } export class RuntimeReceiptBus extends ServiceMap.Service< From 3fa98910e92e7bd0e492e2b66c8459a3b469134e Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 17:35:17 -0700 Subject: [PATCH 5/7] Serialize thread writer initialization - prevent duplicate writer creation on concurrent first writes - add regression test --- .../provider/Layers/EventNdjsonLogger.test.ts | 43 ++++++++++ .../src/provider/Layers/EventNdjsonLogger.ts | 81 +++++++++++++------ 2 files changed, 99 insertions(+), 25 deletions(-) diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts index 258b50d6f2..9284ffac45 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts @@ -112,6 +112,49 @@ describe("EventNdjsonLogger", () => { }), ); + it.effect("serializes concurrent first writes for the same segment", () => + Effect.gen(function* () { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-")); + const basePath = path.join(tempDir, "provider-canonical.ndjson"); + + try { + const logger = yield* makeEventNdjsonLogger(basePath, { + stream: "canonical", + batchWindowMs: 0, + }); + assert.notEqual(logger, undefined); + if (!logger) { + return; + } + + yield* Effect.all( + [ + logger.write({ id: "evt-concurrent-1" }, null), + logger.write({ id: "evt-concurrent-2" }, null), + ], + { concurrency: "unbounded" }, + ); + yield* logger.close(); + + const globalPath = path.join(tempDir, "_global.log"); + assert.equal(fs.existsSync(globalPath), true); + const lines = fs + .readFileSync(globalPath, "utf8") + .trim() + .split("\n") + .map((line) => parseLogLine(line)); + + assert.equal(lines.length, 2); + assert.deepEqual(lines.map((line) => line.payload).toSorted(), [ + '{"id":"evt-concurrent-1"}', + '{"id":"evt-concurrent-2"}', + ]); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }), + ); + it.effect("rotates per-thread files when max size is exceeded", () => Effect.gen(function* () { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-")); diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index a4fd6f235d..6af3d1898e 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -10,7 +10,7 @@ import path from "node:path"; import type { ThreadId } from "@t3tools/contracts"; import { RotatingFileSink } from "@t3tools/shared/logging"; -import { Effect, Exit, Logger, Scope } from "effect"; +import { Effect, Exit, Logger, Scope, SynchronizedRef } from "effect"; import { toSafeThreadAttachmentSegment } from "../../attachmentStore.ts"; @@ -40,6 +40,11 @@ interface ThreadWriter { close: () => Effect.Effect; } +interface LoggerState { + readonly threadWriters: Map; + readonly failedSegments: Set; +} + function logWarning(message: string, context: Record): Effect.Effect { return Effect.logWarning(message, context).pipe(Effect.annotateLogs({ scope: LOG_SCOPE })); } @@ -193,34 +198,56 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function return undefined; } - const threadWriters = new Map(); - const failedSegments = new Set(); + const stateRef = yield* SynchronizedRef.make({ + threadWriters: new Map(), + failedSegments: new Set(), + }); const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* ( threadSegment: string, ): Effect.fn.Return { - if (failedSegments.has(threadSegment)) { - return undefined; - } - const existing = threadWriters.get(threadSegment); - if (existing) { - return existing; - } + return yield* SynchronizedRef.modifyEffect(stateRef, (state) => { + if (state.failedSegments.has(threadSegment)) { + return Effect.succeed([undefined, state] as const); + } - const writer = yield* makeThreadWriter({ - filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), - maxBytes, - maxFiles, - batchWindowMs, - streamLabel, - }); - if (!writer) { - failedSegments.add(threadSegment); - return undefined; - } + const existing = state.threadWriters.get(threadSegment); + if (existing) { + return Effect.succeed([existing, state] as const); + } + + return makeThreadWriter({ + filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), + maxBytes, + maxFiles, + batchWindowMs, + streamLabel, + }).pipe( + Effect.map((writer) => { + if (!writer) { + const nextFailedSegments = new Set(state.failedSegments); + nextFailedSegments.add(threadSegment); + return [ + undefined, + { + ...state, + failedSegments: nextFailedSegments, + }, + ] as const; + } - threadWriters.set(threadSegment, writer); - return writer; + const nextThreadWriters = new Map(state.threadWriters); + nextThreadWriters.set(threadSegment, writer); + return [ + writer, + { + ...state, + threadWriters: nextThreadWriters, + }, + ] as const; + }), + ); + }); }); const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) { @@ -239,10 +266,14 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function }); const close = Effect.fn("close")(function* () { - for (const writer of threadWriters.values()) { + const state = yield* SynchronizedRef.get(stateRef); + for (const writer of state.threadWriters.values()) { yield* writer.close(); } - threadWriters.clear(); + yield* SynchronizedRef.set(stateRef, { + threadWriters: new Map(), + failedSegments: new Set(), + }); }); return { From 0a816d2492acf39518a9fce5bc0acb4dc76a45cb Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 20:22:26 -0700 Subject: [PATCH 6/7] Fix lazy stream subscriptions and logger shutdown - Wait for pubsub subscriptions before emitting turn events - Close NDJSON writers under synchronized state --- .../providerService.integration.test.ts | 8 ++++++- .../src/provider/Layers/EventNdjsonLogger.ts | 23 ++++++++++++------- .../src/provider/Layers/ProviderService.ts | 8 ++++++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/apps/server/integration/providerService.integration.test.ts b/apps/server/integration/providerService.integration.test.ts index ef03a1ab5c..2cd1dc68ba 100644 --- a/apps/server/integration/providerService.integration.test.ts +++ b/apps/server/integration/providerService.integration.test.ts @@ -29,6 +29,10 @@ import { codexTurnTextFixture, } from "./fixtures/providerRuntime.ts"; +const waitForRealtimeSubscription = Effect.promise( + () => new Promise((resolve) => setTimeout(resolve, 50)), +); + const makeWorkspaceDirectory = Effect.gen(function* () { const fs = yield* FileSystem.FileSystem; const pathService = yield* Path.Path; @@ -86,6 +90,9 @@ const collectEventsDuring = ( Effect.forkScoped, ); + // `Stream.fromPubSub` subscriptions attach when the forked fiber runs, not + // when the stream value is constructed. + yield* waitForRealtimeSubscription; yield* action; return yield* Effect.forEach( @@ -104,7 +111,6 @@ const runTurn = (input: { }) => Effect.gen(function* () { yield* input.harness.queueTurnResponse(input.threadId, input.response); - return yield* collectEventsDuring( input.provider.streamEvents, input.response.events.length, diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index 6af3d1898e..2a512efe66 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -266,14 +266,21 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function }); const close = Effect.fn("close")(function* () { - const state = yield* SynchronizedRef.get(stateRef); - for (const writer of state.threadWriters.values()) { - yield* writer.close(); - } - yield* SynchronizedRef.set(stateRef, { - threadWriters: new Map(), - failedSegments: new Set(), - }); + yield* SynchronizedRef.modifyEffect(stateRef, (state) => + Effect.gen(function* () { + for (const writer of state.threadWriters.values()) { + yield* writer.close(); + } + + return [ + undefined, + { + threadWriters: new Map(), + failedSegments: new Set(), + }, + ] as const; + }), + ); }); return { diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index 85fe9fbc32..d1249b54fd 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -194,7 +194,13 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( }).pipe(Effect.andThen(publishRuntimeEvent(event))); yield* Effect.forEach(adapters, (adapter) => - Stream.runForEach(adapter.streamEvents, processRuntimeEvent).pipe(Effect.forkScoped), + Effect.gen(function* () { + const pull = yield* Stream.toPull(adapter.streamEvents); + yield* Stream.fromPull(Effect.succeed(pull)).pipe( + Stream.runForEach(processRuntimeEvent), + Effect.forkScoped, + ); + }), ).pipe(Effect.asVoid); const recoverSessionForThread = Effect.fn("recoverSessionForThread")(function* (input: { From bfd64ceb19a51b079f7cf0aca81776a79e7eef82 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 20:25:08 -0700 Subject: [PATCH 7/7] Run adapter streams directly - Avoid eager pull conversion for runtime event streams - Mark integration specs as live and wait for subscription setup --- .../providerService.integration.test.ts | 16 +++++----------- .../src/provider/Layers/ProviderService.ts | 8 +------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/apps/server/integration/providerService.integration.test.ts b/apps/server/integration/providerService.integration.test.ts index 2cd1dc68ba..1ca6fa1b83 100644 --- a/apps/server/integration/providerService.integration.test.ts +++ b/apps/server/integration/providerService.integration.test.ts @@ -29,10 +29,6 @@ import { codexTurnTextFixture, } from "./fixtures/providerRuntime.ts"; -const waitForRealtimeSubscription = Effect.promise( - () => new Promise((resolve) => setTimeout(resolve, 50)), -); - const makeWorkspaceDirectory = Effect.gen(function* () { const fs = yield* FileSystem.FileSystem; const pathService = yield* Path.Path; @@ -90,9 +86,7 @@ const collectEventsDuring = ( Effect.forkScoped, ); - // `Stream.fromPubSub` subscriptions attach when the forked fiber runs, not - // when the stream value is constructed. - yield* waitForRealtimeSubscription; + yield* Effect.sleep("50 millis"); yield* action; return yield* Effect.forEach( @@ -122,7 +116,7 @@ const runTurn = (input: { ); }); -it.effect("replays typed runtime fixture events", () => +it.live("replays typed runtime fixture events", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; @@ -155,7 +149,7 @@ it.effect("replays typed runtime fixture events", () => }).pipe(Effect.provide(NodeServices.layer)), ); -it.effect("replays file-changing fixture turn events", () => +it.live("replays file-changing fixture turn events", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; const { join } = yield* Path.Path; @@ -194,7 +188,7 @@ it.effect("replays file-changing fixture turn events", () => }).pipe(Effect.provide(NodeServices.layer)), ); -it.effect("runs multi-turn tool/approval flow", () => +it.live("runs multi-turn tool/approval flow", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; const { join } = yield* Path.Path; @@ -248,7 +242,7 @@ it.effect("runs multi-turn tool/approval flow", () => }).pipe(Effect.provide(NodeServices.layer)), ); -it.effect("rolls back provider conversation state only", () => +it.live("rolls back provider conversation state only", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; const { join } = yield* Path.Path; diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index d1249b54fd..85fe9fbc32 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -194,13 +194,7 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( }).pipe(Effect.andThen(publishRuntimeEvent(event))); yield* Effect.forEach(adapters, (adapter) => - Effect.gen(function* () { - const pull = yield* Stream.toPull(adapter.streamEvents); - yield* Stream.fromPull(Effect.succeed(pull)).pipe( - Stream.runForEach(processRuntimeEvent), - Effect.forkScoped, - ); - }), + Stream.runForEach(adapter.streamEvents, processRuntimeEvent).pipe(Effect.forkScoped), ).pipe(Effect.asVoid); const recoverSessionForThread = Effect.fn("recoverSessionForThread")(function* (input: {