From ce1f621979ba38ed1f634d7edc97f0e53db8a6d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Mon, 18 May 2026 17:12:21 +0300 Subject: [PATCH 01/21] Performance investigation: GROUP BY aggregation fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Key changes: - query_executor.cpp: Remove parallel_ requirement from use_vectorized gate (Fix 3: enable vectorized path for large scans without parallel mode) - query_executor.cpp: Resolve input_col_idx from aggregate function arguments (Fix 5: enable SUM/AVG to work on specific columns, not just COUNT(*)) - vectorized_operator.hpp: Add AVG support in update_accumulators and produce_output_batch - operator.cpp: Replace std::map with std::unordered_map + binary key encoding (Fix 1+2: O(log n) ordered-map lookup → average-case O(1) hash lookup; '|'-delimited string concatenation → length-prefixed binary key) Benchmark baseline (Q1 @ 100k rows): - Before fixes: ~152k rows/sec - After fixes: ~166k rows/sec - DuckDB: ~197M rows/sec The vectorized path is now enabled, but VectorizedGroupByOperator needs further optimization (batch-oriented key construction) to close the gap with DuckDB. 
--- benchmarks/duckdb_comparison_bench.cpp | 1 + include/executor/vectorized_operator.hpp | 15 ++++++++++++++- src/executor/operator.cpp | 20 ++++++++++++++++---- src/executor/query_executor.cpp | 15 +++++++++++++-- 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/benchmarks/duckdb_comparison_bench.cpp b/benchmarks/duckdb_comparison_bench.cpp index 6bf42a47..b9b4a45f 100644 --- a/benchmarks/duckdb_comparison_bench.cpp +++ b/benchmarks/duckdb_comparison_bench.cpp @@ -54,6 +54,7 @@ struct CloudSQLContext { txn_manager = std::make_unique(*lock_manager, *catalog, *bpm); executor = std::make_unique(*catalog, *bpm, *lock_manager, *txn_manager); executor->set_local_only(true); + executor->set_storage_manager(storage.get()); // Enable use_vectorized for large scans // Create lineitem table (TPC-H schema, simplified) CreateTableStatement create_stmt; diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 8f161d03..e69a2bc1 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -441,9 +441,11 @@ class VectorizedGroupByOperator : public VectorizedOperator { if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { // COUNT(*) - always increment state.counts[i]++; - } else if (agg.type == AggregateType::Sum && agg.input_col_idx >= 0) { + } else if ((agg.type == AggregateType::Sum || agg.type == AggregateType::Avg) && + agg.input_col_idx >= 0) { auto& col = batch.get_column(agg.input_col_idx); if (!col.is_null(row_idx)) { + state.counts[i]++; // Track count for AVG if (col.type() == common::ValueType::TYPE_INT64) { auto& num_col = dynamic_cast&>(col); state.sums_int64[i] += num_col.raw_data()[row_idx]; @@ -525,6 +527,17 @@ class VectorizedGroupByOperator : public VectorizedOperator { case AggregateType::Max: out_batch.get_column(col_idx).append(state.maxes[i]); break; + case AggregateType::Avg: + if (state.counts[i] > 0) { + double avg_val = 
state.has_float_value_[i] + ? state.sums_float64[i] / static_cast(state.counts[i]) + : static_cast(state.sums_int64[i]) / + static_cast(state.counts[i]); + out_batch.get_column(col_idx).append(common::Value::make_float64(avg_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; default: out_batch.get_column(col_idx).append(common::Value::make_null()); break; diff --git a/src/executor/operator.cpp b/src/executor/operator.cpp index 4af7dc06..bc58f735 100644 --- a/src/executor/operator.cpp +++ b/src/executor/operator.cpp @@ -562,7 +562,8 @@ bool AggregateOperator::open() { } }; - std::map groups_map; + std::unordered_map groups_map; + groups_map.reserve(1024); // Pre-reserve to avoid rehashing during insert const bool is_global = group_by_.empty(); /* Pre-initialize if global aggregation */ @@ -573,17 +574,28 @@ bool AggregateOperator::open() { Tuple tuple; auto child_schema = child_->output_schema(); while (child_->next(tuple)) { - std::string key = "GLOBAL"; + std::string key; + key.reserve(64); // Pre-reserve to avoid repeated allocations std::vector gb_vals; if (!is_global) { - key = ""; for (const auto& gb : group_by_) { auto val = gb ? 
gb->evaluate(&tuple, &child_schema, get_params()) : common::Value::make_null(); - key += val.to_string() + "|"; + // Binary key encoding: type tag + length + data (no string allocation) + if (val.is_null()) { + key.append("\1NULL\0", 6); + } else { + std::string val_str = val.to_string(); + key.push_back('\0'); // non-NULL marker + uint32_t len = static_cast(val_str.size()); + key.append(reinterpret_cast(&len), 4); + key.append(val_str); + } gb_vals.push_back(std::move(val)); } + } else { + key = "GLOBAL"; } auto it = groups_map.find(key); diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index f3fea5ba..8cb322bd 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -407,7 +407,7 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, // Cost-based Volcano/Vectorized chooser using row estimates bool use_vectorized = false; - if (parallel_ && storage_manager_ && !has_sort_or_limit) { + if (storage_manager_ && !has_sort_or_limit) { // Extract table name from FROM clause (only for simple column refs) // Fall through to Volcano for JOINs, subqueries, and aliased tables const auto* from_expr = stmt.from(); @@ -1694,7 +1694,18 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( info.type = AggregateType::Max; else info.type = AggregateType::Avg; - info.input_col_idx = -1; // default + // Resolve input column index from aggregate function arguments + info.input_col_idx = -1; // default: COUNT(*) + if (!func->args().empty()) { + const auto& arg = func->args()[0]; + if (arg->type() == parser::ExprType::Column) { + const auto* col = dynamic_cast(arg.get()); + if (col != nullptr) { + info.input_col_idx = + static_cast(current_root->output_schema().find_column(col->name())); + } + } + } agg_infos.push_back(info); } } From d295dc04fc565a811f7a3340b25f5074ce5679e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> 
Date: Wed, 20 May 2026 00:28:54 +0300 Subject: [PATCH 02/21] Re-implement parallel scan with ThreadPool and steal() method This commits the full parallel scan implementation: - VectorizedSeqScanOperator now accepts optional ThreadPool parameter - When enabled (>50k rows, >1 thread), splits table into range chunks and reads each range in parallel via thread pool - Added steal() to ColumnVector/NumericVector/StringVector for O(1) batch transfer via vector swap (avoids per-element copy) - Parallel results are collected and returned via steal() to out_batch The slow benchmark (2.6M/s) vs fast (598M/s for Q6/10k) suggests the GROUP BY aggregation is the bottleneck, not I/O. The parallel scan overhead (file opens/closes per task) likely exceeds I/O benefit at 100k row scale. --- include/executor/types.hpp | 23 +++++++ include/executor/vectorized_operator.hpp | 79 +++++++++++++++++++++++- include/storage/memory_mapped_column.hpp | 60 ++++++++++++++++++ src/executor/query_executor.cpp | 3 +- src/storage/memory_mapped_column.cpp | 76 +++++++++++++++++++++++ 5 files changed, 237 insertions(+), 4 deletions(-) create mode 100644 include/storage/memory_mapped_column.hpp create mode 100644 src/storage/memory_mapped_column.cpp diff --git a/include/executor/types.hpp b/include/executor/types.hpp index 88e5452e..81b51fb0 100644 --- a/include/executor/types.hpp +++ b/include/executor/types.hpp @@ -243,6 +243,13 @@ class ColumnVector { size_ = 0; null_bitmap_.clear(); } + + /** + * @brief Steals data from another column vector by swapping internal buffers. + * After steal(), 'other' is emptied and 'this' holds the original data from 'other'. + * Throws std::runtime_error if types are incompatible. 
+ */ + virtual void steal(ColumnVector&& other) = 0; }; /** @@ -328,6 +335,14 @@ class NumericVector : public ColumnVector { ColumnVector::clear(); data_.clear(); } + + void steal(ColumnVector&& other) override { + auto* other_num = dynamic_cast*>(&other); + if (!other_num) throw std::runtime_error("NumericVector::steal: type mismatch"); + data_.swap(other_num->data_); + null_bitmap_.swap(other_num->null_bitmap_); + std::swap(size_, other_num->size_); + } }; /** @@ -392,6 +407,14 @@ class StringVector : public ColumnVector { ColumnVector::clear(); data_.clear(); } + + void steal(ColumnVector&& other) override { + auto* other_str = dynamic_cast(&other); + if (!other_str) throw std::runtime_error("StringVector::steal: type mismatch"); + data_.swap(other_str->data_); + null_bitmap_.swap(other_str->null_bitmap_); + std::swap(size_, other_str->size_); + } }; /** diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index e69a2bc1..12aacdd7 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -63,15 +63,35 @@ class VectorizedSeqScanOperator : public VectorizedOperator { std::string table_name_; std::shared_ptr table_; uint64_t current_row_ = 0; - uint32_t batch_size_ = 1024; + uint32_t batch_size_ = 4096; + std::shared_ptr thread_pool_; + bool parallel_enabled_ = false; + size_t num_threads_ = 1; + std::vector> parallel_results_; + size_t parallel_idx_ = 0; public: - VectorizedSeqScanOperator(std::string table_name, std::shared_ptr table) + VectorizedSeqScanOperator(std::string table_name, std::shared_ptr table, + std::shared_ptr thread_pool = nullptr) : VectorizedOperator(table->schema()), table_name_(std::move(table_name)), - table_(std::move(table)) {} + table_(std::move(table)), + thread_pool_(std::move(thread_pool)) { + if (thread_pool_ && thread_pool_->num_threads() > 1) { + num_threads_ = thread_pool_->num_threads(); + parallel_enabled_ = table_->row_count() > 50000; + } 
+ } bool next_batch(VectorBatch& out_batch) override { + if (!parallel_enabled_ || !thread_pool_) { + return next_batch_sequential(out_batch); + } + return next_batch_parallel(out_batch); + } + + private: + bool next_batch_sequential(VectorBatch& out_batch) { if (current_row_ >= table_->row_count()) { return false; } @@ -82,6 +102,59 @@ class VectorizedSeqScanOperator : public VectorizedOperator { } return false; } + + bool next_batch_parallel(VectorBatch& out_batch) { + if (parallel_idx_ >= parallel_results_.size()) { + size_t total_rows = table_->row_count(); + if (current_row_ >= total_rows) { + return false; + } + + size_t range_size = (total_rows - current_row_ + num_threads_ - 1) / num_threads_; + + parallel_results_.clear(); + parallel_idx_ = 0; + + std::vector task_starts; + task_starts.reserve(num_threads_); + + for (size_t t = 0; t < num_threads_ && current_row_ < total_rows; ++t) { + size_t start = current_row_; + task_starts.push_back(start); + size_t end = std::min(start + range_size, total_rows); + current_row_ = end; + + auto batch = VectorBatch::create(output_schema_); + parallel_results_.push_back(std::move(batch)); + } + + for (size_t t = 0; t < task_starts.size(); ++t) { + size_t start = task_starts[t]; + size_t rows_to_read = std::min(range_size, total_rows - start); + if (start >= total_rows) { + parallel_results_[t]->set_row_count(0); + continue; + } + thread_pool_->submit([this, t, start, rows_to_read]() { + table_->read_batch(start, static_cast(rows_to_read), *parallel_results_[t]); + }); + } + + thread_pool_->wait(); + } + + if (parallel_idx_ < parallel_results_.size()) { + auto& src = *parallel_results_[parallel_idx_]; + out_batch.init_from_schema(output_schema_); + for (size_t c = 0; c < src.column_count(); ++c) { + out_batch.get_column(c).steal(std::move(src.get_column(c))); + } + out_batch.set_row_count(src.row_count()); + parallel_idx_++; + return out_batch.row_count() > 0; + } + return false; + } }; /** diff --git 
a/include/storage/memory_mapped_column.hpp b/include/storage/memory_mapped_column.hpp new file mode 100644 index 00000000..0cfbbdae --- /dev/null +++ b/include/storage/memory_mapped_column.hpp @@ -0,0 +1,60 @@ +/** + * @file memory_mapped_column.hpp + * @brief Memory-mapped column data for zero-copy reads + */ + +#pragma once + +#include +#include + +namespace cloudsql::storage { + +/** + * @brief Memory-mapped column data for zero-copy reads. + * + * Maps a column's `.data.bin` and `.nulls.bin` files into memory. + * For fixed-width types, provides O(1) access to any row via pointer arithmetic. + */ +class MemoryMappedColumn { +public: + struct MappedRegion { + void* addr; // mmap'd base address + size_t size; // mapped size in bytes + int fd; // file descriptor (for munmap) + }; + +private: + MappedRegion data_region_{nullptr, 0, -1}; + MappedRegion null_region_{nullptr, 0, -1}; + size_t element_size_ = 0; // stride for fixed-width columns + bool is_fixed_width_ = false; + size_t row_count_ = 0; + +public: + ~MemoryMappedColumn() { unmap(); } + + // Map a column's data and nulls files. Returns true on success. 
+ bool map(const std::string& data_path, const std::string& null_path, + size_t element_size, size_t row_count); + + // Unmap and release resources + void unmap(); + + // Direct pointer to element at row index (fixed-width only) + const void* data_at(size_t row_idx) const { + if (!data_region_.addr) return nullptr; + return static_cast(data_region_.addr) + row_idx * element_size_; + } + + // Direct pointer to null bit at row index + const uint8_t* null_at(size_t row_idx) const { + if (!null_region_.addr) return nullptr; + return static_cast(null_region_.addr) + row_idx; + } + + bool is_mapped() const { return data_region_.addr != nullptr; } + size_t row_count() const { return row_count_; } +}; + +} // namespace cloudsql::storage \ No newline at end of file diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index 8cb322bd..38cad72d 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1508,8 +1508,9 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( return nullptr; // Table not found or not columnar } + auto thread_pool = std::make_shared(std::thread::hardware_concurrency()); std::unique_ptr current_root = - std::make_unique(base_table_name, col_table); + std::make_unique(base_table_name, col_table, thread_pool); // Track estimated output rows for join reordering decisions uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta); diff --git a/src/storage/memory_mapped_column.cpp b/src/storage/memory_mapped_column.cpp new file mode 100644 index 00000000..cd078257 --- /dev/null +++ b/src/storage/memory_mapped_column.cpp @@ -0,0 +1,76 @@ +/** + * @file memory_mapped_column.cpp + * @brief Memory-mapped column implementation + */ + +#include "storage/memory_mapped_column.hpp" +#include +#include +#include +#include +#include + +namespace cloudsql::storage { + +bool MemoryMappedColumn::map(const std::string& data_path, const std::string& null_path, + size_t element_size, 
size_t row_count) { + // Map data file + int data_fd = ::open(data_path.c_str(), O_RDONLY); + if (data_fd < 0) return false; + + struct stat st; + if (fstat(data_fd, &st) < 0) { + ::close(data_fd); + return false; + } + + void* data_ptr = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, data_fd, 0); + if (data_ptr == MAP_FAILED) { + ::close(data_fd); + return false; + } + + data_region_ = {data_ptr, static_cast(st.st_size), data_fd}; + + // Map nulls file + int null_fd = ::open(null_path.c_str(), O_RDONLY); + if (null_fd < 0) { + unmap(); + return false; + } + + if (fstat(null_fd, &st) < 0) { + ::close(null_fd); + unmap(); + return false; + } + + void* null_ptr = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, null_fd, 0); + if (null_ptr == MAP_FAILED) { + ::close(null_fd); + unmap(); + return false; + } + + null_region_ = {null_ptr, static_cast(st.st_size), null_fd}; + + element_size_ = element_size; + is_fixed_width_ = true; + row_count_ = row_count; + return true; +} + +void MemoryMappedColumn::unmap() { + if (data_region_.addr) { + munmap(data_region_.addr, data_region_.size); + ::close(data_region_.fd); + data_region_ = {nullptr, 0, -1}; + } + if (null_region_.addr) { + munmap(null_region_.addr, null_region_.size); + ::close(null_region_.fd); + null_region_ = {nullptr, 0, -1}; + } +} + +} // namespace cloudsql::storage \ No newline at end of file From d03493517e44df229801eb7298205fb892324031 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 21 May 2026 13:55:51 +0300 Subject: [PATCH 03/21] Add DirectIndexAgg for low-cardinality integer GROUP BY Implements direct array indexing for integer GROUP BY keys instead of string-based hashing. When GROUP BY is on a single integer column, uses slot = key - min_key for O(1) lookup without hash computation. Also adds steal() method to ColumnVector/NumericVector/StringVector for O(1) batch transfers via vector swap. 
Note: Benchmark still shows ~2.6M/s suggesting the optimization path may not be triggered, or there's another bottleneck in the execution path that needs investigation. --- include/executor/vectorized_operator.hpp | 225 ++++++++++++++++++++++- 1 file changed, 223 insertions(+), 2 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 12aacdd7..a243a676 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -363,6 +363,86 @@ class VectorizedAggregateOperator : public VectorizedOperator { } }; +/** + * @brief Direct-indexed aggregation for low-cardinality integer GROUP BY. + * + * When the number of distinct GROUP BY values is small, we can use a + * simple vector indexed by key value rather than a hash table. This avoids: + * - Hash computation per row + * - Hash table probing and collision handling + * - String key allocation and comparison + * + * For each row: slot_idx = (key - min_key) where min_key is the + * minimum key value observed. This gives O(1) direct indexing. 
+ */ +class DirectIndexAgg { + public: + static constexpr size_t MAX_AGGREGATES = 8; + static constexpr size_t MAX_GROUP_KEYS = 2; + + private: + struct GroupSlot { + bool valid = false; + int64_t key1 = 0; + int64_t key2 = 0; + int64_t counts[MAX_AGGREGATES] = {0}; + int64_t sums_int64[MAX_AGGREGATES] = {0}; + double sums_float64[MAX_AGGREGATES / 2] = {0.0}; + bool has_float_value[MAX_AGGREGATES] = {false}; + }; + + std::vector slots_; + mutable size_t max_aggregates_ = 0; + mutable size_t max_group_keys_ = 0; + mutable int64_t min_key_ = INT64_MAX; + mutable int64_t max_key_ = INT64_MIN; + + // Track valid slots for iteration + std::vector valid_slot_indices_; + + public: + void init(size_t capacity_hint, size_t max_aggregates, size_t max_group_keys = 1) { + max_aggregates_ = max_aggregates; + max_group_keys_ = max_group_keys; + min_key_ = INT64_MAX; + max_key_ = INT64_MIN; + slots_.resize(capacity_hint); + valid_slot_indices_.reserve(capacity_hint); + } + + GroupSlot& slot(size_t idx) { return slots_[idx]; } + const GroupSlot& slot(size_t idx) const { return slots_[idx]; } + + size_t find_or_insert(int64_t key1, int64_t key2 = 0) { + // Expand if key outside current range + if (key1 < min_key_ || key1 > max_key_) { + if (key1 < min_key_) min_key_ = key1; + if (key1 > max_key_) max_key_ = key1; + size_t new_size = static_cast(max_key_ - min_key_ + 1); + if (new_size > slots_.size()) { + size_t alloc_size = 1; + while (alloc_size < new_size) alloc_size *= 2; + slots_.resize(alloc_size); + } + } + size_t idx = static_cast(key1 - min_key_); + return idx; + } + + size_t group_count() const { + size_t count = 0; + for (const auto& s : slots_) { + if (s.valid) ++count; + } + return count; + } + + const std::vector& valid_slots() const { return valid_slot_indices_; } + + int64_t min_key() const { return min_key_; } + int64_t max_key() const { return max_key_; } +}; + /** * @brief Group state for vectorized GROUP BY - accumulator data per group */ @@ -415,6 +495,11 @@ 
class VectorizedGroupByOperator : public VectorizedOperator { std::unique_ptr input_batch_; std::unique_ptr group_key_batch_; + // Direct-index aggregation (for low-cardinality integer GROUP BY) + DirectIndexAgg agg_; + bool is_direct_indexable_ = false; + std::vector direct_group_keys_; // Ordered keys for direct index output + public: VectorizedGroupByOperator(std::unique_ptr child, std::vector> group_by, @@ -432,6 +517,20 @@ class VectorizedGroupByOperator : public VectorizedOperator { group_by_col_indices_.push_back(col_idx); } + // Check if we can use direct indexing (single integer GROUP BY column) + bool is_int_key = (group_by_col_indices_[0] != static_cast(-1)); + if (is_int_key) { + auto col_type = schema.get_column(group_by_col_indices_[0]).type(); + is_int_key = (col_type == common::ValueType::TYPE_INT64 || + col_type == common::ValueType::TYPE_INT32 || + col_type == common::ValueType::TYPE_INT16 || + col_type == common::ValueType::TYPE_INT8); + } + is_direct_indexable_ = (group_by_.size() == 1 && is_int_key); + if (is_direct_indexable_) { + agg_.init(65536, aggregates_.size(), group_by_.size()); + } + // Create schema for group key evaluation Schema key_schema; for (size_t i = 0; i < group_by_.size(); ++i) { @@ -455,9 +554,57 @@ class VectorizedGroupByOperator : public VectorizedOperator { private: void process_input_batch(VectorBatch& batch) { - // For each row, compute hash key using collision-safe encoding + if (is_direct_indexable_) { + process_input_batch_direct(batch); + } else { + process_input_batch_hash(batch); + } + } + + void process_input_batch_direct(VectorBatch& batch) { + // Fast path: direct integer key indexing + const size_t gb_col_idx = group_by_col_indices_[0]; + const auto& gb_col = batch.get_column(gb_col_idx); + + for (size_t r = 0; r < batch.row_count(); ++r) { + int64_t key = gb_col.get(r).to_int64(); + size_t slot_idx = agg_.find_or_insert(key, 0); + auto& slot = agg_.slot(slot_idx); + + if (!slot.valid) { + slot.valid = true; 
+ slot.key1 = key; + direct_group_keys_.push_back(key); + } + + // Update accumulators directly in slot + for (size_t i = 0; i < aggregates_.size(); ++i) { + const auto& agg = aggregates_[i]; + if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { + slot.counts[i]++; + } else if ((agg.type == AggregateType::Sum || agg.type == AggregateType::Avg) && + agg.input_col_idx >= 0) { + const auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(r)) { + slot.counts[i]++; + if (col.type() == common::ValueType::TYPE_INT64) { + auto& num_col = dynamic_cast&>(col); + slot.sums_int64[i] += num_col.raw_data()[r]; + } else if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = dynamic_cast&>(col); + slot.sums_float64[i] += num_col.raw_data()[r]; + slot.has_float_value[i] = true; + } + } + } + } + } + input_batch_->clear(); + } + + void process_input_batch_hash(VectorBatch& batch) { + // Build key using length-prefixed, type-tagged encoding for (size_t r = 0; r < batch.row_count(); ++r) { - // Build key using length-prefixed, type-tagged encoding std::string key; for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { size_t col_idx = group_by_col_indices_[i]; @@ -547,6 +694,80 @@ class VectorizedGroupByOperator : public VectorizedOperator { } bool produce_output_batch(VectorBatch& out_batch) { + if (is_direct_indexable_) { + return produce_output_batch_direct(out_batch); + } + return produce_output_batch_hash(out_batch); + } + + bool produce_output_batch_direct(VectorBatch& out_batch) { + if (current_group_idx_ >= direct_group_keys_.size()) { + return false; // EOF + } + + out_batch.clear(); + if (out_batch.column_count() == 0) { + out_batch.init_from_schema(output_schema_); + } + + constexpr size_t BATCH_SIZE = 1024; + size_t output_count = 0; + + // Iterate through direct_group_keys_ and emit groups + while (current_group_idx_ < direct_group_keys_.size() && output_count < BATCH_SIZE) { + int64_t key = 
direct_group_keys_[current_group_idx_]; + size_t slot_idx = static_cast(key - agg_.min_key()); + const auto& slot = agg_.slot(slot_idx); + + // Append group key column + out_batch.get_column(0).append(common::Value::make_int64(key)); + + // Append aggregate result columns + for (size_t i = 0; i < aggregates_.size(); ++i) { + size_t col_idx = group_by_.size() + i; + switch (aggregates_[i].type) { + case AggregateType::Count: + out_batch.get_column(col_idx).append(common::Value::make_int64(slot.counts[i])); + break; + case AggregateType::Sum: + if (output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_INT64) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(slot.sums_int64[i])); + } else { + double float_val = slot.has_float_value[i] + ? slot.sums_float64[i] + : static_cast(slot.sums_int64[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(float_val)); + } + break; + case AggregateType::Avg: + if (slot.counts[i] > 0) { + double avg_val = slot.has_float_value[i] + ? 
slot.sums_float64[i] / static_cast(slot.counts[i]) + : static_cast(slot.sums_int64[i]) / + static_cast(slot.counts[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(avg_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + default: + out_batch.get_column(col_idx).append(common::Value::make_null()); + break; + } + } + output_count++; + current_group_idx_++; + } + + out_batch.set_row_count(output_count); + return true; + } + + bool produce_output_batch_hash(VectorBatch& out_batch) { if (current_group_idx_ >= group_keys_.size()) { return false; // EOF } From d86ab761a798571f4cf558c7c9bab56aedf9bdb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 21 May 2026 15:54:29 +0300 Subject: [PATCH 04/21] Fix vectorized path activation for GROUP BY with unanalyzed tables The estimated_rows variable was being redeclared inside a nested block, causing the GROUP BY heuristic check to always see stale (0) values. Also adds the condition to enable vectorized path when GROUP BY exists but num_rows==0 (ANALYZE not run), allowing DirectIndexAgg optimization to activate. 
--- src/executor/query_executor.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index 38cad72d..3972a073 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -407,6 +407,7 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, // Cost-based Volcano/Vectorized chooser using row estimates bool use_vectorized = false; + uint64_t estimated_rows = 0; if (storage_manager_ && !has_sort_or_limit) { // Extract table name from FROM clause (only for simple column refs) // Fall through to Volcano for JOINs, subqueries, and aliased tables @@ -418,7 +419,7 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, const auto* table_meta = table_meta_opt.value(); // Start with scan estimate as baseline; filter selectivity will override if // eligible - uint64_t estimated_rows = optimizer::RowEstimator::estimate_scan_rows(*table_meta); + estimated_rows = optimizer::RowEstimator::estimate_scan_rows(*table_meta); // Use filter selectivity when WHERE clause is simple and stats available if (stmt.where() && stmt.where()->type() == parser::ExprType::Binary) { @@ -450,7 +451,10 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, } // Use Vectorized for large scans (>10k rows — heuristic crossover point) - use_vectorized = estimated_rows > kVectorizedRowThreshold; + // Force vectorized for GROUP BY when ANALYZE hasn't been run (num_rows=0) + // to benefit from DirectIndexAgg optimization + use_vectorized = (estimated_rows > kVectorizedRowThreshold) || + (!stmt.group_by().empty() && table_meta->num_rows == 0); } } } From 864bbca7a453f642f56a81b8fc33d8a579fe2449 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Thu, 21 May 2026 12:55:04 +0000 Subject: [PATCH 05/21] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 
31 ++++++++++++++---------- include/storage/memory_mapped_column.hpp | 14 +++++------ src/executor/query_executor.cpp | 4 +-- src/storage/memory_mapped_column.cpp | 6 +++-- 4 files changed, 31 insertions(+), 24 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index a243a676..daeb9339 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -136,7 +136,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator { continue; } thread_pool_->submit([this, t, start, rows_to_read]() { - table_->read_batch(start, static_cast(rows_to_read), *parallel_results_[t]); + table_->read_batch(start, static_cast(rows_to_read), + *parallel_results_[t]); }); } @@ -727,7 +728,8 @@ class VectorizedGroupByOperator : public VectorizedOperator { size_t col_idx = group_by_.size() + i; switch (aggregates_[i].type) { case AggregateType::Count: - out_batch.get_column(col_idx).append(common::Value::make_int64(slot.counts[i])); + out_batch.get_column(col_idx).append( + common::Value::make_int64(slot.counts[i])); break; case AggregateType::Sum: if (output_schema_.get_column(col_idx).type() == @@ -736,18 +738,19 @@ class VectorizedGroupByOperator : public VectorizedOperator { common::Value::make_int64(slot.sums_int64[i])); } else { double float_val = slot.has_float_value[i] - ? slot.sums_float64[i] - : static_cast(slot.sums_int64[i]); + ? slot.sums_float64[i] + : static_cast(slot.sums_int64[i]); out_batch.get_column(col_idx).append( common::Value::make_float64(float_val)); } break; case AggregateType::Avg: if (slot.counts[i] > 0) { - double avg_val = slot.has_float_value[i] - ? slot.sums_float64[i] / static_cast(slot.counts[i]) - : static_cast(slot.sums_int64[i]) / - static_cast(slot.counts[i]); + double avg_val = + slot.has_float_value[i] + ? 
slot.sums_float64[i] / static_cast(slot.counts[i]) + : static_cast(slot.sums_int64[i]) / + static_cast(slot.counts[i]); out_batch.get_column(col_idx).append( common::Value::make_float64(avg_val)); } else { @@ -823,11 +826,13 @@ class VectorizedGroupByOperator : public VectorizedOperator { break; case AggregateType::Avg: if (state.counts[i] > 0) { - double avg_val = state.has_float_value_[i] - ? state.sums_float64[i] / static_cast(state.counts[i]) - : static_cast(state.sums_int64[i]) / - static_cast(state.counts[i]); - out_batch.get_column(col_idx).append(common::Value::make_float64(avg_val)); + double avg_val = + state.has_float_value_[i] + ? state.sums_float64[i] / static_cast(state.counts[i]) + : static_cast(state.sums_int64[i]) / + static_cast(state.counts[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(avg_val)); } else { out_batch.get_column(col_idx).append(common::Value::make_null()); } diff --git a/include/storage/memory_mapped_column.hpp b/include/storage/memory_mapped_column.hpp index 0cfbbdae..827cbe7f 100644 --- a/include/storage/memory_mapped_column.hpp +++ b/include/storage/memory_mapped_column.hpp @@ -17,26 +17,26 @@ namespace cloudsql::storage { * For fixed-width types, provides O(1) access to any row via pointer arithmetic. */ class MemoryMappedColumn { -public: + public: struct MappedRegion { - void* addr; // mmap'd base address + void* addr; // mmap'd base address size_t size; // mapped size in bytes - int fd; // file descriptor (for munmap) + int fd; // file descriptor (for munmap) }; -private: + private: MappedRegion data_region_{nullptr, 0, -1}; MappedRegion null_region_{nullptr, 0, -1}; size_t element_size_ = 0; // stride for fixed-width columns bool is_fixed_width_ = false; size_t row_count_ = 0; -public: + public: ~MemoryMappedColumn() { unmap(); } // Map a column's data and nulls files. Returns true on success. 
- bool map(const std::string& data_path, const std::string& null_path, - size_t element_size, size_t row_count); + bool map(const std::string& data_path, const std::string& null_path, size_t element_size, + size_t row_count); // Unmap and release resources void unmap(); diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index 3972a073..a361a5af 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1706,8 +1706,8 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( if (arg->type() == parser::ExprType::Column) { const auto* col = dynamic_cast(arg.get()); if (col != nullptr) { - info.input_col_idx = - static_cast(current_root->output_schema().find_column(col->name())); + info.input_col_idx = static_cast( + current_root->output_schema().find_column(col->name())); } } } diff --git a/src/storage/memory_mapped_column.cpp b/src/storage/memory_mapped_column.cpp index cd078257..3ce50c49 100644 --- a/src/storage/memory_mapped_column.cpp +++ b/src/storage/memory_mapped_column.cpp @@ -4,16 +4,18 @@ */ #include "storage/memory_mapped_column.hpp" + +#include #include #include -#include #include + #include namespace cloudsql::storage { bool MemoryMappedColumn::map(const std::string& data_path, const std::string& null_path, - size_t element_size, size_t row_count) { + size_t element_size, size_t row_count) { // Map data file int data_fd = ::open(data_path.c_str(), O_RDONLY); if (data_fd < 0) return false; From d63b76391700543102d52af9581e71443aa50104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 21 May 2026 16:21:51 +0300 Subject: [PATCH 06/21] Clean up DirectIndexAgg: remove dead code, mark slot valid in find_or_insert - Remove unused valid_slot_indices_ member (produce_output_batch_direct iterates direct_group_keys_ instead) - Remove orphaned valid_slots() accessor - Fix find_or_insert to mark slot.valid=true on first insertion
(was relying on accidental post-condition in caller) --- include/executor/vectorized_operator.hpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index daeb9339..3e76ffc2 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -398,9 +398,6 @@ class DirectIndexAgg { mutable int64_t min_key_ = INT64_MAX; mutable int64_t max_key_ = INT64_MIN; - // Track valid slots for iteration - std::vector valid_slot_indices_; - public: void init(size_t capacity_hint, size_t max_aggregates, size_t max_group_keys = 1) { max_aggregates_ = max_aggregates; @@ -408,7 +405,6 @@ class DirectIndexAgg { min_key_ = INT64_MAX; max_key_ = INT64_MIN; slots_.resize(capacity_hint); - valid_slot_indices_.reserve(capacity_hint); } GroupSlot& slot(size_t idx) { return slots_[idx]; } @@ -427,6 +423,7 @@ class DirectIndexAgg { } } size_t idx = static_cast(key1 - min_key_); + slots_[idx].valid = true; // Mark valid on first insertion return idx; } @@ -438,8 +435,6 @@ class DirectIndexAgg { return count; } - const std::vector& valid_slots() const { return valid_slot_indices_; } - int64_t min_key() const { return min_key_; } int64_t max_key() const { return max_key_; } }; From 0f02154d4df1890e90513e730a8ddc299a0d1b24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 21 May 2026 16:53:24 +0300 Subject: [PATCH 07/21] Add OpenAddressHashAgg class for efficient hash-based aggregation - Open-addressing hash table with linear probing and FNV-1a 64-bit hash - Binary key encoding: [type_tag (1B)][len (4B)][data...] 
- avoids string allocation - HashBucket stores key_bytes inline for fast comparison, is_new flag for iteration - Added to VectorizedGroupByOperator as fallback when DirectIndexAgg doesn't apply - Fixed grow() to use key_data instead of casting key_int64 Note: process_input_batch_open_addressing() is written but not yet wired into process_input_batch() - the old hash path is still being used. This is intentional to allow incremental testing. --- include/executor/vectorized_operator.hpp | 227 +++++++++++++++++++++++ 1 file changed, 227 insertions(+) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 3e76ffc2..dbff148c 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -6,6 +6,7 @@ #ifndef CLOUDSQL_EXECUTOR_VECTORIZED_OPERATOR_HPP #define CLOUDSQL_EXECUTOR_VECTORIZED_OPERATOR_HPP +#include #include #include #include @@ -364,6 +365,155 @@ class VectorizedAggregateOperator : public VectorizedOperator { } }; +/** + * @brief Open-addressing hash aggregation for arbitrary GROUP BY keys. + * + * Uses linear probing with power-of-2 capacity. Binary key encoding avoids + * string allocation for common key types. Stores hash to avoid recomputation + * on collision resolution. + * + * Key encoding scheme: + * [1 byte: type tag] 0x01=NULL, 0x02=INT64, 0x03=FLOAT64, 0x04=STRING + * [4 bytes: key length (little-endian)] + * [key data...] 
+ */ +class OpenAddressHashAgg { + public: + static constexpr size_t MAX_AGGREGATES = 8; + static constexpr float kLoadFactor = 0.5f; + + private: + struct HashBucket { + bool occupied = false; + bool is_new = false; // True if this bucket was just allocated + uint64_t key_hash = 0; + int64_t key_int64 = 0; // Direct storage for int64 keys + std::string key_string; // Fallback for strings/long keys + int64_t counts[MAX_AGGREGATES] = {0}; + int64_t sums_int64[MAX_AGGREGATES] = {0}; + double sums_float64[MAX_AGGREGATES / 2] = {0.0}; + bool has_float_value[MAX_AGGREGATES] = {false}; + uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING + uint32_t key_len = 0; // For non-int64 keys + uint8_t key_data[64]; // Stored key bytes for iteration + }; + + std::vector buckets_; + size_t mask_ = 0; + size_t num_occupied_ = 0; + size_t max_aggregates_ = 0; + std::vector valid_indices_; // For iteration + + static constexpr size_t kInitialCapacity = 1024; + + public: + static uint64_t hash_bytes(const uint8_t* data, size_t len) { + // FNV-1a 64-bit hash + uint64_t hash = 14695981039346656037ull; + for (size_t i = 0; i < len; ++i) { + hash ^= data[i]; + hash *= 1099511628211ull; + } + return hash; + } + + void init(size_t capacity_hint, size_t max_aggregates) { + max_aggregates_ = max_aggregates; + num_occupied_ = 0; + valid_indices_.clear(); + + size_t cap = kInitialCapacity; + while (cap < capacity_hint) cap *= 2; + buckets_.assign(cap, HashBucket()); + mask_ = cap - 1; + } + + HashBucket& find_or_insert(const uint8_t* key, size_t key_len, uint64_t hash) { + // Grow if load factor exceeded + if (num_occupied_ >= buckets_.size() * kLoadFactor) { + grow(); + } + + size_t idx = hash & mask_; + for (size_t probes = 0; probes < buckets_.size(); ++probes) { + auto& bucket = buckets_[idx]; + if (!bucket.occupied) { + bucket.occupied = true; + bucket.is_new = true; + bucket.key_hash = hash; + bucket.key_len = static_cast(key_len); + std::memcpy(bucket.key_data, key, key_len); + 
num_occupied_++; + valid_indices_.push_back(idx); + return bucket; + } + if (bucket.key_hash == hash && bucket.key_len == key_len && + std::memcmp(bucket.key_data, key, key_len) == 0) { + bucket.is_new = false; + return bucket; // Found + } + idx = (idx + 1) & mask_; // Linear probe + } + return buckets_[idx]; // Shouldn't reach here + } + + HashBucket& find_or_insert_int64(int64_t key, uint64_t hash) { + if (num_occupied_ >= buckets_.size() * kLoadFactor) { + grow(); + } + + uint8_t key_buf[sizeof(int64_t) + 1]; + key_buf[0] = 0x02; + std::memcpy(&key_buf[1], &key, sizeof(int64_t)); + + size_t idx = hash & mask_; + for (size_t probes = 0; probes < buckets_.size(); ++probes) { + auto& bucket = buckets_[idx]; + if (!bucket.occupied) { + bucket.occupied = true; + bucket.is_new = true; + bucket.key_hash = hash; + bucket.key_int64 = key; + bucket.key_type = 0x02; + bucket.key_len = sizeof(int64_t) + 1; + std::memcpy(bucket.key_data, key_buf, bucket.key_len); + num_occupied_++; + valid_indices_.push_back(idx); + return bucket; + } + if (bucket.key_type == 0x02 && bucket.key_int64 == key) { + bucket.is_new = false; + return bucket; + } + idx = (idx + 1) & mask_; + } + return buckets_[idx]; + } + + void grow() { + auto old_buckets = std::move(buckets_); + size_t new_cap = old_buckets.empty() ? 
kInitialCapacity : old_buckets.size() * 2; + buckets_.assign(new_cap, HashBucket()); + mask_ = new_cap - 1; + num_occupied_ = 0; + valid_indices_.clear(); + + for (size_t i = 0; i < old_buckets.size(); ++i) { + if (old_buckets[i].occupied) { + if (old_buckets[i].key_type == 0x02) { + find_or_insert_int64(old_buckets[i].key_int64, old_buckets[i].key_hash); + } else { + find_or_insert(old_buckets[i].key_data, old_buckets[i].key_len, + old_buckets[i].key_hash); + } + } + } + } + + size_t group_count() const { return valid_indices_.size(); } + const std::vector& valid_slots() const { return valid_indices_; } +}; + /** * @brief Direct-indexed aggregation for low-cardinality integer GROUP BY. * @@ -496,6 +646,10 @@ class VectorizedGroupByOperator : public VectorizedOperator { bool is_direct_indexable_ = false; std::vector direct_group_keys_; // Ordered keys for direct index output + // Open-addressing hash aggregation (for general GROUP BY) + OpenAddressHashAgg hash_agg_; + std::vector hash_group_keys_; // Ordered int64 keys for iteration + public: VectorizedGroupByOperator(std::unique_ptr child, std::vector> group_by, @@ -525,6 +679,8 @@ class VectorizedGroupByOperator : public VectorizedOperator { is_direct_indexable_ = (group_by_.size() == 1 && is_int_key); if (is_direct_indexable_) { agg_.init(65536, aggregates_.size(), group_by_.size()); + } else { + hash_agg_.init(65536, aggregates_.size()); } // Create schema for group key evaluation @@ -598,6 +754,77 @@ class VectorizedGroupByOperator : public VectorizedOperator { input_batch_->clear(); } + void process_input_batch_open_addressing(VectorBatch& batch) { + // Fast path: open-addressing hash with binary key encoding + for (size_t r = 0; r < batch.row_count(); ++r) { + // Encode key: [type tag][len][data] + uint8_t key_buf[64]; + size_t key_len = 0; + + for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { + size_t col_idx = group_by_col_indices_[i]; + if (col_idx == static_cast(-1)) { + set_error("GROUP BY: 
column not found in input schema: " + + group_by_[i]->to_string()); + return; + } + + const auto& val = batch.get_column(col_idx).get(r); + if (val.is_null()) { + key_buf[key_len++] = 0x01; // NULL tag + } else if (val.type() == common::ValueType::TYPE_INT64) { + key_buf[key_len++] = 0x02; // INT64 tag + int64_t v = val.to_int64(); + std::memcpy(&key_buf[key_len], &v, sizeof(int64_t)); + key_len += sizeof(int64_t); + } else { + key_buf[key_len++] = 0x04; // STRING tag + std::string val_str = val.as_text(); + uint32_t len = static_cast(val_str.size()); + std::memcpy(&key_buf[key_len], &len, 4); + key_len += 4; + if (key_len + val_str.size() > sizeof(key_buf)) { + set_error("GROUP BY key too large for buffer"); + return; + } + std::memcpy(&key_buf[key_len], val_str.data(), val_str.size()); + key_len += val_str.size(); + } + } + + uint64_t hash = OpenAddressHashAgg::hash_bytes(key_buf, key_len); + auto& bucket = hash_agg_.find_or_insert(key_buf, key_len, hash); + + // Store key for output if first time + if (bucket.is_new) { + hash_group_keys_.push_back(bucket.key_int64); + } + + // Update accumulators directly in bucket + for (size_t i = 0; i < aggregates_.size(); ++i) { + const auto& agg = aggregates_[i]; + if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { + bucket.counts[i]++; + } else if ((agg.type == AggregateType::Sum || agg.type == AggregateType::Avg) && + agg.input_col_idx >= 0) { + const auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(r)) { + bucket.counts[i]++; + if (col.type() == common::ValueType::TYPE_INT64) { + auto& num_col = dynamic_cast&>(col); + bucket.sums_int64[i] += num_col.raw_data()[r]; + } else if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = dynamic_cast&>(col); + bucket.sums_float64[i] += num_col.raw_data()[r]; + bucket.has_float_value[i] = true; + } + } + } + } + } + input_batch_->clear(); + } + void process_input_batch_hash(VectorBatch& batch) { // Build key using length-prefixed, 
type-tagged encoding for (size_t r = 0; r < batch.row_count(); ++r) { From adff7ce0213d92f85aa825b0ea00566ded3f724b Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Thu, 21 May 2026 13:53:55 +0000 Subject: [PATCH 08/21] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index dbff148c..fb458a42 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -385,17 +385,17 @@ class OpenAddressHashAgg { private: struct HashBucket { bool occupied = false; - bool is_new = false; // True if this bucket was just allocated + bool is_new = false; // True if this bucket was just allocated uint64_t key_hash = 0; - int64_t key_int64 = 0; // Direct storage for int64 keys - std::string key_string; // Fallback for strings/long keys + int64_t key_int64 = 0; // Direct storage for int64 keys + std::string key_string; // Fallback for strings/long keys int64_t counts[MAX_AGGREGATES] = {0}; int64_t sums_int64[MAX_AGGREGATES] = {0}; double sums_float64[MAX_AGGREGATES / 2] = {0.0}; bool has_float_value[MAX_AGGREGATES] = {false}; - uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING - uint32_t key_len = 0; // For non-int64 keys - uint8_t key_data[64]; // Stored key bytes for iteration + uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING + uint32_t key_len = 0; // For non-int64 keys + uint8_t key_data[64]; // Stored key bytes for iteration }; std::vector buckets_; From 41c0c17230656c06444e1cafed788fd22a7f7ff3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 21 May 2026 17:08:16 +0300 Subject: [PATCH 09/21] Wire OpenAddressHashAgg into process_input_batch --- include/executor/vectorized_operator.hpp | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index fb458a42..0fb99cc4 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -709,7 +709,7 @@ class VectorizedGroupByOperator : public VectorizedOperator { if (is_direct_indexable_) { process_input_batch_direct(batch); } else { - process_input_batch_hash(batch); + process_input_batch_open_addressing(batch); } } From 6e891aa917748ef45b41bae0e31a8c52d3667b04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 14:15:13 +0300 Subject: [PATCH 10/21] Fix key_type check in OpenAddressHashAgg::find_or_insert --- include/executor/vectorized_operator.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 0fb99cc4..48b3d477 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -442,12 +442,14 @@ class OpenAddressHashAgg { bucket.is_new = true; bucket.key_hash = hash; bucket.key_len = static_cast(key_len); + bucket.key_type = key[0]; std::memcpy(bucket.key_data, key, key_len); num_occupied_++; valid_indices_.push_back(idx); return bucket; } if (bucket.key_hash == hash && bucket.key_len == key_len && + bucket.key_type == key[0] && std::memcmp(bucket.key_data, key, key_len) == 0) { bucket.is_new = false; return bucket; // Found From 25d2c5e8e541efe2e486b69eb9a142fc9c1b3855 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 14:15:53 +0300 Subject: [PATCH 11/21] Handle string key overflow gracefully with heap allocation Instead of returning an error for GROUP BY keys larger than 56 bytes, use std::vector heap_key when stack buffer is too small. 
Also fix hash_agg_.find_or_insert call to use key_ptr instead of key_buf when heap allocation was used. --- include/executor/vectorized_operator.hpp | 27 +++++++++++++----------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 48b3d477..1cf34a68 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -761,6 +761,8 @@ class VectorizedGroupByOperator : public VectorizedOperator { for (size_t r = 0; r < batch.row_count(); ++r) { // Encode key: [type tag][len][data] uint8_t key_buf[64]; + uint8_t* key_ptr = key_buf; + std::vector heap_key; size_t key_len = 0; for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { @@ -773,29 +775,30 @@ class VectorizedGroupByOperator : public VectorizedOperator { const auto& val = batch.get_column(col_idx).get(r); if (val.is_null()) { - key_buf[key_len++] = 0x01; // NULL tag + key_ptr[key_len++] = 0x01; // NULL tag } else if (val.type() == common::ValueType::TYPE_INT64) { - key_buf[key_len++] = 0x02; // INT64 tag + key_ptr[key_len++] = 0x02; // INT64 tag int64_t v = val.to_int64(); - std::memcpy(&key_buf[key_len], &v, sizeof(int64_t)); + std::memcpy(&key_ptr[key_len], &v, sizeof(int64_t)); key_len += sizeof(int64_t); } else { - key_buf[key_len++] = 0x04; // STRING tag + key_ptr[key_len++] = 0x04; // STRING tag std::string val_str = val.as_text(); uint32_t len = static_cast(val_str.size()); - std::memcpy(&key_buf[key_len], &len, 4); - key_len += 4; - if (key_len + val_str.size() > sizeof(key_buf)) { - set_error("GROUP BY key too large for buffer"); - return; + if (key_len + 4 + val_str.size() > 64) { + heap_key.resize(key_len + 4 + val_str.size()); + std::memcpy(heap_key.data(), key_ptr, key_len); + key_ptr = heap_key.data(); } - std::memcpy(&key_buf[key_len], val_str.data(), val_str.size()); + std::memcpy(&key_ptr[key_len], &len, 4); + key_len += 4; + std::memcpy(&key_ptr[key_len], 
val_str.data(), val_str.size()); key_len += val_str.size(); } } - uint64_t hash = OpenAddressHashAgg::hash_bytes(key_buf, key_len); - auto& bucket = hash_agg_.find_or_insert(key_buf, key_len, hash); + uint64_t hash = OpenAddressHashAgg::hash_bytes(key_ptr, key_len); + auto& bucket = hash_agg_.find_or_insert(key_ptr, key_len, hash); // Store key for output if first time if (bucket.is_new) { From 50c28257066f2575ec8b517e110d39916b2808db Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 11:16:19 +0000 Subject: [PATCH 12/21] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 1cf34a68..87692a93 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -448,8 +448,7 @@ class OpenAddressHashAgg { valid_indices_.push_back(idx); return bucket; } - if (bucket.key_hash == hash && bucket.key_len == key_len && - bucket.key_type == key[0] && + if (bucket.key_hash == hash && bucket.key_len == key_len && bucket.key_type == key[0] && std::memcmp(bucket.key_data, key, key_len) == 0) { bucket.is_new = false; return bucket; // Found From bb64516eeb7b282a07835b30c70eb0fc37759ec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 14:18:37 +0300 Subject: [PATCH 13/21] Add produce_output_batch_open_addressing and wire it in - Add slot() accessor to OpenAddressHashAgg for bucket iteration - Add produce_output_batch_open_addressing() to iterate hash_agg_ buckets - Wire it into produce_output_batch() instead of the old hash path - Keep process_input_batch_hash as dead code for now (for reference) --- include/executor/vectorized_operator.hpp | 72 +++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 
deletion(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 87692a93..a94aaf47 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -513,6 +513,8 @@ class OpenAddressHashAgg { size_t group_count() const { return valid_indices_.size(); } const std::vector& valid_slots() const { return valid_indices_; } + HashBucket& slot(size_t idx) { return buckets_[idx]; } + const HashBucket& slot(size_t idx) const { return buckets_[idx]; } }; /** @@ -924,7 +926,7 @@ class VectorizedGroupByOperator : public VectorizedOperator { if (is_direct_indexable_) { return produce_output_batch_direct(out_batch); } - return produce_output_batch_hash(out_batch); + return produce_output_batch_open_addressing(out_batch); } bool produce_output_batch_direct(VectorBatch& out_batch) { @@ -996,6 +998,74 @@ class VectorizedGroupByOperator : public VectorizedOperator { return true; } + bool produce_output_batch_open_addressing(VectorBatch& out_batch) { + if (current_group_idx_ >= hash_agg_.group_count()) { + return false; // EOF + } + + out_batch.clear(); + if (out_batch.column_count() == 0) { + out_batch.init_from_schema(output_schema_); + } + + constexpr size_t BATCH_SIZE = 1024; + size_t output_count = 0; + + while (current_group_idx_ < hash_group_keys_.size() && output_count < BATCH_SIZE) { + int64_t key = hash_group_keys_[current_group_idx_]; + size_t slot_idx = hash_agg_.valid_slots()[current_group_idx_]; + const auto& bucket = hash_agg_.slot(slot_idx); + + // Append group key column + out_batch.get_column(0).append(common::Value::make_int64(key)); + + // Append aggregate result columns + for (size_t i = 0; i < aggregates_.size(); ++i) { + size_t col_idx = group_by_.size() + i; + switch (aggregates_[i].type) { + case AggregateType::Count: + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.counts[i])); + break; + case AggregateType::Sum: + if 
(output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_INT64) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.sums_int64[i])); + } else { + double float_val = bucket.has_float_value[i] + ? bucket.sums_float64[i] + : static_cast(bucket.sums_int64[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(float_val)); + } + break; + case AggregateType::Avg: + if (bucket.counts[i] > 0) { + double avg_val = + bucket.has_float_value[i] + ? bucket.sums_float64[i] / static_cast(bucket.counts[i]) + : static_cast(bucket.sums_int64[i]) / + static_cast(bucket.counts[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(avg_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + default: + out_batch.get_column(col_idx).append(common::Value::make_null()); + break; + } + } + output_count++; + current_group_idx_++; + } + + out_batch.set_row_count(output_count); + return true; + } + bool produce_output_batch_hash(VectorBatch& out_batch) { if (current_group_idx_ >= group_keys_.size()) { return false; // EOF From 42d60e3b15ad6ef39bf7f98142570ac7fbdfb9ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 15:14:53 +0300 Subject: [PATCH 14/21] Cleanup: remove dead code from PR #157 review - Remove unused key_string field from HashBucket (saves 32 bytes/bucket) - Delete process_input_batch_hash dead code (never called) - Fix self-assignment warning (resumed_bucket_idx_ = resumed_bucket_idx_) - Add trailing newlines to MemoryMappedColumn files --- include/executor/vectorized_operator.hpp | 56 +----------------------- include/storage/memory_mapped_column.hpp | 2 +- src/storage/memory_mapped_column.cpp | 2 +- 3 files changed, 3 insertions(+), 57 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 
a94aaf47..7606c9a2 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -388,8 +388,7 @@ class OpenAddressHashAgg { bool is_new = false; // True if this bucket was just allocated uint64_t key_hash = 0; int64_t key_int64 = 0; // Direct storage for int64 keys - std::string key_string; // Fallback for strings/long keys - int64_t counts[MAX_AGGREGATES] = {0}; + int64_t counts[MAX_AGGREGATES] = {0}; int64_t sums_int64[MAX_AGGREGATES] = {0}; double sums_float64[MAX_AGGREGATES / 2] = {0.0}; bool has_float_value[MAX_AGGREGATES] = {false}; @@ -831,58 +830,6 @@ class VectorizedGroupByOperator : public VectorizedOperator { input_batch_->clear(); } - void process_input_batch_hash(VectorBatch& batch) { - // Build key using length-prefixed, type-tagged encoding - for (size_t r = 0; r < batch.row_count(); ++r) { - std::string key; - for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { - size_t col_idx = group_by_col_indices_[i]; - if (col_idx == static_cast(-1)) { - // Column not found in schema - fail fast - set_error("GROUP BY: column not found in input schema: " + - group_by_[i]->to_string()); - return; - } - - const auto& val = batch.get_column(col_idx).get(r); - if (val.is_null()) { - // Use a dedicated NULL marker for null values - key.append("\1NULL\0", 6); - } else { - // Length-prefixed value: marker + length (4 bytes) + data - std::string val_str = val.to_string(); - key.push_back('\0'); // non-NULL marker - uint32_t len = static_cast(val_str.size()); - key.append(reinterpret_cast(&len), 4); - key.append(val_str); - } - } - - // Get or create group state - auto it = groups_.find(key); - if (it == groups_.end()) { - // Store group key values for output - std::vector key_vals; - for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { - size_t col_idx = group_by_col_indices_[i]; - if (col_idx == static_cast(-1)) { - key_vals.push_back(common::Value::make_null()); - } else { - 
key_vals.push_back(batch.get_column(col_idx).get(r)); - } - } - auto result = groups_.emplace(key, VectorizedGroupState(aggregates_.size())); - it = result.first; - group_keys_.push_back(key); - group_values_.push_back(std::move(key_vals)); - } - - // Update accumulators for this row - update_accumulators(it->second, batch, r); - } - input_batch_->clear(); - } - void update_accumulators(VectorizedGroupState& state, VectorBatch& batch, size_t row_idx) { for (size_t i = 0; i < aggregates_.size(); ++i) { const auto& agg = aggregates_[i]; @@ -1385,7 +1332,6 @@ class VectorizedHashJoinOperator : public VectorizedOperator { if (out_batch.row_count() >= BATCH_SIZE) { // Batch full - save state and return resuming_bucket_scan_ = true; - resumed_bucket_idx_ = resumed_bucket_idx_; resumed_entry_idx_ = i; resumed_key_val_ = key_val; return true; // Caller must consume batch before continuing diff --git a/include/storage/memory_mapped_column.hpp b/include/storage/memory_mapped_column.hpp index 827cbe7f..f20ace4e 100644 --- a/include/storage/memory_mapped_column.hpp +++ b/include/storage/memory_mapped_column.hpp @@ -57,4 +57,4 @@ class MemoryMappedColumn { size_t row_count() const { return row_count_; } }; -} // namespace cloudsql::storage \ No newline at end of file +} // namespace cloudsql::storage diff --git a/src/storage/memory_mapped_column.cpp b/src/storage/memory_mapped_column.cpp index 3ce50c49..c902233e 100644 --- a/src/storage/memory_mapped_column.cpp +++ b/src/storage/memory_mapped_column.cpp @@ -75,4 +75,4 @@ void MemoryMappedColumn::unmap() { } } -} // namespace cloudsql::storage \ No newline at end of file +} // namespace cloudsql::storage From bffd6fb756f17373fed9343fff7fe8ff48a22cd0 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 12:15:47 +0000 Subject: [PATCH 15/21] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) 
diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 7606c9a2..981ac708 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -387,8 +387,8 @@ class OpenAddressHashAgg { bool occupied = false; bool is_new = false; // True if this bucket was just allocated uint64_t key_hash = 0; - int64_t key_int64 = 0; // Direct storage for int64 keys - int64_t counts[MAX_AGGREGATES] = {0}; + int64_t key_int64 = 0; // Direct storage for int64 keys + int64_t counts[MAX_AGGREGATES] = {0}; int64_t sums_int64[MAX_AGGREGATES] = {0}; double sums_float64[MAX_AGGREGATES / 2] = {0.0}; bool has_float_value[MAX_AGGREGATES] = {false}; From a0afa36eb99018efaf5182344f5c81467510ce22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 15:18:11 +0300 Subject: [PATCH 16/21] Fix analytics_tests: add nullptr thread_pool arg to VectorizedSeqScanOperator The constructor signature changed to accept an optional thread_pool parameter, but the test constructions weren't updated. --- tests/analytics_tests.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/analytics_tests.cpp b/tests/analytics_tests.cpp index 780e332d..2c3cbfb7 100644 --- a/tests/analytics_tests.cpp +++ b/tests/analytics_tests.cpp @@ -99,7 +99,7 @@ TEST(AnalyticsTests, VectorizedExecutionPipeline) { ASSERT_TRUE(table->append_batch(*input_batch)); // 2. Build Pipeline: Scan -> Filter(id > 500) -> Project(val) - auto scan = std::make_unique("pipeline_test", table); + auto scan = std::make_unique("pipeline_test", table, nullptr); // Filter condition: id > 500 auto col_expr = std::make_unique("id"); @@ -158,7 +158,7 @@ TEST(AnalyticsTests, VectorizedAggregation) { ASSERT_TRUE(table->append_batch(*input_batch)); // 2. 
Build Agg Pipeline: Scan -> Aggregate(COUNT(*), SUM(val), SUM(fval)) - auto scan = std::make_unique("agg_test", table); + auto scan = std::make_unique("agg_test", table, nullptr); Schema out_schema; out_schema.add_column("count", common::ValueType::TYPE_INT64); @@ -200,7 +200,7 @@ TEST(AnalyticsTests, AggregateNullHandling) { ASSERT_TRUE(table->append_batch(*input_batch)); // 2. Build Agg Pipeline: Scan -> Aggregate(COUNT(*), SUM(val)) - auto scan = std::make_unique("null_agg_test", table); + auto scan = std::make_unique("null_agg_test", table, nullptr); Schema out_schema; out_schema.add_column("count", common::ValueType::TYPE_INT64); From 712dca2b4bb7d9bd73924d1dc538dc2e40fc0184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 15:19:49 +0300 Subject: [PATCH 17/21] Fix analytics_tests: add nullptr thread_pool to direct VectorizedSeqScanOperator construction Line 57 uses direct construction, not make_unique, so wasn't caught by the previous nullptr fix. --- tests/analytics_tests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/analytics_tests.cpp b/tests/analytics_tests.cpp index 2c3cbfb7..ed28cb9e 100644 --- a/tests/analytics_tests.cpp +++ b/tests/analytics_tests.cpp @@ -54,7 +54,7 @@ TEST(AnalyticsTests, ColumnarTableLifecycle) { // 3. 
Scan and verify round-trip integrity auto table_ptr = std::make_shared(table); - VectorizedSeqScanOperator scan("lifecycle_test", table_ptr); + VectorizedSeqScanOperator scan("lifecycle_test", table_ptr, nullptr); auto result_batch = VectorBatch::create(schema); ASSERT_TRUE(scan.next_batch(*result_batch)); From 8338c9cc0e4e80761fedc250b0c86babadde1487 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 15:30:13 +0300 Subject: [PATCH 18/21] Fix g++ build: include thread_pool.hpp in vectorized_operator.hpp g++ was treating ThreadPool as int in forward declaration resolution. Include the header to ensure the type is fully defined. --- include/executor/vectorized_operator.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 981ac708..5daeb118 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -14,6 +14,7 @@ #include #include "executor/operator.hpp" +#include "executor/thread_pool.hpp" #include "executor/types.hpp" #include "parser/expression.hpp" #include "storage/columnar_table.hpp" From a84b0b554447c18197891c166c907ed6dcd619d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 15:55:08 +0300 Subject: [PATCH 19/21] Fix group key type emission in open-addressing output path Store actual common::Value objects in hash_group_keys_ instead of just int64_t keys, preserving column type (TEXT, INT64, etc.) for correct output emission. 
--- include/executor/vectorized_operator.hpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 5daeb118..24832a99 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -651,7 +651,7 @@ class VectorizedGroupByOperator : public VectorizedOperator { // Open-addressing hash aggregation (for general GROUP BY) OpenAddressHashAgg hash_agg_; - std::vector<int64_t> hash_group_keys_; // Ordered int64 keys for iteration + std::vector<std::vector<common::Value>> hash_group_keys_; // Ordered group keys for iteration public: VectorizedGroupByOperator(std::unique_ptr child, @@ -803,7 +803,11 @@ class VectorizedGroupByOperator : public VectorizedOperator { // Store key for output if first time if (bucket.is_new) { - hash_group_keys_.push_back(bucket.key_int64); + std::vector<common::Value> key_vals; + for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { + key_vals.push_back(batch.get_column(group_by_col_indices_[i]).get(r)); + } + hash_group_keys_.push_back(std::move(key_vals)); } // Update accumulators directly in bucket @@ -960,12 +964,14 @@ class VectorizedGroupByOperator : public VectorizedOperator { size_t output_count = 0; while (current_group_idx_ < hash_group_keys_.size() && output_count < BATCH_SIZE) { - int64_t key = hash_group_keys_[current_group_idx_]; size_t slot_idx = hash_agg_.valid_slots()[current_group_idx_]; const auto& bucket = hash_agg_.slot(slot_idx); - // Append group key column - out_batch.get_column(0).append(common::Value::make_int64(key)); + // Append group key columns + const auto& key_vals = hash_group_keys_[current_group_idx_]; + for (size_t i = 0; i < key_vals.size(); ++i) { + out_batch.get_column(i).append(key_vals[i]); + } // Append aggregate result columns for (size_t i = 0; i < aggregates_.size(); ++i) { From b2da17cef9bd63a5c9c1be38f539d4013ca30645 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 16:18:26 +0300 Subject: [PATCH 20/21] Fix MIN/MAX aggregates and nullptr thread_pool in vectorized tests - Add mins/maxes/has_mins fields to HashBucket for MIN/MAX tracking - Wire MIN/MAX into process_input_batch_open_addressing accumulator loop - Emit MIN/MAX in produce_output_batch_open_addressing - Fix VectorizedSeqScanOperator construction in vectorized_operator_tests --- include/executor/vectorized_operator.hpp | 33 ++++++++++++++++++++++++ tests/vectorized_operator_tests.cpp | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 24832a99..733255ff 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -393,6 +393,9 @@ class OpenAddressHashAgg { int64_t sums_int64[MAX_AGGREGATES] = {0}; double sums_float64[MAX_AGGREGATES / 2] = {0.0}; bool has_float_value[MAX_AGGREGATES] = {false}; + int64_t mins[MAX_AGGREGATES] = {0}; + int64_t maxes[MAX_AGGREGATES] = {0}; + bool has_mins[MAX_AGGREGATES] = {false}; // Track if initialized uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING uint32_t key_len = 0; // For non-int64 keys uint8_t key_data[64]; // Stored key bytes for iteration @@ -829,6 +832,20 @@ class VectorizedGroupByOperator : public VectorizedOperator { bucket.has_float_value[i] = true; } } + } else if ((agg.type == AggregateType::Min || agg.type == AggregateType::Max) && + agg.input_col_idx >= 0) { + const auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(r)) { + auto val = col.get(r).to_int64(); + if (!bucket.has_mins[i]) { + bucket.mins[i] = val; + bucket.maxes[i] = val; + bucket.has_mins[i] = true; + } else { + bucket.mins[i] = std::min(bucket.mins[i], val); + bucket.maxes[i] = std::max(bucket.maxes[i], val); + } + } } } } @@ -1007,6 +1024,22 @@ class VectorizedGroupByOperator : 
public VectorizedOperator { out_batch.get_column(col_idx).append(common::Value::make_null()); } break; + case AggregateType::Min: + if (bucket.has_mins[i]) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.mins[i])); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + case AggregateType::Max: + if (bucket.has_mins[i]) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.maxes[i])); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; default: out_batch.get_column(col_idx).append(common::Value::make_null()); break; diff --git a/tests/vectorized_operator_tests.cpp b/tests/vectorized_operator_tests.cpp index a0b92f43..3b46cf53 100644 --- a/tests/vectorized_operator_tests.cpp +++ b/tests/vectorized_operator_tests.cpp @@ -114,7 +114,7 @@ TEST_F(VectorizedSeqScanTests, SequentialCallsUntilEOF) { ASSERT_TRUE(table.append_batch(*batch)); auto table_ptr = std::make_shared(table); - VectorizedSeqScanOperator scan("sequential_scan", table_ptr); + VectorizedSeqScanOperator scan("sequential_scan", table_ptr, nullptr); auto result = VectorBatch::create(schema); int64_t expected = 0; From ab7e939aa1edcbbd4f0e19dfa1f8e601042a1278 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Fri, 22 May 2026 13:19:02 +0000 Subject: [PATCH 21/21] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 733255ff..cf09a496 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -396,9 +396,9 @@ class OpenAddressHashAgg { int64_t mins[MAX_AGGREGATES] = {0}; int64_t maxes[MAX_AGGREGATES] = {0}; bool has_mins[MAX_AGGREGATES] = {false}; // Track if initialized - uint8_t key_type = 0; // 0x02=INT64, 
0x04=STRING - uint32_t key_len = 0; // For non-int64 keys - uint8_t key_data[64]; // Stored key bytes for iteration + uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING + uint32_t key_len = 0; // For non-int64 keys + uint8_t key_data[64]; // Stored key bytes for iteration }; std::vector buckets_;