feat(harness): unified harness surface — foundation (span derivation, delivery adapters, emitter) #412
Merged
30 commits
e5551b5  declan-scale  docs: design for unified harness tracing/message-emitting surface
5f60ba7  declan-scale  docs: refine unified harness spec — span derivation rules, TurnUsage,…
9e7546e  declan-scale  docs: link deferred tool-error decision to AGX1-371
e20693e  declan-scale  docs: foundation implementation plan for unified harness surface (PRs…
1103dde  declan-scale  feat(harness): foundation types for unified harness surface
c2a27a1  declan-scale  test(harness): cover CloseSpan defaults and HarnessTurn runtime check
21c3f22  declan-scale  feat(harness): pure SpanDeriver reducing the canonical stream to span…
1446459  declan-scale  refactor(harness): deterministic flush order + defensive index/orphan…
eee3071  declan-scale  feat(harness): SpanTracer adapter from span signals to adk.tracing
0ac2dc0  declan-scale  refactor(harness): guarded make_logger import + lifecycle contract te…
03cd746  declan-scale  feat(harness): yield_events delivery adapter (passthrough + tracing)
e2744cf  declan-scale  refactor(harness): simplify yield_events guard + cover finally-flush …
6ee287d  declan-scale  feat(harness): auto_send delivery adapter (canonical stream -> adk.st…
89d11f7  declan-scale  refactor(harness): exception-safe full-message post + drop dead state…
00c0140  declan-scale  feat(harness): UnifiedEmitter facade tying delivery + tracing + usage
8a68e2c  declan-scale  refactor(harness): inject streaming into UnifiedEmitter + cover auto_…
fd784e1  declan-scale  test(harness): conformance scaffold + CI integration job skeleton
0cd17a5  declan-scale  test(harness): match scripts/test invocation + document conformance r…
6a1b18f  declan-scale  refactor(harness): isinstance narrowing for clean type-check across t…
4cd90a2  declan-scale  refactor(harness): narrow auto_send tracer guards, drop type:ignore f…
51ac1e5  declan-scale  style: ruff import-sort + format fixes across the harness package
97caafe  declan-scale  fix(harness): mark overridden start_span with @override for pyright (…
c981f2a  declan-scale  fix(harness): relative import in conformance test for pyright (report…
70a95ba  declan-scale  fix(harness): index-keyed routing, tool stream delivery, final_text l…
f15f5a5  declan-scale  test(harness): add tests for AGX1-377 tool stream delivery, index rou…
f24c58a  declan-scale  feat(harness): thread created_at through UnifiedEmitter.auto_send_tur…
676de68  declan-scale  fix(harness): read turn.usage() AFTER stream exhaustion in auto_send_…
56d85f3  declan-scale  fix(harness): guard each context close in auto_send teardown [greptile]
cf01e70  declan-scale  chore(harness): drop superpowers plan/spec docs from the PR
41b1a32  declan-scale  feat(harness): mark derived tool spans as errored from ToolResponseCo…
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| name: Harness Integration | ||
|
|
||
| on: | ||
| push: | ||
| branches: [main] | ||
| pull_request: | ||
| paths: | ||
| - "src/agentex/lib/core/harness/**" | ||
| - "src/agentex/lib/adk/_modules/**" | ||
| - ".github/workflows/harness-integration.yml" | ||
|
|
||
| jobs: | ||
| conformance: | ||
| runs-on: ubuntu-latest | ||
| steps: | ||
| - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 | ||
|
|
||
| - name: Install uv | ||
| uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5.4.2 | ||
| with: | ||
| version: '0.10.2' | ||
|
|
||
| - name: Bootstrap | ||
| run: ./scripts/bootstrap | ||
|
|
||
| # Defer to scripts/test so the harness suite runs under the exact same | ||
| # invocation as the main CI test job: DEFER_PYDANTIC_BUILD=false and | ||
| # `uv run --isolated --all-packages --all-extras pytest`, across the | ||
| # min/max supported Python versions. Running `uv run pytest` directly | ||
| # would risk an all-extras-only dep passing locally but failing in CI. | ||
| - name: Conformance suite | ||
| run: ./scripts/test tests/lib/core/harness/ -v | ||
|
|
||
| # Live integration matrix (harness x {sync, async, temporal}) is added per-harness | ||
| # in the migration plans. Placeholder job keeps the workflow valid until then. | ||
| live-matrix: | ||
| runs-on: ubuntu-latest | ||
| if: false # enabled once the first harness's test agents land | ||
| steps: | ||
| - run: echo "populated by migration PRs" # TODO(harness-migration): enable per-harness; see migration PRs 4-8 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| """Shared, harness-independent machinery for the unified harness surface. | ||
| The Agentex StreamTaskMessage* stream is the single source of truth; this | ||
| package derives spans from it and delivers it (yield or auto-send), so every | ||
| harness tap gets streaming + tracing + turn usage uniformly. | ||
| """ | ||
|
|
||
| from agentex.lib.core.harness.types import ( | ||
| OpenSpan, | ||
| CloseSpan, | ||
| TurnUsage, | ||
| SpanSignal, | ||
| TurnResult, | ||
| HarnessTurn, | ||
| StreamTaskMessage, | ||
| ) | ||
| from agentex.lib.core.harness.tracer import SpanTracer | ||
| from agentex.lib.core.harness.emitter import UnifiedEmitter | ||
|
|
||
| __all__ = [ | ||
| "UnifiedEmitter", | ||
| "SpanTracer", | ||
| "OpenSpan", | ||
| "CloseSpan", | ||
| "SpanSignal", | ||
| "StreamTaskMessage", | ||
| "TurnUsage", | ||
| "TurnResult", | ||
| "HarnessTurn", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| """Auto-send delivery: canonical stream -> adk.streaming side effects + tracing.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import Any, AsyncIterator | ||
| from datetime import datetime | ||
|
|
||
| from agentex.types.text_delta import TextDelta | ||
| from agentex.types.text_content import TextContent | ||
| from agentex.lib.core.harness.types import TurnUsage, TurnResult, StreamTaskMessage | ||
| from agentex.lib.core.harness.tracer import SpanTracer | ||
| from agentex.types.task_message_update import ( | ||
| StreamTaskMessageDone, | ||
| StreamTaskMessageFull, | ||
| StreamTaskMessageDelta, | ||
| StreamTaskMessageStart, | ||
| ) | ||
| from agentex.lib.core.harness.span_derivation import SpanDeriver | ||
|
|
||
| try: | ||
| from agentex.lib.utils.logging import make_logger | ||
|
|
||
| logger = make_logger(__name__) | ||
| except Exception: # ddtrace may be absent in some envs; fall back to stdlib | ||
| import logging | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| async def auto_send( | ||
| events: AsyncIterator[StreamTaskMessage], | ||
| task_id: str, | ||
| tracer: SpanTracer | None = None, | ||
| streaming: Any = None, | ||
| usage: TurnUsage | None = None, | ||
| created_at: datetime | None = None, | ||
| ) -> TurnResult: | ||
| """Push the canonical stream to the task stream via adk.streaming. | ||
|
|
||
| Opens a streaming context per message (keyed by index), streams deltas via | ||
| ctx.stream_update, and closes via ctx.close() on Done. Posts tool | ||
| request/response full messages by opening a context with the content and | ||
| closing it immediately (no deltas). Derives and traces spans from the same | ||
| stream. Returns the last text segment's text + usage. | ||
|
|
||
| Index-keyed routing: each Start(index=i) opens a context stored in | ||
| ctx_map[i]; Delta(index=i) routes to ctx_map.get(i); Done(index=i) closes | ||
| and removes ctx_map[i]. Events whose index is None are skipped. The finally | ||
| block closes all remaining open contexts. | ||
|
|
||
| final_text last-segment semantics: a new Start(TextContent) resets | ||
| final_text_parts so that multi-step turns return the LAST text segment. | ||
| Full(TextContent) also overwrites final_text_parts (same semantics). | ||
|
|
||
| AGX1-378: created_at is forwarded to every streaming_task_message_context | ||
| call so callers can back-date message timestamps. | ||
|
|
||
| Mirrors the open/close/stream_update pattern from | ||
| src/agentex/lib/adk/_modules/_langgraph_async.py: | ||
| - context opened via streaming_task_message_context(...).__aenter__() | ||
| - context closed via ctx.close() (not __aexit__) | ||
| - deltas pushed as StreamTaskMessageDelta with parent_task_message set | ||
| from ctx.task_message | ||
|
|
||
| For async + temporal agents (call from inside an activity). | ||
| """ | ||
| if streaming is None: | ||
| from agentex.lib import adk | ||
|
|
||
| streaming = adk.streaming | ||
|
|
||
| deriver = SpanDeriver() if tracer is not None else None | ||
| final_text_parts: list[str] = [] | ||
| ctx_map: dict[int, Any] = {} | ||
|
|
||
| async def _close_all() -> None: | ||
| # Guard each close independently: a failure on one context (e.g. a | ||
| # backend hiccup during teardown) must not abandon the remaining open | ||
| # contexts, otherwise their task messages would never be finalized. | ||
| for ctx in list(ctx_map.values()): | ||
| try: | ||
| await ctx.close() | ||
| except Exception as exc: | ||
| logger.warning("[harness.auto_send] context close failed during teardown: %s", exc) | ||
| ctx_map.clear() | ||
|
|
||
| try: | ||
| async for event in events: | ||
| if deriver is not None and tracer is not None: | ||
| for signal in deriver.observe(event): | ||
| await tracer.handle(signal) | ||
|
|
||
| if isinstance(event, StreamTaskMessageStart): | ||
| if event.index is None: | ||
| continue | ||
| i = event.index | ||
| # Reset final_text_parts when a new text segment starts | ||
| if isinstance(event.content, TextContent): | ||
| final_text_parts = [] | ||
| ctx = streaming.streaming_task_message_context( | ||
| task_id=task_id, | ||
| initial_content=event.content, | ||
| created_at=created_at, | ||
| ) | ||
| ctx_map[i] = await ctx.__aenter__() | ||
|
|
||
| elif isinstance(event, StreamTaskMessageDelta): | ||
| if event.index is None: | ||
| continue | ||
| ctx = ctx_map.get(event.index) | ||
| if ctx is not None and event.delta is not None: | ||
| # Reconstruct the delta with parent_task_message set from | ||
| # the context's task_message (mirrors _langgraph_async.py | ||
| # lines 72-78 and 117-127). | ||
| delta_with_parent = StreamTaskMessageDelta( | ||
| parent_task_message=ctx.task_message, | ||
| delta=event.delta, | ||
| type="delta", | ||
| index=event.index, | ||
| ) | ||
| await ctx.stream_update(delta_with_parent) | ||
| if isinstance(event.delta, TextDelta) and event.delta.text_delta: | ||
| final_text_parts.append(event.delta.text_delta) | ||
|
|
||
| elif isinstance(event, StreamTaskMessageDone): | ||
| if event.index is None: | ||
| continue | ||
| ctx = ctx_map.pop(event.index, None) | ||
| if ctx is not None: | ||
| await ctx.close() | ||
|
|
||
| elif isinstance(event, StreamTaskMessageFull): | ||
|
|
||
| # Full messages: post the full message by opening a context | ||
| # with the content and closing it immediately (no deltas; | ||
| # StreamingTaskMessageContext.close() persists initial_content | ||
| # when the accumulator is empty). Use async with so the context | ||
| # is closed even if close() raises (__aexit__ delegates to | ||
| # close()). | ||
| # Full(TextContent) also resets final_text_parts for | ||
| # last-segment semantics. | ||
| if isinstance(event.content, TextContent): | ||
| final_text_parts = [event.content.content] | ||
| async with streaming.streaming_task_message_context( | ||
| task_id=task_id, | ||
| initial_content=event.content, | ||
| created_at=created_at, | ||
| ): | ||
| pass | ||
|
|
||
|
|
||
| finally: | ||
| await _close_all() | ||
| if deriver is not None and tracer is not None: | ||
| for signal in deriver.flush(): | ||
| await tracer.handle(signal) | ||
|
|
||
| return TurnResult(final_text="".join(final_text_parts), usage=usage or TurnUsage()) | ||
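
Review note: the index-keyed routing, last-segment `final_text` semantics, and guarded teardown described in the `auto_send` docstring can be exercised in isolation. The sketch below is a simplified simulation, not the code in this PR: `Start`, `Delta`, `Done`, `FakeContext`, and `route` are hypothetical stand-ins for the `StreamTaskMessage*` types and the adk streaming context, and text is passed as plain strings rather than `TextDelta` objects.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class Start:  # hypothetical stand-in for StreamTaskMessageStart
    index: int | None
    is_text: bool = True


@dataclass
class Delta:  # hypothetical stand-in for StreamTaskMessageDelta
    index: int | None
    text: str


@dataclass
class Done:  # hypothetical stand-in for StreamTaskMessageDone
    index: int | None


@dataclass
class FakeContext:  # hypothetical stand-in for a streaming context
    updates: list[str] = field(default_factory=list)
    closed: bool = False

    async def stream_update(self, text: str) -> None:
        self.updates.append(text)

    async def close(self) -> None:
        self.closed = True


async def route(events: list, opened: list[FakeContext]) -> str:
    ctx_map: dict[int, FakeContext] = {}
    final_text_parts: list[str] = []
    try:
        for event in events:
            if event.index is None:
                continue  # events without an index are skipped
            if isinstance(event, Start):
                if event.is_text:
                    final_text_parts = []  # a new text segment resets the result
                ctx = FakeContext()
                opened.append(ctx)
                ctx_map[event.index] = ctx
            elif isinstance(event, Delta):
                ctx = ctx_map.get(event.index)
                if ctx is not None:  # deltas for unknown indices are dropped
                    await ctx.stream_update(event.text)
                    final_text_parts.append(event.text)
            elif isinstance(event, Done):
                ctx = ctx_map.pop(event.index, None)
                if ctx is not None:
                    await ctx.close()
    finally:
        # Guard each close so one failure cannot strand the other contexts.
        for ctx in list(ctx_map.values()):
            try:
                await ctx.close()
            except Exception:
                pass
        ctx_map.clear()
    return "".join(final_text_parts)


events = [
    Start(0), Delta(0, "thinking "), Done(0),
    Start(1), Delta(1, "final "), Delta(1, "answer"),
    Delta(7, "orphan"),        # no context was opened for index 7
    Start(2, is_text=False),   # never receives Done; closed in teardown
]
opened: list[FakeContext] = []
final_text = asyncio.run(route(events, opened))
```

In this run the first text segment is discarded when `Start(1)` arrives, the orphan delta never reaches a context, and the two contexts left open (indices 1 and 2) are closed by the `finally` block.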
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| """UnifiedEmitter: the single facade agent authors use for either delivery mode.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import AsyncGenerator | ||
| from datetime import datetime | ||
|
|
||
| from agentex.lib.core.harness.types import TurnResult, HarnessTurn, StreamTaskMessage | ||
| from agentex.lib.core.harness.tracer import SpanTracer | ||
| from agentex.lib.core.harness.auto_send import auto_send | ||
| from agentex.lib.core.harness.yield_delivery import yield_events | ||
|
|
||
|
|
||
| class UnifiedEmitter: | ||
| """Ties trace context + chosen delivery together. | ||
|
|
||
| Tracing modes (the `tracer` arg): | ||
| - tracer=None (default): auto-construct a SpanTracer if `trace_id` is present. | ||
| - tracer=False: disable tracing entirely, regardless of `trace_id`. | ||
| - tracer=<SpanTracer>: use the supplied instance. | ||
|
|
||
| `tracing` and `streaming` are injection escape-hatches for tests/advanced | ||
| use; leave them None in production so the real adk modules are used. | ||
| """ | ||
|
|
||
| tracer: SpanTracer | None | ||
|
|
||
| def __init__( | ||
| self, | ||
| task_id: str, | ||
| trace_id: str | None, | ||
| parent_span_id: str | None, | ||
| tracer: SpanTracer | bool | None = None, | ||
| tracing: object | None = None, | ||
| streaming: object | None = None, | ||
| ): | ||
| self.task_id = task_id | ||
| self.trace_id = trace_id | ||
| self.parent_span_id = parent_span_id | ||
| self._streaming = streaming | ||
| if tracer is False: | ||
| self.tracer = None | ||
| elif isinstance(tracer, SpanTracer): | ||
| self.tracer = tracer | ||
| elif trace_id: | ||
| self.tracer = SpanTracer( | ||
| trace_id=trace_id, | ||
| parent_span_id=parent_span_id, | ||
| task_id=task_id, | ||
| tracing=tracing, | ||
| ) | ||
| else: | ||
| self.tracer = None | ||
|
|
||
| async def yield_turn(self, turn: HarnessTurn) -> AsyncGenerator[StreamTaskMessage, None]: | ||
| """Sync HTTP ACP delivery: forward events, trace as side effect.""" | ||
| async for event in yield_events(turn.events, tracer=self.tracer): | ||
| yield event | ||
|
|
||
| async def auto_send_turn(self, turn: HarnessTurn, created_at: datetime | None = None) -> TurnResult: | ||
| """Async/temporal delivery: push to the task stream, return TurnResult. | ||
|
|
||
| Pass `created_at` (e.g. `workflow.now()` under Temporal) to stamp the | ||
| turn's messages with a deterministic timestamp; it is forwarded to the | ||
| streaming contexts. Default None preserves server-side timestamps. | ||
| """ | ||
| # `turn.usage()` is only valid AFTER `turn.events` is exhausted (the | ||
| # HarnessTurn single-pass contract: real turns populate usage while the | ||
| # stream is consumed). So drive delivery first, then read usage — do NOT | ||
| # pass `usage=turn.usage()` eagerly here (that would capture the empty | ||
| # default before the stream runs). | ||
| result = await auto_send( | ||
| turn.events, | ||
| task_id=self.task_id, | ||
| tracer=self.tracer, | ||
| streaming=self._streaming, | ||
| created_at=created_at, | ||
| ) | ||
| result.usage = turn.usage() | ||
| return result |
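
Review note: the comment in `auto_send_turn` about reading `turn.usage()` only after the stream is exhausted can be illustrated with a small simulation. This is a sketch under stated assumptions, not the real `HarnessTurn`: `Usage` and `FakeTurn` are hypothetical, and the sketch assumes usage is replaced with a fresh snapshot as events are consumed, so an early read keeps the empty default.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass(frozen=True)
class Usage:  # hypothetical stand-in for TurnUsage
    total_tokens: int = 0


class FakeTurn:
    """Hypothetical single-pass turn: usage is populated while streaming."""

    def __init__(self) -> None:
        self._usage = Usage()

    @property
    def events(self) -> AsyncIterator[str]:
        return self._stream()

    async def _stream(self) -> AsyncIterator[str]:
        for chunk in ("hel", "lo"):
            # Assumption: each chunk replaces the usage snapshot.
            self._usage = Usage(self._usage.total_tokens + 1)
            yield chunk

    def usage(self) -> Usage:
        return self._usage


async def deliver(turn: FakeTurn) -> tuple[str, Usage, Usage]:
    eager = turn.usage()  # read before the stream runs: still the default
    text = "".join([chunk async for chunk in turn.events])
    late = turn.usage()   # read after exhaustion: reflects the whole turn
    return text, eager, late


text, eager, late = asyncio.run(deliver(FakeTurn()))
```

Here `eager` still holds the empty default while `late` reflects both chunks, which is the reason for assigning `result.usage` after `auto_send` returns instead of passing `usage=turn.usage()` up front.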