Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 107 additions & 146 deletions datafusion/core/benches/topk_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: boo
black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap();
}

fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) {
fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) -> String {
black_box(rt.block_on(async { aggregate_string(ctx, limit, use_topk).await }))
.unwrap();
.unwrap()
}

fn run_distinct(
Expand Down Expand Up @@ -187,7 +187,7 @@ async fn aggregate_string(
ctx: SessionContext,
limit: usize,
use_topk: bool,
) -> Result<()> {
) -> Result<String> {
let sql = format!(
"select max(trace_id) from traces group by timestamp_ms order by max(trace_id) desc limit {limit};"
);
Expand All @@ -204,7 +204,7 @@ async fn aggregate_string(
let batch = batches.first().unwrap();
assert_eq!(batch.num_rows(), LIMIT);

Ok(())
Ok(format!("{}", pretty_format_batches(&batches)?))
}

async fn aggregate_distinct(
Expand Down Expand Up @@ -290,152 +290,113 @@ fn criterion_benchmark(c: &mut Criterion) {
let limit = LIMIT;
let partitions = 10;
let samples = 1_000_000;
let total_rows = partitions * samples;

// Numeric aggregate benchmarks
// (asc, use_topk, use_view)
let numeric_cases: &[(&str, bool, bool, bool)] = &[
("aggregate {rows} time-series rows", false, false, false),
("aggregate {rows} worst-case rows", true, false, false),
(
"top k={limit} aggregate {rows} time-series rows",
false,
true,
false,
),
(
"top k={limit} aggregate {rows} worst-case rows",
true,
true,
false,
),
(
"top k={limit} aggregate {rows} time-series rows [Utf8View]",
false,
true,
true,
),
(
"top k={limit} aggregate {rows} worst-case rows [Utf8View]",
true,
true,
true,
),
];
for &(name_tpl, asc, use_topk, use_view) in numeric_cases {
let name = name_tpl
.replace("{rows}", &total_rows.to_string())
.replace("{limit}", &limit.to_string());
let ctx = rt
.block_on(create_context(partitions, samples, asc, use_topk, use_view))
.unwrap();
c.bench_function(&name, |b| {
b.iter(|| run(&rt, ctx.clone(), limit, use_topk, asc))
});
}

let ctx = rt
.block_on(create_context(partitions, samples, false, false, false))
.unwrap();
c.bench_function(
format!("aggregate {} time-series rows", partitions * samples).as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
);

let ctx = rt
.block_on(create_context(partitions, samples, true, false, false))
.unwrap();
c.bench_function(
format!("aggregate {} worst-case rows", partitions * samples).as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
);

let ctx = rt
.block_on(create_context(partitions, samples, false, true, false))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} time-series rows",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
);

let ctx = rt
.block_on(create_context(partitions, samples, true, true, false))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} worst-case rows",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
);

// Utf8View schema,time-series rows
let ctx = rt
.block_on(create_context(partitions, samples, false, true, true))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} time-series rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
);

// Utf8View schema,worst-case rows
let ctx = rt
.block_on(create_context(partitions, samples, true, true, true))
.unwrap();
c.bench_function(
format!(
"top k={limit} aggregate {} worst-case rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
);

// String aggregate benchmarks - grouping by timestamp, aggregating string column
let ctx = rt
.block_on(create_context(partitions, samples, false, true, false))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} time-series rows [Utf8]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
);

let ctx = rt
.block_on(create_context(partitions, samples, true, true, false))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} worst-case rows [Utf8]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
);

let ctx = rt
.block_on(create_context(partitions, samples, false, true, true))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} time-series rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
);
for asc in [false, true] {
for use_topk in [false, true] {
let ctx_utf8 = rt
.block_on(create_context(partitions, samples, asc, use_topk, false))
.unwrap();
let ctx_view = rt
.block_on(create_context(partitions, samples, asc, use_topk, true))
.unwrap();
let result_utf8 = run_string(&rt, ctx_utf8, limit, use_topk);
let result_view = run_string(&rt, ctx_view, limit, use_topk);
assert_eq!(
result_utf8, result_view,
"Utf8 vs Utf8View mismatch for asc={asc}, use_topk={use_topk}"
);
}
}

let ctx = rt
.block_on(create_context(partitions, samples, true, true, true))
.unwrap();
c.bench_function(
format!(
"top k={limit} string aggregate {} worst-case rows [Utf8View]",
partitions * samples
)
.as_str(),
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
);
// String aggregate benchmarks
// (asc, use_topk, use_view)
let string_cases: &[(bool, bool, bool)] = &[
(false, false, false),
(true, false, false),
(false, false, true),
(true, false, true),
(false, true, false),
(true, true, false),
(false, true, true),
(true, true, true),
];
for &(asc, use_topk, use_view) in string_cases {
let scenario = if asc { "worst-case" } else { "time-series" };
let type_label = if use_view { "Utf8View" } else { "Utf8" };
let name = if use_topk {
format!(
"top k={limit} string aggregate {total_rows} {scenario} rows [{type_label}]"
)
} else {
format!("string aggregate {total_rows} {scenario} rows [{type_label}]")
};
let ctx = rt
.block_on(create_context(partitions, samples, asc, use_topk, use_view))
.unwrap();
c.bench_function(&name, |b| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These new TopK-disabled string cases expand the matrix, but they still only go through run_string()/aggregate_string(), which currently checks row-count and whether the physical plan contains lim=[...]. That means this PR does not actually verify correctness across Utf8 and Utf8View group keys, even though that is part of the motivation. Can we strengthen the string benchmark path with an expected-result assertion (or add a dedicated helper/test that compares Utf8 vs Utf8View output for both TopK modes) so the new variants catch ordering/value regressions instead of only plan-shape changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrongly assumed we only wanted to check performance, but I understand now. I've added an assert to compare the results of each variant.

b.iter(|| run_string(&rt, ctx.clone(), limit, use_topk))
});
}

// DISTINCT benchmarks
let ctx = rt.block_on(async {
create_context_distinct(partitions, samples, false)
.await
.unwrap()
});
c.bench_function(
format!("distinct {} rows desc [no TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, false)),
);

c.bench_function(
format!("distinct {} rows asc [no TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx.clone(), limit, false, true)),
);

let ctx_topk = rt.block_on(async {
create_context_distinct(partitions, samples, true)
.await
.unwrap()
});
c.bench_function(
format!("distinct {} rows desc [TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, false)),
);

c.bench_function(
format!("distinct {} rows asc [TopK]", partitions * samples).as_str(),
|b| b.iter(|| run_distinct(&rt, ctx_topk.clone(), limit, true, true)),
);
for use_topk in [false, true] {
let ctx = rt.block_on(async {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refactor now rebuilds the DISTINCT input/context once per (use_topk, asc) pair, even though asc only affects the query text and the old version shared one context per TopK mode. It is outside the timed loop, so not a benchmark-result bug, but it does add a lot of setup work for 10M rows. Could we hoist context creation by use_topk again, or extract a small helper that caches the two contexts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. I think I've resolved this by reusing and cloning the session context (a shallow copy, as far as I can see) — could you check whether that works?

create_context_distinct(partitions, samples, use_topk)
.await
.unwrap()
});
let topk_label = if use_topk { "TopK" } else { "no TopK" };
for asc in [false, true] {
let dir = if asc { "asc" } else { "desc" };
let name = format!("distinct {total_rows} rows {dir} [{topk_label}]");
c.bench_function(&name, |b| {
b.iter(|| run_distinct(&rt, ctx.clone(), limit, use_topk, asc))
});
}
}
}

criterion_group!(benches, criterion_benchmark);
Expand Down
Loading