From dc726949cd83653207e2ff73a243e9da5269cb01 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 15 May 2026 17:34:54 +0100 Subject: [PATCH 1/5] Add UncompressedSizeInBytes pushdown for all encodings Signed-off-by: Adam Gutglick --- encodings/alp/src/lib.rs | 12 ++ encodings/bytebool/public-api.lock | 2 + encodings/bytebool/src/lib.rs | 17 +++ encodings/datetime-parts/src/lib.rs | 7 + encodings/decimal-byte-parts/src/lib.rs | 7 + encodings/fastlanes/src/lib.rs | 22 +++ encodings/fsst/public-api.lock | 2 + encodings/fsst/src/compute/mod.rs | 1 + .../fsst/src/compute/uncompressed_size.rs | 104 ++++++++++++++ encodings/fsst/src/lib.rs | 16 +++ encodings/pco/public-api.lock | 2 + encodings/pco/src/lib.rs | 17 +++ encodings/runend/src/lib.rs | 7 + encodings/sequence/src/lib.rs | 7 + encodings/sparse/public-api.lock | 2 + encodings/sparse/src/lib.rs | 15 ++ encodings/zigzag/public-api.lock | 2 + encodings/zigzag/src/lib.rs | 17 +++ encodings/zstd/public-api.lock | 2 + encodings/zstd/src/array.rs | 4 + encodings/zstd/src/compute/mod.rs | 1 + .../zstd/src/compute/uncompressed_size.rs | 129 ++++++++++++++++++ encodings/zstd/src/lib.rs | 28 ++++ encodings/zstd/src/test.rs | 37 +++++ vortex-array/public-api.lock | 14 ++ .../fns/uncompressed_size_in_bytes/kernel.rs | 103 ++++++++++++++ .../fns/uncompressed_size_in_bytes/mod.rs | 64 ++++++++- vortex-array/src/aggregate_fn/session.rs | 50 +++++++ vortex-array/src/arrays/list/compute/mod.rs | 1 + .../arrays/list/compute/uncompressed_size.rs | 69 ++++++++++ vortex-array/src/arrays/varbin/compute/mod.rs | 1 + .../varbin/compute/uncompressed_size.rs | 71 ++++++++++ vortex-file/src/lib.rs | 24 ++-- 33 files changed, 839 insertions(+), 18 deletions(-) create mode 100644 encodings/fsst/src/compute/uncompressed_size.rs create mode 100644 encodings/zstd/src/compute/uncompressed_size.rs create mode 100644 vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs create mode 100644 vortex-array/src/arrays/list/compute/uncompressed_size.rs create mode 100644 vortex-array/src/arrays/varbin/compute/uncompressed_size.rs diff --git a/encodings/alp/src/lib.rs b/encodings/alp/src/lib.rs index 7da676d6c6c..8e30b5d12c7 100644 --- a/encodings/alp/src/lib.rs +++ b/encodings/alp/src/lib.rs @@ -21,6 +21,8 @@ pub use alp_rd::*; use vortex_array::ArrayVTable; use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::nan_count::NanCount; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::session::ArraySessionExt; @@ -46,4 +48,14 @@ pub fn initialize(session: &VortexSession) { Some(NanCount.id()), &compute::nan_count::ALPNanCountKernel, ); + session.aggregate_fns().register_aggregate_kernel( + ALP.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + session.aggregate_fns().register_aggregate_kernel( + ALPRD.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); } diff --git a/encodings/bytebool/public-api.lock b/encodings/bytebool/public-api.lock index 730db87d7d5..f957fff0343 100644 --- a/encodings/bytebool/public-api.lock +++ b/encodings/bytebool/public-api.lock @@ -120,4 +120,6 @@ impl> vo pub fn T::validity(&self) -> vortex_array::validity::Validity +pub fn vortex_bytebool::initialize(&vortex_session::VortexSession) + pub type vortex_bytebool::ByteBoolArray = vortex_array::array::typed::Array diff --git a/encodings/bytebool/src/lib.rs b/encodings/bytebool/src/lib.rs index 3258341cdb9..0eb63b21e64 100644 --- a/encodings/bytebool/src/lib.rs +++ b/encodings/bytebool/src/lib.rs @@ -41,9 +41,26 @@ //! [spec]: https://arrow.apache.org/docs/format/CanonicalExtensions.html#bit-boolean pub use array::*; +use vortex_array::ArrayVTable; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::session::ArraySessionExt; +use vortex_session::VortexSession; mod array; mod compute; mod kernel; mod rules; mod slice; + +/// Initialize ByteBool encoding in the given session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(ByteBool); + session.aggregate_fns().register_aggregate_kernel( + ByteBool.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); +} diff --git a/encodings/datetime-parts/src/lib.rs b/encodings/datetime-parts/src/lib.rs index e061d69bccd..3dce95e1e1d 100644 --- a/encodings/datetime-parts/src/lib.rs +++ b/encodings/datetime-parts/src/lib.rs @@ -14,6 +14,8 @@ mod timestamp; use vortex_array::ArrayVTable; use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -27,6 +29,11 @@ pub fn initialize(session: &VortexSession) { Some(IsConstant.id()), &compute::is_constant::DateTimePartsIsConstantKernel, ); + session.aggregate_fns().register_aggregate_kernel( + DateTimeParts.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); } #[cfg(test)] diff --git a/encodings/decimal-byte-parts/src/lib.rs b/encodings/decimal-byte-parts/src/lib.rs index 86566027afa..30edc0d7e26 100644 --- a/encodings/decimal-byte-parts/src/lib.rs +++ b/encodings/decimal-byte-parts/src/lib.rs @@ -16,6 +16,8 @@ pub use decimal_byte_parts::*; use vortex_array::ArrayVTable; use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -29,4 +31,9 @@ pub fn initialize(session: &VortexSession) { Some(IsConstant.id()), &DecimalBytePartsIsConstantKernel, ); + session.aggregate_fns().register_aggregate_kernel( + DecimalByteParts.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); } diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs index 9022b7c4e2b..b0e70474e91 100644 --- a/encodings/fastlanes/src/lib.rs +++ b/encodings/fastlanes/src/lib.rs @@ -30,6 +30,8 @@ use vortex_array::ArrayVTable; use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; use vortex_array::aggregate_fn::fns::is_sorted::IsSorted; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::session::ArraySessionExt; @@ -64,6 +66,26 @@ pub fn initialize(session: &VortexSession) { Some(IsSorted.id()), &FoRIsSortedKernel, ); + session.aggregate_fns().register_aggregate_kernel( + BitPacked.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + session.aggregate_fns().register_aggregate_kernel( + Delta.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + session.aggregate_fns().register_aggregate_kernel( + FoR.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + session.aggregate_fns().register_aggregate_kernel( + RLE.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); } /// Fill-forward null values in a buffer, replacing each null with the last valid value seen. diff --git a/encodings/fsst/public-api.lock b/encodings/fsst/public-api.lock index 44cf663fa39..d041ebc3cf5 100644 --- a/encodings/fsst/public-api.lock +++ b/encodings/fsst/public-api.lock @@ -186,4 +186,6 @@ pub fn vortex_fsst::fsst_compress_iter<'a, I>(I, usize, vortex_array::dtype::DTy pub fn vortex_fsst::fsst_train_compressor>(&A) -> fsst::Compressor +pub fn vortex_fsst::initialize(&vortex_session::VortexSession) + pub type vortex_fsst::FSSTArray = vortex_array::array::typed::Array diff --git a/encodings/fsst/src/compute/mod.rs b/encodings/fsst/src/compute/mod.rs index 02efdf7febc..4205056009d 100644 --- a/encodings/fsst/src/compute/mod.rs +++ b/encodings/fsst/src/compute/mod.rs @@ -5,6 +5,7 @@ mod cast; mod compare; mod filter; mod like; +pub(crate) mod uncompressed_size; use vortex_array::ArrayRef; use vortex_array::ArrayView; diff --git a/encodings/fsst/src/compute/uncompressed_size.rs b/encodings/fsst/src/compute/uncompressed_size.rs new file mode 100644 index 00000000000..f756b51f435 --- /dev/null +++ b/encodings/fsst/src/compute/uncompressed_size.rs @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::mem::size_of; + +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::arrays::varbinview::build_views::BinaryView; +use vortex_array::dtype::IntegerPType; +use vortex_array::match_each_integer_ptype; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::FSST; +use crate::FSSTArrayExt; + +#[derive(Debug)] +pub(crate) struct FSSTUncompressedSizeInBytesKernel; + +impl DynAggregateKernel for FSSTUncompressedSizeInBytesKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(array) = batch.as_opt::() else { + return Ok(None); + }; + + let views_size = checked_len_mul(array.len(), size_of::(), "binary view")?; + let data_size = uncompressed_lengths_size( + &array + .uncompressed_lengths() + .clone() + .execute::(ctx)?, + )?; + let validity_size = validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)?, + )?; + + let size = views_size + .checked_add(data_size) + .and_then(|size| size.checked_add(validity_size)) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + Ok(Some(Scalar::from(size))) + } +} + +fn uncompressed_lengths_size(lengths: &PrimitiveArray) -> VortexResult { + match_each_integer_ptype!(lengths.ptype(), |P| { + uncompressed_lengths_size_typed(lengths.as_slice::

()) + }) +} + +fn uncompressed_lengths_size_typed(lengths: &[P]) -> VortexResult { + let mut size = 0u64; + for len in lengths { + let len = len + .to_u64() + .ok_or_else(|| vortex_err!("uncompressed length cannot be negative"))?; + size = size + .checked_add(len) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + } + Ok(size) +} + +fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { + match validity { + Mask::AllTrue(_) => Ok(0), + Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), + Mask::Values(values) => u64::try_from(values.len()) + .map(|len| len.div_ceil(8)) + .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")), + } +} + +fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { + let len = u64::try_from(len) + .map_err(|e| vortex_err!("Failed to convert {name} length to u64: {e}"))?; + let width = u64::try_from(width) + .map_err(|e| vortex_err!("Failed to convert {name} byte width to u64: {e}"))?; + + len.checked_mul(width) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} diff --git a/encodings/fsst/src/lib.rs b/encodings/fsst/src/lib.rs index 3305c0e66fc..3363a0e80a3 100644 --- a/encodings/fsst/src/lib.rs +++ b/encodings/fsst/src/lib.rs @@ -27,3 +27,19 @@ mod tests; pub use array::*; pub use compress::*; +use vortex_array::ArrayVTable; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::session::ArraySessionExt; +use vortex_session::VortexSession; + +/// Initialize FSST encoding in the given session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(FSST); + session.aggregate_fns().register_aggregate_kernel( + FSST.id(), + Some(UncompressedSizeInBytes.id()), + &compute::uncompressed_size::FSSTUncompressedSizeInBytesKernel, + ); +} diff --git a/encodings/pco/public-api.lock b/encodings/pco/public-api.lock index 655ca1d6ecc..b721989a082 100644 --- a/encodings/pco/public-api.lock +++ b/encodings/pco/public-api.lock @@ -164,4 +164,6 @@ pub fn vortex_pco::PcoPageInfo::clear(&mut self) pub fn vortex_pco::PcoPageInfo::encoded_len(&self) -> usize +pub fn vortex_pco::initialize(&vortex_session::VortexSession) + pub type vortex_pco::PcoArray = vortex_array::array::typed::Array diff --git a/encodings/pco/src/lib.rs b/encodings/pco/src/lib.rs index fcf9a9397fb..d0ede911eac 100644 --- a/encodings/pco/src/lib.rs +++ b/encodings/pco/src/lib.rs @@ -7,6 +7,23 @@ mod rules; mod slice; pub use array::*; +use vortex_array::ArrayVTable; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::session::ArraySessionExt; +use vortex_session::VortexSession; + +/// Initialize Pco encoding in the given session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(Pco); + session.aggregate_fns().register_aggregate_kernel( + Pco.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); +} #[derive(Clone, prost::Message)] pub struct PcoPageInfo { diff --git a/encodings/runend/src/lib.rs b/encodings/runend/src/lib.rs index 8770dbbf58e..97bc75af5e5 100644 --- a/encodings/runend/src/lib.rs +++ b/encodings/runend/src/lib.rs @@ -31,6 +31,8 @@ use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; use vortex_array::aggregate_fn::fns::is_sorted::IsSorted; use vortex_array::aggregate_fn::fns::min_max::MinMax; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -55,6 +57,11 @@ pub fn initialize(session: &VortexSession) { Some(IsSorted.id()), &compute::is_sorted::RunEndIsSortedKernel, ); + session.aggregate_fns().register_aggregate_kernel( + RunEnd.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); } #[cfg(test)] diff --git a/encodings/sequence/src/lib.rs b/encodings/sequence/src/lib.rs index bd6ab2f508c..5a7a14dfc54 100644 --- a/encodings/sequence/src/lib.rs +++ b/encodings/sequence/src/lib.rs @@ -20,6 +20,8 @@ use vortex_array::ArrayVTable; use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::is_sorted::IsSorted; use vortex_array::aggregate_fn::fns::min_max::MinMax; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -39,6 +41,11 @@ pub fn initialize(session: &VortexSession) { Some(IsSorted.id()), &compute::is_sorted::SequenceIsSortedKernel, ); + session.aggregate_fns().register_aggregate_kernel( + Sequence.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); } // TODO(joe): hook up to the compressor diff --git a/encodings/sparse/public-api.lock b/encodings/sparse/public-api.lock index 33ec9fc9361..15da430d199 100644 --- a/encodings/sparse/public-api.lock +++ b/encodings/sparse/public-api.lock @@ -222,4 +222,6 @@ pub fn vortex_array::array::view::ArrayView<'_, vortex_sparse::Sparse>::patches( pub fn vortex_array::array::view::ArrayView<'_, vortex_sparse::Sparse>::resolved_patches(&self) -> vortex_error::VortexResult +pub fn vortex_sparse::initialize(&vortex_session::VortexSession) + pub type vortex_sparse::SparseArray = vortex_array::array::typed::Array diff --git a/encodings/sparse/src/lib.rs b/encodings/sparse/src/lib.rs index 74b137fd1c7..75bd9896816 100644 --- a/encodings/sparse/src/lib.rs +++ b/encodings/sparse/src/lib.rs @@ -23,6 +23,10 @@ use vortex_array::ExecutionCtx; use vortex_array::ExecutionResult; use vortex_array::IntoArray; use vortex_array::Precision; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; use vortex_array::arrays::BoolArray; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::Primitive; @@ -42,6 +46,7 @@ use vortex_array::scalar::Scalar; use vortex_array::scalar::ScalarValue; use vortex_array::scalar_fn::fns::operators::Operator; use vortex_array::serde::ArrayChildren; +use vortex_array::session::ArraySessionExt; use vortex_array::validity::Validity; use vortex_array::vtable::VTable; use vortex_array::vtable::ValidityVTable; @@ -68,6 +73,16 @@ mod ops; mod rules; mod slice; +/// Initialize Sparse encoding in the given session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(Sparse); + session.aggregate_fns().register_aggregate_kernel( + Sparse.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); +} + /// A [`Sparse`]-encoded Vortex array. pub type SparseArray = Array; diff --git a/encodings/zigzag/public-api.lock b/encodings/zigzag/public-api.lock index 3bbc2428ad1..1bf7992b2c2 100644 --- a/encodings/zigzag/public-api.lock +++ b/encodings/zigzag/public-api.lock @@ -116,6 +116,8 @@ pub fn T::encoded(&self) -> &vortex_array::array::erased::ArrayRef pub fn T::ptype(&self) -> vortex_array::dtype::ptype::PType +pub fn vortex_zigzag::initialize(&vortex_session::VortexSession) + pub fn vortex_zigzag::zigzag_decode(vortex_array::arrays::primitive::vtable::PrimitiveArray) -> vortex_array::arrays::primitive::vtable::PrimitiveArray pub fn vortex_zigzag::zigzag_encode(vortex_array::array::view::ArrayView<'_, vortex_array::arrays::primitive::vtable::Primitive>) -> vortex_error::VortexResult diff --git a/encodings/zigzag/src/lib.rs b/encodings/zigzag/src/lib.rs index 89da8bd6069..1ece7a9a473 100644 --- a/encodings/zigzag/src/lib.rs +++ b/encodings/zigzag/src/lib.rs @@ -3,6 +3,13 @@ pub use array::*; pub use compress::*; +use vortex_array::ArrayVTable; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::session::ArraySessionExt; +use vortex_session::VortexSession; mod array; mod compress; @@ -10,3 +17,13 @@ mod compute; mod kernel; mod rules; mod slice; + +/// Initialize ZigZag encoding in the given session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(ZigZag); + session.aggregate_fns().register_aggregate_kernel( + ZigZag.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); +} diff --git a/encodings/zstd/public-api.lock b/encodings/zstd/public-api.lock index 43c534a4169..46cc0c1f21c 100644 --- a/encodings/zstd/public-api.lock +++ b/encodings/zstd/public-api.lock @@ -204,6 +204,8 @@ pub fn vortex_zstd::ZstdMetadata::clear(&mut self) pub fn vortex_zstd::ZstdMetadata::encoded_len(&self) -> usize +pub fn vortex_zstd::initialize(&vortex_session::VortexSession) + pub fn vortex_zstd::reconstruct_views(&vortex_buffer::ByteBuffer, usize) -> (alloc::vec::Vec, vortex_buffer::buffer::Buffer) pub type vortex_zstd::ZstdArray = vortex_array::array::typed::Array diff --git a/encodings/zstd/src/array.rs b/encodings/zstd/src/array.rs index b327a6a2a95..fe231524fe1 100644 --- a/encodings/zstd/src/array.rs +++ b/encodings/zstd/src/array.rs @@ -1043,6 +1043,10 @@ impl ZstdData { pub(crate) fn unsliced_n_rows(&self) -> usize { self.unsliced_n_rows } + + pub(crate) fn metadata(&self) -> &ZstdMetadata { + &self.metadata + } } impl ValidityVTable for Zstd { diff --git a/encodings/zstd/src/compute/mod.rs b/encodings/zstd/src/compute/mod.rs index de175eb3154..3502524da21 100644 --- a/encodings/zstd/src/compute/mod.rs +++ b/encodings/zstd/src/compute/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors mod cast; +pub(crate) mod uncompressed_size; #[cfg(test)] mod tests { diff --git a/encodings/zstd/src/compute/uncompressed_size.rs b/encodings/zstd/src/compute/uncompressed_size.rs new file mode 100644 index 00000000000..0dcd3adbe64 --- /dev/null +++ b/encodings/zstd/src/compute/uncompressed_size.rs @@ -0,0 +1,129 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::mem::size_of; + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::aggregate_fn::AggregateFnRef; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::kernels::DynAggregateKernel; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::varbinview::build_views::BinaryView; +use vortex_array::dtype::DType; +use vortex_array::scalar::Scalar; +use vortex_array::vtable::child_to_validity; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::Zstd; + +#[derive(Debug)] +pub(crate) struct ZstdUncompressedSizeInBytesKernel; + +impl DynAggregateKernel for ZstdUncompressedSizeInBytesKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + if let Some(size) = + FixedWidthUncompressedSizeInBytesKernel.aggregate(aggregate_fn, batch, ctx)? + { + return Ok(Some(size)); + } + + let Some(array) = batch.as_opt::() else { + return Ok(None); + }; + + if !matches!(array.dtype(), DType::Binary(_) | DType::Utf8(_)) { + return Ok(None); + } + + let views_size = checked_len_mul(array.len(), size_of::(), "binary view")?; + let data_size = selected_frame_uncompressed_size(array, ctx)?; + let validity_size = validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)?, + )?; + + let size = views_size + .checked_add(data_size) + .and_then(|size| size.checked_add(validity_size)) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + Ok(Some(Scalar::from(size))) + } +} + +fn selected_frame_uncompressed_size( + array: ArrayView<'_, Zstd>, + ctx: &mut ExecutionCtx, +) -> VortexResult { + let unsliced_validity = + child_to_validity(array.slots()[0].as_ref(), array.dtype().nullability()); + let slice_value_indices = unsliced_validity + .execute_mask(array.unsliced_n_rows(), ctx)? + .valid_counts_for_indices(&[array.slice_start(), array.slice_stop()]); + let slice_value_idx_start = slice_value_indices[0]; + let slice_value_idx_stop = slice_value_indices[1]; + + let mut value_idx_start = 0; + let mut size = 0u64; + for frame_meta in &array.metadata().frames { + if value_idx_start >= slice_value_idx_stop { + break; + } + + let frame_uncompressed_size = usize::try_from(frame_meta.uncompressed_size) + .vortex_expect("uncompressed size must fit in usize"); + let frame_n_values = if frame_meta.n_values == 0 { + frame_uncompressed_size + } else { + usize::try_from(frame_meta.n_values).vortex_expect("frame size must fit usize") + }; + + let value_idx_stop = value_idx_start + frame_n_values; + if value_idx_stop > slice_value_idx_start { + size = size + .checked_add(frame_meta.uncompressed_size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + } + value_idx_start = value_idx_stop; + } + + Ok(size) +} + +fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { + match validity { + Mask::AllTrue(_) => Ok(0), + Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), + Mask::Values(values) => u64::try_from(values.len()) + .map(|len| len.div_ceil(8)) + .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")), + } +} + +fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { + let len = u64::try_from(len) + .map_err(|e| vortex_err!("Failed to convert {name} length to u64: {e}"))?; + let width = u64::try_from(width) + .map_err(|e| vortex_err!("Failed to convert {name} byte width to u64: {e}"))?; + + len.checked_mul(width) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) +} diff --git a/encodings/zstd/src/lib.rs b/encodings/zstd/src/lib.rs index a927bbfdf26..a1591918c32 100644 --- a/encodings/zstd/src/lib.rs +++ b/encodings/zstd/src/lib.rs @@ -2,6 +2,12 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub use array::*; +use vortex_array::ArrayVTable; +use vortex_array::aggregate_fn::AggregateFnVTable; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use vortex_array::aggregate_fn::session::AggregateFnSessionExt; +use vortex_array::session::ArraySessionExt; +use vortex_session::VortexSession; #[cfg(feature = "unstable_encodings")] pub use zstd_buffers::*; @@ -15,6 +21,28 @@ mod zstd_buffers; #[cfg(test)] mod test; +/// Initialize Zstd encodings in the given session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(Zstd); + session.aggregate_fns().register_aggregate_kernel( + Zstd.id(), + Some(UncompressedSizeInBytes.id()), + &compute::uncompressed_size::ZstdUncompressedSizeInBytesKernel, + ); + + #[cfg(feature = "unstable_encodings")] + { + use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; + + session.arrays().register(ZstdBuffers); + session.aggregate_fns().register_aggregate_kernel( + ZstdBuffers.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + } +} + #[derive(Clone, prost::Message)] pub struct ZstdFrameMetadata { #[prost(uint64, tag = "1")] diff --git a/encodings/zstd/src/test.rs b/encodings/zstd/src/test.rs index 7ed22886b82..4d1c511db3b 100644 --- a/encodings/zstd/src/test.rs +++ b/encodings/zstd/src/test.rs @@ -5,6 +5,8 @@ use vortex_array::IntoArray; use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; +use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes; +use vortex_array::aggregate_fn::session::AggregateFnSession; use vortex_array::arrays::BoolArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinViewArray; @@ -12,12 +14,19 @@ use vortex_array::assert_arrays_eq; use vortex_array::assert_nth_scalar; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::Alignment; use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; use vortex_mask::Mask; +use vortex_session::VortexSession; use crate::Zstd; +use crate::ZstdData; +use crate::ZstdFrameMetadata; +use crate::ZstdMetadata; #[test] fn test_zstd_compress_decompress() { @@ -45,6 +54,34 @@ fn test_zstd_compress_decompress() { assert_arrays_eq!(slice, PrimitiveArray::from_iter(Vec::::new())); } +#[test] +fn test_uncompressed_size_does_not_decompress_primitive_frames() { + let session = VortexSession::empty() + .with::() + .with::(); + crate::initialize(&session); + let metadata = ZstdMetadata { + dictionary_size: 0, + frames: vec![ZstdFrameMetadata { + uncompressed_size: 16, + n_values: 4, + }], + }; + let data = ZstdData::new(None, vec![ByteBuffer::from(vec![0xff])], metadata, 4); + let array = Zstd::try_new( + DType::Primitive(PType::I32, Nullability::NonNullable), + data, + Validity::NonNullable, + ) + .unwrap() + .into_array(); + let mut ctx = session.create_execution_ctx(); + + let size = uncompressed_size_in_bytes(&array, &mut ctx).unwrap(); + + assert_eq!(size, 16); +} + #[test] fn test_zstd_empty() { let mut ctx = LEGACY_SESSION.create_execution_ctx(); diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 51733789a4b..439c185ed8d 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -834,6 +834,16 @@ pub fn vortex_array::aggregate_fn::fns::sum::sum(&vortex_array::ArrayRef, &mut v pub mod vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes +pub struct vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel + +impl core::fmt::Debug for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl vortex_array::aggregate_fn::kernels::DynAggregateKernel for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel::aggregate(&self, &vortex_array::aggregate_fn::AggregateFnRef, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + pub struct vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes impl core::clone::Clone for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes @@ -888,6 +898,10 @@ pub trait vortex_array::aggregate_fn::kernels::DynAggregateKernel: 'static + cor pub fn vortex_array::aggregate_fn::kernels::DynAggregateKernel::aggregate(&self, &vortex_array::aggregate_fn::AggregateFnRef, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> +impl vortex_array::aggregate_fn::kernels::DynAggregateKernel for vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel + +pub fn vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel::aggregate(&self, &vortex_array::aggregate_fn::AggregateFnRef, &vortex_array::ArrayRef, &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> + pub trait vortex_array::aggregate_fn::kernels::DynGroupedAggregateKernel: 'static + core::marker::Send + core::marker::Sync + core::fmt::Debug pub fn vortex_array::aggregate_fn::kernels::DynGroupedAggregateKernel::grouped_aggregate(&self, &vortex_array::aggregate_fn::AggregateFnRef, &vortex_array::arrays::ListViewArray) -> vortex_error::VortexResult> diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs new file mode 100644 index 00000000000..93a3dcf79d8 --- /dev/null +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use super::UncompressedSizeInBytes; +use super::checked_len_mul; +use super::packed_bit_buffer_size_in_bytes; +use super::validity_uncompressed_size_in_bytes; +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::aggregate_fn::AggregateFnRef; +use crate::aggregate_fn::kernels::DynAggregateKernel; +use crate::dtype::DType; +use crate::dtype::DecimalType; +use crate::scalar::Scalar; + +/// Computes [`UncompressedSizeInBytes`] for fixed-width logical dtypes without decoding values. +/// +/// This kernel is intended for physical encodings whose logical type is `Bool`, `Primitive`, +/// `Decimal`, or an extension over one of those types. Variable-width and nested dtypes return +/// `None` so another encoding-specific kernel or the canonical fallback can handle them. +#[derive(Debug)] +pub struct FixedWidthUncompressedSizeInBytesKernel; + +impl DynAggregateKernel for FixedWidthUncompressedSizeInBytesKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + Ok(fixed_width_uncompressed_size_in_bytes(batch, ctx)?.map(Scalar::from)) + } +} + +pub(crate) fn fixed_width_uncompressed_size_in_bytes( + array: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let Some(value_size) = fixed_width_value_size(array.dtype(), array.len())? else { + return Ok(None); + }; + + if value_size.include_validity { + let validity_size = + validity_uncompressed_size_in_bytes(array.validity()?.execute_mask(array.len(), ctx)?)?; + value_size + .size + .checked_add(validity_size) + .map(Some) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) + } else { + Ok(Some(value_size.size)) + } +} + +struct FixedWidthValueSize { + size: u64, + include_validity: bool, +} + +fn fixed_width_value_size(dtype: &DType, len: usize) -> VortexResult> { + let fixed = match dtype { + DType::Null => FixedWidthValueSize { + size: 0, + include_validity: false, + }, + DType::Bool(_) => FixedWidthValueSize { + size: packed_bit_buffer_size_in_bytes(len)?, + include_validity: true, + }, + DType::Primitive(ptype, _) => FixedWidthValueSize { + size: checked_len_mul(len, ptype.byte_width(), "primitive")?, + include_validity: true, + }, + DType::Decimal(decimal_type, _) => FixedWidthValueSize { + size: checked_len_mul( + len, + DecimalType::smallest_decimal_value_type(decimal_type).byte_width(), + "decimal", + )?, + include_validity: true, + }, + DType::Extension(ext_dtype) => { + return fixed_width_value_size(ext_dtype.storage_dtype(), len); + } + DType::Utf8(_) + | DType::Binary(_) + | DType::List(..) + | DType::FixedSizeList(..) + | DType::Struct(..) + | DType::Union(_) + | DType::Variant(_) => return Ok(None), + }; + + Ok(Some(fixed)) +} diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index ef6844e1a6a..6298b1f6296 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -5,6 +5,7 @@ mod bool; mod decimal; mod extension; mod fixed_size_list; +mod kernel; mod list_view; mod null; mod primitive; @@ -17,6 +18,7 @@ use bool::bool_uncompressed_size_in_bytes; use decimal::decimal_uncompressed_size_in_bytes; use extension::extension_uncompressed_size_in_bytes; use fixed_size_list::fixed_size_list_uncompressed_size_in_bytes; +pub use kernel::FixedWidthUncompressedSizeInBytesKernel; use list_view::list_view_uncompressed_size_in_bytes; use null::null_uncompressed_size_in_bytes; use primitive::primitive_uncompressed_size_in_bytes; @@ -62,7 +64,10 @@ pub fn uncompressed_size_in_bytes(array: &ArrayRef, ctx: &mut ExecutionCtx) -> V .map_err(|e| vortex_err!("Failed to convert uncompressed size to usize: {e}")) } -fn uncompressed_size_in_bytes_u64(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { +pub(crate) fn uncompressed_size_in_bytes_u64( + array: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult { if let Some(Precision::Exact(size_scalar)) = array.statistics().get(Stat::UncompressedSizeInBytes) { @@ -168,6 +173,22 @@ impl AggregateFnVTable for UncompressedSizeInBytes { Ok(()) } + fn try_accumulate( + &self, + partial: &mut Self::Partial, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + let Some(size) = kernel::fixed_width_uncompressed_size_in_bytes(batch, ctx)? else { + return Ok(false); + }; + + *partial = partial + .checked_add(size) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + Ok(true) + } + fn finalize(&self, partials: ArrayRef) -> VortexResult { Ok(partials) } @@ -256,7 +277,7 @@ fn constant_validity_size( validity_uncompressed_size_in_bytes(validity) } -fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { +pub(crate) fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { let len = u64::try_from(len) .map_err(|e| vortex_err!("Failed to convert {name} length to u64: {e}"))?; let width = u64::try_from(width) @@ -323,10 +344,12 @@ mod tests { use crate::arrays::DecimalArray; use crate::arrays::ExtensionArray; use crate::arrays::FixedSizeListArray; + use crate::arrays::ListArray; use crate::arrays::ListViewArray; use crate::arrays::NullArray; use crate::arrays::PrimitiveArray; use crate::arrays::StructArray; + use crate::arrays::VarBinArray; use crate::arrays::VarBinViewArray; use crate::arrays::VariantArray; use crate::builders::builder_with_capacity; @@ -472,6 +495,27 @@ mod tests { Ok(()) } + #[test] + fn varbin_matches_materialized_size() -> VortexResult<()> { + let array = VarBinArray::from_iter( + [ + Some("prefix"), + Some("short"), + None, + Some("this string is longer than twelve bytes"), + ], + DType::Utf8(Nullability::Nullable), + ) + .into_array() + .slice(1..4)?; + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + #[test] fn list_matches_materialized_size() -> VortexResult<()> { let elements = @@ -488,6 +532,22 @@ mod tests { Ok(()) } + #[test] + fn list_array_matches_materialized_size() -> VortexResult<()> { + let elements = + PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::NonNullable).into_array(); + let offsets = buffer![0u32, 2, 4, 5].into_array(); + let array = ListArray::try_new(elements, offsets, Validity::NonNullable)? + .into_array() + .slice(1..3)?; + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + #[test] fn fixed_size_list_matches_materialized_size() -> VortexResult<()> { let elements = diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 92eb9c0ed38..72f29fa2deb 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -23,6 +23,7 @@ use crate::aggregate_fn::fns::min_max::MinMax; use crate::aggregate_fn::fns::nan_count::NanCount; use crate::aggregate_fn::fns::null_count::NullCount; use crate::aggregate_fn::fns::sum::Sum; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::aggregate_fn::kernels::DynGroupedAggregateKernel; @@ -30,10 +31,19 @@ use crate::array::ArrayId; use crate::array::VTable; use crate::arrays::Chunked; use crate::arrays::Dict; +use crate::arrays::Filter; +use crate::arrays::List; +use crate::arrays::Masked; +use crate::arrays::Patched; +use crate::arrays::Shared; +use crate::arrays::Slice; +use crate::arrays::VarBin; use crate::arrays::chunked::compute::aggregate::ChunkedArrayAggregate; use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; +use crate::arrays::list::compute::uncompressed_size::ListUncompressedSizeInBytesKernel; +use crate::arrays::varbin::compute::uncompressed_size::VarBinUncompressedSizeInBytesKernel; /// Registry of aggregate function vtables. pub type AggregateFnRegistry = Registry; @@ -84,6 +94,46 @@ impl Default for AggregateFnSession { this.register_aggregate_kernel(Dict.id(), Some(MinMax.id()), &DictMinMaxKernel); this.register_aggregate_kernel(Dict.id(), Some(IsConstant.id()), &DictIsConstantKernel); this.register_aggregate_kernel(Dict.id(), Some(IsSorted.id()), &DictIsSortedKernel); + this.register_aggregate_kernel( + Dict.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + this.register_aggregate_kernel( + Filter.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + this.register_aggregate_kernel( + List.id(), + Some(UncompressedSizeInBytes.id()), + &ListUncompressedSizeInBytesKernel, + ); + this.register_aggregate_kernel( + Masked.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + this.register_aggregate_kernel( + Patched.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + this.register_aggregate_kernel( + Shared.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + this.register_aggregate_kernel( + Slice.id(), + Some(UncompressedSizeInBytes.id()), + &FixedWidthUncompressedSizeInBytesKernel, + ); + this.register_aggregate_kernel( + VarBin.id(), + Some(UncompressedSizeInBytes.id()), + &VarBinUncompressedSizeInBytesKernel, + ); this } diff --git a/vortex-array/src/arrays/list/compute/mod.rs b/vortex-array/src/arrays/list/compute/mod.rs index 65bacbebdf5..107454baa90 100644 --- a/vortex-array/src/arrays/list/compute/mod.rs +++ b/vortex-array/src/arrays/list/compute/mod.rs @@ -8,6 +8,7 @@ mod mask; pub(crate) mod rules; mod slice; mod take; +pub(crate) mod uncompressed_size; pub(crate) use kernels::PARENT_KERNELS; diff --git a/vortex-array/src/arrays/list/compute/uncompressed_size.rs b/vortex-array/src/arrays/list/compute/uncompressed_size.rs new file mode 100644 index 00000000000..16eacd53171 --- /dev/null +++ b/vortex-array/src/arrays/list/compute/uncompressed_size.rs @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::mem::size_of; + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::aggregate_fn::AggregateFnRef; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::checked_len_mul; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes_u64; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::validity_uncompressed_size_in_bytes; +use crate::aggregate_fn::kernels::DynAggregateKernel; +use crate::arrays::List; +use crate::arrays::list::ListArrayExt; +use crate::scalar::Scalar; + +#[derive(Debug)] +pub(crate) struct ListUncompressedSizeInBytesKernel; + +impl DynAggregateKernel for ListUncompressedSizeInBytesKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(array) = batch.as_opt::() else { + return Ok(None); + }; + + let start = offset_at(array.offsets(), 0, ctx)?; + let stop = offset_at(array.offsets(), array.len(), ctx)?; + let elements = array.elements().slice(start..stop)?; + let elements_size = uncompressed_size_in_bytes_u64(&elements, ctx)?; + + let offsets_size = checked_len_mul(array.len(), size_of::(), "list offsets")?; + let sizes_size = checked_len_mul(array.len(), size_of::(), "list sizes")?; + let validity_size = validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)?, + )?; + + let size = elements_size + .checked_add(offsets_size) + .and_then(|size| size.checked_add(sizes_size)) + .and_then(|size| size.checked_add(validity_size)) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + Ok(Some(Scalar::from(size))) + } +} + +fn offset_at(offsets: &ArrayRef, index: usize, ctx: &mut ExecutionCtx) -> VortexResult { + offsets + .execute_scalar(index, ctx)? + .as_primitive() + .as_::() + .ok_or_else(|| vortex_err!("offset value does not fit in usize")) +} diff --git a/vortex-array/src/arrays/varbin/compute/mod.rs b/vortex-array/src/arrays/varbin/compute/mod.rs index ab2de942138..11e465dbcd8 100644 --- a/vortex-array/src/arrays/varbin/compute/mod.rs +++ b/vortex-array/src/arrays/varbin/compute/mod.rs @@ -9,6 +9,7 @@ mod compare; mod filter; mod mask; mod take; +pub(crate) mod uncompressed_size; #[cfg(test)] mod tests { diff --git a/vortex-array/src/arrays/varbin/compute/uncompressed_size.rs b/vortex-array/src/arrays/varbin/compute/uncompressed_size.rs new file mode 100644 index 00000000000..589253e0b16 --- /dev/null +++ b/vortex-array/src/arrays/varbin/compute/uncompressed_size.rs @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::mem::size_of; + +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::aggregate_fn::AggregateFnRef; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::checked_len_mul; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::validity_uncompressed_size_in_bytes; +use crate::aggregate_fn::kernels::DynAggregateKernel; +use crate::arrays::VarBin; +use crate::arrays::varbin::VarBinArrayExt; +use crate::arrays::varbinview::BinaryView; +use crate::scalar::Scalar; + +#[derive(Debug)] +pub(crate) struct VarBinUncompressedSizeInBytesKernel; + +impl DynAggregateKernel for VarBinUncompressedSizeInBytesKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() { + return Ok(None); + } + + let Some(array) = batch.as_opt::() else { + return Ok(None); + }; + + let first_offset = offset_at(array.offsets(), 0, ctx)?; + let last_offset = offset_at(array.offsets(), array.len(), ctx)?; + let data_size = u64::try_from( + last_offset + .checked_sub(first_offset) + .ok_or_else(|| vortex_err!("VarBin offsets must be monotonically increasing"))?, + ) + .map_err(|e| vortex_err!("Failed to convert VarBin data size to u64: {e}"))?; + + let views_size = checked_len_mul(array.len(), size_of::(), "binary view")?; + let validity_size = validity_uncompressed_size_in_bytes( + array + .as_ref() + .validity()? + .execute_mask(array.as_ref().len(), ctx)?, + )?; + + let size = views_size + .checked_add(data_size) + .and_then(|size| size.checked_add(validity_size)) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; + + Ok(Some(Scalar::from(size))) + } +} + +fn offset_at(offsets: &ArrayRef, index: usize, ctx: &mut ExecutionCtx) -> VortexResult { + offsets + .execute_scalar(index, ctx)? + .as_primitive() + .as_::() + .ok_or_else(|| vortex_err!("offset value does not fit in usize")) +} diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index ce6598173a6..860d854a180 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -113,12 +113,7 @@ use vortex_array::arrays::Dict; use vortex_array::arrays::Patched; use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::session::ArraySessionExt; -use vortex_bytebool::ByteBool; -use vortex_fsst::FSST; -use vortex_pco::Pco; use vortex_session::VortexSession; -use vortex_sparse::Sparse; -use vortex_zigzag::ZigZag; pub use writer::*; /// The current version of the Vortex file format @@ -160,29 +155,26 @@ mod forever_constant { pub fn register_default_encodings(session: &VortexSession) { { let arrays = session.arrays(); - arrays.register(ByteBool); arrays.register(Dict); - arrays.register(FSST); - arrays.register(Pco); - arrays.register(Sparse); - arrays.register(ZigZag); - #[cfg(feature = "zstd")] - arrays.register(vortex_zstd::Zstd); - #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] - arrays.register(vortex_zstd::ZstdBuffers); if use_experimental_patches() { arrays.register(Patched); } } - // Eventually all encodings crates should expose an initialize function. For now it's only - // a few of them. + // Register encoding-specific array plugins and aggregate kernels. vortex_alp::initialize(session); + vortex_bytebool::initialize(session); vortex_datetime_parts::initialize(session); vortex_decimal_byte_parts::initialize(session); vortex_fastlanes::initialize(session); + vortex_fsst::initialize(session); + vortex_pco::initialize(session); vortex_runend::initialize(session); vortex_sequence::initialize(session); + vortex_sparse::initialize(session); + vortex_zigzag::initialize(session); + #[cfg(feature = "zstd")] + vortex_zstd::initialize(session); #[cfg(feature = "unstable_encodings")] vortex_tensor::initialize(session); From 45c15da4f31283a20d929622af457aafd2e5b7a6 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 19 May 2026 13:28:33 +0100 Subject: [PATCH 2/5] some CR comments Signed-off-by: Adam Gutglick --- .../fsst/src/compute/uncompressed_size.rs | 30 +++---------- .../zstd/src/compute/uncompressed_size.rs | 31 +++---------- vortex-array/public-api.lock | 2 + .../fns/uncompressed_size_in_bytes/bool.rs | 2 +- .../fns/uncompressed_size_in_bytes/decimal.rs | 2 +- .../fixed_size_list.rs | 2 +- .../fns/uncompressed_size_in_bytes/kernel.rs | 19 ++++---- .../uncompressed_size_in_bytes/list_view.rs | 2 +- .../fns/uncompressed_size_in_bytes/mod.rs | 43 ++++++------------- .../uncompressed_size_in_bytes/primitive.rs | 2 +- .../fns/uncompressed_size_in_bytes/struct_.rs | 2 +- .../uncompressed_size_in_bytes/varbinview.rs | 2 +- .../arrays/list/compute/uncompressed_size.rs | 14 +++--- .../varbin/compute/uncompressed_size.rs | 8 ++-- vortex-array/src/validity.rs | 32 ++++++++++++++ 15 files changed, 91 insertions(+), 102 deletions(-) diff --git a/encodings/fsst/src/compute/uncompressed_size.rs b/encodings/fsst/src/compute/uncompressed_size.rs index f756b51f435..1ea2f2cdc48 100644 --- a/encodings/fsst/src/compute/uncompressed_size.rs +++ b/encodings/fsst/src/compute/uncompressed_size.rs @@ -5,20 +5,18 @@ use std::mem::size_of; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; -use vortex_array::IntoArray; use vortex_array::aggregate_fn::AggregateFnRef; use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::kernels::DynAggregateKernel; -use vortex_array::arrays::ConstantArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::arrays::varbinview::build_views::BinaryView; use vortex_array::dtype::IntegerPType; use vortex_array::match_each_integer_ptype; use vortex_array::scalar::Scalar; +use vortex_array::validity::validity_uncompressed_size_in_bytes; use vortex_error::VortexResult; use vortex_error::vortex_err; -use vortex_mask::Mask; use crate::FSST; use crate::FSSTArrayExt; @@ -41,7 +39,10 @@ impl DynAggregateKernel for FSSTUncompressedSizeInBytesKernel { return Ok(None); }; - let views_size = checked_len_mul(array.len(), size_of::(), "binary view")?; + let views_size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(size_of::() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; let data_size = uncompressed_lengths_size( &array .uncompressed_lengths() @@ -71,6 +72,7 @@ fn uncompressed_lengths_size(lengths: &PrimitiveArray) -> VortexResult { } fn uncompressed_lengths_size_typed(lengths: &[P]) -> VortexResult { + // The lengths child stores decoded byte counts for each logical value. let mut size = 0u64; for len in lengths { let len = len @@ -82,23 +84,3 @@ fn uncompressed_lengths_size_typed(lengths: &[P]) -> VortexResu } Ok(size) } - -fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { - match validity { - Mask::AllTrue(_) => Ok(0), - Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), - Mask::Values(values) => u64::try_from(values.len()) - .map(|len| len.div_ceil(8)) - .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")), - } -} - -fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { - let len = u64::try_from(len) - .map_err(|e| vortex_err!("Failed to convert {name} length to u64: {e}"))?; - let width = u64::try_from(width) - .map_err(|e| vortex_err!("Failed to convert {name} byte width to u64: {e}"))?; - - len.checked_mul(width) - .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) -} diff --git a/encodings/zstd/src/compute/uncompressed_size.rs b/encodings/zstd/src/compute/uncompressed_size.rs index 0dcd3adbe64..be625cd748a 100644 --- a/encodings/zstd/src/compute/uncompressed_size.rs +++ b/encodings/zstd/src/compute/uncompressed_size.rs @@ -6,20 +6,18 @@ use std::mem::size_of; use vortex_array::ArrayRef; use vortex_array::ArrayView; use vortex_array::ExecutionCtx; -use vortex_array::IntoArray; use vortex_array::aggregate_fn::AggregateFnRef; use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; use vortex_array::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use vortex_array::aggregate_fn::kernels::DynAggregateKernel; -use vortex_array::arrays::ConstantArray; use vortex_array::arrays::varbinview::build_views::BinaryView; use vortex_array::dtype::DType; use vortex_array::scalar::Scalar; +use vortex_array::validity::validity_uncompressed_size_in_bytes; use vortex_array::vtable::child_to_validity; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; -use vortex_mask::Mask; use crate::Zstd; @@ -51,7 +49,10 @@ impl DynAggregateKernel for ZstdUncompressedSizeInBytesKernel { return Ok(None); } - let views_size = checked_len_mul(array.len(), size_of::(), "binary view")?; + let views_size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(size_of::() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; let data_size = selected_frame_uncompressed_size(array, ctx)?; let validity_size = validity_uncompressed_size_in_bytes( array @@ -91,6 +92,8 @@ fn selected_frame_uncompressed_size( let frame_uncompressed_size = usize::try_from(frame_meta.uncompressed_size) .vortex_expect("uncompressed size must fit in usize"); let frame_n_values = if frame_meta.n_values == 0 { + // Older metadata omitted n_values; fixed-width arrays return above, so byte count is + // the best available slice unit here. frame_uncompressed_size } else { usize::try_from(frame_meta.n_values).vortex_expect("frame size must fit usize") @@ -107,23 +110,3 @@ fn selected_frame_uncompressed_size( Ok(size) } - -fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { - match validity { - Mask::AllTrue(_) => Ok(0), - Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), - Mask::Values(values) => u64::try_from(values.len()) - .map(|len| len.div_ceil(8)) - .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")), - } -} - -fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { - let len = u64::try_from(len) - .map_err(|e| vortex_err!("Failed to convert {name} length to u64: {e}"))?; - let width = u64::try_from(width) - .map_err(|e| vortex_err!("Failed to convert {name} byte width to u64: {e}"))?; - - len.checked_mul(width) - .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) -} diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 439c185ed8d..3f7b078b026 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -20608,6 +20608,8 @@ impl vortex_array::ArrayHash for vortex_array::validity::Validity pub fn vortex_array::validity::Validity::array_hash(&self, &mut H, vortex_array::Precision) +pub fn vortex_array::validity::validity_uncompressed_size_in_bytes(vortex_mask::Mask) -> vortex_error::VortexResult + pub mod vortex_array::variants pub struct vortex_array::variants::BinaryTyped<'a>(_) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs index 4c5c0a4df8d..afd84e4a8cc 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/bool.rs @@ -5,9 +5,9 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use super::packed_bit_buffer_size_in_bytes; -use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::BoolArray; +use crate::validity::validity_uncompressed_size_in_bytes; pub(super) fn bool_uncompressed_size_in_bytes( array: &BoolArray, diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs index c04fa55f0ab..acdcbfd6373 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/decimal.rs @@ -4,11 +4,11 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; -use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::DecimalArray; use crate::arrays::decimal::DecimalArrayExt; use crate::dtype::DecimalType; +use crate::validity::validity_uncompressed_size_in_bytes; pub(super) fn decimal_uncompressed_size_in_bytes( array: &DecimalArray, diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs index 7f8d55b07c1..6e4932c1623 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/fixed_size_list.rs @@ -5,10 +5,10 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use super::uncompressed_size_in_bytes_u64; -use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::FixedSizeListArray; use crate::arrays::fixed_size_list::FixedSizeListArrayExt; +use crate::validity::validity_uncompressed_size_in_bytes; pub(super) fn fixed_size_list_uncompressed_size_in_bytes( array: &FixedSizeListArray, diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs index 93a3dcf79d8..d50f8f01cb4 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs @@ -5,9 +5,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use super::UncompressedSizeInBytes; -use super::checked_len_mul; use super::packed_bit_buffer_size_in_bytes; -use super::validity_uncompressed_size_in_bytes; use crate::ArrayRef; use crate::ExecutionCtx; use crate::aggregate_fn::AggregateFnRef; @@ -15,6 +13,7 @@ use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::dtype::DType; use crate::dtype::DecimalType; use crate::scalar::Scalar; +use crate::validity::validity_uncompressed_size_in_bytes; /// Computes [`UncompressedSizeInBytes`] for fixed-width logical dtypes without decoding values. /// @@ -76,15 +75,19 @@ fn fixed_width_value_size(dtype: &DType, len: usize) -> VortexResult FixedWidthValueSize { - size: checked_len_mul(len, ptype.byte_width(), "primitive")?, + size: u64::try_from(len) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(ptype.byte_width() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?, include_validity: true, }, DType::Decimal(decimal_type, _) => FixedWidthValueSize { - size: checked_len_mul( - len, - DecimalType::smallest_decimal_value_type(decimal_type).byte_width(), - "decimal", - )?, + size: u64::try_from(len) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul( + DecimalType::smallest_decimal_value_type(decimal_type).byte_width() as u64, + ) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?, include_validity: true, }, DType::Extension(ext_dtype) => { diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs index 919430195d9..31a36d790b4 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/list_view.rs @@ -5,11 +5,11 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use super::uncompressed_size_in_bytes_u64; -use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::ListViewArray; use crate::arrays::listview::ListViewArrayExt; use crate::arrays::listview::ListViewRebuildMode; +use crate::validity::validity_uncompressed_size_in_bytes; pub(super) fn list_view_uncompressed_size_in_bytes( array: &ListViewArray, diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index 6298b1f6296..ad251b2e203 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -28,13 +28,11 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; -use vortex_mask::Mask; use crate::ArrayRef; use crate::Canonical; use crate::Columnar; use crate::ExecutionCtx; -use crate::IntoArray; use crate::aggregate_fn::Accumulator; use crate::aggregate_fn::AggregateFnId; use crate::aggregate_fn::AggregateFnVTable; @@ -42,7 +40,6 @@ use crate::aggregate_fn::DynAccumulator; use crate::aggregate_fn::EmptyOptions; use crate::array::ArrayView; use crate::arrays::Constant; -use crate::arrays::ConstantArray; use crate::arrays::varbinview::BinaryView; use crate::dtype::DType; use crate::dtype::DecimalType; @@ -53,6 +50,7 @@ use crate::expr::stats::Stat; use crate::expr::stats::StatsProvider; use crate::scalar::Scalar; use crate::scalar::ScalarValue; +use crate::validity::validity_uncompressed_size_in_bytes; /// Return the uncompressed size of an array in bytes. /// @@ -225,14 +223,14 @@ pub(crate) fn constant_uncompressed_size_in_bytes( let value_size = match array.dtype() { DType::Null => return Ok(0), DType::Bool(_) => packed_bit_buffer_size_in_bytes(array.len())?, - DType::Primitive(ptype, _) => { - checked_len_mul(array.len(), ptype.byte_width(), "primitive")? - } - DType::Decimal(decimal_type, _) => checked_len_mul( - array.len(), - DecimalType::smallest_decimal_value_type(decimal_type).byte_width(), - "decimal", - )?, + DType::Primitive(ptype, _) => u64::try_from(array.len()) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(ptype.byte_width() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?, + DType::Decimal(decimal_type, _) => u64::try_from(array.len()) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(DecimalType::smallest_decimal_value_type(decimal_type).byte_width() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?, DType::Utf8(_) => constant_varbinview_value_size( array.len(), array.scalar().as_utf8().value().map(|value| value.len()), @@ -257,7 +255,10 @@ pub(crate) fn constant_uncompressed_size_in_bytes( } fn constant_varbinview_value_size(len: usize, scalar_len: Option) -> VortexResult { - let views_size = checked_len_mul(len, size_of::(), "binary view")?; + let views_size = u64::try_from(len) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(size_of::() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; let data_size = match scalar_len { Some(scalar_len) if scalar_len >= BinaryView::MAX_INLINED_SIZE => u64::try_from(scalar_len) .map_err(|e| vortex_err!("Failed to convert data buffer length to u64: {e}"))?, @@ -277,16 +278,6 @@ fn constant_validity_size( validity_uncompressed_size_in_bytes(validity) } -pub(crate) fn checked_len_mul(len: usize, width: usize, name: &str) -> VortexResult { - let len = u64::try_from(len) - .map_err(|e| vortex_err!("Failed to convert {name} length to u64: {e}"))?; - let width = u64::try_from(width) - .map_err(|e| vortex_err!("Failed to convert {name} byte width to u64: {e}"))?; - - len.checked_mul(width) - .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) -} - fn supports_uncompressed_size_in_bytes(dtype: &DType) -> bool { match dtype { DType::Null @@ -309,14 +300,6 @@ fn supports_uncompressed_size_in_bytes(dtype: &DType) -> bool { } } -pub(crate) fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { - match validity { - Mask::AllTrue(_) => Ok(0), - Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), - Mask::Values(values) => packed_bit_buffer_size_in_bytes(values.len()), - } -} - pub(crate) fn packed_bit_buffer_size_in_bytes(len: usize) -> VortexResult { u64::try_from(len.div_ceil(8)) .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs index 8ca149e791a..407a8b8608b 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/primitive.rs @@ -4,10 +4,10 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; -use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::PrimitiveArray; use crate::arrays::primitive::PrimitiveArrayExt; +use crate::validity::validity_uncompressed_size_in_bytes; pub(super) fn primitive_uncompressed_size_in_bytes( array: &PrimitiveArray, diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs index ffed40babd2..a6bc3c57a78 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/struct_.rs @@ -5,10 +5,10 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use super::uncompressed_size_in_bytes_u64; -use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::StructArray; use crate::arrays::struct_::StructArrayExt; +use crate::validity::validity_uncompressed_size_in_bytes; pub(super) fn struct_uncompressed_size_in_bytes( array: &StructArray, diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs index 820ec8ca88d..c9bfd793169 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/varbinview.rs @@ -6,10 +6,10 @@ use std::mem::size_of; use vortex_error::VortexResult; use vortex_error::vortex_err; -use super::validity_uncompressed_size_in_bytes; use crate::ExecutionCtx; use crate::arrays::VarBinViewArray; use crate::arrays::varbinview::BinaryView; +use crate::validity::validity_uncompressed_size_in_bytes; pub(super) fn varbinview_uncompressed_size_in_bytes( array: &VarBinViewArray, diff --git a/vortex-array/src/arrays/list/compute/uncompressed_size.rs b/vortex-array/src/arrays/list/compute/uncompressed_size.rs index 16eacd53171..5cd9407aea3 100644 --- a/vortex-array/src/arrays/list/compute/uncompressed_size.rs +++ b/vortex-array/src/arrays/list/compute/uncompressed_size.rs @@ -10,13 +10,12 @@ use crate::ArrayRef; use crate::ExecutionCtx; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; -use crate::aggregate_fn::fns::uncompressed_size_in_bytes::checked_len_mul; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes_u64; -use crate::aggregate_fn::fns::uncompressed_size_in_bytes::validity_uncompressed_size_in_bytes; use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::arrays::List; use crate::arrays::list::ListArrayExt; use crate::scalar::Scalar; +use crate::validity::validity_uncompressed_size_in_bytes; #[derive(Debug)] pub(crate) struct ListUncompressedSizeInBytesKernel; @@ -41,8 +40,11 @@ impl DynAggregateKernel for ListUncompressedSizeInBytesKernel { let elements = array.elements().slice(start..stop)?; let elements_size = uncompressed_size_in_bytes_u64(&elements, ctx)?; - let offsets_size = checked_len_mul(array.len(), size_of::(), "list offsets")?; - let sizes_size = checked_len_mul(array.len(), size_of::(), "list sizes")?; + // Canonical List materializes as ListView, with u64 offsets and u64 sizes. + let view_buffer_size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(size_of::() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; let validity_size = validity_uncompressed_size_in_bytes( array .as_ref() @@ -51,8 +53,8 @@ impl DynAggregateKernel for ListUncompressedSizeInBytesKernel { )?; let size = elements_size - .checked_add(offsets_size) - .and_then(|size| size.checked_add(sizes_size)) + .checked_add(view_buffer_size) + .and_then(|size| size.checked_add(view_buffer_size)) .and_then(|size| size.checked_add(validity_size)) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; diff --git a/vortex-array/src/arrays/varbin/compute/uncompressed_size.rs b/vortex-array/src/arrays/varbin/compute/uncompressed_size.rs index 589253e0b16..b81420c4b1f 100644 --- a/vortex-array/src/arrays/varbin/compute/uncompressed_size.rs +++ b/vortex-array/src/arrays/varbin/compute/uncompressed_size.rs @@ -10,13 +10,12 @@ use crate::ArrayRef; use crate::ExecutionCtx; use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; -use crate::aggregate_fn::fns::uncompressed_size_in_bytes::checked_len_mul; -use crate::aggregate_fn::fns::uncompressed_size_in_bytes::validity_uncompressed_size_in_bytes; use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::arrays::VarBin; use crate::arrays::varbin::VarBinArrayExt; use crate::arrays::varbinview::BinaryView; use crate::scalar::Scalar; +use crate::validity::validity_uncompressed_size_in_bytes; #[derive(Debug)] pub(crate) struct VarBinUncompressedSizeInBytesKernel; @@ -45,7 +44,10 @@ impl DynAggregateKernel for VarBinUncompressedSizeInBytesKernel { ) .map_err(|e| vortex_err!("Failed to convert VarBin data size to u64: {e}"))?; - let views_size = checked_len_mul(array.len(), size_of::(), "binary view")?; + let views_size = u64::try_from(array.len()) + .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? + .checked_mul(size_of::() as u64) + .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?; let validity_size = validity_uncompressed_size_in_bytes( array .as_ref() diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 204205d1f51..7644a53ba86 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -587,6 +587,25 @@ impl IntoArray for &MaskValues { } } +/// Returns the number of bytes a validity mask contributes to an uncompressed array. +/// +/// This is the validity portion of the +/// [`UncompressedSizeInBytes`](crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes) +/// aggregate for canonical arrays and encoding kernels that report that aggregate directly. +/// +/// A mask with all valid values contributes no bytes, because Vortex does not materialize a +/// validity buffer for all-valid arrays. An all-invalid mask contributes the size of the canonical +/// constant false validity array. A mixed mask contributes the packed bit-buffer size, rounded up +/// to whole bytes. +pub fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { + match validity { + Mask::AllTrue(_) => Ok(0), + Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), + Mask::Values(values) => u64::try_from(values.len().div_ceil(8)) + .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")), + } +} + #[cfg(test)] mod tests { use rstest::rstest; @@ -602,6 +621,19 @@ mod tests { use crate::dtype::Nullability; use crate::validity::BoolArray; use crate::validity::Validity; + use crate::validity::validity_uncompressed_size_in_bytes; + + #[rstest] + #[case(Mask::AllTrue(9), 0)] + #[case(Mask::AllFalse(9), Mask::AllFalse(9).into_array().nbytes())] + #[case(Mask::from_iter([true, false, true, true, false, true, false, true, true]), 2)] + fn validity_uncompressed_size_matches_canonical_mask_size( + #[case] mask: Mask, + #[case] expected: u64, + ) -> vortex_error::VortexResult<()> { + assert_eq!(validity_uncompressed_size_in_bytes(mask)?, expected); + Ok(()) + } #[rstest] #[case(Validity::AllValid, 5, &[2, 4], Validity::AllValid, Validity::AllValid)] From 253041bfd3b7cba09e33363e29d0e5e309cce5f1 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 20 May 2026 12:37:30 +0100 Subject: [PATCH 3/5] dict stuff and remove some other stuff Signed-off-by: Adam Gutglick --- .../fns/uncompressed_size_in_bytes/mod.rs | 82 +++++++++++++++++++ vortex-array/src/aggregate_fn/session.rs | 34 +------- vortex-array/src/arrays/dict/compute/mod.rs | 1 + .../arrays/dict/compute/uncompressed_size.rs | 44 ++++++++++ 4 files changed, 129 insertions(+), 32 deletions(-) create mode 100644 vortex-array/src/arrays/dict/compute/uncompressed_size.rs diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index ad251b2e203..0b3bf8c3cde 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -310,6 +310,7 @@ mod tests { use vortex_buffer::buffer; use vortex_error::VortexResult; use vortex_error::vortex_err; + use vortex_mask::Mask; use crate::ArrayRef; use crate::IntoArray; @@ -325,12 +326,17 @@ mod tests { use crate::arrays::ChunkedArray; use crate::arrays::ConstantArray; use crate::arrays::DecimalArray; + use crate::arrays::DictArray; use crate::arrays::ExtensionArray; + use crate::arrays::FilterArray; use crate::arrays::FixedSizeListArray; use crate::arrays::ListArray; use crate::arrays::ListViewArray; + use crate::arrays::MaskedArray; use crate::arrays::NullArray; use crate::arrays::PrimitiveArray; + use crate::arrays::SharedArray; + use crate::arrays::SliceArray; use crate::arrays::StructArray; use crate::arrays::VarBinArray; use crate::arrays::VarBinViewArray; @@ -499,6 +505,82 @@ mod tests { Ok(()) } + #[test] + fn variable_width_wrappers_match_materialized_size() -> VortexResult<()> { + let nullable_strings = || { + VarBinViewArray::from_iter_nullable_str([ + Some("short"), + Some("a string that is longer than inline"), + None, + Some("omega"), + ]) + .into_array() + }; + let non_nullable_strings = || { + VarBinViewArray::from_iter_str([ + "short", + "a string that is longer than inline", + "middle", + "omega", + ]) + .into_array() + }; + + let cases = vec![ + SliceArray::try_new(nullable_strings(), 1..4)?.into_array(), + FilterArray::try_new( + nullable_strings(), + Mask::from_iter([true, false, true, true]), + )? + .into_array(), + SharedArray::new(nullable_strings()).into_array(), + MaskedArray::try_new( + non_nullable_strings(), + Validity::from_iter([true, false, true, true]), + )? + .into_array(), + ]; + + for array in cases { + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + } + Ok(()) + } + + #[test] + fn dict_string_values_match_materialized_size() -> VortexResult<()> { + let codes = buffer![0u8, 1, 1, 2, 0].into_array(); + let values = VarBinViewArray::from_iter_str([ + "short", + "a string that is longer than inline", + "omega", + ]) + .into_array(); + let array = DictArray::try_new(codes, values)?.into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + + #[test] + fn dict_u64_values_match_materialized_size() -> VortexResult<()> { + let codes = buffer![0u8, 2, 1, 2, 0].into_array(); + let values = buffer![10u64, 20, 30].into_array(); + let array = DictArray::try_new(codes, values)?.into_array(); + + assert_eq!( + aggregate(&array)?, + materialized_uncompressed_size_in_bytes(&array) + ); + Ok(()) + } + #[test] fn list_matches_materialized_size() -> VortexResult<()> { let elements = diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 72f29fa2deb..4c2a613dff9 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -23,7 +23,6 @@ use crate::aggregate_fn::fns::min_max::MinMax; use crate::aggregate_fn::fns::nan_count::NanCount; use crate::aggregate_fn::fns::null_count::NullCount; use crate::aggregate_fn::fns::sum::Sum; -use crate::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; use crate::aggregate_fn::kernels::DynAggregateKernel; use crate::aggregate_fn::kernels::DynGroupedAggregateKernel; @@ -31,17 +30,13 @@ use crate::array::ArrayId; use crate::array::VTable; use crate::arrays::Chunked; use crate::arrays::Dict; -use crate::arrays::Filter; use crate::arrays::List; -use crate::arrays::Masked; -use crate::arrays::Patched; -use crate::arrays::Shared; -use crate::arrays::Slice; use crate::arrays::VarBin; use crate::arrays::chunked::compute::aggregate::ChunkedArrayAggregate; use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; +use crate::arrays::dict::compute::uncompressed_size::DictUncompressedSizeInBytesKernel; use crate::arrays::list::compute::uncompressed_size::ListUncompressedSizeInBytesKernel; use crate::arrays::varbin::compute::uncompressed_size::VarBinUncompressedSizeInBytesKernel; @@ -97,38 +92,13 @@ impl Default for AggregateFnSession { this.register_aggregate_kernel( Dict.id(), Some(UncompressedSizeInBytes.id()), - &FixedWidthUncompressedSizeInBytesKernel, - ); - this.register_aggregate_kernel( - Filter.id(), - Some(UncompressedSizeInBytes.id()), - &FixedWidthUncompressedSizeInBytesKernel, + &DictUncompressedSizeInBytesKernel, ); this.register_aggregate_kernel( List.id(), Some(UncompressedSizeInBytes.id()), &ListUncompressedSizeInBytesKernel, ); - this.register_aggregate_kernel( - Masked.id(), - Some(UncompressedSizeInBytes.id()), - &FixedWidthUncompressedSizeInBytesKernel, - ); - this.register_aggregate_kernel( - Patched.id(), - Some(UncompressedSizeInBytes.id()), - &FixedWidthUncompressedSizeInBytesKernel, - ); - this.register_aggregate_kernel( - Shared.id(), - Some(UncompressedSizeInBytes.id()), - &FixedWidthUncompressedSizeInBytesKernel, - ); - this.register_aggregate_kernel( - Slice.id(), - Some(UncompressedSizeInBytes.id()), - &FixedWidthUncompressedSizeInBytesKernel, - ); this.register_aggregate_kernel( VarBin.id(), Some(UncompressedSizeInBytes.id()), diff --git a/vortex-array/src/arrays/dict/compute/mod.rs b/vortex-array/src/arrays/dict/compute/mod.rs index c56cc8ef367..6381325d7ea 100644 --- a/vortex-array/src/arrays/dict/compute/mod.rs +++ b/vortex-array/src/arrays/dict/compute/mod.rs @@ -11,6 +11,7 @@ mod mask; pub(crate) mod min_max; pub(crate) mod rules; mod slice; +pub(crate) mod uncompressed_size; use vortex_error::VortexResult; use vortex_mask::Mask; diff --git a/vortex-array/src/arrays/dict/compute/uncompressed_size.rs b/vortex-array/src/arrays/dict/compute/uncompressed_size.rs new file mode 100644 index 00000000000..0fb5276cac4 --- /dev/null +++ b/vortex-array/src/arrays/dict/compute/uncompressed_size.rs @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::aggregate_fn::AggregateFnRef; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::FixedWidthUncompressedSizeInBytesKernel; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes; +use crate::aggregate_fn::fns::uncompressed_size_in_bytes::uncompressed_size_in_bytes_u64; +use crate::aggregate_fn::kernels::DynAggregateKernel; +use crate::arrays::Dict; +use crate::scalar::Scalar; + +#[derive(Debug)] +pub(crate) struct DictUncompressedSizeInBytesKernel; + +impl DynAggregateKernel for DictUncompressedSizeInBytesKernel { + fn aggregate( + &self, + aggregate_fn: &AggregateFnRef, + batch: &ArrayRef, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if !aggregate_fn.is::() || !batch.is::() { + return Ok(None); + } + + // Fixed-width decoded size only needs the logical width, row count, and derived validity. + if let Some(size) = + FixedWidthUncompressedSizeInBytesKernel.aggregate(aggregate_fn, batch, ctx)? + { + return Ok(Some(size)); + } + + // For variable-width dictionaries, apply the codes to the values, then let the resulting + // array's aggregate kernel compute its decoded size. + let decoded = batch.clone().execute::(ctx)?; + Ok(Some(Scalar::from(uncompressed_size_in_bytes_u64( + &decoded, ctx, + )?))) + } +} From ea71fd056396f5a65815c0ecf32fa35922c10038 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 20 May 2026 12:43:46 +0100 Subject: [PATCH 4/5] Simplify Signed-off-by: Adam Gutglick --- .../fns/uncompressed_size_in_bytes/kernel.rs | 43 +++++++------------ 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs index d50f8f01cb4..3fbf4382985 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/kernel.rs @@ -42,54 +42,41 @@ pub(crate) fn fixed_width_uncompressed_size_in_bytes( array: &ArrayRef, ctx: &mut ExecutionCtx, ) -> VortexResult> { - let Some(value_size) = fixed_width_value_size(array.dtype(), array.len())? else { + let Some((size, include_validity)) = fixed_width_value_size(array.dtype(), array.len())? else { return Ok(None); }; - if value_size.include_validity { + if include_validity { let validity_size = validity_uncompressed_size_in_bytes(array.validity()?.execute_mask(array.len(), ctx)?)?; - value_size - .size - .checked_add(validity_size) + size.checked_add(validity_size) .map(Some) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64")) } else { - Ok(Some(value_size.size)) + Ok(Some(size)) } } -struct FixedWidthValueSize { - size: u64, - include_validity: bool, -} - -fn fixed_width_value_size(dtype: &DType, len: usize) -> VortexResult> { +fn fixed_width_value_size(dtype: &DType, len: usize) -> VortexResult> { let fixed = match dtype { - DType::Null => FixedWidthValueSize { - size: 0, - include_validity: false, - }, - DType::Bool(_) => FixedWidthValueSize { - size: packed_bit_buffer_size_in_bytes(len)?, - include_validity: true, - }, - DType::Primitive(ptype, _) => FixedWidthValueSize { - size: u64::try_from(len) + DType::Null => (0, false), + DType::Bool(_) => (packed_bit_buffer_size_in_bytes(len)?, true), + DType::Primitive(ptype, _) => ( + u64::try_from(len) .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? .checked_mul(ptype.byte_width() as u64) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?, - include_validity: true, - }, - DType::Decimal(decimal_type, _) => FixedWidthValueSize { - size: u64::try_from(len) + true, + ), + DType::Decimal(decimal_type, _) => ( + u64::try_from(len) .map_err(|e| vortex_err!("array length does not fit in u64: {e}"))? .checked_mul( DecimalType::smallest_decimal_value_type(decimal_type).byte_width() as u64, ) .ok_or_else(|| vortex_err!("uncompressed size in bytes overflowed u64"))?, - include_validity: true, - }, + true, + ), DType::Extension(ext_dtype) => { return fixed_width_value_size(ext_dtype.storage_dtype(), len); } From 3241f4f420685790eda7ca627121ab1155256ceb Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 20 May 2026 14:53:40 +0100 Subject: [PATCH 5/5] Cleanup Signed-off-by: Adam Gutglick --- .../fns/uncompressed_size_in_bytes/mod.rs | 14 ++++---------- vortex-array/src/validity.rs | 12 +++++------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs index 0b3bf8c3cde..fc582fefbe9 100644 --- a/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs +++ b/vortex-array/src/aggregate_fn/fns/uncompressed_size_in_bytes/mod.rs @@ -398,13 +398,10 @@ mod tests { } #[test] - fn all_invalid_primitive_matches_materialized_size() -> VortexResult<()> { + fn all_invalid_primitive_excludes_uniform_validity_size() -> VortexResult<()> { let array = PrimitiveArray::new(buffer![0i32, 0, 0], Validity::AllInvalid).into_array(); - assert_eq!( - aggregate(&array)?, - materialized_uncompressed_size_in_bytes(&array) - ); + assert_eq!(aggregate(&array)?, 12); Ok(()) } @@ -431,13 +428,10 @@ mod tests { } #[test] - fn all_invalid_bool_matches_materialized_size() -> VortexResult<()> { + fn all_invalid_bool_excludes_uniform_validity_size() -> VortexResult<()> { let array = BoolArray::from_iter([None::, None, None]).into_array(); - assert_eq!( - aggregate(&array)?, - materialized_uncompressed_size_in_bytes(&array) - ); + assert_eq!(aggregate(&array)?, 1); Ok(()) } diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 7644a53ba86..7d99e058b69 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -593,14 +593,12 @@ impl IntoArray for &MaskValues { /// [`UncompressedSizeInBytes`](crate::aggregate_fn::fns::uncompressed_size_in_bytes::UncompressedSizeInBytes) /// aggregate for canonical arrays and encoding kernels that report that aggregate directly. /// -/// A mask with all valid values contributes no bytes, because Vortex does not materialize a -/// validity buffer for all-valid arrays. An all-invalid mask contributes the size of the canonical -/// constant false validity array. A mixed mask contributes the packed bit-buffer size, rounded up +/// Uniform masks contribute no bytes, because Vortex does not materialize a validity buffer for +/// all-valid or all-invalid arrays. A mixed mask contributes the packed bit-buffer size, rounded up /// to whole bytes. pub fn validity_uncompressed_size_in_bytes(validity: Mask) -> VortexResult { match validity { - Mask::AllTrue(_) => Ok(0), - Mask::AllFalse(len) => Ok(ConstantArray::new(false, len).into_array().nbytes()), + Mask::AllTrue(_) | Mask::AllFalse(_) => Ok(0), Mask::Values(values) => u64::try_from(values.len().div_ceil(8)) .map_err(|e| vortex_err!("Failed to convert bit buffer length to u64: {e}")), } @@ -625,9 +623,9 @@ mod tests { #[rstest] #[case(Mask::AllTrue(9), 0)] - #[case(Mask::AllFalse(9), Mask::AllFalse(9).into_array().nbytes())] + #[case(Mask::AllFalse(9), 0)] #[case(Mask::from_iter([true, false, true, true, false, true, false, true, true]), 2)] - fn validity_uncompressed_size_matches_canonical_mask_size( + fn validity_uncompressed_size_matches_validity_buffer_size( #[case] mask: Mask, #[case] expected: u64, ) -> vortex_error::VortexResult<()> {