From 570d358939f4d8e7919ea377773c0cfdd6637fe8 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 17 May 2026 22:08:07 +0000 Subject: [PATCH] vortex-row: codec for nested canonical types Extend the codec to handle Struct, FixedSizeList, and Extension canonical variants. Each nested row encodes as `outer_sentinel | child bytes...`; for null rows the child bytes are zero-filled after the recursive encoders run so two null rows compare equal regardless of which non-null values would have been written by the children. `row_width_for_dtype` recurses through Struct fields and FSL elements to return `Fixed(w)` when every leaf is fixed; otherwise `Variable`. Extension delegates to its storage dtype. List remains `Variable` and ListView still bails (the row encoder's output is itself a ListView, so nested ListView isn't a near-term use case). Variant and Union bail explicitly. Signed-off-by: Claude --- vortex-row/src/codec.rs | 227 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 216 insertions(+), 11 deletions(-) diff --git a/vortex-row/src/codec.rs b/vortex-row/src/codec.rs index 4f70d80e5ae..8468301e5b3 100644 --- a/vortex-row/src/codec.rs +++ b/vortex-row/src/codec.rs @@ -30,9 +30,15 @@ use vortex_array::ExecutionCtx; use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::FixedSizeListArray; use vortex_array::arrays::NullArray; use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::StructArray; use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt; +use vortex_array::arrays::struct_::StructArrayExt; use vortex_array::dtype::DType; use vortex_array::dtype::DecimalType; use vortex_array::dtype::NativePType; @@ -112,9 +118,28 @@ pub fn row_width_for_dtype(dtype: &DType) -> VortexResult { ))) } DType::Utf8(_) | DType::Binary(_) => Ok(RowWidth::Variable), - DType::Struct(..) | DType::FixedSizeList(..) | DType::List(..) | DType::Extension(..) => { - vortex_bail!("row encoding for {} is not yet supported", dtype) + DType::FixedSizeList(elem, n, _) => match row_width_for_dtype(elem)? { + // FSL is fixed iff its element type is fixed. Add a sentinel byte for the FSL + // itself, then `n` copies of the element width. + RowWidth::Fixed(w) => { + let body = w.saturating_mul(*n); + Ok(RowWidth::Fixed(body.saturating_add(1))) + } + RowWidth::Variable => Ok(RowWidth::Variable), + }, + DType::Struct(fields, _) => { + // Struct is fixed iff all its fields are fixed; sum their widths plus a sentinel. + let mut total: u32 = 1; // outer sentinel + for field_dtype in fields.fields() { + match row_width_for_dtype(&field_dtype)? { + RowWidth::Fixed(w) => total = total.saturating_add(w), + RowWidth::Variable => return Ok(RowWidth::Variable), + } + } + Ok(RowWidth::Fixed(total)) } + DType::List(..) => Ok(RowWidth::Variable), + DType::Extension(ext) => row_width_for_dtype(ext.storage_dtype()), DType::Variant(_) => { vortex_bail!("row encoding does not support Variant arrays (no well-defined ordering)") } @@ -133,7 +158,7 @@ pub fn row_width_for_dtype(dtype: &DType) -> VortexResult { /// variants land in later commits. pub fn field_size( canonical: &Canonical, - _field: SortField, + field: SortField, sizes: &mut [u32], ctx: &mut ExecutionCtx, ) -> VortexResult<()> { @@ -143,10 +168,10 @@ pub fn field_size( Canonical::Primitive(arr) => add_size_primitive(arr, sizes), Canonical::Decimal(arr) => add_size_decimal(arr, sizes), Canonical::VarBinView(arr) => add_size_varbinview(arr, sizes, ctx)?, - Canonical::Struct(_) - | Canonical::FixedSizeList(_) - | Canonical::Extension(_) - | Canonical::List(_) => vortex_bail!( + Canonical::Struct(arr) => add_size_struct(arr, field, sizes, ctx)?, + Canonical::FixedSizeList(arr) => add_size_fsl(arr, field, sizes, ctx)?, + Canonical::Extension(arr) => add_size_extension(arr, field, sizes, ctx)?, + Canonical::List(_) => vortex_bail!( "row encoding does not yet support canonical type {:?}", canonical.dtype() ), @@ -177,10 +202,10 @@ pub fn field_encode( Canonical::Primitive(arr) => encode_primitive(arr, field, offsets, cursors, out, ctx)?, Canonical::Decimal(arr) => encode_decimal(arr, field, offsets, cursors, out, ctx)?, Canonical::VarBinView(arr) => encode_varbinview(arr, field, offsets, cursors, out, ctx)?, - Canonical::Struct(_) - | Canonical::FixedSizeList(_) - | Canonical::Extension(_) - | Canonical::List(_) => vortex_bail!( + Canonical::Struct(arr) => encode_struct(arr, field, offsets, cursors, out, ctx)?, + Canonical::FixedSizeList(arr) => encode_fsl(arr, field, offsets, cursors, out, ctx)?, + Canonical::Extension(arr) => encode_extension(arr, field, offsets, cursors, out, ctx)?, + Canonical::List(_) => vortex_bail!( "row encoding does not yet support canonical type {:?}", canonical.dtype() ), @@ -234,6 +259,60 @@ fn add_size_varbinview( Ok(()) } +fn add_size_struct( + arr: &StructArray, + field: SortField, + sizes: &mut [u32], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + // null sentinel: 1 byte per row. + for s in sizes.iter_mut() { + *s += 1; + } + // Each field adds its own per-row size. + for child in arr.iter_unmasked_fields() { + let canonical = child.clone().execute::(ctx)?; + field_size(&canonical, field, sizes, ctx)?; + } + Ok(()) +} + +fn add_size_fsl( + arr: &FixedSizeListArray, + field: SortField, + sizes: &mut [u32], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let n = arr.len(); + debug_assert_eq!(n, sizes.len()); + let list_size = arr.list_size() as usize; + let elements = arr.elements().clone().execute::(ctx)?; + debug_assert_eq!(elements.len(), n * list_size); + // Sizing: 1 sentinel + sum of element sizes (`list_size` per row). + // We compute element-wise sizes into a contiguous scratch buffer then reduce by row. + let mut elem_sizes = vec![0u32; n * list_size]; + field_size(&elements, field, &mut elem_sizes, ctx)?; + for i in 0..n { + let mut sum: u32 = 1; // sentinel + let base = i * list_size; + for j in 0..list_size { + sum = sum.saturating_add(elem_sizes[base + j]); + } + sizes[i] += sum; + } + Ok(()) +} + +fn add_size_extension( + arr: &ExtensionArray, + field: SortField, + sizes: &mut [u32], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let storage = arr.storage_array().clone().execute::(ctx)?; + field_size(&storage, field, sizes, ctx) +} + fn encode_null( arr: &NullArray, field: SortField, @@ -413,6 +492,132 @@ fn encode_varbinview( Ok(()) } +fn encode_struct( + arr: &StructArray, + field: SortField, + row_offsets: &[u32], + col_offset: &mut [u32], + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let n = arr.len(); + let mask = arr.as_ref().validity()?.execute_mask(n, ctx)?; + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + + // First, write the sentinel for each row. We track the post-sentinel cursor offsets + // for the body in `body_cursors` (which start exactly at +1 of the input cursor). + // For null rows we additionally need to zero-fill the (uniform-width) field bytes, + // but because struct widths are variable in general, we record null indexes first + // and zero-fill after we know each row's contribution. + // + // To keep the implementation simple we: + // 1) advance the cursor past the sentinel, + // 2) recursively encode each field's bytes (the field encoders ignore nullness of + // the struct, but use their own per-field nullness), + // 3) for null struct rows, overwrite the body bytes with zeros so the encoded form + // depends only on the sentinel. + let body_start: Vec = (0..n).map(|i| col_offset[i] + 1).collect(); + for i in 0..n { + let pos = (row_offsets[i] + col_offset[i]) as usize; + out[pos] = if mask.value(i) { non_null } else { null }; + col_offset[i] += 1; + } + + for child in arr.iter_unmasked_fields() { + let canonical = child.clone().execute::(ctx)?; + field_encode(&canonical, field, row_offsets, col_offset, out, ctx)?; + } + + // Zero-fill body bytes of null rows (the field encoders may have written values). + for i in 0..n { + if !mask.value(i) { + let start = (row_offsets[i] + body_start[i]) as usize; + let end = (row_offsets[i] + col_offset[i]) as usize; + for b in &mut out[start..end] { + *b = 0; + } + } + } + + Ok(()) +} + +fn encode_fsl( + arr: &FixedSizeListArray, + field: SortField, + row_offsets: &[u32], + col_offset: &mut [u32], + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let n = arr.len(); + let list_size = arr.list_size() as usize; + let mask = arr.as_ref().validity()?.execute_mask(n, ctx)?; + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + let elements = arr.elements().clone().execute::(ctx)?; + debug_assert_eq!(elements.len(), n * list_size); + + // Write sentinels and remember body start for null zero-fill. + let body_start: Vec = (0..n).map(|i| col_offset[i] + 1).collect(); + for i in 0..n { + let pos = (row_offsets[i] + col_offset[i]) as usize; + out[pos] = if mask.value(i) { non_null } else { null }; + col_offset[i] += 1; + } + + // Encode all `n * list_size` elements into the body. Build a fresh + // (offsets, cursors) pair where each element gets one slot. Then sum bytes back + // into the parent col_offset. + let mut elem_sizes = vec![0u32; n * list_size]; + field_size(&elements, field, &mut elem_sizes, ctx)?; + // Element offsets are sequential starting at each parent's current cursor position. + let mut elem_offsets = vec![0u32; n * list_size]; + for i in 0..n { + let mut acc = row_offsets[i] + col_offset[i]; + for j in 0..list_size { + elem_offsets[i * list_size + j] = acc; + acc = acc.saturating_add(elem_sizes[i * list_size + j]); + } + } + let mut elem_cursors = vec![0u32; n * list_size]; + field_encode(&elements, field, &elem_offsets, &mut elem_cursors, out, ctx)?; + // Advance the parent cursors by the total per-row element bytes. + for i in 0..n { + let mut sum: u32 = 0; + for j in 0..list_size { + sum = sum.saturating_add(elem_sizes[i * list_size + j]); + } + col_offset[i] = col_offset[i].saturating_add(sum); + } + + // Zero-fill null bodies. + for i in 0..n { + if !mask.value(i) { + let start = (row_offsets[i] + body_start[i]) as usize; + let end = (row_offsets[i] + col_offset[i]) as usize; + for b in &mut out[start..end] { + *b = 0; + } + } + } + + Ok(()) +} + +fn encode_extension( + arr: &ExtensionArray, + field: SortField, + row_offsets: &[u32], + col_offset: &mut [u32], + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let storage = arr.storage_array().clone().execute::(ctx)?; + field_encode(&storage, field, row_offsets, col_offset, out, ctx) +} + /// Encode a variable-length byte slice into `out` in 32-byte blocks with /// continuation markers. Returns the number of bytes written. fn encode_varlen_value(bytes: &[u8], out: &mut [u8], descending: bool) -> u32 {