Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 129 additions & 83 deletions src/host/hostMain.ts

Large diffs are not rendered by default.

63 changes: 59 additions & 4 deletions src/host/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,41 @@ import {
sessionDir,
socketPath,
} from '../storage/sessionPaths.js';
import { makeAbortError, throwIfAborted } from '../util/abort.js';
import { invariant } from '../util/assert.js';
import { sendRpc } from './rpcClient.js';

const DESTROY_POLL_INTERVAL_MS = 100;
const DESTROY_MAX_ATTEMPTS = 50;

interface PollOptions {
readonly signal?: AbortSignal;
}

function pollOptions(signal?: AbortSignal): PollOptions {
return signal === undefined ? {} : { signal };
}

function delayOptions(
signal?: AbortSignal,
): { signal: AbortSignal } | undefined {
return signal === undefined ? undefined : { signal };
}

async function pollDelay(
intervalMs: number,
signal?: AbortSignal,
): Promise<void> {
try {
await delay(intervalMs, undefined, delayOptions(signal));
} catch (error) {
if (signal?.aborted === true) {
throw makeAbortError(signal);
}
throw error;
}
}

interface NodeError extends Error {
code?: string;
}
Expand Down Expand Up @@ -246,6 +275,7 @@ async function waitForTerminalManifest(
manifestFile: string,
maxAttempts: number = DESTROY_MAX_ATTEMPTS,
intervalMs: number = DESTROY_POLL_INTERVAL_MS,
options: PollOptions = {},
): Promise<SessionRecord | null> {
invariant(
Number.isInteger(maxAttempts) && maxAttempts > 0,
Expand All @@ -256,15 +286,19 @@ async function waitForTerminalManifest(
'intervalMs must be a non-negative integer',
);

const { signal } = options;
throwIfAborted(signal);

for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
throwIfAborted(signal);
const manifest = await readManifest(manifestFile);

if (isTerminalSessionStatus(manifest.status)) {
return manifest;
}

if (attempt + 1 < maxAttempts) {
await delay(intervalMs);
await pollDelay(intervalMs, signal);
}
}

Expand All @@ -277,6 +311,7 @@ async function waitForProcessAndSocketShutdown(
socketFile: string,
maxAttempts: number = DESTROY_MAX_ATTEMPTS,
intervalMs: number = DESTROY_POLL_INTERVAL_MS,
options: PollOptions = {},
): Promise<boolean> {
invariant(
Number.isInteger(maxAttempts) && maxAttempts > 0,
Expand All @@ -287,7 +322,11 @@ async function waitForProcessAndSocketShutdown(
'intervalMs must be a non-negative integer',
);

const { signal } = options;
throwIfAborted(signal);

for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
throwIfAborted(signal);
const hostAlive = isProcessAlive(hostPid);
const childAlive = isProcessAlive(childPid);
const socketPresent = await pathExists(socketFile);
Expand All @@ -297,7 +336,7 @@ async function waitForProcessAndSocketShutdown(
}

if (attempt + 1 < maxAttempts) {
await delay(intervalMs);
await pollDelay(intervalMs, signal);
}
}

Expand Down Expand Up @@ -458,10 +497,17 @@ export function launchHost(config: LaunchHostConfig): number {
return child.pid;
}

export interface DestroySessionOptions {
readonly signal?: AbortSignal;
}

export async function destroySession(
sessionId: string,
force?: boolean,
options: DestroySessionOptions = {},
): Promise<void> {
const { signal } = options;
throwIfAborted(signal);
const { sessionDirectory, manifestFile, socketFile } =
getSessionPaths(sessionId);
const manifest = await readSessionManifestOrThrow(sessionId, manifestFile);
Expand Down Expand Up @@ -493,6 +539,9 @@ export async function destroySession(
manifest.hostPid,
manifest.childPid,
socketFile,
DESTROY_MAX_ATTEMPTS,
DESTROY_POLL_INTERVAL_MS,
pollOptions(signal),
);
await reconcileSession(sessionDirectory);

Expand All @@ -514,7 +563,8 @@ export async function destroySession(
}

try {
await sendRpc(socketFile, 'destroy');
throwIfAborted(signal);
await sendRpc(socketFile, 'destroy', undefined, undefined, signal);
} catch (error) {
if (
!(error instanceof CliError) ||
Expand All @@ -535,7 +585,12 @@ export async function destroySession(
throw error;
}

const terminalManifest = await waitForTerminalManifest(manifestFile);
const terminalManifest = await waitForTerminalManifest(
manifestFile,
DESTROY_MAX_ATTEMPTS,
DESTROY_POLL_INTERVAL_MS,
pollOptions(signal),
);
if (terminalManifest !== null) {
return;
}
Expand Down
63 changes: 31 additions & 32 deletions src/host/rpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ import {
RpcResponseSchema,
type RpcMethod,
} from '../protocol/messages.js';
import {
addAbortListener,
createResourceScopedSettlers,
makeAbortError,
throwIfAborted,
} from '../util/abort.js';
import { invariant } from '../util/assert.js';
import { ResourceScope } from '../util/resourceScope.js';

const DEFAULT_TIMEOUT_MS = 5_000;
const MAX_RPC_BUFFER_BYTES = 1_048_576;
Expand Down Expand Up @@ -95,7 +102,9 @@ export async function sendRpc(
method: string,
params?: Record<string, unknown>,
timeoutMs?: number,
signal?: AbortSignal,
): Promise<unknown> {
throwIfAborted(signal);
const effectiveTimeoutMs = timeoutMs ?? DEFAULT_TIMEOUT_MS;
invariant(
Number.isFinite(effectiveTimeoutMs) && effectiveTimeoutMs >= 0,
Expand All @@ -116,35 +125,25 @@ export async function sendRpc(

return await new Promise<unknown>((resolve, reject) => {
const socket = net.connect({ path: socketPath });
let settled = false;
const scope = new ResourceScope();
scope.add('rpc client socket', () => {
socket.destroy();
});
const settlers = createResourceScopedSettlers(scope, resolve, reject);
let responseHandled = false;
let buffer = '';

const rejectWithCliError = (error: CliError): void => {
if (settled) {
return;
}

settled = true;
socket.destroy();
reject(error);
};

const rejectWithTransportError = (error: unknown): void => {
rejectWithCliError(
settlers.reject(
toTransportCliError(error, socketPath, method, effectiveTimeoutMs),
);
};

const resolveWithResult = (result: unknown): void => {
if (settled) {
return;
}

settled = true;
socket.destroy();
resolve(result);
};
if (signal !== undefined) {
addAbortListener(scope, 'rpc client abort listener', signal, () => {
settlers.reject(makeAbortError(signal));
});
}

socket.setEncoding('utf8');
socket.setTimeout(effectiveTimeoutMs);
Expand All @@ -154,7 +153,7 @@ export async function sendRpc(
});

socket.on('timeout', () => {
rejectWithCliError(
settlers.reject(
makeCliError(ERROR_CODES.HOST_TIMEOUT, {
message: `RPC request timed out after ${String(effectiveTimeoutMs)}ms.`,
details: {
Expand All @@ -176,7 +175,7 @@ export async function sendRpc(
}

if (buffer.length + chunk.length > MAX_RPC_BUFFER_BYTES) {
rejectWithCliError(
settlers.reject(
makeCliError(ERROR_CODES.RPC_ERROR, {
message: 'RPC response exceeds maximum buffer size.',
details: { method, socketPath },
Expand All @@ -200,7 +199,7 @@ export async function sendRpc(
const responseResult = RpcResponseSchema.safeParse(rawResponse);

if (!responseResult.success) {
rejectWithCliError(
settlers.reject(
makeCliError(ERROR_CODES.RPC_ERROR, {
message: 'RPC response failed schema validation.',
details: {
Expand All @@ -216,7 +215,7 @@ export async function sendRpc(
const response = responseResult.data;

if (response.id !== request.id) {
rejectWithCliError(
settlers.reject(
makeCliError(ERROR_CODES.RPC_ERROR, {
message: `RPC response id mismatch for method "${method}".`,
details: {
Expand All @@ -237,7 +236,7 @@ export async function sendRpc(
);

if (!resultResult.success) {
rejectWithCliError(
settlers.reject(
makeCliError(ERROR_CODES.RPC_ERROR, {
message: `RPC result failed validation for method "${method}".`,
details: {
Expand All @@ -250,19 +249,19 @@ export async function sendRpc(
return;
}

resolveWithResult(resultResult.data);
settlers.resolve(resultResult.data);
return;
}

resolveWithResult(response.result);
settlers.resolve(response.result);
return;
}

rejectWithCliError(
settlers.reject(
toResponseCliError(response.error.code, response.error.message),
);
} catch (error) {
rejectWithCliError(
settlers.reject(
makeCliError(ERROR_CODES.RPC_ERROR, {
message: toErrorMessage(
error,
Expand All @@ -279,11 +278,11 @@ export async function sendRpc(
});

socket.on('end', () => {
if (settled || responseHandled) {
if (settlers.isSettled() || responseHandled) {
return;
}

rejectWithCliError(
settlers.reject(
makeCliError(ERROR_CODES.RPC_ERROR, {
message: `RPC connection closed before a complete response was received for method "${method}".`,
details: {
Expand Down
Loading
Loading