-
Notifications
You must be signed in to change notification settings - Fork 137
use full column set on empty post-filter projection list in duckdb #6831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ use custom_labels::CURRENT_LABELSET; | |
| use futures::StreamExt; | ||
| use itertools::Itertools; | ||
| use num_traits::AsPrimitive; | ||
| use tracing::debug; | ||
| use vortex::array::ArrayRef; | ||
| use vortex::array::Canonical; | ||
| use vortex::array::VortexSessionExecute; | ||
|
|
@@ -62,10 +63,19 @@ use crate::duckdb::VirtualColumnsResultRef; | |
| use crate::exporter::ArrayExporter; | ||
| use crate::exporter::ConversionCache; | ||
|
|
||
| // taken from duckdb/common/constants.h COLUMN_IDENTIFIER_EMPTY | ||
| // This is used by duckdb whenever there is no projection id in a logical_get node. | ||
| // For some reason we cannot return an empty DataChunk and duckdb will look for the virtual column | ||
| // with this index and create a data chunk with a single vector of that type. | ||
| /// Taken from | ||
| /// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/common/constants.hpp#L44 | ||
| /// | ||
| /// If DuckDB requests a zero-column projection from read_vortex like count(*), | ||
| /// its planner tries to get any column: | ||
| /// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/planner/operator/logical_get.cpp#L149 | ||
| /// | ||
| /// If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the | ||
| /// first column. As we don't want to fill the output chunk and we can leave | ||
| /// it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a | ||
| /// virtual column in our table function vtab's get_virtual_columns. | ||
| /// See vortex-duckdb/cpp/include/duckdb_vx/table_function.h | ||
| /// See virtual_columns in this file | ||
| static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; | ||
| static EMPTY_COLUMN_NAME: &str = ""; | ||
|
|
||
|
|
@@ -126,17 +136,19 @@ impl Debug for DataSourceBindData { | |
| } | ||
| } | ||
|
|
||
| type DataSourceIterator = ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>; | ||
|
|
||
| /// Global scan state for driving a `DataSource` scan through DuckDB. | ||
| pub struct DataSourceGlobal { | ||
| iterator: ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>, | ||
| iterator: DataSourceIterator, | ||
| batch_id: AtomicU64, | ||
| bytes_total: Arc<AtomicU64>, | ||
| bytes_read: AtomicU64, | ||
| } | ||
|
|
||
| /// Per-thread local scan state. | ||
| pub struct DataSourceLocal { | ||
| iterator: ThreadSafeIterator<VortexResult<(ArrayRef, Arc<ConversionCache>)>>, | ||
| iterator: DataSourceIterator, | ||
| exporter: Option<ArrayExporter>, | ||
| /// The unique batch id of the last chunk exported via scan(). | ||
| batch_id: Option<u64>, | ||
|
|
@@ -193,27 +205,29 @@ impl<T: DataSourceTableFunction> TableFunction for T { | |
| } | ||
|
|
||
| fn init_global(init_input: &TableInitInput<Self>) -> VortexResult<Self::GlobalState> { | ||
| debug!("table init input: {init_input:?}"); | ||
|
|
||
| let bind_data = init_input.bind_data(); | ||
| let projection_ids = init_input.projection_ids().unwrap_or(&[]); | ||
| let column_ids = init_input.column_ids(); | ||
| let projection_ids = init_input.projection_ids(); | ||
| let post_filter_projection_ids = init_input.post_filter_projection_ids(); | ||
|
|
||
| let projection_expr = | ||
| extract_projection_expr(projection_ids, column_ids, &bind_data.column_names); | ||
| let projection_expr = extract_projection_expr( | ||
| post_filter_projection_ids, | ||
| projection_ids, | ||
| &bind_data.column_names, | ||
| ); | ||
| let filter_expr = extract_table_filter_expr( | ||
| init_input.table_filter_set(), | ||
| column_ids, | ||
| projection_ids, | ||
| &bind_data.column_names, | ||
| &bind_data.filter_exprs, | ||
| bind_data.data_source.dtype(), | ||
| )?; | ||
|
|
||
| tracing::debug!( | ||
| "Global init Vortex scan SELECT {} WHERE {}", | ||
| &projection_expr, | ||
| filter_expr | ||
| .as_ref() | ||
| .map_or_else(|| "true".to_string(), |f| f.to_string()) | ||
| ); | ||
| let filter_expr_str = filter_expr | ||
| .as_ref() | ||
| .map_or_else(|| "true".to_string(), |f| f.to_string()); | ||
| debug!("Global init Vortex scan SELECT {projection_expr} WHERE {filter_expr_str}"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will this not alloc the str even if debug ignores it? |
||
|
|
||
| let request = ScanRequest { | ||
| projection: projection_expr, | ||
|
|
@@ -318,6 +332,7 @@ impl<T: DataSourceTableFunction> TableFunction for T { | |
| let (array_result, conversion_cache) = result?; | ||
|
|
||
| let array_result = array_result.optimize_recursive()?; | ||
|
|
||
| let array_result = if let Some(array) = array_result.as_opt::<StructVTable>() { | ||
| array.clone() | ||
| } else if let Some(array) = array_result.as_opt::<ScalarFnVTable>() | ||
|
|
@@ -455,35 +470,45 @@ fn extract_schema_from_dtype(dtype: &DType) -> VortexResult<(Vec<String>, Vec<Lo | |
|
|
||
| /// Creates a projection expression from raw projection/column ID slices and column names. | ||
| fn extract_projection_expr( | ||
| post_filter_projection_ids: Option<&[u64]>, | ||
| projection_ids: &[u64], | ||
| column_ids: &[u64], | ||
| column_names: &[String], | ||
| projection_names: &[String], | ||
| ) -> Expression { | ||
| select( | ||
| projection_ids | ||
| .iter() | ||
| .map(|p| { | ||
| let idx: usize = p.as_(); | ||
| let val: usize = column_ids[idx].as_(); | ||
| val | ||
| }) | ||
| .map(|idx| { | ||
| column_names | ||
| .get(idx) | ||
| .vortex_expect("prune idx in column names") | ||
| }) | ||
| .map(|s| Arc::from(s.as_str())) | ||
| .collect::<FieldNames>(), | ||
| root(), | ||
| ) | ||
| // Post filter projection ids may be empty, in which case | ||
| // you need to use projection_ids | ||
| // https://github.com/duckdb/duckdb/blob/6e211da91657a94803c465fd0ce585f4c6754b54/src/planner/operator/logical_get.cpp#L168 | ||
| let (post_filter_projection_ids, has_projection_ids) = match post_filter_projection_ids { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So this is a projection_id then? |
||
| Some(ids) => (ids, true), | ||
| None => (projection_ids, false), | ||
| }; | ||
|
|
||
| let names = post_filter_projection_ids | ||
| .iter() | ||
| .filter(|p| **p != EMPTY_COLUMN_IDX) | ||
| .map(|p| { | ||
| let mut idx: usize = p.as_(); | ||
| if has_projection_ids { | ||
| idx = projection_ids[idx].as_(); | ||
| } | ||
| idx | ||
| }) | ||
| .map(|idx| { | ||
| projection_names | ||
| .get(idx) | ||
| .vortex_expect("prune idx in column names") | ||
| }) | ||
| .map(|s| Arc::from(s.as_str())) | ||
| .collect::<FieldNames>(); | ||
|
|
||
| select(names, root()) | ||
| } | ||
|
|
||
| /// Creates a table filter expression from the table filter set, column metadata, additional | ||
| /// filter expressions, and the top-level DType. | ||
| fn extract_table_filter_expr( | ||
| table_filter_set: Option<&TableFilterSetRef>, | ||
| column_ids: &[u64], | ||
| column_names: &[String], | ||
| projection_ids: &[u64], | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I find this naming more confusing; "column" seems to make more sense here. I think |
||
| projection_names: &[String], | ||
| additional_filters: &[Expression], | ||
| dtype: &DType, | ||
| ) -> VortexResult<Option<Expression>> { | ||
|
|
@@ -492,8 +517,8 @@ fn extract_table_filter_expr( | |
| .into_iter() | ||
| .map(|(idx, ex)| { | ||
| let idx_u: usize = idx.as_(); | ||
| let col_idx: usize = column_ids[idx_u].as_(); | ||
| let name = column_names.get(col_idx).vortex_expect("exists"); | ||
| let col_idx: usize = projection_ids[idx_u].as_(); | ||
| let name = projection_names.get(col_idx).vortex_expect("exists"); | ||
| try_from_table_filter(ex, &col(name.as_str()), dtype) | ||
| }) | ||
| .collect::<VortexResult<Option<HashSet<_>>>>()? | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,8 @@ use crate::duckdb::VectorRef; | |
|
|
||
| pub struct ArrayExporter { | ||
| ctx: ExecutionCtx, | ||
| /// Columns DuckDB requested to read from file. If empty, it's a zero-column | ||
| /// projection and should be handled accordingly, see ArrayExporter::export. | ||
| fields: Vec<Box<dyn ColumnExporter>>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we model this with an enum? It might be easier to understand.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've thought about this, but then we would need something like NonzeroVec, and I believe it's overkill. |
||
| array_len: usize, | ||
| remaining: usize, | ||
|
|
@@ -62,6 +64,7 @@ impl ArrayExporter { | |
| .iter() | ||
| .map(|field| new_array_exporter(field.clone(), cache, &mut ctx)) | ||
| .collect::<VortexResult<Vec<_>>>()?; | ||
|
|
||
| Ok(Self { | ||
| ctx, | ||
| fields, | ||
|
|
@@ -74,26 +77,30 @@ impl ArrayExporter { | |
| /// | ||
| /// Returns `true` if a chunk was exported, `false` if all rows have been exported. | ||
| pub fn export(&mut self, chunk: &mut DataChunkRef) -> VortexResult<bool> { | ||
| chunk.reset(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are you sure we need this? Why, if we estimate card |
||
| if self.remaining == 0 { | ||
| return Ok(false); | ||
| } | ||
|
|
||
| if self.fields.is_empty() { | ||
| // In the case of a projection pushdown with zero columns duckdb will ask us for the | ||
| // `EMPTY_COLUMN_IDX`, which we define as a bool column, we can leave the vector as | ||
| // uninitialized and just return a DataChunk with the correct length. | ||
| // One place no fields can occur is in count(*) queries. | ||
| chunk.set_len(self.remaining); | ||
| self.remaining = 0; | ||
|
|
||
| return Ok(true); | ||
| let expected_cols = self.fields.len(); | ||
| let chunk_cols = chunk.column_count(); | ||
| let zero_projection = expected_cols == 0; | ||
| if !zero_projection && chunk_cols != expected_cols { | ||
| vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}"); | ||
| } | ||
|
|
||
| let chunk_len = DUCKDB_STANDARD_VECTOR_SIZE.min(self.remaining); | ||
| let position = self.array_len - self.remaining; | ||
| self.remaining -= chunk_len; | ||
| chunk.set_len(chunk_len); | ||
|
|
||
| // DuckDB asked us for zero columns. This may happen with aggregation | ||
| // functions like count(*). In such case we can leave chunk contents | ||
| // uninitialized. See EMPTY_COLUMN_IDX comment why this works. | ||
| if zero_projection { | ||
| return Ok(true); | ||
| } | ||
|
|
||
| for (i, field) in self.fields.iter_mut().enumerate() { | ||
| field.export(position, chunk_len, chunk.get_vector_mut(i), &mut self.ctx)?; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
I really think we should keep the naming here and explain the semantics.
It's nice to keep the DuckDB name.