Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,178 @@ impl<T: vortex_array::dtype::NativePType> vortex_array::accessor::ArrayAccessor<

pub fn vortex_array::arrays::PrimitiveArray::with_iterator<F, R>(&self, f: F) -> R where F: for<'a> core::ops::function::FnOnce(&mut dyn core::iter::traits::iterator::Iterator<Item = core::option::Option<&'a T>>) -> R

pub mod vortex_array::aggregate_fn

pub mod vortex_array::aggregate_fn::fns

pub mod vortex_array::aggregate_fn::session

pub struct vortex_array::aggregate_fn::session::AggregateFnSession

impl vortex_array::aggregate_fn::session::AggregateFnSession

pub fn vortex_array::aggregate_fn::session::AggregateFnSession::register<V: vortex_array::aggregate_fn::AggregateFnVTable>(&self, vtable: V)

pub fn vortex_array::aggregate_fn::session::AggregateFnSession::registry(&self) -> &vortex_array::aggregate_fn::session::AggregateFnRegistry

impl core::default::Default for vortex_array::aggregate_fn::session::AggregateFnSession

pub fn vortex_array::aggregate_fn::session::AggregateFnSession::default() -> vortex_array::aggregate_fn::session::AggregateFnSession

impl core::fmt::Debug for vortex_array::aggregate_fn::session::AggregateFnSession

pub fn vortex_array::aggregate_fn::session::AggregateFnSession::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub trait vortex_array::aggregate_fn::session::AggregateFnSessionExt: vortex_session::SessionExt

pub fn vortex_array::aggregate_fn::session::AggregateFnSessionExt::aggregate_fns(&self) -> vortex_session::Ref<'_, vortex_array::aggregate_fn::session::AggregateFnSession>

impl<S: vortex_session::SessionExt> vortex_array::aggregate_fn::session::AggregateFnSessionExt for S

pub fn S::aggregate_fns(&self) -> vortex_session::Ref<'_, vortex_array::aggregate_fn::session::AggregateFnSession>

pub type vortex_array::aggregate_fn::session::AggregateFnRegistry = vortex_session::registry::Registry<vortex_array::aggregate_fn::AggregateFnPluginRef>

pub struct vortex_array::aggregate_fn::AggregateFn<V: vortex_array::aggregate_fn::AggregateFnVTable>(_)

impl<V: vortex_array::aggregate_fn::AggregateFnVTable> vortex_array::aggregate_fn::AggregateFn<V>

pub fn vortex_array::aggregate_fn::AggregateFn<V>::erased(self) -> vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::aggregate_fn::AggregateFn<V>::new(vtable: V, options: <V as vortex_array::aggregate_fn::AggregateFnVTable>::Options) -> Self

pub fn vortex_array::aggregate_fn::AggregateFn<V>::options(&self) -> &<V as vortex_array::aggregate_fn::AggregateFnVTable>::Options

pub fn vortex_array::aggregate_fn::AggregateFn<V>::vtable(&self) -> &V

pub struct vortex_array::aggregate_fn::AggregateFnOptions<'a>

impl vortex_array::aggregate_fn::AggregateFnOptions<'_>

pub fn vortex_array::aggregate_fn::AggregateFnOptions<'_>::serialize(&self) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

impl<'a> vortex_array::aggregate_fn::AggregateFnOptions<'a>

pub fn vortex_array::aggregate_fn::AggregateFnOptions<'a>::as_any(&self) -> &'a dyn core::any::Any

impl core::cmp::Eq for vortex_array::aggregate_fn::AggregateFnOptions<'_>

impl core::cmp::PartialEq for vortex_array::aggregate_fn::AggregateFnOptions<'_>

pub fn vortex_array::aggregate_fn::AggregateFnOptions<'_>::eq(&self, other: &Self) -> bool

impl core::fmt::Debug for vortex_array::aggregate_fn::AggregateFnOptions<'_>

pub fn vortex_array::aggregate_fn::AggregateFnOptions<'_>::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::fmt::Display for vortex_array::aggregate_fn::AggregateFnOptions<'_>

pub fn vortex_array::aggregate_fn::AggregateFnOptions<'_>::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::hash::Hash for vortex_array::aggregate_fn::AggregateFnOptions<'_>

pub fn vortex_array::aggregate_fn::AggregateFnOptions<'_>::hash<H: core::hash::Hasher>(&self, state: &mut H)

pub struct vortex_array::aggregate_fn::AggregateFnRef(_)

impl vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::aggregate_fn::AggregateFnRef::accumulator(&self, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::AggregateFnRef::as_<V: vortex_array::aggregate_fn::AggregateFnVTable>(&self) -> &<V as vortex_array::aggregate_fn::AggregateFnVTable>::Options

pub fn vortex_array::aggregate_fn::AggregateFnRef::as_opt<V: vortex_array::aggregate_fn::AggregateFnVTable>(&self) -> core::option::Option<&<V as vortex_array::aggregate_fn::AggregateFnVTable>::Options>

pub fn vortex_array::aggregate_fn::AggregateFnRef::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::AggregateFnRef::is<V: vortex_array::aggregate_fn::AggregateFnVTable>(&self) -> bool

pub fn vortex_array::aggregate_fn::AggregateFnRef::options(&self) -> vortex_array::aggregate_fn::AggregateFnOptions<'_>

pub fn vortex_array::aggregate_fn::AggregateFnRef::return_dtype(&self, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::AggregateFnRef::state_dtype(&self, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::AggregateFnRef::vtable_ref<V: vortex_array::aggregate_fn::AggregateFnVTable>(&self) -> core::option::Option<&V>

impl core::clone::Clone for vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::aggregate_fn::AggregateFnRef::clone(&self) -> vortex_array::aggregate_fn::AggregateFnRef

impl core::cmp::Eq for vortex_array::aggregate_fn::AggregateFnRef

impl core::cmp::PartialEq for vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::aggregate_fn::AggregateFnRef::eq(&self, other: &Self) -> bool

impl core::fmt::Debug for vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::aggregate_fn::AggregateFnRef::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::fmt::Display for vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::aggregate_fn::AggregateFnRef::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::hash::Hash for vortex_array::aggregate_fn::AggregateFnRef

pub fn vortex_array::aggregate_fn::AggregateFnRef::hash<H: core::hash::Hasher>(&self, state: &mut H)

pub trait vortex_array::aggregate_fn::Accumulator: core::marker::Send + core::marker::Sync

pub fn vortex_array::aggregate_fn::Accumulator::accumulate(&mut self, batch: &vortex_array::ArrayRef) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::Accumulator::accumulate_list(&mut self, list: &vortex_array::arrays::ListViewArray) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::Accumulator::finish(self: alloc::boxed::Box<Self>) -> vortex_error::VortexResult<vortex_array::ArrayRef>

pub fn vortex_array::aggregate_fn::Accumulator::flush(&mut self) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::Accumulator::is_saturated(&self) -> bool

pub fn vortex_array::aggregate_fn::Accumulator::merge(&mut self, state: &vortex_array::scalar::Scalar) -> vortex_error::VortexResult<()>

pub fn vortex_array::aggregate_fn::Accumulator::merge_list(&mut self, states: &vortex_array::ArrayRef) -> vortex_error::VortexResult<()>

pub trait vortex_array::aggregate_fn::AggregateFnPlugin: 'static + core::marker::Send + core::marker::Sync

pub fn vortex_array::aggregate_fn::AggregateFnPlugin::deserialize(&self, metadata: &[u8], session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::aggregate_fn::AggregateFnRef>

pub fn vortex_array::aggregate_fn::AggregateFnPlugin::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

impl<V: vortex_array::aggregate_fn::AggregateFnVTable> vortex_array::aggregate_fn::AggregateFnPlugin for V

pub fn V::deserialize(&self, metadata: &[u8], session: &vortex_session::VortexSession) -> vortex_error::VortexResult<vortex_array::aggregate_fn::AggregateFnRef>

pub fn V::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub trait vortex_array::aggregate_fn::AggregateFnVTable: 'static + core::marker::Sized + core::clone::Clone + core::marker::Send + core::marker::Sync

pub type vortex_array::aggregate_fn::AggregateFnVTable::Options: 'static + core::marker::Send + core::marker::Sync + core::clone::Clone + core::fmt::Debug + core::fmt::Display + core::cmp::PartialEq + core::cmp::Eq + core::hash::Hash

pub fn vortex_array::aggregate_fn::AggregateFnVTable::accumulator(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<alloc::boxed::Box<dyn vortex_array::aggregate_fn::Accumulator>>

pub fn vortex_array::aggregate_fn::AggregateFnVTable::deserialize(&self, _metadata: &[u8], _session: &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>

pub fn vortex_array::aggregate_fn::AggregateFnVTable::id(&self) -> vortex_array::aggregate_fn::AggregateFnId

pub fn vortex_array::aggregate_fn::AggregateFnVTable::return_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub fn vortex_array::aggregate_fn::AggregateFnVTable::serialize(&self, options: &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>

pub fn vortex_array::aggregate_fn::AggregateFnVTable::state_dtype(&self, options: &Self::Options, input_dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::dtype::DType>

pub trait vortex_array::aggregate_fn::AggregateFnVTableExt: vortex_array::aggregate_fn::AggregateFnVTable

pub fn vortex_array::aggregate_fn::AggregateFnVTableExt::bind(&self, options: Self::Options) -> vortex_array::aggregate_fn::AggregateFnRef

impl<V: vortex_array::aggregate_fn::AggregateFnVTable> vortex_array::aggregate_fn::AggregateFnVTableExt for V

pub fn V::bind(&self, options: Self::Options) -> vortex_array::aggregate_fn::AggregateFnRef

pub type vortex_array::aggregate_fn::AggregateFnId = arcref::ArcRef<str>

pub type vortex_array::aggregate_fn::AggregateFnPluginRef = alloc::sync::Arc<dyn vortex_array::aggregate_fn::AggregateFnPlugin>

pub mod vortex_array::arrays

pub mod vortex_array::arrays::build_views
Expand Down
75 changes: 75 additions & 0 deletions vortex-array/src/aggregate_fn/accumulator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::arrays::ListViewArray;
use crate::scalar::Scalar;

/// The execution interface for all aggregation.
///
/// An accumulator processes one group at a time: the caller feeds element batches via
/// [`accumulate`](Accumulator::accumulate), then calls [`flush`](Accumulator::flush) to finalize
/// the group and begin the next. The accumulator owns an output buffer and returns all results
/// via [`finish`](Accumulator::finish).
pub trait Accumulator: Send + Sync {
/// Feed a batch of elements for the currently open group.
///
/// May be called multiple times per group (e.g., chunked elements).
fn accumulate(&mut self, batch: &ArrayRef) -> VortexResult<()>;

/// Accumulate all groups defined by a [`ListViewArray`] in one call.
///
/// Default: for each group, accumulate its elements then flush.
/// Override for vectorized fast paths (e.g., segmented sum over the flat
/// elements + offsets without per-group slicing).
fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()> {
for i in 0..list.len() {
self.accumulate(&list.list_elements_at(i)?)?;
self.flush()?;
}
Ok(())
}

/// Merge pre-computed partial state into the currently open group.
///
/// The scalar's dtype must match the aggregate's `state_dtype`.
/// This is equivalent to having processed raw elements that would produce
/// this state — used by encoding-specific optimizations.
fn merge(&mut self, state: &Scalar) -> VortexResult<()>;

/// Merge an array of pre-computed states, one per group, flushing each.
///
/// The array's dtype must match the aggregate's `state_dtype`.
/// Default: merge + flush for each element.
fn merge_list(&mut self, states: &ArrayRef) -> VortexResult<()> {
for i in 0..states.len() {
self.merge(&states.scalar_at(i)?)?;
self.flush()?;
}
Ok(())
}

/// Whether the currently open group's result is fully determined.
///
/// When true, callers may skip further accumulate/merge calls and proceed
/// directly to [`flush`](Accumulator::flush). Resets to false after flush.
fn is_saturated(&self) -> bool {
false
}

/// Finalize the currently open group: push its result to the output buffer
/// and reset internal state for the next group.
///
/// Flushing a group with zero accumulated elements produces the aggregate's
/// identity value (e.g., 0 for Sum, u64::MAX for Min) or null if no identity
/// exists.
fn flush(&mut self) -> VortexResult<()>;

/// Return all flushed results as a single array.
///
/// Length equals the number of [`flush`](Accumulator::flush) calls made over the
/// accumulator's lifetime.
fn finish(self: Box<Self>) -> VortexResult<ArrayRef>;
}
121 changes: 121 additions & 0 deletions vortex-array/src/aggregate_fn/erased.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Type-erased aggregate function ([`AggregateFnRef`]).

use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::hash::Hash;
use std::hash::Hasher;
use std::sync::Arc;

use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_utils::debug_with::DebugWith;

use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::accumulator::Accumulator;
use crate::aggregate_fn::options::AggregateFnOptions;
use crate::aggregate_fn::typed::AggregateFnInner;
use crate::aggregate_fn::typed::DynAggregateFn;
use crate::dtype::DType;

/// A type-erased aggregate function, pairing a vtable with bound options behind a trait object.
///
/// This stores an [`AggregateFnVTable`] and its options behind an `Arc<dyn DynAggregateFn>`,
/// allowing heterogeneous storage and dispatch.
///
/// Use [`super::AggregateFn::new()`] to construct, and [`super::AggregateFn::erased()`] to
/// obtain an [`AggregateFnRef`].
#[derive(Clone)]
pub struct AggregateFnRef(pub(super) Arc<dyn DynAggregateFn>);

impl AggregateFnRef {
/// Returns the ID of this aggregate function.
pub fn id(&self) -> AggregateFnId {
self.0.id()
}

/// Returns whether the aggregate function is of the given vtable type.
pub fn is<V: AggregateFnVTable>(&self) -> bool {
self.0.as_any().is::<AggregateFnInner<V>>()
}

/// Returns the typed options for this aggregate function if it matches the given vtable type.
pub fn as_opt<V: AggregateFnVTable>(&self) -> Option<&V::Options> {
self.downcast_inner::<V>().map(|inner| &inner.options)
}

/// Returns a reference to the typed vtable if it matches the given vtable type.
pub fn vtable_ref<V: AggregateFnVTable>(&self) -> Option<&V> {
self.downcast_inner::<V>().map(|inner| &inner.vtable)
}

/// Downcast the inner to the concrete `AggregateFnInner<V>`.
fn downcast_inner<V: AggregateFnVTable>(&self) -> Option<&AggregateFnInner<V>> {
self.0.as_any().downcast_ref::<AggregateFnInner<V>>()
}

/// Returns the typed options for this aggregate function if it matches the given vtable type.
///
/// # Panics
///
/// Panics if the vtable type does not match.
pub fn as_<V: AggregateFnVTable>(&self) -> &V::Options {
self.as_opt::<V>()
.vortex_expect("Aggregate function options type mismatch")
}

/// The type-erased options for this aggregate function.
pub fn options(&self) -> AggregateFnOptions<'_> {
AggregateFnOptions { inner: &*self.0 }
}

/// Compute the return [`DType`] per group given the input element type.
pub fn return_dtype(&self, input_dtype: &DType) -> VortexResult<DType> {
self.0.return_dtype(input_dtype)
}

/// DType of the intermediate accumulator state.
pub fn state_dtype(&self, input_dtype: &DType) -> VortexResult<DType> {
self.0.state_dtype(input_dtype)
}

/// Create an accumulator for streaming aggregation.
pub fn accumulator(&self, input_dtype: &DType) -> VortexResult<Box<dyn Accumulator>> {
self.0.accumulator(input_dtype)
}
}

impl Debug for AggregateFnRef {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AggregateFnRef")
.field("vtable", &self.0.id())
.field("options", &DebugWith(|fmt| self.0.options_debug(fmt)))
.finish()
}
}

impl Display for AggregateFnRef {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}(", self.0.id())?;
self.0.options_display(f)?;
write!(f, ")")
}
}

impl PartialEq for AggregateFnRef {
fn eq(&self, other: &Self) -> bool {
self.0.id() == other.0.id() && self.0.options_eq(other.0.options_any())
}
}
impl Eq for AggregateFnRef {}

impl Hash for AggregateFnRef {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.id().hash(state);
self.0.options_hash(state);
}
}
2 changes: 2 additions & 0 deletions vortex-array/src/aggregate_fn/fns/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
Loading
Loading