Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,451 changes: 898 additions & 553 deletions Cargo.lock

Large diffs are not rendered by default.

30 changes: 16 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,22 @@ cudarc = { version = "0.19.0", features = [
] }
custom-labels = "0.4.4"
dashmap = "6.1.0"
datafusion = { version = "53", default-features = false, features = ["sql"] }
datafusion-catalog = { version = "53" }
datafusion-common = { version = "53" }
datafusion-common-runtime = { version = "53" }
datafusion-datasource = { version = "53", default-features = false }
datafusion-execution = { version = "53" }
datafusion-expr = { version = "53" }
datafusion-functions = { version = "53" }
datafusion-physical-expr = { version = "53" }
datafusion-physical-expr-adapter = { version = "53" }
datafusion-physical-expr-common = { version = "53" }
datafusion-physical-plan = { version = "53" }
datafusion-pruning = { version = "53" }
datafusion-sqllogictest = { version = "53" }
# The DataFusion crates below are sourced from the `branch-54` git branch of
# apache/datafusion rather than from a crates.io release.
# NOTE(review): a `branch` dependency follows the branch head whenever the lockfile is
# re-resolved. Presumably this is temporary until a 54 release is published; confirm, and
# consider pinning `rev = "..."` in the meantime.
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-54", default-features = false, features = [
"sql",
] }
datafusion-catalog = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-common = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-common-runtime = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-datasource = { git = "https://github.com/apache/datafusion", branch = "branch-54", default-features = false }
datafusion-execution = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-expr = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-functions = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-pruning = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
datafusion-sqllogictest = { git = "https://github.com/apache/datafusion", branch = "branch-54" }
dirs = "6.0.0"
divan = { package = "codspeed-divan-compat", version = "4.0.4" }
enum-iterator = "2.0.0"
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use datafusion::datasource::provider::DefaultTableFactory;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::cache::DefaultListFilesCache;
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion::execution::cache::file_statistics_cache::DefaultFileStatisticsCache;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::SessionConfig;
use datafusion::prelude::SessionContext;
Expand All @@ -37,7 +37,7 @@ pub fn get_session_context() -> SessionContext {
let file_static_cache = Arc::new(DefaultFileStatisticsCache::default());
let list_file_cache = Arc::new(DefaultListFilesCache::default());
let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(Some(file_static_cache))
.with_file_statistics_cache(Some(file_static_cache))
.with_list_files_cache(Some(list_file_cache));
rt_builder = rt_builder.with_cache_manager(cache_config);

Expand Down
63 changes: 21 additions & 42 deletions vortex-datafusion/src/convert/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ impl DefaultExpressionConvertor {
let mut result = self.convert(source_expr.as_ref())?;
for expr in field_names {
let field_name = expr
.as_any()
.downcast_ref::<df_expr::Literal>()
.ok_or_else(|| exec_datafusion_err!("get_field field name must be a literal"))?
.value()
Expand Down Expand Up @@ -195,19 +194,19 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
fn convert(&self, df: &dyn PhysicalExpr) -> DFResult<Expression> {
// TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep
// for that node, up to any `and` or `or` node.
if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
if let Some(binary_expr) = df.downcast_ref::<df_expr::BinaryExpr>() {
let left = self.convert(binary_expr.left().as_ref())?;
let right = self.convert(binary_expr.right().as_ref())?;
let operator = try_operator_from_df(binary_expr.op())?;

return Ok(Binary.new_expr(operator, [left, right]));
}

if let Some(col_expr) = df.as_any().downcast_ref::<df_expr::Column>() {
if let Some(col_expr) = df.downcast_ref::<df_expr::Column>() {
return Ok(get_item(col_expr.name().to_owned(), root()));
}

if let Some(like) = df.as_any().downcast_ref::<df_expr::LikeExpr>() {
if let Some(like) = df.downcast_ref::<df_expr::LikeExpr>() {
let child = self.convert(like.expr().as_ref())?;
let pattern = self.convert(like.pattern().as_ref())?;
return Ok(Like.new_expr(
Expand All @@ -219,42 +218,34 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
));
}

if let Some(literal) = df.as_any().downcast_ref::<df_expr::Literal>() {
if let Some(literal) = df.downcast_ref::<df_expr::Literal>() {
let value = Scalar::from_df(literal.value());
return Ok(lit(value));
}

if let Some(cast_expr) = df.as_any().downcast_ref::<df_expr::CastExpr>() {
let cast_dtype = DType::from_arrow((cast_expr.cast_type(), Nullability::Nullable));
if let Some(cast_expr) = df.downcast_ref::<df_expr::CastExpr>() {
let cast_dtype = DType::from_arrow(cast_expr.target_field().as_ref());
let child = self.convert(cast_expr.expr().as_ref())?;
return Ok(cast(child, cast_dtype));
}

if let Some(cast_col_expr) = df.as_any().downcast_ref::<df_expr::CastColumnExpr>() {
let target = cast_col_expr.target_field();

let target_dtype = DType::from_arrow((target.data_type(), target.is_nullable().into()));
let child = self.convert(cast_col_expr.expr().as_ref())?;
return Ok(cast(child, target_dtype));
}

if let Some(is_null_expr) = df.as_any().downcast_ref::<df_expr::IsNullExpr>() {
if let Some(is_null_expr) = df.downcast_ref::<df_expr::IsNullExpr>() {
let arg = self.convert(is_null_expr.arg().as_ref())?;
return Ok(is_null(arg));
}

if let Some(is_not_null_expr) = df.as_any().downcast_ref::<df_expr::IsNotNullExpr>() {
if let Some(is_not_null_expr) = df.downcast_ref::<df_expr::IsNotNullExpr>() {
let arg = self.convert(is_not_null_expr.arg().as_ref())?;
return Ok(is_not_null(arg));
}

if let Some(in_list) = df.as_any().downcast_ref::<df_expr::InListExpr>() {
if let Some(in_list) = df.downcast_ref::<df_expr::InListExpr>() {
let value = self.convert(in_list.expr().as_ref())?;
let list_elements: Vec<_> = in_list
.list()
.iter()
.map(|e| {
if let Some(lit) = e.as_any().downcast_ref::<df_expr::Literal>() {
if let Some(lit) = e.downcast_ref::<df_expr::Literal>() {
Ok(Scalar::from_df(lit.value()))
} else {
Err(exec_datafusion_err!("Failed to cast sub-expression"))
Expand All @@ -272,11 +263,11 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
return Ok(if in_list.negated() { not(expr) } else { expr });
}

if let Some(scalar_fn) = df.as_any().downcast_ref::<ScalarFunctionExpr>() {
if let Some(scalar_fn) = df.downcast_ref::<ScalarFunctionExpr>() {
return self.try_convert_scalar_function(scalar_fn);
}

if let Some(case_expr) = df.as_any().downcast_ref::<df_expr::CaseExpr>() {
if let Some(case_expr) = df.downcast_ref::<df_expr::CaseExpr>() {
return self.try_convert_case_expr(case_expr);
}

Expand All @@ -297,7 +288,7 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
for projection_expr in source_projection.iter() {
let r = projection_expr.expr.apply(|node| {
// We only pull column children of scalar functions that we can't push into the scan.
if let Some(scalar_fn_expr) = node.as_any().downcast_ref::<ScalarFunctionExpr>()
if let Some(scalar_fn_expr) = node.downcast_ref::<ScalarFunctionExpr>()
&& !can_scalar_fn_be_pushed_down(scalar_fn_expr)
{
scan_projection.extend(
Expand All @@ -312,7 +303,7 @@ impl ExpressionConvertor for DefaultExpressionConvertor {

// DataFusion assumes different decimal types can be coerced.
// Vortex expects a perfect match so we don't push it down.
if let Some(binary_expr) = node.as_any().downcast_ref::<df_expr::BinaryExpr>()
if let Some(binary_expr) = node.downcast_ref::<df_expr::BinaryExpr>()
&& binary_expr.op().is_numerical_operators()
&& (is_decimal(&binary_expr.left().data_type(input_schema)?)
&& is_decimal(&binary_expr.right().data_type(input_schema)?))
Expand Down Expand Up @@ -406,14 +397,13 @@ fn try_operator_from_df(value: &DFOperator) -> DFResult<Operator> {
}
}

fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
fn can_be_pushed_down_impl(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
// We currently do not support pushdown of dynamic expressions in DF.
// See issue: https://github.com/vortex-data/vortex/issues/4034
if is_dynamic_physical_expr(df_expr) {
if is_dynamic_physical_expr(expr) {
return false;
}

let expr = df_expr.as_any();
if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
can_binary_be_pushed_down(binary, schema)
} else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
Expand All @@ -429,9 +419,6 @@ fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) ->
} else if let Some(cast_expr) = expr.downcast_ref::<df_expr::CastExpr>() {
// CastExpr child must be an expression type that convert() can handle
is_convertible_expr(cast_expr.expr())
} else if let Some(cast_col_expr) = expr.downcast_ref::<df_expr::CastColumnExpr>() {
// CastColumnExpr child must be an expression type that convert() can handle
is_convertible_expr(cast_col_expr.expr())
} else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
can_be_pushed_down_impl(is_null.arg(), schema)
} else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
Expand All @@ -447,17 +434,15 @@ fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) ->
} else if let Some(case_expr) = expr.downcast_ref::<df_expr::CaseExpr>() {
can_case_be_pushed_down(case_expr, schema)
} else {
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
tracing::debug!(%expr, "DataFusion expression can't be pushed down");
false
}
}

/// Checks if an expression type is one that convert() can handle.
/// This is less restrictive than can_be_pushed_down since it only checks
/// expression types, not data type support.
fn is_convertible_expr(df_expr: &Arc<dyn PhysicalExpr>) -> bool {
let expr = df_expr.as_any();

fn is_convertible_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
// Expression types that convert() handles
expr.downcast_ref::<df_expr::BinaryExpr>().is_some()
|| expr.downcast_ref::<df_expr::Column>().is_some()
Expand All @@ -466,9 +451,6 @@ fn is_convertible_expr(df_expr: &Arc<dyn PhysicalExpr>) -> bool {
|| expr
.downcast_ref::<df_expr::CastExpr>()
.is_some_and(|e| is_convertible_expr(e.expr()))
|| expr
.downcast_ref::<df_expr::CastColumnExpr>()
.is_some_and(|e| is_convertible_expr(e.expr()))
|| expr.downcast_ref::<df_expr::IsNullExpr>().is_some()
|| expr.downcast_ref::<df_expr::IsNotNullExpr>().is_some()
|| expr.downcast_ref::<df_expr::InListExpr>().is_some()
Expand Down Expand Up @@ -568,6 +550,8 @@ mod tests {
use arrow_schema::Field;
use arrow_schema::Schema;
use arrow_schema::TimeUnit as ArrowTimeUnit;
use datafusion::arrow::array::AsArray;
use datafusion::arrow::datatypes::Int32Type;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator as DFOperator;
use datafusion_physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -988,12 +972,7 @@ mod tests {
let vortex_as_arrow = vortex_result.into_primitive().as_slice::<i32>().to_vec();

// Convert DataFusion result to Vec for comparison
let df_as_arrow: Vec<i32> = df_array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.values()
.to_vec();
let df_as_arrow: Vec<i32> = df_array.as_primitive::<Int32Type>().values().to_vec();

// Compare results
// Expected: [0, 0, 50, 100, 100] for values [1, 5, 10, 15, 20]
Expand Down
10 changes: 0 additions & 10 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
Expand Down Expand Up @@ -299,10 +298,6 @@ impl FileFormatFactory for VortexFormatFactory {
/// Builds a [`VortexFormat`] from a clone of this factory's session and returns it as a
/// shared [`FileFormat`] trait object.
fn default(&self) -> Arc<dyn FileFormat> {
    let session = self.session.clone();
    let format = VortexFormat::new(session);
    Arc::new(format)
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl VortexFormat {
Expand Down Expand Up @@ -330,10 +325,6 @@ impl VortexFormat {

#[async_trait]
impl FileFormat for VortexFormat {
fn as_any(&self) -> &dyn Any {
self
}

/// Reports the file compression type for this format.
///
/// This implementation always returns `None`.
// NOTE(review): presumably this means Vortex files are not wrapped in an outer file-level
// compression codec; confirm against the `FileFormat` trait documentation.
fn compression_type(&self) -> Option<FileCompressionType> {
None
}
Expand Down Expand Up @@ -593,7 +584,6 @@ impl FileFormat for VortexFormat {
) -> DFResult<Arc<dyn ExecutionPlan>> {
let mut source = file_scan_config
.file_source()
.as_any()
.downcast_ref::<VortexSource>()
.cloned()
.ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;
Expand Down
11 changes: 4 additions & 7 deletions vortex-datafusion/src/persistent/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,13 @@ impl VortexMetricsFinder {
impl ExecutionPlanVisitor for VortexMetricsFinder {
type Error = std::convert::Infallible;
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
// Start with exec metrics or create a new set
let mut set = exec.metrics().unwrap_or_default();

// Include our own metrics from VortexSource
if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
&& let Some(scan) = file_scan
.file_source
.as_any()
.downcast_ref::<VortexSource>()
if let Some(file_scan) = exec.data_source().downcast_ref::<FileScanConfig>()
&& let Some(scan) = file_scan.file_source.downcast_ref::<VortexSource>()
{
for metric in scan
.metrics_registry()
Expand Down Expand Up @@ -240,7 +237,7 @@ mod tests {
&mut self,
plan: &dyn datafusion_physical_plan::ExecutionPlan,
) -> Result<bool, Self::Error> {
if plan.as_any().downcast_ref::<DataSourceExec>().is_some() {
if plan.downcast_ref::<DataSourceExec>().is_some() {
self.0 += 1;
Ok(false)
} else {
Expand Down
27 changes: 15 additions & 12 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ impl FileOpener for VortexOpener {

let mut scan_builder = ScanBuilder::new(session.clone(), Arc::clone(&layout_reader));

if let Some(extensions) = file.extensions
&& let Some(vortex_plan) = extensions.downcast_ref::<VortexAccessPlan>()
{
if let Some(vortex_plan) = file.extensions.get::<VortexAccessPlan>() {
scan_builder = vortex_plan.apply_to_builder(scan_builder);
}

Expand Down Expand Up @@ -1149,9 +1147,12 @@ mod tests {

let schema = batch.schema();
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection(
Selection::IncludeByIndex(Buffer::from_iter(vec![1, 3, 5, 7])),
)));
file.extensions
.insert(
VortexAccessPlan::default().with_selection(Selection::IncludeByIndex(
Buffer::from_iter(vec![1, 3, 5, 7]),
)),
);

let opener = make_test_opener(
Arc::clone(&object_store),
Expand Down Expand Up @@ -1190,9 +1191,12 @@ mod tests {

let schema = batch.schema();
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection(
Selection::ExcludeByIndex(Buffer::from_iter(vec![0, 2, 4, 6, 8])),
)));
file.extensions
.insert(
VortexAccessPlan::default().with_selection(Selection::ExcludeByIndex(
Buffer::from_iter(vec![0, 2, 4, 6, 8]),
)),
);

let opener = make_test_opener(
Arc::clone(&object_store),
Expand Down Expand Up @@ -1234,9 +1238,8 @@ mod tests {

let schema = batch.schema();
let mut file = PartitionedFile::new(file_path.to_string(), data_size);
file.extensions = Some(Arc::new(
VortexAccessPlan::default().with_selection(Selection::All),
));
file.extensions
.insert(VortexAccessPlan::default().with_selection(Selection::All));

let opener = make_test_opener(
Arc::clone(&object_store),
Expand Down
5 changes: 0 additions & 5 deletions vortex-datafusion/src/persistent/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::sync::Arc;

use arrow_schema::SchemaRef;
Expand Down Expand Up @@ -69,10 +68,6 @@ impl DisplayAs for VortexSink {

#[async_trait]
impl DataSink for VortexSink {
fn as_any(&self) -> &dyn Any {
self
}

/// Returns the metrics collected by this sink.
///
/// This implementation always returns `None`, i.e. it exposes no [`MetricsSet`].
fn metrics(&self) -> Option<MetricsSet> {
None
}
Expand Down
Loading
Loading