Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions encodings/parquet-variant/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::Chunked;
use vortex_array::arrays::ChunkedArray;
use vortex_array::arrays::chunked::ChunkedArrayExt;
use vortex_array::arrays::dict::TakeExecute;
use vortex_array::arrays::dict::TakeExecuteAdaptor;
use vortex_array::arrays::filter::FilterExecuteAdaptor;
Expand All @@ -32,6 +35,7 @@ use vortex_array::kernel::ParentKernelSet;
use vortex_array::scalar_fn::fns::variant_get::VariantGet;
use vortex_array::scalar_fn::fns::variant_get::VariantPath;
use vortex_array::scalar_fn::fns::variant_get::VariantPathElement;
use vortex_array::validity::Validity;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure_eq;
use vortex_error::vortex_err;
Expand All @@ -45,8 +49,128 @@ pub(crate) static PARENT_KERNELS: ParentKernelSet<ParquetVariant> = ParentKernel
ParentKernelSet::lift(&SliceExecuteAdaptor(ParquetVariant)),
ParentKernelSet::lift(&TakeExecuteAdaptor(ParquetVariant)),
ParentKernelSet::lift(&VariantGetKernel),
ParentKernelSet::lift(&ChunkedMergeKernel),
]);

/// Parent kernel that runs when a `ParquetVariant` array is a child of a
/// `Chunked` array.
///
/// It carries no state; all of the work happens in its
/// `ExecuteParentKernel` implementation.
#[derive(Debug, Default)]
struct ChunkedMergeKernel;

/// Rewrites a `Chunked` parent made of structurally identical `ParquetVariant`
/// chunks into a single `ParquetVariant` whose children are chunked.
impl ExecuteParentKernel<ParquetVariant> for ChunkedMergeKernel {
    type Parent = Chunked;

    /// Attempts the merge for `parent`.
    ///
    /// Returns `Ok(None)` (kernel does not apply) when:
    /// * the parent has fewer than two chunks,
    /// * `array` is not pointer-identical to the parent's chunk 0, or
    /// * `validate_chunks` rejects the parent.
    ///
    /// Otherwise returns the merged array, or the error produced while
    /// building it.
    fn execute_parent(
        &self,
        array: ArrayView<'_, ParquetVariant>,
        parent: ArrayView<'_, Chunked>,
        _child_idx: usize,
        _ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        // Checked first so that `chunk(0)` is never requested from a parent
        // that has fewer than two chunks.
        if parent.nchunks() < 2 {
            return Ok(None);
        }

        // NOTE(review): presumably this restricts the rewrite to the invocation
        // made on behalf of the first chunk — confirm against the kernel driver.
        if !ArrayRef::ptr_eq(array.array(), parent.chunk(0)) {
            return Ok(None);
        }

        match validate_chunks(parent) {
            None => Ok(None),
            Some(layout) => {
                // Validation has already established that every chunk is a
                // `ParquetVariant`, so the downcast views are collected directly.
                let variants: Vec<_> = parent
                    .iter_chunks()
                    .map(|chunk| chunk.as_::<ParquetVariant>())
                    .collect();

                let merged = merge_parquet_variant_chunks(&variants, layout)?;
                Ok(Some(merged))
            }
        }
    }
}

/// Child dtypes shared by every chunk of a mergeable chunked `ParquetVariant`.
///
/// Produced by `validate_chunks` from the first chunk and consumed by
/// `merge_parquet_variant_chunks` as the dtypes of the merged chunked children.
struct ParquetVariantChunkMergeLayout {
/// Dtype of the `metadata` child.
metadata_dtype: DType,
/// Dtype of the `value` child, or `None` when the chunks have no `value` child.
value_dtype: Option<DType>,
/// Dtype of the `typed_value` child, or `None` when the chunks have no
/// `typed_value` child.
typed_value_dtype: Option<DType>,
}

/// Checks whether the chunks of `parent` can be merged child-by-child.
///
/// The layout is taken from the first chunk. Every following chunk must be a
/// `ParquetVariant` whose `metadata`, `value` and `typed_value` children have
/// the same dtypes as the first chunk, with the optional children present or
/// absent in the same way.
///
/// Returns `None` when `parent` has no chunks, when any chunk is not a
/// `ParquetVariant`, or when any chunk deviates from the first chunk's layout.
fn validate_chunks(parent: ArrayView<'_, Chunked>) -> Option<ParquetVariantChunkMergeLayout> {
    let mut remaining = parent.iter_chunks();
    let head = remaining.next()?.as_opt::<ParquetVariant>()?;

    let layout = ParquetVariantChunkMergeLayout {
        metadata_dtype: head.metadata_array().dtype().clone(),
        value_dtype: head.value_array().map(|value| value.dtype().clone()),
        typed_value_dtype: head
            .typed_value_array()
            .map(|typed_value| typed_value.dtype().clone()),
    };

    // `all` stops at the first chunk that is not a `ParquetVariant` or whose
    // children do not line up with the first chunk.
    let uniform = remaining.all(|chunk| {
        chunk.as_opt::<ParquetVariant>().is_some_and(|variant| {
            variant.metadata_array().dtype() == &layout.metadata_dtype
                && variant.value_array().map(ArrayRef::dtype) == layout.value_dtype.as_ref()
                && variant.typed_value_array().map(ArrayRef::dtype)
                    == layout.typed_value_dtype.as_ref()
        })
    });

    uniform.then_some(layout)
}

fn merge_parquet_variant_chunks(
chunks: &[ArrayView<'_, ParquetVariant>],
layout: ParquetVariantChunkMergeLayout,
) -> VortexResult<ArrayRef> {
let metadata = ChunkedArray::try_new(
chunks
.iter()
.map(|chunk| chunk.metadata_array().clone())
.collect(),
layout.metadata_dtype,
)?
.into_array();

let validity = Validity::concat(
chunks
.iter()
.map(|chunk| chunk.validity().map(|validity| (validity, chunk.len())))
.collect::<VortexResult<Vec<_>>>()?,
)
.ok_or_else(|| vortex_err!("cannot merge empty ParquetVariant chunks"))?;

let value = if let Some(value_dtype) = layout.value_dtype {
let value_chunks = chunks
.iter()
.map(|chunk| {
chunk
.value_array()
.cloned()
.ok_or_else(|| vortex_err!("validated ParquetVariant value child is missing"))
})
.collect::<VortexResult<Vec<_>>>()?;
Some(ChunkedArray::try_new(value_chunks, value_dtype)?.into_array())
} else {
None
};

let typed_value = if let Some(typed_value_dtype) = layout.typed_value_dtype {
let typed_value_chunks = chunks
.iter()
.map(|chunk| {
chunk.typed_value_array().cloned().ok_or_else(|| {
vortex_err!("validated ParquetVariant typed_value child is missing")
})
})
.collect::<VortexResult<Vec<_>>>()?;
Some(ChunkedArray::try_new(typed_value_chunks, typed_value_dtype)?.into_array())
} else {
None
};

ParquetVariant::try_new(validity, metadata, value, typed_value).map(IntoArray::into_array)
}

/// Stateless parent kernel registered in `PARENT_KERNELS`.
///
/// NOTE(review): judging by the name it serves `VariantGet` parents; the
/// implementation is not part of this excerpt — confirm there.
#[derive(Debug, Default)]
struct VariantGetKernel;

Expand Down Expand Up @@ -209,6 +333,7 @@ mod tests {
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ChunkedArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray as VortexStructArray;
use vortex_array::arrays::VarBinArray;
Expand All @@ -233,6 +358,7 @@ mod tests {
use vortex_error::vortex_err;
use vortex_mask::Mask;

use super::validate_chunks;
use crate::ParquetVariant;
use crate::ParquetVariantArrayExt;

Expand Down Expand Up @@ -803,6 +929,17 @@ mod tests {
ParquetVariant::from_arrow_variant(&arrow_variant)
}

/// Builds a two-row, non-nullable `ParquetVariant` that has no `value` child
/// and whose `typed_value` child is the `i64` primitive array `[10, 20]`.
fn make_i64_shredded_typed_array() -> VortexResult<ArrayRef> {
    // Both rows carry the same two-byte metadata blob.
    let metadata_rows = binary_view_array(&[b"\x01\x00", b"\x01\x00"]);
    let metadata = ArrayRef::from_arrow(metadata_rows.as_ref(), false)?;

    let typed_value = PrimitiveArray::from_iter([10i64, 20]).into_array();

    let variant =
        ParquetVariant::try_new(Validity::NonNullable, metadata, None, Some(typed_value))?;
    Ok(IntoArray::into_array(variant))
}

fn assert_typed_value_i32(
array: &ArrayRef,
expected: impl IntoIterator<Item = Option<i32>>,
Expand Down Expand Up @@ -1002,4 +1139,53 @@ mod tests {

Ok(())
}

/// `validate_chunks` must return `None` when the second chunk's `typed_value`
/// child (built by `make_i64_shredded_typed_array`) does not match the first
/// chunk's.
#[test]
fn test_chunked_parquet_variant_merge_preflight_rejects_mismatched_typed_value_dtype()
-> VortexResult<()> {
    let first_chunk = make_shredded_typed_array()?;
    let mismatched_chunk = make_i64_shredded_typed_array()?;
    let dtype = VortexDType::Variant(Nullability::NonNullable);

    let chunked = ChunkedArray::try_new(vec![first_chunk, mismatched_chunk], dtype)?;

    assert!(validate_chunks(chunked.as_view()).is_none());
    Ok(())
}

/// Executing a chunked array of two identical shredded `ParquetVariant` chunks
/// to canonical form must yield a canonical variant whose core storage is a
/// `ParquetVariant` with a `value` child and no `typed_value` child, and whose
/// shredded child holds the rows of both chunks back to back.
#[test]
fn test_chunked_parquet_variant_canonicalization_merges_storage() -> VortexResult<()> {
    let chunks = vec![make_shredded_typed_array()?, make_shredded_typed_array()?];
    let chunked =
        ChunkedArray::try_new(chunks, VortexDType::Variant(Nullability::NonNullable))?
            .into_array();

    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let canonical = match chunked.execute::<Canonical>(&mut ctx)? {
        Canonical::Variant(variant) => variant,
        _ => return Err(vortex_err!("expected canonical variant array")),
    };

    // The core storage must still be a `ParquetVariant`.
    let core_storage = canonical
        .core_storage()
        .as_opt::<ParquetVariant>()
        .ok_or_else(|| vortex_err!("expected merged parquet variant core storage"))?;
    assert!(core_storage.typed_value_array().is_none());
    assert!(core_storage.value_array().is_some());

    let shredded_child = canonical
        .shredded()
        .ok_or_else(|| vortex_err!("expected canonical shredded child"))?;
    let shredded = shredded_child.clone().execute::<PrimitiveArray>(&mut ctx)?;
    assert_arrays_eq!(
        shredded,
        PrimitiveArray::from_option_iter([Some(10), None, Some(30), Some(10), None, Some(30)])
    );

    Ok(())
}
}
Loading