-
Notifications
You must be signed in to change notification settings - Fork 2k
[Minor] add non topk benchmarks for utf8/utf8view string aggregates #21073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,9 +117,9 @@ fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: boo | |
| black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap(); | ||
| } | ||
|
|
||
| fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) { | ||
| fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) -> String { | ||
| black_box(rt.block_on(async { aggregate_string(ctx, limit, use_topk).await })) | ||
| .unwrap(); | ||
| .unwrap() | ||
| } | ||
|
|
||
| fn run_distinct( | ||
|
|
@@ -187,7 +187,7 @@ async fn aggregate_string( | |
| ctx: SessionContext, | ||
| limit: usize, | ||
| use_topk: bool, | ||
| ) -> Result<()> { | ||
| ) -> Result<String> { | ||
| let sql = format!( | ||
| "select max(trace_id) from traces group by timestamp_ms order by max(trace_id) desc limit {limit};" | ||
| ); | ||
|
|
@@ -204,7 +204,7 @@ async fn aggregate_string( | |
| let batch = batches.first().unwrap(); | ||
| assert_eq!(batch.num_rows(), LIMIT); | ||
|
|
||
| Ok(()) | ||
| Ok(format!("{}", pretty_format_batches(&batches)?)) | ||
| } | ||
|
|
||
| async fn aggregate_distinct( | ||
|
|
@@ -290,152 +290,113 @@ fn criterion_benchmark(c: &mut Criterion) { | |
| let limit = LIMIT; | ||
| let partitions = 10; | ||
| let samples = 1_000_000; | ||
| let total_rows = partitions * samples; | ||
|
|
||
| // Numeric aggregate benchmarks | ||
| // (asc, use_topk, use_view) | ||
| let numeric_cases: &[(&str, bool, bool, bool)] = &[ | ||
| ("aggregate {rows} time-series rows", false, false, false), | ||
| ("aggregate {rows} worst-case rows", true, false, false), | ||
| ( | ||
| "top k={limit} aggregate {rows} time-series rows", | ||
| false, | ||
| true, | ||
| false, | ||
| ), | ||
| ( | ||
| "top k={limit} aggregate {rows} worst-case rows", | ||
| true, | ||
| true, | ||
| false, | ||
| ), | ||
| ( | ||
| "top k={limit} aggregate {rows} time-series rows [Utf8View]", | ||
| false, | ||
| true, | ||
| true, | ||
| ), | ||
| ( | ||
| "top k={limit} aggregate {rows} worst-case rows [Utf8View]", | ||
| true, | ||
| true, | ||
| true, | ||
| ), | ||
| ]; | ||
| for &(name_tpl, asc, use_topk, use_view) in numeric_cases { | ||
| let name = name_tpl | ||
| .replace("{rows}", &total_rows.to_string()) | ||
| .replace("{limit}", &limit.to_string()); | ||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, asc, use_topk, use_view)) | ||
| .unwrap(); | ||
| c.bench_function(&name, |b| { | ||
| b.iter(|| run(&rt, ctx.clone(), limit, use_topk, asc)) | ||
| }); | ||
| } | ||
|
|
||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, false, false, false)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!("aggregate {} time-series rows", partitions * samples).as_str(), | ||
| |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)), | ||
| ); | ||
|
|
||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, true, false, false)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!("aggregate {} worst-case rows", partitions * samples).as_str(), | ||
| |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)), | ||
| ); | ||
|
|
||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, false, true, false)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} aggregate {} time-series rows", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)), | ||
| ); | ||
|
|
||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, true, true, false)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} aggregate {} worst-case rows", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)), | ||
| ); | ||
|
|
||
| // Utf8View schema,time-series rows | ||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, false, true, true)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} aggregate {} time-series rows [Utf8View]", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)), | ||
| ); | ||
|
|
||
| // Utf8View schema,worst-case rows | ||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, true, true, true)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} aggregate {} worst-case rows [Utf8View]", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)), | ||
| ); | ||
|
|
||
| // String aggregate benchmarks - grouping by timestamp, aggregating string column | ||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, false, true, false)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} string aggregate {} time-series rows [Utf8]", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), | ||
| ); | ||
|
|
||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, true, true, false)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} string aggregate {} worst-case rows [Utf8]", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), | ||
| ); | ||
|
|
||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, false, true, true)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} string aggregate {} time-series rows [Utf8View]", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), | ||
| ); | ||
| for asc in [false, true] { | ||
| for use_topk in [false, true] { | ||
| let ctx_utf8 = rt | ||
| .block_on(create_context(partitions, samples, asc, use_topk, false)) | ||
| .unwrap(); | ||
| let ctx_view = rt | ||
| .block_on(create_context(partitions, samples, asc, use_topk, true)) | ||
| .unwrap(); | ||
| let result_utf8 = run_string(&rt, ctx_utf8, limit, use_topk); | ||
| let result_view = run_string(&rt, ctx_view, limit, use_topk); | ||
| assert_eq!( | ||
| result_utf8, result_view, | ||
| "Utf8 vs Utf8View mismatch for asc={asc}, use_topk={use_topk}" | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, true, true, true)) | ||
| .unwrap(); | ||
| c.bench_function( | ||
| format!( | ||
| "top k={limit} string aggregate {} worst-case rows [Utf8View]", | ||
| partitions * samples | ||
| ) | ||
| .as_str(), | ||
| |b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)), | ||
| ); | ||
| // String aggregate benchmarks | ||
| // (asc, use_topk, use_view) | ||
| let string_cases: &[(bool, bool, bool)] = &[ | ||
| (false, false, false), | ||
| (true, false, false), | ||
| (false, false, true), | ||
| (true, false, true), | ||
| (false, true, false), | ||
| (true, true, false), | ||
| (false, true, true), | ||
| (true, true, true), | ||
| ]; | ||
| for &(asc, use_topk, use_view) in string_cases { | ||
| let scenario = if asc { "worst-case" } else { "time-series" }; | ||
| let type_label = if use_view { "Utf8View" } else { "Utf8" }; | ||
| let name = if use_topk { | ||
| format!( | ||
| "top k={limit} string aggregate {total_rows} {scenario} rows [{type_label}]" | ||
| ) | ||
| } else { | ||
| format!("string aggregate {total_rows} {scenario} rows [{type_label}]") | ||
| }; | ||
| let ctx = rt | ||
| .block_on(create_context(partitions, samples, asc, use_topk, use_view)) | ||
| .unwrap(); | ||
| c.bench_function(&name, |b| { | ||
| b.iter(|| run_string(&rt, ctx.clone(), limit, use_topk)) | ||
| }); | ||
| } | ||
|
|
||
| // DISTINCT benchmarks | ||
| let ctx = rt.block_on(async { | ||
| create_context_distinct(partitions, samples, false) | ||
| .await | ||
| .unwrap() | ||
| }); | ||
| c.bench_function( | ||
| format!("distinct {} rows desc [no TopK]", partitions * samples).as_str(), | ||
| |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, false)), | ||
| ); | ||
|
|
||
| c.bench_function( | ||
| format!("distinct {} rows asc [no TopK]", partitions * samples).as_str(), | ||
| |b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, true)), | ||
| ); | ||
|
|
||
| let ctx_topk = rt.block_on(async { | ||
| create_context_distinct(partitions, samples, true) | ||
| .await | ||
| .unwrap() | ||
| }); | ||
| c.bench_function( | ||
| format!("distinct {} rows desc [TopK]", partitions * samples).as_str(), | ||
| |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, false)), | ||
| ); | ||
|
|
||
| c.bench_function( | ||
| format!("distinct {} rows asc [TopK]", partitions * samples).as_str(), | ||
| |b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, true)), | ||
| ); | ||
| for use_topk in [false, true] { | ||
| let ctx = rt.block_on(async { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This refactor now rebuilds the DISTINCT input/context once per (use_topk, asc) pair, even though asc only affects the query text and the old version shared one context per TopK mode. It is outside the timed loop, so not a benchmark-result bug, but it does add a lot of setup work for 10M rows. Could we hoist context creation by use_topk again, or extract a small helper that caches the two contexts?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I understand — I think I've resolved this by using and cloning the session (a shallow copy, as far as I can see), but can you check if possible? |
||
| create_context_distinct(partitions, samples, use_topk) | ||
| .await | ||
| .unwrap() | ||
| }); | ||
| let topk_label = if use_topk { "TopK" } else { "no TopK" }; | ||
| for asc in [false, true] { | ||
| let dir = if asc { "asc" } else { "desc" }; | ||
| let name = format!("distinct {total_rows} rows {dir} [{topk_label}]"); | ||
| c.bench_function(&name, |b| { | ||
| b.iter(|| run_distinct(&rt, ctx.clone(), limit, use_topk, asc)) | ||
| }); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| criterion_group!(benches, criterion_benchmark); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These new TopK-disabled string cases expand the matrix, but they still only go through run_string()/aggregate_string(), which currently checks row-count and whether the physical plan contains lim=[...]. That means this PR does not actually verify correctness across Utf8 and Utf8View group keys, even though that is part of the motivation. Can we strengthen the string benchmark path with an expected-result assertion (or add a dedicated helper/test that compares Utf8 vs Utf8View output for both TopK modes) so the new variants catch ordering/value regressions instead of only plan-shape changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrongly assumed we wanted to only check performance, but I understand. I've added an assert to check the results of each.