diff --git a/vortex-row/src/codec.rs b/vortex-row/src/codec.rs index 7c89c81e2e4..4138a0d55b8 100644 --- a/vortex-row/src/codec.rs +++ b/vortex-row/src/codec.rs @@ -210,6 +210,66 @@ pub fn field_size( Ok(()) } +/// Encode each row's bytes for the given canonical view at arithmetic offsets, without +/// reading or writing any per-row cursor. +/// +/// For row `i`, the column's bytes are written at `out[i * row_stride + var_prefix[i] + +/// col_prefix ..]` where `var_prefix` is the exclusive prefix sum of varlen contributions +/// (`None` when there are no varlen columns in the row layout). This is the fast path used +/// for fixed-width columns that appear before any varlen column in the row. +/// +/// This path requires that `canonical` is a fixed-width type; the per-column slot has +/// exactly `width` bytes per row (sentinel + value). +/// +/// # Errors +/// +/// Returns an error if `canonical` is not a supported fixed-width canonical variant. +#[allow(clippy::too_many_arguments)] +pub fn field_encode_fixed_arithmetic( + canonical: &Canonical, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + width: u32, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + match canonical { + Canonical::Null(arr) => { + encode_null_arith(arr, field, col_prefix, row_stride, var_prefix, out) + } + Canonical::Bool(arr) => { + encode_bool_arith(arr, field, col_prefix, row_stride, var_prefix, out, ctx)? + } + Canonical::Primitive(arr) => encode_primitive_arith( + arr, field, col_prefix, row_stride, var_prefix, width, out, ctx, + )?, + Canonical::Decimal(arr) => encode_decimal_arith( + arr, field, col_prefix, row_stride, var_prefix, width, out, ctx, + )?, + Canonical::Struct(arr) => encode_struct_arith( + arr, field, col_prefix, row_stride, var_prefix, width, out, ctx, + )?, + Canonical::FixedSizeList(arr) => encode_fsl_arith( + arr, field, col_prefix, row_stride, var_prefix, width, out, ctx, + )?, + Canonical::Extension(arr) => { + let storage = arr.storage_array().clone().execute::(ctx)?; + field_encode_fixed_arithmetic( + &storage, field, col_prefix, row_stride, var_prefix, width, out, ctx, + )?; + } + Canonical::VarBinView(_) | Canonical::List(_) | Canonical::Variant(_) => { + vortex_bail!( + "field_encode_fixed_arithmetic called on non-fixed canonical type {:?}", + canonical.dtype() + ) + } + } + Ok(()) +} + /// Encode each row's bytes for the given canonical view into `out`, writing starting at /// `offsets[i] + cursors[i]` for row `i` and advancing `cursors[i]` by the number of /// bytes written. @@ -1138,3 +1198,338 @@ pub fn encode_scalar( } Ok(out.freeze().into_inner()) } + +/// Per-row write start offset for the fixed-arithmetic encode path. +#[inline] +fn arith_pos(i: usize, col_prefix: u32, row_stride: u32, var_prefix: Option<&[u32]>) -> usize { + let base = (i as u32) * row_stride + col_prefix; + let pos = match var_prefix { + Some(vp) => base + vp[i], + None => base, + }; + pos as usize +} + +fn encode_null_arith( + arr: &NullArray, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + out: &mut [u8], +) { + let sentinel = field.null_sentinel(); + let n = arr.len(); + for i in 0..n { + let pos = arith_pos(i, col_prefix, row_stride, var_prefix); + out[pos] = sentinel; + } +} + +fn encode_bool_arith( + arr: &BoolArray, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let mask = arr.as_ref().validity()?.execute_mask(arr.len(), ctx)?; + let bits = arr.clone().into_bit_buffer(); + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + let xor = if field.descending { 0xFF } else { 0x00 }; + for i in 0..bits.len() { + let pos = arith_pos(i, col_prefix, row_stride, var_prefix); + if mask.value(i) { + out[pos] = non_null; + let raw = if bits.value(i) { 0x02u8 } else { 0x01u8 }; + out[pos + 1] = raw ^ xor; + } else { + out[pos] = null; + out[pos + 1] = 0; + } + } + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn encode_primitive_arith( + arr: &PrimitiveArray, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + _width: u32, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + match_each_native_ptype!(arr.ptype(), |T| { + encode_primitive_arith_typed::( + arr, field, col_prefix, row_stride, var_prefix, out, ctx, + )?; + }); + Ok(()) +} + +#[inline] +fn encode_primitive_arith_typed( + arr: &PrimitiveArray, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let mask = arr.as_ref().validity()?.execute_mask(arr.len(), ctx)?; + let slice: &[T] = arr.as_slice(); + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + let value_bytes = size_of::(); + let stride = row_stride as usize; + let prefix = col_prefix as usize; + let descending = field.descending; + // `slot_size` = sentinel + value bytes. This is a compile-time constant per T. + let slot_size = 1 + value_bytes; + + // Hot path: contiguous all-valid, no varlen prefix. The chunked write isolates each + // row's slot into a fixed-length `&mut [u8; slot_size]` view, which lets the compiler + // fold the bounds check on the inner write — matching `arrow-row::encode_not_null`. + if var_prefix.is_none() && mask.all_true() { + let chunks = out.chunks_exact_mut(stride); + for (chunk, &v) in chunks.zip(slice.iter()) { + // SAFETY: `prefix + slot_size <= stride` always holds (the row width sums to + // the stride; classifier and Phase 1 maintain that invariant). The first byte + // is the sentinel; the next `value_bytes` are the encoded value. + let slot_ptr = unsafe { chunk.as_mut_ptr().add(prefix) }; + // SAFETY: `slot_ptr..slot_ptr+slot_size` lies within `chunk`, which is of + // length `stride`. Writing one byte for the sentinel is safe. + unsafe { slot_ptr.write(non_null) }; + // SAFETY: the encoded value's `value_bytes` lie in `slot_ptr+1..slot_ptr+1+ + // value_bytes`, all within the chunk. + let val_slice = + unsafe { std::slice::from_raw_parts_mut(slot_ptr.add(1), value_bytes) }; + v.encode_to(val_slice, descending); + } + return Ok(()); + } + if var_prefix.is_none() { + let chunks = out.chunks_exact_mut(stride); + for (i, (chunk, &v)) in chunks.zip(slice.iter()).enumerate() { + // SAFETY: classifier guarantees `prefix + slot_size <= stride`. + let slot: &mut [u8] = + unsafe { chunk.get_unchecked_mut(prefix..prefix + slot_size) }; + if mask.value(i) { + slot[0] = non_null; + v.encode_to(&mut slot[1..], descending); + } else { + slot[0] = null; + for b in &mut slot[1..] { + *b = 0; + } + } + } + return Ok(()); + } + // Mixed (fixed-before-varlen with varlen prefix array). + let vp = var_prefix.unwrap_or_else(|| unreachable!()); + for (i, &v) in slice.iter().enumerate() { + let pos = ((i as u32) * row_stride + col_prefix + vp[i]) as usize; + if mask.value(i) { + out[pos] = non_null; + v.encode_to(&mut out[pos + 1..pos + 1 + value_bytes], descending); + } else { + out[pos] = null; + for b in &mut out[pos + 1..pos + 1 + value_bytes] { + *b = 0; + } + } + } + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn encode_decimal_arith( + arr: &DecimalArray, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + _width: u32, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let mask = arr.as_ref().validity()?.execute_mask(arr.len(), ctx)?; + match arr.values_type() { + DecimalType::I8 => encode_decimal_arith_typed::( + arr, &mask, field, col_prefix, row_stride, var_prefix, out, + ), + DecimalType::I16 => encode_decimal_arith_typed::( + arr, &mask, field, col_prefix, row_stride, var_prefix, out, + ), + DecimalType::I32 => encode_decimal_arith_typed::( + arr, &mask, field, col_prefix, row_stride, var_prefix, out, + ), + DecimalType::I64 => encode_decimal_arith_typed::( + arr, &mask, field, col_prefix, row_stride, var_prefix, out, + ), + DecimalType::I128 => encode_decimal_arith_typed::( + arr, &mask, field, col_prefix, row_stride, var_prefix, out, + ), + DecimalType::I256 => { + vortex_bail!("row encoding for Decimal256 is not yet implemented") + } + } + Ok(()) +} + +fn encode_decimal_arith_typed( + arr: &DecimalArray, + mask: &vortex_mask::Mask, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + out: &mut [u8], +) where + T: vortex_array::dtype::NativeDecimalType + RowEncode, +{ + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + let value_bytes = size_of::(); + let slice = arr.buffer::(); + for i in 0..slice.len() { + let pos = arith_pos(i, col_prefix, row_stride, var_prefix); + if mask.value(i) { + out[pos] = non_null; + slice[i].encode_to(&mut out[pos + 1..pos + 1 + value_bytes], field.descending); + } else { + out[pos] = null; + for b in &mut out[pos + 1..pos + 1 + value_bytes] { + *b = 0; + } + } + } +} + +#[allow(clippy::too_many_arguments)] +fn encode_struct_arith( + arr: &StructArray, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + width: u32, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + // Struct is only "fixed" when all fields are fixed; classifier guarantees that here. + let n = arr.len(); + let mask = arr.as_ref().validity()?.execute_mask(n, ctx)?; + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + + // Write outer sentinel. + for i in 0..n { + let pos = arith_pos(i, col_prefix, row_stride, var_prefix); + out[pos] = if mask.value(i) { non_null } else { null }; + } + + // Recursively encode each child at child_prefix = col_prefix + 1 + sum-of-prior-child-widths. + let mut child_prefix = col_prefix.saturating_add(1); + for child in arr.iter_unmasked_fields() { + let cw = match row_width_for_dtype(child.dtype())? { + RowWidth::Fixed(w) => w, + RowWidth::Variable => unreachable!("encode_struct_arith called on non-fixed struct"), + }; + let canonical = child.clone().execute::(ctx)?; + field_encode_fixed_arithmetic( + &canonical, + field, + child_prefix, + row_stride, + var_prefix, + cw, + out, + ctx, + )?; + child_prefix = child_prefix.saturating_add(cw); + } + + // Zero-fill body bytes of null rows. + let body_len = width.saturating_sub(1) as usize; + if body_len > 0 { + for i in 0..n { + if !mask.value(i) { + let pos = arith_pos(i, col_prefix, row_stride, var_prefix); + for b in &mut out[pos + 1..pos + 1 + body_len] { + *b = 0; + } + } + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +fn encode_fsl_arith( + arr: &FixedSizeListArray, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + width: u32, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + // FSL is only "fixed" when its element type is fixed; classifier guarantees that here. + let n = arr.len(); + let list_size = arr.list_size() as usize; + let mask = arr.as_ref().validity()?.execute_mask(n, ctx)?; + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + let elements = arr.elements().clone().execute::(ctx)?; + debug_assert_eq!(elements.len(), n * list_size); + + let elem_width = match row_width_for_dtype(elements.dtype())? { + RowWidth::Fixed(w) => w, + RowWidth::Variable => unreachable!("encode_fsl_arith called on non-fixed FSL"), + }; + + // Write outer sentinel for each row. + for i in 0..n { + let pos = arith_pos(i, col_prefix, row_stride, var_prefix); + out[pos] = if mask.value(i) { non_null } else { null }; + } + + // Build temporary row_offsets/cursors arrays where each *element* slot has its own + // offset. Since FSL is fixed-arith, the per-element start position is deterministic. + // Fall back to the cursor-based path for elements: it's a constant computation but + // simpler than threading arithmetic offsets through recursion. + let mut elem_offsets = vec![0u32; n * list_size]; + for i in 0..n { + let row_start = arith_pos(i, col_prefix, row_stride, var_prefix) as u32 + 1; + for j in 0..list_size { + elem_offsets[i * list_size + j] = row_start + (j as u32) * elem_width; + } + } + let mut elem_cursors = vec![0u32; n * list_size]; + field_encode(&elements, field, &elem_offsets, &mut elem_cursors, out, ctx)?; + + // Zero-fill null bodies. + let body_len = width.saturating_sub(1) as usize; + if body_len > 0 { + for i in 0..n { + if !mask.value(i) { + let pos = arith_pos(i, col_prefix, row_stride, var_prefix); + for b in &mut out[pos + 1..pos + 1 + body_len] { + *b = 0; + } + } + } + } + Ok(()) +} diff --git a/vortex-row/src/encode.rs b/vortex-row/src/encode.rs index 330d6437d04..8c930406643 100644 --- a/vortex-row/src/encode.rs +++ b/vortex-row/src/encode.rs @@ -22,6 +22,7 @@ use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::VTable; use vortex_array::arrays::ListViewArray; +use vortex_array::arrays::Primitive; use vortex_array::arrays::PrimitiveArray; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; @@ -44,6 +45,7 @@ use crate::options::RowEncodeOptions; use crate::options::SortField; use crate::options::deserialize_row_encode_options; use crate::options::serialize_row_encode_options; +use crate::size::ColKind; use crate::size::compute_sizes; /// Variadic scalar function that encodes N input columns into a single `List` @@ -115,8 +117,8 @@ fn execute_row_encode( let crate::size::SizePassResult { fixed_per_row, var_lengths, - col_kinds: _, - first_varlen_idx: _, + col_kinds, + first_varlen_idx, columns, } = compute_sizes(options, args, ctx, "RowEncode")?; @@ -149,7 +151,23 @@ fn execute_row_encode( // listview_offsets[i] is the absolute byte offset where row `i` begins. // For pure-fixed: i * fixed_per_row. // For mixed: i * fixed_per_row + exclusive prefix sum of var_lengths. + // + // When fixed-before-varlen columns exist alongside a varlen column, we also build + // `var_prefix_for_arith[i] = exclusive cumsum of var_lengths[..i]` and pass it to + // the arithmetic encoders so they can compute per-row positions without a cursor. + let need_arith_prefix = first_varlen_idx.is_some() + && col_kinds.iter().any(|k| { + matches!( + k, + ColKind::Fixed { + before_varlen: true, + .. + } + ) + }); + let mut listview_offsets: Vec = Vec::with_capacity(nrows); + let mut var_prefix_for_arith: Option> = None; match var_lengths.as_ref() { None => { // Pure-fixed: offsets[i] = i * fixed_per_row. Materialize via a tight @@ -169,34 +187,82 @@ fn execute_row_encode( // var_prefix is the exclusive cumsum of varlen lengths. Same raw-pointer // write loop as the pure-fixed branch (auto-vectorized); the total was // validated to fit in u32 upstream so `wrapping_add` is sound here. + let mut vp: Option> = need_arith_prefix.then(|| Vec::with_capacity(nrows)); // SAFETY: we just reserved nrows; writes at indices [0, nrows) are valid. + // Likewise `vp` (if Some) has reserved nrows. unsafe { let off_ptr = listview_offsets.as_mut_ptr(); + let vp_ptr = vp.as_mut().map(|p| p.as_mut_ptr()); let mut acc: u32 = 0; for (i, &l) in v.iter().enumerate() { + if let Some(p) = vp_ptr { + p.add(i).write(acc); + } off_ptr .add(i) .write((i as u32).wrapping_mul(fixed_per_row).wrapping_add(acc)); acc = acc.wrapping_add(l); } listview_offsets.set_len(nrows); + if let Some(p) = vp.as_mut() { + p.set_len(nrows); + } } + var_prefix_for_arith = vp; } } // Per-row write cursor (also doubles as the ListView `sizes` slot when done). - let mut row_cursors = vec![0u32; nrows]; + // + // The cursor path starts at `prefix_at_first_varlen` so that `listview_offsets[i] + + // cursors[i]` lands at the position of the first cursor-path column (i.e. after the + // bytes already written by the arithmetic path for fixed-before-varlen columns). + // + // When there are no varlen columns at all, every column went through the arith path, + // so the cursor path runs zero iterations. Pre-seeding the cursors with + // `fixed_per_row` makes them already correct as per-row sizes in that case. + let initial_cursor: u32 = match first_varlen_idx { + Some(idx) => match col_kinds[idx] { + ColKind::Variable { fixed_prefix } => fixed_prefix, + ColKind::Fixed { .. } => unreachable!("first_varlen_idx points to a varlen column"), + }, + None => fixed_per_row, + }; + let mut row_cursors = vec![initial_cursor; nrows]; - // ===== Phase 4: encode columns via the cursor path ===== + // ===== Phase 4: encode columns ===== + // Fixed-before-varlen columns take the arithmetic write path (no cursor mutation). + // Fixed-after-varlen and varlen columns take the cursor path, which already runs + // through `dispatch_encode`. for (i, col) in columns.iter().enumerate() { - dispatch_encode( - col, - options.fields[i], - &listview_offsets, - &mut row_cursors, - &mut out_buf, - ctx, - )?; + match col_kinds[i] { + ColKind::Fixed { + width, + prefix, + before_varlen: true, + } => { + dispatch_encode_fixed_arith( + col, + options.fields[i], + prefix, + fixed_per_row, + var_prefix_for_arith.as_deref(), + width, + &mut out_buf, + ctx, + )?; + } + ColKind::Fixed { .. } | ColKind::Variable { .. } => { + dispatch_encode( + col, + options.fields[i], + &listview_offsets, + &mut row_cursors, + &mut out_buf, + ctx, + )?; + } + } } // ===== Phase 5: build ListView output ===== @@ -226,6 +292,39 @@ fn execute_row_encode( .into_array()) } +/// Dispatch a single column's encoding through the arithmetic fast path. This is used for +/// fixed-width columns that appear before any variable-length column in the row layout: the +/// within-row write offset is a constant `col_prefix + var_prefix[i]` (or just `col_prefix` +/// for the pure-fixed case), so we can skip the per-row cursor read/write entirely. +#[allow(clippy::too_many_arguments)] +fn dispatch_encode_fixed_arith( + col: &ArrayRef, + field: SortField, + col_prefix: u32, + row_stride: u32, + var_prefix: Option<&[u32]>, + width: u32, + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + // Already-canonical PrimitiveArray: bypass the canonicalization machinery entirely so + // the hot loop is reached without going through `execute_until::`. + if col.as_opt::().is_some() + && let Ok(parr) = col.clone().try_downcast::() + { + let canonical = Canonical::Primitive(parr); + return codec::field_encode_fixed_arithmetic( + &canonical, field, col_prefix, row_stride, var_prefix, width, out, ctx, + ); + } + // For other fixed columns route through canonicalization and the codec helpers. The + // Constant fast path is layered on in a follow-up commit. + let canonical = col.clone().execute::(ctx)?; + codec::field_encode_fixed_arithmetic( + &canonical, field, col_prefix, row_stride, var_prefix, width, out, ctx, + ) +} + /// Dispatch a single column's encoding into the shared `out` buffer. /// /// For PR 1 this is just the canonicalize-then-`codec::field_encode` fallback path. diff --git a/vortex-row/src/size.rs b/vortex-row/src/size.rs index 7148a2a21d8..bfe5f647dc5 100644 --- a/vortex-row/src/size.rs +++ b/vortex-row/src/size.rs @@ -48,10 +48,6 @@ use crate::options::serialize_row_encode_options; /// path (no varlen before this column, so the within-row position is constant) and the /// cursor-write path. #[derive(Clone, Copy, Debug)] -#[allow( - dead_code, - reason = "fields read by the RowEncode pipeline in a later commit" -)] pub(crate) enum ColKind { /// Column has fixed width `width`. `prefix` is the within-row byte offset of this /// column's first byte. If `before_varlen` is true, no variable-length column precedes @@ -72,15 +68,7 @@ pub(crate) enum ColKind { pub(crate) struct SizePassResult { pub fixed_per_row: u32, pub var_lengths: Option>, - #[allow( - dead_code, - reason = "consumed by the arithmetic-write fast path added in PR 2" - )] pub col_kinds: Vec, - #[allow( - dead_code, - reason = "consumed by the arithmetic-write fast path added in PR 2" - )] pub first_varlen_idx: Option, pub columns: Vec, }