From 133ec1d0d9497493201a332a34e1ecdb86cf6d47 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 20 May 2026 16:20:55 +0100 Subject: [PATCH] Execution rule for Chunked Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/kernel.rs | 186 ++++++++++++++++++++++++ 1 file changed, 186 insertions(+) diff --git a/encodings/parquet-variant/src/kernel.rs b/encodings/parquet-variant/src/kernel.rs index a136b483678..af048dd2607 100644 --- a/encodings/parquet-variant/src/kernel.rs +++ b/encodings/parquet-variant/src/kernel.rs @@ -17,6 +17,9 @@ use vortex_array::ArrayRef; use vortex_array::ArrayView; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; +use vortex_array::arrays::Chunked; +use vortex_array::arrays::ChunkedArray; +use vortex_array::arrays::chunked::ChunkedArrayExt; use vortex_array::arrays::dict::TakeExecute; use vortex_array::arrays::dict::TakeExecuteAdaptor; use vortex_array::arrays::filter::FilterExecuteAdaptor; @@ -32,6 +35,7 @@ use vortex_array::kernel::ParentKernelSet; use vortex_array::scalar_fn::fns::variant_get::VariantGet; use vortex_array::scalar_fn::fns::variant_get::VariantPath; use vortex_array::scalar_fn::fns::variant_get::VariantPathElement; +use vortex_array::validity::Validity; use vortex_error::VortexResult; use vortex_error::vortex_ensure_eq; use vortex_error::vortex_err; @@ -45,8 +49,128 @@ pub(crate) static PARENT_KERNELS: ParentKernelSet = ParentKernel ParentKernelSet::lift(&SliceExecuteAdaptor(ParquetVariant)), ParentKernelSet::lift(&TakeExecuteAdaptor(ParquetVariant)), ParentKernelSet::lift(&VariantGetKernel), + ParentKernelSet::lift(&ChunkedMergeKernel), ]); +#[derive(Default, Debug)] +struct ChunkedMergeKernel; + +impl ExecuteParentKernel for ChunkedMergeKernel { + type Parent = Chunked; + + fn execute_parent( + &self, + array: ArrayView<'_, ParquetVariant>, + parent: ArrayView<'_, Chunked>, + _child_idx: usize, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if parent.nchunks() < 2 || !ArrayRef::ptr_eq(array.array(), parent.chunk(0)) { + return Ok(None); + } + + let Some(layout) = validate_chunks(parent) else { + return Ok(None); + }; + + let chunks = parent + .iter_chunks() + .map(|chunk| chunk.as_::()) + .collect::>(); + + merge_parquet_variant_chunks(&chunks, layout).map(Some) + } +} + +struct ParquetVariantChunkMergeLayout { + metadata_dtype: DType, + value_dtype: Option, + typed_value_dtype: Option, +} + +/// Validates that all chunks are `ParquetVariant` AND they all share the same structure +fn validate_chunks(parent: ArrayView<'_, Chunked>) -> Option { + let mut chunks = parent.iter_chunks(); + let first = chunks.next()?.as_opt::()?; + let metadata_dtype = first.metadata_array().dtype().clone(); + let value_dtype = first.value_array().map(|value| value.dtype().clone()); + let typed_value_dtype = first + .typed_value_array() + .map(|typed_value| typed_value.dtype().clone()); + + for chunk in chunks { + let chunk = chunk.as_opt::()?; + if chunk.metadata_array().dtype() != &metadata_dtype { + return None; + } + if chunk.value_array().map(ArrayRef::dtype) != value_dtype.as_ref() { + return None; + } + if chunk.typed_value_array().map(ArrayRef::dtype) != typed_value_dtype.as_ref() { + return None; + } + } + + Some(ParquetVariantChunkMergeLayout { + metadata_dtype, + value_dtype, + typed_value_dtype, + }) +} + +fn merge_parquet_variant_chunks( + chunks: &[ArrayView<'_, ParquetVariant>], + layout: ParquetVariantChunkMergeLayout, +) -> VortexResult { + let metadata = ChunkedArray::try_new( + chunks + .iter() + .map(|chunk| chunk.metadata_array().clone()) + .collect(), + layout.metadata_dtype, + )? + .into_array(); + + let validity = Validity::concat( + chunks + .iter() + .map(|chunk| chunk.validity().map(|validity| (validity, chunk.len()))) + .collect::>>()?, + ) + .ok_or_else(|| vortex_err!("cannot merge empty ParquetVariant chunks"))?; + + let value = if let Some(value_dtype) = layout.value_dtype { + let value_chunks = chunks + .iter() + .map(|chunk| { + chunk + .value_array() + .cloned() + .ok_or_else(|| vortex_err!("validated ParquetVariant value child is missing")) + }) + .collect::>>()?; + Some(ChunkedArray::try_new(value_chunks, value_dtype)?.into_array()) + } else { + None + }; + + let typed_value = if let Some(typed_value_dtype) = layout.typed_value_dtype { + let typed_value_chunks = chunks + .iter() + .map(|chunk| { + chunk.typed_value_array().cloned().ok_or_else(|| { + vortex_err!("validated ParquetVariant typed_value child is missing") + }) + }) + .collect::>>()?; + Some(ChunkedArray::try_new(typed_value_chunks, typed_value_dtype)?.into_array()) + } else { + None + }; + + ParquetVariant::try_new(validity, metadata, value, typed_value).map(IntoArray::into_array) +} + #[derive(Default, Debug)] struct VariantGetKernel; @@ -209,6 +333,7 @@ mod tests { use vortex_array::IntoArray; use vortex_array::LEGACY_SESSION; use vortex_array::VortexSessionExecute; + use vortex_array::arrays::ChunkedArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray as VortexStructArray; use vortex_array::arrays::VarBinArray; @@ -233,6 +358,7 @@ mod tests { use vortex_error::vortex_err; use vortex_mask::Mask; + use super::validate_chunks; use crate::ParquetVariant; use crate::ParquetVariantArrayExt; @@ -803,6 +929,17 @@ mod tests { ParquetVariant::from_arrow_variant(&arrow_variant) } + fn make_i64_shredded_typed_array() -> VortexResult { + let metadata = ArrayRef::from_arrow( + binary_view_array(&[b"\x01\x00", b"\x01\x00"]).as_ref(), + false, + )?; + let typed_value = PrimitiveArray::from_iter([10i64, 20]).into_array(); + + ParquetVariant::try_new(Validity::NonNullable, metadata, None, Some(typed_value)) + .map(IntoArray::into_array) + } + fn assert_typed_value_i32( array: &ArrayRef, expected: impl IntoIterator>, @@ -1002,4 +1139,53 @@ mod tests { Ok(()) } + + #[test] + fn test_chunked_parquet_variant_merge_preflight_rejects_mismatched_typed_value_dtype() + -> VortexResult<()> { + let chunked = ChunkedArray::try_new( + vec![ + make_shredded_typed_array()?, + make_i64_shredded_typed_array()?, + ], + VortexDType::Variant(Nullability::NonNullable), + )?; + + assert!(validate_chunks(chunked.as_view()).is_none()); + Ok(()) + } + + #[test] + fn test_chunked_parquet_variant_canonicalization_merges_storage() -> VortexResult<()> { + let dtype = VortexDType::Variant(Nullability::NonNullable); + let chunked = ChunkedArray::try_new( + vec![make_shredded_typed_array()?, make_shredded_typed_array()?], + dtype, + )? + .into_array(); + + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let Canonical::Variant(canonical) = chunked.execute::(&mut ctx)? else { + return Err(vortex_err!("expected canonical variant array")); + }; + + let core_storage = canonical + .core_storage() + .as_opt::() + .ok_or_else(|| vortex_err!("expected merged parquet variant core storage"))?; + assert!(core_storage.typed_value_array().is_none()); + assert!(core_storage.value_array().is_some()); + + let shredded = canonical + .shredded() + .ok_or_else(|| vortex_err!("expected canonical shredded child"))? + .clone() + .execute::(&mut ctx)?; + assert_arrays_eq!( + shredded, + PrimitiveArray::from_option_iter([Some(10), None, Some(30), Some(10), None, Some(30)]) + ); + + Ok(()) + } }