diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 7bf7585d11..4367ca40a7 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -1193,6 +1193,41 @@ it.layer(NodeServices.layer)("server router seam", (it) => { }).pipe(Effect.provide(NodeHttpServer.layerTest)), ); + it.effect("closes thread terminals after a successful archive command", () => + Effect.gen(function* () { + const threadId = ThreadId.makeUnsafe("thread-archive"); + const closeInputs: Array[0]> = []; + + yield* buildAppUnderTest({ + layers: { + terminalManager: { + close: (input) => + Effect.sync(() => { + closeInputs.push(input); + }), + }, + orchestrationEngine: { + dispatch: () => Effect.succeed({ sequence: 8 }), + }, + }, + }); + + const wsUrl = yield* getWsServerUrl("/ws"); + const dispatchResult = yield* Effect.scoped( + withWsRpcClient(wsUrl, (client) => + client[ORCHESTRATION_WS_METHODS.dispatchCommand]({ + type: "thread.archive", + commandId: CommandId.makeUnsafe("cmd-thread-archive"), + threadId, + }), + ), + ); + + assert.equal(dispatchResult.sequence, 8); + assert.deepEqual(closeInputs, [{ threadId }]); + }).pipe(Effect.provide(NodeHttpServer.layerTest)), + ); + it.effect( "routes websocket rpc subscribeOrchestrationDomainEvents with replay/live overlap resilience", () => diff --git a/apps/server/src/ws.ts b/apps/server/src/ws.ts index cff7e26efa..e467ea4454 100644 --- a/apps/server/src/ws.ts +++ b/apps/server/src/ws.ts @@ -85,7 +85,20 @@ const WsRpcLayer = WsRpcGroup.toLayer( [ORCHESTRATION_WS_METHODS.dispatchCommand]: (command) => Effect.gen(function* () { const normalizedCommand = yield* normalizeDispatchCommand(command); - return yield* startup.enqueueCommand(orchestrationEngine.dispatch(normalizedCommand)); + const result = yield* startup.enqueueCommand( + orchestrationEngine.dispatch(normalizedCommand), + ); + if (normalizedCommand.type === "thread.archive") { + yield* terminalManager.close({ threadId: normalizedCommand.threadId }).pipe( + Effect.catch((error) => + Effect.logWarning("failed to close thread terminals after archive", { + threadId: normalizedCommand.threadId, + error: error.message, + }), + ), + ); + } + return result; }).pipe( Effect.mapError((cause) => Schema.is(OrchestrationDispatchCommandError)(cause) diff --git a/apps/web/src/lib/terminalStateCleanup.test.ts b/apps/web/src/lib/terminalStateCleanup.test.ts index 8bc3c37300..faf2c477cb 100644 --- a/apps/web/src/lib/terminalStateCleanup.test.ts +++ b/apps/web/src/lib/terminalStateCleanup.test.ts @@ -9,8 +9,8 @@ describe("collectActiveTerminalThreadIds", () => { it("retains non-deleted server threads", () => { const activeThreadIds = collectActiveTerminalThreadIds({ snapshotThreads: [ - { id: threadId("server-1"), deletedAt: null }, - { id: threadId("server-2"), deletedAt: null }, + { id: threadId("server-1"), deletedAt: null, archivedAt: null }, + { id: threadId("server-2"), deletedAt: null, archivedAt: null }, ], draftThreadIds: [], }); @@ -18,15 +18,41 @@ describe("collectActiveTerminalThreadIds", () => { expect(activeThreadIds).toEqual(new Set([threadId("server-1"), threadId("server-2")])); }); - it("ignores deleted server threads and keeps local draft threads", () => { + it("ignores deleted and archived server threads and keeps local draft threads", () => { const activeThreadIds = collectActiveTerminalThreadIds({ snapshotThreads: [ - { id: threadId("server-active"), deletedAt: null }, - { id: threadId("server-deleted"), deletedAt: "2026-03-05T08:00:00.000Z" }, + { id: threadId("server-active"), deletedAt: null, archivedAt: null }, + { + id: threadId("server-deleted"), + deletedAt: "2026-03-05T08:00:00.000Z", + archivedAt: null, + }, + { + id: threadId("server-archived"), + deletedAt: null, + archivedAt: "2026-03-05T09:00:00.000Z", + }, ], draftThreadIds: [threadId("local-draft")], }); expect(activeThreadIds).toEqual(new Set([threadId("server-active"), threadId("local-draft")])); }); + + it("does not keep draft-linked terminal state for archived server threads", () => { + const archivedThreadId = threadId("server-archived"); + + const activeThreadIds = collectActiveTerminalThreadIds({ + snapshotThreads: [ + { + id: archivedThreadId, + deletedAt: null, + archivedAt: "2026-03-05T09:00:00.000Z", + }, + ], + draftThreadIds: [archivedThreadId, threadId("local-draft")], + }); + + expect(activeThreadIds).toEqual(new Set([threadId("local-draft")])); + }); }); diff --git a/apps/web/src/lib/terminalStateCleanup.ts b/apps/web/src/lib/terminalStateCleanup.ts index 5f2bfdafaa..f11b30af92 100644 --- a/apps/web/src/lib/terminalStateCleanup.ts +++ b/apps/web/src/lib/terminalStateCleanup.ts @@ -3,6 +3,7 @@ import type { ThreadId } from "@t3tools/contracts"; interface TerminalRetentionThread { id: ThreadId; deletedAt: string | null; + archivedAt: string | null; } interface CollectActiveTerminalThreadIdsInput { @@ -14,11 +15,20 @@ export function collectActiveTerminalThreadIds( input: CollectActiveTerminalThreadIdsInput, ): Set { const activeThreadIds = new Set(); + const snapshotThreadById = new Map(input.snapshotThreads.map((thread) => [thread.id, thread])); for (const thread of input.snapshotThreads) { if (thread.deletedAt !== null) continue; + if (thread.archivedAt !== null) continue; activeThreadIds.add(thread.id); } for (const draftThreadId of input.draftThreadIds) { + const snapshotThread = snapshotThreadById.get(draftThreadId); + if ( + snapshotThread && + (snapshotThread.deletedAt !== null || snapshotThread.archivedAt !== null) + ) { + continue; + } activeThreadIds.add(draftThreadId); } return activeThreadIds; diff --git a/apps/web/src/orchestrationEventEffects.test.ts b/apps/web/src/orchestrationEventEffects.test.ts index 263610bb95..9829ba9455 100644 --- a/apps/web/src/orchestrationEventEffects.test.ts +++ b/apps/web/src/orchestrationEventEffects.test.ts @@ -42,6 +42,7 @@ describe("deriveOrchestrationBatchEffects", () => { it("targets draft promotion and terminal cleanup from thread lifecycle events", () => { const createdThreadId = ThreadId.makeUnsafe("thread-created"); const deletedThreadId = ThreadId.makeUnsafe("thread-deleted"); + const archivedThreadId = ThreadId.makeUnsafe("thread-archived"); const effects = deriveOrchestrationBatchEffects([ makeEvent("thread.created", { @@ -60,11 +61,16 @@ describe("deriveOrchestrationBatchEffects", () => { threadId: deletedThreadId, deletedAt: "2026-02-27T00:00:01.000Z", }), + makeEvent("thread.archived", { + threadId: archivedThreadId, + archivedAt: "2026-02-27T00:00:02.000Z", + updatedAt: "2026-02-27T00:00:02.000Z", + }), ]); expect(effects.clearPromotedDraftThreadIds).toEqual([createdThreadId]); expect(effects.clearDeletedThreadIds).toEqual([deletedThreadId]); - expect(effects.removeTerminalStateThreadIds).toEqual([deletedThreadId]); + expect(effects.removeTerminalStateThreadIds).toEqual([deletedThreadId, archivedThreadId]); expect(effects.needsProviderInvalidation).toBe(false); }); @@ -105,4 +111,24 @@ describe("deriveOrchestrationBatchEffects", () => { expect(effects.removeTerminalStateThreadIds).toEqual([]); expect(effects.needsProviderInvalidation).toBe(true); }); + + it("does not retain archive cleanup when a thread is unarchived later in the same batch", () => { + const threadId = ThreadId.makeUnsafe("thread-1"); + + const effects = deriveOrchestrationBatchEffects([ + makeEvent("thread.archived", { + threadId, + archivedAt: "2026-02-27T00:00:01.000Z", + updatedAt: "2026-02-27T00:00:01.000Z", + }), + makeEvent("thread.unarchived", { + threadId, + updatedAt: "2026-02-27T00:00:02.000Z", + }), + ]); + + expect(effects.clearPromotedDraftThreadIds).toEqual([]); + expect(effects.clearDeletedThreadIds).toEqual([]); + expect(effects.removeTerminalStateThreadIds).toEqual([]); + }); }); diff --git a/apps/web/src/orchestrationEventEffects.ts b/apps/web/src/orchestrationEventEffects.ts index d4dda76d9e..b19afa331f 100644 --- a/apps/web/src/orchestrationEventEffects.ts +++ b/apps/web/src/orchestrationEventEffects.ts @@ -46,6 +46,24 @@ export function deriveOrchestrationBatchEffects( break; } + case "thread.archived": { + threadLifecycleEffects.set(event.payload.threadId, { + clearPromotedDraft: false, + clearDeletedThread: false, + removeTerminalState: true, + }); + break; + } + + case "thread.unarchived": { + threadLifecycleEffects.set(event.payload.threadId, { + clearPromotedDraft: false, + clearDeletedThread: false, + removeTerminalState: false, + }); + break; + } + default: { break; } diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index bf5eb4410c..3377e4bb44 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -328,7 +328,11 @@ function EventRouter() { useComposerDraftStore.getState().draftThreadsByThreadId, ) as ThreadId[]; const activeThreadIds = collectActiveTerminalThreadIds({ - snapshotThreads: threads.map((thread) => ({ id: thread.id, deletedAt: null })), + snapshotThreads: threads.map((thread) => ({ + id: thread.id, + deletedAt: null, + archivedAt: thread.archivedAt, + })), draftThreadIds, }); removeOrphanedTerminalStates(activeThreadIds); @@ -497,6 +501,10 @@ function EventRouter() { } }); const unsubTerminalEvent = api.terminal.onEvent((event) => { + const thread = useStore.getState().threads.find((entry) => entry.id === event.threadId); + if (thread && thread.archivedAt !== null) { + return; + } useTerminalStateStore.getState().recordTerminalEvent(event); const hasRunningSubprocess = terminalRunningSubprocessFromEvent(event); if (hasRunningSubprocess === null) {