diff --git a/vortex-row/public-api.lock b/vortex-row/public-api.lock index 4990e30ba16..1afc1f05442 100644 --- a/vortex-row/public-api.lock +++ b/vortex-row/public-api.lock @@ -92,6 +92,8 @@ pub fn vortex_row::codec::encode_scalar_null(vortex_row::options::SortField, boo pub fn vortex_row::codec::encode_scalar_primitive(vortex_array::dtype::ptype::PType, vortex_array::scalar::typed_view::primitive::pvalue::PValue, vortex_row::options::SortField, bool, &mut vortex_buffer::ByteBufferMut) -> vortex_error::VortexResult<()> +pub fn vortex_row::codec::encode_scalar_varlen(core::option::Option<&[u8]>, vortex_row::options::SortField, &mut vortex_buffer::ByteBufferMut) + pub fn vortex_row::codec::encoded_size_for_scalar(&vortex_array::scalar::Scalar, vortex_row::options::SortField) -> vortex_error::VortexResult pub fn vortex_row::codec::field_encode(&vortex_array::canonical::Canonical, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()> diff --git a/vortex-row/src/codec.rs b/vortex-row/src/codec.rs index 73aa7a37db4..4f70d80e5ae 100644 --- a/vortex-row/src/codec.rs +++ b/vortex-row/src/codec.rs @@ -27,10 +27,12 @@ use vortex_array::Canonical; use vortex_array::ExecutionCtx; +use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::BoolArray; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::NullArray; use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinViewArray; use vortex_array::dtype::DType; use vortex_array::dtype::DecimalType; use vortex_array::dtype::NativePType; @@ -53,10 +55,6 @@ pub const VARLEN_BLOCK_TOTAL: usize = VARLEN_BLOCK_SIZE + 1; /// Returns the size in bytes of the encoded form of a variable-length value of the given length. #[inline] -#[allow( - dead_code, - reason = "used once varlen support lands in a follow-up commit" -)] fn encoded_size_for_varlen(len: usize) -> u32 { // 1 sentinel + ceil(len/32)*33 content bytes (or 1 zero terminator if empty) if len == 0 { @@ -113,9 +111,7 @@ pub fn row_width_for_dtype(dtype: &DType) -> VortexResult { vt.byte_width() as u32 ))) } - DType::Utf8(_) | DType::Binary(_) => { - vortex_bail!("row encoding for {} is not yet supported", dtype) - } + DType::Utf8(_) | DType::Binary(_) => Ok(RowWidth::Variable), DType::Struct(..) | DType::FixedSizeList(..) | DType::List(..) | DType::Extension(..) => { vortex_bail!("row encoding for {} is not yet supported", dtype) } @@ -139,15 +135,15 @@ pub fn field_size( canonical: &Canonical, _field: SortField, sizes: &mut [u32], - _ctx: &mut ExecutionCtx, + ctx: &mut ExecutionCtx, ) -> VortexResult<()> { match canonical { Canonical::Null(arr) => add_size_null(arr, sizes), Canonical::Bool(_) => add_size_const(sizes, encoded_size_for_fixed(1)), Canonical::Primitive(arr) => add_size_primitive(arr, sizes), Canonical::Decimal(arr) => add_size_decimal(arr, sizes), - Canonical::VarBinView(_) - | Canonical::Struct(_) + Canonical::VarBinView(arr) => add_size_varbinview(arr, sizes, ctx)?, + Canonical::Struct(_) | Canonical::FixedSizeList(_) | Canonical::Extension(_) | Canonical::List(_) => vortex_bail!( @@ -180,8 +176,8 @@ pub fn field_encode( Canonical::Bool(arr) => encode_bool(arr, field, offsets, cursors, out, ctx)?, Canonical::Primitive(arr) => encode_primitive(arr, field, offsets, cursors, out, ctx)?, Canonical::Decimal(arr) => encode_decimal(arr, field, offsets, cursors, out, ctx)?, - Canonical::VarBinView(_) - | Canonical::Struct(_) + Canonical::VarBinView(arr) => encode_varbinview(arr, field, offsets, cursors, out, ctx)?, + Canonical::Struct(_) | Canonical::FixedSizeList(_) | Canonical::Extension(_) | Canonical::List(_) => vortex_bail!( @@ -219,6 +215,25 @@ fn add_size_decimal(arr: &DecimalArray, sizes: &mut [u32]) { add_size_const(sizes, encoded_size_for_fixed(width)); } +fn add_size_varbinview( + arr: &VarBinViewArray, + sizes: &mut [u32], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let mask = arr.as_ref().validity()?.execute_mask(arr.len(), ctx)?; + let views = arr.views(); + for (i, view) in views.iter().enumerate() { + let valid = mask.value(i); + if !valid { + sizes[i] += 1; // sentinel only + } else { + let len = view.len() as usize; + sizes[i] += encoded_size_for_varlen(len); + } + } + Ok(()) +} + fn encode_null( arr: &NullArray, field: SortField, @@ -369,6 +384,69 @@ fn encode_decimal_typed( } } +fn encode_varbinview( + arr: &VarBinViewArray, + field: SortField, + row_offsets: &[u32], + col_offset: &mut [u32], + out: &mut [u8], + ctx: &mut ExecutionCtx, +) -> VortexResult<()> { + let mask = arr.as_ref().validity()?.execute_mask(arr.len(), ctx)?; + let non_null = field.non_null_sentinel(); + let null = field.null_sentinel(); + + arr.with_iterator(|iter| { + for (i, maybe) in iter.enumerate() { + let pos = (row_offsets[i] + col_offset[i]) as usize; + if !mask.value(i) { + out[pos] = null; + col_offset[i] += 1; + continue; + } + let bytes: &[u8] = maybe.unwrap_or(&[]); + out[pos] = non_null; + let written = encode_varlen_value(bytes, &mut out[pos + 1..], field.descending); + col_offset[i] += 1 + written; + } + }); + Ok(()) +} + +/// Encode a variable-length byte slice into `out` in 32-byte blocks with +/// continuation markers. Returns the number of bytes written. +fn encode_varlen_value(bytes: &[u8], out: &mut [u8], descending: bool) -> u32 { + let xor = if descending { 0xFFu8 } else { 0x00 }; + if bytes.is_empty() { + // Single zero terminator. + out[0] = xor; + return 1; + } + let mut written = 0usize; + let mut remaining = bytes; + while remaining.len() > VARLEN_BLOCK_SIZE { + // Full block, continuation marker 0xFF (then XORed if descending). + let block = &remaining[..VARLEN_BLOCK_SIZE]; + for (i, &b) in block.iter().enumerate() { + out[written + i] = b ^ xor; + } + out[written + VARLEN_BLOCK_SIZE] = 0xFF ^ xor; + written += VARLEN_BLOCK_TOTAL; + remaining = &remaining[VARLEN_BLOCK_SIZE..]; + } + // Final partial block: pad with zeros, last byte = remaining.len() (1..=32). + let n = remaining.len(); + for (i, &b) in remaining.iter().enumerate() { + out[written + i] = b ^ xor; + } + for j in n..VARLEN_BLOCK_SIZE { + out[written + j] = xor; + } + out[written + VARLEN_BLOCK_SIZE] = (n as u8) ^ xor; + written += VARLEN_BLOCK_TOTAL; + written as u32 +} + /// Internal trait for encoding a fixed-width native value into byte slots. /// /// Implementations must produce a sequence of `size_of::()` bytes that is @@ -507,6 +585,27 @@ pub fn encode_scalar_primitive( Ok(()) } +/// Encode a single varlen value into a buffer. +pub fn encode_scalar_varlen(value: Option<&[u8]>, field: SortField, out: &mut ByteBufferMut) { + match value { + None => out.push(field.null_sentinel()), + Some(bytes) => { + out.push(field.non_null_sentinel()); + let needed = if bytes.is_empty() { + 1 + } else { + bytes.len().div_ceil(VARLEN_BLOCK_SIZE) * VARLEN_BLOCK_TOTAL + }; + let start = out.len(); + for _ in 0..needed { + out.push(0); + } + let written = encode_varlen_value(bytes, &mut out[start..], field.descending); + debug_assert_eq!(written as usize, needed); + } + } +} + /// Encode a single boolean value. pub fn encode_scalar_bool(value: Option, field: SortField, out: &mut ByteBufferMut) { match value { @@ -546,6 +645,7 @@ pub fn encoded_size_for_scalar( let vt = DecimalType::smallest_decimal_value_type(dt); Ok(encoded_size_for_fixed(vt.byte_width() as u32)) } + DType::Utf8(_) | DType::Binary(_) => Ok(1), _ => vortex_bail!( "unsupported scalar dtype for row encoding: {}", scalar.dtype() @@ -564,6 +664,18 @@ pub fn encoded_size_for_scalar( .unwrap_or(DecimalType::I128); Ok(encoded_size_for_fixed(vt.byte_width() as u32)) } + DType::Utf8(_) => { + let bs = scalar + .as_utf8() + .value() + .map(|s| s.as_str().len()) + .unwrap_or(0); + Ok(encoded_size_for_varlen(bs)) + } + DType::Binary(_) => { + let bs = scalar.as_binary().value().map(|b| b.len()).unwrap_or(0); + Ok(encoded_size_for_varlen(bs)) + } _ => vortex_bail!( "unsupported scalar dtype for row encoding: {}", scalar.dtype() @@ -601,6 +713,7 @@ pub fn encode_scalar( out.push(0); } } + DType::Utf8(_) | DType::Binary(_) => out.push(field.null_sentinel()), _ => vortex_bail!( "unsupported scalar dtype for row encoding: {}", scalar.dtype() @@ -657,6 +770,16 @@ pub fn encode_scalar( } } } + DType::Utf8(_) => { + let v = scalar.as_utf8(); + let bytes = v.value().map(|s| s.as_str().as_bytes()).unwrap_or(&[]); + encode_scalar_varlen(Some(bytes), field, &mut out); + } + DType::Binary(_) => { + let v = scalar.as_binary(); + let bytes = v.value().map(|b| b.as_slice()).unwrap_or(&[]); + encode_scalar_varlen(Some(bytes), field, &mut out); + } _ => vortex_bail!( "unsupported scalar dtype for row encoding: {}", scalar.dtype()