From e3355defccb173e8f93202187733371cc326911b Mon Sep 17 00:00:00 2001 From: Richard Date: Sun, 3 May 2026 12:41:23 -0400 Subject: [PATCH 1/2] Add benchmarks for dictionary path of new_group_values --- datafusion/physical-plan/Cargo.toml | 4 + .../benches/dictionary_group_values.rs | 175 ++++++++++++++++++ 2 files changed, 179 insertions(+) create mode 100644 datafusion/physical-plan/benches/dictionary_group_values.rs diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 7acb21b8f3b93..c69f23ee648ca 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -106,3 +106,7 @@ required-features = ["test_utils"] harness = false name = "aggregate_vectorized" required-features = ["test_utils"] + +[[bench]] +harness = false +name = "dictionary_group_values" diff --git a/datafusion/physical-plan/benches/dictionary_group_values.rs b/datafusion/physical-plan/benches/dictionary_group_values.rs new file mode 100644 index 0000000000000..f612564d46a07 --- /dev/null +++ b/datafusion/physical-plan/benches/dictionary_group_values.rs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for `GroupValues` over a single `Dictionary` +//! column. Each iteration is timed end-to-end: it constructs the +//! `Box` returned by `new_group_values`, runs `intern` +//! once (or N times), and then `emit(EmitTo::All)`. + +use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray}; +use arrow::buffer::{Buffer, NullBuffer}; +use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef}; +use criterion::{ + BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main, +}; +use datafusion_expr::EmitTo; +use datafusion_physical_plan::aggregates::group_values::new_group_values; +use datafusion_physical_plan::aggregates::order::GroupOrdering; +use rand::rngs::StdRng; +use rand::seq::SliceRandom; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +const SIZES: [usize; 2] = [8 * 1024, 64 * 1024]; +const CARDS_RELATIVE: [usize; 4] = [20, 75, 300, 8 * 1024]; +const N_BATCHES: usize = 4; +// Fixed for reproducibility. +const SEED: u64 = 0xD1C7; + +fn dict_schema() -> SchemaRef { + let dict_ty = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + Arc::new(Schema::new(vec![Field::new("g", dict_ty, true)])) +} + +/// Build a `Dictionary` column. +fn make_dict(size: usize, cardinality: usize, null_density: f32, seed: u64) -> ArrayRef { + let strings: Vec = (0..cardinality).map(|i| format!("v_{i:08}")).collect(); + let values = Arc::new(StringArray::from( + strings.iter().map(String::as_str).collect::>(), + )); + + let mut rng = StdRng::seed_from_u64(seed); + let keys: Vec = if cardinality == size { + let mut perm: Vec = (0..size as i32).collect(); + perm.shuffle(&mut rng); + perm + } else { + (0..size) + .map(|_| rng.random_range(0..cardinality) as i32) + .collect() + }; + let keys_buf = Buffer::from_slice_ref(&keys); + + let nulls: Option = (null_density > 0.0).then(|| { + (0..size) + .map(|_| !rng.random_bool(null_density as f64)) + .collect() + }); + + let key_array = PrimitiveArray::::new(keys_buf.into(), nulls); + Arc::new(DictionaryArray::::try_new(key_array, values).unwrap()) +} + +fn bench_id( + label: &str, + size: usize, + cardinality: usize, + null_density: f32, +) -> BenchmarkId { + BenchmarkId::new( + label, + format!("size_{size}_card_{cardinality}_null_{null_density:.2}"), + ) +} + +fn bench_intern_emit(c: &mut Criterion) { + let mut group = c.benchmark_group("dict_intern_emit"); + let schema = dict_schema(); + let null_density = 0.0; + + for &size in &SIZES { + let mut cards = CARDS_RELATIVE.to_vec(); + cards.push(size); // all-unique stress case + for cardinality in cards { + let array = make_dict(size, cardinality, null_density, SEED); + group.throughput(Throughput::Elements(size as u64)); + group.bench_function( + bench_id("intern_emit", size, cardinality, null_density), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values(schema.clone(), &GroupOrdering::None) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + gv.intern(std::slice::from_ref(&array), groups).unwrap(); + black_box(&*groups); + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); + } + } + group.finish(); +} + +fn bench_repeated_intern_emit(c: &mut Criterion) { + let mut group = c.benchmark_group("dict_repeated_intern_emit"); + let schema = dict_schema(); + let null_density = 0.10; + + for &size in &SIZES { + let mut cards = CARDS_RELATIVE.to_vec(); + cards.push(size); + for cardinality in cards { + let batches: Vec = (0..N_BATCHES) + .map(|i| { + make_dict( + size, + cardinality, + null_density, + SEED.wrapping_add(i as u64), + ) + }) + .collect(); + group.throughput(Throughput::Elements((size * N_BATCHES) as u64)); + group.bench_function( + bench_id("repeated_intern_emit", size, cardinality, null_density), + |b| { + b.iter_batched_ref( + || { + ( + new_group_values(schema.clone(), &GroupOrdering::None) + .unwrap(), + Vec::::with_capacity(size), + ) + }, + |(gv, groups)| { + for arr in &batches { + gv.intern(std::slice::from_ref(arr), groups).unwrap(); + black_box(&*groups); + } + black_box(gv.emit(EmitTo::All).unwrap()); + }, + BatchSize::SmallInput, + ); + }, + ); + } + } + group.finish(); +} + +criterion_group!(benches, bench_intern_emit, bench_repeated_intern_emit); +criterion_main!(benches); From 235e11a7062f16cb25f91cb7eb86b663deb951dd Mon Sep 17 00:00:00 2001 From: Richard Date: Mon, 4 May 2026 15:53:27 -0400 Subject: [PATCH 2/2] revised after PR comments --- .../physical-plan/benches/dictionary_group_values.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/benches/dictionary_group_values.rs b/datafusion/physical-plan/benches/dictionary_group_values.rs index f612564d46a07..ded52aebd1100 100644 --- a/datafusion/physical-plan/benches/dictionary_group_values.rs +++ b/datafusion/physical-plan/benches/dictionary_group_values.rs @@ -16,9 +16,10 @@ // under the License. //! Benchmarks for `GroupValues` over a single `Dictionary` -//! column. Each iteration is timed end-to-end: it constructs the -//! `Box` returned by `new_group_values`, runs `intern` -//! once (or N times), and then `emit(EmitTo::All)`. +//! column. Each iteration measures `intern` (once or N times) followed by +//! `emit(EmitTo::All)`. The `Box` returned by +//! `new_group_values` is constructed in the setup closure of +//! `iter_batched_ref` and is not included in the timing. use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray}; use arrow::buffer::{Buffer, NullBuffer}; @@ -36,7 +37,7 @@ use std::hint::black_box; use std::sync::Arc; const SIZES: [usize; 2] = [8 * 1024, 64 * 1024]; -const CARDS_RELATIVE: [usize; 4] = [20, 75, 300, 8 * 1024]; +const CARDS_RELATIVE: [usize; 4] = [20, 75, 300, 1000]; const N_BATCHES: usize = 4; // Fixed for reproducibility. const SEED: u64 = 0xD1C7;