From d48688992e56eb02e15dbcee96bd335d3931daf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 22:03:13 +0100 Subject: [PATCH 01/10] Add --simulate-latency option to dfbench to simulate S3-like object store latency Wraps the local filesystem with ThrottledStore (100ms get, 200ms list) when enabled, allowing benchmarks to measure performance under realistic remote storage conditions. Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/src/clickbench.rs | 4 ++-- benchmarks/src/h2o.rs | 4 ++-- benchmarks/src/hj.rs | 4 ++-- benchmarks/src/imdb/run.rs | 4 ++-- benchmarks/src/nlj.rs | 4 ++-- benchmarks/src/smj.rs | 4 ++-- benchmarks/src/sort_tpch.rs | 4 ++-- benchmarks/src/tpcds/run.rs | 4 ++-- benchmarks/src/tpch/run.rs | 6 ++++-- benchmarks/src/util/options.rs | 36 ++++++++++++++++++++++++++++++++-- 10 files changed, 54 insertions(+), 20 deletions(-) diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index c0f911c566f4d..70aaeb7d2d192 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -207,8 +207,8 @@ impl RunOpt { } } - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); self.register_hits(&ctx).await?; diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index f55dad27cc638..8b6e04932cb39 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -84,8 +84,8 @@ impl RunOpt { }; let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // Register tables depending on which h2o benchmark is being run // (groupby/join/window) diff --git a/benchmarks/src/hj.rs 
b/benchmarks/src/hj.rs index 6eb828a3aedf8..301fe0d599cd6 100644 --- a/benchmarks/src/hj.rs +++ b/benchmarks/src/hj.rs @@ -324,8 +324,8 @@ impl RunOpt { }; let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); if let Some(path) = &self.path { for table in &["lineitem", "supplier", "nation", "customer"] { diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 29ca5249aa5b3..6d11c4a753dd7 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -312,8 +312,8 @@ impl RunOpt { config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; config.options_mut().execution.hash_join_buffering_capacity = self.hash_join_buffering_capacity; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // register tables self.register_tables(&ctx).await?; diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index ade8c0f7789bc..361cc35ec200c 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -207,8 +207,8 @@ impl RunOpt { }; let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); let mut benchmark_run = BenchmarkRun::new(); for query_id in query_range { diff --git a/benchmarks/src/smj.rs b/benchmarks/src/smj.rs index b420ef1d64c60..5056fd5096156 100644 --- a/benchmarks/src/smj.rs +++ b/benchmarks/src/smj.rs @@ -433,8 +433,8 @@ impl RunOpt { let mut config = self.common.config()?; // Disable hash joins to 
force SMJ config = config.set_bool("datafusion.optimizer.prefer_hash_join", false); - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); let mut benchmark_run = BenchmarkRun::new(); for query_id in query_range { diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 806f1f6c33d0f..95c90d826de20 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -209,10 +209,10 @@ impl RunOpt { /// Benchmark query `query_id` in `SORT_QUERIES` async fn benchmark_query(&self, query_id: usize) -> Result> { let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; + let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) - .with_runtime_env(rt_builder.build_arc()?) + .with_runtime_env(rt) .with_default_features() .build(); let ctx = SessionContext::from(state); diff --git a/benchmarks/src/tpcds/run.rs b/benchmarks/src/tpcds/run.rs index 8e24b121b2f93..f7ef6991515da 100644 --- a/benchmarks/src/tpcds/run.rs +++ b/benchmarks/src/tpcds/run.rs @@ -168,8 +168,8 @@ impl RunOpt { self.enable_piecewise_merge_join; config.options_mut().execution.hash_join_buffering_capacity = self.hash_join_buffering_capacity; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // register tables self.register_tables(&ctx).await?; diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 392e02f8478b7..0d1268013c168 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -129,8 +129,8 @@ impl RunOpt { self.enable_piecewise_merge_join; 
config.options_mut().execution.hash_join_buffering_capacity = self.hash_join_buffering_capacity; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // register tables self.register_tables(&ctx).await?; @@ -386,6 +386,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), @@ -425,6 +426,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index add8ff17fbf85..12224c75a2639 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -15,18 +15,23 @@ // specific language governing permissions and limitations // under the License. -use std::{num::NonZeroUsize, sync::Arc}; +use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use clap::Args; use datafusion::{ execution::{ disk_manager::DiskManagerBuilder, memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool}, - runtime_env::RuntimeEnvBuilder, + object_store::ObjectStoreUrl, + runtime_env::{RuntimeEnv, RuntimeEnvBuilder}, }, prelude::SessionConfig, }; use datafusion_common::{DataFusionError, Result}; +use object_store::{ + local::LocalFileSystem, + throttle::{ThrottleConfig, ThrottledStore}, +}; // Common benchmark options (don't use doc comments otherwise this doc // shows up in help files) @@ -61,6 +66,11 @@ pub struct CommonOpt { /// Activate debug mode to see more details #[arg(short, long)] pub debug: bool, + + /// Simulate object store latency to mimic remote storage (e.g. S3). + /// Adds a deterministic delay of roughly 25-400ms to each object store get and list call.
+ #[arg(long = "simulate-latency")] + pub simulate_latency: bool, } impl CommonOpt { @@ -114,6 +124,28 @@ impl CommonOpt { } Ok(rt_builder) } + + /// Build the runtime environment, optionally wrapping the local filesystem + /// with a throttled object store to simulate remote storage latency. + pub fn build_runtime(&self) -> Result> { + let rt = self.runtime_env_builder()?.build_arc()?; + if self.simulate_latency { + let config = ThrottleConfig { + wait_get_per_call: Duration::from_millis(100), + wait_list_per_call: Duration::from_millis(200), + wait_list_with_delimiter_per_call: Duration::from_millis(200), + ..Default::default() + }; + let throttled: Arc = + Arc::new(ThrottledStore::new(LocalFileSystem::new(), config)); + let url = ObjectStoreUrl::parse("file:///")?; + rt.register_object_store(url.as_ref(), throttled); + println!( + "Simulating object store latency (get: 100ms, list: 200ms)" + ); + } + Ok(rt) + } } /// Parse capacity limit from string to number of bytes by allowing units: K, M and G. 
From 508f1a33563d9be2f16e1266d82b0528d24104db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 22:03:52 +0100 Subject: [PATCH 02/10] fmt --- benchmarks/src/util/options.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index 12224c75a2639..e3626f386ed42 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -140,9 +140,7 @@ impl CommonOpt { Arc::new(ThrottledStore::new(LocalFileSystem::new(), config)); let url = ObjectStoreUrl::parse("file:///")?; rt.register_object_store(url.as_ref(), throttled); - println!( - "Simulating object store latency (get: 100ms, list: 200ms)" - ); + println!("Simulating object store latency (get: 100ms, list: 200ms)"); } Ok(rt) } From 1aec572d370d173483fef37f66522ee7269cc0b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 22:10:38 +0100 Subject: [PATCH 03/10] Add SIMULATE_LATENCY env var to bench.sh Passes --simulate-latency to all dfbench/imdb benchmark invocations when SIMULATE_LATENCY=true, e.g.: SIMULATE_LATENCY=true ./bench.sh run tpch Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/bench.sh | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 761efa6d591a4..0fc6ede3b3af4 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -42,6 +42,13 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..} DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data} CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"} PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true} +SIMULATE_LATENCY=${SIMULATE_LATENCY:-false} + +# Build latency arg based on SIMULATE_LATENCY setting +LATENCY_ARG="" +if [ "$SIMULATE_LATENCY" = "true" ]; then + LATENCY_ARG="--simulate-latency" +fi usage() { echo " @@ -141,6 +148,7 @@ CARGO_COMMAND command that runs the benchmark binary DATAFUSION_DIR 
directory to use (default $DATAFUSION_DIR) RESULTS_NAME folder where the benchmark files are stored PREFER_HASH_JOIN Prefer hash join algorithm (default true) +SIMULATE_LATENCY Simulate object store latency to mimic S3 (default false) DATAFUSION_* Set the given datafusion configuration " exit 1 @@ -371,6 +379,7 @@ main() { echo "RESULTS_DIR: ${RESULTS_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}" + echo "SIMULATE_LATENCY: ${SIMULATE_LATENCY}" echo "***************************" # navigate to the appropriate directory @@ -655,7 +664,7 @@ run_tpch() { echo "Running tpch benchmark..." FORMAT=$2 - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the tpch in memory (needs tpch parquet data) @@ -671,7 +680,7 @@ run_tpch_mem() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch_mem benchmark..." # -m means in memory - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the tpcds benchmark @@ -691,7 +700,7 @@ run_tpcds() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpcds benchmark..." 
- debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the compile profile benchmark helper @@ -713,7 +722,7 @@ run_cancellation() { RESULTS_FILE="${RESULTS_DIR}/cancellation.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running cancellation benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" + debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" ${LATENCY_ARG} } @@ -767,7 +776,7 @@ run_clickbench_1() { RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the clickbench benchmark with the partitioned parquet dataset (100 files) @@ -775,7 +784,7 @@ run_clickbench_partitioned() { RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark..." 
- debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -784,7 +793,7 @@ run_clickbench_pushdown() { RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -793,7 +802,7 @@ run_clickbench_extended() { RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) extended benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors) @@ -908,7 +917,7 @@ run_imdb() { RESULTS_FILE="${RESULTS_DIR}/imdb.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running imdb benchmark..." 
- debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } data_h2o() { @@ -980,7 +989,7 @@ run_h2o() { --path "${H2O_DIR}/${FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ -o "${RESULTS_FILE}" \ - ${QUERY_ARG} + ${QUERY_ARG} ${LATENCY_ARG} } # Utility function to run h2o join/window benchmark @@ -1032,7 +1041,7 @@ h2o_runner() { --join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ -o "${RESULTS_FILE}" \ - ${QUERY_ARG} + ${QUERY_ARG} ${LATENCY_ARG} } # Runners for h2o join benchmark @@ -1073,7 +1082,7 @@ run_sort_tpch() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running sort tpch benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the sort tpch integration benchmark with limit 100 (topk) @@ -1083,7 +1092,7 @@ run_topk_tpch() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running topk tpch benchmark..." - $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} + $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} ${LATENCY_ARG} } # Runs the nlj benchmark @@ -1091,7 +1100,7 @@ run_nlj() { RESULTS_FILE="${RESULTS_DIR}/nlj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running nlj benchmark..." 
- debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the hj benchmark @@ -1100,7 +1109,7 @@ run_hj() { RESULTS_FILE="${RESULTS_DIR}/hj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running hj benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the smj benchmark @@ -1108,7 +1117,7 @@ run_smj() { RESULTS_FILE="${RESULTS_DIR}/smj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running smj benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -1250,7 +1259,7 @@ run_clickbench_sorted() { --sorted-by "EventTime" \ -c datafusion.optimizer.prefer_existing_sort=true \ -o "${RESULTS_FILE}" \ - ${QUERY_ARG} + ${QUERY_ARG} ${LATENCY_ARG} } From 581ac01900f13ec73b44861bdfd657bd5de87f13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 22:35:14 +0100 Subject: [PATCH 04/10] Fix missing simulate_latency field in IMDB test CommonOpt initializers Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/src/imdb/run.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 6d11c4a753dd7..ca9710a920517 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -523,6 +523,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), @@ -560,6 +561,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: 
false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), From a28e6383bedf0deb50204b4020dcd6480d5d0370 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 22:37:12 +0100 Subject: [PATCH 05/10] Enable object_store "fs" feature for ThrottledStore LocalFileSystem support Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index cb4a308ceb516..bdf3bc8b3737a 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -48,7 +48,7 @@ futures = { workspace = true } libmimalloc-sys = { version = "0.1", optional = true } log = { workspace = true } mimalloc = { version = "0.1", optional = true, default-features = false } -object_store = { workspace = true } +object_store = { workspace = true, features = ["fs"] } parquet = { workspace = true, default-features = true } rand = { workspace = true } regex.workspace = true From fda2a3281b2183e5e7a33165c22f8cddbabe33cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 16 Mar 2026 08:37:54 +0100 Subject: [PATCH 06/10] Replace ThrottledStore with LatencyObjectStore for realistic S3-like latency simulation Instead of fixed latency via ThrottledStore, cycle through a distribution inspired by real S3 performance (P50: ~30ms, P75-P90: 100ms+, P99: 150-200ms). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/Cargo.toml | 2 + benchmarks/src/util/latency_object_store.rs | 156 ++++++++++++++++++++ benchmarks/src/util/mod.rs | 1 + benchmarks/src/util/options.rs | 25 ++-- 4 files changed, 169 insertions(+), 15 deletions(-) create mode 100644 benchmarks/src/util/latency_object_store.rs diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index bdf3bc8b3737a..0dc6ae02014dc 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -40,6 +40,8 @@ mimalloc_extended = ["libmimalloc-sys/extended"] [dependencies] arrow = { workspace = true } +async-trait = "0.1" +bytes = { workspace = true } clap = { version = "4.5.60", features = ["derive"] } datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } diff --git a/benchmarks/src/util/latency_object_store.rs b/benchmarks/src/util/latency_object_store.rs new file mode 100644 index 0000000000000..62541cca70e2a --- /dev/null +++ b/benchmarks/src/util/latency_object_store.rs @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An ObjectStore wrapper that adds simulated S3-like latency to get and list operations. +//! +//! 
Cycles through a fixed latency distribution inspired by real S3 performance: +//! - P50: ~30ms +//! - P75-P90: ~100-120ms +//! - P99: ~150-200ms + +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::path::Path; +use object_store::{ + CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, +}; + +/// Latency distribution to cycle through, inspired by S3 GET/LIST latencies. +/// Sorted so that most calls hit low latencies (simulating P50), +/// with occasional spikes (P90/P99). +const GET_LATENCIES_MS: &[u64] = &[ + 25, 28, 30, 30, 32, 35, // ~P50 range + 70, 85, 100, 110, // ~P75-P90 + 120, 150, // ~P95 + 180, 200, // ~P99 +]; + +const LIST_LATENCIES_MS: &[u64] = &[ + 40, 50, 55, 60, 65, 70, // ~P50 range + 120, 140, 160, 180, // ~P75-P90 + 200, 250, // ~P95 + 300, 400, // ~P99 +]; + +/// An ObjectStore wrapper that injects simulated latency on get and list calls. 
+#[derive(Debug)] +pub struct LatencyObjectStore { + inner: T, + get_counter: AtomicUsize, + list_counter: AtomicUsize, +} + +impl LatencyObjectStore { + pub fn new(inner: T) -> Self { + Self { + inner, + get_counter: AtomicUsize::new(0), + list_counter: AtomicUsize::new(0), + } + } + + fn next_get_latency(&self) -> Duration { + let idx = + self.get_counter.fetch_add(1, Ordering::Relaxed) % GET_LATENCIES_MS.len(); + Duration::from_millis(GET_LATENCIES_MS[idx]) + } + + fn next_list_latency(&self) -> Duration { + let idx = + self.list_counter.fetch_add(1, Ordering::Relaxed) % LIST_LATENCIES_MS.len(); + Duration::from_millis(LIST_LATENCIES_MS[idx]) + } +} + +impl fmt::Display for LatencyObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "LatencyObjectStore({})", self.inner) + } +} + +#[async_trait] +impl ObjectStore for LatencyObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + tokio::time::sleep(self.next_get_latency()).await; + self.inner.get_opts(location, options).await + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[std::ops::Range], + ) -> Result> { + tokio::time::sleep(self.next_get_latency()).await; + self.inner.get_ranges(location, ranges).await + } + + fn delete_stream( + &self, + locations: BoxStream<'static, Result>, + ) -> BoxStream<'static, Result> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + let latency = self.next_list_latency(); + let stream = self.inner.list(prefix); + futures::stream::once(async move { + tokio::time::sleep(latency).await; + 
futures::stream::empty() + }) + .flatten() + .chain(stream) + .boxed() + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + tokio::time::sleep(self.next_list_latency()).await; + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> Result<()> { + self.inner.copy_opts(from, to, options).await + } +} diff --git a/benchmarks/src/util/mod.rs b/benchmarks/src/util/mod.rs index ab4579a566f66..6dc11c0f425bd 100644 --- a/benchmarks/src/util/mod.rs +++ b/benchmarks/src/util/mod.rs @@ -16,6 +16,7 @@ // under the License. //! Shared benchmark utilities +pub mod latency_object_store; mod memory; mod options; mod run; diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index e3626f386ed42..abd36ab94cebb 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{num::NonZeroUsize, sync::Arc}; use clap::Args; use datafusion::{ @@ -28,10 +28,9 @@ use datafusion::{ prelude::SessionConfig, }; use datafusion_common::{DataFusionError, Result}; -use object_store::{ - local::LocalFileSystem, - throttle::{ThrottleConfig, ThrottledStore}, -}; +use object_store::local::LocalFileSystem; + +use super::latency_object_store::LatencyObjectStore; // Common benchmark options (don't use doc comments otherwise this doc // shows up in help files) @@ -130,17 +129,13 @@ impl CommonOpt { pub fn build_runtime(&self) -> Result> { let rt = self.runtime_env_builder()?.build_arc()?; if self.simulate_latency { - let config = ThrottleConfig { - wait_get_per_call: Duration::from_millis(100), - wait_list_per_call: Duration::from_millis(200), - wait_list_with_delimiter_per_call: Duration::from_millis(200), - ..Default::default() - }; - let throttled: Arc = - Arc::new(ThrottledStore::new(LocalFileSystem::new(), config)); + let store: Arc = + Arc::new(LatencyObjectStore::new(LocalFileSystem::new())); let url = ObjectStoreUrl::parse("file:///")?; - rt.register_object_store(url.as_ref(), throttled); - println!("Simulating object store latency (get: 100ms, list: 200ms)"); + rt.register_object_store(url.as_ref(), store); + println!( + "Simulating S3-like object store latency (get: 25-200ms, list: 40-400ms)" + ); } Ok(rt) } From f7c73508d354c8fc4039897fa0cf7d4bb139b544 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 16 Mar 2026 08:59:49 +0100 Subject: [PATCH 07/10] Fixes --- Cargo.lock | 2 ++ benchmarks/src/util/options.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 6914453b3da2c..168d3bd0c1812 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1792,6 +1792,8 @@ name = "datafusion-benchmarks" version = "52.3.0" dependencies = [ "arrow", + "async-trait", + "bytes", "clap", "datafusion", "datafusion-common", diff --git 
a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index f4437751dbe3e..a50a5268c0bfe 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -189,6 +189,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; // With env var set, builder should succeed and have a memory pool From f894ac17aa943d19c064a171e4830057ee2470b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 16 Mar 2026 09:29:14 +0100 Subject: [PATCH 08/10] Shuffle latency distributions and use separate GET/LIST lists Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/src/util/latency_object_store.rs | 23 ++++++++------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/benchmarks/src/util/latency_object_store.rs b/benchmarks/src/util/latency_object_store.rs index 62541cca70e2a..b338347d15e8f 100644 --- a/benchmarks/src/util/latency_object_store.rs +++ b/benchmarks/src/util/latency_object_store.rs @@ -35,21 +35,16 @@ use object_store::{ ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; -/// Latency distribution to cycle through, inspired by S3 GET/LIST latencies. -/// Sorted so that most calls hit low latencies (simulating P50), -/// with occasional spikes (P90/P99). -const GET_LATENCIES_MS: &[u64] = &[ - 25, 28, 30, 30, 32, 35, // ~P50 range - 70, 85, 100, 110, // ~P75-P90 - 120, 150, // ~P95 - 180, 200, // ~P99 -]; - +/// GET latency distribution, inspired by S3 latencies. +/// Deterministic but shuffled to avoid artificial patterns. +/// Distribution: P50 ~30ms, P75-P90 ~100ms, P99 ~200ms. +const GET_LATENCIES_MS: &[u64] = + &[30, 150, 28, 100, 35, 200, 30, 85, 120, 25, 110, 32, 180, 70]; + +/// LIST latency distribution, generally higher than GET. +/// Distribution: P50 ~55ms, P75-P90 ~150ms, P99 ~400ms. 
const LIST_LATENCIES_MS: &[u64] = &[ - 40, 50, 55, 60, 65, 70, // ~P50 range - 120, 140, 160, 180, // ~P75-P90 - 200, 250, // ~P95 - 300, 400, // ~P99 + 55, 250, 40, 160, 70, 400, 50, 140, 200, 60, 180, 65, 300, 120, ]; /// An ObjectStore wrapper that injects simulated latency on get and list calls. From edd6625668fb57fc5e9072dc2e11839ecfb56932 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 16 Mar 2026 09:30:19 +0100 Subject: [PATCH 09/10] Fix latency distributions to match target percentiles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GET: P50≈32ms, P90≈110ms, P99≈200ms LIST: P50≈65ms, P90≈180ms, P99≈400ms Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/src/util/latency_object_store.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/benchmarks/src/util/latency_object_store.rs b/benchmarks/src/util/latency_object_store.rs index b338347d15e8f..9ef8d1b78b751 100644 --- a/benchmarks/src/util/latency_object_store.rs +++ b/benchmarks/src/util/latency_object_store.rs @@ -37,14 +37,20 @@ use object_store::{ /// GET latency distribution, inspired by S3 latencies. /// Deterministic but shuffled to avoid artificial patterns. -/// Distribution: P50 ~30ms, P75-P90 ~100ms, P99 ~200ms. -const GET_LATENCIES_MS: &[u64] = - &[30, 150, 28, 100, 35, 200, 30, 85, 120, 25, 110, 32, 180, 70]; +/// 20 values: 11x P50 (~25-35ms), 5x P75-P90 (~70-110ms), 2x P95 (~120-150ms), 2x P99 (~180-200ms) +/// Sorted: 25,25,28,28,30,30,30,30,32,32,35, 70,85,100,100,110, 130,150, 180,200 +/// P50≈32ms, P90≈110ms, P99≈200ms +const GET_LATENCIES_MS: &[u64] = &[ + 30, 100, 25, 85, 32, 200, 28, 130, 35, 70, 30, 150, 30, 110, 28, 180, 32, 25, 100, 30, +]; /// LIST latency distribution, generally higher than GET. -/// Distribution: P50 ~55ms, P75-P90 ~150ms, P99 ~400ms. 
+/// 20 values: 11x P50 (~40-70ms), 5x P75-P90 (~120-180ms), 2x P95 (~200-250ms), 2x P99 (~300-400ms) +/// Sorted: 40,40,50,50,55,55,60,60,65,65,70, 120,140,160,160,180, 210,250, 300,400 +/// P50≈65ms, P90≈180ms, P99≈400ms const LIST_LATENCIES_MS: &[u64] = &[ - 55, 250, 40, 160, 70, 400, 50, 140, 200, 60, 180, 65, 300, 120, + 55, 160, 40, 140, 65, 400, 50, 210, 70, 120, 60, 250, 55, 180, 50, 300, 65, 40, 160, + 60, ]; /// An ObjectStore wrapper that injects simulated latency on get and list calls. From 4bab06840469f884353e7750dd8a5e8943daa31e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 16 Mar 2026 09:55:25 +0100 Subject: [PATCH 10/10] Revert object_store feature flag to workspace default Co-Authored-By: Claude Opus 4.6 (1M context) --- benchmarks/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 0dc6ae02014dc..56f7704309780 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -50,7 +50,7 @@ futures = { workspace = true } libmimalloc-sys = { version = "0.1", optional = true } log = { workspace = true } mimalloc = { version = "0.1", optional = true, default-features = false } -object_store = { workspace = true, features = ["fs"] } +object_store = { workspace = true } parquet = { workspace = true, default-features = true } rand = { workspace = true } regex.workspace = true