Skip to content

feat(pipes): resolve table dependencies for cache invalidation#343

Open
taitelee wants to merge 11 commits into
mainfrom
pipe-cache-invalidation
Open

feat(pipes): resolve table dependencies for cache invalidation#343
taitelee wants to merge 11 commits into
mainfrom
pipe-cache-invalidation

Conversation

@taitelee

Copy link
Copy Markdown
Contributor

Summary

Pipes were cached TTL-only because they couldn't report which tables they read, so an ingest never invalidated a stale pipe result (#178). This teaches a pipe to resolve the ingested base tables its SQL reads — through views and aliases — and version-invalidate on the same namespace-versioned path structured queries already use.

When a pipe is created or updated, PipesHandler.Put resolves its base tables by running EXPLAIN QUERY TREE over dummy-bound SQL (the table set depends only on the SQL, not on per-request parameter values) and keeping the names the schema
registry knows in the configured database; the result is stored on the pipe (ResolvedTables, server-owned — never trusted from client input). Execute folds those tables into the cache key as whole-table namespaces, encoded exactly as the ingest worker encodes them, so an ingest into any of them invalidates the cached result. Resolution is best-effort and bounded: with no registry/ClickHouse wired or on any failure, the pipe falls back to the previous TTL-only behavior,

Related Issues

Closes #178

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Pipes now automatically resolve and track the tables they depend on, enabling smarter cache invalidation.
  • Changed

    • Pipe query result caching now invalidates based on table changes; queries automatically fall back to time-based expiration if table resolution is unavailable.
  • Documentation

    • Updated CHANGELOG documenting improved cache invalidation strategy and pipe table dependency resolution.

Walkthrough

This PR refactors cache invalidation to use dependency-aware versioning for both structured queries and pipes. The cache API shifts from string-based key/namespace/scope parameters to SHA+namespace-dependencies, with a two-level versioning model that supports whole-table and scoped invalidation. Pipes now extract and cache their ingested table dependencies via EXPLAIN QUERY TREE, enabling writes to those tables to invalidate stale pipe results.

Changes

Cache Versioning Refactor and Local Cache Implementation

Layer / File(s) Summary
Cache interface and versioning contracts
internal/cache/cache.go, internal/cache/cache_test.go
Cache.Get/Set redesigned to accept (sha, deps []Namespace) instead of (key, namespace, scope); InvalidateCache replaced with Invalidate(namespaces); VersionManager introduces Namespace struct and shifts from flat version keys to two-level table/namespace versioning with QueryKey composition and explicit BumpTable/BumpNamespace methods.
Version manager versioning and key composition tests
internal/cache/version_manager_test.go
Tests validate NamespaceKey formatting with embedded table version, QueryKey composition with sorted dependency segments, and that BumpTable and BumpNamespace affect only their target scopes while leaving others unchanged.
LocalCache Get/Set/Invalidate implementation
internal/cache/local.go
LocalCache.Get accepts SHA+dependencies and computes keys via VersionManager.QueryKey; Set stores with computed keys; Invalidate bumps table or scoped namespace versions based on Namespace.Scope being empty.
LocalCache namespace-based API and invalidation tests
internal/cache/local_test.go
Tests updated to use []Namespace dependency descriptors; verify miss/hit/TTL behavior with new signature; test scoped vs whole-table invalidation semantics and cross-scope orphaning.

Pipe Dependency Resolution and Integration

Layer / File(s) Summary
Pipe table extraction from EXPLAIN QUERY TREE
internal/api/pipe_deps.go
New module with resolvePipeDeps and resolvePipeTables for ClickHouse-based table discovery; parseQueryTreeTables and helpers (identifierField, splitQualified, unquoteIdent) robustly extract and normalize database.table identifiers with backtick-quoting support; filterKnownTables deduplicates and filters against schema registry; pipeDeps converts tables to cache namespaces via NATS encoding.
Pipe table parsing and namespace mapping tests
internal/api/pipe_deps_test.go
Unit tests validate extraction from EXPLAIN output, qualified identifier parsing with backtick preservation, unquoting escaped backticks, schema registry filtering with sort/dedupe, and pipeDeps namespace conversion.
Pipe SQL and DummyBind for dependency resolution
internal/pipes/pipes.go
NamedQuery gains ResolvedTables []string field; DummyBind function creates runnable SQL by binding declared parameters with type-appropriate values and augmenting bare {{name}} placeholders (without inline defaults) to enable safe EXPLAIN analysis.
DummyBind parameter binding tests
internal/pipes/dummybind_test.go
Tests verify type-correct dummy literals for formal params, boolean→0 binding, augmentation of bare inline params, preservation of inline defaults, and no-op on parameter-free queries.
Pipes handler with Registry/Database fields and dependency-based caching
internal/api/pipes.go
PipesHandler extended with Registry and Database fields; Put resolves and stores q.ResolvedTables via resolvePipeDeps; Execute derives deps from ResolvedTables and passes to Cache.Get/Set for dependency-based invalidation; degrades to TTL-only on resolution failure.
Pipe handler execution and dependency-aware caching tests
internal/api/pipe_deps_test.go
fakeCache test double records dependency namespaces; tests verify deps passed to Get when ResolvedTables present (cache HIT) and nil deps when absent (TTL-only); confirm server ignores client-provided resolved_tables.

Query Handlers and Cache Invalidation Updates

Layer / File(s) Summary
Structured query handler cache dependencies
internal/api/structured_query.go
StructuredQueryHandler.Handle derives deps from safe-encoded table and NATS-encoded scope; passes deps to Cache.Get (HIT path) and Cache.Set (MISS path), replacing prior separate safeTableName/scope parameters.
Ingest worker namespace-based cache invalidation
internal/ingest/worker.go
IngestWorker.handleSuccess deduplicates scope values per table and constructs cache.Namespace entries; scopeless messages invalidate whole-table namespace; otherwise per-scope; calls w.cache.Invalidate instead of prior InvalidateCache with NATS-safe subjects.
Mock cache for namespace-based testing
internal/testutil/mocks.go
MockCache tracks invalidated cache.Namespace values (InvNamespaces) instead of version keys; Invalidate records namespaces; GetNamespaces accessor returns copy of recorded slice.
Ingest worker cache invalidation namespace tests
internal/ingest/worker_test.go
Tests refactored from version-key assertions to namespace assertions; validate scope deduplication, whole-table namespace for scopeless messages, namespace subsumption, and cache failure resilience.

System Wiring and Documentation

Layer / File(s) Summary
Main dependency injection for pipes handler
cmd/wavehouse/main.go
pipesHandler constructed separately and populated with Registry and Database fields using already-initialized schema registry and ClickHouse database name; Deps.Pipes set to configured instance.
Pipe dependency resolution end-to-end tests
tests/integration/pipe_deps_test.go
Integration tests with ClickHouse execution; verify resolution through views to underlying tables, cache HIT/MISS behavior, invalidation forcing MISS, and direct-table resolution.
Documentation of cache invalidation refactor and pipe resolution
CHANGELOG.md
Documents namespace-versioned read-cache invalidation with two-level table/scope hierarchy; documents pipes resolving ingested tables via EXPLAIN QUERY TREE with best-effort TTL-only fallback.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Wave-RF/WaveHouse#314: Main PR's dependency-based pipe/structured-query caching and invalidation is built directly on the same namespace-versioned cache refactor introduced in retrieved PR #314 (reworking internal/cache/cache.go, local.go, version_manager.go APIs and key composition).
  • Wave-RF/WaveHouse#177: Both PRs refactor the cache API and its wiring into PipesHandler/query caching to support table/scope-based invalidation (main PR further extends it with resolved pipe dependency namespaces via resolvePipeDeps and deps-driven Cache.Get/Set/Invalidate).
  • Wave-RF/WaveHouse#182: The main PR's ingest-worker cache invalidation changes (switching to namespace/dependency-based invalidation) are directly connected to the retrieved PR's ingest→ClickHouse→DLQ refactor that also rewired ingest worker cache invalidation logic in internal/ingest/worker.go.

Suggested labels

go, area/pipes, area/cache, area/api, area/ingest, area/docs, documentation, area/query

Suggested reviewers

  • EricAndrechek
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 48.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(pipes): resolve table dependencies for cache invalidation' directly and clearly summarizes the main change: teaching pipes to resolve and use table dependencies for cache invalidation.
Description check ✅ Passed The description comprehensively explains how pipes now resolve base tables via EXPLAIN QUERY TREE and integrate those tables into cache keys for ingest-driven invalidation, with fallback to TTL-only behavior.
Linked Issues check ✅ Passed The PR fully addresses issue #178 requirements: it resolves table dependencies from pipe SQL, maps them to cache namespaces for ingest invalidation, supports multi-table pipes, and maintains read-your-writes semantics with best-effort resolution.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing pipe table dependency resolution and cache invalidation: cache interface refactoring, version manager updates, pipe resolution logic, and integration tests align with the stated objectives.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch pipe-cache-invalidation
⚔️ Resolve merge conflicts
  • Resolve merge conflict in branch pipe-cache-invalidation
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch pipe-cache-invalidation

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added go Pull requests that update go code area/api HTTP handlers, routing, middleware area/ingest Ingest pipeline (Bento, batching, DLQ) area/cache Local / shared / tiered caching area/pipes Named query pipes area/docs Documentation, site/, README area/infra CI, build, deploy, Docker, release labels Jun 11, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: d7ff65f4-c449-4c3d-a7b3-4ef586eb7bbe

📥 Commits

Reviewing files that changed from the base of the PR and between 9661c82 and 27ae1f1.

📒 Files selected for processing (18)
  • CHANGELOG.md
  • cmd/wavehouse/main.go
  • internal/api/pipe_deps.go
  • internal/api/pipe_deps_test.go
  • internal/api/pipes.go
  • internal/api/structured_query.go
  • internal/cache/cache.go
  • internal/cache/cache_test.go
  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
  • internal/ingest/worker.go
  • internal/ingest/worker_test.go
  • internal/pipes/dummybind_test.go
  • internal/pipes/pipes.go
  • internal/testutil/mocks.go
  • tests/integration/pipe_deps_test.go
💤 Files with no reviewable changes (1)
  • internal/cache/cache_test.go
📜 Review details
🧰 Additional context used
📓 Path-based instructions (7)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Use Go 1.26 with strict formatting enforced by gofumpt
Use structured logging with log/slog (JSON handler)
Use Chi v5 for HTTP routing
Return errors, don't panic. Wrap with fmt.Errorf("context: %w", err)
Use package naming: lowercase, single word (or abbreviated). internal/ enforces module privacy
No global state: Dependencies are passed explicitly (constructor injection)
Comment the why, not the what. Add a comment only when the reason isn't obvious from the code; a line that matches the surrounding pattern needs none. Keep comments to 1–2 lines
DRY — one source of truth. Before adding logic, look for an existing helper, type, or constant to reuse; before duplicating a rule, factor it into one place every caller reads
Leave it neater than you found it — within reason. Fix small, safe things in passing: a stale comment, an obvious typo, a misnamed local, dead code on your path

Files:

  • internal/api/structured_query.go
  • internal/testutil/mocks.go
  • internal/ingest/worker.go
  • internal/pipes/pipes.go
  • internal/pipes/dummybind_test.go
  • cmd/wavehouse/main.go
  • tests/integration/pipe_deps_test.go
  • internal/ingest/worker_test.go
  • internal/api/pipe_deps.go
  • internal/cache/local.go
  • internal/api/pipe_deps_test.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/api/pipes.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
internal/api/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Chi HTTP router, JWT/JWKS middleware (from auth/), ingest/query/structured-query/SSE/schema/DLQ/policy/pipes handlers, Hub

Files:

  • internal/api/structured_query.go
  • internal/api/pipe_deps.go
  • internal/api/pipe_deps_test.go
  • internal/api/pipes.go
internal/ingest/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Ingest worker pipeline (worker.go): JetStream input → per-table batch INSERT with DLQ output. The pipeline is insert-only. Wire format EventMessage carries {table_name, scope, received_timestamp, data} and nothing else

Files:

  • internal/ingest/worker.go
  • internal/ingest/worker_test.go
internal/pipes/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Named query pipes: NamedQuery type + NATS KV store (WAVEHOUSE_PIPES) + .sql file bootstrap. Pre-defined SQL templates with param binding + caching; per-pipe allowed_roles is the only execute-path gate via policy.RoleAllowed

Files:

  • internal/pipes/pipes.go
  • internal/pipes/dummybind_test.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...)
Use shared mocks from internal/testutil/ (MockPublisher, MockCache, MockDeduplicator, MockSubscriber) instead of creating ad-hoc mocks
Use testutil.MakeJWT(t, claims) and testutil.MakeExpiredJWT(t, claims) for auth tests
Use testutil.NewTestSchemaRegistry(tables) or discovery.NewSchemaRegistryFromMap(tables) for schema-aware tests
Use policy.NewMemoryStore(p) for in-memory policy testing without NATS
Use pipes.NewMemoryStore(queries...) for in-memory pipes testing without NATS
Use testutil.AssertJSONResponse(t, rec, status, expected) and testutil.AssertJSONContains(t, rec, status, substring) for response assertions

Files:

  • internal/pipes/dummybind_test.go
  • tests/integration/pipe_deps_test.go
  • internal/ingest/worker_test.go
  • internal/api/pipe_deps_test.go
  • internal/cache/local_test.go
  • internal/cache/version_manager_test.go
tests/integration/**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

Go integration tests (//go:build integration; ClickHouse testcontainer); run via make test-integration

Files:

  • tests/integration/pipe_deps_test.go
internal/cache/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Implement Cache interface → LocalCache (Ristretto) + SharedCache (TBD) + TieredCache (singleflight)

Files:

  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
🧠 Learnings (5)
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.

Applied to files:

  • CHANGELOG.md
📚 Learning: 2026-05-25T11:24:21.130Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 180
File: internal/cache/local.go:0-0
Timestamp: 2026-05-25T11:24:21.130Z
Learning: In WaveHouse’s cache packages (e.g., internal/cache/local.go), it’s acceptable to define package-level `var` constants that hold immutable OpenTelemetry metric attribute sets / `metric.MeasurementOption` values (for example: `cacheL1Attrs = metric.WithAttributes(attribute.String("tier","L1"))`). Treat these as stateless, pre-allocated option values (analogous to `regexp.MustCompile(...)`), not mutable global state. When applying the AGENTS.md “no global state / constructor injection” guideline, apply it to application dependencies (e.g., Cache, Publisher, Deduplicator) rather than to these immutable OTel attribute/measurement option variables—do not flag them as constructor-injection violations.

Applied to files:

  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
📚 Learning: 2026-05-20T01:02:00.784Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 164
File: internal/api/router_test.go:289-350
Timestamp: 2026-05-20T01:02:00.784Z
Learning: In WaveHouse’s internal API tests (files matching internal/api/**/*_test.go), follow the existing separation-of-concerns convention for testing the RequireRole middleware: inject `ContextKeyRole` directly into the request `context.Context` instead of using `testutil.MakeJWT`/JWT-driven flows. Do not refactor role-gate tests to use JWT tokens—JWT parsing and token handling are covered separately in `middleware_test.go` (the dedicated JWT parsing tests), and mixing those concerns would expand the failure surface and reduce isolation.

Applied to files:

  • internal/api/pipe_deps_test.go
📚 Learning: 2026-05-23T01:23:59.268Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 174
File: internal/api/ingest_test.go:111-111
Timestamp: 2026-05-23T01:23:59.268Z
Learning: In WaveHouse Go tests in internal/api/**/*_test.go, use internal/testutil.AssertJSONErrorResponse(t, w) for HTTP error-path JSON assertions. Do not use (or reintroduce) package-local assertJSONErrorResponse helpers. AssertJSONErrorResponse verifies the response Content-Type is application/json, includes the X-Content-Type-Options: nosniff header, and that the JSON body contains an "error" field.

Applied to files:

  • internal/api/pipe_deps_test.go
📚 Learning: 2026-05-20T20:30:15.808Z
Learnt from: taitelee
Repo: Wave-RF/WaveHouse PR: 172
File: internal/api/pipes_test.go:106-118
Timestamp: 2026-05-20T20:30:15.808Z
Learning: For WaveHouse pipes authorization allowlist checks, fix the empty-role fail-open behavior by (1) removing any outer guard that prevents allowlist evaluation when the incoming `role` is `""` (e.g., don’t short-circuit with `if role != "" { ... }`), and (2) during allowlist scanning, ensure only non-empty allowlist entries can match—e.g., require `ar != "" && ar == role` (so a malformed allowlist like `["" ]` cannot grant access to an empty incoming role via `"" == ""`).

Applied to files:

  • internal/api/pipes.go
🔇 Additional comments (30)
internal/cache/cache.go (1)

8-32: LGTM!

internal/cache/version_manager.go (3)

16-32: LGTM!


34-51: LGTM!


76-94: LGTM!

internal/cache/version_manager_test.go (1)

9-72: LGTM!

internal/cache/local.go (1)

31-75: LGTM!

internal/cache/local_test.go (1)

12-161: LGTM!

internal/api/structured_query.go (2)

129-143: LGTM!


172-174: LGTM!

internal/testutil/mocks.go (1)

117-136: LGTM!

internal/ingest/worker_test.go (4)

224-232: LGTM!


449-543: LGTM!


704-731: LGTM!


733-764: LGTM!

internal/ingest/worker.go (1)

461-509: LGTM!

internal/pipes/pipes.go (2)

29-37: LGTM!


204-243: LGTM!

internal/api/pipe_deps.go (7)

28-35: LGTM!


43-56: LGTM!


64-83: LGTM!


90-103: LGTM!


134-153: LGTM!


196-205: LGTM!


179-187: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Incorrect escape sequence handling for identifiers with both backslashes and backticks.

Lines 183-184 unescape ClickHouse backtick-quoted identifiers in the wrong order, causing incorrect results when an identifier contains both a literal backslash and a literal backtick. ClickHouse uses \\ for a backslash and ``` for a backtick inside backtick-quoted identifiers.

Example: The ClickHouse identifier a\b(a, backslash, backtick, b) is written in SQL as ``a\`b`` (where\` and \`` → `` ``). After stripping the outer backticks, the string is a\\\b`. The current code:

  1. Line 183 replaces \`` with `` ``: a\\\b` → `a\b`
  2. Line 184 tries to replace \\\\ with \\: no match (only two backslashes remain)
  3. Result: a\\b (wrong; expected a\b`)

Sequential ReplaceAll is unsafe here because the replacements can interact. The correct approach is to process escape sequences in a single pass:

  • Iterate byte-by-byte; when \ is seen, check the next byte:
    • If \, output one \ and advance two bytes
    • If `, output one ` and advance two bytes
    • Otherwise, output \ (or handle as an error)
🔧 Proposed fix for correct escape handling
 func unquoteIdent(s string) string {
 	s = strings.TrimSpace(s)
 	if len(s) >= 2 && s[0] == '`' && s[len(s)-1] == '`' {
-		s = s[1 : len(s)-1]
-		s = strings.ReplaceAll(s, "\\`", "`")
-		s = strings.ReplaceAll(s, "\\\\", "\\")
+		s = s[1 : len(s)-1] // strip outer backticks
+		// Unescape in a single pass to avoid interaction between \\ and \`
+		var out strings.Builder
+		for i := 0; i < len(s); i++ {
+			if s[i] == '\\' && i+1 < len(s) {
+				next := s[i+1]
+				if next == '\\' || next == '`' {
+					out.WriteByte(next)
+					i++ // skip the next byte (already consumed)
+					continue
+				}
+			}
+			out.WriteByte(s[i])
+		}
+		s = out.String()
 	}
 	return s
 }
			> Likely an incorrect or invalid review comment.
internal/api/pipes.go (3)

27-33: LGTM!


76-80: LGTM!


155-193: LGTM!

cmd/wavehouse/main.go (1)

367-372: LGTM!

internal/pipes/dummybind_test.go (1)

1-63: LGTM!

internal/api/pipe_deps_test.go (1)

1-192: LGTM!

Comment thread CHANGELOG.md

### Changed

- **Read-cache invalidation cut over to namespace-versioned keys, replacing the flat per-namespace version map** (`internal/cache/cache.go`, `internal/cache/local.go`, `internal/cache/version_manager.go`, `internal/api/structured_query.go`, `internal/api/pipes.go`, `internal/ingest/worker.go`, `internal/testutil/mocks.go`, plus tests in `internal/cache/{local,version_manager}_test.go` and `internal/ingest/worker_test.go`): the `Cache` interface is now keyed by what a result *depends on* rather than one opaque string. `Get`/`Set` take a `sha` (hash of SQL+params) plus the `[]Namespace` (`{table, scope}` pairs) the result reads — one for a structured query, several for a (future) pipe — and `QueryKey` folds the table version and per-namespace version of every dependency into the key, so a write to any dependency misses it. Invalidation is two-level: `BumpTable(table)` advances a table version that is embedded in every namespace key, invalidating all scopes of that table in O(1); `BumpNamespace(table, scope)` advances one scope plus the whole-table view, leaving other scopes' cached entries valid. The old `GetCacheKey`/`GetVersion`/`IncrementVersion`/`InvalidateCache` flat-`versions` path is removed, and the structured-query handler and ingest worker are wired onto the new path via `Cache.Invalidate([]Namespace)` (empty scope → `BumpTable`, scoped → `BumpNamespace`); the ingest worker collapses a single-table batch to one whole-table bump before calling `Invalidate` (a whole-table bump subsumes every per-scope bump for that table), keeping `Invalidate` a simple one-pass bump. Pipes now report the tables they read and are version-invalidated on this same path (see the pipe-dependency entry below; [#178](https://github.com/Wave-RF/WaveHouse/issues/178)).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove “(future) pipe” wording to avoid contradicting the next entry.

This bullet says pipe deps are “future,” while the next bullet documents the feature as delivered now. Updating that phrase keeps the release notes internally consistent.

Comment thread internal/api/pipe_deps.go
Comment on lines +110 to +124
func identifierField(s string) string {
s = strings.TrimSpace(s)
inTick := false
for i := 0; i < len(s); i++ {
switch s[i] {
case '`':
inTick = !inTick
case ',':
if !inTick {
return strings.TrimSpace(s[:i])
}
}
}
return strings.TrimSpace(s)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Escaped backtick handling edge case in rare table names.

The function toggles inTick on every backtick character without recognizing ClickHouse's escape sequences (\`` for a literal backtick inside a quoted identifier). If a table name contains a literal backtick (e.g., `` a`b`` in ClickHouse syntax, representing the identifierab), the toggle logic may misidentify the field boundary and include trailing fields in the extracted identifier. The downstream filterKnownTables will not match such malformed identifiers against the registry and will drop them, causing the pipe to fall back to TTL-only caching for that table (a false negative, not a crash or security issue).

This is a low-probability edge case—table names with literal backticks are extremely rare—and the best-effort design already tolerates partial resolution failures. If full correctness is desired, consider tracking escape sequences (\`` and \) in the character loop to avoid toggling inTick` on escaped backticks.

Comment on lines +58 to 74
func (vm *VersionManager) QueryKey(sha string, deps []Namespace) string {
segs := make([]string, len(deps))
// Lock per dependency rather than across the whole loop: each dep's table +
// namespace versions are read together (consistent for that dep), but we don't
// hold the lock across all deps. A concurrent bump can land between deps, but the
// key is already a racy snapshot (versions can move between building it and using
// it), so cross-dep consistency buys nothing. Crucially, the sort/join run with
// no lock held.
for i, d := range deps {
vm.mu.RLock()
nsKey := vm.namespaceKeyLocked(d.Table, d.Scope)
segs[i] = fmt.Sprintf("%s.%d", nsKey, vm.namespaceVersions[nsKey])
vm.mu.RUnlock()
}
sort.Strings(segs)
return sha + "|" + strings.Join(segs, "|")
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Per-dependency locking is correct, but the comment could be clearer.

The locking granularity (lock/unlock per dependency rather than holding across all deps) is functionally correct. However, the comment's rationale "cross-dep consistency buys nothing" is somewhat terse. Consider expanding slightly:

"Consistency across dependencies doesn't affect correctness: if a version changes between key construction and actual use (Get/Set), we simply miss the cache—correct behavior. Reading deps at slightly different times has the same effect."

📝 Optional comment clarification
-	// Lock per dependency rather than across the whole loop: each dep's table +
-	// namespace versions are read together (consistent for that dep), but we don't
-	// hold the lock across all deps. A concurrent bump can land between deps, but the
-	// key is already a racy snapshot (versions can move between building it and using
-	// it), so cross-dep consistency buys nothing. Crucially, the sort/join run with
-	// no lock held.
+	// Lock per dependency rather than across the whole loop: each dep's table +
+	// namespace versions are read atomically (consistent for that dep), but we don't
+	// hold the lock across all deps. A concurrent bump landing between deps has the
+	// same effect as a bump landing between key construction and use—both cause a
+	// cache miss, which is correct. Cross-dep consistency doesn't affect correctness,
+	// so we release the lock between deps. The sort/join run with no lock held.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (vm *VersionManager) QueryKey(sha string, deps []Namespace) string {
segs := make([]string, len(deps))
// Lock per dependency rather than across the whole loop: each dep's table +
// namespace versions are read together (consistent for that dep), but we don't
// hold the lock across all deps. A concurrent bump can land between deps, but the
// key is already a racy snapshot (versions can move between building it and using
// it), so cross-dep consistency buys nothing. Crucially, the sort/join run with
// no lock held.
for i, d := range deps {
vm.mu.RLock()
nsKey := vm.namespaceKeyLocked(d.Table, d.Scope)
segs[i] = fmt.Sprintf("%s.%d", nsKey, vm.namespaceVersions[nsKey])
vm.mu.RUnlock()
}
sort.Strings(segs)
return sha + "|" + strings.Join(segs, "|")
}
func (vm *VersionManager) QueryKey(sha string, deps []Namespace) string {
segs := make([]string, len(deps))
// Lock per dependency rather than across the whole loop: each dep's table +
// namespace versions are read atomically (consistent for that dep), but we don't
// hold the lock across all deps. A concurrent bump landing between deps has the
// same effect as a bump landing between key construction and use—both cause a
// cache miss, which is correct. Cross-dep consistency doesn't affect correctness,
// so we release the lock between deps. The sort/join run with no lock held.
for i, d := range deps {
vm.mu.RLock()
nsKey := vm.namespaceKeyLocked(d.Table, d.Scope)
segs[i] = fmt.Sprintf("%s.%d", nsKey, vm.namespaceVersions[nsKey])
vm.mu.RUnlock()
}
sort.Strings(segs)
return sha + "|" + strings.Join(segs, "|")
}

Comment on lines +60 to +144
func TestPipeDeps_ResolveThroughViewAndInvalidate(t *testing.T) {
e := env(t)
ctx := context.Background()

base := createTable(t, "id String, val Float64", "ORDER BY id")
require.NoError(t, e.chConn.Exec(ctx,
fmt.Sprintf("INSERT INTO %s (id, val) VALUES ('a', 1.5)", base)))

// The pipe reads this view, never the base table directly — so only view→base
// resolution can recover the real dependency. Deliberately NOT refreshing the
// registry after creating it, so the view itself stays unknown to the registry
// and is filtered out, leaving only the base table.
view := base + "_v"
require.NoError(t, e.chConn.Exec(ctx,
fmt.Sprintf("CREATE VIEW %s AS SELECT id, val FROM %s", view, base)))
t.Cleanup(func() {
dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = e.chConn.Exec(dctx, "DROP VIEW IF EXISTS "+view)
})

// A cache we own, so we can both observe hits and simulate the ingest worker's
// invalidation bump deterministically.
localCache, err := cache.NewLocal(1 << 20)
require.NoError(t, err)
t.Cleanup(func() { _ = localCache.Close() })

h := api.NewPipesHandler(
pipes.NewMemoryStore(),
policy.NewMemoryStore(&policy.Policy{AdminRole: "admin"}),
e.chConn,
localCache,
30*time.Second,
testutil.NopLogger(),
)
h.Registry = e.registry
h.Database = testCHDatabase

// PUT runs resolution against real ClickHouse.
putPipe(t, h, "via_view", fmt.Sprintf("SELECT id, val FROM %s", view))

saved := h.Store.Get("via_view")
require.NotNil(t, saved)
assert.Contains(t, saved.ResolvedTables, base,
"EXPLAIN QUERY TREE should resolve view %q to its base table %q; got %v", view, base, saved.ResolvedTables)

// MISS, then HIT once cached.
assert.Equal(t, "MISS", executePipe(t, h, "via_view"))
localCache.Wait()
assert.Equal(t, "HIT", executePipe(t, h, "via_view"))

// Simulate ingest bumping the base table's version. Because the pipe was keyed
// on the base table (resolved through the view), this must orphan the entry.
_, err = localCache.Invalidate(ctx, []cache.Namespace{{Table: query.SafeEncodeNATS(base)}})
require.NoError(t, err)
localCache.Wait()

assert.Equal(t, "MISS", executePipe(t, h, "via_view"),
"a write to the resolved base table should have invalidated the cached pipe result")
}

// TestPipeDeps_DirectTableResolves covers the plain case: a pipe reading a base
// table directly resolves to exactly that table.
func TestPipeDeps_DirectTableResolves(t *testing.T) {
e := env(t)

base := createTable(t, "id String, n UInt64", "ORDER BY id")

h := api.NewPipesHandler(
pipes.NewMemoryStore(),
policy.NewMemoryStore(&policy.Policy{AdminRole: "admin"}),
e.chConn,
nil, // no cache needed; this test only checks resolution at Put()
30*time.Second,
testutil.NopLogger(),
)
h.Registry = e.registry
h.Database = testCHDatabase

putPipe(t, h, "direct", fmt.Sprintf("SELECT id, n FROM %s", base))

saved := h.Store.Get("direct")
require.NotNil(t, saved)
assert.Equal(t, []string{base}, saved.ResolvedTables)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Convert these integration scenarios to table-driven subtests.

These tests are currently standalone and duplicate substantial setup/handler wiring. Please reshape into tests := []struct{...} + t.Run(tt.name, ...) to match the repo’s test convention and keep future dependency-resolution cases cheaper to add.

As per coding guidelines, **/*_test.go should use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...).

Source: Coding guidelines

@github-project-automation github-project-automation Bot moved this from Backlog to In review in WaveHouse Task Board Jun 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/api HTTP handlers, routing, middleware area/cache Local / shared / tiered caching area/docs Documentation, site/, README area/infra CI, build, deploy, Docker, release area/ingest Ingest pipeline (Bento, batching, DLQ) area/pipes Named query pipes go Pull requests that update go code

Projects

Status: In review

Development

Successfully merging this pull request may close these issues.

feat(cache): pipe cache invalidation via table/scope extraction from named queries

1 participant