Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions vortex-row/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,46 @@ pub fn vortex_row::codec::field_size(&vortex_array::canonical::Canonical, vortex

pub fn vortex_row::codec::row_width_for_dtype(&vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_row::codec::RowWidth>

pub mod vortex_row::encode

pub struct vortex_row::encode::RowEncode

impl core::clone::Clone for vortex_row::encode::RowEncode

pub fn vortex_row::encode::RowEncode::clone(&self) -> vortex_row::encode::RowEncode

impl core::fmt::Debug for vortex_row::encode::RowEncode

pub fn vortex_row::encode::RowEncode::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl vortex_array::scalar_fn::vtable::ScalarFnVTable for vortex_row::encode::RowEncode

pub type vortex_row::encode::RowEncode::Options = vortex_row::options::RowEncodeOptions

pub fn vortex_row::encode::RowEncode::arity(&self, &Self::Options) -> vortex_array::scalar_fn::vtable::Arity

pub fn vortex_row::encode::RowEncode::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::vtable::ChildName

pub fn vortex_row::encode::RowEncode::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_row::encode::RowEncode::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::vtable::ExecutionArgs, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_row::encode::RowEncode::id(&self) -> vortex_array::scalar_fn::ScalarFnId

pub fn vortex_row::encode::RowEncode::is_fallible(&self, &Self::Options) -> bool

pub fn vortex_row::encode::RowEncode::is_null_sensitive(&self, &Self::Options) -> bool

pub fn vortex_row::encode::RowEncode::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_row::encode::RowEncode::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub trait vortex_row::encode::RowEncodeKernel: vortex_array::array::vtable::VTable

pub fn vortex_row::encode::RowEncodeKernel::row_encode_into(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<()>>

pub fn vortex_row::encode::dispatch_encode(&vortex_array::array::erased::ArrayRef, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()>

pub mod vortex_row::options

pub struct vortex_row::options::RowEncodeOptions
Expand Down Expand Up @@ -222,6 +262,38 @@ pub fn vortex_row::size::RowSizeKernel::row_size_contribution(vortex_array::arra

pub fn vortex_row::size::dispatch_size(&vortex_array::array::erased::ArrayRef, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()>

pub struct vortex_row::RowEncode

impl core::clone::Clone for vortex_row::encode::RowEncode

pub fn vortex_row::encode::RowEncode::clone(&self) -> vortex_row::encode::RowEncode

impl core::fmt::Debug for vortex_row::encode::RowEncode

pub fn vortex_row::encode::RowEncode::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl vortex_array::scalar_fn::vtable::ScalarFnVTable for vortex_row::encode::RowEncode

pub type vortex_row::encode::RowEncode::Options = vortex_row::options::RowEncodeOptions

pub fn vortex_row::encode::RowEncode::arity(&self, &Self::Options) -> vortex_array::scalar_fn::vtable::Arity

pub fn vortex_row::encode::RowEncode::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::vtable::ChildName

pub fn vortex_row::encode::RowEncode::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_row::encode::RowEncode::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::vtable::ExecutionArgs, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_row::encode::RowEncode::id(&self) -> vortex_array::scalar_fn::ScalarFnId

pub fn vortex_row::encode::RowEncode::is_fallible(&self, &Self::Options) -> bool

pub fn vortex_row::encode::RowEncode::is_null_sensitive(&self, &Self::Options) -> bool

pub fn vortex_row::encode::RowEncode::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_row::encode::RowEncode::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub struct vortex_row::RowEncodeOptions

pub vortex_row::RowEncodeOptions::fields: smallvec::SmallVec<[vortex_row::options::SortField; 4]>
Expand Down Expand Up @@ -330,6 +402,10 @@ impl core::marker::Copy for vortex_row::options::SortField

impl core::marker::StructuralPartialEq for vortex_row::options::SortField

pub trait vortex_row::RowEncodeKernel: vortex_array::array::vtable::VTable

pub fn vortex_row::RowEncodeKernel::row_encode_into(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<()>>

pub trait vortex_row::RowSizeKernel: vortex_array::array::vtable::VTable

pub fn vortex_row::RowSizeKernel::row_size_contribution(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<()>>
Expand Down
238 changes: 238 additions & 0 deletions vortex-row/src/encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#![allow(
clippy::cast_possible_truncation,
reason = "row encoding indexes into u32-sized buffers; lengths are validated to fit in u32"
)]

//! `RowEncode` variadic scalar function: encode N input columns into a single `ListView<u8>`.
//!
//! The output's `(elements, offsets, sizes)` triple is built up in a single left-to-right
//! pass over the input columns. The `sizes` array doubles as the per-row write cursor, so
//! when the last column finishes encoding, the accumulator is the final array - no separate
//! conversion step is needed.

use std::sync::Arc;

use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VTable;
use vortex_array::arrays::ListViewArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_array::scalar_fn::Arity;
use vortex_array::scalar_fn::ChildName;
use vortex_array::scalar_fn::ExecutionArgs;
use vortex_array::scalar_fn::ScalarFnId;
use vortex_array::scalar_fn::ScalarFnVTable;
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_session::VortexSession;

use crate::codec;
use crate::options::RowEncodeOptions;
use crate::options::SortField;
use crate::options::deserialize_row_encode_options;
use crate::options::serialize_row_encode_options;
use crate::size::compute_sizes;

/// Variadic scalar function that encodes N input columns into a single `List<u8>`
/// [`ListViewArray`] where row `i` contains the row-encoded bytes for column values
/// `cols[0][i], cols[1][i], ...` concatenated left-to-right.
#[derive(Clone, Debug)]
pub struct RowEncode;

impl ScalarFnVTable for RowEncode {
type Options = RowEncodeOptions;

fn id(&self) -> ScalarFnId {
ScalarFnId::from("vortex.row_encode")
}

fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(Some(serialize_row_encode_options(options)))
}

fn deserialize(
&self,
metadata: &[u8],
_session: &VortexSession,
) -> VortexResult<Self::Options> {
deserialize_row_encode_options(metadata)
}

fn arity(&self, _options: &Self::Options) -> Arity {
Arity::Variadic { min: 1, max: None }
}

fn child_name(&self, _options: &Self::Options, child_idx: usize) -> ChildName {
ChildName::from(Arc::from(format!("col_{}", child_idx)))
}

fn return_dtype(&self, _options: &Self::Options, _args: &[DType]) -> VortexResult<DType> {
Ok(DType::List(
Arc::new(DType::Primitive(PType::U8, Nullability::NonNullable)),
Nullability::NonNullable,
))
}

fn execute(
&self,
options: &Self::Options,
args: &dyn ExecutionArgs,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
execute_row_encode(options, args, ctx)
}

fn is_null_sensitive(&self, _options: &Self::Options) -> bool {
true
}

fn is_fallible(&self, _options: &Self::Options) -> bool {
false
}
}

fn execute_row_encode(
options: &RowEncodeOptions,
args: &dyn ExecutionArgs,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let nrows = args.row_count();

// ===== Phase 1: classify + size pass =====
let crate::size::SizePassResult {
fixed_per_row,
var_lengths,
col_kinds: _,
first_varlen_idx: _,
columns,
} = compute_sizes(options, args, ctx, "RowEncode")?;

// ===== Phase 2: totals + buffer =====
let var_total: u64 = var_lengths
.as_ref()
.map_or(0, |v| v.iter().map(|&x| u64::from(x)).sum());
let total: u64 = (nrows as u64)
.checked_mul(u64::from(fixed_per_row))
.and_then(|t| t.checked_add(var_total))
.vortex_expect("row-encoded total bytes overflow");
if total > u32::MAX as u64 {
vortex_bail!("row-encoded output size {} bytes exceeds u32::MAX", total);
}
let total_len = total as usize;

// Allocate the elements buffer (zero-initialized). The zero-init lets every encoder
// assume previously-untouched bytes are zero, simplifying the null-row fill paths.
// PR 2 skips this memset because every byte in the output range is written by some
// encoder.
let mut out_buf: BufferMut<u8> = BufferMut::with_capacity(total_len);
out_buf.push_n(0u8, total_len);

// ===== Phase 3: per-row offsets =====
// listview_offsets[i] is the absolute byte offset where row `i` begins.
// For pure-fixed: i * fixed_per_row.
// For mixed: i * fixed_per_row + exclusive prefix sum of var_lengths.
let mut listview_offsets: Vec<u32> = Vec::with_capacity(nrows);
match var_lengths.as_ref() {
None => {
for i in 0..nrows {
listview_offsets.push(
(i as u32)
.checked_mul(fixed_per_row)
.vortex_expect("row offset overflow (already validated total fits in u32)"),
);
}
}
Some(v) => {
let mut acc: u32 = 0;
for (i, &l) in v.iter().enumerate() {
let off = (i as u32)
.checked_mul(fixed_per_row)
.and_then(|t| t.checked_add(acc))
.vortex_expect("row offset overflow");
listview_offsets.push(off);
acc = acc.checked_add(l).vortex_expect("varlen prefix overflow");
}
}
}

// Per-row write cursor (also doubles as the ListView `sizes` slot when done).
let mut row_cursors = vec![0u32; nrows];

// ===== Phase 4: encode columns via the cursor path =====
for (i, col) in columns.iter().enumerate() {
dispatch_encode(
col,
options.fields[i],
&listview_offsets,
&mut row_cursors,
&mut out_buf,
ctx,
)?;
}

// ===== Phase 5: build ListView output =====
let elements = PrimitiveArray::new(out_buf.freeze(), Validity::NonNullable).into_array();
let offsets_arr = PrimitiveArray::new(
Buffer::<u32>::copy_from(&listview_offsets),
Validity::NonNullable,
)
.into_array();
let sizes_arr = PrimitiveArray::new(
Buffer::<u32>::copy_from(&row_cursors),
Validity::NonNullable,
)
.into_array();
Ok(
ListViewArray::try_new(elements, offsets_arr, sizes_arr, Validity::NonNullable)?
.into_array(),
)
}

/// Dispatch a single column's encoding into the shared `out` buffer.
///
/// For PR 1 this is just the canonicalize-then-`codec::field_encode` fallback path.
/// In-crate fast paths for `Constant`/`Dict`/`Patched` and the inventory-based registry
/// for downstream encodings are added in PR 3.
pub fn dispatch_encode(
col: &ArrayRef,
field: SortField,
offsets: &[u32],
cursors: &mut [u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let canonical = col.clone().execute::<Canonical>(ctx)?;
codec::field_encode(&canonical, field, offsets, cursors, out, ctx)
}

/// Mutate-buffer kernel: write this column's per-row bytes into `out` at
/// `offsets[i] + cursors[i]`, advancing `cursors[i]` by the bytes written.
///
/// Return `Ok(None)` to decline and fall back to the canonical path.
///
/// Trait is defined now; per-encoding impls and dispatch wiring land in PR 3.
pub trait RowEncodeKernel: VTable {
/// Write this column's per-row bytes into `out` at `offsets[i] + cursors[i]`, advancing
/// `cursors[i]` by the bytes written.
fn row_encode_into(
column: ArrayView<'_, Self>,
field: SortField,
offsets: &[u32],
cursors: &mut [u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<()>>;
}
3 changes: 3 additions & 0 deletions vortex-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
//! This commit only establishes the crate skeleton and an `initialize` stub.

pub mod codec;
pub mod encode;
pub mod options;
pub mod size;

pub use encode::RowEncode;
pub use encode::RowEncodeKernel;
pub use options::RowEncodeOptions;
pub use options::SortField;
pub use size::RowSize;
Expand Down
8 changes: 8 additions & 0 deletions vortex-row/src/size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ pub(crate) enum ColKind {
pub(crate) struct SizePassResult {
pub fixed_per_row: u32,
pub var_lengths: Option<Vec<u32>>,
#[allow(
dead_code,
reason = "consumed by the arithmetic-write fast path added in PR 2"
)]
pub col_kinds: Vec<ColKind>,
#[allow(
dead_code,
reason = "consumed by the arithmetic-write fast path added in PR 2"
)]
pub first_varlen_idx: Option<usize>,
pub columns: Vec<ArrayRef>,
}
Expand Down
Loading