Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,157 @@ describe("OrchestrationEngine", () => {
await system.dispose();
});

it("prioritizes client commands ahead of queued internal stream commands", async () => {
const system = await createOrchestrationSystem();
const { engine } = system;
const createdAt = now();
const threadId = ThreadId.makeUnsafe("thread-priority-lane");

await system.run(
engine.dispatch({
type: "project.create",
commandId: CommandId.makeUnsafe("cmd-project-priority-lane-create"),
projectId: asProjectId("project-priority-lane"),
title: "Priority Lane Project",
workspaceRoot: "/tmp/project-priority-lane",
defaultModelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
createdAt,
}),
);
await system.run(
engine.dispatch({
type: "thread.create",
commandId: CommandId.makeUnsafe("cmd-thread-priority-lane-create"),
threadId,
projectId: asProjectId("project-priority-lane"),
title: "Priority Lane Thread",
modelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "full-access",
branch: null,
worktreePath: null,
createdAt,
}),
);

const lowPriorityCommandCount = 250;
const lowPriorityDispatches = Array.from({ length: lowPriorityCommandCount }, (_, index) =>
system.run(
engine.dispatch({
type: "thread.message.assistant.delta",
commandId: CommandId.makeUnsafe(`cmd-thread-priority-lane-delta-${index}`),
threadId,
messageId: asMessageId("assistant-priority-lane"),
delta: `chunk-${index}`,
turnId: asTurnId("turn-priority-lane"),
createdAt,
}),
),
);

const archiveResult = await system.run(
engine.dispatch({
type: "thread.archive",
commandId: CommandId.makeUnsafe("cmd-thread-priority-lane-archive"),
threadId,
}),
);
const lowPriorityResults = await Promise.all(lowPriorityDispatches);
const lowPriorityCommandsAheadOfArchive = lowPriorityResults.filter(
(result) => result.sequence < archiveResult.sequence,
).length;

expect(lowPriorityCommandsAheadOfArchive).toBeLessThan(lowPriorityCommandCount / 3);
expect(archiveResult.sequence).toBeLessThan(lowPriorityResults.at(-1)?.sequence ?? Infinity);
await system.dispose();
});

it("treats normalized thread.turn.start as a prioritized client command", async () => {
const system = await createOrchestrationSystem();
const { engine } = system;
const createdAt = now();
const threadId = ThreadId.makeUnsafe("thread-priority-turn-start");

await system.run(
engine.dispatch({
type: "project.create",
commandId: CommandId.makeUnsafe("cmd-project-priority-turn-start-create"),
projectId: asProjectId("project-priority-turn-start"),
title: "Priority Turn Start Project",
workspaceRoot: "/tmp/project-priority-turn-start",
defaultModelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
createdAt,
}),
);
await system.run(
engine.dispatch({
type: "thread.create",
commandId: CommandId.makeUnsafe("cmd-thread-priority-turn-start-create"),
threadId,
projectId: asProjectId("project-priority-turn-start"),
title: "Priority Turn Start Thread",
modelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "full-access",
branch: null,
worktreePath: null,
createdAt,
}),
);

const lowPriorityCommandCount = 150;
const lowPriorityDispatches = Array.from({ length: lowPriorityCommandCount }, (_, index) =>
system.run(
engine.dispatch({
type: "thread.message.assistant.delta",
commandId: CommandId.makeUnsafe(`cmd-thread-priority-turn-start-delta-${index}`),
threadId,
messageId: asMessageId("assistant-priority-turn-start"),
delta: `chunk-${index}`,
turnId: asTurnId("turn-priority-turn-start-internal"),
createdAt,
}),
),
);

const turnStartResult = await system.run(
engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.makeUnsafe("cmd-thread-priority-turn-start"),
threadId,
message: {
messageId: asMessageId("user-priority-turn-start"),
role: "user",
text: "hello",
attachments: [],
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "full-access",
createdAt,
}),
);
const lowPriorityResults = await Promise.all(lowPriorityDispatches);
const lowPriorityCommandsAheadOfTurnStart = lowPriorityResults.filter(
(result) => result.sequence < turnStartResult.sequence,
).length;

expect(lowPriorityCommandsAheadOfTurnStart).toBeLessThan(lowPriorityCommandCount / 3);
expect(turnStartResult.sequence).toBeLessThan(lowPriorityResults.at(-1)?.sequence ?? Infinity);
await system.dispose();
});

it("streams persisted domain events in order", async () => {
const system = await createOrchestrationSystem();
const { engine } = system;
Expand Down
58 changes: 53 additions & 5 deletions apps/server/src/orchestration/Layers/OrchestrationEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,18 @@ import type {
ProjectId,
ThreadId,
} from "@t3tools/contracts";
import { OrchestrationCommand } from "@t3tools/contracts";
import { Deferred, Effect, Layer, Option, PubSub, Queue, Schema, Stream } from "effect";
import { DispatchableClientOrchestrationCommand, OrchestrationCommand } from "@t3tools/contracts";
import {
Deferred,
Effect,
Layer,
Option,
Order,
PubSub,
Schema,
Stream,
TxPriorityQueue,
} from "effect";
import * as SqlClient from "effect/unstable/sql/SqlClient";

import { toPersistenceSqlError } from "../../persistence/Errors.ts";
Expand All @@ -29,6 +39,24 @@ interface CommandEnvelope {
result: Deferred.Deferred<{ sequence: number }, OrchestrationDispatchError>;
}

type CommandPriority = 0 | 1;

interface PrioritizedCommandEnvelope {
readonly priority: CommandPriority;
readonly insertionSequence: number;
readonly envelope: CommandEnvelope;
}

const COMMAND_PRIORITY = {
control: 0,
stream: 1,
} as const satisfies Record<string, CommandPriority>;

const prioritizedCommandEnvelopeOrder = Order.combine(
Order.mapInput(Order.Number, (item: PrioritizedCommandEnvelope) => item.priority),
Order.mapInput(Order.Number, (item: PrioritizedCommandEnvelope) => item.insertionSequence),
);

function commandToAggregateRef(command: OrchestrationCommand): {
readonly aggregateKind: "project" | "thread";
readonly aggregateId: ProjectId | ThreadId;
Expand All @@ -49,15 +77,25 @@ function commandToAggregateRef(command: OrchestrationCommand): {
}
}

function commandPriority(command: OrchestrationCommand): CommandPriority {
if (Schema.is(DispatchableClientOrchestrationCommand)(command)) {
return COMMAND_PRIORITY.control;
}
return COMMAND_PRIORITY.stream;
}

const makeOrchestrationEngine = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;
const eventStore = yield* OrchestrationEventStore;
const commandReceiptRepository = yield* OrchestrationCommandReceiptRepository;
const projectionPipeline = yield* OrchestrationProjectionPipeline;

let readModel = createEmptyReadModel(new Date().toISOString());
let nextCommandInsertionSequence = 0;

const commandQueue = yield* Queue.unbounded<CommandEnvelope>();
const commandQueue = yield* TxPriorityQueue.empty<PrioritizedCommandEnvelope>(
prioritizedCommandEnvelopeOrder,
);
const eventPubSub = yield* PubSub.unbounded<OrchestrationEvent>();

const processEnvelope = (envelope: CommandEnvelope): Effect.Effect<void> => {
Expand Down Expand Up @@ -203,7 +241,13 @@ const makeOrchestrationEngine = Effect.gen(function* () {
}),
);

const worker = Effect.forever(Queue.take(commandQueue).pipe(Effect.flatMap(processEnvelope)));
const worker = Effect.forever(
TxPriorityQueue.take(commandQueue).pipe(
Effect.tx,
Effect.map((item) => item.envelope),
Effect.flatMap(processEnvelope),
),
);
yield* Effect.forkScoped(worker);
yield* Effect.logDebug("orchestration engine started").pipe(
Effect.annotateLogs({ sequence: readModel.snapshotSequence }),
Expand All @@ -218,7 +262,11 @@ const makeOrchestrationEngine = Effect.gen(function* () {
const dispatch: OrchestrationEngineShape["dispatch"] = (command) =>
Effect.gen(function* () {
const result = yield* Deferred.make<{ sequence: number }, OrchestrationDispatchError>();
yield* Queue.offer(commandQueue, { command, result });
yield* TxPriorityQueue.offer(commandQueue, {
priority: commandPriority(command),
insertionSequence: nextCommandInsertionSequence++,
envelope: { command, result },
}).pipe(Effect.tx);
return yield* Deferred.await(result);
});

Expand Down
2 changes: 1 addition & 1 deletion packages/contracts/src/orchestration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ const ThreadSessionStopCommand = Schema.Struct({
createdAt: IsoDateTime,
});

const DispatchableClientOrchestrationCommand = Schema.Union([
export const DispatchableClientOrchestrationCommand = Schema.Union([
ProjectCreateCommand,
ProjectMetaUpdateCommand,
ProjectDeleteCommand,
Expand Down
Loading
Loading