diff --git a/Cargo.lock b/Cargo.lock index aa3f9c589a9..917c1544870 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10809,6 +10809,7 @@ version = "0.1.0" dependencies = [ "arrow-array 57.2.0", "arrow-schema 57.2.0", + "async-trait", "bit-vec 0.8.0", "futures", "itertools 0.14.0", diff --git a/vortex-scan/Cargo.toml b/vortex-scan/Cargo.toml index 0f9f34b00de..a8ea3c16398 100644 --- a/vortex-scan/Cargo.toml +++ b/vortex-scan/Cargo.toml @@ -29,6 +29,7 @@ vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } +async-trait = { workspace = true } bit-vec = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/vortex-scan/src/datasource.rs b/vortex-scan/src/datasource.rs new file mode 100644 index 00000000000..c6ea110ee44 --- /dev/null +++ b/vortex-scan/src/datasource.rs @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use vortex_array::expr::Expression; +use vortex_array::stream::SendableArrayStream; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +/// Create a Vortex source from serialized configuration. +/// +/// Providers can be registered with Vortex under a specific URI scheme. +#[async_trait(?Send)] +pub trait DataSourceProvider: 'static { + /// URI schemes handled by this source provider. + /// + /// TODO(ngates): this might not be the right way to plug in sources. + fn schemes(&self) -> &[&str]; + + /// Initialize a new source. + async fn init_source(&self, uri: String) -> VortexResult; + + /// Serialize a source split to bytes. + async fn serialize_split(&self, split: &dyn Split) -> VortexResult>; + + /// Deserialize a source split from bytes. + async fn deserialize_split(&self, data: &[u8]) -> VortexResult; +} + +/// A reference-counted data source. 
+pub type DataSourceRef = Arc; + +/// A data source represents a streamable dataset that can be scanned with projection and filter +/// expressions. Each scan produces splits that can be executed (potentially in parallel) to read +/// data. Each split can be serialized for remote execution. +#[async_trait] +pub trait DataSource: 'static + Send + Sync { + /// Returns the dtype of the source. + fn dtype(&self) -> &DType; + + /// Returns an estimate of the row count of the source. + fn row_count_estimate(&self) -> Estimate; + + /// Returns a scan over the source. + async fn scan(&self, scan_request: ScanRequest) -> VortexResult; +} + +#[derive(Debug, Clone, Default)] +pub struct ScanRequest { + /// Projection expression, `None` implies `root()`. + pub projection: Option, + /// Filter expression, `None` implies no filter. + pub filter: Option, + /// Optional limit on the number of rows to scan. + pub limit: Option, +} + +/// A boxed data source scan. +pub type DataSourceScanRef = Box; + +#[async_trait] +pub trait DataSourceScan: 'static + Send + Sync { + /// The returned dtype of the scan. + fn dtype(&self) -> &DType; + + /// An estimate of the remaining splits. + fn remaining_splits_estimate(&self) -> Estimate; + + /// Returns the next batch of splits to be processed. + /// + /// This should not return _more_ than `max_splits` splits, but may return fewer. + async fn next_splits(&mut self, max_splits: usize) -> VortexResult>; +} + +pub type SplitStream = BoxStream<'static, VortexResult>; +pub type SplitRef = Arc; + +/// A split represents a unit of work that can be executed to produce a stream of arrays. +pub trait Split: 'static + Send + Sync { + /// Downcast the split to a concrete type. + fn as_any(&self) -> &dyn Any; + + /// Executes the split. + fn execute(&self) -> VortexResult; + + /// Returns an estimate of the row count for this split. + fn row_count_estimate(&self) -> Estimate; + + /// Returns an estimate of the byte size for this split. 
+ fn byte_size_estimate(&self) -> Estimate; +} + +/// An estimate that can be exact, an upper bound, or unknown. +#[derive(Default)] +pub enum Estimate { + Exact(T), + UpperBound(T), + #[default] + Unknown, +} diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 52eb0dd47bb..c7a94d24b29 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -7,6 +7,7 @@ const IDEAL_SPLIT_SIZE: u64 = 100_000; pub mod arrow; +pub mod datasource; mod filter; pub mod row_mask; mod splits; diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 5240e3c22cb..dc2c1231277 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -39,6 +39,7 @@ pub mod compressor { pub mod dtype { pub use vortex_dtype::*; } + pub mod error { pub use vortex_error::*; }