Skip to content

Commit f60f9e2

Browse files
committed
make EmitTo::NextBlock simpler.
1 parent 808e8d7 commit f60f9e2

File tree

11 files changed

+42
-79
lines changed

11 files changed

+42
-79
lines changed

datafusion/expr-common/src/groups_accumulator.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub enum EmitTo {
3636
/// Emit next block in the blocked managed groups
3737
///
3838
/// Similar to `EmitTo::All`, this will also clear all existing group indexes
39-
NextBlock(bool),
39+
NextBlock,
4040
}
4141

4242
impl EmitTo {
@@ -75,7 +75,7 @@ impl EmitTo {
7575
std::mem::swap(v, &mut t);
7676
t
7777
}
78-
Self::NextBlock(_) => unreachable!("don't support take block in take_needed"),
78+
Self::NextBlock => unreachable!("don't support take block in take_needed"),
7979
}
8080
}
8181

@@ -85,7 +85,7 @@ impl EmitTo {
8585
///
8686
fn take_needed_block<T>(&self, blocks: &mut VecDeque<Vec<T>>) -> Vec<T> {
8787
assert!(
88-
matches!(self, Self::NextBlock(_)),
88+
matches!(self, Self::NextBlock),
8989
"only support take block in take_needed_block"
9090
);
9191
blocks

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ impl SeenValues for FlatSeenValues {
303303
}
304304
first_n_null
305305
}
306-
EmitTo::NextBlock(_) => {
306+
EmitTo::NextBlock => {
307307
unreachable!("not support block emit in flat seen values")
308308
}
309309
};
@@ -415,7 +415,7 @@ impl SeenValues for BlockedSeenValues {
415415
}
416416

417417
fn emit(&mut self, emit_to: EmitTo) -> NullBuffer {
418-
assert!(matches!(emit_to, EmitTo::NextBlock(_)));
418+
assert!(matches!(emit_to, EmitTo::NextBlock));
419419

420420
let mut block = self
421421
.blocked_builders
@@ -542,7 +542,7 @@ impl NullStateAdapter {
542542
let mut return_builder = BooleanBufferBuilder::new(0);
543543
let num_blocks = null_state.seen_values.blocked_builders.len();
544544
for _ in 0..num_blocks {
545-
let blocked_nulls = null_state.build(EmitTo::NextBlock(true));
545+
let blocked_nulls = null_state.build(EmitTo::NextBlock);
546546
for bit in blocked_nulls.inner().iter() {
547547
return_builder.append(bit);
548548
}

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ where
118118
}
119119
first_n
120120
}
121-
EmitTo::NextBlock(_) => {
121+
EmitTo::NextBlock => {
122122
unreachable!("this accumulator still not support blocked groups")
123123
}
124124
};

datafusion/functions-aggregate/src/correlation.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
448448
let n = match emit_to {
449449
EmitTo::All => self.count.len(),
450450
EmitTo::First(n) => n,
451-
EmitTo::NextBlock(_) => {
451+
EmitTo::NextBlock => {
452452
unreachable!("this accumulator still not support blocked groups")
453453
}
454454
};
@@ -504,7 +504,7 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
504504
let n = match emit_to {
505505
EmitTo::All => self.count.len(),
506506
EmitTo::First(n) => n,
507-
EmitTo::NextBlock(_) => {
507+
EmitTo::NextBlock => {
508508
unreachable!("this accumulator still not support blocked groups")
509509
}
510510
};

datafusion/functions-aggregate/src/first_last.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ where
404404
self.size_of_orderings -=
405405
result.iter().map(ScalarValue::size_of_vec).sum::<usize>()
406406
}
407-
EmitTo::NextBlock(_) => {
407+
EmitTo::NextBlock => {
408408
unreachable!("this accumulator still not support blocked groups")
409409
}
410410
}
@@ -431,7 +431,7 @@ where
431431
}
432432
first_n
433433
}
434-
EmitTo::NextBlock(_) => {
434+
EmitTo::NextBlock => {
435435
unreachable!("this group values still not support blocked groups")
436436
}
437437
}

datafusion/functions-aggregate/src/min_max/min_max_bytes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ impl MinMaxBytesState {
511511
self.total_data_bytes -= first_data_capacity;
512512
(first_data_capacity, first_min_maxes)
513513
}
514-
EmitTo::NextBlock(_) => {
514+
EmitTo::NextBlock => {
515515
unreachable!("this accumulator still not support blocked groups")
516516
}
517517
}

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1154,7 +1154,7 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
11541154

11551155
output
11561156
}
1157-
EmitTo::NextBlock(_) => {
1157+
EmitTo::NextBlock => {
11581158
unreachable!("this group values still not support blocked groups")
11591159
}
11601160
};

datafusion/physical-plan/src/aggregates/group_values/row.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ impl GroupValues for GroupValuesRows {
229229
});
230230
output
231231
}
232-
EmitTo::NextBlock(_) => {
232+
EmitTo::NextBlock => {
233233
unreachable!("this group values still not support blocked groups")
234234
}
235235
};

datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
116116

117117
emit_group_values
118118
}
119-
EmitTo::NextBlock(_) => {
119+
EmitTo::NextBlock => {
120120
unreachable!("this group values still not support blocked groups")
121121
}
122122
};

datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl GroupValues for GroupValuesBytesView {
117117

118118
emit_group_values
119119
}
120-
EmitTo::NextBlock(_) => {
120+
EmitTo::NextBlock => {
121121
unreachable!("this group values still not support blocked groups")
122122
}
123123
};

datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs

+26-63
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
115115
impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
116116
pub fn new(data_type: DataType) -> Self {
117117
assert!(PrimitiveArray::<T>::is_compatible(&data_type));
118+
119+
// As an optimization, we ensure the `single block` always exists
120+
// in flat mode, which eliminates an expensive row-level emptiness check
118121
let mut values = VecDeque::new();
119122
values.push_back(Vec::new());
120123

@@ -245,74 +248,15 @@ where
245248
// ===============================================
246249
// Emitting in blocked mode
247250
// ===============================================
248-
// TODO: we should consider if it is necessary to support indices modifying
249-
// in `EmitTo::NextBlock`. It is only used in spilling case, maybe we can
250-
// always emit all in blocked mode. So, we just need to clear the map rather
251-
// than doing expansive modification for each buck in it.
252-
EmitTo::NextBlock(true) => {
251+
EmitTo::NextBlock => {
253252
assert!(
254253
self.block_size.is_some(),
255254
"only support EmitTo::Next in blocked group values"
256255
);
257256

258-
// We only emit the first block(`block_id == 0`),
259-
// so erase the entries with `block_id == 0`, and decrease entries with `block_id > 0`
260-
self.map.retain(|packed_idx| {
261-
let old_blk_id =
262-
BlockedGroupIndexOperations::get_block_id(*packed_idx);
263-
match old_blk_id.checked_sub(1) {
264-
// `block_id > 0`, shift `block_id` down
265-
Some(new_blk_id) => {
266-
let blk_offset =
267-
BlockedGroupIndexOperations::get_block_offset(
268-
*packed_idx,
269-
);
270-
let new_packed_idx = BlockedGroupIndexOperations::pack_index(
271-
new_blk_id as u32,
272-
blk_offset,
273-
);
274-
*packed_idx = new_packed_idx;
275-
276-
true
277-
}
278-
279-
// `block_id == 0`, so remove from table
280-
None => false,
281-
}
282-
});
283-
284-
// Similar as `non-nulls`, if `block_id > 0` we decrease, and if `block_id == 0` we erase
285-
let null_block_pair_opt = self.null_group.map(|packed_idx| {
286-
(
287-
BlockedGroupIndexOperations::get_block_id(packed_idx),
288-
BlockedGroupIndexOperations::get_block_offset(packed_idx),
289-
)
290-
});
291-
let null_idx = match null_block_pair_opt {
292-
Some((blk_id, blk_offset)) if blk_id > 0 => {
293-
let new_blk_id = blk_id - 1;
294-
let new_packed_idx = BlockedGroupIndexOperations::pack_index(
295-
new_blk_id, blk_offset,
296-
);
297-
self.null_group = Some(new_packed_idx);
298-
None
299-
}
300-
Some((_, blk_offset)) => {
301-
self.null_group = None;
302-
Some(blk_offset as usize)
303-
}
304-
None => None,
305-
};
306-
307-
let emit_blk = self.values.pop_front().unwrap();
308-
build_primitive(emit_blk, null_idx)
309-
}
310-
311-
EmitTo::NextBlock(false) => {
312-
assert!(
313-
self.block_size.is_some(),
314-
"only support EmitTo::Next in blocked group values"
315-
);
257+
// Similar to `EmitTo::All`, we clear the old index info in both
258+
// `map` and `null_group`
259+
self.map.clear();
316260

317261
let null_block_pair_opt = self.null_group.map(|packed_idx| {
318262
(
@@ -359,6 +303,25 @@ where
359303
self.map.clear();
360304
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
361305
}
306+
307+
fn supports_blocked_groups(&self) -> bool {
308+
true
309+
}
310+
311+
fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
312+
self.map.clear();
313+
self.values.clear();
314+
self.null_group = None;
315+
self.block_size = block_size;
316+
317+
// As mentioned above, we ensure the `single block` always exists
318+
// in `flat mode`
319+
if block_size.is_none() {
320+
self.values.push_back(Vec::new());
321+
}
322+
323+
Ok(())
324+
}
362325
}
363326

364327
impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T>

0 commit comments

Comments
 (0)