feat(langgraph): migrate LangGraph harness onto unified surface #417
    def __init__(self, stream: Any, model: str | None = None) -> None:
        self._stream = stream
        self._model = model
        self._usage: TurnUsage = TurnUsage(model=model)

    @property
    def events(self) -> AsyncIterator[StreamTaskMessage]:
        return self._generate_events()

    async def _generate_events(self) -> AsyncGenerator[StreamTaskMessage, None]:
        def _capture(ai_msg: Any) -> None:
            usage_metadata = getattr(ai_msg, "usage_metadata", None)
            if usage_metadata is not None:
                self._usage = langgraph_usage_to_turn_usage(usage_metadata, self._model)

        async for ev in convert_langgraph_to_agentex_events(self._stream, on_final_ai_message=_capture):
            yield ev

    def usage(self) -> TurnUsage:
        """Return the usage captured from the last AIMessage in the stream.

        Valid only after ``events`` has been fully consumed.
        Returns a zero-usage ``TurnUsage`` if the model did not report usage.
        """
        return self._usage

**`TurnResult.usage` is always empty when using `auto_send_turn`**
`LangGraphTurn` populates `self._usage` lazily via the `on_final_ai_message` callback, which fires _during_ event iteration. However, `UnifiedEmitter.auto_send_turn` passes `usage=turn.usage()` as an argument to `auto_send` _before_ iteration begins (Python evaluates all arguments before the call). By the time the stream is consumed and `_capture` updates `self._usage`, the pre-iteration snapshot has already been handed to `TurnResult`.
Concretely: every caller that reads `result.usage` after `await emitter.auto_send_turn(turn)` gets `TurnUsage(model=model)`, with zero token counts regardless of what the model reported. The PR description documents the workaround ("callers should read `turn.usage()` after `auto_send_turn` returns"), but `TurnResult.usage` existing with silent stale data is a trap for every future user of this API.
The fix belongs in `emitter.py`: call `turn.usage()` _after_ `await auto_send(turn.events, ...)` returns, then construct the `TurnResult` from the now-populated usage.
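The ordering problem described in this comment can be reproduced without the real harness. The sketch below is only an illustration: `Usage`, `FakeTurn`, `drain`, `send_eager`, and `send_late` are hypothetical stand-ins, not the actual `TurnUsage`, `LangGraphTurn`, or `UnifiedEmitter` code. It assumes only what the comment states, namely that usage is assigned while the event stream is being consumed.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class Usage:
    """Hypothetical stand-in for TurnUsage."""
    total_tokens: Optional[int] = None


class FakeTurn:
    """Hypothetical stand-in for LangGraphTurn: usage is set only during iteration."""

    def __init__(self) -> None:
        self._usage = Usage()

    @property
    def events(self) -> AsyncIterator[str]:
        return self._generate()

    async def _generate(self) -> AsyncIterator[str]:
        yield "hello"
        # Mimics the on_final_ai_message callback firing mid-stream.
        self._usage = Usage(total_tokens=42)

    def usage(self) -> Usage:
        return self._usage


async def drain(events: AsyncIterator[str]) -> str:
    # Consume the whole stream, as a delivery channel would.
    return "".join([ev async for ev in events])


async def send_eager(turn: FakeTurn) -> Usage:
    # Problematic ordering: usage is read before the stream is consumed.
    usage = turn.usage()
    await drain(turn.events)
    return usage


async def send_late(turn: FakeTurn) -> Usage:
    # Suggested ordering: drain first, then read usage.
    await drain(turn.events)
    return turn.usage()


eager = asyncio.run(send_eager(FakeTurn()))
late = asyncio.run(send_late(FakeTurn()))
print(eager.total_tokens, late.total_tokens)
```

The eager variant returns the object that existed before iteration, so its token count stays unset, while the late variant sees the value assigned during the stream.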
Path: src/agentex/lib/adk/_modules/_langgraph_turn.py
Line: 95-119
dc5c81d to 68572d5
@greptile review
b4b8b33 to da780a1
68572d5 to 40b4cbe
37421b6 to df3461c
4fd2ff4 to e03a584
ccbd5cf to e3fa1cc
734b298 to 6ac00e3
c8c63d1 to 05120f3
a367469 to af6a4b2
05120f3 to c9a907c
af6a4b2 to 002d5f9
c9a907c to a04bf5e
002d5f9 to bdd528b
Adds an additive on_final_ai_message=None parameter to convert_langgraph_to_agentex_events so callers can capture AIMessage usage_metadata without re-traversing the stream. No behavior change when omitted. Also adds a DeprecationWarning to create_langgraph_tracing_handler and its module docstring, pointing to the unified harness surface, and updates the sync module docstring with the preferred unified path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
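As a rough illustration of what an additive, optional callback parameter looks like on an async generator, here is a minimal sketch. `convert_events` and `fake_stream` are hypothetical and use plain dictionaries; they are not the real `convert_langgraph_to_agentex_events` or LangGraph stream types. The point shown is that the yielded events are the same whether or not the callback is supplied.

```python
import asyncio
from typing import Any, AsyncGenerator, AsyncIterator, Callable, Dict, List, Optional


async def convert_events(
    stream: AsyncIterator[Dict[str, Any]],
    on_final_ai_message: Optional[Callable[[Any], None]] = None,
) -> AsyncGenerator[str, None]:
    """Hypothetical converter: yields text and reports finished messages via the callback."""
    async for item in stream:
        if item["kind"] == "chunk":
            yield item["text"]
        elif item["kind"] == "final" and on_final_ai_message is not None:
            # The callback is a side channel; it never changes what is yielded.
            on_final_ai_message(item["message"])


async def fake_stream() -> AsyncGenerator[Dict[str, Any], None]:
    yield {"kind": "chunk", "text": "Hi"}
    yield {"kind": "final", "message": {"usage_metadata": {"total_tokens": 7}}}


async def main():
    captured: List[Any] = []
    with_cb = [e async for e in convert_events(fake_stream(), captured.append)]
    without_cb = [e async for e in convert_events(fake_stream())]
    return with_cb, without_cb, captured


with_cb, without_cb, captured = asyncio.run(main())
```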
Implements LangGraphTurn (HarnessTurn protocol) that wraps a LangGraph astream() event stream and captures usage from AIMessage.usage_metadata via the on_final_ai_message callback. Implements langgraph_usage_to_turn_usage that maps all UsageMetadata fields (input/output/total/cache_read/reasoning) onto the framework-agnostic TurnUsage model. Zero token counts are preserved. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
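A mapping of this kind might look roughly like the sketch below. The `TurnUsageSketch` field names are assumptions made for illustration and may not match the real `TurnUsage` model; the input keys (`input_tokens`, `output_tokens`, `total_tokens`, `input_token_details.cache_read`, `output_token_details.reasoning`) follow the shape of LangChain's `UsageMetadata` dictionary. Missing fields stay `None`, and a reported zero stays zero.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class TurnUsageSketch:
    """Hypothetical stand-in for TurnUsage; field names are assumptions."""
    model: Optional[str] = None
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    total_tokens: Optional[int] = None
    cache_read_tokens: Optional[int] = None
    reasoning_tokens: Optional[int] = None


def usage_to_turn_usage(meta: Mapping[str, Any], model: Optional[str]) -> TurnUsageSketch:
    # Detail dictionaries are optional in the source metadata.
    in_details = meta.get("input_token_details") or {}
    out_details = meta.get("output_token_details") or {}
    return TurnUsageSketch(
        model=model,
        # .get() keeps an explicit 0 and leaves absent keys as None.
        input_tokens=meta.get("input_tokens"),
        output_tokens=meta.get("output_tokens"),
        total_tokens=meta.get("total_tokens"),
        cache_read_tokens=in_details.get("cache_read"),
        reasoning_tokens=out_details.get("reasoning"),
    )


meta = {
    "input_tokens": 12,
    "output_tokens": 0,
    "total_tokens": 12,
    "input_token_details": {"cache_read": 4},
    "output_token_details": {"reasoning": 0},
}
mapped = usage_to_turn_usage(meta, "example-model")
empty = usage_to_turn_usage({}, None)
```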
…pre-refactor) Records the current bespoke behavior as a contract test. After Task 4 rewrites the internals to use UnifiedEmitter + LangGraphTurn, these tests must still pass to confirm behavioral parity. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…urface Replaces the bespoke Redis-streaming loop with UnifiedEmitter.auto_send_turn( LangGraphTurn(...)), matching the pattern established for pydantic-ai. Public signature preserved identically. Behavioral difference: tool calls/responses are now posted via streaming_task_message_context (not adk.messages.create), and final_text accumulates all text across the turn. Updates the characterization test to document these unified-surface semantics. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Verifies yield_turn(LangGraphTurn) produces identical events to direct iteration, and documents the AGX1-377 behavior (LangGraph Full tool events don't produce SpanDeriver spans today; cross-channel equivalence comes with AGX1-373). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…-step) Registers LangGraph-specific conformance fixtures with the shared harness conformance runner. Documents the AGX1-377 behavior (tool requests are Full events, not Start+Done). Span derivation is deterministic for all 4 fixtures. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ral channels Adds 18 offline integration tests across the three delivery channels using fake LangGraph event streams and fake streaming backends. Documents the AGX1-377 behavior (Full events don't produce tool spans). Notes the usage capture timing: turn.usage() is the authoritative post-iteration value since auto_send_turn evaluates usage eagerly before events are consumed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task 9: add 3 deployable tutorial agents that demonstrate the unified harness surface side-by-side with the bespoke reference examples:
- examples/tutorials/00_sync/harness_langgraph/ (s-harness-langgraph) uses UnifiedEmitter.yield_turn(LangGraphTurn(stream))
- examples/tutorials/10_async/00_base/harness_langgraph/ (a-harness-langgraph) uses UnifiedEmitter.auto_send_turn(LangGraphTurn(stream))
- examples/tutorials/10_async/10_temporal/harness_langgraph/ (at-harness-langgraph) follows 130_langgraph pattern (LangGraphPlugin + emit_langgraph_messages)
Task 10: enable live-matrix CI job in harness-integration.yml with a 3-way matrix over [sync, async, temporal] running offline integration tests. Also add test_harness_langgraph_*.py to PR path triggers.
Task 11 (pyright fixes): annotate convert_langgraph_to_agentex_events and _generate_events with AsyncGenerator return types so pyright infers them as async generators rather than coroutines. Add start_time to Span construction in test_langgraph_sync_unified.py fake tracing backend.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…racing handler (PR 5/6)
AGX1-378: wire workflow_now_if_in_workflow() into stream_langgraph_events so Temporal callers get deterministic message timestamps, matching the pattern used by the openai/litellm providers.
Deprecation alignment: remove runtime warnings.warn from create_langgraph_tracing_handler (and unused import warnings) to match PR 4/6 pydantic-ai convention. Deprecation remains in docstrings on module, class, and function. Callers under -W error are no longer broken.
Test alignment after rebase onto unified-harness-surface (b4b8b33):
- FakeStreamingModule.streaming_task_message_context in test_langgraph_async.py and test_pydantic_ai_async.py updated to accept **kw (foundation now passes created_at).
- Three "no tool spans for Full events" tests updated to assert the new SpanDeriver behaviour: Full(ToolRequestContent) opens a span, Full(ToolResponseContent) closes it.
- Two "accumulates all text" multi-step tests corrected to last-segment semantics (auto_send resets final_text_parts on each new Start(TextContent)).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
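The "last-segment semantics" mentioned in this commit message can be sketched as follows. This is an assumption-laden toy: events are plain `(kind, payload)` tuples and `final_text` is a hypothetical helper, not the real `auto_send` implementation. It only illustrates the stated rule that the collected text parts are reset whenever a new text segment starts.

```python
from typing import Iterable, List, Tuple

# Hypothetical event shape: (kind, payload).
Event = Tuple[str, str]


def final_text(events: Iterable[Event]) -> str:
    parts: List[str] = []
    for kind, payload in events:
        if kind == "start_text":
            # A new text segment begins: discard text from earlier segments.
            parts = []
        elif kind == "text_delta":
            parts.append(payload)
        # Other kinds (for example tool events) do not affect the final text.
    return "".join(parts)


multi_step = [
    ("start_text", ""),
    ("text_delta", "Let me check."),
    ("full_tool_request", "get_weather"),
    ("full_tool_response", "sunny"),
    ("start_text", ""),
    ("text_delta", "It is "),
    ("text_delta", "sunny."),
]
result = final_text(multi_step)
print(result)
```

Under this rule a multi-step turn reports only the text of its last segment, which is why tests that expected all text to accumulate had to be corrected.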
…-373) Rewrites test_langgraph_conformance.py to use the cross-channel runner from PR #414 (run_cross_channel_conformance, LogicalDelivery) instead of the simpler derive_all-only API it was written against. The four fixtures (text-only, single-tool, reasoning, multi-step) are retained as canonical StreamTaskMessage* sequences. Each is now exercised by test_cross_channel_equivalence (yield_events vs auto_send logical deliveries and span signals) plus the backward-compat test_span_derivation_is_deterministic guard. LangGraph tool requests arrive as Full events from the "updates" stream; auto_send handles them via open+close, yielding the same LogicalDelivery on both channels. No coalesce_tool_requests option is needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
_capture overwrote self._usage on every AIMessage, so a multi-step turn (text -> tool decision -> final text) reported only the last LLM call's tokens and silently dropped the rest — undercounting in any billing/monitoring that reads turn.usage(). Accumulate additively across calls via _accumulate_turn_usage (None+None stays None; real 0 contributes 0). Add a test asserting summed input/output/total/cache/reasoning tokens across two AIMessages. The separate 06-18 "TurnResult.usage empty via auto_send_turn" comment is resolved by the foundation (emitter reads turn.usage() after stream exhaustion). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
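The additive rule described here (None plus None stays None, a real 0 contributes 0) can be sketched in a few lines. `UsageSketch`, `add_optional`, and `accumulate` are hypothetical names used for illustration; the real `_accumulate_turn_usage` and `TurnUsage` may be structured differently.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class UsageSketch:
    """Hypothetical stand-in for TurnUsage with optional counters."""
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    total_tokens: Optional[int] = None
    cache_read_tokens: Optional[int] = None


def add_optional(a: Optional[int], b: Optional[int]) -> Optional[int]:
    # "Not reported" on both sides stays "not reported".
    if a is None and b is None:
        return None
    # Otherwise treat a missing side as 0 so a real 0 still counts as reported.
    return (a or 0) + (b or 0)


def accumulate(acc: UsageSketch, new: UsageSketch) -> UsageSketch:
    return UsageSketch(
        **{
            f.name: add_optional(getattr(acc, f.name), getattr(new, f.name))
            for f in fields(UsageSketch)
        }
    )


first = UsageSketch(input_tokens=10, output_tokens=0, total_tokens=10)
second = UsageSketch(input_tokens=5, output_tokens=3, total_tokens=8)
total = accumulate(accumulate(UsageSketch(), first), second)
print(total)
```

Overwriting instead of accumulating would have reported only `second` here, which is the undercount the commit describes.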
…e [greptile]
The sync harness_langgraph tutorial set turn_span.output to
{"final_output": turn.usage().model_dump()} — token metrics under a key that
means the assistant's text, producing misleading AGENT_WORKFLOW trace data
versus the async tutorial. Accumulate text deltas during the yield loop (as the
030_langgraph tutorial does) and store {"final_output": final_text, "usage":
...} so the final output is the text and usage stays available under its own key.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
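One way to picture the corrected tutorial behavior is a pass-through generator that forwards every event while collecting text deltas, then records text and usage under separate keys. Everything in this sketch is hypothetical (dictionary events, `relay_and_record`, the `get_usage` callable); it is not the tutorial's actual code and only mirrors the `{"final_output": final_text, "usage": ...}` shape named in the commit message.

```python
import asyncio
from typing import Any, AsyncGenerator, AsyncIterator, Callable, Dict, List


async def relay_and_record(
    events: AsyncIterator[Dict[str, Any]],
    span_output: Dict[str, Any],
    get_usage: Callable[[], Dict[str, Any]],
) -> AsyncGenerator[Dict[str, Any], None]:
    parts: List[str] = []
    async for ev in events:
        if ev.get("type") == "text_delta":
            parts.append(ev["text"])
        yield ev  # forward every event unchanged
    # Runs only once the stream is exhausted, so usage is read late.
    span_output["final_output"] = "".join(parts)
    span_output["usage"] = get_usage()


async def fake_events() -> AsyncGenerator[Dict[str, Any], None]:
    yield {"type": "text_delta", "text": "It is "}
    yield {"type": "tool_full", "name": "get_weather"}
    yield {"type": "text_delta", "text": "sunny."}


async def main():
    span_output: Dict[str, Any] = {}
    forwarded = [
        ev
        async for ev in relay_and_record(
            fake_events(), span_output, lambda: {"total_tokens": 9}
        )
    ]
    return forwarded, span_output


forwarded, span_output = asyncio.run(main())
```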
…nt [greptile] The sync converter opened a reasoning stream with a StreamTaskMessageStart wrapping TextContent, even though the deltas are ReasoningContentDelta — so the Start's content type contradicted its payload (and downstream span/UI handling of reasoning). Emit ReasoningContent for the reasoning Start. Add a test asserting the reasoning block opens a ReasoningContent Start + ReasoningContentDelta. Also refresh the stale test_harness_langgraph_sync.py module docstring: it claimed the SpanDeriver does NOT produce tool spans for LangGraph Full events, but the foundation now opens/closes tool spans from Full(ToolRequest/Response) (asserted by test_tracer_produces_tool_spans_for_full_events). Note: the "TurnResult.usage pre-iteration snapshot" item is already resolved at root — UnifiedEmitter.auto_send_turn reads turn.usage() after auto_send drains the stream. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- _langgraph_sync.py: reasoning StreamTaskMessageStart now sets style="active". The AgentEx server's StreamTaskMessageStartEntity rejects reasoning.style=None (enum), which killed the sync stream, breaking harness_langgraph and the pre-existing 030_langgraph that share this emitter.
- temporal harness tools.py: give get_weather a native async coroutine so tools_node's `await tool.ainvoke(...)` runs on the workflow loop instead of LangChain's run_in_executor fallback (NotImplementedError in the deterministic Temporal workflow sandbox).
- test_langgraph_sync.py: assert the reasoning Start carries a non-null style.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cb03bbc to 4ceb1cf
The shared TurnUsage.num_llm_calls became Optional (None = "not reported") when it landed on next. The openai / pydantic-ai turn tests still asserted == 0 for the default (no-usage / pre-exhaustion) construction path, which now yields None — matching the token fields. Update those three assertions to `is None`; real-zero cases (provider reported 0) are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Migrates the LangGraph harness onto the unified harness surface introduced in PR 4 (pydantic-ai). Implements 12 tasks covering the new `LangGraphTurn` adapter, bespoke helper rewrites, offline integration tests, conformance fixtures, tutorial agents, and CI matrix.

New surface:

Key implementation points:
- `LangGraphTurn` wraps LangGraph `astream()` and implements `HarnessTurn` (tasks 1-2)
- `stream_langgraph_events` reimplemented on `UnifiedEmitter` (task 4)
- `_langgraph_tracing.py` `create_langgraph_tracing_handler` marked deprecated with `warnings.warn(DeprecationWarning)` (task 3)
- `StreamTaskMessageFull` (not Start+Delta+Done); `SpanDeriver` does not produce tool spans from Full events today (tracked in AGX1-373)
- `LangGraphTurn.usage()` is populated via `on_final_ai_message` callback during event iteration; `TurnResult.usage` is a pre-iteration snapshot, so callers should read `turn.usage()` after `auto_send_turn` returns
- `AsyncGenerator` return type annotation to `convert_langgraph_to_agentex_events` and `_generate_events` to fix pyright inference (was treating them as coroutines)

Tests added (tasks 5-8, 219 passing):
- `test_langgraph_sync.py`: 11 unit tests for `convert_langgraph_to_agentex_events` + deprecation
- `test_langgraph_turn.py`: 19 unit tests for `LangGraphTurn` + `langgraph_usage_to_turn_usage`
- `test_langgraph_async.py`: 6 characterization tests for the unified `stream_langgraph_events`
- `test_langgraph_sync_unified.py`: 6 passthrough + span derivation tests
- `test_langgraph_conformance.py`: 4 conformance fixtures (text-only, single-tool, reasoning, multi-step)
- `test_harness_langgraph_sync.py`: 6 offline integration tests (yield channel)
- `test_harness_langgraph_async.py`: 7 offline integration tests (auto_send channel)
- `test_harness_langgraph_temporal.py`: 5 offline integration tests (temporal channel)

Tutorial agents (task 9):
- `examples/tutorials/00_sync/harness_langgraph/` (`s-harness-langgraph`): sync, yield_turn
- `examples/tutorials/10_async/00_base/harness_langgraph/` (`a-harness-langgraph`): async, auto_send_turn
- `examples/tutorials/10_async/10_temporal/harness_langgraph/` (`at-harness-langgraph`): temporal, LangGraphPlugin + emit_langgraph_messages

CI (task 10): Enabled `live-matrix` job in `harness-integration.yml` with 3-way matrix over `[sync, async, temporal]` running offline LangGraph integration tests.

Test plan
- `uv run --all-packages --all-extras pytest tests/lib/core/harness/ tests/lib/adk/ -v`: 219 passed
- `./scripts/lint`: 0 errors, 0 warnings (ruff + pyright)

🤖 Generated with Claude Code
Greptile Summary
Migrates the LangGraph harness onto the unified `LangGraphTurn` + `UnifiedEmitter` surface introduced for pydantic-ai, eliminating ~165 lines of bespoke Redis-streaming code and aligning all three channels (sync HTTP yield, async Redis, Temporal) behind the same adapter pattern.
- `LangGraphTurn` (new): wraps `graph.astream()` and accumulates token usage additively across multi-step LLM calls via `_accumulate_turn_usage`, fixing the silent per-call overwrite from the old bespoke path.
- `_langgraph_sync.py`: adds `on_final_ai_message` callback for lazy usage capture and fixes the reasoning-block `StreamTaskMessageStart` content type from `TextContent` to `ReasoningContent` (matching the conformance fixture and downstream schema expectations).
- `[pydantic_ai, langgraph] × [sync, async, temporal]` with 219 new offline tests covering unit, integration, and conformance layers.

Confidence Score: 5/5
Safe to merge — core logic is correct, all three channels work, and the one reasoning-content-type fix is guarded by a new assertion in the test suite.
The implementation is well-structured: bespoke Redis code is cleanly replaced, multi-step usage now accumulates additively instead of overwriting, and the reasoning-block content-type fix is confirmed by a targeted test. The only finding is a stale docstring claiming SpanDeriver does not handle Full tool events, while the tests in this same PR assert it does — a documentation inconsistency that does not affect runtime behavior.
The module docstring in tests/lib/core/harness/test_harness_langgraph_async.py and both acp.py tutorial files carry a stale claim about SpanDeriver not producing tool spans from Full events; those comments should be updated before they mislead future contributors.
Important Files Changed
Sequence Diagram

    %%{init: {'theme': 'neutral'}}%%
    sequenceDiagram
        participant Agent as Agent ACP Handler
        participant LGT as LangGraphTurn
        participant Conv as convert_langgraph_to_agentex_events
        participant UE as UnifiedEmitter
        participant SD as SpanDeriver
        participant Backend as Streaming Backend (Redis / HTTP)
        Agent->>LGT: LangGraphTurn(graph.astream(), model)
        Agent->>UE: yield_turn(turn) OR auto_send_turn(turn)
        UE->>LGT: iterate turn.events
        LGT->>Conv: "convert_langgraph_to_agentex_events(stream, on_final_ai_message=_capture)"
        loop LangGraph messages events
            Conv-->>LGT: StreamTaskMessageStart / Delta / Done (text or reasoning)
            LGT-->>UE: event
            UE->>SD: derive span signal
            UE-->>Backend: stream text delta
        end
        loop LangGraph updates events
            Conv-->>LGT: StreamTaskMessageFull(ToolRequestContent)
            LGT-->>UE: Full event
            UE->>SD: open tool span
            UE-->>Backend: Full message
            Conv->>Conv: on_final_ai_message(_capture) accumulate TurnUsage
            Conv-->>LGT: StreamTaskMessageFull(ToolResponseContent)
            LGT-->>UE: Full event
            UE->>SD: close tool span
            UE-->>Backend: Full message
        end
        UE-->>Agent: TurnResult(final_text) / yield events
        Agent->>LGT: turn.usage() accumulated TurnUsage

Comments Outside Diff (1)
`src/agentex/lib/adk/_modules/_langgraph_sync.py`, line 147-153
**`StreamTaskMessageStart` uses wrong content type**
When a reasoning model emits a block of type `"reasoning"`, the code opens the stream with `TextContent(type="text", ...)` instead of `ReasoningContent`. Downstream consumers that dispatch on `content.type` (e.g. rendering pipelines, the `SpanDeriver` text-span logic) will receive a `TextContent` wrapper for what is actually a reasoning block, then see a `ReasoningContentDelta` arrive: a type mismatch that will confuse or break those consumers.
`ReasoningContent` is also not imported in this file, confirming the intended type was never used. The conformance fixture `_REASONING` correctly shows `ReasoningContent` as the expected start content, but it constructs the events by hand and never runs them through the actual converter, so no test catches this today.
Reviews (13): Last reviewed commit: "test(harness): num_llm_calls is None (no..."