Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ clap = { version = "4", features = ["derive"] }
dirs = "6.0.0"
flate2 = "1.1.9"
reqwest = { version = "0.13.2", features = ["json"] }
agent-client-protocol = "0.11"
agent-client-protocol = "0.13"
agent-client-protocol-conductor = "0.13"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
tar = "0.4.45"
Expand Down
285 changes: 151 additions & 134 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,60 @@ pub mod registry;
pub use cli::*;
pub use registry::*;

use agent_client_protocol::{Agent as AcpAgent, ByteStreams, Client, ConnectTo};
use agent_client_protocol::{
AcpAgent, Agent as AcpAgentRole, Client, ConnectTo, Stdio,
schema::{EnvVariable, McpServer, McpServerStdio},
};
use agent_client_protocol_conductor::{AgentOnly, ConductorImpl};
use std::ffi::OsString;
use std::path::PathBuf;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::process::Command;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info};

/// A resolved command ready to be spawned, before any wrapping is applied.
#[derive(Debug, Clone)]
pub struct ResolvedCommand {
pub program: OsString,
pub args: Vec<OsString>,
pub envs: Vec<(OsString, OsString)>,
}

/// Trait for transforming a resolved command before it is spawned.
///
/// Implement this to wrap agent processes in a sandbox or modify their
/// execution environment. A blanket impl is provided for all
/// `Fn(ResolvedCommand) -> ResolvedCommand`.
pub trait CommandWrapper {
fn wrap(&self, cmd: ResolvedCommand) -> ResolvedCommand;
}

/// The default (no-op) command wrapper.
pub struct NullWrapper;

impl CommandWrapper for NullWrapper {
fn wrap(&self, cmd: ResolvedCommand) -> ResolvedCommand {
cmd
}
}

impl<F: Fn(ResolvedCommand) -> ResolvedCommand> CommandWrapper for F {
fn wrap(&self, cmd: ResolvedCommand) -> ResolvedCommand {
self(cmd)
}
}

/// Simple function to run an agent by name
pub async fn run(agent_name: &str) -> Result<(), Box<dyn std::error::Error>> {
Acpr::new(agent_name).run().await
}

/// Main library interface for acpr
pub struct Acpr {
pub struct Acpr<W: CommandWrapper = NullWrapper> {
pub agent_name: String,
cache_dir: Option<PathBuf>,
registry_file: Option<PathBuf>,
force: Option<ForceOption>,
command_wrapper: W,
}

impl Acpr {
Expand All @@ -32,9 +68,12 @@ impl Acpr {
cache_dir: None,
registry_file: None,
force: None,
command_wrapper: NullWrapper,
}
}
}

impl<W: CommandWrapper> Acpr<W> {
/// Set a custom cache directory
pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self {
self.cache_dir = Some(cache_dir);
Expand All @@ -53,22 +92,24 @@ impl Acpr {
self
}

/// Run the agent with default stdio
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
self.run_with_streams(tokio::io::stdin(), tokio::io::stdout())
.await
/// Set a command wrapper that transforms the resolved command before it is spawned.
///
/// The wrapper receives the fully-resolved command (after registry lookup, binary download,
/// and args applied) and returns a modified command. This is useful for wrapping the agent
/// process in a sandbox (e.g., bubblewrap/bwrap).
pub fn with_command_wrapper<F: CommandWrapper>(self, wrapper: F) -> Acpr<F> {
Acpr {
agent_name: self.agent_name,
cache_dir: self.cache_dir,
registry_file: self.registry_file,
force: self.force,
command_wrapper: wrapper,
}
}

/// Run the agent with custom stdio streams
pub async fn run_with_streams<R, W>(
&self,
stdin: R,
stdout: W,
) -> Result<(), Box<dyn std::error::Error>>
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
/// Resolve the registry and build the command for this agent,
/// applying the command wrapper.
async fn resolve_agent(&self) -> Result<AcpAgent, Box<dyn std::error::Error>> {
let cache_dir = self.cache_dir.clone().unwrap_or_else(|| {
dirs::cache_dir()
.expect("No cache directory found")
Expand All @@ -84,120 +125,112 @@ impl Acpr {
.find(|a| a.id == self.agent_name)
.ok_or("Agent not found")?;

debug!("Running agent: {}", agent.id);

let mut cmd = self.build_command(agent, &cache_dir).await?;
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit());
debug!("Running cmd: {cmd:?}");

let mut child = cmd.spawn()?;
let child_stdin = child.stdin.take().unwrap();
let child_stdout = child.stdout.take().unwrap();

let stdin_future = async {
let mut stdin = stdin;
let mut child_stdin = child_stdin;
let mut buf = [0u8; 8192];
loop {
match stdin.read(&mut buf).await {
Ok(0) => {
debug!("stdin: EOF received");
break;
}
Ok(n) => {
debug!("stdin: received {} bytes", n);
if let Err(e) = child_stdin.write_all(&buf[..n]).await {
tracing::debug!("stdin write error: {}", e);
break;
}
if let Err(e) = child_stdin.flush().await {
tracing::debug!("stdin flush error: {}", e);
break;
}
debug!("stdin: forwarded {} bytes to child", n);
}
Err(e) => {
tracing::debug!("stdin read error: {}", e);
break;
}
}
}
Ok::<(), std::io::Error>(())
};

let stdout_future = async {
let mut child_stdout = child_stdout;
let mut stdout = stdout;
let mut buf = [0u8; 8192];
loop {
match child_stdout.read(&mut buf).await {
Ok(0) => {
debug!("stdout: EOF from child");
break;
}
Ok(n) => {
debug!("stdout: received {} bytes from child", n);
if let Err(e) = stdout.write_all(&buf[..n]).await {
tracing::debug!("stdout write error: {}", e);
break;
}
if let Err(e) = stdout.flush().await {
tracing::debug!("stdout flush error: {}", e);
break;
}
debug!("stdout: forwarded {} bytes", n);
}
Err(e) => {
tracing::debug!("stdout read error: {}", e);
break;
}
}
}
Ok::<(), std::io::Error>(())
};
debug!("Resolving agent: {}", agent.id);

tokio::try_join!(
async { child.wait().await.map_err(|e| e.into()) },
stdin_future,
stdout_future
)?;
let resolved = self.resolve_command(agent, &cache_dir).await?;
let resolved = self.command_wrapper.wrap(resolved);

let args: Vec<String> = resolved
.args
.iter()
.map(|a| a.to_string_lossy().into_owned())
.collect();
let envs: Vec<EnvVariable> = resolved
.envs
.iter()
.map(|(k, v)| {
EnvVariable::new(
k.to_string_lossy().into_owned(),
v.to_string_lossy().into_owned(),
)
})
.collect();

let command = resolved.program.to_string_lossy().into_owned();
let mcp_server = McpServerStdio::new(&self.agent_name, &command)
.args(args)
.env(envs);
let acp_agent = AcpAgent::new(McpServer::Stdio(mcp_server));

Ok(acp_agent)
}

/// Run the agent with default stdio, using the Conductor for protocol handling.
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let acp_agent = self.resolve_agent().await?;
let conductor = ConductorImpl::new_agent(&self.agent_name, AgentOnly(acp_agent));
conductor
.run(Stdio::new())
.await
.map_err(|e| e.to_string())?;
Ok(())
}

/// Run the agent with custom stdio streams, using the Conductor for protocol handling.
pub async fn run_with_streams<R, Wr>(
&self,
stdin: R,
stdout: Wr,
) -> Result<(), Box<dyn std::error::Error>>
where
R: AsyncRead + Unpin + Send + 'static,
Wr: AsyncWrite + Unpin + Send + 'static,
{
let acp_agent = self.resolve_agent().await?;
let conductor = ConductorImpl::new_agent(&self.agent_name, AgentOnly(acp_agent));

let byte_streams = agent_client_protocol::ByteStreams::new(
tokio_util::compat::TokioAsyncWriteCompatExt::compat_write(stdout),
tokio_util::compat::TokioAsyncReadCompatExt::compat(stdin),
);
conductor
.run(byte_streams)
.await
.map_err(|e| e.to_string())?;
Ok(())
}

async fn build_command(
async fn resolve_command(
&self,
agent: &Agent,
cache_dir: &PathBuf,
) -> Result<Command, Box<dyn std::error::Error>> {
) -> Result<ResolvedCommand, Box<dyn std::error::Error>> {
if let Some(npx) = &agent.distribution.npx {
info!("Executing npx package: {}", npx.package);
let mut cmd = Command::new("npx");
cmd.arg("-y");
let package_arg = if npx.package.contains('@') && npx.package.matches('@').count() > 1 {
npx.package.clone()
} else {
format!("{}@latest", npx.package)
};
cmd.arg(package_arg).args(&npx.args);
Ok(cmd)
let mut args: Vec<OsString> = vec!["-y".into(), package_arg.into()];
args.extend(npx.args.iter().map(OsString::from));
Ok(ResolvedCommand {
program: "npx".into(),
args,
envs: vec![],
})
} else if let Some(uvx) = &agent.distribution.uvx {
info!("Executing uvx package: {}", uvx.package);
let mut cmd = Command::new("uvx");
cmd.arg(&uvx.package).args(&uvx.args);
Ok(cmd)
let mut args: Vec<OsString> = vec![uvx.package.clone().into()];
args.extend(uvx.args.iter().map(OsString::from));
Ok(ResolvedCommand {
program: "uvx".into(),
args,
envs: vec![],
})
} else if !agent.distribution.binary.is_empty() {
let platform = get_platform();
debug!("Platform detected: {}", platform);
if let Some(binary_dist) = agent.distribution.binary.get(&platform) {
let binary_path =
download_binary(agent, binary_dist, cache_dir, self.force.as_ref()).await?;
info!("Executing binary: {:?}", binary_path);
let mut cmd = Command::new(&binary_path);
cmd.args(&binary_dist.args);
Ok(cmd)
let args: Vec<OsString> = binary_dist.args.iter().map(OsString::from).collect();
Ok(ResolvedCommand {
program: binary_path.into_os_string(),
args,
envs: vec![],
})
} else {
Err(format!("No binary available for platform: {}", platform).into())
}
Expand All @@ -207,35 +240,19 @@ impl Acpr {
}
}

/// Implement ConnectTo<Client> so Acpr can act as an ACP agent
impl ConnectTo<Client> for Acpr {
/// Implement ConnectTo<Client> so Acpr can act as an ACP agent via Conductor
impl<W: CommandWrapper + Send + Sync + 'static> ConnectTo<Client> for Acpr<W> {
async fn connect_to(
self,
client: impl ConnectTo<AcpAgent>,
client: impl ConnectTo<AcpAgentRole>,
) -> Result<(), agent_client_protocol::Error> {
debug!("ConnectTo: creating duplex streams");
let (client_stdin, agent_stdin) = tokio::io::duplex(8192);
let (agent_stdout, client_stdout) = tokio::io::duplex(8192);

debug!("ConnectTo: creating ByteStreams for sacp");
let byte_streams = ByteStreams::new(client_stdin.compat_write(), client_stdout.compat());

debug!("ConnectTo: starting agent and client tasks");
tokio::try_join!(
async {
debug!("ConnectTo: starting agent process");
self.run_with_streams(agent_stdin, agent_stdout)
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))
},
async {
debug!("ConnectTo: starting sacp client connection");
ConnectTo::<Client>::connect_to(byte_streams, client).await
}
)?;
let acp_agent = self
.resolve_agent()
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?;

debug!("ConnectTo: both tasks completed successfully");
Ok(())
let conductor = ConductorImpl::new_agent(&self.agent_name, AgentOnly(acp_agent));
conductor.run(client).await
}
}

Expand Down
Loading
Loading