Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ typedef struct {

void (*cardinality)(void *bind_data, duckdb_vx_node_statistics *node_stats_out);

void (*pushdown_column_type)(void *bind_data, idx_t id, duckdb_logical_type type);
bool (*pushdown_complex_filter)(void *bind_data, duckdb_vx_expr expr, duckdb_vx_error *error_out);

void (*to_string)(void *bind_data, duckdb_vx_string_map map);
Expand Down
15 changes: 15 additions & 0 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,20 @@ void function(ClientContext &, TableFunctionInput &input, DataChunk &output) {
}
}

void type_pushdown(ClientContext &, optional_ptr<FunctionData> bind_data,
const unordered_map<idx_t, LogicalType> &new_column_types) {
const auto &bind = bind_data->Cast<CTableBindData>();
void *const ffi_bind = bind.ffi_data->DataPtr();
for (const auto& [id, type] : new_column_types) {
const duckdb_logical_type casted_type = reinterpret_cast<duckdb_logical_type>(
// This is a flaw of duckdb api which doesn't allow passing const
// LogicalTypes. We guarantee this variable won't be mutated on
// Rust side
const_cast<LogicalType*>(&type));
bind.info.vtab.pushdown_column_type(ffi_bind, id, casted_type);
}
}

void c_pushdown_complex_filter(ClientContext &,
LogicalGet &,
FunctionData *bind_data,
Expand Down Expand Up @@ -394,6 +408,7 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
tf.filter_prune = true;
tf.sampling_pushdown = false;

tf.type_pushdown = type_pushdown;
tf.pushdown_complex_filter = c_pushdown_complex_filter;
tf.cardinality = c_cardinality;
tf.get_partition_info = get_partition_info;
Expand Down
8 changes: 8 additions & 0 deletions vortex-duckdb/src/convert/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ impl TryFrom<&DType> for LogicalType {
}
}

impl TryFrom<&LogicalTypeRef> for DType {
type Error = VortexError;

fn try_from(value: &LogicalTypeRef) -> Result<Self, Self::Error> {
DType::from_logical_type(value, Nullability::Nullable)
}
}

impl TryFrom<StructFields> for LogicalType {
type Error = VortexError;

Expand Down
24 changes: 24 additions & 0 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use vortex::dtype::FieldNames;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::error::vortex_panic;
use vortex::expr::Expression;
use vortex::expr::and_collect;
use vortex::expr::col;
Expand Down Expand Up @@ -73,6 +74,7 @@ use crate::duckdb::DataChunkRef;
use crate::duckdb::DuckdbStringMapRef;
use crate::duckdb::ExpressionRef;
use crate::duckdb::LogicalType;
use crate::duckdb::LogicalTypeRef;
use crate::duckdb::PartitionData;
use crate::duckdb::TableFilterSetRef;
use crate::duckdb::TableFunction;
Expand Down Expand Up @@ -118,6 +120,7 @@ pub struct DataSourceBindData {
data_source: Arc<MultiLayoutDataSource>,
filter_exprs: Vec<Expression>,
column_fields: Vec<DuckdbField>,
//column_casts: Vec<(usize, DType)>,
}

impl Clone for DataSourceBindData {
Expand Down Expand Up @@ -569,6 +572,27 @@ impl<T: DataSourceTableFunction> TableFunction for T {
map.push("Filters", &filters.join(" /\\\n"));
}
}

fn pushdown_column_type(
bind_data: &mut Self::BindData,
column_id: u64,
new_type: &LogicalTypeRef,
) {
// TODO virtual column count?
let column_id: usize = column_id.as_();
if column_id >= bind_data.column_fields.len() {
vortex_panic!("column_id {column_id} >= {}", bind_data.column_fields.len());
}
// TODO casting?
let field = &mut bind_data.column_fields[column_id];
let old_dtype = field.dtype.clone();
// TODO we don't need a copy?
field.logical_type = LogicalType::new(new_type.as_type_id());
field.dtype = new_type
.try_into()
.vortex_expect("logical type -> dtype conversion failed");
println!("Cast {} -> {}", old_dtype, field.dtype);
}
}

/// Extracts DuckDB column names and logical types from a Vortex struct DType.
Expand Down
19 changes: 19 additions & 0 deletions vortex-duckdb/src/duckdb/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::duckdb::DataChunk;
use crate::duckdb::DatabaseRef;
use crate::duckdb::Expression;
use crate::duckdb::LogicalType;
use crate::duckdb::LogicalTypeRef;
use crate::duckdb::Value;
use crate::duckdb::client_context::ClientContextRef;
use crate::duckdb::data_chunk::DataChunkRef;
Expand Down Expand Up @@ -103,6 +104,12 @@ pub trait TableFunction: Sized + Debug {
/// Return table scanning progress from 0. to 100.
fn table_scan_progress(global_state: &Self::GlobalState) -> f64;

fn pushdown_column_type(
bind_data: &mut Self::BindData,
column_id: u64,
new_type: &LogicalTypeRef
);

/// Pushes down a filter expression to the table function.
///
/// Returns `true` if the filter was successfully pushed down (and stored on the bind data),
Expand Down Expand Up @@ -157,6 +164,7 @@ impl DatabaseRef {
function: Some(function::<T>),
statistics: Some(statistics::<T>),
cardinality: Some(cardinality_callback::<T>),
pushdown_column_type: Some(pushdown_column_type::<T>),
pushdown_complex_filter: Some(pushdown_complex_filter_callback::<T>),
to_string: Some(to_string_callback::<T>),
table_scan_progress: Some(table_scan_progress_callback::<T>),
Expand Down Expand Up @@ -225,6 +233,17 @@ unsafe extern "C-unwind" fn get_partition_data_callback<T: TableFunction>(
out.file_index = data.file_index;
}

unsafe extern "C-unwind" fn pushdown_column_type<T: TableFunction>(
bind_data: *mut c_void,
column_id: u64,
new_type: cpp::duckdb_logical_type
) {
let bind_data =
unsafe { bind_data.cast::<T::BindData>().as_mut() }.vortex_expect("bind_data null pointer");
let new_type = unsafe { LogicalType::borrow(new_type) };
T::pushdown_column_type(bind_data, column_id, new_type);
}

unsafe extern "C-unwind" fn pushdown_complex_filter_callback<T: TableFunction>(
bind_data: *mut c_void,
expr: cpp::duckdb_vx_expr,
Expand Down