
Commit bc6a2dc

feat: use arrow row format for hash-group-by (#4830)
For #2723.

This has two effects:

- **wider feature support:** We now use the V2 aggregator for all group-column types; the arrow row format's type support is sufficient for that. V1 is only used when the aggregator itself doesn't support V2 (and at the moment quite a few don't). We'll improve on that front in follow-up PRs.
- **more speed:** It turns out the arrow row format is also faster (see below).

Perf results (mind the noise in the benchmarks that are not actually touched by this code change):

```text
❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue2723a-pre
...
     Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-fdbe5671f9c3019b)
aggregate_query_no_group_by 15 12
                        time:   [779.28 µs 782.77 µs 786.28 µs]
                        change: [+2.1375% +2.7672% +3.4171%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_no_group_by_min_max_f64
                        time:   [712.96 µs 715.90 µs 719.14 µs]
                        change: [+0.8379% +1.7648% +2.6345%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  3 (3.00%) low mild
  6 (6.00%) high mild
  1 (1.00%) high severe

Benchmarking aggregate_query_no_group_by_count_distinct_wide: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_wide
                        time:   [1.7297 ms 1.7399 ms 1.7503 ms]
                        change: [-34.647% -33.908% -33.165%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high mild

Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
aggregate_query_no_group_by_count_distinct_narrow
                        time:   [1.0984 ms 1.1045 ms 1.1115 ms]
                        change: [-38.581% -38.076% -37.569%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild

Benchmarking aggregate_query_group_by: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.1s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by
                        time:   [1.7810 ms 1.7925 ms 1.8057 ms]
                        change: [-25.252% -24.127% -22.737%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild
  2 (2.00%) high severe

Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter
                        time:   [1.2068 ms 1.2119 ms 1.2176 ms]
                        change: [+2.2847% +3.0857% +3.8789%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  7 (7.00%) high mild
  2 (2.00%) high severe

Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by_u64 15 12
                        time:   [1.6762 ms 1.6848 ms 1.6942 ms]
                        change: [-29.598% -28.603% -27.400%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
  6 (6.00%) high severe

Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter_u64 15 12
                        time:   [1.1969 ms 1.2008 ms 1.2049 ms]
                        change: [+1.3015% +2.1513% +3.0016%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low severe
  2 (2.00%) high mild
  3 (3.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [14.797 ms 15.112 ms 15.427 ms]
                        change: [-12.072% -8.7274% -5.3392%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

aggregate_query_approx_percentile_cont_on_u64
                        time:   [4.1278 ms 4.1687 ms 4.2098 ms]
                        change: [+0.4851% +1.9525% +3.3676%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [3.4694 ms 3.4967 ms 3.5245 ms]
                        change: [-1.5467% -0.4432% +0.6609%] (p = 0.43 > 0.05)
                        No change in performance detected.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
```
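To make the mechanism concrete, here is a minimal standalone sketch (not part of this commit) of the arrow row-format round-trip the change relies on. It assumes a recent `arrow` crate with the `arrow::row` module and uses hypothetical column values:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::DataType;
use arrow::error::Result;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<()> {
    // Hypothetical group columns: (utf8, int32); rows 0 and 2 share a key.
    let cols: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "b", "a"])),
        Arc::new(Int32Array::from(vec![1, 2, 1])),
    ];

    // One SortField per group column, mirroring how the PR builds the
    // converter from the group schema.
    let mut converter = RowConverter::new(vec![
        SortField::new(DataType::Utf8),
        SortField::new(DataType::Int32),
    ])?;

    // Each `rows.row(i)` is a byte-comparable encoding of row i across all
    // columns, so equal group keys compare equal as plain bytes.
    let rows = converter.convert_columns(&cols)?;
    assert_eq!(rows.row(0), rows.row(2));
    assert_ne!(rows.row(0), rows.row(1));

    // The encoding is reversible: rows decode back into columns, which is
    // what allows the output batch to be produced from the stored keys.
    let decoded = converter.convert_rows(rows.iter())?;
    assert_eq!(decoded.len(), 2);
    Ok(())
}
```

This round-trip is what lets the V2 path drop the `row_supported` check in the first diff below: any type the `RowConverter` can encode is a valid group key.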
1 parent 169b522 commit bc6a2dc

File tree

3 files changed: +61 -34 lines changed


datafusion/core/src/physical_plan/aggregates/mod.rs (+1 -4)
```diff
@@ -52,7 +52,6 @@ use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
 use datafusion_physical_expr::equivalence::project_equivalence_properties;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
 use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
-use datafusion_row::{row_supported, RowType};

 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -273,9 +272,7 @@ impl AggregateExec {
     }

     fn row_aggregate_supported(&self) -> bool {
-        let group_schema = group_schema(&self.schema, self.group_by.expr.len());
-        row_supported(&group_schema, RowType::Compact)
-            && accumulator_v2_supported(&self.aggr_expr)
+        accumulator_v2_supported(&self.aggr_expr)
     }

     fn execute_typed(
```

datafusion/core/src/physical_plan/aggregates/row_hash.rs (+27 -30)
```diff
@@ -22,6 +22,8 @@ use std::task::{Context, Poll};
 use std::vec;

 use ahash::RandomState;
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use datafusion_physical_expr::hash_utils::create_row_hashes_v2;
 use futures::stream::BoxStream;
 use futures::stream::{Stream, StreamExt};

@@ -32,7 +34,6 @@ use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
     PhysicalGroupBy,
 };
-use crate::physical_plan::hash_utils::create_row_hashes;
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
@@ -50,7 +51,6 @@ use datafusion_common::ScalarValue;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
 use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::writer::{write_row, RowWriter};
 use datafusion_row::{MutableRecordBatch, RowType};
 use hashbrown::raw::RawTable;

@@ -90,7 +90,7 @@ struct GroupedHashAggregateStreamV2Inner {
     group_by: PhysicalGroupBy,
     accumulators: Vec<AccumulatorItemV2>,

-    group_schema: SchemaRef,
+    row_converter: RowConverter,
     aggr_schema: SchemaRef,
     aggr_layout: Arc<RowLayout>,

@@ -136,6 +136,13 @@ impl GroupedHashAggregateStreamV2 {
         let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;

         let group_schema = group_schema(&schema, group_by.expr.len());
+        let row_converter = RowConverter::new(
+            group_schema
+                .fields()
+                .iter()
+                .map(|f| SortField::new(f.data_type().clone()))
+                .collect(),
+        )?;
         let aggr_schema = aggr_state_schema(&aggr_expr)?;

         let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
@@ -157,7 +164,7 @@ impl GroupedHashAggregateStreamV2 {
             input,
             group_by,
             accumulators,
-            group_schema,
+            row_converter,
             aggr_schema,
             aggr_layout,
             baseline_metrics,
@@ -181,7 +188,7 @@
                         &this.random_state,
                         &this.group_by,
                         &mut this.accumulators,
-                        &this.group_schema,
+                        &mut this.row_converter,
                         this.aggr_layout.clone(),
                         batch,
                         &mut this.aggr_state,
@@ -205,7 +212,7 @@
                     let timer = this.baseline_metrics.elapsed_compute().timer();
                     let result = create_batch_from_map(
                         &this.mode,
-                        &this.group_schema,
+                        &this.row_converter,
                         &this.aggr_schema,
                         this.batch_size,
                         this.row_group_skip_position,
@@ -270,7 +277,7 @@ fn group_aggregate_batch(
     random_state: &RandomState,
     grouping_set: &PhysicalGroupBy,
     accumulators: &mut [AccumulatorItemV2],
-    group_schema: &Schema,
+    row_converter: &mut RowConverter,
     state_layout: Arc<RowLayout>,
     batch: RecordBatch,
     aggr_state: &mut AggregationState,
@@ -283,9 +290,10 @@
         map, group_states, ..
     } = aggr_state;
     let mut allocated = 0usize;
+    let row_converter_size_pre = row_converter.size();

     for group_values in grouping_by_values {
-        let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
+        let group_rows = row_converter.convert_columns(&group_values)?;

         // evaluate the aggregation expressions.
         // We could evaluate them after the `take`, but since we need to evaluate all
@@ -301,15 +309,15 @@
         // 1.1 Calculate the group keys for the group values
         let mut batch_hashes = vec![0; batch.num_rows()];
-        create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+        create_row_hashes_v2(&group_rows, random_state, &mut batch_hashes)?;

         for (row, hash) in batch_hashes.into_iter().enumerate() {
             let entry = map.get_mut(hash, |(_hash, group_idx)| {
                 // verify that a group that we are inserting with hash is
                 // actually the same key value as the group in
                 // existing_idx (aka group_values @ row)
                 let group_state = &group_states[*group_idx];
-                group_rows[row] == group_state.group_by_values
+                group_rows.row(row) == group_state.group_by_values.row()
             });

             match entry {
@@ -330,7 +338,7 @@
                 None => {
                     // Add new entry to group_states and save newly created index
                     let group_state = RowGroupState {
-                        group_by_values: group_rows[row].clone(),
+                        group_by_values: group_rows.row(row).owned(),
                         aggregation_buffer: vec![0; state_layout.fixed_part_width()],
                         indices: vec![row as u32], // 1.3
                     };
@@ -339,7 +347,7 @@
                     // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
                     // `group_states` (see allocation down below)
                     allocated += (std::mem::size_of::<u8>()
-                        * group_state.group_by_values.capacity())
+                        * group_state.group_by_values.as_ref().len())
                         + (std::mem::size_of::<u8>()
                             * group_state.aggregation_buffer.capacity())
                         + (std::mem::size_of::<u32>() * group_state.indices.capacity());
@@ -438,14 +446,16 @@
         })?;
     }

+    allocated += row_converter.size().saturating_sub(row_converter_size_pre);
+
     Ok(allocated)
 }

 /// The state that is built for each output group.
 #[derive(Debug)]
 struct RowGroupState {
-    /// The actual group by values, stored sequentially
-    group_by_values: Vec<u8>,
+    // Group key.
+    group_by_values: OwnedRow,

     // Accumulator state, stored sequentially
     aggregation_buffer: Vec<u8>,
@@ -483,23 +493,11 @@ impl std::fmt::Debug for AggregationState {
     }
 }

-/// Create grouping rows
-fn create_group_rows(arrays: Vec<ArrayRef>, schema: &Schema) -> Vec<Vec<u8>> {
-    let mut writer = RowWriter::new(schema, RowType::Compact);
-    let mut results = vec![];
-    for cur_row in 0..arrays[0].len() {
-        write_row(&mut writer, cur_row, schema, &arrays);
-        results.push(writer.get_row().to_vec());
-        writer.reset()
-    }
-    results
-}
-
 /// Create a RecordBatch with all group keys and accumulator' states or values.
 #[allow(clippy::too_many_arguments)]
 fn create_batch_from_map(
     mode: &AggregateMode,
-    group_schema: &Schema,
+    converter: &RowConverter,
     aggr_schema: &Schema,
     batch_size: usize,
     skip_items: usize,
@@ -524,11 +522,10 @@
         .iter()
         .skip(skip_items)
         .take(batch_size)
-        .map(|gs| (gs.group_by_values.clone(), gs.aggregation_buffer.clone()))
+        .map(|gs| (gs.group_by_values.row(), gs.aggregation_buffer.clone()))
         .unzip();

-    let mut columns: Vec<ArrayRef> =
-        read_as_batch(&group_buffers, group_schema, RowType::Compact);
+    let mut columns: Vec<ArrayRef> = converter.convert_rows(group_buffers)?;

     match mode {
         AggregateMode::Partial => columns.extend(read_as_batch(
```
datafusion/physical-expr/src/hash_utils.rs (+33)
```diff
@@ -20,6 +20,7 @@
 use ahash::RandomState;
 use arrow::array::*;
 use arrow::datatypes::*;
+use arrow::row::Rows;
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use arrow_buffer::i256;
 use datafusion_common::{
@@ -249,6 +250,38 @@ pub fn create_hashes<'a>(
     Ok(hashes_buffer)
 }

+/// Test version of `create_row_hashes_v2` that produces the same value for
+/// all hashes (to test collisions)
+///
+/// See comments on `hashes_buffer` for more details
+#[cfg(feature = "force_hash_collisions")]
+pub fn create_row_hashes_v2<'a>(
+    _rows: &Rows,
+    _random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
+    Ok(hashes_buffer)
+}
+
+/// Creates hash values for every row, based on their raw bytes.
+#[cfg(not(feature = "force_hash_collisions"))]
+pub fn create_row_hashes_v2<'a>(
+    rows: &Rows,
+    random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
+    for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+        *hash = random_state.hash_one(rows.row(i));
+    }
+    Ok(hashes_buffer)
+}
+
 #[cfg(test)]
 mod tests {
     use crate::from_slice::FromSlice;
```