From f46aefd135e6c08e9b95b92542691a0a95426521 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 17 May 2026 22:35:06 +0000 Subject: [PATCH] BitPacked row-encode kernel (vortex-fastlanes) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a row-encode kernel for BitPacked arrays. The kernel walks the packed storage in 1024-element fastlanes chunks via `BitUnpackedChunks::full_chunks`, unpacks each chunk into a stack-local buffer, and writes the row-encoded bytes for that chunk in one pass. Patches (when present) are applied per-chunk to the stack buffer so a patched cell encodes its corrected value rather than the bit-packed placeholder. The shared `row_encode_common` module factors out the per-chunk encode primitive (`encode_primitive_chunk`) and a small `PrimRowEncode` trait — the same shape FoR and Delta will use in the next commit so those kernels can share the chunk-walk machinery. Kernel is registered via the `inventory`-based registry, since `vortex-fastlanes` depends on `vortex-array`. Includes a `bitpacked_i32_*` bench triplet (arrow-row baseline, vortex with kernel, vortex through canonicalization). 
Signed-off-by: Claude --- Cargo.lock | 3 + encodings/fastlanes/Cargo.toml | 2 + .../fastlanes/src/bitpacking/compute/mod.rs | 1 + .../src/bitpacking/compute/row_encode.rs | 403 ++++++++++++++++++ encodings/fastlanes/src/lib.rs | 1 + encodings/fastlanes/src/row_encode_common.rs | 126 ++++++ vortex-row/Cargo.toml | 1 + vortex-row/benches/row_encode.rs | 55 +++ 8 files changed, 592 insertions(+) create mode 100644 encodings/fastlanes/src/bitpacking/compute/row_encode.rs create mode 100644 encodings/fastlanes/src/row_encode_common.rs diff --git a/Cargo.lock b/Cargo.lock index 5a7a18061ad..949a9594359 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10687,6 +10687,7 @@ version = "0.1.0" dependencies = [ "codspeed-divan-compat", "fastlanes", + "inventory", "itertools 0.14.0", "lending-iterator", "num-traits", @@ -10699,6 +10700,7 @@ dependencies = [ "vortex-error", "vortex-fastlanes", "vortex-mask", + "vortex-row", "vortex-session", ] @@ -11051,6 +11053,7 @@ dependencies = [ "vortex-array", "vortex-buffer", "vortex-error", + "vortex-fastlanes", "vortex-mask", "vortex-session", "vortex-utils", diff --git a/encodings/fastlanes/Cargo.toml b/encodings/fastlanes/Cargo.toml index a14e19389bc..3a255310ac5 100644 --- a/encodings/fastlanes/Cargo.toml +++ b/encodings/fastlanes/Cargo.toml @@ -18,6 +18,7 @@ workspace = true [dependencies] fastlanes = { workspace = true } +inventory = { workspace = true } itertools = { workspace = true } lending-iterator = { workspace = true } num-traits = { workspace = true } @@ -27,6 +28,7 @@ vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-error = { workspace = true } vortex-mask = { workspace = true } +vortex-row = { workspace = true } vortex-session = { workspace = true } [dev-dependencies] diff --git a/encodings/fastlanes/src/bitpacking/compute/mod.rs b/encodings/fastlanes/src/bitpacking/compute/mod.rs index 2501d952356..a260d0e0ffd 100644 --- a/encodings/fastlanes/src/bitpacking/compute/mod.rs +++ 
b/encodings/fastlanes/src/bitpacking/compute/mod.rs @@ -4,6 +4,7 @@ mod cast; mod filter; pub(crate) mod is_constant; +pub(crate) mod row_encode; mod slice; mod take; diff --git a/encodings/fastlanes/src/bitpacking/compute/row_encode.rs b/encodings/fastlanes/src/bitpacking/compute/row_encode.rs new file mode 100644 index 00000000000..412993dd166 --- /dev/null +++ b/encodings/fastlanes/src/bitpacking/compute/row_encode.rs @@ -0,0 +1,403 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Row-encode kernel for `BitPackedArray`. +//! +//! Walks the bit-packed storage in 1024-element chunks, unpacks each chunk into a +//! stack-local buffer, and writes the row-encoded bytes in one pass. Avoids +//! materializing a canonical `PrimitiveArray` first. + +#![allow( + clippy::cast_possible_truncation, + clippy::cast_possible_wrap, + clippy::cast_sign_loss, + reason = "row encoding indexes into u32-sized buffers and bit-packed widths are small" +)] +#![allow( + unused_imports, + reason = "Item is consumed by the #[gat(Item)] macro expansion" +)] + +use lending_iterator::gat; +#[allow(unused_imports)] +use lending_iterator::prelude::Item; +#[gat(Item)] +use lending_iterator::prelude::LendingIterator; +use vortex_array::ArrayId; +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::dtype::NativePType; +use vortex_array::dtype::PType; +use vortex_array::match_each_integer_ptype; +use vortex_array::validity::Validity; +use vortex_error::VortexResult; +use vortex_row::RowEncodeRegistration; +use vortex_row::options::SortField; + +use crate::BitPacked; +use crate::BitPackedArrayExt; +use crate::row_encode_common::PrimRowEncode; +use crate::row_encode_common::encode_primitive_chunk; +use crate::row_encode_common::encoded_size_for_ptype; +use crate::unpack_iter::BitPacked as BitPackedUnpack; + +/// Per-row size contribution for a `BitPacked` column. 
+///
+/// Adds the fixed encoded width (one sentinel byte plus the value bytes) to the first `len`
+/// entries of `sizes`, where `len` is the column length. Returns `Ok(None)` when `column` is
+/// not a `BitPacked` array.
+fn bitpacked_size_contribution(
+    column: &ArrayRef,
+    _field: SortField,
+    sizes: &mut [u32],
+    _ctx: &mut ExecutionCtx,
+) -> VortexResult<Option<()>> {
+    let Some(view) = column.as_opt::<BitPacked>() else {
+        return Ok(None);
+    };
+    let add = encoded_size_for_ptype(view.dtype().as_ptype());
+    for s in sizes.iter_mut().take(view.as_ref().len()) {
+        *s += add;
+    }
+    Ok(Some(()))
+}
+
+/// Per-row byte encoding for a `BitPacked` column.
+///
+/// Validity and patches are materialized once up front; the typed chunk walker then writes
+/// every row at `offsets[row] + cursors[row]` in `out` and advances `cursors[row]`.
+///
+/// Returns `Ok(None)` when `column` is not a `BitPacked` array or its ptype is not one of the
+/// eight integer ptypes, and `Ok(Some(()))` once all rows have been written.
+///
+/// # Errors
+///
+/// Propagates failures from executing the validity mask, the patch arrays, or the unpacker.
+fn bitpacked_encode_into(
+    column: &ArrayRef,
+    field: SortField,
+    offsets: &[u32],
+    cursors: &mut [u32],
+    out: &mut [u8],
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<Option<()>> {
+    let Some(view) = column.as_opt::<BitPacked>() else {
+        return Ok(None);
+    };
+    let ptype = view.dtype().as_ptype();
+    if !matches!(
+        ptype,
+        PType::I8
+            | PType::I16
+            | PType::I32
+            | PType::I64
+            | PType::U8
+            | PType::U16
+            | PType::U32
+            | PType::U64
+    ) {
+        return Ok(None);
+    }
+    // Materialize validity once and fast-path the common all-valid case.
+    // Use the explicit Ext method which returns a `Validity` (the inherent `validity()` on
+    // `ArrayView` returns a `VortexResult`).
+    let validity = BitPackedArrayExt::validity(&view);
+    let mask = match &validity {
+        Validity::NonNullable | Validity::AllValid => None,
+        _ => Some(validity.execute_mask(view.as_ref().len(), ctx)?),
+    };
+
+    // Materialize patches (rare; if patches are present we materialize the patch
+    // index/value slices once outside the hot loop).
+    let patches = view.patches();
+    let patch_pairs = if let Some(p) = patches {
+        let indices = p.indices().clone().execute::<PrimitiveArray>(ctx)?;
+        let values = p.values().clone().execute::<PrimitiveArray>(ctx)?;
+        Some((indices, values, p.offset()))
+    } else {
+        None
+    };
+
+    match_each_integer_ptype!(ptype, |T| {
+        encode_bitpacked_typed::<T>(
+            view,
+            field,
+            offsets,
+            cursors,
+            out,
+            mask.as_ref(),
+            patch_pairs.as_ref(),
+        )?;
+    });
+    Ok(Some(()))
+}
+
+/// Row-encodes every value of `arr_view` as native type `T`.
+///
+/// Walks the unpacked storage in logical row order (initial sliced chunk, full middle chunks,
+/// trailing sliced chunk). Each chunk first gets its patches overlaid and is then written
+/// through [`write_chunk_rows`].
+#[allow(clippy::too_many_arguments)]
+fn encode_bitpacked_typed<T>(
+    arr_view: vortex_array::ArrayView<'_, BitPacked>,
+    field: SortField,
+    offsets: &[u32],
+    cursors: &mut [u32],
+    out: &mut [u8],
+    mask: Option<&vortex_mask::Mask>,
+    patch_pairs: Option<&(PrimitiveArray, PrimitiveArray, usize)>,
+) -> VortexResult<()>
+where
+    T: BitPackedUnpack + NativePType + PrimRowEncode,
+{
+    let total_len = arr_view.as_ref().len();
+    let descending = field.descending;
+    let non_null = field.non_null_sentinel();
+    let null = field.null_sentinel();
+    let value_bytes = size_of::<T>();
+    // One sentinel byte precedes every encoded value.
+    let stride = (1 + value_bytes) as u32;
+
+    let mut local_idx: usize = 0;
+    let mut unpacked = arr_view.unpacked_chunks::<T>()?;
+
+    // Patches, then row-encodes, one unpacked chunk whose first value is logical row
+    // `row_start`; returns the logical row that follows the chunk.
+    let mut emit_chunk = |chunk: &mut [T], row_start: usize| {
+        let row_end = row_start + chunk.len();
+        apply_patches_in_range::<T>(chunk, patch_pairs, row_start, row_end);
+        write_chunk_rows::<T>(
+            chunk,
+            row_start,
+            offsets,
+            cursors,
+            out,
+            mask,
+            non_null,
+            null,
+            descending,
+            value_bytes,
+            stride,
+        );
+        row_end
+    };
+
+    // Walk the array: initial sliced chunk, full middle chunks, trailing sliced chunk.
+    if let Some(initial) = unpacked.initial() {
+        local_idx = emit_chunk(initial, local_idx);
+    }
+
+    let mut chunks_iter = unpacked.full_chunks();
+    while let Some(chunk) = chunks_iter.next() {
+        // A full chunk holds `FL_CHUNK_SIZE` values; clamp so we never run past the array.
+        let len_chunk = crate::FL_CHUNK_SIZE.min(total_len - local_idx);
+        local_idx = emit_chunk(&mut chunk[..len_chunk], local_idx);
+    }
+
+    if let Some(trailer) = unpacked.trailer() {
+        local_idx = emit_chunk(trailer, local_idx);
+    }
+
+    debug_assert_eq!(local_idx, total_len);
+    Ok(())
+}
+
+/// Overwrite values in `chunk` (which covers logical rows `[chunk_start, chunk_end)`) with
+/// any patch values that fall in that range.
+///
+/// `patch_pairs` holds the patch indices, the patch values and the patch offset. The chunk
+/// bounds are shifted by that offset before being compared with the stored indices.
+///
+/// # Panics
+///
+/// Panics if the patch indices are not `u8`/`u16`/`u32`/`u64`. Skipping them silently would
+/// row-encode the bit-packed placeholder instead of the patched value.
+fn apply_patches_in_range<T: NativePType>(
+    chunk: &mut [T],
+    patch_pairs: Option<&(PrimitiveArray, PrimitiveArray, usize)>,
+    chunk_start: usize,
+    chunk_end: usize,
+) {
+    let Some((indices, values, patch_offset)) = patch_pairs else {
+        return;
+    };
+    let values: &[T] = values.as_slice();
+    let start = chunk_start + *patch_offset;
+    let end = chunk_end + *patch_offset;
+    match indices.ptype() {
+        PType::U8 => {
+            overlay_patches(chunk, indices.as_slice::<u8>(), values, start, end, usize::from);
+        }
+        PType::U16 => {
+            overlay_patches(chunk, indices.as_slice::<u16>(), values, start, end, usize::from);
+        }
+        PType::U32 => {
+            overlay_patches(chunk, indices.as_slice::<u32>(), values, start, end, |i| i as usize);
+        }
+        PType::U64 => {
+            overlay_patches(chunk, indices.as_slice::<u64>(), values, start, end, |i| i as usize);
+        }
+        other => unreachable!("patch indices must be unsigned integers, got {other:?}"),
+    }
+}
+
+/// Copies `values[i]` into `chunk[indices[i] - start]` for every patch index in `[start, end)`.
+///
+/// `indices` must be sorted ascending: the first candidate is found by binary search and the
+/// scan stops at the first index that is `>= end`.
+fn overlay_patches<I: Copy, T: Copy>(
+    chunk: &mut [T],
+    indices: &[I],
+    values: &[T],
+    start: usize,
+    end: usize,
+    to_usize: impl Fn(I) -> usize,
+) {
+    // Binary-search to the first patch of this chunk instead of rescanning all earlier
+    // patches for every chunk.
+    let first = indices.partition_point(|&i| to_usize(i) < start);
+    for (i, &raw) in indices.iter().enumerate().skip(first) {
+        let raw = to_usize(raw);
+        if raw >= end {
+            break;
+        }
+        chunk[raw - start] = values[i];
+    }
+}
+
+/// Writes one unpacked chunk into the row buffer.
+///
+/// Thin forwarder to [`encode_primitive_chunk`]; all arguments are passed through unchanged.
+#[allow(clippy::too_many_arguments)]
+fn write_chunk_rows<T: NativePType + PrimRowEncode>(
+    chunk: &[T],
+    row_start: usize,
+    offsets: &[u32],
+    cursors: &mut [u32],
+    out: &mut [u8],
+    mask: Option<&vortex_mask::Mask>,
+    non_null: u8,
+    null: u8,
+    descending: bool,
+    value_bytes: usize,
+    stride: u32,
+) {
+    encode_primitive_chunk::<T>(
+        chunk,
+        row_start,
+        offsets,
+        cursors,
+        out,
+        mask,
+        non_null,
+        null,
+        descending,
+        value_bytes,
+        stride,
+    );
+}
+
+/// Array id (`fastlanes.bitpacked`) under which this kernel is registered.
+fn bitpacked_array_id() -> ArrayId {
+    use vortex_session::registry::CachedId;
+    static ID: CachedId = CachedId::new("fastlanes.bitpacked");
+    *ID
+}
+
+// Register this kernel's id, size and encode callbacks through `inventory`.
+inventory::submit! {
+    RowEncodeRegistration {
+        id: bitpacked_array_id,
+        size: bitpacked_size_contribution,
+        encode: bitpacked_encode_into,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use vortex_array::IntoArray;
+    use vortex_array::LEGACY_SESSION;
+    use vortex_array::VortexSessionExecute;
+    use vortex_array::arrays::ListViewArray;
+    use vortex_array::arrays::PrimitiveArray;
+    use vortex_array::arrays::listview::ListViewArrayExt;
+    use vortex_buffer::buffer;
+    use vortex_error::VortexResult;
+    use vortex_row::SortField;
+    use vortex_row::convert_columns;
+
+    use crate::BitPackedArrayExt;
+    use crate::BitPackedData;
+
+    /// Copies every encoded row of `arr` into its own byte vector so that two encodings can
+    /// be compared with `assert_eq!`.
+    fn collect_rows(arr: &ListViewArray) -> Vec<Vec<u8>> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let n = arr.len();
+        (0..n)
+            .map(|i| {
+                let slice = arr.list_elements_at(i).unwrap();
+                let p = slice.execute::<PrimitiveArray>(&mut ctx).unwrap();
+                p.as_slice::<u8>().to_vec()
+            })
+            .collect()
+    }
+
+    /// A small patch-free array must encode to the same rows as its unpacked source.
+    #[test]
+    fn bitpacked_row_encode_matches_canonical() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let raw = buffer![1u32, 2, 3, 4, 5, 6, 7, 8, 9].into_array();
+        let bp = BitPackedData::encode(&raw, 4, &mut ctx)?.into_array();
+
+        let by_canonical = convert_columns(&[raw], &[SortField::default()], &mut ctx)?;
+        let by_bp = convert_columns(&[bp], &[SortField::default()], &mut ctx)?;
+        assert_eq!(collect_rows(&by_canonical), collect_rows(&by_bp));
+        Ok(())
+    }
+
+    /// Values that do not fit the 4-bit width become patches; the kernel must encode the
+    /// patched value, not the packed placeholder.
+    #[test]
+    fn bitpacked_row_encode_with_patches() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let values: Vec<u32> = (0..200)
+            .map(|i| if i % 30 == 0 { 5000 + i } else { i % 16 })
+            .collect();
+        let raw = PrimitiveArray::from_iter(values).into_array();
+        let bp = BitPackedData::encode(&raw, 4, &mut ctx)?.into_array();
+        // Guard: the fixture is only meaningful if the encoder actually produced patches.
+        assert!(bp.as_opt::<crate::BitPacked>().unwrap().patches().is_some());
+        let by_canonical = convert_columns(&[raw], &[SortField::default()], &mut ctx)?;
+        let by_bp = convert_columns(&[bp], &[SortField::default()], &mut ctx)?;
+        assert_eq!(collect_rows(&by_canonical), collect_rows(&by_bp));
+        Ok(())
+    }
+
+    /// 3000 values span two full 1024-element chunks plus a partial tail.
+    #[test]
+    fn bitpacked_row_encode_multi_chunk() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let values: Vec<u32> = (0..3000).map(|i| i % 64).collect();
+        let raw = PrimitiveArray::from_iter(values).into_array();
+        let bp = BitPackedData::encode(&raw, 6, &mut ctx)?.into_array();
+        let by_canonical = convert_columns(&[raw], &[SortField::default()], &mut ctx)?;
+        let by_bp = convert_columns(&[bp], &[SortField::default()], &mut ctx)?;
+        assert_eq!(collect_rows(&by_canonical), collect_rows(&by_bp));
+        Ok(())
+    }
+}
diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs
index 9022b7c4e2b..613a4784356 100644
--- a/encodings/fastlanes/src/lib.rs
+++ b/encodings/fastlanes/src/lib.rs
@@ -20,6 +20,7 @@ mod bitpacking;
 mod delta;
 mod r#for;
 mod rle;
+mod row_encode_common;
 
 pub(crate) const FL_CHUNK_SIZE: usize = 1024;
 
diff --git a/encodings/fastlanes/src/row_encode_common.rs b/encodings/fastlanes/src/row_encode_common.rs
new file mode 100644
index 00000000000..7f44ebc7463
--- /dev/null
+++ b/encodings/fastlanes/src/row_encode_common.rs
@@ -0,0 +1,126 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! Shared helpers for the FastLanes row-encode kernels (BitPacked, FoR, Delta).
+//!
+//! Each kernel walks the compressed storage in 1024-element chunks, unpacks each chunk into
+//! a stack-local buffer, and writes the row-encoded bytes in one pass. This module defines
+//! the per-row write primitive used after a chunk has been unpacked.
+
+#![allow(
+    clippy::cast_possible_truncation,
+    clippy::cast_possible_wrap,
+    reason = "row encoding indexes into u32-sized buffers"
+)]
+
+use vortex_array::dtype::NativePType;
+use vortex_array::dtype::PType;
+
+/// Trait implemented by primitive types that can be written into a row-encoded byte slot.
+///
+/// Counterpart of `vortex_row::codec::RowEncode` for the integer types that BitPacked, FoR
+/// and Delta decode to.
+pub trait PrimRowEncode: Copy {
+    /// Writes the order-preserving byte form of `self` into `out`; when `descending` is set
+    /// every byte is complemented.
+    fn row_encode_to(self, out: &mut [u8], descending: bool);
+}
+
+/// Stores the big-endian bytes `be` into `out`, complementing each byte for descending order.
+///
+/// Ascending order uses `copy_from_slice`, which requires `out.len() == N`. Descending order
+/// writes `out[0..N]` by index and therefore only requires `out.len() >= N`.
+#[inline]
+fn write_ordered<const N: usize>(be: [u8; N], out: &mut [u8], descending: bool) {
+    if !descending {
+        out.copy_from_slice(&be);
+        return;
+    }
+    for (slot, byte) in be.into_iter().enumerate() {
+        out[slot] = !byte;
+    }
+}
+
+// Unsigned integers: the big-endian bytes are written as they are.
+macro_rules! impl_unsigned {
+    ($($t:ty),+ $(,)?) => {$(
+        impl PrimRowEncode for $t {
+            #[inline]
+            fn row_encode_to(self, out: &mut [u8], descending: bool) {
+                write_ordered(self.to_be_bytes(), out, descending);
+            }
+        }
+    )+};
+}
+
+// Signed integers: the top bit of the most significant byte is flipped before writing.
+macro_rules! impl_signed {
+    ($($t:ty),+ $(,)?) => {$(
+        impl PrimRowEncode for $t {
+            #[inline]
+            fn row_encode_to(self, out: &mut [u8], descending: bool) {
+                let mut be = self.to_be_bytes();
+                be[0] ^= 0x80;
+                write_ordered(be, out, descending);
+            }
+        }
+    )+};
+}
+
+impl_unsigned!(u8, u16, u32, u64);
+impl_signed!(i8, i16, i32, i64);
+
+/// Number of bytes one encoded row occupies for `ptype`: one sentinel byte followed by the
+/// value's `byte_width()` bytes.
+#[inline]
+pub fn encoded_size_for_ptype(ptype: PType) -> u32 {
+    let value_width = ptype.byte_width() as u32;
+    1 + value_width
+}
+
+/// Write a contiguous slice of unpacked values (one chunk) into the row-encoded output buffer.
+///
+/// `chunk[j]` is the value for logical row `row_start + j`. The output position for row `i`
+/// is `offsets[i] + cursors[i]`; the cursor is advanced by `stride` after each row write.
+///
+/// With `mask == None` every row is written as `non_null` followed by the encoded value.
+/// With a mask, rows whose mask bit is unset are written as `null` followed by `value_bytes`
+/// zero bytes instead.
+///
+/// # Panics
+///
+/// Panics if a row index is out of bounds for `offsets`/`cursors`, or if a row's
+/// `1 + value_bytes` output bytes do not fit inside `out`.
+#[allow(clippy::too_many_arguments)]
+#[inline]
+pub fn encode_primitive_chunk<T: NativePType + PrimRowEncode>(
+    chunk: &[T],
+    row_start: usize,
+    offsets: &[u32],
+    cursors: &mut [u32],
+    out: &mut [u8],
+    mask: Option<&vortex_mask::Mask>,
+    non_null: u8,
+    null: u8,
+    descending: bool,
+    value_bytes: usize,
+    stride: u32,
+) {
+    match mask {
+        // All rows valid: keep this loop free of per-row validity checks.
+        None => {
+            for (j, &v) in chunk.iter().enumerate() {
+                let row = row_start + j;
+                let pos = (offsets[row] + cursors[row]) as usize;
+                out[pos] = non_null;
+                v.row_encode_to(&mut out[pos + 1..pos + 1 + value_bytes], descending);
+                cursors[row] += stride;
+            }
+        }
+        Some(m) => {
+            for (j, &v) in chunk.iter().enumerate() {
+                let row = row_start + j;
+                let pos = (offsets[row] + cursors[row]) as usize;
+                if m.value(row) {
+                    out[pos] = non_null;
+                    v.row_encode_to(&mut out[pos + 1..pos + 1 + value_bytes], descending);
+                } else {
+                    // Null rows keep the fixed width: sentinel, then zeroed value bytes.
+                    out[pos] = null;
+                    out[pos + 1..pos + 1 + value_bytes].fill(0);
+                }
+                cursors[row] += stride;
+            }
+        }
+    }
+}
diff --git a/vortex-row/Cargo.toml b/vortex-row/Cargo.toml
index 947b1df47da..6ccfa258bc9 100644
--- a/vortex-row/Cargo.toml
+++ b/vortex-row/Cargo.toml
@@ -36,6 +36,7 @@ mimalloc = { workspace = true }
 rand = { workspace = true }
 rstest = { workspace = true }
 vortex-array = { workspace = true, features = ["_test-harness"] }
+vortex-fastlanes = { workspace = true }
 
 [[bench]]
 name = "row_encode"
diff --git a/vortex-row/benches/row_encode.rs b/vortex-row/benches/row_encode.rs
index aa4d5f2f675..769404c82c3 100644
--- a/vortex-row/benches/row_encode.rs
+++ b/vortex-row/benches/row_encode.rs
@@ -45,6 +45,7 @@ use vortex_array::arrays::StructArray;
 use vortex_array::arrays::VarBinViewArray;
 use vortex_array::builders::dict::dict_encode;
 use vortex_array::patches::Patches;
+use vortex_fastlanes::BitPackedData;
 use vortex_row::SortField;
 use vortex_row::convert_columns;
 
@@ -374,3 +375,60 @@ fn patched_i32_without_kernel(bencher: divan::Bencher) {
         convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
     })
 }
+
+// ---------- bitpacked_i32 ----------
+
+/// Seeded values in `0..256`, small enough to bit-pack to 8 bits without patches.
+fn gen_bitpacked_i32_values(n: usize, seed: u64) -> Vec<i32> {
+    let mut rng = StdRng::seed_from_u64(seed);
+    (0..n).map(|_| rng.random_range(0i32..256)).collect()
+}
+
+/// Baseline: arrow-row conversion of the same values held in a plain `Int32Array`.
+#[divan::bench]
+fn bitpacked_i32_arrow_row(bencher: divan::Bencher) {
+    let v = gen_bitpacked_i32_values(N, 100);
+    let arr = Arc::new(Int32Array::from(v)) as arrow_array::ArrayRef;
+    let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Int32)]).unwrap();
+    let bytes = (N * (1 + 4)) as u64;
+    bencher
+        .counter(BytesCount::new(bytes))
+        .bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap())
+}
+
+/// Passes the bit-packed array straight to `convert_columns`.
+#[divan::bench]
+fn bitpacked_i32_with_kernel(bencher: divan::Bencher) {
+    let v = gen_bitpacked_i32_values(N, 100);
+    let raw = PrimitiveArray::from_iter(v).into_array();
+    let mut setup_ctx = LEGACY_SESSION.create_execution_ctx();
+    let bp = BitPackedData::encode(&raw, 8, &mut setup_ctx)
+        .unwrap()
+        .into_array();
+    let bytes = (N * (1 + 4)) as u64;
+    bencher.counter(BytesCount::new(bytes)).bench_local(|| {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        convert_columns(&[bp.clone()], &[SortField::default()], &mut ctx).unwrap()
+    })
+}
+
+/// Decodes the bit-packed array inside the timed closure, then row-encodes the result.
+#[divan::bench]
+fn bitpacked_i32_without_kernel(bencher: divan::Bencher) {
+    let v = gen_bitpacked_i32_values(N, 100);
+    let raw = PrimitiveArray::from_iter(v).into_array();
+    let mut setup_ctx = LEGACY_SESSION.create_execution_ctx();
+    let bp = BitPackedData::encode(&raw, 8, &mut setup_ctx)
+        .unwrap()
+        .into_array();
+    let bytes = (N * (1 + 4)) as u64;
+    bencher.counter(BytesCount::new(bytes)).bench_local(|| {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let canonical = bp
+            .clone()
+            .execute::<PrimitiveArray>(&mut ctx)
+            .unwrap()
+            .into_array();
+        convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
+    })
+}