Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions src/persist-client/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,26 @@ use crate::schema::{PartMigration, SchemaCache};

pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
"persist_fetch_semaphore_cost_adjustment",
// We use `encoded_size_bytes` as the number of permits, but the parsed size
// is larger than the encoded one, so adjust it. This default value is from
// eyeballing graphs in experiments that were run on tpch loadgen data.
// We use `encoded_size_bytes` as the number of permits, but the decoded,
// heap-resident size is larger, so adjust it. This default value is from
// eyeballing graphs in experiments that were run on tpch loadgen data, and
// may need retuning now that the decoded data lands on the heap rather than
// in lgalloc.
1.2,
"\
An adjustment multiplied by encoded_size_bytes to approximate an upper \
bound on the size in lgalloc, which includes the decoded version.",
bound on the heap size of a fetched part, which includes the decoded \
version.",
);

pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
"persist_fetch_semaphore_permit_adjustment",
1.0,
0.1,
"\
A limit on the number of outstanding persist bytes being fetched and \
parsed, expressed as a multiplier of the process's memory limit. This data \
all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
replicas.",
A hard bound on the outstanding persist bytes being fetched and decoded, \
expressed as a multiplier of the process's memory limit. Applied to all \
replicas. These bytes are heap-resident (there is no disk to spill to) and \
must coexist with the arrangements being built, so this is well below 1.0.",
);

pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
Expand Down
56 changes: 51 additions & 5 deletions src/persist-client/src/internal/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use prometheus::proto::MetricFamily;
use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
use timely::progress::Antichain;
use tokio_metrics::TaskMonitor;
use tracing::{Instrument, debug, info, info_span};
use tracing::{Instrument, debug, info, info_span, warn};

use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
use crate::internal::paths::BlobKey;
Expand Down Expand Up @@ -2531,9 +2531,12 @@ impl SemaphoreMetrics {
let registry = self.registry.clone();
let init = async move {
let total_permits = match cfg.announce_memory_limit {
// Non-cc replicas have the old physical flow control mechanism,
// so only apply this one on cc replicas.
Some(mem) if cfg.is_cc_active => {
// Bound outstanding fetched+decoding bytes to a fraction of the
// process memory limit. This is the only read-ahead bound: there
// is no disk to spill to, so these bytes are heap-resident and
// must coexist with the arrangements being built. Applied to all
// replicas (the cc/non-cc distinction is gone).
Some(mem) => {
// We can't easily adjust the number of permits later, so
// make sure we've synced dyncfg values at least once.
info!("fetch semaphore awaiting first dyncfg values");
Expand All @@ -2544,7 +2547,12 @@ impl SemaphoreMetrics {
info!("fetch_semaphore got first dyncfg values");
total_permits
}
Some(_) | None => Semaphore::MAX_PERMITS,
// No announced memory limit (embedded/test processes); we have
// nothing to size against, so read-ahead is unbounded.
None => {
warn!("fetch semaphore unbounded: no announced memory limit");
Semaphore::MAX_PERMITS
}
};
MetricsSemaphore::new(&registry, "fetch", total_permits)
};
Expand Down Expand Up @@ -3245,3 +3253,41 @@ pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
None => i64::MAX,
}
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::metrics::MetricsRegistry;

    use super::*;

    /// Checks how the "fetch" semaphore is sized: whenever a memory limit is
    /// announced, the permit count is derived from it, and the value of
    /// `is_cc_active` makes no difference. Only a missing memory limit yields
    /// the unbounded `Semaphore::MAX_PERMITS` fallback.
    #[mz_ore::test(tokio::test)]
    async fn fetch_semaphore_sized_from_memory_limit() {
        /// Initializes a fetch semaphore from a fresh test config carrying the
        /// given settings and reports its `total_permits`.
        async fn fetch_permits(is_cc_active: bool, announced_limit: Option<usize>) -> usize {
            let mut config = PersistConfig::new_for_tests();
            config.announce_memory_limit = announced_limit;
            config.is_cc_active = is_cc_active;
            // Applying a default `ConfigUpdates` marks the configs as synced;
            // otherwise semaphore initialization would block on
            // `configs_synced_once`.
            config.apply_from(&ConfigUpdates::default());

            // Each call gets its own registry, so repeated calls never share
            // metric registrations.
            let metrics = SemaphoreMetrics::new(config, MetricsRegistry::new());
            let permits = metrics.fetch().await.total_permits;
            permits
        }

        // Expected bound: the announced limit scaled by the default permit
        // adjustment, using the same lossy float round-trip as the assertion
        // target.
        let limit_bytes: usize = 1 << 30;
        let default_config = PersistConfig::new_for_tests();
        let permit_adjustment = FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&default_config);
        let scaled_limit = f64::cast_lossy(limit_bytes) * permit_adjustment;
        let bounded = usize::cast_lossy(scaled_limit);

        // With a limit announced, cc and non-cc replicas are sized identically.
        let cc_permits = fetch_permits(true, Some(limit_bytes)).await;
        assert_eq!(cc_permits, bounded);
        let non_cc_permits = fetch_permits(false, Some(limit_bytes)).await;
        assert_eq!(non_cc_permits, bounded);

        // Without an announced limit there is nothing to size against.
        let unannounced_permits = fetch_permits(false, None).await;
        assert_eq!(unannounced_permits, Semaphore::MAX_PERMITS);
    }
}
44 changes: 43 additions & 1 deletion test/bounded-memory/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,40 @@ class KafkaScenario(Scenario):


SCENARIOS = [
Scenario(
# Regression guard for persist-source fetch backpressure: re-hydrating a
# large index must keep read-ahead within the announced memory limit
# instead of prefetching the whole snapshot and OOMing. See the
# `persist_fetch_semaphore_permit_adjustment` dyncfg.
name="persist-source-read-ahead",
pre_restart=dedent(f"""
> CREATE TABLE t1 (f1 INTEGER, f2 TEXT)

# ~1 GiB of data spread across many persist parts. The index below
# must re-read this entire snapshot from persist on re-hydration.
> INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g
> INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g
> INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g
> INSERT INTO t1 SELECT g, '{STRING_PAD}' FROM generate_series(1, 256 * 1024) g

> CREATE INDEX i1 IN CLUSTER clusterd ON t1 (f1)

> SELECT count(*) FROM t1
{4 * 256 * 1024}
"""),
post_restart=dedent(f"""
# On restart the index re-hydrates by reading t1's full snapshot
# through persist_source. The fetch semaphore must bound read-ahead
# so clusterd stays within its announced memory limit; an OOM here
# kills the replica and fails this step.
> SET CLUSTER = clusterd

> SELECT count(*) FROM t1
{4 * 256 * 1024}
"""),
materialized_memory="8Gb",
clusterd_memory="1.5Gb",
),
PgCdcScenario(
name="pg-cdc-snapshot",
pre_restart=PgCdcScenario.PG_SETUP
Expand Down Expand Up @@ -1452,9 +1486,17 @@ def run_scenario(
) -> None:
c.down(destroy_volumes=True)

# Mirror production: announce clusterd's memory limit so persist sizes its
# fetch-backpressure semaphore. Without this the semaphore is unbounded and
# the suite would not exercise persist's memory budget at all.
announce_bytes = int(_get_memory_in_gb(clusterd_memory) * 1024**3)

with c.override(
Materialized(memory=materialized_memory, support_external_clusterd=True),
Clusterd(memory=clusterd_memory),
Clusterd(
memory=clusterd_memory,
options=[f"--announce-memory-limit={announce_bytes}"],
),
):
c.up(
"redpanda",
Expand Down
Loading