diff --git a/adk/docs/harness.md b/adk/docs/harness.md
new file mode 100644
index 000000000..6a9d8947a
--- /dev/null
+++ b/adk/docs/harness.md
@@ -0,0 +1,196 @@
+# Unified Harness Surface
+
+The unified harness surface gives every agent harness (pydantic-ai, LangGraph, OpenAI Agents, and future parsers) a single, shared path to streaming, message persistence, and tracing. The Agentex `StreamTaskMessage*` event stream is the canonical wire format. A harness tap produces that stream once; the shared machinery delivers it and derives spans from it.
+
+All public names are re-exported from `agentex.lib.adk`:
+
+```python
+from agentex.lib.adk import (
+    UnifiedEmitter,
+    SpanTracer,
+    TurnUsage,
+    TurnResult,
+    HarnessTurn,
+    StreamTaskMessage,
+    OpenSpan,
+    CloseSpan,
+    SpanSignal,
+)
+```
+
+The implementation lives at `src/agentex/lib/core/harness/`.
+
+---
+
+## The canonical stream: `StreamTaskMessage`
+
+`StreamTaskMessage` is a union of the four wire-protocol update types:
+
+```
+StreamTaskMessageStart - opens a content slot (text, reasoning, tool request, ...)
+StreamTaskMessageDelta - appends a token/fragment to an open slot
+StreamTaskMessageFull  - posts a complete message in one shot (tool response, ...)
+StreamTaskMessageDone  - closes an open slot
+```
+
+Every harness tap produces a sequence of these. Everything downstream (delivery, tracing) reads the same sequence.
+
+---
+
+## Per-harness taps: `convert_<harness>_to_agentex_events`
+
+A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The currently shipped taps are:
+
+| Harness | Tap function | Exported from |
+|---|---|---|
+| pydantic-ai | `convert_pydantic_ai_to_agentex_events` | `agentex.lib.adk` |
+| LangGraph | `convert_langgraph_to_agentex_events` | `agentex.lib.adk` |
+
+Taps for claude-code and codex will be added in subsequent PRs (AGX1-420, AGX1-421) and exported from `agentex.lib.adk` in the same way.
+
+---
+
+## `HarnessTurn` protocol
+
+`HarnessTurn` is the interface a harness turn object must satisfy to plug into `UnifiedEmitter`:
+
+```python
+@runtime_checkable
+class HarnessTurn(Protocol):
+    @property
+    def events(self) -> AsyncIterator[StreamTaskMessage]: ...
+
+    def usage(self) -> TurnUsage: ...
+```
+
+`events` is the canonical stream for this turn. `usage()` is valid only after `events` is exhausted (async generators cannot cleanly return a value to the consumer, so usage travels out-of-band).
+
+---
+
+## `TurnUsage`
+
+Token counts and cost for one turn, harness-independent:
+
+```python
+class TurnUsage(BaseModel):
+    model: str | None = None
+    input_tokens: int | None = None
+    output_tokens: int | None = None
+    cached_input_tokens: int | None = None
+    reasoning_tokens: int | None = None
+    total_tokens: int | None = None
+    cost_usd: float | None = None
+    duration_ms: int | None = None
+    num_llm_calls: int = 0
+    num_tool_calls: int = 0
+    num_reasoning_blocks: int = 0
+```
+
+Field names align with `agentex.lib.core.observability.llm_metrics` for easy conversion.
+
+---
+
+## `UnifiedEmitter`
+
+`UnifiedEmitter` ties a turn's canonical stream, tracing context, and delivery mode together. Construct one per turn with the task/trace context from the request:
+
+```python
+emitter = UnifiedEmitter(
+    task_id=params.task.id,
+    trace_id=params.task.id,  # or None to disable tracing
+    parent_span_id=turn_span.id if turn_span else None,
+)
+```
+
+**Tracing is on by default** when `trace_id` is provided. To disable it explicitly, pass `tracer=False`. To inject a custom `SpanTracer` (e.g. in tests), pass it as `tracer=`.
+
+### Delivery mode 1: `yield_turn` (sync HTTP ACP)
+
+For sync ACP agents that return events directly over the HTTP response:
+
+```python
+@acp.on_message_send
+async def handle(params):
+    turn = MyHarnessTurn(params)  # implements HarnessTurn
+    async for event in emitter.yield_turn(turn):
+        yield event
+```
+
+`yield_turn` forwards each event to the caller and traces spans as a side effect. It is a passthrough when `tracer` is `None`.
+
+### Delivery mode 2: `auto_send_turn` (async/Temporal)
+
+For async or Temporal agents that push to the task stream via Redis:
+
+```python
+result: TurnResult = await emitter.auto_send_turn(turn, created_at=workflow.now())
+```
+
+`auto_send_turn` drives `adk.streaming` contexts for every message in the stream, derives and records spans, and returns a `TurnResult` with the final text and usage. Pass `created_at` under Temporal to back-date message timestamps deterministically.
+
+---
+
+## `TurnResult`
+
+```python
+class TurnResult(BaseModel):
+    final_text: str = ""
+    usage: TurnUsage = TurnUsage()
+```
+
+Returned by `auto_send_turn`. `final_text` is the last text segment of the turn (multi-step runs return only the final segment, matching `stream_langgraph_events` / `stream_pydantic_ai_events` semantics).
+
+---
+
+## Tracing: span derivation
+
+Spans are derived from the canonical stream by `SpanDeriver` (pure, no `adk` dependency) and dispatched to `adk.tracing` by `SpanTracer`. The mapping:
+
+- `StreamTaskMessageStart(ToolRequestContent)` + `StreamTaskMessageDone` on that index -> tool span open (keyed by `tool_call_id`)
+- `StreamTaskMessageFull(ToolResponseContent)` whose `tool_call_id` was opened -> tool span close
+- `StreamTaskMessageFull(ToolRequestContent)` (harnesses that emit tool calls as Full) -> opens a tool span; matching `Full(ToolResponseContent)` closes it
+- `StreamTaskMessageStart(ReasoningContent)` + `StreamTaskMessageDone` -> reasoning span
+
+`SpanTracer` is `SpanDeriver`'s consumer.
+You can inject a custom `SpanTracer` via `UnifiedEmitter(tracer=)` for advanced use or testing.
+
+---
+
+## Usage examples by channel
+
+### Sync ACP (pydantic-ai tap)
+
+```python
+import agentex.lib.adk as adk
+from agentex.lib.adk import UnifiedEmitter, convert_pydantic_ai_to_agentex_events
+
+@acp.on_message_send
+async def handle(params):
+    task_id = params.task.id
+    async with adk.tracing.span(trace_id=task_id, name="message", ...) as turn_span:
+        emitter = UnifiedEmitter(
+            task_id=task_id,
+            trace_id=task_id,
+            parent_span_id=turn_span.id if turn_span else None,
+        )
+        tap = convert_pydantic_ai_to_agentex_events(pydantic_stream)
+        # wrap tap in a HarnessTurn then yield_turn, or yield directly:
+        async for event in tap:
+            yield event
+```
+
+For the pre-unified sync path the tap is still yielded directly; `UnifiedEmitter.yield_turn` is the forward-looking integration point when a `HarnessTurn` wrapper is available.
+
+### Async Temporal (auto-send)
+
+```python
+from agentex.lib.adk import UnifiedEmitter
+
+emitter = UnifiedEmitter(
+    task_id=task_id,
+    trace_id=task_id,
+    parent_span_id=parent_span_id,
+)
+result = await emitter.auto_send_turn(turn, created_at=workflow.now())
+# result.final_text — last text segment
+# result.usage — TurnUsage (tokens, cost, ...)
+```
diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py
index f6713be7c..fedd52f7a 100644
--- a/src/agentex/lib/adk/__init__.py
+++ b/src/agentex/lib/adk/__init__.py
@@ -27,6 +27,19 @@
 from agentex.lib.adk._modules.tasks import TasksModule
 from agentex.lib.adk._modules.tracing import TracingModule
 
+# Unified harness surface (AGX1-375)
+from agentex.lib.core.harness import (
+    UnifiedEmitter,
+    SpanTracer,
+    OpenSpan,
+    CloseSpan,
+    SpanSignal,
+    StreamTaskMessage,
+    TurnUsage,
+    TurnResult,
+    HarnessTurn,
+)
+
 from agentex.lib.adk import providers
 from agentex.lib.adk import utils
 
@@ -69,6 +82,16 @@
     "convert_codex_to_agentex_events",
     "CodexTurn",
     "codex_usage_to_turn_usage",
+    # Unified harness surface (AGX1-375)
+    "UnifiedEmitter",
+    "SpanTracer",
+    "OpenSpan",
+    "CloseSpan",
+    "SpanSignal",
+    "StreamTaskMessage",
+    "TurnUsage",
+    "TurnResult",
+    "HarnessTurn",
     # Providers
     "providers",
     # Utils
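
As a reading aid for the tap and `HarnessTurn` sections of `harness.md` in this diff, the sketch below shows the general shape: an async generator translates a native stream into start/delta/done events, and a turn object exposes `events` plus an out-of-band `usage()`. Everything in it is an assumption for illustration. `Start`, `Delta`, `Done`, `fake_native_stream`, `convert_fake_to_events`, and `FakeTurn` are hypothetical stand-ins, not the real `agentex` types; usage is returned as a plain dict rather than `TurnUsage`; and raising `RuntimeError` on an early `usage()` call is one possible way to enforce the "valid only after `events` is exhausted" rule, not necessarily what the library does.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Union


# Hypothetical stand-ins for the wire-protocol update types.
@dataclass
class Start:
    index: int


@dataclass
class Delta:
    index: int
    text: str


@dataclass
class Done:
    index: int


Event = Union[Start, Delta, Done]


async def fake_native_stream() -> AsyncIterator[str]:
    """Hypothetical harness-native stream that yields plain text tokens."""
    for token in ["Hel", "lo"]:
        yield token


async def convert_fake_to_events(native: AsyncIterator[str]) -> AsyncIterator[Event]:
    """Tap: open one slot, append each token as a delta, then close the slot."""
    yield Start(index=0)
    async for token in native:
        yield Delta(index=0, text=token)
    yield Done(index=0)


class FakeTurn:
    """Turn object shaped like the HarnessTurn protocol: events plus usage()."""

    def __init__(self, native: AsyncIterator[str]) -> None:
        self._native = native
        self._delta_count = 0
        self._exhausted = False

    @property
    def events(self) -> AsyncIterator[Event]:
        return self._run()

    async def _run(self) -> AsyncIterator[Event]:
        async for event in convert_fake_to_events(self._native):
            if isinstance(event, Delta):
                # Counting deltas is a stand-in for real token accounting.
                self._delta_count += 1
            yield event
        self._exhausted = True

    def usage(self) -> dict:
        # Assumed enforcement: usage is only meaningful once the stream is drained.
        if not self._exhausted:
            raise RuntimeError("usage() is only valid after events is exhausted")
        return {"output_tokens": self._delta_count}


async def run_turn():
    """Drain the turn's events, then read usage out-of-band."""
    turn = FakeTurn(fake_native_stream())
    events = [event async for event in turn.events]
    return events, turn.usage()
```

Calling `asyncio.run(run_turn())` drains the stream first and only then reads usage, which is the ordering the protocol description requires of any consumer.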