diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 60e8308424c02..105f9f45e2c50 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -59,23 +59,26 @@ use crate::schema::{PartMigration, SchemaCache}; pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new( "persist_fetch_semaphore_cost_adjustment", - // We use `encoded_size_bytes` as the number of permits, but the parsed size - // is larger than the encoded one, so adjust it. This default value is from - // eyeballing graphs in experiments that were run on tpch loadgen data. + // We use `encoded_size_bytes` as the number of permits, but the decoded, + // heap-resident size is larger, so adjust it. This default value is from + // eyeballing graphs in experiments that were run on tpch loadgen data, and + // may need retuning now that the decoded data lands on the heap rather than + // in lgalloc. 1.2, "\ An adjustment multiplied by encoded_size_bytes to approximate an upper \ - bound on the size in lgalloc, which includes the decoded version.", + bound on the heap size of a fetched part, which includes the decoded \ + version.", ); pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new( "persist_fetch_semaphore_permit_adjustment", - 1.0, + 0.1, "\ - A limit on the number of outstanding persist bytes being fetched and \ - parsed, expressed as a multiplier of the process's memory limit. This data \ - all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \ - replicas.", + A hard bound on the outstanding persist bytes being fetched and decoded, \ + expressed as a multiplier of the process's memory limit. Applied to all \ + replicas.
These bytes are heap-resident (there is no disk to spill to) and \ + must coexist with the arrangements being built, so this is well below 1.0.", ); pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new( diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 8daba8b775854..dab60ad3d9bf4 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -41,7 +41,7 @@ use prometheus::proto::MetricFamily; use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec}; use timely::progress::Antichain; use tokio_metrics::TaskMonitor; -use tracing::{Instrument, debug, info, info_span}; +use tracing::{Instrument, debug, info, info_span, warn}; use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT}; use crate::internal::paths::BlobKey; @@ -2531,9 +2531,12 @@ impl SemaphoreMetrics { let registry = self.registry.clone(); let init = async move { let total_permits = match cfg.announce_memory_limit { - // Non-cc replicas have the old physical flow control mechanism, - // so only apply this one on cc replicas. - Some(mem) if cfg.is_cc_active => { + // Bound outstanding fetched+decoding bytes to a fraction of the + // process memory limit. This is the only read-ahead bound: there + // is no disk to spill to, so these bytes are heap-resident and + // must coexist with the arrangements being built. Applied to all + // replicas (the cc/non-cc distinction is gone). + Some(mem) => { // We can't easily adjust the number of permits later, so // make sure we've synced dyncfg values at least once. 
info!("fetch semaphore awaiting first dyncfg values"); @@ -2544,7 +2547,12 @@ impl SemaphoreMetrics { info!("fetch_semaphore got first dyncfg values"); total_permits } - Some(_) | None => Semaphore::MAX_PERMITS, + // No announced memory limit (embedded/test processes); we have + // nothing to size against, so read-ahead is unbounded. + None => { + warn!("fetch semaphore unbounded: no announced memory limit"); + Semaphore::MAX_PERMITS + } }; MetricsSemaphore::new(&registry, "fetch", total_permits) }; @@ -3245,3 +3253,41 @@ pub fn encode_ts_metric(ts: &Antichain) -> i64 { None => i64::MAX, } } + +#[cfg(test)] +mod tests { + use mz_dyncfg::ConfigUpdates; + use mz_ore::metrics::MetricsRegistry; + + use super::*; + + /// The fetch semaphore must be sized from the announced memory limit on all + /// replicas, regardless of `is_cc_active` (the cc/non-cc distinction is + /// gone), and only fall back to unbounded when no memory limit is announced. + #[mz_ore::test(tokio::test)] + async fn fetch_semaphore_sized_from_memory_limit() { + async fn total_permits(is_cc_active: bool, mem: Option<usize>) -> usize { + let mut cfg = PersistConfig::new_for_tests(); + cfg.is_cc_active = is_cc_active; + cfg.announce_memory_limit = mem; + // Mark configs synced so the semaphore initializes rather than + // blocking on `configs_synced_once`. + cfg.apply_from(&ConfigUpdates::default()); + SemaphoreMetrics::new(cfg, MetricsRegistry::new()) + .fetch() + .await + .total_permits + } + + let cfg = PersistConfig::new_for_tests(); + let adjustment = FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg); + let mem = 1 << 30; + let expected = usize::cast_lossy(f64::cast_lossy(mem) * adjustment); + + // Both cc and non-cc replicas get the memory-derived bound. + assert_eq!(total_permits(true, Some(mem)).await, expected); + assert_eq!(total_permits(false, Some(mem)).await, expected); + // No announced limit => unbounded.
+ assert_eq!(total_permits(false, None).await, Semaphore::MAX_PERMITS); + } +} diff --git a/test/bounded-memory/mzcompose.py b/test/bounded-memory/mzcompose.py index 9e465586b9825..38d15f06879e7 100644 --- a/test/bounded-memory/mzcompose.py +++ b/test/bounded-memory/mzcompose.py @@ -220,6 +220,40 @@ class KafkaScenario(Scenario): SCENARIOS = [ + Scenario( + # Regression guard for persist-source fetch backpressure: re-hydrating a + # large index must keep read-ahead within the announced memory limit + # instead of prefetching the whole snapshot and OOMing. See the + # `persist_fetch_semaphore_permit_adjustment` dyncfg. + name="persist-source-read-ahead", + pre_restart=dedent(f""" + > CREATE TABLE t1 (f1 INTEGER, f2 TEXT) + + # ~1 GiB of data spread across many persist parts. The index below + # must re-read this entire snapshot from persist on re-hydration. + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + > INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g + + > CREATE INDEX i1 IN CLUSTER clusterd ON t1 (f1) + + > SELECT count(*) FROM t1 + {4 * 256 * 1024} + """), + post_restart=dedent(f""" + # On restart the index re-hydrates by reading t1's full snapshot + # through persist_source. The fetch semaphore must bound read-ahead + # so clusterd stays within its announced memory limit; an OOM here + # kills the replica and fails this step. + > SET CLUSTER = clusterd + + > SELECT count(*) FROM t1 + {4 * 256 * 1024} + """), + materialized_memory="8Gb", + clusterd_memory="1.5Gb", + ), PgCdcScenario( name="pg-cdc-snapshot", pre_restart=PgCdcScenario.PG_SETUP @@ -1452,9 +1486,17 @@ def run_scenario( ) -> None: c.down(destroy_volumes=True) + # Mirror production: announce clusterd's memory limit so persist sizes its + # fetch-backpressure semaphore. 
Without this the semaphore is unbounded and + # the suite would not exercise persist's memory budget at all. + announce_bytes = int(_get_memory_in_gb(clusterd_memory) * 1024**3) + with c.override( Materialized(memory=materialized_memory, support_external_clusterd=True), - Clusterd(memory=clusterd_memory), + Clusterd( + memory=clusterd_memory, + options=[f"--announce-memory-limit={announce_bytes}"], + ), ): c.up( "redpanda",