diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 152ed1d608..87c81f08c8 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -48,7 +48,7 @@ import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointRea import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts"; import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts"; -import { RuntimeReceiptBusLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts"; +import { RuntimeReceiptBusTest } from "../src/orchestration/Layers/RuntimeReceiptBus.ts"; import { OrchestrationReactorLive } from "../src/orchestration/Layers/OrchestrationReactor.ts"; import { ProviderCommandReactorLive } from "../src/orchestration/Layers/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionLive } from "../src/orchestration/Layers/ProviderRuntimeIngestion.ts"; @@ -297,7 +297,7 @@ export const makeOrchestrationIntegrationHarness = ( ProjectionPendingApprovalRepositoryLive, checkpointStoreLayer, providerLayer, - RuntimeReceiptBusLive, + RuntimeReceiptBusTest, ); const serverSettingsLayer = ServerSettingsService.layerTest(); const runtimeIngestionLayer = ProviderRuntimeIngestionLive.pipe( @@ -376,7 +376,7 @@ export const makeOrchestrationIntegrationHarness = ( runtime.runPromise(reactor.start().pipe(Scope.provide(scope))), ).pipe(Effect.orDie); const receiptHistory = yield* Ref.make>([]); - yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) => + yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) => Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid), ).pipe(Effect.forkIn(scope)); yield* Effect.sleep(10); diff --git a/apps/server/integration/providerService.integration.test.ts b/apps/server/integration/providerService.integration.test.ts index ef03a1ab5c..1ca6fa1b83 100644 --- a/apps/server/integration/providerService.integration.test.ts +++ b/apps/server/integration/providerService.integration.test.ts @@ -86,6 +86,7 @@ const collectEventsDuring = ( Effect.forkScoped, ); + yield* Effect.sleep("50 millis"); yield* action; return yield* Effect.forEach( @@ -104,7 +105,6 @@ const runTurn = (input: { }) => Effect.gen(function* () { yield* input.harness.queueTurnResponse(input.threadId, input.response); - return yield* collectEventsDuring( input.provider.streamEvents, input.response.events.length, @@ -116,7 +116,7 @@ const runTurn = (input: { ); }); -it.effect("replays typed runtime fixture events", () => +it.live("replays typed runtime fixture events", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; @@ -149,7 +149,7 @@ it.effect("replays typed runtime fixture events", () => }).pipe(Effect.provide(NodeServices.layer)), ); -it.effect("replays file-changing fixture turn events", () => +it.live("replays file-changing fixture turn events", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; const { join } = yield* Path.Path; @@ -188,7 +188,7 @@ it.effect("replays file-changing fixture turn events", () => }).pipe(Effect.provide(NodeServices.layer)), ); -it.effect("runs multi-turn tool/approval flow", () => +it.live("runs multi-turn tool/approval flow", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; const { join } = yield* Path.Path; @@ -242,7 +242,7 @@ it.effect("runs multi-turn tool/approval flow", () => }).pipe(Effect.provide(NodeServices.layer)), ); -it.effect("rolls back provider conversation state only", () => +it.live("rolls back provider conversation state only", () => Effect.gen(function* () { const fixture = yield* makeIntegrationFixture; const { join } = yield* Path.Path; diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index ab9f633e02..72adb175f9 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -96,7 +96,9 @@ function createProviderServiceHarness( listSessions, getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), rollbackConversation, - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const emit = (event: LegacyProviderRuntimeEvent): void => { diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index ca3dc04517..fe6cb9caf5 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -208,7 +208,9 @@ describe("ProviderCommandReactor", () => { sessionModelSwitch: input?.sessionModelSwitch ?? "in-session", }), rollbackConversation: () => unsupported(), - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const orchestrationLayer = OrchestrationEngineLive.pipe( diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 6c27e1010c..85f4d966e3 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -98,7 +98,9 @@ function createProviderServiceHarness() { listSessions: () => Effect.succeed([...runtimeSessions]), getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), rollbackConversation: () => unsupported(), - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const setSession = (session: ProviderSession): void => { diff --git a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts index 56c526c08b..5c3148c943 100644 --- a/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts +++ b/apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts @@ -1,3 +1,13 @@ +/** + * RuntimeReceiptBus layers. + * + * `RuntimeReceiptBusLive` is the production default and intentionally does not + * retain or broadcast receipts. `RuntimeReceiptBusTest` installs the in-memory + * PubSub-backed implementation used by integration tests that need to await + * checkpoint-reactor milestones precisely. + * + * @module RuntimeReceiptBus + */ import { Effect, Layer, PubSub, Stream } from "effect"; import { @@ -6,13 +16,21 @@ import { type OrchestrationRuntimeReceipt, } from "../Services/RuntimeReceiptBus.ts"; -const makeRuntimeReceiptBus = Effect.gen(function* () { +const makeRuntimeReceiptBus = Effect.succeed({ + publish: () => Effect.void, + streamEventsForTest: Stream.empty, +} satisfies RuntimeReceiptBusShape); + +const makeRuntimeReceiptBusTest = Effect.gen(function* () { const pubSub = yield* PubSub.unbounded(); return { publish: (receipt) => PubSub.publish(pubSub, receipt).pipe(Effect.asVoid), - stream: Stream.fromPubSub(pubSub), + get streamEventsForTest() { + return Stream.fromPubSub(pubSub); + }, } satisfies RuntimeReceiptBusShape; }); export const RuntimeReceiptBusLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBus); +export const RuntimeReceiptBusTest = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBusTest); diff --git a/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts b/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts index a08ff91685..bf5c94f3a0 100644 --- a/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts +++ b/apps/server/src/orchestration/Services/RuntimeReceiptBus.ts @@ -1,3 +1,19 @@ +/** + * RuntimeReceiptBus - Internal checkpoint-reactor synchronization receipts. + * + * This service exists to expose short-lived orchestration milestones that are + * useful in tests and harnesses but are not part of the production runtime + * event model. `CheckpointReactor` publishes receipts such as baseline capture, + * diff finalization, and turn-processing quiescence so integration tests can + * wait for those exact points without inferring them indirectly from persisted + * state. + * + * Production code should only call `publish`. Test code may subscribe via + * `streamEventsForTest`, which is intentionally named to make the intended + * usage explicit. + * + * @module RuntimeReceiptBus + */ import { CheckpointRef, IsoDateTime, NonNegativeInt, ThreadId, TurnId } from "@t3tools/contracts"; import { Schema, ServiceMap } from "effect"; import type { Effect, Stream } from "effect"; @@ -40,7 +56,7 @@ export type OrchestrationRuntimeReceipt = typeof OrchestrationRuntimeReceipt.Typ export interface RuntimeReceiptBusShape { readonly publish: (receipt: OrchestrationRuntimeReceipt) => Effect.Effect; - readonly stream: Stream.Stream; + readonly streamEventsForTest: Stream.Stream; } export class RuntimeReceiptBus extends ServiceMap.Service< diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index d99e2ad203..9f2eeb014e 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -3054,7 +3054,9 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( listSessions, hasSession, stopAll, - streamEvents: Stream.fromQueue(runtimeEventQueue), + get streamEvents() { + return Stream.fromQueue(runtimeEventQueue); + }, } satisfies ClaudeAdapterShape; }); diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index cee6bca6ed..8b9f3b59e7 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -1631,7 +1631,9 @@ const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( listSessions, hasSession, stopAll, - streamEvents: Stream.fromQueue(runtimeEventQueue), + get streamEvents() { + return Stream.fromQueue(runtimeEventQueue); + }, } satisfies CodexAdapterShape; }); diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts index 258b50d6f2..9284ffac45 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts @@ -112,6 +112,49 @@ describe("EventNdjsonLogger", () => { }), ); + it.effect("serializes concurrent first writes for the same segment", () => + Effect.gen(function* () { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-")); + const basePath = path.join(tempDir, "provider-canonical.ndjson"); + + try { + const logger = yield* makeEventNdjsonLogger(basePath, { + stream: "canonical", + batchWindowMs: 0, + }); + assert.notEqual(logger, undefined); + if (!logger) { + return; + } + + yield* Effect.all( + [ + logger.write({ id: "evt-concurrent-1" }, null), + logger.write({ id: "evt-concurrent-2" }, null), + ], + { concurrency: "unbounded" }, + ); + yield* logger.close(); + + const globalPath = path.join(tempDir, "_global.log"); + assert.equal(fs.existsSync(globalPath), true); + const lines = fs + .readFileSync(globalPath, "utf8") + .trim() + .split("\n") + .map((line) => parseLogLine(line)); + + assert.equal(lines.length, 2); + assert.deepEqual(lines.map((line) => line.payload).toSorted(), [ + '{"id":"evt-concurrent-1"}', + '{"id":"evt-concurrent-2"}', + ]); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }), + ); + it.effect("rotates per-thread files when max size is exceeded", () => Effect.gen(function* () { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-")); diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index a4fd6f235d..2a512efe66 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -10,7 +10,7 @@ import path from "node:path"; import type { ThreadId } from "@t3tools/contracts"; import { RotatingFileSink } from "@t3tools/shared/logging"; -import { Effect, Exit, Logger, Scope } from "effect"; +import { Effect, Exit, Logger, Scope, SynchronizedRef } from "effect"; import { toSafeThreadAttachmentSegment } from "../../attachmentStore.ts"; @@ -40,6 +40,11 @@ interface ThreadWriter { close: () => Effect.Effect; } +interface LoggerState { + readonly threadWriters: Map; + readonly failedSegments: Set; +} + function logWarning(message: string, context: Record): Effect.Effect { return Effect.logWarning(message, context).pipe(Effect.annotateLogs({ scope: LOG_SCOPE })); } @@ -193,34 +198,56 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function return undefined; } - const threadWriters = new Map(); - const failedSegments = new Set(); + const stateRef = yield* SynchronizedRef.make({ + threadWriters: new Map(), + failedSegments: new Set(), + }); const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* ( threadSegment: string, ): Effect.fn.Return { - if (failedSegments.has(threadSegment)) { - return undefined; - } - const existing = threadWriters.get(threadSegment); - if (existing) { - return existing; - } + return yield* SynchronizedRef.modifyEffect(stateRef, (state) => { + if (state.failedSegments.has(threadSegment)) { + return Effect.succeed([undefined, state] as const); + } - const writer = yield* makeThreadWriter({ - filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), - maxBytes, - maxFiles, - batchWindowMs, - streamLabel, - }); - if (!writer) { - failedSegments.add(threadSegment); - return undefined; - } + const existing = state.threadWriters.get(threadSegment); + if (existing) { + return Effect.succeed([existing, state] as const); + } + + return makeThreadWriter({ + filePath: path.join(path.dirname(filePath), `${threadSegment}.log`), + maxBytes, + maxFiles, + batchWindowMs, + streamLabel, + }).pipe( + Effect.map((writer) => { + if (!writer) { + const nextFailedSegments = new Set(state.failedSegments); + nextFailedSegments.add(threadSegment); + return [ + undefined, + { + ...state, + failedSegments: nextFailedSegments, + }, + ] as const; + } - threadWriters.set(threadSegment, writer); - return writer; + const nextThreadWriters = new Map(state.threadWriters); + nextThreadWriters.set(threadSegment, writer); + return [ + writer, + { + ...state, + threadWriters: nextThreadWriters, + }, + ] as const; + }), + ); + }); }); const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) { @@ -239,10 +266,21 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function }); const close = Effect.fn("close")(function* () { - for (const writer of threadWriters.values()) { - yield* writer.close(); - } - threadWriters.clear(); + yield* SynchronizedRef.modifyEffect(stateRef, (state) => + Effect.gen(function* () { + for (const writer of state.threadWriters.values()) { + yield* writer.close(); + } + + return [ + undefined, + { + threadWriters: new Map(), + failedSegments: new Set(), + }, + ] as const; + }), + ); }); return { diff --git a/apps/server/src/provider/Layers/ProviderRegistry.test.ts b/apps/server/src/provider/Layers/ProviderRegistry.test.ts index 116c008d67..ca27371b61 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.test.ts @@ -115,7 +115,9 @@ function makeMutableServerSettingsService( yield* PubSub.publish(changes, next); return next; }), - streamChanges: Stream.fromPubSub(changes), + get streamChanges() { + return Stream.fromPubSub(changes); + }, } satisfies ServerSettingsShape; }); } diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index cd6c81405b..fc3c9cf25c 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -191,7 +191,9 @@ function makeFakeCodexAdapter(provider: ProviderKind = "codex") { readThread, rollbackThread, stopAll, - streamEvents: Stream.fromPubSub(runtimeEventPubSub), + get streamEvents() { + return Stream.fromPubSub(runtimeEventPubSub); + }, }; const emit = (event: LegacyProviderRuntimeEvent): void => { diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index da2c293fac..85fe9fbc32 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -22,7 +22,7 @@ import { type ProviderRuntimeEvent, type ProviderSession, } from "@t3tools/contracts"; -import { Effect, Layer, Option, PubSub, Queue, Schema, SchemaIssue, Stream } from "effect"; +import { Effect, Layer, Option, PubSub, Schema, SchemaIssue, Stream } from "effect"; import { increment, @@ -156,7 +156,6 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( const registry = yield* ProviderAdapterRegistry; const directory = yield* ProviderSessionDirectory; - const runtimeEventQueue = yield* Queue.unbounded(); const runtimeEventPubSub = yield* PubSub.unbounded(); const publishRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => @@ -194,15 +193,8 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( eventType: event.type, }).pipe(Effect.andThen(publishRuntimeEvent(event))); - const worker = Effect.forever( - Queue.take(runtimeEventQueue).pipe(Effect.flatMap(processRuntimeEvent)), - ); - yield* Effect.forkScoped(worker); - yield* Effect.forEach(adapters, (adapter) => - Stream.runForEach(adapter.streamEvents, (event) => - Queue.offer(runtimeEventQueue, event).pipe(Effect.asVoid), - ).pipe(Effect.forkScoped), + Stream.runForEach(adapter.streamEvents, processRuntimeEvent).pipe(Effect.forkScoped), ).pipe(Effect.asVoid); const recoverSessionForThread = Effect.fn("recoverSessionForThread")(function* (input: {