Skip to content

Commit 4cbc70d

Browse files
declan-scaleclaude
andcommitted
test(langgraph): offline integration tests for sync, async, and temporal channels
Adds 18 offline integration tests across the three delivery channels using fake LangGraph event streams and fake streaming backends. Documents the AGX1-377 behavior (Full events don't produce tool spans). Notes the usage capture timing: turn.usage() is the authoritative post-iteration value since auto_send_turn evaluates usage eagerly before events are consumed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent db84cf5 commit 4cbc70d

3 files changed

Lines changed: 744 additions & 0 deletions

File tree

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
"""Integration test: async (Redis-streaming) channel with a LangGraph agent.
2+
3+
Exercises the unified harness surface (UnifiedEmitter.auto_send_turn + LangGraphTurn)
4+
with a minimal fake LangGraph stream so the test runs fully offline (no API
5+
keys, no Redis, no Agentex server).
6+
7+
Agent description
8+
-----------------
9+
A simulated single-tool agent run using hand-crafted LangGraph event tuples:
10+
one tool request + response, followed by a final text reply.
11+
12+
What is tested
13+
--------------
14+
- The async handler pushes the correct sequence of messages to the fake streaming
15+
backend: Full(ToolRequest) + Full(ToolResponse) + text Start/Delta/Done.
16+
- final_text accumulates all text (not just last segment — AGX1-377 unified behavior).
17+
- Tool messages go through streaming_task_message_context (not messages.create).
18+
- With a SpanTracer, no tool spans are produced (AGX1-377: Full events are not
19+
handled by SpanDeriver today).
20+
21+
What is NOT covered without live infrastructure
22+
-----------------------------------------------
23+
- Actual Redis streaming (requires a running Redis instance).
24+
- The ACP on_task_event_send / on_task_create / on_task_cancel lifecycle.
25+
- Real LLM calls or real LangGraph graph execution.
26+
- The full FastACP async request lifecycle.
27+
28+
See also: test_harness_langgraph_sync.py and test_harness_langgraph_temporal.py
29+
for the other two channels.
30+
"""
31+
32+
from __future__ import annotations
33+
34+
import sys
35+
from typing import Any
36+
from dataclasses import field, dataclass
37+
38+
import pytest
39+
40+
from agentex.types.task_message import TaskMessage
41+
from agentex.types.text_content import TextContent
42+
from agentex.lib.core.harness.types import TurnResult
43+
from agentex.lib.core.harness.tracer import SpanTracer
44+
from agentex.lib.core.harness.emitter import UnifiedEmitter
45+
from agentex.types.tool_request_content import ToolRequestContent
46+
from agentex.types.tool_response_content import ToolResponseContent
47+
from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn
48+
49+
# ---------------------------------------------------------------------------
50+
# Remove conftest stubs so real langchain_core types are used
51+
# ---------------------------------------------------------------------------
52+
53+
54+
@pytest.fixture(autouse=True)
55+
def _real_langchain_core():
56+
stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")]
57+
saved = {k: sys.modules.pop(k) for k in stub_keys}
58+
import importlib
59+
60+
importlib.import_module("langchain_core.messages")
61+
yield
62+
sys.modules.update(saved)
63+
64+
65+
# ---------------------------------------------------------------------------
66+
# Fake streaming backend (replaces adk.streaming; no Redis required)
67+
# ---------------------------------------------------------------------------
68+
69+
70+
@dataclass
71+
class _FakeCtx:
72+
ctype: str
73+
initial_content: Any
74+
task_message: TaskMessage
75+
closed: bool = False
76+
deltas: list[Any] = field(default_factory=list)
77+
78+
async def __aenter__(self) -> "_FakeCtx":
79+
return self
80+
81+
async def __aexit__(self, *args: Any) -> bool:
82+
await self.close()
83+
return False
84+
85+
async def close(self) -> None:
86+
self.closed = True
87+
88+
async def stream_update(self, update: Any) -> Any:
89+
self.deltas.append(update)
90+
return update
91+
92+
93+
class _FakeStreaming:
94+
def __init__(self) -> None:
95+
self.contexts: list[_FakeCtx] = []
96+
97+
def streaming_task_message_context(self, task_id: str, initial_content: Any, **kw: Any) -> _FakeCtx:
98+
ctype = getattr(initial_content, "type", None) or ""
99+
tm = TaskMessage(id=f"m{len(self.contexts) + 1}", task_id=task_id, content=initial_content)
100+
ctx = _FakeCtx(ctype=ctype, initial_content=initial_content, task_message=tm)
101+
self.contexts.append(ctx)
102+
return ctx
103+
104+
105+
# ---------------------------------------------------------------------------
106+
# Fake tracing backend
107+
# ---------------------------------------------------------------------------
108+
109+
110+
class _FakeSpan:
111+
def __init__(self, name: str) -> None:
112+
self.name = name
113+
self.output: Any = None
114+
115+
116+
class _FakeTracing:
117+
def __init__(self) -> None:
118+
self.started: list[tuple[str, Any]] = []
119+
self.ended: list[tuple[str, Any]] = []
120+
121+
async def start_span(self, *, trace_id: str, name: str, **kw: Any) -> _FakeSpan:
122+
self.started.append((name, kw.get("parent_id")))
123+
return _FakeSpan(name)
124+
125+
async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None:
126+
self.ended.append((span.name, span.output))
127+
128+
129+
# ---------------------------------------------------------------------------
130+
# Helpers
131+
# ---------------------------------------------------------------------------
132+
133+
134+
def _make_stream(events: list[tuple[str, Any]]):
135+
async def _gen():
136+
for e in events:
137+
yield e
138+
139+
return _gen()
140+
141+
142+
async def _run_auto_send_turn(
143+
stream_events: list[tuple[str, Any]],
144+
trace_id: str | None = None,
145+
) -> tuple[TurnResult, _FakeStreaming, _FakeTracing | None]:
146+
fake_streaming = _FakeStreaming()
147+
fake_tracing = _FakeTracing() if trace_id else None
148+
149+
tracer: SpanTracer | bool = False
150+
if trace_id and fake_tracing is not None:
151+
tracer = SpanTracer(trace_id=trace_id, parent_span_id=None, task_id="task1", tracing=fake_tracing)
152+
153+
turn = LangGraphTurn(_make_stream(stream_events), model=None)
154+
emitter = UnifiedEmitter(
155+
task_id="task1",
156+
trace_id=trace_id,
157+
parent_span_id=None,
158+
tracer=tracer,
159+
streaming=fake_streaming,
160+
)
161+
result = await emitter.auto_send_turn(turn)
162+
return result, fake_streaming, fake_tracing
163+
164+
165+
# ---------------------------------------------------------------------------
166+
# Tests
167+
# ---------------------------------------------------------------------------
168+
169+
170+
class TestAsyncAutoSendChannel:
171+
async def test_text_only_streams_text_and_returns_final(self):
172+
from langchain_core.messages import AIMessage, AIMessageChunk
173+
174+
chunk = AIMessageChunk(content="Hello from LangGraph!")
175+
ai_msg = AIMessage(content="Hello from LangGraph!")
176+
events = [
177+
("messages", (chunk, {})),
178+
("updates", {"agent": {"messages": [ai_msg]}}),
179+
]
180+
result, fake_streaming, _ = await _run_auto_send_turn(events)
181+
182+
assert result.final_text == "Hello from LangGraph!"
183+
text_ctxs = [c for c in fake_streaming.contexts if c.ctype == "text"]
184+
assert len(text_ctxs) == 1
185+
assert text_ctxs[0].closed is True
186+
187+
async def test_tool_call_posted_via_streaming_context(self):
188+
from langchain_core.messages import AIMessage
189+
190+
tc = {"id": "call_1", "name": "get_weather", "args": {"city": "Paris"}}
191+
ai_msg = AIMessage(content="", tool_calls=[tc])
192+
events = [("updates", {"agent": {"messages": [ai_msg]}})]
193+
194+
result, fake_streaming, _ = await _run_auto_send_turn(events)
195+
196+
# Tool request via streaming_task_message_context (Full event)
197+
tool_req_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, ToolRequestContent)]
198+
assert len(tool_req_ctxs) == 1
199+
assert tool_req_ctxs[0].initial_content.tool_call_id == "call_1"
200+
assert tool_req_ctxs[0].closed is True
201+
assert tool_req_ctxs[0].deltas == [], "Full messages have no deltas"
202+
203+
async def test_tool_response_posted_via_streaming_context(self):
204+
from langchain_core.messages import ToolMessage
205+
206+
tool_msg = ToolMessage(content="Sunny, 72F", tool_call_id="call_1", name="get_weather")
207+
events = [("updates", {"tools": {"messages": [tool_msg]}})]
208+
209+
_, fake_streaming, _ = await _run_auto_send_turn(events)
210+
211+
tool_resp_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, ToolResponseContent)]
212+
assert len(tool_resp_ctxs) == 1
213+
assert tool_resp_ctxs[0].initial_content.content == "Sunny, 72F"
214+
assert tool_resp_ctxs[0].closed is True
215+
216+
async def test_multi_step_accumulates_all_text(self):
217+
"""Unified surface: final_text accumulates all text, not just last segment."""
218+
from langchain_core.messages import AIMessage, ToolMessage, AIMessageChunk
219+
220+
chunk1 = AIMessageChunk(content="Searching...")
221+
ai_msg1 = AIMessage(content="Searching...", tool_calls=[{"id": "c1", "name": "s", "args": {}}])
222+
tool_msg = ToolMessage(content="results", tool_call_id="c1", name="s")
223+
chunk2 = AIMessageChunk(content="Found it!")
224+
ai_msg2 = AIMessage(content="Found it!")
225+
226+
events = [
227+
("messages", (chunk1, {})),
228+
("updates", {"agent": {"messages": [ai_msg1]}}),
229+
("updates", {"tools": {"messages": [tool_msg]}}),
230+
("messages", (chunk2, {})),
231+
("updates", {"agent": {"messages": [ai_msg2]}}),
232+
]
233+
result, fake_streaming, _ = await _run_auto_send_turn(events)
234+
235+
# All text accumulated
236+
assert "Searching..." in result.final_text
237+
assert "Found it!" in result.final_text
238+
239+
# Two text streaming contexts
240+
text_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, TextContent)]
241+
assert len(text_ctxs) == 2
242+
243+
async def test_empty_stream_returns_empty_final_text(self):
244+
result, fake_streaming, _ = await _run_auto_send_turn([])
245+
assert result.final_text == ""
246+
assert fake_streaming.contexts == []
247+
248+
async def test_turn_usage_populated_after_events_consumed(self):
249+
"""LangGraphTurn.usage() is populated via the on_final_ai_message callback
250+
during event iteration. TurnResult.usage is a snapshot from before events run
251+
(emitter.auto_send_turn evaluates turn.usage() eagerly); the authoritative
252+
post-iteration usage is on turn.usage() directly."""
253+
from langchain_core.messages import AIMessage
254+
255+
fake_streaming = _FakeStreaming()
256+
usage_meta = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}
257+
ai_msg = AIMessage(content="hi", usage_metadata=usage_meta)
258+
events = [("updates", {"agent": {"messages": [ai_msg]}})]
259+
260+
turn = LangGraphTurn(_make_stream(events), model="gpt-4")
261+
emitter = UnifiedEmitter(
262+
task_id="task1", trace_id=None, parent_span_id=None, tracer=False, streaming=fake_streaming
263+
)
264+
await emitter.auto_send_turn(turn)
265+
266+
# After auto_send_turn, turn.usage() has the captured values
267+
usage = turn.usage()
268+
assert usage.input_tokens == 10
269+
assert usage.output_tokens == 5
270+
assert usage.total_tokens == 15
271+
272+
async def test_tracer_does_not_produce_tool_spans_for_full_events(self):
273+
"""AGX1-377: Full events don't trigger SpanDeriver tool spans."""
274+
from langchain_core.messages import AIMessage, ToolMessage
275+
276+
tc = {"id": "c1", "name": "t", "args": {}}
277+
ai_msg = AIMessage(content="", tool_calls=[tc])
278+
tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t")
279+
280+
events = [
281+
("updates", {"agent": {"messages": [ai_msg]}}),
282+
("updates", {"tools": {"messages": [tool_msg]}}),
283+
]
284+
_, _, fake_tracing = await _run_auto_send_turn(events, trace_id="trace-1")
285+
286+
assert fake_tracing is not None
287+
assert fake_tracing.started == [], "AGX1-377: Full events don't trigger tool spans"

0 commit comments

Comments
 (0)