diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index 22125f4a89a..f6fdcfd068d 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -157,7 +157,7 @@ impl VTable for ALPVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let encoded_ptype = match &dtype { DType::Primitive(PType::F32, n) => DType::Primitive(PType::I32, *n), DType::Primitive(PType::F64, n) => DType::Primitive(PType::I64, *n), @@ -187,6 +187,7 @@ impl VTable for ALPVTable { }, patches, ) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/alp/src/alp_rd/array.rs b/encodings/alp/src/alp_rd/array.rs index e015937ae78..12c18fa5c26 100644 --- a/encodings/alp/src/alp_rd/array.rs +++ b/encodings/alp/src/alp_rd/array.rs @@ -191,7 +191,7 @@ impl VTable for ALPRDVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if children.len() < 2 { vortex_bail!( "Expected at least 2 children for ALPRD encoding, found {}", @@ -252,6 +252,7 @@ impl VTable for ALPRDVTable { })?, left_parts_patches, ) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/bytebool/src/array.rs b/encodings/bytebool/src/array.rs index c7e21795a12..cd7f0094a26 100644 --- a/encodings/bytebool/src/array.rs +++ b/encodings/bytebool/src/array.rs @@ -140,7 +140,7 @@ impl VTable for ByteBoolVTable { _metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let validity = if children.is_empty() { Validity::from(dtype.nullability()) } else if children.len() == 1 { @@ -155,7 +155,7 @@ impl VTable for ByteBoolVTable { } let buffer = buffers[0].clone(); - Ok(ByteBoolArray::new(buffer, validity)) + Ok(ByteBoolArray::new(buffer, validity).into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/datetime-parts/src/array.rs b/encodings/datetime-parts/src/array.rs index f8b89d1d066..a1f47681e2f 100644 --- a/encodings/datetime-parts/src/array.rs +++ b/encodings/datetime-parts/src/array.rs @@ -179,7 +179,7 @@ impl VTable for DateTimePartsVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if children.len() != 3 { vortex_bail!( "Expected 3 children for datetime-parts encoding, found {}", @@ -204,6 +204,7 @@ impl VTable for DateTimePartsVTable { )?; DateTimePartsArray::try_new(dtype.clone(), days, seconds, subseconds) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs b/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs index eec4fa1aae4..20b88e40f3d 100644 --- a/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs +++ b/encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs @@ -154,7 +154,7 @@ impl VTable for DecimalBytePartsVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let Some(decimal_dtype) = dtype.as_decimal_opt() else { vortex_bail!("decoding decimal but given non decimal dtype {}", dtype) }; @@ -168,7 +168,7 @@ impl VTable for DecimalBytePartsVTable { "lower_part_count > 0 not currently supported" ); - DecimalBytePartsArray::try_new(msp, *decimal_dtype) + DecimalBytePartsArray::try_new(msp, *decimal_dtype).map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/fastlanes/benches/bitpacking_take.rs b/encodings/fastlanes/benches/bitpacking_take.rs index 9bf7ea4db79..73352ff9ec7 100644 --- a/encodings/fastlanes/benches/bitpacking_take.rs +++ b/encodings/fastlanes/benches/bitpacking_take.rs @@ -30,7 +30,7 @@ fn main() { fn take_10_stratified(bencher: Bencher) { let values = fixture(1_000_000, 8); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = PrimitiveArray::from_iter((0..10).map(|i| i * 10_000)); bencher @@ -48,7 +48,7 @@ fn take_10_stratified(bencher: Bencher) { fn take_10_contiguous(bencher: Bencher) { let values = fixture(1_000_000, 8); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = buffer![0..10].into_array(); bencher @@ -67,7 +67,7 @@ fn take_10k_random(bencher: Bencher) { let values = fixture(1_000_000, 8); let range = Uniform::new(0, values.len()).unwrap(); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let rng = StdRng::seed_from_u64(0); let indices = PrimitiveArray::from_iter(rng.sample_iter(range).take(10_000).map(|i| i as u32)); @@ -87,7 +87,7 @@ fn take_10k_random(bencher: Bencher) { fn take_10k_contiguous(bencher: Bencher) { let values = fixture(1_000_000, 8); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = PrimitiveArray::from_iter(0..10_000); bencher @@ -105,7 +105,7 @@ fn take_10k_contiguous(bencher: Bencher) { fn take_200k_dispersed(bencher: Bencher) { let values = fixture(1_000_000, 8); let uncompressed = PrimitiveArray::new(values.clone(), Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = PrimitiveArray::from_iter((0..200_000).map(|i| (i * 42) % values.len() as u64)); bencher @@ -123,7 +123,7 @@ fn take_200k_dispersed(bencher: Bencher) { fn take_200k_first_chunk_only(bencher: Bencher) { let values = fixture(1_000_000, 8); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = PrimitiveArray::from_iter((0..200_000).map(|i| ((i * 42) % 1024) as u64)); bencher @@ -161,7 +161,7 @@ const NUM_EXCEPTIONS: u32 = 10000; fn patched_take_10_stratified(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); assert!(packed.patches().is_some()); assert_eq!( @@ -186,7 +186,7 @@ fn patched_take_10_stratified(bencher: Bencher) { fn patched_take_10_contiguous(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); assert!(packed.patches().is_some()); assert_eq!( @@ -211,7 +211,7 @@ fn patched_take_10_contiguous(bencher: Bencher) { fn patched_take_10k_random(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values.clone(), Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let rng = StdRng::seed_from_u64(0); let range = Uniform::new(0, values.len()).unwrap(); @@ -232,7 +232,7 @@ fn patched_take_10k_random(bencher: Bencher) { fn patched_take_10k_contiguous_not_patches(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = PrimitiveArray::from_iter((0u32..NUM_EXCEPTIONS).cycle().take(10000)); bencher @@ -250,7 +250,7 @@ fn patched_take_10k_contiguous_not_patches(bencher: Bencher) { fn patched_take_10k_contiguous_patches(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); assert!(packed.patches().is_some()); assert_eq!( @@ -276,7 +276,7 @@ fn patched_take_10k_contiguous_patches(bencher: Bencher) { fn patched_take_200k_dispersed(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values.clone(), Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = PrimitiveArray::from_iter((0..200_000).map(|i| (i * 42) % values.len() as u64)); bencher @@ -294,7 +294,7 @@ fn patched_take_200k_dispersed(bencher: Bencher) { fn patched_take_200k_first_chunk_only(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let indices = PrimitiveArray::from_iter((0..200_000).map(|i| ((i * 42) % 1024) as u64)); bencher @@ -312,7 +312,7 @@ fn patched_take_200k_first_chunk_only(bencher: Bencher) { fn patched_take_10k_adversarial(bencher: Bencher) { let values = (0u32..BIG_BASE2 + NUM_EXCEPTIONS).collect::>(); let uncompressed = PrimitiveArray::new(values, Validity::NonNullable); - let packed = bitpack_to_best_bit_width(&uncompressed).unwrap(); + let (packed, _) = bitpack_to_best_bit_width(&uncompressed).unwrap(); let per_chunk_count = 100; let indices = PrimitiveArray::from_iter( (0..(NUM_EXCEPTIONS + 1024) / 1024) diff --git a/encodings/fastlanes/benches/compute_between.rs b/encodings/fastlanes/benches/compute_between.rs index d3bc7cc6259..7131f2cef80 100644 --- a/encodings/fastlanes/benches/compute_between.rs +++ b/encodings/fastlanes/benches/compute_between.rs @@ -38,7 +38,10 @@ fn generate_bit_pack_primitive_array( .map(|_| T::from_usize(rng.random_range(0..10_000)).vortex_expect("")) .collect::(); - bitpack_to_best_bit_width(&a).vortex_expect("").into_array() + bitpack_to_best_bit_width(&a) + .vortex_expect("") + .0 + .into_array() } fn generate_alp_bit_pack_primitive_array( @@ -55,6 +58,7 @@ fn generate_alp_bit_pack_primitive_array( let bp = bitpack_to_best_bit_width(&encoded) .vortex_expect("") + .0 .into_array(); ALPArray::new(bp, alp.exponents(), None).into_array() } diff --git a/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs b/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs index 5f6ad1dbbf0..4b52496dee8 100644 --- a/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs +++ b/encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs @@ -27,7 +27,9 @@ use vortex_mask::Mask; use crate::BitPackedArray; use crate::bitpack_decompress; -pub fn bitpack_to_best_bit_width(array: &PrimitiveArray) -> VortexResult { +pub fn bitpack_to_best_bit_width( + array: &PrimitiveArray, +) -> VortexResult<(BitPackedArray, Option)> { let bit_width_freq = bit_width_histogram(array)?; let best_bit_width = find_best_bit_width(array.ptype(), &bit_width_freq)?; bitpack_encode(array, best_bit_width, Some(&bit_width_freq)) @@ -38,7 +40,7 @@ pub fn bitpack_encode( array: &PrimitiveArray, bit_width: u8, bit_width_freq: Option<&[usize]>, -) -> VortexResult { +) -> VortexResult<(BitPackedArray, Option)> { let bit_width_freq = match bit_width_freq { Some(freq) => freq, None => &bit_width_histogram(array)?, @@ -77,7 +79,6 @@ pub fn bitpack_encode( BufferHandle::new_host(packed), array.dtype().clone(), array.validity().clone(), - patches, bit_width, array.len(), 0, @@ -87,7 +88,8 @@ pub fn bitpack_encode( .stats_set .to_ref(bitpacked.as_ref()) .inherit_from(array.statistics()); - Ok(bitpacked) + + Ok((bitpacked, patches)) } /// Bitpack an array into the specified bit-width without checking statistics. @@ -111,7 +113,6 @@ pub unsafe fn bitpack_encode_unchecked( BufferHandle::new_host(packed), array.dtype().clone(), array.validity().clone(), - None, bit_width, array.len(), 0, diff --git a/encodings/fastlanes/src/bitpacking/array/mod.rs b/encodings/fastlanes/src/bitpacking/array/mod.rs index cffeac17090..e4915d9fc32 100644 --- a/encodings/fastlanes/src/bitpacking/array/mod.rs +++ b/encodings/fastlanes/src/bitpacking/array/mod.rs @@ -8,7 +8,6 @@ use vortex_array::buffer::BufferHandle; use vortex_array::dtype::DType; use vortex_array::dtype::NativePType; use vortex_array::dtype::PType; -use vortex_array::patches::Patches; use vortex_array::stats::ArrayStats; use vortex_array::validity::Validity; use vortex_error::VortexResult; @@ -28,7 +27,6 @@ pub struct BitPackedArrayParts { pub bit_width: u8, pub len: usize, pub packed: BufferHandle, - pub patches: Option, pub validity: Validity, } @@ -41,7 +39,6 @@ pub struct BitPackedArray { pub(super) dtype: DType, pub(super) bit_width: u8, pub(super) packed: BufferHandle, - pub(super) patches: Option, pub(super) validity: Validity, pub(super) stats_set: ArrayStats, } @@ -71,7 +68,6 @@ impl BitPackedArray { packed: BufferHandle, dtype: DType, validity: Validity, - patches: Option, bit_width: u8, len: usize, offset: u16, @@ -82,7 +78,6 @@ impl BitPackedArray { dtype, bit_width, packed, - patches, validity, stats_set: Default::default(), } @@ -113,27 +108,18 @@ impl BitPackedArray { packed: BufferHandle, ptype: PType, validity: Validity, - patches: Option, bit_width: u8, length: usize, offset: u16, ) -> VortexResult { - Self::validate( - &packed, - ptype, - &validity, - patches.as_ref(), - bit_width, - length, - offset, - )?; + Self::validate(&packed, ptype, &validity, bit_width, length, offset)?; let dtype = DType::Primitive(ptype, validity.nullability()); // SAFETY: all components validated above unsafe { Ok(Self::new_unchecked( - packed, dtype, validity, patches, bit_width, length, offset, + packed, dtype, validity, bit_width, length, offset, )) } } @@ -142,7 +128,6 @@ impl BitPackedArray { packed: &BufferHandle, ptype: PType, validity: &Validity, - patches: Option<&Patches>, bit_width: u8, length: usize, offset: u16, @@ -163,11 +148,6 @@ impl BitPackedArray { "Offset must be less than the full block i.e., 1024, got {offset}" ); - // Validate patches - if let Some(patches) = patches { - Self::validate_patches(patches, ptype, length)?; - } - // Validate packed buffer let expected_packed_len = (length + offset as usize).div_ceil(1024) * (128 * bit_width as usize); @@ -181,24 +161,6 @@ impl BitPackedArray { Ok(()) } - fn validate_patches(patches: &Patches, ptype: PType, len: usize) -> VortexResult<()> { - // Ensure that array and patches have same ptype - vortex_ensure!( - patches.dtype().eq_ignore_nullability(ptype.into()), - "Patches DType {} does not match BitPackedArray dtype {}", - patches.dtype().as_nonnullable(), - ptype - ); - - vortex_ensure!( - patches.array_len() == len, - "BitPackedArray patches length {} != expected {len}", - patches.array_len(), - ); - - Ok(()) - } - pub fn ptype(&self) -> PType { self.dtype.as_ptype() } @@ -239,19 +201,6 @@ impl BitPackedArray { self.bit_width } - /// Access the patches array. - /// - /// If present, patches MUST be a `SparseArray` with equal-length to this array, and whose - /// indices indicate the locations of patches. The indices must have non-zero length. - #[inline] - pub fn patches(&self) -> Option<&Patches> { - self.patches.as_ref() - } - - pub fn replace_patches(&mut self, patches: Option) { - self.patches = patches; - } - #[inline] pub fn offset(&self) -> u16 { self.offset @@ -291,7 +240,6 @@ impl BitPackedArray { bit_width: self.bit_width, len: self.len, packed: self.packed, - patches: self.patches, validity: self.validity, } } diff --git a/encodings/fastlanes/src/bitpacking/vtable/mod.rs b/encodings/fastlanes/src/bitpacking/vtable/mod.rs index d213fb9f1ed..77d3ed22e8f 100644 --- a/encodings/fastlanes/src/bitpacking/vtable/mod.rs +++ b/encodings/fastlanes/src/bitpacking/vtable/mod.rs @@ -9,9 +9,11 @@ use vortex_array::ArrayRef; use vortex_array::DeserializeMetadata; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; use vortex_array::Precision; use vortex_array::ProstMetadata; use vortex_array::SerializeMetadata; +use vortex_array::arrays::PatchedArray; use vortex_array::buffer::BufferHandle; use vortex_array::builders::ArrayBuilder; use vortex_array::dtype::DType; @@ -25,8 +27,8 @@ use vortex_array::validity::Validity; use vortex_array::vtable; use vortex_array::vtable::ArrayId; use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityHelper; use vortex_array::vtable::ValidityVTableFromValidityHelper; -use vortex_array::vtable::patches_child; use vortex_array::vtable::patches_child_name; use vortex_array::vtable::patches_nchildren; use vortex_array::vtable::validity_nchildren; @@ -57,8 +59,21 @@ pub struct BitPackedMetadata { pub(crate) bit_width: u32, #[prost(uint32, tag = "2")] pub(crate) offset: u32, // must be <1024 + + // NOTE(aduffy): Starting with format version 0.58.0, this field should never be set. It is + // only set by older writers and we use it to migrate to the new PatchedArray wrapper. #[prost(message, optional, tag = "3")] - pub(crate) patches: Option, + patches: Option, +} + +impl BitPackedMetadata { + pub(crate) fn new(bit_width: u32, offset: u32) -> Self { + Self { + bit_width, + offset, + patches: None, + } + } } impl VTable for BitPackedVTable { @@ -95,7 +110,6 @@ impl VTable for BitPackedVTable { array.dtype.hash(state); array.bit_width.hash(state); array.packed.array_hash(state, precision); - array.patches.array_hash(state, precision); array.validity.array_hash(state, precision); } @@ -105,7 +119,6 @@ impl VTable for BitPackedVTable { && array.dtype == other.dtype && array.bit_width == other.bit_width && array.packed.array_eq(&other.packed, precision) - && array.patches.array_eq(&other.patches, precision) && array.validity.array_eq(&other.validity, precision) } @@ -128,19 +141,11 @@ impl VTable for BitPackedVTable { } fn nchildren(array: &BitPackedArray) -> usize { - array.patches().map_or(0, patches_nchildren) + validity_nchildren(&array.validity) + validity_nchildren(&array.validity) } fn child(array: &BitPackedArray, idx: usize) -> ArrayRef { - let pc = array.patches().map_or(0, patches_nchildren); - if idx < pc { - patches_child( - array - .patches() - .vortex_expect("BitPackedArray child index out of bounds"), - idx, - ) - } else if idx < pc + validity_nchildren(&array.validity) { + if idx < validity_nchildren(&array.validity) { validity_to_child(&array.validity, array.len) .vortex_expect("BitPackedArray child index out of bounds") } else { @@ -149,12 +154,11 @@ impl VTable for BitPackedVTable { } fn child_name(array: &BitPackedArray, idx: usize) -> String { - let pc = array.patches().map_or(0, patches_nchildren); - if idx < pc { - patches_child_name(idx).to_string() - } else { - "validity".to_string() + if idx < validity_nchildren(array.validity()) { + return "validity".to_string(); } + + vortex_panic!("invalid child index for BitPackedArray: {idx}"); } fn reduce_parent( @@ -166,83 +170,34 @@ impl VTable for BitPackedVTable { } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { - // Children: patches (if present): indices, values, chunk_offsets; then validity (if present) - let patches_info = array - .patches() - .map(|p| (p.offset(), p.chunk_offsets().is_some())); - - let mut child_idx = 0; - let patches = if let Some((patch_offset, has_chunk_offsets)) = patches_info { - let patch_indices = children - .get(child_idx) - .ok_or_else(|| vortex_err!("Expected patch_indices child at index {}", child_idx))? - .clone(); - child_idx += 1; - - let patch_values = children - .get(child_idx) - .ok_or_else(|| vortex_err!("Expected patch_values child at index {}", child_idx))? - .clone(); - child_idx += 1; - - let patch_chunk_offsets = if has_chunk_offsets { - let offsets = children - .get(child_idx) - .ok_or_else(|| { - vortex_err!("Expected patch_chunk_offsets child at index {}", child_idx) - })? - .clone(); - child_idx += 1; - Some(offsets) - } else { - None - }; - - Some(Patches::new( - array.len(), - patch_offset, - patch_indices, - patch_values, - patch_chunk_offsets, - )?) + // Children: validity (if present). + let expected_children = if matches!(array.validity, Validity::Array(_)) { + 1 } else { - None + 0 }; - let validity = if child_idx < children.len() { - Validity::Array(children[child_idx].clone()) - } else { - Validity::from(array.dtype().nullability()) - }; - - let expected_children = child_idx - + if matches!(validity, Validity::Array(_)) { - 1 - } else { - 0 - }; vortex_ensure!( children.len() == expected_children, - "Expected {} children, got {}", - expected_children, + "expected {expected_children} children for BitPackedArray, received {}", children.len() ); - array.patches = patches; + let validity = match children.into_iter().next() { + Some(child) => Validity::Array(child), + None => Validity::from(array.dtype.nullability()), + }; + array.validity = validity; Ok(()) } fn metadata(array: &BitPackedArray) -> VortexResult { - Ok(ProstMetadata(BitPackedMetadata { - bit_width: array.bit_width() as u32, - offset: array.offset() as u32, - patches: array - .patches() - .map(|p| p.to_metadata(array.len(), array.dtype())) - .transpose()?, - })) + Ok(ProstMetadata(BitPackedMetadata::new( + array.bit_width as u32, + array.offset() as u32, + ))) } fn serialize(metadata: Self::Metadata) -> VortexResult>> { @@ -271,7 +226,7 @@ impl VTable for BitPackedVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if buffers.len() != 1 { vortex_bail!("Expected 1 buffer, got {}", buffers.len()); } @@ -315,11 +270,10 @@ impl VTable for BitPackedVTable { }) .transpose()?; - BitPackedArray::try_new( + let bp_array = BitPackedArray::try_new( packed, PType::try_from(dtype)?, validity, - patches, u8::try_from(metadata.bit_width).map_err(|_| { vortex_err!( "BitPackedMetadata bit_width {} does not fit in u8", @@ -333,7 +287,19 @@ impl VTable for BitPackedVTable { metadata.offset ) })?, - ) + )? + .into_array(); + + if let Some(patches) = patches { + // TODO(aduffy): this is only needed for backward compatibility. + let mut ctx = ExecutionCtx::new(LEGACY_SESSION.clone()); + Ok( + PatchedArray::from_array_and_patches(bp_array.into_array(), &patches, &mut ctx)? + .into_array(), + ) + } else { + Ok(bp_array) + } } fn append_to_builder( diff --git a/encodings/fastlanes/src/bitpacking/vtable/operations.rs b/encodings/fastlanes/src/bitpacking/vtable/operations.rs index 86cad42c433..114b62f2fd9 100644 --- a/encodings/fastlanes/src/bitpacking/vtable/operations.rs +++ b/encodings/fastlanes/src/bitpacking/vtable/operations.rs @@ -11,15 +11,7 @@ use crate::bitpack_decompress; impl OperationsVTable for BitPackedVTable { fn scalar_at(array: &BitPackedArray, index: usize) -> VortexResult { - Ok( - if let Some(patches) = array.patches() - && let Some(patch) = patches.get_patched(index)? - { - patch - } else { - bitpack_decompress::unpack_single(array, index) - }, - ) + Ok(bitpack_decompress::unpack_single(array, index)) } } @@ -39,7 +31,6 @@ mod test { use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; - use vortex_array::patches::Patches; use vortex_array::scalar::Scalar; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; @@ -183,7 +174,7 @@ mod test { } #[test] - fn scalar_at_invalid_patches() { + fn scalar_at_invalid() { let packed_array = unsafe { BitPackedArray::new_unchecked( BufferHandle::new_host(ByteBuffer::copy_from_aligned( @@ -192,16 +183,6 @@ mod test { )), DType::Primitive(PType::U32, true.into()), Validity::AllInvalid, - Some( - Patches::new( - 8, - 0, - buffer![1u32].into_array(), - PrimitiveArray::new(buffer![999u32], Validity::AllValid).into_array(), - None, - ) - .unwrap(), - ), 1, 8, 0, diff --git a/encodings/fastlanes/src/delta/vtable/mod.rs b/encodings/fastlanes/src/delta/vtable/mod.rs index 96c0e0dd248..a4a0525242f 100644 --- a/encodings/fastlanes/src/delta/vtable/mod.rs +++ b/encodings/fastlanes/src/delta/vtable/mod.rs @@ -171,7 +171,7 @@ impl VTable for DeltaVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { assert_eq!(children.len(), 2); let ptype = PType::try_from(dtype)?; let lanes = match_each_unsigned_integer_ptype!(ptype, |T| { ::LANES }); @@ -186,7 +186,7 @@ impl VTable for DeltaVTable { let bases = children.get(0, dtype, bases_len)?; let deltas = children.get(1, dtype, deltas_len)?; - DeltaArray::try_new(bases, deltas, metadata.0.offset as usize, len) + DeltaArray::try_new(bases, deltas, metadata.0.offset as usize, len).map(|a| a.into_array()) } fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult { diff --git a/encodings/fastlanes/src/for/vtable/mod.rs b/encodings/fastlanes/src/for/vtable/mod.rs index cef31f8250c..c9e4398436a 100644 --- a/encodings/fastlanes/src/for/vtable/mod.rs +++ b/encodings/fastlanes/src/for/vtable/mod.rs @@ -144,7 +144,7 @@ impl VTable for FoRVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if children.len() != 1 { vortex_bail!( "Expected 1 child for FoR encoding, found {}", @@ -154,7 +154,7 @@ impl VTable for FoRVTable { let encoded = children.get(0, dtype, len)?; - FoRArray::try_new(encoded, metadata.clone()) + FoRArray::try_new(encoded, metadata.clone()).map(|a| a.into_array()) } fn reduce_parent( diff --git a/encodings/fastlanes/src/rle/vtable/mod.rs b/encodings/fastlanes/src/rle/vtable/mod.rs index 0a682d3abf7..86bd677e933 100644 --- a/encodings/fastlanes/src/rle/vtable/mod.rs +++ b/encodings/fastlanes/src/rle/vtable/mod.rs @@ -189,7 +189,7 @@ impl VTable for RLEVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let metadata = &metadata.0; let values = children.get( 0, @@ -219,6 +219,7 @@ impl VTable for RLEVTable { metadata.offset as usize, len, ) + .map(|a| a.into_array()) } fn execute_parent( diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index 175ca6fdb59..034182b1c96 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -217,7 +217,7 @@ impl VTable for FSSTVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let symbols = Buffer::::from_byte_buffer(buffers[0].clone().try_to_host_sync()?); let symbol_lengths = Buffer::::from_byte_buffer(buffers[1].clone().try_to_host_sync()?); @@ -251,7 +251,8 @@ impl VTable for FSSTVTable { symbol_lengths, codes, uncompressed_lengths, - ); + ) + .map(|a| a.into_array()); } // Check for the current deserialization path. @@ -298,7 +299,8 @@ impl VTable for FSSTVTable { symbol_lengths, codes, uncompressed_lengths, - ); + ) + .map(|a| a.into_array()); } vortex_bail!( diff --git a/encodings/pco/src/array.rs b/encodings/pco/src/array.rs index e1c99d37d45..5d3ccb424be 100644 --- a/encodings/pco/src/array.rs +++ b/encodings/pco/src/array.rs @@ -207,7 +207,7 @@ impl VTable for PcoVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let validity = if children.is_empty() { Validity::from(dtype.nullability()) } else if children.len() == 1 { @@ -242,7 +242,8 @@ impl VTable for PcoVTable { metadata.0.clone(), len, validity, - )) + ) + .into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/runend/src/array.rs b/encodings/runend/src/array.rs index 267920c81e4..498d9be3f56 100644 --- a/encodings/runend/src/array.rs +++ b/encodings/runend/src/array.rs @@ -154,7 +154,7 @@ impl VTable for RunEndVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable); let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize"); let ends = children.get(0, &ends_dtype, runs)?; @@ -167,6 +167,7 @@ impl VTable for RunEndVTable { usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"), len, ) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/sequence/src/array.rs b/encodings/sequence/src/array.rs index 8f2258f025e..0f264e92a24 100644 --- a/encodings/sequence/src/array.rs +++ b/encodings/sequence/src/array.rs @@ -363,7 +363,7 @@ impl VTable for SequenceVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], _children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { SequenceArray::try_new( metadata.base, metadata.multiplier, @@ -371,6 +371,7 @@ impl VTable for SequenceVTable { dtype.nullability(), len, ) + .map(|a| a.into_array()) } fn with_children(_array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/sparse/src/lib.rs b/encodings/sparse/src/lib.rs index 69cf9ca715d..284f3d36f94 100644 --- a/encodings/sparse/src/lib.rs +++ b/encodings/sparse/src/lib.rs @@ -187,7 +187,7 @@ impl VTable for SparseVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { vortex_ensure_eq!( children.len(), 2, @@ -213,6 +213,7 @@ impl VTable for SparseVTable { len, metadata.fill_value.clone(), ) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/zigzag/src/array.rs b/encodings/zigzag/src/array.rs index 319c7e2616d..b4ba66952e7 100644 --- a/encodings/zigzag/src/array.rs +++ b/encodings/zigzag/src/array.rs @@ -126,7 +126,7 @@ impl VTable for ZigZagVTable { _metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if children.len() != 1 { vortex_bail!("Expected 1 child, got {}", children.len()); } @@ -135,7 +135,7 @@ impl VTable for ZigZagVTable { let encoded_type = DType::Primitive(ptype.to_unsigned(), dtype.nullability()); let encoded = children.get(0, &encoded_type, len)?; - ZigZagArray::try_new(encoded) + ZigZagArray::try_new(encoded).map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/zstd/src/array.rs b/encodings/zstd/src/array.rs index 88c9c97c657..037b0be143d 100644 --- a/encodings/zstd/src/array.rs +++ b/encodings/zstd/src/array.rs @@ -215,7 +215,7 @@ impl VTable for ZstdVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let validity = if children.is_empty() { Validity::from(dtype.nullability()) } else if children.len() == 1 { @@ -252,7 +252,8 @@ impl VTable for ZstdVTable { metadata.0.clone(), len, validity, - )) + ) + .into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/encodings/zstd/src/zstd_buffers.rs b/encodings/zstd/src/zstd_buffers.rs index 30c26bbf30a..55b8a33f048 100644 --- a/encodings/zstd/src/zstd_buffers.rs +++ b/encodings/zstd/src/zstd_buffers.rs @@ -10,6 +10,7 @@ use vortex_array::ArrayEq; use vortex_array::ArrayHash; use vortex_array::ArrayRef; use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; use vortex_array::Precision; use vortex_array::ProstMetadata; use vortex_array::buffer::BufferHandle; @@ -438,7 +439,7 @@ impl VTable for ZstdBuffersVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let compressed_buffers: Vec = buffers.to_vec(); let child_arrays: Vec = (0..children.len()) @@ -458,7 +459,7 @@ impl VTable for ZstdBuffersVTable { }; array.validate()?; - Ok(array) + Ok(array.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index e915c8e2002..e47430981d4 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -94,6 +94,10 @@ serde_json = { workspace = true } serde_test = { workspace = true } vortex-array = { path = ".", features = ["_test-harness", "table-display"] } +[[bench]] +name = "patched" +harness = false + [[bench]] name = "search_sorted" harness = false diff --git a/vortex-array/benches/patched.rs b/vortex-array/benches/patched.rs new file mode 100644 index 00000000000..e80e1c5fba4 --- /dev/null +++ b/vortex-array/benches/patched.rs @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use divan::Bencher; +use divan::counter::BytesCount; +use divan::counter::ItemsCount; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::arrays::PatchedArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::patches::Patches; +use vortex_buffer::buffer; + +fn main() { + divan::main() +} + +#[divan::bench(args = [1, 10, 100, 1024, 2048, 65_536])] +fn bench_patch_transpose(bencher: Bencher, n_patches: usize) { + const N: u32 = 1024 * 512; + let numbers = PrimitiveArray::from_iter(0u32..N).into_array(); + + let patch_indices = + PrimitiveArray::from_iter((0..N).step_by(N as usize / n_patches)).into_array(); + let patch_values = buffer![u32::MAX; patch_indices.len()].into_array(); + + let patches = Patches::new(N as usize, 0, patch_indices, patch_values, None).unwrap(); + + let mut ctx = ExecutionCtx::new(LEGACY_SESSION.clone()); + + bencher.counter(ItemsCount::new(n_patches)).bench_local(|| { + PatchedArray::from_array_and_patches(numbers.clone(), &patches, &mut ctx).unwrap() + }); +} diff --git a/vortex-array/src/arrays/bool/vtable/mod.rs b/vortex-array/src/arrays/bool/vtable/mod.rs index f935f89236c..e3d623696dd 100644 --- a/vortex-array/src/arrays/bool/vtable/mod.rs +++ b/vortex-array/src/arrays/bool/vtable/mod.rs @@ -149,7 +149,7 @@ impl VTable for BoolVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if buffers.len() != 1 { vortex_bail!("Expected 1 buffer, got {}", buffers.len()); } @@ -166,6 +166,7 @@ impl VTable for BoolVTable { let buffer = buffers[0].clone(); BoolArray::try_new_from_handle(buffer, metadata.offset as usize, len, validity) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/chunked/vtable/mod.rs b/vortex-array/src/arrays/chunked/vtable/mod.rs index ab05f262563..621bf0ba461 100644 --- a/vortex-array/src/arrays/chunked/vtable/mod.rs +++ b/vortex-array/src/arrays/chunked/vtable/mod.rs @@ -148,7 +148,7 @@ impl VTable for ChunkedVTable { _metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if children.is_empty() { vortex_bail!("Chunked array needs at least one child"); } @@ -194,7 +194,8 @@ impl VTable for ChunkedVTable { chunk_offsets, chunks, stats_set: Default::default(), - }) + } + .into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/constant/vtable/mod.rs b/vortex-array/src/arrays/constant/vtable/mod.rs index 14017fd4044..0e20c0e01d9 100644 --- a/vortex-array/src/arrays/constant/vtable/mod.rs +++ b/vortex-array/src/arrays/constant/vtable/mod.rs @@ -156,8 +156,8 @@ impl VTable for ConstantVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], _children: &dyn ArrayChildren, - ) -> VortexResult { - Ok(ConstantArray::new(metadata.clone(), len)) + ) -> VortexResult { + Ok(ConstantArray::new(metadata.clone(), len).into_array()) } fn with_children(_array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/decimal/vtable/mod.rs b/vortex-array/src/arrays/decimal/vtable/mod.rs index 65cc99da536..d8e0a102a0b 100644 --- a/vortex-array/src/arrays/decimal/vtable/mod.rs +++ b/vortex-array/src/arrays/decimal/vtable/mod.rs @@ -156,7 +156,7 @@ impl VTable for DecimalVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if buffers.len() != 1 { vortex_bail!("Expected 1 buffer, got {}", buffers.len()); } @@ -183,6 +183,7 @@ impl VTable for DecimalVTable { D::DECIMAL_TYPE ); DecimalArray::try_new_handle(values, metadata.values_type(), *decimal_dtype, validity) + .map(|a| a.into_array()) }) } diff --git a/vortex-array/src/arrays/dict/vtable/mod.rs b/vortex-array/src/arrays/dict/vtable/mod.rs index 6c6555b596a..9c257e63a39 100644 --- a/vortex-array/src/arrays/dict/vtable/mod.rs +++ b/vortex-array/src/arrays/dict/vtable/mod.rs @@ -152,7 +152,7 @@ impl VTable for DictVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if children.len() != 2 { vortex_bail!( "Expected 2 children for dict encoding, found {}", @@ -172,7 +172,9 @@ impl VTable for DictVTable { // SAFETY: We've validated the metadata and children. Ok(unsafe { - DictArray::new_unchecked(codes, values).set_all_values_referenced(all_values_referenced) + DictArray::new_unchecked(codes, values) + .set_all_values_referenced(all_values_referenced) + .into_array() }) } diff --git a/vortex-array/src/arrays/extension/vtable/mod.rs b/vortex-array/src/arrays/extension/vtable/mod.rs index ceab73b456f..7e9932611de 100644 --- a/vortex-array/src/arrays/extension/vtable/mod.rs +++ b/vortex-array/src/arrays/extension/vtable/mod.rs @@ -125,7 +125,7 @@ impl VTable for ExtensionVTable { _metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let DType::Extension(ext_dtype) = dtype else { vortex_bail!("Not an extension DType"); }; @@ -133,7 +133,7 @@ impl VTable for ExtensionVTable { vortex_bail!("Expected 1 child, got {}", children.len()); } let storage = children.get(0, ext_dtype.storage_dtype(), len)?; - Ok(ExtensionArray::new(ext_dtype.clone(), storage)) + Ok(ExtensionArray::new(ext_dtype.clone(), storage).into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/filter/vtable.rs b/vortex-array/src/arrays/filter/vtable.rs index 23b22612873..b68e763822d 100644 --- a/vortex-array/src/arrays/filter/vtable.rs +++ b/vortex-array/src/arrays/filter/vtable.rs @@ -131,14 +131,15 @@ impl VTable for FilterVTable { metadata: &FilterMetadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { assert_eq!(len, metadata.0.true_count()); let child = children.get(0, dtype, metadata.0.len())?; Ok(FilterArray { child, mask: metadata.0.clone(), stats: Default::default(), - }) + } + .into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs b/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs index 4544dd976a4..ba8aa976b96 100644 --- a/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs +++ b/vortex-array/src/arrays/fixed_size_list/vtable/mod.rs @@ -165,7 +165,7 @@ impl VTable for FixedSizeListVTable { _metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { vortex_ensure!( buffers.is_empty(), "`FixedSizeListVTable::build` expects no buffers" @@ -192,7 +192,7 @@ impl VTable for FixedSizeListVTable { let num_elements = len * (*list_size as usize); let elements = children.get(0, element_dtype.as_ref(), num_elements)?; - FixedSizeListArray::try_new(elements, *list_size, validity, len) + FixedSizeListArray::try_new(elements, *list_size, validity, len).map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/list/vtable/mod.rs b/vortex-array/src/arrays/list/vtable/mod.rs index 99c052565b2..958bdf1fb6e 100644 --- a/vortex-array/src/arrays/list/vtable/mod.rs +++ b/vortex-array/src/arrays/list/vtable/mod.rs @@ -157,7 +157,7 @@ impl VTable for ListVTable { metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let validity = if children.len() == 2 { Validity::from(dtype.nullability()) } else if children.len() == 3 { @@ -182,7 +182,7 @@ impl VTable for ListVTable { len + 1, )?; - ListArray::try_new(elements, offsets, validity) + ListArray::try_new(elements, offsets, validity).map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/listview/vtable/mod.rs b/vortex-array/src/arrays/listview/vtable/mod.rs index cbef285662e..50ee78f6f9c 100644 --- a/vortex-array/src/arrays/listview/vtable/mod.rs +++ b/vortex-array/src/arrays/listview/vtable/mod.rs @@ -164,7 +164,7 @@ impl VTable for ListViewVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { vortex_ensure!( buffers.is_empty(), "`ListViewArray::build` expects no buffers" @@ -207,7 +207,7 @@ impl VTable for ListViewVTable { len, )?; - ListViewArray::try_new(elements, offsets, sizes, validity) + ListViewArray::try_new(elements, offsets, sizes, validity).map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/masked/vtable/mod.rs b/vortex-array/src/arrays/masked/vtable/mod.rs index 3ba48543c85..d17da49b20c 100644 --- a/vortex-array/src/arrays/masked/vtable/mod.rs +++ b/vortex-array/src/arrays/masked/vtable/mod.rs @@ -138,7 +138,7 @@ impl VTable for MaskedVTable { _metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if !buffers.is_empty() { vortex_bail!("Expected 0 buffer, got {}", buffers.len()); } @@ -157,7 +157,7 @@ impl VTable for MaskedVTable { ); }; - MaskedArray::try_new(child, validity) + MaskedArray::try_new(child, validity).map(|a| a.into_array()) } fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult { diff --git a/vortex-array/src/arrays/mod.rs b/vortex-array/src/arrays/mod.rs index 0d5fa96c258..0a716773b26 100644 --- a/vortex-array/src/arrays/mod.rs +++ b/vortex-array/src/arrays/mod.rs @@ -28,6 +28,7 @@ mod list; mod listview; mod masked; mod null; +mod patched; mod primitive; mod scalar_fn; mod shared; @@ -54,6 +55,7 @@ pub use list::*; pub use listview::*; pub use masked::*; pub use null::*; +pub use patched::*; pub use primitive::*; pub use scalar_fn::*; pub use shared::*; diff --git a/vortex-array/src/arrays/null/mod.rs b/vortex-array/src/arrays/null/mod.rs index c3549fa9255..5cc1a2ae1db 100644 --- a/vortex-array/src/arrays/null/mod.rs +++ b/vortex-array/src/arrays/null/mod.rs @@ -110,8 +110,8 @@ impl VTable for NullVTable { _metadata: &Self::Metadata, _buffers: &[BufferHandle], _children: &dyn ArrayChildren, - ) -> VortexResult { - Ok(NullArray::new(len)) + ) -> VortexResult { + Ok(NullArray::new(len).into_array()) } fn with_children(_array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/patched/array.rs b/vortex-array/src/arrays/patched/array.rs new file mode 100644 index 00000000000..1be462a16de --- /dev/null +++ b/vortex-array/src/arrays/patched/array.rs @@ -0,0 +1,219 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_buffer::Buffer; +use vortex_buffer::BufferMut; +use vortex_error::VortexResult; +use vortex_error::vortex_ensure; + +use crate::ArrayRef; +use crate::Canonical; +use crate::DynArray; +use crate::ExecutionCtx; +use crate::arrays::patched::PatchAccessor; +use crate::arrays::patched::TransposedPatches; +use crate::arrays::patched::patch_lanes; +use crate::buffer::BufferHandle; +use crate::dtype::IntegerPType; +use crate::dtype::NativePType; +use crate::dtype::PType; +use crate::match_each_native_ptype; +use crate::match_each_unsigned_integer_ptype; +use crate::patches::Patches; +use crate::stats::ArrayStats; + +/// An array that partially "patches" another array with new values. +/// +/// Patched arrays implement the set of nodes that do this instead here...I think? +#[derive(Debug, Clone)] +pub struct PatchedArray { + /// The inner array that is being patched. This is the zeroth child. + pub(super) inner: ArrayRef, + + /// Number of 1024-element chunks. Pre-computed for convenience. + pub(super) n_chunks: usize, + + /// Number of lanes the patch indices and values have been split into. Each of the `n_chunks` + /// of 1024 values is split into `n_lanes` lanes horizontally, each lane having 1024 / n_lanes + /// values that might be patched. + pub(super) n_lanes: usize, + + /// Offset into the first chunk + pub(super) offset: usize, + /// Total length. + pub(super) len: usize, + + /// lane offsets. The PType of these MUST be u32 + pub(super) lane_offsets: BufferHandle, + /// indices within a 1024-element chunk. The PType of these MUST be u16 + pub(super) indices: BufferHandle, + /// patch values corresponding to the indices. The ptype is specified by `values_ptype`. + pub(super) values: BufferHandle, + /// PType of the scalars in `values`. Can be any native type. + pub(super) values_ptype: PType, + + pub(super) stats_set: ArrayStats, +} + +impl PatchedArray { + pub fn from_array_and_patches( + inner: ArrayRef, + patches: &Patches, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + vortex_ensure!( + inner.dtype().eq_with_nullability_superset(patches.dtype()), + "array DType must match patches DType" + ); + + let values_ptype = patches.dtype().as_ptype(); + + let TransposedPatches { + n_chunks, + n_lanes, + lane_offsets, + indices, + values, + } = transpose_patches(patches, ctx)?; + + let len = inner.len(); + + Ok(Self { + inner, + n_chunks, + n_lanes, + values_ptype, + offset: 0, + len, + lane_offsets: BufferHandle::new_host(lane_offsets), + indices: BufferHandle::new_host(indices), + values: BufferHandle::new_host(values), + stats_set: ArrayStats::default(), + }) + } + + /// Get an accessor, which allows ranged access to patches by chunk/lane. + pub fn accessor(&self) -> PatchAccessor<'_, V> { + PatchAccessor { + n_lanes: self.n_lanes, + lane_offsets: self.lane_offsets.as_host().reinterpret::(), + indices: self.indices.as_host().reinterpret::(), + values: self.values.as_host().reinterpret::(), + } + } +} + +/// Transpose a set of patches from the default sorted layout into the data parallel layout. +#[allow(clippy::cognitive_complexity)] +fn transpose_patches(patches: &Patches, ctx: &mut ExecutionCtx) -> VortexResult { + let array_len = patches.array_len(); + let offset = patches.offset(); + + let indices = patches + .indices() + .clone() + .execute::(ctx)? + .into_primitive(); + + let values = patches + .values() + .clone() + .execute::(ctx)? + .into_primitive(); + + let indices_ptype = indices.ptype(); + let values_ptype = values.ptype(); + + let indices = indices.buffer_handle().clone().unwrap_host(); + let values = values.buffer_handle().clone().unwrap_host(); + + match_each_unsigned_integer_ptype!(indices_ptype, |I| { + match_each_native_ptype!(values_ptype, |V| { + let indices: Buffer = Buffer::from_byte_buffer(indices); + let values: Buffer = Buffer::from_byte_buffer(values); + + Ok(transpose( + indices.as_slice(), + values.as_slice(), + offset, + array_len, + )) + }) + }) +} + +#[allow(clippy::cast_possible_truncation)] +fn transpose( + indices_in: &[I], + values_in: &[V], + offset: usize, + array_len: usize, +) -> TransposedPatches { + // Total number of slots is number of chunks times number of lanes. + let n_chunks = array_len.div_ceil(1024); + assert!( + n_chunks <= u32::MAX as usize, + "Cannot transpose patches for array with >= 4 trillion elements" + ); + + let n_lanes = patch_lanes::(); + + // We know upfront how many indices and values we'll have. + let mut indices_buffer = BufferMut::with_capacity(indices_in.len()); + let mut values_buffer = BufferMut::with_capacity(values_in.len()); + + // number of patches in each chunk. + let mut lane_offsets: BufferMut = BufferMut::zeroed(n_chunks * n_lanes + 1); + + // Scan the index/values once to get chunk/lane counts + for index in indices_in { + let index = index.as_() - offset; + let chunk = index / 1024; + let lane = index % n_lanes; + + lane_offsets[chunk * n_lanes + lane + 1] += 1; + } + + // Prefix-sum sizes -> offsets + for index in 1..lane_offsets.len() { + lane_offsets[index] += lane_offsets[index - 1]; + } + + // Loop over patches, writing thme to final positions + let indices_out = indices_buffer.spare_capacity_mut(); + let values_out = values_buffer.spare_capacity_mut(); + for (index, &value) in std::iter::zip(indices_in, values_in) { + let index = index.as_() - offset; + let chunk = index / 1024; + let lane = index % n_lanes; + + let position = &mut lane_offsets[chunk * n_lanes + lane]; + indices_out[*position as usize].write((index % 1024) as u16); + values_out[*position as usize].write(value); + *position += 1; + } + + // SAFETY: we know there are exactly indices_in.len() indices/values, and we just + // set them to the appropriate values in the loop above. + unsafe { + indices_buffer.set_len(indices_in.len()); + values_buffer.set_len(values_in.len()); + } + + // Now, pass over all the indices and values again and subtract out the position increments. + for index in indices_in { + let index = index.as_() - offset; + let chunk = index / 1024; + let lane = index % n_lanes; + + lane_offsets[chunk * n_lanes + lane] -= 1; + } + + TransposedPatches { + n_chunks, + n_lanes, + lane_offsets: lane_offsets.freeze().into_byte_buffer(), + indices: indices_buffer.freeze().into_byte_buffer(), + values: values_buffer.freeze().into_byte_buffer(), + } +} diff --git a/vortex-array/src/arrays/patched/compute/mod.rs b/vortex-array/src/arrays/patched/compute/mod.rs new file mode 100644 index 00000000000..09f0654183a --- /dev/null +++ b/vortex-array/src/arrays/patched/compute/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub(crate) mod rules; diff --git a/vortex-array/src/arrays/patched/compute/rules.rs b/vortex-array/src/arrays/patched/compute/rules.rs new file mode 100644 index 00000000000..7109ddd25c4 --- /dev/null +++ b/vortex-array/src/arrays/patched/compute/rules.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use crate::arrays::PatchedVTable; +use crate::arrays::SliceReduceAdaptor; +use crate::optimizer::rules::ParentRuleSet; + +pub(crate) const PARENT_RULES: ParentRuleSet = + ParentRuleSet::new(&[ParentRuleSet::lift(&SliceReduceAdaptor(PatchedVTable))]); diff --git a/vortex-array/src/arrays/patched/mod.rs b/vortex-array/src/arrays/patched/mod.rs new file mode 100644 index 00000000000..f035204c188 --- /dev/null +++ b/vortex-array/src/arrays/patched/mod.rs @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod array; +mod compute; +mod vtable; + +pub use array::*; +use vortex_buffer::ByteBuffer; +pub use vtable::*; + +/// Patches that have been transposed into GPU format. +struct TransposedPatches { + n_chunks: usize, + n_lanes: usize, + lane_offsets: ByteBuffer, + indices: ByteBuffer, + values: ByteBuffer, +} + +/// Number of lanes used at patch time for a value of type `V`. +/// +/// This is *NOT* equal to the number of FastLanes lanes for the type `V`, rather this is going to +/// correspond to how many "lanes" we will end up copying data on. +/// +/// When applied on the CPU, this configuration doesn't really matter. On the GPU, it is based +/// on the number of patches involved here. +const fn patch_lanes() -> usize { + // For types 32-bits or smaller, we use a 32 lane configuration, and for 64-bit we use 16 lanes. + // This matches up with the number of lanes we use to execute copying results from bit-unpacking + // from shared to global memory. + if size_of::() < 8 { 32 } else { 16 } +} + +pub struct PatchAccessor<'a, V> { + n_lanes: usize, + lane_offsets: &'a [u32], + indices: &'a [u16], + values: &'a [V], +} + +impl<'a, V: Sized> PatchAccessor<'a, V> { + /// Access the patches for a particular lane + pub fn access(&'a self, chunk: usize, lane: usize) -> LanePatches<'a, V> { + let start = self.lane_offsets[chunk * self.n_lanes + lane] as usize; + let stop = self.lane_offsets[chunk * self.n_lanes + lane + 1] as usize; + + LanePatches { + indices: &self.indices[start..stop], + values: &self.values[start..stop], + } + } +} + +pub struct LanePatches<'a, V> { + pub indices: &'a [u16], + pub values: &'a [V], +} + +impl<'a, V: Copy> LanePatches<'a, V> { + pub fn len(&self) -> usize { + self.indices.len() + } + + pub fn is_empty(&self) -> bool { + self.indices.is_empty() + } + + pub fn iter(&self) -> impl Iterator { + self.indices + .iter() + .copied() + .zip(self.values.iter().copied()) + } +} diff --git a/vortex-array/src/arrays/patched/vtable/mod.rs b/vortex-array/src/arrays/patched/vtable/mod.rs new file mode 100644 index 00000000000..a97ee22187a --- /dev/null +++ b/vortex-array/src/arrays/patched/vtable/mod.rs @@ -0,0 +1,377 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod operations; +mod slice; + +use std::hash::Hash; +use std::hash::Hasher; + +use vortex_buffer::Buffer; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; + +use crate::ArrayEq; +use crate::ArrayHash; +use crate::ArrayRef; +use crate::Canonical; +use crate::DeserializeMetadata; +use crate::DynArray; +use crate::ExecutionCtx; +use crate::IntoArray; +use crate::Precision; +use crate::ProstMetadata; +use crate::arrays::PrimitiveArray; +use crate::arrays::PrimitiveArrayParts; +use crate::arrays::patched::PatchedArray; +use crate::arrays::patched::compute::rules::PARENT_RULES; +use crate::arrays::patched::patch_lanes; +use crate::buffer::BufferHandle; +use crate::dtype::DType; +use crate::dtype::NativePType; +use crate::match_each_native_ptype; +use crate::serde::ArrayChildren; +use crate::stats::ArrayStats; +use crate::stats::StatsSetRef; +use crate::vtable; +use crate::vtable::ArrayId; +use crate::vtable::VTable; +use crate::vtable::ValidityChild; +use crate::vtable::ValidityVTableFromChild; + +vtable!(Patched); + +#[derive(Debug)] +pub struct PatchedVTable; + +impl ValidityChild for PatchedVTable { + fn validity_child(array: &PatchedArray) -> &ArrayRef { + &array.inner + } +} + +#[derive(Clone, prost::Message)] +pub struct PatchedMetadata { + #[prost(uint32, tag = "1")] + pub(crate) offset: u32, +} + +impl VTable for PatchedVTable { + type Array = PatchedArray; + type Metadata = ProstMetadata; + type OperationsVTable = Self; + type ValidityVTable = ValidityVTableFromChild; + + fn id(_array: &Self::Array) -> ArrayId { + ArrayId::new_ref("vortex.patched") + } + + fn len(array: &Self::Array) -> usize { + array.len + } + + fn dtype(array: &Self::Array) -> &DType { + array.inner.dtype() + } + + fn stats(array: &Self::Array) -> StatsSetRef<'_> { + array.stats_set.to_ref(array.as_ref()) + } + + fn array_hash(array: &Self::Array, state: &mut H, precision: Precision) { + array.inner.array_hash(state, precision); + array.values_ptype.hash(state); + array.n_chunks.hash(state); + array.n_lanes.hash(state); + array.lane_offsets.array_hash(state, precision); + array.indices.array_hash(state, precision); + array.values.array_hash(state, precision); + } + + fn array_eq(array: &Self::Array, other: &Self::Array, precision: Precision) -> bool { + array.n_chunks == other.n_chunks + && array.n_lanes == other.n_lanes + && array.values_ptype == other.values_ptype + && array.inner.array_eq(&other.inner, precision) + && array.lane_offsets.array_eq(&other.lane_offsets, precision) + && array.indices.array_eq(&other.indices, precision) + && array.values.array_eq(&other.values, precision) + } + + fn nbuffers(_array: &Self::Array) -> usize { + 3 + } + + fn buffer(array: &Self::Array, idx: usize) -> BufferHandle { + match idx { + 0 => array.lane_offsets.clone(), + 1 => array.indices.clone(), + 2 => array.values.clone(), + _ => vortex_panic!("invalid buffer index for PatchedArray: {idx}"), + } + } + + fn buffer_name(_array: &Self::Array, idx: usize) -> Option { + match idx { + 0 => Some("lane_offsets".to_string()), + 1 => Some("patch_indices".to_string()), + 2 => Some("patch_values".to_string()), + _ => vortex_panic!("invalid buffer index for PatchedArray: {idx}"), + } + } + + fn nchildren(_array: &Self::Array) -> usize { + 1 + } + + fn child(array: &Self::Array, idx: usize) -> ArrayRef { + if idx == 0 { + array.inner.clone() + } else { + vortex_panic!("invalid child index for PatchedArray: {idx}"); + } + } + + fn child_name(_array: &Self::Array, idx: usize) -> String { + if idx == 0 { + "inner".to_string() + } else { + vortex_panic!("invalid child index for PatchedArray: {idx}"); + } + } + + #[allow(clippy::cast_possible_truncation)] + fn metadata(array: &Self::Array) -> VortexResult { + Ok(ProstMetadata(PatchedMetadata { + offset: array.offset as u32, + })) + } + + fn serialize(_metadata: Self::Metadata) -> VortexResult>> { + Ok(Some(vec![])) + } + + fn deserialize( + bytes: &[u8], + _dtype: &DType, + _len: usize, + _buffers: &[BufferHandle], + _session: &VortexSession, + ) -> VortexResult { + let inner = as DeserializeMetadata>::deserialize(bytes)?; + Ok(ProstMetadata(inner)) + } + + fn build( + dtype: &DType, + len: usize, + metadata: &Self::Metadata, + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + ) -> VortexResult { + let inner = children.get(0, dtype, len)?; + + let n_chunks = len.div_ceil(1024); + + let n_lanes = match_each_native_ptype!(dtype.as_ptype(), |P| { patch_lanes::

() }); + + let &[lane_offsets, indices, values] = &buffers else { + vortex_bail!("invalid buffer count for PatchedArray"); + }; + + Ok(PatchedArray { + inner, + n_chunks, + n_lanes, + offset: metadata.offset as usize, + len, + lane_offsets: lane_offsets.clone(), + indices: indices.clone(), + values: values.clone(), + values_ptype: dtype.as_ptype(), + stats_set: ArrayStats::default(), + } + .into_array()) + } + + fn with_children(array: &mut Self::Array, mut children: Vec) -> VortexResult<()> { + vortex_ensure!( + children.len() == 1, + "PatchedArray must have exactly 1 child" + ); + + array.inner = children.remove(0); + + Ok(()) + } + + fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult { + let inner = array + .inner + .clone() + .execute::(ctx)? + .into_primitive(); + + let PrimitiveArrayParts { + buffer, + ptype, + validity, + } = inner.into_parts(); + + let lane_offsets: Buffer = + Buffer::from_byte_buffer(array.lane_offsets.clone().unwrap_host()); + let indices: Buffer = Buffer::from_byte_buffer(array.indices.clone().unwrap_host()); + + let patched_values = match_each_native_ptype!(array.values_ptype, |V| { + let mut output = Buffer::::from_byte_buffer(buffer.unwrap_host()).into_mut(); + let values: Buffer = Buffer::from_byte_buffer(array.values.clone().unwrap_host()); + + let offset = array.offset; + let len = array.len; + + apply::( + &mut output, + offset, + len, + array.n_chunks, + array.n_lanes, + &lane_offsets, + &indices, + &values, + ); + + // The output will always be aligned to a chunk boundary, we apply the offset/len + // at the end to slice to only the in-bounds values. + let _output = output.as_slice(); + let output = output.freeze().slice(offset..offset + len); + + PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity) + }); + + Ok(patched_values.into_array()) + } + + fn reduce_parent( + array: &Self::Array, + parent: &ArrayRef, + child_idx: usize, + ) -> VortexResult> { + PARENT_RULES.evaluate(array, parent, child_idx) + } +} + +/// Apply patches on top of the existing value types. +fn apply( + output: &mut [V], + offset: usize, + len: usize, + n_chunks: usize, + n_lanes: usize, + lane_offsets: &[u32], + indices: &[u16], + values: &[V], +) { + for chunk in 0..n_chunks { + let start = lane_offsets[chunk * n_lanes] as usize; + let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize; + + for idx in start..stop { + // the indices slice is measured as an offset into the 1024-value chunk. + let index = chunk * 1024 + indices[idx] as usize; + if index < offset || index >= offset + len { + continue; + } + + let value = values[idx]; + output[index] = value; + } + } +} + +#[cfg(test)] +mod tests { + use vortex_buffer::buffer; + use vortex_buffer::buffer_mut; + use vortex_session::VortexSession; + + use crate::Canonical; + use crate::ExecutionCtx; + use crate::IntoArray; + use crate::arrays::PatchedArray; + use crate::dtype::Nullability; + use crate::patches::Patches; + use crate::scalar::Scalar; + + #[test] + fn test_execute() { + let values = buffer![0u16; 1024].into_array(); + let patches = Patches::new( + 1024, + 0, + buffer![1u32, 2, 3].into_array(), + buffer![1u16; 3].into_array(), + None, + ) + .unwrap(); + + let session = VortexSession::empty(); + let mut ctx = ExecutionCtx::new(session); + + let array = PatchedArray::from_array_and_patches(values, &patches, &mut ctx) + .unwrap() + .into_array(); + + let executed = array + .execute::(&mut ctx) + .unwrap() + .into_primitive() + .into_buffer::(); + + let mut expected = buffer_mut![0u16; 1024]; + expected[1] = 1; + expected[2] = 1; + expected[3] = 1; + + assert_eq!(executed, expected.freeze()); + } + + #[test] + fn test_scalar_at() { + let values = buffer![0u16; 1024].into_array(); + let patches = Patches::new( + 1024, + 0, + buffer![1u32, 2, 3].into_array(), + buffer![1u16; 3].into_array(), + None, + ) + .unwrap(); + + let session = VortexSession::empty(); + let mut ctx = ExecutionCtx::new(session); + + let array = PatchedArray::from_array_and_patches(values, &patches, &mut ctx) + .unwrap() + .into_array(); + + assert_eq!( + array.scalar_at(0).unwrap(), + Scalar::primitive(0u16, Nullability::NonNullable) + ); + assert_eq!( + array.scalar_at(1).unwrap(), + Scalar::primitive(1u16, Nullability::NonNullable) + ); + assert_eq!( + array.scalar_at(2).unwrap(), + Scalar::primitive(1u16, Nullability::NonNullable) + ); + assert_eq!( + array.scalar_at(3).unwrap(), + Scalar::primitive(1u16, Nullability::NonNullable) + ); + } +} diff --git a/vortex-array/src/arrays/patched/vtable/operations.rs b/vortex-array/src/arrays/patched/vtable/operations.rs new file mode 100644 index 00000000000..46dec4d5a58 --- /dev/null +++ b/vortex-array/src/arrays/patched/vtable/operations.rs @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; + +use crate::DynArray; +use crate::arrays::patched::PatchedArray; +use crate::arrays::patched::PatchedVTable; +use crate::arrays::patched::patch_lanes; +use crate::match_each_native_ptype; +use crate::scalar::Scalar; +use crate::vtable::OperationsVTable; + +impl OperationsVTable for PatchedVTable { + fn scalar_at(array: &PatchedArray, index: usize) -> VortexResult { + // First check the patches + let chunk = index / 1024; + #[allow(clippy::cast_possible_truncation)] + let chunk_index = (index % 1024) as u16; + match_each_native_ptype!(array.values_ptype, |V| { + let lane = index % patch_lanes::(); + let accessor = array.accessor::(); + let patches = accessor.access(chunk, lane); + // NOTE: we do linear scan as lane has <= 32 patches, binary search would likely + // be slower. + for (patch_index, patch_value) in patches.iter() { + if patch_index == chunk_index { + return Ok(Scalar::primitive( + patch_value, + array.inner.dtype().nullability(), + )); + } + } + }); + + // Otherwise, access the underlying value. + array.inner.scalar_at(index) + } +} diff --git a/vortex-array/src/arrays/patched/vtable/slice.rs b/vortex-array/src/arrays/patched/vtable/slice.rs new file mode 100644 index 00000000000..f46b91079ec --- /dev/null +++ b/vortex-array/src/arrays/patched/vtable/slice.rs @@ -0,0 +1,175 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::DynArray; +use crate::IntoArray; +use crate::arrays::PatchedArray; +use crate::arrays::PatchedVTable; +use crate::arrays::SliceReduce; +use crate::stats::ArrayStats; + +/// Is this something that uses a SliceKernel or a SliceReduce +impl SliceReduce for PatchedVTable { + fn slice(array: &Self::Array, range: Range) -> VortexResult> { + // We **always** slice at 1024-element chunk boundaries. We keep the offset + len + // around so that when we execute we know how much to chop off. + let new_offset = (range.start + array.offset) % 1024; + let new_len = range.end - range.start; + + let chunk_start = (range.start + array.offset) / 1024; + let chunk_stop = (range.end + array.offset).div_ceil(1024); + + // Slice the inner to chunk boundaries + let inner_start = chunk_start * 1024; + let inner_stop = (chunk_stop * 1024).min(array.inner.len()); + let inner = array.inner.slice(inner_start..inner_stop)?; + + // Slice to only maintain offsets to the sliced chunks + let sliced_lane_offsets = array + .lane_offsets + .slice_typed::((chunk_start * array.n_lanes)..(chunk_stop * array.n_lanes) + 1); + + Ok(Some( + PatchedArray { + inner, + n_chunks: chunk_stop - chunk_start, + n_lanes: array.n_lanes, + + offset: new_offset, + len: new_len, + lane_offsets: sliced_lane_offsets, + indices: array.indices.clone(), + values: array.values.clone(), + values_ptype: array.values_ptype, + stats_set: ArrayStats::default(), + } + .into_array(), + )) + } +} + +#[cfg(test)] +mod tests { + use std::ops::Range; + + use rstest::rstest; + use vortex_buffer::Buffer; + use vortex_buffer::BufferMut; + use vortex_buffer::buffer; + + use crate::Canonical; + use crate::DynArray; + use crate::ExecutionCtx; + use crate::IntoArray; + use crate::LEGACY_SESSION; + use crate::arrays::PatchedArray; + use crate::arrays::PrimitiveArray; + use crate::assert_arrays_eq; + use crate::dtype::NativePType; + use crate::patches::Patches; + + #[test] + fn test_slice_basic() { + let values = buffer![0u16; 512].into_array(); + let patch_indices = buffer![1u32, 8, 30].into_array(); + let patch_values = buffer![u16::MAX; 3].into_array(); + let patches = Patches::new(512, 0, patch_indices, patch_values, None).unwrap(); + + let mut ctx = ExecutionCtx::new(LEGACY_SESSION.clone()); + + let patched_array = + PatchedArray::from_array_and_patches(values, &patches, &mut ctx).unwrap(); + + let sliced = patched_array + .slice(1..10) + .unwrap() + .execute::(&mut ctx) + .unwrap() + .into_primitive(); + + let executed = sliced.as_slice::(); + + assert_eq!(&[u16::MAX, 0, 0, 0, 0, 0, 0, u16::MAX, 0], executed); + } + + #[rstest] + #[case::trivial(buffer![1u64; 2], buffer![1u32], buffer![u64::MAX], 1..2)] + #[case::one_chunk(buffer![0u64; 1024], buffer![1u32, 8, 30], buffer![u64::MAX; 3], 1..10)] + #[case::multichunk(buffer![1u64; 10_000], buffer![0u32, 1, 2, 3, 4, 16, 17, 18, 19, 1024, 2048, 2049], buffer![u64::MAX; 12], 1024..5000)] + fn test_cases( + #[case] inner: Buffer, + #[case] patch_indices: Buffer, + #[case] patch_values: Buffer, + #[case] range: Range, + ) { + // Create patched array. + let patches = Patches::new( + inner.len(), + 0, + patch_indices.into_array(), + patch_values.into_array(), + None, + ) + .unwrap(); + + let mut ctx = ExecutionCtx::new(LEGACY_SESSION.clone()); + + let patched_array = + PatchedArray::from_array_and_patches(inner.into_array(), &patches, &mut ctx).unwrap(); + + // Verify that applying slice first yields same result as applying slice at end. + let slice_first = patched_array + .slice(range.clone()) + .unwrap() + .execute::(&mut ctx) + .unwrap() + .into_array(); + + let slice_last = patched_array + .into_array() + .execute::(&mut ctx) + .unwrap() + .into_primitive() + .slice(range) + .unwrap(); + + assert_arrays_eq!(slice_first, slice_last); + } + + #[test] + fn test_stacked_slices() { + let values = PrimitiveArray::from_iter(0u64..10_000).into_array(); + + let patched_indices = buffer![1u32, 2, 1024, 2048, 3072, 3088].into_array(); + let patched_values = buffer![0u64, 1, 2, 3, 4, 5].into_array(); + + let patches = Patches::new(10_000, 0, patched_indices, patched_values, None).unwrap(); + let mut ctx = ExecutionCtx::new(LEGACY_SESSION.clone()); + + let patched_array = + PatchedArray::from_array_and_patches(values, &patches, &mut ctx).unwrap(); + + let sliced = patched_array + .slice(1024..5000) + .unwrap() + .slice(1..2065) + .unwrap() + .execute::(&mut ctx) + .unwrap() + .into_array(); + + let mut expected = BufferMut::from_iter(1025u64..=3088); + expected[1023] = 3; + expected[2047] = 4; + expected[2063] = 5; + + let expected = expected.into_array(); + + assert_arrays_eq!(expected, sliced); + } +} diff --git a/vortex-array/src/arrays/primitive/vtable/mod.rs b/vortex-array/src/arrays/primitive/vtable/mod.rs index 479d05bb2d2..d52bf404676 100644 --- a/vortex-array/src/arrays/primitive/vtable/mod.rs +++ b/vortex-array/src/arrays/primitive/vtable/mod.rs @@ -135,7 +135,7 @@ impl VTable for PrimitiveVTable { _metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { if buffers.len() != 1 { vortex_bail!("Expected 1 buffer, got {}", buffers.len()); } @@ -176,9 +176,7 @@ impl VTable for PrimitiveVTable { // SAFETY: checked ahead of time unsafe { - Ok(PrimitiveArray::new_unchecked_from_handle( - buffer, ptype, validity, - )) + Ok(PrimitiveArray::new_unchecked_from_handle(buffer, ptype, validity).into_array()) } } diff --git a/vortex-array/src/arrays/scalar_fn/vtable/mod.rs b/vortex-array/src/arrays/scalar_fn/vtable/mod.rs index 8e1debf9554..66fb5d6cb4c 100644 --- a/vortex-array/src/arrays/scalar_fn/vtable/mod.rs +++ b/vortex-array/src/arrays/scalar_fn/vtable/mod.rs @@ -156,7 +156,7 @@ impl VTable for ScalarFnVTable { metadata: &ScalarFnMetadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let children: Vec<_> = metadata .child_dtypes .iter() @@ -180,7 +180,8 @@ impl VTable for ScalarFnVTable { len, children, stats: Default::default(), - }) + } + .into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/shared/vtable.rs b/vortex-array/src/arrays/shared/vtable.rs index d79ca3de4d6..a937855103d 100644 --- a/vortex-array/src/arrays/shared/vtable.rs +++ b/vortex-array/src/arrays/shared/vtable.rs @@ -12,6 +12,7 @@ use crate::ArrayRef; use crate::Canonical; use crate::EmptyMetadata; use crate::ExecutionCtx; +use crate::IntoArray; use crate::Precision; use crate::arrays::shared::SharedArray; use crate::buffer::BufferHandle; @@ -125,9 +126,9 @@ impl VTable for SharedVTable { _metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn crate::serde::ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let child = children.get(0, dtype, len)?; - Ok(SharedArray::new(child)) + Ok(SharedArray::new(child).into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/slice/vtable.rs b/vortex-array/src/arrays/slice/vtable.rs index f90392f0dbb..23f22470880 100644 --- a/vortex-array/src/arrays/slice/vtable.rs +++ b/vortex-array/src/arrays/slice/vtable.rs @@ -20,6 +20,7 @@ use crate::ArrayHash; use crate::ArrayRef; use crate::Canonical; use crate::DynArray; +use crate::IntoArray; use crate::Precision; use crate::arrays::slice::array::SliceArray; use crate::arrays::slice::rules::PARENT_RULES; @@ -131,14 +132,15 @@ impl VTable for SliceVTable { metadata: &SliceMetadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { assert_eq!(len, metadata.0.len()); let child = children.get(0, dtype, metadata.0.end)?; Ok(SliceArray { child, range: metadata.0.clone(), stats: Default::default(), - }) + } + .into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/struct_/vtable/mod.rs b/vortex-array/src/arrays/struct_/vtable/mod.rs index ca9f622b83d..e264887b3f2 100644 --- a/vortex-array/src/arrays/struct_/vtable/mod.rs +++ b/vortex-array/src/arrays/struct_/vtable/mod.rs @@ -142,7 +142,7 @@ impl VTable for StructVTable { _metadata: &Self::Metadata, _buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let DType::Struct(struct_dtype, nullability) = dtype else { vortex_bail!("Expected struct dtype, found {:?}", dtype) }; @@ -172,6 +172,7 @@ impl VTable for StructVTable { .try_collect()?; StructArray::try_new_with_dtype(children, struct_dtype.clone(), len, validity) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/varbin/vtable/mod.rs b/vortex-array/src/arrays/varbin/vtable/mod.rs index 9b3d3a1ec71..acf09cabf3d 100644 --- a/vortex-array/src/arrays/varbin/vtable/mod.rs +++ b/vortex-array/src/arrays/varbin/vtable/mod.rs @@ -154,7 +154,7 @@ impl VTable for VarBinVTable { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let validity = if children.len() == 1 { Validity::from(dtype.nullability()) } else if children.len() == 2 { @@ -175,7 +175,7 @@ impl VTable for VarBinVTable { } let bytes = buffers[0].clone().try_to_host_sync()?; - VarBinArray::try_new(offsets, bytes, dtype.clone(), validity) + VarBinArray::try_new(offsets, bytes, dtype.clone(), validity).map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/arrays/varbinview/vtable/mod.rs b/vortex-array/src/arrays/varbinview/vtable/mod.rs index ea958acca37..73d135c30a9 100644 --- a/vortex-array/src/arrays/varbinview/vtable/mod.rs +++ b/vortex-array/src/arrays/varbinview/vtable/mod.rs @@ -163,7 +163,7 @@ impl VTable for VarBinViewVTable { _metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { let Some((views_handle, data_handles)) = buffers.split_last() else { vortex_bail!("Expected at least 1 buffer, got 0"); }; @@ -196,7 +196,8 @@ impl VTable for VarBinViewVTable { Arc::from(data_handles.to_vec()), dtype.clone(), validity, - ); + ) + .map(|a| a.into_array()); } let data_buffers = data_handles @@ -206,6 +207,7 @@ impl VTable for VarBinViewVTable { let views = Buffer::::from_byte_buffer(views_handle.clone().as_host().clone()); VarBinViewArray::try_new(views, Arc::from(data_buffers), dtype.clone(), validity) + .map(|a| a.into_array()) } fn with_children(array: &mut Self::Array, children: Vec) -> VortexResult<()> { diff --git a/vortex-array/src/vtable/dyn_.rs b/vortex-array/src/vtable/dyn_.rs index 4b3bad9223a..df050ff3a66 100644 --- a/vortex-array/src/vtable/dyn_.rs +++ b/vortex-array/src/vtable/dyn_.rs @@ -91,7 +91,7 @@ impl DynVTable for ArrayVTableAdapter { let array = V::build(dtype, len, &metadata, buffers, children)?; assert_eq!(array.len(), len, "Array length mismatch after building"); assert_eq!(array.dtype(), dtype, "Array dtype mismatch after building"); - Ok(array.into_array()) + Ok(array) } fn with_children(&self, array: &ArrayRef, children: Vec) -> VortexResult { diff --git a/vortex-array/src/vtable/mod.rs b/vortex-array/src/vtable/mod.rs index b8b20c04760..e099f23898f 100644 --- a/vortex-array/src/vtable/mod.rs +++ b/vortex-array/src/vtable/mod.rs @@ -174,7 +174,7 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { metadata: &Self::Metadata, buffers: &[BufferHandle], children: &dyn ArrayChildren, - ) -> VortexResult; + ) -> VortexResult; /// Replaces the children in `array` with `children`. The count must be the same and types /// of children must be expected. diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index f941db6dbec..2676937afc7 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -498,6 +498,34 @@ impl Buffer { } } +impl ByteBuffer { + /// Reinterpret the byte buffer as a slice of values of type `V`. + /// + /// # Panics + /// + /// This method will only work if the buffer has the proper size and alignment to be viewed + /// as a buffer of `V` values. + pub fn reinterpret(&self) -> &[V] { + assert!( + self.is_aligned(Alignment::of::()), + "ByteBuffer not properly aligned to {}", + type_name::() + ); + + assert_eq!( + self.length % size_of::(), + 0, + "ByteBuffer length not a multiple of the value length" + ); + + let v_len = self.length / size_of::(); + let v_ptr = self.bytes.as_ptr().cast::(); + + // SAFETY: we checked that alignment and length are suitable to treat this as a &[V]. + unsafe { std::slice::from_raw_parts(v_ptr, v_len) } + } +} + /// An iterator over Buffer elements. /// /// This is an analog to the `std::slice::Iter` type. diff --git a/vortex-python/src/arrays/py/vtable.rs b/vortex-python/src/arrays/py/vtable.rs index 83ebfa226cf..dccbe48b22f 100644 --- a/vortex-python/src/arrays/py/vtable.rs +++ b/vortex-python/src/arrays/py/vtable.rs @@ -142,7 +142,7 @@ impl VTable for PythonVTable { _metadata: &Self::Metadata, _buffers: &[BufferHandle], _children: &dyn ArrayChildren, - ) -> VortexResult { + ) -> VortexResult { todo!() }