Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ jobs:
target: wasm32-unknown-unknown
env:
rustflags: "RUSTFLAGS='-A warnings --cfg getrandom_backend=\"unsupported\"'"
args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-cub --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd --exclude vortex-test-e2e-cuda --exclude vortex-sqllogictest"
args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-cub --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd --exclude vortex-test-e2e-cuda --exclude vortex-sqllogictest --exclude vortex-parquet-variant"
steps:
- uses: runs-on/action@v2
if: github.repository == 'vortex-data/vortex'
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ cargo_metadata = "0.23.1"
cbindgen = "0.29.0"
cc = "1.2"
cfg-if = "1.0.1"
chrono = "0.4.42"
chrono = "0.4.44"
clap = "4.5"
criterion = "0.8"
crossterm = "0.29"
Expand Down Expand Up @@ -289,6 +289,7 @@ vortex-ipc = { version = "0.1.0", path = "./vortex-ipc", default-features = fals
vortex-layout = { version = "0.1.0", path = "./vortex-layout", default-features = false }
vortex-mask = { version = "0.1.0", path = "./vortex-mask", default-features = false }
vortex-metrics = { version = "0.1.0", path = "./vortex-metrics", default-features = false }
vortex-parquet-variant = { version = "0.1.0", path = "./encodings/parquet-variant" }
vortex-pco = { version = "0.1.0", path = "./encodings/pco", default-features = false }
vortex-proto = { version = "0.1.0", path = "./vortex-proto", default-features = false }
vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-features = false }
Expand Down
177 changes: 117 additions & 60 deletions encodings/parquet-variant/src/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,16 @@ mod tests {
use arrow_schema::DataType;
use arrow_schema::Field;
use parquet_variant_compute::VariantArray as ArrowVariantArray;
use rstest::fixture;
use rstest::rstest;
use vortex_array::ArrayContext;
use vortex_array::ArrayEq;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::Precision;
use vortex_array::VTable;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
Expand All @@ -332,6 +335,7 @@ mod tests {
use vortex_file::OpenOptionsSessionExt;
use vortex_file::WriteOptionsSessionExt;
use vortex_io::session::RuntimeSession;
use vortex_layout::LayoutStrategy;
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
use vortex_layout::session::LayoutSession;
use vortex_session::VortexSession;
Expand All @@ -340,28 +344,65 @@ mod tests {
use crate::ParquetVariant;
use crate::array::ParquetVariantArrayExt;

/// Round-trips `array` through the array serialization path and returns the decoded array.
///
/// The input is serialized with an empty `ArrayContext`, the produced buffers are
/// concatenated into one contiguous byte buffer, and that buffer is parsed as a
/// `SerializedArray` and decoded using the dtype and length captured from the input.
///
/// NOTE(review): this span comes from a rendered diff, so each changed statement appears
/// twice: the `.unwrap()` form first, then the `?`-propagating form that returns
/// `VortexResult<ArrayRef>`. Confirm which lines belong to the current file.
fn roundtrip(array: ArrayRef) -> ArrayRef {
fn roundtrip(array: ArrayRef) -> VortexResult<ArrayRef> {
// `decode` below takes the dtype and length as explicit arguments, so record them first.
let dtype = array.dtype().clone();
let len = array.len();

// Register the ParquetVariant encoding on the session passed to both serialize and decode.
let session = VortexSession::empty().with::<ArraySession>();
session.arrays().register(ParquetVariant);

let ctx = ArrayContext::empty();
let serialized = array
.serialize(&ctx, &session, &SerializeOptions::default())
.unwrap();
let serialized = array.serialize(&ctx, &session, &SerializeOptions::default())?;

// Flatten the serialized buffers into a single buffer before parsing.
let mut concat = ByteBufferMut::empty();
for buf in serialized {
concat.extend_from_slice(buf.as_ref());
}
let concat = concat.freeze();

let parts = SerializedArray::try_from(concat).unwrap();
parts
.decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session)
.unwrap()
let parts = SerializedArray::try_from(concat)?;
// The read context is built from the ids of the same context used for serialization.
parts.decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session)
}

/// Fixture: a three-row variant array built from Arrow storage.
///
/// The storage is a struct with two non-nullable fields and no struct-level validity:
/// `metadata` (BinaryView, every row holding the bytes `01 00`) and `typed_value`
/// (Int32, holding `10, 20, 30`). The struct is wrapped in an Arrow variant array and
/// converted with `ParquetVariant::from_arrow_variant`.
///
/// Returns the conversion result unchanged, so any construction error reaches the test.
#[fixture]
fn typed_value_variant_array() -> VortexResult<ArrayRef> {
    // Same two-byte metadata blob for each of the three rows.
    let mut metadata_builder = BinaryViewBuilder::new();
    for _row in 0..3 {
        metadata_builder.append_value(b"\x01\x00");
    }
    let metadata_column: ArrowArrayRef = Arc::new(metadata_builder.finish());
    let typed_value_column: ArrowArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));

    let storage_fields = vec![
        Arc::new(Field::new("metadata", DataType::BinaryView, false)),
        Arc::new(Field::new("typed_value", DataType::Int32, false)),
    ];
    let storage_columns = vec![metadata_column, typed_value_column];
    let storage = StructArray::try_new(storage_fields.into(), storage_columns, None)?;

    let arrow_variant = ArrowVariantArray::try_new(&storage)?;
    ParquetVariant::from_arrow_variant(&arrow_variant)
}

/// Fixture: a session for file read/write tests.
///
/// Starts from an empty session, attaches the array, layout and runtime session
/// components, registers the default file encodings, and then registers the
/// `ParquetVariant` encoding as well.
#[fixture]
fn parquet_variant_file_session() -> VortexSession {
    let file_session = VortexSession::empty()
        .with::<ArraySession>()
        .with::<LayoutSession>()
        .with::<RuntimeSession>();

    // Default encodings first, then the ParquetVariant encoding.
    vortex_file::register_default_encodings(&file_session);
    file_session.arrays().register(ParquetVariant);

    file_session
}

/// Fixture: a layout strategy from `WriteStrategyBuilder` that also allows `ParquetVariant`.
///
/// Copies `vortex_file::ALLOWED_ENCODINGS`, adds the `ParquetVariant` encoding id to the
/// copy, and passes the extended set to the builder.
#[fixture]
fn write_strategy() -> Arc<dyn LayoutStrategy> {
    let mut permitted_encodings = vortex_file::ALLOWED_ENCODINGS.clone();
    permitted_encodings.insert(ParquetVariant.id());

    let builder =
        vortex_file::WriteStrategyBuilder::default().with_allow_encodings(permitted_encodings);
    builder.build()
}

#[test]
Expand Down Expand Up @@ -404,42 +445,22 @@ mod tests {
Ok(())
}

#[rstest]
#[tokio::test]
async fn test_file_roundtrip_typed_value_variant() -> VortexResult<()> {
let mut metadata = BinaryViewBuilder::new();
for _ in 0..3 {
metadata.append_value(b"\x01\x00");
}
let metadata: ArrowArrayRef = Arc::new(metadata.finish());
let typed_value: ArrowArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
let arrow_storage = StructArray::try_new(
vec![
Arc::new(Field::new("metadata", DataType::BinaryView, false)),
Arc::new(Field::new("typed_value", DataType::Int32, false)),
]
.into(),
vec![metadata, typed_value],
None,
)?;
let expected =
ParquetVariant::from_arrow_variant(&ArrowVariantArray::try_new(&arrow_storage)?)?;

let session = VortexSession::empty()
.with::<ArraySession>()
.with::<LayoutSession>()
.with::<RuntimeSession>();
vortex_file::register_default_encodings(&session);
session.arrays().register(ParquetVariant);
async fn test_file_roundtrip_typed_value_variant_with_statistics(
#[from(typed_value_variant_array)] expected: VortexResult<ArrayRef>,
parquet_variant_file_session: VortexSession,
) -> VortexResult<()> {
let expected = expected?;

let mut bytes = ByteBufferMut::empty();
session
parquet_variant_file_session
.write_options()
.with_strategy(Arc::new(FlatLayoutStrategy::default()))
.with_file_statistics(Vec::new())
.write(&mut bytes, expected.to_array_stream())
.await?;

let actual = session
let actual = parquet_variant_file_session
.open_options()
.open_buffer(bytes)?
.scan()?
Expand All @@ -451,8 +472,36 @@ mod tests {
Ok(())
}

#[test]
fn test_serde_roundtrip_typed_value_variant() {
/// Writes the typed-value variant fixture to an in-memory file using the `write_strategy`
/// fixture, reads the whole file back through a scan, and checks that the result equals
/// the input.
///
/// NOTE(review): the name says "zoned strategy"; the strategy comes from
/// `WriteStrategyBuilder`, whose layout choice is not visible here. Confirm.
#[rstest]
#[tokio::test]
async fn test_file_roundtrip_typed_value_variant_with_zoned_strategy(
    #[from(typed_value_variant_array)] expected: VortexResult<ArrayRef>,
    parquet_variant_file_session: VortexSession,
    write_strategy: Arc<dyn LayoutStrategy>,
) -> VortexResult<()> {
    // Surface fixture construction errors as a test failure.
    let expected = expected?;

    // Write path: stream the array into an in-memory buffer.
    let mut file_bytes = ByteBufferMut::empty();
    let writer = parquet_variant_file_session
        .write_options()
        .with_strategy(write_strategy);
    writer
        .write(&mut file_bytes, expected.to_array_stream())
        .await?;

    // Read path: open the buffer and materialize the full scan.
    let actual = parquet_variant_file_session
        .open_options()
        .open_buffer(file_bytes)?
        .scan()?
        .into_array_stream()?
        .read_all()
        .await?;

    assert_arrays_eq!(expected, actual);
    Ok(())
}

#[rstest]
fn test_serde_roundtrip_typed_value_variant() -> VortexResult<()> {
let outer_metadata =
VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array();

Expand All @@ -464,48 +513,52 @@ mod tests {
inner_metadata,
Some(inner_value),
None,
)
.unwrap();
let typed_value = VariantArray::try_new(inner_pv.into_array(), None)
.unwrap()
.into_array();
)?;
let typed_value = VariantArray::try_new(inner_pv.into_array(), None)?.into_array();

let outer_pv = ParquetVariant::try_new(
Validity::NonNullable,
outer_metadata,
None,
Some(typed_value),
)
.unwrap();
)?;
let array = outer_pv.into_array();
let decoded = roundtrip(array.clone());
let decoded = roundtrip(array.clone())?;

assert!(array.array_eq(&decoded, Precision::Value));
let decoded_pv = decoded.as_opt::<ParquetVariant>().unwrap();
let typed = decoded_pv.typed_value_array().unwrap();
let decoded_pv = decoded
.as_opt::<ParquetVariant>()
.ok_or_else(|| vortex_err!("expected parquet variant array"))?;
let typed = decoded_pv
.typed_value_array()
.ok_or_else(|| vortex_err!("expected typed_value child"))?;
assert_eq!(typed.dtype(), &DType::Variant(Nullability::NonNullable));
Ok(())
}

/// Serde round-trip of a ParquetVariant array that carries a validity bitmap.
///
/// Builds a three-row array with a `metadata` child and a `value` child (no `typed_value`
/// child) and validity bits `[true, false, true]`, round-trips it through `roundtrip`,
/// and checks that the decoded array equals the input, has a nullable Variant dtype,
/// and still has a `value` child and no `typed_value` child.
///
/// NOTE(review): this span comes from a rendered diff, so changed lines appear twice:
/// the `#[test]` / `.unwrap()` form first, then the `#[rstest]` / `?` form.
#[test]
fn test_serde_roundtrip_with_nullable_validity() {
#[rstest]
fn test_serde_roundtrip_with_nullable_validity() -> VortexResult<()> {
let metadata =
VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array();
let value = VarBinViewArray::from_iter_bin([b"\x10", b"\x11", b"\x12"]).into_array();
// Validity bits for the three rows: true, false, true.
let validity = Validity::from(BitBuffer::from_iter([true, false, true]));

// `value` child is supplied, `typed_value` child is not.
let pv = ParquetVariant::try_new(validity, metadata, Some(value), None).unwrap();
let pv = ParquetVariant::try_new(validity, metadata, Some(value), None)?;
let array = pv.into_array();
let decoded = roundtrip(array.clone());
let decoded = roundtrip(array.clone())?;

assert!(array.array_eq(&decoded, Precision::Value));
assert_eq!(decoded.dtype(), &DType::Variant(Nullability::Nullable));
let decoded_pv = decoded.as_opt::<ParquetVariant>().unwrap();
let decoded_pv = decoded
.as_opt::<ParquetVariant>()
.ok_or_else(|| vortex_err!("expected parquet variant array"))?;
// The set of children present must match what was supplied above.
assert!(decoded_pv.value_array().is_some());
assert!(decoded_pv.typed_value_array().is_none());
Ok(())
}

#[test]
fn test_serde_roundtrip_typed_value_int32() {
#[rstest]
fn test_serde_roundtrip_typed_value_int32() -> VortexResult<()> {
let outer_metadata =
VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array();
let typed_value = buffer![10i32, 20, 30].into_array();
Expand All @@ -515,17 +568,21 @@ mod tests {
outer_metadata,
None,
Some(typed_value),
)
.unwrap();
)?;
let array = outer_pv.into_array();
let decoded = roundtrip(array.clone());
let decoded = roundtrip(array.clone())?;

assert!(array.array_eq(&decoded, Precision::Value));
let decoded_pv = decoded.as_opt::<ParquetVariant>().unwrap();
let typed = decoded_pv.typed_value_array().unwrap();
let decoded_pv = decoded
.as_opt::<ParquetVariant>()
.ok_or_else(|| vortex_err!("expected parquet variant array"))?;
let typed = decoded_pv
.typed_value_array()
.ok_or_else(|| vortex_err!("expected typed_value child"))?;
assert_eq!(
typed.dtype(),
&DType::Primitive(PType::I32, Nullability::NonNullable)
);
Ok(())
}
}
Loading
Loading