From 1b3b990c3b8fcaeabe2e7dcfbc10ca12c9fb34e2 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Tue, 12 May 2026 08:07:50 +0000 Subject: [PATCH 1/3] fix: thread abort signals through host waits --- mise.lock | 3 +- src/host/hostMain.ts | 232 ++++++++++++------ src/host/lifecycle.ts | 49 +++- src/host/rpcClient.ts | 35 ++- src/host/rpcServer.ts | 79 +++++- src/host/runCompletionCoordinator.ts | 87 +++++-- src/util/abort.ts | 44 ++++ .../host/runCompletionCoordinator.test.ts | 37 +++ 8 files changed, 455 insertions(+), 111 deletions(-) create mode 100644 src/util/abort.ts diff --git a/mise.lock b/mise.lock index dd1b348..87e7c40 100644 --- a/mise.lock +++ b/mise.lock @@ -1,4 +1,4 @@ -# @generated - this file is auto-generated by `mise lock` https://mise.en.dev/dev-tools/mise-lock.html +# @generated - this file is auto-generated by `mise lock` https://mise.jdx.dev/dev-tools/mise-lock.html [[tools.actionlint]] version = "1.7.12" @@ -97,6 +97,7 @@ url_api = "https://api.github.com/repos/jdx/communique/releases/assets/413753918 checksum = "sha256:61b46cf231882e0a10e0489df310ec9b875e83da0f38b60d6ac7bd67e09a15dc" url = "https://github.com/jdx/communique/releases/download/v1.1.3/communique-x86_64-unknown-linux-gnu.tar.gz" url_api = "https://api.github.com/repos/jdx/communique/releases/assets/413753688" +provenance = "github-attestations" [tools.communique."platforms.linux-x64-musl"] checksum = "sha256:9c8d784d8bb91e83cd467ed0d26be6ed3290a61a482413d42b12321aee20d5c0" diff --git a/src/host/hostMain.ts b/src/host/hostMain.ts index 663bfcb..002795f 100644 --- a/src/host/hostMain.ts +++ b/src/host/hostMain.ts @@ -49,7 +49,13 @@ import { sessionDir, socketPath, } from '../storage/sessionPaths.js'; +import { + addAbortListener, + makeAbortError, + throwIfAborted, +} from '../util/abort.js'; import { invariant } from '../util/assert.js'; +import { ResourceScope } from '../util/resourceScope.js'; const ALLOWED_SIGNALS = [ 'SIGTERM', @@ -100,6 +106,69 @@ function rethrowAsync(error: unknown): void { }); } +async function waitForScopedOperation(params: { + readonly operationName: string; + readonly operation: Promise; + readonly scope: ResourceScope; + readonly signal: AbortSignal; + readonly timeoutMs?: number; + readonly timeoutResult?: () => T; +}): Promise { + const { operationName, operation, scope, signal, timeoutMs, timeoutResult } = + params; + invariant(operationName.length > 0, 'operationName must not be empty'); + if (signal.aborted) { + await scope.close(); + throw makeAbortError(signal); + } + + const { promise, reject, resolve } = Promise.withResolvers(); + let settled = false; + + const rejectWithError = (error: unknown): void => { + if (settled) { + return; + } + + settled = true; + void scope.close().then(() => { + reject(error instanceof Error ? error : new Error(String(error))); + }, reject); + }; + + const resolveWithValue = (value: T): void => { + if (settled) { + return; + } + + settled = true; + void scope.close().then(() => { + resolve(value); + }, reject); + }; + + if (timeoutMs !== undefined) { + invariant( + timeoutResult !== undefined, + 'timeoutResult must be provided when timeoutMs is set', + ); + const timeoutHandle = setTimeout(() => { + resolveWithValue(timeoutResult()); + }, timeoutMs); + scope.add(`${operationName} timeout`, () => { + clearTimeout(timeoutHandle); + }); + } + + addAbortListener(scope, `${operationName} abort listener`, signal, () => { + rejectWithError(makeAbortError(signal)); + }); + + void operation.then(resolveWithValue, rejectWithError); + + return await promise; +} + function resolveHostRendererName(input: string | undefined): RendererName { try { return resolveRendererName( @@ -169,7 +238,9 @@ export async function runHost(sessionId: string): Promise { let ptyHasExited = false; let lastOutputAt = Date.now(); let lastActivityAt = lastOutputAt; + const hostAbortController = new AbortController(); let idleTimeoutHandle: ReturnType | null = null; + let idleTimeoutScope: ResourceScope | null = null; let rpcListenPromise: Promise | null = null; let shutdownPromise: Promise | null = null; let markPtyExited: () => void = () => { @@ -250,25 +321,35 @@ export async function runHost(sessionId: string): Promise { const clearIdleTimeout = (): void => { // Idempotent: safe to call multiple times during shutdown and PTY exit. - if (idleTimeoutHandle === null) { + const scope = idleTimeoutScope; + idleTimeoutScope = null; + if (scope === null) { return; } - clearInterval(idleTimeoutHandle); - idleTimeoutHandle = null; + void scope.close().catch(rethrowAsync); }; const startIdlePolling = (): void => { if ( idleTimeoutMs <= 0 || !isSessionCommandable(state) || - idleTimeoutHandle !== null + idleTimeoutScope !== null ) { return; } + throwIfAborted(hostAbortController.signal); + + const scope = new ResourceScope(); + idleTimeoutScope = scope; const checkIntervalMs = Math.min(idleTimeoutMs, IDLE_CHECK_CAP_MS); idleTimeoutHandle = setInterval(() => { + if (hostAbortController.signal.aborted) { + clearIdleTimeout(); + return; + } + if (!isSessionCommandable(state)) { clearIdleTimeout(); return; @@ -280,6 +361,20 @@ export async function runHost(sessionId: string): Promise { pty.kill(); } }, checkIntervalMs); + scope.add('idle timeout interval', () => { + if (idleTimeoutHandle !== null) { + clearInterval(idleTimeoutHandle); + idleTimeoutHandle = null; + } + }); + addAbortListener( + scope, + 'idle timeout abort listener', + hostAbortController.signal, + () => { + clearIdleTimeout(); + }, + ); }; const initiateShutdown = (): Promise => { @@ -289,6 +384,7 @@ export async function runHost(sessionId: string): Promise { shutdownPromise = (async () => { try { + hostAbortController.abort(makeAbortError()); clearIdleTimeout(); if (isSessionCommandable(state)) { pty.kill(); @@ -478,9 +574,11 @@ export async function runHost(sessionId: string): Promise { await eventLog.append('input_paste', { data: encoded }); return {}; }, - run: async (params: unknown) => { + run: async (params: unknown, context) => { const { command, noWait, timeoutMs } = params as RunParams; + const { signal } = context; + throwIfAborted(signal); assertSessionCommandable(state); invariant( @@ -527,7 +625,7 @@ export async function runHost(sessionId: string): Promise { pty.write(injectedText); lastActivityAt = Date.now(); - const waitResult = await completion.wait(effectiveTimeoutMs); + const waitResult = await completion.wait(effectiveTimeoutMs, { signal }); const durationMs = Date.now() - startTime; if (waitResult.kind === 'completed') { @@ -656,11 +754,13 @@ export async function runHost(sessionId: string): Promise { await eventLog.append('signal', { signal }); return {}; }, - wait: async (params: unknown) => { + wait: async (params: unknown, context) => { const { exit, idleMs, timeoutMs } = params as WaitParams; + const { signal } = context; const hasExit = exit === true; const hasIdle = idleMs !== undefined; + throwIfAborted(signal); if (hasExit === hasIdle) { throw makeCliError(ERROR_CODES.INVALID_DURATION, { message: 'Specify exactly one of exit or idleMs.', @@ -680,8 +780,8 @@ export async function runHost(sessionId: string): Promise { ); } + const waitScope = new ResourceScope(); let waitCondition: Promise; - let clearWaitCondition: (() => void) | null = null; if (hasExit) { if (ptyHasExited) { @@ -714,13 +814,16 @@ export async function runHost(sessionId: string): Promise { waitCondition = new Promise((resolve) => { const checkInterval = setInterval( () => { + if (signal.aborted) { + return; + } + const effectiveLastOutput = Math.max(lastOutputAt, idleAnchor); const elapsed = Date.now() - effectiveLastOutput; if (elapsed < idleDuration) { return; } - clearInterval(checkInterval); const snapshot = state.snapshot(); const result: WaitOutcome = { timedOut: false }; if (snapshot.exitCode !== null) { @@ -731,30 +834,26 @@ export async function runHost(sessionId: string): Promise { Math.min(idleDuration / 2, 100), ); - clearWaitCondition = (): void => { + waitScope.add('wait idle poll interval', () => { clearInterval(checkInterval); - }; + }); }); } - if (timeoutMs === undefined) { - return await waitCondition; - } - - return await new Promise((resolve) => { - const timeoutHandle = setTimeout(() => { - clearWaitCondition?.(); - resolve({ timedOut: true }); - }, timeoutMs); - - void waitCondition.then((result) => { - clearTimeout(timeoutHandle); - clearWaitCondition?.(); - resolve(result); - }); + return await waitForScopedOperation({ + operationName: 'wait', + operation: waitCondition, + scope: waitScope, + signal, + ...(timeoutMs === undefined + ? {} + : { + timeoutMs, + timeoutResult: () => ({ timedOut: true }), + }), }); }, - waitForRender: async (params: unknown) => { + waitForRender: async (params: unknown, context) => { const { text, regex, @@ -764,7 +863,9 @@ export async function runHost(sessionId: string): Promise { timeoutMs, rendererName: requestedRendererName, } = params as WaitForRenderParams; + const { signal } = context; + throwIfAborted(signal); const preparedCondition = prepareRenderWaitCondition({ text, regex, @@ -782,30 +883,33 @@ export async function runHost(sessionId: string): Promise { const rendererName = resolveHostRendererName(requestedRendererName); const profile = resolveProfile(DEFAULT_RENDER_PROFILE_NAME); const pollIntervalMs = 200; + const waitScope = new ResourceScope(); let lastVisibleText: string | undefined; let lastTextChangeAt = Date.now(); let latestCapturedAtSeq = 0; - let clearWaitPoll: (() => void) | null = null; const pollCondition = new Promise((resolve) => { let pollInFlight = false; let consecutiveFailures = 0; const checkInterval = setInterval(() => { - if (pollInFlight) { + if (signal.aborted || pollInFlight) { return; } pollInFlight = true; void (async () => { try { + throwIfAborted(signal); const replayInput = loadReplayInput(); const backend = await rendererManager.getBackend( rendererName, profile, replayInput, ); + throwIfAborted(signal); const snapshot = await backend.snapshot(); + throwIfAborted(signal); const visibleText = snapshot.visibleLines .map((line) => line.text) .join('\n'); @@ -831,7 +935,6 @@ export async function runHost(sessionId: string): Promise { ); if (match.matched) { - clearInterval(checkInterval); resolve({ matched: true, timedOut: false, @@ -844,10 +947,13 @@ export async function runHost(sessionId: string): Promise { }); } } catch (pollError) { + if (signal.aborted) { + return; + } + void pollError; consecutiveFailures += 1; if (consecutiveFailures >= MAX_CONSECUTIVE_POLL_FAILURES) { - clearInterval(checkInterval); resolve({ matched: false, timedOut: true, @@ -862,47 +968,35 @@ export async function runHost(sessionId: string): Promise { })(); }, pollIntervalMs); - clearWaitPoll = (): void => { + waitScope.add('waitForRender poll interval', () => { clearInterval(checkInterval); - }; + }); }); - if (timeoutMs === undefined) { - return await pollCondition; - } - - return await new Promise((resolve) => { - let resolved = false; - const timeoutHandle = setTimeout(() => { - if (resolved) { - return; - } - resolved = true; - clearWaitPoll?.(); - - try { - const replayInput = loadReplayInput(); - latestCapturedAtSeq = replayInput?.targetSeq ?? 0; - } catch { - // Best-effort snapshot for timeout reporting. - } - - resolve({ - matched: false, - timedOut: true, - capturedAtSeq: latestCapturedAtSeq, - }); - }, timeoutMs); - - void pollCondition.then((result) => { - if (resolved) { - return; - } - resolved = true; - clearTimeout(timeoutHandle); - clearWaitPoll?.(); - resolve(result); - }); + return await waitForScopedOperation({ + operationName: 'waitForRender', + operation: pollCondition, + scope: waitScope, + signal, + ...(timeoutMs === undefined + ? {} + : { + timeoutMs, + timeoutResult: () => { + try { + const replayInput = loadReplayInput(); + latestCapturedAtSeq = replayInput?.targetSeq ?? 0; + } catch { + // Best-effort snapshot for timeout reporting. + } + + return { + matched: false, + timedOut: true, + capturedAtSeq: latestCapturedAtSeq, + }; + }, + }), }); }, destroy: () => { diff --git a/src/host/lifecycle.ts b/src/host/lifecycle.ts index 1b423c7..5623dff 100644 --- a/src/host/lifecycle.ts +++ b/src/host/lifecycle.ts @@ -25,12 +25,27 @@ import { sessionDir, socketPath, } from '../storage/sessionPaths.js'; +import { throwIfAborted } from '../util/abort.js'; import { invariant } from '../util/assert.js'; import { sendRpc } from './rpcClient.js'; const DESTROY_POLL_INTERVAL_MS = 100; const DESTROY_MAX_ATTEMPTS = 50; +interface PollOptions { + readonly signal?: AbortSignal; +} + +function pollOptions(signal?: AbortSignal): PollOptions { + return signal === undefined ? {} : { signal }; +} + +function delayOptions( + signal?: AbortSignal, +): { signal: AbortSignal } | undefined { + return signal === undefined ? undefined : { signal }; +} + interface NodeError extends Error { code?: string; } @@ -246,6 +261,7 @@ async function waitForTerminalManifest( manifestFile: string, maxAttempts: number = DESTROY_MAX_ATTEMPTS, intervalMs: number = DESTROY_POLL_INTERVAL_MS, + options: PollOptions = {}, ): Promise { invariant( Number.isInteger(maxAttempts) && maxAttempts > 0, @@ -256,7 +272,11 @@ async function waitForTerminalManifest( 'intervalMs must be a non-negative integer', ); + const { signal } = options; + throwIfAborted(signal); + for (let attempt = 0; attempt < maxAttempts; attempt += 1) { + throwIfAborted(signal); const manifest = await readManifest(manifestFile); if (isTerminalSessionStatus(manifest.status)) { @@ -264,7 +284,7 @@ async function waitForTerminalManifest( } if (attempt + 1 < maxAttempts) { - await delay(intervalMs); + await delay(intervalMs, undefined, delayOptions(signal)); } } @@ -277,6 +297,7 @@ async function waitForProcessAndSocketShutdown( socketFile: string, maxAttempts: number = DESTROY_MAX_ATTEMPTS, intervalMs: number = DESTROY_POLL_INTERVAL_MS, + options: PollOptions = {}, ): Promise { invariant( Number.isInteger(maxAttempts) && maxAttempts > 0, @@ -287,7 +308,11 @@ async function waitForProcessAndSocketShutdown( 'intervalMs must be a non-negative integer', ); + const { signal } = options; + throwIfAborted(signal); + for (let attempt = 0; attempt < maxAttempts; attempt += 1) { + throwIfAborted(signal); const hostAlive = isProcessAlive(hostPid); const childAlive = isProcessAlive(childPid); const socketPresent = await pathExists(socketFile); @@ -297,7 +322,7 @@ async function waitForProcessAndSocketShutdown( } if (attempt + 1 < maxAttempts) { - await delay(intervalMs); + await delay(intervalMs, undefined, delayOptions(signal)); } } @@ -458,10 +483,17 @@ export function launchHost(config: LaunchHostConfig): number { return child.pid; } +export interface DestroySessionOptions { + readonly signal?: AbortSignal; +} + export async function destroySession( sessionId: string, force?: boolean, + options: DestroySessionOptions = {}, ): Promise { + const { signal } = options; + throwIfAborted(signal); const { sessionDirectory, manifestFile, socketFile } = getSessionPaths(sessionId); const manifest = await readSessionManifestOrThrow(sessionId, manifestFile); @@ -493,6 +525,9 @@ export async function destroySession( manifest.hostPid, manifest.childPid, socketFile, + DESTROY_MAX_ATTEMPTS, + DESTROY_POLL_INTERVAL_MS, + pollOptions(signal), ); await reconcileSession(sessionDirectory); @@ -514,7 +549,8 @@ export async function destroySession( } try { - await sendRpc(socketFile, 'destroy'); + throwIfAborted(signal); + await sendRpc(socketFile, 'destroy', undefined, undefined, signal); } catch (error) { if ( !(error instanceof CliError) || @@ -535,7 +571,12 @@ export async function destroySession( throw error; } - const terminalManifest = await waitForTerminalManifest(manifestFile); + const terminalManifest = await waitForTerminalManifest( + manifestFile, + DESTROY_MAX_ATTEMPTS, + DESTROY_POLL_INTERVAL_MS, + pollOptions(signal), + ); if (terminalManifest !== null) { return; } diff --git a/src/host/rpcClient.ts b/src/host/rpcClient.ts index e1bab2f..8398333 100644 --- a/src/host/rpcClient.ts +++ b/src/host/rpcClient.ts @@ -13,7 +13,13 @@ import { RpcResponseSchema, type RpcMethod, } from '../protocol/messages.js'; +import { + addAbortListener, + makeAbortError, + throwIfAborted, +} from '../util/abort.js'; import { invariant } from '../util/assert.js'; +import { ResourceScope } from '../util/resourceScope.js'; const DEFAULT_TIMEOUT_MS = 5_000; const MAX_RPC_BUFFER_BYTES = 1_048_576; @@ -95,7 +101,9 @@ export async function sendRpc( method: string, params?: Record, timeoutMs?: number, + signal?: AbortSignal, ): Promise { + throwIfAborted(signal); const effectiveTimeoutMs = timeoutMs ?? DEFAULT_TIMEOUT_MS; invariant( Number.isFinite(effectiveTimeoutMs) && effectiveTimeoutMs >= 0, @@ -116,6 +124,10 @@ export async function sendRpc( return await new Promise((resolve, reject) => { const socket = net.connect({ path: socketPath }); + const scope = new ResourceScope(); + scope.add('rpc client socket', () => { + socket.destroy(); + }); let settled = false; let responseHandled = false; let buffer = ''; @@ -126,8 +138,9 @@ export async function sendRpc( } settled = true; - socket.destroy(); - reject(error); + void scope.close().then(() => { + reject(error); + }, reject); }; const rejectWithTransportError = (error: unknown): void => { @@ -142,10 +155,24 @@ export async function sendRpc( } settled = true; - socket.destroy(); - resolve(result); + void scope.close().then(() => { + resolve(result); + }, reject); }; + if (signal !== undefined) { + addAbortListener(scope, 'rpc client abort listener', signal, () => { + if (settled) { + return; + } + + settled = true; + void scope.close().then(() => { + reject(makeAbortError(signal)); + }, reject); + }); + } + socket.setEncoding('utf8'); socket.setTimeout(effectiveTimeoutMs); diff --git a/src/host/rpcServer.ts b/src/host/rpcServer.ts index 873a523..c5bb61d 100644 --- a/src/host/rpcServer.ts +++ b/src/host/rpcServer.ts @@ -10,14 +10,24 @@ import { type RpcMethod, type RpcResponse, } from '../protocol/messages.js'; +import { makeAbortError } from '../util/abort.js'; import { invariant } from '../util/assert.js'; +import { ResourceScope } from '../util/resourceScope.js'; const MAX_UNIX_SOCKET_PATH = 104; const MAX_RPC_BUFFER_BYTES = 1_048_576; +const SOCKET_LIVENESS_PROBE_TIMEOUT_MS = 1_000; const UNKNOWN_REQUEST_ID = 'unknown'; -export type MethodHandler = (params: unknown) => Promise; +export interface MethodContext { + readonly signal: AbortSignal; +} + +export type MethodHandler = ( + params: unknown, + context: MethodContext, +) => Promise; function isKnownRpcMethod(method: string): method is RpcMethod { return Object.hasOwn(RpcMethodSchemas, method); @@ -46,22 +56,53 @@ async function socketPathExists(socketPath: string): Promise { async function probeSocketLiveness(socketPath: string): Promise { return await new Promise((resolve, reject) => { + const scope = new ResourceScope(); const probe = net.connect({ path: socketPath }); + let settled = false; + + const resolveWithValue = (value: boolean): void => { + if (settled) { + return; + } + + settled = true; + void scope.close().then(() => { + resolve(value); + }, reject); + }; + + const rejectWithError = (error: unknown): void => { + if (settled) { + return; + } + + settled = true; + void scope.close().then(() => { + reject(error instanceof Error ? error : new Error(String(error))); + }, reject); + }; + + scope.add('rpc liveness probe socket', () => { + probe.destroy(); + }); + const timeoutHandle = setTimeout(() => { + rejectWithError(new Error(`Timed out probing RPC socket: ${socketPath}`)); + }, SOCKET_LIVENESS_PROBE_TIMEOUT_MS); + scope.add('rpc liveness probe timeout', () => { + clearTimeout(timeoutHandle); + }); probe.once('connect', () => { - probe.end(); - resolve(true); + resolveWithValue(true); }); probe.once('error', (error: NodeJS.ErrnoException) => { - probe.destroy(); - if (error.code === 'ECONNREFUSED' || error.code === 'ENOENT') { - resolve(false); + resolveWithValue(false); return; } - reject(error); + rejectWithError(error); }); }); } @@ -368,8 +409,24 @@ export class RpcServer { return; } + const requestScope = new ResourceScope(); + const requestAbortController = new AbortController(); + const abortRequest = (): void => { + requestAbortController.abort(makeAbortError()); + }; + socket.once('close', abortRequest); + requestScope.add('rpc request close listener', () => { + socket.off('close', abortRequest); + }); + try { - const result = await handler(paramsResult.data); + const result = await handler(paramsResult.data, { + signal: requestAbortController.signal, + }); + if (requestAbortController.signal.aborted) { + return; + } + const resultResult = RpcMethodSchemas[request.method].result.safeParse(result); @@ -386,6 +443,10 @@ export class RpcServer { buildSuccessResponse(request.id, resultResult.data), ); } catch (error) { + if (requestAbortController.signal.aborted) { + return; + } + this.sendResponse( socket, error instanceof CliError @@ -398,6 +459,8 @@ export class RpcServer { ), ), ); + } finally { + await requestScope.close(); } } diff --git a/src/host/runCompletionCoordinator.ts b/src/host/runCompletionCoordinator.ts index fb2f30e..32819cb 100644 --- a/src/host/runCompletionCoordinator.ts +++ b/src/host/runCompletionCoordinator.ts @@ -8,7 +8,9 @@ import { RunCompletionSentinelScanner, type SentinelPiece, } from './runCompletionSentinel.js'; +import { addAbortListener, makeAbortError } from '../util/abort.js'; import { invariant } from '../util/assert.js'; +import { ResourceScope } from '../util/resourceScope.js'; const RUN_COMPLETION_POSTAMBLE_ECHO_PREFIX = String.raw`printf '\033\137`; const RUN_COMPLETION_SIGNAL_TOKEN_BYTES = 4; @@ -49,10 +51,17 @@ export interface PreparedWaitedRun { } /** Registered completion state returned after `input_run` appends successfully. */ +export interface RunCompletionWaitOptions { + readonly signal?: AbortSignal; +} + export interface RegisteredWaitedRunCompletion { postamble: string; sentinel: string; - wait(timeoutMs: number): Promise; + wait( + timeoutMs: number, + options?: RunCompletionWaitOptions, + ): Promise; } function shellOctalEscapedBytes(value: string): string { @@ -190,13 +199,21 @@ export class RunCompletionCoordinator { return { postamble, sentinel, - wait: (timeoutMs: number): Promise => { + wait: ( + timeoutMs: number, + options: RunCompletionWaitOptions = {}, + ): Promise => { invariant( !waitStarted, 'run completion wait must only be started once', ); waitStarted = true; - return this.#waitForRunCompletion(marker, completionPromise, timeoutMs); + return this.#waitForRunCompletion( + marker, + completionPromise, + timeoutMs, + options, + ); }, }; } @@ -255,6 +272,7 @@ export class RunCompletionCoordinator { marker: string, completionPromise: Promise, timeoutMs: number, + options: RunCompletionWaitOptions, ): Promise { assertRunMarker(marker); invariant( @@ -262,41 +280,60 @@ export class RunCompletionCoordinator { 'timeoutMs must be a positive integer', ); + const { signal } = options; + if (signal?.aborted === true) { + this.#runCompletionWaiters.delete(marker); + throw makeAbortError(signal); + } + + const scope = new ResourceScope(); const { promise, reject, resolve } = Promise.withResolvers(); let resolved = false; - const timeoutHandle = setTimeout(() => { + + const rejectWithError = (error: unknown): void => { if (resolved) { return; } resolved = true; + void scope.close().then(() => { + reject(error instanceof Error ? error : new Error(String(error))); + }, reject); + }; + + const resolveWithResult = (result: TimedRunCompletionWaitResult): void => { + if (resolved) { + return; + } + + resolved = true; + void scope.close().then(() => { + resolve(result); + }, reject); + }; + + const timeoutHandle = setTimeout(() => { // Keep sentinel/postamble registrations active after timeout so the // eventual internal completion bytes are still hidden from artifacts. this.#runCompletionWaiters.delete(marker); - resolve({ kind: 'timeout' }); + resolveWithResult({ kind: 'timeout' }); }, timeoutMs); + scope.add('run completion timeout', () => { + clearTimeout(timeoutHandle); + }); + + if (signal !== undefined) { + addAbortListener(scope, 'run completion abort listener', signal, () => { + // Match timeout behavior: stop waiting for a client response but keep + // sentinel/postamble registrations active so eventual completion bytes + // remain hidden and replayable. + this.#runCompletionWaiters.delete(marker); + rejectWithError(makeAbortError(signal)); + }); + } - void completionPromise.then( - (result) => { - if (resolved) { - return; - } - - resolved = true; - clearTimeout(timeoutHandle); - resolve(result); - }, - (error: unknown) => { - if (resolved) { - return; - } - - resolved = true; - clearTimeout(timeoutHandle); - reject(error instanceof Error ? error : new Error(String(error))); - }, - ); + void completionPromise.then(resolveWithResult, rejectWithError); return await promise; } diff --git a/src/util/abort.ts b/src/util/abort.ts new file mode 100644 index 0000000..606c9b5 --- /dev/null +++ b/src/util/abort.ts @@ -0,0 +1,44 @@ +import { invariant } from './assert.js'; +import type { ResourceScope } from './resourceScope.js'; + +export function makeAbortError(signal?: AbortSignal): Error { + const reason: unknown = signal?.reason; + if (reason instanceof Error) { + return reason; + } + + const error = new Error( + typeof reason === 'string' && reason.length > 0 + ? reason + : 'Operation aborted.', + ); + error.name = 'AbortError'; + return error; +} + +export function throwIfAborted(signal?: AbortSignal): void { + if (signal?.aborted === true) { + throw makeAbortError(signal); + } +} + +export function addAbortListener( + scope: ResourceScope, + name: string, + signal: AbortSignal, + listener: () => void, +): void { + invariant( + !signal.aborted, + 'abort listener must be registered before signal aborts', + ); + invariant( + typeof listener === 'function', + 'abort listener must be a function', + ); + + signal.addEventListener('abort', listener, { once: true }); + scope.add(name, () => { + signal.removeEventListener('abort', listener); + }); +} diff --git a/test/unit/host/runCompletionCoordinator.test.ts b/test/unit/host/runCompletionCoordinator.test.ts index 2b029c8..e184548 100644 --- a/test/unit/host/runCompletionCoordinator.test.ts +++ b/test/unit/host/runCompletionCoordinator.test.ts @@ -174,6 +174,43 @@ describe('RunCompletionCoordinator', () => { } }); + it('aborts an active wait and clears its timeout while preserving hidden completion bytes', async () => { + vi.useFakeTimers(); + try { + const { appender, events } = createFakeAppender(); + const coordinator = new RunCompletionCoordinator(appender); + const prepared = coordinator.prepareWaitedRun(); + const completion = coordinator.registerWaitedRun({ + marker: prepared.marker, + inputRunSeq: 13, + }); + const controller = new AbortController(); + const abortReason = new Error('caller disconnected'); + const waitPromise = completion.wait(1_000, { signal: controller.signal }); + + controller.abort(abortReason); + + await expect(waitPromise).rejects.toThrow('caller disconnected'); + expect(vi.getTimerCount()).toBe(0); + + await vi.advanceTimersByTimeAsync(1_000); + await coordinator.ingestPtyData(`before${completion.sentinel}after`); + + expect(events).toEqual([ + { type: 'output', data: 'before' }, + { + type: 'run_complete', + marker: prepared.marker, + inputRunSeq: 13, + seq: 100, + }, + { type: 'output', data: 'after' }, + ]); + } finally { + vi.useRealTimers(); + } + }); + it('fails ingestion loudly when a timed-out completion later cannot append run_complete', async () => { vi.useFakeTimers(); try { From b7c01435a6118c256d44ea66542c6b424b79206b Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Tue, 12 May 2026 08:50:21 +0000 Subject: [PATCH 2/3] test: cover abort signal cleanup paths --- src/host/hostMain.ts | 64 +----------- src/host/rpcClient.ts | 32 ++---- src/host/rpcServer.ts | 36 ++----- src/host/runCompletionCoordinator.ts | 69 +++---------- src/util/abort.ts | 122 +++++++++++++++++++++++ test/unit/host/rpcClient.test.ts | 86 ++++++++++++++++ test/unit/host/rpcServer.test.ts | 60 +++++++++++ test/unit/util/abort.test.ts | 144 +++++++++++++++++++++++++++ 8 files changed, 443 insertions(+), 170 deletions(-) create mode 100644 test/unit/host/rpcClient.test.ts create mode 100644 test/unit/host/rpcServer.test.ts create mode 100644 test/unit/util/abort.test.ts diff --git a/src/host/hostMain.ts b/src/host/hostMain.ts index 002795f..b51271c 100644 --- a/src/host/hostMain.ts +++ b/src/host/hostMain.ts @@ -53,6 +53,7 @@ import { addAbortListener, makeAbortError, throwIfAborted, + waitForScopedOperation, } from '../util/abort.js'; import { invariant } from '../util/assert.js'; import { ResourceScope } from '../util/resourceScope.js'; @@ -106,69 +107,6 @@ function rethrowAsync(error: unknown): void { }); } -async function waitForScopedOperation(params: { - readonly operationName: string; - readonly operation: Promise; - readonly scope: ResourceScope; - readonly signal: AbortSignal; - readonly timeoutMs?: number; - readonly timeoutResult?: () => T; -}): Promise { - const { operationName, operation, scope, signal, timeoutMs, timeoutResult } = - params; - invariant(operationName.length > 0, 'operationName must not be empty'); - if (signal.aborted) { - await scope.close(); - throw makeAbortError(signal); - } - - const { promise, reject, resolve } = Promise.withResolvers(); - let settled = false; - - const rejectWithError = (error: unknown): void => { - if (settled) { - return; - } - - settled = true; - void scope.close().then(() => { - reject(error instanceof Error ? error : new Error(String(error))); - }, reject); - }; - - const resolveWithValue = (value: T): void => { - if (settled) { - return; - } - - settled = true; - void scope.close().then(() => { - resolve(value); - }, reject); - }; - - if (timeoutMs !== undefined) { - invariant( - timeoutResult !== undefined, - 'timeoutResult must be provided when timeoutMs is set', - ); - const timeoutHandle = setTimeout(() => { - resolveWithValue(timeoutResult()); - }, timeoutMs); - scope.add(`${operationName} timeout`, () => { - clearTimeout(timeoutHandle); - }); - } - - addAbortListener(scope, `${operationName} abort listener`, signal, () => { - rejectWithError(makeAbortError(signal)); - }); - - void operation.then(resolveWithValue, rejectWithError); - - return await promise; -} - function resolveHostRendererName(input: string | undefined): RendererName { try { return resolveRendererName( diff --git a/src/host/rpcClient.ts b/src/host/rpcClient.ts index 8398333..7273cf8 100644 --- a/src/host/rpcClient.ts +++ b/src/host/rpcClient.ts @@ -15,6 +15,7 @@ import { } from '../protocol/messages.js'; import { addAbortListener, + createResourceScopedSettlers, makeAbortError, throwIfAborted, } from '../util/abort.js'; @@ -128,19 +129,12 @@ export async function sendRpc( scope.add('rpc client socket', () => { socket.destroy(); }); - let settled = false; + const settlers = createResourceScopedSettlers(scope, resolve, reject); let responseHandled = false; let buffer = ''; const rejectWithCliError = (error: CliError): void => { - if (settled) { - return; - } - - settled = true; - void scope.close().then(() => { - reject(error); - }, reject); + settlers.reject(error); }; const rejectWithTransportError = (error: unknown): void => { @@ -150,26 +144,12 @@ export async function sendRpc( }; const resolveWithResult = (result: unknown): void => { - if (settled) { - return; - } - - settled = true; - void scope.close().then(() => { - resolve(result); - }, reject); + settlers.resolve(result); }; if (signal !== undefined) { addAbortListener(scope, 'rpc client abort listener', signal, () => { - if (settled) { - return; - } - - settled = true; - void scope.close().then(() => { - reject(makeAbortError(signal)); - }, reject); + settlers.reject(makeAbortError(signal)); }); } @@ -306,7 +286,7 @@ export async function sendRpc( }); socket.on('end', () => { - if (settled || responseHandled) { + if (settlers.isSettled() || responseHandled) { return; } diff --git a/src/host/rpcServer.ts b/src/host/rpcServer.ts index c5bb61d..55549be 100644 --- a/src/host/rpcServer.ts +++ b/src/host/rpcServer.ts @@ -10,7 +10,7 @@ import { type RpcMethod, type RpcResponse, } from '../protocol/messages.js'; -import { makeAbortError } from '../util/abort.js'; +import { createResourceScopedSettlers, makeAbortError } from '../util/abort.js'; import { invariant } from '../util/assert.js'; import { ResourceScope } from '../util/resourceScope.js'; @@ -58,51 +58,31 @@ async function probeSocketLiveness(socketPath: string): Promise { return await new Promise((resolve, reject) => { const scope = new ResourceScope(); const probe = net.connect({ path: socketPath }); - let settled = false; - - const resolveWithValue = (value: boolean): void => { - if (settled) { - return; - } - - settled = true; - void scope.close().then(() => { - resolve(value); - }, reject); - }; - - const rejectWithError = (error: unknown): void => { - if (settled) { - return; - } - - settled = true; - void scope.close().then(() => { - reject(error instanceof Error ? error : new Error(String(error))); - }, reject); - }; + const settlers = createResourceScopedSettlers(scope, resolve, reject); scope.add('rpc liveness probe socket', () => { probe.destroy(); }); const timeoutHandle = setTimeout(() => { - rejectWithError(new Error(`Timed out probing RPC socket: ${socketPath}`)); + // If connect neither succeeds nor fails promptly, treat the socket path as + // stale rather than blocking host startup indefinitely. + settlers.resolve(false); }, SOCKET_LIVENESS_PROBE_TIMEOUT_MS); scope.add('rpc liveness probe timeout', () => { clearTimeout(timeoutHandle); }); probe.once('connect', () => { - resolveWithValue(true); + settlers.resolve(true); }); probe.once('error', (error: NodeJS.ErrnoException) => { if (error.code === 'ECONNREFUSED' || error.code === 'ENOENT') { - resolveWithValue(false); + settlers.resolve(false); return; } - rejectWithError(error); + settlers.reject(error); }); }); } diff --git a/src/host/runCompletionCoordinator.ts b/src/host/runCompletionCoordinator.ts index 32819cb..12c7b65 100644 --- a/src/host/runCompletionCoordinator.ts +++ b/src/host/runCompletionCoordinator.ts @@ -8,7 +8,7 @@ import { RunCompletionSentinelScanner, type SentinelPiece, } from './runCompletionSentinel.js'; -import { addAbortListener, makeAbortError } from '../util/abort.js'; +import { waitForScopedOperation } from '../util/abort.js'; import { invariant } from '../util/assert.js'; import { ResourceScope } from '../util/resourceScope.js'; @@ -280,62 +280,25 @@ export class RunCompletionCoordinator { 'timeoutMs must be a positive integer', ); - const { signal } = options; - if (signal?.aborted === true) { + const forgetWaiter = (): void => { + // Match timeout behavior: stop waiting for a client response but keep + // sentinel/postamble registrations active so eventual completion bytes + // remain hidden and replayable. this.#runCompletionWaiters.delete(marker); - throw makeAbortError(signal); - } - - const scope = new ResourceScope(); - const { promise, reject, resolve } = - Promise.withResolvers(); - let resolved = false; - - const rejectWithError = (error: unknown): void => { - if (resolved) { - return; - } - - resolved = true; - void scope.close().then(() => { - reject(error instanceof Error ? error : new Error(String(error))); - }, reject); - }; - - const resolveWithResult = (result: TimedRunCompletionWaitResult): void => { - if (resolved) { - return; - } - - resolved = true; - void scope.close().then(() => { - resolve(result); - }, reject); }; - const timeoutHandle = setTimeout(() => { - // Keep sentinel/postamble registrations active after timeout so the - // eventual internal completion bytes are still hidden from artifacts. - this.#runCompletionWaiters.delete(marker); - resolveWithResult({ kind: 'timeout' }); - }, timeoutMs); - scope.add('run completion timeout', () => { - clearTimeout(timeoutHandle); + return await waitForScopedOperation({ + operationName: 'run completion', + operation: completionPromise, + scope: new ResourceScope(), + ...(options.signal === undefined ? {} : { signal: options.signal }), + timeoutMs, + timeoutResult: () => { + forgetWaiter(); + return { kind: 'timeout' }; + }, + onAbort: forgetWaiter, }); - - if (signal !== undefined) { - addAbortListener(scope, 'run completion abort listener', signal, () => { - // Match timeout behavior: stop waiting for a client response but keep - // sentinel/postamble registrations active so eventual completion bytes - // remain hidden and replayable. - this.#runCompletionWaiters.delete(marker); - rejectWithError(makeAbortError(signal)); - }); - } - - void completionPromise.then(resolveWithResult, rejectWithError); - - return await promise; } async #appendOutput(data: string): Promise { diff --git a/src/util/abort.ts b/src/util/abort.ts index 606c9b5..98e1c11 100644 --- a/src/util/abort.ts +++ b/src/util/abort.ts @@ -1,6 +1,36 @@ import { invariant } from './assert.js'; import type { ResourceScope } from './resourceScope.js'; +export interface ResourceScopedSettlers { + readonly isSettled: () => boolean; + readonly reject: (error: unknown) => void; + readonly resolve: (value: T) => void; +} + +export interface ScopedOperationOptions { + readonly operationName: string; + readonly operation: Promise; + readonly scope: ResourceScope; + readonly signal?: AbortSignal; + readonly timeoutMs?: number; + readonly timeoutResult?: () => T; + readonly onAbort?: () => void; +} + +function toError(error: unknown): Error { + return error instanceof Error ? error : new Error(String(error)); +} + +function makeScopeCloseRejectionError( + originalError: unknown, + closeError: unknown, +): AggregateError { + return new AggregateError( + [toError(originalError), toError(closeError)], + 'ResourceScope close failed while rejecting operation.', + ); +} + export function makeAbortError(signal?: AbortSignal): Error { const reason: unknown = signal?.reason; if (reason instanceof Error) { @@ -42,3 +72,95 @@ export function addAbortListener( signal.removeEventListener('abort', listener); }); } + +export function createResourceScopedSettlers( + scope: ResourceScope, + resolve: (value: T) => void, + reject: (error: Error) => void, +): ResourceScopedSettlers { + invariant(typeof resolve === 'function', 'resolve must be a function'); + invariant(typeof reject === 'function', 'reject must be a function'); + + let settled = false; + + return { + isSettled: () => settled, + reject: (error: unknown): void => { + if (settled) { + return; + } + + settled = true; + void scope.close().then( + () => { + reject(toError(error)); + }, + (closeError: unknown) => { + reject(makeScopeCloseRejectionError(error, closeError)); + }, + ); + }, + resolve: (value: T): void => { + if (settled) { + return; + } + + settled = true; + void scope.close().then( + () => { + resolve(value); + }, + (closeError: unknown) => { + reject(toError(closeError)); + }, + ); + }, + }; +} + +export async function waitForScopedOperation( + options: ScopedOperationOptions, +): Promise { + const { + operationName, + operation, + scope, + signal, + timeoutMs, + timeoutResult, + onAbort, + } = options; + invariant(operationName.length > 0, 'operationName must not be empty'); + if (signal?.aborted === true) { + onAbort?.(); + await scope.close(); + throw makeAbortError(signal); + } + + const { promise, reject, resolve } = Promise.withResolvers(); + const settlers = createResourceScopedSettlers(scope, resolve, reject); + + if (timeoutMs !== undefined) { + invariant( + timeoutResult !== undefined, + 'timeoutResult must be provided when timeoutMs is set', + ); + const timeoutHandle = setTimeout(() => { + settlers.resolve(timeoutResult()); + }, timeoutMs); + scope.add(`${operationName} timeout`, () => { + clearTimeout(timeoutHandle); + }); + } + + if (signal !== undefined) { + addAbortListener(scope, `${operationName} abort listener`, signal, () => { + onAbort?.(); + settlers.reject(makeAbortError(signal)); + }); + } + + void operation.then(settlers.resolve, settlers.reject); + + return await promise; +} diff --git a/test/unit/host/rpcClient.test.ts b/test/unit/host/rpcClient.test.ts new file mode 100644 index 0000000..58389b6 --- /dev/null +++ b/test/unit/host/rpcClient.test.ts @@ -0,0 +1,86 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import net from 'node:net'; + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { sendRpc } from '../../../src/host/rpcClient.js'; + +let tempDir = ''; + +describe('sendRpc abort handling', () => { + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'agent-tty-rpc-client-')); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + tempDir = ''; + }); + + it('rejects before opening a socket when the signal is already aborted', async () => { + const controller = new AbortController(); + const reason = new Error('already cancelled'); + controller.abort(reason); + + await expect( + sendRpc( + join(tempDir, 'missing.sock'), + 'inspect', + {}, + 5_000, + controller.signal, + ), + ).rejects.toThrow(reason); + }); + + it('destroys an in-flight socket when the signal aborts', async () => { + const socketFile = join(tempDir, 'rpc.sock'); + const connected = Promise.withResolvers(); + const serverSocketClosed = Promise.withResolvers(); + const server = net.createServer((socket) => { + connected.resolve(socket); + socket.once('close', () => { + serverSocketClosed.resolve(undefined); + }); + socket.resume(); + }); + await new Promise((resolve, reject) => { + server.once('error', reject); + server.listen(socketFile, () => { + server.off('error', reject); + resolve(); + }); + }); + + try { + const controller = new AbortController(); + const reason = new Error('client cancelled'); + const request = sendRpc( + socketFile, + 'inspect', + {}, + 5_000, + controller.signal, + ); + const serverSocket = await connected.promise; + + controller.abort(reason); + + await expect(request).rejects.toThrow(reason); + await serverSocketClosed.promise; + expect(serverSocket.destroyed).toBe(true); + } finally { + await new Promise((resolve, reject) => { + server.close((error) => { + if (error !== undefined) { + reject(error); + return; + } + resolve(); + }); + }); + } + }); +}); diff --git a/test/unit/host/rpcServer.test.ts b/test/unit/host/rpcServer.test.ts new file mode 100644 index 0000000..37e653e --- /dev/null +++ b/test/unit/host/rpcServer.test.ts @@ -0,0 +1,60 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import net from 'node:net'; + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { RpcServer } from '../../../src/host/rpcServer.js'; + +let tempDir = ''; + +describe('RpcServer request abort handling', () => { + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'agent-tty-rpc-server-')); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + tempDir = ''; + }); + + it('aborts the request context when the client socket closes', async () => { + const socketFile = join(tempDir, 'rpc.sock'); + const handlerStarted = Promise.withResolvers(); + const requestAborted = Promise.withResolvers(); + const server = new RpcServer(socketFile, { + inspect: async (_params, context) => { + handlerStarted.resolve(undefined); + context.signal.addEventListener( + 'abort', + () => { + requestAborted.resolve(context.signal); + }, + { once: true }, + ); + await requestAborted.promise; + return {}; + }, + }); + await server.listen(); + + const client = net.connect({ path: socketFile }); + await new Promise((resolve, reject) => { + client.once('connect', resolve); + client.once('error', reject); + }); + client.write( + `${JSON.stringify({ id: 'request-1', method: 'inspect', params: {} })}\n`, + ); + await handlerStarted.promise; + client.destroy(); + + try { + const signal = await requestAborted.promise; + expect(signal.aborted).toBe(true); + } finally { + await server.close(); + } + }); +}); diff --git a/test/unit/util/abort.test.ts b/test/unit/util/abort.test.ts new file mode 100644 index 0000000..d85f9ff --- /dev/null +++ b/test/unit/util/abort.test.ts @@ -0,0 +1,144 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { + addAbortListener, + createResourceScopedSettlers, + makeAbortError, + throwIfAborted, + waitForScopedOperation, +} from '../../../src/util/abort.js'; +import { + ResourceScope, + ResourceScopeCloseError, +} from '../../../src/util/resourceScope.js'; + +describe('abort utilities', () => { + it('creates AbortError instances from missing or string abort reasons', () => { + const defaultError = makeAbortError(); + expect(defaultError.name).toBe('AbortError'); + expect(defaultError.message).toBe('Operation aborted.'); + + const controller = new AbortController(); + controller.abort('client disconnected'); + + const reasonError = makeAbortError(controller.signal); + expect(reasonError.name).toBe('AbortError'); + expect(reasonError.message).toBe('client disconnected'); + }); + + it('forwards Error abort reasons without wrapping them', () => { + const controller = new AbortController(); + const reason = new Error('stop now'); + controller.abort(reason); + + expect(makeAbortError(controller.signal)).toBe(reason); + expect(() => throwIfAborted(controller.signal)).toThrow(reason); + }); + + it('registers abort listeners with ResourceScope cleanup', async () => { + const scope = new ResourceScope(); + const controller = new AbortController(); + const listener = vi.fn(); + + addAbortListener(scope, 'test abort listener', controller.signal, listener); + await scope.close(); + controller.abort(); + + expect(listener).not.toHaveBeenCalled(); + }); + + it('asserts when registering a listener on an already aborted signal', () => { + const controller = new AbortController(); + controller.abort(); + + expect(() => + addAbortListener( + new ResourceScope(), + 'late listener', + controller.signal, + () => undefined, + ), + ).toThrow(/before signal aborts/u); + }); + + it('settles only once and closes the ResourceScope before resolving', async () => { + const scope = new ResourceScope(); + const releases: string[] = []; + scope.add('release', () => { + releases.push('closed'); + }); + const { promise, reject, resolve } = Promise.withResolvers(); + const settlers = createResourceScopedSettlers(scope, resolve, reject); + + settlers.resolve('ok'); + settlers.reject(new Error('late')); + + await expect(promise).resolves.toBe('ok'); + expect(releases).toEqual(['closed']); + }); + + it('preserves the original rejection when scope close also fails', async () => { + const scope = new ResourceScope(); + const closeFailure = new Error('close failed'); + const originalFailure = new Error('operation failed'); + scope.add('failing release', () => { + throw closeFailure; + }); + const { promise, reject, resolve } = Promise.withResolvers(); + const settlers = createResourceScopedSettlers(scope, resolve, reject); + + settlers.reject(originalFailure); + + await expect(promise).rejects.toMatchObject({ + errors: [originalFailure, expect.any(ResourceScopeCloseError)], + }); + }); + + it('waitForScopedOperation resolves timeout results and clears the timer', async () => { + vi.useFakeTimers(); + try { + const never = new Promise(() => undefined); + const promise = waitForScopedOperation({ + operationName: 'test operation', + operation: never, + scope: new ResourceScope(), + timeoutMs: 10, + timeoutResult: () => 'timed out', + }); + + await vi.advanceTimersByTimeAsync(10); + + await expect(promise).resolves.toBe('timed out'); + expect(vi.getTimerCount()).toBe(0); + } finally { + vi.useRealTimers(); + } + }); + + it('waitForScopedOperation aborts, runs onAbort, and clears the timeout', async () => { + vi.useFakeTimers(); + try { + const controller = new AbortController(); + const onAbort = vi.fn(); + const reason = new Error('request closed'); + const never = new Promise(() => undefined); + const promise = waitForScopedOperation({ + operationName: 'test operation', + operation: never, + scope: new ResourceScope(), + signal: controller.signal, + timeoutMs: 100, + timeoutResult: () => 'timed out', + onAbort, + }); + + controller.abort(reason); + + await expect(promise).rejects.toThrow(reason); + expect(onAbort).toHaveBeenCalledTimes(1); + expect(vi.getTimerCount()).toBe(0); + } finally { + vi.useRealTimers(); + } + }); +}); From 9165f30f8fbc5ae2bdac054bb001bc6965ce1dcc Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Tue, 12 May 2026 10:04:59 +0000 Subject: [PATCH 3/3] fix: guard scope cleanup and abort reasons --- mise.lock | 3 +- src/host/hostMain.ts | 46 ++++++---- src/host/lifecycle.ts | 20 ++++- src/host/rpcClient.ts | 30 +++---- src/host/rpcServer.ts | 21 ++++- src/host/runCompletionCoordinator.ts | 4 +- src/util/abort.ts | 69 ++++++++++++--- test/unit/util/abort.test.ts | 128 +++++++++++++++++++++++++++ 8 files changed, 263 insertions(+), 58 deletions(-) diff --git a/mise.lock b/mise.lock index 87e7c40..dd1b348 100644 --- a/mise.lock +++ b/mise.lock @@ -1,4 +1,4 @@ -# @generated - this file is auto-generated by `mise lock` https://mise.jdx.dev/dev-tools/mise-lock.html +# @generated - this file is auto-generated by `mise lock` https://mise.en.dev/dev-tools/mise-lock.html [[tools.actionlint]] version = "1.7.12" @@ -97,7 +97,6 @@ url_api = "https://api.github.com/repos/jdx/communique/releases/assets/413753918 checksum = "sha256:61b46cf231882e0a10e0489df310ec9b875e83da0f38b60d6ac7bd67e09a15dc" url = "https://github.com/jdx/communique/releases/download/v1.1.3/communique-x86_64-unknown-linux-gnu.tar.gz" url_api = "https://api.github.com/repos/jdx/communique/releases/assets/413753688" -provenance = "github-attestations" [tools.communique."platforms.linux-x64-musl"] checksum = "sha256:9c8d784d8bb91e83cd467ed0d26be6ed3290a61a482413d42b12321aee20d5c0" diff --git a/src/host/hostMain.ts b/src/host/hostMain.ts index b51271c..c4c9385 100644 --- a/src/host/hostMain.ts +++ b/src/host/hostMain.ts @@ -51,7 +51,7 @@ import { } from '../storage/sessionPaths.js'; import { addAbortListener, - makeAbortError, + makeAbortReason, throwIfAborted, waitForScopedOperation, } from '../util/abort.js'; @@ -177,7 +177,6 @@ export async function runHost(sessionId: string): Promise { let lastOutputAt = Date.now(); let lastActivityAt = lastOutputAt; const hostAbortController = new AbortController(); - let idleTimeoutHandle: ReturnType | null = null; let idleTimeoutScope: ResourceScope | null = null; let rpcListenPromise: Promise | null = null; let shutdownPromise: Promise | null = null; @@ -193,6 +192,10 @@ export async function runHost(sessionId: string): Promise { }); let ptyIngestionQueue: Promise = Promise.resolve(); + // Per-client wait-exit callbacks, cleaned up individually via ResourceScope. + // Using ptyExitPromise.then() would permanently attach to the shared promise. + const ptyExitWaiters = new Set<() => void>(); + const ptyExitPromise = new Promise((resolve) => { markPtyExited = (): void => { if (ptyHasExited) { @@ -201,6 +204,11 @@ export async function runHost(sessionId: string): Promise { ptyHasExited = true; resolve(); + const waiters = [...ptyExitWaiters]; + ptyExitWaiters.clear(); + for (const waiter of waiters) { + waiter(); + } }; }); @@ -282,6 +290,7 @@ export async function runHost(sessionId: string): Promise { const scope = new ResourceScope(); idleTimeoutScope = scope; const checkIntervalMs = Math.min(idleTimeoutMs, IDLE_CHECK_CAP_MS); + let idleTimeoutHandle: ReturnType | null = null; idleTimeoutHandle = setInterval(() => { if (hostAbortController.signal.aborted) { clearIdleTimeout(); @@ -322,7 +331,7 @@ export async function runHost(sessionId: string): Promise { shutdownPromise = (async () => { try { - hostAbortController.abort(makeAbortError()); + hostAbortController.abort(makeAbortReason('Host is shutting down.')); clearIdleTimeout(); if (isSessionCommandable(state)) { pty.kill(); @@ -391,6 +400,15 @@ export async function runHost(sessionId: string): Promise { })().catch(rethrowAsync); }; + const makeWaitExitOutcome = (): WaitOutcome => { + const snapshot = state.snapshot(); + const result: WaitOutcome = { timedOut: false }; + if (snapshot.exitCode !== null) { + result.exitCode = snapshot.exitCode; + } + return result; + }; + const handlers: Record = { inspect: () => Promise.resolve({ session: state.snapshot() }), snapshot: async (params: unknown) => { @@ -723,21 +741,17 @@ export async function runHost(sessionId: string): Promise { if (hasExit) { if (ptyHasExited) { - const snapshot = state.snapshot(); - const result: WaitOutcome = { timedOut: false }; - if (snapshot.exitCode !== null) { - result.exitCode = snapshot.exitCode; - } - return result; + return makeWaitExitOutcome(); } - waitCondition = ptyExitPromise.then(() => { - const snapshot = state.snapshot(); - const result: WaitOutcome = { timedOut: false }; - if (snapshot.exitCode !== null) { - result.exitCode = snapshot.exitCode; - } - return result; + waitCondition = new Promise((resolve) => { + const waiter = (): void => { + resolve(makeWaitExitOutcome()); + }; + ptyExitWaiters.add(waiter); + waitScope.add('wait exit waiter', () => { + ptyExitWaiters.delete(waiter); + }); }); } else { assertSessionCommandable(state); diff --git a/src/host/lifecycle.ts b/src/host/lifecycle.ts index 5623dff..8d8e70b 100644 --- a/src/host/lifecycle.ts +++ b/src/host/lifecycle.ts @@ -25,7 +25,7 @@ import { sessionDir, socketPath, } from '../storage/sessionPaths.js'; -import { throwIfAborted } from '../util/abort.js'; +import { makeAbortError, throwIfAborted } from '../util/abort.js'; import { invariant } from '../util/assert.js'; import { sendRpc } from './rpcClient.js'; @@ -46,6 +46,20 @@ function delayOptions( return signal === undefined ? undefined : { signal }; } +async function pollDelay( + intervalMs: number, + signal?: AbortSignal, +): Promise { + try { + await delay(intervalMs, undefined, delayOptions(signal)); + } catch (error) { + if (signal?.aborted === true) { + throw makeAbortError(signal); + } + throw error; + } +} + interface NodeError extends Error { code?: string; } @@ -284,7 +298,7 @@ async function waitForTerminalManifest( } if (attempt + 1 < maxAttempts) { - await delay(intervalMs, undefined, delayOptions(signal)); + await pollDelay(intervalMs, signal); } } @@ -322,7 +336,7 @@ async function waitForProcessAndSocketShutdown( } if (attempt + 1 < maxAttempts) { - await delay(intervalMs, undefined, delayOptions(signal)); + await pollDelay(intervalMs, signal); } } diff --git a/src/host/rpcClient.ts b/src/host/rpcClient.ts index 7273cf8..50d27e9 100644 --- a/src/host/rpcClient.ts +++ b/src/host/rpcClient.ts @@ -133,20 +133,12 @@ export async function sendRpc( let responseHandled = false; let buffer = ''; - const rejectWithCliError = (error: CliError): void => { - settlers.reject(error); - }; - const rejectWithTransportError = (error: unknown): void => { - rejectWithCliError( + settlers.reject( toTransportCliError(error, socketPath, method, effectiveTimeoutMs), ); }; - const resolveWithResult = (result: unknown): void => { - settlers.resolve(result); - }; - if (signal !== undefined) { addAbortListener(scope, 'rpc client abort listener', signal, () => { settlers.reject(makeAbortError(signal)); @@ -161,7 +153,7 @@ export async function sendRpc( }); socket.on('timeout', () => { - rejectWithCliError( + settlers.reject( makeCliError(ERROR_CODES.HOST_TIMEOUT, { message: `RPC request timed out after ${String(effectiveTimeoutMs)}ms.`, details: { @@ -183,7 +175,7 @@ export async function sendRpc( } if (buffer.length + chunk.length > MAX_RPC_BUFFER_BYTES) { - rejectWithCliError( + settlers.reject( makeCliError(ERROR_CODES.RPC_ERROR, { message: 'RPC response exceeds maximum buffer size.', details: { method, socketPath }, @@ -207,7 +199,7 @@ export async function sendRpc( const responseResult = RpcResponseSchema.safeParse(rawResponse); if (!responseResult.success) { - rejectWithCliError( + settlers.reject( makeCliError(ERROR_CODES.RPC_ERROR, { message: 'RPC response failed schema validation.', details: { @@ -223,7 +215,7 @@ export async function sendRpc( const response = responseResult.data; if (response.id !== request.id) { - rejectWithCliError( + settlers.reject( makeCliError(ERROR_CODES.RPC_ERROR, { message: `RPC response id mismatch for method "${method}".`, details: { @@ -244,7 +236,7 @@ export async function sendRpc( ); if (!resultResult.success) { - rejectWithCliError( + settlers.reject( makeCliError(ERROR_CODES.RPC_ERROR, { message: `RPC result failed validation for method "${method}".`, details: { @@ -257,19 +249,19 @@ export async function sendRpc( return; } - resolveWithResult(resultResult.data); + settlers.resolve(resultResult.data); return; } - resolveWithResult(response.result); + settlers.resolve(response.result); return; } - rejectWithCliError( + settlers.reject( toResponseCliError(response.error.code, response.error.message), ); } catch (error) { - rejectWithCliError( + settlers.reject( makeCliError(ERROR_CODES.RPC_ERROR, { message: toErrorMessage( error, @@ -290,7 +282,7 @@ export async function sendRpc( return; } - rejectWithCliError( + settlers.reject( makeCliError(ERROR_CODES.RPC_ERROR, { message: `RPC connection closed before a complete response was received for method "${method}".`, details: { diff --git a/src/host/rpcServer.ts b/src/host/rpcServer.ts index 55549be..04831c0 100644 --- a/src/host/rpcServer.ts +++ b/src/host/rpcServer.ts @@ -10,7 +10,10 @@ import { type RpcMethod, type RpcResponse, } from '../protocol/messages.js'; -import { createResourceScopedSettlers, makeAbortError } from '../util/abort.js'; +import { + createResourceScopedSettlers, + makeAbortReason, +} from '../util/abort.js'; import { invariant } from '../util/assert.js'; import { ResourceScope } from '../util/resourceScope.js'; @@ -20,6 +23,12 @@ const MAX_RPC_BUFFER_BYTES = 1_048_576; const SOCKET_LIVENESS_PROBE_TIMEOUT_MS = 1_000; const UNKNOWN_REQUEST_ID = 'unknown'; +/** + * Per-request context passed to RPC method handlers. + * + * `signal` aborts when the client socket closes, indicating the caller is no + * longer waiting for a response. + */ export interface MethodContext { readonly signal: AbortSignal; } @@ -392,7 +401,9 @@ export class RpcServer { const requestScope = new ResourceScope(); const requestAbortController = new AbortController(); const abortRequest = (): void => { - requestAbortController.abort(makeAbortError()); + requestAbortController.abort( + makeAbortReason('RPC client socket closed.'), + ); }; socket.once('close', abortRequest); requestScope.add('rpc request close listener', () => { @@ -440,7 +451,11 @@ export class RpcServer { ), ); } finally { - await requestScope.close(); + try { + await requestScope.close(); + } catch (error) { + console.debug('RPC request ResourceScope cleanup failed:', error); + } } } diff --git a/src/host/runCompletionCoordinator.ts b/src/host/runCompletionCoordinator.ts index 12c7b65..8e38594 100644 --- a/src/host/runCompletionCoordinator.ts +++ b/src/host/runCompletionCoordinator.ts @@ -50,11 +50,11 @@ export interface PreparedWaitedRun { marker: string; } -/** Registered completion state returned after `input_run` appends successfully. */ export interface RunCompletionWaitOptions { readonly signal?: AbortSignal; } +/** Registered completion state returned after `input_run` appends successfully. */ export interface RegisteredWaitedRunCompletion { postamble: string; sentinel: string; @@ -291,7 +291,7 @@ export class RunCompletionCoordinator { operationName: 'run completion', operation: completionPromise, scope: new ResourceScope(), - ...(options.signal === undefined ? {} : { signal: options.signal }), + signal: options.signal, timeoutMs, timeoutResult: () => { forgetWaiter(); diff --git a/src/util/abort.ts b/src/util/abort.ts index 98e1c11..b523edd 100644 --- a/src/util/abort.ts +++ b/src/util/abort.ts @@ -7,15 +7,23 @@ export interface ResourceScopedSettlers { readonly resolve: (value: T) => void; } -export interface ScopedOperationOptions { +type TimeoutConfig = + | { + readonly timeoutMs: number; + readonly timeoutResult: () => T; + } + | { + readonly timeoutMs?: never; + readonly timeoutResult?: never; + }; + +export type ScopedOperationOptions = { readonly operationName: string; readonly operation: Promise; readonly scope: ResourceScope; - readonly signal?: AbortSignal; - readonly timeoutMs?: number; - readonly timeoutResult?: () => T; + readonly signal?: AbortSignal | undefined; readonly onAbort?: () => void; -} +} & TimeoutConfig; function toError(error: unknown): Error { return error instanceof Error ? error : new Error(String(error)); @@ -31,6 +39,15 @@ function makeScopeCloseRejectionError( ); } +/** Creates a specific AbortError reason to pass into `AbortController.abort()`. */ +export function makeAbortReason(message: string): Error { + invariant(message.length > 0, 'abort reason message must not be empty'); + const error = new Error(message); + error.name = 'AbortError'; + return error; +} + +/** Extracts an AbortError from an observed signal, preserving `signal.reason`. */ export function makeAbortError(signal?: AbortSignal): Error { const reason: unknown = signal?.reason; if (reason instanceof Error) { @@ -46,12 +63,19 @@ export function makeAbortError(signal?: AbortSignal): Error { return error; } +/** Throws if `signal` is aborted; no-op when `signal` is undefined. */ export function throwIfAborted(signal?: AbortSignal): void { if (signal?.aborted === true) { throw makeAbortError(signal); } } +/** + * Registers an abort listener and removes it when `scope` closes. + * + * The signal must not already be aborted; callers should check + * `signal.aborted` or call `throwIfAborted()` before registering. + */ export function addAbortListener( scope: ResourceScope, name: string, @@ -73,6 +97,13 @@ export function addAbortListener( }); } +/** + * Creates idempotent Promise settlers that close `scope` before resolving or + * rejecting the outer operation. If cleanup fails while resolving, the promise + * rejects with the cleanup error. If cleanup fails while rejecting, the original + * operation error is preserved alongside the cleanup failure in an + * `AggregateError`. + */ export function createResourceScopedSettlers( scope: ResourceScope, resolve: (value: T) => void, @@ -118,6 +149,22 @@ export function createResourceScopedSettlers( }; } +function runAbortCallback(onAbort: (() => void) | undefined): Error | null { + try { + onAbort?.(); + return null; + } catch (error) { + return toError(error); + } +} + +/** + * Waits for `operation`, an optional timeout, or an optional abort signal while + * tying all timers/listeners to `scope`. The scope closes before the returned + * promise settles. `timeoutResult` is evaluated lazily when the timeout wins, + * and `onAbort` runs before scope cleanup for both pre-aborted and later-aborted + * signals. + */ export async function waitForScopedOperation( options: ScopedOperationOptions, ): Promise { @@ -132,19 +179,15 @@ export async function waitForScopedOperation( } = options; invariant(operationName.length > 0, 'operationName must not be empty'); if (signal?.aborted === true) { - onAbort?.(); + const abortCallbackError = runAbortCallback(onAbort); await scope.close(); - throw makeAbortError(signal); + throw abortCallbackError ?? makeAbortError(signal); } const { promise, reject, resolve } = Promise.withResolvers(); const settlers = createResourceScopedSettlers(scope, resolve, reject); if (timeoutMs !== undefined) { - invariant( - timeoutResult !== undefined, - 'timeoutResult must be provided when timeoutMs is set', - ); const timeoutHandle = setTimeout(() => { settlers.resolve(timeoutResult()); }, timeoutMs); @@ -155,8 +198,8 @@ export async function waitForScopedOperation( if (signal !== undefined) { addAbortListener(scope, `${operationName} abort listener`, signal, () => { - onAbort?.(); - settlers.reject(makeAbortError(signal)); + const abortCallbackError = runAbortCallback(onAbort); + settlers.reject(abortCallbackError ?? makeAbortError(signal)); }); } diff --git a/test/unit/util/abort.test.ts b/test/unit/util/abort.test.ts index d85f9ff..96d968a 100644 --- a/test/unit/util/abort.test.ts +++ b/test/unit/util/abort.test.ts @@ -4,6 +4,7 @@ import { addAbortListener, createResourceScopedSettlers, makeAbortError, + makeAbortReason, throwIfAborted, waitForScopedOperation, } from '../../../src/util/abort.js'; @@ -26,6 +27,13 @@ describe('abort utilities', () => { expect(reasonError.message).toBe('client disconnected'); }); + it('creates named abort reasons for internal controllers', () => { + const error = makeAbortReason('host shutdown'); + + expect(error.name).toBe('AbortError'); + expect(error.message).toBe('host shutdown'); + }); + it('forwards Error abort reasons without wrapping them', () => { const controller = new AbortController(); const reason = new Error('stop now'); @@ -94,6 +102,83 @@ describe('abort utilities', () => { }); }); + it('rejects with the close failure when resolving cannot close the scope', async () => { + const scope = new ResourceScope(); + const closeFailure = new Error('close failed'); + scope.add('failing release', () => { + throw closeFailure; + }); + const { promise, reject, resolve } = Promise.withResolvers(); + const settlers = createResourceScopedSettlers(scope, resolve, reject); + + settlers.resolve('ok'); + + let caught: unknown; + try { + await promise; + } catch (error) { + caught = error; + } + + expect(caught).toBeInstanceOf(ResourceScopeCloseError); + expect((caught as ResourceScopeCloseError).failures).toEqual([ + { name: 'failing release', error: closeFailure }, + ]); + }); + + it('waitForScopedOperation resolves when the operation resolves first', async () => { + const releases: string[] = []; + const scope = new ResourceScope(); + scope.add('release', () => { + releases.push('closed'); + }); + + await expect( + waitForScopedOperation({ + operationName: 'test operation', + operation: Promise.resolve('done'), + scope, + }), + ).resolves.toBe('done'); + expect(releases).toEqual(['closed']); + }); + + it('waitForScopedOperation rejects when the operation rejects first', async () => { + const releases: string[] = []; + const failure = new Error('operation failed'); + const scope = new ResourceScope(); + scope.add('release', () => { + releases.push('closed'); + }); + + await expect( + waitForScopedOperation({ + operationName: 'test operation', + operation: Promise.reject(failure), + scope, + }), + ).rejects.toThrow(failure); + expect(releases).toEqual(['closed']); + }); + + it('waitForScopedOperation clears the timeout when the operation resolves first', async () => { + vi.useFakeTimers(); + try { + const promise = waitForScopedOperation({ + operationName: 'test operation', + operation: Promise.resolve('done'), + scope: new ResourceScope(), + timeoutMs: 100, + timeoutResult: () => 'timed out', + }); + + await expect(promise).resolves.toBe('done'); + expect(vi.getTimerCount()).toBe(0); + } finally { + vi.useRealTimers(); + } + }); + it('waitForScopedOperation resolves timeout results and clears the timer', async () => { vi.useFakeTimers(); try { @@ -115,6 +200,49 @@ describe('abort utilities', () => { } }); + it('waitForScopedOperation handles pre-aborted signals with cleanup and onAbort', async () => { + const controller = new AbortController(); + const reason = new Error('already closed'); + const releases: string[] = []; + const onAbort = vi.fn(); + const scope = new ResourceScope(); + scope.add('release', () => { + releases.push('closed'); + }); + controller.abort(reason); + + await expect( + waitForScopedOperation({ + operationName: 'test operation', + operation: Promise.resolve('late'), + scope, + signal: controller.signal, + onAbort, + }), + ).rejects.toThrow(reason); + expect(onAbort).toHaveBeenCalledTimes(1); + expect(releases).toEqual(['closed']); + }); + + it('waitForScopedOperation rejects instead of throwing from an abort callback', async () => { + const controller = new AbortController(); + const callbackError = new Error('abort cleanup failed'); + const never = new Promise(() => undefined); + const promise = waitForScopedOperation({ + operationName: 'test operation', + operation: never, + scope: new ResourceScope(), + signal: controller.signal, + onAbort: () => { + throw callbackError; + }, + }); + + controller.abort(new Error('request closed')); + + await expect(promise).rejects.toThrow(callbackError); + }); + it('waitForScopedOperation aborts, runs onAbort, and clears the timeout', async () => { vi.useFakeTimers(); try {