diff --git a/examples/tutorials/00_sync/060_claude_code/.dockerignore b/examples/tutorials/00_sync/060_claude_code/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/00_sync/060_claude_code/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/00_sync/060_claude_code/Dockerfile b/examples/tutorials/00_sync/060_claude_code/Dockerfile new file mode 100644 index 000000000..ec22d7e0b --- /dev/null +++ b/examples/tutorials/00_sync/060_claude_code/Dockerfile @@ -0,0 +1,46 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies including Node.js (required by the claude CLI) +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +# Install the claude CLI (requires Node.js) +# NOTE: live runs require ANTHROPIC_API_KEY in the environment. 
+RUN npm install -g @anthropic-ai/claude-code || true + +ENV UV_HTTP_TIMEOUT=1000 + +COPY 00_sync/060_claude_code/pyproject.toml /app/060_claude_code/pyproject.toml +COPY 00_sync/060_claude_code/README.md /app/060_claude_code/README.md + +WORKDIR /app/060_claude_code + +COPY 00_sync/060_claude_code/project /app/060_claude_code/project +COPY 00_sync/060_claude_code/tests /app/060_claude_code/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app + +ENV AGENT_NAME=s060-claude-code + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/00_sync/060_claude_code/README.md b/examples/tutorials/00_sync/060_claude_code/README.md new file mode 100644 index 000000000..e9c724732 --- /dev/null +++ b/examples/tutorials/00_sync/060_claude_code/README.md @@ -0,0 +1,76 @@ +# Tutorial 060: Sync Claude Code Agent + +This tutorial demonstrates how to build a **synchronous** agent that spawns the +Claude Code CLI as a local subprocess and streams its output through the Agentex +unified harness surface via ``ClaudeCodeTurn`` and ``UnifiedEmitter``. + +## Key Concepts + +### ClaudeCodeTurn + UnifiedEmitter + +``ClaudeCodeTurn`` wraps ``convert_claude_code_to_agentex_events``, which +parses the newline-delimited JSON envelopes emitted by +``claude -p --output-format stream-json``. It implements the ``HarnessTurn`` +protocol: an ``events`` async iterator of canonical ``StreamTaskMessage*`` +objects and a ``usage()`` method (populated once the stream is exhausted). + +``UnifiedEmitter.yield_turn(turn)`` is the sync delivery path: it forwards +events as HTTP yield chunks while tracing as a side effect. + +### Local subprocess spawn + +The ``_spawn_claude`` function in ``project/acp.py`` uses +``asyncio.create_subprocess_exec`` to run: + +``` +claude -p --output-format stream-json --verbose +``` + +The prompt is written to stdin. Stdout is read line by line and fed into +``ClaudeCodeTurn``. 
This is purely local -- no Scale sandbox is involved. + +Production isolation (Scale sandbox, secret injection, MCP configuration) +is the golden agent's concern at +``teams/sgp/agents/golden_agent/project/harness/providers/claude.py``. + +### Injectable spawn seam + +``_spawn_claude`` is a top-level async generator in ``project/acp.py``. +Tests can monkeypatch it to inject pre-recorded stream-json lines instead of +spawning the real process, so offline unit tests run without the CLI. + +## Files + +| File | Description | +|------|-------------| +| ``project/acp.py`` | ACP server, ``_spawn_claude`` seam, and message handler | +| ``tests/test_agent.py`` | Offline pipeline tests plus live integration tests (live tests need CLI + API key) | +| ``tests/test_agent_offline.py`` | Offline unit tests with injected fake subprocess | +| ``manifest.yaml`` | Agent configuration | + +## Running Locally (live) + +Requires the ``claude`` CLI installed and ``ANTHROPIC_API_KEY`` set: + +```bash +npm install -g @anthropic-ai/claude-code +export ANTHROPIC_API_KEY=sk-ant-... +agentex agents run +``` + +## Running Offline Tests + +No CLI or API key needed: + +```bash +uv run pytest tests/test_agent_offline.py -v +``` + +## Notes + +- Production isolation (sandbox, secrets, MCP) is the golden agent's concern. + This tutorial runs the CLI directly to keep the code as simple as possible. +- Multi-turn session resumption (``claude -r <session_id>``) is out of scope + for this tutorial. See the golden agent for that pattern. +- The ``--verbose`` flag is included to match the golden agent's invocation; + it causes the CLI to emit ``stream_event`` triples for incremental streaming. 
diff --git a/examples/tutorials/00_sync/060_claude_code/manifest.yaml b/examples/tutorials/00_sync/060_claude_code/manifest.yaml new file mode 100644 index 000000000..56b9fd9e4 --- /dev/null +++ b/examples/tutorials/00_sync/060_claude_code/manifest.yaml @@ -0,0 +1,55 @@ +build: + context: + root: ../../ + include_paths: + - 00_sync/060_claude_code + - test_utils + dockerfile: 00_sync/060_claude_code/Dockerfile + dockerignore: 00_sync/060_claude_code/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: sync + name: s060-claude-code + description: A sync Claude Code agent streaming the unified harness surface via a local CLI subprocess + + temporal: + enabled: false + + credentials: + - env_var_name: ANTHROPIC_API_KEY + secret_name: anthropic-api-key + secret_key: api-key + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "s060-claude-code" + description: "A sync Claude Code agent streaming via local CLI subprocess" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/00_sync/060_claude_code/project/__init__.py b/examples/tutorials/00_sync/060_claude_code/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/00_sync/060_claude_code/project/acp.py b/examples/tutorials/00_sync/060_claude_code/project/acp.py new file mode 100644 index 000000000..aad53801a --- /dev/null +++ b/examples/tutorials/00_sync/060_claude_code/project/acp.py @@ -0,0 +1,137 @@ +"""ACP handler for the sync Claude Code tutorial. 
+ +Spawns ``claude -p --output-format stream-json --verbose`` as a LOCAL +asyncio subprocess (no Scale sandbox -- that is the golden agent's +production concern). Stdout lines are fed into ``ClaudeCodeTurn``, which +wraps ``convert_claude_code_to_agentex_events``. Events are delivered via +``UnifiedEmitter.yield_turn``, the sync HTTP yield path. + +Live runs require the ``claude`` CLI to be installed and an +ANTHROPIC_API_KEY (or equivalent credential) to be in the environment. +For offline testing, see ``tests/test_agent_offline.py``, which injects a +fake subprocess. +""" + +from __future__ import annotations + +import os +import asyncio +from typing import AsyncIterator, AsyncGenerator + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from agentex.lib.adk import ClaudeCodeTurn +from agentex.lib.types.acp import SendMessageParams +from agentex.lib.core.harness import UnifiedEmitter +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.types.task_message_update import TaskMessageUpdate +from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +acp = FastACP.create(acp_type="sync") + + +async def _spawn_claude(prompt: str) -> AsyncIterator[str]: + """Spawn ``claude -p --output-format stream-json`` locally and yield stdout lines. + + This is a seam: tests replace it with a fake async iterator of + pre-recorded lines so no real CLI invocation is needed offline. 
+ """ + proc = await asyncio.create_subprocess_exec( + "claude", + "-p", + "--output-format", + "stream-json", + "--verbose", + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + assert proc.stdout is not None + assert proc.stdin is not None + + proc.stdin.write(prompt.encode()) + proc.stdin.close() + + # Drain stderr concurrently. With --verbose, Claude Code can write enough to + # stderr to fill the OS pipe buffer; if we only read stdout, the CLI blocks + # on its stderr write while we block reading stdout — a deadlock. A + # background task keeps stderr flowing so stdout never stalls. + async def _drain_stderr() -> None: + assert proc.stderr is not None + async for _ in proc.stderr: + pass + + stderr_task = asyncio.create_task(_drain_stderr()) + + try: + buffer = "" + async for chunk in proc.stdout: + buffer += chunk.decode("utf-8", errors="replace") + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + line = line.strip() + if line: + yield line + + if buffer.strip(): + yield buffer.strip() + + await proc.wait() + finally: + # Release the subprocess and stderr drain task even if the consumer + # abandons the generator early (task cancellation / client disconnect): + # cancel the drain task and terminate+reap the process if it is still + # running, so neither is leaked. 
+ stderr_task.cancel() + try: + await stderr_task + except asyncio.CancelledError: + pass + if proc.returncode is None: + try: + proc.terminate() + except ProcessLookupError: + pass + await proc.wait() + + +@acp.on_message_send +async def handle_message_send( + params: SendMessageParams, +) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]: + """Handle an incoming message: run Claude Code locally and stream events.""" + task_id = params.task.id + prompt = params.content.content + logger.info("Processing message for task %s", task_id) + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name="message", + input={"message": prompt}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + turn = ClaudeCodeTurn(_spawn_claude(prompt)) + async for event in emitter.yield_turn(turn): + yield event diff --git a/examples/tutorials/00_sync/060_claude_code/pyproject.toml b/examples/tutorials/00_sync/060_claude_code/pyproject.toml new file mode 100644 index 000000000..e5c1c4ea6 --- /dev/null +++ b/examples/tutorials/00_sync/060_claude_code/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "s060-claude-code" +version = "0.1.0" +description = "A sync Claude Code agent streaming the unified harness surface via a local CLI subprocess" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "python-dotenv>=1.0,<2", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] diff --git a/examples/tutorials/00_sync/060_claude_code/tests/test_agent.py b/examples/tutorials/00_sync/060_claude_code/tests/test_agent.py new file mode 100644 index 000000000..954a520f3 --- /dev/null +++ 
b/examples/tutorials/00_sync/060_claude_code/tests/test_agent.py @@ -0,0 +1,162 @@ +"""Tests for the sync Claude Code tutorial agent. + +LIVE tests (``TestClaudeCodeLive``): + - Require the ``claude`` CLI on PATH and ``ANTHROPIC_API_KEY`` set. + - Run the full agent end-to-end against a live Agentex server. + - Skipped automatically when ``CLAUDE_LIVE_TESTS`` is not set to ``1``. + +OFFLINE unit tests (``TestClaudeCodeOffline``): + - Inject a fake async iterator of pre-recorded stream-json lines. + - Assert the ``ClaudeCodeTurn`` + ``UnifiedEmitter`` pipeline yields events, + populates usage, and satisfies the ``HarnessTurn`` protocol. + - Always run -- no CLI or API key needed. +""" + +from __future__ import annotations + +import os +import json +from typing import AsyncIterator + +import pytest + +# --------------------------------------------------------------------------- +# Recorded stream-json fixtures +# --------------------------------------------------------------------------- + +_TEXT_ONLY_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-offline-1"}), + json.dumps( + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hello from Claude Code!"}]}, + } + ), + json.dumps( + { + "type": "result", + "usage": {"input_tokens": 10, "output_tokens": 5}, + "cost_usd": 0.0001, + "duration_ms": 250, + "num_turns": 1, + } + ), +] + + +async def _fake_lines(lines: list[str]) -> AsyncIterator[str]: + """Async iterator of pre-recorded stream-json lines (no subprocess).""" + for line in lines: + yield line + + +# --------------------------------------------------------------------------- +# Offline tests (always run -- no CLI or API key needed) +# --------------------------------------------------------------------------- + + +class TestClaudeCodeOffline: + """Unit tests that run without a real claude CLI or network.""" + + @pytest.mark.asyncio + async def test_yields_stream_events(self): + 
"""ClaudeCodeTurn drives UnifiedEmitter and yields StreamTaskMessage* events.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + from agentex.types.task_message_update import StreamTaskMessageStart + + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + + events = [e async for e in emitter.yield_turn(turn)] + assert len(events) > 0, "No events yielded" + assert any(isinstance(e, StreamTaskMessageStart) for e in events) + + @pytest.mark.asyncio + async def test_stream_task_message_done_present(self): + """StreamTaskMessageDone must appear after stream exhaustion.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + from agentex.types.task_message_update import StreamTaskMessageDone + + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + + events = [e async for e in emitter.yield_turn(turn)] + assert any(isinstance(e, StreamTaskMessageDone) for e in events), ( + "Expected at least one StreamTaskMessageDone event" + ) + + @pytest.mark.asyncio + async def test_usage_populated_after_stream_exhausted(self): + """ClaudeCodeTurn.usage() returns correct tokens after stream is exhausted.""" + from agentex.lib.adk import ClaudeCodeTurn + + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + _ = [e async for e in turn.events] + usage = turn.usage() + assert usage.input_tokens == 10 + assert usage.output_tokens == 5 + assert usage.num_llm_calls == 1 + + @pytest.mark.asyncio + async def test_protocol_compliance(self): + """ClaudeCodeTurn satisfies the HarnessTurn protocol.""" + from agentex.lib.adk import ClaudeCodeTurn + + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + assert hasattr(turn, "events"), "ClaudeCodeTurn missing .events" + assert hasattr(turn, "usage"), "ClaudeCodeTurn missing .usage()" + + +# 
--------------------------------------------------------------------------- +# Live tests (skipped unless CLAUDE_LIVE_TESTS=1) +# --------------------------------------------------------------------------- + +pytestmark_live = pytest.mark.skipif( + not os.environ.get("CLAUDE_LIVE_TESTS"), + reason="Set CLAUDE_LIVE_TESTS=1 and ensure the `claude` CLI + ANTHROPIC_API_KEY are available", +) + +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "s060-claude-code") + + +@pytestmark_live +class TestClaudeCodeLive: + """Live streaming tests -- needs the claude CLI + ANTHROPIC_API_KEY.""" + + @pytest.fixture + def client(self): + from agentex import Agentex + + return Agentex(base_url=AGENTEX_API_BASE_URL) + + @pytest.fixture + def agent_name(self): + return AGENT_NAME + + def test_stream_simple_message(self, client, agent_name: str): + """Stream a simple prompt through the local Claude Code subprocess.""" + from test_utils.sync import collect_streaming_response + + from agentex.types import TextContentParam + from agentex.types.agent_rpc_params import ParamsSendMessageRequest + + stream = client.agents.send_message_stream( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="Reply with exactly three words: hello from claude", + type="text", + ) + ), + ) + aggregated_content, chunks = collect_streaming_response(stream) + assert aggregated_content is not None + assert len(chunks) >= 1 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/00_sync/060_claude_code/tests/test_agent_offline.py b/examples/tutorials/00_sync/060_claude_code/tests/test_agent_offline.py new file mode 100644 index 000000000..23ac52a57 --- /dev/null +++ b/examples/tutorials/00_sync/060_claude_code/tests/test_agent_offline.py @@ -0,0 +1,210 @@ +"""Offline unit tests for the sync Claude Code tutorial agent. 
+ +These tests do NOT require the ``claude`` CLI or an ANTHROPIC_API_KEY. +They inject a fake async iterator of pre-recorded stream-json lines in +place of the real subprocess spawn, and a fake streaming backend in place +of the real Redis/AGP layer, then assert that the handler correctly drives +the unified surface (``UnifiedEmitter.yield_turn``). + +The injection seam is the ``_spawn_claude`` function in ``project/acp.py``. +Tests monkeypatch it with a coroutine that returns a pre-recorded async +iterator, so the handler code runs in full without any subprocess. +""" + +from __future__ import annotations + +import json +from typing import AsyncIterator + +import pytest + +from agentex.lib.adk import ClaudeCodeTurn +from agentex.lib.core.harness import UnifiedEmitter +from agentex.types.task_message_update import ( + StreamTaskMessageStart, +) + +# --------------------------------------------------------------------------- +# Recorded stream-json fixtures +# --------------------------------------------------------------------------- + +_TEXT_ONLY_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-1"}), + json.dumps( + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hello from Claude Code!"}]}, + } + ), + json.dumps( + { + "type": "result", + "usage": {"input_tokens": 10, "output_tokens": 5}, + "cost_usd": 0.0001, + "duration_ms": 250, + "num_turns": 1, + } + ), +] + +_TOOL_CALL_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-2"}), + json.dumps( + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "id": "tool_abc", + "name": "Bash", + "input": {"command": "echo hello"}, + } + ] + }, + } + ), + json.dumps( + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool_abc", + "content": "hello\n", + "is_error": False, + } + ] + }, + } + ), + json.dumps( + { + "type": "assistant", + 
"message": {"content": [{"type": "text", "text": "Done."}]}, + } + ), + json.dumps( + { + "type": "result", + "usage": {"input_tokens": 20, "output_tokens": 8}, + "cost_usd": 0.0002, + "duration_ms": 400, + "num_turns": 1, + } + ), +] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _fake_lines(lines: list[str]) -> AsyncIterator[str]: + for line in lines: + yield line + + +async def _collect_yield_turn(lines: list[str]) -> list: + """Run a ClaudeCodeTurn through UnifiedEmitter.yield_turn and collect events.""" + turn = ClaudeCodeTurn(_fake_lines(lines)) + emitter = UnifiedEmitter(task_id="t1", trace_id=None, parent_span_id=None) + return [e async for e in emitter.yield_turn(turn)] + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_text_only_produces_start_and_done(): + events = await _collect_yield_turn(_TEXT_ONLY_LINES) + types = [type(e).__name__ for e in events] + assert "StreamTaskMessageStart" in types + assert "StreamTaskMessageDone" in types + + +@pytest.mark.asyncio +async def test_text_only_content(): + events = await _collect_yield_turn(_TEXT_ONLY_LINES) + starts = [e for e in events if isinstance(e, StreamTaskMessageStart)] + assert len(starts) == 1 + assert starts[0].content.type == "text" + + +@pytest.mark.asyncio +async def test_usage_is_populated_after_stream(): + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + _ = [e async for e in turn.events] + usage = turn.usage() + assert usage.input_tokens == 10 + assert usage.output_tokens == 5 + assert usage.cost_usd == pytest.approx(0.0001, rel=1e-4) + assert usage.num_llm_calls == 1 + + +@pytest.mark.asyncio +async def test_tool_call_produces_tool_request_and_response(): + events = await 
_collect_yield_turn(_TOOL_CALL_LINES) + content_types = { + getattr(e, "content", None) and getattr(e.content, "type", None) for e in events if hasattr(e, "content") + } + assert "tool_request" in content_types + assert "tool_response" in content_types + + +@pytest.mark.asyncio +async def test_tool_call_has_one_text_block(): + """The tool_use block is not text; only 'Done.' is the text block.""" + events = await _collect_yield_turn(_TOOL_CALL_LINES) + text_starts = [ + e for e in events if isinstance(e, StreamTaskMessageStart) and getattr(e.content, "type", None) == "text" + ] + assert len(text_starts) == 1 + + +@pytest.mark.asyncio +async def test_empty_lines_are_skipped(): + """Inserting blank lines in the stream must not crash the parser.""" + lines_with_blanks = ["", " "] + _TEXT_ONLY_LINES + [""] + events = await _collect_yield_turn(lines_with_blanks) + assert any(isinstance(e, StreamTaskMessageStart) for e in events) + + +@pytest.mark.asyncio +async def test_spawn_seam_concept(): + """Demonstrate the injectable spawn seam pattern used in project/acp.py. + + The ``_spawn_claude`` function in ``project/acp.py`` is a top-level async + generator. Production code calls it like:: + + turn = ClaudeCodeTurn(_spawn_claude(prompt)) + + In tests, a replacement function is injected (e.g. via monkeypatch) to + return pre-recorded lines. This test proves the pattern works end-to-end + without importing the full ACP module (which has module-level env-var + checks that only pass in a running agent environment). 
+ """ + recorded_lines = _TEXT_ONLY_LINES + + async def _fake_spawn(prompt: str) -> AsyncIterator[str]: # noqa: ARG001 + """Drop-in replacement for _spawn_claude.""" + for line in recorded_lines: + yield line + + called_with: list[str] = [] + + async def _wrapped_spawn(prompt: str) -> AsyncIterator[str]: + called_with.append(prompt) + async for line in _fake_spawn(prompt): + yield line + + turn = ClaudeCodeTurn(_wrapped_spawn("test prompt")) + emitter = UnifiedEmitter(task_id="t2", trace_id=None, parent_span_id=None) + events = [e async for e in emitter.yield_turn(turn)] + + assert called_with == ["test prompt"] + assert any(isinstance(e, StreamTaskMessageStart) for e in events) diff --git a/examples/tutorials/10_async/00_base/130_claude_code/.dockerignore b/examples/tutorials/10_async/00_base/130_claude_code/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_claude_code/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/tutorials/10_async/00_base/130_claude_code/Dockerfile b/examples/tutorials/10_async/00_base/130_claude_code/Dockerfile new file mode 100644 index 000000000..e36b9e56d --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_claude_code/Dockerfile @@ -0,0 +1,43 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf 
/var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +RUN npm install -g @anthropic-ai/claude-code || true + +ENV UV_HTTP_TIMEOUT=1000 + +COPY 10_async/00_base/130_claude_code/pyproject.toml /app/130_claude_code/pyproject.toml +COPY 10_async/00_base/130_claude_code/README.md /app/130_claude_code/README.md + +WORKDIR /app/130_claude_code + +COPY 10_async/00_base/130_claude_code/project /app/130_claude_code/project +COPY 10_async/00_base/130_claude_code/tests /app/130_claude_code/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app + +ENV AGENT_NAME=ab130-claude-code + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/10_async/00_base/130_claude_code/README.md b/examples/tutorials/10_async/00_base/130_claude_code/README.md new file mode 100644 index 000000000..695207c57 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_claude_code/README.md @@ -0,0 +1,76 @@ +# Tutorial 130 (async/base): Async Claude Code Agent + +This tutorial demonstrates how to build an **async (non-Temporal)** agent that +spawns the Claude Code CLI as a local subprocess and delivers its output through +the Agentex unified harness surface via ``ClaudeCodeTurn`` and +``UnifiedEmitter.auto_send_turn``. + +## Key Concepts + +### Async delivery path + +Unlike the sync tutorial (060), this agent uses the async ACP model. The +``@acp.on_task_event_send`` handler does not return a generator -- instead, +``UnifiedEmitter.auto_send_turn(turn)`` pushes events to the task's Redis +stream in real time and returns a ``TurnResult`` when the turn is complete. +The UI polls or streams that Redis channel independently. + +### ClaudeCodeTurn + UnifiedEmitter + +Same tap as the sync tutorial: +- ``ClaudeCodeTurn`` wraps ``convert_claude_code_to_agentex_events``. +- ``UnifiedEmitter`` wires trace context + chosen delivery. +- ``auto_send_turn`` is the async push path. 
+ +### Local subprocess spawn + +``_spawn_claude`` in ``project/acp.py`` uses ``asyncio.create_subprocess_exec`` +to run: + +``` +claude -p --output-format stream-json --verbose +``` + +The prompt is written to stdin. Stdout is read line by line. + +Production isolation (Scale sandbox, secret injection, MCP configuration) +is the golden agent's concern at +``teams/sgp/agents/golden_agent/project/harness/providers/claude.py``. + +### Injectable spawn seam + +``_spawn_claude`` is a top-level async generator. Tests monkeypatch it to +inject pre-recorded stream-json lines so offline unit tests run without the CLI. + +## Files + +| File | Description | +|------|-------------| +| ``project/acp.py`` | ACP server, ``_spawn_claude`` seam, and event handler | +| ``tests/test_agent.py`` | Live integration tests (needs CLI + API key) | +| ``tests/test_agent_offline.py`` | Offline unit tests with injected fake subprocess | +| ``manifest.yaml`` | Agent configuration | + +## Running Locally (live) + +Requires the ``claude`` CLI installed and ``ANTHROPIC_API_KEY`` set: + +```bash +npm install -g @anthropic-ai/claude-code +export ANTHROPIC_API_KEY=sk-ant-... +agentex agents run +``` + +## Running Offline Tests + +No CLI or API key needed: + +```bash +uv run pytest tests/test_agent_offline.py -v +``` + +## Notes + +- Production isolation (sandbox, secrets, MCP) is the golden agent's concern. +- For multi-turn memory, persist the Claude Code session_id from the + ``result`` envelope and pass it to ``claude -r <session_id>`` on the next turn. 
diff --git a/examples/tutorials/10_async/00_base/130_claude_code/manifest.yaml b/examples/tutorials/10_async/00_base/130_claude_code/manifest.yaml new file mode 100644 index 000000000..7d74de7c6 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_claude_code/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/00_base/130_claude_code + - test_utils + dockerfile: 10_async/00_base/130_claude_code/Dockerfile + dockerignore: 10_async/00_base/130_claude_code/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: async + name: ab130-claude-code + description: An async Claude Code agent streaming the unified harness surface via a local CLI subprocess + + temporal: + enabled: false + + credentials: + - env_var_name: ANTHROPIC_API_KEY + secret_name: anthropic-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "ab130-claude-code" + description: "An async Claude Code agent streaming via local CLI subprocess" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/00_base/130_claude_code/project/__init__.py b/examples/tutorials/10_async/00_base/130_claude_code/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/00_base/130_claude_code/project/acp.py b/examples/tutorials/10_async/00_base/130_claude_code/project/acp.py new file mode 100644 index 000000000..b6681f6a8 --- /dev/null +++ 
_PIPE_READ_BYTES = 64 * 1024  # size of each raw read from the CLI's pipes
_STDERR_TAIL_BYTES = 8 * 1024  # trailing stderr kept for the failure log
_TERMINATE_GRACE_SECONDS = 5.0  # wait after SIGTERM before escalating to SIGKILL


async def _iter_stream_lines(reader: asyncio.StreamReader) -> AsyncIterator[str]:
    """Yield stripped, non-empty text lines from ``reader`` until EOF.

    Reads fixed-size chunks with ``read()`` rather than iterating the reader.
    Iteration goes through ``readline()``, which raises ``ValueError`` as soon
    as a single line exceeds the reader's buffer limit (64 KiB by default); one
    stream-json envelope carrying a large tool result can exceed that.

    Bytes are decoded only once a line is complete, so a multi-byte UTF-8
    sequence split across two reads is not corrupted.
    """
    parts: list[bytes] = []  # fragments of the line currently being assembled
    while chunk := await reader.read(_PIPE_READ_BYTES):
        head, newline, rest = chunk.partition(b"\n")
        if not newline:
            # Still inside one long line; keep collecting without re-copying.
            parts.append(chunk)
            continue
        *middle, tail = rest.split(b"\n")
        complete = [b"".join(parts) + head, *middle]
        parts = [tail] if tail else []
        for raw in complete:
            text = raw.decode("utf-8", errors="replace").strip()
            if text:
                yield text

    # Final line without a trailing newline.
    trailing = b"".join(parts).decode("utf-8", errors="replace").strip()
    if trailing:
        yield trailing


async def _spawn_claude(prompt: str) -> AsyncIterator[str]:
    """Spawn ``claude -p --output-format stream-json`` locally and yield stdout lines.

    The prompt is written to the CLI's stdin, which is then closed. Each
    non-empty stdout line is yielded with surrounding whitespace stripped.

    Injectable seam: tests monkeypatch this with a fake async iterator of
    pre-recorded lines so no real CLI invocation is needed offline.

    Args:
        prompt: Text sent to the CLI on stdin.

    Yields:
        One stripped stdout line at a time, in order.

    Raises:
        FileNotFoundError: If the ``claude`` executable is not on ``PATH``
            (raised by ``asyncio.create_subprocess_exec``).
    """
    proc = await asyncio.create_subprocess_exec(
        "claude",
        "-p",
        "--output-format",
        "stream-json",
        "--verbose",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    assert proc.stdout is not None
    assert proc.stdin is not None
    assert proc.stderr is not None
    stderr_reader = proc.stderr

    # close() flushes the buffered prompt before closing, which signals EOF.
    proc.stdin.write(prompt.encode())
    proc.stdin.close()

    stderr_tail = bytearray()

    # Drain stderr concurrently. With --verbose, Claude Code can write enough to
    # stderr to fill the OS pipe buffer; if we only read stdout, the CLI blocks
    # on its stderr write while we block reading stdout -- a deadlock. Raw
    # read() is used so an over-long stderr line cannot kill the drain task.
    async def _drain_stderr() -> None:
        while chunk := await stderr_reader.read(_PIPE_READ_BYTES):
            stderr_tail.extend(chunk)
            del stderr_tail[:-_STDERR_TAIL_BYTES]  # keep only the tail

    stderr_task = asyncio.create_task(_drain_stderr())

    try:
        async for line in _iter_stream_lines(proc.stdout):
            yield line

        returncode = await proc.wait()
        if returncode != 0:
            # Give the drain task a moment to reach EOF, then surface the
            # failure instead of silently ending the stream.
            await asyncio.wait({stderr_task}, timeout=1.0)
            logger.warning(
                "claude CLI exited with code %s; stderr tail: %s",
                returncode,
                stderr_tail.decode("utf-8", errors="replace").strip(),
            )
    finally:
        # Release the subprocess and stderr drain task even if the consumer
        # abandons the generator early (task cancellation / client disconnect).
        stderr_task.cancel()
        try:
            await stderr_task
        except asyncio.CancelledError:
            pass
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass
            try:
                await asyncio.wait_for(proc.wait(), timeout=_TERMINATE_GRACE_SECONDS)
            except asyncio.TimeoutError:
                # The child ignored SIGTERM; force it down so wait() returns.
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                await proc.wait()
+[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] diff --git a/examples/tutorials/10_async/00_base/130_claude_code/tests/test_agent.py b/examples/tutorials/10_async/00_base/130_claude_code/tests/test_agent.py new file mode 100644 index 000000000..ee254da23 --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_claude_code/tests/test_agent.py @@ -0,0 +1,250 @@ +"""Tests for the async Claude Code tutorial agent. + +LIVE tests (``TestClaudeCodeLive``): + - Require the ``claude`` CLI on PATH and ``ANTHROPIC_API_KEY`` set. + - Run the full agent end-to-end against a live Agentex server. + - Skipped automatically when ``CLAUDE_LIVE_TESTS`` is not set to ``1``. + +OFFLINE unit tests (``TestClaudeCodeOffline``): + - Inject a fake async iterator of pre-recorded stream-json lines. + - Assert the ``ClaudeCodeTurn`` + ``UnifiedEmitter`` pipeline drives + ``auto_send_turn``, populates usage, and satisfies the ``HarnessTurn`` + protocol. + - Always run -- no CLI or API key needed. 
class _FakeCtx:
    """In-memory stand-in for a streaming task-message context.

    Every lifecycle call appends a ``(kind, payload)`` tuple to the shared
    ``sink`` list so tests can assert on the exact sequence of calls.
    """

    def __init__(self, sink, content_type, initial_content):
        self.sink = sink
        self.content_type = content_type
        self.task_message = TaskMessage(id="msg-1", task_id="task-offline", content=initial_content)

    def _record(self, kind, payload):
        """Append one ``(kind, payload)`` entry to the shared sink."""
        self.sink.append((kind, payload))

    async def stream_update(self, update):
        """Record ``update`` and hand it back unchanged."""
        self._record("update", update)
        return update

    async def close(self):
        """Record that the context was closed."""
        self._record("close", self.content_type)

    async def __aenter__(self):
        self._record("open", self.content_type)
        return self

    async def __aexit__(self, *exc_info):
        await self.close()
        # Never suppress an exception raised inside the ``async with`` body.
        return False
self.sink.append(("ctx", ctype)) + return _FakeCtx(self.sink, ctype, initial_content) + + +# --------------------------------------------------------------------------- +# Offline tests (always run -- no CLI or API key needed) +# --------------------------------------------------------------------------- + + +class TestClaudeCodeOffline: + """Unit tests that run without a real claude CLI or network.""" + + @pytest.mark.asyncio + async def test_auto_send_text_only_opens_and_closes_context(self): + """auto_send_turn opens and closes exactly one streaming context.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + + fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter( + task_id="offline-task", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + + opened = [s for s in fake_streaming.sink if s[0] == "open"] + closed = [s for s in fake_streaming.sink if s[0] == "close"] + assert len(opened) == 1 + assert len(closed) == 1 + assert opened[0][1] == "text" + + @pytest.mark.asyncio + async def test_auto_send_populates_final_text(self): + """auto_send_turn result carries the agent's reply text.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + + fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter( + task_id="offline-task", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + assert "Hello from async Claude Code" in result.final_text + + @pytest.mark.asyncio + async def test_usage_populated_after_stream_exhausted(self): + """Usage is populated after the events stream is exhausted.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + 
def _live_tests_enabled() -> bool:
    """Return True when ``CLAUDE_LIVE_TESTS`` holds an affirmative value.

    A plain truthiness check would also enable the live suite for
    ``CLAUDE_LIVE_TESTS=0`` or ``CLAUDE_LIVE_TESTS=false``. Unset, empty and
    the usual "off" spellings disable it; any other value (``1``, ``true``,
    ...) enables it.
    """
    flag = os.environ.get("CLAUDE_LIVE_TESTS", "").strip().lower()
    return flag not in {"", "0", "false", "no", "off"}


pytestmark_live = pytest.mark.skipif(
    not _live_tests_enabled(),
    reason="Set CLAUDE_LIVE_TESTS=1 and ensure the `claude` CLI + ANTHROPIC_API_KEY are available",
)

# Connection settings for the live suite; both can be overridden via the environment.
AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
AGENT_NAME = os.environ.get("AGENT_NAME", "ab130-claude-code")
agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent {agent_name!r} not found.") + + def test_send_simple_message(self, client, agent_id: str): + """Create a task, send a message, and poll until a response appears.""" + import time + import uuid + + from agentex.types import TextContentParam + from agentex.types.agent_rpc_params import ParamsSendEventRequest, ParamsCreateTaskRequest + + task = client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)).result + assert task is not None + task_id = task.id + + client.agents.send_event( + agent_id=agent_id, + params=ParamsSendEventRequest( + task_id=task_id, + content=TextContentParam( + author="user", + content="Reply with exactly three words: hello from claude", + type="text", + ), + ), + ) + + deadline = time.monotonic() + 60 + while time.monotonic() < deadline: + msgs = client.messages.list(task_id=task_id) + agent_msgs = [m for m in msgs if getattr(m.content, "author", None) == "agent"] + if agent_msgs: + assert len(agent_msgs) >= 1 + return + time.sleep(2) + + raise AssertionError("No agent response received within 60 s") + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/00_base/130_claude_code/tests/test_agent_offline.py b/examples/tutorials/10_async/00_base/130_claude_code/tests/test_agent_offline.py new file mode 100644 index 000000000..ac48474ee --- /dev/null +++ b/examples/tutorials/10_async/00_base/130_claude_code/tests/test_agent_offline.py @@ -0,0 +1,243 @@ +"""Offline unit tests for the async Claude Code tutorial agent. + +These tests do NOT require the ``claude`` CLI or an ANTHROPIC_API_KEY. +They inject a fake async iterator of pre-recorded stream-json lines in +place of the real subprocess spawn and a fake streaming backend, then +assert that the handler drives ``UnifiedEmitter.auto_send_turn`` correctly. + +The injection seam is the ``_spawn_claude`` function in ``project/acp.py``. 
+""" + +from __future__ import annotations + +import json +from typing import AsyncIterator + +import pytest + +from agentex.lib.adk import ClaudeCodeTurn +from agentex.lib.core.harness import UnifiedEmitter +from agentex.types.task_message import TaskMessage + +# --------------------------------------------------------------------------- +# Recorded fixtures +# --------------------------------------------------------------------------- + +_TEXT_ONLY_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-1"}), + json.dumps( + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hello from async Claude Code!"}]}, + } + ), + json.dumps( + { + "type": "result", + "usage": {"input_tokens": 12, "output_tokens": 6}, + "cost_usd": 0.0001, + "duration_ms": 300, + "num_turns": 1, + } + ), +] + +_TOOL_CALL_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-2"}), + json.dumps( + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "id": "tool_xyz", + "name": "Read", + "input": {"file_path": "/tmp/foo.txt"}, + } + ] + }, + } + ), + json.dumps( + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool_xyz", + "content": "file contents", + "is_error": False, + } + ] + }, + } + ), + json.dumps( + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Read the file."}]}, + } + ), + json.dumps( + { + "type": "result", + "usage": {"input_tokens": 25, "output_tokens": 10}, + "cost_usd": 0.0003, + "duration_ms": 500, + "num_turns": 1, + } + ), +] + + +# --------------------------------------------------------------------------- +# Fake streaming backend +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, sink, content_type, initial_content): + self.sink = sink + self.content_type = content_type + self.task_message = 
class _FakeStreaming:
    """Fake streaming backend that logs every context it hands out.

    ``sink`` is shared with each ``_FakeCtx`` created here, so a test sees
    one ordered list of ``("ctx" | "open" | "update" | "close", payload)``
    tuples.
    """

    def __init__(self):
        self.sink: list = []

    def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None):  # noqa: ARG002
        """Record the requested content type and return a fake context for it."""
        content_kind = getattr(initial_content, "type", None)
        entry = ("ctx", content_kind)
        self.sink.append(entry)
        return _FakeCtx(self.sink, content_kind, initial_content)
@pytest.mark.asyncio
async def test_spawn_seam_concept():
    """Show that a drop-in replacement for ``_spawn_claude`` drives a full turn.

    A local async generator with the same call shape records the prompt it
    receives and yields the pre-recorded text-only envelopes. Feeding it to
    ``ClaudeCodeTurn`` and ``auto_send_turn`` must produce the recorded reply
    text, without importing the ACP module or spawning the real CLI.
    """
    prompts_seen: list[str] = []

    async def _recorded_spawn(prompt: str) -> AsyncIterator[str]:
        prompts_seen.append(prompt)
        for envelope in _TEXT_ONLY_LINES:
            yield envelope

    backend = _FakeStreaming()
    turn = ClaudeCodeTurn(_recorded_spawn("ping"))
    emitter = UnifiedEmitter(
        task_id="t",
        trace_id=None,
        parent_span_id=None,
        tracer=False,
        streaming=backend,
    )
    outcome = await emitter.auto_send_turn(turn)

    # The generator body only runs once the turn is consumed, so the prompt
    # is recorded exactly once by the time auto_send_turn returns.
    assert prompts_seen == ["ping"]
    assert "Hello from async Claude Code" in outcome.final_text
UV_HTTP_TIMEOUT=1000 + +COPY 10_async/10_temporal/140_claude_code/pyproject.toml /app/140_claude_code/pyproject.toml +COPY 10_async/10_temporal/140_claude_code/README.md /app/140_claude_code/README.md + +WORKDIR /app/140_claude_code + +COPY 10_async/10_temporal/140_claude_code/project /app/140_claude_code/project +COPY 10_async/10_temporal/140_claude_code/tests /app/140_claude_code/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app + +ENV AGENT_NAME=at140-claude-code + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When deploying the worker, replace the CMD with: +# CMD ["python", "project/run_worker.py"] diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/README.md b/examples/tutorials/10_async/10_temporal/140_claude_code/README.md new file mode 100644 index 000000000..61cc94183 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/README.md @@ -0,0 +1,76 @@ +# Tutorial 140 (async/temporal): Temporal Claude Code Agent + +This tutorial demonstrates how to build a **Temporal-backed** agent that +spawns the Claude Code CLI as a local subprocess and delivers its output +through the Agentex unified harness surface via ``ClaudeCodeTurn`` and +``UnifiedEmitter.auto_send_turn``, with Temporal providing durable execution +and crash recovery. + +## Key Concepts + +### Temporal + ClaudeCodeTurn + +The Temporal workflow (``project/workflow.py``) holds state durably. Each user +message arrives as a signal (``on_task_event_send``), spawns the Claude Code +CLI locally, wraps the stdout line stream in ``ClaudeCodeTurn``, and pushes +events to the task's Redis stream via ``UnifiedEmitter.auto_send_turn``. + +``workflow.now()`` is passed as ``created_at`` so message timestamps are +deterministic under Temporal replay. + +### Multi-turn session resume + +The workflow persists the Claude Code ``session_id`` from the ``result`` +envelope. 
On the next turn, ``-r <session_id>`` is passed to the CLI to +resume the conversation. Temporal's durable state ensures the session_id +survives worker crashes. + +### Note on subprocess in workflow code + +The subprocess is not spawned in workflow code. Temporal runs workflow and +signal-handler bodies on a deterministic sandbox event loop that does not +support subprocess I/O, so the workflow delegates each turn to the custom +``run_claude_code_turn`` activity in ``project/activities.py``. Running in an +activity also gives each subprocess invocation Temporal's retry and +timeout guarantees. See +``examples/tutorials/10_async/10_temporal/030_custom_activities/`` for +that pattern. + +### Injectable spawn seam + +``_spawn_claude`` in ``project/activities.py`` is a top-level async generator. +Tests monkeypatch it to inject pre-recorded stream-json lines so offline +unit tests run without the CLI. + +## Files + +| File | Description | +|------|-------------| +| ``project/acp.py`` | Thin ACP server; wires Temporal (no handlers) | +| ``project/workflow.py`` | Temporal workflow; delegates each turn to the activity | +| ``project/activities.py`` | ``run_claude_code_turn`` activity + ``_spawn_claude`` seam | +| ``project/run_worker.py`` | Temporal worker entry point | +| ``tests/test_agent.py`` | Live integration tests (needs CLI + Temporal + API key) | +| ``tests/test_agent_offline.py`` | Offline unit tests with injected fake subprocess | +| ``manifest.yaml`` | Agent configuration | + +## Running Locally (live) + +Requires Temporal server, the ``claude`` CLI, and ``ANTHROPIC_API_KEY``: + +```bash +npm install -g @anthropic-ai/claude-code +export ANTHROPIC_API_KEY=sk-ant-... +agentex agents run +``` + +## Running Offline Tests + +No CLI, Temporal, or API key needed: + +```bash +uv run pytest tests/test_agent_offline.py -v +``` + +## Notes + +- Production isolation (sandbox, secrets, MCP) is the golden agent's concern. +- The subprocess spawn runs in a custom activity because workflow code cannot perform I/O. +- The ``--verbose`` flag is included to match the golden agent's invocation.
diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/manifest.yaml b/examples/tutorials/10_async/10_temporal/140_claude_code/manifest.yaml new file mode 100644 index 000000000..9328b1713 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/manifest.yaml @@ -0,0 +1,62 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/10_temporal/140_claude_code + - test_utils + dockerfile: 10_async/10_temporal/140_claude_code/Dockerfile + dockerignore: 10_async/10_temporal/140_claude_code/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + worker: project/run_worker.py + +agent: + acp_type: async + name: at140-claude-code + description: A Temporal-backed Claude Code agent streaming the unified harness surface via a local CLI subprocess + + temporal: + enabled: true + workflows: + - name: at140-claude-code + queue_name: at140_claude_code_queue + + credentials: + - env_var_name: ANTHROPIC_API_KEY + secret_name: anthropic-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "at140-claude-code" + description: "A Temporal-backed Claude Code agent streaming via local CLI subprocess" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/project/__init__.py b/examples/tutorials/10_async/10_temporal/140_claude_code/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git 
a/examples/tutorials/10_async/10_temporal/140_claude_code/project/acp.py b/examples/tutorials/10_async/10_temporal/140_claude_code/project/acp.py new file mode 100644 index 000000000..07258f6d8 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/project/acp.py @@ -0,0 +1,31 @@ +"""ACP server for the Temporal Claude Code tutorial. + +This file is intentionally thin. When ``acp_type="async"`` is combined +with ``TemporalACPConfig``, FastACP auto-wires: + + HTTP task/create -> @workflow.run on the workflow class + HTTP task/event/send -> @workflow.signal(SignalName.RECEIVE_EVENT) + HTTP task/cancel -> workflow cancellation via the Temporal client + +The actual agent code lives in ``project/workflow.py`` and is executed by +the Temporal worker (``project/run_worker.py``), not by this HTTP process. +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + ), +) diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/project/activities.py b/examples/tutorials/10_async/10_temporal/140_claude_code/project/activities.py new file mode 100644 index 000000000..dcba0f9a7 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/project/activities.py @@ -0,0 +1,139 @@ +"""Temporal activity for the Claude Code tutorial. + +Subprocess spawning (and any other I/O) must run inside a Temporal *activity*, +not in workflow code. Temporal runs workflow + signal-handler bodies on a +deterministic sandbox event loop that does not implement ``subprocess_exec`` +(or threads / sockets), so spawning the CLI directly in the signal handler +raises ``NotImplementedError``. 
This activity runs the Claude Code CLI, drives +the ``ClaudeCodeTurn`` through ``UnifiedEmitter.auto_send_turn`` (the async +Redis push path), and returns the turn result to the workflow. + +The ``_spawn_claude`` async generator is an injectable seam: offline tests +provide a fake that yields pre-recorded stdout lines so no real CLI runs. +""" + +from __future__ import annotations + +import asyncio +from typing import Any, AsyncIterator +from datetime import datetime + +from temporalio import activity + +from agentex.lib.adk import ClaudeCodeTurn +from agentex.lib.core.harness import UnifiedEmitter +from agentex.lib.utils.logging import make_logger +from agentex.lib.utils.model_utils import BaseModel + +logger = make_logger(__name__) + +RUN_CLAUDE_CODE_TURN_ACTIVITY = "run_claude_code_turn" + + +class RunClaudeCodeTurnParams(BaseModel): + """Arguments for one Claude Code turn run inside an activity.""" + + task_id: str + prompt: str + trace_id: str | None = None + parent_span_id: str | None = None + session_id: str | None = None + created_at: datetime | None = None + + +class RunClaudeCodeTurnResult(BaseModel): + """Result returned from the activity to the workflow.""" + + final_text: str + session_id: str | None = None + + +async def _spawn_claude(prompt: str, session_id: str | None = None) -> AsyncIterator[str]: + """Spawn ``claude -p --output-format stream-json`` locally and yield stdout lines. + + Pass ``session_id`` to resume a previous Claude Code session (multi-turn + memory via ``-r ``). + + Injectable seam: tests monkeypatch this with a fake async iterator so no + real CLI invocation is needed offline. 
+ """ + cmd = [ + "claude", + "-p", + "--output-format", + "stream-json", + "--verbose", + ] + if session_id: + cmd.extend(["-r", session_id]) + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + assert proc.stdout is not None + assert proc.stdin is not None + + proc.stdin.write(prompt.encode()) + proc.stdin.close() + + # Drain stderr concurrently. With --verbose, Claude Code can write enough to + # stderr to fill the OS pipe buffer; if we only read stdout, the CLI blocks + # on its stderr write while we block reading stdout — a deadlock. A + # background task keeps stderr flowing so stdout never stalls. + async def _drain_stderr() -> None: + assert proc.stderr is not None + async for _ in proc.stderr: + pass + + stderr_task = asyncio.create_task(_drain_stderr()) + + try: + buffer = "" + async for chunk in proc.stdout: + buffer += chunk.decode("utf-8", errors="replace") + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + line = line.strip() + if line: + yield line + + if buffer.strip(): + yield buffer.strip() + + await proc.wait() + finally: + # Release the subprocess and stderr drain task even if the consumer + # abandons the generator early (task cancellation / client disconnect): + # cancel the drain task and terminate+reap the process if it is still + # running, so neither is leaked. + stderr_task.cancel() + try: + await stderr_task + except asyncio.CancelledError: + pass + if proc.returncode is None: + try: + proc.terminate() + except ProcessLookupError: + pass + await proc.wait() + + +@activity.defn(name=RUN_CLAUDE_CODE_TURN_ACTIVITY) +async def run_claude_code_turn(params: RunClaudeCodeTurnParams) -> dict[str, Any]: + """Run one Claude Code turn end-to-end and stream events to the task. + + Runs in an activity (real asyncio loop) so subprocess I/O is permitted. 
+ """ + emitter = UnifiedEmitter( + task_id=params.task_id, + trace_id=params.trace_id, + parent_span_id=params.parent_span_id, + ) + turn = ClaudeCodeTurn(_spawn_claude(params.prompt, session_id=params.session_id)) + result = await emitter.auto_send_turn(turn, created_at=params.created_at) + + return RunClaudeCodeTurnResult(final_text=result.final_text, session_id=turn.session_id).model_dump() diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/project/run_worker.py b/examples/tutorials/10_async/10_temporal/140_claude_code/project/run_worker.py new file mode 100644 index 000000000..58802737e --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/project/run_worker.py @@ -0,0 +1,41 @@ +"""Temporal worker for the Claude Code tutorial. + +Run as a separate long-lived process alongside the ACP HTTP server. The +worker polls Temporal for workflow + activity tasks and executes them. + +The Claude Code CLI subprocess runs in the ``run_claude_code_turn`` activity +(registered below alongside the built-in Agentex activities), because +subprocess I/O is not permitted on the Temporal workflow event loop. 
+""" + +import asyncio + +from project.workflow import At140ClaudeCodeWorkflow +from project.activities import run_claude_code_turn +from agentex.lib.utils.debug import setup_debug_if_enabled +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker + +environment_variables = EnvironmentVariables.refresh() +logger = make_logger(__name__) + + +async def main(): + setup_debug_if_enabled() + + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + worker = AgentexWorker(task_queue=task_queue_name) + + await worker.run( + activities=[run_claude_code_turn, *get_all_activities()], + workflow=At140ClaudeCodeWorkflow, + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py b/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py new file mode 100644 index 000000000..7f50ba8d5 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py @@ -0,0 +1,137 @@ +"""Temporal workflow for the Claude Code tutorial. + +Holds conversation state (session_id for multi-turn resume) durably across +crashes. Each user message triggers ``on_task_event_send``, which delegates the +turn to the ``run_claude_code_turn`` activity. The activity spawns the Claude +Code CLI, wraps its stdout in ``ClaudeCodeTurn``, and delivers the turn via +``UnifiedEmitter.auto_send_turn`` (the async Redis push path). + +Note on subprocess inside Temporal +------------------------------------ +Subprocess (and all other) I/O must run in a Temporal *activity*, never in +workflow code. 
Temporal runs workflow + signal-handler bodies on a +deterministic sandbox event loop that does not implement ``subprocess_exec`` +(spawning the CLI there raises ``NotImplementedError``). The activity also gets +Temporal's retry + timeout guarantees. See +``examples/tutorials/10_async/10_temporal/030_custom_activities/`` for the +activity pattern. +""" + +from __future__ import annotations + +import os +import json +from datetime import timedelta + +from temporalio import workflow + +from agentex.lib import adk +from agentex.lib.types.acp import SendEventParams, CreateTaskParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.types.workflow import SignalName +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +with workflow.unsafe.imports_passed_through(): + from project.activities import RunClaudeCodeTurnParams, run_claude_code_turn + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class At140ClaudeCodeWorkflow(BaseWorkflow): + """Temporal workflow that runs Claude Code locally for each user message. 
+ + Persists the Claude Code session_id across turns so the CLI can resume + the conversation (``-r SESSION_ID``). Temporal's durable state ensures + the session_id survives worker crashes. + """ + + def __init__(self): + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._turn_number = 0 + # Claude Code session_id for multi-turn resume. + self._session_id: str | None = None + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + """Handle a user message: spawn Claude Code and push events to the task stream.""" + self._turn_number += 1 + task_id = params.task.id + prompt = params.event.content.content + logger.info("Turn %d for task %s", self._turn_number, task_id) + + await adk.messages.create(task_id=task_id, content=params.event.content) + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name=f"Turn {self._turn_number}", + input={"message": prompt}, + ) as span: + # Delegate the subprocess turn to an activity: subprocess I/O is not + # permitted on the Temporal workflow event loop. The activity streams + # events to the task and returns the final text + session_id. + # workflow.now() gives a deterministic timestamp under replay. + result = await workflow.execute_activity( + run_claude_code_turn, + RunClaudeCodeTurnParams( + task_id=task_id, + prompt=prompt, + trace_id=task_id, + parent_span_id=span.id if span else None, + session_id=self._session_id, + created_at=workflow.now(), + ), + start_to_close_timeout=timedelta(minutes=5), + ) + + # Capture session_id to enable Claude Code resume on the next turn. 
+ sid = result.get("session_id") + if sid: + self._session_id = sid + + if span: + span.output = {"final_text": result.get("final_text")} + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> str: + logger.info("Task created: %s", params.task.id) + + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n" + "Send me a message and I'll run it through Claude Code locally." + ), + ), + ) + + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + return "Task completed" + + @workflow.signal + async def complete_task_signal(self) -> None: + logger.info("Received complete_task signal") + self._complete_task = True diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/pyproject.toml b/examples/tutorials/10_async/10_temporal/140_claude_code/pyproject.toml new file mode 100644 index 000000000..b9d517267 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/pyproject.toml @@ -0,0 +1,27 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "at140-claude-code" +version = "0.1.0" +description = "A Temporal-backed Claude Code agent streaming the unified harness surface via a local CLI subprocess" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "temporalio>=1.18.2", + "python-dotenv>=1.0,<2", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/140_claude_code/tests/test_agent.py new file mode 100644 index 000000000..767c707b9 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/tests/test_agent.py @@ -0,0 
+1,249 @@ +"""Tests for the Temporal Claude Code tutorial agent. + +LIVE tests (``TestClaudeCodeLive``): + - Require Temporal server, the ACP server, the Temporal worker, the ``claude`` + CLI on PATH, and ``ANTHROPIC_API_KEY`` set. + - Run the full agent end-to-end against a live Agentex server. + - Skipped automatically when ``CLAUDE_LIVE_TESTS`` is not set to ``1``. + +OFFLINE unit tests (``TestClaudeCodeOffline``): + - Inject a fake async iterator of pre-recorded stream-json lines. + - Assert the ``ClaudeCodeTurn`` + ``UnifiedEmitter`` pipeline drives + ``auto_send_turn``, populates usage, and satisfies the ``HarnessTurn`` + protocol. + - Always run -- no CLI or API key needed. +""" + +from __future__ import annotations + +import os +import json +from typing import AsyncIterator + +import pytest + +from agentex.types.task_message import TaskMessage + +# --------------------------------------------------------------------------- +# Recorded stream-json fixtures +# --------------------------------------------------------------------------- + +_TEXT_ONLY_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-temporal-offline-1"}), + json.dumps( + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hello from Temporal Claude Code!"}]}, + } + ), + json.dumps( + { + "type": "result", + "session_id": "sess-temporal-offline-1", + "usage": {"input_tokens": 15, "output_tokens": 7}, + "cost_usd": 0.00015, + "duration_ms": 350, + "num_turns": 1, + } + ), +] + + +async def _fake_lines(lines: list[str]) -> AsyncIterator[str]: + """Async iterator of pre-recorded stream-json lines (no subprocess).""" + for line in lines: + yield line + + +# --------------------------------------------------------------------------- +# Fake streaming backend +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, sink, content_type, initial_content): + self.sink = sink + 
self.content_type = content_type + self.task_message = TaskMessage(id="msg-t1", task_id="task-temporal-offline", content=initial_content) + + async def __aenter__(self): + self.sink.append(("open", self.content_type)) + return self + + async def __aexit__(self, *a): + await self.close() + return False + + async def close(self): + self.sink.append(("close", self.content_type)) + + async def stream_update(self, update): + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + def __init__(self): + self.sink: list = [] + + def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None): # noqa: ARG002 + ctype = getattr(initial_content, "type", None) + self.sink.append(("ctx", ctype)) + return _FakeCtx(self.sink, ctype, initial_content) + + +# --------------------------------------------------------------------------- +# Offline tests (always run -- no CLI or API key needed) +# --------------------------------------------------------------------------- + + +class TestClaudeCodeOffline: + """Unit tests that run without a real claude CLI, Temporal, or network.""" + + @pytest.mark.asyncio + async def test_auto_send_text_only_produces_output(self): + """auto_send_turn result carries the agent's reply text.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + + fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter( + task_id="offline-temporal", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + assert "Hello from Temporal Claude Code" in result.final_text + + @pytest.mark.asyncio + async def test_usage_populated_after_stream_exhausted(self): + """Usage is populated after the events stream is exhausted.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + + 
fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter( + task_id="t", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + await emitter.auto_send_turn(turn) + usage = turn.usage() + assert usage.input_tokens == 15 + assert usage.output_tokens == 7 + assert usage.num_llm_calls == 1 + + @pytest.mark.asyncio + async def test_stream_task_message_done_present(self): + """StreamTaskMessageDone must appear via yield_turn on a ClaudeCodeTurn.""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + from agentex.types.task_message_update import StreamTaskMessageDone + + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + events = [e async for e in emitter.yield_turn(turn)] + assert any(isinstance(e, StreamTaskMessageDone) for e in events), ( + "Expected at least one StreamTaskMessageDone event" + ) + + @pytest.mark.asyncio + async def test_session_id_captured_in_result_envelope(self): + """The result envelope carries session_id (multi-turn resume support).""" + from agentex.lib.adk import ClaudeCodeTurn + from agentex.lib.core.harness import UnifiedEmitter + + fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_fake_lines(_TEXT_ONLY_LINES)) + emitter = UnifiedEmitter( + task_id="t", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + await emitter.auto_send_turn(turn) + assert turn._result_envelope is not None + assert turn._result_envelope.get("session_id") == "sess-temporal-offline-1" + + +# --------------------------------------------------------------------------- +# Live tests (skipped unless CLAUDE_LIVE_TESTS=1) +# --------------------------------------------------------------------------- + +pytestmark_live = pytest.mark.skipif( + not os.environ.get("CLAUDE_LIVE_TESTS"), + reason="Set 
CLAUDE_LIVE_TESTS=1 and ensure the `claude` CLI + ANTHROPIC_API_KEY are available", +) + +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "at140-claude-code") + + +@pytestmark_live +class TestClaudeCodeLive: + """Live Temporal tests -- needs Temporal server + the claude CLI + ANTHROPIC_API_KEY.""" + + @pytest.fixture + def client(self): + from agentex import Agentex + + return Agentex(base_url=AGENTEX_API_BASE_URL) + + @pytest.fixture + def agent_name(self): + return AGENT_NAME + + @pytest.fixture + def agent_id(self, client, agent_name): + agents = client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent {agent_name!r} not found.") + + def test_send_simple_message(self, client, agent_id: str): + """Create a task, send a message, and poll until a response appears.""" + import time + import uuid + + from agentex.types import TextContentParam + from agentex.types.agent_rpc_params import ParamsSendEventRequest, ParamsCreateTaskRequest + + task = client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)).result + assert task is not None + task_id = task.id + + client.agents.send_event( + agent_id=agent_id, + params=ParamsSendEventRequest( + task_id=task_id, + content=TextContentParam( + author="user", + content="Reply with exactly three words: hello from claude", + type="text", + ), + ), + ) + + deadline = time.monotonic() + 90 + while time.monotonic() < deadline: + msgs = client.messages.list(task_id=task_id) + agent_msgs = [m for m in msgs if getattr(m.content, "author", None) == "agent"] + response_msgs = [m for m in agent_msgs if "Task initialized" not in str(getattr(m.content, "content", ""))] + if response_msgs: + assert len(response_msgs) >= 1 + return + time.sleep(3) + + raise AssertionError("No agent response received within 90 s") + + +if __name__ == "__main__": + 
pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/tests/test_agent_offline.py b/examples/tutorials/10_async/10_temporal/140_claude_code/tests/test_agent_offline.py new file mode 100644 index 000000000..1adc553f1 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/tests/test_agent_offline.py @@ -0,0 +1,230 @@ +"""Offline unit tests for the Temporal Claude Code tutorial agent. + +These tests do NOT require the ``claude`` CLI, Temporal, or ANTHROPIC_API_KEY. +They inject a fake async iterator of pre-recorded stream-json lines in place of +the real subprocess spawn and a fake streaming backend, then assert that the +activity's turn logic correctly drives ``UnifiedEmitter.auto_send_turn``. + +The injection seam is the ``_spawn_claude`` function in ``project/activities.py``. +Tests replace it with an async generator that yields pre-recorded lines. +""" + +from __future__ import annotations + +import json +from typing import AsyncIterator + +import pytest + +from agentex.lib.adk import ClaudeCodeTurn +from agentex.lib.core.harness import UnifiedEmitter +from agentex.types.task_message import TaskMessage + +# --------------------------------------------------------------------------- +# Recorded fixtures +# --------------------------------------------------------------------------- + +_TEXT_ONLY_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-temporal-1"}), + json.dumps( + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hello from Temporal Claude Code!"}]}, + } + ), + json.dumps( + { + "type": "result", + "session_id": "sess-temporal-1", + "usage": {"input_tokens": 15, "output_tokens": 7}, + "cost_usd": 0.00015, + "duration_ms": 350, + "num_turns": 1, + } + ), +] + +_TOOL_CALL_LINES: list[str] = [ + json.dumps({"type": "system", "subtype": "init", "session_id": "sess-temporal-2"}), + json.dumps( + { + "type": "assistant", + 
"message": { + "content": [ + { + "type": "tool_use", + "id": "tool_temporal", + "name": "Bash", + "input": {"command": "ls /tmp"}, + } + ] + }, + } + ), + json.dumps( + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "tool_temporal", + "content": "file1\nfile2\n", + "is_error": False, + } + ] + }, + } + ), + json.dumps( + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Listed files."}]}, + } + ), + json.dumps( + { + "type": "result", + "session_id": "sess-temporal-2", + "usage": {"input_tokens": 30, "output_tokens": 12}, + "cost_usd": 0.0004, + "duration_ms": 600, + "num_turns": 1, + } + ), +] + + +# --------------------------------------------------------------------------- +# Fake streaming backend +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, sink, content_type, initial_content): + self.sink = sink + self.content_type = content_type + self.task_message = TaskMessage(id="msg-t1", task_id="task-temporal-offline", content=initial_content) + + async def __aenter__(self): + self.sink.append(("open", self.content_type)) + return self + + async def __aexit__(self, *a): + await self.close() + return False + + async def close(self): + self.sink.append(("close", self.content_type)) + + async def stream_update(self, update): + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + def __init__(self): + self.sink: list = [] + + def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None): # noqa: ARG002 + ctype = getattr(initial_content, "type", None) + self.sink.append(("ctx", ctype)) + return _FakeCtx(self.sink, ctype, initial_content) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _fake_lines(lines: 
list[str]) -> AsyncIterator[str]: + for line in lines: + yield line + + +async def _run_turn(lines: list[str]): + fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_fake_lines(lines)) + emitter = UnifiedEmitter( + task_id="offline-temporal", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + return result, fake_streaming.sink, turn + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_text_only_produces_agent_output(): + result, sink, _ = await _run_turn(_TEXT_ONLY_LINES) + assert "Hello from Temporal Claude Code" in result.final_text + + +@pytest.mark.asyncio +async def test_usage_from_result_envelope(): + """Usage is available from turn.usage() after the events are exhausted. + + UnifiedEmitter.auto_send_turn evaluates turn.usage() eagerly before the + async generator is consumed, so result.usage is a pre-exhaust snapshot. + Read usage directly from the turn after _run_turn completes instead. 
+ """ + result, _, turn = await _run_turn(_TEXT_ONLY_LINES) + usage = turn.usage() + assert usage.input_tokens == 15 + assert usage.output_tokens == 7 + assert usage.num_llm_calls == 1 + + +@pytest.mark.asyncio +async def test_session_id_captured_in_result_envelope(): + """Verify the result envelope carries session_id (multi-turn resume support).""" + _, _, turn = await _run_turn(_TEXT_ONLY_LINES) + assert turn._result_envelope is not None + assert turn._result_envelope.get("session_id") == "sess-temporal-1" + + +@pytest.mark.asyncio +async def test_tool_call_context_types(): + result, sink, _ = await _run_turn(_TOOL_CALL_LINES) + opened = [s for s in sink if s[0] == "open"] + content_types = [s[1] for s in opened] + assert "tool_request" in content_types + assert "text" in content_types + + +@pytest.mark.asyncio +async def test_spawn_seam_concept(): + """Demonstrate the injectable spawn seam pattern used in project/activities.py. + + ``_spawn_claude(prompt, session_id=None)`` is a top-level async generator. + A drop-in replacement (e.g. via monkeypatch) supplies pre-recorded lines + and captures call arguments. The session_id parameter enables multi-turn + resume (``claude -r SESSION_ID``). 
+ """ + called: list[tuple] = [] + + async def _fake_spawn(prompt: str, session_id=None) -> AsyncIterator[str]: + called.append((prompt, session_id)) + for line in _TEXT_ONLY_LINES: + yield line + + fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_fake_spawn("temporal prompt", session_id="old-sid")) + emitter = UnifiedEmitter( + task_id="t", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + + assert called == [("temporal prompt", "old-sid")] + assert "Hello from Temporal Claude Code" in result.final_text diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py index a08131260..c2b343b72 100644 --- a/src/agentex/lib/adk/__init__.py +++ b/src/agentex/lib/adk/__init__.py @@ -13,6 +13,11 @@ from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events from agentex.lib.adk._modules._pydantic_ai_tracing import create_pydantic_ai_tracing_handler +from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events +from agentex.lib.adk._modules._claude_code_turn import ( + ClaudeCodeTurn, + claude_code_usage_to_turn_usage, +) from agentex.lib.adk._modules.events import EventsModule from agentex.lib.adk._modules.messages import MessagesModule from agentex.lib.adk._modules.state import StateModule @@ -54,6 +59,10 @@ "stream_pydantic_ai_events", "convert_pydantic_ai_to_agentex_events", "create_pydantic_ai_tracing_handler", + # Claude Code + "convert_claude_code_to_agentex_events", + "ClaudeCodeTurn", + "claude_code_usage_to_turn_usage", # Providers "providers", # Utils diff --git a/src/agentex/lib/adk/_modules/_claude_code_sync.py b/src/agentex/lib/adk/_modules/_claude_code_sync.py new file mode 100644 index 000000000..4e25503cf --- /dev/null +++ b/src/agentex/lib/adk/_modules/_claude_code_sync.py @@ -0,0 +1,378 @@ 
+"""Claude Code stream-json parser tap for the unified harness surface. + +Converts the newline-delimited JSON envelopes emitted by +``claude -p --output-format stream-json`` into the canonical +``StreamTaskMessage*`` stream consumed by the Agentex harness. + +Envelope → canonical mapping +----------------------------- +system/init + Ignored at this layer (session_id tracking is a provider concern). + +assistant / user (content blocks) + text block → Start(TextContent) + Delta(TextDelta)* + Done + thinking block → Start(ReasoningContent) + Delta(ReasoningContentDelta)* + Done + tool_use block → Start(ToolRequestContent) + Done (Full args in Start content) + tool_result block → Full(ToolResponseContent) + +stream_event / content_block_start + type=text → Start(TextContent, empty) + type=thinking → Start(ReasoningContent, empty) + +stream_event / content_block_delta + type=text_delta → Delta(TextDelta) + type=thinking_delta → Delta(ReasoningContentDelta) + +stream_event / content_block_stop + (text open) → Done + (thinking open) → Done (no Full event is emitted here) + +result + Fires ``on_result`` with the raw envelope so the caller can capture + usage and cost. No StreamTaskMessage is emitted for the result itself. + +Out of scope +------------ +No deployable test agent is provided. claude-code requires the golden +agent's sandbox/subprocess/secret/MCP orchestration to produce the stream. +Live coverage is the golden agent, which will adopt this tap. Do NOT add an +examples/ agent or CI live-matrix row for claude-code. 
+""" + +from __future__ import annotations + +import json +from typing import Any, Callable, Awaitable, AsyncIterator + +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta + +logger = make_logger(__name__) + +_MAX_RESULT_LENGTH = 4000 + + +def _truncate(text: str) -> str: + return str(text)[:_MAX_RESULT_LENGTH] + + +def _extract_summary(text: str, max_len: int = 300) -> str: + return text.strip().split("\n", 1)[0][:max_len] + + +async def convert_claude_code_to_agentex_events( + lines: AsyncIterator[str | dict[str, Any]], + on_result: Callable[[dict[str, Any]], Awaitable[None]] | None = None, +) -> AsyncIterator[StreamTaskMessageStart | StreamTaskMessageDelta | StreamTaskMessageFull | StreamTaskMessageDone]: + """Convert a claude-code ``stream-json`` line stream into Agentex ``StreamTaskMessage*`` events. + + Each item in ``lines`` is either a raw JSON string (as read from the CLI's + stdout) or an already-parsed dict. Empty strings are skipped; unparseable + JSON is logged and skipped. + + ``on_result`` is called with the ``result`` envelope when it arrives so the + caller can capture usage and cost. It is awaited before the generator + continues. When ``None``, the result envelope is silently dropped. + + Envelope → canonical mapping is documented in this module's docstring. + """ + next_index = 0 + tool_call_count = 0 + + # Streaming state for content_block_start / content_block_delta / + # content_block_stop triples. 
+ _thinking_open = False + _thinking_buf = "" + _thinking_index: int | None = None + _text_open = False + _text_buf = "" + _text_index: int | None = None + # Track which assistant-message block indices were already streamed via + # stream_event triples. Those blocks must not be re-emitted when the full + # assistant message arrives. Reset at each message boundary (see below) so a + # later turn's block indices don't collide with an earlier turn's. + _streamed_block_indexes: set[int] = set() + # Once-guard so a thinking block's pending index is claimed on its first + # thinking_delta only. Reset per turn alongside _streamed_block_indexes. + _saw_thinking_stream = False + # For deferred ReasoningStarted: if a content_block_start(thinking) arrives + # but no thinking_delta ever follows, the final assistant block's thinking + # field fills the reasoning content instead. + _pending_thinking_block_index: int | None = None + + async for raw in lines: + if not raw: + continue + + if isinstance(raw, dict): + evt = raw + else: + line = raw.strip() + if not line: + continue + try: + evt = json.loads(line) + except json.JSONDecodeError: + logger.debug("claude-code: skipping non-JSON line: %r", line[:120]) + continue + + evt_type = evt.get("type", "") + + # ----------------------------------------------------------------------- + # assistant / user — materialised content blocks + # ----------------------------------------------------------------------- + if evt_type in ("assistant", "user"): + msg = evt.get("message", {}) + blocks = msg.get("content", []) + if not isinstance(blocks, list): + blocks = [blocks] + + for idx, block in enumerate(blocks): + if not isinstance(block, dict): + continue + block_type = block.get("type", "") + + if block_type == "text": + # Skip only the specific blocks already delivered via + # stream_event deltas (per-block, not a turn-wide latch). 
+ if idx in _streamed_block_indexes: + continue + text = block.get("text", "") + if text: + msg_index = next_index + next_index += 1 + yield StreamTaskMessageStart( + type="start", + index=msg_index, + content=TextContent( + type="text", + author="agent", + content="", + ), + ) + yield StreamTaskMessageDelta( + type="delta", + index=msg_index, + delta=TextDelta(type="text", text_delta=text), + ) + yield StreamTaskMessageDone(type="done", index=msg_index) + + elif block_type == "thinking": + # Skip only the specific blocks already delivered via + # stream_event deltas (per-block, not a turn-wide latch). + if idx in _streamed_block_indexes: + continue + thinking_text = block.get("thinking", "") + if thinking_text: + summary = _extract_summary(thinking_text) + msg_index = next_index + next_index += 1 + yield StreamTaskMessageStart( + type="start", + index=msg_index, + content=ReasoningContent( + type="reasoning", + author="agent", + summary=[summary], + content=[], + style="active", + ), + ) + yield StreamTaskMessageDelta( + type="delta", + index=msg_index, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=0, + content_delta=thinking_text, + ), + ) + yield StreamTaskMessageDone(type="done", index=msg_index) + + elif block_type == "tool_use": + tool_call_count += 1 + tool_id = block.get("id", f"tool_{tool_call_count}") + name = block.get("name", "unknown") + arguments = block.get("input", {}) + if not isinstance(arguments, dict): + arguments = {} + msg_index = next_index + next_index += 1 + yield StreamTaskMessageStart( + type="start", + index=msg_index, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id=tool_id, + name=name, + arguments=arguments, + ), + ) + yield StreamTaskMessageDone(type="done", index=msg_index) + + elif block_type == "tool_result": + tool_id = block.get("tool_use_id", "") + content = block.get("content", "") + is_error = block.get("is_error", False) + if isinstance(content, list): + 
content = "\n".join(b.get("text", str(b)) if isinstance(b, dict) else str(b) for b in content) + result_str = _truncate(str(content)) + msg_index = next_index + next_index += 1 + yield StreamTaskMessageFull( + type="full", + index=msg_index, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id=tool_id, + name="", + content={"result": result_str, **({"is_error": True} if is_error else {})}, + ), + ) + + # End of a materialised message: reset per-turn streaming dedup state + # so the next turn's stream_event indices start clean. Without this, + # a block index streamed in an earlier turn would linger in the set + # and silently drop a later turn's non-streamed block at that index. + _streamed_block_indexes = set() + _saw_thinking_stream = False + + # ----------------------------------------------------------------------- + # stream_event — incremental streaming deltas + # ----------------------------------------------------------------------- + elif evt_type == "stream_event": + se = evt.get("event") or {} + se_type = se.get("type", "") + block_index = se.get("index") + + if se_type == "content_block_start": + block = se.get("content_block") or {} + btype = block.get("type") + + if btype == "thinking": + _thinking_open = True + _thinking_buf = "" + # Defer marking the block as streamed until we actually + # receive a thinking_delta. Some configurations emit a + # thinking block_start but no deltas — in that case we want + # the final assistant-message handler to fill the text. 
+ _pending_thinking_block_index = block_index if isinstance(block_index, int) else None + msg_index = next_index + next_index += 1 + _thinking_index = msg_index + yield StreamTaskMessageStart( + type="start", + index=msg_index, + content=ReasoningContent( + type="reasoning", + author="agent", + summary=[], + content=[], + style="active", + ), + ) + + elif btype == "text": + _text_open = True + _text_buf = "" + if isinstance(block_index, int): + _streamed_block_indexes.add(block_index) + msg_index = next_index + next_index += 1 + _text_index = msg_index + yield StreamTaskMessageStart( + type="start", + index=msg_index, + content=TextContent( + type="text", + author="agent", + content="", + ), + ) + + elif se_type == "content_block_delta": + delta = se.get("delta") or {} + dtype = delta.get("type") + + if dtype == "thinking_delta": + chunk = delta.get("thinking", "") + if chunk and _thinking_open: + if not _saw_thinking_stream: + _saw_thinking_stream = True + # Now mark the block as claimed so the assistant + # message handler won't re-emit it. 
+ if _pending_thinking_block_index is not None: + _streamed_block_indexes.add(_pending_thinking_block_index) + _thinking_buf += chunk + if _thinking_index is not None: + yield StreamTaskMessageDelta( + type="delta", + index=_thinking_index, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=0, + content_delta=chunk, + ), + ) + + elif dtype == "text_delta": + chunk = delta.get("text", "") + if chunk and _text_open: + _text_buf += chunk + if _text_index is not None: + yield StreamTaskMessageDelta( + type="delta", + index=_text_index, + delta=TextDelta(type="text", text_delta=chunk), + ) + + elif se_type == "content_block_stop": + if _thinking_open: + _thinking_open = False + _thinking_buf = "" + _pending_thinking_block_index = None + # Reset the once-guard per thinking block: a turn can stream a + # second thinking block, and without this the guard stays True, + # the second block's index is never claimed, and the final + # assistant envelope re-emits it (duplicate Start/Delta/Done). + _saw_thinking_stream = False + if _thinking_index is not None: + yield StreamTaskMessageDone(type="done", index=_thinking_index) + _thinking_index = None + elif _text_open: + _text_open = False + _text_buf = "" + if _text_index is not None: + yield StreamTaskMessageDone(type="done", index=_text_index) + _text_index = None + + # ----------------------------------------------------------------------- + # system / init — session metadata (ignored at this layer) + # ----------------------------------------------------------------------- + elif evt_type == "system": + # Session ID tracking and MCP status logging are provider concerns. + # This pure parser layer intentionally emits nothing for system events. 
+ pass + + # ----------------------------------------------------------------------- + # result — carries usage + cost; fired to on_result, not emitted as msgs + # ----------------------------------------------------------------------- + elif evt_type == "result": + if on_result is not None: + await on_result(evt) + + else: + logger.debug("claude-code: unhandled envelope type %r", evt_type) diff --git a/src/agentex/lib/adk/_modules/_claude_code_turn.py b/src/agentex/lib/adk/_modules/_claude_code_turn.py new file mode 100644 index 000000000..6c052976a --- /dev/null +++ b/src/agentex/lib/adk/_modules/_claude_code_turn.py @@ -0,0 +1,161 @@ +"""ClaudeCodeTurn — HarnessTurn implementation for the claude-code tap. + +Wraps ``convert_claude_code_to_agentex_events`` to implement the +``HarnessTurn`` protocol: exposes ``events`` (the canonical +``StreamTaskMessage*`` stream) and ``usage()`` (the normalised +``TurnUsage``, populated after the stream is exhausted). + +Usage normalization +------------------- +Claude Code's ``result`` envelope carries usage under several key shapes +depending on the CLI version. We defensive-map all known shapes: + + result.usage.input_tokens -> input_tokens + result.usage.output_tokens -> output_tokens + result.usage.cache_read_input_tokens + result.usage.cache_creation_input_tokens -> cached_input_tokens (sum) + result.cost_usd / result.total_cost_usd -> cost_usd + result.duration_ms -> duration_ms + result.num_turns -> num_llm_calls + +Real zeros are preserved; missing keys default to ``None`` (not zero) so +downstream consumers can distinguish "not reported" from "zero". + +Out of scope: no deployable test agent is provided — see module docstring +in ``_claude_code_sync.py``. 
def claude_code_usage_to_turn_usage(result_envelope: dict[str, Any]) -> TurnUsage:
    """Map a claude-code ``result`` envelope to a canonical ``TurnUsage``.

    Missing, ``None`` and unparseable values all become ``None``; real zeros
    are preserved, so callers can tell "not reported" from "zero".

    Args:
        result_envelope: The parsed ``result`` envelope. Token counts are read
            from its ``usage`` mapping; ``cost_usd`` / ``total_cost_usd``,
            ``duration_ms`` and ``num_turns`` are read from the top level.

    Returns:
        A ``TurnUsage`` where:

        * ``cached_input_tokens`` is ``cache_read_input_tokens`` plus
          ``cache_creation_input_tokens`` (``None`` only when neither key is
          reported). NOTE(review): per the Anthropic usage schema the first
          counts tokens read from the prompt cache and the second counts
          tokens written to it, so this is "cache-related input tokens",
          not strictly "tokens served from cache" — confirm that downstream
          consumers expect the sum.
        * ``total_tokens`` is ``input_tokens + output_tokens`` and is only
          set when both are reported.
        * ``cost_usd`` prefers the ``cost_usd`` key and falls back to
          ``total_cost_usd``.
        * ``num_llm_calls`` is taken from ``num_turns``.
    """
    usage_raw = result_envelope.get("usage")
    if not isinstance(usage_raw, dict):
        # Absent, null or malformed usage payload: treat as "nothing reported"
        # instead of failing on ``.get`` below.
        usage_raw = {}

    def _int(d: dict[str, Any], key: str) -> int | None:
        v = d.get(key)
        if v is None:
            return None
        try:
            return int(v)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")) — json.loads accepts Infinity.
            return None

    def _float(d: dict[str, Any], *keys: str) -> float | None:
        # First key holding a convertible value wins; later keys are fallbacks.
        for key in keys:
            v = d.get(key)
            if v is not None:
                try:
                    return float(v)
                except (TypeError, ValueError, OverflowError):
                    continue
        return None

    input_tokens = _int(usage_raw, "input_tokens")
    output_tokens = _int(usage_raw, "output_tokens")

    # Aggregate both cache counters; stay None only when neither is reported.
    cache_read = _int(usage_raw, "cache_read_input_tokens")
    cache_creation = _int(usage_raw, "cache_creation_input_tokens")
    cached_input_tokens: int | None = None
    if cache_read is not None or cache_creation is not None:
        cached_input_tokens = (cache_read or 0) + (cache_creation or 0)

    total_tokens: int | None = None
    if input_tokens is not None and output_tokens is not None:
        total_tokens = input_tokens + output_tokens

    return TurnUsage(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        cached_input_tokens=cached_input_tokens,
        total_tokens=total_tokens,
        cost_usd=_float(result_envelope, "cost_usd", "total_cost_usd"),
        duration_ms=_int(result_envelope, "duration_ms"),
        # Provider-reported: None means "not reported", matching the token
        # fields above, so a real zero stays distinguishable.
        num_llm_calls=_int(result_envelope, "num_turns"),
    )


class ClaudeCodeTurn:
    """HarnessTurn for a claude-code ``stream-json`` line stream.

    Satisfies the ``HarnessTurn`` protocol:
    - ``events`` yields the canonical ``StreamTaskMessage*`` stream.
    - ``usage()`` returns the normalised ``TurnUsage`` (only valid after
      ``events`` is fully consumed).

    ``lines`` is an async iterator of raw JSON strings or pre-parsed dicts, as
    produced by reading the claude-code CLI's stdout line by line.
    """

    def __init__(self, lines: AsyncIterator[str | dict[str, Any]]) -> None:
        self._lines = lines
        # Filled in by ``_on_result`` when the converter sees a ``result`` envelope.
        self._result_envelope: dict[str, Any] | None = None
        # Created lazily on first ``events`` access and then reused.
        self._events_stream: AsyncIterator[StreamTaskMessage] | None = None

    async def _on_result(self, envelope: dict[str, Any]) -> None:
        """Remember the ``result`` envelope for ``usage()`` / ``session_id``."""
        self._result_envelope = envelope

    @property
    def events(self) -> AsyncIterator[StreamTaskMessage]:
        """The converted event stream.

        The same iterator object is returned on every access, so ``lines`` is
        wrapped by the converter exactly once.
        """
        if self._events_stream is None:
            self._events_stream = convert_claude_code_to_agentex_events(
                self._lines,
                on_result=self._on_result,
            )
        return self._events_stream

    @property
    def session_id(self) -> str | None:
        """The Claude Code session id, for resuming a multi-turn session.

        Valid only after ``events`` has been fully consumed (populated by the
        ``result`` envelope). Returns ``None`` if the stream was truncated or
        Claude Code reported no session id.
        """
        if self._result_envelope is None:
            return None
        return self._result_envelope.get("session_id")

    def usage(self) -> TurnUsage:
        """Return normalised usage for this turn.

        Call only after ``events`` is exhausted. Returns an empty ``TurnUsage``
        if the ``result`` envelope was not received (e.g. stream was truncated).
        """
        if self._result_envelope is None:
            return TurnUsage()
        return claude_code_usage_to_turn_usage(self._result_envelope)


async def _no_lines() -> AsyncIterator[str]:
    """Empty line stream, used only by the import-time protocol check below."""
    return
    yield  # unreachable; its presence makes this function an async generator


# Import-time check that ClaudeCodeTurn satisfies the HarnessTurn protocol.
# A fully initialised instance is used on purpose: on interpreters where
# runtime-protocol isinstance() probes members with hasattr() (before 3.12),
# the ``events`` getter runs, and on a bare ``__new__`` instance it raises
# AttributeError, which makes the check fail spuriously.
# NOTE(review): assumes ``HarnessTurn`` is ``@runtime_checkable`` — confirm.
assert isinstance(ClaudeCodeTurn(_no_lines()), HarnessTurn), (
    "ClaudeCodeTurn must satisfy the HarnessTurn protocol"
)
+ num_llm_calls: int | None = None num_tool_calls: int = 0 num_reasoning_blocks: int = 0 diff --git a/tests/lib/adk/test_claude_code_sync.py b/tests/lib/adk/test_claude_code_sync.py new file mode 100644 index 000000000..6dd36d973 --- /dev/null +++ b/tests/lib/adk/test_claude_code_sync.py @@ -0,0 +1,637 @@ +"""Tests for the claude-code stream-json -> Agentex StreamTaskMessage* converter.""" + +from __future__ import annotations + +from typing import Any, AsyncIterator + +import pytest + +from agentex.types.text_content import TextContent +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events + + +async def _aiter(events: list[Any]) -> AsyncIterator[Any]: + for e in events: + yield e + + +async def _collect(stream: AsyncIterator[Any]) -> list[Any]: + return [e async for e in stream] + + +# --------------------------------------------------------------------------- +# Text content +# --------------------------------------------------------------------------- + + +class TestTextContent: + async def test_text_block_in_assistant_message_emits_start_delta_done(self): + envelopes = [ + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hello world"}]}, + } + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + + assert len(out) == 3 + assert isinstance(out[0], StreamTaskMessageStart) + assert isinstance(out[0].content, TextContent) + assert out[0].content.content == "" + assert isinstance(out[1], 
StreamTaskMessageDelta) + assert isinstance(out[1].delta, TextDelta) + assert out[1].delta.text_delta == "Hello world" + assert isinstance(out[2], StreamTaskMessageDone) + assert out[0].index == out[1].index == out[2].index + + async def test_empty_text_block_is_skipped(self): + envelopes = [ + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": ""}]}, + } + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + assert out == [] + + async def test_streamed_text_via_stream_event_emits_start_deltas_done(self): + envelopes = [ + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}}, + }, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "text_delta", "text": "Hello"}, + }, + }, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "text_delta", "text": " world"}, + }, + }, + { + "type": "stream_event", + "event": {"type": "content_block_stop", "index": 0}, + }, + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + + assert len(starts) == 1 + assert isinstance(starts[0].content, TextContent) + assert len(deltas) == 2 + assert isinstance(deltas[0].delta, TextDelta) + assert deltas[0].delta.text_delta == "Hello" + assert isinstance(deltas[1].delta, TextDelta) + assert deltas[1].delta.text_delta == " world" + assert len(dones) == 1 + + async def test_streamed_text_not_re_emitted_by_assistant_block(self): + """After stream_event triple, the final assistant block must not re-emit the text.""" + envelopes = [ + { + "type": "stream_event", + "event": { + "type": "content_block_start", + "index": 0, + 
"content_block": {"type": "text"}, + }, + }, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "text_delta", "text": "streamed"}, + }, + }, + { + "type": "stream_event", + "event": {"type": "content_block_stop", "index": 0}, + }, + # Final assistant message with same text — must NOT be re-emitted + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "streamed"}]}, + }, + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + text_starts = [e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, TextContent)] + assert len(text_starts) == 1, "Text block must not be emitted twice" + + async def test_later_turn_non_streamed_text_not_dropped(self): + """A non-streamed text block in a later turn must not be dropped because an + earlier turn streamed a block at the same index.""" + envelopes = [ + # Turn 1: streamed text at index 0 (dedup'd against the materialised msg). + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}}, + }, + { + "type": "stream_event", + "event": {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "first"}}, + }, + {"type": "stream_event", "event": {"type": "content_block_stop", "index": 0}}, + {"type": "assistant", "message": {"content": [{"type": "text", "text": "first"}]}}, + # Turn 2: a NON-streamed text block, also at index 0. 
class TestThinkingContent:
    """Thinking blocks map to reasoning messages, streamed or materialised."""

    async def test_thinking_block_emits_reasoning_start_delta_done(self):
        """A non-streamed thinking block yields one Start/Delta/Done triple."""
        envelopes = [
            {
                "type": "assistant",
                "message": {"content": [{"type": "thinking", "thinking": "Let me reason..."}]},
            }
        ]
        out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes)))

        assert len(out) == 3
        assert isinstance(out[0], StreamTaskMessageStart)
        assert isinstance(out[0].content, ReasoningContent)
        # Summary must be populated from the thinking text
        assert out[0].content.summary == ["Let me reason..."]
        assert isinstance(out[1], StreamTaskMessageDelta)
        assert isinstance(out[1].delta, ReasoningContentDelta)
        assert out[1].delta.content_delta == "Let me reason..."
        assert out[1].delta.content_index == 0
        assert isinstance(out[2], StreamTaskMessageDone)

    async def test_empty_thinking_block_is_skipped(self):
        """A thinking block with empty text emits nothing."""
        envelopes = [
            {
                "type": "assistant",
                "message": {"content": [{"type": "thinking", "thinking": ""}]},
            }
        ]
        out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes)))
        assert out == []

    async def test_streamed_thinking_emits_reasoning_start_deltas_done(self):
        """stream_event start/delta/delta/stop yields one Start, two Deltas, one Done."""
        envelopes = [
            {
                "type": "stream_event",
                "event": {
                    "type": "content_block_start",
                    "index": 0,
                    "content_block": {"type": "thinking"},
                },
            },
            {
                "type": "stream_event",
                "event": {
                    "type": "content_block_delta",
                    "index": 0,
                    "delta": {"type": "thinking_delta", "thinking": "step one"},
                },
            },
            {
                "type": "stream_event",
                "event": {
                    "type": "content_block_delta",
                    "index": 0,
                    "delta": {"type": "thinking_delta", "thinking": " step two"},
                },
            },
            {
                "type": "stream_event",
                "event": {"type": "content_block_stop", "index": 0},
            },
        ]
        out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes)))

        starts = [e for e in out if isinstance(e, StreamTaskMessageStart)]
        deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)]
        dones = [e for e in out if isinstance(e, StreamTaskMessageDone)]

        assert len(starts) == 1
        assert isinstance(starts[0].content, ReasoningContent)
        assert len(deltas) == 2
        assert isinstance(deltas[0].delta, ReasoningContentDelta)
        assert deltas[0].delta.content_delta == "step one"
        assert isinstance(deltas[1].delta, ReasoningContentDelta)
        assert deltas[1].delta.content_delta == " step two"
        assert len(dones) == 1

    async def test_two_streamed_thinking_blocks_not_re_emitted(self):
        """A turn that streams two thinking blocks must claim both indices, so the
        final assistant envelope does not re-emit the second one."""

        def _thinking_block(idx: int, text: str) -> list:
            return [
                {
                    "type": "stream_event",
                    "event": {"type": "content_block_start", "index": idx, "content_block": {"type": "thinking"}},
                },
                {
                    "type": "stream_event",
                    "event": {
                        "type": "content_block_delta",
                        "index": idx,
                        "delta": {"type": "thinking_delta", "thinking": text},
                    },
                },
                {"type": "stream_event", "event": {"type": "content_block_stop", "index": idx}},
            ]

        envelopes = [
            *_thinking_block(0, "first thought"),
            *_thinking_block(1, "second thought"),
            # Final assistant envelope repeats both thinking blocks — neither should re-emit.
            {
                "type": "assistant",
                "message": {
                    "content": [
                        {"type": "thinking", "thinking": "first thought"},
                        {"type": "thinking", "thinking": "second thought"},
                    ]
                },
            },
        ]
        out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes)))
        reasoning_starts = [
            e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ReasoningContent)
        ]
        assert len(reasoning_starts) == 2, "each streamed thinking block emitted exactly once (no duplicate)"

    async def test_thinking_block_start_with_no_deltas_allows_assistant_to_fill(self):
        """A thinking block_start without any deltas leaves the final assistant block
        free to emit the thinking text (the block index is not claimed as streamed)."""
        envelopes = [
            {
                "type": "stream_event",
                "event": {
                    "type": "content_block_start",
                    "index": 0,
                    "content_block": {"type": "thinking"},
                },
            },
            # No thinking_delta — close block immediately
            {
                "type": "stream_event",
                "event": {"type": "content_block_stop", "index": 0},
            },
            # Final assistant message has the thinking text
            {
                "type": "assistant",
                "message": {"content": [{"type": "thinking", "thinking": "delayed thinking"}]},
            },
        ]
        out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes)))
        # The assistant block should produce a full thinking message (Start+Delta+Done)
        reasoning_starts = [
            e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ReasoningContent)
        ]
        # The empty start opened by the stream_event, plus the one from the
        # assistant block. Previously this list was built but never checked.
        assert len(reasoning_starts) == 2, "expected the streamed (empty) start plus the assistant-filled start"
        reasoning_deltas = [
            e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningContentDelta)
        ]
        assert len(reasoning_deltas) >= 1
        assert any(
            isinstance(d.delta, ReasoningContentDelta) and d.delta.content_delta == "delayed thinking"
            for d in reasoning_deltas
        )
"tool_result", + "tool_use_id": "tid", + "content": [ + {"type": "text", "text": "line1"}, + {"type": "text", "text": "line2"}, + ], + } + ] + }, + } + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + assert isinstance(out[0], StreamTaskMessageFull) + assert isinstance(out[0].content, ToolResponseContent) + payload = str(out[0].content.content) + assert "line1" in payload + assert "line2" in payload + + async def test_tool_result_error_flag_passed_through(self): + envelopes = [ + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "err_call", + "content": "Permission denied", + "is_error": True, + } + ] + }, + } + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + assert isinstance(out[0], StreamTaskMessageFull) + assert isinstance(out[0].content, ToolResponseContent) + assert isinstance(out[0].content.content, dict) + assert out[0].content.content.get("is_error") is True + + async def test_tool_result_truncation(self): + long_result = "x" * 5000 + envelopes = [ + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "t", + "content": long_result, + } + ] + }, + } + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + result_str = out[0].content.content.get("result", "") + assert len(result_str) <= 4000 + + +# --------------------------------------------------------------------------- +# on_result callback +# --------------------------------------------------------------------------- + + +class TestOnResult: + async def test_on_result_called_with_result_envelope(self): + captured: list[dict] = [] + + async def capture(envelope): + captured.append(envelope) + + envelopes = [ + { + "type": "result", + "session_id": "sess123", + "cost_usd": 0.012, + "usage": {"input_tokens": 100, "output_tokens": 50}, + } + ] + out = await 
_collect(convert_claude_code_to_agentex_events(_aiter(envelopes), on_result=capture)) + + # result envelope does not emit any StreamTaskMessage + assert out == [] + assert len(captured) == 1 + assert captured[0]["session_id"] == "sess123" + assert captured[0]["cost_usd"] == pytest.approx(0.012) + + async def test_on_result_not_called_when_no_result_envelope(self): + captured: list[dict] = [] + + async def capture(envelope): + captured.append(envelope) + + envelopes = [ + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hi"}]}, + } + ] + await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes), on_result=capture)) + assert captured == [] + + async def test_no_on_result_does_not_raise(self): + envelopes = [ + { + "type": "result", + "cost_usd": 0.001, + "usage": {"input_tokens": 10, "output_tokens": 5}, + } + ] + # Should not raise even without a callback + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + assert out == [] + + +# --------------------------------------------------------------------------- +# Message indexing +# --------------------------------------------------------------------------- + + +class TestMessageIndexing: + async def test_multiple_blocks_get_distinct_indices(self): + envelopes = [ + { + "type": "assistant", + "message": { + "content": [ + {"type": "text", "text": "First"}, + { + "type": "tool_use", + "id": "c1", + "name": "Read", + "input": {"path": "/tmp"}, + }, + ] + }, + }, + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "c1", + "content": "some content", + } + ] + }, + }, + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Done"}]}, + }, + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + + # Gather all Start/Full events and check indices are monotonically increasing + anchors = [e for e in out if isinstance(e, (StreamTaskMessageStart, 
class TestContentAuthors:
    """Every content payload emitted by the converter is authored by ``agent``."""

    @pytest.mark.parametrize(
        "envelope",
        [
            {
                "type": "assistant",
                "message": {"content": [{"type": "text", "text": "hi"}]},
            },
            {
                "type": "assistant",
                "message": {"content": [{"type": "thinking", "thinking": "thoughts"}]},
            },
            {
                "type": "assistant",
                "message": {
                    "content": [
                        {
                            "type": "tool_use",
                            "id": "c",
                            "name": "t",
                            "input": {},
                        }
                    ]
                },
            },
            {
                "type": "user",
                "message": {
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": "c",
                            "content": "ok",
                        }
                    ]
                },
            },
        ],
    )
    async def test_all_content_authored_by_agent(self, envelope: dict):
        """Each envelope kind must emit at least one payload, all with author ``agent``."""
        out = await _collect(convert_claude_code_to_agentex_events(_aiter([envelope])))
        authors = [
            content.author
            for content in (getattr(e, "content", None) for e in out)
            if content is not None and hasattr(content, "author")
        ]
        # Guard against a vacuous pass: with no emitted content the per-item
        # check below would succeed without verifying anything.
        assert authors, "converter emitted no content carrying an author"
        assert all(author == "agent" for author in authors)
usage.num_llm_calls == 3 + + def test_total_cost_usd_fallback(self): + """total_cost_usd should be used when cost_usd is absent.""" + result = { + "usage": {"input_tokens": 10, "output_tokens": 5}, + "total_cost_usd": 0.001, + } + usage = claude_code_usage_to_turn_usage(result) + assert usage.cost_usd == pytest.approx(0.001) + + def test_cost_usd_takes_precedence_over_total_cost_usd(self): + result = { + "usage": {"input_tokens": 10, "output_tokens": 5}, + "cost_usd": 0.002, + "total_cost_usd": 0.999, + } + usage = claude_code_usage_to_turn_usage(result) + assert usage.cost_usd == pytest.approx(0.002) + + def test_missing_usage_key_returns_nones(self): + result: dict[str, Any] = {} + usage = claude_code_usage_to_turn_usage(result) + assert usage.input_tokens is None + assert usage.output_tokens is None + assert usage.cached_input_tokens is None + assert usage.total_tokens is None + assert usage.cost_usd is None + assert usage.duration_ms is None + assert usage.num_llm_calls is None + + def test_real_zeros_preserved(self): + result = { + "usage": { + "input_tokens": 0, + "output_tokens": 0, + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + }, + "cost_usd": 0.0, + "duration_ms": 0, + "num_turns": 0, + } + usage = claude_code_usage_to_turn_usage(result) + assert usage.input_tokens == 0 + assert usage.output_tokens == 0 + assert usage.cached_input_tokens == 0 + assert usage.total_tokens == 0 + assert usage.cost_usd == pytest.approx(0.0) + assert usage.duration_ms == 0 + assert usage.num_llm_calls == 0 + + def test_only_cache_read_no_creation(self): + result = { + "usage": { + "input_tokens": 50, + "output_tokens": 25, + "cache_read_input_tokens": 15, + } + } + usage = claude_code_usage_to_turn_usage(result) + assert usage.cached_input_tokens == 15 + + def test_only_cache_creation_no_read(self): + result = { + "usage": { + "input_tokens": 50, + "output_tokens": 25, + "cache_creation_input_tokens": 10, + } + } + usage = 
claude_code_usage_to_turn_usage(result) + assert usage.cached_input_tokens == 10 + + def test_no_cache_fields_gives_none(self): + result = {"usage": {"input_tokens": 10, "output_tokens": 5}} + usage = claude_code_usage_to_turn_usage(result) + assert usage.cached_input_tokens is None + + def test_total_tokens_computed_from_input_output(self): + result = {"usage": {"input_tokens": 70, "output_tokens": 30}} + usage = claude_code_usage_to_turn_usage(result) + assert usage.total_tokens == 100 + + def test_missing_output_tokens_leaves_total_none(self): + result = {"usage": {"input_tokens": 70}} + usage = claude_code_usage_to_turn_usage(result) + assert usage.total_tokens is None + + def test_returns_turn_usage_instance(self): + result = {"usage": {"input_tokens": 1, "output_tokens": 1}} + usage = claude_code_usage_to_turn_usage(result) + assert isinstance(usage, TurnUsage) + + +# --------------------------------------------------------------------------- +# ClaudeCodeTurn protocol +# --------------------------------------------------------------------------- + + +class TestClaudeCodeTurnProtocol: + def test_satisfies_harness_turn_protocol(self): + """ClaudeCodeTurn must satisfy the HarnessTurn structural protocol.""" + turn = ClaudeCodeTurn(_aiter([])) + assert isinstance(turn, HarnessTurn) + + async def test_events_yields_stream_task_messages(self): + envelopes = [ + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "Hi there"}]}, + } + ] + turn = ClaudeCodeTurn(_aiter(envelopes)) + out = await _drain(turn) + assert len(out) == 3 + assert isinstance(out[0], StreamTaskMessageStart) + assert isinstance(out[1], StreamTaskMessageDelta) + assert isinstance(out[2], StreamTaskMessageDone) + + async def test_usage_before_drain_returns_empty(self): + envelopes = [ + { + "type": "result", + "usage": {"input_tokens": 100, "output_tokens": 50}, + "cost_usd": 0.01, + } + ] + turn = ClaudeCodeTurn(_aiter(envelopes)) + # usage() called before events drained — 
no result envelope yet + usage = turn.usage() + assert isinstance(usage, TurnUsage) + assert usage.input_tokens is None + + async def test_usage_after_drain_reflects_result(self): + envelopes = [ + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "response"}]}, + }, + { + "type": "result", + "usage": {"input_tokens": 200, "output_tokens": 80}, + "cost_usd": 0.015, + "num_turns": 2, + }, + ] + turn = ClaudeCodeTurn(_aiter(envelopes)) + await _drain(turn) + usage = turn.usage() + + assert usage.input_tokens == 200 + assert usage.output_tokens == 80 + assert usage.cost_usd == pytest.approx(0.015) + assert usage.num_llm_calls == 2 + + async def test_usage_empty_when_no_result_envelope(self): + envelopes = [ + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "no result"}]}, + } + ] + turn = ClaudeCodeTurn(_aiter(envelopes)) + await _drain(turn) + usage = turn.usage() + assert usage.input_tokens is None + assert usage.cost_usd is None + + async def test_tool_call_and_result_round_trip(self): + envelopes = [ + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "id": "call_1", + "name": "Read", + "input": {"path": "/etc/hosts"}, + } + ] + }, + }, + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "call_1", + "content": "127.0.0.1 localhost", + } + ] + }, + }, + { + "type": "result", + "usage": {"input_tokens": 50, "output_tokens": 20}, + "cost_usd": 0.005, + }, + ] + turn = ClaudeCodeTurn(_aiter(envelopes)) + out = await _drain(turn) + usage = turn.usage() + + tool_starts = [ + e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ToolResponseContent) + ] + tool_fulls = [ + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ToolResponseContent) + ] + assert len(tool_fulls) == 1 + full_content = tool_fulls[0].content + assert isinstance(full_content, ToolResponseContent) + assert 
full_content.tool_call_id == "call_1" + + assert usage.input_tokens == 50 + assert usage.output_tokens == 20 + + async def test_events_property_returns_same_iterator(self): + """Accessing .events multiple times returns the same iterator (not a new one each call).""" + turn = ClaudeCodeTurn(_aiter([])) + it1 = turn.events + it2 = turn.events + assert it1 is it2 diff --git a/tests/lib/core/harness/conformance/test_claude_code_conformance.py b/tests/lib/core/harness/conformance/test_claude_code_conformance.py new file mode 100644 index 000000000..88643a4cd --- /dev/null +++ b/tests/lib/core/harness/conformance/test_claude_code_conformance.py @@ -0,0 +1,202 @@ +"""Cross-channel conformance tests for the claude-code parser tap. + +Each fixture is a representative sequence of claude-code stream-json +envelopes, converted into canonical ``StreamTaskMessage*`` events via +``ClaudeCodeTurn``, then registered into the shared conformance runner. + +The conformance runner asserts two guarantees per fixture: + +1. **Logical-delivery equivalence**: ``yield_events`` and ``auto_send`` + produce the same logically-delivered message contents. + +2. **Span signal equivalence**: both channels emit the same ``SpanSignal`` + sequence to their ``SpanTracer``. + +Fixtures +-------- +text-only: single ``assistant`` text block +tool-call-result: ``tool_use`` block followed by ``tool_result`` +thinking-block: ``thinking`` block with full text +multi-step: text + tool_use + tool_result + text (two model turns) + +Note +---- +Relative imports are used throughout (runner.py and these fixtures live in the +same package). The per-module ``_FIXTURES`` list is both registered globally +(via ``register()``) and parametrized locally so this module's tests are +self-contained regardless of global registry ordering (see runner.py docstring). 
+""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events + +from .runner import ( + Fixture, + register, + run_cross_channel_conformance, +) + +# --------------------------------------------------------------------------- +# Convert claude-code envelopes to StreamTaskMessage* events +# --------------------------------------------------------------------------- + + +async def _envelopes_to_events(envelopes: list[dict]) -> list: + """Drive convert_claude_code_to_agentex_events and collect all events.""" + + async def _aiter(items): # type: ignore[return] + for item in items: + yield item + + return [e async for e in convert_claude_code_to_agentex_events(_aiter(envelopes))] + + +# --------------------------------------------------------------------------- +# Fixture definitions (raw claude-code envelope sequences) +# --------------------------------------------------------------------------- + +_TEXT_ENVELOPES = [ + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "The answer is 42."}]}, + } +] + +_TOOL_ENVELOPES = [ + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "id": "call_read", + "name": "Read", + "input": {"path": "/workspace/README.md"}, + } + ] + }, + }, + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "call_read", + "content": "# My Project\n\nA great project.", + } + ] + }, + }, +] + +_THINKING_ENVELOPES = [ + { + "type": "assistant", + "message": { + "content": [ + {"type": "thinking", "thinking": "Let me think about this carefully.\nStep 1: check the facts."}, + {"type": "text", "text": "Here is my answer."}, + ] + }, + } +] + +_MULTI_STEP_ENVELOPES = [ + # Turn 1: text + tool call + { + "type": "assistant", + "message": { + "content": [ + {"type": "text", "text": "Let me look that up."}, + { + "type": "tool_use", + "id": 
"call_bash", + "name": "Bash", + "input": {"command": "cat /etc/hostname"}, + }, + ] + }, + }, + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "call_bash", + "content": "myhost", + } + ] + }, + }, + # Turn 2: final text after tool result + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "The hostname is myhost."}]}, + }, +] + + +# --------------------------------------------------------------------------- +# Build fixtures from envelopes at module load time +# --------------------------------------------------------------------------- + + +async def _build_fixture(name: str, envelopes: list[dict]) -> Fixture: + events = await _envelopes_to_events(envelopes) + return Fixture(name=name, events=events) + + +# Fixtures must exist before pytest collects (they parametrize the test below), +# so they are built at import time. The conversion only iterates in-memory +# envelopes — it never suspends on a real future — so we drive the coroutine to +# completion by hand instead of asyncio.run(). asyncio.run() at import raises +# RuntimeError when an event loop is already running (programmatic pytest, a +# Jupyter kernel, or session-scoped asyncio loops); the loop-free driver below +# is unaffected by the ambient loop state. 
def _run_pure_async(coro: Any) -> Any:
    """Run *coro* to completion without an event loop and return its result.

    The coroutine is advanced with a single ``send(None)``. A coroutine that
    finishes on that step raises ``StopIteration`` carrying its return value,
    which is handed back to the caller.

    Raises:
        RuntimeError: If the coroutine is still pending after the first step.
            The coroutine is closed before the error is raised.
    """
    try:
        coro.send(None)
    except StopIteration as finished:
        outcome = finished.value
    else:
        # send() returned instead of raising, so the coroutine yielded and is
        # waiting on something this driver cannot provide. Close it, then fail.
        coro.close()
        raise RuntimeError("conformance fixture build unexpectedly suspended on real I/O")
    return outcome


# Each (fixture name, envelope sequence) pair is converted eagerly at import
# time so the list can feed the parametrize decorator below.
_FIXTURES: list[Fixture] = [
    _run_pure_async(_build_fixture(fixture_name, fixture_envelopes))
    for fixture_name, fixture_envelopes in (
        ("claude-code-text-only", _TEXT_ENVELOPES),
        ("claude-code-tool-call-result", _TOOL_ENVELOPES),
        ("claude-code-thinking-block", _THINKING_ENVELOPES),
        ("claude-code-multi-step", _MULTI_STEP_ENVELOPES),
    )
]

# Also publish every fixture to the shared registry.
for _f in _FIXTURES:
    register(_f)


# ---------------------------------------------------------------------------
# Cross-channel conformance assertions
# ---------------------------------------------------------------------------


@pytest.mark.parametrize("fixture", _FIXTURES, ids=lambda fx: fx.name)
@pytest.mark.asyncio
async def test_cross_channel_equivalence(fixture: Fixture) -> None:
    """Check that both delivery channels agree for one claude-code fixture.

    The values returned by ``run_cross_channel_conformance`` are compared
    pairwise: the yield and auto_send deliveries must be equal, and the yield
    and auto_send span signals must be equal.
    """
    conformance_result = await run_cross_channel_conformance(fixture)
    yield_deliveries, auto_deliveries, yield_spans, auto_spans = conformance_result

    assert yield_deliveries == auto_deliveries, (
        f"[{fixture.name}] logical deliveries differ:\n yield: {yield_deliveries}\n auto_send: {auto_deliveries}"
    )
    assert yield_spans == auto_spans, (
        f"[{fixture.name}] span signals differ:\n yield: {yield_spans}\n auto_send: {auto_spans}"
    )