diff --git a/axme_sdk/client.py b/axme_sdk/client.py index b155c8c..52ed4b4 100644 --- a/axme_sdk/client.py +++ b/axme_sdk/client.py @@ -832,6 +832,7 @@ def listen( since: int = 0, wait_seconds: int = 15, timeout_seconds: float | None = None, + status: str | None = None, trace_id: str | None = None, ) -> Iterator[dict[str, Any]]: """Stream incoming intents for an agent address via SSE. @@ -843,6 +844,10 @@ def listen( automatically reconnects until ``timeout_seconds`` elapses (or forever if ``timeout_seconds`` is ``None``). + By default the server excludes terminal statuses (COMPLETED, FAILED, + CANCELED, TIMED_OUT) so agents only receive actionable intents. Pass an + explicit ``status`` filter to override this behaviour. + Args: address: Full ``agent://org/workspace/name`` or bare ``org/workspace/name`` agent address to listen on. @@ -854,6 +859,9 @@ def listen( new intents. timeout_seconds: Optional wall-clock timeout after which the method raises ``TimeoutError``. ``None`` means listen indefinitely. + status: Comma-separated status filter passed to the server (e.g. + ``"CREATED,DELIVERED"``). When omitted the server applies its + default filter (exclude terminal statuses). trace_id: Optional trace ID forwarded as ``X-Trace-Id``. Yields: @@ -895,6 +903,7 @@ def listen( path_part=path_part, since=next_since, wait_seconds=stream_wait_seconds, + status=status, trace_id=trace_id, ): seq = event.get("seq") @@ -1670,11 +1679,18 @@ def _iter_intent_events_stream( if normalized_trace_id is not None: headers = {"X-Trace-Id": normalized_trace_id} + stream_timeout = httpx.Timeout( + connect=10.0, + read=float(wait_seconds) + 15.0, + write=10.0, + pool=10.0, + ) with self._http.stream( "GET", f"/v1/intents/{intent_id}/events/stream", params={"since": str(since), "wait_seconds": str(wait_seconds)}, headers=headers, + timeout=stream_timeout, ) as response: if response.status_code >= 400: self._raise_http_error(response) @@ -1710,6 +1726,7 @@ def _iter_agent_intents_stream( path_part: str, since: int, wait_seconds: int, + status: str | None = None, trace_id: str | None, ) -> Iterator[dict[str, Any]]: headers: dict[str, str] | None = None @@ -1717,11 +1734,23 @@ def _iter_agent_intents_stream( if normalized_trace_id is not None: headers = {"X-Trace-Id": normalized_trace_id} + # SSE streams need a longer read timeout than regular API calls + # (server holds connection open for wait_seconds + processing time) + stream_timeout = httpx.Timeout( + connect=10.0, + read=float(wait_seconds) + 15.0, + write=10.0, + pool=10.0, + ) + params: dict[str, str] = {"since": str(since), "wait_seconds": str(wait_seconds)} + if status: + params["status"] = status with self._http.stream( "GET", f"/v1/agents/{path_part}/intents/stream", - params={"since": str(since), "wait_seconds": str(wait_seconds)}, + params=params, headers=headers, + timeout=stream_timeout, ) as response: if response.status_code >= 400: self._raise_http_error(response) @@ -1876,6 +1905,12 @@ def _parse_json_response(self, response: httpx.Response) -> dict[str, Any]: def _raise_http_error(self, response: httpx.Response) -> None: body: Any | None body = None + # response.read() is required when called inside a streaming context — + # without it, accessing .text or .json() raises httpx.ResponseNotRead. + try: + response.read() + except Exception: + pass message = response.text try: body = response.json()