From 433c999bbdb4817d2048c5454cb65b54812950af Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Wed, 17 Jun 2026 21:10:11 -0700 Subject: [PATCH 1/3] =?UTF-8?q?feat(compat):=20runtime=20SDK=E2=86=94backe?= =?UTF-8?q?nd=20version=20guard=20at=20ACP=20startup=20(#408)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Claude Opus 4.8 --- src/agentex/lib/core/compat/__init__.py | 1 + src/agentex/lib/core/compat/version_guard.py | 164 ++++++++++++++ .../lib/core/temporal/workers/worker.py | 5 + .../lib/sdk/fastacp/base/base_acp_server.py | 4 + tests/lib/core/temporal/__init__.py | 0 tests/lib/core/temporal/workers/__init__.py | 0 .../workers/test_worker_version_guard.py | 70 ++++++ tests/test_version_guard.py | 208 ++++++++++++++++++ 8 files changed, 452 insertions(+) create mode 100644 src/agentex/lib/core/compat/__init__.py create mode 100644 src/agentex/lib/core/compat/version_guard.py create mode 100644 tests/lib/core/temporal/__init__.py create mode 100644 tests/lib/core/temporal/workers/__init__.py create mode 100644 tests/lib/core/temporal/workers/test_worker_version_guard.py create mode 100644 tests/test_version_guard.py diff --git a/src/agentex/lib/core/compat/__init__.py b/src/agentex/lib/core/compat/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/src/agentex/lib/core/compat/__init__.py @@ -0,0 +1 @@ + diff --git a/src/agentex/lib/core/compat/version_guard.py b/src/agentex/lib/core/compat/version_guard.py new file mode 100644 index 000000000..56933de0b --- /dev/null +++ b/src/agentex/lib/core/compat/version_guard.py @@ -0,0 +1,164 @@ +"""Runtime SDK ↔ backend contract-version guard. + +Complements the *build-time* cross-version compatibility tests (``tests/compat``): + +- **Build-time** (CI): is this *client* compatible with the window of supported server + contracts (``min-supported``..``current``)? 
+- **Runtime** (this module): is the *server* the SDK is pointed at within that window? + +It runs once at ACP/worker startup, reads the backend's contract version (the version +the server already reports via ``/openapi.json`` ``info.version``), and **fails fast with +an actionable error** if the backend is older than this SDK supports — instead of the +mismatch surfacing later as opaque 500s / missing-field errors deep in a request. + +``MIN_BACKEND_CONTRACT`` is the same source of truth as the ``min-supported`` server +contract in ``tests/compat/server_specs/manifest.json``: the oldest agentex backend this +SDK version supports. Bump both together when a breaking change raises the floor. +""" + +from __future__ import annotations + +import os +import re + +import httpx + +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + +# Oldest agentex backend contract this SDK is compatible with. +# Keep in sync with the `min-supported` spec in tests/compat (#407); the version axis +# itself comes from scale-agentex release tags (#321). Bump on a breaking SDK change. +MIN_BACKEND_CONTRACT = "0.1.0" + +SKIP_ENV = "AGENTEX_SKIP_VERSION_CHECK" + +# Full-string SemVer. Accepts: `1.2.3`, leading `v`, surrounding whitespace, `-prerelease` +# (captured), `+build` (ignored). Anchored at both ends so a malformed tail (`0.1.0rc1`, +# `0.1.0.1`) is rejected → None → "unknown, proceed", not silently coerced to stable `0.1.0`. +_VERSION_RE = re.compile( + r"^\s*v?(\d+)\.(\d+)\.(\d+)" # major.minor.patch + r"(?:-([0-9A-Za-z.-]+))?" # optional -prerelease (captured) + r"(?:\+[0-9A-Za-z.-]+)?" # optional +build metadata (ignored) + r"\s*$" +) + + +class IncompatibleBackendError(RuntimeError): + """Raised when the agentex backend is older than this SDK's minimum supported contract.""" + + +def _parse(version: str | None) -> tuple[int, int, int, str | None] | None: + """Parse ``major.minor.patch[-prerelease]`` → ``(major, minor, patch, prerelease)``. 
+ + ``prerelease`` is the raw dot-separated identifier string (e.g. ``"rc.1"``), or None for + a stable release. Build metadata (after ``+``) is ignored. Returns None if unparseable. + """ + m = _VERSION_RE.match(version or "") + if not m: + return None + return (int(m.group(1)), int(m.group(2)), int(m.group(3)), m.group(4) or None) + + +# Comparable SemVer precedence key. The 4th element keeps a uniform shape across stable and +# prerelease so the whole tuple is orderable: (rank, identifiers), where stable rank 1 > prerelease +# rank 0 (and the identifier list is only ever compared when both sides are prereleases, rank 0). +_PreKey = tuple[int, int, int, tuple[int, list[tuple[int, int, str]]]] + + +def _precedence_key(parsed: tuple[int, int, int, str | None]) -> _PreKey: + """SemVer §11 precedence key (directly comparable with ``<``). + + A stable release outranks any prerelease of the same triplet (``0.1.0-rc.1 < 0.1.0``); + among prereleases, numeric identifiers rank below alphanumeric and compare field-by-field, + with a longer identifier list outranking a shorter prefix-equal one. 
+ """ + major, minor, patch, prerelease = parsed + if prerelease is None: + return (major, minor, patch, (1, [])) # stable sorts above every prerelease + identifiers: list[tuple[int, int, str]] = [] + for ident in prerelease.split("."): + if ident.isdigit(): + identifiers.append((0, int(ident), "")) # numeric: lowest class, numeric order + else: + identifiers.append((1, 0, ident)) # alphanumeric: higher class, lexical order + return (major, minor, patch, (0, identifiers)) + + +def _truthy(name: str) -> bool: + return os.environ.get(name, "").strip().lower() in ("1", "true", "yes", "on") + + +async def fetch_backend_version(base_url: str, *, timeout: float = 5.0) -> str | None: + """Return the backend's reported contract version (``/openapi.json`` ``info.version``), or None.""" + url = base_url.rstrip("/") + "/openapi.json" + try: + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.get(url) + resp.raise_for_status() + return (resp.json().get("info") or {}).get("version") + except Exception as exc: # noqa: BLE001 - any failure → unknown, handled by caller + logger.warning("backend version guard: could not fetch %s (%s)", url, exc) + return None + + +async def assert_backend_compatible( + base_url: str | None, + *, + min_version: str = MIN_BACKEND_CONTRACT, + sdk_version: str | None = None, +) -> None: + """Fail fast at startup if the backend is older than ``min_version``. + + No-op (warns, does not raise) when: + - ``AGENTEX_SKIP_VERSION_CHECK`` is set (explicit bypass), + - ``base_url`` is unset, + - the backend version can't be determined (unreachable / unparseable) — a transient + blip or a contract-less server shouldn't crash startup. + + Raises ``IncompatibleBackendError`` only when the backend version is *known* and older + than ``min_version``. 
+ """ + if _truthy(SKIP_ENV): + logger.warning("%s set — skipping backend version guard", SKIP_ENV) + return + if not base_url: + return + + if sdk_version is None: + from agentex._version import __version__ as sdk_version # local import to avoid cycles + + backend_version = await fetch_backend_version(base_url) + if backend_version is None: + logger.warning( + "backend version guard: could not determine backend version at %s; proceeding " + "(set %s=1 to silence).", + base_url, + SKIP_ENV, + ) + return + + backend, minimum = _parse(backend_version), _parse(min_version) + if backend is None or minimum is None: + logger.warning( + "backend version guard: unparseable version(s) backend=%r min=%r; proceeding.", + backend_version, + min_version, + ) + return + + if _precedence_key(backend) < _precedence_key(minimum): + raise IncompatibleBackendError( + f"agentex-sdk {sdk_version} requires agentex backend >= {min_version}, " + f"but {base_url} reports {backend_version}. " + f"Upgrade the backend, or pin agentex-sdk to a version compatible with backend " + f"{backend_version}. 
(Set {SKIP_ENV}=1 to bypass at your own risk.)" + ) + + logger.info( + "backend version guard OK: sdk=%s backend=%s (min=%s)", + sdk_version, + backend_version, + min_version, + ) diff --git a/src/agentex/lib/core/temporal/workers/worker.py b/src/agentex/lib/core/temporal/workers/worker.py index 253b6759f..2b4958b1f 100644 --- a/src/agentex/lib/core/temporal/workers/worker.py +++ b/src/agentex/lib/core/temporal/workers/worker.py @@ -30,6 +30,7 @@ from agentex.lib.utils.logging import make_logger from agentex.lib.utils.registration import register_agent from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.compat.version_guard import assert_backend_compatible logger = make_logger(__name__) @@ -278,6 +279,10 @@ async def start_health_check_server(self): async def _register_agent(self): env_vars = EnvironmentVariables.refresh() if env_vars and env_vars.AGENTEX_BASE_URL: + # Fail fast if this worker is pointed at a backend older than the SDK supports — + # the worker process never goes through the ACP server lifespan, so it needs its + # own guard (mirrors base_acp_server.lifespan_context). 
+ await assert_backend_compatible(env_vars.AGENTEX_BASE_URL) await register_agent(env_vars) else: logger.warning("AGENTEX_BASE_URL not set, skipping worker registration") diff --git a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py index b0b1c3685..b3dac487e 100644 --- a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py +++ b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py @@ -33,6 +33,7 @@ from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull from agentex.types.task_message_content import TaskMessageContent from agentex.lib.core.tracing.span_queue import shutdown_default_span_queue +from agentex.lib.core.compat.version_guard import assert_backend_compatible from agentex.lib.sdk.fastacp.base.constants import ( FASTACP_HEADER_SKIP_EXACT, FASTACP_HEADER_SKIP_PREFIXES, @@ -104,6 +105,9 @@ def get_lifespan_function(self): async def lifespan_context(app: FastAPI): # noqa: ARG001 env_vars = EnvironmentVariables.refresh() if env_vars.AGENTEX_BASE_URL: + # Runtime SDK<->backend contract guard: fail fast if the backend is older + # than this SDK supports, instead of opaque 500s later. See compat.version_guard. 
+ await assert_backend_compatible(env_vars.AGENTEX_BASE_URL) await register_agent(env_vars, agent_card=self._agent_card) self.agent_id = env_vars.AGENT_ID else: diff --git a/tests/lib/core/temporal/__init__.py b/tests/lib/core/temporal/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/temporal/workers/__init__.py b/tests/lib/core/temporal/workers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/temporal/workers/test_worker_version_guard.py b/tests/lib/core/temporal/workers/test_worker_version_guard.py new file mode 100644 index 000000000..4ab5fc435 --- /dev/null +++ b/tests/lib/core/temporal/workers/test_worker_version_guard.py @@ -0,0 +1,70 @@ +"""AgentexWorker wires the backend version guard into worker startup. + +A Temporal worker runs as its own process and never goes through the ACP server +lifespan, so the guard must run inside `_register_agent` — before `register_agent`, +and only when `AGENTEX_BASE_URL` is set. 
+""" + +from __future__ import annotations + +from unittest.mock import Mock, AsyncMock + +import pytest + +from agentex.lib.core.temporal.workers import worker as worker_mod +from agentex.lib.core.compat.version_guard import IncompatibleBackendError + + +def _worker(): + # explicit health_check_port so __init__ doesn't read EnvironmentVariables + return worker_mod.AgentexWorker(task_queue="test-queue", health_check_port=8080) + + +def _patch_env(monkeypatch, base_url): + env = Mock() + env.AGENTEX_BASE_URL = base_url + fake_cls = Mock() + fake_cls.refresh.return_value = env + monkeypatch.setattr(worker_mod, "EnvironmentVariables", fake_cls) + return env + + +async def test_guard_runs_before_register_agent(monkeypatch): + env = _patch_env(monkeypatch, "http://backend") + order: list[str] = [] + guard = AsyncMock(side_effect=lambda *a, **k: order.append("guard")) + register = AsyncMock(side_effect=lambda *a, **k: order.append("register")) + monkeypatch.setattr(worker_mod, "assert_backend_compatible", guard) + monkeypatch.setattr(worker_mod, "register_agent", register) + + await _worker()._register_agent() + + guard.assert_awaited_once_with("http://backend") + register.assert_awaited_once_with(env) + assert order == ["guard", "register"] # guard must precede registration + + +async def test_incompatible_backend_blocks_registration(monkeypatch): + _patch_env(monkeypatch, "http://backend") + guard = AsyncMock(side_effect=IncompatibleBackendError("backend too old")) + register = AsyncMock() + monkeypatch.setattr(worker_mod, "assert_backend_compatible", guard) + monkeypatch.setattr(worker_mod, "register_agent", register) + + with pytest.raises(IncompatibleBackendError): + await _worker()._register_agent() + + register.assert_not_awaited() # fail fast — never register against an unsupported backend + + +async def test_no_base_url_skips_guard_and_registration(monkeypatch): + _patch_env(monkeypatch, None) + guard = AsyncMock() + register = AsyncMock() + 
monkeypatch.setattr(worker_mod, "assert_backend_compatible", guard) + monkeypatch.setattr(worker_mod, "register_agent", register) + + await _worker()._register_agent() + + guard.assert_not_awaited() + register.assert_not_awaited() diff --git a/tests/test_version_guard.py b/tests/test_version_guard.py new file mode 100644 index 000000000..dba4a50e2 --- /dev/null +++ b/tests/test_version_guard.py @@ -0,0 +1,208 @@ +"""Unit tests for the runtime backend version guard (agentex.lib.core.compat.version_guard).""" + +from __future__ import annotations + +import asyncio + +import httpx +import pytest + +from agentex.lib.core.compat import version_guard as vg + + +def _run(coro): + return asyncio.run(coro) + + +def _patch_transport(monkeypatch, handler): + """Make version_guard's httpx.AsyncClient route through an in-memory MockTransport, + so fetch_backend_version runs for real (request build, status check, JSON parse) + without touching the network. `handler(request) -> httpx.Response` (or raises).""" + + real_client = httpx.AsyncClient # capture before patching to avoid recursing into the factory + + def factory(**kwargs): + kwargs.pop("transport", None) + return real_client(transport=httpx.MockTransport(handler), **kwargs) + + monkeypatch.setattr(vg.httpx, "AsyncClient", factory) + + +def test_parse_versions(): + assert vg._parse("0.2.1") == (0, 2, 1, None) + assert vg._parse("v1.4.0") == (1, 4, 0, None) + assert vg._parse("0.2.1-rc.1+build5") == (0, 2, 1, "rc.1") # build metadata ignored + assert vg._parse("0.1.0+build5") == (0, 1, 0, None) # build metadata only, still stable + assert vg._parse("garbage") is None + assert vg._parse(None) is None + + +def test_parse_rejects_malformed_tails(): + # Anchored regex: a junk tail after the triplet must NOT silently parse as stable 0.1.0; + # it has to fall through to None (→ unknown / unparseable path), not satisfy the floor. 
+ for bad in ("0.1.0rc1", "0.1.0foo", "0.1.0.1", "0.1.0-", "1.2", "0.1.0-rc 1"): + assert vg._parse(bad) is None, bad + + +def test_parse_anchored_both_ends(): + # Leading anchor (^): anything before the triplet (other than whitespace / a `v`) is rejected. + for bad in ("foo0.1.0", ">=0.1.0", "x0.1.0", "=0.1.0", "0 0.1.0"): + assert vg._parse(bad) is None, bad + # Trailing anchor ($): anything after the version (other than whitespace) is rejected. + for bad in ("0.1.0 extra", "0.1.0;", "0.1.0/", "0.1.0+", "0.1.0 0.1.0"): + assert vg._parse(bad) is None, bad + # What the anchors DO permit: surrounding whitespace and an optional leading `v`. + assert vg._parse(" 0.1.0 ") == (0, 1, 0, None) + assert vg._parse("\tv1.2.3\n") == (1, 2, 3, None) + assert vg._parse(" 0.2.0-rc.1 ") == (0, 2, 0, "rc.1") + + +def test_prerelease_precedence(): + k = lambda v: vg._precedence_key(vg._parse(v)) # noqa: E731 + assert k("0.1.0-rc.1") < k("0.1.0") # prerelease precedes its stable release (SemVer §11) + assert k("0.1.0-rc.1") < k("0.1.0-rc.2") # numeric prerelease identifiers compare numerically + assert k("0.1.0-alpha") < k("0.1.0-rc") # numeric/alpha ordering by identifier + assert k("0.1.0") < k("0.1.1-rc.1") # patch bump outranks prior stable + assert k("0.2.0-rc.1") > k("0.1.0") # prerelease of a higher version still clears the floor + + +def test_compatible_backend_passes(monkeypatch): + async def fake(url, **kw): + return "0.2.0" + + monkeypatch.setattr(vg, "fetch_backend_version", fake) + # backend (0.2.0) >= min (0.1.0) → no raise + _run(vg.assert_backend_compatible("http://backend", min_version="0.1.0")) + + +def test_incompatible_backend_raises(monkeypatch): + async def fake(url, **kw): + return "0.0.9" + + monkeypatch.setattr(vg, "fetch_backend_version", fake) + with pytest.raises(vg.IncompatibleBackendError) as exc: + _run(vg.assert_backend_compatible("http://backend", min_version="0.1.0", sdk_version="0.13.0")) + msg = str(exc.value) + assert "0.13.0" in msg and "0.1.0" 
in msg and "0.0.9" in msg # actionable message + + +def test_prerelease_backend_below_stable_floor_raises(monkeypatch): + async def fake(url, **kw): + return "0.1.0-rc.1" # release candidate: precedes the stable 0.1.0 contract + + monkeypatch.setattr(vg, "fetch_backend_version", fake) + with pytest.raises(vg.IncompatibleBackendError): + _run(vg.assert_backend_compatible("http://backend", min_version="0.1.0", sdk_version="0.13.0")) + + +def test_skip_env_bypasses(monkeypatch): + async def fake(url, **kw): + raise AssertionError("must not fetch when skip env is set") + + monkeypatch.setattr(vg, "fetch_backend_version", fake) + monkeypatch.setenv(vg.SKIP_ENV, "1") + # even an impossible min must not raise when explicitly skipped + _run(vg.assert_backend_compatible("http://backend", min_version="9.9.9")) + + +def test_unknown_backend_version_does_not_crash(monkeypatch): + async def fake(url, **kw): + return None # unreachable / no version → unknown + + monkeypatch.setattr(vg, "fetch_backend_version", fake) + # unknown version warns but must not raise (transient/contract-less server) + _run(vg.assert_backend_compatible("http://backend", min_version="9.9.9")) + + +def test_no_base_url_is_noop(): + _run(vg.assert_backend_compatible(None)) + _run(vg.assert_backend_compatible("")) + + +def test_truthy(monkeypatch): + for val in ("1", "true", "True", "YES", "on"): + monkeypatch.setenv("X_GUARD_FLAG", val) + assert vg._truthy("X_GUARD_FLAG") + for val in ("0", "false", "no", "off", ""): + monkeypatch.setenv("X_GUARD_FLAG", val) + assert not vg._truthy("X_GUARD_FLAG") + monkeypatch.delenv("X_GUARD_FLAG", raising=False) + assert not vg._truthy("X_GUARD_FLAG") # unset → falsy + + +# --- fetch_backend_version: exercised for real through MockTransport (not mocked out) --- + + +def test_fetch_success_and_url_construction(monkeypatch): + seen = {} + + def handler(request): + seen["url"] = str(request.url) + seen["method"] = request.method + return httpx.Response(200, 
json={"openapi": "3.1.0", "info": {"version": "0.2.0"}}) + + _patch_transport(monkeypatch, handler) + assert _run(vg.fetch_backend_version("http://backend/")) == "0.2.0" + assert seen["url"] == "http://backend/openapi.json" # trailing slash trimmed, path appended + assert seen["method"] == "GET" + + +def test_fetch_missing_version_field(monkeypatch): + _patch_transport(monkeypatch, lambda r: httpx.Response(200, json={"info": {}})) + assert _run(vg.fetch_backend_version("http://backend")) is None + + +def test_fetch_missing_info_object(monkeypatch): + # `info` absent entirely, and `info: null` — both must coalesce to None, not crash. + _patch_transport(monkeypatch, lambda r: httpx.Response(200, json={})) + assert _run(vg.fetch_backend_version("http://backend")) is None + _patch_transport(monkeypatch, lambda r: httpx.Response(200, json={"info": None})) + assert _run(vg.fetch_backend_version("http://backend")) is None + + +def test_fetch_http_error_status(monkeypatch): + # raise_for_status() → caught → None (e.g. 
server has no /openapi.json) + _patch_transport(monkeypatch, lambda r: httpx.Response(404, text="not found")) + assert _run(vg.fetch_backend_version("http://backend")) is None + _patch_transport(monkeypatch, lambda r: httpx.Response(503, text="unavailable")) + assert _run(vg.fetch_backend_version("http://backend")) is None + + +def test_fetch_non_json_body(monkeypatch): + _patch_transport(monkeypatch, lambda r: httpx.Response(200, text="nope")) + assert _run(vg.fetch_backend_version("http://backend")) is None + + +def test_fetch_connection_error(monkeypatch): + def handler(request): + raise httpx.ConnectError("connection refused", request=request) + + _patch_transport(monkeypatch, handler) + assert _run(vg.fetch_backend_version("http://backend")) is None + + +# --- assert_backend_compatible end-to-end: real fetch through MockTransport, not mocked out --- + + +def test_assert_end_to_end_old_backend_raises(monkeypatch): + monkeypatch.delenv(vg.SKIP_ENV, raising=False) + _patch_transport(monkeypatch, lambda r: httpx.Response(200, json={"info": {"version": "0.0.9"}})) + with pytest.raises(vg.IncompatibleBackendError): + _run(vg.assert_backend_compatible("http://backend", min_version="0.1.0", sdk_version="0.13.0")) + + +def test_assert_end_to_end_new_backend_passes(monkeypatch): + monkeypatch.delenv(vg.SKIP_ENV, raising=False) + _patch_transport(monkeypatch, lambda r: httpx.Response(200, json={"info": {"version": "0.2.0"}})) + _run(vg.assert_backend_compatible("http://backend", min_version="0.1.0")) + + +def test_assert_end_to_end_unreachable_backend_does_not_raise(monkeypatch): + # real fetch returns None on connection failure → guard proceeds (no crash on transient blip) + monkeypatch.delenv(vg.SKIP_ENV, raising=False) + + def handler(request): + raise httpx.ConnectError("refused", request=request) + + _patch_transport(monkeypatch, handler) + _run(vg.assert_backend_compatible("http://backend", min_version="9.9.9")) From 7e5be61c6c12abcbd52187080b0c5f4896756639 Mon Sep 
17 00:00:00 2001 From: "stainless-app[bot]" <142633134+stainless-app[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 20:45:55 +0000 Subject: [PATCH 2/3] codegen metadata --- .stats.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.stats.yml b/.stats.yml index 076fb3798..fa83f7dfa 100644 --- a/.stats.yml +++ b/.stats.yml @@ -1,4 +1,4 @@ configured_endpoints: 64 -openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-dfcee301cded58822f489f034b6fcd42f392df406ca3780e7213698cec59c777.yml -openapi_spec_hash: 3aae4790b24edf6ea9469c1680d513ae +openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-cd43ba4b554ca024dd7ee7b74e4f4700a743282c17def704a0967e6ff251c09b.yml +openapi_spec_hash: 9369ccc9c0289e9d6f641a526d244d1c config_hash: 138b7c0b394e7393133c8ff16a6d0eb3 From 66a3997a23377bb18b9d29d3a1bf1a05dd859f28 Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Sun, 21 Jun 2026 19:32:26 -0400 Subject: [PATCH 3/3] feat(sdk): add webhook helper for forward-route handlers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add agentex.lib.sdk.utils.webhooks.handle_webhook — a reusable helper an agent calls from a forward-route handler (@acp.post on the route the server's /agents/forward/name/{agent}/{path} ingress proxies to). It shapes the payload (generic or GitHub PR), resolves task params (inline or fetched from a config resolve URL for config-by-id), get-or-creates a task on a stable session key so repeat events fold into one task, drives the turn (sync message / async event), and returns/polls the reply. This keeps webhook triggering on the supported forward mechanism + its built-in GitHub/Slack signature auth, instead of a parallel ingress. Config-by-id is ingress-independent: point params_source at the platform's config-resolve endpoint. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/sdk/utils/webhooks.py | 373 ++++++++++++++++++++++++++ tests/lib/test_webhooks.py | 215 +++++++++++++++ 2 files changed, 588 insertions(+) create mode 100644 src/agentex/lib/sdk/utils/webhooks.py create mode 100644 tests/lib/test_webhooks.py diff --git a/src/agentex/lib/sdk/utils/webhooks.py b/src/agentex/lib/sdk/utils/webhooks.py new file mode 100644 index 000000000..474d67e66 --- /dev/null +++ b/src/agentex/lib/sdk/utils/webhooks.py @@ -0,0 +1,373 @@ +"""Drive an agent turn from an inbound webhook, inside a forward-route handler. + +The Agentex server already exposes a webhook ingress: a request to +``/agents/forward/name/{agent}/{path}`` is signature-verified (GitHub ``sha256=`` / +Slack ``v0:`` HMAC via the agent's registered keys) and proxied to the agent's own +HTTP route. This helper is what that route handler calls to turn the inbound payload +into an agent turn — without each agent re-implementing payload shaping, config +resolution, session continuity, and reply handling. + +Typical use inside an agent:: + + from fastapi import Request + from agentex.lib.sdk.utils.webhooks import handle_webhook + + + @acp.post("/github-pr") + async def github_pr(request: Request): + body = await request.json() + result = await handle_webhook( + agent_name="my-agent", + payload=body, + acp_type="sync", + shaper="github_pr", + params_source="https://<host>/public/v5/agent_configs/<config_id>/resolve", + params_source_headers={"x-api-key": ..., "x-selected-account-id": ...}, + wait=True, + ) + return {"task_id": result.task_id, "reply": result.reply} + +Config-by-id: pass ``params_source`` pointing at the platform's config-resolve +endpoint; the resolved params (e.g. system_prompt / harness / model / tools) are +forwarded opaquely to ``task/create``. Or pass inline ``params`` for a one-off. 
+""" + +from __future__ import annotations + +import json +import hashlib +from typing import Any, Literal +from dataclasses import field, dataclass +from collections.abc import Callable, Awaitable + +from agentex.lib import adk +from agentex.lib.utils.logging import make_logger +from agentex.types.task_message_content import TextContent + +logger = make_logger(__name__) + +# Injectable params fetcher (url -> JSON). Default uses httpx; tests inject a fake. +ParamsFetcher = Callable[[str], Awaitable[dict[str, Any]]] + +MAX_BODY_CHARS = 4000 +MAX_DIFF_CHARS = 30000 + + +class WebhookError(RuntimeError): + """Raised when a webhook turn cannot be driven (e.g. params resolution failed).""" + + +@dataclass +class WebhookResult: + task_id: str + # Sync agents reply inline. For async agents, ``reply`` is None unless ``wait`` was + # set, in which case it is the polled reply (or None if it didn't settle in time). + reply: str | None = None + task_metadata: dict[str, str] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- shaping + + +def session_key(agent_name: str, channel: str, peer_id: str) -> str: + """Stable per-conversation task name → reused for get-or-create on task/create, so + repeat events from the same source fold into one task instead of spawning new ones.""" + basis = peer_id or "main" + digest = hashlib.sha1(f"{agent_name}:{channel}:{basis}".encode()).hexdigest()[:16] + return f"wh-{channel}-{digest}" + + +# Top-level fields a generic webhook payload might carry its prompt in, in priority +# order. Matched case-insensitively against the payload's keys. 
+GENERIC_PROMPT_KEYS = ( + "text", + "message", + "prompt", + "goal", + "content", + "body", + "description", + "title", +) + + +def render_generic(body: dict[str, Any]) -> str: + """Generic payload → prompt text: first non-empty string among GENERIC_PROMPT_KEYS + (case-insensitive), else raw JSON.""" + lowered = {key.lower(): value for key, value in body.items() if isinstance(key, str)} + for key in GENERIC_PROMPT_KEYS: + value = lowered.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return json.dumps(body, indent=2)[:8000] + + +def shape_github_pr(body: dict[str, Any]) -> tuple[str, str | None, str]: + """Shape a GitHub/Gitea pull-request webhook into (prompt, peer_id, sender). + + ``peer_id`` is ``repo#number`` so repeated events for the same PR (opened, + synchronize, ...) fold into one task. Falls back to generic rendering for non-PR + payloads (ping, issue, ...). + """ + pull_request = body.get("pull_request") + if not isinstance(pull_request, dict): + return render_generic(body), None, _github_actor(body) + + repo = _repo_full_name(body) + number = pull_request.get("number") + title = (pull_request.get("title") or "").strip() + action = (body.get("action") or "").strip() + description = (pull_request.get("body") or "").strip() + html_url = pull_request.get("html_url") or pull_request.get("url") + + header = "Pull request" + if repo and number is not None: + header = f"Pull request {repo}#{number}" + elif number is not None: + header = f"Pull request #{number}" + + lines = [f"{header}: {title}" if title else header] + if action: + lines.append(f"Action: {action}") + if html_url: + lines.append(f"URL: {html_url}") + if description: + lines.extend(["", "Description:", description[:MAX_BODY_CHARS]]) + + diff = _inline_diff(body, pull_request) + if diff: + lines.extend(["", "Diff:", diff[:MAX_DIFF_CHARS]]) + else: + # Standard GitHub/Gitea payloads carry a diff/patch URL, not the patch body. 
+ # Surface it so a tool-enabled agent (or the caller) can fetch the diff; inline + # `diff` wins. Gitea sends patch_url alongside diff_url, so accept either. + diff_url = pull_request.get("diff_url") or pull_request.get("patch_url") + if diff_url: + lines.extend(["", f"Diff URL: {diff_url}"]) + + peer_id = None + if repo and number is not None: + peer_id = f"{repo}#{number}" + elif number is not None: + peer_id = f"pr#{number}" + return "\n".join(lines), peer_id, _github_actor(body) + + +def _repo_full_name(body: dict[str, Any]) -> str | None: + repo = body.get("repository") + if isinstance(repo, dict) and isinstance(repo.get("full_name"), str): + return repo["full_name"] or None + return None + + +def _github_actor(body: dict[str, Any]) -> str: + sender = body.get("sender") + if isinstance(sender, dict) and isinstance(sender.get("login"), str) and sender["login"]: + return sender["login"] + return "webhook" + + +def _inline_diff(body: dict[str, Any], pull_request: dict[str, Any]) -> str | None: + for source in (body, pull_request): + diff = source.get("diff") + if isinstance(diff, str) and diff.strip(): + return diff.strip() + return None + + +# ------------------------------------------------------------------- params resolution + + +async def _default_fetch(url: str, headers: dict[str, str]) -> dict[str, Any]: + """GET a params source over HTTP. 
Imported lazily so callers that only pass inline + params carry no httpx dependency.""" + import httpx + + request_headers = {"accept": "application/json", **headers} + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get(url, headers=request_headers) + response.raise_for_status() + return response.json() + except httpx.HTTPError as exc: + raise WebhookError(f"params source request failed: {exc}") from exc + except ValueError as exc: # json.JSONDecodeError subclasses ValueError + raise WebhookError(f"params source returned invalid JSON: {exc}") from exc + + +async def resolve_remote_params( + url: str, + headers: dict[str, str] | None = None, + *, + fetch: ParamsFetcher | None = None, +) -> tuple[dict[str, Any], dict[str, str]]: + """Fetch params (+ optional task_metadata) from a config-resolve URL. + + Response shape (lenient):: + + {"params": {...}, "task_metadata": {...}} + + A bare object with no ``params`` key is treated as the params dict itself (minus a + top-level ``task_metadata``, which is returned separately for stamping). 
+ """ + do_fetch = fetch or (lambda u: _default_fetch(u, headers or {})) + payload = await do_fetch(url) + if not isinstance(payload, dict): + raise WebhookError("params source returned a non-object response") + + metadata_raw = payload.get("task_metadata") + task_metadata = {str(k): str(v) for k, v in metadata_raw.items()} if isinstance(metadata_raw, dict) else {} + params = payload.get("params") + if not isinstance(params, dict): + params = {k: v for k, v in payload.items() if k != "task_metadata"} + return params, task_metadata + + +# ------------------------------------------------------------------------- dispatch + + +def _agent_reply_text(messages: object) -> str | None: + """Join agent-authored text from a message list (sync result or polled stream).""" + if not isinstance(messages, list): + return None + parts = [] + for message in messages: + content = getattr(message, "content", None) + if ( + content is not None + and getattr(content, "author", None) == "agent" + and getattr(content, "type", None) == "text" + ): + text = (getattr(content, "content", "") or "").strip() + if text: + parts.append(text) + return "\n\n".join(parts) if parts else None + + +async def handle_webhook( + *, + agent_name: str, + payload: dict[str, Any], + acp_type: Literal["sync", "async"] = "sync", + shaper: Literal["generic", "github_pr"] = "generic", + channel: str | None = None, + params: dict[str, Any] | None = None, + params_source: str | None = None, + params_source_headers: dict[str, str] | None = None, + peer_id: str | None = None, + extra_task_metadata: dict[str, str] | None = None, + wait: bool = False, + fetch: ParamsFetcher | None = None, +) -> WebhookResult: + """Drive an agent turn from a webhook payload, agent-side, via the ADK client. + + - Shapes the payload (generic or GitHub PR) into a prompt + conversation scope. + - Resolves task params: inline ``params``, or fetched from ``params_source`` + (config-by-id). 
The platform never interprets params — they're forwarded to the + agent as ``task/create`` params. + - Get-or-creates a task keyed on a stable session key, so repeat events fold in. + - Sends the turn (sync → message/send returns the reply inline; async → event/send, + with optional ``wait`` to poll for the reply). + """ + channel = channel or shaper + if shaper == "github_pr": + text, derived_peer, sender = shape_github_pr(payload) + peer_id = peer_id or derived_peer + else: + text, sender = render_generic(payload), "webhook" + + task_metadata: dict[str, str] = {"channel": channel, "sender_id": sender} + if peer_id: + task_metadata["peer_id"] = peer_id + + resolved_params = dict(params) if params else {} + if params_source: + resolved_params, source_metadata = await resolve_remote_params( + params_source, params_source_headers, fetch=fetch + ) + # Source metadata + caller extras never override the canonical fields above. + for key, value in {**source_metadata, **(extra_task_metadata or {})}.items(): + task_metadata.setdefault(key, str(value)) + elif extra_task_metadata: + for key, value in extra_task_metadata.items(): + task_metadata.setdefault(key, str(value)) + + name = session_key(agent_name, channel, peer_id or "") + # task/create carries only name/params (CreateTaskParams has no task_metadata field), + # so we create first, then stamp task_metadata via a follow-up update below. + task = await adk.acp.create_task( + name=name, + agent_name=agent_name, + params=resolved_params or None, + ) + + # Best-effort: stamp the resolved task_metadata (channel/sender/peer_id, plus the + # display_name etc. from params_source) onto the task so it's labeled in the UI. + # Failure must never break the run — the metadata is also returned on the result. 
+ if task_metadata: + try: + await adk.tasks.update(task_id=task.id, task_metadata=dict(task_metadata)) + except Exception: + logger.warning("Failed to stamp task_metadata on task %s", task.id, exc_info=True) + + content = TextContent(author="user", content=text, format="markdown") + + if acp_type == "sync": + messages = await adk.acp.send_message(task_id=task.id, agent_name=agent_name, content=content) + return WebhookResult(task_id=task.id, reply=_agent_reply_text(messages), task_metadata=task_metadata) + + # Async: when we'll wait for the reply, snapshot existing message ids BEFORE the + # event so a reused task's prior reply (session continuity) isn't mistaken for it. + if wait: + seen_ids = await _message_ids(task.id) + await adk.acp.send_event(task_id=task.id, agent_name=agent_name, content=content) + reply = await _await_reply(task.id, seen_ids) + else: + await adk.acp.send_event(task_id=task.id, agent_name=agent_name, content=content) + reply = None + return WebhookResult(task_id=task.id, reply=reply, task_metadata=task_metadata) + + +async def _message_ids(task_id: str) -> set[str]: + # Only track real ids. Keeping None in the set would let a later id-less message + # collide with it and be wrongly treated as already-seen (dropping a fresh reply). + messages = await adk.messages.list(task_id=task_id) + return {mid for m in (messages or []) if (mid := getattr(m, "id", None)) is not None} + + +async def _await_reply( + task_id: str, + seen_ids: set[str | None], + *, + timeout_s: float = 120.0, + interval_s: float = 2.0, + quiescence_s: float = 6.0, +) -> str | None: + """Poll for THIS turn's reply — agent text in messages that weren't present before + the event — until it settles (unchanged for ``quiescence_s``) or times out. 
Filtering + on new message ids avoids returning a stale prior reply on a reused task.""" + import asyncio + + waited = 0.0 + last: str | None = None + stable_for = 0.0 + while waited < timeout_s: + await asyncio.sleep(interval_s) + waited += interval_s + messages = await adk.messages.list(task_id=task_id) + # Only id-bearing messages we haven't seen count as this turn's reply; id-less + # messages can't be tracked across polls, so ignore them rather than risk a + # collision (see _message_ids). + new = [ + m + for m in (messages or []) + if (mid := getattr(m, "id", None)) is not None and mid not in seen_ids + ] + text = _agent_reply_text(new) + if text and text == last: + stable_for += interval_s + if stable_for >= quiescence_s: + return text + elif text: + last, stable_for = text, 0.0 + return last diff --git a/tests/lib/test_webhooks.py b/tests/lib/test_webhooks.py new file mode 100644 index 000000000..01e9de74f --- /dev/null +++ b/tests/lib/test_webhooks.py @@ -0,0 +1,215 @@ +"""Unit tests for the SDK webhook helper (agentex.lib.sdk.utils.webhooks).""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from agentex.lib import adk +from agentex.lib.sdk.utils.webhooks import ( + WebhookError, + session_key, + handle_webhook, + render_generic, + shape_github_pr, + resolve_remote_params, +) + + +def _pr_payload(**pr_overrides) -> dict: + pr = { + "number": 42, + "title": "Add retry to uploader", + "body": "Adds backoff on 503.", + "html_url": "https://example.com/acme/widgets/pull/42", + } + pr.update(pr_overrides) + return { + "action": "opened", + "repository": {"full_name": "acme/widgets"}, + "sender": {"login": "octocat"}, + "pull_request": pr, + } + + +class TestSessionKey: + def test_stable_and_folds_same_conversation(self): + a = session_key("agent-1", "github_pr", "acme/widgets#42") + b = session_key("agent-1", "github_pr", "acme/widgets#42") + assert a == b and 
a.startswith("wh-github_pr-") + + def test_differs_by_peer(self): + assert session_key("a", "github_pr", "r#1") != session_key("a", "github_pr", "r#2") + + +class TestShaping: + def test_render_generic_prefers_text_field(self): + assert render_generic({"text": "hello"}) == "hello" + + def test_render_generic_falls_back_to_json(self): + assert "zen" in render_generic({"zen": "be awesome"}) + + def test_render_generic_matches_keys_case_insensitively(self): + assert render_generic({"Message": "hi there"}) == "hi there" + + def test_render_generic_supports_broadened_keys(self): + assert render_generic({"description": "do the thing"}) == "do the thing" + + def test_github_pr_shape(self): + text, peer, sender = shape_github_pr(_pr_payload()) + assert "Pull request acme/widgets#42: Add retry to uploader" in text + assert "Action: opened" in text + assert "Adds backoff on 503." in text + assert peer == "acme/widgets#42" + assert sender == "octocat" + + def test_github_pr_includes_diff(self): + body = _pr_payload() + body["pull_request"]["diff"] = "diff --git a/x b/x\n+line" + text, _, _ = shape_github_pr(body) + assert "Diff:" in text and "+line" in text + + def test_non_pr_payload_falls_back_to_generic(self): + text, peer, _ = shape_github_pr({"zen": "be awesome", "hook_id": 1}) + assert "Pull request" not in text + assert "be awesome" in text + assert peer is None + + +class TestResolveRemoteParams: + async def test_envelope_with_params_and_metadata(self): + async def fetch(_url): + return {"params": {"system_prompt": "x", "model": "m"}, "task_metadata": {"cfg": "1"}} + + params, md = await resolve_remote_params("https://h/resolve", fetch=fetch) + assert params == {"system_prompt": "x", "model": "m"} + assert md == {"cfg": "1"} + + async def test_bare_object_is_params_minus_task_metadata(self): + async def fetch(_url): + return {"system_prompt": "x", "task_metadata": {"cfg": "1"}} + + params, md = await resolve_remote_params("https://h/resolve", fetch=fetch) + assert 
params == {"system_prompt": "x"} # task_metadata stripped from params + assert md == {"cfg": "1"} + + async def test_non_object_raises(self): + async def fetch(_url): + return ["nope"] + + with pytest.raises(WebhookError): + await resolve_remote_params("https://h/resolve", fetch=fetch) + + +def _agent_msg(text: str): + return SimpleNamespace(content=SimpleNamespace(author="agent", type="text", content=text)) + + +class TestHandleWebhook: + @pytest.fixture(autouse=True) + def _mock_adk(self, monkeypatch): + self.created = {} + self.sent = {} + self.stamped = {} + + async def create_task(*, name, agent_name, params=None, request=None, **_): + self.created = {"name": name, "agent_name": agent_name, "params": params, "request": request} + return SimpleNamespace(id="task-1") + + async def send_message(*, task_id, agent_name, content, **_): + self.sent = {"task_id": task_id, "content": content} + return [_agent_msg("Looks good — ship it.")] + + async def update_task(*, task_id, task_metadata=None, **_): + self.stamped = {"task_id": task_id, "task_metadata": task_metadata} + return SimpleNamespace(id=task_id) + + send_event = AsyncMock() + monkeypatch.setattr(adk.acp, "create_task", create_task) + monkeypatch.setattr(adk.acp, "send_message", send_message) + monkeypatch.setattr(adk.acp, "send_event", send_event) + monkeypatch.setattr(adk.tasks, "update", update_task) + self.send_event = send_event + yield + + async def test_sync_github_pr_with_config_by_id(self): + async def fake_resolve(_url): + return {"params": {"system_prompt": "review"}, "task_metadata": {"agent_config_id": "cfg-9"}} + + result = await handle_webhook( + agent_name="golden-agent", + payload=_pr_payload(), + acp_type="sync", + shaper="github_pr", + params_source="https://h/v5/agent_configs/cfg-9/resolve", + fetch=fake_resolve, + ) + + assert result.reply == "Looks good — ship it." 
+ assert self.created["params"] == {"system_prompt": "review"} + # metadata is returned on the result (SDK task/create can't carry it) + md = result.task_metadata + assert md["channel"] == "github_pr" + assert md["peer_id"] == "acme/widgets#42" + assert md["agent_config_id"] == "cfg-9" + # task folded on a stable session key + assert self.created["name"].startswith("wh-github_pr-") + # metadata is also stamped onto the task (best-effort) so it's labeled in the UI + assert self.stamped["task_id"] == "task-1" + assert self.stamped["task_metadata"]["peer_id"] == "acme/widgets#42" + assert self.stamped["task_metadata"]["agent_config_id"] == "cfg-9" + + async def test_inline_params_no_fetch(self): + result = await handle_webhook( + agent_name="a", + payload={"text": "hi"}, + acp_type="sync", + params={"system_prompt": "inline"}, + ) + assert result.reply == "Looks good — ship it." + assert self.created["params"] == {"system_prompt": "inline"} + + async def test_source_metadata_cannot_override_canonical(self): + async def fake_resolve(_url): + return {"params": {}, "task_metadata": {"channel": "spoofed"}} + + result = await handle_webhook( + agent_name="a", + payload=_pr_payload(), + shaper="github_pr", + params_source="https://h/resolve", + fetch=fake_resolve, + ) + assert result.task_metadata["channel"] == "github_pr" + + async def test_async_without_wait_sends_event_and_returns_no_reply(self): + result = await handle_webhook(agent_name="a", payload={"text": "go"}, acp_type="async", wait=False) + assert result.reply is None + self.send_event.assert_awaited_once() + + +class TestAwaitReplyIgnoresStalePriorReply: + async def test_returns_only_new_agent_text_on_reused_task(self, monkeypatch): + from agentex.lib.sdk.utils.webhooks import _await_reply + + old = _agent_msg("OLD reply") + old.id = "m1" + new = _agent_msg("NEW reply") + new.id = "m2" + calls = {"n": 0} + + async def fake_list(*, task_id, **_): + calls["n"] += 1 + return [old] if calls["n"] < 2 else [old, new] 
# new appears on 2nd poll + + async def no_sleep(_seconds): + return None + + monkeypatch.setattr(adk.messages, "list", fake_list) + monkeypatch.setattr("asyncio.sleep", no_sleep) + + # baseline = the pre-existing old message; only m2 (NEW) should be returned + reply = await _await_reply("task-1", {"m1"}, interval_s=0.0, quiescence_s=0.0) + assert reply == "NEW reply"