Skip to content

Commit fef38a7

Browse files
committed
only read subset of rows
1 parent b9de6d0 commit fef38a7

File tree

1 file changed

+75
-39
lines changed

1 file changed

+75
-39
lines changed

parquet/benches/arrow_reader_clickbench.rs

+75-39
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
use arrow::compute::kernels::cmp::neq;
2929
use arrow::record_batch::RecordBatch;
3030
use arrow_array::StringViewArray;
31+
use arrow_schema::{DataType, Fields, Schema};
3132
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
3233
use futures::StreamExt;
33-
use parquet::arrow::arrow_reader::{ArrowPredicate, ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter};
34+
use parquet::arrow::arrow_reader::{
35+
ArrowPredicate, ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter,
36+
};
3437
use parquet::arrow::{ArrowSchemaConverter, ParquetRecordBatchStreamBuilder, ProjectionMask};
3538
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
3639
use parquet::schema::types::SchemaDescriptor;
@@ -154,6 +157,25 @@ enum QueryPattern {
154157
}
155158

156159
impl QueryPattern {
160+
/// Returns the number of (passing) rows to read from the dataset
161+
///
162+
/// There are 100M rows in the ClickBench dataset, and it is not necessary
163+
/// to read and filter all `100M` rows to get a representative set in this
164+
/// benchmark.
165+
pub(crate) fn row_count(&self) -> usize {
166+
match self {
167+
QueryPattern::SearchPhrase1 => 100_000,
168+
QueryPattern::SearchPhrase2 => 100_000,
169+
QueryPattern::SearchPhrase3 => 100_000,
170+
QueryPattern::SearchPhrase4 => 100_000,
171+
QueryPattern::SearchPhrase5 => 100_000,
172+
173+
QueryPattern::AdvEngine => 100_000,
174+
QueryPattern::MobilePhoneModel1 => 100_000,
175+
QueryPattern::MobilePhoneModel2 => 100_000,
176+
}
177+
}
178+
157179
pub(crate) fn row_filter(&self, schema: &SchemaDescriptor) -> RowFilter {
158180
let filter_columns = match self {
159181
QueryPattern::SearchPhrase1 => {
@@ -177,24 +199,11 @@ impl QueryPattern {
177199
move |record_batch: RecordBatch| {
178200
let search_phrase = record_batch.column(0);
179201

180-
// TODO make it utf8view
181-
println!("Search phrase type: {:?}", search_phrase.data_type());
182-
183202
let search_phrase = search_phrase
184203
.as_any()
185204
.downcast_ref::<StringViewArray>()
186205
.unwrap();
187-
let res = neq(search_phrase, &empty_string);
188-
189-
match &res {
190-
Ok(res) => {
191-
println!("Predicate result is: {:?}", res);
192-
}
193-
Err(e) => {
194-
println!("Predicate error is: {:?}", e);
195-
}
196-
}
197-
res
206+
neq(search_phrase, &empty_string)
198207
},
199208
)));
200209
}
@@ -314,10 +323,10 @@ fn benchmark_filters_and_projections(c: &mut Criterion) {
314323
let hits_path = &files.hits_path;
315324
let reader_metadata = load_metadata(hits_path);
316325
let schema_descr = reader_metadata.metadata().file_metadata().schema_descr();
317-
326+
318327
// Determine the projection mask for this query pattern
319328
let projection_mask = pattern.projection_mask(schema_descr);
320-
329+
321330
group.bench_function(bench_id, |b| {
322331
b.iter(|| {
323332
rt_captured.block_on(async {
@@ -327,18 +336,28 @@ fn benchmark_filters_and_projections(c: &mut Criterion) {
327336

328337
// Create the arrow predicate (does not implement clone, so must create it each time)
329338
let row_filter = pattern.row_filter(schema_descr);
330-
331-
let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_file, reader_metadata.clone())
332-
.with_batch_size(8192)
333-
.with_projection(projection_mask.clone())
334-
.with_row_filter(row_filter)
335-
.build()
336-
.unwrap();
337339

340+
let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
341+
parquet_file,
342+
reader_metadata.clone(),
343+
)
344+
.with_batch_size(8192)
345+
.with_projection(projection_mask.clone())
346+
.with_row_filter(row_filter)
347+
.build()
348+
.unwrap();
349+
350+
// Read batches until the row count limit is reached, to see
351+
// how fast they can be read.
352+
let mut remain = pattern.row_count();
353+
338354
while let Some(b) = stream.next().await {
339-
// consume the batches to see how fast they can be read
340-
// checking for errors
341-
b.unwrap();
355+
let b = b.unwrap();
356+
let num_rows = b.num_rows();
357+
if remain <= num_rows {
358+
break;
359+
}
360+
remain -= num_rows;
342361
}
343362
})
344363
})
@@ -349,20 +368,37 @@ fn benchmark_filters_and_projections(c: &mut Criterion) {
349368
/// Loads Parquet metadata from the given path, including page indexes
350369
fn load_metadata(path: &PathBuf) -> ArrowReaderMetadata {
351370
let file = std::fs::File::open(path).unwrap();
352-
let options = ArrowReaderOptions::new()
353-
.with_page_index(true);
354-
let orig_metadata = ArrowReaderMetadata::load(&file, options.clone()).expect("parquet-metadata loading failed");
355-
371+
let options = ArrowReaderOptions::new().with_page_index(true);
372+
let orig_metadata =
373+
ArrowReaderMetadata::load(&file, options.clone()).expect("parquet-metadata loading failed");
374+
356375
// Update the arrow schema so that it reads View types for binary and utf8 columns
357-
let arrow_schema = orig_metadata.schema();
358-
359-
360-
let new_arrow_schema = arrow_schema.clone();
376+
let new_fields = orig_metadata
377+
.schema()
378+
.fields()
379+
.iter()
380+
.map(|f| {
381+
//println!("Converting field: {:?}", f);
382+
// Read UTF8 fields as Utf8View
383+
//
384+
// The clickbench_partitioned dataset has textual fields listed as
385+
// binary for some historical reason so translate Binary to Utf8View as
386+
// well
387+
if matches!(f.data_type(), DataType::Utf8 | DataType::Binary) {
388+
let new_field = f.as_ref().clone().with_data_type(DataType::Utf8View);
389+
//println!(" set to Utf8View");
390+
Arc::new(new_field)
391+
} else {
392+
// otherwise just clone the field
393+
Arc::clone(f)
394+
}
395+
})
396+
.collect::<Vec<_>>();
397+
398+
let new_arrow_schema = Arc::new(Schema::new(new_fields));
399+
361400
let new_options = options.with_schema(new_arrow_schema);
362-
ArrowReaderMetadata::try_new(
363-
Arc::clone(orig_metadata.metadata()),
364-
new_options,
365-
).unwrap()
401+
ArrowReaderMetadata::try_new(Arc::clone(orig_metadata.metadata()), new_options).unwrap()
366402
}
367403

368404
criterion_group!(benches, benchmark_filters_and_projections,);

0 commit comments

Comments
 (0)