Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointRea
import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts";
import { RuntimeReceiptBusLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts";
import { RuntimeReceiptBusTest } from "../src/orchestration/Layers/RuntimeReceiptBus.ts";
import { OrchestrationReactorLive } from "../src/orchestration/Layers/OrchestrationReactor.ts";
import { ProviderCommandReactorLive } from "../src/orchestration/Layers/ProviderCommandReactor.ts";
import { ProviderRuntimeIngestionLive } from "../src/orchestration/Layers/ProviderRuntimeIngestion.ts";
Expand Down Expand Up @@ -297,7 +297,7 @@ export const makeOrchestrationIntegrationHarness = (
ProjectionPendingApprovalRepositoryLive,
checkpointStoreLayer,
providerLayer,
RuntimeReceiptBusLive,
RuntimeReceiptBusTest,
);
const serverSettingsLayer = ServerSettingsService.layerTest();
const runtimeIngestionLayer = ProviderRuntimeIngestionLive.pipe(
Expand Down Expand Up @@ -376,7 +376,7 @@ export const makeOrchestrationIntegrationHarness = (
runtime.runPromise(reactor.start().pipe(Scope.provide(scope))),
).pipe(Effect.orDie);
const receiptHistory = yield* Ref.make<ReadonlyArray<OrchestrationRuntimeReceipt>>([]);
yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) =>
yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) =>
Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid),
).pipe(Effect.forkIn(scope));
yield* Effect.sleep(10);
Expand Down
10 changes: 5 additions & 5 deletions apps/server/integration/providerService.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const collectEventsDuring = <A, E, R>(
Effect.forkScoped,
);

yield* Effect.sleep("50 millis");
yield* action;

return yield* Effect.forEach(
Expand All @@ -104,7 +105,6 @@ const runTurn = (input: {
}) =>
Effect.gen(function* () {
yield* input.harness.queueTurnResponse(input.threadId, input.response);

return yield* collectEventsDuring(
input.provider.streamEvents,
input.response.events.length,
Expand All @@ -116,7 +116,7 @@ const runTurn = (input: {
);
});

it.effect("replays typed runtime fixture events", () =>
it.live("replays typed runtime fixture events", () =>
Effect.gen(function* () {
const fixture = yield* makeIntegrationFixture;

Expand Down Expand Up @@ -149,7 +149,7 @@ it.effect("replays typed runtime fixture events", () =>
}).pipe(Effect.provide(NodeServices.layer)),
);

it.effect("replays file-changing fixture turn events", () =>
it.live("replays file-changing fixture turn events", () =>
Effect.gen(function* () {
const fixture = yield* makeIntegrationFixture;
const { join } = yield* Path.Path;
Expand Down Expand Up @@ -188,7 +188,7 @@ it.effect("replays file-changing fixture turn events", () =>
}).pipe(Effect.provide(NodeServices.layer)),
);

it.effect("runs multi-turn tool/approval flow", () =>
it.live("runs multi-turn tool/approval flow", () =>
Effect.gen(function* () {
const fixture = yield* makeIntegrationFixture;
const { join } = yield* Path.Path;
Expand Down Expand Up @@ -242,7 +242,7 @@ it.effect("runs multi-turn tool/approval flow", () =>
}).pipe(Effect.provide(NodeServices.layer)),
);

it.effect("rolls back provider conversation state only", () =>
it.live("rolls back provider conversation state only", () =>
Effect.gen(function* () {
const fixture = yield* makeIntegrationFixture;
const { join } = yield* Path.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ function createProviderServiceHarness(
listSessions,
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
rollbackConversation,
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
},
};

const emit = (event: LegacyProviderRuntimeEvent): void => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ describe("ProviderCommandReactor", () => {
sessionModelSwitch: input?.sessionModelSwitch ?? "in-session",
}),
rollbackConversation: () => unsupported(),
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
},
};

const orchestrationLayer = OrchestrationEngineLive.pipe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ function createProviderServiceHarness() {
listSessions: () => Effect.succeed([...runtimeSessions]),
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
rollbackConversation: () => unsupported(),
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
},
};

const setSession = (session: ProviderSession): void => {
Expand Down
22 changes: 20 additions & 2 deletions apps/server/src/orchestration/Layers/RuntimeReceiptBus.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
/**
* RuntimeReceiptBus layers.
*
* `RuntimeReceiptBusLive` is the production default and intentionally does not
* retain or broadcast receipts. `RuntimeReceiptBusTest` installs the in-memory
* PubSub-backed implementation used by integration tests that need to await
* checkpoint-reactor milestones precisely.
*
* @module RuntimeReceiptBus
*/
import { Effect, Layer, PubSub, Stream } from "effect";

import {
Expand All @@ -6,13 +16,21 @@ import {
type OrchestrationRuntimeReceipt,
} from "../Services/RuntimeReceiptBus.ts";

const makeRuntimeReceiptBus = Effect.gen(function* () {
const makeRuntimeReceiptBus = Effect.succeed({
publish: () => Effect.void,
streamEventsForTest: Stream.empty,
} satisfies RuntimeReceiptBusShape);

const makeRuntimeReceiptBusTest = Effect.gen(function* () {
const pubSub = yield* PubSub.unbounded<OrchestrationRuntimeReceipt>();

return {
publish: (receipt) => PubSub.publish(pubSub, receipt).pipe(Effect.asVoid),
stream: Stream.fromPubSub(pubSub),
get streamEventsForTest() {
return Stream.fromPubSub(pubSub);
},
} satisfies RuntimeReceiptBusShape;
});

export const RuntimeReceiptBusLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBus);
export const RuntimeReceiptBusTest = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBusTest);
18 changes: 17 additions & 1 deletion apps/server/src/orchestration/Services/RuntimeReceiptBus.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* RuntimeReceiptBus - Internal checkpoint-reactor synchronization receipts.
*
* This service exists to expose short-lived orchestration milestones that are
* useful in tests and harnesses but are not part of the production runtime
* event model. `CheckpointReactor` publishes receipts such as baseline capture,
* diff finalization, and turn-processing quiescence so integration tests can
* wait for those exact points without inferring them indirectly from persisted
* state.
*
* Production code should only call `publish`. Test code may subscribe via
* `streamEventsForTest`, which is intentionally named to make the intended
* usage explicit.
*
* @module RuntimeReceiptBus
*/
import { CheckpointRef, IsoDateTime, NonNegativeInt, ThreadId, TurnId } from "@t3tools/contracts";
import { Schema, ServiceMap } from "effect";
import type { Effect, Stream } from "effect";
Expand Down Expand Up @@ -40,7 +56,7 @@ export type OrchestrationRuntimeReceipt = typeof OrchestrationRuntimeReceipt.Typ

export interface RuntimeReceiptBusShape {
readonly publish: (receipt: OrchestrationRuntimeReceipt) => Effect.Effect<void>;
readonly stream: Stream.Stream<OrchestrationRuntimeReceipt>;
readonly streamEventsForTest: Stream.Stream<OrchestrationRuntimeReceipt>;
}

export class RuntimeReceiptBus extends ServiceMap.Service<
Expand Down
4 changes: 3 additions & 1 deletion apps/server/src/provider/Layers/ClaudeAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3054,7 +3054,9 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
listSessions,
hasSession,
stopAll,
streamEvents: Stream.fromQueue(runtimeEventQueue),
get streamEvents() {
return Stream.fromQueue(runtimeEventQueue);
},
} satisfies ClaudeAdapterShape;
});

Expand Down
4 changes: 3 additions & 1 deletion apps/server/src/provider/Layers/CodexAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1631,7 +1631,9 @@ const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
listSessions,
hasSession,
stopAll,
streamEvents: Stream.fromQueue(runtimeEventQueue),
get streamEvents() {
return Stream.fromQueue(runtimeEventQueue);
},
} satisfies CodexAdapterShape;
});

Expand Down
43 changes: 43 additions & 0 deletions apps/server/src/provider/Layers/EventNdjsonLogger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,49 @@ describe("EventNdjsonLogger", () => {
}),
);

it.effect("serializes concurrent first writes for the same segment", () =>
Effect.gen(function* () {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-"));
const basePath = path.join(tempDir, "provider-canonical.ndjson");

try {
const logger = yield* makeEventNdjsonLogger(basePath, {
stream: "canonical",
batchWindowMs: 0,
});
assert.notEqual(logger, undefined);
if (!logger) {
return;
}

yield* Effect.all(
[
logger.write({ id: "evt-concurrent-1" }, null),
logger.write({ id: "evt-concurrent-2" }, null),
],
{ concurrency: "unbounded" },
);
yield* logger.close();

const globalPath = path.join(tempDir, "_global.log");
assert.equal(fs.existsSync(globalPath), true);
const lines = fs
.readFileSync(globalPath, "utf8")
.trim()
.split("\n")
.map((line) => parseLogLine(line));

assert.equal(lines.length, 2);
assert.deepEqual(lines.map((line) => line.payload).toSorted(), [
'{"id":"evt-concurrent-1"}',
'{"id":"evt-concurrent-2"}',
]);
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
}),
);

it.effect("rotates per-thread files when max size is exceeded", () =>
Effect.gen(function* () {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-"));
Expand Down
92 changes: 65 additions & 27 deletions apps/server/src/provider/Layers/EventNdjsonLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import path from "node:path";

import type { ThreadId } from "@t3tools/contracts";
import { RotatingFileSink } from "@t3tools/shared/logging";
import { Effect, Exit, Logger, Scope } from "effect";
import { Effect, Exit, Logger, Scope, SynchronizedRef } from "effect";

import { toSafeThreadAttachmentSegment } from "../../attachmentStore.ts";

Expand Down Expand Up @@ -40,6 +40,11 @@ interface ThreadWriter {
close: () => Effect.Effect<void>;
}

interface LoggerState {
readonly threadWriters: Map<string, ThreadWriter>;
readonly failedSegments: Set<string>;
}

function logWarning(message: string, context: Record<string, unknown>): Effect.Effect<void> {
return Effect.logWarning(message, context).pipe(Effect.annotateLogs({ scope: LOG_SCOPE }));
}
Expand Down Expand Up @@ -193,34 +198,56 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function
return undefined;
}

const threadWriters = new Map<string, ThreadWriter>();
const failedSegments = new Set<string>();
const stateRef = yield* SynchronizedRef.make<LoggerState>({
threadWriters: new Map(),
failedSegments: new Set(),
});

const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* (
threadSegment: string,
): Effect.fn.Return<ThreadWriter | undefined> {
if (failedSegments.has(threadSegment)) {
return undefined;
}
const existing = threadWriters.get(threadSegment);
if (existing) {
return existing;
}
return yield* SynchronizedRef.modifyEffect(stateRef, (state) => {
if (state.failedSegments.has(threadSegment)) {
return Effect.succeed([undefined, state] as const);
}

const writer = yield* makeThreadWriter({
filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
maxBytes,
maxFiles,
batchWindowMs,
streamLabel,
});
if (!writer) {
failedSegments.add(threadSegment);
return undefined;
}
const existing = state.threadWriters.get(threadSegment);
if (existing) {
return Effect.succeed([existing, state] as const);
}

return makeThreadWriter({
filePath: path.join(path.dirname(filePath), `${threadSegment}.log`),
maxBytes,
maxFiles,
batchWindowMs,
streamLabel,
}).pipe(
Effect.map((writer) => {
if (!writer) {
const nextFailedSegments = new Set(state.failedSegments);
nextFailedSegments.add(threadSegment);
return [
undefined,
{
...state,
failedSegments: nextFailedSegments,
},
] as const;
}

threadWriters.set(threadSegment, writer);
return writer;
const nextThreadWriters = new Map(state.threadWriters);
nextThreadWriters.set(threadSegment, writer);
return [
writer,
{
...state,
threadWriters: nextThreadWriters,
},
] as const;
}),
);
});
});

const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) {
Expand All @@ -239,10 +266,21 @@ export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function
});

const close = Effect.fn("close")(function* () {
for (const writer of threadWriters.values()) {
yield* writer.close();
}
threadWriters.clear();
yield* SynchronizedRef.modifyEffect(stateRef, (state) =>
Effect.gen(function* () {
for (const writer of state.threadWriters.values()) {
yield* writer.close();
}

return [
undefined,
{
threadWriters: new Map<string, ThreadWriter>(),
failedSegments: new Set<string>(),
},
] as const;
}),
);
});

return {
Expand Down
4 changes: 3 additions & 1 deletion apps/server/src/provider/Layers/ProviderRegistry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ function makeMutableServerSettingsService(
yield* PubSub.publish(changes, next);
return next;
}),
streamChanges: Stream.fromPubSub(changes),
get streamChanges() {
return Stream.fromPubSub(changes);
},
} satisfies ServerSettingsShape;
});
}
Expand Down
4 changes: 3 additions & 1 deletion apps/server/src/provider/Layers/ProviderService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ function makeFakeCodexAdapter(provider: ProviderKind = "codex") {
readThread,
rollbackThread,
stopAll,
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
},
};

const emit = (event: LegacyProviderRuntimeEvent): void => {
Expand Down
Loading
Loading