diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 78238e78b..b4efa7072 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -31,6 +31,18 @@ "ban_timeout": 300000, "broadcast_address": null, "broadcast_port": 6433, + "cache": { + "backend": "redis", + "enabled": false, + "max_result_size": 0, + "policy": "no_cache", + "redis": { + "cache_key_prefix": "pgdog:", + "operation_timeout": 2000, + "url": "redis://localhost:6379" + }, + "ttl": 300 + }, "checkout_timeout": 5000, "client_connection_recovery": "drop", "client_idle_in_transaction_timeout": 9223372036854775807, @@ -275,6 +287,76 @@ } ] }, + "Cache": { + "description": "Cache configuration.", + "type": "object", + "properties": { + "backend": { + "description": "Which storage backend to use.\n\n_Default:_ `redis`", + "$ref": "#/$defs/CacheBackend", + "default": "redis" + }, + "enabled": { + "description": "Whether to enable caching.\n\n_Default:_ `false`", + "type": "boolean", + "default": false + }, + "max_result_size": { + "description": "Maximum result size in bytes to cache (0 = unlimited).\n\n_Default:_ `0`", + "type": "integer", + "format": "uint", + "default": 0, + "minimum": 0 + }, + "policy": { + "description": "Cache policy: `no_cache` or `cache`.\n\n_Default:_ `no_cache`", + "$ref": "#/$defs/CachePolicy", + "default": "no_cache" + }, + "redis": { + "description": "Redis backend configuration.\n\nOnly read when `backend = \"redis\"`.", + "$ref": "#/$defs/RedisConfig", + "default": { + "cache_key_prefix": "pgdog:", + "operation_timeout": 2000, + "url": "redis://localhost:6379" + } + }, + "ttl": { + "description": "Default TTL in seconds for cached queries.\n\n_Default:_ `300`", + "type": "integer", + "format": "uint64", + "default": 300, + "minimum": 0 + } + }, + "additionalProperties": false + }, + "CacheBackend": { + "description": "Cache storage backend discriminator.", + "oneOf": [ + { + "description": "Redis backend (default).", + "type": "string", + "const": "redis" + } + ] + }, + "CachePolicy": { + "description": "Cache policy.", + "oneOf": [ + { + "description": "Never cache queries for this database.", + "type": "string", + "const": "no_cache" + }, + { + "description": "Always cache read queries.", + "type": "string", + "const": "cache" + } + ] + }, "ConnectionRecovery": { "description": "controls if server connections are recovered or dropped if a client abruptly disconnects.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#connection_recovery", "oneOf": [ @@ -574,6 +656,22 @@ "maximum": 65535, "minimum": 0 }, + "cache": { + "description": "Redis cache configuration for this database.", + "$ref": "#/$defs/Cache", + "default": { + "backend": "redis", + "enabled": false, + "max_result_size": 0, + "policy": "no_cache", + "redis": { + "cache_key_prefix": "pgdog:", + "operation_timeout": 2000, + "url": "redis://localhost:6379" + }, + "ttl": 300 + } + }, "checkout_timeout": { "description": "Maximum amount of time a client is allowed to wait for a connection from the pool.\n\n_Default:_ `5000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#checkout_timeout", "type": "integer", @@ -1441,6 +1539,30 @@ } ] }, + "RedisConfig": { + "description": "Redis-specific cache backend configuration.\n\nCorresponds to the `[general.cache.redis]` TOML section.", + "type": "object", + "properties": { + "cache_key_prefix": { + "description": "Key prefix prepended to every cache key stored in Redis.\n\n_Default:_ `pgdog:`", + "type": "string", + "default": "pgdog:" + }, + "operation_timeout": { + "description": "Timeout in milliseconds for individual Redis operations (GET/SET/ping).\n\n_Default:_ `2000`", + "type": "integer", + "format": "uint64", + "default": 2000, + "minimum": 1 + }, + "url": { + "description": "Redis connection URL.\n\n_Default:_ `redis://localhost:6379`", + "type": "string", + "default": "redis://localhost:6379" + } + }, + "additionalProperties": false + }, "ReplicaLag": { "description": "Replica lag banning configuration. When a replica's replication lag exceeds the threshold, it is banned from serving read queries.", "type": "object", diff --git a/Cargo.lock b/Cargo.lock index 98f6c842c..d2a7fa197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -663,7 +663,7 @@ dependencies = [ "bitflags 2.9.1", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -973,6 +973,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.4" @@ -1023,6 +1029,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "critical-section" version = "1.2.0" @@ -1450,6 +1462,15 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "flume" version = "0.11.1" @@ -1497,6 +1518,47 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fred" +version = "9.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cdd5378252ea124b712e0ac55147d26ae3af575883b34b8423091a4c719606b" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "crossbeam-queue", + "float-cmp", + "fred-macros", + "futures", + "log", + "parking_lot", + "rand 0.8.5", + "redis-protocol", + "rustls 0.23.27", + "rustls-native-certs 0.7.3", + "semver", + "socket2", + "tokio", + "tokio-rustls 0.26.2", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + +[[package]] +name = "fred-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1458c6e22d36d61507034d5afecc64f105c1d39712b7ac6ec3b352c423f715cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -2871,6 +2933,7 @@ dependencies = [ "dashmap", "derive_builder", "fnv", + "fred", "futures", "hickory-resolver", "http-body-util", @@ -2914,6 +2977,7 @@ dependencies = [ "tracing-subscriber", "url", "uuid", + "xxhash-rust", ] [[package]] @@ -3458,6 +3522,20 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "redis-protocol" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65deb7c9501fbb2b6f812a30d59c0253779480853545153a51d8e9e444ddc99f" +dependencies = [ + "bytes", + "bytes-utils", + "cookie-factory", + "crc16", + "log", + "nom", +] + [[package]] name = "redox_syscall" version = "0.5.12" @@ -3826,6 +3904,19 @@ dependencies = [ "security-framework 2.11.1", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.1" @@ -5859,6 +5950,12 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yoke" version = "0.8.0" diff --git a/docs/CACHE.md b/docs/CACHE.md new file mode 100644 index 000000000..b9eedfcb1 --- /dev/null +++ b/docs/CACHE.md @@ -0,0 +1,414 @@ +# Cache for pgdog — State of Implementation + +## Architecture + +Cache SELECT queries in Redis, bypass PostgreSQL on cache hit, populate cache on cache miss. Two-tier policy resolution: SQL comment/connection parameter → pgdog's config. + +--- + +## Implementation + +### Configuration (`pgdog-config`) + +**`cache.rs`** — Cache configuration types: + +**CachePolicy enum:** `NoCache` (default), `Cache`. Implements `FromStr`, `Display`, `Serialize`, `Deserialize`, `Copy`, `JsonSchema`. + +**CacheBackend enum:** `Redis` (default). Discriminator for selecting the storage backend and for hotswap detection when the backend type changes in config. + +**RedisConfig struct** (`[general.cache.redis]`): +- `url: String` — Redis connection URL (default `redis://localhost:6379`) +- `cache_key_prefix: String` — prefix prepended to every Redis key (default `pgdog:`) +- `operation_timeout: NonZeroU64` — timeout in seconds for individual Redis operations (GET/SET/ping) (default `2`) + +**Cache struct** (`[general.cache]`): +- `enabled: bool` — is caching on? (default `false`) +- `policy: CachePolicy` — which policy? (default `no_cache`) +- `ttl: u64` — default TTL seconds (default `300`) +- `backend: CacheBackend` — which storage backend (default `redis`) +- `redis: RedisConfig` — Redis-specific settings +- `max_result_size: usize` — max cached result bytes (default `0` = unlimited) + +Example TOML: +```toml +[general.cache] +enabled = true +policy = "cache" +ttl = 300 + +[general.cache.redis] +url = "redis://localhost:6379" +cache_key_prefix = "pgdog:" +operation_timeout = 2 +``` + +**`general.rs`** — `General` struct holds `cache: Cache` field. **Cache config is global.** + +**`lib.rs`** — Exports `pub use cache::{CacheBackend, CachePolicy, Cache, RedisConfig as CacheRedisConfig};`. + +### Cache Module (`pgdog/src/frontend/cache/`) + +**`mod.rs`** — Module exports, global singleton, and main `Cache` struct: +```rust +pub mod context; +pub mod directive; +pub mod hashing; +pub mod integration; +pub mod storage; +pub mod wire; + +pub use context::CacheContext; +pub use directive::CacheDirective; +pub use integration::CacheCheckResult; +pub use storage::{CacheStorage, RedisCacheStorage}; +``` + +`Cache` struct wraps `RwLock>>` (tokio `RwLock`). + +**Global singleton:** Cache is global-scoped, not connection-scoped. Accessed via async `cache()` function which returns `Arc` from a `tokio::sync::OnceCell` static. `Cache::new()` is async and reads config internally — no parameters needed. + +**Config hotswap:** `hotswap_if_needed()` is called at the top of `try_read_cache` and `save_response_in_cache`. It fast-paths with a read-lock; acquires write-lock only if `is_actual()` returns true, then rebuilds the storage. The write-lock path re-checks to guard against concurrent swaps. `is_actual()` is a no-argument method on `CacheStorage` that reads current config internally — callers do not pass a config snapshot. + +Key methods: +- `new()` — async; creates storage from current config (or `None` if disabled); waits up to `operation_timeout` for initial Redis connection +- `hotswap_if_needed()` — compares live config against the active storage via `is_actual()`; swaps if `true` +- `try_read_cache(cache_context, in_transaction, client_request, params)` — hotswaps, calls `cache_check()`, returns `Ok(Some(Vec))` on HIT (caller replays through pipeline), `Ok(None)` on MISS/PASSTHROUGH +- `save_response_in_cache(cache_context)` — hotswaps, finalizes by storing the captured response + +**`storage/mod.rs`** — Abstract storage trait and error type: +- `CacheStorage` trait: `get`, `set`, `is_enabled`, `is_actual` — implemented by all cache backends +- `is_actual(&self) -> bool` — takes no arguments; reads live config internally; should only check parameters that require a storage rebuild (e.g. `backend` type and storage-specific settings like `redis.url`); TTL and other runtime settings do not require a rebuild and are read from live config on every call +- `Error` enum shared across all backends: `RedisError`, `ConnectionFailed`, `CacheMiss` + +**`storage/redis.rs`** — Redis storage backend (`RedisCacheStorage`) implementing `CacheStorage`: +- `RedisCacheStorage::new(config)` — async; builds client from given URL; spawns background connection task and waits up to `operation_timeout` ms for it to complete; if timeout expires, task continues in background; returns `None` if URL is invalid +- Background connect task: retries `init()` in a loop (5ms to 5s exponential backoff); sets `reconnecting = false` on success; CAS-guarded so only one task runs at a time; timeout for `init()` read from live config (`config.redis.operation_timeout`) +- `get(&self, key)` — returns `Result, Error>`; returns `Err(Error::ConnectionFailed)` immediately (triggering cache miss) if not yet connected; marks `reconnecting` and spawns reconnect on Redis errors; operation timeout read from live config +- `set(&self, key, value, ttl)` — stores bytes with EX expiration; returns immediately on disconnect; respects `max_result_size` from live config; operation timeout read from live config +- `reconnect()` — spawns reconnect task fire-and-forget (no waiting) if not already running (CAS-guarded) +- `is_actual()` — returns `true` if `backend != Redis` or `self.url != live config url`; only URL triggers a rebuild (all other redis settings including `cache_key_prefix`, `operation_timeout` are read from live config on every call) +- `is_enabled()` — reads live `config().config.general.cache.enabled` +- Key prefix comes from `config().config.general.cache.redis.cache_key_prefix` +- `reconnecting: Arc` — prevents multiple concurrent reconnect tasks +- All Redis operations wrapped in `tokio::time::timeout(Duration::from_millis(operation_timeout))` where `operation_timeout` is read from live config on every call (no compile-time constant) +- `RedisCacheStorage` stores only `url: String` (not the full `CacheConfig`) — all other settings are read from live config; this means `cache_key_prefix` and `operation_timeout` changes take effect immediately without a storage rebuild + +**`directive.rs`** — Position-agnostic cache directive parsing and resolution: +- `CacheMode` enum: `Cache`, `ForceCache`, `NoCache` (default) +- `CacheDirective` struct: `{ mode: Option, ttl_seconds: Option, key: Option }` — flat structure where each field is independently optional +- `CacheDirective::parse(s: &str)` — parses space-separated tokens in any order: `cache`, `force_cache`, `no_cache`, `ttl=N`, `key=NAME`, and ignores unknown tokens +- `CacheDirective::or(fallback)` — merges two directives field-by-field: if a field is `Some`, it wins; otherwise fall back to the other directive's value +- `resolve(client_request, params)` — extracts directive from SQL comment (via AST `comment_cache` field) and connection parameter (`pgdog.cache`), merges them (comment wins per-field), returns the merged `CacheDirective` +- Arguments can appear in any order: `ttl=17 force_cache` and `force_cache ttl=17` are equivalent +- Missing fields cascade through resolution tiers: comment → parameter → global config +- `key=NAME` — when present, overrides the automatic cache key: XXH3 of just the literal name string is used instead of `database + query + bind params`; queries with different SQL but the same `key=` share a cache slot + +**`context.rs`** — Cache context held in `QueryEngineContext`: +- `CacheContext` with `cache_miss: Option`, `response_buffer: Vec`, and `had_error: bool` +- `capture_response(message)` — stores message in buffer when cache miss is tracked; sets `had_error = true` on `E` messages +- `reset()` — clears all state for per-query isolation + +**`integration.rs`** — Public types and integration methods on `impl Cache`: +- `CacheMiss` struct: `{ key: u64, ttl: u64 }` — carries the resolved cache key and TTL on a cache miss +- `CacheCheckResult` enum: `Hit { cached: Vec }`, `Miss(CacheMiss)`, `Passthrough` +- `cache_check()` — main entry point: checks route, calls `directive::resolve()`, resolves final mode and TTL: + - Mode resolution: directive mode → global `CachePolicy` (converted to `CacheMode`) + - TTL resolution: directive TTL → global `cache.ttl` + - Key resolution: if `directive.key` is `Some(name)`, the cache key is XXH3 of that name alone; otherwise `compute_cache_key_hash(database, query, bind)` from `hashing.rs` is used + - `NoCache` → `Passthrough` + - `ForceCache` → returns `Miss` immediately (bypasses Redis lookup, always repopulates) + - `Cache` → computes key hash, acquires read-lock on storage, calls `storage.get()`; `CacheMiss` → `Miss`; other errors → `Passthrough` +- `cache_response()` — serializes `Vec` into wire bytes and stores in Redis + +**`hashing.rs`** — Cache key computation: +- `compute_cache_key_hash(database, query, bind) -> u64` — public entry point; feeds `database`, the normalized query, and any bind parameters into an XXH3 hasher +- `hash_query_without_comments(query, hasher)` — private; feeds the query directly into the hasher without allocating a `String`. Implements a state machine over the character stream: + - **Block comments** (`/* … */`, including PostgreSQL nested variants) — skipped entirely; treated as a token separator (sets `pending_space`) + - **Line comments** (`-- … \n`) — skipped entirely; treated as a token separator + - **Whitespace** outside string literals — collapsed: any run of whitespace sets `pending_space = true` but is not hashed directly; leading and trailing whitespace is suppressed naturally + - **String literals** (`'…'` with `''` escapes) — passed through verbatim; spaces inside strings are never collapsed or removed + - **Regular characters** — if `pending_space` is set and at least one character has already been emitted, a single space is hashed first, then the character; this ensures `SELECT/*c*/1` and `SELECT 1` produce the same hash without merging tokens into `SELECT1` + +This means `"/* pgdog_cache: cache */ SELECT 1"` and `"SELECT 1"` hash identically: the comment is dropped and the leading whitespace it would have left is suppressed because `emitted = false` at that point. + +**`wire.rs`** — PostgreSQL wire message deserialization: +- `Cache::deserialize_cached(Vec) -> Vec` — parses a flat blob of concatenated PostgreSQL wire messages into individual `Message` values. Wire format: `[1B code][4B length (incl. itself)][payload]`. Named constants `HEADER_CODE_LEN`, `HEADER_LEN_SIZE`, `HEADER_TOTAL` replace magic numbers. Not Redis-specific — usable with any cache backend that stores raw bytes. + +### Query Engine Integration + +**`pgdog/src/frontend/client/query_engine/mod.rs`** +- Imports global async `cache()` from `frontend::cache` +- `handle()` flow: after `route_query()` and before `before_execution()`, calls `cache().await.try_read_cache(context)`. If HIT: replays each cached `Message` through `process_server_message()` (same pipeline as live backend responses — stats, transaction state, hooks all fire correctly), then returns. On MISS: stores state in `context.cache_context`. +- After `match command`, calls `cache().await.save_response_in_cache(context)` to finalize caching. + +**`pgdog/src/frontend/client/query_engine/query.rs`** +- `process_server_message()` calls `context.cache_context.capture_response(message.clone())`. + +**`pgdog/src/frontend/client/query_engine/context.rs`** +- `QueryEngineContext` holds `cache_context: CacheContext` field. + +### Backend and Config Integration + +**`pgdog/src/backend/pool/cluster.rs`** +- `ClusterConfig` and `Cluster` hold `cache_enabled: bool` field +- Query parser requirement check includes `|| self.cache_enabled()` — when caching is on, the query parser is forced on. + +**`pgdog-config/src/core.rs`** +- Startup warning emitted when `cache.is_enabled()` and parser is `Off` or `SessionControl`. + +### Dependencies + +**`pgdog/Cargo.toml`** +fred = { version = "9", features = ["enable-rustls"] } +xxhash-rust = { version = "0.8", features = ["xxh3"]} + +--- + +## Key Design Decisions + +| Decision | Choice | +|----------|--------| +| Interception point | Between `route_query()` and `before_execution()` in `handle()` | +| Cache config scope | **Global** (`config.general.cache`) | +| Redis client | `fred` crate v9 (async-native, tokio integration) | +| Cacheable queries | Only reads (`route.is_read()`) | +| Cache policy resolution | Field-by-field merge: SQL comment → connection param → global config | +| Cache HIT flow | Deserialize wire bytes → `Vec` → replay each through `process_server_message()` | +| Cache MISS flow | Normal execute → capture response via `CacheContext` → store in Redis → respond | +| Cache key | XXH3 hash of `database_name + normalized query + bind params`; or XXH3 of just `key=NAME` when the hint is present | +| Query normalization | On-the-fly in hasher: comments stripped, whitespace collapsed (except inside string literals), no `String` allocated | +| Wire format | Full PostgreSQL wire messages stored as raw bytes (one concatenated buffer) | +| Config hotswap | `is_actual()` reads live config internally; only those config parameters, that require rebuild, triggers it | +| Redis operation timeout | Configurable via `redis.operation_timeout` (seconds, default `2`); read from live config on every call — no rebuild needed to change it | + +--- + +## How to Control Cache + +### SQL Comments + +Add a C-style comment before your query. Arguments can appear in any order: + +```sql +-- Force bypass cache for this query +/* pgdog_cache: no_cache */ +SELECT * FROM users WHERE id = 1; + +-- Cache with database default TTL +/* pgdog_cache: cache */ +SELECT * FROM products WHERE category = 'electronics'; + +-- Cache with custom TTL in seconds +/* pgdog_cache: cache ttl=300 */ +SELECT * FROM orders; + +-- TTL before mode (same as above) +/* pgdog_cache: ttl=300 cache */ +SELECT * FROM orders; + +-- Force cache with database default TTL +/* pgdog_cache: force_cache */ +SELECT * FROM products WHERE category = 'electronics'; + +-- Force cache with custom TTL in seconds +/* pgdog_cache: force_cache ttl=300 */ +SELECT * FROM orders; + +-- Only specify TTL, inherit mode from connection parameter or global config +/* pgdog_cache: ttl=60 */ +SELECT * FROM sessions; + +-- Use a custom cache key (bypasses automatic key computation) +/* pgdog_cache: cache key=my_query */ +SELECT id, val FROM cache_test_custom_key WHERE id = 1; + +-- Custom key with custom TTL +/* pgdog_cache: cache key=my_query ttl=120 */ +SELECT id, val FROM cache_test_custom_key WHERE id = 1; + +-- Multiple directives in one comment (cache directive does not consume other directives) +/* pgdog_cache: force_cache ttl=10 pgdog_role: replica */ +SELECT * FROM analytics; +``` + +> **Position-agnostic parsing:** Arguments like `cache`, `force_cache`, `no_cache`, `ttl=N`, and `key=NAME` +> can appear in any order. Unknown tokens are silently ignored for forward compatibility. +> +> **Field-by-field fallback:** If you specify only `ttl=60` in a comment without a mode, the mode +> will be taken from the `pgdog.cache` connection parameter. If the parameter also lacks a mode, +> the global config's `policy` is used. Each field cascades independently through the resolution +> tiers: comment → parameter → global config. +> +> **Hash independence from comments:** SQL comments are skipped on-the-fly while hashing, with no +> intermediate `String` allocation. Surrounding whitespace left by a stripped comment is also +> collapsed, so `"/* pgdog_cache: cache */ SELECT 1"` and `"SELECT 1"` produce exactly the same +> cache key. Spaces inside string literals (`WHERE name = 'hello world'`) are never affected. +> +> **Custom cache key (`key=NAME`):** When `key=NAME` is specified, the automatic key computation +> (`database + query + bind params`) is bypassed entirely. Instead, XXH3 of just the literal name +> is used. This lets you assign the same cache slot to logically-equivalent queries that differ in +> SQL text, or create an independent slot for a variant of a query that would otherwise collide. +> If `key=` is empty (`key=`), the argument is ignored and the automatic key is used. + +### Connection Parameter + +Set `pgdog.cache` at connection time (via DSN options) or with `SET` after connecting. Arguments can appear in any order: + +```sql +-- Session-wide: all queries in this connection bypass cache +SET pgdog.cache = 'no_cache'; + +-- Session-wide: cache all queries with default TTL +SET pgdog.cache = 'cache'; + +-- Session-wide: cache all queries with 5-minute TTL +SET pgdog.cache = 'cache ttl=300'; + +-- TTL before mode (same as above) +SET pgdog.cache = 'ttl=300 cache'; + +-- Session-wide: force cache all queries with default TTL +SET pgdog.cache = 'force_cache'; + +-- Session-wide: force cache all queries with 5-minute TTL +SET pgdog.cache = 'force_cache ttl=300'; + +-- Only specify TTL, inherit mode from global config +SET pgdog.cache = 'ttl=120'; +``` + +```sh +# Session-wide: all queries in this connection bypass cache +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dno_cache + +# Session-wide: cache all queries with default TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dcache + +# Session-wide: cache all queries with 5-minute TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dcache%20ttl%3D300 + +# Session-wide: force cache all queries with default TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dforce_cache + +# Session-wide: force cache all queries with 5-minute TTL +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dforce_cache%20ttl%3D300 + +# Only specify TTL, inherit mode from global config +psql postgresql://postgres:postgres@127.0.0.1:5432/postgres?options=-c%20pgdog.cache%3Dttl%3D120 +``` + +### Priority Order + +Cache directives are resolved field-by-field. For each field (`mode` and `ttl_seconds`), the first non-`None` value wins: + +``` +SQL comment → pgdog.cache parameter → global config +(highest) (lowest) +``` + +**Example 1:** Comment specifies `ttl=60` but no mode; parameter specifies `cache` but no TTL. +- Final mode: `cache` (from parameter) +- Final TTL: `60` (from comment) + +**Example 2:** Comment specifies `force_cache ttl=10`; parameter specifies `cache ttl=300`. +- Final mode: `force_cache` (comment wins) +- Final TTL: `10` (comment wins) + +**Example 3:** Comment specifies `ttl=120`; parameter is not set; global config has `policy = "cache"` and `ttl = 300`. +- Final mode: `cache` (from global config) +- Final TTL: `120` (comment wins) + +--- + +## Completed + +1. **Redis client never connects** - Problem: CacheClient::new() built the client but never called init(). Fred requires explicit connection initialization. Fix: Added lazy `ensure_connected()` using `client.init().await`, guarded by `AtomicBool` so it runs exactly once on first get()/set(). Changed CacheClient from `#[derive(Debug)]` to manual Debug impl (contains `Arc`). + +2. **Redis GET fails on NULL / cache miss** - Problem: `client.get::()` throws `Parse Error: Cannot parse into bytes` when the key doesn't exist. Fix: Use `client.get::()` and check `val.is_null()` before extracting bytes. Later refined: `get()` now returns `Result, Error>` instead of `Result>>` — a missing key yields `Err(Error::CacheMiss)`, which is matched explicitly in `cache_check()` and converted to `CacheCheckResult::Miss`. Other errors propagate as `Passthrough`. + +3. **Wire format deserialization wrong in send_cached_response** - Problem: PostgreSQL wire message structure is `[1B code][4B length]` where length includes the 4B itself. I calculated `offset + 5 + msg_len` (treating length as payload-only), causing incorrect byte slicing. Fix: Corrected to `offset + 1 + msg_len`, then replaced magic numbers with named constants `HEADER_CODE_LEN`, `HEADER_LEN_SIZE`, `HEADER_TOTAL`. + +4. **Route incorrectly reports read-only as write when parser is disabled** - Problem: `query_parser_bypass()` conservatively returns `Route::write()` for all SQL when the query parser is disabled. Since pgdog doesn't enable the parser by default for simple queries, `route.is_read()` was false for `SELECT 1`. Fix: When any database has `cache.enabled = true`, the query parser level is auto-upgraded to `On` in the cluster config. The `|| self.cache_enabled()` check in `cluster.rs:475` forces the parser on. Cache also emits a startup warning if parser is `Off` or `SessionControl`. The old `is_likely_read()` string-prefix heuristic has been removed entirely. + +5. **DB cache config defaults** - Observation: `Cache.policy` defaults to `CachePolicy::NoCache`. Even with `enabled = true`, caching is skipped unless policy is explicitly set. User action taken: Add `policy = "cache"` to pgdog.toml. + +6. **Query parser auto-upgrade for caching** — When caching is enabled and parser is `Auto`/`Off`/`SessionControl`, the parser is forced to `On` via `|| self.cache_enabled()` check in `cluster.rs`. A startup warning is emitted in `core.rs` if parser remains incompatible. + +7. **Decoupled cache policy extraction** — Cache directives extracted via standalone regex in `cache/policy.rs`, works regardless of parser state. Supports `/* pgdog_cache: ... */` format with optional `ttl=` parameter. Unified with sharding hints via `comment()` function in `comment.rs`. + +8. **Error handling / Reconnection** — Automatic reconnection with background task, CAS-guarded single reconnect, 2s operation timeout on all Redis calls, PING-based connection verification. + +9. **Cache key collision across databases sharing one Redis** — Database name and query string (with all SQL comments stripped) are combined via a single XXH3 hash call, producing deterministic, collision-resistant per-database keys even on shared Redis. Different literal values in queries produce different cache keys. Because all comments are stripped before hashing, the cache key is identical whether the cache directive arrives via a SQL comment or a connection parameter. + +10. **Wire format serialization/deserialization** — PostgreSQL wire messages stored as raw bytes. Correct byte slice calculation expressed via named constants (`HEADER_CODE_LEN = 1`, `HEADER_LEN_SIZE = 4`, `HEADER_TOTAL = 5`). Deserialization extracted into `deserialize_cached()` with inline comments explaining each boundary check. + +11. **Do not cache error responses**. + +12. **Setting pgdog.cache via connection url doesn't work** — now works. + +13. **Moved all cache-related structs from QueryEngine to Client** — now all cache structs including redis client are creating for whole pgdog's lifetime. + +14. **Use built-in query comment hints** — Cache hints (`pgdog_cache:`) are now extracted alongside sharding hints (`pgdog_shard:`, `pgdog_sharding_key:`, `pgdog_role:`) via the unified `comment()` function in `comment.rs`. The `comment_cache` field is stored in `AstInner` and accessed during cache checking via `client_request.ast.comment_cache`. Policy resolution simplified: trait-based extractors replaced with free functions (`resolve()`, `get_cache_directive()`, `extract_parameter_directive()`). Comment hint (from AST) has priority over connection parameter `pgdog.cache`. `Cache` struct no longer needs `policy_dispatcher` field. Parameter format unified to `no_cache` (underscore, not dash). + +15. **Add cache config to .schema**. + +16. **Force-cache hint support** — `/* pgdog_cache: force_cache */` and `/* pgdog_cache: force_cache ttl=N */` directives always attempt to cache. Because all comments are stripped before hashing, `force_cache` and `cache` directives produce the same cache key as the bare query with no comment at all. + +17. **Cache HIT replays through the server-message pipeline** — Previously, cache hits sent responses directly to the stream, bypassing `process_server_message()`. Now `try_read_cache()` returns `Option>` and the caller (`handle()`) feeds each message through `process_server_message()` — giving correct stats accounting, transaction state updates from `ReadyForQuery`, and hook invocations on every cache hit. + +18. **CacheClient error types refined** — `get()` now returns `Result, Error>` (no more `Option`). `Error::CacheMiss(u64)` is a dedicated variant for key-not-found; `Error::RedisError` is now a struct variant carrying `cmd: &'static str`, `key: u64`, and the underlying error for richer diagnostics. `Error::ConnectionFailed` uses `&'static str` instead of `String` to avoid heap allocation on the hot path. + +19. **Config hotswap** — `Cache` singleton holds `Arc>>>`. `hotswap_if_needed()` runs at the start of every `try_read_cache` and `save_response_in_cache` call: read-locks and calls `has_config_changed()` on the active backend; if true, write-locks, re-checks (to guard against concurrent swaps), and rebuilds the storage. `has_config_changed()` is a no-argument method — each implementation reads the live config internally so callers never pass a config snapshot. + +20. **CacheClient rewritten as `RedisCacheStorage`** — Replaced `CacheClient` with `RedisCacheStorage` implementing the `CacheStorage` trait. Key improvements: background connect task is spawned in `new()` and `new()` waits up to `operation_timeout` ms for the connection to establish (if timeout expires, task continues in background); `get`/`set` check only one atomic flag (`reconnecting`) and return immediately if `true` instead of running `ensure_connected`; the `Option` field and the three-condition guard at the top of every operation are gone; `reconnect` is the single place that sets the flag and CAS-guards the reconnect spawn; reconnect spawns fire-and-forget without waiting. + +21. **Abstract storage backend** — `storage/mod.rs` defines the `CacheStorage` trait (`get`, `set`, `is_enabled`, `has_config_changed`) and the shared `Error` enum. `storage/redis.rs` is the Redis implementation. `Cache` holds `Box` behind a tokio `RwLock` so any backend (e.g. Memcached) can be plugged in by adding a sub-module under `storage/` and a variant to `CacheBackend`. `deserialize_cached()` remains backend-agnostic in `integration.rs`. + +22. **Nested backend config** — Backend-specific settings live in their own TOML subtable (`[general.cache.redis]`) rather than flat fields on `[general.cache]`. `RedisConfig` holds `url` and `cache_key_prefix`. When a new backend is added, it gets its own subtable (e.g. `[general.cache.memcached]`) without polluting the top-level cache section. `client.rs` renamed to `storage/redis.rs`. + +23. **Cache key must include Bind parameters for extended protocol** — For simple `Query` messages, parameter values are embedded in the SQL string, so the XXH3 hash of `database + query_text` is naturally unique per value. For extended protocol (Parse/Bind/Execute), the SQL contains `$1`/`$2` placeholders and the actual values arrive in the `Bind` message separately. The current hash ignores them, so `SELECT * FROM users WHERE id = $1` with `id = 1` and `id = 2` produce the same cache key — wrong rows are returned on the second call. Fix: hash `param.len` (the `i32` field, not the `len()` method which returns wire size) and `param.data` for each entry in `bind.params_raw()` into the hasher in `cache_check()` in `integration.rs`. This affects all production drivers that use extended protocol by default: psycopg3, asyncpg, JDBC, npgsql. Note: pgdog's built-in prepared statement cache (`PreparedStatements` / `GlobalCache`) is a proxy-level plan cache only — it deduplicates backend `Parse` round-trips. It does not cache result rows and is orthogonal to the Redis result cache. + +24. **Comments stripped from query before hashing** — All SQL block comments (`/* … */`, including nested) and line comments (`-- …`) are removed from the query string before computing the XXH3 cache key. This makes the cache key independent of whether the cache directive was supplied via a SQL comment or a connection parameter. + +25. **Zero-allocation query hashing** — `hash_query_without_comments` feeds the query directly into the XXH3 hasher without allocating a `String`. A `pending_space` / `emitted` state machine collapses whitespace runs and suppresses leading/trailing whitespace on-the-fly. Spaces inside SQL string literals (`'…'`) are never collapsed or removed. `strip_sql_comments` (which returned a `Cow`) has been removed; the old string-comparison unit tests have been rewritten as hash-equality assertions. + +26. **`has_config_changed` reads live config internally** — The method signature changed from `has_config_changed(&self, new_config: &CacheConfig) -> bool` to `has_config_changed(&self) -> bool`. Each implementation reads `config()` directly. For Redis, only `redis.url` is compared (not the full `RedisConfig`): `cache_key_prefix` and other runtime settings are read from live config on every call and do not require a storage rebuild. + +27. **Set redis query timeout from config** — `RedisConfig` gains `operation_timeout: NonZeroU64` (default `2` seconds). The `REDIS_OPERATION_TIMEOUT` compile-time constant is removed. All `tokio::time::timeout` calls in `storage/redis.rs` (init, GET, SET) read `config().config.general.cache.redis.operation_timeout` from live config on every invocation. `RedisCacheStorage` no longer stores the full `CacheConfig`; it stores only `url: String` for change-detection — all other settings (`cache_key_prefix`, `operation_timeout`) are fetched from live config on each call, so they take effect immediately without a storage rebuild. Schema updated with the new field. + +28. **Wait for initial Redis connection** — `Cache::new()` and `RedisCacheStorage::new()` are now async. `RedisCacheStorage::new()` spawns the connection task and waits up to `operation_timeout` ms for it to complete. If the timeout expires, the task continues in background without cancellation. This prevents the first query from immediately failing with `ConnectionFailed`. The wait applies to both initial startup (via `tokio::sync::OnceCell`) and config hotswap rebuilds. `cache()` function is now async; `reconnect()` remains fire-and-forget (no waiting). + +29. **Changed `has_config_changed` to `is_actual`** — config can be changed, but that doesn't mean, that cache storage should be rebuilt. And the function doesn't actually say if config has changed. It says if some specific parameters have changed. So `is_actual` is more correct. + +30. **Position-agnostic cache directive parsing** — `CacheDirective` refactored from an enum (`Cache { ttl }`, `ForceCache { ttl }`, `NoCache`) to a flat struct `{ mode: Option, ttl_seconds: Option }` where `CacheMode` is a sub-enum (`Cache`, `ForceCache`, `NoCache`). `CacheDirective::parse()` tokenizes space-separated arguments in any order: `cache`, `force_cache`, `no_cache`, `ttl=N`. Unknown tokens are silently ignored for forward compatibility (e.g., future `hash_key=foo` argument). Field-by-field resolution: comment directive `.or()` merges with parameter directive, then global config. This means `/* pgdog_cache: ttl=60 */` (mode absent) can combine with `SET pgdog.cache = 'cache'` (TTL absent) to produce `mode=Cache, ttl=60`. Module renamed from `policy.rs` to `directive.rs`. `CacheDecision` enum removed; resolution happens inline in `integration.rs` `cache_check()`. Regex in `comment.rs` simplified to capture the raw body after `pgdog_cache:` up to the next `pgdog_\w+:` directive or end-of-comment, then delegates parsing to `CacheDirective::parse()`. + +31. **Add hint for query hash key (`key=NAME`)** — `CacheDirective` gains an optional `key: Option` field. When `key=NAME` is present in a SQL comment or connection parameter, the automatic cache key computation (`database + normalized query + bind params`) is bypassed: the key is XXH3 of just the literal name string. An empty `key=` value is treated as absent. The `CacheMiss` struct field is renamed from `cache_key_hash` to `key` for clarity. Integration test `test_custom_key` verifies that a query with `key=separate_cache` populates an independent cache slot from the same query without the directive. + +32. **Split `integration.rs` into several modules** — `integration.rs` split into three focused files: `hashing.rs` (cache key computation), `wire.rs` (PostgreSQL wire message deserialization), and a slimmed-down `integration.rs` (public types `CacheMiss`/`CacheCheckResult` and `Cache::cache_check`/`cache_response`). Tests co-located in each file. + +--- + +## What's Left To Do + +1. **Redis disconnect/reconnect under heavy load** — The reconnection logic works, but timing edge cases under rapid disconnect/reconnect cycles still need stress-testing. + +2. **Add config flag for mandatory availability of cache storage** — query will fail with error if Redis (or another cache storage) is unavailable. + +# Tests + +## Running unit tests + +```sh +cargo nextest run -p pgdog frontend::cache +``` + +## Integration tests (PostgreSQL + Redis + pgdog required) + +```sh +bash integration/cache/run.sh +``` + +Or if you already have pgdog running on port 6432 with that config: +```sh +bash integration/cache/dev.sh +``` diff --git a/integration/cache/dev.sh b/integration/cache/dev.sh new file mode 100755 index 000000000..e424356eb --- /dev/null +++ b/integration/cache/dev.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source "${SCRIPT_DIR}"/../common.sh + +active_venv + +pushd "${SCRIPT_DIR}" +pytest -x test_cache.py +popd diff --git a/integration/cache/pgdog.toml b/integration/cache/pgdog.toml new file mode 100644 index 000000000..08c87d054 --- /dev/null +++ b/integration/cache/pgdog.toml @@ -0,0 +1,31 @@ +[general] +query_timeout = 2_000 +checkout_timeout = 2_000 +connect_timeout = 2_000 + +[general.cache] +enabled = true +policy = "cache" +ttl = 5 + +[general.cache.redis] +url = "redis://127.0.0.1:6379" + +# ------------------------------------------------------------------------------ +# ----- Admin ------------------------------------------------------------------ + +[admin] +password = "pgdog" + +# ------------------------------------------------------------------------------ +# ----- Database :: pgdog ------------------------------------------------------ + +[[databases]] +name = "pgdog" +host = "127.0.0.1" + +[[databases]] +name = "pgdog" +host = "127.0.0.1" +role = "replica" +read_only = true \ No newline at end of file diff --git a/integration/cache/run.sh b/integration/cache/run.sh new file mode 100755 index 000000000..a79a1d01d --- /dev/null +++ b/integration/cache/run.sh @@ -0,0 +1,15 @@ +#!/bin/bash +# Run cache integration tests with the dedicated cache pgdog config. +# PostgreSQL must be running on 127.0.0.1:5432 and Redis on 127.0.0.1:6379. +# Run integration/setup.sh first if you haven't already. +# Run integration/python/run.sh first to install python dependencies. +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source "${SCRIPT_DIR}"/../common.sh + +run_pgdog "integration/cache" +wait_for_pgdog + +bash "${SCRIPT_DIR}"/dev.sh + +stop_pgdog diff --git a/integration/cache/test_cache.py b/integration/cache/test_cache.py new file mode 100644 index 000000000..882114559 --- /dev/null +++ b/integration/cache/test_cache.py @@ -0,0 +1,314 @@ +import asyncio +import socket + +import asyncpg +import pytest +import pytest_asyncio + +from asyncpg.connection import Connection + +PGDOG_DSN = "postgres://pgdog:pgdog@127.0.0.1:6432/pgdog" +DIRECT_DSN = "postgres://pgdog:pgdog@127.0.0.1:5432/pgdog" + + +def redis_available() -> bool: + try: + with socket.create_connection(("127.0.0.1", 6379), timeout=1): + return True + except OSError: + return False + + +skip_if_no_redis = pytest.mark.skipif( + not redis_available(), reason="Redis required at 127.0.0.1:6379" +) + + +@pytest_asyncio.fixture(scope="function") +async def conn(): + c = await asyncpg.connect(PGDOG_DSN) + yield c + await c.close() + + +@pytest_asyncio.fixture(scope="function") +async def direct(): + c = await asyncpg.connect(DIRECT_DSN) + yield c + await c.close() + + +# --------------------------------------------------------------------------- + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_cache_hit(conn: Connection, direct: Connection): + """Verifies that a second identical SELECT is served from the cache instead of PostgreSQL.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_hit (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_hit") + await conn.execute("INSERT INTO cache_test_hit VALUES (1, 'hello')") + + rows = await conn.fetch( + "/* pgdog_cache: no_cache */ SELECT id, val FROM cache_test_hit WHERE id = 1" + ) + assert len(rows) == 1, "row must exist in PG before cache warm-up" + + # Warm up the cache with a regular (cacheable) SELECT. + first = await conn.fetch("SELECT id, val FROM cache_test_hit WHERE id = 1") + assert len(first) == 1, "first SELECT must return the row" + + # Delete the row directly in Postgres, bypassing pgdog so the cache is not invalidated. + await direct.execute("DELETE FROM cache_test_hit WHERE id = 1") + + gone = await conn.fetch( + "/* pgdog_cache: no_cache */ SELECT id, val FROM cache_test_hit WHERE id = 1" + ) + assert len(gone) == 0, "row must be gone in PG after direct delete" + + cached = await conn.fetch("SELECT id, val FROM cache_test_hit WHERE id = 1") + assert len(cached) == 1, "cached SELECT must still return the row from the cache" + assert cached[0]["val"] == "hello" + + await conn.execute("DROP TABLE IF EXISTS cache_test_hit") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_cache_bypassed_in_transaction(conn): + """Verifies that queries inside an explicit transaction are never served from the cache.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_txn (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_txn") + await conn.execute("INSERT INTO cache_test_txn VALUES (1, 'original')") + + # Warm up the cache. + await conn.fetch("SELECT id, val FROM cache_test_txn WHERE id = 1") + + async with conn.transaction(): + await conn.execute("UPDATE cache_test_txn SET val = 'updated' WHERE id = 1") + in_tx = await conn.fetch("SELECT id, val FROM cache_test_txn WHERE id = 1") + assert ( + in_tx[0]["val"] == "updated" + ), "SELECT inside a transaction must see its own write, not the cached one" + + await conn.execute("DROP TABLE IF EXISTS cache_test_txn") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_cache_ttl_expiry(conn: Connection, direct: Connection): + """Verifies that the cached results expire after the configured TTL.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_ttl (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_ttl") + await conn.execute("INSERT INTO cache_test_ttl VALUES (1, 'original')") + + # Warm up the cache with ttl=1. + await conn.fetch( + "/* pgdog_cache: cache ttl=1 */ SELECT id, val FROM cache_test_ttl WHERE id = 1" + ) + + # Remove the row directly so the cache is stale. + await direct.execute("DELETE FROM cache_test_ttl WHERE id = 1") + + # Wait for the cache entry to expire. + await asyncio.sleep(2) + + rows = await conn.fetch( + "/* pgdog_cache: cache ttl=1 */ SELECT id, val FROM cache_test_ttl WHERE id = 1" + ) + assert len(rows) == 0, "after TTL expiry the cached row must no longer be returned" + + await conn.execute("DROP TABLE IF EXISTS cache_test_ttl") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_extended_protocol_different_params_have_different_cache_keys( + conn: Connection, direct: Connection +): + """Verifies that the extended protocol uses bind parameter values in the cache key.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_ext (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_ext") + await conn.execute("INSERT INTO cache_test_ext VALUES (1, 'one'), (2, 'two')") + + # Warm up the cache for id=1 and id=2. + r1 = await conn.fetch("SELECT id, val FROM cache_test_ext WHERE id = $1", 1) + assert len(r1) == 1 and r1[0]["val"] == "one" + + r2 = await conn.fetch("SELECT id, val FROM cache_test_ext WHERE id = $1", 2) + assert len(r2) == 1 and r2[0]["val"] == "two" + + # Delete both rows directly in PG so that any result must come from the cache. + await direct.execute("DELETE FROM cache_test_ext") + + cached1 = await conn.fetch("SELECT id, val FROM cache_test_ext WHERE id = $1", 1) + assert len(cached1) == 1, "id=1 entry must be served from cache" + assert cached1[0]["val"] == "one" + + cached2 = await conn.fetch("SELECT id, val FROM cache_test_ext WHERE id = $1", 2) + assert len(cached2) == 1, "id=2 entry must be served from cache" + assert cached2[0]["val"] == "two" + + await conn.execute("DROP TABLE IF EXISTS cache_test_ext") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_force_cache(conn: Connection, direct: Connection): + """Verifies that `/* pgdog_cache: force_cache */` updates the cache.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_force (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_force") + await conn.execute("INSERT INTO cache_test_force VALUES (1, 'not_forced')") + + # Warm up the cache. + r1 = await conn.fetch( + "/* pgdog_cache: cache */ SELECT id, val FROM cache_test_force WHERE id = 1" + ) + assert len(r1) == 1 and r1[0]["val"] == "not_forced" + + # Update directly in PG so the cache is stale. + await direct.execute("UPDATE cache_test_force SET val = 'forced' WHERE id = 1") + + # force_cache must re-fetch from PG and overwrite the cached entry. + r2 = await conn.fetch( + "/* pgdog_cache: force_cache */ SELECT id, val FROM cache_test_force WHERE id = 1" + ) + assert len(r2) == 1 and r2[0]["val"] == "forced" + + # Subsequent plain SELECT must now return the updated cached value. + cached = await conn.fetch("SELECT id, val FROM cache_test_force WHERE id = 1") + assert len(cached) == 1 and cached[0]["val"] == "forced" + + await conn.execute("DROP TABLE IF EXISTS cache_test_force") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_no_cache_hint_does_not_warm_cache(conn: Connection, direct: Connection): + """Verifies that `/* pgdog_cache: no_cache */` prevents the response from being stored in cache.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_no_warm (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_no_warm") + await conn.execute("INSERT INTO cache_test_no_warm VALUES (1, 'original')") + + # Fetch with no_cache — must NOT warm up the cache. + r = await conn.fetch( + "/* pgdog_cache: no_cache */ SELECT id, val FROM cache_test_no_warm WHERE id = 1" + ) + assert len(r) == 1 + + await direct.execute("DELETE FROM cache_test_no_warm WHERE id = 1") + + after = await conn.fetch("SELECT id, val FROM cache_test_no_warm WHERE id = 1") + assert ( + len(after) == 0 + ), "no_cache hint must not warm up the cache, so PG miss returns 0 rows" + + await conn.execute("DROP TABLE IF EXISTS cache_test_no_warm") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_connection_option_no_cache_bypasses_cache( + conn: Connection, direct: Connection +): + """Verifies that passing `pgdog.cache=no_cache` in the connection DSN options + bypasses the cache for all queries on that connection.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_param (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_param") + await conn.execute("INSERT INTO cache_test_param VALUES (1, 'cached_val')") + + # Warm up the cache via a normal connection, then delete row from db. + await conn.fetch("SELECT id, val FROM cache_test_param WHERE id = 1") + await direct.execute("DELETE FROM cache_test_param WHERE id = 1") + + # A connection with pgdog.cache=no_cache must bypass the cache and hit PG, + # returning 0 rows because the row was deleted. + no_cache_conn = await asyncpg.connect( + PGDOG_DSN, server_settings={"pgdog.cache": "no_cache"} + ) + try: + rows = await no_cache_conn.fetch( + "SELECT id, val FROM cache_test_param WHERE id = 1" + ) + assert ( + len(rows) == 0 + ), "connection-level no_cache must bypass the cache and see the deleted row" + finally: + await no_cache_conn.close() + + await conn.execute("DROP TABLE IF EXISTS cache_test_param") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_error_response_not_cached(conn: Connection): + """Verifies that error responses are never stored in the cache.""" + await conn.execute("DROP TABLE IF EXISTS cache_test_error") + + # This SELECT will produce an error (table does not exist). + try: + await conn.fetch("SELECT id, val FROM cache_test_error WHERE id = 1") + assert False, "query on missing table must return an error" + except asyncpg.exceptions.UndefinedTableError: + pass + + # Now create the table and insert a row. + await conn.execute( + "CREATE TABLE cache_test_error (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("INSERT INTO cache_test_error VALUES (1, 'live')") + + # The same query must now hit PG (the previous error was not cached). + rows = await conn.fetch("SELECT id, val FROM cache_test_error WHERE id = 1") + assert len(rows) == 1, "error must not be cached; must return live row" + assert rows[0]["val"] == "live" + + await conn.execute("DROP TABLE IF EXISTS cache_test_error") + + +@skip_if_no_redis +@pytest.mark.asyncio +async def test_custom_key(conn: Connection, direct: Connection): + """Verifies that a custom key= directive creates an independent cache entry, + so queries with and without the directive do not share a cache slot.""" + await conn.execute( + "CREATE TABLE IF NOT EXISTS cache_test_custom_key (id BIGINT PRIMARY KEY, val TEXT)" + ) + await conn.execute("TRUNCATE cache_test_custom_key") + await conn.execute("INSERT INTO cache_test_custom_key VALUES (1, 'value')") + + r = await conn.fetch("SELECT id, val FROM cache_test_custom_key WHERE id = 1") + assert len(r) == 1 + assert r[0]["val"] == "value" + + await direct.execute( + "UPDATE cache_test_custom_key SET val = 'new_value' WHERE id = 1" + ) + + # Must get new cached value + r = await conn.fetch( + "/* pgdog_cache: key=separate_cache */ SELECT id, val FROM cache_test_custom_key WHERE id = 1" + ) + assert len(r) == 1 + assert r[0]["val"] == "new_value" + + # Must get old cached value + r = await conn.fetch("SELECT id, val FROM cache_test_custom_key WHERE id = 1") + assert len(r) == 1 + assert r[0]["val"] == "value" + + await conn.execute("DROP TABLE IF EXISTS cache_test_custom_key") diff --git a/integration/cache/users.toml b/integration/cache/users.toml new file mode 100644 index 000000000..77fa26a15 --- /dev/null +++ b/integration/cache/users.toml @@ -0,0 +1,4 @@ +[[users]] +name = "pgdog" +database = "pgdog" +password = "pgdog" \ No newline at end of file diff --git a/pgdog-config/src/cache.rs b/pgdog-config/src/cache.rs new file mode 100644 index 000000000..91da7aa70 --- /dev/null +++ b/pgdog-config/src/cache.rs @@ -0,0 +1,175 @@ +use std::num::NonZeroU64; + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Cache policy. +#[derive( + Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum CachePolicy { + /// Never cache queries for this database. + #[default] + NoCache, + /// Always cache read queries. + Cache, +} + +impl std::str::FromStr for CachePolicy { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "no_cache" => Ok(Self::NoCache), + "cache" => Ok(Self::Cache), + _ => Err(format!("Invalid cache policy: {}", s)), + } + } +} + +impl std::fmt::Display for CachePolicy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let display = match self { + Self::NoCache => "no_cache", + Self::Cache => "cache", + }; + write!(f, "{}", display) + } +} + +/// Cache storage backend discriminator. +#[derive( + Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Copy, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum CacheBackend { + /// Redis backend (default). + #[default] + Redis, +} + +/// Redis-specific cache backend configuration. +/// +/// Corresponds to the `[general.cache.redis]` TOML section. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct RedisConfig { + /// Redis connection URL. + /// + /// _Default:_ `redis://localhost:6379` + #[serde(default = "RedisConfig::url")] + pub url: String, + + /// Key prefix prepended to every cache key stored in Redis. + /// + /// _Default:_ `pgdog:` + #[serde(default = "RedisConfig::cache_key_prefix")] + pub cache_key_prefix: String, + + /// Timeout in milliseconds for individual Redis operations (GET/SET/ping). + /// + /// _Default:_ `2000` + #[serde(default = "RedisConfig::operation_timeout")] + pub operation_timeout: NonZeroU64, +} + +impl Default for RedisConfig { + fn default() -> Self { + Self { + url: Self::url(), + cache_key_prefix: Self::cache_key_prefix(), + operation_timeout: Self::operation_timeout(), + } + } +} + +impl RedisConfig { + fn url() -> String { + "redis://localhost:6379".to_string() + } + + fn cache_key_prefix() -> String { + "pgdog:".to_string() + } + + fn operation_timeout() -> NonZeroU64 { + NonZeroU64::new(2000).expect("2000 is non-zero") + } +} + +/// Cache configuration. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct Cache { + /// Whether to enable caching. + /// + /// _Default:_ `false` + #[serde(default = "Cache::enabled")] + pub enabled: bool, + + /// Cache policy: `no_cache` or `cache`. + /// + /// _Default:_ `no_cache` + #[serde(default = "Cache::policy")] + pub policy: CachePolicy, + + /// Default TTL in seconds for cached queries. + /// + /// _Default:_ `300` + #[serde(default = "Cache::ttl")] + pub ttl: u64, + + /// Which storage backend to use. + /// + /// _Default:_ `redis` + #[serde(default = "Cache::backend")] + pub backend: CacheBackend, + + /// Redis backend configuration. + /// + /// Only read when `backend = "redis"`. + #[serde(default)] + pub redis: RedisConfig, + + /// Maximum result size in bytes to cache (0 = unlimited). + /// + /// _Default:_ `0` + #[serde(default = "Cache::max_result_size")] + pub max_result_size: usize, +} + +impl Default for Cache { + fn default() -> Self { + Self { + enabled: Self::enabled(), + policy: Self::policy(), + ttl: Self::ttl(), + backend: Self::backend(), + redis: RedisConfig::default(), + max_result_size: Self::max_result_size(), + } + } +} + +impl Cache { + fn enabled() -> bool { + false + } + + fn policy() -> CachePolicy { + CachePolicy::default() + } + + fn ttl() -> u64 { + 300 + } + + fn backend() -> CacheBackend { + CacheBackend::default() + } + + fn max_result_size() -> usize { + 0 + } +} diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index 856518a89..4fcc8242f 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -564,6 +564,15 @@ impl Config { r#""pg_query_raw" parser engine requires a large thread stack, setting it to 32MiB for each Tokio worker"# ); } + + if self.general.cache.enabled + && matches!( + self.general.query_parser, + QueryParserLevel::Off | QueryParserLevel::SessionControl + ) + { + warn!("cache requires enabled query parser but it's disabled or session controlled"); + } } /// Multi-tenancy is enabled. diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index aa0f636f6..59d76aafa 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; +use crate::cache::Cache; use crate::pooling::ConnectionRecovery; use crate::UniqueIdFunction; use crate::{ @@ -643,6 +644,10 @@ pub struct General { /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#cutover_save_config #[serde(default)] pub cutover_save_config: bool, + + /// Redis cache configuration for this database. + #[serde(default)] + pub cache: Cache, } impl Default for General { @@ -729,6 +734,7 @@ impl Default for General { cutover_timeout_action: Self::cutover_timeout_action(), cutover_save_config: bool::default(), unique_id_function: Self::unique_id_function(), + cache: Cache::default(), } } } diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 1a106a295..fc8962019 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -1,5 +1,6 @@ // Submodules pub mod auth; +pub mod cache; pub mod core; pub mod data_types; pub mod database; @@ -18,6 +19,7 @@ pub mod users; pub mod util; pub use auth::{AuthType, PassthroughAuth}; +pub use cache::{Cache, CacheBackend, CachePolicy, RedisConfig as CacheRedisConfig}; pub use core::{Config, ConfigAndUsers}; pub use data_types::*; pub use database::{ diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 7c62c28c4..461daec45 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -69,6 +69,8 @@ pgdog-config = { path = "../pgdog-config" } pgdog-vector = { path = "../pgdog-vector" } pgdog-stats = { path = "../pgdog-stats" } pgdog-postgres-types = { path = "../pgdog-postgres-types"} +fred = { version = "9", features = ["enable-rustls"] } +xxhash-rust = { version = "0.8", features = ["xxh3"]} [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 9dbd038d0..1dff017a5 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -81,6 +81,7 @@ pub struct Cluster { reload_schema_on_ddl: bool, load_schema: LoadSchema, resharding_parallel_copies: usize, + cache_enabled: bool, } /// Sharding configuration from the cluster. @@ -157,6 +158,7 @@ pub struct ClusterConfig<'a> { pub reload_schema_on_ddl: bool, pub load_schema: LoadSchema, pub resharding_parallel_copies: usize, + pub cache_enabled: bool, } impl<'a> ClusterConfig<'a> { @@ -210,6 +212,7 @@ impl<'a> ClusterConfig<'a> { reload_schema_on_ddl: general.reload_schema_on_ddl, load_schema: general.load_schema, resharding_parallel_copies: general.resharding_parallel_copies, + cache_enabled: general.cache.enabled, } } } @@ -247,6 +250,7 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + cache_enabled, } = config; let identifier = Arc::new(DatabaseUser { @@ -296,6 +300,7 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + cache_enabled, } } @@ -470,6 +475,7 @@ impl Cluster { || self.dry_run() || self.prepared_statements() == &PreparedStatements::Full || self.pub_sub_enabled() + || self.cache_enabled() || RegexParser::use_parser(request) } } @@ -545,6 +551,11 @@ impl Cluster { self.resharding_parallel_copies } + /// Redis cache enabled. + pub fn cache_enabled(&self) -> bool { + self.cache_enabled + } + /// Launch the connection pools. pub(crate) fn launch(&self) { for shard in self.shards() { diff --git a/pgdog/src/config/cache.rs b/pgdog/src/config/cache.rs new file mode 100644 index 000000000..ece03acb6 --- /dev/null +++ b/pgdog/src/config/cache.rs @@ -0,0 +1 @@ +pub use pgdog_config::cache::*; diff --git a/pgdog/src/config/mod.rs b/pgdog/src/config/mod.rs index 835a0f10e..6ecd3785d 100644 --- a/pgdog/src/config/mod.rs +++ b/pgdog/src/config/mod.rs @@ -1,6 +1,7 @@ //! Configuration. // Submodules +pub mod cache; pub mod convert; pub mod core; pub mod database; @@ -15,6 +16,7 @@ pub mod rewrite; pub mod sharding; pub mod users; +pub use cache::*; pub use core::{Config, ConfigAndUsers}; pub use database::{Database, Role}; pub use error::Error; diff --git a/pgdog/src/frontend/cache/context.rs b/pgdog/src/frontend/cache/context.rs new file mode 100644 index 000000000..aeeab7613 --- /dev/null +++ b/pgdog/src/frontend/cache/context.rs @@ -0,0 +1,31 @@ +use crate::{ + frontend::cache::integration::CacheMiss, + net::{messages::Protocol, Message}, +}; + +/// Cache context to use in QueryEngineContext. +#[derive(Default)] +pub struct CacheContext { + pub cache_miss: Option, + pub response_buffer: Vec, + pub had_error: bool, +} + +impl CacheContext { + /// Capture a response message for caching. + pub fn capture_response(&mut self, message: Message) { + if self.cache_miss.is_some() { + if message.code() == 'E' { + self.had_error = true; + } + self.response_buffer.push(message); + } + } + + /// Reset the cache context for a new query. + pub fn reset(&mut self) { + self.cache_miss = None; + self.response_buffer.clear(); + self.had_error = false; + } +} diff --git a/pgdog/src/frontend/cache/directive.rs b/pgdog/src/frontend/cache/directive.rs new file mode 100644 index 000000000..5e1e307a7 --- /dev/null +++ b/pgdog/src/frontend/cache/directive.rs @@ -0,0 +1,254 @@ +use crate::frontend::ClientRequest; +use crate::net::parameter::ParameterValue; +use crate::net::Parameters; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum CacheMode { + Cache, + ForceCache, + #[default] + NoCache, +} + +/// A fully-parsed cache directive. +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct CacheDirective { + pub mode: Option, + pub ttl_seconds: Option, + pub key: Option, +} + +const KEY: &str = "pgdog.cache"; + +impl CacheDirective { + /// Parse a space-separated token string like `"force_cache ttl=60"`. + pub fn parse(s: &str) -> Self { + let mut directive = CacheDirective::default(); + + for token in s.split_whitespace() { + if let Some((key, val)) = token.split_once('=') { + match key { + "ttl" => { + directive.ttl_seconds = val.parse::().ok(); + } + "key" => { + directive.key = Some(val.to_string()).filter(|v| !v.is_empty()); + } + _ => {} + } + } else { + match token { + "no_cache" => { + directive.mode = Some(CacheMode::NoCache); + } + "force_cache" => { + directive.mode = Some(CacheMode::ForceCache); + } + "cache" => { + directive.mode = Some(CacheMode::Cache); + } + _ => {} + } + } + } + + directive + } + + fn from_params(params: &Parameters) -> Self { + match params.get(KEY) { + Some(ParameterValue::String(v)) => CacheDirective::parse(v.as_str().trim()), + _ => CacheDirective::default(), + } + } + + pub fn or(self, fallback: Self) -> Self { + let Self { + mode, + ttl_seconds, + key, + } = self; + Self { + mode: mode.or(fallback.mode), + ttl_seconds: ttl_seconds.or(fallback.ttl_seconds), + key: key.or(fallback.key), + } + } +} + +pub fn resolve(client_request: &ClientRequest, params: &Parameters) -> CacheDirective { + let comment_directive = client_request + .ast + .as_ref() + .and_then(|ast| ast.comment_cache.clone()) + .unwrap_or_default(); + let param_directive = CacheDirective::from_params(params); + + comment_directive.or(param_directive) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse(s: &str) -> CacheDirective { + CacheDirective::parse(s) + } + + fn directive(mode: CacheMode, ttl: Option) -> CacheDirective { + CacheDirective { + mode: Some(mode), + ttl_seconds: ttl, + key: None, + } + } + + #[test] + fn no_cache_directive() { + assert_eq!(parse("no_cache"), directive(CacheMode::NoCache, None)); + } + + #[test] + fn cache_directive_no_ttl() { + assert_eq!(parse("cache"), directive(CacheMode::Cache, None)); + } + + #[test] + fn cache_directive_with_ttl() { + assert_eq!(parse("cache ttl=60"), directive(CacheMode::Cache, Some(60))); + } + + #[test] + fn cache_directive_with_large_ttl() { + assert_eq!( + parse("cache ttl=86400"), + directive(CacheMode::Cache, Some(86400)) + ); + } + + #[test] + fn force_cache_no_ttl() { + assert_eq!(parse("force_cache"), directive(CacheMode::ForceCache, None)); + } + + #[test] + fn force_cache_with_ttl() { + assert_eq!( + parse("force_cache ttl=120"), + directive(CacheMode::ForceCache, Some(120)) + ); + } + + #[test] + fn garbage_input_returns_all_none() { + assert_eq!(parse("garbage"), CacheDirective::default()); + } + + #[test] + fn invalid_ttl_does_not_set_ttl() { + assert_eq!(parse("cache ttl=abc"), directive(CacheMode::Cache, None)); + } + + #[test] + fn force_cache_invalid_ttl_does_not_set_ttl() { + assert_eq!( + parse("force_cache ttl=bad"), + directive(CacheMode::ForceCache, None) + ); + } + + #[test] + fn empty_ttl_does_not_set_ttl() { + assert_eq!(parse("cache ttl="), directive(CacheMode::Cache, None)); + } + + #[test] + fn ttl_zero_is_valid() { + assert_eq!(parse("cache ttl=0"), directive(CacheMode::Cache, Some(0))); + } + + #[test] + fn key_directive() { + assert_eq!( + parse("cache key=big_query"), + CacheDirective { + mode: Some(CacheMode::Cache), + key: Some("big_query".to_string()), + ..Default::default() + } + ) + } + + #[test] + fn key_only() { + assert_eq!( + parse("key=big_query2"), + CacheDirective { + key: Some("big_query2".to_string()), + ..Default::default() + } + ) + } + + #[test] + fn empty_key() { + assert_eq!( + parse("key= force_cache"), + CacheDirective { + mode: Some(CacheMode::ForceCache), + key: None, + ..Default::default() + } + ) + } + + #[test] + fn missing_key_returns_all_none() { + let params = Parameters::default(); + assert_eq!( + CacheDirective::from_params(¶ms), + CacheDirective::default() + ); + } + + #[test] + fn whitespace_trimmed_around_value() { + let mut params = Parameters::default(); + params.insert("pgdog.cache", " no_cache "); + assert_eq!( + CacheDirective::from_params(¶ms), + directive(CacheMode::NoCache, None) + ); + } + + #[test] + fn ttl_before_mode() { + assert_eq!( + parse("ttl=17 force_cache"), + directive(CacheMode::ForceCache, Some(17)) + ); + } + + #[test] + fn ttl_only_no_mode() { + assert_eq!( + parse("ttl=42"), + CacheDirective { + ttl_seconds: Some(42), + ..Default::default() + } + ); + } + + #[test] + fn mode_ttl_extra_unknown_token() { + assert_eq!( + parse("cache key=super ttl=5 foo=bar"), + CacheDirective { + mode: Some(CacheMode::Cache), + ttl_seconds: Some(5), + key: Some("super".to_string()) + } + ); + } +} diff --git a/pgdog/src/frontend/cache/hashing.rs b/pgdog/src/frontend/cache/hashing.rs new file mode 100644 index 000000000..7e76d464f --- /dev/null +++ b/pgdog/src/frontend/cache/hashing.rs @@ -0,0 +1,276 @@ +use std::hash::{Hash, Hasher}; + +use crate::net::bind::Bind; + +/// Feed `query` into `hasher`, skipping SQL comments and normalising surrounding +/// whitespace — without allocating a `String`. +/// +/// Rules applied on-the-fly: +/// - Block comments (`/* … */`, including PostgreSQL nested variants) are treated +/// as a potential token separator: a single space is emitted if needed. +/// - Line comments (`-- …\n`) are treated the same way; the trailing newline +/// becomes the pending separator. +/// - Runs of ASCII whitespace outside string literals are collapsed to a single +/// space and leading/trailing whitespace is suppressed. +/// - String literals (`'…'`, with `''` escapes) are passed through byte-for-byte +/// so that spaces inside them are never removed. +fn hash_query_without_comments(query: &str, hasher: &mut H) { + // pending_space: we *want* to emit a space before the next real token but + // haven't done so yet (avoids leading space and trailing space). + let mut pending_space = false; + // emitted: have we written at least one real byte yet? + let mut emitted = false; + + let mut chars = query.chars().peekable(); + + while let Some(c) = chars.next() { + match c { + // ---- block comment (supports nested) -------------------------------- + '/' if chars.peek() == Some(&'*') => { + chars.next(); // consume '*' + let mut depth = 1u32; + while depth > 0 { + match chars.next() { + Some('/') if chars.peek() == Some(&'*') => { + chars.next(); + depth += 1; + } + Some('*') if chars.peek() == Some(&'/') => { + chars.next(); + depth -= 1; + } + None => break, // malformed: treat as end + _ => {} + } + } + // The comment may stand between two tokens; record the need for + // a separator but don't emit yet. + if emitted { + pending_space = true; + } + } + + // ---- line comment --------------------------------------------------- + '-' if chars.peek() == Some(&'-') => { + for ch in chars.by_ref() { + if ch == '\n' { + break; + } + } + if emitted { + pending_space = true; + } + } + + // ---- string literal — pass through verbatim ------------------------- + '\'' => { + // Flush any pending space before the opening quote. + if pending_space && emitted { + ' '.hash(hasher); + pending_space = false; + } + c.hash(hasher); + emitted = true; + while let Some(ch) = chars.next() { + ch.hash(hasher); + if ch == '\'' { + // Standard SQL escaped quote: two consecutive single-quotes. + if chars.peek() == Some(&'\'') { + chars.next().unwrap().hash(hasher); + } else { + break; + } + } + } + } + + // ---- whitespace — collapse to a single pending space ---------------- + c if c.is_ascii_whitespace() => { + if emitted { + pending_space = true; + } + } + + // ---- regular character ---------------------------------------------- + c => { + if pending_space && emitted { + ' '.hash(hasher); + pending_space = false; + } + c.hash(hasher); + emitted = true; + } + } + } +} + +/// Compute the XXH3 cache key hash for a query. +/// +/// SQL comments are skipped and surrounding whitespace is normalised on-the-fly +/// while feeding bytes directly into the hasher — no `String` allocation. +pub fn compute_cache_key_hash(database: &str, query: &str, bind: Option<&Bind>) -> u64 { + let mut hasher = xxhash_rust::xxh3::Xxh3Default::new(); + database.hash(&mut hasher); + hash_query_without_comments(query, &mut hasher); + if let Some(bind) = bind { + for param in bind.params_raw() { + param.len.hash(&mut hasher); + param.data.hash(&mut hasher); + } + } + hasher.finish() +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Helper: hash just the query part (no bind params) with a fixed database. + fn qhash(query: &str) -> u64 { + compute_cache_key_hash("db", query, None) + } + + #[test] + fn hash_no_comments_same_as_plain() { + // A query without any comments should hash identically to itself. + assert_eq!(qhash("SELECT 1"), qhash("SELECT 1")); + } + + #[test] + fn hash_block_comment_stripped() { + // Block comment containing the directive must be invisible to the hash. + assert_eq!( + qhash("/* pgdog_cache: cache */ SELECT 1"), + qhash("SELECT 1"), + ); + } + + #[test] + fn hash_line_comment_stripped() { + assert_eq!(qhash("-- pgdog_cache: cache\nSELECT 1"), qhash("SELECT 1"),); + } + + #[test] + fn hash_nested_block_comments_stripped() { + assert_eq!( + qhash("/* outer /* inner */ still outer */ SELECT 2"), + qhash("SELECT 2"), + ); + } + + #[test] + fn hash_multiple_block_comments_stripped() { + assert_eq!(qhash("/* a */ SELECT /* b */ 1"), qhash("SELECT 1"),); + } + + #[test] + fn hash_string_literal_contents_preserved() { + // `/* ... */` inside a string literal must NOT be treated as a comment. + // The two queries are different so their hashes must differ. + let h1 = qhash("SELECT '/* not a comment */' FROM t"); + let h2 = qhash("SELECT ' ' FROM t"); + assert_ne!(h1, h2); + } + + #[test] + fn hash_escaped_quotes_in_literal_preserved() { + // `'it''s fine'` — the embedded `''` must survive; the queries differ. + let h1 = qhash("SELECT 'it''s fine' FROM t"); + let h2 = qhash("SELECT 'its fine' FROM t"); + assert_ne!(h1, h2); + } + + #[test] + fn hash_whitespace_inside_string_preserved() { + // Spaces inside string literals must not be collapsed/removed. + let h1 = qhash("SELECT 'hello world' FROM t"); + let h2 = qhash("SELECT 'helloworld' FROM t"); + assert_ne!(h1, h2); + } + + #[test] + fn hash_inline_comment_no_space_between_tokens() { + // `SELECT/*c*/1` — comment sits directly between tokens; they must not + // be merged, so this must equal `SELECT 1` not `SELECT1`. + assert_eq!(qhash("SELECT/*c*/1"), qhash("SELECT 1")); + } + + // ------------------------------------------------------------------------- + // compute_cache_key_hash tests + // ------------------------------------------------------------------------- + + #[test] + fn hash_is_stable() { + let h1 = compute_cache_key_hash("mydb", "SELECT 1", None); + let h2 = compute_cache_key_hash("mydb", "SELECT 1", None); + assert_eq!(h1, h2); + } + + #[test] + fn hash_differs_by_database() { + let h1 = compute_cache_key_hash("db1", "SELECT 1", None); + let h2 = compute_cache_key_hash("db2", "SELECT 1", None); + assert_ne!(h1, h2); + } + + #[test] + fn hash_differs_by_query() { + let h1 = compute_cache_key_hash("db", "SELECT 1", None); + let h2 = compute_cache_key_hash("db", "SELECT 2", None); + assert_ne!(h1, h2); + } + + #[test] + fn hash_same_with_and_without_cache_comment() { + // A block comment containing the cache directive must be stripped so + // the hash is the same whether the directive was in a comment or a + // connection parameter. + let h_with_comment = + compute_cache_key_hash("db", "/* pgdog_cache: cache */ SELECT 1", None); + let h_without_comment = compute_cache_key_hash("db", "SELECT 1", None); + assert_eq!(h_with_comment, h_without_comment); + } + + #[test] + fn hash_same_for_force_cache_and_regular_comment() { + // force_cache and cache hints should produce the same hash (both are + // stripped before hashing, so the underlying query is identical). + let h_force = compute_cache_key_hash("db", "/* pgdog_cache: force_cache */ SELECT 1", None); + let h_cache = compute_cache_key_hash("db", "/* pgdog_cache: cache */ SELECT 1", None); + let h_plain = compute_cache_key_hash("db", "SELECT 1", None); + assert_eq!(h_force, h_cache); + assert_eq!(h_force, h_plain); + } + + #[test] + fn hash_same_for_line_comment_cache_directive() { + let h_with_line = compute_cache_key_hash("db", "-- pgdog_cache: cache\nSELECT 1", None); + let h_plain = compute_cache_key_hash("db", "SELECT 1", None); + assert_eq!(h_with_line, h_plain); + } + + #[test] + fn hash_differs_by_bind_params() { + use crate::net::messages::bind::{Bind, Parameter}; + use bytes::Bytes; + use pgdog_postgres_types::Format; + + let make_bind = |val: &'static [u8]| { + let mut b = Bind::default(); + b.push_param( + Parameter { + len: val.len() as i32, + data: Bytes::from_static(val), + }, + Format::Text, + ); + b + }; + + let b1 = make_bind(b"1"); + let b2 = make_bind(b"2"); + let h1 = compute_cache_key_hash("db", "SELECT $1", Some(&b1)); + let h2 = compute_cache_key_hash("db", "SELECT $1", Some(&b2)); + assert_ne!(h1, h2); + } +} diff --git a/pgdog/src/frontend/cache/integration.rs b/pgdog/src/frontend/cache/integration.rs new file mode 100644 index 000000000..0604d7952 --- /dev/null +++ b/pgdog/src/frontend/cache/integration.rs @@ -0,0 +1,139 @@ +use std::hash::{Hash, Hasher}; + +use crate::{ + config::{cache::CachePolicy, config}, + frontend::{ + cache::{ + directive::{self, CacheMode}, + hashing::compute_cache_key_hash, + storage::Error as CacheStorageError, + Cache, + }, + ClientRequest, + }, + net::{Message, Parameters, ToBytes}, +}; + +use tracing::warn; + +pub struct CacheMiss { + pub key: u64, + pub ttl: u64, +} + +pub enum CacheCheckResult { + Hit { cached: Vec }, + Miss(CacheMiss), + Passthrough, +} + +impl Cache { + pub(super) async fn cache_check( + &self, + in_transaction: bool, + client_request: &ClientRequest, + params: &Parameters, + ) -> Result { + if in_transaction || !client_request.is_executable() { + return Ok(CacheCheckResult::Passthrough); + } + + let route = match client_request.route.as_ref() { + Some(r) => r, + None => return Ok(CacheCheckResult::Passthrough), + }; + + // Detect read-only status via the AST parser's route classification. + // When caching is enabled, the query parser is auto-enabled. + let is_read = route.is_read(); + if !is_read { + return Ok(CacheCheckResult::Passthrough); + } + + let query = match client_request.query() { + Ok(Some(q)) => q, + _ => return Ok(CacheCheckResult::Passthrough), + }; + + let directive = directive::resolve(client_request, params); + let cache_config = &config().config.general.cache; + + let ttl = directive.ttl_seconds.unwrap_or(cache_config.ttl); + let mode = match directive.mode { + Some(mode) => mode, + None => match cache_config.policy { + CachePolicy::NoCache => CacheMode::NoCache, + CachePolicy::Cache => CacheMode::Cache, + }, + }; + + if mode == CacheMode::NoCache { + return Ok(CacheCheckResult::Passthrough); + } + + let key = match directive.key { + Some(key) => { + let mut hasher = xxhash_rust::xxh3::Xxh3Default::new(); + key.hash(&mut hasher); + hasher.finish() + } + None => { + let user = params.get_required("user")?; + let database = params.get_default("database", user); + let bind = client_request.parameters()?; + compute_cache_key_hash(database, query.query(), bind) + } + }; + + if mode == CacheMode::ForceCache { + return Ok(CacheCheckResult::Miss(CacheMiss { key, ttl })); + } + + let guard = self.storage.read().await; + let storage = match guard.as_ref() { + Some(storage) => storage, + None => return Ok(CacheCheckResult::Passthrough), + }; + match storage.get(key).await { + Ok(cached) => Ok(CacheCheckResult::Hit { cached }), + Err(CacheStorageError::CacheMiss(_)) => { + Ok(CacheCheckResult::Miss(CacheMiss { key, ttl })) + } + Err(e) => { + warn!("{}", e); + Ok(CacheCheckResult::Passthrough) + } + } + } + + pub(super) async fn cache_response(&self, key: u64, messages: Vec, ttl: u64) { + let guard = self.storage.read().await; + let storage = match guard.as_ref() { + Some(s) if s.is_enabled() => s, + _ => return, + }; + + if messages.is_empty() { + return; + } + + let mut buffer = Vec::new(); + for msg in &messages { + match msg.to_bytes() { + Ok(bytes) => buffer.extend_from_slice(&bytes), + Err(e) => { + warn!("Failed to serialize message for caching: {}", e); + return; + } + } + } + + if buffer.is_empty() { + return; + } + + if let Err(e) = storage.set(key, &buffer, ttl).await { + warn!("{}", e); + } + } +} diff --git a/pgdog/src/frontend/cache/mod.rs b/pgdog/src/frontend/cache/mod.rs new file mode 100644 index 000000000..a0119f556 --- /dev/null +++ b/pgdog/src/frontend/cache/mod.rs @@ -0,0 +1,141 @@ +pub mod context; +pub mod directive; +pub mod hashing; +pub mod integration; +pub mod storage; +pub mod wire; + +pub use context::CacheContext; +pub use directive::CacheDirective; +pub use integration::CacheCheckResult; +pub use storage::{CacheStorage, RedisCacheStorage}; + +use std::sync::Arc; +use tokio::sync::{OnceCell, RwLock}; +use tracing::debug; + +use crate::{ + config::config, + frontend::{ + cache::{integration::CacheMiss, storage::build_storage, wire::deserialize_cached}, + ClientRequest, + }, + net::{Message, Parameters}, +}; + +/// Wraps the active storage backend behind a tokio `RwLock` so it can be +/// hotswapped without restarting pgdog. +pub struct Cache { + storage: RwLock>>, +} + +impl std::fmt::Debug for Cache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Cache").field("storage", &"...").finish() + } +} + +static CACHE: OnceCell> = OnceCell::const_new(); + +pub async fn cache() -> Arc { + CACHE + .get_or_init(async || Arc::new(Cache::new().await)) + .await + .clone() +} + +impl Cache { + async fn new() -> Self { + let storage = build_storage().await; + Cache { + storage: RwLock::new(storage), + } + } + + /// Replace the storage backend if the config has changed (URL or backend type). + /// + /// Acquires the write lock only when a change is detected. + async fn hotswap_if_needed(&self) { + // Fast path: read-lock to check whether anything has changed. + { + let guard = self.storage.read().await; + let cfg = &config().config.general.cache; + let needs_swap = match guard.as_ref() { + Some(s) => s.is_actual(), + None => cfg.enabled, + }; + if !needs_swap { + return; + } + } + + // Slow path: write-lock, re-check and rebuild. + let mut guard = self.storage.write().await; + let cfg = &config().config.general.cache; + let needs_swap = match guard.as_ref() { + Some(s) => s.is_actual(), + None => cfg.enabled, + }; + + if needs_swap { + debug!("Cache storage config changed — rebuilding backend"); + *guard = build_storage().await; + } + } + + // ── public API ─────────────────────────────────────────────────────────── + + /// Check the cache for a query response. + /// + /// On HIT returns `Ok(Some(messages))` — the caller is responsible for + /// replaying these messages through the normal server-message pipeline. + /// + /// On MISS or PASSTHROUGH returns `Ok(None)` and updates `cache_context` + /// so that the response can later be captured and stored via + /// `save_response_in_cache`. + pub async fn try_read_cache( + &self, + cache_context: &mut CacheContext, + in_transaction: bool, + client_request: &ClientRequest, + params: &Parameters, + ) -> Result>, crate::frontend::Error> { + self.hotswap_if_needed().await; + + let cache_result = self + .cache_check(in_transaction, client_request, params) + .await?; + + match cache_result { + CacheCheckResult::Hit { cached } => { + debug!("Cache hit, serving from cache"); + let messages = deserialize_cached(cached); + cache_context.reset(); + Ok(Some(messages)) + } + CacheCheckResult::Miss(cache_miss) => { + debug!("Cache miss for key hash: {}", cache_miss.key); + cache_context.cache_miss = Some(cache_miss); + cache_context.response_buffer.clear(); + cache_context.had_error = false; + Ok(None) + } + CacheCheckResult::Passthrough => { + cache_context.reset(); + Ok(None) + } + } + } + + /// Finalize caching by storing the response in the active backend. + pub async fn save_response_in_cache(&self, cache_context: &mut CacheContext) { + self.hotswap_if_needed().await; + + if let Some(CacheMiss { key, ttl }) = cache_context.cache_miss.take() { + if !cache_context.had_error && !cache_context.response_buffer.is_empty() { + let messages = std::mem::take(&mut cache_context.response_buffer); + self.cache_response(key, messages, ttl).await; + } + } + } +} diff --git a/pgdog/src/frontend/cache/storage/mod.rs b/pgdog/src/frontend/cache/storage/mod.rs new file mode 100644 index 000000000..7053a7ba5 --- /dev/null +++ b/pgdog/src/frontend/cache/storage/mod.rs @@ -0,0 +1,58 @@ +pub mod redis; + +pub use redis::RedisCacheStorage; + +use async_trait::async_trait; + +use crate::config::{cache::CacheBackend, config}; + +/// Errors returned by cache storage backends. +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Redis {cmd} error for key {key}: {err}")] + RedisError { + cmd: &'static str, + key: u64, + err: fred::error::RedisError, + }, + #[error("Connection failed: {0}")] + ConnectionFailed(&'static str), + #[error("Cache miss for key {0}")] + CacheMiss(u64), +} + +/// Abstract cache storage backend. +/// +/// Implementations must be `Send + Sync` so they can be held behind +/// something like `Arc>` and shared across async tasks. +#[async_trait] +pub trait CacheStorage: Send + Sync { + /// Fetch cached bytes for `key`. Returns [`Error::CacheMiss`] when the + /// key is absent (not an error condition — used for control flow). + async fn get(&self, key: u64) -> Result, Error>; + + /// Store `value` under `key` with a `ttl` in seconds. + async fn set(&self, key: u64, value: &[u8], ttl: u64) -> Result<(), Error>; + + /// Returns `true` when the backend is configured and enabled. + fn is_enabled(&self) -> bool; + + /// Returns `true` if cache config has changed (used for hotswap detection). + /// + /// This method should check only those parameters that require a storage rebuild and + /// that are specific to the storage, e.g. `Config::backend` and storage's own settings. + fn is_actual(&self) -> bool; +} + +/// Construct the appropriate storage backend from the current config. +pub async fn build_storage() -> Option> { + let cfg = &config().config.general.cache; + if !cfg.enabled { + return None; + } + match cfg.backend { + CacheBackend::Redis => RedisCacheStorage::new(cfg) + .await + .map(|s| Box::new(s) as Box), + } +} diff --git a/pgdog/src/frontend/cache/storage/redis.rs b/pgdog/src/frontend/cache/storage/redis.rs new file mode 100644 index 000000000..73dc49e36 --- /dev/null +++ b/pgdog/src/frontend/cache/storage/redis.rs @@ -0,0 +1,260 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use fred::prelude::*; +use pgdog_config::CacheBackend; +use tracing::{debug, error, info}; + +use crate::config::{cache::Cache as CacheConfig, config}; + +use super::{CacheStorage, Error}; + +/// Max time between reconnection attempts +const MAX_REDIS_RECONNECTION_PERIOD: Duration = Duration::from_secs(5); + +/// Redis implementation of [`CacheStorage`]. +/// +/// Connection is established in a background task spawned from [`RedisCacheStorage::new`]. +/// All operations return immediately if the connection is not yet ready — `get` returns +/// [`Error::ConnectionFailed`] (triggering a cache-miss path) and `set` is silently dropped. +/// +/// At most one reconnect task runs at any time, enforced by a CAS on `reconnecting`. +pub struct RedisCacheStorage { + client: RedisClient, + /// Redis url (only for update tracking). + url: String, + /// Guards against spawning multiple concurrent reconnect tasks. + reconnecting: Arc, +} + +impl std::fmt::Debug for RedisCacheStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RedisCacheStorage") + .field("url", &self.url) + .field("reconnecting", &self.reconnecting.load(Ordering::Relaxed)) + .finish() + } +} + +impl RedisCacheStorage { + /// Build a new storage instance for `url` and immediately start a background + /// connection task. Returns `None` when the URL cannot be parsed. + pub async fn new(config: &CacheConfig) -> Option { + let client_config = match RedisConfig::from_url(&config.redis.url) { + Ok(c) => c, + Err(e) => { + error!("Failed to parse Redis URL '{}': {}", config.redis.url, e); + return None; + } + }; + + let client = match Builder::from_config(client_config).build() { + Ok(c) => c, + Err(e) => { + error!("Failed to build Redis client: {}", e); + return None; + } + }; + + let reconnecting = Arc::new(AtomicBool::new(true)); // treat initial connect as "reconnecting" + + let storage = Self { + client, + url: config.redis.url.clone(), + reconnecting, + }; + + // Try to wait up to operation_timeout for connection establishment. If it times out, let it complete in background. + let operation_timeout = config.redis.operation_timeout.get(); + let _ = tokio::time::timeout( + Duration::from_millis(operation_timeout), + storage.spawn_connect_task(), + ) + .await; + + Some(storage) + } + + // ── internal helpers ──────────────────────────────────────────────────── + + /// Spawn the (re)connect background loop. Uses a CAS to ensure only one + /// task is ever running at a time. + fn spawn_connect_task(&self) -> tokio::task::JoinHandle<()> { + let client = self.client.clone(); + let reconnecting = self.reconnecting.clone(); + + tokio::spawn(async move { + info!("Redis connect task started"); + let mut attempt = 0u32; + + loop { + attempt += 1; + debug!("Redis connect attempt #{}", attempt); + + let operation_timeout = config().config.general.cache.redis.operation_timeout.get(); + let init_ok = match tokio::time::timeout( + Duration::from_millis(operation_timeout), + client.init(), + ) + .await + { + Ok(Ok(_)) => true, + Ok(Err(e)) => { + debug!("Redis init error: {}", e); + false + } + Err(_) => { + debug!("Redis init timed out"); + false + } + }; + + if init_ok { + reconnecting.store(false, Ordering::Release); + info!("Redis connected (attempt #{})", attempt); + return; + } + + // Exponential backoff + tokio::time::sleep( + const { Duration::from_millis(5) } + .saturating_mul(1u32 << attempt.min(10)) + .min(MAX_REDIS_RECONNECTION_PERIOD), + ) + .await; + } + }) + } + + /// Mark the reconnecting as true and spawn a reconnect task if one is not + /// already running. + fn reconnect(&self) { + if self + .reconnecting + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + self.spawn_connect_task(); + } else { + debug!("Redis reconnect task already running"); + } + } +} + +#[async_trait] +impl CacheStorage for RedisCacheStorage { + async fn get(&self, key: u64) -> Result, Error> { + if self.reconnecting.load(Ordering::Acquire) { + return Err(Error::ConnectionFailed("Redis not connected")); + } + + let config = &config().config.general.cache; + + let full_key = format!("{}{}", config.redis.cache_key_prefix, key); + + let operation_timeout = config.redis.operation_timeout.get(); + let redis_result = tokio::time::timeout( + Duration::from_millis(operation_timeout), + self.client.get::(full_key), + ) + .await; + let val = match redis_result { + Ok(Ok(v)) => v, + Ok(Err(err)) => { + self.reconnect(); + return Err(Error::RedisError { + cmd: "GET", + key, + err, + }); + } + Err(_) => { + self.reconnect(); + return Err(Error::ConnectionFailed("Redis GET timed out")); + } + }; + + match val.into_bytes() { + Some(bytes) => { + debug!("Cache hit for key {}", key); + Ok(bytes.to_vec()) + } + None => Err(Error::CacheMiss(key)), + } + } + + async fn set(&self, key: u64, value: &[u8], ttl: u64) -> Result<(), Error> { + if self.reconnecting.load(Ordering::Acquire) { + return Err(Error::ConnectionFailed("Redis not connected")); + } + + let config = &config().config.general.cache; + + let max_result_size = config.max_result_size; + if max_result_size != 0 && value.len() > max_result_size { + debug!( + "Skipping cache for key {}: size {} exceeds max {}", + key, + value.len(), + max_result_size + ); + return Ok(()); + } + + let full_key = format!("{}{}", config.redis.cache_key_prefix, key); + let ttl_seconds = ttl as i64; + + let operation_timeout = config.redis.operation_timeout.get(); + match tokio::time::timeout( + Duration::from_millis(operation_timeout), + self.client.set::<(), _, _>( + full_key, + value, + Some(Expiration::EX(ttl_seconds)), + None, + false, + ), + ) + .await + { + Ok(Ok(_)) => { + debug!("Cached key {} with TTL {}s", key, ttl_seconds); + Ok(()) + } + Ok(Err(err)) => { + self.reconnect(); + Err(Error::RedisError { + cmd: "SET", + key, + err, + }) + } + Err(_) => { + self.reconnect(); + Err(Error::ConnectionFailed("Redis SET timed out")) + } + } + } + + fn is_enabled(&self) -> bool { + config().config.general.cache.enabled + } + + fn is_actual(&self) -> bool { + let new_config = &config().config.general.cache; + new_config.backend != CacheBackend::Redis || self.url != new_config.redis.url + } +} + +// Avoid shallow copy +impl Clone for RedisCacheStorage { + fn clone(&self) -> Self { + Self { + client: self.client.clone_new(), + url: self.url.clone(), + reconnecting: Arc::new(AtomicBool::new(false)), + } + } +} diff --git a/pgdog/src/frontend/cache/wire.rs b/pgdog/src/frontend/cache/wire.rs new file mode 100644 index 000000000..70f80db6c --- /dev/null +++ b/pgdog/src/frontend/cache/wire.rs @@ -0,0 +1,197 @@ +use tracing::debug; + +use crate::net::{FromBytes, Message}; + +const HEADER_CODE_LEN: usize = 1; +const HEADER_LEN_SIZE: usize = 4; +const HEADER_TOTAL: usize = HEADER_CODE_LEN + HEADER_LEN_SIZE; + +/// Deserializes a flat byte blob (N concatenated PostgreSQL wire messages) into `Vec`. +/// +/// Redis stores cache responses as raw wire-format bytes concatenated together without framing. +/// We walk through the blob reading each message boundary, then slice out the individual message. +/// +/// ### PostgreSQL wire protocol message layout: +/// +/// [Source](https://www.postgresql.org/docs/current/protocol-overview.html) +/// +/// ```text +/// +----------+--------------------------+-------------------+ +/// | 1 byte | 4 bytes (big-endian) | N bytes (payload) | +/// | code | length (incl. 4B itself) | data | +/// +----------+--------------------------+-------------------+ +/// ``` +/// +/// Constants for parsing: +/// - `HEADER_CODE_LEN` = 1 byte (message type code, e.g. 'T' = RowDescription) +/// - `HEADER_LEN_SIZE` = 4 bytes (message length, includes itself but NOT the code byte) +/// - `HEADER_TOTAL` = 5 bytes (minimum bytes needed to read the length field) +pub(super) fn deserialize_cached(cached: Vec) -> Vec { + let mut messages = Vec::new(); + let mut offset = 0; + let len = cached.len(); + + while offset < len { + // Need at least a full header (code + length) to proceed. + if offset + HEADER_TOTAL > len { + debug!( + "deserializing cached response: not enough bytes for message header (offset={}, len={})", + offset, len + ); + break; + } + + // Read the message length field (4 bytes, big-endian). + // This length includes the 4-byte length field itself but NOT the code byte. + let msg_len = u32::from_be_bytes([ + cached[offset + 1], + cached[offset + 2], + cached[offset + 3], + cached[offset + 4], + ]) as usize; + + // Sanity checks: + // 1. Length must be at least 4 (the length field itself): if < 4 the data is corrupt. + // 2. Must not read past the end of the blob. + if msg_len < 4 || offset + HEADER_CODE_LEN + msg_len > len { + debug!( + "deserializing cached response: invalid msg length {} (offset={}, len={})", + msg_len, offset, len + ); + break; + } + + // Full message spans: 1 byte (code) + msg_len (length field + payload) + let end = offset + HEADER_CODE_LEN + msg_len; + + let msg_bytes: bytes::Bytes = cached[offset..end].to_vec().into(); + if let Ok(msg) = Message::from_bytes(msg_bytes) { + messages.push(msg); + } + offset = end; + } + + messages +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::net::messages::{CommandComplete, Protocol, ReadyForQuery, ToBytes}; + + /// Build a raw wire-format blob from a list of typed protocol messages. + fn wire_bytes(msgs: &[&dyn ToBytes]) -> Vec { + let mut buf = Vec::new(); + for msg in msgs { + buf.extend_from_slice(&msg.to_bytes().unwrap()); + } + buf + } + + #[test] + fn deserialize_empty_input() { + let messages = deserialize_cached(vec![]); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_single_message() { + let rfq = ReadyForQuery::idle(); + let blob = wire_bytes(&[&rfq]); + let messages = deserialize_cached(blob); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].code(), 'Z'); + } + + #[test] + fn deserialize_multiple_messages_roundtrip() { + let cc = CommandComplete::new("SELECT 1"); + let rfq = ReadyForQuery::idle(); + let blob = wire_bytes(&[&cc, &rfq]); + + let messages = deserialize_cached(blob); + assert_eq!(messages.len(), 2); + assert_eq!(messages[0].code(), 'C'); + assert_eq!(messages[1].code(), 'Z'); + } + + #[test] + fn deserialize_roundtrip_payload_matches() { + let cc = CommandComplete::new("SELECT 42"); + let rfq = ReadyForQuery::idle(); + let original: Vec = vec![ + Message::new(cc.to_bytes().unwrap()), + Message::new(rfq.to_bytes().unwrap()), + ]; + + // Serialize to flat blob exactly as cache_response does. + let mut blob = Vec::new(); + for msg in &original { + blob.extend_from_slice(&msg.to_bytes().unwrap()); + } + + let deserialized = deserialize_cached(blob); + assert_eq!(deserialized.len(), original.len()); + for (d, o) in deserialized.iter().zip(original.iter()) { + assert_eq!(d.payload(), o.payload()); + } + } + + #[test] + fn deserialize_truncated_header_no_panic() { + // Only 3 bytes — not enough for a full 5-byte header. + let truncated = vec![b'Z', 0x00, 0x00]; + let messages = deserialize_cached(truncated); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_truncated_payload_no_panic() { + // Valid header claiming length 8 (4-byte len field + 4-byte payload), + // but we only provide the header and 2 payload bytes instead of 4. + let mut blob = Vec::new(); + blob.push(b'C'); // code byte + blob.extend_from_slice(&8u32.to_be_bytes()); // length = 8 (includes itself) + blob.extend_from_slice(&[0u8, 0]); // only 2 of the expected 4 payload bytes + let messages = deserialize_cached(blob); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_corrupt_length_no_panic() { + // Length field set to 0 — invalid (must be >= 4). + let mut blob = Vec::new(); + blob.push(b'Z'); + blob.extend_from_slice(&0u32.to_be_bytes()); + let messages = deserialize_cached(blob); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_length_of_three_no_panic() { + // Length field = 3 — below minimum of 4, should be rejected. + let mut blob = Vec::new(); + blob.push(b'Z'); + blob.extend_from_slice(&3u32.to_be_bytes()); + blob.extend_from_slice(&[0u8; 3]); + let messages = deserialize_cached(blob); + assert!(messages.is_empty()); + } + + #[test] + fn deserialize_many_messages() { + // Round-trip 10 CommandComplete messages. + let n = 10usize; + let mut blob = Vec::new(); + for i in 0..n { + let cc = CommandComplete::new(format!("SELECT {}", i)); + blob.extend_from_slice(&cc.to_bytes().unwrap()); + } + + let messages = deserialize_cached(blob); + assert_eq!(messages.len(), n); + for msg in &messages { + assert_eq!(msg.code(), 'C'); + } + } +} diff --git a/pgdog/src/frontend/client/query_engine/context.rs b/pgdog/src/frontend/client/query_engine/context.rs index b54751a35..6a1fe3c38 100644 --- a/pgdog/src/frontend/client/query_engine/context.rs +++ b/pgdog/src/frontend/client/query_engine/context.rs @@ -1,6 +1,7 @@ use crate::{ backend::pool::{connection::mirror::Mirror, stats::MemoryStats}, frontend::{ + cache::context::CacheContext, client::{timeouts::Timeouts, Sticky, TransactionType}, router::parser::rewrite::statement::plan::RewriteResult, Client, ClientRequest, PreparedStatements, @@ -39,6 +40,8 @@ pub struct QueryEngineContext<'a> { pub(super) sticky: Sticky, /// Rewrite result. pub(super) rewrite_result: Option, + /// Cache context. + pub(super) cache_context: CacheContext, } impl<'a> QueryEngineContext<'a> { @@ -60,6 +63,7 @@ impl<'a> QueryEngineContext<'a> { rollback: false, sticky: client.sticky, rewrite_result: None, + cache_context: CacheContext::default(), } } @@ -86,6 +90,7 @@ impl<'a> QueryEngineContext<'a> { rollback: false, sticky: Sticky::new(), rewrite_result: None, + cache_context: CacheContext::default(), } } diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index f0dc8979b..3312d909a 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -2,6 +2,7 @@ use crate::{ backend::pool::{Connection, Request}, config::config, frontend::{ + cache::cache, client::query_engine::{hooks::QueryEngineHooks, route_query::ClusterCheck}, router::{parser::Shard, Route}, BufferedQuery, Client, ClientComms, Command, Error, Router, RouterContext, Stats, @@ -129,6 +130,24 @@ impl QueryEngine { return Ok(()); } + let in_transaction = context.in_transaction(); + if let Some(cached_messages) = cache() + .await + .try_read_cache( + &mut context.cache_context, + in_transaction, + context.client_request, + context.params, + ) + .await? + { + for msg in cached_messages { + self.process_server_message(context, msg).await?; + } + self.update_stats(context); + return Ok(()); + } + self.hooks.before_execution(context)?; // Queue up request to mirrors, if any. @@ -228,6 +247,11 @@ impl QueryEngine { command => self.unknown_command(context, command.clone()).await?, } + cache() + .await + .save_response_in_cache(&mut context.cache_context) + .await; + self.hooks.after_execution(context)?; if context.in_error() { diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 231d936cd..0775b4682 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -120,6 +120,8 @@ impl QueryEngine { context: &mut QueryEngineContext<'_>, mut message: Message, ) -> Result<(), Error> { + context.cache_context.capture_response(message.clone()); + self.streaming = message.streaming(); let code = message.code(); diff --git a/pgdog/src/frontend/mod.rs b/pgdog/src/frontend/mod.rs index 284b777b0..aa1bbe523 100644 --- a/pgdog/src/frontend/mod.rs +++ b/pgdog/src/frontend/mod.rs @@ -1,6 +1,7 @@ //! pgDog frontend manages connections to clients. pub mod buffered_query; +pub mod cache; pub mod client; pub mod client_request; pub mod comms; diff --git a/pgdog/src/frontend/router/parser/cache/ast.rs b/pgdog/src/frontend/router/parser/cache/ast.rs index c34d865dc..16f403a24 100644 --- a/pgdog/src/frontend/router/parser/cache/ast.rs +++ b/pgdog/src/frontend/router/parser/cache/ast.rs @@ -12,6 +12,7 @@ use super::super::{ }; use super::{Fingerprint, Stats}; use crate::backend::schema::Schema; +use crate::frontend::cache::directive::CacheDirective; use crate::frontend::router::parser::rewrite::statement::RewritePlan; use crate::frontend::{BufferedQuery, PreparedStatements}; use crate::net::parameter::ParameterValue; @@ -37,6 +38,8 @@ pub struct AstInner { pub comment_shard: Option, /// Role. pub comment_role: Option, + /// Cache. + pub comment_cache: Option, /// Rewrite plan. pub rewrite_plan: RewritePlan, /// Fingerprint. @@ -51,6 +54,7 @@ impl AstInner { stats: Mutex::new(Stats::new()), comment_role: None, comment_shard: None, + comment_cache: None, rewrite_plan: RewritePlan::default(), fingerprint: Fingerprint::default(), } @@ -81,7 +85,7 @@ impl Ast { QueryParserEngine::PgQueryRaw => parse_raw(query), } .map_err(Error::PgQuery)?; - let (comment_shard, comment_role) = comment(query, schema)?; + let (comment_shard, comment_role, comment_cache) = comment(query, schema)?; let fingerprint = Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?; @@ -113,6 +117,7 @@ impl Ast { stats: Mutex::new(stats), comment_shard, comment_role, + comment_cache, ast, rewrite_plan, fingerprint, diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index a87883adb..24a6a8307 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -6,6 +6,7 @@ use regex::Regex; use crate::backend::ShardingSchema; use crate::config::database::Role; +use crate::frontend::cache::directive::CacheDirective; use crate::frontend::router::sharding::ContextBuilder; use super::super::parser::Shard; @@ -16,6 +17,8 @@ static SHARDING_KEY: Lazy = Lazy::new(|| { Regex::new(r#"pgdog_sharding_key: *(?:"([^"]*)"|'([^']*)'|([0-9a-zA-Z-]+))"#).unwrap() }); static ROLE: Lazy = Lazy::new(|| Regex::new(r#"pgdog_role: *(primary|replica)"#).unwrap()); +static CACHE: Lazy = + Lazy::new(|| Regex::new(r#"pgdog_cache: *(.*?)(?:\s*pgdog_\w+:|$|\s*\*/)"#).unwrap()); fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { caps.get(1) @@ -24,23 +27,24 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { .map(|m| m.as_str()) } -/// Extract shard number from a comment. +/// Extract shard number, role and cache directive from a comment. /// /// Comment style uses the C-style comments (not SQL comments!) /// as to allow the comment to appear anywhere in the query. /// -/// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect. +/// See [`SHARD`], [`SHARDING_KEY`], [`ROLE`] and [`CACHE`] for the style of comment we expect. /// pub fn comment( query: &str, schema: &ShardingSchema, -) -> Result<(Option, Option), Error> { +) -> Result<(Option, Option, Option), Error> { let tokens = match schema.query_parser_engine { QueryParserEngine::PgQueryProtobuf => scan(query), QueryParserEngine::PgQueryRaw => scan_raw(query), } .map_err(Error::PgQuery)?; let mut role = None; + let mut cache = None; for token in tokens.tokens.iter() { if token.token == Token::CComment as i32 { @@ -54,15 +58,20 @@ pub fn comment( } } } + if let Some(cap) = CACHE.captures(comment) { + if let Some(body) = cap.get(1) { + cache = Some(CacheDirective::parse(body.as_str())); + } + } if let Some(cap) = SHARDING_KEY.captures(comment) { if let Some(sharding_key) = get_matched_value(&cap) { if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) { - return Ok((Some(schema.shard().into()), role)); + return Ok((Some(schema.shard().into()), role, cache)); } let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? .shards(schema.shards) .build()?; - return Ok((Some(ctx.apply()?), role)); + return Ok((Some(ctx.apply()?), role, cache)); } } if let Some(cap) = SHARD.captures(comment) { @@ -77,13 +86,14 @@ pub fn comment( .unwrap_or(Shard::All), ), role, + cache, )); } } } } - Ok((None, role)) + Ok((None, role, cache)) } #[cfg(test)] @@ -255,4 +265,226 @@ mod tests { let result = comment(query, &schema).unwrap(); assert_eq!(result.0, Some(Shard::Direct(1))); } + + #[test] + fn test_cache_hint_no_cache() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: no_cache */"; + let result = comment(query, &schema).unwrap(); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::NoCache), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_hint_cache_default_ttl() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: cache */"; + let result = comment(query, &schema).unwrap(); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::Cache), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_hint_cache_with_ttl() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: cache ttl=60 */"; + let result = comment(query, &schema).unwrap(); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::Cache), + ttl_seconds: Some(60), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_hint_no_directive() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users"; + let result = comment(query, &schema).unwrap(); + assert!(matches!(result.2, None)); + } + + #[test] + fn test_combined_shard_and_cache_hints() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_role: replica pgdog_shard: 1 pgdog_cache: cache ttl=300 */"; + let result = comment(query, &schema).unwrap(); + assert_eq!(result.1, Some(Role::Replica)); + assert_eq!(result.0, Some(Shard::Direct(1))); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::Cache), + ttl_seconds: Some(300), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_hint_force_cache() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: force_cache */"; + let result = comment(query, &schema).unwrap(); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::ForceCache), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_hint_force_cache_with_ttl() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: force_cache ttl=60 */"; + let result = comment(query, &schema).unwrap(); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::ForceCache), + ttl_seconds: Some(60), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_hint_ttl_before_mode() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + let query = "SELECT * FROM users /* pgdog_cache: ttl=17 force_cache */"; + let result = comment(query, &schema).unwrap(); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::ForceCache), + ttl_seconds: Some(17), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_hint_ttl_only() { + use crate::backend::ShardedTables; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + // ttl only, no mode — mode falls back to param/config + let query = "SELECT * FROM users /* pgdog_cache: ttl=42 */"; + let result = comment(query, &schema).unwrap(); + assert_eq!( + result.2, + Some(CacheDirective { + ttl_seconds: Some(42), + ..Default::default() + }) + ); + } + + #[test] + fn test_cache_followed_by_role_directive() { + use crate::backend::ShardedTables; + use crate::frontend::cache::directive::CacheMode; + + let schema = ShardingSchema { + shards: 2, + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), + ..Default::default() + }; + + // pgdog_role: comes after pgdog_cache: — cache body must not swallow it + let query = "SELECT * FROM users /* pgdog_cache: force_cache ttl=10 pgdog_role: replica */"; + let result = comment(query, &schema).unwrap(); + assert_eq!(result.1, Some(Role::Replica)); + assert_eq!( + result.2, + Some(CacheDirective { + mode: Some(CacheMode::ForceCache), + ttl_seconds: Some(10), + ..Default::default() + }) + ); + } } diff --git a/pgdog/src/net/messages/hello.rs b/pgdog/src/net/messages/hello.rs index 84f901b06..5c989221b 100644 --- a/pgdog/src/net/messages/hello.rs +++ b/pgdog/src/net/messages/hello.rs @@ -60,7 +60,7 @@ impl Startup { } else if name == "options" { let kvs = value.split("-c"); for kv in kvs { - let mut nvs = kv.split("="); + let mut nvs = kv.splitn(2, "="); let name = nvs.next(); let value = nvs.next(); diff --git a/pgdog/src/net/parameter.rs b/pgdog/src/net/parameter.rs index 1502d0397..4dd0c6114 100644 --- a/pgdog/src/net/parameter.rs +++ b/pgdog/src/net/parameter.rs @@ -33,6 +33,7 @@ static UNTRACKED_PARAMS: Lazy> = Lazy::new(|| { String::from("pgdog.role"), String::from("pgdog.shard"), String::from("pgdog.sharding_key"), + String::from("pgdog.cache"), ]) });