Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vortex-row/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub fn vortex_row::codec::encode_scalar_null(vortex_row::options::SortField, boo

pub fn vortex_row::codec::encode_scalar_primitive(vortex_array::dtype::ptype::PType, vortex_array::scalar::typed_view::primitive::pvalue::PValue, vortex_row::options::SortField, bool, &mut vortex_buffer::ByteBufferMut) -> vortex_error::VortexResult<()>

pub fn vortex_row::codec::encode_scalar_varlen(core::option::Option<&[u8]>, vortex_row::options::SortField, &mut vortex_buffer::ByteBufferMut)

pub fn vortex_row::codec::encoded_size_for_scalar(&vortex_array::scalar::Scalar, vortex_row::options::SortField) -> vortex_error::VortexResult<u32>

pub fn vortex_row::codec::field_encode(&vortex_array::canonical::Canonical, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()>
Expand Down
147 changes: 135 additions & 12 deletions vortex-row/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@

use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::DecimalArray;
use vortex_array::arrays::NullArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::dtype::DType;
use vortex_array::dtype::DecimalType;
use vortex_array::dtype::NativePType;
Expand All @@ -53,10 +55,6 @@ pub const VARLEN_BLOCK_TOTAL: usize = VARLEN_BLOCK_SIZE + 1;

/// Returns the size in bytes of the encoded form of a variable-length value of the given length.
#[inline]
#[allow(
dead_code,
reason = "used once varlen support lands in a follow-up commit"
)]
fn encoded_size_for_varlen(len: usize) -> u32 {
// 1 sentinel + ceil(len/32)*33 content bytes (or 1 zero terminator if empty)
if len == 0 {
Expand Down Expand Up @@ -113,9 +111,7 @@ pub fn row_width_for_dtype(dtype: &DType) -> VortexResult<RowWidth> {
vt.byte_width() as u32
)))
}
DType::Utf8(_) | DType::Binary(_) => {
vortex_bail!("row encoding for {} is not yet supported", dtype)
}
DType::Utf8(_) | DType::Binary(_) => Ok(RowWidth::Variable),
DType::Struct(..) | DType::FixedSizeList(..) | DType::List(..) | DType::Extension(..) => {
vortex_bail!("row encoding for {} is not yet supported", dtype)
}
Expand All @@ -139,15 +135,15 @@ pub fn field_size(
canonical: &Canonical,
_field: SortField,
sizes: &mut [u32],
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
match canonical {
Canonical::Null(arr) => add_size_null(arr, sizes),
Canonical::Bool(_) => add_size_const(sizes, encoded_size_for_fixed(1)),
Canonical::Primitive(arr) => add_size_primitive(arr, sizes),
Canonical::Decimal(arr) => add_size_decimal(arr, sizes),
Canonical::VarBinView(_)
| Canonical::Struct(_)
Canonical::VarBinView(arr) => add_size_varbinview(arr, sizes, ctx)?,
Canonical::Struct(_)
| Canonical::FixedSizeList(_)
| Canonical::Extension(_)
| Canonical::List(_) => vortex_bail!(
Expand Down Expand Up @@ -180,8 +176,8 @@ pub fn field_encode(
Canonical::Bool(arr) => encode_bool(arr, field, offsets, cursors, out, ctx)?,
Canonical::Primitive(arr) => encode_primitive(arr, field, offsets, cursors, out, ctx)?,
Canonical::Decimal(arr) => encode_decimal(arr, field, offsets, cursors, out, ctx)?,
Canonical::VarBinView(_)
| Canonical::Struct(_)
Canonical::VarBinView(arr) => encode_varbinview(arr, field, offsets, cursors, out, ctx)?,
Canonical::Struct(_)
| Canonical::FixedSizeList(_)
| Canonical::Extension(_)
| Canonical::List(_) => vortex_bail!(
Expand Down Expand Up @@ -219,6 +215,25 @@ fn add_size_decimal(arr: &DecimalArray, sizes: &mut [u32]) {
add_size_const(sizes, encoded_size_for_fixed(width));
}

/// Accumulates the encoded width of every row of a variable-length column into `sizes`.
///
/// A null row contributes a single byte (the sentinel alone); a valid row contributes
/// `encoded_size_for_varlen` of its view length.
///
/// # Errors
///
/// Propagates any failure from resolving the array's validity into a mask.
///
/// # Panics
///
/// Panics if `sizes` has fewer entries than the array has views.
fn add_size_varbinview(
    arr: &VarBinViewArray,
    sizes: &mut [u32],
    ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
    let validity = arr.as_ref().validity()?.execute_mask(arr.len(), ctx)?;
    let views = arr.views();
    for (row, view) in views.iter().enumerate() {
        let width = if validity.value(row) {
            encoded_size_for_varlen(view.len() as usize)
        } else {
            // Null rows carry only the sentinel byte.
            1
        };
        sizes[row] += width;
    }
    Ok(())
}

fn encode_null(
arr: &NullArray,
field: SortField,
Expand Down Expand Up @@ -369,6 +384,69 @@ fn encode_decimal_typed<T>(
}
}

/// Writes the row encoding of every value of a variable-length column into `out`.
///
/// For row `i` the write position is `row_offsets[i] + col_offset[i]`. A null row gets
/// the field's null sentinel and advances `col_offset[i]` by one; a valid row gets the
/// non-null sentinel followed by the block encoding produced by `encode_varlen_value`,
/// and `col_offset[i]` advances by the sentinel plus the encoded length.
///
/// # Errors
///
/// Propagates any failure from resolving the array's validity into a mask.
fn encode_varbinview(
    arr: &VarBinViewArray,
    field: SortField,
    row_offsets: &[u32],
    col_offset: &mut [u32],
    out: &mut [u8],
    ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
    let mask = arr.as_ref().validity()?.execute_mask(arr.len(), ctx)?;
    let valid_marker = field.non_null_sentinel();
    let null_marker = field.null_sentinel();

    arr.with_iterator(|values| {
        for (row, value) in values.enumerate() {
            let start = (row_offsets[row] + col_offset[row]) as usize;
            if mask.value(row) {
                // The mask says the row is valid; a missing value is treated as empty.
                let payload: &[u8] = value.unwrap_or(&[]);
                out[start] = valid_marker;
                let body_len =
                    encode_varlen_value(payload, &mut out[start + 1..], field.descending);
                col_offset[row] += 1 + body_len;
            } else {
                out[start] = null_marker;
                col_offset[row] += 1;
            }
        }
    });
    Ok(())
}

/// Block-encodes `bytes` into the front of `out` and returns how many bytes were written.
///
/// Layout:
/// - An empty value is a single terminator byte (`0x00`).
/// - Otherwise the value is split into `VARLEN_BLOCK_SIZE`-byte blocks, each followed by
///   one marker byte: `0xFF` when more blocks follow, or the number of meaningful bytes
///   in the block for the last one. The last block is zero-padded to full width.
///
/// When `descending` is set, every emitted byte (data, padding and markers) is XORed
/// with `0xFF`.
///
/// NOTE(review): an empty value and a value whose first byte is `0x00` share their first
/// encoded byte; if another field follows in the same row this may affect bytewise
/// ordering — confirm against the row comparator.
///
/// # Panics
///
/// Panics if `out` is too small to hold the encoded value.
fn encode_varlen_value(bytes: &[u8], out: &mut [u8], descending: bool) -> u32 {
    let flip: u8 = if descending { 0xFF } else { 0x00 };
    if bytes.is_empty() {
        // Single zero terminator.
        out[0] = flip;
        return 1;
    }

    let block_count = bytes.len().div_ceil(VARLEN_BLOCK_SIZE);
    for (block_idx, chunk) in bytes.chunks(VARLEN_BLOCK_SIZE).enumerate() {
        let base = block_idx * VARLEN_BLOCK_TOTAL;

        // Payload bytes, then zero padding (only the last block can be short).
        for (offset, &byte) in chunk.iter().enumerate() {
            out[base + offset] = byte ^ flip;
        }
        for pad in chunk.len()..VARLEN_BLOCK_SIZE {
            out[base + pad] = flip;
        }

        // Continuation marker for inner blocks, used-length marker for the last block.
        let marker = if block_idx + 1 < block_count {
            0xFF
        } else {
            chunk.len() as u8
        };
        out[base + VARLEN_BLOCK_SIZE] = marker ^ flip;
    }

    (block_count * VARLEN_BLOCK_TOTAL) as u32
}

/// Internal trait for encoding a fixed-width native value into byte slots.
///
/// Implementations must produce a sequence of `size_of::<Self>()` bytes that is
Expand Down Expand Up @@ -507,6 +585,27 @@ pub fn encode_scalar_primitive(
Ok(())
}

/// Appends the row encoding of one variable-length value to `out`.
///
/// `None` appends only the field's null sentinel. `Some(bytes)` appends the non-null
/// sentinel followed by the block encoding of `bytes`: a single byte when `bytes` is
/// empty, otherwise one `VARLEN_BLOCK_TOTAL`-byte block per started
/// `VARLEN_BLOCK_SIZE` bytes of input.
pub fn encode_scalar_varlen(value: Option<&[u8]>, field: SortField, out: &mut ByteBufferMut) {
    let Some(payload) = value else {
        out.push(field.null_sentinel());
        return;
    };

    out.push(field.non_null_sentinel());

    let body_len = match payload.len() {
        0 => 1,
        len => len.div_ceil(VARLEN_BLOCK_SIZE) * VARLEN_BLOCK_TOTAL,
    };

    // Reserve the encoded body as zeroes, then let the block encoder fill it in place.
    let body_start = out.len();
    (0..body_len).for_each(|_| out.push(0));

    let emitted = encode_varlen_value(payload, &mut out[body_start..], field.descending);
    debug_assert_eq!(emitted as usize, body_len);
}

/// Encode a single boolean value.
pub fn encode_scalar_bool(value: Option<bool>, field: SortField, out: &mut ByteBufferMut) {
match value {
Expand Down Expand Up @@ -546,6 +645,7 @@ pub fn encoded_size_for_scalar(
let vt = DecimalType::smallest_decimal_value_type(dt);
Ok(encoded_size_for_fixed(vt.byte_width() as u32))
}
DType::Utf8(_) | DType::Binary(_) => Ok(1),
_ => vortex_bail!(
"unsupported scalar dtype for row encoding: {}",
scalar.dtype()
Expand All @@ -564,6 +664,18 @@ pub fn encoded_size_for_scalar(
.unwrap_or(DecimalType::I128);
Ok(encoded_size_for_fixed(vt.byte_width() as u32))
}
DType::Utf8(_) => {
let bs = scalar
.as_utf8()
.value()
.map(|s| s.as_str().len())
.unwrap_or(0);
Ok(encoded_size_for_varlen(bs))
}
DType::Binary(_) => {
let bs = scalar.as_binary().value().map(|b| b.len()).unwrap_or(0);
Ok(encoded_size_for_varlen(bs))
}
_ => vortex_bail!(
"unsupported scalar dtype for row encoding: {}",
scalar.dtype()
Expand Down Expand Up @@ -601,6 +713,7 @@ pub fn encode_scalar(
out.push(0);
}
}
DType::Utf8(_) | DType::Binary(_) => out.push(field.null_sentinel()),
_ => vortex_bail!(
"unsupported scalar dtype for row encoding: {}",
scalar.dtype()
Expand Down Expand Up @@ -657,6 +770,16 @@ pub fn encode_scalar(
}
}
}
DType::Utf8(_) => {
let v = scalar.as_utf8();
let bytes = v.value().map(|s| s.as_str().as_bytes()).unwrap_or(&[]);
encode_scalar_varlen(Some(bytes), field, &mut out);
}
DType::Binary(_) => {
let v = scalar.as_binary();
let bytes = v.value().map(|b| b.as_slice()).unwrap_or(&[]);
encode_scalar_varlen(Some(bytes), field, &mut out);
}
_ => vortex_bail!(
"unsupported scalar dtype for row encoding: {}",
scalar.dtype()
Expand Down
Loading