From d8b9bce95600f8049461ee0ca4da153bca971726 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 17 May 2026 22:31:56 +0000 Subject: [PATCH] RunEnd row-encode kernel (vortex-runend) Add a row-encode kernel for `RunEnd` arrays via the inventory-based registry: the encoding lives in `vortex-runend` which depends on `vortex-array` (not the other way around), so a direct downcast inside `dispatch_size` / `dispatch_encode` would create a cycle. The kernel is functionally analogous to the Dict kernel: encode each unique run-value once into a small per-value buffer, then broadcast the value's encoded bytes across each row in its run. The per-unique-value cost is amortized over the number of runs rather than the row count. `walk_runs` translates the run-end array's `(prev_end, curr_end)` windows into `(start_logical, stop_logical)` row ranges accounting for the array's slice offset and length. When ends.len() > len (very sparse runs, or pathological inputs) the kernel declines so canonicalization stays the dominant path. Includes a round-trip test in `compute/row_encode.rs` checking that the RunEnd path matches the canonical path bit-for-bit. Signed-off-by: Claude --- Cargo.lock | 2 + encodings/runend/Cargo.toml | 2 + encodings/runend/src/compute/mod.rs | 1 + encodings/runend/src/compute/row_encode.rs | 214 +++++++++++++++++++++ 4 files changed, 219 insertions(+) create mode 100644 encodings/runend/src/compute/row_encode.rs diff --git a/Cargo.lock b/Cargo.lock index 86cec8f1247..5a7a18061ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11064,6 +11064,7 @@ dependencies = [ "arrow-array 58.2.0", "arrow-schema 58.2.0", "codspeed-divan-compat", + "inventory", "itertools 0.14.0", "num-traits", "prost 0.14.3", @@ -11073,6 +11074,7 @@ dependencies = [ "vortex-buffer", "vortex-error", "vortex-mask", + "vortex-row", "vortex-session", ] diff --git a/encodings/runend/Cargo.toml b/encodings/runend/Cargo.toml index 01a5b8d7a3e..0ec0302a4a8 100644 --- a/encodings/runend/Cargo.toml +++ b/encodings/runend/Cargo.toml @@ -16,6 +16,7 @@ version = { workspace = true } [dependencies] arbitrary = { workspace = true, optional = true } arrow-array = { workspace = true, optional = true } +inventory = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } prost = { workspace = true } @@ -23,6 +24,7 @@ vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-error = { workspace = true } vortex-mask = { workspace = true } +vortex-row = { workspace = true } vortex-session = { workspace = true } [lints] diff --git a/encodings/runend/src/compute/mod.rs b/encodings/runend/src/compute/mod.rs index 9bdb6d67c00..2b9ba9092ad 100644 --- a/encodings/runend/src/compute/mod.rs +++ b/encodings/runend/src/compute/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod filter; pub(crate) mod is_constant; pub(crate) mod is_sorted; pub(crate) mod min_max; +mod row_encode; pub(crate) mod take; pub(crate) mod take_from; diff --git a/encodings/runend/src/compute/row_encode.rs b/encodings/runend/src/compute/row_encode.rs new file mode 100644 index 00000000000..aa24fd9eea7 --- /dev/null +++ b/encodings/runend/src/compute/row_encode.rs @@ -0,0 +1,214 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Row-encode kernels for `RunEndArray`. +//! +//! Like `Dict`, the per-row size and per-row encoded bytes are determined by the column's +//! *values*, so we encode each run-value once and broadcast it across the indices in that +//! run. The per-unique-value cost is amortized over the number of runs rather than the +//! row count. + +#![allow( + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + reason = "row encoding indexes into u32-sized buffers; ends are non-negative" +)] + +use num_traits::AsPrimitive; +use vortex_array::ArrayId; +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::dtype::NativePType; +use vortex_array::match_each_integer_ptype; +use vortex_error::VortexResult; +use vortex_row::RowEncodeRegistration; +use vortex_row::encode::dispatch_encode; +use vortex_row::options::SortField; +use vortex_row::size::dispatch_size; + +use crate::RunEnd; +use crate::RunEndArrayExt; + +/// Function pointer registered for the size contribution of a `RunEnd` column. +fn run_end_size_contribution( + column: &ArrayRef, + field: SortField, + sizes: &mut [u32], + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let Some(view) = column.as_opt::() else { + return Ok(None); + }; + let nruns = view.ends().len(); + if nruns > view.len() { + return Ok(None); + } + + let mut value_sizes = vec![0u32; view.values().len()]; + dispatch_size(view.values(), field, &mut value_sizes, ctx)?; + + let offset = view.offset() as u64; + let len = view.len(); + let ends_prim = view.ends().clone().execute::(ctx)?; + + match_each_integer_ptype!(ends_prim.ptype(), |E| { + let ends = ends_prim.as_slice::(); + walk_runs::(ends, offset, len, |run_idx, start, stop| { + let add = value_sizes[run_idx]; + if add == 0 { + return; + } + for s in &mut sizes[start..stop] { + *s += add; + } + }); + }); + Ok(Some(())) +} + +/// Function pointer registered for the per-row encode of a `RunEnd` column. +fn run_end_encode_into( + column: &ArrayRef, + field: SortField, + offsets: &[u32], + cursors: &mut [u32], + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let Some(view) = column.as_opt::() else { + return Ok(None); + }; + let nruns = view.ends().len(); + if nruns > view.len() { + return Ok(None); + } + + let n_values = view.values().len(); + let mut value_sizes = vec![0u32; n_values]; + dispatch_size(view.values(), field, &mut value_sizes, ctx)?; + + let mut value_offsets = vec![0u32; n_values + 1]; + let mut total: u64 = 0; + for i in 0..n_values { + value_offsets[i] = total as u32; + total += u64::from(value_sizes[i]); + } + value_offsets[n_values] = total as u32; + let mut value_buf = vec![0u8; total as usize]; + let zero_offsets = vec![0u32; n_values]; + let mut inner_cursors = value_offsets[..n_values].to_vec(); + dispatch_encode( + view.values(), + field, + &zero_offsets, + &mut inner_cursors, + &mut value_buf, + ctx, + )?; + + let offset = view.offset() as u64; + let len = view.len(); + let ends_prim = view.ends().clone().execute::(ctx)?; + + match_each_integer_ptype!(ends_prim.ptype(), |E| { + let ends = ends_prim.as_slice::(); + walk_runs::(ends, offset, len, |run_idx, start, stop| { + let v_start = value_offsets[run_idx] as usize; + let v_size = value_sizes[run_idx] as usize; + if v_size == 0 { + return; + } + let value_bytes = &value_buf[v_start..v_start + v_size]; + let v_size_u32 = v_size as u32; + for i in start..stop { + let pos = (offsets[i] + cursors[i]) as usize; + out[pos..pos + v_size].copy_from_slice(value_bytes); + cursors[i] += v_size_u32; + } + }); + }); + Ok(Some(())) +} + +/// For each run, call `f(run_idx, start_logical, stop_logical)` where the logical range is +/// `[max(prev_end - offset, 0), min(curr_end - offset, len))`. +#[inline] +fn walk_runs(ends: &[E], offset: u64, len: usize, mut f: impl FnMut(usize, usize, usize)) +where + E: NativePType + AsPrimitive, +{ + let mut prev: u64 = offset; + for (run_idx, &end) in ends.iter().enumerate() { + let end_u64: u64 = end.as_(); + if end_u64 <= offset { + prev = end_u64; + continue; + } + let start = (prev.saturating_sub(offset)) as usize; + let stop_u64 = end_u64 - offset; + let stop = (stop_u64 as usize).min(len); + if start < stop { + f(run_idx, start, stop); + } + prev = end_u64; + if stop >= len { + break; + } + } +} + +fn run_end_array_id() -> ArrayId { + use vortex_session::registry::CachedId; + static ID: CachedId = CachedId::new("vortex.runend"); + *ID +} + +inventory::submit! { + RowEncodeRegistration { + id: run_end_array_id, + size: run_end_size_contribution, + encode: run_end_encode_into, + } +} + +#[cfg(test)] +mod tests { + use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::ListViewArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::listview::ListViewArrayExt; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_row::SortField; + use vortex_row::convert_columns; + + use crate::RunEnd; + + fn collect_rows(arr: &ListViewArray) -> Vec> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let n = arr.len(); + (0..n) + .map(|i| { + let slice = arr.list_elements_at(i).unwrap(); + let p = slice.execute::(&mut ctx).unwrap(); + p.as_slice::().to_vec() + }) + .collect() + } + + #[test] + fn runend_row_encode_matches_canonical() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let raw = buffer![1i32, 1, 1, 2, 2, 3, 3, 3, 3].into_array(); + let ree = RunEnd::encode(raw.clone(), &mut ctx)?.into_array(); + + let by_canonical = convert_columns(&[raw], &[SortField::default()], &mut ctx)?; + let by_ree = convert_columns(&[ree], &[SortField::default()], &mut ctx)?; + + assert_eq!(collect_rows(&by_canonical), collect_rows(&by_ree)); + Ok(()) + } +}