Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions vortex-row/benches/row_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

use std::sync::Arc;

use arrow_array::DictionaryArray;
use arrow_array::Int64Array;
use arrow_array::PrimitiveArray as ArrowPrimitiveArray;
use arrow_array::StringArray;
use arrow_array::StructArray as ArrowStructArray;
use arrow_array::types::Int32Type;
use arrow_row::RowConverter;
use arrow_row::SortField as ArrowSortField;
use arrow_schema::DataType;
Expand All @@ -38,6 +41,7 @@ use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::builders::dict::dict_encode;
use vortex_row::SortField;
use vortex_row::convert_columns;

Expand Down Expand Up @@ -214,3 +218,74 @@ fn constant_i64_vortex_without_kernel(bencher: divan::Bencher) {
convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
})
}

// ---------- dict_utf8 ----------

fn dict_utf8_inputs() -> (Vec<String>, Vec<String>, Vec<i32>, u64) {
let n_unique = 1024usize;
let unique = gen_words(n_unique, 16, 13);
let mut rng = StdRng::seed_from_u64(17);
let codes: Vec<i32> = (0..N)
.map(|_| rng.random_range(0..n_unique) as i32)
.collect();
let strings: Vec<String> = codes.iter().map(|&c| unique[c as usize].clone()).collect();
let bytes: u64 = strings
.iter()
.map(|w| 1 + (w.len().div_ceil(32) * 33) as u64)
.sum();
(unique, strings, codes, bytes)
}

#[divan::bench]
fn dict_utf8_arrow_dict(bencher: divan::Bencher) {
let (unique, _, codes, total) = dict_utf8_inputs();
let values: Arc<dyn arrow_array::Array> = Arc::new(StringArray::from(unique.clone()));
let dict_arr: DictionaryArray<Int32Type> =
DictionaryArray::new(ArrowPrimitiveArray::from(codes), values);
let arr = Arc::new(dict_arr) as arrow_array::ArrayRef;
let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
))])
.unwrap();
bencher
.counter(BytesCount::new(total))
.bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap())
}

#[divan::bench]
fn dict_utf8_arrow_canonical(bencher: divan::Bencher) {
let (_, strings, _, total) = dict_utf8_inputs();
let arr = Arc::new(StringArray::from(strings.clone())) as arrow_array::ArrayRef;
let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Utf8)]).unwrap();
bencher
.counter(BytesCount::new(total))
.bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap())
}

#[divan::bench]
fn dict_utf8_vortex_with_kernel(bencher: divan::Bencher) {
let (_, strings, _, total) = dict_utf8_inputs();
let raw = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str)).into_array();
let dict = dict_encode(&raw).unwrap().into_array();
bencher.counter(BytesCount::new(total)).bench_local(|| {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
convert_columns(&[dict.clone()], &[SortField::default()], &mut ctx).unwrap()
})
}

#[divan::bench]
fn dict_utf8_vortex_without_kernel(bencher: divan::Bencher) {
let (_, strings, _, total) = dict_utf8_inputs();
let raw = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str)).into_array();
let dict = dict_encode(&raw).unwrap().into_array();
bencher.counter(BytesCount::new(total)).bench_local(|| {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let canonical = dict
.clone()
.execute::<Canonical>(&mut ctx)
.unwrap()
.into_array();
convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
})
}
148 changes: 134 additions & 14 deletions vortex-row/src/kernels/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,158 @@

//! Row-encode kernels for `DictArray`.
//!
//! Stubs in this commit return `Ok(None)` so the dispatch loop falls back to
//! canonicalization. The real impls land in a follow-up commit.
//! These kernels skip canonicalization by encoding each *unique value* once into a small
//! per-value buffer keyed by code, then materializing the per-row contribution via the codes
//! array. The per-unique-value cost is amortized over the dictionary cardinality rather than
//! the row count.

#![allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
reason = "row encoding indexes into u32-sized buffers; codes are non-negative indices into the values array"
)]

use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::dict::Dict;
use vortex_array::arrays::dict::DictArraySlotsExt;
use vortex_array::dtype::NativePType;
use vortex_array::dtype::PType;
use vortex_array::match_each_integer_ptype;
use vortex_error::VortexResult;

use crate::encode::RowEncodeKernel;
use crate::encode::dispatch_encode;
use crate::options::SortField;
use crate::size::RowSizeKernel;
use crate::size::dispatch_size;

impl RowSizeKernel for Dict {
fn row_size_contribution(
_column: ArrayView<'_, Self>,
_field: SortField,
_sizes: &mut [u32],
_ctx: &mut ExecutionCtx,
column: ArrayView<'_, Self>,
field: SortField,
sizes: &mut [u32],
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<()>> {
Ok(None)
if column.values().len() > column.codes().len() {
return Ok(None);
}
let n_values = column.values().len();
let mut value_sizes = vec![0u32; n_values];
dispatch_size(column.values(), field, &mut value_sizes, ctx)?;

let codes_prim = column.codes().clone().execute::<PrimitiveArray>(ctx)?;
let ptype = codes_prim.ptype();
match_each_integer_ptype!(ptype, |T| {
add_codes_sizes::<T>(&codes_prim, &value_sizes, sizes);
});
Ok(Some(()))
}
}

impl RowEncodeKernel for Dict {
fn row_encode_into(
_column: ArrayView<'_, Self>,
_field: SortField,
_offsets: &[u32],
_cursors: &mut [u32],
_out: &mut [u8],
_ctx: &mut ExecutionCtx,
column: ArrayView<'_, Self>,
field: SortField,
offsets: &[u32],
cursors: &mut [u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<()>> {
Ok(None)
if column.values().len() > column.codes().len() {
return Ok(None);
}

let n_values = column.values().len();
let mut value_sizes = vec![0u32; n_values];
dispatch_size(column.values(), field, &mut value_sizes, ctx)?;

// Build per-value offsets and a small contiguous per-value encoded buffer.
let mut value_offsets = vec![0u32; n_values + 1];
let mut total: u64 = 0;
for i in 0..n_values {
value_offsets[i] = total as u32;
total += u64::from(value_sizes[i]);
}
value_offsets[n_values] = total as u32;

let mut value_buf = vec![0u8; total as usize];
// Inner dispatch uses zero base offsets (small buffer) with per-value start cursors.
let zero_offsets = vec![0u32; n_values];
let mut inner_cursors = value_offsets[..n_values].to_vec();
dispatch_encode(
column.values(),
field,
&zero_offsets,
&mut inner_cursors,
&mut value_buf,
ctx,
)?;

let codes_prim = column.codes().clone().execute::<PrimitiveArray>(ctx)?;
let ptype = codes_prim.ptype();
match_each_integer_ptype!(ptype, |T| {
copy_codes::<T>(
&codes_prim,
&value_buf,
&value_offsets,
&value_sizes,
offsets,
cursors,
out,
);
});
Ok(Some(()))
}
}

#[inline]
fn add_codes_sizes<T>(codes: &PrimitiveArray, value_sizes: &[u32], sizes: &mut [u32])
where
T: NativePType + Copy + TryInto<usize>,
{
let slice: &[T] = codes.as_slice();
debug_assert_eq!(slice.len(), sizes.len());
if T::PTYPE == PType::U8 {
// SAFETY: T == u8
let raw = unsafe { std::slice::from_raw_parts(slice.as_ptr().cast::<u8>(), slice.len()) };
for (i, &c) in raw.iter().enumerate() {
sizes[i] += value_sizes[c as usize];
}
return;
}
for (i, &c) in slice.iter().enumerate() {
let idx: usize = c
.try_into()
.unwrap_or_else(|_| vortex_error::vortex_panic!("dict code does not fit in usize"));
sizes[i] += value_sizes[idx];
}
}

#[inline]
#[allow(clippy::too_many_arguments)]
fn copy_codes<T>(
codes: &PrimitiveArray,
value_buf: &[u8],
value_offsets: &[u32],
value_sizes: &[u32],
offsets: &[u32],
cursors: &mut [u32],
out: &mut [u8],
) where
T: NativePType + Copy + TryInto<usize>,
{
let slice: &[T] = codes.as_slice();
debug_assert_eq!(slice.len(), cursors.len());
for (i, &c) in slice.iter().enumerate() {
let idx: usize = c
.try_into()
.unwrap_or_else(|_| vortex_error::vortex_panic!("dict code does not fit in usize"));
let v_start = value_offsets[idx] as usize;
let v_size = value_sizes[idx] as usize;
let dst = (offsets[i] + cursors[i]) as usize;
out[dst..dst + v_size].copy_from_slice(&value_buf[v_start..v_start + v_size]);
cursors[i] += v_size as u32;
}
}
23 changes: 23 additions & 0 deletions vortex-row/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use vortex_array::arrays::ListViewArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::listview::ListViewArrayExt;
use vortex_array::builders::dict::dict_encode;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_error::VortexResult;

use crate::SortField;
Expand Down Expand Up @@ -223,6 +226,26 @@ fn nulls_first_and_last() -> VortexResult<()> {
Ok(())
}

#[test]
fn dict_path_matches_canonical() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let raw = VarBinViewArray::from_iter(
vec![Some("a"), Some("bb"), Some("a"), Some("ccc"), Some("bb")],
DType::Utf8(Nullability::NonNullable),
)
.into_array();
let dict_arr = dict_encode(&raw)?.into_array();

let canonical_enc = convert_columns(&[raw], &[SortField::default()], &mut ctx)?;
let dict_enc = convert_columns(&[dict_arr], &[SortField::default()], &mut ctx)?;

assert_eq!(
collect_row_bytes(&canonical_enc),
collect_row_bytes(&dict_enc)
);
Ok(())
}

#[test]
fn constant_path_matches_canonical() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
Expand Down
Loading