diff --git a/datafusion/core/benches/topk_aggregate.rs b/datafusion/core/benches/topk_aggregate.rs index f71cf1087be7d..502db30f7a036 100644 --- a/datafusion/core/benches/topk_aggregate.rs +++ b/datafusion/core/benches/topk_aggregate.rs @@ -117,9 +117,9 @@ fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: boo black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap(); } -fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) { +fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) -> String { black_box(rt.block_on(async { aggregate_string(ctx, limit, use_topk).await })) - .unwrap(); + .unwrap() } fn run_distinct( @@ -187,7 +187,7 @@ async fn aggregate_string( ctx: SessionContext, limit: usize, use_topk: bool, -) -> Result<()> { +) -> Result<String> { let sql = format!( "select max(trace_id) from traces group by timestamp_ms order by max(trace_id) desc limit {limit};" ); @@ -204,7 +204,7 @@ async fn aggregate_string( let batch = batches.first().unwrap(); assert_eq!(batch.num_rows(), LIMIT); - Ok(()) + Ok(format!("{}", pretty_format_batches(&batches)?)) } async fn aggregate_distinct( @@ -290,152 +290,113 @@ fn criterion_benchmark(c: &mut Criterion) { let limit = LIMIT; let partitions = 10; let samples = 1_000_000; + let total_rows = partitions * samples; + + // Numeric aggregate benchmarks + // (asc, use_topk, use_view) + let numeric_cases: &[(&str, bool, bool, bool)] = &[ + ("aggregate {rows} time-series rows", false, false, false), + ("aggregate {rows} worst-case rows", true, false, false), + ( + "top k={limit} aggregate {rows} time-series rows", + false, + true, + false, + ), + ( + "top k={limit} aggregate {rows} worst-case rows", + true, + true, + false, + ), + ( + "top k={limit} aggregate {rows} time-series rows [Utf8View]", + false, + true, + true, + ), + ( + "top k={limit} aggregate {rows} worst-case rows [Utf8View]", + true, + true, + true, + ), + ]; + for 
&(name_tpl, asc, use_topk, use_view) in numeric_cases { + let name = name_tpl + .replace("{rows}", &total_rows.to_string()) + .replace("{limit}", &limit.to_string()); + let ctx = rt + .block_on(create_context(partitions, samples, asc, use_topk, use_view)) + .unwrap(); + c.bench_function(&name, |b| { + b.iter(|| run(&rt, ctx.clone(), limit, use_topk, asc)) + }); + } - let ctx = rt - .block_on(create_context(partitions, samples, false, false, false)) - .unwrap(); - c.bench_function( - format!("aggregate {} time-series rows", partitions * samples).as_str(), - |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)), - ); - - let ctx = rt - .block_on(create_context(partitions, samples, true, false, false)) - .unwrap(); - c.bench_function( - format!("aggregate {} worst-case rows", partitions * samples).as_str(), - |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)), - ); - - let ctx = rt - .block_on(create_context(partitions, samples, false, true, false)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} aggregate {} time-series rows", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)), - ); - - let ctx = rt - .block_on(create_context(partitions, samples, true, true, false)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} aggregate {} worst-case rows", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)), - ); - - // Utf8View schema,time-series rows - let ctx = rt - .block_on(create_context(partitions, samples, false, true, true)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} aggregate {} time-series rows [Utf8View]", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)), - ); - - // Utf8View schema,worst-case rows - let ctx = rt - .block_on(create_context(partitions, samples, true, true, true)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} aggregate {} worst-case rows 
[Utf8View]", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)), - ); - - // String aggregate benchmarks - grouping by timestamp, aggregating string column - let ctx = rt - .block_on(create_context(partitions, samples, false, true, false)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} string aggregate {} time-series rows [Utf8]", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), - ); - - let ctx = rt - .block_on(create_context(partitions, samples, true, true, false)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} string aggregate {} worst-case rows [Utf8]", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), - ); - - let ctx = rt - .block_on(create_context(partitions, samples, false, true, true)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} string aggregate {} time-series rows [Utf8View]", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), - ); + for asc in [false, true] { + for use_topk in [false, true] { + let ctx_utf8 = rt + .block_on(create_context(partitions, samples, asc, use_topk, false)) + .unwrap(); + let ctx_view = rt + .block_on(create_context(partitions, samples, asc, use_topk, true)) + .unwrap(); + let result_utf8 = run_string(&rt, ctx_utf8, limit, use_topk); + let result_view = run_string(&rt, ctx_view, limit, use_topk); + assert_eq!( + result_utf8, result_view, + "Utf8 vs Utf8View mismatch for asc={asc}, use_topk={use_topk}" + ); + } + } - let ctx = rt - .block_on(create_context(partitions, samples, true, true, true)) - .unwrap(); - c.bench_function( - format!( - "top k={limit} string aggregate {} worst-case rows [Utf8View]", - partitions * samples - ) - .as_str(), - |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), - ); + // String aggregate benchmarks + // (asc, use_topk, use_view) + let string_cases: 
&[(bool, bool, bool)] = &[ + (false, false, false), + (true, false, false), + (false, false, true), + (true, false, true), + (false, true, false), + (true, true, false), + (false, true, true), + (true, true, true), + ]; + for &(asc, use_topk, use_view) in string_cases { + let scenario = if asc { "worst-case" } else { "time-series" }; + let type_label = if use_view { "Utf8View" } else { "Utf8" }; + let name = if use_topk { + format!( + "top k={limit} string aggregate {total_rows} {scenario} rows [{type_label}]" + ) + } else { + format!("string aggregate {total_rows} {scenario} rows [{type_label}]") + }; + let ctx = rt + .block_on(create_context(partitions, samples, asc, use_topk, use_view)) + .unwrap(); + c.bench_function(&name, |b| { + b.iter(|| run_string(&rt, ctx.clone(), limit, use_topk)) + }); + } // DISTINCT benchmarks - let ctx = rt.block_on(async { - create_context_distinct(partitions, samples, false) - .await - .unwrap() - }); - c.bench_function( - format!("distinct {} rows desc [no TopK]", partitions * samples).as_str(), - |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, false)), - ); - - c.bench_function( - format!("distinct {} rows asc [no TopK]", partitions * samples).as_str(), - |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, true)), - ); - - let ctx_topk = rt.block_on(async { - create_context_distinct(partitions, samples, true) - .await - .unwrap() - }); - c.bench_function( - format!("distinct {} rows desc [TopK]", partitions * samples).as_str(), - |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, false)), - ); - - c.bench_function( - format!("distinct {} rows asc [TopK]", partitions * samples).as_str(), - |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, true)), - ); + for use_topk in [false, true] { + let ctx = rt.block_on(async { + create_context_distinct(partitions, samples, use_topk) + .await + .unwrap() + }); + let topk_label = if use_topk { "TopK" } else { "no TopK" }; + for asc in [false, true] { 
+ let dir = if asc { "asc" } else { "desc" }; + let name = format!("distinct {total_rows} rows {dir} [{topk_label}]"); + c.bench_function(&name, |b| { + b.iter(|| run_distinct(&rt, ctx.clone(), limit, use_topk, asc)) + }); + } + } } criterion_group!(benches, criterion_benchmark);