diff --git a/CLAUDE.md b/CLAUDE.md index 0088678..60e0329 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -63,12 +63,12 @@ toolchain required. Broker: ```sh -PEERBUS_LISTEN=0.0.0.0:8080 PEERBUS_TOKENS=t1,t2 PEERBUS_HMAC_SECRET=... \ +PEERBUS_LISTEN=0.0.0.0:47821 PEERBUS_TOKENS=t1,t2 PEERBUS_HMAC_SECRET=... \ ./peerbus serve ./peerbus audit verify [--db PATH] # walks the blake3 chain ``` -`serve` config loads from env: `PEERBUS_LISTEN` (default `127.0.0.1:8080`), +`serve` config loads from env: `PEERBUS_LISTEN` (default `127.0.0.1:47821`), `PEERBUS_TOKENS` (comma-separated, ≥1 required), `PEERBUS_HMAC_SECRET` (≥ `hmac.MinSecretLen` = 32 bytes), `PEERBUS_DB` (default `peerbus.db`; the `--db` flag sets the base, env overrides). Missing tokens or a short secret @@ -83,9 +83,9 @@ peerbus adapter --adapter=generic # or --adapter=cc Env: `PEERBUS_URL`, `PEERBUS_NAME`, `PEERBUS_TOKEN`, `PEERBUS_HMAC_SECRET`. Fail-fast (exit 2): missing `PEERBUS_URL`, missing `PEERBUS_TOKEN`, `PEERBUS_HMAC_SECRET` shorter than `hmac.MinSecretLen` (32), or empty -`PEERBUS_NAME` when `--adapter=generic` (cc auto-mints -`cc---` when name is empty). Missing/unknown `--adapter` -is also exit 2. +`PEERBUS_NAME` when `--adapter=generic` (cc auto-mints a friendly +`--<3-char-base36>` name — see `internal/channel`). +Missing/unknown `--adapter` is also exit 2. ## Load-bearing invariants — do NOT break diff --git a/Dockerfile b/Dockerfile index 91c9451..5793c11 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,15 +37,15 @@ COPY --from=build /out/peerbus /usr/local/bin/peerbus # volume at /data. PEERBUS_DB should point here (see deploy/compose.yml). 
VOLUME ["/data"] # Broker config is read from PEERBUS_* env (see internal/broker/config.go): -# PEERBUS_LISTEN WS bind host:port (default 127.0.0.1:8080; set -# 0.0.0.0:8080 in a container so the published port +# PEERBUS_LISTEN WS bind host:port (default 127.0.0.1:47821; set +# 0.0.0.0:47821 in a container so the published port # reaches it) # PEERBUS_TOKENS comma-separated static bearer tokens (>=1 required) # PEERBUS_HMAC_SECRET shared end-to-end HMAC-SHA256 secret (min length # enforced; broker refuses to start otherwise) # PEERBUS_DB durable SQLite path (point at /data) # Default WS port. Keep in sync with PEERBUS_LISTEN. -EXPOSE 8080 +EXPOSE 47821 # No HEALTHCHECK: the broker exposes no health endpoint/subcommand and the # distroless image ships no shell or probe tooling. `restart: always` plus # the broker's crash-on-misconfig (missing token / short HMAC secret exits diff --git a/README.md b/README.md index 9a745f8..4d378ae 100644 --- a/README.md +++ b/README.md @@ -20,19 +20,6 @@ Messages are peer-to-peer and out-of-band: peerbus moves messages *between alrea **Honest taxonomy:** this is a **custom MCP-channel peer bus**. It is *conceptually* A2A-shaped — peer agents, asynchronous messages, human escalation handled by the peer rather than the bus — but it is **not** an implementation of Zed's [Agent Client Protocol](https://github.com/zed-industries/agent-client-protocol) nor of the Google / Linux Foundation [Agent2Agent (A2A)](https://github.com/a2aproject/A2A) specification. peerbus defines and implements its own small WebSocket wire protocol (see [`docs/wire-protocol.md`](docs/wire-protocol.md)); it borrows the *shape* of A2A-style peer messaging but ships none of those specs' types, handshakes, or guarantees. peerbus is its own bus, not an ACP/A2A implementation. -## Migration: v0.1.0 → v0.2.0 (single binary) - -v0.1.0 shipped two binaries (`peerbus-broker` and `peerbus-adapter`). 
v0.2.0 collapses them into **one** `peerbus` multi-command binary (git/kubectl style). Every flag and every env var inside each subcommand is preserved — only the dispatch shell changed. Pre-1.0, this is a breaking CLI rename; the wire protocol, security model, and on-disk store are unchanged. - -| v0.1.0 | v0.2.0 | -| --------------------------------------- | --------------------------------- | -| `peerbus-broker serve` | `peerbus serve` | -| `peerbus-broker audit verify` | `peerbus audit verify` | -| `peerbus-adapter --adapter=cc` | `peerbus adapter --adapter=cc` | -| `peerbus-adapter --adapter=generic` | `peerbus adapter --adapter=generic` | - -If you wired an adapter in `.mcp.json`, swap `"command": "peerbus-adapter", "args": ["--adapter=cc"]` for `"command": "peerbus", "args": ["adapter", "--adapter=cc"]` (note: **two** args now, since `adapter` is a subcommand). The Docker image still runs the broker by default — its `ENTRYPOINT`+`CMD` is now `peerbus serve`. - ## How It Works Two parts: a **broker** and **adapters**. @@ -87,7 +74,7 @@ Broker configuration (struct defaults, overridden by env): | Env var | Meaning | | --------------------- | --------------------------------------------------------------- | -| `PEERBUS_LISTEN` | WS server bind address (`host:port`, default `127.0.0.1:8080`). | +| `PEERBUS_LISTEN` | WS server bind address (`host:port`, default `127.0.0.1:47821`). | | `PEERBUS_TOKENS` | Comma-separated accepted static bearer tokens (at least one). | | `PEERBUS_HMAC_SECRET` | Shared end-to-end HMAC-SHA256 secret (min 32 bytes enforced). | | `PEERBUS_DB` | Durable SQLite store path (default `peerbus.db`). 
| @@ -115,7 +102,7 @@ The same `peerbus` binary runs the adapter — pick the mode at launch with `pee "command": "peerbus", "args": ["adapter", "--adapter=generic"], "env": { - "PEERBUS_URL": "ws://broker-host:8080", + "PEERBUS_URL": "ws://broker-host:47821", "PEERBUS_NAME": "hermes-prod", "PEERBUS_TOKEN": "", "PEERBUS_HMAC_SECRET": "" @@ -127,7 +114,7 @@ The same `peerbus` binary runs the adapter — pick the mode at launch with `pee Tools: `bus.send` (direct), `bus.broadcast` (fan-out), `bus.peers` (list), `bus.drain` (return + ack pending — the host calls this on its own schedule). Full guide: [`docs/integrations/generic-adapter.md`](docs/integrations/generic-adapter.md). Recommended timed self-drain + escalation pattern for Hermes: [`docs/integrations/hermes-drain-skill.md`](docs/integrations/hermes-drain-skill.md). -**An interactive Claude Code session** uses `peerbus adapter --adapter=cc` instead. It is the MCP `claude/channel` server; inbound is a push-wake that creates a turn in an idle session (no `bus.drain`). Register it in `.mcp.json` as a server named `peerbus`, same env vars as generic but leave `PEERBUS_NAME` empty to auto-register `cc---`: +**An interactive Claude Code session** uses `peerbus adapter --adapter=cc` instead. It is the MCP `claude/channel` server; inbound is a push-wake that creates a turn in an idle session (no `bus.drain`). Register it in `.mcp.json` as a server named `peerbus`, same env vars as generic but leave `PEERBUS_NAME` empty to auto-register a friendly `--<3-char-suffix>` name (e.g. `wild-wasp-3kx`). On startup the adapter pushes a system-kind notification announcing its bound name, and `bus.peers` returns `{ self, peers }` so the session always knows its own bus identity: ```json { @@ -136,7 +123,7 @@ Tools: `bus.send` (direct), `bus.broadcast` (fan-out), `bus.peers` (list), `bus. 
"command": "peerbus", "args": ["adapter", "--adapter=cc"], "env": { - "PEERBUS_URL": "ws://broker-host:8080", + "PEERBUS_URL": "ws://broker-host:47821", "PEERBUS_NAME": "", "PEERBUS_TOKEN": "", "PEERBUS_HMAC_SECRET": "" diff --git a/cmd/peerbus/adapter.go b/cmd/peerbus/adapter.go index 4dda691..7327dab 100644 --- a/cmd/peerbus/adapter.go +++ b/cmd/peerbus/adapter.go @@ -1,7 +1,6 @@ package main -// `adapter` subcommand. Ported verbatim from the v0.1.0 -// cmd/peerbus-adapter/main.go — same --adapter dispatch, same env vars, same +// `adapter` subcommand: --adapter dispatch over PEERBUS_* env, with // fail-fast guards (missing URL/Token, short HMAC, empty generic name). import ( @@ -20,8 +19,7 @@ import ( ) // envClientConfig is split out so the env→ClientConfig mapping is testable -// and the variable names live in exactly one place. Matches v0.1.0 -// peerbus-adapter behaviour verbatim. +// and the variable names live in exactly one place. func envClientConfig() adapter.ClientConfig { return adapter.ClientConfig{ URL: os.Getenv("PEERBUS_URL"), @@ -33,8 +31,7 @@ func envClientConfig() adapter.ClientConfig { // adapterRun parses adapter flags, builds the broker client config from the // environment, resolves + constructs the mode, and runs it until a -// termination signal or the host closes stdio. Returns a process exit code. -// Behaviour mirrors v0.1.0 `peerbus-adapter --adapter=` exactly: +// termination signal or the host closes stdio. Returns a process exit code: // // - missing or unknown --adapter → exit 2 // - missing PEERBUS_URL or PEERBUS_TOKEN → exit 2 @@ -98,7 +95,8 @@ func adapterRun(args []string, stdout, stderr io.Writer) int { // Generic mode binds a fixed peer name and has no auto-name fallback // (only cc auto-generates one). An empty PEERBUS_NAME there is rejected // at broker register on every attempt → reconnect spin; fail fast - // instead. cc tolerates an empty name (mints cc---). + // instead. 
cc tolerates an empty name (mints a friendly + // --<3 base36> name; see internal/channel.UniqueName). if *mode == "generic" && cfg.Name == "" { _, _ = fmt.Fprintln(stderr, "peerbus: PEERBUS_NAME is required for --adapter=generic") return 2 diff --git a/cmd/peerbus/broker.go b/cmd/peerbus/broker.go index 1681229..69edb3c 100644 --- a/cmd/peerbus/broker.go +++ b/cmd/peerbus/broker.go @@ -1,8 +1,7 @@ package main -// Broker subcommands: `serve` and `audit verify`. Ported verbatim from the -// v0.1.0 cmd/peerbus-broker/main.go — same flags, same env precedence, same -// exit codes (audit verify still exits 0 intact / 1 break / 2 operational). +// Broker subcommands: `serve` and `audit verify`. Exit codes for audit +// verify: 0 intact, 1 a break was found, 2 operational error. import ( "context" @@ -20,15 +19,13 @@ import ( "github.com/nnemirovsky/peerbus/internal/version" ) -// defaultDBPath is the store location used when --db is not given. Matches -// the v0.1.0 broker default. +// defaultDBPath is the store location used when --db is not given. const defaultDBPath = "peerbus.db" // brokerServe parses serve-subcommand flags, loads broker config from env // (env-overrides-struct precedence; see internal/broker.LoadConfig), and // runs the WebSocket broker until SIGINT/SIGTERM. Exit 0 on clean shutdown, -// 2 on config/operational error. Behaviour mirrors v0.1.0 `peerbus-broker -// serve` exactly. +// 2 on config/operational error. func brokerServe(args []string, stdout, stderr io.Writer) int { fs := flag.NewFlagSet("peerbus serve", flag.ContinueOnError) fs.SetOutput(stderr) @@ -84,9 +81,8 @@ func brokerServe(args []string, stdout, stderr io.Writer) int { // brokerAuditVerify implements the `audit verify` subcommand. 
The first // positional arg must be the literal verb "verify"; the --db flag is -// accepted EITHER before or after the verb (matches the v0.1.0 broker's -// flag-before-subcommand calling convention and its tests which pass -// `--db PATH audit verify` and bare `audit`). +// accepted EITHER before or after the verb so callers may pass either +// `audit verify --db PATH` or `--db PATH audit verify`. // // Exit codes: 0 chain intact, 1 a break was found, 2 usage/operational // error. diff --git a/cmd/peerbus/main.go b/cmd/peerbus/main.go index 1970f9f..4f141ff 100644 --- a/cmd/peerbus/main.go +++ b/cmd/peerbus/main.go @@ -1,17 +1,17 @@ // Command peerbus is the single multi-command binary for the peerbus -// project. It collapses the v0.1.0 cmd/peerbus-broker and cmd/peerbus-adapter -// into one git/kubectl-style dispatcher. +// project: one git/kubectl-style dispatcher for the broker and adapter +// subcommands. // -// Subcommands (each preserves its v0.1.0 flag/env contract verbatim): +// Subcommands: // // serve start the WebSocket broker (token auth + peer // registry + direct/broadcast routing, offline -// queue, ack/redelivery). Was: peerbus-broker serve. -// audit verify [--db PATH] walk the blake3 hash-chain audit log and report -// any break. Was: peerbus-broker audit verify. +// queue, ack/redelivery). +// audit verify [--db PATH] walk the blake3 hash-chain audit log and +// report any break. // adapter --adapter= run the adapter (mode resolved through the // additive --adapter dispatch registry; today: -// cc | generic). Was: peerbus-adapter --adapter=. +// cc | generic). // // Top-level flags: // @@ -41,9 +41,8 @@ func main() { // (brokerServe, brokerAuditVerify, adapterRun) is also independently // testable. 
func dispatch(args []string, stdout, stderr io.Writer) int { - // Top-level --version MUST work BEFORE subcommand parsing — matches the - // v0.1.0 behaviour of both old mains (`peerbus-broker --version` and - // `peerbus-adapter --version` both printed version and exited 0). + // Top-level --version MUST work BEFORE subcommand parsing so + // `peerbus --version` is answerable without a subcommand. if len(args) > 0 { switch args[0] { case "--version", "-version": diff --git a/cmd/peerbus/main_test.go b/cmd/peerbus/main_test.go index 3b58633..6e6688b 100644 --- a/cmd/peerbus/main_test.go +++ b/cmd/peerbus/main_test.go @@ -7,8 +7,7 @@ import ( ) // TestTopLevelVersion: `peerbus --version` prints the version and exits 0, -// WITHOUT requiring a subcommand. Matches the v0.1.0 behaviour of both old -// mains (both supported `--version` as a top-level flag). +// WITHOUT requiring a subcommand. func TestTopLevelVersion(t *testing.T) { var out, errb bytes.Buffer if code := dispatch([]string{"--version"}, &out, &errb); code != 0 { diff --git a/deploy/compose.yml b/deploy/compose.yml index a031f06..a5d1d47 100644 --- a/deploy/compose.yml +++ b/deploy/compose.yml @@ -54,11 +54,11 @@ services: # adapters on other machines can dial in. Keep the container-side port in # sync with PEERBUS_LISTEN below. ports: - - "8080:8080" + - "47821:47821" environment: # WS bind address inside the container. Bind 0.0.0.0 so the published - # port reaches it (the in-code default 127.0.0.1:8080 is loopback-only). - PEERBUS_LISTEN: "0.0.0.0:8080" + # port reaches it (the in-code default 127.0.0.1:47821 is loopback-only). + PEERBUS_LISTEN: "0.0.0.0:47821" # Comma-separated static bearer tokens. Provide via .env / secret store, # NOT inline. At least one token is required or the broker won't start. 
PEERBUS_TOKENS: "${PEERBUS_TOKENS:?set PEERBUS_TOKENS out-of-band}" diff --git a/deploy/peerbus-broker.run b/deploy/peerbus-broker.run index 8151031..0be02fa 100755 --- a/deploy/peerbus-broker.run +++ b/deploy/peerbus-broker.run @@ -37,8 +37,8 @@ if [ -r /etc/peerbus/broker.env ]; then fi # Bind a routable address so adapters on other machines can dial in. The -# in-code default (127.0.0.1:8080) is loopback-only. -export PEERBUS_LISTEN="${PEERBUS_LISTEN:-0.0.0.0:8080}" +# in-code default (127.0.0.1:47821) is loopback-only. +export PEERBUS_LISTEN="${PEERBUS_LISTEN:-0.0.0.0:47821}" # Durable SQLite store on persistent disk (survives restarts: the # at-least-once + audit guarantee). Match the path to your volume mount. diff --git a/docs/integrations/generic-adapter.md b/docs/integrations/generic-adapter.md index 1b03f02..f796f18 100644 --- a/docs/integrations/generic-adapter.md +++ b/docs/integrations/generic-adapter.md @@ -1,11 +1,5 @@ # Integration: the generic MCP adapter (`peerbus adapter --adapter=generic`) -> **v0.2.0 rename.** v0.1.0 invoked this as `peerbus-adapter -> --adapter=generic`; v0.2.0 ships ONE multi-command `peerbus` binary, so the -> invocation is now `peerbus adapter --adapter=generic` (`adapter` is a -> subcommand; `--adapter=` is its flag). Flags, env vars, and behaviour -> are otherwise unchanged. - How any agent runtime — Hermes, OpenClaw, Codex CLI, a bespoke bot — joins the peerbus fabric. This is the universal path: every agent except a real interactive Claude Code session uses it. (Claude Code has its own push-wake @@ -42,7 +36,7 @@ Register the adapter as a stdio MCP server in the host's MCP config. 
Shape: "command": "peerbus", "args": ["adapter", "--adapter=generic"], "env": { - "PEERBUS_URL": "ws://broker-host:8080", + "PEERBUS_URL": "ws://broker-host:47821", "PEERBUS_NAME": "hermes-prod", "PEERBUS_TOKEN": "", "PEERBUS_HMAC_SECRET": "" @@ -75,7 +69,7 @@ The generic adapter advertises exactly four tools: | --------------- | ------------------------ | ----------------------------------------------------------------------------------------------- | | `bus.send` | `to` (string), `body` (object) | Direct message to one peer. Body is opaque application JSON, hashed verbatim. | | `bus.broadcast` | `body` (object) | Fan-out to every currently-registered peer except yourself. No backfill for late joiners. | -| `bus.peers` | — | List the peers currently registered on the bus. | +| `bus.peers` | — | Return `{self, peers}`: this adapter's bound name plus the other peers currently registered. | | `bus.drain` | — | Return **and acknowledge** every message received since the last drain. | `bus.drain` is the entire inbound path for a generic peer. It returns each diff --git a/internal/adapter/cc.go b/internal/adapter/cc.go index 08160ef..b4d094b 100644 --- a/internal/adapter/cc.go +++ b/internal/adapter/cc.go @@ -3,6 +3,7 @@ package adapter import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "os" @@ -68,28 +69,31 @@ func (b *ccBus) Broadcast(ctx context.Context, body json.RawMessage) error { // Peers mirrors the generic adapter's no-second-reader pattern: the resume // loop is the SOLE WS reader, so install a one-shot sink the Recv loop -// forwards the peers reply to rather than reading frames here. -func (b *ccBus) Peers(ctx context.Context) ([]string, error) { +// forwards the peers reply to rather than reading frames here. Returns +// (self, peers, err); see channel.OutboundBus / mcp.Bus for the shape +// rationale. 
+func (b *ccBus) Peers(ctx context.Context) (string, []string, error) { + self := b.rc.Name() c := b.rc.Client() if c == nil { - return nil, ErrNotConnected + return self, nil, ErrNotConnected } sink := make(chan []string, 1) c.SetPeersSink(sink) defer c.SetPeersSink(nil) if err := c.RequestPeers(ctx); err != nil { - return nil, err + return self, nil, err } select { case names := <-sink: - return names, nil + return self, filterSelf(names, self), nil case <-ctx.Done(): - return nil, ctx.Err() + return self, nil, ctx.Err() case <-time.After(peersReplyTimeout): // MAJOR-6: bound the wait. Without this a peers reply lost across // a reconnect would block until ctx cancel (forever for a // long-lived cc session). - return nil, fmt.Errorf("adapter: peers reply timed out") + return self, nil, fmt.Errorf("adapter: peers reply timed out") } } @@ -105,6 +109,7 @@ func (b *ccBus) handle(_ context.Context, env wire.Envelope) error { ID: env.ID, From: env.From, Source: env.Source, + Kind: string(env.Kind), Body: env.Body, }) return nil @@ -121,27 +126,96 @@ type ccMode struct { func (m *ccMode) Name() string { return "cc" } -// Run wires the channel server to a fresh ResumingClient, auto-registers a -// unique peer name when none was configured (cc2cc-parity ergonomics; see -// channel.UniqueName for the scheme), starts the resume/dedupe/HMAC loop in +// nameCollisionRetries bounds the on-startup name-rotation attempts when a +// freshly-minted friendly name happens to be claimed under a different +// bearer token (an "essentially impossible" event given the keyspace; see +// channel.UniqueName). 6 retries (7 attempts in total) is a conservative +// safety backstop: independent collisions on each retry are vanishingly +// unlikely, but bounding the loop prevents a misconfigured environment +// (e.g. a hostile token-sharing setup) from spinning forever. 
+const nameCollisionRetries = 6 + +// Run wires the channel server to a fresh ResumingClient, auto-mints a +// friendly peer name when none was configured (channel.UniqueName), retries +// up to nameCollisionRetries times on the broker's name-claimed rejection +// (collision-safety backstop), then starts the resume/dedupe/HMAC loop in // the background (each inbound delivery becomes a claude/channel push) and -// serves the stdio MCP protocol in the foreground. When stdin closes the -// MCP server returns and the resume loop is cancelled. +// serves the stdio MCP protocol in the foreground. On every successful +// register the adapter emits ONE system-kind notification carrying the +// bound name (channel.Server.AnnounceSelf) so the consuming Claude session +// always knows its own bus identity from turn 1 — no separate bus.whoami +// round-trip required. When stdin closes the MCP server returns and the +// resume loop is cancelled. func (m *ccMode) Run(ctx context.Context) error { + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) + cfg := m.cfg if cfg.Name == "" { cfg.Name = channel.UniqueName() } - log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) ctx, cancel := context.WithCancel(ctx) defer cancel() + // Probe-register up front with name rotation on ErrNameClaimed. Doing + // this BEFORE wiring the resume loop avoids a forever-redial spin if + // the chosen name is permanently rejected (different-token claim). We + // close the probe connection on success: the resume loop redials and + // performs its OWN register under the now-validated name. Same-token + // takeover (this peer reconnecting) is the entire resume mechanism so + // the probe close is harmless. 
+ for attempt := 0; ; attempt++ { + probe := NewClient(cfg) + err := probe.Connect(ctx) + if err == nil { + probe.Close() + break + } + if !errors.Is(err, ErrNameClaimed) { + return err + } + if attempt >= nameCollisionRetries { + return fmt.Errorf("cc: %d friendly-name rotations all rejected (last name %q): %w", + nameCollisionRetries+1, cfg.Name, err) + } + log.Warn("cc: name claimed under different token, rotating", + "name", cfg.Name, "attempt", attempt+1) + // Operator override (PEERBUS_NAME) is honoured verbatim and + // MUST NOT be rotated — a permanent rejection there is a config + // problem, not a collision the adapter can paper over. + if os.Getenv("PEERBUS_NAME") != "" { + return fmt.Errorf("cc: PEERBUS_NAME=%q rejected: %w", cfg.Name, err) + } + cfg.Name = channel.UniqueName() + } + rc := NewResumingClient(cfg, m.dedupeSize) bus := &ccBus{rc: rc} srv := channel.NewServer(bus, os.Stdin, os.Stdout) bus.srv = srv + // Self-announcement: emit ONE system-kind claude/channel push per + // successful (re)register, BUT only after the MCP client has signalled + // notifications/initialized. Claude Code silently drops + // claude/channel notifications received before the handshake completes + // (CHANNELS_SCHEMA.md §3) — that was the original bug where the + // connected-as banner never appeared in turn 1 and the consuming agent + // fell back to memsearch for a stale name. SetOnConnect runs its + // callback in a fresh goroutine, so the <-initialized wait does not + // stall the resume loop's Recv pump. Re-announcing on each reconnect + // (rather than only the first) keeps the banner reliable across + // mid-session broker drops — by the time a reconnect happens the + // session is already past initialized, so the wait is a no-op. 
+ initialized := srv.Initialized() + rc.SetOnConnect(func() { + select { + case <-initialized: + case <-ctx.Done(): + return + } + srv.AnnounceSelf(rc.Name()) + }) + done := make(chan struct{}) go func() { defer close(done) diff --git a/internal/adapter/cc_test.go b/internal/adapter/cc_test.go new file mode 100644 index 0000000..71a59a3 --- /dev/null +++ b/internal/adapter/cc_test.go @@ -0,0 +1,133 @@ +package adapter + +import ( + "context" + "errors" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/nnemirovsky/peerbus/internal/broker" + "github.com/nnemirovsky/peerbus/internal/store" +) + +// TestClientConnect_NameClaimedSurfacesTyped: a fresh Client connecting +// under a name that is already bound under a DIFFERENT bearer token +// receives the broker's "name claimed under a different token" policy +// violation and Client.Connect surfaces it as ErrNameClaimed (not as the +// generic "register rejected or no ack" wrapper). This is the typed +// signal the cc adapter's collision-retry loop keys off when it rotates +// to a fresh friendly name. +func TestClientConnect_NameClaimedSurfacesTyped(t *testing.T) { + f2 := newBrokerFixtureWithTokens(t, []string{"tok", "tok2"}) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Holder binds "shared-name" under token "tok". + holder := NewClient(ClientConfig{ + URL: f2.wsURL, Token: "tok", Name: "shared-name", HMACSecret: testSecret, + }) + if err := holder.Connect(ctx); err != nil { + t.Fatalf("holder connect: %v", err) + } + defer holder.Close() + + // Challenger tries the same name under DIFFERENT token "tok2". 
+ challenger := NewClient(ClientConfig{ + URL: f2.wsURL, Token: "tok2", Name: "shared-name", HMACSecret: testSecret, + }) + err := challenger.Connect(ctx) + if err == nil { + challenger.Close() + t.Fatalf("challenger connected; want ErrNameClaimed") + } + if !errors.Is(err, ErrNameClaimed) { + t.Fatalf("challenger connect err = %v, want errors.Is(_, ErrNameClaimed)", err) + } +} + +// TestResumingClient_NameClaimedShortCircuits: ResumingClient.connect (the +// reconnect/resume entry point) propagates ErrNameClaimed up instead of +// retrying with backoff. A permanent rejection MUST NOT spin the redial +// loop forever — the embedding mode rotates the name and starts a fresh +// resuming client. +func TestResumingClient_NameClaimedShortCircuits(t *testing.T) { + f := newBrokerFixtureWithTokens(t, []string{"tok", "tok2"}) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + holder := NewClient(ClientConfig{ + URL: f.wsURL, Token: "tok", Name: "claimed", HMACSecret: testSecret, + }) + if err := holder.Connect(ctx); err != nil { + t.Fatalf("holder connect: %v", err) + } + defer holder.Close() + + rc := NewResumingClient(ClientConfig{ + URL: f.wsURL, Token: "tok2", Name: "claimed", HMACSecret: testSecret, + }, 16) + start := time.Now() + _, err := rc.connect(ctx) + if err == nil { + t.Fatalf("connect succeeded; want ErrNameClaimed") + } + if !errors.Is(err, ErrNameClaimed) { + t.Fatalf("connect err = %v, want errors.Is(_, ErrNameClaimed)", err) + } + if elapsed := time.Since(start); elapsed > 2*time.Second { + t.Fatalf("connect took %v; expected near-immediate short-circuit", elapsed) + } +} + +// TestResumingClient_NameAccessor: ResumingClient.Name() returns the +// configured name (the value used in register and the value bus.peers +// reports as `self`). 
+func TestResumingClient_NameAccessor(t *testing.T) { + rc := NewResumingClient(ClientConfig{Name: "my-name"}, 0) + if got := rc.Name(); got != "my-name" { + t.Fatalf("Name() = %q, want my-name", got) + } +} + +// TestFilterSelf: filterSelf removes self from a name slice without +// mutating the input (used by bus.Peers implementations to suppress the +// caller from its own peer list). +func TestFilterSelf(t *testing.T) { + in := []string{"alpha", "self", "beta", "self"} + out := filterSelf(in, "self") + if len(out) != 2 || out[0] != "alpha" || out[1] != "beta" { + t.Fatalf("filterSelf = %v, want [alpha beta]", out) + } + if len(in) != 4 || in[1] != "self" { + t.Fatalf("filterSelf mutated input: %v", in) + } + // Empty self => identity. + if got := filterSelf([]string{"x", "y"}, ""); len(got) != 2 || got[0] != "x" { + t.Fatalf("filterSelf with empty self = %v, want [x y]", got) + } +} + +// newBrokerFixtureWithTokens is a variant of newBrokerFixture (defined in +// client_test.go) that accepts a custom token list so a test can bind two +// peers under DIFFERENT tokens against the same broker — required for +// driving the name-claimed-under-different-token rejection path. +func newBrokerFixtureWithTokens(t *testing.T, tokens []string) *brokerFixture { + t.Helper() + st, err := store.Open(":memory:") + if err != nil { + t.Fatalf("open store: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + srv := broker.NewServer(broker.NewAuthenticator(tokens), broker.NewRegistry(), st, nil) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + return &brokerFixture{ + t: t, + st: st, + srv: srv, + hs: hs, + wsURL: "ws" + strings.TrimPrefix(hs.URL, "http"), + } +} diff --git a/internal/adapter/client.go b/internal/adapter/client.go index 89ab9dd..7525b4a 100644 --- a/internal/adapter/client.go +++ b/internal/adapter/client.go @@ -59,6 +59,15 @@ var ( // Envelope.ID would ack the original signed id and never clear a // broadcast row, redelivering it forever). 
ErrMissingDeliveryKey = errors.New("adapter: broker deliver frame missing delivery_key") + // ErrNameClaimed signals that a register attempt was rejected because + // the chosen name is already bound under a DIFFERENT bearer token + // (broker's registry.ErrNameClaimed). The cc adapter uses this to + // trigger a name-rotation retry on startup (collision-safety backstop + // for the friendly-name generator — see channel.UniqueName). It is + // surfaced by detecting the broker's StatusPolicyViolation + + // "name claimed" close reason because the broker closes the WS rather + // than sending a structured rejection frame. + ErrNameClaimed = errors.New("adapter: peer name already claimed under a different token") ) // Client is a single broker WebSocket connection for one peer. It performs @@ -152,6 +161,17 @@ func (c *Client) Connect(ctx context.Context) error { typ, data, err := ws.Read(ctx) if err != nil { _ = ws.CloseNow() + // Map the broker's "name claimed under a different token" close + // (StatusPolicyViolation + matching reason) onto the typed + // ErrNameClaimed so callers can react (the cc adapter rotates + // to a fresh friendly name). Other policy-violations (bad token, + // empty name, bad protocol_version) remain opaque — they are + // configuration errors, not collision-retry candidates. 
+ var ce websocket.CloseError + if errors.As(err, &ce) && ce.Code == websocket.StatusPolicyViolation && + ce.Reason == "name claimed under a different token" { + return fmt.Errorf("%w: %v", ErrNameClaimed, err) + } return fmt.Errorf("adapter: register rejected or no ack: %w", err) } if typ != websocket.MessageText { diff --git a/internal/adapter/generic.go b/internal/adapter/generic.go index 229e31c..91bc935 100644 --- a/internal/adapter/generic.go +++ b/internal/adapter/generic.go @@ -105,24 +105,31 @@ func (b *genericBus) Broadcast(ctx context.Context, body json.RawMessage) error // the WS connection, so this must NOT read frames itself (two readers split // frames and deadlock). It installs a one-shot sink the Recv loop forwards // the peers reply to, writes the request, and waits for the sink. -func (b *genericBus) Peers(ctx context.Context) ([]string, error) { +// +// Returns (self, peers, err). self is THIS adapter's bound name (b.rc.Name() +// — stable across reconnects); peers is the broker registry with self +// filtered out so the consuming agent does not see itself in its own peer +// list. The combined {self, peers} shape lets bus.peers also serve as +// whoami without a second tool call. 
+func (b *genericBus) Peers(ctx context.Context) (string, []string, error) { + self := b.rc.Name() c := b.rc.Client() if c == nil { - return nil, ErrNotConnected + return self, nil, ErrNotConnected } sink := make(chan []string, 1) c.SetPeersSink(sink) defer c.SetPeersSink(nil) if err := c.RequestPeers(ctx); err != nil { - return nil, err + return self, nil, err } select { case names := <-sink: - return names, nil + return self, filterSelf(names, self), nil case <-ctx.Done(): - return nil, ctx.Err() + return self, nil, ctx.Err() case <-time.After(peersReplyTimeout): - return nil, fmt.Errorf("adapter: peers reply timed out") + return self, nil, fmt.Errorf("adapter: peers reply timed out") } } diff --git a/internal/adapter/reconnect.go b/internal/adapter/reconnect.go index 8883ed6..3f5e973 100644 --- a/internal/adapter/reconnect.go +++ b/internal/adapter/reconnect.go @@ -67,6 +67,15 @@ type ResumingClient struct { // previous fully-constructed pointer or the new one, never a torn or // partially-published value. (CRITICAL-3 data-race fix.) cur atomic.Pointer[Client] + + // onConnect, if non-nil, is invoked after every successful (re)register + // — once for the initial connect and once per reconnect. The cc adapter + // uses this to re-emit its claude/channel self-announce so the + // consuming session sees its name even if a mid-session broker drop + // forced a redial. Fired in a fresh goroutine so a slow callback (e.g. + // one waiting on MCP notifications/initialized) cannot block the + // resume loop's Recv pump. + onConnect atomic.Pointer[func()] } // NewResumingClient builds a ResumingClient. dedupeSize bounds the shared @@ -91,8 +100,48 @@ func (rc *ResumingClient) Dedupe() *Dedupe { return rc.dedupe } // redial; callers should treat ErrNotConnected as transient. 
func (rc *ResumingClient) Client() *Client { return rc.cur.Load() } +// SetOnConnect installs a callback fired after every successful (re)register +// — both the initial register and every subsequent reconnect. Pass nil to +// clear. The callback runs in its own goroutine so a blocking callback (e.g. +// one waiting on MCP notifications/initialized) does not stall the resume +// loop's Recv pump. Calling SetOnConnect concurrently with Run is safe; the +// next successful connect observes the latest value. +func (rc *ResumingClient) SetOnConnect(fn func()) { + if fn == nil { + rc.onConnect.Store(nil) + return + } + rc.onConnect.Store(&fn) +} + +// Name returns the peer name this resuming client registers under. It is +// stable across reconnects (same-name re-register is the resume mechanism) +// and is the value the broker sees as the "from" of every outbound +// envelope. Useful for self-identification (bus.peers' {self, peers} +// shape) without an extra broker round-trip. +func (rc *ResumingClient) Name() string { return rc.cfg.Name } + +// filterSelf returns names with self removed (used so bus.peers does not +// list the caller in its own peer list). Allocates a new slice; the input +// is left untouched. +func filterSelf(names []string, self string) []string { + if self == "" { + return names + } + out := make([]string, 0, len(names)) + for _, n := range names { + if n != self { + out = append(out, n) + } + } + return out +} + // connect (re)establishes a live Client with bounded backoff, returning -// once connected or when ctx is done. +// once connected or when ctx is done. A name-claimed-under-different-token +// rejection is surfaced verbatim (ErrNameClaimed) so the embedding mode can +// rotate to a fresh name rather than spinning the redial loop forever +// against a permanent rejection. 
func (rc *ResumingClient) connect(ctx context.Context) (*Client, error) { backoff := baseBackoff for { @@ -102,8 +151,15 @@ func (rc *ResumingClient) connect(ctx context.Context) (*Client, error) { // Publish the fully-constructed, connected Client with a single // atomic Store (happens-before for any concurrent Client()). rc.cur.Store(c) + if cb := rc.onConnect.Load(); cb != nil { + // Fire in its own goroutine — see SetOnConnect rationale. + go (*cb)() + } return c, nil } + if errors.Is(err, ErrNameClaimed) { + return nil, err + } if ctx.Err() != nil { return nil, ctx.Err() } diff --git a/internal/broker/config.go b/internal/broker/config.go index dc1e3ee..57e0abb 100644 --- a/internal/broker/config.go +++ b/internal/broker/config.go @@ -41,9 +41,15 @@ const ( EnvDBPath = "PEERBUS_DB" ) -// DefaultListenAddr is used when neither the struct nor the environment sets a -// listen address. -const DefaultListenAddr = "127.0.0.1:8080" +// DefaultListenAddr is used when neither the struct nor the environment sets +// a listen address. 47821 was chosen because it is outside the IANA +// well-known/registered range hot-spots (8080 is the prototypical "I'm +// already running a tutorial here" port), is absent from macOS +// /etc/services, and sits below the IANA ephemeral range (49152-65535) so a +// bound listener rarely races a randomly-assigned outbound socket there (Linux's default range, 32768-60999, does include it). Bound +// to loopback by default — operators wanting cross-host access set +// PEERBUS_LISTEN=0.0.0.0:47821 explicitly. +const DefaultListenAddr = "127.0.0.1:47821" // parseTokens splits a comma-separated token list, trimming whitespace and // dropping empties. 
diff --git a/internal/broker/config_test.go b/internal/broker/config_test.go new file mode 100644 index 0000000..951e2b0 --- /dev/null +++ b/internal/broker/config_test.go @@ -0,0 +1,67 @@ +package broker + +import ( + "strings" + "testing" + + bhmac "github.com/nnemirovsky/peerbus/internal/hmac" +) + +// validSecret is a non-secret 32-byte fixture matching the hmac minimum. +func validSecret() []byte { + return []byte(strings.Repeat("config-test-", 4)[:bhmac.MinSecretLen]) +} + +// TestDefaultListenAddr documents the broker's default listen address. The +// constant is part of the operator contract (README + deploy manifest) so a +// silent change here would mis-document the deployment. +func TestDefaultListenAddr(t *testing.T) { + if DefaultListenAddr != "127.0.0.1:47821" { + t.Fatalf("DefaultListenAddr = %q, want 127.0.0.1:47821", DefaultListenAddr) + } +} + +// TestLoadConfig_DefaultListenAddrApplied: a Config that omits ListenAddr +// (and no PEERBUS_LISTEN env override) gets DefaultListenAddr applied. The +// rest of the required fields must be valid or LoadConfig errors before +// reaching the default-fill branch. +func TestLoadConfig_DefaultListenAddrApplied(t *testing.T) { + // Defensively unset env that LoadConfig would honour. + t.Setenv("PEERBUS_LISTEN", "") + t.Setenv("PEERBUS_TOKENS", "") + t.Setenv("PEERBUS_HMAC_SECRET", "") + t.Setenv("PEERBUS_DB", "") + + cfg, err := LoadConfig(Config{ + Tokens: []string{"tok"}, + HMACSecret: validSecret(), + }) + if err != nil { + t.Fatalf("LoadConfig: %v", err) + } + if cfg.ListenAddr != DefaultListenAddr { + t.Fatalf("ListenAddr = %q, want %q", cfg.ListenAddr, DefaultListenAddr) + } +} + +// TestLoadConfig_EnvListenOverride: PEERBUS_LISTEN, when set, overrides +// both the struct default and the constant DefaultListenAddr (env-overrides +// -struct precedence — see LoadConfig docs). 
+func TestLoadConfig_EnvListenOverride(t *testing.T) { + t.Setenv("PEERBUS_LISTEN", "0.0.0.0:9001") + t.Setenv("PEERBUS_TOKENS", "") + t.Setenv("PEERBUS_HMAC_SECRET", "") + t.Setenv("PEERBUS_DB", "") + + cfg, err := LoadConfig(Config{ + ListenAddr: "127.0.0.1:1234", + Tokens: []string{"tok"}, + HMACSecret: validSecret(), + }) + if err != nil { + t.Fatalf("LoadConfig: %v", err) + } + if cfg.ListenAddr != "0.0.0.0:9001" { + t.Fatalf("ListenAddr = %q, want env override 0.0.0.0:9001", cfg.ListenAddr) + } +} diff --git a/internal/channel/channel.go b/internal/channel/channel.go index f926413..92fb0eb 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -20,13 +20,29 @@ // by the SHARED internal/adapter machinery — see internal/adapter/cc.go) is // mapped to a JSON-RPC notification `notifications/claude/channel` with // -// params = { content: , -// meta: { from, source, msg_id } } (all meta values strings) +// params = { content: , +// meta: { from, source, msg_id, kind } } // // emitted via mcp.Server.Notify (the additive server->client path). meta // keys are identifier-safe (letters/digits/underscore only) per // CHANNELS_SCHEMA.md §3 — keys with hyphens are silently dropped by Claude -// Code, so we use from / source / msg_id. +// Code, so we use from / source / msg_id / kind. The content shape is a +// single line `📨 from : ""` — flat by design +// because Claude Code's renderer collapses embedded newlines into spaces +// and then truncates with an ellipsis, so a multi-line banner is wasted +// vertical space. Claude Code's UI prefixes the notification with the MCP +// server name (rendered as `peerbus: `), so the word "peerbus" +// inside the content would be duplicated noise — it is omitted. See +// formatInbound. 
+// +// On every successful broker (re)register the cc adapter emits ONE +// system-kind notification (kind="system", content "📡 connected as +// ") so the consuming agent immediately knows its own bus name +// without an explicit bus.whoami round-trip — see AnnounceSelf. The push is +// gated on the MCP client having sent notifications/initialized: Claude +// Code silently drops claude/channel notifications received before the +// handshake completes (CHANNELS_SCHEMA.md §3), so a pre-handshake announce +// would never reach turn 1 of the session. // // Outbound (reply path): standard MCP tools/list + tools/call exposing // bus.send / bus.broadcast / bus.peers — the SAME tool surface and @@ -45,7 +61,6 @@ package channel import ( "context" "crypto/rand" - "encoding/hex" "encoding/json" "fmt" "io" @@ -58,21 +73,31 @@ import ( // to. internal/adapter/cc.go implements it over the shared resuming broker // client (HMAC sign + reconnect/resume) — the channel layer never touches // the broker, HMAC, or dedupe itself. +// +// Peers returns (self, peers, err): self is THIS adapter's bound peer name +// (so bus.peers can echo it back in the shaped {self, peers} result without +// a separate bus.whoami round-trip); peers is the broker registry sans the +// caller's own entry (filtered at the bus implementation — the broker +// returns the full registry including this peer, and exposing yourself in +// "peers" is confusing for the consuming agent). type OutboundBus interface { Send(ctx context.Context, to string, body json.RawMessage) error Broadcast(ctx context.Context, body json.RawMessage) error - Peers(ctx context.Context) ([]string, error) + Peers(ctx context.Context) (self string, peers []string, err error) } // Inbound is one already-HMAC-verified, already-deduped delivery the cc // adapter pushes into the session. Source is the envelope `source` (e.g. // "peer-bus" — the tag the consuming agent's prompt keys escalation off; -// peerbus itself has no such logic). 
Body is the opaque application JSON -// verbatim. +// peerbus itself has no such logic). Kind is the envelope `kind` ("msg" or +// "broadcast") so the channel layer can surface it as the XML +// kind attribute without re-decoding the body. Body is the opaque +// application JSON verbatim. type Inbound struct { ID string From string Source string + Kind string Body json.RawMessage } @@ -100,7 +125,7 @@ func (b busAdapter) Broadcast(ctx context.Context, body json.RawMessage) error { return b.out.Broadcast(ctx, body) } -func (b busAdapter) Peers(ctx context.Context) ([]string, error) { +func (b busAdapter) Peers(ctx context.Context) (string, []string, error) { return b.out.Peers(ctx) } @@ -132,51 +157,170 @@ func (s *Server) Serve(ctx context.Context) error { return s.mcp.Serve(ctx) } // Deliver maps one inbound broker delivery to a claude/channel push-wake // notification and emits it (DOCUMENTED — CHANNELS_SCHEMA.md §3). content is -// the message body as text; meta carries identifier-safe string attributes -// (from / source / msg_id) that Claude Code surfaces as XML -// attributes. The body is opaque JSON: if it is a JSON string we unwrap it -// to its text so the session sees plain text, otherwise the compact JSON is -// passed through verbatim. +// a single-line human-readable summary of the inbound message (see +// formatInbound); meta carries identifier-safe string attributes +// (from / source / msg_id / kind) that Claude Code surfaces as XML +// attributes. +// +// "kind" is "msg" for direct messages and "broadcast" for fan-outs; the +// channel layer takes the kind from the inbound envelope so the consuming +// agent's prompt can branch on it without re-parsing the body. 
func (s *Server) Deliver(in Inbound) { s.mcp.Notify(PushMethod, PushParams{ - Content: bodyAsText(in.Body), + Content: formatInbound(in), Meta: map[string]string{ "from": in.From, "source": in.Source, "msg_id": in.ID, + "kind": in.Kind, + }, + }) +} + +// AnnounceSelf emits a single system-kind claude/channel notification +// telling the session what peer name this adapter bound under. meta.kind is +// "system" so the consuming agent can ignore it from human-style escalation +// logic. +// +// CALLER CONTRACT: this is the unconditional push primitive. The cc adapter +// MUST gate the call on Initialized() — Claude Code silently drops +// server-initiated notifications received before the client signals +// notifications/initialized (CHANNELS_SCHEMA.md §3), so a pre-handshake +// announce never reaches turn 1 of the session. See +// internal/adapter/cc.go's Run for the wiring. +func (s *Server) AnnounceSelf(self string) { + s.mcp.Notify(PushMethod, PushParams{ + Content: fmt.Sprintf("\U0001F4E1 connected as %s", self), + Meta: map[string]string{ + "kind": "system", + "self": self, }, }) } -// bodyAsText renders an opaque body for the channel `content` string. A JSON -// string body is unwrapped to its raw text (so Claude sees the message, not -// a quoted JSON literal); anything else is passed through as compact JSON. -func bodyAsText(body json.RawMessage) string { +// Initialized returns a channel that is closed once the MCP client has sent +// notifications/initialized (the handshake completion signal). The cc +// adapter's startup self-announce waits on this before pushing — see +// AnnounceSelf's caller contract. +func (s *Server) Initialized() <-chan struct{} { return s.mcp.Initialized() } + +// formatInbound renders the single-line channel content from one inbound +// delivery. Shape: `📨 from : ""`. 
The format is +// flat by design — Claude Code's renderer collapses embedded newlines into +// spaces and then truncates with an ellipsis, so a multi-line banner is +// wasted vertical space (observed live in 2-session test). Claude Code's +// UI also prefixes the notification with the MCP server name (rendered as +// `peerbus: `), so the word "peerbus" inside the body would be +// duplicated noise — it is omitted. The "kind" token is "msg" for direct +// messages and "broadcast" for fan-outs (defaults to "msg" if unset for +// safety — an older sender that pre-dates the kind field still renders +// cleanly). The decoded body is wrapped in literal ASCII double-quotes; +// any `"` inside the body is left as-is (readability over correctness for +// a banner string). +func formatInbound(in Inbound) string { + kind := in.Kind + if kind == "" { + kind = "msg" + } + return fmt.Sprintf("\U0001F4E8 %s from %s: \"%s\"", kind, in.From, decodeBody(in.Body)) +} + +// decodeBody renders an opaque body for the pretty channel content. Rules, +// applied IN ORDER: +// +// 1. body is a JSON string -> unwrap to the inner string. +// 2. body is a JSON object containing "text" / "message" / "content" +// (first match wins) -> use that field's value (stringify if non-string). +// 3. otherwise -> compact-encode the body JSON as-is. +// +// An empty body renders as the empty string. +func decodeBody(body json.RawMessage) string { if len(body) == 0 { return "" } + // (1) plain JSON string. var s string if err := json.Unmarshal(body, &s); err == nil { return s } + // (2) JSON object with a known text-bearing field. + var obj map[string]json.RawMessage + if err := json.Unmarshal(body, &obj); err == nil { + for _, key := range []string{"text", "message", "content"} { + if raw, ok := obj[key]; ok { + var str string + if err := json.Unmarshal(raw, &str); err == nil { + return str + } + // non-string field value: pass through as compact JSON. 
+ return string(raw) + } + } + } + // (3) compact JSON verbatim. return string(body) } -// UniqueName generates a stable-ish unique peer name for auto-registration -// (cc2cc-parity ergonomics) when no name is configured. Scheme: -// -// cc---<6 hex random> +// nameSuffixAlphabet is the base36 alphabet used for the 3-char random +// suffix in UniqueName. Lowercase to match the rest of the name. +const nameSuffixAlphabet = "0123456789abcdefghijklmnopqrstuvwxyz" + +// nameSuffixLen is the length of the random suffix in a generated name. +// 3 chars in base36 = 46656 distinct suffixes per (adjective, noun) pair. +// Combined with ~200 adjectives and ~200 nouns the total keyspace is +// ~1.86 billion combinations, so a same-token collision (two adapters +// independently minting the same name and then needing to reconcile via +// the registry's same-token takeover) is essentially impossible. The +// adapter's collision-retry loop on ErrNameClaimed (a different-token +// claim) is therefore a defence-in-depth backstop, not the primary safety +// mechanism — see internal/adapter/cc.go. +const nameSuffixLen = 3 + +// UniqueName mints a friendly, lowercase peer name in the shape +// "--<3 base36>" (e.g. "wild-wasp-3kx"). It honours the +// PEERBUS_NAME environment variable verbatim when set (operator override), +// otherwise draws fresh entropy via crypto/rand. // -// hostname+pid make it readable and naturally distinct per session/host; -// the random suffix breaks ties if two sessions on the same host race the -// same pid reuse across a restart. Documented here so the scheme is part of -// the contract, not an implementation accident. +// The scheme replaces the older "cc---" identifier — +// friendlier to read in logs / lists / Claude Code's tag while +// keeping a huge keyspace. 
func UniqueName() string { - host, err := os.Hostname() - if err != nil || host == "" { - host = "unknown" + if override := os.Getenv("PEERBUS_NAME"); override != "" { + return override + } + return generateName() +} + +// generateName produces a fresh "--<3 base36>" name. +// Split out of UniqueName so the env-override path can be tested +// independently of the random draw. +func generateName() string { + adj := pickWord(nameAdjectives) + noun := pickWord(nameNouns) + return fmt.Sprintf("%s-%s-%s", adj, noun, randomSuffix()) +} + +// pickWord picks one entry near-uniformly at random from words. The list must +// be non-empty (the curated wordlist.go guarantees this). +func pickWord(words []string) string { + var b [8]byte + _, _ = rand.Read(b[:]) + // 64-bit index: reducing it modulo a non-power-of-two length introduces a + // negligible bias for our list sizes (~200), well below the noise + // floor of name collisions we already tolerate. + n := uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | + uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return words[n%uint64(len(words))] +} + +// randomSuffix returns a nameSuffixLen-char lowercase base36 string drawn +// from crypto/rand. 
+func randomSuffix() string { + var b [nameSuffixLen]byte + _, _ = rand.Read(b[:]) + out := make([]byte, nameSuffixLen) + for i, v := range b { + out[i] = nameSuffixAlphabet[int(v)%len(nameSuffixAlphabet)] } - var r [3]byte - _, _ = rand.Read(r[:]) - return fmt.Sprintf("cc-%s-%d-%s", host, os.Getpid(), hex.EncodeToString(r[:])) + return string(out) } diff --git a/internal/channel/channel_test.go b/internal/channel/channel_test.go index 52868e8..d251728 100644 --- a/internal/channel/channel_test.go +++ b/internal/channel/channel_test.go @@ -78,24 +78,31 @@ func (b *ccBus) Broadcast(ctx context.Context, body json.RawMessage) error { return c.Broadcast(ctx, "bc-"+time.Now().Format("150405.000000000"), time.Now().UTC().Format(time.RFC3339Nano), "peer-bus", body) } -func (b *ccBus) Peers(ctx context.Context) ([]string, error) { +func (b *ccBus) Peers(ctx context.Context) (string, []string, error) { + self := b.rc.Name() c := b.rc.Client() if c == nil { - return nil, adapter.ErrNotConnected + return self, nil, adapter.ErrNotConnected } sink := make(chan []string, 1) c.SetPeersSink(sink) defer c.SetPeersSink(nil) if err := c.RequestPeers(ctx); err != nil { - return nil, err + return self, nil, err } select { case names := <-sink: - return names, nil + out := make([]string, 0, len(names)) + for _, n := range names { + if n != self { + out = append(out, n) + } + } + return self, out, nil case <-ctx.Done(): - return nil, ctx.Err() + return self, nil, ctx.Err() case <-time.After(5 * time.Second): - return nil, context.DeadlineExceeded + return self, nil, context.DeadlineExceeded } } @@ -128,7 +135,10 @@ func newHarness(t *testing.T, f *brokerFixture, name string) *harness { go func() { defer close(loopDone) _ = rc.Run(ctx, func(_ context.Context, env wire.Envelope) error { - srv.Deliver(channel.Inbound{ID: env.ID, From: env.From, Source: env.Source, Body: env.Body}) + srv.Deliver(channel.Inbound{ + ID: env.ID, From: env.From, Source: env.Source, + Kind: string(env.Kind), 
Body: env.Body, + }) return nil }) }() @@ -171,7 +181,7 @@ func newHarness(t *testing.T, f *brokerFixture, name string) *harness { // wait for a live broker connection so injected messages are not lost. deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { - if _, err := bus.Peers(ctx); err == nil { + if _, _, err := bus.Peers(ctx); err == nil { break } time.Sleep(20 * time.Millisecond) @@ -363,8 +373,11 @@ func TestNotificationMapping(t *testing.T) { if pf.Method != "notifications/claude/channel" { t.Fatalf("method = %q, want notifications/claude/channel", pf.Method) } - if pf.Params.Content != "hello from tx" { - t.Fatalf("content = %q, want %q", pf.Params.Content, "hello from tx") + // Single-line content: `📨 from : ""`. Body is a JSON + // string ("hello from tx"), so decodeBody unwraps it to that text. + want := "\U0001F4E8 msg from tx: \"hello from tx\"" + if pf.Params.Content != want { + t.Fatalf("content = %q, want %q", pf.Params.Content, want) } if pf.Params.Meta["from"] != "tx" { t.Fatalf("meta.from = %q, want tx", pf.Params.Meta["from"]) @@ -375,6 +388,9 @@ func TestNotificationMapping(t *testing.T) { if pf.Params.Meta["msg_id"] != "msg-1" { t.Fatalf("meta.msg_id = %q, want msg-1", pf.Params.Meta["msg_id"]) } + if pf.Params.Meta["kind"] != "msg" { + t.Fatalf("meta.kind = %q, want msg", pf.Params.Meta["kind"]) + } // Re-send the SAME id: broker dedups by UNIQUE(id) so it never even // reaches the adapter again -> no second push. 
@@ -395,7 +411,8 @@ func TestNotificationMapping(t *testing.T) { if err := json.Unmarshal(frame2, &pf2); err != nil { t.Fatalf("push2 decode: %v (%s)", err, frame2) } - if pf2.Params.Content != "second" || pf2.Params.Meta["msg_id"] != "msg-2" { + if !strings.Contains(pf2.Params.Content, `: "second"`) || + pf2.Params.Meta["msg_id"] != "msg-2" { t.Fatalf("unexpected second push: %s", frame2) } } @@ -440,8 +457,8 @@ func TestForgedInboundSkipped(t *testing.T) { if err := json.Unmarshal(frame, &pf); err != nil { t.Fatalf("legit push decode: %v (%s)", err, frame) } - if pf.Params.Content != "legit" { - t.Fatalf("content = %q, want legit", pf.Params.Content) + if !strings.Contains(pf.Params.Content, `: "legit"`) { + t.Fatalf("content = %q, want body 'legit' in single-line format", pf.Params.Content) } } @@ -457,17 +474,23 @@ func TestOutboundTools(t *testing.T) { h := newHarness(t, f, "cc-sender") - // bus.peers lists the registry. + // bus.peers lists the registry as {self, peers}. st, isErr, rpcErr := h.callTool("bus.peers", nil) if rpcErr != nil || isErr { t.Fatalf("bus.peers failed: rpcErr=%v isErr=%v", rpcErr, isErr) } + if self, _ := st["self"].(string); self != "cc-sender" { + t.Fatalf("bus.peers self = %v, want cc-sender", st["self"]) + } peers, _ := st["peers"].([]any) found := false for _, p := range peers { if p == "peer2" { found = true } + if p == "cc-sender" { + t.Fatalf("bus.peers must not include self; got %v", peers) + } } if !found { t.Fatalf("bus.peers missing peer2: %v", st["peers"]) @@ -521,7 +544,9 @@ func TestOutboundTools(t *testing.T) { } } -// TestUniqueName: auto-registration mints distinct, non-empty names. +// TestUniqueName: auto-registration mints distinct, lowercase +// --<3 base36> names. The exact corpus is intentionally +// an implementation detail; only the shape is contractual. 
func TestUniqueName(t *testing.T) { a := channel.UniqueName() b := channel.UniqueName() @@ -529,9 +554,353 @@ func TestUniqueName(t *testing.T) { t.Fatalf("UniqueName returned empty (a=%q b=%q)", a, b) } if a == b { - t.Fatalf("UniqueName not unique: %q == %q", a, b) + t.Fatalf("UniqueName not unique across two calls: %q == %q", a, b) + } + for _, n := range []string{a, b} { + parts := strings.Split(n, "-") + if len(parts) != 3 { + t.Fatalf("UniqueName %q: want three hyphen-separated parts, got %d", n, len(parts)) + } + if got := len(parts[2]); got != 3 { + t.Fatalf("UniqueName %q: suffix len = %d, want 3", n, got) + } + for _, p := range parts { + if p == "" { + t.Fatalf("UniqueName %q: empty segment", n) + } + for _, r := range p { + if (r < 'a' || r > 'z') && (r < '0' || r > '9') { + t.Fatalf("UniqueName %q: non-[a-z0-9] char %q", n, r) + } + } + } + } +} + +// TestUniqueName_EnvOverride: PEERBUS_NAME, when set, is honoured verbatim +// (operator escape hatch — bypasses the friendly-name generator). +func TestUniqueName_EnvOverride(t *testing.T) { + t.Setenv("PEERBUS_NAME", "fixed-operator-name") + if got := channel.UniqueName(); got != "fixed-operator-name" { + t.Fatalf("PEERBUS_NAME override: got %q, want fixed-operator-name", got) + } +} + +// TestAnnounceSelf: the startup self-announcement is ONE +// notifications/claude/channel push with the correct content + meta +// (kind=system, self=). The cc adapter fires this once per session +// after a successful register so the consuming Claude session immediately +// knows its identity. 
+func TestAnnounceSelf(t *testing.T) { + f := newBrokerFixture(t) + h := newHarness(t, f, "cc-announce") + h.srv.AnnounceSelf("cc-announce") + frame := h.readFrame() + var pf pushFrame + if err := json.Unmarshal(frame, &pf); err != nil { + t.Fatalf("decode: %v (%s)", err, frame) + } + if pf.Method != "notifications/claude/channel" { + t.Fatalf("method = %q, want notifications/claude/channel", pf.Method) + } + want := "\U0001F4E1 connected as cc-announce" + if pf.Params.Content != want { + t.Fatalf("content = %q, want %q", pf.Params.Content, want) } - if !strings.HasPrefix(a, "cc-") { - t.Fatalf("UniqueName missing cc- prefix: %q", a) + if pf.Params.Meta["kind"] != "system" { + t.Fatalf("meta.kind = %q, want system", pf.Params.Meta["kind"]) + } + if pf.Params.Meta["self"] != "cc-announce" { + t.Fatalf("meta.self = %q, want cc-announce", pf.Params.Meta["self"]) + } +} + +// ── self-announce gating: mirrors internal/adapter/cc.go wiring ── +// +// gatedHarness spins up the same triple as ccMode.Run — a real channel.Server +// over pipes, a real ResumingClient against a real in-process broker, the +// resume loop wired to forward inbound deliveries as channel pushes, AND the +// SetOnConnect(<-srv.Initialized()) -> AnnounceSelf gate that is the subject +// of these tests. The host side does NOT auto-send notifications/initialized +// (unlike newHarness); each test drives the handshake explicitly so it can +// observe what does and does not arrive before initialized. 
+type gatedHarness struct { + t *testing.T + in *io.PipeWriter + out *bufio.Reader + frames chan json.RawMessage + srv *channel.Server + rc *adapter.ResumingClient + stop func() + cancel context.CancelFunc + loopDone <-chan struct{} + nextID int +} + +func newGatedHarness(t *testing.T, f *brokerFixture, name string) *gatedHarness { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + + rc := adapter.NewResumingClient(f.cfg(name), 64) + bus := &ccBus{rc: rc} + + inR, inW := io.Pipe() + outR, outW := io.Pipe() + srv := channel.NewServer(bus, inR, outW) + + // EXACTLY the gate cc.go installs: announce on every successful + // (re)register, but only after MCP notifications/initialized. + initialized := srv.Initialized() + rc.SetOnConnect(func() { + select { + case <-initialized: + case <-ctx.Done(): + return + } + srv.AnnounceSelf(rc.Name()) + }) + + loopDone := make(chan struct{}) + go func() { + defer close(loopDone) + _ = rc.Run(ctx, func(_ context.Context, env wire.Envelope) error { + srv.Deliver(channel.Inbound{ + ID: env.ID, From: env.From, Source: env.Source, + Kind: string(env.Kind), Body: env.Body, + }) + return nil + }) + }() + + serveDone := make(chan struct{}) + go func() { + defer close(serveDone) + _ = srv.Serve(ctx) + }() + + h := &gatedHarness{ + t: t, in: inW, out: bufio.NewReader(outR), + frames: make(chan json.RawMessage, 16), + srv: srv, rc: rc, cancel: cancel, loopDone: loopDone, + } + + readerDone := make(chan struct{}) + go func() { + defer close(readerDone) + for { + line, err := h.out.ReadBytes('\n') + if len(line) > 0 { + h.frames <- json.RawMessage(strings.TrimRight(string(line), "\r\n")) + } + if err != nil { + return + } + } + }() + + h.stop = func() { + _ = inW.Close() + cancel() + <-serveDone + <-loopDone + _ = outW.Close() + <-readerDone + } + t.Cleanup(h.stop) + return h +} + +func (h *gatedHarness) sendReq(method string, params any) json.RawMessage { + h.t.Helper() + h.nextID++ + req := map[string]any{"jsonrpc": 
"2.0", "id": h.nextID, "method": method} + if params != nil { + req["params"] = params + } + b, _ := json.Marshal(req) + if _, err := h.in.Write(append(b, '\n')); err != nil { + h.t.Fatalf("write %s: %v", method, err) + } + return h.readFrame() +} + +func (h *gatedHarness) notify(method string) { + b, _ := json.Marshal(map[string]any{"jsonrpc": "2.0", "method": method}) + if _, err := h.in.Write(append(b, '\n')); err != nil { + h.t.Fatalf("notify %s: %v", method, err) + } +} + +func (h *gatedHarness) readFrame() json.RawMessage { + h.t.Helper() + select { + case f := <-h.frames: + return f + case <-time.After(5 * time.Second): + h.t.Fatalf("timed out waiting for a JSON-RPC frame") + return nil + } +} + +func (h *gatedHarness) readFrameNoFail(d time.Duration) (json.RawMessage, bool) { + select { + case f := <-h.frames: + return f, true + case <-time.After(d): + return nil, false + } +} + +// waitForRegister waits until the resuming client has a live broker +// connection (so the OnConnect callback is guaranteed to have fired at least +// once). Mirrors newHarness's bus.Peers ping but works without the MCP +// handshake having completed (peers is an outbound broker RPC, not an MCP +// tool call here). +func (h *gatedHarness) waitForRegister() { + h.t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if h.rc.Client() != nil { + return + } + time.Sleep(20 * time.Millisecond) + } + h.t.Fatalf("resuming client never registered") +} + +// TestAnnounceSelfGatedOnInitialized: AnnounceSelf MUST NOT fire before the +// MCP client has sent notifications/initialized. Even after a successful +// broker register (OnConnect has fired), no claude/channel push reaches +// stdout until the handshake completes — Claude Code silently drops +// server-initiated notifications received before initialized +// (CHANNELS_SCHEMA.md §3) and we'd be writing them into the void. 
+func TestAnnounceSelfGatedOnInitialized(t *testing.T) { + f := newBrokerFixture(t) + h := newGatedHarness(t, f, "cc-gated") + h.waitForRegister() + + // Broker register is complete; OnConnect has been invoked. NO MCP + // handshake has happened yet. Assert nothing arrives. + if frame, ok := h.readFrameNoFail(400 * time.Millisecond); ok { + t.Fatalf("announce leaked before notifications/initialized: %s", frame) + } + + // Now drive the handshake; the announce should land promptly. + resp := h.sendReq("initialize", map[string]any{"protocolVersion": "2025-06-18"}) + if !strings.Contains(string(resp), `"protocolVersion"`) { + t.Fatalf("initialize response unexpected: %s", resp) + } + h.notify("notifications/initialized") + + frame := h.readFrame() + var pf pushFrame + if err := json.Unmarshal(frame, &pf); err != nil { + t.Fatalf("announce decode: %v (%s)", err, frame) + } + if pf.Method != "notifications/claude/channel" { + t.Fatalf("method = %q, want notifications/claude/channel", pf.Method) + } + if pf.Params.Meta["kind"] != "system" || pf.Params.Meta["self"] != "cc-gated" { + t.Fatalf("meta = %v, want kind=system self=cc-gated", pf.Params.Meta) + } + if want := "\U0001F4E1 connected as cc-gated"; pf.Params.Content != want { + t.Fatalf("content = %q, want %q", pf.Params.Content, want) + } + + // Exactly ONE announce per register. No spurious second push. + if frame, ok := h.readFrameNoFail(300 * time.Millisecond); ok { + t.Fatalf("second announce after single register: %s", frame) + } +} + +// TestAnnounceSelfReannouncesOnReconnect: a broker drop + redial under the +// same name triggers a SECOND announce. The first connect's announce already +// fired (the session is past initialized), the resume loop reconnects on +// drop and OnConnect runs again — the wait on initialized is a no-op the +// second time so the banner lands immediately. Keeps the connected-as line +// reliable across mid-session broker flaps. 
+func TestAnnounceSelfReannouncesOnReconnect(t *testing.T) { + f := newBrokerFixture(t) + h := newGatedHarness(t, f, "cc-reconnect") + h.waitForRegister() + + // Complete the handshake; consume the first announce. + _ = h.sendReq("initialize", map[string]any{"protocolVersion": "2025-06-18"}) + h.notify("notifications/initialized") + first := h.readFrame() + var pf1 pushFrame + if err := json.Unmarshal(first, &pf1); err != nil { + t.Fatalf("first announce decode: %v (%s)", err, first) + } + if pf1.Params.Meta["self"] != "cc-reconnect" { + t.Fatalf("first announce self = %q, want cc-reconnect", pf1.Params.Meta["self"]) + } + + // Force a transport drop by closing the current Client. The resume + // loop redials under the same name and OnConnect fires again. + if c := h.rc.Client(); c != nil { + c.Close() + } + + // A second announce MUST arrive after the redial succeeds. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + frame, ok := h.readFrameNoFail(200 * time.Millisecond) + if !ok { + continue + } + var pf2 pushFrame + if err := json.Unmarshal(frame, &pf2); err != nil { + t.Fatalf("decode reconnect frame: %v (%s)", err, frame) + } + // Skip any pre-existing buffered frames (none expected here). + if pf2.Method == "notifications/claude/channel" && + pf2.Params.Meta["kind"] == "system" && + pf2.Params.Meta["self"] == "cc-reconnect" { + return + } + t.Fatalf("unexpected frame after reconnect: %s", frame) + } + t.Fatalf("no re-announce within 3s of reconnect") +} + +// TestPrettyContentDecoding exercises the three decode branches of the +// single-line content body decoder via direct Server.Deliver calls (the +// broker path is covered by the live-server tests above). Each branch maps +// the opaque body JSON to the quoted body in the single-line content. Also +// asserts the `broadcast` kind token renders correctly. 
+func TestPrettyContentDecoding(t *testing.T) { + cases := []struct { + name string + kind string + body string + want string // expected full content line + }{ + {"string-body", "msg", `"plain hello"`, "\U0001F4E8 msg from tx: \"plain hello\""}, + {"object-text-field", "msg", `{"text":"hi there"}`, "\U0001F4E8 msg from tx: \"hi there\""}, + {"object-message-field-broadcast", "broadcast", `{"message":"all hands"}`, "\U0001F4E8 broadcast from tx: \"all hands\""}, + {"object-content-field", "msg", `{"content":"yet another"}`, "\U0001F4E8 msg from tx: \"yet another\""}, + {"object-fallback", "msg", `{"foo":"bar"}`, "\U0001F4E8 msg from tx: \"{\"foo\":\"bar\"}\""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + f := newBrokerFixture(t) + h := newHarness(t, f, "cc-fmt-"+tc.name) + h.srv.Deliver(channel.Inbound{ + ID: "id-" + tc.name, From: "tx", Source: "peer-bus", + Kind: tc.kind, Body: json.RawMessage(tc.body), + }) + frame := h.readFrame() + var pf pushFrame + if err := json.Unmarshal(frame, &pf); err != nil { + t.Fatalf("decode: %v (%s)", err, frame) + } + if pf.Params.Content != tc.want { + t.Fatalf("content = %q, want %q", pf.Params.Content, tc.want) + } + if pf.Params.Meta["kind"] != tc.kind { + t.Fatalf("meta.kind = %q, want %q", pf.Params.Meta["kind"], tc.kind) + } + }) } } diff --git a/internal/channel/wordlist.go b/internal/channel/wordlist.go new file mode 100644 index 0000000..37dd4f8 --- /dev/null +++ b/internal/channel/wordlist.go @@ -0,0 +1,87 @@ +package channel + +// Curated word lists for friendly peer-name generation. The corpus is +// deliberately gentle, work-safe, and visually distinct: ~200 adjectives and +// ~200 nouns, all lowercase, no hyphens, no spaces. 
Combined with the 3-char +// base36 suffix (UniqueName) the keyspace is ~200 * 200 * 36^3 ≈ 1.86 * 10^9 +// — same-token collisions are essentially impossible in practice, so the +// adapter's collision-retry loop is a defence-in-depth backstop, not the +// primary safety mechanism. +// +// Add words by appending — order is irrelevant; nothing keys off index. + +// nameAdjectives is the adjective half of the friendly-name dictionary. +var nameAdjectives = []string{ + "agile", "amber", "ancient", "azure", "balmy", "bashful", "blissful", + "bold", "bouncy", "brave", "breezy", "bright", "brisk", "bronze", + "bubbly", "calm", "candid", "careful", "cheerful", "chipper", "clever", + "cosy", "cool", "copper", "cosmic", "crimson", "crisp", "curious", + "daring", "dapper", "dawn", "dazzling", "deft", "dewy", "diligent", + "dreamy", "dusky", "eager", "earnest", "easy", "elated", "electric", + "elegant", "emerald", "epic", "exotic", "fair", "fancy", "feisty", + "fearless", "festive", "fiery", "flaxen", "fleecy", "fleet", "fluffy", + "flying", "forest", "fresh", "friendly", "frosty", "funny", "gallant", + "gentle", "giddy", "gilded", "glassy", "gleaming", "glowing", "golden", + "graceful", "grand", "grateful", "grassy", "happy", "hardy", "harmonic", + "hazel", "hearty", "heroic", "honest", "humble", "icy", "indigo", + "jade", "jaunty", "jolly", "joyful", "jovial", "keen", "kind", + "lavender", "lean", "light", "lilac", "limber", "limpid", "lively", + "loyal", "lucky", "lumen", "lunar", "marble", "mellow", "merry", + "midnight", "mighty", "mild", "mindful", "minty", "misty", "modest", + "mossy", "mythic", "nautical", "neat", "nimble", "noble", "north", + "olive", "opal", "orange", "outer", "pacific", "patient", "peachy", + "pearl", "perky", "placid", "plucky", "plum", "polar", "polished", + "prairie", "prancing", "primal", "proper", "proud", "prudent", "quaint", + "quick", "quiet", "quirky", "radiant", "rapid", "raven", "ready", + "regal", "ringed", "river", "robust", "rosy", 
"ruby", "rugged", + "rustic", "sable", "sacred", "salty", "sandy", "sapphire", "savvy", + "scarlet", "scenic", "secret", "serene", "shady", "sharp", "shimmer", + "shiny", "silent", "silken", "silver", "skyward", "sleek", "smart", + "smiling", "smoky", "smooth", "snowy", "soaring", "solar", "solid", + "sound", "south", "sparkly", "spirited", "spry", "stalwart", "starry", + "steady", "stellar", "stoic", "stout", "summery", "sunlit", "sunny", + "super", "swift", "tangy", "teal", "tender", "thrifty", "tidy", "tidal", + "timely", "tranquil", "trusty", "twilight", "vast", "velvet", "verdant", + "vibrant", "vigil", "vintage", "violet", "vivid", "warm", "wandering", + "wavy", "whimsical", "wild", "willing", "windswept", "winsome", "wise", + "witty", "woolen", "yonder", "youthful", "zealous", "zen", "zesty", +} + +// nameNouns is the noun half of the friendly-name dictionary. +var nameNouns = []string{ + "acorn", "albatross", "alder", "alpaca", "amber", "anchor", "antler", + "apricot", "arrow", "ash", "aspen", "auk", "aurora", "axis", "azalea", + "badger", "bamboo", "banner", "barley", "basil", "beacon", "beaver", + "beetle", "bell", "birch", "bison", "blossom", "bluebell", "bobcat", + "boulder", "brambler", "branch", "brook", "buffalo", "bumble", "burrow", + "cabin", "cactus", "calla", "camel", "candle", "canyon", "cardinal", + "cedar", "chamois", "cheetah", "cherry", "chestnut", "cinder", "clover", + "coast", "cobalt", "comet", "compass", "condor", "coral", "cottage", + "cougar", "coyote", "cranberry", "crocus", "cypress", "daisy", "dandelion", + "deer", "delta", "dingo", "dolphin", "dragonfly", "drift", "dune", + "eagle", "ember", "emu", "estuary", "falcon", "fawn", "fennec", "fern", + "ferret", "field", "finch", "firefly", "flamingo", "flax", "fjord", + "foxglove", "frost", "galaxy", "garnet", "gazelle", "gecko", "gem", + "geyser", "ginger", "glacier", "glade", "gleam", "globe", "gopher", + "granite", "grebe", "grove", "gull", "harbor", "harmony", "harvest", + "hawk", 
"haven", "hazel", "heath", "heron", "hill", "hollow", "honey", + "hornbill", "horizon", "iris", "ivory", "ivy", "jackal", "jasper", + "jay", "junco", "juniper", "kelp", "kestrel", "kingfisher", "kit", + "koala", "lagoon", "lantern", "lark", "lavender", "leaf", "lemur", + "lichen", "lighthouse", "lily", "lime", "linden", "lion", "lupine", + "lynx", "magnolia", "mallow", "maple", "marigold", "marmot", "marsh", + "meadow", "mesa", "midge", "mink", "minnow", "mirage", "moose", "morel", + "moth", "mountain", "muffin", "narwhal", "nebula", "newt", "nimbus", + "nook", "nova", "oak", "oasis", "ocelot", "ocean", "olive", "onyx", + "opal", "orca", "orchard", "orchid", "osprey", "otter", "owl", "panda", + "pansy", "panther", "parrot", "partridge", "pebble", "pelican", "petal", + "phoenix", "pika", "pine", "plover", "plum", "pollen", "pond", "poppy", + "prairie", "puffin", "quail", "quartz", "quokka", "rabbit", "raccoon", + "raven", "reef", "reindeer", "ridge", "river", "robin", "rookery", "rose", + "sage", "salmon", "satellite", "savanna", "seal", "sequoia", "shore", + "shrike", "skunk", "skyline", "sparrow", "spire", "starling", "stork", + "stream", "summit", "swan", "tamarack", "teak", "thicket", "thistle", + "tortoise", "tulip", "tundra", "valley", "vega", "violet", "vista", + "vixen", "wallaby", "warbler", "waterfall", "willow", "wolf", "wombat", + "woodland", "wren", "yarrow", "yew", "zebra", "zephyr", "zinnia", +} diff --git a/internal/integration/parity_test.go b/internal/integration/parity_test.go index 455980c..ccc5090 100644 --- a/internal/integration/parity_test.go +++ b/internal/integration/parity_test.go @@ -92,7 +92,7 @@ type genericPeer struct { bus interface { // subset of mcp.Bus exercised here Send(ctx context.Context, to string, body json.RawMessage) error Broadcast(ctx context.Context, body json.RawMessage) error - Peers(ctx context.Context) ([]string, error) + Peers(ctx context.Context) (string, []string, error) } drain func(ctx context.Context) 
([]drainMsg, error) stop func() @@ -135,7 +135,7 @@ func newGenericPeer(t *testing.T, f *brokerFixture, name string) *genericPeer { deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { - if _, err := bus.Peers(ctx); err == nil { + if _, _, err := bus.Peers(ctx); err == nil { return p } time.Sleep(20 * time.Millisecond) @@ -185,24 +185,31 @@ func (b *ccBus) Broadcast(ctx context.Context, body json.RawMessage) error { time.Now().UTC().Format(time.RFC3339Nano), "peer-bus", body) } -func (b *ccBus) Peers(ctx context.Context) ([]string, error) { +func (b *ccBus) Peers(ctx context.Context) (string, []string, error) { + self := b.rc.Name() c := b.rc.Client() if c == nil { - return nil, adapter.ErrNotConnected + return self, nil, adapter.ErrNotConnected } sink := make(chan []string, 1) c.SetPeersSink(sink) defer c.SetPeersSink(nil) if err := c.RequestPeers(ctx); err != nil { - return nil, err + return self, nil, err } select { case names := <-sink: - return names, nil + out := make([]string, 0, len(names)) + for _, n := range names { + if n != self { + out = append(out, n) + } + } + return self, out, nil case <-ctx.Done(): - return nil, ctx.Err() + return self, nil, ctx.Err() case <-time.After(5 * time.Second): - return nil, context.DeadlineExceeded + return self, nil, context.DeadlineExceeded } } @@ -247,7 +254,10 @@ func newCCPeer(t *testing.T, f *brokerFixture, name string) (peer *ccPeer, regis go func() { defer close(loopDone) _ = rc.Run(ctx, func(_ context.Context, env wire.Envelope) error { - srv.Deliver(channel.Inbound{ID: env.ID, From: env.From, Source: env.Source, Body: env.Body}) + srv.Deliver(channel.Inbound{ + ID: env.ID, From: env.From, Source: env.Source, + Kind: string(env.Kind), Body: env.Body, + }) return nil }) }() @@ -290,7 +300,7 @@ func newCCPeer(t *testing.T, f *brokerFixture, name string) (peer *ccPeer, regis // Wait for a live broker connection so injected messages are not lost. 
deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { - if _, err := bus.Peers(ctx); err == nil { + if _, _, err := bus.Peers(ctx); err == nil { break } time.Sleep(20 * time.Millisecond) @@ -373,32 +383,43 @@ func TestParity_AutoRegisterUniqueName(t *testing.T) { a := newGenericPeer(t, f, "alpha") b := newGenericPeer(t, f, "bravo") - names, err := a.bus.Peers(ctx) + self, names, err := a.bus.Peers(ctx) if err != nil { t.Fatalf("peers: %v", err) } + if self != "alpha" { + t.Fatalf("bus.Peers self = %q, want alpha", self) + } + // bus.Peers filters self out; only "bravo" should appear (an observing + // rawClient sees the full registry — see TestParity_CCAutoRegisterUniqueName). got := map[string]bool{} for _, n := range names { got[n] = true } - if !got["alpha"] || !got["bravo"] { - t.Fatalf("registry = %v, want alpha+bravo bound under their unique names", names) + if got["alpha"] { + t.Fatalf("bus.Peers must not list self %q; got %v", "alpha", names) + } + if !got["bravo"] { + t.Fatalf("registry = %v, want bravo bound under its unique name", names) } _ = b } // TestParity_CCAutoRegisterUniqueName: a cc adapter launched with NO -// configured name auto-registers a minted unique name (channel.UniqueName, -// cc---) and that name is visible in the broker registry — -// the cc-side of the cc2cc auto-register/unique-name parity row. +// configured name auto-registers a minted unique friendly name +// (channel.UniqueName: "<adjective>-<noun>-<3 base36>") and that name is +// visible in the broker registry — the cc-side of the cc2cc +// auto-register/unique-name parity row. func TestParity_CCAutoRegisterUniqueName(t *testing.T) { f := newBrokerFixture(t) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, name1 := newCCPeer(t, f, "") // "" => channel.UniqueName() - if !strings.HasPrefix(name1, "cc-") { - t.Fatalf("auto-registered name %q lacks cc- prefix", name1) + // Shape check: lowercase three-part hyphenated identifier.
The exact + // adjective/noun corpus is intentionally an implementation detail. + if !looksLikeFriendlyName(name1) { + t.Fatalf("auto-registered name %q is not a <adjective>-<noun>-<base36> shape", name1) } // A second auto-registered cc peer must mint a DISTINCT name. @@ -423,6 +444,28 @@ func TestParity_CCAutoRegisterUniqueName(t *testing.T) { } } +// looksLikeFriendlyName checks the auto-minted-name shape: +// lowercase letters / digits split into exactly three hyphenated parts. Used +// instead of pinning a specific corpus so the adjective/noun lists can grow +// without churning the parity test. +func looksLikeFriendlyName(s string) bool { + parts := strings.Split(s, "-") + if len(parts) != 3 { + return false + } + for _, p := range parts { + if p == "" { + return false + } + for _, r := range p { + if (r < 'a' || r > 'z') && (r < '0' || r > '9') { + return false + } + } + } + return true +} + // ── Row 2: peer discovery ── // TestParity_PeerDiscovery: a generic adapter sees every other live peer via @@ -436,16 +479,25 @@ func TestParity_PeerDiscovery(t *testing.T) { defer other.Close() asker := newGenericPeer(t, f, "asker") - names, err := asker.bus.Peers(ctx) + self, names, err := asker.bus.Peers(ctx) if err != nil { t.Fatalf("peers: %v", err) } + if self != "asker" { + t.Fatalf("bus.Peers self = %q, want asker", self) + } + // "asker" is filtered out by bus.Peers; only "discoverable" should + // surface (cc2cc's peer-discovery semantic is "see the OTHERS", not + // "see yourself in the list").
got := map[string]bool{} for _, n := range names { got[n] = true } - if !got["asker"] || !got["discoverable"] { - t.Fatalf("peers = %v, want asker+discoverable", names) + if got["asker"] { + t.Fatalf("peers must not include self; got %v", names) + } + if !got["discoverable"] { + t.Fatalf("peers = %v, want discoverable", names) } } @@ -784,13 +836,22 @@ func TestParity_CCPushWakeNotification(t *testing.T) { if pf.Method != "notifications/claude/channel" { t.Fatalf("method = %q, want notifications/claude/channel", pf.Method) } - if pf.Params.Content != "wake up, there is a decision to make" { - t.Fatalf("content = %q", pf.Params.Content) + // Single-line content (see internal/channel.formatInbound): + // `📨 <kind> from <from>: "<body>"`. The body is a JSON string so + // decodeBody unwraps it to plain text. The exact prefix is the + // contract; only assert it (the kind/from/body decoding is verified + // by internal/channel's unit tests in detail). + if !strings.HasPrefix(pf.Params.Content, "\U0001F4E8 msg from tx: ") { + t.Fatalf("content prefix = %q, want single-line banner", pf.Params.Content) + } + if !strings.Contains(pf.Params.Content, "wake up, there is a decision to make") { + t.Fatalf("content missing decoded body: %q", pf.Params.Content) } if pf.Params.Meta["from"] != "tx" || pf.Params.Meta["source"] != "peer-bus" || - pf.Params.Meta["msg_id"] != "wake-1" { - t.Fatalf("meta = %v, want from=tx source=peer-bus msg_id=wake-1", pf.Params.Meta) + pf.Params.Meta["msg_id"] != "wake-1" || + pf.Params.Meta["kind"] != "msg" { + t.Fatalf("meta = %v, want from=tx source=peer-bus msg_id=wake-1 kind=msg", pf.Params.Meta) } // EXACTLY ONE notification: a duplicate id never re-pushes (broker diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 997708b..3e2ed87 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -105,6 +105,17 @@ type Server struct { inCloser io.Closer initialized bool + // initializedCh is closed exactly once on the first + //
notifications/initialized client message. Exposed via Initialized() + // so the cc adapter's self-announce can wait for the MCP handshake to + // complete before pushing a server-initiated notification — Claude Code + // silently drops claude/channel notifications received before the + // client has signalled initialized (CHANNELS_SCHEMA.md §3). Lazily + // allocated by Initialized() so the generic adapter (which never asks) + // pays nothing. + initializedCh chan struct{} + initializedOnce sync.Once + initializedMu sync.Mutex // serverName is the serverInfo.name advertised at initialize. Defaults // to the generic adapter name; the cc adapter overrides it. @@ -195,6 +206,45 @@ func (s *Server) Notify(method string, params any) { _ = s.out.Flush() } +// Initialized returns a channel that is closed once the client has sent the +// MCP notifications/initialized message (the standard MCP handshake +// completion signal). The channel is lazily allocated and idempotent: every +// caller observes the same channel and a second initialized notification is +// a no-op. The cc adapter waits on this before emitting its +// claude/channel self-announce notification — Claude Code silently drops +// server-initiated notifications that arrive before the client has signalled +// initialized (CHANNELS_SCHEMA.md §3). The generic adapter never asks and +// is unaffected. +func (s *Server) Initialized() <-chan struct{} { + s.initializedMu.Lock() + if s.initializedCh == nil { + s.initializedCh = make(chan struct{}) + // A late Initialized() call after the client already signalled + // initialized must observe an already-closed channel, otherwise the + // caller would block forever on a signal that has already fired. + if s.initialized { + close(s.initializedCh) + } + } + ch := s.initializedCh + s.initializedMu.Unlock() + return ch +} + +// signalInitialized closes the initialized channel exactly once. 
Safe to +// call before or after Initialized(): if the channel hasn't been allocated +// yet, the next Initialized() call will see s.initialized=true and allocate +// an already-closed channel. +func (s *Server) signalInitialized() { + s.initializedOnce.Do(func() { + s.initializedMu.Lock() + defer s.initializedMu.Unlock() + if s.initializedCh != nil { + close(s.initializedCh) + } + }) +} + // Serve runs the read/dispatch loop until ctx is cancelled, stdin reaches // EOF, or an unrecoverable framing error occurs. A clean EOF (host closed // the pipe) returns nil; ctx cancellation returns ctx.Err(). @@ -342,6 +392,7 @@ func (s *Server) dispatch(ctx context.Context, raw []byte) { s.handleInitialize(req) case "notifications/initialized", "initialized": s.initialized = true // notification — no response + s.signalInitialized() case "ping": if !isNotification { s.writeResult(req.ID, struct{}{}) diff --git a/internal/mcp/server_test.go b/internal/mcp/server_test.go index ed6f368..d06b7e5 100644 --- a/internal/mcp/server_test.go +++ b/internal/mcp/server_test.go @@ -110,7 +110,7 @@ func newWiredHarness(t *testing.T, f *brokerFixture, name string) *mcpHarness { // are not lost before register completes. deadline := time.Now().Add(3 * time.Second) for time.Now().Before(deadline) { - if _, err := bus.Peers(ctx); err == nil { + if _, _, err := bus.Peers(ctx); err == nil { break } time.Sleep(20 * time.Millisecond) @@ -340,13 +340,20 @@ func TestBusPeersLists(t *testing.T) { if rpcErr != nil || isErr { t.Fatalf("bus.peers failed: rpcErr=%v isErr=%v", rpcErr, isErr) } + if self, _ := structured["self"].(string); self != "asker" { + t.Fatalf("bus.peers self = %v, want asker", structured["self"]) + } peersAny, _ := structured["peers"].([]any) got := map[string]bool{} for _, p := range peersAny { got[p.(string)] = true } - if !got["asker"] || !got["other"] { - t.Fatalf("peers = %v, want asker+other", peersAny) + // Self is filtered out of the peers list (bus.peers' new shape). 
+ if got["asker"] { + t.Fatalf("peers must not include self; got %v", peersAny) + } + if !got["other"] { + t.Fatalf("peers = %v, want other", peersAny) } } @@ -637,7 +644,7 @@ type idleBus struct{} func (idleBus) Send(context.Context, string, json.RawMessage) error { return nil } func (idleBus) Broadcast(context.Context, json.RawMessage) error { return nil } -func (idleBus) Peers(context.Context) ([]string, error) { return nil, nil } +func (idleBus) Peers(context.Context) (string, []string, error) { return "", nil, nil } func (idleBus) Drain(context.Context) ([]mcp.InboundMessage, error) { return nil, nil } // TestServeReturnsPromptlyOnCtxCancelWithIdleStdin is the MAJOR-R4 regression: @@ -646,6 +653,76 @@ func (idleBus) Drain(context.Context) ([]mcp.InboundMessage, error) { return nil // an idle pipe, so a SIGTERM never made Serve (and thus the adapter mode's // Run) return — it hung until SIGKILL. Serve must now return ~immediately // when ctx is cancelled even with a blocked, idle input. +// TestInitializedChannel: Server.Initialized() is open before the client +// signals initialized and closes exactly when the +// notifications/initialized client message is dispatched. A second +// initialized notification is idempotent (close-of-closed channel would +// panic — sync.Once guards it). 
+func TestInitializedChannel(t *testing.T) { + pr, pw := io.Pipe() + t.Cleanup(func() { _ = pw.Close() }) + srv := mcp.NewServer(idleBus{}, pr, io.Discard) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go func() { _ = srv.Serve(ctx) }() + + ch := srv.Initialized() + select { + case <-ch: + t.Fatalf("Initialized() closed before notifications/initialized") + default: + } + + if _, err := pw.Write([]byte(`{"jsonrpc":"2.0","method":"notifications/initialized"}` + "\n")); err != nil { + t.Fatalf("write initialized: %v", err) + } + + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatalf("Initialized() did not close within 2s of notifications/initialized") + } + + // Same channel handed out on every call. + if got := srv.Initialized(); got != ch { + t.Fatalf("Initialized() returned a different channel on second call") + } + + // Idempotent: a second initialized notification must not panic on + // close-of-closed. + if _, err := pw.Write([]byte(`{"jsonrpc":"2.0","method":"initialized"}` + "\n")); err != nil { + t.Fatalf("write second initialized: %v", err) + } + time.Sleep(50 * time.Millisecond) +} + +// TestInitializedChannelLateSubscriber: Initialized() called AFTER the +// client has already signalled initialized must return an already-closed +// channel rather than block forever. (Important for the cc adapter wiring +// where the OnConnect callback may resolve after handshake.) +func TestInitializedChannelLateSubscriber(t *testing.T) { + pr, pw := io.Pipe() + t.Cleanup(func() { _ = pw.Close() }) + srv := mcp.NewServer(idleBus{}, pr, io.Discard) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go func() { _ = srv.Serve(ctx) }() + + if _, err := pw.Write([]byte(`{"jsonrpc":"2.0","method":"notifications/initialized"}` + "\n")); err != nil { + t.Fatalf("write initialized: %v", err) + } + // Give the dispatcher a moment to process before we subscribe. 
+ time.Sleep(100 * time.Millisecond) + + select { + case <-srv.Initialized(): + case <-time.After(2 * time.Second): + t.Fatalf("late Initialized() did not observe already-closed channel") + } +} + func TestServeReturnsPromptlyOnCtxCancelWithIdleStdin(t *testing.T) { // An io.Pipe reader with NO writer blocks readMessage indefinitely — // exactly the idle-stdin condition. *io.PipeReader is an io.Closer, so diff --git a/internal/mcp/tools.go b/internal/mcp/tools.go index 1a4783f..1553d76 100644 --- a/internal/mcp/tools.go +++ b/internal/mcp/tools.go @@ -20,8 +20,11 @@ type Bus interface { // Broadcast signs and fans a message out to every currently-registered // peer except this one (no backfill). Broadcast(ctx context.Context, body json.RawMessage) error - // Peers returns the broker's current peer registry. - Peers(ctx context.Context) ([]string, error) + // Peers returns this adapter's bound peer name (self) AND the + // broker's current peer registry sans self. bus.peers shapes the + // result as {self, peers} so the consuming agent immediately knows + // its own bus identity without a separate bus.whoami round-trip. + Peers(ctx context.Context) (self string, peers []string, err error) // Drain returns every inbound message buffered since the last drain — // already HMAC-verified and already filtered through the SHARED dedupe // cache — and acks each one back to the broker. 
A repeat delivery of an @@ -96,7 +99,7 @@ func toolsListResult(hideDrain bool) map[string]any { }, { "name": "bus.peers", - "description": "List the peers currently registered on the bus.", + "description": "Return {self, peers}: this adapter's own bus name (self) and the names of other peers currently registered on the bus.", "inputSchema": map[string]any{ "type": "object", "properties": map[string]any{}, @@ -185,14 +188,14 @@ func (s *Server) callTool(ctx context.Context, name string, args json.RawMessage return toolResult(map[string]any{"broadcast": true}), nil case "bus.peers": - names, err := s.bus.Peers(ctx) + self, names, err := s.bus.Peers(ctx) if err != nil { return nil, err } if names == nil { names = []string{} } - return toolResult(map[string]any{"peers": names}), nil + return toolResult(map[string]any{"self": self, "peers": names}), nil case "bus.drain": msgs, err := s.bus.Drain(ctx)
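
Review note on the `internal/mcp/server.go` hunk: the `Initialized()` / `signalInitialized()` pair is a lazily allocated one-shot signal with late-subscriber support. The following is a minimal standalone sketch of that pattern, not the code from the diff. The `initGate` type, its `done` field, and the `isClosed` helper are hypothetical names introduced for illustration. The sketch also makes one substitution: it keeps the "already signalled" flag and the `close` under a single mutex instead of pairing a `sync.Once` with a separately written flag, on the assumption that a subscriber may call `Initialized()` from a different goroutine than the one dispatching the handshake.

```go
package main

import (
	"fmt"
	"sync"
)

// initGate is a hypothetical stand-in for the Server fields added in the
// diff (initialized, initializedCh, initializedMu).
type initGate struct {
	mu   sync.Mutex
	done bool          // true once the handshake signal has fired
	ch   chan struct{} // lazily allocated by Initialized
}

// Initialized returns a channel that is closed once Signal has run.
// Every caller observes the same channel.
func (g *initGate) Initialized() <-chan struct{} {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.ch == nil {
		g.ch = make(chan struct{})
		if g.done {
			// Late subscriber: the signal already fired, so hand back
			// an already-closed channel instead of blocking forever.
			close(g.ch)
		}
	}
	return g.ch
}

// Signal marks the handshake complete. Repeat calls are no-ops, so the
// channel is never closed twice.
func (g *initGate) Signal() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.done {
		return
	}
	g.done = true
	if g.ch != nil {
		close(g.ch)
	}
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	early := &initGate{}
	ch := early.Initialized()
	fmt.Println("closed before signal:", isClosed(ch))
	early.Signal()
	early.Signal() // second signal is a no-op
	fmt.Println("closed after signal:", isClosed(ch))

	late := &initGate{}
	late.Signal()
	fmt.Println("late subscriber sees closed:", isClosed(late.Initialized()))
}
```

Because the flag write and the `close` share one critical section in this sketch, a subscriber that arrives between the two cannot allocate and close the channel a second time. Whether the same ordering matters for the adapter depends on which goroutine calls `Initialized()`, which this excerpt does not show.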