Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/issue-watch-976.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: Issue Watch — #976 Activity Notify

on:
schedule:
- cron: '0 */4 * * *'
workflow_dispatch:

jobs:
watch:
runs-on: ubuntu-latest
steps:
- name: Check issue #976 activity and notify Discord
env:
GH_TOKEN: ${{ secrets.GH_PAT }}
DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }}
SINCE: '2026-06-02T07:24:00Z'
run: |
FOUND=0
MSG=""

COMMENTS=$(gh api "/repos/openabdev/openab/issues/976/comments" \
--jq ".[] | select(.created_at > \"$SINCE\") | \"💬 [\(.user.login)] \(.created_at[:16]): \(.body[:150])\"" 2>/dev/null)
if [ -n "$COMMENTS" ]; then
FOUND=1
MSG="${MSG}\n**Issue #976 — New Comments:**\n${COMMENTS}"
fi

ISSUE_STATE=$(gh api /repos/openabdev/openab/issues/976 \
--jq '"state=\(.state) updated=\(.updated_at[:16]) labels=\([.labels[].name] | join(","))"' 2>/dev/null)
MSG="${MSG}\n**Issue #976 Status:** ${ISSUE_STATE}"

if [ "$FOUND" -eq 1 ]; then
HEADER="🔔 **GitHub Watch — Issue #976** | $(date -u '+%Y-%m-%d %H:%M UTC')"
FULL_MSG="${HEADER}\n${MSG}"
else
FULL_MSG="✅ **GitHub Watch #976** | $(date -u '+%Y-%m-%d %H:%M UTC') — No new activity since ${SINCE}."
fi

printf '%b' "$FULL_MSG" > /tmp/msg.txt
PAYLOAD=$(jq -n --rawfile msg /tmp/msg.txt '{content: $msg}')
curl -sf -X POST "$DISCORD_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d "$PAYLOAD"
51 changes: 51 additions & 0 deletions .github/workflows/pr-watch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: PR Watch — Upstream Activity Notify

on:
schedule:
- cron: '0 */4 * * *'
workflow_dispatch:

jobs:
watch:
runs-on: ubuntu-latest
steps:
- name: Check upstream activity and notify Discord
env:
GH_TOKEN: ${{ secrets.GH_PAT }}
DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }}
SINCE: '2026-05-25T15:00:00Z'
run: |
NOW=$(date -u +%Y-%m-%dT%H:%M:%SZ)
FOUND=0
MSG=""

COMMENTS=$(gh api "/repos/openabdev/openab/issues/919/comments" \
--jq ".[] | select(.created_at > \"$SINCE\") | \"💬 [\(.user.login)] \(.created_at[:16]): \(.body[:120])\"" 2>/dev/null)
if [ -n "$COMMENTS" ]; then
FOUND=1
MSG="${MSG}\n**PR #919 — New Comments:**\n${COMMENTS}"
fi

REVIEWS=$(gh api "/repos/openabdev/openab/pulls/919/reviews" \
--jq ".[] | select(.submitted_at > \"$SINCE\") | \"🔍 [\(.user.login)] [\(.state)] \(.submitted_at[:16]): \(.body[:120])\"" 2>/dev/null)
if [ -n "$REVIEWS" ]; then
FOUND=1
MSG="${MSG}\n**PR #919 — New Reviews:**\n${REVIEWS}"
fi

PR_STATE=$(gh api /repos/openabdev/openab/pulls/919 \
--jq '"state=\(.state) mergeable=\(.mergeable_state) updated=\(.updated_at[:16])"' 2>/dev/null)
MSG="${MSG}\n**PR #919 Status:** ${PR_STATE}"

if [ "$FOUND" -eq 1 ]; then
HEADER="🔔 **GitHub Watch Report** | $(date -u '+%Y-%m-%d %H:%M UTC')"
FULL_MSG="${HEADER}\n${MSG}"
else
FULL_MSG="✅ **GitHub Watch** | $(date -u '+%Y-%m-%d %H:%M UTC') — No new activity on PR #919 since ${SINCE}."
fi

printf '%b' "$FULL_MSG" > /tmp/msg.txt
PAYLOAD=$(jq -n --rawfile msg /tmp/msg.txt '{content: $msg}')
curl -sf -X POST "$DISCORD_WEBHOOK_URL" \
-H "Content-Type: application/json" \
-d "$PAYLOAD"
31 changes: 31 additions & 0 deletions docs/codex.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,37 @@ args = []
working_dir = "/home/node"
```

### MCP Servers

To connect Codex to MCP sidecars, add `[[agent.mcp_servers]]` entries. OpenAB
forwards these to `codex-acp` in the `session/new` and `session/load` payloads.

**Stdio server:**

```toml
[agent.mcp_servers.my-tool]
command = "npx"
args = ["-y", "@my-org/my-mcp-tool"]
env = { API_KEY = "${MY_TOOL_API_KEY}" }
```

**Streamable HTTP server:**

```toml
[agent.mcp_servers.gdrive]
type = "http"
url = "http://127.0.0.1:3140/mcp"
```

**SSE server:**

```toml
[agent.mcp_servers.my-sse-tool]
type = "sse"
url = "http://127.0.0.1:3141/sse"
headers = { Authorization = "Bearer ${MY_SSE_TOKEN}" }
```

## Authentication

```bash
Expand Down
34 changes: 22 additions & 12 deletions src/acp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ impl AcpConnection {
Ok(_) => {
let trimmed = line.trim();
if !trimmed.is_empty() {
let sanitized: String = trimmed.chars()
let sanitized: String = trimmed
.chars()
.filter(|c| !c.is_control() || *c == '\t')
.collect();
if !sanitized.is_empty() {
Expand Down Expand Up @@ -482,9 +483,16 @@ impl AcpConnection {
Ok(())
}

pub async fn session_new(&mut self, cwd: &str) -> Result<String> {
pub async fn session_new(
&mut self,
cwd: &str,
mcp_servers: &[serde_json::Value],
) -> Result<String> {
let resp = self
.send_request("session/new", Some(json!({"cwd": cwd, "mcpServers": []})))
.send_request(
"session/new",
Some(json!({"cwd": cwd, "mcpServers": mcp_servers})),
)
.await?;

let session_id = resp
Expand Down Expand Up @@ -640,11 +648,16 @@ impl AcpConnection {

/// Resume a previous session by ID. Returns Ok(()) if the agent accepted
/// the load, or an error if it failed (caller should fall back to session/new).
pub async fn session_load(&mut self, session_id: &str, cwd: &str) -> Result<()> {
pub async fn session_load(
&mut self,
session_id: &str,
cwd: &str,
mcp_servers: &[serde_json::Value],
) -> Result<()> {
let resp = self
.send_request(
"session/load",
Some(json!({"sessionId": session_id, "cwd": cwd, "mcpServers": []})),
Some(json!({"sessionId": session_id, "cwd": cwd, "mcpServers": mcp_servers})),
)
.await?;
// Accept any non-error response as success
Expand Down Expand Up @@ -872,13 +885,10 @@ mod reader_loop_tests {
agent_stdout_writer.write_all(stale).await.unwrap();
agent_stdout_writer.flush().await.unwrap();

let forwarded = tokio::time::timeout(
std::time::Duration::from_secs(2),
sub_rx.recv(),
)
.await
.expect("subscriber should receive stale message before timeout")
.expect("subscriber channel should not be closed");
let forwarded = tokio::time::timeout(std::time::Duration::from_secs(2), sub_rx.recv())
.await
.expect("subscriber should receive stale message before timeout")
.expect("subscriber channel should not be closed");
assert_eq!(forwarded.id, Some(42));
assert!(pending.lock().await.is_empty());

Expand Down
23 changes: 18 additions & 5 deletions src/acp/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,14 @@ impl SessionPool {

new_conn.initialize().await?;

let mcp_servers = self.config.acp_mcp_servers()?;
let mut resumed = false;
if let Some(ref sid) = saved_session_id {
if new_conn.supports_load_session {
match new_conn.session_load(sid, &self.config.working_dir).await {
match new_conn
.session_load(sid, &self.config.working_dir, &mcp_servers)
.await
{
Ok(()) => {
info!(thread_id, session_id = %sid, "session resumed via session/load");
resumed = true;
Expand All @@ -198,7 +202,9 @@ impl SessionPool {
}

if !resumed {
new_conn.session_new(&self.config.working_dir).await?;
new_conn
.session_new(&self.config.working_dir, &mcp_servers)
.await?;
// Surface the reset banner both for restored sessions and for stale
// live entries that died before we could recover a resumable
// session id. In both cases the caller is continuing after an
Expand Down Expand Up @@ -521,14 +527,21 @@ mod tests {
fn persisted_mapping_can_include_active_and_suspended_sessions() {
let persisted = HashMap::from([
("active-thread".to_string(), "session-active".to_string()),
("suspended-thread".to_string(), "session-suspended".to_string()),
(
"suspended-thread".to_string(),
"session-suspended".to_string(),
),
]);

let serialized = serde_json::to_string_pretty(&persisted).expect("serialize persisted mapping");
let serialized =
serde_json::to_string_pretty(&persisted).expect("serialize persisted mapping");
let roundtrip: HashMap<String, String> =
serde_json::from_str(&serialized).expect("deserialize persisted mapping");

assert_eq!(roundtrip.get("active-thread"), Some(&"session-active".to_string()));
assert_eq!(
roundtrip.get("active-thread"),
Some(&"session-active".to_string())
);
assert_eq!(
roundtrip.get("suspended-thread"),
Some(&"session-suspended".to_string())
Expand Down
88 changes: 88 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,94 @@ pub struct AgentConfig {
pub env: HashMap<String, String>,
#[serde(default)]
pub inherit_env: Vec<String>,
/// MCP servers to register with the ACP agent at session init.
#[serde(default)]
pub mcp_servers: HashMap<String, McpServerConfig>,
}

impl AgentConfig {
/// Serialize configured MCP servers into the JSON array expected by
/// `session/new` and `session/load` ACP payloads.
pub fn acp_mcp_servers(&self) -> anyhow::Result<Vec<serde_json::Value>> {
let mut servers: Vec<_> = self.mcp_servers.iter().collect();
servers.sort_by_key(|(name, _)| *name);
servers
.into_iter()
.map(|(name, cfg)| cfg.to_acp_value(name))
.collect()
}
}

/// Configuration for a single MCP server entry under `[agent.mcp_servers]`.
#[derive(Debug, Deserialize)]
pub struct McpServerConfig {
/// Transport type: `"stdio"` (default), `"http"`, or `"sse"`.
#[serde(default, rename = "type")]
pub server_type: Option<String>,
/// Command to run (required for stdio).
pub command: Option<String>,
/// Arguments for stdio command.
#[serde(default)]
pub args: Vec<String>,
/// Server URL (required for http/sse).
pub url: Option<String>,
/// Environment variables for stdio servers.
#[serde(default)]
pub env: HashMap<String, String>,
/// HTTP headers for http/sse servers.
#[serde(default)]
pub headers: HashMap<String, String>,
}

impl McpServerConfig {
fn to_acp_value(&self, name: &str) -> anyhow::Result<serde_json::Value> {
anyhow::ensure!(!name.trim().is_empty(), "mcp_servers key cannot be empty");
match self
.server_type
.as_deref()
.unwrap_or("stdio")
.to_ascii_lowercase()
.as_str()
{
"stdio" => {
let command = self.command.as_deref().unwrap_or("").trim();
anyhow::ensure!(
!command.is_empty(),
"agent.mcp_servers.{name}.command is required for stdio servers"
);
Ok(serde_json::json!({
"name": name,
"command": command,
"args": &self.args,
"env": env_name_value(&self.env),
}))
}
t @ ("http" | "sse") => {
let url = self.url.as_deref().unwrap_or("").trim();
anyhow::ensure!(
!url.is_empty(),
"agent.mcp_servers.{name}.url is required for {t} servers"
);
let mut value = serde_json::json!({"name": name, "type": t, "url": url});
if !self.headers.is_empty() {
value["headers"] = serde_json::Value::Array(env_name_value(&self.headers));
}
Ok(value)
}
other => anyhow::bail!(
"agent.mcp_servers.{name}.type must be stdio, http, or sse (got {other})"
),
}
}
}

fn env_name_value(map: &HashMap<String, String>) -> Vec<serde_json::Value> {
let mut entries: Vec<_> = map.iter().collect();
entries.sort_by_key(|(k, _)| *k);
entries
.into_iter()
.map(|(k, v)| serde_json::json!({"name": k, "value": expand_env_vars(v)}))
.collect()
}

#[derive(Debug, Deserialize)]
Expand Down
1 change: 1 addition & 0 deletions src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ mod tests {
working_dir: "/tmp".into(),
env: std::collections::HashMap::new(),
inherit_env: vec![],
mcp_servers: std::collections::HashMap::new(),
};
let pool = Arc::new(SessionPool::new(agent_cfg, 1));
let router = Arc::new(AdapterRouter::new(
Expand Down
Loading