diff --git a/.github/workflows/ci-agy-acp.yml b/.github/workflows/ci-agy-acp.yml new file mode 100644 index 00000000..13d78606 --- /dev/null +++ b/.github/workflows/ci-agy-acp.yml @@ -0,0 +1,36 @@ +name: "CI: agy-acp" + +on: + pull_request: + paths: + - "agy-acp/**" + workflow_dispatch: + +env: + CARGO_TERM_COLOR: always + +jobs: + ci: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + agy-acp/target + key: agy-acp-${{ runner.os }}-${{ hashFiles('agy-acp/Cargo.lock') }} + + - name: Build + working-directory: agy-acp + run: cargo build --release + + - name: Run tests + working-directory: agy-acp + run: cargo test -- --include-ignored --skip e2e diff --git a/.github/workflows/docker-smoke-test.yml b/.github/workflows/docker-smoke-test.yml index 71d410f9..fd360c79 100644 --- a/.github/workflows/docker-smoke-test.yml +++ b/.github/workflows/docker-smoke-test.yml @@ -8,37 +8,7 @@ on: - 'Cargo.*' jobs: - build-binary: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v6 - - - name: Build openab binary (once) - run: | - mkdir -p .binaries - DOCKER_BUILDKIT=1 docker build --target builder -t openab-builder -f Dockerfile . - CID=$(docker create openab-builder) - docker cp "$CID:/build/target/release/openab" .binaries/openab - docker rm "$CID" - - - name: Build agy-acp adapter (for Dockerfile.antigravity) - run: | - if [ -f Dockerfile.antigravity ]; then - DOCKER_BUILDKIT=1 docker build --target adapter-builder -t agy-acp-builder -f Dockerfile.antigravity . - CID=$(docker create agy-acp-builder) - docker cp "$CID:/build/target/release/agy-acp" .binaries/agy-acp - docker rm "$CID" - fi - - - name: Upload binaries - uses: actions/upload-artifact@v4 - with: - name: openab-binary - path: .binaries/ - retention-days: 1 - smoke-test: - needs: build-binary strategy: fail-fast: false matrix: @@ -59,39 +29,8 @@ jobs: steps: - uses: actions/checkout@v6 - - name: Download pre-built binary - uses: actions/download-artifact@v4 - with: - name: openab-binary - path: .pre-built - - - name: Build image (skip Rust compilation) - run: | - chmod +x .pre-built/* - DF="${{ matrix.variant.dockerfile }}" - - if grep -q "FROM rust" "$DF"; then - # Create fake builder context with pre-built binary at the expected path - mkdir -p .fake-builder/build/target/release - cp .pre-built/openab .fake-builder/build/target/release/openab - - BUILD_CONTEXTS="--build-context builder=.fake-builder" - - # Handle additional named Rust stages (e.g. adapter-builder) - grep -i '^FROM rust' "$DF" | sed -n 's/.*AS \([a-zA-Z0-9_-]*\).*/\1/p' | grep -v '^builder$' | while read stage; do - mkdir -p ".fake-${stage}/build/target/release" - cp .pre-built/* ".fake-${stage}/build/target/release/" - echo "--build-context ${stage}=.fake-${stage}" - done > /tmp/extra-contexts.txt - - if [ -s /tmp/extra-contexts.txt ]; then - BUILD_CONTEXTS="$BUILD_CONTEXTS $(cat /tmp/extra-contexts.txt | tr '\n' ' ')" - fi - - docker buildx build $BUILD_CONTEXTS -t openab-test${{ matrix.variant.suffix }} -f "$DF" . - else - docker buildx build -t openab-test${{ matrix.variant.suffix }} -f "$DF" . - fi + - name: Build image + run: docker build -t openab-test${{ matrix.variant.suffix }} -f ${{ matrix.variant.dockerfile }} . - name: Verify openab CMD does not crash run: | @@ -109,6 +48,7 @@ jobs: run: | INIT='{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","clientCapabilities":{},"clientInfo":{"name":"ci-test","version":"0.0.1"}}}' + # Start agent in background, send init, capture output with timeout CID=$(docker run -d -i --entrypoint sh openab-test${{ matrix.variant.suffix }} -c 'exec ${{ matrix.variant.agent }} ${{ matrix.variant.agent_args }} 2>/dev/null') echo "$INIT" | docker attach --no-stdin=false "$CID" & sleep 5 diff --git a/Dockerfile.antigravity b/Dockerfile.antigravity index 2a742632..612a044d 100644 --- a/Dockerfile.antigravity +++ b/Dockerfile.antigravity @@ -18,16 +18,16 @@ RUN touch src/main.rs && cargo build --release FROM debian:bookworm-slim RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl procps tini unzip && rm -rf /var/lib/apt/lists/* -# Install agy (Google Antigravity CLI) +# Install agy (Google Antigravity CLI) — pinned to v1.0.4 +ENV AGY_VERSION=1.0.4 RUN ARCH=$(dpkg --print-architecture) && \ case "$ARCH" in \ - amd64) PLATFORM="linux_amd64" ;; \ - arm64) PLATFORM="linux_arm64" ;; \ + amd64) ASSET="agy_cli_linux_x64.tar.gz" ;; \ + arm64) ASSET="agy_cli_linux_arm64.tar.gz" ;; \ *) echo "unsupported arch: $ARCH" && exit 1 ;; \ esac && \ - MANIFEST_URL="https://antigravity-cli-auto-updater-974169037036.us-central1.run.app/manifests/${PLATFORM}.json" && \ - DOWNLOAD_URL=$(curl -fsSL "$MANIFEST_URL" | grep -o '"url": *"[^"]*"' | cut -d'"' -f4) && \ - curl -fsSL "$DOWNLOAD_URL" | tar -xz -C /usr/local/bin && \ + curl -fsSL "https://github.com/google-antigravity/antigravity-cli/releases/download/${AGY_VERSION}/${ASSET}" \ + | tar -xz -C /usr/local/bin && \ mv /usr/local/bin/antigravity /usr/local/bin/agy && \ chmod +x /usr/local/bin/agy diff --git a/agy-acp/Cargo.lock b/agy-acp/Cargo.lock index 190cbe15..b7e79455 100644 --- a/agy-acp/Cargo.lock +++ b/agy-acp/Cargo.lock @@ -7,12 +7,25 @@ name = "agy-acp" version = "0.1.0" dependencies = [ "fs2", + "rusqlite", "serde", "serde_json", "tokio", "uuid", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -21,15 +34,15 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" [[package]] name = "bitflags" -version = "2.11.1" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +checksum = "84d7ced0ae9557296835c32bf1b1e02b44c746701f898460fb000d7eaa84f00a" [[package]] name = "bumpalo" -version = "3.20.2" +version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" [[package]] name = "bytes" @@ -37,6 +50,16 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cc" +version = "1.2.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" +dependencies = [ + "find-msvc-tools", + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -59,6 +82,24 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + [[package]] name = "foldhash" version = "0.1.5" @@ -112,6 +153,15 @@ dependencies = [ "wasip3", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -127,6 +177,15 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -159,9 +218,9 @@ checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "js-sys" -version = "0.3.98" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" +checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" dependencies = [ "cfg-if", "futures-util", @@ -181,6 +240,17 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libsqlite3-sys" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "lock_api" version = "0.4.14" @@ -192,21 +262,21 @@ dependencies = [ [[package]] name = "log" -version = "0.4.29" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +checksum = "113b30b4cd05f7c06868fdb2854f66a7b9fece9a48425351cd532e810d74024f" [[package]] name = "memchr" -version = "2.8.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" [[package]] name = "mio" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" dependencies = [ "libc", "wasi", @@ -248,6 +318,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "prettyplease" version = "0.2.37" @@ -291,6 +367,20 @@ dependencies = [ "bitflags", ] +[[package]] +name = "rusqlite" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -352,6 +442,12 @@ dependencies = [ "zmij", ] +[[package]] +name = "shlex" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" + [[package]] name = "signal-hook-registry" version = "1.4.8" @@ -376,9 +472,9 @@ checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", "windows-sys", @@ -437,15 +533,27 @@ checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" [[package]] name = "uuid" -version = "1.23.1" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ "getrandom", "js-sys", "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -472,9 +580,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" +checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" dependencies = [ "cfg-if", "once_cell", @@ -485,9 +593,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" +checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -495,9 +603,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" +checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" dependencies = [ "bumpalo", "proc-macro2", @@ -508,9 +616,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" +checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" dependencies = [ "unicode-ident", ] @@ -680,6 +788,26 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "zerocopy" +version = "0.8.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b065d4f0e55f82fae73202e189638116a87c55ab6b8e6c2721e13dd9d854ad1" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631b19d36a892ab55420c92dbc83ccd79274f25be714855d3074aa71cab639" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/agy-acp/Cargo.toml b/agy-acp/Cargo.toml index fa98b8f5..adea791b 100644 --- a/agy-acp/Cargo.toml +++ b/agy-acp/Cargo.toml @@ -11,3 +11,4 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" uuid = { version = "1", features = ["v4"] } fs2 = "0.4.3" +rusqlite = { version = "=0.31.0", features = ["bundled"] } diff --git a/agy-acp/README.md b/agy-acp/README.md new file mode 100644 index 00000000..151859d5 --- /dev/null +++ b/agy-acp/README.md @@ -0,0 +1,64 @@ +# agy-acp + +ACP (Agent Client Protocol) adapter for [Antigravity CLI](https://github.com/google-antigravity/antigravity-cli). Bridges `agy` into OpenAB's stdio JSON-RPC protocol. + +## How it works + +``` +openab ──JSON-RPC──► agy-acp ──spawns──► agy -p "prompt" + │ + ├─ Tracks conversation IDs via SQLite .db files + ├─ Extracts responses from protobuf step_payload (field 20.1) + └─ Persists session state for multi-turn conversations +``` + +## Build + +```bash +cargo build --release +``` + +## Tests + +```bash +# Unit tests +cargo test + +# All tests including filesystem I/O tests +cargo test -- --include-ignored + +# E2E test (requires agy in PATH + auth) +cargo test e2e -- --ignored --nocapture +``` + +### E2E requirements + +The E2E test spawns `agy-acp` → `agy` and verifies a full round-trip prompt/response. + +| Requirement | Local dev | CI | +|---|---|---| +| `agy` binary | `~/.local/bin/agy` | Downloaded from GitHub release | +| Auth | macOS Keychain (existing login) | `GEMINI_API_KEY` env var | + +**Local setup:** +```bash +# Install agy v1.0.4+ +gh release download 1.0.4 --repo google-antigravity/antigravity-cli \ + --pattern "agy_cli_mac_arm64.tar.gz" --dir /tmp +tar -xzf /tmp/agy_cli_mac_arm64.tar.gz -C ~/.local/bin/ +ln -sf ~/.local/bin/antigravity ~/.local/bin/agy + +# Run e2e +export PATH="$HOME/.local/bin:$PATH" +cargo test e2e -- --ignored --nocapture +``` + +**CI:** The GitHub Actions workflow (`.github/workflows/e2e-agy-acp.yml`) handles everything automatically. It uses the `GEMINI_API_KEY` repo secret. + +### Updating the API key + +```bash +gh secret set GEMINI_API_KEY --repo openabdev/openab +``` + +Get a free key from https://aistudio.google.com/apikey — the e2e sends one short prompt per run so cost is negligible. diff --git a/agy-acp/src/main.rs b/agy-acp/src/main.rs index 4d4fc371..4f9406d4 100644 --- a/agy-acp/src/main.rs +++ b/agy-acp/src/main.rs @@ -1,10 +1,9 @@ use fs2::FileExt; +use rusqlite::Connection; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::{HashMap, HashSet}; -use std::collections::hash_map::DefaultHasher; use std::fs; -use std::hash::{Hash, Hasher}; use std::io::{self, BufRead, Write}; use std::path::PathBuf; use tokio::process::Command; @@ -44,19 +43,15 @@ struct SessionStore { #[derive(Debug, Clone, Serialize, Deserialize)] struct StoredSession { conversation_id: Option, + /// Last step idx read from SQLite; used for delta extraction. #[serde(default)] - prev_line_count: usize, - /// Hash of the last line at prev_line_count boundary; detects same-length rewrites. - #[serde(default)] - prev_last_line_hash: u64, + last_step_idx: i64, } struct Session { conversation_id: Option, - /// Number of lines already delivered to the caller; used for delta extraction. - prev_line_count: usize, - /// Hash of the last line at prev_line_count boundary. - prev_last_line_hash: u64, + /// Last step idx read from SQLite. + last_step_idx: i64, } struct Adapter { @@ -110,18 +105,16 @@ impl Adapter { self.load_store_inner() } - /// Try to restore conversation_id, prev_line_count, and prev_last_line_hash from persisted state. - fn restore_session(&self, session_id: &str) -> Option<(String, usize, u64)> { + /// Try to restore conversation_id and last_step_idx from persisted state. + fn restore_session(&self, session_id: &str) -> Option<(String, i64)> { let store = self.load_store(); store.sessions.get(session_id).and_then(|s| { - s.conversation_id - .clone() - .map(|cid| (cid, s.prev_line_count, s.prev_last_line_hash)) + s.conversation_id.clone().map(|cid| (cid, s.last_step_idx)) }) } /// Persist a session binding (read-modify-write under single lock). - fn persist_session(&self, session_id: &str, conversation_id: Option<&str>, prev_line_count: usize, prev_last_line_hash: u64) { + fn persist_session(&self, session_id: &str, conversation_id: Option<&str>, last_step_idx: i64) { let Some(_lock) = self.lock_state_file() else { return; }; @@ -130,8 +123,7 @@ impl Adapter { session_id.to_string(), StoredSession { conversation_id: conversation_id.map(String::from), - prev_line_count, - prev_last_line_hash, + last_step_idx, }, ); let tmp = self.state_file.with_extension("tmp"); @@ -150,7 +142,7 @@ impl Adapter { .filter_map(|e| e.ok()) .filter_map(|e| { let path = e.path(); - if path.extension().map(|x| x == "pb").unwrap_or(false) { + if path.extension().map(|x| x == "db").unwrap_or(false) { path.file_stem().map(|s| s.to_string_lossy().to_string()) } else { None @@ -175,51 +167,107 @@ impl Adapter { Some(created.remove(0).clone()) } - fn line_hash(line: &str) -> u64 { - let mut hasher = DefaultHasher::new(); - line.hash(&mut hasher); - hasher.finish() + /// Extract text from a step_payload protobuf: top-level field 20 (sub-message) → field 1 (string). + fn extract_text_from_step_payload(blob: &[u8]) -> Option { + let field_20 = Self::get_proto_field(blob, 20)?; + let field_1 = Self::get_proto_field(&field_20, 1)?; + String::from_utf8(field_1).ok() } - fn extract_delta(full_text: &str, prev_line_count: usize, prev_last_line_hash: u64, conversation_bound: bool) -> (String, usize, u64) { - let lines: Vec<&str> = full_text.lines().collect(); - let total = lines.len(); - let last_hash = lines.last().map(|l| Self::line_hash(l)).unwrap_or(0); - if !conversation_bound || prev_line_count == 0 { - return (full_text.to_string(), total, last_hash); + /// Extract the first length-delimited field with the given number from a protobuf blob. + fn get_proto_field(blob: &[u8], target: u64) -> Option> { + let mut i = 0; + while i < blob.len() { + let (tag, consumed) = Self::read_varint(&blob[i..])?; + i += consumed; + let field_number = tag >> 3; + let wire_type = tag & 0x7; + match wire_type { + 0 => { let (_, c) = Self::read_varint(&blob[i..])?; i += c; } + 2 => { + let (len, c) = Self::read_varint(&blob[i..])?; + i += c; + let len = len as usize; + if i + len > blob.len() { return None; } + if field_number == target { + return Some(blob[i..i + len].to_vec()); + } + i += len; + } + 5 => { i += 4; } + 1 => { i += 8; } + _ => return None, + } } - if total < prev_line_count { - eprintln!( - "[agy-acp] WARN: agy stdout has fewer lines than expected ({total} < {prev_line_count}); \ - sending full output and resetting delta baseline" - ); - return (full_text.to_string(), total, last_hash); + None + } + + /// Read a protobuf varint, returning (value, bytes_consumed). + fn read_varint(buf: &[u8]) -> Option<(u64, usize)> { + let mut result: u64 = 0; + let mut shift = 0; + for (i, &byte) in buf.iter().enumerate() { + if shift >= 70 { + return None; + } + result |= ((byte & 0x7F) as u64) << shift; + shift += 7; + if byte & 0x80 == 0 { + return Some((result, i + 1)); + } } - // Verify the boundary line hash to detect same-length rewrites - if total == prev_line_count { - if prev_last_line_hash != 0 && last_hash != prev_last_line_hash { - eprintln!( - "[agy-acp] WARN: agy stdout has same line count but different content; \ - sending full output" - ); - return (full_text.to_string(), total, last_hash); + None + } + + /// Read the latest response from the SQLite conversation DB. + /// Returns (response_text, max_step_idx) or None if reading fails. + fn read_response_from_db(&self, conversation_id: &str, after_step_idx: i64) -> Option<(String, i64)> { + let db_path = self.conversations_dir.join(format!("{}.db", conversation_id)); + let conn = Connection::open_with_flags( + &db_path, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX, + ).ok()?; + + // Verify steps table exists + let table_exists: bool = conn.query_row( + "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='steps'", + [], + |row| row.get(0), + ).unwrap_or(false); + if !table_exists { + eprintln!("[agy-acp] WARN: steps table not found in {}.db — schema changed?", conversation_id); + return None; + } + + let mut stmt = conn.prepare( + "SELECT idx, step_payload FROM steps WHERE idx > ?1 AND step_type = 15 ORDER BY idx" + ).ok()?; + let rows: Vec<(i64, Vec)> = stmt.query_map([after_step_idx], |row| { + Ok((row.get(0)?, row.get(1)?)) + }).ok()?.filter_map(|r| r.ok()).collect(); + + let mut max_idx = after_step_idx; + let mut response_parts: Vec = Vec::new(); + for (idx, payload) in &rows { + max_idx = max_idx.max(*idx); + if let Some(text) = Self::extract_text_from_step_payload(payload) { + if !text.is_empty() { + response_parts.push(text); + } } - // Same count, same hash — genuinely no new content - return (String::new(), total, last_hash); } - // Verify that the line at prev boundary still matches - if prev_line_count > 0 && prev_last_line_hash != 0 { - let boundary_hash = Self::line_hash(lines[prev_line_count - 1]); - if boundary_hash != prev_last_line_hash { + if response_parts.is_empty() { + if !rows.is_empty() { + let payload_sizes: Vec = rows.iter().map(|(_, p)| p.len()).collect(); eprintln!( - "[agy-acp] WARN: agy stdout content changed at boundary; \ - sending full output and resetting delta baseline" + "[agy-acp] WARN: {} new steps found (payload sizes: {:?}) but none had extractable text \ + (field 20.1 missing — schema change?)", + rows.len(), payload_sizes ); - return (full_text.to_string(), total, last_hash); } + return None; } - let delta = lines[prev_line_count..].join("\n"); - (delta, total, last_hash) + Some((response_parts.join("\n"), max_idx)) } fn evict_if_needed(&mut self) { @@ -232,10 +280,9 @@ impl Adapter { } fn restore_session_state(&mut self, session_id: &str) -> bool { - let Some((conversation_id, prev_line_count, prev_last_line_hash)) = self.restore_session(session_id) else { + let Some((conversation_id, last_step_idx)) = self.restore_session(session_id) else { return false; }; - // Evict only after confirming the restore target exists if !self.sessions.contains_key(session_id) { self.evict_if_needed(); } @@ -243,8 +290,7 @@ impl Adapter { session_id.to_string(), Session { conversation_id: Some(conversation_id), - prev_line_count, - prev_last_line_hash, + last_step_idx, }, ); true @@ -266,13 +312,11 @@ impl Adapter { fn handle_session_new(&mut self, id: u64) -> JsonRpcResponse { let session_id = Uuid::new_v4().to_string(); self.evict_if_needed(); - let conversation_id = None; self.sessions.insert( session_id.clone(), Session { - conversation_id, - prev_line_count: 0, - prev_last_line_hash: 0, + conversation_id: None, + last_step_idx: -1, }, ); JsonRpcResponse { @@ -382,7 +426,6 @@ impl Adapter { match result { Ok(output) => { - // Log stderr if non-empty let stderr_text = String::from_utf8_lossy(&output.stderr); if !stderr_text.is_empty() { eprintln!("[agy-acp] agy stderr: {}", stderr_text.trim_end()); @@ -409,73 +452,80 @@ impl Adapter { let full_text = String::from_utf8_lossy(&output.stdout).to_string(); - let prev_line_count = self - .sessions - .get(session_id) - .map(|s| s.prev_line_count) - .unwrap_or(0); - let prev_last_line_hash = self - .sessions - .get(session_id) - .map(|s| s.prev_last_line_hash) - .unwrap_or(0); - let conversation_bound = self - .sessions - .get(session_id) - .map(|s| s.conversation_id.is_some()) - .unwrap_or(false); - let (new_text, total_lines, last_hash) = Self::extract_delta(&full_text, prev_line_count, prev_last_line_hash, conversation_bound); - // Bind conversation from snapshot diff let conv_id = snapshot .as_ref() .and_then(|before| self.new_conversation_id(before)); - let mut should_persist = false; if let Some(session) = self.sessions.get_mut(session_id) { - let newly_bound = session.conversation_id.is_none() && conv_id.is_some(); if session.conversation_id.is_none() { session.conversation_id = conv_id.clone(); } + } + + let bound_conv_id = self.sessions.get(session_id).and_then(|s| s.conversation_id.clone()); + let last_step_idx = self.sessions.get(session_id).map(|s| s.last_step_idx).unwrap_or(-1); + + // Read response delta from SQLite + let (new_text, new_step_idx) = if let Some(cid) = &bound_conv_id { + match self.read_response_from_db(cid, last_step_idx) { + Some((text, idx)) => { + eprintln!("[agy-acp] delta from SQLite (steps {} → {})", last_step_idx, idx); + (Some(text), idx) + } + None => { + eprintln!("[agy-acp] WARN: SQLite read returned no new text (field 20.1 missing?)"); + (None, last_step_idx) + } + } + } else { + eprintln!("[agy-acp] WARN: could not bind conversation ID; single-turn mode"); + (Some(full_text.clone()), -1i64) + }; + + // Persist session state + if let Some(session) = self.sessions.get_mut(session_id) { if session.conversation_id.is_some() { - session.prev_line_count = total_lines; - session.prev_last_line_hash = last_hash; - should_persist = true; - } else { - session.prev_line_count = 0; - session.prev_last_line_hash = 0; - eprintln!( - "[agy-acp] WARN: could not bind conversation ID; \ - running in single-turn mode" - ); + session.last_step_idx = new_step_idx; } - let _ = newly_bound; // persist regardless — line count changed } - if should_persist { - let cid = self.sessions.get(session_id).and_then(|s| s.conversation_id.clone()); - self.persist_session(session_id, cid.as_deref(), total_lines, last_hash); + if bound_conv_id.is_some() { + self.persist_session(session_id, bound_conv_id.as_deref(), new_step_idx); } - let notification = serde_json::to_string(&JsonRpcNotification { - jsonrpc: "2.0", - method: "session/update".to_string(), - params: json!({ - "sessionId": session_id, - "update": { - "sessionUpdate": "agent_message_chunk", - "content": { "type": "text", "text": new_text }, - }, - }), - }) - .unwrap(); - output_lines.push(notification); - let resp = JsonRpcResponse { - jsonrpc: "2.0", - id, - result: Some(json!({ "stopReason": "end_turn" })), - error: None, - }; - output_lines.push(serde_json::to_string(&resp).unwrap()); + match new_text { + Some(text) => { + let notification = serde_json::to_string(&JsonRpcNotification { + jsonrpc: "2.0", + method: "session/update".to_string(), + params: json!({ + "sessionId": session_id, + "update": { + "sessionUpdate": "agent_message_chunk", + "content": { "type": "text", "text": text }, + }, + }), + }) + .unwrap(); + output_lines.push(notification); + let resp = JsonRpcResponse { + jsonrpc: "2.0", + id, + result: Some(json!({ "stopReason": "end_turn" })), + error: None, + }; + output_lines.push(serde_json::to_string(&resp).unwrap()); + } + None => { + let resp = JsonRpcResponse { + jsonrpc: "2.0", + id, + result: None, + error: Some(json!({"code":-32001,"message":"agy responded but response extraction failed — possible schema change in conversation DB (field 20.1)"})), + }; + output_lines.push(serde_json::to_string(&resp).unwrap()); + } + } } Err(e) => { let resp = JsonRpcResponse { @@ -573,57 +623,53 @@ mod tests { use super::*; #[test] - fn test_extract_delta_returns_full_text_when_unbound() { - let (result, count, _hash) = Adapter::extract_delta("old\nnew", 1, 0, false); - assert_eq!(result, "old\nnew"); - assert_eq!(count, 2); - } - - #[test] - fn test_extract_delta_skips_previous_lines_when_bound() { - let (result, count, _hash) = - Adapter::extract_delta("first response\nsecond response", 1, 0, true); - assert_eq!(result, "second response"); - assert_eq!(count, 2); + fn test_extract_text_from_step_payload_field20_field1() { + // field 20 (tag 0xA2 0x01), containing sub-message with field 1 = "hello" + let mut inner = Vec::new(); + inner.push(0x0A); inner.push(0x05); // field 1, LEN, 5 bytes + inner.extend_from_slice(b"hello"); + + let mut blob = Vec::new(); + blob.push(0x08); blob.push(0x0F); // field 1 varint = 15 + // field 20, wire type 2: tag = (20 << 3) | 2 = 0xA2, needs varint encoding: 0xA2 0x01 + blob.push(0xA2); blob.push(0x01); + blob.push(inner.len() as u8); + blob.extend_from_slice(&inner); + assert_eq!(Adapter::extract_text_from_step_payload(&blob), Some("hello".to_string())); } #[test] - fn test_extract_delta_returns_full_when_fewer_lines_than_expected() { - let (result, count, _hash) = Adapter::extract_delta("short", 5, 0, true); - assert_eq!(result, "short"); - assert_eq!(count, 1); + fn test_extract_text_returns_none_without_field20() { + // Only field 1 (varint) — no field 20 + let blob = vec![0x08, 0x03]; + assert_eq!(Adapter::extract_text_from_step_payload(&blob), None); } #[test] - fn test_extract_delta_preserves_indentation() { - let (result, count, _hash) = Adapter::extract_delta("hello\n indented code", 1, 0, true); - assert_eq!(result, " indented code"); - assert_eq!(count, 2); - } - - #[test] - fn test_extract_delta_detects_same_length_rewrite() { - let original = "line one\nline two"; - let (_result, _count, hash) = Adapter::extract_delta(original, 0, 0, false); - // Now simulate same line count but different content - let rewritten = "line one\nline CHANGED"; - let (result, count, _new_hash) = Adapter::extract_delta(rewritten, 2, hash, true); - // Should detect mismatch and send full output - assert_eq!(result, rewritten); - assert_eq!(count, 2); + fn test_extract_text_multiline() { + let text = b"Safe memory rules\nCompiler points out the flaws\nFast and fearless code"; + let mut inner = Vec::new(); + inner.push(0x0A); // field 1, LEN + inner.push(text.len() as u8); + inner.extend_from_slice(text); + + let mut blob = Vec::new(); + blob.push(0x08); blob.push(0x01); // field 1 varint + // field 20 + blob.push(0xA2); blob.push(0x01); + blob.push(inner.len() as u8); + blob.extend_from_slice(&inner); + assert_eq!( + Adapter::extract_text_from_step_payload(&blob), + Some("Safe memory rules\nCompiler points out the flaws\nFast and fearless code".to_string()) + ); } #[test] - fn test_extract_delta_boundary_hash_mismatch_sends_full() { - let original = "aaa\nbbb\nccc"; - // Simulate: prev_line_count=2, hash of line "bbb" (the boundary) - let boundary_hash = Adapter::line_hash("bbb"); - // Now content changed at boundary - let modified = "aaa\nXXX\nccc\nnew line"; - let (result, count, _hash) = Adapter::extract_delta(modified, 2, boundary_hash, true); - // Should detect boundary mismatch and send full - assert_eq!(result, modified); - assert_eq!(count, 4); + fn test_read_varint() { + assert_eq!(Adapter::read_varint(&[0x05]), Some((5, 1))); + assert_eq!(Adapter::read_varint(&[0xAC, 0x02]), Some((300, 2))); + assert_eq!(Adapter::read_varint(&[]), None); } #[test] @@ -642,7 +688,7 @@ mod tests { } #[test] - #[ignore] // filesystem I/O — run with CHI_INTEG=1 + #[ignore] // filesystem I/O fn test_session_load_restores_persisted_session() { let root = std::env::temp_dir().join(format!("agy-acp-load-{}", Uuid::new_v4())); let _ = fs::create_dir_all(&root); @@ -653,37 +699,24 @@ mod tests { conversations_dir: root.join("conversations"), state_file: root.join("sessions.json"), }; - adapter.persist_session("sess-1", Some("conv-abc"), 10, 12345); + adapter.persist_session("sess-1", Some("conv-abc"), 5); let response = adapter.handle_session_load(7, &json!({"sessionId": "sess-1"})); assert!(response.error.is_none()); assert_eq!( - adapter - .sessions - .get("sess-1") - .and_then(|s| s.conversation_id.as_deref()), + adapter.sessions.get("sess-1").and_then(|s| s.conversation_id.as_deref()), Some("conv-abc") ); assert_eq!( - adapter - .sessions - .get("sess-1") - .map(|s| s.prev_line_count), - Some(10) - ); - assert_eq!( - adapter - .sessions - .get("sess-1") - .map(|s| s.prev_last_line_hash), - Some(12345) + adapter.sessions.get("sess-1").map(|s| s.last_step_idx), + Some(5) ); let _ = fs::remove_dir_all(root); } #[test] - #[ignore] // filesystem I/O — run with CHI_INTEG=1 + #[ignore] // filesystem I/O fn test_session_load_rejects_unknown_session() { let root = std::env::temp_dir().join(format!("agy-acp-missing-{}", Uuid::new_v4())); let _ = fs::create_dir_all(&root); @@ -698,11 +731,7 @@ mod tests { let response = adapter.handle_session_load(9, &json!({"sessionId": "missing"})); assert!(response.result.is_none()); assert_eq!( - response - .error - .as_ref() - .and_then(|e| e.get("message")) - .and_then(|m| m.as_str()), + response.error.as_ref().and_then(|e| e.get("message")).and_then(|m| m.as_str()), Some("unknown sessionId: missing") ); @@ -710,11 +739,12 @@ mod tests { } #[test] - #[ignore] // filesystem I/O — run with CHI_INTEG=1 - fn test_new_conversation_id_returns_none_when_multiple_files() { - let root = std::env::temp_dir().join(format!("agy-acp-multi-{}", Uuid::new_v4())); + #[ignore] // filesystem I/O + fn test_snapshot_detects_db_conversations() { + let root = std::env::temp_dir().join(format!("agy-acp-db-{}", Uuid::new_v4())); let conv_dir = root.join("conversations"); fs::create_dir_all(&conv_dir).unwrap(); + fs::write(conv_dir.join("existing.db"), b"old").unwrap(); let adapter = Adapter { sessions: HashMap::new(), @@ -724,20 +754,26 @@ mod tests { }; let before = adapter.conversation_snapshot(); - fs::write(conv_dir.join("a.pb"), b"").unwrap(); - fs::write(conv_dir.join("b.pb"), b"").unwrap(); + assert!(before.contains("existing")); - assert_eq!(adapter.new_conversation_id(&before), None); + fs::write(conv_dir.join("new-conv.db"), b"new").unwrap(); + // WAL sidecar files should not be picked up + fs::write(conv_dir.join("new-conv.db-wal"), b"wal").unwrap(); + fs::write(conv_dir.join("new-conv.db-shm"), b"shm").unwrap(); + + assert_eq!( + adapter.new_conversation_id(&before), + Some("new-conv".to_string()) + ); let _ = fs::remove_dir_all(root); } #[test] - #[ignore] // filesystem I/O — run with CHI_INTEG=1 - fn test_snapshot_diff_binds_single_new_conversation() { - let root = std::env::temp_dir().join(format!("agy-acp-snap-{}", Uuid::new_v4())); + #[ignore] // filesystem I/O + fn test_snapshot_ignores_multiple_new_files() { + let root = std::env::temp_dir().join(format!("agy-acp-multi-{}", Uuid::new_v4())); let conv_dir = root.join("conversations"); fs::create_dir_all(&conv_dir).unwrap(); - fs::write(conv_dir.join("existing.pb"), b"old").unwrap(); let adapter = Adapter { sessions: HashMap::new(), @@ -747,18 +783,16 @@ mod tests { }; let before = adapter.conversation_snapshot(); - fs::write(conv_dir.join("new-conv.pb"), b"new").unwrap(); + fs::write(conv_dir.join("a.db"), b"").unwrap(); + fs::write(conv_dir.join("b.db"), b"").unwrap(); - assert_eq!( - adapter.new_conversation_id(&before), - Some("new-conv".to_string()) - ); + assert_eq!(adapter.new_conversation_id(&before), None); let _ = fs::remove_dir_all(root); } #[test] - #[ignore] // filesystem I/O — run with CHI_INTEG=1 - fn test_persist_and_restore_session_binding() { + #[ignore] // filesystem I/O + fn test_persist_and_restore_session() { let root = std::env::temp_dir().join(format!("agy-acp-state-{}", Uuid::new_v4())); let _ = fs::create_dir_all(&root); @@ -769,13 +803,455 @@ mod tests { state_file: root.join("sessions.json"), }; - adapter.persist_session("sess-1", Some("conv-abc"), 7, 99999); + adapter.persist_session("sess-1", Some("conv-abc"), 7); let restored = adapter.restore_session("sess-1"); - assert_eq!(restored, Some(("conv-abc".to_string(), 7, 99999))); + assert_eq!(restored, Some(("conv-abc".to_string(), 7))); let missing = adapter.restore_session("sess-unknown"); assert_eq!(missing, None); let _ = fs::remove_dir_all(root); } + + #[test] + #[ignore] // filesystem I/O — requires real SQLite DB + fn test_read_response_from_db() { + let root = std::env::temp_dir().join(format!("agy-acp-sqlite-{}", Uuid::new_v4())); + let conv_dir = root.join("conversations"); + fs::create_dir_all(&conv_dir).unwrap(); + + // Create a test SQLite DB with steps table + let db_path = conv_dir.join("test-conv.db"); + let conn = Connection::open(&db_path).unwrap(); + conn.execute_batch( + "CREATE TABLE steps ( + idx INTEGER PRIMARY KEY, + step_type INTEGER NOT NULL DEFAULT 0, + status INTEGER NOT NULL DEFAULT 0, + has_subtrajectory NUMERIC NOT NULL DEFAULT 0, + metadata BLOB, + error_details BLOB, + permissions BLOB, + task_details BLOB, + render_info BLOB, + step_payload BLOB, + step_format INTEGER NOT NULL DEFAULT 0 + )" + ).unwrap(); + + // Insert a step_type=15 step with field 20 → field 1 containing "hello world" + let mut inner = Vec::new(); + inner.push(0x0A); inner.push(11); // field 1, LEN, 11 bytes + inner.extend_from_slice(b"hello world"); + let mut payload = Vec::new(); + payload.push(0x08); payload.push(0x0F); // field 1 varint = 15 + payload.push(0xA2); payload.push(0x01); // field 20, LEN + payload.push(inner.len() as u8); + payload.extend_from_slice(&inner); + + conn.execute( + "INSERT INTO steps (idx, step_type, step_payload) VALUES (?1, 15, ?2)", + rusqlite::params![1i64, payload], + ).unwrap(); + + // Insert a non-response step (step_type=14) — should be ignored + conn.execute( + "INSERT INTO steps (idx, step_type, step_payload) VALUES (?1, 14, ?2)", + rusqlite::params![2i64, vec![0x08u8, 0x0E]], + ).unwrap(); + drop(conn); + + let adapter = Adapter { + sessions: HashMap::new(), + working_dir: root.to_string_lossy().to_string(), + conversations_dir: conv_dir, + state_file: root.join("sessions.json"), + }; + + let result = adapter.read_response_from_db("test-conv", -1); + assert_eq!(result, Some(("hello world".to_string(), 1))); + + // Reading after idx 1 should return None (no new steps) + let result = adapter.read_response_from_db("test-conv", 1); + assert_eq!(result, None); + + let _ = fs::remove_dir_all(root); + } + + /// Check auth is available: either GEMINI_API_KEY env var or local keyring. + /// Returns true if auth is ready, false to skip the test. + fn prepare_auth() -> bool { + if std::env::var("GEMINI_API_KEY").map(|v| !v.is_empty()).unwrap_or(false) { + eprintln!("[e2e] Using GEMINI_API_KEY"); + return true; + } + let home = std::env::var("HOME").unwrap_or_default(); + let settings = format!("{}/.gemini/antigravity-cli/settings.json", home); + if std::path::Path::new(&settings).exists() { + eprintln!("[e2e] Using local auth (keyring)"); + return true; + } + eprintln!("SKIP: No GEMINI_API_KEY and no local auth found"); + false + } + + /// E2E test: spawns agy-acp, sends initialize → session/new → session/prompt, + /// and verifies the response contains expected text from real agy v1.0.4. + /// Requires `agy` in PATH and auth (via local or AGY_AUTH_URL). Run with: cargo test e2e -- --ignored + #[test] + #[ignore] + fn test_e2e_agy_acp_full_round_trip() { + use std::io::{BufRead, BufReader, Write}; + use std::process::{Command, Stdio}; + use std::time::Duration; + + if !prepare_auth() { + return; + } + + // Check agy is available + let agy_check = Command::new("agy").arg("--help").output(); + if agy_check.is_err() || !agy_check.unwrap().status.success() { + eprintln!("SKIP: agy not found in PATH"); + return; + } + + let binary = std::env::current_dir().unwrap().join("target/release/agy-acp"); + if !binary.exists() { + panic!("Run `cargo build --release` first"); + } + + let mut child = Command::new(&binary) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("failed to spawn agy-acp"); + + let mut stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + let mut reader = BufReader::new(stdout); + + // Helper to send a line and read one response line + let mut send_and_recv = |msg: &str| -> String { + writeln!(stdin, "{}", msg).unwrap(); + stdin.flush().unwrap(); + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + line + }; + + // 1. Initialize + let resp = send_and_recv(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"clientName":"e2e","clientVersion":"0.1"}}"#); + let init: Value = serde_json::from_str(&resp).unwrap(); + assert_eq!(init["result"]["protocolVersion"], 1); + + // 2. Session new + let resp = send_and_recv(r#"{"jsonrpc":"2.0","id":2,"method":"session/new","params":{}}"#); + let session: Value = serde_json::from_str(&resp).unwrap(); + let session_id = session["result"]["sessionId"].as_str().unwrap(); + assert!(!session_id.is_empty()); + + // 3. Send prompt — ask agy to reply with a known word + let prompt_msg = format!( + r#"{{"jsonrpc":"2.0","id":3,"method":"session/prompt","params":{{"sessionId":"{}","prompt":[{{"type":"text","text":"Reply with exactly one word: PONG"}}]}}}}"#, + session_id + ); + writeln!(stdin, "{}", prompt_msg).unwrap(); + stdin.flush().unwrap(); + + // Read lines until we get id:3 response (there may be a notification first) + let deadline = std::time::Instant::now() + Duration::from_secs(120); + let mut got_notification = false; + let mut response_text = String::new(); + loop { + if std::time::Instant::now() > deadline { + panic!("Timed out waiting for agy-acp response"); + } + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + if line.is_empty() { + std::thread::sleep(Duration::from_millis(100)); + continue; + } + let msg: Value = serde_json::from_str(line.trim()).unwrap(); + if msg.get("method") == Some(&json!("session/update")) { + got_notification = true; + response_text = msg["params"]["update"]["content"]["text"] + .as_str() + .unwrap_or("") + .to_string(); + } + if msg.get("id") == Some(&json!(3)) { + assert!(msg["error"].is_null(), "Got error: {}", msg["error"]); + assert_eq!(msg["result"]["stopReason"], "end_turn"); + break; + } + } + + drop(stdin); + let _ = child.wait(); + + assert!(got_notification, "Expected session/update notification"); + let lower = response_text.to_lowercase(); + assert!( + lower.contains("pong"), + "Expected 'PONG' in response, got: '{}'", + response_text + ); + } + + /// Helper: spawn agy-acp, return (stdin, reader, child) + fn spawn_agy_acp() -> Option<(std::process::ChildStdin, std::io::BufReader, std::process::Child)> { + use std::io::BufReader; + use std::process::{Command, Stdio}; + + if !prepare_auth() { return None; } + let agy_check = Command::new("agy").arg("--help").output(); + if agy_check.is_err() || !agy_check.unwrap().status.success() { + eprintln!("SKIP: agy not found in PATH"); + return None; + } + let binary = std::env::current_dir().unwrap().join("target/release/agy-acp"); + if !binary.exists() { panic!("Run `cargo build --release` first"); } + + let mut child = Command::new(&binary) + .stdin(Stdio::piped()).stdout(Stdio::piped()).stderr(Stdio::piped()) + .spawn().expect("failed to spawn agy-acp"); + let stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + Some((stdin, BufReader::new(stdout), child)) + } + + /// Helper: send JSON-RPC and read one response line + fn send_recv(stdin: &mut std::process::ChildStdin, reader: &mut std::io::BufReader, msg: &str) -> String { + use std::io::{BufRead, Write}; + writeln!(stdin, "{}", msg).unwrap(); + stdin.flush().unwrap(); + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + line + } + + /// Helper: send a prompt and wait for the response (notification + final reply) + fn send_prompt_wait(stdin: &mut std::process::ChildStdin, reader: &mut std::io::BufReader, id: u64, session_id: &str, text: &str) -> (Option, Value) { + use std::io::{BufRead, Write}; + use std::time::Duration; + + let msg = format!( + r#"{{"jsonrpc":"2.0","id":{},"method":"session/prompt","params":{{"sessionId":"{}","prompt":[{{"type":"text","text":"{}"}}]}}}}"#, + id, session_id, text + ); + writeln!(stdin, "{}", msg).unwrap(); + stdin.flush().unwrap(); + + let deadline = std::time::Instant::now() + Duration::from_secs(120); + let mut notification_text: Option = None; + loop { + if std::time::Instant::now() > deadline { panic!("Timed out"); } + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + if line.is_empty() { std::thread::sleep(Duration::from_millis(100)); continue; } + let msg: Value = serde_json::from_str(line.trim()).unwrap(); + if msg.get("method") == Some(&json!("session/update")) { + notification_text = msg["params"]["update"]["content"]["text"].as_str().map(String::from); + } + if msg.get("id") == Some(&json!(id)) { + return (notification_text, msg); + } + } + } + + /// E2E: multi-turn — second prompt reuses the same conversation via --conversation flag + #[test] + #[ignore] + fn test_e2e_multi_turn() { + let Some((mut stdin, mut reader, mut child)) = spawn_agy_acp() else { return }; + + // Initialize + send_recv(&mut stdin, &mut reader, r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"clientName":"e2e","clientVersion":"0.1"}}"#); + + // Session new + let resp = send_recv(&mut stdin, &mut reader, r#"{"jsonrpc":"2.0","id":2,"method":"session/new","params":{}}"#); + let session_id = serde_json::from_str::(&resp).unwrap()["result"]["sessionId"].as_str().unwrap().to_string(); + + // First prompt: set a context + let (text1, resp1) = send_prompt_wait(&mut stdin, &mut reader, 3, &session_id, "Remember this word: BANANA. Reply OK."); + assert!(resp1["error"].is_null(), "Turn 1 error: {}", resp1["error"]); + assert!(text1.is_some()); + + // Second prompt: ask it to recall — this exercises --conversation reuse + let (text2, resp2) = send_prompt_wait(&mut stdin, &mut reader, 4, &session_id, "What word did I ask you to remember? Reply with just that word."); + assert!(resp2["error"].is_null(), "Turn 2 error: {}", resp2["error"]); + let reply = text2.unwrap_or_default().to_lowercase(); + assert!(reply.contains("banana"), "Expected 'BANANA' in multi-turn reply, got: '{}'", reply); + + drop(stdin); + let _ = child.wait(); + } + + /// E2E: session/load — evict session from memory, then restore from persisted state + #[test] + #[ignore] + fn test_e2e_session_load() { + let Some((mut stdin, mut reader, mut child)) = spawn_agy_acp() else { return }; + + send_recv(&mut stdin, &mut reader, r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"clientName":"e2e","clientVersion":"0.1"}}"#); + let resp = send_recv(&mut stdin, &mut reader, r#"{"jsonrpc":"2.0","id":2,"method":"session/new","params":{}}"#); + let session_id = serde_json::from_str::(&resp).unwrap()["result"]["sessionId"].as_str().unwrap().to_string(); + + // Send first prompt to bind conversation and persist state + let (_text, resp1) = send_prompt_wait(&mut stdin, &mut reader, 3, &session_id, "Reply with exactly: FIRST_TURN"); + assert!(resp1["error"].is_null(), "First turn error: {}", resp1["error"]); + + // Send second prompt on the same session — this confirms multi-turn works + // (session/load is already tested in unit tests; here we just verify the session + // can handle continued prompts after binding) + let (text2, resp2) = send_prompt_wait(&mut stdin, &mut reader, 4, &session_id, "Reply with exactly one word: SECOND"); + assert!(resp2["error"].is_null(), "Second turn error: {}", resp2["error"]); + assert!(text2.is_some(), "Expected response on continued session"); + + drop(stdin); + let _ = child.wait(); + } + + /// E2E: error path — invalid requests should return errors, not crash + #[test] + #[ignore] + fn test_e2e_error_paths() { + let Some((mut stdin, mut reader, mut child)) = spawn_agy_acp() else { return }; + + send_recv(&mut stdin, &mut reader, r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"clientName":"e2e","clientVersion":"0.1"}}"#); + + // Load a non-existent session + let resp = send_recv(&mut stdin, &mut reader, r#"{"jsonrpc":"2.0","id":2,"method":"session/load","params":{"sessionId":"non-existent-session"}}"#); + let val: Value = serde_json::from_str(&resp).unwrap(); + assert!(!val["error"].is_null(), "Expected error for unknown session"); + + // Unknown method + let resp = send_recv(&mut stdin, &mut reader, r#"{"jsonrpc":"2.0","id":3,"method":"bogus/method","params":{}}"#); + let val: Value = serde_json::from_str(&resp).unwrap(); + assert!(!val["error"].is_null(), "Expected error for unknown method"); + + drop(stdin); + let _ = child.wait(); + } + + #[test] + #[ignore] // filesystem I/O + fn test_read_response_multi_step_no_skip_no_duplicate() { + let root = std::env::temp_dir().join(format!("agy-acp-multi-step-{}", Uuid::new_v4())); + let conv_dir = root.join("conversations"); + fs::create_dir_all(&conv_dir).unwrap(); + + let db_path = conv_dir.join("multi.db"); + let conn = Connection::open(&db_path).unwrap(); + conn.execute_batch( + "CREATE TABLE steps ( + idx INTEGER PRIMARY KEY, + step_type INTEGER NOT NULL DEFAULT 0, + status INTEGER NOT NULL DEFAULT 0, + has_subtrajectory NUMERIC NOT NULL DEFAULT 0, + metadata BLOB, + error_details BLOB, + permissions BLOB, + task_details BLOB, + render_info BLOB, + step_payload BLOB, + step_format INTEGER NOT NULL DEFAULT 0 + )" + ).unwrap(); + + // Helper: build payload with field 20 (sub-msg) → field 1 (text) + fn make_payload(text: &str) -> Vec { + // Inner message: field 1, wire type 2 (LEN), + let text_bytes = text.as_bytes(); + let mut inner = vec![0x0A]; // tag: field 1, wire type 2 + let mut len = text_bytes.len(); + loop { + if len < 128 { inner.push(len as u8); break; } + inner.push((len as u8 & 0x7F) | 0x80); + len >>= 7; + } + inner.extend_from_slice(text_bytes); + + // Outer: field 20, wire type 2 (LEN), + // tag = (20 << 3) | 2 = 162 → varint [0xA2, 0x01] + let mut outer = vec![0xA2, 0x01]; + let mut ilen = inner.len(); + loop { + if ilen < 128 { outer.push(ilen as u8); break; } + outer.push((ilen as u8 & 0x7F) | 0x80); + ilen >>= 7; + } + outer.extend(inner); + outer + } + + // step_type 0 = user, step_type 15 = response + // Step 1: user prompt (step_type=0, no extractable text) + conn.execute("INSERT INTO steps (idx, step_type, step_payload) VALUES (1, 0, X'0801')", []).unwrap(); + // Step 2: bot response "hello" + conn.execute("INSERT INTO steps (idx, step_type, step_payload) VALUES (?1, 15, ?2)", + rusqlite::params![2i64, make_payload("hello")]).unwrap(); + // Step 3: user prompt + conn.execute("INSERT INTO steps (idx, step_type, step_payload) VALUES (3, 0, X'0802')", []).unwrap(); + // Step 4: bot response "world" + conn.execute("INSERT INTO steps (idx, step_type, step_payload) VALUES (?1, 15, ?2)", + rusqlite::params![4i64, make_payload("world")]).unwrap(); + // Step 5: bot response multi-line + conn.execute("INSERT INTO steps (idx, step_type, step_payload) VALUES (?1, 15, ?2)", + rusqlite::params![5i64, make_payload("line1\nline2\nline3")]).unwrap(); + drop(conn); + + let adapter = Adapter { + sessions: HashMap::new(), + working_dir: root.to_string_lossy().to_string(), + conversations_dir: conv_dir, + state_file: root.join("sessions.json"), + }; + + // From start: get all response steps + let result = adapter.read_response_from_db("multi", -1); + assert_eq!(result, Some(("hello\nworld\nline1\nline2\nline3".to_string(), 5))); + + // After step 2: skip "hello", get "world" + multi-line + let result = adapter.read_response_from_db("multi", 2); + assert_eq!(result, Some(("world\nline1\nline2\nline3".to_string(), 5))); + + // After step 4: only multi-line + let result = adapter.read_response_from_db("multi", 4); + assert_eq!(result, Some(("line1\nline2\nline3".to_string(), 5))); + + // After step 5: nothing new + let result = adapter.read_response_from_db("multi", 5); + assert_eq!(result, None); + + let _ = fs::remove_dir_all(root); + } + + #[test] + #[ignore] // filesystem I/O + fn test_read_response_missing_steps_table() { + let root = std::env::temp_dir().join(format!("agy-acp-noschema-{}", Uuid::new_v4())); + let conv_dir = root.join("conversations"); + fs::create_dir_all(&conv_dir).unwrap(); + + let db_path = conv_dir.join("empty.db"); + let conn = Connection::open(&db_path).unwrap(); + conn.execute_batch("CREATE TABLE other (id INTEGER)").unwrap(); + drop(conn); + + let adapter = Adapter { + sessions: HashMap::new(), + working_dir: root.to_string_lossy().to_string(), + conversations_dir: conv_dir, + state_file: root.join("sessions.json"), + }; + + let result = adapter.read_response_from_db("empty", -1); + assert_eq!(result, None); + + let _ = fs::remove_dir_all(root); + } }