diff --git a/Cargo.lock b/Cargo.lock index 6914453b3da2c..168d3bd0c1812 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1792,6 +1792,8 @@ name = "datafusion-benchmarks" version = "52.3.0" dependencies = [ "arrow", + "async-trait", + "bytes", "clap", "datafusion", "datafusion-common", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index cb4a308ceb516..56f7704309780 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -40,6 +40,8 @@ mimalloc_extended = ["libmimalloc-sys/extended"] [dependencies] arrow = { workspace = true } +async-trait = "0.1" +bytes = { workspace = true } clap = { version = "4.5.60", features = ["derive"] } datafusion = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 761efa6d591a4..0fc6ede3b3af4 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -42,6 +42,13 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..} DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data} CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"} PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true} +SIMULATE_LATENCY=${SIMULATE_LATENCY:-false} + +# Build latency arg based on SIMULATE_LATENCY setting +LATENCY_ARG="" +if [ "$SIMULATE_LATENCY" = "true" ]; then + LATENCY_ARG="--simulate-latency" +fi usage() { echo " @@ -141,6 +148,7 @@ CARGO_COMMAND command that runs the benchmark binary DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) RESULTS_NAME folder where the benchmark files are stored PREFER_HASH_JOIN Prefer hash join algorithm (default true) +SIMULATE_LATENCY Simulate object store latency to mimic S3 (default false) DATAFUSION_* Set the given datafusion configuration " exit 1 @@ -371,6 +379,7 @@ main() { echo "RESULTS_DIR: ${RESULTS_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}" + echo "SIMULATE_LATENCY: ${SIMULATE_LATENCY}" echo "***************************" # navigate to the appropriate 
directory @@ -655,7 +664,7 @@ run_tpch() { echo "Running tpch benchmark..." FORMAT=$2 - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the tpch in memory (needs tpch parquet data) @@ -671,7 +680,7 @@ run_tpch_mem() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch_mem benchmark..." # -m means in memory - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the tpcds benchmark @@ -691,7 +700,7 @@ run_tpcds() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpcds benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the compile profile benchmark helper @@ -713,7 +722,7 @@ run_cancellation() { RESULTS_FILE="${RESULTS_DIR}/cancellation.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running cancellation benchmark..." 
- debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" + debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" ${LATENCY_ARG} } @@ -767,7 +776,7 @@ run_clickbench_1() { RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the clickbench benchmark with the partitioned parquet dataset (100 files) @@ -775,7 +784,7 @@ run_clickbench_partitioned() { RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -784,7 +793,7 @@ run_clickbench_pushdown() { RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..." 
- debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -793,7 +802,7 @@ run_clickbench_extended() { RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (1 file) extended benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors) @@ -908,7 +917,7 @@ run_imdb() { RESULTS_FILE="${RESULTS_DIR}/imdb.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running imdb benchmark..." 
- debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } data_h2o() { @@ -980,7 +989,7 @@ run_h2o() { --path "${H2O_DIR}/${FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ -o "${RESULTS_FILE}" \ - ${QUERY_ARG} + ${QUERY_ARG} ${LATENCY_ARG} } # Utility function to run h2o join/window benchmark @@ -1032,7 +1041,7 @@ h2o_runner() { --join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \ --queries-path "${QUERY_FILE}" \ -o "${RESULTS_FILE}" \ - ${QUERY_ARG} + ${QUERY_ARG} ${LATENCY_ARG} } # Runners for h2o join benchmark @@ -1073,7 +1082,7 @@ run_sort_tpch() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running sort tpch benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the sort tpch integration benchmark with limit 100 (topk) @@ -1083,7 +1092,7 @@ run_topk_tpch() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running topk tpch benchmark..." - $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} + $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} ${LATENCY_ARG} } # Runs the nlj benchmark @@ -1091,7 +1100,7 @@ run_nlj() { RESULTS_FILE="${RESULTS_DIR}/nlj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running nlj benchmark..." 
- debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the hj benchmark @@ -1100,7 +1109,7 @@ run_hj() { RESULTS_FILE="${RESULTS_DIR}/hj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running hj benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the smj benchmark @@ -1108,7 +1117,7 @@ run_smj() { RESULTS_FILE="${RESULTS_DIR}/smj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running smj benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } @@ -1250,7 +1259,7 @@ run_clickbench_sorted() { --sorted-by "EventTime" \ -c datafusion.optimizer.prefer_existing_sort=true \ -o "${RESULTS_FILE}" \ - ${QUERY_ARG} + ${QUERY_ARG} ${LATENCY_ARG} } diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index c0f911c566f4d..70aaeb7d2d192 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -207,8 +207,8 @@ impl RunOpt { } } - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); self.register_hits(&ctx).await?; diff --git a/benchmarks/src/h2o.rs b/benchmarks/src/h2o.rs index f55dad27cc638..8b6e04932cb39 100644 --- a/benchmarks/src/h2o.rs +++ b/benchmarks/src/h2o.rs @@ -84,8 +84,8 @@ impl RunOpt { }; let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = 
SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // Register tables depending on which h2o benchmark is being run // (groupby/join/window) diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs index 6eb828a3aedf8..301fe0d599cd6 100644 --- a/benchmarks/src/hj.rs +++ b/benchmarks/src/hj.rs @@ -324,8 +324,8 @@ impl RunOpt { }; let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); if let Some(path) = &self.path { for table in &["lineitem", "supplier", "nation", "customer"] { diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index 29ca5249aa5b3..ca9710a920517 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -312,8 +312,8 @@ impl RunOpt { config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; config.options_mut().execution.hash_join_buffering_capacity = self.hash_join_buffering_capacity; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // register tables self.register_tables(&ctx).await?; @@ -523,6 +523,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), @@ -560,6 +561,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), diff --git a/benchmarks/src/nlj.rs b/benchmarks/src/nlj.rs index ade8c0f7789bc..361cc35ec200c 100644 --- a/benchmarks/src/nlj.rs +++ b/benchmarks/src/nlj.rs @@ -207,8 
+207,8 @@ impl RunOpt { }; let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); let mut benchmark_run = BenchmarkRun::new(); for query_id in query_range { diff --git a/benchmarks/src/smj.rs b/benchmarks/src/smj.rs index b420ef1d64c60..5056fd5096156 100644 --- a/benchmarks/src/smj.rs +++ b/benchmarks/src/smj.rs @@ -433,8 +433,8 @@ impl RunOpt { let mut config = self.common.config()?; // Disable hash joins to force SMJ config = config.set_bool("datafusion.optimizer.prefer_hash_join", false); - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); let mut benchmark_run = BenchmarkRun::new(); for query_id in query_range { diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 806f1f6c33d0f..95c90d826de20 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -209,10 +209,10 @@ impl RunOpt { /// Benchmark query `query_id` in `SORT_QUERIES` async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> { let config = self.common.config()?; - let rt_builder = self.common.runtime_env_builder()?; + let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) - .with_runtime_env(rt_builder.build_arc()?)
+ .with_runtime_env(rt) .with_default_features() .build(); let ctx = SessionContext::from(state); diff --git a/benchmarks/src/tpcds/run.rs b/benchmarks/src/tpcds/run.rs index 8e24b121b2f93..f7ef6991515da 100644 --- a/benchmarks/src/tpcds/run.rs +++ b/benchmarks/src/tpcds/run.rs @@ -168,8 +168,8 @@ impl RunOpt { self.enable_piecewise_merge_join; config.options_mut().execution.hash_join_buffering_capacity = self.hash_join_buffering_capacity; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // register tables self.register_tables(&ctx).await?; diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 392e02f8478b7..0d1268013c168 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -129,8 +129,8 @@ impl RunOpt { self.enable_piecewise_merge_join; config.options_mut().execution.hash_join_buffering_capacity = self.hash_join_buffering_capacity; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + let rt = self.common.build_runtime()?; + let ctx = SessionContext::new_with_config_rt(config, rt); // register tables self.register_tables(&ctx).await?; @@ -386,6 +386,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), @@ -425,6 +426,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; let opt = RunOpt { query: Some(query), diff --git a/benchmarks/src/util/latency_object_store.rs b/benchmarks/src/util/latency_object_store.rs new file mode 100644 index 0000000000000..9ef8d1b78b751 --- /dev/null +++ b/benchmarks/src/util/latency_object_store.rs @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An ObjectStore wrapper that adds simulated S3-like latency to get and list operations. +//! +//! Cycles through a fixed latency distribution inspired by real S3 performance: +//! - P50: ~30ms +//! - P75-P90: ~100-120ms +//! - P99: ~150-200ms + +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::path::Path; +use object_store::{ + CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, +}; + +/// GET latency distribution, inspired by S3 latencies. +/// Deterministic but shuffled to avoid artificial patterns. +/// 20 values: 11x P50 (~25-35ms), 5x P75-P90 (~70-110ms), 2x P95 (~120-150ms), 2x P99 (~180-200ms) +/// Sorted: 25,25,28,28,30,30,30,30,32,32,35, 70,85,100,100,110, 130,150, 180,200 +/// P50≈32ms, P90≈110ms, P99≈200ms +const GET_LATENCIES_MS: &[u64] = &[ + 30, 100, 25, 85, 32, 200, 28, 130, 35, 70, 30, 150, 30, 110, 28, 180, 32, 25, 100, 30, +]; + +/// LIST latency distribution, generally higher than GET. 
+/// 20 values: 11x P50 (~40-70ms), 5x P75-P90 (~120-180ms), 2x P95 (~200-250ms), 2x P99 (~300-400ms) +/// Sorted: 40,40,50,50,55,55,60,60,65,65,70, 120,140,160,160,180, 210,250, 300,400 +/// P50≈65ms, P90≈180ms, P99≈400ms +const LIST_LATENCIES_MS: &[u64] = &[ + 55, 160, 40, 140, 65, 400, 50, 210, 70, 120, 60, 250, 55, 180, 50, 300, 65, 40, 160, + 60, +]; + +/// An ObjectStore wrapper that injects simulated latency on get and list calls. +#[derive(Debug)] +pub struct LatencyObjectStore<T: ObjectStore> { + inner: T, + get_counter: AtomicUsize, + list_counter: AtomicUsize, +} + +impl<T: ObjectStore> LatencyObjectStore<T> { + pub fn new(inner: T) -> Self { + Self { + inner, + get_counter: AtomicUsize::new(0), + list_counter: AtomicUsize::new(0), + } + } + + fn next_get_latency(&self) -> Duration { + let idx = + self.get_counter.fetch_add(1, Ordering::Relaxed) % GET_LATENCIES_MS.len(); + Duration::from_millis(GET_LATENCIES_MS[idx]) + } + + fn next_list_latency(&self) -> Duration { + let idx = + self.list_counter.fetch_add(1, Ordering::Relaxed) % LIST_LATENCIES_MS.len(); + Duration::from_millis(LIST_LATENCIES_MS[idx]) + } +} + +impl<T: ObjectStore> fmt::Display for LatencyObjectStore<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "LatencyObjectStore({})", self.inner) + } +} + +#[async_trait] +impl<T: ObjectStore> ObjectStore for LatencyObjectStore<T> { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result<PutResult> { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result<Box<dyn MultipartUpload>> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { + tokio::time::sleep(self.next_get_latency()).await; + self.inner.get_opts(location, options).await + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[std::ops::Range<u64>], + ) -> Result<Vec<bytes::Bytes>> { + tokio::time::sleep(self.next_get_latency()).await; + 
self.inner.get_ranges(location, ranges).await + } + + fn delete_stream( + &self, + locations: BoxStream<'static, Result<Path>>, + ) -> BoxStream<'static, Result<Path>> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { + let latency = self.next_list_latency(); + let stream = self.inner.list(prefix); + futures::stream::once(async move { + tokio::time::sleep(latency).await; + futures::stream::empty() + }) + .flatten() + .chain(stream) + .boxed() + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> Result<()> { + self.inner.copy_opts(from, to, options).await + } +} diff --git a/benchmarks/src/util/mod.rs b/benchmarks/src/util/mod.rs index ab4579a566f66..6dc11c0f425bd 100644 --- a/benchmarks/src/util/mod.rs +++ b/benchmarks/src/util/mod.rs @@ -16,6 +16,7 @@ // under the License. //!
Shared benchmark utilities +pub mod latency_object_store; mod memory; mod options; mod run; diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index bcb4379fbd652..a50a5268c0bfe 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -22,11 +22,15 @@ use datafusion::{ execution::{ disk_manager::DiskManagerBuilder, memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool}, - runtime_env::RuntimeEnvBuilder, + object_store::ObjectStoreUrl, + runtime_env::{RuntimeEnv, RuntimeEnvBuilder}, }, prelude::SessionConfig, }; use datafusion_common::{DataFusionError, Result}; +use object_store::{local::LocalFileSystem, ObjectStore}; + +use super::latency_object_store::LatencyObjectStore; // Common benchmark options (don't use doc comments otherwise this doc // shows up in help files) @@ -61,6 +65,11 @@ pub struct CommonOpt { /// Activate debug mode to see more details #[arg(short, long)] pub debug: bool, + + /// Simulate object store latency to mimic remote storage (e.g. S3). + /// Cycles through a deterministic, S3-inspired latency distribution + /// (get: 25-200ms, list: 40-400ms) on get and list operations. + #[arg(long = "simulate-latency")] + pub simulate_latency: bool, } impl CommonOpt { @@ -122,6 +131,22 @@ impl CommonOpt { } Ok(rt_builder) } + + /// Build the runtime environment, optionally wrapping the local filesystem + /// with a latency-injecting object store to simulate remote storage latency. + pub fn build_runtime(&self) -> Result<Arc<RuntimeEnv>> { + let rt = self.runtime_env_builder()?.build_arc()?; + if self.simulate_latency { + let store: Arc<dyn ObjectStore> = + Arc::new(LatencyObjectStore::new(LocalFileSystem::new())); + let url = ObjectStoreUrl::parse("file:///")?; + rt.register_object_store(url.as_ref(), store); + println!( + "Simulating S3-like object store latency (get: 25-200ms, list: 40-400ms)" + ); + } + Ok(rt) + } } /// Parse capacity limit from string to number of bytes by allowing units: K, M and G. 
@@ -164,6 +189,7 @@ mod tests { memory_limit: None, sort_spill_reservation_bytes: None, debug: false, + simulate_latency: false, }; // With env var set, builder should succeed and have a memory pool