Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 216 additions & 11 deletions vortex-row/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ use vortex_array::ExecutionCtx;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::DecimalArray;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::FixedSizeListArray;
use vortex_array::arrays::NullArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::extension::ExtensionArrayExt;
use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
use vortex_array::arrays::struct_::StructArrayExt;
use vortex_array::dtype::DType;
use vortex_array::dtype::DecimalType;
use vortex_array::dtype::NativePType;
Expand Down Expand Up @@ -112,9 +118,28 @@ pub fn row_width_for_dtype(dtype: &DType) -> VortexResult<RowWidth> {
)))
}
DType::Utf8(_) | DType::Binary(_) => Ok(RowWidth::Variable),
DType::Struct(..) | DType::FixedSizeList(..) | DType::List(..) | DType::Extension(..) => {
vortex_bail!("row encoding for {} is not yet supported", dtype)
DType::FixedSizeList(elem, n, _) => match row_width_for_dtype(elem)? {
// FSL is fixed iff its element type is fixed. Add a sentinel byte for the FSL
// itself, then `n` copies of the element width.
RowWidth::Fixed(w) => {
let body = w.saturating_mul(*n);
Ok(RowWidth::Fixed(body.saturating_add(1)))
}
RowWidth::Variable => Ok(RowWidth::Variable),
},
DType::Struct(fields, _) => {
// Struct is fixed iff all its fields are fixed; sum their widths plus a sentinel.
let mut total: u32 = 1; // outer sentinel
for field_dtype in fields.fields() {
match row_width_for_dtype(&field_dtype)? {
RowWidth::Fixed(w) => total = total.saturating_add(w),
RowWidth::Variable => return Ok(RowWidth::Variable),
}
}
Ok(RowWidth::Fixed(total))
}
DType::List(..) => Ok(RowWidth::Variable),
DType::Extension(ext) => row_width_for_dtype(ext.storage_dtype()),
DType::Variant(_) => {
vortex_bail!("row encoding does not support Variant arrays (no well-defined ordering)")
}
Expand All @@ -133,7 +158,7 @@ pub fn row_width_for_dtype(dtype: &DType) -> VortexResult<RowWidth> {
/// variants land in later commits.
pub fn field_size(
canonical: &Canonical,
_field: SortField,
field: SortField,
sizes: &mut [u32],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
Expand All @@ -143,10 +168,10 @@ pub fn field_size(
Canonical::Primitive(arr) => add_size_primitive(arr, sizes),
Canonical::Decimal(arr) => add_size_decimal(arr, sizes),
Canonical::VarBinView(arr) => add_size_varbinview(arr, sizes, ctx)?,
Canonical::Struct(_)
| Canonical::FixedSizeList(_)
| Canonical::Extension(_)
| Canonical::List(_) => vortex_bail!(
Canonical::Struct(arr) => add_size_struct(arr, field, sizes, ctx)?,
Canonical::FixedSizeList(arr) => add_size_fsl(arr, field, sizes, ctx)?,
Canonical::Extension(arr) => add_size_extension(arr, field, sizes, ctx)?,
Canonical::List(_) => vortex_bail!(
"row encoding does not yet support canonical type {:?}",
canonical.dtype()
),
Expand Down Expand Up @@ -177,10 +202,10 @@ pub fn field_encode(
Canonical::Primitive(arr) => encode_primitive(arr, field, offsets, cursors, out, ctx)?,
Canonical::Decimal(arr) => encode_decimal(arr, field, offsets, cursors, out, ctx)?,
Canonical::VarBinView(arr) => encode_varbinview(arr, field, offsets, cursors, out, ctx)?,
Canonical::Struct(_)
| Canonical::FixedSizeList(_)
| Canonical::Extension(_)
| Canonical::List(_) => vortex_bail!(
Canonical::Struct(arr) => encode_struct(arr, field, offsets, cursors, out, ctx)?,
Canonical::FixedSizeList(arr) => encode_fsl(arr, field, offsets, cursors, out, ctx)?,
Canonical::Extension(arr) => encode_extension(arr, field, offsets, cursors, out, ctx)?,
Canonical::List(_) => vortex_bail!(
"row encoding does not yet support canonical type {:?}",
canonical.dtype()
),
Expand Down Expand Up @@ -234,6 +259,60 @@ fn add_size_varbinview(
Ok(())
}

fn add_size_struct(
arr: &StructArray,
field: SortField,
sizes: &mut [u32],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
// null sentinel: 1 byte per row.
for s in sizes.iter_mut() {
*s += 1;
}
// Each field adds its own per-row size.
for child in arr.iter_unmasked_fields() {
let canonical = child.clone().execute::<Canonical>(ctx)?;
field_size(&canonical, field, sizes, ctx)?;
}
Ok(())
}

fn add_size_fsl(
arr: &FixedSizeListArray,
field: SortField,
sizes: &mut [u32],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let n = arr.len();
debug_assert_eq!(n, sizes.len());
let list_size = arr.list_size() as usize;
let elements = arr.elements().clone().execute::<Canonical>(ctx)?;
debug_assert_eq!(elements.len(), n * list_size);
// Sizing: 1 sentinel + sum of element sizes (`list_size` per row).
// We compute element-wise sizes into a contiguous scratch buffer then reduce by row.
let mut elem_sizes = vec![0u32; n * list_size];
field_size(&elements, field, &mut elem_sizes, ctx)?;
for i in 0..n {
let mut sum: u32 = 1; // sentinel
let base = i * list_size;
for j in 0..list_size {
sum = sum.saturating_add(elem_sizes[base + j]);
}
sizes[i] += sum;
}
Ok(())
}

fn add_size_extension(
arr: &ExtensionArray,
field: SortField,
sizes: &mut [u32],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let storage = arr.storage_array().clone().execute::<Canonical>(ctx)?;
field_size(&storage, field, sizes, ctx)
}

fn encode_null(
arr: &NullArray,
field: SortField,
Expand Down Expand Up @@ -413,6 +492,132 @@ fn encode_varbinview(
Ok(())
}

fn encode_struct(
arr: &StructArray,
field: SortField,
row_offsets: &[u32],
col_offset: &mut [u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let n = arr.len();
let mask = arr.as_ref().validity()?.execute_mask(n, ctx)?;
let non_null = field.non_null_sentinel();
let null = field.null_sentinel();

// First, write the sentinel for each row. We track the post-sentinel cursor offsets
// for the body in `body_cursors` (which start exactly at +1 of the input cursor).
// For null rows we additionally need to zero-fill the (uniform-width) field bytes,
// but because struct widths are variable in general, we record null indexes first
// and zero-fill after we know each row's contribution.
//
// To keep the implementation simple we:
// 1) advance the cursor past the sentinel,
// 2) recursively encode each field's bytes (the field encoders ignore nullness of
// the struct, but use their own per-field nullness),
// 3) for null struct rows, overwrite the body bytes with zeros so the encoded form
// depends only on the sentinel.
let body_start: Vec<u32> = (0..n).map(|i| col_offset[i] + 1).collect();
for i in 0..n {
let pos = (row_offsets[i] + col_offset[i]) as usize;
out[pos] = if mask.value(i) { non_null } else { null };
col_offset[i] += 1;
}

for child in arr.iter_unmasked_fields() {
let canonical = child.clone().execute::<Canonical>(ctx)?;
field_encode(&canonical, field, row_offsets, col_offset, out, ctx)?;
}

// Zero-fill body bytes of null rows (the field encoders may have written values).
for i in 0..n {
if !mask.value(i) {
let start = (row_offsets[i] + body_start[i]) as usize;
let end = (row_offsets[i] + col_offset[i]) as usize;
for b in &mut out[start..end] {
*b = 0;
}
}
}

Ok(())
}

fn encode_fsl(
arr: &FixedSizeListArray,
field: SortField,
row_offsets: &[u32],
col_offset: &mut [u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let n = arr.len();
let list_size = arr.list_size() as usize;
let mask = arr.as_ref().validity()?.execute_mask(n, ctx)?;
let non_null = field.non_null_sentinel();
let null = field.null_sentinel();
let elements = arr.elements().clone().execute::<Canonical>(ctx)?;
debug_assert_eq!(elements.len(), n * list_size);

// Write sentinels and remember body start for null zero-fill.
let body_start: Vec<u32> = (0..n).map(|i| col_offset[i] + 1).collect();
for i in 0..n {
let pos = (row_offsets[i] + col_offset[i]) as usize;
out[pos] = if mask.value(i) { non_null } else { null };
col_offset[i] += 1;
}

// Encode all `n * list_size` elements into the body. Build a fresh
// (offsets, cursors) pair where each element gets one slot. Then sum bytes back
// into the parent col_offset.
let mut elem_sizes = vec![0u32; n * list_size];
field_size(&elements, field, &mut elem_sizes, ctx)?;
// Element offsets are sequential starting at each parent's current cursor position.
let mut elem_offsets = vec![0u32; n * list_size];
for i in 0..n {
let mut acc = row_offsets[i] + col_offset[i];
for j in 0..list_size {
elem_offsets[i * list_size + j] = acc;
acc = acc.saturating_add(elem_sizes[i * list_size + j]);
}
}
let mut elem_cursors = vec![0u32; n * list_size];
field_encode(&elements, field, &elem_offsets, &mut elem_cursors, out, ctx)?;
// Advance the parent cursors by the total per-row element bytes.
for i in 0..n {
let mut sum: u32 = 0;
for j in 0..list_size {
sum = sum.saturating_add(elem_sizes[i * list_size + j]);
}
col_offset[i] = col_offset[i].saturating_add(sum);
}

// Zero-fill null bodies.
for i in 0..n {
if !mask.value(i) {
let start = (row_offsets[i] + body_start[i]) as usize;
let end = (row_offsets[i] + col_offset[i]) as usize;
for b in &mut out[start..end] {
*b = 0;
}
}
}

Ok(())
}

fn encode_extension(
arr: &ExtensionArray,
field: SortField,
row_offsets: &[u32],
col_offset: &mut [u32],
out: &mut [u8],
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
let storage = arr.storage_array().clone().execute::<Canonical>(ctx)?;
field_encode(&storage, field, row_offsets, col_offset, out, ctx)
}

/// Encode a variable-length byte slice into `out` in 32-byte blocks with
/// continuation markers. Returns the number of bytes written.
fn encode_varlen_value(bytes: &[u8], out: &mut [u8], descending: bool) -> u32 {
Expand Down
Loading