From 848c9e017fe0e348ff88a4c0f6d76e336b024ea6 Mon Sep 17 00:00:00 2001 From: geobelsky Date: Sun, 15 Mar 2026 22:09:49 +0000 Subject: [PATCH 1/3] fix(sdk): increase SSE stream timeout to prevent ReadTimeout SSE streaming calls now use wait_seconds + 15s read timeout instead of the global 15s client timeout. Fixes agent.listen() crashing when the server holds the connection open for 30s keepalive cycles. Co-Authored-By: Claude Opus 4.6 (1M context) --- axme_sdk/client.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/axme_sdk/client.py b/axme_sdk/client.py index b155c8c..7557f92 100644 --- a/axme_sdk/client.py +++ b/axme_sdk/client.py @@ -1670,11 +1670,18 @@ def _iter_intent_events_stream( if normalized_trace_id is not None: headers = {"X-Trace-Id": normalized_trace_id} + stream_timeout = httpx.Timeout( + connect=10.0, + read=float(wait_seconds) + 15.0, + write=10.0, + pool=10.0, + ) with self._http.stream( "GET", f"/v1/intents/{intent_id}/events/stream", params={"since": str(since), "wait_seconds": str(wait_seconds)}, headers=headers, + timeout=stream_timeout, ) as response: if response.status_code >= 400: self._raise_http_error(response) @@ -1717,11 +1724,20 @@ def _iter_agent_intents_stream( if normalized_trace_id is not None: headers = {"X-Trace-Id": normalized_trace_id} + # SSE streams need a longer read timeout than regular API calls + # (server holds connection open for wait_seconds + processing time) + stream_timeout = httpx.Timeout( + connect=10.0, + read=float(wait_seconds) + 15.0, + write=10.0, + pool=10.0, + ) with self._http.stream( "GET", f"/v1/agents/{path_part}/intents/stream", params={"since": str(since), "wait_seconds": str(wait_seconds)}, headers=headers, + timeout=stream_timeout, ) as response: if response.status_code >= 400: self._raise_http_error(response) From 60e55778587112949317eb8b25c41fc02f94f8bb Mon Sep 17 00:00:00 2001 From: geobelsky Date: Tue, 17 Mar 2026 17:10:25 +0000 Subject: [PATCH 2/3] feat: add status parameter to listen() for SSE stream filtering Server now excludes terminal statuses (COMPLETED, FAILED, CANCELED, TIMED_OUT) by default. SDK gains optional status parameter to override the server default when needed (e.g. for debugging or audit). Backward-compatible: existing agents that call listen() without status= benefit from server-side zombie filtering automatically. Co-Authored-By: Claude Opus 4.6 (1M context) --- axme_sdk/client.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/axme_sdk/client.py b/axme_sdk/client.py index 7557f92..ce6f55d 100644 --- a/axme_sdk/client.py +++ b/axme_sdk/client.py @@ -832,6 +832,7 @@ def listen( since: int = 0, wait_seconds: int = 15, timeout_seconds: float | None = None, + status: str | None = None, trace_id: str | None = None, ) -> Iterator[dict[str, Any]]: """Stream incoming intents for an agent address via SSE. @@ -843,6 +844,10 @@ def listen( automatically reconnects until ``timeout_seconds`` elapses (or forever if ``timeout_seconds`` is ``None``). + By default the server excludes terminal statuses (COMPLETED, FAILED, + CANCELED, TIMED_OUT) so agents only receive actionable intents. Pass an + explicit ``status`` filter to override this behaviour. + Args: address: Full ``agent://org/workspace/name`` or bare ``org/workspace/name`` agent address to listen on. @@ -854,6 +859,9 @@ def listen( new intents. timeout_seconds: Optional wall-clock timeout after which the method raises ``TimeoutError``. ``None`` means listen indefinitely. + status: Comma-separated status filter passed to the server (e.g. + ``"CREATED,DELIVERED"``). When omitted the server applies its + default filter (exclude terminal statuses). trace_id: Optional trace ID forwarded as ``X-Trace-Id``. Yields: @@ -895,6 +903,7 @@ def listen( path_part=path_part, since=next_since, wait_seconds=stream_wait_seconds, + status=status, trace_id=trace_id, ): seq = event.get("seq") @@ -1717,6 +1726,7 @@ def _iter_agent_intents_stream( path_part: str, since: int, wait_seconds: int, + status: str | None = None, trace_id: str | None, ) -> Iterator[dict[str, Any]]: headers: dict[str, str] | None = None @@ -1732,10 +1742,13 @@ def _iter_agent_intents_stream( write=10.0, pool=10.0, ) + params: dict[str, str] = {"since": str(since), "wait_seconds": str(wait_seconds)} + if status: + params["status"] = status with self._http.stream( "GET", f"/v1/agents/{path_part}/intents/stream", - params={"since": str(since), "wait_seconds": str(wait_seconds)}, + params=params, headers=headers, timeout=stream_timeout, ) as response: From 4f74983c4b68159e3a8ef78cdd2afbfbb542926d Mon Sep 17 00:00:00 2001 From: geobelsky Date: Tue, 17 Mar 2026 20:28:04 +0000 Subject: [PATCH 3/3] fix: handle ResponseNotRead in SSE streaming error path When the SSE stream returns a 4xx/5xx, _raise_http_error accessed response.text inside a streaming context, causing httpx.ResponseNotRead. This crashed agents on transient 503s instead of allowing reconnect. Fix: call response.read() before accessing .text/.json(). Co-Authored-By: Claude Opus 4.6 (1M context) --- axme_sdk/client.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/axme_sdk/client.py b/axme_sdk/client.py index ce6f55d..52ed4b4 100644 --- a/axme_sdk/client.py +++ b/axme_sdk/client.py @@ -1905,6 +1905,12 @@ def _parse_json_response(self, response: httpx.Response) -> dict[str, Any]: def _raise_http_error(self, response: httpx.Response) -> None: body: Any | None body = None + # response.read() is required when called inside a streaming context — + # without it, accessing .text or .json() raises httpx.ResponseNotRead. + try: + response.read() + except Exception: + pass message = response.text try: body = response.json()