Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e5551b5
docs: design for unified harness tracing/message-emitting surface
declan-scale Jun 18, 2026
5f60ba7
docs: refine unified harness spec — span derivation rules, TurnUsage,…
declan-scale Jun 18, 2026
9e7546e
docs: link deferred tool-error decision to AGX1-371
declan-scale Jun 18, 2026
e20693e
docs: foundation implementation plan for unified harness surface (PRs…
declan-scale Jun 18, 2026
1103dde
feat(harness): foundation types for unified harness surface
declan-scale Jun 18, 2026
c2a27a1
test(harness): cover CloseSpan defaults and HarnessTurn runtime check
declan-scale Jun 18, 2026
21c3f22
feat(harness): pure SpanDeriver reducing the canonical stream to span…
declan-scale Jun 18, 2026
1446459
refactor(harness): deterministic flush order + defensive index/orphan…
declan-scale Jun 18, 2026
eee3071
feat(harness): SpanTracer adapter from span signals to adk.tracing
declan-scale Jun 18, 2026
0ac2dc0
refactor(harness): guarded make_logger import + lifecycle contract te…
declan-scale Jun 18, 2026
03cd746
feat(harness): yield_events delivery adapter (passthrough + tracing)
declan-scale Jun 18, 2026
e2744cf
refactor(harness): simplify yield_events guard + cover finally-flush …
declan-scale Jun 18, 2026
6ee287d
feat(harness): auto_send delivery adapter (canonical stream -> adk.st…
declan-scale Jun 18, 2026
89d11f7
refactor(harness): exception-safe full-message post + drop dead state…
declan-scale Jun 18, 2026
00c0140
feat(harness): UnifiedEmitter facade tying delivery + tracing + usage
declan-scale Jun 18, 2026
8a68e2c
refactor(harness): inject streaming into UnifiedEmitter + cover auto_…
declan-scale Jun 18, 2026
fd784e1
test(harness): conformance scaffold + CI integration job skeleton
declan-scale Jun 18, 2026
0cd17a5
test(harness): match scripts/test invocation + document conformance r…
declan-scale Jun 18, 2026
6a1b18f
refactor(harness): isinstance narrowing for clean type-check across t…
declan-scale Jun 18, 2026
4cd90a2
refactor(harness): narrow auto_send tracer guards, drop type:ignore f…
declan-scale Jun 18, 2026
51ac1e5
style: ruff import-sort + format fixes across the harness package
declan-scale Jun 18, 2026
97caafe
fix(harness): mark overridden start_span with @override for pyright (…
declan-scale Jun 18, 2026
c981f2a
fix(harness): relative import in conformance test for pyright (report…
declan-scale Jun 18, 2026
70a95ba
fix(harness): index-keyed routing, tool stream delivery, final_text l…
declan-scale Jun 18, 2026
f15f5a5
test(harness): add tests for AGX1-377 tool stream delivery, index rou…
declan-scale Jun 18, 2026
f24c58a
feat(harness): thread created_at through UnifiedEmitter.auto_send_tur…
declan-scale Jun 18, 2026
676de68
fix(harness): read turn.usage() AFTER stream exhaustion in auto_send_…
declan-scale Jun 22, 2026
56d85f3
fix(harness): guard each context close in auto_send teardown [greptile]
declan-scale Jun 22, 2026
cf01e70
chore(harness): drop superpowers plan/spec docs from the PR
declan-scale Jun 22, 2026
41b1a32
feat(harness): mark derived tool spans as errored from ToolResponseCo…
declan-scale Jun 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .github/workflows/harness-integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Runs the harness conformance suite on every push to main and on pull
# requests that touch the harness package, the adk modules, or this workflow.
name: Harness Integration

on:
  push:
    branches: [main]
  pull_request:
    paths:
      - "src/agentex/lib/core/harness/**"
      - "src/agentex/lib/adk/_modules/**"
      - ".github/workflows/harness-integration.yml"

jobs:
  conformance:
    runs-on: ubuntu-latest
    steps:
      # Third-party actions are pinned to a full commit SHA; the trailing
      # comment records the release tag that SHA corresponds to.
      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

      - name: Install uv
        uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5.4.2
        with:
          version: '0.10.2'

      - name: Bootstrap
        run: ./scripts/bootstrap

      # Defer to scripts/test so the harness suite runs under the exact same
      # invocation as the main CI test job: DEFER_PYDANTIC_BUILD=false and
      # `uv run --isolated --all-packages --all-extras pytest`, across the
      # min/max supported Python versions. Running `uv run pytest` directly
      # would risk an all-extras-only dep passing locally but failing in CI.
      - name: Conformance suite
        run: ./scripts/test tests/lib/core/harness/ -v

  # Live integration matrix (harness x {sync, async, temporal}) is added per-harness
  # in the migration plans. Placeholder job keeps the workflow valid until then.
  live-matrix:
    runs-on: ubuntu-latest
    if: false # enabled once the first harness's test agents land
    steps:
      - run: echo "populated by migration PRs" # TODO(harness-migration): enable per-harness; see migration PRs 4-8
30 changes: 30 additions & 0 deletions src/agentex/lib/core/harness/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Shared, harness-independent machinery for the unified harness surface.
The Agentex StreamTaskMessage* stream is the single source of truth; this
package derives spans from it and delivers it (yield or auto-send), so every
harness tap gets streaming + tracing + turn usage uniformly.
"""

from agentex.lib.core.harness.types import (
OpenSpan,
CloseSpan,
TurnUsage,
SpanSignal,
TurnResult,
HarnessTurn,
StreamTaskMessage,
)
from agentex.lib.core.harness.tracer import SpanTracer
from agentex.lib.core.harness.emitter import UnifiedEmitter

# Public API of the harness package: the emitter facade, the tracer adapter,
# and the shared types imported above. Every name listed here is re-exported
# from one of the three submodule imports at the top of this module.
__all__ = [
    "UnifiedEmitter",
    "SpanTracer",
    "OpenSpan",
    "CloseSpan",
    "SpanSignal",
    "StreamTaskMessage",
    "TurnUsage",
    "TurnResult",
    "HarnessTurn",
]
156 changes: 156 additions & 0 deletions src/agentex/lib/core/harness/auto_send.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Auto-send delivery: canonical stream -> adk.streaming side effects + tracing."""

from __future__ import annotations

from typing import Any, AsyncIterator
from datetime import datetime

from agentex.types.text_delta import TextDelta
from agentex.types.text_content import TextContent
from agentex.lib.core.harness.types import TurnUsage, TurnResult, StreamTaskMessage
from agentex.lib.core.harness.tracer import SpanTracer
from agentex.types.task_message_update import (
StreamTaskMessageDone,
StreamTaskMessageFull,
StreamTaskMessageDelta,
StreamTaskMessageStart,
)
from agentex.lib.core.harness.span_derivation import SpanDeriver

# Module-level logger. Prefer the project's logger factory; if importing it or
# calling it raises for any reason, fall back to a plain stdlib logger so that
# importing this module never fails because of logging setup.
try:
    from agentex.lib.utils.logging import make_logger

    logger = make_logger(__name__)
except Exception:  # ddtrace may be absent in some envs; fall back to stdlib
    import logging

    logger = logging.getLogger(__name__)


async def auto_send(
    events: AsyncIterator[StreamTaskMessage],
    task_id: str,
    tracer: SpanTracer | None = None,
    streaming: Any = None,
    usage: TurnUsage | None = None,
    created_at: datetime | None = None,
) -> TurnResult:
    """Push the canonical stream to the task stream via adk.streaming.

    Opens a streaming context per message (keyed by index), streams deltas via
    ctx.stream_update, and closes via ctx.close() on Done. Posts tool
    request/response full messages by opening a context with the content and
    closing it immediately (no deltas). Derives and traces spans from the same
    stream. Returns the last text segment's text + usage.

    Index-keyed routing: each Start(index=i) opens a context stored in
    ctx_map[i]; Delta(index=i) routes to ctx_map.get(i); Done(index=i) closes
    and removes ctx_map[i]. Start/Delta/Done events whose index is None are
    not delivered (they are still shown to the span deriver when tracing is
    enabled). A Start for an index that still has an open context closes the
    displaced context first, so it is finalized rather than orphaned. The
    finally block closes all remaining open contexts.

    final_text last-segment semantics: a new Start(TextContent) resets
    final_text_parts so that multi-step turns return the LAST text segment.
    Full(TextContent) also overwrites final_text_parts (same semantics). Only
    text deltas that were delivered to an open context are accumulated.

    AGX1-378: created_at is forwarded to every streaming_task_message_context
    call so callers can back-date message timestamps.

    Mirrors the open/close/stream_update pattern from
    src/agentex/lib/adk/_modules/_langgraph_async.py:
    - context opened via streaming_task_message_context(...).__aenter__()
    - context closed via ctx.close() (not __aexit__)
    - deltas pushed as StreamTaskMessageDelta with parent_task_message set
      from ctx.task_message

    For async + temporal agents (call from inside an activity).

    Args:
        events: The canonical stream to deliver; consumed exactly once.
        task_id: Task whose stream receives the messages.
        tracer: When given, span signals derived from ``events`` are handed to
            ``tracer.handle``; when None no spans are derived.
        streaming: Streaming module to use; defaults to ``adk.streaming``
            (imported lazily so the override can be injected without it).
        usage: Usage to report in the result; a default ``TurnUsage()`` is
            used when this is None (or otherwise falsy).
        created_at: Timestamp forwarded to every streaming context.

    Returns:
        A TurnResult carrying the joined text of the last text segment and
        the usage described above.
    """
    if streaming is None:
        from agentex.lib import adk

        streaming = adk.streaming

    deriver = SpanDeriver() if tracer is not None else None
    final_text_parts: list[str] = []
    ctx_map: dict[int, Any] = {}

    async def _close_all() -> None:
        # Guard each close independently: a failure on one context (e.g. a
        # backend hiccup during teardown) must not abandon the remaining open
        # contexts, otherwise their task messages would never be finalized.
        for ctx in list(ctx_map.values()):
            try:
                await ctx.close()
            except Exception as exc:
                logger.warning("[harness.auto_send] context close failed during teardown: %s", exc)
        ctx_map.clear()

    try:
        async for event in events:
            # Tracing sees every event, including ones delivery skips below.
            if deriver is not None and tracer is not None:
                for signal in deriver.observe(event):
                    await tracer.handle(signal)

            if isinstance(event, StreamTaskMessageStart):
                if event.index is None:
                    continue
                i = event.index
                # A repeated Start for an index that never received Done would
                # overwrite ctx_map[i] and leave the earlier context unreachable
                # (neither Done nor _close_all could close it). Finalize the
                # displaced context before opening its replacement.
                displaced = ctx_map.pop(i, None)
                if displaced is not None:
                    logger.warning(
                        "[harness.auto_send] Start for index %s while a context was still open; closing the earlier one",
                        i,
                    )
                    await displaced.close()
                # Reset final_text_parts when a new text segment starts
                if isinstance(event.content, TextContent):
                    final_text_parts = []
                ctx = streaming.streaming_task_message_context(
                    task_id=task_id,
                    initial_content=event.content,
                    created_at=created_at,
                )
                ctx_map[i] = await ctx.__aenter__()

            elif isinstance(event, StreamTaskMessageDelta):
                if event.index is None:
                    continue
                ctx = ctx_map.get(event.index)
                if ctx is not None and event.delta is not None:
                    # Reconstruct the delta with parent_task_message set from
                    # the context's task_message (mirrors _langgraph_async.py
                    # lines 72-78 and 117-127).
                    delta_with_parent = StreamTaskMessageDelta(
                        parent_task_message=ctx.task_message,
                        delta=event.delta,
                        type="delta",
                        index=event.index,
                    )
                    await ctx.stream_update(delta_with_parent)
                    if isinstance(event.delta, TextDelta) and event.delta.text_delta:
                        final_text_parts.append(event.delta.text_delta)

            elif isinstance(event, StreamTaskMessageDone):
                if event.index is None:
                    continue
                # pop() so the finally block does not close this context twice.
                ctx = ctx_map.pop(event.index, None)
                if ctx is not None:
                    await ctx.close()

            elif isinstance(event, StreamTaskMessageFull):
                # Full messages: post the full message by opening a context
                # with the content and closing it immediately (no deltas;
                # StreamingTaskMessageContext.close() persists initial_content
                # when the accumulator is empty). Use async with so the context
                # is closed even if close() raises (__aexit__ delegates to
                # close()).
                # Full(TextContent) also resets final_text_parts for
                # last-segment semantics.
                if isinstance(event.content, TextContent):
                    final_text_parts = [event.content.content]
                async with streaming.streaming_task_message_context(
                    task_id=task_id,
                    initial_content=event.content,
                    created_at=created_at,
                ):
                    pass

    finally:
        # Runs on normal exhaustion and on error: finalize whatever is still
        # open, then let the deriver emit signals for spans it still holds.
        await _close_all()
        if deriver is not None and tracer is not None:
            for signal in deriver.flush():
                await tracer.handle(signal)

    return TurnResult(final_text="".join(final_text_parts), usage=usage or TurnUsage())
80 changes: 80 additions & 0 deletions src/agentex/lib/core/harness/emitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""UnifiedEmitter: the single facade agent authors use for either delivery mode."""

from __future__ import annotations

from typing import AsyncGenerator
from datetime import datetime

from agentex.lib.core.harness.types import TurnResult, HarnessTurn, StreamTaskMessage
from agentex.lib.core.harness.tracer import SpanTracer
from agentex.lib.core.harness.auto_send import auto_send
from agentex.lib.core.harness.yield_delivery import yield_events


class UnifiedEmitter:
    """Facade that pairs a task's trace context with a delivery mode.

    How the `tracer` argument is interpreted:
    - tracer=False: tracing is off, even when `trace_id` is set.
    - tracer=<SpanTracer>: that instance is used as-is.
    - anything else (including the default None): a SpanTracer is built
      automatically when `trace_id` is truthy; otherwise tracing is off.

    `tracing` and `streaming` exist so tests and advanced callers can inject
    replacements; production code should leave both as None so the real adk
    modules are used.
    """

    tracer: SpanTracer | None

    def __init__(
        self,
        task_id: str,
        trace_id: str | None,
        parent_span_id: str | None,
        tracer: SpanTracer | bool | None = None,
        tracing: object | None = None,
        streaming: object | None = None,
    ):
        self.task_id = task_id
        self.trace_id = trace_id
        self.parent_span_id = parent_span_id
        self._streaming = streaming

        # Start from "no tracing" and only upgrade when tracing was not
        # explicitly disabled. The explicit-False check comes first so a
        # disabled emitter never consults the tracer type or trace_id.
        resolved: SpanTracer | None = None
        if tracer is not False:
            if isinstance(tracer, SpanTracer):
                resolved = tracer
            elif trace_id:
                resolved = SpanTracer(
                    trace_id=trace_id,
                    parent_span_id=parent_span_id,
                    task_id=task_id,
                    tracing=tracing,
                )
        self.tracer = resolved

    async def yield_turn(self, turn: HarnessTurn) -> AsyncGenerator[StreamTaskMessage, None]:
        """Sync HTTP ACP delivery: re-yield each event, tracing on the side."""
        traced_stream = yield_events(turn.events, tracer=self.tracer)
        async for message in traced_stream:
            yield message

    async def auto_send_turn(self, turn: HarnessTurn, created_at: datetime | None = None) -> TurnResult:
        """Async/temporal delivery: push the turn to the task stream.

        `created_at` (for example `workflow.now()` under Temporal) gives the
        turn's messages a deterministic timestamp and is passed through to the
        streaming contexts. Leaving it as None keeps server-side timestamps.

        Returns the TurnResult produced by delivery, with its usage replaced by
        the value read from the turn after the stream has been consumed.
        """
        # Ordering matters: under the HarnessTurn single-pass contract, usage
        # is populated while `turn.events` is consumed, so `turn.usage()` is
        # only meaningful once delivery has finished. Passing
        # `usage=turn.usage()` into auto_send would capture the empty default
        # before the stream runs — so deliver first and read usage afterwards.
        outcome = await auto_send(
            turn.events,
            task_id=self.task_id,
            tracer=self.tracer,
            streaming=self._streaming,
            created_at=created_at,
        )
        outcome.usage = turn.usage()
        return outcome
Loading
Loading