From cde2516b27e78409501a91b2f6e4e366b65f7e28 Mon Sep 17 00:00:00 2001 From: grypez <143971198+grypez@users.noreply.github.com> Date: Tue, 17 Feb 2026 07:03:43 -0600 Subject: [PATCH 1/5] refactor(kernel-utils): move ifDefined from kernel-agents to kernel-utils Co-Authored-By: Claude Opus 4.6 --- packages/kernel-agents/src/utils.ts | 12 +---------- packages/kernel-utils/src/index.test.ts | 1 + packages/kernel-utils/src/index.ts | 2 +- packages/kernel-utils/src/misc.test.ts | 27 ++++++++++++++++++++++++- packages/kernel-utils/src/misc.ts | 13 ++++++++++++ 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/packages/kernel-agents/src/utils.ts b/packages/kernel-agents/src/utils.ts index 6112bfe2b..986afdd99 100644 --- a/packages/kernel-agents/src/utils.ts +++ b/packages/kernel-agents/src/utils.ts @@ -2,17 +2,7 @@ import type { Logger } from '@metamask/logger'; import type { SampleCollector } from './types.ts'; -/** - * Return a new object with the undefined values removed. - * - * @param record - The record to filter. - * @returns The new object with the undefined values removed. - */ -// eslint-disable-next-line @typescript-eslint/explicit-function-return-type -export const ifDefined = (record: Record) => - Object.fromEntries( - Object.entries(record).filter(([_, value]) => value !== undefined), - ); +export { ifDefined } from '@metamask/kernel-utils'; /** * Await a promise, and call the abort callback when done or on error. diff --git a/packages/kernel-utils/src/index.test.ts b/packages/kernel-utils/src/index.test.ts index 67cef419d..7533cd072 100644 --- a/packages/kernel-utils/src/index.test.ts +++ b/packages/kernel-utils/src/index.test.ts @@ -14,6 +14,7 @@ describe('index', () => { 'delay', 'fetchValidatedJson', 'fromHex', + 'ifDefined', 'installWakeDetector', 'isJsonRpcCall', 'isJsonRpcMessage', diff --git a/packages/kernel-utils/src/index.ts b/packages/kernel-utils/src/index.ts index 934437874..09d85af54 100644 --- a/packages/kernel-utils/src/index.ts +++ b/packages/kernel-utils/src/index.ts @@ -3,7 +3,7 @@ export { makeDiscoverableExo } from './discoverable.ts'; export type { DiscoverableExo } from './discoverable.ts'; export type { JsonSchema, MethodSchema } from './schema.ts'; export { fetchValidatedJson } from './fetchValidatedJson.ts'; -export { abortableDelay, delay, makeCounter } from './misc.ts'; +export { abortableDelay, delay, ifDefined, makeCounter } from './misc.ts'; export { stringify } from './stringify.ts'; export { installWakeDetector } from './wake-detector.ts'; export type { WakeDetectorOptions } from './wake-detector.ts'; diff --git a/packages/kernel-utils/src/misc.test.ts b/packages/kernel-utils/src/misc.test.ts index 64ef72d57..83bb3a929 100644 --- a/packages/kernel-utils/src/misc.test.ts +++ b/packages/kernel-utils/src/misc.test.ts @@ -1,7 +1,32 @@ import { AbortError } from '@metamask/kernel-errors'; import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { abortableDelay, delay, makeCounter } from './misc.ts'; +import { abortableDelay, delay, ifDefined, makeCounter } from './misc.ts'; + +describe('ifDefined', () => { + it('removes undefined values', () => { + expect(ifDefined({ a: 1, b: undefined, c: 3 })).toStrictEqual({ + a: 1, + c: 3, + }); + }); + + it('returns empty object when all values are undefined', () => { + expect(ifDefined({ a: undefined, b: undefined })).toStrictEqual({}); + }); + + it('preserves all values when none are undefined', () => { + expect(ifDefined({ a: 1, b: 'two', c: null })).toStrictEqual({ + a: 1, + b: 'two', + c: null, + }); + }); + + it('returns empty object for empty input', () => { + expect(ifDefined({})).toStrictEqual({}); + }); +}); describe('misc utilities', () => { beforeEach(() => { diff --git a/packages/kernel-utils/src/misc.ts b/packages/kernel-utils/src/misc.ts index 5f5893adb..8c2b30dcf 100644 --- a/packages/kernel-utils/src/misc.ts +++ b/packages/kernel-utils/src/misc.ts @@ -1,5 +1,18 @@ import { AbortError } from '@metamask/kernel-errors'; +/** + * Return a new object with the undefined values removed. + * Useful for building options bags with exact optional property types. + * + * @param record - The record to filter. + * @returns The new object with the undefined values removed. + */ +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export const ifDefined = (record: Record) => + Object.fromEntries( + Object.entries(record).filter(([_, value]) => value !== undefined), + ); + /** * A simple counter which increments and returns when called. * From 9b29f26e74fa5062d0c82ed1fab97ecdbd50bf08 Mon Sep 17 00:00:00 2001 From: grypez <143971198+grypez@users.noreply.github.com> Date: Tue, 17 Feb 2026 07:03:51 -0600 Subject: [PATCH 2/5] feat(ocap-kernel): add console vat and system console vat with IO dispatch The system console vat manages a REPL loop over an IO channel, dispatching CLI commands (help, status, launch, terminate, subclusters, listRefs, revoke) and managing refs in persistent baggage. Refs use a monotonic counter (d-1, d-2, ...) since crypto.randomUUID() is unavailable under SES lockdown. Cross-vat errors are serialized via JSON.stringify fallback for reliable error reporting. Co-Authored-By: Claude Opus 4.6 --- .../src/vats/system-console-vat.test.ts | 383 ++++++++++++++++++ .../src/vats/system-console-vat.ts | 369 +++++++++++++++++ 2 files changed, 752 insertions(+) create mode 100644 packages/ocap-kernel/src/vats/system-console-vat.test.ts create mode 100644 packages/ocap-kernel/src/vats/system-console-vat.ts diff --git a/packages/ocap-kernel/src/vats/system-console-vat.test.ts b/packages/ocap-kernel/src/vats/system-console-vat.test.ts new file mode 100644 index 000000000..7cc1da57d --- /dev/null +++ b/packages/ocap-kernel/src/vats/system-console-vat.test.ts @@ -0,0 +1,383 @@ +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; +import { describe, it, expect, beforeEach } from 'vitest'; + +import { buildRootObject } from './system-console-vat.ts'; + +/** + * Create a mock baggage store. + * + * @returns A mock baggage with has/get/set/init methods. + */ +function makeMockBaggage() { + const store = new Map(); + return { + has: (key: string) => store.has(key), + get: (key: string) => store.get(key), + set: (key: string, value: unknown) => store.set(key, value), + init: (key: string, value: unknown) => { + if (store.has(key)) { + throw new Error(`Key already exists: ${key}`); + } + store.set(key, value); + }, + }; +} + +/** + * Create a mock IO service with controllable read queue. + * + * @returns Mock IO service and control functions. + */ +function makeMockIOService() { + const readQueue: (string | null)[] = []; + const pendingReads: ((value: string | null) => void)[] = []; + const written: string[] = []; + + return { + ioService: makeDefaultExo('mockIOService', { + async read() { + const queued = readQueue.shift(); + if (queued !== undefined) { + return queued; + } + return new Promise((resolve) => { + pendingReads.push(resolve); + }); + }, + async write(data: string) { + written.push(data); + }, + }), + deliverLine(line: string): void { + const reader = pendingReads.shift(); + if (reader) { + reader(line); + } else { + readQueue.push(line); + } + }, + deliverEOF(): void { + const reader = pendingReads.shift(); + if (reader) { + reader(null); + } else { + readQueue.push(null); + } + }, + get written() { + return written; + }, + }; +} + +/** + * Create a mock kernel facet using plain functions (not vi.fn) to avoid + * SES lockdown issues with frozen mock internals. + * + * @returns A mock kernel facet and call trackers. + */ +function makeMockKernelFacet() { + const calls: Record = { + getStatus: [], + getSubclusters: [], + launchSubcluster: [], + terminateSubcluster: [], + }; + + const facet = makeDefaultExo('mockKernelFacet', { + async getStatus(...args: unknown[]) { + calls.getStatus.push(args); + return { + incarnation: 1, + subclusters: 0, + vats: 1, + pendingMessages: 0, + }; + }, + async getSubclusters(...args: unknown[]) { + calls.getSubclusters.push(args); + return []; + }, + async launchSubcluster(...args: unknown[]) { + calls.launchSubcluster.push(args); + return { + subclusterId: 'sub-1', + rootKref: 'ko1', + bootstrapResult: undefined, + }; + }, + async terminateSubcluster(...args: unknown[]) { + calls.terminateSubcluster.push(args); + }, + }); + + return { facet, calls }; +} + +describe('system-console-vat', () => { + let baggage: ReturnType; + let kernelFacet: ReturnType; + let io: ReturnType; + + beforeEach(() => { + baggage = makeMockBaggage(); + kernelFacet = makeMockKernelFacet(); + io = makeMockIOService(); + }); + + describe('bootstrap', () => { + it('stores kernel facet in baggage', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + expect(baggage.has('kernelFacet')).toBe(true); + }); + + it('starts REPL loop when console IO service is provided', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap( + {}, + { + kernelFacet: kernelFacet.facet, + console: io.ioService, + }, + ); + + // Give it a tick to start + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Send a help command — if the REPL loop is running, it will respond + io.deliverLine(JSON.stringify({ method: 'help' })); + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(io.written.length).toBeGreaterThan(0); + }); + }); + + describe('REPL dispatch', () => { + async function setupRepl() { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap( + {}, + { + kernelFacet: kernelFacet.facet, + console: io.ioService, + }, + ); + await new Promise((resolve) => setTimeout(resolve, 10)); + return root; + } + + async function sendRequest(request: Record) { + io.deliverLine(JSON.stringify(request)); + await new Promise((resolve) => setTimeout(resolve, 50)); + const lastWrite = io.written[io.written.length - 1]; + return JSON.parse(lastWrite!) as { + ok: boolean; + result?: unknown; + error?: string; + }; + } + + it('dispatches help command', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'help' }); + + expect(response.ok).toBe(true); + expect(response.result).toStrictEqual({ + commands: expect.arrayContaining([ + expect.stringContaining('help'), + expect.stringContaining('status'), + ]), + }); + }); + + it('dispatches status command', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'status' }); + + expect(response.ok).toBe(true); + expect(kernelFacet.calls.getStatus).toHaveLength(1); + }); + + it('dispatches subclusters command', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'subclusters' }); + + expect(response.ok).toBe(true); + expect(kernelFacet.calls.getSubclusters).toHaveLength(1); + }); + + it('dispatches launch command and issues ref', async () => { + await setupRepl(); + const config = { + bootstrap: 'test', + vats: { test: { bundleSpec: 'test-bundle' } }, + }; + const response = await sendRequest({ method: 'launch', args: [config] }); + + expect(response.ok).toBe(true); + const result = response.result as { ref: string; subclusterId: string }; + expect(result.ref).toMatch(/^d-\d+$/u); + expect(result.subclusterId).toBe('sub-1'); + expect(kernelFacet.calls.launchSubcluster).toHaveLength(1); + }); + + it('dispatches terminate command', async () => { + await setupRepl(); + const response = await sendRequest({ + method: 'terminate', + args: ['sub-1'], + }); + + expect(response.ok).toBe(true); + expect(kernelFacet.calls.terminateSubcluster).toHaveLength(1); + }); + + it('dispatches revoke command', async () => { + await setupRepl(); + + // First launch to get a ref + const launchResponse = await sendRequest({ + method: 'launch', + args: [{ bootstrap: 'x', vats: { x: { bundleSpec: 'x' } } }], + }); + const { ref } = launchResponse.result as { ref: string }; + + // Revoke the ref + const response = await sendRequest({ method: 'revoke', args: [ref] }); + expect(response).toStrictEqual({ ok: true, result: { ok: true } }); + }); + + it('dispatches listRefs command', async () => { + await setupRepl(); + + // Launch to create a ref + await sendRequest({ + method: 'launch', + args: [{ bootstrap: 'x', vats: { x: { bundleSpec: 'x' } } }], + }); + + const response = await sendRequest({ method: 'listRefs' }); + expect(response.ok).toBe(true); + const result = response.result as { + refs: { ref: string; kref: string }[]; + }; + expect(result.refs).toHaveLength(1); + expect(result.refs[0]!.kref).toBe('ko1'); + }); + + it('returns error for unknown command', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'bogus' }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }); + + it('returns error for invalid JSON', async () => { + await setupRepl(); + io.deliverLine('not json'); + await new Promise((resolve) => setTimeout(resolve, 50)); + + const lastWrite = io.written[io.written.length - 1]; + const response = JSON.parse(lastWrite!) as { ok: boolean; error: string }; + expect(response.ok).toBe(false); + expect(response.error).toBeDefined(); + }); + + it('continues after EOF (client disconnect)', async () => { + await setupRepl(); + + // Send a command + const response1 = await sendRequest({ method: 'help' }); + expect(response1.ok).toBe(true); + + // Simulate disconnect + io.deliverEOF(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Send another command (new connection) + const response2 = await sendRequest({ method: 'status' }); + expect(response2.ok).toBe(true); + }); + }); + + describe('ref manager', () => { + it('issues idempotent refs for the same kref', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + + const ref1 = root.issueRef('ko1'); + const ref2 = root.issueRef('ko1'); + expect(ref1).toBe(ref2); + expect(ref1).toMatch(/^d-\d+$/u); + }); + + it('issues different refs for different krefs', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + + const ref1 = root.issueRef('ko1'); + const ref2 = root.issueRef('ko2'); + expect(ref1).not.toBe(ref2); + }); + + it('persists refs in baggage', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + + root.issueRef('ko1'); + expect(baggage.has('refs')).toBe(true); + expect(baggage.has('krefToRef')).toBe(true); + }); + + it('lists issued refs', async () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + + const ref = root.issueRef('ko1'); + const refList = root.listRefs(); + expect(refList).toStrictEqual([{ ref, kref: 'ko1' }]); + }); + }); + + describe('help', () => { + it('returns command list', () => { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + const result = root.help(); + expect(result).toHaveProperty('commands'); + expect(result.commands.length).toBeGreaterThan(0); + }); + }); +}); diff --git a/packages/ocap-kernel/src/vats/system-console-vat.ts b/packages/ocap-kernel/src/vats/system-console-vat.ts new file mode 100644 index 000000000..020a797ef --- /dev/null +++ b/packages/ocap-kernel/src/vats/system-console-vat.ts @@ -0,0 +1,369 @@ +// eslint-disable-next-line import-x/no-extraneous-dependencies, n/no-extraneous-import -- vat dependency provided by kernel runtime +import { E } from '@endo/eventual-send'; +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; + +import type { + Baggage, + ClusterConfig, + KernelStatus, + Subcluster, + SubclusterLaunchResult, +} from '../types.ts'; + +/** + * Kernel facet interface for system vat operations. + */ +type KernelFacet = { + getStatus: () => Promise; + getSubclusters: () => Promise; + launchSubcluster: (config: ClusterConfig) => Promise; + terminateSubcluster: (subclusterId: string) => Promise; + queueMessage: ( + target: string, + method: string, + args: unknown[], + ) => Promise; +}; + +/** + * Services provided to the system console vat during bootstrap. + */ +type BootstrapServices = { + kernelFacet?: KernelFacet; + console?: IOService; +}; + +/** + * IO service interface for reading and writing lines. + */ +type IOService = { + read: () => Promise; + write: (data: string) => Promise; +}; + +/** + * A JSON request from the CLI. + */ +type Request = { + ref?: string; + method: string; + args?: unknown[]; +}; + +/** + * Build function for the system console vat. + * + * This vat manages the REPL loop over an IO channel, dispatching CLI + * commands and managing refs (capability references) in persistent baggage. + * + * @param _vatPowers - The vat powers (unused). + * @param _parameters - The vat parameters (unused). + * @param _parameters.name - Optional name for the console vat. + * @param baggage - The vat's persistent baggage storage. + * @returns The root object for the new vat. + */ +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export function buildRootObject( + _vatPowers: unknown, + _parameters: { name?: string }, + baggage: Baggage, +) { + // Monotonic counter for generating unique ref identifiers (persisted in baggage) + let refCounter: number = baggage.has('refCounter') + ? (baggage.get('refCounter') as number) + : 0; + // Restore kernel facet from baggage if available (for resuscitation) + let kernelFacet: KernelFacet | undefined = baggage.has('kernelFacet') + ? (baggage.get('kernelFacet') as KernelFacet) + : undefined; + + // Ref manager state in baggage: ref → kref and kref → ref maps + // Stored as plain objects since baggage serializes them + const refs: Record = baggage.has('refs') + ? (baggage.get('refs') as Record) + : {}; + const krefToRef: Record = baggage.has('krefToRef') + ? (baggage.get('krefToRef') as Record) + : {}; + + /** + * Persist the current ref state to baggage. + */ + function persistRefs(): void { + if (baggage.has('refs')) { + baggage.set('refs', harden({ ...refs })); + } else { + baggage.init('refs', harden({ ...refs })); + } + if (baggage.has('krefToRef')) { + baggage.set('krefToRef', harden({ ...krefToRef })); + } else { + baggage.init('krefToRef', harden({ ...krefToRef })); + } + } + + /** + * Issue a ref for a kref. If the kref already has a ref, return it. + * + * @param kref - The kernel reference. + * @returns The issued ref. + */ + function issueRef(kref: string): string { + const existing = krefToRef[kref]; + if (existing) { + return existing; + } + refCounter += 1; + if (baggage.has('refCounter')) { + baggage.set('refCounter', refCounter); + } else { + baggage.init('refCounter', refCounter); + } + const ref = `d-${refCounter}`; + refs[ref] = kref; + krefToRef[kref] = ref; + persistRefs(); + return ref; + } + + /** + * Look up the kref for a ref. + * + * @param ref - The ref to look up. + * @returns The kref, or undefined if not found. + */ + function lookupKref(ref: string): string | undefined { + return refs[ref]; + } + + /** + * Revoke a ref, removing it from both maps. + * + * @param ref - The ref to revoke. + * @returns True if the ref was found and revoked. + */ + function revokeRef(ref: string): boolean { + const kref = refs[ref]; + if (!kref) { + return false; + } + delete refs[ref]; + delete krefToRef[kref]; + persistRefs(); + return true; + } + + /** + * List all issued refs. + * + * @returns Array of ref/kref pairs. + */ + function listRefs(): { ref: string; kref: string }[] { + return Object.entries(refs).map(([ref, kref]) => ({ ref, kref })); + } + + /** + * Get the kernel facet, throwing if not yet bootstrapped. + * + * @returns The kernel facet. + */ + function requireKernelFacet(): KernelFacet { + if (!kernelFacet) { + throw new Error('Kernel facet not available (bootstrap not called?)'); + } + return kernelFacet; + } + + /** + * Dispatch a request that has no ref (operates on the system console itself). + * + * @param method - The method name. + * @param args - The method arguments. + * @returns The response payload. + */ + async function dispatchConsoleMethod( + method: string, + args: unknown[], + ): Promise { + switch (method) { + case 'help': + return { + commands: [ + 'help - show available commands', + 'status - kernel status', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'subclusters - list subclusters', + 'revoke - revoke a ref', + 'listRefs - list all issued refs', + ], + }; + + case 'status': + return E(requireKernelFacet()).getStatus(); + + case 'subclusters': + return E(requireKernelFacet()).getSubclusters(); + + case 'launch': { + const config = args[0] as ClusterConfig; + if (!config) { + throw new Error('launch requires a config argument'); + } + const result = await E(requireKernelFacet()).launchSubcluster(config); + const ref = issueRef(result.rootKref); + return { ref, subclusterId: result.subclusterId }; + } + + case 'terminate': { + const subclusterId = args[0] as string; + if (!subclusterId) { + throw new Error('terminate requires a subclusterId argument'); + } + await E(requireKernelFacet()).terminateSubcluster(subclusterId); + return { ok: true }; + } + + case 'revoke': { + const ref = args[0] as string; + if (!ref) { + throw new Error('revoke requires a ref argument'); + } + return { ok: revokeRef(ref) }; + } + + case 'listRefs': + return { refs: listRefs() }; + + default: + throw new Error(`Unknown command: ${method}`); + } + } + + /** + * Handle a single parsed request and return the response. + * + * @param request - The parsed request. + * @returns The response payload. + */ + async function handleRequest(request: Request): Promise { + const { ref, method, args = [] } = request; + + if (!ref) { + return dispatchConsoleMethod(method, args); + } + + // Ref-based dispatch: resolve ref → kref, then queue message + const kref = lookupKref(ref); + if (!kref) { + throw new Error(`Unknown ref: ${ref}`); + } + return E(requireKernelFacet()).queueMessage(kref, method, args); + } + + /** + * Run the REPL loop: read a JSON line, dispatch, write response, repeat. + * + * @param ioService - The IO service to read/write from. + */ + async function runReplLoop(ioService: IOService): Promise { + for (;;) { + const line = await E(ioService).read(); + if (line === null) { + // Client disconnected — wait for next connection + continue; + } + + let response: unknown; + try { + const request = JSON.parse(line) as Request; + const result = await handleRequest(request); + response = { ok: true, result }; + } catch (error) { + // Errors crossing vat boundaries may arrive as plain objects. + // Try multiple strategies to extract a human-readable message. + let errorMessage: string; + if (error instanceof Error) { + errorMessage = error.message ?? error.stack ?? String(error); + } else if (typeof error === 'string') { + errorMessage = error; + } else { + try { + errorMessage = JSON.stringify(error); + } catch { + errorMessage = String(error); + } + } + response = { ok: false, error: errorMessage }; + } + + try { + await E(ioService).write(JSON.stringify(response)); + } catch { + // Write failed (client disconnected mid-response) — continue loop + } + } + } + + return makeDefaultExo('root', { + /** + * Bootstrap the vat. + * + * @param _vats - The vats object (unused). + * @param services - The services object containing kernelFacet and console IO. + */ + async bootstrap( + _vats: unknown, + services: BootstrapServices, + ): Promise { + if (!kernelFacet && services.kernelFacet) { + kernelFacet = services.kernelFacet; + baggage.init('kernelFacet', kernelFacet); + } + + if (services.console) { + // Fire-and-forget the REPL loop — it runs indefinitely + // eslint-disable-next-line no-console -- vat diagnostic output + runReplLoop(services.console).catch(console.error); + } + }, + + /** + * Get help information. + * + * @returns The help object. + */ + help() { + return harden({ + commands: [ + 'help - show available commands', + 'status - kernel status', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'subclusters - list subclusters', + 'revoke - revoke a ref', + 'listRefs - list all issued refs', + ], + }); + }, + + /** + * Issue a ref for a kref. Exposed for the daemon to get the initial console ref. + * + * @param kref - The kernel reference. + * @returns The issued ref. + */ + issueRef(kref: string): string { + return issueRef(kref); + }, + + /** + * List all issued refs. + * + * @returns Array of ref/kref pairs. + */ + listRefs(): { ref: string; kref: string }[] { + return listRefs(); + }, + }); +} From 8e9367f63e2e15842bec4e181d5636d7d6359097 Mon Sep 17 00:00:00 2001 From: grypez <143971198+grypez@users.noreply.github.com> Date: Tue, 17 Feb 2026 07:04:00 -0600 Subject: [PATCH 3/5] feat(nodejs): add daemon orchestration with IO channel support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add startDaemon() which boots a kernel with a system console vat listening on a UNIX domain socket IO channel. The kernel process IS the daemon — no separate HTTP server. Includes socket channel fix to block reads when no client is connected, flush-daemon utility, and e2e tests for the full daemon stack protocol. Co-Authored-By: Claude Opus 4.6 --- packages/nodejs/src/daemon/flush-daemon.ts | 31 ++ .../nodejs/src/daemon/start-daemon.test.ts | 104 +++++ packages/nodejs/src/daemon/start-daemon.ts | 107 ++++++ packages/nodejs/src/index.ts | 7 + packages/nodejs/src/io/socket-channel.test.ts | 14 +- packages/nodejs/src/io/socket-channel.ts | 4 +- packages/nodejs/src/kernel/make-kernel.ts | 9 +- packages/nodejs/test/e2e/daemon-stack.test.ts | 356 ++++++++++++++++++ packages/nodejs/test/helpers/kernel.ts | 11 +- .../nodejs/test/vats/system-console-vat.ts | 288 ++++++++++++++ 10 files changed, 923 insertions(+), 8 deletions(-) create mode 100644 packages/nodejs/src/daemon/flush-daemon.ts create mode 100644 packages/nodejs/src/daemon/start-daemon.test.ts create mode 100644 packages/nodejs/src/daemon/start-daemon.ts create mode 100644 packages/nodejs/test/e2e/daemon-stack.test.ts create mode 100644 packages/nodejs/test/vats/system-console-vat.ts diff --git a/packages/nodejs/src/daemon/flush-daemon.ts b/packages/nodejs/src/daemon/flush-daemon.ts new file mode 100644 index 000000000..cf4b2adbe --- /dev/null +++ b/packages/nodejs/src/daemon/flush-daemon.ts @@ -0,0 +1,31 @@ +import { rm } from 'node:fs/promises'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; + +/** + * Options for flushing daemon state. + */ +export type FlushDaemonOptions = { + /** UNIX socket path. Defaults to ~/.ocap/console.sock. */ + socketPath?: string; + /** SQLite database filename. Defaults to ~/.ocap/kernel.sqlite. */ + dbFilename?: string; +}; + +/** + * Delete all daemon state: kernel DB, bundles cache, and socket. + * + * @param options - Optional overrides for file paths. + */ +export async function flushDaemon(options?: FlushDaemonOptions): Promise { + const ocapDir = join(homedir(), '.ocap'); + const socketPath = options?.socketPath ?? join(ocapDir, 'console.sock'); + const dbFilename = options?.dbFilename ?? join(ocapDir, 'kernel.sqlite'); + const bundlesDir = join(ocapDir, 'bundles'); + + await Promise.all([ + rm(dbFilename, { force: true }), + rm(socketPath, { force: true }), + rm(bundlesDir, { recursive: true, force: true }), + ]); +} diff --git a/packages/nodejs/src/daemon/start-daemon.test.ts b/packages/nodejs/src/daemon/start-daemon.test.ts new file mode 100644 index 000000000..21e027f56 --- /dev/null +++ b/packages/nodejs/src/daemon/start-daemon.test.ts @@ -0,0 +1,104 @@ +import { vi, describe, it, expect, afterEach } from 'vitest'; + +import { startDaemon } from './start-daemon.ts'; +import type { DaemonHandle } from './start-daemon.ts'; + +// Mock makeKernel to avoid real kernel creation +vi.mock('../kernel/make-kernel.ts', () => ({ + makeKernel: vi.fn().mockResolvedValue({ + initIdentity: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + }), +})); + +// Mock filesystem operations +vi.mock('node:fs/promises', async () => { + const actual = + await vi.importActual( + 'node:fs/promises', + ); + return { + ...actual, + mkdir: vi.fn().mockResolvedValue(undefined), + }; +}); + +describe('startDaemon', () => { + let handle: DaemonHandle | undefined; + + afterEach(async () => { + if (handle) { + const toClose = handle; + handle = undefined; + await toClose.close(); + } + }); + + it('creates kernel with IO-based system subcluster config', async () => { + const { makeKernel } = await import('../kernel/make-kernel.ts'); + const mockedMakeKernel = vi.mocked(makeKernel); + + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + systemConsoleName: 'my-console', + socketPath: tmpSocket, + }); + + expect(mockedMakeKernel).toHaveBeenCalledWith( + expect.objectContaining({ + systemSubclusters: [ + { + name: 'my-console', + config: { + bootstrap: 'my-console', + io: { + console: { + type: 'socket', + path: tmpSocket, + }, + }, + services: ['kernelFacet', 'console'], + vats: { + 'my-console': { + bundleSpec: 'http://localhost/bundle', + parameters: { name: 'my-console' }, + }, + }, + }, + }, + ], + }), + ); + }); + + it('returns socket path and close function', async () => { + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + socketPath: tmpSocket, + }); + + expect(handle.socketPath).toBe(tmpSocket); + expect(typeof handle.close).toBe('function'); + expect(handle.kernel).toBeDefined(); + }); + + it('calls kernel.stop on close', async () => { + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + socketPath: tmpSocket, + }); + + const { stop } = handle.kernel; + const toClose = handle; + handle = undefined; + await toClose.close(); + + expect(stop).toHaveBeenCalled(); + }); +}); diff --git a/packages/nodejs/src/daemon/start-daemon.ts b/packages/nodejs/src/daemon/start-daemon.ts new file mode 100644 index 000000000..a778ec15f --- /dev/null +++ b/packages/nodejs/src/daemon/start-daemon.ts @@ -0,0 +1,107 @@ +import { ifDefined } from '@metamask/kernel-utils'; +import type { Logger } from '@metamask/logger'; +import type { Kernel, SystemSubclusterConfig } from '@metamask/ocap-kernel'; +import { mkdir } from 'node:fs/promises'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; + +import { makeKernel } from '../kernel/make-kernel.ts'; + +/** + * Options for starting the daemon. + */ +export type StartDaemonOptions = { + /** UNIX socket path for the system console IO channel. Defaults to ~/.ocap/console.sock. */ + socketPath?: string; + /** URL to the bundled system-console-vat. */ + systemConsoleBundleSpec: string; + /** Name for the system console subcluster. Defaults to 'system-console'. */ + systemConsoleName?: string; + /** Path to vat worker file. */ + workerFilePath?: string; + /** SQLite database filename. Defaults to ~/.ocap/kernel.sqlite. */ + dbFilename?: string; + /** If true, clear kernel storage. */ + resetStorage?: boolean; + /** Logger instance. */ + logger?: Logger; + /** Seed for libp2p key generation. */ + keySeed?: string; +}; + +/** + * Handle returned by {@link startDaemon}. + */ +export type DaemonHandle = { + kernel: Kernel; + socketPath: string; + close: () => Promise; +}; + +/** + * Start the OCAP daemon. + * + * Creates a kernel with a system console vat that listens for commands + * on a UNIX domain socket IO channel. The kernel process IS the daemon. + * + * @param options - Configuration options. + * @returns A daemon handle. + */ +export async function startDaemon( + options: StartDaemonOptions, +): Promise { + const { + systemConsoleBundleSpec, + systemConsoleName = 'system-console', + workerFilePath, + resetStorage, + logger, + keySeed, + } = options; + + const ocapDir = join(homedir(), '.ocap'); + await mkdir(ocapDir, { recursive: true }); + + const socketPath = options.socketPath ?? join(ocapDir, 'console.sock'); + const dbFilename = options.dbFilename ?? join(ocapDir, 'kernel.sqlite'); + + // Build system subcluster config with IO channel for the console socket + const systemSubcluster: SystemSubclusterConfig = { + name: systemConsoleName, + config: { + bootstrap: systemConsoleName, + io: { + console: { + type: 'socket' as const, + path: socketPath, + }, + }, + services: ['kernelFacet', 'console'], + vats: { + [systemConsoleName]: { + bundleSpec: systemConsoleBundleSpec, + parameters: { name: systemConsoleName }, + }, + }, + }, + }; + + const kernel = await makeKernel({ + ...ifDefined({ workerFilePath, resetStorage, logger }), + dbFilename, + keySeed, + systemSubclusters: [systemSubcluster], + }); + + await kernel.initIdentity(); + + const close = async (): Promise => { + await kernel.stop(); + }; + + return { + kernel, + socketPath, + close, + }; +} diff --git a/packages/nodejs/src/index.ts b/packages/nodejs/src/index.ts index 49c133fdf..41f78b592 100644 --- a/packages/nodejs/src/index.ts +++ b/packages/nodejs/src/index.ts @@ -2,3 +2,10 @@ export { NodejsPlatformServices } from './kernel/PlatformServices.ts'; export { makeKernel } from './kernel/make-kernel.ts'; export { makeNodeJsVatSupervisor } from './vat/make-supervisor.ts'; export { makeIOChannelFactory, makeSocketIOChannel } from './io/index.ts'; +export { startDaemon } from './daemon/start-daemon.ts'; +export type { + StartDaemonOptions, + DaemonHandle, +} from './daemon/start-daemon.ts'; +export { flushDaemon } from './daemon/flush-daemon.ts'; +export type { FlushDaemonOptions } from './daemon/flush-daemon.ts'; diff --git a/packages/nodejs/src/io/socket-channel.test.ts b/packages/nodejs/src/io/socket-channel.test.ts index d6a773658..fe8bf982c 100644 --- a/packages/nodejs/src/io/socket-channel.test.ts +++ b/packages/nodejs/src/io/socket-channel.test.ts @@ -134,13 +134,21 @@ describe('makeSocketIOChannel', () => { expect(result).toBeNull(); }); - it('returns null when no client is connected', async () => { + it('blocks read until a client connects and sends data', async () => { const socketPath = tempSocketPath(); const channel = await makeSocketIOChannel('test', socketPath); channels.push(channel); - const result = await channel.read(); - expect(result).toBeNull(); + // Start read before any client connects — should block + const readPromise = channel.read(); + + // Connect and send data + const client = await connectToSocket(socketPath); + clients.push(client); + await writeLine(client, 'hello'); + + const result = await readPromise; + expect(result).toBe('hello'); }); it('throws on write when no client is connected', async () => { diff --git a/packages/nodejs/src/io/socket-channel.ts b/packages/nodejs/src/io/socket-channel.ts index c6cce477f..97ebfc46e 100644 --- a/packages/nodejs/src/io/socket-channel.ts +++ b/packages/nodejs/src/io/socket-channel.ts @@ -132,9 +132,7 @@ export async function makeSocketIOChannel( if (queued !== undefined) { return queued; } - if (!currentSocket) { - return null; - } + // Block until data arrives (from a current or future client connection) return new Promise((resolve) => { readerQueue.push({ resolve }); }); diff --git a/packages/nodejs/src/kernel/make-kernel.ts b/packages/nodejs/src/kernel/make-kernel.ts index 00f6353b4..68bd9b4ab 100644 --- a/packages/nodejs/src/kernel/make-kernel.ts +++ b/packages/nodejs/src/kernel/make-kernel.ts @@ -1,7 +1,10 @@ import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; import { Logger } from '@metamask/logger'; import { Kernel } from '@metamask/ocap-kernel'; -import type { IOChannelFactory } from '@metamask/ocap-kernel'; +import type { + IOChannelFactory, + SystemSubclusterConfig, +} from '@metamask/ocap-kernel'; import { NodejsPlatformServices } from './PlatformServices.ts'; import { makeIOChannelFactory } from '../io/index.ts'; @@ -16,6 +19,7 @@ import { makeIOChannelFactory } from '../io/index.ts'; * @param options.logger - The logger to use for the kernel. * @param options.keySeed - Optional seed for libp2p key generation. * @param options.ioChannelFactory - Optional factory for creating IO channels. + * @param options.systemSubclusters - Optional system subcluster configurations. * @returns The kernel, initialized. */ export async function makeKernel({ @@ -25,6 +29,7 @@ export async function makeKernel({ logger, keySeed, ioChannelFactory, + systemSubclusters, }: { workerFilePath?: string; resetStorage?: boolean; @@ -32,6 +37,7 @@ export async function makeKernel({ logger?: Logger; keySeed?: string | undefined; ioChannelFactory?: IOChannelFactory; + systemSubclusters?: SystemSubclusterConfig[]; }): Promise { const rootLogger = logger ?? new Logger('kernel-worker'); const platformServicesClient = new NodejsPlatformServices({ @@ -48,6 +54,7 @@ export async function makeKernel({ logger: rootLogger.subLogger({ tags: ['kernel'] }), keySeed, ioChannelFactory: ioChannelFactory ?? makeIOChannelFactory(), + ...(systemSubclusters ? { systemSubclusters } : {}), }); return kernel; diff --git a/packages/nodejs/test/e2e/daemon-stack.test.ts b/packages/nodejs/test/e2e/daemon-stack.test.ts new file mode 100644 index 000000000..d29b962b7 --- /dev/null +++ b/packages/nodejs/test/e2e/daemon-stack.test.ts @@ -0,0 +1,356 @@ +import type { KernelDatabase } from '@metamask/kernel-store'; +import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { waitUntilQuiescent } from '@metamask/kernel-utils'; +import type { Kernel, IOChannel, IOConfig } from '@metamask/ocap-kernel'; +import * as net from 'node:net'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { pathToFileURL } from 'node:url'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { makeTestKernel } from '../helpers/kernel.ts'; + +const SYSTEM_CONSOLE_NAME = 'system-console'; + +/** + * Generate a unique temp socket path. + * + * @returns A unique socket path. + */ +function tempSocketPath(): string { + return join( + tmpdir(), + `daemon-e2e-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`, + ); +} + +/** + * Connect to a UNIX socket. + * + * @param socketPath - The socket path. + * @returns The connected socket. + */ +async function connectToSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const client = net.createConnection(socketPath, () => { + client.removeListener('error', reject); + resolve(client); + }); + client.on('error', reject); + }); +} + +/** + * Write a newline-delimited line to a socket. + * + * @param socket - The socket. + * @param line - The line to write. + */ +async function writeLine(socket: net.Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +/** + * Read a newline-delimited line from a socket. + * + * @param socket - The socket. + * @returns The line read. + */ +async function readLine(socket: net.Socket): Promise { + return new Promise((resolve) => { + let buffer = ''; + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeListener('data', onData); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + }); +} + +/** + * Send a JSON request over a socket and read the JSON response. + * + * @param socketPath - The socket path. + * @param request - The request object. + * @returns The parsed response. + */ +async function sendCommand( + socketPath: string, + request: Record, +): Promise<{ ok: boolean; result?: unknown; error?: string }> { + const socket = await connectToSocket(socketPath); + try { + await writeLine(socket, JSON.stringify(request)); + const responseLine = await readLine(socket); + return JSON.parse(responseLine) as { + ok: boolean; + result?: unknown; + error?: string; + }; + } finally { + socket.destroy(); + } +} + +/** + * Create a test socket IO channel factory. + * + * @returns The factory function. + */ +function makeTestIOChannelFactory() { + const fsPromises = import('node:fs/promises'); + + return async (_name: string, config: IOConfig): Promise => { + if (config.type !== 'socket') { + throw new Error(`unsupported IO type: ${config.type}`); + } + const fs = await fsPromises; + const lineQueue: string[] = []; + const readerQueue: { resolve: (value: string | null) => void }[] = []; + let currentSocket: net.Socket | null = null; + let lineBuffer = ''; + let closed = false; + + function deliverLine(line: string): void { + const reader = readerQueue.shift(); + if (reader) { + reader.resolve(line); + } else { + lineQueue.push(line); + } + } + + function deliverEOF(): void { + while (readerQueue.length > 0) { + readerQueue.shift()?.resolve(null); + } + } + + const server = net.createServer((socket) => { + if (currentSocket) { + socket.destroy(); + return; + } + currentSocket = socket; + lineBuffer = ''; + socket.on('data', (data: Buffer) => { + lineBuffer += data.toString(); + let idx = lineBuffer.indexOf('\n'); + while (idx !== -1) { + deliverLine(lineBuffer.slice(0, idx)); + lineBuffer = lineBuffer.slice(idx + 1); + idx = lineBuffer.indexOf('\n'); + } + }); + socket.on('end', () => { + if (lineBuffer.length > 0) { + deliverLine(lineBuffer); + lineBuffer = ''; + } + currentSocket = null; + deliverEOF(); + }); + socket.on('error', () => { + currentSocket = null; + deliverEOF(); + }); + }); + + try { + await fs.unlink(config.path); + } catch { + // ignore + } + + await new Promise((resolve, reject) => { + server.on('error', reject); + server.listen(config.path, () => { + server.removeListener('error', reject); + resolve(); + }); + }); + + return { + async read() { + if (closed) { + return null; + } + const queued = lineQueue.shift(); + if (queued !== undefined) { + return queued; + } + if (!currentSocket) { + return null; + } + return new Promise((resolve) => { + readerQueue.push({ resolve }); + }); + }, + async write(data: string) { + if (!currentSocket) { + throw new Error('no connected client'); + } + const socket = currentSocket; + return new Promise((resolve, reject) => { + socket.write(`${data}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }, + async close() { + if (closed) { + return; + } + closed = true; + deliverEOF(); + currentSocket?.destroy(); + currentSocket = null; + await new Promise((resolve) => { + server.close(() => resolve()); + }); + try { + await fs.unlink(config.path); + } catch { + // ignore + } + }, + }; + }; +} + +/** + * Get the bundle spec for the system console vat test bundle. + * + * @returns The bundle spec URL. + */ +function getSystemConsoleBundleSpec(): string { + const bundlePath = join( + import.meta.dirname, + '../vats/system-console-vat.bundle', + ); + return pathToFileURL(bundlePath).href; +} + +describe('Daemon Stack (IO socket protocol)', { timeout: 30_000 }, () => { + let kernel: Kernel | undefined; + let kernelDatabase: KernelDatabase | undefined; + + /** + * Boot a kernel with a system console subcluster using IO socket. + * + * @returns The socket path. + */ + async function bootDaemonStack(): Promise { + const socketPath = tempSocketPath(); + + kernelDatabase = await makeSQLKernelDatabase({ dbFilename: ':memory:' }); + kernel = await makeTestKernel(kernelDatabase, { + ioChannelFactory: makeTestIOChannelFactory(), + systemSubclusters: [ + { + name: SYSTEM_CONSOLE_NAME, + config: { + bootstrap: SYSTEM_CONSOLE_NAME, + io: { + console: { + type: 'socket' as const, + path: socketPath, + }, + }, + services: ['kernelFacet', 'console'], + vats: { + [SYSTEM_CONSOLE_NAME]: { + bundleSpec: getSystemConsoleBundleSpec(), + parameters: { name: SYSTEM_CONSOLE_NAME }, + }, + }, + }, + }, + ], + }); + + await kernel.initIdentity(); + await waitUntilQuiescent(100); + + return socketPath; + } + + afterEach(async () => { + if (kernel) { + const stopResult = kernel.stop(); + kernel = undefined; + await stopResult; + } + if (kernelDatabase) { + kernelDatabase.close(); + kernelDatabase = undefined; + } + }); + + it('dispatches help command via socket', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'help' }); + + expect(response.ok).toBe(true); + const result = response.result as { commands: string[] }; + expect(result.commands).toBeDefined(); + expect(result.commands.length).toBeGreaterThan(0); + expect(result.commands.some((cmd) => cmd.includes('help'))).toBe(true); + expect(result.commands.some((cmd) => cmd.includes('status'))).toBe(true); + }); + + it('dispatches status command via socket', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'status' }); + + expect(response.ok).toBe(true); + }); + + it('dispatches listRefs command', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'listRefs' }); + + expect(response.ok).toBe(true); + const result = response.result as { refs: { ref: string; kref: string }[] }; + expect(result.refs).toBeDefined(); + expect(Array.isArray(result.refs)).toBe(true); + }); + + it('returns error for unknown command', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'nonexistent' }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }); + + it('handles sequential requests on separate connections', async () => { + const socketPath = await bootDaemonStack(); + + const response1 = await sendCommand(socketPath, { method: 'help' }); + expect(response1.ok).toBe(true); + + const response2 = await sendCommand(socketPath, { method: 'status' }); + expect(response2.ok).toBe(true); + }); +}); diff --git a/packages/nodejs/test/helpers/kernel.ts b/packages/nodejs/test/helpers/kernel.ts index 524b17bb7..1595897c1 100644 --- a/packages/nodejs/test/helpers/kernel.ts +++ b/packages/nodejs/test/helpers/kernel.ts @@ -4,6 +4,7 @@ import { Logger } from '@metamask/logger'; import { Kernel, kunser } from '@metamask/ocap-kernel'; import type { ClusterConfig, + IOChannelFactory, SystemSubclusterConfig, } from '@metamask/ocap-kernel'; @@ -13,6 +14,7 @@ type MakeTestKernelOptions = { resetStorage?: boolean; mnemonic?: string; systemSubclusters?: SystemSubclusterConfig[]; + ioChannelFactory?: IOChannelFactory; }; /** @@ -24,13 +26,19 @@ type MakeTestKernelOptions = { * @param options.resetStorage - Whether to reset the storage (default: true). * @param options.mnemonic - Optional BIP39 mnemonic string. * @param options.systemSubclusters - Optional system subcluster configurations. + * @param options.ioChannelFactory - Optional IO channel factory. * @returns The kernel. */ export async function makeTestKernel( kernelDatabase: KernelDatabase, options: MakeTestKernelOptions = {}, ): Promise { - const { resetStorage = true, mnemonic, systemSubclusters } = options; + const { + resetStorage = true, + mnemonic, + systemSubclusters, + ioChannelFactory, + } = options; const logger = new Logger('test-kernel'); const platformServices = new NodejsPlatformServices({ @@ -40,6 +48,7 @@ export async function makeTestKernel( resetStorage, mnemonic, systemSubclusters, + ioChannelFactory, logger: logger.subLogger({ tags: ['kernel'] }), }); diff --git a/packages/nodejs/test/vats/system-console-vat.ts b/packages/nodejs/test/vats/system-console-vat.ts new file mode 100644 index 000000000..905538bad --- /dev/null +++ b/packages/nodejs/test/vats/system-console-vat.ts @@ -0,0 +1,288 @@ +import { E } from '@endo/eventual-send'; +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; +import type { + Baggage, + ClusterConfig, + KernelStatus, + Subcluster, + SubclusterLaunchResult, +} from '@metamask/ocap-kernel'; + +/** + * Kernel facet interface for system vat operations. + */ +type KernelFacet = { + getStatus: () => Promise; + getSubclusters: () => Promise; + launchSubcluster: (config: ClusterConfig) => Promise; + terminateSubcluster: (subclusterId: string) => Promise; + queueMessage: ( + target: string, + method: string, + args: unknown[], + ) => Promise; +}; + +/** + * Services provided to the system console vat during bootstrap. + */ +type BootstrapServices = { + kernelFacet?: KernelFacet; + console?: IOService; +}; + +/** + * IO service interface for reading and writing lines. + */ +type IOService = { + read: () => Promise; + write: (data: string) => Promise; +}; + +/** + * A JSON request from the CLI. + */ +type Request = { + ref?: string; + method: string; + args?: unknown[]; +}; + +/** + * Build function for the system console vat. + * + * This vat manages the REPL loop over an IO channel, dispatching CLI + * commands and managing refs (capability references) in persistent baggage. + * + * @param _vatPowers - The vat powers (unused). + * @param parameters - The vat parameters. + * @param parameters.name - Optional name for the console vat. + * @param baggage - The vat's persistent baggage storage. + * @returns The root object for the new vat. + */ +export function buildRootObject( + _vatPowers: unknown, + parameters: { name?: string }, + baggage: Baggage, +) { + const name = parameters.name ?? 'system-console'; + + // Monotonic counter for generating unique ref identifiers (persisted in baggage) + let refCounter: number = baggage.has('refCounter') + ? (baggage.get('refCounter') as number) + : 0; + + // Restore kernel facet from baggage if available (for resuscitation) + let kernelFacet: KernelFacet | undefined = baggage.has('kernelFacet') + ? (baggage.get('kernelFacet') as KernelFacet) + : undefined; + + // Ref manager state in baggage + const refs: Record = baggage.has('refs') + ? (baggage.get('refs') as Record) + : {}; + const krefToRef: Record = baggage.has('krefToRef') + ? (baggage.get('krefToRef') as Record) + : {}; + + function persistRefs(): void { + if (baggage.has('refs')) { + baggage.set('refs', harden({ ...refs })); + } else { + baggage.init('refs', harden({ ...refs })); + } + if (baggage.has('krefToRef')) { + baggage.set('krefToRef', harden({ ...krefToRef })); + } else { + baggage.init('krefToRef', harden({ ...krefToRef })); + } + } + + function issueRef(kref: string): string { + const existing = krefToRef[kref]; + if (existing) { + return existing; + } + refCounter += 1; + if (baggage.has('refCounter')) { + baggage.set('refCounter', refCounter); + } else { + baggage.init('refCounter', refCounter); + } + const ref = `d-${refCounter}`; + refs[ref] = kref; + krefToRef[kref] = ref; + persistRefs(); + return ref; + } + + function lookupKref(ref: string): string | undefined { + return refs[ref]; + } + + function revokeRef(ref: string): boolean { + const kref = refs[ref]; + if (!kref) { + return false; + } + delete refs[ref]; + delete krefToRef[kref]; + persistRefs(); + return true; + } + + function listRefs(): { ref: string; kref: string }[] { + return Object.entries(refs).map(([ref, kref]) => ({ ref, kref })); + } + + async function dispatchConsoleMethod( + method: string, + args: unknown[], + ): Promise { + switch (method) { + case 'help': + return { + commands: [ + 'help - show available commands', + 'status - kernel status', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'subclusters - list subclusters', + 'revoke - revoke a ref', + 'listRefs - list all issued refs', + ], + }; + + case 'status': + return E(kernelFacet!).getStatus(); + + case 'subclusters': + return E(kernelFacet!).getSubclusters(); + + case 'launch': { + const config = args[0] as ClusterConfig; + if (!config) { + throw new Error('launch requires a config argument'); + } + const result = await E(kernelFacet!).launchSubcluster(config); + const ref = issueRef(result.rootKref); + return { ref, subclusterId: result.subclusterId }; + } + + case 'terminate': { + const subclusterId = args[0] as string; + if (!subclusterId) { + throw new Error('terminate requires a subclusterId argument'); + } + await E(kernelFacet!).terminateSubcluster(subclusterId); + return { ok: true }; + } + + case 'revoke': { + const ref = args[0] as string; + if (!ref) { + throw new Error('revoke requires a ref argument'); + } + return { ok: revokeRef(ref) }; + } + + case 'listRefs': + return { refs: listRefs() }; + + default: + throw new Error(`Unknown command: ${method}`); + } + } + + async function handleRequest(request: Request): Promise { + const { ref, method, args = [] } = request; + + if (!ref) { + return dispatchConsoleMethod(method, args); + } + + const kref = lookupKref(ref); + if (!kref) { + throw new Error(`Unknown ref: ${ref}`); + } + return E(kernelFacet!).queueMessage(kref, method, args); + } + + async function runReplLoop(ioService: IOService): Promise { + for (;;) { + const line = await E(ioService).read(); + if (line === null) { + continue; + } + + let response: unknown; + try { + const request = JSON.parse(line) as Request; + const result = await handleRequest(request); + response = { ok: true, result }; + } catch (error) { + // Errors crossing vat boundaries may arrive as plain objects. + // Try multiple strategies to extract a human-readable message. + let errorMessage: string; + if (error instanceof Error) { + errorMessage = error.message ?? error.stack ?? String(error); + } else if (typeof error === 'string') { + errorMessage = error; + } else { + try { + errorMessage = JSON.stringify(error); + } catch { + errorMessage = String(error); + } + } + response = { ok: false, error: errorMessage }; + } + + try { + await E(ioService).write(JSON.stringify(response)); + } catch { + // Write failed — continue loop + } + } + } + + return makeDefaultExo('root', { + async bootstrap( + _vats: unknown, + services: BootstrapServices, + ): Promise { + if (!kernelFacet && services.kernelFacet) { + kernelFacet = services.kernelFacet; + baggage.init('kernelFacet', kernelFacet); + } + + if (services.console) { + runReplLoop(services.console).catch((error) => { + console.error(`[${name}] REPL loop error:`, error); + }); + } + }, + + help() { + return harden({ + commands: [ + 'help - show available commands', + 'status - kernel status', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'subclusters - list subclusters', + 'revoke - revoke a ref', + 'listRefs - list all issued refs', + ], + }); + }, + + issueRef(kref: string): string { + return issueRef(kref); + }, + + listRefs(): { ref: string; kref: string }[] { + return listRefs(); + }, + }); +} From 2806502a6a29024a6251b8148333f9b6f9c9c4e9 Mon Sep 17 00:00:00 2001 From: grypez <143971198+grypez@users.noreply.github.com> Date: Tue, 17 Feb 2026 07:24:16 -0600 Subject: [PATCH 4/5] feat(cli): add ok binary with yargs CLI for kernel interaction Add the 'ok' CLI that communicates with the kernel daemon over a UNIX domain socket using newline-delimited JSON. Uses yargs for command definitions with --help support on all commands. Supports three input modes: file arg (ok file.ocap method), stdin redirect (ok launch < config.json), and pipe (cat config.json | ok launch). Relative bundleSpec paths in launch configs are resolved to file:// URLs against CWD. Ref results are output as .ocap files when stdout is not a TTY. Co-Authored-By: Claude Opus 4.6 --- packages/cli/package.json | 3 +- packages/cli/src/commands/daemon-client.ts | 173 ++++++++ packages/cli/src/commands/daemon-entry.ts | 99 +++++ packages/cli/src/commands/daemon-spawn.ts | 48 +++ packages/cli/src/ok.ts | 450 +++++++++++++++++++++ packages/cli/tsconfig.build.json | 3 +- yarn.lock | 1 + 7 files changed, 775 insertions(+), 2 deletions(-) create mode 100644 packages/cli/src/commands/daemon-client.ts create mode 100644 packages/cli/src/commands/daemon-entry.ts create mode 100644 packages/cli/src/commands/daemon-spawn.ts create mode 100644 packages/cli/src/ok.ts diff --git a/packages/cli/package.json b/packages/cli/package.json index 4a6c8317b..2652c1710 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -9,7 +9,8 @@ }, "type": "module", "bin": { - "ocap": "./dist/app.mjs" + "ocap": "./dist/app.mjs", + "ok": "./dist/ok.mjs" }, "files": [ "dist/" diff --git a/packages/cli/src/commands/daemon-client.ts b/packages/cli/src/commands/daemon-client.ts new file mode 100644 index 000000000..8cb77502d --- /dev/null +++ b/packages/cli/src/commands/daemon-client.ts @@ -0,0 +1,173 @@ +import { createConnection } from 'node:net'; +import type { Socket } from 'node:net'; +import { homedir } from 'node:os'; +import { join } from 'node:path'; + +/** + * Get the default daemon socket path. + * + * @returns The socket path. + */ +export function getSocketPath(): string { + return join(homedir(), '.ocap', 'console.sock'); +} + +/** + * Connect to a UNIX domain socket. + * + * @param socketPath - The socket path to connect to. + * @returns A connected socket. + */ +async function connectSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const socket = createConnection(socketPath, () => { + socket.removeListener('error', reject); + resolve(socket); + }); + socket.on('error', reject); + }); +} + +/** + * Read a single newline-delimited line from a socket. + * + * @param socket - The socket to read from. + * @returns The line read. + */ +async function readLine(socket: Socket): Promise { + return new Promise((resolve, reject) => { + let buffer = ''; + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeAllListeners('data'); + socket.removeAllListeners('error'); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + socket.once('error', (error) => { + socket.removeAllListeners('data'); + reject(error); + }); + }); +} + +/** + * Write a newline-delimited line to a socket. + * + * @param socket - The socket to write to. + * @param line - The line to write. + */ +async function writeLine(socket: Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +/** + * The response shape from the system console vat. + */ +export type ConsoleResponse = { + ok: boolean; + result?: unknown; + error?: string; +}; + +/** + * Send a JSON request to the daemon over a UNIX socket and return the response. + * + * Opens a connection, writes one JSON line, reads one JSON response line, + * then closes the connection. + * + * @param socketPath - The UNIX socket path. + * @param request - The request to send. + * @param request.ref - Optional ref targeting a capability. + * @param request.method - The method name to invoke. + * @param request.args - Optional arguments array. + * @returns The parsed response. + */ +export async function sendCommand( + socketPath: string, + request: { ref?: string; method: string; args?: unknown[] }, +): Promise { + const socket = await connectSocket(socketPath); + try { + await writeLine(socket, JSON.stringify(request)); + const responseLine = await readLine(socket); + return JSON.parse(responseLine) as ConsoleResponse; + } finally { + socket.destroy(); + } +} + +/** + * Check whether the daemon is running by probing the socket. + * + * @param socketPath - The UNIX socket path. + * @returns True if the daemon socket accepts a connection. + */ +export async function isDaemonRunning(socketPath: string): Promise { + try { + const socket = await connectSocket(socketPath); + socket.destroy(); + return true; + } catch { + return false; + } +} + +/** + * Read all content from stdin, stripping shebang lines. + * + * @returns The stdin content with shebang lines removed. + */ +export async function readStdin(): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + process.stdin.on('data', (chunk: Buffer) => chunks.push(chunk)); + process.stdin.on('end', () => { + const raw = Buffer.concat(chunks).toString().trim(); + const lines = raw.split('\n').filter((line) => !line.startsWith('#!')); + resolve(lines.join('\n').trim()); + }); + process.stdin.on('error', reject); + }); +} + +/** + * Read a ref from stdin. Strips shebang lines. + * + * @returns The ref string. + */ +export async function readRefFromStdin(): Promise { + const content = await readStdin(); + if (!content) { + throw new Error('No ref found in stdin'); + } + return content; +} + +/** + * Read a ref from a .ocap file. Strips shebang lines. + * + * @param filePath - The path to the .ocap file. + * @returns The ref string. + */ +export async function readRefFromFile(filePath: string): Promise { + const { readFile } = await import('node:fs/promises'); + const raw = (await readFile(filePath, 'utf-8')).trim(); + const lines = raw.split('\n').filter((line) => !line.startsWith('#!')); + const ref = lines.join('\n').trim(); + if (!ref) { + throw new Error(`No ref found in ${filePath}`); + } + return ref; +} diff --git a/packages/cli/src/commands/daemon-entry.ts b/packages/cli/src/commands/daemon-entry.ts new file mode 100644 index 000000000..4e0f710f3 --- /dev/null +++ b/packages/cli/src/commands/daemon-entry.ts @@ -0,0 +1,99 @@ +/* eslint-disable n/no-process-exit, n/no-process-env, n/no-sync */ +import '@metamask/kernel-shims/endoify-node'; +import { Logger } from '@metamask/logger'; +import type { LogEntry } from '@metamask/logger'; +import { mkdir } from 'node:fs/promises'; +import { createRequire } from 'node:module'; +import { homedir } from 'node:os'; +import { dirname, join, resolve } from 'node:path'; +import { pathToFileURL } from 'node:url'; + +import { bundleFile } from './bundle.ts'; + +/** + * Create a file transport that writes logs to a file. + * + * @param logPath - The log file path. + * @returns A log transport function. + */ +function makeFileTransport(logPath: string) { + // eslint-disable-next-line @typescript-eslint/no-require-imports, n/global-require -- need sync fs for log transport + const { appendFileSync } = require('node:fs') as typeof import('node:fs'); + return (entry: LogEntry): void => { + const line = `[${new Date().toISOString()}] [${entry.level}] ${entry.message ?? ''} ${(entry.data ?? []).map(String).join(' ')}\n`; + appendFileSync(logPath, line); + }; +} + +/** + * Main daemon entry point. Starts the daemon process and keeps it running. + */ +async function main(): Promise { + const ocapDir = join(homedir(), '.ocap'); + await mkdir(ocapDir, { recursive: true }); + + const logPath = join(ocapDir, 'daemon.log'); + const logger = new Logger({ + tags: ['daemon'], + transports: [makeFileTransport(logPath)], + }); + + try { + const socketPath = + process.env.OCAP_SOCKET_PATH ?? join(ocapDir, 'console.sock'); + const consoleName = process.env.OCAP_CONSOLE_NAME ?? 'system-console'; + + // Bundle system console vat if needed + const bundlesDir = join(ocapDir, 'bundles'); + await mkdir(bundlesDir, { recursive: true }); + + const bundlePath = join(bundlesDir, 'system-console-vat.bundle'); + const cjsRequire = createRequire(import.meta.url); + const kernelPkgPath = cjsRequire.resolve( + '@metamask/ocap-kernel/package.json', + ); + const vatSource = resolve( + dirname(kernelPkgPath), + 'src/vats/system-console-vat.ts', + ); + logger.info(`Bundling system console vat from ${vatSource}...`); + await bundleFile(vatSource, { logger, targetPath: bundlePath }); + const bundleSpec = pathToFileURL(bundlePath).href; + + // Dynamically import to avoid pulling @ocap/nodejs into the CLI bundle graph + // eslint-disable-next-line import-x/no-extraneous-dependencies -- workspace package + const { startDaemon } = await import('@ocap/nodejs'); + + const handle = await startDaemon({ + systemConsoleBundleSpec: bundleSpec, + systemConsoleName: consoleName, + socketPath, + resetStorage: true, + logger, + }); + + logger.info(`Daemon started. Socket: ${handle.socketPath}`); + + // Keep the process alive + const shutdown = async (signal: string): Promise => { + logger.info(`Received ${signal}, shutting down...`); + await handle.close(); + process.exit(0); + }; + + process.on('SIGTERM', () => { + shutdown('SIGTERM').catch(() => process.exit(1)); + }); + process.on('SIGINT', () => { + shutdown('SIGINT').catch(() => process.exit(1)); + }); + } catch (error) { + logger.error('Daemon startup failed:', error); + process.exit(1); + } +} + +main().catch((error) => { + process.stderr.write(`Daemon fatal: ${String(error)}\n`); + process.exit(1); +}); diff --git a/packages/cli/src/commands/daemon-spawn.ts b/packages/cli/src/commands/daemon-spawn.ts new file mode 100644 index 000000000..0b9e128f9 --- /dev/null +++ b/packages/cli/src/commands/daemon-spawn.ts @@ -0,0 +1,48 @@ +import { spawn } from 'node:child_process'; +import { dirname, join } from 'node:path'; +import { fileURLToPath } from 'node:url'; + +import { isDaemonRunning } from './daemon-client.ts'; + +const POLL_INTERVAL_MS = 100; +const MAX_POLLS = 300; // 30 seconds + +/** + * Ensure the daemon is running. If it is not, spawn it as a detached process + * and wait until the socket becomes responsive. + * + * @param socketPath - The UNIX socket path. + */ +export async function ensureDaemon(socketPath: string): Promise { + if (await isDaemonRunning(socketPath)) { + return; + } + + process.stderr.write('Starting daemon...\n'); + + const currentDir = dirname(fileURLToPath(import.meta.url)); + const entryPath = join(currentDir, 'daemon-entry.mjs'); + + const child = spawn(process.execPath, [entryPath], { + detached: true, + stdio: 'ignore', + env: { + ...process.env, // eslint-disable-line n/no-process-env -- pass env to child + OCAP_SOCKET_PATH: socketPath, + }, + }); + child.unref(); + + // Poll until daemon responds + for (let i = 0; i < MAX_POLLS; i++) { + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); + if (await isDaemonRunning(socketPath)) { + process.stderr.write('Daemon ready.\n'); + return; + } + } + + throw new Error( + `Daemon did not start within ${(MAX_POLLS * POLL_INTERVAL_MS) / 1000}s`, + ); +} diff --git a/packages/cli/src/ok.ts b/packages/cli/src/ok.ts new file mode 100644 index 000000000..adc021768 --- /dev/null +++ b/packages/cli/src/ok.ts @@ -0,0 +1,450 @@ +/* eslint-disable n/no-process-exit, n/no-sync, no-negated-condition */ +import '@metamask/kernel-shims/endoify-node'; +import { existsSync, fstatSync } from 'node:fs'; +import { writeFile, chmod } from 'node:fs/promises'; +import { resolve } from 'node:path'; +import { pathToFileURL } from 'node:url'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; + +import { + getSocketPath, + sendCommand, + readStdin, + readRefFromStdin, + readRefFromFile, +} from './commands/daemon-client.ts'; +import { ensureDaemon } from './commands/daemon-spawn.ts'; + +/** + * Handle the core invocation: resolve ref, call method, output result. + * + * @param args - CLI arguments after `ok`. + * @param socketPath - The daemon socket path. + */ +async function handleInvoke(args: string[], socketPath: string): Promise { + let ref: string | undefined; + let method: string; + let methodArgs: string[]; + + const firstArg = args[0]; + if ( + firstArg !== undefined && + (firstArg.endsWith('.ocap') || existsSync(firstArg)) + ) { + // File arg mode: ok [...args] + ref = await readRefFromFile(firstArg); + method = args[1] ?? 'help'; + methodArgs = args.slice(2); + } else if ( + !process.stdin.isTTY && + (fstatSync(0).isFIFO() || fstatSync(0).isFile()) + ) { + // Redirected stdin (pipe or file): could be a ref (d-) or JSON data + const stdinContent = await readStdin(); + if (!stdinContent) { + throw new Error('No input on stdin'); + } + if (stdinContent.startsWith('d-')) { + // Ref mode: ok [...args] < file.ocap + ref = stdinContent; + method = args[0] ?? 'help'; + methodArgs = args.slice(1); + } else { + // Data mode: cat config.json | ok launch + method = args[0] ?? 'help'; + methodArgs = [stdinContent, ...args.slice(1)]; + } + } else { + // No ref — dispatch on the system console itself + method = args[0] ?? 'help'; + methodArgs = args.slice(1); + } + + // For launch: resolve relative bundleSpec paths to file:// URLs + if (method === 'launch') { + methodArgs = methodArgs.map((arg) => { + try { + const parsed = JSON.parse(arg) as unknown; + if (isClusterConfigLike(parsed)) { + return JSON.stringify(resolveBundleSpecs(parsed)); + } + } catch { + // not JSON — leave as-is + } + return arg; + }); + } + + // Parse args: try JSON for each, fall back to string + const parsedArgs = methodArgs.map((arg) => { + try { + return JSON.parse(arg) as unknown; + } catch { + return arg; + } + }); + + const request: { ref?: string; method: string; args?: unknown[] } = { + method, + ...(ref !== undefined ? { ref } : {}), + ...(parsedArgs.length > 0 ? { args: parsedArgs } : {}), + }; + + const response = await sendCommand(socketPath, request); + + if (!response.ok) { + process.stderr.write(`Error: ${response.error}\n`); + process.exit(1); + } + + const isTTY = process.stdout.isTTY ?? false; + const { result } = response; + + // Check if result contains a ref (capability) + const resultRef = isRefResult(result) ? result.ref : undefined; + + if (resultRef && !isTTY) { + // Piped: output .ocap content for the ref + process.stdout.write(`#!/usr/bin/env ok\n${resultRef}\n`); + } else if (isTTY) { + process.stdout.write(`${JSON.stringify(result, null, 2)}\n`); + } else { + process.stdout.write(`${JSON.stringify(result)}\n`); + } +} + +/** + * Check if a result object contains a ref field. + * + * @param result - The result to check. + * @returns True if the result has a ref string. + */ +function isRefResult(result: unknown): result is { ref: string } { + return ( + typeof result === 'object' && + result !== null && + 'ref' in result && + typeof (result as { ref: unknown }).ref === 'string' + ); +} + +/** + * Check if a value looks like a cluster config (has bootstrap + vats). + * + * @param value - The value to check. + * @returns True if the value has bootstrap and vats fields. + */ +function isClusterConfigLike( + value: unknown, +): value is { vats: Record } { + return ( + typeof value === 'object' && + value !== null && + 'bootstrap' in value && + 'vats' in value && + typeof (value as { vats: unknown }).vats === 'object' + ); +} + +/** + * Resolve relative bundleSpec paths in a cluster config to file:// URLs. + * + * @param config - The cluster config object. + * @returns The config with resolved bundleSpec URLs. + */ +function resolveBundleSpecs(config: { + vats: Record; +}): unknown { + const resolvedVats: Record = {}; + for (const [vatName, vatConfig] of Object.entries(config.vats)) { + const spec = vatConfig.bundleSpec; + if (spec && !spec.includes('://')) { + resolvedVats[vatName] = { + ...vatConfig, + bundleSpec: pathToFileURL(resolve(spec)).href, + }; + } else { + resolvedVats[vatName] = vatConfig; + } + } + return { ...config, vats: resolvedVats }; +} + +/** + * Handle daemon management commands. + * + * @param args - CLI arguments after `ok daemon`. + * @param socketPath - The daemon socket path. + */ +async function handleDaemon(args: string[], socketPath: string): Promise { + const subcommand = args[0]; + + if (subcommand === 'stop') { + const { isDaemonRunning } = await import('./commands/daemon-client.ts'); + if (await isDaemonRunning(socketPath)) { + process.stderr.write( + 'Stopping daemon... (send SIGTERM to daemon process)\n', + ); + process.stderr.write( + `Run: kill $(lsof -t ${socketPath}) or use pkill -f daemon-entry\n`, + ); + } else { + process.stderr.write('Daemon is not running.\n'); + } + return; + } + + if (subcommand === 'begone') { + const forGood = args.includes('--forgood'); + if (!forGood) { + process.stderr.write( + 'Usage: ok daemon begone --forgood\n' + + 'This will delete all OCAP daemon state.\n', + ); + process.exit(1); + } + // eslint-disable-next-line import-x/no-extraneous-dependencies -- workspace package + const { flushDaemon } = await import('@ocap/nodejs'); + await flushDaemon({ socketPath }); + process.stderr.write('All daemon state flushed.\n'); + return; + } + + // Default: start daemon (or confirm running) + let consoleName = 'system-console'; + const consoleIdx = args.indexOf('--console'); + if (consoleIdx !== -1 && args[consoleIdx + 1]) { + consoleName = args[consoleIdx + 1] ?? consoleName; + } + + // eslint-disable-next-line n/no-process-env -- CLI sets env for daemon child process + process.env.OCAP_CONSOLE_NAME = consoleName; + + await ensureDaemon(socketPath); + + // Check if the .ocap file exists + const ocapPath = `${consoleName}.ocap`; + if (!existsSync(ocapPath)) { + // Request listRefs from the daemon to find the system console ref + const response = await sendCommand(socketPath, { method: 'listRefs' }); + + if (response.ok) { + const result = response.result as { + refs: { ref: string; kref: string }[]; + }; + const firstRef = result.refs[0]; + if (firstRef) { + const content = `#!/usr/bin/env ok\n${firstRef.ref}\n`; + await writeFile(ocapPath, content); + await chmod(ocapPath, 0o755); + process.stderr.write(`Created ${ocapPath}\n`); + } + } + } + + process.stderr.write(`Daemon running. Socket: ${socketPath}\n`); +} + +/** + * Handle revoke command. + * + * @param args - CLI arguments after `ok revoke`. + * @param socketPath - The daemon socket path. + */ +async function handleRevoke(args: string[], socketPath: string): Promise { + let ref: string; + + const firstArg = args[0]; + if ( + firstArg !== undefined && + (firstArg.endsWith('.ocap') || existsSync(firstArg)) + ) { + ref = await readRefFromFile(firstArg); + } else if ( + !process.stdin.isTTY && + (fstatSync(0).isFIFO() || fstatSync(0).isFile()) + ) { + ref = await readRefFromStdin(); + } else { + process.stderr.write( + 'Usage: ok revoke \n ok revoke < file.ocap\n', + ); + process.exit(1); + } + + const response = await sendCommand(socketPath, { + method: 'revoke', + args: [ref], + }); + + if (response.ok && (response.result as { ok: boolean }).ok) { + process.stderr.write(`Revoked ref: ${ref}\n`); + } else { + process.stderr.write(`Ref not found: ${ref}\n`); + process.exit(1); + } +} + +const socketPath = getSocketPath(); + +const cli = yargs(hideBin(process.argv)) + .scriptName('ok') + .usage('$0 [file.ocap] [...args]') + + .command( + 'launch [config]', + 'Launch a subcluster', + (_yargs) => + _yargs + .positional('config', { + describe: 'Cluster config as inline JSON string', + type: 'string', + }) + .example( + '$0 launch \'{"bootstrap":"v","vats":{"v":{"bundleSpec":"file:///path/to.bundle"}}}\'', + 'Inline JSON', + ) + .example('$0 launch < config.json > root.ocap', 'File redirect') + .example('cat config.json | $0 launch', 'Piped'), + async (args) => { + await ensureDaemon(socketPath); + await handleInvoke( + ['launch', ...(args.config ? [args.config] : [])], + socketPath, + ); + }, + ) + + .command( + 'terminate ', + 'Terminate a subcluster', + (_yargs) => + _yargs.positional('subclusterId', { + describe: 'ID of the subcluster to terminate', + type: 'string', + demandOption: true, + }), + async (args) => { + await ensureDaemon(socketPath); + await handleInvoke(['terminate', String(args.subclusterId)], socketPath); + }, + ) + + .command( + 'status', + 'Show kernel status', + () => ({}), + async () => { + await ensureDaemon(socketPath); + await handleInvoke(['status'], socketPath); + }, + ) + + .command( + 'subclusters', + 'List subclusters', + () => ({}), + async () => { + await ensureDaemon(socketPath); + await handleInvoke(['subclusters'], socketPath); + }, + ) + + .command( + 'listRefs', + 'List all issued refs', + () => ({}), + async () => { + await ensureDaemon(socketPath); + await handleInvoke(['listRefs'], socketPath); + }, + ) + + .command( + 'help', + 'Show available kernel commands', + () => ({}), + async () => { + await ensureDaemon(socketPath); + await handleInvoke(['help'], socketPath); + }, + ) + + .command( + 'revoke [target]', + 'Revoke a capability ref', + (_yargs) => + _yargs + .positional('target', { + describe: 'Path to .ocap file', + type: 'string', + }) + .example('$0 revoke file.ocap', 'By file path') + .example('$0 revoke < file.ocap', 'From stdin'), + async (args) => { + await handleRevoke(args.target ? [String(args.target)] : [], socketPath); + }, + ) + + .command( + 'daemon [subcommand]', + 'Manage the daemon process', + (_yargs) => + _yargs + .positional('subcommand', { + describe: 'Subcommand: stop, begone', + type: 'string', + }) + .option('console', { + describe: 'System console name', + type: 'string', + default: 'system-console', + }) + .option('forgood', { + describe: 'Confirm state deletion (for begone)', + type: 'boolean', + }), + async (args) => { + const daemonArgs: string[] = []; + if (args.subcommand) { + daemonArgs.push(String(args.subcommand)); + } + if (args.forgood) { + daemonArgs.push('--forgood'); + } + if (args.console && args.console !== 'system-console') { + daemonArgs.push('--console', String(args.console)); + } + await handleDaemon(daemonArgs, socketPath); + }, + ) + + // Default: file.ocap dispatch or bare invocation + .command( + '$0 [args..]', + false, + (_yargs) => _yargs.strict(false), + async (args) => { + const invokeArgs = ((args.args ?? []) as string[]).map(String); + await ensureDaemon(socketPath); + await handleInvoke( + invokeArgs.length > 0 ? invokeArgs : ['help'], + socketPath, + ); + }, + ) + + .version(false) + .fail((message, error) => { + if (error) { + process.stderr.write( + `Error: ${error instanceof Error ? error.message : String(error)}\n`, + ); + } else if (message) { + process.stderr.write(`${message}\n`); + } + process.exit(1); + }); + +await cli.parse(); diff --git a/packages/cli/tsconfig.build.json b/packages/cli/tsconfig.build.json index 5982e62d4..0aac5d7a6 100644 --- a/packages/cli/tsconfig.build.json +++ b/packages/cli/tsconfig.build.json @@ -6,7 +6,8 @@ "outDir": "./dist", "emitDeclarationOnly": false, "rootDir": "./src", - "types": ["ses", "node"] + "types": ["ses", "node"], + "paths": {} }, "references": [ { "path": "../logger/tsconfig.build.json" }, diff --git a/yarn.lock b/yarn.lock index cc2c6e1c8..d3b4fa063 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3498,6 +3498,7 @@ __metadata: yargs: "npm:^17.7.2" bin: ocap: ./dist/app.mjs + ok: ./dist/ok.mjs languageName: unknown linkType: soft From 5f8febae18a6eaf50326bd84ceeea48f0d51b980 Mon Sep 17 00:00:00 2001 From: grypez <143971198+grypez@users.noreply.github.com> Date: Tue, 17 Feb 2026 08:20:12 -0600 Subject: [PATCH 5/5] feat(ocap-kernel,cli,nodejs): two-tier access for system console vat Implement a two-tier access model: unauthenticated daemon-tier commands (help, status) and privileged ref-based dispatch via .ocap capability files. Self-ref dispatch bypasses kernel round-trip for the console root object. Fix kref leaks, improve socket channel reliability with stale connection detection and client-side retry. --- packages/cli/src/commands/daemon-client.ts | 59 +++- packages/cli/src/commands/daemon-entry.ts | 15 +- packages/cli/src/ok.ts | 284 +++++++--------- packages/nodejs/src/daemon/flush-daemon.ts | 3 + .../nodejs/src/daemon/start-daemon.test.ts | 36 +- packages/nodejs/src/daemon/start-daemon.ts | 11 + packages/nodejs/src/io/socket-channel.ts | 18 +- packages/nodejs/test/e2e/daemon-stack.test.ts | 312 ++++++++--------- .../nodejs/test/vats/system-console-vat.ts | 289 +--------------- packages/ocap-kernel/src/Kernel.ts | 23 +- packages/ocap-kernel/src/kernel-facet.test.ts | 2 + packages/ocap-kernel/src/kernel-facet.ts | 1 + .../src/vats/system-console-vat.test.ts | 292 +++++++++++----- .../src/vats/system-console-vat.ts | 317 +++++++++++++----- 14 files changed, 840 insertions(+), 822 deletions(-) diff --git a/packages/cli/src/commands/daemon-client.ts b/packages/cli/src/commands/daemon-client.ts index 8cb77502d..7897962b0 100644 --- a/packages/cli/src/commands/daemon-client.ts +++ b/packages/cli/src/commands/daemon-client.ts @@ -3,6 +3,8 @@ import type { Socket } from 'node:net'; import { homedir } from 'node:os'; import { join } from 'node:path'; +const READ_TIMEOUT_MS = 30_000; + /** * Get the default daemon socket path. * @@ -37,20 +39,45 @@ async function connectSocket(socketPath: string): Promise { async function readLine(socket: Socket): Promise { return new Promise((resolve, reject) => { let buffer = ''; + + const timer = setTimeout(() => { + cleanup(); + reject(new Error('Daemon response timed out')); + }, READ_TIMEOUT_MS); + + /** + * Remove all listeners and clear the timeout. + */ + function cleanup(): void { + clearTimeout(timer); + socket.removeAllListeners('data'); + socket.removeAllListeners('error'); + socket.removeAllListeners('end'); + socket.removeAllListeners('close'); + } + const onData = (data: Buffer): void => { buffer += data.toString(); const idx = buffer.indexOf('\n'); if (idx !== -1) { - socket.removeAllListeners('data'); - socket.removeAllListeners('error'); + cleanup(); resolve(buffer.slice(0, idx)); } }; + socket.on('data', onData); socket.once('error', (error) => { - socket.removeAllListeners('data'); + cleanup(); reject(error); }); + socket.once('end', () => { + cleanup(); + reject(new Error('Socket closed before response received')); + }); + socket.once('close', () => { + cleanup(); + reject(new Error('Socket closed before response received')); + }); }); } @@ -85,7 +112,8 @@ export type ConsoleResponse = { * Send a JSON request to the daemon over a UNIX socket and return the response. * * Opens a connection, writes one JSON line, reads one JSON response line, - * then closes the connection. + * then closes the connection. Retries once after a short delay if the + * connection is rejected (e.g. due to a probe connection race). * * @param socketPath - The UNIX socket path. * @param request - The request to send. @@ -98,13 +126,24 @@ export async function sendCommand( socketPath: string, request: { ref?: string; method: string; args?: unknown[] }, ): Promise { - const socket = await connectSocket(socketPath); + const attempt = async (): Promise => { + const socket = await connectSocket(socketPath); + try { + await writeLine(socket, JSON.stringify(request)); + const responseLine = await readLine(socket); + return JSON.parse(responseLine) as ConsoleResponse; + } finally { + socket.destroy(); + } + }; + try { - await writeLine(socket, JSON.stringify(request)); - const responseLine = await readLine(socket); - return JSON.parse(responseLine) as ConsoleResponse; - } finally { - socket.destroy(); + return await attempt(); + } catch { + // Retry once after a short delay — the daemon's socket channel may + // still be cleaning up a previous probe connection. + await new Promise((resolve) => setTimeout(resolve, 100)); + return attempt(); } } diff --git a/packages/cli/src/commands/daemon-entry.ts b/packages/cli/src/commands/daemon-entry.ts index 4e0f710f3..85fc704be 100644 --- a/packages/cli/src/commands/daemon-entry.ts +++ b/packages/cli/src/commands/daemon-entry.ts @@ -2,7 +2,7 @@ import '@metamask/kernel-shims/endoify-node'; import { Logger } from '@metamask/logger'; import type { LogEntry } from '@metamask/logger'; -import { mkdir } from 'node:fs/promises'; +import { chmod, mkdir, rm, writeFile } from 'node:fs/promises'; import { createRequire } from 'node:module'; import { homedir } from 'node:os'; import { dirname, join, resolve } from 'node:path'; @@ -72,12 +72,25 @@ async function main(): Promise { logger, }); + // Write the admin .ocap file so `ok ` works + const ocapPath = + process.env.OCAP_CONSOLE_PATH ?? join(ocapDir, `${consoleName}.ocap`); + await mkdir(dirname(ocapPath), { recursive: true }); + await writeFile(ocapPath, `#!/usr/bin/env ok\n${handle.selfRef}\n`); + await chmod(ocapPath, 0o700); + logger.info(`Wrote ${ocapPath}`); + + // Write PID file so `ok daemon stop` can signal this process + const pidPath = join(ocapDir, 'daemon.pid'); + await writeFile(pidPath, String(process.pid)); + logger.info(`Daemon started. Socket: ${handle.socketPath}`); // Keep the process alive const shutdown = async (signal: string): Promise => { logger.info(`Received ${signal}, shutting down...`); await handle.close(); + await rm(pidPath, { force: true }); process.exit(0); }; diff --git a/packages/cli/src/ok.ts b/packages/cli/src/ok.ts index adc021768..15401c6f7 100644 --- a/packages/cli/src/ok.ts +++ b/packages/cli/src/ok.ts @@ -1,21 +1,34 @@ /* eslint-disable n/no-process-exit, n/no-sync, no-negated-condition */ import '@metamask/kernel-shims/endoify-node'; import { existsSync, fstatSync } from 'node:fs'; -import { writeFile, chmod } from 'node:fs/promises'; -import { resolve } from 'node:path'; +import { readFile, rm } from 'node:fs/promises'; +import { homedir } from 'node:os'; +import { join, resolve } from 'node:path'; import { pathToFileURL } from 'node:url'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import { getSocketPath, + isDaemonRunning, sendCommand, readStdin, - readRefFromStdin, readRefFromFile, } from './commands/daemon-client.ts'; import { ensureDaemon } from './commands/daemon-spawn.ts'; +const home = homedir(); + +/** + * Replace the home directory prefix with `~` for display. + * + * @param path - An absolute path. + * @returns The path with the home prefix replaced. + */ +function tildify(path: string): string { + return path.startsWith(home) ? `~${path.slice(home.length)}` : path; +} + /** * Handle the core invocation: resolve ref, call method, output result. * @@ -61,8 +74,11 @@ async function handleInvoke(args: string[], socketPath: string): Promise { methodArgs = args.slice(1); } - // For launch: resolve relative bundleSpec paths to file:// URLs + // For launch: resolve relative bundleSpec paths to file:// URLs. + // Handle shell word-splitting: `$(cat file.json)` without quotes splits + // JSON into many args. Try joining all args as one JSON string first. if (method === 'launch') { + methodArgs = rejoinSplitJson(methodArgs); methodArgs = methodArgs.map((arg) => { try { const parsed = JSON.parse(arg) as unknown; @@ -129,6 +145,39 @@ function isRefResult(result: unknown): result is { ref: string } { ); } +/** + * Rejoin args that were word-split by the shell from a single JSON value. + * + * When a user writes `ok launch $(cat config.json)` without quotes, bash + * splits the JSON on whitespace into many argv entries. This function + * detects that pattern and reassembles the original JSON. + * + * @param args - The method arguments (possibly word-split). + * @returns The args, with split JSON rejoined into a single element. + */ +function rejoinSplitJson(args: string[]): string[] { + if (args.length <= 1) { + return args; + } + // If the first arg already parses as a complete JSON object, no fix needed + const first = args[0]; + if (first !== undefined) { + try { + JSON.parse(first); + return args; + } catch { + // First arg alone isn't valid JSON — try joining + } + } + const joined = args.join(' '); + try { + JSON.parse(joined); + return [joined]; + } catch { + return args; + } +} + /** * Check if a value looks like a cluster config (has bootstrap + vats). * @@ -151,6 +200,7 @@ function isClusterConfigLike( * Resolve relative bundleSpec paths in a cluster config to file:// URLs. * * @param config - The cluster config object. + * @param config.vats - The vat configurations with optional bundleSpec paths. * @returns The config with resolved bundleSpec URLs. */ function resolveBundleSpecs(config: { @@ -181,16 +231,54 @@ async function handleDaemon(args: string[], socketPath: string): Promise { const subcommand = args[0]; if (subcommand === 'stop') { - const { isDaemonRunning } = await import('./commands/daemon-client.ts'); - if (await isDaemonRunning(socketPath)) { + if (!(await isDaemonRunning(socketPath))) { + process.stderr.write('Daemon is not running.\n'); + return; + } + + const pidPath = join(homedir(), '.ocap', 'daemon.pid'); + + let pid: number | undefined; + try { + pid = Number(await readFile(pidPath, 'utf-8')); + } catch { + // PID file missing — fall back to manual instructions + } + + if (!pid || Number.isNaN(pid)) { process.stderr.write( - 'Stopping daemon... (send SIGTERM to daemon process)\n', + 'PID file not found. Stop the daemon manually:\n' + + ` kill $(lsof -t ${tildify(socketPath)})\n`, ); + return; + } + + process.stderr.write('Stopping daemon...\n'); + try { + process.kill(pid, 'SIGTERM'); + } catch { process.stderr.write( - `Run: kill $(lsof -t ${socketPath}) or use pkill -f daemon-entry\n`, + 'Failed to send SIGTERM (process may already be gone).\n', ); + await rm(pidPath, { force: true }); + return; + } + + // Poll until socket stops responding (max 5s) + const pollEnd = Date.now() + 5_000; + while (Date.now() < pollEnd) { + await new Promise((_resolve) => setTimeout(_resolve, 250)); + if (!(await isDaemonRunning(socketPath))) { + break; + } + } + + await rm(pidPath, { force: true }); + + if (await isDaemonRunning(socketPath)) { + process.stderr.write('Daemon did not stop within 5 seconds.\n'); } else { - process.stderr.write('Daemon is not running.\n'); + process.stderr.write('Daemon stopped.\n'); } return; } @@ -212,77 +300,29 @@ async function handleDaemon(args: string[], socketPath: string): Promise { } // Default: start daemon (or confirm running) - let consoleName = 'system-console'; + let consolePath = 'system-console.ocap'; const consoleIdx = args.indexOf('--console'); if (consoleIdx !== -1 && args[consoleIdx + 1]) { - consoleName = args[consoleIdx + 1] ?? consoleName; + consolePath = args[consoleIdx + 1] ?? consolePath; } + // Resolve relative to PWD + const ocapPath = resolve(consolePath); + // Derive the console name from the filename (strip .ocap if present) + const consoleName = ocapPath.endsWith('.ocap') + ? ocapPath.slice(ocapPath.lastIndexOf('/') + 1, -5) + : ocapPath.slice(ocapPath.lastIndexOf('/') + 1); + // eslint-disable-next-line n/no-process-env -- CLI sets env for daemon child process process.env.OCAP_CONSOLE_NAME = consoleName; + // eslint-disable-next-line n/no-process-env -- CLI sets env for daemon child process + process.env.OCAP_CONSOLE_PATH = ocapPath; await ensureDaemon(socketPath); - // Check if the .ocap file exists - const ocapPath = `${consoleName}.ocap`; - if (!existsSync(ocapPath)) { - // Request listRefs from the daemon to find the system console ref - const response = await sendCommand(socketPath, { method: 'listRefs' }); - - if (response.ok) { - const result = response.result as { - refs: { ref: string; kref: string }[]; - }; - const firstRef = result.refs[0]; - if (firstRef) { - const content = `#!/usr/bin/env ok\n${firstRef.ref}\n`; - await writeFile(ocapPath, content); - await chmod(ocapPath, 0o755); - process.stderr.write(`Created ${ocapPath}\n`); - } - } - } - - process.stderr.write(`Daemon running. Socket: ${socketPath}\n`); -} - -/** - * Handle revoke command. - * - * @param args - CLI arguments after `ok revoke`. - * @param socketPath - The daemon socket path. - */ -async function handleRevoke(args: string[], socketPath: string): Promise { - let ref: string; - - const firstArg = args[0]; - if ( - firstArg !== undefined && - (firstArg.endsWith('.ocap') || existsSync(firstArg)) - ) { - ref = await readRefFromFile(firstArg); - } else if ( - !process.stdin.isTTY && - (fstatSync(0).isFIFO() || fstatSync(0).isFile()) - ) { - ref = await readRefFromStdin(); - } else { - process.stderr.write( - 'Usage: ok revoke \n ok revoke < file.ocap\n', - ); - process.exit(1); - } - - const response = await sendCommand(socketPath, { - method: 'revoke', - args: [ref], - }); - - if (response.ok && (response.result as { ok: boolean }).ok) { - process.stderr.write(`Revoked ref: ${ref}\n`); - } else { - process.stderr.write(`Ref not found: ${ref}\n`); - process.exit(1); + process.stderr.write(`Daemon running. Socket: ${tildify(socketPath)}\n`); + if (existsSync(ocapPath)) { + process.stderr.write(`Admin console: ${tildify(ocapPath)}\n`); } } @@ -291,101 +331,7 @@ const socketPath = getSocketPath(); const cli = yargs(hideBin(process.argv)) .scriptName('ok') .usage('$0 [file.ocap] [...args]') - - .command( - 'launch [config]', - 'Launch a subcluster', - (_yargs) => - _yargs - .positional('config', { - describe: 'Cluster config as inline JSON string', - type: 'string', - }) - .example( - '$0 launch \'{"bootstrap":"v","vats":{"v":{"bundleSpec":"file:///path/to.bundle"}}}\'', - 'Inline JSON', - ) - .example('$0 launch < config.json > root.ocap', 'File redirect') - .example('cat config.json | $0 launch', 'Piped'), - async (args) => { - await ensureDaemon(socketPath); - await handleInvoke( - ['launch', ...(args.config ? [args.config] : [])], - socketPath, - ); - }, - ) - - .command( - 'terminate ', - 'Terminate a subcluster', - (_yargs) => - _yargs.positional('subclusterId', { - describe: 'ID of the subcluster to terminate', - type: 'string', - demandOption: true, - }), - async (args) => { - await ensureDaemon(socketPath); - await handleInvoke(['terminate', String(args.subclusterId)], socketPath); - }, - ) - - .command( - 'status', - 'Show kernel status', - () => ({}), - async () => { - await ensureDaemon(socketPath); - await handleInvoke(['status'], socketPath); - }, - ) - - .command( - 'subclusters', - 'List subclusters', - () => ({}), - async () => { - await ensureDaemon(socketPath); - await handleInvoke(['subclusters'], socketPath); - }, - ) - - .command( - 'listRefs', - 'List all issued refs', - () => ({}), - async () => { - await ensureDaemon(socketPath); - await handleInvoke(['listRefs'], socketPath); - }, - ) - - .command( - 'help', - 'Show available kernel commands', - () => ({}), - async () => { - await ensureDaemon(socketPath); - await handleInvoke(['help'], socketPath); - }, - ) - - .command( - 'revoke [target]', - 'Revoke a capability ref', - (_yargs) => - _yargs - .positional('target', { - describe: 'Path to .ocap file', - type: 'string', - }) - .example('$0 revoke file.ocap', 'By file path') - .example('$0 revoke < file.ocap', 'From stdin'), - async (args) => { - await handleRevoke(args.target ? [String(args.target)] : [], socketPath); - }, - ) + .help(false) .command( 'daemon [subcommand]', @@ -397,9 +343,9 @@ const cli = yargs(hideBin(process.argv)) type: 'string', }) .option('console', { - describe: 'System console name', + describe: 'Path for the .ocap admin file (relative to PWD)', type: 'string', - default: 'system-console', + default: 'system-console.ocap', }) .option('forgood', { describe: 'Confirm state deletion (for begone)', @@ -413,7 +359,7 @@ const cli = yargs(hideBin(process.argv)) if (args.forgood) { daemonArgs.push('--forgood'); } - if (args.console && args.console !== 'system-console') { + if (args.console && args.console !== 'system-console.ocap') { daemonArgs.push('--console', String(args.console)); } await handleDaemon(daemonArgs, socketPath); diff --git a/packages/nodejs/src/daemon/flush-daemon.ts b/packages/nodejs/src/daemon/flush-daemon.ts index cf4b2adbe..426cb422e 100644 --- a/packages/nodejs/src/daemon/flush-daemon.ts +++ b/packages/nodejs/src/daemon/flush-daemon.ts @@ -23,9 +23,12 @@ export async function flushDaemon(options?: FlushDaemonOptions): Promise { const dbFilename = options?.dbFilename ?? join(ocapDir, 'kernel.sqlite'); const bundlesDir = join(ocapDir, 'bundles'); + const pidPath = join(ocapDir, 'daemon.pid'); + await Promise.all([ rm(dbFilename, { force: true }), rm(socketPath, { force: true }), rm(bundlesDir, { recursive: true, force: true }), + rm(pidPath, { force: true }), ]); } diff --git a/packages/nodejs/src/daemon/start-daemon.test.ts b/packages/nodejs/src/daemon/start-daemon.test.ts index 21e027f56..9f18c24bd 100644 --- a/packages/nodejs/src/daemon/start-daemon.test.ts +++ b/packages/nodejs/src/daemon/start-daemon.test.ts @@ -8,9 +8,19 @@ vi.mock('../kernel/make-kernel.ts', () => ({ makeKernel: vi.fn().mockResolvedValue({ initIdentity: vi.fn().mockResolvedValue(undefined), stop: vi.fn().mockResolvedValue(undefined), + getSystemSubclusterRoot: vi.fn().mockReturnValue('ko-root'), + queueMessage: vi.fn().mockResolvedValue({ body: '"d-1"', slots: [] }), }), })); +// Mock kunser to deserialise the capdata returned by queueMessage +vi.mock('@metamask/ocap-kernel', async () => { + const actual = await vi.importActual( + '@metamask/ocap-kernel', + ); + return { ...actual, kunser: vi.fn().mockReturnValue('d-1') }; +}); + // Mock filesystem operations vi.mock('node:fs/promises', async () => { const actual = @@ -73,7 +83,7 @@ describe('startDaemon', () => { ); }); - it('returns socket path and close function', async () => { + it('returns socket path, selfRef, and close function', async () => { const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; handle = await startDaemon({ @@ -82,10 +92,34 @@ describe('startDaemon', () => { }); expect(handle.socketPath).toBe(tmpSocket); + expect(handle.selfRef).toBe('d-1'); expect(typeof handle.close).toBe('function'); expect(handle.kernel).toBeDefined(); }); + it('issues a self-ref via getSystemSubclusterRoot and queueMessage', async () => { + const { makeKernel } = await import('../kernel/make-kernel.ts'); + const mockedMakeKernel = vi.mocked(makeKernel); + + const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; + + handle = await startDaemon({ + systemConsoleBundleSpec: 'http://localhost/bundle', + systemConsoleName: 'my-console', + socketPath: tmpSocket, + }); + + const mockKernel = await mockedMakeKernel.mock.results[0]!.value; + expect(mockKernel.getSystemSubclusterRoot).toHaveBeenCalledWith( + 'my-console', + ); + expect(mockKernel.queueMessage).toHaveBeenCalledWith( + 'ko-root', + 'issueRef', + ['ko-root', true], + ); + }); + it('calls kernel.stop on close', async () => { const tmpSocket = `/tmp/daemon-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`; diff --git a/packages/nodejs/src/daemon/start-daemon.ts b/packages/nodejs/src/daemon/start-daemon.ts index a778ec15f..5533f1e51 100644 --- a/packages/nodejs/src/daemon/start-daemon.ts +++ b/packages/nodejs/src/daemon/start-daemon.ts @@ -1,6 +1,7 @@ import { ifDefined } from '@metamask/kernel-utils'; import type { Logger } from '@metamask/logger'; import type { Kernel, SystemSubclusterConfig } from '@metamask/ocap-kernel'; +import { kunser } from '@metamask/ocap-kernel'; import { mkdir } from 'node:fs/promises'; import { homedir } from 'node:os'; import { join } from 'node:path'; @@ -35,6 +36,7 @@ export type StartDaemonOptions = { export type DaemonHandle = { kernel: Kernel; socketPath: string; + selfRef: string; close: () => Promise; }; @@ -95,6 +97,14 @@ export async function startDaemon( await kernel.initIdentity(); + // Issue a self-ref so the admin .ocap file can address the console root object + const rootKref = kernel.getSystemSubclusterRoot(systemConsoleName); + const capData = await kernel.queueMessage(rootKref, 'issueRef', [ + rootKref, + true, + ]); + const selfRef = kunser(capData) as string; + const close = async (): Promise => { await kernel.stop(); }; @@ -102,6 +112,7 @@ export async function startDaemon( return { kernel, socketPath, + selfRef, close, }; } diff --git a/packages/nodejs/src/io/socket-channel.ts b/packages/nodejs/src/io/socket-channel.ts index 97ebfc46e..3acda62f5 100644 --- a/packages/nodejs/src/io/socket-channel.ts +++ b/packages/nodejs/src/io/socket-channel.ts @@ -90,13 +90,21 @@ export async function makeSocketIOChannel( const server = net.createServer((socket) => { if (currentSocket) { - // Only one connection at a time - socket.destroy(); - return; + if (currentSocket.readableEnded || currentSocket.destroyed) { + // Old connection is dead but events haven't been fully processed; + // clean it up and accept the new connection. + currentSocket.removeAllListeners(); + currentSocket.destroy(); + currentSocket = null; + } else { + // Existing active client — reject the new connection + socket.destroy(); + return; + } } - // Drain stale state from any previous connection + // Drain stale data from any previous connection, but keep pending + // readers alive so they can receive data from the new connection. lineQueue.length = 0; - deliverEOF(); currentSocket = socket; decoder = new StringDecoder('utf8'); diff --git a/packages/nodejs/test/e2e/daemon-stack.test.ts b/packages/nodejs/test/e2e/daemon-stack.test.ts index d29b962b7..9e815db44 100644 --- a/packages/nodejs/test/e2e/daemon-stack.test.ts +++ b/packages/nodejs/test/e2e/daemon-stack.test.ts @@ -1,13 +1,15 @@ import type { KernelDatabase } from '@metamask/kernel-store'; import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; import { waitUntilQuiescent } from '@metamask/kernel-utils'; -import type { Kernel, IOChannel, IOConfig } from '@metamask/ocap-kernel'; +import type { Kernel } from '@metamask/ocap-kernel'; +import { kunser } from '@metamask/ocap-kernel'; import * as net from 'node:net'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { pathToFileURL } from 'node:url'; import { describe, it, expect, afterEach } from 'vitest'; +import { makeIOChannelFactory } from '../../src/io/index.ts'; import { makeTestKernel } from '../helpers/kernel.ts'; const SYSTEM_CONSOLE_NAME = 'system-console'; @@ -104,136 +106,6 @@ async function sendCommand( } } -/** - * Create a test socket IO channel factory. - * - * @returns The factory function. - */ -function makeTestIOChannelFactory() { - const fsPromises = import('node:fs/promises'); - - return async (_name: string, config: IOConfig): Promise => { - if (config.type !== 'socket') { - throw new Error(`unsupported IO type: ${config.type}`); - } - const fs = await fsPromises; - const lineQueue: string[] = []; - const readerQueue: { resolve: (value: string | null) => void }[] = []; - let currentSocket: net.Socket | null = null; - let lineBuffer = ''; - let closed = false; - - function deliverLine(line: string): void { - const reader = readerQueue.shift(); - if (reader) { - reader.resolve(line); - } else { - lineQueue.push(line); - } - } - - function deliverEOF(): void { - while (readerQueue.length > 0) { - readerQueue.shift()?.resolve(null); - } - } - - const server = net.createServer((socket) => { - if (currentSocket) { - socket.destroy(); - return; - } - currentSocket = socket; - lineBuffer = ''; - socket.on('data', (data: Buffer) => { - lineBuffer += data.toString(); - let idx = lineBuffer.indexOf('\n'); - while (idx !== -1) { - deliverLine(lineBuffer.slice(0, idx)); - lineBuffer = lineBuffer.slice(idx + 1); - idx = lineBuffer.indexOf('\n'); - } - }); - socket.on('end', () => { - if (lineBuffer.length > 0) { - deliverLine(lineBuffer); - lineBuffer = ''; - } - currentSocket = null; - deliverEOF(); - }); - socket.on('error', () => { - currentSocket = null; - deliverEOF(); - }); - }); - - try { - await fs.unlink(config.path); - } catch { - // ignore - } - - await new Promise((resolve, reject) => { - server.on('error', reject); - server.listen(config.path, () => { - server.removeListener('error', reject); - resolve(); - }); - }); - - return { - async read() { - if (closed) { - return null; - } - const queued = lineQueue.shift(); - if (queued !== undefined) { - return queued; - } - if (!currentSocket) { - return null; - } - return new Promise((resolve) => { - readerQueue.push({ resolve }); - }); - }, - async write(data: string) { - if (!currentSocket) { - throw new Error('no connected client'); - } - const socket = currentSocket; - return new Promise((resolve, reject) => { - socket.write(`${data}\n`, (error) => { - if (error) { - reject(error); - } else { - resolve(); - } - }); - }); - }, - async close() { - if (closed) { - return; - } - closed = true; - deliverEOF(); - currentSocket?.destroy(); - currentSocket = null; - await new Promise((resolve) => { - server.close(() => resolve()); - }); - try { - await fs.unlink(config.path); - } catch { - // ignore - } - }, - }; - }; -} - /** * Get the bundle spec for the system console vat test bundle. * @@ -261,7 +133,7 @@ describe('Daemon Stack (IO socket protocol)', { timeout: 30_000 }, () => { kernelDatabase = await makeSQLKernelDatabase({ dbFilename: ':memory:' }); kernel = await makeTestKernel(kernelDatabase, { - ioChannelFactory: makeTestIOChannelFactory(), + ioChannelFactory: makeIOChannelFactory(), systemSubclusters: [ { name: SYSTEM_CONSOLE_NAME, @@ -303,54 +175,162 @@ describe('Daemon Stack (IO socket protocol)', { timeout: 30_000 }, () => { } }); - it('dispatches help command via socket', async () => { - const socketPath = await bootDaemonStack(); + describe('daemon tier (no ref)', () => { + it('dispatches help command with daemon-tier commands only', async () => { + const socketPath = await bootDaemonStack(); - const response = await sendCommand(socketPath, { method: 'help' }); + const response = await sendCommand(socketPath, { method: 'help' }); - expect(response.ok).toBe(true); - const result = response.result as { commands: string[] }; - expect(result.commands).toBeDefined(); - expect(result.commands.length).toBeGreaterThan(0); - expect(result.commands.some((cmd) => cmd.includes('help'))).toBe(true); - expect(result.commands.some((cmd) => cmd.includes('status'))).toBe(true); - }); + expect(response.ok).toBe(true); + expect(response.result).toStrictEqual({ + commands: ['help - show available commands', 'status - daemon status'], + }); + }); - it('dispatches status command via socket', async () => { - const socketPath = await bootDaemonStack(); + it('dispatches status command returning liveness indicator', async () => { + const socketPath = await bootDaemonStack(); - const response = await sendCommand(socketPath, { method: 'status' }); + const response = await sendCommand(socketPath, { method: 'status' }); - expect(response.ok).toBe(true); - }); + expect(response).toStrictEqual({ + ok: true, + result: { running: true }, + }); + }); - it('dispatches listRefs command', async () => { - const socketPath = await bootDaemonStack(); + it('returns error for unknown command', async () => { + const socketPath = await bootDaemonStack(); - const response = await sendCommand(socketPath, { method: 'listRefs' }); + const response = await sendCommand(socketPath, { + method: 'nonexistent', + }); - expect(response.ok).toBe(true); - const result = response.result as { refs: { ref: string; kref: string }[] }; - expect(result.refs).toBeDefined(); - expect(Array.isArray(result.refs)).toBe(true); - }); + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }); + + it('rejects privileged commands at daemon tier', async () => { + const socketPath = await bootDaemonStack(); + + const response = await sendCommand(socketPath, { method: 'ls' }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }); - it('returns error for unknown command', async () => { - const socketPath = await bootDaemonStack(); + it('handles sequential requests on separate connections', async () => { + const socketPath = await bootDaemonStack(); - const response = await sendCommand(socketPath, { method: 'nonexistent' }); + const response1 = await sendCommand(socketPath, { method: 'help' }); + expect(response1.ok).toBe(true); - expect(response.ok).toBe(false); - expect(response.error).toContain('Unknown command'); + const response2 = await sendCommand(socketPath, { method: 'status' }); + expect(response2.ok).toBe(true); + }); }); - it('handles sequential requests on separate connections', async () => { - const socketPath = await bootDaemonStack(); + describe('privileged tier (ref-based dispatch)', () => { + /** + * Boot daemon and issue a self-ref for the console root object. + * + * @returns The socket path and the issued ref. + */ + async function bootWithSelfRef(): Promise<{ + socketPath: string; + selfRef: string; + }> { + const socketPath = await bootDaemonStack(); + + // Issue a self-ref via kernel API (same as start-daemon.ts does) + const rootKref = kernel!.getSystemSubclusterRoot(SYSTEM_CONSOLE_NAME); + const capData = await kernel!.queueMessage(rootKref, 'issueRef', [ + rootKref, + true, + ]); + const selfRef = kunser(capData) as string; + + return { socketPath, selfRef }; + } + + it('dispatches help via ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'help', + }); + + expect(response.ok).toBe(true); + const result = response.result as { commands: string[] }; + expect(result.commands).toContain('help - show available commands'); + expect(result.commands).toContain('ls - list all issued refs'); + }); - const response1 = await sendCommand(socketPath, { method: 'help' }); - expect(response1.ok).toBe(true); + it('dispatches status via ref (returns kernel status)', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'status', + }); - const response2 = await sendCommand(socketPath, { method: 'status' }); - expect(response2.ok).toBe(true); + expect(response.ok).toBe(true); + const result = response.result as Record; + expect(result).toHaveProperty('vats'); + expect(result).toHaveProperty('subclusters'); + }); + + it('dispatches ls via ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'ls', + }); + + expect(response.ok).toBe(true); + const result = response.result as { refs: string[] }; + expect(Array.isArray(result.refs)).toBe(true); + }); + + it('dispatches subclusters via ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'subclusters', + }); + + expect(response.ok).toBe(true); + expect(Array.isArray(response.result)).toBe(true); + }); + + it('dispatches invoke to call method on a ref through the kernel', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + // Use invoke to call 'ls' on the self-ref (goes through getPresence + E()) + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'invoke', + args: [selfRef, 'ls'], + }); + + expect(response.ok).toBe(true); + const result = response.result as { refs: string[] }; + expect(Array.isArray(result.refs)).toBe(true); + }); + + it('returns error when invoke targets unknown ref', async () => { + const { socketPath, selfRef } = await bootWithSelfRef(); + + const response = await sendCommand(socketPath, { + ref: selfRef, + method: 'invoke', + args: ['d-999', 'someMethod'], + }); + + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown ref: d-999'); + }); }); }); diff --git a/packages/nodejs/test/vats/system-console-vat.ts b/packages/nodejs/test/vats/system-console-vat.ts index 905538bad..75da555db 100644 --- a/packages/nodejs/test/vats/system-console-vat.ts +++ b/packages/nodejs/test/vats/system-console-vat.ts @@ -1,288 +1 @@ -import { E } from '@endo/eventual-send'; -import { makeDefaultExo } from '@metamask/kernel-utils/exo'; -import type { - Baggage, - ClusterConfig, - KernelStatus, - Subcluster, - SubclusterLaunchResult, -} from '@metamask/ocap-kernel'; - -/** - * Kernel facet interface for system vat operations. - */ -type KernelFacet = { - getStatus: () => Promise; - getSubclusters: () => Promise; - launchSubcluster: (config: ClusterConfig) => Promise; - terminateSubcluster: (subclusterId: string) => Promise; - queueMessage: ( - target: string, - method: string, - args: unknown[], - ) => Promise; -}; - -/** - * Services provided to the system console vat during bootstrap. - */ -type BootstrapServices = { - kernelFacet?: KernelFacet; - console?: IOService; -}; - -/** - * IO service interface for reading and writing lines. - */ -type IOService = { - read: () => Promise; - write: (data: string) => Promise; -}; - -/** - * A JSON request from the CLI. - */ -type Request = { - ref?: string; - method: string; - args?: unknown[]; -}; - -/** - * Build function for the system console vat. - * - * This vat manages the REPL loop over an IO channel, dispatching CLI - * commands and managing refs (capability references) in persistent baggage. - * - * @param _vatPowers - The vat powers (unused). - * @param parameters - The vat parameters. - * @param parameters.name - Optional name for the console vat. - * @param baggage - The vat's persistent baggage storage. - * @returns The root object for the new vat. - */ -export function buildRootObject( - _vatPowers: unknown, - parameters: { name?: string }, - baggage: Baggage, -) { - const name = parameters.name ?? 'system-console'; - - // Monotonic counter for generating unique ref identifiers (persisted in baggage) - let refCounter: number = baggage.has('refCounter') - ? (baggage.get('refCounter') as number) - : 0; - - // Restore kernel facet from baggage if available (for resuscitation) - let kernelFacet: KernelFacet | undefined = baggage.has('kernelFacet') - ? (baggage.get('kernelFacet') as KernelFacet) - : undefined; - - // Ref manager state in baggage - const refs: Record = baggage.has('refs') - ? (baggage.get('refs') as Record) - : {}; - const krefToRef: Record = baggage.has('krefToRef') - ? (baggage.get('krefToRef') as Record) - : {}; - - function persistRefs(): void { - if (baggage.has('refs')) { - baggage.set('refs', harden({ ...refs })); - } else { - baggage.init('refs', harden({ ...refs })); - } - if (baggage.has('krefToRef')) { - baggage.set('krefToRef', harden({ ...krefToRef })); - } else { - baggage.init('krefToRef', harden({ ...krefToRef })); - } - } - - function issueRef(kref: string): string { - const existing = krefToRef[kref]; - if (existing) { - return existing; - } - refCounter += 1; - if (baggage.has('refCounter')) { - baggage.set('refCounter', refCounter); - } else { - baggage.init('refCounter', refCounter); - } - const ref = `d-${refCounter}`; - refs[ref] = kref; - krefToRef[kref] = ref; - persistRefs(); - return ref; - } - - function lookupKref(ref: string): string | undefined { - return refs[ref]; - } - - function revokeRef(ref: string): boolean { - const kref = refs[ref]; - if (!kref) { - return false; - } - delete refs[ref]; - delete krefToRef[kref]; - persistRefs(); - return true; - } - - function listRefs(): { ref: string; kref: string }[] { - return Object.entries(refs).map(([ref, kref]) => ({ ref, kref })); - } - - async function dispatchConsoleMethod( - method: string, - args: unknown[], - ): Promise { - switch (method) { - case 'help': - return { - commands: [ - 'help - show available commands', - 'status - kernel status', - 'launch - launch a subcluster', - 'terminate - terminate a subcluster', - 'subclusters - list subclusters', - 'revoke - revoke a ref', - 'listRefs - list all issued refs', - ], - }; - - case 'status': - return E(kernelFacet!).getStatus(); - - case 'subclusters': - return E(kernelFacet!).getSubclusters(); - - case 'launch': { - const config = args[0] as ClusterConfig; - if (!config) { - throw new Error('launch requires a config argument'); - } - const result = await E(kernelFacet!).launchSubcluster(config); - const ref = issueRef(result.rootKref); - return { ref, subclusterId: result.subclusterId }; - } - - case 'terminate': { - const subclusterId = args[0] as string; - if (!subclusterId) { - throw new Error('terminate requires a subclusterId argument'); - } - await E(kernelFacet!).terminateSubcluster(subclusterId); - return { ok: true }; - } - - case 'revoke': { - const ref = args[0] as string; - if (!ref) { - throw new Error('revoke requires a ref argument'); - } - return { ok: revokeRef(ref) }; - } - - case 'listRefs': - return { refs: listRefs() }; - - default: - throw new Error(`Unknown command: ${method}`); - } - } - - async function handleRequest(request: Request): Promise { - const { ref, method, args = [] } = request; - - if (!ref) { - return dispatchConsoleMethod(method, args); - } - - const kref = lookupKref(ref); - if (!kref) { - throw new Error(`Unknown ref: ${ref}`); - } - return E(kernelFacet!).queueMessage(kref, method, args); - } - - async function runReplLoop(ioService: IOService): Promise { - for (;;) { - const line = await E(ioService).read(); - if (line === null) { - continue; - } - - let response: unknown; - try { - const request = JSON.parse(line) as Request; - const result = await handleRequest(request); - response = { ok: true, result }; - } catch (error) { - // Errors crossing vat boundaries may arrive as plain objects. - // Try multiple strategies to extract a human-readable message. - let errorMessage: string; - if (error instanceof Error) { - errorMessage = error.message ?? error.stack ?? String(error); - } else if (typeof error === 'string') { - errorMessage = error; - } else { - try { - errorMessage = JSON.stringify(error); - } catch { - errorMessage = String(error); - } - } - response = { ok: false, error: errorMessage }; - } - - try { - await E(ioService).write(JSON.stringify(response)); - } catch { - // Write failed — continue loop - } - } - } - - return makeDefaultExo('root', { - async bootstrap( - _vats: unknown, - services: BootstrapServices, - ): Promise { - if (!kernelFacet && services.kernelFacet) { - kernelFacet = services.kernelFacet; - baggage.init('kernelFacet', kernelFacet); - } - - if (services.console) { - runReplLoop(services.console).catch((error) => { - console.error(`[${name}] REPL loop error:`, error); - }); - } - }, - - help() { - return harden({ - commands: [ - 'help - show available commands', - 'status - kernel status', - 'launch - launch a subcluster', - 'terminate - terminate a subcluster', - 'subclusters - list subclusters', - 'revoke - revoke a ref', - 'listRefs - list all issued refs', - ], - }); - }, - - issueRef(kref: string): string { - return issueRef(kref); - }, - - listRefs(): { ref: string; kref: string }[] { - return listRefs(); - }, - }); -} +export { buildRootObject } from '@metamask/ocap-kernel/src/vats/system-console-vat.ts'; diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index 4a5296aa4..01465bbb0 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -11,7 +11,7 @@ import { KernelRouter } from './KernelRouter.ts'; import { KernelServiceManager } from './KernelServiceManager.ts'; import type { KernelService } from './KernelServiceManager.ts'; import type { SlotValue } from './liveslots/kernel-marshal.ts'; -import { kslot } from './liveslots/kernel-marshal.ts'; +import { kslot, kunser } from './liveslots/kernel-marshal.ts'; import { OcapURLManager } from './remotes/kernel/OcapURLManager.ts'; import { RemoteManager } from './remotes/kernel/RemoteManager.ts'; import type { RemoteCommsOptions } from './remotes/types.ts'; @@ -379,6 +379,27 @@ export class Kernel { return this.#kernelQueue.enqueueMessage(target, method, args); } + /** + * Send a message to an object in a vat and return the deserialized result. + * + * Unlike {@link queueMessage}, which returns raw CapData, this method + * deserializes the result so that it can be returned through a kernel + * service and re-serialized by liveslots for the calling vat. + * + * @param target - The kref of the object to invoke. + * @param method - The method name. + * @param args - The method arguments. + * @returns The deserialized result of the method invocation. + */ + async invokeMethod( + target: KRef, + method: string, + args: unknown[], + ): Promise { + const capData = await this.queueMessage(target, method, args); + return kunser(capData); + } + /** * Issue an OCAP URL for a kernel reference. * diff --git a/packages/ocap-kernel/src/kernel-facet.test.ts b/packages/ocap-kernel/src/kernel-facet.test.ts index 4e53f8a58..ddb4ddddf 100644 --- a/packages/ocap-kernel/src/kernel-facet.test.ts +++ b/packages/ocap-kernel/src/kernel-facet.test.ts @@ -11,6 +11,7 @@ const makeMockKernel = (): KernelFacetSource => ({ getSubcluster: () => undefined, getSubclusters: () => [], getSystemSubclusterRoot: () => 'ko99', + invokeMethod: async () => Promise.resolve(null), launchSubcluster: async () => Promise.resolve({ subclusterId: 's1', @@ -32,6 +33,7 @@ describe('makeKernelFacet', () => { expect(typeof facet.getSubcluster).toBe('function'); expect(typeof facet.getSubclusters).toBe('function'); expect(typeof facet.getSystemSubclusterRoot).toBe('function'); + expect(typeof facet.invokeMethod).toBe('function'); expect(typeof facet.launchSubcluster).toBe('function'); expect(typeof facet.ping).toBe('function'); expect(typeof facet.pingVat).toBe('function'); diff --git a/packages/ocap-kernel/src/kernel-facet.ts b/packages/ocap-kernel/src/kernel-facet.ts index 79ecbe443..13ff05321 100644 --- a/packages/ocap-kernel/src/kernel-facet.ts +++ b/packages/ocap-kernel/src/kernel-facet.ts @@ -8,6 +8,7 @@ const kernelFacetMethodNames = [ 'getSubcluster', 'getSubclusters', 'getSystemSubclusterRoot', + 'invokeMethod', 'launchSubcluster', 'pingVat', 'queueMessage', diff --git a/packages/ocap-kernel/src/vats/system-console-vat.test.ts b/packages/ocap-kernel/src/vats/system-console-vat.test.ts index 7cc1da57d..d22086d28 100644 --- a/packages/ocap-kernel/src/vats/system-console-vat.test.ts +++ b/packages/ocap-kernel/src/vats/system-console-vat.test.ts @@ -80,11 +80,16 @@ function makeMockKernelFacet() { const calls: Record = { getStatus: [], getSubclusters: [], + invokeMethod: [], launchSubcluster: [], terminateSubcluster: [], }; const facet = makeDefaultExo('mockKernelFacet', { + async invokeMethod(...args: unknown[]) { + calls.invokeMethod.push(args); + return { mocked: true }; + }, async getStatus(...args: unknown[]) { calls.getStatus.push(args); return { @@ -160,7 +165,7 @@ describe('system-console-vat', () => { }); }); - describe('REPL dispatch', () => { + describe('REPL dispatch (daemon tier)', () => { async function setupRepl() { const root = buildRootObject( {}, @@ -189,92 +194,66 @@ describe('system-console-vat', () => { }; } - it('dispatches help command', async () => { + it('dispatches help command with daemon-tier commands only', async () => { await setupRepl(); const response = await sendRequest({ method: 'help' }); expect(response.ok).toBe(true); expect(response.result).toStrictEqual({ - commands: expect.arrayContaining([ - expect.stringContaining('help'), - expect.stringContaining('status'), - ]), + commands: ['help - show available commands', 'status - daemon status'], }); }); - it('dispatches status command', async () => { + it('dispatches status command returning liveness indicator', async () => { await setupRepl(); const response = await sendRequest({ method: 'status' }); - expect(response.ok).toBe(true); - expect(kernelFacet.calls.getStatus).toHaveLength(1); - }); - - it('dispatches subclusters command', async () => { - await setupRepl(); - const response = await sendRequest({ method: 'subclusters' }); - - expect(response.ok).toBe(true); - expect(kernelFacet.calls.getSubclusters).toHaveLength(1); + expect(response).toStrictEqual({ + ok: true, + result: { running: true }, + }); }); - it('dispatches launch command and issues ref', async () => { - await setupRepl(); - const config = { - bootstrap: 'test', - vats: { test: { bundleSpec: 'test-bundle' } }, - }; - const response = await sendRequest({ method: 'launch', args: [config] }); + it.each(['launch', 'terminate', 'subclusters', 'ls', 'revoke', 'invoke'])( + 'returns "Unknown command" for privileged command "%s"', + async (method) => { + await setupRepl(); + const response = await sendRequest({ method }); - expect(response.ok).toBe(true); - const result = response.result as { ref: string; subclusterId: string }; - expect(result.ref).toMatch(/^d-\d+$/u); - expect(result.subclusterId).toBe('sub-1'); - expect(kernelFacet.calls.launchSubcluster).toHaveLength(1); - }); + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown command'); + }, + ); - it('dispatches terminate command', async () => { - await setupRepl(); - const response = await sendRequest({ - method: 'terminate', - args: ['sub-1'], - }); + it('dispatches self-ref REPL command directly on root', async () => { + const root = await setupRepl(); + const ref = root.issueRef('ko-self', true); + const response = await sendRequest({ ref, method: 'help' }); expect(response.ok).toBe(true); - expect(kernelFacet.calls.terminateSubcluster).toHaveLength(1); - }); - - it('dispatches revoke command', async () => { - await setupRepl(); - - // First launch to get a ref - const launchResponse = await sendRequest({ - method: 'launch', - args: [{ bootstrap: 'x', vats: { x: { bundleSpec: 'x' } } }], + expect(response.result).toStrictEqual({ + commands: [ + 'help - show available commands', + 'status - kernel status', + 'subclusters - list subclusters', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'ls - list all issued refs', + 'revoke - revoke a ref', + 'invoke [...args] - call a method on a ref', + ], }); - const { ref } = launchResponse.result as { ref: string }; - - // Revoke the ref - const response = await sendRequest({ method: 'revoke', args: [ref] }); - expect(response).toStrictEqual({ ok: true, result: { ok: true } }); + // Should NOT have called invokeMethod — dispatch was direct + expect(kernelFacet.calls.invokeMethod).toHaveLength(0); }); - it('dispatches listRefs command', async () => { - await setupRepl(); - - // Launch to create a ref - await sendRequest({ - method: 'launch', - args: [{ bootstrap: 'x', vats: { x: { bundleSpec: 'x' } } }], - }); + it('returns error for unknown method on self-ref', async () => { + const root = await setupRepl(); + const ref = root.issueRef('ko-self', true); + const response = await sendRequest({ ref, method: 'nonexistent' }); - const response = await sendRequest({ method: 'listRefs' }); - expect(response.ok).toBe(true); - const result = response.result as { - refs: { ref: string; kref: string }[]; - }; - expect(result.refs).toHaveLength(1); - expect(result.refs[0]!.kref).toBe('ko1'); + expect(response.ok).toBe(false); + expect(response.error).toContain('Unknown method on root'); }); it('returns error for unknown command', async () => { @@ -296,6 +275,39 @@ describe('system-console-vat', () => { expect(response.error).toBeDefined(); }); + it('returns error for non-object request', async () => { + await setupRepl(); + const response = await sendRequest(42 as never); + expect(response.ok).toBe(false); + expect(response.error).toContain('Request must be a JSON object'); + }); + + it('returns error for request missing method', async () => { + await setupRepl(); + const response = await sendRequest({ ref: 'd-1' }); + expect(response.ok).toBe(false); + expect(response.error).toContain( + 'Request must have a string "method" field', + ); + }); + + it('returns error for non-string ref', async () => { + await setupRepl(); + const response = await sendRequest({ method: 'help', ref: 123 } as never); + expect(response.ok).toBe(false); + expect(response.error).toContain('"ref" must be a string'); + }); + + it('returns error for non-array args', async () => { + await setupRepl(); + const response = await sendRequest({ + method: 'help', + args: 'not-array', + } as never); + expect(response.ok).toBe(false); + expect(response.error).toContain('"args" must be an array'); + }); + it('continues after EOF (client disconnect)', async () => { await setupRepl(); @@ -313,6 +325,128 @@ describe('system-console-vat', () => { }); }); + describe('privileged root object methods', () => { + async function setupRoot() { + const root = buildRootObject( + {}, + { name: 'test-console' }, + baggage as never, + ); + await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); + return root; + } + + it('returns help with all privileged commands', async () => { + const root = await setupRoot(); + const result = root.help(); + expect(result.commands).toStrictEqual([ + 'help - show available commands', + 'status - kernel status', + 'subclusters - list subclusters', + 'launch - launch a subcluster', + 'terminate - terminate a subcluster', + 'ls - list all issued refs', + 'revoke - revoke a ref', + 'invoke [...args] - call a method on a ref', + ]); + }); + + it('returns kernel status', async () => { + const root = await setupRoot(); + const result = await root.status(); + + expect(result).toStrictEqual({ + incarnation: 1, + subclusters: 0, + vats: 1, + pendingMessages: 0, + }); + expect(kernelFacet.calls.getStatus).toHaveLength(1); + }); + + it('returns subclusters list', async () => { + const root = await setupRoot(); + const result = await root.subclusters(); + + expect(result).toStrictEqual([]); + expect(kernelFacet.calls.getSubclusters).toHaveLength(1); + }); + + it('launches subcluster and issues ref', async () => { + const root = await setupRoot(); + const config = { + bootstrap: 'test', + vats: { test: { bundleSpec: 'test-bundle' } }, + }; + const result = await root.launch(config); + + expect(result.ref).toMatch(/^d-\d+$/u); + expect(result.subclusterId).toBe('sub-1'); + expect(kernelFacet.calls.launchSubcluster).toHaveLength(1); + }); + + it('terminates subcluster', async () => { + const root = await setupRoot(); + const result = await root.terminate('sub-1'); + + expect(result).toStrictEqual({ ok: true }); + expect(kernelFacet.calls.terminateSubcluster).toHaveLength(1); + }); + + it('revokes a ref', async () => { + const root = await setupRoot(); + root.issueRef('ko1'); + + const result = root.revoke('d-1'); + expect(result).toStrictEqual({ ok: true }); + }); + + it('returns false when revoking unknown ref', async () => { + const root = await setupRoot(); + const result = root.revoke('d-999'); + expect(result).toStrictEqual({ ok: false }); + }); + + it('lists issued refs', async () => { + const root = await setupRoot(); + root.issueRef('ko1'); + root.issueRef('ko2'); + + const result = root.ls(); + expect(result.refs).toHaveLength(2); + expect(result.refs[0]).toMatch(/^d-\d+$/u); + expect(result.refs[1]).toMatch(/^d-\d+$/u); + }); + + it('returns empty list when no refs issued', async () => { + const root = await setupRoot(); + const result = root.ls(); + expect(result).toStrictEqual({ refs: [] }); + }); + + it('throws when invoke target ref is unknown', async () => { + const root = await setupRoot(); + await expect(root.invoke('d-999', 'transfer')).rejects.toThrow( + 'Unknown ref: d-999', + ); + }); + + it('throws when invoke is called without a target ref', async () => { + const root = await setupRoot(); + await expect(root.invoke('', 'transfer')).rejects.toThrow( + 'invoke requires a target ref', + ); + }); + + it('throws when invoke is called without a method', async () => { + const root = await setupRoot(); + const ref = root.issueRef('ko-wallet'); + await expect(root.invoke(ref, '')).rejects.toThrow( + 'invoke requires a method name', + ); + }); + }); + describe('ref manager', () => { it('issues idempotent refs for the same kref', async () => { const root = buildRootObject( @@ -353,31 +487,5 @@ describe('system-console-vat', () => { expect(baggage.has('refs')).toBe(true); expect(baggage.has('krefToRef')).toBe(true); }); - - it('lists issued refs', async () => { - const root = buildRootObject( - {}, - { name: 'test-console' }, - baggage as never, - ); - await root.bootstrap({}, { kernelFacet: kernelFacet.facet }); - - const ref = root.issueRef('ko1'); - const refList = root.listRefs(); - expect(refList).toStrictEqual([{ ref, kref: 'ko1' }]); - }); - }); - - describe('help', () => { - it('returns command list', () => { - const root = buildRootObject( - {}, - { name: 'test-console' }, - baggage as never, - ); - const result = root.help(); - expect(result).toHaveProperty('commands'); - expect(result.commands.length).toBeGreaterThan(0); - }); }); }); diff --git a/packages/ocap-kernel/src/vats/system-console-vat.ts b/packages/ocap-kernel/src/vats/system-console-vat.ts index 020a797ef..beb50c373 100644 --- a/packages/ocap-kernel/src/vats/system-console-vat.ts +++ b/packages/ocap-kernel/src/vats/system-console-vat.ts @@ -16,13 +16,13 @@ import type { type KernelFacet = { getStatus: () => Promise; getSubclusters: () => Promise; - launchSubcluster: (config: ClusterConfig) => Promise; - terminateSubcluster: (subclusterId: string) => Promise; - queueMessage: ( + invokeMethod: ( target: string, method: string, args: unknown[], ) => Promise; + launchSubcluster: (config: ClusterConfig) => Promise; + terminateSubcluster: (subclusterId: string) => Promise; }; /** @@ -68,57 +68,81 @@ export function buildRootObject( _parameters: { name?: string }, baggage: Baggage, ) { + /** + * Get a value from baggage, or return a fallback if the key is absent. + * + * @param key - The baggage key. + * @param fallback - The value to return if the key is absent. + * @returns The stored value or the fallback. + */ + function baggageGet(key: string, fallback: T): T { + return baggage.has(key) ? (baggage.get(key) as T) : fallback; + } + + /** + * Set a value in baggage, initialising the key if it doesn't exist. + * + * @param key - The baggage key. + * @param value - The value to store. + */ + function baggagePut(key: string, value: unknown): void { + if (baggage.has(key)) { + baggage.set(key, value); + } else { + baggage.init(key, value); + } + } + // Monotonic counter for generating unique ref identifiers (persisted in baggage) - let refCounter: number = baggage.has('refCounter') - ? (baggage.get('refCounter') as number) - : 0; + let refCounter: number = baggageGet('refCounter', 0); // Restore kernel facet from baggage if available (for resuscitation) - let kernelFacet: KernelFacet | undefined = baggage.has('kernelFacet') - ? (baggage.get('kernelFacet') as KernelFacet) - : undefined; + let kernelFacet: KernelFacet | undefined = baggageGet< + KernelFacet | undefined + >('kernelFacet', undefined); + + // Track which kref is the root's own, so isSelf-ref dispatch avoids kernel round-trip + let selfKref: string | undefined = baggageGet( + 'selfKref', + undefined, + ); // Ref manager state in baggage: ref → kref and kref → ref maps // Stored as plain objects since baggage serializes them - const refs: Record = baggage.has('refs') - ? (baggage.get('refs') as Record) - : {}; - const krefToRef: Record = baggage.has('krefToRef') - ? (baggage.get('krefToRef') as Record) - : {}; + const refs: Record = baggageGet( + 'refs', + {} as Record, + ); + const krefToRef: Record = baggageGet( + 'krefToRef', + {} as Record, + ); /** * Persist the current ref state to baggage. */ function persistRefs(): void { - if (baggage.has('refs')) { - baggage.set('refs', harden({ ...refs })); - } else { - baggage.init('refs', harden({ ...refs })); - } - if (baggage.has('krefToRef')) { - baggage.set('krefToRef', harden({ ...krefToRef })); - } else { - baggage.init('krefToRef', harden({ ...krefToRef })); - } + baggagePut('refs', harden({ ...refs })); + baggagePut('krefToRef', harden({ ...krefToRef })); } /** * Issue a ref for a kref. If the kref already has a ref, return it. * * @param kref - The kernel reference. + * @param isSelf - If true, marks this kref as the root's own for direct dispatch. * @returns The issued ref. */ - function issueRef(kref: string): string { + function issueRef(kref: string, isSelf?: boolean): string { + if (isSelf) { + selfKref = kref; + baggagePut('selfKref', selfKref); + } const existing = krefToRef[kref]; if (existing) { return existing; } refCounter += 1; - if (baggage.has('refCounter')) { - baggage.set('refCounter', refCounter); - } else { - baggage.init('refCounter', refCounter); - } + baggagePut('refCounter', refCounter); const ref = `d-${refCounter}`; refs[ref] = kref; krefToRef[kref] = ref; @@ -175,65 +199,49 @@ export function buildRootObject( } /** - * Dispatch a request that has no ref (operates on the system console itself). + * Dispatch a method call on the root exo directly, bypassing the kernel. * * @param method - The method name. * @param args - The method arguments. + * @returns The result of the method call. + */ + function dispatchOnSelf(method: string, args: unknown[]): unknown { + // eslint-disable-next-line @typescript-eslint/no-use-before-define + const fn = root[method as keyof typeof root] as + | ((...a: unknown[]) => unknown) + | undefined; + if (typeof fn !== 'function') { + throw new Error(`Unknown method on root: ${method}`); + } + // eslint-disable-next-line @typescript-eslint/no-use-before-define + return fn.call(root, ...args); + } + + /** + * Dispatch a request that has no ref (daemon-tier commands only). + * + * Only basic liveness commands are available without a capability ref. + * Privileged operations require a ref obtained from the `.ocap` file. + * + * @param method - The method name. + * @param _args - The method arguments (unused for daemon-tier commands). * @returns The response payload. */ async function dispatchConsoleMethod( method: string, - args: unknown[], + _args: unknown[], ): Promise { switch (method) { case 'help': return { commands: [ 'help - show available commands', - 'status - kernel status', - 'launch - launch a subcluster', - 'terminate - terminate a subcluster', - 'subclusters - list subclusters', - 'revoke - revoke a ref', - 'listRefs - list all issued refs', + 'status - daemon status', ], }; case 'status': - return E(requireKernelFacet()).getStatus(); - - case 'subclusters': - return E(requireKernelFacet()).getSubclusters(); - - case 'launch': { - const config = args[0] as ClusterConfig; - if (!config) { - throw new Error('launch requires a config argument'); - } - const result = await E(requireKernelFacet()).launchSubcluster(config); - const ref = issueRef(result.rootKref); - return { ref, subclusterId: result.subclusterId }; - } - - case 'terminate': { - const subclusterId = args[0] as string; - if (!subclusterId) { - throw new Error('terminate requires a subclusterId argument'); - } - await E(requireKernelFacet()).terminateSubcluster(subclusterId); - return { ok: true }; - } - - case 'revoke': { - const ref = args[0] as string; - if (!ref) { - throw new Error('revoke requires a ref argument'); - } - return { ok: revokeRef(ref) }; - } - - case 'listRefs': - return { refs: listRefs() }; + return { running: true }; default: throw new Error(`Unknown command: ${method}`); @@ -253,12 +261,46 @@ export function buildRootObject( return dispatchConsoleMethod(method, args); } - // Ref-based dispatch: resolve ref → kref, then queue message + // Ref-based dispatch: resolve ref → kref const kref = lookupKref(ref); if (!kref) { throw new Error(`Unknown ref: ${ref}`); } - return E(requireKernelFacet()).queueMessage(kref, method, args); + + // Self-ref: dispatch directly to avoid kernel round-trip + if (kref === selfKref) { + return dispatchOnSelf(method, args); + } + + // External ref: dispatch through the kernel's message queue + return E(requireKernelFacet()).invokeMethod(kref, method, args); + } + + /** + * Validate and coerce a parsed JSON value into a {@link Request}. + * + * @param parsed - The raw parsed JSON value. + * @returns The validated request. + */ + function validateRequest(parsed: unknown): Request { + if (typeof parsed !== 'object' || parsed === null) { + throw new Error('Request must be a JSON object'); + } + const obj = parsed as Record; + if (typeof obj.method !== 'string') { + throw new Error('Request must have a string "method" field'); + } + if (obj.ref !== undefined && typeof obj.ref !== 'string') { + throw new Error('"ref" must be a string'); + } + if (obj.args !== undefined && !Array.isArray(obj.args)) { + throw new Error('"args" must be an array'); + } + return { + method: obj.method, + ...(typeof obj.ref === 'string' ? { ref: obj.ref } : {}), + ...(Array.isArray(obj.args) ? { args: obj.args as unknown[] } : {}), + }; } /** @@ -276,7 +318,7 @@ export function buildRootObject( let response: unknown; try { - const request = JSON.parse(line) as Request; + const request = validateRequest(JSON.parse(line)); const result = await handleRequest(request); response = { ok: true, result }; } catch (error) { @@ -305,7 +347,7 @@ export function buildRootObject( } } - return makeDefaultExo('root', { + const root = makeDefaultExo('root', { /** * Bootstrap the vat. * @@ -318,7 +360,7 @@ export function buildRootObject( ): Promise { if (!kernelFacet && services.kernelFacet) { kernelFacet = services.kernelFacet; - baggage.init('kernelFacet', kernelFacet); + baggagePut('kernelFacet', kernelFacet); } if (services.console) { @@ -329,7 +371,18 @@ export function buildRootObject( }, /** - * Get help information. + * Issue a ref for a kref. Exposed for the daemon to get the initial console ref. + * + * @param kref - The kernel reference. + * @param isSelf - If true, marks this kref as the root's own for direct dispatch. + * @returns The issued ref. + */ + issueRef(kref: string, isSelf?: boolean): string { + return issueRef(kref, isSelf); + }, + + /** + * Get help information (privileged — lists all available commands). * * @returns The help object. */ @@ -338,32 +391,118 @@ export function buildRootObject( commands: [ 'help - show available commands', 'status - kernel status', + 'subclusters - list subclusters', 'launch - launch a subcluster', 'terminate - terminate a subcluster', - 'subclusters - list subclusters', + 'ls - list all issued refs', 'revoke - revoke a ref', - 'listRefs - list all issued refs', + 'invoke [...args] - call a method on a ref', ], }); }, /** - * Issue a ref for a kref. Exposed for the daemon to get the initial console ref. + * Get kernel status. * - * @param kref - The kernel reference. - * @returns The issued ref. + * @returns The kernel status. */ - issueRef(kref: string): string { - return issueRef(kref); + async status(): Promise { + return E(requireKernelFacet()).getStatus(); + }, + + /** + * List subclusters. + * + * @returns The subclusters. + */ + async subclusters(): Promise { + return E(requireKernelFacet()).getSubclusters(); + }, + + /** + * Launch a subcluster and issue a ref for its root object. + * + * @param config - The cluster config. + * @returns The issued ref and subcluster ID. + */ + async launch( + config: ClusterConfig, + ): Promise<{ ref: string; subclusterId: string }> { + if (!config) { + throw new Error('launch requires a config argument'); + } + const result = await E(requireKernelFacet()).launchSubcluster(config); + const ref = issueRef(result.rootKref); + return harden({ ref, subclusterId: result.subclusterId }); + }, + + /** + * Terminate a subcluster. + * + * @param subclusterId - The subcluster ID. + * @returns Confirmation. + */ + async terminate(subclusterId: string): Promise<{ ok: true }> { + if (!subclusterId) { + throw new Error('terminate requires a subclusterId argument'); + } + await E(requireKernelFacet()).terminateSubcluster(subclusterId); + return harden({ ok: true as const }); + }, + + /** + * Revoke a ref. + * + * @param ref - The ref to revoke. + * @returns Whether the ref was found and revoked. + */ + revoke(ref: string): { ok: boolean } { + if (!ref) { + throw new Error('revoke requires a ref argument'); + } + return harden({ ok: revokeRef(ref) }); }, /** * List all issued refs. * - * @returns Array of ref/kref pairs. + * @returns Array of ref strings. + */ + ls(): { refs: string[] } { + return harden({ refs: listRefs().map((entry) => entry.ref) }); + }, + + /** + * Invoke a method on a target ref, forwarding pure-data arguments + * through the kernel. + * + * @param targetRef - The ref to invoke the method on. + * @param method - The method name. + * @param args - The method arguments. + * @returns The result of the method call. */ - listRefs(): { ref: string; kref: string }[] { - return listRefs(); + async invoke( + targetRef: string, + method: string, + ...args: unknown[] + ): Promise { + if (!targetRef) { + throw new Error('invoke requires a target ref'); + } + if (!method) { + throw new Error('invoke requires a method name'); + } + const kref = lookupKref(targetRef); + if (!kref) { + throw new Error(`Unknown ref: ${targetRef}`); + } + // Self-ref: dispatch directly to avoid kernel round-trip + if (kref === selfKref) { + return dispatchOnSelf(method, args); + } + return E(requireKernelFacet()).invokeMethod(kref, method, args); }, }); + + return root; }