From dd95e69181353b8045bd88f68b104ed745c1cd93 Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Thu, 7 May 2026 19:30:27 +0200 Subject: [PATCH 1/2] fix(appkit): decouple cache in-flight execution from per-caller abort signal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cache in-flight deduplication map shared a single Promise across concurrent callers with the same cacheKey. When one caller's signal aborted (e.g. a React StrictMode mount/cleanup pair), fn()'s rejection cascaded through the shared promise to every other awaiter — including still-connected SSE consumers, which broadcast the abort as UPSTREAM_ERROR to the browser. Replace the bare promise map with a reference-counted in-flight entry owning its own AbortController. Callers pass their own callerSignal; the shared controller aborts only when every caller has bailed (refCount -> 0). Each caller's await is raced against its own signal so local aborts reject locally without poisoning the shared result. The catch block no longer wraps abort errors as ExecutionError.statementFailed when the shared controller has already aborted, since no live awaiter would observe them anyway. CacheInterceptor forwards context.signal as callerSignal and swaps context.signal to the cache's shared signal for the duration of fn() so the inner interceptor chain (timeout/retry/telemetry) and the underlying I/O (e.g. analytics SDK calls) observe the shared lifecycle rather than the per-request stream signal. Existing cache + plugin + stream + analytics test suites pass unchanged (715 tests). Signed-off-by: MarioCadenas --- packages/appkit/src/cache/index.ts | 154 ++++++++++++++++-- .../appkit/src/plugin/interceptors/cache.ts | 20 ++- 2 files changed, 154 insertions(+), 20 deletions(-) diff --git a/packages/appkit/src/cache/index.ts b/packages/appkit/src/cache/index.ts index 873aada18..3ae84a6d3 100644 --- a/packages/appkit/src/cache/index.ts +++ b/packages/appkit/src/cache/index.ts @@ -12,6 +12,28 @@ import { InMemoryStorage, PersistentStorage } from "./storage"; const logger = createLogger("cache"); +/** + * Reference-counted in-flight cache execution entry. + * + * `sharedController` decouples the cached `fn()` from any single caller's + * abort signal. Callers join an in-flight entry by incrementing `refCount`; + * when a caller aborts, refCount is decremented. The shared controller is + * aborted only when refCount drops to 0 — i.e. all callers have abandoned + * the request. This prevents one caller's cancellation (e.g. React + * StrictMode unmount) from poisoning the in-flight result for other still- + * connected awaiters. + */ +interface InFlightEntry { + promise: Promise; + refCount: number; + sharedController: AbortController; +} + +function createAbortError(signal: AbortSignal): unknown { + if (signal.reason !== undefined) return signal.reason; + return new DOMException("The operation was aborted.", "AbortError"); +} + /** * Cache manager class to handle cache operations. * Can be used with in-memory storage or persistent storage (Lakebase). @@ -34,7 +56,7 @@ export class CacheManager { private storage: CacheStorage; private config: CacheConfig; - private inFlightRequests: Map>; + private inFlightRequests: Map>; private cleanupInProgress: boolean; private lastCleanupAttempt: number; @@ -174,20 +196,37 @@ export class CacheManager { } /** - * Get or execute a function and cache the result - * @param key - Cache key - * @param fn - Function to execute - * @param userKey - User key + * Get or execute a function and cache the result. + * + * Multiple concurrent callers with the same `cacheKey` are deduplicated + * onto a single in-flight execution. Each caller may pass its own + * `callerSignal`; the underlying `fn()` is run with a shared, internally + * managed `AbortSignal` that aborts only when *all* callers have + * abandoned the request (reference counted). This decouples a single + * caller's cancellation (e.g. React StrictMode unmount) from the shared + * result, so other still-connected callers receive the cached value + * normally. + * + * @param key - Cache key parts + * @param fn - Function to execute. Receives the cache-owned shared signal; + * pass it through to the underlying I/O so the work is cancelled when + * no caller is left waiting. + * @param userKey - User key for cache namespacing * @param options - Options for the cache * @returns Promise of the result */ async getOrExecute( key: (string | number | object)[], - fn: () => Promise, + fn: (sharedSignal?: AbortSignal) => Promise, userKey: string, - options?: { ttl?: number }, + options?: { ttl?: number; callerSignal?: AbortSignal }, ): Promise { - if (!this.config.enabled) return fn(); + if (!this.config.enabled) return fn(options?.callerSignal); + + const callerSignal = options?.callerSignal; + if (callerSignal?.aborted) { + throw createAbortError(callerSignal); + } const cacheKey = this.generateKey(key, userKey); @@ -218,9 +257,14 @@ export class CacheManager { return cached.value; } - // check if the value is being processed by another request - const inFlight = this.inFlightRequests.get(cacheKey); - if (inFlight) { + // check if the value is being processed by another request — join + // the existing in-flight entry under reference counting so this + // caller's abort doesn't poison the shared result. + const existing = this.inFlightRequests.get(cacheKey) as + | InFlightEntry + | undefined; + if (existing) { + existing.refCount++; span.setAttribute("cache.hit", true); span.setAttribute("cache.deduplication", true); span.addEvent("cache.deduplication_used", { @@ -239,10 +283,10 @@ export class CacheManager { }); span.end(); - return inFlight as Promise; + return await this._waitWithRefCount(existing, callerSignal); } - // cache miss - execute function + // cache miss - execute function under a shared abort controller span.setAttribute("cache.hit", false); span.addEvent("cache.miss", { "cache.key": cacheKey }); this.telemetryMetrics.cacheMissCount.add(1, { @@ -254,7 +298,14 @@ export class CacheManager { cache_key: cacheKey, }); - const promise = fn() + const sharedController = new AbortController(); + const entry: InFlightEntry = { + promise: undefined as unknown as Promise, + refCount: 1, + sharedController, + }; + + entry.promise = fn(sharedController.signal) .then(async (result) => { await this.set(cacheKey, result, options); span.addEvent("cache.value_stored", { @@ -266,8 +317,13 @@ export class CacheManager { .catch((error) => { span.recordException(error); span.setStatus({ code: SpanStatusCode.ERROR }); - // Preserve AppKit errors and Databricks API errors (with status codes) - // so route handlers can map them to proper HTTP responses. + // If the shared controller aborted, all callers have already + // abandoned the request (or are about to via their own signals) + // — propagate the original error without wrapping. No live + // awaiter will observe this rejection. + if (sharedController.signal.aborted) { + throw error; + } if (error instanceof AppKitError || error instanceof ApiError) { throw error; } @@ -279,9 +335,14 @@ export class CacheManager { this.inFlightRequests.delete(cacheKey); }); - this.inFlightRequests.set(cacheKey, promise); + // Suppress unhandled rejection warnings when every caller bailed + // before fn() resolved (their own promises rejected via + // _waitWithRefCount; the underlying entry.promise has no awaiter). + entry.promise.catch(() => {}); + + this.inFlightRequests.set(cacheKey, entry as InFlightEntry); - const result = await promise; + const result = await this._waitWithRefCount(entry, callerSignal); span.setStatus({ code: SpanStatusCode.OK }); return result; } catch (error) { @@ -296,6 +357,63 @@ export class CacheManager { ); } + /** + * Wait on an in-flight entry, racing the underlying promise against the + * caller's abort signal. When the caller aborts, the entry's refCount is + * decremented; if it hits zero the shared controller is aborted so the + * underlying `fn()` can stop. Other callers continue to await the same + * entry and receive the result when it arrives. + */ + private _waitWithRefCount( + entry: InFlightEntry, + callerSignal?: AbortSignal, + ): Promise { + if (!callerSignal) return entry.promise; + + return new Promise((resolve, reject) => { + let settled = false; + + const release = () => { + if (entry.refCount > 0) entry.refCount--; + if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { + entry.sharedController.abort( + callerSignal.reason ?? "all cache callers aborted", + ); + } + }; + + const onAbort = () => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + release(); + reject(createAbortError(callerSignal)); + }; + + if (callerSignal.aborted) { + onAbort(); + return; + } + + callerSignal.addEventListener("abort", onAbort, { once: true }); + + entry.promise.then( + (value) => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + resolve(value); + }, + (error) => { + if (settled) return; + settled = true; + callerSignal.removeEventListener("abort", onAbort); + reject(error); + }, + ); + }); + } + /** * Get a cached value * @param key - Cache key diff --git a/packages/appkit/src/plugin/interceptors/cache.ts b/packages/appkit/src/plugin/interceptors/cache.ts index b8af0ca32..d1637afed 100644 --- a/packages/appkit/src/plugin/interceptors/cache.ts +++ b/packages/appkit/src/plugin/interceptors/cache.ts @@ -18,11 +18,27 @@ export class CacheInterceptor implements ExecutionInterceptor { return fn(); } + const callerSignal = context.signal; + + // The cache may dedupe this request onto a shared in-flight execution. + // Swap context.signal to the cache-owned shared signal for the duration + // of fn() so the inner interceptor chain (timeout/retry/telemetry) and + // the underlying I/O observe abort only when *all* callers have left, + // not just this one. Without this swap, mount #1's abort under React + // StrictMode poisons mount #2's joined inflight result. return this.cacheManager.getOrExecute( this.config.cacheKey, - fn, + async (sharedSignal) => { + const previousSignal = context.signal; + context.signal = sharedSignal; + try { + return await fn(); + } finally { + context.signal = previousSignal; + } + }, context.userKey, - { ttl: this.config.ttl }, + { ttl: this.config.ttl, callerSignal }, ); } } From 8128932552794b7665ee18df00585da421e18c4b Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Mon, 25 May 2026 12:13:13 +0200 Subject: [PATCH 2/2] =?UTF-8?q?fix(appkit):=20harden=20cache=20ref-countin?= =?UTF-8?q?g=20=E2=80=94=20race=20conditions,=20grace=20period,=20abort=20?= =?UTF-8?q?types?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review findings and edge cases in the ref-counted cache abort handling: - Prevent callers from joining already-aborted in-flight entries by guarding on sharedController.signal.aborted - Add 100ms grace period before aborting shared execution so React StrictMode remounts can rejoin the in-flight entry - Make .finally() cleanup conditional to avoid removing a newer entry for the same cache key - Use proper DOMException("…", "AbortError") for stream disconnect abort reasons instead of plain strings - Remove premature span.setStatus(OK) in dedup path — let the outer try/catch handle status consistently - Rename _waitWithRefCount → waitWithRefCount (TypeScript private, not convention prefix) - Clear pending abort timers on cache clear() and when new callers join - Add comprehensive test suite for abort/ref-counting (11 tests) - Update all getOrExecute mock signatures across test files Signed-off-by: Pawel Kosiec --- packages/appkit/src/cache/index.ts | 32 +- .../src/cache/tests/cache-manager.test.ts | 396 ++++++++++++++++++ .../lakebase/tests/pool-manager.test.ts | 5 +- .../lakebase/tests/routing-pool.test.ts | 5 +- .../core/tests/appkit-as-user-exports.test.ts | 10 +- .../appkit/src/plugin/tests/cache.test.ts | 42 +- .../agents/tests/approval-config.test.ts | 5 +- .../agents/tests/approval-route.test.ts | 5 +- .../agents/tests/dispatch-tool-call.test.ts | 5 +- .../plugins/agents/tests/dos-limits.test.ts | 5 +- .../agents/tests/route-handler-errors.test.ts | 5 +- .../tests/analytics.readonly.test.ts | 5 +- .../plugins/analytics/tests/analytics.test.ts | 6 +- .../src/plugins/files/tests/delete.test.ts | 5 +- .../files/tests/download-endpoint.test.ts | 5 +- .../files/tests/error-handling.test.ts | 5 +- .../src/plugins/files/tests/mkdir.test.ts | 5 +- .../files/tests/path-validation.test.ts | 5 +- .../src/plugins/files/tests/plugin.test.ts | 5 +- .../plugins/files/tests/raw-endpoint.test.ts | 5 +- .../src/plugins/files/tests/shutdown.test.ts | 5 +- .../src/plugins/files/tests/upload.test.ts | 5 +- .../plugins/files/tests/volume-config.test.ts | 5 +- .../src/plugins/genie/tests/genie.test.ts | 5 +- .../src/plugins/jobs/tests/plugin.test.ts | 5 +- .../tests/lakebase-agent-tool.test.ts | 5 +- .../src/plugins/serving/tests/serving.test.ts | 5 +- packages/appkit/src/stream/stream-manager.ts | 12 +- 28 files changed, 547 insertions(+), 61 deletions(-) diff --git a/packages/appkit/src/cache/index.ts b/packages/appkit/src/cache/index.ts index 3ae84a6d3..7a948a822 100644 --- a/packages/appkit/src/cache/index.ts +++ b/packages/appkit/src/cache/index.ts @@ -27,6 +27,7 @@ interface InFlightEntry { promise: Promise; refCount: number; sharedController: AbortController; + abortTimer?: ReturnType; } function createAbortError(signal: AbortSignal): unknown { @@ -50,6 +51,7 @@ function createAbortError(signal: AbortSignal): unknown { */ export class CacheManager { private static readonly MIN_CLEANUP_INTERVAL_MS = 60_000; + private static readonly ABORT_GRACE_PERIOD_MS = 100; private readonly name: string = "cache-manager"; private static instance: CacheManager | null = null; private static initPromise: Promise | null = null; @@ -263,14 +265,18 @@ export class CacheManager { const existing = this.inFlightRequests.get(cacheKey) as | InFlightEntry | undefined; - if (existing) { + if (existing && !existing.sharedController.signal.aborted) { existing.refCount++; + // Cancel any pending abort timer — a new caller has joined + if (existing.abortTimer) { + clearTimeout(existing.abortTimer); + existing.abortTimer = undefined; + } span.setAttribute("cache.hit", true); span.setAttribute("cache.deduplication", true); span.addEvent("cache.deduplication_used", { "cache.key": cacheKey, }); - span.setStatus({ code: SpanStatusCode.OK }); this.telemetryMetrics.cacheHitCount.add(1, { "cache.key": cacheKey, "cache.deduplication": "true", @@ -282,7 +288,6 @@ export class CacheManager { cache_deduplication: true, }); - span.end(); return await this._waitWithRefCount(existing, callerSignal); } @@ -332,12 +337,14 @@ export class CacheManager { ); }) .finally(() => { - this.inFlightRequests.delete(cacheKey); + if (this.inFlightRequests.get(cacheKey) === entry) { + this.inFlightRequests.delete(cacheKey); + } }); // Suppress unhandled rejection warnings when every caller bailed // before fn() resolved (their own promises rejected via - // _waitWithRefCount; the underlying entry.promise has no awaiter). + // waitWithRefCount; the underlying entry.promise has no awaiter). entry.promise.catch(() => {}); this.inFlightRequests.set(cacheKey, entry as InFlightEntry); @@ -376,9 +383,15 @@ export class CacheManager { const release = () => { if (entry.refCount > 0) entry.refCount--; if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { - entry.sharedController.abort( - callerSignal.reason ?? "all cache callers aborted", - ); + // Grace period: delay abort so a StrictMode remount can join + // the in-flight entry before the shared execution is cancelled. + entry.abortTimer = setTimeout(() => { + if (entry.refCount <= 0 && !entry.sharedController.signal.aborted) { + entry.sharedController.abort( + callerSignal.reason ?? "all cache callers aborted", + ); + } + }, CacheManager.ABORT_GRACE_PERIOD_MS); } }; @@ -504,6 +517,9 @@ export class CacheManager { /** Clear the cache */ async clear(): Promise { await this.storage.clear(); + for (const entry of this.inFlightRequests.values()) { + if (entry.abortTimer) clearTimeout(entry.abortTimer); + } this.inFlightRequests.clear(); } diff --git a/packages/appkit/src/cache/tests/cache-manager.test.ts b/packages/appkit/src/cache/tests/cache-manager.test.ts index 3916ef1eb..3f8fa5dbd 100644 --- a/packages/appkit/src/cache/tests/cache-manager.test.ts +++ b/packages/appkit/src/cache/tests/cache-manager.test.ts @@ -378,6 +378,402 @@ describe("CacheManager", () => { expect(r2).toBe("result-2"); expect(fn).toHaveBeenCalledTimes(2); }); + + test("should work when fn ignores signal parameter (non-signal caller regression)", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + // Simulates direct callers like telemetry-example-plugin that pass + // a plain async function without accepting the shared signal. + const fn = vi.fn().mockResolvedValue("no-signal-result"); + + const result = await cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + }); + + expect(result).toBe("no-signal-result"); + expect(fn).toHaveBeenCalledTimes(1); + // The cache passes the shared AbortSignal even if the fn ignores it + expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal)); + }); + }); + + describe("abort / ref-counting", () => { + test("one caller aborts while another still waits — waiting caller resolves normally", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked and both callers to join the entry + await vi.waitFor(() => expect(fn).toHaveBeenCalledTimes(1)); + await new Promise((r) => setTimeout(r, 0)); + + // Abort caller 1 — caller 2 still holds a ref + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Resolve the underlying fn — caller 2 should still get the result + resolveFn("shared-result"); + await expect(p2).resolves.toBe("shared-result"); + + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("all callers abort — shared controller signal is aborted", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(capturedSignal).toBeDefined()); + expect(capturedSignal.aborted).toBe(false); + + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Shared signal still active — one caller remains + expect(capturedSignal.aborted).toBe(false); + + controller2.abort(); + await expect(p2).rejects.toThrow(); + + // Shared signal not yet aborted (grace period) + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire + await new Promise((r) => setTimeout(r, 150)); + expect(capturedSignal.aborted).toBe(true); + }); + + test("pre-aborted callerSignal throws immediately without executing fn", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + const controller = new AbortController(); + controller.abort(); + + const fn = vi.fn().mockResolvedValue("should-not-run"); + + await expect( + cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }), + ).rejects.toThrow(); + + expect(fn).not.toHaveBeenCalled(); + }); + + test("single caller abort mid-flight rejects with abort error and aborts shared controller", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller = new AbortController(); + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + controller.abort(); + await expect(p).rejects.toThrow(); + + // Grace period: shared controller not yet aborted + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire + await new Promise((r) => setTimeout(r, 150)); + expect(capturedSignal.aborted).toBe(true); + }); + + test("deduped caller abort does not poison the first caller's result", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controllerDeduped = new AbortController(); + + // First caller — no signal (or non-aborting) + const p1 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + + // Second caller joins with an abort signal + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controllerDeduped.signal, + }); + + // Abort the deduped caller + controllerDeduped.abort(); + await expect(p2).rejects.toThrow(); + + // Resolve underlying fn — first caller should get the result + resolveFn("first-caller-result"); + await expect(p1).resolves.toBe("first-caller-result"); + + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("fn rejects while multiple callers wait — all receive the error", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let rejectFn!: (error: Error) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((_resolve, reject) => { + rejectFn = reject; + }), + ); + + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + const p2 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller2.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(rejectFn).toBeDefined()); + + const networkError = new Error("network failure"); + rejectFn(networkError); + + await expect(p1).rejects.toThrow("network failure"); + await expect(p2).rejects.toThrow("network failure"); + expect(fn).toHaveBeenCalledTimes(1); + }); + + test("caller aborts after promise already resolved — gets the resolved value", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + const fn = vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveFn = resolve; + }), + ); + + const controller = new AbortController(); + + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + // Wait for fn to be invoked inside the telemetry span + await vi.waitFor(() => expect(resolveFn).toBeDefined()); + + // Resolve first, then abort + resolveFn("already-resolved"); + // Allow microtask to settle the promise + await new Promise((r) => setTimeout(r, 0)); + controller.abort(); + + await expect(p).resolves.toBe("already-resolved"); + }); + + test("new caller after previous entry fully aborted gets fresh execution", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let callCount = 0; + const capturedSignals: AbortSignal[] = []; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve, reject) => { + callCount++; + capturedSignals.push(signal); + const currentCall = callCount; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + // Only auto-resolve for the second call (first is held open + // until the abort timer fires and cancels it) + if (currentCall > 1) { + setTimeout(() => { + if (!signal.aborted) resolve(`result-${currentCall}`); + }, 10); + } + }), + ); + + const controller1 = new AbortController(); + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + + // Abort the only caller — shared controller should abort after grace period + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Wait for grace period + .finally() cleanup + await new Promise((r) => setTimeout(r, 200)); + + // New caller arrives — should get a fresh execution, not the aborted one + const p2 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + await expect(p2).resolves.toBe("result-2"); + + expect(fn).toHaveBeenCalledTimes(2); + expect(capturedSignals[0].aborted).toBe(true); + expect(capturedSignals[1].aborted).toBe(false); + }); + + test("grace period: new caller joins before timer fires — no abort, single execution", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let resolveFn!: (value: string) => void; + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((resolve) => { + capturedSignal = signal; + resolveFn = resolve; + }), + ); + + const controller1 = new AbortController(); + + // Mount #1: starts execution + const p1 = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller1.signal, + }); + + // Wait for fn to be invoked + await vi.waitFor(() => expect(fn).toHaveBeenCalledTimes(1)); + + // Mount #1 unmounts — abort fires, grace period starts + controller1.abort(); + await expect(p1).rejects.toThrow(); + + // Shared signal should NOT be aborted yet (grace period active) + expect(capturedSignal.aborted).toBe(false); + + // Mount #2 arrives within the grace period — joins the same entry + const p2 = cache.getOrExecute(["key"], fn, "user1", { ttl: 60 }); + + // Resolve the underlying fn — mount #2 gets the result + resolveFn("shared-result"); + await expect(p2).resolves.toBe("shared-result"); + + // fn was only called once — no duplicate execution + expect(fn).toHaveBeenCalledTimes(1); + // Shared signal was never aborted + expect(capturedSignal.aborted).toBe(false); + }); + + test("grace period: no new caller arrives — timer fires and aborts shared controller", async () => { + const cache = await CacheManager.getInstance({ + storage: createMockStorage(), + }); + + let capturedSignal!: AbortSignal; + const fn = vi.fn().mockImplementation( + (signal: AbortSignal) => + new Promise((_resolve, reject) => { + capturedSignal = signal; + signal.addEventListener("abort", () => + reject(new DOMException("aborted", "AbortError")), + ); + }), + ); + + const controller = new AbortController(); + const p = cache.getOrExecute(["key"], fn, "user1", { + ttl: 60, + callerSignal: controller.signal, + }); + + await vi.waitFor(() => expect(capturedSignal).toBeDefined()); + + controller.abort(); + await expect(p).rejects.toThrow(); + + // Shared signal not yet aborted (grace period) + expect(capturedSignal.aborted).toBe(false); + + // Wait for grace period to expire (100ms + buffer) + await new Promise((r) => setTimeout(r, 150)); + + // Now shared signal should be aborted + expect(capturedSignal.aborted).toBe(true); + }); }); describe("disabled cache", () => { diff --git a/packages/appkit/src/connectors/lakebase/tests/pool-manager.test.ts b/packages/appkit/src/connectors/lakebase/tests/pool-manager.test.ts index 2b0efff77..9def8f43e 100644 --- a/packages/appkit/src/connectors/lakebase/tests/pool-manager.test.ts +++ b/packages/appkit/src/connectors/lakebase/tests/pool-manager.test.ts @@ -7,8 +7,9 @@ vi.mock("../../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), diff --git a/packages/appkit/src/connectors/lakebase/tests/routing-pool.test.ts b/packages/appkit/src/connectors/lakebase/tests/routing-pool.test.ts index f87277616..75b278a41 100644 --- a/packages/appkit/src/connectors/lakebase/tests/routing-pool.test.ts +++ b/packages/appkit/src/connectors/lakebase/tests/routing-pool.test.ts @@ -8,8 +8,9 @@ vi.mock("../../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), diff --git a/packages/appkit/src/core/tests/appkit-as-user-exports.test.ts b/packages/appkit/src/core/tests/appkit-as-user-exports.test.ts index 968c064e1..e00a37e36 100644 --- a/packages/appkit/src/core/tests/appkit-as-user-exports.test.ts +++ b/packages/appkit/src/core/tests/appkit-as-user-exports.test.ts @@ -18,8 +18,9 @@ vi.mock("../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), @@ -27,8 +28,9 @@ vi.mock("../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), diff --git a/packages/appkit/src/plugin/tests/cache.test.ts b/packages/appkit/src/plugin/tests/cache.test.ts index 7e01e7787..12730f67b 100644 --- a/packages/appkit/src/plugin/tests/cache.test.ts +++ b/packages/appkit/src/plugin/tests/cache.test.ts @@ -20,9 +20,9 @@ class MockCacheManager { async getOrExecute( key: (string | number | object)[], - fn: () => Promise, + fn: (sharedSignal?: AbortSignal) => Promise, userKey: string, - options?: { ttl?: number }, + options?: { ttl?: number; callerSignal?: AbortSignal }, ): Promise { const cacheKey = this.generateKey(key, userKey); const cached = await this.get(cacheKey); @@ -35,7 +35,8 @@ class MockCacheManager { return inFlight as Promise; } - const promise = fn() + const sharedController = new AbortController(); + const promise = fn(sharedController.signal) .then(async (result) => { await this.set(cacheKey, result, options); return result; @@ -263,4 +264,39 @@ describe("CacheInterceptor", () => { expect(fn).toHaveBeenCalledTimes(2); }); + + test("should swap context.signal to shared signal during fn() and restore afterward", async () => { + const config: CacheConfig = { + enabled: true, + cacheKey: ["test", "signal-swap"], + }; + const callerSignal = new AbortController().signal; + const contextWithSignal: InterceptorContext = { + metadata: new Map(), + userKey: "service", + signal: callerSignal, + }; + const interceptor = new CacheInterceptor( + cacheManager as unknown as ConstructorParameters< + typeof CacheInterceptor + >[0], + config, + ); + + let signalDuringExecution: AbortSignal | undefined; + const fn = vi.fn().mockImplementation(async () => { + signalDuringExecution = contextWithSignal.signal; + return "result"; + }); + + await interceptor.intercept(fn, contextWithSignal); + + // During fn(), context.signal should have been swapped to the shared signal + expect(signalDuringExecution).toBeDefined(); + expect(signalDuringExecution).not.toBe(callerSignal); + expect(signalDuringExecution).toBeInstanceOf(AbortSignal); + + // After intercept completes, context.signal should be restored + expect(contextWithSignal.signal).toBe(callerSignal); + }); }); diff --git a/packages/appkit/src/plugins/agents/tests/approval-config.test.ts b/packages/appkit/src/plugins/agents/tests/approval-config.test.ts index 0827da35e..855bf716b 100644 --- a/packages/appkit/src/plugins/agents/tests/approval-config.test.ts +++ b/packages/appkit/src/plugins/agents/tests/approval-config.test.ts @@ -27,8 +27,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })) as unknown as typeof CacheManager.getInstanceSync; diff --git a/packages/appkit/src/plugins/agents/tests/approval-route.test.ts b/packages/appkit/src/plugins/agents/tests/approval-route.test.ts index 6e090bd2f..61595fcbf 100644 --- a/packages/appkit/src/plugins/agents/tests/approval-route.test.ts +++ b/packages/appkit/src/plugins/agents/tests/approval-route.test.ts @@ -51,8 +51,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })) as any; diff --git a/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts b/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts index 047836648..2767766c3 100644 --- a/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts +++ b/packages/appkit/src/plugins/agents/tests/dispatch-tool-call.test.ts @@ -27,8 +27,9 @@ beforeEach(() => { (CacheManager as any).instance = { get: vi.fn(), set: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), }; diff --git a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts index aecf75fb9..8aba4ce1d 100644 --- a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts +++ b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts @@ -52,8 +52,9 @@ beforeEach(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), // biome-ignore lint/suspicious/noExplicitAny: test mock diff --git a/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts b/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts index 9ea5cb2bf..2fc493ef4 100644 --- a/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts +++ b/packages/appkit/src/plugins/agents/tests/route-handler-errors.test.ts @@ -23,8 +23,9 @@ beforeEach(() => { (CacheManager as any).instance = { get: vi.fn(), set: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), }; diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.readonly.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.readonly.test.ts index b8cf21d77..68b6b94d7 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.readonly.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.readonly.test.ts @@ -6,8 +6,9 @@ vi.mock("../../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts index eb06ea952..d5034c689 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts @@ -27,7 +27,11 @@ const { mockCacheStore, mockCacheInstance } = vi.hoisted(() => { set: vi.fn(), delete: vi.fn(), getOrExecute: vi.fn( - async (key: unknown[], fn: () => Promise, userKey: string) => { + async ( + key: unknown[], + fn: (signal?: AbortSignal) => Promise, + userKey: string, + ) => { const cacheKey = generateKey(key, userKey); if (store.has(cacheKey)) { return store.get(cacheKey); diff --git a/packages/appkit/src/plugins/files/tests/delete.test.ts b/packages/appkit/src/plugins/files/tests/delete.test.ts index 146ec8752..748152b6a 100644 --- a/packages/appkit/src/plugins/files/tests/delete.test.ts +++ b/packages/appkit/src/plugins/files/tests/delete.test.ts @@ -37,8 +37,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts b/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts index 5d7bc3bae..550c20be1 100644 --- a/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts +++ b/packages/appkit/src/plugins/files/tests/download-endpoint.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/error-handling.test.ts b/packages/appkit/src/plugins/files/tests/error-handling.test.ts index c0ce31c41..a16e40d0a 100644 --- a/packages/appkit/src/plugins/files/tests/error-handling.test.ts +++ b/packages/appkit/src/plugins/files/tests/error-handling.test.ts @@ -37,8 +37,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/mkdir.test.ts b/packages/appkit/src/plugins/files/tests/mkdir.test.ts index e6557acd2..8f667f5c0 100644 --- a/packages/appkit/src/plugins/files/tests/mkdir.test.ts +++ b/packages/appkit/src/plugins/files/tests/mkdir.test.ts @@ -37,8 +37,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/path-validation.test.ts b/packages/appkit/src/plugins/files/tests/path-validation.test.ts index e4b4ac64f..8ef0cdb62 100644 --- a/packages/appkit/src/plugins/files/tests/path-validation.test.ts +++ b/packages/appkit/src/plugins/files/tests/path-validation.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/plugin.test.ts b/packages/appkit/src/plugins/files/tests/plugin.test.ts index da70b538b..3136a21c0 100644 --- a/packages/appkit/src/plugins/files/tests/plugin.test.ts +++ b/packages/appkit/src/plugins/files/tests/plugin.test.ts @@ -44,8 +44,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(), }; diff --git a/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts b/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts index c3de70aba..99641bce4 100644 --- a/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts +++ b/packages/appkit/src/plugins/files/tests/raw-endpoint.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/shutdown.test.ts b/packages/appkit/src/plugins/files/tests/shutdown.test.ts index dcf761c48..99eaa1e86 100644 --- a/packages/appkit/src/plugins/files/tests/shutdown.test.ts +++ b/packages/appkit/src/plugins/files/tests/shutdown.test.ts @@ -30,8 +30,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/upload.test.ts b/packages/appkit/src/plugins/files/tests/upload.test.ts index baf3aa549..725756a99 100644 --- a/packages/appkit/src/plugins/files/tests/upload.test.ts +++ b/packages/appkit/src/plugins/files/tests/upload.test.ts @@ -38,8 +38,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/files/tests/volume-config.test.ts b/packages/appkit/src/plugins/files/tests/volume-config.test.ts index da50bcc25..1f676afc3 100644 --- a/packages/appkit/src/plugins/files/tests/volume-config.test.ts +++ b/packages/appkit/src/plugins/files/tests/volume-config.test.ts @@ -30,8 +30,9 @@ const { mockClient, MockApiError, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn((...args: unknown[]) => JSON.stringify(args)), }; diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index fcb2e167c..0af6c25b3 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -21,7 +21,10 @@ const { mockCacheInstance } = vi.hoisted(() => { getOrExecute: vi .fn() .mockImplementation( - async (_key: unknown[], fn: () => Promise) => { + async ( + _key: unknown[], + fn: (signal?: AbortSignal) => Promise, + ) => { return await fn(); }, ), diff --git a/packages/appkit/src/plugins/jobs/tests/plugin.test.ts b/packages/appkit/src/plugins/jobs/tests/plugin.test.ts index ba9e65193..9fff3535a 100644 --- a/packages/appkit/src/plugins/jobs/tests/plugin.test.ts +++ b/packages/appkit/src/plugins/jobs/tests/plugin.test.ts @@ -35,8 +35,9 @@ const { mockClient, mockCacheInstance } = vi.hoisted(() => { get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_key: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(), }; diff --git a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts index 24d37341f..5f96d6efe 100644 --- a/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts +++ b/packages/appkit/src/plugins/lakebase/tests/lakebase-agent-tool.test.ts @@ -15,8 +15,9 @@ vi.mock("../../../cache", () => ({ get: vi.fn(), set: vi.fn(), delete: vi.fn(), - getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => - fn(), + getOrExecute: vi.fn( + async (_k: unknown[], fn: (signal?: AbortSignal) => Promise) => + fn(), ), generateKey: vi.fn(() => "test-key"), })), diff --git a/packages/appkit/src/plugins/serving/tests/serving.test.ts b/packages/appkit/src/plugins/serving/tests/serving.test.ts index 8fbe79bba..216c2b727 100644 --- a/packages/appkit/src/plugins/serving/tests/serving.test.ts +++ b/packages/appkit/src/plugins/serving/tests/serving.test.ts @@ -20,7 +20,10 @@ const { mockCacheInstance } = vi.hoisted(() => { getOrExecute: vi .fn() .mockImplementation( - async (_key: unknown[], fn: () => Promise) => { + async ( + _key: unknown[], + fn: (signal?: AbortSignal) => Promise, + ) => { return await fn(); }, ), diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index 901e0b46c..a5b3a4b32 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -76,7 +76,9 @@ export class StreamManager { abortAll(): void { this.activeOperations.forEach((operation) => { if (operation.heartbeat) clearInterval(operation.heartbeat); - operation.controller.abort("Server shutdown"); + operation.controller.abort( + new DOMException("Server shutdown", "AbortError"), + ); }); this.activeOperations.clear(); this.streamRegistry.clear(); @@ -140,7 +142,9 @@ export class StreamManager { // Stop the generator when no clients remain if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - streamEntry.abortController.abort("All clients disconnected"); + streamEntry.abortController.abort( + new DOMException("All clients disconnected", "AbortError"), + ); } // cleanup if stream is completed and no clients are connected @@ -227,7 +231,9 @@ export class StreamManager { // Stop the generator when no clients remain so polling loops // (e.g. jobs runAndWait) don't keep running in the background. if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { - abortController.abort("Client disconnected"); + abortController.abort( + new DOMException("Client disconnected", "AbortError"), + ); } });