Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
0e57601
feat(langgraph): optional on_final_ai_message callback for usage capture
declan-scale Jun 18, 2026
3195e52
feat(langgraph): LangGraphTurn + langgraph_usage_to_turn_usage
declan-scale Jun 18, 2026
7991c0a
test(langgraph): characterization tests for stream_langgraph_events (…
declan-scale Jun 18, 2026
453d2de
refactor(langgraph): reimplement stream_langgraph_events on unified s…
declan-scale Jun 18, 2026
f5e30ae
test(langgraph): unified sync path — passthrough and span derivation
declan-scale Jun 18, 2026
2080d95
test(langgraph): 4 conformance fixtures (text, tool, reasoning, multi…
declan-scale Jun 18, 2026
dc02937
test(langgraph): offline integration tests for sync, async, and tempo…
declan-scale Jun 18, 2026
f55ee13
feat(langgraph): tutorial agents + CI live-matrix + pyright fixes
declan-scale Jun 18, 2026
f481637
fix(langgraph): restore created_at + docstring-only deprecation for t…
declan-scale Jun 18, 2026
c6c5bb8
test(langgraph): adapt conformance test to cross-channel runner (AGX1…
declan-scale Jun 22, 2026
ed9fb63
fix(langgraph): accumulate multi-step usage in LangGraphTurn [greptile]
declan-scale Jun 22, 2026
88a2a2f
fix(langgraph): sync tutorial span output stores final text, not usag…
declan-scale Jun 22, 2026
f02df19
fix(langgraph): reasoning Start emits ReasoningContent, not TextConte…
declan-scale Jun 22, 2026
4ceb1cf
fix(langgraph): reasoning Start needs non-null style + native async tool
declan-scale Jun 22, 2026
dbdb2da
test(harness): num_llm_calls is None (not 0) for no-usage default cases
declan-scale Jun 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions .github/workflows/harness-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
- "src/agentex/lib/core/harness/**"
- "src/agentex/lib/adk/_modules/**"
- "tests/lib/core/harness/test_harness_pydantic_ai_*.py"
- "tests/lib/core/harness/test_harness_langgraph_*.py"
- ".github/workflows/harness-integration.yml"

jobs:
Expand All @@ -32,17 +33,18 @@ jobs:
- name: Conformance suite
run: ./scripts/test tests/lib/core/harness/ -v

# Offline pydantic-ai integration tests (sync / async / temporal channels).
# These use pydantic-ai TestModel + fake streaming/tracing and require no live
# infrastructure. Enabled here for PR 4 (pydantic-ai migration). Future harness
# migration PRs (5-8) should add their integration-test paths to this matrix.
# Offline harness integration tests (sync / async / temporal channels) for each
# migrated harness. These use fake streams / TestModel + fake streaming/tracing
# and require no live infrastructure. Future harness migration PRs (6-8) add
# their harness to the matrix below and their test paths to the triggers above.
live-matrix:
runs-on: ubuntu-latest
strategy:
matrix:
harness: [pydantic_ai, langgraph]
channel: [sync, async, temporal]
fail-fast: false
name: pydantic-ai-${{ matrix.channel }}
name: ${{ matrix.harness }}-${{ matrix.channel }}
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

Expand All @@ -54,6 +56,6 @@ jobs:
- name: Bootstrap
run: ./scripts/bootstrap

- name: pydantic-ai ${{ matrix.channel }} integration tests (offline, TestModel)
- name: ${{ matrix.harness }} ${{ matrix.channel }} integration tests (offline)
run: |
./scripts/test tests/lib/core/harness/test_harness_pydantic_ai_${{ matrix.channel }}.py -v
./scripts/test tests/lib/core/harness/test_harness_${{ matrix.harness }}_${{ matrix.channel }}.py -v
50 changes: 50 additions & 0 deletions examples/tutorials/00_sync/harness_langgraph/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# syntax=docker/dockerfile:1.3
FROM python:3.12-slim
# Bring in the pinned uv / uvx binaries from the uv image rather than installing them with pip.
COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/

# Install system dependencies: a C build toolchain, Python and PostgreSQL
# headers/client, and a few interactive debugging utilities (htop, vim, curl,
# netcat). The apt cache and package lists are removed in the same layer so
# they do not end up in the image.
RUN apt-get update && apt-get install -y \
    htop \
    vim \
    curl \
    tar \
    python3-dev \
    postgresql-client \
    build-essential \
    libpq-dev \
    gcc \
    cmake \
    netcat-openbsd \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Upgrade the packaging toolchain in the system interpreter (no virtualenv is used).
RUN uv pip install --system --upgrade pip setuptools wheel

# Raise uv's HTTP timeout well above its default so slow package downloads
# do not abort the build (uv reads this value in seconds — see the uv docs).
ENV UV_HTTP_TIMEOUT=1000

# Copy pyproject.toml and README.md to install dependencies.
# COPY sources are relative to the build context root, not to this Dockerfile,
# which is why they are prefixed with 00_sync/harness_langgraph/.
COPY 00_sync/harness_langgraph/pyproject.toml /app/harness_langgraph/pyproject.toml
COPY 00_sync/harness_langgraph/README.md /app/harness_langgraph/README.md

WORKDIR /app/harness_langgraph

# Copy the project code
COPY 00_sync/harness_langgraph/project /app/harness_langgraph/project

# Copy the test files
COPY 00_sync/harness_langgraph/tests /app/harness_langgraph/tests

# Copy shared test utilities (lives at the build context root, outside this tutorial's directory)
COPY test_utils /app/test_utils

# Install the required Python packages with dev dependencies.
# Runs from WORKDIR, so "." is /app/harness_langgraph (the pyproject.toml copied above).
RUN uv pip install --system .[dev]

# Put /app on the import path so that /app/test_utils is importable as `test_utils`.
ENV PYTHONPATH=/app

# Agent name made available inside the container; presumably read by the
# tests to locate the agent — confirm against tests/test_agent.py.
ENV AGENT_NAME=s-harness-langgraph

# Run the agent using uvicorn. `project.acp:acp` is resolved relative to
# WORKDIR, i.e. the `acp` object in /app/harness_langgraph/project/acp.py.
CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
55 changes: 55 additions & 0 deletions examples/tutorials/00_sync/harness_langgraph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Tutorial: Sync Harness LangGraph Agent

This tutorial demonstrates how to build a **synchronous** LangGraph agent on AgentEx
using the **unified harness surface**:

```python
turn = LangGraphTurn(stream, model=None)
emitter = UnifiedEmitter(task_id=task_id, trace_id=task_id, ...)
async for event in emitter.yield_turn(turn):
    yield event
```

Compare with ``030_langgraph``, which uses the bespoke
``convert_langgraph_to_agentex_events`` helper directly.

## Key Concepts

### Unified Harness

`LangGraphTurn` implements the `HarnessTurn` protocol: it wraps the raw
LangGraph `astream()` generator and exposes `events` (an async generator of
`TaskMessageUpdate`) and `usage()` (token counts captured from the final
`AIMessage`).

`UnifiedEmitter.yield_turn(turn)` iterates the turn's events and yields them
to the sync ACP handler unchanged. The same `LangGraphTurn` object can also be
passed to `UnifiedEmitter.auto_send_turn` in the async/temporal channels.

### AGX1-377 Note

LangGraph emits tool requests as `StreamTaskMessageFull` events (from "updates"
node outputs). The `SpanDeriver` does not open tool spans from Full events
today; that gap is tracked in AGX1-373.

## Files

| File | Description |
|------|-------------|
| `project/acp.py` | ACP server using unified harness (LangGraphTurn + yield_turn) |
| `project/graph.py` | LangGraph state graph (identical to 030_langgraph) |
| `project/tools.py` | Tool definitions (weather example) |
| `tests/test_agent.py` | Integration tests |
| `manifest.yaml` | Agent configuration (name: s-harness-langgraph) |

## Running Locally

```bash
agentex agents run
```

## Running Tests

```bash
pytest tests/test_agent.py -v
```
58 changes: 58 additions & 0 deletions examples/tutorials/00_sync/harness_langgraph/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
build:
context:
root: ../../
include_paths:
- 00_sync/harness_langgraph
- test_utils
dockerfile: 00_sync/harness_langgraph/Dockerfile
dockerignore: 00_sync/harness_langgraph/.dockerignore

local_development:
agent:
port: 8000
host_address: host.docker.internal
paths:
acp: project/acp.py

agent:
acp_type: sync
name: s-harness-langgraph
description: A sync LangGraph agent using the unified harness surface (LangGraphTurn + UnifiedEmitter.yield_turn)

temporal:
enabled: false

credentials:
- env_var_name: OPENAI_API_KEY
secret_name: openai-api-key
secret_key: api-key
- env_var_name: REDIS_URL
secret_name: redis-url-secret
secret_key: url
- env_var_name: SGP_API_KEY
secret_name: sgp-api-key
secret_key: api-key
- env_var_name: SGP_ACCOUNT_ID
secret_name: sgp-account-id
secret_key: account-id
- env_var_name: SGP_CLIENT_BASE_URL
secret_name: sgp-client-base-url
secret_key: url

deployment:
image:
repository: ""
tag: "latest"

global:
agent:
name: "s-harness-langgraph"
description: "A sync LangGraph agent using the unified harness surface"
replicaCount: 1
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1000m"
memory: "2Gi"
Empty file.
107 changes: 107 additions & 0 deletions examples/tutorials/00_sync/harness_langgraph/project/acp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""ACP handler for sync harness LangGraph agent.

Uses the unified harness surface: ``LangGraphTurn`` wraps the LangGraph
``astream()`` generator, and ``UnifiedEmitter.yield_turn`` converts it into
the AgentEx ``TaskMessageUpdate`` event stream expected by the sync ACP.

Differences from ``030_langgraph`` (bespoke path):
- No ``create_langgraph_tracing_handler`` boilerplate.
- No manual text-delta accumulation for the span output.
- Tool calls are emitted as ``StreamTaskMessageFull`` (not Start+Delta+Done)
via the same code path as the async/temporal channels.
- Usage data (token counts) is captured on the ``LangGraphTurn`` object and
can be read after the turn completes.

AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull``
events (from "updates"). The ``SpanDeriver`` does not open tool spans from
Full events today; that gap is tracked in AGX1-373.
"""

from __future__ import annotations

import os
from typing import AsyncGenerator

from dotenv import load_dotenv

load_dotenv()

import agentex.lib.adk as adk
from project.graph import create_graph
from agentex.lib.types.acp import SendMessageParams
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.core.harness.emitter import UnifiedEmitter
from agentex.types.task_message_delta import TextDelta
from agentex.types.task_message_update import TaskMessageUpdate
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config

# Module-level logger for this ACP handler.
logger = make_logger(__name__)

# Register the SGP tracing processor at import time. Each setting is read from
# the environment; a missing variable falls back to an empty string here
# instead of raising.
add_tracing_processor_config(
    SGPTracingProcessorConfig(
        sgp_api_key=os.environ.get("SGP_API_KEY", ""),
        sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
        sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""),
    )
)

# The sync ACP application object; message handlers are attached to it with
# decorators further down in this module.
acp = FastACP.create(acp_type="sync")

# Cache for the compiled LangGraph graph. Starts empty and is filled in by
# get_graph() the first time a graph is requested.
_graph = None


async def get_graph():
    """Return the module-wide compiled graph, building it on first use.

    The result of ``create_graph()`` is stored in the module-level ``_graph``
    variable, so later calls return the cached instance without rebuilding it.
    """
    global _graph
    if _graph is not None:
        return _graph
    _graph = await create_graph()
    return _graph


@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
    """Stream one LangGraph turn back to the caller through the unified harness.

    The user's message is fed to the compiled graph, the resulting
    ``astream()`` generator is wrapped in a ``LangGraphTurn``, and every event
    produced by ``UnifiedEmitter.yield_turn`` is yielded to the caller as-is.
    The whole turn runs inside a tracing span named ``"message"``.

    Args:
        params: Incoming request; ``params.task.id`` is used as the task id,
            trace id and LangGraph ``thread_id``, and ``params.content.content``
            is sent to the graph as the user message.

    Yields:
        Each event from ``emitter.yield_turn(turn)``, unchanged.
    """
    compiled_graph = await get_graph()

    task_id = params.task.id
    user_message = params.content.content

    logger.info(f"Processing message for task {task_id}")

    graph_input = {"messages": [{"role": "user", "content": user_message}]}
    run_config = {"configurable": {"thread_id": task_id}}

    async with adk.tracing.span(
        trace_id=task_id,
        task_id=task_id,
        name="message",
        input={"message": user_message},
        data={"__span_type__": "AGENT_WORKFLOW"},
    ) as turn_span:
        # The span may be falsy; only then is no parent id passed to the emitter.
        parent_span_id = turn_span.id if turn_span else None

        turn = LangGraphTurn(
            compiled_graph.astream(
                graph_input,
                config=run_config,
                stream_mode=["messages", "updates"],
            ),
            model=None,
        )
        emitter = UnifiedEmitter(
            task_id=task_id,
            trace_id=task_id,
            parent_span_id=parent_span_id,
        )

        # Collect the text deltas while streaming so the span's final_output is
        # the assistant text (matching the async tutorial), not the usage metrics.
        text_chunks: list[str] = []
        async for event in emitter.yield_turn(turn):
            delta = getattr(event, "delta", None)
            if isinstance(delta, TextDelta):
                chunk = delta.text_delta
                if chunk:
                    text_chunks.append(chunk)
            yield event

        # Usage is read only after the stream has been fully consumed.
        if turn_span:
            turn_span.output = {
                "final_output": "".join(text_chunks),
                "usage": turn.usage().model_dump(),
            }
67 changes: 67 additions & 0 deletions examples/tutorials/00_sync/harness_langgraph/project/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""LangGraph graph definition for the harness_langgraph sync agent.

Identical to ``030_langgraph/project/graph.py`` — the graph definition is not
affected by the harness migration. Only ``acp.py`` changes.
"""

from __future__ import annotations

from typing import Any, Annotated
from datetime import datetime
from typing_extensions import TypedDict

from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import SystemMessage
from langgraph.graph.message import add_messages

from project.tools import TOOLS
from agentex.lib.adk import create_checkpointer

# Model identifier handed to ChatOpenAI in create_graph().
MODEL_NAME = "gpt-5"
# System prompt template. The `{timestamp}` placeholder is filled in with the
# current date and time each time the agent node builds its message list.
SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools.

Current date and time: {timestamp}

Guidelines:
- Be concise and helpful
- Use tools when they would help answer the user's question
- If you're unsure, ask clarifying questions
- Always provide accurate information
"""


class AgentState(TypedDict):
    """State schema for the agent graph.

    The state has a single key, ``messages``, which holds the conversation
    that the agent node reads and appends its response to.
    """

    # Conversation messages. Annotated with LangGraph's ``add_messages``
    # reducer; presumably node outputs are merged into this list rather than
    # replacing it — confirm against the LangGraph documentation.
    messages: Annotated[list[Any], add_messages]


async def create_graph():
    """Build the tool-calling agent graph and compile it with a checkpointer.

    Topology: ``START -> "agent"``; from ``"agent"`` the prebuilt
    ``tools_condition`` decides whether to route to ``"tools"``; ``"tools"``
    always returns to ``"agent"``.

    Returns:
        The compiled graph, using the checkpointer from ``create_checkpointer()``.
    """
    chat_model = ChatOpenAI(
        model=MODEL_NAME,
        reasoning={"effort": "high", "summary": "auto"},
    )
    tool_enabled_model = chat_model.bind_tools(TOOLS)

    saver = await create_checkpointer()

    def agent_node(state: AgentState) -> dict[str, Any]:
        """Invoke the model on the conversation, prepending the system prompt if absent."""
        history = state["messages"]
        has_system_prompt = bool(history) and isinstance(history[0], SystemMessage)
        if not has_system_prompt:
            # The prompt is rendered per call so the timestamp is current.
            now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            history = [SystemMessage(content=SYSTEM_PROMPT.format(timestamp=now))] + history
        # Only the new response is returned; the system message is not written to state.
        return {"messages": [tool_enabled_model.invoke(history)]}

    graph_builder = StateGraph(AgentState)
    graph_builder.add_node("agent", agent_node)
    graph_builder.add_node("tools", ToolNode(tools=TOOLS))
    graph_builder.add_edge(START, "agent")
    # NOTE(review): the third positional argument is `path_map`, and a bare
    # string is passed here — confirm against the LangGraph docs that this is intended.
    graph_builder.add_conditional_edges("agent", tools_condition, "tools")
    graph_builder.add_edge("tools", "agent")

    return graph_builder.compile(checkpointer=saver)
Loading
Loading