Skip to content

Commit 2863809

Browse files
committed
support blocked mode for PrimitiveGroupsAccumulator.
1 parent 34f5fa2 commit 2863809

File tree

2 files changed

+79
-13
lines changed

2 files changed

+79
-13
lines changed

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
//! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator
2121
2222
use std::collections::VecDeque;
23-
use std::fmt::{self, Debug};
23+
use std::fmt::Debug;
2424
use std::marker::PhantomData;
2525

2626
use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
@@ -401,6 +401,7 @@ impl SeenValues for BlockedSeenValues {
401401

402402
/// Adapter for supporting dynamic dispatching of [`FlatNullState`] and [`BlockedNullState`].
403403
/// For performance, the cost of batch-level dynamic dispatching is acceptable.
404+
#[derive(Debug)]
404405
pub enum NullStateAdapter {
405406
Flat(FlatNullState),
406407
Blocked(BlockedNullState),
@@ -479,6 +480,13 @@ impl NullStateAdapter {
479480
}
480481
}
481482

483+
pub fn size(&self) -> usize {
484+
match self {
485+
NullStateAdapter::Flat(null_state) => null_state.size(),
486+
NullStateAdapter::Blocked(null_state) => null_state.size(),
487+
}
488+
}
489+
482490
/// Clone and build a single [`BooleanBuffer`] from `seen_values`,
483491
/// only used for testing.
484492
#[cfg(test)]

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs

+70-12
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,21 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::VecDeque;
19+
use std::iter;
1820
use std::mem::size_of;
1921
use std::sync::Arc;
2022

21-
use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray};
23+
use arrow::array::{ArrayRef, ArrowNativeTypeOp, AsArray, BooleanArray, PrimitiveArray};
2224
use arrow::buffer::NullBuffer;
2325
use arrow::compute;
2426
use arrow::datatypes::ArrowPrimitiveType;
2527
use arrow::datatypes::DataType;
2628
use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
2729
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
2830

29-
use super::accumulate::FlatNullState;
31+
use crate::aggregate::groups_accumulator::accumulate::NullStateAdapter;
32+
use crate::aggregate::groups_accumulator::{ensure_room_enough_for_blocks, Block};
3033

3134
/// An accumulator that implements a single operation over
3235
/// [`ArrowPrimitiveType`] where the accumulated state is the same as
@@ -43,8 +46,8 @@ where
4346
T: ArrowPrimitiveType + Send,
4447
F: Fn(&mut T::Native, T::Native) + Send + Sync,
4548
{
46-
/// values per group, stored as the native type
47-
values: Vec<T::Native>,
49+
/// Values per group, stored as the native type
50+
values: VecDeque<Vec<T::Native>>,
4851

4952
/// The output type (needed for Decimal precision and scale)
5053
data_type: DataType,
@@ -53,10 +56,20 @@ where
5356
starting_value: T::Native,
5457

5558
/// Track nulls in the input / filters
56-
null_state: FlatNullState,
59+
null_state: NullStateAdapter,
5760

5861
/// Function that computes the primitive result
5962
prim_fn: F,
63+
64+
/// Block size of current `GroupAccumulator` if exist:
65+
/// - If `None`, it means block optimization is disabled,
66+
/// all `group values`` will be stored in a single `Vec`
67+
///
68+
/// - If `Some(blk_size)`, it means block optimization is enabled,
69+
/// `group values` will be stored in multiple `Vec`s, and each
70+
/// `Vec` if of `blk_size` len, and we call it a `block`
71+
///
72+
block_size: Option<usize>,
6073
}
6174

6275
impl<T, F> PrimitiveGroupsAccumulator<T, F>
@@ -66,11 +79,12 @@ where
6679
{
6780
pub fn new(data_type: &DataType, prim_fn: F) -> Self {
6881
Self {
69-
values: vec![],
82+
values: VecDeque::new(),
7083
data_type: data_type.clone(),
71-
null_state: FlatNullState::new(),
84+
null_state: NullStateAdapter::new(None),
7285
starting_value: T::default_value(),
7386
prim_fn,
87+
block_size: None,
7488
}
7589
}
7690

@@ -96,17 +110,37 @@ where
96110
assert_eq!(values.len(), 1, "single argument to update_batch");
97111
let values = values[0].as_primitive::<T>();
98112

99-
// update values
100-
self.values.resize(total_num_groups, self.starting_value);
113+
// Expand to ensure values are large enough
114+
if let Some(blk_size) = self.block_size {
115+
// Expand blocks in `blocked mode`
116+
let new_block = |block_size: usize| Vec::with_capacity(block_size);
117+
ensure_room_enough_for_blocks(
118+
&mut self.values,
119+
total_num_groups,
120+
blk_size,
121+
new_block,
122+
self.starting_value,
123+
);
124+
} else {
125+
// Expand the single block in `flat mode`
126+
if self.values.is_empty() {
127+
self.values.push_back(Vec::new());
128+
}
129+
130+
self.values
131+
.back_mut()
132+
.unwrap()
133+
.resize(total_num_groups, self.starting_value);
134+
}
101135

102136
// NullState dispatches / handles tracking nulls and groups that saw no values
103137
self.null_state.accumulate(
104138
group_indices,
105139
values,
106140
opt_filter,
107141
total_num_groups,
108-
|_, group_index, new_value| {
109-
let value = &mut self.values[group_index as usize];
142+
|block_id, block_offset, new_value| {
143+
let value = &mut self.values[block_id as usize][block_offset as usize];
110144
(self.prim_fn)(value, new_value);
111145
},
112146
);
@@ -115,7 +149,7 @@ where
115149
}
116150

117151
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
118-
let values = emit_to.take_needed_rows(&mut self.values);
152+
let values = emit_to.take_needed(&mut self.values, self.block_size.is_some());
119153
let nulls = self.null_state.build(emit_to);
120154
let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)) // no copy
121155
.with_data_type(self.data_type.clone());
@@ -198,4 +232,28 @@ where
198232
fn size(&self) -> usize {
199233
self.values.capacity() * size_of::<T::Native>() + self.null_state.size()
200234
}
235+
236+
fn supports_blocked_groups(&self) -> bool {
237+
true
238+
}
239+
240+
fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
241+
self.values.clear();
242+
self.null_state = NullStateAdapter::new(block_size);
243+
self.block_size = block_size;
244+
245+
Ok(())
246+
}
247+
}
248+
249+
impl<N: ArrowNativeTypeOp> Block for Vec<N> {
250+
type T = N;
251+
252+
fn len(&self) -> usize {
253+
self.len()
254+
}
255+
256+
fn fill_default_value(&mut self, fill_len: usize, default_value: Self::T) {
257+
self.extend(iter::repeat(default_value.clone()).take(fill_len));
258+
}
201259
}

0 commit comments

Comments
 (0)