feat: implement real-time cluster synchronization for Pipes via NATS KV watcher#83
feat: implement real-time cluster synchronization for Pipes via NATS KV watcher#83taitelee wants to merge 5 commits into
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces real-time synchronization for the Pipes store across WaveHouse clusters. By integrating a NATS Key-Value (KV) watcher, the system now ensures that any changes to named queries in the NATS KV store are immediately reflected in the local in-memory cache of all connected nodes, thereby eliminating previous synchronization delays and enhancing cluster consistency. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a cluster synchronization mechanism for pipes using NATS KV watchers, ensuring that all nodes in the cluster maintain a consistent state. The Watch method has been added to the pipes store to handle real-time updates and deletions. The review feedback identifies a performance bottleneck where JSON unmarshaling is performed while holding a write lock, which could block other operations. Additionally, the tests rely on time.Sleep for synchronization; it is recommended to use more deterministic approaches like require.Eventually to prevent flakiness and improve test execution speed.
| s.mu.Lock() | ||
| switch entry.Operation() { | ||
| case jetstream.KeyValuePut: | ||
| var q NamedQuery | ||
| if err := json.Unmarshal(entry.Value(), &q); err != nil { | ||
| s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err) | ||
| } else { | ||
| s.cached[entry.Key()] = &q | ||
| s.logger.Info("pipe updated via cluster sync", "name", entry.Key()) | ||
| } | ||
| case jetstream.KeyValueDelete, jetstream.KeyValuePurge: | ||
| delete(s.cached, entry.Key()) | ||
| s.logger.Info("pipe deleted via cluster sync", "name", entry.Key()) | ||
| } | ||
| s.mu.Unlock() |
There was a problem hiding this comment.
[SHOULD] The write lock is held during the entire switch block, including the json.Unmarshal operation. This blocks all readers (Get, List) and other writers while expensive CPU work (JSON parsing) is performed. It is better to unmarshal the data outside the lock and only acquire the lock for the map update.
switch entry.Operation() {
case jetstream.KeyValuePut:
var q NamedQuery
if err := json.Unmarshal(entry.Value(), &q); err != nil {
s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err)
continue
}
s.mu.Lock()
s.cached[entry.Key()] = &q
s.mu.Unlock()
s.logger.Info("pipe updated via cluster sync", "name", entry.Key())
case jetstream.KeyValueDelete, jetstream.KeyValuePurge:
s.mu.Lock()
delete(s.cached, entry.Key())
s.mu.Unlock()
s.logger.Info("pipe deleted via cluster sync", "name", entry.Key())
}| go store.Watch(ctx) | ||
|
|
||
| // Yield briefly to let the background goroutine start | ||
| time.Sleep(10 * time.Millisecond) |
There was a problem hiding this comment.
[SHOULD] Avoid using time.Sleep for synchronization in tests as it leads to flakiness and slow test suites. Per AGENTS.md §"Testing Conventions", prefer using require.Eventually to wait for background state changes.
References
- Testing conventions in AGENTS.md discourage the use of time.Sleep for synchronization, favoring deterministic primitives like require.Eventually.
| } | ||
|
|
||
| // Wait for the background goroutine to process the channel | ||
| time.Sleep(10 * time.Millisecond) |
There was a problem hiding this comment.
| op: jetstream.KeyValueDelete, | ||
| } | ||
|
|
||
| time.Sleep(10 * time.Millisecond) |
There was a problem hiding this comment.
|
Claude finished @taitelee's task in 2m 27s —— View job Code Review
The implementation mirrors the existing Correctness — channel closure causes spin-loop (MEDIUM)
case entry := <-watcher.Updates():
if entry == nil {
continue
}When the NATS connection drops, the JetStream client closes the Fix: use the two-value receive and bail on closure: case entry, ok := <-watcher.Updates():
if !ok {
s.logger.Error("pipe watcher channel closed, stopping")
return
}
if entry == nil {
continue
}If you want reconnect behavior rather than a dead watcher, the caller in Correctness — watcher exits silently on
|
| Area | Finding | Severity |
|---|---|---|
| Correctness | Channel closure → spin-loop in Watch |
MEDIUM |
| Performance | Gemini's lock finding is incorrect — no issue | NONE |
| Security | No new attack surface | NONE |
| Testing | Missing channel-closure test case | LOW |
| Docs | CHANGELOG.md entry required; AGENTS.md §13 stale | MEDIUM (per AGENTS.md hard rule) |
Iterate — the CHANGELOG entry is a hard requirement per AGENTS.md, and the channel-closure spin-loop is a real production hazard that should be fixed (and tested) before merge. Everything else is good.
|
Triage note as part of the pre-alpha priority pass. This PR predates the standalone-only refactor in #89. The "real-time cluster synchronization" framing assumes clustered mode, which was removed — the embedded NATS instance is per-binary, so there is no "cluster" to synchronize across. The underlying UX problem is still real though: today, pipes load from the Two paths:
Currently failing E2E + REVIEW_REQUIRED + DRAFT, so deferred from alpha either way. Recommend (1) post-alpha unless there's user demand sooner. Setting Priority to P3 on the board to reflect. @EricAndrechek / @taitelee — your call which path. |
## Summary Umbrella PR setting up shared Claude Code + AI agent infrastructure for the WaveHouse team. Two work streams: 1. **AI rules drift cleanup** — corrected stale references that AI tools (Claude Code, Gemini Code Assist, Copilot, CodeRabbit) were following blindly. 2. **Claude Code native tooling** — committed `.claude/` configuration and `.githooks/` so every teammate gets identical dev affordances out of the box, with agent-specific gating layered on top. The team just got Max 20x subscriptions across the board; this lands the team-wide config so everyone is on the same agentic dev experience by default. ## Scope ### 1. AI rules drift cleanup - **17 doc-path references corrected** across AGENTS.md + CONTRIBUTING.md to `docs/src/content/docs/*.md` (the actual Astro Starlight location, not the old flat layout). - **`.github/copilot-instructions.md` shrunk to a pointer** — was drifting on Go 1.25 (vs current 1.26.3) and 60% coverage (vs current 80% total / 70% unit per `.testcoverage.yml`). - **`.gemini/styleguide.md`** — stale `#67` / 60% claim fixed (issue closed, 70% restored); duplicated doc-sync bullet collapsed to defer to AGENTS.md (already authoritative). - **`.github/labeler.yml`** — dropped non-existent `cmd/wavehouse-{api,worker}/**` entries; fixed `tests/{compose.yaml,sdk/**}` → `tests/e2e/...`; added `cmd/wavehouse/**` to `area/infra`. - **`.github/prompts/pr-review.md`** — doc-sync list collapsed; vestigial "tenant" wording dropped (no tenant model in WaveHouse); hard-wrap reflowed (180 → 81 lines). - **AGENTS.md** — `cmd/*/main.go` (plural) → `cmd/wavehouse/main.go` (one binary); `tests/fixtures/` → `tests/e2e/fixtures/`; removed stale "update `triage.yml` area enumeration" step (workflow now discovers `area/*` labels dynamically); fixed internal-package count. - **CONTRIBUTING.md** — vestigial "tenant isolation" wording removed. - **TODO.md deleted** — audit summary below. ### 2. Claude Code native tooling - **`.claude/`** — shared configuration: `settings.json` (deny rules + worktree config + three hooks wired), `agents/pre-push-reviewer.md`, `hooks/agent-bash-gate.sh` (PreToolUse Bash gate), `hooks/review-marker.sh` (PostToolUse Agent marker writer), `hooks/gofumpt-on-save.sh` (auto-format), `skills/pr-review-locally/`, `skills/pr-sync-with-main/`, `commands/cover.md`. - **`.githooks/`** — universal team hooks installed by `make tools`: `pre-commit` runs `make verify`; `pre-push` requires `tmp/ci-passed-<HEAD-sha>` marker (written by `make ci`). - **`.config/wt.toml`** — worktrunk project hooks so parallel-agent worktrees install `.githooks/` correctly. - **AGENTS.md §"Agent PR Discipline"** — new section codifying the agent-only ruleset: - Drafts-only PR creation; human-only ready/approve/request-changes/reviewer-add transitions. - Bot reviewer re-triggers go through PR comments (`@coderabbitai review`, `@gemini-code-assist`, `@claude` / `/review`). - **Pre-push self-review mandatory** on PR branches: agent invokes `pre-push-reviewer` subagent in fresh context. `ship_it` requires zero findings at any severity — any `[MUST]` / `[SHOULD]` / `[MAY]` forces iterate; the orchestrator loops review → fix → review until clean. - **Honest-agent marker policy**: `--no-verify` regex-blocked + the obvious marker-write idioms denied at the permission layer (`Bash(touch tmp/ci-passed:*)`, `Write`/`Edit` on the canonical paths); everything else is a documented rule, not regex-enforced. Bash can write a file by a dozen paths and regex enforcement is a porous game of whack-a-mole. - **`docs/src/content/docs/claude-code.md`** — contributor-facing page documenting the four-layer model (universal git hooks → agent gate → ergonomic hooks → skills/agents/commands), quick setup, and discipline rules. - **CHANGELOG.md** — `[Unreleased]` entry covering all of the above. ## Out-of-tree GitHub changes that pair with the AI-rules cleanup Done via `gh` CLI as part of the same audit: - **Closed #46** (Graceful Shutdown) — verified shipped in `cmd/wavehouse/main.go:378-393` (SIGINT/SIGTERM → bounded shutCtx → ingestStream.Stop → srv.Shutdown → promSrv.Shutdown). - **Scope notes added to #44, #50, #94** with current-status / boundary info (ldflags shipped vs `/version` remaining; DLQ shipped vs retry remaining + scope boundary with #91; per-component logger source field as a #94 complement). - **Opened 4 new issues from orphan TODOs**: #143 (pprof), #144 (K8s `/healthz` + per-dep health), #145 (RequireRoles fail-closed), #146 (split `internal/api/` into focused subpackages). ## TODO.md audit (one-time, for record) | Bucket | Count | Disposition | |--------|-------|-------------| | Already shipped per closed issues (#11, #14, #16, #28, #40, #41, #42, #45) + current code | ~12 | Deleted from TODO | | Tracked as open issues (#32, #33, #34, #37, #39, #44, #48, #49, #50, #51, #94) | ~12 | Kept as issues, scope notes added where useful | | In-flight via open PRs (#83, #92, #119, #122, #125, #136, #137) | 4 | Untouched | | #46 Graceful Shutdown | 1 | Verified shipped, closed with comment | | Orphan items | 4 | Split into #143-146 | | Aspirational ("more tests", "update README") | 2 | Deleted — covered by AGENTS.md doc-sync rules | Projects #7 board + triage automation is now the single canonical backlog. ## Test plan - [x] `make ci` passes locally for each push (gated by `.githooks/pre-push`) - [x] CI green on the latest HEAD (8fbd7db) - [x] PR-title-lint accepts the title (`chore: claude code native improvements`) - [x] All `docs/src/content/docs/*.md` paths in AGENTS.md resolve to real files - [x] Labeler workflow auto-labels correctly per the updated paths - [x] `pre-push-reviewer` subagent loop reached `VERDICT: ship_it` with zero findings under the strict rubric before the final push (validated end-to-end across five iterations on this branch — each surfacing a real doc-sync / off-by-one / quote-strip issue and forcing a fix before the marker auto-wrote) - [x] `agent-bash-gate.sh` quote-strip generalization sanity-tested live: `echo "git push to deploy"` passes through; `git push --no-verify` and `git commit --no-verify` still block (`bash -n` clean, JSON wiring valid) - [ ] Human review ## Related issues - Closed during this work: #46 - Scope notes added: #44, #50, #94 - New follow-up issues created: #143, #144, #145, #146 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Claude Code integration: local pre-push reviewer with strict ship/iterate/block verdicts, push gating via CI/review markers, automatic review-marker creation, and a coverage-reporting command. * **Documentation** * Comprehensive Claude Code & agent docs, new skill guides for PR review/sync, updated README/CONTRIBUTING/CHANGELOG/styleguide, and site sidebar/page additions. * **Chores** * Added git and agent hooks, CI marker creation, worktrunk config, labeler tweaks, and simplified Copilot instructions. <!-- review_stack_entry_start --> [](https://app.coderabbit.ai/change-stack/Wave-RF/WaveHouse/pull/147?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack) <!-- review_stack_entry_end --> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
This PR implements a reactive synchronization pattern for the Pipes store to ensure consistency across WaveHouse clusters. Previously, the Pipes store only loaded named queries from NATS KV during the initial boot sequence, leading to synchronization lag where configuration changes on one node were not reflected on others without a restart.
Test plan
Related Issues
Closes #33