diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 73847b67ed7a7..9df2a8074e41c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -82,7 +82,7 @@ use datafusion_execution::cache::cache_manager::{ }; pub use datafusion_execution::config::SessionConfig; use datafusion_execution::disk_manager::{ - DEFAULT_MAX_TEMP_DIRECTORY_SIZE, DiskManagerBuilder, + DEFAULT_MAX_SPILL_MERGE_FAN_IN, DEFAULT_MAX_TEMP_DIRECTORY_SIZE, DiskManagerBuilder, }; use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::HigherOrderUDF; @@ -1181,6 +1181,19 @@ impl SessionContext { let mut state = self.state.write(); + if key == "max_spill_merge_fan_in" { + let fan_in = value.parse::().map_err(|e| { + DataFusionError::Plan(format!( + "Failed to parse non-negative integer from '{variable}', value '{value}': {e}" + )) + })?; + state + .runtime_env() + .disk_manager + .set_max_spill_merge_fan_in(fan_in); + return Ok(()); + } + let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); builder = match key { "memory_limit" => { @@ -1224,6 +1237,14 @@ impl SessionContext { let mut state = self.state.write(); + if key == "max_spill_merge_fan_in" { + state + .runtime_env() + .disk_manager + .set_max_spill_merge_fan_in(DEFAULT_MAX_SPILL_MERGE_FAN_IN); + return Ok(()); + } + let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); match key { "memory_limit" => { diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index a9f57a0793463..9dfb2ce8c03c5 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -227,6 +227,34 @@ async fn test_max_temp_directory_size_enforcement() { ); } +#[tokio::test] +async fn test_max_spill_merge_fan_in_runtime_config() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.max_spill_merge_fan_in = '8'") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(ctx.runtime_env().disk_manager.max_spill_merge_fan_in(), 8); + + ctx.sql("RESET datafusion.runtime.max_spill_merge_fan_in") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(ctx.runtime_env().disk_manager.max_spill_merge_fan_in(), 0); + + let error = ctx + .sql("SET datafusion.runtime.max_spill_merge_fan_in = '-1'") + .await + .unwrap_err() + .to_string(); + assert!(error.contains("Failed to parse non-negative integer")); +} + #[tokio::test] async fn test_test_metadata_cache_limit() { let ctx = SessionContext::new(); diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 070ea5334366e..1c2278937d0bf 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -31,6 +31,7 @@ use tempfile::{Builder, NamedTempFile, TempDir}; use datafusion_common::human_readable_size; pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB +pub const DEFAULT_MAX_SPILL_MERGE_FAN_IN: usize = 0; /// Builder pattern for the [DiskManager] structure #[derive(Clone, Debug)] @@ -40,6 +41,9 @@ pub struct DiskManagerBuilder { /// The maximum amount of data (in bytes) stored inside the temporary directories. /// Default to 100GB max_temp_directory_size: u64, + /// Maximum number of spill files opened by one external merge pass. + /// A value of 0 means unlimited. + max_spill_merge_fan_in: usize, } impl Default for DiskManagerBuilder { @@ -47,6 +51,7 @@ impl Default for DiskManagerBuilder { Self { mode: DiskManagerMode::OsTmpDirectory, max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, + max_spill_merge_fan_in: DEFAULT_MAX_SPILL_MERGE_FAN_IN, } } } @@ -70,12 +75,22 @@ impl DiskManagerBuilder { self } + pub fn set_max_spill_merge_fan_in(&mut self, value: usize) { + self.max_spill_merge_fan_in = value; + } + + pub fn with_max_spill_merge_fan_in(mut self, value: usize) -> Self { + self.set_max_spill_merge_fan_in(value); + self + } + /// Create a DiskManager given the builder pub fn build(self) -> Result { match self.mode { DiskManagerMode::OsTmpDirectory => Ok(DiskManager { local_dirs: Mutex::new(Some(vec![])), max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size), + max_spill_merge_fan_in: AtomicUsize::new(self.max_spill_merge_fan_in), used_disk_space: Arc::new(AtomicU64::new(0)), active_files_count: Arc::new(AtomicUsize::new(0)), }), @@ -87,6 +102,7 @@ impl DiskManagerBuilder { Ok(DiskManager { local_dirs: Mutex::new(Some(local_dirs)), max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size), + max_spill_merge_fan_in: AtomicUsize::new(self.max_spill_merge_fan_in), used_disk_space: Arc::new(AtomicU64::new(0)), active_files_count: Arc::new(AtomicUsize::new(0)), }) @@ -94,6 +110,7 @@ impl DiskManagerBuilder { DiskManagerMode::Disabled => Ok(DiskManager { local_dirs: Mutex::new(None), max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size), + max_spill_merge_fan_in: AtomicUsize::new(self.max_spill_merge_fan_in), used_disk_space: Arc::new(AtomicU64::new(0)), active_files_count: Arc::new(AtomicUsize::new(0)), }), @@ -170,6 +187,9 @@ pub struct DiskManager { /// Default to 100GB. Stored as `AtomicU64` so it can be adjusted at runtime /// without requiring exclusive (`&mut`) access to the `DiskManager`. max_temp_directory_size: AtomicU64, + /// Maximum number of spill files opened by one external merge pass. + /// A value of 0 preserves the memory-driven, unbounded behavior. + max_spill_merge_fan_in: AtomicUsize, /// Used disk space in the temporary directories. Now only spilled data for /// external executors are counted. used_disk_space: Arc, @@ -201,6 +221,7 @@ impl DiskManager { DiskManagerConfig::NewOs => Ok(Arc::new(Self { local_dirs: Mutex::new(Some(vec![])), max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE), + max_spill_merge_fan_in: AtomicUsize::new(DEFAULT_MAX_SPILL_MERGE_FAN_IN), used_disk_space: Arc::new(AtomicU64::new(0)), active_files_count: Arc::new(AtomicUsize::new(0)), })), @@ -214,6 +235,9 @@ impl DiskManager { max_temp_directory_size: AtomicU64::new( DEFAULT_MAX_TEMP_DIRECTORY_SIZE, ), + max_spill_merge_fan_in: AtomicUsize::new( + DEFAULT_MAX_SPILL_MERGE_FAN_IN, + ), used_disk_space: Arc::new(AtomicU64::new(0)), active_files_count: Arc::new(AtomicUsize::new(0)), })) @@ -221,6 +245,7 @@ impl DiskManager { DiskManagerConfig::Disabled => Ok(Arc::new(Self { local_dirs: Mutex::new(None), max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE), + max_spill_merge_fan_in: AtomicUsize::new(DEFAULT_MAX_SPILL_MERGE_FAN_IN), used_disk_space: Arc::new(AtomicU64::new(0)), active_files_count: Arc::new(AtomicUsize::new(0)), })), @@ -277,6 +302,22 @@ impl DiskManager { self.max_temp_directory_size.load(Ordering::Relaxed) } + /// Atomically set the maximum spill merge fan-in. + /// + /// A value of 0 disables the cap. Values of 1 are accepted but external + /// merge code will still merge at least two spill streams to make progress. + pub fn set_max_spill_merge_fan_in(&self, max_spill_merge_fan_in: usize) { + self.max_spill_merge_fan_in + .store(max_spill_merge_fan_in, Ordering::Relaxed); + } + + /// Returns the maximum number of spill files opened by one merge pass. + /// + /// A value of 0 means unlimited. + pub fn max_spill_merge_fan_in(&self) -> usize { + self.max_spill_merge_fan_in.load(Ordering::Relaxed) + } + /// Returns the current spilling progress pub fn spilling_progress(&self) -> SpillingProgress { SpillingProgress { @@ -875,6 +916,25 @@ mod tests { Ok(()) } + #[test] + fn test_max_spill_merge_fan_in_builder_and_dynamic_update() -> Result<()> { + let dm = Arc::new( + DiskManager::builder() + .with_max_spill_merge_fan_in(8) + .build()?, + ); + + assert_eq!(dm.max_spill_merge_fan_in(), 8); + + dm.set_max_spill_merge_fan_in(4); + assert_eq!(dm.max_spill_merge_fan_in(), 4); + + dm.set_max_spill_merge_fan_in(0); + assert_eq!(dm.max_spill_merge_fan_in(), 0); + + Ok(()) + } + #[test] fn test_disabled_disk_manager_rejects_nonzero_limit() -> Result<()> { let dm = DiskManager::builder() diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 5b90f28a141ef..a8d50c658e90e 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -96,15 +96,28 @@ impl Debug for RuntimeEnv { /// This helper function defines the structure and metadata for all runtime configuration /// entries to avoid duplication between `RuntimeEnv::config_entries()` and /// `RuntimeEnvBuilder::entries()`. -fn create_runtime_config_entries( +struct RuntimeConfigValues { memory_limit: Option, max_temp_directory_size: Option, + max_spill_merge_fan_in: Option, temp_directory: Option, metadata_cache_limit: Option, list_files_cache_limit: Option, list_files_cache_ttl: Option, file_statistics_cache_limit: Option, -) -> Vec { +} + +fn create_runtime_config_entries(values: RuntimeConfigValues) -> Vec { + let RuntimeConfigValues { + memory_limit, + max_temp_directory_size, + max_spill_merge_fan_in, + temp_directory, + metadata_cache_limit, + list_files_cache_limit, + list_files_cache_ttl, + file_statistics_cache_limit, + } = values; vec![ ConfigEntry { key: "datafusion.runtime.memory_limit".to_string(), @@ -116,6 +129,11 @@ fn create_runtime_config_entries( value: max_temp_directory_size, description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.", }, + ConfigEntry { + key: "datafusion.runtime.max_spill_merge_fan_in".to_string(), + value: max_spill_merge_fan_in, + description: "Maximum number of spill files opened by one external merge pass. Use 0 for unlimited. Values below 2 still use 2 so a merge can make progress.", + }, ConfigEntry { key: "datafusion.runtime.temp_directory".to_string(), value: temp_directory, @@ -269,6 +287,8 @@ impl RuntimeEnv { let max_temp_dir_size = self.disk_manager.max_temp_directory_size(); let max_temp_dir_value = format_byte_size(max_temp_dir_size); + let max_spill_merge_fan_in = + self.disk_manager.max_spill_merge_fan_in().to_string(); let temp_paths = self.disk_manager.temp_dir_paths(); let temp_dir_value = if temp_paths.is_empty() { @@ -310,15 +330,16 @@ impl RuntimeEnv { .expect("File statistics cache size conversion failed"), ); - create_runtime_config_entries( - memory_limit_value, - Some(max_temp_dir_value), - temp_dir_value, - Some(metadata_cache_value), - Some(list_files_cache_value), + create_runtime_config_entries(RuntimeConfigValues { + memory_limit: memory_limit_value, + max_temp_directory_size: Some(max_temp_dir_value), + max_spill_merge_fan_in: Some(max_spill_merge_fan_in), + temp_directory: temp_dir_value, + metadata_cache_limit: Some(metadata_cache_value), + list_files_cache_limit: Some(list_files_cache_value), list_files_cache_ttl, - Some(file_statistics_cache_value), - ) + file_statistics_cache_limit: Some(file_statistics_cache_value), + }) } } @@ -435,6 +456,14 @@ impl RuntimeEnvBuilder { self.with_disk_manager_builder(builder.with_max_temp_directory_size(size)) } + /// Limit the number of spill files opened by one external merge pass. + /// + /// A value of 0 means unlimited. + pub fn with_max_spill_merge_fan_in(mut self, fan_in: usize) -> Self { + let builder = self.disk_manager_builder.take().unwrap_or_default(); + self.with_disk_manager_builder(builder.with_max_spill_merge_fan_in(fan_in)) + } + /// Specify the limit of the file-embedded metadata cache, in bytes. pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self { self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit); @@ -528,15 +557,16 @@ impl RuntimeEnvBuilder { /// Returns a list of all available runtime configurations with their current values and descriptions pub fn entries(&self) -> Vec { - create_runtime_config_entries( - None, - Some("100G".to_string()), - None, - Some("50M".to_owned()), - Some("1M".to_owned()), - None, - Some("20M".to_owned()), - ) + create_runtime_config_entries(RuntimeConfigValues { + memory_limit: None, + max_temp_directory_size: Some("100G".to_string()), + max_spill_merge_fan_in: Some("0".to_string()), + temp_directory: None, + metadata_cache_limit: Some("50M".to_owned()), + list_files_cache_limit: Some("1M".to_owned()), + list_files_cache_ttl: None, + file_statistics_cache_limit: Some("20M".to_owned()), + }) } /// Generate documentation that can be included in the user guide diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index 8985e1d8c70ee..9b2c517eb1d8c 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -373,6 +373,12 @@ impl MultiLevelMergeBuilder { ) -> Result<(Vec, usize)> { assert_ne!(buffer_len, 0, "Buffer length must be greater than 0"); let mut number_of_spills_to_read_for_current_phase = 0; + let configured_fan_in = self + .spill_manager + .env() + .disk_manager + .max_spill_merge_fan_in(); + let max_spill_files = effective_spill_merge_fan_in(configured_fan_in); // Track total memory needed for spill file buffers. When the // reservation has pre-reserved bytes (from sort_spill_reservation_bytes), // those bytes cover the first N spill files without additional pool @@ -380,6 +386,10 @@ impl MultiLevelMergeBuilder { let mut total_needed: usize = 0; for spill in &self.sorted_spill_files { + if number_of_spills_to_read_for_current_phase >= max_spill_files { + break; + } + let per_spill = get_reserved_bytes_for_record_batch_size( spill.max_record_batch_memory, // Size will be the same as the sliced size, bc it is a spilled batch. @@ -431,6 +441,14 @@ impl MultiLevelMergeBuilder { } } +fn effective_spill_merge_fan_in(configured_fan_in: usize) -> usize { + if configured_fan_in == 0 { + usize::MAX + } else { + configured_fan_in.max(2) + } +} + struct StreamAttachedReservation { stream: SendableRecordBatchStream, reservation: MemoryReservation, @@ -481,3 +499,20 @@ impl RecordBatchStream for StreamAttachedReservation { self.stream.schema() } } + +#[cfg(test)] +mod tests { + use super::effective_spill_merge_fan_in; + + #[test] + fn spill_merge_fan_in_is_unlimited_by_default() { + assert_eq!(effective_spill_merge_fan_in(0), usize::MAX); + } + + #[test] + fn spill_merge_fan_in_preserves_merge_progress() { + assert_eq!(effective_spill_merge_fan_in(1), 2); + assert_eq!(effective_spill_merge_fan_in(2), 2); + assert_eq!(effective_spill_merge_fan_in(8), 8); + } +} diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 365a9f977eace..c75c34f472aeb 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -76,6 +76,10 @@ impl SpillManager { &self.schema } + pub(crate) fn env(&self) -> &RuntimeEnv { + &self.env + } + /// Creates a temporary file for in-progress operations, returning an error /// message if file creation fails. The file can be used to append batches /// incrementally and then finish the file when done. diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 430d2935157ba..1653fd40588dd 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -343,6 +343,7 @@ datafusion.optimizer.use_statistics_registry false datafusion.runtime.file_statistics_cache_limit 20M datafusion.runtime.list_files_cache_limit 1M datafusion.runtime.list_files_cache_ttl NULL +datafusion.runtime.max_spill_merge_fan_in 0 datafusion.runtime.max_temp_directory_size 100G datafusion.runtime.memory_limit unlimited datafusion.runtime.metadata_cache_limit 50M @@ -500,6 +501,7 @@ datafusion.optimizer.use_statistics_registry false When set to true, the physica datafusion.runtime.file_statistics_cache_limit 20M Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. +datafusion.runtime.max_spill_merge_fan_in 0 Maximum number of spill files opened by one external merge pass. Use 0 for unlimited. Values below 2 still use 2 so a merge can make progress. datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. datafusion.runtime.memory_limit unlimited Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. datafusion.runtime.metadata_cache_limit 50M Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index 6f58e5fb3100b..c4c1af2703fe5 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -611,6 +611,18 @@ SHOW datafusion.runtime.max_temp_directory_size ---- datafusion.runtime.max_temp_directory_size 10G +# Test SET and SHOW runtime.max_spill_merge_fan_in +statement ok +SET datafusion.runtime.max_spill_merge_fan_in = '16' + +query TT +SHOW datafusion.runtime.max_spill_merge_fan_in +---- +datafusion.runtime.max_spill_merge_fan_in 16 + +statement ok +RESET datafusion.runtime.max_spill_merge_fan_in + # Test SET and SHOW runtime.file_statistics_cache_limit statement ok SET datafusion.runtime.file_statistics_cache_limit = '42M' @@ -669,6 +681,7 @@ SELECT name FROM information_schema.df_settings WHERE name LIKE 'datafusion.runt datafusion.runtime.file_statistics_cache_limit datafusion.runtime.list_files_cache_limit datafusion.runtime.list_files_cache_ttl +datafusion.runtime.max_spill_merge_fan_in datafusion.runtime.max_temp_directory_size datafusion.runtime.memory_limit datafusion.runtime.metadata_cache_limit diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f70daef317216..7bab57764b473 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -243,6 +243,7 @@ The following runtime configuration settings are available: | datafusion.runtime.file_statistics_cache_limit | 20M | Maximum memory to use for file statistics cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. | | datafusion.runtime.list_files_cache_limit | 1M | Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. | | datafusion.runtime.list_files_cache_ttl | NULL | TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. | +| datafusion.runtime.max_spill_merge_fan_in | 0 | Maximum number of spill files opened by one external merge pass. Use 0 for unlimited. Values below 2 still use 2 so a merge can make progress. | | datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. | | datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. | | datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes. |