diff --git a/benchmarks/duckdb_comparison_bench.cpp b/benchmarks/duckdb_comparison_bench.cpp index 6bf42a47..b9b4a45f 100644 --- a/benchmarks/duckdb_comparison_bench.cpp +++ b/benchmarks/duckdb_comparison_bench.cpp @@ -54,6 +54,7 @@ struct CloudSQLContext { txn_manager = std::make_unique(*lock_manager, *catalog, *bpm); executor = std::make_unique(*catalog, *bpm, *lock_manager, *txn_manager); executor->set_local_only(true); + executor->set_storage_manager(storage.get()); // Enable use_vectorized for large scans // Create lineitem table (TPC-H schema, simplified) CreateTableStatement create_stmt; diff --git a/include/executor/types.hpp b/include/executor/types.hpp index 88e5452e..81b51fb0 100644 --- a/include/executor/types.hpp +++ b/include/executor/types.hpp @@ -243,6 +243,13 @@ class ColumnVector { size_ = 0; null_bitmap_.clear(); } + + /** + * @brief Steals data from another column vector by swapping internal buffers. + * After steal(), 'other' is emptied and 'this' holds the original data from 'other'. + * Throws std::runtime_error if types are incompatible. + */ + virtual void steal(ColumnVector&& other) = 0; }; /** @@ -328,6 +335,14 @@ class NumericVector : public ColumnVector { ColumnVector::clear(); data_.clear(); } + + void steal(ColumnVector&& other) override { + auto* other_num = dynamic_cast*>(&other); + if (!other_num) throw std::runtime_error("NumericVector::steal: type mismatch"); + data_.swap(other_num->data_); + null_bitmap_.swap(other_num->null_bitmap_); + std::swap(size_, other_num->size_); + } }; /** @@ -392,6 +407,14 @@ class StringVector : public ColumnVector { ColumnVector::clear(); data_.clear(); } + + void steal(ColumnVector&& other) override { + auto* other_str = dynamic_cast(&other); + if (!other_str) throw std::runtime_error("StringVector::steal: type mismatch"); + data_.swap(other_str->data_); + null_bitmap_.swap(other_str->null_bitmap_); + std::swap(size_, other_str->size_); + } }; /** diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 8f161d03..cf09a496 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -6,6 +6,7 @@ #ifndef CLOUDSQL_EXECUTOR_VECTORIZED_OPERATOR_HPP #define CLOUDSQL_EXECUTOR_VECTORIZED_OPERATOR_HPP +#include #include #include #include @@ -13,6 +14,7 @@ #include #include "executor/operator.hpp" +#include "executor/thread_pool.hpp" #include "executor/types.hpp" #include "parser/expression.hpp" #include "storage/columnar_table.hpp" @@ -63,15 +65,35 @@ class VectorizedSeqScanOperator : public VectorizedOperator { std::string table_name_; std::shared_ptr table_; uint64_t current_row_ = 0; - uint32_t batch_size_ = 1024; + uint32_t batch_size_ = 4096; + std::shared_ptr thread_pool_; + bool parallel_enabled_ = false; + size_t num_threads_ = 1; + std::vector> parallel_results_; + size_t parallel_idx_ = 0; public: - VectorizedSeqScanOperator(std::string table_name, std::shared_ptr table) + VectorizedSeqScanOperator(std::string table_name, std::shared_ptr table, + std::shared_ptr thread_pool = nullptr) : VectorizedOperator(table->schema()), table_name_(std::move(table_name)), - table_(std::move(table)) {} + table_(std::move(table)), + thread_pool_(std::move(thread_pool)) { + if (thread_pool_ && thread_pool_->num_threads() > 1) { + num_threads_ = thread_pool_->num_threads(); + parallel_enabled_ = table_->row_count() > 50000; + } + } bool next_batch(VectorBatch& out_batch) override { + if (!parallel_enabled_ || !thread_pool_) { + return next_batch_sequential(out_batch); + } + return next_batch_parallel(out_batch); + } + + private: + bool next_batch_sequential(VectorBatch& out_batch) { if (current_row_ >= table_->row_count()) { return false; } @@ -82,6 +104,60 @@ class VectorizedSeqScanOperator : public VectorizedOperator { } return false; } + + bool next_batch_parallel(VectorBatch& out_batch) { + if (parallel_idx_ >= parallel_results_.size()) { + size_t total_rows = table_->row_count(); + if (current_row_ >= total_rows) { + return false; + } + + size_t range_size = (total_rows - current_row_ + num_threads_ - 1) / num_threads_; + + parallel_results_.clear(); + parallel_idx_ = 0; + + std::vector task_starts; + task_starts.reserve(num_threads_); + + for (size_t t = 0; t < num_threads_ && current_row_ < total_rows; ++t) { + size_t start = current_row_; + task_starts.push_back(start); + size_t end = std::min(start + range_size, total_rows); + current_row_ = end; + + auto batch = VectorBatch::create(output_schema_); + parallel_results_.push_back(std::move(batch)); + } + + for (size_t t = 0; t < task_starts.size(); ++t) { + size_t start = task_starts[t]; + size_t rows_to_read = std::min(range_size, total_rows - start); + if (start >= total_rows) { + parallel_results_[t]->set_row_count(0); + continue; + } + thread_pool_->submit([this, t, start, rows_to_read]() { + table_->read_batch(start, static_cast(rows_to_read), + *parallel_results_[t]); + }); + } + + thread_pool_->wait(); + } + + if (parallel_idx_ < parallel_results_.size()) { + auto& src = *parallel_results_[parallel_idx_]; + out_batch.init_from_schema(output_schema_); + for (size_t c = 0; c < src.column_count(); ++c) { + out_batch.get_column(c).steal(std::move(src.get_column(c))); + } + out_batch.set_row_count(src.row_count()); + parallel_idx_++; + return out_batch.row_count() > 0; + } + return false; + } }; /** @@ -290,6 +366,235 @@ class VectorizedAggregateOperator : public VectorizedOperator { } }; +/** + * @brief Open-addressing hash aggregation for arbitrary GROUP BY keys. + * + * Uses linear probing with power-of-2 capacity. Binary key encoding avoids + * string allocation for common key types. Stores hash to avoid recomputation + * on collision resolution. + * + * Key encoding scheme: + * [1 byte: type tag] 0x01=NULL, 0x02=INT64, 0x03=FLOAT64, 0x04=STRING + * [4 bytes: key length (little-endian)] + * [key data...] + */ +class OpenAddressHashAgg { + public: + static constexpr size_t MAX_AGGREGATES = 8; + static constexpr float kLoadFactor = 0.5f; + + private: + struct HashBucket { + bool occupied = false; + bool is_new = false; // True if this bucket was just allocated + uint64_t key_hash = 0; + int64_t key_int64 = 0; // Direct storage for int64 keys + int64_t counts[MAX_AGGREGATES] = {0}; + int64_t sums_int64[MAX_AGGREGATES] = {0}; + double sums_float64[MAX_AGGREGATES / 2] = {0.0}; + bool has_float_value[MAX_AGGREGATES] = {false}; + int64_t mins[MAX_AGGREGATES] = {0}; + int64_t maxes[MAX_AGGREGATES] = {0}; + bool has_mins[MAX_AGGREGATES] = {false}; // Track if initialized + uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING + uint32_t key_len = 0; // For non-int64 keys + uint8_t key_data[64]; // Stored key bytes for iteration + }; + + std::vector buckets_; + size_t mask_ = 0; + size_t num_occupied_ = 0; + size_t max_aggregates_ = 0; + std::vector valid_indices_; // For iteration + + static constexpr size_t kInitialCapacity = 1024; + + public: + static uint64_t hash_bytes(const uint8_t* data, size_t len) { + // FNV-1a 64-bit hash + uint64_t hash = 14695981039346656037ull; + for (size_t i = 0; i < len; ++i) { + hash ^= data[i]; + hash *= 1099511628211ull; + } + return hash; + } + + void init(size_t capacity_hint, size_t max_aggregates) { + max_aggregates_ = max_aggregates; + num_occupied_ = 0; + valid_indices_.clear(); + + size_t cap = kInitialCapacity; + while (cap < capacity_hint) cap *= 2; + buckets_.assign(cap, HashBucket()); + mask_ = cap - 1; + } + + HashBucket& find_or_insert(const uint8_t* key, size_t key_len, uint64_t hash) { + // Grow if load factor exceeded + if (num_occupied_ >= buckets_.size() * kLoadFactor) { + grow(); + } + + size_t idx = hash & mask_; + for (size_t probes = 0; probes < buckets_.size(); ++probes) { + auto& bucket = buckets_[idx]; + if (!bucket.occupied) { + bucket.occupied = true; + bucket.is_new = true; + bucket.key_hash = hash; + bucket.key_len = static_cast(key_len); + bucket.key_type = key[0]; + std::memcpy(bucket.key_data, key, key_len); + num_occupied_++; + valid_indices_.push_back(idx); + return bucket; + } + if (bucket.key_hash == hash && bucket.key_len == key_len && bucket.key_type == key[0] && + std::memcmp(bucket.key_data, key, key_len) == 0) { + bucket.is_new = false; + return bucket; // Found + } + idx = (idx + 1) & mask_; // Linear probe + } + return buckets_[idx]; // Shouldn't reach here + } + + HashBucket& find_or_insert_int64(int64_t key, uint64_t hash) { + if (num_occupied_ >= buckets_.size() * kLoadFactor) { + grow(); + } + + uint8_t key_buf[sizeof(int64_t) + 1]; + key_buf[0] = 0x02; + std::memcpy(&key_buf[1], &key, sizeof(int64_t)); + + size_t idx = hash & mask_; + for (size_t probes = 0; probes < buckets_.size(); ++probes) { + auto& bucket = buckets_[idx]; + if (!bucket.occupied) { + bucket.occupied = true; + bucket.is_new = true; + bucket.key_hash = hash; + bucket.key_int64 = key; + bucket.key_type = 0x02; + bucket.key_len = sizeof(int64_t) + 1; + std::memcpy(bucket.key_data, key_buf, bucket.key_len); + num_occupied_++; + valid_indices_.push_back(idx); + return bucket; + } + if (bucket.key_type == 0x02 && bucket.key_int64 == key) { + bucket.is_new = false; + return bucket; + } + idx = (idx + 1) & mask_; + } + return buckets_[idx]; + } + + void grow() { + auto old_buckets = std::move(buckets_); + size_t new_cap = old_buckets.empty() ? kInitialCapacity : old_buckets.size() * 2; + buckets_.assign(new_cap, HashBucket()); + mask_ = new_cap - 1; + num_occupied_ = 0; + valid_indices_.clear(); + + for (size_t i = 0; i < old_buckets.size(); ++i) { + if (old_buckets[i].occupied) { + if (old_buckets[i].key_type == 0x02) { + find_or_insert_int64(old_buckets[i].key_int64, old_buckets[i].key_hash); + } else { + find_or_insert(old_buckets[i].key_data, old_buckets[i].key_len, + old_buckets[i].key_hash); + } + } + } + } + + size_t group_count() const { return valid_indices_.size(); } + const std::vector& valid_slots() const { return valid_indices_; } + HashBucket& slot(size_t idx) { return buckets_[idx]; } + const HashBucket& slot(size_t idx) const { return buckets_[idx]; } +}; + +/** + * @brief Direct-indexed aggregation for low-cardinality integer GROUP BY. + * + * When the number of distinct GROUP BY values is small, we can use a + * simple vector indexed by key value rather than a hash table. This avoids: + * - Hash computation per row + * - Hash table probing and collision handling + * - String key allocation and comparison + * + * For each row: slot_idx = (key - min_key) where min_key is the + * minimum key value observed. This gives O(1) direct indexing. + */ +class DirectIndexAgg { + public: + static constexpr size_t MAX_AGGREGATES = 8; + static constexpr size_t MAX_GROUP_KEYS = 2; + + private: + struct GroupSlot { + bool valid = false; + int64_t key1 = 0; + int64_t key2 = 0; + int64_t counts[MAX_AGGREGATES] = {0}; + int64_t sums_int64[MAX_AGGREGATES] = {0}; + double sums_float64[MAX_AGGREGATES / 2] = {0.0}; + bool has_float_value[MAX_AGGREGATES] = {false}; + }; + + std::vector slots_; + mutable size_t max_aggregates_ = 0; + mutable size_t max_group_keys_ = 0; + mutable int64_t min_key_ = INT64_MAX; + mutable int64_t max_key_ = INT64_MIN; + + public: + void init(size_t capacity_hint, size_t max_aggregates, size_t max_group_keys = 1) { + max_aggregates_ = max_aggregates; + max_group_keys_ = max_group_keys; + min_key_ = INT64_MAX; + max_key_ = INT64_MIN; + slots_.resize(capacity_hint); + } + + GroupSlot& slot(size_t idx) { return slots_[idx]; } + const GroupSlot& slot(size_t idx) const { return slots_[idx]; } + + size_t find_or_insert(int64_t key1, int64_t key2 = 0) { + // Expand if key outside current range + if (key1 < min_key_ || key1 > max_key_) { + if (key1 < min_key_) min_key_ = key1; + if (key1 > max_key_) max_key_ = key1; + size_t new_size = static_cast(max_key_ - min_key_ + 1); + if (new_size > slots_.size()) { + size_t alloc_size = 1; + while (alloc_size < new_size) alloc_size *= 2; + slots_.resize(alloc_size); + } + } + size_t idx = static_cast(key1 - min_key_); + slots_[idx].valid = true; // Mark valid on first insertion + return idx; + } + + size_t group_count() const { + size_t count = 0; + for (const auto& s : slots_) { + if (s.valid) ++count; + } + return count; + } + + int64_t min_key() const { return min_key_; } + int64_t max_key() const { return max_key_; } +}; + /** * @brief Group state for vectorized GROUP BY - accumulator data per group */ @@ -342,6 +647,15 @@ class VectorizedGroupByOperator : public VectorizedOperator { std::unique_ptr input_batch_; std::unique_ptr group_key_batch_; + // Direct-index aggregation (for low-cardinality integer GROUP BY) + DirectIndexAgg agg_; + bool is_direct_indexable_ = false; + std::vector direct_group_keys_; // Ordered keys for direct index output + + // Open-addressing hash aggregation (for general GROUP BY) + OpenAddressHashAgg hash_agg_; + std::vector> hash_group_keys_; // Ordered group keys for iteration + public: VectorizedGroupByOperator(std::unique_ptr child, std::vector> group_by, @@ -359,6 +673,22 @@ class VectorizedGroupByOperator : public VectorizedOperator { group_by_col_indices_.push_back(col_idx); } + // Check if we can use direct indexing (single integer GROUP BY column) + bool is_int_key = (group_by_col_indices_[0] != static_cast(-1)); + if (is_int_key) { + auto col_type = schema.get_column(group_by_col_indices_[0]).type(); + is_int_key = (col_type == common::ValueType::TYPE_INT64 || + col_type == common::ValueType::TYPE_INT32 || + col_type == common::ValueType::TYPE_INT16 || + col_type == common::ValueType::TYPE_INT8); + } + is_direct_indexable_ = (group_by_.size() == 1 && is_int_key); + if (is_direct_indexable_) { + agg_.init(65536, aggregates_.size(), group_by_.size()); + } else { + hash_agg_.init(65536, aggregates_.size()); + } + // Create schema for group key evaluation Schema key_schema; for (size_t i = 0; i < group_by_.size(); ++i) { @@ -382,14 +712,66 @@ class VectorizedGroupByOperator : public VectorizedOperator { private: void process_input_batch(VectorBatch& batch) { - // For each row, compute hash key using collision-safe encoding + if (is_direct_indexable_) { + process_input_batch_direct(batch); + } else { + process_input_batch_open_addressing(batch); + } + } + + void process_input_batch_direct(VectorBatch& batch) { + // Fast path: direct integer key indexing + const size_t gb_col_idx = group_by_col_indices_[0]; + const auto& gb_col = batch.get_column(gb_col_idx); + for (size_t r = 0; r < batch.row_count(); ++r) { - // Build key using length-prefixed, type-tagged encoding - std::string key; + int64_t key = gb_col.get(r).to_int64(); + size_t slot_idx = agg_.find_or_insert(key, 0); + auto& slot = agg_.slot(slot_idx); + + if (!slot.valid) { + slot.valid = true; + slot.key1 = key; + direct_group_keys_.push_back(key); + } + + // Update accumulators directly in slot + for (size_t i = 0; i < aggregates_.size(); ++i) { + const auto& agg = aggregates_[i]; + if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { + slot.counts[i]++; + } else if ((agg.type == AggregateType::Sum || agg.type == AggregateType::Avg) && + agg.input_col_idx >= 0) { + const auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(r)) { + slot.counts[i]++; + if (col.type() == common::ValueType::TYPE_INT64) { + auto& num_col = dynamic_cast&>(col); + slot.sums_int64[i] += num_col.raw_data()[r]; + } else if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = dynamic_cast&>(col); + slot.sums_float64[i] += num_col.raw_data()[r]; + slot.has_float_value[i] = true; + } + } + } + } + } + input_batch_->clear(); + } + + void process_input_batch_open_addressing(VectorBatch& batch) { + // Fast path: open-addressing hash with binary key encoding + for (size_t r = 0; r < batch.row_count(); ++r) { + // Encode key: [type tag][len][data] + uint8_t key_buf[64]; + uint8_t* key_ptr = key_buf; + std::vector heap_key; + size_t key_len = 0; + for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { size_t col_idx = group_by_col_indices_[i]; if (col_idx == static_cast(-1)) { - // Column not found in schema - fail fast set_error("GROUP BY: column not found in input schema: " + group_by_[i]->to_string()); return; @@ -397,39 +779,75 @@ class VectorizedGroupByOperator : public VectorizedOperator { const auto& val = batch.get_column(col_idx).get(r); if (val.is_null()) { - // Use a dedicated NULL marker for null values - key.append("\1NULL\0", 6); + key_ptr[key_len++] = 0x01; // NULL tag + } else if (val.type() == common::ValueType::TYPE_INT64) { + key_ptr[key_len++] = 0x02; // INT64 tag + int64_t v = val.to_int64(); + std::memcpy(&key_ptr[key_len], &v, sizeof(int64_t)); + key_len += sizeof(int64_t); } else { - // Length-prefixed value: marker + length (4 bytes) + data - std::string val_str = val.to_string(); - key.push_back('\0'); // non-NULL marker + key_ptr[key_len++] = 0x04; // STRING tag + std::string val_str = val.as_text(); uint32_t len = static_cast(val_str.size()); - key.append(reinterpret_cast(&len), 4); - key.append(val_str); + if (key_len + 4 + val_str.size() > 64) { + heap_key.resize(key_len + 4 + val_str.size()); + std::memcpy(heap_key.data(), key_ptr, key_len); + key_ptr = heap_key.data(); + } + std::memcpy(&key_ptr[key_len], &len, 4); + key_len += 4; + std::memcpy(&key_ptr[key_len], val_str.data(), val_str.size()); + key_len += val_str.size(); } } - // Get or create group state - auto it = groups_.find(key); - if (it == groups_.end()) { - // Store group key values for output + uint64_t hash = OpenAddressHashAgg::hash_bytes(key_ptr, key_len); + auto& bucket = hash_agg_.find_or_insert(key_ptr, key_len, hash); + + // Store key for output if first time + if (bucket.is_new) { std::vector key_vals; for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { - size_t col_idx = group_by_col_indices_[i]; - if (col_idx == static_cast(-1)) { - key_vals.push_back(common::Value::make_null()); - } else { - key_vals.push_back(batch.get_column(col_idx).get(r)); - } + key_vals.push_back(batch.get_column(group_by_col_indices_[i]).get(r)); } - auto result = groups_.emplace(key, VectorizedGroupState(aggregates_.size())); - it = result.first; - group_keys_.push_back(key); - group_values_.push_back(std::move(key_vals)); + hash_group_keys_.push_back(std::move(key_vals)); } - // Update accumulators for this row - update_accumulators(it->second, batch, r); + // Update accumulators directly in bucket + for (size_t i = 0; i < aggregates_.size(); ++i) { + const auto& agg = aggregates_[i]; + if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { + bucket.counts[i]++; + } else if ((agg.type == AggregateType::Sum || agg.type == AggregateType::Avg) && + agg.input_col_idx >= 0) { + const auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(r)) { + bucket.counts[i]++; + if (col.type() == common::ValueType::TYPE_INT64) { + auto& num_col = dynamic_cast&>(col); + bucket.sums_int64[i] += num_col.raw_data()[r]; + } else if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = dynamic_cast&>(col); + bucket.sums_float64[i] += num_col.raw_data()[r]; + bucket.has_float_value[i] = true; + } + } + } else if ((agg.type == AggregateType::Min || agg.type == AggregateType::Max) && + agg.input_col_idx >= 0) { + const auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(r)) { + auto val = col.get(r).to_int64(); + if (!bucket.has_mins[i]) { + bucket.mins[i] = val; + bucket.maxes[i] = val; + bucket.has_mins[i] = true; + } else { + bucket.mins[i] = std::min(bucket.mins[i], val); + bucket.maxes[i] = std::max(bucket.maxes[i], val); + } + } + } + } } input_batch_->clear(); } @@ -441,9 +859,11 @@ class VectorizedGroupByOperator : public VectorizedOperator { if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { // COUNT(*) - always increment state.counts[i]++; - } else if (agg.type == AggregateType::Sum && agg.input_col_idx >= 0) { + } else if ((agg.type == AggregateType::Sum || agg.type == AggregateType::Avg) && + agg.input_col_idx >= 0) { auto& col = batch.get_column(agg.input_col_idx); if (!col.is_null(row_idx)) { + state.counts[i]++; // Track count for AVG if (col.type() == common::ValueType::TYPE_INT64) { auto& num_col = dynamic_cast&>(col); state.sums_int64[i] += num_col.raw_data()[row_idx]; @@ -472,6 +892,168 @@ class VectorizedGroupByOperator : public VectorizedOperator { } bool produce_output_batch(VectorBatch& out_batch) { + if (is_direct_indexable_) { + return produce_output_batch_direct(out_batch); + } + return produce_output_batch_open_addressing(out_batch); + } + + bool produce_output_batch_direct(VectorBatch& out_batch) { + if (current_group_idx_ >= direct_group_keys_.size()) { + return false; // EOF + } + + out_batch.clear(); + if (out_batch.column_count() == 0) { + out_batch.init_from_schema(output_schema_); + } + + constexpr size_t BATCH_SIZE = 1024; + size_t output_count = 0; + + // Iterate through direct_group_keys_ and emit groups + while (current_group_idx_ < direct_group_keys_.size() && output_count < BATCH_SIZE) { + int64_t key = direct_group_keys_[current_group_idx_]; + size_t slot_idx = static_cast(key - agg_.min_key()); + const auto& slot = agg_.slot(slot_idx); + + // Append group key column + out_batch.get_column(0).append(common::Value::make_int64(key)); + + // Append aggregate result columns + for (size_t i = 0; i < aggregates_.size(); ++i) { + size_t col_idx = group_by_.size() + i; + switch (aggregates_[i].type) { + case AggregateType::Count: + out_batch.get_column(col_idx).append( + common::Value::make_int64(slot.counts[i])); + break; + case AggregateType::Sum: + if (output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_INT64) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(slot.sums_int64[i])); + } else { + double float_val = slot.has_float_value[i] + ? slot.sums_float64[i] + : static_cast(slot.sums_int64[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(float_val)); + } + break; + case AggregateType::Avg: + if (slot.counts[i] > 0) { + double avg_val = + slot.has_float_value[i] + ? slot.sums_float64[i] / static_cast(slot.counts[i]) + : static_cast(slot.sums_int64[i]) / + static_cast(slot.counts[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(avg_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + default: + out_batch.get_column(col_idx).append(common::Value::make_null()); + break; + } + } + output_count++; + current_group_idx_++; + } + + out_batch.set_row_count(output_count); + return true; + } + + bool produce_output_batch_open_addressing(VectorBatch& out_batch) { + if (current_group_idx_ >= hash_agg_.group_count()) { + return false; // EOF + } + + out_batch.clear(); + if (out_batch.column_count() == 0) { + out_batch.init_from_schema(output_schema_); + } + + constexpr size_t BATCH_SIZE = 1024; + size_t output_count = 0; + + while (current_group_idx_ < hash_group_keys_.size() && output_count < BATCH_SIZE) { + size_t slot_idx = hash_agg_.valid_slots()[current_group_idx_]; + const auto& bucket = hash_agg_.slot(slot_idx); + + // Append group key columns + const auto& key_vals = hash_group_keys_[current_group_idx_]; + for (size_t i = 0; i < key_vals.size(); ++i) { + out_batch.get_column(i).append(key_vals[i]); + } + + // Append aggregate result columns + for (size_t i = 0; i < aggregates_.size(); ++i) { + size_t col_idx = group_by_.size() + i; + switch (aggregates_[i].type) { + case AggregateType::Count: + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.counts[i])); + break; + case AggregateType::Sum: + if (output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_INT64) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.sums_int64[i])); + } else { + double float_val = bucket.has_float_value[i] + ? bucket.sums_float64[i] + : static_cast(bucket.sums_int64[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(float_val)); + } + break; + case AggregateType::Avg: + if (bucket.counts[i] > 0) { + double avg_val = + bucket.has_float_value[i] + ? bucket.sums_float64[i] / static_cast(bucket.counts[i]) + : static_cast(bucket.sums_int64[i]) / + static_cast(bucket.counts[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(avg_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + case AggregateType::Min: + if (bucket.has_mins[i]) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.mins[i])); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + case AggregateType::Max: + if (bucket.has_mins[i]) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(bucket.maxes[i])); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + default: + out_batch.get_column(col_idx).append(common::Value::make_null()); + break; + } + } + output_count++; + current_group_idx_++; + } + + out_batch.set_row_count(output_count); + return true; + } + + bool produce_output_batch_hash(VectorBatch& out_batch) { if (current_group_idx_ >= group_keys_.size()) { return false; // EOF } @@ -525,6 +1107,19 @@ class VectorizedGroupByOperator : public VectorizedOperator { case AggregateType::Max: out_batch.get_column(col_idx).append(state.maxes[i]); break; + case AggregateType::Avg: + if (state.counts[i] > 0) { + double avg_val = + state.has_float_value_[i] + ? state.sums_float64[i] / static_cast(state.counts[i]) + : static_cast(state.sums_int64[i]) / + static_cast(state.counts[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(avg_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; default: out_batch.get_column(col_idx).append(common::Value::make_null()); break; @@ -777,7 +1372,6 @@ class VectorizedHashJoinOperator : public VectorizedOperator { if (out_batch.row_count() >= BATCH_SIZE) { // Batch full - save state and return resuming_bucket_scan_ = true; - resumed_bucket_idx_ = resumed_bucket_idx_; resumed_entry_idx_ = i; resumed_key_val_ = key_val; return true; // Caller must consume batch before continuing diff --git a/include/storage/memory_mapped_column.hpp b/include/storage/memory_mapped_column.hpp new file mode 100644 index 00000000..f20ace4e --- /dev/null +++ b/include/storage/memory_mapped_column.hpp @@ -0,0 +1,60 @@ +/** + * @file memory_mapped_column.hpp + * @brief Memory-mapped column data for zero-copy reads + */ + +#pragma once + +#include +#include + +namespace cloudsql::storage { + +/** + * @brief Memory-mapped column data for zero-copy reads. + * + * Maps a column's `.data.bin` and `.nulls.bin` files into memory. + * For fixed-width types, provides O(1) access to any row via pointer arithmetic. + */ +class MemoryMappedColumn { + public: + struct MappedRegion { + void* addr; // mmap'd base address + size_t size; // mapped size in bytes + int fd; // file descriptor (for munmap) + }; + + private: + MappedRegion data_region_{nullptr, 0, -1}; + MappedRegion null_region_{nullptr, 0, -1}; + size_t element_size_ = 0; // stride for fixed-width columns + bool is_fixed_width_ = false; + size_t row_count_ = 0; + + public: + ~MemoryMappedColumn() { unmap(); } + + // Map a column's data and nulls files. Returns true on success. + bool map(const std::string& data_path, const std::string& null_path, size_t element_size, + size_t row_count); + + // Unmap and release resources + void unmap(); + + // Direct pointer to element at row index (fixed-width only) + const void* data_at(size_t row_idx) const { + if (!data_region_.addr) return nullptr; + return static_cast(data_region_.addr) + row_idx * element_size_; + } + + // Direct pointer to null bit at row index + const uint8_t* null_at(size_t row_idx) const { + if (!null_region_.addr) return nullptr; + return static_cast(null_region_.addr) + row_idx; + } + + bool is_mapped() const { return data_region_.addr != nullptr; } + size_t row_count() const { return row_count_; } +}; + +} // namespace cloudsql::storage diff --git a/src/executor/operator.cpp b/src/executor/operator.cpp index 4af7dc06..bc58f735 100644 --- a/src/executor/operator.cpp +++ b/src/executor/operator.cpp @@ -562,7 +562,8 @@ bool AggregateOperator::open() { } }; - std::map groups_map; + std::unordered_map groups_map; + groups_map.reserve(1024); // Pre-reserve to avoid rehashing during insert const bool is_global = group_by_.empty(); /* Pre-initialize if global aggregation */ @@ -573,17 +574,28 @@ bool AggregateOperator::open() { Tuple tuple; auto child_schema = child_->output_schema(); while (child_->next(tuple)) { - std::string key = "GLOBAL"; + std::string key; + key.reserve(64); // Pre-reserve to avoid repeated allocations std::vector gb_vals; if (!is_global) { - key = ""; for (const auto& gb : group_by_) { auto val = gb ? gb->evaluate(&tuple, &child_schema, get_params()) : common::Value::make_null(); - key += val.to_string() + "|"; + // Binary key encoding: type tag + length + data (no string allocation) + if (val.is_null()) { + key.append("\1NULL\0", 6); + } else { + std::string val_str = val.to_string(); + key.push_back('\0'); // non-NULL marker + uint32_t len = static_cast(val_str.size()); + key.append(reinterpret_cast(&len), 4); + key.append(val_str); + } gb_vals.push_back(std::move(val)); } + } else { + key = "GLOBAL"; } auto it = groups_map.find(key); diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index f3fea5ba..a361a5af 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -407,7 +407,8 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, // Cost-based Volcano/Vectorized chooser using row estimates bool use_vectorized = false; - if (parallel_ && storage_manager_ && !has_sort_or_limit) { + uint64_t estimated_rows = 0; + if (storage_manager_ && !has_sort_or_limit) { // Extract table name from FROM clause (only for simple column refs) // Fall through to Volcano for JOINs, subqueries, and aliased tables const auto* from_expr = stmt.from(); @@ -418,7 +419,7 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, const auto* table_meta = table_meta_opt.value(); // Start with scan estimate as baseline; filter selectivity will override if // eligible - uint64_t estimated_rows = optimizer::RowEstimator::estimate_scan_rows(*table_meta); + estimated_rows = optimizer::RowEstimator::estimate_scan_rows(*table_meta); // Use filter selectivity when WHERE clause is simple and stats available if (stmt.where() && stmt.where()->type() == parser::ExprType::Binary) { @@ -450,7 +451,10 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, } // Use Vectorized for large scans (>10k rows — heuristic crossover point) - use_vectorized = estimated_rows > kVectorizedRowThreshold; + // Force vectorized for GROUP BY when ANALYZE hasn't been run (num_rows=0) + // to benefit from DirectIndexAgg optimization + use_vectorized = (estimated_rows > kVectorizedRowThreshold) || + (!stmt.group_by().empty() && table_meta->num_rows == 0); } } } @@ -1508,8 +1512,9 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( return nullptr; // Table not found or not columnar } + auto thread_pool = std::make_shared(std::thread::hardware_concurrency()); std::unique_ptr current_root = - std::make_unique(base_table_name, col_table); + std::make_unique(base_table_name, col_table, thread_pool); // Track estimated output rows for join reordering decisions uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta); @@ -1694,7 +1699,18 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( info.type = AggregateType::Max; else info.type = AggregateType::Avg; - info.input_col_idx = -1; // default + // Resolve input column index from aggregate function arguments + info.input_col_idx = -1; // default: COUNT(*) + if (!func->args().empty()) { + const auto& arg = func->args()[0]; + if (arg->type() == parser::ExprType::Column) { + const auto* col = dynamic_cast(arg.get()); + if (col != nullptr) { + info.input_col_idx = static_cast( + current_root->output_schema().find_column(col->name())); + } + } + } agg_infos.push_back(info); } } diff --git a/src/storage/memory_mapped_column.cpp b/src/storage/memory_mapped_column.cpp new file mode 100644 index 00000000..c902233e --- /dev/null +++ b/src/storage/memory_mapped_column.cpp @@ -0,0 +1,78 @@ +/** + * @file memory_mapped_column.cpp + * @brief Memory-mapped column implementation + */ + +#include "storage/memory_mapped_column.hpp" + +#include +#include +#include +#include + +#include + +namespace cloudsql::storage { + +bool MemoryMappedColumn::map(const std::string& data_path, const std::string& null_path, + size_t element_size, size_t row_count) { + // Map data file + int data_fd = ::open(data_path.c_str(), O_RDONLY); + if (data_fd < 0) return false; + + struct stat st; + if (fstat(data_fd, &st) < 0) { + ::close(data_fd); + return false; + } + + void* data_ptr = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, data_fd, 0); + if (data_ptr == MAP_FAILED) { + ::close(data_fd); + return false; + } + + data_region_ = {data_ptr, static_cast(st.st_size), data_fd}; + + // Map nulls file + int null_fd = ::open(null_path.c_str(), O_RDONLY); + if (null_fd < 0) { + unmap(); + return false; + } + + if (fstat(null_fd, &st) < 0) { + ::close(null_fd); + unmap(); + return false; + } + + void* null_ptr = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, null_fd, 0); + if (null_ptr == MAP_FAILED) { + ::close(null_fd); + unmap(); + return false; + } + + null_region_ = {null_ptr, static_cast(st.st_size), null_fd}; + + element_size_ = element_size; + is_fixed_width_ = true; + row_count_ = row_count; + return true; +} + +void MemoryMappedColumn::unmap() { + if (data_region_.addr) { + munmap(data_region_.addr, data_region_.size); + ::close(data_region_.fd); + data_region_ = {nullptr, 0, -1}; + } + if (null_region_.addr) { + munmap(null_region_.addr, null_region_.size); + ::close(null_region_.fd); + null_region_ = {nullptr, 0, -1}; + } +} + +} // namespace cloudsql::storage diff --git a/tests/analytics_tests.cpp b/tests/analytics_tests.cpp index 780e332d..ed28cb9e 100644 --- a/tests/analytics_tests.cpp +++ b/tests/analytics_tests.cpp @@ -54,7 +54,7 @@ TEST(AnalyticsTests, ColumnarTableLifecycle) { // 3. Scan and verify round-trip integrity auto table_ptr = std::make_shared(table); - VectorizedSeqScanOperator scan("lifecycle_test", table_ptr); + VectorizedSeqScanOperator scan("lifecycle_test", table_ptr, nullptr); auto result_batch = VectorBatch::create(schema); ASSERT_TRUE(scan.next_batch(*result_batch)); @@ -99,7 +99,7 @@ TEST(AnalyticsTests, VectorizedExecutionPipeline) { ASSERT_TRUE(table->append_batch(*input_batch)); // 2. Build Pipeline: Scan -> Filter(id > 500) -> Project(val) - auto scan = std::make_unique("pipeline_test", table); + auto scan = std::make_unique("pipeline_test", table, nullptr); // Filter condition: id > 500 auto col_expr = std::make_unique("id"); @@ -158,7 +158,7 @@ TEST(AnalyticsTests, VectorizedAggregation) { ASSERT_TRUE(table->append_batch(*input_batch)); // 2. Build Agg Pipeline: Scan -> Aggregate(COUNT(*), SUM(val), SUM(fval)) - auto scan = std::make_unique("agg_test", table); + auto scan = std::make_unique("agg_test", table, nullptr); Schema out_schema; out_schema.add_column("count", common::ValueType::TYPE_INT64); @@ -200,7 +200,7 @@ TEST(AnalyticsTests, AggregateNullHandling) { ASSERT_TRUE(table->append_batch(*input_batch)); // 2. Build Agg Pipeline: Scan -> Aggregate(COUNT(*), SUM(val)) - auto scan = std::make_unique("null_agg_test", table); + auto scan = std::make_unique("null_agg_test", table, nullptr); Schema out_schema; out_schema.add_column("count", common::ValueType::TYPE_INT64); diff --git a/tests/vectorized_operator_tests.cpp b/tests/vectorized_operator_tests.cpp index a0b92f43..3b46cf53 100644 --- a/tests/vectorized_operator_tests.cpp +++ b/tests/vectorized_operator_tests.cpp @@ -114,7 +114,7 @@ TEST_F(VectorizedSeqScanTests, SequentialCallsUntilEOF) { ASSERT_TRUE(table.append_batch(*batch)); auto table_ptr = std::make_shared(table); - VectorizedSeqScanOperator scan("sequential_scan", table_ptr); + VectorizedSeqScanOperator scan("sequential_scan", table_ptr, nullptr); auto result = VectorBatch::create(schema); int64_t expected = 0;