Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions apps/server/src/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,41 @@ it.layer(NodeServices.layer)("server router seam", (it) => {
}).pipe(Effect.provide(NodeHttpServer.layerTest)),
);

it.effect("closes thread terminals after a successful archive command", () =>
Effect.gen(function* () {
const threadId = ThreadId.makeUnsafe("thread-archive");
const closeInputs: Array<Parameters<TerminalManagerShape["close"]>[0]> = [];

yield* buildAppUnderTest({
layers: {
terminalManager: {
close: (input) =>
Effect.sync(() => {
closeInputs.push(input);
}),
},
orchestrationEngine: {
dispatch: () => Effect.succeed({ sequence: 8 }),
},
},
});

const wsUrl = yield* getWsServerUrl("/ws");
const dispatchResult = yield* Effect.scoped(
withWsRpcClient(wsUrl, (client) =>
client[ORCHESTRATION_WS_METHODS.dispatchCommand]({
type: "thread.archive",
commandId: CommandId.makeUnsafe("cmd-thread-archive"),
threadId,
}),
),
);

assert.equal(dispatchResult.sequence, 8);
assert.deepEqual(closeInputs, [{ threadId }]);
}).pipe(Effect.provide(NodeHttpServer.layerTest)),
);

it.effect(
"routes websocket rpc subscribeOrchestrationDomainEvents with replay/live overlap resilience",
() =>
Expand Down
15 changes: 14 additions & 1 deletion apps/server/src/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,20 @@ const WsRpcLayer = WsRpcGroup.toLayer(
[ORCHESTRATION_WS_METHODS.dispatchCommand]: (command) =>
Effect.gen(function* () {
const normalizedCommand = yield* normalizeDispatchCommand(command);
return yield* startup.enqueueCommand(orchestrationEngine.dispatch(normalizedCommand));
const result = yield* startup.enqueueCommand(
orchestrationEngine.dispatch(normalizedCommand),
);
if (normalizedCommand.type === "thread.archive") {
yield* terminalManager.close({ threadId: normalizedCommand.threadId }).pipe(
Effect.catch((error) =>
Effect.logWarning("failed to close thread terminals after archive", {
threadId: normalizedCommand.threadId,
error: error.message,
}),
),
);
}
return result;
}).pipe(
Effect.mapError((cause) =>
Schema.is(OrchestrationDispatchCommandError)(cause)
Expand Down
36 changes: 31 additions & 5 deletions apps/web/src/lib/terminalStateCleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,50 @@ describe("collectActiveTerminalThreadIds", () => {
it("retains non-deleted server threads", () => {
const activeThreadIds = collectActiveTerminalThreadIds({
snapshotThreads: [
{ id: threadId("server-1"), deletedAt: null },
{ id: threadId("server-2"), deletedAt: null },
{ id: threadId("server-1"), deletedAt: null, archivedAt: null },
{ id: threadId("server-2"), deletedAt: null, archivedAt: null },
],
draftThreadIds: [],
});

expect(activeThreadIds).toEqual(new Set([threadId("server-1"), threadId("server-2")]));
});

it("ignores deleted server threads and keeps local draft threads", () => {
it("ignores deleted and archived server threads and keeps local draft threads", () => {
const activeThreadIds = collectActiveTerminalThreadIds({
snapshotThreads: [
{ id: threadId("server-active"), deletedAt: null },
{ id: threadId("server-deleted"), deletedAt: "2026-03-05T08:00:00.000Z" },
{ id: threadId("server-active"), deletedAt: null, archivedAt: null },
{
id: threadId("server-deleted"),
deletedAt: "2026-03-05T08:00:00.000Z",
archivedAt: null,
},
{
id: threadId("server-archived"),
deletedAt: null,
archivedAt: "2026-03-05T09:00:00.000Z",
},
],
draftThreadIds: [threadId("local-draft")],
});

expect(activeThreadIds).toEqual(new Set([threadId("server-active"), threadId("local-draft")]));
});

it("does not keep draft-linked terminal state for archived server threads", () => {
const archivedThreadId = threadId("server-archived");

const activeThreadIds = collectActiveTerminalThreadIds({
snapshotThreads: [
{
id: archivedThreadId,
deletedAt: null,
archivedAt: "2026-03-05T09:00:00.000Z",
},
],
draftThreadIds: [archivedThreadId, threadId("local-draft")],
});

expect(activeThreadIds).toEqual(new Set([threadId("local-draft")]));
});
});
10 changes: 10 additions & 0 deletions apps/web/src/lib/terminalStateCleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { ThreadId } from "@t3tools/contracts";
interface TerminalRetentionThread {
id: ThreadId;
deletedAt: string | null;
archivedAt: string | null;
}

interface CollectActiveTerminalThreadIdsInput {
Expand All @@ -14,11 +15,20 @@ export function collectActiveTerminalThreadIds(
input: CollectActiveTerminalThreadIdsInput,
): Set<ThreadId> {
const activeThreadIds = new Set<ThreadId>();
const snapshotThreadById = new Map(input.snapshotThreads.map((thread) => [thread.id, thread]));
for (const thread of input.snapshotThreads) {
if (thread.deletedAt !== null) continue;
if (thread.archivedAt !== null) continue;
activeThreadIds.add(thread.id);
}
for (const draftThreadId of input.draftThreadIds) {
const snapshotThread = snapshotThreadById.get(draftThreadId);
if (
snapshotThread &&
(snapshotThread.deletedAt !== null || snapshotThread.archivedAt !== null)
) {
continue;
}
activeThreadIds.add(draftThreadId);
}
return activeThreadIds;
Expand Down
28 changes: 27 additions & 1 deletion apps/web/src/orchestrationEventEffects.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ describe("deriveOrchestrationBatchEffects", () => {
it("targets draft promotion and terminal cleanup from thread lifecycle events", () => {
const createdThreadId = ThreadId.makeUnsafe("thread-created");
const deletedThreadId = ThreadId.makeUnsafe("thread-deleted");
const archivedThreadId = ThreadId.makeUnsafe("thread-archived");

const effects = deriveOrchestrationBatchEffects([
makeEvent("thread.created", {
Expand All @@ -60,11 +61,16 @@ describe("deriveOrchestrationBatchEffects", () => {
threadId: deletedThreadId,
deletedAt: "2026-02-27T00:00:01.000Z",
}),
makeEvent("thread.archived", {
threadId: archivedThreadId,
archivedAt: "2026-02-27T00:00:02.000Z",
updatedAt: "2026-02-27T00:00:02.000Z",
}),
]);

expect(effects.clearPromotedDraftThreadIds).toEqual([createdThreadId]);
expect(effects.clearDeletedThreadIds).toEqual([deletedThreadId]);
expect(effects.removeTerminalStateThreadIds).toEqual([deletedThreadId]);
expect(effects.removeTerminalStateThreadIds).toEqual([deletedThreadId, archivedThreadId]);
expect(effects.needsProviderInvalidation).toBe(false);
});

Expand Down Expand Up @@ -105,4 +111,24 @@ describe("deriveOrchestrationBatchEffects", () => {
expect(effects.removeTerminalStateThreadIds).toEqual([]);
expect(effects.needsProviderInvalidation).toBe(true);
});

it("does not retain archive cleanup when a thread is unarchived later in the same batch", () => {
const threadId = ThreadId.makeUnsafe("thread-1");

const effects = deriveOrchestrationBatchEffects([
makeEvent("thread.archived", {
threadId,
archivedAt: "2026-02-27T00:00:01.000Z",
updatedAt: "2026-02-27T00:00:01.000Z",
}),
makeEvent("thread.unarchived", {
threadId,
updatedAt: "2026-02-27T00:00:02.000Z",
}),
]);

expect(effects.clearPromotedDraftThreadIds).toEqual([]);
expect(effects.clearDeletedThreadIds).toEqual([]);
expect(effects.removeTerminalStateThreadIds).toEqual([]);
});
});
18 changes: 18 additions & 0 deletions apps/web/src/orchestrationEventEffects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ export function deriveOrchestrationBatchEffects(
break;
}

case "thread.archived": {
threadLifecycleEffects.set(event.payload.threadId, {
clearPromotedDraft: false,
clearDeletedThread: false,
removeTerminalState: true,
});
break;
}

case "thread.unarchived": {
threadLifecycleEffects.set(event.payload.threadId, {
clearPromotedDraft: false,
clearDeletedThread: false,
removeTerminalState: false,
});
break;
}

default: {
break;
}
Expand Down
10 changes: 9 additions & 1 deletion apps/web/src/routes/__root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@ function EventRouter() {
useComposerDraftStore.getState().draftThreadsByThreadId,
) as ThreadId[];
const activeThreadIds = collectActiveTerminalThreadIds({
snapshotThreads: threads.map((thread) => ({ id: thread.id, deletedAt: null })),
snapshotThreads: threads.map((thread) => ({
id: thread.id,
deletedAt: null,
archivedAt: thread.archivedAt,
})),
draftThreadIds,
});
removeOrphanedTerminalStates(activeThreadIds);
Expand Down Expand Up @@ -497,6 +501,10 @@ function EventRouter() {
}
});
const unsubTerminalEvent = api.terminal.onEvent((event) => {
const thread = useStore.getState().threads.find((entry) => entry.id === event.threadId);
if (thread && thread.archivedAt !== null) {
return;
}
useTerminalStateStore.getState().recordTerminalEvent(event);
const hasRunningSubprocess = terminalRunningSubprocessFromEvent(event);
if (hasRunningSubprocess === null) {
Expand Down
Loading