Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
920 changes: 902 additions & 18 deletions vortex-array/public-api.lock

Large diffs are not rendered by default.

21 changes: 13 additions & 8 deletions vortex-array/src/aggregate_fn/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,19 @@ impl<V: AggregateFnVTable> DynAccumulator for Accumulator<V> {
if let Some(stat) = Stat::from_aggregate_fn(&self.aggregate_fn)
&& let Some(Precision::Exact(partial)) = batch.statistics().get(stat)
{
vortex_ensure!(
partial.dtype() == &self.partial_dtype,
"Aggregate {} read legacy stat {} with dtype {}, expected {}",
self.aggregate_fn,
stat,
partial.dtype(),
self.partial_dtype,
);
let partial = if partial.dtype() == &self.partial_dtype {
partial
} else {
vortex_ensure!(
partial.dtype().eq_ignore_nullability(&self.partial_dtype),
"Aggregate {} read legacy stat {} with dtype {}, expected {}",
self.aggregate_fn,
stat,
partial.dtype(),
self.partial_dtype,
);
partial.cast(&self.partial_dtype)?
};
self.vtable.combine_partials(&mut self.partial, partial)?;
return Ok(());
}
Expand Down
6 changes: 6 additions & 0 deletions vortex-array/src/aggregate_fn/erased.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use vortex_utils::debug_with::DebugWith;

use crate::aggregate_fn::AccumulatorRef;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnSatisfaction;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::GroupedAccumulatorRef;
use crate::aggregate_fn::options::AggregateFnOptions;
Expand Down Expand Up @@ -74,6 +75,11 @@ impl AggregateFnRef {
AggregateFnOptions { inner: &*self.0 }
}

/// Return whether this stored aggregate can satisfy `requested`.
pub fn can_satisfy(&self, requested: &AggregateFnRef) -> AggregateFnSatisfaction {
self.0.can_satisfy(requested)
}

/// Coerce the input type for this aggregate function.
pub fn coerce_args(&self, input_dtype: &DType) -> VortexResult<DType> {
self.0.coerce_args(input_dtype)
Expand Down
192 changes: 192 additions & 0 deletions vortex-array/src/aggregate_fn/fns/all_nan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::Columnar;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::nan_count::nan_count;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::scalar::Scalar;

/// Compute whether every value in an array is NaN.
///
/// Like other `all` aggregates, this is vacuously true for empty input.
///
/// This is a pruning aggregate, not just a convenience wrapper around
/// [`NanCount`][crate::aggregate_fn::fns::nan_count::NanCount]. Pruning aggregates must prove a
/// row-wise fact for every value in the scope, so their partials remain valid when a stats column is
/// sliced or concatenated alongside the data. [`NanCount`][crate::aggregate_fn::fns::nan_count::NanCount]
/// carries cross-row count information instead, so it is useful as a legacy storage format but not
/// as the pruning expression itself.
#[derive(Clone, Debug)]
pub struct AllNan;

impl AggregateFnVTable for AllNan {
type Options = EmptyOptions;
type Partial = bool;

fn id(&self) -> AggregateFnId {
AggregateFnId::new("vortex.all_nan")
}

fn serialize(&self, _options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
Ok(None)
}

fn return_dtype(&self, _options: &Self::Options, input_dtype: &DType) -> Option<DType> {
matches!(input_dtype, DType::Primitive(ptype, _) if ptype.is_float())
.then_some(DType::Bool(Nullability::Nullable))
}

fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
self.return_dtype(options, input_dtype)
}

fn empty_partial(
&self,
_options: &Self::Options,
_input_dtype: &DType,
) -> VortexResult<Self::Partial> {
Ok(true)
}

fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
*partial &= bool::try_from(&other)?;
Ok(())
}

fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
Ok(Scalar::bool(*partial, Nullability::Nullable))
}

fn reset(&self, partial: &mut Self::Partial) {
*partial = true;
}

fn is_saturated(&self, partial: &Self::Partial) -> bool {
!*partial
}

fn try_accumulate(
&self,
state: &mut Self::Partial,
batch: &ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<bool> {
if !matches!(batch.dtype(), DType::Primitive(ptype, _) if ptype.is_float()) {
*state = false;
return Ok(true);
}

*state &= nan_count(batch, ctx)? == batch.len();
Ok(true)
}

fn accumulate(
&self,
partial: &mut Self::Partial,
batch: &Columnar,
ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
// Normal array dispatch is handled by `try_accumulate`, which always short-circuits.
// Keep this fallback in sync for direct Columnar accumulation paths.
let array = match batch {
Columnar::Constant(c) => c.clone().into_array(),
Columnar::Canonical(c) => c.clone().into_array(),
};
if !matches!(array.dtype(), DType::Primitive(ptype, _) if ptype.is_float()) {
*partial = false;
return Ok(());
}

*partial &= nan_count(&array, ctx)? == array.len();
Ok(())
}

fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
Ok(partials)
}

fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
self.to_scalar(partial)
}
}

#[cfg(test)]
mod tests {
use vortex_error::VortexResult;

use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::aggregate_fn::Accumulator;
use crate::aggregate_fn::DynAccumulator;
use crate::aggregate_fn::EmptyOptions;
use crate::aggregate_fn::fns::all_nan::AllNan;
use crate::arrays::PrimitiveArray;
use crate::dtype::DType;
use crate::dtype::Nullability;
use crate::dtype::PType;

#[test]
fn all_nan_aggregate_fn() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let dtype = DType::Primitive(PType::F32, Nullability::Nullable);
let mut acc = Accumulator::try_new(AllNan, EmptyOptions, dtype)?;

let batch = PrimitiveArray::from_option_iter([Some(f32::NAN), Some(f32::NAN)]).into_array();
acc.accumulate(&batch, &mut ctx)?;

assert!(bool::try_from(&acc.finish()?)?);
Ok(())
}

#[test]
fn all_nan_false_with_non_nan() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let dtype = DType::Primitive(PType::F32, Nullability::Nullable);
let mut acc = Accumulator::try_new(AllNan, EmptyOptions, dtype)?;

let batch = PrimitiveArray::from_option_iter([Some(f32::NAN), Some(1.0f32)]).into_array();
acc.accumulate(&batch, &mut ctx)?;

assert!(!bool::try_from(&acc.finish()?)?);
Ok(())
}

#[test]
fn all_nan_unsupported_for_non_float_values() -> VortexResult<()> {
let dtype = DType::Primitive(PType::I32, Nullability::Nullable);
assert!(Accumulator::try_new(AllNan, EmptyOptions, dtype).is_err());
Ok(())
}

#[test]
fn all_nan_false_with_null() -> VortexResult<()> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let dtype = DType::Primitive(PType::F32, Nullability::Nullable);
let mut acc = Accumulator::try_new(AllNan, EmptyOptions, dtype)?;

let batch = PrimitiveArray::from_option_iter([Some(f32::NAN), None]).into_array();
acc.accumulate(&batch, &mut ctx)?;

assert!(!bool::try_from(&acc.finish()?)?);
Ok(())
}

#[test]
fn all_nan_true_for_empty_float_values() -> VortexResult<()> {
let dtype = DType::Primitive(PType::F32, Nullability::Nullable);
let mut acc = Accumulator::try_new(AllNan, EmptyOptions, dtype)?;

assert!(bool::try_from(&acc.finish()?)?);
Ok(())
}
}
3 changes: 3 additions & 0 deletions vortex-array/src/aggregate_fn/fns/all_non_distinct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::validity::Validity;
/// Returns `true` if and only if:
/// - Both arrays have the same dtype and length
/// - At every position, both are null or both are non-null with the same value
/// - The arrays are empty, vacuously
///
/// This is a fused `bool_all(non_distinct(lhs, rhs))` aggregate that allows early
/// termination via accumulator saturation as soon as a mismatch is found.
Expand Down Expand Up @@ -102,6 +103,8 @@ static NAMES: LazyLock<FieldNames> = LazyLock::new(|| FieldNames::from(["lhs", "
/// as the first distinct pair is found, the accumulator is saturated and remaining batches
/// are skipped.
///
/// Like other `all` aggregates, this is vacuously true for empty input.
///
/// The input is a `Struct{lhs: T, rhs: T}` and the result is `Bool(NonNullable)`.
#[derive(Clone, Debug)]
pub struct AllNonDistinct;
Expand Down
Loading
Loading