From e672af6b3d564ffc9d949e653cf6bc6146d1a97c Mon Sep 17 00:00:00 2001
From: sreedharsreeram <141047751+sreedharsreeram@users.noreply.github.com>
Date: Wed, 15 Apr 2026 16:27:23 +0000
Subject: [PATCH] Supermemory-Cartesia SDK (#744)

### TL;DR

Added a Python SDK for integrating Supermemory with Cartesia Line voice agents, enabling persistent memory capabilities.

### What changed?

Created a new Python SDK package (`supermemory_cartesia`) that provides:

- `SupermemoryCartesiaAgent` wrapper class that enhances Cartesia Line agents with memory capabilities
- Memory retrieval and storage functionality that integrates with the Supermemory API
- Utility functions for memory formatting, deduplication, and time formatting
- Custom exception classes for error handling
- Comprehensive documentation and type hints

The implementation includes:

- Memory enrichment for user queries
- Automatic storage of conversation history
- Configurable memory retrieval modes (profile, query, full)
- Background processing to avoid blocking the main conversation flow

### How to test?

```python
import os

from line.llm_agent import LlmAgent, LlmConfig
from supermemory_cartesia import SupermemoryCartesiaAgent

# Create base LLM agent
base_agent = LlmAgent(
    model="gemini/gemini-2.5-flash-preview-09-2025",
    config=LlmConfig(
        system_prompt="You are a helpful assistant.",
        introduction="Hello!"
    )
)

# Wrap with Supermemory
memory_agent = SupermemoryCartesiaAgent(
    agent=base_agent,
    api_key=os.getenv("SUPERMEMORY_API_KEY"),
    container_tag="user-123",
    custom_id="session-456",  # Required: groups all messages in the same document
)

# Use memory_agent in your Cartesia Line application
```

### Why make this change?

This SDK enables Cartesia Line voice agents to maintain persistent memory across conversations, enhancing user experience by:

1. Providing contextual awareness of past interactions
2. Remembering user preferences and important information
3. Reducing repetition in conversations
4.
Creating more personalized and natural voice interactions

The integration is designed to be lightweight and non-blocking, ensuring that memory operations don't impact the responsiveness of voice interactions.

---
 apps/docs/integrations/cartesia.mdx           | 345 ++++++++++++++
 packages/cartesia-sdk-python/README.md        | 232 +++++++++
 packages/cartesia-sdk-python/pyproject.toml   |  78 +++
 packages/cartesia-sdk-python/requirements.txt |   5 +
 .../src/supermemory_cartesia/__init__.py      |  67 +++
 .../src/supermemory_cartesia/agent.py         | 446 ++++++++++++++++++
 .../src/supermemory_cartesia/exceptions.py    |  58 +++
 .../src/supermemory_cartesia/utils.py         | 134 ++++++
 8 files changed, 1365 insertions(+)
 create mode 100644 apps/docs/integrations/cartesia.mdx
 create mode 100644 packages/cartesia-sdk-python/README.md
 create mode 100644 packages/cartesia-sdk-python/pyproject.toml
 create mode 100644 packages/cartesia-sdk-python/requirements.txt
 create mode 100644 packages/cartesia-sdk-python/src/supermemory_cartesia/__init__.py
 create mode 100644 packages/cartesia-sdk-python/src/supermemory_cartesia/agent.py
 create mode 100644 packages/cartesia-sdk-python/src/supermemory_cartesia/exceptions.py
 create mode 100644 packages/cartesia-sdk-python/src/supermemory_cartesia/utils.py

diff --git a/apps/docs/integrations/cartesia.mdx b/apps/docs/integrations/cartesia.mdx
new file mode 100644
index 000000000..5b790545d
--- /dev/null
+++ b/apps/docs/integrations/cartesia.mdx
@@ -0,0 +1,345 @@
+---
+title: "Cartesia"
+sidebarTitle: "Cartesia (Voice)"
+description: "Integrate Supermemory with Cartesia for conversational memory in voice AI agents"
+icon: "/images/cartesia.svg"
+---
+
+Supermemory integrates with [Cartesia](https://cartesia.ai/agents), providing long-term memory capabilities for voice AI agents. Your Cartesia applications will remember past conversations and provide personalized responses based on user history.
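The retrieve → enrich → store cycle that this guide describes can be sketched in a few lines of dependency-free Python before installing anything. This is a toy illustration only: `ToyMemoryWrapper` and `EchoAgent` are invented names, not the SDK's API, and naive keyword matching stands in for Supermemory's semantic search.

```python
# Illustrative sketch of the wrapper pattern: retrieve -> enrich -> store.
class ToyMemoryWrapper:
    def __init__(self, agent):
        self.agent = agent     # any object with a reply(text) -> str method
        self.memories = []     # stands in for the Supermemory store

    def reply(self, user_text: str) -> str:
        # 1. Retrieve: keyword overlap stands in for semantic search
        words = set(user_text.lower().split())
        relevant = [m for m in self.memories if words & set(m.lower().split())]
        # 2. Enrich: inject retrieved memories ahead of the user's message
        prompt = "Based on previous conversations:\n" + "\n".join(relevant) + "\n" + user_text
        answer = self.agent.reply(prompt)
        # 3. Store: persist this turn for future calls
        self.memories.append(f"User: {user_text}")
        self.memories.append(f"Assistant: {answer}")
        return answer


class EchoAgent:
    """Stub agent that answers with the last line of its prompt."""
    def reply(self, text: str) -> str:
        return f"You said: {text.splitlines()[-1]}"


wrapper = ToyMemoryWrapper(EchoAgent())
print(wrapper.reply("I like jazz"))       # first turn: no memories yet
print(wrapper.reply("recommend jazz"))    # the earlier "jazz" turn is retrieved
```

In the real integration, step 1 is a call to the Supermemory profile/search API, step 2 injects the formatted memories into the agent's system prompt, and step 3 runs as a non-blocking background task.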
+ +## Installation + +To use Supermemory with Cartesia, install the required dependencies: + +```bash +pip install supermemory-cartesia +``` + +Set up your API key as an environment variable: + +```bash +export SUPERMEMORY_API_KEY=your_supermemory_api_key +``` + +You can obtain an API key from [console.supermemory.ai](https://console.supermemory.ai). + +## Configuration + +Supermemory integration is provided through the `SupermemoryCartesiaAgent` wrapper class: + +```python +from supermemory_cartesia import SupermemoryCartesiaAgent +from line.llm_agent import LlmAgent, LlmConfig + +# Create base LLM agent +base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + api_key=os.getenv("ANTHROPIC_API_KEY"), + config=LlmConfig( + system_prompt="""You are a helpful voice assistant with memory.""", + introduction="Hello! Great to talk with you again!", + ), +) + +# Wrap with Supermemory +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag="user-123", + custom_id="session-456", # Required: groups all messages in same document + config=SupermemoryCartesiaAgent.MemoryConfig( + mode="full", # "profile" | "query" | "full" + search_limit=10, # Max memories to retrieve + search_threshold=0.3, # Relevance threshold (0.0-1.0) + ), +) +``` + +## Agent Wrapper Pattern + +The `SupermemoryCartesiaAgent` wraps your existing `LlmAgent` to add memory capabilities: + +```python +from line.voice_agent_app import VoiceAgentApp + +async def get_agent(env, call_request): + # Extract container_tag from call metadata (typically user ID) + container_tag = call_request.metadata.get("user_id", "default-user") + + # Create base agent + base_agent = LlmAgent(...) 
+ + # Wrap with memory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag=container_tag, + custom_id=call_request.call_id, # Required: groups all messages in same document + ) + + return memory_agent + +# Create voice agent app +app = VoiceAgentApp(get_agent=get_agent) +``` + +## How It Works + +When integrated with Cartesia Line, Supermemory provides two key functionalities: + +### 1. Memory Retrieval + +When a `UserTurnEnded` event is detected, Supermemory retrieves relevant memories: + +- **Static Profile**: Persistent facts about the user +- **Dynamic Profile**: Recent context and preferences +- **Search Results**: Semantically relevant past memories + +### 2. Context Enhancement + +Retrieved memories are formatted and injected into the agent's system prompt before processing, giving the model awareness of past conversations. + +### 3. Background Storage + +Conversations are automatically stored in Supermemory (non-blocking) for future retrieval. + +## Memory Modes + +| Mode | Static Profile | Dynamic Profile | Search Results | Use Case | +| ----------- | -------------- | --------------- | -------------- | ------------------------------ | +| `"profile"` | Yes | Yes | No | Personalization without search | +| `"query"` | No | No | Yes | Finding relevant past context | +| `"full"` | Yes | Yes | Yes | Complete memory (default) | + +## Configuration Options + +You can customize how memories are retrieved and used: + +### MemoryConfig + +```python +SupermemoryCartesiaAgent.MemoryConfig( + mode="full", # Memory mode (default: "full") + search_limit=10, # Max memories to retrieve (default: 10) + search_threshold=0.1, # Similarity threshold 0.0-1.0 (default: 0.1) + system_prompt="Based on previous conversations:\n\n", +) +``` + +| Parameter | Type | Default | Description | +| ------------------ | ----- | -------------------------------------- | ---------------------------------------------------------- | +| `search_limit` | int | 10 | Maximum 
number of memories to retrieve per query | +| `search_threshold` | float | 0.1 | Minimum similarity threshold for memory retrieval | +| `mode` | str | "full" | Memory retrieval mode: `"profile"`, `"query"`, or `"full"` | +| `system_prompt` | str | "Based on previous conversations:\n\n" | Prefix text for memory context | + +### Agent Parameters + +```python +SupermemoryCartesiaAgent( + agent=base_agent, # Required: Cartesia Line LlmAgent + container_tag="user-123", # Required: Primary container tag (e.g., user ID) + custom_id="session-456", # Required: Groups all messages in same document + add_memory="always", # Optional: "always" (default) or "never" + container_tags=["org-acme", "prod"], # Optional: Additional tags + api_key=os.getenv("SUPERMEMORY_API_KEY"), # Optional: defaults to env var + config=MemoryConfig(...), # Optional: memory configuration + base_url=None, # Optional: custom API endpoint +) +``` + +| Parameter | Type | Required | Description | +| --------------- | ------------ | -------- | ------------------------------------------------------------------ | +| `agent` | LlmAgent | **Yes** | The Cartesia Line agent to wrap | +| `container_tag` | str | **Yes** | Primary container tag for memory scoping (e.g., user ID) | +| `custom_id` | str | **Yes** | Groups all messages in the same document (e.g., call ID, conversation ID) | +| `add_memory` | str | No | Memory persistence mode: "always" (default) or "never" | +| `container_tags`| List[str] | No | Additional container tags for organization (e.g., ["org", "prod"]) | +| `api_key` | str | No | Supermemory API key (or set `SUPERMEMORY_API_KEY` env var) | +| `config` | MemoryConfig | No | Advanced configuration | +| `base_url` | str | No | Custom API endpoint | + +## Container Tags + +Container tags allow you to organize memories across multiple dimensions: + +```python +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag="user-alice", # Primary: user ID + container_tags=["org-acme", 
"prod"], # Additional: organization, environment +) +``` + +Memories are stored with all tags: +```json +{ + "content": "User: What's the weather?\nAssistant: It's sunny today!", + "container_tags": ["user-alice", "org-acme", "prod"], + "metadata": { "platform": "cartesia" } +} +``` + +## Automatic Document Grouping + +The SDK **automatically groups all messages from the same conversation** into a single Supermemory document using `custom_id`: + +```python +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag="user-alice", + custom_id=call_request.call_id, # Required: Groups all messages together +) +``` + +**How it works:** +- The `custom_id` parameter groups all messages into the same Supermemory document +- Typically you use the call ID or conversation ID from Cartesia +- All messages from that conversation are appended to the same document +- This ensures conversation continuity and proper memory generation + +## Example: Basic Voice Agent with Memory + +Here's a complete example of a Cartesia Line voice agent with Supermemory integration: + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +async def get_agent(env, call_request): + # Extract container_tag from call metadata (typically user ID) + container_tag = call_request.metadata.get("user_id", "default-user") + + # Create base LLM agent + base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + api_key=os.getenv("ANTHROPIC_API_KEY"), + config=LlmConfig( + system_prompt="""You are a helpful voice assistant with memory.""", + introduction="Hello! 
Great to talk with you again!", + ), + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + custom_id=call_request.call_id, # Required: Groups all messages + ) + + return memory_agent + +# Create voice agent app +app = VoiceAgentApp(get_agent=get_agent) + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=8000) +``` + +## Example: Advanced Agent with Tools + +Here's an example with custom tools and multi-tag support: + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.tools import LoopbackTool +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +# Define custom tool +async def get_weather(location: str) -> str: + return f"The weather in {location} is sunny, 72°F" + +weather_tool = LoopbackTool( + name="get_weather", + description="Get current weather for a location", + function=get_weather +) + +async def get_agent(env, call_request): + container_tag = call_request.metadata.get("user_id", "default-user") + org_id = call_request.metadata.get("org_id") + + # Create LLM agent with tools + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + tools=[weather_tool], + config=LlmConfig( + system_prompt="You are a personal assistant with memory and tools.", + introduction="Hi! How can I help you today?" 
+ ) + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + custom_id=call_request.call_id, # Required: Groups all messages + container_tags=[org_id] if org_id else None, + config=SupermemoryCartesiaAgent.MemoryConfig( + mode="full", + search_limit=15, + search_threshold=0.15, + ) + ) + + return memory_agent + +app = VoiceAgentApp(get_agent=get_agent) +``` + +## Deployment + +To deploy to Cartesia Line, create a `main.py` file in your project root: + +```python +import os +import sys + +# Add src to path for local imports +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) + +from line.llm_agent import LlmAgent, LlmConfig +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +async def get_agent(env, call_request): + """Create a memory-enabled voice agent.""" + container_tag = call_request.metadata.get("user_id", "default-user") + + base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + api_key=os.getenv("ANTHROPIC_API_KEY"), + config=LlmConfig( + system_prompt="""You are a helpful voice assistant with memory. + You remember past conversations and can reference them naturally. + Keep responses brief and conversational.""", + introduction="Hello! 
Great to talk with you again!", + ), + ) + + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + custom_id=call_request.call_id, # Required: Groups all messages + ) + + return memory_agent + +app = VoiceAgentApp(get_agent=get_agent) +``` + +Then deploy with: + +```bash +cartesia deploy +``` + +Make sure to set these environment variables in your Cartesia deployment: +- `SUPERMEMORY_API_KEY` - Your Supermemory API key +- `ANTHROPIC_API_KEY` - Your Anthropic API key (or the key for your chosen LLM provider) diff --git a/packages/cartesia-sdk-python/README.md b/packages/cartesia-sdk-python/README.md new file mode 100644 index 000000000..bb446a7c9 --- /dev/null +++ b/packages/cartesia-sdk-python/README.md @@ -0,0 +1,232 @@ +# Supermemory Cartesia SDK + +Memory-enhanced voice agents with [Supermemory](https://supermemory.ai) and [Cartesia Line](https://cartesia.ai/agents). + +## Installation + +```bash +pip install supermemory-cartesia +``` + +## Quick Start + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +async def get_agent(env, call_request): + # Extract container_tag from call metadata (typically user ID) + container_tag = call_request.metadata.get("user_id", "default-user") + + # Create base LLM agent + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + config=LlmConfig( + system_prompt="You are a helpful voice assistant with memory.", + introduction="Hello! Great to talk with you again!" 
+ ) + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + custom_id=call_request.call_id, + ) + + return memory_agent + +# Create voice agent app +app = VoiceAgentApp(get_agent=get_agent) + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=8000) +``` + +## Configuration + +### Parameters + +| Parameter | Type | Required | Description | +| --------------- | ------------ | -------- | ------------------------------------------------------------------ | +| `agent` | LlmAgent | **Yes** | The Cartesia Line agent to wrap | +| `container_tag` | str | **Yes** | Primary container tag for memory scoping (e.g., user ID) | +| `custom_id` | str | **Yes** | Custom ID for grouping conversation messages into a single document| +| `add_memory` | Literal | No | Memory persistence mode: "always" (default) or "never" | +| `container_tags`| List[str] | No | Additional container tags for organization (e.g., ["org", "prod"]) | +| `api_key` | str | No | Supermemory API key (or set `SUPERMEMORY_API_KEY` env var) | +| `config` | MemoryConfig | No | Advanced configuration | +| `base_url` | str | No | Custom API endpoint | + +### Advanced Configuration + +```python +from supermemory_cartesia import SupermemoryCartesiaAgent + +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag="user-123", + custom_id="conversation-456", + add_memory="always", # "always" (default) or "never" + container_tags=["org-acme", "prod"], # Optional: additional tags + config=SupermemoryCartesiaAgent.MemoryConfig( + search_limit=10, # Max memories to retrieve + search_threshold=0.1, # Similarity threshold + mode="full", # "profile", "query", or "full" + system_prompt="Based on previous conversations, I recall:\n\n", + ), +) + +# Read-only mode - retrieve memories but don't save new ones +read_only_agent = SupermemoryCartesiaAgent( + agent=base_agent, + 
container_tag="user-123", + custom_id="conversation-456", + add_memory="never", # Only retrieve, don't save +) +``` + +### Memory Modes + +| Mode | Static Profile | Dynamic Profile | Search Results | +| ----------- | -------------- | --------------- | -------------- | +| `"profile"` | Yes | Yes | No | +| `"query"` | No | No | Yes | +| `"full"` | Yes | Yes | Yes | + +## How It Works + +1. **Intercepts events** - Listens for `UserTurnEnded` events from Cartesia Line +2. **Retrieves memories** - Queries Supermemory `/v4/profile` API with user's message +3. **Enriches context** - Adds memories to event history as system message +4. **Stores messages** - Sends conversation to Supermemory (background, non-blocking) +5. **Passes to agent** - Forwards enriched event to wrapped LlmAgent + +### What Gets Stored + +User and assistant messages are sent to Supermemory: + +```json +{ + "content": "User: What's the weather?\nAssistant: It's sunny today!", + "container_tags": ["user-123", "org-acme", "prod"], + "metadata": { "platform": "cartesia" } +} +``` + +## Architecture + +Cartesia Line uses an event-driven architecture: + +``` +User Speaks (Audio) + ↓ +[Ink STT] → Automatic speech recognition + ↓ +UserTurnEnded Event {content: "user message", history: [...]} + ↓ +┌──────────────────────────────────────────────┐ +│ SUPERMEMORY CARTESIA AGENT (Wrapper) │ +│ │ +│ process(env, event): │ +│ 1. Intercept UserTurnEnded │ +│ 2. Extract user message │ +│ 3. Query Supermemory API │ +│ 4. Enrich event.history with memories │ +│ 5. Pass to wrapped LlmAgent │ +│ 6. 
Store conversation (async background) │ +└──────────────────────────────────────────────┘ + ↓ +AgentSendText Event {text: "response"} + ↓ +[Sonic TTS] → Ultra-fast speech synthesis + ↓ +Audio Output +``` + +## Comparison with Pipecat SDK + +| Aspect | Pipecat | Cartesia Line | +| ----------------------- | ------------------------------ | ---------------------------- | +| **Integration Pattern** | Extends `FrameProcessor` | Wrapper around `LlmAgent` | +| **Event Handling** | `process_frame()` method | `process()` method | +| **Events** | `LLMContextFrame`, `LLMMessagesFrame` | `UserTurnEnded`, `CallStarted` | +| **Context Object** | `LLMContext.get_messages()` | `event.history` | +| **Memory Injection** | Modify `context.add_message()` | Modify `event.history` | + +## Full Example with Tools + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.tools import LoopbackTool +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +# Define custom tools +async def get_weather(location: str) -> str: + return f"The weather in {location} is sunny, 72°F" + +weather_tool = LoopbackTool( + name="get_weather", + description="Get current weather for a location", + function=get_weather +) + +async def get_agent(env, call_request): + container_tag = call_request.metadata.get("user_id", "default-user") + org_id = call_request.metadata.get("org_id") + + # Create LLM agent with tools + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + tools=[weather_tool], + config=LlmConfig( + system_prompt="You are a personal assistant with memory and tools.", + introduction="Hi! How can I help you today?" 
+ ) + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + custom_id=call_request.call_id, + container_tags=[org_id] if org_id else None, + config=SupermemoryCartesiaAgent.MemoryConfig( + mode="full", + search_limit=15, + search_threshold=0.15, + ) + ) + + return memory_agent + +app = VoiceAgentApp(get_agent=get_agent) +``` + +## Development + +```bash +# Clone repository +git clone https://github.com/supermemoryai/supermemory +cd supermemory/packages/cartesia-sdk-python + +# Install in development mode +pip install -e ".[dev]" + +# Run tests +pytest + +# Format code +black . +isort . +``` + +## License + +MIT diff --git a/packages/cartesia-sdk-python/pyproject.toml b/packages/cartesia-sdk-python/pyproject.toml new file mode 100644 index 000000000..18b3b2994 --- /dev/null +++ b/packages/cartesia-sdk-python/pyproject.toml @@ -0,0 +1,78 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "supermemory-cartesia" +version = "0.1.0" +description = "Supermemory integration for Cartesia Line - memory-enhanced voice agents" +readme = "README.md" +license = "MIT" +requires-python = ">=3.10" +authors = [ + { name = "Supermemory", email = "support@supermemory.ai" } +] +keywords = [ + "supermemory", + "cartesia", + "line", + "memory", + "conversational-ai", + "llm", + "voice-ai", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering :: Artificial Intelligence", +] +dependencies = [ + "supermemory>=3.16.0", + "cartesia-line>=0.2.0", + "pydantic>=2.10.0", + "loguru>=0.7.3", +] + +[project.optional-dependencies] +dev = [ + 
"pytest>=8.3.5", + "pytest-asyncio>=0.24.0", + "mypy>=1.14.1", + "black>=24.8.0", + "isort>=5.13.2", +] + +[project.urls] +Homepage = "https://supermemory.ai" +Documentation = "https://docs.supermemory.ai" +Repository = "https://github.com/supermemoryai/supermemory" + +[tool.hatch.build.targets.wheel] +packages = ["src/supermemory_cartesia"] + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", + "/README.md", + "/LICENSE", +] + +[tool.black] +line-length = 100 +target-version = ["py310"] + +[tool.isort] +profile = "black" +line_length = 100 + +[tool.mypy] +python_version = "3.10" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true diff --git a/packages/cartesia-sdk-python/requirements.txt b/packages/cartesia-sdk-python/requirements.txt new file mode 100644 index 000000000..dd4b8fe5e --- /dev/null +++ b/packages/cartesia-sdk-python/requirements.txt @@ -0,0 +1,5 @@ +# Core dependencies for supermemory-cartesia +supermemory>=3.16.0 +pydantic>=2.10.0 +loguru>=0.7.3 +cartesia-line>=0.2.0 diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/__init__.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/__init__.py new file mode 100644 index 000000000..b21511d9d --- /dev/null +++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/__init__.py @@ -0,0 +1,67 @@ +"""Supermemory Cartesia SDK - Memory-enhanced voice agents with Cartesia Line. + +This package provides seamless integration between Supermemory and Cartesia Line, +enabling persistent memory and context enhancement for voice AI applications. + +Example: + ```python + from supermemory_cartesia import SupermemoryCartesiaAgent, MemoryConfig + from line.llm_agent import LlmAgent, LlmConfig + + # Create base LLM agent + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + config=LlmConfig( + system_prompt="You are a helpful assistant.", + introduction="Hello!" 
+        )
+    )
+
+    # Wrap with Supermemory
+    memory_agent = SupermemoryCartesiaAgent(
+        agent=base_agent,
+        api_key=os.getenv("SUPERMEMORY_API_KEY"),
+        container_tag="user-123",
+        custom_id="conversation-456",
+    )
+    ```
+"""

+from .agent import SupermemoryCartesiaAgent

+# Export MemoryConfig as a top-level class for convenience
+MemoryConfig = SupermemoryCartesiaAgent.MemoryConfig

+from .exceptions import (
+    APIError,
+    ConfigurationError,
+    MemoryRetrievalError,
+    MemoryStorageError,
+    NetworkError,
+    SupermemoryCartesiaError,
+)
+from .utils import (
+    deduplicate_memories,
+    format_memories_to_text,
+    format_relative_time,
+    get_last_user_message,
+)

+__version__ = "0.1.0"

+__all__ = [
+    # Main agent
+    "SupermemoryCartesiaAgent",
+    "MemoryConfig",
+    # Exceptions
+    "SupermemoryCartesiaError",
+    "ConfigurationError",
+    "MemoryRetrievalError",
+    "MemoryStorageError",
+    "APIError",
+    "NetworkError",
+    # Utilities
+    "get_last_user_message",
+    "deduplicate_memories",
+    "format_memories_to_text",
+    "format_relative_time",
+]
diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/agent.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/agent.py
new file mode 100644
index 000000000..3d209aa00
--- /dev/null
+++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/agent.py
@@ -0,0 +1,446 @@
+"""Supermemory Cartesia Line agent integration.
+
+This module provides a memory-enhanced agent wrapper that integrates with
+Cartesia Line voice agents, adding persistent memory and context enrichment.
+""" + +import asyncio +import os +import re +from typing import Any, AsyncGenerator, Dict, List, Literal, Optional + +from loguru import logger +from pydantic import BaseModel, Field + +from .exceptions import ConfigurationError, MemoryRetrievalError +from .utils import deduplicate_memories, format_memories_to_text + +try: + import supermemory +except ImportError: + supermemory = None # type: ignore + +try: + from line.events import Event +except ImportError: + Event = Any # type: ignore + +# XML tags for memory injection +MEMORY_TAG_START = "" +MEMORY_TAG_END = "" + + +class SupermemoryCartesiaAgent: + """Memory-enhanced wrapper for Cartesia Line agents. + + This wrapper intercepts UserTurnEnded events, retrieves relevant memories + from Supermemory, and enriches the conversation history before passing to + the wrapped agent. + + Example: + ```python + from line.llm_agent import LlmAgent, LlmConfig + from supermemory_cartesia import SupermemoryCartesiaAgent + + base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + config=LlmConfig( + system_prompt="You are a helpful assistant.", + introduction="Hello! How can I help you today?" + ) + ) + + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag="user-123", + custom_id="conversation-456", + ) + ``` + """ + + class MemoryConfig(BaseModel): + """Configuration for memory retrieval. + + Attributes: + search_limit: Maximum memories to retrieve per query. + search_threshold: Minimum similarity threshold (0.0-1.0). + system_prompt: Prefix text for memory context. + mode: "profile", "query", or "full". 
+ """ + + search_limit: int = Field(default=10, ge=1) + search_threshold: float = Field(default=0.1, ge=0.0, le=1.0) + system_prompt: str = Field(default="Based on previous conversations:\n\n") + mode: Literal["profile", "query", "full"] = Field(default="full") + + def __init__( + self, + *, + agent: Any, + api_key: Optional[str] = None, + container_tag: str, + custom_id: str, + add_memory: Literal["always", "never"] = "always", + container_tags: Optional[List[str]] = None, + config: Optional[MemoryConfig] = None, + base_url: Optional[str] = None, + ): + """Initialize the Supermemory Cartesia agent wrapper. + + Args: + agent: The inner Cartesia Line agent to wrap. + api_key: Supermemory API key (or SUPERMEMORY_API_KEY env var). + container_tag: Primary container tag for memory scoping (e.g., user ID). + custom_id: Required. Custom ID to store all conversation messages in the same document. + Useful for grouping multi-turn conversations (e.g., call ID, conversation ID). + add_memory: Memory persistence mode - "always" (default) or "never". + container_tags: Optional list of additional container tags for + organization/categorization (e.g., ["org-acme", "prod"]). + config: Memory retrieval configuration. + base_url: Optional custom Supermemory API URL. + + Raises: + ConfigurationError: If API key, container_tag, or custom_id is missing. + """ + self.agent = agent + self.container_tag = container_tag + self.custom_id = custom_id + self.add_memory = add_memory + + # Build container tags list: primary tag first, then additional tags + self.container_tags = [container_tag] + if container_tags: + self.container_tags.extend(container_tags) + + self.config = config or SupermemoryCartesiaAgent.MemoryConfig() + + self.api_key = api_key or os.getenv("SUPERMEMORY_API_KEY") + if not self.api_key: + raise ConfigurationError( + "API key required. Set SUPERMEMORY_API_KEY or pass api_key." 
+        )
+
+        if not container_tag:
+            raise ConfigurationError("container_tag is required")
+
+        if not custom_id or not custom_id.strip():
+            raise ConfigurationError(
+                "custom_id is required and must be a non-empty string. "
+                "This ensures messages are grouped into the same document for a conversation."
+            )
+
+        self._supermemory_client = None
+        if supermemory is not None:
+            try:
+                self._supermemory_client = supermemory.AsyncSupermemory(
+                    api_key=self.api_key,
+                    base_url=base_url,
+                )
+                logger.info(f"[Supermemory] Initialized client for container_tag={container_tag}, all_tags={self.container_tags}")
+            except Exception as e:
+                logger.error(f"[Supermemory] Failed to initialize client: {e}")
+
+        self._messages_sent_count: int = 0
+        self._last_query: Optional[str] = None
+        self._background_tasks: set = set()  # Track background tasks to prevent GC
+
+    async def _retrieve_memories(self, query: str) -> Dict[str, Any]:
+        """Retrieve memories from Supermemory."""
+        if self._supermemory_client is None:
+            raise MemoryRetrievalError("Supermemory client not initialized")
+
+        try:
+            # Use primary container tag for profile retrieval
+            kwargs: Dict[str, Any] = {"container_tag": self.container_tags[0]}
+
+            if self.config.mode != "profile" and query:
+                kwargs["q"] = query
+                kwargs["threshold"] = self.config.search_threshold
+                kwargs["extra_body"] = {"limit": self.config.search_limit}
+
+            logger.info(f"[Supermemory] Retrieving memories for query: {query[:50]}...")
+
+            response = await asyncio.wait_for(
+                self._supermemory_client.profile(**kwargs),
+                timeout=10.0
+            )
+
+            static_count = len(response.profile.static) if response.profile.static else 0
+            dynamic_count = len(response.profile.dynamic) if response.profile.dynamic else 0
+            search_count = len(response.search_results.results) if response.search_results and response.search_results.results else 0
+
+            logger.info(f"[Supermemory] Retrieved memories - static: {static_count}, dynamic: {dynamic_count}, search: {search_count}")
+
+            search_results = []
+            if response.search_results and response.search_results.results:
+                search_results = response.search_results.results
+
+            return {
+                "profile": {
+                    "static": response.profile.static or [],
+                    "dynamic": response.profile.dynamic or [],
+                },
+                "search_results": search_results,
+            }
+
+        except asyncio.TimeoutError:
+            logger.warning("[Supermemory] Profile API timed out after 10s")
+            raise MemoryRetrievalError("Profile API timed out")
+        except Exception as e:
+            logger.error(f"[Supermemory] Error retrieving memories: {e}")
+            raise MemoryRetrievalError("Failed to retrieve memories", e)
+
+    async def _store_messages(self, messages: List[Dict[str, Any]]) -> None:
+        """Store messages in Supermemory."""
+        if self._supermemory_client is None or not messages or self.add_memory == "never":
+            return
+
+        try:
+            # Format as conversation transcript
+            lines = []
+            for msg in messages:
+                role = msg.get("role", "")
+                content = msg.get("content", "")
+                if role == "user":
+                    lines.append(f"User: {content}")
+                elif role == "assistant":
+                    lines.append(f"Assistant: {content}")
+
+            logger.info(f"[Supermemory] Storing {len(messages)} messages to containers={self.container_tags}")
+
+            # Build kwargs for add() call
+            add_kwargs: Dict[str, Any] = {
+                "content": "\n".join(lines),
+                "container_tags": self.container_tags,
+                "metadata": {"platform": "cartesia"},
+            }
+
+            # Use custom_id for document grouping (required field)
+            add_kwargs["custom_id"] = self.custom_id
+            logger.info(f"[Supermemory] Using custom_id={self.custom_id} for document grouping")
+
+            await self._supermemory_client.add(**add_kwargs)
+
+            logger.info(f"[Supermemory] Successfully stored {len(messages)} messages")
+
+        except Exception as e:
+            logger.error(f"[Supermemory] Error storing messages: {e}")
+
+    def _build_memory_message(self, memories_data: Dict[str, Any]) -> Optional[str]:
+        """Build memory context from retrieved data."""
+        profile = memories_data["profile"]
+        deduplicated = deduplicate_memories(
+            static=profile["static"],
+            dynamic=profile["dynamic"],
+            search_results=memories_data["search_results"],
+        )
+
+        total = (
+            len(deduplicated["static"])
+            + len(deduplicated["dynamic"])
+            + len(deduplicated["search_results"])
+        )
+
+        if total == 0:
+            return None
+
+        include_profile = self.config.mode in ("profile", "full")
+        include_search = self.config.mode in ("query", "full")
+
+        memory_text = format_memories_to_text(
+            deduplicated,
+            system_prompt=self.config.system_prompt,
+            include_static=include_profile,
+            include_dynamic=include_profile,
+            include_search=include_search,
+        )
+
+        if not memory_text:
+            return None
+
+        return f"{MEMORY_TAG_START}\n{memory_text}\n{MEMORY_TAG_END}"
+
+    def _extract_user_message(self, event: Any) -> Optional[str]:
+        """Extract user text from a UserTurnEnded event."""
+        if not hasattr(event, 'content'):
+            return None
+
+        content = event.content
+
+        if isinstance(content, str):
+            return content
+
+        if isinstance(content, list):
+            texts = []
+            for item in content:
+                if hasattr(item, 'content') and isinstance(item.content, str):
+                    texts.append(item.content)
+                elif isinstance(item, str):
+                    texts.append(item)
+            return " ".join(texts) if texts else None
+
+        if hasattr(content, 'content'):
+            return str(content.content)
+
+        return str(content)
+
+    def _extract_conversation_from_history(self, history: list) -> List[Dict[str, str]]:
+        """Extract messages from Cartesia event history."""
+        messages = []
+        seen = set()
+
+        for item in history:
+            if isinstance(item, dict):
+                if item.get("role") in ("user", "assistant"):
+                    content = item.get("content", "")
+                    if content and content not in seen:
+                        messages.append(item)
+                        seen.add(content)
+                continue
+
+            event_type = getattr(item, 'type', None) or type(item).__name__
+
+            if event_type in ('user_turn_ended', 'UserTurnEnded'):
+                nested = getattr(item, 'content', [])
+                if isinstance(nested, list):
+                    for n in nested:
+                        if hasattr(n, 'content') and isinstance(n.content, str):
+                            if n.content not in seen:
+                                messages.append({"role": "user", "content": n.content})
+                                seen.add(n.content)
+
+            elif event_type in ('agent_turn_ended', 'AgentTurnEnded'):
+                nested = getattr(item, 'content', [])
+                if isinstance(nested, list):
+                    texts = [n.content for n in nested if hasattr(n, 'content') and isinstance(n.content, str)]
+                    if texts:
+                        content = " ".join(texts)
+                        if content not in seen:
+                            messages.append({"role": "assistant", "content": content})
+                            seen.add(content)
+
+            elif event_type in ('user_text_sent', 'UserTextSent'):
+                content = getattr(item, 'content', '')
+                if content and isinstance(content, str) and content not in seen:
+                    messages.append({"role": "user", "content": content})
+                    seen.add(content)
+
+            elif event_type in ('agent_text_sent', 'AgentTextSent'):
+                content = getattr(item, 'content', '')
+                if content and isinstance(content, str) and content not in seen:
+                    messages.append({"role": "assistant", "content": content})
+                    seen.add(content)
+
+        return messages
+
+    async def _enrich_event_with_memories(self, event: Any) -> tuple[Any, Optional[str]]:
+        """Enrich event by retrieving memories.
+
+        Returns:
+            Tuple of (event, memory_context) - memory_context is None if no memories found.
+            The event is returned unchanged; memory injection happens at the agent level.
+        """
+        user_message = self._extract_user_message(event)
+
+        if not user_message:
+            logger.warning("[Supermemory] Could not extract user message from event")
+            return event, None
+
+        if user_message == self._last_query:
+            return event, None
+
+        self._last_query = user_message
+        logger.info(f"[Supermemory] Processing user message: {user_message[:50]}...")
+
+        try:
+            memories_data = await self._retrieve_memories(user_message)
+            memory_context = self._build_memory_message(memories_data)
+
+            if not memory_context:
+                logger.info("[Supermemory] No memories found for context injection")
+                return event, None
+
+            logger.info("[Supermemory] Retrieved memory context for injection")
+            return event, memory_context
+
+        except MemoryRetrievalError as e:
+            logger.warning(f"[Supermemory] Memory retrieval failed: {e}")
+            return event, None
+        except Exception as e:
+            logger.error(f"[Supermemory] Error in memory enrichment: {e}")
+            return event, None
+
+    async def process(self, env: Any, event: Event) -> AsyncGenerator[Event, None]:
+        """Process events with memory enrichment.
+
+        Args:
+            env: Turn environment from Cartesia Line.
+            event: Input event to process.
+
+        Yields:
+            Output events from the wrapped agent.
+        """
+        try:
+            if type(event).__name__ == "UserTurnEnded":
+                logger.info("[Supermemory] Processing UserTurnEnded event")
+                event, memory_context = await self._enrich_event_with_memories(event)
+
+                # Clean up old memory context and inject new one if available
+                if hasattr(self.agent, 'config'):
+                    original_prompt = getattr(self.agent.config, 'system_prompt', '')
+                    # Always remove old memory context if present to prevent stale data
+                    if MEMORY_TAG_START in original_prompt:
+                        original_prompt = re.sub(
+                            rf'{re.escape(MEMORY_TAG_START)}.*?{re.escape(MEMORY_TAG_END)}\s*',
+                            '',
+                            original_prompt,
+                            flags=re.DOTALL
+                        )
+                        logger.debug("[Supermemory] Removed old memory context from system prompt")
+
+                    # Inject new memory context if available
+                    if memory_context:
+                        self.agent.config.system_prompt = f"{memory_context}\n\n{original_prompt}"
+                        logger.info("[Supermemory] Injected new memory context into system prompt")
+                    else:
+                        # No new memories, but we cleaned up old ones
+                        self.agent.config.system_prompt = original_prompt
+                        logger.debug("[Supermemory] No new memories to inject, using clean prompt")
+
+                # Store conversation in background
+                if hasattr(event, 'history') and event.history:
+                    messages = self._extract_conversation_from_history(event.history)
+                    unsent = messages[self._messages_sent_count:]
+                    if unsent:
+                        logger.info(f"[Supermemory] Queuing {len(unsent)} messages for storage")
+                        task = asyncio.create_task(self._store_messages(unsent))
+                        self._background_tasks.add(task)
+                        task.add_done_callback(self._background_tasks.discard)
+                    self._messages_sent_count = len(messages)
+                else:
+                    # No history yet, store just the current user message
+                    user_content = self._extract_user_message(event)
+                    if user_content:
+                        logger.info(f"[Supermemory] No history, storing current user message: {user_content[:50]}...")
+                        task = asyncio.create_task(self._store_messages([{"role": "user", "content": user_content}]))
+                        self._background_tasks.add(task)
+                        task.add_done_callback(self._background_tasks.discard)
+                        self._messages_sent_count = 1  # CRITICAL: Increment counter to prevent duplicate storage
+
+                async for output in self.agent.process(env, event):
+                    yield output
+            else:
+                async for output in self.agent.process(env, event):
+                    yield output
+
+        except Exception as e:
+            logger.error(f"[Supermemory] Error in process: {e}")
+            async for output in self.agent.process(env, event):
+                yield output
+
+    def reset_memory_tracking(self) -> None:
+        """Reset memory tracking for a new conversation."""
+        self._messages_sent_count = 0
+        self._last_query = None
+        logger.info("[Supermemory] Reset memory tracking state")
diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/exceptions.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/exceptions.py
new file mode 100644
index 000000000..25cd2814c
--- /dev/null
+++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/exceptions.py
@@ -0,0 +1,58 @@
+"""Custom exceptions for Supermemory Cartesia integration."""
+
+from typing import Optional
+
+
+class SupermemoryCartesiaError(Exception):
+    """Base exception for all Supermemory Cartesia errors."""
+
+    def __init__(self, message: str, original_error: Optional[Exception] = None):
+        super().__init__(message)
+        self.message = message
+        self.original_error = original_error
+
+    def __str__(self) -> str:
+        if self.original_error:
+            return f"{self.message}: {self.original_error}"
+        return self.message
+
+
+class ConfigurationError(SupermemoryCartesiaError):
+    """Raised when there are configuration issues (e.g., missing API key, invalid params)."""
+
+
+class MemoryRetrievalError(SupermemoryCartesiaError):
+    """Raised when memory retrieval operations fail."""
+
+
+class MemoryStorageError(SupermemoryCartesiaError):
+    """Raised when memory storage operations fail."""
+
+
+class APIError(SupermemoryCartesiaError):
+    """Raised when Supermemory API requests fail."""
+
+    def __init__(
+        self,
+        message: str,
+        status_code: Optional[int] = None,
+        response_text: Optional[str] = None,
+        original_error: Optional[Exception] = None,
+    ):
+        super().__init__(message, original_error)
+        self.status_code = status_code
+        self.response_text = response_text
+
+    def __str__(self) -> str:
+        parts = [self.message]
+        if self.status_code:
+            parts.append(f"Status: {self.status_code}")
+        if self.response_text:
+            parts.append(f"Response: {self.response_text}")
+        if self.original_error:
+            parts.append(f"Cause: {self.original_error}")
+        return " | ".join(parts)
+
+
+class NetworkError(SupermemoryCartesiaError):
+    """Raised when network operations fail."""
diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/utils.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/utils.py
new file mode 100644
index 000000000..eb3664262
--- /dev/null
+++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/utils.py
@@ -0,0 +1,134 @@
+"""Utility functions for Supermemory Cartesia integration."""
+
+from datetime import datetime, timezone
+from typing import Any, Dict, List, Optional, Union
+
+
+def get_last_user_message(messages: List[Dict[str, str]]) -> Optional[str]:
+    """Extract the last user message content from a list of messages."""
+    for msg in reversed(messages):
+        if msg.get("role") == "user":
+            return msg.get("content")
+    return None
+
+
+def format_relative_time(iso_timestamp: str) -> str:
+    """Convert ISO timestamp to relative time string.
+
+    Format rules:
+    - [just now] - within 30 minutes
+    - [X mins ago] - 30-60 minutes
+    - [X hrs ago] - less than 1 day
+    - [Xd ago] - less than 1 week
+    - [X Jul] - more than 1 week, same year
+    - [X Jul, 2023] - different year
+    """
+    try:
+        dt = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
+        now = datetime.now(timezone.utc)
+        diff = now - dt
+
+        seconds = diff.total_seconds()
+        minutes = seconds / 60
+        hours = seconds / 3600
+        days = seconds / 86400
+
+        if minutes < 30:
+            return "just now"
+        elif minutes < 60:
+            return f"{int(minutes)} mins ago"
+        elif hours < 24:
+            return f"{int(hours)} hrs ago"
+        elif days < 7:
+            return f"{int(days)}d ago"
+        elif dt.year == now.year:
+            return f"{dt.day} {dt.strftime('%b')}"
+        else:
+            return f"{dt.day} {dt.strftime('%b')}, {dt.year}"
+    except Exception:
+        return ""
+
+
+def deduplicate_memories(
+    static: List[str],
+    dynamic: List[str],
+    search_results: List[Dict[str, Any]],
+) -> Dict[str, Union[List[str], List[Dict[str, Any]]]]:
+    """Deduplicate memories. Priority: static > dynamic > search.
+
+    Args:
+        static: List of static memory strings.
+        dynamic: List of dynamic memory strings.
+        search_results: List of search result dicts with 'memory' and 'updatedAt'.
+    """
+    seen = set()
+
+    def unique_strings(memories: List[str]) -> List[str]:
+        out = []
+        for m in memories:
+            if m not in seen:
+                seen.add(m)
+                out.append(m)
+        return out
+
+    def unique_search(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        out = []
+        for r in results:
+            memory = r.get("memory", "")
+            if memory and memory not in seen:
+                seen.add(memory)
+                out.append(r)
+        return out
+
+    return {
+        "static": unique_strings(static),
+        "dynamic": unique_strings(dynamic),
+        "search_results": unique_search(search_results),
+    }
+
+
+def format_memories_to_text(
+    memories: Dict[str, Union[List[str], List[Dict[str, Any]]]],
+    system_prompt: str = "Based on previous conversations, I recall:\n\n",
+    include_static: bool = True,
+    include_dynamic: bool = True,
+    include_search: bool = True,
+) -> str:
+    """Format deduplicated memories into a text string for injection.
+
+    Search results include temporal context (e.g., '3d ago') from updatedAt.
+    """
+    sections = []
+
+    static = memories["static"]
+    dynamic = memories["dynamic"]
+    search_results = memories["search_results"]
+
+    if include_static and static:
+        sections.append("## User Profile (Persistent)")
+        sections.append("\n".join(f"- {item}" for item in static))
+
+    if include_dynamic and dynamic:
+        sections.append("## Recent Context")
+        sections.append("\n".join(f"- {item}" for item in dynamic))
+
+    if include_search and search_results:
+        sections.append("## Relevant Memories")
+        lines = []
+        for item in search_results:
+            if isinstance(item, dict):
+                memory = item.get("memory", "")
+                updated_at = item.get("updatedAt", "")
+                time_str = format_relative_time(updated_at) if updated_at else ""
+                if time_str:
+                    lines.append(f"- [{time_str}] {memory}")
+                else:
+                    lines.append(f"- {memory}")
+            else:
+                lines.append(f"- {item}")
+        sections.append("\n".join(lines))
+
+    if not sections:
+        return ""
+
+    return f"{system_prompt}\n" + "\n\n".join(sections)