Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions encodings/runend/benches/run_end_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

use divan::Bencher;
use itertools::repeat_n;
use vortex_array::DynArray;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::RecursiveCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::compute::warm_up_vtables;
use vortex_array::dtype::IntegerPType;
use vortex_array::validity::Validity;
Expand Down Expand Up @@ -76,8 +76,12 @@ fn decompress<T: IntegerPType>(bencher: Bencher, (length, run_step): (usize, usi
let array = run_end_array.into_array();

bencher
.with_inputs(|| &array)
.bench_refs(|array| array.to_canonical());
.with_inputs(|| (array.clone(), LEGACY_SESSION.create_execution_ctx()))
.bench_values(|(array, mut execution_ctx)| {
array
.execute::<RecursiveCanonical>(&mut execution_ctx)
.unwrap()
});
}

#[divan::bench(args = BENCH_ARGS)]
Expand Down Expand Up @@ -113,3 +117,26 @@ fn take_indices(bencher: Bencher, (length, run_step): (usize, usize)) {
.unwrap()
});
}

#[divan::bench(args = BENCH_ARGS)]
fn decompress_utf8(bencher: Bencher, (length, run_step): (usize, usize)) {
let num_runs = length.div_ceil(run_step);
let ends = (0..num_runs)
.map(|i| ((i + 1) * run_step).min(length) as u64)
.collect::<Buffer<_>>()
.into_array();

let values = VarBinViewArray::from_iter_str((0..num_runs).map(|i| format!("run_value_{i}")))
.into_array();

let run_end_array = RunEndArray::new(ends, values);
let array = run_end_array.into_array();

bencher
.with_inputs(|| (array.clone(), LEGACY_SESSION.create_execution_ctx()))
.bench_values(|(array, mut execution_ctx)| {
array
.execute::<RecursiveCanonical>(&mut execution_ctx)
.unwrap()
});
}
2 changes: 2 additions & 0 deletions encodings/runend/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub fn vortex_runend::compress::runend_decode_typed_bool(run_ends: impl core::it

pub fn vortex_runend::compress::runend_decode_typed_primitive<T: vortex_array::dtype::ptype::NativePType>(run_ends: impl core::iter::traits::iterator::Iterator<Item = usize>, values: &[T], values_validity: vortex_mask::Mask, values_nullability: vortex_array::dtype::nullability::Nullability, length: usize) -> vortex_array::arrays::primitive::array::PrimitiveArray

pub fn vortex_runend::compress::runend_decode_varbinview(ends: vortex_array::arrays::primitive::array::PrimitiveArray, values: vortex_array::arrays::varbinview::array::VarBinViewArray, offset: usize, length: usize) -> vortex_error::VortexResult<vortex_array::arrays::varbinview::array::VarBinViewArray>

pub fn vortex_runend::compress::runend_encode(array: &vortex_array::arrays::primitive::array::PrimitiveArray) -> (vortex_array::arrays::primitive::array::PrimitiveArray, vortex_array::array::ArrayRef)

pub struct vortex_runend::RunEndArray
Expand Down
3 changes: 1 addition & 2 deletions encodings/runend/src/arbitrary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ pub struct ArbitraryRunEndArray(pub RunEndArray);

impl<'a> Arbitrary<'a> for ArbitraryRunEndArray {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self> {
// RunEnd supports Bool or Primitive types for values
// Pick a random primitive type for values
// Pick a random primitive type for values.
let ptype: PType = u.arbitrary()?;
let nullability: Nullability = u.arbitrary()?;
let dtype = DType::Primitive(ptype, nullability);
Expand Down
76 changes: 43 additions & 33 deletions encodings/runend/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
Expand All @@ -39,6 +40,7 @@ use vortex_session::VortexSession;

use crate::compress::runend_decode_bools;
use crate::compress::runend_decode_primitive;
use crate::compress::runend_decode_varbinview;
use crate::compress::runend_encode;
use crate::kernel::PARENT_KERNELS;
use crate::rules::RULES;
Expand Down Expand Up @@ -239,12 +241,6 @@ impl RunEndArray {
"run ends must be unsigned integers, was {}",
ends.dtype(),
);
vortex_ensure!(
values.dtype().is_primitive() || values.dtype().is_boolean(),
"RunEnd array can only have Bool or Primitive values, {} given",
values.dtype()
);

vortex_ensure!(
ends.len() == values.len(),
"run ends len != run values len, {} != {}",
Expand Down Expand Up @@ -342,32 +338,7 @@ impl RunEndArray {
///
/// # Validation
///
/// The `ends` must be non-nullable unsigned integers. The values may be `Bool` or `Primitive`
/// types.
///
/// # Examples
///
/// ```
/// # use vortex_array::arrays::{BoolArray, VarBinViewArray};
/// # use vortex_array::IntoArray;
/// # use vortex_buffer::buffer;
/// # use vortex_runend::RunEndArray;
///
/// // Error to provide incorrectly-typed values!
/// let result = RunEndArray::try_new(
/// buffer![1u8, 2u8].into_array(),
/// VarBinViewArray::from_iter_str(["bad", "dtype"]).into_array(),
/// );
/// assert!(result.is_err());
///
/// // This array is happy
/// let result = RunEndArray::try_new(
/// buffer![1u8, 2u8].into_array(),
/// BoolArray::from_iter([false, true]).into_array(),
/// );
///
/// assert!(result.is_ok());
/// ```
/// The `ends` must be non-nullable unsigned integers.
pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
let length: usize = if ends.is_empty() {
0
Expand Down Expand Up @@ -510,6 +481,7 @@ pub(super) fn run_end_canonicalize(
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let pends = array.ends().clone().execute_as("ends", ctx)?;

Ok(match array.dtype() {
DType::Bool(_) => {
let bools = array.values().clone().execute_as("values", ctx)?;
Expand All @@ -519,13 +491,22 @@ pub(super) fn run_end_canonicalize(
let pvalues = array.values().clone().execute_as("values", ctx)?;
runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
}
_ => vortex_panic!("Only Primitive and Bool values are supported"),
DType::Utf8(_) | DType::Binary(_) => {
let values = array
.values()
.clone()
.execute_as::<VarBinViewArray>("values", ctx)?;
runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
}
_ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
})
}

#[cfg(test)]
mod tests {
use vortex_array::IntoArray;
use vortex_array::arrays::DictArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::assert_arrays_eq;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
Expand All @@ -552,4 +533,33 @@ mod tests {
let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
assert_arrays_eq!(arr.into_array(), expected);
}

#[test]
fn test_runend_utf8() {
let values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
let arr = RunEndArray::new(buffer![2u32, 5, 10].into_array(), values);
assert_eq!(arr.len(), 10);
assert_eq!(arr.dtype(), &DType::Utf8(Nullability::NonNullable));

let expected =
VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
.into_array();
assert_arrays_eq!(arr.into_array(), expected);
}

#[test]
fn test_runend_dict() {
let dict_values = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
let dict_codes = buffer![0u32, 1, 2].into_array();
let dict = DictArray::try_new(dict_codes, dict_values).unwrap();

let arr =
RunEndArray::try_new(buffer![2u32, 5, 10].into_array(), dict.into_array()).unwrap();
assert_eq!(arr.len(), 10);

let expected =
VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
.into_array();
assert_arrays_eq!(arr.into_array(), expected);
}
}
62 changes: 57 additions & 5 deletions encodings/runend/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use vortex_array::ToCanonical;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::NativePType;
use vortex_array::dtype::Nullability;
use vortex_array::expr::stats::Precision;
Expand Down Expand Up @@ -204,13 +206,16 @@ pub fn runend_decode_bools(
}))
}

pub fn runend_decode_typed_primitive<T: NativePType>(
/// Decode a run-end encoded slice of values into a flat `Buffer<T>` and `Validity`.
///
/// This is the core decode loop shared by primitive and varbinview run-end decoding.
fn runend_decode_slice<T: Copy + Default>(
run_ends: impl Iterator<Item = usize>,
values: &[T],
values_validity: Mask,
values_nullability: Nullability,
length: usize,
) -> PrimitiveArray {
) -> (Buffer<T>, Validity) {
match values_validity {
Mask::AllTrue(_) => {
let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
Expand All @@ -225,9 +230,9 @@ pub fn runend_decode_typed_primitive<T: NativePType>(
// We preallocate enough capacity because we know the total length
unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
}
PrimitiveArray::new(decoded, values_nullability.into())
(decoded.into(), values_nullability.into())
}
Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
Mask::Values(mask) => {
let mut decoded = BufferMut::with_capacity(length);
let mut decoded_validity = BitBufferMut::with_capacity(length);
Expand Down Expand Up @@ -258,11 +263,28 @@ pub fn runend_decode_typed_primitive<T: NativePType>(
}
}
}
PrimitiveArray::new(decoded, Validity::from(decoded_validity.freeze()))
(decoded.into(), Validity::from(decoded_validity.freeze()))
}
}
}

pub fn runend_decode_typed_primitive<T: NativePType>(
run_ends: impl Iterator<Item = usize>,
values: &[T],
values_validity: Mask,
values_nullability: Nullability,
length: usize,
) -> PrimitiveArray {
let (decoded, validity) = runend_decode_slice(
run_ends,
values,
values_validity,
values_nullability,
length,
);
PrimitiveArray::new(decoded, validity)
}

pub fn runend_decode_typed_bool(
run_ends: impl Iterator<Item = usize>,
values: &BitBuffer,
Expand Down Expand Up @@ -304,6 +326,36 @@ pub fn runend_decode_typed_bool(
}
}

/// Decode a run-end encoded VarBinView array by expanding views directly.
pub fn runend_decode_varbinview(
ends: PrimitiveArray,
values: VarBinViewArray,
offset: usize,
length: usize,
) -> VortexResult<VarBinViewArray> {
let validity_mask = values.validity_mask()?;
let views = values.views();

let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
runend_decode_slice(
trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
views,
validity_mask,
values.dtype().nullability(),
length,
)
});

let parts = values.into_parts();
let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());

// SAFETY: we are expanding views from a valid VarBinViewArray with the same
// buffers, so all buffer indices and offsets remain valid.
Ok(unsafe {
VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
})
}

#[cfg(test)]
mod test {
use vortex_array::ToCanonical;
Expand Down
Loading