From 2f3f2914dbbff50de9cbb3e8bd2f0681694c4267 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 2 Jun 2026 07:58:04 +0000 Subject: [PATCH 1/4] Add maxMessageBytes option to stdio transports and make ReadBuffer amortized O(1) per byte A peer that writes a large amount of data without a newline previously grew the receiving process's memory without bound, and every incoming chunk re-copied the entire buffered backlog via Buffer.concat. There was no public way to bound or replace the read buffer. StdioClientTransport and StdioServerTransport now accept an optional maxMessageBytes. Oversized messages are dropped and reported through onerror as an SdkError with the new code SdkErrorCode.MessageTooLarge, and the transport recovers at the next newline boundary. Default is unlimited, so behavior is unchanged unless the option is set. The read buffer now uses a single growable buffer with read/scan offsets: appends are amortized O(1) per byte and newline scanning never revisits bytes, instead of concatenating the full backlog on every chunk. --- .changeset/stdio-max-message-bytes.md | 10 ++ packages/client/src/client/stdio.ts | 21 ++- packages/client/src/stdio.ts | 2 +- packages/client/test/client/stdio.test.ts | 45 ++++++ packages/core/src/errors/sdkErrors.ts | 2 + packages/core/src/shared/stdio.ts | 165 ++++++++++++++++++++-- packages/core/test/shared/stdio.test.ts | 141 ++++++++++++++++++ packages/server/src/server/stdio.ts | 24 +++- packages/server/src/stdio.ts | 1 + packages/server/test/server/stdio.test.ts | 35 ++++- 10 files changed, 429 insertions(+), 17 deletions(-) create mode 100644 .changeset/stdio-max-message-bytes.md diff --git a/.changeset/stdio-max-message-bytes.md b/.changeset/stdio-max-message-bytes.md new file mode 100644 index 0000000000..073547251b --- /dev/null +++ b/.changeset/stdio-max-message-bytes.md @@ -0,0 +1,10 @@ +--- +'@modelcontextprotocol/client': minor +'@modelcontextprotocol/server': minor +--- + +Add `maxMessageBytes` option to the stdio transports and make the stdio read buffer amortized O(1) per byte + +A stdio peer that writes a very large amount of data without a newline (accidental binary output, runaway log line, or a malicious server) previously grew the receiving process's memory without bound, and each incoming chunk re-copied the entire buffered backlog (`Buffer.concat` per chunk). There was no public way to bound or replace the read buffer, so integrators who had built flood protection on v1 transport internals had nothing to migrate to. + +`StdioClientTransport` and `StdioServerTransport` now accept an optional `maxMessageBytes`. When a single message exceeds it, the data is dropped, an `SdkError` with the new code `SdkErrorCode.MessageTooLarge` is reported via `onerror`, and the transport recovers at the next newline boundary. The default remains unlimited. The read buffer also now grows geometrically with read/scan offsets instead of concatenating on every chunk. diff --git a/packages/client/src/client/stdio.ts b/packages/client/src/client/stdio.ts index 5dcb8ef9a6..e8b98d74c0 100644 --- a/packages/client/src/client/stdio.ts +++ b/packages/client/src/client/stdio.ts @@ -7,6 +7,22 @@ import type { JSONRPCMessage, Transport } from '@modelcontextprotocol/core'; import { ReadBuffer, SdkError, SdkErrorCode, serializeMessage } from '@modelcontextprotocol/core'; import spawn from 'cross-spawn'; +export type StdioClientTransportOptions = { + /** + * Maximum size, in bytes, that a single inbound message may occupy. + * + * Protects against a misbehaving server flooding the client with an unbounded + * amount of data on a single line (e.g. accidental binary or log output on + * stdout), which would otherwise grow client memory without limit. When a + * message exceeds this size it is dropped, an {@linkcode SdkError} with code + * `SdkErrorCode.MessageTooLarge` is reported via `onerror`, and the transport + * recovers at the next newline boundary. + * + * Defaults to undefined (no limit), matching previous behavior. + */ + maxMessageBytes?: number; +}; + export type StdioServerParameters = { /** * The executable to run to start the server. @@ -92,7 +108,7 @@ export function getDefaultEnvironment(): Record { */ export class StdioClientTransport implements Transport { private _process?: ChildProcess; - private _readBuffer: ReadBuffer = new ReadBuffer(); + private _readBuffer: ReadBuffer; private _serverParams: StdioServerParameters; private _stderrStream: PassThrough | null = null; @@ -100,8 +116,9 @@ export class StdioClientTransport implements Transport { onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void; - constructor(server: StdioServerParameters) { + constructor(server: StdioServerParameters, options?: StdioClientTransportOptions) { this._serverParams = server; + this._readBuffer = new ReadBuffer({ maxMessageBytes: options?.maxMessageBytes }); if (server.stderr === 'pipe' || server.stderr === 'overlapped') { this._stderrStream = new PassThrough(); } diff --git a/packages/client/src/stdio.ts b/packages/client/src/stdio.ts index a6ecd1697e..12cf120646 100644 --- a/packages/client/src/stdio.ts +++ b/packages/client/src/stdio.ts @@ -4,5 +4,5 @@ // Cloudflare Workers targets does not pull in `node:child_process`, `node:stream`, or `cross-spawn`. Import // from `@modelcontextprotocol/client/stdio` only in process-spawning runtimes (Node.js, Bun, Deno). -export type { StdioServerParameters } from './client/stdio.js'; +export type { StdioClientTransportOptions, StdioServerParameters } from './client/stdio.js'; export { DEFAULT_INHERITED_ENV_VARS, getDefaultEnvironment, StdioClientTransport } from './client/stdio.js'; diff --git a/packages/client/test/client/stdio.test.ts b/packages/client/test/client/stdio.test.ts index 28a7834bcb..604dfefc86 100644 --- a/packages/client/test/client/stdio.test.ts +++ b/packages/client/test/client/stdio.test.ts @@ -1,4 +1,5 @@ import type { JSONRPCMessage } from '@modelcontextprotocol/core'; +import { SdkError, SdkErrorCode } from '@modelcontextprotocol/core'; import type { StdioServerParameters } from '../../src/client/stdio.js'; import { StdioClientTransport } from '../../src/client/stdio.js'; @@ -77,3 +78,47 @@ test('should return child process pid', async () => { await client.close(); expect(client.pid).toBeNull(); }); + +test('should surface MessageTooLarge via onerror and keep running when maxMessageBytes is exceeded', async () => { + // `tee`/`more` echo stdin back on stdout, so an oversized outbound message + // becomes an oversized inbound message. + const client = new StdioClientTransport(serverParameters, { maxMessageBytes: 1024 }); + + const errors: Error[] = []; + const oversizedReported = new Promise(resolve => { + client.onerror = error => { + errors.push(error); + resolve(); + }; + }); + + const messages: JSONRPCMessage[] = []; + const smallMessageEchoed = new Promise(resolve => { + client.onmessage = message => { + messages.push(message); + resolve(); + }; + }); + + await client.start(); + + const oversized: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'oversized', + params: { payload: 'x'.repeat(10_000) } + }; + await client.send(oversized); + await oversizedReported; + + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + + // The transport recovers: a small message still round-trips. + const small: JSONRPCMessage = { jsonrpc: '2.0', method: 'small' }; + await client.send(small); + await smallMessageEchoed; + expect(messages).toEqual([small]); + + await client.close(); +}); diff --git a/packages/core/src/errors/sdkErrors.ts b/packages/core/src/errors/sdkErrors.ts index af432c6389..0ce4eb34f2 100644 --- a/packages/core/src/errors/sdkErrors.ts +++ b/packages/core/src/errors/sdkErrors.ts @@ -26,6 +26,8 @@ export enum SdkErrorCode { ConnectionClosed = 'CONNECTION_CLOSED', /** Failed to send message */ SendFailed = 'SEND_FAILED', + /** A single inbound message exceeded the configured maximum size */ + MessageTooLarge = 'MESSAGE_TOO_LARGE', /** Response result failed local schema validation */ InvalidResult = 'INVALID_RESULT', diff --git a/packages/core/src/shared/stdio.ts b/packages/core/src/shared/stdio.ts index 7283a5ef96..a31ab69e2f 100644 --- a/packages/core/src/shared/stdio.ts +++ b/packages/core/src/shared/stdio.ts @@ -1,30 +1,115 @@ +import { SdkError, SdkErrorCode } from '../errors/sdkErrors.js'; import type { JSONRPCMessage } from '../types/index.js'; import { JSONRPCMessageSchema } from '../types/index.js'; +/** + * Options for {@linkcode ReadBuffer}. + */ +export interface ReadBufferOptions { + /** + * Maximum size, in bytes, that a single newline-delimited message may occupy. + * + * When set, a message larger than this limit — whether or not its terminating + * newline has arrived yet — is dropped and an {@linkcode SdkError} with code + * {@linkcode SdkErrorCode.MessageTooLarge} is thrown from + * {@linkcode ReadBuffer.readMessage}. The stdio transports surface that error + * through their `onerror` callback and keep running: buffered data belonging to + * the oversized message is discarded until the next newline boundary, after + * which subsequent messages are processed normally. + * + * When undefined (the default), no limit is enforced. + */ + maxMessageBytes?: number; +} + +const INITIAL_CAPACITY = 8192; + +/** + * Capacity above which the internal buffer is shrunk back to {@linkcode INITIAL_CAPACITY} + * once it is fully drained, so that one large message does not pin memory forever. + */ +const SHRINK_CAPACITY_THRESHOLD = 131_072; + +const NEWLINE = 0x0a; + /** * Buffers a continuous stdio stream into discrete JSON-RPC messages. + * + * Internally maintains a single growable buffer with read/scan offsets so that + * appending a chunk and scanning for message boundaries are amortized O(1) per + * byte, independent of how many chunks an incomplete message spans. */ export class ReadBuffer { - private _buffer?: Buffer; + private _buffer: Buffer = Buffer.alloc(INITIAL_CAPACITY); + /** Offset of the first unconsumed byte. */ + private _start = 0; + /** Offset past the last valid byte. */ + private _end = 0; + /** Offset up to which the buffer has already been scanned for a newline. */ + private _scanned = 0; + /** When true, data is dropped until the next newline boundary (oversized-message recovery). */ + private _discarding = false; + private readonly _maxMessageBytes?: number; + + constructor(options?: ReadBufferOptions) { + this._maxMessageBytes = options?.maxMessageBytes; + } append(chunk: Buffer): void { - this._buffer = this._buffer ? Buffer.concat([this._buffer, chunk]) : chunk; + this._ensureCapacity(chunk.length); + chunk.copy(this._buffer, this._end); + this._end += chunk.length; } readMessage(): JSONRPCMessage | null { - while (this._buffer) { - const index = this._buffer.indexOf('\n'); - if (index === -1) { + while (true) { + const newlineIndex = this._findNewline(); + if (newlineIndex === -1) { + if (this._discarding) { + // Still inside an oversized message: drop everything buffered so far. + this._reset(); + return null; + } + + const pending = this._end - this._start; + if (this._maxMessageBytes !== undefined && pending > this._maxMessageBytes) { + this._discarding = true; + this._reset(); + throw new SdkError( + SdkErrorCode.MessageTooLarge, + `Message exceeds maxMessageBytes (${this._maxMessageBytes}): received ${pending} bytes without a message boundary. ` + + `Discarding data until the next newline.` + ); + } + return null; } - const line = this._buffer.toString('utf8', 0, index).replace(/\r$/, ''); - this._buffer = this._buffer.subarray(index + 1); + const lineStart = this._start; + const lineLength = newlineIndex - lineStart; + + if (this._discarding) { + // The tail of an oversized message: drop it and resume normal processing. + this._consume(newlineIndex); + this._discarding = false; + continue; + } + + if (this._maxMessageBytes !== undefined && lineLength > this._maxMessageBytes) { + this._consume(newlineIndex); + throw new SdkError( + SdkErrorCode.MessageTooLarge, + `Message exceeds maxMessageBytes (${this._maxMessageBytes}): a ${lineLength}-byte message was received and dropped.` + ); + } + + const line = this._buffer.toString('utf8', lineStart, newlineIndex).replace(/\r$/, ''); + this._consume(newlineIndex); try { return deserializeMessage(line); } catch (error) { - // Skip non-JSON lines (e.g., debug output from hot-reload tools like + // Skip non-JSON lines (e.g. debug output from hot-reload tools like // tsx or nodemon that write to stdout). Schema validation errors still // throw so malformed-but-valid-JSON messages surface via onerror. if (error instanceof SyntaxError) { @@ -33,11 +118,71 @@ export class ReadBuffer { throw error; } } - return null; } clear(): void { - this._buffer = undefined; + this._discarding = false; + this._reset(); + } + + /** Returns the index of the next newline, scanning only bytes not seen before. */ + private _findNewline(): number { + if (this._scanned >= this._end) { + return -1; + } + + const index = this._buffer.subarray(this._scanned, this._end).indexOf(NEWLINE); + if (index === -1) { + this._scanned = this._end; + return -1; + } + + return this._scanned + index; + } + + /** Consumes all bytes up to and including the newline at `newlineIndex`. */ + private _consume(newlineIndex: number): void { + this._start = newlineIndex + 1; + this._scanned = this._start; + if (this._start === this._end) { + this._reset(); + } + } + + /** Drops all buffered data, shrinking the buffer if a large message inflated it. */ + private _reset(): void { + this._start = 0; + this._end = 0; + this._scanned = 0; + if (this._buffer.length > SHRINK_CAPACITY_THRESHOLD) { + this._buffer = Buffer.alloc(INITIAL_CAPACITY); + } + } + + /** Makes room for `extra` more bytes, compacting in place or growing geometrically. */ + private _ensureCapacity(extra: number): void { + if (this._end + extra <= this._buffer.length) { + return; + } + + const used = this._end - this._start; + if (used + extra <= this._buffer.length) { + // Enough total room: reclaim the consumed prefix in place. + this._buffer.copyWithin(0, this._start, this._end); + } else { + let capacity = this._buffer.length * 2; + while (capacity < used + extra) { + capacity *= 2; + } + + const next = Buffer.alloc(capacity); + this._buffer.copy(next, 0, this._start, this._end); + this._buffer = next; + } + + this._end = used; + this._scanned -= this._start; + this._start = 0; } } diff --git a/packages/core/test/shared/stdio.test.ts b/packages/core/test/shared/stdio.test.ts index 65d1de0eaa..36b40eb718 100644 --- a/packages/core/test/shared/stdio.test.ts +++ b/packages/core/test/shared/stdio.test.ts @@ -1,3 +1,4 @@ +import { SdkError, SdkErrorCode } from '../../src/errors/sdkErrors.js'; import { ReadBuffer } from '../../src/shared/stdio.js'; import type { JSONRPCMessage } from '../../src/types/index.js'; @@ -113,3 +114,143 @@ describe('non-JSON line filtering', () => { expect(() => readBuffer.readMessage()).toThrow(); }); }); + +describe('chunked message assembly', () => { + test('should assemble a message split across many small chunks', () => { + const readBuffer = new ReadBuffer(); + const serialized = JSON.stringify(testMessage) + '\n'; + + for (const char of serialized) { + readBuffer.append(Buffer.from(char)); + } + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should yield multiple messages from a single chunk', () => { + const readBuffer = new ReadBuffer(); + const message1: JSONRPCMessage = { jsonrpc: '2.0', method: 'method1' }; + const message2: JSONRPCMessage = { jsonrpc: '2.0', method: 'method2' }; + const message3: JSONRPCMessage = { jsonrpc: '2.0', method: 'method3' }; + readBuffer.append(Buffer.from([message1, message2, message3].map(m => JSON.stringify(m) + '\n').join(''))); + + expect(readBuffer.readMessage()).toEqual(message1); + expect(readBuffer.readMessage()).toEqual(message2); + expect(readBuffer.readMessage()).toEqual(message3); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should handle a message boundary exactly at a chunk boundary', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('should handle CRLF line endings', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\r\n')); + + expect(readBuffer.readMessage()).toEqual(testMessage); + }); + + test('should handle messages larger than the initial buffer capacity', () => { + const readBuffer = new ReadBuffer(); + const bigMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'big', + params: { payload: 'x'.repeat(100_000) } + }; + + const serialized = Buffer.from(JSON.stringify(bigMessage) + '\n'); + // Append in 1 KiB slices to exercise growth across many appends. + for (let offset = 0; offset < serialized.length; offset += 1024) { + readBuffer.append(serialized.subarray(offset, Math.min(offset + 1024, serialized.length))); + } + + expect(readBuffer.readMessage()).toEqual(bigMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); +}); + +describe('maxMessageBytes', () => { + const expectMessageTooLarge = (fn: () => unknown) => { + try { + fn(); + throw new Error('expected readMessage to throw'); + } catch (error) { + expect(error).toBeInstanceOf(SdkError); + expect((error as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + } + }; + + test('does not limit message size by default', () => { + const readBuffer = new ReadBuffer(); + readBuffer.append(Buffer.from('x'.repeat(1_000_000))); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('accepts messages up to the limit', () => { + const message: JSONRPCMessage = { jsonrpc: '2.0', method: 'test' }; + const serialized = JSON.stringify(message); + const readBuffer = new ReadBuffer({ maxMessageBytes: serialized.length }); + readBuffer.append(Buffer.from(serialized + '\n')); + + expect(readBuffer.readMessage()).toEqual(message); + }); + + test('throws once for an incomplete oversized message, then recovers at the next newline', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + + // Oversized data with no newline: a single error when the limit is crossed... + readBuffer.append(Buffer.from('x'.repeat(100))); + expectMessageTooLarge(() => readBuffer.readMessage()); + + // ...then silence while the rest of the oversized message streams in. + readBuffer.append(Buffer.from('x'.repeat(100))); + expect(readBuffer.readMessage()).toBeNull(); + readBuffer.append(Buffer.from('x'.repeat(100))); + expect(readBuffer.readMessage()).toBeNull(); + + // The tail of the oversized message ends at the newline; the following + // message is processed normally. + readBuffer.append(Buffer.from('xxx\n' + JSON.stringify(testMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('drops an oversized complete line and continues with the rest of the buffer', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + readBuffer.append(Buffer.from('y'.repeat(200) + '\n' + JSON.stringify(testMessage) + '\n')); + + expectMessageTooLarge(() => readBuffer.readMessage()); + expect(readBuffer.readMessage()).toEqual(testMessage); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('counts incomplete bytes across many appends', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + for (let i = 0; i < 6; i++) { + readBuffer.append(Buffer.from('z'.repeat(10))); + expect(readBuffer.readMessage()).toBeNull(); + } + + readBuffer.append(Buffer.from('z'.repeat(10))); + expectMessageTooLarge(() => readBuffer.readMessage()); + }); + + test('clear() resets oversized-message recovery state', () => { + const readBuffer = new ReadBuffer({ maxMessageBytes: 64 }); + readBuffer.append(Buffer.from('x'.repeat(100))); + expectMessageTooLarge(() => readBuffer.readMessage()); + + readBuffer.clear(); + + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(testMessage); + }); +}); diff --git a/packages/server/src/server/stdio.ts b/packages/server/src/server/stdio.ts index ac2dd3f784..6f857b32db 100644 --- a/packages/server/src/server/stdio.ts +++ b/packages/server/src/server/stdio.ts @@ -16,15 +16,33 @@ import { process } from '@modelcontextprotocol/server/_shims'; * await server.connect(transport); * ``` */ +export type StdioServerTransportOptions = { + /** + * Maximum size, in bytes, that a single inbound message may occupy. + * + * Protects against a misbehaving client flooding the server with an unbounded + * amount of data on a single line, which would otherwise grow server memory + * without limit. When a message exceeds this size it is dropped, an + * {@linkcode SdkError} with code `SdkErrorCode.MessageTooLarge` is reported via + * `onerror`, and the transport recovers at the next newline boundary. + * + * Defaults to undefined (no limit), matching previous behavior. + */ + maxMessageBytes?: number; +}; + export class StdioServerTransport implements Transport { - private _readBuffer: ReadBuffer = new ReadBuffer(); + private _readBuffer: ReadBuffer; private _started = false; private _closed = false; constructor( private _stdin: Readable = process.stdin, - private _stdout: Writable = process.stdout - ) {} + private _stdout: Writable = process.stdout, + options?: StdioServerTransportOptions + ) { + this._readBuffer = new ReadBuffer({ maxMessageBytes: options?.maxMessageBytes }); + } onclose?: () => void; onerror?: (error: Error) => void; diff --git a/packages/server/src/stdio.ts b/packages/server/src/stdio.ts index 7865c9cedc..a2c2e05570 100644 --- a/packages/server/src/stdio.ts +++ b/packages/server/src/stdio.ts @@ -5,4 +5,5 @@ // subpath gives consumers a consistent shape across packages. Import from // `@modelcontextprotocol/server/stdio` only in process-stdio runtimes (Node.js, Bun, Deno). +export type { StdioServerTransportOptions } from './server/stdio.js'; export { StdioServerTransport } from './server/stdio.js'; diff --git a/packages/server/test/server/stdio.test.ts b/packages/server/test/server/stdio.test.ts index 92671cacd9..92760ffbad 100644 --- a/packages/server/test/server/stdio.test.ts +++ b/packages/server/test/server/stdio.test.ts @@ -1,7 +1,7 @@ import { Readable, Writable } from 'node:stream'; import type { JSONRPCMessage } from '@modelcontextprotocol/core'; -import { ReadBuffer, serializeMessage } from '@modelcontextprotocol/core'; +import { ReadBuffer, SdkError, SdkErrorCode, serializeMessage } from '@modelcontextprotocol/core'; import { StdioServerTransport } from '../../src/server/stdio.js'; @@ -179,3 +179,36 @@ test('should fire onerror before onclose on stdout error', async () => { expect(events).toEqual(['error', 'close']); }); + +test('should surface MessageTooLarge via onerror and keep running when maxMessageBytes is exceeded', async () => { + const server = new StdioServerTransport(input, output, { maxMessageBytes: 64 }); + + const errors: Error[] = []; + server.onerror = error => { + errors.push(error); + }; + + const messages: JSONRPCMessage[] = []; + server.onmessage = message => { + messages.push(message); + }; + + await server.start(); + + // An oversized line is dropped and reported exactly once... + input.push('x'.repeat(200) + '\n'); + await new Promise(resolve => setImmediate(resolve)); + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + expect(messages).toHaveLength(0); + + // ...and the transport keeps processing subsequent messages. + const message: JSONRPCMessage = { jsonrpc: '2.0', method: 'small' }; + input.push(serializeMessage(message)); + await new Promise(resolve => setImmediate(resolve)); + expect(messages).toEqual([message]); + expect(errors).toHaveLength(1); + + await server.close(); +}); From 0fcdb957bb6db4dd2b88bc947d17de9a96d3606a Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 2 Jun 2026 07:58:04 +0000 Subject: [PATCH 2/4] Document stdio non-JSON line skipping and maxMessageBytes in migration guides v2 silently skips non-JSON lines on the stdio stream where v1 surfaced a SyntaxError through onerror; migrators relying on that signal should know. Also documents the new maxMessageBytes option as the supported replacement for flood protection previously built against v1 transport internals. --- docs/migration-SKILL.md | 2 ++ docs/migration.md | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/docs/migration-SKILL.md b/docs/migration-SKILL.md index b849da8b3d..8dae66813e 100644 --- a/docs/migration-SKILL.md +++ b/docs/migration-SKILL.md @@ -507,6 +507,8 @@ NOT removed (wire surface, kept for 2025-11-25 interop): task Zod schemas + infe `Client.listPrompts()`, `listResources()`, `listResourceTemplates()`, `listTools()` now return empty results when the server lacks the corresponding capability (instead of sending the request). Set `enforceStrictCapabilities: true` in `ClientOptions` to throw an error instead. +Stdio transports: non-JSON lines on the stream are now skipped silently (v1 surfaced a `SyntaxError` via `onerror`); valid-JSON messages failing schema validation still reach `onerror`. Both stdio transports accept an optional `maxMessageBytes` — oversized messages are dropped and reported via `onerror` as `SdkError` with code `SdkErrorCode.MessageTooLarge` (default: unlimited). Use it to replace any v1-era flood protection built on transport internals. + ## 14. Runtime-Specific JSON Schema Validators (Enhancement) The SDK now auto-selects the appropriate JSON Schema validator based on runtime: diff --git a/docs/migration.md b/docs/migration.md index bf88cf2b76..2f7227b320 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -141,6 +141,27 @@ import { StreamableHTTPClientTransport } from '@modelcontextprotocol/client'; const transport = new StreamableHTTPClientTransport(new URL('http://localhost:3000/mcp')); ``` +### Stdio transports: non-JSON lines are now skipped silently + +In v1, a non-JSON line on the stdio stream (for example, debug output from a hot-reload +tool writing to stdout) surfaced as a `SyntaxError` through the transport's `onerror` +callback. In v2, the stdio read buffer silently skips lines that are not valid JSON and +continues with the next line; only valid-JSON messages that fail schema validation still +reach `onerror`. If you relied on `onerror` to detect a misbehaving server that writes +noise to stdout, that signal no longer fires for non-JSON lines. + +Relatedly, both stdio transports now accept an optional `maxMessageBytes` setting that +bounds how large a single message may grow before it is dropped and reported via +`onerror` (`SdkError` with code `SdkErrorCode.MessageTooLarge`). v1 had no built-in +protection against a peer flooding the stream with unbounded data on a single line; if +you implemented such protection against v1 transport internals, migrate to this option. + +```typescript +import { StdioClientTransport } from '@modelcontextprotocol/client/stdio'; + +const transport = new StdioClientTransport({ command: 'my-server' }, { maxMessageBytes: 4 * 1024 * 1024 }); +``` + ### Server auth split Resource Server helpers (`requireBearerAuth`, `mcpAuthMetadataRouter`, `getOAuthProtectedResourceMetadataUrl`, `OAuthTokenVerifier`) are first-class in `@modelcontextprotocol/express`. From 7a8627e741ffb6094f3bb125c272dce6b48413a4 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 2 Jun 2026 08:25:28 +0000 Subject: [PATCH 3/4] Add regression tests for buffer compaction, growth, shrink, and mid-stream flood recovery The existing chunked-assembly tests only exercised capacity events with no consumed prefix, so a copy or offset bug in the compaction and growth paths could corrupt a partially buffered message without any test noticing. The new cases interleave a consumed message with a partial one before forcing each capacity event, using distinct payload characters so spliced bytes change content detectably. Also pins the shrink-after-drain memory bound and adds a client transport test where a child process floods stdout past the limit without a newline and the transport recovers at the next boundary. --- packages/client/test/client/stdio.test.ts | 37 ++++++++++++ packages/core/test/shared/stdio.test.ts | 72 +++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/packages/client/test/client/stdio.test.ts b/packages/client/test/client/stdio.test.ts index 604dfefc86..21e603b152 100644 --- a/packages/client/test/client/stdio.test.ts +++ b/packages/client/test/client/stdio.test.ts @@ -122,3 +122,40 @@ test('should surface MessageTooLarge via onerror and keep running when maxMessag await client.close(); }); + +test('should recover when a child floods stdout without a newline', async () => { + // A misbehaving server that writes a large amount of data with no message + // boundary, then a valid message: the limit must trip while the flood is + // still incomplete, and the transport must recover at the newline. + const childScript = [ + "process.stdout.write('x'.repeat(1_000_000));", + "process.stdout.write('\\n');", + "process.stdout.write(JSON.stringify({ jsonrpc: '2.0', method: 'after-flood' }) + '\\n');", + 'setInterval(() => {}, 1 << 30);' + ].join(' '); + + const client = new StdioClientTransport({ command: process.execPath, args: ['-e', childScript] }, { maxMessageBytes: 65_536 }); + + const errors: Error[] = []; + client.onerror = error => { + errors.push(error); + }; + + const messages: JSONRPCMessage[] = []; + const messageAfterFlood = new Promise(resolve => { + client.onmessage = message => { + messages.push(message); + resolve(); + }; + }); + + await client.start(); + await messageAfterFlood; + + expect(messages).toEqual([{ jsonrpc: '2.0', method: 'after-flood' }]); + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + + await client.close(); +}); diff --git a/packages/core/test/shared/stdio.test.ts b/packages/core/test/shared/stdio.test.ts index 36b40eb718..fbf126d8d2 100644 --- a/packages/core/test/shared/stdio.test.ts +++ b/packages/core/test/shared/stdio.test.ts @@ -254,3 +254,75 @@ describe('maxMessageBytes', () => { expect(readBuffer.readMessage()).toEqual(testMessage); }); }); + +describe('capacity events with a consumed prefix', () => { + // These cases pin the in-place compaction and growth paths of the internal + // buffer when a previous message has already been consumed (start offset > 0) + // while a partial message is still buffered - the common interleaving on a + // busy stream. A copy/offset bug here silently corrupts message bytes. + // Each message uses a distinct payload character so that a copy bug that + // splices bytes from a neighboring message changes content detectably. + const messageWithPayload = (method: string, payloadLength: number): JSONRPCMessage => ({ + jsonrpc: '2.0', + method, + params: { payload: method.charAt(0).repeat(payloadLength) } + }); + + test('compacts in place without corrupting a partially buffered message', () => { + const readBuffer = new ReadBuffer(); + const messageA = messageWithPayload('a', 5900); // ~6000 bytes serialized + const messageB = messageWithPayload('b', 1500); // ~1600 bytes serialized + const messageC = messageWithPayload('c', 1900); // ~2000 bytes serialized + + const serializedB = Buffer.from(JSON.stringify(messageB) + '\n'); + + // First chunk: all of A plus the first 1000 bytes of B (fits the initial + // 8 KiB capacity). Consuming A leaves a consumed prefix ahead of B's bytes. + readBuffer.append(Buffer.concat([Buffer.from(JSON.stringify(messageA) + '\n'), serializedB.subarray(0, 1000)])); + expect(readBuffer.readMessage()).toEqual(messageA); + expect(readBuffer.readMessage()).toBeNull(); + + // Second chunk: the rest of B plus C (~2600 bytes). Total valid bytes fit + // the existing capacity only after reclaiming the consumed prefix, so this + // append must compact in place rather than grow. + readBuffer.append(Buffer.concat([serializedB.subarray(1000), Buffer.from(JSON.stringify(messageC) + '\n')])); + expect(readBuffer.readMessage()).toEqual(messageB); + expect(readBuffer.readMessage()).toEqual(messageC); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('grows without corrupting a partially buffered message', () => { + const readBuffer = new ReadBuffer(); + const messageA = messageWithPayload('a', 3900); // ~4000 bytes serialized + const messageB = messageWithPayload('b', 8800); // ~8900 bytes serialized + + const serializedB = Buffer.from(JSON.stringify(messageB) + '\n'); + + readBuffer.append(Buffer.concat([Buffer.from(JSON.stringify(messageA) + '\n'), serializedB.subarray(0, 1000)])); + expect(readBuffer.readMessage()).toEqual(messageA); + expect(readBuffer.readMessage()).toBeNull(); + + // The remaining ~7900 bytes of B exceed the initial capacity even after + // compaction, forcing the growth path while the start offset is non-zero. + readBuffer.append(serializedB.subarray(1000)); + expect(readBuffer.readMessage()).toEqual(messageB); + expect(readBuffer.readMessage()).toBeNull(); + }); + + test('releases an inflated buffer once fully drained', () => { + const readBuffer = new ReadBuffer(); + const bigMessage = messageWithPayload('big', 200_000); + readBuffer.append(Buffer.from(JSON.stringify(bigMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(bigMessage); + + // The documented memory bound: one large message must not pin a large + // allocation forever. This intentionally inspects the internal buffer - + // the property is not observable through the public API. + const internal = readBuffer as unknown as { _buffer: Buffer }; + expect(internal._buffer.length).toBeLessThanOrEqual(131_072); + + // And the buffer still works after shrinking. + readBuffer.append(Buffer.from(JSON.stringify(testMessage) + '\n')); + expect(readBuffer.readMessage()).toEqual(testMessage); + }); +}); From ee4a482be97af263b6cd05f84835cbc06702b75e Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 2 Jun 2026 15:21:04 +0000 Subject: [PATCH 4/4] Accept options as the sole StdioServerTransport argument; export ReadBufferOptions --- docs/migration-SKILL.md | 2 +- docs/migration.md | 4 +-- packages/core/src/exports/public/index.ts | 1 + packages/server/src/server/stdio.ts | 29 +++++++++++++--- packages/server/test/server/stdio.test.ts | 40 +++++++++++++++++++++++ 5 files changed, 68 insertions(+), 8 deletions(-) diff --git a/docs/migration-SKILL.md b/docs/migration-SKILL.md index 8dae66813e..ab46b0fcfd 100644 --- a/docs/migration-SKILL.md +++ b/docs/migration-SKILL.md @@ -216,7 +216,7 @@ if (error instanceof OAuthError && error.code === OAuthErrorCode.InvalidClient) ``` **Unchanged APIs** (only import paths changed): `Client` constructor and most methods, `McpServer` constructor, `server.connect()`, `server.close()`, all client transports (`StreamableHTTPClientTransport`, `SSEClientTransport`, `StdioClientTransport`), `StdioServerTransport`, all -Zod schemas, all callback return types. Note: `callTool()` and `request()` signatures changed (schema parameter removed, see section 11). +Zod schemas, all callback return types. Note: `callTool()` and `request()` signatures changed (schema parameter removed, see section 11). The stdio transports additionally accept a new optional `maxMessageBytes` option in v2 (see the stdio transport notes). ## 6. McpServer API Changes diff --git a/docs/migration.md b/docs/migration.md index 2f7227b320..8e5a1284e1 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -1005,8 +1005,8 @@ The following APIs are unchanged between v1 and v2 (only the import paths change - `Client` constructor and most client methods (`connect`, `listTools`, `listPrompts`, `listResources`, `readResource`, etc.) — note: `callTool()` signature changed (schema parameter removed) - `McpServer` constructor, `server.connect(transport)`, `server.close()` - `Server` (low-level) constructor and all methods -- `StreamableHTTPClientTransport`, `SSEClientTransport`, `StdioClientTransport` constructors and options -- `StdioServerTransport` constructor and options +- `StreamableHTTPClientTransport` and `SSEClientTransport` constructors and options +- `StdioClientTransport` and `StdioServerTransport` constructors — note: both accept a new optional `maxMessageBytes` option in v2 (see the stdio transport notes above) - All Zod schemas and type definitions from `types.ts` (except the aliases listed above) - Tool, prompt, and resource callback return types diff --git a/packages/core/src/exports/public/index.ts b/packages/core/src/exports/public/index.ts index 729144f1a3..e29c0c786b 100644 --- a/packages/core/src/exports/public/index.ts +++ b/packages/core/src/exports/public/index.ts @@ -54,6 +54,7 @@ export { DEFAULT_REQUEST_TIMEOUT_MSEC } from '../../shared/protocol.js'; // stdio message framing utilities (for custom transport authors) export { deserializeMessage, ReadBuffer, serializeMessage } from '../../shared/stdio.js'; +export type { ReadBufferOptions } from '../../shared/stdio.js'; // Transport types (NOT normalizeHeaders) export type { FetchLike, Transport, TransportSendOptions } from '../../shared/transport.js'; diff --git a/packages/server/src/server/stdio.ts b/packages/server/src/server/stdio.ts index 6f857b32db..ba6203cc2e 100644 --- a/packages/server/src/server/stdio.ts +++ b/packages/server/src/server/stdio.ts @@ -31,16 +31,35 @@ export type StdioServerTransportOptions = { maxMessageBytes?: number; }; +/** + * Distinguishes a stream argument from an options object in the overloaded + * {@linkcode StdioServerTransport} constructor. Every Node.js readable stream + * implements `pipe()`; an options literal does not. + */ +function isReadableStream(value: Readable | StdioServerTransportOptions): value is Readable { + return typeof (value as Readable).pipe === 'function'; +} + export class StdioServerTransport implements Transport { + private _stdin: Readable; + private _stdout: Writable; private _readBuffer: ReadBuffer; private _started = false; private _closed = false; - constructor( - private _stdin: Readable = process.stdin, - private _stdout: Writable = process.stdout, - options?: StdioServerTransportOptions - ) { + /** Communicate over the current process' `stdin` and `stdout`. */ + constructor(options?: StdioServerTransportOptions); + /** Communicate over the given streams, defaulting to the current process' `stdin` and `stdout`. */ + constructor(stdin?: Readable, stdout?: Writable, options?: StdioServerTransportOptions); + constructor(stdinOrOptions?: Readable | StdioServerTransportOptions, stdout?: Writable, options?: StdioServerTransportOptions) { + let stdin: Readable | undefined; + if (stdinOrOptions !== undefined && isReadableStream(stdinOrOptions)) { + stdin = stdinOrOptions; + } else if (stdinOrOptions !== undefined) { + options = stdinOrOptions; + } + this._stdin = stdin ?? process.stdin; + this._stdout = stdout ?? process.stdout; this._readBuffer = new ReadBuffer({ maxMessageBytes: options?.maxMessageBytes }); } diff --git a/packages/server/test/server/stdio.test.ts b/packages/server/test/server/stdio.test.ts index 92760ffbad..96cb7d70e9 100644 --- a/packages/server/test/server/stdio.test.ts +++ b/packages/server/test/server/stdio.test.ts @@ -3,6 +3,8 @@ import { Readable, Writable } from 'node:stream'; import type { JSONRPCMessage } from '@modelcontextprotocol/core'; import { ReadBuffer, SdkError, SdkErrorCode, serializeMessage } from '@modelcontextprotocol/core'; +import { process as shimProcess } from '@modelcontextprotocol/server/_shims'; + import { StdioServerTransport } from '../../src/server/stdio.js'; let input: Readable; @@ -212,3 +214,41 @@ test('should surface MessageTooLarge via onerror and keep running when maxMessag await server.close(); }); + +test('options-only constructor uses the process streams and applies maxMessageBytes', async () => { + const server = new StdioServerTransport({ maxMessageBytes: 64 }); + + const errors: Error[] = []; + const messages: JSONRPCMessage[] = []; + server.onerror = error => errors.push(error); + server.onmessage = message => messages.push(message); + + const listenersBefore = shimProcess.stdin.listenerCount('data'); + await server.start(); + expect(shimProcess.stdin.listenerCount('data')).toBe(listenersBefore + 1); + + // Drive the read path directly: an oversized line, then a valid message. + server._ondata(Buffer.from(`${'x'.repeat(200)}\n`)); + const valid: JSONRPCMessage = { jsonrpc: '2.0', method: 'notifications/initialized' }; + server._ondata(Buffer.from(serializeMessage(valid))); + + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); + expect(messages).toEqual([valid]); + + await server.close(); + expect(shimProcess.stdin.listenerCount('data')).toBe(listenersBefore); +}); + +test('three-argument form with undefined stream placeholders still applies maxMessageBytes', () => { + const server = new StdioServerTransport(undefined, undefined, { maxMessageBytes: 64 }); + + const errors: Error[] = []; + server.onerror = error => errors.push(error); + server._ondata(Buffer.from(`${'x'.repeat(200)}\n`)); + + expect(errors).toHaveLength(1); + expect(errors[0]).toBeInstanceOf(SdkError); + expect((errors[0] as SdkError).code).toBe(SdkErrorCode.MessageTooLarge); +});