diff --git a/Cargo.lock b/Cargo.lock index 0b84f6dd260..23ca027d592 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11038,7 +11038,14 @@ dependencies = [ name = "vortex-row" version = "0.1.0" dependencies = [ + "arrow-array 58.2.0", + "arrow-row 58.2.0", + "arrow-schema 58.2.0", "bytes", + "codspeed-divan-compat", + "mimalloc", + "rand 0.10.1", + "rstest", "smallvec", "vortex-array", "vortex-buffer", diff --git a/Cargo.toml b/Cargo.toml index 9fae5b564bf..cdf28137563 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,12 +7,12 @@ members = [ "vortex-mask", "vortex-utils", "vortex-session", - "vortex-row", "vortex-flatbuffers", "vortex-metrics", "vortex-io", "vortex-proto", "vortex-array", + "vortex-row", "vortex-tensor", "vortex-turboquant", "vortex-compressor", @@ -103,6 +103,7 @@ arrow-cast = "58" arrow-data = "58" arrow-ipc = "58" arrow-ord = "58" +arrow-row = "58" arrow-schema = "58" arrow-select = "58" arrow-string = "58" diff --git a/vortex-row/Cargo.toml b/vortex-row/Cargo.toml index aaed9a55f51..50d6547474a 100644 --- a/vortex-row/Cargo.toml +++ b/vortex-row/Cargo.toml @@ -24,3 +24,17 @@ vortex-buffer = { workspace = true } vortex-error = { workspace = true } vortex-mask = { workspace = true } vortex-session = { workspace = true } + +[dev-dependencies] +arrow-array = { workspace = true } +arrow-row = { workspace = true } +arrow-schema = { workspace = true } +divan = { workspace = true } +mimalloc = { workspace = true } +rand = { workspace = true } +rstest = { workspace = true } +vortex-array = { workspace = true, features = ["_test-harness"] } + +[[bench]] +name = "row_encode" +harness = false diff --git a/vortex-row/benches/row_encode.rs b/vortex-row/benches/row_encode.rs new file mode 100644 index 00000000000..8d631d785da --- /dev/null +++ b/vortex-row/benches/row_encode.rs @@ -0,0 +1,177 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![expect( + clippy::unwrap_used, + clippy::clone_on_ref_ptr, + 
clippy::cloned_ref_to_slice_refs, + clippy::cast_possible_truncation, + clippy::cast_possible_wrap, + clippy::redundant_clone +)] + +//! Row-encode throughput benchmarks comparing `arrow-row` against vortex's `convert_columns` +//! for the canonical scenarios shipped in PR 1: a primitive i64 column, a Utf8 column, +//! and a mixed-field struct. Per-encoding fast paths (Constant, Dict, Patched, BitPacked, +//! FoR, Delta) gain their own triplets in PR 3. + +use std::sync::Arc; + +use arrow_array::Int64Array; +use arrow_array::StringArray; +use arrow_array::StructArray as ArrowStructArray; +use arrow_row::RowConverter; +use arrow_row::SortField as ArrowSortField; +use arrow_schema::DataType; +use arrow_schema::Field; +use divan::counter::BytesCount; +use mimalloc::MiMalloc; +use rand::RngExt; +use rand::SeedableRng; +use rand::distr::Alphanumeric; +use rand::rngs::StdRng; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_row::SortField; +use vortex_row::convert_columns; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +const N: usize = 100_000; + +fn main() { + divan::main(); +} + +fn gen_i64(n: usize, seed: u64) -> Vec { + let mut rng = StdRng::seed_from_u64(seed); + (0..n) + .map(|_| rng.random_range(i64::MIN..i64::MAX)) + .collect() +} + +fn gen_words(n: usize, mean_len: usize, seed: u64) -> Vec { + let rng = &mut StdRng::seed_from_u64(seed); + (0..n) + .map(|_| { + let len = rng.random_range(mean_len.saturating_sub(4)..=mean_len + 4); + rng.sample_iter(&Alphanumeric) + .take(len) + .map(char::from) + .collect::() + }) + .collect() +} + +// ---------- primitive_i64 ---------- + +#[divan::bench] +fn primitive_i64_arrow_row(bencher: divan::Bencher) { + let v = gen_i64(N, 0); + let arr = Arc::new(Int64Array::from(v.clone())) as arrow_array::ArrayRef; + let conv = 
RowConverter::new(vec![ArrowSortField::new(DataType::Int64)]).unwrap(); + let bytes = (N * (1 + 8)) as u64; + bencher + .counter(BytesCount::new(bytes)) + .bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap()) +} + +#[divan::bench] +fn primitive_i64_vortex(bencher: divan::Bencher) { + let v = gen_i64(N, 0); + let col = PrimitiveArray::from_iter(v.clone()).into_array(); + let bytes = (N * (1 + 8)) as u64; + bencher.counter(BytesCount::new(bytes)).bench_local(|| { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + convert_columns(&[col.clone()], &[SortField::default()], &mut ctx).unwrap() + }) +} + +// ---------- utf8 ---------- + +#[divan::bench] +fn utf8_arrow_row(bencher: divan::Bencher) { + let words = gen_words(N, 16, 7); + let total: u64 = words + .iter() + .map(|w| 1 + (w.len().div_ceil(32) * 33) as u64) + .sum(); + let arr = Arc::new(StringArray::from(words.clone())) as arrow_array::ArrayRef; + let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Utf8)]).unwrap(); + bencher + .counter(BytesCount::new(total)) + .bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap()) +} + +#[divan::bench] +fn utf8_vortex(bencher: divan::Bencher) { + let words = gen_words(N, 16, 7); + let total: u64 = words + .iter() + .map(|w| 1 + (w.len().div_ceil(32) * 33) as u64) + .sum(); + let col = VarBinViewArray::from_iter_str(words.iter().map(String::as_str)).into_array(); + bencher.counter(BytesCount::new(total)).bench_local(|| { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + convert_columns(&[col.clone()], &[SortField::default()], &mut ctx).unwrap() + }) +} + +// ---------- struct_mixed ---------- + +fn struct_mixed_inputs() -> (Vec, Vec, u64) { + let ids = gen_i64(N, 1); + let names = gen_words(N, 16, 2); + // sentinel (1) + i64 (1+8=9) + utf8-name (1 + ceil(len/32)*33) + let total: u64 = (0..N) + .map(|i| { + let name_bytes = 1 + (names[i].len().div_ceil(32) * 33) as u64; + 1u64 + 9u64 + name_bytes + }) + .sum(); + (ids, names, total) +} + 
+#[divan::bench] +fn struct_mixed_arrow_row(bencher: divan::Bencher) { + let (ids, names, total) = struct_mixed_inputs(); + let id_arr = Arc::new(Int64Array::from(ids)) as arrow_array::ArrayRef; + let name_arr = Arc::new(StringArray::from(names)) as arrow_array::ArrayRef; + let arrow_struct = Arc::new(ArrowStructArray::from(vec![ + (Arc::new(Field::new("id", DataType::Int64, false)), id_arr), + ( + Arc::new(Field::new("name", DataType::Utf8, false)), + name_arr, + ), + ])) as arrow_array::ArrayRef; + let struct_fields = vec![ + Arc::new(Field::new("id", DataType::Int64, false)), + Arc::new(Field::new("name", DataType::Utf8, false)), + ]; + let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Struct( + struct_fields.into(), + ))]) + .unwrap(); + bencher + .counter(BytesCount::new(total)) + .bench_local(|| conv.convert_columns(&[arrow_struct.clone()]).unwrap()) +} + +#[divan::bench] +fn struct_mixed_vortex(bencher: divan::Bencher) { + let (ids, names, total) = struct_mixed_inputs(); + let id_arr = PrimitiveArray::from_iter(ids).into_array(); + let name_arr = VarBinViewArray::from_iter_str(names.iter().map(String::as_str)).into_array(); + let struct_arr = StructArray::from_fields(&[("id", id_arr), ("name", name_arr)]) + .unwrap() + .into_array(); + bencher.counter(BytesCount::new(total)).bench_local(|| { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + convert_columns(&[struct_arr.clone()], &[SortField::default()], &mut ctx).unwrap() + }) +} diff --git a/vortex-row/public-api.lock b/vortex-row/public-api.lock index f999303948d..ed231a1e556 100644 --- a/vortex-row/public-api.lock +++ b/vortex-row/public-api.lock @@ -102,6 +102,12 @@ pub fn vortex_row::codec::field_size(&vortex_array::canonical::Canonical, vortex pub fn vortex_row::codec::row_width_for_dtype(&vortex_array::dtype::DType) -> vortex_error::VortexResult +pub mod vortex_row::convert + +pub fn vortex_row::convert::compute_row_sizes(&[vortex_array::array::erased::ArrayRef], 
&[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_row::convert::convert_columns(&[vortex_array::array::erased::ArrayRef], &[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + pub mod vortex_row::encode pub struct vortex_row::encode::RowEncode @@ -410,4 +416,8 @@ pub trait vortex_row::RowSizeKernel: vortex_array::array::vtable::VTable pub fn vortex_row::RowSizeKernel::row_size_contribution(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> +pub fn vortex_row::compute_row_sizes(&[vortex_array::array::erased::ArrayRef], &[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_row::convert_columns(&[vortex_array::array::erased::ArrayRef], &[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + pub fn vortex_row::initialize(&vortex_session::VortexSession) diff --git a/vortex-row/src/convert.rs b/vortex-row/src/convert.rs new file mode 100644 index 00000000000..c3b06d92748 --- /dev/null +++ b/vortex-row/src/convert.rs @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! User-facing entry point: turn N columnar arrays into one row-encoded `ListView`. 
+
+use vortex_array::ArrayRef;
+use vortex_array::ExecutionCtx;
+use vortex_array::arrays::ListViewArray;
+use vortex_array::scalar_fn::ScalarFnVTable;
+use vortex_array::scalar_fn::VecExecutionArgs;
+use vortex_error::VortexResult;
+use vortex_error::vortex_bail;
+
+use crate::encode::RowEncode;
+use crate::options::RowEncodeOptions;
+use crate::options::SortField;
+use crate::size::RowSize;
+
+/// Check the preconditions shared by [`convert_columns`] and [`compute_row_sizes`] and
+/// return the common row count of `cols`.
+///
+/// `caller` is the name of the public entry point; it prefixes every error message so a
+/// failure can be traced back to the function the user actually called.
+fn validate_columns(
+    caller: &str,
+    cols: &[ArrayRef],
+    fields: &[SortField],
+) -> VortexResult<usize> {
+    if cols.len() != fields.len() {
+        vortex_bail!(
+            "{}: cols.len() ({}) does not match fields.len() ({})",
+            caller,
+            cols.len(),
+            fields.len()
+        );
+    }
+    // The first column defines the expected row count, so at least one column must exist.
+    let Some(first) = cols.first() else {
+        vortex_bail!("{}: at least one column is required", caller);
+    };
+    let nrows = first.len();
+    for (i, col) in cols.iter().enumerate() {
+        if col.len() != nrows {
+            vortex_bail!(
+                "{}: column {} has length {} but expected {}",
+                caller,
+                i,
+                col.len(),
+                nrows
+            );
+        }
+    }
+    Ok(nrows)
+}
+
+/// Convert N columnar arrays into a single row-oriented [`ListViewArray`] of `u8` whose
+/// bytes are lexicographically comparable in the same order as a tuple comparison of the
+/// input values according to `fields`.
+///
+/// `fields` must contain exactly one [`SortField`] per column.
+///
+/// # Errors
+///
+/// Returns an error if `cols` and `fields` differ in length, if `cols` is empty, if the
+/// columns do not all have the same length, or if [`RowEncode`] fails to execute.
+pub fn convert_columns(
+    cols: &[ArrayRef],
+    fields: &[SortField],
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<ListViewArray> {
+    let nrows = validate_columns("convert_columns", cols, fields)?;
+    let options = RowEncodeOptions::new(fields.iter().copied());
+    let args = VecExecutionArgs::new(cols.to_vec(), nrows);
+    let result = RowEncode.execute(&options, &args, ctx)?;
+    result.execute::<ListViewArray>(ctx)
+}
+
+/// Compute only the per-row sizes (in bytes) of the row-encoded form for N columns.
+///
+/// Inputs are validated exactly as in [`convert_columns`], including the requirement that
+/// every column has the same length. The returned array is whatever [`RowSize`] produces
+/// for the given columns.
+///
+/// # Errors
+///
+/// Returns an error if `cols` and `fields` differ in length, if `cols` is empty, if the
+/// columns do not all have the same length, or if [`RowSize`] fails to execute.
+pub fn compute_row_sizes(
+    cols: &[ArrayRef],
+    fields: &[SortField],
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<ArrayRef> {
+    let nrows = validate_columns("compute_row_sizes", cols, fields)?;
+    let options = RowEncodeOptions::new(fields.iter().copied());
+    let args = VecExecutionArgs::new(cols.to_vec(), nrows);
+    RowSize.execute(&options, &args, ctx)
+}
diff --git a/vortex-row/src/lib.rs b/vortex-row/src/lib.rs index ef0209f3d9c..fddcca665c1 100644 --- a/vortex-row/src/lib.rs +++ b/vortex-row/src/lib.rs @@ -3,23 +3,54 @@ //! Row-oriented byte encoder, analogous to Apache Arrow's `arrow-row` crate. //! -//! Subsequent commits add the encoder, decoder helpers, and per-encoding fast paths. -//! This commit only establishes the crate skeleton and an `initialize` stub. +//! The encoder converts N columnar arrays into a single `List` array where each row's +//! bytes are lexicographically comparable in the same order as a tuple comparison of the +//! original values. This is useful for sorting, hashing into row containers, and other +//! operations that benefit from a sort-friendly opaque byte representation of a multi-column +//! key. +//! +//! Two variadic scalar functions drive the implementation: +//! - [`RowSize`] computes per-row byte sizes across all N input columns. +//! - [`RowEncode`] writes the row-encoded bytes into a single `ListView` accumulator +//! in one left-to-right pass. +//! +//! Each scalar function exposes a per-encoding fast-path trait +//! ([`RowSizeKernel`] / [`RowEncodeKernel`]) for downstream encodings to plug into; PR 3 +//! adds in-crate impls for `Constant`, `Dict`, and `Patched` and an inventory-based +//! registry for external encodings. +//! +//! The user-facing entry point is [`convert_columns`]. +//! +//! 
Row-encoding scalar functions are not registered in the default +//! [`VortexSession`]. Call [`initialize`] on a session to make `RowSize` and `RowEncode` +//! available via the expression layer. pub mod codec; +pub mod convert; pub mod encode; pub mod options; pub mod size; +#[cfg(test)] +mod tests; + +pub use convert::compute_row_sizes; +pub use convert::convert_columns; pub use encode::RowEncode; pub use encode::RowEncodeKernel; pub use options::RowEncodeOptions; pub use options::SortField; pub use size::RowSize; pub use size::RowSizeKernel; +use vortex_array::scalar_fn::session::ScalarFnSessionExt; use vortex_session::VortexSession; -/// Register the row-encoding scalar functions on the given session. +/// Register the row-encoding scalar functions ([`RowSize`] and [`RowEncode`]) on the given +/// session. /// -/// Currently a stub: subsequent commits register `RowSize` and `RowEncode` here. -pub fn initialize(_session: &VortexSession) {} +/// Call once on session construction if you want row encoding available via the expression +/// layer. [`convert_columns`] calls [`RowEncode`] directly and needs no registration. +pub fn initialize(session: &VortexSession) { + session.scalar_fns().register(RowSize); + session.scalar_fns().register(RowEncode); +} diff --git a/vortex-row/src/tests.rs b/vortex-row/src/tests.rs new file mode 100644 index 00000000000..ff7d8fb274a --- /dev/null +++ b/vortex-row/src/tests.rs @@ -0,0 +1,324 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow( + clippy::approx_constant, + clippy::cloned_ref_to_slice_refs, + clippy::redundant_clone, + reason = "tests value clarity over micro-optimization" +)] + +//! Tests for the row encoder. 
+ +use rstest::rstest; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::BoolArray; +use vortex_array::arrays::ListViewArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::listview::ListViewArrayExt; +use vortex_error::VortexResult; + +use crate::SortField; +use crate::convert_columns; + +fn collect_row_bytes(array: &ListViewArray) -> Vec> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let nrows = array.len(); + (0..nrows) + .map(|i| { + let slice = array.list_elements_at(i).unwrap(); + let p = slice.execute::(&mut ctx).unwrap(); + p.as_slice::().to_vec() + }) + .collect() +} + +/// Encode each column independently, sort the resulting row bytes, and check the permutation +/// matches the natural sort order of `values`. +fn assert_sort_order_i64(values: Vec, descending: bool) -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let col = PrimitiveArray::from_iter(values.clone()).into_array(); + let field = SortField { + descending, + nulls_first: true, + }; + let encoded = convert_columns(&[col], &[field], &mut ctx)?; + let rows = collect_row_bytes(&encoded); + + // Build expected permutation: sort values naturally then compare to bytes-sorted order. 
+ let mut idx: Vec = (0..values.len()).collect(); + if descending { + idx.sort_by(|a, b| values[*b].cmp(&values[*a])); + } else { + idx.sort_by(|a, b| values[*a].cmp(&values[*b])); + } + let expected_order: Vec> = idx.iter().map(|&i| rows[i].clone()).collect(); + + let mut sorted = rows.clone(); + sorted.sort(); + assert_eq!( + sorted, expected_order, + "Row-encoded bytes do not match natural sort order" + ); + Ok(()) +} + +#[rstest] +#[case::ascending(false)] +#[case::descending(true)] +fn primitive_i64_roundtrip(#[case] descending: bool) -> VortexResult<()> { + let values: Vec = vec![-5, 0, 5, i64::MIN, i64::MAX, 7, -7, 1]; + assert_sort_order_i64(values, descending) +} + +#[test] +fn primitive_u32_sort_order() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values: Vec = vec![0, 1, 100, u32::MAX, 42, 17]; + let col = PrimitiveArray::from_iter(values.clone()).into_array(); + let encoded = convert_columns(&[col], &[SortField::default()], &mut ctx)?; + let rows = collect_row_bytes(&encoded); + + let mut sorted_rows = rows.clone(); + sorted_rows.sort(); + + let mut sorted_idx: Vec = (0..values.len()).collect(); + sorted_idx.sort_by(|a, b| values[*a].cmp(&values[*b])); + let expected: Vec> = sorted_idx.iter().map(|&i| rows[i].clone()).collect(); + assert_eq!(sorted_rows, expected); + Ok(()) +} + +#[test] +fn primitive_f64_sort_order() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + // We use IEEE total-ordering semantics: -0.0 < +0.0 in the byte encoding (matches + // `arrow-row`). Avoid -0.0 in the natural-order baseline since partial_cmp says + // -0.0 == 0.0. 
+ let values: Vec = vec![-1.5, 0.0, 1.5, f64::INFINITY, f64::NEG_INFINITY, 3.14]; + let col = PrimitiveArray::from_iter(values.clone()).into_array(); + let encoded = convert_columns(&[col], &[SortField::default()], &mut ctx)?; + let rows = collect_row_bytes(&encoded); + + let mut sorted_rows = rows.clone(); + sorted_rows.sort(); + + let mut sorted_idx: Vec = (0..values.len()).collect(); + sorted_idx.sort_by(|a, b| values[*a].partial_cmp(&values[*b]).unwrap()); + let expected: Vec> = sorted_idx.iter().map(|&i| rows[i].clone()).collect(); + assert_eq!(sorted_rows, expected); + Ok(()) +} + +#[test] +fn bool_sort_order() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let col = BoolArray::from_iter([true, false, true, false]).into_array(); + let encoded = convert_columns(&[col], &[SortField::default()], &mut ctx)?; + let rows = collect_row_bytes(&encoded); + + let mut sorted = rows.clone(); + sorted.sort(); + // false rows come first (2x), true rows after (2x) + assert_eq!(sorted[0], rows[1]); + assert_eq!(sorted[1], rows[3]); + assert_eq!(sorted[2], rows[0]); + assert_eq!(sorted[3], rows[2]); + Ok(()) +} + +#[test] +fn utf8_sort_order() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values = vec![ + "banana", + "apple", + "", + "cherry", + "ban", + "banana_loaf_for_test", + ]; + let col = VarBinViewArray::from_iter_str(values.clone()).into_array(); + let encoded = convert_columns(&[col], &[SortField::default()], &mut ctx)?; + let rows = collect_row_bytes(&encoded); + + let mut sorted = rows.clone(); + sorted.sort(); + + let mut sorted_idx: Vec = (0..values.len()).collect(); + sorted_idx.sort_by(|a, b| values[*a].cmp(values[*b])); + let expected: Vec> = sorted_idx.iter().map(|&i| rows[i].clone()).collect(); + assert_eq!(sorted, expected); + Ok(()) +} + +#[test] +fn multi_column_sort() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let ints: Vec = vec![1, 2, 1, 2, 1, 3]; + 
let strs = vec!["b", "a", "a", "b", "c", "z"]; + let col0 = PrimitiveArray::from_iter(ints.clone()).into_array(); + let col1 = VarBinViewArray::from_iter_str(strs.clone()).into_array(); + let encoded = convert_columns( + &[col0, col1], + &[SortField::default(), SortField::default()], + &mut ctx, + )?; + let rows = collect_row_bytes(&encoded); + + let mut sorted = rows.clone(); + sorted.sort(); + let mut idx: Vec = (0..ints.len()).collect(); + idx.sort_by(|a, b| ints[*a].cmp(&ints[*b]).then_with(|| strs[*a].cmp(strs[*b]))); + let expected: Vec> = idx.iter().map(|&i| rows[i].clone()).collect(); + assert_eq!(sorted, expected); + Ok(()) +} + +#[test] +fn nulls_first_and_last() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values: Vec> = vec![Some(5), None, Some(1), None, Some(3)]; + let col = PrimitiveArray::from_option_iter(values.clone()).into_array(); + + // nulls_first=true + let encoded = convert_columns( + &[col.clone()], + &[SortField { + descending: false, + nulls_first: true, + }], + &mut ctx, + )?; + let rows = collect_row_bytes(&encoded); + let mut sorted = rows.clone(); + sorted.sort(); + // The first two sorted entries should be nulls + let null_count = values.iter().filter(|v| v.is_none()).count(); + for i in 0..null_count { + // a null encoded row begins with 0x00 + assert_eq!(sorted[i][0], 0x00); + } + // nulls_first=false + let encoded = convert_columns( + &[col], + &[SortField { + descending: false, + nulls_first: false, + }], + &mut ctx, + )?; + let rows = collect_row_bytes(&encoded); + let mut sorted = rows.clone(); + sorted.sort(); + // The last two sorted entries should be nulls + for i in 0..null_count { + let pos = sorted.len() - 1 - i; + assert_eq!(sorted[pos][0], 0x02); + } + Ok(()) +} + +#[test] +fn struct_sort_order() -> VortexResult<()> { + use vortex_array::arrays::StructArray; + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let ids: Vec = vec![3, 1, 3, 1, 2]; + let names = vec!["b", "a", "a", 
"b", "z"]; + let id_arr = PrimitiveArray::from_iter(ids.clone()).into_array(); + let name_arr = VarBinViewArray::from_iter_str(names.clone()).into_array(); + let struct_arr = StructArray::from_fields(&[("id", id_arr), ("name", name_arr)])?.into_array(); + + let encoded = convert_columns(&[struct_arr], &[SortField::default()], &mut ctx)?; + let rows = collect_row_bytes(&encoded); + + let mut sorted = rows.clone(); + sorted.sort(); + let mut idx: Vec = (0..ids.len()).collect(); + idx.sort_by(|a, b| ids[*a].cmp(&ids[*b]).then_with(|| names[*a].cmp(names[*b]))); + let expected: Vec> = idx.iter().map(|&i| rows[i].clone()).collect(); + assert_eq!(sorted, expected); + Ok(()) +} + +#[test] +fn row_size_struct_shape() -> VortexResult<()> { + use vortex_array::arrays::Constant; + use vortex_array::arrays::StructArray; + use vortex_array::arrays::struct_::StructArrayExt; + + use crate::compute_row_sizes; + + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let ints: Vec = vec![1, 2, 3, 4, 5]; + let strs = vec!["a", "bb", "ccc", "", "eeeee"]; + let col0 = PrimitiveArray::from_iter(ints).into_array(); + let col1 = VarBinViewArray::from_iter_str(strs).into_array(); + + let sizes = compute_row_sizes( + &[col0, col1], + &[SortField::default(), SortField::default()], + &mut ctx, + )?; + // Shape must be Struct { fixed, var } + let struct_arr = sizes.execute::(&mut ctx)?; + assert_eq!(struct_arr.struct_fields().nfields(), 2); + let fixed = struct_arr.unmasked_field(0); + let var = struct_arr.unmasked_field(1); + + // `fixed` must be ConstantArray with value = encoded i32 width = 1 + 4 = 5. + let fixed_const = fixed + .as_opt::() + .expect("fixed field should be a ConstantArray"); + assert_eq!( + fixed_const.scalar(), + &vortex_array::scalar::Scalar::from(5u32), + "fixed scalar should be encoded primitive i32 width" + ); + + // `var` must be a PrimitiveArray, since we have a varlen column. 
+ let var_prim = var.clone().execute::(&mut ctx)?; + let v: &[u32] = var_prim.as_slice(); + assert_eq!(v.len(), 5); + // empty string: sentinel(1) + 1 byte; non-empty: sentinel(1) + 33 bytes (single block). + let expected: Vec = vec![34, 34, 34, 2, 34]; + assert_eq!(v, expected.as_slice()); + Ok(()) +} + +#[test] +fn single_buffer_invariant() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + // Encoded rows here are all > 12 bytes, forcing the Ref-view path that points back into + // the shared data buffer. + let nrows = 64usize; + let primitives: Vec = (0..nrows as i64).collect(); + let strings: Vec = (0..nrows) + .map(|i| format!("row_{}_with_padding", i)) + .collect(); + let col0 = PrimitiveArray::from_iter(primitives.clone()).into_array(); + let col1 = VarBinViewArray::from_iter_str(strings.iter().map(String::as_str)).into_array(); + let encoded = convert_columns( + &[col0, col1], + &[SortField::default(), SortField::default()], + &mut ctx, + )?; + + let rows = collect_row_bytes(&encoded); + let expected_total: usize = rows.iter().map(|r| r.len()).sum(); + + // The shared data buffer holds the contiguous concatenation of every row's encoded bytes; + // per-row allocations would produce many small buffers instead of one shared buffer. + // ListView's elements array is a single contiguous primitive (u8) array; its length + // equals the sum of all per-row sizes. A per-row allocation strategy would instead + // produce N separate elements arrays or a sparse one. + let elements_len = encoded.elements().len(); + assert_eq!( + elements_len, expected_total, + "elements buffer size mismatch" + ); + Ok(()) +}