From 1f900cb70c482cc0b7fb2446fb81a1eff70610f0 Mon Sep 17 00:00:00 2001 From: Sodawyx Date: Mon, 2 Mar 2026 13:39:30 +0800 Subject: [PATCH 1/2] fix: update sandbox and template models to use Pydantic Field for TTL aliases This commit modifies the `sandbox_ttlin_seconds` and `sandbox_idle_ttlin_seconds` fields in the `Sandbox` and `Template` models to utilize Pydantic's `Field` for aliasing to `sandboxTTLInSeconds` and `sandboxIdleTTLInSeconds`, respectively. Additionally, tests have been added to ensure correct parsing and serialization of these fields with their aliases. Co-developed-by: Aone Copilot Signed-off-by: Sodawyx --- agentrun/integration/utils/tool.py | 4 +- agentrun/sandbox/__sandbox_async_template.py | 6 +- agentrun/sandbox/__template_async_template.py | 6 +- agentrun/sandbox/model.py | 6 +- agentrun/sandbox/sandbox.py | 6 +- agentrun/sandbox/template.py | 6 +- .../test_langchain_agui_integration.py | 8 +- tests/unittests/toolset/api/test_openapi.py | 20 ++--- tests/unittests/utils/test_model.py | 78 +++++++++++++++++++ 9 files changed, 111 insertions(+), 29 deletions(-) diff --git a/agentrun/integration/utils/tool.py b/agentrun/integration/utils/tool.py index 5b0e874..bde72a5 100644 --- a/agentrun/integration/utils/tool.py +++ b/agentrun/integration/utils/tool.py @@ -1562,8 +1562,8 @@ def _build_openapi_schema( if isinstance(schema, dict): properties[name] = { **schema, - "description": ( - param.get("description") or schema.get("description", "") + "description": param.get("description") or schema.get( + "description", "" ), } if param.get("required"): diff --git a/agentrun/sandbox/__sandbox_async_template.py b/agentrun/sandbox/__sandbox_async_template.py index 734214f..3f555c0 100644 --- a/agentrun/sandbox/__sandbox_async_template.py +++ b/agentrun/sandbox/__sandbox_async_template.py @@ -15,6 +15,8 @@ Union, ) +from pydantic import Field + from agentrun.sandbox.model import TemplateType from agentrun.utils.config import Config from agentrun.utils.model import BaseModel @@ -56,7 +58,9 @@ class Sandbox(BaseModel): """沙箱全局唯一资源名称 / Sandbox ARN""" sandbox_id: Optional[str] = None """沙箱 ID / Sandbox ID""" - sandbox_idle_ttlin_seconds: Optional[int] = None + sandbox_idle_ttlin_seconds: Optional[int] = Field( + None, alias="sandboxIdleTTLInSeconds" + ) """沙箱空闲 TTL(秒) / Sandbox Idle TTL (seconds)""" sandbox_idle_timeout_seconds: Optional[int] = None """沙箱空闲超时时间(秒) / Sandbox Idle Timeout (seconds)""" diff --git a/agentrun/sandbox/__template_async_template.py b/agentrun/sandbox/__template_async_template.py index bcebe07..2042515 100644 --- a/agentrun/sandbox/__template_async_template.py +++ b/agentrun/sandbox/__template_async_template.py @@ -6,6 +6,8 @@ from typing import Dict, List, Optional +from pydantic import Field + from agentrun.sandbox.model import ( PageableInput, TemplateContainerConfiguration, @@ -52,7 +54,9 @@ class Template(BaseModel): """执行角色 ARN / Execution Role ARN""" sandbox_idle_timeout_in_seconds: Optional[int] = None """沙箱空闲超时时间(秒) / Sandbox Idle Timeout (seconds)""" - sandbox_ttlin_seconds: Optional[int] = None + sandbox_ttlin_seconds: Optional[int] = Field( + None, alias="sandboxTTLInSeconds" + ) """沙箱存活时间(秒) / Sandbox TTL (seconds)""" share_concurrency_limit_per_sandbox: Optional[int] = None """每个沙箱的最大并发会话数 / Max Concurrency Limit Per Sandbox""" diff --git a/agentrun/sandbox/model.py b/agentrun/sandbox/model.py index f79e1b2..b392616 100644 --- a/agentrun/sandbox/model.py +++ b/agentrun/sandbox/model.py @@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional, TYPE_CHECKING import uuid -from pydantic import model_validator +from pydantic import Field, model_validator from agentrun.utils.model import BaseModel @@ -264,7 +264,9 @@ class TemplateInput(BaseModel): """执行角色 ARN / Execution Role ARN""" sandbox_idle_timeout_in_seconds: Optional[int] = 1800 """沙箱空闲超时时间(秒) / Sandbox Idle Timeout (seconds)""" - sandbox_ttlin_seconds: Optional[int] = 21600 + sandbox_ttlin_seconds: Optional[int] = Field( + 21600, alias="sandboxTTLInSeconds" + ) """沙箱存活时间(秒) / Sandbox TTL (seconds)""" share_concurrency_limit_per_sandbox: Optional[int] = 200 """每个沙箱的最大并发会话数 / Max Concurrency Limit Per Sandbox""" diff --git a/agentrun/sandbox/sandbox.py b/agentrun/sandbox/sandbox.py index 3bf229a..0b6dafd 100644 --- a/agentrun/sandbox/sandbox.py +++ b/agentrun/sandbox/sandbox.py @@ -25,6 +25,8 @@ Union, ) +from pydantic import Field + from agentrun.sandbox.model import TemplateType from agentrun.utils.config import Config from agentrun.utils.model import BaseModel @@ -66,7 +68,9 @@ class Sandbox(BaseModel): """沙箱全局唯一资源名称 / Sandbox ARN""" sandbox_id: Optional[str] = None """沙箱 ID / Sandbox ID""" - sandbox_idle_ttlin_seconds: Optional[int] = None + sandbox_idle_ttlin_seconds: Optional[int] = Field( + None, alias="sandboxIdleTTLInSeconds" + ) """沙箱空闲 TTL(秒) / Sandbox Idle TTL (seconds)""" sandbox_idle_timeout_seconds: Optional[int] = None """沙箱空闲超时时间(秒) / Sandbox Idle Timeout (seconds)""" diff --git a/agentrun/sandbox/template.py b/agentrun/sandbox/template.py index f5bbb4d..3203c14 100644 --- a/agentrun/sandbox/template.py +++ b/agentrun/sandbox/template.py @@ -16,6 +16,8 @@ from typing import Dict, List, Optional +from pydantic import Field + from agentrun.sandbox.model import ( PageableInput, TemplateContainerConfiguration, @@ -62,7 +64,9 @@ class Template(BaseModel): """执行角色 ARN / Execution Role ARN""" sandbox_idle_timeout_in_seconds: Optional[int] = None """沙箱空闲超时时间(秒) / Sandbox Idle Timeout (seconds)""" - sandbox_ttlin_seconds: Optional[int] = None + sandbox_ttlin_seconds: Optional[int] = Field( + None, alias="sandboxTTLInSeconds" + ) """沙箱存活时间(秒) / Sandbox TTL (seconds)""" share_concurrency_limit_per_sandbox: Optional[int] = None """每个沙箱的最大并发会话数 / Max Concurrency Limit Per Sandbox""" diff --git a/tests/unittests/integration/test_langchain_agui_integration.py b/tests/unittests/integration/test_langchain_agui_integration.py index b5dba63..6cfb32b 100644 --- a/tests/unittests/integration/test_langchain_agui_integration.py +++ b/tests/unittests/integration/test_langchain_agui_integration.py @@ -664,9 +664,7 @@ async def invoke_agent(request: AgentRequest): json={ "messages": [{ "role": "user", - "content": ( - "查询当前的时间,并获取天气信息,同时输出我的密钥信息" - ), + "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", }], "stream": True, }, @@ -729,9 +727,7 @@ async def invoke_agent(request: AgentRequest): json={ "messages": [{ "role": "user", - "content": ( - "查询当前的时间,并获取天气信息,同时输出我的密钥信息" - ), + "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", }], "stream": True, }, diff --git a/tests/unittests/toolset/api/test_openapi.py b/tests/unittests/toolset/api/test_openapi.py index ab9acc6..3a60866 100644 --- a/tests/unittests/toolset/api/test_openapi.py +++ b/tests/unittests/toolset/api/test_openapi.py @@ -545,9 +545,7 @@ def test_post_with_ref_schema(self): "content": { "application/json": { "schema": { - "$ref": ( - "#/components/schemas/CreateOrderRequest" - ) + "$ref": "#/components/schemas/CreateOrderRequest" } } }, @@ -758,9 +756,7 @@ def test_invalid_ref_gracefully_handled(self): "content": { "application/json": { "schema": { - "$ref": ( - "#/components/schemas/NonExistent" - ) + "$ref": "#/components/schemas/NonExistent" } } } @@ -793,9 +789,7 @@ def test_external_ref_not_resolved(self): "content": { "application/json": { "schema": { - "$ref": ( - "https://example.com/schemas/external.json" - ) + "$ref": "https://example.com/schemas/external.json" } } } @@ -915,9 +909,7 @@ def _get_coffee_shop_schema(): "content": { "application/json": { "schema": { - "$ref": ( - "#/components/schemas/CreateOrderRequest" - ) + "$ref": "#/components/schemas/CreateOrderRequest" } } }, @@ -953,9 +945,7 @@ def _get_coffee_shop_schema(): "content": { "application/json": { "schema": { - "$ref": ( - "#/components/schemas/UpdateOrderStatusRequest" - ) + "$ref": "#/components/schemas/UpdateOrderStatusRequest" } } }, diff --git a/tests/unittests/utils/test_model.py b/tests/unittests/utils/test_model.py index b74399c..ab02dcc 100644 --- a/tests/unittests/utils/test_model.py +++ b/tests/unittests/utils/test_model.py @@ -204,3 +204,81 @@ def test_is_final_instance_method(self): """测试实例方法 is_final""" assert Status.READY.is_final() is True assert Status.CREATING.is_final() is False + + +class TestTTLAliasFixIssue53: + """测试 TTL 字段的显式 alias 修复 (Issue #53) + + 验证含有连续大写缩写词 (TTL) 的字段能正确从 API 返回的 camelCase key 解析。 + """ + + def test_template_sandbox_ttlin_seconds_from_api_data(self): + """Template.sandbox_ttlin_seconds 应能通过 sandboxTTLInSeconds 正确解析""" + from agentrun.sandbox.template import Template + + api_data = { + "templateName": "code-interpreter-01", + "sandboxIdleTimeoutInSeconds": 900, + "sandboxTTLInSeconds": 3600, + } + t = Template.model_validate(api_data, by_alias=True) + assert t.sandbox_idle_timeout_in_seconds == 900 + assert t.sandbox_ttlin_seconds == 3600 + assert t.model_extra.get("sandboxTTLInSeconds") is None + + def test_template_sandbox_ttlin_seconds_by_field_name(self): + """Template.sandbox_ttlin_seconds 也应支持通过字段名直接赋值""" + from agentrun.sandbox.template import Template + + t = Template(sandbox_ttlin_seconds=1800) + assert t.sandbox_ttlin_seconds == 1800 + + def test_template_sandbox_ttlin_seconds_serialization(self): + """Template 序列化时应使用正确的 alias sandboxTTLInSeconds""" + from agentrun.sandbox.template import Template + + t = Template(sandbox_ttlin_seconds=7200) + data = t.model_dump(by_alias=True) + assert data["sandboxTTLInSeconds"] == 7200 + + def test_template_input_sandbox_ttlin_seconds_serialization(self): + """TemplateInput.sandbox_ttlin_seconds 序列化应使用 sandboxTTLInSeconds""" + from agentrun.sandbox.model import TemplateInput, TemplateType + + inp = TemplateInput( + template_type=TemplateType.CODE_INTERPRETER, + sandbox_ttlin_seconds=600, + ) + data = inp.model_dump(by_alias=True) + assert data["sandboxTTLInSeconds"] == 600 + + def test_sandbox_idle_ttlin_seconds_from_api_data(self): + """Sandbox.sandbox_idle_ttlin_seconds 应能通过 sandboxIdleTTLInSeconds 正确解析""" + from agentrun.sandbox.sandbox import Sandbox + + api_data = { + "sandboxId": "sb-123", + "sandboxIdleTTLInSeconds": 300, + "sandboxIdleTimeoutSeconds": 600, + } + s = Sandbox.model_validate(api_data, by_alias=True) + assert s.sandbox_idle_ttlin_seconds == 300 + assert s.sandbox_idle_timeout_seconds == 600 + assert s.model_extra.get("sandboxIdleTTLInSeconds") is None + + def test_template_from_inner_object_with_ttl(self): + """Template.from_inner_object 应正确解析 sandboxTTLInSeconds""" + from agentrun.sandbox.template import Template + + class MockDaraModel: + + def to_map(self): + return { + "templateName": "test-template", + "sandboxIdleTimeoutInSeconds": 900, + "sandboxTTLInSeconds": 3600, + } + + t = Template.from_inner_object(MockDaraModel()) + assert t.sandbox_ttlin_seconds == 3600 + assert t.sandbox_idle_timeout_in_seconds == 900 From 61f18a4e2f28ab4cba6ea444c8c08ca2f5d65372 Mon Sep 17 00:00:00 2001 From: OhYee Date: Tue, 3 Mar 2026 11:52:45 +0800 Subject: [PATCH 2/2] test: migrate from astream_events to astream for reliable SSE streaming in CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate test implementations to use astream(stream_mode="updates") instead of astream_events to resolve async generator cancellation issues in CI environments. Also switch from httpx.ASGITransport to real HTTP servers with uvicorn for proper SSE stream handling in tests, addressing flaky streaming behavior in Linux CI environments. 将测试实现从 astream_events 迁移到 astream(stream_mode="updates") 以解决 CI 环境中的异步生成器取消问题。同时从 httpx.ASGITransport 切换到使用 uvicorn 的真实 HTTP 服务器,以正确处理测试中的 SSE 流式传输,解决 Linux CI 环境中的不稳定流式行为。 Change-Id: I4e9f694a80e952a94e240f479bb40fef59c0d649 Signed-off-by: OhYee --- .../langchain/test_agent_invoke_methods.py | 139 ++++++++++++++---- .../test_langchain_agui_integration.py | 72 ++++++--- 2 files changed, 157 insertions(+), 54 deletions(-) diff --git a/tests/unittests/integration/langchain/test_agent_invoke_methods.py b/tests/unittests/integration/langchain/test_agent_invoke_methods.py index d24404a..4f5237c 100644 --- a/tests/unittests/integration/langchain/test_agent_invoke_methods.py +++ b/tests/unittests/integration/langchain/test_agent_invoke_methods.py @@ -81,6 +81,37 @@ def _sse(data: Dict[str, Any]) -> str: return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" +def _start_server(app: FastAPI) -> tuple: + """启动 FastAPI 服务器并返回 (base_url, server, thread) + + 使用真实的 HTTP 服务器而不是 httpx.ASGITransport, + 因为 ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应, + 会导致流式响应被提前取消 (CancelledError)。 + """ + port = _find_free_port() + config = uvicorn.Config( + app, host="127.0.0.1", port=port, log_level="warning" + ) + server = uvicorn.Server(config) + + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + base_url = f"http://127.0.0.1:{port}" + for i in range(50): + try: + httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2) + break + except Exception: + if i == 49: + raise RuntimeError( + f"Server failed to start within {50 * 0.1}s: {base_url}" + ) + time.sleep(0.1) + + return base_url, server, thread + + def _build_mock_openai_app() -> FastAPI: """构建本地 OpenAI 协议兼容的简单服务""" app = FastAPI() @@ -297,20 +328,34 @@ def parse_sse_events(content: str) -> List[Dict[str, Any]]: async def request_agui_events( - server_app, + server_url_or_app: Union[str, FastAPI], messages: List[Dict[str, str]], stream: bool = True, ) -> List[Dict[str, Any]]: - """发送 AG-UI 请求并返回事件列表""" - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=server_app), - base_url="http://test", - ) as client: - response = await client.post( - "/ag-ui/agent", - json={"messages": messages, "stream": stream}, - timeout=60.0, - ) + """发送 AG-UI 请求并返回事件列表 + + Args: + server_url_or_app: 服务器 URL 或 FastAPI app 对象 + messages: 消息列表 + stream: 是否流式响应 + """ + if isinstance(server_url_or_app, str): + async with httpx.AsyncClient(base_url=server_url_or_app) as client: + response = await client.post( + "/ag-ui/agent", + json={"messages": messages, "stream": stream}, + timeout=60.0, + ) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=server_url_or_app), + base_url="http://test", + ) as client: + response = await client.post( + "/ag-ui/agent", + json={"messages": messages, "stream": stream}, + timeout=60.0, + ) assert response.status_code == 200 return parse_sse_events(response.text) @@ -670,7 +715,7 @@ def assert_openai_tool_call_response( async def request_openai_events( - server_app, + server_url_or_app: Union[str, FastAPI], messages: List[Dict[str, str]], stream: bool = True, ) -> Union[List[Dict[str, Any]], Dict[str, Any]]: @@ -681,15 +726,23 @@ async def request_openai_events( "stream": stream, } - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=server_app), - base_url="http://test", - ) as client: - response = await client.post( - "/openai/v1/chat/completions", - json=payload, - timeout=60.0, - ) + if isinstance(server_url_or_app, str): + async with httpx.AsyncClient(base_url=server_url_or_app) as client: + response = await client.post( + "/openai/v1/chat/completions", + json=payload, + timeout=60.0, + ) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=server_url_or_app), + base_url="http://test", + ) as client: + response = await client.post( + "/openai/v1/chat/completions", + json=payload, + timeout=60.0, + ) assert response.status_code == 200 @@ -749,7 +802,16 @@ def agent_model(mock_openai_server: str): @pytest.fixture def server_app_astream_events(agent_model): - """创建使用 astream_events 的服务器(AG-UI/OpenAI 通用)""" + """创建使用 astream 的服务器(AG-UI/OpenAI 通用) + + 返回服务器 URL 而不是 app 对象,因为流式测试需要真实的 HTTP 连接。 + httpx.ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应。 + + 注意: 这里使用 astream(stream_mode="updates") 而非 astream_events, + 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中会出现 + async generator 被提前取消或事件丢失的问题。 + astream_events 的转换逻辑由 test_convert_python_3_10/3_12 单独覆盖。 + """ agent = build_agent(agent_model) async def invoke_agent(request: AgentRequest): @@ -770,8 +832,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -779,7 +841,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) # ============================================================================= @@ -1753,8 +1820,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -1762,7 +1829,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) @pytest.fixture def server_app_async(self, agent_model): @@ -1787,8 +1859,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -1796,7 +1868,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) @pytest.mark.parametrize( "case_key,prompt", diff --git a/tests/unittests/integration/test_langchain_agui_integration.py b/tests/unittests/integration/test_langchain_agui_integration.py index 6cfb32b..0824a56 100644 --- a/tests/unittests/integration/test_langchain_agui_integration.py +++ b/tests/unittests/integration/test_langchain_agui_integration.py @@ -648,36 +648,62 @@ async def invoke_agent(request: AgentRequest): }] } + # 使用 astream(updates) 代替 astream_events, + # 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中 + # 会出现 async generator 被提前取消或事件丢失的问题。 converter = AgentRunConverter() - async for event in agent.astream_events(input_data, version="v2"): + async for event in agent.astream(input_data, stream_mode="updates"): for item in converter.convert(event): yield item - app = AgentRunServer(invoke_agent=invoke_agent).app + server_app = AgentRunServer(invoke_agent=invoke_agent).app - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=app), - base_url="http://test", - ) as client: - response = await client.post( - "/ag-ui/agent", - json={ - "messages": [{ - "role": "user", - "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", - }], - "stream": True, - }, - timeout=60.0, - ) + # 使用真实的 HTTP 服务器而不是 httpx.ASGITransport, + # 因为 ASGITransport 在 CI 中无法正确处理 SSE 流式响应。 + port = _find_free_port() + config = uvicorn.Config( + server_app, host="127.0.0.1", port=port, log_level="warning" + ) + uvicorn_server = uvicorn.Server(config) + server_thread = threading.Thread(target=uvicorn_server.run, daemon=True) + server_thread.start() + + base_url = f"http://127.0.0.1:{port}" + for i in range(50): + try: + httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2) + break + except Exception: + if i == 49: + raise RuntimeError( + f"Server failed to start within {50 * 0.1}s" + ) + time.sleep(0.1) - assert response.status_code == 200 + try: + async with httpx.AsyncClient(base_url=base_url) as client: + response = await client.post( + "/ag-ui/agent", + json={ + "messages": [{ + "role": "user", + "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", + }], + "stream": True, + }, + timeout=60.0, + ) - events = [line for line in response.text.split("\n") if line] - # Normalize empty delta for consistency with check_result expectations - # astream_events yields "" for empty args, while astream yields "{}" - events = [e.replace('"delta":""', '"delta":"{}"') for e in events] - self.check_result(events) + assert response.status_code == 200 + + events = [line for line in response.text.split("\n") if line] + # Normalize empty delta for consistency with check_result expectations + # astream_events yields "" for empty args, while astream yields "{}" + events = [e.replace('"delta":""', '"delta":"{}"') for e in events] + self.check_result(events) + finally: + uvicorn_server.should_exit = True + server_thread.join(timeout=5) async def test_astream(self, mock_mcp_server): """测试多工具查询场景 (MCP + Local + MockLLM)"""