diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index d3c5d78040683..56e3cd6985ccc 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -335,7 +335,8 @@ fn get_session_config(args: &Args) -> Result { if batch_size == 0 { return config_err!("batch_size must be greater than 0"); } - config_options.execution.batch_size = batch_size; + config_options.execution.batch_size = + datafusion_common::config::ConfigNonZeroUsize::try_new(batch_size)?; }; // use easier to understand "tree" mode by default diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 536afbfed4613..cc263dfe3e619 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -21,7 +21,7 @@ use arrow_ipc::CompressionType; #[cfg(feature = "parquet_encryption")] use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; -use crate::error::_config_err; +use crate::error::{_config_datafusion_err, _config_err}; use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType}; use crate::parquet_config::DFParquetWriterVersion; use crate::parsers::{CompressionTypeVariant, CsvQuoteStyle}; @@ -33,6 +33,7 @@ use std::any::Any; use std::collections::{BTreeMap, HashMap}; use std::error::Error; use std::fmt::{self, Display}; +use std::num::NonZeroUsize; use std::str::FromStr; #[cfg(feature = "parquet_encryption")] use std::sync::Arc; @@ -582,6 +583,86 @@ impl Display for SpillCompression { } } +/// A `usize` configuration value that rejects zero when set from strings. +/// +/// Use this for options where zero is never a meaningful runtime value. +/// Invalid values return a configuration error through [`ConfigField`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ConfigNonZeroUsize(NonZeroUsize); + +/// Private helper for hard-coded defaults in `config_namespace!`, which cannot +/// use `?`. All external construction should use [`ConfigNonZeroUsize::try_new`]. +const fn non_zero_usize_default(value: usize) -> ConfigNonZeroUsize { + match NonZeroUsize::new(value) { + Some(value) => ConfigNonZeroUsize(value), + None => panic!("value must be greater than 0"), + } +} + +impl ConfigNonZeroUsize { + /// Creates a [`ConfigNonZeroUsize`], returning a configuration error if + /// `value` is zero. + pub fn try_new(value: usize) -> Result { + NonZeroUsize::new(value) + .map(Self) + .ok_or_else(|| _config_datafusion_err!("value must be greater than 0")) + } + + /// Returns the wrapped `usize`. + pub const fn get(self) -> usize { + self.0.get() + } +} + +impl From for usize { + fn from(value: ConfigNonZeroUsize) -> Self { + value.get() + } +} + +impl FromStr for ConfigNonZeroUsize { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + Self::try_new(default_config_transform(s)?) + } +} + +impl ConfigField for ConfigNonZeroUsize { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + if !key.is_empty() { + return _config_err!( + "Config field batch_size is a scalar ConfigNonZeroUsize and does not have nested field \"{}\"", + key + ); + } + + *self = ConfigNonZeroUsize::from_str(value)?; + Ok(()) + } + + fn reset(&mut self, key: &str) -> Result<()> { + if key.is_empty() { + Ok(()) + } else { + _config_err!( + "Config field batch_size is a scalar ConfigNonZeroUsize and does not have nested field \"{}\"", + key + ) + } + } +} + +impl Display for ConfigNonZeroUsize { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.get()) + } +} + /// Policy for handling duplicate keys in Spark-compatible map-construction /// functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors /// Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). @@ -649,7 +730,7 @@ config_namespace! { /// Default batch size while creating new batches, it's especially useful for /// buffer-in-memory batches since creating tiny batches would result in too much /// metadata memory consumption - pub batch_size: usize, default = 8192 + pub batch_size: ConfigNonZeroUsize, default = non_zero_usize_default(8192) /// A perfect hash join (see `HashJoinExec` for more details) will be considered /// if the range of keys (max - min) on the build side is < this threshold. diff --git a/datafusion/core/tests/config_from_env.rs b/datafusion/core/tests/config_from_env.rs index 6375d4e25d8eb..6b09a6367deaa 100644 --- a/datafusion/core/tests/config_from_env.rs +++ b/datafusion/core/tests/config_from_env.rs @@ -45,7 +45,7 @@ fn from_env() { // for valid testing env::set_var(env_key, "4096"); let config = ConfigOptions::from_env().unwrap(); - assert_eq!(config.execution.batch_size, 4096); + assert_eq!(config.execution.batch_size.get(), 4096); // for invalid testing env::set_var(env_key, "abc"); @@ -57,6 +57,6 @@ fn from_env() { env::remove_var(env_key); let config = ConfigOptions::from_env().unwrap(); - assert_eq!(config.execution.batch_size, 8192); // set to its default value + assert_eq!(config.execution.batch_size.get(), 8192); // set to its default value } } diff --git a/datafusion/core/tests/execution/datasource_split.rs b/datafusion/core/tests/execution/datasource_split.rs index 370249cd8044e..171e8736496a3 100644 --- a/datafusion/core/tests/execution/datasource_split.rs +++ b/datafusion/core/tests/execution/datasource_split.rs @@ -61,6 +61,7 @@ async fn datasource_splits_large_batches() -> datafusion_common::Result<()> { .options() .execution .batch_size + .get() ); let total: usize = batches.iter().map(|b| b.num_rows()).sum(); assert_eq!(total, batch_size); @@ -70,7 +71,7 @@ async fn datasource_splits_large_batches() -> datafusion_common::Result<()> { #[tokio::test] async fn datasource_exact_batch_size_no_split() -> datafusion_common::Result<()> { let session_config = datafusion_execution::config::SessionConfig::new(); - let configured_batch_size = session_config.options().execution.batch_size; + let configured_batch_size = session_config.options().execution.batch_size.get(); let batches = create_and_collect_batches(configured_batch_size).await?; diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index e6266b2c088d7..5dfcd50c014c9 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -648,7 +648,8 @@ async fn predicate_cache_stats_issue_19561() -> datafusion_common::Result<()> { let mut config = SessionConfig::new(); config.options_mut().execution.parquet.pushdown_filters = true; // force to get multiple batches to trigger repeated metric compound bug - config.options_mut().execution.batch_size = 1; + config.options_mut().execution.batch_size = + datafusion_common::config::ConfigNonZeroUsize::try_new(1)?; let ctx = SessionContext::new_with_config(config); // The cache is on by default, and used when filter pushdown is enabled PredicateCacheTest { diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 426e1fa745e54..942432239612e 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -570,7 +570,9 @@ fn test_suite_default_config_options() -> ConfigOptions { config.execution.target_partitions = 10; // Use a small batch size, to trigger RoundRobin in tests - config.execution.batch_size = 1; + config.execution.batch_size = + datafusion_common::config::ConfigNonZeroUsize::try_new(1) + .expect("test batch size must be greater than zero"); config } diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index a9f57a0793463..604d137540598 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -115,7 +115,7 @@ async fn test_multiple_configs() { assert!(result.is_ok(), "Should not fail due to memory limit"); let state = ctx.state(); - let batch_size = state.config().options().execution.batch_size; + let batch_size = state.config().options().execution.batch_size.get(); assert_eq!(batch_size, 2048); } diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index b2917a4583628..0a2a98eab6225 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -19,7 +19,7 @@ use std::{collections::HashMap, sync::Arc}; use datafusion_common::{ Result, ScalarValue, - config::{ConfigExtension, ConfigOptions, SpillCompression}, + config::{ConfigExtension, ConfigNonZeroUsize, ConfigOptions, SpillCompression}, extensions::Extensions, }; @@ -51,7 +51,7 @@ use datafusion_common::{ /// .set_bool("datafusion.execution.parquet.pushdown_filters", true); /// /// assert_eq!(config.batch_size(), 1234); -/// assert_eq!(config.options().execution.batch_size, 1234); +/// assert_eq!(config.options().execution.batch_size.get(), 1234); /// assert_eq!(config.options().execution.parquet.pushdown_filters, true); /// ``` /// @@ -60,15 +60,16 @@ use datafusion_common::{ /// /// ``` /// # use datafusion_execution::config::SessionConfig; -/// # use datafusion_common::ScalarValue; +/// # use datafusion_common::config::ConfigNonZeroUsize; /// # /// let mut config = SessionConfig::new(); -/// config.options_mut().execution.batch_size = 1234; +/// config.options_mut().execution.batch_size = ConfigNonZeroUsize::try_new(1234)?; /// config.options_mut().execution.parquet.pushdown_filters = true; /// # /// # assert_eq!(config.batch_size(), 1234); -/// # assert_eq!(config.options().execution.batch_size, 1234); +/// # assert_eq!(config.options().execution.batch_size.get(), 1234); /// # assert_eq!(config.options().execution.parquet.pushdown_filters, true); +/// # datafusion_common::Result::<()>::Ok(()) /// ``` /// /// ## Built-in options @@ -137,7 +138,7 @@ impl SessionConfig { /// use datafusion_execution::config::SessionConfig; /// /// let config = SessionConfig::new(); - /// assert!(config.options().execution.batch_size > 0); + /// assert!(config.options().execution.batch_size.get() > 0); /// ``` pub fn options(&self) -> &Arc { &self.options @@ -148,11 +149,13 @@ impl SessionConfig { /// Can be used to set configuration options. /// /// ``` + /// use datafusion_common::config::ConfigNonZeroUsize; /// use datafusion_execution::config::SessionConfig; /// /// let mut config = SessionConfig::new(); - /// config.options_mut().execution.batch_size = 1024; - /// assert_eq!(config.options().execution.batch_size, 1024); + /// config.options_mut().execution.batch_size = ConfigNonZeroUsize::try_new(1024)?; + /// assert_eq!(config.options().execution.batch_size.get(), 1024); + /// # datafusion_common::Result::<()>::Ok(()) /// ``` pub fn options_mut(&mut self) -> &mut ConfigOptions { Arc::make_mut(&mut self.options) @@ -186,9 +189,8 @@ impl SessionConfig { /// Customize batch size pub fn with_batch_size(mut self, n: usize) -> Self { - // batch size must be greater than zero - assert!(n > 0); - self.options_mut().execution.batch_size = n; + self.options_mut().execution.batch_size = + ConfigNonZeroUsize::try_new(n).expect("batch size must be greater than zero"); self } @@ -391,7 +393,7 @@ impl SessionConfig { /// Get the currently configured batch size pub fn batch_size(&self) -> usize { - self.options.execution.batch_size + self.options.execution.batch_size.get() } /// Enables or disables the coalescence of small batches into larger batches diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index 52baa7e6cf8ef..0e8eca6bc2561 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -484,7 +484,7 @@ impl TableProvider for GenerateSeriesTable { _filters: &[Expr], _limit: Option, ) -> Result> { - let batch_size = state.config_options().execution.batch_size; + let batch_size = state.config_options().execution.batch_size.get(); let generator = self.as_generator(batch_size)?; let mut exec = LazyMemoryExec::try_new(self.schema(), vec![generator])? .with_projection(projection.cloned()); diff --git a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs index 6d9550fa50072..76cb59a305a5f 100644 --- a/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/ensure_requirements/enforce_distribution.rs @@ -1080,7 +1080,7 @@ pub fn ensure_distribution( // When `false`, round robin repartition will not be added to increase parallelism let enable_round_robin = config.optimizer.enable_round_robin_repartition; let repartition_file_scans = config.optimizer.repartition_file_scans; - let batch_size = config.execution.batch_size; + let batch_size = config.execution.batch_size.get(); let should_use_estimates = config .execution .use_row_number_estimates_to_optimize_partitioning; diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index 94860e54caa57..efac83bbbe5ba 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -208,7 +208,7 @@ impl ExecutionPlan for AsyncFuncExec { input_stream, batch_coalescer: LimitedBatchCoalescer::new( Arc::clone(&self.input.schema()), - config_options_ref.execution.batch_size, + config_options_ref.execution.batch_size.get(), None, ), }; diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index dcf3a7baad435..77a7d8f8f2e11 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -468,7 +468,8 @@ mod tests { .with_memory_limit(20_000_000, 1.0) .build_arc()?; let mut config = SessionConfig::new(); - config.options_mut().execution.batch_size = target_batch_size; + config.options_mut().execution.batch_size = + datafusion_common::config::ConfigNonZeroUsize::try_new(target_batch_size)?; let task_ctx = TaskContext::default() .with_runtime(runtime) .with_session_config(config); diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index 6f58e5fb3100b..0514deba28a33 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -104,12 +104,12 @@ statement ok set datafusion.catalog.information_schema = true statement ok -SET datafusion.execution.batch_size to 0 +SET datafusion.execution.batch_size to 310104 query TT SHOW datafusion.execution.batch_size ---- -datafusion.execution.batch_size 0 +datafusion.execution.batch_size 310104 statement ok SET datafusion.execution.batch_size to '1' @@ -382,7 +382,7 @@ statement error DataFusion error: Invalid or Unsupported Configuration: Config v RESET datafusion.execution.batches_size # reset invalid variable - extra suffix on valid field -statement error DataFusion error: Invalid or Unsupported Configuration: Config field is a scalar usize and does not have nested field "bar" +statement error DataFusion error: Invalid or Unsupported Configuration: Config field batch_size is a scalar ConfigNonZeroUsize and does not have nested field "bar" RESET datafusion.execution.batch_size.bar ############################################# @@ -707,6 +707,10 @@ SET datafusion.runtime.list_files_cache_ttl = '1m18446744073709551555s' statement error DataFusion error: Error during planning: Duration has overflowed allowed maximum limit due to 'mins \* 60 \+ secs' when setting 'datafusion\.runtime\.list_files_cache_ttl' SET datafusion.runtime.list_files_cache_ttl = '1m18446744073709551556s' +# Set invalid value and ensures error +statement error DataFusion error: Invalid or Unsupported Configuration: value must be greater than 0 +SET datafusion.execution.batch_size = 0 + # Config reset statement ok RESET datafusion.catalog.create_default_catalog_and_schema