From 218bac7e60bee7886de73e5d5dbd7237d8dbb147 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 12 Mar 2026 16:30:07 -0400 Subject: [PATCH] fix(amp-worker-core): compaction skips latest segments Add a `skip_latest_segments` config option (default: 3) that drops the last N segments from the canonical chain before compaction planning. This avoids compacting recently-written data that may contain reorganized blocks. --- crates/config/src/worker_core.rs | 10 ++++-- .../worker-core/src/compaction/algorithm.rs | 36 ++++++++++++++----- .../core/worker-core/src/compaction/config.rs | 8 ++++- .../core/worker-core/src/compaction/plan.rs | 5 +++ docs/schemas/config/ampd.spec.json | 9 ++++- tests/src/tests/it_sql_dataset_batch_size.rs | 1 + 6 files changed, 56 insertions(+), 13 deletions(-) diff --git a/crates/config/src/worker_core.rs b/crates/config/src/worker_core.rs index 47c823ab9..d7c030fd6 100644 --- a/crates/config/src/worker_core.rs +++ b/crates/config/src/worker_core.rs @@ -288,13 +288,17 @@ impl From<&CompactorConfig> for amp_worker_core::CompactorConfig { /// Compaction algorithm tuning parameters. /// -/// Controls cooldown between compaction runs and eager compaction size limits. +/// Controls cooldown, segment exclusion, and eager compaction size limits. #[derive(Debug, Clone, serde::Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[serde(default)] pub struct CompactionAlgorithmConfig { - /// Base cooldown duration in seconds between compaction runs (default: 1024). + /// Minimum time in seconds before a file can initiate a new compaction group. + /// Prevents excessive I/O from rewriting large files too frequently (default: 1024). pub cooldown_duration: ConfigDuration<1024>, + /// Number of latest segments to skip during compaction. Avoids compacting + /// recently-written data that may contain reorganized blocks (default: 3). + pub skip_latest_segments: u64, /// Eager compaction limits (flattened fields: `overflow`, `bytes`, `rows`). #[serde( flatten, @@ -309,6 +313,7 @@ impl Default for CompactionAlgorithmConfig { fn default() -> Self { Self { cooldown_duration: ConfigDuration::default(), + skip_latest_segments: 3, eager_compaction_limit: SizeLimitConfig::default_eager_limit(), } } @@ -318,6 +323,7 @@ impl From<&CompactionAlgorithmConfig> for amp_worker_core::CompactionAlgorithmCo fn from(config: &CompactionAlgorithmConfig) -> Self { Self { cooldown_duration: (&config.cooldown_duration).into(), + skip_latest_segments: config.skip_latest_segments, eager_compaction_limit: (&config.eager_compaction_limit).into(), } } diff --git a/crates/core/worker-core/src/compaction/algorithm.rs b/crates/core/worker-core/src/compaction/algorithm.rs index aa8f0537e..27228824f 100644 --- a/crates/core/worker-core/src/compaction/algorithm.rs +++ b/crates/core/worker-core/src/compaction/algorithm.rs @@ -153,16 +153,20 @@ use crate::compaction::{compactor::CompactionGroup, plan::CompactionFile}; /// based on their size and age. /// /// ## Fields -/// - `cooldown_duration`: The base duration used to calculate -/// the cooldown period for files based on their generation. +/// - `cooldown_duration`: Minimum time before a file can initiate a new +/// compaction group. Prevents excessive I/O from rewriting large files +/// too frequently. /// - `target_partition_size`: The upper bound for segment size limits. /// Files exceeding this limit will not be compacted together. This /// value must be non-unbounded. /// - `max_eager_generation`: Segments up to this generation will not be subject to cooldowns. +/// - `skip_latest_segments`: Number of latest segments to skip during +/// compaction. Avoids compacting recently-written data that may contain +/// reorganized blocks. #[derive(Clone, Copy, Debug)] pub struct CompactionAlgorithm { - /// The amount of time a file must wait before it can be - /// compacted with files of different generations. + /// Minimum time before a file can initiate a new compaction group. + /// Prevents excessive I/O from rewriting large files too frequently. pub cooldown_duration: Duration, /// The upper bound for segment size limits. Files exceeding this limit /// will not be compacted together. This value must be non-unbounded. @@ -170,6 +174,10 @@ pub struct CompactionAlgorithm { /// Segments up to this generation will not be subject to cooldowns pub max_eager_generation: Option, + + /// Number of latest segments to skip during compaction. Avoids compacting + /// recently-written data that may contain reorganized blocks. + pub skip_latest_segments: u64, } impl CompactionAlgorithm { @@ -207,9 +215,17 @@ impl CompactionAlgorithm { /// - When a group is started, if the candidate can be added to it. /// /// The current algorithm is: - /// - If the file is `Hot`, it cannot start a new group. - /// - If a group has been started, it will accept files up to the target size, regardless of file state. + /// - `Hot` files (within `cooldown_duration`) cannot start a new group, + /// but can join an existing one. This prevents excessive I/O from + /// rewriting large files too frequently. + /// - Files are accepted into a group up to the target size. + /// + /// Note: The latest segments are excluded before they reach the predicate + /// (see `CompactionPlan::from_snapshot`), controlled by + /// `skip_latest_segments`. pub fn predicate(&self, group: &CompactionGroup, candidate: &CompactionFile) -> bool { + // Hot files cannot start a new group (prevents excessive I/O from + // rewriting large files too frequently). if group.is_empty() && self.file_state(&candidate.size) == FileState::Hot { return false; } @@ -241,13 +257,15 @@ impl<'a> From<&'a ParquetConfig> for CompactionAlgorithm { Some(Generation::from(generation)) } }, + skip_latest_segments: config.compactor.algorithm.skip_latest_segments, } } } -/// Cooldown period for file compaction. Before the period elapses, -/// the file will only be compacted if the candidate group shares the -/// same generation. +/// Cooldown period for file compaction. A file within its cooldown +/// period is considered "hot" and cannot initiate a new compaction +/// group. This prevents excessive I/O from rewriting large files +/// too frequently. Hot files can still be added to existing groups. #[derive(Clone, Copy)] pub struct Cooldown(Duration); diff --git a/crates/core/worker-core/src/compaction/config.rs b/crates/core/worker-core/src/compaction/config.rs index 8a679e276..72bcd2929 100644 --- a/crates/core/worker-core/src/compaction/config.rs +++ b/crates/core/worker-core/src/compaction/config.rs @@ -75,10 +75,15 @@ impl Default for CompactorConfig { #[derive(Debug, Clone)] pub struct CompactionAlgorithmConfig { - /// Base cooldown duration in seconds (default: 1024.0) + /// Minimum time before a file can initiate a new compaction group. + /// Prevents excessive I/O from rewriting large files too frequently + /// (default: 1024 seconds). pub cooldown_duration: ConfigDuration<1024>, /// Eager compaction limits pub eager_compaction_limit: SizeLimitConfig, + /// Number of latest segments to skip during compaction. Avoids compacting + /// recently-written data that may contain reorganized blocks (default: 3). + pub skip_latest_segments: u64, } impl Default for CompactionAlgorithmConfig { @@ -89,6 +94,7 @@ impl Default for CompactionAlgorithmConfig { bytes: 0, ..Default::default() }, + skip_latest_segments: 3, } } } diff --git a/crates/core/worker-core/src/compaction/plan.rs b/crates/core/worker-core/src/compaction/plan.rs index 44c9cba8c..5120f5bbf 100644 --- a/crates/core/worker-core/src/compaction/plan.rs +++ b/crates/core/worker-core/src/compaction/plan.rs @@ -142,6 +142,11 @@ impl<'a> CompactionPlan<'a> { ) -> CompactionResult> { let chain = table.canonical_segments(); + // Drop the latest N segments from the end of the canonical chain to + // avoid compacting data that may contain reorganized blocks. + let skip = opts.compactor.algorithm.skip_latest_segments as usize; + let chain = &chain[..chain.len().saturating_sub(skip)]; + let size = chain.len(); if size == 0 { return Ok(None); diff --git a/docs/schemas/config/ampd.spec.json b/docs/schemas/config/ampd.spec.json index 650923348..b231ed9b2 100644 --- a/docs/schemas/config/ampd.spec.json +++ b/docs/schemas/config/ampd.spec.json @@ -140,7 +140,7 @@ "minimum": 0 }, "cooldown_duration": { - "description": "Base cooldown duration in seconds between compaction runs (default: 1024).", + "description": "Minimum time in seconds before a file can initiate a new compaction group.\nPrevents excessive I/O from rewriting large files too frequently (default: 1024).", "$ref": "#/$defs/ConfigDuration" }, "metadata_concurrency": { @@ -164,6 +164,13 @@ "format": "uint64", "minimum": 0 }, + "skip_latest_segments": { + "description": "Number of latest segments to skip during compaction. Avoids compacting\nrecently-written data that may contain reorganized blocks (default: 3).", + "type": "integer", + "format": "uint64", + "default": 3, + "minimum": 0 + }, "write_concurrency": { "description": "Maximum concurrent compaction write operations (default: 2).", "type": "integer", diff --git a/tests/src/tests/it_sql_dataset_batch_size.rs b/tests/src/tests/it_sql_dataset_batch_size.rs index 9fbc8a468..4df67d5a6 100644 --- a/tests/src/tests/it_sql_dataset_batch_size.rs +++ b/tests/src/tests/it_sql_dataset_batch_size.rs @@ -174,6 +174,7 @@ impl TestCtx { opts_mut.collector.interval = Duration::ZERO; opts_mut.compactor.interval = Duration::ZERO; opts_mut.compactor.algorithm.cooldown_duration = Duration::ZERO; + opts_mut.compactor.algorithm.skip_latest_segments = 0; opts_mut.partition = SegmentSizeLimit::new(100, 0, 0, 0, Generation::default(), 10); let metadata_db = self.ctx.daemon_worker().metadata_db().clone(); let data_store = self.ctx.daemon_server().data_store().clone();