refactor: Generate GroupByHash output in multiple RecordBatches #9818
Conversation
Signed-off-by: guojidan <[email protected]>
I don't know if this is the right way to do it; I need some recommendations 😄

Thanks @guojidan -- I will try and check it out over the next day or two. Unfortunately I am backed up on reviews at the moment.
/// When producing output, the remaining rows to output are stored
/// here and are sliced off as needed in batch_size chunks
Might wanna update this comment
let remaining = batch.slice(size, num_remaining);
let output = batch.slice(0, size);
// output first batches element
let remaining = batches[1..].to_vec();
If we push the batches in reverse order (treating `batches` as a stack) and pop to get each batch, I think we could avoid calling `to_vec()` multiple times (see next comment).
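A stdlib-only sketch of what this reviewer is suggesting. `String` stands in for arrow's `RecordBatch`, and the names here are illustrative, not DataFusion's actual API; the point is only that popping from the end of a `Vec` is O(1), while `batches[1..].to_vec()` re-clones the remaining tail on every poll:

```rust
fn main() {
    // Suppose split_batch produced these output chunks, in emission order.
    let chunks = vec![
        "batch0".to_string(),
        "batch1".to_string(),
        "batch2".to_string(),
    ];

    // Push in reverse so the first chunk to emit sits on top of the stack.
    let mut stack: Vec<String> = chunks.into_iter().rev().collect();

    // Each poll pops the next batch in constant time; no re-allocation of
    // the remaining elements, unlike `batches[1..].to_vec()`.
    let mut emitted = Vec::new();
    while let Some(batch) = stack.pop() {
        emitted.push(batch);
    }
    assert_eq!(emitted, ["batch0", "batch1", "batch2"]);
    println!("emitted {} batches", emitted.len());
}
```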
for offset in (0..len).step_by(length) {
    if offset + length > len {
        length = len - offset;
    }
    let slice_columns = batch
        .columns()
        .iter()
        .map(|array| {
            let sliced_array = array.slice(offset, length);
            sliced_array.to_owned()
        })
        .collect();
    batches.push(RecordBatch::try_new(batch.schema().clone(), slice_columns)?);
}
I feel this can be simplified by first calculating how many slices you'll need (`batch.num_rows() / self.batch_size` or something like that), which would allow you to preallocate the `Vec` in advance; then you could push the `RecordBatch`es in reverse order, which allows popping in constant time (see previous comment).
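A stdlib-only sketch of the preallocation idea, under the same caveats as above: a `Vec<i64>` column stands in for a `RecordBatch`, and `split_in_reverse` is a hypothetical helper, not DataFusion's code. Note that arrow's `slice` is zero-copy; the `to_vec()` here is only so the example compiles without the arrow crate:

```rust
// Compute the number of chunks up front (ceiling division), reserve the
// Vec once, and push the chunks in reverse order so the consumer can pop
// them in emission order.
fn split_in_reverse(rows: &[i64], batch_size: usize) -> Vec<Vec<i64>> {
    let n_chunks = (rows.len() + batch_size - 1) / batch_size;
    let mut out = Vec::with_capacity(n_chunks);
    // Walk the chunks backwards so out.pop() yields them front to back.
    for chunk in rows.chunks(batch_size).rev() {
        out.push(chunk.to_vec());
    }
    out
}

fn main() {
    let rows: Vec<i64> = (0..10).collect();
    let mut stack = split_in_reverse(&rows, 4);
    // Popping yields the chunks in their original order.
    assert_eq!(stack.pop().unwrap(), vec![0, 1, 2, 3]);
    assert_eq!(stack.pop().unwrap(), vec![4, 5, 6, 7]);
    assert_eq!(stack.pop().unwrap(), vec![8, 9]); // last, short chunk
    assert!(stack.pop().is_none());
}
```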
@@ -787,7 +789,8 @@ impl GroupedHashAggregateStream {
    let timer = elapsed_compute.timer();
    self.exec_state = if self.spill_state.spills.is_empty() {
        let batch = self.emit(EmitTo::All, false)?;
        ExecutionState::ProducingOutput(batch)
        let batches = self.split_batch(batch)?;
I think the key point of the request is to avoid the call to emit(EmitTo::All),
or perhaps change that call to return a Vec
Taking a large single record batch and slicing it up doesn't change how the underlying memory is allocated / laid out (aka the same large contiguous batch is used)
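To illustrate the reviewer's point with a stdlib-only sketch: an arrow-style zero-copy slice is essentially (shared buffer, offset, length), so every slice of a big batch keeps the entire original allocation alive. `SliceView` below is a made-up stand-in type, not arrow's API:

```rust
use std::sync::Arc;

// A toy zero-copy slice: it shares the whole original buffer and only
// records which window of it is visible.
struct SliceView {
    buffer: Arc<Vec<i64>>, // the entire original allocation
    offset: usize,
    len: usize,
}

impl SliceView {
    fn slice(buffer: &Arc<Vec<i64>>, offset: usize, len: usize) -> Self {
        SliceView { buffer: Arc::clone(buffer), offset, len }
    }
    fn values(&self) -> &[i64] {
        &self.buffer[self.offset..self.offset + self.len]
    }
}

fn main() {
    let big = Arc::new((0..1_000_000i64).collect::<Vec<_>>());
    let head = SliceView::slice(&big, 0, 4);
    drop(big);
    // Even though we only look at 4 values, the slice still pins the full
    // 1M-element allocation: the memory layout is unchanged by slicing.
    assert_eq!(head.values(), &[0, 1, 2, 3]);
    assert_eq!(head.buffer.len(), 1_000_000);
}
```

This is why slicing after `emit(EmitTo::All)` does not reduce peak memory: the smaller batches all reference the same large contiguous buffers.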
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment, or this will be closed in 7 days.
Which issue does this PR close?
Closes #9562.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?