diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index af44aceedeb..377e415ba02 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -437,6 +437,7 @@ fn main() { .size_t_is_usize(true) .clang_arg(format!("-I{}", duckdb_include_path.display())) .clang_arg(format!("-I{}", crate_dir.join("cpp/include").display())) + .generate_comments(true) // Tell cargo to invalidate the built crate whenever any of the // included header files changed. .parse_callbacks(Box::new(bindgen::CargoCallbacks::new())) diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index 5299d773271..e8f483514a5 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -62,22 +62,38 @@ void duckdb_vx_string_map_free(duckdb_vx_string_map map); // Input data passed into the init_global and init_local callbacks. typedef struct { const void *bind_data; + + /** + * Projected columns that are requested to be read. These are not + * all columns, only the ones DuckDB optimizer thinks we should read. + */ idx_t *column_ids; size_t column_ids_count; - // uint64_t *column_indexes; - // size_t column_indexes_count; + + /** + * Post filter projected columns. Our table function implements filter + * pushdown so this list is a subset of columns referenced in column_ids + * after filter pushdown and filter pruning. May be empty, in which case + * column_ids should be used. + * Indices in this list reference values from column_ids. I.e. 
if + * column_ids=[1,5,6], projection_ids=[1], output column should be + * column_ids[1] = 5 + * + * Example usage: + * https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/function/table_function.hpp#L147 + */ const idx_t *projection_ids; size_t projection_ids_count; + duckdb_vx_table_filter_set filters; duckdb_client_context client_context; - // void *sample_options; } duckdb_vx_tfunc_init_input; // Result data returned from the cardinality callback. typedef struct { idx_t estimated_cardinality; - bool has_estimated_cardinality; idx_t max_cardinality; + bool has_estimated_cardinality; bool has_max_cardinality; } duckdb_vx_node_statistics; diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index d8f96287f7d..24c579d5832 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -214,8 +214,8 @@ unique_ptr c_cardinality(ClientContext &context, const FunctionD duckdb_vx_node_statistics node_stats_out = { .estimated_cardinality = 0, - .has_estimated_cardinality = false, .max_cardinality = 0, + .has_estimated_cardinality = false, .has_max_cardinality = false, }; bind.info->vtab.cardinality(bind_data->Cast().ffi_data->DataPtr(), &node_stats_out); diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index b958e247f3d..6bbcc990b0a 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -17,6 +17,7 @@ use custom_labels::CURRENT_LABELSET; use futures::StreamExt; use itertools::Itertools; use num_traits::AsPrimitive; +use tracing::debug; use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::VortexSessionExecute; @@ -62,10 +63,19 @@ use crate::duckdb::VirtualColumnsResultRef; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; -// taken from duckdb/common/constants.h COLUMN_IDENTIFIER_EMPTY -// This is used by duckdb whenever there is no projection id in a 
logical_get node. -// For some reason we cannot return an empty DataChunk and duckdb will look for the virtual column -// with this index and create a data chunk with a single vector of that type. +/// Taken from +/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/common/constants.hpp#L44 +/// +/// If DuckDB requests a zero-column projection from read_vortex like count(*), +/// its planner tries to get any column: +/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/planner/operator/logical_get.cpp#L149 +/// +/// If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the +/// first column. As we don't want to fill the output chunk and we can leave +/// it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a +/// virtual column in our table function vtab's get_virtual_columns. +/// See vortex-duckdb/cpp/include/duckdb_vx/table_function.h +/// See virtual_columns in this file static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; static EMPTY_COLUMN_NAME: &str = ""; @@ -126,9 +136,11 @@ impl Debug for DataSourceBindData { } } +type DataSourceIterator = ThreadSafeIterator)>>; + /// Global scan state for driving a `DataSource` scan through DuckDB. pub struct DataSourceGlobal { - iterator: ThreadSafeIterator)>>, + iterator: DataSourceIterator, batch_id: AtomicU64, bytes_total: Arc, bytes_read: AtomicU64, @@ -136,7 +148,7 @@ pub struct DataSourceGlobal { /// Per-thread local scan state. pub struct DataSourceLocal { - iterator: ThreadSafeIterator)>>, + iterator: DataSourceIterator, exporter: Option, /// The unique batch id of the last chunk exported via scan(). 
batch_id: Option, } @@ -193,9 +205,11 @@ impl TableFunction for T { } fn init_global(init_input: &TableInitInput) -> VortexResult { + debug!("table init input: {init_input:?}"); + let bind_data = init_input.bind_data(); - let projection_ids = init_input.projection_ids().unwrap_or(&[]); let column_ids = init_input.column_ids(); + let projection_ids = init_input.projection_ids(); let projection_expr = extract_projection_expr(projection_ids, column_ids, &bind_data.column_names); @@ -207,13 +221,10 @@ impl TableFunction for T { bind_data.data_source.dtype(), )?; - tracing::debug!( - "Global init Vortex scan SELECT {} WHERE {}", - &projection_expr, - filter_expr - .as_ref() - .map_or_else(|| "true".to_string(), |f| f.to_string()) - ); + let filter_expr_str = filter_expr + .as_ref() + .map_or_else(|| "true".to_string(), |f| f.to_string()); + debug!("Global init Vortex scan SELECT {projection_expr} WHERE {filter_expr_str}"); let request = ScanRequest { projection: projection_expr, @@ -316,8 +327,8 @@ impl TableFunction for T { return Ok(()); }; let (array_result, conversion_cache) = result?; - let array_result = array_result.optimize_recursive()?; + let array_result = if let Some(array) = array_result.as_opt::() { array.clone() } else if let Some(array) = array_result.as_opt::() @@ -455,27 +466,36 @@ fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<(Vec, Vec, column_ids: &[u64], column_names: &[String], ) -> Expression { - select( - projection_ids - .iter() - .map(|p| { - let idx: usize = p.as_(); - let val: usize = column_ids[idx].as_(); - val - }) - .map(|idx| { - column_names - .get(idx) - .vortex_expect("prune idx in column names") - }) - .map(|s| Arc::from(s.as_str())) - .collect::(), - root(), - ) + // Projection ids may be empty, in which case you need to use column_ids + // https://github.com/duckdb/duckdb/blob/6e211da91657a94803c465fd0ce585f4c6754b54/src/planner/operator/logical_get.cpp#L168 + let (projection_ids, has_projection_ids) = match 
projection_ids { + Some(ids) => (ids, true), + None => (column_ids, false), + }; + + // duckdb index is u64 (size_t) but in Rust u64 and usize are different things. + #[allow(clippy::cast_possible_truncation)] + let names = projection_ids + .iter() + .filter(|p| **p != EMPTY_COLUMN_IDX) + .map(|mut idx| { + if has_projection_ids { + idx = &column_ids[*idx as usize]; + } + + #[allow(clippy::cast_possible_truncation)] + column_names + .get(*idx as usize) + .vortex_expect("prune idx in column names") + }) + .map(|s| Arc::from(s.as_str())) + .collect::(); + + select(names, root()) } /// Creates a table filter expression from the table filter set, column metadata, additional diff --git a/vortex-duckdb/src/duckdb/data_chunk.rs b/vortex-duckdb/src/duckdb/data_chunk.rs index ff3d0b259f3..dce2a5637bf 100644 --- a/vortex-duckdb/src/duckdb/data_chunk.rs +++ b/vortex-duckdb/src/duckdb/data_chunk.rs @@ -44,6 +44,10 @@ impl DataChunkRef { .vortex_expect("Column count exceeds usize") } + pub fn reset(&mut self) { + unsafe { cpp::duckdb_data_chunk_reset(self.as_ptr()) } + } + /// Set the length of the data chunk. pub fn set_len(&mut self, len: usize) { unsafe { cpp::duckdb_data_chunk_set_size(self.as_ptr(), len as _) } diff --git a/vortex-duckdb/src/duckdb/table_function/init.rs b/vortex-duckdb/src/duckdb/table_function/init.rs index fc9e310d0ab..edfd8e324b6 100644 --- a/vortex-duckdb/src/duckdb/table_function/init.rs +++ b/vortex-duckdb/src/duckdb/table_function/init.rs @@ -93,12 +93,10 @@ impl<'a, T: TableFunction> TableInitInput<'a, T> { unsafe { &*self.input.bind_data.cast::() } } - /// Returns the column_ids for the table function. pub fn column_ids(&self) -> &[u64] { unsafe { std::slice::from_raw_parts(self.input.column_ids, self.input.column_ids_count) } } - /// Returns the projection_ids for the table function. 
pub fn projection_ids(&self) -> Option<&[u64]> { if self.input.projection_ids.is_null() { return None; diff --git a/vortex-duckdb/src/exporter/mod.rs b/vortex-duckdb/src/exporter/mod.rs index 10d2445f23a..2cb55c63366 100644 --- a/vortex-duckdb/src/exporter/mod.rs +++ b/vortex-duckdb/src/exporter/mod.rs @@ -44,6 +44,8 @@ use crate::duckdb::VectorRef; pub struct ArrayExporter { ctx: ExecutionCtx, + /// Columns DuckDB requested to read from file. If empty, it's a zero-column + /// projection and should be handled accordingly, see ArrayExporter::export. fields: Vec>, array_len: usize, remaining: usize, @@ -62,6 +64,7 @@ impl ArrayExporter { .iter() .map(|field| new_array_exporter(field.clone(), cache, &mut ctx)) .collect::>>()?; + Ok(Self { ctx, fields, @@ -74,19 +77,16 @@ impl ArrayExporter { /// /// Returns `true` if a chunk was exported, `false` if all rows have been exported. pub fn export(&mut self, chunk: &mut DataChunkRef) -> VortexResult { + chunk.reset(); if self.remaining == 0 { return Ok(false); } - if self.fields.is_empty() { - // In the case of a projection pushdown with zero columns duckdb will ask us for the - // `EMPTY_COLUMN_IDX`, which we define as a bool column, we can leave the vector as - // uninitialized and just return a DataChunk with the correct length. - // One place no fields can occur is in count(*) queries. - chunk.set_len(self.remaining); - self.remaining = 0; - - return Ok(true); + let expected_cols = self.fields.len(); + let chunk_cols = chunk.column_count(); + let zero_projection = expected_cols == 0; + if !zero_projection && chunk_cols != expected_cols { + vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}"); } let chunk_len = DUCKDB_STANDARD_VECTOR_SIZE.min(self.remaining); @@ -94,6 +94,13 @@ impl ArrayExporter { self.remaining -= chunk_len; chunk.set_len(chunk_len); + // DuckDB asked us for zero columns. This may happen with aggregation + // functions like count(*). 
In such case we can leave chunk contents + // uninitialized. See EMPTY_COLUMN_IDX comment why this works. + if zero_projection { + return Ok(true); + } + for (i, field) in self.fields.iter_mut().enumerate() { field.export(position, chunk_len, chunk.get_vector_mut(i), &mut self.ctx)?; } diff --git a/vortex-duckdb/src/multi_file.rs b/vortex-duckdb/src/multi_file.rs index 3ce2ed3856e..0ad7708a05c 100644 --- a/vortex-duckdb/src/multi_file.rs +++ b/vortex-duckdb/src/multi_file.rs @@ -41,6 +41,7 @@ impl DataSourceTableFunction for VortexMultiFileScan { let glob_url_str = glob_url_parameter.as_string(); let glob_url = match Url::parse(glob_url_str.as_str()) { Ok(url) => Ok(url), + // TODO(myrrc): doesn't parse relative paths like FROM 'test.vortex' Err(_) => Url::from_file_path(Path::new(glob_url_str.as_str())) .map_err(|_| vortex_err!("Neither URL nor path: '{}' ", glob_url_str.as_str())), }?; diff --git a/vortex-sqllogictest/slt/duckdb/string-hasjoin.slt b/vortex-sqllogictest/slt/duckdb/string-hasjoin.slt deleted file mode 100644 index 38d2a01398a..00000000000 --- a/vortex-sqllogictest/slt/duckdb/string-hasjoin.slt +++ /dev/null @@ -1,21 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright the Vortex contributors - -# Check a string table hashjoin with itself works correctly. 
- -include ../setup.slt - -query I -COPY (SELECT '' as id) TO '$__TEST_DIR__/hashjoin.vortex'; ----- -1 - -query I -SELECT first(t) AS sales -FROM ( - SELECT id, 0 as t FROM read_vortex('$__TEST_DIR__/hashjoin.vortex') - UNION ALL SELECT id, 0 as t FROM read_vortex('$__TEST_DIR__/hashjoin.vortex') -) -GROUP BY id; ----- -0 diff --git a/vortex-sqllogictest/slt/projection-pushdown.slt b/vortex-sqllogictest/slt/projection-pushdown.slt new file mode 100644 index 00000000000..04618c9bd9b --- /dev/null +++ b/vortex-sqllogictest/slt/projection-pushdown.slt @@ -0,0 +1,157 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +# Check projection pushdowns with different sets of pre- and +# post-filter column set + +include ./setup.slt + +query I +COPY (SELECT 0 AS i) TO '$__TEST_DIR__/projection-pushdown.vortex'; +---- +1 + +query II +SELECT i, 0 AS t FROM '$__TEST_DIR__/projection-pushdown.vortex'; +---- +0 0 + +query I +SELECT i FROM (SELECT i, 8 AS t FROM '$__TEST_DIR__/projection-pushdown.vortex'); +---- +0 + +query I +SELECT t FROM (SELECT i, 8 AS t FROM '$__TEST_DIR__/projection-pushdown.vortex'); +---- +8 + +query I +SELECT 0 FROM '$__TEST_DIR__/projection-pushdown.vortex'; +---- +0 + +query I +COPY (SELECT 'ololo' AS id) TO '$__TEST_DIR__/projection-pushdown-id.vortex'; +---- +1 + +onlyif duckdb +query I +SELECT first(const) FROM ( + SELECT id, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id.vortex' + UNION ALL SELECT id, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id.vortex' +) +GROUP BY id; +---- +0 + +query TI +SELECT * FROM ( + SELECT id, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id.vortex' + UNION ALL SELECT id, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id.vortex' +) +---- +ololo 0 +ololo 0 + +onlyif duckdb +query TI +SELECT * FROM ( + SELECT id, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id.vortex' + UNION ALL SELECT 'd', 0 AS const +) +---- +ololo 0 +d 0 + +onlyif 
datafusion +query TI +SELECT * FROM ( + SELECT id, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id.vortex' + UNION ALL SELECT '', 0 AS const +) +ORDER BY id; +---- +(empty) 0 +ololo 0 + +onlyif duckdb +query I +SELECT first(t) AS sales +FROM ( + SELECT id, 0 AS t FROM '$__TEST_DIR__/projection-pushdown-id.vortex' + UNION ALL SELECT id, 0 AS t FROM '$__TEST_DIR__/projection-pushdown-id.vortex' +) +GROUP BY id; +---- +0 + +query I +COPY (SELECT 'ololo' AS id, '' AS id2) TO '$__TEST_DIR__/projection-pushdown-id2.vortex'; +---- +1 + +query I +SELECT 0 FROM '$__TEST_DIR__/projection-pushdown-id2.vortex'; +---- +0 + +onlyif duckdb +query TTI +SELECT * FROM ( + SELECT id, id2, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id2.vortex' + UNION ALL SELECT '' AS id, '' AS id2, 0 AS const +) +---- +ololo 0 + 0 + +onlyif datafusion +query TTI +SELECT * FROM ( + SELECT id, id2, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id2.vortex' + UNION ALL SELECT '' AS id, '' AS id2, 0 AS const +) +ORDER BY id; +---- +(empty) (empty) 0 +ololo (empty) 0 + +query I +SELECT count(*) FROM ( + SELECT id, id2, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id2.vortex' + UNION ALL SELECT '' AS id, '' AS id2, 0 AS const +) +---- +2 + +query TI +SELECT id, count(*) FROM ( + SELECT id, id2, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id2.vortex' + UNION ALL SELECT 'a', '', 0 AS const +) +GROUP BY id +ORDER BY id; +---- +a 1 +ololo 1 + +query T +SELECT id FROM ( + SELECT id, id2, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id2.vortex' + UNION ALL SELECT 'a', '', 0 AS const +) +ORDER BY id +---- +a +ololo + +query T +SELECT id FROM ( + SELECT id, id2, 0 AS const FROM '$__TEST_DIR__/projection-pushdown-id2.vortex' + WHERE id2 != '' + UNION ALL SELECT 'a', '', 0 AS const +) +---- +a