Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 92 additions & 3 deletions agentrun/conversation_service/__ots_backend_async_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
DEFAULT_CONVERSATION_SECONDARY_INDEX,
DEFAULT_CONVERSATION_TABLE,
DEFAULT_EVENT_TABLE,
DEFAULT_STATE_SEARCH_INDEX,
DEFAULT_STATE_TABLE,
DEFAULT_USER_STATE_TABLE,
StateData,
Expand Down Expand Up @@ -97,15 +98,17 @@ def __init__(
self._conversation_search_index = (
f"{table_prefix}{DEFAULT_CONVERSATION_SEARCH_INDEX}"
)
self._state_search_index = f"{table_prefix}{DEFAULT_STATE_SEARCH_INDEX}"

# -----------------------------------------------------------------------
# 建表(异步)/ Table creation (async)
# -----------------------------------------------------------------------

async def init_tables_async(self) -> None:
"""创建五张表和 Conversation 二级索引(异步)。
"""创建五张表、二级索引和多元索引(异步)。

表已存在时跳过(catch OTSServiceError 并 log warning)。
包括 Conversation 二级索引、Conversation 多元索引和 State 多元索引。
表或索引已存在时跳过(catch OTSServiceError 并 log warning)。
"""
await self._create_conversation_table_async()
await self._create_event_table_async()
Expand All @@ -125,6 +128,7 @@ async def init_tables_async(self) -> None:
self._user_state_table,
[("agent_id", "STRING"), ("user_id", "STRING")],
)
await self.init_search_index_async()

async def init_core_tables_async(self) -> None:
"""创建核心表(Conversation + Event)和二级索引(异步)。"""
Expand All @@ -151,8 +155,12 @@ async def init_state_tables_async(self) -> None:
)

async def init_search_index_async(self) -> None:
"""创建 Conversation 多元索引(异步)。按需调用。"""
"""创建 Conversation 和 State 多元索引(异步)。

索引已存在时跳过,可重复调用。
"""
await self._create_conversation_search_index_async()
await self._create_state_search_index_async()

async def _create_conversation_table_async(self) -> None:
"""创建 Conversation 表 + 二级索引(异步)。"""
Expand Down Expand Up @@ -383,6 +391,87 @@ async def _create_conversation_search_index_async(self) -> None:
else:
raise

async def _create_state_search_index_async(self) -> None:
"""创建 State 表的多元索引(异步)。

支持按 session_id 独立精确匹配查询,不受主键前缀限制。
索引已存在时跳过。
"""
from tablestore import FieldType # type: ignore[import-untyped]
from tablestore import IndexSetting # type: ignore[import-untyped]
from tablestore import SortOrder # type: ignore[import-untyped]
from tablestore import FieldSchema
from tablestore import (
FieldSort as OTSFieldSort,
) # type: ignore[import-untyped]
from tablestore import SearchIndexMeta
from tablestore import Sort as OTSSort # type: ignore[import-untyped]

fields = [
FieldSchema(
"agent_id",
FieldType.KEYWORD,
index=True,
enable_sort_and_agg=True,
),
FieldSchema(
"user_id",
FieldType.KEYWORD,
index=True,
enable_sort_and_agg=True,
),
FieldSchema(
"session_id",
FieldType.KEYWORD,
index=True,
enable_sort_and_agg=True,
),
FieldSchema(
"created_at",
FieldType.LONG,
index=True,
enable_sort_and_agg=True,
),
FieldSchema(
"updated_at",
FieldType.LONG,
index=True,
enable_sort_and_agg=True,
),
]

index_setting = IndexSetting(routing_fields=["agent_id"])
index_sort = OTSSort(
sorters=[OTSFieldSort("updated_at", sort_order=SortOrder.DESC)]
)
index_meta = SearchIndexMeta(
fields,
index_setting=index_setting,
index_sort=index_sort,
)

try:
await self._async_client.create_search_index(
self._state_table,
self._state_search_index,
index_meta,
)
logger.info(
"Created search index: %s on table: %s",
self._state_search_index,
self._state_table,
)
except OTSServiceError as e:
if "already exist" in str(e).lower() or (
hasattr(e, "code") and e.code == "OTSObjectAlreadyExist"
):
logger.warning(
"Search index %s already exists, skipping.",
self._state_search_index,
)
else:
raise

# -----------------------------------------------------------------------
# Session CRUD(异步)/ Session CRUD (async)
# -----------------------------------------------------------------------
Expand Down
10 changes: 8 additions & 2 deletions agentrun/conversation_service/__session_store_async_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def __init__(self, ots_backend: OTSBackend) -> None:
self._backend = ots_backend

async def init_tables_async(self) -> None:
"""创建所有 OTS 表和索引(异步)。代理到 OTSBackend.init_tables_async()。"""
"""创建所有 OTS 表、二级索引和多元索引(异步)。

包括建表和创建搜索索引,无需再单独调用 init_search_index_async()。
"""
await self._backend.init_tables_async()

async def init_core_tables_async(self) -> None:
Expand All @@ -47,7 +50,10 @@ async def init_state_tables_async(self) -> None:
await self._backend.init_state_tables_async()

async def init_search_index_async(self) -> None:
"""创建 Conversation 多元索引(异步)。按需调用。"""
"""创建 Conversation 和 State 多元索引(异步)。

索引已存在时跳过,可重复调用。
"""
await self._backend.init_search_index_async()

# -------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions agentrun/conversation_service/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
DEFAULT_USER_STATE_TABLE = "user_state"
DEFAULT_CONVERSATION_SECONDARY_INDEX = "conversation_secondary_index"
DEFAULT_CONVERSATION_SEARCH_INDEX = "conversation_search_index"
DEFAULT_STATE_SEARCH_INDEX = "state_search_index"


# ---------------------------------------------------------------------------
Expand Down
Loading
Loading