Skip to content

Commit f690940

Browse files
committed
support block approach for GroupValuesPrimitive.
1 parent d6c33ff commit f690940

File tree

4 files changed

+174
-82
lines changed

4 files changed

+174
-82
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-plan/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ datafusion-common = { workspace = true, default-features = true }
5151
datafusion-common-runtime = { workspace = true, default-features = true }
5252
datafusion-execution = { workspace = true }
5353
datafusion-expr = { workspace = true }
54+
datafusion-functions-aggregate-common = { workspace = true }
5455
datafusion-functions-window-common = { workspace = true }
5556
datafusion-physical-expr = { workspace = true, default-features = true }
5657
datafusion-physical-expr-common = { workspace = true }

datafusion/physical-plan/src/aggregates/group_values/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow::array::types::{
2424
};
2525
use arrow::array::{downcast_primitive, ArrayRef, RecordBatch};
2626
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
27-
use datafusion_common::Result;
27+
use datafusion_common::{DataFusionError, Result};
2828

2929
use datafusion_expr::EmitTo;
3030

datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs

+171-81
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ use arrow::record_batch::RecordBatch;
2727
use datafusion_common::Result;
2828
use datafusion_execution::memory_pool::proxy::VecAllocExt;
2929
use datafusion_expr::EmitTo;
30+
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
31+
BlockedGroupIndexOperations, FlatGroupIndexOperations, GroupIndexOperations,
32+
};
3033
use half::f16;
3134
use hashbrown::hash_table::HashTable;
35+
use std::collections::VecDeque;
3236
use std::mem::size_of;
3337
use std::sync::Arc;
3438

@@ -81,17 +85,31 @@ hash_float!(f16, f32, f64);
8185
pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
8286
/// The data type of the output array
8387
data_type: DataType,
88+
8489
/// Stores the group index based on the hash of its value
8590
///
8691
/// We don't store the hashes as hashing fixed width primitives
8792
/// is fast enough for this not to benefit performance
88-
map: HashTable<usize>,
93+
map: HashTable<u64>,
94+
8995
/// The group index of the null value if any
90-
null_group: Option<usize>,
96+
null_group: Option<u64>,
97+
9198
/// The values for each group index
92-
values: Vec<T::Native>,
99+
values: VecDeque<Vec<T::Native>>,
100+
93101
/// The random state used to generate hashes
94102
random_state: RandomState,
103+
104+
/// Block size of current `GroupValues` if it exists:
105+
/// - If `None`, it means block optimization is disabled,
106+
all `group values` will be stored in a single `Vec`
107+
///
108+
/// - If `Some(blk_size)`, it means block optimization is enabled,
109+
/// `group values` will be stored in multiple `Vec`s, and each
110+
/// `Vec` is of `blk_size` len, and we call it a `block`
111+
///
112+
block_size: Option<usize>,
95113
}
96114

97115
impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
@@ -100,9 +118,10 @@ impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
100118
Self {
101119
data_type,
102120
map: HashTable::with_capacity(128),
103-
values: Vec::with_capacity(128),
121+
values: VecDeque::new(),
104122
null_group: None,
105123
random_state: Default::default(),
124+
block_size: None,
106125
}
107126
}
108127
}
@@ -112,43 +131,30 @@ where
112131
T::Native: HashValue,
113132
{
114133
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
115-
assert_eq!(cols.len(), 1);
116-
groups.clear();
117-
118-
for v in cols[0].as_primitive::<T>() {
119-
let group_id = match v {
120-
None => *self.null_group.get_or_insert_with(|| {
121-
let group_id = self.values.len();
122-
self.values.push(Default::default());
123-
group_id
124-
}),
125-
Some(key) => {
126-
let state = &self.random_state;
127-
let hash = key.hash(state);
128-
let insert = self.map.entry(
129-
hash,
130-
|g| unsafe { self.values.get_unchecked(*g).is_eq(key) },
131-
|g| unsafe { self.values.get_unchecked(*g).hash(state) },
132-
);
133-
134-
match insert {
135-
hashbrown::hash_table::Entry::Occupied(o) => *o.get(),
136-
hashbrown::hash_table::Entry::Vacant(v) => {
137-
let g = self.values.len();
138-
v.insert(g);
139-
self.values.push(key);
140-
g
141-
}
142-
}
134+
if let Some(block_size) = self.block_size {
135+
let before_add_group = |group_values: &mut VecDeque<Vec<T::Native>>| {
136+
if group_values.back().unwrap().len() == block_size {
137+
let new_block = Vec::with_capacity(block_size);
138+
group_values.push_back(new_block);
143139
}
144140
};
145-
groups.push(group_id)
141+
self.get_or_create_groups::<_, BlockedGroupIndexOperations>(
142+
cols,
143+
groups,
144+
before_add_group,
145+
)
146+
} else {
147+
self.get_or_create_groups::<_, FlatGroupIndexOperations>(
148+
cols,
149+
groups,
150+
|_: &mut VecDeque<Vec<T::Native>>| {},
151+
)
146152
}
147-
Ok(())
148153
}
149154

150155
fn size(&self) -> usize {
151-
self.map.capacity() * size_of::<usize>() + self.values.allocated_size()
156+
todo!()
157+
// self.map.capacity() * size_of::<usize>() + self.values.len
152158
}
153159

154160
fn is_empty(&self) -> bool {
@@ -160,54 +166,55 @@ where
160166
}
161167

162168
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
163-
fn build_primitive<T: ArrowPrimitiveType>(
164-
values: Vec<T::Native>,
165-
null_idx: Option<usize>,
166-
) -> PrimitiveArray<T> {
167-
let nulls = null_idx.map(|null_idx| {
168-
let mut buffer = NullBufferBuilder::new(values.len());
169-
buffer.append_n_non_nulls(null_idx);
170-
buffer.append_null();
171-
buffer.append_n_non_nulls(values.len() - null_idx - 1);
172-
// NOTE: The inner builder must be constructed as there is at least one null
173-
buffer.finish().unwrap()
174-
});
175-
PrimitiveArray::<T>::new(values.into(), nulls)
176-
}
169+
todo!()
170+
// fn build_primitive<T: ArrowPrimitiveType>(
171+
// values: Vec<T::Native>,
172+
// null_idx: Option<usize>,
173+
// ) -> PrimitiveArray<T> {
174+
// let nulls = null_idx.map(|null_idx| {
175+
// let mut buffer = NullBufferBuilder::new(values.len());
176+
// buffer.append_n_non_nulls(null_idx);
177+
// buffer.append_null();
178+
// buffer.append_n_non_nulls(values.len() - null_idx - 1);
179+
// // NOTE: The inner builder must be constructed as there is at least one null
180+
// buffer.finish().unwrap()
181+
// });
182+
// PrimitiveArray::<T>::new(values.into(), nulls)
183+
// }
177184

178-
let array: PrimitiveArray<T> = match emit_to {
179-
EmitTo::All => {
180-
self.map.clear();
181-
build_primitive(std::mem::take(&mut self.values), self.null_group.take())
182-
}
183-
EmitTo::First(n) => {
184-
self.map.retain(|group_idx| {
185-
// Decrement group index by n
186-
match group_idx.checked_sub(n) {
187-
// Group index was >= n, shift value down
188-
Some(sub) => {
189-
*group_idx = sub;
190-
true
191-
}
192-
// Group index was < n, so remove from table
193-
None => false,
194-
}
195-
});
196-
let null_group = match &mut self.null_group {
197-
Some(v) if *v >= n => {
198-
*v -= n;
199-
None
200-
}
201-
Some(_) => self.null_group.take(),
202-
None => None,
203-
};
204-
let mut split = self.values.split_off(n);
205-
std::mem::swap(&mut self.values, &mut split);
206-
build_primitive(split, null_group)
207-
}
208-
};
185+
// let array: PrimitiveArray<T> = match emit_to {
186+
// EmitTo::All => {
187+
// self.map.clear();
188+
// build_primitive(std::mem::take(&mut self.values), self.null_group.take())
189+
// }
190+
// EmitTo::First(n) => {
191+
// self.map.retain(|group_idx| {
192+
// // Decrement group index by n
193+
// match group_idx.checked_sub(n) {
194+
// // Group index was >= n, shift value down
195+
// Some(sub) => {
196+
// *group_idx = sub;
197+
// true
198+
// }
199+
// // Group index was < n, so remove from table
200+
// None => false,
201+
// }
202+
// });
203+
// let null_group = match &mut self.null_group {
204+
// Some(v) if *v >= n => {
205+
// *v -= n;
206+
// None
207+
// }
208+
// Some(_) => self.null_group.take(),
209+
// None => None,
210+
// };
211+
// let mut split = self.values.split_off(n);
212+
// std::mem::swap(&mut self.values, &mut split);
213+
// build_primitive(split, null_group)
214+
// }
215+
// };
209216

210-
Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
217+
// Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
211218
}
212219

213220
fn clear_shrink(&mut self, batch: &RecordBatch) {
@@ -218,3 +225,86 @@ where
218225
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
219226
}
220227
}
228+
229+
impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T>
230+
where
231+
T::Native: HashValue,
232+
{
233+
fn get_or_create_groups<F, O>(
234+
&mut self,
235+
cols: &[ArrayRef],
236+
groups: &mut Vec<usize>,
237+
mut before_add_group: F,
238+
) -> Result<()>
239+
where
240+
F: FnMut(&mut VecDeque<Vec<T::Native>>),
241+
O: GroupIndexOperations,
242+
{
243+
assert_eq!(cols.len(), 1);
244+
groups.clear();
245+
246+
for v in cols[0].as_primitive::<T>() {
247+
let group_index = match v {
248+
None => *self.null_group.get_or_insert_with(|| {
249+
// actions before add new group like checking if room is enough
250+
before_add_group(&mut self.values);
251+
252+
// get block infos and update block
253+
let block_id = self.values.len() as u32;
254+
let current_block = self.values.back_mut().unwrap();
255+
let block_offset = current_block.len() as u64;
256+
current_block.push(Default::default());
257+
258+
// get group index and finish actions needed it
259+
O::pack_index(block_id, block_offset)
260+
}),
261+
Some(key) => {
262+
let state = &self.random_state;
263+
let hash = key.hash(state);
264+
let insert = self.map.entry(
265+
hash,
266+
|g| unsafe {
267+
let block_id = O::get_block_id(*g);
268+
let block_offset = O::get_block_offset(*g);
269+
self.values
270+
.get(block_id as usize)
271+
.unwrap()
272+
.get_unchecked(block_offset as usize)
273+
.is_eq(key)
274+
},
275+
|g| unsafe {
276+
let block_id = O::get_block_id(*g);
277+
let block_offset = O::get_block_offset(*g);
278+
self.values
279+
.get(block_id as usize)
280+
.unwrap()
281+
.get_unchecked(block_offset as usize)
282+
.hash(state)
283+
},
284+
);
285+
286+
match insert {
287+
hashbrown::hash_table::Entry::Occupied(o) => *o.get(),
288+
hashbrown::hash_table::Entry::Vacant(v) => {
289+
// actions before add new group like checking if room is enough
290+
before_add_group(&mut self.values);
291+
292+
// get block infos and update block
293+
let block_id = self.values.len() as u32;
294+
let current_block = self.values.back_mut().unwrap();
295+
let block_offset = current_block.len() as u64;
296+
current_block.push(key);
297+
298+
// get group index and finish actions needed it
299+
let packed_index = O::pack_index(block_id, block_offset);
300+
v.insert(packed_index);
301+
packed_index
302+
}
303+
}
304+
}
305+
};
306+
groups.push(group_index as usize)
307+
}
308+
Ok(())
309+
}
310+
}

0 commit comments

Comments
 (0)