Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The Evo AI platform allows:
- Custom tools management
- **[Google Agent Development Kit (ADK)](https://google.github.io/adk-docs/)**: Base framework for agent development
- **[CrewAI Support](https://github.com/crewAI/crewAI)**: Alternative framework for agent development (in development)
- **[AG2 (formerly AutoGen)](https://github.com/ag2ai/ag2)**: Dynamic GroupChat, context-variable handoffs, and human-in-the-loop (`AI_ENGINE=ag2`)
- JWT authentication with email verification
- **[Agent 2 Agent (A2A) Protocol Support](https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/)**: Interoperability between AI agents
- **[Workflow Agent with LangGraph](https://www.langchain.com/langgraph)**: Building complex agent workflows
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies = [
"crewai==0.120.1",
"crewai-tools==0.45.0",
"a2a-sdk==0.2.4",
"ag2[openai]>=0.11.0",
]

[project.optional-dependencies]
Expand Down
41 changes: 31 additions & 10 deletions src/api/chat_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse, FileData
from src.services.adk.agent_runner import run_agent as run_agent_adk, run_agent_stream
from src.services.crewai.agent_runner import run_agent as run_agent_crewai
from src.services.ag2.agent_runner import run_agent as run_agent_ag2, run_agent_stream as run_agent_stream_ag2
from src.core.exceptions import AgentNotFoundError
from src.services.service_providers import (
session_service,
Expand Down Expand Up @@ -221,16 +222,27 @@ async def websocket_chat(
logger.error(f"Error processing files: {str(e)}")
files = None

async for chunk in run_agent_stream(
agent_id=agent_id,
external_id=external_id,
message=message,
session_service=session_service,
artifacts_service=artifacts_service,
memory_service=memory_service,
db=db,
files=files,
):
if settings.AI_ENGINE == "ag2":
stream_gen = run_agent_stream_ag2(
agent_id=agent_id,
external_id=external_id,
message=message,
session_service=session_service,
db=db,
files=files,
)
else:
stream_gen = run_agent_stream(
agent_id=agent_id,
external_id=external_id,
message=message,
session_service=session_service,
artifacts_service=artifacts_service,
memory_service=memory_service,
db=db,
files=files,
)
async for chunk in stream_gen:
await websocket.send_json(
{"message": json.loads(chunk), "turn_complete": False}
)
Expand Down Expand Up @@ -300,6 +312,15 @@ async def chat(
db,
files=request.files,
)
elif settings.AI_ENGINE == "ag2":
final_response = await run_agent_ag2(
agent_id,
external_id,
request.message,
session_service,
db,
files=request.files,
)

return {
"response": final_response["final_response"],
Expand Down
6 changes: 5 additions & 1 deletion src/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
from .adk.agent_runner import run_agent
# google-adk is an optional dependency — guard so unit tests run without the full stack
try:
from .adk.agent_runner import run_agent
except ImportError:
pass
Empty file added src/services/ag2/__init__.py
Empty file.
207 changes: 207 additions & 0 deletions src/services/ag2/agent_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import uuid
from typing import Tuple, Optional
from autogen import ConversableAgent, LLMConfig
from autogen.agentchat import initiate_group_chat
from autogen.agentchat.group.patterns import DefaultPattern, AutoPattern
from autogen.agentchat.group import (
ContextVariables,
RevertToUserTarget,
TerminateTarget,
AgentTarget,
OnCondition,
StringLLMCondition,
OnContextCondition,
ExpressionContextCondition,
ContextExpression,
)
from sqlalchemy.orm import Session
from src.services.agent_service import get_agent
from src.services.apikey_service import get_decrypted_api_key
from src.utils.logger import setup_logger

logger = setup_logger(__name__)


class AG2AgentBuilder:
def __init__(self, db: Session):
self.db = db

async def _get_api_key(self, agent) -> str:
"""Reuse the same key resolution logic as ADK and CrewAI builders."""
if hasattr(agent, "api_key_id") and agent.api_key_id:
key = get_decrypted_api_key(self.db, agent.api_key_id)
if key:
return key
raise ValueError(f"API key {agent.api_key_id} not found or inactive")
config_key = agent.config.get("api_key") if agent.config else None
if config_key:
try:
key = get_decrypted_api_key(self.db, uuid.UUID(config_key))
return key or config_key
except (ValueError, TypeError):
return config_key
raise ValueError(f"No API key configured for agent {agent.name}")

def _build_llm_config(self, agent, api_key: str) -> LLMConfig:
return LLMConfig({"model": agent.model, "api_key": api_key})

def _build_system_message(self, agent) -> str:
parts = []
if agent.role:
parts.append(f"Role: {agent.role}")
if agent.goal:
parts.append(f"Goal: {agent.goal}")
if agent.instruction:
parts.append(agent.instruction)
return "\n\n".join(parts)

async def build_conversable_agent(self, agent) -> ConversableAgent:
api_key = await self._get_api_key(agent)
# AG2 0.11+ rejects names containing whitespace for OpenAI models
safe_name = agent.name.replace(" ", "_")
return ConversableAgent(
name=safe_name,
system_message=self._build_system_message(agent),
description=agent.description or "",
llm_config=self._build_llm_config(agent, api_key),
)

def _apply_handoffs(self, ca: ConversableAgent, config: dict, all_agents: dict):
"""
Apply AG2 handoff conditions from the agent config's optional 'handoffs' field.

Config format:
{
"handoffs": [
{
"type": "llm",
"target_agent_id": "<uuid>",
"condition": "Route when the user asks about billing"
},
{
"type": "context",
"target_agent_id": "<uuid>",
"expression": "${is_vip} == True"
}
],
"after_work": "revert_to_user" // or "terminate"
}
"""
handoffs_config = config.get("handoffs", [])
llm_conditions = []
context_conditions = []

for h in handoffs_config:
target_id = h.get("target_agent_id")
target_agent = all_agents.get(str(target_id))
if not target_agent:
logger.warning(f"Handoff target {target_id} not found, skipping")
continue

h_type = h.get("type")
if h_type not in ("llm", "context"):
logger.warning(f"Unknown or missing handoff type {h_type!r} for target {target_id}, skipping")
continue

if h_type == "llm":
llm_conditions.append(
OnCondition(
target=AgentTarget(target_agent),
condition=StringLLMCondition(prompt=h.get("condition", "")),
)
)
elif h_type == "context":
context_conditions.append(
OnContextCondition(
target=AgentTarget(target_agent),
condition=ExpressionContextCondition(
expression=ContextExpression(h.get("expression", ""))
),
)
)

if llm_conditions:
ca.handoffs.add_llm_conditions(llm_conditions)
if context_conditions:
ca.handoffs.add_context_conditions(context_conditions)

after_work = config.get("after_work", "revert_to_user")
if after_work == "terminate":
ca.handoffs.set_after_work(TerminateTarget())
else:
ca.handoffs.set_after_work(RevertToUserTarget())

async def build_group_chat_setup(self, root_agent) -> dict:
"""
Build a GroupChat pattern from an agent record with sub_agents.
Returns a dict consumed by the runner's initiate_group_chat call.
"""
config = root_agent.config or {}
sub_agent_ids = config.get("sub_agents", [])
if not sub_agent_ids:
raise ValueError("group_chat agent requires at least one sub_agent")

# Build all sub-agents first so handoff resolution can reference them.
# Cache db_agent records to avoid re-fetching them in the handoff pass.
all_agents: dict = {}
agents = []
db_sub_agents: dict = {}
for aid in sub_agent_ids:
db_agent = get_agent(self.db, str(aid))
if db_agent is None:
raise ValueError(f"Sub-agent {aid} not found")
db_sub_agents[str(aid)] = db_agent
ca = await self.build_conversable_agent(db_agent)
all_agents[str(aid)] = ca
agents.append(ca)

root_ca = await self.build_conversable_agent(root_agent)
all_agents[str(root_agent.id)] = root_ca

# Apply handoffs using the already-fetched db_agent records
for aid in sub_agent_ids:
db_agent = db_sub_agents.get(str(aid))
if db_agent and db_agent.config:
self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)

api_key = await self._get_api_key(root_agent)
manager_llm = self._build_llm_config(root_agent, api_key)

pattern_type = config.get("pattern", "auto")
if pattern_type == "auto":
pattern = AutoPattern(
initial_agent=root_ca,
agents=[root_ca] + agents,
group_manager_args={"llm_config": manager_llm},
)
else:
pattern = DefaultPattern(
initial_agent=root_ca,
agents=[root_ca] + agents,
group_after_work=RevertToUserTarget(),
)

return {
"pattern": pattern,
"agents": [root_ca] + agents,
"max_rounds": config.get("max_rounds", 10),
"context_variables": ContextVariables(
data=config.get("context_variables", {})
),
}

async def build_agent(self, root_agent) -> Tuple[object, None]:
"""
Entry point matching the ADK/CrewAI AgentBuilder interface.
Returns (agent_or_setup_dict, exit_stack).

Orchestration mode is read from config["ag2_mode"]:
"group_chat" → GroupChat with sub-agents from config["sub_agents"]
"single" / absent → single ConversableAgent (default)
No new agent type is required in the DB; all AG2 agents use type="llm".
"""
ag2_mode = (root_agent.config or {}).get("ag2_mode", "single")
if ag2_mode == "group_chat":
return await self.build_group_chat_setup(root_agent), None
else:
return await self.build_conversable_agent(root_agent), None
Loading