From c97f9013447dd4f235577f4994da248fe043cb3c Mon Sep 17 00:00:00 2001 From: Aneesh Date: Wed, 5 Nov 2025 20:57:34 -0600 Subject: [PATCH] feat: migrate from openai to google adk multi-agent system - Implemented hierarchical multi-agent architecture using Google ADK - Created CoordinatorAgent, MathAgent, TextAgent, and InfoAgent - Added specialized tools for each agent type - Updated streaming response handling for ADK integration - Added comprehensive documentation and migration guide feat: migrate from openai to google adk multi-agent system - Replace OpenAI integration with Google ADK multi-agent architecture - Add specialized agents: MathAgent, TextAgent, InfoAgent, and CoordinatorAgent - Implement custom tools for calculations, text processing, and information retrieval - Add real-time status updates showing agent activity and tool usage - Update streaming implementation to work with ADK event system - Add comprehensive documentation including migration guide and status implementation guide BREAKING CHANGE: Requires Google ADK authentication instead of OpenAI API key. Update environment variables to use GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_API_KEY. chore(git): ignore .vscode directory and update assistant status text Updated .gitignore to exclude .vscode settings directory. Also refined the assistant's loading status message from "is working..." to "is thinking..." to better reflect the AI processing state and added clarifying comments about the UI elements being updated. --- .gitignore | 1 + CLAUDE.md | 137 +++++ ai/agents.py | 93 +++ ai/llm_caller.py | 160 ++++- ai/tools.py | 153 +++++ docs/MIGRATION_GUIDE.md | 217 +++++++ docs/SLACK_AGENT_STATUS.md | 961 ++++++++++++++++++++++++++++++ listeners/assistant/message.py | 48 +- listeners/events/app_mentioned.py | 48 +- requirements.txt | 8 +- 10 files changed, 1768 insertions(+), 58 deletions(-) create mode 100644 CLAUDE.md create mode 100644 ai/agents.py create mode 100644 ai/tools.py create mode 100644 docs/MIGRATION_GUIDE.md create mode 100644 docs/SLACK_AGENT_STATUS.md diff --git a/.gitignore b/.gitignore index a99f209..b500659 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ logs/ *.db .pytype/ .idea/ +.vscode/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..3861873 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,137 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Overview + +This is a Slack AI Agent App built with Bolt for Python, implementing a multi-agent system using Google ADK (Agent Development Kit) with Gemini models. The app provides AI-powered assistance through Slack's Assistant interface with streaming responses. + +## Development Commands + +### Environment Setup + +```sh +# Create and activate virtual environment +python3 -m venv .venv +source .venv/bin/activate # Windows: .\.venv\Scripts\Activate + +# Install dependencies +pip install -r requirements.txt +``` + +### Running the App + +```sh +# Using Slack CLI (recommended) +slack run + +# Or directly with Python +python3 app.py +``` + +### Code Quality + +```sh +# Lint code +ruff check + +# Format code +ruff format +``` + +### Testing + +```sh +# Run all tests +pytest + +# Run specific test file +pytest tests/test_file.py + +# Run with verbose output +pytest -v +``` + +## Architecture + +### Multi-Agent System + +The application uses Google ADK to implement a hierarchical multi-agent architecture: + +- **CoordinatorAgent** (`ai/agents.py:55-78`): Root agent that analyzes user requests and delegates to specialized agents +- **MathAgent** (`ai/agents.py:20-28`): Handles mathematical calculations and numerical operations +- **TextAgent** (`ai/agents.py:31-40`): Processes text formatting, word counting, and list creation +- **InfoAgent** (`ai/agents.py:43-52`): Provides current time, help information, and general queries + +Each agent uses the `gemini-2.0-flash` model and has access to specific tools defined in `ai/tools.py`. + +### Request Flow + +1. **Entry Point** (`app.py`): Thin entry point that initializes the Bolt app and registers listeners +2. **Listener Registration** (`listeners/__init__.py`): Routes incoming Slack events to appropriate handlers +3. **Event Handlers**: + - `listeners/assistant/message.py`: Handles messages in assistant threads + - `listeners/events/app_mentioned.py`: Handles @mentions of the bot + - `listeners/assistant/assistant_thread_started.py`: Sets up suggested prompts when threads start +4. **LLM Integration** (`ai/llm_caller.py`): Provides `call_llm()` async generator that streams responses from the ADK agent system +5. **Response Streaming**: Uses Slack's `chat_stream` API to stream chunks back to the user in real-time + +### Key Design Patterns + +- **Async Streaming**: All LLM responses are streamed using async generators to provide real-time feedback +- **Event Loop Management**: Creates new event loops in synchronous Bolt handlers to call async ADK functions +- **Session Management**: Uses thread timestamps as session IDs to maintain conversation context +- **Tool-Based Architecture**: Agents use Python functions as tools, automatically exposed to the LLM by ADK + +## Configuration + +### Required Environment Variables + +```sh +SLACK_BOT_TOKEN=xoxb-... # From OAuth & Permissions +SLACK_APP_TOKEN=xapp-... # App-level token with connections:write +``` + +### Google ADK Authentication (one of): + +```sh +# Option 1: Service account (production) +GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json + +# Option 2: API key (development) +GOOGLE_API_KEY=your-api-key + +# Option 3: Use `gcloud auth application-default login` +``` + +## Adding New Agents + +To add a new specialized agent: + +1. Create tools in `ai/tools.py` following the existing pattern (functions returning `Dict[str, Any]`) +2. Define the agent in `ai/agents.py`: + ```python + new_agent = Agent( + name="AgentName", + model="gemini-2.0-flash", + description="When to use this agent", + instruction="Detailed instructions for the agent", + tools=[tool1, tool2], + ) + ``` +3. Add the agent to `coordinator_agent.sub_agents` list +4. Update the coordinator's instruction to explain when to delegate to the new agent + +## Slack-Specific Conventions + +- **Markdown Compatibility**: Agents are instructed to convert markdown to Slack-compatible format +- **Slack Syntax Preservation**: User mentions (`<@USER_ID>`) and channel mentions (`<#CHANNEL_ID>`) must be preserved as-is in responses +- **Status Messages**: Use playful loading messages during "thinking" state (see `listeners/assistant/message.py:40-46`) +- **Feedback Blocks**: Responses include feedback blocks for user interaction (`listeners/views/feedback_block.py`) + +## Project Structure Notes + +- `/listeners`: Organized by Slack Platform feature (events, assistant, actions, views) +- `/ai`: Contains all LLM and agent-related code, separated from Slack-specific logic +- Socket Mode is used for local development (no public URL required) +- For OAuth/distribution, use `app_oauth.py` instead of `app.py` diff --git a/ai/agents.py b/ai/agents.py new file mode 100644 index 0000000..bcc9e01 --- /dev/null +++ b/ai/agents.py @@ -0,0 +1,93 @@ +""" +Multi-agent system using Google ADK. + +This module defines a multi-agent system with specialized agents for different tasks. +""" + +from google.adk.agents import Agent +from google.adk.models.lite_llm import LiteLlm + +from .tools import ( + get_current_time, + calculate, + format_text, + count_words, + create_list, + get_help_info, +) + +# Configure OpenAI model via LiteLLM +# Requires OPENAI_API_KEY environment variable +openai_model = LiteLlm(model="openai/gpt-4o") + + +# Specialized agent for mathematical operations +math_agent = Agent( + name="MathAgent", + model=openai_model, + description="Specializes in mathematical calculations and numerical operations. Use this agent for any math-related queries.", + instruction="""You are a mathematical expert. You can perform calculations, solve equations, + and help with numerical problems. Use the calculate tool to evaluate mathematical expressions. + Always explain your calculations clearly.""", + tools=[calculate], +) + +# Specialized agent for text processing +text_agent = Agent( + name="TextAgent", + model=openai_model, + description="Specializes in text processing, formatting, and analysis. Use this agent for text manipulation tasks.", + instruction="""You are a text processing expert. You can format text, count words, + create lists, and analyze text content. Use the available tools to help users with text-related tasks. + When you include markdown text, convert them to Slack compatible ones. + When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response.""", + tools=[format_text, count_words, create_list], +) + +# Specialized agent for information and utilities +info_agent = Agent( + name="InfoAgent", + model=openai_model, + description="Provides general information, current time, and help about available capabilities.", + instruction="""You are an information assistant. You can provide the current time, + help users understand what tools are available, and answer general questions. + When you include markdown text, convert them to Slack compatible ones. + When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response.""", + tools=[get_current_time, get_help_info], +) + +# Coordinator agent that routes requests to specialized agents +coordinator_agent = Agent( + name="CoordinatorAgent", + model=openai_model, + description="Main coordinator that routes user requests to specialized agents.", + instruction="""You are a helpful assistant coordinator in a Slack workspace. + Users in the workspace will ask you to help them with various tasks. + + You have access to specialized agents: + - MathAgent: For mathematical calculations and numerical operations + - TextAgent: For text processing, formatting, and analysis + - InfoAgent: For general information, current time, and help + + Analyze the user's request and delegate to the appropriate specialized agent: + - For math problems, calculations, or numerical queries -> use MathAgent + - For text formatting, word counting, or list creation -> use TextAgent + - For time queries, help requests, or general information -> use InfoAgent + - For general conversation or questions that don't fit the above -> answer directly + + When you include markdown text, convert them to Slack compatible ones. + When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response. + + Always be professional, helpful, and friendly in your responses.""", + sub_agents=[math_agent, text_agent, info_agent], +) + + +def get_root_agent() -> Agent: + """ + Get the root coordinator agent for the multi-agent system. + + Returns: + Agent: The coordinator agent that routes requests to specialized agents. + """ + return coordinator_agent diff --git a/ai/llm_caller.py b/ai/llm_caller.py index d0a0591..43f5b86 100644 --- a/ai/llm_caller.py +++ b/ai/llm_caller.py @@ -1,27 +1,143 @@ -import os -from typing import Dict, List - -import openai -from openai import Stream -from openai.types.responses import ResponseStreamEvent - -DEFAULT_SYSTEM_CONTENT = """ -You're an assistant in a Slack workspace. -Users in the workspace will ask you to help them write something or to think better about a specific topic. -You'll respond to those questions in a professional way. -When you include markdown text, convert them to Slack compatible ones. -When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response. """ +LLM caller using Google ADK multi-agent system. + +This module provides the interface for calling the ADK multi-agent system +and streaming responses back to Slack. +""" + +import logging +from typing import Dict, List, AsyncGenerator, Literal + +from google.adk.runners import InMemoryRunner +from google.genai import types + +from .agents import get_root_agent + +logger = logging.getLogger(__name__) + + +# Type for the events yielded by call_llm +class LLMEvent(Dict): + """Event yielded during LLM processing.""" + + type: Literal["status", "content"] + text: str + + +# Initialize the ADK runner with the root agent +_runner = None +_app_name = "slack_ai_agent" + + +def get_adk_runner() -> InMemoryRunner: + """Get or create the ADK InMemoryRunner instance.""" + global _runner + if _runner is None: + root_agent = get_root_agent() + _runner = InMemoryRunner(agent=root_agent, app_name=_app_name) + return _runner -def call_llm( +async def call_llm( messages_in_thread: List[Dict[str, str]], - system_content: str = DEFAULT_SYSTEM_CONTENT, -) -> Stream[ResponseStreamEvent]: - openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - messages = [{"role": "system", "content": system_content}] - messages.extend(messages_in_thread) - response = openai_client.responses.create( - model="gpt-4o-mini", input=messages, stream=True + user_id: str = "default_user", + session_id: str = None, +) -> AsyncGenerator[Dict[str, str], None]: + """ + Call the ADK multi-agent system and stream responses with status updates. + + Args: + messages_in_thread: List of message dictionaries with 'role' and 'content' keys + user_id: The Slack user ID + session_id: Optional session ID for maintaining conversation context + + Yields: + Dict[str, str]: Events with 'type' (either 'status' or 'content') and 'text' keys + - status events: Updates about what the agent is doing (tool calls, agent transfers) + - content events: Actual response text to display to the user + """ + runner = get_adk_runner() + + # Create or get session + if session_id is None: + session_id = f"session_{user_id}" + + # Ensure session exists + try: + await runner.session_service.create_session( + app_name=_app_name, user_id=user_id, session_id=session_id + ) + except Exception: + # Session might already exist, which is fine + pass + + # Convert messages to ADK format - only send the last user message + last_message = ( + messages_in_thread[-1] + if messages_in_thread + else {"role": "user", "content": ""} + ) + + # Create the message content + new_message = types.Content( + role="user", parts=[types.Part(text=last_message["content"])] ) - return response + + # Track the current agent for status updates + current_agent = None + has_started_streaming_content = False + + # Use the runner's run_async method for streaming + async for event in runner.run_async( + user_id=user_id, session_id=session_id, new_message=new_message + ): + # Log the full event for debugging + logger.info( + f"ADK Event - Author: {event.author}, Has content: {bool(event.content)}" + ) + logger.info(f"Event attributes: {dir(event)}") + + # Detect agent changes/transfers + if event.author and event.author != "user" and event.author != current_agent: + current_agent = event.author + # Only show agent status before we start streaming actual content + if not has_started_streaming_content: + status_text = f"{current_agent} is working..." + logger.info(f"Yielding status: {status_text}") + yield {"type": "status", "text": status_text} + + # Detect tool calls (function calls) + function_calls = ( + event.get_function_calls() if hasattr(event, "get_function_calls") else [] + ) + logger.info(f"Function calls detected: {len(function_calls)}") + if function_calls and not has_started_streaming_content: + for func_call in function_calls: + tool_name = ( + func_call.name if hasattr(func_call, "name") else str(func_call) + ) + # Make tool names more readable + readable_name = tool_name.replace("_", " ").title() + status_text = f"Using {readable_name}..." + logger.info(f"Yielding tool status: {status_text}") + yield {"type": "status", "text": status_text} + + # Detect agent transfers + if hasattr(event, "actions") and event.actions: + logger.info(f"Event has actions: {event.actions}") + if ( + hasattr(event.actions, "transfer_to_agent") + and event.actions.transfer_to_agent + ): + target_agent = event.actions.transfer_to_agent + status_text = f"Consulting {target_agent}..." + logger.info(f"Yielding transfer status: {status_text}") + yield {"type": "status", "text": status_text} + + # Stream all non-user content events + if event.content and event.content.parts and event.author != "user": + # Extract text from all parts and yield as content + for part in event.content.parts: + if hasattr(part, "text") and part.text: + has_started_streaming_content = True + yield {"type": "content", "text": part.text} diff --git a/ai/tools.py b/ai/tools.py new file mode 100644 index 0000000..1f6e844 --- /dev/null +++ b/ai/tools.py @@ -0,0 +1,153 @@ +""" +Custom tools for ADK agents. + +This module contains various tools that agents can use to perform specific tasks. +""" + +import datetime +from typing import Dict, Any + + +def get_current_time() -> Dict[str, str]: + """ + Get the current date and time. + + Returns: + dict: A dictionary containing the current date and time information. + """ + now = datetime.datetime.now() + return { + "status": "success", + "current_time": now.strftime("%Y-%m-%d %H:%M:%S"), + "day_of_week": now.strftime("%A"), + "timezone": "UTC" + if datetime.datetime.now().tzinfo is None + else str(datetime.datetime.now().tzinfo), + } + + +def calculate(expression: str) -> Dict[str, Any]: + """ + Safely evaluate a mathematical expression. + + Args: + expression (str): A mathematical expression to evaluate (e.g., "2 + 2", "10 * 5"). + + Returns: + dict: A dictionary containing the result or error message. + """ + try: + # Only allow safe mathematical operations + allowed_chars = set("0123456789+-*/(). ") + if not all(c in allowed_chars for c in expression): + return { + "status": "error", + "message": "Invalid characters in expression. Only numbers and basic operators (+, -, *, /, parentheses) are allowed.", + } + + result = eval(expression, {"__builtins__": {}}, {}) + return {"status": "success", "expression": expression, "result": result} + except Exception as e: + return {"status": "error", "message": f"Error evaluating expression: {str(e)}"} + + +def format_text(text: str, format_type: str = "uppercase") -> Dict[str, str]: + """ + Format text in various ways. + + Args: + text (str): The text to format. + format_type (str): The type of formatting to apply. + Options: 'uppercase', 'lowercase', 'title', 'reverse'. + Defaults to 'uppercase'. + + Returns: + dict: A dictionary containing the formatted text. + """ + format_type = format_type.lower() + + if format_type == "uppercase": + formatted = text.upper() + elif format_type == "lowercase": + formatted = text.lower() + elif format_type == "title": + formatted = text.title() + elif format_type == "reverse": + formatted = text[::-1] + else: + return { + "status": "error", + "message": f"Unknown format type: {format_type}. Use 'uppercase', 'lowercase', 'title', or 'reverse'.", + } + + return { + "status": "success", + "original": text, + "formatted": formatted, + "format_type": format_type, + } + + +def count_words(text: str) -> Dict[str, Any]: + """ + Count words, characters, and sentences in a text. + + Args: + text (str): The text to analyze. + + Returns: + dict: A dictionary containing word count, character count, and sentence count. + """ + words = text.split() + sentences = text.count(".") + text.count("!") + text.count("?") + + return { + "status": "success", + "word_count": len(words), + "character_count": len(text), + "character_count_no_spaces": len(text.replace(" ", "")), + "sentence_count": sentences if sentences > 0 else 1, + } + + +def create_list(items: str, separator: str = ",") -> Dict[str, Any]: + """ + Create a formatted list from comma-separated or custom-separated items. + + Args: + items (str): Items separated by a delimiter. + separator (str): The separator used between items. Defaults to comma. + + Returns: + dict: A dictionary containing the formatted list. + """ + item_list = [item.strip() for item in items.split(separator) if item.strip()] + + formatted_list = "\n".join([f"{i + 1}. {item}" for i, item in enumerate(item_list)]) + + return { + "status": "success", + "item_count": len(item_list), + "items": item_list, + "formatted_list": formatted_list, + } + + +def get_help_info() -> Dict[str, str]: + """ + Get information about available tools and capabilities. + + Returns: + dict: A dictionary containing help information. + """ + return { + "status": "success", + "message": "I have access to several tools", + "available_tools": [ + "get_current_time - Get current date and time", + "calculate - Perform mathematical calculations", + "format_text - Format text (uppercase, lowercase, title, reverse)", + "count_words - Count words, characters, and sentences", + "create_list - Create formatted lists from text", + ], + } diff --git a/docs/MIGRATION_GUIDE.md b/docs/MIGRATION_GUIDE.md new file mode 100644 index 0000000..f1bca73 --- /dev/null +++ b/docs/MIGRATION_GUIDE.md @@ -0,0 +1,217 @@ +# Migration from OpenAI to Google ADK + +This document explains the changes made to migrate from OpenAI to Google's Agent Development Kit (ADK) with a multi-agent system. + +## Overview of Changes + +### 1. Dependencies Updated + +**Before:** +``` +openai +``` + +**After:** +``` +google-adk +google-genai +``` + +### 2. Multi-Agent Architecture + +The application now uses a multi-agent system with specialized agents: + +#### **CoordinatorAgent** (Root Agent) +- Routes user requests to appropriate specialized agents +- Handles general conversation +- Manages the overall interaction flow + +#### **MathAgent** +- Specializes in mathematical calculations +- Tools: `calculate()` +- Handles: Math problems, numerical operations, equations + +#### **TextAgent** +- Specializes in text processing and analysis +- Tools: `format_text()`, `count_words()`, `create_list()` +- Handles: Text formatting, word counting, list creation + +#### **InfoAgent** +- Provides general information and utilities +- Tools: `get_current_time()`, `get_help_info()` +- Handles: Time queries, help requests, general information + +### 3. Custom Tools + +New tools have been created in `ai/tools.py`: + +- **get_current_time()**: Returns current date and time +- **calculate()**: Safely evaluates mathematical expressions +- **format_text()**: Formats text (uppercase, lowercase, title, reverse) +- **count_words()**: Counts words, characters, and sentences +- **create_list()**: Creates formatted lists from text +- **get_help_info()**: Provides information about available tools + +### 4. Streaming Implementation + +The streaming mechanism has been updated to work with ADK events: + +**Before (OpenAI):** +```python +for event in returned_message: + if event.type == "response.output_text.delta": + streamer.append(markdown_text=f"{event.delta}") +``` + +**After (ADK):** +```python +async for event in call_llm(messages_in_thread, user_id=user_id, session_id=thread_ts): + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + streamer.append(markdown_text=part.text) +``` + +### 5. Session Management + +ADK uses session-based conversation management: +- Each user/thread combination gets a unique session +- Sessions maintain conversation history automatically +- No need to manually pass full conversation history + +## File Changes + +### New Files +- `ai/agents.py` - Multi-agent system definition +- `ai/tools.py` - Custom tools for agents +- `MIGRATION_GUIDE.md` - This file + +### Modified Files +- `requirements.txt` - Updated dependencies +- `ai/llm_caller.py` - Rewritten to use ADK +- `listeners/assistant/message.py` - Updated for ADK streaming +- `listeners/events/app_mentioned.py` - Updated for ADK streaming +- `.env.sample` - Updated environment variables +- `README.md` - Updated documentation + +## Setup Instructions + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 2. Set Up Google Cloud Authentication + +Choose one of the following methods: + +#### Option A: Service Account (Recommended for Production) +1. Create a Google Cloud project +2. Enable Vertex AI API +3. Create a service account and download JSON key +4. Set in `.env`: +``` +GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json +``` + +#### Option B: API Key (Simpler for Development) +1. Get API key from [Google AI Studio](https://aistudio.google.com/app/apikey) +2. Set in `.env`: +``` +GOOGLE_API_KEY=YOUR_GOOGLE_API_KEY +``` + +#### Option C: Application Default Credentials +```bash +gcloud auth application-default login +``` + +### 3. Run the Application + +```bash +python3 app.py +``` + +## Testing the Multi-Agent System + +### Test MathAgent +Send messages like: +- "Calculate 25 * 4 + 10" +- "What is 100 divided by 5?" +- "Solve 2 + 2" + +### Test TextAgent +Send messages like: +- "Format 'hello world' in uppercase" +- "Count words in: The quick brown fox jumps over the lazy dog" +- "Create a list from: apples, oranges, bananas" + +### Test InfoAgent +Send messages like: +- "What time is it?" +- "What can you do?" +- "Help me understand your capabilities" + +### Test Coordinator +Send general messages: +- "Hello, how are you?" +- "Tell me about yourself" +- The coordinator will handle these directly or route to appropriate agents + +## Key Benefits + +1. **Modularity**: Each agent has a specific responsibility +2. **Extensibility**: Easy to add new agents and tools +3. **Scalability**: Agents can be deployed independently +4. **Maintainability**: Clear separation of concerns +5. **Tool Integration**: Rich ecosystem of tools available +6. **Session Management**: Built-in conversation history + +## Troubleshooting + +### Authentication Issues +- Ensure Google Cloud credentials are properly set +- Check that Vertex AI API is enabled in your project +- Verify API key is valid (if using API key method) + +### Streaming Issues +- ADK events are asynchronous - ensure async/await is properly used +- Check that event loop is created correctly in synchronous contexts + +### Agent Routing Issues +- Verify agent descriptions are clear and distinct +- Check that coordinator instructions properly describe when to use each agent +- Review agent logs to see routing decisions + +## Future Enhancements + +Potential improvements to consider: + +1. **Add More Specialized Agents**: + - SearchAgent for web searches + - DataAgent for database queries + - CodeAgent for code generation + +2. **Enhance Tools**: + - Add external API integrations + - Implement more complex calculations + - Add file processing capabilities + +3. **Improve Routing**: + - Implement more sophisticated routing logic + - Add agent selection confidence scores + - Enable multi-agent collaboration + +4. **Add Observability**: + - Integrate with Google Cloud Trace + - Add custom logging for agent decisions + - Implement performance monitoring + +## Resources + +- [Google ADK Documentation](https://google.github.io/adk-docs/) +- [Multi-Agent Systems Guide](https://google.github.io/adk-docs/agents/multi-agents/) +- [Function Tools Documentation](https://google.github.io/adk-docs/tools/function-tools/) +- [Slack AI Apps Documentation](https://docs.slack.dev/ai/) + diff --git a/docs/SLACK_AGENT_STATUS.md b/docs/SLACK_AGENT_STATUS.md new file mode 100644 index 0000000..546a271 --- /dev/null +++ b/docs/SLACK_AGENT_STATUS.md @@ -0,0 +1,961 @@ +# Implementing Real-Time Agent Status Updates in Slack AI Assistants + +This document provides a comprehensive technical guide for implementing real-time status updates in Slack AI assistants using Google ADK (Agent Development Kit) event streams. + +## Table of Contents + +1. [Overview](#overview) +2. [Architecture](#architecture) +3. [Dependencies](#dependencies) +4. [Understanding Slack's Status API](#understanding-slacks-status-api) +5. [Understanding ADK Events](#understanding-adk-events) +6. [Implementation](#implementation) +7. [Event Flow](#event-flow) +8. [Troubleshooting](#troubleshooting) +9. [Best Practices](#best-practices) + +## Overview + +By default, Slack AI assistants show generic loading states ("thinking...", "evaluating...", "analyzing..."). This implementation enhances the user experience by displaying real-time information about: + +- Which agent is currently processing the request +- What tools are being invoked +- When control transfers between agents + +### What This Achieves + +**Before:** +``` +Status: thinking... +Loading: "Teaching the hamsters to type faster..." + "Untangling the internet cables..." +``` + +**After:** +``` +Status: is working... +Loading: "CoordinatorAgent is working..." + "Consulting MathAgent..." + "Using Calculate..." +``` + +## Architecture + +### High-Level Flow + +``` +User Message + ↓ +Slack Event Handler + ↓ +LLM Caller (ai/llm_caller.py) + ↓ +ADK Runner.run_async() + ↓ +Event Stream (yields events) + ↓ +Event Processor (detects agents/tools) + ↓ +Status/Content Events + ↓ +Slack Status Update + Response Streaming +``` + +### Component Responsibilities + +| Component | File | Responsibility | +|-----------|------|----------------| +| Event Handler | `listeners/assistant/message.py` | Receives Slack events, manages event loop | +| Event Handler | `listeners/events/app_mentioned.py` | Handles @mentions | +| LLM Caller | `ai/llm_caller.py` | Processes ADK events, yields status/content | +| ADK Runner | Google ADK SDK | Executes agent system, emits events | +| Agents | `ai/agents.py` | Define agent hierarchy and tools | + +## Dependencies + +### Required Python Packages + +```python +# Slack SDK +slack-bolt>=1.18.0 +slack-sdk>=3.23.0 + +# Google ADK +google-genai>=1.0.0 # Includes ADK + +# Standard library (no installation needed) +asyncio +logging +typing +``` + +### Environment Variables + +```bash +# Slack Configuration +SLACK_BOT_TOKEN=xoxb-... +SLACK_APP_TOKEN=xapp-... + +# Google ADK Authentication (choose one) +GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json # Option 1 +GOOGLE_API_KEY=your-api-key # Option 2 +# Or use: gcloud auth application-default login # Option 3 +``` + +### Slack App Scopes Required + +``` +app_mentions:read +assistant:write +channels:history +channels:read +chat:write +groups:history +groups:read +im:history +im:read +mpim:history +mpim:read +``` + +## Understanding Slack's Status API + +### The `assistant.threads.setStatus` Method + +```python +client.assistant_threads_setStatus( + channel_id: str, # Required: Channel containing the thread + thread_ts: str, # Required: Thread timestamp + status: str, # Required: Status text (may be translated by Slack) + loading_messages: Optional[List[str]] = None # Optional: Custom rotating messages +) +``` + +### Key Behaviors + +1. **Status Translation**: Slack translates custom `status` values to predefined ones: + - Your input: `"Processing your request..."` + - Slack displays: `"thinking..."` or `"evaluating..."` + +2. **Loading Messages**: These are **not** translated and display exactly as provided: + - Your input: `loading_messages=["MathAgent is working..."]` + - Slack displays: `"MathAgent is working..."` + +3. **Display Format**: + ``` + + + ``` + Example: + ``` + MyBot is working... + Consulting MathAgent... + ``` + +4. **Auto-Clearing**: Status automatically clears when: + - The app sends a reply message + - You send an empty string: `status=""` + +5. **Update Frequency**: You can call `setStatus` multiple times during processing to update the loading message in real-time. + +## Understanding ADK Events + +### Event Structure + +ADK's `runner.run_async()` yields events during agent execution. Each event contains: + +```python +class Event: + author: str # Agent name or "user" + content: Content | None # Message content with parts + event_id: str # Unique event identifier + invocation_id: str # Correlates events in one interaction + partial: bool # True for streaming chunks + actions: Actions | None # State changes, transfers, etc. +``` + +### Key Event Properties + +#### 1. `event.author` +Identifies who created the event: +```python +"user" # User's input message +"CoordinatorAgent" # Response from coordinator +"MathAgent" # Response from specialized agent +``` + +#### 2. `event.content` +Contains the actual message content: +```python +event.content.parts # List of Part objects +part.text # Text content of the part +``` + +#### 3. `event.get_function_calls()` +Returns tool invocations: +```python +function_calls = event.get_function_calls() +for func_call in function_calls: + func_call.name # e.g., "calculate", "format_text" + func_call.args # Tool arguments +``` + +#### 4. `event.actions` +Contains control flow information: +```python +event.actions.transfer_to_agent # Target agent for handoff +event.actions.state_delta # State changes +event.actions.artifact_delta # Artifact updates +``` + +### Event Lifecycle Example + +For request: "What is 25 times 4?" + +```python +# Event 1: User input echo +Event(author="user", content="What is 25 times 4?") + +# Event 2: Coordinator receives request +Event(author="CoordinatorAgent", content=None) + +# Event 3: Coordinator decides to transfer +Event(author="CoordinatorAgent", + actions=Actions(transfer_to_agent="MathAgent")) + +# Event 4: MathAgent takes over +Event(author="MathAgent", content=None) + +# Event 5: MathAgent requests tool +Event(author="MathAgent", + function_calls=[FunctionCall(name="calculate", args={"expr": "25*4"})]) + +# Event 6: Tool result +Event(author="function", content="100") + +# Event 7: MathAgent starts response +Event(author="MathAgent", content="The result", partial=True) + +# Event 8: Continues streaming +Event(author="MathAgent", content=" is 100", partial=True) + +# Event 9: Final response +Event(author="MathAgent", content=".", partial=False) +``` + +## Implementation + +### Step 1: Modify `ai/llm_caller.py` + +#### Add Dependencies + +```python +import logging +from typing import Dict, List, AsyncGenerator, Literal + +from google.adk.runners import InMemoryRunner +from google.genai import types + +from .agents import get_root_agent + +logger = logging.getLogger(__name__) +``` + +#### Update Function Signature + +Change from yielding strings to yielding event dictionaries: + +```python +async def call_llm( + messages_in_thread: List[Dict[str, str]], + user_id: str = "default_user", + session_id: str = None, +) -> AsyncGenerator[Dict[str, str], None]: + """ + Call the ADK multi-agent system and stream responses with status updates. + + Args: + messages_in_thread: List of message dictionaries with 'role' and 'content' keys + user_id: The Slack user ID + session_id: Optional session ID for maintaining conversation context + + Yields: + Dict[str, str]: Events with 'type' (either 'status' or 'content') and 'text' keys + - status events: Updates about what the agent is doing (tool calls, agent transfers) + - content events: Actual response text to display to the user + """ +``` + +#### Implement Event Processing + +```python +async def call_llm(...) -> AsyncGenerator[Dict[str, str], None]: + runner = get_adk_runner() + + # Session management (existing code) + if session_id is None: + session_id = f"session_{user_id}" + + try: + await runner.session_service.create_session( + app_name=_app_name, user_id=user_id, session_id=session_id + ) + except Exception: + pass + + # Convert messages to ADK format (existing code) + last_message = ( + messages_in_thread[-1] + if messages_in_thread + else {"role": "user", "content": ""} + ) + + new_message = types.Content( + role="user", parts=[types.Part(text=last_message["content"])] + ) + + # Track state + current_agent = None + has_started_streaming_content = False + + # Process event stream + async for event in runner.run_async( + user_id=user_id, session_id=session_id, new_message=new_message + ): + # Optional: Log for debugging + logger.info(f"ADK Event - Author: {event.author}, Has content: {bool(event.content)}") + + # DETECTION 1: Agent changes/transfers + if event.author and event.author != "user" and event.author != current_agent: + current_agent = event.author + if not has_started_streaming_content: + status_text = f"{current_agent} is working..." + logger.info(f"Yielding status: {status_text}") + yield {"type": "status", "text": status_text} + + # DETECTION 2: Tool calls (function calls) + function_calls = ( + event.get_function_calls() if hasattr(event, "get_function_calls") else [] + ) + logger.info(f"Function calls detected: {len(function_calls)}") + if function_calls and not has_started_streaming_content: + for func_call in function_calls: + tool_name = ( + func_call.name if hasattr(func_call, "name") else str(func_call) + ) + # Make tool names more readable + readable_name = tool_name.replace("_", " ").title() + status_text = f"Using {readable_name}..." + logger.info(f"Yielding tool status: {status_text}") + yield {"type": "status", "text": status_text} + + # DETECTION 3: Explicit agent transfers + if hasattr(event, "actions") and event.actions: + logger.info(f"Event has actions: {event.actions}") + if ( + hasattr(event.actions, "transfer_to_agent") + and event.actions.transfer_to_agent + ): + target_agent = event.actions.transfer_to_agent + status_text = f"Consulting {target_agent}..." + logger.info(f"Yielding transfer status: {status_text}") + yield {"type": "status", "text": status_text} + + # DETECTION 4: Stream content + if event.content and event.content.parts and event.author != "user": + for part in event.content.parts: + if hasattr(part, "text") and part.text: + has_started_streaming_content = True + yield {"type": "content", "text": part.text} +``` + +#### Key Implementation Details + +1. **`has_started_streaming_content` flag**: Prevents status updates after response text starts streaming. This avoids interrupting the user's reading experience. + +2. **`current_agent` tracking**: Only yields status when agent changes to avoid redundant updates. + +3. **Tool name formatting**: Converts `calculate` → `Calculate`, `format_text` → `Format Text` for better readability. + +4. **Hasattr checks**: Safely checks for optional attributes that may not exist on all event types. + +### Step 2: Update Event Handlers + +#### For Assistant Thread Messages (`listeners/assistant/message.py`) + +```python +def message( + client: WebClient, + context: BoltContext, + logger: Logger, + payload: dict, + say: Say, + set_status: SetStatus, +): + try: + channel_id = payload["channel"] + team_id = context.team_id + thread_ts = payload["thread_ts"] + user_id = context.user_id + + # Set initial status + set_status(status="is thinking...") + + # Fetch conversation history (existing code) + replies = client.conversations_replies( + channel=context.channel_id, + ts=context.thread_ts, + oldest=context.thread_ts, + limit=10, + ) + messages_in_thread: List[Dict[str, str]] = [] + for message in replies["messages"]: + role = "user" if message.get("bot_id") is None else "assistant" + messages_in_thread.append({"role": role, "content": message["text"]}) + + # Create event loop for async operations + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Initialize Slack streaming + streamer = client.chat_stream( + channel=channel_id, + recipient_team_id=team_id, + recipient_user_id=user_id, + thread_ts=thread_ts, + ) + + # Stream ADK response with dynamic status updates + async def stream_response(): + async for event in call_llm( + messages_in_thread, user_id=user_id, session_id=thread_ts + ): + if event["type"] == "status": + # Update loading message with real-time agent activity + logger.info(f"Setting Slack loading message to: {event['text']}") + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", # Generic status (may be translated) + loading_messages=[event["text"]], # Specific detail (shows as-is) + ) + elif event["type"] == "content": + # Stream actual response content + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() + + # Add feedback block and stop streaming + feedback_block = create_feedback_block() + streamer.stop(blocks=feedback_block) + + except Exception as e: + logger.exception(f"Failed to handle a user message event: {e}") + say(f":warning: Something went wrong! ({e})") +``` + +#### For App Mentions (`listeners/events/app_mentioned.py`) + +```python +def app_mentioned_callback(client: WebClient, event: dict, logger: Logger, say: Say): + try: + channel_id = event.get("channel") + team_id = event.get("team") + text = event.get("text") + thread_ts = event.get("thread_ts") or event.get("ts") + user_id = event.get("user") + + # Set initial status + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is thinking...", + loading_messages=["Starting to process your request..."], + ) + + # Create event loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Initialize streaming + streamer = client.chat_stream( + channel=channel_id, + recipient_team_id=team_id, + recipient_user_id=user_id, + thread_ts=thread_ts, + ) + + # Stream response with dynamic status + async def stream_response(): + async for event in call_llm( + [{"role": "user", "content": text}], + user_id=user_id, + session_id=thread_ts, + ): + if event["type"] == "status": + logger.info(f"Setting Slack loading message to: {event['text']}") + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", + loading_messages=[event["text"]], + ) + elif event["type"] == "content": + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() + + feedback_block = create_feedback_block() + streamer.stop(blocks=feedback_block) + + except Exception as e: + logger.exception(f"Failed to handle a user message event: {e}") + say(f":warning: Something went wrong! ({e})") +``` + +## Event Flow + +### Complete Request-Response Cycle + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 1. User sends message in Slack │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 2. Slack Event Handler receives message │ +│ - Calls client.assistant_threads_setStatus() │ +│ - Status: "is thinking..." │ +│ - Loading: "Starting to process..." │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 3. call_llm() invoked with message history │ +│ - Creates ADK session │ +│ - Calls runner.run_async() │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 4. ADK Event Stream begins │ +│ │ +│ Event 1: author="CoordinatorAgent" │ +│ └─> Yield: {"type": "status", "text": "CoordinatorAgent..."}│ +│ └─> Update Slack: loading_messages=["CoordinatorAgent..."] │ +│ │ +│ Event 2: actions.transfer_to_agent="MathAgent" │ +│ └─> Yield: {"type": "status", "text": "Consulting Math..."}│ +│ └─> Update Slack: loading_messages=["Consulting Math..."] │ +│ │ +│ Event 3: author="MathAgent" │ +│ └─> Yield: {"type": "status", "text": "MathAgent..."} │ +│ └─> Update Slack: loading_messages=["MathAgent..."] │ +│ │ +│ Event 4: function_call="calculate" │ +│ └─> Yield: {"type": "status", "text": "Using Calculate..."} │ +│ └─> Update Slack: loading_messages=["Using Calculate..."] │ +│ │ +│ Event 5: content="The result is 100" │ +│ └─> Yield: {"type": "content", "text": "The result is 100"} │ +│ └─> Stream to Slack: streamer.append("The result is 100") │ +│ └─> Set flag: has_started_streaming_content = True │ +│ │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 5. Stream completes │ +│ - streamer.stop() called │ +│ - Status automatically cleared by Slack │ +│ - User sees complete response │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Troubleshooting + +### Issue: Custom status shows as "thinking", "evaluating", etc. + +**Cause**: Slack translates the `status` parameter to predefined values. + +**Solution**: Use `loading_messages` parameter for custom text: +```python +# ❌ This gets translated +status="MathAgent is working..." + +# ✅ This displays as-is +status="is working...", +loading_messages=["MathAgent is working..."] +``` + +### Issue: Status updates don't appear + +**Possible Causes:** + +1. **Missing scope**: Ensure `assistant:write` scope is added to your Slack app +2. **Wrong parameters**: Verify `channel_id` and `thread_ts` are correct +3. **Status cleared too quickly**: Check if response starts streaming immediately + +**Debug Steps:** +```python +# Add logging +logger.info(f"Setting status - channel: {channel_id}, thread: {thread_ts}") +logger.info(f"Status text: {status}, Loading: {loading_messages}") +``` + +### Issue: No agent or tool names detected + +**Possible Causes:** + +1. **Events not exposing expected attributes**: ADK version mismatch +2. **Agents responding without intermediate events**: Fast responses skip tool calls + +**Debug Steps:** +```python +# Log all event attributes +logger.info(f"Event attributes: {dir(event)}") +logger.info(f"Event author: {event.author}") +logger.info(f"Has function_calls: {hasattr(event, 'get_function_calls')}") +``` + +### Issue: Async loop errors + +**Error**: `RuntimeError: This event loop is already running` + +**Cause**: Trying to create nested event loops. + +**Solution**: Ensure you create a fresh loop: +```python +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) +try: + loop.run_until_complete(stream_response()) +finally: + loop.close() +``` + +## Best Practices + +### 1. Stop Status Updates Before Content Streaming + +Once you start streaming actual response content, stop sending status updates: + +```python +has_started_streaming_content = False + +if event["type"] == "content": + has_started_streaming_content = True + +if event["type"] == "status" and not has_started_streaming_content: + # Only update status before content starts + update_slack_status(event["text"]) +``` + +**Rationale**: Avoids disrupting the user's reading experience with status changes. + +### 2. Make Tool Names Human-Readable + +Transform technical function names: + +```python +tool_name = "calculate" +readable_name = tool_name.replace("_", " ").title() # "Calculate" + +tool_name = "format_text" +readable_name = tool_name.replace("_", " ").title() # "Format Text" +``` + +### 3. Use Descriptive Status Messages + +Be specific about what's happening: + +```python +# ❌ Generic +"Processing..." + +# ✅ Specific +"MathAgent is calculating..." +"Using Format Text tool..." +"Consulting InfoAgent for current time..." +``` + +### 4. Handle Errors Gracefully + +Always clear status on errors: + +```python +try: + # Process events + async for event in call_llm(...): + # Handle event +except Exception as e: + logger.exception(f"Error: {e}") + # Clear status + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="", # Empty string clears status + ) + # Notify user + say(f":warning: Something went wrong!") +``` + +### 5. Log Extensively During Development + +Add detailed logging to understand event flow: + +```python +logger.info(f"ADK Event - Author: {event.author}") +logger.info(f"Has content: {bool(event.content)}") +logger.info(f"Function calls: {len(function_calls)}") +logger.info(f"Setting loading message to: {status_text}") +``` + +Remove or reduce verbosity in production. + +### 6. Consider Rate Limiting + +Slack API has rate limits. Avoid updating status too frequently: + +```python +import time + +last_status_update = 0 +MIN_UPDATE_INTERVAL = 0.5 # seconds + +current_time = time.time() +if current_time - last_status_update >= MIN_UPDATE_INTERVAL: + client.assistant_threads_setStatus(...) + last_status_update = current_time +``` + +### 7. Test with Different Agent Scenarios + +Ensure your implementation works across different paths: + +```python +# Math request (should show MathAgent + calculate tool) +"What is 25 times 4?" + +# Text request (should show TextAgent + count_words tool) +"Count the words in this sentence" + +# Info request (should show InfoAgent + get_current_time tool) +"What time is it?" + +# Direct response (should show only CoordinatorAgent) +"Hello!" +``` + +## Example Output Scenarios + +### Scenario 1: Math Calculation + +**User**: "What is 15 * 8?" + +**Status Updates**: +``` +1. is thinking... + "Starting to process your request..." + +2. is working... + "CoordinatorAgent is working..." + +3. is working... + "Consulting MathAgent..." + +4. is working... + "MathAgent is working..." + +5. is working... + "Using Calculate..." + +6. [Status cleared, streaming response] + "The result of 15 * 8 is 120." +``` + +### Scenario 2: Text Processing + +**User**: "Count the words in this sentence please" + +**Status Updates**: +``` +1. is thinking... + "Starting to process your request..." + +2. is working... + "CoordinatorAgent is working..." + +3. is working... + "Consulting TextAgent..." + +4. is working... + "TextAgent is working..." + +5. is working... + "Using Count Words..." + +6. [Status cleared, streaming response] + "There are 6 words in your sentence." +``` + +### Scenario 3: General Query + +**User**: "Hello!" + +**Status Updates**: +``` +1. is thinking... + "Starting to process your request..." + +2. is working... + "CoordinatorAgent is working..." + +3. [Status cleared, streaming response] + "Hello! How can I help you today?" +``` + +## Performance Considerations + +### Event Processing Overhead + +- Each status update makes an API call to Slack +- Typical latency: 50-200ms per call +- Impact: Minimal for most use cases + +### Optimization Strategies + +1. **Batch rapid updates**: If multiple events occur within 500ms, only send the latest +2. **Skip redundant updates**: Track last status sent, only update if changed +3. **Prioritize content**: Once streaming starts, ignore status events + +### Memory Usage + +- Event objects are processed in streaming fashion +- No large accumulation of events in memory +- Session history stored in ADK's session service + +## Security Considerations + +### 1. Sanitize Status Messages + +Never include sensitive data in status updates: + +```python +# ❌ Dangerous +status_text = f"Querying database for user {user_email}..." + +# ✅ Safe +status_text = "Querying database..." +``` + +### 2. Validate Thread Context + +Ensure the bot has access to the channel/thread: + +```python +try: + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", + ) +except SlackApiError as e: + if e.response["error"] == "channel_not_found": + logger.error("Bot not in channel") + return +``` + +### 3. Rate Limit Per User + +Prevent abuse by rate limiting status updates per user: + +```python +from collections import defaultdict +import time + +user_status_times = defaultdict(list) + +def can_update_status(user_id: str, max_per_minute: int = 60) -> bool: + now = time.time() + user_times = user_status_times[user_id] + # Remove times older than 1 minute + user_times[:] = [t for t in user_times if now - t < 60] + + if len(user_times) >= max_per_minute: + return False + + user_times.append(now) + return True +``` + +## Extending the Implementation + +### Adding Custom Event Types + +Extend the event dictionary to include more information: + +```python +# In ai/llm_caller.py +yield { + "type": "status", + "text": status_text, + "agent": current_agent, # Additional metadata + "tool": tool_name, # Additional metadata +} + +# In event handler +if event["type"] == "status": + # Use additional metadata for custom logic + if event.get("tool") == "calculate": + # Special handling for calculations + pass +``` + +### Supporting Multiple Languages + +Translate status messages based on user locale: + +```python +TRANSLATIONS = { + "en": { + "working": "is working...", + "consulting": "Consulting {agent}...", + "using_tool": "Using {tool}...", + }, + "es": { + "working": "está trabajando...", + "consulting": "Consultando {agent}...", + "using_tool": "Usando {tool}...", + }, +} + +def get_status_text(key: str, locale: str, **kwargs) -> str: + template = TRANSLATIONS.get(locale, TRANSLATIONS["en"])[key] + return template.format(**kwargs) +``` + +### Adding Progress Indicators + +Show numerical progress for long operations: + +```python +total_steps = 5 +current_step = 0 + +async for event in runner.run_async(...): + if event["type"] == "status": + current_step += 1 + progress = f"[{current_step}/{total_steps}] {event['text']}" + yield {"type": "status", "text": progress} +``` + +## Conclusion + +This implementation provides users with transparency into the AI agent's decision-making process, enhancing trust and user experience. By leveraging ADK's event system and Slack's loading messages, you can create a sophisticated real-time status update system without significant performance overhead. + +For questions or issues, refer to: +- [Slack API Documentation](https://api.slack.com/methods/assistant.threads.setStatus) +- [Google ADK Documentation](https://google.github.io/adk-docs/) +- [Project CLAUDE.md](./CLAUDE.md) for codebase-specific guidance diff --git a/listeners/assistant/message.py b/listeners/assistant/message.py index b61de90..440112b 100644 --- a/listeners/assistant/message.py +++ b/listeners/assistant/message.py @@ -1,3 +1,4 @@ +import asyncio from logging import Logger from typing import Dict, List @@ -34,16 +35,8 @@ def message( thread_ts = payload["thread_ts"] user_id = context.user_id - set_status( - status="thinking...", - loading_messages=[ - "Teaching the hamsters to type faster…", - "Untangling the internet cables…", - "Consulting the office goldfish…", - "Polishing up the response just for you…", - "Convincing the AI to stop overthinking…", - ], - ) + # Set initial status - will be updated dynamically based on agent activity + set_status(status="is thinking...") replies = client.conversations_replies( channel=context.channel_id, @@ -56,7 +49,9 @@ def message( role = "user" if message.get("bot_id") is None else "assistant" messages_in_thread.append({"role": role, "content": message["text"]}) - returned_message = call_llm(messages_in_thread) + # Create event loop for async call + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) streamer = client.chat_stream( channel=channel_id, @@ -65,13 +60,30 @@ def message( thread_ts=thread_ts, ) - # Loop over OpenAI response stream - # https://platform.openai.com/docs/api-reference/responses/create - for event in returned_message: - if event.type == "response.output_text.delta": - streamer.append(markdown_text=f"{event.delta}") - else: - continue + # Stream ADK response with dynamic status updates + async def stream_response(): + async for event in call_llm( + messages_in_thread, user_id=user_id, session_id=thread_ts + ): + if event["type"] == "status": + # Update the loading message with real-time agent activity + logger.info(f"Setting Slack loading message to: {event['text']}") + # Use client API directly to access loading_messages parameter + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is thinking...", # The status below text box which typically shows "is typing..." for other users + loading_messages=[ + event["text"] + ], # The text that shows next to the app name shimmer. Here you can put custom text from you Agent events. + ) + elif event["type"] == "content": + # Stream the actual response content + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() feedback_block = create_feedback_block() streamer.stop(blocks=feedback_block) diff --git a/listeners/events/app_mentioned.py b/listeners/events/app_mentioned.py index 6ec8d25..cea92ae 100644 --- a/listeners/events/app_mentioned.py +++ b/listeners/events/app_mentioned.py @@ -1,3 +1,4 @@ +import asyncio from logging import Logger from slack_bolt import Say @@ -25,20 +26,18 @@ def app_mentioned_callback(client: WebClient, event: dict, logger: Logger, say: thread_ts = event.get("thread_ts") or event.get("ts") user_id = event.get("user") + # Set initial status with placeholder loading messages + # Will be updated dynamically based on agent activity client.assistant_threads_setStatus( channel_id=channel_id, thread_ts=thread_ts, - status="thinking...", - loading_messages=[ - "Teaching the hamsters to type faster…", - "Untangling the internet cables…", - "Consulting the office goldfish…", - "Polishing up the response just for you…", - "Convincing the AI to stop overthinking…", - ], + status="is thinking...", + loading_messages=["Starting to process your request..."], ) - returned_message = call_llm([{"role": "user", "content": text}]) + # Create event loop for async call + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) streamer = client.chat_stream( channel=channel_id, @@ -47,13 +46,30 @@ def app_mentioned_callback(client: WebClient, event: dict, logger: Logger, say: thread_ts=thread_ts, ) - # Loop over OpenAI response stream - # https://platform.openai.com/docs/api-reference/responses/create - for event in returned_message: - if event.type == "response.output_text.delta": - streamer.append(markdown_text=f"{event.delta}") - else: - continue + # Stream ADK response with dynamic status updates + async def stream_response(): + async for event in call_llm( + [{"role": "user", "content": text}], + user_id=user_id, + session_id=thread_ts, + ): + if event["type"] == "status": + # Update the loading message with real-time agent activity + # Keep generic status but show detailed info in loading_messages + logger.info(f"Setting Slack loading message to: {event['text']}") + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", # Generic status + loading_messages=[event["text"]], # Specific detail + ) + elif event["type"] == "content": + # Stream the actual response content + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() feedback_block = create_feedback_block() streamer.stop(blocks=feedback_block) diff --git a/requirements.txt b/requirements.txt index bd5e197..7438e89 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,12 @@ slack-sdk==3.37.0 slack-bolt==1.26.0 -# If you use a different LLM vendor, replace this dependency -openai +# Google ADK for multi-agent system +google-adk +google-genai + +# LiteLLM for OpenAI model integration with ADK +litellm pytest ruff==0.14.3