
Commit 66390ff

efredine (Eric Fredine) and alamb authored
Add parquet StatisticsConverter for arrow reader (#6046)
* Adds arrow statistics converter for parquet statistics.
* Adds integration tests for arrow statistics converter.
* Fix linting, remove todo, re-use arrow code.
* Remove commented out debug::log statements.
* Move parquet_column to lib.rs
* doc tweaks
* Add benchmark
* Add parquet_column_index and arrow_field accessors + test
* Copy edit docs obsessively
* clippy

Co-authored-by: Eric Fredine <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 6d4e2f2 · commit 66390ff

File tree: 7 files changed (+5989 −1 lines)

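The centerpiece of the commit is `StatisticsConverter`, which reads the statistics parquet stores in its metadata (min, max, null count, row count) back out as Arrow arrays. Below is a minimal sketch of the row-group path, mirroring the calls the benchmark in this commit makes; the file name `data.parquet` and the column name `col` are placeholder assumptions, not part of this commit:

```rust
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input: any parquet file with a column named "col".
    let file = File::open("data.parquet")?;
    let reader = ArrowReaderBuilder::try_new(file)?;

    // Map the Arrow field "col" onto the corresponding parquet leaf column.
    let converter =
        StatisticsConverter::try_new("col", reader.schema(), reader.parquet_schema())?;

    // Each call returns a single Arrow array with one entry per row group.
    let row_groups = reader.metadata().row_groups();
    let mins = converter.row_group_mins(row_groups.iter())?;
    let maxes = converter.row_group_maxes(row_groups.iter())?;
    println!("mins: {mins:?}");
    println!("maxes: {maxes:?}");
    Ok(())
}
```

The data-page variants (`data_page_mins` and friends) follow the same pattern but additionally need the column page index and offset index, as exercised in the benchmark below.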

parquet/Cargo.toml

+11 −1

```diff
@@ -134,6 +134,11 @@ path = "./examples/read_with_rowgroup.rs"
 name = "arrow_writer_layout"
 required-features = ["arrow"]
 
+[[test]]
+name = "arrow_reader"
+required-features = ["arrow"]
+path = "./tests/arrow_reader/mod.rs"
+
 [[bin]]
 name = "parquet-read"
 required-features = ["cli"]
@@ -180,6 +185,12 @@ name = "arrow_reader"
 required-features = ["arrow", "test_common", "experimental"]
 harness = false
 
+[[bench]]
+name = "arrow_statistics"
+required-features = ["arrow"]
+harness = false
+
+
 [[bench]]
 name = "compression"
 required-features = ["experimental", "default"]
@@ -190,7 +201,6 @@ name = "encoding"
 required-features = ["experimental", "default"]
 harness = false
 
-
 [[bench]]
 name = "metadata"
 harness = false
```
parquet/benches/arrow_statistics.rs

+269 (new file)

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks for extracting arrow statistics from parquet

use arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray, UInt64Array};
use arrow_array::{Int32Array, Int64Array, RecordBatch};
use arrow_schema::{
    DataType::{self, *},
    Field, Schema,
};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use parquet::{arrow::arrow_reader::ArrowReaderOptions, file::properties::WriterProperties};
use parquet::{
    arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter},
    file::properties::EnabledStatistics,
};
use std::sync::Arc;
use tempfile::NamedTempFile;
#[derive(Debug, Clone)]
enum TestTypes {
    UInt64,
    Int64,
    F64,
    String,
    Dictionary,
}

use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use std::fmt;

impl fmt::Display for TestTypes {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            TestTypes::UInt64 => write!(f, "UInt64"),
            TestTypes::Int64 => write!(f, "Int64"),
            TestTypes::F64 => write!(f, "F64"),
            TestTypes::String => write!(f, "String"),
            TestTypes::Dictionary => write!(f, "Dictionary(Int32, String)"),
        }
    }
}

fn create_parquet_file(
    dtype: TestTypes,
    row_groups: usize,
    data_page_row_count_limit: &Option<usize>,
) -> NamedTempFile {
    let schema = match dtype {
        TestTypes::UInt64 => Arc::new(Schema::new(vec![Field::new("col", DataType::UInt64, true)])),
        TestTypes::Int64 => Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, true)])),
        TestTypes::F64 => Arc::new(Schema::new(vec![Field::new(
            "col",
            DataType::Float64,
            true,
        )])),
        TestTypes::String => Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, true)])),
        TestTypes::Dictionary => Arc::new(Schema::new(vec![Field::new(
            "col",
            DataType::Dictionary(Box::new(Int32), Box::new(Utf8)),
            true,
        )])),
    };

    let mut props = WriterProperties::builder().set_max_row_group_size(row_groups);
    if let Some(limit) = data_page_row_count_limit {
        props = props
            .set_data_page_row_count_limit(*limit)
            .set_statistics_enabled(EnabledStatistics::Page);
    };
    let props = props.build();

    let file = tempfile::Builder::new()
        .suffix(".parquet")
        .tempfile()
        .unwrap();
    let mut writer =
        ArrowWriter::try_new(file.reopen().unwrap(), schema.clone(), Some(props)).unwrap();

    for _ in 0..row_groups {
        let batch = match dtype {
            TestTypes::UInt64 => make_uint64_batch(),
            TestTypes::Int64 => make_int64_batch(),
            TestTypes::F64 => make_f64_batch(),
            TestTypes::String => make_string_batch(),
            TestTypes::Dictionary => make_dict_batch(),
        };
        if data_page_row_count_limit.is_some() {
            // Send batches one at a time. This allows the
            // writer to apply the page limit, that is only
            // checked on RecordBatch boundaries.
            for i in 0..batch.num_rows() {
                writer.write(&batch.slice(i, 1)).unwrap();
            }
        } else {
            writer.write(&batch).unwrap();
        }
    }
    writer.close().unwrap();
    file
}

fn make_uint64_batch() -> RecordBatch {
    let array: ArrayRef = Arc::new(UInt64Array::from(vec![
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
    ]));
    RecordBatch::try_new(
        Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("col", UInt64, false),
        ])),
        vec![array],
    )
    .unwrap()
}

fn make_int64_batch() -> RecordBatch {
    let array: ArrayRef = Arc::new(Int64Array::from(vec![
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
    ]));
    RecordBatch::try_new(
        Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("col", Int64, false),
        ])),
        vec![array],
    )
    .unwrap()
}

fn make_f64_batch() -> RecordBatch {
    let array: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]));
    RecordBatch::try_new(
        Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("col", Float64, false),
        ])),
        vec![array],
    )
    .unwrap()
}

fn make_string_batch() -> RecordBatch {
    let array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
    RecordBatch::try_new(
        Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("col", Utf8, false),
        ])),
        vec![array],
    )
    .unwrap()
}

fn make_dict_batch() -> RecordBatch {
    let keys = Int32Array::from(vec![0, 1, 2, 3, 4]);
    let values = StringArray::from(vec!["a", "b", "c", "d", "e"]);
    let array: ArrayRef = Arc::new(DictionaryArray::try_new(keys, Arc::new(values)).unwrap());
    RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new(
            "col",
            Dictionary(Box::new(Int32), Box::new(Utf8)),
            false,
        )])),
        vec![array],
    )
    .unwrap()
}

fn criterion_benchmark(c: &mut Criterion) {
    let row_groups = 100;
    use TestTypes::*;
    let types = vec![Int64, UInt64, F64, String, Dictionary];
    let data_page_row_count_limits = vec![None, Some(1)];

    for dtype in types {
        for data_page_row_count_limit in &data_page_row_count_limits {
            let file = create_parquet_file(dtype.clone(), row_groups, data_page_row_count_limit);
            let file = file.reopen().unwrap();
            let options = ArrowReaderOptions::new().with_page_index(true);
            let reader = ArrowReaderBuilder::try_new_with_options(file, options).unwrap();
            let metadata = reader.metadata();
            let row_groups = metadata.row_groups();
            let row_group_indices: Vec<_> = (0..row_groups.len()).collect();

            let statistic_type = if data_page_row_count_limit.is_some() {
                "data page"
            } else {
                "row group"
            };

            let mut group = c.benchmark_group(format!(
                "Extract {} statistics for {}",
                statistic_type,
                dtype.clone()
            ));
            group.bench_function(BenchmarkId::new("extract_statistics", dtype.clone()), |b| {
                b.iter(|| {
                    let converter = StatisticsConverter::try_new(
                        "col",
                        reader.schema(),
                        reader.parquet_schema(),
                    )
                    .unwrap();

                    if data_page_row_count_limit.is_some() {
                        let column_page_index = reader
                            .metadata()
                            .column_index()
                            .expect("File should have column page indices");

                        let column_offset_index = reader
                            .metadata()
                            .offset_index()
                            .expect("File should have column offset indices");

                        let _ = converter.data_page_mins(
                            column_page_index,
                            column_offset_index,
                            &row_group_indices,
                        );
                        let _ = converter.data_page_maxes(
                            column_page_index,
                            column_offset_index,
                            &row_group_indices,
                        );
                        let _ = converter.data_page_null_counts(
                            column_page_index,
                            column_offset_index,
                            &row_group_indices,
                        );
                        let _ = converter.data_page_row_counts(
                            column_offset_index,
                            row_groups,
                            &row_group_indices,
                        );
                    } else {
                        let _ = converter.row_group_mins(row_groups.iter()).unwrap();
                        let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
                        let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
                        let _ = converter.row_group_row_counts(row_groups.iter()).unwrap();
                    }
                })
            });
            group.finish();
        }
    }
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
```
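Two details of the setup above matter for the data-page case: the writer enables page-level statistics (`EnabledStatistics::Page`) and writes one row per batch so the one-row page limit actually takes effect, and the reader opens the file with `ArrowReaderOptions::new().with_page_index(true)`, without which the column and offset indexes needed by the `data_page_*` methods would not be loaded. From a checkout of the repository, the benchmark should be runnable with something like `cargo bench -p parquet --bench arrow_statistics --features arrow` (the `[[bench]]` entry registered above declares the `arrow` feature requirement).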

parquet/src/arrow/arrow_reader/mod.rs

+1
```diff
@@ -42,6 +42,7 @@ use crate::schema::types::SchemaDescriptor;
 
 mod filter;
 mod selection;
+pub mod statistics;
 
 /// Builder for constructing parquet readers into arrow.
 ///
```
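This one-line change is what exposes the converter outside the crate: the new public path is `parquet::arrow::arrow_reader::statistics`, the module imported by the benchmark and by the sketch near the top of this page.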
