Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use datafusion_execution::cache::cache_manager::{
};
pub use datafusion_execution::config::SessionConfig;
use datafusion_execution::disk_manager::{
DEFAULT_MAX_TEMP_DIRECTORY_SIZE, DiskManagerBuilder,
DEFAULT_MAX_SPILL_MERGE_FAN_IN, DEFAULT_MAX_TEMP_DIRECTORY_SIZE, DiskManagerBuilder,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::HigherOrderUDF;
Expand Down Expand Up @@ -1181,6 +1181,19 @@ impl SessionContext {

let mut state = self.state.write();

if key == "max_spill_merge_fan_in" {
let fan_in = value.parse::<usize>().map_err(|e| {
DataFusionError::Plan(format!(
"Failed to parse non-negative integer from '{variable}', value '{value}': {e}"
))
})?;
state
.runtime_env()
.disk_manager
.set_max_spill_merge_fan_in(fan_in);
return Ok(());
}

let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
builder = match key {
"memory_limit" => {
Expand Down Expand Up @@ -1224,6 +1237,14 @@ impl SessionContext {

let mut state = self.state.write();

if key == "max_spill_merge_fan_in" {
state
.runtime_env()
.disk_manager
.set_max_spill_merge_fan_in(DEFAULT_MAX_SPILL_MERGE_FAN_IN);
return Ok(());
}

let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
match key {
"memory_limit" => {
Expand Down
28 changes: 28 additions & 0 deletions datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,34 @@ async fn test_max_temp_directory_size_enforcement() {
);
}

#[tokio::test]
async fn test_max_spill_merge_fan_in_runtime_config() {
let ctx = SessionContext::new();

ctx.sql("SET datafusion.runtime.max_spill_merge_fan_in = '8'")
.await
.unwrap()
.collect()
.await
.unwrap();
assert_eq!(ctx.runtime_env().disk_manager.max_spill_merge_fan_in(), 8);

ctx.sql("RESET datafusion.runtime.max_spill_merge_fan_in")
.await
.unwrap()
.collect()
.await
.unwrap();
assert_eq!(ctx.runtime_env().disk_manager.max_spill_merge_fan_in(), 0);

let error = ctx
.sql("SET datafusion.runtime.max_spill_merge_fan_in = '-1'")
.await
.unwrap_err()
.to_string();
assert!(error.contains("Failed to parse non-negative integer"));
}

#[tokio::test]
async fn test_test_metadata_cache_limit() {
let ctx = SessionContext::new();
Expand Down
60 changes: 60 additions & 0 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tempfile::{Builder, NamedTempFile, TempDir};
use datafusion_common::human_readable_size;

pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
pub const DEFAULT_MAX_SPILL_MERGE_FAN_IN: usize = 0;

/// Builder pattern for the [DiskManager] structure
#[derive(Clone, Debug)]
Expand All @@ -40,13 +41,17 @@ pub struct DiskManagerBuilder {
/// The maximum amount of data (in bytes) stored inside the temporary directories.
/// Default to 100GB
max_temp_directory_size: u64,
/// Maximum number of spill files opened by one external merge pass.
/// A value of 0 means unlimited.
max_spill_merge_fan_in: usize,
}

impl Default for DiskManagerBuilder {
    /// Returns a builder in [`DiskManagerMode::OsTmpDirectory`] mode whose
    /// temp-directory size limit is [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`] and
    /// whose spill merge fan-in is [`DEFAULT_MAX_SPILL_MERGE_FAN_IN`].
    fn default() -> Self {
        let mode = DiskManagerMode::OsTmpDirectory;
        let max_temp_directory_size = DEFAULT_MAX_TEMP_DIRECTORY_SIZE;
        let max_spill_merge_fan_in = DEFAULT_MAX_SPILL_MERGE_FAN_IN;

        Self {
            mode,
            max_temp_directory_size,
            max_spill_merge_fan_in,
        }
    }
}
Expand All @@ -70,12 +75,22 @@ impl DiskManagerBuilder {
self
}

/// Sets the maximum number of spill files opened by one external merge pass.
///
/// A value of 0 means unlimited. The value is only recorded on the builder
/// here; `build` copies it into the `DiskManager` it creates.
pub fn set_max_spill_merge_fan_in(&mut self, value: usize) {
self.max_spill_merge_fan_in = value;
}

/// Consuming variant of `set_max_spill_merge_fan_in`: records `value` as the
/// maximum number of spill files opened by one external merge pass and
/// returns the builder so calls can be chained.
///
/// A value of 0 means unlimited.
pub fn with_max_spill_merge_fan_in(mut self, value: usize) -> Self {
    self.max_spill_merge_fan_in = value;
    self
}

/// Create a DiskManager given the builder
pub fn build(self) -> Result<DiskManager> {
match self.mode {
DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size),
max_spill_merge_fan_in: AtomicUsize::new(self.max_spill_merge_fan_in),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}),
Expand All @@ -87,13 +102,15 @@ impl DiskManagerBuilder {
Ok(DiskManager {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size),
max_spill_merge_fan_in: AtomicUsize::new(self.max_spill_merge_fan_in),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})
}
DiskManagerMode::Disabled => Ok(DiskManager {
local_dirs: Mutex::new(None),
max_temp_directory_size: AtomicU64::new(self.max_temp_directory_size),
max_spill_merge_fan_in: AtomicUsize::new(self.max_spill_merge_fan_in),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}),
Expand Down Expand Up @@ -170,6 +187,9 @@ pub struct DiskManager {
/// Default to 100GB. Stored as `AtomicU64` so it can be adjusted at runtime
/// without requiring exclusive (`&mut`) access to the `DiskManager`.
max_temp_directory_size: AtomicU64,
/// Maximum number of spill files opened by one external merge pass.
/// A value of 0 preserves the memory-driven, unbounded behavior.
max_spill_merge_fan_in: AtomicUsize,
/// Used disk space in the temporary directories. Now only spilled data for
/// external executors are counted.
used_disk_space: Arc<AtomicU64>,
Expand Down Expand Up @@ -201,6 +221,7 @@ impl DiskManager {
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
max_spill_merge_fan_in: AtomicUsize::new(DEFAULT_MAX_SPILL_MERGE_FAN_IN),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
Expand All @@ -214,13 +235,17 @@ impl DiskManager {
max_temp_directory_size: AtomicU64::new(
DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
),
max_spill_merge_fan_in: AtomicUsize::new(
DEFAULT_MAX_SPILL_MERGE_FAN_IN,
),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
max_spill_merge_fan_in: AtomicUsize::new(DEFAULT_MAX_SPILL_MERGE_FAN_IN),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
Expand Down Expand Up @@ -277,6 +302,22 @@ impl DiskManager {
self.max_temp_directory_size.load(Ordering::Relaxed)
}

/// Updates the maximum spill merge fan-in with an atomic store, so only a
/// shared reference to the `DiskManager` is needed.
///
/// Passing 0 removes the cap. A value of 1 is accepted, but external merge
/// code will still merge at least two spill streams to make progress.
pub fn set_max_spill_merge_fan_in(&self, max_spill_merge_fan_in: usize) {
    let limit = &self.max_spill_merge_fan_in;
    limit.store(max_spill_merge_fan_in, Ordering::Relaxed);
}

/// Reads the current cap on the number of spill files opened by one merge
/// pass.
///
/// A result of 0 means no cap is applied.
pub fn max_spill_merge_fan_in(&self) -> usize {
    let limit = &self.max_spill_merge_fan_in;
    limit.load(Ordering::Relaxed)
}

/// Returns the current spilling progress
pub fn spilling_progress(&self) -> SpillingProgress {
SpillingProgress {
Expand Down Expand Up @@ -875,6 +916,25 @@ mod tests {
Ok(())
}

/// The fan-in configured on the builder is visible on the built
/// `DiskManager`, and later updates through a shared handle take effect.
#[test]
fn test_max_spill_merge_fan_in_builder_and_dynamic_update() -> Result<()> {
    let builder = DiskManager::builder().with_max_spill_merge_fan_in(8);
    let dm = Arc::new(builder.build()?);

    // Value supplied at build time.
    assert_eq!(dm.max_spill_merge_fan_in(), 8);

    // Each runtime update, including 0, must be observable immediately.
    for fan_in in [4, 0] {
        dm.set_max_spill_merge_fan_in(fan_in);
        assert_eq!(dm.max_spill_merge_fan_in(), fan_in);
    }

    Ok(())
}

#[test]
fn test_disabled_disk_manager_rejects_nonzero_limit() -> Result<()> {
let dm = DiskManager::builder()
Expand Down
68 changes: 49 additions & 19 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,28 @@ impl Debug for RuntimeEnv {
/// This helper function defines the structure and metadata for all runtime configuration
/// entries to avoid duplication between `RuntimeEnv::config_entries()` and
/// `RuntimeEnvBuilder::entries()`.
fn create_runtime_config_entries(
struct RuntimeConfigValues {
memory_limit: Option<String>,
max_temp_directory_size: Option<String>,
max_spill_merge_fan_in: Option<String>,
temp_directory: Option<String>,
metadata_cache_limit: Option<String>,
list_files_cache_limit: Option<String>,
list_files_cache_ttl: Option<String>,
file_statistics_cache_limit: Option<String>,
) -> Vec<ConfigEntry> {
}

fn create_runtime_config_entries(values: RuntimeConfigValues) -> Vec<ConfigEntry> {
let RuntimeConfigValues {
memory_limit,
max_temp_directory_size,
max_spill_merge_fan_in,
temp_directory,
metadata_cache_limit,
list_files_cache_limit,
list_files_cache_ttl,
file_statistics_cache_limit,
} = values;
vec![
ConfigEntry {
key: "datafusion.runtime.memory_limit".to_string(),
Expand All @@ -116,6 +129,11 @@ fn create_runtime_config_entries(
value: max_temp_directory_size,
description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes) or '0' for 0. Example: '2G' for 2 gigabytes.",
},
ConfigEntry {
key: "datafusion.runtime.max_spill_merge_fan_in".to_string(),
value: max_spill_merge_fan_in,
description: "Maximum number of spill files opened by one external merge pass. Use 0 for unlimited. Values below 2 still use 2 so a merge can make progress.",
},
ConfigEntry {
key: "datafusion.runtime.temp_directory".to_string(),
value: temp_directory,
Expand Down Expand Up @@ -269,6 +287,8 @@ impl RuntimeEnv {

let max_temp_dir_size = self.disk_manager.max_temp_directory_size();
let max_temp_dir_value = format_byte_size(max_temp_dir_size);
let max_spill_merge_fan_in =
self.disk_manager.max_spill_merge_fan_in().to_string();

let temp_paths = self.disk_manager.temp_dir_paths();
let temp_dir_value = if temp_paths.is_empty() {
Expand Down Expand Up @@ -310,15 +330,16 @@ impl RuntimeEnv {
.expect("File statistics cache size conversion failed"),
);

create_runtime_config_entries(
memory_limit_value,
Some(max_temp_dir_value),
temp_dir_value,
Some(metadata_cache_value),
Some(list_files_cache_value),
create_runtime_config_entries(RuntimeConfigValues {
memory_limit: memory_limit_value,
max_temp_directory_size: Some(max_temp_dir_value),
max_spill_merge_fan_in: Some(max_spill_merge_fan_in),
temp_directory: temp_dir_value,
metadata_cache_limit: Some(metadata_cache_value),
list_files_cache_limit: Some(list_files_cache_value),
list_files_cache_ttl,
Some(file_statistics_cache_value),
)
file_statistics_cache_limit: Some(file_statistics_cache_value),
})
}
}

Expand Down Expand Up @@ -435,6 +456,14 @@ impl RuntimeEnvBuilder {
self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
}

/// Caps how many spill files a single external merge pass may open.
///
/// The limit is applied to the disk manager builder already configured on
/// this `RuntimeEnvBuilder`, or to a default one when none has been set.
/// A value of 0 means unlimited.
pub fn with_max_spill_merge_fan_in(mut self, fan_in: usize) -> Self {
    let disk_manager_builder = self
        .disk_manager_builder
        .take()
        .unwrap_or_default()
        .with_max_spill_merge_fan_in(fan_in);
    self.with_disk_manager_builder(disk_manager_builder)
}

/// Specify the limit of the file-embedded metadata cache, in bytes.
pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self {
self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit);
Expand Down Expand Up @@ -528,15 +557,16 @@ impl RuntimeEnvBuilder {

/// Returns a list of all available runtime configurations with their current values and descriptions
pub fn entries(&self) -> Vec<ConfigEntry> {
create_runtime_config_entries(
None,
Some("100G".to_string()),
None,
Some("50M".to_owned()),
Some("1M".to_owned()),
None,
Some("20M".to_owned()),
)
create_runtime_config_entries(RuntimeConfigValues {
memory_limit: None,
max_temp_directory_size: Some("100G".to_string()),
max_spill_merge_fan_in: Some("0".to_string()),
temp_directory: None,
metadata_cache_limit: Some("50M".to_owned()),
list_files_cache_limit: Some("1M".to_owned()),
list_files_cache_ttl: None,
file_statistics_cache_limit: Some("20M".to_owned()),
})
}

/// Generate documentation that can be included in the user guide
Expand Down
Loading