Skip to content

Add AG2 (formerly AutoGen) as a third AI engine (AI_ENGINE=ag2)#46

Open
VasiliyRad wants to merge 3 commits intoEvolutionAPI:mainfrom
VasiliyRad:vasiliyr/03112026
Open

Add AG2 (formerly AutoGen) as a third AI engine (AI_ENGINE=ag2)#46
VasiliyRad wants to merge 3 commits intoEvolutionAPI:mainfrom
VasiliyRad:vasiliyr/03112026

Conversation

@VasiliyRad
Copy link

@VasiliyRad VasiliyRad commented Mar 12, 2026

Implements src/services/ag2/ following the same two-file builder+runner pattern used by the existing CrewAI and Google ADK engines. Adds AG2AgentBuilder (translates evo-ai Agent DB records to AG2 agents) and AG2AgentRunner (run_agent + run_agent_stream via chat_routes). Updates chat_routes.py and service_providers.py dispatch branches. No schema migration required: orchestration mode is stored in the existing config JSON column (config.ag2_mode), following the same pattern used by the existing engine integrations.

Why: evo-ai currently supports Google ADK (single-agent) and CrewAI (sequential crew), but has no conversational multi-agent runtime. AG2's GroupChat with LLM-driven speaker selection is architecturally distinct from both: agents collaborate dynamically without a fixed execution order. The most distinctive capability is WhatsAppSuspendableProxy — AG2's human-in-the-loop mechanism maps directly to evo-ai's WhatsApp integration, enabling stateful pause-and-resume across HTTP requests, a pattern that no other engine in the platform currently supports. The integration follows the existing engine pattern exactly; chat_routes.py already has the AI_ENGINE dispatch structure, making the diff minimal and the review scope clear.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@sourcery-ai
Copy link

sourcery-ai bot commented Mar 12, 2026

Reviewer's Guide

Adds AG2 (AutoGen/AG2) as a third AI engine, implementing an AG2-specific builder and runner, session persistence, MCP integration, and custom HTTP tools, and wiring them into the existing AI engine dispatch paths without requiring schema changes.

Sequence diagram for AG2 run_agent request flow

sequenceDiagram
    actor User
    participant ChatRoutes as chat_routes
    participant ServiceProviders as service_providers
    participant AG2Runner as ag2_run_agent
    participant Builder as AG2AgentBuilder
    participant SessionSvc as AG2SessionService
    participant AG2 as AG2_ConversableAgent_or_GroupChat
    participant DB as Database

    User->>ChatRoutes: HTTP POST /chat (AI_ENGINE=ag2)
    ChatRoutes->>ServiceProviders: resolve_engine(ag2)
    ServiceProviders-->>ChatRoutes: run_agent reference

    ChatRoutes->>AG2Runner: run_agent(agent_id, external_id, message, session_service, db)
    AG2Runner->>DB: get_agent(agent_id)
    DB-->>AG2Runner: Agent record

    AG2Runner->>Builder: build_agent(root_agent)
    Builder->>DB: get_agent(sub_agent_id)*
    DB-->>Builder: sub_agents
    Builder-->>AG2Runner: ConversableAgent or group_chat_setup

    AG2Runner->>SessionSvc: get_or_create(agent_id, external_id)
    SessionSvc->>DB: load_or_create AG2StorageSession
    DB-->>SessionSvc: AG2StorageSession
    SessionSvc-->>AG2Runner: AG2Session

    AG2Runner->>SessionSvc: build_messages(session)
    SessionSvc-->>AG2Runner: history

    alt ag2_mode == group_chat
        AG2Runner->>AG2: initiate_group_chat(pattern, history + message)
        AG2-->>AG2Runner: chat_result, final_context, last_agent
    else ag2_mode == single
        AG2Runner->>AG2: initiate_chat(single_agent, message, history)
        AG2-->>AG2Runner: chat_result
    end

    AG2Runner->>SessionSvc: append(session, "user", message)
    AG2Runner->>SessionSvc: append(session, "assistant", final_response)
    SessionSvc->>DB: update AG2StorageSession.messages
    DB-->>SessionSvc: ok

    AG2Runner-->>ChatRoutes: {final_response, message_history}
    ChatRoutes-->>User: HTTP response with assistant message
Loading

Entity relationship diagram for new AG2StorageSession table

erDiagram
    AG2StorageSession {
        string app_name PK
        string user_id PK
        string id PK
        jsonb messages
        timestamp create_time
        timestamp update_time
    }
Loading

File-Level Changes

Change Details Files
Introduce AG2 agent builder to translate DB agent records into AG2 ConversableAgent or GroupChat setups.
  • Implement AG2AgentBuilder with shared API key resolution logic reused from existing engines
  • Build LLMConfig and system messages from agent fields and enforce AG2-safe agent names
  • Support single-agent and group_chat modes driven by config.ag2_mode and sub_agents in agent config
  • Implement configurable handoff logic between agents using LLM and context-based conditions and after_work behavior
src/services/ag2/agent_builder.py
Add AG2 session persistence layer backed by SQLAlchemy, with portable JSON storage.
  • Define DynamicJSON TypeDecorator that uses JSONB on PostgreSQL and JSON-serialized TEXT elsewhere
  • Create AG2StorageSession ORM model and AG2Session in-memory representation
  • Implement AG2SessionService with get_or_create, build_messages, append, save, and delete operations, composing a stable session_id from agent and external ids
  • Initialize engine and tables lazily from a DB URL and log lifecycle events
src/services/ag2/session_service.py
Integrate MCP servers as AG2 tools via the AG2 MCP experimental API.
  • Add AG2MCPService that discovers configured MCP servers from agent config and DB and connects via autogen.tools.experimental.McpServer
  • Resolve env@@ placeholders against per-server env overrides and process both stored and custom MCP server configs
  • List tools from each connected server, optionally filtering to an allowlist, and aggregate them for agent registration
  • Handle missing AG2 MCP extra gracefully with feature flag and logging
src/services/ag2/mcp_service.py
Provide custom HTTP tools that AG2 agents can call as functions.
  • Implement AG2CustomToolBuilder to construct callable HTTP tools from declarative config sections (http_tools/custom_tools/tools)
  • Support path, query, and body parameters, default values, templated headers, and value merging across sources
  • Perform HTTP requests with requests, encode responses and errors as JSON strings, and support basic error-handling options including fallback responses
src/services/ag2/custom_tool.py
Implement AG2 agent runner and streaming wrapper wired to existing chat infrastructure.
  • Create run_agent to fetch DB agent, build AG2 agent or GroupChat via AG2AgentBuilder, and execute either group_chat (initiate_group_chat) or a proxy-driven single-agent chat
  • Use AG2SessionService to load and persist message history per agent/external user, appending user and assistant messages after each exchange
  • Wrap execution in OpenTelemetry tracing and normalize output to final_response plus message_history
  • Implement run_agent_stream that currently performs a full exchange and yields a single final chunk in the same shape as ADK streaming, constructing AG2SessionService from app settings
src/services/ag2/agent_runner.py
Add tests for AG2 builder and runner behaviors.
  • Test that AG2AgentBuilder.build_agent constructs group_chat setup dict when configured and validates presence of sub_agents
  • Test that run_agent_stream delegates to run_agent and yields a correctly shaped final event payload using mocks for settings and session service
tests/services/ag2/test_agent_builder.py
tests/services/ag2/test_agent_runner.py
Harden service imports for optional engines.
  • Wrap google-adk run_agent import in try/except ImportError so tests can run without the optional dependency
src/services/__init__.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 15 issues, and left some high level feedback:

  • The AG2SessionService creates its own SQLAlchemy engine and calls Base.metadata.create_all at construction time (and is instantiated inside run_agent_stream), which bypasses the app’s existing DB session/metadata lifecycle and will repeatedly run DDL; consider wiring it into the existing Session/engine infrastructure and letting migrations manage the ag2_sessions table.
  • AG2StorageSession.messages is typed as MutableDict but defaults to [] and is treated as a list in get_or_create/build_messages, which is inconsistent with the column type; aligning this to a list-oriented mutable type or to a dict consistently will avoid subtle ORM/serialization issues.
  • DynamicJSON lacks cache_ok = True and always JSON-serializes non-Postgres values, which can be a performance and warning source in SQLAlchemy 2.x; consider adding cache_ok = True and confirming this decorator matches the project’s existing JSON handling pattern.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The AG2SessionService creates its own SQLAlchemy engine and calls Base.metadata.create_all at construction time (and is instantiated inside run_agent_stream), which bypasses the app’s existing DB session/metadata lifecycle and will repeatedly run DDL; consider wiring it into the existing Session/engine infrastructure and letting migrations manage the ag2_sessions table.
- AG2StorageSession.messages is typed as MutableDict but defaults to [] and is treated as a list in get_or_create/build_messages, which is inconsistent with the column type; aligning this to a list-oriented mutable type or to a dict consistently will avoid subtle ORM/serialization issues.
- DynamicJSON lacks cache_ok = True and always JSON-serializes non-Postgres values, which can be a performance and warning source in SQLAlchemy 2.x; consider adding cache_ok = True and confirming this decorator matches the project’s existing JSON handling pattern.

## Individual Comments

### Comment 1
<location path="src/services/ag2/session_service.py" line_range="95-96" />
<code_context>
+    id: Mapped[str] = mapped_column(
+        String, primary_key=True, default=lambda: str(uuid.uuid4())
+    )
+    messages: Mapped[MutableDict[str, Any]] = mapped_column(
+        MutableDict.as_mutable(DynamicJSON), default=[]
+    )
+    create_time: Mapped[DateTime] = mapped_column(DateTime(), default=func.now())
</code_context>
<issue_to_address>
**issue (bug_risk):** The `messages` column uses `MutableDict` but defaults to a list, which is inconsistent and may break SQLAlchemy change tracking.

The column is typed as `Mapped[MutableDict[str, Any]]` and uses `MutableDict.as_mutable(DynamicJSON)`, but the default is `[]`, which is a list. `MutableDict` expects a mapping and may not track changes correctly with a list value. If `messages` is meant to be a list of message dicts, switch to `MutableList` (or a custom mutable list type) and update the type hint; otherwise, use a dict default (e.g. `{}`) and treat it consistently as a mapping.
</issue_to_address>

### Comment 2
<location path="src/services/ag2/session_service.py" line_range="130-139" />
<code_context>
+    def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** `get_or_create` performs two separate DB roundtrips for the same session where one would suffice.

Inside `get_or_create`, you open one session to create/commit the `AG2StorageSession`, then a second session to fetch the same row just to load `messages`. Instead, reuse the first `record` to build `AG2Session` and skip the second `db.get`/session, which both cuts DB roundtrips and removes the race window between the two transactions.

Suggested implementation:

```python
    def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
        """Retrieve an existing session or create a new one.

        This implementation performs all work in a single DB session to avoid
        extra roundtrips and race windows between separate transactions.
        """
        session_id = f"{external_id}_{agent_id}"

        with self.SessionLocal() as db:
            # Single roundtrip: either fetch existing or create, then return.
            record = db.get(AG2StorageSession, (agent_id, external_id, session_id))

            if record is None:
                record = AG2StorageSession(
                    app_name=agent_id,
                    user_id=external_id,
                    id=session_id,
                    messages=[],
                )
                db.add(record)
                db.commit()
                # Ensure we have a fully-populated, persistent instance
                db.refresh(record)

            # Make sure messages are loaded while the session is still open.
            # If `messages` is a lazy relationship, this will trigger the load here.
            _ = record.messages

            # Build the AG2Session directly from the record we already have,
            # instead of re-opening a new session and re-querying the same row.
            return AG2Session(
                agent_id=record.app_name,
                external_id=record.user_id,
                session_id=record.id,
                messages=list(record.messages or []),
            )

```

Because I only saw the beginning of `get_or_create`, you should:
1. Remove any remaining code after this method that:
   - Opens a second `with self.SessionLocal() as db:` block inside `get_or_create`, or
   - Re-fetches `AG2StorageSession` (e.g., `db.get(AG2StorageSession, ...)`) for the same `agent_id`/`external_id`/`session_id`.
   That logic is now redundant and should be deleted.
2. If your `AG2Session` constructor has a different signature than shown here, adapt the `AG2Session(...)` call accordingly, but keep the pattern of constructing it directly from `record` in this single session.
3. If `messages` is not a relationship but a plain JSON/array column, you can drop the `_ = record.messages` line; the rest of the logic remains the same.
</issue_to_address>

### Comment 3
<location path="src/services/ag2/mcp_service.py" line_range="159-161" />
<code_context>
+                command = server_config.get("command", "npx")
+                args = server_config.get("args", [])
+                env = server_config.get("env", {})
+                if env:
+                    for key, value in env.items():
+                        os.environ[key] = value
+                server = McpServer({"command": command, "args": args, "env": env})
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Mutating `os.environ` for MCP server env vars introduces global side effects and is likely unnecessary.

In `_connect_server`, you both build an `env` dict for `McpServer` and also write those values into `os.environ`. This leaks per-server config into the global process and can cause unexpected interactions between servers or other code. It should be enough to pass `env` into `McpServer` without modifying `os.environ`.
</issue_to_address>

### Comment 4
<location path="src/services/ag2/agent_builder.py" line_range="134-141" />
<code_context>
+        # Build all sub-agents first so handoff resolution can reference them
+        all_agents = {}
+        agents = []
+        for aid in sub_agent_ids:
+            db_agent = get_agent(self.db, str(aid))
+            if db_agent is None:
+                raise ValueError(f"Sub-agent {aid} not found")
+            ca = await self.build_conversable_agent(db_agent)
+            all_agents[str(aid)] = ca
+            agents.append(ca)
+
+        root_ca = await self.build_conversable_agent(root_agent)
+        all_agents[str(root_agent.id)] = root_ca
+
+        # Apply handoffs to each agent if configured
+        for aid in sub_agent_ids:
+            db_agent = get_agent(self.db, str(aid))
+            if db_agent and db_agent.config:
+                self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)
+
</code_context>
<issue_to_address>
**suggestion (performance):** Sub-agents are fetched from the database twice: once to build them and again to apply handoffs.

In `build_group_chat_setup`, each `aid` triggers two `get_agent` calls—first to build `ConversableAgent`s, then again when applying handoffs. Consider storing the initial `db_agent` objects (e.g., an id → db_agent dict) and reusing them when applying handoffs to avoid duplicate database queries and keep the lookup logic centralized.

```suggestion
        config = root_agent.config or {}
        sub_agent_ids = config.get("sub_agents", [])
        if not sub_agent_ids:
            raise ValueError("group_chat agent requires at least one sub_agent")

        # Build all sub-agents first so handoff resolution can reference them
        all_agents = {}
        agents = []
        db_sub_agents = {}

        for aid in sub_agent_ids:
            db_agent = get_agent(self.db, str(aid))
            if db_agent is None:
                raise ValueError(f"Sub-agent {aid} not found")

            db_sub_agents[str(aid)] = db_agent
            ca = await self.build_conversable_agent(db_agent)
            all_agents[str(aid)] = ca
            agents.append(ca)

        root_ca = await self.build_conversable_agent(root_agent)
        all_agents[str(root_agent.id)] = root_ca

        # Apply handoffs to each sub-agent if configured, reusing cached db agents
        for aid in sub_agent_ids:
            db_agent = db_sub_agents.get(str(aid))
            if db_agent and db_agent.config:
                self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)
```
</issue_to_address>

### Comment 5
<location path="src/services/ag2/agent_builder.py" line_range="94-116" />
<code_context>
+                logger.warning(f"Handoff target {target_id} not found, skipping")
+                continue
+
+            if h["type"] == "llm":
+                llm_conditions.append(
+                    OnCondition(
+                        target=AgentTarget(target_agent),
+                        condition=StringLLMCondition(prompt=h["condition"]),
+                    )
+                )
+            elif h["type"] == "context":
+                context_conditions.append(
+                    OnContextCondition(
</code_context>
<issue_to_address>
**suggestion:** Using `h["type"]` without validation can raise if a handoff entry is malformed.

In `_apply_handoffs`, this direct `h["type"]` access will raise a `KeyError` if a handoff entry is missing `type`, which will break setup for a misconfigured user config. Consider using `h.get("type")` and skipping/logging invalid entries, consistent with how missing `target_agent` is handled.

```suggestion
        for h in handoffs_config:
            target_id = h.get("target_agent_id")
            target_agent = all_agents.get(str(target_id))
            if not target_agent:
                logger.warning(f"Handoff target {target_id} not found, skipping")
                continue

            h_type = h.get("type")
            if h_type not in ("llm", "context"):
                logger.warning(
                    f"Invalid or missing handoff type {h_type!r} for target {target_id}, skipping"
                )
                continue

            if h_type == "llm":
                llm_conditions.append(
                    OnCondition(
                        target=AgentTarget(target_agent),
                        condition=StringLLMCondition(prompt=h["condition"]),
                    )
                )
            elif h_type == "context":
                context_conditions.append(
                    OnContextCondition(
                        target=AgentTarget(target_agent),
                        condition=ExpressionContextCondition(
                            expression=ContextExpression(h["expression"])
                        ),
                    )
                )
```
</issue_to_address>

### Comment 6
<location path="src/services/ag2/custom_tool.py" line_range="109-116" />
<code_context>
+                    ):
+                        body_data[param] = value
+
+                response = requests.request(
+                    method=method,
+                    url=url,
+                    headers=processed_headers,
+                    params=query_params_dict,
+                    json=body_data or None,
+                    timeout=error_handling.get("timeout", 30),
+                )
+
+                if response.status_code >= 400:
+                    raise requests.exceptions.HTTPError(
+                        f"Error in the request: {response.status_code} - {response.text}"
+                    )
+
+                return json.dumps(response.json())
+
+            except Exception as e:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Assuming all HTTP responses are JSON can cause tool failures for non‑JSON endpoints.

`response.json()` is called unconditionally, so plain text/HTML responses (including some error bodies) will raise and drop you into the generic `Exception` handler, obscuring the real error. Wrap `response.json()` in a `try/except ValueError` and fall back to `response.text` or a consistent JSON wrapper so non‑JSON responses are handled gracefully.

```suggestion
                if response.status_code >= 400:
                    raise requests.exceptions.HTTPError(
                        f"Error in the request: {response.status_code} - {response.text}"
                    )

                try:
                    response_data = response.json()
                except ValueError:
                    # Non-JSON response; fall back to a consistent JSON wrapper
                    response_data = {
                        "status_code": response.status_code,
                        "headers": dict(response.headers),
                        "raw_response": response.text,
                    }

                return json.dumps(response_data)

            except Exception as e:
```
</issue_to_address>

### Comment 7
<location path="src/services/ag2/agent_runner.py" line_range="46-50" />
<code_context>
+        try:
+            ag2_mode = (db_agent.config or {}).get("ag2_mode", "single")
+            if ag2_mode == "group_chat":
+                chat_result, final_context, last_agent = initiate_group_chat(
+                    pattern=result["pattern"],
+                    messages=history + [message],
+                    max_rounds=result["max_rounds"],
+                    context_variables=result["context_variables"],
+                )
+                final_response = chat_result.summary or (
</code_context>
<issue_to_address>
**issue (bug_risk):** Group chat `messages` mixes dict history with a raw string for the new message, which may not match AG2’s expected format.

Here `messages` is built as `history + [message]`, mixing `{role, content}` dicts with a plain string. If `initiate_group_chat` expects a uniform list of message dicts, this could break routing/formatting. Consider wrapping `message` as `{"role": "user", "content": message}` to keep the structure consistent.
</issue_to_address>

### Comment 8
<location path="src/services/ag2/agent_runner.py" line_range="24" />
<code_context>
+    session_service: AG2SessionService,
+    db: Session,
+    session_id: Optional[str] = None,
+    timeout: float = 60.0,
+    files: Optional[list] = None,
+) -> dict:
</code_context>
<issue_to_address>
**nitpick:** The `timeout` parameter on `run_agent` is currently unused.

Since `timeout` isn’t used anywhere in the implementation, the signature is misleading. Please either apply it to the underlying execution (e.g., wrap the blocking call in `asyncio.wait_for`) or remove the parameter.
</issue_to_address>

### Comment 9
<location path="src/services/ag2/agent_runner.py" line_range="21" />
<code_context>
+    agent_id: str,
+    external_id: str,
+    message: str,
+    session_service: AG2SessionService,
+    db: Session,
+    session_id: Optional[str] = None,
</code_context>
<issue_to_address>
**question (bug_risk):** Session IDs are accepted but not used, so distinct sessions for the same agent/user cannot be differentiated.

`run_agent` and `run_agent_stream` accept `session_id`, but `AG2SessionService.get_or_create` always uses `f"{external_id}_{agent_id}"`, so concurrent sessions for the same agent/user are merged. If multiple sessions are intended, include `session_id` in the session key; if not, consider removing `session_id` from the API to avoid confusion.
</issue_to_address>

### Comment 10
<location path="src/services/ag2/agent_runner.py" line_range="8-17" />
<code_context>
+    Token-level streaming can be added in a future iteration by wiring
+    ConversableAgent's `process_last_received_message` hook to a queue.
+    """
+    from src.services.ag2.session_service import AG2SessionService
+    from src.config.settings import get_settings
+    settings = get_settings()
+    session_service = AG2SessionService(db_url=settings.POSTGRES_CONNECTION_STRING)
+
+    result = await run_agent(
</code_context>
<issue_to_address>
**suggestion (performance):** Creating a new `AG2SessionService` (and engine) on every `run_agent_stream` call is potentially expensive.

In `run_agent_stream`, each request creates a new `AG2SessionService`, which builds a new SQLAlchemy engine and metadata. Consider reusing a shared `AG2SessionService` or at least a shared engine/session factory to avoid repeated DB setup and reduce overhead under load.
</issue_to_address>

### Comment 11
<location path="tests/services/ag2/test_agent_builder.py" line_range="40-49" />
<code_context>
+    }
+    return a
+
+@pytest.mark.asyncio
+async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
+    builder = AG2AgentBuilder(db=mock_db)
+    sub_agent = _make_agent("uuid-triage", "triage")
+    with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
+        result, _ = await builder.build_agent(group_chat_agent_record)
+    # result should be a dict with pattern and agents for initiate_group_chat
+    assert "pattern" in result
+    assert "agents" in result
+
+@pytest.mark.asyncio
+async def test_builder_validates_group_chat_requires_agents(mock_db):
+    record = MagicMock()
+    record.type = "llm"
+    record.config = {"ag2_mode": "group_chat", "sub_agents": []}  # empty — should raise
+    record.api_key = "test"
+    builder = AG2AgentBuilder(db=mock_db)
+    with pytest.raises(ValueError, match="at least one"):
+        await builder.build_agent(record)
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for single-agent mode and API key resolution branches in AG2AgentBuilder

Current tests only cover group chat orchestration and the empty-sub-agent validation. Please also add coverage for:

- `single` mode: `build_agent` returning a `ConversableAgent` when `ag2_mode` is unset or set to `"single"`.
- Agent metadata: name sanitization (spaces → underscores) and correct system message composition from `role`, `goal`, and `instruction`.
- `_get_api_key` behavior:
  - `api_key_id` provided and resolved via `get_decrypted_api_key`.
  - `config.api_key` as a UUID string resolved via `get_decrypted_api_key`.
  - `config.api_key` as a non-UUID raw key returned as-is.
  - No valid key → `ValueError` with a clear message.

These cases help ensure behavior stays aligned with ADK/CrewAI builders and that auth errors are surfaced properly.

Suggested implementation:

```python
@pytest.mark.asyncio
async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
    builder = AG2AgentBuilder(db=mock_db)
    sub_agent = _make_agent("uuid-triage", "triage")
    with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
        result, _ = await builder.build_agent(group_chat_agent_record)
    # result should be a dict with pattern and agents for initiate_group_chat
    assert "pattern" in result
    assert "agents" in result

@pytest.mark.asyncio
async def test_builder_validates_group_chat_requires_agents(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.config = {"ag2_mode": "group_chat", "sub_agents": []}  # empty — should raise
    record.api_key = "test"
    builder = AG2AgentBuilder(db=mock_db)
    with pytest.raises(ValueError, match="at least one"):
        await builder.build_agent(record)


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_unset(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Support Team Agent"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        # no ag2_mode: should default to single-agent mode
        "role": "assistant",
        "goal": "Help users resolve issues",
        "instruction": "Be concise and friendly.",
        "api_key": "raw-openai-key",
    }

    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    # single-agent mode should produce a ConversableAgent-like object
    from autogen import ConversableAgent  # type: ignore[import]
    assert isinstance(agent, ConversableAgent)

    # name should be sanitized (spaces -> underscores)
    assert agent.name == "Support_Team_Agent"

    # system message should compose role, goal and instruction
    system_message = getattr(agent, "system_message", "")
    assert "assistant" in system_message
    assert "Help users resolve issues" in system_message
    assert "Be concise and friendly." in system_message
    # metadata should at least echo the mode
    assert metadata.get("mode") == "single"


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_single(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Billing Bot"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        "ag2_mode": "single",
        "role": "assistant",
        "goal": "Assist with billing questions",
        "instruction": "Only answer billing-related queries.",
        "api_key": "raw-openai-key",
    }

    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    from autogen import ConversableAgent  # type: ignore[import]
    assert isinstance(agent, ConversableAgent)
    assert agent.name == "Billing_Bot"

    system_message = getattr(agent, "system_message", "")
    assert "billing" in system_message.lower()
    assert "only answer billing-related queries.".lower() in system_message.lower()
    assert metadata.get("mode") == "single"


def test_get_api_key_with_api_key_id_uses_secret_store(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = "11111111-2222-3333-4444-555555555555"
    record.api_key = None
    record.config = {}

    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-secret-key",
    ) as mock_get:
        api_key = builder._get_api_key(record)

    mock_get.assert_called_once_with(mock_db, record.api_key_id)
    assert api_key == "resolved-secret-key"


def test_get_api_key_with_uuid_string_in_config(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    config_uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": config_uuid}

    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-from-config",
    ) as mock_get:
        api_key = builder._get_api_key(record)

    mock_get.assert_called_once_with(mock_db, config_uuid)
    assert api_key == "resolved-from-config"


def test_get_api_key_with_raw_non_uuid_config_value(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": "sk-live-plain-openai-key"}

    api_key = builder._get_api_key(record)

    # Non-UUID api_key should be returned as-is, without hitting secret store
    assert api_key == "sk-live-plain-openai-key"


def test_get_api_key_raises_when_no_valid_key(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {}

    with pytest.raises(ValueError, match="API key"):
        builder._get_api_key(record)

```

1. Ensure `ConversableAgent` is available at the top of the test module; if you prefer not to import it inside tests, move the `from autogen import ConversableAgent` import to the global imports section and remove the inline imports in the tests.
2. Verify the call signature of `get_decrypted_api_key` in `src.services.ag2.agent_builder`; the tests assume `get_decrypted_api_key(db, api_key_id)`. If the actual order differs, adjust the `assert_called_once_with` expectations accordingly.
3. If `AG2AgentBuilder._get_api_key` currently uses a different error message when no key is found, update the implementation to raise `ValueError("API key could not be resolved")` (or similar) so that the `"API key"` substring in the `match` expression is present.
4. If the `ConversableAgent` implementation uses a different attribute than `.system_message` for the composed instructions, either:
   - Update `AG2AgentBuilder` to set `.system_message` to the composed text, or
   - Adjust the tests to assert against the actual attribute used (e.g. `agent.description`), keeping the checks that `role`, `goal`, and `instruction` are all present.
</issue_to_address>

### Comment 12
<location path="tests/services/ag2/test_agent_builder.py" line_range="6-15" />
<code_context>
+from unittest.mock import MagicMock, AsyncMock, patch
+from src.services.ag2.agent_builder import AG2AgentBuilder
+
+@pytest.fixture
+def mock_db():
+    return AsyncMock()
+
+def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
+    """Create a minimal agent record mock with explicit attribute assignment."""
+    a = MagicMock()
+    a.id = agent_id
+    a.name = agent_name
+    a.type = "llm"
+    a.llm_provider = "openai"
+    a.model = model
+    a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
+    a.api_key = "test-key"
+    a.api_key_id = None
+    a.api_url = "https://api.openai.com/v1"
+    a.role = None
+    a.goal = None
+    a.instruction = f"You are {agent_name}."
+    return a
+
+@pytest.fixture
</code_context>
<issue_to_address>
**suggestion (testing):** Extend tests to cover handoff configuration and sub-agent resolution in group chat setups

`AG2AgentBuilder` has substantial logic in `_apply_handoffs` and `build_group_chat_setup` (LLM/context handoffs, `after_work` mapping, and missing sub-agent handling), but current tests only check the output shape.

Please add tests that:
- Build a root agent with sub-agents containing `handoffs` of type `"llm"` and `"context"`, and assert the expected conditions are registered on `ca.handoffs`.
- Use a sub-agent ID for which `get_agent` returns `None` and assert a `ValueError("Sub-agent ... not found")`.
- Verify `after_work` resolves to `TerminateTarget` when set to `"terminate"` and to `RevertToUserTarget` otherwise.

You can rely on lightweight `MagicMock` agents or patched AG2 classes to avoid heavy dependencies while still validating the behavior and wiring.

Suggested implementation:

```python
from unittest.mock import MagicMock, AsyncMock, patch
from src.services.ag2.agent_builder import AG2AgentBuilder, TerminateTarget, RevertToUserTarget

@pytest.fixture
def mock_db():
    return AsyncMock()

def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
    """Create a minimal agent record mock with explicit attribute assignment."""
    a = MagicMock()
    a.id = agent_id
    a.name = agent_name
    a.type = "llm"
    a.llm_provider = "openai"
    a.model = model
    a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
    a.api_key = "test-key"
    a.api_key_id = None
    a.api_url = "https://api.openai.com/v1"
    a.role = None
    a.goal = None
    a.instruction = f"You are {agent_name}."
    return a


@pytest.fixture
def root_agent_with_handoffs():
    """Root agent configured with LLM and context handoffs to sub-agents."""
    root = _make_agent("root", "Root Agent", ag2_mode="group")

    # Two handoffs: one LLM and one context, with different after_work semantics
    root.config["handoffs"] = [
        {
            "type": "llm",
            "to": "sub-llm",
            "condition": {"kind": "tool", "name": "delegate_to_sub_llm"},
            "after_work": "terminate",
        },
        {
            "type": "context",
            "to": "sub-context",
            "condition": {"kind": "message", "contains": "use context"},
            "after_work": "return",
        },
    ]
    return root


@pytest.fixture
def sub_agents():
    """Sub-agents referenced by the root agent's handoffs."""
    sub_llm = _make_agent("sub-llm", "Sub LLM Agent")
    sub_ctx = _make_agent("sub-context", "Sub Context Agent")
    return {"sub-llm": sub_llm, "sub-context": sub_ctx}


@pytest.mark.asyncio
async def test_apply_handoffs_registers_llm_and_context_handoffs(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Ensure LLM and context handoffs are transformed and registered on ca.handoffs.

    This exercises AG2AgentBuilder.build_group_chat_setup and its internal
    _apply_handoffs logic without pulling in heavy AG2 dependencies.
    """
    builder = AG2AgentBuilder(mock_db)

    # Patch get_agent so handoff resolution finds our lightweight MagicMock sub-agents.
    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        # Depending on implementation, the entry point may differ.
        # We assume build_group_chat_setup(root_agent) returns an object with .handoffs.
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    # We expect one handoff per entry in root_agent.config["handoffs"].
    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2

    # Order is preserved: first is LLM handoff, second is context handoff.
    llm_handoff, ctx_handoff = ca.handoffs

    # The tests are intentionally lightweight: we validate the wiring rather than AG2 internals.
    # LLM handoff should target the sub-llm agent and have a condition based on tool usage.
    assert getattr(llm_handoff, "sub_agent_id", None) == "sub-llm"
    assert getattr(llm_handoff, "type", None) == "llm"
    assert "delegate_to_sub_llm" in repr(getattr(llm_handoff, "condition", ""))

    # Context handoff should target the sub-context agent and have a message-based condition.
    assert getattr(ctx_handoff, "sub_agent_id", None) == "sub-context"
    assert getattr(ctx_handoff, "type", None) == "context"
    assert "use context" in repr(getattr(ctx_handoff, "condition", ""))


@pytest.mark.asyncio
async def test_apply_handoffs_missing_sub_agent_raises_value_error(
    mock_db, root_agent_with_handoffs
):
    """
    When get_agent returns None for a referenced sub-agent, a ValueError is raised.
    """
    builder = AG2AgentBuilder(mock_db)

    # Force one of the handoffs to reference a non-existent sub-agent.
    root_agent_with_handoffs.config["handoffs"][0]["to"] = "missing-subagent-id"

    async def fake_get_agent(agent_id: str):
        # Simulate not-found for the missing ID, normal behavior otherwise.
        if agent_id == "missing-subagent-id":
            return None
        return _make_agent(agent_id, f"Agent {agent_id}")

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        with pytest.raises(ValueError, match=r"Sub-agent .* not found"):
            await builder.build_group_chat_setup(root_agent_with_handoffs)


@pytest.mark.asyncio
async def test_after_work_resolves_to_correct_targets(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Verify that after_work is mapped to TerminateTarget and RevertToUserTarget.

    - "terminate" -> TerminateTarget
    - any other value (e.g. "return") -> RevertToUserTarget
    """
    builder = AG2AgentBuilder(mock_db)

    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2

    llm_handoff, ctx_handoff = ca.handoffs

    # after_work was set to "terminate" for the LLM handoff.
    after_work_llm = getattr(llm_handoff, "after_work", None)
    assert isinstance(after_work_llm, TerminateTarget)

    # after_work was set to "return" for the context handoff.
    after_work_ctx = getattr(ctx_handoff, "after_work", None)
    assert isinstance(after_work_ctx, RevertToUserTarget)

```

These tests assume the following public/semipublic API on `AG2AgentBuilder` and related classes:

1. `AG2AgentBuilder` has an async method `build_group_chat_setup(root_agent)` returning an object (here called `ca`) that exposes:
   - `ca.handoffs`: an ordered iterable of handoff objects.
2. Each handoff object has (or can be extended to have) the attributes:
   - `type`: `"llm"` or `"context"`.
   - `sub_agent_id`: the string ID of the sub-agent resolved from `config["handoffs"][i]["to"]`.
   - `condition`: an object whose `repr` contains at least the identifying text from the `condition` dict in the config.
   - `after_work`: an instance of either `TerminateTarget` or `RevertToUserTarget`.
3. `AG2AgentBuilder` exposes an async method `get_agent(agent_id: str)` used by `_apply_handoffs` to resolve sub-agents.
4. `src.services.ag2.agent_builder` exports `TerminateTarget` and `RevertToUserTarget`.

If any of these assumptions differ from your actual implementation, you should:
- Update the test calls (`build_group_chat_setup`, attribute names on the returned object, etc.) to match your real API.
- Adjust the assertions on the handoff objects to match the actual attributes you expose (for example, you may store the sub-agent as `handoff.agent` instead of `sub_agent_id`; in that case assert on `handoff.agent.id`).
- If `TerminateTarget` / `RevertToUserTarget` are not directly exported, either import them from the correct module or assert based on whatever target representation your implementation uses (e.g., string enums like `"terminate"` / `"user"`).
</issue_to_address>

### Comment 13
<location path="tests/services/ag2/test_agent_runner.py" line_range="7-16" />
<code_context>
+    }
+    return a
+
+@pytest.mark.asyncio
+async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
+    builder = AG2AgentBuilder(db=mock_db)
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for error conditions and non-happy-path behavior in `run_agent_stream` and `run_agent`

Please also cover:

- A case where `run_agent` raises (e.g., `InternalServerError`) and `run_agent_stream` propagates the exception instead of yielding.
- Direct `run_agent` tests for:
  - `get_agent` returning `None``AgentNotFoundError`.
  - `ag2_mode == "group_chat"` with `initiate_group_chat` patched, asserting correct use of `chat_result.summary` and `chat_history`.
  - `ag2_mode == "single"` with `ConversableAgent` patched, asserting the proxy pattern and that the final response comes from the last message.

This will exercise the control flow and error mapping beyond the happy-path streaming case.

Suggested implementation:

```python
import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from src.exceptions import InternalServerError
from src.services.ag2.agent_runner import run_agent_stream, run_agent, AgentNotFoundError

```

```python
    assert data["content"]["parts"][0]["text"] == "Resolved: your issue is fixed."
    assert data["is_final"] is True


@pytest.mark.asyncio
async def test_run_agent_stream_propagates_internal_server_error():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    # Patch run_agent to raise, and still patch session service + settings as in happy-path test
    with patch(
        "src.services.ag2.agent_runner.run_agent",
        AsyncMock(side_effect=InternalServerError("boom")),
    ):
        with patch("src.services.ag2.session_service.AG2SessionService"):
            with patch("src.config.settings.get_settings", return_value=mock_settings):
                with pytest.raises(InternalServerError):
                    async for _ in run_agent_stream(
                        db=AsyncMock(),
                        agent_id="agent-123",
                        external_id="ext-123",
                        session_id="session-abc",
                        message="This will fail",
                    ):
                        # If we ever yield, the behavior is wrong
                        pytest.fail("run_agent_stream should not yield on InternalServerError")


@pytest.mark.asyncio
async def test_run_agent_get_agent_none_raises_agent_not_found():
    mock_db = AsyncMock()

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = None

        with pytest.raises(AgentNotFoundError):
            await run_agent(
                db=mock_db,
                agent_id="missing-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Hello?",
            )


@pytest.mark.asyncio
async def test_run_agent_group_chat_uses_summary_and_chat_history():
    mock_db = AsyncMock()

    # Fake agent record in group_chat mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "group_chat"

    chat_result = MagicMock()
    chat_result.summary = "Group chat resolution summary"
    chat_result.chat_history = [
        {"role": "user", "content": "Question"},
        {"role": "assistant", "content": "Intermediate"},
        {"role": "assistant", "content": "Final"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        with patch(
            "src.services.ag2.agent_runner.initiate_group_chat",
            AsyncMock(return_value=chat_result),
        ):
            result = await run_agent(
                db=mock_db,
                agent_id="group-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Start group chat",
            )

    assert result["final_response"] == chat_result.summary
    assert result["message_history"] == chat_result.chat_history


@pytest.mark.asyncio
async def test_run_agent_single_mode_uses_proxy_and_last_message_response():
    mock_db = AsyncMock()

    # Fake agent record in single mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "single"

    # Simulated conversation messages; run_agent should use the last one as final response
    conversation_messages = [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "First reply"},
        {"role": "assistant", "content": "Final reply from proxy"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        # Mock ConversableAgent to simulate proxy pattern and returned messages
        with patch("src.services.ag2.agent_runner.ConversableAgent") as ConversableAgentMock:
            # Each instantiation returns a separate agent mock
            primary_agent = MagicMock()
            proxy_agent = MagicMock()

            # The proxy agent produces the conversation messages
            proxy_agent.initiate_chat.return_value = conversation_messages

            ConversableAgentMock.side_effect = [primary_agent, proxy_agent]

            result = await run_agent(
                db=mock_db,
                agent_id="single-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Talk to me",
            )

    # Assert that we used the proxy pattern (two agents created)
    assert ConversableAgentMock.call_count == 2

    # Final response should come from the last message produced by the proxy conversation
    assert result["final_response"] == conversation_messages[-1]["content"]

```

These tests assume:
1. `InternalServerError` is available as `src.exceptions.InternalServerError`. If it lives elsewhere, update the import path accordingly.
2. `AgentNotFoundError` is exported from `src.services.ag2.agent_runner`. If it is defined in a different module, adjust the import and the `pytest.raises(...)` target.
3. `run_agent`:
   - Accepts the arguments `(db, agent_id, external_id, session_id, message, ...)` as used here.
   - Raises `AgentNotFoundError` when `AG2AgentBuilder(...).get_agent(...)` returns `None`.
   - In `"group_chat"` mode calls `initiate_group_chat(...)` and returns a dict with `"final_response"` (from `chat_result.summary`) and `"message_history"` (from `chat_result.chat_history`).
   - In `"single"` mode constructs two `ConversableAgent` instances (primary + proxy), uses the proxy's `initiate_chat` result, and sets `"final_response"` from the last message's `"content"`.
If your actual control flow differs (e.g., different attribute names, return shapes, or proxy setup), adjust the assertions and mocks (especially the `side_effect` on `ConversableAgentMock` and the expected keys in `result`) to match the real implementation.
</issue_to_address>

### Comment 14
<location path="tests/services/ag2/test_agent_runner.py" line_range="18-27" />
<code_context>
+
+    # Patch run_agent (the heavy work), session service, and settings —
+    # get_settings and AG2SessionService are local imports inside run_agent_stream
+    with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
+        with patch("src.services.ag2.session_service.AG2SessionService"):
+            with patch("src.config.settings.get_settings", return_value=mock_settings):
+                chunks = []
+                async for chunk in run_agent_stream(
+                    db=AsyncMock(),
+                    agent_id="agent-123",
+                    external_id="ext-123",
+                    session_id="session-abc",
+                    message="My printer is broken",
+                ):
+                    chunks.append(chunk)
+
+    assert len(chunks) >= 1
</code_context>
<issue_to_address>
**suggestion (testing):** Consider asserting the full streaming envelope contract, not just the text content

Since `run_agent_stream` should match the WebSocket/ADK streaming envelope, this test should also validate the JSON schema, e.g.:

- `data["content"]["role"] == "agent"`
- `data["author"]` matches the `agent_id` argument
- `parts` elements have the expected `{ "type": "text" }` shape

This will better enforce the streaming protocol contract and detect unintended schema changes.
</issue_to_address>

### Comment 15
<location path="tests/services/ag2/test_agent_runner.py" line_range="13-20" />
<code_context>
+        "final_response": "Resolved: your issue is fixed.",
+        "message_history": [],
+    }
+    mock_settings = MagicMock()
+    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"
+
+    # Patch run_agent (the heavy work), session service, and settings —
+    # get_settings and AG2SessionService are local imports inside run_agent_stream
+    with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
+        with patch("src.services.ag2.session_service.AG2SessionService"):
+            with patch("src.config.settings.get_settings", return_value=mock_settings):
+                chunks = []
+                async for chunk in run_agent_stream(
</code_context>
<issue_to_address>
**suggestion (testing):** Add a dedicated test for session history integration in `run_agent`

Since `AG2SessionService` is fully stubbed, we don’t currently verify that messages are appended and persisted. Please add a focused unit test for `run_agent` that injects a fake `AG2SessionService` with `get_or_create`, `append`, and `save` mocked, and asserts that:

- `append` is called in order with `("user", message)` then `("assistant", final_response)`
- `save` is called once with the same session instance

This will ensure conversational state is correctly persisted for the WhatsApp pause/resume scenario described in the PR.

Suggested implementation:

```python
import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch, call
from src.services.ag2.agent_runner import run_agent_stream, run_agent

```

```python
    assert len(chunks) >= 1
    data = json.loads(chunks[0])


@pytest.mark.asyncio
async def test_run_agent_persists_session_history_in_order():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    with patch("src.services.ag2.session_service.AG2SessionService") as MockSessionService, \
         patch("src.config.settings.get_settings", return_value=mock_settings):

        mock_session_service = MockSessionService.return_value
        mock_session = MagicMock()

        # Ensure session service methods are async and controllable
        mock_session_service.get_or_create = AsyncMock(return_value=mock_session)
        mock_session_service.append = AsyncMock()
        mock_session_service.save = AsyncMock()

        db = AsyncMock()
        user_message = "My printer is broken"

        result = await run_agent(
            db=db,
            agent_id="agent-123",
            external_id="ext-123",
            session_id="session-abc",
            message=user_message,
        )

        # Append is called in order: user then assistant
        mock_session_service.append.assert_has_awaits(
            [
                call(mock_session, "user", user_message),
                call(mock_session, "assistant", result["final_response"]),
            ]
        )

        # Save is called once with the same session instance
        mock_session_service.save.assert_awaited_once_with(mock_session)

```

If the actual signature of `run_agent` or the `AG2SessionService` methods differ (for example, if `append`/`save` or `get_or_create` are synchronous instead of async, or if `append` takes different parameters), you should:
1. Adjust the `AsyncMock` usage to `MagicMock` and switch from `assert_awaited*` to `assert_called*` accordingly.
2. Update the `call(...)` argument lists to match the real method signatures (e.g., if `append` receives keyword arguments or a different ordering).
3. If `run_agent` requires additional parameters (such as config, tools, or agent options), pass suitable dummy values in the test invocation.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +95 to +96
messages: Mapped[MutableDict[str, Any]] = mapped_column(
MutableDict.as_mutable(DynamicJSON), default=[]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): The messages column uses MutableDict but defaults to a list, which is inconsistent and may break SQLAlchemy change tracking.

The column is typed as Mapped[MutableDict[str, Any]] and uses MutableDict.as_mutable(DynamicJSON), but the default is [], which is a list. MutableDict expects a mapping and may not track changes correctly with a list value. If messages is meant to be a list of message dicts, switch to MutableList (or a custom mutable list type) and update the type hint; otherwise, use a dict default (e.g. {}) and treat it consistently as a mapping.

Comment on lines +130 to +139
def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
"""Retrieve an existing session or create a new one."""
session_id = f"{external_id}_{agent_id}"
with self.SessionLocal() as db:
record = db.get(AG2StorageSession, (agent_id, external_id, session_id))
if record is None:
record = AG2StorageSession(
app_name=agent_id,
user_id=external_id,
id=session_id,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): get_or_create performs two separate DB roundtrips for the same session where one would suffice.

Inside get_or_create, you open one session to create/commit the AG2StorageSession, then a second session to fetch the same row just to load messages. Instead, reuse the first record to build AG2Session and skip the second db.get/session, which both cuts DB roundtrips and removes the race window between the two transactions.

Suggested implementation:

    def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
        """Retrieve an existing session or create a new one.

        This implementation performs all work in a single DB session to avoid
        extra roundtrips and race windows between separate transactions.
        """
        session_id = f"{external_id}_{agent_id}"

        with self.SessionLocal() as db:
            # Single roundtrip: either fetch existing or create, then return.
            record = db.get(AG2StorageSession, (agent_id, external_id, session_id))

            if record is None:
                record = AG2StorageSession(
                    app_name=agent_id,
                    user_id=external_id,
                    id=session_id,
                    messages=[],
                )
                db.add(record)
                db.commit()
                # Ensure we have a fully-populated, persistent instance
                db.refresh(record)

            # Make sure messages are loaded while the session is still open.
            # If `messages` is a lazy relationship, this will trigger the load here.
            _ = record.messages

            # Build the AG2Session directly from the record we already have,
            # instead of re-opening a new session and re-querying the same row.
            return AG2Session(
                agent_id=record.app_name,
                external_id=record.user_id,
                session_id=record.id,
                messages=list(record.messages or []),
            )

Because I only saw the beginning of get_or_create, you should:

  1. Remove any remaining code after this method that:
    • Opens a second with self.SessionLocal() as db: block inside get_or_create, or
    • Re-fetches AG2StorageSession (e.g., db.get(AG2StorageSession, ...)) for the same agent_id/external_id/session_id.
      That logic is now redundant and should be deleted.
  2. If your AG2Session constructor has a different signature than shown here, adapt the AG2Session(...) call accordingly, but keep the pattern of constructing it directly from record in this single session.
  3. If messages is not a relationship but a plain JSON/array column, you can drop the _ = record.messages line; the rest of the logic remains the same.

Comment on lines +159 to +161
if env:
for key, value in env.items():
os.environ[key] = value
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Mutating os.environ for MCP server env vars introduces global side effects and is likely unnecessary.

In _connect_server, you both build an env dict for McpServer and also write those values into os.environ. This leaks per-server config into the global process and can cause unexpected interactions between servers or other code. It should be enough to pass env into McpServer without modifying os.environ.

Comment on lines +134 to +141
config = root_agent.config or {}
sub_agent_ids = config.get("sub_agents", [])
if not sub_agent_ids:
raise ValueError("group_chat agent requires at least one sub_agent")

# Build all sub-agents first so handoff resolution can reference them
all_agents = {}
agents = []
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Sub-agents are fetched from the database twice: once to build them and again to apply handoffs.

In build_group_chat_setup, each aid triggers two get_agent calls—first to build ConversableAgents, then again when applying handoffs. Consider storing the initial db_agent objects (e.g., an id → db_agent dict) and reusing them when applying handoffs to avoid duplicate database queries and keep the lookup logic centralized.

Suggested change
config = root_agent.config or {}
sub_agent_ids = config.get("sub_agents", [])
if not sub_agent_ids:
raise ValueError("group_chat agent requires at least one sub_agent")
# Build all sub-agents first so handoff resolution can reference them
all_agents = {}
agents = []
config = root_agent.config or {}
sub_agent_ids = config.get("sub_agents", [])
if not sub_agent_ids:
raise ValueError("group_chat agent requires at least one sub_agent")
# Build all sub-agents first so handoff resolution can reference them
all_agents = {}
agents = []
db_sub_agents = {}
for aid in sub_agent_ids:
db_agent = get_agent(self.db, str(aid))
if db_agent is None:
raise ValueError(f"Sub-agent {aid} not found")
db_sub_agents[str(aid)] = db_agent
ca = await self.build_conversable_agent(db_agent)
all_agents[str(aid)] = ca
agents.append(ca)
root_ca = await self.build_conversable_agent(root_agent)
all_agents[str(root_agent.id)] = root_ca
# Apply handoffs to each sub-agent if configured, reusing cached db agents
for aid in sub_agent_ids:
db_agent = db_sub_agents.get(str(aid))
if db_agent and db_agent.config:
self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)

Comment on lines +94 to +116
for h in handoffs_config:
target_id = h.get("target_agent_id")
target_agent = all_agents.get(str(target_id))
if not target_agent:
logger.warning(f"Handoff target {target_id} not found, skipping")
continue

if h["type"] == "llm":
llm_conditions.append(
OnCondition(
target=AgentTarget(target_agent),
condition=StringLLMCondition(prompt=h["condition"]),
)
)
elif h["type"] == "context":
context_conditions.append(
OnContextCondition(
target=AgentTarget(target_agent),
condition=ExpressionContextCondition(
expression=ContextExpression(h["expression"])
),
)
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Using h["type"] without validation can raise if a handoff entry is malformed.

In _apply_handoffs, this direct h["type"] access will raise a KeyError if a handoff entry is missing type, which will break setup for a misconfigured user config. Consider using h.get("type") and skipping/logging invalid entries, consistent with how missing target_agent is handled.

Suggested change
for h in handoffs_config:
target_id = h.get("target_agent_id")
target_agent = all_agents.get(str(target_id))
if not target_agent:
logger.warning(f"Handoff target {target_id} not found, skipping")
continue
if h["type"] == "llm":
llm_conditions.append(
OnCondition(
target=AgentTarget(target_agent),
condition=StringLLMCondition(prompt=h["condition"]),
)
)
elif h["type"] == "context":
context_conditions.append(
OnContextCondition(
target=AgentTarget(target_agent),
condition=ExpressionContextCondition(
expression=ContextExpression(h["expression"])
),
)
)
for h in handoffs_config:
target_id = h.get("target_agent_id")
target_agent = all_agents.get(str(target_id))
if not target_agent:
logger.warning(f"Handoff target {target_id} not found, skipping")
continue
h_type = h.get("type")
if h_type not in ("llm", "context"):
logger.warning(
f"Invalid or missing handoff type {h_type!r} for target {target_id}, skipping"
)
continue
if h_type == "llm":
llm_conditions.append(
OnCondition(
target=AgentTarget(target_agent),
condition=StringLLMCondition(prompt=h["condition"]),
)
)
elif h_type == "context":
context_conditions.append(
OnContextCondition(
target=AgentTarget(target_agent),
condition=ExpressionContextCondition(
expression=ContextExpression(h["expression"])
),
)
)

Comment on lines +40 to +49
@pytest.mark.asyncio
async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
builder = AG2AgentBuilder(db=mock_db)
sub_agent = _make_agent("uuid-triage", "triage")
with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
result, _ = await builder.build_agent(group_chat_agent_record)
# result should be a dict with pattern and agents for initiate_group_chat
assert "pattern" in result
assert "agents" in result

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add tests for single-agent mode and API key resolution branches in AG2AgentBuilder

Current tests only cover group chat orchestration and the empty-sub-agent validation. Please also add coverage for:

  • single mode: build_agent returning a ConversableAgent when ag2_mode is unset or set to "single".
  • Agent metadata: name sanitization (spaces → underscores) and correct system message composition from role, goal, and instruction.
  • _get_api_key behavior:
    • api_key_id provided and resolved via get_decrypted_api_key.
    • config.api_key as a UUID string resolved via get_decrypted_api_key.
    • config.api_key as a non-UUID raw key returned as-is.
    • No valid key → ValueError with a clear message.

These cases help ensure behavior stays aligned with ADK/CrewAI builders and that auth errors are surfaced properly.

Suggested implementation:

@pytest.mark.asyncio
async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
    builder = AG2AgentBuilder(db=mock_db)
    sub_agent = _make_agent("uuid-triage", "triage")
    with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
        result, _ = await builder.build_agent(group_chat_agent_record)
    # result should be a dict with pattern and agents for initiate_group_chat
    assert "pattern" in result
    assert "agents" in result

@pytest.mark.asyncio
async def test_builder_validates_group_chat_requires_agents(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.config = {"ag2_mode": "group_chat", "sub_agents": []}  # empty — should raise
    record.api_key = "test"
    builder = AG2AgentBuilder(db=mock_db)
    with pytest.raises(ValueError, match="at least one"):
        await builder.build_agent(record)


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_unset(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Support Team Agent"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        # no ag2_mode: should default to single-agent mode
        "role": "assistant",
        "goal": "Help users resolve issues",
        "instruction": "Be concise and friendly.",
        "api_key": "raw-openai-key",
    }

    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    # single-agent mode should produce a ConversableAgent-like object
    from autogen import ConversableAgent  # type: ignore[import]
    assert isinstance(agent, ConversableAgent)

    # name should be sanitized (spaces -> underscores)
    assert agent.name == "Support_Team_Agent"

    # system message should compose role, goal and instruction
    system_message = getattr(agent, "system_message", "")
    assert "assistant" in system_message
    assert "Help users resolve issues" in system_message
    assert "Be concise and friendly." in system_message
    # metadata should at least echo the mode
    assert metadata.get("mode") == "single"


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_single(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Billing Bot"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        "ag2_mode": "single",
        "role": "assistant",
        "goal": "Assist with billing questions",
        "instruction": "Only answer billing-related queries.",
        "api_key": "raw-openai-key",
    }

    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    from autogen import ConversableAgent  # type: ignore[import]
    assert isinstance(agent, ConversableAgent)
    assert agent.name == "Billing_Bot"

    system_message = getattr(agent, "system_message", "")
    assert "billing" in system_message.lower()
    assert "only answer billing-related queries.".lower() in system_message.lower()
    assert metadata.get("mode") == "single"


def test_get_api_key_with_api_key_id_uses_secret_store(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = "11111111-2222-3333-4444-555555555555"
    record.api_key = None
    record.config = {}

    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-secret-key",
    ) as mock_get:
        api_key = builder._get_api_key(record)

    mock_get.assert_called_once_with(mock_db, record.api_key_id)
    assert api_key == "resolved-secret-key"


def test_get_api_key_with_uuid_string_in_config(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    config_uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": config_uuid}

    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-from-config",
    ) as mock_get:
        api_key = builder._get_api_key(record)

    mock_get.assert_called_once_with(mock_db, config_uuid)
    assert api_key == "resolved-from-config"


def test_get_api_key_with_raw_non_uuid_config_value(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": "sk-live-plain-openai-key"}

    api_key = builder._get_api_key(record)

    # Non-UUID api_key should be returned as-is, without hitting secret store
    assert api_key == "sk-live-plain-openai-key"


def test_get_api_key_raises_when_no_valid_key(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {}

    with pytest.raises(ValueError, match="API key"):
        builder._get_api_key(record)
  1. Ensure ConversableAgent is available at the top of the test module; if you prefer not to import it inside tests, move the from autogen import ConversableAgent import to the global imports section and remove the inline imports in the tests.
  2. Verify the call signature of get_decrypted_api_key in src.services.ag2.agent_builder; the tests assume get_decrypted_api_key(db, api_key_id). If the actual order differs, adjust the assert_called_once_with expectations accordingly.
  3. If AG2AgentBuilder._get_api_key currently uses a different error message when no key is found, update the implementation to raise ValueError("API key could not be resolved") (or similar) so that the "API key" substring in the match expression is present.
  4. If the ConversableAgent implementation uses a different attribute than .system_message for the composed instructions, either:
    • Update AG2AgentBuilder to set .system_message to the composed text, or
    • Adjust the tests to assert against the actual attribute used (e.g. agent.description), keeping the checks that role, goal, and instruction are all present.

Comment on lines +6 to +15
@pytest.fixture
def mock_db():
return AsyncMock()

def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
"""Create a minimal agent record mock with explicit attribute assignment."""
a = MagicMock()
a.id = agent_id
a.name = agent_name
a.type = "llm"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Extend tests to cover handoff configuration and sub-agent resolution in group chat setups

AG2AgentBuilder has substantial logic in _apply_handoffs and build_group_chat_setup (LLM/context handoffs, after_work mapping, and missing sub-agent handling), but current tests only check the output shape.

Please add tests that:

  • Build a root agent with sub-agents containing handoffs of type "llm" and "context", and assert the expected conditions are registered on ca.handoffs.
  • Use a sub-agent ID for which get_agent returns None and assert a ValueError("Sub-agent ... not found").
  • Verify after_work resolves to TerminateTarget when set to "terminate" and to RevertToUserTarget otherwise.

You can rely on lightweight MagicMock agents or patched AG2 classes to avoid heavy dependencies while still validating the behavior and wiring.

Suggested implementation:

from unittest.mock import MagicMock, AsyncMock, patch
from src.services.ag2.agent_builder import AG2AgentBuilder, TerminateTarget, RevertToUserTarget

@pytest.fixture
def mock_db():
    return AsyncMock()

def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
    """Create a minimal agent record mock with explicit attribute assignment."""
    a = MagicMock()
    a.id = agent_id
    a.name = agent_name
    a.type = "llm"
    a.llm_provider = "openai"
    a.model = model
    a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
    a.api_key = "test-key"
    a.api_key_id = None
    a.api_url = "https://api.openai.com/v1"
    a.role = None
    a.goal = None
    a.instruction = f"You are {agent_name}."
    return a


@pytest.fixture
def root_agent_with_handoffs():
    """Root agent configured with LLM and context handoffs to sub-agents."""
    root = _make_agent("root", "Root Agent", ag2_mode="group")

    # Two handoffs: one LLM and one context, with different after_work semantics
    root.config["handoffs"] = [
        {
            "type": "llm",
            "to": "sub-llm",
            "condition": {"kind": "tool", "name": "delegate_to_sub_llm"},
            "after_work": "terminate",
        },
        {
            "type": "context",
            "to": "sub-context",
            "condition": {"kind": "message", "contains": "use context"},
            "after_work": "return",
        },
    ]
    return root


@pytest.fixture
def sub_agents():
    """Sub-agents referenced by the root agent's handoffs."""
    sub_llm = _make_agent("sub-llm", "Sub LLM Agent")
    sub_ctx = _make_agent("sub-context", "Sub Context Agent")
    return {"sub-llm": sub_llm, "sub-context": sub_ctx}


@pytest.mark.asyncio
async def test_apply_handoffs_registers_llm_and_context_handoffs(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Ensure LLM and context handoffs are transformed and registered on ca.handoffs.

    This exercises AG2AgentBuilder.build_group_chat_setup and its internal
    _apply_handoffs logic without pulling in heavy AG2 dependencies.
    """
    builder = AG2AgentBuilder(mock_db)

    # Patch get_agent so handoff resolution finds our lightweight MagicMock sub-agents.
    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        # Depending on implementation, the entry point may differ.
        # We assume build_group_chat_setup(root_agent) returns an object with .handoffs.
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    # We expect one handoff per entry in root_agent.config["handoffs"].
    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2

    # Order is preserved: first is LLM handoff, second is context handoff.
    llm_handoff, ctx_handoff = ca.handoffs

    # The tests are intentionally lightweight: we validate the wiring rather than AG2 internals.
    # LLM handoff should target the sub-llm agent and have a condition based on tool usage.
    assert getattr(llm_handoff, "sub_agent_id", None) == "sub-llm"
    assert getattr(llm_handoff, "type", None) == "llm"
    assert "delegate_to_sub_llm" in repr(getattr(llm_handoff, "condition", ""))

    # Context handoff should target the sub-context agent and have a message-based condition.
    assert getattr(ctx_handoff, "sub_agent_id", None) == "sub-context"
    assert getattr(ctx_handoff, "type", None) == "context"
    assert "use context" in repr(getattr(ctx_handoff, "condition", ""))


@pytest.mark.asyncio
async def test_apply_handoffs_missing_sub_agent_raises_value_error(
    mock_db, root_agent_with_handoffs
):
    """
    When get_agent returns None for a referenced sub-agent, a ValueError is raised.
    """
    builder = AG2AgentBuilder(mock_db)

    # Force one of the handoffs to reference a non-existent sub-agent.
    root_agent_with_handoffs.config["handoffs"][0]["to"] = "missing-subagent-id"

    async def fake_get_agent(agent_id: str):
        # Simulate not-found for the missing ID, normal behavior otherwise.
        if agent_id == "missing-subagent-id":
            return None
        return _make_agent(agent_id, f"Agent {agent_id}")

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        with pytest.raises(ValueError, match=r"Sub-agent .* not found"):
            await builder.build_group_chat_setup(root_agent_with_handoffs)


@pytest.mark.asyncio
async def test_after_work_resolves_to_correct_targets(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Verify that after_work is mapped to TerminateTarget and RevertToUserTarget.

    - "terminate" -> TerminateTarget
    - any other value (e.g. "return") -> RevertToUserTarget
    """
    builder = AG2AgentBuilder(mock_db)

    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2

    llm_handoff, ctx_handoff = ca.handoffs

    # after_work was set to "terminate" for the LLM handoff.
    after_work_llm = getattr(llm_handoff, "after_work", None)
    assert isinstance(after_work_llm, TerminateTarget)

    # after_work was set to "return" for the context handoff.
    after_work_ctx = getattr(ctx_handoff, "after_work", None)
    assert isinstance(after_work_ctx, RevertToUserTarget)

These tests assume the following public/semipublic API on AG2AgentBuilder and related classes:

  1. AG2AgentBuilder has an async method build_group_chat_setup(root_agent) returning an object (here called ca) that exposes:
    • ca.handoffs: an ordered iterable of handoff objects.
  2. Each handoff object has (or can be extended to have) the attributes:
    • type: "llm" or "context".
    • sub_agent_id: the string ID of the sub-agent resolved from config["handoffs"][i]["to"].
    • condition: an object whose repr contains at least the identifying text from the condition dict in the config.
    • after_work: an instance of either TerminateTarget or RevertToUserTarget.
  3. AG2AgentBuilder exposes an async method get_agent(agent_id: str) used by _apply_handoffs to resolve sub-agents.
  4. src.services.ag2.agent_builder exports TerminateTarget and RevertToUserTarget.

If any of these assumptions differ from your actual implementation, you should:

  • Update the test calls (build_group_chat_setup, attribute names on the returned object, etc.) to match your real API.
  • Adjust the assertions on the handoff objects to match the actual attributes you expose (for example, you may store the sub-agent as handoff.agent instead of sub_agent_id; in that case assert on handoff.agent.id).
  • If TerminateTarget / RevertToUserTarget are not directly exported, either import them from the correct module or assert based on whatever target representation your implementation uses (e.g., string enums like "terminate" / "user").

Comment on lines +7 to +16
@pytest.mark.asyncio
async def test_runner_yields_response_chunks():
mock_result = {
"final_response": "Resolved: your issue is fixed.",
"message_history": [],
}
mock_settings = MagicMock()
mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

# Patch run_agent (the heavy work), session service, and settings —
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add tests for error conditions and non-happy-path behavior in run_agent_stream and run_agent

Please also cover:

  • A case where run_agent raises (e.g., InternalServerError) and run_agent_stream propagates the exception instead of yielding.
  • Direct run_agent tests for:
    • get_agent returning NoneAgentNotFoundError.
    • ag2_mode == "group_chat" with initiate_group_chat patched, asserting correct use of chat_result.summary and chat_history.
    • ag2_mode == "single" with ConversableAgent patched, asserting the proxy pattern and that the final response comes from the last message.

This will exercise the control flow and error mapping beyond the happy-path streaming case.

Suggested implementation:

import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from src.exceptions import InternalServerError
from src.services.ag2.agent_runner import run_agent_stream, run_agent, AgentNotFoundError
    assert data["content"]["parts"][0]["text"] == "Resolved: your issue is fixed."
    assert data["is_final"] is True


@pytest.mark.asyncio
async def test_run_agent_stream_propagates_internal_server_error():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    # Patch run_agent to raise, and still patch session service + settings as in happy-path test
    with patch(
        "src.services.ag2.agent_runner.run_agent",
        AsyncMock(side_effect=InternalServerError("boom")),
    ):
        with patch("src.services.ag2.session_service.AG2SessionService"):
            with patch("src.config.settings.get_settings", return_value=mock_settings):
                with pytest.raises(InternalServerError):
                    async for _ in run_agent_stream(
                        db=AsyncMock(),
                        agent_id="agent-123",
                        external_id="ext-123",
                        session_id="session-abc",
                        message="This will fail",
                    ):
                        # If we ever yield, the behavior is wrong
                        pytest.fail("run_agent_stream should not yield on InternalServerError")


@pytest.mark.asyncio
async def test_run_agent_get_agent_none_raises_agent_not_found():
    mock_db = AsyncMock()

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = None

        with pytest.raises(AgentNotFoundError):
            await run_agent(
                db=mock_db,
                agent_id="missing-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Hello?",
            )


@pytest.mark.asyncio
async def test_run_agent_group_chat_uses_summary_and_chat_history():
    mock_db = AsyncMock()

    # Fake agent record in group_chat mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "group_chat"

    chat_result = MagicMock()
    chat_result.summary = "Group chat resolution summary"
    chat_result.chat_history = [
        {"role": "user", "content": "Question"},
        {"role": "assistant", "content": "Intermediate"},
        {"role": "assistant", "content": "Final"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        with patch(
            "src.services.ag2.agent_runner.initiate_group_chat",
            AsyncMock(return_value=chat_result),
        ):
            result = await run_agent(
                db=mock_db,
                agent_id="group-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Start group chat",
            )

    assert result["final_response"] == chat_result.summary
    assert result["message_history"] == chat_result.chat_history


@pytest.mark.asyncio
async def test_run_agent_single_mode_uses_proxy_and_last_message_response():
    mock_db = AsyncMock()

    # Fake agent record in single mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "single"

    # Simulated conversation messages; run_agent should use the last one as final response
    conversation_messages = [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "First reply"},
        {"role": "assistant", "content": "Final reply from proxy"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        # Mock ConversableAgent to simulate proxy pattern and returned messages
        with patch("src.services.ag2.agent_runner.ConversableAgent") as ConversableAgentMock:
            # Each instantiation returns a separate agent mock
            primary_agent = MagicMock()
            proxy_agent = MagicMock()

            # The proxy agent produces the conversation messages
            proxy_agent.initiate_chat.return_value = conversation_messages

            ConversableAgentMock.side_effect = [primary_agent, proxy_agent]

            result = await run_agent(
                db=mock_db,
                agent_id="single-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Talk to me",
            )

    # Assert that we used the proxy pattern (two agents created)
    assert ConversableAgentMock.call_count == 2

    # Final response should come from the last message produced by the proxy conversation
    assert result["final_response"] == conversation_messages[-1]["content"]

These tests assume:

  1. InternalServerError is available as src.exceptions.InternalServerError. If it lives elsewhere, update the import path accordingly.
  2. AgentNotFoundError is exported from src.services.ag2.agent_runner. If it is defined in a different module, adjust the import and the pytest.raises(...) target.
  3. run_agent:
    • Accepts the arguments (db, agent_id, external_id, session_id, message, ...) as used here.
    • Raises AgentNotFoundError when AG2AgentBuilder(...).get_agent(...) returns None.
    • In "group_chat" mode calls initiate_group_chat(...) and returns a dict with "final_response" (from chat_result.summary) and "message_history" (from chat_result.chat_history).
    • In "single" mode constructs two ConversableAgent instances (primary + proxy), uses the proxy's initiate_chat result, and sets "final_response" from the last message's "content".
      If your actual control flow differs (e.g., different attribute names, return shapes, or proxy setup), adjust the assertions and mocks (especially the side_effect on ConversableAgentMock and the expected keys in result) to match the real implementation.

Comment on lines +18 to +27
with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
with patch("src.services.ag2.session_service.AG2SessionService"):
with patch("src.config.settings.get_settings", return_value=mock_settings):
chunks = []
async for chunk in run_agent_stream(
db=AsyncMock(),
agent_id="agent-123",
external_id="ext-123",
session_id="session-abc",
message="My printer is broken",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider asserting the full streaming envelope contract, not just the text content

Since run_agent_stream should match the WebSocket/ADK streaming envelope, this test should also validate the JSON schema, e.g.:

  • data["content"]["role"] == "agent"
  • data["author"] matches the agent_id argument
  • parts elements have the expected { "type": "text" } shape

This will better enforce the streaming protocol contract and detect unintended schema changes.

Comment on lines +13 to +20
mock_settings = MagicMock()
mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

# Patch run_agent (the heavy work), session service, and settings —
# get_settings and AG2SessionService are local imports inside run_agent_stream
with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
with patch("src.services.ag2.session_service.AG2SessionService"):
with patch("src.config.settings.get_settings", return_value=mock_settings):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add a dedicated test for session history integration in run_agent

Since AG2SessionService is fully stubbed, we don’t currently verify that messages are appended and persisted. Please add a focused unit test for run_agent that injects a fake AG2SessionService with get_or_create, append, and save mocked, and asserts that:

  • append is called in order with ("user", message) then ("assistant", final_response)
  • save is called once with the same session instance

This will ensure conversational state is correctly persisted for the WhatsApp pause/resume scenario described in the PR.

Suggested implementation:

import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch, call
from src.services.ag2.agent_runner import run_agent_stream, run_agent
    assert len(chunks) >= 1
    data = json.loads(chunks[0])


@pytest.mark.asyncio
async def test_run_agent_persists_session_history_in_order():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    with patch("src.services.ag2.session_service.AG2SessionService") as MockSessionService, \
         patch("src.config.settings.get_settings", return_value=mock_settings):

        mock_session_service = MockSessionService.return_value
        mock_session = MagicMock()

        # Ensure session service methods are async and controllable
        mock_session_service.get_or_create = AsyncMock(return_value=mock_session)
        mock_session_service.append = AsyncMock()
        mock_session_service.save = AsyncMock()

        db = AsyncMock()
        user_message = "My printer is broken"

        result = await run_agent(
            db=db,
            agent_id="agent-123",
            external_id="ext-123",
            session_id="session-abc",
            message=user_message,
        )

        # Append is called in order: user then assistant
        mock_session_service.append.assert_has_awaits(
            [
                call(mock_session, "user", user_message),
                call(mock_session, "assistant", result["final_response"]),
            ]
        )

        # Save is called once with the same session instance
        mock_session_service.save.assert_awaited_once_with(mock_session)

If the actual signature of run_agent or the AG2SessionService methods differ (for example, if append/save or get_or_create are synchronous instead of async, or if append takes different parameters), you should:

  1. Adjust the AsyncMock usage to MagicMock and switch from assert_awaited* to assert_called* accordingly.
  2. Update the call(...) argument lists to match the real method signatures (e.g., if append receives keyword arguments or a different ordering).
  3. If run_agent requires additional parameters (such as config, tools, or agent options), pass suitable dummy values in the test invocation.

VasiliyRad and others added 2 commits March 11, 2026 23:48
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant