Skip to content

Commit d344228

Browse files
authored
feat(langgraph): migrate LangGraph harness onto unified surface (#417)
1 parent 904339c commit d344228

44 files changed

Lines changed: 3897 additions & 188 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/harness-integration.yml

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ on:
88
- "src/agentex/lib/core/harness/**"
99
- "src/agentex/lib/adk/_modules/**"
1010
- "tests/lib/core/harness/test_harness_pydantic_ai_*.py"
11+
- "tests/lib/core/harness/test_harness_langgraph_*.py"
1112
- ".github/workflows/harness-integration.yml"
1213

1314
jobs:
@@ -32,17 +33,18 @@ jobs:
3233
- name: Conformance suite
3334
run: ./scripts/test tests/lib/core/harness/ -v
3435

35-
# Offline pydantic-ai integration tests (sync / async / temporal channels).
36-
# These use pydantic-ai TestModel + fake streaming/tracing and require no live
37-
# infrastructure. Enabled here for PR 4 (pydantic-ai migration). Future harness
38-
# migration PRs (5-8) should add their integration-test paths to this matrix.
36+
# Offline harness integration tests (sync / async / temporal channels) for each
37+
# migrated harness. These use fake streams / TestModel + fake streaming/tracing
38+
# and require no live infrastructure. Future harness migration PRs (6-8) add
39+
# their harness to the matrix below and their test paths to the triggers above.
3940
live-matrix:
4041
runs-on: ubuntu-latest
4142
strategy:
4243
matrix:
44+
harness: [pydantic_ai, langgraph]
4345
channel: [sync, async, temporal]
4446
fail-fast: false
45-
name: pydantic-ai-${{ matrix.channel }}
47+
name: ${{ matrix.harness }}-${{ matrix.channel }}
4648
steps:
4749
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
4850

@@ -54,6 +56,6 @@ jobs:
5456
- name: Bootstrap
5557
run: ./scripts/bootstrap
5658

57-
- name: pydantic-ai ${{ matrix.channel }} integration tests (offline, TestModel)
59+
- name: ${{ matrix.harness }} ${{ matrix.channel }} integration tests (offline)
5860
run: |
59-
./scripts/test tests/lib/core/harness/test_harness_pydantic_ai_${{ matrix.channel }}.py -v
61+
./scripts/test tests/lib/core/harness/test_harness_${{ matrix.harness }}_${{ matrix.channel }}.py -v
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# syntax=docker/dockerfile:1.3
2+
FROM python:3.12-slim
3+
COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/
4+
5+
# Install system dependencies
6+
RUN apt-get update && apt-get install -y \
7+
htop \
8+
vim \
9+
curl \
10+
tar \
11+
python3-dev \
12+
postgresql-client \
13+
build-essential \
14+
libpq-dev \
15+
gcc \
16+
cmake \
17+
netcat-openbsd \
18+
&& apt-get clean \
19+
&& rm -rf /var/lib/apt/lists/*
20+
21+
RUN uv pip install --system --upgrade pip setuptools wheel
22+
23+
ENV UV_HTTP_TIMEOUT=1000
24+
25+
# Copy pyproject.toml and README.md to install dependencies
26+
COPY 00_sync/harness_langgraph/pyproject.toml /app/harness_langgraph/pyproject.toml
27+
COPY 00_sync/harness_langgraph/README.md /app/harness_langgraph/README.md
28+
29+
WORKDIR /app/harness_langgraph
30+
31+
# Copy the project code
32+
COPY 00_sync/harness_langgraph/project /app/harness_langgraph/project
33+
34+
# Copy the test files
35+
COPY 00_sync/harness_langgraph/tests /app/harness_langgraph/tests
36+
37+
# Copy shared test utilities
38+
COPY test_utils /app/test_utils
39+
40+
# Install the required Python packages with dev dependencies
41+
RUN uv pip install --system .[dev]
42+
43+
# Set environment variables
44+
ENV PYTHONPATH=/app
45+
46+
# Set test environment variables
47+
ENV AGENT_NAME=s-harness-langgraph
48+
49+
# Run the agent using uvicorn
50+
CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Tutorial: Sync Harness LangGraph Agent
2+
3+
This tutorial demonstrates how to build a **synchronous** LangGraph agent on AgentEx
4+
using the **unified harness surface**:
5+
6+
```python
7+
turn = LangGraphTurn(stream, model=None)
8+
emitter = UnifiedEmitter(task_id=task_id, trace_id=task_id, ...)
9+
async for event in emitter.yield_turn(turn):
10+
yield event
11+
```
12+
13+
Compare with ``030_langgraph``, which uses the bespoke
14+
``convert_langgraph_to_agentex_events`` helper directly.
15+
16+
## Key Concepts
17+
18+
### Unified Harness
19+
20+
`LangGraphTurn` implements the `HarnessTurn` protocol: it wraps the raw
21+
LangGraph `astream()` generator and exposes `events` (an async generator of
22+
`TaskMessageUpdate`) and `usage()` (token counts captured from the final
23+
`AIMessage`).
24+
25+
`UnifiedEmitter.yield_turn(turn)` iterates the turn's events and yields them
26+
to the sync ACP handler unchanged. The same `LangGraphTurn` object can also be
27+
passed to `UnifiedEmitter.auto_send_turn` in the async/temporal channels.
28+
29+
### AGX1-377 Note
30+
31+
LangGraph emits tool requests as `StreamTaskMessageFull` events (from "updates"
32+
node outputs). The `SpanDeriver` does not open tool spans from Full events
33+
today; that gap is tracked in AGX1-373.
34+
35+
## Files
36+
37+
| File | Description |
38+
|------|-------------|
39+
| `project/acp.py` | ACP server using unified harness (LangGraphTurn + yield_turn) |
40+
| `project/graph.py` | LangGraph state graph (identical to 030_langgraph) |
41+
| `project/tools.py` | Tool definitions (weather example) |
42+
| `tests/test_agent.py` | Integration tests |
43+
| `manifest.yaml` | Agent configuration (name: s-harness-langgraph) |
44+
45+
## Running Locally
46+
47+
```bash
48+
agentex agents run
49+
```
50+
51+
## Running Tests
52+
53+
```bash
54+
pytest tests/test_agent.py -v
55+
```
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
build:
2+
context:
3+
root: ../../
4+
include_paths:
5+
- 00_sync/harness_langgraph
6+
- test_utils
7+
dockerfile: 00_sync/harness_langgraph/Dockerfile
8+
dockerignore: 00_sync/harness_langgraph/.dockerignore
9+
10+
local_development:
11+
agent:
12+
port: 8000
13+
host_address: host.docker.internal
14+
paths:
15+
acp: project/acp.py
16+
17+
agent:
18+
acp_type: sync
19+
name: s-harness-langgraph
20+
description: A sync LangGraph agent using the unified harness surface (LangGraphTurn + UnifiedEmitter.yield_turn)
21+
22+
temporal:
23+
enabled: false
24+
25+
credentials:
26+
- env_var_name: OPENAI_API_KEY
27+
secret_name: openai-api-key
28+
secret_key: api-key
29+
- env_var_name: REDIS_URL
30+
secret_name: redis-url-secret
31+
secret_key: url
32+
- env_var_name: SGP_API_KEY
33+
secret_name: sgp-api-key
34+
secret_key: api-key
35+
- env_var_name: SGP_ACCOUNT_ID
36+
secret_name: sgp-account-id
37+
secret_key: account-id
38+
- env_var_name: SGP_CLIENT_BASE_URL
39+
secret_name: sgp-client-base-url
40+
secret_key: url
41+
42+
deployment:
43+
image:
44+
repository: ""
45+
tag: "latest"
46+
47+
global:
48+
agent:
49+
name: "s-harness-langgraph"
50+
description: "A sync LangGraph agent using the unified harness surface"
51+
replicaCount: 1
52+
resources:
53+
requests:
54+
cpu: "500m"
55+
memory: "1Gi"
56+
limits:
57+
cpu: "1000m"
58+
memory: "2Gi"

examples/tutorials/00_sync/harness_langgraph/project/__init__.py

Whitespace-only changes.
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""ACP handler for sync harness LangGraph agent.
2+
3+
Uses the unified harness surface: ``LangGraphTurn`` wraps the LangGraph
4+
``astream()`` generator, and ``UnifiedEmitter.yield_turn`` converts it into
5+
the AgentEx ``TaskMessageUpdate`` event stream expected by the sync ACP.
6+
7+
Differences from ``030_langgraph`` (bespoke path):
8+
- No ``create_langgraph_tracing_handler`` boilerplate.
9+
- No manual text-delta accumulation for the span output.
10+
- Tool calls are emitted as ``StreamTaskMessageFull`` (not Start+Delta+Done)
11+
via the same code path as the async/temporal channels.
12+
- Usage data (token counts) is captured on the ``LangGraphTurn`` object and
13+
can be read after the turn completes.
14+
15+
AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull``
16+
events (from "updates"). The ``SpanDeriver`` does not open tool spans from
17+
Full events today; that gap is tracked in AGX1-373.
18+
"""
19+
20+
from __future__ import annotations
21+
22+
import os
23+
from typing import AsyncGenerator
24+
25+
from dotenv import load_dotenv
26+
27+
load_dotenv()
28+
29+
import agentex.lib.adk as adk
30+
from project.graph import create_graph
31+
from agentex.lib.types.acp import SendMessageParams
32+
from agentex.lib.types.tracing import SGPTracingProcessorConfig
33+
from agentex.lib.utils.logging import make_logger
34+
from agentex.lib.sdk.fastacp.fastacp import FastACP
35+
from agentex.lib.core.harness.emitter import UnifiedEmitter
36+
from agentex.types.task_message_delta import TextDelta
37+
from agentex.types.task_message_update import TaskMessageUpdate
38+
from agentex.types.task_message_content import TaskMessageContent
39+
from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn
40+
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
41+
42+
logger = make_logger(__name__)
43+
44+
add_tracing_processor_config(
45+
SGPTracingProcessorConfig(
46+
sgp_api_key=os.environ.get("SGP_API_KEY", ""),
47+
sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
48+
sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""),
49+
)
50+
)
51+
52+
acp = FastACP.create(acp_type="sync")
53+
54+
_graph = None
55+
56+
57+
async def get_graph():
58+
"""Get or create the compiled graph instance."""
59+
global _graph
60+
if _graph is None:
61+
_graph = await create_graph()
62+
return _graph
63+
64+
65+
@acp.on_message_send
66+
async def handle_message_send(
67+
params: SendMessageParams,
68+
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
69+
"""Handle incoming messages, streaming tokens and tool calls via unified harness."""
70+
graph = await get_graph()
71+
72+
task_id = params.task.id
73+
user_message = params.content.content
74+
75+
logger.info(f"Processing message for task {task_id}")
76+
77+
async with adk.tracing.span(
78+
trace_id=task_id,
79+
task_id=task_id,
80+
name="message",
81+
input={"message": user_message},
82+
data={"__span_type__": "AGENT_WORKFLOW"},
83+
) as turn_span:
84+
stream = graph.astream(
85+
{"messages": [{"role": "user", "content": user_message}]},
86+
config={"configurable": {"thread_id": task_id}},
87+
stream_mode=["messages", "updates"],
88+
)
89+
90+
turn = LangGraphTurn(stream, model=None)
91+
emitter = UnifiedEmitter(
92+
task_id=task_id,
93+
trace_id=task_id,
94+
parent_span_id=turn_span.id if turn_span else None,
95+
)
96+
97+
final_text = ""
98+
async for event in emitter.yield_turn(turn):
99+
# Accumulate text deltas so the span's final_output is the assistant
100+
# text (matching the async tutorial), not the usage metrics.
101+
delta = getattr(event, "delta", None)
102+
if isinstance(delta, TextDelta) and delta.text_delta:
103+
final_text += delta.text_delta
104+
yield event
105+
106+
if turn_span:
107+
turn_span.output = {"final_output": final_text, "usage": turn.usage().model_dump()}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""LangGraph graph definition for the harness_langgraph sync agent.
2+
3+
Identical to ``030_langgraph/project/graph.py`` — the graph definition is not
4+
affected by the harness migration. Only ``acp.py`` changes.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from typing import Any, Annotated
10+
from datetime import datetime
11+
from typing_extensions import TypedDict
12+
13+
from langgraph.graph import START, StateGraph
14+
from langchain_openai import ChatOpenAI
15+
from langgraph.prebuilt import ToolNode, tools_condition
16+
from langchain_core.messages import SystemMessage
17+
from langgraph.graph.message import add_messages
18+
19+
from project.tools import TOOLS
20+
from agentex.lib.adk import create_checkpointer
21+
22+
MODEL_NAME = "gpt-5"
23+
SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools.
24+
25+
Current date and time: {timestamp}
26+
27+
Guidelines:
28+
- Be concise and helpful
29+
- Use tools when they would help answer the user's question
30+
- If you're unsure, ask clarifying questions
31+
- Always provide accurate information
32+
"""
33+
34+
35+
class AgentState(TypedDict):
36+
"""State schema for the agent graph."""
37+
38+
messages: Annotated[list[Any], add_messages]
39+
40+
41+
async def create_graph():
42+
"""Create and compile the agent graph with checkpointer."""
43+
llm = ChatOpenAI(
44+
model=MODEL_NAME,
45+
reasoning={"effort": "high", "summary": "auto"},
46+
)
47+
llm_with_tools = llm.bind_tools(TOOLS)
48+
49+
checkpointer = await create_checkpointer()
50+
51+
def agent_node(state: AgentState) -> dict[str, Any]:
52+
"""Process the current state and generate a response."""
53+
messages = state["messages"]
54+
if not messages or not isinstance(messages[0], SystemMessage):
55+
system_content = SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
56+
messages = [SystemMessage(content=system_content)] + messages
57+
response = llm_with_tools.invoke(messages)
58+
return {"messages": [response]}
59+
60+
builder = StateGraph(AgentState)
61+
builder.add_node("agent", agent_node)
62+
builder.add_node("tools", ToolNode(tools=TOOLS))
63+
builder.add_edge(START, "agent")
64+
builder.add_conditional_edges("agent", tools_condition, "tools")
65+
builder.add_edge("tools", "agent")
66+
67+
return builder.compile(checkpointer=checkpointer)

0 commit comments

Comments
 (0)