From 60d00420d0b39f3e804ce295df8395ebd78c71f9 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 15 May 2026 14:04:13 +0100 Subject: [PATCH 01/12] Actually write Variant to files Signed-off-by: Adam Gutglick --- Cargo.lock | 1 + Cargo.toml | 1 + encodings/parquet-variant/src/vtable.rs | 92 ++++++++++++++++------ vortex-compressor/src/compressor.rs | 5 +- vortex-file/Cargo.toml | 1 + vortex-file/src/strategy.rs | 4 + vortex-layout/src/layouts/zoned/builder.rs | 11 +++ 7 files changed, 85 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f2b7bb670e..91c061d6edb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9716,6 +9716,7 @@ dependencies = [ "vortex-layout", "vortex-mask", "vortex-metrics", + "vortex-parquet-variant", "vortex-pco", "vortex-runend", "vortex-scan", diff --git a/Cargo.toml b/Cargo.toml index fb87a953154..92890d2e31a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -289,6 +289,7 @@ vortex-ipc = { version = "0.1.0", path = "./vortex-ipc", default-features = fals vortex-layout = { version = "0.1.0", path = "./vortex-layout", default-features = false } vortex-mask = { version = "0.1.0", path = "./vortex-mask", default-features = false } vortex-metrics = { version = "0.1.0", path = "./vortex-metrics", default-features = false } +vortex-parquet-variant = { version = "0.1.0", path = "./encodings/parquet-variant" } vortex-pco = { version = "0.1.0", path = "./encodings/pco", default-features = false } vortex-proto = { version = "0.1.0", path = "./vortex-proto", default-features = false } vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-features = false } diff --git a/encodings/parquet-variant/src/vtable.rs b/encodings/parquet-variant/src/vtable.rs index aafd5969caf..48ee372e4d0 100644 --- a/encodings/parquet-variant/src/vtable.rs +++ b/encodings/parquet-variant/src/vtable.rs @@ -309,6 +309,7 @@ mod tests { use vortex_array::IntoArray; use vortex_array::LEGACY_SESSION; use vortex_array::Precision; + use 
vortex_array::VTable; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinViewArray; @@ -364,6 +365,44 @@ mod tests { .unwrap() } + fn typed_value_variant_array() -> VortexResult { + let mut metadata = BinaryViewBuilder::new(); + for _ in 0..3 { + metadata.append_value(b"\x01\x00"); + } + let metadata: ArrowArrayRef = Arc::new(metadata.finish()); + let typed_value: ArrowArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30])); + let arrow_storage = StructArray::try_new( + vec![ + Arc::new(Field::new("metadata", DataType::BinaryView, false)), + Arc::new(Field::new("typed_value", DataType::Int32, false)), + ] + .into(), + vec![metadata, typed_value], + None, + )?; + + ParquetVariant::from_arrow_variant(&ArrowVariantArray::try_new(&arrow_storage)?) + } + + fn parquet_variant_file_session() -> VortexSession { + let session = VortexSession::empty() + .with::() + .with::() + .with::(); + vortex_file::register_default_encodings(&session); + session.arrays().register(ParquetVariant); + session + } + + fn write_strategy_with_parquet_variant() -> Arc { + let mut allowed = vortex_file::ALLOWED_ENCODINGS.clone(); + allowed.insert(ParquetVariant.id()); + vortex_file::WriteStrategyBuilder::default() + .with_allow_encodings(allowed) + .build() + } + #[test] fn test_execute_exposes_typed_value_as_canonical_shredded() -> VortexResult<()> { let metadata = @@ -405,37 +444,38 @@ mod tests { } #[tokio::test] - async fn test_file_roundtrip_typed_value_variant() -> VortexResult<()> { - let mut metadata = BinaryViewBuilder::new(); - for _ in 0..3 { - metadata.append_value(b"\x01\x00"); - } - let metadata: ArrowArrayRef = Arc::new(metadata.finish()); - let typed_value: ArrowArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30])); - let arrow_storage = StructArray::try_new( - vec![ - Arc::new(Field::new("metadata", DataType::BinaryView, false)), - Arc::new(Field::new("typed_value", DataType::Int32, false)), - ] - .into(), - 
vec![metadata, typed_value], - None, - )?; - let expected = - ParquetVariant::from_arrow_variant(&ArrowVariantArray::try_new(&arrow_storage)?)?; - - let session = VortexSession::empty() - .with::() - .with::() - .with::(); - vortex_file::register_default_encodings(&session); - session.arrays().register(ParquetVariant); + async fn test_file_roundtrip_typed_value_variant_with_statistics() -> VortexResult<()> { + let expected = typed_value_variant_array()?; + let session = parquet_variant_file_session(); let mut bytes = ByteBufferMut::empty(); session .write_options() .with_strategy(Arc::new(FlatLayoutStrategy::default())) - .with_file_statistics(Vec::new()) + .write(&mut bytes, expected.to_array_stream()) + .await?; + + let actual = session + .open_options() + .open_buffer(bytes)? + .scan()? + .into_array_stream()? + .read_all() + .await?; + + assert_arrays_eq!(expected, actual); + Ok(()) + } + + #[tokio::test] + async fn test_file_roundtrip_typed_value_variant_with_zoned_strategy() -> VortexResult<()> { + let expected = typed_value_variant_array()?; + let session = parquet_variant_file_session(); + + let mut bytes = ByteBufferMut::empty(); + session + .write_options() + .with_strategy(write_strategy_with_parquet_variant()) .write(&mut bytes, expected.to_array_stream()) .await?; diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index d3ad0d9d5c3..402ede34a88 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -27,7 +27,6 @@ use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::scalar::Scalar; use vortex_error::VortexResult; -use vortex_error::vortex_bail; use crate::builtins::IntDictScheme; use crate::ctx::CompressorContext; @@ -250,9 +249,7 @@ impl CascadingCompressor { .into_array(), ) } - Canonical::Variant(_) => { - vortex_bail!("Variant arrays can not be compressed") - } + Canonical::Variant(variant_array) => Ok(variant_array.into_array()), } } diff 
--git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 77d664a12cb..8bfbeabc5ca 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -46,6 +46,7 @@ vortex-io = { workspace = true } vortex-layout = { workspace = true } vortex-mask = { workspace = true } vortex-metrics = { workspace = true } +vortex-parquet-variant = { workspace = true } vortex-pco = { workspace = true } vortex-runend = { workspace = true } vortex-scan = { workspace = true } diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 71c72ffc904..d878d99f7a0 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -26,6 +26,7 @@ use vortex_array::arrays::Primitive; use vortex_array::arrays::Struct; use vortex_array::arrays::VarBin; use vortex_array::arrays::VarBinView; +use vortex_array::arrays::Variant; use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::dtype::FieldPath; use vortex_btrblocks::BtrBlocksCompressorBuilder; @@ -52,6 +53,7 @@ use vortex_layout::layouts::repartition::RepartitionWriterOptions; use vortex_layout::layouts::table::TableStrategy; use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions; use vortex_layout::layouts::zoned::writer::ZonedStrategy; +use vortex_parquet_variant::ParquetVariant; use vortex_pco::Pco; use vortex_runend::RunEnd; use vortex_sequence::Sequence; @@ -89,6 +91,7 @@ pub static ALLOWED_ENCODINGS: LazyLock> = LazyLock::new(|| { allowed.insert(Constant.id()); allowed.insert(Masked.id()); allowed.insert(Dict.id()); + allowed.insert(Variant.id()); // Compressed encodings from encoding crates allowed.insert(ALP.id()); @@ -111,6 +114,7 @@ pub static ALLOWED_ENCODINGS: LazyLock> = LazyLock::new(|| { if use_experimental_patches() { allowed.insert(Patched.id()); + allowed.insert(ParquetVariant.id()); } #[cfg(feature = "zstd")] diff --git a/vortex-layout/src/layouts/zoned/builder.rs b/vortex-layout/src/layouts/zoned/builder.rs index 25a6e89f9db..3d43661201e 100644 --- 
a/vortex-layout/src/layouts/zoned/builder.rs +++ b/vortex-layout/src/layouts/zoned/builder.rs @@ -47,6 +47,13 @@ pub struct StatsAccumulator { impl StatsAccumulator { pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self { + if !supports_file_stats(dtype) { + return Self { + builders: Vec::new(), + length: 0, + }; + } + let builders = stats .iter() .filter_map(|&stat| { @@ -165,6 +172,10 @@ impl StatsAccumulator { } } +fn supports_file_stats(dtype: &DType) -> bool { + !matches!(dtype, DType::Variant(_)) +} + fn stats_builder_with_capacity( stat: Stat, dtype: &DType, From c976bb276ee51e12f132b84e716da4d3f1c63fd5 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 15 May 2026 15:40:50 +0100 Subject: [PATCH 02/12] chrono Signed-off-by: Adam Gutglick --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 92890d2e31a..f225db1e812 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ cargo_metadata = "0.23.1" cbindgen = "0.29.0" cc = "1.2" cfg-if = "1.0.1" -chrono = "0.4.42" +chrono = "0.4.44" clap = "4.5" criterion = "0.8" crossterm = "0.29" From 709f6287e7a631fda1db4d551a87618030b63980 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 15 May 2026 15:47:04 +0100 Subject: [PATCH 03/12] fix thing Signed-off-by: Adam Gutglick --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 42e9040d86d..afabc6b4adf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -162,7 +162,7 @@ jobs: target: wasm32-unknown-unknown env: rustflags: "RUSTFLAGS='-A warnings --cfg getrandom_backend=\"unsupported\"'" - args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-cub --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd --exclude vortex-test-e2e-cuda --exclude 
vortex-sqllogictest" + args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-cub --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd --exclude vortex-test-e2e-cuda --exclude vortex-sqllogictest --exclude vortex-parquet-variant" steps: - uses: runs-on/action@v2 if: github.repository == 'vortex-data/vortex' From e5a3868175d47c7521d0e8cb37af6990d146f42c Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 15 May 2026 15:47:54 +0100 Subject: [PATCH 04/12] id Signed-off-by: Adam Gutglick --- vortex-file/src/strategy.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index d878d99f7a0..661e72240d1 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -53,7 +53,6 @@ use vortex_layout::layouts::repartition::RepartitionWriterOptions; use vortex_layout::layouts::table::TableStrategy; use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions; use vortex_layout::layouts::zoned::writer::ZonedStrategy; -use vortex_parquet_variant::ParquetVariant; use vortex_pco::Pco; use vortex_runend::RunEnd; use vortex_sequence::Sequence; @@ -114,7 +113,6 @@ pub static ALLOWED_ENCODINGS: LazyLock> = LazyLock::new(|| { if use_experimental_patches() { allowed.insert(Patched.id()); - allowed.insert(ParquetVariant.id()); } #[cfg(feature = "zstd")] From 24fb3f71dff5c1bab6092c052922210ffebe7fd5 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 15 May 2026 16:53:12 +0100 Subject: [PATCH 05/12] compress stuff Signed-off-by: Adam Gutglick --- vortex-compressor/src/compressor.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index 402ede34a88..b17e91928c7 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -15,6 +15,7 @@ use vortex_array::arrays::ListArray; 
use vortex_array::arrays::ListViewArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; +use vortex_array::arrays::VariantArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt; use vortex_array::arrays::list::ListArrayExt; @@ -23,6 +24,7 @@ use vortex_array::arrays::listview::list_from_list_view; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::arrays::scalar_fn::AnyScalarFn; use vortex_array::arrays::struct_::StructArrayExt; +use vortex_array::arrays::variant::VariantArrayExt; use vortex_array::dtype::DType; use vortex_array::dtype::Nullability; use vortex_array::scalar::Scalar; @@ -249,7 +251,14 @@ impl CascadingCompressor { .into_array(), ) } - Canonical::Variant(variant_array) => Ok(variant_array.into_array()), + Canonical::Variant(variant_array) => { + let core_storage = self.compress(variant_array.core_storage(), exec_ctx)?; + let shredded = variant_array + .shredded() + .map(|arr| self.compress(arr, exec_ctx)) + .transpose()?; + Ok(VariantArray::try_new(core_storage, shredded)?.into_array()) + } } } From e92d537a1a523ab235e3407976ee925dd1dd2ba4 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 15 May 2026 17:04:36 +0100 Subject: [PATCH 06/12] compressor Signed-off-by: Adam Gutglick --- vortex-compressor/src/compressor.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index b17e91928c7..74a7585af74 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -4,6 +4,7 @@ //! Cascading array compression implementation. 
use vortex_array::ArrayRef; +use vortex_array::ArraySlots; use vortex_array::Canonical; use vortex_array::CanonicalValidity; use vortex_array::ExecutionCtx; @@ -252,11 +253,13 @@ impl CascadingCompressor { ) } Canonical::Variant(variant_array) => { - let core_storage = self.compress(variant_array.core_storage(), exec_ctx)?; + let core_storage = + self.compress_physical_slots(variant_array.core_storage(), exec_ctx)?; let shredded = variant_array .shredded() - .map(|arr| self.compress(arr, exec_ctx)) + .map(|arr| self.compress_physical_slots(arr, exec_ctx)) .transpose()?; + Ok(VariantArray::try_new(core_storage, shredded)?.into_array()) } } @@ -563,6 +566,24 @@ impl CascadingCompressor { )? .into_array()) } + + fn compress_physical_slots( + &self, + array: &ArrayRef, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let slots = array + .slots() + .iter() + .map(|slot| { + slot.as_ref() + .map(|child| self.compress(child, exec_ctx)) + .transpose() + }) + .collect::>()?; + + array.clone().with_slots(slots) + } } #[cfg(test)] From 9faabefab5c3e78d9459b6230d7277c8dbc2b2b1 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 19 May 2026 11:10:14 +0100 Subject: [PATCH 07/12] nicer pq-variant tests Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/vtable.rs | 130 ++++++++++++++---------- 1 file changed, 76 insertions(+), 54 deletions(-) diff --git a/encodings/parquet-variant/src/vtable.rs b/encodings/parquet-variant/src/vtable.rs index 48ee372e4d0..754253471a4 100644 --- a/encodings/parquet-variant/src/vtable.rs +++ b/encodings/parquet-variant/src/vtable.rs @@ -302,6 +302,8 @@ mod tests { use arrow_schema::DataType; use arrow_schema::Field; use parquet_variant_compute::VariantArray as ArrowVariantArray; + use rstest::fixture; + use rstest::rstest; use vortex_array::ArrayContext; use vortex_array::ArrayEq; use vortex_array::ArrayRef; @@ -333,6 +335,7 @@ mod tests { use vortex_file::OpenOptionsSessionExt; use vortex_file::WriteOptionsSessionExt; use 
vortex_io::session::RuntimeSession; + use vortex_layout::LayoutStrategy; use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; use vortex_layout::session::LayoutSession; use vortex_session::VortexSession; @@ -341,30 +344,32 @@ mod tests { use crate::ParquetVariant; use crate::array::ParquetVariantArrayExt; - fn roundtrip(array: ArrayRef) -> ArrayRef { - let dtype = array.dtype().clone(); - let len = array.len(); + type Roundtrip = fn(ArrayRef) -> VortexResult; - let session = VortexSession::empty().with::(); - session.arrays().register(ParquetVariant); + #[fixture] + fn roundtrip() -> Roundtrip { + |array| { + let dtype = array.dtype().clone(); + let len = array.len(); - let ctx = ArrayContext::empty(); - let serialized = array - .serialize(&ctx, &session, &SerializeOptions::default()) - .unwrap(); + let session = VortexSession::empty().with::(); + session.arrays().register(ParquetVariant); - let mut concat = ByteBufferMut::empty(); - for buf in serialized { - concat.extend_from_slice(buf.as_ref()); - } - let concat = concat.freeze(); + let ctx = ArrayContext::empty(); + let serialized = array.serialize(&ctx, &session, &SerializeOptions::default())?; + + let mut concat = ByteBufferMut::empty(); + for buf in serialized { + concat.extend_from_slice(buf.as_ref()); + } + let concat = concat.freeze(); - let parts = SerializedArray::try_from(concat).unwrap(); - parts - .decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session) - .unwrap() + let parts = SerializedArray::try_from(concat)?; + parts.decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session) + } } + #[fixture] fn typed_value_variant_array() -> VortexResult { let mut metadata = BinaryViewBuilder::new(); for _ in 0..3 { @@ -385,6 +390,7 @@ mod tests { ParquetVariant::from_arrow_variant(&ArrowVariantArray::try_new(&arrow_storage)?) 
} + #[fixture] fn parquet_variant_file_session() -> VortexSession { let session = VortexSession::empty() .with::() @@ -395,7 +401,8 @@ mod tests { session } - fn write_strategy_with_parquet_variant() -> Arc { + #[fixture] + fn write_strategy() -> Arc { let mut allowed = vortex_file::ALLOWED_ENCODINGS.clone(); allowed.insert(ParquetVariant.id()); vortex_file::WriteStrategyBuilder::default() @@ -443,19 +450,22 @@ mod tests { Ok(()) } + #[rstest] #[tokio::test] - async fn test_file_roundtrip_typed_value_variant_with_statistics() -> VortexResult<()> { - let expected = typed_value_variant_array()?; - let session = parquet_variant_file_session(); + async fn test_file_roundtrip_typed_value_variant_with_statistics( + #[from(typed_value_variant_array)] expected: VortexResult, + parquet_variant_file_session: VortexSession, + ) -> VortexResult<()> { + let expected = expected?; let mut bytes = ByteBufferMut::empty(); - session + parquet_variant_file_session .write_options() .with_strategy(Arc::new(FlatLayoutStrategy::default())) .write(&mut bytes, expected.to_array_stream()) .await?; - let actual = session + let actual = parquet_variant_file_session .open_options() .open_buffer(bytes)? .scan()? 
@@ -467,19 +477,23 @@ mod tests { Ok(()) } + #[rstest] #[tokio::test] - async fn test_file_roundtrip_typed_value_variant_with_zoned_strategy() -> VortexResult<()> { - let expected = typed_value_variant_array()?; - let session = parquet_variant_file_session(); + async fn test_file_roundtrip_typed_value_variant_with_zoned_strategy( + #[from(typed_value_variant_array)] expected: VortexResult, + parquet_variant_file_session: VortexSession, + write_strategy: Arc, + ) -> VortexResult<()> { + let expected = expected?; let mut bytes = ByteBufferMut::empty(); - session + parquet_variant_file_session .write_options() - .with_strategy(write_strategy_with_parquet_variant()) + .with_strategy(write_strategy) .write(&mut bytes, expected.to_array_stream()) .await?; - let actual = session + let actual = parquet_variant_file_session .open_options() .open_buffer(bytes)? .scan()? @@ -491,8 +505,8 @@ mod tests { Ok(()) } - #[test] - fn test_serde_roundtrip_typed_value_variant() { + #[rstest] + fn test_serde_roundtrip_typed_value_variant(roundtrip: Roundtrip) -> VortexResult<()> { let outer_metadata = VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array(); @@ -504,48 +518,52 @@ mod tests { inner_metadata, Some(inner_value), None, - ) - .unwrap(); - let typed_value = VariantArray::try_new(inner_pv.into_array(), None) - .unwrap() - .into_array(); + )?; + let typed_value = VariantArray::try_new(inner_pv.into_array(), None)?.into_array(); let outer_pv = ParquetVariant::try_new( Validity::NonNullable, outer_metadata, None, Some(typed_value), - ) - .unwrap(); + )?; let array = outer_pv.into_array(); - let decoded = roundtrip(array.clone()); + let decoded = roundtrip(array.clone())?; assert!(array.array_eq(&decoded, Precision::Value)); - let decoded_pv = decoded.as_opt::().unwrap(); - let typed = decoded_pv.typed_value_array().unwrap(); + let decoded_pv = decoded + .as_opt::() + .ok_or_else(|| vortex_err!("expected parquet variant array"))?; + let typed = 
decoded_pv + .typed_value_array() + .ok_or_else(|| vortex_err!("expected typed_value child"))?; assert_eq!(typed.dtype(), &DType::Variant(Nullability::NonNullable)); + Ok(()) } - #[test] - fn test_serde_roundtrip_with_nullable_validity() { + #[rstest] + fn test_serde_roundtrip_with_nullable_validity(roundtrip: Roundtrip) -> VortexResult<()> { let metadata = VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array(); let value = VarBinViewArray::from_iter_bin([b"\x10", b"\x11", b"\x12"]).into_array(); let validity = Validity::from(BitBuffer::from_iter([true, false, true])); - let pv = ParquetVariant::try_new(validity, metadata, Some(value), None).unwrap(); + let pv = ParquetVariant::try_new(validity, metadata, Some(value), None)?; let array = pv.into_array(); - let decoded = roundtrip(array.clone()); + let decoded = roundtrip(array.clone())?; assert!(array.array_eq(&decoded, Precision::Value)); assert_eq!(decoded.dtype(), &DType::Variant(Nullability::Nullable)); - let decoded_pv = decoded.as_opt::().unwrap(); + let decoded_pv = decoded + .as_opt::() + .ok_or_else(|| vortex_err!("expected parquet variant array"))?; assert!(decoded_pv.value_array().is_some()); assert!(decoded_pv.typed_value_array().is_none()); + Ok(()) } - #[test] - fn test_serde_roundtrip_typed_value_int32() { + #[rstest] + fn test_serde_roundtrip_typed_value_int32(roundtrip: Roundtrip) -> VortexResult<()> { let outer_metadata = VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array(); let typed_value = buffer![10i32, 20, 30].into_array(); @@ -555,17 +573,21 @@ mod tests { outer_metadata, None, Some(typed_value), - ) - .unwrap(); + )?; let array = outer_pv.into_array(); - let decoded = roundtrip(array.clone()); + let decoded = roundtrip(array.clone())?; assert!(array.array_eq(&decoded, Precision::Value)); - let decoded_pv = decoded.as_opt::().unwrap(); - let typed = decoded_pv.typed_value_array().unwrap(); + let decoded_pv = decoded + .as_opt::() 
+ .ok_or_else(|| vortex_err!("expected parquet variant array"))?; + let typed = decoded_pv + .typed_value_array() + .ok_or_else(|| vortex_err!("expected typed_value child"))?; assert_eq!( typed.dtype(), &DType::Primitive(PType::I32, Nullability::NonNullable) ); + Ok(()) } } From bd4c963fbd4ba6085700d2a5b68f7617d975c2c5 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 19 May 2026 13:44:52 +0100 Subject: [PATCH 08/12] doc Signed-off-by: Adam Gutglick --- vortex-compressor/src/compressor.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index 74a7585af74..b27e8e8dac0 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -567,6 +567,7 @@ impl CascadingCompressor { .into_array()) } + /// Compress every child slot of the array, then re-build it from them. fn compress_physical_slots( &self, array: &ArrayRef, From bcf8cb2e94668c1e2485e45d42f99a4a0a01e86f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 19 May 2026 14:36:46 +0100 Subject: [PATCH 09/12] fix pytest Signed-off-by: Adam Gutglick --- vortex-python/src/io.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-python/src/io.rs b/vortex-python/src/io.rs index 7da526390bb..bad3804b31b 100644 --- a/vortex-python/src/io.rs +++ b/vortex-python/src/io.rs @@ -258,7 +258,7 @@ impl PyVortexWriteOptions { /// >>> vx.io.VortexWriteOptions.default().write(sprl, "chonky.vortex") /// >>> import os /// >>> os.path.getsize('chonky.vortex') - /// 215972 + /// 216004 /// /// Wow, Vortex manages to use about two bytes per integer! So advanced. So tiny. /// @@ -268,7 +268,7 @@ impl PyVortexWriteOptions { /// /// >>> vx.io.VortexWriteOptions.compact().write(sprl, "tiny.vortex") /// >>> os.path.getsize('tiny.vortex') - /// 55088 + /// 55120 /// /// Random numbers are not (usually) composed of random bytes! 
#[staticmethod] From b9115344e92a09fe47903ccdf33c0e2a39e6c37f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 19 May 2026 14:40:29 +0100 Subject: [PATCH 10/12] fix dependency issue Signed-off-by: Adam Gutglick --- Cargo.lock | 1 - vortex-file/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91c061d6edb..6f2b7bb670e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9716,7 +9716,6 @@ dependencies = [ "vortex-layout", "vortex-mask", "vortex-metrics", - "vortex-parquet-variant", "vortex-pco", "vortex-runend", "vortex-scan", diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 8bfbeabc5ca..77d664a12cb 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -46,7 +46,6 @@ vortex-io = { workspace = true } vortex-layout = { workspace = true } vortex-mask = { workspace = true } vortex-metrics = { workspace = true } -vortex-parquet-variant = { workspace = true } vortex-pco = { workspace = true } vortex-runend = { workspace = true } vortex-scan = { workspace = true } From 5efb77530085d9aef45782b65ee0f12d88633af8 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 20 May 2026 12:01:56 +0100 Subject: [PATCH 11/12] simplify test Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/vtable.rs | 39 +++++++++++-------------- vortex-compressor/src/compressor.rs | 10 ++++++- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/encodings/parquet-variant/src/vtable.rs b/encodings/parquet-variant/src/vtable.rs index 754253471a4..40f8dbb0604 100644 --- a/encodings/parquet-variant/src/vtable.rs +++ b/encodings/parquet-variant/src/vtable.rs @@ -344,29 +344,24 @@ mod tests { use crate::ParquetVariant; use crate::array::ParquetVariantArrayExt; - type Roundtrip = fn(ArrayRef) -> VortexResult; + fn roundtrip(array: ArrayRef) -> VortexResult { + let dtype = array.dtype().clone(); + let len = array.len(); - #[fixture] - fn roundtrip() -> Roundtrip { - |array| { - let dtype = array.dtype().clone(); - 
let len = array.len(); - - let session = VortexSession::empty().with::(); - session.arrays().register(ParquetVariant); - - let ctx = ArrayContext::empty(); - let serialized = array.serialize(&ctx, &session, &SerializeOptions::default())?; + let session = VortexSession::empty().with::(); + session.arrays().register(ParquetVariant); - let mut concat = ByteBufferMut::empty(); - for buf in serialized { - concat.extend_from_slice(buf.as_ref()); - } - let concat = concat.freeze(); + let ctx = ArrayContext::empty(); + let serialized = array.serialize(&ctx, &session, &SerializeOptions::default())?; - let parts = SerializedArray::try_from(concat)?; - parts.decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session) + let mut concat = ByteBufferMut::empty(); + for buf in serialized { + concat.extend_from_slice(buf.as_ref()); } + let concat = concat.freeze(); + + let parts = SerializedArray::try_from(concat)?; + parts.decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session) } #[fixture] @@ -506,7 +501,7 @@ mod tests { } #[rstest] - fn test_serde_roundtrip_typed_value_variant(roundtrip: Roundtrip) -> VortexResult<()> { + fn test_serde_roundtrip_typed_value_variant() -> VortexResult<()> { let outer_metadata = VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array(); @@ -542,7 +537,7 @@ mod tests { } #[rstest] - fn test_serde_roundtrip_with_nullable_validity(roundtrip: Roundtrip) -> VortexResult<()> { + fn test_serde_roundtrip_with_nullable_validity() -> VortexResult<()> { let metadata = VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array(); let value = VarBinViewArray::from_iter_bin([b"\x10", b"\x11", b"\x12"]).into_array(); @@ -563,7 +558,7 @@ mod tests { } #[rstest] - fn test_serde_roundtrip_typed_value_int32(roundtrip: Roundtrip) -> VortexResult<()> { + fn test_serde_roundtrip_typed_value_int32() -> VortexResult<()> { let outer_metadata = VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", 
b"\x01\x00"]).into_array(); let typed_value = buffer![10i32, 20, 30].into_array(); diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index b27e8e8dac0..da95e6a832f 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -16,6 +16,7 @@ use vortex_array::arrays::ListArray; use vortex_array::arrays::ListViewArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; +use vortex_array::arrays::Variant; use vortex_array::arrays::VariantArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt; @@ -257,7 +258,14 @@ impl CascadingCompressor { self.compress_physical_slots(variant_array.core_storage(), exec_ctx)?; let shredded = variant_array .shredded() - .map(|arr| self.compress_physical_slots(arr, exec_ctx)) + .map(|arr| { + // Avoid stack-overflow for variant shredded values + if arr.is::() { + self.compress_physical_slots(arr, exec_ctx) + } else { + self.compress(arr, exec_ctx) + } + }) .transpose()?; Ok(VariantArray::try_new(core_storage, shredded)?.into_array()) From 93cc8939e7aefaae9438f47bec741f0b264cb591 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 20 May 2026 21:44:55 +0100 Subject: [PATCH 12/12] Fix variant canonicalization Signed-off-by: Adam Gutglick --- vortex-array/src/canonical.rs | 83 +++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 9 deletions(-) diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 3a4271c462c..b1773d453f2 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -660,10 +660,14 @@ impl Executable for CanonicalValidity { let shredded = variant .shredded() .map(|shredded| { - shredded - .clone() - .execute::(ctx) - .map(|canonical| canonical.0.into_array()) + if shredded.is::() { + recursively_canonicalize_slots(shredded, ctx) + } else { + shredded + .clone() + .execute::(ctx) + 
.map(|canonical| canonical.0.into_array()) + } }) .transpose()?; Ok(CanonicalValidity(Canonical::Variant( @@ -702,7 +706,6 @@ fn recursively_canonicalize_slots( .collect::>()?; array.clone().with_slots(slots) } - impl Executable for RecursiveCanonical { fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { match array.execute::(ctx)? { @@ -827,10 +830,14 @@ impl Executable for RecursiveCanonical { let shredded = variant .shredded() .map(|shredded| { - shredded - .clone() - .execute::(ctx) - .map(|canonical| canonical.0.into_array()) + if shredded.is::() { + recursively_canonicalize_slots(shredded, ctx) + } else { + shredded + .clone() + .execute::(ctx) + .map(|canonical| canonical.0.into_array()) + } }) .transpose()?; Ok(RecursiveCanonical(Canonical::Variant( @@ -1113,15 +1120,73 @@ mod test { use arrow_schema::DataType; use arrow_schema::Field; use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_error::vortex_err; use crate::ArrayRef; + use crate::Canonical; + use crate::CanonicalValidity; use crate::IntoArray; use crate::LEGACY_SESSION; use crate::VortexSessionExecute; + use crate::arrays::Constant; use crate::arrays::ConstantArray; + use crate::arrays::Primitive; + use crate::arrays::Struct; + use crate::arrays::Variant; + use crate::arrays::VariantArray; + use crate::arrays::struct_::StructArrayExt; + use crate::arrays::variant::VariantArrayExt; use crate::arrow::ArrowSessionExt; use crate::arrow::FromArrowArray; use crate::canonical::StructArray; + use crate::dtype::Nullability; + use crate::scalar::Scalar; + + fn variant_core_storage(len: usize) -> ArrayRef { + ConstantArray::new( + Scalar::variant(Scalar::primitive(1i32, Nullability::NonNullable)), + len, + ) + .into_array() + } + + #[test] + fn canonical_validity_canonicalizes_variant_shredded_physical_slots() -> VortexResult<()> { + let len = 2; + let nested_shredded = + StructArray::try_from_iter([("value", ConstantArray::new(10i32, len).into_array())])?; + let 
inner_variant = VariantArray::try_new( + variant_core_storage(len), + Some(nested_shredded.into_array()), + )?; + let outer_variant = + VariantArray::try_new(variant_core_storage(len), Some(inner_variant.into_array()))?; + + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let Canonical::Variant(canonical) = outer_variant + .into_array() + .execute::(&mut ctx)? + .0 + else { + return Err(vortex_err!("expected canonical variant")); + }; + + let nested_variant = canonical + .shredded() + .and_then(|shredded| shredded.as_opt::()) + .ok_or_else(|| vortex_err!("expected nested variant shredded child"))?; + let nested_struct = nested_variant + .shredded() + .and_then(|shredded| shredded.as_opt::()) + .ok_or_else(|| vortex_err!("expected nested struct shredded child"))?; + let value = nested_struct.unmasked_field_by_name("value")?; + + assert!(value.is::()); + assert!(!value.is::()); + + Ok(()) + } #[test] fn test_canonicalize_nested_struct() {