diff --git a/.changeset/stdio-max-message-bytes.md b/.changeset/stdio-max-message-bytes.md new file mode 100644 index 0000000000..073547251b --- /dev/null +++ b/.changeset/stdio-max-message-bytes.md @@ -0,0 +1,10 @@ +--- +'@modelcontextprotocol/client': minor +'@modelcontextprotocol/server': minor +--- + +Add `maxMessageBytes` option to the stdio transports and make the stdio read buffer amortized O(1) per byte + +A stdio peer that writes a very large amount of data without a newline (accidental binary output, runaway log line, or a malicious server) previously grew the receiving process's memory without bound, and each incoming chunk re-copied the entire buffered backlog (`Buffer.concat` per chunk). There was no public way to bound or replace the read buffer, so integrators who had built flood protection on v1 transport internals had nothing to migrate to. + +`StdioClientTransport` and `StdioServerTransport` now accept an optional `maxMessageBytes`. When a single message exceeds it, the data is dropped, an `SdkError` with the new code `SdkErrorCode.MessageTooLarge` is reported via `onerror`, and the transport recovers at the next newline boundary. The default remains unlimited. The read buffer also now grows geometrically with read/scan offsets instead of concatenating on every chunk. diff --git a/docs/migration-SKILL.md b/docs/migration-SKILL.md index b849da8b3d..ab46b0fcfd 100644 --- a/docs/migration-SKILL.md +++ b/docs/migration-SKILL.md @@ -216,7 +216,7 @@ if (error instanceof OAuthError && error.code === OAuthErrorCode.InvalidClient) ``` **Unchanged APIs** (only import paths changed): `Client` constructor and most methods, `McpServer` constructor, `server.connect()`, `server.close()`, all client transports (`StreamableHTTPClientTransport`, `SSEClientTransport`, `StdioClientTransport`), `StdioServerTransport`, all -Zod schemas, all callback return types. Note: `callTool()` and `request()` signatures changed (schema parameter removed, see section 11). 
+Zod schemas, all callback return types. Note: `callTool()` and `request()` signatures changed (schema parameter removed, see section 11). The stdio transports additionally accept a new optional `maxMessageBytes` option in v2 (see the stdio transport notes). ## 6. McpServer API Changes @@ -507,6 +507,8 @@ NOT removed (wire surface, kept for 2025-11-25 interop): task Zod schemas + infe `Client.listPrompts()`, `listResources()`, `listResourceTemplates()`, `listTools()` now return empty results when the server lacks the corresponding capability (instead of sending the request). Set `enforceStrictCapabilities: true` in `ClientOptions` to throw an error instead. +Stdio transports: non-JSON lines on the stream are now skipped silently (v1 surfaced a `SyntaxError` via `onerror`); valid-JSON messages failing schema validation still reach `onerror`. Both stdio transports accept an optional `maxMessageBytes` — oversized messages are dropped and reported via `onerror` as `SdkError` with code `SdkErrorCode.MessageTooLarge` (default: unlimited). Use it to replace any v1-era flood protection built on transport internals. + ## 14. Runtime-Specific JSON Schema Validators (Enhancement) The SDK now auto-selects the appropriate JSON Schema validator based on runtime: diff --git a/docs/migration.md b/docs/migration.md index bf88cf2b76..8e5a1284e1 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -141,6 +141,27 @@ import { StreamableHTTPClientTransport } from '@modelcontextprotocol/client'; const transport = new StreamableHTTPClientTransport(new URL('http://localhost:3000/mcp')); ``` +### Stdio transports: non-JSON lines are now skipped silently + +In v1, a non-JSON line on the stdio stream (for example, debug output from a hot-reload +tool writing to stdout) surfaced as a `SyntaxError` through the transport's `onerror` +callback. 
In v2, the stdio read buffer silently skips lines that are not valid JSON and +continues with the next line; only valid-JSON messages that fail schema validation still +reach `onerror`. If you relied on `onerror` to detect a misbehaving server that writes +noise to stdout, that signal no longer fires for non-JSON lines. + +Relatedly, both stdio transports now accept an optional `maxMessageBytes` setting that +bounds how large a single message may grow before it is dropped and reported via +`onerror` (`SdkError` with code `SdkErrorCode.MessageTooLarge`). v1 had no built-in +protection against a peer flooding the stream with unbounded data on a single line; if +you implemented such protection against v1 transport internals, migrate to this option. + +```typescript +import { StdioClientTransport } from '@modelcontextprotocol/client/stdio'; + +const transport = new StdioClientTransport({ command: 'my-server' }, { maxMessageBytes: 4 * 1024 * 1024 }); +``` + ### Server auth split Resource Server helpers (`requireBearerAuth`, `mcpAuthMetadataRouter`, `getOAuthProtectedResourceMetadataUrl`, `OAuthTokenVerifier`) are first-class in `@modelcontextprotocol/express`. @@ -984,8 +1005,8 @@ The following APIs are unchanged between v1 and v2 (only the import paths change - `Client` constructor and most client methods (`connect`, `listTools`, `listPrompts`, `listResources`, `readResource`, etc.) 
— note: `callTool()` signature changed (schema parameter removed) - `McpServer` constructor, `server.connect(transport)`, `server.close()` - `Server` (low-level) constructor and all methods -- `StreamableHTTPClientTransport`, `SSEClientTransport`, `StdioClientTransport` constructors and options -- `StdioServerTransport` constructor and options +- `StreamableHTTPClientTransport` and `SSEClientTransport` constructors and options +- `StdioClientTransport` and `StdioServerTransport` constructors — note: both accept a new optional `maxMessageBytes` option in v2 (see the stdio transport notes above) - All Zod schemas and type definitions from `types.ts` (except the aliases listed above) - Tool, prompt, and resource callback return types diff --git a/packages/client/src/client/stdio.ts b/packages/client/src/client/stdio.ts index 5dcb8ef9a6..e8b98d74c0 100644 --- a/packages/client/src/client/stdio.ts +++ b/packages/client/src/client/stdio.ts @@ -7,6 +7,22 @@ import type { JSONRPCMessage, Transport } from '@modelcontextprotocol/core'; import { ReadBuffer, SdkError, SdkErrorCode, serializeMessage } from '@modelcontextprotocol/core'; import spawn from 'cross-spawn'; +export type StdioClientTransportOptions = { + /** + * Maximum size, in bytes, that a single inbound message may occupy. + * + * Protects against a misbehaving server flooding the client with an unbounded + * amount of data on a single line (e.g. accidental binary or log output on + * stdout), which would otherwise grow client memory without limit. When a + * message exceeds this size it is dropped, an {@linkcode SdkError} with code + * `SdkErrorCode.MessageTooLarge` is reported via `onerror`, and the transport + * recovers at the next newline boundary. + * + * Defaults to undefined (no limit), matching previous behavior. + */ + maxMessageBytes?: number; +}; + export type StdioServerParameters = { /** * The executable to run to start the server. 
@@ -92,7 +108,7 @@ export function getDefaultEnvironment(): Record<string, string> { */ export class StdioClientTransport implements Transport { private _process?: ChildProcess; - private _readBuffer: ReadBuffer = new ReadBuffer(); + private _readBuffer: ReadBuffer; private _serverParams: StdioServerParameters; private _stderrStream: PassThrough | null = null; @@ -100,8 +116,9 @@ export class StdioClientTransport implements Transport { onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; - constructor(server: StdioServerParameters) { + constructor(server: StdioServerParameters, options?: StdioClientTransportOptions) { this._serverParams = server; + this._readBuffer = new ReadBuffer({ maxMessageBytes: options?.maxMessageBytes }); if (server.stderr === 'pipe' || server.stderr === 'overlapped') { this._stderrStream = new PassThrough(); } diff --git a/packages/client/src/stdio.ts b/packages/client/src/stdio.ts index a6ecd1697e..12cf120646 100644 --- a/packages/client/src/stdio.ts +++ b/packages/client/src/stdio.ts @@ -4,5 +4,5 @@ // Cloudflare Workers targets does not pull in `node:child_process`, `node:stream`, or `cross-spawn`. Import // from `@modelcontextprotocol/client/stdio` only in process-spawning runtimes (Node.js, Bun, Deno). 
-export type { StdioServerParameters } from './client/stdio.js'; +export type { StdioClientTransportOptions, StdioServerParameters } from './client/stdio.js'; export { DEFAULT_INHERITED_ENV_VARS, getDefaultEnvironment, StdioClientTransport } from './client/stdio.js'; diff --git a/packages/client/test/client/stdio.test.ts b/packages/client/test/client/stdio.test.ts index 28a7834bcb..21e603b152 100644 --- a/packages/client/test/client/stdio.test.ts +++ b/packages/client/test/client/stdio.test.ts @@ -1,4 +1,5 @@ import type { JSONRPCMessage } from '@modelcontextprotocol/core'; +import { SdkError, SdkErrorCode } from '@modelcontextprotocol/core'; import type { StdioServerParameters } from '../../src/client/stdio.js'; import { StdioClientTransport } from '../../src/client/stdio.js'; @@ -77,3 +78,84 @@ test('should return child process pid', async () => { await client.close(); expect(client.pid).toBeNull(); }); + +test('should surface MessageTooLarge via onerror and keep running when maxMessageBytes is exceeded', async () => { + // `tee`/`more` echo stdin back on stdout, so an oversized outbound message + // becomes an oversized inbound message. 
+ const client = new StdioClientTransport(serverParameters, { maxMessageBytes: 1024 }); + + const errors: Error[] = []; + const oversizedReported = new Promise<void>(resolve => { + client.onerror = error => { + errors.push(error); + resolve(); + }; + }); + + const messages: JSONRPCMessage[] = []; + const smallMessageEchoed = new Promise<void>(resolve => { + client.onmessage = message => { + messages.push(message); + resolve(); + }; + }); + + await client.start(); + + const oversized: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'oversized', + params: { payload: 'x'.repeat(10_000) } + }; + await client.send(oversized); + await oversizedReported; + + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + + // The transport recovers: a small message still round-trips. 
+ const childScript = [ + "process.stdout.write('x'.repeat(1_000_000));", + "process.stdout.write('\\n');", + "process.stdout.write(JSON.stringify({ jsonrpc: '2.0', method: 'after-flood' }) + '\\n');", + 'setInterval(() => {}, 1 << 30);' + ].join(' '); + + const client = new StdioClientTransport({ command: process.execPath, args: ['-e', childScript] }, { maxMessageBytes: 65_536 }); + + const errors: Error[] = []; + client.onerror = error => { + errors.push(error); + }; + + const messages: JSONRPCMessage[] = []; + const messageAfterFlood = new Promise<void>(resolve => { + client.onmessage = message => { + messages.push(message); + resolve(); + }; + }); + + await client.start(); + await messageAfterFlood; + + expect(messages).toEqual([{ jsonrpc: '2.0', method: 'after-flood' }]); + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + + await client.close(); +}); diff --git a/packages/core/src/errors/sdkErrors.ts b/packages/core/src/errors/sdkErrors.ts index af432c6389..0ce4eb34f2 100644 --- a/packages/core/src/errors/sdkErrors.ts +++ b/packages/core/src/errors/sdkErrors.ts @@ -26,6 +26,8 @@ export enum SdkErrorCode { ConnectionClosed = 'CONNECTION_CLOSED', /** Failed to send message */ SendFailed = 'SEND_FAILED', + /** A single inbound message exceeded the configured maximum size */ + MessageTooLarge = 'MESSAGE_TOO_LARGE', /** Response result failed local schema validation */ InvalidResult = 'INVALID_RESULT', diff --git a/packages/core/src/exports/public/index.ts b/packages/core/src/exports/public/index.ts index 729144f1a3..e29c0c786b 100644 --- a/packages/core/src/exports/public/index.ts +++ b/packages/core/src/exports/public/index.ts @@ -54,6 +54,7 @@ export { DEFAULT_REQUEST_TIMEOUT_MSEC } from '../../shared/protocol.js'; // stdio message framing utilities (for custom transport authors) export { deserializeMessage, ReadBuffer, serializeMessage } from 
'../../shared/stdio.js'; +export type { ReadBufferOptions } from '../../shared/stdio.js'; // Transport types (NOT normalizeHeaders) export type { FetchLike, Transport, TransportSendOptions } from '../../shared/transport.js'; diff --git a/packages/core/src/shared/stdio.ts b/packages/core/src/shared/stdio.ts index 7283a5ef96..a31ab69e2f 100644 --- a/packages/core/src/shared/stdio.ts +++ b/packages/core/src/shared/stdio.ts @@ -1,30 +1,115 @@ +import { SdkError, SdkErrorCode } from '../errors/sdkErrors.js'; import type { JSONRPCMessage } from '../types/index.js'; import { JSONRPCMessageSchema } from '../types/index.js'; +/** + * Options for {@linkcode ReadBuffer}. + */ +export interface ReadBufferOptions { + /** + * Maximum size, in bytes, that a single newline-delimited message may occupy. + * + * When set, a message larger than this limit — whether or not its terminating + * newline has arrived yet — is dropped and an {@linkcode SdkError} with code + * {@linkcode SdkErrorCode.MessageTooLarge} is thrown from + * {@linkcode ReadBuffer.readMessage}. The stdio transports surface that error + * through their `onerror` callback and keep running: buffered data belonging to + * the oversized message is discarded until the next newline boundary, after + * which subsequent messages are processed normally. + * + * When undefined (the default), no limit is enforced. + */ + maxMessageBytes?: number; +} + +const INITIAL_CAPACITY = 8192; + +/** + * Capacity above which the internal buffer is shrunk back to {@linkcode INITIAL_CAPACITY} + * once it is fully drained, so that one large message does not pin memory forever. + */ +const SHRINK_CAPACITY_THRESHOLD = 131_072; + +const NEWLINE = 0x0a; + /** * Buffers a continuous stdio stream into discrete JSON-RPC messages. 
+ * + * Internally maintains a single growable buffer with read/scan offsets so that + * appending a chunk and scanning for message boundaries are amortized O(1) per + * byte, independent of how many chunks an incomplete message spans. */ export class ReadBuffer { - private _buffer?: Buffer; + private _buffer: Buffer = Buffer.alloc(INITIAL_CAPACITY); + /** Offset of the first unconsumed byte. */ + private _start = 0; + /** Offset past the last valid byte. */ + private _end = 0; + /** Offset up to which the buffer has already been scanned for a newline. */ + private _scanned = 0; + /** When true, data is dropped until the next newline boundary (oversized-message recovery). */ + private _discarding = false; + private readonly _maxMessageBytes?: number; + + constructor(options?: ReadBufferOptions) { + this._maxMessageBytes = options?.maxMessageBytes; + } append(chunk: Buffer): void { - this._buffer = this._buffer ? Buffer.concat([this._buffer, chunk]) : chunk; + this._ensureCapacity(chunk.length); + chunk.copy(this._buffer, this._end); + this._end += chunk.length; } readMessage(): JSONRPCMessage | null { - while (this._buffer) { - const index = this._buffer.indexOf('\n'); - if (index === -1) { + while (true) { + const newlineIndex = this._findNewline(); + if (newlineIndex === -1) { + if (this._discarding) { + // Still inside an oversized message: drop everything buffered so far. + this._reset(); + return null; + } + + const pending = this._end - this._start; + if (this._maxMessageBytes !== undefined && pending > this._maxMessageBytes) { + this._discarding = true; + this._reset(); + throw new SdkError( + SdkErrorCode.MessageTooLarge, + `Message exceeds maxMessageBytes (${this._maxMessageBytes}): received ${pending} bytes without a message boundary. 
` + + `Discarding data until the next newline.` + ); + } + return null; } - const line = this._buffer.toString('utf8', 0, index).replace(/\r$/, ''); - this._buffer = this._buffer.subarray(index + 1); + const lineStart = this._start; + const lineLength = newlineIndex - lineStart; + + if (this._discarding) { + // The tail of an oversized message: drop it and resume normal processing. + this._consume(newlineIndex); + this._discarding = false; + continue; + } + + if (this._maxMessageBytes !== undefined && lineLength > this._maxMessageBytes) { + this._consume(newlineIndex); + throw new SdkError( + SdkErrorCode.MessageTooLarge, + `Message exceeds maxMessageBytes (${this._maxMessageBytes}): a ${lineLength}-byte message was received and dropped.` + ); + } + + const line = this._buffer.toString('utf8', lineStart, newlineIndex).replace(/\r$/, ''); + this._consume(newlineIndex); try { return deserializeMessage(line); } catch (error) { - // Skip non-JSON lines (e.g., debug output from hot-reload tools like + // Skip non-JSON lines (e.g. debug output from hot-reload tools like // tsx or nodemon that write to stdout). Schema validation errors still // throw so malformed-but-valid-JSON messages surface via onerror. if (error instanceof SyntaxError) { @@ -33,11 +118,71 @@ export class ReadBuffer { throw error; } } - return null; } clear(): void { - this._buffer = undefined; + this._discarding = false; + this._reset(); + } + + /** Returns the index of the next newline, scanning only bytes not seen before. */ + private _findNewline(): number { + if (this._scanned >= this._end) { + return -1; + } + + const index = this._buffer.subarray(this._scanned, this._end).indexOf(NEWLINE); + if (index === -1) { + this._scanned = this._end; + return -1; + } + + return this._scanned + index; + } + + /** Consumes all bytes up to and including the newline at `newlineIndex`. 
*/ + private _consume(newlineIndex: number): void { + this._start = newlineIndex + 1; + this._scanned = this._start; + if (this._start === this._end) { + this._reset(); + } + } + + /** Drops all buffered data, shrinking the buffer if a large message inflated it. */ + private _reset(): void { + this._start = 0; + this._end = 0; + this._scanned = 0; + if (this._buffer.length > SHRINK_CAPACITY_THRESHOLD) { + this._buffer = Buffer.alloc(INITIAL_CAPACITY); + } + } + + /** Makes room for `extra` more bytes, compacting in place or growing geometrically. */ + private _ensureCapacity(extra: number): void { + if (this._end + extra <= this._buffer.length) { + return; + } + + const used = this._end - this._start; + if (used + extra <= this._buffer.length) { + // Enough total room: reclaim the consumed prefix in place. + this._buffer.copyWithin(0, this._start, this._end); + } else { + let capacity = this._buffer.length * 2; + while (capacity < used + extra) { + capacity *= 2; + } + + const next = Buffer.alloc(capacity); + this._buffer.copy(next, 0, this._start, this._end); + this._buffer = next; + } + + this._end = used; + this._scanned -= this._start; + this._start = 0; } } diff --git a/packages/core/test/shared/stdio.test.ts b/packages/core/test/shared/stdio.test.ts index 65d1de0eaa..fbf126d8d2 100644 --- a/packages/core/test/shared/stdio.test.ts +++ b/packages/core/test/shared/stdio.test.ts @@ -1,3 +1,4 @@ +import { SdkError, SdkErrorCode } from '../../src/errors/sdkErrors.js'; import { ReadBuffer } from '../../src/shared/stdio.js'; import type { JSONRPCMessage } from '../../src/types/index.js'; @@ -113,3 +114,215 @@ describe('non-JSON line filtering', () => { expect(() => readBuffer.readMessage()).toThrow(); }); }); + +describe('chunked message assembly', () => { + test('should assemble a message split across many small chunks', () => { + const readBuffer = new ReadBuffer(); + const serialized = JSON.stringify(testMessage) + '\n'; + + for (const char of serialized) { + 
readBuffer.append(Buffer.from(char)); + } + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should yield multiple messages from a single chunk', () => { + const readBuffer = new ReadBuffer(); + const message1: JSONRPCMessage = { jsonrpc: '2.0', method: 'method1' }; + const message2: JSONRPCMessage = { jsonrpc: '2.0', method: 'method2' }; + const message3: JSONRPCMessage = { jsonrpc: '2.0', method: 'method3' }; + readBuffer.append(Buffer.from([message1, message2, message3].map(m => JSON.stringify(m) + '\n').join(''))); + + expect(readBuffer.readMessage()).toEqual(message1); + expect(readBuffer.readMessage()).toEqual(message2); + expect(readBuffer.readMessage()).toEqual(message3); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should handle a message boundary exactly at a chunk boundary', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should handle CRLF line endings', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\r\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + }); + + test('should handle messages larger than the initial buffer capacity', () => { + const readBuffer = new ReadBuffer(); + const bigMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'big', + params: { payload: 'x'.repeat(100_000) } + }; + + const serialized = Buffer.from(JSON.stringify(bigMessage) + '\n'); + // Append in 1 KiB slices to exercise growth across many appends. 
+ for (let offset = 0; offset < serialized.length; offset += 1024) { + readBuffer.append(serialized.subarray(offset, Math.min(offset + 1024, serialized.length))); + } + + expect(readBuffer.readMessage()).toEqual(bigMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); +}); + +describe('maxMessageBytes', () => { + const expectMessageTooLarge = (fn: () => unknown) => { + try { + fn(); + throw new Error('expected readMessage to throw'); + } catch (error) { + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + } + }; + + test('does not limit message size by default', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('x'.repeat(1_000_000))); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('accepts messages up to the limit', () => { + const message: JSONRPCMessage = { jsonrpc: '2.0', method: 'test' }; + const serialized = JSON.stringify(message); + const readBuffer = new ReadBuffer({ maxMessageBytes: serialized.length }); + readBuffer.append(Buffer.from(serialized + '\n')); + + expect(readBuffer.readMessage()).toEqual(message); + }); + + test('throws once for an incomplete oversized message, then recovers at the next newline', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + + // Oversized data with no newline: a single error when the limit is crossed... + readBuffer.append(Buffer.from('x'.repeat(100))); + expectMessageTooLarge(() => readBuffer.readMessage()); + + // ...then silence while the rest of the oversized message streams in. + readBuffer.append(Buffer.from('x'.repeat(100))); + expect(readBuffer.readMessage()).toBeNull(); + readBuffer.append(Buffer.from('x'.repeat(100))); + expect(readBuffer.readMessage()).toBeNull(); + + // The tail of the oversized message ends at the newline; the following + // message is processed normally. 
+ readBuffer.append(Buffer.from('xxx\n' + JSON.stringify(testMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('drops an oversized complete line and continues with the rest of the buffer', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + readBuffer.append(Buffer.from('y'.repeat(200) + '\n' + JSON.stringify(testMessage) + '\n')); + + expectMessageTooLarge(() => readBuffer.readMessage()); + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('counts incomplete bytes across many appends', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + for (let i = 0; i < 6; i++) { + readBuffer.append(Buffer.from('z'.repeat(10))); + expect(readBuffer.readMessage()).toBeNull(); + } + + readBuffer.append(Buffer.from('z'.repeat(10))); + expectMessageTooLarge(() => readBuffer.readMessage()); + }); + + test('clear() resets oversized-message recovery state', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + readBuffer.append(Buffer.from('x'.repeat(100))); + expectMessageTooLarge(() => readBuffer.readMessage()); + + readBuffer.clear(); + + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(testMessage); + }); +}); + +describe('capacity events with a consumed prefix', () => { + // These cases pin the in-place compaction and growth paths of the internal + // buffer when a previous message has already been consumed (start offset > 0) + // while a partial message is still buffered - the common interleaving on a + // busy stream. A copy/offset bug here silently corrupts message bytes. + // Each message uses a distinct payload character so that a copy bug that + // splices bytes from a neighboring message changes content detectably. 
+ const messageWithPayload = (method: string, payloadLength: number): JSONRPCMessage => ({ + jsonrpc: '2.0', + method, + params: { payload: method.charAt(0).repeat(payloadLength) } + }); + + test('compacts in place without corrupting a partially buffered message', () => { + const readBuffer = new ReadBuffer(); + const messageA = messageWithPayload('a', 5900); // ~6000 bytes serialized + const messageB = messageWithPayload('b', 1500); // ~1600 bytes serialized + const messageC = messageWithPayload('c', 1900); // ~2000 bytes serialized + + const serializedB = Buffer.from(JSON.stringify(messageB) + '\n'); + + // First chunk: all of A plus the first 1000 bytes of B (fits the initial + // 8 KiB capacity). Consuming A leaves a consumed prefix ahead of B's bytes. + readBuffer.append(Buffer.concat([Buffer.from(JSON.stringify(messageA) + '\n'), serializedB.subarray(0, 1000)])); + expect(readBuffer.readMessage()).toEqual(messageA); + expect(readBuffer.readMessage()).toBeNull(); + + // Second chunk: the rest of B plus C (~2600 bytes). Total valid bytes fit + // the existing capacity only after reclaiming the consumed prefix, so this + // append must compact in place rather than grow. 
+ readBuffer.append(Buffer.concat([serializedB.subarray(1000), Buffer.from(JSON.stringify(messageC) + '\n')])); + expect(readBuffer.readMessage()).toEqual(messageB); + expect(readBuffer.readMessage()).toEqual(messageC); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('grows without corrupting a partially buffered message', () => { + const readBuffer = new ReadBuffer(); + const messageA = messageWithPayload('a', 3900); // ~4000 bytes serialized + const messageB = messageWithPayload('b', 8800); // ~8900 bytes serialized + + const serializedB = Buffer.from(JSON.stringify(messageB) + '\n'); + + readBuffer.append(Buffer.concat([Buffer.from(JSON.stringify(messageA) + '\n'), serializedB.subarray(0, 1000)])); + expect(readBuffer.readMessage()).toEqual(messageA); + expect(readBuffer.readMessage()).toBeNull(); + + // The remaining ~7900 bytes of B exceed the initial capacity even after + // compaction, forcing the growth path while the start offset is non-zero. + readBuffer.append(serializedB.subarray(1000)); + expect(readBuffer.readMessage()).toEqual(messageB); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('releases an inflated buffer once fully drained', () => { + const readBuffer = new ReadBuffer(); + const bigMessage = messageWithPayload('big', 200_000); + readBuffer.append(Buffer.from(JSON.stringify(bigMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(bigMessage); + + // The documented memory bound: one large message must not pin a large + // allocation forever. This intentionally inspects the internal buffer - + // the property is not observable through the public API. + const internal = readBuffer as unknown as { _buffer: Buffer }; + expect(internal._buffer.length).toBeLessThanOrEqual(131_072); + + // And the buffer still works after shrinking. 
+ readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(testMessage); + }); +}); diff --git a/packages/server/src/server/stdio.ts b/packages/server/src/server/stdio.ts index ac2dd3f784..ba6203cc2e 100644 --- a/packages/server/src/server/stdio.ts +++ b/packages/server/src/server/stdio.ts @@ -16,15 +16,52 @@ import { process } from '@modelcontextprotocol/server/_shims'; * await server.connect(transport); * ``` */ +export type StdioServerTransportOptions = { + /** + * Maximum size, in bytes, that a single inbound message may occupy. + * + * Protects against a misbehaving client flooding the server with an unbounded + * amount of data on a single line, which would otherwise grow server memory + * without limit. When a message exceeds this size it is dropped, an + * {@linkcode SdkError} with code `SdkErrorCode.MessageTooLarge` is reported via + * `onerror`, and the transport recovers at the next newline boundary. + * + * Defaults to undefined (no limit), matching previous behavior. + */ + maxMessageBytes?: number; +}; + +/** + * Distinguishes a stream argument from an options object in the overloaded + * {@linkcode StdioServerTransport} constructor. Every Node.js readable stream + * implements `pipe()`; an options literal does not. + */ +function isReadableStream(value: Readable | StdioServerTransportOptions): value is Readable { + return typeof (value as Readable).pipe === 'function'; +} + export class StdioServerTransport implements Transport { - private _readBuffer: ReadBuffer = new ReadBuffer(); + private _stdin: Readable; + private _stdout: Writable; + private _readBuffer: ReadBuffer; private _started = false; private _closed = false; - constructor( - private _stdin: Readable = process.stdin, - private _stdout: Writable = process.stdout - ) {} + /** Communicate over the current process' `stdin` and `stdout`. 
*/ + constructor(options?: StdioServerTransportOptions); + /** Communicate over the given streams, defaulting to the current process' `stdin` and `stdout`. */ + constructor(stdin?: Readable, stdout?: Writable, options?: StdioServerTransportOptions); + constructor(stdinOrOptions?: Readable | StdioServerTransportOptions, stdout?: Writable, options?: StdioServerTransportOptions) { + let stdin: Readable | undefined; + if (stdinOrOptions !== undefined && isReadableStream(stdinOrOptions)) { + stdin = stdinOrOptions; + } else if (stdinOrOptions !== undefined) { + options = stdinOrOptions; + } + this._stdin = stdin ?? process.stdin; + this._stdout = stdout ?? process.stdout; + this._readBuffer = new ReadBuffer({ maxMessageBytes: options?.maxMessageBytes }); + } onclose?: () => void; onerror?: (error: Error) => void; diff --git a/packages/server/src/stdio.ts b/packages/server/src/stdio.ts index 7865c9cedc..a2c2e05570 100644 --- a/packages/server/src/stdio.ts +++ b/packages/server/src/stdio.ts @@ -5,4 +5,5 @@ // subpath gives consumers a consistent shape across packages. Import from // `@modelcontextprotocol/server/stdio` only in process-stdio runtimes (Node.js, Bun, Deno). 
+export type { StdioServerTransportOptions } from './server/stdio.js'; export { StdioServerTransport } from './server/stdio.js'; diff --git a/packages/server/test/server/stdio.test.ts b/packages/server/test/server/stdio.test.ts index 92671cacd9..96cb7d70e9 100644 --- a/packages/server/test/server/stdio.test.ts +++ b/packages/server/test/server/stdio.test.ts @@ -1,7 +1,9 @@ import { Readable, Writable } from 'node:stream'; import type { JSONRPCMessage } from '@modelcontextprotocol/core'; -import { ReadBuffer, serializeMessage } from '@modelcontextprotocol/core'; +import { ReadBuffer, SdkError, SdkErrorCode, serializeMessage } from '@modelcontextprotocol/core'; + +import { process as shimProcess } from '@modelcontextprotocol/server/_shims'; import { StdioServerTransport } from '../../src/server/stdio.js'; @@ -179,3 +181,74 @@ test('should fire onerror before onclose on stdout error', async () => { expect(events).toEqual(['error', 'close']); }); + +test('should surface MessageTooLarge via onerror and keep running when maxMessageBytes is exceeded', async () => { + const server = new StdioServerTransport(input, output, { maxMessageBytes: 64 }); + + const errors: Error[] = []; + server.onerror = error => { + errors.push(error); + }; + + const messages: JSONRPCMessage[] = []; + server.onmessage = message => { + messages.push(message); + }; + + await server.start(); + + // An oversized line is dropped and reported exactly once... + input.push('x'.repeat(200) + '\n'); + await new Promise(resolve => setImmediate(resolve)); + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + expect(messages).toHaveLength(0); + + // ...and the transport keeps processing subsequent messages. 
+ const message: JSONRPCMessage = { jsonrpc: '2.0', method: 'small' }; + input.push(serializeMessage(message)); + await new Promise(resolve => setImmediate(resolve)); + expect(messages).toEqual([message]); + expect(errors).toHaveLength(1); + + await server.close(); +}); + +test('options-only constructor uses the process streams and applies maxMessageBytes', async () => { + const server = new StdioServerTransport({ maxMessageBytes: 64 }); + + const errors: Error[] = []; + const messages: JSONRPCMessage[] = []; + server.onerror = error => errors.push(error); + server.onmessage = message => messages.push(message); + + const listenersBefore = shimProcess.stdin.listenerCount('data'); + await server.start(); + expect(shimProcess.stdin.listenerCount('data')).toBe(listenersBefore + 1); + + // Drive the read path directly: an oversized line, then a valid message. + server._ondata(Buffer.from(`${'x'.repeat(200)}\n`)); + const valid: JSONRPCMessage = { jsonrpc: '2.0', method: 'notifications/initialized' }; + server._ondata(Buffer.from(serializeMessage(valid))); + + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + expect(messages).toEqual([valid]); + + await server.close(); + expect(shimProcess.stdin.listenerCount('data')).toBe(listenersBefore); +}); + +test('three-argument form with undefined stream placeholders still applies maxMessageBytes', () => { + const server = new StdioServerTransport(undefined, undefined, { maxMessageBytes: 64 }); + + const errors: Error[] = []; + server.onerror = error => errors.push(error); + server._ondata(Buffer.from(`${'x'.repeat(200)}\n`)); + + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); +});
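Reviewer note: the drop-and-recover behavior that `maxMessageBytes` introduces can be hard to follow from the offset arithmetic alone, so here is a minimal standalone sketch of the same idea. Everything in it is hypothetical and for illustration only: `BoundedLineFramer` and `MessageTooLargeError` are not SDK names, and the sketch deliberately re-copies on every append instead of using the growable buffer with read/scan offsets from this change. It assumes only what the diff states: one error per oversized message, silence while the rest of that message streams in, and normal processing after the next newline.

```typescript
// Hypothetical stand-in for illustration; NOT the SDK's ReadBuffer.
class MessageTooLargeError extends Error {
    readonly code = 'MESSAGE_TOO_LARGE';
}

class BoundedLineFramer {
    private pending: Uint8Array = new Uint8Array(0);
    // True while the remainder of an oversized line is being thrown away.
    private discarding = false;
    private readonly maxMessageBytes: number | undefined;

    constructor(maxMessageBytes?: number) {
        this.maxMessageBytes = maxMessageBytes;
    }

    // Naive append: copies the backlog each time (the real change avoids this).
    append(chunk: Uint8Array): void {
        const next = new Uint8Array(this.pending.length + chunk.length);
        next.set(this.pending, 0);
        next.set(chunk, this.pending.length);
        this.pending = next;
    }

    // Returns the next complete line, or null if none is buffered.
    // Throws MessageTooLargeError once per oversized line.
    readLine(): string | null {
        while (true) {
            const newline = this.pending.indexOf(0x0a);
            if (newline === -1) {
                if (this.discarding) {
                    // Still inside the oversized line: drop what arrived.
                    this.pending = new Uint8Array(0);
                    return null;
                }
                const size = this.pending.length;
                if (this.maxMessageBytes !== undefined && size > this.maxMessageBytes) {
                    this.discarding = true;
                    this.pending = new Uint8Array(0);
                    throw new MessageTooLargeError(`received ${size} bytes without a newline`);
                }
                return null;
            }

            const line = this.pending.subarray(0, newline);
            this.pending = this.pending.subarray(newline + 1);

            if (this.discarding) {
                // Tail of the oversized line: drop it and resume normally.
                this.discarding = false;
                continue;
            }
            if (this.maxMessageBytes !== undefined && line.length > this.maxMessageBytes) {
                throw new MessageTooLargeError(`dropped a ${line.length}-byte line`);
            }
            return new TextDecoder().decode(line).replace(/\r$/, '');
        }
    }
}

const encode = (text: string): Uint8Array => new TextEncoder().encode(text);

// Usage: an oversized line arrives in pieces, followed by a small line.
const framer = new BoundedLineFramer(16);
const lines: string[] = [];
let errorCount = 0;
for (const chunk of ['x'.repeat(40), 'x'.repeat(40), 'tail\n{"ok":true}\n']) {
    framer.append(encode(chunk));
    for (;;) {
        try {
            const line = framer.readLine();
            if (line === null) break;
            lines.push(line);
        } catch (error) {
            if ((error as { code?: string }).code !== 'MESSAGE_TOO_LARGE') throw error;
            errorCount++; // a transport would report this via onerror and keep reading
        }
    }
}
console.log(lines, errorCount);
```

The inner loop mirrors how a transport would presumably drain the buffer after each chunk: an error does not end the loop, because a complete oversized line may be followed by valid messages in the same chunk.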