Skip to content
196 changes: 196 additions & 0 deletions adk/docs/harness.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
# Unified Harness Surface

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


The unified harness surface gives every agent harness (pydantic-ai, LangGraph, OpenAI Agents, and future parsers) a single, shared path to streaming, message persistence, and tracing. The Agentex `StreamTaskMessage*` event stream is the canonical wire format. A harness tap produces that stream once; the shared machinery delivers it and derives spans from it.

All public names are re-exported from `agentex.lib.adk`:

```python
from agentex.lib.adk import (
UnifiedEmitter,
SpanTracer,
TurnUsage,
TurnResult,
HarnessTurn,
StreamTaskMessage,
OpenSpan,
CloseSpan,
SpanSignal,
)
```

The implementation lives at `src/agentex/lib/core/harness/`.

---

## The canonical stream: `StreamTaskMessage`

`StreamTaskMessage` is a union of the four wire-protocol update types:

```
StreamTaskMessageStart - opens a content slot (text, reasoning, tool request, ...)
StreamTaskMessageDelta - appends a token/fragment to an open slot
StreamTaskMessageFull - posts a complete message in one shot (tool response, ...)
StreamTaskMessageDone - closes an open slot
```

Every harness tap produces a sequence of these. Everything downstream (delivery, tracing) reads the same sequence.

---

## Per-harness taps: `convert_<harness>_to_agentex_events`

A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The currently shipped taps are:

| Harness | Tap function | Exported from |
|---|---|---|
| pydantic-ai | `convert_pydantic_ai_to_agentex_events` | `agentex.lib.adk` |
| LangGraph | `convert_langgraph_to_agentex_events` | `agentex.lib.adk` |

Taps for claude-code and codex will be added in subsequent PRs (AGX1-420, AGX1-421) and exported from `agentex.lib.adk` in the same way.

---

## `HarnessTurn` protocol

`HarnessTurn` is the interface a harness turn object must satisfy to plug into `UnifiedEmitter`:

```python
@runtime_checkable
class HarnessTurn(Protocol):
@property
def events(self) -> AsyncIterator[StreamTaskMessage]: ...

def usage(self) -> TurnUsage: ...
```

`events` is the canonical stream for this turn. `usage()` is valid only after `events` is exhausted (async generators cannot cleanly return a value to the consumer, so usage travels out-of-band).

---

## `TurnUsage`

Token counts and cost for one turn, harness-independent:

```python
class TurnUsage(BaseModel):
model: str | None = None
input_tokens: int | None = None
output_tokens: int | None = None
cached_input_tokens: int | None = None
reasoning_tokens: int | None = None
total_tokens: int | None = None
cost_usd: float | None = None
duration_ms: int | None = None
num_llm_calls: int = 0
num_tool_calls: int = 0
num_reasoning_blocks: int = 0
```

Field names align with `agentex.lib.core.observability.llm_metrics` for easy conversion.

---

## `UnifiedEmitter`

`UnifiedEmitter` ties a turn's canonical stream, tracing context, and delivery mode together. Construct one per turn with the task/trace context from the request:

```python
emitter = UnifiedEmitter(
task_id=params.task.id,
trace_id=params.task.id, # or None to disable tracing
parent_span_id=turn_span.id if turn_span else None,
)
```

**Tracing is on by default** when `trace_id` is provided. To disable it explicitly, pass `tracer=False`. To inject a custom `SpanTracer` (e.g. in tests), pass it as `tracer=<instance>`.

### Delivery mode 1: `yield_turn` (sync HTTP ACP)

For sync ACP agents that return events directly over the HTTP response:

```python
@acp.on_message_send
async def handle(params):
turn = MyHarnessTurn(params) # implements HarnessTurn
async for event in emitter.yield_turn(turn):
yield event
```

`yield_turn` forwards each event to the caller and traces spans as a side effect. It is a passthrough when `tracer` is `None`.

### Delivery mode 2: `auto_send_turn` (async/Temporal)

For async or Temporal agents that push to the task stream via Redis:

```python
result: TurnResult = await emitter.auto_send_turn(turn, created_at=workflow.now())
```

`auto_send_turn` drives `adk.streaming` contexts for every message in the stream, derives and records spans, and returns a `TurnResult` with the final text and usage. Pass `created_at` under Temporal to back-date message timestamps deterministically.

---

## `TurnResult`

```python
class TurnResult(BaseModel):
final_text: str = ""
usage: TurnUsage = TurnUsage()
```

Returned by `auto_send_turn`. `final_text` is the last text segment of the turn (multi-step runs return only the final segment, matching `stream_langgraph_events` / `stream_pydantic_ai_events` semantics).

---

## Tracing: span derivation

Spans are derived from the canonical stream by `SpanDeriver` (pure, no `adk` dependency) and dispatched to `adk.tracing` by `SpanTracer`. The mapping:

- `StreamTaskMessageStart(ToolRequestContent)` + `StreamTaskMessageDone` on that index -> tool span open (keyed by `tool_call_id`)
- `StreamTaskMessageFull(ToolResponseContent)` whose `tool_call_id` was opened -> tool span close
- `StreamTaskMessageFull(ToolRequestContent)` (harnesses that emit tool calls as Full) -> opens a tool span; matching `Full(ToolResponseContent)` closes it
- `StreamTaskMessageStart(ReasoningContent)` + `StreamTaskMessageDone` -> reasoning span

`SpanTracer` is `SpanDeriver`'s consumer. You can inject a custom `SpanTracer` via `UnifiedEmitter(tracer=<instance>)` for advanced use or testing.

---

## Usage examples by channel

### Sync ACP (pydantic-ai tap)

```python
import agentex.lib.adk as adk
from agentex.lib.adk import UnifiedEmitter, convert_pydantic_ai_to_agentex_events

@acp.on_message_send
async def handle(params):
task_id = params.task.id
async with adk.tracing.span(trace_id=task_id, name="message", ...) as turn_span:
emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)
tap = convert_pydantic_ai_to_agentex_events(pydantic_stream)
# wrap tap in a HarnessTurn then yield_turn, or yield directly:
async for event in tap:
yield event
```

For the pre-unified sync path the tap is still yielded directly; `UnifiedEmitter.yield_turn` is the forward-looking integration point when a `HarnessTurn` wrapper is available.

### Async Temporal (auto-send)

```python
from agentex.lib.adk import UnifiedEmitter

emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=parent_span_id,
)
result = await emitter.auto_send_turn(turn, created_at=workflow.now())
# result.final_text — last text segment
# result.usage — TurnUsage (tokens, cost, ...)
```
23 changes: 23 additions & 0 deletions src/agentex/lib/adk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
from agentex.lib.adk._modules.tasks import TasksModule
from agentex.lib.adk._modules.tracing import TracingModule

# Unified harness surface (AGX1-375)
from agentex.lib.core.harness import (
UnifiedEmitter,
SpanTracer,
OpenSpan,
CloseSpan,
SpanSignal,
StreamTaskMessage,
TurnUsage,
TurnResult,
HarnessTurn,
)

from agentex.lib.adk import providers
from agentex.lib.adk import utils

Expand Down Expand Up @@ -69,6 +82,16 @@
"convert_codex_to_agentex_events",
"CodexTurn",
"codex_usage_to_turn_usage",
# Unified harness surface (AGX1-375)
"UnifiedEmitter",
"SpanTracer",
"OpenSpan",
"CloseSpan",
"SpanSignal",
"StreamTaskMessage",
"TurnUsage",
"TurnResult",
"HarnessTurn",
# Providers
"providers",
# Utils
Expand Down
Loading