diff --git a/examples/tutorials/00_sync/harness_codex/Dockerfile b/examples/tutorials/00_sync/harness_codex/Dockerfile new file mode 100644 index 000000000..72713b95d --- /dev/null +++ b/examples/tutorials/00_sync/harness_codex/Dockerfile @@ -0,0 +1,50 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 00_sync/harness_codex/pyproject.toml /app/harness_codex/pyproject.toml +COPY 00_sync/harness_codex/README.md /app/harness_codex/README.md + +WORKDIR /app/harness_codex + +# Copy the project code +COPY 00_sync/harness_codex/project /app/harness_codex/project + +# Copy the test files +COPY 00_sync/harness_codex/tests /app/harness_codex/tests + +# Copy shared test utilities +COPY test_utils /app/test_utils + +# Install the required Python packages with dev dependencies +RUN uv pip install --system .[dev] + +# Set environment variables +ENV PYTHONPATH=/app + +# Set test environment variables +ENV AGENT_NAME=s-harness-codex + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/00_sync/harness_codex/README.md b/examples/tutorials/00_sync/harness_codex/README.md new file mode 100644 index 000000000..5f3396cfa --- /dev/null +++ b/examples/tutorials/00_sync/harness_codex/README.md @@ -0,0 +1,40 @@ +# harness_codex (sync) + +Tutorial agent demonstrating the `convert_codex_to_agentex_events` tap, +`CodexTurn`, and `UnifiedEmitter` for a **sync** (HTTP-yield) ACP agent. 
+ +## What this tutorial shows + +- Spawning `codex exec --json` as a **local asyncio subprocess** (no Scale sandbox). +- Wrapping the stdout line stream in a `CodexTurn`. +- Delivering every canonical `StreamTaskMessage*` event to the HTTP caller via + `UnifiedEmitter.yield_turn` (tracing as a side-effect). + +> **Production isolation note:** A tutorial agent runs the Codex CLI locally. +> Production-grade isolation (Scale sandbox, secret injection, MCP configuration) +> is handled by the golden agent at +> `teams/sgp/agents/golden_agent/project/harness/providers/codex.py`. + +## Live runs + +Live runs require: +1. The `codex` CLI on PATH: `npm install -g @openai/codex` +2. `OPENAI_API_KEY` set in the environment. + +## Running offline unit tests + +The offline tests inject a fake subprocess and never invoke the real CLI: + +```bash +cd /path/to/scale-agentex-python +uv run --all-packages --all-extras pytest examples/tutorials/00_sync/harness_codex/tests/test_agent.py -q +``` + +## Running live integration tests + +```bash +export CODEX_LIVE_TESTS=1 +export OPENAI_API_KEY=sk-... +# Start the agent server first, then: +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/00_sync/harness_codex/conftest.py b/examples/tutorials/00_sync/harness_codex/conftest.py new file mode 100644 index 000000000..bdd78994b --- /dev/null +++ b/examples/tutorials/00_sync/harness_codex/conftest.py @@ -0,0 +1,12 @@ +"""Add the agent's project root to sys.path so ``import project`` works. + +Also sets minimal environment variables so the FastACP and tracing modules +can be imported without a running agent server. 
+""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(__file__)) + +os.environ.setdefault("ACP_URL", "http://localhost:8000") diff --git a/examples/tutorials/00_sync/harness_codex/manifest.yaml b/examples/tutorials/00_sync/harness_codex/manifest.yaml new file mode 100644 index 000000000..52943f8f2 --- /dev/null +++ b/examples/tutorials/00_sync/harness_codex/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../ + include_paths: + - 00_sync/harness_codex + - test_utils + dockerfile: 00_sync/harness_codex/Dockerfile + dockerignore: 00_sync/harness_codex/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: sync + name: s-harness-codex + description: Sync tutorial agent driving the unified harness surface via local codex CLI subprocess + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "s-harness-codex" + description: "Sync tutorial agent driving the unified harness surface via local codex CLI subprocess" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/00_sync/harness_codex/project/__init__.py b/examples/tutorials/00_sync/harness_codex/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/00_sync/harness_codex/project/acp.py b/examples/tutorials/00_sync/harness_codex/project/acp.py new file mode 100644 index 
000000000..bcb5e10df --- /dev/null +++ b/examples/tutorials/00_sync/harness_codex/project/acp.py @@ -0,0 +1,175 @@ +"""Sync ACP handler for the Codex CLI harness tutorial. + +Demonstrates the ``convert_codex_to_agentex_events`` tap + ``CodexTurn`` + +``UnifiedEmitter`` for a sync (HTTP-yield) ACP agent. + +The handler: +1. Spawns ``codex exec --json`` as a LOCAL asyncio subprocess (no sandbox). + This is correct for tutorials and local development; production isolation + is handled by the golden agent's Scale sandbox at + ``teams/sgp/agents/golden_agent/project/harness/providers/codex.py``. +2. Wraps the stdout line stream in a ``CodexTurn``. +3. Delivers every canonical ``StreamTaskMessage*`` event via + ``UnifiedEmitter.yield_turn``, which traces + yields each event back to + the HTTP caller in one pass. + +Live runs require: +- ``codex`` CLI on PATH (``npm install -g @openai/codex``) +- ``OPENAI_API_KEY`` set in the environment +""" + +from __future__ import annotations + +import os +import time +import codecs +import asyncio +from typing import AsyncGenerator +from collections.abc import AsyncIterator + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from agentex.lib.adk import CodexTurn +from agentex.lib.types.acp import SendMessageParams +from agentex.lib.core.harness import UnifiedEmitter +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.types.task_message_update import TaskMessageUpdate +from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", 
""), + ) +) + +acp = FastACP.create(acp_type="sync") + +MODEL = os.environ.get("CODEX_MODEL", "o4-mini") + + +async def _spawn_codex(model: str) -> asyncio.subprocess.Process: + """Spawn ``codex exec --json`` locally and return the live process. + + Injection seam: tests replace this function with a fake that returns a + mock process whose stdout yields pre-recorded event lines. + + The flags mirror the golden agent (codex.py in the golden agent repo): + --json machine-readable newline-delimited events + --skip-git-repo-check safe to run outside a git repo + --dangerously-bypass-approvals-and-sandbox + skip interactive approval prompts in a + non-interactive (server) context + --model which OpenAI model to use + + The caller writes the prompt to stdin after the process starts, then + closes stdin so codex knows input is complete. + """ + cmd = [ + "codex", + "exec", + "--json", + "--skip-git-repo-check", + "--dangerously-bypass-approvals-and-sandbox", + "--model", + model, + "-", # read prompt from stdin + ] + return await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + # Discard stderr: codex --json writes events to stdout; its stderr is + # progress/debug noise. Capturing it with PIPE but never reading it + # would deadlock once codex fills the OS pipe buffer (~64 KB). + stderr=asyncio.subprocess.DEVNULL, + env={**os.environ}, + ) + + +async def _process_stdout(process: asyncio.subprocess.Process) -> AsyncIterator[str]: + """Yield newline-delimited JSON lines from the process stdout. + + Uses an incremental UTF-8 decoder so a multibyte character split across two + 4 KB reads is decoded correctly instead of being corrupted at the boundary. 
@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
    """Run ``codex exec`` for one user message and stream its events back.

    The prompt is written to the subprocess's stdin, its stdout is wrapped in a
    ``CodexTurn``, and every event produced by ``UnifiedEmitter.yield_turn`` is
    yielded to the caller as it arrives.

    The subprocess is always reaped. If the caller stops consuming this
    generator (it is closed or cancelled) or anything raises mid-turn, the
    ``finally`` block kills codex instead of leaving it running in the
    background.

    Args:
        params: The incoming message; ``params.task.id`` doubles as the trace
            id and ``params.content.content`` is used as the prompt.

    Yields:
        Each event emitted by ``UnifiedEmitter.yield_turn`` for this turn.
    """
    task_id = params.task.id
    # NOTE(review): assumes the content is text-like and exposes a ``str``
    # ``.content`` attribute -- confirm for non-text content types.
    user_message = params.content.content
    logger.info("Processing message for task %s", task_id)

    start_ms = int(time.monotonic() * 1000)

    async with adk.tracing.span(
        trace_id=task_id,
        task_id=task_id,
        name="message",
        input={"message": user_message},
        data={"__span_type__": "AGENT_WORKFLOW"},
    ) as turn_span:
        process = await _spawn_codex(MODEL)
        try:
            # Write prompt to stdin then close it so codex knows input is done.
            assert process.stdin is not None
            process.stdin.write(user_message.encode("utf-8"))
            await process.stdin.drain()
            process.stdin.close()

            turn = CodexTurn(
                events=_process_stdout(process),
                model=MODEL,
            )

            emitter = UnifiedEmitter(
                task_id=task_id,
                trace_id=task_id,
                parent_span_id=turn_span.id if turn_span else None,
            )

            async for event in emitter.yield_turn(turn):
                yield event

            await process.wait()
        finally:
            # Reached on normal completion, on errors, and when the generator
            # is closed early. Only a still-running process needs killing.
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process exited between the check and the kill.
                    pass
                await process.wait()

        # stderr is discarded by _spawn_codex, so the exit status is the only
        # signal that codex failed; surface it in the logs.
        if process.returncode:
            logger.warning(
                "codex exited with status %s for task %s",
                process.returncode,
                task_id,
            )

        # Record the real wall-clock duration AFTER streaming completes; setting
        # it before the stream ran would capture only subprocess spawn overhead.
        turn.duration_ms = int(time.monotonic() * 1000) - start_ms

        if turn_span:
            usage = turn.usage()
            turn_span.output = {
                "model": usage.model,
                "input_tokens": usage.input_tokens,
                "output_tokens": usage.output_tokens,
            }
class TestOfflineCodexHandler:
    """Unit tests that run without a real codex CLI or network.

    Every test feeds the pre-recorded ``SAMPLE_EVENTS`` lines (through
    ``_fake_event_stream``) into the harness in place of real codex output.
    """

    @pytest.mark.asyncio
    async def test_codex_turn_yields_stream_events(self):
        """CodexTurn drives the unified surface and yields StreamTaskMessage* events."""
        from agentex.lib.adk import CodexTurn
        from agentex.lib.core.harness import UnifiedEmitter

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")
        emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None)

        events = [e async for e in emitter.yield_turn(turn)]
        assert len(events) > 0, "No events yielded"

        # Compare by class name so the test does not need to import every
        # StreamTaskMessage* type.
        types_seen = {type(e).__name__ for e in events}
        known_types = {
            "StreamTaskMessageStart",
            "StreamTaskMessageDelta",
            "StreamTaskMessageFull",
            "StreamTaskMessageDone",
        }
        assert types_seen & known_types, f"Unexpected event types: {types_seen}"

    @pytest.mark.asyncio
    async def test_usage_populated_after_stream_exhausted(self):
        """CodexTurn.usage() returns correct tokens after stream is exhausted."""
        from agentex.lib.adk import CodexTurn

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")

        # Drain the stream; the events themselves are not needed here.
        async for _ in turn.events:
            pass

        usage = turn.usage()
        assert usage.input_tokens == 10
        assert usage.output_tokens == 5
        assert usage.total_tokens == 15
        assert usage.model == "o4-mini"

    @pytest.mark.asyncio
    async def test_codex_turn_protocol_compliance(self):
        """CodexTurn satisfies the HarnessTurn protocol."""
        from agentex.lib.adk import CodexTurn
        from agentex.lib.core.harness.types import HarnessTurn

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")
        assert isinstance(turn, HarnessTurn), "CodexTurn does not satisfy HarnessTurn protocol"

    @pytest.mark.asyncio
    async def test_unified_emitter_yield_passes_through_events(self):
        """UnifiedEmitter.yield_turn yields at least one event in sync mode."""
        from agentex.lib.adk import CodexTurn
        from agentex.lib.core.harness import UnifiedEmitter

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")
        emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None)

        events = [e async for e in emitter.yield_turn(turn)]
        assert len(events) > 0

    @pytest.mark.asyncio
    async def test_convert_codex_to_agentex_events_direct(self):
        """convert_codex_to_agentex_events tap produces text start/done events."""
        from agentex.lib.adk import convert_codex_to_agentex_events
        from agentex.types.task_message_update import StreamTaskMessageDone

        events = [e async for e in convert_codex_to_agentex_events(_fake_event_stream())]
        assert any(isinstance(e, StreamTaskMessageDone) for e in events), (
            "Expected at least one StreamTaskMessageDone event"
        )

    @pytest.mark.asyncio
    async def test_on_result_callback_receives_session_id(self):
        """on_result callback receives the session_id from thread.started."""
        from agentex.lib.adk import convert_codex_to_agentex_events

        captured: list[dict] = []

        # Only the callback's side effect matters, so drain the converted
        # stream without keeping the events.
        async for _ in convert_codex_to_agentex_events(
            _fake_event_stream(),
            on_result=captured.append,
        ):
            pass

        assert len(captured) == 1
        assert captured[0]["session_id"] == "thread-abc"
        assert captured[0]["tool_call_count"] == 0
Reply with just the number.", + type="text", + ) + ), + ) + assert response.result is not None + assert len(response.result) >= 1 diff --git a/examples/tutorials/10_async/00_base/harness_codex/Dockerfile b/examples/tutorials/10_async/00_base/harness_codex/Dockerfile new file mode 100644 index 000000000..06b76aae2 --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_codex/Dockerfile @@ -0,0 +1,39 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY 10_async/00_base/harness_codex/pyproject.toml /app/harness_codex/pyproject.toml +COPY 10_async/00_base/harness_codex/README.md /app/harness_codex/README.md + +WORKDIR /app/harness_codex + +COPY 10_async/00_base/harness_codex/project /app/harness_codex/project +COPY 10_async/00_base/harness_codex/tests /app/harness_codex/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app +ENV AGENT_NAME=ab-harness-codex + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/10_async/00_base/harness_codex/README.md b/examples/tutorials/10_async/00_base/harness_codex/README.md new file mode 100644 index 000000000..9bbcd927a --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_codex/README.md @@ -0,0 +1,40 @@ +# harness_codex (async base) + +Tutorial agent demonstrating the `convert_codex_to_agentex_events` tap, +`CodexTurn`, and `UnifiedEmitter` for an **async** (Redis-streaming, no Temporal) +ACP agent. 
+ +## What this tutorial shows + +- Spawning `codex exec --json` as a **local asyncio subprocess** (no Scale sandbox). +- Wrapping the stdout line stream in a `CodexTurn`. +- Delivering every canonical `StreamTaskMessage*` event to Redis via + `UnifiedEmitter.auto_send_turn`, so the UI receives tokens in real time. +- Persisting the codex thread ID in `adk.state` so subsequent turns resume the + same codex session via `codex exec resume <thread_id>`. + +> **Production isolation note:** A tutorial agent runs the Codex CLI locally. +> Production-grade isolation (Scale sandbox, secret injection, MCP configuration) +> is handled by the golden agent at +> `teams/sgp/agents/golden_agent/project/harness/providers/codex.py`. + +## Live runs + +Live runs require: +1. The `codex` CLI on PATH: `npm install -g @openai/codex` +2. `OPENAI_API_KEY` set in the environment. + +## Running offline unit tests + +```bash +cd /path/to/scale-agentex-python +uv run --all-packages --all-extras pytest examples/tutorials/10_async/00_base/harness_codex/tests/test_agent.py -q +``` + +## Running live integration tests + +```bash +export CODEX_LIVE_TESTS=1 +export OPENAI_API_KEY=sk-... +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/10_async/00_base/harness_codex/conftest.py b/examples/tutorials/10_async/00_base/harness_codex/conftest.py new file mode 100644 index 000000000..bdd78994b --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_codex/conftest.py @@ -0,0 +1,12 @@ +"""Add the agent's project root to sys.path so ``import project`` works. + +Also sets minimal environment variables so the FastACP and tracing modules +can be imported without a running agent server. 
+""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(__file__)) + +os.environ.setdefault("ACP_URL", "http://localhost:8000") diff --git a/examples/tutorials/10_async/00_base/harness_codex/manifest.yaml b/examples/tutorials/10_async/00_base/harness_codex/manifest.yaml new file mode 100644 index 000000000..e88e2029d --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_codex/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/00_base/harness_codex + - test_utils + dockerfile: 10_async/00_base/harness_codex/Dockerfile + dockerignore: 10_async/00_base/harness_codex/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: async + name: ab-harness-codex + description: Async (base) tutorial agent driving the unified harness surface via local codex CLI subprocess + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "ab-harness-codex" + description: "Async (base) tutorial agent driving the unified harness surface via local codex CLI subprocess" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/00_base/harness_codex/project/__init__.py b/examples/tutorials/10_async/00_base/harness_codex/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git 
a/examples/tutorials/10_async/00_base/harness_codex/project/acp.py b/examples/tutorials/10_async/00_base/harness_codex/project/acp.py new file mode 100644 index 000000000..0233c49ab --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_codex/project/acp.py @@ -0,0 +1,230 @@ +"""Async (base) ACP handler for the Codex CLI harness tutorial. + +Demonstrates the ``convert_codex_to_agentex_events`` tap + ``CodexTurn`` + +``UnifiedEmitter`` for an async (Redis-streaming) ACP agent without Temporal. + +The handler: +1. Spawns ``codex exec --json`` as a LOCAL asyncio subprocess (no sandbox). + This is correct for tutorials and local development; production isolation + is handled by the golden agent's Scale sandbox at + ``teams/sgp/agents/golden_agent/project/harness/providers/codex.py``. +2. Wraps the stdout line stream in a ``CodexTurn``. +3. Delivers every canonical ``StreamTaskMessage*`` event to Redis via + ``UnifiedEmitter.auto_send_turn``, so the UI receives tokens in real time. +4. Multi-turn memory is persisted via ``adk.state``. 
class ConversationState(BaseModel):
    """Per-task conversation state persisted via ``adk.state``.

    We store the codex session/thread ID so subsequent turns can resume the
    same codex session via ``codex exec resume <thread_id>``.
    """

    # Codex session/thread ID saved by the event handler after a turn;
    # stays ``None`` until a turn reports one.
    codex_thread_id: str | None = None
    # Count of user turns handled for this task; incremented once per event
    # and used to label the tracing span ("Turn N").
    turn_number: int = 0
async def _process_stdout(process: asyncio.subprocess.Process) -> AsyncIterator[str]:
    """Stream the subprocess's stdout as stripped, non-empty text lines.

    Output is read in 4 KB chunks and decoded with an incremental UTF-8
    decoder, so a multibyte character that straddles two reads is decoded
    intact (undecodable bytes become U+FFFD). Text after the last newline is
    held back until more data arrives; whatever remains at EOF is yielded as
    a final line.
    """
    stream = process.stdout
    assert stream is not None
    utf8 = codecs.getincrementaldecoder("utf-8")(errors="replace")
    pending = ""
    while chunk := await stream.read(4096):
        # Everything before the last "\n" is a complete line; the final piece
        # (possibly empty) is the start of the next, still-incomplete line.
        *complete, pending = (pending + utf8.decode(chunk)).split("\n")
        for raw in complete:
            if text := raw.strip():
                yield text
    # Flush any bytes the decoder was still holding when the stream ended.
    tail = (pending + utf8.decode(b"", final=True)).strip()
    if tail:
        yield tail
@acp.on_task_cancel
async def handle_task_canceled(params: CancelTaskParams):
    """Log that a task was canceled.

    This handler only writes a log line; it does not touch any codex
    subprocess or stored state.
    """
    canceled_task_id = params.task.id
    logger.info("Task canceled: %s", canceled_task_id)
async def _fake_event_stream():
    """Yield the pre-recorded codex events as JSON lines (no subprocess)."""
    for evt in SAMPLE_EVENTS:
        yield json.dumps(evt)


class TestOfflineCodexHandler:
    """Unit tests that run without a real codex CLI or network."""

    @pytest.mark.asyncio
    async def test_usage_populated_after_stream_exhausted(self):
        """CodexTurn.usage() reports the token counts once the stream is exhausted."""
        from agentex.lib.adk import CodexTurn

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")

        # Drain the stream without keeping the events; only usage() is checked.
        async for _ in turn.events:
            pass

        usage = turn.usage()
        assert usage.input_tokens == 8
        assert usage.output_tokens == 4
        assert usage.model == "o4-mini"

    @pytest.mark.asyncio
    async def test_auto_send_turn_drives_unified_surface(self):
        """auto_send_turn completes and returns a result with a stubbed streaming backend."""
        from agentex.lib.adk import CodexTurn
        from agentex.lib.core.harness import UnifiedEmitter
        from agentex.types.task_message import TaskMessage
        from agentex.types.text_content import TextContent

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")

        real_task_msg = TaskMessage(
            id="msg-fake",
            task_id="t",
            content=TextContent(type="text", author="agent", content=""),
        )

        # Stub the streaming service so no Redis / network access is needed.
        fake_streaming = MagicMock()
        fake_ctx = AsyncMock()
        fake_ctx.__aenter__ = AsyncMock(return_value=fake_ctx)
        fake_ctx.__aexit__ = AsyncMock(return_value=False)
        fake_ctx.stream_update = AsyncMock(return_value=MagicMock())
        fake_ctx.close = AsyncMock()
        fake_ctx.task_message = real_task_msg
        fake_streaming.streaming_task_message_context = MagicMock(return_value=fake_ctx)

        emitter = UnifiedEmitter(
            task_id="t",
            trace_id=None,
            parent_span_id=None,
            streaming=fake_streaming,
        )

        result = await emitter.auto_send_turn(turn)
        assert result is not None

    @pytest.mark.asyncio
    async def test_session_id_captured_after_stream(self):
        """CodexTurn.session_id exposes the thread_id from thread.started."""
        from agentex.lib.adk import CodexTurn

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")
        async for _ in turn.events:
            pass

        # Assert through the public accessor the agent code itself reads,
        # rather than the private ``_result`` dict, so the test does not break
        # when CodexTurn's internals change.
        assert turn.session_id == "thread-xyz"

    @pytest.mark.asyncio
    async def test_yield_turn_is_passthrough(self):
        """yield_turn mode also works with CodexTurn (no streaming infra needed)."""
        from agentex.lib.adk import CodexTurn
        from agentex.lib.core.harness import UnifiedEmitter

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")
        emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None)

        events = [e async for e in emitter.yield_turn(turn)]
        assert len(events) > 0


# ---------------------------------------------------------------------------
# Live tests
# ---------------------------------------------------------------------------

LIVE = os.environ.get("CODEX_LIVE_TESTS", "") == "1"
AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
AGENT_NAME = os.environ.get("AGENT_NAME", "ab-harness-codex")


@pytest.mark.skipif(
    not LIVE,
    reason="Set CODEX_LIVE_TESTS=1 and ensure codex CLI + OPENAI_API_KEY are available",
)
class TestLiveCodexAgent:
    """End-to-end tests that require the real codex CLI and a running Agentex server."""

    @pytest.fixture
    def client(self):
        """Agentex client pointed at AGENTEX_API_BASE_URL."""
        from agentex import Agentex

        return Agentex(base_url=AGENTEX_API_BASE_URL)

    @pytest.fixture
    def agent_id(self, client):
        """ID of the agent registered under AGENT_NAME.

        Raises:
            ValueError: if no listed agent has that name.
        """
        for agent in client.agents.list():
            if agent.name == AGENT_NAME:
                return agent.id
        raise ValueError(f"Agent {AGENT_NAME!r} not found.")

    def test_send_simple_message(self, client, agent_id: str):
        """Async agents process events out of band, so create a task, send an
        event, and poll the task's messages for the agent's response."""
        import time
        import uuid

        from agentex.types import TextContentParam
        from agentex.types.agent_rpc_params import ParamsSendEventRequest, ParamsCreateTaskRequest

        task = client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)).result
        assert task is not None

        client.agents.send_event(
            agent_id=agent_id,
            params=ParamsSendEventRequest(
                task_id=task.id,
                content=TextContentParam(
                    author="user",
                    content="What is 3+3? Reply with just the number.",
                    type="text",
                ),
            ),
        )

        # Poll every 2 s for up to 60 s; the first agent-authored message is
        # enough to pass (the previous inner assert was always true here).
        deadline = time.monotonic() + 60
        while time.monotonic() < deadline:
            msgs = client.messages.list(task_id=task.id)
            if any(getattr(m.content, "author", None) == "agent" for m in msgs):
                return
            time.sleep(2)

        raise AssertionError("No agent response received within 60 s")
b/examples/tutorials/10_async/10_temporal/harness_codex/README.md new file mode 100644 index 000000000..4f9b76955 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_codex/README.md @@ -0,0 +1,48 @@ +# harness_codex (Temporal) + +Tutorial agent demonstrating the `convert_codex_to_agentex_events` tap, +`CodexTurn`, and `UnifiedEmitter` for a **Temporal-durable** async ACP agent. + +## What this tutorial shows + +- Spawning `codex exec --json` as a **local asyncio subprocess** (no Scale sandbox) + inside the `run_codex_turn` Temporal activity, invoked from the workflow signal handler. +- Wrapping the stdout line stream in a `CodexTurn`. +- Delivering every canonical `StreamTaskMessage*` event to Redis via + `UnifiedEmitter.auto_send_turn`, passing `created_at=workflow.now()` for + deterministic Temporal replay timestamps. +- Keeping the codex thread ID on the workflow instance (durable across crashes + without an external `adk.state` round-trip). + +> **Production isolation note:** This tutorial agent runs the Codex CLI locally. +> Production-grade isolation (Scale sandbox, secret injection, MCP configuration) +> is handled by the golden agent at +> `teams/sgp/agents/golden_agent/project/harness/providers/codex.py`. + +> **Temporal determinism note:** Subprocess spawning happens inside the +> `run_codex_turn` activity, not in the `@workflow.signal` handler body. Workflow +> and signal-handler code runs on Temporal's deterministic sandbox event loop, +> which does not implement `subprocess_exec`; running the turn in an activity +> also gives it Temporal's timeout and retry semantics. + +## Live runs + +Live runs require: +1. The `codex` CLI on PATH: `npm install -g @openai/codex` +2. `OPENAI_API_KEY` set in the environment. +3. A running Temporal server. 
+ +## Running offline unit tests + +```bash +cd /path/to/scale-agentex-python +uv run --all-packages --all-extras pytest examples/tutorials/10_async/10_temporal/harness_codex/tests/test_agent.py -q +``` + +## Running live integration tests + +```bash +export CODEX_LIVE_TESTS=1 +export OPENAI_API_KEY=sk-... +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/10_async/10_temporal/harness_codex/conftest.py b/examples/tutorials/10_async/10_temporal/harness_codex/conftest.py new file mode 100644 index 000000000..4ae6ce61a --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_codex/conftest.py @@ -0,0 +1,17 @@ +"""Add the agent's project root to sys.path so ``import project`` works. + +Also sets minimal environment variables so FastACP, tracing, and the +Temporal workflow module can be imported without a running server. +""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(__file__)) + +# AGENT_NAME must match the manifest's agent name: the live test queries the +# server by this name, and project.workflow reads it at import time. 
+os.environ.setdefault("AGENT_NAME", "at-harness-codex") +os.environ.setdefault("ACP_URL", "http://localhost:8000") +os.environ.setdefault("WORKFLOW_NAME", "at-harness-codex") +os.environ.setdefault("WORKFLOW_TASK_QUEUE", "at_harness_codex_queue") diff --git a/examples/tutorials/10_async/10_temporal/harness_codex/manifest.yaml b/examples/tutorials/10_async/10_temporal/harness_codex/manifest.yaml new file mode 100644 index 000000000..3bc21dccc --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_codex/manifest.yaml @@ -0,0 +1,62 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/10_temporal/harness_codex + - test_utils + dockerfile: 10_async/10_temporal/harness_codex/Dockerfile + dockerignore: 10_async/10_temporal/harness_codex/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + worker: project/run_worker.py + +agent: + acp_type: async + name: at-harness-codex + description: Temporal tutorial agent driving the unified harness surface via local codex CLI subprocess + + temporal: + enabled: true + workflows: + - name: at-harness-codex + queue_name: at_harness_codex_queue + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "at-harness-codex" + description: "Temporal tutorial agent driving the unified harness surface via local codex CLI subprocess" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git 
# NOTE(review): load_dotenv() is called between the two import groups,
# presumably so values from a local .env file are already in the environment
# when the agentex modules below are imported — confirm before reordering.
load_dotenv()

from agentex.lib.types.fastacp import TemporalACPConfig
from agentex.lib.sdk.fastacp.fastacp import FastACP

# ACP application object served by uvicorn as ``project.acp:acp``. No handlers
# are registered on it in this file.
acp = FastACP.create(
    acp_type="async",
    config=TemporalACPConfig(
        type="temporal",
        # Falls back to localhost:7233 when TEMPORAL_ADDRESS is unset.
        temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
    ),
)
Temporal runs workflow + signal-handler bodies on a +deterministic sandbox event loop that does not implement ``subprocess_exec`` +(or threads / sockets), so spawning ``codex exec`` directly in the signal +handler raises ``NotImplementedError``. This activity runs codex, drives the +``CodexTurn`` through ``UnifiedEmitter.auto_send_turn`` (the async Redis push +path), and returns the turn result to the workflow. + +The ``_spawn_codex`` / ``_process_stdout`` seams are injectable: offline tests +replace them with fakes that yield pre-recorded event lines so no real CLI +runs. +""" + +from __future__ import annotations + +import os +import codecs +import asyncio +from typing import Any +from datetime import datetime +from collections.abc import AsyncIterator + +from temporalio import activity + +from agentex.lib.adk import CodexTurn +from agentex.lib.core.harness import UnifiedEmitter +from agentex.lib.utils.logging import make_logger +from agentex.lib.utils.model_utils import BaseModel + +logger = make_logger(__name__) + +RUN_CODEX_TURN_ACTIVITY = "run_codex_turn" + + +class RunCodexTurnParams(BaseModel): + """Arguments for one codex turn run inside an activity.""" + + task_id: str + prompt: str + model: str + trace_id: str | None = None + parent_span_id: str | None = None + thread_id: str | None = None + created_at: datetime | None = None + + +class RunCodexTurnResult(BaseModel): + """Result returned from the activity to the workflow.""" + + final_text: str + session_id: str | None = None + model: str | None = None + + +async def _spawn_codex( + model: str, + thread_id: str | None = None, +) -> asyncio.subprocess.Process: + """Spawn ``codex exec --json`` locally and return the live process. + + Injection seam: tests replace this function with a fake that returns a + mock process whose stdout yields pre-recorded event lines. + + The caller writes the prompt to stdin after the process starts, then + closes stdin so codex knows input is complete. 
+ """ + base_flags = [ + "--json", + "--skip-git-repo-check", + "--dangerously-bypass-approvals-and-sandbox", + "--model", + model, + ] + + if thread_id: + cmd = ["codex", "exec", *base_flags, "resume", thread_id, "-"] + else: + cmd = ["codex", "exec", *base_flags, "-"] + + return await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + # Discard stderr: codex --json writes events to stdout; its stderr is + # progress/debug noise. Capturing it with PIPE but never reading it + # would deadlock once codex fills the OS pipe buffer (~64 KB). + stderr=asyncio.subprocess.DEVNULL, + env={**os.environ}, + ) + + +async def _process_stdout(process: asyncio.subprocess.Process) -> AsyncIterator[str]: + """Yield newline-delimited JSON lines from the process stdout. + + Uses an incremental UTF-8 decoder so a multibyte character split across two + 4 KB reads is decoded correctly instead of being corrupted at the boundary. + """ + assert process.stdout is not None + decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") + buffer = "" + while True: + chunk = await process.stdout.read(4096) + if not chunk: + break + buffer += decoder.decode(chunk) + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + line = line.strip() + if line: + yield line + buffer += decoder.decode(b"", final=True) + if buffer.strip(): + yield buffer.strip() + + +@activity.defn(name=RUN_CODEX_TURN_ACTIVITY) +async def run_codex_turn(params: RunCodexTurnParams) -> dict[str, Any]: + """Run one codex turn end-to-end and stream events to the task. + + Runs in an activity (real asyncio loop) so subprocess I/O is permitted. 
@activity.defn(name=RUN_CODEX_TURN_ACTIVITY)
async def run_codex_turn(params: RunCodexTurnParams) -> dict[str, Any]:
    """Run one codex turn end-to-end and stream its events to the task.

    Runs in an activity (real asyncio loop) so subprocess I/O is permitted.

    The prompt is written to the child's stdin, stdout is wrapped in a
    ``CodexTurn``, and the turn is delivered through
    ``UnifiedEmitter.auto_send_turn``.

    Args:
        params: Task/trace identifiers, the prompt, the model, the optional
            codex thread to resume, and the ``created_at`` timestamp that is
            forwarded to ``auto_send_turn``.

    Returns:
        ``RunCodexTurnResult(...).model_dump()``: the final text, the codex
        session id, and the model reported by the turn's usage.

    Raises:
        Whatever the stdin write or ``auto_send_turn`` raises. The codex
        process is terminated and reaped before the error propagates.
    """
    process = await _spawn_codex(params.model, thread_id=params.thread_id)

    try:
        assert process.stdin is not None
        process.stdin.write(params.prompt.encode("utf-8"))
        await process.stdin.drain()
        # Closing stdin signals end-of-prompt to codex.
        process.stdin.close()

        turn = CodexTurn(events=_process_stdout(process), model=params.model)
        emitter = UnifiedEmitter(
            task_id=params.task_id,
            trace_id=params.trace_id,
            parent_span_id=params.parent_span_id,
        )
        result = await emitter.auto_send_turn(turn, created_at=params.created_at)

        exit_code = await process.wait()
    finally:
        # If anything above raised (including cancellation) the child may
        # still be running; kill and reap it so it is not leaked.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            await process.wait()

    if exit_code:
        # Surface a failed CLI run instead of silently returning its output.
        logger.warning("codex exited with non-zero status %s", exit_code)

    return RunCodexTurnResult(
        final_text=result.final_text,
        session_id=turn.session_id,
        model=turn.usage().model,
    ).model_dump()
async def main():
    """Start the Temporal worker for this agent.

    Registers ``run_codex_turn`` together with the activities returned by
    ``get_all_activities()`` and serves ``AtHarnessCodexWorkflow`` on the task
    queue named by ``WORKFLOW_TASK_QUEUE``.

    Raises:
        ValueError: If ``WORKFLOW_TASK_QUEUE`` is not set.
    """
    setup_debug_if_enabled()

    queue = environment_variables.WORKFLOW_TASK_QUEUE
    if queue is None:
        raise ValueError("WORKFLOW_TASK_QUEUE is not set")

    agent_worker = AgentexWorker(task_queue=queue)
    registered = [run_codex_turn, *get_all_activities()]

    await agent_worker.run(activities=registered, workflow=AtHarnessCodexWorkflow)
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
class AtHarnessCodexWorkflow(BaseWorkflow):
    """Long-lived workflow that runs one codex turn per incoming event.

    The codex thread ID and the turn counter are plain attributes on the
    workflow instance, so no external ``adk.state`` round-trip is used to
    carry them between turns.
    """

    def __init__(self):
        super().__init__(display_name=environment_variables.AGENT_NAME)
        self._complete_task = False
        self._turn_number = 0
        self._codex_thread_id: str | None = None

    @workflow.signal(name=SignalName.RECEIVE_EVENT)
    async def on_task_event_send(self, params: SendEventParams) -> None:
        """Handle a user message by running one codex turn in an activity.

        Echoes the incoming content as a task message, opens a tracing span
        for the turn, runs ``run_codex_turn`` with a five-minute
        start-to-close timeout, and remembers the returned codex session id
        for the next turn.
        """
        task_id = params.task.id
        logger.info("Received task event: %s", task_id)
        self._turn_number += 1

        await adk.messages.create(task_id=task_id, content=params.event.content)

        prompt = params.event.content.content

        async with adk.tracing.span(
            trace_id=task_id,
            task_id=task_id,
            name=f"Turn {self._turn_number}",
            input={"message": prompt},
        ) as span:
            # Subprocess I/O is not permitted on the workflow event loop, so
            # the turn is handed to an activity. workflow.now() supplies a
            # timestamp that is deterministic under replay.
            turn_params = RunCodexTurnParams(
                task_id=task_id,
                prompt=prompt,
                model=MODEL,
                trace_id=task_id,
                parent_span_id=span.id if span else None,
                thread_id=self._codex_thread_id,
                created_at=workflow.now(),
            )
            outcome = await workflow.execute_activity(
                run_codex_turn,
                turn_params,
                start_to_close_timeout=timedelta(minutes=5),
            )

            # Keep the codex thread id so the next turn resumes the session.
            if thread_id := outcome.get("session_id"):
                self._codex_thread_id = thread_id

            if span:
                span.output = {
                    "final_text": outcome.get("final_text"),
                    "model": outcome.get("model"),
                }

    @workflow.run
    async def on_task_create(self, params: CreateTaskParams) -> str:
        """Workflow entry point: greet, then wait for the completion signal.

        Returns:
            The string ``"Task completed"`` once ``complete_task_signal`` has
            been received.
        """
        logger.info("Task created: %s", params.task.id)

        greeting = TextContent(
            author="agent",
            content=(
                "Task initialized.\n"
                "Send me a message and I'll run codex (local subprocess) "
                "to answer, streaming events via the unified harness surface."
            ),
        )
        await adk.messages.create(task_id=params.task.id, content=greeting)

        # Stay alive (no timeout) so RECEIVE_EVENT signals keep being handled.
        await workflow.wait_condition(lambda: self._complete_task, timeout=None)
        return "Task completed"

    @workflow.signal
    async def complete_task_signal(self) -> None:
        """Flag the workflow as complete so ``on_task_create`` can return."""
        logger.info("Received complete_task signal")
        self._complete_task = True
async def _fake_event_stream():
    """Yield the pre-recorded codex events as JSON lines (no subprocess)."""
    for evt in SAMPLE_EVENTS:
        yield json.dumps(evt)


class _FakeSpan:
    """Stand-in for a tracing span usable as an async context manager."""

    id = "span-temporal-1"
    output: Any = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, *a):
        pass


class TestOfflineCodexWorkflow:
    """Unit tests that run without a real codex CLI, Temporal, or network."""

    @pytest.mark.asyncio
    async def test_codex_turn_usage_with_temporal_events(self):
        """CodexTurn.usage() is correct after exhausting the temporal sample events."""
        from agentex.lib.adk import CodexTurn

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")

        # Drain the stream; only usage() is checked.
        async for _ in turn.events:
            pass

        usage = turn.usage()
        assert usage.input_tokens == 6
        assert usage.output_tokens == 3
        assert usage.model == "o4-mini"

    @pytest.mark.asyncio
    async def test_unified_emitter_auto_send_with_created_at(self):
        """UnifiedEmitter.auto_send_turn accepts created_at=None without error."""
        from agentex.lib.adk import CodexTurn
        from agentex.lib.core.harness import UnifiedEmitter
        from agentex.types.task_message import TaskMessage
        from agentex.types.text_content import TextContent

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")

        real_task_msg = TaskMessage(
            id="msg-fake",
            task_id="t",
            content=TextContent(type="text", author="agent", content=""),
        )

        # Stub the streaming service so no Redis / network access is needed.
        fake_streaming = MagicMock()
        fake_ctx = AsyncMock()
        fake_ctx.__aenter__ = AsyncMock(return_value=fake_ctx)
        fake_ctx.__aexit__ = AsyncMock(return_value=False)
        fake_ctx.stream_update = AsyncMock(return_value=MagicMock())
        fake_ctx.close = AsyncMock()
        fake_ctx.task_message = real_task_msg
        fake_streaming.streaming_task_message_context = MagicMock(return_value=fake_ctx)

        emitter = UnifiedEmitter(
            task_id="t",
            trace_id=None,
            parent_span_id=None,
            streaming=fake_streaming,
        )

        result = await emitter.auto_send_turn(turn, created_at=None)
        assert result is not None

    @pytest.mark.asyncio
    async def test_thread_id_captured_after_exhausted_stream(self):
        """CodexTurn.session_id exposes the thread_id from thread.started."""
        from agentex.lib.adk import CodexTurn

        turn = CodexTurn(events=_fake_event_stream(), model="o4-mini")
        async for _ in turn.events:
            pass

        # Assert through the public accessor the activity itself reads, rather
        # than the private ``_result`` dict, so the test does not break when
        # CodexTurn's internals change.
        assert turn.session_id == "thread-temporal-1"

    @pytest.mark.asyncio
    async def test_signal_handler_delegates_to_activity_and_captures_thread_id(self):
        """Signal handler runs the turn via execute_activity, increments the turn
        counter, and captures the codex thread ID returned by the activity."""
        captured: dict[str, Any] = {}

        async def _fake_execute_activity(_activity, params, **_kw):
            captured["params"] = params
            return {
                "session_id": "thread-temporal-1",
                "final_text": "Hello from Temporal!",
                "model": "o4-mini",
            }

        with patch("project.workflow.adk.messages.create", new=AsyncMock()), patch(
            "project.workflow.adk.tracing.span"
        ) as mock_span, patch(
            "project.workflow.workflow.execute_activity", new=_fake_execute_activity
        ), patch("project.workflow.workflow.now", return_value=None):
            mock_span.return_value = _FakeSpan()

            from project.workflow import AtHarnessCodexWorkflow

            # Bypass __init__ and set only the attributes the test needs.
            wf = AtHarnessCodexWorkflow.__new__(AtHarnessCodexWorkflow)
            wf._turn_number = 0
            wf._codex_thread_id = None
            wf._complete_task = False
            wf._display_name = "test"

            params = MagicMock()
            params.task.id = "task-temporal-offline-1"
            params.event.content.content = "say hello temporal"

            await wf.on_task_event_send(params)

        assert wf._turn_number == 1
        assert wf._codex_thread_id == "thread-temporal-1"
        assert captured["params"].prompt == "say hello temporal"
        assert captured["params"].thread_id is None

    @pytest.mark.asyncio
    async def test_run_codex_turn_activity_streams_and_returns_thread_id(self):
        """The run_codex_turn activity drives the turn and returns the thread id."""
        from agentex.lib.core.harness import UnifiedEmitter

        async def _fake_spawn(model, thread_id=None):  # noqa: ARG001
            fake_stdin = MagicMock()
            fake_stdin.write = MagicMock()
            fake_stdin.drain = AsyncMock()
            fake_stdin.close = MagicMock()
            proc = MagicMock()
            proc.stdin = fake_stdin
            proc.wait = AsyncMock(return_value=0)
            return proc

        async def _fake_process_stdout(_process):  # noqa: ARG001
            for evt in SAMPLE_EVENTS:
                yield json.dumps(evt)

        class _FakeTurnResult:
            final_text = "Hello from Temporal!"

        async def _auto_send(_self, turn, *_a, **_kw):
            # Exhaust the turn so its session id and usage become available.
            async for _ in turn.events:
                pass
            return _FakeTurnResult()

        with patch("project.activities._spawn_codex", new=_fake_spawn), patch(
            "project.activities._process_stdout", new=_fake_process_stdout
        ), patch.object(UnifiedEmitter, "auto_send_turn", new=_auto_send):
            from project.activities import RunCodexTurnParams, run_codex_turn

            result = await run_codex_turn(
                RunCodexTurnParams(
                    task_id="task-temporal-offline-1",
                    prompt="say hello temporal",
                    model="o4-mini",
                )
            )

        assert result["session_id"] == "thread-temporal-1"
        assert result["final_text"] == "Hello from Temporal!"


# ---------------------------------------------------------------------------
# Live tests
# ---------------------------------------------------------------------------

LIVE = os.environ.get("CODEX_LIVE_TESTS", "") == "1"
AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
AGENT_NAME = os.environ.get("AGENT_NAME", "at-harness-codex")


@pytest.mark.skipif(
    not LIVE,
    reason="Set CODEX_LIVE_TESTS=1 and ensure codex CLI + OPENAI_API_KEY + Temporal are available",
)
class TestLiveCodexAgent:
    """End-to-end tests that require the real codex CLI, Temporal, and Agentex server."""

    @pytest.fixture
    def client(self):
        """Agentex client pointed at AGENTEX_API_BASE_URL."""
        from agentex import Agentex

        return Agentex(base_url=AGENTEX_API_BASE_URL)

    @pytest.fixture
    def agent_id(self, client):
        """ID of the agent registered under AGENT_NAME.

        Raises:
            ValueError: if no listed agent has that name.
        """
        for agent in client.agents.list():
            if agent.name == AGENT_NAME:
                return agent.id
        raise ValueError(f"Agent {AGENT_NAME!r} not found.")

    def test_send_simple_message(self, client, agent_id: str):
        """Temporal agents process events out of band, so create a task, send an
        event, and poll the task's messages for the agent's response."""
        import time
        import uuid

        from agentex.types import TextContentParam
        from agentex.types.agent_rpc_params import ParamsSendEventRequest, ParamsCreateTaskRequest

        task = client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)).result
        assert task is not None

        client.agents.send_event(
            agent_id=agent_id,
            params=ParamsSendEventRequest(
                task_id=task.id,
                content=TextContentParam(
                    author="user",
                    content="What is 5+5? Reply with just the number.",
                    type="text",
                ),
            ),
        )

        # Poll every 3 s for up to 90 s. The workflow posts a "Task
        # initialized" greeting on creation, so that message is excluded; any
        # other agent-authored message counts as the response.
        deadline = time.monotonic() + 90
        while time.monotonic() < deadline:
            msgs = client.messages.list(task_id=task.id)
            agent_msgs = [m for m in msgs if getattr(m.content, "author", None) == "agent"]
            if any("Task initialized" not in str(getattr(m.content, "content", "")) for m in agent_msgs):
                return
            time.sleep(3)

        raise AssertionError("No agent response received within 90 s")
b/src/agentex/lib/adk/_modules/_codex_sync.py new file mode 100644 index 000000000..b2b162a24 --- /dev/null +++ b/src/agentex/lib/adk/_modules/_codex_sync.py @@ -0,0 +1,587 @@ +"""Codex event-stream parser tap for the unified harness surface. + +Converts a ``codex exec --json`` newline-delimited event stream (already +produced by the golden agent's sandbox/subprocess orchestration) into the +Agentex canonical ``StreamTaskMessage*`` events. + +SCOPE +----- +This module is a **pure parser**. It receives pre-produced codex events +(``str`` lines or already-decoded ``dict`` objects) and yields canonical +``StreamTaskMessage*`` events. All subprocess management, sandbox +provisioning, secret injection, and MCP orchestration remain in the golden +agent at +``teams/sgp/agents/golden_agent/project/harness/providers/codex.py``. + +No deployable test agent is included here: running codex requires the +golden agent's sandbox environment and is out of scope for this library tap. + +OUT OF SCOPE (document here so future callers are not surprised): +- Subprocess / sandbox management +- OPENAI_API_KEY / secret injection +- MCP server configuration (--config /tmp/codex_config.toml) +- ``codex exec resume`` session tracking +- ``scale_sandbox`` imports + +CANONICAL MAPPING +----------------- +The table below lists every ``type`` field the codex exec JSON stream can +emit (from ``codex-rs/exec/src/exec_events.rs``) and its mapping. 
+ +Top-level event types +~~~~~~~~~~~~~~~~~~~~~ + thread.started -> (no StreamTaskMessage; session_id captured + internally; surfaced via ``on_result`` callback) + turn.started -> (no StreamTaskMessage; turn was started before + codex launched; nothing to emit here) + turn.completed -> on_result(usage_dict, tool_count, reasoning_count) + yields no StreamTaskMessage (turn lifecycle is + managed by the activity layer) + turn.failed -> StreamTaskMessageFull(TextContent, error text) + error -> StreamTaskMessageFull(TextContent, error text) + +Item sub-types (item.started / item.updated / item.completed) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + agent_message -> text deltas: + item.started / item.updated -> StreamTaskMessageDelta(TextDelta) + item.completed -> StreamTaskMessageDone + reasoning -> reasoning: + item.started -> StreamTaskMessageStart(ReasoningContent) + item.updated -> (no-op; final text arrives on completed) + item.completed -> StreamTaskMessageFull(ReasoningContent) + command_execution -> tool request + response: + item.started -> StreamTaskMessageStart(ToolRequestContent) + + StreamTaskMessageDone + item.completed -> StreamTaskMessageFull(ToolResponseContent) + file_change -> same as command_execution + NOTE: file_change may only emit item.completed (no started); + a synthetic ToolRequestContent Full is emitted before the response. + mcp_tool_call -> same as command_execution + web_search -> same as command_execution + todo_list -> same as command_execution + collab_tool_call -> same as command_execution + error (item type) -> StreamTaskMessageFull(TextContent, error text) on completed only + +UNMAPPED / PARTIALLY MAPPED EVENTS +----------------------------------- + thread.started: session_id is extracted but not forwarded as a + StreamTaskMessage (no canonical content type for + session-lifecycle signals; captured in on_result). + turn.started: no-op; intentional (the caller owns turn lifecycle). 
+ turn.completed: no StreamTaskMessage; usage is forwarded via + on_result so the caller can record it in a span + without this module needing to know about spans. + item.updated (reasoning): the intermediate cumulative text is discarded; + only item.completed carries the final text. + item.updated (tool): tool item types other than agent_message do not + emit updates; item.started opens the request and + item.completed closes it. +""" + +from __future__ import annotations + +import json +from typing import Any, Callable, AsyncIterator + +from agentex.lib.utils.logging import make_logger +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.task_message_content import TextContent +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta + +logger = make_logger(__name__) + +# Canonical type alias matching the unified harness surface. 
+StreamTaskMessage = StreamTaskMessageStart | StreamTaskMessageDelta | StreamTaskMessageFull | StreamTaskMessageDone + +_MAX_RESULT_LENGTH = 4000 + + +def _truncate(text: str, max_len: int = _MAX_RESULT_LENGTH) -> str: + return str(text)[:max_len] + + +def _tool_name_for(item_type: str, payload: dict[str, Any]) -> str: + """Derive a canonical tool name from a codex item type.""" + if item_type == "command_execution": + return "bash" + if item_type == "file_change": + return "file_change" + if item_type == "mcp_tool_call": + server = payload.get("server", "") + tool = payload.get("tool", "") + return f"{server}.{tool}" if (server or tool) else "mcp_tool_call" + if item_type == "web_search": + return "web_search" + if item_type == "todo_list": + return "todo_list" + if item_type == "collab_tool_call": + return "collab_tool_call" + return item_type or "unknown" + + +def _tool_args_for(item_type: str, payload: dict[str, Any]) -> dict[str, Any]: + """Extract canonical arguments dict from a codex item payload.""" + if item_type == "command_execution": + return {"command": payload.get("command", "")} + if item_type == "file_change": + return {"changes": payload.get("changes") or []} + if item_type == "mcp_tool_call": + args = payload.get("arguments") + return args if isinstance(args, dict) else {"value": args} + if item_type == "web_search": + return {"query": payload.get("query", "")} + if item_type == "todo_list": + return {"items": payload.get("items") or []} + if item_type == "collab_tool_call": + # Surface an arguments dict if the payload carries one (mirrors + # mcp_tool_call); otherwise no args rather than fabricating a shape. 
+ args = payload.get("arguments") + return args if isinstance(args, dict) else {} + return {} + + +def _tool_output_for(item_type: str, payload: dict[str, Any]) -> tuple[str, bool]: + """Extract (result_text, is_error) from a completed codex tool item.""" + if item_type == "command_execution": + out = payload.get("aggregated_output") or "" + exit_code = payload.get("exit_code") + is_error = exit_code is not None and exit_code != 0 + return _truncate(out), is_error + if item_type in ("mcp_tool_call", "collab_tool_call"): + # collab_tool_call mirrors mcp_tool_call's error/result convention + # (see _tool_args_for); without this branch a failed collab call would + # fall through to the generic path and be reported as a success. + err = payload.get("error") + if err: + msg = err.get("message", "") if isinstance(err, dict) else str(err) + return _truncate(f"Error: {msg}"), True + result = payload.get("result") + if result is None: + return "", False + try: + return _truncate(json.dumps(result)), False + except (TypeError, ValueError): + return _truncate(str(result)), False + if item_type == "file_change": + changes = payload.get("changes") or [] + status = payload.get("status", "") + return f"status={status}, {len(changes)} changes", status == "failed" + try: + return _truncate(json.dumps(payload, default=str)), False + except (TypeError, ValueError): + return _truncate(str(payload)), False + + +def _error_full(message: str, next_index: int) -> StreamTaskMessageFull: + """Emit a one-shot TextContent full message for an error.""" + return StreamTaskMessageFull( + type="full", + index=next_index, + content=TextContent( + type="text", + author="agent", + content=f"Error: {message}", + format="plain", + ), + ) + + +class _CodexStreamProcessor: + """Stateful parser: consumes codex exec events, yields StreamTaskMessage*. 
class _CodexStreamProcessor:
    """Stateful parser: consumes codex exec events, yields StreamTaskMessage*.

    Ported from the golden agent's ``_CodexEventProcessor`` in
    ``project/harness/providers/codex.py``, adapted to yield
    ``StreamTaskMessage*`` directly instead of ``HarnessEvent`` objects.

    Per-item state is released as soon as the item completes, so a later item
    that reuses an id (most commonly the empty id used when codex sends none)
    opens a fresh message instead of writing into one that is already closed.

    State tracked:
    - ``_next_index``: monotonically increasing message index.
    - ``_text_index``: message index per open agent_message item_id.
    - ``_text_accumulated``: cumulative text per open agent_message item_id.
    - ``_reasoning_index``: message index per open reasoning item_id.
    - ``_reasoning_text``: latest cumulative reasoning text per item_id.
    - ``_tool_open``: item_ids for which a ToolRequestContent was emitted
      but no ToolResponseContent Full yet.
    - ``_tool_item_types``: item_id -> item_type for open tool calls.
    - ``_tool_call_ids``: item_id -> tool_call_id for open tool calls.
    """

    # Item types that are rendered as a tool request / tool response pair.
    _TOOL_ITEM_TYPES = (
        "command_execution",
        "file_change",
        "mcp_tool_call",
        "web_search",
        "todo_list",
        "collab_tool_call",
    )

    def __init__(self) -> None:
        self._next_index: int = 0

        # agent_message tracking
        self._text_index: dict[str, int] = {}
        self._text_accumulated: dict[str, str] = {}

        # reasoning tracking
        self._reasoning_index: dict[str, int] = {}
        self._reasoning_text: dict[str, str] = {}

        # tool tracking
        self._tool_open: set[str] = set()
        self._tool_item_types: dict[str, str] = {}
        # Remember the tool_call_id assigned per item so the request and response
        # halves agree even when item_id is empty (a recomputed fallback would
        # drift as tool_call_count advances between started and completed).
        self._tool_call_ids: dict[str, str] = {}

        # counters for on_result callback
        self.tool_call_count: int = 0
        self.reasoning_count: int = 0
        self.session_id: str | None = None

    def _alloc(self) -> int:
        """Reserve and return the next message index."""
        idx = self._next_index
        self._next_index += 1
        return idx

    def process(self, evt: dict[str, Any]) -> list[StreamTaskMessage]:
        """Translate one decoded codex event into zero or more stream messages.

        Args:
            evt: A decoded codex event object (must be a dict).

        Returns:
            The messages to emit for this event, in order; empty for events
            that only update internal state or are not recognised.
        """
        evt_type = evt.get("type", "")

        if evt_type == "thread.started":
            sid = evt.get("thread_id") or ""
            if sid:
                self.session_id = sid
            return []

        if evt_type in ("turn.started", "turn.completed"):
            # The caller owns turn lifecycle; usage from turn.completed is
            # forwarded via the on_result callback, not as a stream message.
            return []

        if evt_type == "turn.failed":
            err = evt.get("error") or {}
            msg = err.get("message", "codex turn failed") if isinstance(err, dict) else str(err)
            return [_error_full(f"Codex turn failed: {msg}", self._alloc())]

        if evt_type == "error":
            return [_error_full(evt.get("message", "codex error"), self._alloc())]

        if evt_type in ("item.started", "item.updated", "item.completed"):
            item = evt.get("item") or {}
            if not isinstance(item, dict):
                # Malformed item payload: nothing can be extracted from it, and
                # calling .get() on it would abort the whole stream.
                return []
            return self._handle_item(evt_type, item)

        logger.debug("[codex] unhandled event type=%s", evt_type)
        return []

    def _handle_item(self, evt_type: str, item: dict[str, Any]) -> list[StreamTaskMessage]:
        """Dispatch an ``item.*`` event to the handler for its item type."""
        item_id = item.get("id") or ""
        item_type = item.get("type") or ""

        if item_type == "agent_message":
            return self._handle_agent_message(evt_type, item_id, item)
        if item_type == "reasoning":
            return self._handle_reasoning(evt_type, item_id, item)
        if item_type in self._TOOL_ITEM_TYPES:
            return self._handle_tool(evt_type, item_id, item_type, item)
        if item_type == "error":
            if evt_type == "item.completed":
                return [_error_full(item.get("message", "codex item error"), self._alloc())]
            return []

        logger.debug("[codex] unhandled item type=%s evt=%s", item_type, evt_type)
        return []

    @staticmethod
    def _new_suffix(previous: str, current: str) -> str:
        """Return the text to stream given the previous and current cumulative text."""
        if current.startswith(previous) and len(current) > len(previous):
            return current[len(previous) :]
        if current and current != previous:
            # Not a pure extension of what was already sent: resend in full.
            return current
        return ""

    def _handle_agent_message(self, evt_type: str, item_id: str, item: dict[str, Any]) -> list[StreamTaskMessage]:
        """Stream an agent_message item as Start, Delta(s), and a closing Done."""
        out: list[StreamTaskMessage] = []
        if evt_type not in ("item.started", "item.updated", "item.completed"):
            return out

        current = item.get("text") or ""
        previous = self._text_accumulated.get(item_id, "")

        idx = self._text_index.get(item_id)
        if idx is None:
            # First sighting of this item (possibly a bare item.completed).
            idx = self._alloc()
            self._text_index[item_id] = idx
            out.append(
                StreamTaskMessageStart(
                    type="start",
                    index=idx,
                    content=TextContent(
                        type="text",
                        author="agent",
                        content="",
                    ),
                )
            )

        delta = self._new_suffix(previous, current)
        if delta:
            out.append(
                StreamTaskMessageDelta(
                    type="delta",
                    index=idx,
                    delta=TextDelta(type="text", text_delta=delta),
                )
            )

        if evt_type == "item.completed":
            out.append(StreamTaskMessageDone(type="done", index=idx))
            # Release the item's state: the message is closed, so a later item
            # with the same (often empty) id must open a new message.
            self._text_index.pop(item_id, None)
            self._text_accumulated.pop(item_id, None)
        else:
            self._text_accumulated[item_id] = current
        return out

    def _handle_reasoning(self, evt_type: str, item_id: str, item: dict[str, Any]) -> list[StreamTaskMessage]:
        """Stream a reasoning item: Start on started, Full (or Done) on completed."""
        out: list[StreamTaskMessage] = []
        current = item.get("text") or ""

        if evt_type == "item.started":
            idx = self._alloc()
            self._reasoning_index[item_id] = idx
            self._reasoning_text[item_id] = current
            out.append(
                StreamTaskMessageStart(
                    type="start",
                    index=idx,
                    content=ReasoningContent(
                        type="reasoning",
                        author="agent",
                        summary=[],
                        content=[],
                        style="active",
                    ),
                )
            )
            if current:
                out.append(
                    StreamTaskMessageDelta(
                        type="delta",
                        index=idx,
                        delta=ReasoningContentDelta(
                            type="reasoning_content",
                            content_index=0,
                            content_delta=current,
                        ),
                    )
                )

        elif evt_type == "item.updated":
            # Accumulate silently; final text arrives on item.completed.
            self._reasoning_text[item_id] = current

        elif evt_type == "item.completed":
            text = current or self._reasoning_text.get(item_id, "")
            idx = self._reasoning_index.get(item_id)
            if text:
                self.reasoning_count += 1
                # Summary is the first line of the text, capped at 300 chars.
                summary = text.strip().split("\n", 1)[0][:300]
                out.append(
                    StreamTaskMessageFull(
                        type="full",
                        # No started event seen -> emit a standalone Full.
                        index=idx if idx is not None else self._alloc(),
                        content=ReasoningContent(
                            type="reasoning",
                            author="agent",
                            summary=[summary],
                            content=[text],
                            style="static",
                        ),
                    )
                )
            elif idx is not None:
                # Empty reasoning block — still need to close with a Done.
                out.append(StreamTaskMessageDone(type="done", index=idx))
            # Release the item's state so a reused id starts from scratch.
            self._reasoning_index.pop(item_id, None)
            self._reasoning_text.pop(item_id, None)

        return out

    def _handle_tool(
        self, evt_type: str, item_id: str, item_type: str, item: dict[str, Any]
    ) -> list[StreamTaskMessage]:
        """Stream a tool item as a request (Start+Done or Full) and a response Full."""
        out: list[StreamTaskMessage] = []

        # Resolve a stable id once per item; reuse it for both halves.
        tool_call_id = self._tool_call_ids.get(item_id)
        if tool_call_id is None:
            tool_call_id = item_id or f"codex_tool_{self.tool_call_count + 1}"
            self._tool_call_ids[item_id] = tool_call_id

        if evt_type == "item.started":
            self.tool_call_count += 1
            self._tool_open.add(item_id)
            self._tool_item_types[item_id] = item_type
            req_idx = self._alloc()
            out.append(
                StreamTaskMessageStart(
                    type="start",
                    index=req_idx,
                    content=ToolRequestContent(
                        type="tool_request",
                        author="agent",
                        tool_call_id=tool_call_id,
                        name=_tool_name_for(item_type, item),
                        arguments=_tool_args_for(item_type, item),
                    ),
                )
            )
            out.append(StreamTaskMessageDone(type="done", index=req_idx))

        elif evt_type == "item.completed":
            # file_change items may only emit item.completed (no started), so
            # synthesize the request half before the response.
            if item_id not in self._tool_open:
                self.tool_call_count += 1
                self._tool_open.add(item_id)
                self._tool_item_types[item_id] = item_type
                out.append(
                    StreamTaskMessageFull(
                        type="full",
                        index=self._alloc(),
                        content=ToolRequestContent(
                            type="tool_request",
                            author="agent",
                            tool_call_id=tool_call_id,
                            name=_tool_name_for(item_type, item),
                            arguments=_tool_args_for(item_type, item),
                        ),
                    )
                )

            actual_type = self._tool_item_types.get(item_id, item_type)
            result_text, is_error = _tool_output_for(actual_type, item)
            resp_content: dict[str, Any] = {"result": result_text}
            if is_error:
                resp_content["is_error"] = True
            out.append(
                StreamTaskMessageFull(
                    type="full",
                    index=self._alloc(),
                    content=ToolResponseContent(
                        type="tool_response",
                        author="agent",
                        tool_call_id=tool_call_id,
                        name=_tool_name_for(actual_type, item),
                        content=resp_content,
                    ),
                )
            )
            self._tool_open.discard(item_id)
            # Free the per-item mappings so a later item reusing an empty id
            # gets a fresh fallback rather than colliding with this one.
            self._tool_call_ids.pop(item_id, None)
            self._tool_item_types.pop(item_id, None)

        return out
async def convert_codex_to_agentex_events(
    events: AsyncIterator[str | dict[str, Any]],
    on_result: Callable[[dict[str, Any]], None] | None = None,
) -> AsyncIterator[StreamTaskMessage]:
    """Convert a ``codex exec --json`` event stream into Agentex stream events.

    This is a pure parser tap. The caller must supply ``events`` as an async
    iterator of either raw newline-delimited JSON strings or pre-decoded dicts.
    No subprocess or sandbox management is done here.

    Input that cannot be interpreted as an event object is skipped rather than
    raised on: blank lines, non-string/non-dict values, lines that are not
    valid JSON, and JSON lines whose top-level value is not an object.

    Args:
        events: Async iterator of ``str`` (newline-delimited JSON lines) or
            ``dict`` (pre-decoded event objects) as produced by the codex CLI's
            ``--json`` flag.
        on_result: Optional callback invoked exactly once, after ``events`` is
            exhausted (whether or not a ``turn.completed`` event was seen).
            It is not invoked if the consumer stops iterating early.
            Receives a dict with keys:
              ``usage``           — the usage dict of the last ``turn.completed``
                                    event, or None if absent / not a dict
              ``session_id``      — the codex thread_id (or None)
              ``tool_call_count`` — int
              ``reasoning_count`` — int
            Use this to record turn-level metrics / usage in the caller's span
            without coupling this module to span/tracing APIs.

    Yields:
        Canonical ``StreamTaskMessage*`` events (Start/Delta/Full/Done) with
        ``TextContent``, ``ReasoningContent``, ``ToolRequestContent``, or
        ``ToolResponseContent`` payloads.

    MAPPING (abbreviated — see module docstring for the full table)
        thread.started         -> no event; session_id captured for on_result
        turn.started           -> no event
        turn.completed         -> no event; usage captured for on_result
        turn.failed / error    -> StreamTaskMessageFull(TextContent, error)
        agent_message          -> Start + Deltas + Done
        reasoning              -> Start + Full(ReasoningContent)
        command_execution      -> Start(ToolRequest)+Done + Full(ToolResponse)
        file_change            -> Full(ToolRequest) + Full(ToolResponse)
        mcp_tool_call          -> Start(ToolRequest)+Done + Full(ToolResponse)
        web_search / todo_list -> Start(ToolRequest)+Done + Full(ToolResponse)
        collab_tool_call       -> Start(ToolRequest)+Done + Full(ToolResponse)
    """
    processor = _CodexStreamProcessor()
    pending_usage: dict[str, Any] | None = None

    async for raw in events:
        if isinstance(raw, dict):
            evt = raw
        else:
            line = raw.strip() if isinstance(raw, str) else ""
            if not line:
                continue
            try:
                evt = json.loads(line)
            except json.JSONDecodeError:
                logger.debug("[codex] non-JSON line: %s", line[:100])
                continue
            if not isinstance(evt, dict):
                # Valid JSON but not an event object (e.g. a bare number,
                # string, or array); calling .get() on it would raise
                # AttributeError and abort the whole stream.
                logger.debug("[codex] non-object JSON line: %s", line[:100])
                continue

        # Capture usage before processing so on_result can fire after flush.
        if evt.get("type") == "turn.completed":
            usage = evt.get("usage")
            pending_usage = usage if isinstance(usage, dict) else None

        for msg in processor.process(evt):
            yield msg

    if on_result is not None:
        on_result(
            {
                "usage": pending_usage,
                "session_id": processor.session_id,
                "tool_call_count": processor.tool_call_count,
                "reasoning_count": processor.reasoning_count,
            }
        )
def codex_usage_to_turn_usage(
    raw: dict[str, Any] | None,
    *,
    model: str | None = None,
    tool_call_count: int = 0,
    reasoning_count: int = 0,
    duration_ms: int | None = None,
    cost_usd: float | None = None,
) -> TurnUsage:
    """Map a raw codex ``turn.completed`` usage dict to a canonical ``TurnUsage``.

    Codex reports token usage under the ``usage`` key of the
    ``turn.completed`` event, for example:

    .. code-block:: json

        {
          "input_tokens": 1234,
          "output_tokens": 456,
          "reasoning_tokens": 200,
          "total_tokens": 1690
        }

    Defensive rules:
    - A missing / non-dict ``raw`` or missing sub-keys map to ``None`` (not
      zero) so downstream callers can distinguish "not reported" from
      "reported as 0".
    - Real zeros (``0`` explicitly present in ``raw``) are preserved as ``0``.
    - Values that cannot be converted to a number (wrong type, unparsable
      string, ``NaN``/``Infinity`` for integer fields) map to ``None``.
    - ``total_tokens`` is taken from the payload or left as ``None``; it is
      never recomputed here.
    - ``cost_usd`` prefers the explicitly passed argument and falls back to a
      ``cost_usd`` key in ``raw``.

    Args:
        raw: The raw codex usage dict from ``turn.completed``, or ``None``.
        model: Model string (e.g. "o4-mini") to attach to the usage record.
        tool_call_count: Number of tool calls in the turn (from processor).
        reasoning_count: Number of reasoning blocks (from processor).
        duration_ms: Wall-clock duration of the turn in milliseconds.
        cost_usd: Cost in USD if the caller can derive it; ``None`` otherwise.

    Returns:
        A populated ``TurnUsage`` instance.
    """
    if not isinstance(raw, dict):
        raw = {}

    def _int_or_none(key: str) -> int | None:
        val = raw.get(key)
        if val is None:
            return None
        try:
            return int(val)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")) — json.loads accepts the
            # non-standard ``Infinity`` literal, so it can reach this point.
            return None

    def _float_or_none(key: str) -> float | None:
        val = raw.get(key)
        if val is None:
            return None
        try:
            return float(val)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: an int too large to be represented as a float.
            return None

    # cost_usd: prefer explicitly passed value, then fall back to raw payload.
    effective_cost = cost_usd if cost_usd is not None else _float_or_none("cost_usd")

    return TurnUsage(
        model=model or None,
        input_tokens=_int_or_none("input_tokens"),
        output_tokens=_int_or_none("output_tokens"),
        cached_input_tokens=_int_or_none("cached_input_tokens"),
        reasoning_tokens=_int_or_none("reasoning_tokens"),
        total_tokens=_int_or_none("total_tokens"),
        cost_usd=effective_cost,
        duration_ms=duration_ms,
        num_llm_calls=1,
        num_tool_calls=tool_call_count,
        num_reasoning_blocks=reasoning_count,
    )
class CodexTurn:
    """One codex turn exposed through the ``HarnessTurn`` surface.

    Instances can be handed to ``UnifiedEmitter.yield_turn`` and
    ``UnifiedEmitter.auto_send_turn``.

    ``usage()`` and ``session_id`` only carry real data once ``events`` has
    been iterated to the end; before that ``usage()`` returns a ``TurnUsage``
    with nothing but ``model`` set and ``session_id`` is ``None``.

    Args:
        events: An async iterator of ``str | dict`` codex events, as
            produced by reading ``codex exec --json`` stdout line by line.
        model: Model string to attach to the ``TurnUsage``.
        duration_ms: Optional turn wall-clock duration in milliseconds.
        cost_usd: Optional cost in USD; ``None`` if not known.
    """

    def __init__(
        self,
        events: AsyncIterator[str | dict[str, Any]],
        *,
        model: str | None = None,
        duration_ms: int | None = None,
        cost_usd: float | None = None,
    ) -> None:
        self._raw_events = events
        self._model = model

        # Deliberately public and writable: duration and cost are usually only
        # known once the stream has been drained, so callers may assign them
        # after construction and before calling usage().
        self.duration_ms = duration_ms
        self.cost_usd = cost_usd

        # Filled in by _on_result when the converter finishes the stream.
        self._result: dict[str, Any] | None = None

        # ``_raw_events`` can be consumed only once. Wrapping it a second time
        # would produce an already-exhausted stream whose on_result reports
        # zeros and overwrites ``_result``, so the wrapper is built lazily and
        # then reused.
        self._events_gen: AsyncIterator[StreamTaskMessage] | None = None

    @property
    def events(self) -> AsyncIterator[StreamTaskMessage]:
        """Canonical ``StreamTaskMessage*`` events for this turn.

        Every access returns the same generator, so the underlying codex
        stream is consumed — and ``_on_result`` is invoked — at most once.
        """
        cached = self._events_gen
        if cached is not None:
            return cached
        self._events_gen = convert_codex_to_agentex_events(
            self._raw_events,
            on_result=self._on_result,
        )
        return self._events_gen

    def _on_result(self, result: dict[str, Any]) -> None:
        """Store the end-of-stream summary delivered by the converter."""
        self._result = result

    @property
    def session_id(self) -> str | None:
        """The codex session id, for resuming a multi-turn session.

        ``None`` until ``events`` has been fully consumed, or when codex
        reported no session id.
        """
        if not self._result:
            return None
        return self._result.get("session_id")

    def usage(self) -> TurnUsage:
        """Return normalized ``TurnUsage`` for this turn.

        Before the stream has ended only ``model`` is populated.
        """
        summary = self._result
        if summary is None:
            return TurnUsage(model=self._model)
        return codex_usage_to_turn_usage(
            summary.get("usage"),
            model=self._model,
            tool_call_count=summary.get("tool_call_count", 0),
            reasoning_count=summary.get("reasoning_count", 0),
            duration_ms=self.duration_ms,
            cost_usd=self.cost_usd,
        )
+ +Tests cover: +- Text streaming (agent_message items) +- Tool call streaming (command_execution, mcp_tool_call, file_change) +- Reasoning streaming (reasoning items) +- Multi-step turns +- Error events (top-level + item-level) +- Edge cases: empty events, non-JSON lines, unknown types +- on_result callback (session_id, usage, counters) +- file_change synthesized start (no item.started emitted by codex) +""" + +from __future__ import annotations + +import json +from typing import Any, AsyncIterator + +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.task_message_content import TextContent +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._codex_sync import ( + _truncate, + _tool_args_for, + _tool_name_for, + _tool_output_for, + convert_codex_to_agentex_events, +) +from agentex.types.reasoning_content_delta import ReasoningContentDelta + + +async def _aiter(items: list[Any]) -> AsyncIterator[Any]: + for item in items: + yield item + + +async def _collect(stream: AsyncIterator[Any]) -> list[Any]: + return [e async for e in stream] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class TestHelpers: + def test_truncate_short(self) -> None: + assert _truncate("hello", max_len=10) == "hello" + + def test_truncate_long(self) -> None: + assert _truncate("a" * 5000) == "a" * 4000 + + def test_tool_name_command_execution(self) -> None: + assert _tool_name_for("command_execution", {}) == "bash" + + def test_tool_name_file_change(self) -> None: + assert _tool_name_for("file_change", {}) == 
"file_change" + + def test_tool_name_mcp_with_server_and_tool(self) -> None: + assert _tool_name_for("mcp_tool_call", {"server": "fs", "tool": "read"}) == "fs.read" + + def test_tool_name_mcp_empty(self) -> None: + assert _tool_name_for("mcp_tool_call", {}) == "mcp_tool_call" + + def test_tool_name_unknown(self) -> None: + assert _tool_name_for("", {}) == "unknown" + + def test_tool_args_command(self) -> None: + assert _tool_args_for("command_execution", {"command": "ls"}) == {"command": "ls"} + + def test_tool_args_file_change(self) -> None: + assert _tool_args_for("file_change", {"changes": ["a"]}) == {"changes": ["a"]} + + def test_tool_args_mcp_dict(self) -> None: + assert _tool_args_for("mcp_tool_call", {"arguments": {"k": "v"}}) == {"k": "v"} + + def test_tool_args_mcp_non_dict(self) -> None: + assert _tool_args_for("mcp_tool_call", {"arguments": "str"}) == {"value": "str"} + + def test_tool_output_command_success(self) -> None: + text, is_err = _tool_output_for("command_execution", {"aggregated_output": "hello", "exit_code": 0}) + assert text == "hello" + assert is_err is False + + def test_tool_output_command_error(self) -> None: + _, is_err = _tool_output_for("command_execution", {"aggregated_output": "boom", "exit_code": 1}) + assert is_err is True + + def test_tool_output_mcp_error(self) -> None: + text, is_err = _tool_output_for("mcp_tool_call", {"error": {"message": "not found"}}) + assert "not found" in text + assert is_err is True + + def test_tool_output_mcp_result(self) -> None: + text, is_err = _tool_output_for("mcp_tool_call", {"result": {"data": 1}}) + assert json.loads(text) == {"data": 1} + assert is_err is False + + def test_tool_output_file_change_failed(self) -> None: + _, is_err = _tool_output_for("file_change", {"status": "failed", "changes": []}) + assert is_err is True + + def test_tool_output_file_change_ok(self) -> None: + text, is_err = _tool_output_for("file_change", {"status": "ok", "changes": [1, 2]}) + assert "2 changes" in text 
+ assert is_err is False + + +# --------------------------------------------------------------------------- +# Text streaming +# --------------------------------------------------------------------------- + + +class TestTextStreaming: + async def test_text_start_delta_done(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "m1", "type": "agent_message", "text": "Hi"}}, + {"type": "item.updated", "item": {"id": "m1", "type": "agent_message", "text": "Hi!"}}, + {"type": "item.completed", "item": {"id": "m1", "type": "agent_message", "text": "Hi! Done"}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + + assert len(starts) == 1 + assert isinstance(starts[0].content, TextContent) + assert len(deltas) >= 1 + all_delta_text = "".join( + d.delta.text_delta for d in deltas if isinstance(d.delta, TextDelta) and d.delta.text_delta is not None + ) + assert "Hi" in all_delta_text + assert len(dones) == 1 + + async def test_text_indices_are_monotonic(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "m1", "type": "agent_message", "text": "A"}}, + {"type": "item.completed", "item": {"id": "m1", "type": "agent_message", "text": "A"}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + anchor = [e for e in out if isinstance(e, StreamTaskMessageStart)] + done = [e for e in out if isinstance(e, StreamTaskMessageDone)] + assert anchor[0].index == done[0].index + + async def test_empty_text_no_delta(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "m1", "type": "agent_message", "text": ""}}, + {"type": "item.completed", "item": {"id": "m1", "type": "agent_message", "text": ""}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + deltas = [e 
for e in out if isinstance(e, StreamTaskMessageDelta)] + assert deltas == [] + + async def test_text_author_is_agent(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "m1", "type": "agent_message", "text": "X"}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + for e in out: + content = getattr(e, "content", None) + if content and hasattr(content, "author"): + assert content.author == "agent" + + +# --------------------------------------------------------------------------- +# Tool call streaming +# --------------------------------------------------------------------------- + + +class TestToolCallStreaming: + async def test_command_execution_start_done_full(self) -> None: + events = [ + { + "type": "item.started", + "item": { + "id": "t1", + "type": "command_execution", + "command": "echo hello", + }, + }, + { + "type": "item.completed", + "item": { + "id": "t1", + "type": "command_execution", + "command": "echo hello", + "aggregated_output": "hello", + "exit_code": 0, + }, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + fulls = [e for e in out if isinstance(e, StreamTaskMessageFull)] + + assert len(starts) == 1 + assert isinstance(starts[0].content, ToolRequestContent) + assert starts[0].content.name == "bash" + assert starts[0].content.arguments == {"command": "echo hello"} + assert starts[0].content.tool_call_id == "t1" + + assert len(dones) == 1 + + assert len(fulls) == 1 + assert isinstance(fulls[0].content, ToolResponseContent) + resp_content = fulls[0].content.content + assert isinstance(resp_content, dict) + assert resp_content["result"] == "hello" + assert fulls[0].content.tool_call_id == "t1" + + async def test_empty_item_id_request_response_ids_match(self) -> None: + """A tool with an empty item_id must use the SAME fallback 
tool_call_id + on the request (started) and response (completed) halves.""" + events = [ + {"type": "item.started", "item": {"id": "", "type": "command_execution", "command": "ls"}}, + { + "type": "item.completed", + "item": { + "id": "", + "type": "command_execution", + "command": "ls", + "aggregated_output": ".", + "exit_code": 0, + }, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + # Pull tool_call_id inside the comprehension so the isinstance narrows the + # content union (the narrowing would not survive a later attribute access). + req_ids = [ + e.content.tool_call_id + for e in out + if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ToolRequestContent) + ] + resp_ids = [ + e.content.tool_call_id + for e in out + if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ToolResponseContent) + ] + assert len(req_ids) == 1 and len(resp_ids) == 1 + assert req_ids[0] == resp_ids[0] + + async def test_file_change_synthesizes_start(self) -> None: + """file_change items may only emit item.completed (no started).""" + events = [ + { + "type": "item.completed", + "item": { + "id": "fc1", + "type": "file_change", + "changes": ["a.py"], + "status": "ok", + }, + } + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + tool_req = [ + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ToolRequestContent) + ] + tool_resp = [ + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ToolResponseContent) + ] + assert len(tool_req) == 1 + assert isinstance(tool_req[0].content, ToolRequestContent) + assert tool_req[0].content.name == "file_change" + assert len(tool_resp) == 1 + + async def test_mcp_tool_call_name(self) -> None: + events = [ + { + "type": "item.started", + "item": { + "id": "mcp1", + "type": "mcp_tool_call", + "server": "fs", + "tool": "read", + "arguments": {"path": "/x"}, + }, + }, + { + "type": "item.completed", + "item": { 
+ "id": "mcp1", + "type": "mcp_tool_call", + "server": "fs", + "tool": "read", + "arguments": {"path": "/x"}, + "result": "content", + }, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + req = next( + e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ToolRequestContent) + ) + assert isinstance(req.content, ToolRequestContent) + assert req.content.name == "fs.read" + + async def test_tool_error_marks_is_error(self) -> None: + events = [ + { + "type": "item.started", + "item": {"id": "cmd1", "type": "command_execution", "command": "bad"}, + }, + { + "type": "item.completed", + "item": { + "id": "cmd1", + "type": "command_execution", + "command": "bad", + "aggregated_output": "error output", + "exit_code": 127, + }, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + resp = next( + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ToolResponseContent) + ) + assert isinstance(resp.content, ToolResponseContent) + resp_body = resp.content.content + assert isinstance(resp_body, dict) + assert resp_body.get("is_error") is True + + async def test_tool_indices_request_before_response(self) -> None: + events = [ + { + "type": "item.started", + "item": {"id": "cmd2", "type": "command_execution", "command": "ls"}, + }, + { + "type": "item.completed", + "item": { + "id": "cmd2", + "type": "command_execution", + "command": "ls", + "aggregated_output": ".", + "exit_code": 0, + }, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + req = next(e for e in out if isinstance(e, StreamTaskMessageStart)) + resp = next( + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ToolResponseContent) + ) + assert req.index is not None and resp.index is not None + assert req.index < resp.index + + +# --------------------------------------------------------------------------- +# Reasoning +# 
--------------------------------------------------------------------------- + + +class TestReasoningStreaming: + async def test_reasoning_start_full(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "r1", "type": "reasoning", "text": ""}}, + { + "type": "item.updated", + "item": {"id": "r1", "type": "reasoning", "text": "thinking..."}, + }, + { + "type": "item.completed", + "item": {"id": "r1", "type": "reasoning", "text": "thinking... done"}, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)] + + assert len(starts) == 1 + assert isinstance(starts[0].content, ReasoningContent) + assert len(fulls) == 1 + assert isinstance(fulls[0].content, ReasoningContent) + reasoning_content = fulls[0].content.content + assert reasoning_content is not None + assert any("thinking... 
done" in s for s in reasoning_content) + + async def test_reasoning_initial_text_emits_delta(self) -> None: + events = [ + { + "type": "item.started", + "item": {"id": "r1", "type": "reasoning", "text": "seed"}, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)] + assert len(deltas) == 1 + assert isinstance(deltas[0].delta, ReasoningContentDelta) + assert deltas[0].delta.content_delta == "seed" + + async def test_reasoning_no_started_emits_standalone_full(self) -> None: + """If item.completed arrives without item.started, emit a standalone Full.""" + events = [ + { + "type": "item.completed", + "item": {"id": "r_orphan", "type": "reasoning", "text": "orphan thought"}, + } + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)] + assert len(fulls) == 1 + assert isinstance(fulls[0].content, ReasoningContent) + orphan_content = fulls[0].content.content + assert orphan_content is not None + assert any("orphan thought" in s for s in orphan_content) + + async def test_reasoning_summary_is_first_line(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "r2", "type": "reasoning", "text": ""}}, + { + "type": "item.completed", + "item": {"id": "r2", "type": "reasoning", "text": "line one\nline two"}, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + full = next(e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)) + assert isinstance(full.content, ReasoningContent) + assert full.content.summary == ["line one"] + + +# --------------------------------------------------------------------------- +# Error events +# --------------------------------------------------------------------------- + + +class TestErrorEvents: + async def 
test_turn_failed_emits_error_text(self) -> None: + events = [{"type": "turn.failed", "error": {"message": "context length exceeded"}}] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + assert len(out) == 1 + assert isinstance(out[0], StreamTaskMessageFull) + assert isinstance(out[0].content, TextContent) + assert "context length exceeded" in out[0].content.content + + async def test_top_level_error_emits_text(self) -> None: + events = [{"type": "error", "message": "unexpected EOF"}] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + assert len(out) == 1 + assert isinstance(out[0].content, TextContent) + assert "unexpected EOF" in out[0].content.content + + async def test_item_error_emits_on_completed_only(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "e1", "type": "error", "message": "bad"}}, + {"type": "item.completed", "item": {"id": "e1", "type": "error", "message": "bad"}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + # Only item.completed emits an event for error items + assert len(out) == 1 + assert isinstance(out[0].content, TextContent) + assert "bad" in out[0].content.content + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + async def test_empty_stream(self) -> None: + out = await _collect(convert_codex_to_agentex_events(_aiter([]))) + assert out == [] + + async def test_non_json_lines_skipped(self) -> None: + events: list[str] = ["not json", "also not json"] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + assert out == [] + + async def test_blank_lines_skipped(self) -> None: + out = await _collect(convert_codex_to_agentex_events(_aiter(["", " ", "\n"]))) + assert out == [] + + async def test_pre_decoded_dict_events(self) -> None: + """Events passed as dicts (pre-decoded) 
should work without JSON parsing.""" + events: list[dict[str, Any]] = [ + {"type": "item.started", "item": {"id": "m1", "type": "agent_message", "text": "hi"}}, + { + "type": "item.completed", + "item": {"id": "m1", "type": "agent_message", "text": "hi"}, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + assert len(out) > 0 + + async def test_thread_started_no_message(self) -> None: + events = [{"type": "thread.started", "thread_id": "t1"}] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + assert out == [] + + async def test_turn_started_no_message(self) -> None: + out = await _collect(convert_codex_to_agentex_events(_aiter([{"type": "turn.started"}]))) + assert out == [] + + async def test_turn_completed_no_message(self) -> None: + out = await _collect( + convert_codex_to_agentex_events(_aiter([{"type": "turn.completed", "usage": {"input_tokens": 1}}])) + ) + assert out == [] + + async def test_unknown_event_type_no_message(self) -> None: + out = await _collect(convert_codex_to_agentex_events(_aiter([{"type": "some.future.event"}]))) + assert out == [] + + async def test_unknown_item_type_no_message(self) -> None: + out = await _collect( + convert_codex_to_agentex_events( + _aiter([{"type": "item.started", "item": {"id": "x", "type": "future_item"}}]) + ) + ) + assert out == [] + + +# --------------------------------------------------------------------------- +# on_result callback +# --------------------------------------------------------------------------- + + +class TestOnResult: + async def test_session_id_captured(self) -> None: + result: dict[str, Any] = {} + + def on_result(r: dict[str, Any]) -> None: + result.update(r) + + events = [ + {"type": "thread.started", "thread_id": "sess-xyz"}, + { + "type": "turn.completed", + "usage": {"input_tokens": 5, "output_tokens": 3, "total_tokens": 8}, + }, + ] + await _collect(convert_codex_to_agentex_events(_aiter(events), on_result=on_result)) + assert 
result["session_id"] == "sess-xyz" + + async def test_usage_forwarded(self) -> None: + result: dict[str, Any] = {} + + def on_result(r: dict[str, Any]) -> None: + result.update(r) + + events = [ + { + "type": "turn.completed", + "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + } + ] + await _collect(convert_codex_to_agentex_events(_aiter(events), on_result=on_result)) + assert result["usage"] == {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + + async def test_tool_count(self) -> None: + result: dict[str, Any] = {} + + def on_result(r: dict[str, Any]) -> None: + result.update(r) + + events = [ + { + "type": "item.started", + "item": {"id": "t1", "type": "command_execution", "command": "ls"}, + }, + { + "type": "item.completed", + "item": { + "id": "t1", + "type": "command_execution", + "command": "ls", + "aggregated_output": ".", + "exit_code": 0, + }, + }, + {"type": "turn.completed", "usage": None}, + ] + await _collect(convert_codex_to_agentex_events(_aiter(events), on_result=on_result)) + assert result["tool_call_count"] == 1 + + async def test_no_callback_when_none(self) -> None: + """Passing on_result=None should not raise.""" + events = [{"type": "turn.completed", "usage": None}] + out = await _collect(convert_codex_to_agentex_events(_aiter(events), on_result=None)) + assert out == [] + + async def test_on_result_called_even_without_turn_completed(self) -> None: + """on_result fires at end of stream even if turn.completed never arrived.""" + result: dict[str, Any] = {} + + def on_result(r: dict[str, Any]) -> None: + result.update(r) + + events: list[Any] = [] + await _collect(convert_codex_to_agentex_events(_aiter(events), on_result=on_result)) + assert result.get("usage") is None + assert result.get("session_id") is None + + +# --------------------------------------------------------------------------- +# Multi-step turn: tool → text +# --------------------------------------------------------------------------- + + 
+class TestMultiStepTurn: + async def test_tool_then_text_monotonic_indices(self) -> None: + events = [ + { + "type": "item.started", + "item": {"id": "cmd1", "type": "command_execution", "command": "ls"}, + }, + { + "type": "item.completed", + "item": { + "id": "cmd1", + "type": "command_execution", + "command": "ls", + "aggregated_output": "file.txt", + "exit_code": 0, + }, + }, + { + "type": "item.started", + "item": {"id": "msg1", "type": "agent_message", "text": ""}, + }, + { + "type": "item.completed", + "item": {"id": "msg1", "type": "agent_message", "text": "Done"}, + }, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + indices = [e.index for e in out] + assert indices == sorted(indices), "indices must be monotonically non-decreasing" + + async def test_two_text_blocks_distinct_indices(self) -> None: + events = [ + { + "type": "item.started", + "item": {"id": "a", "type": "agent_message", "text": "first"}, + }, + {"type": "item.completed", "item": {"id": "a", "type": "agent_message", "text": "first"}}, + { + "type": "item.started", + "item": {"id": "b", "type": "agent_message", "text": "second"}, + }, + {"type": "item.completed", "item": {"id": "b", "type": "agent_message", "text": "second"}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + assert len(starts) == 2 + assert starts[0].index != starts[1].index + + async def test_json_string_events(self) -> None: + """Events may arrive as raw newline-delimited JSON strings.""" + raw_events = [ + json.dumps({"type": "item.started", "item": {"id": "s1", "type": "agent_message", "text": "hello"}}), + json.dumps({"type": "item.completed", "item": {"id": "s1", "type": "agent_message", "text": "hello"}}), + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(raw_events))) + assert len(out) > 0 + assert any(isinstance(e, StreamTaskMessageStart) for e in out) diff --git 
a/tests/lib/adk/test_codex_turn.py b/tests/lib/adk/test_codex_turn.py new file mode 100644 index 000000000..f6a046478 --- /dev/null +++ b/tests/lib/adk/test_codex_turn.py @@ -0,0 +1,282 @@ +"""Offline tests for CodexTurn and codex_usage_to_turn_usage. + +Tests cover: +- TurnUsage normalization from raw codex usage dicts +- Defensive handling of missing/invalid usage fields +- CodexTurn: events property yields canonical StreamTaskMessage* +- CodexTurn: usage() before and after stream exhaustion +- CodexTurn: on_result wiring (session_id, counts propagate to usage()) +- CodexTurn satisfies HarnessTurn protocol +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator + +import pytest + +from agentex.lib.core.harness.types import TurnUsage, HarnessTurn +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.adk._modules._codex_turn import ( + CodexTurn, + codex_usage_to_turn_usage, +) + + +async def _aiter(items: list[Any]) -> AsyncIterator[Any]: + for item in items: + yield item + + +async def _collect(turn: CodexTurn) -> list[Any]: + return [msg async for msg in turn.events] + + +# --------------------------------------------------------------------------- +# codex_usage_to_turn_usage +# --------------------------------------------------------------------------- + + +class TestCodexUsageToTurnUsage: + def test_none_raw_all_none_tokens(self) -> None: + u = codex_usage_to_turn_usage(None) + assert u.input_tokens is None + assert u.output_tokens is None + assert u.total_tokens is None + assert u.cost_usd is None + + def test_empty_dict_all_none_tokens(self) -> None: + u = codex_usage_to_turn_usage({}) + assert u.input_tokens is None + assert u.output_tokens is None + + def test_standard_usage(self) -> None: + raw = {"input_tokens": 100, "output_tokens": 50, "total_tokens": 150} + u = codex_usage_to_turn_usage(raw, 
model="o4-mini") + assert u.input_tokens == 100 + assert u.output_tokens == 50 + assert u.total_tokens == 150 + assert u.model == "o4-mini" + + def test_reasoning_tokens(self) -> None: + raw = {"input_tokens": 200, "output_tokens": 80, "reasoning_tokens": 60, "total_tokens": 340} + u = codex_usage_to_turn_usage(raw) + assert u.reasoning_tokens == 60 + + def test_real_zero_preserved(self) -> None: + """Explicit zeros in the payload must survive (not be treated as missing).""" + raw = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + u = codex_usage_to_turn_usage(raw) + assert u.input_tokens == 0 + assert u.output_tokens == 0 + + def test_cached_input_tokens(self) -> None: + raw = {"input_tokens": 100, "cached_input_tokens": 20, "output_tokens": 40} + u = codex_usage_to_turn_usage(raw) + assert u.cached_input_tokens == 20 + + def test_invalid_token_values_become_none(self) -> None: + raw = {"input_tokens": "not_a_number", "output_tokens": None} + u = codex_usage_to_turn_usage(raw) + assert u.input_tokens is None + assert u.output_tokens is None + + def test_cost_explicit(self) -> None: + u = codex_usage_to_turn_usage(None, cost_usd=0.0042) + assert u.cost_usd == pytest.approx(0.0042) + + def test_cost_from_raw(self) -> None: + u = codex_usage_to_turn_usage({"cost_usd": 0.001}) + assert u.cost_usd == pytest.approx(0.001) + + def test_explicit_cost_overrides_raw(self) -> None: + """Explicit cost_usd kwarg takes precedence over raw dict value.""" + u = codex_usage_to_turn_usage({"cost_usd": 0.001}, cost_usd=0.002) + assert u.cost_usd == pytest.approx(0.002) + + def test_tool_and_reasoning_counts(self) -> None: + u = codex_usage_to_turn_usage(None, tool_call_count=3, reasoning_count=2) + assert u.num_tool_calls == 3 + assert u.num_reasoning_blocks == 2 + + def test_num_llm_calls_always_one(self) -> None: + u = codex_usage_to_turn_usage(None) + assert u.num_llm_calls == 1 + + def test_duration_ms(self) -> None: + u = codex_usage_to_turn_usage(None, 
duration_ms=1234) + assert u.duration_ms == 1234 + + def test_model_none_when_not_provided(self) -> None: + u = codex_usage_to_turn_usage(None) + assert u.model is None + + def test_non_dict_raw_treated_as_empty(self) -> None: + u = codex_usage_to_turn_usage("bad input") # type: ignore[arg-type] + assert u.input_tokens is None + + def test_returns_turn_usage_instance(self) -> None: + u = codex_usage_to_turn_usage({}) + assert isinstance(u, TurnUsage) + + +# --------------------------------------------------------------------------- +# CodexTurn protocol conformance +# --------------------------------------------------------------------------- + + +class TestCodexTurnProtocol: + def test_implements_harness_turn_protocol(self) -> None: + turn = CodexTurn(_aiter([]), model="o4-mini") + assert isinstance(turn, HarnessTurn) + + def test_usage_before_exhaustion_returns_zero_turn_usage(self) -> None: + turn = CodexTurn(_aiter([]), model="test-model") + u = turn.usage() + assert isinstance(u, TurnUsage) + assert u.model == "test-model" + assert u.input_tokens is None + assert u.num_tool_calls == 0 + + +# --------------------------------------------------------------------------- +# CodexTurn events +# --------------------------------------------------------------------------- + + +class TestCodexTurnEvents: + async def test_events_yield_stream_task_messages(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "m1", "type": "agent_message", "text": "hi"}}, + {"type": "item.completed", "item": {"id": "m1", "type": "agent_message", "text": "hi"}}, + ] + turn = CodexTurn(_aiter(events), model="o4-mini") + out = await _collect(turn) + assert len(out) > 0 + for msg in out: + assert isinstance( + msg, + (StreamTaskMessageStart, StreamTaskMessageDelta, StreamTaskMessageFull, StreamTaskMessageDone), + ) + + async def test_usage_after_exhaustion_has_tokens(self) -> None: + events = [ + { + "type": "turn.completed", + "usage": {"input_tokens": 10, "output_tokens": 5, 
"total_tokens": 15}, + } + ] + turn = CodexTurn(_aiter(events), model="o4-mini") + await _collect(turn) + u = turn.usage() + assert u.input_tokens == 10 + assert u.output_tokens == 5 + assert u.total_tokens == 15 + + async def test_usage_model_propagated(self) -> None: + events = [{"type": "turn.completed", "usage": None}] + turn = CodexTurn(_aiter(events), model="codex-model-x") + await _collect(turn) + assert turn.usage().model == "codex-model-x" + + async def test_tool_count_in_usage(self) -> None: + events = [ + { + "type": "item.started", + "item": {"id": "t1", "type": "command_execution", "command": "ls"}, + }, + { + "type": "item.completed", + "item": { + "id": "t1", + "type": "command_execution", + "command": "ls", + "aggregated_output": ".", + "exit_code": 0, + }, + }, + {"type": "turn.completed", "usage": None}, + ] + turn = CodexTurn(_aiter(events), model="o4-mini") + await _collect(turn) + assert turn.usage().num_tool_calls == 1 + + async def test_events_property_stable_across_accesses(self) -> None: + """`.events` returns the same generator; usage survives a second access.""" + events = [ + { + "type": "item.started", + "item": {"id": "t1", "type": "command_execution", "command": "ls"}, + }, + { + "type": "item.completed", + "item": { + "id": "t1", + "type": "command_execution", + "command": "ls", + "aggregated_output": ".", + "exit_code": 0, + }, + }, + {"type": "turn.completed", "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}}, + ] + turn = CodexTurn(_aiter(events), model="o4-mini") + assert turn.events is turn.events # same generator, not a fresh wrapper + await _collect(turn) + # A second access must NOT re-wrap the exhausted iterator and reset usage. 
+ _ = turn.events + assert turn.usage().total_tokens == 15 + assert turn.usage().num_tool_calls == 1 + + async def test_reasoning_count_in_usage(self) -> None: + events = [ + {"type": "item.started", "item": {"id": "r1", "type": "reasoning", "text": ""}}, + { + "type": "item.completed", + "item": {"id": "r1", "type": "reasoning", "text": "thought"}, + }, + {"type": "turn.completed", "usage": None}, + ] + turn = CodexTurn(_aiter(events), model="o4-mini") + await _collect(turn) + assert turn.usage().num_reasoning_blocks == 1 + + async def test_duration_ms_passed_through(self) -> None: + events = [{"type": "turn.completed", "usage": None}] + turn = CodexTurn(_aiter(events), model="o4-mini", duration_ms=999) + await _collect(turn) + assert turn.usage().duration_ms == 999 + + async def test_cost_usd_passed_through(self) -> None: + events = [{"type": "turn.completed", "usage": None}] + turn = CodexTurn(_aiter(events), model="o4-mini", cost_usd=0.007) + await _collect(turn) + assert turn.usage().cost_usd == pytest.approx(0.007) + + async def test_empty_stream_usage_still_valid(self) -> None: + turn = CodexTurn(_aiter([]), model="o4-mini") + await _collect(turn) + u = turn.usage() + assert isinstance(u, TurnUsage) + assert u.num_llm_calls == 1 + + async def test_reasoning_tokens_propagated(self) -> None: + events = [ + { + "type": "turn.completed", + "usage": { + "input_tokens": 100, + "output_tokens": 60, + "reasoning_tokens": 40, + "total_tokens": 200, + }, + } + ] + turn = CodexTurn(_aiter(events), model="o4-mini") + await _collect(turn) + assert turn.usage().reasoning_tokens == 40 diff --git a/tests/lib/adk/test_pydantic_ai_turn.py b/tests/lib/adk/test_pydantic_ai_turn.py index 0659895d3..46bf247a3 100644 --- a/tests/lib/adk/test_pydantic_ai_turn.py +++ b/tests/lib/adk/test_pydantic_ai_turn.py @@ -122,7 +122,7 @@ async def test_usage_before_exhaustion_returns_default(self): assert pre_usage.model == "openai:gpt-4o" assert pre_usage.input_tokens is None assert 
pre_usage.output_tokens is None - assert pre_usage.num_llm_calls == 0 + assert pre_usage.num_llm_calls is None async def test_turn_events_and_usage(self): """Driving events to exhaustion populates usage from the terminal event.""" @@ -227,7 +227,7 @@ async def test_no_usage_event_leaves_default_usage(self): usage = turn.usage() assert usage.model == "openai:gpt-4o" assert usage.input_tokens is None - assert usage.num_llm_calls == 0 + assert usage.num_llm_calls is None class TestToolRequestStreaming: diff --git a/tests/lib/core/harness/conformance/test_codex_conformance.py b/tests/lib/core/harness/conformance/test_codex_conformance.py new file mode 100644 index 000000000..b00ed2970 --- /dev/null +++ b/tests/lib/core/harness/conformance/test_codex_conformance.py @@ -0,0 +1,225 @@ +"""Conformance fixtures for the codex harness tap. + +Each fixture is produced by ``convert_codex_to_agentex_events`` and registered +into the cross-channel conformance runner so that span derivation is validated +alongside all other harness taps. + +Following the per-module registry pattern from runner.py: this module keeps +its own local list of fixtures, both registers them AND parametrizes over +them, to guarantee determinism regardless of pytest collection order. 
+""" + +from __future__ import annotations + +import asyncio +from typing import Any, AsyncIterator + +import pytest + +from agentex.lib.core.harness.types import StreamTaskMessage +from agentex.lib.adk._modules._codex_sync import convert_codex_to_agentex_events + +from .runner import Fixture, register, derive_all + + +async def _aiter(items: list[Any]) -> AsyncIterator[Any]: + for item in items: + yield item + + +async def _collect(events: list[Any]) -> list[StreamTaskMessage]: + return [msg async for msg in convert_codex_to_agentex_events(_aiter(events))] + + +def _build(events: list[Any]) -> list[StreamTaskMessage]: + return asyncio.run(_collect(events)) + + +# --------------------------------------------------------------------------- +# Fixture 1: plain text response +# --------------------------------------------------------------------------- + +_CODEX_TEXT = Fixture( + name="codex-text", + events=_build( + [ + {"type": "thread.started", "thread_id": "thread-abc"}, + {"type": "turn.started"}, + { + "type": "item.started", + "item": {"id": "msg1", "type": "agent_message", "text": "Hello"}, + }, + { + "type": "item.updated", + "item": {"id": "msg1", "type": "agent_message", "text": "Hello, world"}, + }, + { + "type": "item.completed", + "item": {"id": "msg1", "type": "agent_message", "text": "Hello, world!"}, + }, + { + "type": "turn.completed", + "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + }, + ] + ), +) +register(_CODEX_TEXT) + +# --------------------------------------------------------------------------- +# Fixture 2: tool call (command_execution) +# --------------------------------------------------------------------------- + +_CODEX_TOOL = Fixture( + name="codex-tool-command", + events=_build( + [ + {"type": "thread.started", "thread_id": "thread-cmd"}, + { + "type": "item.started", + "item": { + "id": "tool1", + "type": "command_execution", + "command": "ls /workspace", + }, + }, + { + "type": "item.completed", + "item": { + 
"id": "tool1", + "type": "command_execution", + "command": "ls /workspace", + "aggregated_output": "file1.txt\nfile2.py", + "exit_code": 0, + }, + }, + { + "type": "turn.completed", + "usage": {"input_tokens": 20, "output_tokens": 8, "total_tokens": 28}, + }, + ] + ), +) +register(_CODEX_TOOL) + +# --------------------------------------------------------------------------- +# Fixture 3: reasoning block +# --------------------------------------------------------------------------- + +_CODEX_REASONING = Fixture( + name="codex-reasoning", + events=_build( + [ + {"type": "thread.started", "thread_id": "thread-reason"}, + { + "type": "item.started", + "item": {"id": "r1", "type": "reasoning", "text": ""}, + }, + { + "type": "item.updated", + "item": {"id": "r1", "type": "reasoning", "text": "Step 1: analyze the problem"}, + }, + { + "type": "item.completed", + "item": { + "id": "r1", + "type": "reasoning", + "text": "Step 1: analyze the problem\nStep 2: solve it", + }, + }, + { + "type": "item.started", + "item": {"id": "msg2", "type": "agent_message", "text": ""}, + }, + { + "type": "item.completed", + "item": {"id": "msg2", "type": "agent_message", "text": "The answer is 42."}, + }, + { + "type": "turn.completed", + "usage": { + "input_tokens": 30, + "output_tokens": 20, + "reasoning_tokens": 50, + "total_tokens": 100, + }, + }, + ] + ), +) +register(_CODEX_REASONING) + +# --------------------------------------------------------------------------- +# Fixture 4: multi-step (mcp_tool_call + follow-up text) +# --------------------------------------------------------------------------- + +_CODEX_MULTI = Fixture( + name="codex-multi-step", + events=_build( + [ + {"type": "thread.started", "thread_id": "thread-multi"}, + { + "type": "item.started", + "item": { + "id": "mcp1", + "type": "mcp_tool_call", + "server": "filesystem", + "tool": "read_file", + "arguments": {"path": "/workspace/README.md"}, + }, + }, + { + "type": "item.completed", + "item": { + "id": "mcp1", + 
"type": "mcp_tool_call", + "server": "filesystem", + "tool": "read_file", + "arguments": {"path": "/workspace/README.md"}, + "result": {"content": "# My Project"}, + }, + }, + { + "type": "item.started", + "item": {"id": "msg3", "type": "agent_message", "text": "The README says:"}, + }, + { + "type": "item.completed", + "item": { + "id": "msg3", + "type": "agent_message", + "text": "The README says: # My Project", + }, + }, + { + "type": "turn.completed", + "usage": {"input_tokens": 50, "output_tokens": 30, "total_tokens": 80}, + }, + ] + ), +) +register(_CODEX_MULTI) + + +# --------------------------------------------------------------------------- +# Local parametrized tests (cross-channel conformance) +# --------------------------------------------------------------------------- + +_LOCAL_FIXTURES = [_CODEX_TEXT, _CODEX_TOOL, _CODEX_REASONING, _CODEX_MULTI] + + +@pytest.mark.parametrize("fixture", _LOCAL_FIXTURES, ids=lambda f: f.name) +def test_codex_span_derivation_is_deterministic(fixture: Fixture) -> None: + """Span derivation over codex events is deterministic (cross-channel guarantee). + + Deriving twice over the same events yields identical signals. This is the + invariant that makes ``yield`` and ``auto_send`` delivery equivalent: both + observe the same event stream, so their tracing side effects are identical. + """ + assert derive_all(fixture.events) == derive_all(fixture.events) + + +@pytest.mark.parametrize("fixture", _LOCAL_FIXTURES, ids=lambda f: f.name) +def test_codex_events_are_non_empty(fixture: Fixture) -> None: + """Every codex fixture yields at least one StreamTaskMessage*.""" + assert len(fixture.events) > 0