Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions encodings/pco/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-mask = { workspace = true }
vortex-scalar = { workspace = true }
vortex-session = { workspace = true }

[dev-dependencies]
divan = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion encodings/pco/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ impl BaseArrayVTable<PcoVTable> for PcoVTable {

impl CanonicalVTable<PcoVTable> for PcoVTable {
fn canonicalize(array: &PcoArray) -> VortexResult<Canonical> {
array.decompress().to_canonical()
Ok(Canonical::Primitive(array.decompress()))
}
}

Expand Down
45 changes: 25 additions & 20 deletions encodings/pco/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors
#![allow(clippy::cast_possible_truncation)]

use std::sync::LazyLock;

use vortex_array::ArrayContext;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrow::compute::to_arrow_preferred;
use vortex_array::arrow::ArrowArrayExecutor;
use vortex_array::assert_arrays_eq;
use vortex_array::assert_nth_scalar;
use vortex_array::serde::ArrayParts;
Expand All @@ -21,7 +24,12 @@ use vortex_buffer::BufferMut;
use vortex_dtype::DType;
use vortex_dtype::Nullability;
use vortex_dtype::PType;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_session::VortexSession;

static LEGACY_SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

use crate::PcoArray;
use crate::PcoVTable;
Expand Down Expand Up @@ -129,11 +137,9 @@ fn test_validity_vtable() {
}

#[test]
fn test_serde() {
let data: BufferMut<i32> = (0..1_000_000).collect();
let pco = PcoArray::from_primitive(&PrimitiveArray::new(data, Validity::NonNullable), 3, 100)
.unwrap()
.to_array();
fn test_serde() -> VortexResult<()> {
let data: PrimitiveArray = (0i32..1_000_000).collect();
let pco = PcoArray::from_primitive(&data, 3, 100)?.to_array();

let session = ArraySession::default();
let context = ArrayContext::new(
Expand All @@ -151,23 +157,22 @@ fn test_serde() {
offset: 0,
include_padding: true,
},
)
.unwrap()
)?
.into_iter()
.flat_map(|x| x.into_iter())
.collect::<BufferMut<u8>>()
.freeze();

let parts = ArrayParts::try_from(bytes).unwrap();
let decoded = parts
.decode(
&context,
&DType::Primitive(PType::I32, Nullability::NonNullable),
1_000_000,
)
.unwrap();
assert_eq!(
&to_arrow_preferred(&pco).unwrap(),
&to_arrow_preferred(&decoded).unwrap()
);
let parts = ArrayParts::try_from(bytes)?;
let decoded = parts.decode(
&context,
&DType::Primitive(PType::I32, Nullability::NonNullable),
1_000_000,
)?;
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let data_type = data.dtype().to_arrow_dtype()?;
let pco_arrow = pco.execute_arrow(Some(&data_type), &mut ctx)?;
let decoded_arrow = decoded.execute_arrow(Some(&data_type), &mut ctx)?;
assert!(pco_arrow == decoded_arrow);
Ok(())
}
3 changes: 2 additions & 1 deletion vortex-array/src/arrays/varbin/compute/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ impl CompareKernel for VarBinVTable {

let lhs = Datum::try_new(lhs.as_ref())?;

// TODO(robert): Handle LargeString/Binary arrays
// Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types
// produced by Datum::try_new (which uses into_arrow_preferred())
let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() {
DType::Utf8(_) => &rhs_const
.as_utf8()
Expand Down
133 changes: 88 additions & 45 deletions vortex-array/src/arrays/varbin/vtable/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,106 @@

use std::sync::Arc;

use arrow_array::BinaryViewArray;
use arrow_array::StringViewArray;
use arrow_array::cast::AsArray;
use arrow_schema::DataType;
use vortex_dtype::DType;
use vortex_error::VortexExpect;
use num_traits::AsPrimitive;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
use vortex_vector::binaryview::BinaryView;

use crate::ArrayRef;
use crate::Canonical;
use crate::ToCanonical;
use crate::arrays::VarBinVTable;
use crate::arrays::VarBinViewArray;
use crate::arrays::varbin::VarBinArray;
use crate::arrow::FromArrowArray;
use crate::arrow::IntoArrowArray;
use crate::vtable::CanonicalVTable;

impl CanonicalVTable<VarBinVTable> for VarBinVTable {
fn canonicalize(array: &VarBinArray) -> VortexResult<Canonical> {
let dtype = array.dtype().clone();
let nullable = dtype.is_nullable();

// Zero the offsets first to ensure the bytes buffer starts at 0
let array = array.clone().zero_offsets();
assert_eq!(array.offset_at(0), 0);
let (dtype, bytes, offsets, validity) = array.into_parts();

let array_ref = array
.to_array()
.into_arrow_preferred()
.vortex_expect("VarBinArray must be convertible to arrow array");
let offsets = offsets.to_primitive();

let array = match (&dtype, array_ref.data_type()) {
(DType::Utf8(_), DataType::Utf8) => {
Arc::new(StringViewArray::from(array_ref.as_string::<i32>()))
as Arc<dyn arrow_array::Array>
}
(DType::Utf8(_), DataType::LargeUtf8) => {
Arc::new(StringViewArray::from(array_ref.as_string::<i64>()))
as Arc<dyn arrow_array::Array>
}
// Build views directly from offsets
#[expect(clippy::cast_possible_truncation, reason = "BinaryView offset is u32")]
let views: Buffer<BinaryView> = match_each_integer_ptype!(offsets.ptype(), |O| {
let offsets_slice = offsets.as_slice::<O>();
let bytes_slice = bytes.as_ref();

(DType::Binary(_), DataType::Binary) => {
Arc::new(BinaryViewArray::from(array_ref.as_binary::<i32>()))
}
(DType::Binary(_), DataType::LargeBinary) => {
Arc::new(BinaryViewArray::from(array_ref.as_binary::<i64>()))
let mut views = BufferMut::<BinaryView>::with_capacity(offsets_slice.len() - 1);
for window in offsets_slice.windows(2) {
let start: usize = window[0].as_();
let end: usize = window[1].as_();
let value = &bytes_slice[start..end];
views.push(BinaryView::make_view(value, 0, start as u32));
}
// If its already a view, no need to do anything
(DType::Binary(_), DataType::BinaryView) | (DType::Utf8(_), DataType::Utf8View) => {
array_ref
}
_ => unreachable!("VarBinArray must have Utf8 or Binary dtype, instead got: {dtype}",),
};
Ok(Canonical::VarBinView(
ArrayRef::from_arrow(array.as_ref(), nullable).to_varbinview(),
))
views.freeze()
});

// Create VarBinViewArray with the original bytes buffer and computed views
// SAFETY: views are correctly computed from valid offsets
let varbinview =
unsafe { VarBinViewArray::new_unchecked(views, Arc::from([bytes]), dtype, validity) };
Ok(Canonical::VarBinView(varbinview))
}
}

// Convert a VarBinArray to VarBinViewArray using Arrow's conversion.
//
// This method leverages Arrow's `From<&GenericByteArray<FROM>> for GenericByteViewArray<V>`
// implementation to perform the conversion, then converts back to Vortex.
// pub fn canonicalize_via_arrow(array: &VarBinArray) -> VortexResult<VarBinViewArray> {
// match array.dtype() {
// DType::Utf8(_) => canonicalize_via_arrow_typed::<Utf8Type, StringViewType>(array),
// DType::Binary(_) => canonicalize_via_arrow_typed::<BinaryType, BinaryViewType>(array),
// _ => unreachable!("VarBinArray must have Utf8 or Binary dtype"),
// }
// }
//
// fn canonicalize_via_arrow_typed<FROM, V>(array: &VarBinArray) -> VortexResult<VarBinViewArray>
// where
// FROM: ByteArrayType,
// FROM::Offset: NativePType,
// V: ByteViewType<Native = FROM::Native>,
// {
// let nullable = array.dtype().is_nullable();
//
// // Build Arrow GenericByteArray from VarBinArray
// // Cast offsets to the required offset type (i32 for Utf8/Binary, i64 for Large variants)
// let offsets = cast(
// array.offsets().as_ref(),
// &DType::Primitive(FROM::Offset::PTYPE, Nullability::NonNullable),
// )?
// .to_primitive()
// .to_buffer::<FROM::Offset>()
// .into_arrow_offset_buffer();
//
// let data = array.bytes().clone().into_arrow_buffer();
//
// // Convert validity mask to Arrow NullBuffer
// let null_buffer = match array.validity_mask() {
// Mask::AllTrue(_) => None,
// Mask::AllFalse(len) => Some(NullBuffer::new_null(len)),
// Mask::Values(values) => Some(NullBuffer::from(BooleanBuffer::from(
// values.bit_buffer().clone(),
// ))),
// };
// let null_buffer = crate::arrow::null_buffer::to_null_buffer()
//
// // SAFETY: VarBinArray invariants guarantee valid offsets and UTF-8 (if Utf8 dtype)
// let arrow_byte_array =
// unsafe { GenericByteArray::<FROM>::new_unchecked(offsets, data, null_buffer) };
//
// // Use Arrow's From impl to convert to view array
// let arrow_view_array: GenericByteViewArray<V> = GenericByteViewArray::from(&arrow_byte_array);
//
// // Convert back to Vortex
// let vortex_array = ArrayRef::from_arrow(&arrow_view_array, nullable);
// Ok(vortex_array.as_::<VarBinViewVTable>().clone())
// }

#[cfg(test)]
mod tests {
use rstest::rstest;
Expand All @@ -83,18 +125,19 @@ mod tests {
varbin.append_value("1234567890123".as_bytes());
let varbin = varbin.finish(dtype.clone());

let varbin = varbin.slice(1..4);

let canonical = varbin.to_varbinview();
assert_eq!(canonical.dtype(), &dtype);

assert!(!canonical.is_valid(0));
assert!(!canonical.is_valid(1));

// First value is inlined (12 bytes)
assert!(canonical.views()[2].is_inlined());
assert_eq!(canonical.bytes_at(2).as_slice(), "123456789012".as_bytes());
assert!(canonical.views()[1].is_inlined());
assert_eq!(canonical.bytes_at(1).as_slice(), "123456789012".as_bytes());

// Second value is not inlined (13 bytes)
assert!(!canonical.views()[3].is_inlined());
assert_eq!(canonical.bytes_at(3).as_slice(), "1234567890123".as_bytes());
assert!(!canonical.views()[2].is_inlined());
assert_eq!(canonical.bytes_at(2).as_slice(), "1234567890123".as_bytes());
}
}
1 change: 0 additions & 1 deletion vortex-array/src/arrow/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@

mod to_arrow;

pub(crate) use to_arrow::warm_up_vtable;
pub use to_arrow::*;
Loading
Loading