From fa932e3161928f6260e2598ead0a4915d687d9fb Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 11 Jun 2026 13:51:26 +0200 Subject: [PATCH] persist: bound fetch read-ahead on all replicas The persist fetch semaphore bounds the bytes of fetched-but-not-yet-decoded parts held in memory. It was gated on `is_cc_active` and, where active, sized to the full process memory limit, both under the assumption that this data spills to lgalloc-backed disk. That assumption no longer holds: there is no spill disk, the data is heap-resident, and the cc/non-cc distinction is gone. As a result the semaphore was effectively disabled (`MAX_PERMITS`) on the replicas that matter, so persist_source could read ahead without bound during hydration. A production heap profile showed fetched-but-undecoded blobs dominating the heap, with a large transient spike over steady state. Size the semaphore from the announced memory limit on every replica, and lower the default budget from 1.0x to 0.1x of the limit: the read-ahead buffers must now coexist with the arrangements being built in the same scarce heap, so the bound has to sit well below the full limit. Rewrite the flag docs to describe a hard heap bound rather than an lgalloc spill budget. Also make the bounded-memory suite start clusterd with `--announce-memory-limit`, mirroring production. Without it the semaphore was unbounded in the suite, so it never exercised persist's memory budget. Add a `persist-source-read-ahead` scenario that re-hydrates a large index and must stay within the announced limit. Co-Authored-By: Claude Opus 4.8 --- src/persist-client/src/fetch.rs | 21 ++++---- src/persist-client/src/internal/metrics.rs | 56 ++++++++++++++++++++-- test/bounded-memory/mzcompose.py | 44 ++++++++++++++++- 3 files changed, 106 insertions(+), 15 deletions(-) diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 60e8308424c02..105f9f45e2c50 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -59,23 +59,26 @@ use crate::schema::{PartMigration, SchemaCache}; pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config = Config::new( "persist_fetch_semaphore_cost_adjustment", - // We use `encoded_size_bytes` as the number of permits, but the parsed size - // is larger than the encoded one, so adjust it. This default value is from - // eyeballing graphs in experiments that were run on tpch loadgen data. + // We use `encoded_size_bytes` as the number of permits, but the decoded, + // heap-resident size is larger, so adjust it. This default value is from + // eyeballing graphs in experiments that were run on tpch loadgen data, and + // may need retuning now that the decoded data lands on the heap rather than + // in lgalloc. 1.2, "\ An adjustment multiplied by encoded_size_bytes to approximate an upper \ - bound on the size in lgalloc, which includes the decoded version.", + bound on the heap size of a fetched part, which includes the decoded \ + version.", ); pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config = Config::new( "persist_fetch_semaphore_permit_adjustment", - 1.0, + 0.1, "\ - A limit on the number of outstanding persist bytes being fetched and \ - parsed, expressed as a multiplier of the process's memory limit. This data \ - all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \ - replicas.", + A hard bound on the outstanding persist bytes being fetched and decoded, \ + expressed as a multiplier of the process's memory limit. Applied to all \ + replicas. These bytes are heap-resident (there is no disk to spill to) and \ + must coexist with the arrangements being built, so this is well below 1.0.", ); pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new( diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 8daba8b775854..dab60ad3d9bf4 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -41,7 +41,7 @@ use prometheus::proto::MetricFamily; use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec}; use timely::progress::Antichain; use tokio_metrics::TaskMonitor; -use tracing::{Instrument, debug, info, info_span}; +use tracing::{Instrument, debug, info, info_span, warn}; use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT}; use crate::internal::paths::BlobKey; @@ -2531,9 +2531,12 @@ impl SemaphoreMetrics { let registry = self.registry.clone(); let init = async move { let total_permits = match cfg.announce_memory_limit { - // Non-cc replicas have the old physical flow control mechanism, - // so only apply this one on cc replicas. - Some(mem) if cfg.is_cc_active => { + // Bound outstanding fetched+decoding bytes to a fraction of the + // process memory limit. This is the only read-ahead bound: there + // is no disk to spill to, so these bytes are heap-resident and + // must coexist with the arrangements being built. Applied to all + // replicas (the cc/non-cc distinction is gone). + Some(mem) => { // We can't easily adjust the number of permits later, so // make sure we've synced dyncfg values at least once. info!("fetch semaphore awaiting first dyncfg values"); @@ -2544,7 +2547,12 @@ impl SemaphoreMetrics { info!("fetch_semaphore got first dyncfg values"); total_permits } - Some(_) | None => Semaphore::MAX_PERMITS, + // No announced memory limit (embedded/test processes); we have + // nothing to size against, so read-ahead is unbounded. + None => { + warn!("fetch semaphore unbounded: no announced memory limit"); + Semaphore::MAX_PERMITS + } }; MetricsSemaphore::new(®istry, "fetch", total_permits) }; @@ -3245,3 +3253,41 @@ pub fn encode_ts_metric(ts: &Antichain) -> i64 { None => i64::MAX, } } + +#[cfg(test)] +mod tests { + use mz_dyncfg::ConfigUpdates; + use mz_ore::metrics::MetricsRegistry; + + use super::*; + + /// The fetch semaphore must be sized from the announced memory limit on all + /// replicas, regardless of `is_cc_active` (the cc/non-cc distinction is + /// gone), and only fall back to unbounded when no memory limit is announced. + #[mz_ore::test(tokio::test)] + async fn fetch_semaphore_sized_from_memory_limit() { + async fn total_permits(is_cc_active: bool, mem: Option) -> usize { + let mut cfg = PersistConfig::new_for_tests(); + cfg.is_cc_active = is_cc_active; + cfg.announce_memory_limit = mem; + // Mark configs synced so the semaphore initializes rather than + // blocking on `configs_synced_once`. + cfg.apply_from(&ConfigUpdates::default()); + SemaphoreMetrics::new(cfg, MetricsRegistry::new()) + .fetch() + .await + .total_permits + } + + let cfg = PersistConfig::new_for_tests(); + let adjustment = FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg); + let mem = 1 << 30; + let expected = usize::cast_lossy(f64::cast_lossy(mem) * adjustment); + + // Both cc and non-cc replicas get the memory-derived bound. + assert_eq!(total_permits(true, Some(mem)).await, expected); + assert_eq!(total_permits(false, Some(mem)).await, expected); + // No announced limit => unbounded. + assert_eq!(total_permits(false, None).await, Semaphore::MAX_PERMITS); + } +} diff --git a/test/bounded-memory/mzcompose.py b/test/bounded-memory/mzcompose.py index 9e465586b9825..38d15f06879e7 100644 --- a/test/bounded-memory/mzcompose.py +++ b/test/bounded-memory/mzcompose.py @@ -220,6 +220,40 @@ class KafkaScenario(Scenario): SCENARIOS = [ + Scenario( + # Regression guard for persist-source fetch backpressure: re-hydrating a + # large index must keep read-ahead within the announced memory limit + # instead of prefetching the whole snapshot and OOMing. See the + # `persist_fetch_semaphore_permit_adjustment` dyncfg. + name="persist-source-read-ahead", + pre_restart=dedent(f""" + > CREATE TABLE t1 (f1 INTEGER, f2 TEXT) + + # ~1 GiB of data spread across many persist parts. The index below + # must re-read this entire snapshot from persist on re-hydration. + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + + > CREATE INDEX i1 IN CLUSTER clusterd ON t1 (f1) + + > SELECT count(*) FROM t1 + {4 * 256 * 1024} + """), + post_restart=dedent(f""" + # On restart the index re-hydrates by reading t1's full snapshot + # through persist_source. The fetch semaphore must bound read-ahead + # so clusterd stays within its announced memory limit; an OOM here + # kills the replica and fails this step. + > SET CLUSTER = clusterd + + > SELECT count(*) FROM t1 + {4 * 256 * 1024} + """), + materialized_memory="8Gb", + clusterd_memory="1.5Gb", + ), PgCdcScenario( name="pg-cdc-snapshot", pre_restart=PgCdcScenario.PG_SETUP @@ -1452,9 +1486,17 @@ def run_scenario( ) -> None: c.down(destroy_volumes=True) + # Mirror production: announce clusterd's memory limit so persist sizes its + # fetch-backpressure semaphore. Without this the semaphore is unbounded and + # the suite would not exercise persist's memory budget at all. + announce_bytes = int(_get_memory_in_gb(clusterd_memory) * 1024**3) + with c.override( Materialized(memory=materialized_memory, support_external_clusterd=True), - Clusterd(memory=clusterd_memory), + Clusterd( + memory=clusterd_memory, + options=[f"--announce-memory-limit={announce_bytes}"], + ), ): c.up( "redpanda",