diff --git a/.gitignore b/.gitignore index fe83a12a5..6fa1d304a 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ _agents/ # Internal docs docs/ .agents/rules/deploy.md +backend/tests/test_agent_api_live.py diff --git a/backend/agent_template/skills/agent-api-calling/SKILL.md b/backend/agent_template/skills/agent-api-calling/SKILL.md new file mode 100644 index 000000000..2a76229d2 --- /dev/null +++ b/backend/agent_template/skills/agent-api-calling/SKILL.md @@ -0,0 +1,152 @@ +# Agent API Calling + +## When to Use This Skill + +当你需要通过代码(而非 `send_message_to_agent` 工具)调用其他 Agent 时使用此技能。适用场景包括: + +- 在 `execute_code` 中批量调用多个 Agent 并汇总结果 +- 编写自动化脚本需要跨 Agent 协作 +- 需要同步等待另一个 Agent 的完整回复 + +> **注意:** 简单的一对一消息传递优先使用 `send_message_to_agent` 工具。API 调用适合需要在代码逻辑中嵌入 Agent 调用的场景。 + +--- + +## 所需信息的来源 + +你的 System Prompt 中有一个 `## Platform Agent API` 部分,其中包含: + +1. **你的 Token Key** — 形如 `clw_xxxxxxxx`,用于 API 认证 +2. **完整的 API 地址** — 形如 `POST https://xxx/api/v1/agent/chat` + +直接从该部分复制使用即可,**不要自己拼接地址**。 + +--- + +## 如何获取目标 Agent 的 ID + +你的 System Prompt 中的 `## Relationships` → `🤖 数字员工同事` 部分列出了所有可调用的 Agent: + +``` +### 小助手 — 数据分析助手 +- Agent ID:`xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx` +- 关系:协作伙伴 +``` + +从中提取 `Agent ID` 字段即可。**只有在你关系列表中的 Agent 才可以被调用。** + +--- + +## API 调用方法 + +### 基本信息 + +- **接口地址和 Token Key:** 见你的 System Prompt 中的 `## Platform Agent API` 部分 +- **认证方式:** `Authorization: Bearer <你的Token Key>` +- **超时时间:** 最长 1 小时 + +### Python 代码示例 + +```python +import requests + +# ↓ 从你的 System Prompt "Platform Agent API" 部分获取这两个值 +TOKEN_KEY = "clw_你的token_key" +API_URL = "https://你的平台地址/api/v1/agent/chat" + +# ↓ 从你的关系网络中获取目标 Agent ID +TARGET_AGENT_ID = "目标agent的uuid" + +response = requests.post( + API_URL, + headers={ + "Authorization": f"Bearer {TOKEN_KEY}", + "Content-Type": "application/json", + }, + json={ + "agent_id": TARGET_AGENT_ID, + "prompt": "请帮我分析一下最近的销售数据趋势", + }, + timeout=3600, # 最长等待1小时 +) + +result = response.json() +reply = result["reply"] # Agent 的回复内容 +print(reply) +``` + +### 批量调用多个 Agent + +```python +import requests +from concurrent.futures import ThreadPoolExecutor + +# 从 System Prompt 获取 +TOKEN_KEY = "clw_你的token_key" +API_URL = "https://你的平台地址/api/v1/agent/chat" + +HEADERS = { + "Authorization": f"Bearer {TOKEN_KEY}", + "Content-Type": "application/json", +} + +# 从关系网络中获取的 Agent 列表 +tasks = [ + {"agent_id": "agent-uuid-1", "prompt": "分析销售数据"}, + {"agent_id": "agent-uuid-2", "prompt": "生成周报摘要"}, + {"agent_id": "agent-uuid-3", "prompt": "检查库存预警"}, +] + +def call_agent(task): + resp = requests.post(API_URL, headers=HEADERS, json=task, timeout=3600) + return {"agent_id": task["agent_id"], "reply": resp.json()["reply"]} + +# 并行调用 +with ThreadPoolExecutor(max_workers=3) as pool: + results = list(pool.map(call_agent, tasks)) + +for r in results: + print(f"Agent {r['agent_id']}: {r['reply'][:100]}...") +``` + +--- + +## 响应格式 + +```json +{ + "reply": "Agent 的完整回复文本", + "usage": {} +} +``` + +--- + +## 错误处理 + +| HTTP 状态码 | 含义 | 处理方式 | +|---|---|---| +| 401 | Token Key 无效或缺失 | 检查 Authorization header | +| 403 | 无权调用(不在关系列表中,或目标已过期) | 确认目标 Agent 在你的关系网络中 | +| 404 | 目标 Agent 不存在 | 检查 Agent ID 是否正确 | +| 400 | 目标 Agent 未配置 LLM 模型 | 联系管理员配置模型 | +| 422 | 请求参数错误(如 prompt 为空) | 检查请求体格式 | +| 502 | LLM 调用失败 | 重试或联系管理员 | + +```python +response = requests.post(API_URL, headers=HEADERS, json=payload, timeout=3600) +if response.status_code != 200: + print(f"调用失败: {response.status_code} - {response.json().get('detail', '')}") +else: + print(response.json()["reply"]) +``` + +--- + +## What NOT to Do + +- 不要自己拼接 API 地址,从 System Prompt 的 `Platform Agent API` 部分直接获取 +- 不要调用不在你关系列表中的 Agent,会返回 403 +- 不要在日志或回复中暴露你的 Token Key +- 不要设置过短的超时时间,复杂任务可能需要较长处理时间 +- 对于简单的一对一对话,优先使用 `send_message_to_agent` 工具而不是 API diff --git a/backend/alembic/versions/add_agent_token_key.py b/backend/alembic/versions/add_agent_token_key.py new file mode 100644 index 000000000..d4cfcd291 --- /dev/null +++ b/backend/alembic/versions/add_agent_token_key.py @@ -0,0 +1,28 @@ +"""Add token_key fields to agents table for Agent API calling. + +Revision ID: add_agent_token_key +Revises: None (standalone — depends on current head) +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers +revision = "add_agent_token_key" +down_revision = None # Will be set by Alembic chain +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("agents", sa.Column("token_key", sa.String(128), nullable=True, index=True)) + op.add_column("agents", sa.Column("token_key_suffix", sa.String(4), nullable=True)) + # Create index explicitly for the token_key lookup + op.create_index("ix_agents_token_key", "agents", ["token_key"], unique=False) + + +def downgrade(): + op.drop_index("ix_agents_token_key", table_name="agents") + op.drop_column("agents", "token_key_suffix") + op.drop_column("agents", "token_key") diff --git a/backend/app/api/agent_api.py b/backend/app/api/agent_api.py new file mode 100644 index 000000000..babc91cea --- /dev/null +++ b/backend/app/api/agent_api.py @@ -0,0 +1,280 @@ +"""Unified Agent API — synchronous HTTP endpoint for calling agents. + +External callers or agents themselves can invoke any agent via this endpoint. +Authentication is via per-agent Token Key (Authorization: Bearer ). +Token consumption is charged to the *caller* (owner of the Token Key). +The target agent must have a relationship with the calling agent. +""" + +import secrets +import uuid +from datetime import datetime, timezone + +from fastapi import APIRouter, Header, HTTPException, Depends +from loguru import logger +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db, async_session +from app.models.agent import Agent +from app.models.org import AgentAgentRelationship +from app.schemas.schemas import AgentApiChatRequest, AgentApiChatResponse +from app.services.token_tracker import TokenUsage, record_token_usage + +router = APIRouter(prefix="/v1/agent", tags=["agent-api"]) + + +# ─── Helpers ──────────────────────────────────────────── + +def generate_token_key() -> tuple[str, str]: + """Generate a new token key. Returns (full_key, suffix).""" + key = "clw_" + secrets.token_hex(16) + return key, key[-4:] + + +async def _get_caller_agent(token_key: str, db: AsyncSession) -> Agent: + """Authenticate the calling agent by its token_key.""" + result = await db.execute( + select(Agent).where(Agent.token_key == token_key) + ) + agent = result.scalar_one_or_none() + if not agent: + raise HTTPException(status_code=401, detail="Invalid token key") + return agent + + +async def _check_relationship(db: AsyncSession, caller_id: uuid.UUID, target_id: uuid.UUID) -> bool: + """Check if the caller agent has a relationship with the target agent.""" + result = await db.execute( + select(AgentAgentRelationship).where( + AgentAgentRelationship.agent_id == caller_id, + AgentAgentRelationship.target_agent_id == target_id, + ) + ) + return result.scalar_one_or_none() is not None + + +# ─── Chat endpoint ────────────────────────────────────── + +@router.post("/chat", response_model=AgentApiChatResponse) +async def agent_api_chat( + body: AgentApiChatRequest, + authorization: str = Header(..., alias="Authorization"), + db: AsyncSession = Depends(get_db), +): + """Synchronous agent invocation. + + Calls the target agent's LLM with full tool-calling loop and returns + the final reply. Token consumption is charged to the calling agent + (identified by the token key). + + Timeout: 1 hour (set at the reverse proxy / uvicorn level). + """ + # Parse Bearer token + if not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Authorization header must be 'Bearer '") + token_key = authorization[7:].strip() + if not token_key: + raise HTTPException(status_code=401, detail="Token key is required") + + # Authenticate caller + caller = await _get_caller_agent(token_key, db) + logger.info(f"[AgentAPI] Caller: {caller.name} ({caller.id})") + + # Load target agent + target_result = await db.execute(select(Agent).where(Agent.id == body.agent_id)) + target = target_result.scalar_one_or_none() + if not target: + raise HTTPException(status_code=404, detail="Target agent not found") + + # Check relationship (caller must have relationship with target) + if caller.id != target.id: # Calling self is always allowed + has_rel = await _check_relationship(db, caller.id, target.id) + if not has_rel: + raise HTTPException( + status_code=403, + detail=f"Agent '{caller.name}' has no relationship with target agent '{target.name}'. " + f"Add a relationship first.", + ) + + # Check target agent has a model configured + from app.models.llm import LLMModel + from app.core.permissions import is_agent_expired + + if is_agent_expired(target): + raise HTTPException(status_code=403, detail="Target agent has expired") + + primary_model = None + fallback_model = None + if target.primary_model_id: + mr = await db.execute(select(LLMModel).where(LLMModel.id == target.primary_model_id)) + primary_model = mr.scalar_one_or_none() + if primary_model and not primary_model.enabled: + primary_model = None + if target.fallback_model_id: + fr = await db.execute(select(LLMModel).where(LLMModel.id == target.fallback_model_id)) + fallback_model = fr.scalar_one_or_none() + if fallback_model and not fallback_model.enabled: + fallback_model = None + + # Config-level fallback + if not primary_model and fallback_model: + primary_model = fallback_model + fallback_model = None + + if not primary_model: + raise HTTPException( + status_code=400, + detail=f"Target agent '{target.name}' has no LLM model configured", + ) + + logger.info( + f"[AgentAPI] {caller.name} -> {target.name}, " + f"model={primary_model.model}, prompt={body.prompt[:80]}" + ) + + # Build messages + messages = [{"role": "user", "content": body.prompt}] + + # Call LLM (synchronous — full tool-calling loop, no streaming) + from app.services.llm import call_llm + + accumulated_usage = TokenUsage() + + # Capture usage from within call_llm by monkeypatching record_token_usage + # Instead, we call call_llm directly and track usage via the caller agent. + # call_llm records usage against the target agent internally; we need to + # additionally record against the caller. For now, call_llm handles target + # agent token tracking. We'll record the usage to the caller separately. + try: + reply = await call_llm( + model=primary_model, + messages=messages, + agent_name=target.name, + role_description=target.role_description or "", + agent_id=target.id, + user_id=caller.creator_id, + session_id=f"api_{caller.id}_{target.id}", + on_chunk=None, + on_tool_call=None, + on_thinking=None, + ) + except Exception as e: + logger.error(f"[AgentAPI] LLM call failed: {e}") + raise HTTPException(status_code=502, detail=f"LLM call failed: {str(e)[:200]}") + + # Log activity + from app.services.activity_logger import log_activity + await log_activity( + target.id, + "api_call", + f"API call from {caller.name}: {body.prompt[:80]}", + detail={ + "caller_agent_id": str(caller.id), + "caller_agent_name": caller.name, + "prompt": body.prompt[:500], + "reply": reply[:500] if reply else "", + }, + ) + await log_activity( + caller.id, + "api_call_out", + f"Called {target.name} via API: {body.prompt[:80]}", + detail={ + "target_agent_id": str(target.id), + "target_agent_name": target.name, + "prompt": body.prompt[:500], + "reply": reply[:500] if reply else "", + }, + ) + + logger.info(f"[AgentAPI] Reply from {target.name}: {(reply or '')[:80]}") + + return AgentApiChatResponse( + reply=reply or "", + usage={}, + ) + + +# ─── Token Key Management ────────────────────────────── + +@router.get("/token-key/{agent_id}") +async def get_token_key( + agent_id: uuid.UUID, + authorization: str = Header(..., alias="Authorization"), + db: AsyncSession = Depends(get_db), +): + """Get the full token key for an agent. Requires JWT auth with manage access.""" + from app.core.security import decode_access_token, get_current_user + from app.core.permissions import check_agent_access + from app.models.user import User + + # Accept both Bearer JWT and Bearer token_key + if not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Missing Authorization header") + token = authorization[7:].strip() + + # Try JWT auth first + try: + payload = decode_access_token(token) + user_id = uuid.UUID(payload["sub"]) + result = await db.execute(select(User).where(User.id == user_id)) + user = result.scalar_one_or_none() + if not user: + raise HTTPException(status_code=401, detail="User not found") + agent, access_level = await check_agent_access(db, user, agent_id) + if access_level != "manage": + raise HTTPException(status_code=403, detail="Manage access required") + except HTTPException: + raise + except Exception: + raise HTTPException(status_code=401, detail="Invalid authorization") + + if not agent.token_key: + # Generate one on-demand if missing + key, suffix = generate_token_key() + agent.token_key = key + agent.token_key_suffix = suffix + await db.commit() + + return {"token_key": agent.token_key, "token_key_suffix": agent.token_key_suffix} + + +@router.post("/regenerate-token-key/{agent_id}") +async def regenerate_token_key( + agent_id: uuid.UUID, + authorization: str = Header(..., alias="Authorization"), + db: AsyncSession = Depends(get_db), +): + """Regenerate the token key for an agent. Returns the new full key.""" + from app.core.security import decode_access_token + from app.core.permissions import check_agent_access + from app.models.user import User + + if not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="Missing Authorization header") + token = authorization[7:].strip() + + try: + payload = decode_access_token(token) + user_id = uuid.UUID(payload["sub"]) + result = await db.execute(select(User).where(User.id == user_id)) + user = result.scalar_one_or_none() + if not user: + raise HTTPException(status_code=401, detail="User not found") + agent, access_level = await check_agent_access(db, user, agent_id) + if access_level != "manage": + raise HTTPException(status_code=403, detail="Manage access required") + except HTTPException: + raise + except Exception: + raise HTTPException(status_code=401, detail="Invalid authorization") + + key, suffix = generate_token_key() + agent.token_key = key + agent.token_key_suffix = suffix + await db.commit() + + logger.info(f"[AgentAPI] Token key regenerated for agent {agent.name} ({agent_id})") + + return {"token_key_suffix": suffix} diff --git a/backend/app/api/agents.py b/backend/app/api/agents.py index 66a05b36c..a843ce10e 100644 --- a/backend/app/api/agents.py +++ b/backend/app/api/agents.py @@ -335,6 +335,12 @@ async def create_agent( if data.autonomy_policy: agent.autonomy_policy = data.autonomy_policy + # Generate per-agent API token key for unified Agent API calling + from app.api.agent_api import generate_token_key + _token_key, _token_key_suffix = generate_token_key() + agent.token_key = _token_key + agent.token_key_suffix = _token_key_suffix + db.add(agent) await db.flush() diff --git a/backend/app/api/relationships.py b/backend/app/api/relationships.py index 2b45b10d6..dc266dcb9 100644 --- a/backend/app/api/relationships.py +++ b/backend/app/api/relationships.py @@ -637,6 +637,7 @@ async def _regenerate_relationships_file(db: AsyncSession, agent_id: uuid.UUID): continue label = AGENT_RELATION_LABELS.get(r.relation, r.relation) lines.append(f"### {a.name} — {a.role_description or '数字员工'}") + lines.append(f"- Agent ID:`{a.id}`") lines.append(f"- 关系:{label}") lines.append(f"- 可以用 send_message_to_agent 工具给 {a.name} 发消息协作") if r.description: diff --git a/backend/app/api/tools.py b/backend/app/api/tools.py index bb1cd545d..49333ee2b 100644 --- a/backend/app/api/tools.py +++ b/backend/app/api/tools.py @@ -1,6 +1,7 @@ """Tool management API — CRUD for tools and per-agent assignments.""" import uuid +from loguru import logger from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel @@ -338,6 +339,35 @@ async def get_agent_tools( ) all_tools = all_tools_r.scalars().all() + # ── Backfill: create missing AgentTool records ────────────────────── + # For agents that already have at least one AgentTool assignment (i.e. + # the tool panel has been configured), create AgentTool records for any + # visible tool that doesn't have one yet. The initial `enabled` value + # is taken from `is_default`. + # + # This keeps the UI state and `get_agent_tools_for_llm` in sync: both + # now rely on explicit AgentTool records instead of the implicit + # `is_default` fallback. + if assignments: + backfilled = 0 + for t in all_tools: + tid = str(t.id) + if tid not in assignments: + new_at = AgentTool( + agent_id=agent_id, + tool_id=t.id, + enabled=t.is_default, + ) + db.add(new_at) + assignments[tid] = new_at + backfilled += 1 + if backfilled: + await db.commit() + logger.info( + f"[Tools] Backfilled {backfilled} AgentTool records for " + f"agent={agent_id}" + ) + result = [] for t in all_tools: # Hide feishu tools for agents without Feishu channel diff --git a/backend/app/api/websocket.py b/backend/app/api/websocket.py index 0b9e37ff9..78d46930b 100644 --- a/backend/app/api/websocket.py +++ b/backend/app/api/websocket.py @@ -808,6 +808,18 @@ async def _on_failover(reason: str): except Exception as _onb_err: logger.warning(f"[WS] Onboarding prompt resolve failed (non-fatal): {_onb_err}") + async def code_output_to_ws(text: str, label: str = "stdout"): + """Stream execute_code output chunks to the frontend live panel in real-time.""" + try: + await websocket.send_json({ + "type": "agentbay_live", + "env": "code", + "output": text, + "stream": label, + }) + except Exception: + pass + return await call_llm_with_failover( primary_model=effective_llm_model, fallback_model=fallback_llm_model, @@ -824,6 +836,7 @@ async def _on_failover(reason: str): supports_vision=getattr(effective_llm_model, 'supports_vision', False), on_failover=_on_failover, skip_tools=skip_tools_for_greeting, + on_code_output=code_output_to_ws, ) llm_task = _aio.create_task(_call_with_failover()) @@ -866,6 +879,12 @@ async def _on_failover(reason: str): assistant_response = await llm_task logger.info(f"[WS] LLM response: {assistant_response[:80]}") + # Ensure onboarding phase is advanced even when streaming + # callbacks weren't triggered (e.g. the greeting turn where + # the model only calls `finish` and call_llm returns the + # content directly without invoking on_chunk/on_tool_call). + await maybe_mark_onboarding_progress() + # call_llm returns error strings instead of raising — detect and # re-raise so the fallback model logic below can trigger correctly. _LLM_ERROR_PREFIXES = ("[LLM Error]", "[LLM call error]", "[Error]") diff --git a/backend/app/main.py b/backend/app/main.py index b9a30b089..649863f26 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -351,6 +351,7 @@ def _bg_task_error(t): from app.api.webhooks import router as webhooks_router from app.api.notification import router as notification_router from app.api.gateway import router as gateway_router +from app.api.agent_api import router as agent_api_router from app.api.admin import router as admin_router from app.api.pages import router as pages_router, public_router as pages_public_router from app.api.agent_credentials import router as credentials_router @@ -396,6 +397,7 @@ def _bg_task_error(t): app.include_router(webhooks_router) # Public endpoint, no API prefix app.include_router(ws_router) app.include_router(gateway_router, prefix=settings.API_PREFIX) +app.include_router(agent_api_router, prefix=settings.API_PREFIX) app.include_router(admin_router, prefix=settings.API_PREFIX) app.include_router(pages_router, prefix=settings.API_PREFIX) app.include_router(pages_public_router) # Public endpoint for /p/{short_id}, no API prefix diff --git a/backend/app/models/agent.py b/backend/app/models/agent.py index 08b0f55aa..55fc22eaa 100644 --- a/backend/app/models/agent.py +++ b/backend/app/models/agent.py @@ -42,6 +42,10 @@ class Agent(Base): # Last time OpenClaw polled the gateway (online status indicator) openclaw_last_seen: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + # Per-agent API token key for unified Agent API calling + token_key: Mapped[str | None] = mapped_column(String(128), index=True) + token_key_suffix: Mapped[str | None] = mapped_column(String(4)) + # Runtime status: Mapped[str] = mapped_column( Enum("creating", "running", "idle", "stopped", "error", name="agent_status_enum", create_constraint=False), diff --git a/backend/app/schemas/schemas.py b/backend/app/schemas/schemas.py index 46e0b3023..00cb46741 100644 --- a/backend/app/schemas/schemas.py +++ b/backend/app/schemas/schemas.py @@ -281,6 +281,7 @@ class AgentOut(BaseModel): unread_count: int = 0 has_api_key: bool = False api_key_hash: str | None = None + token_key_suffix: str | None = None # True when the current viewer already has an onboarding row for this # agent. Computed per-request by the API layer from the junction table; # not an ORM attribute, so callers must set it explicitly. Defaults to @@ -591,3 +592,16 @@ class GatewaySendMessageRequest(BaseModel): target: str # Name of target person or agent content: str = Field(min_length=1) channel: str | None = None # Optional: "feishu", "agent", etc. Auto-detected if omitted. + + +# ─── Agent API (Unified Agent Calling) ────────────────── + +class AgentApiChatRequest(BaseModel): + agent_id: uuid.UUID + prompt: str = Field(min_length=1, max_length=64000) + + +class AgentApiChatResponse(BaseModel): + reply: str + usage: dict = {} + diff --git a/backend/app/services/agent_context.py b/backend/app/services/agent_context.py index 00586ee77..447490d57 100644 --- a/backend/app/services/agent_context.py +++ b/backend/app/services/agent_context.py @@ -478,7 +478,11 @@ async def build_agent_context(agent_id: uuid.UUID, agent_name: str, role_descrip - If the current structure does not fit, create a new clearly named subfolder and place the file there. - Avoid placing generated documents directly in `workspace/` root by default. -7. **Use trigger tools to manage your own wake-up conditions:** +7. **When debugging or modifying existing code/scripts:** + - You MUST edit the actual file directly (e.g. using `write_file` or edit tools). + - DO NOT copy the script contents into a bash heredoc (`python - < Remind Qinrui -7. **Focus-Trigger Binding (MANDATORY):** +9. **Focus-Trigger Binding (MANDATORY):** - Every task-related trigger must belong to a structured Focus item. - Prefer setting `focus_ref` to an existing Focus item's identifier. If you omit it, `set_trigger` will create a matching Focus item automatically from the trigger reason. - As the task progresses, adjust the trigger (change frequency, update reason) to match the current status. - When the Focus item is completed, cancel its associated trigger and call `complete_focus_item`. - **Exception:** System-level triggers (e.g. heartbeat) may be grouped under system focus items. -8. **Focus is your working memory — use it wisely:** +10. **Focus is your working memory — use it wisely:** - When waking up, ALWAYS check your Focus items first with `list_focus_items` - Focus items are REFERENCE, not commands - Decide whether to mention pending tasks based on timing, context, and urgency - DON'T mechanically remind people of every pending item -9. **Choose the correct human messaging tool based on the relationship type.** +11. **Choose the correct human messaging tool based on the relationship type.** - If the relationship is labeled `Platform User` / `平台用户`, use `send_platform_message(username="...", message="...")`. - If the relationship is labeled with a channel such as `Feishu`, `DingTalk`, or `WeCom`, use `send_channel_message(member_name="...", message="...")`. - `send_channel_message` is for external channels only. Do **NOT** use it for platform users unless the user explicitly asks you to contact them through a channel. @@ -543,14 +547,14 @@ async def build_agent_context(agent_id: uuid.UUID, agent_name: str, role_descrip - **Do NOT use `send_channel_message` to notify someone about a file — use `send_channel_file` which sends the actual file attachment.** - Just send it directly — don't ask the recipient how they want to receive it. -10. **Reply in the same language the user uses.** +12. **Reply in the same language the user uses.** -11. **Keep user-facing replies clean and restrained.** +13. **Keep user-facing replies clean and restrained.** - Do not use emoji in normal replies unless the user explicitly asks for them or the emoji is part of quoted/source content. - Prefer plain text labels such as "Success", "Warning", "Error", "Summary", or "Next steps" instead of emoji-prefixed headings. - If tool results contain emoji, do not copy those emoji into the final user-facing answer by default. -12. **Never assume a file exists — always verify with `list_files` first.** +14. **Never assume a file exists — always verify with `list_files` first.** ## Web Search & Reading @@ -572,18 +576,57 @@ async def build_agent_context(agent_id: uuid.UUID, agent_name: str, role_descrip if relationships and "暂无" not in relationships and "None yet" not in relationships: static_parts.append(f"\n## Relationships\n{relationships}") - if memory and memory not in ("_这里记录重要的信息和学到的知识。_", "_Record important information and knowledge here._"): - dynamic_parts.append(f"\n## Memory\n{memory}") - - # --- Focus (working memory) --- + # --- Agent API Token Key --- + # Inject per-agent Token Key so the agent can call other agents via the platform API try: - from app.services.focus_service import render_focus_context - focus = await render_focus_context(agent_id) - if focus.strip(): - dynamic_parts.append(f"\n## Focus\n{focus}") + from app.database import async_session + from app.models.agent import Agent as _AgentModelForKey + from sqlalchemy import select as sa_select + async with async_session() as _key_db: + _key_r = await _key_db.execute( + sa_select(_AgentModelForKey.token_key).where(_AgentModelForKey.id == agent_id) + ) + _agent_token_key = _key_r.scalar_one_or_none() + if _agent_token_key: + _public_base = settings.PUBLIC_BASE_URL or "https://your-clawith-domain.com" + _api_base = f"{_public_base}/api/v1/agent" + static_parts.append(f""" +## Platform Agent API + +You have a platform API token that allows you to call other agents programmatically via the `execute_code` tool or any HTTP client. + +**Your Token Key:** `{_agent_token_key}` + +To call another agent, make an HTTP POST request: + +``` +POST {_api_base}/chat +Authorization: Bearer {_agent_token_key} +Content-Type: application/json + +{{"agent_id": "", "prompt": ""}} +``` + +The response JSON contains a `reply` field with the agent's answer. +Use this when you need to delegate subtasks to specialized agents listed in your Relationships. +The target agent must be in your relationship list for the call to succeed.""") except Exception: pass + if memory and memory not in ("_这里记录重要的信息和学到的知识。_", "_Record important information and knowledge here._"): + dynamic_parts.append(f"\n## Memory\n{memory}") + + # --- Focus (working memory) --- DISABLED: injecting completed focus items + # into the system prompt was reinforcing stale workflow patterns over updated + # soul.md instructions. Agents can still query focus via list_focus_items. + # try: + # from app.services.focus_service import render_focus_context + # focus = await render_focus_context(agent_id) + # if focus.strip(): + # dynamic_parts.append(f"\n## Focus\n{focus}") + # except Exception: + # pass + # --- Active Triggers --- try: from app.database import async_session diff --git a/backend/app/services/agent_tools.py b/backend/app/services/agent_tools.py index f17d90869..5b7ecd615 100644 --- a/backend/app/services/agent_tools.py +++ b/backend/app/services/agent_tools.py @@ -205,7 +205,7 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt _set_cached_tool_config(agent_id, tool_name, decrypted) return decrypted - logger.error(f"[ToolConfig] No DB config found for {tool_name}, agent_id={agent_id}") + logger.debug(f"[ToolConfig] No DB config found for {tool_name}, agent_id={agent_id}") return None # ContextVar set by each channel handler so send_channel_file knows where to send @@ -241,7 +241,7 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt "type": "function", "function": { "name": "read_file", - "description": "Read file contents from the workspace. Can read soul.md for personality, memory/memory.md for memory, skills/ for skill files, and enterprise_info/ for shared company info. Focus is not stored in files; use list_focus_items and upsert_focus_item for Focus. Use offset and limit for reading large files in chunks.", + "description": "Read file contents from the workspace. Can read soul.md for personality, memory/memory.md for memory, skills/ for skill files, and enterprise_info/ for shared company info. Focus is not stored in files; use list_focus_items and upsert_focus_item for Focus. Use offset and limit for reading large files in chunks.\n\nUSAGE TIPS:\n1. Always read a file BEFORE editing it with edit_file, to ensure you have the exact content for old_string matching.", "parameters": { "type": "object", "properties": { @@ -329,7 +329,7 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt "type": "function", "function": { "name": "write_file", - "description": "Create or fully overwrite a file in the workspace. Use this when writing a new file or replacing the entire content. For targeted edits to an existing file (change one section without rewriting everything), prefer edit_file instead. Before creating a new document under workspace/, first inspect the relevant directories with list_files, prefer an existing topical subfolder (for example workspace/reports/, workspace/knowledge_base/, workspace/research/) over the workspace root, and create a new subfolder when the content belongs to a new category. Avoid placing standalone document files directly in workspace/ root unless the user explicitly wants that. Can update memory/memory.md, task_history.md, create documents in workspace/, create skills in skills/. Focus is managed with Focus tools, not files. enterprise_info/ is shared company context and is read-only for agents.", + "description": "Create or fully overwrite a file in the workspace. Use this when writing a new file or replacing the entire content.\n\nIMPORTANT RULES:\n1. If you only need to modify part of an existing file (e.g. fixing a bug), use `edit_file` instead — do NOT rewrite the entire file.\n2. Before creating a new document under workspace/, first inspect the relevant directories with list_files. Prefer an existing topical subfolder (for example workspace/reports/, workspace/knowledge_base/, workspace/research/) over the workspace root.\n3. Avoid placing standalone document files directly in workspace/ root unless the user explicitly wants that.\n4. Can update memory/memory.md, task_history.md, create documents in workspace/, create skills in skills/.\n5. Focus is managed with Focus tools, not files. enterprise_info/ is shared company context and is read-only for agents.", "parameters": { "type": "object", "properties": { @@ -393,7 +393,7 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt "type": "function", "function": { "name": "edit_file", - "description": "Surgically replace a specific string inside an existing file without rewriting the whole content. Prefer this over write_file when you only need to change one or more sections — it avoids accidentally overwriting content outside the edit target and is safer in multi-agent scenarios. enterprise_info/ is shared company context and is read-only for agents. The old_string must match exactly (including all whitespace and newlines).", + "description": "Surgically replace a specific string inside an existing file without rewriting the whole content.\n\nCRITICAL RULES:\n1. The old_string MUST EXACTLY MATCH the text in the file, including all leading whitespace, newlines, and indentation. Even one space difference will cause failure.\n2. Always use `read_file` first to see the exact current content before editing.\n3. Prefer this over write_file when you only need to change one or more sections — it avoids accidentally overwriting content outside the edit target and is safer in multi-agent scenarios.\n4. enterprise_info/ is shared company context and is read-only for agents.", "parameters": { "type": "object", "properties": { @@ -802,7 +802,7 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt "type": "function", "function": { "name": "execute_code", - "description": "Execute code (Python, Bash, or Node.js) in a local sandboxed subprocess within the agent's root directory. Useful for data processing, calculations, file transformations, and automation scripts. Code runs with the agent root as the working directory, so you can access skills/, workspace/, memory/ etc. directly. Security restrictions apply: no network access commands, no system-level operations, 30-second timeout.", + "description": "Execute code (Python, Bash, or Node.js) in a local sandboxed subprocess within the agent's root directory. Useful for data processing, calculations, file transformations, and automation scripts. Code runs with the agent root as the working directory, so you can access skills/, workspace/, memory/ etc. directly. Security restrictions apply: no system-level operations, 30-second default timeout.", "parameters": { "type": "object", "properties": { @@ -813,11 +813,11 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt }, "code": { "type": "string", - "description": "Code to execute. For Python, you can import standard libraries (json, csv, math, re, collections, etc.). Working directory is the agent root (skills/, workspace/, memory/ are accessible).", + "description": "Code to execute. If a Python import fails due to a missing package, install it first via execute_code with language='bash' and code='pip install '. Working directory is the agent root (skills/, workspace/, memory/ are accessible).", }, "timeout": { "type": "integer", - "description": "Max execution time in seconds (default 30, max 60)", + "description": "Max execution time in seconds (default 60, max 3600)", }, }, "required": ["language", "code"], @@ -2127,7 +2127,8 @@ async def get_agent_tools_for_llm(agent_id: uuid.UUID) -> list[dict]: """Load enabled tools for an agent from DB (OpenAI function-calling format). Falls back to hardcoded AGENT_TOOLS if DB not ready. - Always includes core system tools (send_channel_file, write_file). + Includes core system tools (send_channel_file, write_file) unless the user + has explicitly disabled them via the Agent tool panel. Feishu tools are only included when the agent has a configured Feishu channel. send_channel_message is included when any channel (Feishu/DingTalk/WeCom) is configured. @@ -2189,11 +2190,41 @@ async def get_agent_tools_for_llm(agent_id: uuid.UUID) -> list[dict]: result = [] db_tool_names = set() + # Track tool names that were explicitly disabled by the user + # (have an AgentTool record with enabled=False). These must NOT + # be re-added by the _always_tools fallback below. + explicitly_disabled_names = set() + # Track tools included via is_default fallback (no AgentTool record) + default_included_names = [] + + # Key insight: if the agent already has ANY AgentTool assignments, + # its tool panel has been configured by the user. In that case, + # only include tools with an explicit AgentTool(enabled=True) + # record. Tools without any AgentTool record are NOT included + # (they will be provided by _always_tools if they are core tools). + # + # For agents with ZERO assignments (brand-new, never configured), + # fall back to is_default so they get a reasonable starting set. + agent_is_configured = len(assignments) > 0 + for t in all_tools: tid = str(t.id) at = assignments.get(tid) - enabled = at.enabled if at else t.is_default + + if agent_is_configured: + # Configured agent: require explicit AgentTool record + if at is None: + # No assignment → not included (unless _always_tools adds it) + default_included_names.append(t.name) + continue + enabled = at.enabled + else: + # Unconfigured agent: use is_default as fallback + enabled = at.enabled if at else t.is_default + if not enabled: + if at and not at.enabled: + explicitly_disabled_names.add(t.name) continue # Skip feishu tools if the agent has no Feishu channel configured @@ -2230,17 +2261,45 @@ async def get_agent_tools_for_llm(agent_id: uuid.UUID) -> list[dict]: result.append(tool_def) db_tool_names.add(t.name) + if explicitly_disabled_names: + logger.info( + f"[Tools] agent={agent_id} explicitly disabled: " + f"{sorted(explicitly_disabled_names)}" + ) + if default_included_names: + logger.info( + f"[Tools] agent={agent_id} skipped (no AgentTool record, " + f"agent_configured={agent_is_configured}): " + f"{sorted(default_included_names)}" + ) if result: - # Append always-available system tools that aren't already in the DB list + # Append always-available system tools that aren't already in + # the DB list — but respect explicit user disabling. + always_added = [] for t in _always_tools: - if t["function"]["name"] not in db_tool_names: + fn_name = t["function"]["name"] + if fn_name not in db_tool_names and fn_name not in explicitly_disabled_names: result.append(t) + always_added.append(fn_name) + if always_added: + logger.debug( + f"[Tools] agent={agent_id} added from _always_tools: {always_added}" + ) # Inject OS-aware paths into computer-related tool descriptions result = _patch_computer_tool_descriptions(result, computer_os_type) # Strip msg_type from send_message_to_agent when async A2A is disabled if not _a2a_async: result = _strip_a2a_msg_type(result) + # Final diagnostic: log the complete tool list and assignment stats + final_names = sorted(t["function"]["name"] for t in result) + logger.info( + f"[Tools] agent={agent_id} FINAL {len(result)} tools " + f"(assignments={len(assignments)}, " + f"disabled={len(explicitly_disabled_names)}, " + f"default_fallback={len(default_included_names)}): " + f"{final_names}" + ) return result except Exception as e: logger.error(f"[Tools] DB load failed, using fallback: {e}") @@ -2471,6 +2530,7 @@ async def execute_tool( agent_id: uuid.UUID, user_id: uuid.UUID, session_id: str = "", + on_output=None, ) -> str: """Execute a tool call and return the result as a string. @@ -2807,7 +2867,7 @@ async def execute_tool( result = await _plaza_add_comment(agent_id, arguments) elif tool_name in ("execute_code", "execute_code_e2b"): logger.info(f"[DirectTool] Executing code ({tool_name}) with arguments: {arguments}") - result = await _execute_code(agent_id, ws, arguments, tool_name=tool_name) + result = await _execute_code(agent_id, ws, arguments, tool_name=tool_name, on_output=on_output) elif tool_name == "upload_image": result = await _upload_image(agent_id, ws, arguments) elif tool_name == "generate_image_siliconflow": @@ -6094,6 +6154,7 @@ async def _send_file_to_agent(from_agent_id: uuid.UUID, ws: Path, args: dict) -> source_agent = src_result.scalar_one_or_none() source_name = source_agent.name if source_agent else "Unknown agent" source_tenant_id = source_agent.tenant_id if source_agent else None + source_creator_id = source_agent.creator_id if source_agent else from_agent_id # Build base filter: same tenant + not self base_filter = [AgentModel.id != from_agent_id] @@ -6225,6 +6286,74 @@ async def _send_file_to_agent(from_agent_id: uuid.UUID, ws: Path, args: dict) -> detail={"source_agent": source_name, "source_file": rel_path, "delivered_file": target_rel_path}, ) + # ── Inject file-delivery message into A2A chat session ── + # This ensures the target agent sees the file delivery in its + # conversation context when send_message_to_agent is called next. + print(f"[A2A-File-TRACE] About to inject file delivery msg: from={source_name} to={target_name} file={delivered_name}", flush=True) + logger.warning(f"[A2A-File-TRACE] About to inject file delivery msg: from={source_name} to={target_name} file={delivered_name}") + try: + from app.models.audit import ChatMessage + from app.models.chat_session import ChatSession + from app.models.participant import Participant + async with async_session() as db2: + # Find or create A2A session (same ordering as send_message_to_agent) + session_agent_id = min(from_agent_id, target_id, key=str) + session_peer_id = max(from_agent_id, target_id, key=str) + sess_r = await db2.execute( + select(ChatSession).where( + ChatSession.agent_id == session_agent_id, + ChatSession.peer_agent_id == session_peer_id, + ChatSession.source_channel == "agent", + ) + ) + chat_session = sess_r.scalar_one_or_none() + if not chat_session: + src_part_r = await db2.execute( + select(Participant).where(Participant.type == "agent", Participant.ref_id == from_agent_id) + ) + src_participant = src_part_r.scalar_one_or_none() + chat_session = ChatSession( + agent_id=session_agent_id, + user_id=source_creator_id, + title=f"{source_name} ↔ {target_name}", + source_channel="agent", + participant_id=src_participant.id if src_participant else None, + peer_agent_id=session_peer_id, + ) + db2.add(chat_session) + await db2.flush() + + file_msg_content = ( + f"[文件投递通知 from {source_name}]\n" + f"已向你发送文件:{delivered_name}\n" + f"文件路径:{target_rel_path}\n" + f"请使用 read_file(path=\"{target_rel_path}\") 阅读此文件。" + ) + if delivery_note: + file_msg_content += f"\n附言:{delivery_note}" + + # Resolve sender participant for proper attribution + src_part_r2 = await db2.execute( + select(Participant).where(Participant.type == "agent", Participant.ref_id == from_agent_id) + ) + src_part2 = src_part_r2.scalar_one_or_none() + + db2.add(ChatMessage( + agent_id=session_agent_id, + user_id=source_creator_id, + role="user", + content=file_msg_content, + conversation_id=str(chat_session.id), + participant_id=src_part2.id if src_part2 else None, + )) + chat_session.last_message_at = ts + await db2.commit() + print(f"[A2A-File] Injected file delivery message into session {chat_session.id} for {target_name}", flush=True) + logger.warning(f"[A2A-File] OK: Injected file delivery message into session {chat_session.id} for {target_name}") + except Exception as e: + print(f"[A2A-File] FAILED to inject file delivery message: {e}", flush=True) + logger.error(f"[A2A-File] FAILED to inject file delivery message: {e}") + return ( f"✅ File sent to {target_name}.\n" f"- Delivered to: {target_rel_path}\n" @@ -6698,6 +6827,10 @@ async def _send_message_to_agent(from_agent_id: uuid.UUID, args: dict) -> str: "\n\n--- Agent-to-Agent Message ---\n" "You are receiving a message from another digital employee. " "Reply concisely and helpfully. Focus on the request and provide a clear answer.\n" + "\n🔴 **RESPONSE PROTOCOL — MANDATORY:**\n" + "You MUST call `finish(content=\"...\")` with your complete answer. " + "Do NOT output plain text without calling `finish`. " + "Plain text responses will be REJECTED and you will be asked to redo.\n" "\n** CRITICAL FILE DELIVERY RULE **\n" "After you write any file (report, document, analysis, etc.) that the requesting agent needs, " "you MUST call `send_file_to_agent(agent_name=\"\", file_path=\"\")` " @@ -6817,6 +6950,15 @@ def _is_retryable_llm_error(exc: Exception) -> bool: round_chars = sum(len(m.content or '') for m in full_msgs if isinstance(m.content, str)) _a2a_accumulated_usage.add(estimate_token_usage_from_chars(round_chars)) + # Debug: log A2A LLM response summary + _tc_names = [tc.get("function", {}).get("name", "?") for tc in (response.tool_calls or [])] + logger.debug( + f"[A2A] Round {_round+1} response for {target.name}: " + f"content={repr((response.content or '')[:500])}, " + f"tool_calls={_tc_names}, " + f"reasoning={repr((response.reasoning_content or '')[:300])}" + ) + # Check for tool calls if response.tool_calls: # Add assistant message with tool calls to conversation @@ -7306,6 +7448,7 @@ async def _execute_code( arguments: dict, *, tool_name: str = "execute_code", + on_output=None, ) -> str: """Execute code using the configured sandbox backend. @@ -7319,7 +7462,7 @@ async def _execute_code( """ language = arguments.get("language", "python") code = arguments.get("code", "") - timeout = min(arguments.get("timeout", 30), 60) # Max 60 seconds + requested_timeout = arguments.get("timeout", 30) if not code.strip(): return "❌ No code provided" @@ -7353,13 +7496,17 @@ async def _execute_code( sandbox_config = fallback_config logger.info(f"[Sandbox] No per-agent config found for '{tool_name}', using fallback") + # Clamp timeout by configured max_timeout (default 60s, up to 3600s) + timeout = min(requested_timeout, sandbox_config.max_timeout) + backend = get_sandbox_backend(sandbox_config) - logger.info(f"[Sandbox] Executing code with backend: {backend.__class__.__name__} (tool={tool_name})") + logger.info(f"[Sandbox] Executing code with backend: {backend.__class__.__name__} (tool={tool_name}, timeout={timeout}s)") result = await backend.execute( code=code, language=language, timeout=timeout, work_dir=str(work_dir), + on_output=on_output, ) # Format result for user display @@ -7371,7 +7518,7 @@ async def _execute_code( # Do not silently fall back — surface the config error to the user return f"❌ E2B sandbox configuration error: {str(e)[:300]}\nPlease check the API key in the tool settings." logger.warning(f"[Sandbox] Config issue, falling back to legacy subprocess: {e}") - return await _execute_code_legacy(ws, arguments, allow_network=fallback_config.allow_network) + return await _execute_code_legacy(ws, arguments, allow_network=fallback_config.allow_network, max_timeout=fallback_config.max_timeout, on_output=on_output) except Exception as e: logger.exception(f"[Sandbox] Execution failed for agent {agent_id} (tool={tool_name})") @@ -7380,19 +7527,19 @@ async def _execute_code( return f"❌ E2B execution error: {str(e)[:200]}" # For local tool: try legacy subprocess as last resort try: - return await _execute_code_legacy(ws, arguments, allow_network=sandbox_config.allow_network) + return await _execute_code_legacy(ws, arguments, allow_network=sandbox_config.allow_network, max_timeout=sandbox_config.max_timeout, on_output=on_output) except Exception: logger.exception(f"[Sandbox] Fallback also failed for agent {agent_id}") return f"❌ Execution error: {str(e)[:200]}" -async def _execute_code_legacy(ws: Path, arguments: dict, allow_network: bool = False) -> str: +async def _execute_code_legacy(ws: Path, arguments: dict, allow_network: bool = False, max_timeout: int = 60, on_output=None) -> str: """Legacy subprocess-based code execution (fallback).""" import asyncio language = arguments.get("language", "python") code = arguments.get("code", "") - timeout = min(arguments.get("timeout", 30), 60) + timeout = min(arguments.get("timeout", 30), max_timeout) if not code.strip(): return "❌ No code provided" @@ -7441,21 +7588,50 @@ async def _execute_code_legacy(ws: Path, arguments: dict, allow_network: bool = env=safe_env, ) + stdout_data = bytearray() + stderr_data = bytearray() + + async def read_stream(stream, out, label="stdout"): + while True: + chunk = await stream.read(4096) + if not chunk: + break + out.extend(chunk) + # Real-time streaming: push each chunk to the WebSocket + if on_output: + try: + text = chunk.decode("utf-8", errors="replace") + await on_output(text, label) + except Exception: + pass + + task1 = asyncio.create_task(read_stream(proc.stdout, stdout_data, "stdout")) + task2 = asyncio.create_task(read_stream(proc.stderr, stderr_data, "stderr")) + + is_timeout = False try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + await asyncio.wait_for(proc.wait(), timeout=timeout) except asyncio.TimeoutError: proc.kill() - await proc.communicate() - return f"❌ Code execution timed out after {timeout}s" + is_timeout = True + + await asyncio.gather(task1, task2) + stdout = bytes(stdout_data) + stderr = bytes(stderr_data) - stdout_str = stdout.decode("utf-8", errors="replace")[:10000] - stderr_str = stderr.decode("utf-8", errors="replace")[:5000] + stdout_str = stdout.decode("utf-8", errors="replace")[:10000] if stdout else "" + stderr_str = stderr.decode("utf-8", errors="replace")[:5000] if stderr else "" result_parts = [] if stdout_str.strip(): result_parts.append(f"📤 Output:\n{stdout_str}") if stderr_str.strip(): result_parts.append(f"⚠️ Stderr:\n{stderr_str}") + + if is_timeout: + result_parts.append(f"❌ Code execution timed out after {timeout}s. If you expect this code to take longer, try calling the tool again with a higher 'timeout' parameter (up to 3600s).") + return "\n\n".join(result_parts) + if proc.returncode != 0: result_parts.append(f"Exit code: {proc.returncode}") diff --git a/backend/app/services/llm/caller.py b/backend/app/services/llm/caller.py index aff3c458e..8b33b8496 100644 --- a/backend/app/services/llm/caller.py +++ b/backend/app/services/llm/caller.py @@ -284,6 +284,7 @@ async def _process_tool_call( on_tool_call, full_reasoning_content: str, allowed_tool_names: set[str], + on_code_output=None, ) -> str: """Process a single tool call and return result.""" fn = tc["function"] @@ -336,12 +337,14 @@ async def _process_tool_call( except Exception: pass - # Execute tool + # Execute tool — pass on_output for execute_code streaming + _on_output = on_code_output if tool_name in ("execute_code", "execute_code_e2b") else None result = await execute_tool( tool_name, args, agent_id=agent_id, user_id=user_id or agent_id, session_id=session_id, + on_output=_on_output, ) logger.debug(f"[LLM] Tool result: {result[:100]}") @@ -402,6 +405,7 @@ async def call_llm( supports_vision=False, max_tool_rounds_override: int | None = None, skip_tools: bool = False, + on_code_output=None, ) -> str: """Call LLM via unified client with function-calling tool loop.""" # Get agent config for tool rounds @@ -424,8 +428,10 @@ async def call_llm( # tool available so every turn still has an explicit stop signal. if skip_tools: tools_for_llm = [FINISH_TOOL_DEFINITION] + logger.info("[LLM] skip_tools=True (onboarding greeting turn): sending only finish tool") else: tools_for_llm = await get_agent_tools_for_llm(agent_id) if agent_id else AGENT_TOOLS + logger.info(f"[LLM] Loaded {len(tools_for_llm)} tools for agent_id={agent_id}") allowed_tool_names = _allowed_tool_names(tools_for_llm) # Convert messages to LLMMessage format @@ -507,6 +513,15 @@ async def _buffer_chunk(_text: str) -> None: # Track tokens for this round _accumulated_usage.add(_usage_from_response_or_estimate(response, api_messages)) + # Debug: log full LLM response summary + _tc_names = [tc.get("function", {}).get("name", "?") for tc in (response.tool_calls or [])] + logger.debug( + f"[LLM] Round {round_i+1} response: " + f"content={repr((response.content or '')[:500])}, " + f"tool_calls={_tc_names}, " + f"reasoning={repr((response.reasoning_content or '')[:300])}" + ) + # Plain assistant text is not a stop condition. The model must finish # explicitly via finish(content=...). if not response.tool_calls: @@ -562,6 +577,7 @@ async def _buffer_chunk(_text: str) -> None: session_id=session_id, supports_vision=supports_vision, on_tool_call=on_tool_call, + on_code_output=on_code_output, full_reasoning_content=full_reasoning_content, allowed_tool_names=allowed_tool_names, ) @@ -595,6 +611,7 @@ async def call_llm_with_failover( supports_vision=False, on_failover=None, skip_tools: bool = False, + on_code_output=None, ) -> str: """Call LLM with automatic failover support.""" guard = FailoverGuard() @@ -635,6 +652,7 @@ async def _wrapped_on_tool_call(data: dict): on_thinking=on_thinking, supports_vision=supports_vision, skip_tools=skip_tools, + on_code_output=on_code_output, ) # Check if we need to failover @@ -697,6 +715,7 @@ async def _fallback_on_tool_call(data: dict): on_thinking=on_thinking, supports_vision=getattr(fallback_model, 'supports_vision', False), skip_tools=skip_tools, + on_code_output=on_code_output, ) # Combine error messages if fallback also failed diff --git a/backend/app/services/llm/client.py b/backend/app/services/llm/client.py index 1f9af784b..404c38c9e 100644 --- a/backend/app/services/llm/client.py +++ b/backend/app/services/llm/client.py @@ -295,6 +295,8 @@ def _build_payload( """Build request payload.""" messages_payload = self._messages_to_openai_payload(messages) logger.debug(f"[LLM-Debug] OpenAICompatibleClient payload messages for model {self.model}: {json.dumps(messages_payload, indent=2, ensure_ascii=False)}") + if tools: + logger.debug(f"[LLM-Debug] Tools payload ({len(tools)} tools): {json.dumps(tools, indent=2, ensure_ascii=False)}") payload: dict[str, Any] = { "model": self.model, "messages": messages_payload, @@ -798,9 +800,11 @@ def _build_payload( **kwargs: Any, ) -> dict[str, Any]: """Build request payload.""" + input_payload = self._messages_to_input(messages) + logger.debug(f"[LLM-Debug] OpenAIResponsesClient payload input for model {self.model}: {json.dumps(input_payload, indent=2, ensure_ascii=False)}") payload: dict[str, Any] = { "model": self.model, - "input": self._messages_to_input(messages), + "input": input_payload, "temperature": temperature, "stream": stream, } @@ -810,6 +814,7 @@ def _build_payload( converted_tools = self._convert_tools(tools) if converted_tools: + logger.debug(f"[LLM-Debug] Tools payload ({len(converted_tools)} tools): {json.dumps(converted_tools, indent=2, ensure_ascii=False)}") payload["tools"] = converted_tools if self.supports_tool_choice: payload["tool_choice"] = "auto" @@ -1585,8 +1590,10 @@ def _build_payload( }) if anthropic_tools: anthropic_tools[-1]["cache_control"] = {"type": "ephemeral"} + logger.debug(f"[LLM-Debug] AnthropicClient tools payload ({len(anthropic_tools)} tools): {json.dumps(anthropic_tools, indent=2, ensure_ascii=False)}") payload["tools"] = anthropic_tools + logger.debug(f"[LLM-Debug] AnthropicClient payload messages for model {self.model}: {json.dumps(anthropic_messages, indent=2, ensure_ascii=False)}") payload.update(kwargs) return payload diff --git a/backend/app/services/sandbox/config.py b/backend/app/services/sandbox/config.py index 5d1930b39..23efba49e 100644 --- a/backend/app/services/sandbox/config.py +++ b/backend/app/services/sandbox/config.py @@ -34,8 +34,8 @@ class SandboxConfig(BaseModel): api_url: str = "" # Common options - default_timeout: int = Field(default=30, ge=1, le=300) - max_timeout: int = Field(default=60, ge=1, le=300) + default_timeout: int = Field(default=30, ge=1, le=3600) + max_timeout: int = Field(default=60, ge=1, le=3600) # Language mapping for API sandboxes # Maps our internal language names to API-specific language IDs diff --git a/backend/app/services/sandbox/local/subprocess_backend.py b/backend/app/services/sandbox/local/subprocess_backend.py index 524504c52..73f98b4a2 100644 --- a/backend/app/services/sandbox/local/subprocess_backend.py +++ b/backend/app/services/sandbox/local/subprocess_backend.py @@ -162,7 +162,7 @@ def _preexec(): import resource memory_bytes = int(self.config.memory_limit.rstrip("mM")) * 1024 * 1024 - cpu_limit = max(1, min(timeout, self.config.max_timeout, 60)) + cpu_limit = max(1, min(timeout, self.config.max_timeout)) resource.setrlimit(resource.RLIMIT_CPU, (cpu_limit, cpu_limit)) resource.setrlimit(resource.RLIMIT_AS, (memory_bytes, memory_bytes)) resource.setrlimit(resource.RLIMIT_FSIZE, (10 * 1024 * 1024, 10 * 1024 * 1024)) @@ -276,6 +276,7 @@ async def execute( **kwargs ) -> ExecutionResult: """Execute code in a subprocess.""" + on_output = kwargs.get("on_output") start_time = time.time() # Validate language @@ -366,28 +367,52 @@ async def execute( **kwargs, ) + stdout_data = bytearray() + stderr_data = bytearray() + + async def read_stream(stream, out, label="stdout"): + while True: + chunk = await stream.read(4096) + if not chunk: + break + out.extend(chunk) + # Real-time streaming: push each chunk to the WebSocket + if on_output: + try: + text = chunk.decode("utf-8", errors="replace") + await on_output(text, label) + except Exception: + pass + + task1 = asyncio.create_task(read_stream(proc.stdout, stdout_data, "stdout")) + task2 = asyncio.create_task(read_stream(proc.stderr, stderr_data, "stderr")) + + is_timeout = False try: - stdout, stderr = await asyncio.wait_for( - proc.communicate(), - timeout=timeout - ) + await asyncio.wait_for(proc.wait(), timeout=timeout) except asyncio.TimeoutError: proc.kill() - await proc.communicate() + is_timeout = True + + await asyncio.gather(task1, task2) + stdout = bytes(stdout_data) + stderr = bytes(stderr_data) + + stdout_str = stdout.decode("utf-8", errors="replace")[:10000] if stdout else "" + stderr_str = stderr.decode("utf-8", errors="replace")[:5000] if stderr else "" + + duration_ms = int((time.time() - start_time) * 1000) + + if is_timeout: return ExecutionResult( success=False, - stdout="", - stderr="", + stdout=stdout_str, + stderr=stderr_str, exit_code=124, - duration_ms=int((time.time() - start_time) * 1000), - error=f"Code execution timed out after {timeout}s" + duration_ms=duration_ms, + error=f"Code execution timed out after {timeout}s. If you expect this code to take longer, try calling the tool again with a higher 'timeout' parameter (up to 3600s)." ) - stdout_str = stdout.decode("utf-8", errors="replace")[:10000] - stderr_str = stderr.decode("utf-8", errors="replace")[:5000] - - duration_ms = int((time.time() - start_time) * 1000) - return ExecutionResult( success=proc.returncode == 0, stdout=stdout_str, diff --git a/backend/app/services/skill_seeder.py b/backend/app/services/skill_seeder.py index b3c1b2612..76b64113a 100644 --- a/backend/app/services/skill_seeder.py +++ b/backend/app/services/skill_seeder.py @@ -847,6 +847,16 @@ }, ], }, + # ─── Agent API Calling (default — enables cross-agent delegation via code) ─── + { + "name": "Agent API Calling", + "description": "通过代码调用平台 API 与其他 Agent 协作。包含获取目标 Agent ID、认证方式、Python 代码示例和错误处理。", + "category": "development", + "icon": "🔗", + "folder_name": "agent-api-calling", + "is_default": True, + "files": [], # populated at runtime from agent_template/skills/agent-api-calling/SKILL.md + }, ] @@ -873,6 +883,12 @@ async def seed_skills(): s["files"] = [{"path": "SKILL.md", "content": mcp_file.read_text(encoding="utf-8")}] else: logger.warning("[SkillSeeder] mcp-installer/SKILL.md not found in agent_template/skills/") + elif s["folder_name"] == "agent-api-calling" and not s["files"]: + api_file = _template_skills_dir / "agent-api-calling" / "SKILL.md" + if api_file.exists(): + s["files"] = [{"path": "SKILL.md", "content": api_file.read_text(encoding="utf-8")}] + else: + logger.warning("[SkillSeeder] agent-api-calling/SKILL.md not found in agent_template/skills/") async with async_session() as db: for skill_data in BUILTIN_SKILLS: diff --git a/backend/app/services/tool_seeder.py b/backend/app/services/tool_seeder.py index 909d72949..963d35948 100644 --- a/backend/app/services/tool_seeder.py +++ b/backend/app/services/tool_seeder.py @@ -16,6 +16,36 @@ "jina_search", "jina_read", "update_objective", + # AgentBay tools should NOT be is_default=True. Older seeder versions may + # have set them to True; include them here so the seeder corrects the DB. + "agentbay_browser_navigate", + "agentbay_browser_screenshot", + "agentbay_browser_save_screenshot", + "agentbay_browser_click", + "agentbay_browser_type", + "agentbay_browser_extract", + "agentbay_browser_observe", + "agentbay_browser_login", + "agentbay_code_execute", + "agentbay_code_write_file", + "agentbay_code_read_file", + "agentbay_code_edit_file", + "agentbay_command_exec", + "agentbay_computer_screenshot", + "agentbay_computer_save_screenshot", + "agentbay_computer_click", + "agentbay_computer_precision_screenshot", + "agentbay_computer_input_text", + "agentbay_computer_press_keys", + "agentbay_computer_scroll", + "agentbay_computer_move_mouse", + "agentbay_computer_drag_mouse", + "agentbay_computer_get_installed_apps", + "agentbay_computer_start_app", + "agentbay_computer_list_windows", + "agentbay_computer_close_window", + "agentbay_computer_dismiss_dialog", + "agentbay_file_transfer", } LEGACY_IMAGE_TOOL_MODEL_DEFAULTS = { @@ -931,7 +961,7 @@ def _global_builtin_config(tool_data: dict) -> dict: "type": "number", "default": 30, "min": 5, - "max": 300, + "max": 3600, }, { "key": "max_timeout", @@ -939,7 +969,7 @@ def _global_builtin_config(tool_data: dict) -> dict: "type": "number", "default": 60, "min": 10, - "max": 300, + "max": 3600, }, ] }, @@ -982,7 +1012,7 @@ def _global_builtin_config(tool_data: dict) -> dict: "type": "number", "default": 30, "min": 5, - "max": 300, + "max": 3600, }, { "key": "max_timeout", @@ -990,7 +1020,7 @@ def _global_builtin_config(tool_data: dict) -> dict: "type": "number", "default": 60, "min": 10, - "max": 300, + "max": 3600, }, ] }, diff --git a/backend/tests/test_agent_api.py b/backend/tests/test_agent_api.py new file mode 100644 index 000000000..e49971e65 --- /dev/null +++ b/backend/tests/test_agent_api.py @@ -0,0 +1,593 @@ +"""Unit tests for the Agent API calling feature (app/api/agent_api.py). + +Tests cover: +- Token key generation format and uniqueness +- Bearer token parsing and authentication +- Relationship enforcement between caller and target agents +- Target agent not found / expired / no model configured +- Self-calling bypass (no relationship needed) +- Successful LLM invocation end-to-end +- Token Key management endpoints (get / regenerate) +""" + +import uuid +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import HTTPException + +from app.api import agent_api +from app.api.agent_api import ( + generate_token_key, + _get_caller_agent, + _check_relationship, + agent_api_chat, + get_token_key, + regenerate_token_key, +) + + +# --------------------------------------------------------------------------- +# Helpers / fakes +# --------------------------------------------------------------------------- + +class DummyResult: + """Mimics SQLAlchemy async result for execute() calls.""" + + def __init__(self, values=None): + self._values = list(values or []) + + def scalar_one_or_none(self): + return self._values[0] if self._values else None + + def scalars(self): + return self + + def all(self): + return list(self._values) + + +class RecordingDB: + """Minimal fake async DB session that returns pre-configured results.""" + + def __init__(self, responses=None): + self.responses = list(responses or []) + self.added = [] + self.committed = False + + async def execute(self, _statement, _params=None): + if not self.responses: + return DummyResult() + return self.responses.pop(0) + + def add(self, value): + self.added.append(value) + + async def commit(self): + self.committed = True + + async def refresh(self, value): + pass + + async def flush(self): + pass + + +def _make_agent(*, name="TestBot", token_key=None, token_key_suffix=None, + primary_model_id=None, fallback_model_id=None, + is_expired=False, creator_id=None, role_description="helper", + status="idle", agent_type="native"): + """Create a fake Agent-like object.""" + agent_id = uuid.uuid4() + return SimpleNamespace( + id=agent_id, + name=name, + role_description=role_description, + token_key=token_key, + token_key_suffix=token_key_suffix, + primary_model_id=primary_model_id, + fallback_model_id=fallback_model_id, + is_expired=is_expired, + creator_id=creator_id or uuid.uuid4(), + status=status, + agent_type=agent_type, + tenant_id=uuid.uuid4(), + ) + + +def _make_model(*, enabled=True, model_name="gpt-4"): + """Create a fake LLMModel-like object.""" + return SimpleNamespace( + id=uuid.uuid4(), + model=model_name, + enabled=enabled, + ) + + +def _make_relationship(agent_id, target_agent_id): + """Create a fake AgentAgentRelationship-like object.""" + return SimpleNamespace( + id=uuid.uuid4(), + agent_id=agent_id, + target_agent_id=target_agent_id, + relation="collaborator", + ) + + +def _make_chat_request(agent_id, prompt="Hello"): + """Create a fake AgentApiChatRequest-like object.""" + return SimpleNamespace( + agent_id=agent_id, + prompt=prompt, + ) + + +# --------------------------------------------------------------------------- +# Token key generation tests +# --------------------------------------------------------------------------- + + +class TestGenerateTokenKey: + def test_format(self): + """Token key must start with 'clw_' and be 36 chars total.""" + key, suffix = generate_token_key() + assert key.startswith("clw_") + assert len(key) == 4 + 32 # "clw_" + 32 hex chars + assert suffix == key[-4:] + + def test_uniqueness(self): + """Two calls should produce different keys.""" + key1, _ = generate_token_key() + key2, _ = generate_token_key() + assert key1 != key2 + + def test_suffix_matches_last_four(self): + """Suffix must be exactly the last 4 characters of the full key.""" + for _ in range(10): + key, suffix = generate_token_key() + assert suffix == key[-4:] + + +# --------------------------------------------------------------------------- +# _get_caller_agent tests +# --------------------------------------------------------------------------- + + +class TestGetCallerAgent: + @pytest.mark.asyncio + async def test_valid_token(self): + """Valid token key returns the matching agent.""" + agent = _make_agent(token_key="clw_abc123") + db = RecordingDB(responses=[DummyResult(values=[agent])]) + result = await _get_caller_agent("clw_abc123", db) + assert result.id == agent.id + + @pytest.mark.asyncio + async def test_invalid_token(self): + """Invalid token key raises 401.""" + db = RecordingDB(responses=[DummyResult()]) + with pytest.raises(HTTPException) as exc: + await _get_caller_agent("clw_nonexistent", db) + assert exc.value.status_code == 401 + assert "Invalid token key" in exc.value.detail + + +# --------------------------------------------------------------------------- +# _check_relationship tests +# --------------------------------------------------------------------------- + + +class TestCheckRelationship: + @pytest.mark.asyncio + async def test_has_relationship(self): + """Returns True when relationship exists.""" + caller_id = uuid.uuid4() + target_id = uuid.uuid4() + rel = _make_relationship(caller_id, target_id) + db = RecordingDB(responses=[DummyResult(values=[rel])]) + assert await _check_relationship(db, caller_id, target_id) is True + + @pytest.mark.asyncio + async def test_no_relationship(self): + """Returns False when no relationship exists.""" + db = RecordingDB(responses=[DummyResult()]) + assert await _check_relationship(db, uuid.uuid4(), uuid.uuid4()) is False + + +# --------------------------------------------------------------------------- +# agent_api_chat endpoint tests +# --------------------------------------------------------------------------- + + +class TestAgentApiChat: + @pytest.mark.asyncio + async def test_missing_bearer_prefix(self): + """Non-Bearer auth header returns 401.""" + body = _make_chat_request(uuid.uuid4()) + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Basic abc123") + assert exc.value.status_code == 401 + assert "Bearer" in exc.value.detail + + @pytest.mark.asyncio + async def test_empty_token_key(self): + """Empty token key after Bearer returns 401.""" + body = _make_chat_request(uuid.uuid4()) + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Bearer ") + assert exc.value.status_code == 401 + assert "required" in exc.value.detail.lower() + + @pytest.mark.asyncio + async def test_invalid_token_key(self): + """Invalid token key returns 401.""" + body = _make_chat_request(uuid.uuid4()) + db = RecordingDB(responses=[DummyResult()]) # no agent found + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Bearer clw_bad", db=db) + assert exc.value.status_code == 401 + + @pytest.mark.asyncio + async def test_target_not_found(self): + """Non-existent target agent returns 404.""" + caller = _make_agent(name="Caller", token_key="clw_callerkey") + db = RecordingDB(responses=[ + DummyResult(values=[caller]), # caller lookup + DummyResult(), # target lookup: not found + ]) + body = _make_chat_request(uuid.uuid4(), prompt="test") + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Bearer clw_callerkey", db=db) + assert exc.value.status_code == 404 + assert "not found" in exc.value.detail.lower() + + @pytest.mark.asyncio + async def test_no_relationship_returns_403(self): + """Calling an agent without relationship returns 403.""" + caller = _make_agent(name="Caller", token_key="clw_callerkey") + target = _make_agent(name="Target") + db = RecordingDB(responses=[ + DummyResult(values=[caller]), # caller lookup + DummyResult(values=[target]), # target lookup + DummyResult(), # relationship check: not found + ]) + body = _make_chat_request(target.id, prompt="test") + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Bearer clw_callerkey", db=db) + assert exc.value.status_code == 403 + assert "no relationship" in exc.value.detail.lower() + + @pytest.mark.asyncio + async def test_expired_target_returns_403(self): + """Calling an expired agent returns 403.""" + caller = _make_agent(name="Caller", token_key="clw_callerkey") + target = _make_agent(name="Target", is_expired=True) + rel = _make_relationship(caller.id, target.id) + db = RecordingDB(responses=[ + DummyResult(values=[caller]), # caller lookup + DummyResult(values=[target]), # target lookup + DummyResult(values=[rel]), # relationship check + ]) + body = _make_chat_request(target.id, prompt="test") + with patch("app.core.permissions.is_agent_expired", return_value=True): + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Bearer clw_callerkey", db=db) + assert exc.value.status_code == 403 + assert "expired" in exc.value.detail.lower() + + @pytest.mark.asyncio + async def test_no_model_configured_returns_400(self): + """Target agent with no configured model returns 400.""" + caller = _make_agent(name="Caller", token_key="clw_callerkey") + target = _make_agent(name="Target", primary_model_id=None) + rel = _make_relationship(caller.id, target.id) + db = RecordingDB(responses=[ + DummyResult(values=[caller]), # caller lookup + DummyResult(values=[target]), # target lookup + DummyResult(values=[rel]), # relationship check + ]) + body = _make_chat_request(target.id, prompt="test") + with patch("app.core.permissions.is_agent_expired", return_value=False): + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Bearer clw_callerkey", db=db) + assert exc.value.status_code == 400 + assert "no llm model" in exc.value.detail.lower() + + @pytest.mark.asyncio + async def test_self_call_skips_relationship(self): + """Calling yourself should skip the relationship check.""" + model_id = uuid.uuid4() + caller = _make_agent( + name="SelfBot", token_key="clw_selfkey", + primary_model_id=model_id, + ) + model = _make_model() + db = RecordingDB(responses=[ + DummyResult(values=[caller]), # caller lookup + DummyResult(values=[caller]), # target lookup (same agent) + # No relationship query — self-call skips it + DummyResult(values=[model]), # primary model lookup + ]) + body = _make_chat_request(caller.id, prompt="talk to yourself") + + with patch("app.core.permissions.is_agent_expired", return_value=False): + with patch("app.services.llm.call_llm", new_callable=AsyncMock, return_value="Self-reply"): + with patch("app.services.activity_logger.log_activity", new_callable=AsyncMock): + result = await agent_api_chat( + body, authorization="Bearer clw_selfkey", db=db, + ) + assert result.reply == "Self-reply" + + @pytest.mark.asyncio + async def test_successful_call_with_relationship(self): + """Full successful call flow: auth → relationship → LLM → response.""" + model_id = uuid.uuid4() + caller = _make_agent(name="Caller", token_key="clw_callerkey") + target = _make_agent(name="Target", primary_model_id=model_id) + rel = _make_relationship(caller.id, target.id) + model = _make_model() + db = RecordingDB(responses=[ + DummyResult(values=[caller]), # caller lookup + DummyResult(values=[target]), # target lookup + DummyResult(values=[rel]), # relationship check + DummyResult(values=[model]), # primary model lookup + ]) + body = _make_chat_request(target.id, prompt="What is 1+1?") + + with patch("app.core.permissions.is_agent_expired", return_value=False): + with patch("app.services.llm.call_llm", new_callable=AsyncMock, return_value="The answer is 2."): + with patch("app.services.activity_logger.log_activity", new_callable=AsyncMock) as mock_log: + result = await agent_api_chat( + body, authorization="Bearer clw_callerkey", db=db, + ) + + assert result.reply == "The answer is 2." + # Activity logs: one for the target (api_call) and one for the caller (api_call_out) + assert mock_log.call_count == 2 + call_args = [c.args for c in mock_log.call_args_list] + action_types = [a[1] for a in call_args] + assert "api_call" in action_types + assert "api_call_out" in action_types + + @pytest.mark.asyncio + async def test_llm_error_returns_502(self): + """LLM call failure returns 502.""" + model_id = uuid.uuid4() + caller = _make_agent(name="Caller", token_key="clw_callerkey") + target = _make_agent(name="Target", primary_model_id=model_id) + rel = _make_relationship(caller.id, target.id) + model = _make_model() + db = RecordingDB(responses=[ + DummyResult(values=[caller]), + DummyResult(values=[target]), + DummyResult(values=[rel]), + DummyResult(values=[model]), + ]) + body = _make_chat_request(target.id, prompt="error test") + + with patch("app.core.permissions.is_agent_expired", return_value=False): + with patch("app.services.llm.call_llm", new_callable=AsyncMock, side_effect=RuntimeError("Model crashed")): + with pytest.raises(HTTPException) as exc: + await agent_api_chat(body, authorization="Bearer clw_callerkey", db=db) + assert exc.value.status_code == 502 + assert "LLM call failed" in exc.value.detail + + @pytest.mark.asyncio + async def test_fallback_model_used_when_primary_disabled(self): + """When primary model is disabled, fallback is promoted.""" + primary_model_id = uuid.uuid4() + fallback_model_id = uuid.uuid4() + caller = _make_agent(name="Caller", token_key="clw_callerkey") + target = _make_agent( + name="Target", + primary_model_id=primary_model_id, + fallback_model_id=fallback_model_id, + ) + rel = _make_relationship(caller.id, target.id) + disabled_model = _make_model(enabled=False, model_name="disabled-model") + fallback_model = _make_model(enabled=True, model_name="fallback-model") + db = RecordingDB(responses=[ + DummyResult(values=[caller]), + DummyResult(values=[target]), + DummyResult(values=[rel]), + DummyResult(values=[disabled_model]), # primary model: disabled + DummyResult(values=[fallback_model]), # fallback model: enabled + ]) + body = _make_chat_request(target.id, prompt="test fallback") + + captured_model = {} + + async def fake_call_llm(*args, **kwargs): + # Extract model from positional or keyword args + model = kwargs.get('model') or (args[0] if args else None) + if model: + captured_model["model"] = model.model + return "Fallback used" + + with patch("app.core.permissions.is_agent_expired", return_value=False): + with patch("app.services.llm.call_llm", side_effect=fake_call_llm): + with patch("app.services.activity_logger.log_activity", new_callable=AsyncMock): + result = await agent_api_chat( + body, authorization="Bearer clw_callerkey", db=db, + ) + assert result.reply == "Fallback used" + assert captured_model["model"] == "fallback-model" + + +# --------------------------------------------------------------------------- +# get_token_key endpoint tests +# --------------------------------------------------------------------------- + + +class TestGetTokenKey: + @pytest.mark.asyncio + async def test_missing_bearer_prefix(self): + """Non-Bearer auth returns 401.""" + with pytest.raises(HTTPException) as exc: + await get_token_key(uuid.uuid4(), authorization="Basic xyz") + assert exc.value.status_code == 401 + + @pytest.mark.asyncio + async def test_returns_existing_key(self): + """Returns existing token_key when present.""" + agent_id = uuid.uuid4() + user_id = uuid.uuid4() + agent = _make_agent(token_key="clw_existing1234", token_key_suffix="1234") + agent.id = agent_id + user = SimpleNamespace(id=user_id) + + db = RecordingDB(responses=[ + DummyResult(values=[user]), # user lookup + ]) + + with patch("app.core.security.decode_access_token", return_value={"sub": str(user_id)}): + with patch("app.core.permissions.check_agent_access", new_callable=AsyncMock, return_value=(agent, "manage")): + result = await get_token_key(agent_id, authorization="Bearer jwt.token.here", db=db) + + assert result["token_key"] == "clw_existing1234" + assert result["token_key_suffix"] == "1234" + + @pytest.mark.asyncio + async def test_generates_key_on_demand(self): + """Generates a new key when agent has no token_key.""" + agent_id = uuid.uuid4() + user_id = uuid.uuid4() + agent = _make_agent(token_key=None, token_key_suffix=None) + agent.id = agent_id + user = SimpleNamespace(id=user_id) + + db = RecordingDB(responses=[ + DummyResult(values=[user]), + ]) + + with patch("app.core.security.decode_access_token", return_value={"sub": str(user_id)}): + with patch("app.core.permissions.check_agent_access", new_callable=AsyncMock, return_value=(agent, "manage")): + result = await get_token_key(agent_id, authorization="Bearer jwt.token.here", db=db) + + assert result["token_key"].startswith("clw_") + assert result["token_key_suffix"] == result["token_key"][-4:] + assert db.committed is True # key was persisted + + @pytest.mark.asyncio + async def test_non_manage_access_returns_403(self): + """Use-only access should be denied.""" + agent_id = uuid.uuid4() + user_id = uuid.uuid4() + agent = _make_agent(token_key="clw_existing") + user = SimpleNamespace(id=user_id) + + db = RecordingDB(responses=[ + DummyResult(values=[user]), + ]) + + with patch("app.core.security.decode_access_token", return_value={"sub": str(user_id)}): + with patch("app.core.permissions.check_agent_access", new_callable=AsyncMock, return_value=(agent, "use")): + with pytest.raises(HTTPException) as exc: + await get_token_key(agent_id, authorization="Bearer jwt.token.here", db=db) + assert exc.value.status_code == 403 + + +# --------------------------------------------------------------------------- +# regenerate_token_key endpoint tests +# --------------------------------------------------------------------------- + + +class TestRegenerateTokenKey: + @pytest.mark.asyncio + async def test_regenerate_produces_new_key(self): + """Regenerating creates a fresh key and returns it.""" + agent_id = uuid.uuid4() + user_id = uuid.uuid4() + old_key = "clw_oldkey1234567890abcdef12345678" + agent = _make_agent(token_key=old_key, token_key_suffix=old_key[-4:]) + agent.id = agent_id + user = SimpleNamespace(id=user_id) + + db = RecordingDB(responses=[ + DummyResult(values=[user]), + ]) + + with patch("app.core.security.decode_access_token", return_value={"sub": str(user_id)}): + with patch("app.core.permissions.check_agent_access", new_callable=AsyncMock, return_value=(agent, "manage")): + result = await regenerate_token_key(agent_id, authorization="Bearer jwt.token.here", db=db) + + assert result["token_key"].startswith("clw_") + assert result["token_key"] != old_key + assert result["token_key_suffix"] == result["token_key"][-4:] + assert db.committed is True + # Agent object should be updated + assert agent.token_key == result["token_key"] + assert agent.token_key_suffix == result["token_key_suffix"] + + @pytest.mark.asyncio + async def test_regenerate_non_manage_returns_403(self): + """Use-only access on regenerate should be denied.""" + agent_id = uuid.uuid4() + user_id = uuid.uuid4() + agent = _make_agent(token_key="clw_key") + user = SimpleNamespace(id=user_id) + + db = RecordingDB(responses=[ + DummyResult(values=[user]), + ]) + + with patch("app.core.security.decode_access_token", return_value={"sub": str(user_id)}): + with patch("app.core.permissions.check_agent_access", new_callable=AsyncMock, return_value=(agent, "use")): + with pytest.raises(HTTPException) as exc: + await regenerate_token_key(agent_id, authorization="Bearer jwt.token.here", db=db) + assert exc.value.status_code == 403 + + @pytest.mark.asyncio + async def test_regenerate_missing_bearer(self): + """Missing Bearer prefix returns 401.""" + with pytest.raises(HTTPException) as exc: + await regenerate_token_key(uuid.uuid4(), authorization="Token xyz") + assert exc.value.status_code == 401 + + +# --------------------------------------------------------------------------- +# Integration-style: agent creation with token key +# --------------------------------------------------------------------------- + + +class TestAgentCreationTokenKey: + def test_generate_token_key_imported_by_agents(self): + """Ensure generate_token_key is importable from agent_api module.""" + from app.api.agent_api import generate_token_key as gk + key, suffix = gk() + assert key.startswith("clw_") + assert len(suffix) == 4 + + +# --------------------------------------------------------------------------- +# Schema validation tests +# --------------------------------------------------------------------------- + + +class TestSchemas: + def test_agent_api_chat_request_schema(self): + """AgentApiChatRequest should accept valid data.""" + from app.schemas.schemas import AgentApiChatRequest + req = AgentApiChatRequest(agent_id=uuid.uuid4(), prompt="Hello world") + assert req.prompt == "Hello world" + + def test_agent_api_chat_request_empty_prompt_rejected(self): + """AgentApiChatRequest should reject empty prompt.""" + from app.schemas.schemas import AgentApiChatRequest + from pydantic import ValidationError + with pytest.raises(ValidationError): + AgentApiChatRequest(agent_id=uuid.uuid4(), prompt="") + + def test_agent_api_chat_response_schema(self): + """AgentApiChatResponse should serialize properly.""" + from app.schemas.schemas import AgentApiChatResponse + resp = AgentApiChatResponse(reply="Hello!", usage={"total_tokens": 100}) + assert resp.reply == "Hello!" + assert resp.usage["total_tokens"] == 100 + + def test_agent_out_has_token_key_suffix(self): + """AgentOut schema should include token_key_suffix field.""" + from app.schemas.schemas import AgentOut + fields = AgentOut.model_fields + assert "token_key_suffix" in fields diff --git a/frontend/src/components/AgentBayLivePanel.tsx b/frontend/src/components/AgentBayLivePanel.tsx index d1f3520de..643c1dd51 100644 --- a/frontend/src/components/AgentBayLivePanel.tsx +++ b/frontend/src/components/AgentBayLivePanel.tsx @@ -26,6 +26,8 @@ interface Props { sessionId?: string; // needed for Take Control /** Called by TC panel on close to push the latest screenshot into liveState */ onLiveUpdate?: (env: 'browser' | 'desktop', screenshotDataUri: string) => void; + /** Called when user clicks Clear in the code output panel */ + onClearCode?: () => void; } /* ── Tab Icons (Linear-style minimal SVGs) ── */ @@ -85,7 +87,7 @@ function calcHalfContainerWidth(): number { return Math.max(MIN_WIDTH, Math.floor((window.innerWidth - 60) / 2)); } -export default function AgentBayLivePanel({ liveState, visible, onToggle, agentId, sessionId, onLiveUpdate }: Props) { +export default function AgentBayLivePanel({ liveState, visible, onToggle, agentId, sessionId, onLiveUpdate, onClearCode }: Props) { const { t } = useTranslation(); // Keep a ref to the latest onLiveUpdate so TakeControl callbacks always @@ -261,6 +263,19 @@ export default function AgentBayLivePanel({ liveState, visible, onToggle, agentI Control )} + {/* Clear button for code output */} + {activeTab === 'code' && liveState.code && onClearCode && ( + + )} diff --git a/frontend/src/pages/AgentDetail.tsx b/frontend/src/pages/AgentDetail.tsx index 6c15259cc..ef13826d9 100644 --- a/frontend/src/pages/AgentDetail.tsx +++ b/frontend/src/pages/AgentDetail.tsx @@ -51,6 +51,8 @@ import { IconWorld, IconBolt, IconAlertTriangle, + IconKey, + IconRefresh, } from '@tabler/icons-react'; import { useDropZone } from '../hooks/useDropZone'; @@ -4140,6 +4142,29 @@ function AgentDetailInner() { setChatInfoMsg(d.content || ''); if (chatInfoTimerRef.current) clearTimeout(chatInfoTimerRef.current); chatInfoTimerRef.current = setTimeout(() => setChatInfoMsg(null), 6000); + } else if (d.type === 'agentbay_live') { + // Real-time streaming from execute_code or other AgentBay envs + if ((d.env === 'desktop' || d.env === 'browser') && d.screenshot_url) { + setLiveState(prev => ({ + ...prev, + [d.env]: { screenshotUrl: d.screenshot_url }, + })); + if (allowLivePanelAutoFocus()) { + setSidePanelTab(d.env === 'desktop' ? 'desktop' : 'browser'); + setLivePanelVisible(true); + collapseSidebarsForLivePanel(); + } + } else if (d.env === 'code' && d.output) { + setLiveState(prev => ({ + ...prev, + code: { output: (prev.code?.output || '') + d.output }, + })); + if (allowLivePanelAutoFocus()) { + setSidePanelTab('code'); + setLivePanelVisible(true); + collapseSidebarsForLivePanel(); + } + } } else { setChatMessages(prev => [...prev, parseChatMsg({ role: d.role, content: d.content })]); } @@ -8108,6 +8133,106 @@ function AgentDetailInner() { + {/* API Token Key — for Agent API calling */} + {(() => { + const TokenKeyCard = () => { + const isChinese = i18n.language?.startsWith('zh'); + const tokenSuffix = (agent as any)?.token_key_suffix; + const [tokenKeyRegenConfirm, setTokenKeyRegenConfirm] = useState(false); + const [tokenKeyRegening, setTokenKeyRegening] = useState(false); + const [tokenKeyGenerated, setTokenKeyGenerated] = useState(false); + + const generateOrRegenTokenKey = async () => { + setTokenKeyRegening(true); + try { + await fetchAuth<{ token_key_suffix: string }>( + `/v1/agent/regenerate-token-key/${id}`, + { method: 'POST' }, + ); + queryClient.invalidateQueries({ queryKey: ['agent', id] }); + setTokenKeyGenerated(true); + setTimeout(() => setTokenKeyGenerated(false), 2000); + } catch (e: any) { + alert(e?.message || (isChinese ? '操作失败' : 'Operation failed')); + } finally { + setTokenKeyRegening(false); + setTokenKeyRegenConfirm(false); + } + }; + + return ( +
+

+ + {isChinese ? 'API Token Key' : 'API Token Key'} +

+

+ {isChinese + ? 'Agent 通过 System Prompt 自动获得此 Key,用于调用平台 API 与其他 Agent 协作。Key 不可查看,仅 Agent 自身可使用。' + : 'The agent receives this key automatically via System Prompt for cross-agent API calls. The key is not viewable — only the agent itself can use it.'} +

+
+ + {tokenSuffix + ? `clw_••••••••••••${tokenSuffix}` + : (isChinese ? '未生成' : 'Not generated')} + + {tokenKeyGenerated && ( + + ✓ {isChinese ? '已生成' : 'Generated'} + + )} + {tokenSuffix && !tokenKeyRegenConfirm && ( + + )} + {tokenSuffix && tokenKeyRegenConfirm && ( + + )} + {!tokenSuffix && ( + + )} +
+
+ ); + }; + return ; + })()} + {/* Welcome Message */} {(() => { const isChinese = i18n.language?.startsWith('zh'); diff --git a/frontend/src/pages/Chat.tsx b/frontend/src/pages/Chat.tsx index d3ceb0dd8..38caddcff 100644 --- a/frontend/src/pages/Chat.tsx +++ b/frontend/src/pages/Chat.tsx @@ -520,14 +520,18 @@ export default function Chat() { if (data.env === 'desktop') next.desktop = { screenshotUrl: imgUrl }; else next.browser = { screenshotUrl: imgUrl }; } else if (data.env === 'code' && data.output) { - // Append code output + // Real-time streaming: concatenate chunks directly const existing = prev.code?.output || ''; - next.code = { output: existing + (existing ? '\n---\n' : '') + data.output }; + const prefix = data.stream === 'stderr' ? '⚠️ ' : ''; + next.code = { output: existing + prefix + data.output }; } return next; }); - // Auto-expand the live panel on first data - setLivePanelVisible(true); + // Auto-expand the live panel on first data arrival + // (for desktop/browser screenshots, or the first code chunk only) + if (data.env !== 'code' || !liveState.code) { + setLivePanelVisible(true); + } return; } @@ -629,10 +633,8 @@ export default function Chat() { const imgUrl = lp.screenshot_url + '&_t=' + Date.now(); if (lp.env === 'desktop') next.desktop = { screenshotUrl: imgUrl }; else next.browser = { screenshotUrl: imgUrl }; - } else if (lp.env === 'code' && lp.output) { - const existing = prev.code?.output || ''; - next.code = { output: existing + (existing ? '\n---\n' : '') + lp.output }; } + // Note: code env is handled via real-time streaming (agentbay_live events) return next; }); setLivePanelVisible(true); @@ -1102,6 +1104,13 @@ export default function Chat() { [env]: { screenshotUrl: screenshotDataUri }, })); }} + onClearCode={() => { + setLiveState(prev => { + const next = { ...prev }; + delete next.code; + return next; + }); + }} /> )}