From d55bf475ec7f14bf659b39ee39d39c29a4333183 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Wed, 3 Jun 2026 14:42:06 +0000 Subject: [PATCH 1/2] feat: add command wrapper hook and upgrade to Conductor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Upgrade agent-client-protocol from 0.11 to 0.13 - Add agent-client-protocol-conductor dependency - Replace manual stdin/stdout forwarding with ConductorImpl, which handles acp-over-mcp and future protocol extensions - Add CommandWrapper trait and ResolvedCommand struct for sandboxing (e.g., bwrap) — callers can intercept the resolved program/args/envs before the process is spawned - Make Acpr generic over the wrapper type (Acpr) with NullWrapper as the zero-cost default --- Cargo.toml | 3 +- src/lib.rs | 285 ++++++++++++++++++++----------------- tests/integration_tests.rs | 70 ++++++++- 3 files changed, 221 insertions(+), 137 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6a486b3..415d825 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,8 @@ clap = { version = "4", features = ["derive"] } dirs = "6.0.0" flate2 = "1.1.9" reqwest = { version = "0.13.2", features = ["json"] } -agent-client-protocol = "0.11" +agent-client-protocol = "0.13" +agent-client-protocol-conductor = "0.13" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" tar = "0.4.45" diff --git a/src/lib.rs b/src/lib.rs index e952320..6bf32d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,24 +4,60 @@ pub mod registry; pub use cli::*; pub use registry::*; -use agent_client_protocol::{Agent as AcpAgent, ByteStreams, Client, ConnectTo}; +use agent_client_protocol::{ + AcpAgent, Agent as AcpAgentRole, Client, ConnectTo, Stdio, + schema::{EnvVariable, McpServer, McpServerStdio}, +}; +use agent_client_protocol_conductor::{AgentOnly, ConductorImpl}; +use std::ffi::OsString; use std::path::PathBuf; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::process::Command; -use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; +use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, info}; +/// A resolved command ready to be spawned, before any wrapping is applied. +#[derive(Debug, Clone)] +pub struct ResolvedCommand { + pub program: OsString, + pub args: Vec, + pub envs: Vec<(OsString, OsString)>, +} + +/// Trait for transforming a resolved command before it is spawned. +/// +/// Implement this to wrap agent processes in a sandbox or modify their +/// execution environment. A blanket impl is provided for all +/// `Fn(ResolvedCommand) -> ResolvedCommand`. +pub trait CommandWrapper { + fn wrap(&self, cmd: ResolvedCommand) -> ResolvedCommand; +} + +/// The default (no-op) command wrapper. +pub struct NullWrapper; + +impl CommandWrapper for NullWrapper { + fn wrap(&self, cmd: ResolvedCommand) -> ResolvedCommand { + cmd + } +} + +impl ResolvedCommand> CommandWrapper for F { + fn wrap(&self, cmd: ResolvedCommand) -> ResolvedCommand { + self(cmd) + } +} + /// Simple function to run an agent by name pub async fn run(agent_name: &str) -> Result<(), Box> { Acpr::new(agent_name).run().await } /// Main library interface for acpr -pub struct Acpr { +pub struct Acpr { pub agent_name: String, cache_dir: Option, registry_file: Option, force: Option, + command_wrapper: W, } impl Acpr { @@ -32,9 +68,12 @@ impl Acpr { cache_dir: None, registry_file: None, force: None, + command_wrapper: NullWrapper, } } +} +impl Acpr { /// Set a custom cache directory pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self { self.cache_dir = Some(cache_dir); @@ -53,22 +92,24 @@ impl Acpr { self } - /// Run the agent with default stdio - pub async fn run(&self) -> Result<(), Box> { - self.run_with_streams(tokio::io::stdin(), tokio::io::stdout()) - .await + /// Set a command wrapper that transforms the resolved command before it is spawned. + /// + /// The wrapper receives the fully-resolved command (after registry lookup, binary download, + /// and args applied) and returns a modified command. This is useful for wrapping the agent + /// process in a sandbox (e.g., bubblewrap/bwrap). + pub fn with_command_wrapper(self, wrapper: F) -> Acpr { + Acpr { + agent_name: self.agent_name, + cache_dir: self.cache_dir, + registry_file: self.registry_file, + force: self.force, + command_wrapper: wrapper, + } } - /// Run the agent with custom stdio streams - pub async fn run_with_streams( - &self, - stdin: R, - stdout: W, - ) -> Result<(), Box> - where - R: AsyncRead + Unpin + Send + 'static, - W: AsyncWrite + Unpin + Send + 'static, - { + /// Resolve the registry and build the command for this agent, + /// applying the command wrapper. + async fn resolve_agent(&self) -> Result> { let cache_dir = self.cache_dir.clone().unwrap_or_else(|| { dirs::cache_dir() .expect("No cache directory found") @@ -84,110 +125,99 @@ impl Acpr { .find(|a| a.id == self.agent_name) .ok_or("Agent not found")?; - debug!("Running agent: {}", agent.id); - - let mut cmd = self.build_command(agent, &cache_dir).await?; - cmd.stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::inherit()); - debug!("Running cmd: {cmd:?}"); - - let mut child = cmd.spawn()?; - let child_stdin = child.stdin.take().unwrap(); - let child_stdout = child.stdout.take().unwrap(); - - let stdin_future = async { - let mut stdin = stdin; - let mut child_stdin = child_stdin; - let mut buf = [0u8; 8192]; - loop { - match stdin.read(&mut buf).await { - Ok(0) => { - debug!("stdin: EOF received"); - break; - } - Ok(n) => { - debug!("stdin: received {} bytes", n); - if let Err(e) = child_stdin.write_all(&buf[..n]).await { - tracing::debug!("stdin write error: {}", e); - break; - } - if let Err(e) = child_stdin.flush().await { - tracing::debug!("stdin flush error: {}", e); - break; - } - debug!("stdin: forwarded {} bytes to child", n); - } - Err(e) => { - tracing::debug!("stdin read error: {}", e); - break; - } - } - } - Ok::<(), std::io::Error>(()) - }; - - let stdout_future = async { - let mut child_stdout = child_stdout; - let mut stdout = stdout; - let mut buf = [0u8; 8192]; - loop { - match child_stdout.read(&mut buf).await { - Ok(0) => { - debug!("stdout: EOF from child"); - break; - } - Ok(n) => { - debug!("stdout: received {} bytes from child", n); - if let Err(e) = stdout.write_all(&buf[..n]).await { - tracing::debug!("stdout write error: {}", e); - break; - } - if let Err(e) = stdout.flush().await { - tracing::debug!("stdout flush error: {}", e); - break; - } - debug!("stdout: forwarded {} bytes", n); - } - Err(e) => { - tracing::debug!("stdout read error: {}", e); - break; - } - } - } - Ok::<(), std::io::Error>(()) - }; + debug!("Resolving agent: {}", agent.id); - tokio::try_join!( - async { child.wait().await.map_err(|e| e.into()) }, - stdin_future, - stdout_future - )?; + let resolved = self.resolve_command(agent, &cache_dir).await?; + let resolved = self.command_wrapper.wrap(resolved); + + let args: Vec = resolved + .args + .iter() + .map(|a| a.to_string_lossy().into_owned()) + .collect(); + let envs: Vec = resolved + .envs + .iter() + .map(|(k, v)| { + EnvVariable::new( + k.to_string_lossy().into_owned(), + v.to_string_lossy().into_owned(), + ) + }) + .collect(); + + let command = resolved.program.to_string_lossy().into_owned(); + let mcp_server = McpServerStdio::new(&self.agent_name, &command) + .args(args) + .env(envs); + let acp_agent = AcpAgent::new(McpServer::Stdio(mcp_server)); + + Ok(acp_agent) + } + + /// Run the agent with default stdio, using the Conductor for protocol handling. + pub async fn run(&self) -> Result<(), Box> { + let acp_agent = self.resolve_agent().await?; + let conductor = ConductorImpl::new_agent(&self.agent_name, AgentOnly(acp_agent)); + conductor + .run(Stdio::new()) + .await + .map_err(|e| e.to_string())?; + Ok(()) + } + /// Run the agent with custom stdio streams, using the Conductor for protocol handling. + pub async fn run_with_streams( + &self, + stdin: R, + stdout: Wr, + ) -> Result<(), Box> + where + R: AsyncRead + Unpin + Send + 'static, + Wr: AsyncWrite + Unpin + Send + 'static, + { + let acp_agent = self.resolve_agent().await?; + let conductor = ConductorImpl::new_agent(&self.agent_name, AgentOnly(acp_agent)); + + let byte_streams = agent_client_protocol::ByteStreams::new( + tokio_util::compat::TokioAsyncWriteCompatExt::compat_write(stdout), + tokio_util::compat::TokioAsyncReadCompatExt::compat(stdin), + ); + conductor + .run(byte_streams) + .await + .map_err(|e| e.to_string())?; Ok(()) } - async fn build_command( + async fn resolve_command( &self, agent: &Agent, cache_dir: &PathBuf, - ) -> Result> { + ) -> Result> { if let Some(npx) = &agent.distribution.npx { info!("Executing npx package: {}", npx.package); - let mut cmd = Command::new("npx"); - cmd.arg("-y"); let package_arg = if npx.package.contains('@') && npx.package.matches('@').count() > 1 { npx.package.clone() } else { format!("{}@latest", npx.package) }; - cmd.arg(package_arg).args(&npx.args); - Ok(cmd) + let mut args: Vec = vec!["-y".into(), package_arg.into()]; + args.extend(npx.args.iter().map(OsString::from)); + Ok(ResolvedCommand { + program: "npx".into(), + args, + envs: vec![], + }) } else if let Some(uvx) = &agent.distribution.uvx { info!("Executing uvx package: {}", uvx.package); - let mut cmd = Command::new("uvx"); - cmd.arg(&uvx.package).args(&uvx.args); - Ok(cmd) + let mut args: Vec = vec![uvx.package.clone().into()]; + args.extend(uvx.args.iter().map(OsString::from)); + Ok(ResolvedCommand { + program: "uvx".into(), + args, + envs: vec![], + }) } else if !agent.distribution.binary.is_empty() { let platform = get_platform(); debug!("Platform detected: {}", platform); @@ -195,9 +225,12 @@ impl Acpr { let binary_path = download_binary(agent, binary_dist, cache_dir, self.force.as_ref()).await?; info!("Executing binary: {:?}", binary_path); - let mut cmd = Command::new(&binary_path); - cmd.args(&binary_dist.args); - Ok(cmd) + let args: Vec = binary_dist.args.iter().map(OsString::from).collect(); + Ok(ResolvedCommand { + program: binary_path.into_os_string(), + args, + envs: vec![], + }) } else { Err(format!("No binary available for platform: {}", platform).into()) } @@ -207,35 +240,19 @@ impl Acpr { } } -/// Implement ConnectTo so Acpr can act as an ACP agent -impl ConnectTo for Acpr { +/// Implement ConnectTo so Acpr can act as an ACP agent via Conductor +impl ConnectTo for Acpr { async fn connect_to( self, - client: impl ConnectTo, + client: impl ConnectTo, ) -> Result<(), agent_client_protocol::Error> { - debug!("ConnectTo: creating duplex streams"); - let (client_stdin, agent_stdin) = tokio::io::duplex(8192); - let (agent_stdout, client_stdout) = tokio::io::duplex(8192); - - debug!("ConnectTo: creating ByteStreams for sacp"); - let byte_streams = ByteStreams::new(client_stdin.compat_write(), client_stdout.compat()); - - debug!("ConnectTo: starting agent and client tasks"); - tokio::try_join!( - async { - debug!("ConnectTo: starting agent process"); - self.run_with_streams(agent_stdin, agent_stdout) - .await - .map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string())) - }, - async { - debug!("ConnectTo: starting sacp client connection"); - ConnectTo::::connect_to(byte_streams, client).await - } - )?; + let acp_agent = self + .resolve_agent() + .await + .map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?; - debug!("ConnectTo: both tasks completed successfully"); - Ok(()) + let conductor = ConductorImpl::new_agent(&self.agent_name, AgentOnly(acp_agent)); + conductor.run(client).await } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index fa95bc4..6b3ea4b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,4 +1,7 @@ -use acpr::{Agent, BinaryDist, ForceOption, download_binary, fetch_registry, get_platform}; +use acpr::{ + Acpr, Agent, BinaryDist, ForceOption, ResolvedCommand, download_binary, fetch_registry, + get_platform, +}; use std::collections::HashMap; use std::path::PathBuf; use tempfile::TempDir; @@ -232,7 +235,6 @@ async fn test_uvx_agent_basic() { return; } - use acpr::Acpr; use std::time::Duration; use tokio::io; @@ -261,3 +263,67 @@ async fn test_uvx_agent_basic() { } } } + +#[tokio::test] +async fn test_command_wrapper_transforms_command() { + use std::time::Duration; + + // Skip if environment variable is set (for CI) + if std::env::var("ACPR_SKIP_AGENT").is_ok() { + return; + } + + // Use a command wrapper that replaces the agent command with `echo` + let agent = Acpr::new("claude-acp").with_command_wrapper(|cmd: ResolvedCommand| { + let mut args = vec![format!("wrapped: {:?} {:?}", cmd.program, cmd.args).into()]; + args.extend(cmd.envs.iter().map(|(k, v)| { + format!("{}={}", k.to_string_lossy(), v.to_string_lossy()).into() + })); + ResolvedCommand { + program: "echo".into(), + args, + envs: vec![], + } + }); + + let (_, stdout_write) = tokio::io::duplex(4096); + + let result = tokio::time::timeout(Duration::from_secs(30), async { + agent + .run_with_streams(tokio::io::empty(), stdout_write) + .await + }) + .await; + + // The echo command should succeed - we just care that the wrapper was applied + match result { + Ok(Ok(())) => {} // wrapper worked, echo exited cleanly + Ok(Err(e)) => panic!("command wrapper test failed: {}", e), + Err(_) => panic!("command wrapper test timed out"), + } +} + +#[tokio::test] +async fn test_command_wrapper_sandbox_example() { + // This test just verifies the API compiles and the wrapper is stored correctly. + // It doesn't actually run bwrap since that requires specific system setup. + let agent = Acpr::new("claude-acp").with_command_wrapper(|cmd: ResolvedCommand| { + let mut args: Vec = vec![ + "--ro-bind".into(), + "/usr".into(), + "/usr".into(), + "--unshare-net".into(), + "--".into(), + ]; + args.push(cmd.program); + args.extend(cmd.args); + ResolvedCommand { + program: "bwrap".into(), + args, + envs: cmd.envs, + } + }); + + // Just verify we can construct it without panicking + assert_eq!(agent.agent_name, "claude-acp"); +} From 8aedf299a0ab2936bd175bf640a475f6c4b406b1 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Wed, 3 Jun 2026 14:48:27 +0000 Subject: [PATCH 2/2] chore: pacify the merciless fmt --- tests/integration_tests.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 6b3ea4b..19b6fe6 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -276,9 +276,11 @@ async fn test_command_wrapper_transforms_command() { // Use a command wrapper that replaces the agent command with `echo` let agent = Acpr::new("claude-acp").with_command_wrapper(|cmd: ResolvedCommand| { let mut args = vec![format!("wrapped: {:?} {:?}", cmd.program, cmd.args).into()]; - args.extend(cmd.envs.iter().map(|(k, v)| { - format!("{}={}", k.to_string_lossy(), v.to_string_lossy()).into() - })); + args.extend( + cmd.envs + .iter() + .map(|(k, v)| format!("{}={}", k.to_string_lossy(), v.to_string_lossy()).into()), + ); ResolvedCommand { program: "echo".into(), args,