diff --git a/packages/codev/src/agent-farm/__tests__/tower-command.test.ts b/packages/codev/src/agent-farm/__tests__/tower-command.test.ts new file mode 100644 index 000000000..be9ed7285 --- /dev/null +++ b/packages/codev/src/agent-farm/__tests__/tower-command.test.ts @@ -0,0 +1,76 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +const mockKillScopedShellpers = vi.fn(); + +vi.mock('../../terminal/session-manager.js', () => ({ + SessionManager: vi.fn(function SessionManagerMock() { + return { killScopedShellpers: mockKillScopedShellpers }; + }), +})); + +vi.mock('../utils/config.js', () => ({ + getConfig: vi.fn(() => ({ serversDir: '/tmp/codev-test-servers' })), +})); + +vi.mock('../utils/logger.js', () => ({ + logger: { + header: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + success: vi.fn(), + kv: vi.fn(), + blank: vi.fn(), + }, + fatal: vi.fn((message: string) => { + throw new Error(message); + }), +})); + +vi.mock('../lib/tower-client.js', () => ({ + DEFAULT_TOWER_PORT: 4100, + AGENT_FARM_DIR: '/tmp/codev-test-agent-farm', +})); + +vi.mock('node:child_process', async () => { + const actual = await vi.importActual('node:child_process'); + return { + ...actual, + spawn: vi.fn(), + execSync: vi.fn(() => { + throw new Error('no process on port'); + }), + }; +}); + +describe('tower command lifecycle options', () => { + beforeEach(() => { + mockKillScopedShellpers.mockReset(); + }); + + it('waits for tower start readiness by default', async () => { + const { shouldWaitForTowerStart } = await import('../commands/tower.js'); + + expect(shouldWaitForTowerStart()).toBe(true); + expect(shouldWaitForTowerStart({ wait: undefined })).toBe(true); + expect(shouldWaitForTowerStart({ wait: false })).toBe(false); + }); + + it('cleans scoped shellpers on explicit stop by default when tower is already stopped', async () => { + mockKillScopedShellpers.mockResolvedValue(2); + const { towerStop } = await import('../commands/tower.js'); + + await towerStop({ port: 49_123 }); + + expect(mockKillScopedShellpers).toHaveBeenCalledTimes(1); + }); + + it('preserves scoped shellpers when requested', async () => { + mockKillScopedShellpers.mockResolvedValue(2); + const { towerStop } = await import('../commands/tower.js'); + + await towerStop({ port: 49_123, preserveShellpers: true }); + + expect(mockKillScopedShellpers).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/codev/src/agent-farm/cli.ts b/packages/codev/src/agent-farm/cli.ts index 220c4bbcc..bede08847 100644 --- a/packages/codev/src/agent-farm/cli.ts +++ b/packages/codev/src/agent-farm/cli.ts @@ -637,9 +637,9 @@ export async function runAgentFarm(args: string[]): Promise { towerCmd .command('start') - .description('Start the tower dashboard (daemonizes by default)') + .description('Start the tower dashboard and wait for readiness by default') .option('-p, --port ', 'Port to run on (default: 4100)') - .option('--wait', 'Wait for server to start before returning') + .option('--no-wait', 'Daemonize without waiting for readiness') .action(async (options) => { try { await towerStart({ @@ -656,12 +656,14 @@ export async function runAgentFarm(args: string[]): Promise { .command('stop') .description('Stop the tower dashboard') .option('-p, --port ', 'Port to stop (default: 4100)') + .option('--preserve-shellpers', 'Stop Tower but leave scoped shellper processes running') .option('--force-kill-all-child-processes', 'SIGKILL tower and every child process (builders, shells, everything)') .action(async (options) => { try { await towerStop({ port: options.port ? parseInt(options.port, 10) : undefined, forceKillAllChildProcesses: options.forceKillAllChildProcesses, + preserveShellpers: options.preserveShellpers, }); } catch (error) { logger.error(error instanceof Error ? error.message : String(error)); diff --git a/packages/codev/src/agent-farm/commands/tower.ts b/packages/codev/src/agent-farm/commands/tower.ts index 0ae5e46d8..29c3db3bd 100644 --- a/packages/codev/src/agent-farm/commands/tower.ts +++ b/packages/codev/src/agent-farm/commands/tower.ts @@ -2,6 +2,7 @@ * Tower command - launches the tower dashboard showing all instances */ +import { homedir } from 'node:os'; import { resolve } from 'node:path'; import { existsSync, mkdirSync, appendFileSync } from 'node:fs'; import http from 'node:http'; @@ -11,6 +12,7 @@ import { getConfig } from '../utils/config.js'; import { execSync } from 'node:child_process'; import { DEFAULT_TOWER_PORT, AGENT_FARM_DIR } from '../lib/tower-client.js'; import { isPortAvailable } from '../utils/shell.js'; +import { SessionManager } from '../../terminal/session-manager.js'; // Log file location const LOG_FILE = resolve(AGENT_FARM_DIR, 'tower.log'); @@ -21,12 +23,17 @@ const STARTUP_CHECK_INTERVAL_MS = 200; export interface TowerStartOptions { port?: number; - wait?: boolean; // Wait for server to start before returning + wait?: boolean; // Defaults to true. Set false for fire-and-forget startup. } export interface TowerStopOptions { port?: number; forceKillAllChildProcesses?: boolean; + preserveShellpers?: boolean; +} + +export function shouldWaitForTowerStart(options: TowerStartOptions = {}): boolean { + return options.wait ?? true; } /** @@ -112,6 +119,7 @@ function getProcessesOnPort(port: number): number[] { */ export async function towerStart(options: TowerStartOptions = {}): Promise { const port = options.port || DEFAULT_TOWER_PORT; + const wait = shouldWaitForTowerStart(options); // Check if already running and responding if (await isServerResponding(port)) { @@ -185,7 +193,7 @@ export async function towerStart(options: TowerStartOptions = {}): Promise const dashboardUrl = `http://localhost:${port}`; - if (options.wait) { + if (wait) { // Wait for server to actually start responding logger.info('Waiting for server to start...'); const started = await waitForServer(port); @@ -210,12 +218,28 @@ export async function towerStart(options: TowerStartOptions = {}): Promise } } +function getShellperSocketDir(): string { + return process.env.SHELLPER_SOCKET_DIR || resolve(homedir(), '.codev', 'run'); +} + +async function cleanupScopedShellpers(): Promise { + const config = getConfig(); + const manager = new SessionManager({ + socketDir: getShellperSocketDir(), + shellperScript: resolve(config.serversDir, '../terminal/shellper-main.js'), + nodeExecutable: process.execPath, + logger: (message) => logToFile(message), + }); + return manager.killScopedShellpers(); +} + /** * Stop the tower dashboard */ export async function towerStop(options: TowerStopOptions = {}): Promise { const port = options.port || DEFAULT_TOWER_PORT; const forceKill = options.forceKillAllChildProcesses || false; + const preserveShellpers = options.preserveShellpers || false; logger.header(forceKill ? 'Force-Killing Tower and All Child Processes' : 'Stopping Tower'); @@ -223,6 +247,12 @@ export async function towerStop(options: TowerStopOptions = {}): Promise { if (pids.length === 0) { logger.info('Tower is not running'); + if (!preserveShellpers) { + const killed = await cleanupScopedShellpers(); + if (killed > 0) { + logger.success(`Cleaned up ${killed} scoped shellper process${killed > 1 ? 'es' : ''}`); + } + } return; } @@ -289,6 +319,18 @@ export async function towerStop(options: TowerStopOptions = {}): Promise { if (stopped > 0) { logger.success(`Tower stopped (${stopped} process${stopped > 1 ? 'es' : ''}: PIDs ${pids.join(', ')})`); } + + if (preserveShellpers) { + logger.info('Preserving shellper processes'); + return; + } + + const killed = await cleanupScopedShellpers(); + if (killed > 0) { + logger.success(`Cleaned up ${killed} scoped shellper process${killed > 1 ? 'es' : ''}`); + } else { + logger.info('No scoped shellper processes found'); + } } export interface TowerLogOptions { diff --git a/packages/codev/src/terminal/__tests__/session-manager.test.ts b/packages/codev/src/terminal/__tests__/session-manager.test.ts index 3e55de338..65a5eb66c 100644 --- a/packages/codev/src/terminal/__tests__/session-manager.test.ts +++ b/packages/codev/src/terminal/__tests__/session-manager.test.ts @@ -559,6 +559,65 @@ describe('SessionManager', () => { } }); + it('kills scoped shellpers without probing responsive sockets', async () => { + const liveSocketPath = path.join(socketDir, 'shellper-live-cleanup.sock'); + + const logs: string[] = []; + const manager = new SessionManager({ + socketDir, + shellperScript: '/nonexistent/shellper.js', + nodeExecutable: process.execPath, + logger: (msg) => logs.push(msg), + }); + + vi.spyOn(manager as any, 'findShellperProcesses').mockResolvedValue([ + { pid: 7777, socketPath: liveSocketPath }, + ]); + vi.spyOn(manager as any, 'waitForProcessExit').mockResolvedValue(true); + const probeSpy = vi.spyOn(manager as any, 'probeSocket'); + + const killed: Array<{ pid: number; signal: string }> = []; + const originalKill = process.kill; + process.kill = ((pid: number, signal?: string | number) => { + killed.push({ pid, signal: String(signal || 'SIGTERM') }); + return true; + }) as typeof process.kill; + + try { + const count = await manager.killScopedShellpers(); + expect(count).toBe(1); + expect(probeSpy).not.toHaveBeenCalled(); + expect(killed).toContainEqual({ pid: -7777, signal: 'SIGTERM' }); + expect(logs.some(m => m.includes('Killing scoped shellper process: pid=7777'))).toBe(true); + } finally { + process.kill = originalKill; + } + }); + + it('killScopedShellpers does not kill shellpers from another socket dir', async () => { + const uniqueDir = `/tmp/codev-scoped-cleanup-test-${Date.now()}-${Math.random().toString(36)}`; + const manager = new SessionManager({ + socketDir: uniqueDir, + shellperScript: '/nonexistent/shellper.js', + nodeExecutable: process.execPath, + }); + + const killed: number[] = []; + const originalKill = process.kill; + process.kill = ((pid: number) => { + killed.push(pid); + return true; + }) as typeof process.kill; + + try { + const count = await manager.killScopedShellpers(); + expect(count).toBe(0); + expect(killed).toEqual([]); + } finally { + process.kill = originalKill; + } + }); + it('returns 0 when no orphans found', async () => { const manager = new SessionManager({ socketDir, @@ -795,6 +854,30 @@ describe('SessionManager', () => { try { process.kill(pid, 0); alive = true; } catch { /* ESRCH = dead, good */ } expect(alive).toBe(false); }, 20000); + + it('surfaces shellper stderr when startup info is missing', async () => { + const failScript = path.join(socketDir, 'fail-before-info.js'); + fs.writeFileSync(failScript, [ + `process.stderr.write('node-pty failed: posix_spawnp failed\\n');`, + `process.exit(1);`, + ].join('\n')); + + const manager = new SessionManager({ + socketDir, + shellperScript: failScript, + nodeExecutable: process.execPath, + }); + + await expect(manager.createSession({ + sessionId: 'stderr-startup-failure', + command: '/bin/echo', + args: [], + cwd: '/tmp', + env: { PATH: process.env.PATH || '/usr/bin:/bin', SECRET_VALUE: 'do-not-log' }, + cols: 80, + rows: 24, + })).rejects.toThrow(/(Shellper exited with code 1 before writing info|Invalid shellper info JSON)[\s\S]*posix_spawnp failed/); + }, 15000); }); describe('killSession', () => { diff --git a/packages/codev/src/terminal/session-manager.ts b/packages/codev/src/terminal/session-manager.ts index 81724d7dd..e5625195a 100644 --- a/packages/codev/src/terminal/session-manager.ts +++ b/packages/codev/src/terminal/session-manager.ts @@ -182,7 +182,7 @@ export class SessionManager extends EventEmitter { // Read PID + startTime from stdout let info: { pid: number; startTime: number }; try { - info = await this.readShellperInfo(child); + info = await this.readShellperInfo(child, stderrLogPath); } catch (err) { this.log(`Session ${opts.sessionId} creation failed: ${(err as Error).message}`); // Kill orphaned child process using handle (not PID — may not be available yet) @@ -601,6 +601,40 @@ export class SessionManager extends EventEmitter { return killed; } + /** + * Kill all shellper-main.js processes scoped to this manager's socketDir. + * + * This is intentionally stronger than killOrphanedShellpers(): it is for an + * explicit operator stop/cleanup path, so responsive shellpers are reclaimed + * too. Startup reconciliation should keep using killOrphanedShellpers(). + */ + async killScopedShellpers(): Promise { + let killed = 0; + try { + const entries = await this.findShellperProcesses(); + for (const { pid, socketPath } of entries) { + if (pid === process.pid) continue; + + this.log(`Killing scoped shellper process: pid=${pid}${socketPath ? `, socket=${socketPath}` : ''}`); + const terminated = await this.terminateShellperProcess(pid); + if (terminated) { + killed++; + } + + if (terminated && socketPath) { + this.unlinkSocketIfExists(socketPath); + } + } + } catch { + return 0; + } + + if (killed > 0) { + this.log(`Killed ${killed} scoped shellper process(es)`); + } + return killed; + } + /** * Find shellper-main.js processes belonging to THIS Tower instance. * @@ -695,11 +729,19 @@ export class SessionManager extends EventEmitter { private readShellperInfo( child: ReturnType, + stderrLogPath: string, ): Promise<{ pid: number; startTime: number }> { return new Promise((resolve, reject) => { let data = ''; + let settled = false; + const fail = (message: string) => { + if (settled) return; + settled = true; + clearTimeout(timeout); + reject(new Error(this.withShellperDiagnostics(message, data, stderrLogPath))); + }; const timeout = setTimeout(() => { - reject(new Error('Timed out reading shellper info from stdout')); + fail('Timed out reading shellper info from stdout'); }, 10_000); child.stdout!.on('data', (chunk: Buffer) => { @@ -707,29 +749,60 @@ export class SessionManager extends EventEmitter { }); child.stdout!.on('end', () => { - clearTimeout(timeout); + if (settled) return; try { const info = JSON.parse(data) as { pid: number; startTime: number }; + settled = true; + clearTimeout(timeout); resolve(info); } catch { - reject(new Error(`Invalid shellper info JSON: ${data}`)); + fail('Invalid shellper info JSON'); } }); child.on('error', (err) => { - clearTimeout(timeout); - reject(err); + fail(err.message); }); child.on('exit', (code) => { if (code !== null && code !== 0 && data === '') { - clearTimeout(timeout); - reject(new Error(`Shellper exited with code ${code} before writing info`)); + fail(`Shellper exited with code ${code} before writing info`); } }); }); } + private withShellperDiagnostics(message: string, stdout: string, stderrLogPath: string): string { + const details: string[] = [message]; + const stdoutSnippet = this.safeDiagnosticSnippet(stdout); + if (stdoutSnippet) { + details.push(`stdout: ${stdoutSnippet}`); + } + + const stderrTail = this.readTextTail(stderrLogPath, 4000); + if (stderrTail) { + details.push(`stderr log (${stderrLogPath}): ${stderrTail}`); + } + + return details.join('\n'); + } + + private safeDiagnosticSnippet(value: string): string { + const trimmed = value.trim(); + if (!trimmed) return ''; + return trimmed.replace(/"env"\s*:\s*\{[^}]*\}/g, '"env":"[redacted]"').slice(0, 1000); + } + + private readTextTail(filePath: string, maxChars: number): string { + try { + const content = fs.readFileSync(filePath, 'utf-8').trim(); + if (!content) return ''; + return content.length > maxChars ? content.slice(-maxChars) : content; + } catch { + return ''; + } + } + private waitForSocket(socketPath: string, timeout = 5000): Promise { return new Promise((resolve, reject) => { const start = Date.now(); @@ -776,6 +849,31 @@ export class SessionManager extends EventEmitter { }); } + private async terminateShellperProcess(pid: number): Promise { + let signaled = false; + try { + process.kill(-pid, 'SIGTERM'); + signaled = true; + } catch { + try { + process.kill(pid, 'SIGTERM'); + signaled = true; + } catch { + return false; + } + } + + const died = await this.waitForProcessExit(pid, 5000); + if (!died) { + try { + process.kill(-pid, 'SIGKILL'); + } catch { + try { process.kill(pid, 'SIGKILL'); } catch { /* already dead */ } + } + } + return signaled; + } + private setupAutoRestart(session: ManagedSession, sessionId: string): void { session.client.on('exit', () => { // Check if session was removed (killed intentionally)