Skip to content

Commit 34f5fa2

Browse files
committed
extract the common codes of block for reusing.
1 parent f60f9e2 commit 34f5fa2

File tree

2 files changed

+107
-58
lines changed

2 files changed

+107
-58
lines changed

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs

+84
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod bool_op;
2323
pub mod nulls;
2424
pub mod prim_op;
2525

26+
use std::collections::VecDeque;
2627
use std::fmt::Debug;
2728
use std::mem::{size_of, size_of_val};
2829

@@ -509,6 +510,10 @@ pub(crate) fn slice_and_maybe_filter(
509510
}
510511
}
511512

513+
// ===============================================
514+
// Useful tools for group index
515+
// ===============================================
516+
512517
/// Blocked style group index used in blocked mode group values and accumulators
513518
/// - High 32 bits represent `block_id`
514519
/// - Low 32 bits represent `block_offset`
@@ -554,3 +559,82 @@ impl GroupIndexOperations for FlatGroupIndexOperations {
554559
packed_index
555560
}
556561
}
562+
563+
// ===============================================
564+
// Useful tools for block
565+
// ===============================================
566+
pub(crate) fn ensure_room_enough_for_blocks<B, F>(
567+
blocks: &mut VecDeque<B>,
568+
total_num_groups: usize,
569+
block_size: usize,
570+
new_block: F,
571+
default_value: B::T,
572+
) where
573+
B: Block,
574+
F: Fn(usize) -> B,
575+
{
576+
// For resize, we need to:
577+
// 1. Ensure the blks are enough first
578+
// 2. and then ensure slots in blks are enough
579+
let (mut cur_blk_idx, exist_slots) = if blocks.len() > 0 {
580+
let cur_blk_idx = blocks.len() - 1;
581+
let exist_slots = (blocks.len() - 1) * block_size + blocks.back().unwrap().len();
582+
583+
(cur_blk_idx, exist_slots)
584+
} else {
585+
(0, 0)
586+
};
587+
588+
// No new groups, don't need to expand, just return
589+
if exist_slots >= total_num_groups {
590+
return;
591+
}
592+
593+
// 1. Ensure blks are enough
594+
let exist_blks = blocks.len();
595+
let new_blks = ((total_num_groups + block_size - 1) / block_size) - exist_blks;
596+
if new_blks > 0 {
597+
for _ in 0..new_blks {
598+
let block = new_block(block_size);
599+
blocks.push_back(block);
600+
}
601+
}
602+
603+
// 2. Ensure slots are enough
604+
let mut new_slots = total_num_groups - exist_slots;
605+
606+
// 2.1 Only fill current blk if it may be already enough
607+
let cur_blk_rest_slots = block_size - blocks[cur_blk_idx].len();
608+
if cur_blk_rest_slots >= new_slots {
609+
blocks[cur_blk_idx].fill_default_value(new_slots, default_value.clone());
610+
return;
611+
}
612+
613+
// 2.2 Fill current blk to full
614+
blocks[cur_blk_idx].fill_default_value(cur_blk_rest_slots, default_value.clone());
615+
new_slots -= cur_blk_rest_slots;
616+
617+
// 2.3 Fill complete blks
618+
let complete_blks = new_slots / block_size;
619+
for _ in 0..complete_blks {
620+
cur_blk_idx += 1;
621+
blocks[cur_blk_idx].fill_default_value(block_size, default_value.clone());
622+
}
623+
624+
// 2.4 Fill last blk if needed
625+
let rest_slots = new_slots % block_size;
626+
if rest_slots > 0 {
627+
blocks
628+
.back_mut()
629+
.unwrap()
630+
.fill_default_value(rest_slots, default_value);
631+
}
632+
}
633+
634+
pub(crate) trait Block {
635+
type T: Clone;
636+
637+
fn len(&self) -> usize;
638+
639+
fn fill_default_value(&mut self, fill_len: usize, default_value: Self::T);
640+
}

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

+23-58
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ use arrow::datatypes::ArrowPrimitiveType;
3030
use datafusion_expr_common::groups_accumulator::EmitTo;
3131

3232
use crate::aggregate::groups_accumulator::{
33-
BlockedGroupIndexOperations, FlatGroupIndexOperations, GroupIndexOperations,
33+
ensure_room_enough_for_blocks, Block, BlockedGroupIndexOperations,
34+
FlatGroupIndexOperations, GroupIndexOperations,
3435
};
3536

3637
/// Track the accumulator null state per row: if any values for that
@@ -347,67 +348,31 @@ impl BlockedSeenValues {
347348
}
348349
}
349350

350-
impl SeenValues for BlockedSeenValues {
351-
fn resize(&mut self, total_num_groups: usize, default_value: bool) {
352-
let block_size = self.block_size;
353-
let blocked_builder = &mut self.blocked_builders;
354-
355-
// For resize, we need to:
356-
// 1. Ensure the blks are enough first
357-
// 2. and then ensure slots in blks are enough
358-
let (mut cur_blk_idx, exist_slots) = if blocked_builder.len() > 0 {
359-
let cur_blk_idx = blocked_builder.len() - 1;
360-
let exist_slots = (blocked_builder.len() - 1) * block_size
361-
+ blocked_builder.back().unwrap().len();
362-
363-
(cur_blk_idx, exist_slots)
364-
} else {
365-
(0, 0)
366-
};
367-
368-
// No new groups, don't need to expand, just return
369-
if exist_slots >= total_num_groups {
370-
return;
371-
}
372-
373-
// 1. Ensure blks are enough
374-
let exist_blks = blocked_builder.len();
375-
let new_blks = ((total_num_groups + block_size - 1) / block_size) - exist_blks;
376-
if new_blks > 0 {
377-
for _ in 0..new_blks {
378-
blocked_builder.push_back(BooleanBufferBuilder::new(block_size));
379-
}
380-
}
351+
impl Block for BooleanBufferBuilder {
352+
type T = bool;
381353

382-
// 2. Ensure slots are enough
383-
let mut new_slots = total_num_groups - exist_slots;
384-
385-
// 2.1 Only fill current blk if it may be already enough
386-
let cur_blk_rest_slots = block_size - blocked_builder[cur_blk_idx].len();
387-
if cur_blk_rest_slots >= new_slots {
388-
blocked_builder[cur_blk_idx].append_n(new_slots, default_value);
389-
return;
390-
}
354+
fn len(&self) -> usize {
355+
self.len()
356+
}
391357

392-
// 2.2 Fill current blk to full
393-
blocked_builder[cur_blk_idx].append_n(cur_blk_rest_slots, default_value);
394-
new_slots -= cur_blk_rest_slots;
358+
fn fill_default_value(&mut self, fill_len: usize, default_value: Self::T) {
359+
self.append_n(fill_len, default_value);
360+
}
361+
}
395362

396-
// 2.3 Fill complete blks
397-
let complete_blks = new_slots / block_size;
398-
for _ in 0..complete_blks {
399-
cur_blk_idx += 1;
400-
blocked_builder[cur_blk_idx].append_n(block_size, default_value);
401-
}
363+
impl SeenValues for BlockedSeenValues {
364+
fn resize(&mut self, total_num_groups: usize, default_value: bool) {
365+
let block_size = self.block_size;
366+
let blocked_builders = &mut self.blocked_builders;
367+
let new_block = |block_size: usize| BooleanBufferBuilder::new(block_size);
402368

403-
// 2.4 Fill last blk if needed
404-
let rest_slots = new_slots % block_size;
405-
if rest_slots > 0 {
406-
blocked_builder
407-
.back_mut()
408-
.unwrap()
409-
.append_n(rest_slots, default_value);
410-
}
369+
ensure_room_enough_for_blocks(
370+
blocked_builders,
371+
total_num_groups,
372+
block_size,
373+
new_block,
374+
default_value,
375+
);
411376
}
412377

413378
fn set_bit(&mut self, block_id: u32, block_offset: u64, value: bool) {

0 commit comments

Comments
 (0)