Skip to content

Commit ba2bf46

Browse files
zhuqi-lucas
authored and
Nirnay Roy
committed
Add utf8view benchmark for aggregate topk (apache#15518)
* Fix

* fix fmt
1 parent 3acb73b commit ba2bf46

File tree

3 files changed

+97
-15
lines changed

3 files changed

+97
-15
lines changed

datafusion/core/benches/data_utils/mod.rs

+51-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
2020
use arrow::array::{
2121
builder::{Int64Builder, StringBuilder},
22-
Float32Array, Float64Array, RecordBatch, StringArray, UInt64Array,
22+
ArrayRef, Float32Array, Float64Array, RecordBatch, StringArray, StringViewBuilder,
23+
UInt64Array,
2324
};
2425
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2526
use datafusion::datasource::MemTable;
@@ -158,13 +159,39 @@ pub fn create_record_batches(
158159
.collect::<Vec<_>>()
159160
}
160161

162+
/// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder
163+
/// so that both can be used interchangeably.
164+
enum TraceIdBuilder {
165+
Utf8(StringBuilder),
166+
Utf8View(StringViewBuilder),
167+
}
168+
169+
impl TraceIdBuilder {
170+
/// Append a value to the builder.
171+
fn append_value(&mut self, value: &str) {
172+
match self {
173+
TraceIdBuilder::Utf8(builder) => builder.append_value(value),
174+
TraceIdBuilder::Utf8View(builder) => builder.append_value(value),
175+
}
176+
}
177+
178+
/// Finish building and return the ArrayRef.
179+
fn finish(self) -> ArrayRef {
180+
match self {
181+
TraceIdBuilder::Utf8(mut builder) => Arc::new(builder.finish()),
182+
TraceIdBuilder::Utf8View(mut builder) => Arc::new(builder.finish()),
183+
}
184+
}
185+
}
186+
161187
/// Create time series data with `partition_cnt` partitions and `sample_cnt` rows per partition
162188
/// in ascending order, if `asc` is true, otherwise randomly sampled using a Pareto distribution
163189
#[allow(dead_code)]
164190
pub(crate) fn make_data(
165191
partition_cnt: i32,
166192
sample_cnt: i32,
167193
asc: bool,
194+
use_view: bool,
168195
) -> Result<(Arc<Schema>, Vec<Vec<RecordBatch>>), DataFusionError> {
169196
// constants observed from trace data
170197
let simultaneous_group_cnt = 2000;
@@ -177,11 +204,17 @@ pub(crate) fn make_data(
177204
let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);
178205

179206
// populate data
180-
let schema = test_schema();
207+
let schema = test_schema(use_view);
181208
let mut partitions = vec![];
182209
let mut cur_time = 16909000000000i64;
183210
for _ in 0..partition_cnt {
184-
let mut id_builder = StringBuilder::new();
211+
// Choose the appropriate builder based on use_view.
212+
let mut id_builder = if use_view {
213+
TraceIdBuilder::Utf8View(StringViewBuilder::new())
214+
} else {
215+
TraceIdBuilder::Utf8(StringBuilder::new())
216+
};
217+
185218
let mut ts_builder = Int64Builder::new();
186219
let gen_id = |rng: &mut rand::rngs::SmallRng| {
187220
rng.gen::<[u8; 16]>()
@@ -230,10 +263,19 @@ pub(crate) fn make_data(
230263
Ok((schema, partitions))
231264
}
232265

233-
/// The Schema used by make_data
234-
fn test_schema() -> SchemaRef {
235-
Arc::new(Schema::new(vec![
236-
Field::new("trace_id", DataType::Utf8, false),
237-
Field::new("timestamp_ms", DataType::Int64, false),
238-
]))
266+
/// Returns a Schema based on the use_view flag
267+
fn test_schema(use_view: bool) -> SchemaRef {
268+
if use_view {
269+
// Return Utf8View schema
270+
Arc::new(Schema::new(vec![
271+
Field::new("trace_id", DataType::Utf8View, false),
272+
Field::new("timestamp_ms", DataType::Int64, false),
273+
]))
274+
} else {
275+
// Return regular Utf8 schema
276+
Arc::new(Schema::new(vec![
277+
Field::new("trace_id", DataType::Utf8, false),
278+
Field::new("timestamp_ms", DataType::Int64, false),
279+
]))
280+
}
239281
}

datafusion/core/benches/distinct_query_sql.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ pub async fn create_context_sampled_data(
133133
partition_cnt: i32,
134134
sample_cnt: i32,
135135
) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
136-
let (schema, parts) = make_data(partition_cnt, sample_cnt, false /* asc */).unwrap();
136+
let (schema, parts) =
137+
make_data(partition_cnt, sample_cnt, false /* asc */, false).unwrap();
137138
let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap());
138139

139140
// Create the DataFrame

datafusion/core/benches/topk_aggregate.rs

+44-5
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ async fn create_context(
3333
sample_cnt: i32,
3434
asc: bool,
3535
use_topk: bool,
36+
use_view: bool,
3637
) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
37-
let (schema, parts) = make_data(partition_cnt, sample_cnt, asc).unwrap();
38+
let (schema, parts) = make_data(partition_cnt, sample_cnt, asc, use_view).unwrap();
3839
let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap());
3940

4041
// Create the DataFrame
@@ -108,7 +109,7 @@ fn criterion_benchmark(c: &mut Criterion) {
108109
|b| {
109110
b.iter(|| {
110111
let real = rt.block_on(async {
111-
create_context(limit, partitions, samples, false, false)
112+
create_context(limit, partitions, samples, false, false, false)
112113
.await
113114
.unwrap()
114115
});
@@ -122,7 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) {
122123
|b| {
123124
b.iter(|| {
124125
let asc = rt.block_on(async {
125-
create_context(limit, partitions, samples, true, false)
126+
create_context(limit, partitions, samples, true, false, false)
126127
.await
127128
.unwrap()
128129
});
@@ -140,7 +141,7 @@ fn criterion_benchmark(c: &mut Criterion) {
140141
|b| {
141142
b.iter(|| {
142143
let topk_real = rt.block_on(async {
143-
create_context(limit, partitions, samples, false, true)
144+
create_context(limit, partitions, samples, false, true, false)
144145
.await
145146
.unwrap()
146147
});
@@ -158,7 +159,45 @@ fn criterion_benchmark(c: &mut Criterion) {
158159
|b| {
159160
b.iter(|| {
160161
let topk_asc = rt.block_on(async {
161-
create_context(limit, partitions, samples, true, true)
162+
create_context(limit, partitions, samples, true, true, false)
163+
.await
164+
.unwrap()
165+
});
166+
run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
167+
})
168+
},
169+
);
170+
171+
// Utf8View schema,time-series rows
172+
c.bench_function(
173+
format!(
174+
"top k={limit} aggregate {} time-series rows [Utf8View]",
175+
partitions * samples
176+
)
177+
.as_str(),
178+
|b| {
179+
b.iter(|| {
180+
let topk_real = rt.block_on(async {
181+
create_context(limit, partitions, samples, false, true, true)
182+
.await
183+
.unwrap()
184+
});
185+
run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
186+
})
187+
},
188+
);
189+
190+
// Utf8View schema,worst-case rows
191+
c.bench_function(
192+
format!(
193+
"top k={limit} aggregate {} worst-case rows [Utf8View]",
194+
partitions * samples
195+
)
196+
.as_str(),
197+
|b| {
198+
b.iter(|| {
199+
let topk_asc = rt.block_on(async {
200+
create_context(limit, partitions, samples, true, true, true)
162201
.await
163202
.unwrap()
164203
});

0 commit comments

Comments
 (0)