diff --git a/vortex-array/src/arrays/chunked/compute/zip.rs b/vortex-array/src/arrays/chunked/compute/zip.rs index b76970852b7..f7c969fdf6e 100644 --- a/vortex-array/src/arrays/chunked/compute/zip.rs +++ b/vortex-array/src/arrays/chunked/compute/zip.rs @@ -28,39 +28,10 @@ impl ZipKernel for ChunkedVTable { .union_nullability(if_false.dtype().nullability()); let mut out_chunks = Vec::with_capacity(if_true.nchunks() + if_false.nchunks()); - let mut lhs_idx = 0; - let mut rhs_idx = 0; - let mut lhs_offset = 0; - let mut rhs_offset = 0; - let mut pos = 0; - let total_len = if_true.len(); - - while pos < total_len { - let lhs_chunk = if_true.chunk(lhs_idx); - let rhs_chunk = if_false.chunk(rhs_idx); - - let lhs_rem = lhs_chunk.len() - lhs_offset; - let rhs_rem = rhs_chunk.len() - rhs_offset; - let take_until = lhs_rem.min(rhs_rem); - - let mask_slice = mask.slice(pos..pos + take_until)?; - let lhs_slice = lhs_chunk.slice(lhs_offset..lhs_offset + take_until)?; - let rhs_slice = rhs_chunk.slice(rhs_offset..rhs_offset + take_until)?; - - out_chunks.push(mask_slice.zip(lhs_slice, rhs_slice)?); - - pos += take_until; - lhs_offset += take_until; - rhs_offset += take_until; - - if lhs_offset == lhs_chunk.len() { - lhs_idx += 1; - lhs_offset = 0; - } - if rhs_offset == rhs_chunk.len() { - rhs_idx += 1; - rhs_offset = 0; - } + for pair in if_true.paired_chunks(if_false) { + let pair = pair?; + let mask_slice = mask.slice(pair.pos)?; + out_chunks.push(mask_slice.zip(pair.left, pair.right)?); } // SAFETY: chunks originate from zipping slices of inputs that share dtype/nullability. diff --git a/vortex-array/src/arrays/chunked/mod.rs b/vortex-array/src/arrays/chunked/mod.rs index 4ef5ed10cf2..07d0bbcc346 100644 --- a/vortex-array/src/arrays/chunked/mod.rs +++ b/vortex-array/src/arrays/chunked/mod.rs @@ -5,6 +5,7 @@ mod array; pub use array::ChunkedArray; mod compute; +mod paired_chunks; mod vtable; pub use vtable::ChunkedVTable; diff --git a/vortex-array/src/arrays/chunked/paired_chunks.rs b/vortex-array/src/arrays/chunked/paired_chunks.rs new file mode 100644 index 00000000000..bc91c53b1e3 --- /dev/null +++ b/vortex-array/src/arrays/chunked/paired_chunks.rs @@ -0,0 +1,258 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use vortex_error::VortexResult; + +use crate::ArrayRef; +use crate::arrays::ChunkedArray; + +/// A pre-sliced, aligned pair of array chunks from two `ChunkedArray`s. +pub(crate) struct AlignedPair { + pub left: ArrayRef, + pub right: ArrayRef, + pub pos: Range, +} + +/// An iterator that walks two equally-sized `ChunkedArray`s in lockstep, +/// yielding aligned `(left, right)` slices at every chunk boundary of either +/// input. Empty chunks are skipped automatically. +pub(crate) struct PairedChunks<'a> { + left: &'a ChunkedArray, + right: &'a ChunkedArray, + lhs_idx: usize, + rhs_idx: usize, + lhs_offset: usize, + rhs_offset: usize, + pos: usize, + total_len: usize, +} + +impl ChunkedArray { + /// Returns an iterator that walks `self` and `other` in lockstep, yielding + /// [`AlignedPair`]s sliced at every chunk boundary of either input. + /// + /// # Panics + /// + /// Panics if `self.len() != other.len()`. + pub(crate) fn paired_chunks<'a>(&'a self, other: &'a ChunkedArray) -> PairedChunks<'a> { + assert_eq!( + self.len(), + other.len(), + "paired_chunks requires arrays of equal length" + ); + PairedChunks { + left: self, + right: other, + lhs_idx: 0, + rhs_idx: 0, + lhs_offset: 0, + rhs_offset: 0, + pos: 0, + total_len: self.len(), + } + } +} + +impl Iterator for PairedChunks<'_> { + type Item = VortexResult; + + fn next(&mut self) -> Option { + // Skip empty chunks on either side. + while self.lhs_idx < self.left.nchunks() && self.left.chunk(self.lhs_idx).is_empty() { + self.lhs_idx += 1; + } + while self.rhs_idx < self.right.nchunks() && self.right.chunk(self.rhs_idx).is_empty() { + self.rhs_idx += 1; + } + + if self.pos >= self.total_len { + return None; + } + + let lhs_chunk = self.left.chunk(self.lhs_idx); + let rhs_chunk = self.right.chunk(self.rhs_idx); + + let lhs_rem = lhs_chunk.len() - self.lhs_offset; + let rhs_rem = rhs_chunk.len() - self.rhs_offset; + let take = lhs_rem.min(rhs_rem); + + let lhs_slice = match lhs_chunk.slice(self.lhs_offset..self.lhs_offset + take) { + Ok(s) => s, + Err(e) => return Some(Err(e)), + }; + let rhs_slice = match rhs_chunk.slice(self.rhs_offset..self.rhs_offset + take) { + Ok(s) => s, + Err(e) => return Some(Err(e)), + }; + + let start = self.pos; + self.pos += take; + self.lhs_offset += take; + self.rhs_offset += take; + + if self.lhs_offset == lhs_chunk.len() { + self.lhs_idx += 1; + self.lhs_offset = 0; + } + if self.rhs_offset == rhs_chunk.len() { + self.rhs_idx += 1; + self.rhs_offset = 0; + } + + Some(Ok(AlignedPair { + left: lhs_slice, + right: rhs_slice, + pos: start..self.pos, + })) + } +} + +#[cfg(test)] +mod tests { + use vortex_buffer::buffer; + use vortex_error::VortexResult; + + use crate::IntoArray; + use crate::arrays::ChunkedArray; + use crate::dtype::DType; + use crate::dtype::Nullability; + use crate::dtype::PType; + + fn i32_dtype() -> DType { + DType::Primitive(PType::I32, Nullability::NonNullable) + } + + #[allow(clippy::type_complexity)] + fn collect_pairs( + left: &ChunkedArray, + right: &ChunkedArray, + ) -> VortexResult, Vec, std::ops::Range)>> { + use crate::ToCanonical; + let mut result = Vec::new(); + for pair in left.paired_chunks(right) { + let pair = pair?; + let l: Vec = pair.left.to_primitive().as_slice::().to_vec(); + let r: Vec = pair.right.to_primitive().as_slice::().to_vec(); + result.push((l, r, pair.pos)); + } + Ok(result) + } + + #[test] + fn test_aligned_chunks() -> VortexResult<()> { + let left = ChunkedArray::try_new( + vec![buffer![1i32, 2].into_array(), buffer![3i32, 4].into_array()], + i32_dtype(), + )?; + let right = ChunkedArray::try_new( + vec![ + buffer![10i32, 20].into_array(), + buffer![30i32, 40].into_array(), + ], + i32_dtype(), + )?; + + let pairs = collect_pairs(&left, &right)?; + assert_eq!(pairs.len(), 2); + assert_eq!(pairs[0], (vec![1, 2], vec![10, 20], 0..2)); + assert_eq!(pairs[1], (vec![3, 4], vec![30, 40], 2..4)); + Ok(()) + } + + #[test] + fn test_misaligned_chunks() -> VortexResult<()> { + let left = ChunkedArray::try_new( + vec![ + buffer![1i32, 2].into_array(), + buffer![3i32].into_array(), + buffer![4i32, 5].into_array(), + ], + i32_dtype(), + )?; + let right = ChunkedArray::try_new( + vec![ + buffer![10i32].into_array(), + buffer![20i32, 30].into_array(), + buffer![40i32, 50].into_array(), + ], + i32_dtype(), + )?; + + let pairs = collect_pairs(&left, &right)?; + // Left: [1,2] [3] [4,5] → boundaries at 0,2,3,5 + // Right: [10] [20,30] [40,50] → boundaries at 0,1,3,5 + // Aligned at: 0,1,2,3,5 + assert_eq!(pairs.len(), 4); + assert_eq!(pairs[0], (vec![1], vec![10], 0..1)); + assert_eq!(pairs[1], (vec![2], vec![20], 1..2)); + assert_eq!(pairs[2], (vec![3], vec![30], 2..3)); + assert_eq!(pairs[3], (vec![4, 5], vec![40, 50], 3..5)); + Ok(()) + } + + #[test] + fn test_empty_chunks() -> VortexResult<()> { + let left = ChunkedArray::try_new( + vec![ + buffer![0i32; 0].into_array(), + buffer![1i32, 2, 3].into_array(), + ], + i32_dtype(), + )?; + let right = ChunkedArray::try_new( + vec![ + buffer![10i32, 20, 30].into_array(), + buffer![0i32; 0].into_array(), + ], + i32_dtype(), + )?; + + let pairs = collect_pairs(&left, &right)?; + assert_eq!(pairs.len(), 1); + assert_eq!(pairs[0], (vec![1, 2, 3], vec![10, 20, 30], 0..3)); + Ok(()) + } + + #[test] + fn test_single_element_chunks() -> VortexResult<()> { + let left = ChunkedArray::try_new( + vec![ + buffer![1i32].into_array(), + buffer![2i32].into_array(), + buffer![3i32].into_array(), + ], + i32_dtype(), + )?; + let right = ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype())?; + + let pairs = collect_pairs(&left, &right)?; + assert_eq!(pairs.len(), 3); + assert_eq!(pairs[0], (vec![1], vec![10], 0..1)); + assert_eq!(pairs[1], (vec![2], vec![20], 1..2)); + assert_eq!(pairs[2], (vec![3], vec![30], 2..3)); + Ok(()) + } + + #[test] + fn test_both_empty() -> VortexResult<()> { + let left = ChunkedArray::try_new(vec![], i32_dtype())?; + let right = ChunkedArray::try_new(vec![], i32_dtype())?; + + let pairs = collect_pairs(&left, &right)?; + assert!(pairs.is_empty()); + Ok(()) + } + + #[test] + #[should_panic(expected = "paired_chunks requires arrays of equal length")] + fn test_length_mismatch_panics() { + let left = ChunkedArray::try_new(vec![buffer![1i32, 2].into_array()], i32_dtype()).unwrap(); + let right = + ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype()).unwrap(); + + // Should panic. + drop(left.paired_chunks(&right).collect::>()); + } +} diff --git a/vortex-array/src/scalar_fn/fns/zip/mod.rs b/vortex-array/src/scalar_fn/fns/zip/mod.rs index 7a11dbd04dc..4b13bf53c89 100644 --- a/vortex-array/src/scalar_fn/fns/zip/mod.rs +++ b/vortex-array/src/scalar_fn/fns/zip/mod.rs @@ -115,7 +115,9 @@ impl ScalarFnVTable for Zip { let if_false = args.get(1)?; let mask_array = args.get(2)?; - let mask = mask_array.execute::(ctx)?.to_mask(); + let mask = mask_array + .execute::(ctx)? + .to_mask_fill_null_false(); let return_dtype = if_true .dtype() @@ -228,20 +230,23 @@ mod tests { use vortex_mask::Mask; use crate::ArrayRef; + use crate::DynArray; use crate::IntoArray; use crate::LEGACY_SESSION; use crate::VortexSessionExecute; + use crate::arrays::BoolArray; use crate::arrays::ConstantArray; use crate::arrays::PrimitiveArray; use crate::arrays::StructArray; use crate::arrays::StructVTable; - use crate::arrays::VarBinViewArray; + use crate::arrays::VarBinViewVTable; use crate::arrow::IntoArrowArray; use crate::assert_arrays_eq; use crate::builders::ArrayBuilder; use crate::builders::BufferGrowthStrategy; use crate::builders::VarBinViewBuilder; use crate::builtins::ArrayBuiltins; + use crate::columnar::Columnar; use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; @@ -326,10 +331,12 @@ mod tests { let mask = Mask::from_indices(len, indices); let mask_array = mask.into_array(); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); let result = mask_array .clone() .zip(const1.clone(), const2.clone())? - .execute::(&mut LEGACY_SESSION.create_execution_ctx())?; + .execute::(&mut ctx)? + .into_array(); insta::assert_snapshot!(result.display_tree(), @r" root: vortex.varbinview(utf8?, len=100) nbytes=1.66 kB (100.00%) [all_valid] @@ -345,7 +352,7 @@ mod tests { let wrapped_result = mask_array .zip(wrapped1, wrapped2)? - .execute::(&mut LEGACY_SESSION.create_execution_ctx())?; + .execute::(&mut ctx)?; assert!(wrapped_result.is::()); Ok(()) @@ -387,11 +394,13 @@ mod tests { let mask = Mask::from_indices(200, (0..100).filter(|i| i % 3 != 0).collect()); let mask_array = mask.clone().into_array(); + let mut ctx = LEGACY_SESSION.create_execution_ctx(); let zipped = mask_array .zip(if_true.clone(), if_false.clone()) .unwrap() - .execute::(&mut LEGACY_SESSION.create_execution_ctx()) + .execute::(&mut ctx) .unwrap(); + let zipped = zipped.as_opt::().unwrap(); assert_eq!(zipped.nbuffers(), 2); // assert the result is the same as arrow @@ -405,7 +414,53 @@ mod tests { ) .unwrap(); - let actual = zipped.into_array().into_arrow_preferred().unwrap(); + let actual = zipped.clone().into_array().into_arrow_preferred().unwrap(); assert_eq!(actual.as_ref(), expected.as_ref()); } + + #[test] + fn test_zip_nullable_mask() { + // Null mask values are treated as false (selecting if_false). + let mask = BoolArray::from_iter([Some(true), None, Some(false), None, Some(true)]); + let if_true = buffer![10, 20, 30, 40, 50].into_array(); + let if_false = buffer![1, 2, 3, 4, 5].into_array(); + + let result = mask.into_array().zip(if_true, if_false).unwrap(); + let expected = buffer![10, 2, 3, 4, 50].into_array(); + + assert_arrays_eq!(result, expected); + } + + #[test] + fn test_zip_nullable_mask_all_null() { + // All-null mask should select entirely from if_false. + let mask = BoolArray::from_iter([None, None, None]); + let if_true = buffer![10, 20, 30].into_array(); + let if_false = buffer![1, 2, 3].into_array(); + + let result = mask.into_array().zip(if_true, if_false).unwrap(); + let expected = buffer![1, 2, 3].into_array(); + + assert_arrays_eq!(result, expected); + } + + #[test] + fn test_zip_nullable_mask_with_nullable_values() { + // Nullable mask combined with nullable if_true and if_false. + let mask = BoolArray::from_iter([Some(true), None, Some(false), Some(true)]); + let if_true = + PrimitiveArray::from_option_iter([Some(10), Some(20), Some(30), None]).into_array(); + let if_false = + PrimitiveArray::from_option_iter([Some(1), None, Some(3), Some(4)]).into_array(); + + let result = mask.into_array().zip(if_true, if_false).unwrap(); + // mask[0]=true → if_true[0]=10 + // mask[1]=null → if_false[1]=null + // mask[2]=false → if_false[2]=3 + // mask[3]=true → if_true[3]=null + let expected = + PrimitiveArray::from_option_iter([Some(10), None, Some(3), None]).into_array(); + + assert_arrays_eq!(result, expected); + } } diff --git a/vortex-cuda/gpu-scan-cli/Cargo.toml b/vortex-cuda/gpu-scan-cli/Cargo.toml index 9bdf7fc3478..2ec84c64004 100644 --- a/vortex-cuda/gpu-scan-cli/Cargo.toml +++ b/vortex-cuda/gpu-scan-cli/Cargo.toml @@ -21,6 +21,6 @@ tokio = { workspace = true, features = ["macros", "full"] } tracing = { workspace = true, features = ["std", "attributes"] } tracing-perfetto = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } -vortex = { workspace = true } +vortex = { workspace = true, features = ["tokio"] } vortex-cuda = { workspace = true, features = ["_test-harness"] } vortex-cuda-macros = { workspace = true }