Add AG2 (formerly AutoGen) as a third AI engine (AI_ENGINE=ag2) #46
VasiliyRad wants to merge 3 commits into EvolutionAPI:main
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Reviewer's Guide

Adds AG2 (AutoGen/AG2) as a third AI engine, implementing an AG2-specific builder and runner, session persistence, MCP integration, and custom HTTP tools, and wiring them into the existing AI engine dispatch paths without requiring schema changes.

Sequence diagram for AG2 run_agent request flow

sequenceDiagram
actor User
participant ChatRoutes as chat_routes
participant ServiceProviders as service_providers
participant AG2Runner as ag2_run_agent
participant Builder as AG2AgentBuilder
participant SessionSvc as AG2SessionService
participant AG2 as AG2_ConversableAgent_or_GroupChat
participant DB as Database
User->>ChatRoutes: HTTP POST /chat (AI_ENGINE=ag2)
ChatRoutes->>ServiceProviders: resolve_engine(ag2)
ServiceProviders-->>ChatRoutes: run_agent reference
ChatRoutes->>AG2Runner: run_agent(agent_id, external_id, message, session_service, db)
AG2Runner->>DB: get_agent(agent_id)
DB-->>AG2Runner: Agent record
AG2Runner->>Builder: build_agent(root_agent)
Builder->>DB: get_agent(sub_agent_id)*
DB-->>Builder: sub_agents
Builder-->>AG2Runner: ConversableAgent or group_chat_setup
AG2Runner->>SessionSvc: get_or_create(agent_id, external_id)
SessionSvc->>DB: load_or_create AG2StorageSession
DB-->>SessionSvc: AG2StorageSession
SessionSvc-->>AG2Runner: AG2Session
AG2Runner->>SessionSvc: build_messages(session)
SessionSvc-->>AG2Runner: history
alt ag2_mode == group_chat
AG2Runner->>AG2: initiate_group_chat(pattern, history + message)
AG2-->>AG2Runner: chat_result, final_context, last_agent
else ag2_mode == single
AG2Runner->>AG2: initiate_chat(single_agent, message, history)
AG2-->>AG2Runner: chat_result
end
AG2Runner->>SessionSvc: append(session, "user", message)
AG2Runner->>SessionSvc: append(session, "assistant", final_response)
SessionSvc->>DB: update AG2StorageSession.messages
DB-->>SessionSvc: ok
AG2Runner-->>ChatRoutes: {final_response, message_history}
ChatRoutes-->>User: HTTP response with assistant message
Entity relationship diagram for new AG2StorageSession table

erDiagram
    AG2StorageSession {
        string app_name PK
        string user_id PK
        string id PK
        jsonb messages
        timestamp create_time
        timestamp update_time
    }
File-Level Changes
Hey - I've found 15 issues and left some high-level feedback:
- The AG2SessionService creates its own SQLAlchemy engine and calls Base.metadata.create_all at construction time (and is instantiated inside run_agent_stream), which bypasses the app’s existing DB session/metadata lifecycle and will repeatedly run DDL; consider wiring it into the existing Session/engine infrastructure and letting migrations manage the ag2_sessions table.
- AG2StorageSession.messages is typed as MutableDict but defaults to [] and is treated as a list in get_or_create/build_messages, which is inconsistent with the column type; aligning this to a list-oriented mutable type or to a dict consistently will avoid subtle ORM/serialization issues.
- DynamicJSON lacks cache_ok = True and always JSON-serializes non-Postgres values, which can be a performance and warning source in SQLAlchemy 2.x; consider adding cache_ok = True and confirming this decorator matches the project’s existing JSON handling pattern.
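For the first point, a minimal sketch of constructor injection, assuming the app already exposes a shared `sessionmaker` (all names here are hypothetical): the service accepts an existing session factory instead of a `db_url`, so no new engine is built and no `create_all` DDL runs at construction time.

```python
class AG2SessionService:
    """Hypothetical sketch: take the app's existing session factory.

    No engine is created here and no Base.metadata.create_all() runs;
    the ag2_sessions table is assumed to be managed by migrations.
    """

    def __init__(self, session_factory):
        # session_factory is the app's shared sessionmaker (an assumption).
        self.SessionLocal = session_factory


def make_service(app_sessionmaker):
    """Wiring at startup: reuse the app's sessionmaker (name is assumed)."""
    return AG2SessionService(session_factory=app_sessionmaker)
```

With this shape, a request handler receives an already-configured service instead of constructing one per call.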
## Individual Comments
### Comment 1
<location path="src/services/ag2/session_service.py" line_range="95-96" />
<code_context>
+ id: Mapped[str] = mapped_column(
+ String, primary_key=True, default=lambda: str(uuid.uuid4())
+ )
+ messages: Mapped[MutableDict[str, Any]] = mapped_column(
+ MutableDict.as_mutable(DynamicJSON), default=[]
+ )
+ create_time: Mapped[DateTime] = mapped_column(DateTime(), default=func.now())
</code_context>
<issue_to_address>
**issue (bug_risk):** The `messages` column uses `MutableDict` but defaults to a list, which is inconsistent and may break SQLAlchemy change tracking.
The column is typed as `Mapped[MutableDict[str, Any]]` and uses `MutableDict.as_mutable(DynamicJSON)`, but the default is `[]`, which is a list. `MutableDict` expects a mapping and may not track changes correctly with a list value. If `messages` is meant to be a list of message dicts, switch to `MutableList` (or a custom mutable list type) and update the type hint; otherwise, use a dict default (e.g. `{}`) and treat it consistently as a mapping.
</issue_to_address>
### Comment 2
<location path="src/services/ag2/session_service.py" line_range="130-139" />
<code_context>
+ def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** `get_or_create` performs two separate DB roundtrips for the same session where one would suffice.
Inside `get_or_create`, you open one session to create/commit the `AG2StorageSession`, then a second session to fetch the same row just to load `messages`. Instead, reuse the first `record` to build `AG2Session` and skip the second `db.get`/session, which both cuts DB roundtrips and removes the race window between the two transactions.
Suggested implementation:
```python
    def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
        """Retrieve an existing session or create a new one.

        This implementation performs all work in a single DB session to avoid
        extra roundtrips and race windows between separate transactions.
        """
        session_id = f"{external_id}_{agent_id}"
        with self.SessionLocal() as db:
            # Single roundtrip: either fetch existing or create, then return.
            record = db.get(AG2StorageSession, (agent_id, external_id, session_id))
            if record is None:
                record = AG2StorageSession(
                    app_name=agent_id,
                    user_id=external_id,
                    id=session_id,
                    messages=[],
                )
                db.add(record)
                db.commit()
                # Ensure we have a fully-populated, persistent instance
                db.refresh(record)

            # Make sure messages are loaded while the session is still open.
            # If `messages` is a lazy relationship, this will trigger the load here.
            _ = record.messages

            # Build the AG2Session directly from the record we already have,
            # instead of re-opening a new session and re-querying the same row.
            return AG2Session(
                agent_id=record.app_name,
                external_id=record.user_id,
                session_id=record.id,
                messages=list(record.messages or []),
            )
```
Because I only saw the beginning of `get_or_create`, you should:
1. Remove any remaining code after this method that:
- Opens a second `with self.SessionLocal() as db:` block inside `get_or_create`, or
- Re-fetches `AG2StorageSession` (e.g., `db.get(AG2StorageSession, ...)`) for the same `agent_id`/`external_id`/`session_id`.
That logic is now redundant and should be deleted.
2. If your `AG2Session` constructor has a different signature than shown here, adapt the `AG2Session(...)` call accordingly, but keep the pattern of constructing it directly from `record` in this single session.
3. If `messages` is not a relationship but a plain JSON/array column, you can drop the `_ = record.messages` line; the rest of the logic remains the same.
</issue_to_address>
### Comment 3
<location path="src/services/ag2/mcp_service.py" line_range="159-161" />
<code_context>
+ command = server_config.get("command", "npx")
+ args = server_config.get("args", [])
+ env = server_config.get("env", {})
+ if env:
+ for key, value in env.items():
+ os.environ[key] = value
+ server = McpServer({"command": command, "args": args, "env": env})
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Mutating `os.environ` for MCP server env vars introduces global side effects and is likely unnecessary.
In `_connect_server`, you both build an `env` dict for `McpServer` and also write those values into `os.environ`. This leaks per-server config into the global process and can cause unexpected interactions between servers or other code. It should be enough to pass `env` into `McpServer` without modifying `os.environ`.
</issue_to_address>
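A sketch of the env handling without the `os.environ` writes (the helper name is hypothetical and the `McpServer` construction itself is elided; only the config shaping is shown):

```python
import os


def build_server_spec(server_config: dict) -> dict:
    """Hypothetical helper mirroring _connect_server's config handling.

    env is kept inside the spec passed to the MCP server process and is
    never written to os.environ, so nothing leaks into the global process.
    """
    return {
        "command": server_config.get("command", "npx"),
        "args": server_config.get("args", []),
        # Scoped to this one child process only; no global mutation.
        "env": server_config.get("env", {}),
    }


spec = build_server_spec({"env": {"DEMO_MCP_KEY": "secret"}})
leaked = "DEMO_MCP_KEY" in os.environ
```

The spec would then be handed to `McpServer(spec)` as in the PR, with each server seeing only its own variables.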
### Comment 4
<location path="src/services/ag2/agent_builder.py" line_range="134-141" />
<code_context>
+ # Build all sub-agents first so handoff resolution can reference them
+ all_agents = {}
+ agents = []
+ for aid in sub_agent_ids:
+ db_agent = get_agent(self.db, str(aid))
+ if db_agent is None:
+ raise ValueError(f"Sub-agent {aid} not found")
+ ca = await self.build_conversable_agent(db_agent)
+ all_agents[str(aid)] = ca
+ agents.append(ca)
+
+ root_ca = await self.build_conversable_agent(root_agent)
+ all_agents[str(root_agent.id)] = root_ca
+
+ # Apply handoffs to each agent if configured
+ for aid in sub_agent_ids:
+ db_agent = get_agent(self.db, str(aid))
+ if db_agent and db_agent.config:
+ self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)
+
</code_context>
<issue_to_address>
**suggestion (performance):** Sub-agents are fetched from the database twice: once to build them and again to apply handoffs.
In `build_group_chat_setup`, each `aid` triggers two `get_agent` calls—first to build `ConversableAgent`s, then again when applying handoffs. Consider storing the initial `db_agent` objects (e.g., an id → db_agent dict) and reusing them when applying handoffs to avoid duplicate database queries and keep the lookup logic centralized.
```suggestion
        config = root_agent.config or {}
        sub_agent_ids = config.get("sub_agents", [])
        if not sub_agent_ids:
            raise ValueError("group_chat agent requires at least one sub_agent")

        # Build all sub-agents first so handoff resolution can reference them
        all_agents = {}
        agents = []
        db_sub_agents = {}
        for aid in sub_agent_ids:
            db_agent = get_agent(self.db, str(aid))
            if db_agent is None:
                raise ValueError(f"Sub-agent {aid} not found")
            db_sub_agents[str(aid)] = db_agent
            ca = await self.build_conversable_agent(db_agent)
            all_agents[str(aid)] = ca
            agents.append(ca)

        root_ca = await self.build_conversable_agent(root_agent)
        all_agents[str(root_agent.id)] = root_ca

        # Apply handoffs to each sub-agent if configured, reusing cached db agents
        for aid in sub_agent_ids:
            db_agent = db_sub_agents.get(str(aid))
            if db_agent and db_agent.config:
                self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)
```
</issue_to_address>
### Comment 5
<location path="src/services/ag2/agent_builder.py" line_range="94-116" />
<code_context>
+ logger.warning(f"Handoff target {target_id} not found, skipping")
+ continue
+
+ if h["type"] == "llm":
+ llm_conditions.append(
+ OnCondition(
+ target=AgentTarget(target_agent),
+ condition=StringLLMCondition(prompt=h["condition"]),
+ )
+ )
+ elif h["type"] == "context":
+ context_conditions.append(
+ OnContextCondition(
</code_context>
<issue_to_address>
**suggestion:** Using `h["type"]` without validation can raise if a handoff entry is malformed.
In `_apply_handoffs`, this direct `h["type"]` access will raise a `KeyError` if a handoff entry is missing `type`, which will break setup for a misconfigured user config. Consider using `h.get("type")` and skipping/logging invalid entries, consistent with how missing `target_agent` is handled.
```suggestion
        for h in handoffs_config:
            target_id = h.get("target_agent_id")
            target_agent = all_agents.get(str(target_id))
            if not target_agent:
                logger.warning(f"Handoff target {target_id} not found, skipping")
                continue

            h_type = h.get("type")
            if h_type not in ("llm", "context"):
                logger.warning(
                    f"Invalid or missing handoff type {h_type!r} for target {target_id}, skipping"
                )
                continue

            if h_type == "llm":
                llm_conditions.append(
                    OnCondition(
                        target=AgentTarget(target_agent),
                        condition=StringLLMCondition(prompt=h["condition"]),
                    )
                )
            elif h_type == "context":
                context_conditions.append(
                    OnContextCondition(
                        target=AgentTarget(target_agent),
                        condition=ExpressionContextCondition(
                            expression=ContextExpression(h["expression"])
                        ),
                    )
                )
```
</issue_to_address>
### Comment 6
<location path="src/services/ag2/custom_tool.py" line_range="109-116" />
<code_context>
+ ):
+ body_data[param] = value
+
+ response = requests.request(
+ method=method,
+ url=url,
+ headers=processed_headers,
+ params=query_params_dict,
+ json=body_data or None,
+ timeout=error_handling.get("timeout", 30),
+ )
+
+ if response.status_code >= 400:
+ raise requests.exceptions.HTTPError(
+ f"Error in the request: {response.status_code} - {response.text}"
+ )
+
+ return json.dumps(response.json())
+
+ except Exception as e:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Assuming all HTTP responses are JSON can cause tool failures for non‑JSON endpoints.
`response.json()` is called unconditionally, so plain text/HTML responses (including some error bodies) will raise and drop you into the generic `Exception` handler, obscuring the real error. Wrap `response.json()` in a `try/except ValueError` and fall back to `response.text` or a consistent JSON wrapper so non‑JSON responses are handled gracefully.
```suggestion
        if response.status_code >= 400:
            raise requests.exceptions.HTTPError(
                f"Error in the request: {response.status_code} - {response.text}"
            )

        try:
            response_data = response.json()
        except ValueError:
            # Non-JSON response; fall back to a consistent JSON wrapper
            response_data = {
                "status_code": response.status_code,
                "headers": dict(response.headers),
                "raw_response": response.text,
            }
        return json.dumps(response_data)

    except Exception as e:
```
</issue_to_address>
### Comment 7
<location path="src/services/ag2/agent_runner.py" line_range="46-50" />
<code_context>
+ try:
+ ag2_mode = (db_agent.config or {}).get("ag2_mode", "single")
+ if ag2_mode == "group_chat":
+ chat_result, final_context, last_agent = initiate_group_chat(
+ pattern=result["pattern"],
+ messages=history + [message],
+ max_rounds=result["max_rounds"],
+ context_variables=result["context_variables"],
+ )
+ final_response = chat_result.summary or (
</code_context>
<issue_to_address>
**issue (bug_risk):** Group chat `messages` mixes dict history with a raw string for the new message, which may not match AG2’s expected format.
Here `messages` is built as `history + [message]`, mixing `{role, content}` dicts with a plain string. If `initiate_group_chat` expects a uniform list of message dicts, this could break routing/formatting. Consider wrapping `message` as `{"role": "user", "content": message}` to keep the structure consistent.
</issue_to_address>
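A tiny sketch of the suggested normalization (the helper name is made up; the `{role, content}` shape follows the history format described above):

```python
def to_chat_message(message) -> dict:
    """Normalize an incoming message to the {role, content} dict shape,
    so history + [new message] stays a uniform list of dicts."""
    if isinstance(message, dict):
        return message
    return {"role": "user", "content": str(message)}


history = [{"role": "assistant", "content": "hello"}]
messages = history + [to_chat_message("follow-up question")]
```

The group chat call would then receive `messages` instead of `history + [message]`.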
### Comment 8
<location path="src/services/ag2/agent_runner.py" line_range="24" />
<code_context>
+ session_service: AG2SessionService,
+ db: Session,
+ session_id: Optional[str] = None,
+ timeout: float = 60.0,
+ files: Optional[list] = None,
+) -> dict:
</code_context>
<issue_to_address>
**nitpick:** The `timeout` parameter on `run_agent` is currently unused.
Since `timeout` isn’t used anywhere in the implementation, the signature is misleading. Please either apply it to the underlying execution (e.g., wrap the blocking call in `asyncio.wait_for`) or remove the parameter.
</issue_to_address>
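A minimal sketch of the first option, applying the timeout with `asyncio.wait_for` (function names here are illustrative, not the PR's actual API):

```python
import asyncio


async def run_with_timeout(coro_fn, timeout: float = 60.0):
    """Wrap the awaitable doing the actual AG2 chat so the caller-supplied
    timeout is actually enforced; raises asyncio.TimeoutError on expiry."""
    return await asyncio.wait_for(coro_fn(), timeout=timeout)


async def _demo():
    async def fast():
        await asyncio.sleep(0)  # stand-in for the real chat call
        return "done"

    return await run_with_timeout(fast, timeout=1.0)


result = asyncio.run(_demo())
```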
### Comment 9
<location path="src/services/ag2/agent_runner.py" line_range="21" />
<code_context>
+ agent_id: str,
+ external_id: str,
+ message: str,
+ session_service: AG2SessionService,
+ db: Session,
+ session_id: Optional[str] = None,
</code_context>
<issue_to_address>
**question (bug_risk):** Session IDs are accepted but not used, so distinct sessions for the same agent/user cannot be differentiated.
`run_agent` and `run_agent_stream` accept `session_id`, but `AG2SessionService.get_or_create` always uses `f"{external_id}_{agent_id}"`, so concurrent sessions for the same agent/user are merged. If multiple sessions are intended, include `session_id` in the session key; if not, consider removing `session_id` from the API to avoid confusion.
</issue_to_address>
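If multiple concurrent sessions are intended, folding `session_id` into the storage key could look like this sketch (the helper name is hypothetical; the base key format matches the one quoted above):

```python
def make_session_key(agent_id: str, external_id: str, session_id=None) -> str:
    """Fold an optional session_id into the storage key so distinct
    sessions for the same agent/user stay separate."""
    base = f"{external_id}_{agent_id}"
    return f"{base}_{session_id}" if session_id else base


k_default = make_session_key("a1", "u1")
k_named = make_session_key("a1", "u1", session_id="s-42")
```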
### Comment 10
<location path="src/services/ag2/agent_runner.py" line_range="8-17" />
<code_context>
+ Token-level streaming can be added in a future iteration by wiring
+ ConversableAgent's `process_last_received_message` hook to a queue.
+ """
+ from src.services.ag2.session_service import AG2SessionService
+ from src.config.settings import get_settings
+ settings = get_settings()
+ session_service = AG2SessionService(db_url=settings.POSTGRES_CONNECTION_STRING)
+
+ result = await run_agent(
</code_context>
<issue_to_address>
**suggestion (performance):** Creating a new `AG2SessionService` (and engine) on every `run_agent_stream` call is potentially expensive.
In `run_agent_stream`, each request creates a new `AG2SessionService`, which builds a new SQLAlchemy engine and metadata. Consider reusing a shared `AG2SessionService` or at least a shared engine/session factory to avoid repeated DB setup and reduce overhead under load.
</issue_to_address>
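One way to reuse a single service per process is an `lru_cache`d factory; a self-contained sketch with a stand-in class (the real `AG2SessionService`, which builds the SQLAlchemy engine, would come from the PR):

```python
from functools import lru_cache


class AG2SessionService:
    """Stand-in for the PR's service; the real one builds an engine here."""

    def __init__(self, db_url: str):
        self.db_url = db_url


@lru_cache(maxsize=1)
def get_ag2_session_service(db_url: str) -> AG2SessionService:
    # One instance per process: engine/metadata setup happens exactly once,
    # instead of on every run_agent_stream call.
    return AG2SessionService(db_url)


a = get_ag2_session_service("postgresql://example")
b = get_ag2_session_service("postgresql://example")
```

`run_agent_stream` would then call `get_ag2_session_service(settings.POSTGRES_CONNECTION_STRING)` instead of constructing the service inline.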
### Comment 11
<location path="tests/services/ag2/test_agent_builder.py" line_range="40-49" />
<code_context>
+ }
+ return a
+
+@pytest.mark.asyncio
+async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
+ builder = AG2AgentBuilder(db=mock_db)
+ sub_agent = _make_agent("uuid-triage", "triage")
+ with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
+ result, _ = await builder.build_agent(group_chat_agent_record)
+ # result should be a dict with pattern and agents for initiate_group_chat
+ assert "pattern" in result
+ assert "agents" in result
+
+@pytest.mark.asyncio
+async def test_builder_validates_group_chat_requires_agents(mock_db):
+ record = MagicMock()
+ record.type = "llm"
+ record.config = {"ag2_mode": "group_chat", "sub_agents": []} # empty — should raise
+ record.api_key = "test"
+ builder = AG2AgentBuilder(db=mock_db)
+ with pytest.raises(ValueError, match="at least one"):
+ await builder.build_agent(record)
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for single-agent mode and API key resolution branches in AG2AgentBuilder
Current tests only cover group chat orchestration and the empty-sub-agent validation. Please also add coverage for:
- `single` mode: `build_agent` returning a `ConversableAgent` when `ag2_mode` is unset or set to `"single"`.
- Agent metadata: name sanitization (spaces → underscores) and correct system message composition from `role`, `goal`, and `instruction`.
- `_get_api_key` behavior:
- `api_key_id` provided and resolved via `get_decrypted_api_key`.
- `config.api_key` as a UUID string resolved via `get_decrypted_api_key`.
- `config.api_key` as a non-UUID raw key returned as-is.
- No valid key → `ValueError` with a clear message.
These cases help ensure behavior stays aligned with ADK/CrewAI builders and that auth errors are surfaced properly.
Suggested implementation:
```python
@pytest.mark.asyncio
async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
    builder = AG2AgentBuilder(db=mock_db)
    sub_agent = _make_agent("uuid-triage", "triage")
    with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
        result, _ = await builder.build_agent(group_chat_agent_record)
        # result should be a dict with pattern and agents for initiate_group_chat
        assert "pattern" in result
        assert "agents" in result


@pytest.mark.asyncio
async def test_builder_validates_group_chat_requires_agents(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.config = {"ag2_mode": "group_chat", "sub_agents": []}  # empty — should raise
    record.api_key = "test"
    builder = AG2AgentBuilder(db=mock_db)
    with pytest.raises(ValueError, match="at least one"):
        await builder.build_agent(record)


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_unset(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Support Team Agent"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        # no ag2_mode: should default to single-agent mode
        "role": "assistant",
        "goal": "Help users resolve issues",
        "instruction": "Be concise and friendly.",
        "api_key": "raw-openai-key",
    }
    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    # single-agent mode should produce a ConversableAgent-like object
    from autogen import ConversableAgent  # type: ignore[import]

    assert isinstance(agent, ConversableAgent)
    # name should be sanitized (spaces -> underscores)
    assert agent.name == "Support_Team_Agent"
    # system message should compose role, goal and instruction
    system_message = getattr(agent, "system_message", "")
    assert "assistant" in system_message
    assert "Help users resolve issues" in system_message
    assert "Be concise and friendly." in system_message
    # metadata should at least echo the mode
    assert metadata.get("mode") == "single"


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_single(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Billing Bot"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        "ag2_mode": "single",
        "role": "assistant",
        "goal": "Assist with billing questions",
        "instruction": "Only answer billing-related queries.",
        "api_key": "raw-openai-key",
    }
    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    from autogen import ConversableAgent  # type: ignore[import]

    assert isinstance(agent, ConversableAgent)
    assert agent.name == "Billing_Bot"
    system_message = getattr(agent, "system_message", "")
    assert "billing" in system_message.lower()
    assert "only answer billing-related queries.".lower() in system_message.lower()
    assert metadata.get("mode") == "single"


def test_get_api_key_with_api_key_id_uses_secret_store(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = "11111111-2222-3333-4444-555555555555"
    record.api_key = None
    record.config = {}
    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-secret-key",
    ) as mock_get:
        api_key = builder._get_api_key(record)
        mock_get.assert_called_once_with(mock_db, record.api_key_id)
        assert api_key == "resolved-secret-key"


def test_get_api_key_with_uuid_string_in_config(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    config_uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": config_uuid}
    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-from-config",
    ) as mock_get:
        api_key = builder._get_api_key(record)
        mock_get.assert_called_once_with(mock_db, config_uuid)
        assert api_key == "resolved-from-config"


def test_get_api_key_with_raw_non_uuid_config_value(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": "sk-live-plain-openai-key"}
    api_key = builder._get_api_key(record)
    # Non-UUID api_key should be returned as-is, without hitting secret store
    assert api_key == "sk-live-plain-openai-key"


def test_get_api_key_raises_when_no_valid_key(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {}
    with pytest.raises(ValueError, match="API key"):
        builder._get_api_key(record)
```
1. Ensure `ConversableAgent` is available at the top of the test module; if you prefer not to import it inside tests, move the `from autogen import ConversableAgent` import to the global imports section and remove the inline imports in the tests.
2. Verify the call signature of `get_decrypted_api_key` in `src.services.ag2.agent_builder`; the tests assume `get_decrypted_api_key(db, api_key_id)`. If the actual order differs, adjust the `assert_called_once_with` expectations accordingly.
3. If `AG2AgentBuilder._get_api_key` currently uses a different error message when no key is found, update the implementation to raise `ValueError("API key could not be resolved")` (or similar) so that the `"API key"` substring in the `match` expression is present.
4. If the `ConversableAgent` implementation uses a different attribute than `.system_message` for the composed instructions, either:
- Update `AG2AgentBuilder` to set `.system_message` to the composed text, or
- Adjust the tests to assert against the actual attribute used (e.g. `agent.description`), keeping the checks that `role`, `goal`, and `instruction` are all present.
</issue_to_address>
### Comment 12
<location path="tests/services/ag2/test_agent_builder.py" line_range="6-15" />
<code_context>
+from unittest.mock import MagicMock, AsyncMock, patch
+from src.services.ag2.agent_builder import AG2AgentBuilder
+
+@pytest.fixture
+def mock_db():
+ return AsyncMock()
+
+def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
+ """Create a minimal agent record mock with explicit attribute assignment."""
+ a = MagicMock()
+ a.id = agent_id
+ a.name = agent_name
+ a.type = "llm"
+ a.llm_provider = "openai"
+ a.model = model
+ a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
+ a.api_key = "test-key"
+ a.api_key_id = None
+ a.api_url = "https://api.openai.com/v1"
+ a.role = None
+ a.goal = None
+ a.instruction = f"You are {agent_name}."
+ return a
+
+@pytest.fixture
</code_context>
<issue_to_address>
**suggestion (testing):** Extend tests to cover handoff configuration and sub-agent resolution in group chat setups
`AG2AgentBuilder` has substantial logic in `_apply_handoffs` and `build_group_chat_setup` (LLM/context handoffs, `after_work` mapping, and missing sub-agent handling), but current tests only check the output shape.
Please add tests that:
- Build a root agent with sub-agents containing `handoffs` of type `"llm"` and `"context"`, and assert the expected conditions are registered on `ca.handoffs`.
- Use a sub-agent ID for which `get_agent` returns `None` and assert a `ValueError("Sub-agent ... not found")`.
- Verify `after_work` resolves to `TerminateTarget` when set to `"terminate"` and to `RevertToUserTarget` otherwise.
You can rely on lightweight `MagicMock` agents or patched AG2 classes to avoid heavy dependencies while still validating the behavior and wiring.
Suggested implementation:
```python
from unittest.mock import MagicMock, AsyncMock, patch
from src.services.ag2.agent_builder import AG2AgentBuilder, TerminateTarget, RevertToUserTarget


@pytest.fixture
def mock_db():
    return AsyncMock()


def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
    """Create a minimal agent record mock with explicit attribute assignment."""
    a = MagicMock()
    a.id = agent_id
    a.name = agent_name
    a.type = "llm"
    a.llm_provider = "openai"
    a.model = model
    a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
    a.api_key = "test-key"
    a.api_key_id = None
    a.api_url = "https://api.openai.com/v1"
    a.role = None
    a.goal = None
    a.instruction = f"You are {agent_name}."
    return a


@pytest.fixture
def root_agent_with_handoffs():
    """Root agent configured with LLM and context handoffs to sub-agents."""
    root = _make_agent("root", "Root Agent", ag2_mode="group")
    # Two handoffs: one LLM and one context, with different after_work semantics
    root.config["handoffs"] = [
        {
            "type": "llm",
            "to": "sub-llm",
            "condition": {"kind": "tool", "name": "delegate_to_sub_llm"},
            "after_work": "terminate",
        },
        {
            "type": "context",
            "to": "sub-context",
            "condition": {"kind": "message", "contains": "use context"},
            "after_work": "return",
        },
    ]
    return root


@pytest.fixture
def sub_agents():
    """Sub-agents referenced by the root agent's handoffs."""
    sub_llm = _make_agent("sub-llm", "Sub LLM Agent")
    sub_ctx = _make_agent("sub-context", "Sub Context Agent")
    return {"sub-llm": sub_llm, "sub-context": sub_ctx}


@pytest.mark.asyncio
async def test_apply_handoffs_registers_llm_and_context_handoffs(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Ensure LLM and context handoffs are transformed and registered on ca.handoffs.

    This exercises AG2AgentBuilder.build_group_chat_setup and its internal
    _apply_handoffs logic without pulling in heavy AG2 dependencies.
    """
    builder = AG2AgentBuilder(mock_db)

    # Patch get_agent so handoff resolution finds our lightweight MagicMock sub-agents.
    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        # Depending on implementation, the entry point may differ.
        # We assume build_group_chat_setup(root_agent) returns an object with .handoffs.
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    # We expect one handoff per entry in root_agent.config["handoffs"].
    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2

    # Order is preserved: first is LLM handoff, second is context handoff.
    llm_handoff, ctx_handoff = ca.handoffs

    # The tests are intentionally lightweight: we validate the wiring rather than AG2 internals.
    # LLM handoff should target the sub-llm agent and have a condition based on tool usage.
    assert getattr(llm_handoff, "sub_agent_id", None) == "sub-llm"
    assert getattr(llm_handoff, "type", None) == "llm"
    assert "delegate_to_sub_llm" in repr(getattr(llm_handoff, "condition", ""))

    # Context handoff should target the sub-context agent and have a message-based condition.
    assert getattr(ctx_handoff, "sub_agent_id", None) == "sub-context"
    assert getattr(ctx_handoff, "type", None) == "context"
    assert "use context" in repr(getattr(ctx_handoff, "condition", ""))


@pytest.mark.asyncio
async def test_apply_handoffs_missing_sub_agent_raises_value_error(
    mock_db, root_agent_with_handoffs
):
    """
    When get_agent returns None for a referenced sub-agent, a ValueError is raised.
    """
    builder = AG2AgentBuilder(mock_db)
    # Force one of the handoffs to reference a non-existent sub-agent.
    root_agent_with_handoffs.config["handoffs"][0]["to"] = "missing-subagent-id"

    async def fake_get_agent(agent_id: str):
        # Simulate not-found for the missing ID, normal behavior otherwise.
        if agent_id == "missing-subagent-id":
            return None
        return _make_agent(agent_id, f"Agent {agent_id}")

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        with pytest.raises(ValueError, match=r"Sub-agent .* not found"):
            await builder.build_group_chat_setup(root_agent_with_handoffs)


@pytest.mark.asyncio
async def test_after_work_resolves_to_correct_targets(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Verify that after_work is mapped to TerminateTarget and RevertToUserTarget.

    - "terminate" -> TerminateTarget
    - any other value (e.g. "return") -> RevertToUserTarget
    """
    builder = AG2AgentBuilder(mock_db)

    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2
    llm_handoff, ctx_handoff = ca.handoffs

    # after_work was set to "terminate" for the LLM handoff.
    after_work_llm = getattr(llm_handoff, "after_work", None)
    assert isinstance(after_work_llm, TerminateTarget)

    # after_work was set to "return" for the context handoff.
    after_work_ctx = getattr(ctx_handoff, "after_work", None)
    assert isinstance(after_work_ctx, RevertToUserTarget)
```
These tests assume the following public/semipublic API on `AG2AgentBuilder` and related classes:
1. `AG2AgentBuilder` has an async method `build_group_chat_setup(root_agent)` returning an object (here called `ca`) that exposes:
- `ca.handoffs`: an ordered iterable of handoff objects.
2. Each handoff object has (or can be extended to have) the attributes:
- `type`: `"llm"` or `"context"`.
- `sub_agent_id`: the string ID of the sub-agent resolved from `config["handoffs"][i]["to"]`.
- `condition`: an object whose `repr` contains at least the identifying text from the `condition` dict in the config.
- `after_work`: an instance of either `TerminateTarget` or `RevertToUserTarget`.
3. `AG2AgentBuilder` exposes an async method `get_agent(agent_id: str)` used by `_apply_handoffs` to resolve sub-agents.
4. `src.services.ag2.agent_builder` exports `TerminateTarget` and `RevertToUserTarget`.
If any of these assumptions differ from your actual implementation, you should:
- Update the test calls (`build_group_chat_setup`, attribute names on the returned object, etc.) to match your real API.
- Adjust the assertions on the handoff objects to match the actual attributes you expose (for example, you may store the sub-agent as `handoff.agent` instead of `sub_agent_id`; in that case assert on `handoff.agent.id`).
- If `TerminateTarget` / `RevertToUserTarget` are not directly exported, either import them from the correct module or assert based on whatever target representation your implementation uses (e.g., string enums like `"terminate"` / `"user"`).
</issue_to_address>
### Comment 13
<location path="tests/services/ag2/test_agent_runner.py" line_range="7-16" />
<code_context>
+ }
+ return a
+
+@pytest.mark.asyncio
+async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
+ builder = AG2AgentBuilder(db=mock_db)
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for error conditions and non-happy-path behavior in `run_agent_stream` and `run_agent`
Please also cover:
- A case where `run_agent` raises (e.g., `InternalServerError`) and `run_agent_stream` propagates the exception instead of yielding.
- Direct `run_agent` tests for:
- `get_agent` returning `None` → `AgentNotFoundError`.
- `ag2_mode == "group_chat"` with `initiate_group_chat` patched, asserting correct use of `chat_result.summary` and `chat_history`.
- `ag2_mode == "single"` with `ConversableAgent` patched, asserting the proxy pattern and that the final response comes from the last message.
This will exercise the control flow and error mapping beyond the happy-path streaming case.
Suggested implementation:
```python
import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from src.exceptions import InternalServerError
from src.services.ag2.agent_runner import run_agent_stream, run_agent, AgentNotFoundError
```
```python
    assert data["content"]["parts"][0]["text"] == "Resolved: your issue is fixed."
    assert data["is_final"] is True


@pytest.mark.asyncio
async def test_run_agent_stream_propagates_internal_server_error():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    # Patch run_agent to raise, and still patch session service + settings as in happy-path test
    with patch(
        "src.services.ag2.agent_runner.run_agent",
        AsyncMock(side_effect=InternalServerError("boom")),
    ):
        with patch("src.services.ag2.session_service.AG2SessionService"):
            with patch("src.config.settings.get_settings", return_value=mock_settings):
                with pytest.raises(InternalServerError):
                    async for _ in run_agent_stream(
                        db=AsyncMock(),
                        agent_id="agent-123",
                        external_id="ext-123",
                        session_id="session-abc",
                        message="This will fail",
                    ):
                        # If we ever yield, the behavior is wrong
                        pytest.fail("run_agent_stream should not yield on InternalServerError")


@pytest.mark.asyncio
async def test_run_agent_get_agent_none_raises_agent_not_found():
    mock_db = AsyncMock()

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = None

        with pytest.raises(AgentNotFoundError):
            await run_agent(
                db=mock_db,
                agent_id="missing-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Hello?",
            )


@pytest.mark.asyncio
async def test_run_agent_group_chat_uses_summary_and_chat_history():
    mock_db = AsyncMock()

    # Fake agent record in group_chat mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "group_chat"

    chat_result = MagicMock()
    chat_result.summary = "Group chat resolution summary"
    chat_result.chat_history = [
        {"role": "user", "content": "Question"},
        {"role": "assistant", "content": "Intermediate"},
        {"role": "assistant", "content": "Final"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        with patch(
            "src.services.ag2.agent_runner.initiate_group_chat",
            AsyncMock(return_value=chat_result),
        ):
            result = await run_agent(
                db=mock_db,
                agent_id="group-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Start group chat",
            )

    assert result["final_response"] == chat_result.summary
    assert result["message_history"] == chat_result.chat_history


@pytest.mark.asyncio
async def test_run_agent_single_mode_uses_proxy_and_last_message_response():
    mock_db = AsyncMock()

    # Fake agent record in single mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "single"

    # Simulated conversation messages; run_agent should use the last one as final response
    conversation_messages = [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "First reply"},
        {"role": "assistant", "content": "Final reply from proxy"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        # Mock ConversableAgent to simulate proxy pattern and returned messages
        with patch("src.services.ag2.agent_runner.ConversableAgent") as ConversableAgentMock:
            # Each instantiation returns a separate agent mock
            primary_agent = MagicMock()
            proxy_agent = MagicMock()
            # The proxy agent produces the conversation messages
            proxy_agent.initiate_chat.return_value = conversation_messages
            ConversableAgentMock.side_effect = [primary_agent, proxy_agent]

            result = await run_agent(
                db=mock_db,
                agent_id="single-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Talk to me",
            )

    # Assert that we used the proxy pattern (two agents created)
    assert ConversableAgentMock.call_count == 2
    # Final response should come from the last message produced by the proxy conversation
    assert result["final_response"] == conversation_messages[-1]["content"]
```
These tests assume:
1. `InternalServerError` is available as `src.exceptions.InternalServerError`. If it lives elsewhere, update the import path accordingly.
2. `AgentNotFoundError` is exported from `src.services.ag2.agent_runner`. If it is defined in a different module, adjust the import and the `pytest.raises(...)` target.
3. `run_agent`:
- Accepts the arguments `(db, agent_id, external_id, session_id, message, ...)` as used here.
- Raises `AgentNotFoundError` when `AG2AgentBuilder(...).get_agent(...)` returns `None`.
- In `"group_chat"` mode calls `initiate_group_chat(...)` and returns a dict with `"final_response"` (from `chat_result.summary`) and `"message_history"` (from `chat_result.chat_history`).
- In `"single"` mode constructs two `ConversableAgent` instances (primary + proxy), uses the proxy's `initiate_chat` result, and sets `"final_response"` from the last message's `"content"`.
If your actual control flow differs (e.g., different attribute names, return shapes, or proxy setup), adjust the assertions and mocks (especially the `side_effect` on `ConversableAgentMock` and the expected keys in `result`) to match the real implementation.
</issue_to_address>
### Comment 14
<location path="tests/services/ag2/test_agent_runner.py" line_range="18-27" />
<code_context>
+
+ # Patch run_agent (the heavy work), session service, and settings —
+ # get_settings and AG2SessionService are local imports inside run_agent_stream
+ with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
+ with patch("src.services.ag2.session_service.AG2SessionService"):
+ with patch("src.config.settings.get_settings", return_value=mock_settings):
+ chunks = []
+ async for chunk in run_agent_stream(
+ db=AsyncMock(),
+ agent_id="agent-123",
+ external_id="ext-123",
+ session_id="session-abc",
+ message="My printer is broken",
+ ):
+ chunks.append(chunk)
+
+ assert len(chunks) >= 1
</code_context>
<issue_to_address>
**suggestion (testing):** Consider asserting the full streaming envelope contract, not just the text content
Since `run_agent_stream` should match the WebSocket/ADK streaming envelope, this test should also validate the JSON schema, e.g.:
- `data["content"]["role"] == "agent"`
- `data["author"]` matches the `agent_id` argument
- `parts` elements have the expected `{ "type": "text" }` shape
This will better enforce the streaming protocol contract and detect unintended schema changes.
</issue_to_address>
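Concretely, the happy-path test could decode a chunk and pin down the whole envelope rather than just the text. A minimal sketch of such assertions on a decoded chunk (the field names mirror the review's description of the ADK-style schema and are assumptions, not the verified contract):

```python
import json

# Hypothetical envelope, shaped like the streaming schema described above.
chunk = json.dumps(
    {
        "author": "agent-123",
        "is_final": True,
        "content": {
            "role": "agent",
            "parts": [{"type": "text", "text": "Resolved: your issue is fixed."}],
        },
    }
)

data = json.loads(chunk)
# Envelope-level checks: author matches the agent_id passed to run_agent_stream.
assert data["author"] == "agent-123"
assert data["content"]["role"] == "agent"
# Every part should carry the expected {"type": "text"} shape.
assert all(part.get("type") == "text" for part in data["content"]["parts"])
```

Pinning the schema this way makes the test fail loudly on unintended envelope changes instead of only when the text itself differs.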
### Comment 15
<location path="tests/services/ag2/test_agent_runner.py" line_range="13-20" />
<code_context>
+ "final_response": "Resolved: your issue is fixed.",
+ "message_history": [],
+ }
+ mock_settings = MagicMock()
+ mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"
+
+ # Patch run_agent (the heavy work), session service, and settings —
+ # get_settings and AG2SessionService are local imports inside run_agent_stream
+ with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
+ with patch("src.services.ag2.session_service.AG2SessionService"):
+ with patch("src.config.settings.get_settings", return_value=mock_settings):
+ chunks = []
+ async for chunk in run_agent_stream(
</code_context>
<issue_to_address>
**suggestion (testing):** Add a dedicated test for session history integration in `run_agent`
Since `AG2SessionService` is fully stubbed, we don’t currently verify that messages are appended and persisted. Please add a focused unit test for `run_agent` that injects a fake `AG2SessionService` with `get_or_create`, `append`, and `save` mocked, and asserts that:
- `append` is called in order with `("user", message)` then `("assistant", final_response)`
- `save` is called once with the same session instance
This will ensure conversational state is correctly persisted for the WhatsApp pause/resume scenario described in the PR.
Suggested implementation:
```python
import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch, call
from src.services.ag2.agent_runner import run_agent_stream, run_agent
```
```python
    assert len(chunks) >= 1
    data = json.loads(chunks[0])


@pytest.mark.asyncio
async def test_run_agent_persists_session_history_in_order():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    with patch("src.services.ag2.session_service.AG2SessionService") as MockSessionService, \
         patch("src.config.settings.get_settings", return_value=mock_settings):
        mock_session_service = MockSessionService.return_value
        mock_session = MagicMock()

        # Ensure session service methods are async and controllable
        mock_session_service.get_or_create = AsyncMock(return_value=mock_session)
        mock_session_service.append = AsyncMock()
        mock_session_service.save = AsyncMock()

        db = AsyncMock()
        user_message = "My printer is broken"

        result = await run_agent(
            db=db,
            agent_id="agent-123",
            external_id="ext-123",
            session_id="session-abc",
            message=user_message,
        )

        # Append is called in order: user then assistant
        mock_session_service.append.assert_has_awaits(
            [
                call(mock_session, "user", user_message),
                call(mock_session, "assistant", result["final_response"]),
            ]
        )
        # Save is called once with the same session instance
        mock_session_service.save.assert_awaited_once_with(mock_session)
```
If the actual signature of `run_agent` or the `AG2SessionService` methods differ (for example, if `append`/`save` or `get_or_create` are synchronous instead of async, or if `append` takes different parameters), you should:
1. Adjust the `AsyncMock` usage to `MagicMock` and switch from `assert_awaited*` to `assert_called*` accordingly.
2. Update the `call(...)` argument lists to match the real method signatures (e.g., if `append` receives keyword arguments or a different ordering).
3. If `run_agent` requires additional parameters (such as config, tools, or agent options), pass suitable dummy values in the test invocation.
</issue_to_address>
```python
messages: Mapped[MutableDict[str, Any]] = mapped_column(
    MutableDict.as_mutable(DynamicJSON), default=[]
)
```
**issue (bug_risk):** The `messages` column uses `MutableDict` but defaults to a list, which is inconsistent and may break SQLAlchemy change tracking.
The column is typed as `Mapped[MutableDict[str, Any]]` and uses `MutableDict.as_mutable(DynamicJSON)`, but the default is `[]`, which is a list. `MutableDict` expects a mapping and may not track changes correctly with a list value. If `messages` is meant to be a list of message dicts, switch to `MutableList` (or a custom mutable list type) and update the type hint; otherwise, use a dict default (e.g. `{}`) and treat it consistently as a mapping.
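A small sketch of the failure mode and the `MutableList` alternative, exercising SQLAlchemy's mutable extension directly (the `"messages"` key name here is illustrative):

```python
from sqlalchemy.ext.mutable import MutableDict, MutableList

# MutableDict.coerce rejects a non-mapping value; this is the failure mode
# that hits when the column's list default flows through MutableDict.as_mutable.
try:
    MutableDict.coerce("messages", [])
    dict_coercion_failed = False
except ValueError:
    dict_coercion_failed = True
assert dict_coercion_failed

# MutableList accepts the list default and tracks in-place mutations,
# so appended messages flag the attribute as changed for SQLAlchemy.
messages = MutableList.coerce("messages", [])
messages.append({"role": "user", "content": "hi"})
assert isinstance(messages, MutableList)
assert len(messages) == 1
```

The matching column declaration would then use `MutableList.as_mutable(DynamicJSON)` with a `Mapped[MutableList[dict]]`-style hint and keep the `[]` default.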
```python
def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
    """Retrieve an existing session or create a new one."""
    session_id = f"{external_id}_{agent_id}"
    with self.SessionLocal() as db:
        record = db.get(AG2StorageSession, (agent_id, external_id, session_id))
        if record is None:
            record = AG2StorageSession(
                app_name=agent_id,
                user_id=external_id,
                id=session_id,
```
**suggestion (bug_risk):** `get_or_create` performs two separate DB roundtrips for the same session where one would suffice.
Inside `get_or_create`, you open one session to create/commit the `AG2StorageSession`, then a second session to fetch the same row just to load messages. Instead, reuse the first `record` to build `AG2Session` and skip the second `db.get`/session, which both cuts DB roundtrips and removes the race window between the two transactions.
Suggested implementation:
```python
def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
    """Retrieve an existing session or create a new one.

    This implementation performs all work in a single DB session to avoid
    extra roundtrips and race windows between separate transactions.
    """
    session_id = f"{external_id}_{agent_id}"
    with self.SessionLocal() as db:
        # Single roundtrip: either fetch existing or create, then return.
        record = db.get(AG2StorageSession, (agent_id, external_id, session_id))
        if record is None:
            record = AG2StorageSession(
                app_name=agent_id,
                user_id=external_id,
                id=session_id,
                messages=[],
            )
            db.add(record)
            db.commit()
            # Ensure we have a fully-populated, persistent instance
            db.refresh(record)

        # Make sure messages are loaded while the session is still open.
        # If `messages` is a lazy relationship, this will trigger the load here.
        _ = record.messages

        # Build the AG2Session directly from the record we already have,
        # instead of re-opening a new session and re-querying the same row.
        return AG2Session(
            agent_id=record.app_name,
            external_id=record.user_id,
            session_id=record.id,
            messages=list(record.messages or []),
        )
```
Because I only saw the beginning of `get_or_create`, you should:
- Remove any remaining code after this method that:
  - Opens a second `with self.SessionLocal() as db:` block inside `get_or_create`, or
  - Re-fetches `AG2StorageSession` (e.g., `db.get(AG2StorageSession, ...)`) for the same `agent_id`/`external_id`/`session_id`.

  That logic is now redundant and should be deleted.
- If your `AG2Session` constructor has a different signature than shown here, adapt the `AG2Session(...)` call accordingly, but keep the pattern of constructing it directly from `record` in this single session.
- If `messages` is not a relationship but a plain JSON/array column, you can drop the `_ = record.messages` line; the rest of the logic remains the same.
```python
if env:
    for key, value in env.items():
        os.environ[key] = value
```
**issue (bug_risk):** Mutating `os.environ` for MCP server env vars introduces global side effects and is likely unnecessary.
In `_connect_server`, you both build an `env` dict for `McpServer` and also write those values into `os.environ`. This leaks per-server config into the global process and can cause unexpected interactions between servers or other code. It should be enough to pass `env` into `McpServer` without modifying `os.environ`.
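A sketch of the suggested shape: merge the per-server overrides into a plain dict and hand that to the server constructor, leaving the process environment untouched (`build_server_env` and the variable name are illustrative helpers, not part of the real code):

```python
import os


def build_server_env(base, overrides):
    """Return a merged env copy for one MCP server, without touching os.environ."""
    env = dict(base)
    if overrides:
        env.update(overrides)
    return env


before = dict(os.environ)
# Hypothetical per-server config; in _connect_server this dict would be passed
# to the McpServer constructor's env parameter instead of written to os.environ.
server_env = build_server_env(os.environ, {"AG2_MCP_EXAMPLE_TOKEN": "secret"})

assert server_env["AG2_MCP_EXAMPLE_TOKEN"] == "secret"
assert "AG2_MCP_EXAMPLE_TOKEN" not in os.environ  # global process env unchanged
assert dict(os.environ) == before
```

Each server then gets an isolated view of its configuration, and two servers with conflicting variables can no longer clobber each other through the shared process environment.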
```python
config = root_agent.config or {}
sub_agent_ids = config.get("sub_agents", [])
if not sub_agent_ids:
    raise ValueError("group_chat agent requires at least one sub_agent")

# Build all sub-agents first so handoff resolution can reference them
all_agents = {}
agents = []
```
**suggestion (performance):** Sub-agents are fetched from the database twice: once to build them and again to apply handoffs.
In `build_group_chat_setup`, each `aid` triggers two `get_agent` calls—first to build `ConversableAgent`s, then again when applying handoffs. Consider storing the initial `db_agent` objects (e.g., an id → `db_agent` dict) and reusing them when applying handoffs to avoid duplicate database queries and keep the lookup logic centralized.
Suggested change:
```python
config = root_agent.config or {}
sub_agent_ids = config.get("sub_agents", [])
if not sub_agent_ids:
    raise ValueError("group_chat agent requires at least one sub_agent")

# Build all sub-agents first so handoff resolution can reference them
all_agents = {}
agents = []
db_sub_agents = {}
for aid in sub_agent_ids:
    db_agent = get_agent(self.db, str(aid))
    if db_agent is None:
        raise ValueError(f"Sub-agent {aid} not found")
    db_sub_agents[str(aid)] = db_agent
    ca = await self.build_conversable_agent(db_agent)
    all_agents[str(aid)] = ca
    agents.append(ca)

root_ca = await self.build_conversable_agent(root_agent)
all_agents[str(root_agent.id)] = root_ca

# Apply handoffs to each sub-agent if configured, reusing cached db agents
for aid in sub_agent_ids:
    db_agent = db_sub_agents.get(str(aid))
    if db_agent and db_agent.config:
        self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)
```
```python
for h in handoffs_config:
    target_id = h.get("target_agent_id")
    target_agent = all_agents.get(str(target_id))
    if not target_agent:
        logger.warning(f"Handoff target {target_id} not found, skipping")
        continue

    if h["type"] == "llm":
        llm_conditions.append(
            OnCondition(
                target=AgentTarget(target_agent),
                condition=StringLLMCondition(prompt=h["condition"]),
            )
        )
    elif h["type"] == "context":
        context_conditions.append(
            OnContextCondition(
                target=AgentTarget(target_agent),
                condition=ExpressionContextCondition(
                    expression=ContextExpression(h["expression"])
                ),
            )
        )
```
**suggestion:** Using `h["type"]` without validation can raise if a handoff entry is malformed.
In `_apply_handoffs`, this direct `h["type"]` access will raise a `KeyError` if a handoff entry is missing `type`, which will break setup for a misconfigured user config. Consider using `h.get("type")` and skipping/logging invalid entries, consistent with how a missing `target_agent` is handled.
Suggested change:
```python
for h in handoffs_config:
    target_id = h.get("target_agent_id")
    target_agent = all_agents.get(str(target_id))
    if not target_agent:
        logger.warning(f"Handoff target {target_id} not found, skipping")
        continue

    h_type = h.get("type")
    if h_type not in ("llm", "context"):
        logger.warning(
            f"Invalid or missing handoff type {h_type!r} for target {target_id}, skipping"
        )
        continue

    if h_type == "llm":
        llm_conditions.append(
            OnCondition(
                target=AgentTarget(target_agent),
                condition=StringLLMCondition(prompt=h["condition"]),
            )
        )
    elif h_type == "context":
        context_conditions.append(
            OnContextCondition(
                target=AgentTarget(target_agent),
                condition=ExpressionContextCondition(
                    expression=ContextExpression(h["expression"])
                ),
            )
        )
```
```python
@pytest.mark.asyncio
async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
    builder = AG2AgentBuilder(db=mock_db)
    sub_agent = _make_agent("uuid-triage", "triage")
    with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
        result, _ = await builder.build_agent(group_chat_agent_record)
    # result should be a dict with pattern and agents for initiate_group_chat
    assert "pattern" in result
    assert "agents" in result
```
suggestion (testing): Add tests for single-agent mode and API key resolution branches in AG2AgentBuilder
Current tests only cover group chat orchestration and the empty-sub-agent validation. Please also add coverage for:
- `single` mode: `build_agent` returning a `ConversableAgent` when `ag2_mode` is unset or set to `"single"`.
- Agent metadata: name sanitization (spaces → underscores) and correct system message composition from `role`, `goal`, and `instruction`.
- `_get_api_key` behavior:
  - `api_key_id` provided and resolved via `get_decrypted_api_key`.
  - `config.api_key` as a UUID string resolved via `get_decrypted_api_key`.
  - `config.api_key` as a non-UUID raw key returned as-is.
  - No valid key → `ValueError` with a clear message.

These cases help ensure behavior stays aligned with ADK/CrewAI builders and that auth errors are surfaced properly.
Suggested implementation:
```python
@pytest.mark.asyncio
async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
    builder = AG2AgentBuilder(db=mock_db)
    sub_agent = _make_agent("uuid-triage", "triage")
    with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
        result, _ = await builder.build_agent(group_chat_agent_record)
    # result should be a dict with pattern and agents for initiate_group_chat
    assert "pattern" in result
    assert "agents" in result


@pytest.mark.asyncio
async def test_builder_validates_group_chat_requires_agents(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.config = {"ag2_mode": "group_chat", "sub_agents": []}  # empty — should raise
    record.api_key = "test"
    builder = AG2AgentBuilder(db=mock_db)
    with pytest.raises(ValueError, match="at least one"):
        await builder.build_agent(record)


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_unset(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Support Team Agent"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        # no ag2_mode: should default to single-agent mode
        "role": "assistant",
        "goal": "Help users resolve issues",
        "instruction": "Be concise and friendly.",
        "api_key": "raw-openai-key",
    }

    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    # single-agent mode should produce a ConversableAgent-like object
    from autogen import ConversableAgent  # type: ignore[import]

    assert isinstance(agent, ConversableAgent)
    # name should be sanitized (spaces -> underscores)
    assert agent.name == "Support_Team_Agent"
    # system message should compose role, goal and instruction
    system_message = getattr(agent, "system_message", "")
    assert "assistant" in system_message
    assert "Help users resolve issues" in system_message
    assert "Be concise and friendly." in system_message
    # metadata should at least echo the mode
    assert metadata.get("mode") == "single"


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_single(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Billing Bot"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        "ag2_mode": "single",
        "role": "assistant",
        "goal": "Assist with billing questions",
        "instruction": "Only answer billing-related queries.",
        "api_key": "raw-openai-key",
    }

    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    from autogen import ConversableAgent  # type: ignore[import]

    assert isinstance(agent, ConversableAgent)
    assert agent.name == "Billing_Bot"
    system_message = getattr(agent, "system_message", "")
    assert "billing" in system_message.lower()
    assert "only answer billing-related queries.".lower() in system_message.lower()
    assert metadata.get("mode") == "single"


def test_get_api_key_with_api_key_id_uses_secret_store(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = "11111111-2222-3333-4444-555555555555"
    record.api_key = None
    record.config = {}
    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-secret-key",
    ) as mock_get:
        api_key = builder._get_api_key(record)
    mock_get.assert_called_once_with(mock_db, record.api_key_id)
    assert api_key == "resolved-secret-key"


def test_get_api_key_with_uuid_string_in_config(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    config_uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": config_uuid}
    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-from-config",
    ) as mock_get:
        api_key = builder._get_api_key(record)
    mock_get.assert_called_once_with(mock_db, config_uuid)
    assert api_key == "resolved-from-config"


def test_get_api_key_with_raw_non_uuid_config_value(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": "sk-live-plain-openai-key"}
    api_key = builder._get_api_key(record)
    # Non-UUID api_key should be returned as-is, without hitting secret store
    assert api_key == "sk-live-plain-openai-key"


def test_get_api_key_raises_when_no_valid_key(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {}
    with pytest.raises(ValueError, match="API key"):
        builder._get_api_key(record)
```
- Ensure `ConversableAgent` is available at the top of the test module; if you prefer not to import it inside tests, move the `from autogen import ConversableAgent` import to the global imports section and remove the inline imports in the tests.
- Verify the call signature of `get_decrypted_api_key` in `src.services.ag2.agent_builder`; the tests assume `get_decrypted_api_key(db, api_key_id)`. If the actual order differs, adjust the `assert_called_once_with` expectations accordingly.
- If `AG2AgentBuilder._get_api_key` currently uses a different error message when no key is found, update the implementation to raise `ValueError("API key could not be resolved")` (or similar) so that the `"API key"` substring in the `match` expression is present.
- If the `ConversableAgent` implementation uses a different attribute than `.system_message` for the composed instructions, either:
  - Update `AG2AgentBuilder` to set `.system_message` to the composed text, or
  - Adjust the tests to assert against the actual attribute used (e.g. `agent.description`), keeping the checks that `role`, `goal`, and `instruction` are all present.
```python
@pytest.fixture
def mock_db():
    return AsyncMock()


def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
    """Create a minimal agent record mock with explicit attribute assignment."""
    a = MagicMock()
    a.id = agent_id
    a.name = agent_name
    a.type = "llm"
```
suggestion (testing): Extend tests to cover handoff configuration and sub-agent resolution in group chat setups
AG2AgentBuilder has substantial logic in _apply_handoffs and build_group_chat_setup (LLM/context handoffs, after_work mapping, and missing sub-agent handling), but current tests only check the output shape.
Please add tests that:
- Build a root agent with sub-agents containing `handoffs` of type `"llm"` and `"context"`, and assert the expected conditions are registered on `ca.handoffs`.
- Use a sub-agent ID for which `get_agent` returns `None` and assert a `ValueError("Sub-agent ... not found")`.
- Verify `after_work` resolves to `TerminateTarget` when set to `"terminate"` and to `RevertToUserTarget` otherwise.

You can rely on lightweight `MagicMock` agents or patched AG2 classes to avoid heavy dependencies while still validating the behavior and wiring.
Suggested implementation:
from unittest.mock import MagicMock, AsyncMock, patch
from src.services.ag2.agent_builder import AG2AgentBuilder, TerminateTarget, RevertToUserTarget
@pytest.fixture
def mock_db():
return AsyncMock()
def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
"""Create a minimal agent record mock with explicit attribute assignment."""
a = MagicMock()
a.id = agent_id
a.name = agent_name
a.type = "llm"
a.llm_provider = "openai"
a.model = model
a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
a.api_key = "test-key"
a.api_key_id = None
a.api_url = "https://api.openai.com/v1"
a.role = None
a.goal = None
a.instruction = f"You are {agent_name}."
return a
@pytest.fixture
def root_agent_with_handoffs():
"""Root agent configured with LLM and context handoffs to sub-agents."""
root = _make_agent("root", "Root Agent", ag2_mode="group")
# Two handoffs: one LLM and one context, with different after_work semantics
root.config["handoffs"] = [
{
"type": "llm",
"to": "sub-llm",
"condition": {"kind": "tool", "name": "delegate_to_sub_llm"},
"after_work": "terminate",
},
{
"type": "context",
"to": "sub-context",
"condition": {"kind": "message", "contains": "use context"},
"after_work": "return",
},
]
return root
@pytest.fixture
def sub_agents():
"""Sub-agents referenced by the root agent's handoffs."""
sub_llm = _make_agent("sub-llm", "Sub LLM Agent")
sub_ctx = _make_agent("sub-context", "Sub Context Agent")
return {"sub-llm": sub_llm, "sub-context": sub_ctx}
@pytest.mark.asyncio
async def test_apply_handoffs_registers_llm_and_context_handoffs(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Ensure LLM and context handoffs are transformed and registered on ca.handoffs.

    This exercises AG2AgentBuilder.build_group_chat_setup and its internal
    _apply_handoffs logic without pulling in heavy AG2 dependencies.
    """
    builder = AG2AgentBuilder(mock_db)

    # Patch get_agent so handoff resolution finds our lightweight MagicMock sub-agents.
    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        # Depending on implementation, the entry point may differ.
        # We assume build_group_chat_setup(root_agent) returns an object with .handoffs.
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    # We expect one handoff per entry in root_agent.config["handoffs"].
    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2

    # Order is preserved: first is LLM handoff, second is context handoff.
    llm_handoff, ctx_handoff = ca.handoffs

    # The tests are intentionally lightweight: we validate the wiring rather than AG2 internals.
    # LLM handoff should target the sub-llm agent and have a condition based on tool usage.
    assert getattr(llm_handoff, "sub_agent_id", None) == "sub-llm"
    assert getattr(llm_handoff, "type", None) == "llm"
    assert "delegate_to_sub_llm" in repr(getattr(llm_handoff, "condition", ""))

    # Context handoff should target the sub-context agent and have a message-based condition.
    assert getattr(ctx_handoff, "sub_agent_id", None) == "sub-context"
    assert getattr(ctx_handoff, "type", None) == "context"
    assert "use context" in repr(getattr(ctx_handoff, "condition", ""))
@pytest.mark.asyncio
async def test_apply_handoffs_missing_sub_agent_raises_value_error(
    mock_db, root_agent_with_handoffs
):
    """
    When get_agent returns None for a referenced sub-agent, a ValueError is raised.
    """
    builder = AG2AgentBuilder(mock_db)

    # Force one of the handoffs to reference a non-existent sub-agent.
    root_agent_with_handoffs.config["handoffs"][0]["to"] = "missing-subagent-id"

    async def fake_get_agent(agent_id: str):
        # Simulate not-found for the missing ID, normal behavior otherwise.
        if agent_id == "missing-subagent-id":
            return None
        return _make_agent(agent_id, f"Agent {agent_id}")

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        with pytest.raises(ValueError, match=r"Sub-agent .* not found"):
            await builder.build_group_chat_setup(root_agent_with_handoffs)
@pytest.mark.asyncio
async def test_after_work_resolves_to_correct_targets(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Verify that after_work is mapped to TerminateTarget and RevertToUserTarget.

    - "terminate" -> TerminateTarget
    - any other value (e.g. "return") -> RevertToUserTarget
    """
    builder = AG2AgentBuilder(mock_db)

    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2
    llm_handoff, ctx_handoff = ca.handoffs

    # after_work was set to "terminate" for the LLM handoff.
    after_work_llm = getattr(llm_handoff, "after_work", None)
    assert isinstance(after_work_llm, TerminateTarget)

    # after_work was set to "return" for the context handoff.
    after_work_ctx = getattr(ctx_handoff, "after_work", None)
    assert isinstance(after_work_ctx, RevertToUserTarget)

These tests assume the following public/semi-public API on `AG2AgentBuilder` and related classes:
- `AG2AgentBuilder` has an async method `build_group_chat_setup(root_agent)` returning an object (here called `ca`) that exposes `ca.handoffs`: an ordered iterable of handoff objects.
- Each handoff object has (or can be extended to have) the attributes:
  - `type`: `"llm"` or `"context"`.
  - `sub_agent_id`: the string ID of the sub-agent resolved from `config["handoffs"][i]["to"]`.
  - `condition`: an object whose `repr` contains at least the identifying text from the `condition` dict in the config.
  - `after_work`: an instance of either `TerminateTarget` or `RevertToUserTarget`.
- `AG2AgentBuilder` exposes an async method `get_agent(agent_id: str)` used by `_apply_handoffs` to resolve sub-agents.
- `src.services.ag2.agent_builder` exports `TerminateTarget` and `RevertToUserTarget`.

If any of these assumptions differ from your actual implementation, you should:

- Update the test calls (`build_group_chat_setup`, attribute names on the returned object, etc.) to match your real API.
- Adjust the assertions on the handoff objects to match the actual attributes you expose (for example, you may store the sub-agent as `handoff.agent` instead of `sub_agent_id`; in that case assert on `handoff.agent.id`).
- If `TerminateTarget`/`RevertToUserTarget` are not directly exported, either import them from the correct module or assert based on whatever target representation your implementation uses (e.g., string enums like `"terminate"`/`"user"`).
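For readers adapting these tests, here is a minimal sketch of the handoff shape they assume. Every name in it (`Handoff`, `sub_agent_id`, the two target placeholder classes, `build_handoff`) is a stand-in for whatever the real `AG2AgentBuilder._apply_handoffs` produces, not the PR's actual code:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TerminateTarget:
    """Placeholder for the terminate after_work target."""


@dataclass
class RevertToUserTarget:
    """Placeholder for the revert-to-user after_work target."""


@dataclass
class Handoff:
    type: str         # "llm" or "context"
    sub_agent_id: str  # resolved from config["handoffs"][i]["to"]
    condition: Any     # repr() must expose the identifying text from the config
    after_work: Any    # TerminateTarget() or RevertToUserTarget()


def build_handoff(entry: dict, sub_agent: Any) -> Handoff:
    """Translate one config["handoffs"] entry, mirroring the assumed rules."""
    if sub_agent is None:
        raise ValueError(f"Sub-agent {entry['to']} not found")
    after_work = (
        TerminateTarget() if entry.get("after_work") == "terminate" else RevertToUserTarget()
    )
    return Handoff(
        type=entry["type"],
        sub_agent_id=entry["to"],
        condition=entry["condition"],
        after_work=after_work,
    )
```

A `Handoff` built this way satisfies every assertion in the three tests above (ordering aside), which is a quick way to check the tests against your real attribute names before wiring in AG2.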
@pytest.mark.asyncio
async def test_runner_yields_response_chunks():
    mock_result = {
        "final_response": "Resolved: your issue is fixed.",
        "message_history": [],
    }
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    # Patch run_agent (the heavy work), session service, and settings —
suggestion (testing): Add tests for error conditions and non-happy-path behavior in run_agent_stream and run_agent

Please also cover:

- A case where `run_agent` raises (e.g., `InternalServerError`) and `run_agent_stream` propagates the exception instead of yielding.
- Direct `run_agent` tests for:
  - `get_agent` returning `None` → `AgentNotFoundError`.
  - `ag2_mode == "group_chat"` with `initiate_group_chat` patched, asserting correct use of `chat_result.summary` and `chat_history`.
  - `ag2_mode == "single"` with `ConversableAgent` patched, asserting the proxy pattern and that the final response comes from the last message.

This will exercise the control flow and error mapping beyond the happy-path streaming case.
Suggested implementation:

import json

import pytest
from unittest.mock import MagicMock, AsyncMock, patch

from src.exceptions import InternalServerError
from src.services.ag2.agent_runner import run_agent_stream, run_agent, AgentNotFoundError

    assert data["content"]["parts"][0]["text"] == "Resolved: your issue is fixed."
    assert data["is_final"] is True
@pytest.mark.asyncio
async def test_run_agent_stream_propagates_internal_server_error():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    # Patch run_agent to raise, and still patch session service + settings as in happy-path test
    with patch(
        "src.services.ag2.agent_runner.run_agent",
        AsyncMock(side_effect=InternalServerError("boom")),
    ):
        with patch("src.services.ag2.session_service.AG2SessionService"):
            with patch("src.config.settings.get_settings", return_value=mock_settings):
                with pytest.raises(InternalServerError):
                    async for _ in run_agent_stream(
                        db=AsyncMock(),
                        agent_id="agent-123",
                        external_id="ext-123",
                        session_id="session-abc",
                        message="This will fail",
                    ):
                        # If we ever yield, the behavior is wrong
                        pytest.fail("run_agent_stream should not yield on InternalServerError")


@pytest.mark.asyncio
async def test_run_agent_get_agent_none_raises_agent_not_found():
    mock_db = AsyncMock()

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = None

        with pytest.raises(AgentNotFoundError):
            await run_agent(
                db=mock_db,
                agent_id="missing-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Hello?",
            )


@pytest.mark.asyncio
async def test_run_agent_group_chat_uses_summary_and_chat_history():
    mock_db = AsyncMock()

    # Fake agent record in group_chat mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "group_chat"

    chat_result = MagicMock()
    chat_result.summary = "Group chat resolution summary"
    chat_result.chat_history = [
        {"role": "user", "content": "Question"},
        {"role": "assistant", "content": "Intermediate"},
        {"role": "assistant", "content": "Final"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        with patch(
            "src.services.ag2.agent_runner.initiate_group_chat",
            AsyncMock(return_value=chat_result),
        ):
            result = await run_agent(
                db=mock_db,
                agent_id="group-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Start group chat",
            )

    assert result["final_response"] == chat_result.summary
    assert result["message_history"] == chat_result.chat_history


@pytest.mark.asyncio
async def test_run_agent_single_mode_uses_proxy_and_last_message_response():
    mock_db = AsyncMock()

    # Fake agent record in single mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "single"

    # Simulated conversation messages; run_agent should use the last one as final response
    conversation_messages = [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "First reply"},
        {"role": "assistant", "content": "Final reply from proxy"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        # Mock ConversableAgent to simulate proxy pattern and returned messages
        with patch("src.services.ag2.agent_runner.ConversableAgent") as ConversableAgentMock:
            # Each instantiation returns a separate agent mock
            primary_agent = MagicMock()
            proxy_agent = MagicMock()
            # The proxy agent produces the conversation messages
            proxy_agent.initiate_chat.return_value = conversation_messages
            ConversableAgentMock.side_effect = [primary_agent, proxy_agent]

            result = await run_agent(
                db=mock_db,
                agent_id="single-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Talk to me",
            )

    # Assert that we used the proxy pattern (two agents created)
    assert ConversableAgentMock.call_count == 2
    # Final response should come from the last message produced by the proxy conversation
    assert result["final_response"] == conversation_messages[-1]["content"]

These tests assume:
- `InternalServerError` is available as `src.exceptions.InternalServerError`. If it lives elsewhere, update the import path accordingly.
- `AgentNotFoundError` is exported from `src.services.ag2.agent_runner`. If it is defined in a different module, adjust the import and the `pytest.raises(...)` target.
- `run_agent`:
  - Accepts the arguments `(db, agent_id, external_id, session_id, message, ...)` as used here.
  - Raises `AgentNotFoundError` when `AG2AgentBuilder(...).get_agent(...)` returns `None`.
  - In `"group_chat"` mode calls `initiate_group_chat(...)` and returns a dict with `"final_response"` (from `chat_result.summary`) and `"message_history"` (from `chat_result.chat_history`).
  - In `"single"` mode constructs two `ConversableAgent` instances (primary + proxy), uses the proxy's `initiate_chat` result, and sets `"final_response"` from the last message's `"content"`.

If your actual control flow differs (e.g., different attribute names, return shapes, or proxy setup), adjust the assertions and mocks (especially the `side_effect` on `ConversableAgentMock` and the expected keys in `result`) to match the real implementation.
    with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
        with patch("src.services.ag2.session_service.AG2SessionService"):
            with patch("src.config.settings.get_settings", return_value=mock_settings):
                chunks = []
                async for chunk in run_agent_stream(
                    db=AsyncMock(),
                    agent_id="agent-123",
                    external_id="ext-123",
                    session_id="session-abc",
                    message="My printer is broken",
suggestion (testing): Consider asserting the full streaming envelope contract, not just the text content

Since run_agent_stream should match the WebSocket/ADK streaming envelope, this test should also validate the JSON schema, e.g.:

- `data["content"]["role"] == "agent"`
- `data["author"]` matches the `agent_id` argument
- `parts` elements have the expected `{ "type": "text" }` shape

This will better enforce the streaming protocol contract and detect unintended schema changes.
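One way to phrase that as a reusable helper. The field names here (`author`, `content.role`, `parts[].type`) are assumptions drawn from the ADK-style streaming format and should be aligned with the real contract before use:

```python
import json


def assert_stream_envelope(raw_chunk: str, agent_id: str) -> None:
    """Validate one streamed chunk against the assumed ADK-style envelope.

    Raises AssertionError if any assumed field is missing or malformed.
    """
    data = json.loads(raw_chunk)
    # Top-level author must echo the agent that produced the chunk
    assert data["author"] == agent_id
    # Content wrapper carries the role and the parts list
    assert data["content"]["role"] == "agent"
    assert len(data["content"]["parts"]) >= 1
    for part in data["content"]["parts"]:
        assert part["type"] == "text"
        assert isinstance(part["text"], str)
```

In the happy-path test this would be called as `assert_stream_envelope(chunks[0], "agent-123")`, turning the single text assertion into a full schema check.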
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    # Patch run_agent (the heavy work), session service, and settings —
    # get_settings and AG2SessionService are local imports inside run_agent_stream
    with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
        with patch("src.services.ag2.session_service.AG2SessionService"):
            with patch("src.config.settings.get_settings", return_value=mock_settings):
suggestion (testing): Add a dedicated test for session history integration in run_agent

Since AG2SessionService is fully stubbed, we don't currently verify that messages are appended and persisted. Please add a focused unit test for run_agent that injects a fake AG2SessionService with get_or_create, append, and save mocked, and asserts that:

- `append` is called in order with `("user", message)` then `("assistant", final_response)`
- `save` is called once with the same session instance

This will ensure conversational state is correctly persisted for the WhatsApp pause/resume scenario described in the PR.
Suggested implementation:

import json

import pytest
from unittest.mock import MagicMock, AsyncMock, patch, call

from src.services.ag2.agent_runner import run_agent_stream, run_agent

    assert len(chunks) >= 1
    data = json.loads(chunks[0])


@pytest.mark.asyncio
async def test_run_agent_persists_session_history_in_order():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    with patch("src.services.ag2.session_service.AG2SessionService") as MockSessionService, \
         patch("src.config.settings.get_settings", return_value=mock_settings):
        mock_session_service = MockSessionService.return_value
        mock_session = MagicMock()

        # Ensure session service methods are async and controllable
        mock_session_service.get_or_create = AsyncMock(return_value=mock_session)
        mock_session_service.append = AsyncMock()
        mock_session_service.save = AsyncMock()

        db = AsyncMock()
        user_message = "My printer is broken"

        result = await run_agent(
            db=db,
            agent_id="agent-123",
            external_id="ext-123",
            session_id="session-abc",
            message=user_message,
        )

        # Append is called in order: user then assistant
        mock_session_service.append.assert_has_awaits(
            [
                call(mock_session, "user", user_message),
                call(mock_session, "assistant", result["final_response"]),
            ]
        )
        # Save is called once with the same session instance
        mock_session_service.save.assert_awaited_once_with(mock_session)

If the actual signature of run_agent or the AG2SessionService methods differ (for example, if append/save or get_or_create are synchronous instead of async, or if append takes different parameters), you should:
- Adjust the `AsyncMock` usage to `MagicMock` and switch from `assert_awaited*` to `assert_called*` accordingly.
- Update the `call(...)` argument lists to match the real method signatures (e.g., if `append` receives keyword arguments or a different ordering).
- If `run_agent` requires additional parameters (such as config, tools, or agent options), pass suitable dummy values in the test invocation.
Implements `src/services/ag2/` following the same two-file builder+runner pattern used by the existing CrewAI and Google ADK engines. Adds `AG2AgentBuilder` (translates evo-ai Agent DB records to AG2 agents) and `AG2AgentRunner` (`run_agent` + `run_agent_stream` via `chat_routes`). Updates `chat_routes.py` and `service_providers.py` dispatch branches. No schema migration required: orchestration mode is stored in the existing `config` JSON column (`config.ag2_mode`), following the same pattern used by the existing engine integrations.

Why: evo-ai currently supports Google ADK (single-agent) and CrewAI (sequential crew), but has no conversational multi-agent runtime. AG2's `GroupChat` with LLM-driven speaker selection is architecturally distinct from both: agents collaborate dynamically without a fixed execution order. The most distinctive capability is `WhatsAppSuspendableProxy`: AG2's human-in-the-loop mechanism maps directly to evo-ai's WhatsApp integration, enabling stateful pause-and-resume across HTTP requests, a pattern that no other engine in the platform currently supports. The integration follows the existing engine pattern exactly; `chat_routes.py` already has the `AI_ENGINE` dispatch structure, making the diff minimal and the review scope clear.
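For reviewers, the shape of that dispatch can be sketched as a lookup table. The `adk` and `crewai` module paths below are assumed by analogy with the new `src/services/ag2/` package, and the real code likely branches inline in `service_providers.py` rather than through a table:

```python
# Maps AI_ENGINE values to the module expected to provide run_agent /
# run_agent_stream. Paths other than the ag2 one are assumptions.
ENGINE_RUNNER_MODULES = {
    "adk": "src.services.adk.agent_runner",
    "crewai": "src.services.crewai.agent_runner",
    "ag2": "src.services.ag2.agent_runner",
}


def resolve_engine_module(ai_engine: str) -> str:
    """Return the runner module path for an AI_ENGINE value, or fail loudly."""
    try:
        return ENGINE_RUNNER_MODULES[ai_engine]
    except KeyError:
        raise ValueError(f"Unknown AI_ENGINE: {ai_engine!r}") from None
```

The point of the sketch is the review claim above: adding AG2 is one more entry in an existing three-way branch, not a new dispatch mechanism.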