Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ internal sealed class StdioClientSessionTransport : StreamClientSessionTransport
private readonly StdioClientTransportOptions _options;
private readonly Process _process;
private readonly Queue<string> _stderrRollingLog;
private readonly DataReceivedEventHandler _errorHandler;
private int _cleanedUp = 0;
private readonly int? _processId;

public StdioClientSessionTransport(StdioClientTransportOptions options, Process process, string endpointName, Queue<string> stderrRollingLog, ILoggerFactory? loggerFactory) :
public StdioClientSessionTransport(StdioClientTransportOptions options, Process process, string endpointName, Queue<string> stderrRollingLog, DataReceivedEventHandler errorHandler, ILoggerFactory? loggerFactory) :
base(process.StandardInput.BaseStream, process.StandardOutput.BaseStream, encoding: null, endpointName, loggerFactory)
{
_options = options;
_process = process;
_stderrRollingLog = stderrRollingLog;
_errorHandler = errorHandler;
try { _processId = process.Id; } catch { }
}

Expand All @@ -45,16 +47,24 @@ public override async Task SendMessageAsync(JsonRpcMessage message, Cancellation
/// <inheritdoc/>
protected override async ValueTask CleanupAsync(Exception? error = null, CancellationToken cancellationToken = default)
{
// Only clean up once.
// Only run the full stdio cleanup once (handler detach, process kill, etc.).
// If another call is already handling cleanup, cancel the shutdown token
// to unblock it (e.g. if it's stuck in WaitForExitAsync) and let it
// call SetDisconnected with full StdioClientCompletionDetails.
if (Interlocked.Exchange(ref _cleanedUp, 1) != 0)
{
CancelShutdown();
return;
}

// We've not yet forcefully terminated the server. If it's already shut down, something went wrong,
// so create an exception with details about that.
error ??= await GetUnexpectedExitExceptionAsync(cancellationToken).ConfigureAwait(false);

// Detach the stderr handler so no further ErrorDataReceived events
// are dispatched during or after process disposal.
_process.ErrorDataReceived -= _errorHandler;

// Terminate the server process (or confirm it already exited), then build
// and publish strongly-typed completion details while the process handle
// is still valid so we can read the exit code.
Expand Down Expand Up @@ -89,13 +99,17 @@ protected override async ValueTask CleanupAsync(Exception? error = null, Cancell
try
{
// The process has exited, but we still need to ensure stderr has been flushed.
// WaitForExitAsync only waits for exit; it does not guarantee that all
// ErrorDataReceived events have been dispatched. The synchronous WaitForExit()
// (no arguments) does ensure that, so call it after WaitForExitAsync completes.
// Use a bounded wait: the process is already dead, we're just draining pipe
// buffers. If the caller's token is never canceled (e.g. _shutdownCts hasn't
// been canceled yet), an unbounded wait here can hang indefinitely when the
// threadpool is slow to deliver the stderr EOF callback.
#if NET
await _process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(_options.ShutdownTimeout);
await _process.WaitForExitAsync(timeoutCts.Token).ConfigureAwait(false);
#else
_process.WaitForExit((int)_options.ShutdownTimeout.TotalMilliseconds);
#endif
_process.WaitForExit();
}
catch { }

Expand Down
38 changes: 23 additions & 15 deletions src/ModelContextProtocol.Core/Client/StdioClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken =

Process? process = null;
bool processStarted = false;
DataReceivedEventHandler? errorHandler = null;

string command = _options.Command;
IList<string>? arguments = _options.Arguments;
Expand Down Expand Up @@ -136,7 +137,7 @@ public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken =
// few lines in a rolling log for use in exceptions.
const int MaxStderrLength = 10; // keep the last 10 lines of stderr
Queue<string> stderrRollingLog = new(MaxStderrLength);
process.ErrorDataReceived += (sender, args) =>
errorHandler = (sender, args) =>
{
string? data = args.Data;
if (data is not null)
Expand All @@ -151,11 +152,22 @@ public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken =
stderrRollingLog.Enqueue(data);
}

_options.StandardErrorLines?.Invoke(data);
try
{
_options.StandardErrorLines?.Invoke(data);
}
catch (Exception ex)
{
// Prevent exceptions in the user callback from propagating
// to the background thread that dispatches ErrorDataReceived,
// which would crash the process.
LogStderrCallbackFailed(logger, endpointName, ex);
}

LogReadStderr(logger, endpointName, data);
}
};
process.ErrorDataReceived += errorHandler;

// We need both stdin and stdout to use a no-BOM UTF-8 encoding. On .NET Core,
// we can use ProcessStartInfo.StandardOutputEncoding/StandardInputEncoding, but
Expand Down Expand Up @@ -193,14 +205,19 @@ public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken =

process.BeginErrorReadLine();

return new StdioClientSessionTransport(_options, process, endpointName, stderrRollingLog, _loggerFactory);
return new StdioClientSessionTransport(_options, process, endpointName, stderrRollingLog, errorHandler, _loggerFactory);
}
catch (Exception ex)
{
LogTransportConnectFailed(logger, endpointName, ex);

try
{
if (process is not null && errorHandler is not null)
{
process.ErrorDataReceived -= errorHandler;
}

DisposeProcess(process, processStarted, _options.ShutdownTimeout);
}
catch (Exception ex2)
Expand Down Expand Up @@ -228,18 +245,6 @@ internal static void DisposeProcess(
process.KillTree(shutdownTimeout);
}

// Ensure all redirected stderr/stdout events have been dispatched
// before disposing. Only the no-arg WaitForExit() guarantees this;
// WaitForExit(int) (as used by KillTree) does not.
// This should not hang: either the process already exited on its own
// (no child processes holding handles), or KillTree killed the entire
// process tree. If it does take too long, the test infrastructure's
// own timeout will catch it.
if (!processRunning && HasExited(process))
{
process.WaitForExit();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't we now possibly miss some error messages being reported over stderr from the child process?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WaitForExit() that was removed from DisposeProcess was only called when !processRunning && HasExited(process) — i.e. when the process had already exited on its own before we got here. The stderr flushing for that case is now handled earlier in StdioClientSessionTransport.GetUnexpectedExitExceptionAsync (line 99-106), which calls WaitForExitAsync(ct) on .NET or WaitForExit(timeout) on Framework. So stderr is still flushed before we build the error message and completion details.

In the processRunning case (where we call KillTree), the old code never called WaitForExit() either — the !processRunning guard prevented it. So there's no behavior change for that path.

The reason for removing it from DisposeProcess: DisposeProcess is a static helper called from both the session transport cleanup and the ConnectAsync error path. The session transport cleanup already handles stderr flushing, and the ConnectAsync error path doesn't need it. Keeping the no-arg WaitForExit() in DisposeProcess risked hangs when called from contexts where the streams aren't fully set up.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or WaitForExit(timeout) on Framework. So stderr is still flushed before we build the error message and completion details

WaitForExit(timeout) doesn't wait for EOF of the streams though, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — WaitForExit(timeout) does not wait for stream EOF.

I checked the .NET runtime source for WaitForExitAsync. It calls WaitUntilOutputEOF(cancellationToken) which does _error.EOF.WaitAsync(cancellationToken) — so it DOES wait for stream EOF, and the cancellation token IS respected for the stream-drain wait, not just the process-exit wait. So on .NET, WaitForExitAsync(ct) is strictly better than the old WaitForExit() — it flushes streams AND can be cancelled via CancelShutdown() if grandchild processes hold pipe handles open.

On .NET Framework there's no WaitForExitAsync, so we use WaitForExit(timeout) which doesn't guarantee stream flush. The risk of missing events is low in practice: by this point HasExited has returned true (line 93), the error handler is still attached and receiving events, and the ErrorDataReceived dispatching happens asynchronously — events that were already in the OS pipe buffer will still be delivered during the timeout window. We'd only miss events if there's a significant delay between the process exiting and the runtime dispatching buffered pipe data, which shouldn't happen when the process has cleanly exited. The alternative — no-arg WaitForExit() — has no cancellation mechanism on Framework and hangs indefinitely when grandchild processes hold pipe handles open.


// Invoke the callback while the process handle is still valid,
// e.g. to read ExitCode before Dispose() invalidates it.
beforeDispose?.Invoke();
Expand Down Expand Up @@ -299,6 +304,9 @@ private static string EscapeArgumentString(string argument) =>
[LoggerMessage(Level = LogLevel.Information, Message = "{EndpointName} received stderr log: '{Data}'.")]
private static partial void LogReadStderr(ILogger logger, string endpointName, string data);

[LoggerMessage(Level = LogLevel.Warning, Message = "{EndpointName} StandardErrorLines callback failed.")]
private static partial void LogStderrCallbackFailed(ILogger logger, string endpointName, Exception exception);

[LoggerMessage(Level = LogLevel.Information, Message = "{EndpointName} started server process with PID {ProcessId}.")]
private static partial void LogTransportProcessStarted(ILogger logger, string endpointName, int processId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ private async Task ProcessMessageAsync(string line, CancellationToken cancellati
}
}

/// <summary>
/// Cancels the shutdown token to signal that the transport is shutting down,
/// without performing any other cleanup.
/// </summary>
protected void CancelShutdown()
{
try
{
_shutdownCts?.Cancel();
}
catch (ObjectDisposedException)
{
}
}

protected virtual async ValueTask CleanupAsync(Exception? error = null, CancellationToken cancellationToken = default)
{
LogTransportShuttingDown(Name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,29 @@ public async Task RunConformanceTest(string scenario)

var process = new Process { StartInfo = startInfo };

process.OutputDataReceived += (sender, e) =>
// Protect callbacks with try/catch to prevent ITestOutputHelper from
// throwing on a background thread if events arrive after the test completes.
DataReceivedEventHandler outputHandler = (sender, e) =>
{
if (e.Data != null)
{
_output.WriteLine(e.Data);
try { _output.WriteLine(e.Data); } catch { }
outputBuilder.AppendLine(e.Data);
}
};

process.ErrorDataReceived += (sender, e) =>
DataReceivedEventHandler errorHandler = (sender, e) =>
{
if (e.Data != null)
{
_output.WriteLine(e.Data);
try { _output.WriteLine(e.Data); } catch { }
errorBuilder.AppendLine(e.Data);
}
};

process.OutputDataReceived += outputHandler;
process.ErrorDataReceived += errorHandler;

process.Start();
process.BeginOutputReadLine();
process.BeginErrorReadLine();
Expand All @@ -112,13 +117,18 @@ public async Task RunConformanceTest(string scenario)
catch (OperationCanceledException)
{
process.Kill(entireProcessTree: true);
process.OutputDataReceived -= outputHandler;
process.ErrorDataReceived -= errorHandler;
return (
Success: false,
Output: outputBuilder.ToString(),
Error: errorBuilder.ToString() + "\nProcess timed out after 5 minutes and was killed."
);
}

process.OutputDataReceived -= outputHandler;
process.ErrorDataReceived -= errorHandler;

var output = outputBuilder.ToString();
var error = errorBuilder.ToString();
var success = process.ExitCode == 0 || HasOnlyWarnings(output, error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,29 @@ public async Task RunPendingConformanceTest_ServerSsePolling()

var process = new Process { StartInfo = startInfo };

process.OutputDataReceived += (sender, e) =>
// Protect callbacks with try/catch to prevent ITestOutputHelper from
// throwing on a background thread if events arrive after the test completes.
DataReceivedEventHandler outputHandler = (sender, e) =>
{
if (e.Data != null)
{
output.WriteLine(e.Data);
try { output.WriteLine(e.Data); } catch { }
outputBuilder.AppendLine(e.Data);
}
};

process.ErrorDataReceived += (sender, e) =>
DataReceivedEventHandler errorHandler = (sender, e) =>
{
if (e.Data != null)
{
output.WriteLine(e.Data);
try { output.WriteLine(e.Data); } catch { }
errorBuilder.AppendLine(e.Data);
}
};

process.OutputDataReceived += outputHandler;
process.ErrorDataReceived += errorHandler;

process.Start();
process.BeginOutputReadLine();
process.BeginErrorReadLine();
Expand All @@ -166,13 +171,18 @@ public async Task RunPendingConformanceTest_ServerSsePolling()
catch (OperationCanceledException)
{
process.Kill(entireProcessTree: true);
process.OutputDataReceived -= outputHandler;
process.ErrorDataReceived -= errorHandler;
return (
Success: false,
Output: outputBuilder.ToString(),
Error: errorBuilder.ToString() + "\nProcess timed out after 5 minutes and was killed."
);
}

process.OutputDataReceived -= outputHandler;
process.ErrorDataReceived -= errorHandler;

return (
Success: process.ExitCode == 0,
Output: outputBuilder.ToString(),
Expand Down
Loading
Loading