refactor: Generate GroupByHash output in multiple RecordBatches #9818


Closed · wants to merge 1 commit
56 changes: 42 additions & 14 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -60,7 +60,7 @@ pub(crate) enum ExecutionState {
ReadingInput,
/// When producing output, the remaining rows to output are stored
/// here and are sliced off as needed in batch_size chunks
Comment on lines 61 to 62 (Contributor):

Might want to update this comment.

ProducingOutput(RecordBatch),
ProducingOutput(Vec<RecordBatch>),
Done,
}

@@ -454,7 +454,9 @@ impl Stream for GroupedHashAggregateStream {

if let Some(to_emit) = self.group_ordering.emit_to() {
let batch = extract_ok!(self.emit(to_emit, false));
self.exec_state = ExecutionState::ProducingOutput(batch);
let batches = self.split_batch(batch)?;
self.exec_state =
ExecutionState::ProducingOutput(batches);
timer.done();
// make sure the exec_state just set is not overwritten below
break 'reading_input;
@@ -475,25 +477,24 @@
}
}

ExecutionState::ProducingOutput(batch) => {
// slice off a part of the batch, if needed
ExecutionState::ProducingOutput(batches) => {
// batches is not empty
let len = batches.len();
assert!(len > 0);
let output_batch;
let size = self.batch_size;
(self.exec_state, output_batch) = if batch.num_rows() <= size {
(self.exec_state, output_batch) = if len == 1 {
(
if self.input_done {
ExecutionState::Done
} else {
ExecutionState::ReadingInput
},
batch.clone(),
batches[0].clone(),
)
} else {
// output first batch_size rows
let size = self.batch_size;
let num_remaining = batch.num_rows() - size;
let remaining = batch.slice(size, num_remaining);
let output = batch.slice(0, size);
// output first batches element
let remaining = batches[1..].to_vec();
Contributor:

If we push the batches in reverse order (treating `batches` as a stack) and pop to get each batch, I think we could avoid calling `to_vec()` multiple times (see the next comment).
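The stack idea in the comment above can be sketched with a toy stand-in — here `Vec<i32>` plays the role of arrow's `RecordBatch`, and `as_stack`/`next_output` are hypothetical names, not the DataFusion API:

```rust
// Producer stores the split batches in reverse order once...
fn as_stack(batches: Vec<Vec<i32>>) -> Vec<Vec<i32>> {
    batches.into_iter().rev().collect()
}

// ...so each poll pops the next output batch in O(1), instead of
// re-cloning the tail with `batches[1..].to_vec()` on every poll.
fn next_output(stack: &mut Vec<Vec<i32>>) -> Option<Vec<i32>> {
    stack.pop()
}

fn main() {
    let mut stack = as_stack(vec![vec![1, 2], vec![3, 4], vec![5]]);
    assert_eq!(next_output(&mut stack), Some(vec![1, 2]));
    assert_eq!(next_output(&mut stack), Some(vec![3, 4]));
    assert_eq!(next_output(&mut stack), Some(vec![5]));
    assert_eq!(next_output(&mut stack), None);
}
```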

let output = batches[0].clone();
(ExecutionState::ProducingOutput(remaining), output)
};
return Poll::Ready(Some(Ok(
@@ -728,7 +729,8 @@ impl GroupedHashAggregateStream {
{
let n = self.group_values.len() / self.batch_size * self.batch_size;
let batch = self.emit(EmitTo::First(n), false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
let batches = self.split_batch(batch)?;
self.exec_state = ExecutionState::ProducingOutput(batches);
}
Ok(())
}
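The hunk above emits only a whole multiple of `batch_size` rows, leaving the remainder in the hash table; a minimal sketch of that rounding (`rows_to_emit` is a hypothetical helper, not code from the PR):

```rust
// Round the group count down to a full multiple of batch_size, so the
// emitted rows always split into complete batch_size chunks.
fn rows_to_emit(num_groups: usize, batch_size: usize) -> usize {
    num_groups / batch_size * batch_size
}

fn main() {
    assert_eq!(rows_to_emit(1000, 300), 900);
    // Fewer groups than one batch: nothing is emitted early.
    assert_eq!(rows_to_emit(299, 300), 0);
}
```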
@@ -787,7 +789,8 @@
let timer = elapsed_compute.timer();
self.exec_state = if self.spill_state.spills.is_empty() {
let batch = self.emit(EmitTo::All, false)?;
ExecutionState::ProducingOutput(batch)
let batches = self.split_batch(batch)?;
Contributor:

I think the key point of the request is to avoid the call to emit(EmitTo::All), or perhaps to change that call to return a Vec.

Taking a large single record batch and slicing it up doesn't change how the underlying memory is allocated or laid out (i.e. the same large contiguous batch is used).
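The comment above can be illustrated with a toy model of zero-copy slicing — `Batch` here is a hypothetical stand-in for arrow's `RecordBatch`, not the real API:

```rust
use std::rc::Rc;

// A "batch" is just an offset/length view over a shared buffer.
struct Batch {
    buffer: Rc<Vec<i32>>, // shared backing storage
    offset: usize,
    len: usize,
}

impl Batch {
    // Like RecordBatch::slice: adjust offset/len, share the buffer.
    fn slice(&self, offset: usize, len: usize) -> Batch {
        assert!(offset + len <= self.len);
        Batch {
            buffer: Rc::clone(&self.buffer),
            offset: self.offset + offset,
            len,
        }
    }
}

fn main() {
    let big = Batch { buffer: Rc::new((0..10).collect()), offset: 0, len: 10 };
    let parts: Vec<Batch> = (0..10)
        .step_by(4)
        .map(|o| big.slice(o, usize::min(4, 10 - o)))
        .collect();
    // Every slice still points at the same contiguous allocation, so
    // splitting after emit(EmitTo::All) does not reduce peak memory.
    assert!(parts.iter().all(|p| Rc::ptr_eq(&p.buffer, &big.buffer)));
    assert_eq!(parts.iter().map(|p| p.len).sum::<usize>(), 10);
}
```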

ExecutionState::ProducingOutput(batches)
} else {
// If spill files exist, stream-merge them.
self.update_merged_stream()?;
@@ -796,4 +799,29 @@
timer.done();
Ok(())
}

fn split_batch(&self, batch: RecordBatch) -> Result<Vec<RecordBatch>> {
let mut batches = vec![];
let mut length = self.batch_size;
let len = batch.num_rows();
if len == 0 {
return Ok(vec![batch]);
}
for offset in (0..len).step_by(length) {
if offset + length > len {
length = len - offset;
}
let slice_columns = batch
.columns()
.iter()
.map(|array| {
let sliced_array = array.slice(offset, length);
sliced_array.to_owned()
})
.collect();
batches.push(RecordBatch::try_new(batch.schema().clone(), slice_columns)?);
}
Comment on lines +810 to +823 (Contributor):

I feel this can be simplified by first calculating how many slices you'll need (batch.num_rows() / self.batch_size, or something like that), which would let you preallocate the Vec in advance, and then pushing the RecordBatches in reverse order, which allows popping in constant time (see the previous comment).


Ok(batches)
}
}
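The simplification suggested in the last review comment — precompute the slice count, preallocate, and push back-to-front so the consumer can pop — can be sketched as follows; `split_offsets` is a hypothetical helper operating on row counts rather than real batches:

```rust
// Compute (offset, length) pairs for slicing num_rows into batch_size
// chunks, stored in reverse order so callers can pop() in O(1).
fn split_offsets(num_rows: usize, batch_size: usize) -> Vec<(usize, usize)> {
    let n_slices = (num_rows + batch_size - 1) / batch_size; // ceiling division
    let mut slices = Vec::with_capacity(n_slices); // preallocated up front
    for i in (0..n_slices).rev() {
        let offset = i * batch_size;
        let length = usize::min(batch_size, num_rows - offset);
        slices.push((offset, length)); // last slice first
    }
    slices
}

fn main() {
    // 10 rows in chunks of 4: slices of 4, 4, and 2, stored back-to-front.
    assert_eq!(split_offsets(10, 4), vec![(8, 2), (4, 4), (0, 4)]);
}
```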