Skip to content

Commit d41f99e

Browse files
committed
new poc1.
1 parent aaeea80 commit d41f99e

File tree

1 file changed

+57
-2
lines changed
  • datafusion/functions-aggregate-common/src/aggregate/groups_accumulator

1 file changed

+57
-2
lines changed

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

+57-2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ pub struct NullState<O: GroupIndexOperations> {
6969
/// pass the filter yet for group `i`
7070
seen_values: Blocks<BooleanBufferBuilderWrapper>,
7171

72+
block_indices_buffer: Vec<(u32, u32)>,
73+
74+
row_selection: Vec<bool>,
75+
7276
/// phantom data for required type `<O>`
7377
_phantom: PhantomData<O>,
7478
}
@@ -120,6 +124,53 @@ impl<O: GroupIndexOperations> NullState<O> {
120124
});
121125
}
122126

127+
pub fn accumulate_blocks<T, F>(
128+
&mut self,
129+
group_indices: &[usize],
130+
values: &PrimitiveArray<T>,
131+
opt_filter: Option<&BooleanArray>,
132+
total_num_groups: usize,
133+
mut value_fn: F,
134+
) where
135+
T: ArrowPrimitiveType + Send,
136+
F: FnMut(u32, u64, T::Native) + Send,
137+
{
138+
// ensure the seen_values is big enough (start everything at
139+
// "not seen" valid)
140+
self.seen_values.resize(total_num_groups, false);
141+
let seen_values = &mut self.seen_values;
142+
143+
// Parse indices
144+
self.block_indices_buffer.clear();
145+
for packed_index in group_indices.iter() {
146+
let packed_index = *packed_index as u64;
147+
let block_id = (packed_index >> 32) as u32;
148+
let block_offset = packed_index as u32;
149+
self.block_indices_buffer.push((block_id, block_offset));
150+
}
151+
152+
// Get row selection
153+
accumulate_row_selection(values, opt_filter, &mut self.row_selection);
154+
155+
// Accumulate values
156+
let data = values.values().iter();
157+
if self.row_selection.is_empty() {
158+
let iter = self.block_indices_buffer.iter().zip(data);
159+
for (&(block_id, block_offset), &value) in iter {
160+
seen_values.set_bit(block_id, block_offset as u64, true);
161+
value_fn(block_id, block_offset as u64, value);
162+
}
163+
} else {
164+
let iter = self.block_indices_buffer.iter().zip(data);
165+
for (row_idx, (&(block_id, block_offset), &value)) in iter.enumerate() {
166+
if self.row_selection[row_idx] {
167+
seen_values.set_bit(block_id, block_offset as u64, true);
168+
value_fn(block_id, block_offset as u64, value);
169+
}
170+
}
171+
}
172+
}
173+
123174
/// Invokes `value_fn(group_index, value)` for each non null, non
124175
/// filtered value in `values`, while tracking which groups have
125176
/// seen null inputs and which groups have seen any inputs, for
@@ -266,7 +317,7 @@ impl NullStateAdapter {
266317
total_num_groups,
267318
value_fn,
268319
),
269-
NullStateAdapter::Blocked(null_state) => null_state.accumulate(
320+
NullStateAdapter::Blocked(null_state) => null_state.accumulate_blocks(
270321
group_indices,
271322
values,
272323
opt_filter,
@@ -395,6 +446,8 @@ impl Default for FlatNullState {
395446
fn default() -> Self {
396447
Self {
397448
seen_values: Blocks::new(None),
449+
block_indices_buffer: Vec::new(),
450+
row_selection: Vec::new(),
398451
_phantom: PhantomData {},
399452
}
400453
}
@@ -427,6 +480,8 @@ impl BlockedNullState {
427480
pub fn new(block_size: usize) -> Self {
428481
Self {
429482
seen_values: Blocks::new(Some(block_size)),
483+
block_indices_buffer: Vec::new(),
484+
row_selection: Vec::new(),
430485
_phantom: PhantomData {},
431486
}
432487
}
@@ -648,7 +703,7 @@ pub fn accumulate<T, F>(
648703
}
649704
}
650705

651-
pub fn accumulate_row_selection<T, F>(
706+
pub fn accumulate_row_selection<T>(
652707
values: &PrimitiveArray<T>,
653708
opt_filter: Option<&BooleanArray>,
654709
results: &mut Vec<bool>,

0 commit comments

Comments
 (0)