Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ futures = { workspace = true }
libmimalloc-sys = { version = "0.1", optional = true }
log = { workspace = true }
mimalloc = { version = "0.1", optional = true, default-features = false }
object_store = { workspace = true }
object_store = { workspace = true, features = ["fs"] }
parquet = { workspace = true, default-features = true }
rand = { workspace = true }
regex.workspace = true
Expand Down
43 changes: 26 additions & 17 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}
# Whether to simulate object store latency (mimics remote storage such as S3);
# off by default, enable with SIMULATE_LATENCY=true in the environment.
SIMULATE_LATENCY=${SIMULATE_LATENCY:-false}

# Build latency arg based on SIMULATE_LATENCY setting.
# LATENCY_ARG is appended to each benchmark invocation below; it expands to
# nothing when simulation is disabled.
LATENCY_ARG=""
if [ "$SIMULATE_LATENCY" = "true" ]; then
LATENCY_ARG="--simulate-latency"
fi

usage() {
echo "
Expand Down Expand Up @@ -141,6 +148,7 @@ CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
PREFER_HASH_JOIN Prefer hash join algorithm (default true)
SIMULATE_LATENCY Simulate object store latency to mimic S3 (default false)
DATAFUSION_* Set the given datafusion configuration
"
exit 1
Expand Down Expand Up @@ -371,6 +379,7 @@ main() {
echo "RESULTS_DIR: ${RESULTS_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "SIMULATE_LATENCY: ${SIMULATE_LATENCY}"
echo "***************************"

# navigate to the appropriate directory
Expand Down Expand Up @@ -655,7 +664,7 @@ run_tpch() {
echo "Running tpch benchmark..."

FORMAT=$2
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the tpch in memory (needs tpch parquet data)
Expand All @@ -671,7 +680,7 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the tpcds benchmark
Expand All @@ -691,7 +700,7 @@ run_tpcds() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpcds benchmark..."

debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- tpcds --iterations 5 --path "${TPCDS_DIR}" --query_path "../datafusion/core/tests/tpc-ds" --prefer_hash_join "${PREFER_HASH_JOIN}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the compile profile benchmark helper
Expand All @@ -713,7 +722,7 @@ run_cancellation() {
RESULTS_FILE="${RESULTS_DIR}/cancellation.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running cancellation benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}"
debug_run $CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}" ${LATENCY_ARG}
}


Expand Down Expand Up @@ -767,15 +776,15 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the clickbench benchmark with the partitioned parquet dataset (100 files)
# Runs the ClickBench queries against the partitioned parquet dataset
# (100 files under ${DATA_DIR}/hits_partitioned), writing JSON results to
# ${RESULTS_DIR}/clickbench_partitioned.json.
# Relies on globals: RESULTS_DIR, DATA_DIR, SCRIPT_DIR, CARGO_COMMAND,
# QUERY_ARG (optional query filter), LATENCY_ARG (optional --simulate-latency).
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
# QUERY_ARG/LATENCY_ARG are intentionally unquoted so empty values vanish
# and non-empty values word-split into separate flags.
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}


Expand All @@ -784,7 +793,7 @@ run_clickbench_pushdown() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_pushdown.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark with pushdown_filters=true, reorder_filters=true..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --pushdown --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}


Expand All @@ -793,7 +802,7 @@ run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors)
Expand Down Expand Up @@ -908,7 +917,7 @@ run_imdb() {
RESULTS_FILE="${RESULTS_DIR}/imdb.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running imdb benchmark..."
debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin imdb -- benchmark datafusion --iterations 5 --path "${IMDB_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

data_h2o() {
Expand Down Expand Up @@ -980,7 +989,7 @@ run_h2o() {
--path "${H2O_DIR}/${FILE_NAME}" \
--queries-path "${QUERY_FILE}" \
-o "${RESULTS_FILE}" \
${QUERY_ARG}
${QUERY_ARG} ${LATENCY_ARG}
}

# Utility function to run h2o join/window benchmark
Expand Down Expand Up @@ -1032,7 +1041,7 @@ h2o_runner() {
--join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \
--queries-path "${QUERY_FILE}" \
-o "${RESULTS_FILE}" \
${QUERY_ARG}
${QUERY_ARG} ${LATENCY_ARG}
}

# Runners for h2o join benchmark
Expand Down Expand Up @@ -1073,7 +1082,7 @@ run_sort_tpch() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort tpch benchmark..."

debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort tpch integration benchmark with limit 100 (topk)
Expand All @@ -1083,15 +1092,15 @@ run_topk_tpch() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running topk tpch benchmark..."

$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG}
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the nlj benchmark
# Runs the nested-loop-join (nlj) benchmark, writing JSON results to
# ${RESULTS_DIR}/nlj.json. Unlike most runners it takes no --path: the
# benchmark generates its own data.
# Relies on globals: RESULTS_DIR, CARGO_COMMAND, QUERY_ARG, LATENCY_ARG.
run_nlj() {
RESULTS_FILE="${RESULTS_DIR}/nlj.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running nlj benchmark..."
# QUERY_ARG/LATENCY_ARG stay unquoted on purpose: empty values disappear,
# non-empty values word-split into individual CLI flags.
debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the hj benchmark
Expand All @@ -1100,15 +1109,15 @@ run_hj() {
RESULTS_FILE="${RESULTS_DIR}/hj.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running hj benchmark..."
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the smj benchmark
# Runs the sort-merge-join (smj) benchmark, writing JSON results to
# ${RESULTS_DIR}/smj.json. No --path argument: the benchmark supplies its
# own input data.
# Relies on globals: RESULTS_DIR, CARGO_COMMAND, QUERY_ARG, LATENCY_ARG.
run_smj() {
RESULTS_FILE="${RESULTS_DIR}/smj.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running smj benchmark..."
# QUERY_ARG/LATENCY_ARG are unquoted so empty values expand to nothing and
# set values word-split into separate flags.
debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}


Expand Down Expand Up @@ -1250,7 +1259,7 @@ run_clickbench_sorted() {
--sorted-by "EventTime" \
-c datafusion.optimizer.prefer_existing_sort=true \
-o "${RESULTS_FILE}" \
${QUERY_ARG}
${QUERY_ARG} ${LATENCY_ARG}
}


Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ impl RunOpt {
}
}

let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);

self.register_hits(&ctx).await?;

Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ impl RunOpt {
};

let config = self.common.config()?;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);

// Register tables depending on which h2o benchmark is being run
// (groupby/join/window)
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/hj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ impl RunOpt {
};

let config = self.common.config()?;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);

if let Some(path) = &self.path {
for table in &["lineitem", "supplier", "nation", "customer"] {
Expand Down
6 changes: 4 additions & 2 deletions benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ impl RunOpt {
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().execution.hash_join_buffering_capacity =
self.hash_join_buffering_capacity;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);

// register tables
self.register_tables(&ctx).await?;
Expand Down Expand Up @@ -523,6 +523,7 @@ mod tests {
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
simulate_latency: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -560,6 +561,7 @@ mod tests {
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
simulate_latency: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/nlj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ impl RunOpt {
};

let config = self.common.config()?;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);

let mut benchmark_run = BenchmarkRun::new();
for query_id in query_range {
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/smj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ impl RunOpt {
let mut config = self.common.config()?;
// Disable hash joins to force SMJ
config = config.set_bool("datafusion.optimizer.prefer_hash_join", false);
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);

let mut benchmark_run = BenchmarkRun::new();
for query_id in query_range {
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ impl RunOpt {
/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config()?;
let rt_builder = self.common.runtime_env_builder()?;
let rt = self.common.build_runtime()?;
let state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(rt_builder.build_arc()?)
.with_runtime_env(rt)
.with_default_features()
.build();
let ctx = SessionContext::from(state);
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/tpcds/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ impl RunOpt {
self.enable_piecewise_merge_join;
config.options_mut().execution.hash_join_buffering_capacity =
self.hash_join_buffering_capacity;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);
// register tables
self.register_tables(&ctx).await?;

Expand Down
6 changes: 4 additions & 2 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ impl RunOpt {
self.enable_piecewise_merge_join;
config.options_mut().execution.hash_join_buffering_capacity =
self.hash_join_buffering_capacity;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);
// register tables
self.register_tables(&ctx).await?;

Expand Down Expand Up @@ -386,6 +386,7 @@ mod tests {
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
simulate_latency: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down Expand Up @@ -425,6 +426,7 @@ mod tests {
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
simulate_latency: false,
};
let opt = RunOpt {
query: Some(query),
Expand Down
34 changes: 32 additions & 2 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
// specific language governing permissions and limitations
// under the License.

use std::{num::NonZeroUsize, sync::Arc};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};

use clap::Args;
use datafusion::{
execution::{
disk_manager::DiskManagerBuilder,
memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool},
runtime_env::RuntimeEnvBuilder,
object_store::ObjectStoreUrl,
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
},
prelude::SessionConfig,
};
use datafusion_common::{DataFusionError, Result};
use object_store::{
local::LocalFileSystem,
throttle::{ThrottleConfig, ThrottledStore},
};

// Common benchmark options (don't use doc comments otherwise this doc
// shows up in help files)
Expand Down Expand Up @@ -61,6 +66,11 @@ pub struct CommonOpt {
/// Activate debug mode to see more details
#[arg(short, long)]
pub debug: bool,

/// Simulate object store latency to mimic remote storage (e.g. S3).
/// Adds a fixed delay to object store operations (100ms per get call,
/// 200ms per list call).
#[arg(long = "simulate-latency")]
pub simulate_latency: bool,
}

impl CommonOpt {
Expand Down Expand Up @@ -114,6 +124,26 @@ impl CommonOpt {
}
Ok(rt_builder)
}

/// Build the runtime environment, optionally wrapping the local filesystem
/// with a throttled object store to simulate remote storage latency.
/// Builds the runtime environment for this benchmark run.
///
/// When `--simulate-latency` was passed, the default `file://` object store
/// is replaced with a [`ThrottledStore`] wrapping [`LocalFileSystem`] that
/// injects a fixed delay per call (100ms per `get`, 200ms per `list`) to
/// approximate the behavior of remote storage such as S3.
pub fn build_runtime(&self) -> Result<Arc<RuntimeEnv>> {
    let runtime = self.runtime_env_builder()?.build_arc()?;

    // Fast path: no latency simulation requested.
    if !self.simulate_latency {
        return Ok(runtime);
    }

    // Fixed per-call delays; all other throttle fields keep their defaults.
    let throttle = ThrottleConfig {
        wait_get_per_call: Duration::from_millis(100),
        wait_list_per_call: Duration::from_millis(200),
        wait_list_with_delimiter_per_call: Duration::from_millis(200),
        ..Default::default()
    };
    let store: Arc<dyn object_store::ObjectStore> =
        Arc::new(ThrottledStore::new(LocalFileSystem::new(), throttle));

    // Re-register under the file:// scheme so local reads go through the
    // throttled wrapper instead of the plain local filesystem store.
    let url = ObjectStoreUrl::parse("file:///")?;
    runtime.register_object_store(url.as_ref(), store);
    println!("Simulating object store latency (get: 100ms, list: 200ms)");

    Ok(runtime)
}
}

/// Parse capacity limit from string to number of bytes by allowing units: K, M and G.
Expand Down
Loading