-
Notifications
You must be signed in to change notification settings - Fork 2.7k
feat: start and close ClientSession in a single task in McpSessionManager #4025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
challenger71498
wants to merge
23
commits into
google:main
Choose a base branch
from
challenger71498:feat/task-based-mcp-session-manager
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+816
−46
Open
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
9232cca
feat: add experimental task-based session manager
challenger71498 fbbbb2f
fix: check exception only if done
challenger71498 b59cff4
test: add test for experimental MCP session manager
challenger71498 a3a6ddb
test: add test for session lifecycle
challenger71498 b814686
chore: tidy up error msg
challenger71498 fd022fb
chore: rm redundant comments
challenger71498 addda67
chore: rename SessionLifecycle to SessionContext
challenger71498 ada136a
feat: use SessionContext
challenger71498 4b79453
test: rm transports
challenger71498 379c7c3
chore: rm experimental
challenger71498 af1f471
chore: match error msg
challenger71498 e7bed5a
test: assert that initialized has been called
challenger71498 6e40bdb
test: validate Session on test_session_context.py
challenger71498 eace2b7
style: run autoformat.sh
challenger71498 1e45086
test: add case for task-safety
challenger71498 ba9b9d8
chore: update desc
challenger71498 db03a2e
test: execute create_task each
challenger71498 8b09154
chore: add missing s
challenger71498 0081acb
feat: add timeout on session close
challenger71498 9f23396
feat: set MCP client timeout to SSE read timeout
challenger71498 b0f69d7
style: run autoformat.sh
challenger71498 ecdda62
chore: rm unused import
challenger71498 64672cd
refactor: rm local var
challenger71498 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| # Copyright 2025 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| from contextlib import AsyncExitStack | ||
| from datetime import timedelta | ||
| import logging | ||
| from typing import AsyncContextManager | ||
| from typing import Optional | ||
|
|
||
| from mcp import ClientSession | ||
|
|
||
| logger = logging.getLogger('google_adk.' + __name__) | ||
|
|
||
|
|
||
| class SessionContext: | ||
| """Represents the context of a single MCP session within a dedicated task. | ||
| AnyIO's TaskGroup/CancelScope requires that the start and end of a scope | ||
| occur within the same task. Since MCP clients use AnyIO internally, we need | ||
| to ensure that the client's entire lifecycle (creation, usage, and cleanup) | ||
| happens within a single dedicated task. | ||
| This class spawns a background task that: | ||
| 1. Enters the MCP client's async context and initializes the session | ||
| 2. Signals readiness via an asyncio.Event | ||
| 3. Waits for a close signal | ||
| 4. Cleans up the client within the same task | ||
| This ensures CancelScope constraints are satisfied regardless of which | ||
| task calls start() or close(). | ||
| Can be used in two ways: | ||
| 1. Direct method calls: start() and close() | ||
| 2. As an async context manager: async with lifecycle as session: ... | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| client: AsyncContextManager, | ||
| timeout: Optional[float], | ||
| sse_read_timeout: Optional[float], | ||
| is_stdio: bool = False, | ||
| ): | ||
| """ | ||
| Args: | ||
| client: An MCP client context manager (e.g., from streamablehttp_client, | ||
| sse_client, or stdio_client). | ||
| timeout: Timeout in seconds for connection and initialization. | ||
| sse_read_timeout: Timeout in seconds for reading data from the MCP SSE | ||
| server. | ||
| is_stdio: Whether this is a stdio connection (affects read timeout). | ||
| """ | ||
| self._client = client | ||
| self._timeout = timeout | ||
| self._sse_read_timeout = sse_read_timeout | ||
| self._is_stdio = is_stdio | ||
| self._session: Optional[ClientSession] = None | ||
| self._ready_event = asyncio.Event() | ||
| self._close_event = asyncio.Event() | ||
| self._task: Optional[asyncio.Task] = None | ||
|
|
||
| @property | ||
| def session(self) -> Optional[ClientSession]: | ||
| """Get the managed ClientSession, if available.""" | ||
| return self._session | ||
|
|
||
| async def start(self) -> ClientSession: | ||
| """Start the runner and wait for the session to be ready. | ||
| Returns: | ||
| The initialized ClientSession. | ||
| Raises: | ||
| ConnectionError: If session creation fails. | ||
| """ | ||
| self._task = asyncio.create_task(self._run()) | ||
| await self._ready_event.wait() | ||
|
|
||
| if self._task.cancelled(): | ||
| raise ConnectionError('Failed to create MCP session: task cancelled') | ||
|
|
||
| if self._task.done() and self._task.exception(): | ||
| raise ConnectionError( | ||
| f'Failed to create MCP session: {self._task.exception()}' | ||
| ) from self._task.exception() | ||
|
|
||
| return self._session | ||
|
|
||
| async def close(self): | ||
| """Signal the context task to close and wait for cleanup.""" | ||
| self._close_event.set() | ||
| if self._task: | ||
| try: | ||
| await asyncio.wait_for(self._task, timeout=self._timeout) | ||
| except asyncio.TimeoutError: | ||
| logger.warning('Failed to close MCP session: task timed out') | ||
| self._task.cancel() | ||
| except asyncio.CancelledError: | ||
| pass | ||
| except Exception as e: | ||
| logger.warning(f'Failed to close MCP session: {e}') | ||
|
|
||
| async def __aenter__(self) -> ClientSession: | ||
| return await self.start() | ||
|
|
||
| async def __aexit__(self, exc_type, exc_val, exc_tb): | ||
| await self.close() | ||
|
|
||
| async def _run(self): | ||
| """Run the complete session context within a single task.""" | ||
| try: | ||
| async with AsyncExitStack() as exit_stack: | ||
| transports = await asyncio.wait_for( | ||
| exit_stack.enter_async_context(self._client), | ||
| timeout=self._timeout, | ||
| ) | ||
| # The streamable http client returns a GetSessionCallback in addition | ||
| # to the read/write MemoryObjectStreams needed to build the | ||
| # ClientSession. We limit to the first two values to be compatible | ||
| # with all clients. | ||
| if self._is_stdio: | ||
| session = await exit_stack.enter_async_context( | ||
| ClientSession( | ||
| *transports[:2], | ||
| read_timeout_seconds=timedelta(seconds=self._timeout) | ||
| if self._timeout is not None | ||
| else None, | ||
| ) | ||
| ) | ||
| else: | ||
| # For SSE and Streamable HTTP clients, use the sse_read_timeout | ||
| # instead of the connection timeout as the read_timeout for the session. | ||
| session = await exit_stack.enter_async_context( | ||
| ClientSession( | ||
| *transports[:2], | ||
| read_timeout_seconds=timedelta(seconds=self._sse_read_timeout) | ||
| if self._sse_read_timeout is not None | ||
| else None, | ||
| ) | ||
| ) | ||
| await asyncio.wait_for(session.initialize(), timeout=self._timeout) | ||
| logger.debug('Session has been successfully initialized') | ||
|
|
||
| self._session = session | ||
| self._ready_event.set() | ||
|
|
||
| # Wait for close signal - the session remains valid while we wait | ||
| await self._close_event.wait() | ||
| except BaseException as e: | ||
| logger.warning(f'Error on session runner task: {e}') | ||
| raise | ||
| finally: | ||
| self._ready_event.set() | ||
| self._close_event.set() | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a potential race condition where
start()can returnNone, which violates its-> ClientSessiontype hint. This can happen ifclose()is called from another task shortly afterstart()is called, but before the background task has initialized the session.Here's the sequence:
start()is called, creates the_runtask, andawaits_ready_event.close()is called from another task, setting_close_event._runtask executes.await self._close_event.wait()returns immediately._runtask finishes without ever assigning a value toself._session.finallyblock in_runsets_ready_event.start()unblocks, the exception checks pass, and it returnsself._session, which isNone.This will likely lead to an
AttributeErrorin the calling code. You should add a check to ensureself._sessionis notNonebefore returning.