diff --git a/encodings/runend/benches/run_end_compress.rs b/encodings/runend/benches/run_end_compress.rs index cc57faf7c30..d30827f0bd8 100644 --- a/encodings/runend/benches/run_end_compress.rs +++ b/encodings/runend/benches/run_end_compress.rs @@ -56,7 +56,7 @@ fn compress(bencher: Bencher, (length, run_step): (usize, usize)) { bencher .with_inputs(|| &values) - .bench_refs(|values| runend_encode(values)); + .bench_refs(|values| runend_encode(values, &mut LEGACY_SESSION.create_execution_ctx())); } #[divan::bench(types = [u8, u16, u32, u64], args = BENCH_ARGS)] @@ -92,7 +92,7 @@ fn take_indices(bencher: Bencher, (length, run_step): (usize, usize)) { ); let source_array = PrimitiveArray::from_iter(0..(length as i32)).into_array(); - let (ends, values) = runend_encode(&values); + let (ends, values) = runend_encode(&values, &mut LEGACY_SESSION.create_execution_ctx()); let runend_array = RunEndArray::try_new(ends.into_array(), values) .unwrap() .to_array(); diff --git a/encodings/runend/public-api.lock b/encodings/runend/public-api.lock index b6dce876336..ffe51212fad 100644 --- a/encodings/runend/public-api.lock +++ b/encodings/runend/public-api.lock @@ -10,13 +10,13 @@ pub fn vortex_runend::compress::runend_decode_typed_bool(run_ends: impl core::it pub fn vortex_runend::compress::runend_decode_typed_primitive(run_ends: impl core::iter::traits::iterator::Iterator, values: &[T], values_validity: vortex_mask::Mask, values_nullability: vortex_array::dtype::nullability::Nullability, length: usize) -> vortex_array::arrays::primitive::array::PrimitiveArray -pub fn vortex_runend::compress::runend_encode(array: &vortex_array::arrays::primitive::array::PrimitiveArray) -> (vortex_array::arrays::primitive::array::PrimitiveArray, vortex_array::array::ArrayRef) +pub fn vortex_runend::compress::runend_encode(array: &vortex_array::arrays::primitive::array::PrimitiveArray, ctx: &mut vortex_array::executor::ExecutionCtx) -> (vortex_array::arrays::primitive::array::PrimitiveArray, vortex_array::array::ArrayRef) pub struct vortex_runend::RunEndArray impl vortex_runend::RunEndArray -pub fn vortex_runend::RunEndArray::encode(array: vortex_array::array::ArrayRef) -> vortex_error::VortexResult +pub fn vortex_runend::RunEndArray::encode(array: vortex_array::array::ArrayRef, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_runend::RunEndArray::ends(&self) -> &vortex_array::array::ArrayRef diff --git a/encodings/runend/src/array.rs b/encodings/runend/src/array.rs index 65d2c57cb5d..9ec90bdeb3b 100644 --- a/encodings/runend/src/array.rs +++ b/encodings/runend/src/array.rs @@ -434,9 +434,9 @@ impl RunEndArray { } /// Run the array through run-end encoding. - pub fn encode(array: ArrayRef) -> VortexResult { + pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { if let Some(parray) = array.as_opt::() { - let (ends, values) = runend_encode(parray); + let (ends, values) = runend_encode(parray, ctx); // SAFETY: runend_encode handles this unsafe { Ok(Self::new_unchecked( diff --git a/encodings/runend/src/arrow.rs b/encodings/runend/src/arrow.rs index 3e833131e1e..27884df665b 100644 --- a/encodings/runend/src/arrow.rs +++ b/encodings/runend/src/arrow.rs @@ -255,6 +255,7 @@ mod tests { fn test_sliced_runend_to_arrow_ree() -> VortexResult<()> { let array = RunEndArray::encode( PrimitiveArray::from_iter(vec![10i32, 10, 20, 20, 20, 30, 30]).into_array(), + &mut SESSION.create_execution_ctx(), )?; // Slicing from index 1 produces a non-zero offset in the RunEndArray. let sliced = array.into_array().slice(1..5)?; diff --git a/encodings/runend/src/compress.rs b/encodings/runend/src/compress.rs index 94f42281f48..9df92309a5e 100644 --- a/encodings/runend/src/compress.rs +++ b/encodings/runend/src/compress.rs @@ -3,6 +3,7 @@ use itertools::Itertools; use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::BoolArray; @@ -29,7 +30,7 @@ use vortex_mask::Mask; use crate::iter::trimmed_ends_iter; /// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`. -pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) { +pub fn runend_encode(array: &PrimitiveArray, ctx: &mut ExecutionCtx) -> (PrimitiveArray, ArrayRef) { let validity = match array.validity() { Validity::NonNullable => None, Validity::AllValid => None, @@ -69,9 +70,8 @@ pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) { }; let ends = ends - .narrow() - .vortex_expect("Ends must succeed downcasting") - .to_primitive(); + .narrow(ctx) + .vortex_expect("Ends must succeed downcasting"); ends.statistics() .set(Stat::IsStrictSorted, Precision::Exact(true.into())); @@ -306,7 +306,9 @@ pub fn runend_decode_typed_bool( #[cfg(test)] mod test { + use vortex_array::LEGACY_SESSION; use vortex_array::ToCanonical; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; use vortex_array::validity::Validity; @@ -320,7 +322,7 @@ mod test { #[test] fn encode() { let arr = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]); - let (ends, values) = runend_encode(&arr); + let (ends, values) = runend_encode(&arr, &mut LEGACY_SESSION.create_execution_ctx()); let values = values.to_primitive(); let expected_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]); @@ -337,7 +339,7 @@ mod test { true, true, false, false, true, true, true, true, false, false, ])), ); - let (ends, values) = runend_encode(&arr); + let (ends, values) = runend_encode(&arr, &mut LEGACY_SESSION.create_execution_ctx()); let values = values.to_primitive(); let expected_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]); @@ -353,7 +355,7 @@ mod test { buffer![0, 0, 0, 0, 0], Validity::from(BitBuffer::new_unset(5)), ); - let (ends, values) = runend_encode(&arr); + let (ends, values) = runend_encode(&arr, &mut LEGACY_SESSION.create_execution_ctx()); let values = values.to_primitive(); let expected_ends = PrimitiveArray::from_iter(vec![5u64]); diff --git a/encodings/runend/src/compute/compare.rs b/encodings/runend/src/compute/compare.rs index a364f0be719..8d1e3146ebb 100644 --- a/encodings/runend/src/compute/compare.rs +++ b/encodings/runend/src/compute/compare.rs @@ -47,6 +47,8 @@ impl CompareKernel for RunEndVTable { #[cfg(test)] mod test { use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::BoolArray; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::PrimitiveArray; @@ -59,6 +61,7 @@ mod test { fn ree_array() -> RunEndArray { RunEndArray::encode( PrimitiveArray::from_iter([1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5]).into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap() } diff --git a/encodings/runend/src/compute/filter.rs b/encodings/runend/src/compute/filter.rs index a85c694f5ad..6eaeddd1fd6 100644 --- a/encodings/runend/src/compute/filter.rs +++ b/encodings/runend/src/compute/filter.rs @@ -117,6 +117,8 @@ fn filter_run_end_primitive + AsPrimitiv mod tests { use vortex_array::Array; use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; use vortex_error::VortexResult; @@ -127,6 +129,7 @@ mod tests { fn ree_array() -> RunEndArray { RunEndArray::encode( PrimitiveArray::from_iter([1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5]).into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap() } diff --git a/encodings/runend/src/compute/mod.rs b/encodings/runend/src/compute/mod.rs index 0cecb5b08f2..dff583567dd 100644 --- a/encodings/runend/src/compute/mod.rs +++ b/encodings/runend/src/compute/mod.rs @@ -15,6 +15,8 @@ pub(crate) mod take_from; mod tests { use rstest::rstest; use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::compute::conformance::consistency::test_array_consistency; use vortex_buffer::buffer; @@ -24,24 +26,30 @@ mod tests { #[rstest] // Simple run-end arrays #[case::runend_i32(RunEndArray::encode( - buffer![1i32, 1, 1, 2, 2, 3, 3, 3, 3].into_array() + buffer![1i32, 1, 1, 2, 2, 3, 3, 3, 3].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] #[case::runend_single_run(RunEndArray::encode( - buffer![5i32, 5, 5, 5, 5].into_array() + buffer![5i32, 5, 5, 5, 5].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] #[case::runend_alternating(RunEndArray::encode( - buffer![1i32, 2, 1, 2, 1, 2].into_array() + buffer![1i32, 2, 1, 2, 1, 2].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] // Different types #[case::runend_u64(RunEndArray::encode( - buffer![100u64, 100, 200, 200, 200].into_array() + buffer![100u64, 100, 200, 200, 200].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] // Edge cases #[case::runend_single(RunEndArray::encode( - buffer![42i32].into_array() + buffer![42i32].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] #[case::runend_large(RunEndArray::encode( - PrimitiveArray::from_iter((0..1000).map(|i| i / 10)).into_array() + PrimitiveArray::from_iter((0..1000).map(|i| i / 10)).into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] fn test_runend_consistency(#[case] array: RunEndArray) { diff --git a/encodings/runend/src/compute/take.rs b/encodings/runend/src/compute/take.rs index 0b09f831b4c..a9a93fae5c2 100644 --- a/encodings/runend/src/compute/take.rs +++ b/encodings/runend/src/compute/take.rs @@ -104,7 +104,11 @@ mod test { use crate::RunEndArray; fn ree_array() -> RunEndArray { - RunEndArray::encode(buffer![1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5].into_array()).unwrap() + RunEndArray::encode( + buffer![1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap() } #[test] @@ -154,6 +158,7 @@ mod test { #[case(ree_array())] #[case(RunEndArray::encode( buffer![1u8, 1, 2, 2, 2, 3, 3, 3, 3, 4].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] #[case(RunEndArray::encode( PrimitiveArray::from_option_iter([ @@ -166,11 +171,15 @@ mod test { Some(20), ]) .into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ).unwrap())] + #[case(RunEndArray::encode( + buffer![42i32, 42, 42, 42, 42].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] - #[case(RunEndArray::encode(buffer![42i32, 42, 42, 42, 42].into_array()) - .unwrap())] #[case(RunEndArray::encode( buffer![1i32, 2, 3, 4, 5, 6, 7, 8, 9, 10].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ).unwrap())] #[case({ let mut values = Vec::new(); @@ -179,7 +188,10 @@ mod test { values.push(i); } } - RunEndArray::encode(PrimitiveArray::from_iter(values).into_array()).unwrap() + RunEndArray::encode( + PrimitiveArray::from_iter(values).into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ).unwrap() })] fn test_take_runend_conformance(#[case] array: RunEndArray) { test_take_conformance(&array.to_array()); @@ -190,6 +202,7 @@ mod test { #[case({ let array = RunEndArray::encode( buffer![1i32, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap(); array.slice(2..8).unwrap() diff --git a/encodings/runend/src/compute/take_from.rs b/encodings/runend/src/compute/take_from.rs index 1611dc3ba37..f1319b3bb66 100644 --- a/encodings/runend/src/compute/take_from.rs +++ b/encodings/runend/src/compute/take_from.rs @@ -55,6 +55,8 @@ mod tests { use vortex_array::Array; use vortex_array::ExecutionCtx; use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::DictArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; @@ -73,7 +75,11 @@ mod tests { /// Codes: `[0, 0, 0, 1, 1, 0, 0]` /// RunEnd encoded codes: ends=`[3, 5, 7]`, values=`[0, 1, 0]` fn make_dict_with_runend_codes() -> (RunEndArray, DictArray) { - let codes = RunEndArray::encode(buffer![0u32, 0, 0, 1, 1, 0, 0].into_array()).unwrap(); + let codes = RunEndArray::encode( + buffer![0u32, 0, 0, 1, 1, 0, 0].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let values = buffer![2i32, 3].into_array(); let dict = DictArray::try_new(codes.clone().into_array(), values).unwrap(); (codes, dict) diff --git a/encodings/runend/src/ops.rs b/encodings/runend/src/ops.rs index eb089d75c96..af8f4772b3c 100644 --- a/encodings/runend/src/ops.rs +++ b/encodings/runend/src/ops.rs @@ -44,6 +44,8 @@ mod tests { use vortex_array::Array; use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; use vortex_array::compute::Cost; @@ -151,10 +153,13 @@ mod tests { #[test] fn ree_scalar_at_end() { - let scalar = RunEndArray::encode(buffer![1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5].into_array()) - .unwrap() - .scalar_at(11) - .unwrap(); + let scalar = RunEndArray::encode( + buffer![1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap() + .scalar_at(11) + .unwrap(); assert_eq!(scalar, 5.into()); } diff --git a/vortex-array/benches/dict_compare.rs b/vortex-array/benches/dict_compare.rs index 9eac47ff563..9261c7a2be1 100644 --- a/vortex-array/benches/dict_compare.rs +++ b/vortex-array/benches/dict_compare.rs @@ -6,6 +6,7 @@ use std::str::from_utf8; use vortex_array::Canonical; +use vortex_array::LEGACY_SESSION; use vortex_array::RecursiveCanonical; use vortex_array::VortexSessionExecute; use vortex_array::accessor::ArrayAccessor; @@ -49,7 +50,11 @@ const LENGTH_AND_UNIQUE_VALUES: &[(usize, usize)] = &[ #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let primitive_arr = gen_primitive_for_dict::(len, uniqueness); - let dict = dict_encode(&primitive_arr.to_array()).unwrap(); + let dict = dict_encode( + &primitive_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let value = primitive_arr.as_slice::()[0]; let session = VortexSession::empty(); @@ -67,7 +72,11 @@ fn bench_compare_primitive(bencher: divan::Bencher, (len, uniqueness): (usize, u #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let varbin_arr = VarBinArray::from(gen_varbin_words(len, uniqueness)); - let dict = dict_encode(&varbin_arr.to_array()).unwrap(); + let dict = dict_encode( + &varbin_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); let session = VortexSession::empty(); @@ -86,7 +95,11 @@ fn bench_compare_varbin(bencher: divan::Bencher, (len, uniqueness): (usize, usiz #[divan::bench(args = LENGTH_AND_UNIQUE_VALUES)] fn bench_compare_varbinview(bencher: divan::Bencher, (len, uniqueness): (usize, usize)) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, uniqueness)); - let dict = dict_encode(&varbinview_arr.to_array()).unwrap(); + let dict = dict_encode( + &varbinview_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let bytes = varbinview_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); let session = VortexSession::empty(); @@ -120,7 +133,11 @@ fn bench_compare_sliced_dict_primitive( (codes_len, values_len): (usize, usize), ) { let primitive_arr = gen_primitive_for_dict::(codes_len.max(values_len), values_len); - let dict = dict_encode(&primitive_arr.to_array()).unwrap(); + let dict = dict_encode( + &primitive_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let dict = dict.slice(0..codes_len).unwrap(); let value = primitive_arr.as_slice::()[0]; let session = VortexSession::empty(); @@ -141,7 +158,11 @@ fn bench_compare_sliced_dict_varbinview( (codes_len, values_len): (usize, usize), ) { let varbin_arr = VarBinArray::from(gen_varbin_words(codes_len.max(values_len), values_len)); - let dict = dict_encode(&varbin_arr.to_array()).unwrap(); + let dict = dict_encode( + &varbin_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let dict = dict.slice(0..codes_len).unwrap(); let bytes = varbin_arr.with_iterator(|i| i.next().unwrap().unwrap().to_vec()); let value = from_utf8(bytes.as_slice()).unwrap(); diff --git a/vortex-array/benches/dict_compress.rs b/vortex-array/benches/dict_compress.rs index 20103184e42..0f4102cd635 100644 --- a/vortex-array/benches/dict_compress.rs +++ b/vortex-array/benches/dict_compress.rs @@ -6,6 +6,8 @@ use divan::Bencher; use rand::distr::Distribution; use rand::distr::StandardUniform; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; use vortex_array::arrays::VarBinArray; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::dict_test::gen_primitive_for_dict; @@ -43,7 +45,7 @@ where bencher .with_inputs(|| &primitive_arr) - .bench_refs(|arr| dict_encode(&arr.to_array())); + .bench_refs(|arr| dict_encode(&arr.to_array(), &mut LEGACY_SESSION.create_execution_ctx())); } #[divan::bench(args = BENCH_ARGS)] @@ -52,7 +54,7 @@ fn encode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) { bencher .with_inputs(|| &varbin_arr) - .bench_refs(|arr| dict_encode(&arr.to_array())); + .bench_refs(|arr| dict_encode(&arr.to_array(), &mut LEGACY_SESSION.create_execution_ctx())); } #[divan::bench(args = BENCH_ARGS)] @@ -61,7 +63,7 @@ fn encode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) { bencher .with_inputs(|| &varbinview_arr) - .bench_refs(|arr| dict_encode(&arr.to_array())); + .bench_refs(|arr| dict_encode(&arr.to_array(), &mut LEGACY_SESSION.create_execution_ctx())); } #[divan::bench(types = [u8, f32, i64], args = BENCH_ARGS)] @@ -71,7 +73,11 @@ where StandardUniform: Distribution, { let primitive_arr = gen_primitive_for_dict::(len, unique_values); - let dict = dict_encode(&primitive_arr.to_array()).unwrap(); + let dict = dict_encode( + &primitive_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); bencher .with_inputs(|| &dict) @@ -81,7 +87,11 @@ where #[divan::bench(args = BENCH_ARGS)] fn decode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) { let varbin_arr = VarBinArray::from(gen_varbin_words(len, unique_values)); - let dict = dict_encode(&varbin_arr.to_array()).unwrap(); + let dict = dict_encode( + &varbin_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); bencher .with_inputs(|| &dict) @@ -91,7 +101,11 @@ fn decode_varbin(bencher: Bencher, (len, unique_values): (usize, usize)) { #[divan::bench(args = BENCH_ARGS)] fn decode_varbinview(bencher: Bencher, (len, unique_values): (usize, usize)) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(len, unique_values)); - let dict = dict_encode(&varbinview_arr.to_array()).unwrap(); + let dict = dict_encode( + &varbinview_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); bencher .with_inputs(|| &dict) diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index b09392a283c..99e5adcd752 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -2488,7 +2488,7 @@ impl vortex_array::arrays::PrimitiveArray pub fn vortex_array::arrays::PrimitiveArray::as_slice(&self) -> &[T] -pub fn vortex_array::arrays::PrimitiveArray::narrow(&self) -> vortex_error::VortexResult +pub fn vortex_array::arrays::PrimitiveArray::narrow(&self, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::arrays::PrimitiveArray::reinterpret_cast(&self, ptype: vortex_array::dtype::PType) -> Self @@ -4480,9 +4480,9 @@ pub fn vortex_array::builders::dict::DictEncoder::encode(&mut self, array: &vort pub fn vortex_array::builders::dict::DictEncoder::reset(&mut self) -> vortex_array::ArrayRef -pub fn vortex_array::builders::dict::dict_encode(array: &vortex_array::ArrayRef) -> vortex_error::VortexResult +pub fn vortex_array::builders::dict::dict_encode(array: &vortex_array::ArrayRef, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult -pub fn vortex_array::builders::dict::dict_encode_with_constraints(array: &vortex_array::ArrayRef, constraints: &vortex_array::builders::dict::DictConstraints) -> vortex_error::VortexResult +pub fn vortex_array::builders::dict::dict_encode_with_constraints(array: &vortex_array::ArrayRef, constraints: &vortex_array::builders::dict::DictConstraints, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::builders::dict::dict_encoder(array: &vortex_array::ArrayRef, constraints: &vortex_array::builders::dict::DictConstraints) -> alloc::boxed::Box @@ -13762,7 +13762,7 @@ pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::child_name(&se pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult -pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::execute(&self, _options: &Self::Options, args: &dyn vortex_array::scalar_fn::ExecutionArgs, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::execute(&self, _options: &Self::Options, args: &dyn vortex_array::scalar_fn::ExecutionArgs, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::fmt_sql(&self, _options: &Self::Options, expr: &vortex_array::expr::Expression, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result @@ -15262,7 +15262,7 @@ pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::child_name(&se pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult -pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::execute(&self, _options: &Self::Options, args: &dyn vortex_array::scalar_fn::ExecutionArgs, _ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult +pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::execute(&self, _options: &Self::Options, args: &dyn vortex_array::scalar_fn::ExecutionArgs, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_array::scalar_fn::fns::list_contains::ListContains::fmt_sql(&self, _options: &Self::Options, expr: &vortex_array::expr::Expression, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result diff --git a/vortex-array/src/arrays/chunked/compute/take.rs b/vortex-array/src/arrays/chunked/compute/take.rs index 675214e9abf..6c5006854b7 100644 --- a/vortex-array/src/arrays/chunked/compute/take.rs +++ b/vortex-array/src/arrays/chunked/compute/take.rs @@ -14,7 +14,6 @@ use crate::arrays::PrimitiveArray; use crate::arrays::TakeExecute; use crate::arrays::chunked::ChunkedArray; use crate::builtins::ArrayBuiltins; -use crate::canonical::ToCanonical; use crate::dtype::DType; use crate::dtype::PType; use crate::executor::ExecutionCtx; @@ -30,7 +29,7 @@ fn take_chunked( let indices = indices .to_array() .cast(DType::Primitive(PType::U64, indices.dtype().nullability()))? - .to_primitive(); + .execute::(ctx)?; let indices_mask = indices.validity_mask()?; let indices_values = indices.as_slice::(); diff --git a/vortex-array/src/arrays/chunked/vtable/mod.rs b/vortex-array/src/arrays/chunked/vtable/mod.rs index 073282c769a..107fbfb18b1 100644 --- a/vortex-array/src/arrays/chunked/vtable/mod.rs +++ b/vortex-array/src/arrays/chunked/vtable/mod.rs @@ -17,9 +17,9 @@ use crate::EmptyMetadata; use crate::ExecutionCtx; use crate::IntoArray; use crate::Precision; -use crate::ToCanonical; use crate::arrays::ChunkedArray; use crate::arrays::PrimitiveArray; +use crate::arrays::PrimitiveVTable; use crate::arrays::chunked::compute::kernel::PARENT_KERNELS; use crate::arrays::chunked::compute::rules::PARENT_RULES; use crate::arrays::chunked::vtable::canonical::_canonicalize; @@ -163,7 +163,9 @@ impl VTable for ChunkedVTable { // 1 extra offset for the end of the last chunk nchunks + 1, )? - .to_primitive(); + .as_opt::() + .ok_or_else(|| vortex_err!("chunk offsets must be a PrimitiveArray"))? + .clone(); let chunk_offsets_buf = chunk_offsets_array.to_buffer::(); @@ -205,7 +207,10 @@ impl VTable for ChunkedVTable { ); let nchunks = children.len() - 1; - let chunk_offsets_array = children[0].to_primitive(); + let chunk_offsets_array = children[0] + .as_opt::() + .ok_or_else(|| vortex_err!("chunk offsets must be a PrimitiveArray"))? + .clone(); let chunk_offsets_buf = chunk_offsets_array.to_buffer::(); vortex_ensure!( diff --git a/vortex-array/src/arrays/dict/compute/cast.rs b/vortex-array/src/arrays/dict/compute/cast.rs index 7b224f39898..34d1a654f76 100644 --- a/vortex-array/src/arrays/dict/compute/cast.rs +++ b/vortex-array/src/arrays/dict/compute/cast.rs @@ -51,7 +51,9 @@ mod tests { use vortex_buffer::buffer; use crate::IntoArray; + use crate::LEGACY_SESSION; use crate::ToCanonical; + use crate::VortexSessionExecute; use crate::arrays::PrimitiveArray; use crate::arrays::dict::DictVTable; use crate::assert_arrays_eq; @@ -65,7 +67,7 @@ mod tests { #[test] fn test_cast_dict_to_wider_type() { let values = buffer![1i32, 2, 3, 2, 1].into_array(); - let dict = dict_encode(&values).unwrap(); + let dict = dict_encode(&values, &mut LEGACY_SESSION.create_execution_ctx()).unwrap(); let casted = dict .to_array() @@ -84,7 +86,11 @@ mod tests { fn test_cast_dict_nullable() { let values = PrimitiveArray::from_option_iter([Some(10i32), None, Some(20), Some(10), None]); - let dict = dict_encode(&values.to_array()).unwrap(); + let dict = dict_encode( + &values.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let casted = dict .to_array() @@ -100,7 +106,7 @@ mod tests { fn test_cast_dict_allvalid_to_nonnullable_and_back() { // Create an AllValid dict array (no nulls) let values = buffer![10i32, 20, 30, 40].into_array(); - let dict = dict_encode(&values).unwrap(); + let dict = dict_encode(&values, &mut LEGACY_SESSION.create_execution_ctx()).unwrap(); // Verify initial state - codes should be NonNullable, values should be NonNullable assert_eq!(dict.codes().dtype().nullability(), Nullability::NonNullable); @@ -177,10 +183,10 @@ mod tests { } #[rstest] - #[case(dict_encode(&buffer![1i32, 2, 3, 2, 1, 3].into_array()).unwrap().into_array())] - #[case(dict_encode(&buffer![100u32, 200, 100, 300, 200].into_array()).unwrap().into_array())] - #[case(dict_encode(&PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array()).unwrap().into_array())] - #[case(dict_encode(&buffer![1.5f32, 2.5, 1.5, 3.5].into_array()).unwrap().into_array())] + #[case(dict_encode(&buffer![1i32, 2, 3, 2, 1, 3].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap().into_array())] + #[case(dict_encode(&buffer![100u32, 200, 100, 300, 200].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap().into_array())] + #[case(dict_encode(&PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap().into_array())] + #[case(dict_encode(&buffer![1.5f32, 2.5, 1.5, 3.5].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap().into_array())] fn test_cast_dict_conformance(#[case] array: crate::ArrayRef) { test_cast_conformance(&array); } diff --git a/vortex-array/src/arrays/dict/compute/min_max.rs b/vortex-array/src/arrays/dict/compute/min_max.rs index db2abbbee12..c88d615dce5 100644 --- a/vortex-array/src/arrays/dict/compute/min_max.rs +++ b/vortex-array/src/arrays/dict/compute/min_max.rs @@ -49,6 +49,8 @@ mod tests { use super::DictArray; use crate::ArrayRef; use crate::IntoArray; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; use crate::arrays::PrimitiveArray; use crate::builders::dict::dict_encode; use crate::compute::min_max; @@ -94,7 +96,7 @@ mod tests { ).unwrap(), (1, 5) )] - #[case::single(dict_encode(&buffer![42i32].into_array()).unwrap(), (42, 42))] + #[case::single(dict_encode(&buffer![42i32].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap(), (42, 42))] #[case::nullable_codes( DictArray::try_new( PrimitiveArray::from_option_iter([Some(0u32), None, Some(1), Some(2)]).into_array(), @@ -104,7 +106,8 @@ mod tests { )] #[case::nullable_values( dict_encode( - &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).to_array() + &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).to_array(), + &mut LEGACY_SESSION.create_execution_ctx() ).unwrap(), (1, 2) )] @@ -115,7 +118,11 @@ mod tests { #[test] fn test_sliced_dict() { let reference = PrimitiveArray::from_iter([1, 5, 10, 50, 100]); - let dict = dict_encode(&reference.to_array()).unwrap(); + let dict = dict_encode( + &reference.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let sliced = dict.slice(1..3).unwrap(); assert_min_max(&sliced, Some((5, 10))); } diff --git a/vortex-array/src/arrays/dict/compute/mod.rs b/vortex-array/src/arrays/dict/compute/mod.rs index d8f81eee47e..98639bcba57 100644 --- a/vortex-array/src/arrays/dict/compute/mod.rs +++ b/vortex-array/src/arrays/dict/compute/mod.rs @@ -60,7 +60,9 @@ mod test { use crate::Array; use crate::ArrayRef; use crate::IntoArray; + use crate::LEGACY_SESSION; use crate::ToCanonical; + use crate::VortexSessionExecute; use crate::accessor::ArrayAccessor; use crate::arrays::ConstantArray; use crate::arrays::PrimitiveArray; @@ -87,8 +89,11 @@ mod test { }) .collect(); - let dict = - dict_encode(&PrimitiveArray::from_option_iter(values.clone()).to_array()).unwrap(); + let dict = dict_encode( + &PrimitiveArray::from_option_iter(values.clone()).to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let actual = dict.to_primitive(); let expected = PrimitiveArray::from_option_iter(values); @@ -101,7 +106,11 @@ mod test { let unique_values: Vec = (0..32).collect(); let expected = PrimitiveArray::from_iter((0..1000).map(|i| unique_values[i % 32])); - let dict = dict_encode(&expected.to_array()).unwrap(); + let dict = dict_encode( + &expected.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let actual = dict.to_primitive(); assert_arrays_eq!(actual, expected); @@ -112,7 +121,11 @@ mod test { let unique_values: Vec = (0..100).collect(); let expected = PrimitiveArray::from_iter((0..1000).map(|i| unique_values[i % 100])); - let dict = dict_encode(&expected.to_array()).unwrap(); + let dict = dict_encode( + &expected.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let actual = dict.to_primitive(); assert_arrays_eq!(actual, expected); @@ -125,7 +138,11 @@ mod test { DType::Utf8(Nullability::Nullable), ); assert_eq!(reference.len(), 6); - let dict = dict_encode(&reference.to_array()).unwrap(); + let dict = dict_encode( + &reference.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let flattened_dict = dict.to_varbinview(); assert_eq!( flattened_dict.with_iterator(|iter| iter @@ -146,7 +163,11 @@ mod test { Some(1), Some(5), ]); - let dict = dict_encode(&reference.to_array()).unwrap(); + let dict = dict_encode( + &reference.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); dict.slice(1..4).unwrap() } @@ -164,12 +185,17 @@ mod test { #[test] fn test_mask_dict_array() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = dict_encode( + &buffer![2, 0, 2, 0, 10].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); test_mask_conformance(&array.to_array()); let array = dict_encode( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap(); test_mask_conformance(&array.to_array()); @@ -186,6 +212,7 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap(); test_mask_conformance(&array.to_array()); @@ -193,12 +220,17 @@ mod test { #[test] fn test_filter_dict_array() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = dict_encode( + &buffer![2, 0, 2, 0, 10].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); test_filter_conformance(&array.to_array()); let array = dict_encode( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap(); test_filter_conformance(&array.to_array()); @@ -215,6 +247,7 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap(); test_filter_conformance(&array.to_array()); @@ -222,7 +255,11 @@ mod test { #[test] fn test_take_dict() { - let array = dict_encode(&buffer![1, 2].into_array()).unwrap(); + let array = dict_encode( + &buffer![1, 2].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); assert_eq!( array @@ -235,12 +272,17 @@ mod test { #[test] fn test_take_dict_conformance() { - let array = dict_encode(&buffer![2, 0, 2, 0, 10].into_array()).unwrap(); + let array = dict_encode( + &buffer![2, 0, 2, 0, 10].into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); test_take_conformance(&array.to_array()); let array = dict_encode( &PrimitiveArray::from_option_iter([Some(2), None, Some(2), Some(0), Some(10)]) .to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap(); test_take_conformance(&array.to_array()); @@ -257,6 +299,7 @@ mod test { DType::Utf8(Nullability::Nullable), ) .into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), ) .unwrap(); test_take_conformance(&array.to_array()); @@ -269,6 +312,8 @@ mod tests { use vortex_buffer::buffer; use crate::IntoArray; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinArray; use crate::arrays::dict::DictArray; @@ -279,32 +324,35 @@ mod tests { #[rstest] // Primitive arrays - #[case::dict_i32(dict_encode(&buffer![1i32, 2, 3, 2, 1].into_array()).unwrap())] + #[case::dict_i32(dict_encode(&buffer![1i32, 2, 3, 2, 1].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap())] #[case::dict_nullable_codes(DictArray::try_new( buffer![0u32, 1, 2, 2, 0].into_array(), PrimitiveArray::from_option_iter([Some(10), Some(20), None]).into_array(), ).unwrap())] #[case::dict_nullable_values(dict_encode( - &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).to_array() + &PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(1), None]).to_array(), + &mut LEGACY_SESSION.create_execution_ctx() ).unwrap())] - #[case::dict_u64(dict_encode(&buffer![100u64, 200, 100, 300, 200].into_array()).unwrap())] + #[case::dict_u64(dict_encode(&buffer![100u64, 200, 100, 300, 200].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap())] // String arrays #[case::dict_str(dict_encode( &VarBinArray::from_iter( ["hello", "world", "hello", "test", "world"].map(Some), DType::Utf8(Nullability::NonNullable), - ).into_array() + ).into_array(), + &mut LEGACY_SESSION.create_execution_ctx() ).unwrap())] #[case::dict_nullable_str(dict_encode( &VarBinArray::from_iter( [Some("hello"), None, Some("world"), Some("hello"), None], DType::Utf8(Nullability::Nullable), - ).into_array() + ).into_array(), + &mut LEGACY_SESSION.create_execution_ctx() ).unwrap())] // Edge cases - #[case::dict_single(dict_encode(&buffer![42i32].into_array()).unwrap())] - #[case::dict_all_same(dict_encode(&buffer![5i32, 5, 5, 5, 5].into_array()).unwrap())] - #[case::dict_large(dict_encode(&PrimitiveArray::from_iter((0..1000).map(|i| i % 10)).into_array()).unwrap())] + #[case::dict_single(dict_encode(&buffer![42i32].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap())] + #[case::dict_all_same(dict_encode(&buffer![5i32, 5, 5, 5, 5].into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap())] + #[case::dict_large(dict_encode(&PrimitiveArray::from_iter((0..1000).map(|i| i % 10)).into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap())] fn test_dict_consistency(#[case] array: DictArray) { test_array_consistency(&array.to_array()); } diff --git a/vortex-array/src/arrays/extension/compute/cast.rs b/vortex-array/src/arrays/extension/compute/cast.rs index 0d683871a14..4dc91d8055b 100644 --- a/vortex-array/src/arrays/extension/compute/cast.rs +++ b/vortex-array/src/arrays/extension/compute/cast.rs @@ -19,11 +19,7 @@ impl CastReduce for ExtensionVTable { unreachable!("Already verified we have an extension dtype"); }; - let new_storage = match array - .storage() - .cast(ext_dtype.storage_dtype().clone()) - .and_then(|a| a.to_canonical().map(|c| c.into_array())) - { + let new_storage = match array.storage().cast(ext_dtype.storage_dtype().clone()) { Ok(arr) => arr, Err(e) => { tracing::warn!("Failed to cast storage array: {e}"); diff --git a/vortex-array/src/arrays/fixed_size_list/compute/cast.rs b/vortex-array/src/arrays/fixed_size_list/compute/cast.rs index 424d5717975..3ef46b8c556 100644 --- a/vortex-array/src/arrays/fixed_size_list/compute/cast.rs +++ b/vortex-array/src/arrays/fixed_size_list/compute/cast.rs @@ -4,7 +4,6 @@ use vortex_error::VortexResult; use crate::ArrayRef; -use crate::IntoArray; use crate::arrays::FixedSizeListArray; use crate::arrays::FixedSizeListVTable; use crate::builtins::ArrayBuiltins; @@ -22,11 +21,7 @@ impl CastReduce for FixedSizeListVTable { return Ok(None); }; - let elements = array - .elements() - .cast((**target_element_type).clone())? - .to_canonical()? - .into_array(); + let elements = array.elements().cast((**target_element_type).clone())?; let validity = array .validity() .clone() diff --git a/vortex-array/src/arrays/list/array.rs b/vortex-array/src/arrays/list/array.rs index 205cf125167..62f949c2683 100644 --- a/vortex-array/src/arrays/list/array.rs +++ b/vortex-array/src/arrays/list/array.rs @@ -10,8 +10,10 @@ use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_panic; +use crate::AnyCanonical; use crate::Array; use crate::ArrayRef; +use crate::Canonical; use crate::IntoArray; use crate::arrays::ConstantArray; use crate::arrays::ListVTable; @@ -315,8 +317,8 @@ impl ListArray { /// referenced by the `offsets`. pub fn reset_offsets(&self, recurse: bool) -> VortexResult { let mut elements = self.sliced_elements()?; - if recurse && elements.is_canonical() { - elements = elements.to_canonical()?.compact()?.into_array(); + if recurse && let Some(canonical_view) = elements.as_opt::() { + elements = Canonical::from(canonical_view).compact()?.into_array(); } else if recurse && let Some(child_list_array) = elements.as_opt::() { elements = child_list_array.reset_offsets(recurse)?.into_array(); } diff --git a/vortex-array/src/arrays/list/compute/cast.rs b/vortex-array/src/arrays/list/compute/cast.rs index 2295469b804..d0b4d97d170 100644 --- a/vortex-array/src/arrays/list/compute/cast.rs +++ b/vortex-array/src/arrays/list/compute/cast.rs @@ -4,7 +4,6 @@ use vortex_error::VortexResult; use crate::ArrayRef; -use crate::IntoArray; use crate::arrays::ListArray; use crate::arrays::ListVTable; use crate::builtins::ArrayBuiltins; @@ -23,11 +22,7 @@ impl CastReduce for ListVTable { .clone() .cast_nullability(dtype.nullability(), array.len())?; - let new_elements = array - .elements() - .cast((**target_element_type).clone())? - .to_canonical()? - .into_array(); + let new_elements = array.elements().cast((**target_element_type).clone())?; ListArray::try_new(new_elements, array.offsets().clone(), validity) .map(|a| Some(a.to_array())) @@ -42,6 +37,9 @@ mod tests { use vortex_buffer::buffer; use crate::IntoArray; + use crate::LEGACY_SESSION; + use crate::RecursiveCanonical; + use crate::VortexSessionExecute; use crate::arrays::BoolArray; use crate::arrays::ListArray; use crate::arrays::PrimitiveArray; @@ -114,7 +112,8 @@ mod tests { .and_then(|a| a.to_canonical().map(|c| c.into_array())); assert!(result.is_err()); - // Nulls in list element array + // Nulls in list element array — the inner cast error is deferred until + // the elements are executed. let list = ListArray::try_new( PrimitiveArray::from_option_iter([Some(0i32), Some(2), None, None]).to_array(), buffer![0, 2, 3].into_array().to_array(), @@ -127,10 +126,10 @@ mod tests { Nullability::NonNullable, ); - let result = list - .to_array() - .cast(target_dtype) - .and_then(|a| a.to_canonical().map(|c| c.into_array())); + let result = list.to_array().cast(target_dtype).and_then(|a| { + a.execute::(&mut LEGACY_SESSION.create_execution_ctx()) + .map(|c| c.0.into_array()) + }); assert!(result.is_err()); } diff --git a/vortex-array/src/arrays/listview/compute/cast.rs b/vortex-array/src/arrays/listview/compute/cast.rs index 996b7bd94b9..8ac72c61151 100644 --- a/vortex-array/src/arrays/listview/compute/cast.rs +++ b/vortex-array/src/arrays/listview/compute/cast.rs @@ -20,11 +20,7 @@ impl CastReduce for ListViewVTable { }; // Cast the elements to the target element type. - let new_elements = array - .elements() - .cast((**target_element_type).clone())? - .to_canonical()? - .into_array(); + let new_elements = array.elements().cast((**target_element_type).clone())?; let validity = array .validity() .clone() diff --git a/vortex-array/src/arrays/null/compute/take.rs b/vortex-array/src/arrays/null/compute/take.rs index 52054d39564..995e2027918 100644 --- a/vortex-array/src/arrays/null/compute/take.rs +++ b/vortex-array/src/arrays/null/compute/take.rs @@ -6,9 +6,9 @@ use vortex_error::vortex_bail; use crate::ArrayRef; use crate::IntoArray; -use crate::ToCanonical; use crate::arrays::NullArray; use crate::arrays::NullVTable; +use crate::arrays::PrimitiveVTable; use crate::arrays::TakeReduce; use crate::arrays::TakeReduceAdaptor; use crate::match_each_integer_ptype; @@ -17,17 +17,18 @@ use crate::optimizer::rules::ParentRuleSet; impl TakeReduce for NullVTable { #[allow(clippy::cast_possible_truncation)] fn take(array: &NullArray, indices: &ArrayRef) -> VortexResult> { - let indices = indices.to_primitive(); - - // Enforce all indices are valid - match_each_integer_ptype!(indices.ptype(), |T| { - for index in indices.as_slice::() { - if (*index as usize) >= array.len() { - vortex_bail!(OutOfBounds: *index as usize, 0, array.len()); + // Bounds-check when indices are already a PrimitiveArray (no execution needed). + if let Some(indices) = indices.as_opt::() { + match_each_integer_ptype!(indices.ptype(), |T| { + for index in indices.as_slice::() { + if (*index as usize) >= array.len() { + vortex_bail!(OutOfBounds: *index as usize, 0, array.len()); + } } - } - }); + }); + } + // For NullArray, the result is always null regardless of index values. Ok(Some(NullArray::new(indices.len()).into_array())) } } diff --git a/vortex-array/src/arrays/primitive/array/cast.rs b/vortex-array/src/arrays/primitive/array/cast.rs index 7ba21fe4814..3fe2f4c5270 100644 --- a/vortex-array/src/arrays/primitive/array/cast.rs +++ b/vortex-array/src/arrays/primitive/array/cast.rs @@ -6,7 +6,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_panic; -use crate::ToCanonical; +use crate::ExecutionCtx; use crate::arrays::PrimitiveArray; use crate::builtins::ArrayBuiltins; use crate::compute::min_max; @@ -61,7 +61,7 @@ impl PrimitiveArray { } /// Narrow the array to the smallest possible integer type that can represent all values. - pub fn narrow(&self) -> VortexResult { + pub fn narrow(&self, ctx: &mut ExecutionCtx) -> VortexResult { if !self.ptype().is_int() { return Ok(self.clone()); } @@ -93,46 +93,46 @@ impl PrimitiveArray { if min < 0 || max < 0 { // Signed if min >= i8::MIN as i64 && max <= i8::MAX as i64 { - return Ok(self + return self .to_array() .cast(DType::Primitive(PType::I8, self.dtype().nullability()))? - .to_primitive()); + .execute::(ctx); } if min >= i16::MIN as i64 && max <= i16::MAX as i64 { - return Ok(self + return self .to_array() .cast(DType::Primitive(PType::I16, self.dtype().nullability()))? - .to_primitive()); + .execute::(ctx); } if min >= i32::MIN as i64 && max <= i32::MAX as i64 { - return Ok(self + return self .to_array() .cast(DType::Primitive(PType::I32, self.dtype().nullability()))? - .to_primitive()); + .execute::(ctx); } } else { // Unsigned if max <= u8::MAX as i64 { - return Ok(self + return self .to_array() .cast(DType::Primitive(PType::U8, self.dtype().nullability()))? - .to_primitive()); + .execute::(ctx); } if max <= u16::MAX as i64 { - return Ok(self + return self .to_array() .cast(DType::Primitive(PType::U16, self.dtype().nullability()))? - .to_primitive()); + .execute::(ctx); } if max <= u32::MAX as i64 { - return Ok(self + return self .to_array() .cast(DType::Primitive(PType::U32, self.dtype().nullability()))? - .to_primitive()); + .execute::(ctx); } } @@ -146,6 +146,8 @@ mod tests { use vortex_buffer::Buffer; use vortex_buffer::buffer; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; use crate::arrays::PrimitiveArray; use crate::dtype::DType; use crate::dtype::Nullability; @@ -159,7 +161,9 @@ mod tests { Validity::AllInvalid, ); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); assert_eq!( result.dtype(), &DType::Primitive(PType::U8, Nullability::Nullable) @@ -178,7 +182,9 @@ mod tests { #[case(vec![i32::MIN as i64, i32::MAX as i64], PType::I32)] fn test_downcast_signed(#[case] values: Vec, #[case] expected_ptype: PType) { let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(result.ptype(), expected_ptype); } @@ -190,21 +196,27 @@ mod tests { #[case(vec![0_u64, u32::MAX as u64], PType::U32)] fn test_downcast_unsigned(#[case] values: Vec, #[case] expected_ptype: PType) { let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(result.ptype(), expected_ptype); } #[test] fn test_downcast_keeps_original_if_too_large() { let array = PrimitiveArray::from_iter(vec![0_u64, u64::MAX]); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(result.ptype(), PType::U64); } #[test] fn test_downcast_preserves_nullability() { let array = PrimitiveArray::from_option_iter([Some(0_i32), None, Some(127)]); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); assert_eq!( result.dtype(), &DType::Primitive(PType::U8, Nullability::Nullable) @@ -217,7 +229,9 @@ mod tests { fn test_downcast_preserves_values() { let values = vec![-100_i16, 0, 100]; let array = PrimitiveArray::from_iter(values); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(result.ptype(), PType::I8); // Check that the values were properly downscaled @@ -228,24 +242,29 @@ mod tests { #[test] fn test_downcast_with_mixed_signs_chooses_signed() { let array = PrimitiveArray::from_iter(vec![-1_i32, 200]); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); assert_eq!(result.ptype(), PType::I16); } #[test] fn test_downcast_floats() { let array = PrimitiveArray::from_iter(vec![1.0_f32, 2.0, 3.0]); - let result = array.narrow().unwrap(); + let result = array + .narrow(&mut LEGACY_SESSION.create_execution_ctx()) + .unwrap(); // Floats should remain unchanged since they can't be downscaled to integers assert_eq!(result.ptype(), PType::F32); } #[test] fn test_downcast_empty_array() { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); let array = PrimitiveArray::new(Buffer::::empty(), Validity::AllInvalid); - let result = array.narrow().unwrap(); + let result = array.narrow(&mut ctx).unwrap(); let array2 = PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable); - let result2 = array2.narrow().unwrap(); + let result2 = array2.narrow(&mut ctx).unwrap(); // Empty arrays should not have their validity changed assert_eq!(result.validity, Validity::AllInvalid); assert_eq!(result2.validity, Validity::NonNullable); diff --git a/vortex-array/src/arrow/record_batch.rs b/vortex-array/src/arrow/record_batch.rs index 3ccf65920a9..a1734416f40 100644 --- a/vortex-array/src/arrow/record_batch.rs +++ b/vortex-array/src/arrow/record_batch.rs @@ -22,7 +22,9 @@ impl TryFrom<&dyn Array> for RecordBatch { type Error = VortexError; fn try_from(value: &dyn Array) -> VortexResult { - let Canonical::Struct(struct_array) = value.to_canonical()? else { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let Canonical::Struct(struct_array) = value.to_array().execute::(&mut ctx)? + else { vortex_bail!("RecordBatch can only be constructed from ") }; @@ -34,7 +36,7 @@ impl TryFrom<&dyn Array> for RecordBatch { let data_type = struct_array.dtype().to_arrow_dtype()?; let array_ref = struct_array .into_array() - .execute_arrow(Some(&data_type), &mut LEGACY_SESSION.create_execution_ctx())?; + .execute_arrow(Some(&data_type), &mut ctx)?; Ok(RecordBatch::from(array_ref.as_struct())) } } diff --git a/vortex-array/src/builders/dict/bytes.rs b/vortex-array/src/builders/dict/bytes.rs index 7aa7fde0981..f21c1379561 100644 --- a/vortex-array/src/builders/dict/bytes.rs +++ b/vortex-array/src/builders/dict/bytes.rs @@ -206,7 +206,9 @@ impl DictEncoder for BytesDictBuilder { mod test { use std::str; + use crate::LEGACY_SESSION; use crate::ToCanonical; + use crate::VortexSessionExecute; use crate::accessor::ArrayAccessor; use crate::arrays::VarBinArray; use crate::builders::dict::dict_encode; @@ -214,7 +216,8 @@ mod test { #[test] fn encode_varbin() { let arr = VarBinArray::from(vec!["hello", "world", "hello", "again", "world"]); - let dict = dict_encode(&arr.to_array()).unwrap(); + let dict = + dict_encode(&arr.to_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap(); assert_eq!( dict.codes().to_primitive().as_slice::(), &[0, 1, 0, 2, 1] @@ -243,7 +246,8 @@ mod test { ] .into_iter() .collect(); - let dict = dict_encode(&arr.to_array()).unwrap(); + let dict = + dict_encode(&arr.to_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap(); assert_eq!( dict.codes().to_primitive().as_slice::(), &[0, 1, 2, 0, 1, 3, 2, 1] @@ -260,7 +264,8 @@ mod test { #[test] fn repeated_values() { let arr = VarBinArray::from(vec!["a", "a", "b", "b", "a", "b", "a", "b"]); - let dict = dict_encode(&arr.to_array()).unwrap(); + let dict = + dict_encode(&arr.to_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap(); dict.values().to_varbinview().with_iterator(|iter| { assert_eq!( iter.flatten() diff --git a/vortex-array/src/builders/dict/mod.rs b/vortex-array/src/builders/dict/mod.rs index d246be42607..abe53f682e7 100644 --- a/vortex-array/src/builders/dict/mod.rs +++ b/vortex-array/src/builders/dict/mod.rs @@ -3,14 +3,15 @@ use bytes::bytes_dict_builder; use primitive::primitive_dict_builder; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; use crate::Array; use crate::ArrayRef; +use crate::ExecutionCtx; use crate::IntoArray; -use crate::ToCanonical; use crate::arrays::DictArray; use crate::arrays::PrimitiveVTable; use crate::arrays::VarBinVTable; @@ -64,9 +65,14 @@ pub fn dict_encoder(array: &ArrayRef, constraints: &DictConstraints) -> Box VortexResult { let mut encoder = dict_encoder(array, constraints); - let codes = encoder.encode(array).to_primitive().narrow()?; + let codes = encoder + .encode(array) + .as_opt::() + .vortex_expect("dict codes must be a PrimitiveArray") + .narrow(ctx)?; // SAFETY: The encoding process will produce a value set of codes and values // All values in the dictionary are guaranteed to be referenced by at least one code // since we build the dictionary from the codes we observe during encoding @@ -78,8 +84,8 @@ pub fn dict_encode_with_constraints( } } -pub fn dict_encode(array: &ArrayRef) -> VortexResult { - let dict_array = dict_encode_with_constraints(array, &UNCONSTRAINED)?; +pub fn dict_encode(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + let dict_array = dict_encode_with_constraints(array, &UNCONSTRAINED, ctx)?; if dict_array.len() != array.len() { vortex_bail!( "must have encoded all {} elements, but only encoded {}", diff --git a/vortex-array/src/builders/dict/primitive.rs b/vortex-array/src/builders/dict/primitive.rs index 5619cdba1f8..4322fe07453 100644 --- a/vortex-array/src/builders/dict/primitive.rs +++ b/vortex-array/src/builders/dict/primitive.rs @@ -160,6 +160,8 @@ mod test { use crate::Array; use crate::IntoArray as _; + use crate::LEGACY_SESSION; + use crate::VortexSessionExecute; use crate::arrays::PrimitiveArray; use crate::assert_arrays_eq; use crate::builders::dict::dict_encode; @@ -167,7 +169,7 @@ mod test { #[test] fn encode_primitive() { let arr = buffer![1, 1, 3, 3, 3].into_array(); - let dict = dict_encode(&arr).unwrap(); + let dict = dict_encode(&arr, &mut LEGACY_SESSION.create_execution_ctx()).unwrap(); let expected_codes = buffer![0u8, 0, 1, 1, 1].into_array(); assert_arrays_eq!(dict.codes(), expected_codes); @@ -188,7 +190,8 @@ mod test { Some(3), None, ]); - let dict = dict_encode(&arr.to_array()).unwrap(); + let dict = + dict_encode(&arr.to_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap(); let expected_codes = buffer![0u8, 0, 1, 2, 2, 1, 2, 1].into_array(); assert_arrays_eq!(dict.codes(), expected_codes); diff --git a/vortex-array/src/compute/conformance/cast.rs b/vortex-array/src/compute/conformance/cast.rs index 1b5d099f23d..041584a5233 100644 --- a/vortex-array/src/compute/conformance/cast.rs +++ b/vortex-array/src/compute/conformance/cast.rs @@ -5,9 +5,11 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_panic; -use crate::Array; use crate::ArrayRef; use crate::IntoArray; +use crate::LEGACY_SESSION; +use crate::RecursiveCanonical; +use crate::VortexSessionExecute; use crate::builtins::ArrayBuiltins; use crate::compute::MinMaxResult; use crate::compute::min_max; @@ -18,7 +20,11 @@ use crate::scalar::Scalar; /// Cast and force execution via `to_canonical`, returning the canonical array. fn cast_and_execute(array: &ArrayRef, dtype: DType) -> VortexResult { - array.cast(dtype)?.to_canonical().map(|c| c.into_array()) + Ok(array + .cast(dtype)? + .execute::(&mut LEGACY_SESSION.create_execution_ctx())? + .0 + .into_array()) } /// Test conformance of the cast compute function for an array. diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index 37deb7af526..569dfd97ff1 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -26,9 +26,9 @@ use crate::ArrayRef; use crate::ArrayVisitor; use crate::ExecutionCtx; use crate::IntoArray; -use crate::ToCanonical; use crate::arrays::BoolArray; use crate::arrays::PrimitiveArray; +use crate::arrays::PrimitiveVTable; use crate::builtins::ArrayBuiltins; use crate::compute::is_sorted; use crate::dtype::DType; @@ -399,8 +399,7 @@ impl Patches { /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`] /// with the insertion point if not found. fn search_index_binary_search(indices: &ArrayRef, needle: usize) -> VortexResult { - if indices.is_canonical() { - let primitive = indices.to_primitive(); + if let Some(primitive) = indices.as_opt::() { match_each_integer_ptype!(primitive.ptype(), |T| { let Ok(needle) = T::try_from(needle) else { // If the needle is not of type T, then it cannot possibly be in this array. @@ -626,16 +625,8 @@ impl Patches { } // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship - let filtered_indices = self - .indices - .filter(filter_mask.clone())? - .to_canonical()? - .into_array(); - let filtered_values = self - .values - .filter(filter_mask)? - .to_canonical()? - .into_array(); + let filtered_indices = self.indices.filter(filter_mask.clone())?; + let filtered_values = self.values.filter(filter_mask)?; Ok(Some(Self { array_len: self.array_len, @@ -1167,10 +1158,8 @@ fn filter_patches_with_mask( } let new_patch_indices = new_patch_indices.into_array(); - let new_patch_values = patch_values - .filter(Mask::from_indices(patch_values.len(), new_mask_indices))? - .to_canonical()? - .into_array(); + let new_patch_values = + patch_values.filter(Mask::from_indices(patch_values.len(), new_mask_indices))?; Ok(Some(Patches::new( true_count, diff --git a/vortex-array/src/scalar_fn/fns/list_contains/mod.rs b/vortex-array/src/scalar_fn/fns/list_contains/mod.rs index 5481c285973..b5d703d038c 100644 --- a/vortex-array/src/scalar_fn/fns/list_contains/mod.rs +++ b/vortex-array/src/scalar_fn/fns/list_contains/mod.rs @@ -21,7 +21,6 @@ use crate::Array; use crate::ArrayRef; use crate::ExecutionCtx; use crate::IntoArray; -use crate::ToCanonical; use crate::arrays::BoolArray; use crate::arrays::ConstantArray; use crate::arrays::ConstantVTable; @@ -125,7 +124,7 @@ impl ScalarFnVTable for ListContains { &self, _options: &Self::Options, args: &dyn ExecutionArgs, - _ctx: &mut ExecutionCtx, + ctx: &mut ExecutionCtx, ) -> VortexResult { let list_array = args.get(0)?; let value_array = args.get(1)?; @@ -137,7 +136,7 @@ impl ScalarFnVTable for ListContains { return Ok(ConstantArray::new(result, args.row_count()).into_array()); } - compute_list_contains(&list_array, &value_array) + compute_list_contains(&list_array, &value_array, ctx) } fn stat_falsification( @@ -204,7 +203,11 @@ fn compute_contains_scalar(list: &Scalar, needle: &Scalar) -> VortexResult VortexResult { +fn compute_list_contains( + array: &ArrayRef, + value: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult { let DType::List(elem_dtype, _) = array.dtype() else { vortex_bail!("Array must be of List type"); }; @@ -227,7 +230,7 @@ fn compute_list_contains(array: &ArrayRef, value: &ArrayRef) -> VortexResult VortexResult { // If the list array is constant, we perform a single comparison. if array.len() > 1 && array.is::() { - let contains = list_contains_scalar(&array.slice(0..1)?, value, nullability)?; + let contains = list_contains_scalar(&array.slice(0..1)?, value, nullability, ctx)?; return Ok(ConstantArray::new(contains.scalar_at(0)?, array.len()).into_array()); } - let list_array = array.to_listview(); + let list_array = array.clone().execute::(ctx)?; let elems = list_array.elements(); if elems.is_empty() { @@ -290,7 +294,7 @@ fn list_contains_scalar( let rhs = ConstantArray::new(value.clone(), elems.len()); let matching_elements = Binary.try_new_array(elems.len(), Operator::Eq, &[elems.clone(), rhs.to_array()])?; - let matches = matching_elements.to_bool(); + let matches = matching_elements.execute::(ctx)?; // Fast path: no elements match. if let Some(pred) = matches.as_constant() { @@ -316,14 +320,17 @@ fn list_contains_scalar( // All elements match, and all comparisons are valid (result in `true`). Some(true) => { // True, unless the list itself is empty or NULL. - list_is_not_empty(&list_array, nullability) + list_is_not_empty(&list_array, nullability, ctx) } }; } // Get the offsets and sizes as primitive arrays. - let offsets = list_array.offsets().to_primitive(); - let sizes = list_array.sizes().to_primitive(); + let offsets = list_array + .offsets() + .clone() + .execute::(ctx)?; + let sizes = list_array.sizes().clone().execute::(ctx)?; // Process based on the offset and size types. let list_matches = match_each_integer_ptype!(offsets.ptype(), |O| { @@ -407,6 +414,7 @@ fn list_false_or_null( fn list_is_not_empty( list_array: &ListViewArray, nullability: Nullability, + ctx: &mut ExecutionCtx, ) -> VortexResult { // Short-circuit for all invalid. if matches!(list_array.validity(), Validity::AllInvalid) { @@ -417,7 +425,7 @@ fn list_is_not_empty( .into_array()); } - let sizes = list_array.sizes().to_primitive(); + let sizes = list_array.sizes().clone().execute::(ctx)?; let buffer = match_each_integer_ptype!(sizes.ptype(), |S| { BitBuffer::from_iter(sizes.as_slice::().iter().map(|&size| size != S::zero())) }); diff --git a/vortex-btrblocks/benches/dict_encode.rs b/vortex-btrblocks/benches/dict_encode.rs index 9bed0f11936..2c8f57db3a3 100644 --- a/vortex-btrblocks/benches/dict_encode.rs +++ b/vortex-btrblocks/benches/dict_encode.rs @@ -5,6 +5,8 @@ use divan::Bencher; use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; use vortex_array::arrays::BoolArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::builders::dict::dict_encode; @@ -32,9 +34,9 @@ fn make_array() -> PrimitiveArray { #[divan::bench] fn encode_generic(bencher: Bencher) { let array = make_array().into_array(); - bencher - .with_inputs(|| &array) - .bench_refs(|array| dict_encode(array).unwrap()); + bencher.with_inputs(|| &array).bench_refs(|array| { + dict_encode(array, &mut LEGACY_SESSION.create_execution_ctx()).unwrap() + }); } #[cfg(not(codspeed))] diff --git a/vortex-btrblocks/public-api.lock b/vortex-btrblocks/public-api.lock index c0a6c24cb9c..b9b3aa259b0 100644 --- a/vortex-btrblocks/public-api.lock +++ b/vortex-btrblocks/public-api.lock @@ -208,7 +208,7 @@ pub fn vortex_btrblocks::BtrBlocksCompressor::default() -> Self impl vortex_btrblocks::CanonicalCompressor for vortex_btrblocks::BtrBlocksCompressor -pub fn vortex_btrblocks::BtrBlocksCompressor::compress_canonical(&self, array: vortex_array::canonical::Canonical, ctx: vortex_btrblocks::ctx::CompressorContext, excludes: vortex_btrblocks::ctx::Excludes<'_>) -> vortex_error::VortexResult +pub fn vortex_btrblocks::BtrBlocksCompressor::compress_canonical(&self, array: vortex_array::canonical::Canonical, ctx: vortex_btrblocks::ctx::CompressorContext, excludes: vortex_btrblocks::ctx::Excludes<'_>, exec_ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_btrblocks::BtrBlocksCompressor::float_schemes(&self) -> &[&'static dyn vortex_btrblocks::compressor::float::FloatScheme] @@ -282,7 +282,7 @@ pub fn vortex_btrblocks::IntegerStats::source(&self) -> &vortex_array::arrays::p pub trait vortex_btrblocks::CanonicalCompressor -pub fn vortex_btrblocks::CanonicalCompressor::compress_canonical(&self, array: vortex_array::canonical::Canonical, ctx: vortex_btrblocks::ctx::CompressorContext, excludes: vortex_btrblocks::ctx::Excludes<'_>) -> vortex_error::VortexResult +pub fn vortex_btrblocks::CanonicalCompressor::compress_canonical(&self, array: vortex_array::canonical::Canonical, ctx: vortex_btrblocks::ctx::CompressorContext, excludes: vortex_btrblocks::ctx::Excludes<'_>, exec_ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_btrblocks::CanonicalCompressor::float_schemes(&self) -> &[&'static dyn vortex_btrblocks::compressor::float::FloatScheme] @@ -292,7 +292,7 @@ pub fn vortex_btrblocks::CanonicalCompressor::string_schemes(&self) -> &[&'stati impl vortex_btrblocks::CanonicalCompressor for vortex_btrblocks::BtrBlocksCompressor -pub fn vortex_btrblocks::BtrBlocksCompressor::compress_canonical(&self, array: vortex_array::canonical::Canonical, ctx: vortex_btrblocks::ctx::CompressorContext, excludes: vortex_btrblocks::ctx::Excludes<'_>) -> vortex_error::VortexResult +pub fn vortex_btrblocks::BtrBlocksCompressor::compress_canonical(&self, array: vortex_array::canonical::Canonical, ctx: vortex_btrblocks::ctx::CompressorContext, excludes: vortex_btrblocks::ctx::Excludes<'_>, exec_ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult pub fn vortex_btrblocks::BtrBlocksCompressor::float_schemes(&self) -> &[&'static dyn vortex_btrblocks::compressor::float::FloatScheme] diff --git a/vortex-btrblocks/src/canonical_compressor.rs b/vortex-btrblocks/src/canonical_compressor.rs index 1af8d637b22..068aaaa0440 100644 --- a/vortex-btrblocks/src/canonical_compressor.rs +++ b/vortex-btrblocks/src/canonical_compressor.rs @@ -6,8 +6,11 @@ use vortex_array::Array; use vortex_array::ArrayRef; use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; use vortex_array::ToCanonical; +use vortex_array::VortexSessionExecute; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::FixedSizeListArray; @@ -50,6 +53,7 @@ pub trait CanonicalCompressor { array: Canonical, ctx: CompressorContext, excludes: Excludes, + exec_ctx: &mut ExecutionCtx, ) -> VortexResult; /// Returns the enabled integer compression schemes. @@ -118,7 +122,29 @@ impl BtrBlocksCompressor { // Compact it, removing any wasted space before we attempt to compress it let compact = canonical.compact()?; - self.compress_canonical(compact, CompressorContext::default(), Excludes::none()) + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + self.compress_canonical( + compact, + CompressorContext::default(), + Excludes::none(), + &mut exec_ctx, + ) + } + + /// Compresses an array using an existing execution context. + pub(crate) fn compress_with_ctx( + &self, + array: &ArrayRef, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let canonical = array.to_canonical()?; + let compact = canonical.compact()?; + self.compress_canonical( + compact, + CompressorContext::default(), + Excludes::none(), + exec_ctx, + ) } pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> { @@ -144,21 +170,23 @@ impl BtrBlocksCompressor { &self, list_array: ListArray, ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Reset the offsets to remove garbage data that might prevent us from narrowing our // offsets (there could be a large amount of trailing garbage data that the current // views do not reference at all). let list_array = list_array.reset_offsets(true)?; - let compressed_elems = self.compress(list_array.elements())?; + let compressed_elems = self.compress_with_ctx(list_array.elements(), exec_ctx)?; // Note that since the type of our offsets are not encoded in our `DType`, and since // we guarantee above that all elements are referenced by offsets, we may narrow the // widths. let compressed_offsets = self.compress_canonical( - Canonical::Primitive(list_array.offsets().to_primitive().narrow()?), + Canonical::Primitive(list_array.offsets().to_primitive().narrow(exec_ctx)?), ctx, Excludes::from(&[IntCode::Dict]), + exec_ctx, )?; Ok(ListArray::try_new( @@ -175,17 +203,20 @@ impl BtrBlocksCompressor { &self, list_view: ListViewArray, ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let compressed_elems = self.compress(list_view.elements())?; + let compressed_elems = self.compress_with_ctx(list_view.elements(), exec_ctx)?; let compressed_offsets = self.compress_canonical( - Canonical::Primitive(list_view.offsets().to_primitive().narrow()?), + Canonical::Primitive(list_view.offsets().to_primitive().narrow(exec_ctx)?), ctx, Excludes::none(), + exec_ctx, )?; let compressed_sizes = self.compress_canonical( - Canonical::Primitive(list_view.sizes().to_primitive().narrow()?), + Canonical::Primitive(list_view.sizes().to_primitive().narrow(exec_ctx)?), ctx, Excludes::none(), + exec_ctx, )?; Ok(ListViewArray::try_new( compressed_elems, @@ -206,6 +237,7 @@ impl CanonicalCompressor for BtrBlocksCompressor { array: Canonical, ctx: CompressorContext, excludes: Excludes, + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { match array { Canonical::Null(null_array) => Ok(null_array.into_array()), @@ -213,19 +245,29 @@ impl CanonicalCompressor for BtrBlocksCompressor { Canonical::Bool(bool_array) => Ok(bool_array.into_array()), Canonical::Primitive(primitive) => { if primitive.ptype().is_int() { - self.integer_compressor() - .compress(self, &primitive, ctx, excludes.int) + self.integer_compressor().compress( + self, + &primitive, + ctx, + excludes.int, + exec_ctx, + ) } else { - self.float_compressor() - .compress(self, &primitive, ctx, excludes.float) + self.float_compressor().compress( + self, + &primitive, + ctx, + excludes.float, + exec_ctx, + ) } } - Canonical::Decimal(decimal) => compress_decimal(self, &decimal), + Canonical::Decimal(decimal) => compress_decimal(self, &decimal, exec_ctx), Canonical::Struct(struct_array) => { let fields = struct_array .unmasked_fields() .iter() - .map(|field| self.compress(field)) + .map(|field| self.compress_with_ctx(field, exec_ctx)) .collect::, _>>()?; Ok(StructArray::try_new( @@ -241,13 +283,13 @@ impl CanonicalCompressor for BtrBlocksCompressor { // Offsets are already monotonic and non-overlapping, so we // can drop the sizes array and compress as a ListArray. let list_array = list_from_list_view(list_view_array)?; - self.compress_list_array(list_array, ctx) + self.compress_list_array(list_array, ctx, exec_ctx) } else { - self.compress_list_view_array(list_view_array, ctx) + self.compress_list_view_array(list_view_array, ctx, exec_ctx) } } Canonical::FixedSizeList(fsl_array) => { - let compressed_elems = self.compress(fsl_array.elements())?; + let compressed_elems = self.compress_with_ctx(fsl_array.elements(), exec_ctx)?; Ok(FixedSizeListArray::try_new( compressed_elems, @@ -262,8 +304,13 @@ impl CanonicalCompressor for BtrBlocksCompressor { .dtype() .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable)) { - self.string_compressor() - .compress(self, &strings, ctx, excludes.string) + self.string_compressor().compress( + self, + &strings, + ctx, + excludes.string, + exec_ctx, + ) } else { // Binary arrays do not compress Ok(strings.into_array()) @@ -288,11 +335,11 @@ impl CanonicalCompressor for BtrBlocksCompressor { ) .into_array()); } - return compress_temporal(self, temporal_array); + return compress_temporal(self, temporal_array, exec_ctx); } // Compress the underlying storage array. - let compressed_storage = self.compress(ext_array.storage())?; + let compressed_storage = self.compress_with_ctx(ext_array.storage(), exec_ctx)?; Ok( ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage) diff --git a/vortex-btrblocks/src/compressor/decimal.rs b/vortex-btrblocks/src/compressor/decimal.rs index 36b3111453e..f652b1d3b5a 100644 --- a/vortex-btrblocks/src/compressor/decimal.rs +++ b/vortex-btrblocks/src/compressor/decimal.rs @@ -3,6 +3,7 @@ use vortex_array::ArrayRef; use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::narrowed_decimal; @@ -21,6 +22,7 @@ use crate::Excludes; pub fn compress_decimal( compressor: &BtrBlocksCompressor, decimal: &DecimalArray, + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let decimal = narrowed_decimal(decimal.clone()); let validity = decimal.validity(); @@ -36,6 +38,7 @@ pub fn compress_decimal( Canonical::Primitive(prim), CompressorContext::default(), Excludes::none(), + exec_ctx, )?; DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype()).map(|d| d.to_array()) diff --git a/vortex-btrblocks/src/compressor/float/mod.rs b/vortex-btrblocks/src/compressor/float/mod.rs index 159fbc8124f..29da19d60d7 100644 --- a/vortex-btrblocks/src/compressor/float/mod.rs +++ b/vortex-btrblocks/src/compressor/float/mod.rs @@ -14,6 +14,7 @@ use vortex_alp::RDEncoder; use vortex_alp::alp_encode; use vortex_array::ArrayRef; use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; @@ -186,8 +187,14 @@ impl rle::RLEConfig for FloatRLEConfig { values: &vortex_array::arrays::PrimitiveArray, ctx: CompressorContext, excludes: &[FloatCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into()) + compressor.compress_canonical( + Canonical::Primitive(values.clone()), + ctx, + excludes.into(), + exec_ctx, + ) } } @@ -208,6 +215,7 @@ impl Scheme for UncompressedScheme { _stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[FloatCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(1.0) } @@ -218,6 +226,7 @@ impl Scheme for UncompressedScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[FloatCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(stats.source().to_array()) } @@ -237,6 +246,7 @@ impl Scheme for ConstantScheme { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[FloatCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Never select Constant when sampling if ctx.is_sample { @@ -261,6 +271,7 @@ impl Scheme for ConstantScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[FloatCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false)); @@ -298,6 +309,7 @@ impl Scheme for ALPScheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[FloatCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // We don't support ALP for f16 if stats.source().ptype() == PType::F16 { @@ -310,7 +322,7 @@ impl Scheme for ALPScheme { return Ok(0.0); } - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -319,6 +331,7 @@ impl Scheme for ALPScheme { stats: &FloatStats, ctx: CompressorContext, excludes: &[FloatCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let alp_encoded = alp_encode(&stats.source().to_primitive(), None)?; let alp = alp_encoded.as_::(); @@ -339,9 +352,13 @@ impl Scheme for ALPScheme { Canonical::Primitive(alp_ints), ctx.descend(), Excludes::int_only(&int_excludes), + exec_ctx, )?; - let patches = alp.patches().map(compress_patches).transpose()?; + let patches = alp + .patches() + .map(|p| compress_patches(p, exec_ctx)) + .transpose()?; Ok(ALPArray::new(compressed_alp_ints, alp.exponents(), patches).into_array()) } @@ -361,12 +378,13 @@ impl Scheme for ALPRDScheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[FloatCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { if stats.source().ptype() == PType::F16 { return Ok(0.0); } - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -375,6 +393,7 @@ impl Scheme for ALPRDScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[FloatCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let encoder = match stats.source().ptype() { PType::F32 => RDEncoder::new(stats.source().as_slice::()), @@ -386,7 +405,7 @@ impl Scheme for ALPRDScheme { let patches = alp_rd .left_parts_patches() - .map(compress_patches) + .map(|p| compress_patches(p, exec_ctx)) .transpose()?; alp_rd.replace_left_parts_patches(patches); @@ -408,6 +427,7 @@ impl Scheme for DictScheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[FloatCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { if stats.value_count == 0 { return Ok(0.0); @@ -419,7 +439,7 @@ impl Scheme for DictScheme { } // Take a sample and run compression on the sample to determine before/after size. - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -428,6 +448,7 @@ impl Scheme for DictScheme { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[Self::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let dict = dictionary_encode(stats); let has_all_values_referenced = dict.has_all_values_referenced(); @@ -437,6 +458,7 @@ impl Scheme for DictScheme { Canonical::Primitive(codes.to_primitive()), ctx.descend(), Excludes::int_only(&[IntCode::Dict, IntCode::Sequence]), + exec_ctx, )?; assert!(values.is_canonical()); @@ -444,6 +466,7 @@ impl Scheme for DictScheme { Canonical::Primitive(values.to_primitive()), ctx.descend(), Excludes::from(&[FloatCode::Dict]), + exec_ctx, )?; // SAFETY: compressing codes or values does not alter the invariants @@ -471,6 +494,7 @@ impl Scheme for NullDominated { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[Self::CodeType], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Only use `SparseScheme` if we can cascade. if ctx.allowed_cascading == 0 { @@ -497,6 +521,7 @@ impl Scheme for NullDominated { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[Self::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { assert!(ctx.allowed_cascading > 0); @@ -509,11 +534,12 @@ impl Scheme for NullDominated { // Don't attempt to compress the non-null values - let indices = sparse.patches().indices().to_primitive().narrow()?; + let indices = sparse.patches().indices().to_primitive().narrow(exec_ctx)?; let compressed_indices = compressor.compress_canonical( Canonical::Primitive(indices.to_primitive()), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; SparseArray::try_new( @@ -544,6 +570,7 @@ impl Scheme for PcoScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[FloatCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(vortex_pco::PcoArray::from_primitive( stats.source(), @@ -561,7 +588,9 @@ mod tests { use vortex_array::Array; use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; use vortex_array::ToCanonical; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; use vortex_array::builders::ArrayBuilder; @@ -584,11 +613,13 @@ mod tests { fn test_empty() -> VortexResult<()> { // Make sure empty array compression does not fail let btr = BtrBlocksCompressor::default(); + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); let result = btr.float_compressor().compress( &btr, &PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable), CompressorContext::default(), &[], + &mut exec_ctx, )?; assert!(result.is_empty()); @@ -608,9 +639,14 @@ mod tests { let floats = values.into_array().to_primitive(); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.float_compressor() - .compress(&btr, &floats, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.float_compressor().compress( + &btr, + &floats, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert_eq!(compressed.len(), 1024); let display = compressed @@ -632,8 +668,14 @@ mod tests { let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let stats = super::FloatStats::generate(&array); let btr = BtrBlocksCompressor::default(); - let compressed = - RLE_FLOAT_SCHEME.compress(&btr, &stats, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = RLE_FLOAT_SCHEME.compress( + &btr, + &stats, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; let decoded = compressed; let expected = Buffer::copy_from(&values).into_array(); @@ -654,9 +696,14 @@ mod tests { let floats = array.finish_into_primitive(); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.float_compressor() - .compress(&btr, &floats, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.float_compressor().compress( + &btr, + &floats, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert_eq!(compressed.len(), 96); let display = compressed @@ -674,6 +721,8 @@ mod tests { mod scheme_selection_tests { use vortex_alp::ALPVTable; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::ConstantVTable; use vortex_array::arrays::DictVTable; use vortex_array::arrays::PrimitiveArray; @@ -693,9 +742,14 @@ mod scheme_selection_tests { let values: Vec = vec![42.5; 100]; let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.float_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.float_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -705,9 +759,14 @@ mod scheme_selection_tests { let values: Vec = (0..1000).map(|i| (i as f64) * 0.01).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.float_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.float_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -720,9 +779,14 @@ mod scheme_selection_tests { .collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.float_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.float_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -736,9 +800,14 @@ mod scheme_selection_tests { builder.append_nulls(95); let array = builder.finish_into_primitive(); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.float_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.float_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; // Verify the compressed array preserves values. assert_eq!(compressed.len(), 100); Ok(()) diff --git a/vortex-btrblocks/src/compressor/integer/mod.rs b/vortex-btrblocks/src/compressor/integer/mod.rs index 58906b2ecf9..e0dcf2a0693 100644 --- a/vortex-btrblocks/src/compressor/integer/mod.rs +++ b/vortex-btrblocks/src/compressor/integer/mod.rs @@ -11,6 +11,7 @@ use enum_iterator::Sequence; pub use stats::IntegerStats; use vortex_array::ArrayRef; use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; @@ -217,8 +218,14 @@ impl rle::RLEConfig for IntRLEConfig { values: &PrimitiveArray, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into()) + compressor.compress_canonical( + Canonical::Primitive(values.clone()), + ctx, + excludes.into(), + exec_ctx, + ) } } @@ -239,6 +246,7 @@ impl Scheme for UncompressedScheme { _stats: &IntegerStats, _ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // no compression Ok(1.0) @@ -250,6 +258,7 @@ impl Scheme for UncompressedScheme { stats: &IntegerStats, _ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(stats.source().to_array()) } @@ -273,6 +282,7 @@ impl Scheme for ConstantScheme { stats: &IntegerStats, ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Never yield ConstantScheme for a sample, it could be a false-positive. if ctx.is_sample { @@ -293,6 +303,7 @@ impl Scheme for ConstantScheme { stats: &IntegerStats, _ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false)); @@ -330,6 +341,7 @@ impl Scheme for FORScheme { stats: &IntegerStats, ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Only apply if we are not at the leaf if ctx.allowed_cascading == 0 { @@ -383,6 +395,7 @@ impl Scheme for FORScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let for_array = FoRArray::encode(stats.src.clone())?; let biased = for_array.encoded().to_primitive(); @@ -402,7 +415,7 @@ impl Scheme for FORScheme { allowed_cascading: 0, }; let compressed = - BitPackingScheme.compress(compressor, &biased_stats, leaf_ctx, excludes)?; + BitPackingScheme.compress(compressor, &biased_stats, leaf_ctx, excludes, exec_ctx)?; let for_compressed = FoRArray::try_new(compressed, for_array.reference_scalar().clone())?; for_compressed @@ -427,6 +440,7 @@ impl Scheme for ZigZagScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // ZigZag is only useful when we cascade it with another encoding if ctx.allowed_cascading == 0 { @@ -444,7 +458,7 @@ impl Scheme for ZigZagScheme { } // Run compression on a sample to see how it performs. - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -453,6 +467,7 @@ impl Scheme for ZigZagScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Zigzag encode the values, then recursively compress the inner values. let zag = zigzag_encode(stats.src.clone())?; @@ -472,6 +487,7 @@ impl Scheme for ZigZagScheme { Canonical::Primitive(encoded), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; tracing::debug!("zigzag output: {}", compressed.encoding_id()); @@ -494,6 +510,7 @@ impl Scheme for BitPackingScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // BitPacking only works for non-negative values if stats.typed.min_is_negative() { @@ -505,7 +522,7 @@ impl Scheme for BitPackingScheme { return Ok(0.0); } - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -514,6 +531,7 @@ impl Scheme for BitPackingScheme { stats: &IntegerStats, _ctx: CompressorContext, _excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let histogram = bit_width_histogram(stats.source())?; let bw = find_best_bit_width(stats.source().ptype(), &histogram)?; @@ -523,7 +541,10 @@ impl Scheme for BitPackingScheme { } let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?; - let patches = packed.patches().map(compress_patches).transpose()?; + let patches = packed + .patches() + .map(|p| compress_patches(p, exec_ctx)) + .transpose()?; packed.replace_patches(patches); Ok(packed.into_array()) @@ -545,6 +566,7 @@ impl Scheme for SparseScheme { stats: &IntegerStats, ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Only use `SparseScheme` if we can cascade. if ctx.allowed_cascading == 0 { @@ -584,6 +606,7 @@ impl Scheme for SparseScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { assert!(ctx.allowed_cascading > 0); let (top_pvalue, top_count) = stats.typed.top_value_and_count(); @@ -618,14 +641,16 @@ impl Scheme for SparseScheme { Canonical::Primitive(sparse.patches().values().to_primitive()), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; - let indices = sparse.patches().indices().to_primitive().narrow()?; + let indices = sparse.patches().indices().to_primitive().narrow(exec_ctx)?; let compressed_indices = compressor.compress_canonical( Canonical::Primitive(indices), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; SparseArray::try_new( @@ -655,6 +680,7 @@ impl Scheme for DictScheme { stats: &IntegerStats, ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Dict should not be terminal. if ctx.allowed_cascading == 0 { @@ -695,6 +721,7 @@ impl Scheme for DictScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { assert!(ctx.allowed_cascading > 0); @@ -709,9 +736,10 @@ impl Scheme for DictScheme { new_excludes.extend_from_slice(excludes); let compressed_codes = compressor.compress_canonical( - Canonical::Primitive(dict.codes().to_primitive().narrow()?), + Canonical::Primitive(dict.codes().to_primitive().narrow(exec_ctx)?), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; // SAFETY: compressing codes does not change their values @@ -739,6 +767,7 @@ impl Scheme for RunEndScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // If the run length is below the threshold, drop it. if stats.average_run_length < RUN_END_THRESHOLD { @@ -750,7 +779,7 @@ impl Scheme for RunEndScheme { } // Run compression on a sample, see how it performs. - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -759,11 +788,12 @@ impl Scheme for RunEndScheme { stats: &IntegerStats, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { assert!(ctx.allowed_cascading > 0); // run-end encode the ends - let (ends, values) = runend_encode(&stats.src); + let (ends, values) = runend_encode(&stats.src, exec_ctx); let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()]; new_excludes.extend_from_slice(excludes); @@ -772,12 +802,14 @@ impl Scheme for RunEndScheme { Canonical::Primitive(ends.to_primitive()), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; let compressed_values = compressor.compress_canonical( Canonical::Primitive(values.to_primitive()), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; // SAFETY: compression doesn't affect invariants @@ -804,6 +836,7 @@ impl Scheme for SequenceScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[Self::CodeType], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { if stats.null_count > 0 { return Ok(0.0); @@ -830,6 +863,7 @@ impl Scheme for SequenceScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[Self::CodeType], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { if stats.null_count > 0 { vortex_bail!("sequence encoding does not support nulls"); @@ -853,6 +887,7 @@ impl Scheme for PcoScheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[IntCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Pco does not support I8 or U8. if matches!( @@ -862,7 +897,7 @@ impl Scheme for PcoScheme { return Ok(0.0); } - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -871,6 +906,7 @@ impl Scheme for PcoScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[IntCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(vortex_pco::PcoArray::from_primitive( stats.source(), @@ -891,7 +927,9 @@ mod tests { use rand::rngs::StdRng; use vortex_array::Array; use vortex_array::IntoArray; + use vortex_array::LEGACY_SESSION; use vortex_array::ToCanonical; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::DictVTable; use vortex_array::arrays::PrimitiveArray; use vortex_array::assert_arrays_eq; @@ -918,11 +956,13 @@ mod tests { fn test_empty() -> VortexResult<()> { // Make sure empty array compression does not fail let btr = BtrBlocksCompressor::default(); + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); let result = btr.integer_compressor().compress( &btr, &PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable), CompressorContext::default(), &[], + &mut exec_ctx, )?; assert!(result.is_empty()); @@ -951,11 +991,13 @@ mod tests { let primitive = codes.freeze().into_array().to_primitive(); let btr = BtrBlocksCompressor::default(); + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); let compressed = btr.integer_compressor().compress( &btr, &primitive, CompressorContext::default(), &[], + &mut exec_ctx, )?; assert!(compressed.is::()); Ok(()) @@ -968,11 +1010,13 @@ mod tests { Validity::from_iter(vec![true, true, true, true, false]), ); let btr = BtrBlocksCompressor::default(); + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); let compressed = SparseScheme.compress( &btr, &IntegerStats::generate(&array), CompressorContext::default(), &[], + &mut exec_ctx, )?; assert!(compressed.is::()); let decoded = compressed.clone(); @@ -992,11 +1036,13 @@ mod tests { ]), ); let btr = BtrBlocksCompressor::default(); + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); let compressed = SparseScheme.compress( &btr, &IntegerStats::generate(&array), CompressorContext::default(), &[], + &mut exec_ctx, )?; assert!(compressed.is::()); let decoded = compressed.clone(); @@ -1014,11 +1060,13 @@ mod tests { let values = (0i32..20).step_by(7).collect_vec(); let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some)); let btr = BtrBlocksCompressor::default(); + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); let compressed = SequenceScheme.compress( &btr, &IntegerStats::generate(&array), CompressorContext::default(), &[], + &mut exec_ctx, )?; assert!(compressed.is::()); let decoded = compressed; @@ -1036,11 +1084,13 @@ mod tests { let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); let compressed = RLE_INTEGER_SCHEME.compress( &btr, &IntegerStats::generate(&array), CompressorContext::default(), &[], + &mut exec_ctx, )?; let decoded = compressed; @@ -1074,6 +1124,8 @@ mod tests { mod scheme_selection_tests { use std::iter; + use vortex_array::LEGACY_SESSION; + use vortex_array::VortexSessionExecute; use vortex_array::arrays::ConstantVTable; use vortex_array::arrays::DictVTable; use vortex_array::arrays::PrimitiveArray; @@ -1096,9 +1148,14 @@ mod scheme_selection_tests { let values: Vec = iter::repeat_n(42, 100).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -1108,9 +1165,14 @@ mod scheme_selection_tests { let values: Vec = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -1120,9 +1182,14 @@ mod scheme_selection_tests { let values: Vec = (0..1000).map(|i| i % 16).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -1139,9 +1206,14 @@ mod scheme_selection_tests { } let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -1169,9 +1241,14 @@ mod scheme_selection_tests { let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -1184,9 +1261,14 @@ mod scheme_selection_tests { } let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -1196,9 +1278,14 @@ mod scheme_selection_tests { let values: Vec = (0..1000).map(|i| i * 7).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } @@ -1211,9 +1298,14 @@ mod scheme_selection_tests { } let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); let btr = BtrBlocksCompressor::default(); - let compressed = - btr.integer_compressor() - .compress(&btr, &array, CompressorContext::default(), &[])?; + let mut exec_ctx = LEGACY_SESSION.create_execution_ctx(); + let compressed = btr.integer_compressor().compress( + &btr, + &array, + CompressorContext::default(), + &[], + &mut exec_ctx, + )?; assert!(compressed.is::()); Ok(()) } diff --git a/vortex-btrblocks/src/compressor/mod.rs b/vortex-btrblocks/src/compressor/mod.rs index af59ad41bda..e17a003a667 100644 --- a/vortex-btrblocks/src/compressor/mod.rs +++ b/vortex-btrblocks/src/compressor/mod.rs @@ -4,6 +4,7 @@ //! Compressor traits for type-specific compression. use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::arrays::ConstantArray; use vortex_array::scalar::Scalar; @@ -69,6 +70,7 @@ where stats: &Self::StatsType, ctx: CompressorContext, excludes: &[::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult<&'static Self::SchemeType> { let mut best_ratio = 1.0; let mut best_scheme: Option<&'static Self::SchemeType> = None; @@ -95,7 +97,8 @@ where "Trying compression scheme" ); - let ratio = scheme.expected_compression_ratio(compressor, stats, ctx, excludes)?; + let ratio = + scheme.expected_compression_ratio(compressor, stats, ctx, excludes, exec_ctx)?; tracing::trace!( is_sample = ctx.is_sample, depth, @@ -135,6 +138,7 @@ where array: &<::ArrayVTable as VTable>::Array, ctx: CompressorContext, excludes: &[::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Avoid compressing empty arrays. if array.is_empty() { @@ -150,9 +154,11 @@ where // Generate stats on the array directly. let stats = self.gen_stats(array); - let best_scheme = self.choose_scheme(btr_blocks_compressor, &stats, ctx, excludes)?; + let best_scheme = + self.choose_scheme(btr_blocks_compressor, &stats, ctx, excludes, exec_ctx)?; - let output = best_scheme.compress(btr_blocks_compressor, &stats, ctx, excludes)?; + let output = + best_scheme.compress(btr_blocks_compressor, &stats, ctx, excludes, exec_ctx)?; if output.nbytes() < array.nbytes() { Ok(output) } else { diff --git a/vortex-btrblocks/src/compressor/patches.rs b/vortex-btrblocks/src/compressor/patches.rs index 9890ab7bd07..bc602a41914 100644 --- a/vortex-btrblocks/src/compressor/patches.rs +++ b/vortex-btrblocks/src/compressor/patches.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_array::Array; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; @@ -9,9 +10,13 @@ use vortex_array::patches::Patches; use vortex_error::VortexResult; /// Compresses the given patches by downscaling integers and checking for constant values. -pub fn compress_patches(patches: &Patches) -> VortexResult { +pub fn compress_patches(patches: &Patches, exec_ctx: &mut ExecutionCtx) -> VortexResult { // Downscale the patch indices. - let indices = patches.indices().to_primitive().narrow()?.into_array(); + let indices = patches + .indices() + .to_primitive() + .narrow(exec_ctx)? + .into_array(); // Check if the values are constant. let values = patches.values(); diff --git a/vortex-btrblocks/src/compressor/rle.rs b/vortex-btrblocks/src/compressor/rle.rs index c0cb20780bb..469b5870182 100644 --- a/vortex-btrblocks/src/compressor/rle.rs +++ b/vortex-btrblocks/src/compressor/rle.rs @@ -7,6 +7,7 @@ use std::marker::PhantomData; use vortex_array::ArrayRef; use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::PrimitiveArray; @@ -51,6 +52,7 @@ pub trait RLEConfig: Debug + Send + Sync + 'static { values: &PrimitiveArray, ctx: CompressorContext, excludes: &[Self::Code], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult; } @@ -87,6 +89,7 @@ impl Scheme for RLEScheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[C::Code], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // RLE is only useful when we cascade it with another encoding. if ctx.allowed_cascading == 0 { @@ -104,7 +107,7 @@ impl Scheme for RLEScheme { } // Run compression on a sample to see how it performs. - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -113,6 +116,7 @@ impl Scheme for RLEScheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[C::Code], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let rle_array = RLEArray::encode(RLEStats::source(stats))?; @@ -129,18 +133,26 @@ impl Scheme for RLEScheme { &rle_array.values().to_primitive(), ctx.descend(), &new_excludes, + exec_ctx, )?; let compressed_indices = compressor.compress_canonical( - Canonical::Primitive(rle_array.indices().to_primitive().narrow()?), + Canonical::Primitive(rle_array.indices().to_primitive().narrow(exec_ctx)?), ctx.descend(), Excludes::from(&[IntCode::Dict]), + exec_ctx, )?; let compressed_offsets = compressor.compress_canonical( - Canonical::Primitive(rle_array.values_idx_offsets().to_primitive().narrow()?), + Canonical::Primitive( + rle_array + .values_idx_offsets() + .to_primitive() + .narrow(exec_ctx)?, + ), ctx.descend(), Excludes::from(&[IntCode::Dict]), + exec_ctx, )?; // SAFETY: Recursive compression doesn't affect the invariants. diff --git a/vortex-btrblocks/src/compressor/string.rs b/vortex-btrblocks/src/compressor/string.rs index c81001bd6dd..01d080f4fd5 100644 --- a/vortex-btrblocks/src/compressor/string.rs +++ b/vortex-btrblocks/src/compressor/string.rs @@ -7,6 +7,7 @@ use std::hash::Hasher; use enum_iterator::Sequence; use vortex_array::ArrayRef; use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; @@ -256,6 +257,7 @@ impl Scheme for UncompressedScheme { _stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[StringCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(1.0) } @@ -266,6 +268,7 @@ impl Scheme for UncompressedScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[StringCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(stats.source().to_array()) } @@ -285,6 +288,7 @@ impl Scheme for DictScheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[StringCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // If we don't have a sufficiently high number of distinct values, do not attempt Dict. if stats.estimated_distinct_count > stats.value_count / 2 { @@ -296,7 +300,7 @@ impl Scheme for DictScheme { return Ok(0.0); } - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } fn compress( @@ -305,8 +309,9 @@ impl Scheme for DictScheme { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[StringCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - let dict = dict_encode(&stats.source().clone().into_array())?; + let dict = dict_encode(&stats.source().clone().into_array(), exec_ctx)?; // If we are not allowed to cascade, do not attempt codes or values compression. if ctx.allowed_cascading == 0 { @@ -318,6 +323,7 @@ impl Scheme for DictScheme { Canonical::Primitive(dict.codes().to_primitive()), ctx.descend(), Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]), + exec_ctx, )?; // Attempt to compress the values with non-Dict compression. @@ -326,6 +332,7 @@ impl Scheme for DictScheme { Canonical::VarBinView(dict.values().to_varbinview()), ctx.descend(), Excludes::from(&[DictScheme.code()]), + exec_ctx, )?; // SAFETY: compressing codes or values does not alter the invariants @@ -353,6 +360,7 @@ impl Scheme for FSSTScheme { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[StringCode], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let fsst = { let compressor = fsst_train_compressor(&stats.src); @@ -360,15 +368,21 @@ impl Scheme for FSSTScheme { }; let compressed_original_lengths = compressor.compress_canonical( - Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?), + Canonical::Primitive( + fsst.uncompressed_lengths() + .to_primitive() + .narrow(exec_ctx)?, + ), ctx, Excludes::none(), + exec_ctx, )?; let compressed_codes_offsets = compressor.compress_canonical( - Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?), + Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow(exec_ctx)?), ctx, Excludes::none(), + exec_ctx, )?; let compressed_codes = VarBinArray::try_new( compressed_codes_offsets, @@ -407,6 +421,7 @@ impl Scheme for ConstantScheme { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[Self::CodeType], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { if ctx.is_sample { return Ok(0.0); @@ -428,6 +443,7 @@ impl Scheme for ConstantScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[Self::CodeType], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let scalar_idx = (0..stats.source().len()).position(|idx| stats.source().is_valid(idx).unwrap_or(false)); @@ -465,6 +481,7 @@ impl Scheme for NullDominated { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[Self::CodeType], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { // Only use `SparseScheme` if we can cascade. if ctx.allowed_cascading == 0 { @@ -491,6 +508,7 @@ impl Scheme for NullDominated { stats: &Self::StatsType, ctx: CompressorContext, _excludes: &[Self::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { assert!(ctx.allowed_cascading > 0); @@ -501,11 +519,12 @@ impl Scheme for NullDominated { // Compress the indices only (not the values for strings) let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict]; - let indices = sparse.patches().indices().to_primitive().narrow()?; + let indices = sparse.patches().indices().to_primitive().narrow(exec_ctx)?; let compressed_indices = compressor.compress_canonical( Canonical::Primitive(indices), ctx.descend(), Excludes::int_only(&new_excludes), + exec_ctx, )?; SparseArray::try_new( @@ -536,6 +555,7 @@ impl Scheme for ZstdScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[StringCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let compacted = stats.source().compact_buffers()?; Ok( @@ -560,6 +580,7 @@ impl Scheme for ZstdBuffersScheme { stats: &Self::StatsType, _ctx: CompressorContext, _excludes: &[StringCode], + _exec_ctx: &mut ExecutionCtx, ) -> VortexResult { Ok(vortex_zstd::ZstdBuffersArray::compress(&stats.source().to_array(), 3)?.into_array()) } diff --git a/vortex-btrblocks/src/compressor/temporal.rs b/vortex-btrblocks/src/compressor/temporal.rs index 6fb917be58d..f73a5b57f11 100644 --- a/vortex-btrblocks/src/compressor/temporal.rs +++ b/vortex-btrblocks/src/compressor/temporal.rs @@ -5,6 +5,7 @@ use vortex_array::ArrayRef; use vortex_array::Canonical; +use vortex_array::ExecutionCtx; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::TemporalArray; @@ -22,6 +23,7 @@ use crate::Excludes; pub fn compress_temporal( compressor: &BtrBlocksCompressor, array: TemporalArray, + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let dtype = array.dtype().clone(); let TemporalParts { @@ -33,19 +35,22 @@ pub fn compress_temporal( let ctx = CompressorContext::default().descend(); let days = compressor.compress_canonical( - Canonical::Primitive(days.to_primitive().narrow()?), + Canonical::Primitive(days.to_primitive().narrow(exec_ctx)?), ctx, Excludes::none(), + exec_ctx, )?; let seconds = compressor.compress_canonical( - Canonical::Primitive(seconds.to_primitive().narrow()?), + Canonical::Primitive(seconds.to_primitive().narrow(exec_ctx)?), ctx, Excludes::none(), + exec_ctx, )?; let subseconds = compressor.compress_canonical( - Canonical::Primitive(subseconds.to_primitive().narrow()?), + Canonical::Primitive(subseconds.to_primitive().narrow(exec_ctx)?), ctx, Excludes::none(), + exec_ctx, )?; Ok(DateTimePartsArray::try_new(dtype, days, seconds, subseconds)?.into_array()) diff --git a/vortex-btrblocks/src/scheme.rs b/vortex-btrblocks/src/scheme.rs index 63cb4bad28a..1bd6bbf690b 100644 --- a/vortex-btrblocks/src/scheme.rs +++ b/vortex-btrblocks/src/scheme.rs @@ -8,6 +8,7 @@ use std::hash::Hash; use std::hash::Hasher; use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; use vortex_error::VortexResult; use crate::BtrBlocksCompressor; @@ -46,8 +47,9 @@ pub trait Scheme: Debug { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[Self::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { - self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes, exec_ctx) } /// Compress the input with this scheme, yielding a new array. @@ -57,6 +59,7 @@ pub trait Scheme: Debug { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[Self::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult; } @@ -84,6 +87,7 @@ pub trait SchemeExt: Scheme { stats: &Self::StatsType, ctx: CompressorContext, excludes: &[Self::CodeType], + exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let sample = if ctx.is_sample { stats.clone() @@ -101,7 +105,13 @@ pub trait SchemeExt: Scheme { }; let after = self - .compress(btr_blocks_compressor, &sample, ctx.as_sample(), excludes)? + .compress( + btr_blocks_compressor, + &sample, + ctx.as_sample(), + excludes, + exec_ctx, + )? .nbytes(); let before = sample.source().nbytes(); diff --git a/vortex-cuda/gpu-scan-cli/Cargo.toml b/vortex-cuda/gpu-scan-cli/Cargo.toml index 9bdf7fc3478..2ec84c64004 100644 --- a/vortex-cuda/gpu-scan-cli/Cargo.toml +++ b/vortex-cuda/gpu-scan-cli/Cargo.toml @@ -21,6 +21,6 @@ tokio = { workspace = true, features = ["macros", "full"] } tracing = { workspace = true, features = ["std", "attributes"] } tracing-perfetto = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } -vortex = { workspace = true } +vortex = { workspace = true, features = ["tokio"] } vortex-cuda = { workspace = true, features = ["_test-harness"] } vortex-cuda-macros = { workspace = true } diff --git a/vortex/benches/common_encoding_tree_throughput.rs b/vortex/benches/common_encoding_tree_throughput.rs index ee047c82653..4c3ae2d1fed 100644 --- a/vortex/benches/common_encoding_tree_throughput.rs +++ b/vortex/benches/common_encoding_tree_throughput.rs @@ -16,7 +16,9 @@ use rand::SeedableRng; use vortex::array::Array; use vortex::array::ArrayRef; use vortex::array::IntoArray; +use vortex::array::LEGACY_SESSION; use vortex::array::ToCanonical; +use vortex::array::VortexSessionExecute; use vortex::array::arrays::DictArray; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::TemporalArray; @@ -166,7 +168,11 @@ mod setup { } let prim_array = PrimitiveArray::from_iter(values); - let runend = RunEndArray::encode(prim_array.into_array()).unwrap(); + let runend = RunEndArray::encode( + prim_array.into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); // Compress the ends with FoR <- BitPacked let ends_prim = runend.ends().to_primitive(); diff --git a/vortex/benches/single_encoding_throughput.rs b/vortex/benches/single_encoding_throughput.rs index 945496707a9..1f139df8879 100644 --- a/vortex/benches/single_encoding_throughput.rs +++ b/vortex/benches/single_encoding_throughput.rs @@ -13,7 +13,9 @@ use rand::SeedableRng; use rand::prelude::IndexedRandom; use rand::rngs::StdRng; use vortex::array::IntoArray; +use vortex::array::LEGACY_SESSION; use vortex::array::ToCanonical; +use vortex::array::VortexSessionExecute; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::VarBinViewArray; use vortex::array::builders::dict::dict_encode; @@ -117,13 +119,19 @@ fn bench_runend_compress_u32(bencher: Bencher) { with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| uint_array.clone()) - .bench_values(|a| RunEndArray::encode(a.into_array()).unwrap()); + .bench_values(|a| { + RunEndArray::encode(a.into_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap() + }); } #[divan::bench(name = "runend_decompress_u32")] fn bench_runend_decompress_u32(bencher: Bencher) { let (uint_array, ..) = setup_primitive_arrays(); - let compressed = RunEndArray::encode(uint_array.into_array()).unwrap(); + let compressed = RunEndArray::encode( + uint_array.into_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| &compressed) @@ -180,13 +188,19 @@ fn bench_dict_compress_u32(bencher: Bencher) { with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| &uint_array) - .bench_refs(|a| dict_encode(&a.to_array()).unwrap()); + .bench_refs(|a| { + dict_encode(&a.to_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap() + }); } #[divan::bench(name = "dict_decompress_u32")] fn bench_dict_decompress_u32(bencher: Bencher) { let (uint_array, ..) = setup_primitive_arrays(); - let compressed = dict_encode(&uint_array.to_array()).unwrap(); + let compressed = dict_encode( + &uint_array.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); with_byte_counter(bencher, NUM_VALUES * 4) .with_inputs(|| &compressed) @@ -303,13 +317,19 @@ fn bench_dict_compress_string(bencher: Bencher) { with_byte_counter(bencher, nbytes) .with_inputs(|| &varbinview_arr) - .bench_refs(|a| dict_encode(&a.to_array()).unwrap()); + .bench_refs(|a| { + dict_encode(&a.to_array(), &mut LEGACY_SESSION.create_execution_ctx()).unwrap() + }); } #[divan::bench(name = "dict_decompress_string")] fn bench_dict_decompress_string(bencher: Bencher) { let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(1_000_000, 0.00005)); - let dict = dict_encode(&varbinview_arr.to_array()).unwrap(); + let dict = dict_encode( + &varbinview_arr.to_array(), + &mut LEGACY_SESSION.create_execution_ctx(), + ) + .unwrap(); let nbytes = varbinview_arr.into_array().nbytes() as u64; with_byte_counter(bencher, nbytes)