feat(conversation_service): enhance session management with state sea…#56
feat(conversation_service): enhance session management with state sea…#56
Conversation
…rch index This commit introduces the creation of a new state search index for the session management system, allowing for independent and precise querying by session ID. The changes include: - Added `DEFAULT_STATE_SEARCH_INDEX` to the model for consistent index naming. - Updated `OTSBackend` and `SessionStore` to initialize both conversation and state search indices during table setup. - Implemented asynchronous methods for creating the state search index, ensuring it can be created without conflicts if it already exists. - Enhanced documentation for table and index creation methods to reflect the new functionality. These improvements aim to optimize session state retrieval and management within the conversation service.
There was a problem hiding this comment.
Pull request overview
This PR introduces infrastructure for enhanced session state management by adding a new state search index to the OTS (Object Table Service) backend. The changes create the foundation for independent and precise querying of session state by session_id, eliminating the constraint of requiring primary key prefix matching.
Changes:
- Added
DEFAULT_STATE_SEARCH_INDEXconstant for consistent state search index naming across the codebase - Updated
OTSBackendinitialization methods to automatically create both conversation and state search indices - Included a comprehensive example demonstrating Google ADK integration with OTS session persistence
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| agentrun/conversation_service/model.py | Added DEFAULT_STATE_SEARCH_INDEX constant for state search index naming |
| agentrun/conversation_service/ots_backend.py | Implemented async and sync methods to create state search index with fields for agent_id, user_id, session_id, created_at, and updated_at; integrated index creation into init_tables methods |
| agentrun/conversation_service/__ots_backend_async_template.py | Applied same state search index implementation to async-only template version |
| agentrun/conversation_service/session_store.py | Updated documentation to reflect that init methods now create both conversation and state search indices |
| agentrun/conversation_service/__session_store_async_template.py | Updated documentation in async-only template to match main implementation |
| examples/conversation_service_adk_server.py | Added comprehensive example showing Google ADK agent integration with OTS session management including state persistence |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def init_search_index(self) -> None: | ||
| """创建 Conversation 多元索引(同步)。按需调用。""" | ||
| """创建 Conversation 和 State 多元索引(同步)。 | ||
|
|
||
| 索引已存在时跳过,可重复调用。 | ||
| """ | ||
| self._create_conversation_search_index() | ||
| self._create_state_search_index() |
There was a problem hiding this comment.
The init_search_index method now creates two search indices (conversation and state) instead of one. The existing test at test_ots_backend.py line 226 uses assert_called_once() which will fail when two indices are created. The test should be updated to verify that create_search_index is called twice with the correct parameters.
| except Exception as e: | ||
| print(f"ADK Runner 执行异常: {e}") | ||
| raise Exception("Internal Error") |
There was a problem hiding this comment.
Generic exception is raised instead of a more descriptive custom exception. Consider creating a specific exception type or at minimum providing more context in the exception message, such as including the original exception details to aid in debugging.
| async def _create_state_search_index_async(self) -> None: | ||
| """创建 State 表的多元索引(异步)。 | ||
|
|
||
| 支持按 session_id 独立精确匹配查询,不受主键前缀限制。 | ||
| 索引已存在时跳过。 | ||
| """ | ||
| from tablestore import FieldType # type: ignore[import-untyped] | ||
| from tablestore import IndexSetting # type: ignore[import-untyped] | ||
| from tablestore import SortOrder # type: ignore[import-untyped] | ||
| from tablestore import FieldSchema | ||
| from tablestore import ( | ||
| FieldSort as OTSFieldSort, | ||
| ) # type: ignore[import-untyped] | ||
| from tablestore import SearchIndexMeta | ||
| from tablestore import Sort as OTSSort # type: ignore[import-untyped] | ||
|
|
||
| fields = [ | ||
| FieldSchema( | ||
| "agent_id", | ||
| FieldType.KEYWORD, | ||
| index=True, | ||
| enable_sort_and_agg=True, | ||
| ), | ||
| FieldSchema( | ||
| "user_id", | ||
| FieldType.KEYWORD, | ||
| index=True, | ||
| enable_sort_and_agg=True, | ||
| ), | ||
| FieldSchema( | ||
| "session_id", | ||
| FieldType.KEYWORD, | ||
| index=True, | ||
| enable_sort_and_agg=True, | ||
| ), | ||
| FieldSchema( | ||
| "created_at", | ||
| FieldType.LONG, | ||
| index=True, | ||
| enable_sort_and_agg=True, | ||
| ), | ||
| FieldSchema( | ||
| "updated_at", | ||
| FieldType.LONG, | ||
| index=True, | ||
| enable_sort_and_agg=True, | ||
| ), | ||
| ] | ||
|
|
||
| index_setting = IndexSetting(routing_fields=["agent_id"]) | ||
| index_sort = OTSSort( | ||
| sorters=[OTSFieldSort("updated_at", sort_order=SortOrder.DESC)] | ||
| ) | ||
| index_meta = SearchIndexMeta( | ||
| fields, | ||
| index_setting=index_setting, | ||
| index_sort=index_sort, | ||
| ) | ||
|
|
||
| try: | ||
| await self._async_client.create_search_index( | ||
| self._state_table, | ||
| self._state_search_index, | ||
| index_meta, | ||
| ) | ||
| logger.info( | ||
| "Created search index: %s on table: %s", | ||
| self._state_search_index, | ||
| self._state_table, | ||
| ) | ||
| except OTSServiceError as e: | ||
| if "already exist" in str(e).lower() or ( | ||
| hasattr(e, "code") and e.code == "OTSObjectAlreadyExist" | ||
| ): | ||
| logger.warning( | ||
| "Search index %s already exists, skipping.", | ||
| self._state_search_index, | ||
| ) | ||
| else: | ||
| raise | ||
|
|
There was a problem hiding this comment.
The _create_state_search_index_async and _create_state_search_index methods contain 81 lines of nearly identical code (imports, field definitions, index configuration). Consider extracting the common index metadata creation logic into a shared helper method to reduce duplication and improve maintainability. This pattern is also present in the conversation search index methods.
| self._conversation_search_index = ( | ||
| f"{table_prefix}{DEFAULT_CONVERSATION_SEARCH_INDEX}" | ||
| ) | ||
| self._state_search_index = f"{table_prefix}{DEFAULT_STATE_SEARCH_INDEX}" |
There was a problem hiding this comment.
The table_prefix test should be updated to include an assertion for _state_search_index to ensure the newly added state search index name is correctly prefixed, maintaining consistency with other table and index name tests.
Adapt existing tests to account for init_search_index now creating both conversation and state search indices (2 calls instead of 1). Add _state_search_index assertion to test_table_prefix. Made-with: Cursor
…rch index
This commit introduces the creation of a new state search index for the session management system, allowing for independent and precise querying by session ID. The changes include:
DEFAULT_STATE_SEARCH_INDEXto the model for consistent index naming.OTSBackendandSessionStoreto initialize both conversation and state search indices during table setup.These improvements aim to optimize session state retrieval and management within the conversation service.
Fix bugs
Bug detail
Pull request tasks
Update docs
Reason for update
Pull request tasks
Add contributor
Contributed content
Content detail
Others
Reason for update