Skip to content

Improve the hash join performance by replacing the RawTable with a simple Vec for JoinHashMap #6913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 20 additions & 54 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,26 +566,21 @@ async fn collect_left_input(
reservation.try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);

let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
hashes_buffer.clear();
hashes_buffer.resize(batch.num_rows(), 0);
update_hash(
&on_left,
batch,
&mut hashmap,
offset,
&random_state,
&mut hashes_buffer,
)?;
offset += batch.num_rows();
}
// Merge all batches into a single batch, so we
// can directly index into the arrays
let single_batch = concat_batches(&schema, &batches, num_rows)?;

let mut hashmap = JoinHashMap::with_capacity(estimated_buckets, num_rows);
let mut hashes_buffer = vec![0; num_rows];
update_hash(
&on_left,
&single_batch,
&mut hashmap,
0,
&random_state,
&mut hashes_buffer,
)?;

Ok((hashmap, single_batch, reservation))
}

Expand All @@ -610,26 +605,7 @@ pub fn update_hash(

// insert hashes to key of the hashmap
for (row, hash_value) in hash_values.iter().enumerate() {
let item = hash_map
.map
.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
if let Some((_, index)) = item {
// Already exists: add index to next array
let prev_index = *index;
// Store new value inside hashmap
*index = (row + offset + 1) as u64;
// Update chained Vec at row + offset with previous value
hash_map.next[row + offset] = prev_index;
} else {
hash_map.map.insert(
*hash_value,
// store the value + 1 as 0 value reserved for end of list
(*hash_value, (row + offset + 1) as u64),
|(hash, _)| *hash,
);
// chained list at (row + offset) is already initialized with 0
// meaning end of list
}
hash_map.insert(*hash_value, row + offset);
}
Ok(())
}
Expand Down Expand Up @@ -740,11 +716,9 @@ pub fn build_equal_condition_join_indices(
// For every item on the build and probe we check if it matches
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
if let Some((_, index)) = build_hashmap
.map
.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
let mut i = *index - 1;
let index = build_hashmap.get_first_index(*hash_value);
if index != 0 {
let mut i = index - 1;
loop {
build_indices.append(i);
probe_indices.append(row as u32);
Expand Down Expand Up @@ -1348,7 +1322,6 @@ mod tests {
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::Literal;
use hashbrown::raw::RawTable;

use crate::execution::context::SessionConfig;
use crate::physical_expr::expressions::BinaryExpr;
Expand Down Expand Up @@ -2689,7 +2662,6 @@ mod tests {

#[test]
fn join_with_hash_collision() -> Result<()> {
let mut hashmap_left = RawTable::with_capacity(2);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
Expand All @@ -2701,25 +2673,19 @@ mod tests {
let hashes =
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;

// Create hash collisions (same hashes)
hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
let mut join_hash_map = JoinHashMap::with_capacity(2, left.num_rows());
for (row, hash_value) in hashes.iter().enumerate() {
join_hash_map.insert(*hash_value, row);
}

let next = vec![2, 0];
let left_data = (join_hash_map, left);

let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);

let left_data = (
JoinHashMap {
map: hashmap_left,
next,
},
left,
);
let (l, r) = build_equal_condition_join_indices(
&left_data.0,
&left_data.1,
Expand Down
77 changes: 61 additions & 16 deletions datafusion/core/src/physical_plan/joins/hash_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,79 @@ use datafusion_common::Result;
// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1)
// ---------------------

// Lower bound on the bucket count; small builds still get a reasonably
// sized bucket array so chains stay short. Test builds skip the floor so
// exact small bucket counts can be exercised.
#[cfg(not(test))]
const MIN_JOIN_HASH_MAP_LEN: usize = 1024;

/// Hash table used by the hash join to map a hash value to a chained list
/// of build-side row indices.
///
/// Layout: `buckets[hash & bucket_mask]` holds `row_index + 1` of the most
/// recently inserted row for that bucket (0 marks an empty bucket), and
/// `next[row_index]` chains to the previously inserted row of the same
/// bucket (0 marks the end of the list). The +1 offset is what lets 0 serve
/// as the empty/end sentinel.
pub struct JoinHashMap {
    // Stores the first (most recently inserted) index for a bucket, +1.
    buckets: Vec<u64>,
    // Stores indices in chained list data structure; zero-initialized so an
    // untouched slot already means "end of list".
    pub next: Vec<u64>,
    // Stores the bucket mask (buckets.len() - 1, a power of two minus one)
    // for quickly finding a bucket for a hash value.
    bucket_mask: usize,
}

/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the
/// row indices inline (in a `SmallVec`), allowing it to mutate and shrink the
/// index lists in place.
pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);

impl JoinHashMap {
pub(crate) fn with_capacity(capacity: usize) -> Self {
#[cfg(not(test))]
pub(crate) fn with_capacity(bucket_capacity: usize, capacity: usize) -> Self {
let mut bucket_capacity = bucket_capacity.next_power_of_two();
if bucket_capacity < MIN_JOIN_HASH_MAP_LEN {
bucket_capacity = MIN_JOIN_HASH_MAP_LEN;
}
let bucket_mask = bucket_capacity - 1;
JoinHashMap {
map: RawTable::with_capacity(capacity),
buckets: vec![0; bucket_capacity],
next: vec![0; capacity],
bucket_mask,
}
}

#[cfg(test)]
pub(crate) fn with_capacity(bucket_capacity: usize, capacity: usize) -> Self {
assert!(bucket_capacity > 0);
let bucket_capacity = bucket_capacity.next_power_of_two();
let bucket_mask = bucket_capacity - 1;
JoinHashMap {
buckets: vec![0; bucket_capacity],
next: vec![0; capacity],
bucket_mask,
}
}

#[inline]
pub(crate) fn insert(&mut self, hash_value: u64, index: usize) {
let bucket_index = self.bucket_mask & (hash_value as usize);
// We are sure the `bucket_index` is legal
unsafe {
let index_ref = self.buckets.get_unchecked_mut(bucket_index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can limit the unsafe to:

Suggested change
let index_ref = self.buckets.get_unchecked_mut(bucket_index);
let index_ref = unsafe {self.buckets.get_unchecked_mut(bucket_index); }

let prev_index = *index_ref;
// chained list at index is already initialized with 0
// 0 meaning end of list
*index_ref = index as u64 + 1;
if prev_index != 0 {
// Update chained Vec at index with previous value
self.next[index] = prev_index;
}
}
}

#[inline]
pub(crate) fn get_first_index(&self, hash_value: u64) -> u64 {
let bucket_index = self.bucket_mask & (hash_value as usize);
unsafe { *self.buckets.get_unchecked(bucket_index) }
}
}

impl fmt::Debug for JoinHashMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
Ok(())
}
}

/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the
/// row indices inline (in a `SmallVec`), allowing it to mutate and shrink the
/// index lists in place.
pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);

impl SymmetricJoinHashMap {
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self(RawTable::with_capacity(capacity))
Expand Down Expand Up @@ -138,12 +189,6 @@ impl SymmetricJoinHashMap {
}
}

// NOTE(review): this duplicates the `impl fmt::Debug for JoinHashMap` that
// also appears earlier in this flattened diff view — only one impl can exist
// in the real file; this copy is at the impl's original (pre-move) location.
// As written it produces empty output for `{:?}` — presumably to avoid
// dumping large tables, but verify whether an informative summary is wanted.
impl fmt::Debug for JoinHashMap {
fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
Ok(())
}
}

fn check_filter_expr_contains_sort_information(
expr: &Arc<dyn PhysicalExpr>,
reference: &Arc<dyn PhysicalExpr>,
Expand Down