|
5 | 5 | """ |
6 | 6 |
|
7 | 7 | import logging |
| 8 | +from typing import Protocol, cast |
8 | 9 |
|
9 | 10 | import anyio |
10 | 11 | import httpx |
|
19 | 20 | from mcp.types import InitializeResult |
20 | 21 |
|
21 | 22 |
|
| 23 | +class _ExceptionGroupWithExceptions(Protocol): |
| 24 | + exceptions: tuple[BaseException, ...] |
| 25 | + |
| 26 | + |
22 | 27 | async def mock_github_endpoint(request: Request) -> Response: |
23 | 28 | """Mock endpoint that returns 405 for GET (like GitHub MCP).""" |
24 | 29 | if request.method == "GET": |
@@ -73,7 +78,7 @@ async def test_405_get_stream_does_not_hang(caplog: pytest.LogCaptureFixture): |
73 | 78 | transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0 |
74 | 79 | ) as http_client: |
75 | 80 | transport_cm = streamable_http_client("http://testserver/mcp", http_client=http_client) |
76 | | - async with transport_cm as transport_streams: # pragma: no cover |
| 81 | + async with transport_cm as transport_streams: |
77 | 82 | read_stream, write_stream = transport_streams |
78 | 83 | async with ClientSession(read_stream, write_stream) as session: |
79 | 84 | # Initialize sends the initialized notification internally |
@@ -108,6 +113,34 @@ async def test_405_get_stream_does_not_hang(caplog: pytest.LogCaptureFixture): |
108 | 113 | ) |
109 | 114 |
|
110 | 115 |
|
| 116 | +@pytest.mark.anyio |
| 117 | +async def test_streamable_http_client_context_manager_exception_exit_is_covered() -> None: |
| 118 | + """Cover the exceptional exit path of the streamable HTTP transport context manager. |
| 119 | +
|
| 120 | + Branch coverage can vary across Python versions for `async with` teardown paths. |
| 121 | + This test ensures the exception-unwind path is exercised without relying on pragmas. |
| 122 | + """ |
| 123 | + app = Starlette(routes=[Route("/mcp", mock_github_endpoint, methods=["GET", "POST"])]) |
| 124 | + |
| 125 | + async with httpx.AsyncClient( |
| 126 | + transport=httpx.ASGITransport(app=app), |
| 127 | + base_url="http://testserver", |
| 128 | + timeout=5.0, |
| 129 | + ) as http_client: |
| 130 | + transport_cm = streamable_http_client("http://testserver/mcp", http_client=http_client) |
| 131 | + with pytest.raises(BaseException) as excinfo: |
| 132 | + async with transport_cm: |
| 133 | + raise RuntimeError("boom") |
| 134 | + |
| 135 | + exc = excinfo.value |
| 136 | + if hasattr(exc, "exceptions"): |
| 137 | + excs = cast(_ExceptionGroupWithExceptions, exc).exceptions |
| 138 | + else: |
| 139 | + excs = (exc,) |
| 140 | + |
| 141 | + assert any(isinstance(inner, RuntimeError) and str(inner) == "boom" for inner in excs) |
| 142 | + |
| 143 | + |
111 | 144 | @pytest.mark.anyio |
112 | 145 | async def test_mock_github_endpoint_other_method_returns_405() -> None: |
113 | 146 | """Ensure fallback 405 branch is covered for non-GET/POST methods.""" |
|
0 commit comments