Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ members = [
"vortex-mask",
"vortex-utils",
"vortex-session",
"vortex-row",
"vortex-flatbuffers",
"vortex-metrics",
"vortex-io",
"vortex-proto",
"vortex-array",
"vortex-row",
"vortex-tensor",
"vortex-turboquant",
"vortex-compressor",
Expand Down Expand Up @@ -103,6 +103,7 @@ arrow-cast = "58"
arrow-data = "58"
arrow-ipc = "58"
arrow-ord = "58"
arrow-row = "58"
arrow-schema = "58"
arrow-select = "58"
arrow-string = "58"
Expand Down
14 changes: 14 additions & 0 deletions vortex-row/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,17 @@ vortex-buffer = { workspace = true }
vortex-error = { workspace = true }
vortex-mask = { workspace = true }
vortex-session = { workspace = true }

[dev-dependencies]
arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
divan = { workspace = true }
mimalloc = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
vortex-array = { workspace = true, features = ["_test-harness"] }

[[bench]]
name = "row_encode"
harness = false
177 changes: 177 additions & 0 deletions vortex-row/benches/row_encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#![expect(
clippy::unwrap_used,
clippy::clone_on_ref_ptr,
clippy::cloned_ref_to_slice_refs,
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::redundant_clone
)]

//! Row-encode throughput benchmarks comparing `arrow-row` against vortex's `convert_columns`
//! for the canonical scenarios shipped in PR 1: a primitive i64 column, a Utf8 column,
//! and a mixed-field struct. Per-encoding fast paths (Constant, Dict, Patched, BitPacked,
//! FoR, Delta) gain their own triplets in PR 3.

use std::sync::Arc;

use arrow_array::Int64Array;
use arrow_array::StringArray;
use arrow_array::StructArray as ArrowStructArray;
use arrow_row::RowConverter;
use arrow_row::SortField as ArrowSortField;
use arrow_schema::DataType;
use arrow_schema::Field;
use divan::counter::BytesCount;
use mimalloc::MiMalloc;
use rand::RngExt;
use rand::SeedableRng;
use rand::distr::Alphanumeric;
use rand::rngs::StdRng;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_row::SortField;
use vortex_row::convert_columns;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

const N: usize = 100_000;

fn main() {
divan::main();
}

fn gen_i64(n: usize, seed: u64) -> Vec<i64> {
let mut rng = StdRng::seed_from_u64(seed);
(0..n)
.map(|_| rng.random_range(i64::MIN..i64::MAX))
.collect()
}

fn gen_words(n: usize, mean_len: usize, seed: u64) -> Vec<String> {
let rng = &mut StdRng::seed_from_u64(seed);
(0..n)
.map(|_| {
let len = rng.random_range(mean_len.saturating_sub(4)..=mean_len + 4);
rng.sample_iter(&Alphanumeric)
.take(len)
.map(char::from)
.collect::<String>()
})
.collect()
}

// ---------- primitive_i64 ----------

#[divan::bench]
fn primitive_i64_arrow_row(bencher: divan::Bencher) {
let v = gen_i64(N, 0);
let arr = Arc::new(Int64Array::from(v.clone())) as arrow_array::ArrayRef;
let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Int64)]).unwrap();
let bytes = (N * (1 + 8)) as u64;
bencher
.counter(BytesCount::new(bytes))
.bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap())
}

#[divan::bench]
fn primitive_i64_vortex(bencher: divan::Bencher) {
let v = gen_i64(N, 0);
let col = PrimitiveArray::from_iter(v.clone()).into_array();
let bytes = (N * (1 + 8)) as u64;
bencher.counter(BytesCount::new(bytes)).bench_local(|| {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
convert_columns(&[col.clone()], &[SortField::default()], &mut ctx).unwrap()
})
}

// ---------- utf8 ----------

#[divan::bench]
fn utf8_arrow_row(bencher: divan::Bencher) {
let words = gen_words(N, 16, 7);
let total: u64 = words
.iter()
.map(|w| 1 + (w.len().div_ceil(32) * 33) as u64)
.sum();
let arr = Arc::new(StringArray::from(words.clone())) as arrow_array::ArrayRef;
let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Utf8)]).unwrap();
bencher
.counter(BytesCount::new(total))
.bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap())
}

#[divan::bench]
fn utf8_vortex(bencher: divan::Bencher) {
let words = gen_words(N, 16, 7);
let total: u64 = words
.iter()
.map(|w| 1 + (w.len().div_ceil(32) * 33) as u64)
.sum();
let col = VarBinViewArray::from_iter_str(words.iter().map(String::as_str)).into_array();
bencher.counter(BytesCount::new(total)).bench_local(|| {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
convert_columns(&[col.clone()], &[SortField::default()], &mut ctx).unwrap()
})
}

// ---------- struct_mixed ----------

fn struct_mixed_inputs() -> (Vec<i64>, Vec<String>, u64) {
let ids = gen_i64(N, 1);
let names = gen_words(N, 16, 2);
// sentinel (1) + i64 (1+8=9) + utf8-name (1 + ceil(len/32)*33)
let total: u64 = (0..N)
.map(|i| {
let name_bytes = 1 + (names[i].len().div_ceil(32) * 33) as u64;
1u64 + 9u64 + name_bytes
})
.sum();
(ids, names, total)
}

#[divan::bench]
fn struct_mixed_arrow_row(bencher: divan::Bencher) {
let (ids, names, total) = struct_mixed_inputs();
let id_arr = Arc::new(Int64Array::from(ids)) as arrow_array::ArrayRef;
let name_arr = Arc::new(StringArray::from(names)) as arrow_array::ArrayRef;
let arrow_struct = Arc::new(ArrowStructArray::from(vec![
(Arc::new(Field::new("id", DataType::Int64, false)), id_arr),
(
Arc::new(Field::new("name", DataType::Utf8, false)),
name_arr,
),
])) as arrow_array::ArrayRef;
let struct_fields = vec![
Arc::new(Field::new("id", DataType::Int64, false)),
Arc::new(Field::new("name", DataType::Utf8, false)),
];
let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Struct(
struct_fields.into(),
))])
.unwrap();
bencher
.counter(BytesCount::new(total))
.bench_local(|| conv.convert_columns(&[arrow_struct.clone()]).unwrap())
}

#[divan::bench]
fn struct_mixed_vortex(bencher: divan::Bencher) {
let (ids, names, total) = struct_mixed_inputs();
let id_arr = PrimitiveArray::from_iter(ids).into_array();
let name_arr = VarBinViewArray::from_iter_str(names.iter().map(String::as_str)).into_array();
let struct_arr = StructArray::from_fields(&[("id", id_arr), ("name", name_arr)])
.unwrap()
.into_array();
bencher.counter(BytesCount::new(total)).bench_local(|| {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
convert_columns(&[struct_arr.clone()], &[SortField::default()], &mut ctx).unwrap()
})
}
10 changes: 10 additions & 0 deletions vortex-row/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ pub fn vortex_row::codec::field_size(&vortex_array::canonical::Canonical, vortex

pub fn vortex_row::codec::row_width_for_dtype(&vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_row::codec::RowWidth>

pub mod vortex_row::convert

pub fn vortex_row::convert::compute_row_sizes(&[vortex_array::array::erased::ArrayRef], &[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_row::convert::convert_columns(&[vortex_array::array::erased::ArrayRef], &[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::listview::vtable::ListViewArray>

pub mod vortex_row::encode

pub struct vortex_row::encode::RowEncode
Expand Down Expand Up @@ -410,4 +416,8 @@ pub trait vortex_row::RowSizeKernel: vortex_array::array::vtable::VTable

pub fn vortex_row::RowSizeKernel::row_size_contribution(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<()>>

pub fn vortex_row::compute_row_sizes(&[vortex_array::array::erased::ArrayRef], &[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_row::convert_columns(&[vortex_array::array::erased::ArrayRef], &[vortex_row::options::SortField], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::listview::vtable::ListViewArray>

pub fn vortex_row::initialize(&vortex_session::VortexSession)
75 changes: 75 additions & 0 deletions vortex-row/src/convert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! User-facing entry point: turn N columnar arrays into one row-encoded `ListView<u8>`.

use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::arrays::ListViewArray;
use vortex_array::scalar_fn::ScalarFnVTable;
use vortex_array::scalar_fn::VecExecutionArgs;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;

use crate::encode::RowEncode;
use crate::options::RowEncodeOptions;
use crate::options::SortField;
use crate::size::RowSize;

/// Convert N columnar arrays into a single row-oriented [`ListViewArray`] of `u8` whose
/// bytes are lexicographically comparable in the same order as a tuple comparison of the
/// input values according to `fields`.
pub fn convert_columns(
cols: &[ArrayRef],
fields: &[SortField],
ctx: &mut ExecutionCtx,
) -> VortexResult<ListViewArray> {
if cols.len() != fields.len() {
vortex_bail!(
"convert_columns: cols.len() ({}) does not match fields.len() ({})",
cols.len(),
fields.len()
);
}
if cols.is_empty() {
vortex_bail!("convert_columns: at least one column is required");
}
let nrows = cols[0].len();
for (i, col) in cols.iter().enumerate() {
if col.len() != nrows {
vortex_bail!(
"convert_columns: column {} has length {} but expected {}",
i,
col.len(),
nrows
);
}
}

let options = RowEncodeOptions::new(fields.iter().copied());
let args = VecExecutionArgs::new(cols.to_vec(), nrows);
let result = RowEncode.execute(&options, &args, ctx)?;
result.execute::<ListViewArray>(ctx)
}

/// Compute only the per-row sizes (in bytes) of the row-encoded form for N columns.
pub fn compute_row_sizes(
cols: &[ArrayRef],
fields: &[SortField],
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
if cols.len() != fields.len() {
vortex_bail!(
"compute_row_sizes: cols.len() ({}) does not match fields.len() ({})",
cols.len(),
fields.len()
);
}
if cols.is_empty() {
vortex_bail!("compute_row_sizes: at least one column is required");
}
let nrows = cols[0].len();
let options = RowEncodeOptions::new(fields.iter().copied());
let args = VecExecutionArgs::new(cols.to_vec(), nrows);
RowSize.execute(&options, &args, ctx)
}
41 changes: 36 additions & 5 deletions vortex-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,54 @@

//! Row-oriented byte encoder, analogous to Apache Arrow's `arrow-row` crate.
//!
//! Subsequent commits add the encoder, decoder helpers, and per-encoding fast paths.
//! This commit only establishes the crate skeleton and an `initialize` stub.
//! The encoder converts N columnar arrays into a single `List<u8>` array where each row's
//! bytes are lexicographically comparable in the same order as a tuple comparison of the
//! original values. This is useful for sorting, hashing into row containers, and other
//! operations that benefit from a sort-friendly opaque byte representation of a multi-column
//! key.
//!
//! Two variadic scalar functions drive the implementation:
//! - [`RowSize`] computes per-row byte sizes across all N input columns.
//! - [`RowEncode`] writes the row-encoded bytes into a single `ListView<u8>` accumulator
//! in one left-to-right pass.
//!
//! Each scalar function exposes a per-encoding fast-path trait
//! ([`RowSizeKernel`] / [`RowEncodeKernel`]) for downstream encodings to plug into; PR 3
//! adds in-crate impls for `Constant`, `Dict`, and `Patched` and an inventory-based
//! registry for external encodings.
//!
//! The user-facing entry point is [`convert_columns`].
//!
//! Row-encoding scalar functions are not registered in the default
//! [`VortexSession`]. Call [`initialize`] on a session to make `RowSize` and `RowEncode`
//! available via the expression layer.

pub mod codec;
pub mod convert;
pub mod encode;
pub mod options;
pub mod size;

#[cfg(test)]
mod tests;

pub use convert::compute_row_sizes;
pub use convert::convert_columns;
pub use encode::RowEncode;
pub use encode::RowEncodeKernel;
pub use options::RowEncodeOptions;
pub use options::SortField;
pub use size::RowSize;
pub use size::RowSizeKernel;
use vortex_array::scalar_fn::session::ScalarFnSessionExt;
use vortex_session::VortexSession;

/// Register the row-encoding scalar functions on the given session.
/// Register the row-encoding scalar functions ([`RowSize`] and [`RowEncode`]) on the given
/// session.
///
/// Currently a stub: subsequent commits register `RowSize` and `RowEncode` here.
pub fn initialize(_session: &VortexSession) {}
/// Call once on session construction if you want row encoding available via the expression
/// layer or via [`convert_columns`].
pub fn initialize(session: &VortexSession) {
session.scalar_fns().register(RowSize);
session.scalar_fns().register(RowEncode);
}
Loading
Loading