Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions vortex-row/benches/row_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::sync::Arc;

use arrow_array::DictionaryArray;
use arrow_array::Int32Array;
use arrow_array::Int64Array;
use arrow_array::PrimitiveArray as ArrowPrimitiveArray;
use arrow_array::StringArray;
Expand All @@ -38,10 +39,12 @@ use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::Patched;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::builders::dict::dict_encode;
use vortex_array::patches::Patches;
use vortex_row::SortField;
use vortex_row::convert_columns;

Expand Down Expand Up @@ -289,3 +292,85 @@ fn dict_utf8_vortex_without_kernel(bencher: divan::Bencher) {
convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
})
}

// ---------- patched_i32 ----------

fn gen_patched_i32_inputs() -> (Vec<i32>, Vec<i32>, u64) {
let mut rng = StdRng::seed_from_u64(400);
// Inner is mostly zero, with random patches at ~5% of positions.
let mut inner = vec![0i32; N];
let mut values = Vec::new();
for slot in inner.iter_mut().take(N) {
if rng.random_range(0u32..100) < 5 {
let v = rng.random_range(1i32..1_000_000);
*slot = v;
values.push(v);
}
}
let bytes = (N * (1 + 4)) as u64;
(inner, values, bytes)
}

#[divan::bench]
fn patched_i32_arrow_row(bencher: divan::Bencher) {
let (inner, _, bytes) = gen_patched_i32_inputs();
let arr = Arc::new(Int32Array::from(inner)) as arrow_array::ArrayRef;
let conv = RowConverter::new(vec![ArrowSortField::new(DataType::Int32)]).unwrap();
bencher
.counter(BytesCount::new(bytes))
.bench_local(|| conv.convert_columns(&[arr.clone()]).unwrap())
}

fn patched_i32_array() -> (vortex_array::ArrayRef, u64) {
let mut rng = StdRng::seed_from_u64(400);
let mut indices: Vec<u32> = Vec::new();
let mut values: Vec<i32> = Vec::new();
let mut inner = vec![0i32; N];
for i in 0..N {
if rng.random_range(0u32..100) < 5 {
let v = rng.random_range(1i32..1_000_000);
inner[i] = v;
indices.push(i as u32);
values.push(v);
}
}
let inner_arr = PrimitiveArray::from_iter(vec![0i32; N]).into_array();
let patches = Patches::new(
N,
0,
PrimitiveArray::from_iter(indices).into_array(),
PrimitiveArray::from_iter(values).into_array(),
None,
)
.unwrap();
let mut setup_ctx = LEGACY_SESSION.create_execution_ctx();
let patched = Patched::from_array_and_patches(inner_arr, &patches, &mut setup_ctx)
.unwrap()
.into_array();
drop(inner);
let bytes = (N * (1 + 4)) as u64;
(patched, bytes)
}

#[divan::bench]
fn patched_i32_with_kernel(bencher: divan::Bencher) {
let (arr, bytes) = patched_i32_array();
bencher.counter(BytesCount::new(bytes)).bench_local(|| {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
convert_columns(&[arr.clone()], &[SortField::default()], &mut ctx).unwrap()
})
}

#[divan::bench]
fn patched_i32_without_kernel(bencher: divan::Bencher) {
let (arr, bytes) = patched_i32_array();
bencher.counter(BytesCount::new(bytes)).bench_local(|| {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let canonical = arr
.clone()
.execute::<Canonical>(&mut ctx)
.unwrap()
.into_array();
convert_columns(&[canonical], &[SortField::default()], &mut ctx).unwrap()
})
}
249 changes: 235 additions & 14 deletions vortex-row/src/kernels/patched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,259 @@

//! Row-encode kernels for `Patched`.
//!
//! Stubs in this commit return `Ok(None)` so the dispatch loop falls back to
//! canonicalization. The real impls land in a follow-up commit.
//! Row size is identical to the underlying `inner` array (patches don't change dtype). For
//! row encoding, we first delegate to the inner array's row-encode path, then overlay each
//! patched row's value directly into the output, overwriting the few bytes that the inner
//! encoder wrote at that row's slot.

#![allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
reason = "row encoding indexes into u32-sized buffers; lengths are validated to fit in u32"
)]

use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::patched::Patched;
use vortex_array::arrays::patched::PatchedArrayExt;
use vortex_array::arrays::patched::PatchedArraySlotsExt;
use vortex_array::dtype::DType;
use vortex_array::match_each_native_ptype;
use vortex_error::VortexResult;

use crate::codec::RowEncode;
use crate::encode::RowEncodeKernel;
use crate::encode::dispatch_encode;
use crate::options::SortField;
use crate::size::RowSizeKernel;
use crate::size::dispatch_size;

impl RowSizeKernel for Patched {
fn row_size_contribution(
_column: ArrayView<'_, Self>,
_field: SortField,
_sizes: &mut [u32],
_ctx: &mut ExecutionCtx,
column: ArrayView<'_, Self>,
field: SortField,
sizes: &mut [u32],
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<()>> {
Ok(None)
// Per-row size matches the inner array; patches share its dtype.
dispatch_size(column.inner(), field, sizes, ctx)?;
Ok(Some(()))
}
}

impl RowEncodeKernel for Patched {
fn row_encode_into(
_column: ArrayView<'_, Self>,
_field: SortField,
_offsets: &[u32],
_cursors: &mut [u32],
_out: &mut [u8],
_ctx: &mut ExecutionCtx,
column: ArrayView<'_, Self>,
field: SortField,
offsets: &[u32],
cursors: &mut [u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<()>> {
Ok(None)
let DType::Primitive(ptype, _) = *column.as_ref().dtype() else {
return Ok(None);
};
let value_bytes = ptype.byte_width();

// Snapshot per-row write start positions before the inner encoder advances cursors.
let pre_cursors: Vec<u32> = cursors.to_vec();
dispatch_encode(column.inner(), field, offsets, cursors, out, ctx)?;

overlay_patches(
column,
ptype,
value_bytes,
field,
offsets,
&pre_cursors,
out,
ctx,
)?;
Ok(Some(()))
}
}

/// Overlay patch values onto rows whose inner-encoded bytes need to be replaced.
#[allow(clippy::too_many_arguments)]
fn overlay_patches(
column: ArrayView<'_, Patched>,
ptype: vortex_array::dtype::PType,
value_bytes: usize,
field: SortField,
offsets: &[u32],
pre_cursors: &[u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let patch_indices: PrimitiveArray = column
.patch_indices()
.clone()
.execute::<PrimitiveArray>(ctx)?;
if patch_indices.is_empty() {
return Ok(());
}
let patch_values: PrimitiveArray = column
.patch_values()
.clone()
.execute::<PrimitiveArray>(ctx)?;
let lane_offsets: PrimitiveArray = column
.lane_offsets()
.clone()
.execute::<PrimitiveArray>(ctx)?;
let patch_indices_slice: &[u16] = patch_indices.as_slice();
let lane_offsets_slice: &[u32] = lane_offsets.as_slice();
let n_lanes = column.n_lanes();
let patched_offset = column.offset();
let array_len = column.as_ref().len();
let n_chunks = (array_len + patched_offset).div_ceil(1024);
let non_null = field.non_null_sentinel();
let descending = field.descending;

match_each_native_ptype!(ptype, |T| {
let values_slice: &[T] = patch_values.as_slice();
overlay_chunks::<T>(
values_slice,
patch_indices_slice,
lane_offsets_slice,
n_lanes,
patched_offset,
array_len,
n_chunks,
offsets,
pre_cursors,
out,
value_bytes,
non_null,
descending,
);
});
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn overlay_chunks<T: Copy + RowEncode>(
values_slice: &[T],
patch_indices_slice: &[u16],
lane_offsets_slice: &[u32],
n_lanes: usize,
patched_offset: usize,
array_len: usize,
n_chunks: usize,
offsets: &[u32],
pre_cursors: &[u32],
out: &mut [u8],
value_bytes: usize,
non_null: u8,
descending: bool,
) {
for chunk in 0..n_chunks {
for lane in 0..n_lanes {
let slot = chunk * n_lanes + lane;
if slot + 1 >= lane_offsets_slice.len() {
break;
}
let start = lane_offsets_slice[slot] as usize;
let stop = lane_offsets_slice[slot + 1] as usize;
for k in start..stop {
let chunk_local = patch_indices_slice[k] as usize;
let logical_idx = chunk * 1024 + chunk_local;
if logical_idx < patched_offset {
continue;
}
let row = logical_idx - patched_offset;
if row >= array_len {
continue;
}
let slot_start = (offsets[row] + pre_cursors[row]) as usize;
out[slot_start] = non_null;
values_slice[k].encode_to(
&mut out[slot_start + 1..slot_start + 1 + value_bytes],
descending,
);
}
}
}
}

#[cfg(test)]
mod tests {
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ListViewArray;
use vortex_array::arrays::Patched;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::listview::ListViewArrayExt;
use vortex_array::patches::Patches;
use vortex_buffer::buffer;
use vortex_error::VortexResult;

use crate::SortField;
use crate::convert_columns;

fn collect_rows(arr: &ListViewArray) -> Vec<Vec<u8>> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let n = arr.len();
(0..n)
.map(|i| {
let slice = arr.list_elements_at(i).unwrap();
let p = slice.execute::<PrimitiveArray>(&mut ctx).unwrap();
p.as_slice::<u8>().to_vec()
})
.collect()
}

#[test]
fn patched_row_encode_matches_canonical() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let inner = buffer![0u32; 32].into_array();
let patches = Patches::new(
32,
0,
buffer![1u32, 2, 3].into_array(),
buffer![100u32, 200, 300].into_array(),
None,
)?;
let patched = Patched::from_array_and_patches(inner, &patches, &mut ctx)?.into_array();

let mut canonical_vals = vec![0u32; 32];
canonical_vals[1] = 100;
canonical_vals[2] = 200;
canonical_vals[3] = 300;
let canonical = PrimitiveArray::from_iter(canonical_vals).into_array();

let by_canonical = convert_columns(&[canonical], &[SortField::default()], &mut ctx)?;
let by_patched = convert_columns(&[patched], &[SortField::default()], &mut ctx)?;
assert_eq!(collect_rows(&by_canonical), collect_rows(&by_patched));
Ok(())
}

#[test]
fn patched_row_encode_multi_chunk() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let n: usize = 4096;
let inner = PrimitiveArray::from_iter(vec![0u32; n]).into_array();
let indices: Vec<u32> = (0..n as u32).step_by(503).collect();
let values: Vec<u32> = indices.iter().map(|i| i + 1000).collect();
let patches = Patches::new(
n,
0,
PrimitiveArray::from_iter(indices.clone()).into_array(),
PrimitiveArray::from_iter(values.clone()).into_array(),
None,
)?;
let patched = Patched::from_array_and_patches(inner, &patches, &mut ctx)?.into_array();

let mut canonical_vals = vec![0u32; n];
for (idx, &i) in indices.iter().enumerate() {
canonical_vals[i as usize] = values[idx];
}
let canonical = PrimitiveArray::from_iter(canonical_vals).into_array();

let by_canonical = convert_columns(&[canonical], &[SortField::default()], &mut ctx)?;
let by_patched = convert_columns(&[patched], &[SortField::default()], &mut ctx)?;
assert_eq!(collect_rows(&by_canonical), collect_rows(&by_patched));
Ok(())
}
}
Loading