From f6c860e8544a3bc72a17bbcd568fa1bdc8c67bfd Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 18:26:16 +0200 Subject: [PATCH 1/4] fix(stream): remove completed/errored streams from registry after buffer TTL Every SSE stream that finished (completed or errored) while its original client was still connected stayed in the StreamRegistry forever: the success path called _cleanupStream synchronously before the client's close event fired (so clients.size was still > 0 and nothing was scheduled), the close handler installed by _createNewStream had no cleanup-scheduling branch, and the error path never scheduled cleanup at all. Since every analytics query is one SSE stream whose final event is the full result, a busy server retained up to maxActiveStreams (1000) event buffers, generators, abort controllers, and trace contexts in heap. Factor the TTL-based removal scheduling into _scheduleRemovalAfterTTL and call it from every termination path: both close handlers, the success path, the abort path, and the error path. The event buffer remains available for reconnect replay for bufferTTL after the last client disconnects; a reconnect during the TTL window refreshes lastAccess so the pending timer no-ops and a fresh TTL starts when that client disconnects. Cleanup timers are unref()'d so they don't keep the process alive. Also drop the never-referenced cleanupInterval and maxPersistentBuffers keys from streamDefaults; the per-stream timers now cover every termination path, so no periodic sweeper is wired. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- packages/appkit/src/stream/defaults.ts | 2 - packages/appkit/src/stream/stream-manager.ts | 58 ++++-- .../appkit/src/stream/tests/stream.test.ts | 179 ++++++++++++++++++ 3 files changed, 219 insertions(+), 20 deletions(-) diff --git a/packages/appkit/src/stream/defaults.ts b/packages/appkit/src/stream/defaults.ts index 304a63112..06b27078d 100644 --- a/packages/appkit/src/stream/defaults.ts +++ b/packages/appkit/src/stream/defaults.ts @@ -2,8 +2,6 @@ export const streamDefaults = { bufferSize: 100, maxEventSize: 1024 * 1024, // 1MB bufferTTL: 10 * 60 * 1000, // 10 minutes - cleanupInterval: 5 * 60 * 1000, // 5 minutes - maxPersistentBuffers: 10000, // 10000 buffers heartbeatInterval: 10 * 1000, // 10 seconds maxActiveStreams: 1000, // 1000 streams disconnectGraceMs: 15_000, // 15 seconds diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index 14741d64e..a1402e94f 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -153,13 +153,7 @@ export class StreamManager { } // cleanup if stream is completed and no clients are connected - if (streamEntry.isCompleted && streamEntry.clients.size === 0) { - setTimeout(() => { - if (streamEntry.clients.size === 0) { - this.streamRegistry.remove(streamEntry.streamId); - } - }, this.bufferTTL); - } + this._scheduleRemovalAfterTTL(streamEntry); }); // if stream is completed, close connection @@ -237,6 +231,11 @@ export class StreamManager { if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { this._scheduleGraceAbort(streamEntry); } + + // if the stream already finished (completed or errored), schedule + // registry removal once the buffer TTL elapses so completed streams + // don't accumulate in the registry forever + this._scheduleRemovalAfterTTL(streamEntry); }); await this._processGeneratorInBackground(streamEntry); @@ -293,8 +292,9 @@ export class StreamManager { // close all clients this._closeAllClients(streamEntry); - // cleanup if no clients are connected - this._cleanupStream(streamEntry); + // cleanup if no clients are connected (clients that are still + // connected schedule cleanup from their close handlers instead) + this._scheduleRemovalAfterTTL(streamEntry); } catch (error) { const errorMsg = error instanceof Error ? error.message : "Internal server error"; @@ -307,7 +307,7 @@ export class StreamManager { streamEntry.isCompleted = true; this._clearGraceTimer(streamEntry); this._closeAllClients(streamEntry); - this._cleanupStream(streamEntry); + this._scheduleRemovalAfterTTL(streamEntry); return; } @@ -335,6 +335,10 @@ export class StreamManager { ); streamEntry.isCompleted = true; this._clearGraceTimer(streamEntry); + + // cleanup if no clients are connected (clients that are still + // connected schedule cleanup from their close handlers instead) + this._scheduleRemovalAfterTTL(streamEntry); } }); } @@ -434,15 +438,33 @@ export class StreamManager { } } - // cleanup stream if no clients are connected - private _cleanupStream(streamEntry: StreamEntry): void { - if (streamEntry.clients.size === 0) { - setTimeout(() => { - if (streamEntry.clients.size === 0) { - this.streamRegistry.remove(streamEntry.streamId); - } - }, this.bufferTTL); + // schedule registry removal once a finished (completed or errored) stream + // has no connected clients. The event buffer stays available for + // reconnect replay for `bufferTTL` after the last client disconnects; + // after that the stream entry is removed from the registry so it can be + // garbage collected. + private _scheduleRemovalAfterTTL(streamEntry: StreamEntry): void { + if (!streamEntry.isCompleted || streamEntry.clients.size > 0) { + return; } + + // mark the moment the stream became idle so a reconnect during the TTL + // window (which refreshes lastAccess) makes the pending timer a no-op + streamEntry.lastAccess = Date.now(); + + const timer = setTimeout(() => { + // no-op if a client reconnected during the TTL window; the reconnect's + // own close handler schedules a fresh removal when it disconnects + if ( + streamEntry.clients.size === 0 && + Date.now() - streamEntry.lastAccess >= this.bufferTTL + ) { + this.streamRegistry.remove(streamEntry.streamId); + } + }, this.bufferTTL); + + // don't keep the process alive just to clean up finished streams + timer.unref?.(); } private _categorizeError(error: unknown): SSEErrorCode { diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index 1de49b5fd..32b8fb0eb 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ b/packages/appkit/src/stream/tests/stream.test.ts @@ -908,6 +908,185 @@ describe("StreamManager", () => { }); }); + describe("registry cleanup after stream termination", () => { + // default bufferTTL from streamDefaults + const BUFFER_TTL = 10 * 60 * 1000; + + function getRegistry(manager: StreamManager) { + return (manager as any).streamRegistry as { + has(streamId: string): boolean; + }; + } + + function captureCloseHandler(mockRes: { on: ReturnType }) { + const handlers: (() => void)[] = []; + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") handlers.push(handler); + }); + return () => { + for (const handler of handlers) { + handler(); + } + }; + } + + test("removes a completed stream from the registry after buffer TTL once the client disconnects", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-completed-123"; + const { mockRes } = createMockResponse(); + const fireClose = captureCloseHandler(mockRes); + + async function* generator() { + yield { type: "message", data: "result" }; + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + // server ended the response; the close event fires asynchronously + fireClose(); + + // buffer is retained for reconnect replay during the TTL window + await vi.advanceTimersByTimeAsync(BUFFER_TTL - 1); + expect(registry.has(streamId)).toBe(true); + + // once the TTL elapses, the stream is removed from the registry + await vi.advanceTimersByTimeAsync(1); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("removes an errored stream from the registry after buffer TTL once the client disconnects", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-errored-123"; + const { mockRes } = createMockResponse(); + const fireClose = captureCloseHandler(mockRes); + + async function* generator() { + yield { type: "start" }; + throw new Error("Boom"); + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + fireClose(); + + await vi.advanceTimersByTimeAsync(BUFFER_TTL - 1); + expect(registry.has(streamId)).toBe(true); + + await vi.advanceTimersByTimeAsync(1); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("removes an errored stream even when the client disconnected before the error", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-errored-disconnected-123"; + const { mockRes } = createMockResponse(); + const fireClose = captureCloseHandler(mockRes); + + async function* generator() { + yield { type: "start" }; + // simulate client going away mid-stream + fireClose(); + throw new Error("Boom"); + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + await vi.advanceTimersByTimeAsync(BUFFER_TTL); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("keeps the buffer available for reconnect replay within the TTL window", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-replay-123"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + const fireClose1 = captureCloseHandler(mockRes1); + + async function* generator1() { + yield { type: "message", data: "event1" }; + yield { type: "message", data: "event2" }; + } + + await streamManager.stream(mockRes1 as any, generator1, { streamId }); + fireClose1(); + + // reconnect halfway through the TTL window + await vi.advanceTimersByTimeAsync(BUFFER_TTL / 2); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": eventIds[0], + }); + const fireClose2 = captureCloseHandler(mockRes2); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream(mockRes2 as any, generator2, { streamId }); + + // missed events are replayed from the buffer + const replayedData = events2 + .filter((e) => e.startsWith("data: ")) + .map((e) => e.replace("data: ", "").replace("\n\n", "")); + expect(replayedData.length).toBe(1); + expect(replayedData[0]).toContain("event2"); + + fireClose2(); + + // the timer from the first disconnect fires now but must no-op since + // the reconnect refreshed the stream's last access + await vi.advanceTimersByTimeAsync(BUFFER_TTL / 2); + expect(registry.has(streamId)).toBe(true); + + // a full TTL after the second disconnect, the stream is removed + await vi.advanceTimersByTimeAsync(BUFFER_TTL / 2); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("does not remove a completed stream while a client is still connected", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-still-connected-123"; + const { mockRes } = createMockResponse(); + + async function* generator() { + yield { type: "message", data: "result" }; + } + + // mockRes.on never fires "close", so the client stays connected + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + await vi.advanceTimersByTimeAsync(BUFFER_TTL * 2); + expect(registry.has(streamId)).toBe(true); + + vi.useRealTimers(); + }); + }); + describe("error categorization", () => { async function captureErrorEvent( manager: StreamManager, From 069357b47461042289a60ee49991b50939758996 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 19:16:07 +0200 Subject: [PATCH 2/4] fix(appkit): decouple stream cleanup from close events and prefer evicting completed streams Follow-up to the stream registry leak fix, addressing review findings: - _closeAllClients now removes the clients it deliberately ends from the stream entry, so registry removal is scheduled on every terminal path (complete/error/abort) instead of hinging on the transport emitting 'close'. The close handlers' later deletes remain safe no-ops. - At most one removal timer per stream: the timer handle is stored on the StreamEntry, rescheduling clears the prior timer, and a reconnect-attach cancels any pending removal. The fire-time lastAccess re-check stays as a safety net. - Registry eviction now prefers the oldest completed stream (waiting out its buffer TTL) and only falls back to LRU when every stream is live, so a full registry no longer evicts live streams while dead ones survive. StreamRegistry is backed by a Map instead of RingBuffer: ring slot overwrites clobbered an unrelated live entry whenever the evicted stream wasn't in the oldest insertion slot. - Remove unused StreamConfig keys cleanupInterval and maxPersistentBuffers (never read; their only definition site was already removed). - Tests: mock response end() now marks the response ended and fires close handlers asynchronously like Node; replaced the test that encoded the leaky behavior with active-stream-retained and completed-stream-removed- without-close coverage; added end()-fires-close cleanup, eviction preference, and replay boundary (newest id, evicted id) tests. - knip: set ignoreExportsUsedInFile so RingBuffer (still used inside buffers.ts by EventRingBuffer) isn't flagged once the registry stops importing it. Co-authored-by: Isaac Signed-off-by: MarioCadenas --- knip.json | 1 + packages/appkit/src/stream/stream-manager.ts | 48 +++-- packages/appkit/src/stream/stream-registry.ts | 69 ++++--- .../src/stream/tests/stream-registry.test.ts | 84 +++++++++ .../appkit/src/stream/tests/stream.test.ts | 172 +++++++++++++++++- packages/appkit/src/stream/types.ts | 5 + packages/shared/src/execute.ts | 4 +- 7 files changed, 339 insertions(+), 44 deletions(-) diff --git a/knip.json b/knip.json index 251cc61eb..1e48feaf3 100644 --- a/knip.json +++ b/knip.json @@ -1,5 +1,6 @@ { "$schema": "https://unpkg.com/knip@5/schema.json", + "ignoreExportsUsedInFile": true, "ignoreWorkspaces": [ "packages/shared", "packages/lakebase", diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index a1402e94f..c98d41a96 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -122,6 +122,13 @@ export class StreamManager { // a reconnecting client cancels the pending disconnect-grace abort this._clearGraceTimer(streamEntry); + // a reconnect cancels any pending registry removal so the entry isn't + // pulled out from under the newly attached client + if (streamEntry.removalTimer) { + clearTimeout(streamEntry.removalTimer); + streamEntry.removalTimer = undefined; + } + // add client to stream entry streamEntry.clients.add(res); streamEntry.lastAccess = Date.now(); @@ -162,6 +169,11 @@ export class StreamManager { // cleanup operation this.activeOperations.delete(streamOperation); clearInterval(heartbeat); + // we deliberately ended this client, so drop it from the entry and + // schedule removal now instead of relying on the transport's `close` + // event (the close handler's later delete is a safe no-op) + streamEntry.clients.delete(res); + this._scheduleRemovalAfterTTL(streamEntry); } } private async _createNewStream( @@ -289,11 +301,10 @@ export class StreamManager { // no late grace abort should fire on a completed stream this._clearGraceTimer(streamEntry); - // close all clients + // close all clients (this also drops them from the entry, so the + // removal below is scheduled regardless of whether the transport + // ever emits `close`) this._closeAllClients(streamEntry); - - // cleanup if no clients are connected (clients that are still - // connected schedule cleanup from their close handlers instead) this._scheduleRemovalAfterTTL(streamEntry); } catch (error) { const errorMsg = @@ -336,8 +347,10 @@ export class StreamManager { streamEntry.isCompleted = true; this._clearGraceTimer(streamEntry); - // cleanup if no clients are connected (clients that are still - // connected schedule cleanup from their close handlers instead) + // the broadcast above already ended the connected clients; drop them + // from the entry so removal is scheduled regardless of whether the + // transport ever emits `close` + this._closeAllClients(streamEntry); this._scheduleRemovalAfterTTL(streamEntry); } }); @@ -402,13 +415,18 @@ export class StreamManager { } } - // close all connected clients + // close all connected clients and remove them from the stream entry. + // We are deliberately terminating these connections, so cleanup must not + // depend on the transport emitting a `close` event for each client (it may + // never fire). The close handlers' later `clients.delete(...)` calls remain + // safe no-ops. private _closeAllClients(streamEntry: StreamEntry): void { for (const client of streamEntry.clients) { if (!client.writableEnded) { client.end(); } } + streamEntry.clients.clear(); } // abort the generator after the grace window unless a client reconnects first @@ -448,13 +466,21 @@ export class StreamManager { return; } + // at most one removal timer per stream: rescheduling replaces any + // pending timer instead of stacking a new one (each pending timer pins + // the entry's buffer/generator/trace context for the full TTL) + if (streamEntry.removalTimer) { + clearTimeout(streamEntry.removalTimer); + } + // mark the moment the stream became idle so a reconnect during the TTL // window (which refreshes lastAccess) makes the pending timer a no-op streamEntry.lastAccess = Date.now(); - const timer = setTimeout(() => { - // no-op if a client reconnected during the TTL window; the reconnect's - // own close handler schedules a fresh removal when it disconnects + streamEntry.removalTimer = setTimeout(() => { + streamEntry.removalTimer = undefined; + // safety net: no-op if a client reconnected during the TTL window; + // a fresh removal is scheduled when that client disconnects if ( streamEntry.clients.size === 0 && Date.now() - streamEntry.lastAccess >= this.bufferTTL @@ -464,7 +490,7 @@ export class StreamManager { }, this.bufferTTL); // don't keep the process alive just to clean up finished streams - timer.unref?.(); + streamEntry.removalTimer.unref?.(); } private _categorizeError(error: unknown): SSEErrorCode { diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index 129f53e5f..aeb4d8909 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -1,29 +1,31 @@ -import { RingBuffer } from "./buffers"; import { SSEErrorCode, type StreamEntry } from "./types"; export class StreamRegistry { - private streams: RingBuffer; + // keyed storage with explicit, policy-driven eviction. A ring buffer is + // unsuitable here: it overwrites by insertion slot, so evicting an entry + // chosen by policy (e.g. a completed stream) and then adding would + // silently clobber an unrelated live stream sitting in the oldest slot. + private streams: Map; + private maxActiveStreams: number; constructor(maxActiveStreams: number) { - this.streams = new RingBuffer( - maxActiveStreams, - (entry) => entry.streamId, - ); + this.streams = new Map(); + this.maxActiveStreams = maxActiveStreams; } // add a stream to the registry add(entry: StreamEntry): void { // enforce hard cap - if (this.streams.getSize() >= this.streams.capacity) { + if (this.streams.size >= this.maxActiveStreams) { this._evictOldestStream(entry.streamId); } - this.streams.add(entry); + this.streams.set(entry.streamId, entry); } // get a stream from the registry get(streamId: string): StreamEntry | null { - return this.streams.get(streamId); + return this.streams.get(streamId) ?? null; } // check if a stream exists in the registry @@ -33,20 +35,22 @@ export class StreamRegistry { // remove a stream from the registry remove(streamId: string): void { - this.streams.remove(streamId); + this.streams.delete(streamId); } // get the number of streams in the registry size(): number { - return this.streams.getSize(); + return this.streams.size; } clear(): void { - const allStreams = this.streams.getAll(); - - for (const stream of allStreams) { - this._clearGraceTimer(stream); + for (const stream of this.streams.values()) { stream.abortController.abort("Server shutdown"); + this._clearGraceTimer(stream); + if (stream.removalTimer) { + clearTimeout(stream.removalTimer); + stream.removalTimer = undefined; + } } this.streams.clear(); @@ -60,24 +64,36 @@ export class StreamRegistry { } } - // evict the oldest stream from the registry + // evict the oldest stream from the registry, preferring completed streams. + // Completed streams waiting out their buffer TTL can look recently + // accessed, so plain LRU could evict a live stream while dead ones + // survive. Prefer the oldest completed stream when one exists and fall + // back to LRU over all streams otherwise. private _evictOldestStream(excludeStreamId: string): void { - const allStreams = this.streams.getAll(); + const allStreams = this.streams.values(); let oldestStream: StreamEntry | null = null; let oldestAccess = Infinity; + let oldestCompletedStream: StreamEntry | null = null; + let oldestCompletedAccess = Infinity; - // find the least recently accessed stream + // find the least recently accessed stream (overall and completed-only) for (const stream of allStreams) { - if ( - stream.streamId !== excludeStreamId && - stream.lastAccess < oldestAccess - ) { + if (stream.streamId === excludeStreamId) continue; + + if (stream.lastAccess < oldestAccess) { oldestStream = stream; oldestAccess = stream.lastAccess; } + + if (stream.isCompleted && stream.lastAccess < oldestCompletedAccess) { + oldestCompletedStream = stream; + oldestCompletedAccess = stream.lastAccess; + } } - // abort the oldest stream + oldestStream = oldestCompletedStream ?? oldestStream; + + // abort the evicted stream if (oldestStream) { // broadcast stream eviction error to all clients for (const client of oldestStream.clients) { @@ -94,7 +110,12 @@ export class StreamRegistry { } this._clearGraceTimer(oldestStream); oldestStream.abortController.abort("Stream evicted"); - this.streams.remove(oldestStream.streamId); + // a pending removal timer would otherwise pin the evicted entry + if (oldestStream.removalTimer) { + clearTimeout(oldestStream.removalTimer); + oldestStream.removalTimer = undefined; + } + this.streams.delete(oldestStream.streamId); } } } diff --git a/packages/appkit/src/stream/tests/stream-registry.test.ts b/packages/appkit/src/stream/tests/stream-registry.test.ts index d3f70e95a..343039394 100644 --- a/packages/appkit/src/stream/tests/stream-registry.test.ts +++ b/packages/appkit/src/stream/tests/stream-registry.test.ts @@ -262,6 +262,90 @@ describe("StreamRegistry", () => { expect(abortController1.signal.reason).toBe("Stream evicted"); }); + + test("should prefer evicting a completed stream over an older live stream", () => { + const liveAbortController = new AbortController(); + + // the live stream is the LRU candidate, but a completed stream + // (waiting out its buffer TTL) exists and must be evicted first + registry.add( + createMockStreamEntry("live-old", { + lastAccess: 100, + abortController: liveAbortController, + }), + ); + registry.add( + createMockStreamEntry("completed-recent", { + lastAccess: 300, + isCompleted: true, + }), + ); + registry.add(createMockStreamEntry("live-recent", { lastAccess: 200 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("completed-recent")).toBe(false); + expect(registry.has("live-old")).toBe(true); + expect(registry.has("live-recent")).toBe(true); + expect(registry.has("stream-4")).toBe(true); + expect(liveAbortController.signal.aborted).toBe(false); + }); + + test("should evict the oldest completed stream when several are completed", () => { + registry.add( + createMockStreamEntry("completed-old", { + lastAccess: 200, + isCompleted: true, + }), + ); + registry.add( + createMockStreamEntry("completed-recent", { + lastAccess: 300, + isCompleted: true, + }), + ); + registry.add(createMockStreamEntry("live", { lastAccess: 100 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("completed-old")).toBe(false); + expect(registry.has("completed-recent")).toBe(true); + expect(registry.has("live")).toBe(true); + }); + + test("should fall back to LRU eviction when no stream is completed", () => { + registry.add(createMockStreamEntry("stream-1", { lastAccess: 300 })); + registry.add(createMockStreamEntry("stream-2", { lastAccess: 100 })); + registry.add(createMockStreamEntry("stream-3", { lastAccess: 200 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("stream-2")).toBe(false); + expect(registry.has("stream-1")).toBe(true); + expect(registry.has("stream-3")).toBe(true); + expect(registry.has("stream-4")).toBe(true); + }); + + test("should clear a pending removal timer on the evicted completed stream", () => { + vi.useFakeTimers(); + const removalTimer = setTimeout(() => {}, 60_000); + + registry.add( + createMockStreamEntry("completed", { + lastAccess: 100, + isCompleted: true, + removalTimer, + }), + ); + registry.add(createMockStreamEntry("stream-2", { lastAccess: 200 })); + registry.add(createMockStreamEntry("stream-3", { lastAccess: 300 })); + + registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); + + expect(registry.has("completed")).toBe(false); + expect(vi.getTimerCount()).toBe(0); + vi.useRealTimers(); + }); }); describe("eviction SSE broadcast", () => { diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index 32b8fb0eb..35d4ec8f8 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ b/packages/appkit/src/stream/tests/stream.test.ts @@ -3,14 +3,27 @@ import { StreamManager } from "../index"; function createMockResponse(headers: Record = {}) { const events: string[] = []; + const closeHandlers: (() => void)[] = []; const mockRes = { setHeader: vi.fn(), flushHeaders: vi.fn(), write: vi.fn((data: string) => { events.push(data); }), - end: vi.fn(), - on: vi.fn(), + // mirror Node's lifecycle: end() marks the response as ended and the + // 'close' event fires asynchronously afterwards + end: vi.fn(() => { + if (mockRes.writableEnded) return; + mockRes.writableEnded = true; + queueMicrotask(() => { + for (const handler of closeHandlers) { + handler(); + } + }); + }), + on: vi.fn((event: string, handler: () => void) => { + if (event === "close") closeHandlers.push(handler); + }), writableEnded: false, req: { headers: headers, @@ -866,6 +879,81 @@ describe("StreamManager", () => { expect(events2.some((e) => e.includes("STREAM_FORBIDDEN"))).toBe(true); }); + test("replays nothing when last-event-id is the newest buffered event", async () => { + const streamId = "replay-newest-123"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + + async function* generator1() { + yield { type: "message", data: "event1" }; + yield { type: "message", data: "event2" }; + yield { type: "message", data: "event3" }; + } + + await streamManager.stream(mockRes1 as any, generator1, { streamId }); + + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + const newestEventId = eventIds[eventIds.length - 1]; + + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": newestEventId, + }); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream(mockRes2 as any, generator2, { streamId }); + + // the client is already caught up: zero replayed events and no + // buffer-overflow warning + expect(events2.filter((e) => e.startsWith("data: ")).length).toBe(0); + expect(events2.some((e) => e.includes("BUFFER_OVERFLOW_RESTART"))).toBe( + false, + ); + expect(mockRes2.end).toHaveBeenCalled(); + }); + + test("sends a buffer overflow warning when last-event-id was evicted from the ring", async () => { + const streamId = "replay-evicted-456"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + + async function* generator1() { + for (let i = 0; i < 5; i++) { + yield { type: "message", count: i }; + } + } + + await streamManager.stream(mockRes1 as any, generator1, { + streamId, + bufferSize: 2, + }); + + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + expect(eventIds.length).toBe(5); + + // the first event id has been pushed out of the size-2 ring buffer + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": eventIds[0], + }); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream(mockRes2 as any, generator2, { streamId }); + + expect(events2.some((e) => e.includes("BUFFER_OVERFLOW_RESTART"))).toBe( + true, + ); + expect(events2.some((e) => e.includes("should-not-run"))).toBe(false); + }); + test("should replay successfully when within buffer capacity", async () => { const streamId = "no-overflow-test-456"; @@ -1067,22 +1155,94 @@ describe("StreamManager", () => { vi.useRealTimers(); }); - test("does not remove a completed stream while a client is still connected", async () => { + test("does not remove an active stream while a client is still connected", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-active-connected-123"; + const { mockRes } = createMockResponse(); + + let release!: () => void; + const gate = new Promise((resolve) => { + release = resolve; + }); + + async function* generator() { + yield { type: "start" }; + await gate; + yield { type: "end" }; + } + + const streamPromise = streamManager.stream(mockRes as any, generator, { + streamId, + }); + + // let the generator yield its first event and park on the gate + await vi.advanceTimersByTimeAsync(0); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + // an active stream with a connected client is never removed, + // no matter how much time passes + await vi.advanceTimersByTimeAsync(BUFFER_TTL * 2); + expect(registry.has(streamId)).toBe(true); + + release(); + await streamPromise; + vi.useRealTimers(); + }); + + test("removes a completed stream after TTL even if the transport never emits close", async () => { + vi.useFakeTimers(); + const streamId = "cleanup-no-close-123"; + const { mockRes } = createMockResponse(); + + // simulate a transport that never emits the 'close' event + mockRes.on.mockImplementation(() => {}); + mockRes.end.mockImplementation(() => { + mockRes.writableEnded = true; + }); + + async function* generator() { + yield { type: "message", data: "result" }; + } + + await streamManager.stream(mockRes as any, generator, { streamId }); + + const registry = getRegistry(streamManager); + expect(registry.has(streamId)).toBe(true); + + // cleanup must not hinge on the transport emitting 'close' + await vi.advanceTimersByTimeAsync(BUFFER_TTL); + expect(registry.has(streamId)).toBe(false); + + vi.useRealTimers(); + }); + + test("removes the entry after buffer TTL when end() fires the close event", async () => { vi.useFakeTimers(); - const streamId = "cleanup-still-connected-123"; + const streamId = "cleanup-end-close-123"; const { mockRes } = createMockResponse(); async function* generator() { yield { type: "message", data: "result" }; } - // mockRes.on never fires "close", so the client stays connected await streamManager.stream(mockRes as any, generator, { streamId }); + // the generator completed with the client attached, so the server + // ended the response; the mock fires 'close' asynchronously + expect(mockRes.end).toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(0); + const registry = getRegistry(streamManager); - await vi.advanceTimersByTimeAsync(BUFFER_TTL * 2); expect(registry.has(streamId)).toBe(true); + await vi.advanceTimersByTimeAsync(BUFFER_TTL - 1); + expect(registry.has(streamId)).toBe(true); + + await vi.advanceTimersByTimeAsync(1); + expect(registry.has(streamId)).toBe(false); + vi.useRealTimers(); }); }); diff --git a/packages/appkit/src/stream/types.ts b/packages/appkit/src/stream/types.ts index e9a6bbc0f..b7f6b9abb 100644 --- a/packages/appkit/src/stream/types.ts +++ b/packages/appkit/src/stream/types.ts @@ -51,6 +51,11 @@ export interface StreamEntry { traceContext: Context; // pending grace-window abort, set while the last client is disconnected disconnectGraceTimer?: NodeJS.Timeout; + /** + * Pending registry-removal timer. At most one removal timer exists per + * stream; scheduling a new one clears any previous timer first. + */ + removalTimer?: NodeJS.Timeout; } export interface StreamOperation { diff --git a/packages/shared/src/execute.ts b/packages/shared/src/execute.ts index 708a892be..5b21a5c8b 100644 --- a/packages/shared/src/execute.ts +++ b/packages/shared/src/execute.ts @@ -1,14 +1,12 @@ import type { CacheConfig } from "./cache"; -/** SSE stream configuration for `executeStream()`. Controls buffer sizes, heartbeat interval, and cleanup behavior. */ +/** SSE stream configuration for `executeStream()`. Controls buffer sizes, buffer retention, and heartbeat interval. */ export interface StreamConfig { userSignal?: AbortSignal; streamId?: string; bufferSize?: number; maxEventSize?: number; bufferTTL?: number; - cleanupInterval?: number; - maxPersistentBuffers?: number; heartbeatInterval?: number; maxActiveStreams?: number; // ms to keep a generator alive after the last client disconnects, so a From fe34c576c23b2d0d289312c5355e07d81d3d7da6 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 11 Jun 2026 19:25:28 +0200 Subject: [PATCH 3/4] fix(appkit): scope knip suppression and harden stream registry eviction edge cases - revert repo-wide ignoreExportsUsedInFile in knip.json; replace with a file-scoped ignore entry for packages/appkit/src/stream/buffers.ts only - StreamRegistry.add(): only evict when inserting a genuinely new key, so re-adding an existing streamId at capacity no longer destroys an unrelated live stream for a net-zero insert - eviction now aborts with DOMException("Stream evicted", "AbortError") to match the manager's terminal abort paths so the error categorizer routes eviction as an abort rather than a stream failure - update stale RingBuffer comments in stream-registry tests to Map semantics and adjust eviction/abort-reason test expectations Co-authored-by: Isaac Signed-off-by: MarioCadenas --- knip.json | 2 +- packages/appkit/src/stream/stream-registry.ts | 16 ++++++++-- .../src/stream/tests/stream-registry.test.ts | 32 ++++++++++--------- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/knip.json b/knip.json index 1e48feaf3..430316b56 100644 --- a/knip.json +++ b/knip.json @@ -1,6 +1,5 @@ { "$schema": "https://unpkg.com/knip@5/schema.json", - "ignoreExportsUsedInFile": true, "ignoreWorkspaces": [ "packages/shared", "packages/lakebase", @@ -24,6 +23,7 @@ "packages/appkit/src/core/agent/tools/index.ts", "packages/appkit/src/core/agent/load-agents.ts", "packages/appkit/src/connectors/mcp/index.ts", + "packages/appkit/src/stream/buffers.ts", "packages/appkit/src/typedoc.entry.ts", "template/**", "tools/**", diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index aeb4d8909..1b62595c7 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -15,8 +15,13 @@ export class StreamRegistry { // add a stream to the registry add(entry: StreamEntry): void { - // enforce hard cap - if (this.streams.size >= this.maxActiveStreams) { + // enforce hard cap, but only when inserting a genuinely new key: + // re-adding an existing streamId updates in place and doesn't grow the + // map, so evicting another stream for it would destroy an innocent one + if ( + !this.streams.has(entry.streamId) && + this.streams.size >= this.maxActiveStreams + ) { this._evictOldestStream(entry.streamId); } @@ -109,7 +114,12 @@ export class StreamRegistry { } } this._clearGraceTimer(oldestStream); - oldestStream.abortController.abort("Stream evicted"); + // abort with a DOMException AbortError so the error categorizer routes + // eviction as an abort (matching the manager's terminal paths) rather + // than a stream failure + oldestStream.abortController.abort( + new DOMException("Stream evicted", "AbortError"), + ); // a pending removal timer would otherwise pin the evicted entry if (oldestStream.removalTimer) { clearTimeout(oldestStream.removalTimer); diff --git a/packages/appkit/src/stream/tests/stream-registry.test.ts b/packages/appkit/src/stream/tests/stream-registry.test.ts index 343039394..26395d444 100644 --- a/packages/appkit/src/stream/tests/stream-registry.test.ts +++ b/packages/appkit/src/stream/tests/stream-registry.test.ts @@ -209,26 +209,25 @@ describe("StreamRegistry", () => { expect(registry.has("stream-4")).toBe(true); }); - test("should exclude the stream being added from eviction", () => { - // This tests the excludeStreamId parameter: if a stream with the same - // ID as the one being added already exists and is the oldest, it should - // still be excluded from eviction. In practice, the new stream won't be - // in the registry yet when eviction runs, so excludeStreamId prevents - // misidentification. + test("should not evict when re-adding an existing key at capacity", () => { + // Re-adding an existing streamId updates the Map entry in place and + // doesn't grow the registry, so no other stream should be evicted for + // a net-zero insert. registry.add(createMockStreamEntry("stream-1", { lastAccess: 100 })); registry.add(createMockStreamEntry("stream-2", { lastAccess: 200 })); registry.add(createMockStreamEntry("stream-3", { lastAccess: 300 })); - // Add stream with id "stream-1" again; eviction should skip "stream-1" - // even though stream-1 has the oldest lastAccess, because it's the - // excludeStreamId. stream-2 should be evicted instead. + // Add stream with id "stream-1" again at capacity; it replaces the + // existing entry without triggering eviction. registry.add(createMockStreamEntry("stream-1", { lastAccess: 400 })); - // stream-1 is updated (RingBuffer updates existing keys in place) + // stream-1 is updated (Map.set replaces the existing key in place) expect(registry.has("stream-1")).toBe(true); - // stream-2 should have been evicted as it was the oldest non-excluded - expect(registry.has("stream-2")).toBe(false); + expect(registry.get("stream-1")?.lastAccess).toBe(400); + // no stream should have been evicted + expect(registry.has("stream-2")).toBe(true); expect(registry.has("stream-3")).toBe(true); + expect(registry.size()).toBe(3); }); test("should abort the evicted stream's AbortController", () => { @@ -247,7 +246,7 @@ describe("StreamRegistry", () => { expect(abortController1.signal.aborted).toBe(true); }); - test("should abort with 'Stream evicted' reason", () => { + test("should abort with a 'Stream evicted' AbortError reason", () => { const abortController1 = new AbortController(); registry.add( createMockStreamEntry("stream-1", { @@ -260,7 +259,10 @@ describe("StreamRegistry", () => { registry.add(createMockStreamEntry("stream-4", { lastAccess: 400 })); - expect(abortController1.signal.reason).toBe("Stream evicted"); + const reason = abortController1.signal.reason; + expect(reason).toBeInstanceOf(DOMException); + expect(reason.name).toBe("AbortError"); + expect(reason.message).toBe("Stream evicted"); }); test("should prefer evicting a completed stream over an older live stream", () => { @@ -603,7 +605,7 @@ describe("StreamRegistry", () => { registry.add(entry1); registry.add(entry2); - // The RingBuffer updates in place for same key + // The Map updates in place for same key expect(registry.size()).toBe(1); const retrieved = registry.get("stream-1"); expect(retrieved?.lastAccess).toBe(200); From ae38fbc39a6ef1cbde154efce8eda55081a54d3a Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Wed, 24 Jun 2026 13:10:37 +0200 Subject: [PATCH 4/4] refactor(appkit): extract shared stream timer-clear helpers The grace-abort and registry-removal timer teardown was duplicated: _clearGraceTimer existed identically on both StreamManager and StreamRegistry, and the removalTimer clear-and-undefined block was inlined in several call sites. Extract clearGraceTimer/clearRemovalTimer as pure helpers in types.ts (alongside StreamEntry, already imported by both files) and route all call sites through them. Behavior unchanged. Signed-off-by: MarioCadenas --- packages/appkit/src/stream/stream-manager.ts | 31 ++++++++----------- packages/appkit/src/stream/stream-registry.ts | 29 ++++++----------- packages/appkit/src/stream/types.ts | 18 +++++++++++ 3 files changed, 41 insertions(+), 37 deletions(-) diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index c98d41a96..8541f7288 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -6,7 +6,13 @@ import { EventRingBuffer } from "./buffers"; import { streamDefaults } from "./defaults"; import { SSEWriter } from "./sse-writer"; import { StreamRegistry } from "./stream-registry"; -import { SSEErrorCode, type StreamEntry, type StreamOperation } from "./types"; +import { + clearGraceTimer, + clearRemovalTimer, + SSEErrorCode, + type StreamEntry, + type StreamOperation, +} from "./types"; import { StreamValidator } from "./validator"; const logger = createLogger("stream"); @@ -120,14 +126,11 @@ export class StreamManager { } // a reconnecting client cancels the pending disconnect-grace abort - this._clearGraceTimer(streamEntry); + clearGraceTimer(streamEntry); // a reconnect cancels any pending registry removal so the entry isn't // pulled out from under the newly attached client - if (streamEntry.removalTimer) { - clearTimeout(streamEntry.removalTimer); - streamEntry.removalTimer = undefined; - } + clearRemovalTimer(streamEntry); // add client to stream entry streamEntry.clients.add(res); @@ -299,7 +302,7 @@ export class StreamManager { streamEntry.isCompleted = true; // no late grace abort should fire on a completed stream - this._clearGraceTimer(streamEntry); + clearGraceTimer(streamEntry); // close all clients (this also drops them from the entry, so the // removal below is scheduled regardless of whether the transport @@ -316,7 +319,7 @@ export class StreamManager { if (errorCode === SSEErrorCode.STREAM_ABORTED) { logger.info("Stream aborted by client (code=%s)", errorCode); streamEntry.isCompleted = true; - this._clearGraceTimer(streamEntry); + clearGraceTimer(streamEntry); this._closeAllClients(streamEntry); this._scheduleRemovalAfterTTL(streamEntry); return; @@ -345,7 +348,7 @@ export class StreamManager { true, ); streamEntry.isCompleted = true; - this._clearGraceTimer(streamEntry); + clearGraceTimer(streamEntry); // the broadcast above already ended the connected clients; drop them // from the entry so removal is scheduled regardless of whether the @@ -432,7 +435,7 @@ export class StreamManager { // abort the generator after the grace window unless a client reconnects first private _scheduleGraceAbort(streamEntry: StreamEntry): void { // clear any existing timer to avoid stacking - this._clearGraceTimer(streamEntry); + clearGraceTimer(streamEntry); const timer = setTimeout(() => { streamEntry.disconnectGraceTimer = undefined; @@ -448,14 +451,6 @@ export class StreamManager { streamEntry.disconnectGraceTimer = timer; } - // clear a pending disconnect-grace timer, if any - private _clearGraceTimer(streamEntry: StreamEntry): void { - if (streamEntry.disconnectGraceTimer) { - clearTimeout(streamEntry.disconnectGraceTimer); - streamEntry.disconnectGraceTimer = undefined; - } - } - // schedule registry removal once a finished (completed or errored) stream // has no connected clients. The event buffer stays available for // reconnect replay for `bufferTTL` after the last client disconnects; diff --git a/packages/appkit/src/stream/stream-registry.ts b/packages/appkit/src/stream/stream-registry.ts index 1b62595c7..8fb433114 100644 --- a/packages/appkit/src/stream/stream-registry.ts +++ b/packages/appkit/src/stream/stream-registry.ts @@ -1,4 +1,9 @@ -import { SSEErrorCode, type StreamEntry } from "./types"; +import { + clearGraceTimer, + clearRemovalTimer, + SSEErrorCode, + type StreamEntry, +} from "./types"; export class StreamRegistry { // keyed storage with explicit, policy-driven eviction. A ring buffer is @@ -51,24 +56,13 @@ export class StreamRegistry { clear(): void { for (const stream of this.streams.values()) { stream.abortController.abort("Server shutdown"); - this._clearGraceTimer(stream); - if (stream.removalTimer) { - clearTimeout(stream.removalTimer); - stream.removalTimer = undefined; - } + clearGraceTimer(stream); + clearRemovalTimer(stream); } this.streams.clear(); } - // clear a pending grace timer so a removed stream isn't pinned until it fires - private _clearGraceTimer(stream: StreamEntry): void { - if (stream.disconnectGraceTimer) { - clearTimeout(stream.disconnectGraceTimer); - stream.disconnectGraceTimer = undefined; - } - } - // evict the oldest stream from the registry, preferring completed streams. // Completed streams waiting out their buffer TTL can look recently // accessed, so plain LRU could evict a live stream while dead ones @@ -113,7 +107,7 @@ export class StreamRegistry { } } } - this._clearGraceTimer(oldestStream); + clearGraceTimer(oldestStream); // abort with a DOMException AbortError so the error categorizer routes // eviction as an abort (matching the manager's terminal paths) rather // than a stream failure @@ -121,10 +115,7 @@ export class StreamRegistry { new DOMException("Stream evicted", "AbortError"), ); // a pending removal timer would otherwise pin the evicted entry - if (oldestStream.removalTimer) { - clearTimeout(oldestStream.removalTimer); - oldestStream.removalTimer = undefined; - } + clearRemovalTimer(oldestStream); this.streams.delete(oldestStream.streamId); } } diff --git a/packages/appkit/src/stream/types.ts b/packages/appkit/src/stream/types.ts index b7f6b9abb..a4fe4e680 100644 --- a/packages/appkit/src/stream/types.ts +++ b/packages/appkit/src/stream/types.ts @@ -63,3 +63,21 @@ export interface StreamOperation { type: "query" | "stream"; heartbeat?: NodeJS.Timeout; } + +// clear a pending disconnect-grace timer so a removed/reconnected stream +// isn't pinned until it fires +export function clearGraceTimer(entry: StreamEntry): void { + if (entry.disconnectGraceTimer) { + clearTimeout(entry.disconnectGraceTimer); + entry.disconnectGraceTimer = undefined; + } +} + +// clear a pending registry-removal timer so a reconnected/evicted stream +// isn't pulled out from under a client +export function clearRemovalTimer(entry: StreamEntry): void { + if (entry.removalTimer) { + clearTimeout(entry.removalTimer); + entry.removalTimer = undefined; + } +}