diff --git a/vortex-row/public-api.lock b/vortex-row/public-api.lock index 1afc1f05442..85985bf7521 100644 --- a/vortex-row/public-api.lock +++ b/vortex-row/public-api.lock @@ -182,6 +182,46 @@ impl core::marker::StructuralPartialEq for vortex_row::options::SortField pub const vortex_row::options::FIELDS_INLINE: usize +pub mod vortex_row::size + +pub struct vortex_row::size::RowSize + +impl core::clone::Clone for vortex_row::size::RowSize + +pub fn vortex_row::size::RowSize::clone(&self) -> vortex_row::size::RowSize + +impl core::fmt::Debug for vortex_row::size::RowSize + +pub fn vortex_row::size::RowSize::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::scalar_fn::vtable::ScalarFnVTable for vortex_row::size::RowSize + +pub type vortex_row::size::RowSize::Options = vortex_row::options::RowEncodeOptions + +pub fn vortex_row::size::RowSize::arity(&self, &Self::Options) -> vortex_array::scalar_fn::vtable::Arity + +pub fn vortex_row::size::RowSize::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::vtable::ChildName + +pub fn vortex_row::size::RowSize::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_row::size::RowSize::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::vtable::ExecutionArgs, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_row::size::RowSize::id(&self) -> vortex_array::scalar_fn::ScalarFnId + +pub fn vortex_row::size::RowSize::is_fallible(&self, &Self::Options) -> bool + +pub fn vortex_row::size::RowSize::is_null_sensitive(&self, &Self::Options) -> bool + +pub fn vortex_row::size::RowSize::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult + +pub fn vortex_row::size::RowSize::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + +pub trait vortex_row::size::RowSizeKernel: vortex_array::array::vtable::VTable + +pub fn 
vortex_row::size::RowSizeKernel::row_size_contribution(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + +pub fn vortex_row::size::dispatch_size(&vortex_array::array::erased::ArrayRef, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()> + pub struct vortex_row::RowEncodeOptions pub vortex_row::RowEncodeOptions::fields: smallvec::SmallVec<[vortex_row::options::SortField; 4]> @@ -214,6 +254,38 @@ pub fn vortex_row::options::RowEncodeOptions::hash<__H: core::hash::Hasher>(&sel impl core::marker::StructuralPartialEq for vortex_row::options::RowEncodeOptions +pub struct vortex_row::RowSize + +impl core::clone::Clone for vortex_row::size::RowSize + +pub fn vortex_row::size::RowSize::clone(&self) -> vortex_row::size::RowSize + +impl core::fmt::Debug for vortex_row::size::RowSize + +pub fn vortex_row::size::RowSize::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::scalar_fn::vtable::ScalarFnVTable for vortex_row::size::RowSize + +pub type vortex_row::size::RowSize::Options = vortex_row::options::RowEncodeOptions + +pub fn vortex_row::size::RowSize::arity(&self, &Self::Options) -> vortex_array::scalar_fn::vtable::Arity + +pub fn vortex_row::size::RowSize::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::vtable::ChildName + +pub fn vortex_row::size::RowSize::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult + +pub fn vortex_row::size::RowSize::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::vtable::ExecutionArgs, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult + +pub fn vortex_row::size::RowSize::id(&self) -> vortex_array::scalar_fn::ScalarFnId + +pub fn vortex_row::size::RowSize::is_fallible(&self, &Self::Options) -> bool + +pub fn 
vortex_row::size::RowSize::is_null_sensitive(&self, &Self::Options) -> bool + +pub fn vortex_row::size::RowSize::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult + +pub fn vortex_row::size::RowSize::serialize(&self, &Self::Options) -> vortex_error::VortexResult>> + pub struct vortex_row::SortField pub vortex_row::SortField::descending: bool @@ -258,4 +330,8 @@ impl core::marker::Copy for vortex_row::options::SortField impl core::marker::StructuralPartialEq for vortex_row::options::SortField +pub trait vortex_row::RowSizeKernel: vortex_array::array::vtable::VTable + +pub fn vortex_row::RowSizeKernel::row_size_contribution(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult> + pub fn vortex_row::initialize(&vortex_session::VortexSession) diff --git a/vortex-row/src/lib.rs b/vortex-row/src/lib.rs index bdac4c8f48e..6f1d8fbeab3 100644 --- a/vortex-row/src/lib.rs +++ b/vortex-row/src/lib.rs @@ -8,9 +8,12 @@ pub mod codec; pub mod options; +pub mod size; pub use options::RowEncodeOptions; pub use options::SortField; +pub use size::RowSize; +pub use size::RowSizeKernel; use vortex_session::VortexSession; /// Register the row-encoding scalar functions on the given session. diff --git a/vortex-row/src/size.rs b/vortex-row/src/size.rs new file mode 100644 index 00000000000..fbde52e1863 --- /dev/null +++ b/vortex-row/src/size.rs @@ -0,0 +1,288 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! `RowSize` variadic scalar function: aggregate per-row byte sizes for N input columns. 
+
+use std::sync::Arc;
+
+use vortex_array::ArrayRef;
+use vortex_array::ArrayView;
+use vortex_array::Canonical;
+use vortex_array::ExecutionCtx;
+use vortex_array::IntoArray;
+use vortex_array::VTable;
+use vortex_array::arrays::ConstantArray;
+use vortex_array::arrays::PrimitiveArray;
+use vortex_array::arrays::StructArray;
+use vortex_array::dtype::DType;
+use vortex_array::dtype::FieldName;
+use vortex_array::dtype::FieldNames;
+use vortex_array::dtype::Nullability;
+use vortex_array::dtype::PType;
+use vortex_array::dtype::StructFields;
+use vortex_array::scalar::Scalar;
+use vortex_array::scalar_fn::Arity;
+use vortex_array::scalar_fn::ChildName;
+use vortex_array::scalar_fn::ExecutionArgs;
+use vortex_array::scalar_fn::ScalarFnId;
+use vortex_array::scalar_fn::ScalarFnVTable;
+use vortex_array::validity::Validity;
+use vortex_buffer::Buffer;
+use vortex_error::VortexExpect;
+use vortex_error::VortexResult;
+use vortex_error::vortex_bail;
+use vortex_session::VortexSession;
+
+use crate::codec;
+use crate::codec::RowWidth;
+use crate::options::RowEncodeOptions;
+use crate::options::SortField;
+use crate::options::deserialize_row_encode_options;
+use crate::options::serialize_row_encode_options;
+
+/// Classification of a single input column for the size pass.
+///
+/// Tracks each column's within-row byte offset (the constant prefix from all preceding
+/// fixed-width columns) and, for fixed columns, whether any variable-length column has
+/// appeared yet — the encode pass uses this to choose between the arithmetic-write fast
+/// path (no varlen before this column, so the within-row position is constant) and the
+/// cursor-write path.
+#[derive(Clone, Copy, Debug)]
+#[allow(
+    dead_code,
+    reason = "fields read by the RowEncode pipeline in a later commit"
+)]
+pub(crate) enum ColKind {
+    /// Column has fixed width `width`. `prefix` is the within-row byte offset of this
+    /// column's first byte. If `before_varlen` is true, no variable-length column precedes
+    /// this one, so the within-row offset is constant for every row.
+    Fixed {
+        width: u32,
+        prefix: u32,
+        before_varlen: bool,
+    },
+    /// Column has variable per-row width. `fixed_prefix` is the sum of widths of all
+    /// preceding fixed columns; the varlen contribution from earlier varlen columns is
+    /// added per row.
+    Variable { fixed_prefix: u32 },
+}
+
+/// Result of the size pass: enough information for both [`RowSize::execute`] and the
+/// downstream [`RowEncode`](super::encode::RowEncode) pipeline.
+pub(crate) struct SizePassResult {
+    pub fixed_per_row: u32,
+    pub var_lengths: Option<Vec<u32>>,
+    pub col_kinds: Vec<ColKind>,
+    pub first_varlen_idx: Option<usize>,
+    pub columns: Vec<ArrayRef>,
+}
+
+/// Walk N input columns once, classifying each as fixed-width or variable-length and
+/// accumulating per-row size contributions.
+///
+/// Fixed-width columns contribute a single scalar increment to `fixed_per_row`; they do
+/// not touch `var_lengths`. Variable-length columns add per-row contributions into the
+/// lazily-allocated `var_lengths` vec via [`dispatch_size`].
+///
+/// This is shared by [`RowSize::execute`] (which wraps the result into a
+/// `Struct { fixed, var }`) and the [`RowEncode`](super::encode::RowEncode) pipeline
+/// (which uses the full result, including `col_kinds`, to drive the encode pass).
+pub(crate) fn compute_sizes(
+    options: &RowEncodeOptions,
+    args: &dyn ExecutionArgs,
+    ctx: &mut ExecutionCtx,
+    op_name: &'static str,
+) -> VortexResult<SizePassResult> {
+    let n_inputs = args.num_inputs();
+    if n_inputs == 0 {
+        vortex_bail!("{} requires at least one input column", op_name);
+    }
+    if options.fields.len() != n_inputs {
+        vortex_bail!(
+            "{} options.fields.len()={} does not match num_inputs={}",
+            op_name,
+            options.fields.len(),
+            n_inputs
+        );
+    }
+    let nrows = args.row_count();
+
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(n_inputs);
+    let mut col_kinds: Vec<ColKind> = Vec::with_capacity(n_inputs);
+    let mut fixed_per_row: u32 = 0;
+    let mut var_lengths: Option<Vec<u32>> = None;
+    let mut first_varlen_idx: Option<usize> = None;
+    let mut running_fixed_prefix: u32 = 0;
+
+    for i in 0..n_inputs {
+        let col = args.get(i)?;
+        if col.len() != nrows {
+            vortex_bail!(
+                "{}: column {} has length {} but expected {}",
+                op_name,
+                i,
+                col.len(),
+                nrows
+            );
+        }
+        match codec::row_width_for_dtype(col.dtype())? {
+            RowWidth::Fixed(w) => {
+                col_kinds.push(ColKind::Fixed {
+                    width: w,
+                    prefix: running_fixed_prefix,
+                    before_varlen: first_varlen_idx.is_none(),
+                });
+                fixed_per_row = fixed_per_row
+                    .checked_add(w)
+                    .vortex_expect("row width overflow");
+                running_fixed_prefix = running_fixed_prefix
+                    .checked_add(w)
+                    .vortex_expect("row width overflow");
+            }
+            RowWidth::Variable => {
+                if first_varlen_idx.is_none() {
+                    first_varlen_idx = Some(i);
+                }
+                let v = var_lengths.get_or_insert_with(|| vec![0u32; nrows]);
+                dispatch_size(&col, options.fields[i], v, ctx)?;
+                col_kinds.push(ColKind::Variable {
+                    fixed_prefix: running_fixed_prefix,
+                });
+            }
+        }
+        columns.push(col);
+    }
+
+    Ok(SizePassResult {
+        fixed_per_row,
+        var_lengths,
+        col_kinds,
+        first_varlen_idx,
+        columns,
+    })
+}
+
+/// Variadic scalar function that, given N input columns and per-column [`SortField`]s,
+/// returns a `Struct { fixed: U32, var: U32 }` array of per-row byte sizes for the
+/// row-oriented encoding produced by [`RowEncode`](super::encode::RowEncode).
+///
+/// The `fixed` field is always a [`ConstantArray`] holding the sum of the per-column
+/// constant widths of fixed-width inputs (sentinel + value bytes). The `var` field is a
+/// `ConstantArray(0)` when there are no variable-length input columns, and a
+/// [`PrimitiveArray`] of per-row varlen-byte sums otherwise.
+///
+/// The total per-row byte size is `fixed + var`.
+#[derive(Clone, Debug)]
+pub struct RowSize;
+
+/// Returns the [`FieldNames`] used by the [`RowSize`] output struct.
+pub(crate) fn row_size_field_names() -> FieldNames {
+    FieldNames::from([FieldName::from("fixed"), FieldName::from("var")])
+}
+
+/// Returns the output [`DType`] of [`RowSize`].
+pub(crate) fn row_size_struct_dtype() -> DType {
+    DType::Struct(
+        StructFields::new(
+            row_size_field_names(),
+            vec![
+                DType::Primitive(PType::U32, Nullability::NonNullable),
+                DType::Primitive(PType::U32, Nullability::NonNullable),
+            ],
+        ),
+        Nullability::NonNullable,
+    )
+}
+
+impl ScalarFnVTable for RowSize {
+    type Options = RowEncodeOptions;
+
+    fn id(&self) -> ScalarFnId {
+        ScalarFnId::from("vortex.row_size")
+    }
+
+    fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
+        Ok(Some(serialize_row_encode_options(options)))
+    }
+
+    fn deserialize(
+        &self,
+        metadata: &[u8],
+        _session: &VortexSession,
+    ) -> VortexResult<Self::Options> {
+        deserialize_row_encode_options(metadata)
+    }
+
+    fn arity(&self, _options: &Self::Options) -> Arity {
+        Arity::Variadic { min: 1, max: None }
+    }
+
+    fn child_name(&self, _options: &Self::Options, child_idx: usize) -> ChildName {
+        ChildName::from(Arc::from(format!("col_{}", child_idx)))
+    }
+
+    fn return_dtype(&self, _options: &Self::Options, _args: &[DType]) -> VortexResult<DType> {
+        Ok(row_size_struct_dtype())
+    }
+
+    fn execute(
+        &self,
+        options: &Self::Options,
+        args: &dyn ExecutionArgs,
+        ctx: &mut ExecutionCtx,
+    ) -> VortexResult<ArrayRef> {
+        let nrows = args.row_count();
+        let result = compute_sizes(options, args, ctx, "RowSize")?;
+        let fixed_array =
+            ConstantArray::new(Scalar::from(result.fixed_per_row), nrows).into_array();
+        let var_array = match result.var_lengths {
+            Some(v) => PrimitiveArray::new(Buffer::<u32>::copy_from(&v), Validity::NonNullable)
+                .into_array(),
+            None => ConstantArray::new(Scalar::from(0u32), nrows).into_array(),
+        };
+        Ok(StructArray::try_new(
+            row_size_field_names(),
+            vec![fixed_array, var_array],
+            nrows,
+            Validity::NonNullable,
+        )?
+        .into_array())
+    }
+
+    fn is_null_sensitive(&self, _options: &Self::Options) -> bool {
+        true
+    }
+
+    fn is_fallible(&self, _options: &Self::Options) -> bool {
+        false
+    }
+}
+
+/// Dispatch a single column's per-row size contribution.
+///
+/// For PR 1 this is just the canonicalize-then-`codec::field_size` fallback path. In-crate
+/// fast paths for `Constant`/`Dict`/`Patched` and the inventory-based registry for
+/// downstream encodings are added in PR 3.
+pub fn dispatch_size(
+    col: &ArrayRef,
+    field: SortField,
+    sizes: &mut [u32],
+    ctx: &mut ExecutionCtx,
+) -> VortexResult<()> {
+    let canonical = col.clone().execute::<Canonical>(ctx)?;
+    codec::field_size(&canonical, field, sizes, ctx)
+}
+
+/// Mutate-buffer kernel: add this column's per-row byte contribution into the shared
+/// `sizes` slice. Return `Ok(None)` to decline and fall back to the canonical path.
+///
+/// Trait is defined now; per-encoding impls and dispatch wiring land in PR 3.
+pub trait RowSizeKernel: VTable {
+    /// Add this column's per-row byte contribution into `sizes`.
+    fn row_size_contribution(
+        column: ArrayView<'_, Self>,
+        field: SortField,
+        sizes: &mut [u32],
+        ctx: &mut ExecutionCtx,
+    ) -> VortexResult<Option<()>>;
+}