diff --git a/README.md b/README.md index cc5330e..3f4c7fd 100644 --- a/README.md +++ b/README.md @@ -89,19 +89,25 @@ client = AxmeClient( # Check connectivity print(client.health()) -# Send an intent +# Send an intent to a registered agent address intent = client.create_intent( { "intent_type": "order.fulfillment.v1", + "to_agent": "agent://acme-corp/production/fulfillment-service", "payload": {"order_id": "ord_123", "priority": "high"}, - "owner_agent": "agent://fulfillment-service", }, idempotency_key="fulfill-ord-123-001", + correlation_id="corr-ord-123-001", ) print(intent["intent_id"], intent["status"]) +# List registered agent addresses in your workspace +agents = client.list_agents(org_id="acme-corp-uuid", workspace_id="prod-ws-uuid") +for agent in agents["agents"]: + print(agent["address"], agent["status"]) + # Wait for resolution -resolved = client.wait_for(intent["intent_id"], terminal_states={"RESOLVED", "CANCELLED"}) +resolved = client.wait_for(intent["intent_id"]) print(resolved["status"]) ``` diff --git a/axme_sdk/client.py b/axme_sdk/client.py index 40f5fbe..b155c8c 100644 --- a/axme_sdk/client.py +++ b/axme_sdk/client.py @@ -115,6 +115,49 @@ def send_intent( raise ValueError("create_intent response does not include string intent_id") return intent_id + def apply_scenario( + self, + bundle: dict[str, Any], + *, + idempotency_key: str | None = None, + trace_id: str | None = None, + ) -> dict[str, Any]: + """Submit a ScenarioBundle to POST /v1/scenarios/apply. + + The server provisions missing agents, compiles the workflow, and creates the + intent in one atomic operation. Returns the full bundle response including + ``intent_id``, ``compile_id``, ``agents_provisioned``. + """ + payload = dict(bundle) + if idempotency_key is not None: + payload.setdefault("idempotency_key", idempotency_key) + return self._request_json( + "POST", + "/v1/scenarios/apply", + json_body=payload, + idempotency_key=idempotency_key, + trace_id=trace_id, + retryable=idempotency_key is not None, + ) + + def validate_scenario( + self, + bundle: dict[str, Any], + *, + trace_id: str | None = None, + ) -> dict[str, Any]: + """Dry-run validate a ScenarioBundle without creating any resources. + + Returns a list of validation errors (empty list means valid). + """ + return self._request_json( + "POST", + "/v1/scenarios/validate", + json_body=bundle, + trace_id=trace_id, + retryable=True, + ) + def list_intent_events( self, intent_id: str, @@ -744,6 +787,121 @@ def create_service_account_key( retryable=idempotency_key is not None, ) + def list_agents( + self, + *, + org_id: str, + workspace_id: str, + limit: int | None = None, + trace_id: str | None = None, + ) -> dict[str, Any]: + """List registered agent addresses in a workspace. + + Returns a dict with an ``agents`` list, each entry containing + ``address``, ``display_name``, ``status``, and ``created_at``. + """ + params: dict[str, str] = {"org_id": org_id, "workspace_id": workspace_id} + if limit is not None: + params["limit"] = str(limit) + return self._request_json( + "GET", + "/v1/agents", + params=params, + trace_id=trace_id, + retryable=True, + ) + + def get_agent(self, address: str, *, trace_id: str | None = None) -> dict[str, Any]: + """Get agent address details by full ``agent://org/workspace/name`` address.""" + if not isinstance(address, str) or not address.strip(): + raise ValueError("address must be a non-empty string") + path_part = address.strip() + if path_part.startswith("agent://"): + path_part = path_part[len("agent://"):] + return self._request_json( + "GET", + f"/v1/agents/{path_part}", + trace_id=trace_id, + retryable=True, + ) + + def listen( + self, + address: str, + *, + since: int = 0, + wait_seconds: int = 15, + timeout_seconds: float | None = None, + trace_id: str | None = None, + ) -> Iterator[dict[str, Any]]: + """Stream incoming intents for an agent address via SSE. + + Connects to ``GET /v1/agents/{address}/intents/stream`` and yields each + intent payload as it arrives. The stream is a long-lived SSE connection; + the server sends a ``stream.timeout`` keepalive event when there are no + new intents within ``wait_seconds``, at which point the client + automatically reconnects until ``timeout_seconds`` elapses (or forever if + ``timeout_seconds`` is ``None``). + + Args: + address: Full ``agent://org/workspace/name`` or bare ``org/workspace/name`` + agent address to listen on. + since: Sequence cursor — only intents with a sequence number greater + than this value are returned. Pass the ``seq`` value from the last + received event to resume without gaps. + wait_seconds: Server-side long-poll window (1–60 s). The server keeps + the connection open for up to this many seconds while waiting for + new intents. + timeout_seconds: Optional wall-clock timeout after which the method + raises ``TimeoutError``. ``None`` means listen indefinitely. + trace_id: Optional trace ID forwarded as ``X-Trace-Id``. + + Yields: + Each intent payload dict as it arrives on the stream. + + Raises: + ValueError: If ``address`` is empty or arguments are out of range. + TimeoutError: If ``timeout_seconds`` elapses before the caller + stops iterating. + """ + if not isinstance(address, str) or not address.strip(): + raise ValueError("address must be a non-empty string") + if since < 0: + raise ValueError("since must be >= 0") + if wait_seconds < 1: + raise ValueError("wait_seconds must be >= 1") + if timeout_seconds is not None and timeout_seconds <= 0: + raise ValueError("timeout_seconds must be > 0 when provided") + + path_part = address.strip() + if path_part.startswith("agent://"): + path_part = path_part[len("agent://"):] + + deadline = (time.monotonic() + timeout_seconds) if timeout_seconds is not None else None + next_since = since + + while True: + if deadline is not None and time.monotonic() >= deadline: + raise TimeoutError(f"timed out while listening on {address}") + + stream_wait_seconds = wait_seconds + if deadline is not None: + seconds_left = max(0.0, deadline - time.monotonic()) + if seconds_left <= 0: + raise TimeoutError(f"timed out while listening on {address}") + stream_wait_seconds = max(1, min(wait_seconds, int(seconds_left))) + + for event in self._iter_agent_intents_stream( + path_part=path_part, + since=next_since, + wait_seconds=stream_wait_seconds, + trace_id=trace_id, + ): + seq = event.get("seq") + if isinstance(seq, int) and seq >= 0: + next_since = max(next_since, seq) + yield event + def revoke_service_account_key( self, service_account_id: str, @@ -1546,6 +1704,53 @@ def _iter_intent_events_stream( data_lines.append(line.partition(":")[2].lstrip()) continue + def _iter_agent_intents_stream( + self, + *, + path_part: str, + since: int, + wait_seconds: int, + trace_id: str | None, + ) -> Iterator[dict[str, Any]]: + headers: dict[str, str] | None = None + normalized_trace_id = self._normalize_trace_id(trace_id) + if normalized_trace_id is not None: + headers = {"X-Trace-Id": normalized_trace_id} + + with self._http.stream( + "GET", + f"/v1/agents/{path_part}/intents/stream", + params={"since": str(since), "wait_seconds": str(wait_seconds)}, + headers=headers, + ) as response: + if response.status_code >= 400: + self._raise_http_error(response) + + current_event: str | None = None + data_lines: list[str] = [] + for line in response.iter_lines(): + if line == "": + if current_event == "stream.timeout": + return + if current_event and data_lines: + try: + payload = json.loads("\n".join(data_lines)) + except ValueError: + payload = None + if isinstance(payload, dict) and current_event.startswith("intent."): + yield payload + current_event = None + data_lines = [] + continue + if line.startswith(":"): + continue + if line.startswith("event:"): + current_event = line.partition(":")[2].strip() + continue + if line.startswith("data:"): + data_lines.append(line.partition(":")[2].lstrip()) + continue + def _mcp_request( self, *, @@ -1748,7 +1953,9 @@ def _max_seen_seq(*, next_since: int, event: dict[str, Any]) -> int: def _is_terminal_intent_event(event: dict[str, Any]) -> bool: status = event.get("status") - if isinstance(status, str) and status in {"COMPLETED", "FAILED", "CANCELED"}: + if isinstance(status, str) and status in {"COMPLETED", "FAILED", "CANCELED", "TIMED_OUT"}: return True event_type = event.get("event_type") - return isinstance(event_type, str) and event_type in {"intent.completed", "intent.failed", "intent.canceled"} + return isinstance(event_type, str) and event_type in { + "intent.completed", "intent.failed", "intent.canceled", "intent.timed_out" + } diff --git a/examples/basic_submit.py b/examples/basic_submit.py index 95f81e5..52dbbe0 100644 --- a/examples/basic_submit.py +++ b/examples/basic_submit.py @@ -15,8 +15,7 @@ def main() -> None: created = client.create_intent( { "intent_type": "intent.demo.v1", - "from_agent": "agent://basic/python/source", - "to_agent": "agent://basic/python/target", + "to_agent": "agent://acme-corp/production/target", "payload": {"task": "hello-from-python"}, }, correlation_id=correlation_id, diff --git a/tests/test_client.py b/tests/test_client.py index de4dbb8..b0f3ed4 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1686,3 +1686,213 @@ def handler(request: httpx.Request) -> httpx.Response: ) assert result["ok"] is True assert attempts == 2 + + +# --------------------------------------------------------------------------- +# listen(address) — agent intent stream +# --------------------------------------------------------------------------- + + +def _sse(events: list[tuple[str, dict]]) -> str: + """Build a minimal SSE payload from (event_type, payload) pairs.""" + parts = [] + for event_type, payload in events: + parts.append(f"event: {event_type}\ndata: {json.dumps(payload)}\n\n") + return "".join(parts) + + +def test_listen_yields_intent_events_from_sse() -> None: + address = "agent://acme/main/router" + intent1 = { + "intent_id": "aaaa-1", + "seq": 1, + "event_type": "intent.submitted", + "status": "SUBMITTED", + "at": "2026-03-01T00:00:00Z", + } + intent2 = { + "intent_id": "bbbb-2", + "seq": 2, + "event_type": "intent.submitted", + "status": "SUBMITTED", + "at": "2026-03-01T00:00:01Z", + } + + def handler(request: httpx.Request) -> httpx.Response: + assert request.method == "GET" + assert request.url.path == "/v1/agents/acme/main/router/intents/stream" + # Deliver two intents then keepalive to end the stream segment + return httpx.Response( + 200, + text=_sse([("intent.submitted", intent1), ("intent.submitted", intent2)]) + + "event: stream.timeout\ndata: {}\n\n", + ) + + client = _client(handler) + gen = client.listen(address) + received = [next(gen), next(gen)] + assert received[0]["intent_id"] == "aaaa-1" + assert received[1]["intent_id"] == "bbbb-2" + + +def test_listen_strips_agent_scheme_from_url() -> None: + """address with and without 'agent://' prefix must hit the same URL path.""" + paths_seen: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + paths_seen.append(request.url.path) + # Return one intent then stream.timeout to end the stream cleanly + intent = {"intent_id": "x1", "seq": 1, "event_type": "intent.submitted", "status": "SUBMITTED", "at": "2026-03-01T00:00:00Z"} + return httpx.Response(200, text=_sse([("intent.submitted", intent)]) + "event: stream.timeout\ndata: {}\n\n") + + client = _client(handler) + # with scheme — consume one event + next(client.listen("agent://org/ws/bot")) + # without scheme — consume one event + next(client.listen("org/ws/bot")) + assert paths_seen[0] == paths_seen[1] == "/v1/agents/org/ws/bot/intents/stream" + + +def test_listen_since_cursor_forwarded() -> None: + def handler(request: httpx.Request) -> httpx.Response: + assert request.url.params.get("since") == "42" + intent = {"intent_id": "x1", "seq": 43, "event_type": "intent.submitted", "status": "SUBMITTED", "at": "2026-03-01T00:00:00Z"} + return httpx.Response(200, text=_sse([("intent.submitted", intent)]) + "event: stream.timeout\ndata: {}\n\n") + + client = _client(handler) + result = next(client.listen("org/ws/bot", since=42)) + assert result["intent_id"] == "x1" + + +def test_listen_reconnects_on_stream_timeout_keepalive() -> None: + """A stream.timeout SSE event causes the client to reconnect.""" + calls: list[int] = [] + intent = { + "intent_id": "cc-3", + "seq": 5, + "event_type": "intent.submitted", + "status": "SUBMITTED", + "at": "2026-03-01T00:00:00Z", + } + + def handler(request: httpx.Request) -> httpx.Response: + call_no = len(calls) + 1 + calls.append(call_no) + if call_no == 1: + # first response: keepalive only → reconnect + return httpx.Response( + 200, + text="event: stream.timeout\ndata: {}\n\n", + ) + # second response: real intent + keepalive → stops reconnect + return httpx.Response( + 200, + text=_sse([("intent.submitted", intent)]) + "event: stream.timeout\ndata: {}\n\n", + ) + + client = _client(handler) + # consume one event (which only arrives on the second request) + received = next(client.listen("org/ws/bot", wait_seconds=1)) + assert received["intent_id"] == "cc-3" + assert len(calls) >= 2 + + +def test_listen_advances_since_cursor_across_reconnects() -> None: + """After receiving seq=7 the next request should use since=7.""" + requests_seen: list[str] = [] + + intent = { + "intent_id": "dd-4", + "seq": 7, + "event_type": "intent.submitted", + "status": "SUBMITTED", + "at": "2026-03-01T00:00:00Z", + } + intent2 = { + "intent_id": "ee-5", + "seq": 8, + "event_type": "intent.submitted", + "status": "SUBMITTED", + "at": "2026-03-01T00:00:01Z", + } + + def handler(request: httpx.Request) -> httpx.Response: + since_param = request.url.params.get("since", "?") + requests_seen.append(since_param) + call_no = len(requests_seen) + if call_no == 1: + # first call: deliver intent with seq=7 + keepalive + return httpx.Response( + 200, + text=_sse([("intent.submitted", intent)]) + "event: stream.timeout\ndata: {}\n\n", + ) + # second call: deliver second intent so we can consume it + return httpx.Response( + 200, + text=_sse([("intent.submitted", intent2)]) + "event: stream.timeout\ndata: {}\n\n", + ) + + client = _client(handler) + gen = client.listen("org/ws/bot") + e1 = next(gen) + assert e1["seq"] == 7 + e2 = next(gen) + assert e2["seq"] == 8 + # the reconnect should use since=7 + assert requests_seen[0] == "0" + assert requests_seen[1] == "7" + + +def test_listen_raises_on_empty_address() -> None: + client = _client(lambda r: httpx.Response(200, text="")) + with pytest.raises(ValueError, match="address must be a non-empty string"): + list(client.listen("")) + + +def test_listen_raises_on_negative_since() -> None: + client = _client(lambda r: httpx.Response(200, text="")) + with pytest.raises(ValueError, match="since must be >= 0"): + list(client.listen("org/ws/bot", since=-1)) + + +def test_listen_raises_on_invalid_wait_seconds() -> None: + client = _client(lambda r: httpx.Response(200, text="")) + with pytest.raises(ValueError, match="wait_seconds must be >= 1"): + list(client.listen("org/ws/bot", wait_seconds=0)) + + +def test_listen_raises_timeout_error_when_deadline_exceeded() -> None: + import time as _time + + def handler(request: httpx.Request) -> httpx.Response: + # Simulate a short server delay so we don't spin too tight + _time.sleep(0.05) + return httpx.Response(200, text="event: stream.timeout\ndata: {}\n\n") + + client = _client(handler) + with pytest.raises(TimeoutError): + list(client.listen("org/ws/bot", timeout_seconds=0.12)) + + +def test_listen_requires_api_key() -> None: + def handler(request: httpx.Request) -> httpx.Response: + if not request.headers.get("x-api-key"): + return httpx.Response(401, json={"error": "Unauthorized"}) + intent = {"intent_id": "x1", "seq": 1, "event_type": "intent.submitted", "status": "SUBMITTED", "at": "2026-03-01T00:00:00Z"} + return httpx.Response(200, text=_sse([("intent.submitted", intent)]) + "event: stream.timeout\ndata: {}\n\n") + + client = _client(handler, api_key="my-api-key") + # should succeed — consume one event without raising auth error + result = next(client.listen("org/ws/bot")) + assert result["intent_id"] == "x1" + + +def test_listen_raises_auth_error_on_401() -> None: + from axme.exceptions import AxmeAuthError + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(401, json={"error": "Unauthorized"}) + + client = _client(handler) + with pytest.raises(AxmeAuthError): + list(client.listen("org/ws/bot"))