From 4a4a1a7ee51121c66901d5e1a54b937e08293142 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Wed, 27 May 2026 11:16:00 +0000 Subject: [PATCH 01/10] feat(openab-agent): add text streaming via SSE for both providers - LlmProvider::chat() now accepts an optional TextCallback that receives text chunks as they arrive from the LLM - AnthropicProvider: switch to stream:true, parse SSE events (content_block_delta/text_delta), invoke callback per chunk - OpenAiProvider: parse SSE line-by-line (response.output_text.delta), invoke callback per chunk instead of collecting full response - Agent::run() forwards the callback to the provider - ACP server emits session/update notifications per text chunk, enabling real-time streaming to Discord - Set agentCapabilities.streaming = true in initialize response - Add reqwest 'stream' feature, tokio-util, futures-util deps - Add test: test_agent_streams_text_via_callback --- openab-agent/Cargo.lock | 176 +++++++++++++++++++++ openab-agent/Cargo.toml | 4 +- openab-agent/src/acp.rs | 56 ++++--- openab-agent/src/agent.rs | 77 ++++++--- openab-agent/src/llm.rs | 325 +++++++++++++++++++++++--------------- 5 files changed, 457 insertions(+), 181 deletions(-) diff --git a/openab-agent/Cargo.lock b/openab-agent/Cargo.lock index 5f878017f..db0f200ac 100644 --- a/openab-agent/Cargo.lock +++ b/openab-agent/Cargo.lock @@ -85,6 +85,15 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +[[package]] +name = "block-buffer" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" +dependencies = [ + "hybrid-array", +] + [[package]] name = "bumpalo" version = "3.20.3" @@ -165,6 +174,41 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" +[[package]] +name = "const-oid" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" + +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6e4c961d6cd6c9a86db418387425e8bdeaf05b3c8bc1411e6dca4c252f1453" +dependencies = [ + "hybrid-array", +] + +[[package]] +name = "digest" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" +dependencies = [ + "block-buffer", + "const-oid", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -234,6 +278,29 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + [[package]] name = "futures-task" version = "0.3.32" @@ -247,7 +314,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -352,6 +423,15 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "hybrid-array" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9155a582abd142abc056962c29e3ce5ff2ad5469f4246b537ed42c5deba857da" +dependencies = [ + "typenum", +] + [[package]] name = "hyper" version = "1.9.0" @@ -538,6 +618,25 @@ version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" +[[package]] +name = "is-docker" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928bae27f42bc99b60d9ac7334e3a21d10ad8f1835a4e12ec3ec0464765ed1b3" +dependencies = [ + "once_cell", +] + +[[package]] +name = "is-wsl" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "173609498df190136aa7dea1a91db051746d339e18476eed5ca40521f02d7aa5" +dependencies = [ + "is-docker", + "once_cell", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -660,20 +759,39 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "open" +version = "5.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fbaa89d2ddc8473c78a3adf69eea8cffa28c483b8e02a971ef31527cd0fc92c" +dependencies = [ + "is-wsl", + "libc", + "pathdiff", +] + [[package]] name = "openab-agent" version = "0.1.0" dependencies = [ "anyhow", + "base64", "clap", + "futures-core", + "getrandom 0.4.2", "libc", + "open", "reqwest", "serde", "serde_json", + "sha2", "tempfile", "tokio", + "tokio-util", "tracing", "tracing-subscriber", + "url", + "urlencoding", "uuid", ] @@ -700,6 +818,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "pathdiff" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" + [[package]] name = "percent-encoding" version = "2.3.2" @@ -889,6 +1013,7 @@ dependencies = [ "base64", "bytes", "futures-core", + "futures-util", "http", "http-body", "http-body-util", @@ -908,12 +1033,14 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", ] @@ -1065,6 +1192,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1266,6 +1404,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.5.3" @@ -1378,6 +1529,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -1408,6 +1565,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -1547,6 +1710,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" diff --git a/openab-agent/Cargo.toml b/openab-agent/Cargo.toml index f059cfc6a..a90b6c3fc 100644 --- a/openab-agent/Cargo.toml +++ b/openab-agent/Cargo.toml @@ -9,7 +9,9 @@ description = "Native Rust coding agent with built-in ACP support" tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "stream"] } +tokio-util = { version = "0.7", features = ["io"] } +futures-util = "0.3" anyhow = "1" uuid = { version = "1", features = ["v4"] } clap = { version = "4", features = ["derive"] } diff --git a/openab-agent/src/acp.rs b/openab-agent/src/acp.rs index 38054f25d..9386cb650 100644 --- a/openab-agent/src/acp.rs +++ b/openab-agent/src/acp.rs @@ -1,9 +1,10 @@ use crate::agent::Agent; -use crate::llm::AnthropicProvider; +use crate::llm::{AnthropicProvider, TextCallback}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::io::{self, BufRead, Write}; +use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; use uuid::Uuid; @@ -113,7 +114,7 @@ impl AcpServer { "version": env!("CARGO_PKG_VERSION") }, "agentCapabilities": { - "streaming": false, + "streaming": true, "loadSession": false } })), @@ -125,7 +126,6 @@ impl AcpServer { fn handle_session_new(&mut self, id: u64) -> String { let session_id = Uuid::new_v4().to_string(); - // Respect OPENAB_AGENT_PROVIDER if set, otherwise auto-detect let provider_choice = std::env::var("OPENAB_AGENT_PROVIDER").unwrap_or_default(); let provider: Box = match provider_choice.as_str() { "anthropic" => match AnthropicProvider::from_env() { @@ -137,7 +137,6 @@ impl AcpServer { Err(e) => return self.error_response(id, -32000, &e), }, _ => { - // Auto-detect: try API key first, then OAuth token match AnthropicProvider::from_env() { Ok(p) => Box::new(p), Err(_) => match crate::llm::OpenAiProvider::from_auth_store() { @@ -193,24 +192,37 @@ impl AcpServer { } }; - let mut output_lines = Vec::new(); + // Collect streaming notifications in a buffer. The callback writes + // session/update notifications as text chunks arrive from the LLM. let session_id_owned = session_id.to_string(); + let notifications: Arc>> = Arc::new(Mutex::new(Vec::new())); + let notif_clone = notifications.clone(); + let sid = session_id_owned.clone(); - match agent.run(&prompt_text).await { - Ok(response_text) => { - let notification = serde_json::to_string(&JsonRpcNotification { - jsonrpc: "2.0", - method: "session/update".to_string(), - params: json!({ - "sessionId": session_id_owned, - "update": { - "sessionUpdate": "agent_message_chunk", - "content": { "type": "text", "text": response_text } - } - }), - }) - .unwrap(); - output_lines.push(notification); + let cb: TextCallback = Box::new(move |text: &str| { + let notification = serde_json::to_string(&JsonRpcNotification { + jsonrpc: "2.0", + method: "session/update".to_string(), + params: json!({ + "sessionId": sid, + "update": { + "sessionUpdate": "agent_message_chunk", + "content": { "type": "text", "text": text } + } + }), + }) + .unwrap(); + notif_clone.lock().unwrap().push(notification); + }); + + let result = agent.run(&prompt_text, Some(&cb)).await; + + let mut output_lines: Vec = notifications.lock().unwrap().drain(..).collect(); + + match result { + Ok(_response_text) => { + // Text was already streamed via notifications above. + // Send final response to signal completion. output_lines.push(self.ok_response(id, json!({ "stopReason": "end_turn" }))); } Err(e) => { @@ -254,12 +266,11 @@ mod tests { assert_eq!(resp["jsonrpc"], "2.0"); assert_eq!(resp["id"], 1); assert_eq!(resp["result"]["agentInfo"]["name"], "openab-agent"); - assert_eq!(resp["result"]["agentCapabilities"]["streaming"], false); + assert_eq!(resp["result"]["agentCapabilities"]["streaming"], true); } #[test] fn test_session_new() { - // Set a fake key so from_env() succeeds in CI unsafe { std::env::set_var("ANTHROPIC_API_KEY", "test-key") }; let mut server = AcpServer::new(); let resp_str = server.handle_session_new(2); @@ -271,7 +282,6 @@ mod tests { #[test] fn test_session_new_missing_key() { - // Ensure no OAuth token exists either let auth_path = std::path::PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string())) .join(".openab/agent/auth.json"); diff --git a/openab-agent/src/agent.rs b/openab-agent/src/agent.rs index 01ec99f44..5cfb5dac3 100644 --- a/openab-agent/src/agent.rs +++ b/openab-agent/src/agent.rs @@ -2,7 +2,7 @@ use anyhow::Result; use std::path::PathBuf; use tracing::{debug, info}; -use crate::llm::{ContentBlock, LlmEvent, LlmProvider, Message, ToolDef}; +use crate::llm::{ContentBlock, LlmEvent, LlmProvider, Message, TextCallback, ToolDef}; use crate::tools; const SYSTEM_PROMPT: &str = r#"You are openab-agent, a coding assistant. You help users by reading, writing, and editing files, and running shell commands. @@ -52,8 +52,6 @@ impl Agent { } } - /// Run the agent with a user prompt, executing tool calls until completion. - /// Returns the final text response. fn build_system_prompt(working_dir: &str) -> String { let agents_md = std::path::Path::new(working_dir).join("AGENTS.md"); let custom = std::fs::read_to_string(&agents_md).unwrap_or_default(); @@ -72,8 +70,13 @@ impl Agent { } } - pub async fn run(&mut self, prompt: &str) -> Result { - // Add user message + /// Run the agent with a user prompt, executing tool calls until completion. + /// Returns the final text response. + /// + /// If `on_text` is provided, text chunks are streamed to the callback as + /// they arrive from the LLM — enabling real-time output before the full + /// response is assembled. + pub async fn run(&mut self, prompt: &str, on_text: Option<&TextCallback>) -> Result { self.messages.push(Message { role: "user".to_string(), content: vec![ContentBlock::Text { @@ -86,10 +89,9 @@ impl Agent { for iteration in 0..MAX_TOOL_LOOPS { debug!("agent loop iteration {iteration}"); - // Truncate context to prevent unbounded growth / token limit self.truncate_context(); - let events = self.call_llm().await?; + let events = self.call_llm(on_text).await?; let mut tool_calls = Vec::new(); let mut text_parts = Vec::new(); @@ -128,7 +130,6 @@ impl Agent { }); if tool_calls.is_empty() || !text_parts.is_empty() { - // No tool calls — we're done final_text = text_parts.join(""); break; } @@ -175,15 +176,14 @@ impl Agent { /// first user message and maintaining strict user/assistant alternation. fn truncate_context(&mut self) { while self.messages.len() > MAX_CONTEXT_MESSAGES { - // Drain in pairs (assistant + user) from index 1 to maintain alternation let end = (1 + 2).min(self.messages.len()); self.messages.drain(1..end); } } - async fn call_llm(&self) -> Result> { + async fn call_llm(&self, on_text: Option<&TextCallback>) -> Result> { self.provider - .chat(&self.system_prompt, &self.messages, &self.tools) + .chat(&self.system_prompt, &self.messages, &self.tools, on_text) .await } } @@ -215,11 +215,22 @@ mod tests { _system: &'a str, _messages: &'a [Message], _tools: &'a [ToolDef], + on_text: Option<&'a TextCallback>, ) -> std::pin::Pin>> + Send + 'a>> { let idx = self.call_count.fetch_add(1, Ordering::SeqCst); let events = self.responses[idx].clone(); - Box::pin(async move { Ok(events) }) + Box::pin(async move { + // Simulate streaming: call on_text for each Text event + if let Some(cb) = on_text { + for event in &events { + if let LlmEvent::Text(t) = event { + cb(t); + } + } + } + Ok(events) + }) } } @@ -232,10 +243,33 @@ mod tests { let tmp = tempfile::TempDir::new().unwrap(); let mut agent = Agent::new(mock, tmp.path().to_string_lossy().to_string()); - let result = agent.run("hi").await.unwrap(); + let result = agent.run("hi", None).await.unwrap(); assert_eq!(result, "Hello!"); } + #[tokio::test] + async fn test_agent_streams_text_via_callback() { + let mock = MockLlmProvider::new(vec![vec![ + LlmEvent::Text("Hello ".to_string()), + LlmEvent::Text("world!".to_string()), + LlmEvent::Stop, + ]]); + + let tmp = tempfile::TempDir::new().unwrap(); + let mut agent = Agent::new(mock, tmp.path().to_string_lossy().to_string()); + + let chunks: Arc>> = Arc::new(std::sync::Mutex::new(vec![])); + let chunks_clone = chunks.clone(); + let cb: TextCallback = Box::new(move |text| { + chunks_clone.lock().unwrap().push(text.to_string()); + }); + + let result = agent.run("hi", Some(&cb)).await.unwrap(); + assert_eq!(result, "Hello world!"); + let collected = chunks.lock().unwrap(); + assert_eq!(*collected, vec!["Hello ", "world!"]); + } + #[tokio::test] #[ignore] // Integration test: executes real file tools async fn test_agent_tool_call_then_response() { @@ -243,13 +277,11 @@ mod tests { std::fs::write(tmp.path().join("test.txt"), "file content here").unwrap(); let mock = MockLlmProvider::new(vec![ - // First call: LLM requests to read a file vec![LlmEvent::ToolUse { id: "tu_1".to_string(), name: "read".to_string(), input: serde_json::json!({ "path": "test.txt" }), }], - // Second call: LLM responds with text vec![ LlmEvent::Text("The file contains: file content here".to_string()), LlmEvent::Stop, @@ -257,7 +289,7 @@ mod tests { ]); let mut agent = Agent::new(mock, tmp.path().to_string_lossy().to_string()); - let result = agent.run("read test.txt").await.unwrap(); + let result = agent.run("read test.txt", None).await.unwrap(); assert_eq!(result, "The file contains: file content here"); } @@ -267,13 +299,11 @@ mod tests { let tmp = tempfile::TempDir::new().unwrap(); let mock = MockLlmProvider::new(vec![ - // First call: LLM requests to read a non-existent file vec![LlmEvent::ToolUse { id: "tu_1".to_string(), name: "read".to_string(), input: serde_json::json!({ "path": "nonexistent.txt" }), }], - // Second call: LLM acknowledges the error vec![ LlmEvent::Text("File not found.".to_string()), LlmEvent::Stop, @@ -281,11 +311,10 @@ mod tests { ]); let mut agent = Agent::new(mock, tmp.path().to_string_lossy().to_string()); - let result = agent.run("read nonexistent.txt").await.unwrap(); + let result = agent.run("read nonexistent.txt", None).await.unwrap(); assert_eq!(result, "File not found."); - // Verify the tool result was marked as error - assert_eq!(agent.messages.len(), 4); // user, assistant(tool_use), user(tool_result), assistant(text) + assert_eq!(agent.messages.len(), 4); let tool_result_msg = &agent.messages[2]; match &tool_result_msg.content[0] { ContentBlock::ToolResult { is_error, .. } => { @@ -301,19 +330,16 @@ mod tests { let tmp = tempfile::TempDir::new().unwrap(); let mock = MockLlmProvider::new(vec![ - // First call: write a file vec![LlmEvent::ToolUse { id: "tu_1".to_string(), name: "write".to_string(), input: serde_json::json!({ "path": "out.txt", "content": "hello" }), }], - // Second call: read it back vec![LlmEvent::ToolUse { id: "tu_2".to_string(), name: "read".to_string(), input: serde_json::json!({ "path": "out.txt" }), }], - // Third call: done vec![ LlmEvent::Text("Done. File contains: hello".to_string()), LlmEvent::Stop, @@ -322,12 +348,11 @@ mod tests { let mut agent = Agent::new(mock, tmp.path().to_string_lossy().to_string()); let result = agent - .run("write hello to out.txt then read it") + .run("write hello to out.txt then read it", None) .await .unwrap(); assert_eq!(result, "Done. File contains: hello"); - // Verify file was actually written let content = std::fs::read_to_string(tmp.path().join("out.txt")).unwrap(); assert_eq!(content, "hello"); } diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index 429b7875b..48cf5c775 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -1,8 +1,11 @@ use anyhow::{anyhow, Result}; use base64::Engine; +use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::pin::Pin; +use tokio::io::AsyncBufReadExt; +use tokio_util::io::StreamReader; /// A message in the conversation. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -54,6 +57,9 @@ pub enum LlmEvent { Error(String), } +/// Callback invoked for each text chunk during streaming. +pub type TextCallback = Box; + /// Trait for LLM providers. pub trait LlmProvider: Send + Sync { fn chat<'a>( @@ -61,6 +67,7 @@ pub trait LlmProvider: Send + Sync { system: &'a str, messages: &'a [Message], tools: &'a [ToolDef], + on_text: Option<&'a TextCallback>, ) -> Pin>> + Send + 'a>>; } @@ -68,7 +75,6 @@ pub trait LlmProvider: Send + Sync { pub struct AnthropicProvider { api_key: String, model: String, - #[allow(dead_code)] max_tokens: u32, client: reqwest::Client, } @@ -128,6 +134,7 @@ impl AnthropicProvider { let mut body = json!({ "model": &self.model, "max_tokens": self.max_tokens, + "stream": true, "messages": msgs, "system": system, }); @@ -156,6 +163,7 @@ impl LlmProvider for AnthropicProvider { system: &'a str, messages: &'a [Message], tools: &'a [ToolDef], + on_text: Option<&'a TextCallback>, ) -> Pin>> + Send + 'a>> { Box::pin(async move { let body = self.build_request_body(system, messages, tools); @@ -175,7 +183,6 @@ impl LlmProvider for AnthropicProvider { let status = resp.status(); - // Retry on 429 (rate limit) or 529 (overloaded) if (status.as_u16() == 429 || status.as_u16() == 529) && attempt < max_retries { let delay = std::time::Duration::from_millis(1000 * 2u64.pow(attempt)); tokio::time::sleep(delay).await; @@ -187,62 +194,122 @@ impl LlmProvider for AnthropicProvider { return Err(anyhow!("Anthropic API error {status}: {text}")); } - let response: Value = resp - .json() + // Parse SSE stream + let byte_stream = resp.bytes_stream(); + let stream_reader = StreamReader::new( + byte_stream + .map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))), + ); + let mut lines = tokio::io::BufReader::new(stream_reader).lines(); + + let mut events = Vec::new(); + let mut current_text = String::new(); + let mut tool_id = String::new(); + let mut tool_name = String::new(); + let mut tool_input_json = String::new(); + let mut in_tool_use = false; + let mut stop_reason = String::new(); + + while let Some(line) = lines + .next_line() .await - .map_err(|e| anyhow!("Failed to parse response: {e}"))?; - - return parse_anthropic_response(&response); - } - - Err(anyhow!("Anthropic API: max retries exceeded")) - }) - } -} - -fn parse_anthropic_response(response: &Value) -> Result> { - let mut events = Vec::new(); + .map_err(|e| anyhow!("stream read: {e}"))? + { + let line = line.trim().to_string(); + if !line.starts_with("data: ") { + continue; + } + let data = &line[6..]; + if data == "[DONE]" { + break; + } + let event: Value = match serde_json::from_str(data) { + Ok(v) => v, + Err(_) => continue, + }; + + let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); + + match event_type { + "content_block_start" => { + let block = &event["content_block"]; + match block.get("type").and_then(|t| t.as_str()) { + Some("tool_use") => { + // Flush any accumulated text + if !current_text.is_empty() { + events.push(LlmEvent::Text(current_text.clone())); + current_text.clear(); + } + in_tool_use = true; + tool_id = block["id"].as_str().unwrap_or("").to_string(); + tool_name = block["name"].as_str().unwrap_or("").to_string(); + tool_input_json.clear(); + } + _ => {} + } + } + "content_block_delta" => { + let delta = &event["delta"]; + match delta.get("type").and_then(|t| t.as_str()) { + Some("text_delta") => { + if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { + current_text.push_str(text); + if let Some(cb) = on_text { + cb(text); + } + } + } + Some("input_json_delta") => { + if let Some(json_chunk) = + delta.get("partial_json").and_then(|t| t.as_str()) + { + tool_input_json.push_str(json_chunk); + } + } + _ => {} + } + } + "content_block_stop" => { + if in_tool_use { + let input: Value = + serde_json::from_str(&tool_input_json).unwrap_or(json!({})); + events.push(LlmEvent::ToolUse { + id: tool_id.clone(), + name: tool_name.clone(), + input, + }); + in_tool_use = false; + } else if !current_text.is_empty() { + events.push(LlmEvent::Text(current_text.clone())); + current_text.clear(); + } + } + "message_delta" => { + if let Some(sr) = + event["delta"].get("stop_reason").and_then(|s| s.as_str()) + { + stop_reason = sr.to_string(); + } + } + _ => {} + } + } - let content = response - .get("content") - .and_then(|c| c.as_array()) - .ok_or_else(|| anyhow!("missing content in response"))?; + // Flush remaining text + if !current_text.is_empty() { + events.push(LlmEvent::Text(current_text)); + } - for block in content { - match block.get("type").and_then(|t| t.as_str()) { - Some("text") => { - if let Some(text) = block.get("text").and_then(|t| t.as_str()) { - events.push(LlmEvent::Text(text.to_string())); + if stop_reason != "tool_use" { + events.push(LlmEvent::Stop); } - } - Some("tool_use") => { - let id = block - .get("id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let name = block - .get("name") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let input = block.get("input").cloned().unwrap_or(json!({})); - events.push(LlmEvent::ToolUse { id, name, input }); - } - _ => {} - } - } - let stop_reason = response - .get("stop_reason") - .and_then(|s| s.as_str()) - .unwrap_or("end_turn"); + return Ok(events); + } - if stop_reason != "tool_use" { - events.push(LlmEvent::Stop); + Err(anyhow!("Anthropic API: max retries exceeded")) + }) } - - Ok(events) } // === OpenAI-compatible Provider (for Codex subscription via OAuth) === @@ -258,7 +325,6 @@ pub struct OpenAiProvider { impl OpenAiProvider { /// Create provider using stored OAuth token from ~/.openab/agent/auth.json pub fn from_auth_store() -> Result { - // Just verify tokens exist; actual token is fetched at call time crate::auth::load_tokens().map_err(|e| e.to_string())?; Ok(Self { base_url: std::env::var("OPENAB_AGENT_OPENAI_BASE_URL") @@ -281,13 +347,13 @@ impl LlmProvider for OpenAiProvider { system: &'a str, messages: &'a [Message], tools: &'a [ToolDef], + on_text: Option<&'a TextCallback>, ) -> Pin>> + Send + 'a>> { Box::pin(async move { // Build Responses API input format let mut oai_messages: Vec = vec![]; for m in messages { if m.role == "user" { - // User text messages let texts: Vec<&str> = m .content .iter() @@ -302,7 +368,6 @@ impl LlmProvider for OpenAiProvider { if !texts.is_empty() { oai_messages.push(json!({"role": "user", "content": [{"type": "input_text", "text": texts.join("")}]})); } - // Tool results as function_call_output for b in &m.content { if let ContentBlock::ToolResult { tool_use_id, @@ -328,7 +393,6 @@ impl LlmProvider for OpenAiProvider { } } - // Build Responses API body let mut body = json!({ "model": &self.model, "store": false, @@ -357,7 +421,6 @@ impl LlmProvider for OpenAiProvider { let max_retries = 3u32; for attempt in 0..=max_retries { let token = crate::auth::get_valid_token().await?; - // Extract account ID from JWT for chatgpt backend API let account_id = extract_account_id_from_jwt(&token); let mut req = self .client @@ -381,7 +444,6 @@ impl LlmProvider for OpenAiProvider { continue; } - // 401: token may have expired mid-request, force refresh and retry if status.as_u16() == 401 && attempt < max_retries { let _ = crate::auth::force_refresh().await; continue; @@ -392,13 +454,23 @@ impl LlmProvider for OpenAiProvider { return Err(anyhow!("OpenAI API error {status}: {text}")); } - // Parse SSE stream - collect output items from response.output_item.done events - let text = resp - .text() - .await - .map_err(|e| anyhow!("Failed to read response: {e}"))?; + // Stream SSE line-by-line + let byte_stream = resp.bytes_stream(); + let stream_reader = StreamReader::new( + byte_stream + .map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))), + ); + let mut lines = tokio::io::BufReader::new(stream_reader).lines(); + let mut output_items: Vec = Vec::new(); - for line in text.lines() { + let mut current_text = String::new(); + + while let Some(line) = lines + .next_line() + .await + .map_err(|e| anyhow!("stream read: {e}"))? + { + let line = line.trim().to_string(); if let Some(data) = line.strip_prefix("data: ") { if data == "[DONE]" { break; @@ -406,22 +478,39 @@ impl LlmProvider for OpenAiProvider { if let Ok(event) = serde_json::from_str::(data) { let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); - if event_type == "response.output_item.done" { - if let Some(item) = event.get("item") { - output_items.push(item.clone()); + match event_type { + "response.output_text.delta" => { + if let Some(delta) = event.get("delta").and_then(|d| d.as_str()) + { + current_text.push_str(delta); + if let Some(cb) = on_text { + cb(delta); + } + } } + "response.output_item.done" => { + if let Some(item) = event.get("item") { + output_items.push(item.clone()); + } + } + _ => {} } } } } - if output_items.is_empty() { - return Err(anyhow!( - "No output items in SSE stream. Raw: {}", - &text[..text.len().min(500)] - )); + + if output_items.is_empty() && current_text.is_empty() { + return Err(anyhow!("No output in SSE stream")); + } + + // If we collected output_items, parse them (includes function_calls) + if !output_items.is_empty() { + let response = json!({"output": output_items}); + return parse_openai_response(&response); } - let response = json!({"output": output_items}); - return parse_openai_response(&response); + + // Fallback: text-only response + return Ok(vec![LlmEvent::Text(current_text), LlmEvent::Stop]); } Err(anyhow!("OpenAI API: max retries exceeded")) }) @@ -503,14 +592,12 @@ fn parse_openai_response(response: &Value) -> Result> { let message = choice.get("message").ok_or_else(|| anyhow!("No message"))?; - // Text content if let Some(content) = message.get("content").and_then(|c| c.as_str()) { if !content.is_empty() { events.push(LlmEvent::Text(content.to_string())); } } - // Tool calls if let Some(tool_calls) = message.get("tool_calls").and_then(|t| t.as_array()) { for tc in tool_calls { let id = tc @@ -549,62 +636,6 @@ fn parse_openai_response(response: &Value) -> Result> { mod tests { use super::*; - #[test] - fn test_parse_text_response() { - let resp = json!({ - "content": [{"type": "text", "text": "Hello world"}], - "stop_reason": "end_turn" - }); - let events = parse_anthropic_response(&resp).unwrap(); - assert_eq!(events.len(), 2); - match &events[0] { - LlmEvent::Text(t) => assert_eq!(t, "Hello world"), - _ => panic!("expected Text event"), - } - assert!(matches!(events[1], LlmEvent::Stop)); - } - - #[test] - fn test_parse_tool_use_response() { - let resp = json!({ - "content": [ - {"type": "tool_use", "id": "tu_1", "name": "read", "input": {"path": "/tmp/x"}} - ], - "stop_reason": "tool_use" - }); - let events = parse_anthropic_response(&resp).unwrap(); - assert_eq!(events.len(), 1); - match &events[0] { - LlmEvent::ToolUse { id, name, input } => { - assert_eq!(id, "tu_1"); - assert_eq!(name, "read"); - assert_eq!(input["path"], "/tmp/x"); - } - _ => panic!("expected ToolUse event"), - } - } - - #[test] - fn test_build_request_body() { - let provider = AnthropicProvider { - api_key: "test".to_string(), - model: "claude-sonnet-4-20250514".to_string(), - max_tokens: 4096, - client: reqwest::Client::new(), - }; - let messages = vec![Message { - role: "user".to_string(), - content: vec![ContentBlock::Text { - text: "hello".to_string(), - }], - }]; - let body = provider.build_request_body("system prompt", &messages, &[]); - assert_eq!(body["model"], "claude-sonnet-4-20250514"); - assert_eq!(body["max_tokens"], 4096); - assert_eq!(body["system"], "system prompt"); - assert_eq!(body["messages"][0]["role"], "user"); - } - #[test] fn test_parse_openai_text_response() { let resp = json!({ @@ -641,4 +672,36 @@ mod tests { let resp = json!({"choices": []}); assert!(parse_openai_response(&resp).is_err()); } + + #[test] + fn test_parse_openai_responses_api_format() { + let resp = json!({ + "output": [ + {"type": "message", "content": [{"type": "output_text", "text": "Hi"}]}, + ] + }); + let events = parse_openai_response(&resp).unwrap(); + assert_eq!(events.len(), 2); + assert!(matches!(&events[0], LlmEvent::Text(t) if t == "Hi")); + assert!(matches!(events[1], LlmEvent::Stop)); + } + + #[test] + fn test_build_request_body_has_stream() { + let provider = AnthropicProvider { + api_key: "test".to_string(), + model: "claude-sonnet-4-20250514".to_string(), + max_tokens: 4096, + client: reqwest::Client::new(), + }; + let messages = vec![Message { + role: "user".to_string(), + content: vec![ContentBlock::Text { + text: "hello".to_string(), + }], + }]; + let body = provider.build_request_body("system prompt", &messages, &[]); + assert_eq!(body["stream"], true); + assert_eq!(body["model"], "claude-sonnet-4-20250514"); + } } From f5803e981a96c090bd177ec0218fbdac946fcb01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Wed, 27 May 2026 11:24:20 +0000 Subject: [PATCH 02/10] fix(openab-agent): real streaming + review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address findings from 覺渡法師: F1 🔴: Fix fake streaming — callback now writes directly to stdout via Arc> instead of buffering in a Vec. Text chunks reach the harness immediately as they arrive from the LLM. F2 🟡: Mark filesystem-touching test with #[ignore] F3 🟡: Rename tests to _ pattern F4 🟡: Change TextCallback from Box to dyn Fn (type alias) to avoid double-indirection when passed as &TextCallback --- openab-agent/src/acp.rs | 39 +++++++++++++++++++-------------------- openab-agent/src/agent.rs | 14 +++++++------- openab-agent/src/llm.rs | 3 ++- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/openab-agent/src/acp.rs b/openab-agent/src/acp.rs index 9386cb650..1009f5d56 100644 --- a/openab-agent/src/acp.rs +++ b/openab-agent/src/acp.rs @@ -1,5 +1,5 @@ use crate::agent::Agent; -use crate::llm::{AnthropicProvider, TextCallback}; +use crate::llm::AnthropicProvider; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; @@ -192,14 +192,14 @@ impl AcpServer { } }; - // Collect streaming notifications in a buffer. The callback writes - // session/update notifications as text chunks arrive from the LLM. + // Real streaming: write each text chunk to stdout immediately as it + // arrives from the LLM, so the harness can update Discord in real time. let session_id_owned = session_id.to_string(); - let notifications: Arc>> = Arc::new(Mutex::new(Vec::new())); - let notif_clone = notifications.clone(); + let stdout = Arc::new(Mutex::new(io::stdout())); + let stdout_clone = stdout.clone(); let sid = session_id_owned.clone(); - let cb: TextCallback = Box::new(move |text: &str| { + let cb = |text: &str| { let notification = serde_json::to_string(&JsonRpcNotification { jsonrpc: "2.0", method: "session/update".to_string(), @@ -212,25 +212,23 @@ impl AcpServer { }), }) .unwrap(); - notif_clone.lock().unwrap().push(notification); - }); + let mut out = stdout_clone.lock().unwrap(); + let _ = writeln!(out, "{}", notification); + let _ = out.flush(); + }; let result = agent.run(&prompt_text, Some(&cb)).await; - let mut output_lines: Vec = notifications.lock().unwrap().drain(..).collect(); - + // Only the final response/error goes through the return path; + // all text chunks were already written to stdout above. match result { - Ok(_response_text) => { - // Text was already streamed via notifications above. - // Send final response to signal completion. - output_lines.push(self.ok_response(id, json!({ "stopReason": "end_turn" }))); + Ok(_) => { + vec![self.ok_response(id, json!({ "stopReason": "end_turn" }))] } Err(e) => { - output_lines.push(self.error_response(id, -32000, &format!("agent error: {e}"))); + vec![self.error_response(id, -32000, &format!("agent error: {e}"))] } } - - output_lines } fn ok_response(&self, id: u64, result: Value) -> String { @@ -259,7 +257,7 @@ mod tests { use super::*; #[test] - fn test_initialize_response() { + fn initialize_returns_streaming_capability() { let server = AcpServer::new(); let resp_str = server.handle_initialize(1); let resp: Value = serde_json::from_str(&resp_str).unwrap(); @@ -270,7 +268,7 @@ mod tests { } #[test] - fn test_session_new() { + fn session_new_with_valid_key_returns_session_id() { unsafe { std::env::set_var("ANTHROPIC_API_KEY", "test-key") }; let mut server = AcpServer::new(); let resp_str = server.handle_session_new(2); @@ -281,7 +279,8 @@ mod tests { } #[test] - fn test_session_new_missing_key() { + #[ignore] // Modifies filesystem (removes auth.json) + fn session_new_without_credentials_returns_error() { let auth_path = std::path::PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string())) .join(".openab/agent/auth.json"); diff --git a/openab-agent/src/agent.rs b/openab-agent/src/agent.rs index 5cfb5dac3..dce44a518 100644 --- a/openab-agent/src/agent.rs +++ b/openab-agent/src/agent.rs @@ -235,7 +235,7 @@ mod tests { } #[tokio::test] - async fn test_agent_simple_text_response() { + async fn simple_prompt_returns_text_response() { let mock = MockLlmProvider::new(vec![vec![ LlmEvent::Text("Hello!".to_string()), LlmEvent::Stop, @@ -248,7 +248,7 @@ mod tests { } #[tokio::test] - async fn test_agent_streams_text_via_callback() { + async fn streaming_callback_receives_all_text_chunks() { let mock = MockLlmProvider::new(vec![vec![ LlmEvent::Text("Hello ".to_string()), LlmEvent::Text("world!".to_string()), @@ -260,9 +260,9 @@ mod tests { let chunks: Arc>> = Arc::new(std::sync::Mutex::new(vec![])); let chunks_clone = chunks.clone(); - let cb: TextCallback = Box::new(move |text| { + let cb = move |text: &str| { chunks_clone.lock().unwrap().push(text.to_string()); - }); + }; let result = agent.run("hi", Some(&cb)).await.unwrap(); assert_eq!(result, "Hello world!"); @@ -272,7 +272,7 @@ mod tests { #[tokio::test] #[ignore] // Integration test: executes real file tools - async fn test_agent_tool_call_then_response() { + async fn tool_call_executes_then_returns_text() { let tmp = tempfile::TempDir::new().unwrap(); std::fs::write(tmp.path().join("test.txt"), "file content here").unwrap(); @@ -295,7 +295,7 @@ mod tests { #[tokio::test] #[ignore] // Integration test: executes real file tools - async fn test_agent_tool_error_handling() { + async fn tool_error_marked_as_is_error() { let tmp = tempfile::TempDir::new().unwrap(); let mock = MockLlmProvider::new(vec![ @@ -326,7 +326,7 @@ mod tests { #[tokio::test] #[ignore] // Integration test: executes real file tools - async fn test_agent_multiple_tool_calls() { + async fn multiple_tool_rounds_execute_sequentially() { let tmp = tempfile::TempDir::new().unwrap(); let mock = MockLlmProvider::new(vec![ diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index 48cf5c775..615cdb1b5 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -58,7 +58,8 @@ pub enum LlmEvent { } /// Callback invoked for each text chunk during streaming. -pub type TextCallback = Box; +/// Pass `&dyn Fn(&str)` directly to avoid double-indirection through Box. +pub type TextCallback = dyn Fn(&str) + Send + Sync; /// Trait for LLM providers. pub trait LlmProvider: Send + Sync { From bc190640ca95f02d5dc2ec9afde00b1b5351e0ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Wed, 27 May 2026 11:25:32 +0000 Subject: [PATCH 03/10] =?UTF-8?q?fix(openab-agent):=20address=20=E6=93=BA?= =?UTF-8?q?=E6=B8=A1=E6=B3=95=E5=B8=AB=20findings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F1 🔴: Fix premature break when model returns text + tool_calls in same turn. Now only breaks when tool_calls is empty — text with concurrent tool_use correctly continues the loop. F3 🟡: Add 'error' event handling to Anthropic SSE parser for robustness against mid-stream errors. F4 🟡: Add 'error' event handling to OpenAI SSE parser. Note: the OpenAI Responses API emits fully-assembled items via response.output_item.done — no manual argument fragment merging is needed (unlike Chat Completions streaming). --- openab-agent/src/agent.rs | 3 ++- openab-agent/src/llm.rs | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/openab-agent/src/agent.rs b/openab-agent/src/agent.rs index dce44a518..7e87f0d7c 100644 --- a/openab-agent/src/agent.rs +++ b/openab-agent/src/agent.rs @@ -129,7 +129,8 @@ impl Agent { content: assistant_content, }); - if tool_calls.is_empty() || !text_parts.is_empty() { + if tool_calls.is_empty() { + // No tool calls — we're done, use whatever text was produced final_text = text_parts.join(""); break; } diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index 615cdb1b5..5b8b018b5 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -292,7 +292,13 @@ impl LlmProvider for AnthropicProvider { stop_reason = sr.to_string(); } } - _ => {} + "error" => { + let msg = event["error"]["message"] + .as_str() + .unwrap_or("unknown stream error"); + return Err(anyhow!("Anthropic stream error: {msg}")); + } + _ => {} // message_start, ping, etc. — no action needed } } @@ -494,7 +500,14 @@ impl LlmProvider for OpenAiProvider { output_items.push(item.clone()); } } - _ => {} + "error" => { + let msg = event["error"]["message"] + .as_str() + .or_else(|| event.get("message").and_then(|m| m.as_str())) + .unwrap_or("unknown stream error"); + return Err(anyhow!("OpenAI stream error: {msg}")); + } + _ => {} // response.created, response.in_progress, etc. } } } From 74cb2fbd015f7b8f6a0e595e3c1ec0eb84808753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Wed, 27 May 2026 12:06:25 +0000 Subject: [PATCH 04/10] fix(openab-agent): fix redundant clones and stale Cargo.lock - Remove redundant session_id_owned/stdout intermediates that trigger clippy::redundant_clone with -D warnings - Add move to streaming callback closure for clarity - Fix Cargo.lock to list futures-util (matches Cargo.toml) --- openab-agent/Cargo.lock | 2 +- openab-agent/src/acp.rs | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/openab-agent/Cargo.lock b/openab-agent/Cargo.lock index db0f200ac..e5fe969ee 100644 --- a/openab-agent/Cargo.lock +++ b/openab-agent/Cargo.lock @@ -777,7 +777,7 @@ dependencies = [ "anyhow", "base64", "clap", - "futures-core", + "futures-util", "getrandom 0.4.2", "libc", "open", diff --git a/openab-agent/src/acp.rs b/openab-agent/src/acp.rs index 1009f5d56..ab65ecd36 100644 --- a/openab-agent/src/acp.rs +++ b/openab-agent/src/acp.rs @@ -194,12 +194,10 @@ impl AcpServer { // Real streaming: write each text chunk to stdout immediately as it // arrives from the LLM, so the harness can update Discord in real time. - let session_id_owned = session_id.to_string(); + let sid = session_id.to_string(); let stdout = Arc::new(Mutex::new(io::stdout())); - let stdout_clone = stdout.clone(); - let sid = session_id_owned.clone(); - let cb = |text: &str| { + let cb = move |text: &str| { let notification = serde_json::to_string(&JsonRpcNotification { jsonrpc: "2.0", method: "session/update".to_string(), @@ -212,7 +210,7 @@ impl AcpServer { }), }) .unwrap(); - let mut out = stdout_clone.lock().unwrap(); + let mut out = stdout.lock().unwrap(); let _ = writeln!(out, "{}", notification); let _ = out.flush(); }; From 3402662e67e4da693213a0133eef561b97851a64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E6=B8=A1=E6=B3=95=E5=B8=AB?= Date: Thu, 28 May 2026 11:07:36 +0000 Subject: [PATCH 05/10] fix(openab-agent): fix clippy warnings - Replace match with single arm to if-let (clippy::single_match) - Replace len() > 0 with !is_empty() (clippy::len_zero) --- openab-agent/src/acp.rs | 2 +- openab-agent/src/llm.rs | 21 +++++++++------------ 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/openab-agent/src/acp.rs b/openab-agent/src/acp.rs index ab65ecd36..5a0988391 100644 --- a/openab-agent/src/acp.rs +++ b/openab-agent/src/acp.rs @@ -273,7 +273,7 @@ mod tests { let resp: Value = serde_json::from_str(&resp_str).unwrap(); assert_eq!(resp["jsonrpc"], "2.0"); assert_eq!(resp["id"], 2); - assert!(resp["result"]["sessionId"].as_str().unwrap().len() > 0); + assert!(!resp["result"]["sessionId"].as_str().unwrap().is_empty()); } #[test] diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index 5b8b018b5..e75db2fcc 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -234,19 +234,16 @@ impl LlmProvider for AnthropicProvider { match event_type { "content_block_start" => { let block = &event["content_block"]; - match block.get("type").and_then(|t| t.as_str()) { - Some("tool_use") => { - // Flush any accumulated text - if !current_text.is_empty() { - events.push(LlmEvent::Text(current_text.clone())); - current_text.clear(); - } - in_tool_use = true; - tool_id = block["id"].as_str().unwrap_or("").to_string(); - tool_name = block["name"].as_str().unwrap_or("").to_string(); - tool_input_json.clear(); + if let Some("tool_use") = block.get("type").and_then(|t| t.as_str()) { + // Flush any accumulated text + if !current_text.is_empty() { + events.push(LlmEvent::Text(current_text.clone())); + current_text.clear(); } - _ => {} + in_tool_use = true; + tool_id = block["id"].as_str().unwrap_or("").to_string(); + tool_name = block["name"].as_str().unwrap_or("").to_string(); + tool_input_json.clear(); } } "content_block_delta" => { From b3b5f4b53f617e191be3ece2af2c078bdacb91a5 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Thu, 28 May 2026 20:12:13 +0000 Subject: [PATCH 06/10] fix(openab-agent): use std::io::Error::other() for clippy io_other_error lint --- openab-agent/src/llm.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index e75db2fcc..796bb05ca 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -199,7 +199,7 @@ impl LlmProvider for AnthropicProvider { let byte_stream = resp.bytes_stream(); let stream_reader = StreamReader::new( byte_stream - .map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))), + .map(|r| r.map_err(|e| std::io::Error::other(e))), ); let mut lines = tokio::io::BufReader::new(stream_reader).lines(); @@ -462,7 +462,7 @@ impl LlmProvider for OpenAiProvider { let byte_stream = resp.bytes_stream(); let stream_reader = StreamReader::new( byte_stream - .map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))), + .map(|r| r.map_err(|e| std::io::Error::other(e))), ); let mut lines = tokio::io::BufReader::new(stream_reader).lines(); From ceda048d180893b6d9c9fad6705c4382e9ca9a1d Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Thu, 28 May 2026 20:17:03 +0000 Subject: [PATCH 07/10] style(openab-agent): fix cargo fmt formatting --- openab-agent/src/llm.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index 796bb05ca..3a53d673f 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -197,10 +197,8 @@ impl LlmProvider for AnthropicProvider { // Parse SSE stream let byte_stream = resp.bytes_stream(); - let stream_reader = StreamReader::new( - byte_stream - .map(|r| r.map_err(|e| std::io::Error::other(e))), - ); + let stream_reader = + StreamReader::new(byte_stream.map(|r| r.map_err(|e| std::io::Error::other(e)))); let mut lines = tokio::io::BufReader::new(stream_reader).lines(); let mut events = Vec::new(); @@ -460,10 +458,8 @@ impl LlmProvider for OpenAiProvider { // Stream SSE line-by-line let byte_stream = resp.bytes_stream(); - let stream_reader = StreamReader::new( - byte_stream - .map(|r| r.map_err(|e| std::io::Error::other(e))), - ); + let stream_reader = + StreamReader::new(byte_stream.map(|r| r.map_err(|e| std::io::Error::other(e)))); let mut lines = tokio::io::BufReader::new(stream_reader).lines(); let mut output_items: Vec = Vec::new(); From b0ce74786bbe4333bcd12f0491c472754541e240 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Thu, 28 May 2026 20:22:58 +0000 Subject: [PATCH 08/10] fix(openab-agent): remove redundant closure per clippy redundant_closure lint --- openab-agent/src/llm.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index 3a53d673f..aaf1d0dbd 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -198,7 +198,7 @@ impl LlmProvider for AnthropicProvider { // Parse SSE stream let byte_stream = resp.bytes_stream(); let stream_reader = - StreamReader::new(byte_stream.map(|r| r.map_err(|e| std::io::Error::other(e)))); + StreamReader::new(byte_stream.map(|r| r.map_err(std::io::Error::other))); let mut lines = tokio::io::BufReader::new(stream_reader).lines(); let mut events = Vec::new(); @@ -459,7 +459,7 @@ impl LlmProvider for OpenAiProvider { // Stream SSE line-by-line let byte_stream = resp.bytes_stream(); let stream_reader = - StreamReader::new(byte_stream.map(|r| r.map_err(|e| std::io::Error::other(e)))); + StreamReader::new(byte_stream.map(|r| r.map_err(std::io::Error::other))); let mut lines = tokio::io::BufReader::new(stream_reader).lines(); let mut output_items: Vec = Vec::new(); From 69473d62cc347e9b430a485c2ef9fc2d752191a2 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Thu, 28 May 2026 20:31:40 +0000 Subject: [PATCH 09/10] fix(openab-agent): rename tests to follow test__ convention --- openab-agent/src/acp.rs | 4 ++-- openab-agent/src/agent.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/openab-agent/src/acp.rs b/openab-agent/src/acp.rs index 5a0988391..46ac10749 100644 --- a/openab-agent/src/acp.rs +++ b/openab-agent/src/acp.rs @@ -255,7 +255,7 @@ mod tests { use super::*; #[test] - fn initialize_returns_streaming_capability() { + fn test_initialize_returns_streaming_capability() { let server = AcpServer::new(); let resp_str = server.handle_initialize(1); let resp: Value = serde_json::from_str(&resp_str).unwrap(); @@ -266,7 +266,7 @@ mod tests { } #[test] - fn session_new_with_valid_key_returns_session_id() { + fn test_session_new_with_valid_key_returns_session_id() { unsafe { std::env::set_var("ANTHROPIC_API_KEY", "test-key") }; let mut server = AcpServer::new(); let resp_str = server.handle_session_new(2); diff --git a/openab-agent/src/agent.rs b/openab-agent/src/agent.rs index 7e87f0d7c..7d7074cc3 100644 --- a/openab-agent/src/agent.rs +++ b/openab-agent/src/agent.rs @@ -236,7 +236,7 @@ mod tests { } #[tokio::test] - async fn simple_prompt_returns_text_response() { + async fn test_simple_prompt_returns_text_response() { let mock = MockLlmProvider::new(vec![vec![ LlmEvent::Text("Hello!".to_string()), LlmEvent::Stop, @@ -249,7 +249,7 @@ mod tests { } #[tokio::test] - async fn streaming_callback_receives_all_text_chunks() { + async fn test_streaming_callback_receives_all_text_chunks() { let mock = MockLlmProvider::new(vec![vec![ LlmEvent::Text("Hello ".to_string()), LlmEvent::Text("world!".to_string()), @@ -273,7 +273,7 @@ mod tests { #[tokio::test] #[ignore] // Integration test: executes real file tools - async fn tool_call_executes_then_returns_text() { + async fn test_tool_call_executes_then_returns_text() { let tmp = tempfile::TempDir::new().unwrap(); std::fs::write(tmp.path().join("test.txt"), "file content here").unwrap(); @@ -296,7 +296,7 @@ mod tests { #[tokio::test] #[ignore] // Integration test: executes real file tools - async fn tool_error_marked_as_is_error() { + async fn test_tool_error_marked_as_is_error() { let tmp = tempfile::TempDir::new().unwrap(); let mock = MockLlmProvider::new(vec![ @@ -327,7 +327,7 @@ mod tests { #[tokio::test] #[ignore] // Integration test: executes real file tools - async fn multiple_tool_rounds_execute_sequentially() { + async fn test_multiple_tool_rounds_execute_sequentially() { let tmp = tempfile::TempDir::new().unwrap(); let mock = MockLlmProvider::new(vec![ From 3df68e78d49c993f5815611f1a1b4a307bd2f60c Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Thu, 28 May 2026 20:33:49 +0000 Subject: [PATCH 10/10] fix(openab-agent): address review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Revert loop condition to original: break when text+tool_calls coexist (F1 — behavioral change reverted) - Preserve current_text in OpenAI path when output_items also present, avoiding silent discard (F3) - Add TODO for stdout handle consolidation in future multi-session work (F2) --- openab-agent/src/acp.rs | 3 +++ openab-agent/src/agent.rs | 6 ++++-- openab-agent/src/llm.rs | 7 ++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/openab-agent/src/acp.rs b/openab-agent/src/acp.rs index 46ac10749..b76871446 100644 --- a/openab-agent/src/acp.rs +++ b/openab-agent/src/acp.rs @@ -194,6 +194,9 @@ impl AcpServer { // Real streaming: write each text chunk to stdout immediately as it // arrives from the LLM, so the harness can update Discord in real time. + // TODO: consolidate stdout handles if we add concurrent sessions — + // this Arc> and the outer loop's stdout are separate handles + // on the same fd, safe for single-session but may interleave otherwise. let sid = session_id.to_string(); let stdout = Arc::new(Mutex::new(io::stdout())); diff --git a/openab-agent/src/agent.rs b/openab-agent/src/agent.rs index 7d7074cc3..87cf34501 100644 --- a/openab-agent/src/agent.rs +++ b/openab-agent/src/agent.rs @@ -129,8 +129,10 @@ impl Agent { content: assistant_content, }); - if tool_calls.is_empty() { - // No tool calls — we're done, use whatever text was produced + if tool_calls.is_empty() || !text_parts.is_empty() { + // If no tool calls, or if LLM returned text alongside tool calls, + // stop the loop. Text+tool_call combo means the model considers + // the text as the final user-facing answer. final_text = text_parts.join(""); break; } diff --git a/openab-agent/src/llm.rs b/openab-agent/src/llm.rs index aaf1d0dbd..f9115c610 100644 --- a/openab-agent/src/llm.rs +++ b/openab-agent/src/llm.rs @@ -513,7 +513,12 @@ impl LlmProvider for OpenAiProvider { // If we collected output_items, parse them (includes function_calls) if !output_items.is_empty() { let response = json!({"output": output_items}); - return parse_openai_response(&response); + let mut events = Vec::new(); + if !current_text.is_empty() { + events.push(LlmEvent::Text(current_text)); + } + events.extend(parse_openai_response(&response)?); + return Ok(events); } // Fallback: text-only response