diff --git a/packages/cli/package.json b/packages/cli/package.json index 4a6c8317b..2652c1710 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -9,7 +9,8 @@ }, "type": "module", "bin": { - "ocap": "./dist/app.mjs" + "ocap": "./dist/app.mjs", + "ok": "./dist/ok.mjs" }, "files": [ "dist/" diff --git a/packages/cli/src/commands/daemon-client.ts b/packages/cli/src/commands/daemon-client.ts new file mode 100644 index 000000000..7897962b0 --- /dev/null +++ b/packages/cli/src/commands/daemon-client.ts @@ -0,0 +1,212 @@ +import { createConnection } from 'node:net'; +import type { Socket } from 'node:net'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; + +const READ_TIMEOUT_MS = 30_000; + +/** + * Get the default daemon socket path. + * + * @returns The socket path. + */ +export function getSocketPath(): string { + return join(homedir(), '.ocap', 'console.sock'); +} + +/** + * Connect to a UNIX domain socket. + * + * @param socketPath - The socket path to connect to. + * @returns A connected socket. + */ +async function connectSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const socket = createConnection(socketPath, () => { + socket.removeListener('error', reject); + resolve(socket); + }); + socket.on('error', reject); + }); +} + +/** + * Read a single newline-delimited line from a socket. + * + * @param socket - The socket to read from. + * @returns The line read. + */ +async function readLine(socket: Socket): Promise { + return new Promise((resolve, reject) => { + let buffer = ''; + + const timer = setTimeout(() => { + cleanup(); + reject(new Error('Daemon response timed out')); + }, READ_TIMEOUT_MS); + + /** + * Remove all listeners and clear the timeout. + */ + function cleanup(): void { + clearTimeout(timer); + socket.removeAllListeners('data'); + socket.removeAllListeners('error'); + socket.removeAllListeners('end'); + socket.removeAllListeners('close'); + } + + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + cleanup(); + resolve(buffer.slice(0, idx)); + } + }; + + socket.on('data', onData); + socket.once('error', (error) => { + cleanup(); + reject(error); + }); + socket.once('end', () => { + cleanup(); + reject(new Error('Socket closed before response received')); + }); + socket.once('close', () => { + cleanup(); + reject(new Error('Socket closed before response received')); + }); + }); +} + +/** + * Write a newline-delimited line to a socket. + * + * @param socket - The socket to write to. + * @param line - The line to write. + */ +async function writeLine(socket: Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +/** + * The response shape from the system console vat. + */ +export type ConsoleResponse = { + ok: boolean; + result?: unknown; + error?: string; +}; + +/** + * Send a JSON request to the daemon over a UNIX socket and return the response. + * + * Opens a connection, writes one JSON line, reads one JSON response line, + * then closes the connection. Retries once after a short delay if the + * connection is rejected (e.g. due to a probe connection race). + * + * @param socketPath - The UNIX socket path. + * @param request - The request to send. + * @param request.ref - Optional ref targeting a capability. + * @param request.method - The method name to invoke. + * @param request.args - Optional arguments array. + * @returns The parsed response. + */ +export async function sendCommand( + socketPath: string, + request: { ref?: string; method: string; args?: unknown[] }, +): Promise { + const attempt = async (): Promise => { + const socket = await connectSocket(socketPath); + try { + await writeLine(socket, JSON.stringify(request)); + const responseLine = await readLine(socket); + return JSON.parse(responseLine) as ConsoleResponse; + } finally { + socket.destroy(); + } + }; + + try { + return await attempt(); + } catch { + // Retry once after a short delay — the daemon's socket channel may + // still be cleaning up a previous probe connection. + await new Promise((resolve) => setTimeout(resolve, 100)); + return attempt(); + } +} + +/** + * Check whether the daemon is running by probing the socket. + * + * @param socketPath - The UNIX socket path. + * @returns True if the daemon socket accepts a connection. + */ +export async function isDaemonRunning(socketPath: string): Promise { + try { + const socket = await connectSocket(socketPath); + socket.destroy(); + return true; + } catch { + return false; + } +} + +/** + * Read all content from stdin, stripping shebang lines. + * + * @returns The stdin content with shebang lines removed. + */ +export async function readStdin(): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + process.stdin.on('data', (chunk: Buffer) => chunks.push(chunk)); + process.stdin.on('end', () => { + const raw = Buffer.concat(chunks).toString().trim(); + const lines = raw.split('\n').filter((line) => !line.startsWith('#!')); + resolve(lines.join('\n').trim()); + }); + process.stdin.on('error', reject); + }); +} + +/** + * Read a ref from stdin. Strips shebang lines. + * + * @returns The ref string. + */ +export async function readRefFromStdin(): Promise { + const content = await readStdin(); + if (!content) { + throw new Error('No ref found in stdin'); + } + return content; +} + +/** + * Read a ref from a .ocap file. Strips shebang lines. + * + * @param filePath - The path to the .ocap file. + * @returns The ref string. + */ +export async function readRefFromFile(filePath: string): Promise { + const { readFile } = await import('node:fs/promises'); + const raw = (await readFile(filePath, 'utf-8')).trim(); + const lines = raw.split('\n').filter((line) => !line.startsWith('#!')); + const ref = lines.join('\n').trim(); + if (!ref) { + throw new Error(`No ref found in ${filePath}`); + } + return ref; +} diff --git a/packages/cli/src/commands/daemon-entry.ts b/packages/cli/src/commands/daemon-entry.ts new file mode 100644 index 000000000..85fc704be --- /dev/null +++ b/packages/cli/src/commands/daemon-entry.ts @@ -0,0 +1,112 @@ +/* eslint-disable n/no-process-exit, n/no-process-env, n/no-sync */ +import '@metamask/kernel-shims/endoify-node'; +import { Logger } from '@metamask/logger'; +import type { LogEntry } from '@metamask/logger'; +import { chmod, mkdir, rm, writeFile } from 'node:fs/promises'; +import { createRequire } from 'node:module'; +import { homedir } from 'node:os'; +import { dirname, join, resolve } from 'node:path'; +import { pathToFileURL } from 'node:url'; + +import { bundleFile } from './bundle.ts'; + +/** + * Create a file transport that writes logs to a file. + * + * @param logPath - The log file path. + * @returns A log transport function. + */ +function makeFileTransport(logPath: string) { + // eslint-disable-next-line @typescript-eslint/no-require-imports, n/global-require -- need sync fs for log transport + const { appendFileSync } = require('node:fs') as typeof import('node:fs'); + return (entry: LogEntry): void => { + const line = `[${new Date().toISOString()}] [${entry.level}] ${entry.message ?? ''} ${(entry.data ?? []).map(String).join(' ')}\n`; + appendFileSync(logPath, line); + }; +} + +/** + * Main daemon entry point. Starts the daemon process and keeps it running. + */ +async function main(): Promise { + const ocapDir = join(homedir(), '.ocap'); + await mkdir(ocapDir, { recursive: true }); + + const logPath = join(ocapDir, 'daemon.log'); + const logger = new Logger({ + tags: ['daemon'], + transports: [makeFileTransport(logPath)], + }); + + try { + const socketPath = + process.env.OCAP_SOCKET_PATH ?? join(ocapDir, 'console.sock'); + const consoleName = process.env.OCAP_CONSOLE_NAME ?? 'system-console'; + + // Bundle system console vat if needed + const bundlesDir = join(ocapDir, 'bundles'); + await mkdir(bundlesDir, { recursive: true }); + + const bundlePath = join(bundlesDir, 'system-console-vat.bundle'); + const cjsRequire = createRequire(import.meta.url); + const kernelPkgPath = cjsRequire.resolve( + '@metamask/ocap-kernel/package.json', + ); + const vatSource = resolve( + dirname(kernelPkgPath), + 'src/vats/system-console-vat.ts', + ); + logger.info(`Bundling system console vat from ${vatSource}...`); + await bundleFile(vatSource, { logger, targetPath: bundlePath }); + const bundleSpec = pathToFileURL(bundlePath).href; + + // Dynamically import to avoid pulling @ocap/nodejs into the CLI bundle graph + // eslint-disable-next-line import-x/no-extraneous-dependencies -- workspace package + const { startDaemon } = await import('@ocap/nodejs'); + + const handle = await startDaemon({ + systemConsoleBundleSpec: bundleSpec, + systemConsoleName: consoleName, + socketPath, + resetStorage: true, + logger, + }); + + // Write the admin .ocap file so `ok ` works + const ocapPath = + process.env.OCAP_CONSOLE_PATH ?? join(ocapDir, `${consoleName}.ocap`); + await mkdir(dirname(ocapPath), { recursive: true }); + await writeFile(ocapPath, `#!/usr/bin/env ok\n${handle.selfRef}\n`); + await chmod(ocapPath, 0o700); + logger.info(`Wrote ${ocapPath}`); + + // Write PID file so `ok daemon stop` can signal this process + const pidPath = join(ocapDir, 'daemon.pid'); + await writeFile(pidPath, String(process.pid)); + + logger.info(`Daemon started. Socket: ${handle.socketPath}`); + + // Keep the process alive + const shutdown = async (signal: string): Promise => { + logger.info(`Received ${signal}, shutting down...`); + await handle.close(); + await rm(pidPath, { force: true }); + process.exit(0); + }; + + process.on('SIGTERM', () => { + shutdown('SIGTERM').catch(() => process.exit(1)); + }); + process.on('SIGINT', () => { + shutdown('SIGINT').catch(() => process.exit(1)); + }); + } catch (error) { + logger.error('Daemon startup failed:', error); + process.exit(1); + } +} + +main().catch((error) => { + process.stderr.write(`Daemon fatal: ${String(error)}\n`); + process.exit(1); +}); diff --git a/packages/cli/src/commands/daemon-spawn.ts b/packages/cli/src/commands/daemon-spawn.ts new file mode 100644 index 000000000..0b9e128f9 --- /dev/null +++ b/packages/cli/src/commands/daemon-spawn.ts @@ -0,0 +1,48 @@ +import { spawn } from 'node:child_process'; +import { dirname, join } from 'node:path'; +import { fileURLToPath } from 'node:url'; + +import { isDaemonRunning } from './daemon-client.ts'; + +const POLL_INTERVAL_MS = 100; +const MAX_POLLS = 300; // 30 seconds + +/** + * Ensure the daemon is running. If it is not, spawn it as a detached process + * and wait until the socket becomes responsive. + * + * @param socketPath - The UNIX socket path. + */ +export async function ensureDaemon(socketPath: string): Promise { + if (await isDaemonRunning(socketPath)) { + return; + } + + process.stderr.write('Starting daemon...\n'); + + const currentDir = dirname(fileURLToPath(import.meta.url)); + const entryPath = join(currentDir, 'daemon-entry.mjs'); + + const child = spawn(process.execPath, [entryPath], { + detached: true, + stdio: 'ignore', + env: { + ...process.env, // eslint-disable-line n/no-process-env -- pass env to child + OCAP_SOCKET_PATH: socketPath, + }, + }); + child.unref(); + + // Poll until daemon responds + for (let i = 0; i < MAX_POLLS; i++) { + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); + if (await isDaemonRunning(socketPath)) { + process.stderr.write('Daemon ready.\n'); + return; + } + } + + throw new Error( + `Daemon did not start within ${(MAX_POLLS * POLL_INTERVAL_MS) / 1000}s`, + ); +} diff --git a/packages/cli/src/ok.ts b/packages/cli/src/ok.ts new file mode 100644 index 000000000..15401c6f7 --- /dev/null +++ b/packages/cli/src/ok.ts @@ -0,0 +1,396 @@ +/* eslint-disable n/no-process-exit, n/no-sync, no-negated-condition */ +import '@metamask/kernel-shims/endoify-node'; +import { existsSync, fstatSync } from 'node:fs'; +import { readFile, rm } from 'node:fs/promises'; +import { homedir } from 'node:os'; +import { join, resolve } from 'node:path'; +import { pathToFileURL } from 'node:url'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; + +import { + getSocketPath, + isDaemonRunning, + sendCommand, + readStdin, + readRefFromFile, +} from './commands/daemon-client.ts'; +import { ensureDaemon } from './commands/daemon-spawn.ts'; + +const home = homedir(); + +/** + * Replace the home directory prefix with `~` for display. + * + * @param path - An absolute path. + * @returns The path with the home prefix replaced. + */ +function tildify(path: string): string { + return path.startsWith(home) ? `~${path.slice(home.length)}` : path; +} + +/** + * Handle the core invocation: resolve ref, call method, output result. + * + * @param args - CLI arguments after `ok`. + * @param socketPath - The daemon socket path. + */ +async function handleInvoke(args: string[], socketPath: string): Promise { + let ref: string | undefined; + let method: string; + let methodArgs: string[]; + + const firstArg = args[0]; + if ( + firstArg !== undefined && + (firstArg.endsWith('.ocap') || existsSync(firstArg)) + ) { + // File arg mode: ok [...args] + ref = await readRefFromFile(firstArg); + method = args[1] ?? 'help'; + methodArgs = args.slice(2); + } else if ( + !process.stdin.isTTY && + (fstatSync(0).isFIFO() || fstatSync(0).isFile()) + ) { + // Redirected stdin (pipe or file): could be a ref (d-) or JSON data + const stdinContent = await readStdin(); + if (!stdinContent) { + throw new Error('No input on stdin'); + } + if (stdinContent.startsWith('d-')) { + // Ref mode: ok [...args] < file.ocap + ref = stdinContent; + method = args[0] ?? 'help'; + methodArgs = args.slice(1); + } else { + // Data mode: cat config.json | ok launch + method = args[0] ?? 'help'; + methodArgs = [stdinContent, ...args.slice(1)]; + } + } else { + // No ref — dispatch on the system console itself + method = args[0] ?? 'help'; + methodArgs = args.slice(1); + } + + // For launch: resolve relative bundleSpec paths to file:// URLs. + // Handle shell word-splitting: `$(cat file.json)` without quotes splits + // JSON into many args. Try joining all args as one JSON string first. + if (method === 'launch') { + methodArgs = rejoinSplitJson(methodArgs); + methodArgs = methodArgs.map((arg) => { + try { + const parsed = JSON.parse(arg) as unknown; + if (isClusterConfigLike(parsed)) { + return JSON.stringify(resolveBundleSpecs(parsed)); + } + } catch { + // not JSON — leave as-is + } + return arg; + }); + } + + // Parse args: try JSON for each, fall back to string + const parsedArgs = methodArgs.map((arg) => { + try { + return JSON.parse(arg) as unknown; + } catch { + return arg; + } + }); + + const request: { ref?: string; method: string; args?: unknown[] } = { + method, + ...(ref !== undefined ? { ref } : {}), + ...(parsedArgs.length > 0 ? { args: parsedArgs } : {}), + }; + + const response = await sendCommand(socketPath, request); + + if (!response.ok) { + process.stderr.write(`Error: ${response.error}\n`); + process.exit(1); + } + + const isTTY = process.stdout.isTTY ?? false; + const { result } = response; + + // Check if result contains a ref (capability) + const resultRef = isRefResult(result) ? result.ref : undefined; + + if (resultRef && !isTTY) { + // Piped: output .ocap content for the ref + process.stdout.write(`#!/usr/bin/env ok\n${resultRef}\n`); + } else if (isTTY) { + process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); + } else { + process.stdout.write(`${JSON.stringify(result)}\n`); + } +} + +/** + * Check if a result object contains a ref field. + * + * @param result - The result to check. + * @returns True if the result has a ref string. + */ +function isRefResult(result: unknown): result is { ref: string } { + return ( + typeof result === 'object' && + result !== null && + 'ref' in result && + typeof (result as { ref: unknown }).ref === 'string' + ); +} + +/** + * Rejoin args that were word-split by the shell from a single JSON value. + * + * When a user writes `ok launch $(cat config.json)` without quotes, bash + * splits the JSON on whitespace into many argv entries. This function + * detects that pattern and reassembles the original JSON. + * + * @param args - The method arguments (possibly word-split). + * @returns The args, with split JSON rejoined into a single element. + */ +function rejoinSplitJson(args: string[]): string[] { + if (args.length <= 1) { + return args; + } + // If the first arg already parses as a complete JSON object, no fix needed + const first = args[0]; + if (first !== undefined) { + try { + JSON.parse(first); + return args; + } catch { + // First arg alone isn't valid JSON — try joining + } + } + const joined = args.join(' '); + try { + JSON.parse(joined); + return [joined]; + } catch { + return args; + } +} + +/** + * Check if a value looks like a cluster config (has bootstrap + vats). + * + * @param value - The value to check. + * @returns True if the value has bootstrap and vats fields. + */ +function isClusterConfigLike( + value: unknown, +): value is { vats: Record } { + return ( + typeof value === 'object' && + value !== null && + 'bootstrap' in value && + 'vats' in value && + typeof (value as { vats: unknown }).vats === 'object' + ); +} + +/** + * Resolve relative bundleSpec paths in a cluster config to file:// URLs. + * + * @param config - The cluster config object. + * @param config.vats - The vat configurations with optional bundleSpec paths. + * @returns The config with resolved bundleSpec URLs. + */ +function resolveBundleSpecs(config: { + vats: Record; +}): unknown { + const resolvedVats: Record = {}; + for (const [vatName, vatConfig] of Object.entries(config.vats)) { + const spec = vatConfig.bundleSpec; + if (spec && !spec.includes('://')) { + resolvedVats[vatName] = { + ...vatConfig, + bundleSpec: pathToFileURL(resolve(spec)).href, + }; + } else { + resolvedVats[vatName] = vatConfig; + } + } + return { ...config, vats: resolvedVats }; +} + +/** + * Handle daemon management commands. + * + * @param args - CLI arguments after `ok daemon`. + * @param socketPath - The daemon socket path. + */ +async function handleDaemon(args: string[], socketPath: string): Promise { + const subcommand = args[0]; + + if (subcommand === 'stop') { + if (!(await isDaemonRunning(socketPath))) { + process.stderr.write('Daemon is not running.\n'); + return; + } + + const pidPath = join(homedir(), '.ocap', 'daemon.pid'); + + let pid: number | undefined; + try { + pid = Number(await readFile(pidPath, 'utf-8')); + } catch { + // PID file missing — fall back to manual instructions + } + + if (!pid || Number.isNaN(pid)) { + process.stderr.write( + 'PID file not found. Stop the daemon manually:\n' + + ` kill $(lsof -t ${tildify(socketPath)})\n`, + ); + return; + } + + process.stderr.write('Stopping daemon...\n'); + try { + process.kill(pid, 'SIGTERM'); + } catch { + process.stderr.write( + 'Failed to send SIGTERM (process may already be gone).\n', + ); + await rm(pidPath, { force: true }); + return; + } + + // Poll until socket stops responding (max 5s) + const pollEnd = Date.now() + 5_000; + while (Date.now() < pollEnd) { + await new Promise((_resolve) => setTimeout(_resolve, 250)); + if (!(await isDaemonRunning(socketPath))) { + break; + } + } + + await rm(pidPath, { force: true }); + + if (await isDaemonRunning(socketPath)) { + process.stderr.write('Daemon did not stop within 5 seconds.\n'); + } else { + process.stderr.write('Daemon stopped.\n'); + } + return; + } + + if (subcommand === 'begone') { + const forGood = args.includes('--forgood'); + if (!forGood) { + process.stderr.write( + 'Usage: ok daemon begone --forgood\n' + + 'This will delete all OCAP daemon state.\n', + ); + process.exit(1); + } + // eslint-disable-next-line import-x/no-extraneous-dependencies -- workspace package + const { flushDaemon } = await import('@ocap/nodejs'); + await flushDaemon({ socketPath }); + process.stderr.write('All daemon state flushed.\n'); + return; + } + + // Default: start daemon (or confirm running) + let consolePath = 'system-console.ocap'; + const consoleIdx = args.indexOf('--console'); + if (consoleIdx !== -1 && args[consoleIdx + 1]) { + consolePath = args[consoleIdx + 1] ?? consolePath; + } + + // Resolve relative to PWD + const ocapPath = resolve(consolePath); + // Derive the console name from the filename (strip .ocap if present) + const consoleName = ocapPath.endsWith('.ocap') + ? ocapPath.slice(ocapPath.lastIndexOf('/') + 1, -5) + : ocapPath.slice(ocapPath.lastIndexOf('/') + 1); + + // eslint-disable-next-line n/no-process-env -- CLI sets env for daemon child process + process.env.OCAP_CONSOLE_NAME = consoleName; + // eslint-disable-next-line n/no-process-env -- CLI sets env for daemon child process + process.env.OCAP_CONSOLE_PATH = ocapPath; + + await ensureDaemon(socketPath); + + process.stderr.write(`Daemon running. Socket: ${tildify(socketPath)}\n`); + if (existsSync(ocapPath)) { + process.stderr.write(`Admin console: ${tildify(ocapPath)}\n`); + } +} + +const socketPath = getSocketPath(); + +const cli = yargs(hideBin(process.argv)) + .scriptName('ok') + .usage('$0 [file.ocap] [...args]') + .help(false) + + .command( + 'daemon [subcommand]', + 'Manage the daemon process', + (_yargs) => + _yargs + .positional('subcommand', { + describe: 'Subcommand: stop, begone', + type: 'string', + }) + .option('console', { + describe: 'Path for the .ocap admin file (relative to PWD)', + type: 'string', + default: 'system-console.ocap', + }) + .option('forgood', { + describe: 'Confirm state deletion (for begone)', + type: 'boolean', + }), + async (args) => { + const daemonArgs: string[] = []; + if (args.subcommand) { + daemonArgs.push(String(args.subcommand)); + } + if (args.forgood) { + daemonArgs.push('--forgood'); + } + if (args.console && args.console !== 'system-console.ocap') { + daemonArgs.push('--console', String(args.console)); + } + await handleDaemon(daemonArgs, socketPath); + }, + ) + + // Default: file.ocap dispatch or bare invocation + .command( + '$0 [args..]', + false, + (_yargs) => _yargs.strict(false), + async (args) => { + const invokeArgs = ((args.args ?? []) as string[]).map(String); + await ensureDaemon(socketPath); + await handleInvoke( + invokeArgs.length > 0 ? invokeArgs : ['help'], + socketPath, + ); + }, + ) + + .version(false) + .fail((message, error) => { + if (error) { + process.stderr.write( + `Error: ${error instanceof Error ? error.message : String(error)}\n`, + ); + } else if (message) { + process.stderr.write(`${message}\n`); + } + process.exit(1); + }); + +await cli.parse(); diff --git a/packages/cli/tsconfig.build.json b/packages/cli/tsconfig.build.json index 5982e62d4..0aac5d7a6 100644 --- a/packages/cli/tsconfig.build.json +++ b/packages/cli/tsconfig.build.json @@ -6,7 +6,8 @@ "outDir": "./dist", "emitDeclarationOnly": false, "rootDir": "./src", - "types": ["ses", "node"] + "types": ["ses", "node"], + "paths": {} }, "references": [ { "path": "../logger/tsconfig.build.json" }, diff --git a/packages/kernel-agents/src/utils.ts b/packages/kernel-agents/src/utils.ts index 6112bfe2b..986afdd99 100644 --- a/packages/kernel-agents/src/utils.ts +++ b/packages/kernel-agents/src/utils.ts @@ -2,17 +2,7 @@ import type { Logger } from '@metamask/logger'; import type { SampleCollector } from './types.ts'; -/** - * Return a new object with the undefined values removed. - * - * @param record - The record to filter. - * @returns The new object with the undefined values removed. - */ -// eslint-disable-next-line @typescript-eslint/explicit-function-return-type -export const ifDefined = (record: Record) => - Object.fromEntries( - Object.entries(record).filter(([_, value]) => value !== undefined), - ); +export { ifDefined } from '@metamask/kernel-utils'; /** * Await a promise, and call the abort callback when done or on error. diff --git a/packages/kernel-utils/src/index.test.ts b/packages/kernel-utils/src/index.test.ts index 67cef419d..7533cd072 100644 --- a/packages/kernel-utils/src/index.test.ts +++ b/packages/kernel-utils/src/index.test.ts @@ -14,6 +14,7 @@ describe('index', () => { 'delay', 'fetchValidatedJson', 'fromHex', + 'ifDefined', 'installWakeDetector', 'isJsonRpcCall', 'isJsonRpcMessage', diff --git a/packages/kernel-utils/src/index.ts b/packages/kernel-utils/src/index.ts index 934437874..09d85af54 100644 --- a/packages/kernel-utils/src/index.ts +++ b/packages/kernel-utils/src/index.ts @@ -3,7 +3,7 @@ export { makeDiscoverableExo } from './discoverable.ts'; export type { DiscoverableExo } from './discoverable.ts'; export type { JsonSchema, MethodSchema } from './schema.ts'; export { fetchValidatedJson } from './fetchValidatedJson.ts'; -export { abortableDelay, delay, makeCounter } from './misc.ts'; +export { abortableDelay, delay, ifDefined, makeCounter } from './misc.ts'; export { stringify } from './stringify.ts'; export { installWakeDetector } from './wake-detector.ts'; export type { WakeDetectorOptions } from './wake-detector.ts'; diff --git a/packages/kernel-utils/src/misc.test.ts b/packages/kernel-utils/src/misc.test.ts index 64ef72d57..83bb3a929 100644 --- a/packages/kernel-utils/src/misc.test.ts +++ b/packages/kernel-utils/src/misc.test.ts @@ -1,7 +1,32 @@ import { AbortError } from '@metamask/kernel-errors'; import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { abortableDelay, delay, makeCounter } from './misc.ts'; +import { abortableDelay, delay, ifDefined, makeCounter } from './misc.ts'; + +describe('ifDefined', () => { + it('removes undefined values', () => { + expect(ifDefined({ a: 1, b: undefined, c: 3 })).toStrictEqual({ + a: 1, + c: 3, + }); + }); + + it('returns empty object when all values are undefined', () => { + expect(ifDefined({ a: undefined, b: undefined })).toStrictEqual({}); + }); + + it('preserves all values when none are undefined', () => { + expect(ifDefined({ a: 1, b: 'two', c: null })).toStrictEqual({ + a: 1, + b: 'two', + c: null, + }); + }); + + it('returns empty object for empty input', () => { + expect(ifDefined({})).toStrictEqual({}); + }); +}); describe('misc utilities', () => { beforeEach(() => { diff --git a/packages/kernel-utils/src/misc.ts b/packages/kernel-utils/src/misc.ts index 5f5893adb..8c2b30dcf 100644 --- a/packages/kernel-utils/src/misc.ts +++ b/packages/kernel-utils/src/misc.ts @@ -1,5 +1,18 @@ import { AbortError } from '@metamask/kernel-errors'; +/** + * Return a new object with the undefined values removed. + * Useful for building options bags with exact optional property types. + * + * @param record - The record to filter. + * @returns The new object with the undefined values removed. + */ +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export const ifDefined = (record: Record) => + Object.fromEntries( + Object.entries(record).filter(([_, value]) => value !== undefined), + ); + /** * A simple counter which increments and returns when called. * diff --git a/packages/nodejs/src/daemon/flush-daemon.ts b/packages/nodejs/src/daemon/flush-daemon.ts new file mode 100644 index 000000000..426cb422e --- /dev/null +++ b/packages/nodejs/src/daemon/flush-daemon.ts @@ -0,0 +1,34 @@ +import { rm } from 'node:fs/promises'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; + +/** + * Options for flushing daemon state. + */ +export type FlushDaemonOptions = { + /** UNIX socket path. Defaults to ~/.ocap/console.sock. */ + socketPath?: string; + /** SQLite database filename. Defaults to ~/.ocap/kernel.sqlite. */ + dbFilename?: string; +}; + +/** + * Delete all daemon state: kernel DB, bundles cache, and socket. + * + * @param options - Optional overrides for file paths. + */ +export async function flushDaemon(options?: FlushDaemonOptions): Promise { + const ocapDir = join(homedir(), '.ocap'); + const socketPath = options?.socketPath ?? join(ocapDir, 'console.sock'); + const dbFilename = options?.dbFilename ?? join(ocapDir, 'kernel.sqlite'); + const bundlesDir = join(ocapDir, 'bundles'); + + const pidPath = join(ocapDir, 'daemon.pid'); + + await Promise.all([ + rm(dbFilename, { force: true }), + rm(socketPath, { force: true }), + rm(bundlesDir, { recursive: true, force: true }), + rm(pidPath, { force: true }), + ]); +} diff --git a/packages/nodejs/src/daemon/start-daemon.test.ts b/packages/nodejs/src/daemon/start-daemon.test.ts new file mode 100644 index 000000000..9f18c24bd --- /dev/null +++ b/packages/nodejs/src/daemon/start-daemon.test.ts @@ -0,0 +1,138 @@ +import { vi, describe, it, expect, afterEach } from 'vitest'; + +import { startDaemon } from './start-daemon.ts'; +import type { DaemonHandle } from './start-daemon.ts'; + +// Mock makeKernel to avoid real kernel creation +vi.mock('../kernel/make-kernel.ts', () => ({ + makeKernel: vi.fn().mockResolvedValue({ + initIdentity: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + getSystemSubclusterRoot: vi.fn().mockReturnValue('ko-root'), + queueMessage: vi.fn().mockResolvedValue({ body: '"d-1"', slots: [] }), + }), +})); + +// Mock kunser to deserialise the capdata returned by queueMessage +vi.mock('@metamask/ocap-kernel', async () => { + const actual = await vi.importActual( + '@metamask/ocap-kernel', + ); + return { ...actual, kunser: vi.fn().mockReturnValue('d-1') }; +}); + +// Mock filesystem operations +vi.mock('node:fs/promises', async () => { + const actual = + await vi.importActual( + 'node:fs/promises', + ); + return { + ...actual, + mkdir: vi.fn().mockResolvedValue(undefined), + }; +}); + +describe('startDaemon', () => { + let handle: DaemonHandle | undefined; + + afterEach(async () => { + if (handle) { + const toClose = handle; + handle = undefined; + await toClose.close(); + } + }); + + it('creates kernel with IO-based system subcluster config', async () => { + const { makeKernel } = await import('../kernel/make-kernel.ts'); + const mockedMakeKernel = vi.mocked(makeKernel); + + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + systemConsoleName: 'my-console', + socketPath: tmpSocket, + }); + + expect(mockedMakeKernel).toHaveBeenCalledWith( + expect.objectContaining({ + systemSubclusters: [ + { + name: 'my-console', + config: { + bootstrap: 'my-console', + io: { + console: { + type: 'socket', + path: tmpSocket, + }, + }, + services: ['kernelFacet', 'console'], + vats: { + 'my-console': { + bundleSpec: 'http://localhost/bundle', + parameters: { name: 'my-console' }, + }, + }, + }, + }, + ], + }), + ); + }); + + it('returns socket path, selfRef, and close function', async () => { + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + socketPath: tmpSocket, + }); + + expect(handle.socketPath).toBe(tmpSocket); + expect(handle.selfRef).toBe('d-1'); + expect(typeof handle.close).toBe('function'); + expect(handle.kernel).toBeDefined(); + }); + + it('issues a self-ref via getSystemSubclusterRoot and queueMessage', async () => { + const { makeKernel } = await import('../kernel/make-kernel.ts'); + const mockedMakeKernel = vi.mocked(makeKernel); + + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + systemConsoleName: 'my-console', + socketPath: tmpSocket, + }); + + const mockKernel = await mockedMakeKernel.mock.results[0]!.value; + expect(mockKernel.getSystemSubclusterRoot).toHaveBeenCalledWith( + 'my-console', + ); + expect(mockKernel.queueMessage).toHaveBeenCalledWith( + 'ko-root', + 'issueRef', + ['ko-root', true], + ); + }); + + it('calls kernel.stop on close', async () => { + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + socketPath: tmpSocket, + }); + + const { stop } = handle.kernel; + const toClose = handle; + handle = undefined; + await toClose.close(); + + expect(stop).toHaveBeenCalled(); + }); +}); diff --git a/packages/nodejs/src/daemon/start-daemon.ts b/packages/nodejs/src/daemon/start-daemon.ts new file mode 100644 index 000000000..5533f1e51 --- /dev/null +++ b/packages/nodejs/src/daemon/start-daemon.ts @@ -0,0 +1,118 @@ +import { ifDefined } from '@metamask/kernel-utils'; +import type { Logger } from '@metamask/logger'; +import type { Kernel, SystemSubclusterConfig } from '@metamask/ocap-kernel'; +import { kunser } from '@metamask/ocap-kernel'; +import { mkdir } from 'node:fs/promises'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; + +import { makeKernel } from '../kernel/make-kernel.ts'; + +/** + * Options for starting the daemon. + */ +export type StartDaemonOptions = { + /** UNIX socket path for the system console IO channel. Defaults to ~/.ocap/console.sock. */ + socketPath?: string; + /** URL to the bundled system-console-vat. */ + systemConsoleBundleSpec: string; + /** Name for the system console subcluster. Defaults to 'system-console'. */ + systemConsoleName?: string; + /** Path to vat worker file. */ + workerFilePath?: string; + /** SQLite database filename. Defaults to ~/.ocap/kernel.sqlite. */ + dbFilename?: string; + /** If true, clear kernel storage. */ + resetStorage?: boolean; + /** Logger instance. */ + logger?: Logger; + /** Seed for libp2p key generation. */ + keySeed?: string; +}; + +/** + * Handle returned by {@link startDaemon}. + */ +export type DaemonHandle = { + kernel: Kernel; + socketPath: string; + selfRef: string; + close: () => Promise; +}; + +/** + * Start the OCAP daemon. + * + * Creates a kernel with a system console vat that listens for commands + * on a UNIX domain socket IO channel. The kernel process IS the daemon. + * + * @param options - Configuration options. + * @returns A daemon handle. + */ +export async function startDaemon( + options: StartDaemonOptions, +): Promise { + const { + systemConsoleBundleSpec, + systemConsoleName = 'system-console', + workerFilePath, + resetStorage, + logger, + keySeed, + } = options; + + const ocapDir = join(homedir(), '.ocap'); + await mkdir(ocapDir, { recursive: true }); + + const socketPath = options.socketPath ?? join(ocapDir, 'console.sock'); + const dbFilename = options.dbFilename ?? join(ocapDir, 'kernel.sqlite'); + + // Build system subcluster config with IO channel for the console socket + const systemSubcluster: SystemSubclusterConfig = { + name: systemConsoleName, + config: { + bootstrap: systemConsoleName, + io: { + console: { + type: 'socket' as const, + path: socketPath, + }, + }, + services: ['kernelFacet', 'console'], + vats: { + [systemConsoleName]: { + bundleSpec: systemConsoleBundleSpec, + parameters: { name: systemConsoleName }, + }, + }, + }, + }; + + const kernel = await makeKernel({ + ...ifDefined({ workerFilePath, resetStorage, logger }), + dbFilename, + keySeed, + systemSubclusters: [systemSubcluster], + }); + + await kernel.initIdentity(); + + // Issue a self-ref so the admin .ocap file can address the console root object + const rootKref = kernel.getSystemSubclusterRoot(systemConsoleName); + const capData = await kernel.queueMessage(rootKref, 'issueRef', [ + rootKref, + true, + ]); + const selfRef = kunser(capData) as string; + + const close = async (): Promise => { + await kernel.stop(); + }; + + return { + kernel, + socketPath, + selfRef, + close, + }; +} diff --git a/packages/nodejs/src/index.ts b/packages/nodejs/src/index.ts index 49c133fdf..41f78b592 100644 --- a/packages/nodejs/src/index.ts +++ b/packages/nodejs/src/index.ts @@ -2,3 +2,10 @@ export { NodejsPlatformServices } from './kernel/PlatformServices.ts'; export { makeKernel } from './kernel/make-kernel.ts'; export { makeNodeJsVatSupervisor } from './vat/make-supervisor.ts'; export { makeIOChannelFactory, makeSocketIOChannel } from './io/index.ts'; +export { startDaemon } from './daemon/start-daemon.ts'; +export type { + StartDaemonOptions, + DaemonHandle, +} from './daemon/start-daemon.ts'; +export { flushDaemon } from './daemon/flush-daemon.ts'; +export type { FlushDaemonOptions } from './daemon/flush-daemon.ts'; diff --git a/packages/nodejs/src/io/socket-channel.test.ts b/packages/nodejs/src/io/socket-channel.test.ts index d6a773658..fe8bf982c 100644 --- a/packages/nodejs/src/io/socket-channel.test.ts +++ b/packages/nodejs/src/io/socket-channel.test.ts @@ -134,13 +134,21 @@ describe('makeSocketIOChannel', () => { expect(result).toBeNull(); }); - it('returns null when no client is connected', async () => { + it('blocks read until a client connects and sends data', async () => { const socketPath = tempSocketPath(); const channel = await makeSocketIOChannel('test', socketPath); channels.push(channel); - const result = await channel.read(); - expect(result).toBeNull(); + // Start read before any client connects — should block + const readPromise = channel.read(); + + // Connect and send data + const client = await connectToSocket(socketPath); + clients.push(client); + await writeLine(client, 'hello'); + + const result = await readPromise; + expect(result).toBe('hello'); }); it('throws on write when no client is connected', async () => { diff --git a/packages/nodejs/src/io/socket-channel.ts b/packages/nodejs/src/io/socket-channel.ts index c6cce477f..3acda62f5 100644 --- a/packages/nodejs/src/io/socket-channel.ts +++ b/packages/nodejs/src/io/socket-channel.ts @@ -90,13 +90,21 @@ export async function makeSocketIOChannel( const server = net.createServer((socket) => { if (currentSocket) { - // Only one connection at a time - socket.destroy(); - return; + if (currentSocket.readableEnded || currentSocket.destroyed) { + // Old connection is dead but events haven't been fully processed; + // clean it up and accept the new connection. + currentSocket.removeAllListeners(); + currentSocket.destroy(); + currentSocket = null; + } else { + // Existing active client — reject the new connection + socket.destroy(); + return; + } } - // Drain stale state from any previous connection + // Drain stale data from any previous connection, but keep pending + // readers alive so they can receive data from the new connection. lineQueue.length = 0; - deliverEOF(); currentSocket = socket; decoder = new StringDecoder('utf8'); @@ -132,9 +140,7 @@ export async function makeSocketIOChannel( if (queued !== undefined) { return queued; } - if (!currentSocket) { - return null; - } + // Block until data arrives (from a current or future client connection) return new Promise((resolve) => { readerQueue.push({ resolve }); }); diff --git a/packages/nodejs/src/kernel/make-kernel.ts b/packages/nodejs/src/kernel/make-kernel.ts index 00f6353b4..68bd9b4ab 100644 --- a/packages/nodejs/src/kernel/make-kernel.ts +++ b/packages/nodejs/src/kernel/make-kernel.ts @@ -1,7 +1,10 @@ import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; import { Logger } from '@metamask/logger'; import { Kernel } from '@metamask/ocap-kernel'; -import type { IOChannelFactory } from '@metamask/ocap-kernel'; +import type { + IOChannelFactory, + SystemSubclusterConfig, +} from '@metamask/ocap-kernel'; import { NodejsPlatformServices } from './PlatformServices.ts'; import { makeIOChannelFactory } from '../io/index.ts'; @@ -16,6 +19,7 @@ import { makeIOChannelFactory } from '../io/index.ts'; * @param options.logger - The logger to use for the kernel. * @param options.keySeed - Optional seed for libp2p key generation. * @param options.ioChannelFactory - Optional factory for creating IO channels. + * @param options.systemSubclusters - Optional system subcluster configurations. * @returns The kernel, initialized. */ export async function makeKernel({ @@ -25,6 +29,7 @@ export async function makeKernel({ logger, keySeed, ioChannelFactory, + systemSubclusters, }: { workerFilePath?: string; resetStorage?: boolean; @@ -32,6 +37,7 @@ export async function makeKernel({ logger?: Logger; keySeed?: string | undefined; ioChannelFactory?: IOChannelFactory; + systemSubclusters?: SystemSubclusterConfig[]; }): Promise { const rootLogger = logger ?? new Logger('kernel-worker'); const platformServicesClient = new NodejsPlatformServices({ @@ -48,6 +54,7 @@ export async function makeKernel({ logger: rootLogger.subLogger({ tags: ['kernel'] }), keySeed, ioChannelFactory: ioChannelFactory ?? makeIOChannelFactory(), + ...(systemSubclusters ? { systemSubclusters } : {}), }); return kernel; diff --git a/packages/nodejs/test/e2e/daemon-stack.test.ts b/packages/nodejs/test/e2e/daemon-stack.test.ts new file mode 100644 index 000000000..9e815db44 --- /dev/null +++ b/packages/nodejs/test/e2e/daemon-stack.test.ts @@ -0,0 +1,336 @@ +import type { KernelDatabase } from '@metamask/kernel-store'; +import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { waitUntilQuiescent } from '@metamask/kernel-utils'; +import type { Kernel } from '@metamask/ocap-kernel'; +import { kunser } from '@metamask/ocap-kernel'; +import * as net from 'node:net'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { pathToFileURL } from 'node:url'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { makeIOChannelFactory } from '../../src/io/index.ts'; +import { makeTestKernel } from '../helpers/kernel.ts'; + +const SYSTEM_CONSOLE_NAME = 'system-console'; + +/** + * Generate a unique temp socket path. + * + * @returns A unique socket path. + */ +function tempSocketPath(): string { + return join( + tmpdir(), + `daemon-e2e-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`, + ); +} + +/** + * Connect to a UNIX socket. + * + * @param socketPath - The socket path. + * @returns The connected socket. + */ +async function connectToSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const client = net.createConnection(socketPath, () => { + client.removeListener('error', reject); + resolve(client); + }); + client.on('error', reject); + }); +} + +/** + * Write a newline-delimited line to a socket. + * + * @param socket - The socket. + * @param line - The line to write. + */ +async function writeLine(socket: net.Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +/** + * Read a newline-delimited line from a socket. + * + * @param socket - The socket. + * @returns The line read. + */ +async function readLine(socket: net.Socket): Promise { + return new Promise((resolve) => { + let buffer = ''; + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeListener('data', onData); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + }); +} + +/** + * Send a JSON request over a socket and read the JSON response. + * + * @param socketPath - The socket path. + * @param request - The request object. + * @returns The parsed response. + */ +async function sendCommand( + socketPath: string, + request: Record, +): Promise<{ ok: boolean; result?: unknown; error?: string }> { + const socket = await connectToSocket(socketPath); + try { + await writeLine(socket, JSON.stringify(request)); + const responseLine = await readLine(socket); + return JSON.parse(responseLine) as { + ok: boolean; + result?: unknown; + error?: string; + }; + } finally { + socket.destroy(); + } +} + +/** + * Get the bundle spec for the system console vat test bundle. + * + * @returns The bundle spec URL. + */ +function getSystemConsoleBundleSpec(): string { + const bundlePath = join( + import.meta.dirname, + '../vats/system-console-vat.bundle', + ); + return pathToFileURL(bundlePath).href; +} + +describe('Daemon Stack (IO socket protocol)', { timeout: 30_000 }, () => { + let kernel: Kernel | undefined; + let kernelDatabase: KernelDatabase | undefined; + + /** + * Boot a kernel with a system console subcluster using IO socket. + * + * @returns The socket path. + */ + async function bootDaemonStack(): Promise { + const socketPath = tempSocketPath(); + + kernelDatabase = await makeSQLKernelDatabase({ dbFilename: ':memory:' }); + kernel = await makeTestKernel(kernelDatabase, { + ioChannelFactory: makeIOChannelFactory(), + systemSubclusters: [ + { + name: SYSTEM_CONSOLE_NAME, + config: { + bootstrap: SYSTEM_CONSOLE_NAME, + io: { + console: { + type: 'socket' as const, + path: socketPath, + }, + }, + services: ['kernelFacet', 'console'], + vats: { + [SYSTEM_CONSOLE_NAME]: { + bundleSpec: getSystemConsoleBundleSpec(), + parameters: { name: SYSTEM_CONSOLE_NAME }, + }, + }, + }, + }, + ], + }); + + await kernel.initIdentity(); + await waitUntilQuiescent(100); + + return socketPath; + } + + afterEach(async () => { + if (kernel) { + const stopResult = kernel.stop(); + kernel = undefined; + await stopResult; + } + if (kernelDatabase) { + kernelDatabase.close(); + kernelDatabase = undefined; + } + }); + + describe('daemon tier (no ref)', () => { + it('dispatches help command with daemon-tier commands only', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'help' }); + + expect(response.ok).toBe(true); + expect(response.result).toStrictEqual({ + commands: ['help - show available commands', 'status - daemon status'], + }); + }); + + it('dispatches status command returning liveness indicator', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'status' }); + + expect(response).toStrictEqual({ + ok: true, + result: { running: true }, + }); + }); + + it('returns error for unknown command', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { + method: 'nonexistent', + }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }); + + it('rejects privileged commands at daemon tier', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'ls' }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }); + + it('handles sequential requests on separate connections', async () => { + const socketPath = await bootDaemonStack(); + + const response1 = await sendCommand(socketPath, { method: 'help' }); + expect(response1.ok).toBe(true); + + const response2 = await sendCommand(socketPath, { method: 'status' }); + expect(response2.ok).toBe(true); + }); + }); + + describe('privileged tier (ref-based dispatch)', () => { + /** + * Boot daemon and issue a self-ref for the console root object. + * + * @returns The socket path and the issued ref. + */ + async function bootWithSelfRef(): Promise<{ + socketPath: string; + selfRef: string; + }> { + const socketPath = await bootDaemonStack(); + + // Issue a self-ref via kernel API (same as start-daemon.ts does) + const rootKref = kernel!.getSystemSubclusterRoot(SYSTEM_CONSOLE_NAME); + const capData = await kernel!.queueMessage(rootKref, 'issueRef', [ + rootKref, + true, + ]); + const selfRef = kunser(capData) as string; + + return { socketPath, selfRef }; + } + + it('dispatches help via ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'help', + }); + + expect(response.ok).toBe(true); + const result = response.result as { commands: string[] }; + expect(result.commands).toContain('help - show available commands'); + expect(result.commands).toContain('ls - list all issued refs'); + }); + + it('dispatches status via ref (returns kernel status)', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'status', + }); + + expect(response.ok).toBe(true); + const result = response.result as Record; + expect(result).toHaveProperty('vats'); + expect(result).toHaveProperty('subclusters'); + }); + + it('dispatches ls via ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'ls', + }); + + expect(response.ok).toBe(true); + const result = response.result as { refs: string[] }; + expect(Array.isArray(result.refs)).toBe(true); + }); + + it('dispatches subclusters via ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'subclusters', + }); + + expect(response.ok).toBe(true); + expect(Array.isArray(response.result)).toBe(true); + }); + + it('dispatches invoke to call method on a ref through the kernel', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + // Use invoke to call 'ls' on the self-ref (goes through getPresence + E()) + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'invoke', + args: [selfRef, 'ls'], + }); + + expect(response.ok).toBe(true); + const result = response.result as { refs: string[] }; + expect(Array.isArray(result.refs)).toBe(true); + }); + + it('returns error when invoke targets unknown ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'invoke', + args: ['d-999', 'someMethod'], + }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown ref: d-999'); + }); + }); +}); diff --git a/packages/nodejs/test/helpers/kernel.ts b/packages/nodejs/test/helpers/kernel.ts index 524b17bb7..1595897c1 100644 --- a/packages/nodejs/test/helpers/kernel.ts +++ b/packages/nodejs/test/helpers/kernel.ts @@ -4,6 +4,7 @@ import { Logger } from '@metamask/logger'; import { Kernel, kunser } from '@metamask/ocap-kernel'; import type { ClusterConfig, + IOChannelFactory, SystemSubclusterConfig, } from '@metamask/ocap-kernel'; @@ -13,6 +14,7 @@ type MakeTestKernelOptions = { resetStorage?: boolean; mnemonic?: string; systemSubclusters?: SystemSubclusterConfig[]; + ioChannelFactory?: IOChannelFactory; }; /** @@ -24,13 +26,19 @@ type MakeTestKernelOptions = { * @param options.resetStorage - Whether to reset the storage (default: true). * @param options.mnemonic - Optional BIP39 mnemonic string. * @param options.systemSubclusters - Optional system subcluster configurations. + * @param options.ioChannelFactory - Optional IO channel factory. * @returns The kernel. */ export async function makeTestKernel( kernelDatabase: KernelDatabase, options: MakeTestKernelOptions = {}, ): Promise { - const { resetStorage = true, mnemonic, systemSubclusters } = options; + const { + resetStorage = true, + mnemonic, + systemSubclusters, + ioChannelFactory, + } = options; const logger = new Logger('test-kernel'); const platformServices = new NodejsPlatformServices({ @@ -40,6 +48,7 @@ export async function makeTestKernel( resetStorage, mnemonic, systemSubclusters, + ioChannelFactory, logger: logger.subLogger({ tags: ['kernel'] }), }); diff --git a/packages/nodejs/test/vats/system-console-vat.ts b/packages/nodejs/test/vats/system-console-vat.ts new file mode 100644 index 000000000..75da555db --- /dev/null +++ b/packages/nodejs/test/vats/system-console-vat.ts @@ -0,0 +1 @@ +export { buildRootObject } from '@metamask/ocap-kernel/src/vats/system-console-vat.ts'; diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index 4a5296aa4..01465bbb0 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -11,7 +11,7 @@ import { KernelRouter } from './KernelRouter.ts'; import { KernelServiceManager } from './KernelServiceManager.ts'; import type { KernelService } from './KernelServiceManager.ts'; import type { SlotValue } from './liveslots/kernel-marshal.ts'; -import { kslot } from './liveslots/kernel-marshal.ts'; +import { kslot, kunser } from './liveslots/kernel-marshal.ts'; import { OcapURLManager } from './remotes/kernel/OcapURLManager.ts'; import { RemoteManager } from './remotes/kernel/RemoteManager.ts'; import type { RemoteCommsOptions } from './remotes/types.ts'; @@ -379,6 +379,27 @@ export class Kernel { return this.#kernelQueue.enqueueMessage(target, method, args); } + /** + * Send a message to an object in a vat and return the deserialized result. + * + * Unlike {@link queueMessage}, which returns raw CapData, this method + * deserializes the result so that it can be returned through a kernel + * service and re-serialized by liveslots for the calling vat. + * + * @param target - The kref of the object to invoke. + * @param method - The method name. + * @param args - The method arguments. + * @returns The deserialized result of the method invocation. + */ + async invokeMethod( + target: KRef, + method: string, + args: unknown[], + ): Promise { + const capData = await this.queueMessage(target, method, args); + return kunser(capData); + } + /** * Issue an OCAP URL for a kernel reference. * diff --git a/packages/ocap-kernel/src/kernel-facet.test.ts b/packages/ocap-kernel/src/kernel-facet.test.ts index 4e53f8a58..ddb4ddddf 100644 --- a/packages/ocap-kernel/src/kernel-facet.test.ts +++ b/packages/ocap-kernel/src/kernel-facet.test.ts @@ -11,6 +11,7 @@ const makeMockKernel = (): KernelFacetSource => ({ getSubcluster: () => undefined, getSubclusters: () => [], getSystemSubclusterRoot: () => 'ko99', + invokeMethod: async () => Promise.resolve(null), launchSubcluster: async () => Promise.resolve({ subclusterId: 's1', @@ -32,6 +33,7 @@ describe('makeKernelFacet', () => { expect(typeof facet.getSubcluster).toBe('function'); expect(typeof facet.getSubclusters).toBe('function'); expect(typeof facet.getSystemSubclusterRoot).toBe('function'); + expect(typeof facet.invokeMethod).toBe('function'); expect(typeof facet.launchSubcluster).toBe('function'); expect(typeof facet.ping).toBe('function'); expect(typeof facet.pingVat).toBe('function'); diff --git a/packages/ocap-kernel/src/kernel-facet.ts b/packages/ocap-kernel/src/kernel-facet.ts index 79ecbe443..13ff05321 100644 --- a/packages/ocap-kernel/src/kernel-facet.ts +++ b/packages/ocap-kernel/src/kernel-facet.ts @@ -8,6 +8,7 @@ const kernelFacetMethodNames = [ 'getSubcluster', 'getSubclusters', 'getSystemSubclusterRoot', + 'invokeMethod', 'launchSubcluster', 'pingVat', 'queueMessage', diff --git a/packages/ocap-kernel/src/vats/system-console-vat.test.ts b/packages/ocap-kernel/src/vats/system-console-vat.test.ts new file mode 100644 index 000000000..d22086d28 --- /dev/null +++ b/packages/ocap-kernel/src/vats/system-console-vat.test.ts @@ -0,0 +1,491 @@ +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; +import { describe, it, expect, beforeEach } from 'vitest'; + +import { buildRootObject } from './system-console-vat.ts'; + +/** + * Create a mock baggage store. + * + * @returns A mock baggage with has/get/set/init methods. + */ +function makeMockBaggage() { + const store = new Map(); + return { + has: (key: string) => store.has(key), + get: (key: string) => store.get(key), + set: (key: string, value: unknown) => store.set(key, value), + init: (key: string, value: unknown) => { + if (store.has(key)) { + throw new Error(`Key already exists: ${key}`); + } + store.set(key, value); + }, + }; +} + +/** + * Create a mock IO service with controllable read queue. + * + * @returns Mock IO service and control functions. + */ +function makeMockIOService() { + const readQueue: (string | null)[] = []; + const pendingReads: ((value: string | null) => void)[] = []; + const written: string[] = []; + + return { + ioService: makeDefaultExo('mockIOService', { + async read() { + const queued = readQueue.shift(); + if (queued !== undefined) { + return queued; + } + return new Promise((resolve) => { + pendingReads.push(resolve); + }); + }, + async write(data: string) { + written.push(data); + }, + }), + deliverLine(line: string): void { + const reader = pendingReads.shift(); + if (reader) { + reader(line); + } else { + readQueue.push(line); + } + }, + deliverEOF(): void { + const reader = pendingReads.shift(); + if (reader) { + reader(null); + } else { + readQueue.push(null); + } + }, + get written() { + return written; + }, + }; +} + +/** + * Create a mock kernel facet using plain functions (not vi.fn) to avoid + * SES lockdown issues with frozen mock internals. + * + * @returns A mock kernel facet and call trackers. + */ +function makeMockKernelFacet() { + const calls: Record = { + getStatus: [], + getSubclusters: [], + invokeMethod: [], + launchSubcluster: [], + terminateSubcluster: [], + }; + + const facet = makeDefaultExo('mockKernelFacet', { + async invokeMethod(...args: unknown[]) { + calls.invokeMethod.push(args); + return { mocked: true }; + }, + async getStatus(...args: unknown[]) { + calls.getStatus.push(args); + return { + incarnation: 1, + subclusters: 0, + vats: 1, + pendingMessages: 0, + }; + }, + async getSubclusters(...args: unknown[]) { + calls.getSubclusters.push(args); + return []; + }, + async launchSubcluster(...args: unknown[]) { + calls.launchSubcluster.push(args); + return { + subclusterId: 'sub-1', + rootKref: 'ko1', + bootstrapResult: undefined, + }; + }, + async terminateSubcluster(...args: unknown[]) { + calls.terminateSubcluster.push(args); + }, + }); + + return { facet, calls }; +} + +describe('system-console-vat', () => { + let baggage: ReturnType; + let kernelFacet: ReturnType; + let io: ReturnType; + + beforeEach(() => { + baggage = makeMockBaggage(); + kernelFacet = makeMockKernelFacet(); + io = makeMockIOService(); + }); + + describe('bootstrap', () => { + it('stores kernel facet in baggage', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + expect(baggage.has('kernelFacet')).toBe(true); + }); + + it('starts REPL loop when console IO service is provided', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap( + {}, + { + kernelFacet: kernelFacet.facet, + console: io.ioService, + }, + ); + + // Give it a tick to start + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Send a help command — if the REPL loop is running, it will respond + io.deliverLine(JSON.stringify({ method: 'help' })); + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(io.written.length).toBeGreaterThan(0); + }); + }); + + describe('REPL dispatch (daemon tier)', () => { + async function setupRepl() { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap( + {}, + { + kernelFacet: kernelFacet.facet, + console: io.ioService, + }, + ); + await new Promise((resolve) => setTimeout(resolve, 10)); + return root; + } + + async function sendRequest(request: Record) { + io.deliverLine(JSON.stringify(request)); + await new Promise((resolve) => setTimeout(resolve, 50)); + const lastWrite = io.written[io.written.length - 1]; + return JSON.parse(lastWrite!) as { + ok: boolean; + result?: unknown; + error?: string; + }; + } + + it('dispatches help command with daemon-tier commands only', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'help' }); + + expect(response.ok).toBe(true); + expect(response.result).toStrictEqual({ + commands: ['help - show available commands', 'status - daemon status'], + }); + }); + + it('dispatches status command returning liveness indicator', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'status' }); + + expect(response).toStrictEqual({ + ok: true, + result: { running: true }, + }); + }); + + it.each(['launch', 'terminate', 'subclusters', 'ls', 'revoke', 'invoke'])( + 'returns "Unknown command" for privileged command "%s"', + async (method) => { + await setupRepl(); + const response = await sendRequest({ method }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }, + ); + + it('dispatches self-ref REPL command directly on root', async () => { + const root = await setupRepl(); + const ref = root.issueRef('ko-self', true); + const response = await sendRequest({ ref, method: 'help' }); + + expect(response.ok).toBe(true); + expect(response.result).toStrictEqual({ + commands: [ + 'help - show available commands', + 'status - kernel status', + 'subclusters - list subclusters', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'ls - list all issued refs', + 'revoke - revoke a ref', + 'invoke [...args] - call a method on a ref', + ], + }); + // Should NOT have called invokeMethod — dispatch was direct + expect(kernelFacet.calls.invokeMethod).toHaveLength(0); + }); + + it('returns error for unknown method on self-ref', async () => { + const root = await setupRepl(); + const ref = root.issueRef('ko-self', true); + const response = await sendRequest({ ref, method: 'nonexistent' }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown method on root'); + }); + + it('returns error for unknown command', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'bogus' }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }); + + it('returns error for invalid JSON', async () => { + await setupRepl(); + io.deliverLine('not json'); + await new Promise((resolve) => setTimeout(resolve, 50)); + + const lastWrite = io.written[io.written.length - 1]; + const response = JSON.parse(lastWrite!) as { ok: boolean; error: string }; + expect(response.ok).toBe(false); + expect(response.error).toBeDefined(); + }); + + it('returns error for non-object request', async () => { + await setupRepl(); + const response = await sendRequest(42 as never); + expect(response.ok).toBe(false); + expect(response.error).toContain('Request must be a JSON object'); + }); + + it('returns error for request missing method', async () => { + await setupRepl(); + const response = await sendRequest({ ref: 'd-1' }); + expect(response.ok).toBe(false); + expect(response.error).toContain( + 'Request must have a string "method" field', + ); + }); + + it('returns error for non-string ref', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'help', ref: 123 } as never); + expect(response.ok).toBe(false); + expect(response.error).toContain('"ref" must be a string'); + }); + + it('returns error for non-array args', async () => { + await setupRepl(); + const response = await sendRequest({ + method: 'help', + args: 'not-array', + } as never); + expect(response.ok).toBe(false); + expect(response.error).toContain('"args" must be an array'); + }); + + it('continues after EOF (client disconnect)', async () => { + await setupRepl(); + + // Send a command + const response1 = await sendRequest({ method: 'help' }); + expect(response1.ok).toBe(true); + + // Simulate disconnect + io.deliverEOF(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Send another command (new connection) + const response2 = await sendRequest({ method: 'status' }); + expect(response2.ok).toBe(true); + }); + }); + + describe('privileged root object methods', () => { + async function setupRoot() { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + return root; + } + + it('returns help with all privileged commands', async () => { + const root = await setupRoot(); + const result = root.help(); + expect(result.commands).toStrictEqual([ + 'help - show available commands', + 'status - kernel status', + 'subclusters - list subclusters', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'ls - list all issued refs', + 'revoke - revoke a ref', + 'invoke [...args] - call a method on a ref', + ]); + }); + + it('returns kernel status', async () => { + const root = await setupRoot(); + const result = await root.status(); + + expect(result).toStrictEqual({ + incarnation: 1, + subclusters: 0, + vats: 1, + pendingMessages: 0, + }); + expect(kernelFacet.calls.getStatus).toHaveLength(1); + }); + + it('returns subclusters list', async () => { + const root = await setupRoot(); + const result = await root.subclusters(); + + expect(result).toStrictEqual([]); + expect(kernelFacet.calls.getSubclusters).toHaveLength(1); + }); + + it('launches subcluster and issues ref', async () => { + const root = await setupRoot(); + const config = { + bootstrap: 'test', + vats: { test: { bundleSpec: 'test-bundle' } }, + }; + const result = await root.launch(config); + + expect(result.ref).toMatch(/^d-\d+$/u); + expect(result.subclusterId).toBe('sub-1'); + expect(kernelFacet.calls.launchSubcluster).toHaveLength(1); + }); + + it('terminates subcluster', async () => { + const root = await setupRoot(); + const result = await root.terminate('sub-1'); + + expect(result).toStrictEqual({ ok: true }); + expect(kernelFacet.calls.terminateSubcluster).toHaveLength(1); + }); + + it('revokes a ref', async () => { + const root = await setupRoot(); + root.issueRef('ko1'); + + const result = root.revoke('d-1'); + expect(result).toStrictEqual({ ok: true }); + }); + + it('returns false when revoking unknown ref', async () => { + const root = await setupRoot(); + const result = root.revoke('d-999'); + expect(result).toStrictEqual({ ok: false }); + }); + + it('lists issued refs', async () => { + const root = await setupRoot(); + root.issueRef('ko1'); + root.issueRef('ko2'); + + const result = root.ls(); + expect(result.refs).toHaveLength(2); + expect(result.refs[0]).toMatch(/^d-\d+$/u); + expect(result.refs[1]).toMatch(/^d-\d+$/u); + }); + + it('returns empty list when no refs issued', async () => { + const root = await setupRoot(); + const result = root.ls(); + expect(result).toStrictEqual({ refs: [] }); + }); + + it('throws when invoke target ref is unknown', async () => { + const root = await setupRoot(); + await expect(root.invoke('d-999', 'transfer')).rejects.toThrow( + 'Unknown ref: d-999', + ); + }); + + it('throws when invoke is called without a target ref', async () => { + const root = await setupRoot(); + await expect(root.invoke('', 'transfer')).rejects.toThrow( + 'invoke requires a target ref', + ); + }); + + it('throws when invoke is called without a method', async () => { + const root = await setupRoot(); + const ref = root.issueRef('ko-wallet'); + await expect(root.invoke(ref, '')).rejects.toThrow( + 'invoke requires a method name', + ); + }); + }); + + describe('ref manager', () => { + it('issues idempotent refs for the same kref', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + + const ref1 = root.issueRef('ko1'); + const ref2 = root.issueRef('ko1'); + expect(ref1).toBe(ref2); + expect(ref1).toMatch(/^d-\d+$/u); + }); + + it('issues different refs for different krefs', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + + const ref1 = root.issueRef('ko1'); + const ref2 = root.issueRef('ko2'); + expect(ref1).not.toBe(ref2); + }); + + it('persists refs in baggage', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + + root.issueRef('ko1'); + expect(baggage.has('refs')).toBe(true); + expect(baggage.has('krefToRef')).toBe(true); + }); + }); +}); diff --git a/packages/ocap-kernel/src/vats/system-console-vat.ts b/packages/ocap-kernel/src/vats/system-console-vat.ts new file mode 100644 index 000000000..beb50c373 --- /dev/null +++ b/packages/ocap-kernel/src/vats/system-console-vat.ts @@ -0,0 +1,508 @@ +// eslint-disable-next-line import-x/no-extraneous-dependencies, n/no-extraneous-import -- vat dependency provided by kernel runtime +import { E } from '@endo/eventual-send'; +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; + +import type { + Baggage, + ClusterConfig, + KernelStatus, + Subcluster, + SubclusterLaunchResult, +} from '../types.ts'; + +/** + * Kernel facet interface for system vat operations. + */ +type KernelFacet = { + getStatus: () => Promise; + getSubclusters: () => Promise; + invokeMethod: ( + target: string, + method: string, + args: unknown[], + ) => Promise; + launchSubcluster: (config: ClusterConfig) => Promise; + terminateSubcluster: (subclusterId: string) => Promise; +}; + +/** + * Services provided to the system console vat during bootstrap. + */ +type BootstrapServices = { + kernelFacet?: KernelFacet; + console?: IOService; +}; + +/** + * IO service interface for reading and writing lines. + */ +type IOService = { + read: () => Promise; + write: (data: string) => Promise; +}; + +/** + * A JSON request from the CLI. + */ +type Request = { + ref?: string; + method: string; + args?: unknown[]; +}; + +/** + * Build function for the system console vat. + * + * This vat manages the REPL loop over an IO channel, dispatching CLI + * commands and managing refs (capability references) in persistent baggage. + * + * @param _vatPowers - The vat powers (unused). + * @param _parameters - The vat parameters (unused). + * @param _parameters.name - Optional name for the console vat. + * @param baggage - The vat's persistent baggage storage. + * @returns The root object for the new vat. + */ +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export function buildRootObject( + _vatPowers: unknown, + _parameters: { name?: string }, + baggage: Baggage, +) { + /** + * Get a value from baggage, or return a fallback if the key is absent. + * + * @param key - The baggage key. + * @param fallback - The value to return if the key is absent. + * @returns The stored value or the fallback. + */ + function baggageGet(key: string, fallback: T): T { + return baggage.has(key) ? (baggage.get(key) as T) : fallback; + } + + /** + * Set a value in baggage, initialising the key if it doesn't exist. + * + * @param key - The baggage key. + * @param value - The value to store. + */ + function baggagePut(key: string, value: unknown): void { + if (baggage.has(key)) { + baggage.set(key, value); + } else { + baggage.init(key, value); + } + } + + // Monotonic counter for generating unique ref identifiers (persisted in baggage) + let refCounter: number = baggageGet('refCounter', 0); + // Restore kernel facet from baggage if available (for resuscitation) + let kernelFacet: KernelFacet | undefined = baggageGet< + KernelFacet | undefined + >('kernelFacet', undefined); + + // Track which kref is the root's own, so isSelf-ref dispatch avoids kernel round-trip + let selfKref: string | undefined = baggageGet( + 'selfKref', + undefined, + ); + + // Ref manager state in baggage: ref → kref and kref → ref maps + // Stored as plain objects since baggage serializes them + const refs: Record = baggageGet( + 'refs', + {} as Record, + ); + const krefToRef: Record = baggageGet( + 'krefToRef', + {} as Record, + ); + + /** + * Persist the current ref state to baggage. + */ + function persistRefs(): void { + baggagePut('refs', harden({ ...refs })); + baggagePut('krefToRef', harden({ ...krefToRef })); + } + + /** + * Issue a ref for a kref. If the kref already has a ref, return it. + * + * @param kref - The kernel reference. + * @param isSelf - If true, marks this kref as the root's own for direct dispatch. + * @returns The issued ref. + */ + function issueRef(kref: string, isSelf?: boolean): string { + if (isSelf) { + selfKref = kref; + baggagePut('selfKref', selfKref); + } + const existing = krefToRef[kref]; + if (existing) { + return existing; + } + refCounter += 1; + baggagePut('refCounter', refCounter); + const ref = `d-${refCounter}`; + refs[ref] = kref; + krefToRef[kref] = ref; + persistRefs(); + return ref; + } + + /** + * Look up the kref for a ref. + * + * @param ref - The ref to look up. + * @returns The kref, or undefined if not found. + */ + function lookupKref(ref: string): string | undefined { + return refs[ref]; + } + + /** + * Revoke a ref, removing it from both maps. + * + * @param ref - The ref to revoke. + * @returns True if the ref was found and revoked. + */ + function revokeRef(ref: string): boolean { + const kref = refs[ref]; + if (!kref) { + return false; + } + delete refs[ref]; + delete krefToRef[kref]; + persistRefs(); + return true; + } + + /** + * List all issued refs. + * + * @returns Array of ref/kref pairs. + */ + function listRefs(): { ref: string; kref: string }[] { + return Object.entries(refs).map(([ref, kref]) => ({ ref, kref })); + } + + /** + * Get the kernel facet, throwing if not yet bootstrapped. + * + * @returns The kernel facet. + */ + function requireKernelFacet(): KernelFacet { + if (!kernelFacet) { + throw new Error('Kernel facet not available (bootstrap not called?)'); + } + return kernelFacet; + } + + /** + * Dispatch a method call on the root exo directly, bypassing the kernel. + * + * @param method - The method name. + * @param args - The method arguments. + * @returns The result of the method call. + */ + function dispatchOnSelf(method: string, args: unknown[]): unknown { + // eslint-disable-next-line @typescript-eslint/no-use-before-define + const fn = root[method as keyof typeof root] as + | ((...a: unknown[]) => unknown) + | undefined; + if (typeof fn !== 'function') { + throw new Error(`Unknown method on root: ${method}`); + } + // eslint-disable-next-line @typescript-eslint/no-use-before-define + return fn.call(root, ...args); + } + + /** + * Dispatch a request that has no ref (daemon-tier commands only). + * + * Only basic liveness commands are available without a capability ref. + * Privileged operations require a ref obtained from the `.ocap` file. + * + * @param method - The method name. + * @param _args - The method arguments (unused for daemon-tier commands). + * @returns The response payload. + */ + async function dispatchConsoleMethod( + method: string, + _args: unknown[], + ): Promise { + switch (method) { + case 'help': + return { + commands: [ + 'help - show available commands', + 'status - daemon status', + ], + }; + + case 'status': + return { running: true }; + + default: + throw new Error(`Unknown command: ${method}`); + } + } + + /** + * Handle a single parsed request and return the response. + * + * @param request - The parsed request. + * @returns The response payload. + */ + async function handleRequest(request: Request): Promise { + const { ref, method, args = [] } = request; + + if (!ref) { + return dispatchConsoleMethod(method, args); + } + + // Ref-based dispatch: resolve ref → kref + const kref = lookupKref(ref); + if (!kref) { + throw new Error(`Unknown ref: ${ref}`); + } + + // Self-ref: dispatch directly to avoid kernel round-trip + if (kref === selfKref) { + return dispatchOnSelf(method, args); + } + + // External ref: dispatch through the kernel's message queue + return E(requireKernelFacet()).invokeMethod(kref, method, args); + } + + /** + * Validate and coerce a parsed JSON value into a {@link Request}. + * + * @param parsed - The raw parsed JSON value. + * @returns The validated request. + */ + function validateRequest(parsed: unknown): Request { + if (typeof parsed !== 'object' || parsed === null) { + throw new Error('Request must be a JSON object'); + } + const obj = parsed as Record; + if (typeof obj.method !== 'string') { + throw new Error('Request must have a string "method" field'); + } + if (obj.ref !== undefined && typeof obj.ref !== 'string') { + throw new Error('"ref" must be a string'); + } + if (obj.args !== undefined && !Array.isArray(obj.args)) { + throw new Error('"args" must be an array'); + } + return { + method: obj.method, + ...(typeof obj.ref === 'string' ? { ref: obj.ref } : {}), + ...(Array.isArray(obj.args) ? { args: obj.args as unknown[] } : {}), + }; + } + + /** + * Run the REPL loop: read a JSON line, dispatch, write response, repeat. + * + * @param ioService - The IO service to read/write from. + */ + async function runReplLoop(ioService: IOService): Promise { + for (;;) { + const line = await E(ioService).read(); + if (line === null) { + // Client disconnected — wait for next connection + continue; + } + + let response: unknown; + try { + const request = validateRequest(JSON.parse(line)); + const result = await handleRequest(request); + response = { ok: true, result }; + } catch (error) { + // Errors crossing vat boundaries may arrive as plain objects. + // Try multiple strategies to extract a human-readable message. + let errorMessage: string; + if (error instanceof Error) { + errorMessage = error.message ?? error.stack ?? String(error); + } else if (typeof error === 'string') { + errorMessage = error; + } else { + try { + errorMessage = JSON.stringify(error); + } catch { + errorMessage = String(error); + } + } + response = { ok: false, error: errorMessage }; + } + + try { + await E(ioService).write(JSON.stringify(response)); + } catch { + // Write failed (client disconnected mid-response) — continue loop + } + } + } + + const root = makeDefaultExo('root', { + /** + * Bootstrap the vat. + * + * @param _vats - The vats object (unused). + * @param services - The services object containing kernelFacet and console IO. + */ + async bootstrap( + _vats: unknown, + services: BootstrapServices, + ): Promise { + if (!kernelFacet && services.kernelFacet) { + kernelFacet = services.kernelFacet; + baggagePut('kernelFacet', kernelFacet); + } + + if (services.console) { + // Fire-and-forget the REPL loop — it runs indefinitely + // eslint-disable-next-line no-console -- vat diagnostic output + runReplLoop(services.console).catch(console.error); + } + }, + + /** + * Issue a ref for a kref. Exposed for the daemon to get the initial console ref. + * + * @param kref - The kernel reference. + * @param isSelf - If true, marks this kref as the root's own for direct dispatch. + * @returns The issued ref. + */ + issueRef(kref: string, isSelf?: boolean): string { + return issueRef(kref, isSelf); + }, + + /** + * Get help information (privileged — lists all available commands). + * + * @returns The help object. + */ + help() { + return harden({ + commands: [ + 'help - show available commands', + 'status - kernel status', + 'subclusters - list subclusters', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'ls - list all issued refs', + 'revoke - revoke a ref', + 'invoke [...args] - call a method on a ref', + ], + }); + }, + + /** + * Get kernel status. + * + * @returns The kernel status. + */ + async status(): Promise { + return E(requireKernelFacet()).getStatus(); + }, + + /** + * List subclusters. + * + * @returns The subclusters. + */ + async subclusters(): Promise { + return E(requireKernelFacet()).getSubclusters(); + }, + + /** + * Launch a subcluster and issue a ref for its root object. + * + * @param config - The cluster config. + * @returns The issued ref and subcluster ID. + */ + async launch( + config: ClusterConfig, + ): Promise<{ ref: string; subclusterId: string }> { + if (!config) { + throw new Error('launch requires a config argument'); + } + const result = await E(requireKernelFacet()).launchSubcluster(config); + const ref = issueRef(result.rootKref); + return harden({ ref, subclusterId: result.subclusterId }); + }, + + /** + * Terminate a subcluster. + * + * @param subclusterId - The subcluster ID. + * @returns Confirmation. + */ + async terminate(subclusterId: string): Promise<{ ok: true }> { + if (!subclusterId) { + throw new Error('terminate requires a subclusterId argument'); + } + await E(requireKernelFacet()).terminateSubcluster(subclusterId); + return harden({ ok: true as const }); + }, + + /** + * Revoke a ref. + * + * @param ref - The ref to revoke. + * @returns Whether the ref was found and revoked. + */ + revoke(ref: string): { ok: boolean } { + if (!ref) { + throw new Error('revoke requires a ref argument'); + } + return harden({ ok: revokeRef(ref) }); + }, + + /** + * List all issued refs. + * + * @returns Array of ref strings. + */ + ls(): { refs: string[] } { + return harden({ refs: listRefs().map((entry) => entry.ref) }); + }, + + /** + * Invoke a method on a target ref, forwarding pure-data arguments + * through the kernel. + * + * @param targetRef - The ref to invoke the method on. + * @param method - The method name. + * @param args - The method arguments. + * @returns The result of the method call. + */ + async invoke( + targetRef: string, + method: string, + ...args: unknown[] + ): Promise { + if (!targetRef) { + throw new Error('invoke requires a target ref'); + } + if (!method) { + throw new Error('invoke requires a method name'); + } + const kref = lookupKref(targetRef); + if (!kref) { + throw new Error(`Unknown ref: ${targetRef}`); + } + // Self-ref: dispatch directly to avoid kernel round-trip + if (kref === selfKref) { + return dispatchOnSelf(method, args); + } + return E(requireKernelFacet()).invokeMethod(kref, method, args); + }, + }); + + return root; +} diff --git a/yarn.lock b/yarn.lock index cc2c6e1c8..d3b4fa063 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3498,6 +3498,7 @@ __metadata: yargs: "npm:^17.7.2" bin: ocap: ./dist/app.mjs + ok: ./dist/ok.mjs languageName: unknown linkType: soft