Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion axme_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ def listen(
since: int = 0,
wait_seconds: int = 15,
timeout_seconds: float | None = None,
status: str | None = None,
trace_id: str | None = None,
) -> Iterator[dict[str, Any]]:
"""Stream incoming intents for an agent address via SSE.
Expand All @@ -843,6 +844,10 @@ def listen(
automatically reconnects until ``timeout_seconds`` elapses (or forever if
``timeout_seconds`` is ``None``).

By default the server excludes terminal statuses (COMPLETED, FAILED,
CANCELED, TIMED_OUT) so agents only receive actionable intents. Pass an
explicit ``status`` filter to override this behaviour.

Args:
address: Full ``agent://org/workspace/name`` or bare ``org/workspace/name``
agent address to listen on.
Expand All @@ -854,6 +859,9 @@ def listen(
new intents.
timeout_seconds: Optional wall-clock timeout after which the method
raises ``TimeoutError``. ``None`` means listen indefinitely.
status: Comma-separated status filter passed to the server (e.g.
``"CREATED,DELIVERED"``). When omitted the server applies its
default filter (exclude terminal statuses).
trace_id: Optional trace ID forwarded as ``X-Trace-Id``.

Yields:
Expand Down Expand Up @@ -895,6 +903,7 @@ def listen(
path_part=path_part,
since=next_since,
wait_seconds=stream_wait_seconds,
status=status,
trace_id=trace_id,
):
seq = event.get("seq")
Expand Down Expand Up @@ -1670,11 +1679,18 @@ def _iter_intent_events_stream(
if normalized_trace_id is not None:
headers = {"X-Trace-Id": normalized_trace_id}

stream_timeout = httpx.Timeout(
connect=10.0,
read=float(wait_seconds) + 15.0,
write=10.0,
pool=10.0,
)
with self._http.stream(
"GET",
f"/v1/intents/{intent_id}/events/stream",
params={"since": str(since), "wait_seconds": str(wait_seconds)},
headers=headers,
timeout=stream_timeout,
) as response:
if response.status_code >= 400:
self._raise_http_error(response)
Expand Down Expand Up @@ -1710,18 +1726,31 @@ def _iter_agent_intents_stream(
path_part: str,
since: int,
wait_seconds: int,
status: str | None = None,
trace_id: str | None,
) -> Iterator[dict[str, Any]]:
headers: dict[str, str] | None = None
normalized_trace_id = self._normalize_trace_id(trace_id)
if normalized_trace_id is not None:
headers = {"X-Trace-Id": normalized_trace_id}

# SSE streams need a longer read timeout than regular API calls
# (server holds connection open for wait_seconds + processing time)
stream_timeout = httpx.Timeout(
connect=10.0,
read=float(wait_seconds) + 15.0,
write=10.0,
pool=10.0,
)
params: dict[str, str] = {"since": str(since), "wait_seconds": str(wait_seconds)}
if status:
params["status"] = status
with self._http.stream(
"GET",
f"/v1/agents/{path_part}/intents/stream",
params={"since": str(since), "wait_seconds": str(wait_seconds)},
params=params,
headers=headers,
timeout=stream_timeout,
) as response:
if response.status_code >= 400:
self._raise_http_error(response)
Expand Down Expand Up @@ -1876,6 +1905,12 @@ def _parse_json_response(self, response: httpx.Response) -> dict[str, Any]:
def _raise_http_error(self, response: httpx.Response) -> None:
body: Any | None
body = None
# response.read() is required when called inside a streaming context —
# without it, accessing .text or .json() raises httpx.ResponseNotRead.
try:
response.read()
except Exception:
pass
message = response.text
try:
body = response.json()
Expand Down
Loading