From 3bec3d2c8de7fa601583e7c1fc4a8d4da178e742 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Thu, 2 Apr 2026 02:21:12 -0700 Subject: [PATCH] Optimize streaming message projection path Co-authored-by: codex --- .../Layers/ProjectionPipeline.ts | 43 ++++--- .../Layers/ProjectionThreadMessages.test.ts | 82 ++++++++++++- .../Layers/ProjectionThreadMessages.ts | 110 +++++++++++++++--- .../Services/ProjectionThreadMessages.ts | 35 +++++- 4 files changed, 241 insertions(+), 29 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 61de04ad0e..9458539757 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -623,31 +623,46 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti )(function* (event, attachmentSideEffects) { switch (event.type) { case "thread.message-sent": { - const existingRows = yield* projectionThreadMessageRepository.listByThreadId({ - threadId: event.payload.threadId, - }); - const existingMessage = existingRows.find( - (row) => row.messageId === event.payload.messageId, - ); - const nextText = - existingMessage && event.payload.streaming - ? `${existingMessage.text}${event.payload.text}` - : existingMessage && event.payload.text.length === 0 - ? existingMessage.text - : event.payload.text; const nextAttachments = event.payload.attachments !== undefined ? yield* materializeAttachmentsForProjection({ attachments: event.payload.attachments, }) - : existingMessage?.attachments; + : undefined; + if (event.payload.streaming) { + yield* projectionThreadMessageRepository.appendTextDelta({ + messageId: event.payload.messageId, + threadId: event.payload.threadId, + turnId: event.payload.turnId, + role: event.payload.role, + delta: event.payload.text, + ...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}), + isStreaming: true, + createdAt: event.payload.createdAt, + updatedAt: event.payload.updatedAt, + }); + return; + } + + const existingMessage = yield* projectionThreadMessageRepository + .getByMessageId({ + messageId: event.payload.messageId, + }) + .pipe(Effect.map(Option.getOrUndefined)); + const nextText = + existingMessage !== undefined && event.payload.text.length === 0 + ? existingMessage.text + : event.payload.text; + const persistedAttachments = nextAttachments ?? existingMessage?.attachments; yield* projectionThreadMessageRepository.upsert({ messageId: event.payload.messageId, threadId: event.payload.threadId, turnId: event.payload.turnId, role: event.payload.role, text: nextText, - ...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}), + ...(persistedAttachments !== undefined + ? { attachments: [...persistedAttachments] } + : {}), isStreaming: event.payload.streaming, createdAt: existingMessage?.createdAt ?? event.payload.createdAt, updatedAt: event.payload.updatedAt, diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts index b761387d47..02e63e2d4f 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.test.ts @@ -1,6 +1,6 @@ import { MessageId, ThreadId } from "@t3tools/contracts"; import { assert, it } from "@effect/vitest"; -import { Effect, Layer } from "effect"; +import { Effect, Layer, Option } from "effect"; import { ProjectionThreadMessageRepository } from "../Services/ProjectionThreadMessages.ts"; import { ProjectionThreadMessageRepositoryLive } from "./ProjectionThreadMessages.ts"; @@ -103,4 +103,84 @@ layer("ProjectionThreadMessageRepository", (it) => { assert.deepEqual(rows[0]?.attachments, []); }), ); + + it.effect("looks up a projected message by id", () => + Effect.gen(function* () { + const repository = yield* ProjectionThreadMessageRepository; + const threadId = ThreadId.makeUnsafe("thread-get-by-message-id"); + const messageId = MessageId.makeUnsafe("message-get-by-message-id"); + const createdAt = "2026-02-28T19:20:00.000Z"; + const updatedAt = "2026-02-28T19:20:01.000Z"; + + yield* repository.upsert({ + messageId, + threadId, + turnId: null, + role: "assistant", + text: "lookup me", + isStreaming: false, + createdAt, + updatedAt, + }); + + const maybeRow = yield* repository.getByMessageId({ messageId }); + assert.isTrue(Option.isSome(maybeRow)); + const row = Option.getOrThrow(maybeRow); + assert.equal(row.messageId, messageId); + assert.equal(row.threadId, threadId); + assert.equal(row.text, "lookup me"); + assert.equal(row.createdAt, createdAt); + assert.equal(row.updatedAt, updatedAt); + }), + ); + + it.effect("appends streaming deltas without losing createdAt or attachments", () => + Effect.gen(function* () { + const repository = yield* ProjectionThreadMessageRepository; + const threadId = ThreadId.makeUnsafe("thread-append-delta"); + const messageId = MessageId.makeUnsafe("message-append-delta"); + const createdAt = "2026-02-28T19:30:00.000Z"; + const persistedAttachments = [ + { + type: "image" as const, + id: "thread-append-delta-att-1", + name: "example.png", + mimeType: "image/png", + sizeBytes: 5, + }, + ]; + + yield* repository.appendTextDelta({ + messageId, + threadId, + turnId: null, + role: "assistant", + delta: "Hello", + attachments: persistedAttachments, + isStreaming: true, + createdAt, + updatedAt: "2026-02-28T19:30:01.000Z", + }); + + yield* repository.appendTextDelta({ + messageId, + threadId, + turnId: null, + role: "assistant", + delta: " world", + isStreaming: true, + createdAt: "2026-02-28T19:30:05.000Z", + updatedAt: "2026-02-28T19:30:06.000Z", + }); + + const maybeRow = yield* repository.getByMessageId({ messageId }); + assert.isTrue(Option.isSome(maybeRow)); + const row = Option.getOrThrow(maybeRow); + assert.equal(row.text, "Hello world"); + assert.equal(row.createdAt, createdAt); + assert.equal(row.updatedAt, "2026-02-28T19:30:06.000Z"); + assert.deepEqual(row.attachments, persistedAttachments); + assert.isTrue(row.isStreaming); + }), + ); }); diff --git a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts index 6f0b25ddff..deda12b5e2 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadMessages.ts @@ -1,10 +1,12 @@ import * as SqlClient from "effect/unstable/sql/SqlClient"; import * as SqlSchema from "effect/unstable/sql/SqlSchema"; -import { Effect, Layer, Schema, Struct } from "effect"; +import { Effect, Layer, Option, Schema, Struct } from "effect"; import { ChatAttachment } from "@t3tools/contracts"; import { toPersistenceSqlError } from "../Errors.ts"; import { + AppendProjectionThreadMessageDeltaInput, + GetProjectionThreadMessageInput, ProjectionThreadMessageRepository, type ProjectionThreadMessageRepositoryShape, DeleteProjectionThreadMessagesInput, @@ -18,6 +20,19 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( attachments: Schema.NullOr(Schema.fromJsonString(Schema.Array(ChatAttachment))), }), ); +type ProjectionThreadMessageDbRow = typeof ProjectionThreadMessageDbRowSchema.Type; + +const toProjectionThreadMessage = (row: ProjectionThreadMessageDbRow) => ({ + messageId: row.messageId, + threadId: row.threadId, + turnId: row.turnId, + role: row.role, + text: row.text, + isStreaming: row.isStreaming === 1, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + ...(row.attachments !== null ? { attachments: row.attachments } : {}), +}); const makeProjectionThreadMessageRepository = Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; @@ -95,6 +110,70 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { `, }); + const getProjectionThreadMessageRow = SqlSchema.findOneOption({ + Request: GetProjectionThreadMessageInput, + Result: ProjectionThreadMessageDbRowSchema, + execute: ({ messageId }) => + sql` + SELECT + message_id AS "messageId", + thread_id AS "threadId", + turn_id AS "turnId", + role, + text, + attachments_json AS "attachments", + is_streaming AS "isStreaming", + created_at AS "createdAt", + updated_at AS "updatedAt" + FROM projection_thread_messages + WHERE message_id = ${messageId} + `, + }); + + const appendProjectionThreadMessageDeltaRow = SqlSchema.void({ + Request: AppendProjectionThreadMessageDeltaInput, + execute: (row) => { + const nextAttachmentsJson = + row.attachments !== undefined ? JSON.stringify(row.attachments) : null; + return sql` + INSERT INTO projection_thread_messages ( + message_id, + thread_id, + turn_id, + role, + text, + attachments_json, + is_streaming, + created_at, + updated_at + ) + VALUES ( + ${row.messageId}, + ${row.threadId}, + ${row.turnId}, + ${row.role}, + ${row.delta}, + ${nextAttachmentsJson}, + ${row.isStreaming ? 1 : 0}, + ${row.createdAt}, + ${row.updatedAt} + ) + ON CONFLICT (message_id) + DO UPDATE SET + thread_id = excluded.thread_id, + turn_id = COALESCE(excluded.turn_id, projection_thread_messages.turn_id), + role = excluded.role, + text = projection_thread_messages.text || excluded.text, + attachments_json = COALESCE( + excluded.attachments_json, + projection_thread_messages.attachments_json + ), + is_streaming = excluded.is_streaming, + updated_at = excluded.updated_at + `; + }, + }); + const deleteProjectionThreadMessageRows = SqlSchema.void({ Request: DeleteProjectionThreadMessagesInput, execute: ({ threadId }) => @@ -114,18 +193,21 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { Effect.mapError( toPersistenceSqlError("ProjectionThreadMessageRepository.listByThreadId:query"), ), - Effect.map((rows) => - rows.map((row) => ({ - messageId: row.messageId, - threadId: row.threadId, - turnId: row.turnId, - role: row.role, - text: row.text, - isStreaming: row.isStreaming === 1, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - ...(row.attachments !== null ? { attachments: row.attachments } : {}), - })), + Effect.map((rows) => rows.map(toProjectionThreadMessage)), + ); + + const getByMessageId: ProjectionThreadMessageRepositoryShape["getByMessageId"] = (input) => + getProjectionThreadMessageRow(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadMessageRepository.getByMessageId:query"), + ), + Effect.map(Option.map(toProjectionThreadMessage)), + ); + + const appendTextDelta: ProjectionThreadMessageRepositoryShape["appendTextDelta"] = (input) => + appendProjectionThreadMessageDeltaRow(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadMessageRepository.appendTextDelta:query"), ), ); @@ -139,6 +221,8 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () { return { upsert, listByThreadId, + getByMessageId, + appendTextDelta, deleteByThreadId, } satisfies ProjectionThreadMessageRepositoryShape; }); diff --git a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts index 00b1d399c6..bfb4fc8899 100644 --- a/apps/server/src/persistence/Services/ProjectionThreadMessages.ts +++ b/apps/server/src/persistence/Services/ProjectionThreadMessages.ts @@ -14,7 +14,7 @@ import { TurnId, IsoDateTime, } from "@t3tools/contracts"; -import { Schema, ServiceMap } from "effect"; +import { Option, Schema, ServiceMap } from "effect"; import type { Effect } from "effect"; import type { ProjectionRepositoryError } from "../Errors.ts"; @@ -37,11 +37,30 @@ export const ListProjectionThreadMessagesInput = Schema.Struct({ }); export type ListProjectionThreadMessagesInput = typeof ListProjectionThreadMessagesInput.Type; +export const GetProjectionThreadMessageInput = Schema.Struct({ + messageId: MessageId, +}); +export type GetProjectionThreadMessageInput = typeof GetProjectionThreadMessageInput.Type; + export const DeleteProjectionThreadMessagesInput = Schema.Struct({ threadId: ThreadId, }); export type DeleteProjectionThreadMessagesInput = typeof DeleteProjectionThreadMessagesInput.Type; +export const AppendProjectionThreadMessageDeltaInput = Schema.Struct({ + messageId: MessageId, + threadId: ThreadId, + turnId: Schema.NullOr(TurnId), + role: OrchestrationMessageRole, + delta: Schema.String, + attachments: Schema.optional(Schema.Array(ChatAttachment)), + isStreaming: Schema.Boolean, + createdAt: IsoDateTime, + updatedAt: IsoDateTime, +}); +export type AppendProjectionThreadMessageDeltaInput = + typeof AppendProjectionThreadMessageDeltaInput.Type; + /** * ProjectionThreadMessageRepositoryShape - Service API for projected thread messages. */ @@ -64,6 +83,20 @@ export interface ProjectionThreadMessageRepositoryShape { input: ListProjectionThreadMessagesInput, ) => Effect.Effect, ProjectionRepositoryError>; + /** + * Look up a projected message by id. + */ + readonly getByMessageId: ( + input: GetProjectionThreadMessageInput, + ) => Effect.Effect, ProjectionRepositoryError>; + + /** + * Append a streaming text delta to an existing projected message row, or insert it. + */ + readonly appendTextDelta: ( + input: AppendProjectionThreadMessageDeltaInput, + ) => Effect.Effect; + /** * Delete projected thread messages by thread. */