diff --git a/CHANGELOG.md b/CHANGELOG.md index b48aa8c..3f188ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.6.0] - 2026-06-20 + +### Added + +- ๐Ÿง  **Memory.** The AI can now remember things about you and your projects. Memories are stored per user and per workspace, and are automatically included in future conversations. You can view, edit, and delete memories from the new Memory tab in Settings. Admins can also turn on background memory review, which lets the AI quietly pick up on preferences and patterns as you chat. +- ๐ŸŽจ **Image generation and editing.** Ask the AI to create or edit images right from the chat. Generated images are saved to your workspace and displayed inline in the conversation. Supports any OpenAI-compatible image API. Configure from the new Images tab in admin settings. +- ๐Ÿ”€ **Background sub-agents.** Sub-agents can now run in the background. The AI kicks off a task, keeps chatting with you, and brings the results back when the background work is done. Great for long-running research or multi-step tasks you don't want to wait on. +- ๐Ÿ” **Better chat search.** Searching your chats now looks at chat IDs, titles, summaries, and message content all at once, with smarter ranking. Exact and prefix matches on titles and IDs show up first, then summary matches, then message content. You can also filter by workspace and choose whether to include sub-agent chats. + +### Changed + +- ๐Ÿ–ผ๏ธ **Generated images show inline.** Images created by the AI now appear as proper image previews in the chat instead of raw file paths or JSON. The tool call output also shows a cleaner summary instead of a wall of data. +- ๐Ÿท๏ธ **Background sub-agents are labeled.** When the AI runs a task in the background, the tool call label now says "Background sub-agent" so you can tell it apart from a regular sub-agent at a glance. +- ๐Ÿงน **Code cleanup.** Formatting and style improvements across the codebase for better readability. + ## [0.5.6] - 2026-06-19 ### Changed diff --git a/cptr/app.py b/cptr/app.py index 4e41655..6607de1 100644 --- a/cptr/app.py +++ b/cptr/app.py @@ -17,6 +17,8 @@ files_router, gateway_router, git_router, + images_router, + memory_router, proxy_router, search_router, skills_router, @@ -75,6 +77,12 @@ async def shutdown(): bot_manager = getattr(app.state, "bot_manager", None) if bot_manager: await bot_manager.stop_all() + try: + from cptr.utils.async_subagents import cancel_all_async_subagents + + await cancel_all_async_subagents(reason="shutdown") + except Exception: + pass # Clean up browser sessions and launched Chrome try: from cptr.utils.browser.session import session_manager @@ -257,6 +265,8 @@ async def get_config(): app.include_router(files_router) app.include_router(gateway_router) app.include_router(git_router) +app.include_router(images_router) +app.include_router(memory_router) app.include_router(proxy_router) app.include_router(search_router) app.include_router(skills_router) diff --git a/cptr/frontend/src/lib/apis/memory.ts b/cptr/frontend/src/lib/apis/memory.ts new file mode 100644 index 0000000..97b7009 --- /dev/null +++ b/cptr/frontend/src/lib/apis/memory.ts @@ -0,0 +1,40 @@ +import { fetchJSON, jsonBody } from '$lib/apis'; + +export type MemoryScope = 'user' | 'workspace'; +export type MemoryOperation = { + action: 'add' | 'replace' | 'remove'; + content?: string; + old_text?: string; +}; + +export type MemorySettings = { + enabled: boolean; + tool_enabled: boolean; + background_review_enabled: boolean; + review_interval_turns: number; + user_char_limit: number; + workspace_char_limit: number; +}; + +export type MemoryState = { + settings: MemorySettings; + user: { entries: string[]; usage: string; path: string }; + workspace: { entries: string[]; usage: string; path: string }; +}; + +export const getMemory = (workspace: string) => + fetchJSON(`/api/memory?workspace=${encodeURIComponent(workspace || '')}`); + +export const updateMemorySettings = (settings: Partial) => + fetchJSON<{ settings: MemorySettings }>('/api/memory/config', { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ settings }) + }); + +export const updateMemory = ( + scope: MemoryScope, + workspace: string, + operations: MemoryOperation[] +) => + fetchJSON('/api/memory/update', jsonBody({ scope, workspace, operations })); diff --git a/cptr/frontend/src/lib/apis/search.ts b/cptr/frontend/src/lib/apis/search.ts index 9efc0f4..bd96a4c 100644 --- a/cptr/frontend/src/lib/apis/search.ts +++ b/cptr/frontend/src/lib/apis/search.ts @@ -10,8 +10,10 @@ export interface ChatSearchResult { workspace: string; updated_at: number; created_at: number; - match_type: 'title' | 'message'; + match_type: 'id' | 'title' | 'summary' | 'message' | 'recent'; snippet: string | null; + matched_message_id?: string | null; + matched_role?: string | null; } export interface FileSearchResult { diff --git a/cptr/frontend/src/lib/components/Admin/Images.svelte b/cptr/frontend/src/lib/components/Admin/Images.svelte new file mode 100644 index 0000000..ff1cf4d --- /dev/null +++ b/cptr/frontend/src/lib/components/Admin/Images.svelte @@ -0,0 +1,235 @@ + + +
+ {#if loading} +
+ {:else} +
+

+ {$t('admin.images.title')} +

+ +

+ {$t('admin.images.generation')} +

+
+ +

+ {$t('admin.images.generationHint')} +

+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ +

+ {$t('admin.images.editing')} +

+
+ +

+ {$t('admin.images.editHint')} +

+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+ + +
+ +
+ {/if} +
diff --git a/cptr/frontend/src/lib/components/Admin/Models.svelte b/cptr/frontend/src/lib/components/Admin/Models.svelte index 62a9c3b..6b5812d 100644 --- a/cptr/frontend/src/lib/components/Admin/Models.svelte +++ b/cptr/frontend/src/lib/components/Admin/Models.svelte @@ -64,7 +64,7 @@ { name: 'MODEL', desc: 'Model ID being used' } ]; - const DEFAULT_PROMPT_PLACEHOLDER = `You are cptr, a helpful assistant running inside the user's computer interface. You have access to tools to read, search, and modify files in the workspace, run commands, and use configured tools. Use them to help the user directly. + const DEFAULT_PROMPT_PLACEHOLDER = `You are Computer (cptr), a helpful assistant running inside the user's computer interface. You have access to tools to read, search, and modify files in the workspace, run commands, and use configured tools. Use them to help the user directly. Approach hard requests with initiative and persistence: make the best possible attempt, adapt as needed, and keep going unless a real constraint prevents progress. {{CPTR_CONTEXT}} diff --git a/cptr/frontend/src/lib/components/Admin/Subagents.svelte b/cptr/frontend/src/lib/components/Admin/Subagents.svelte index a6c0dd1..2ce6371 100644 --- a/cptr/frontend/src/lib/components/Admin/Subagents.svelte +++ b/cptr/frontend/src/lib/components/Admin/Subagents.svelte @@ -10,7 +10,9 @@ let saving = $state(false); let enabled = $state(false); + let backgroundEnabled = $state(false); let maxConcurrent = $state(3); + let maxAsync = $state(3); let maxIterations = $state(30); let maxOutput = $state(30000); let systemPrompt = $state(''); @@ -19,7 +21,9 @@ try { const config = await getAdminConfig(); enabled = config['subagents.enabled'] === true || config['subagents.enabled'] === 'true'; + backgroundEnabled = config['subagents.background_enabled'] === true || config['subagents.background_enabled'] === 'true'; maxConcurrent = Number(config['subagents.max_concurrent']) || 3; + maxAsync = Number(config['subagents.max_async']) || 3; maxIterations = Number(config['subagents.max_iterations']) || 30; maxOutput = Number(config['subagents.max_output']) || 30000; systemPrompt = (config['subagents.system_prompt'] as string) || ''; @@ -34,7 +38,9 @@ try { await updateConfig({ 'subagents.enabled': enabled, + 'subagents.background_enabled': backgroundEnabled, 'subagents.max_concurrent': maxConcurrent, + 'subagents.max_async': maxAsync, 'subagents.max_iterations': maxIterations, 'subagents.max_output': maxOutput, 'subagents.system_prompt': systemPrompt @@ -73,6 +79,27 @@ +
+ +

+ {$t('admin.subagentsBackgroundHint')} +

+
+ + {#if backgroundEnabled} +
+ +
+ + {$t('admin.subagentsMaxAsyncHint')} +
+
+ {/if} +
diff --git a/cptr/frontend/src/lib/components/Icon.svelte b/cptr/frontend/src/lib/components/Icon.svelte index 083f667..402b838 100644 --- a/cptr/frontend/src/lib/components/Icon.svelte +++ b/cptr/frontend/src/lib/components/Icon.svelte @@ -72,6 +72,17 @@ /> + {:else if name === 'brain'} + + + + + + {:else if name === 'plus'} {:else if name === 'xmark'} diff --git a/cptr/frontend/src/lib/components/SearchModal.svelte b/cptr/frontend/src/lib/components/SearchModal.svelte index c565fc6..d15c9f1 100644 --- a/cptr/frontend/src/lib/components/SearchModal.svelte +++ b/cptr/frontend/src/lib/components/SearchModal.svelte @@ -264,7 +264,7 @@ {chat.title} {relativeTime(chat.updated_at)}
- {#if chat.match_type === 'message' && chat.snippet} + {#if chat.snippet}
{chat.snippet}
{/if}
diff --git a/cptr/frontend/src/lib/components/Settings/General.svelte b/cptr/frontend/src/lib/components/Settings/General.svelte index 42bbc78..6482790 100644 --- a/cptr/frontend/src/lib/components/Settings/General.svelte +++ b/cptr/frontend/src/lib/components/Settings/General.svelte @@ -6,6 +6,7 @@ import { t, locale, changeLocale, supportedLocales } from '$lib/i18n'; import { notificationsEnabled, notificationSound } from '$lib/stores/chat'; import { fetchJSON } from '$lib/apis'; + import { updateConfig } from '$lib/apis/admin'; import { session } from '$lib/session'; import ToggleSwitch from '../common/ToggleSwitch.svelte'; import { onMount } from 'svelte'; @@ -33,10 +34,7 @@ async function save() { saving = true; try { - await fetchJSON('/api/admin/config', { - method: 'PUT', - body: JSON.stringify({ config: { 'notifications.webhook_url': webhookUrl.trim() || null } }) - }); + await updateConfig({ 'notifications.webhook_url': webhookUrl.trim() || null }); webhookUrlOriginal = webhookUrl.trim(); toast.success($t('settings.saved')); } catch { diff --git a/cptr/frontend/src/lib/components/Settings/Memory.svelte b/cptr/frontend/src/lib/components/Settings/Memory.svelte new file mode 100644 index 0000000..a5c5a15 --- /dev/null +++ b/cptr/frontend/src/lib/components/Settings/Memory.svelte @@ -0,0 +1,163 @@ + + +
+ {#if loading || !settings} +
+ {:else} +
+

Memory

+ +

Behavior

+
+ + + {#if settings.enabled} + + + + {/if} +
+ + {#if settings.enabled} +

Limits

+
+
+ + +
+
+ + +
+
+ + {#if userEntries.length > 0} +

User Memory

+
+ {#each userEntries as entry} +
+
+ {entry} +
+ +
+ {/each} +

{userUsage}

+
+ {/if} + + {/if} +
+ +
+ +
+ {/if} +
diff --git a/cptr/frontend/src/lib/components/SettingsModal.svelte b/cptr/frontend/src/lib/components/SettingsModal.svelte index 47fbd3b..86939d8 100644 --- a/cptr/frontend/src/lib/components/SettingsModal.svelte +++ b/cptr/frontend/src/lib/components/SettingsModal.svelte @@ -3,6 +3,7 @@ import Icon from './Icon.svelte'; import Modal from './Modal.svelte'; import General from './Settings/General.svelte'; + import Memory from './Settings/Memory.svelte'; import PWA from './Settings/PWA.svelte'; import Account from './Settings/Account.svelte'; import Keyboard from './Settings/Keyboard.svelte'; @@ -13,6 +14,7 @@ import Messaging from './Admin/Messaging.svelte'; import Gateway from './Admin/Gateway.svelte'; import AudioSettings from './Admin/AudioSettings.svelte'; + import Images from './Admin/Images.svelte'; import AdminWeb from './Admin/Web.svelte'; import ToolServers from './Admin/ToolServers.svelte'; import Subagents from './Admin/Subagents.svelte'; @@ -21,6 +23,7 @@ type Tab = | 'general' + | 'memory' | 'pwa' | 'keyboard' | 'account' @@ -31,6 +34,7 @@ | 'messaging' | 'gateway' | 'audio' + | 'images' | 'web' | 'toolservers' | 'subagents'; @@ -49,6 +53,20 @@ type SettingsTab = { id: Tab; label: string; icon: string }; + const adminTabIds: Tab[] = [ + 'users', + 'connections', + 'models', + 'messaging', + 'gateway', + 'audio', + 'images', + 'web', + 'toolservers', + 'subagents', + 'memory' + ]; + const personalTabs: SettingsTab[] = $derived.by(() => { const tabs: SettingsTab[] = [ { id: 'general', label: $t('settings.general'), icon: 'settings' }, @@ -67,15 +85,19 @@ { id: 'messaging', label: $t('admin.messaging'), icon: 'chat-bubble' }, { id: 'gateway', label: $t('admin.gateway.tab'), icon: 'gateway' }, { id: 'audio', label: $t('admin.audio.title'), icon: 'microphone' }, + { id: 'images', label: $t('admin.images.title'), icon: 'image' }, { id: 'web', label: $t('admin.web'), icon: 'globe' }, { id: 'toolservers', label: $t('admin.toolServers'), icon: 'plug' }, - { id: 'subagents', label: $t('admin.subagents'), icon: 'user' } + { id: 'subagents', label: $t('admin.subagents'), icon: 'user' }, + { id: 'memory', label: 'Memory', icon: 'brain' } ]); onMount(() => { showPwaSettings = isInstalledPwa(); if (showPwaSettings && initialTab === 'pwa') { activeTab = 'pwa'; + } else if (!$session || ($session.role !== 'admin' && adminTabIds.includes(initialTab))) { + activeTab = 'general'; } else if (initialTab !== 'pwa') { activeTab = initialTab; } else { @@ -146,9 +168,11 @@
- {#if activeTab === 'general'} - - {:else if activeTab === 'pwa' && showPwaSettings} + {#if activeTab === 'general'} + + {:else if activeTab === 'memory'} + + {:else if activeTab === 'pwa' && showPwaSettings} {:else if activeTab === 'keyboard'} @@ -168,6 +192,8 @@ {:else if activeTab === 'audio'} + {:else if activeTab === 'images'} + {:else if activeTab === 'web'} {:else if activeTab === 'toolservers'} diff --git a/cptr/frontend/src/lib/components/chat/AssistantMessage.svelte b/cptr/frontend/src/lib/components/chat/AssistantMessage.svelte index ec9694b..17b5658 100644 --- a/cptr/frontend/src/lib/components/chat/AssistantMessage.svelte +++ b/cptr/frontend/src/lib/components/chat/AssistantMessage.svelte @@ -177,6 +177,8 @@ return _t('chat.tool.checkTask', { id: args.task_id || '?' }); case 'kill_task': return _t('chat.tool.killTask', { id: args.task_id || '?' }); + case 'image_generate': + return args.image || args.images?.length ? 'Edit image' : 'Generate image'; case 'web_search': return _t('chat.tool.webSearch', { query: args.query || '?' }); case 'read_url': { @@ -188,7 +190,8 @@ } case 'delegate_task': { const t = args.task || '?'; - return `Sub-agent: "${t.length > 60 ? t.slice(0, 60) + 'โ€ฆ' : t}"`; + const label = args.background ? 'Background sub-agent' : 'Sub-agent'; + return `${label}: "${t.length > 60 ? t.slice(0, 60) + 'โ€ฆ' : t}"`; } default: { // External tool: {server_id}_{tool_name} โ†’ "tool_name (server_id)" @@ -224,7 +227,12 @@ item: any; } - type DisplayItem = ActivityGroup | MessageItem | ArtifactItem; + interface ImageItem { + type: 'image_item'; + item: any; + } + + type DisplayItem = ActivityGroup | MessageItem | ArtifactItem | ImageItem; const outputText = $derived.by((): string => { return (output || []) @@ -235,6 +243,14 @@ .join(''); }); + const structuredImageUrls = $derived.by((): string[] => { + return (output || []) + .filter((i: any) => i.type === 'image') + .flatMap((i: any) => i.images || []) + .map((image: any) => image?.url) + .filter((url: any): url is string => typeof url === 'string' && url.length > 0); + }); + const unrenderedContent = $derived.by((): string => { if (!content) return ''; if (!outputText) return content; @@ -245,6 +261,20 @@ return (item.content || []).map((c: any) => c.text || '').join(''); } + function escapeRegExp(value: string): string { + return value.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + } + + function stripStructuredImageMarkdown(text: string): string { + let cleaned = text; + for (const url of structuredImageUrls) { + const escapedUrl = escapeRegExp(url); + cleaned = cleaned.replace(new RegExp(`!\\[[^\\]]*\\]\\(${escapedUrl}\\)`, 'g'), ''); + cleaned = cleaned.replace(new RegExp(`(^|\\n)[ \\t]*${escapedUrl}[ \\t]*(?=\\n|$)`, 'g'), '$1'); + } + return cleaned.replace(/\n{3,}/g, '\n\n').trim(); + } + const displayItems = $derived.by((): DisplayItem[] => { if (!output?.length) return []; @@ -297,6 +327,9 @@ } else if (item.type === 'artifact') { flushGroup(); items.push({ type: 'artifact_item', item }); + } else if (item.type === 'image') { + flushGroup(); + items.push({ type: 'image_item', item }); } // function_call_output items are handled via outputMap, skip standalone render } @@ -378,9 +411,10 @@ {/if} {#each displayItems as displayItem, groupIdx} {#if displayItem.type === 'message_item'} - c.text).join('') || ''} - /> + {@const messageText = stripStructuredImageMarkdown(messageItemText(displayItem.item))} + {#if messageText} + + {/if} {:else if displayItem.type === 'artifact_item'} {@const artifact = displayItem.item} {@const preview = (artifact.content || '').replace(/^#.*\n*/m, '').trim()} @@ -415,6 +449,23 @@ {/if}
+ {:else if displayItem.type === 'image_item'} +
+ {#each displayItem.item.images || [] as image} + + {image.name + + {/each} +
{:else if displayItem.type === 'activity_group'} {#if displayItem.entries.length === 1} {@const item = displayItem.entries[0]} @@ -448,10 +499,14 @@ {/if} {/each} {#if done && unrenderedContent && displayItems.length > 0} - + {@const leftoverText = stripStructuredImageMarkdown(unrenderedContent)} + {#if leftoverText} + + {/if} {:else if !done} - {#if unrenderedContent} - + {@const leftoverText = stripStructuredImageMarkdown(unrenderedContent)} + {#if leftoverText} + {/if} { if (!allMessages.length) return []; - // Exclude queued messages from the display path โ€” they only appear in the queue UI - const displayMessages = allMessages.filter((m) => !m.meta?.queued); + // Exclude pending internal inputs until the parent has processed them. + const displayMessages = allMessages.filter( + (m) => !m.meta?.queued && !m.meta?.async_subagent_pending + ); if (!displayMessages.length) return []; const msgMap = new Map(displayMessages.map((m) => [m.id, m])); @@ -259,7 +261,7 @@ const isLanding = $derived(allMessages.length === 0 && !chatId); const workspaceName = $derived(workspace.split('/').pop() || 'workspace'); - // Queued messages: user messages with meta.queued flag (server-side queue) + // Queued messages: user-authored messages waiting behind an active response. const queuedMessages = $derived( allMessages .filter((m) => m.role === 'user' && m.meta?.queued) @@ -368,9 +370,10 @@ output?: any; done?: boolean; error?: string; - queue_processed?: boolean; + pending_inputs_processed?: boolean; + async_subagent_pending?: boolean; title?: string; - }) { + }) { // On the landing page, update the chat list in place from socket events if (isLanding) { const knownChat = previousChats.some((c) => c.id === data.chat_id); @@ -402,8 +405,8 @@ updateTab(tabId, data.chat_id, data.title); } - // Queue was processed server-side: reload to see combined message + new generation - if (data.queue_processed) { + // Follow-up state changed server-side: reload to see new transcript/generation state. + if (data.pending_inputs_processed || data.async_subagent_pending) { loadChat(data.chat_id); return; } @@ -1375,9 +1378,9 @@ {#if hasHiddenMessages} {/if} - {#each visiblePath as { msg, siblingIds, siblingIndex } (msg.id)} - {#if msg.role === 'user'} - { + if (toolName !== 'image_generate' || !pairedOutput?.output) { + return null; + } + try { + const parsed = JSON.parse(pairedOutput.output); + const images = Array.isArray(parsed.images) ? parsed.images : []; + return images.filter((image: any) => image && (image.path || image.url || image.id)); + } catch { + return null; + } + }); function toggleExpanded() { expanded = !expanded; @@ -246,12 +258,22 @@ {$t('chat.toolOutput')}
-
{pairedOutput
-									.output.length > 10000
-									? pairedOutput.output.slice(0, 10000)
-									: pairedOutput.output}
- {#if pairedOutput.output.length > 10000} + {#if imageToolOutput?.length} +
+ {#each imageToolOutput as image} +
+ {image.path || image.url || image.id} +
+ {/each} +
+ {:else} +
{pairedOutput
+										.output.length > 10000
+										? pairedOutput.output.slice(0, 10000)
+										: pairedOutput.output}
+ {/if} + {#if !imageToolOutput?.length && pairedOutput.output.length > 10000}
{$t('chat.totalChars', { count: pairedOutput.output.length.toLocaleString() diff --git a/cptr/frontend/src/lib/components/chat/UserMessage.svelte b/cptr/frontend/src/lib/components/chat/UserMessage.svelte index 3a92826..e36557a 100644 --- a/cptr/frontend/src/lib/components/chat/UserMessage.svelte +++ b/cptr/frontend/src/lib/components/chat/UserMessage.svelte @@ -42,6 +42,21 @@ let editedContent = $state(''); let copied = $state(false); let textareaEl: HTMLTextAreaElement; + let asyncExpanded = $state(false); + const isAsyncSubagentResult = $derived(meta?.async_subagent_result === true); + const delegationId = $derived(meta?.delegation_id || ''); + const delegationIds = $derived(Array.isArray(meta?.delegation_ids) ? meta.delegation_ids : []); + const delegationLabel = $derived( + delegationId || (delegationIds.length > 1 ? `${delegationIds.length} tasks` : '') + ); + const asyncSummary = $derived.by(() => { + const line = content + .split('\n') + .map((s) => s.trim()) + .find((s) => s && !s.startsWith('[')); + if (!line) return delegationLabel || ''; + return line.length > 96 ? `${line.slice(0, 96)}...` : line; + }); async function startEdit() { edit = true; @@ -95,7 +110,37 @@
- {#if edit} + {#if isAsyncSubagentResult} +
+ + {#if asyncExpanded} +
+ {content} +
+ {/if} +
+ {:else if edit}
list[dict]: - """Search chats by title AND message content (case-insensitive). + """Search chats by id, title, summary, and message content. - Returns dicts with chat fields + 'match_type' ('title' | 'message') - and 'snippet' (matching message excerpt or null). - Title matches rank first. + Ranking favors exact/prefix id and title matches, then summary matches, + then message content. This intentionally avoids schema changes or FTS + dependencies; it is a better-ranked LIKE search over the current tables. """ - from sqlalchemy import literal, union_all + needle = (query or "").strip() + if not needle or limit <= 0: + return [] + terms = _search_terms(needle) async with await get_db() as db: - pattern = f"%{query}%" - - # 1) Chats matching by title - title_q = ( - select( - Chat.id, - Chat.title, - Chat.summary, - Chat.meta, - Chat.updated_at, - Chat.created_at, - literal("title").label("match_type"), - literal("").label("snippet"), + pattern = f"%{needle}%" + term_patterns = [f"%{term}%" for term in terms] + metadata_clauses = [ + Chat.id.ilike(pattern), + Chat.title.ilike(pattern), + Chat.summary.ilike(pattern), + ] + message_clauses = [ChatMessage.content.ilike(pattern)] + for term_pattern in term_patterns: + metadata_clauses.extend( + ( + Chat.id.ilike(term_pattern), + Chat.title.ilike(term_pattern), + Chat.summary.ilike(term_pattern), + ) ) - .where(Chat.user_id == user_id) - .where(Chat.title.ilike(pattern)) + message_clauses.append(ChatMessage.content.ilike(term_pattern)) + + chat_rows = await db.execute( + select(Chat).where(Chat.user_id == user_id).where(or_(*metadata_clauses)) ) + chats = list(chat_rows.scalars().all()) - # 2) Chats matching by message content (exclude title-matched) - message_q = ( - select( - Chat.id, - Chat.title, - Chat.summary, - Chat.meta, - Chat.updated_at, - Chat.created_at, - literal("message").label("match_type"), - ChatMessage.content.label("snippet"), - ) + message_rows = await db.execute( + select(Chat, ChatMessage) .join(ChatMessage, ChatMessage.chat_id == Chat.id) .where(Chat.user_id == user_id) - .where(ChatMessage.content.ilike(pattern)) - .where(~Chat.title.ilike(pattern)) # avoid duplicates - .group_by(Chat.id) # one result per chat + .where(or_(*message_clauses)) + .order_by(Chat.updated_at.desc(), ChatMessage.created_at.desc()) + .limit(max(200, (limit + offset) * 50)) ) - - # Union, order by match_type (title first), then recency - combined = union_all(title_q, message_q).subquery() - result = await db.execute( - select(combined) - .order_by(combined.c.match_type, combined.c.updated_at.desc()) - .limit(limit) - .offset(offset) + message_matches = list(message_rows.all()) + + results: dict[str, dict] = {} + + def allowed(chat: Chat) -> bool: + meta = chat.meta or {} + if not include_subagents and meta.get("subagent"): + return False + if workspace and meta.get("workspace") != workspace: + return False + return True + + def add_candidate( + chat: Chat, + match_type: str, + rank: int, + snippet_source: str | None = None, + matched_message: ChatMessage | None = None, + ) -> None: + if not allowed(chat): + return + payload = { + "id": chat.id, + "title": chat.title, + "summary": chat.summary, + "meta": chat.meta, + "updated_at": chat.updated_at, + "created_at": chat.created_at, + "match_type": match_type, + "snippet": _extract_snippet(snippet_source, needle, terms) + if snippet_source + else None, + "matched_message_id": matched_message.id if matched_message else None, + "matched_role": matched_message.role if matched_message else None, + "_rank": rank, + } + existing = results.get(chat.id) + if not existing or (rank, -chat.updated_at) < ( + existing["_rank"], + -existing["updated_at"], + ): + results[chat.id] = payload + + for chat in chats: + rank, match_type, snippet_source = _rank_chat_match(chat, needle, terms) + add_candidate(chat, match_type, rank, snippet_source) + + for chat, message in message_matches: + phrase_or_terms_rank = 60 if needle.casefold() in message.content.casefold() else 70 + coverage = _term_coverage(message.content, terms) + add_candidate( + chat, + "message", + phrase_or_terms_rank - min(coverage, 5), + message.content, + matched_message=message, ) - rows = result.all() - return [ - { - "id": r.id, - "title": r.title, - "summary": r.summary, - "meta": r.meta, - "updated_at": r.updated_at, - "created_at": r.created_at, - "match_type": r.match_type, - "snippet": _extract_snippet(r.snippet, query) if r.snippet else None, - } - for r in rows - ] - - -def _extract_snippet(content: str, query: str, context_chars: int = 80) -> str | None: + ordered = sorted( + results.values(), key=lambda r: (r["_rank"], -r["updated_at"], r["title"].lower()) + ) + page = ordered[offset : offset + limit] + for row in page: + row.pop("_rank", None) + return page + + +_SEARCH_STOPWORDS = { + "a", + "am", + "an", + "and", + "are", + "do", + "does", + "how", + "i", + "is", + "me", + "my", + "of", + "or", + "the", + "to", + "was", + "what", + "when", +} + + +def _search_terms(query: str) -> list[str]: + """Return useful term fallbacks for non-phrase recall queries.""" + terms: list[str] = [] + for token in re.findall(r"[\w'-]+", query.casefold()): + token = token.strip("-'") + if len(token) < 2 or token in _SEARCH_STOPWORDS: + continue + if token not in terms: + terms.append(token) + return terms[:8] + + +def _term_coverage(content: str | None, terms: list[str]) -> int: + if not content or not terms: + return 0 + folded = content.casefold() + return sum(1 for term in terms if term in folded) + + +def _rank_chat_match(chat: Chat, query: str, terms: list[str]) -> tuple[int, str, str | None]: + """Return rank, match_type, and optional snippet source for chat metadata.""" + q = query.casefold() + values = { + "id": chat.id or "", + "title": chat.title or "", + "summary": chat.summary or "", + } + folded = {key: value.casefold() for key, value in values.items()} + + if folded["id"] == q: + return (0, "id", f"Chat ID: {values['id']}") + if folded["title"] == q: + return (1, "title", None) + if folded["id"].startswith(q): + return (2, "id", f"Chat ID: {values['id']}") + if folded["title"].startswith(q): + return (3, "title", None) + if q in folded["id"]: + return (10, "id", f"Chat ID: {values['id']}") + if q in folded["title"]: + return (11, "title", None) + if q in folded["summary"]: + return (40, "summary", values["summary"]) + + best: tuple[int, str, str | None] | None = None + for field, value in folded.items(): + coverage = _term_coverage(value, terms) + if coverage <= 0: + continue + if field == "id": + candidate = (20 - min(coverage, 5), "id", f"Chat ID: {values['id']}") + elif field == "title": + candidate = (30 - min(coverage, 5), "title", None) + else: + candidate = (50 - min(coverage, 5), "summary", values["summary"]) + if best is None or candidate[0] < best[0]: + best = candidate + return best or (90, "summary", values["summary"]) + + +def _extract_snippet( + content: str | None, query: str, terms: list[str] | None = None, context_chars: int = 96 +) -> str | None: """Extract a short snippet around the first match in message content.""" if not content: return None - lower = content.lower() - idx = lower.find(query.lower()) + query = (query or "").strip() + if not query: + return content[: context_chars * 2] + ("..." if len(content) > context_chars * 2 else "") + + lower = content.casefold() + idx = lower.find(query.casefold()) + if idx == -1: + for token in terms or query.split(): + idx = lower.find(token.casefold()) + if idx != -1: + query = token + break if idx == -1: return content[: context_chars * 2] + ("..." if len(content) > context_chars * 2 else "") start = max(0, idx - context_chars) @@ -228,7 +365,9 @@ class ChatMessage(Base): output = Column(JSON, nullable=True) # Responses API output items usage = Column(JSON, nullable=True) # {input_tokens, output_tokens, ...} meta = Column(JSON, nullable=True) # {files, followups, error, ...} - chat_summary = Column(Text, nullable=True) # Compaction summary (covers all ancestors before this msg) + chat_summary = Column( + Text, nullable=True + ) # Compaction summary (covers all ancestors before this msg) created_at = Column(BigInteger, nullable=False) # โ”€โ”€ Class methods โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ diff --git a/cptr/models/config.py b/cptr/models/config.py index 3e7a2f7..1ae1305 100644 --- a/cptr/models/config.py +++ b/cptr/models/config.py @@ -51,6 +51,10 @@ async def upsert(updates: dict) -> None: async with await get_db() as db: for key, value in updates.items(): existing = await db.get(Config, key) + if value is None: + if existing: + await db.delete(existing) + continue if existing: existing.value = value else: @@ -66,6 +70,4 @@ async def upsert(updates: dict) -> None: except Exception: import logging - logging.getLogger(__name__).warning( - "Failed to sync config to TOML", exc_info=True - ) + logging.getLogger(__name__).warning("Failed to sync config to TOML", exc_info=True) diff --git a/cptr/routers/__init__.py b/cptr/routers/__init__.py index 0f448bf..a64383a 100644 --- a/cptr/routers/__init__.py +++ b/cptr/routers/__init__.py @@ -11,6 +11,8 @@ from cptr.routers.events import router as events_router from cptr.routers.files import router as files_router from cptr.routers.git import router as git_router +from cptr.routers.images import router as images_router +from cptr.routers.memory import router as memory_router from cptr.routers.proxy import router as proxy_router from cptr.routers.search import router as search_router from cptr.routers.skills import router as skills_router @@ -30,6 +32,8 @@ "files_router", "gateway_router", "git_router", + "images_router", + "memory_router", "proxy_router", "search_router", "skills_router", @@ -37,4 +41,3 @@ "terminal_router", "workspace_router", ] - diff --git a/cptr/routers/admin.py b/cptr/routers/admin.py index 6e2311b..0b093a7 100644 --- a/cptr/routers/admin.py +++ b/cptr/routers/admin.py @@ -134,7 +134,12 @@ def _prepare_config_updates(updates: dict) -> dict: """Normalize sensitive config values before persisting them.""" prepared = dict(updates) secret = _get_jwt_secret() - for key in ("audio.stt_api_key", "audio.tts_api_key"): + for key in ( + "audio.stt_api_key", + "audio.tts_api_key", + "images.generation_api_key", + "images.edit_api_key", + ): value = prepared.get(key) if isinstance(value, str) and value and not value.startswith("encrypted:"): prepared[key] = encrypt_key(value, secret) diff --git a/cptr/routers/chat.py b/cptr/routers/chat.py index 26e05d6..1d462d5 100644 --- a/cptr/routers/chat.py +++ b/cptr/routers/chat.py @@ -325,7 +325,7 @@ async def _get_chat_context_usage(chat, model_id: str | None = None) -> dict | N messages, existing_summary = await _load_message_history(chat.id, message_id) workspace = (chat.meta or {}).get("workspace", "") model = model_id or await _infer_chat_model(chat.id) - system = await _load_system_prompt(workspace, model or "") + system = await _load_system_prompt(workspace, model or "", user_id=chat.user_id) if existing_summary: system += f"\n\n[CONVERSATION SUMMARY]\n{existing_summary}" @@ -428,15 +428,23 @@ async def send_message(body: SendMessageRequest, request: Request): # Sync params into chat meta if chat.meta is None: chat.meta = {} - if chat.meta.get("params") != body.params: + if ( + chat.meta.get("params") != body.params + or chat.meta.get("last_model") != body.model_id + ): chat.meta["params"] = body.params + chat.meta["last_model"] = body.model_id await Chat.update_meta(chat.id, chat.meta) else: title = body.content[:50].strip() or "New Chat" chat = await Chat.create( user_id=user_id, title=title, - meta={"workspace": body.workspace, "params": body.params}, + meta={ + "workspace": body.workspace, + "params": body.params, + "last_model": body.model_id, + }, created_at=now_ms(), ) # Ensure .cptr/chats/ dir exists @@ -446,70 +454,73 @@ async def send_message(body: SendMessageRequest, request: Request): # Auto-add .cptr to .gitignore if this is a git repo await asyncio.to_thread(ensure_cptr_gitignored, body.workspace) - # Check if the chat has an in-progress assistant message. - # If so, queue this message instead of starting a new task. - if body.chat_id and await _chat_has_active_generation(chat.id): - user_msg = await ChatMessage.create( - chat_id=chat.id, - role="user", - content=body.content, - parent_id=body.parent_id, - meta={"queued": True}, - created_at=now_ms(), - ) - # Double-check: if generation finished during our create, - # process queue now to close the race window. - if not await _chat_has_active_generation(chat.id): - from cptr.utils.chat_task import _process_queue - - workspace = (chat.meta or {}).get("workspace", body.workspace) - await _process_queue(chat.id, user_id, workspace) - return {"chat_id": chat.id, "message_id": user_msg.id, "queued": True} - # Resolve connection for model connection, bare_model = await _resolve_connection(body.model_id, request.app.state) # Detect regeneration by checking parent message role parent_msg = await ChatMessage.get_by_id(body.parent_id) if body.parent_id else None - if parent_msg and parent_msg.role == "user": - # Regeneration: create assistant as sibling of existing response - assistant_parent = body.parent_id - else: - # Normal send: create user message first - user_meta = {"files": body.files} if body.files else None - user_msg = await ChatMessage.create( - chat_id=chat.id, - role="user", - content=body.content, - parent_id=body.parent_id, - meta=user_meta, - created_at=now_ms(), - ) - assistant_parent = user_msg.id - - # Create empty assistant message - assistant_msg = await ChatMessage.create( - chat_id=chat.id, - role="assistant", - content="", - parent_id=assistant_parent, - model=body.model_id, - done=False, - created_at=now_ms(), + from cptr.utils.chat_task import ( + get_pending_input_lock, + process_pending_chat_inputs, + start_task, ) - # Update chat pointer to new leaf - await Chat.update_current_message(chat.id, assistant_msg.id, now_ms()) + queued_msg = None + user_msg = None + assistant_msg = None + async with get_pending_input_lock(chat.id): + if body.chat_id and await _chat_has_active_generation(chat.id): + queued_msg = await ChatMessage.create( + chat_id=chat.id, + role="user", + content=body.content, + parent_id=body.parent_id, + meta={"queued": True}, + created_at=now_ms(), + ) + else: + if parent_msg and parent_msg.role == "user": + # Regeneration: create assistant as sibling of existing response. + assistant_parent = body.parent_id + else: + user_meta = {"files": body.files} if body.files else None + user_msg = await ChatMessage.create( + chat_id=chat.id, + role="user", + content=body.content, + parent_id=body.parent_id, + meta=user_meta, + created_at=now_ms(), + ) + assistant_parent = user_msg.id + + assistant_msg = await ChatMessage.create( + chat_id=chat.id, + role="assistant", + content="", + parent_id=assistant_parent, + model=body.model_id, + done=False, + created_at=now_ms(), + ) + + # Update chat pointer to new leaf while still holding the input lock. + await Chat.update_current_message(chat.id, assistant_msg.id, now_ms()) + + if queued_msg: + workspace = (chat.meta or {}).get("workspace", body.workspace) + await process_pending_chat_inputs(chat.id, user_id, workspace) + return {"chat_id": chat.id, "message_id": queued_msg.id, "queued": True} + + if not assistant_msg: + raise HTTPException(500, "failed to create assistant message") # Export JSON immediately so list_chats discovers it right away from cptr.utils.chat_export import export_chat_to_file await export_chat_to_file(chat.id) - # Start background task (export_chat_to_file also runs after task completes) - from cptr.utils.chat_task import start_task - start_task( message_id=assistant_msg.id, chat_id=chat.id, @@ -649,7 +660,7 @@ async def approve_tool(chat_id: str, message_id: str, body: ApproveRequest, requ ) # Emit artifact card if the tool produced an artifact - from cptr.utils.chat_task import build_artifact_item + from cptr.utils.chat_task import build_artifact_item, build_image_item artifact_item = build_artifact_item(call["name"], call.get("arguments", {}), result) if artifact_item: @@ -661,6 +672,16 @@ async def approve_tool(chat_id: str, message_id: str, body: ApproveRequest, requ {"chat_id": chat_id, "message_id": message_id, "output": artifact_item}, ) + image_item = build_image_item(call["name"], result) + if image_item: + output.append(image_item) + from cptr.socket.main import emit_to_user + + await emit_to_user( + user_id, + {"chat_id": chat_id, "message_id": message_id, "output": image_item}, + ) + await ChatMessage.update(message_id, output=output, done=False) # Resolve connection and continue @@ -680,11 +701,11 @@ async def approve_tool(chat_id: str, message_id: str, body: ApproveRequest, requ else: call["status"] = "rejected" await ChatMessage.update(message_id, output=output, done=True) - # Process queued messages since this chat is now idle - from cptr.utils.chat_task import _process_queue + # Process pending inputs since this chat is now idle. + from cptr.utils.chat_task import process_pending_chat_inputs workspace = chat.meta.get("workspace", "") if chat.meta else "" - await _process_queue(chat_id, user_id, workspace) + await process_pending_chat_inputs(chat_id, user_id, workspace) return {"ok": True} @@ -715,11 +736,11 @@ async def cancel_task_endpoint(chat_id: str, message_id: str, request: Request): item["status"] = "rejected" await ChatMessage.update(message_id, output=output, done=True) - # Process queued messages since this chat may now be idle - from cptr.utils.chat_task import _process_queue + # Process pending inputs since this chat may now be idle. + from cptr.utils.chat_task import process_pending_chat_inputs workspace = chat.meta.get("workspace", "") if chat.meta else "" - await _process_queue(chat_id, user_id, workspace) + await process_pending_chat_inputs(chat_id, user_id, workspace) return {"ok": True} @@ -824,53 +845,57 @@ async def queue_send_now(chat_id: str, message_id: str, request: Request): if not (msg.meta and msg.meta.get("queued")): raise HTTPException(400, "message is not queued") - # Cancel any active task - all_msgs = await ChatMessage.get_all_by_chat(chat_id) - for m in all_msgs: - if m.role == "assistant" and not m.done: - from cptr.utils.chat_task import cancel_task + from cptr.utils.chat_task import cancel_task, get_pending_input_lock, start_task - await cancel_task(m.id) - await ChatMessage.update(m.id, done=True) + async with get_pending_input_lock(chat_id): + # Cancel any active task + all_msgs = await ChatMessage.get_all_by_chat(chat_id) + for m in all_msgs: + if m.role == "assistant" and not m.done: + await cancel_task(m.id) + await ChatMessage.update(m.id, done=True) - # Delete all OTHER queued messages, keep this one - for m in all_msgs: - if m.role == "user" and m.meta and m.meta.get("queued") and m.id != message_id: + # Delete all OTHER queued user prompts, keep this one. + for m in all_msgs: + if not (m.role == "user" and m.meta and m.meta.get("queued") and m.id != message_id): + continue await ChatMessage.delete(m.id) - # Clear queued flag on this message - meta = dict(msg.meta or {}) - meta.pop("queued", None) - await ChatMessage.update(message_id, meta=meta or None) - - # Find parent: latest done assistant message, or the queued message's parent - done_assistants = [m for m in all_msgs if m.role == "assistant" and m.done] - parent_id = done_assistants[-1].id if done_assistants else msg.parent_id - - # Re-parent the user message to the correct leaf - if msg.parent_id != parent_id: - await ChatMessage.update(message_id, parent_id=parent_id) - - # Resolve connection and start task - model_id = msg.model or (all_msgs[-1].model if all_msgs else "") - if not model_id: - # Fall back to last used model from chat meta - model_id = (chat.meta or {}).get("last_model", "") - connection, bare_model = await _resolve_connection(model_id, request.app.state) - workspace = (chat.meta or {}).get("workspace", "") + # Clear queued flag on this message + meta = dict(msg.meta or {}) + meta.pop("queued", None) + await ChatMessage.update(message_id, meta=meta or None) - assistant_msg = await ChatMessage.create( - chat_id=chat_id, - role="assistant", - content="", - parent_id=message_id, - model=model_id, - done=False, - created_at=now_ms(), - ) - await Chat.update_current_message(chat_id, assistant_msg.id, now_ms()) + # Find parent: latest done assistant message, or the queued message's parent + done_assistants = [m for m in all_msgs if m.role == "assistant" and m.done] + parent_id = done_assistants[-1].id if done_assistants else msg.parent_id - from cptr.utils.chat_task import start_task + # Re-parent the user message to the correct leaf + if msg.parent_id != parent_id: + await ChatMessage.update(message_id, parent_id=parent_id) + + # Resolve connection and start task + model_id = msg.model or (all_msgs[-1].model if all_msgs else "") + if not model_id: + # Fall back to last used model from chat meta + model_id = (chat.meta or {}).get("last_model", "") + connection, bare_model = await _resolve_connection(model_id, request.app.state) + workspace = (chat.meta or {}).get("workspace", "") + meta_for_model = dict(chat.meta or {}) + if meta_for_model.get("last_model") != model_id: + meta_for_model["last_model"] = model_id + await Chat.update_meta(chat.id, meta_for_model, now_ms()) + + assistant_msg = await ChatMessage.create( + chat_id=chat_id, + role="assistant", + content="", + parent_id=message_id, + model=model_id, + done=False, + created_at=now_ms(), + ) + await Chat.update_current_message(chat_id, assistant_msg.id, now_ms()) start_task( message_id=assistant_msg.id, diff --git a/cptr/routers/images.py b/cptr/routers/images.py new file mode 100644 index 0000000..8b81eb4 --- /dev/null +++ b/cptr/routers/images.py @@ -0,0 +1,95 @@ +"""OpenAI-compatible image generation and editing routes.""" + +from __future__ import annotations + +import logging + +import httpx +from fastapi import APIRouter, HTTPException, Request +from pydantic import BaseModel, Field + +from cptr.utils.config import check_access +from cptr.utils.images import MAX_IMAGES_PER_REQUEST, edit_images, generate_images + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/images", tags=["images"]) + +COOKIE_NAME = "cptr_session" + + +class GenerateImageRequest(BaseModel): + prompt: str + workspace: str | None = None + model: str | None = None + size: str | None = None + n: int | None = Field(default=1, ge=1, le=MAX_IMAGES_PER_REQUEST) + + +class EditImageRequest(BaseModel): + prompt: str + image_ids: list[str] + workspace: str | None = None + model: str | None = None + size: str | None = None + n: int | None = Field(default=1, ge=1, le=MAX_IMAGES_PER_REQUEST) + background: str | None = None + + +def _get_user(request: Request) -> str: + token = request.cookies.get(COOKIE_NAME) + client_host = request.client.host if request.client else "127.0.0.1" + auth = check_access(client_host=client_host, jwt_token=token) + if not auth or not auth.user_id: + raise HTTPException(401, "authentication required") + return auth.user_id + + +def _raise_image_error(exc: Exception) -> None: + if isinstance(exc, PermissionError): + raise HTTPException(403, str(exc)) + if isinstance(exc, ValueError): + raise HTTPException(400, str(exc)) + if isinstance(exc, httpx.HTTPStatusError): + detail = f"Image API error: {exc.response.status_code}" + logger.warning("[images] %s: %s", detail, exc.response.text[:500]) + raise HTTPException(502, detail) + if isinstance(exc, httpx.ConnectError): + raise HTTPException(502, "Could not connect to image API") + raise HTTPException(500, "Image request failed") + + +@router.post("/generations") +async def create_image_generation(body: GenerateImageRequest, request: Request): + user_id = _get_user(request) + try: + images = await generate_images( + body.prompt, + user_id=user_id, + model=body.model, + size=body.size, + n=body.n, + workspace=body.workspace, + ) + return {"images": [image.as_dict() for image in images]} + except Exception as exc: + _raise_image_error(exc) + + +@router.post("/edits") +async def create_image_edit(body: EditImageRequest, request: Request): + user_id = _get_user(request) + try: + images = await edit_images( + body.prompt, + body.image_ids, + user_id=user_id, + model=body.model, + size=body.size, + n=body.n, + background=body.background, + workspace=body.workspace, + ) + return {"images": [image.as_dict() for image in images]} + except Exception as exc: + _raise_image_error(exc) diff --git a/cptr/routers/memory.py b/cptr/routers/memory.py new file mode 100644 index 0000000..9fa4374 --- /dev/null +++ b/cptr/routers/memory.py @@ -0,0 +1,71 @@ +"""Managed memory API.""" + +from __future__ import annotations + +from typing import Any, Literal + +from fastapi import APIRouter, HTTPException, Query, Request +from pydantic import BaseModel + +from cptr.utils.config import AuthResult, check_access +from cptr.utils.memory import ( + remember, + read_memory_state, + save_memory_settings, +) + +router = APIRouter(prefix="/api/memory", tags=["memory"]) +COOKIE_NAME = "cptr_session" + + +def _get_auth(request: Request) -> AuthResult: + token = request.cookies.get(COOKIE_NAME) + client_host = request.client.host if request.client else "127.0.0.1" + auth = check_access(client_host=client_host, jwt_token=token) + if not auth or not auth.user_id: + raise HTTPException(401, "authentication required") + return auth + + +def _get_user(request: Request) -> str: + return _get_auth(request).user_id or "" + + +def _require_admin(request: Request) -> AuthResult: + auth = _get_auth(request) + if auth.role != "admin": + raise HTTPException(403, "admin required") + return auth + + +class MemorySettingsRequest(BaseModel): + settings: dict[str, Any] + + +class MemoryUpdateRequest(BaseModel): + scope: Literal["user", "workspace"] + operations: list[dict[str, Any]] + workspace: str = "" + + +@router.get("") +async def get_memory(request: Request, workspace: str = Query("")): + user_id = _get_user(request) + return await read_memory_state(user_id, workspace) + + +@router.put("/config") +async def put_memory_settings(body: MemorySettingsRequest, request: Request): + _require_admin(request) + return {"settings": await save_memory_settings(body.settings)} + + +@router.post("/update") +async def update_memory(body: MemoryUpdateRequest, request: Request): + user_id = _get_user(request) + return await remember( + user_id=user_id, + workspace=body.workspace, + scope=body.scope, + operations=body.operations, + ) diff --git a/cptr/routers/search.py b/cptr/routers/search.py index 5d39b8b..5ec347c 100644 --- a/cptr/routers/search.py +++ b/cptr/routers/search.py @@ -9,7 +9,7 @@ from fastapi import APIRouter, HTTPException, Query, Request from cptr.models import Chat -from cptr.routers.workspace import walk_and_rank_files, SearchResult +from cptr.routers.workspace import walk_and_rank_files router = APIRouter(prefix="/api/search", tags=["search"]) @@ -47,7 +47,7 @@ async def unified_search( ws_paths = [workspace] if workspace else workspaces # Run chat search and file search concurrently - chat_task = Chat.search_by_text(user_id, q, chat_limit) + chat_task = Chat.search_by_text(user_id, q, chat_limit, workspace=workspace) async def _search_files() -> list[dict]: if not ws_paths: @@ -82,12 +82,6 @@ async def _search_files() -> list[dict]: chat_results, file_results = await asyncio.gather(chat_task, _search_files()) - # Filter chats by workspace if scoped - if workspace: - chat_results = [ - c for c in chat_results if (c.get("meta") or {}).get("workspace") == workspace - ] - # Build chat response (strip meta, add workspace) chats_out = [] for c in chat_results: @@ -101,6 +95,8 @@ async def _search_files() -> list[dict]: "created_at": c["created_at"], "match_type": c.get("match_type", "title"), "snippet": c.get("snippet"), + "matched_message_id": c.get("matched_message_id"), + "matched_role": c.get("matched_role"), } ) @@ -120,12 +116,10 @@ async def recent_chats( async with await get_db() as db: result = await db.execute( - select(Chat) - .where(Chat.user_id == user_id) - .order_by(Chat.updated_at.desc()) - .limit(limit) + select(Chat).where(Chat.user_id == user_id).order_by(Chat.updated_at.desc()) ) rows = list(result.scalars().all()) + rows = [c for c in rows if not (c.meta or {}).get("subagent")][:limit] return { "chats": [ diff --git a/cptr/utils/async_subagents.py b/cptr/utils/async_subagents.py new file mode 100644 index 0000000..8fa2dd9 --- /dev/null +++ b/cptr/utils/async_subagents.py @@ -0,0 +1,325 @@ +"""Session-scoped async subagent registry. + +Background subagents are deliberately not durable jobs. They run in the +current server process, keep a small in-memory status record, and inject their +final summary back into the parent chat when they finish. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +import uuid +from collections.abc import Awaitable, Callable +from typing import Any + +from cptr.models import Chat, ChatMessage +from cptr.socket.main import emit_to_user +from cptr.utils.chat_export import export_chat_to_file +from cptr.utils.config import now_ms + +logger = logging.getLogger(__name__) + +_records: dict[str, dict[str, Any]] = {} +_lock = asyncio.Lock() +_MAX_RETAINED_COMPLETED = 50 +_completion_injector_override: Callable[[dict[str, Any]], Awaitable[None]] | None = None + + +def _new_delegation_id() -> str: + return f"deleg_{uuid.uuid4().hex[:8]}" + + +async def active_count() -> int: + """Return the number of running background subagents.""" + async with _lock: + return sum(1 for r in _records.values() if r.get("status") in {"starting", "running"}) + + +async def reserve_async_subagent(max_async: int, **record: Any) -> dict[str, Any]: + """Reserve capacity for a background subagent before creating its chat.""" + max_async = max(1, int(max_async or 3)) + async with _lock: + running = sum( + 1 for r in _records.values() if r.get("status") in {"starting", "running"} + ) + if running >= max_async: + return { + "status": "rejected", + "error": ( + f"Async subagent capacity reached ({max_async} running). " + "Wait for one to finish or increase subagents.max_async." + ), + } + + delegation_id = _new_delegation_id() + now = time.time() + _records[delegation_id] = { + **record, + "delegation_id": delegation_id, + "status": "starting", + "dispatched_at": now, + "completed_at": None, + "task": None, + "error": None, + "summary": None, + } + return {"status": "reserved", "delegation_id": delegation_id} + + +async def attach_subagent_chat( + delegation_id: str, + *, + subagent_chat_id: str, + subagent_message_id: str, +) -> None: + async with _lock: + record = _records.get(delegation_id) + if record: + record["subagent_chat_id"] = subagent_chat_id + record["subagent_message_id"] = subagent_message_id + record["status"] = "running" + + +async def start_async_subagent( + delegation_id: str, + runner: Callable[[], Awaitable[str]], +) -> None: + """Start a reserved subagent runner and return immediately.""" + + async def _run() -> None: + status = "completed" + summary = "" + error = None + try: + summary = await runner() + except asyncio.CancelledError: + status = "interrupted" + error = "cancelled" + raise + except Exception as exc: # noqa: BLE001 - completion must still be recorded + logger.exception("Async subagent %s failed", delegation_id) + status = "error" + error = f"{type(exc).__name__}: {exc}" + finally: + await _finalize(delegation_id, status=status, summary=summary, error=error) + + task = asyncio.create_task(_run()) + async with _lock: + record = _records.get(delegation_id) + if record: + record["task"] = task + + +async def fail_reserved_subagent(delegation_id: str, error: str) -> None: + await _finalize(delegation_id, status="error", summary="", error=error) + + +async def cancel_all_async_subagents(reason: str = "shutdown") -> int: + """Cancel all running background subagents.""" + async with _lock: + tasks = [ + r.get("task") + for r in _records.values() + if r.get("status") in {"starting", "running"} and r.get("task") + ] + count = 0 + for task in tasks: + if task and not task.done(): + task.cancel() + count += 1 + if count: + logger.info("Cancelled %d async subagent(s) (%s)", count, reason) + return count + + +def list_async_subagents() -> list[dict[str, Any]]: + """Return a serializable snapshot of records.""" + snapshot = [] + for record in _records.values(): + snapshot.append({k: v for k, v in record.items() if k != "task"}) + return snapshot + + +async def _finalize( + delegation_id: str, + *, + status: str, + summary: str, + error: str | None, +) -> None: + async with _lock: + record = _records.get(delegation_id) + if not record: + return + record["status"] = status + record["summary"] = summary + record["error"] = error + record["completed_at"] = time.time() + snapshot = {k: v for k, v in record.items() if k != "task"} + _prune_completed_locked() + + injector = _completion_injector_override or _inject_completion + try: + await injector(snapshot) + except Exception: + logger.exception("Failed to inject async subagent completion %s", delegation_id) + + +def _prune_completed_locked() -> None: + completed = [ + (rid, r) + for rid, r in _records.items() + if r.get("status") not in {"starting", "running"} + ] + if len(completed) <= _MAX_RETAINED_COMPLETED: + return + completed.sort(key=lambda kv: kv[1].get("completed_at") or kv[1].get("dispatched_at") or 0) + for rid, _ in completed[: len(completed) - _MAX_RETAINED_COMPLETED]: + _records.pop(rid, None) + + +async def _inject_completion(record: dict[str, Any]) -> None: + parent_chat_id = record.get("parent_chat_id") + user_id = record.get("user_id") + if not parent_chat_id or not user_id: + return + + chat = await Chat.get_by_id(parent_chat_id) + if not chat: + return + + model_id = _parent_model_id(chat, record) + content = _format_completion(record) + + from cptr.utils.chat_task import get_pending_input_lock, start_task + + assistant_msg = None + async with get_pending_input_lock(parent_chat_id): + all_msgs = await ChatMessage.get_all_by_chat(parent_chat_id) + active = any(m.role == "assistant" and not m.done for m in all_msgs) + done_assistants = [m for m in all_msgs if m.role == "assistant" and m.done] + parent_id = done_assistants[-1].id if done_assistants else record.get("parent_message_id") + + meta = { + "async_subagent_result": True, + "delegation_id": record.get("delegation_id"), + "subagent_chat_id": record.get("subagent_chat_id"), + } + if active: + meta["async_subagent_pending"] = True + + user_msg = await ChatMessage.create( + chat_id=parent_chat_id, + role="user", + content=content, + parent_id=parent_id, + model=model_id, + meta=meta, + created_at=now_ms(), + ) + + if not active: + assistant_msg = await ChatMessage.create( + chat_id=parent_chat_id, + role="assistant", + content="", + parent_id=user_msg.id, + model=model_id, + done=False, + created_at=now_ms(), + ) + await Chat.update_current_message(parent_chat_id, assistant_msg.id, now_ms()) + + await export_chat_to_file(parent_chat_id) + if not assistant_msg: + await emit_to_user( + user_id, + { + "chat_id": parent_chat_id, + "message_id": user_msg.id, + "async_subagent_pending": True, + }, + ) + return + + await emit_to_user( + user_id, + { + "chat_id": parent_chat_id, + "message_id": assistant_msg.id, + "pending_inputs_processed": True, + }, + ) + + start_task( + message_id=assistant_msg.id, + chat_id=parent_chat_id, + user_id=user_id, + connection=record["connection"], + workspace=record["workspace"], + model=record["model"], + ) + + +def _parent_model_id(chat: Chat, record: dict[str, Any]) -> str: + meta = chat.meta or {} + return str(meta.get("last_model") or record.get("model_id") or record.get("model") or "") + + +def _format_completion(record: dict[str, Any]) -> str: + status = record.get("status") or "completed" + summary = record.get("summary") or "" + error = record.get("error") + dispatched_at = record.get("dispatched_at") + completed_at = record.get("completed_at") or time.time() + duration = "" + if isinstance(dispatched_at, (int, float)): + duration = f"{completed_at - dispatched_at:.1f}s" + + lines = [ + f"[ASYNC SUBAGENT COMPLETE - {record.get('delegation_id', 'unknown')}]", + ( + "A background subagent you dispatched earlier has finished. " + "The original task source is included so you can decide whether " + "to use the result or continue without it." + ), + "", + f"Original task: {record.get('task', '')}", + ] + if record.get("context"): + lines.append(f"Context provided: {record['context']}") + if record.get("subagent_chat_id"): + lines.append(f"Subagent chat: {record['subagent_chat_id']}") + if duration: + lines.append(f"Status: {status} Duration: {duration}") + else: + lines.append(f"Status: {status}") + lines.append("--- RESULT ---") + if status == "completed": + lines.append(summary or "Subagent completed without a final summary.") + elif status == "interrupted": + lines.append("The subagent was interrupted before completing.") + if summary: + lines.extend(["Partial output:", summary]) + else: + detail = f" {error}" if error else "" + lines.append(f"The subagent did not complete successfully.{detail}") + if summary: + lines.extend(["Partial output:", summary]) + return "\n".join(lines) + + +def _set_completion_injector_for_tests( + injector: Callable[[dict[str, Any]], Awaitable[None]] | None, +) -> None: + global _completion_injector_override + _completion_injector_override = injector + + +async def _reset_for_tests() -> None: + await cancel_all_async_subagents(reason="test reset") + async with _lock: + _records.clear() + _set_completion_injector_for_tests(None) diff --git a/cptr/utils/chat_task.py b/cptr/utils/chat_task.py index 7c25a59..eb07a6d 100644 --- a/cptr/utils/chat_task.py +++ b/cptr/utils/chat_task.py @@ -43,7 +43,11 @@ _tasks: dict[str, asyncio.Task] = {} # message_id โ†’ asyncio.Task _task_state: dict[str, dict] = {} # message_id โ†’ {content, output} _task_chat: dict[str, str] = {} # message_id โ†’ chat_id -_queue_locks: dict[str, asyncio.Lock] = {} # chat_id โ†’ Lock +_pending_input_locks: dict[str, asyncio.Lock] = {} # chat_id โ†’ Lock + + +def get_pending_input_lock(chat_id: str) -> asyncio.Lock: + return _pending_input_locks.setdefault(chat_id, asyncio.Lock()) def start_task( @@ -98,37 +102,89 @@ def get_active_chat_ids() -> set[str]: return {cid for mid, cid in _task_chat.items() if mid in _tasks and not _tasks[mid].done()} -# โ”€โ”€ Queue processing โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ +# โ”€โ”€ Pending input processing โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + + +def _merge_async_subagent_result_meta(messages: list[ChatMessage]) -> dict | None: + if not messages: + return None + + if all((m.meta or {}).get("async_subagent_result") for m in messages): + delegation_ids = [ + m.meta.get("delegation_id") + for m in messages + if m.meta and m.meta.get("delegation_id") + ] + subagent_chat_ids = [ + m.meta.get("subagent_chat_id") + for m in messages + if m.meta and m.meta.get("subagent_chat_id") + ] + meta = {"async_subagent_result": True} + if len(delegation_ids) == 1: + meta["delegation_id"] = delegation_ids[0] + elif delegation_ids: + meta["delegation_ids"] = delegation_ids + if len(subagent_chat_ids) == 1: + meta["subagent_chat_id"] = subagent_chat_ids[0] + elif subagent_chat_ids: + meta["subagent_chat_ids"] = subagent_chat_ids + return meta + + return None + + +def _is_pending_internal_subagent_result(message: ChatMessage) -> bool: + return bool((message.meta or {}).get("async_subagent_pending")) + + +def _is_pending_chat_input(message: ChatMessage) -> bool: + meta = message.meta or {} + return bool(meta.get("queued") or meta.get("async_subagent_pending")) + +def _first_pending_input_batch(messages: list[ChatMessage]) -> list[ChatMessage]: + if not messages: + return [] + + first_is_internal_subagent_result = _is_pending_internal_subagent_result(messages[0]) + batch = [] + for message in messages: + if _is_pending_internal_subagent_result(message) != first_is_internal_subagent_result: + break + batch.append(message) + return batch -async def _process_queue(chat_id: str, user_id: str, workspace: str): - """Check for queued user messages and start the next task. + +async def process_pending_chat_inputs(chat_id: str, user_id: str, workspace: str): + """Start the next task from user-queued prompts or internal subagent results. Uses a per-chat lock to prevent concurrent processing from both the task's finally block and the API double-check. """ - lock = _queue_locks.setdefault(chat_id, asyncio.Lock()) + lock = get_pending_input_lock(chat_id) async with lock: all_msgs = await ChatMessage.get_all_by_chat(chat_id) - # Don't process queue if there's still an active generation + # Wait until the parent assistant turn is idle. if any(m.role == "assistant" and not m.done for m in all_msgs): return - # Find queued messages (ordered by created_at) - queued = [m for m in all_msgs if m.role == "user" and m.meta and m.meta.get("queued")] - if not queued: + pending_inputs = [ + m for m in all_msgs if m.role == "user" and _is_pending_chat_input(m) + ] + if not pending_inputs: return + input_batch = _first_pending_input_batch(pending_inputs) - # Combine all queued prompts into one user message - combined_content = "\n\n".join(m.content for m in queued if m.content) + combined_content = "\n\n".join(m.content for m in input_batch if m.content) + combined_meta = _merge_async_subagent_result_meta(input_batch) # Find the current leaf (latest done assistant message) done_assistants = [m for m in all_msgs if m.role == "assistant" and m.done] - parent_id = done_assistants[-1].id if done_assistants else queued[0].parent_id + parent_id = done_assistants[-1].id if done_assistants else input_batch[0].parent_id - # Delete individual queued messages, create one combined message - for m in queued: + for m in input_batch: await ChatMessage.delete(m.id) combined_msg = await ChatMessage.create( @@ -136,6 +192,7 @@ async def _process_queue(chat_id: str, user_id: str, workspace: str): role="user", content=combined_content, parent_id=parent_id, + meta=combined_meta, created_at=now_ms(), ) @@ -149,7 +206,10 @@ async def _process_queue(chat_id: str, user_id: str, workspace: str): last_asst = done_assistants[-1] if done_assistants else None model_id = (last_asst.model if last_asst else "") or "" if not model_id: - logger.error("[queue] No model found for chat %s, cannot process queue", chat_id) + logger.error( + "[chat-input] No model found for chat %s, cannot process pending input", + chat_id, + ) return # Resolve connection @@ -158,7 +218,7 @@ async def _process_queue(chat_id: str, user_id: str, workspace: str): connection, bare_model = await _resolve_connection(model_id) except Exception: - logger.exception("[queue] Failed to resolve connection for model %s", model_id) + logger.exception("[chat-input] Failed to resolve connection for model %s", model_id) return # Create assistant placeholder @@ -173,10 +233,14 @@ async def _process_queue(chat_id: str, user_id: str, workspace: str): ) await Chat.update_current_message(chat_id, assistant_msg.id, now_ms()) - # Notify frontend that queue was processed (new messages appeared) + # Notify frontend that pending inputs became transcript messages. await emit_to_user( user_id, - {"chat_id": chat_id, "message_id": assistant_msg.id, "queue_processed": True}, + { + "chat_id": chat_id, + "message_id": assistant_msg.id, + "pending_inputs_processed": True, + }, ) # Start new task @@ -189,19 +253,19 @@ async def _process_queue(chat_id: str, user_id: str, workspace: str): model=bare_model, ) logger.info( - "[queue] Processed %d queued message(s) for chat %s", - len(queued), + "[chat-input] Processed %d pending input message(s) for chat %s", + len(input_batch), chat_id[:8], ) async def reconcile_chat_state(): - """Recover from server crash: fix stuck messages, process orphaned queues. + """Recover from server crash: fix stuck messages and resume pending inputs. Called once on startup when ENABLE_CHAT_RECONCILE_ON_STARTUP=true (default). Finds: 1. Assistant messages with done=False that have no running task โ†’ mark done - 2. Chats with queued user messages โ†’ process them + 2. Chats with pending user prompts or subagent results โ†’ process them """ from sqlalchemy import select, and_ from cptr.utils.db import get_db @@ -226,15 +290,15 @@ async def reconcile_chat_state(): await ChatMessage.update(msg.id, done=True, meta=meta) healed_chats.add(msg.chat_id) - # Process orphaned queues for healed chats + # Resume pending inputs for healed chats. for cid in healed_chats: chat = await Chat.get_by_id(cid) if chat: workspace = (chat.meta or {}).get("workspace", "") try: - await _process_queue(cid, chat.user_id, workspace) + await process_pending_chat_inputs(cid, chat.user_id, workspace) except Exception: - logger.exception("[reconcile] Failed to process queue for chat %s", cid) + logger.exception("[reconcile] Failed to process pending inputs for chat %s", cid) if healed_chats: logger.info("[reconcile] Recovered %d chat(s) on startup", len(healed_chats)) @@ -492,6 +556,9 @@ async def _load_message_history(chat_id: str, message_id: str) -> tuple[list[dic # Filter calls to only those with matching outputs turn_output_ids = {o["call_id"] for o in turn["outputs"]} matched_calls = [tc for tc in turn["calls"] if tc["id"] in turn_output_ids] + call_names = { + tc["id"]: tc.get("function", {}).get("name", "") for tc in turn["calls"] + } if matched_calls: if ti == 0: # First turn: attach to the existing entry @@ -514,7 +581,10 @@ async def _load_message_history(chat_id: str, message_id: str) -> tuple[list[dic entry = { "role": "tool", "tool_call_id": out["call_id"], - "content": out.get("output", ""), + "content": _tool_result_for_model( + call_names.get(out["call_id"], ""), + out.get("output", ""), + ), } result.append(entry) @@ -602,6 +672,39 @@ def _parse_image_data_uri(result: str) -> tuple[str, str] | None: return None +def _tool_result_for_model(tool_name: str, result: str) -> str: + """Return the tool result text to send back to the model.""" + if tool_name != "image_generate": + return result + + try: + meta = json.loads(result) + except (json.JSONDecodeError, TypeError): + return result + + images = meta.get("images") + if not isinstance(images, list): + return result + + return json.dumps( + { + "status": meta.get("status", "success"), + "displayed_to_user": True, + "note": "The generated image is already displayed in the chat. Do not include markdown image links for it.", + "images": [ + { + "id": image.get("id"), + "path": image.get("path"), + "name": image.get("name"), + "mime_type": image.get("mime_type"), + } + for image in images + if isinstance(image, dict) + ], + } + ) + + def _append_tool_to_messages( messages: list[dict], event: dict, @@ -610,6 +713,8 @@ def _append_tool_to_messages( reasoning_items: list[dict] | None = None, ): """Append a tool call + result to the message history for the next API call.""" + result = _tool_result_for_model(event["name"], result) + # Check for image result before truncation (data URI is large but needed) image = _parse_image_data_uri(result) @@ -710,6 +815,8 @@ def _append_batch_to_messages( messages.append(assistant_msg) for event, result in call_results: + result = _tool_result_for_model(event["name"], result) + # Guard against oversized tool outputs image = _parse_image_data_uri(result) if not image: @@ -857,6 +964,30 @@ def build_artifact_item(tool_name: str, arguments: dict, result: str) -> dict | } +def build_image_item(tool_name: str, result: str) -> dict | None: + """Build a renderable image output item for image-generation tools.""" + if tool_name != "image_generate": + return None + + try: + meta = json.loads(result) + except (json.JSONDecodeError, TypeError): + return None + + images = meta.get("images") + if not isinstance(images, list) or not images: + return None + + return { + "type": "image", + "images": [ + image + for image in images + if isinstance(image, dict) and isinstance(image.get("url"), str) + ], + } + + def _default_base_url(provider: str) -> str: return { "anthropic": "https://api.anthropic.com/v1", @@ -978,7 +1109,7 @@ async def _save_message(save_reason: str, **kwargs) -> bool: chat_obj = await Chat.get_by_id(chat_id) chat_params = (chat_obj.meta or {}).get("params", {}) if chat_obj else {} - system = await _load_system_prompt(workspace, model) + system = await _load_system_prompt(workspace, model, user_id=user_id) messages, loaded_summary = await _load_message_history(chat_id, message_id) if loaded_summary: system += f"\n\n[CONVERSATION SUMMARY]\n{loaded_summary}" @@ -993,7 +1124,7 @@ async def _save_message(save_reason: str, **kwargs) -> bool: # Strip delegate_task from sub-agent chats (depth limit = 1) if chat_obj and (chat_obj.meta or {}).get("subagent"): - tools = [t for t in tools if t["name"] != "delegate_task"] + tools = [t for t in tools if t["name"] not in {"delegate_task", "update_memory"}] # Parse $skill-name mentions from the user message to auto-activate skills attached_skill_ids: list[str] = [] @@ -1026,6 +1157,7 @@ async def _save_message(save_reason: str, **kwargs) -> bool: plan_mode = chat_params.get("plan_mode", False) if plan_mode: tools = [t for t in tools if ALL_TOOLS.get(t["name"], {}).get("auto")] + tools = [t for t in tools if t["name"] not in {"delegate_task", "update_memory"}] # Inject create_artifact (only available in plan mode) tools.append(_fn_to_schema("create_artifact", create_artifact)) messages.append({"role": "user", "content": PLAN_MODE_PROMPT}) @@ -1082,7 +1214,7 @@ async def _save_message(save_reason: str, **kwargs) -> bool: loaded_summary = summary # Append summary to system prompt (works for all providers) - system = await _load_system_prompt(workspace, model) + system = await _load_system_prompt(workspace, model, user_id=user_id) system += f"\n\n[CONVERSATION SUMMARY]\n{summary}" # Re-inject attached skills after compaction (protect from pruning) if attached_skill_ids: @@ -1265,7 +1397,12 @@ async def _save_message(save_reason: str, **kwargs) -> bool: "workspace": workspace, "user_id": user_id, "model_id": model, + "full_model_id": ( + (chat_obj.meta or {}).get("last_model") if chat_obj else None + ) + or model, "chat_id": chat_id, + "message_id": message_id, "connection": connection, } @@ -1368,6 +1505,12 @@ async def _save_message(save_reason: str, **kwargs) -> bool: await emit(output=artifact_item) _sync_state() + image_item = build_image_item(tc["name"], result) + if image_item: + output_items.append(image_item) + await emit(output=image_item) + _sync_state() + sequential_results.append((tc, result)) # Build a single combined assistant message for all sequential calls @@ -1555,8 +1698,24 @@ async def _save_message(save_reason: str, **kwargs) -> bool: await post_webhook(webhook_url, title, preview) except Exception: logger.debug("[webhook] Error sending webhook for chat %s", chat_id[:8], exc_info=True) - # Process any queued follow-up messages + # Best-effort post-turn memory review. Runs detached and never competes + # with queued user input processing. + try: + from cptr.utils.memory import review_memory_after_turn + + await review_memory_after_turn( + user_id=user_id, + message_id=message_id, + workspace=workspace, + conversation_messages=messages, + assistant_reply=content, + model_connection=connection, + model=model, + ) + except Exception: + logger.debug("[memory] Failed to review conversation", exc_info=True) + # Process any pending user prompts or internal subagent results. try: - await _process_queue(chat_id, user_id, workspace) + await process_pending_chat_inputs(chat_id, user_id, workspace) except Exception: - logger.exception(f"Failed to process queue for chat {chat_id}") + logger.exception(f"Failed to process pending inputs for chat {chat_id}") diff --git a/cptr/utils/config.py b/cptr/utils/config.py index e2858be..3a729ba 100644 --- a/cptr/utils/config.py +++ b/cptr/utils/config.py @@ -9,9 +9,9 @@ import time from dataclasses import dataclass from enum import Enum -from pathlib import Path import bcrypt +import jwt # PyJWT from cptr.env import DATA_DIR, CONFIG_FILE @@ -164,6 +164,8 @@ def sync_config_to_toml(app_config: dict) -> None: # Build the app_config section toml_section: dict = {} for key, value in app_config.items(): + if value is None: + continue if isinstance(value, (list, dict)): # Store complex types as JSON strings toml_section[key] = _json.dumps(value, ensure_ascii=False) @@ -230,7 +232,6 @@ async def has_any_user() -> bool: return result.scalar_one_or_none() is not None - # โ”€โ”€ PAM โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ @@ -290,8 +291,6 @@ async def get_or_create_user(username: str) -> str: # โ”€โ”€ JWT Tokens (stateless auth) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ -import jwt # PyJWT - def _get_jwt_secret() -> str: """Get or auto-generate JWT secret. Stored in config.toml.""" diff --git a/cptr/utils/images.py b/cptr/utils/images.py new file mode 100644 index 0000000..9fff865 --- /dev/null +++ b/cptr/utils/images.py @@ -0,0 +1,345 @@ +"""OpenAI-compatible image generation and editing helpers.""" + +from __future__ import annotations + +import base64 +import asyncio +import hashlib +import mimetypes +import os +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path + +import httpx + +from cptr.models import Config, File +from cptr.utils.config import _get_jwt_secret, now_ms +from cptr.utils.crypto import decrypt_key +from cptr.utils.storage import get_storage + + +DEFAULT_IMAGE_BASE_URL = "https://api.openai.com/v1" +DEFAULT_IMAGE_MODEL = "gpt-image-1" +MAX_IMAGES_PER_REQUEST = 4 + + +@dataclass +class ImageResult: + id: str + url: str + name: str + content_type: str + size: int + path: str | None = None + + def as_dict(self) -> dict: + data = { + "id": self.id, + "url": self.url, + "name": self.name, + "type": "image", + "content_type": self.content_type, + "size": self.size, + } + if self.path: + data["path"] = self.path + return data + + +def clamp_image_count(n: int | None) -> int: + if n is None: + return 1 + return min(max(int(n), 1), MAX_IMAGES_PER_REQUEST) + + +def _clean_base_url(value: object) -> str: + base_url = str(value or DEFAULT_IMAGE_BASE_URL).strip().rstrip("/") + return base_url or DEFAULT_IMAGE_BASE_URL + + +def _clean_optional(value: object) -> str | None: + if not isinstance(value, str): + return None + value = value.strip() + return value or None + + +async def _decrypt_config_key(key: str) -> str | None: + encrypted = await Config.get(key) + if not encrypted: + return None + return decrypt_key(str(encrypted), _get_jwt_secret()) + + +async def image_generation_config() -> dict: + return { + "enabled": await Config.get("images.generation_enabled") is True, + "base_url": _clean_base_url(await Config.get("images.generation_base_url")), + "api_key": await _decrypt_config_key("images.generation_api_key"), + "model": _clean_optional(await Config.get("images.generation_model")) + or DEFAULT_IMAGE_MODEL, + "size": _clean_optional(await Config.get("images.generation_size")), + } + + +async def image_edit_config() -> dict: + edit_key = await _decrypt_config_key("images.edit_api_key") + generation_key = await _decrypt_config_key("images.generation_api_key") + return { + "enabled": await Config.get("images.edit_enabled") is True, + "base_url": _clean_base_url(await Config.get("images.edit_base_url")), + "api_key": edit_key or generation_key, + "model": _clean_optional(await Config.get("images.edit_model")) or DEFAULT_IMAGE_MODEL, + "size": _clean_optional(await Config.get("images.edit_size")), + } + + +def _extension_for_content_type(content_type: str) -> str: + content_type = content_type.split(";")[0].strip().lower() + ext = mimetypes.guess_extension(content_type) + if ext == ".jpe": + ext = ".jpg" + return ext or ".png" + + +def _file_url(file_id: str, content_type: str) -> str: + return f"/api/files/{file_id}{_extension_for_content_type(content_type)}" + + +def _unique_workspace_image_path(workspace: str, source: str, content_type: str) -> Path: + root = Path(workspace).expanduser().resolve() + ext = _extension_for_content_type(content_type) + stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") + name = f"{source}-{stamp}{ext}" + path = root / name + counter = 2 + while path.exists(): + path = root / f"{source}-{stamp}-{counter}{ext}" + counter += 1 + return path + + +async def _store_image( + data: bytes, + content_type: str, + user_id: str | None, + source: str, + metadata: dict, + workspace: str | None = None, +) -> ImageResult: + content_type = content_type.split(";")[0].strip().lower() or "image/png" + ext = _extension_for_content_type(content_type) + workspace_path: Path | None = None + + if workspace: + workspace_path = _unique_workspace_image_path(workspace, source, content_type) + + def _write() -> None: + workspace_path.parent.mkdir(parents=True, exist_ok=True) + workspace_path.write_bytes(data) + + await asyncio.to_thread(_write) + + filename = workspace_path.name if workspace_path else f"{source}{ext}" + meta = { + "content_type": content_type, + "size": len(data), + "hash": hashlib.sha256(data).hexdigest(), + "source": source, + **metadata, + } + if workspace_path: + meta["path"] = str(workspace_path) + + record = await File.create( + user_id=user_id, + filename=filename, + meta=meta, + created_at=now_ms(), + ) + await get_storage().put(record.id, data) + return ImageResult( + id=record.id, + url=_file_url(record.id, content_type), + name=filename, + content_type=content_type, + size=len(data), + path=str(workspace_path) if workspace_path else None, + ) + + +async def _decode_image_response_item( + client: httpx.AsyncClient, + item: dict, + headers: dict[str, str], +) -> tuple[bytes, str]: + if b64_json := item.get("b64_json"): + return base64.b64decode(b64_json), "image/png" + + if image_url := item.get("url"): + resp = await client.get(image_url, headers=headers) + resp.raise_for_status() + content_type = resp.headers.get("content-type", "image/png") + if not content_type.lower().startswith("image/"): + raise ValueError("Provider URL did not return an image.") + return resp.content, content_type + + raise ValueError("Image response did not include b64_json or url.") + + +async def generate_images( + prompt: str, + *, + user_id: str | None, + model: str | None = None, + size: str | None = None, + n: int | None = None, + workspace: str | None = None, +) -> list[ImageResult]: + cfg = await image_generation_config() + if not cfg["enabled"]: + raise PermissionError("Image generation is disabled.") + if not cfg["api_key"]: + raise ValueError("Image generation is not configured.") + + payload: dict = { + "model": model or cfg["model"], + "prompt": prompt, + "n": clamp_image_count(n), + } + if chosen_size := (size or cfg["size"]): + payload["size"] = chosen_size + + headers = {"Authorization": f"Bearer {cfg['api_key']}"} + async with httpx.AsyncClient(timeout=httpx.Timeout(120)) as client: + resp = await client.post( + f"{cfg['base_url']}/images/generations", + headers={**headers, "Content-Type": "application/json"}, + json=payload, + ) + resp.raise_for_status() + body = resp.json() + + results = [] + for item in body.get("data", []): + data, content_type = await _decode_image_response_item(client, item, headers) + results.append( + await _store_image( + data, + content_type, + user_id, + "generated-image", + {"prompt": prompt, "model": payload["model"], "kind": "generation"}, + workspace, + ) + ) + return results + + +def clean_file_id(value: str) -> str: + value = value.strip() + if value.startswith("/api/files/"): + value = value.removeprefix("/api/files/") + value, _ = os.path.splitext(value) + if "/" in value or "\\" in value or not value: + raise ValueError("Image edits only accept cptr file IDs.") + return value + + +def _resolve_workspace_image_path(value: str, workspace: str | None) -> Path | None: + raw = value.strip() + if raw.startswith("/api/workspace/files/serve/"): + raw = "/" + raw.removeprefix("/api/workspace/files/serve/") + path = Path(raw).expanduser() + if not path.is_absolute(): + if not workspace: + return None + path = Path(workspace).expanduser().resolve() / raw + return path.resolve() + + +async def _load_image_file(image_ref: str, workspace: str | None = None) -> tuple[str, bytes, str]: + path = _resolve_workspace_image_path(image_ref, workspace) + if path and path.is_file(): + content_type = mimetypes.guess_type(str(path))[0] or "application/octet-stream" + if not content_type.lower().startswith("image/"): + raise ValueError(f"File is not an image: {image_ref}") + data = await asyncio.to_thread(path.read_bytes) + return path.name, data, content_type + + clean_id = clean_file_id(image_ref) + record = await File.get_by_id(clean_id) + if not record: + raise ValueError(f"Image file not found: {image_ref}") + data = await get_storage().get(record.id) + if data is None: + raise ValueError(f"Image blob missing: {image_ref}") + content_type = (record.meta or {}).get("content_type", "application/octet-stream") + if not str(content_type).lower().startswith("image/"): + raise ValueError(f"File is not an image: {image_ref}") + return record.filename or f"{record.id}.png", data, str(content_type) + + +async def edit_images( + prompt: str, + image_ids: list[str], + *, + user_id: str | None, + model: str | None = None, + size: str | None = None, + n: int | None = None, + background: str | None = None, + workspace: str | None = None, +) -> list[ImageResult]: + cfg = await image_edit_config() + if not cfg["enabled"]: + raise PermissionError("Image editing is disabled.") + if not cfg["api_key"]: + raise ValueError("Image editing is not configured.") + if not image_ids: + raise ValueError("At least one image file ID is required.") + + payload: dict[str, str] = { + "model": model or cfg["model"], + "prompt": prompt, + } + if n is not None: + payload["n"] = str(clamp_image_count(n)) + if chosen_size := (size or cfg["size"]): + payload["size"] = chosen_size + if background: + payload["background"] = background + + loaded_files = [await _load_image_file(image_id, workspace) for image_id in image_ids] + files = [] + multi = len(loaded_files) > 1 + for filename, data, content_type in loaded_files: + field_name = "image[]" if multi else "image" + files.append((field_name, (filename, data, content_type))) + + headers = {"Authorization": f"Bearer {cfg['api_key']}"} + async with httpx.AsyncClient(timeout=httpx.Timeout(120)) as client: + resp = await client.post( + f"{cfg['base_url']}/images/edits", + headers=headers, + data=payload, + files=files, + ) + resp.raise_for_status() + body = resp.json() + + results = [] + for item in body.get("data", []): + data, content_type = await _decode_image_response_item(client, item, headers) + results.append( + await _store_image( + data, + content_type, + user_id, + "edited-image", + {"prompt": prompt, "model": payload["model"], "kind": "edit"}, + workspace, + ) + ) + return results diff --git a/cptr/utils/memory.py b/cptr/utils/memory.py new file mode 100644 index 0000000..d44bab2 --- /dev/null +++ b/cptr/utils/memory.py @@ -0,0 +1,440 @@ +"""File-backed managed memory for cptr.""" + +from __future__ import annotations + +import asyncio +import os +import re +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from cptr.env import DATA_DIR +from cptr.models import Config +from cptr.utils.workspace import ensure_cptr_gitignored + +MEMORY_DIR_NAME = "memory" + +DEFAULT_MEMORY_SETTINGS: dict[str, Any] = { + "enabled": True, + "tool_enabled": True, + "background_review_enabled": True, + "review_interval_turns": 10, + "user_char_limit": 2000, + "workspace_char_limit": 3000, +} + +_memory_file_locks: dict[str, asyncio.Lock] = {} +_reviewed_messages: set[str] = set() + + +@dataclass(frozen=True) +class MemoryFile: + path: Path + character_limit: int + + +def _safe_id(value: str) -> str: + cleaned = re.sub(r"[^A-Za-z0-9_.-]+", "-", value).strip(".-") + return cleaned or "user" + + +def normalize_workspace_path(workspace: str) -> str: + if not workspace: + raise ValueError("workspace is required for workspace memory") + return str(Path(workspace).expanduser().resolve()) + + +def _user_memory_root(user_id: str) -> Path: + return DATA_DIR / MEMORY_DIR_NAME / "users" / _safe_id(user_id) + + +def user_memory_path(user_id: str) -> Path: + return _user_memory_root(user_id) / "USER.md" + + +def workspace_memory_path(user_id: str, workspace: str) -> Path: + root = Path(normalize_workspace_path(workspace)) + return root / ".cptr" / MEMORY_DIR_NAME / "users" / _safe_id(user_id) / "WORKSPACE.md" + + +def _memory_file_lock(path: Path) -> asyncio.Lock: + return _memory_file_locks.setdefault(str(path), asyncio.Lock()) + + +def read_memory_entries(path: Path) -> list[str]: + if not path.is_file(): + return [] + entries: list[str] = [] + for line in path.read_text(errors="replace").splitlines(): + line = line.strip() + if not line.startswith("- "): + continue + entry = line[2:].strip() + if entry: + entries.append(entry) + return entries + + +def format_memory_entries(entries: list[str]) -> str: + rendered = "\n".join(f"- {normalize_memory_text(entry)}" for entry in entries if entry.strip()) + return f"{rendered}\n" if rendered else "" + + +def write_memory_entries(path: Path, entries: list[str]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_name(f".{path.name}.{uuid.uuid4().hex}.tmp") + tmp.write_text(format_memory_entries(entries), encoding="utf-8") + os.replace(tmp, path) + + +def measure_memory_entries(entries: list[str]) -> int: + if not entries: + return 0 + return len("\n".join(entries)) + + +UNSAFE_MEMORY_CHARACTERS = ("\x00", "\u200b", "\u200c", "\u200d", "\ufeff") + + +def normalize_memory_text(content: str) -> str: + return " ".join(str(content).split()) + + +def memory_text_error(content: str) -> str | None: + """Return a rejection reason for text that cannot be safely stored as memory.""" + if any(ch in content for ch in UNSAFE_MEMORY_CHARACTERS): + return "memory content contains invisible or null characters" + return None + + +async def get_memory_settings() -> dict[str, Any]: + settings = {**DEFAULT_MEMORY_SETTINGS} + for key in DEFAULT_MEMORY_SETTINGS: + value = await Config.get(f"memory.{key}") + if value is not None: + settings[key] = value + settings["review_interval_turns"] = max(1, int(settings.get("review_interval_turns") or 10)) + settings["user_char_limit"] = max(250, int(settings.get("user_char_limit") or 2000)) + settings["workspace_char_limit"] = max(250, int(settings.get("workspace_char_limit") or 3000)) + return settings + + +async def save_memory_settings(updates: dict[str, Any]) -> dict[str, Any]: + await Config.upsert( + {f"memory.{key}": value for key, value in updates.items() if key in DEFAULT_MEMORY_SETTINGS} + ) + return await get_memory_settings() + + +async def resolve_memory_file(user_id: str, workspace: str, scope: str) -> MemoryFile: + settings = await get_memory_settings() + if scope == "user": + return MemoryFile( + path=user_memory_path(user_id), + character_limit=int(settings["user_char_limit"]), + ) + if scope == "workspace": + return MemoryFile( + path=workspace_memory_path(user_id, workspace), + character_limit=int(settings["workspace_char_limit"]), + ) + raise ValueError("scope must be 'user' or 'workspace'") + + +def apply_memory_batch( + current_entries: list[str], + operations: list[dict[str, Any]], + character_limit: int, +) -> tuple[bool, str, list[str], str]: + current_usage = f"{measure_memory_entries(current_entries)}/{character_limit}" + if not operations: + return False, "operations list is empty", current_entries, current_usage + + next_entries = list(current_entries) + for index, operation in enumerate(operations): + if not isinstance(operation, dict): + return False, f"Operation {index + 1}: must be an object", current_entries, current_usage + action = operation.get("action") + content = normalize_memory_text(str(operation.get("content") or "")) + old_text = normalize_memory_text(str(operation.get("old_text") or "")) + operation_name = f"Operation {index + 1} ({action or 'unknown'})" + + if action in {"add", "replace"}: + validation_error = memory_text_error(content) + if validation_error: + return False, f"{operation_name}: {validation_error}", current_entries, current_usage + + if action == "add": + if not content: + return False, f"{operation_name}: content is required", current_entries, current_usage + if content not in next_entries: + next_entries.append(content) + elif action == "replace": + if not old_text: + return False, f"{operation_name}: old_text is required", current_entries, current_usage + if not content: + return False, f"{operation_name}: content is required", current_entries, current_usage + matches = [i for i, entry in enumerate(next_entries) if old_text in entry] + if not matches: + return False, f"{operation_name}: no entry matched '{old_text}'", current_entries, current_usage + if len({next_entries[i] for i in matches}) > 1: + return ( + False, + f"{operation_name}: old_text matched multiple distinct entries", + current_entries, + current_usage, + ) + next_entries[matches[0]] = content + elif action == "remove": + if not old_text: + return False, f"{operation_name}: old_text is required", current_entries, current_usage + matches = [i for i, entry in enumerate(next_entries) if old_text in entry] + if not matches: + return False, f"{operation_name}: no entry matched '{old_text}'", current_entries, current_usage + if len({next_entries[i] for i in matches}) > 1: + return ( + False, + f"{operation_name}: old_text matched multiple distinct entries", + current_entries, + current_usage, + ) + next_entries.pop(matches[0]) + else: + return ( + False, + f"{operation_name}: unknown action; use add, replace, or remove", + current_entries, + current_usage, + ) + + next_usage_count = measure_memory_entries(next_entries) + if next_usage_count > character_limit: + return ( + False, + f"final memory would be {next_usage_count}/{character_limit} chars; remove or shorten entries in the same batch", + current_entries, + current_usage, + ) + return True, f"Applied {len(operations)} operation(s).", next_entries, f"{next_usage_count}/{character_limit}" + + +async def write_memory( + user_id: str, + workspace: str, + scope: str, + operations: list[dict[str, Any]], +) -> dict[str, Any]: + memory_file = await resolve_memory_file(user_id, workspace, scope) + if scope == "workspace": + await asyncio.to_thread(ensure_cptr_gitignored, workspace) + async with _memory_file_lock(memory_file.path): + entries = await asyncio.to_thread(read_memory_entries, memory_file.path) + success, message, next_entries, usage = apply_memory_batch( + entries, operations, memory_file.character_limit + ) + if not success: + return { + "success": False, + "error": message, + "entries": entries, + "usage": usage, + "scope": scope, + "path": str(memory_file.path), + } + await asyncio.to_thread(write_memory_entries, memory_file.path, next_entries) + return { + "success": True, + "message": message, + "entries": next_entries, + "usage": usage, + "scope": scope, + "path": str(memory_file.path), + } + + +async def remember( + user_id: str, + workspace: str, + scope: str, + operations: list[dict[str, Any]], +) -> dict[str, Any]: + settings = await get_memory_settings() + if not settings["enabled"]: + return {"success": False, "error": "memory writes are disabled"} + return await write_memory(user_id, workspace, scope, operations) + + +async def read_memory_state(user_id: str, workspace: str) -> dict[str, Any]: + settings = await get_memory_settings() + user_memory = await resolve_memory_file(user_id, workspace, "user") + user_entries = await asyncio.to_thread(read_memory_entries, user_memory.path) + workspace_entries: list[str] = [] + workspace_usage = f"0/{settings['workspace_char_limit']}" + workspace_path_value = "" + if workspace: + workspace_memory = await resolve_memory_file(user_id, workspace, "workspace") + workspace_entries = await asyncio.to_thread(read_memory_entries, workspace_memory.path) + workspace_usage = f"{measure_memory_entries(workspace_entries)}/{workspace_memory.character_limit}" + workspace_path_value = str(workspace_memory.path) + return { + "settings": settings, + "user": { + "entries": user_entries, + "usage": f"{measure_memory_entries(user_entries)}/{user_memory.character_limit}", + "path": str(user_memory.path), + }, + "workspace": { + "entries": workspace_entries, + "usage": workspace_usage, + "path": workspace_path_value, + }, + } + + +async def build_memory_prompt(user_id: str | None, workspace: str) -> str: + if not user_id: + return "" + settings = await get_memory_settings() + if not settings["enabled"]: + return "" + state = await read_memory_state(user_id, workspace) + blocks: list[str] = [] + for key, title in (("user", "User Memory"), ("workspace", "Workspace Memory")): + entries = state[key]["entries"] + if not entries: + continue + blocks.append( + f"[{title}] [{state[key]['usage']}]\n" + + "\n".join(f"- {entry}" for entry in entries) + ) + return "\n\n".join(blocks) + + +def summarize_recent_conversation(messages: list[dict[str, Any]], assistant_reply: str) -> str: + recent_messages = messages[-16:] + lines: list[str] = [] + for message in recent_messages: + role = message.get("role", "unknown") + content = message.get("content", "") + if isinstance(content, list): + content = " ".join( + str(block.get("text", "")) for block in content if isinstance(block, dict) + ) + text = str(content).strip() + if len(text) > 1600: + text = text[:1000] + "\n...(truncated)...\n" + text[-400:] + if text: + lines.append(f"{role}: {text}") + if assistant_reply: + text = assistant_reply.strip() + if len(text) > 1600: + text = text[:1000] + "\n...(truncated)...\n" + text[-400:] + lines.append(f"assistant_final: {text}") + return "\n\n".join(lines) + + +def build_memory_review_prompt(memory_state: dict[str, Any], workspace: str, transcript: str) -> str: + return ( + "Review the completed conversation and decide whether cptr should remember " + "stable facts. Return ONLY JSON with this shape:\n" + '{"user": [{"action": "add|replace|remove", "content": "...", "old_text": "..."}], ' + '"workspace": [{"action": "add|replace|remove", "content": "...", "old_text": "..."}]}\n\n' + "Use user memory only for durable user preferences, communication style, repeated " + "corrections, or cross-workspace habits. Use workspace memory only for facts true " + "in the current workspace, such as repo conventions, verification commands, " + "architecture notes, or local tool quirks. If nothing is worth saving, return " + '{"user": [], "workspace": []}.\n\n' + f"Workspace: {workspace}\n\n" + f"Current user memory ({memory_state['user']['usage']}):\n" + + "\n".join(f"- {entry}" for entry in memory_state["user"]["entries"]) + + f"\n\nCurrent workspace memory ({memory_state['workspace']['usage']}):\n" + + "\n".join(f"- {entry}" for entry in memory_state["workspace"]["entries"]) + + f"\n\nConversation:\n{transcript}" + ) + + +async def review_memory_after_turn( + *, + user_id: str, + message_id: str, + workspace: str, + conversation_messages: list[dict[str, Any]], + assistant_reply: str, + model_connection: dict, + model: str, +) -> None: + settings = await get_memory_settings() + if ( + not settings["enabled"] + or not settings["tool_enabled"] + or not settings["background_review_enabled"] + or not assistant_reply.strip() + ): + return + if message_id in _reviewed_messages: + return + user_turns = sum(1 for message in conversation_messages if message.get("role") == "user") + if user_turns <= 0 or user_turns % int(settings["review_interval_turns"]) != 0: + return + _reviewed_messages.add(message_id) + asyncio.create_task( + run_memory_review( + user_id=user_id, + workspace=workspace, + conversation_messages=list(conversation_messages), + assistant_reply=assistant_reply, + model_connection=dict(model_connection), + model=model, + ) + ) + + +async def run_memory_review( + *, + user_id: str, + workspace: str, + conversation_messages: list[dict[str, Any]], + assistant_reply: str, + model_connection: dict, + model: str, +) -> None: + try: + from cptr.utils.ai import chat_completion + from cptr.utils.chat_task import _default_base_url + from cptr.utils.config import _get_jwt_secret + from cptr.utils.crypto import decrypt_key + from cptr.utils.json_parser import extract_json + + memory_state = await read_memory_state(user_id, workspace) + transcript = summarize_recent_conversation(conversation_messages, assistant_reply) + prompt = build_memory_review_prompt(memory_state, workspace, transcript) + provider = model_connection["provider"] + api_key = decrypt_key(model_connection.get("api_key", ""), _get_jwt_secret()) + base_url = model_connection.get("base_url") or _default_base_url(provider) + text = await chat_completion( + provider=provider, + base_url=base_url, + api_key=api_key, + model=model, + messages=[{"role": "user", "content": prompt}], + system="You are cptr's private memory reviewer. Return only valid JSON.", + max_tokens=700, + api_type=model_connection.get("api_type", "chat_completions"), + ) + parsed = extract_json(text) + if not isinstance(parsed, dict): + return + for scope in ("user", "workspace"): + operations = parsed.get(scope) or [] + if not isinstance(operations, list) or not operations: + continue + await remember( + user_id=user_id, + workspace=workspace, + scope=scope, + operations=operations, + ) + except Exception: + return diff --git a/cptr/utils/prompt_templates.py b/cptr/utils/prompt_templates.py index 0d6a1ee..c96c769 100644 --- a/cptr/utils/prompt_templates.py +++ b/cptr/utils/prompt_templates.py @@ -21,10 +21,13 @@ _TEMPLATE_RE = re.compile(r"\{\{(\w+)\}\}") DEFAULT_SYSTEM_PROMPT = ( - "You are cptr, a helpful assistant running inside the user's computer interface. " + "You are Computer (cptr), a helpful assistant running inside the user's computer interface. " "You have access to tools to read, search, and modify files in the workspace, " "run commands, and use configured tools. Use them to help the user directly." + " Approach hard requests with initiative and persistence: make the best possible " + "attempt, adapt as needed, and keep going unless a real constraint prevents progress." "\n\n{{CPTR_CONTEXT}}" + "\n\n{{MEMORY}}" "\n\n{{INSTRUCTIONS}}" "\n\n{{SKILLS}}" "\n\nWorkspace: {{WORKSPACE_NAME}}" @@ -204,7 +207,7 @@ def _render_system_template(template: str, variables: dict[str, str]) -> str: return re.sub(r"\n{3,}", "\n\n", rendered).strip() -def _build_template_variables(workspace: str, model: str = "") -> dict[str, str]: +def _build_template_variables(workspace: str, model: str = "", memory: str = "") -> dict[str, str]: """Build the dict of template variable values for the current context.""" ws_path = Path(workspace) os_name = platform.system().replace("Darwin", "macOS") @@ -215,9 +218,8 @@ def _build_template_variables(workspace: str, model: str = "") -> dict[str, str] instructions_block = ( f"\n{instructions}\n" "\n\nThe above were loaded from instruction files in the workspace root. " - "These files persist across sessions. " - "You can update them with your file tools to save learnings, decisions, or " - "project conventions for future sessions." + "These files persist across sessions and are user-authored workspace instructions. " + "Managed memory is shown separately when available." ) else: instructions_block = "" @@ -230,6 +232,7 @@ def _build_template_variables(workspace: str, model: str = "") -> dict[str, str] "WORKSPACE_PATH": str(ws_path), "FILE_TREE": _get_file_tree(workspace), "INSTRUCTIONS": instructions_block, + "MEMORY": memory, "SKILLS": skills_block, "CPTR_CONTEXT": _format_cptr_context(workspace, model), "RUNTIME_ENV": _runtime_label(), @@ -245,7 +248,7 @@ def _build_template_variables(workspace: str, model: str = "") -> dict[str, str] } -async def load_system_prompt(workspace: str, model: str = "") -> str: +async def load_system_prompt(workspace: str, model: str = "", user_id: str | None = None) -> str: """Load and render the system prompt for a workspace/model. Resolution order: @@ -281,5 +284,17 @@ async def load_system_prompt(workspace: str, model: str = "") -> str: if template is None: template = DEFAULT_SYSTEM_PROMPT - variables = _build_template_variables(workspace, model) + memory = "" + if user_id: + try: + from cptr.utils.memory import build_memory_prompt + + memory = await build_memory_prompt(user_id, workspace) + except Exception: + logger.debug("[memory] Failed to load managed memory", exc_info=True) + + if memory and "{{MEMORY}}" not in template: + template = template.rstrip() + "\n\n{{MEMORY}}" + + variables = _build_template_variables(workspace, model, memory) return _render_system_template(template, variables) diff --git a/cptr/utils/tools.py b/cptr/utils/tools.py index d822014..f0ac7ea 100644 --- a/cptr/utils/tools.py +++ b/cptr/utils/tools.py @@ -12,15 +12,13 @@ from __future__ import annotations import asyncio -import fnmatch import inspect import json import os -import re import time import uuid from pathlib import Path -from typing import Callable, Optional, Pattern, get_type_hints +from typing import Literal, Optional, get_args, get_origin, get_type_hints from cptr.env import CHAT_TOOL_COMMAND_MAX_CHARS, CHAT_TOOL_MAX_CHARS, EXECUTE_TIMEOUT try: @@ -102,7 +100,7 @@ def _rotate_log(log_path: str, log_file) -> tuple: with open(log_path, "r", encoding="utf-8") as f: lines = f.readlines() - keep = lines[len(lines) // 2:] + keep = lines[len(lines) // 2 :] with open(log_path, "w", encoding="utf-8") as f: f.write(json.dumps({"type": "log_rotated", "ts": time.time()}) + "\n") @@ -110,7 +108,7 @@ def _rotate_log(log_path: str, log_file) -> tuple: f.write(line) new_file = open(log_path, "a", encoding="utf-8") - new_size = sum(len(l.encode("utf-8", errors="replace")) for l in keep) + new_size = sum(len(line.encode("utf-8", errors="replace")) for line in keep) return new_file, new_size @@ -130,8 +128,17 @@ async def _collect_bg_output(task_id: str): if log_path: Path(log_path).parent.mkdir(parents=True, exist_ok=True) log_file = open(log_path, "a", encoding="utf-8") - entry = json.dumps({"type": "start", "command": task["command"], - "pid": proc.pid, "ts": time.time()}) + "\n" + entry = ( + json.dumps( + { + "type": "start", + "command": task["command"], + "pid": proc.pid, + "ts": time.time(), + } + ) + + "\n" + ) log_file.write(entry) log_file.flush() log_bytes += len(entry.encode("utf-8", errors="replace")) @@ -157,12 +164,19 @@ async def _collect_bg_output(task_id: str): task["output"].extend(chunk) task["total_bytes"] += len(chunk) if len(task["output"]) > 256 * 1024: - task["output"] = task["output"][-256 * 1024:] + task["output"] = task["output"][-256 * 1024 :] if log_file: - entry = json.dumps({"type": "output", - "data": chunk.decode(errors="replace"), - "ts": time.time()}) + "\n" + entry = ( + json.dumps( + { + "type": "output", + "data": chunk.decode(errors="replace"), + "ts": time.time(), + } + ) + + "\n" + ) entry_size = len(entry.encode("utf-8", errors="replace")) if log_bytes + entry_size > _MAX_LOG_SIZE: log_file, log_bytes = _rotate_log(log_path, log_file) @@ -191,8 +205,7 @@ async def _collect_bg_output(task_id: str): if log_file: log_file.write( - json.dumps({"type": "end", "exit_code": exit_code, - "ts": time.time()}) + "\n" + json.dumps({"type": "end", "exit_code": exit_code, "ts": time.time()}) + "\n" ) log_file.close() @@ -228,7 +241,14 @@ def _truncate_output(text: str, max_chars: int = 80_000) -> str: # โ”€โ”€ Image support โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ IMAGE_EXTENSIONS = { - ".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".tiff", ".tif", + ".png", + ".jpg", + ".jpeg", + ".gif", + ".webp", + ".bmp", + ".tiff", + ".tif", } _IMAGE_MAX_BYTES = 5 * 1024 * 1024 # 5 MB target for API payload @@ -351,7 +371,9 @@ def _read(): e = min(total, end_line) if end_line > 0 else total selected = lines[s:e] numbered = [f"{i + s + 1}: {line}" for i, line in enumerate(selected)] - return f"File: {path} | Lines {s + 1}-{e} of {total}\n" + "\n".join(numbered) + return f"File: {path} | Lines {s + 1}-{e} of {total}\n" + "\n".join( + numbered + ) capped = lines[:800] numbered = [f"{i + 1}: {line}" for i, line in enumerate(capped)] header = f"File: {path} | Total lines: {total}" @@ -603,12 +625,14 @@ def _write_artifact(): rel_path = str(artifact_path.relative_to(Path(workspace))) display_title = artifact_type.replace("_", " ").title() - return json.dumps({ - "artifact_type": artifact_type, - "title": display_title, - "path": rel_path, - "bytes": len(content), - }) + return json.dumps( + { + "artifact_type": artifact_type, + "title": display_title, + "path": rel_path, + "bytes": len(content), + } + ) if not path: return "Error: path is required when artifact_type is not set." @@ -654,12 +678,14 @@ def _write(): display_title = title or artifact_type.replace("_", " ").title() rel_path = str(artifact_path.relative_to(Path(workspace))) - return json.dumps({ - "artifact_type": artifact_type, - "title": display_title, - "path": rel_path, - "bytes": len(content), - }) + return json.dumps( + { + "artifact_type": artifact_type, + "title": display_title, + "path": rel_path, + "bytes": len(content), + } + ) async def write_file(path: str, content: str, *, workspace: str) -> str: @@ -882,14 +908,13 @@ async def run_command( status = "running" return ( - f"Task {task_id}: {status}\n" - f"Command: {command}\n" - f"next_offset: {next_offset}\n" - f"---\n{output}" + f"Task {task_id}: {status}\nCommand: {command}\nnext_offset: {next_offset}\n---\n{output}" ) -async def check_task(task_id: str, offset: int = 0, wait: Optional[int] = None, *, workspace: str) -> str: +async def check_task( + task_id: str, offset: int = 0, wait: Optional[int] = None, *, workspace: str +) -> str: """Check status and recent output of a background task. :param task_id: The task ID returned by run_command. :param offset: Byte offset from previous check. Pass next_offset from the last response to get only new output. @@ -1092,14 +1117,16 @@ async def create_automation( is_active=True, created_at=now_ns, ) - return json.dumps({ - "status": "success", - "id": automation.id, - "name": automation.name, - "model_id": automation.model_id, - "is_active": automation.is_active, - "next_runs": next_n_runs_ns(rrule), - }) + return json.dumps( + { + "status": "success", + "id": automation.id, + "name": automation.name, + "model_id": automation.model_id, + "is_active": automation.is_active, + "next_runs": next_n_runs_ns(rrule), + } + ) except Exception as e: return json.dumps({"error": str(e)}) @@ -1129,16 +1156,18 @@ async def list_automations( ) automations = [] for item in items: - automations.append({ - "id": item.id, - "name": item.name, - "prompt_snippet": item.prompt[:100] + ("..." if len(item.prompt) > 100 else ""), - "model_id": item.model_id, - "rrule": item.rrule, - "is_active": item.is_active, - "last_run_at": item.last_run_at, - "next_runs": next_n_runs_ns(item.rrule), - }) + automations.append( + { + "id": item.id, + "name": item.name, + "prompt_snippet": item.prompt[:100] + ("..." if len(item.prompt) > 100 else ""), + "model_id": item.model_id, + "rrule": item.rrule, + "is_active": item.is_active, + "last_run_at": item.last_run_at, + "next_runs": next_n_runs_ns(item.rrule), + } + ) return json.dumps({"automations": automations, "total": total}) except Exception as e: return json.dumps({"error": str(e)}) @@ -1193,12 +1222,14 @@ async def update_automation( return json.dumps({"error": "Failed to update automation"}) final_rrule = rrule or automation.rrule - return json.dumps({ - "status": "success", - "id": automation_id, - "updated_fields": list(kwargs.keys()), - "next_runs": next_n_runs_ns(final_rrule), - }) + return json.dumps( + { + "status": "success", + "id": automation_id, + "updated_fields": list(kwargs.keys()), + "next_runs": next_n_runs_ns(final_rrule), + } + ) except Exception as e: return json.dumps({"error": str(e)}) @@ -1224,12 +1255,14 @@ async def toggle_automation( if not toggled: return json.dumps({"error": "Failed to toggle automation"}) - return json.dumps({ - "status": "success", - "id": toggled.id, - "name": toggled.name, - "is_active": toggled.is_active, - }) + return json.dumps( + { + "status": "success", + "id": toggled.id, + "name": toggled.name, + "is_active": toggled.is_active, + } + ) except Exception as e: return json.dumps({"error": str(e)}) @@ -1299,18 +1332,22 @@ async def _get_browser_config() -> dict: "enabled": await Config.get("browser.enabled") or False, "provider": await Config.get("browser.provider") or "local", "cdp_url": await Config.get("browser.cdp_url") or "http://localhost:9222", - "auto_launch": await Config.get("browser.auto_launch") if await Config.get("browser.auto_launch") is not None else True, + "auto_launch": await Config.get("browser.auto_launch") + if await Config.get("browser.auto_launch") is not None + else True, "session_timeout": int(await Config.get("browser.session_timeout_minutes") or 10), "firecrawl_api_key": await Config.get("browser.firecrawl_api_key") or "", - "firecrawl_base_url": await Config.get("browser.firecrawl_base_url") or "https://api.firecrawl.dev", + "firecrawl_base_url": await Config.get("browser.firecrawl_base_url") + or "https://api.firecrawl.dev", "browser_use_api_key": await Config.get("browser.browser_use_api_key") or "", - "browser_use_base_url": await Config.get("browser.browser_use_base_url") or "https://api.browser-use.com", + "browser_use_base_url": await Config.get("browser.browser_use_base_url") + or "https://api.browser-use.com", } except Exception: return {"enabled": False, "provider": "local"} -async def _get_cdp_session(chat_id: str) -> "CDPClient": +async def _get_cdp_session(chat_id: str): """Get or create a CDP session for the current chat.""" cfg = await _get_browser_config() cdp_url = cfg["cdp_url"] @@ -1348,7 +1385,9 @@ async def browser_navigate(url: str, *, __context__: dict) -> str: return "Error: Browser-Use API key not configured. Set it in Settings > Browser." from cptr.utils.browser.browser_use import browse - result = await browse(f"Navigate to {url} and describe what you see", key, cfg.get("browser_use_base_url", "")) + result = await browse( + f"Navigate to {url} and describe what you see", key, cfg.get("browser_use_base_url", "") + ) return f"Navigated to {url} (via Browser-Use)\n\n{result}" # Local CDP @@ -1402,8 +1441,7 @@ async def browser_type(ref: str, text: str, *, __context__: dict) -> str: async def browser_screenshot(*, __context__: dict) -> str: - """Take a screenshot of the current browser page. Saves the image to the workspace. - """ + """Take a screenshot of the current browser page. Saves the image to the workspace.""" cfg = await _get_browser_config() if cfg.get("provider", "local") != "local": return "Error: browser_screenshot requires Local CDP provider." @@ -1439,6 +1477,312 @@ async def browser_evaluate(javascript: str, *, __context__: dict) -> str: return await client.evaluate(javascript) +async def image_generate( + prompt: str, + image: Optional[str] = None, + images: Optional[list[str]] = None, + size: Optional[str] = None, + n: int = 1, + background: Optional[str] = None, + *, + __context__: dict, +) -> str: + """Generate or edit image files from a prompt. + :param prompt: Detailed description of the image to create or the edits to make. + :param image: Optional source image file id, /api/files/... URL, or workspace path for edit mode. + :param images: Optional source image file ids, /api/files/... URLs, or workspace paths for edit mode. + :param size: Optional image size, such as 1024x1024. + :param n: Number of images to create, from 1 to 4. + :param background: Optional background setting supported by the image provider. + """ + image_refs: list[str] = [] + if image: + image_refs.append(image) + if images: + image_refs.extend(images) + + if image_refs: + from cptr.utils.images import edit_images + + results = await edit_images( + prompt, + image_refs, + user_id=__context__.get("user_id"), + size=size, + n=n, + background=background, + workspace=__context__.get("workspace"), + ) + kind = "edit" + else: + from cptr.utils.images import generate_images + + results = await generate_images( + prompt, + user_id=__context__.get("user_id"), + size=size, + n=n, + workspace=__context__.get("workspace"), + ) + kind = "generation" + + return json.dumps( + { + "status": "success", + "kind": kind, + "images": [result.as_dict() for result in results], + }, + ensure_ascii=False, + ) + + +async def update_memory( + scope: Literal["user", "workspace"], + operations: list[dict], + *, + __context__: dict, +) -> str: + """Save durable memories about the user or current workspace. + + Use user memory for stable preferences, communication style, and cross-workspace + facts. Use workspace memory for repo-specific conventions, verification + commands, architecture notes, and local tool quirks. Make all changes in one + operations array so removals/replacements and additions apply atomically. + :param scope: "user" for global per-user memory, or "workspace" for the current workspace only. + :param operations: Batch of {action, content?, old_text?}; action is add, replace, or remove. + """ + from cptr.utils.memory import get_memory_settings, remember + + user_id = __context__.get("user_id") + workspace = __context__.get("workspace", "") + if not user_id: + return json.dumps({"success": False, "error": "user_id missing from tool context"}) + if not isinstance(operations, list): + return json.dumps({"success": False, "error": "operations must be a list"}) + settings = await get_memory_settings() + if not settings.get("tool_enabled", True): + return json.dumps({"success": False, "error": "memory tool is disabled"}) + + result = await remember( + user_id=user_id, + workspace=workspace, + scope=scope, + operations=operations, + ) + return json.dumps(result, ensure_ascii=False) + + +def _shape_chat_search_result(row: dict) -> dict: + meta = row.get("meta") or {} + return { + "chat_id": row.get("id"), + "title": row.get("title"), + "workspace": meta.get("workspace", ""), + "updated_at": row.get("updated_at"), + "created_at": row.get("created_at"), + "match_type": row.get("match_type"), + "snippet": row.get("snippet"), + "matched_message_id": row.get("matched_message_id"), + "matched_role": row.get("matched_role"), + } + + +def _shape_chat_tool_message(message) -> dict: + payload = { + "id": message.id, + "role": message.role, + "content": message.content, + "created_at": message.created_at, + } + if message.model: + payload["model"] = message.model + if message.meta: + payload["meta"] = message.meta + return payload + + +async def search_chats( + query: str = "", + chat_id: str = "", + around_message_id: str = "", + window: int = 5, + limit: int = 5, + workspace_scope: Literal["current", "all"] = "current", + include_subagents: bool = False, + *, + __context__: dict, +) -> str: + """Search or read prior chats from cptr's existing chat history. + + With no args, browse recent chats. Pass query to search previous chats. + Pass chat_id to read a bounded transcript. Pass chat_id plus around_message_id + to read a window around a specific message. + :param query: Text to search in chat ids, titles, summaries, and message content. + :param chat_id: Chat id to read directly. + :param around_message_id: Message id to center a window on when chat_id is set. + :param window: Number of messages before and after around_message_id, from 1 to 20. + :param limit: Maximum chats to return for browse/search, from 1 to 10. + :param workspace_scope: "current" searches only this workspace; "all" searches every workspace owned by the user. + :param include_subagents: Include delegated sub-agent chats in browse/search/read results. + """ + from sqlalchemy import select + + from cptr.models import Chat, ChatMessage + from cptr.utils.db import get_db + + user_id = __context__.get("user_id") + current_chat_id = __context__.get("chat_id") + current_workspace = __context__.get("workspace", "") + if not user_id: + return json.dumps({"success": False, "error": "user_id missing from tool context"}) + + try: + limit = max(1, min(int(limit), 10)) + except (TypeError, ValueError): + limit = 5 + try: + window = max(1, min(int(window), 20)) + except (TypeError, ValueError): + window = 5 + + workspace = current_workspace if workspace_scope == "current" else None + + async def get_allowed_chat(cid: str): + chat = await Chat.get_by_id(cid) + if not chat or chat.user_id != user_id: + return None, "chat not found" + meta = chat.meta or {} + if not include_subagents and meta.get("subagent"): + return None, "chat is a sub-agent chat" + if workspace and meta.get("workspace") != workspace: + return None, "chat is outside the current workspace" + return chat, None + + if chat_id and around_message_id: + chat, error = await get_allowed_chat(chat_id) + if error: + return json.dumps({"success": False, "error": error, "chat_id": chat_id}) + messages = await ChatMessage.get_all_by_chat(chat_id) + anchor_index = next( + (idx for idx, message in enumerate(messages) if message.id == around_message_id), + -1, + ) + if anchor_index < 0: + return json.dumps( + { + "success": False, + "error": "around_message_id not found in chat", + "chat_id": chat_id, + "around_message_id": around_message_id, + } + ) + start = max(0, anchor_index - window) + end = min(len(messages), anchor_index + window + 1) + return json.dumps( + { + "success": True, + "mode": "window", + "chat_id": chat_id, + "title": chat.title, + "around_message_id": around_message_id, + "messages_before": start, + "messages_after": len(messages) - end, + "messages": [_shape_chat_tool_message(message) for message in messages[start:end]], + }, + ensure_ascii=False, + ) + + if chat_id: + chat, error = await get_allowed_chat(chat_id) + if error: + return json.dumps({"success": False, "error": error, "chat_id": chat_id}) + messages = await ChatMessage.get_all_by_chat(chat_id) + head = 20 + tail = 10 + truncated = len(messages) > head + tail + visible = messages[:head] + messages[-tail:] if truncated else messages + return json.dumps( + { + "success": True, + "mode": "read", + "chat_id": chat_id, + "title": chat.title, + "workspace": (chat.meta or {}).get("workspace", ""), + "message_count": len(messages), + "truncated": truncated, + "messages": [_shape_chat_tool_message(message) for message in visible], + "hint": ( + "Pass chat_id plus around_message_id from one of these messages to inspect the middle." + if truncated + else None + ), + }, + ensure_ascii=False, + ) + + if query.strip(): + rows = await Chat.search_by_text( + user_id=user_id, + query=query, + limit=limit + 1, + workspace=workspace, + include_subagents=include_subagents, + ) + results = [ + _shape_chat_search_result(row) for row in rows if row.get("id") != current_chat_id + ][:limit] + return json.dumps( + { + "success": True, + "mode": "search", + "query": query, + "workspace_scope": workspace_scope, + "results": results, + "count": len(results), + }, + ensure_ascii=False, + ) + + async with await get_db() as db: + result = await db.execute( + select(Chat).where(Chat.user_id == user_id).order_by(Chat.updated_at.desc()) + ) + chats = list(result.scalars().all()) + + recent = [] + for chat in chats: + meta = chat.meta or {} + if chat.id == current_chat_id: + continue + if not include_subagents and meta.get("subagent"): + continue + if workspace and meta.get("workspace") != workspace: + continue + recent.append( + { + "chat_id": chat.id, + "title": chat.title, + "workspace": meta.get("workspace", ""), + "updated_at": chat.updated_at, + "created_at": chat.created_at, + "summary": chat.summary, + } + ) + if len(recent) >= limit: + break + + return json.dumps( + { + "success": True, + "mode": "browse", + "workspace_scope": workspace_scope, + "results": recent, + "count": len(recent), + }, + ensure_ascii=False, + ) + + # โ”€โ”€ Registry โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ TOOLS: dict[str, dict] = { @@ -1449,6 +1793,7 @@ async def browser_evaluate(javascript: str, *, __context__: dict) -> str: "check_task": {"fn": check_task, "auto": True}, "web_search": {"fn": web_search, "auto": True}, "read_url": {"fn": read_url, "auto": True}, + "search_chats": {"fn": search_chats, "auto": True}, "list_automations": {"fn": list_automations, "auto": True}, "view_skill": {"fn": view_skill, "auto": True}, # Write / mutate (require approval unless auto_approve_all) @@ -1463,6 +1808,8 @@ async def browser_evaluate(javascript: str, *, __context__: dict) -> str: "update_automation": {"fn": update_automation, "auto": False}, "toggle_automation": {"fn": toggle_automation, "auto": False}, "delete_automation": {"fn": delete_automation, "auto": False}, + "image_generate": {"fn": image_generate, "auto": False}, + "update_memory": {"fn": update_memory, "auto": True}, } # Browser tools โ€” conditionally included in schemas based on browser.enabled @@ -1498,10 +1845,12 @@ async def _get_subagent_config() -> dict: return { "max_concurrent": int(await Config.get("subagents.max_concurrent") or 3), + "background_enabled": (await Config.get("subagents.background_enabled")) + in (True, "true", "1"), + "max_async": int(await Config.get("subagents.max_async") or 3), "max_iterations": int(await Config.get("subagents.max_iterations") or 30), "max_output": int(await Config.get("subagents.max_output") or 30_000), - "system_prompt": (await Config.get("subagents.system_prompt")) - or _DEFAULT_SUBAGENT_SYSTEM, + "system_prompt": (await Config.get("subagents.system_prompt")) or _DEFAULT_SUBAGENT_SYSTEM, } @@ -1515,16 +1864,89 @@ def _truncate_output(text: str, max_chars: int) -> str: async def delegate_task( task: str, context: str = "", + background: bool = False, *, __context__: dict, ) -> str: - """Delegate a task to a sub-agent. The sub-agent has full access to read, write, edit files, and run commands. Use for parallel work โ€” call multiple times in one response to run tasks concurrently. + """Delegate a task to a sub-agent. Use background=true for long-running independent work. :param task: What the sub-agent should do. :param context: Optional context (e.g. relevant file paths, decisions made so far). + :param background: Return a handle immediately and inject the final result into this chat when done. """ global _subagent_semaphore config = await _get_subagent_config() + + if background: + if not config["background_enabled"]: + return "Error: background sub-agents are disabled in settings." + + from cptr.utils.async_subagents import ( + attach_subagent_chat, + fail_reserved_subagent, + reserve_async_subagent, + start_async_subagent, + ) + + reserve = await reserve_async_subagent( + config["max_async"], + task=task, + context=context, + workspace=__context__["workspace"], + user_id=__context__["user_id"], + parent_chat_id=__context__["chat_id"], + parent_message_id=__context__.get("message_id"), + connection=__context__["connection"], + model=__context__["model_id"], + model_id=__context__.get("full_model_id") or __context__["model_id"], + ) + if reserve.get("status") == "rejected": + return f"Error: {reserve['error']}" + + delegation_id = reserve["delegation_id"] + try: + chat, assistant_msg = await _create_subagent_chat( + task=task, + context=context, + workspace=__context__["workspace"], + model=__context__["model_id"], + user_id=__context__["user_id"], + parent_chat_id=__context__["chat_id"], + config=config, + delegation_id=delegation_id, + ) + except Exception as e: + await fail_reserved_subagent(delegation_id, str(e)) + return f"Error: failed to create background sub-agent: {e}" + + await attach_subagent_chat( + delegation_id, + subagent_chat_id=chat.id, + subagent_message_id=assistant_msg.id, + ) + + async def _runner() -> str: + return await _run_existing_subagent_chat( + assistant_msg_id=assistant_msg.id, + chat_id=chat.id, + workspace=__context__["workspace"], + connection=__context__["connection"], + model=__context__["model_id"], + user_id=__context__["user_id"], + config=config, + ) + + await start_async_subagent(delegation_id, _runner) + return json.dumps( + { + "status": "dispatched", + "delegation_id": delegation_id, + "subagent_chat_id": chat.id, + "mode": "background", + "task": task, + } + ) + if _subagent_semaphore is None: _subagent_semaphore = asyncio.Semaphore(config["max_concurrent"]) @@ -1541,40 +1963,40 @@ async def delegate_task( ) -async def _run_subagent_chat( +async def _create_subagent_chat( task: str, context: str, workspace: str, - connection: dict, model: str, user_id: str, parent_chat_id: str, config: dict, -) -> str: - """Create a real chat and run the agent loop on it.""" + delegation_id: str | None = None, +): + """Create the real chat/messages used by a sub-agent.""" from cptr.models import Chat, ChatMessage - from cptr.utils.chat_task import run_chat_task + from cptr.utils.chat_export import export_chat_to_file from cptr.utils.config import now_ms - # Build the user message content user_content = f"{task}\n\n## Context\n{context}" if context else task + meta = { + "workspace": workspace, + "subagent": True, + "parent_chat_id": parent_chat_id, + "params": { + "tool_approval_mode": "full", # auto-approve all tools + }, + } + if delegation_id: + meta["delegation_id"] = delegation_id - # Create a real chat, marked as a sub-agent chat = await Chat.create( user_id=user_id, title=f"Sub-agent: {task[:60]}", - meta={ - "workspace": workspace, - "subagent": True, - "parent_chat_id": parent_chat_id, - "params": { - "tool_approval_mode": "full", # auto-approve all tools - }, - }, + meta=meta, created_at=now_ms(), ) - # Create user message user_msg = await ChatMessage.create( chat_id=chat.id, role="user", @@ -1582,7 +2004,6 @@ async def _run_subagent_chat( created_at=now_ms(), ) - # Create empty assistant message assistant_msg = await ChatMessage.create( chat_id=chat.id, role="assistant", @@ -1594,24 +2015,69 @@ async def _run_subagent_chat( ) await Chat.update_current_message(chat.id, assistant_msg.id, now_ms()) + await export_chat_to_file(chat.id) + return chat, assistant_msg + + +async def _run_existing_subagent_chat( + assistant_msg_id: str, + chat_id: str, + workspace: str, + connection: dict, + model: str, + user_id: str, + config: dict, +) -> str: + """Run the agent loop for an already-created sub-agent chat.""" + from cptr.models import ChatMessage + from cptr.utils.chat_task import run_chat_task - # Run the SAME agent loop as a normal chat โ€” no special code await run_chat_task( - message_id=assistant_msg.id, - chat_id=chat.id, + message_id=assistant_msg_id, + chat_id=chat_id, user_id=user_id, connection=connection, workspace=workspace, model=model, ) - # Read back the completed message - result_msg = await ChatMessage.get_by_id(assistant_msg.id) + result_msg = await ChatMessage.get_by_id(assistant_msg_id) output = result_msg.content if result_msg else "Sub-agent produced no output." return _truncate_output(output, config["max_output"]) +async def _run_subagent_chat( + task: str, + context: str, + workspace: str, + connection: dict, + model: str, + user_id: str, + parent_chat_id: str, + config: dict, +) -> str: + """Create a real chat and run the agent loop on it.""" + chat, assistant_msg = await _create_subagent_chat( + task=task, + context=context, + workspace=workspace, + model=model, + user_id=user_id, + parent_chat_id=parent_chat_id, + config=config, + ) + return await _run_existing_subagent_chat( + assistant_msg_id=assistant_msg.id, + chat_id=chat.id, + workspace=workspace, + connection=connection, + model=model, + user_id=user_id, + config=config, + ) + + SUBAGENT_TOOLS: dict[str, dict] = { "delegate_task": {"fn": delegate_task, "auto": True}, } @@ -1700,9 +2166,7 @@ async def _load_tool_servers() -> dict: if not command: continue - client = await stdio_manager.get_client( - server_id, command, args, env, cwd - ) + client = await stdio_manager.get_client(server_id, command, args, env, cwd) for spec in await client.list_tool_specs(): prefixed = f"{server_id}_{spec['name']}" tools[prefixed] = { @@ -1832,7 +2296,7 @@ async def _execute_external_tool(name: str, args: dict) -> str: def _unwrap_optional(hint): """If hint is Optional[X] (Union[X, None]), return X.""" - args = getattr(hint, '__args__', None) + args = getattr(hint, "__args__", None) if args and type(None) in args: real = [a for a in args if a is not type(None)] if len(real) == 1: @@ -1840,6 +2304,22 @@ def _unwrap_optional(hint): return hint +def _schema_for_type(hint) -> dict: + origin = get_origin(hint) + if origin is list: + args = get_args(hint) + item_hint = _unwrap_optional(args[0]) if args else str + return { + "type": "array", + "items": _schema_for_type(item_hint), + } + if origin is dict or hint is dict: + return {"type": "object", "additionalProperties": True} + if origin is Literal: + return {"type": "string", "enum": list(get_args(hint))} + return {"type": _TYPE_MAP.get(hint, "string")} # type: ignore[arg-type] + + def _parse_param_descriptions(docstring: str) -> dict[str, str]: """Extract :param name: description lines from docstring.""" descs: dict[str, str] = {} @@ -1870,8 +2350,7 @@ def _fn_to_schema(name: str, fn) -> dict: continue raw_hint = hints.get(pname) hint = _unwrap_optional(raw_hint) if raw_hint else raw_hint - ptype = _TYPE_MAP.get(hint, "string") # type: ignore[arg-type] - prop: dict = {"type": ptype} + prop: dict = _schema_for_type(hint) if pname in param_descs: prop["description"] = param_descs[pname] if param.default is not inspect.Parameter.empty: @@ -1891,6 +2370,24 @@ def _fn_to_schema(name: str, fn) -> dict: } +def _without_background_param(schema: dict) -> dict: + """Return a delegate_task schema copy without the background option.""" + if schema.get("name") != "delegate_task": + return schema + schema = { + **schema, + "parameters": { + **schema.get("parameters", {}), + "properties": dict(schema.get("parameters", {}).get("properties", {})), + }, + } + schema["parameters"]["properties"].pop("background", None) + required = schema["parameters"].get("required") + if isinstance(required, list): + schema["parameters"]["required"] = [r for r in required if r != "background"] + return schema + + async def get_tool_list() -> list[dict]: """Return tool schemas for the LLM. @@ -1898,17 +2395,37 @@ async def get_tool_list() -> list[dict]: and external tool server tools when configured. """ tools = dict(TOOLS) + background_subagents_enabled = False try: from cptr.models import Config + memory_enabled = (await Config.get("memory.enabled")) not in (False, "false", "0") + if not memory_enabled: + tools.pop("update_memory", None) if (await Config.get("browser.enabled")) in (True, "true", "1"): tools.update(BROWSER_TOOLS) if (await Config.get("subagents.enabled")) in (True, "true", "1"): tools.update(SUBAGENT_TOOLS) + background_subagents_enabled = (await Config.get("subagents.background_enabled")) in ( + True, + "true", + "1", + ) + images_generation_enabled = (await Config.get("images.generation_enabled")) in ( + True, + "true", + "1", + ) + images_edit_enabled = (await Config.get("images.edit_enabled")) in (True, "true", "1") + if not (images_generation_enabled or images_edit_enabled): + tools.pop("image_generate", None) except Exception: + tools.pop("image_generate", None) pass schemas = [_fn_to_schema(name, t["fn"]) for name, t in tools.items()] + if not background_subagents_enabled: + schemas = [_without_background_param(s) for s in schemas] # Add external tool server schemas try: diff --git a/pyproject.toml b/pyproject.toml index b27b2ab..aabcd9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cptr" -version = "0.5.6" +version = "0.6.0" description = "Your computer, from anywhere. Code, manage, and control your machine from the web." license = {file = "LICENSE"} readme = "README.md"