From f7abb9df3010684c4f2998733792b406ac491c5f Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Wed, 25 Mar 2026 12:55:03 +0100 Subject: [PATCH 1/3] feat(server): forward optional stdin/stdout to stdio_server MCPServer.run_stdio_async and run(..., transport="stdio") now accept optional async text streams, matching mcp.server.stdio.stdio_server. This supports hosts that redirect process stdout for logging while keeping a dedicated MCP JSON-RPC stream. Made-with: Cursor --- src/mcp/server/mcpserver/server.py | 39 +++++++++++++--- .../test_run_stdio_custom_streams.py | 44 +++++++++++++++++++ 2 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 tests/server/mcpserver/test_run_stdio_custom_streams.py diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index 2a7a58117..f7443646a 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -238,7 +238,13 @@ def session_manager(self) -> StreamableHTTPSessionManager: return self._lowlevel_server.session_manager # pragma: no cover @overload - def run(self, transport: Literal["stdio"] = ...) -> None: ... + def run( + self, + transport: Literal["stdio"] = ..., + *, + stdin: anyio.AsyncFile[str] | None = ..., + stdout: anyio.AsyncFile[str] | None = ..., + ) -> None: ... @overload def run( @@ -270,12 +276,19 @@ def run( def run( self, transport: Literal["stdio", "sse", "streamable-http"] = "stdio", + *, + stdin: anyio.AsyncFile[str] | None = None, + stdout: anyio.AsyncFile[str] | None = None, **kwargs: Any, ) -> None: """Run the MCP server. Note this is a synchronous function. Args: transport: Transport protocol to use ("stdio", "sse", or "streamable-http") + stdin: Optional async text stream for MCP input (stdio transport only). + When omitted, uses process stdin. See :func:`mcp.server.stdio.stdio_server`. + stdout: Optional async text stream for MCP output (stdio transport only). + When omitted, uses process stdout. **kwargs: Transport-specific options (see overloads for details) """ TRANSPORTS = Literal["stdio", "sse", "streamable-http"] @@ -284,7 +297,7 @@ def run( match transport: case "stdio": - anyio.run(self.run_stdio_async) + anyio.run(lambda: self.run_stdio_async(stdin=stdin, stdout=stdout)) case "sse": # pragma: no cover anyio.run(lambda: self.run_sse_async(**kwargs)) case "streamable-http": # pragma: no cover @@ -836,9 +849,25 @@ def decorator( # pragma: no cover return decorator # pragma: no cover - async def run_stdio_async(self) -> None: - """Run the server using stdio transport.""" - async with stdio_server() as (read_stream, write_stream): + async def run_stdio_async( + self, + *, + stdin: anyio.AsyncFile[str] | None = None, + stdout: anyio.AsyncFile[str] | None = None, + ) -> None: + """Run the server using stdio transport. + + Args: + stdin: Async text stream to read JSON-RPC lines from. When ``None``, + uses the process stdin (see :func:`mcp.server.stdio.stdio_server`). + stdout: Async text stream to write JSON-RPC lines to. When ``None``, + uses the process stdout. + + Custom streams are useful when the process ``sys.stdout`` / ``sys.stdin`` + must be redirected (for example so logging or subprocess output does not + corrupt the MCP JSON-RPC stream on fd 1). + """ + async with stdio_server(stdin=stdin, stdout=stdout) as (read_stream, write_stream): await self._lowlevel_server.run( read_stream, write_stream, diff --git a/tests/server/mcpserver/test_run_stdio_custom_streams.py b/tests/server/mcpserver/test_run_stdio_custom_streams.py new file mode 100644 index 000000000..5c9b1c9ca --- /dev/null +++ b/tests/server/mcpserver/test_run_stdio_custom_streams.py @@ -0,0 +1,44 @@ +"""MCPServer.run_stdio_async forwards optional stdin/stdout to stdio_server.""" + +from __future__ import annotations + +import io +from contextlib import asynccontextmanager +from unittest.mock import AsyncMock + +import anyio +import pytest + +from mcp.server.mcpserver import MCPServer + + +@pytest.mark.anyio +async def test_run_stdio_async_passes_streams_to_stdio_server(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, object] = {} + + @asynccontextmanager + async def spy_stdio_server(stdin=None, stdout=None): + captured["stdin"] = stdin + captured["stdout"] = stdout + read_stream = AsyncMock() + write_stream = AsyncMock() + yield read_stream, write_stream + + async def noop_run(*_args, **_kwargs): + return None + + monkeypatch.setattr("mcp.server.mcpserver.server.stdio_server", spy_stdio_server) + + server = MCPServer("test-stdio-spy") + monkeypatch.setattr(server._lowlevel_server, "run", noop_run) + monkeypatch.setattr(server._lowlevel_server, "create_initialization_options", lambda: object()) + + sin = io.StringIO() + sout = io.StringIO() + await server.run_stdio_async( + stdin=anyio.AsyncFile(sin), + stdout=anyio.AsyncFile(sout), + ) + + assert captured["stdin"] is not None + assert captured["stdout"] is not None From 3d1bdead121ccff173f7b400e0529d5530901f16 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Wed, 25 Mar 2026 13:01:41 +0100 Subject: [PATCH 2/3] test: add pyright-safe annotations for stdio spy Made-with: Cursor --- tests/server/mcpserver/test_run_stdio_custom_streams.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/server/mcpserver/test_run_stdio_custom_streams.py b/tests/server/mcpserver/test_run_stdio_custom_streams.py index 5c9b1c9ca..9e6fd87b9 100644 --- a/tests/server/mcpserver/test_run_stdio_custom_streams.py +++ b/tests/server/mcpserver/test_run_stdio_custom_streams.py @@ -3,7 +3,9 @@ from __future__ import annotations import io +from collections.abc import AsyncIterator from contextlib import asynccontextmanager +from typing import Any from unittest.mock import AsyncMock import anyio @@ -17,14 +19,17 @@ async def test_run_stdio_async_passes_streams_to_stdio_server(monkeypatch: pytes captured: dict[str, object] = {} @asynccontextmanager - async def spy_stdio_server(stdin=None, stdout=None): + async def spy_stdio_server( + stdin: anyio.AsyncFile[str] | None = None, + stdout: anyio.AsyncFile[str] | None = None, + ) -> AsyncIterator[tuple[AsyncMock, AsyncMock]]: captured["stdin"] = stdin captured["stdout"] = stdout read_stream = AsyncMock() write_stream = AsyncMock() yield read_stream, write_stream - async def noop_run(*_args, **_kwargs): + async def noop_run(*_args: Any, **_kwargs: Any) -> None: return None monkeypatch.setattr("mcp.server.mcpserver.server.stdio_server", spy_stdio_server) From 0a82623e92a746f7962195896ea415cd2de2cde6 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Wed, 25 Mar 2026 13:06:54 +0100 Subject: [PATCH 3/3] refactor: pass stdio params through **kwargs like other transports The SSE and streamable-http overloads declare transport-specific params but the implementation catches them all via **kwargs. Follow the same pattern for stdin/stdout so the implementation signature stays uniform. Made-with: Cursor --- src/mcp/server/mcpserver/server.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index f7443646a..d3502f2dd 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -276,19 +276,12 @@ def run( def run( self, transport: Literal["stdio", "sse", "streamable-http"] = "stdio", - *, - stdin: anyio.AsyncFile[str] | None = None, - stdout: anyio.AsyncFile[str] | None = None, **kwargs: Any, ) -> None: """Run the MCP server. Note this is a synchronous function. Args: transport: Transport protocol to use ("stdio", "sse", or "streamable-http") - stdin: Optional async text stream for MCP input (stdio transport only). - When omitted, uses process stdin. See :func:`mcp.server.stdio.stdio_server`. - stdout: Optional async text stream for MCP output (stdio transport only). - When omitted, uses process stdout. **kwargs: Transport-specific options (see overloads for details) """ TRANSPORTS = Literal["stdio", "sse", "streamable-http"] @@ -297,7 +290,7 @@ def run( match transport: case "stdio": - anyio.run(lambda: self.run_stdio_async(stdin=stdin, stdout=stdout)) + anyio.run(lambda: self.run_stdio_async(**kwargs)) case "sse": # pragma: no cover anyio.run(lambda: self.run_sse_async(**kwargs)) case "streamable-http": # pragma: no cover