Skip to content

feat: implement real-time cluster synchronization for Pipes via NATS KV watcher#83

Draft
taitelee wants to merge 5 commits into
mainfrom
pipes_watcher
Draft

feat: implement real-time cluster synchronization for Pipes via NATS KV watcher#83
taitelee wants to merge 5 commits into
mainfrom
pipes_watcher

Conversation

@taitelee

Copy link
Copy Markdown
Contributor

Summary

This PR implements a reactive synchronization pattern for the Pipes store to ensure consistency across WaveHouse clusters. Previously, the Pipes store only loaded named queries from NATS KV during the initial boot sequence, leading to synchronization lag where configuration changes on one node were not reflected on others without a restart.

  • KV Watcher Integration: Added a Watch method to the Pipes store using the WatchAll NATS JetStream pattern.
  • Reactive Cache Updates: The store now listens for Put and Delete operations on the WAVEHOUSE_PIPES bucket, dynamically updating the local in-memory cache in milliseconds.
  • Lifecycle Management: Integrated the watcher into the main application lifecycle via background goroutines in the API and Standalone binaries, ensuring graceful shutdown support through context cancellation.

Test plan

  • ...

Related Issues

Closes #33

@github-actions github-actions Bot added go Pull requests that update go code area/api HTTP handlers, routing, middleware area/pipes Named query pipes labels Apr 28, 2026
@gemini-code-assist

Copy link
Copy Markdown

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces real-time synchronization for the Pipes store across WaveHouse clusters. By integrating a NATS Key-Value (KV) watcher, the system now ensures that any changes to named queries in the NATS KV store are immediately reflected in the local in-memory cache of all connected nodes, thereby eliminating previous synchronization delays and enhancing cluster consistency.

Highlights

  • KV Watcher Integration: A new Watch method was added to the Pipes store, utilizing the NATS JetStream WatchAll pattern to monitor all key changes within the WAVEHOUSE_PIPES bucket for real-time updates.
  • Reactive Cache Updates: The Pipes store now actively listens for Put and Delete operations from the NATS KV store, enabling dynamic and near real-time updates to its local in-memory cache, ensuring consistency across the cluster.
  • Application Lifecycle Integration: The NATS KV watcher for Pipes has been integrated into the main application lifecycle of both the wavehouse-api and wavehouse standalone binaries, running as background goroutines with proper context-based graceful shutdown.
  • Watcher Test Coverage: New mock implementations for NATS KV watcher interfaces (mockWatcher, mockKVEntry, mockKV) were added, along with comprehensive tests to verify the real-time synchronization logic of the Store.Watch method, including handling Put and Delete events and nil KV configurations.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a cluster synchronization mechanism for pipes using NATS KV watchers, ensuring that all nodes in the cluster maintain a consistent state. The Watch method has been added to the pipes store to handle real-time updates and deletions. The review feedback identifies a performance bottleneck where JSON unmarshaling is performed while holding a write lock, which could block other operations. Additionally, the tests rely on time.Sleep for synchronization; it is recommended to use more deterministic approaches like require.Eventually to prevent flakiness and improve test execution speed.

Comment thread internal/pipes/pipes.go Outdated
Comment on lines +328 to +342
s.mu.Lock()
switch entry.Operation() {
case jetstream.KeyValuePut:
var q NamedQuery
if err := json.Unmarshal(entry.Value(), &q); err != nil {
s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err)
} else {
s.cached[entry.Key()] = &q
s.logger.Info("pipe updated via cluster sync", "name", entry.Key())
}
case jetstream.KeyValueDelete, jetstream.KeyValuePurge:
delete(s.cached, entry.Key())
s.logger.Info("pipe deleted via cluster sync", "name", entry.Key())
}
s.mu.Unlock()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

[SHOULD] The write lock is held during the entire switch block, including the json.Unmarshal operation. This blocks all readers (Get, List) and other writers while expensive CPU work (JSON parsing) is performed. It is better to unmarshal the data outside the lock and only acquire the lock for the map update.

            switch entry.Operation() {
            case jetstream.KeyValuePut:
                var q NamedQuery
                if err := json.Unmarshal(entry.Value(), &q); err != nil {
                    s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err)
                    continue
                }
                s.mu.Lock()
                s.cached[entry.Key()] = &q
                s.mu.Unlock()
                s.logger.Info("pipe updated via cluster sync", "name", entry.Key())
            case jetstream.KeyValueDelete, jetstream.KeyValuePurge:
                s.mu.Lock()
                delete(s.cached, entry.Key())
                s.mu.Unlock()
                s.logger.Info("pipe deleted via cluster sync", "name", entry.Key())
            }

Comment thread internal/pipes/pipes_test.go Outdated
go store.Watch(ctx)

// Yield briefly to let the background goroutine start
time.Sleep(10 * time.Millisecond)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

[SHOULD] Avoid using time.Sleep for synchronization in tests as it leads to flakiness and slow test suites. Per AGENTS.md §"Testing Conventions", prefer using require.Eventually to wait for background state changes.

References
  1. Testing conventions in AGENTS.md discourage the use of time.Sleep for synchronization, favoring deterministic primitives like require.Eventually.

Comment thread internal/pipes/pipes_test.go Outdated
}

// Wait for the background goroutine to process the channel
time.Sleep(10 * time.Millisecond)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

[SHOULD] Avoid using time.Sleep for synchronization in tests. Use require.Eventually to assert that the pipe has been added to the cache.

References
  1. Testing conventions in AGENTS.md discourage the use of time.Sleep for synchronization, favoring deterministic primitives like require.Eventually.

Comment thread internal/pipes/pipes_test.go Outdated
op: jetstream.KeyValueDelete,
}

time.Sleep(10 * time.Millisecond)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

[SHOULD] Avoid using time.Sleep for synchronization in tests. Use require.Eventually to assert that the pipe has been removed from the cache.

References
  1. Testing conventions in AGENTS.md discourage the use of time.Sleep for synchronization, favoring deterministic primitives like require.Eventually.

@claude

claude Bot commented Apr 30, 2026

Copy link
Copy Markdown

Claude finished @taitelee's task in 2m 27s —— View job


Code Review

  • Gather context (AGENTS.md, changed files)
  • Review correctness (concurrency, error handling, lifecycle)
  • Review security
  • Review performance
  • Review testing
  • Review doc-sync requirements

The implementation mirrors the existing policy.Store.Watch pattern faithfully and the overall approach is sound. A few things need addressing before merge.


Correctness — channel closure causes spin-loop (MEDIUM)

internal/pipes/pipes.go:327

case entry := <-watcher.Updates():
    if entry == nil {
        continue
    }

When the NATS connection drops, the JetStream client closes the Updates() channel. A closed Go channel returns the zero value (nil) immediately and forever. Because the code treats nil as "end-of-initial-values / skip", the goroutine will spin at 100% CPU consuming no memory, logging nothing, making no progress — indefinitely. The policy.Store.Watch has the same bug, but perpetuating it isn't the right call.

Fix: use the two-value receive and bail on closure:

case entry, ok := <-watcher.Updates():
    if !ok {
        s.logger.Error("pipe watcher channel closed, stopping")
        return
    }
    if entry == nil {
        continue
    }

If you want reconnect behavior rather than a dead watcher, the caller in cmd/ needs a retry loop — but at minimum the current code must not spin. Fix this →


Correctness — watcher exits silently on WatchAll error, no restart

internal/pipes/pipes.go:314

watcher, err := s.kv.WatchAll(ctx)
if err != nil {
    s.logger.Error("failed to start kv watcher for pipes", "error", err)
    return
}

If WatchAll fails on boot (transient NATS error), the goroutine exits and sync is permanently disabled for the lifetime of the process. Again, consistent with policy.Store.Watch, so not a blocker here, but worth noting — a follow-up issue tracking reconnect logic would help.


Performance — Gemini's lock finding is incorrect (no action needed)

Gemini flagged that JSON unmarshal happens under the write lock. This is not true — the unmarshal is performed before s.mu.Lock() is acquired. The lock only covers the map write. The code is correct as-is; Gemini misread the indentation. No change needed here.


Security — NONE

The Watch method is a read-only consumer of KV events generated through the existing authenticated Put path. No new attack surface introduced.


Testing — require.Eventually is correctly used ✓

Gemini's comments about time.Sleep appear to reference an older version of the PR. The current test file uses require.Eventually with a 10ms polling interval throughout TestStore_Watch_SyncsCluster. That aligns with AGENTS.md's testing conventions. No action needed.

One missing test case: channel closure. The spin-loop bug above is not caught by the test suite because there's no test that closes the updates channel and asserts the goroutine exits cleanly. A test like:

func TestStore_Watch_ChannelClose_Exits(t *testing.T) {
    t.Parallel()
    updatesCh := make(chan jetstream.KeyValueEntry)
    store := NewMemoryStore()
    store.kv = &mockKV{watcher: &mockWatcher{updates: updatesCh}}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    done := make(chan struct{})
    go func() {
        defer close(done)
        store.Watch(ctx)
    }()

    close(updatesCh) // simulate NATS connection drop

    select {
    case <-done:
    case <-time.After(1 * time.Second):
        t.Fatal("Watch did not exit after channel closed")
    }
}

This test will also serve as the correctness guard for the channel-closure fix.


Documentation — CHANGELOG entry missing (required by AGENTS.md)

AGENTS.md §"Documentation & Consistency Sync" requires every notable change to get an entry under [Unreleased] in CHANGELOG.md. This PR adds a real-time synchronization feature that changes cluster behavior — it belongs in the Added section. There is no entry for it.

Additionally, AGENTS.md key design decision #13 describes pipes as "Stored in NATS KV with .sql file directory bootstrap" but item #11 (policy) already documents "cluster-wide sync via KV Watch." Decision #13 should be updated to match now that pipes has the same capability.

Fix this →%20Add%20a%20CHANGELOG.md%20entry%20under%20%5BUnreleased%5D%20Added%20section%20for%20the%20real-time%20cluster%20sync%20for%20pipes%20via%20NATS%20KV%20watcher%20(Watch%20method%20added%20to%20pipes.Store%2C%20integrated%20into%20cmd%2Fwavehouse%20and%20cmd%2Fwavehouse-api).%202)%20Update%20AGENTS.md%20key%20design%20decision%20%2313%20to%20mention%20%22cluster-wide%20sync%20via%20KV%20Watch%22%20matching%20the%20language%20used%20in%20decision%20%2311%20for%20policies.&repo=Wave-RF/WaveHouse)


Summary

Area Finding Severity
Correctness Channel closure → spin-loop in Watch MEDIUM
Performance Gemini's lock finding is incorrect — no issue NONE
Security No new attack surface NONE
Testing Missing channel-closure test case LOW
Docs CHANGELOG.md entry required; AGENTS.md §13 stale MEDIUM (per AGENTS.md hard rule)

Iterate — the CHANGELOG entry is a hard requirement per AGENTS.md, and the channel-closure spin-loop is a real production hazard that should be fixed (and tested) before merge. Everything else is good.

@EricAndrechek

Copy link
Copy Markdown
Member

Triage note as part of the pre-alpha priority pass.

This PR predates the standalone-only refactor in #89. The "real-time cluster synchronization" framing assumes clustered mode, which was removed — the embedded NATS instance is per-binary, so there is no "cluster" to synchronize across.

The underlying UX problem is still real though: today, pipes load from the WAVEHOUSE_PIPES KV bucket only at boot, so an admin who updates a pipe via the API (or by editing the bootstrap .sql directory) has to restart the binary to see the change reflected. KV Watcher would fix that in a single-instance world too.

Two paths:

  1. Rebase + reframe as feat(pipes): hot-reload pipe definitions via NATS KV Watcher. Same code shape, different framing — drop the "cluster" wording, refocus on the live-update use case. Closes KV Watcher Real Time Updates #33 (KV Watcher Real Time Updates) and is a real win.
  2. Close if no near-term need for live pipe updates.

Currently failing E2E + REVIEW_REQUIRED + DRAFT, so deferred from alpha either way. Recommend (1) post-alpha unless there's user demand sooner. Setting Priority to P3 on the board to reflect.

@EricAndrechek / @taitelee — your call which path.

EricAndrechek added a commit that referenced this pull request May 20, 2026
## Summary

Umbrella PR setting up shared Claude Code + AI agent infrastructure for
the WaveHouse team. Two work streams:

1. **AI rules drift cleanup** — corrected stale references that AI tools
(Claude Code, Gemini Code Assist, Copilot, CodeRabbit) were following
blindly.
2. **Claude Code native tooling** — committed `.claude/` configuration
and `.githooks/` so every teammate gets identical dev affordances out of
the box, with agent-specific gating layered on top.

The team just got Max 20x subscriptions across the board; this lands the
team-wide config so everyone is on the same agentic dev experience by
default.

## Scope

### 1. AI rules drift cleanup

- **17 doc-path references corrected** across AGENTS.md +
CONTRIBUTING.md to `docs/src/content/docs/*.md` (the actual Astro
Starlight location, not the old flat layout).
- **`.github/copilot-instructions.md` shrunk to a pointer** — was
drifting on Go 1.25 (vs current 1.26.3) and 60% coverage (vs current 80%
total / 70% unit per `.testcoverage.yml`).
- **`.gemini/styleguide.md`** — stale `#67` / 60% claim fixed (issue
closed, 70% restored); duplicated doc-sync bullet collapsed to defer to
AGENTS.md (already authoritative).
- **`.github/labeler.yml`** — dropped non-existent
`cmd/wavehouse-{api,worker}/**` entries; fixed
`tests/{compose.yaml,sdk/**}` → `tests/e2e/...`; added
`cmd/wavehouse/**` to `area/infra`.
- **`.github/prompts/pr-review.md`** — doc-sync list collapsed;
vestigial "tenant" wording dropped (no tenant model in WaveHouse);
hard-wrap reflowed (180 → 81 lines).
- **AGENTS.md** — `cmd/*/main.go` (plural) → `cmd/wavehouse/main.go`
(one binary); `tests/fixtures/` → `tests/e2e/fixtures/`; removed stale
"update `triage.yml` area enumeration" step (workflow now discovers
`area/*` labels dynamically); fixed internal-package count.
- **CONTRIBUTING.md** — vestigial "tenant isolation" wording removed.
- **TODO.md deleted** — audit summary below.

### 2. Claude Code native tooling

- **`.claude/`** — shared configuration: `settings.json` (deny rules +
worktree config + three hooks wired), `agents/pre-push-reviewer.md`,
`hooks/agent-bash-gate.sh` (PreToolUse Bash gate),
`hooks/review-marker.sh` (PostToolUse Agent marker writer),
`hooks/gofumpt-on-save.sh` (auto-format), `skills/pr-review-locally/`,
`skills/pr-sync-with-main/`, `commands/cover.md`.
- **`.githooks/`** — universal team hooks installed by `make tools`:
`pre-commit` runs `make verify`; `pre-push` requires
`tmp/ci-passed-<HEAD-sha>` marker (written by `make ci`).
- **`.config/wt.toml`** — worktrunk project hooks so parallel-agent
worktrees install `.githooks/` correctly.
- **AGENTS.md §"Agent PR Discipline"** — new section codifying the
agent-only ruleset:
- Drafts-only PR creation; human-only
ready/approve/request-changes/reviewer-add transitions.
- Bot reviewer re-triggers go through PR comments (`@coderabbitai
review`, `@gemini-code-assist`, `@claude` / `/review`).
- **Pre-push self-review mandatory** on PR branches: agent invokes
`pre-push-reviewer` subagent in fresh context. `ship_it` requires zero
findings at any severity — any `[MUST]` / `[SHOULD]` / `[MAY]` forces
iterate; the orchestrator loops review → fix → review until clean.
- **Honest-agent marker policy**: `--no-verify` regex-blocked + the
obvious marker-write idioms denied at the permission layer (`Bash(touch
tmp/ci-passed:*)`, `Write`/`Edit` on the canonical paths); everything
else is a documented rule, not regex-enforced. Bash can write a file by
a dozen paths and regex enforcement is a porous game of whack-a-mole.
- **`docs/src/content/docs/claude-code.md`** — contributor-facing page
documenting the four-layer model (universal git hooks → agent gate →
ergonomic hooks → skills/agents/commands), quick setup, and discipline
rules.
- **CHANGELOG.md** — `[Unreleased]` entry covering all of the above.

## Out-of-tree GitHub changes that pair with the AI-rules cleanup

Done via `gh` CLI as part of the same audit:

- **Closed #46** (Graceful Shutdown) — verified shipped in
`cmd/wavehouse/main.go:378-393` (SIGINT/SIGTERM → bounded shutCtx →
ingestStream.Stop → srv.Shutdown → promSrv.Shutdown).
- **Scope notes added to #44, #50, #94** with current-status / boundary
info (ldflags shipped vs `/version` remaining; DLQ shipped vs retry
remaining + scope boundary with #91; per-component logger source field
as a #94 complement).
- **Opened 4 new issues from orphan TODOs**: #143 (pprof), #144 (K8s
`/healthz` + per-dep health), #145 (RequireRoles fail-closed), #146
(split `internal/api/` into focused subpackages).

## TODO.md audit (one-time, for record)

| Bucket | Count | Disposition |
|--------|-------|-------------|
| Already shipped per closed issues (#11, #14, #16, #28, #40, #41, #42,
#45) + current code | ~12 | Deleted from TODO |
| Tracked as open issues (#32, #33, #34, #37, #39, #44, #48, #49, #50,
#51, #94) | ~12 | Kept as issues, scope notes added where useful |
| In-flight via open PRs (#83, #92, #119, #122, #125, #136, #137) | 4 |
Untouched |
| #46 Graceful Shutdown | 1 | Verified shipped, closed with comment |
| Orphan items | 4 | Split into #143-146 |
| Aspirational ("more tests", "update README") | 2 | Deleted — covered
by AGENTS.md doc-sync rules |

Projects #7 board + triage automation is now the single canonical
backlog.

## Test plan

- [x] `make ci` passes locally for each push (gated by
`.githooks/pre-push`)
- [x] CI green on the latest HEAD (8fbd7db)
- [x] PR-title-lint accepts the title (`chore: claude code native
improvements`)
- [x] All `docs/src/content/docs/*.md` paths in AGENTS.md resolve to
real files
- [x] Labeler workflow auto-labels correctly per the updated paths
- [x] `pre-push-reviewer` subagent loop reached `VERDICT: ship_it` with
zero findings under the strict rubric before the final push (validated
end-to-end across five iterations on this branch — each surfacing a real
doc-sync / off-by-one / quote-strip issue and forcing a fix before the
marker auto-wrote)
- [x] `agent-bash-gate.sh` quote-strip generalization sanity-tested
live: `echo "git push to deploy"` passes through; `git push --no-verify`
and `git commit --no-verify` still block (`bash -n` clean, JSON wiring
valid)
- [ ] Human review

## Related issues

- Closed during this work: #46
- Scope notes added: #44, #50, #94
- New follow-up issues created: #143, #144, #145, #146

🤖 Generated with [Claude Code](https://claude.com/claude-code)


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Claude Code integration: local pre-push reviewer with strict
ship/iterate/block verdicts, push gating via CI/review markers,
automatic review-marker creation, and a coverage-reporting command.

* **Documentation**
* Comprehensive Claude Code & agent docs, new skill guides for PR
review/sync, updated README/CONTRIBUTING/CHANGELOG/styleguide, and site
sidebar/page additions.

* **Chores**
* Added git and agent hooks, CI marker creation, worktrunk config,
labeler tweaks, and simplified Copilot instructions.

<!-- review_stack_entry_start -->

[![Review Change
Stack](https://storage.googleapis.com/coderabbit_public_assets/review-stack-in-coderabbit-ui.svg)](https://app.coderabbit.ai/change-stack/Wave-RF/WaveHouse/pull/147?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack)

<!-- review_stack_entry_end -->
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/api HTTP handlers, routing, middleware area/pipes Named query pipes go Pull requests that update go code

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

KV Watcher Real Time Updates

2 participants