diff --git a/vortex-row/benches/row_encode.rs b/vortex-row/benches/row_encode.rs
index 7ceec508282..aa4d5f2f675 100644
--- a/vortex-row/benches/row_encode.rs
+++ b/vortex-row/benches/row_encode.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use arrow_array::DictionaryArray;
+use arrow_array::Int32Array;
 use arrow_array::Int64Array;
 use arrow_array::PrimitiveArray as ArrowPrimitiveArray;
 use arrow_array::StringArray;
@@ -38,10 +39,12 @@ use vortex_array::IntoArray;
 use vortex_array::LEGACY_SESSION;
 use vortex_array::VortexSessionExecute;
 use vortex_array::arrays::ConstantArray;
+use vortex_array::arrays::Patched;
 use vortex_array::arrays::PrimitiveArray;
 use vortex_array::arrays::StructArray;
 use vortex_array::arrays::VarBinViewArray;
 use vortex_array::builders::dict::dict_encode;
+use vortex_array::patches::Patches;
 use vortex_row::SortField;
 use vortex_row::convert_columns;
 
@@ -289,3 +292,90 @@ fn dict_utf8_vortex_without_kernel(bencher: divan::Bencher) {
         convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
     })
 }
+
+// ---------- patched_i32 ----------
+
+/// Builds the deterministic inputs shared by the `patched_i32` benches.
+///
+/// Returns the dense column (zero except at roughly 5% of positions), the non-zero values in
+/// position order, and the byte count reported to divan.
+fn gen_patched_i32_inputs() -> (Vec<i32>, Vec<i32>, u64) {
+    let mut rng = StdRng::seed_from_u64(400);
+    // Inner is mostly zero, with random patches at ~5% of positions.
+    let mut inner = vec![0i32; N];
+    let mut values = Vec::new();
+    for slot in inner.iter_mut() {
+        if rng.random_range(0u32..100) < 5 {
+            let v = rng.random_range(1i32..1_000_000);
+            *slot = v;
+            values.push(v);
+        }
+    }
+    // One sentinel byte plus four value bytes per encoded row.
+    let bytes = (N * (1 + 4)) as u64;
+    (inner, values, bytes)
+}
+
+/// Arrow baseline: row-encodes the dense `Int32Array` with `RowConverter`.
+#[divan::bench]
+fn patched_i32_arrow_row(bencher: divan::Bencher) {
+    let (inner, _, bytes) = gen_patched_i32_inputs();
+    let arr = Arc::new(Int32Array::from(inner)) as arrow_array::ArrayRef;
+    let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Int32)]).unwrap();
+    bencher
+        .counter(BytesCount::new(bytes))
+        .bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap())
+}
+
+/// Builds a `Patched` array with the same logical values as [`gen_patched_i32_inputs`]: an
+/// all-zero inner array whose non-zero positions are supplied as patches.
+fn patched_i32_array() -> (vortex_array::ArrayRef, u64) {
+    let (dense, values, bytes) = gen_patched_i32_inputs();
+    // Patch values are drawn from `1..1_000_000`, so the non-zero slots of `dense` are exactly
+    // the patched positions.
+    let indices: Vec<u32> = dense
+        .iter()
+        .enumerate()
+        .filter(|&(_, &v)| v != 0)
+        .map(|(i, _)| i as u32)
+        .collect();
+    let inner_arr = PrimitiveArray::from_iter(vec![0i32; N]).into_array();
+    let patches = Patches::new(
+        N,
+        0,
+        PrimitiveArray::from_iter(indices).into_array(),
+        PrimitiveArray::from_iter(values).into_array(),
+        None,
+    )
+    .unwrap();
+    let mut setup_ctx = LEGACY_SESSION.create_execution_ctx();
+    let patched = Patched::from_array_and_patches(inner_arr, &patches, &mut setup_ctx)
+        .unwrap()
+        .into_array();
+    (patched, bytes)
+}
+
+/// Row-encodes the `Patched` array as-is via `convert_columns`.
+#[divan::bench]
+fn patched_i32_with_kernel(bencher: divan::Bencher) {
+    let (arr, bytes) = patched_i32_array();
+    bencher.counter(BytesCount::new(bytes)).bench_local(|| {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        convert_columns(&[arr.clone()], &[SortField::default()], &mut ctx).unwrap()
+    })
+}
+
+/// Canonicalizes the `Patched` array inside the timed region, then row-encodes the result.
+#[divan::bench]
+fn patched_i32_without_kernel(bencher: divan::Bencher) {
+    let (arr, bytes) = patched_i32_array();
+    bencher.counter(BytesCount::new(bytes)).bench_local(|| {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let canonical = arr
+            .clone()
+            .execute::<Canonical>(&mut ctx)
+            .unwrap()
+            .into_array();
+        convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
+    })
+}
diff --git a/vortex-row/src/kernels/patched.rs b/vortex-row/src/kernels/patched.rs
index 2637116afb4..ecd5d65d556 100644
--- a/vortex-row/src/kernels/patched.rs
+++ b/vortex-row/src/kernels/patched.rs
@@ -3,38 +3,282 @@
 
 //! Row-encode kernels for `Patched`.
 //!
-//! Stubs in this commit return `Ok(None)` so the dispatch loop falls back to
-//! canonicalization. The real impls land in a follow-up commit.
+//! Row size is identical to the underlying `inner` array (patches don't change dtype). For
+//! row encoding, we first delegate to the inner array's row-encode path, then overlay each
+//! patched row's value directly into the output, overwriting the few bytes that the inner
+//! encoder wrote at that row's slot.
+//!
+//! Only primitive dtypes are handled. For any other dtype both kernels return `Ok(None)` so
+//! the dispatch loop falls back to canonicalization.
+
+#![allow(
+    clippy::cast_possible_truncation,
+    clippy::cast_sign_loss,
+    reason = "row encoding indexes into u32-sized buffers; lengths are validated to fit in u32"
+)]
 
 use vortex_array::ArrayView;
 use vortex_array::ExecutionCtx;
+use vortex_array::arrays::PrimitiveArray;
 use vortex_array::arrays::patched::Patched;
+use vortex_array::arrays::patched::PatchedArrayExt;
+use vortex_array::arrays::patched::PatchedArraySlotsExt;
+use vortex_array::dtype::DType;
+use vortex_array::match_each_native_ptype;
 use vortex_error::VortexResult;
 
+use crate::codec::RowEncode;
 use crate::encode::RowEncodeKernel;
+use crate::encode::dispatch_encode;
 use crate::options::SortField;
 use crate::size::RowSizeKernel;
+use crate::size::dispatch_size;
 
+/// Number of logical rows covered by one chunk of lane offsets; patch indices are stored
+/// relative to the start of their chunk.
+const PATCH_CHUNK_LEN: usize = 1024;
+
 impl RowSizeKernel for Patched {
     fn row_size_contribution(
-        _column: ArrayView<'_, Self>,
-        _field: SortField,
-        _sizes: &mut [u32],
-        _ctx: &mut ExecutionCtx,
+        column: ArrayView<'_, Self>,
+        field: SortField,
+        sizes: &mut [u32],
+        ctx: &mut ExecutionCtx,
     ) -> VortexResult<Option<()>> {
-        Ok(None)
+        // `row_encode_into` only overlays primitive values. Decline every other dtype here as
+        // well so that sizing and encoding always take the same path.
+        if !matches!(column.as_ref().dtype(), DType::Primitive(..)) {
+            return Ok(None);
+        }
+        // Per-row size matches the inner array; patches share its dtype.
+        dispatch_size(column.inner(), field, sizes, ctx)?;
+        Ok(Some(()))
     }
 }
 
 impl RowEncodeKernel for Patched {
     fn row_encode_into(
-        _column: ArrayView<'_, Self>,
-        _field: SortField,
-        _offsets: &[u32],
-        _cursors: &mut [u32],
-        _out: &mut [u8],
-        _ctx: &mut ExecutionCtx,
+        column: ArrayView<'_, Self>,
+        field: SortField,
+        offsets: &[u32],
+        cursors: &mut [u32],
+        out: &mut [u8],
+        ctx: &mut ExecutionCtx,
     ) -> VortexResult<Option<()>> {
-        Ok(None)
+        // Only primitive values can be overlaid in place; decline anything else.
+        let DType::Primitive(ptype, _) = *column.as_ref().dtype() else {
+            return Ok(None);
+        };
+        let value_bytes = ptype.byte_width();
+
+        // Snapshot per-row write start positions before the inner encoder advances cursors.
+        let pre_cursors: Vec<u32> = cursors.to_vec();
+        dispatch_encode(column.inner(), field, offsets, cursors, out, ctx)?;
+
+        overlay_patches(
+            column,
+            ptype,
+            value_bytes,
+            field,
+            offsets,
+            &pre_cursors,
+            out,
+            ctx,
+        )?;
+        Ok(Some(()))
+    }
+}
+
+/// Overlay patch values onto rows whose inner-encoded bytes need to be replaced.
+///
+/// `pre_cursors` must hold each row's cursor as it was before the inner array was encoded:
+/// `offsets[row] + pre_cursors[row]` is then the start of that row's slot in `out`.
+#[allow(clippy::too_many_arguments)]
+fn overlay_patches(
+    column: ArrayView<'_, Patched>,
+    ptype: vortex_array::dtype::PType,
+    value_bytes: usize,
+    field: SortField,
+    offsets: &[u32],
+    pre_cursors: &[u32],
+    out: &mut [u8],
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<()> {
+    let patch_indices: PrimitiveArray = column
+        .patch_indices()
+        .clone()
+        .execute::<PrimitiveArray>(ctx)?;
+    if patch_indices.is_empty() {
+        return Ok(());
+    }
+    let patch_values: PrimitiveArray = column
+        .patch_values()
+        .clone()
+        .execute::<PrimitiveArray>(ctx)?;
+    let lane_offsets: PrimitiveArray = column
+        .lane_offsets()
+        .clone()
+        .execute::<PrimitiveArray>(ctx)?;
+    let patch_indices_slice: &[u16] = patch_indices.as_slice();
+    let lane_offsets_slice: &[u32] = lane_offsets.as_slice();
+    let n_lanes = column.n_lanes();
+    let patched_offset = column.offset();
+    let array_len = column.as_ref().len();
+    let n_chunks = (array_len + patched_offset).div_ceil(PATCH_CHUNK_LEN);
+    let non_null = field.non_null_sentinel();
+    let descending = field.descending;
+
+    match_each_native_ptype!(ptype, |T| {
+        let values_slice: &[T] = patch_values.as_slice();
+        overlay_chunks::<T>(
+            values_slice,
+            patch_indices_slice,
+            lane_offsets_slice,
+            n_lanes,
+            patched_offset,
+            array_len,
+            n_chunks,
+            offsets,
+            pre_cursors,
+            out,
+            value_bytes,
+            non_null,
+            descending,
+        );
+    });
+    Ok(())
+}
+
+/// Writes every in-range patch over the slot the inner encoder produced for its row.
+///
+/// Patches `lane_offsets_slice[slot]..lane_offsets_slice[slot + 1]` belong to slot
+/// `chunk * n_lanes + lane`, and each patch index is relative to the start of its chunk.
+#[allow(clippy::too_many_arguments)]
+fn overlay_chunks<T: Copy + RowEncode>(
+    values_slice: &[T],
+    patch_indices_slice: &[u16],
+    lane_offsets_slice: &[u32],
+    n_lanes: usize,
+    patched_offset: usize,
+    array_len: usize,
+    n_chunks: usize,
+    offsets: &[u32],
+    pre_cursors: &[u32],
+    out: &mut [u8],
+    value_bytes: usize,
+    non_null: u8,
+    descending: bool,
+) {
+    for chunk in 0..n_chunks {
+        for lane in 0..n_lanes {
+            let slot = chunk * n_lanes + lane;
+            if slot + 1 >= lane_offsets_slice.len() {
+                break;
+            }
+            let start = lane_offsets_slice[slot] as usize;
+            let stop = lane_offsets_slice[slot + 1] as usize;
+            for k in start..stop {
+                let chunk_local = patch_indices_slice[k] as usize;
+                let logical_idx = chunk * PATCH_CHUNK_LEN + chunk_local;
+                // Skip patches that fall outside the rows this array exposes.
+                if logical_idx < patched_offset {
+                    continue;
+                }
+                let row = logical_idx - patched_offset;
+                if row >= array_len {
+                    continue;
+                }
+                // A slot is one sentinel byte followed by the `value_bytes`-wide encoded value.
+                let slot_start = (offsets[row] + pre_cursors[row]) as usize;
+                out[slot_start] = non_null;
+                values_slice[k].encode_to(
+                    &mut out[slot_start + 1..slot_start + 1 + value_bytes],
+                    descending,
+                );
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use vortex_array::IntoArray;
+    use vortex_array::LEGACY_SESSION;
+    use vortex_array::VortexSessionExecute;
+    use vortex_array::arrays::ListViewArray;
+    use vortex_array::arrays::Patched;
+    use vortex_array::arrays::PrimitiveArray;
+    use vortex_array::arrays::listview::ListViewArrayExt;
+    use vortex_array::patches::Patches;
+    use vortex_buffer::buffer;
+    use vortex_error::VortexResult;
+
+    use crate::SortField;
+    use crate::convert_columns;
+
+    /// Copies every encoded row out of `arr` so two encodings can be compared.
+    fn collect_rows(arr: &ListViewArray) -> Vec<Vec<u8>> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let n = arr.len();
+        (0..n)
+            .map(|i| {
+                let slice = arr.list_elements_at(i).unwrap();
+                let p = slice.execute::<PrimitiveArray>(&mut ctx).unwrap();
+                p.as_slice::<u8>().to_vec()
+            })
+            .collect()
+    }
+
+    #[test]
+    fn patched_row_encode_matches_canonical() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let inner = buffer![0u32; 32].into_array();
+        let patches = Patches::new(
+            32,
+            0,
+            buffer![1u32, 2, 3].into_array(),
+            buffer![100u32, 200, 300].into_array(),
+            None,
+        )?;
+        let patched = Patched::from_array_and_patches(inner, &patches, &mut ctx)?.into_array();
+
+        let mut canonical_vals = vec![0u32; 32];
+        canonical_vals[1] = 100;
+        canonical_vals[2] = 200;
+        canonical_vals[3] = 300;
+        let canonical = PrimitiveArray::from_iter(canonical_vals).into_array();
+
+        let by_canonical = convert_columns(&[canonical], &[SortField::default()], &mut ctx)?;
+        let by_patched = convert_columns(&[patched], &[SortField::default()], &mut ctx)?;
+        assert_eq!(collect_rows(&by_canonical), collect_rows(&by_patched));
+        Ok(())
+    }
+
+    #[test]
+    fn patched_row_encode_multi_chunk() -> VortexResult<()> {
+        let mut ctx = LEGACY_SESSION.create_execution_ctx();
+        let n: usize = 4096;
+        let inner = PrimitiveArray::from_iter(vec![0u32; n]).into_array();
+        let indices: Vec<u32> = (0..n as u32).step_by(503).collect();
+        let values: Vec<u32> = indices.iter().map(|i| i + 1000).collect();
+        let patches = Patches::new(
+            n,
+            0,
+            PrimitiveArray::from_iter(indices.clone()).into_array(),
+            PrimitiveArray::from_iter(values.clone()).into_array(),
+            None,
+        )?;
+        let patched = Patched::from_array_and_patches(inner, &patches, &mut ctx)?.into_array();
+
+        let mut canonical_vals = vec![0u32; n];
+        for (idx, &i) in indices.iter().enumerate() {
+            canonical_vals[i as usize] = values[idx];
+        }
+        let canonical = PrimitiveArray::from_iter(canonical_vals).into_array();
+
+        let by_canonical = convert_columns(&[canonical], &[SortField::default()], &mut ctx)?;
+        let by_patched = convert_columns(&[patched], &[SortField::default()], &mut ctx)?;
+        assert_eq!(collect_rows(&by_canonical), collect_rows(&by_patched));
+        Ok(())
     }
 }