diff --git a/.gitignore b/.gitignore index a99f209..b500659 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ logs/ *.db .pytype/ .idea/ +.vscode/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..3861873 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,137 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Overview + +This is a Slack AI Agent App built with Bolt for Python, implementing a multi-agent system using Google ADK (Agent Development Kit) with Gemini models. The app provides AI-powered assistance through Slack's Assistant interface with streaming responses. + +## Development Commands + +### Environment Setup + +```sh +# Create and activate virtual environment +python3 -m venv .venv +source .venv/bin/activate # Windows: .\.venv\Scripts\Activate + +# Install dependencies +pip install -r requirements.txt +``` + +### Running the App + +```sh +# Using Slack CLI (recommended) +slack run + +# Or directly with Python +python3 app.py +``` + +### Code Quality + +```sh +# Lint code +ruff check + +# Format code +ruff format +``` + +### Testing + +```sh +# Run all tests +pytest + +# Run specific test file +pytest tests/test_file.py + +# Run with verbose output +pytest -v +``` + +## Architecture + +### Multi-Agent System + +The application uses Google ADK to implement a hierarchical multi-agent architecture: + +- **CoordinatorAgent** (`ai/agents.py:55-78`): Root agent that analyzes user requests and delegates to specialized agents +- **MathAgent** (`ai/agents.py:20-28`): Handles mathematical calculations and numerical operations +- **TextAgent** (`ai/agents.py:31-40`): Processes text formatting, word counting, and list creation +- **InfoAgent** (`ai/agents.py:43-52`): Provides current time, help information, and general queries + +Each agent uses the `gemini-2.0-flash` model and has access to specific tools defined in `ai/tools.py`. + +### Request Flow + +1. **Entry Point** (`app.py`): Thin entry point that initializes the Bolt app and registers listeners +2. **Listener Registration** (`listeners/__init__.py`): Routes incoming Slack events to appropriate handlers +3. **Event Handlers**: + - `listeners/assistant/message.py`: Handles messages in assistant threads + - `listeners/events/app_mentioned.py`: Handles @mentions of the bot + - `listeners/assistant/assistant_thread_started.py`: Sets up suggested prompts when threads start +4. **LLM Integration** (`ai/llm_caller.py`): Provides `call_llm()` async generator that streams responses from the ADK agent system +5. **Response Streaming**: Uses Slack's `chat_stream` API to stream chunks back to the user in real-time + +### Key Design Patterns + +- **Async Streaming**: All LLM responses are streamed using async generators to provide real-time feedback +- **Event Loop Management**: Creates new event loops in synchronous Bolt handlers to call async ADK functions +- **Session Management**: Uses thread timestamps as session IDs to maintain conversation context +- **Tool-Based Architecture**: Agents use Python functions as tools, automatically exposed to the LLM by ADK + +## Configuration + +### Required Environment Variables + +```sh +SLACK_BOT_TOKEN=xoxb-... # From OAuth & Permissions +SLACK_APP_TOKEN=xapp-... # App-level token with connections:write +``` + +### Google ADK Authentication (one of): + +```sh +# Option 1: Service account (production) +GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json + +# Option 2: API key (development) +GOOGLE_API_KEY=your-api-key + +# Option 3: Use `gcloud auth application-default login` +``` + +## Adding New Agents + +To add a new specialized agent: + +1. Create tools in `ai/tools.py` following the existing pattern (functions returning `Dict[str, Any]`) +2. Define the agent in `ai/agents.py`: + ```python + new_agent = Agent( + name="AgentName", + model="gemini-2.0-flash", + description="When to use this agent", + instruction="Detailed instructions for the agent", + tools=[tool1, tool2], + ) + ``` +3. Add the agent to `coordinator_agent.sub_agents` list +4. Update the coordinator's instruction to explain when to delegate to the new agent + +## Slack-Specific Conventions + +- **Markdown Compatibility**: Agents are instructed to convert markdown to Slack-compatible format +- **Slack Syntax Preservation**: User mentions (`<@USER_ID>`) and channel mentions (`<#CHANNEL_ID>`) must be preserved as-is in responses +- **Status Messages**: Use playful loading messages during "thinking" state (see `listeners/assistant/message.py:40-46`) +- **Feedback Blocks**: Responses include feedback blocks for user interaction (`listeners/views/feedback_block.py`) + +## Project Structure Notes + +- `/listeners`: Organized by Slack Platform feature (events, assistant, actions, views) +- `/ai`: Contains all LLM and agent-related code, separated from Slack-specific logic +- Socket Mode is used for local development (no public URL required) +- For OAuth/distribution, use `app_oauth.py` instead of `app.py` diff --git a/ai/agents.py b/ai/agents.py new file mode 100644 index 0000000..bcc9e01 --- /dev/null +++ b/ai/agents.py @@ -0,0 +1,93 @@ +""" +Multi-agent system using Google ADK. + +This module defines a multi-agent system with specialized agents for different tasks. +""" + +from google.adk.agents import Agent +from google.adk.models.lite_llm import LiteLlm + +from .tools import ( + get_current_time, + calculate, + format_text, + count_words, + create_list, + get_help_info, +) + +# Configure OpenAI model via LiteLLM +# Requires OPENAI_API_KEY environment variable +openai_model = LiteLlm(model="openai/gpt-4o") + + +# Specialized agent for mathematical operations +math_agent = Agent( + name="MathAgent", + model=openai_model, + description="Specializes in mathematical calculations and numerical operations. Use this agent for any math-related queries.", + instruction="""You are a mathematical expert. You can perform calculations, solve equations, + and help with numerical problems. Use the calculate tool to evaluate mathematical expressions. + Always explain your calculations clearly.""", + tools=[calculate], +) + +# Specialized agent for text processing +text_agent = Agent( + name="TextAgent", + model=openai_model, + description="Specializes in text processing, formatting, and analysis. Use this agent for text manipulation tasks.", + instruction="""You are a text processing expert. You can format text, count words, + create lists, and analyze text content. Use the available tools to help users with text-related tasks. + When you include markdown text, convert them to Slack compatible ones. + When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response.""", + tools=[format_text, count_words, create_list], +) + +# Specialized agent for information and utilities +info_agent = Agent( + name="InfoAgent", + model=openai_model, + description="Provides general information, current time, and help about available capabilities.", + instruction="""You are an information assistant. You can provide the current time, + help users understand what tools are available, and answer general questions. + When you include markdown text, convert them to Slack compatible ones. + When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response.""", + tools=[get_current_time, get_help_info], +) + +# Coordinator agent that routes requests to specialized agents +coordinator_agent = Agent( + name="CoordinatorAgent", + model=openai_model, + description="Main coordinator that routes user requests to specialized agents.", + instruction="""You are a helpful assistant coordinator in a Slack workspace. + Users in the workspace will ask you to help them with various tasks. + + You have access to specialized agents: + - MathAgent: For mathematical calculations and numerical operations + - TextAgent: For text processing, formatting, and analysis + - InfoAgent: For general information, current time, and help + + Analyze the user's request and delegate to the appropriate specialized agent: + - For math problems, calculations, or numerical queries -> use MathAgent + - For text formatting, word counting, or list creation -> use TextAgent + - For time queries, help requests, or general information -> use InfoAgent + - For general conversation or questions that don't fit the above -> answer directly + + When you include markdown text, convert them to Slack compatible ones. + When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response. + + Always be professional, helpful, and friendly in your responses.""", + sub_agents=[math_agent, text_agent, info_agent], +) + + +def get_root_agent() -> Agent: + """ + Get the root coordinator agent for the multi-agent system. + + Returns: + Agent: The coordinator agent that routes requests to specialized agents. + """ + return coordinator_agent diff --git a/ai/llm_caller.py b/ai/llm_caller.py index d0a0591..43f5b86 100644 --- a/ai/llm_caller.py +++ b/ai/llm_caller.py @@ -1,27 +1,143 @@ -import os -from typing import Dict, List - -import openai -from openai import Stream -from openai.types.responses import ResponseStreamEvent - -DEFAULT_SYSTEM_CONTENT = """ -You're an assistant in a Slack workspace. -Users in the workspace will ask you to help them write something or to think better about a specific topic. -You'll respond to those questions in a professional way. -When you include markdown text, convert them to Slack compatible ones. -When a prompt has Slack's special syntax like <@USER_ID> or <#CHANNEL_ID>, you must keep them as-is in your response. """ +LLM caller using Google ADK multi-agent system. + +This module provides the interface for calling the ADK multi-agent system +and streaming responses back to Slack. +""" + +import logging +from typing import Dict, List, AsyncGenerator, Literal + +from google.adk.runners import InMemoryRunner +from google.genai import types + +from .agents import get_root_agent + +logger = logging.getLogger(__name__) + + +# Type for the events yielded by call_llm +class LLMEvent(Dict): + """Event yielded during LLM processing.""" + + type: Literal["status", "content"] + text: str + + +# Initialize the ADK runner with the root agent +_runner = None +_app_name = "slack_ai_agent" + + +def get_adk_runner() -> InMemoryRunner: + """Get or create the ADK InMemoryRunner instance.""" + global _runner + if _runner is None: + root_agent = get_root_agent() + _runner = InMemoryRunner(agent=root_agent, app_name=_app_name) + return _runner -def call_llm( +async def call_llm( messages_in_thread: List[Dict[str, str]], - system_content: str = DEFAULT_SYSTEM_CONTENT, -) -> Stream[ResponseStreamEvent]: - openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - messages = [{"role": "system", "content": system_content}] - messages.extend(messages_in_thread) - response = openai_client.responses.create( - model="gpt-4o-mini", input=messages, stream=True + user_id: str = "default_user", + session_id: str = None, +) -> AsyncGenerator[Dict[str, str], None]: + """ + Call the ADK multi-agent system and stream responses with status updates. + + Args: + messages_in_thread: List of message dictionaries with 'role' and 'content' keys + user_id: The Slack user ID + session_id: Optional session ID for maintaining conversation context + + Yields: + Dict[str, str]: Events with 'type' (either 'status' or 'content') and 'text' keys + - status events: Updates about what the agent is doing (tool calls, agent transfers) + - content events: Actual response text to display to the user + """ + runner = get_adk_runner() + + # Create or get session + if session_id is None: + session_id = f"session_{user_id}" + + # Ensure session exists + try: + await runner.session_service.create_session( + app_name=_app_name, user_id=user_id, session_id=session_id + ) + except Exception: + # Session might already exist, which is fine + pass + + # Convert messages to ADK format - only send the last user message + last_message = ( + messages_in_thread[-1] + if messages_in_thread + else {"role": "user", "content": ""} + ) + + # Create the message content + new_message = types.Content( + role="user", parts=[types.Part(text=last_message["content"])] ) - return response + + # Track the current agent for status updates + current_agent = None + has_started_streaming_content = False + + # Use the runner's run_async method for streaming + async for event in runner.run_async( + user_id=user_id, session_id=session_id, new_message=new_message + ): + # Log the full event for debugging + logger.info( + f"ADK Event - Author: {event.author}, Has content: {bool(event.content)}" + ) + logger.info(f"Event attributes: {dir(event)}") + + # Detect agent changes/transfers + if event.author and event.author != "user" and event.author != current_agent: + current_agent = event.author + # Only show agent status before we start streaming actual content + if not has_started_streaming_content: + status_text = f"{current_agent} is working..." + logger.info(f"Yielding status: {status_text}") + yield {"type": "status", "text": status_text} + + # Detect tool calls (function calls) + function_calls = ( + event.get_function_calls() if hasattr(event, "get_function_calls") else [] + ) + logger.info(f"Function calls detected: {len(function_calls)}") + if function_calls and not has_started_streaming_content: + for func_call in function_calls: + tool_name = ( + func_call.name if hasattr(func_call, "name") else str(func_call) + ) + # Make tool names more readable + readable_name = tool_name.replace("_", " ").title() + status_text = f"Using {readable_name}..." + logger.info(f"Yielding tool status: {status_text}") + yield {"type": "status", "text": status_text} + + # Detect agent transfers + if hasattr(event, "actions") and event.actions: + logger.info(f"Event has actions: {event.actions}") + if ( + hasattr(event.actions, "transfer_to_agent") + and event.actions.transfer_to_agent + ): + target_agent = event.actions.transfer_to_agent + status_text = f"Consulting {target_agent}..." + logger.info(f"Yielding transfer status: {status_text}") + yield {"type": "status", "text": status_text} + + # Stream all non-user content events + if event.content and event.content.parts and event.author != "user": + # Extract text from all parts and yield as content + for part in event.content.parts: + if hasattr(part, "text") and part.text: + has_started_streaming_content = True + yield {"type": "content", "text": part.text} diff --git a/ai/tools.py b/ai/tools.py new file mode 100644 index 0000000..1f6e844 --- /dev/null +++ b/ai/tools.py @@ -0,0 +1,153 @@ +""" +Custom tools for ADK agents. + +This module contains various tools that agents can use to perform specific tasks. +""" + +import datetime +from typing import Dict, Any + + +def get_current_time() -> Dict[str, str]: + """ + Get the current date and time. + + Returns: + dict: A dictionary containing the current date and time information. + """ + now = datetime.datetime.now() + return { + "status": "success", + "current_time": now.strftime("%Y-%m-%d %H:%M:%S"), + "day_of_week": now.strftime("%A"), + "timezone": "UTC" + if datetime.datetime.now().tzinfo is None + else str(datetime.datetime.now().tzinfo), + } + + +def calculate(expression: str) -> Dict[str, Any]: + """ + Safely evaluate a mathematical expression. + + Args: + expression (str): A mathematical expression to evaluate (e.g., "2 + 2", "10 * 5"). + + Returns: + dict: A dictionary containing the result or error message. + """ + try: + # Only allow safe mathematical operations + allowed_chars = set("0123456789+-*/(). ") + if not all(c in allowed_chars for c in expression): + return { + "status": "error", + "message": "Invalid characters in expression. Only numbers and basic operators (+, -, *, /, parentheses) are allowed.", + } + + result = eval(expression, {"__builtins__": {}}, {}) + return {"status": "success", "expression": expression, "result": result} + except Exception as e: + return {"status": "error", "message": f"Error evaluating expression: {str(e)}"} + + +def format_text(text: str, format_type: str = "uppercase") -> Dict[str, str]: + """ + Format text in various ways. + + Args: + text (str): The text to format. + format_type (str): The type of formatting to apply. + Options: 'uppercase', 'lowercase', 'title', 'reverse'. + Defaults to 'uppercase'. + + Returns: + dict: A dictionary containing the formatted text. + """ + format_type = format_type.lower() + + if format_type == "uppercase": + formatted = text.upper() + elif format_type == "lowercase": + formatted = text.lower() + elif format_type == "title": + formatted = text.title() + elif format_type == "reverse": + formatted = text[::-1] + else: + return { + "status": "error", + "message": f"Unknown format type: {format_type}. Use 'uppercase', 'lowercase', 'title', or 'reverse'.", + } + + return { + "status": "success", + "original": text, + "formatted": formatted, + "format_type": format_type, + } + + +def count_words(text: str) -> Dict[str, Any]: + """ + Count words, characters, and sentences in a text. + + Args: + text (str): The text to analyze. + + Returns: + dict: A dictionary containing word count, character count, and sentence count. + """ + words = text.split() + sentences = text.count(".") + text.count("!") + text.count("?") + + return { + "status": "success", + "word_count": len(words), + "character_count": len(text), + "character_count_no_spaces": len(text.replace(" ", "")), + "sentence_count": sentences if sentences > 0 else 1, + } + + +def create_list(items: str, separator: str = ",") -> Dict[str, Any]: + """ + Create a formatted list from comma-separated or custom-separated items. + + Args: + items (str): Items separated by a delimiter. + separator (str): The separator used between items. Defaults to comma. + + Returns: + dict: A dictionary containing the formatted list. + """ + item_list = [item.strip() for item in items.split(separator) if item.strip()] + + formatted_list = "\n".join([f"{i + 1}. {item}" for i, item in enumerate(item_list)]) + + return { + "status": "success", + "item_count": len(item_list), + "items": item_list, + "formatted_list": formatted_list, + } + + +def get_help_info() -> Dict[str, str]: + """ + Get information about available tools and capabilities. + + Returns: + dict: A dictionary containing help information. + """ + return { + "status": "success", + "message": "I have access to several tools", + "available_tools": [ + "get_current_time - Get current date and time", + "calculate - Perform mathematical calculations", + "format_text - Format text (uppercase, lowercase, title, reverse)", + "count_words - Count words, characters, and sentences", + "create_list - Create formatted lists from text", + ], + } diff --git a/docs/MIGRATION_GUIDE.md b/docs/MIGRATION_GUIDE.md new file mode 100644 index 0000000..f1bca73 --- /dev/null +++ b/docs/MIGRATION_GUIDE.md @@ -0,0 +1,217 @@ +# Migration from OpenAI to Google ADK + +This document explains the changes made to migrate from OpenAI to Google's Agent Development Kit (ADK) with a multi-agent system. + +## Overview of Changes + +### 1. Dependencies Updated + +**Before:** +``` +openai +``` + +**After:** +``` +google-adk +google-genai +``` + +### 2. Multi-Agent Architecture + +The application now uses a multi-agent system with specialized agents: + +#### **CoordinatorAgent** (Root Agent) +- Routes user requests to appropriate specialized agents +- Handles general conversation +- Manages the overall interaction flow + +#### **MathAgent** +- Specializes in mathematical calculations +- Tools: `calculate()` +- Handles: Math problems, numerical operations, equations + +#### **TextAgent** +- Specializes in text processing and analysis +- Tools: `format_text()`, `count_words()`, `create_list()` +- Handles: Text formatting, word counting, list creation + +#### **InfoAgent** +- Provides general information and utilities +- Tools: `get_current_time()`, `get_help_info()` +- Handles: Time queries, help requests, general information + +### 3. Custom Tools + +New tools have been created in `ai/tools.py`: + +- **get_current_time()**: Returns current date and time +- **calculate()**: Safely evaluates mathematical expressions +- **format_text()**: Formats text (uppercase, lowercase, title, reverse) +- **count_words()**: Counts words, characters, and sentences +- **create_list()**: Creates formatted lists from text +- **get_help_info()**: Provides information about available tools + +### 4. Streaming Implementation + +The streaming mechanism has been updated to work with ADK events: + +**Before (OpenAI):** +```python +for event in returned_message: + if event.type == "response.output_text.delta": + streamer.append(markdown_text=f"{event.delta}") +``` + +**After (ADK):** +```python +async for event in call_llm(messages_in_thread, user_id=user_id, session_id=thread_ts): + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + streamer.append(markdown_text=part.text) +``` + +### 5. Session Management + +ADK uses session-based conversation management: +- Each user/thread combination gets a unique session +- Sessions maintain conversation history automatically +- No need to manually pass full conversation history + +## File Changes + +### New Files +- `ai/agents.py` - Multi-agent system definition +- `ai/tools.py` - Custom tools for agents +- `MIGRATION_GUIDE.md` - This file + +### Modified Files +- `requirements.txt` - Updated dependencies +- `ai/llm_caller.py` - Rewritten to use ADK +- `listeners/assistant/message.py` - Updated for ADK streaming +- `listeners/events/app_mentioned.py` - Updated for ADK streaming +- `.env.sample` - Updated environment variables +- `README.md` - Updated documentation + +## Setup Instructions + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 2. Set Up Google Cloud Authentication + +Choose one of the following methods: + +#### Option A: Service Account (Recommended for Production) +1. Create a Google Cloud project +2. Enable Vertex AI API +3. Create a service account and download JSON key +4. Set in `.env`: +``` +GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json +``` + +#### Option B: API Key (Simpler for Development) +1. Get API key from [Google AI Studio](https://aistudio.google.com/app/apikey) +2. Set in `.env`: +``` +GOOGLE_API_KEY=YOUR_GOOGLE_API_KEY +``` + +#### Option C: Application Default Credentials +```bash +gcloud auth application-default login +``` + +### 3. Run the Application + +```bash +python3 app.py +``` + +## Testing the Multi-Agent System + +### Test MathAgent +Send messages like: +- "Calculate 25 * 4 + 10" +- "What is 100 divided by 5?" +- "Solve 2 + 2" + +### Test TextAgent +Send messages like: +- "Format 'hello world' in uppercase" +- "Count words in: The quick brown fox jumps over the lazy dog" +- "Create a list from: apples, oranges, bananas" + +### Test InfoAgent +Send messages like: +- "What time is it?" +- "What can you do?" +- "Help me understand your capabilities" + +### Test Coordinator +Send general messages: +- "Hello, how are you?" +- "Tell me about yourself" +- The coordinator will handle these directly or route to appropriate agents + +## Key Benefits + +1. **Modularity**: Each agent has a specific responsibility +2. **Extensibility**: Easy to add new agents and tools +3. **Scalability**: Agents can be deployed independently +4. **Maintainability**: Clear separation of concerns +5. **Tool Integration**: Rich ecosystem of tools available +6. **Session Management**: Built-in conversation history + +## Troubleshooting + +### Authentication Issues +- Ensure Google Cloud credentials are properly set +- Check that Vertex AI API is enabled in your project +- Verify API key is valid (if using API key method) + +### Streaming Issues +- ADK events are asynchronous - ensure async/await is properly used +- Check that event loop is created correctly in synchronous contexts + +### Agent Routing Issues +- Verify agent descriptions are clear and distinct +- Check that coordinator instructions properly describe when to use each agent +- Review agent logs to see routing decisions + +## Future Enhancements + +Potential improvements to consider: + +1. **Add More Specialized Agents**: + - SearchAgent for web searches + - DataAgent for database queries + - CodeAgent for code generation + +2. **Enhance Tools**: + - Add external API integrations + - Implement more complex calculations + - Add file processing capabilities + +3. **Improve Routing**: + - Implement more sophisticated routing logic + - Add agent selection confidence scores + - Enable multi-agent collaboration + +4. **Add Observability**: + - Integrate with Google Cloud Trace + - Add custom logging for agent decisions + - Implement performance monitoring + +## Resources + +- [Google ADK Documentation](https://google.github.io/adk-docs/) +- [Multi-Agent Systems Guide](https://google.github.io/adk-docs/agents/multi-agents/) +- [Function Tools Documentation](https://google.github.io/adk-docs/tools/function-tools/) +- [Slack AI Apps Documentation](https://docs.slack.dev/ai/) + diff --git a/docs/SLACK_AGENT_STATUS.md b/docs/SLACK_AGENT_STATUS.md new file mode 100644 index 0000000..546a271 --- /dev/null +++ b/docs/SLACK_AGENT_STATUS.md @@ -0,0 +1,961 @@ +# Implementing Real-Time Agent Status Updates in Slack AI Assistants + +This document provides a comprehensive technical guide for implementing real-time status updates in Slack AI assistants using Google ADK (Agent Development Kit) event streams. + +## Table of Contents + +1. [Overview](#overview) +2. [Architecture](#architecture) +3. [Dependencies](#dependencies) +4. [Understanding Slack's Status API](#understanding-slacks-status-api) +5. [Understanding ADK Events](#understanding-adk-events) +6. [Implementation](#implementation) +7. [Event Flow](#event-flow) +8. [Troubleshooting](#troubleshooting) +9. [Best Practices](#best-practices) + +## Overview + +By default, Slack AI assistants show generic loading states ("thinking...", "evaluating...", "analyzing..."). This implementation enhances the user experience by displaying real-time information about: + +- Which agent is currently processing the request +- What tools are being invoked +- When control transfers between agents + +### What This Achieves + +**Before:** +``` +Status: thinking... +Loading: "Teaching the hamsters to type faster..." + "Untangling the internet cables..." +``` + +**After:** +``` +Status: is working... +Loading: "CoordinatorAgent is working..." + "Consulting MathAgent..." + "Using Calculate..." +``` + +## Architecture + +### High-Level Flow + +``` +User Message + ↓ +Slack Event Handler + ↓ +LLM Caller (ai/llm_caller.py) + ↓ +ADK Runner.run_async() + ↓ +Event Stream (yields events) + ↓ +Event Processor (detects agents/tools) + ↓ +Status/Content Events + ↓ +Slack Status Update + Response Streaming +``` + +### Component Responsibilities + +| Component | File | Responsibility | +|-----------|------|----------------| +| Event Handler | `listeners/assistant/message.py` | Receives Slack events, manages event loop | +| Event Handler | `listeners/events/app_mentioned.py` | Handles @mentions | +| LLM Caller | `ai/llm_caller.py` | Processes ADK events, yields status/content | +| ADK Runner | Google ADK SDK | Executes agent system, emits events | +| Agents | `ai/agents.py` | Define agent hierarchy and tools | + +## Dependencies + +### Required Python Packages + +```python +# Slack SDK +slack-bolt>=1.18.0 +slack-sdk>=3.23.0 + +# Google ADK +google-genai>=1.0.0 # Includes ADK + +# Standard library (no installation needed) +asyncio +logging +typing +``` + +### Environment Variables + +```bash +# Slack Configuration +SLACK_BOT_TOKEN=xoxb-... +SLACK_APP_TOKEN=xapp-... + +# Google ADK Authentication (choose one) +GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json # Option 1 +GOOGLE_API_KEY=your-api-key # Option 2 +# Or use: gcloud auth application-default login # Option 3 +``` + +### Slack App Scopes Required + +``` +app_mentions:read +assistant:write +channels:history +channels:read +chat:write +groups:history +groups:read +im:history +im:read +mpim:history +mpim:read +``` + +## Understanding Slack's Status API + +### The `assistant.threads.setStatus` Method + +```python +client.assistant_threads_setStatus( + channel_id: str, # Required: Channel containing the thread + thread_ts: str, # Required: Thread timestamp + status: str, # Required: Status text (may be translated by Slack) + loading_messages: Optional[List[str]] = None # Optional: Custom rotating messages +) +``` + +### Key Behaviors + +1. **Status Translation**: Slack translates custom `status` values to predefined ones: + - Your input: `"Processing your request..."` + - Slack displays: `"thinking..."` or `"evaluating..."` + +2. **Loading Messages**: These are **not** translated and display exactly as provided: + - Your input: `loading_messages=["MathAgent is working..."]` + - Slack displays: `"MathAgent is working..."` + +3. **Display Format**: + ``` + + + ``` + Example: + ``` + MyBot is working... + Consulting MathAgent... + ``` + +4. **Auto-Clearing**: Status automatically clears when: + - The app sends a reply message + - You send an empty string: `status=""` + +5. **Update Frequency**: You can call `setStatus` multiple times during processing to update the loading message in real-time. + +## Understanding ADK Events + +### Event Structure + +ADK's `runner.run_async()` yields events during agent execution. Each event contains: + +```python +class Event: + author: str # Agent name or "user" + content: Content | None # Message content with parts + event_id: str # Unique event identifier + invocation_id: str # Correlates events in one interaction + partial: bool # True for streaming chunks + actions: Actions | None # State changes, transfers, etc. +``` + +### Key Event Properties + +#### 1. `event.author` +Identifies who created the event: +```python +"user" # User's input message +"CoordinatorAgent" # Response from coordinator +"MathAgent" # Response from specialized agent +``` + +#### 2. `event.content` +Contains the actual message content: +```python +event.content.parts # List of Part objects +part.text # Text content of the part +``` + +#### 3. `event.get_function_calls()` +Returns tool invocations: +```python +function_calls = event.get_function_calls() +for func_call in function_calls: + func_call.name # e.g., "calculate", "format_text" + func_call.args # Tool arguments +``` + +#### 4. `event.actions` +Contains control flow information: +```python +event.actions.transfer_to_agent # Target agent for handoff +event.actions.state_delta # State changes +event.actions.artifact_delta # Artifact updates +``` + +### Event Lifecycle Example + +For request: "What is 25 times 4?" + +```python +# Event 1: User input echo +Event(author="user", content="What is 25 times 4?") + +# Event 2: Coordinator receives request +Event(author="CoordinatorAgent", content=None) + +# Event 3: Coordinator decides to transfer +Event(author="CoordinatorAgent", + actions=Actions(transfer_to_agent="MathAgent")) + +# Event 4: MathAgent takes over +Event(author="MathAgent", content=None) + +# Event 5: MathAgent requests tool +Event(author="MathAgent", + function_calls=[FunctionCall(name="calculate", args={"expr": "25*4"})]) + +# Event 6: Tool result +Event(author="function", content="100") + +# Event 7: MathAgent starts response +Event(author="MathAgent", content="The result", partial=True) + +# Event 8: Continues streaming +Event(author="MathAgent", content=" is 100", partial=True) + +# Event 9: Final response +Event(author="MathAgent", content=".", partial=False) +``` + +## Implementation + +### Step 1: Modify `ai/llm_caller.py` + +#### Add Dependencies + +```python +import logging +from typing import Dict, List, AsyncGenerator, Literal + +from google.adk.runners import InMemoryRunner +from google.genai import types + +from .agents import get_root_agent + +logger = logging.getLogger(__name__) +``` + +#### Update Function Signature + +Change from yielding strings to yielding event dictionaries: + +```python +async def call_llm( + messages_in_thread: List[Dict[str, str]], + user_id: str = "default_user", + session_id: str = None, +) -> AsyncGenerator[Dict[str, str], None]: + """ + Call the ADK multi-agent system and stream responses with status updates. + + Args: + messages_in_thread: List of message dictionaries with 'role' and 'content' keys + user_id: The Slack user ID + session_id: Optional session ID for maintaining conversation context + + Yields: + Dict[str, str]: Events with 'type' (either 'status' or 'content') and 'text' keys + - status events: Updates about what the agent is doing (tool calls, agent transfers) + - content events: Actual response text to display to the user + """ +``` + +#### Implement Event Processing + +```python +async def call_llm(...) -> AsyncGenerator[Dict[str, str], None]: + runner = get_adk_runner() + + # Session management (existing code) + if session_id is None: + session_id = f"session_{user_id}" + + try: + await runner.session_service.create_session( + app_name=_app_name, user_id=user_id, session_id=session_id + ) + except Exception: + pass + + # Convert messages to ADK format (existing code) + last_message = ( + messages_in_thread[-1] + if messages_in_thread + else {"role": "user", "content": ""} + ) + + new_message = types.Content( + role="user", parts=[types.Part(text=last_message["content"])] + ) + + # Track state + current_agent = None + has_started_streaming_content = False + + # Process event stream + async for event in runner.run_async( + user_id=user_id, session_id=session_id, new_message=new_message + ): + # Optional: Log for debugging + logger.info(f"ADK Event - Author: {event.author}, Has content: {bool(event.content)}") + + # DETECTION 1: Agent changes/transfers + if event.author and event.author != "user" and event.author != current_agent: + current_agent = event.author + if not has_started_streaming_content: + status_text = f"{current_agent} is working..." + logger.info(f"Yielding status: {status_text}") + yield {"type": "status", "text": status_text} + + # DETECTION 2: Tool calls (function calls) + function_calls = ( + event.get_function_calls() if hasattr(event, "get_function_calls") else [] + ) + logger.info(f"Function calls detected: {len(function_calls)}") + if function_calls and not has_started_streaming_content: + for func_call in function_calls: + tool_name = ( + func_call.name if hasattr(func_call, "name") else str(func_call) + ) + # Make tool names more readable + readable_name = tool_name.replace("_", " ").title() + status_text = f"Using {readable_name}..." + logger.info(f"Yielding tool status: {status_text}") + yield {"type": "status", "text": status_text} + + # DETECTION 3: Explicit agent transfers + if hasattr(event, "actions") and event.actions: + logger.info(f"Event has actions: {event.actions}") + if ( + hasattr(event.actions, "transfer_to_agent") + and event.actions.transfer_to_agent + ): + target_agent = event.actions.transfer_to_agent + status_text = f"Consulting {target_agent}..." + logger.info(f"Yielding transfer status: {status_text}") + yield {"type": "status", "text": status_text} + + # DETECTION 4: Stream content + if event.content and event.content.parts and event.author != "user": + for part in event.content.parts: + if hasattr(part, "text") and part.text: + has_started_streaming_content = True + yield {"type": "content", "text": part.text} +``` + +#### Key Implementation Details + +1. **`has_started_streaming_content` flag**: Prevents status updates after response text starts streaming. This avoids interrupting the user's reading experience. + +2. **`current_agent` tracking**: Only yields status when agent changes to avoid redundant updates. + +3. **Tool name formatting**: Converts `calculate` → `Calculate`, `format_text` → `Format Text` for better readability. + +4. **Hasattr checks**: Safely checks for optional attributes that may not exist on all event types. + +### Step 2: Update Event Handlers + +#### For Assistant Thread Messages (`listeners/assistant/message.py`) + +```python +def message( + client: WebClient, + context: BoltContext, + logger: Logger, + payload: dict, + say: Say, + set_status: SetStatus, +): + try: + channel_id = payload["channel"] + team_id = context.team_id + thread_ts = payload["thread_ts"] + user_id = context.user_id + + # Set initial status + set_status(status="is thinking...") + + # Fetch conversation history (existing code) + replies = client.conversations_replies( + channel=context.channel_id, + ts=context.thread_ts, + oldest=context.thread_ts, + limit=10, + ) + messages_in_thread: List[Dict[str, str]] = [] + for message in replies["messages"]: + role = "user" if message.get("bot_id") is None else "assistant" + messages_in_thread.append({"role": role, "content": message["text"]}) + + # Create event loop for async operations + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Initialize Slack streaming + streamer = client.chat_stream( + channel=channel_id, + recipient_team_id=team_id, + recipient_user_id=user_id, + thread_ts=thread_ts, + ) + + # Stream ADK response with dynamic status updates + async def stream_response(): + async for event in call_llm( + messages_in_thread, user_id=user_id, session_id=thread_ts + ): + if event["type"] == "status": + # Update loading message with real-time agent activity + logger.info(f"Setting Slack loading message to: {event['text']}") + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", # Generic status (may be translated) + loading_messages=[event["text"]], # Specific detail (shows as-is) + ) + elif event["type"] == "content": + # Stream actual response content + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() + + # Add feedback block and stop streaming + feedback_block = create_feedback_block() + streamer.stop(blocks=feedback_block) + + except Exception as e: + logger.exception(f"Failed to handle a user message event: {e}") + say(f":warning: Something went wrong! ({e})") +``` + +#### For App Mentions (`listeners/events/app_mentioned.py`) + +```python +def app_mentioned_callback(client: WebClient, event: dict, logger: Logger, say: Say): + try: + channel_id = event.get("channel") + team_id = event.get("team") + text = event.get("text") + thread_ts = event.get("thread_ts") or event.get("ts") + user_id = event.get("user") + + # Set initial status + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is thinking...", + loading_messages=["Starting to process your request..."], + ) + + # Create event loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Initialize streaming + streamer = client.chat_stream( + channel=channel_id, + recipient_team_id=team_id, + recipient_user_id=user_id, + thread_ts=thread_ts, + ) + + # Stream response with dynamic status + async def stream_response(): + async for event in call_llm( + [{"role": "user", "content": text}], + user_id=user_id, + session_id=thread_ts, + ): + if event["type"] == "status": + logger.info(f"Setting Slack loading message to: {event['text']}") + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", + loading_messages=[event["text"]], + ) + elif event["type"] == "content": + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() + + feedback_block = create_feedback_block() + streamer.stop(blocks=feedback_block) + + except Exception as e: + logger.exception(f"Failed to handle a user message event: {e}") + say(f":warning: Something went wrong! ({e})") +``` + +## Event Flow + +### Complete Request-Response Cycle + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 1. User sends message in Slack │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 2. Slack Event Handler receives message │ +│ - Calls client.assistant_threads_setStatus() │ +│ - Status: "is thinking..." │ +│ - Loading: "Starting to process..." │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 3. call_llm() invoked with message history │ +│ - Creates ADK session │ +│ - Calls runner.run_async() │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 4. ADK Event Stream begins │ +│ │ +│ Event 1: author="CoordinatorAgent" │ +│ └─> Yield: {"type": "status", "text": "CoordinatorAgent..."}│ +│ └─> Update Slack: loading_messages=["CoordinatorAgent..."] │ +│ │ +│ Event 2: actions.transfer_to_agent="MathAgent" │ +│ └─> Yield: {"type": "status", "text": "Consulting Math..."}│ +│ └─> Update Slack: loading_messages=["Consulting Math..."] │ +│ │ +│ Event 3: author="MathAgent" │ +│ └─> Yield: {"type": "status", "text": "MathAgent..."} │ +│ └─> Update Slack: loading_messages=["MathAgent..."] │ +│ │ +│ Event 4: function_call="calculate" │ +│ └─> Yield: {"type": "status", "text": "Using Calculate..."} │ +│ └─> Update Slack: loading_messages=["Using Calculate..."] │ +│ │ +│ Event 5: content="The result is 100" │ +│ └─> Yield: {"type": "content", "text": "The result is 100"} │ +│ └─> Stream to Slack: streamer.append("The result is 100") │ +│ └─> Set flag: has_started_streaming_content = True │ +│ │ +└───────────────────────────────┬─────────────────────────────────┘ + │ +┌───────────────────────────────▼─────────────────────────────────┐ +│ 5. Stream completes │ +│ - streamer.stop() called │ +│ - Status automatically cleared by Slack │ +│ - User sees complete response │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Troubleshooting + +### Issue: Custom status shows as "thinking", "evaluating", etc. + +**Cause**: Slack translates the `status` parameter to predefined values. + +**Solution**: Use `loading_messages` parameter for custom text: +```python +# ❌ This gets translated +status="MathAgent is working..." + +# ✅ This displays as-is +status="is working...", +loading_messages=["MathAgent is working..."] +``` + +### Issue: Status updates don't appear + +**Possible Causes:** + +1. **Missing scope**: Ensure `assistant:write` scope is added to your Slack app +2. **Wrong parameters**: Verify `channel_id` and `thread_ts` are correct +3. **Status cleared too quickly**: Check if response starts streaming immediately + +**Debug Steps:** +```python +# Add logging +logger.info(f"Setting status - channel: {channel_id}, thread: {thread_ts}") +logger.info(f"Status text: {status}, Loading: {loading_messages}") +``` + +### Issue: No agent or tool names detected + +**Possible Causes:** + +1. **Events not exposing expected attributes**: ADK version mismatch +2. **Agents responding without intermediate events**: Fast responses skip tool calls + +**Debug Steps:** +```python +# Log all event attributes +logger.info(f"Event attributes: {dir(event)}") +logger.info(f"Event author: {event.author}") +logger.info(f"Has function_calls: {hasattr(event, 'get_function_calls')}") +``` + +### Issue: Async loop errors + +**Error**: `RuntimeError: This event loop is already running` + +**Cause**: Trying to create nested event loops. + +**Solution**: Ensure you create a fresh loop: +```python +loop = asyncio.new_event_loop() +asyncio.set_event_loop(loop) +try: + loop.run_until_complete(stream_response()) +finally: + loop.close() +``` + +## Best Practices + +### 1. Stop Status Updates Before Content Streaming + +Once you start streaming actual response content, stop sending status updates: + +```python +has_started_streaming_content = False + +if event["type"] == "content": + has_started_streaming_content = True + +if event["type"] == "status" and not has_started_streaming_content: + # Only update status before content starts + update_slack_status(event["text"]) +``` + +**Rationale**: Avoids disrupting the user's reading experience with status changes. + +### 2. Make Tool Names Human-Readable + +Transform technical function names: + +```python +tool_name = "calculate" +readable_name = tool_name.replace("_", " ").title() # "Calculate" + +tool_name = "format_text" +readable_name = tool_name.replace("_", " ").title() # "Format Text" +``` + +### 3. Use Descriptive Status Messages + +Be specific about what's happening: + +```python +# ❌ Generic +"Processing..." + +# ✅ Specific +"MathAgent is calculating..." +"Using Format Text tool..." +"Consulting InfoAgent for current time..." +``` + +### 4. Handle Errors Gracefully + +Always clear status on errors: + +```python +try: + # Process events + async for event in call_llm(...): + # Handle event +except Exception as e: + logger.exception(f"Error: {e}") + # Clear status + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="", # Empty string clears status + ) + # Notify user + say(f":warning: Something went wrong!") +``` + +### 5. Log Extensively During Development + +Add detailed logging to understand event flow: + +```python +logger.info(f"ADK Event - Author: {event.author}") +logger.info(f"Has content: {bool(event.content)}") +logger.info(f"Function calls: {len(function_calls)}") +logger.info(f"Setting loading message to: {status_text}") +``` + +Remove or reduce verbosity in production. + +### 6. Consider Rate Limiting + +Slack API has rate limits. Avoid updating status too frequently: + +```python +import time + +last_status_update = 0 +MIN_UPDATE_INTERVAL = 0.5 # seconds + +current_time = time.time() +if current_time - last_status_update >= MIN_UPDATE_INTERVAL: + client.assistant_threads_setStatus(...) + last_status_update = current_time +``` + +### 7. Test with Different Agent Scenarios + +Ensure your implementation works across different paths: + +```python +# Math request (should show MathAgent + calculate tool) +"What is 25 times 4?" + +# Text request (should show TextAgent + count_words tool) +"Count the words in this sentence" + +# Info request (should show InfoAgent + get_current_time tool) +"What time is it?" + +# Direct response (should show only CoordinatorAgent) +"Hello!" +``` + +## Example Output Scenarios + +### Scenario 1: Math Calculation + +**User**: "What is 15 * 8?" + +**Status Updates**: +``` +1. is thinking... + "Starting to process your request..." + +2. is working... + "CoordinatorAgent is working..." + +3. is working... + "Consulting MathAgent..." + +4. is working... + "MathAgent is working..." + +5. is working... + "Using Calculate..." + +6. [Status cleared, streaming response] + "The result of 15 * 8 is 120." +``` + +### Scenario 2: Text Processing + +**User**: "Count the words in this sentence please" + +**Status Updates**: +``` +1. is thinking... + "Starting to process your request..." + +2. is working... + "CoordinatorAgent is working..." + +3. is working... + "Consulting TextAgent..." + +4. is working... + "TextAgent is working..." + +5. is working... + "Using Count Words..." + +6. [Status cleared, streaming response] + "There are 6 words in your sentence." +``` + +### Scenario 3: General Query + +**User**: "Hello!" + +**Status Updates**: +``` +1. is thinking... + "Starting to process your request..." + +2. is working... + "CoordinatorAgent is working..." + +3. [Status cleared, streaming response] + "Hello! How can I help you today?" +``` + +## Performance Considerations + +### Event Processing Overhead + +- Each status update makes an API call to Slack +- Typical latency: 50-200ms per call +- Impact: Minimal for most use cases + +### Optimization Strategies + +1. **Batch rapid updates**: If multiple events occur within 500ms, only send the latest +2. **Skip redundant updates**: Track last status sent, only update if changed +3. **Prioritize content**: Once streaming starts, ignore status events + +### Memory Usage + +- Event objects are processed in streaming fashion +- No large accumulation of events in memory +- Session history stored in ADK's session service + +## Security Considerations + +### 1. Sanitize Status Messages + +Never include sensitive data in status updates: + +```python +# ❌ Dangerous +status_text = f"Querying database for user {user_email}..." + +# ✅ Safe +status_text = "Querying database..." +``` + +### 2. Validate Thread Context + +Ensure the bot has access to the channel/thread: + +```python +try: + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", + ) +except SlackApiError as e: + if e.response["error"] == "channel_not_found": + logger.error("Bot not in channel") + return +``` + +### 3. Rate Limit Per User + +Prevent abuse by rate limiting status updates per user: + +```python +from collections import defaultdict +import time + +user_status_times = defaultdict(list) + +def can_update_status(user_id: str, max_per_minute: int = 60) -> bool: + now = time.time() + user_times = user_status_times[user_id] + # Remove times older than 1 minute + user_times[:] = [t for t in user_times if now - t < 60] + + if len(user_times) >= max_per_minute: + return False + + user_times.append(now) + return True +``` + +## Extending the Implementation + +### Adding Custom Event Types + +Extend the event dictionary to include more information: + +```python +# In ai/llm_caller.py +yield { + "type": "status", + "text": status_text, + "agent": current_agent, # Additional metadata + "tool": tool_name, # Additional metadata +} + +# In event handler +if event["type"] == "status": + # Use additional metadata for custom logic + if event.get("tool") == "calculate": + # Special handling for calculations + pass +``` + +### Supporting Multiple Languages + +Translate status messages based on user locale: + +```python +TRANSLATIONS = { + "en": { + "working": "is working...", + "consulting": "Consulting {agent}...", + "using_tool": "Using {tool}...", + }, + "es": { + "working": "está trabajando...", + "consulting": "Consultando {agent}...", + "using_tool": "Usando {tool}...", + }, +} + +def get_status_text(key: str, locale: str, **kwargs) -> str: + template = TRANSLATIONS.get(locale, TRANSLATIONS["en"])[key] + return template.format(**kwargs) +``` + +### Adding Progress Indicators + +Show numerical progress for long operations: + +```python +total_steps = 5 +current_step = 0 + +async for event in runner.run_async(...): + if event["type"] == "status": + current_step += 1 + progress = f"[{current_step}/{total_steps}] {event['text']}" + yield {"type": "status", "text": progress} +``` + +## Conclusion + +This implementation provides users with transparency into the AI agent's decision-making process, enhancing trust and user experience. By leveraging ADK's event system and Slack's loading messages, you can create a sophisticated real-time status update system without significant performance overhead. + +For questions or issues, refer to: +- [Slack API Documentation](https://api.slack.com/methods/assistant.threads.setStatus) +- [Google ADK Documentation](https://google.github.io/adk-docs/) +- [Project CLAUDE.md](./CLAUDE.md) for codebase-specific guidance diff --git a/listeners/assistant/message.py b/listeners/assistant/message.py index b61de90..440112b 100644 --- a/listeners/assistant/message.py +++ b/listeners/assistant/message.py @@ -1,3 +1,4 @@ +import asyncio from logging import Logger from typing import Dict, List @@ -34,16 +35,8 @@ def message( thread_ts = payload["thread_ts"] user_id = context.user_id - set_status( - status="thinking...", - loading_messages=[ - "Teaching the hamsters to type faster…", - "Untangling the internet cables…", - "Consulting the office goldfish…", - "Polishing up the response just for you…", - "Convincing the AI to stop overthinking…", - ], - ) + # Set initial status - will be updated dynamically based on agent activity + set_status(status="is thinking...") replies = client.conversations_replies( channel=context.channel_id, @@ -56,7 +49,9 @@ def message( role = "user" if message.get("bot_id") is None else "assistant" messages_in_thread.append({"role": role, "content": message["text"]}) - returned_message = call_llm(messages_in_thread) + # Create event loop for async call + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) streamer = client.chat_stream( channel=channel_id, @@ -65,13 +60,30 @@ def message( thread_ts=thread_ts, ) - # Loop over OpenAI response stream - # https://platform.openai.com/docs/api-reference/responses/create - for event in returned_message: - if event.type == "response.output_text.delta": - streamer.append(markdown_text=f"{event.delta}") - else: - continue + # Stream ADK response with dynamic status updates + async def stream_response(): + async for event in call_llm( + messages_in_thread, user_id=user_id, session_id=thread_ts + ): + if event["type"] == "status": + # Update the loading message with real-time agent activity + logger.info(f"Setting Slack loading message to: {event['text']}") + # Use client API directly to access loading_messages parameter + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is thinking...", # The status below text box which typically shows "is typing..." for other users + loading_messages=[ + event["text"] + ], # The text that shows next to the app name shimmer. Here you can put custom text from you Agent events. + ) + elif event["type"] == "content": + # Stream the actual response content + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() feedback_block = create_feedback_block() streamer.stop(blocks=feedback_block) diff --git a/listeners/events/app_mentioned.py b/listeners/events/app_mentioned.py index 6ec8d25..cea92ae 100644 --- a/listeners/events/app_mentioned.py +++ b/listeners/events/app_mentioned.py @@ -1,3 +1,4 @@ +import asyncio from logging import Logger from slack_bolt import Say @@ -25,20 +26,18 @@ def app_mentioned_callback(client: WebClient, event: dict, logger: Logger, say: thread_ts = event.get("thread_ts") or event.get("ts") user_id = event.get("user") + # Set initial status with placeholder loading messages + # Will be updated dynamically based on agent activity client.assistant_threads_setStatus( channel_id=channel_id, thread_ts=thread_ts, - status="thinking...", - loading_messages=[ - "Teaching the hamsters to type faster…", - "Untangling the internet cables…", - "Consulting the office goldfish…", - "Polishing up the response just for you…", - "Convincing the AI to stop overthinking…", - ], + status="is thinking...", + loading_messages=["Starting to process your request..."], ) - returned_message = call_llm([{"role": "user", "content": text}]) + # Create event loop for async call + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) streamer = client.chat_stream( channel=channel_id, @@ -47,13 +46,30 @@ def app_mentioned_callback(client: WebClient, event: dict, logger: Logger, say: thread_ts=thread_ts, ) - # Loop over OpenAI response stream - # https://platform.openai.com/docs/api-reference/responses/create - for event in returned_message: - if event.type == "response.output_text.delta": - streamer.append(markdown_text=f"{event.delta}") - else: - continue + # Stream ADK response with dynamic status updates + async def stream_response(): + async for event in call_llm( + [{"role": "user", "content": text}], + user_id=user_id, + session_id=thread_ts, + ): + if event["type"] == "status": + # Update the loading message with real-time agent activity + # Keep generic status but show detailed info in loading_messages + logger.info(f"Setting Slack loading message to: {event['text']}") + client.assistant_threads_setStatus( + channel_id=channel_id, + thread_ts=thread_ts, + status="is working...", # Generic status + loading_messages=[event["text"]], # Specific detail + ) + elif event["type"] == "content": + # Stream the actual response content + if event["text"]: + streamer.append(markdown_text=event["text"]) + + loop.run_until_complete(stream_response()) + loop.close() feedback_block = create_feedback_block() streamer.stop(blocks=feedback_block) diff --git a/requirements.txt b/requirements.txt index bd5e197..7438e89 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,12 @@ slack-sdk==3.37.0 slack-bolt==1.26.0 -# If you use a different LLM vendor, replace this dependency -openai +# Google ADK for multi-agent system +google-adk +google-genai + +# LiteLLM for OpenAI model integration with ADK +litellm pytest ruff==0.14.3