diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c66123facb62..6b92a230793e 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2025,6 +2025,9 @@ async fn collect_left_input( offset += batch.num_rows(); } + // Flatten linked-list chains into contiguous storage for faster probing + hashmap.flatten(); + // Merge all batches into a single batch, so we can directly index into the arrays let batch = concat_batches(&schema, batches_iter.clone())?; @@ -4295,9 +4298,10 @@ mod tests { Ok(()) } - #[test] - fn join_with_hash_collisions_64() -> Result<()> { - let mut hashmap_left = HashTable::with_capacity(4); + /// Tests that hash collisions are resolved by equal_rows_arr. + /// Both build rows get the same collision hash; the probe should still + /// match each to the correct build row via value comparison. + fn hash_collision_test(mut join_hash_map: T) -> Result<()> { let left = build_table_i32( ("a", &vec![10, 20]), ("x", &vec![100, 200]), @@ -4306,19 +4310,13 @@ mod tests { let random_state = RandomState::with_seeds(0, 0, 0, 0); let hashes_buff = &mut vec![0; left.num_rows()]; - let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?; - - // Maps both values to both indices (1 and 2, representing input 0 and 1) - // 0 -> (0, 1) - // 1 -> (0, 2) - // The equality check will make sure only hashes[0] maps to 0 and hashes[1] maps to 1 - hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h); - hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h); + create_hashes([&left.columns()[0]], &random_state, hashes_buff)?; - hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h); - hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h); - - let next = vec![2, 0]; + // Force all build rows to have the same hash (collision) + let collision_hash = hashes_buff[0]; + let fake_hashes = vec![collision_hash; left.num_rows()]; + join_hash_map.update_from_iter(Box::new(fake_hashes.iter().enumerate()), 0); + join_hash_map.flatten(); let right = build_table_i32( ("a", &vec![10, 20]), @@ -4326,16 +4324,13 @@ mod tests { ("c", &vec![30, 40]), ); - // Join key column for both join sides let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _; - - let join_hash_map = JoinHashMapU64::new(hashmap_left, next); - let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?; let right_keys_values = key_column.evaluate(&right)?.into_array(right.num_rows())?; - let mut hashes_buffer = vec![0; right.num_rows()]; - create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?; + + // Probe hashes also use collision_hash for all entries + let hashes_buffer = vec![collision_hash; right.num_rows()]; let mut probe_indices_buffer = Vec::new(); let mut build_indices_buffer = Vec::new(); @@ -4352,74 +4347,21 @@ mod tests { )?; let left_ids: UInt64Array = vec![0, 1].into(); - let right_ids: UInt32Array = vec![0, 1].into(); - assert_eq!(left_ids, l); - assert_eq!(right_ids, r); Ok(()) } #[test] - fn join_with_hash_collisions_u32() -> Result<()> { - let mut hashmap_left = HashTable::with_capacity(4); - let left = build_table_i32( - ("a", &vec![10, 20]), - ("x", &vec![100, 200]), - ("y", &vec![200, 300]), - ); - - let random_state = RandomState::with_seeds(0, 0, 0, 0); - let hashes_buff = &mut vec![0; left.num_rows()]; - let hashes = create_hashes([&left.columns()[0]], &random_state, hashes_buff)?; - - hashmap_left.insert_unique(hashes[0], (hashes[0], 1u32), |(h, _)| *h); - hashmap_left.insert_unique(hashes[0], (hashes[0], 2u32), |(h, _)| *h); - hashmap_left.insert_unique(hashes[1], (hashes[1], 1u32), |(h, _)| *h); - hashmap_left.insert_unique(hashes[1], (hashes[1], 2u32), |(h, _)| *h); - - let next: Vec = vec![2, 0]; - - let right = build_table_i32( - ("a", &vec![10, 20]), - ("b", &vec![0, 0]), - ("c", &vec![30, 40]), - ); - - let key_column: PhysicalExprRef = Arc::new(Column::new("a", 0)) as _; - - let join_hash_map = JoinHashMapU32::new(hashmap_left, next); - - let left_keys_values = key_column.evaluate(&left)?.into_array(left.num_rows())?; - let right_keys_values = - key_column.evaluate(&right)?.into_array(right.num_rows())?; - let mut hashes_buffer = vec![0; right.num_rows()]; - create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?; - - let mut probe_indices_buffer = Vec::new(); - let mut build_indices_buffer = Vec::new(); - let (l, r, _) = lookup_join_hashmap( - &join_hash_map, - &[left_keys_values], - &[right_keys_values], - NullEquality::NullEqualsNothing, - &hashes_buffer, - 8192, - (0, None), - &mut probe_indices_buffer, - &mut build_indices_buffer, - )?; - - // We still expect to match rows 0 and 1 on both sides - let left_ids: UInt64Array = vec![0, 1].into(); - let right_ids: UInt32Array = vec![0, 1].into(); - - assert_eq!(left_ids, l); - assert_eq!(right_ids, r); + fn join_with_hash_collisions_64() -> Result<()> { + hash_collision_test(JoinHashMapU64::with_capacity(2)) + } - Ok(()) + #[test] + fn join_with_hash_collisions_u32() -> Result<()> { + hash_collision_test(JoinHashMapU32::with_capacity(2)) } #[tokio::test] diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index 8f0fb66b64fb..538f9bef77f7 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -15,9 +15,16 @@ // specific language governing permissions and limitations // under the License. -//! This file contains the implementation of the `JoinHashMap` struct, which -//! is used to store the mapping between hash values based on the build side -//! ["on" values] to a list of indices with this key's value. +//! Hash map for join operations using contiguous storage. +//! +//! Packed value encoding (stored in the hash table per key): +//! - **Inline** (single match): high bit set, remaining bits = row index +//! - **Group** (multiple matches): high bit clear, remaining bits = group ID +//! → `group_offsets[id]..group_offsets[id+1]` indexes into `flat_indices` +//! +//! Build phase: first insert per key is stored inline. Subsequent inserts for +//! the same key go into an overflow buffer. `flatten()` promotes inline entries +//! with overflow to groups and builds contiguous `flat_indices` in one sequential pass. use std::fmt::{self, Debug}; use std::ops::Sub; @@ -28,80 +35,12 @@ use arrow::datatypes::ArrowNativeType; use hashbrown::HashTable; use hashbrown::hash_table::Entry::{Occupied, Vacant}; -/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. -/// -/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, -/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value. -/// -/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 -/// As the key is a hash value, we need to check possible hash collisions in the probe stage -/// During this stage it might be the case that a row is contained the same hashmap value, -/// but the values don't match. Those are checked in the `equal_rows_arr` method. -/// -/// The indices (values) are stored in a separate chained list stored as `Vec` or `Vec`. -/// -/// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value. -/// -/// The chain can be followed until the value "0" has been reached, meaning the end of the list. -/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487) -/// -/// # Example -/// -/// ``` text -/// See the example below: -/// -/// Insert (10,1) <-- insert hash value 10 with row index 1 -/// map: -/// ---------- -/// | 10 | 2 | -/// ---------- -/// next: -/// --------------------- -/// | 0 | 0 | 0 | 0 | 0 | -/// --------------------- -/// Insert (20,2) -/// map: -/// ---------- -/// | 10 | 2 | -/// | 20 | 3 | -/// ---------- -/// next: -/// --------------------- -/// | 0 | 0 | 0 | 0 | 0 | -/// --------------------- -/// Insert (10,3) <-- collision! row index 3 has a hash value of 10 as well -/// map: -/// ---------- -/// | 10 | 4 | -/// | 20 | 3 | -/// ---------- -/// next: -/// --------------------- -/// | 0 | 0 | 0 | 2 | 0 | <--- hash value 10 maps to 4,2 (which means indices values 3,1) -/// --------------------- -/// Insert (10,4) <-- another collision! row index 4 ALSO has a hash value of 10 -/// map: -/// --------- -/// | 10 | 5 | -/// | 20 | 3 | -/// --------- -/// next: -/// --------------------- -/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 10 maps to 5,4,2 (which means indices values 4,3,1) -/// --------------------- -/// ``` -/// -/// Here we have an option between creating a `JoinHashMapType` using `u32` or `u64` indices -/// based on how many rows were being used for indices. -/// -/// At runtime we choose between using `JoinHashMapU32` and `JoinHashMapU64` which oth implement -/// `JoinHashMapType`. -/// -/// ## Note on use of this trait as a public API -/// This is currently a public trait but is mainly intended for internal use within DataFusion. -/// For example, we may compare references to `JoinHashMapType` implementations by pointer equality -/// rather than deep equality of contents, as deep equality would be expensive and in our usage -/// patterns it is impossible for two different hash maps to have identical contents in a practical sense. +use crate::joins::MapOffset; +use crate::joins::chain::traverse_chain; + +const INLINE_BIT_U32: u32 = 1 << 31; +const INLINE_BIT_U64: u64 = 1 << 63; + pub trait JoinHashMapType: Send + Sync { fn extend_zero(&mut self, len: usize); @@ -126,138 +65,339 @@ pub trait JoinHashMapType: Send + Sync { match_indices: &mut Vec, ) -> Option; - /// Returns a BooleanArray indicating which of the provided hashes exist in the map. fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray; - - /// Returns `true` if the join hash map contains no entries. fn is_empty(&self) -> bool; - - /// Returns the number of entries in the join hash map. fn len(&self) -> usize; + + /// Flatten overflow into contiguous storage. Call after all inserts, before probing. + fn flatten(&mut self); +} + +// --- InlineBit: packed value encoding --- + +trait InlineBit: Copy + PartialEq { + fn is_inline(self) -> bool; + fn inline_value(self) -> u64; + fn make_inline(row: u64) -> Self; + fn make_group_id(id: u32) -> Self; + fn group_id(self) -> u32; } -pub struct JoinHashMapU32 { - // Stores hash value to last row index - map: HashTable<(u64, u32)>, - // Stores indices in chained list data structure - next: Vec, +impl InlineBit for u32 { + #[inline(always)] + fn is_inline(self) -> bool { + self & INLINE_BIT_U32 != 0 + } + #[inline(always)] + fn inline_value(self) -> u64 { + (self & !INLINE_BIT_U32) as u64 + } + #[inline(always)] + fn make_inline(row: u64) -> u32 { + INLINE_BIT_U32 | row as u32 + } + #[inline(always)] + fn make_group_id(id: u32) -> u32 { + id + } + #[inline(always)] + fn group_id(self) -> u32 { + self + } } -impl JoinHashMapU32 { - #[cfg(test)] - pub(crate) fn new(map: HashTable<(u64, u32)>, next: Vec) -> Self { - Self { map, next } +impl InlineBit for u64 { + #[inline(always)] + fn is_inline(self) -> bool { + self & INLINE_BIT_U64 != 0 + } + #[inline(always)] + fn inline_value(self) -> u64 { + self & !INLINE_BIT_U64 + } + #[inline(always)] + fn make_inline(row: u64) -> u64 { + INLINE_BIT_U64 | row + } + #[inline(always)] + fn make_group_id(id: u32) -> u64 { + id as u64 + } + #[inline(always)] + fn group_id(self) -> u32 { + self as u32 } +} + +// --- Generic JoinHashMap --- + +/// Hash map for join build/probe using contiguous storage. +/// `T` is the packed map value type (u32 or u64). +/// `F` is the flat index type (u32 or u64), must impl `Into` for output. +pub(crate) struct JoinHashMap> { + map: HashTable<(u64, T)>, + flat_indices: Vec, + group_offsets: Vec, + overflow: Vec<(u32, F)>, + num_groups: u32, +} +pub type JoinHashMapU32 = JoinHashMap; +pub type JoinHashMapU64 = JoinHashMap; + +impl> JoinHashMap { pub fn with_capacity(cap: usize) -> Self { Self { map: HashTable::with_capacity(cap), - next: vec![0; cap], + flat_indices: Vec::new(), + group_offsets: Vec::new(), + overflow: Vec::new(), + num_groups: 0, } } } -impl Debug for JoinHashMapU32 { +impl> Debug for JoinHashMap { fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { Ok(()) } } -impl JoinHashMapType for JoinHashMapU32 { - fn extend_zero(&mut self, _: usize) {} +// --- Build --- - fn update_from_iter<'a>( - &mut self, - iter: Box + Send + 'a>, - deleted_offset: usize, - ) { - update_from_iter::(&mut self.map, &mut self.next, iter, deleted_offset); +fn build_insert>( + map: &mut HashTable<(u64, T)>, + num_groups: &mut u32, + overflow: &mut Vec<(u32, F)>, + row: usize, + hash_value: u64, +) where + F: From, +{ + let entry = map.entry(hash_value, |&(h, _)| hash_value == h, |&(h, _)| h); + match entry { + Occupied(mut occ) => { + let (_, packed) = occ.get_mut(); + if packed.is_inline() { + let old_row = packed.inline_value(); + let gid = *num_groups; + *num_groups += 1; + *packed = T::make_group_id(gid); + overflow.push((gid, F::from(old_row as u32))); + } + overflow.push((packed.group_id(), F::from(row as u32))); + } + Vacant(vac) => { + vac.insert((hash_value, T::make_inline(row as u64))); + } } +} - fn get_matched_indices<'a>( - &self, - iter: Box + 'a>, - deleted_offset: Option, - ) -> (Vec, Vec) { - get_matched_indices::(&self.map, &self.next, iter, deleted_offset) - } +// --- Flatten --- - fn get_matched_indices_with_limit_offset( - &self, - hash_values: &[u64], - limit: usize, - offset: MapOffset, - input_indices: &mut Vec, - match_indices: &mut Vec, - ) -> Option { - get_matched_indices_with_limit_offset::( - &self.map, - &self.next, - hash_values, - limit, - offset, - input_indices, - match_indices, - ) +fn flatten_overflow>( + num_groups: u32, + overflow: &mut Vec<(u32, F)>, + flat_indices: &mut Vec, + group_offsets: &mut Vec, +) { + if overflow.is_empty() { + return; } - fn contain_hashes(&self, hash_values: &[u64]) -> BooleanArray { - contain_hashes(&self.map, hash_values) - } + let ng = num_groups as usize; - fn is_empty(&self) -> bool { - self.map.is_empty() + // Count entries per group directly into group_offsets + group_offsets.clear(); + group_offsets.resize(ng + 1, 0); + for &(gid, _) in overflow.iter() { + group_offsets[gid as usize + 1] += 1; + } + // Prefix sum + for i in 1..=ng { + group_offsets[i] += group_offsets[i - 1]; } - fn len(&self) -> usize { - self.map.len() + // Place entries in reverse order (LIFO) to match linked-list traversal order. + let total = group_offsets[ng] as usize; + flat_indices.clear(); + flat_indices.resize(total, F::default()); + // Cursors start at end of each group and decrement + let mut cursors = group_offsets[1..=ng].to_vec(); + for &(gid, row) in overflow.iter() { + cursors[gid as usize] -= 1; + flat_indices[cursors[gid as usize] as usize] = row; } -} -pub struct JoinHashMapU64 { - // Stores hash value to last row index - map: HashTable<(u64, u64)>, - // Stores indices in chained list data structure - next: Vec, + overflow.clear(); } -impl JoinHashMapU64 { - #[cfg(test)] - pub(crate) fn new(map: HashTable<(u64, u64)>, next: Vec) -> Self { - Self { map, next } +// --- Probe --- + +/// Emit matches for a single packed entry. Returns `Some(offset)` if limit reached. +#[inline(always)] +fn emit_packed>( + packed: T, + row_idx: usize, + start_pos: usize, + flat_indices: &[F], + group_offsets: &[u32], + remaining: &mut usize, + input_indices: &mut Vec, + match_indices: &mut Vec, +) -> Option { + if packed.is_inline() { + if *remaining == 0 { + return Some((row_idx, None)); + } + match_indices.push(packed.inline_value()); + input_indices.push(row_idx as u32); + *remaining -= 1; + } else { + let gid = packed.group_id() as usize; + let start = if start_pos > 0 { + start_pos + } else { + group_offsets[gid] as usize + }; + let end = group_offsets[gid + 1] as usize; + for pos in start..end { + if *remaining == 0 { + return Some((row_idx, Some(pos as u64 + 1))); + } + match_indices.push(flat_indices[pos].into()); + input_indices.push(row_idx as u32); + *remaining -= 1; + } } + None +} - pub fn with_capacity(cap: usize) -> Self { - Self { - map: HashTable::with_capacity(cap), - next: vec![0; cap], +/// Probe the flattened hash map with batched finds (4 at a time). +/// +/// Offset convention: `Some(0)` = done with this probe idx. +/// For resume within a group: `Some(pos + 1)` where pos is the flat_indices position. +fn probe_flat>( + map: &HashTable<(u64, T)>, + flat_indices: &[F], + group_offsets: &[u32], + hash_values: &[u64], + limit: usize, + offset: MapOffset, + input_indices: &mut Vec, + match_indices: &mut Vec, +) -> Option { + input_indices.clear(); + match_indices.clear(); + let mut remaining = limit; + + let to_skip = match offset { + (idx, None) => idx, + (idx, Some(0)) => idx + 1, + (idx, Some(pos_plus_one)) => { + if let Some((_, packed)) = + map.find(hash_values[idx], |(h, _)| hash_values[idx] == *h) + { + let resume = (pos_plus_one - 1) as usize; + if let Some(off) = emit_packed( + *packed, + idx, + resume, + flat_indices, + group_offsets, + &mut remaining, + input_indices, + match_indices, + ) { + return Some(off); + } + } + idx + 1 + } + }; + + let remaining_slice = &hash_values[to_skip..]; + let chunks = remaining_slice.chunks_exact(4); + let remainder = chunks.remainder(); + + for (chunk_idx, chunk) in chunks.enumerate() { + let base = to_skip + chunk_idx * 4; + let r0 = map.find(chunk[0], |(h, _)| chunk[0] == *h); + let r1 = map.find(chunk[1], |(h, _)| chunk[1] == *h); + let r2 = map.find(chunk[2], |(h, _)| chunk[2] == *h); + let r3 = map.find(chunk[3], |(h, _)| chunk[3] == *h); + + for (j, r) in [r0, r1, r2, r3].into_iter().enumerate() { + if let Some((_, packed)) = r { + if let Some(off) = emit_packed( + *packed, + base + j, + 0, + flat_indices, + group_offsets, + &mut remaining, + input_indices, + match_indices, + ) { + return Some(off); + } + } } } -} -impl Debug for JoinHashMapU64 { - fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { - Ok(()) + let remainder_start = to_skip + remaining_slice.len() - remainder.len(); + for (i, &hash) in remainder.iter().enumerate() { + if let Some((_, packed)) = map.find(hash, |(h, _)| hash == *h) { + if let Some(off) = emit_packed( + *packed, + remainder_start + i, + 0, + flat_indices, + group_offsets, + &mut remaining, + input_indices, + match_indices, + ) { + return Some(off); + } + } } + None } -impl JoinHashMapType for JoinHashMapU64 { +// --- JoinHashMapType impl --- + +impl JoinHashMapType for JoinHashMap +where + T: InlineBit + Send + Sync, + F: Copy + Default + Into + From + Send + Sync, +{ fn extend_zero(&mut self, _: usize) {} fn update_from_iter<'a>( &mut self, iter: Box + Send + 'a>, - deleted_offset: usize, + _deleted_offset: usize, ) { - update_from_iter::(&mut self.map, &mut self.next, iter, deleted_offset); + for (row, hash) in iter { + build_insert( + &mut self.map, + &mut self.num_groups, + &mut self.overflow, + row, + *hash, + ); + } } fn get_matched_indices<'a>( &self, - iter: Box + 'a>, - deleted_offset: Option, + _iter: Box + 'a>, + _deleted_offset: Option, ) -> (Vec, Vec) { - get_matched_indices::(&self.map, &self.next, iter, deleted_offset) + unimplemented!( + "JoinHashMap does not support get_matched_indices; use get_matched_indices_with_limit_offset" + ) } fn get_matched_indices_with_limit_offset( @@ -268,9 +408,10 @@ impl JoinHashMapType for JoinHashMapU64 { input_indices: &mut Vec, match_indices: &mut Vec, ) -> Option { - get_matched_indices_with_limit_offset::( + probe_flat( &self.map, - &self.next, + &self.flat_indices, + &self.group_offsets, hash_values, limit, offset, @@ -290,10 +431,18 @@ impl JoinHashMapType for JoinHashMapU64 { fn len(&self) -> usize { self.map.len() } + + fn flatten(&mut self) { + flatten_overflow( + self.num_groups, + &mut self.overflow, + &mut self.flat_indices, + &mut self.group_offsets, + ); + } } -use crate::joins::MapOffset; -use crate::joins::chain::traverse_chain; +// --- Legacy free functions for PruningJoinHashMap (streaming join) --- pub fn update_from_iter<'a, T>( map: &mut HashTable<(u64, T)>, @@ -310,19 +459,15 @@ pub fn update_from_iter<'a, T>( |&(hash, _)| hash_value == hash, |&(hash, _)| hash, ); - match entry { - Occupied(mut occupied_entry) => { - // Already exists: add index to next array - let (_, index) = occupied_entry.get_mut(); + Occupied(mut occ) => { + let (_, index) = occ.get_mut(); let prev_index = *index; - // Store new value inside hashmap *index = T::try_from(row + 1).unwrap(); - // Update chained Vec at `row` with previous value next[row - deleted_offset] = prev_index; } - Vacant(vacant_entry) => { - vacant_entry.insert((hash_value, T::try_from(row + 1).unwrap())); + Vacant(vac) => { + vac.insert((hash_value, T::try_from(row + 1).unwrap())); } } } @@ -344,16 +489,13 @@ where let one = T::try_from(1).unwrap(); for (row_idx, hash_value) in iter { - // Get the hash and find it in the index if let Some((_, index)) = map.find(*hash_value, |(hash, _)| *hash_value == *hash) { let mut i = *index - one; loop { let match_row_idx = if let Some(offset) = deleted_offset { let offset = T::try_from(offset).unwrap(); - // This arguments means that we prune the next index way before here. if i < offset { - // End of the list due to pruning break; } i - offset @@ -362,17 +504,14 @@ where }; match_indices.push(match_row_idx.into()); input_indices.push(row_idx as u32); - // Follow the chain to get the next index value let next_chain = next[match_row_idx.into() as usize]; if next_chain == zero { - // end of list break; } i = next_chain - one; } } } - (input_indices, match_indices) } @@ -390,40 +529,14 @@ where >::Error: Debug, T: ArrowNativeType, { - // Clear the buffer before producing new results input_indices.clear(); match_indices.clear(); - let one = T::try_from(1).unwrap(); - - // Check if hashmap consists of unique values - // If so, we can skip the chain traversal - if map.len() == next_chain.len() { - let start = offset.0; - let end = (start + limit).min(hash_values.len()); - for (i, &hash) in hash_values[start..end].iter().enumerate() { - if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) { - input_indices.push(start as u32 + i as u32); - match_indices.push((*idx - one).into()); - } - } - return if end == hash_values.len() { - None - } else { - Some((end, None)) - }; - } let mut remaining_output = limit; - // Calculate initial `hash_values` index before iterating let to_skip = match offset { - // None `initial_next_idx` indicates that `initial_idx` processing hasn't been started (idx, None) => idx, - // Zero `initial_next_idx` indicates that `initial_idx` has been processed during - // previous iteration, and it should be skipped (idx, Some(0)) => idx + 1, - // Otherwise, process remaining `initial_idx` matches by traversing `next_chain`, - // to start with the next index (idx, Some(next_idx)) => { let next_idx: T = T::usize_as(next_idx as usize); let is_last = idx == hash_values.len() - 1; @@ -442,12 +555,11 @@ where } }; - let hash_values_len = hash_values.len(); for (i, &hash) in hash_values[to_skip..].iter().enumerate() { let row_idx = to_skip + i; if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) { let idx: T = *idx; - let is_last = row_idx == hash_values_len - 1; + let is_last = row_idx == hash_values.len() - 1; if let Some(next_offset) = traverse_chain( next_chain, row_idx, @@ -480,18 +592,86 @@ mod tests { fn test_contain_hashes() { let mut hash_map = JoinHashMapU32::with_capacity(10); hash_map.update_from_iter(Box::new([10u64, 20u64, 30u64].iter().enumerate()), 0); - let probe_hashes = vec![10, 11, 20, 21, 30, 31]; let array = hash_map.contain_hashes(&probe_hashes); - assert_eq!(array.len(), probe_hashes.len()); - for (i, &hash) in probe_hashes.iter().enumerate() { if matches!(hash, 10 | 20 | 30) { - assert!(array.value(i), "Hash {hash} should exist in the map"); + assert!(array.value(i), "Hash {hash} should exist"); } else { - assert!(!array.value(i), "Hash {hash} should NOT exist in the map"); + assert!(!array.value(i), "Hash {hash} should NOT exist"); } } } + + #[test] + fn test_unique() { + let mut m = JoinHashMapU32::with_capacity(3); + m.update_from_iter(Box::new([10u64, 20u64, 30u64].iter().enumerate()), 0); + m.flatten(); + let mut inp = vec![]; + let mut mat = vec![]; + let r = m.get_matched_indices_with_limit_offset( + &[10, 20, 30, 99], + 100, + (0, None), + &mut inp, + &mut mat, + ); + assert!(r.is_none()); + assert_eq!(mat.len(), 3); + } + + #[test] + fn test_with_dups() { + let mut m = JoinHashMapU32::with_capacity(4); + m.update_from_iter(Box::new([10u64, 10u64, 10u64, 20u64].iter().enumerate()), 0); + m.flatten(); + let mut inp = vec![]; + let mut mat = vec![]; + let r = m.get_matched_indices_with_limit_offset( + &[10, 20], + 100, + (0, None), + &mut inp, + &mut mat, + ); + assert!(r.is_none()); + assert_eq!(mat.len(), 4); + let mut h10: Vec = inp + .iter() + .zip(mat.iter()) + .filter(|&(&i, _)| i == 0) + .map(|(_, &m)| m) + .collect(); + h10.sort(); + assert_eq!(h10, vec![0, 1, 2]); + } + + #[test] + fn test_with_limit() { + let mut m = JoinHashMapU32::with_capacity(4); + m.update_from_iter(Box::new([10u64, 10u64, 10u64, 20u64].iter().enumerate()), 0); + m.flatten(); + let mut inp = vec![]; + let mut mat = vec![]; + let r = m.get_matched_indices_with_limit_offset( + &[10, 20], + 2, + (0, None), + &mut inp, + &mut mat, + ); + assert_eq!(mat.len(), 2); + assert!(r.is_some()); + let r = m.get_matched_indices_with_limit_offset( + &[10, 20], + 100, + r.unwrap(), + &mut inp, + &mut mat, + ); + assert!(r.is_none()); + assert_eq!(mat.len(), 2); + } } diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index beed07f562db..34d03e999474 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -107,6 +107,10 @@ impl JoinHashMapType for PruningJoinHashMap { fn len(&self) -> usize { self.map.len() } + + fn flatten(&mut self) { + // No-op: PruningJoinHashMap is incrementally updated and cannot be flattened. + } } /// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with