Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -623,31 +623,46 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
)(function* (event, attachmentSideEffects) {
switch (event.type) {
case "thread.message-sent": {
const existingRows = yield* projectionThreadMessageRepository.listByThreadId({
threadId: event.payload.threadId,
});
const existingMessage = existingRows.find(
(row) => row.messageId === event.payload.messageId,
);
const nextText =
existingMessage && event.payload.streaming
? `${existingMessage.text}${event.payload.text}`
: existingMessage && event.payload.text.length === 0
? existingMessage.text
: event.payload.text;
const nextAttachments =
event.payload.attachments !== undefined
? yield* materializeAttachmentsForProjection({
attachments: event.payload.attachments,
})
: existingMessage?.attachments;
: undefined;
if (event.payload.streaming) {
yield* projectionThreadMessageRepository.appendTextDelta({
messageId: event.payload.messageId,
threadId: event.payload.threadId,
turnId: event.payload.turnId,
role: event.payload.role,
delta: event.payload.text,
...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
isStreaming: true,
createdAt: event.payload.createdAt,
updatedAt: event.payload.updatedAt,
});
return;
}

const existingMessage = yield* projectionThreadMessageRepository
.getByMessageId({
messageId: event.payload.messageId,
})
.pipe(Effect.map(Option.getOrUndefined));
const nextText =
existingMessage !== undefined && event.payload.text.length === 0
? existingMessage.text
: event.payload.text;
const persistedAttachments = nextAttachments ?? existingMessage?.attachments;
yield* projectionThreadMessageRepository.upsert({
messageId: event.payload.messageId,
threadId: event.payload.threadId,
turnId: event.payload.turnId,
role: event.payload.role,
text: nextText,
...(nextAttachments !== undefined ? { attachments: [...nextAttachments] } : {}),
...(persistedAttachments !== undefined
? { attachments: [...persistedAttachments] }
: {}),
isStreaming: event.payload.streaming,
createdAt: existingMessage?.createdAt ?? event.payload.createdAt,
updatedAt: event.payload.updatedAt,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { MessageId, ThreadId } from "@t3tools/contracts";
import { assert, it } from "@effect/vitest";
import { Effect, Layer } from "effect";
import { Effect, Layer, Option } from "effect";

import { ProjectionThreadMessageRepository } from "../Services/ProjectionThreadMessages.ts";
import { ProjectionThreadMessageRepositoryLive } from "./ProjectionThreadMessages.ts";
Expand Down Expand Up @@ -103,4 +103,84 @@ layer("ProjectionThreadMessageRepository", (it) => {
assert.deepEqual(rows[0]?.attachments, []);
}),
);

it.effect("looks up a projected message by id", () =>
Effect.gen(function* () {
const repository = yield* ProjectionThreadMessageRepository;
const threadId = ThreadId.makeUnsafe("thread-get-by-message-id");
const messageId = MessageId.makeUnsafe("message-get-by-message-id");
const createdAt = "2026-02-28T19:20:00.000Z";
const updatedAt = "2026-02-28T19:20:01.000Z";

yield* repository.upsert({
messageId,
threadId,
turnId: null,
role: "assistant",
text: "lookup me",
isStreaming: false,
createdAt,
updatedAt,
});

const maybeRow = yield* repository.getByMessageId({ messageId });
assert.isTrue(Option.isSome(maybeRow));
const row = Option.getOrThrow(maybeRow);
assert.equal(row.messageId, messageId);
assert.equal(row.threadId, threadId);
assert.equal(row.text, "lookup me");
assert.equal(row.createdAt, createdAt);
assert.equal(row.updatedAt, updatedAt);
}),
);

it.effect("appends streaming deltas without losing createdAt or attachments", () =>
Effect.gen(function* () {
const repository = yield* ProjectionThreadMessageRepository;
const threadId = ThreadId.makeUnsafe("thread-append-delta");
const messageId = MessageId.makeUnsafe("message-append-delta");
const createdAt = "2026-02-28T19:30:00.000Z";
const persistedAttachments = [
{
type: "image" as const,
id: "thread-append-delta-att-1",
name: "example.png",
mimeType: "image/png",
sizeBytes: 5,
},
];

yield* repository.appendTextDelta({
messageId,
threadId,
turnId: null,
role: "assistant",
delta: "Hello",
attachments: persistedAttachments,
isStreaming: true,
createdAt,
updatedAt: "2026-02-28T19:30:01.000Z",
});

yield* repository.appendTextDelta({
messageId,
threadId,
turnId: null,
role: "assistant",
delta: " world",
isStreaming: true,
createdAt: "2026-02-28T19:30:05.000Z",
updatedAt: "2026-02-28T19:30:06.000Z",
});

const maybeRow = yield* repository.getByMessageId({ messageId });
assert.isTrue(Option.isSome(maybeRow));
const row = Option.getOrThrow(maybeRow);
assert.equal(row.text, "Hello world");
assert.equal(row.createdAt, createdAt);
assert.equal(row.updatedAt, "2026-02-28T19:30:06.000Z");
assert.deepEqual(row.attachments, persistedAttachments);
assert.isTrue(row.isStreaming);
}),
);
});
110 changes: 97 additions & 13 deletions apps/server/src/persistence/Layers/ProjectionThreadMessages.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import * as SqlClient from "effect/unstable/sql/SqlClient";
import * as SqlSchema from "effect/unstable/sql/SqlSchema";
import { Effect, Layer, Schema, Struct } from "effect";
import { Effect, Layer, Option, Schema, Struct } from "effect";
import { ChatAttachment } from "@t3tools/contracts";

import { toPersistenceSqlError } from "../Errors.ts";
import {
AppendProjectionThreadMessageDeltaInput,
GetProjectionThreadMessageInput,
ProjectionThreadMessageRepository,
type ProjectionThreadMessageRepositoryShape,
DeleteProjectionThreadMessagesInput,
Expand All @@ -18,6 +20,19 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields(
attachments: Schema.NullOr(Schema.fromJsonString(Schema.Array(ChatAttachment))),
}),
);
type ProjectionThreadMessageDbRow = typeof ProjectionThreadMessageDbRowSchema.Type;

const toProjectionThreadMessage = (row: ProjectionThreadMessageDbRow) => ({
messageId: row.messageId,
threadId: row.threadId,
turnId: row.turnId,
role: row.role,
text: row.text,
isStreaming: row.isStreaming === 1,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
...(row.attachments !== null ? { attachments: row.attachments } : {}),
});

const makeProjectionThreadMessageRepository = Effect.gen(function* () {
const sql = yield* SqlClient.SqlClient;
Expand Down Expand Up @@ -95,6 +110,70 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
`,
});

const getProjectionThreadMessageRow = SqlSchema.findOneOption({
Request: GetProjectionThreadMessageInput,
Result: ProjectionThreadMessageDbRowSchema,
execute: ({ messageId }) =>
sql`
SELECT
message_id AS "messageId",
thread_id AS "threadId",
turn_id AS "turnId",
role,
text,
attachments_json AS "attachments",
is_streaming AS "isStreaming",
created_at AS "createdAt",
updated_at AS "updatedAt"
FROM projection_thread_messages
WHERE message_id = ${messageId}
`,
});

const appendProjectionThreadMessageDeltaRow = SqlSchema.void({
Request: AppendProjectionThreadMessageDeltaInput,
execute: (row) => {
const nextAttachmentsJson =
row.attachments !== undefined ? JSON.stringify(row.attachments) : null;
return sql`
INSERT INTO projection_thread_messages (
message_id,
thread_id,
turn_id,
role,
text,
attachments_json,
is_streaming,
created_at,
updated_at
)
VALUES (
${row.messageId},
${row.threadId},
${row.turnId},
${row.role},
${row.delta},
${nextAttachmentsJson},
${row.isStreaming ? 1 : 0},
${row.createdAt},
${row.updatedAt}
)
ON CONFLICT (message_id)
DO UPDATE SET
thread_id = excluded.thread_id,
turn_id = COALESCE(excluded.turn_id, projection_thread_messages.turn_id),
role = excluded.role,
text = projection_thread_messages.text || excluded.text,
attachments_json = COALESCE(
excluded.attachments_json,
projection_thread_messages.attachments_json
),
is_streaming = excluded.is_streaming,
updated_at = excluded.updated_at
`;
},
});

const deleteProjectionThreadMessageRows = SqlSchema.void({
Request: DeleteProjectionThreadMessagesInput,
execute: ({ threadId }) =>
Expand All @@ -114,18 +193,21 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
Effect.mapError(
toPersistenceSqlError("ProjectionThreadMessageRepository.listByThreadId:query"),
),
Effect.map((rows) =>
rows.map((row) => ({
messageId: row.messageId,
threadId: row.threadId,
turnId: row.turnId,
role: row.role,
text: row.text,
isStreaming: row.isStreaming === 1,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
...(row.attachments !== null ? { attachments: row.attachments } : {}),
})),
Effect.map((rows) => rows.map(toProjectionThreadMessage)),
);

const getByMessageId: ProjectionThreadMessageRepositoryShape["getByMessageId"] = (input) =>
getProjectionThreadMessageRow(input).pipe(
Effect.mapError(
toPersistenceSqlError("ProjectionThreadMessageRepository.getByMessageId:query"),
),
Effect.map(Option.map(toProjectionThreadMessage)),
);

const appendTextDelta: ProjectionThreadMessageRepositoryShape["appendTextDelta"] = (input) =>
appendProjectionThreadMessageDeltaRow(input).pipe(
Effect.mapError(
toPersistenceSqlError("ProjectionThreadMessageRepository.appendTextDelta:query"),
),
);

Expand All @@ -139,6 +221,8 @@ const makeProjectionThreadMessageRepository = Effect.gen(function* () {
return {
upsert,
listByThreadId,
getByMessageId,
appendTextDelta,
deleteByThreadId,
} satisfies ProjectionThreadMessageRepositoryShape;
});
Expand Down
35 changes: 34 additions & 1 deletion apps/server/src/persistence/Services/ProjectionThreadMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
TurnId,
IsoDateTime,
} from "@t3tools/contracts";
import { Schema, ServiceMap } from "effect";
import { Option, Schema, ServiceMap } from "effect";
import type { Effect } from "effect";

import type { ProjectionRepositoryError } from "../Errors.ts";
Expand All @@ -37,11 +37,30 @@ export const ListProjectionThreadMessagesInput = Schema.Struct({
});
export type ListProjectionThreadMessagesInput = typeof ListProjectionThreadMessagesInput.Type;

export const GetProjectionThreadMessageInput = Schema.Struct({
messageId: MessageId,
});
export type GetProjectionThreadMessageInput = typeof GetProjectionThreadMessageInput.Type;

export const DeleteProjectionThreadMessagesInput = Schema.Struct({
threadId: ThreadId,
});
export type DeleteProjectionThreadMessagesInput = typeof DeleteProjectionThreadMessagesInput.Type;

export const AppendProjectionThreadMessageDeltaInput = Schema.Struct({
messageId: MessageId,
threadId: ThreadId,
turnId: Schema.NullOr(TurnId),
role: OrchestrationMessageRole,
delta: Schema.String,
attachments: Schema.optional(Schema.Array(ChatAttachment)),
isStreaming: Schema.Boolean,
createdAt: IsoDateTime,
updatedAt: IsoDateTime,
});
export type AppendProjectionThreadMessageDeltaInput =
typeof AppendProjectionThreadMessageDeltaInput.Type;

/**
* ProjectionThreadMessageRepositoryShape - Service API for projected thread messages.
*/
Expand All @@ -64,6 +83,20 @@ export interface ProjectionThreadMessageRepositoryShape {
input: ListProjectionThreadMessagesInput,
) => Effect.Effect<ReadonlyArray<ProjectionThreadMessage>, ProjectionRepositoryError>;

/**
* Look up a projected message by id.
*/
readonly getByMessageId: (
input: GetProjectionThreadMessageInput,
) => Effect.Effect<Option.Option<ProjectionThreadMessage>, ProjectionRepositoryError>;

/**
* Append a streaming text delta to an existing projected message row, or insert it.
*/
readonly appendTextDelta: (
input: AppendProjectionThreadMessageDeltaInput,
) => Effect.Effect<void, ProjectionRepositoryError>;

/**
* Delete projected thread messages by thread.
*/
Expand Down
Loading