Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/tutorials/00_sync/060_claude_code/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Environments
.env**
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# IDE
.idea/
.vscode/
*.swp
*.swo

# Git
.git
.gitignore

# Misc
.DS_Store
46 changes: 46 additions & 0 deletions examples/tutorials/00_sync/060_claude_code/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# syntax=docker/dockerfile:1.3
FROM python:3.12-slim
COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/

# Install system dependencies including Node.js (required by the claude CLI)
RUN apt-get update && apt-get install -y \
htop \
vim \
curl \
tar \
python3-dev \
postgresql-client \
build-essential \
libpq-dev \
gcc \
cmake \
netcat-openbsd \
nodejs \
npm \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

RUN uv pip install --system --upgrade pip setuptools wheel

# Install the claude CLI (requires Node.js)
# NOTE: live runs require ANTHROPIC_API_KEY in the environment.
RUN npm install -g @anthropic-ai/claude-code || true

ENV UV_HTTP_TIMEOUT=1000

COPY 00_sync/060_claude_code/pyproject.toml /app/060_claude_code/pyproject.toml
COPY 00_sync/060_claude_code/README.md /app/060_claude_code/README.md

WORKDIR /app/060_claude_code

COPY 00_sync/060_claude_code/project /app/060_claude_code/project
COPY 00_sync/060_claude_code/tests /app/060_claude_code/tests
COPY test_utils /app/test_utils

RUN uv pip install --system .[dev]

ENV PYTHONPATH=/app

ENV AGENT_NAME=s060-claude-code

CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
76 changes: 76 additions & 0 deletions examples/tutorials/00_sync/060_claude_code/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Tutorial 060: Sync Claude Code Agent

This tutorial demonstrates how to build a **synchronous** agent that spawns the
Claude Code CLI as a local subprocess and streams its output through the Agentex
unified harness surface via ``ClaudeCodeTurn`` and ``UnifiedEmitter``.

## Key Concepts

### ClaudeCodeTurn + UnifiedEmitter

``ClaudeCodeTurn`` wraps ``convert_claude_code_to_agentex_events``, which
parses the newline-delimited JSON envelopes emitted by
``claude -p --output-format stream-json``. It implements the ``HarnessTurn``
protocol: an ``events`` async iterator of canonical ``StreamTaskMessage*``
objects and a ``usage()`` method (populated once the stream is exhausted).

``UnifiedEmitter.yield_turn(turn)`` is the sync delivery path: it forwards
events as HTTP yield chunks while tracing as a side effect.

### Local subprocess spawn

The ``_spawn_claude`` function in ``project/acp.py`` uses
``asyncio.create_subprocess_exec`` to run:

```
claude -p --output-format stream-json --verbose
```

The prompt is written to stdin. Stdout is read line by line and fed into
``ClaudeCodeTurn``. This is purely local -- no Scale sandbox is involved.

Production isolation (Scale sandbox, secret injection, MCP configuration)
is the golden agent's concern at
``teams/sgp/agents/golden_agent/project/harness/providers/claude.py``.

### Injectable spawn seam

``_spawn_claude`` is a top-level async generator in ``project/acp.py``.
Tests monkeypatch it to inject pre-recorded stream-json lines instead of
spawning the real process, so offline unit tests run without the CLI.

## Files

| File | Description |
|------|-------------|
| ``project/acp.py`` | ACP server, ``_spawn_claude`` seam, and message handler |
| ``tests/test_agent.py`` | Live integration tests (needs CLI + API key) |
| ``tests/test_agent_offline.py`` | Offline unit tests with injected fake subprocess |
| ``manifest.yaml`` | Agent configuration |

## Running Locally (live)

Requires the ``claude`` CLI installed and ``ANTHROPIC_API_KEY`` set:

```bash
npm install -g @anthropic-ai/claude-code
export ANTHROPIC_API_KEY=sk-ant-...
agentex agents run
```

## Running Offline Tests

No CLI or API key needed:

```bash
uv run pytest tests/test_agent_offline.py -v
```

## Notes

- Production isolation (sandbox, secrets, MCP) is the golden agent's concern.
This tutorial runs the CLI directly to keep the code as simple as possible.
- Multi-turn session resumption (``claude -r <session_id>``) is out of scope
for this tutorial. See the golden agent for that pattern.
- The ``--verbose`` flag is included to match the golden agent's invocation;
it causes the CLI to emit ``stream_event`` triples for incremental streaming.
55 changes: 55 additions & 0 deletions examples/tutorials/00_sync/060_claude_code/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
build:
context:
root: ../../
include_paths:
- 00_sync/060_claude_code
- test_utils
dockerfile: 00_sync/060_claude_code/Dockerfile
dockerignore: 00_sync/060_claude_code/.dockerignore

local_development:
agent:
port: 8000
host_address: host.docker.internal
paths:
acp: project/acp.py

agent:
acp_type: sync
name: s060-claude-code
description: A sync Claude Code agent streaming the unified harness surface via a local CLI subprocess

temporal:
enabled: false

credentials:
- env_var_name: ANTHROPIC_API_KEY
secret_name: anthropic-api-key
secret_key: api-key
- env_var_name: SGP_API_KEY
secret_name: sgp-api-key
secret_key: api-key
- env_var_name: SGP_ACCOUNT_ID
secret_name: sgp-account-id
secret_key: account-id
- env_var_name: SGP_CLIENT_BASE_URL
secret_name: sgp-client-base-url
secret_key: url

deployment:
image:
repository: ""
tag: "latest"

global:
agent:
name: "s060-claude-code"
description: "A sync Claude Code agent streaming via local CLI subprocess"
replicaCount: 1
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1000m"
memory: "2Gi"
Empty file.
137 changes: 137 additions & 0 deletions examples/tutorials/00_sync/060_claude_code/project/acp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""ACP handler for the sync Claude Code tutorial.

Spawns ``claude -p --output-format stream-json --verbose`` as a LOCAL
asyncio subprocess (no Scale sandbox -- that is the golden agent's
production concern). Stdout lines are fed into ``ClaudeCodeTurn``, which
wraps ``convert_claude_code_to_agentex_events``. Events are delivered via
``UnifiedEmitter.yield_turn``, the sync HTTP yield path.

Live runs require the ``claude`` CLI to be installed and an
ANTHROPIC_API_KEY (or equivalent credential) to be in the environment.
For offline testing, see ``tests/test_agent_offline.py``, which injects a
fake subprocess.
"""

from __future__ import annotations

import os
import asyncio
from typing import AsyncIterator, AsyncGenerator

from dotenv import load_dotenv

load_dotenv()

import agentex.lib.adk as adk
from agentex.lib.adk import ClaudeCodeTurn
from agentex.lib.types.acp import SendMessageParams
from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.types.task_message_update import TaskMessageUpdate
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config

logger = make_logger(__name__)

add_tracing_processor_config(
SGPTracingProcessorConfig(
sgp_api_key=os.environ.get("SGP_API_KEY", ""),
sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""),
)
)

acp = FastACP.create(acp_type="sync")


async def _spawn_claude(prompt: str) -> AsyncIterator[str]:
"""Spawn ``claude -p --output-format stream-json`` locally and yield stdout lines.

This is a seam: tests replace it with a fake async iterator of
pre-recorded lines so no real CLI invocation is needed offline.
"""
proc = await asyncio.create_subprocess_exec(
"claude",
"-p",
"--output-format",
"stream-json",
"--verbose",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert proc.stdout is not None
assert proc.stdin is not None

proc.stdin.write(prompt.encode())
proc.stdin.close()

# Drain stderr concurrently. With --verbose, Claude Code can write enough to
# stderr to fill the OS pipe buffer; if we only read stdout, the CLI blocks
# on its stderr write while we block reading stdout — a deadlock. A
# background task keeps stderr flowing so stdout never stalls.
async def _drain_stderr() -> None:
assert proc.stderr is not None
async for _ in proc.stderr:
pass

stderr_task = asyncio.create_task(_drain_stderr())

try:
buffer = ""
async for chunk in proc.stdout:
buffer += chunk.decode("utf-8", errors="replace")
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
yield line

if buffer.strip():
yield buffer.strip()

await proc.wait()
finally:
# Release the subprocess and stderr drain task even if the consumer
# abandons the generator early (task cancellation / client disconnect):
# cancel the drain task and terminate+reap the process if it is still
# running, so neither is leaked.
stderr_task.cancel()
try:
await stderr_task
except asyncio.CancelledError:
pass
if proc.returncode is None:
try:
proc.terminate()
except ProcessLookupError:
pass
await proc.wait()


@acp.on_message_send
async def handle_message_send(
params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
"""Handle an incoming message: run Claude Code locally and stream events."""
task_id = params.task.id
prompt = params.content.content
logger.info("Processing message for task %s", task_id)

async with adk.tracing.span(
trace_id=task_id,
task_id=task_id,
name="message",
input={"message": prompt},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)
turn = ClaudeCodeTurn(_spawn_claude(prompt))
async for event in emitter.yield_turn(turn):
yield event
25 changes: 25 additions & 0 deletions examples/tutorials/00_sync/060_claude_code/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "s060-claude-code"
version = "0.1.0"
description = "A sync Claude Code agent streaming the unified harness surface via a local CLI subprocess"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"agentex-sdk",
"scale-gp",
"python-dotenv>=1.0,<2",
]

[project.optional-dependencies]
dev = [
"pytest",
"pytest-asyncio",
"httpx",
]

[tool.hatch.build.targets.wheel]
packages = ["project"]
Loading
Loading