Add benchmark for parquet reader with row_filter and project settings #7401
Conversation
Thank you very much @zhuqi-lucas -- this is a great start and very much appreciated. I left some comments on the structure so far, and I am going to spend some time now writing up details of what we are testing. I'll have a proposal soon for your consideration.
Thank you so much again. This project is really important, but I think it requires focus and determination.
let num_rows = 100_000;

// int64 column: sequential numbers 0..num_rows
let int_values: Vec<i64> = (0..num_rows as i64).collect();
I think it is more common to use fixed-seed random values when creating the data, to avoid artifacts that such regular patterns may introduce.
There are some good examples here: https://github.com/apache/arrow-rs/blob/d0260fcffa07a4cb8650cc290ab29027a3a8e65c/parquet/benches/arrow_writer.rs#L101-L100
Specifically, I suggest:
- Random ints
- Random floats
- ordered timestamps (as you have them, as that is quite common)
- Random strings
For string view, it is also important to include strings that are more than 12 bytes long (strings less than this are entirely inlined into the view)
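A minimal sketch of the seeded-random generation for the first two of those columns (the seed value, ranges, and use of the rand crate here are illustrative assumptions, not the PR's final code):

use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

/// Generate the int64 and float64 columns from a fixed-seed RNG so every
/// benchmark run sees the same, but irregular, data.
fn make_random_columns(num_rows: usize) -> (Vec<i64>, Vec<f64>) {
    let mut rng = StdRng::seed_from_u64(42);
    let ints: Vec<i64> = (0..num_rows).map(|_| rng.gen_range(0..100)).collect();
    let floats: Vec<f64> = (0..num_rows).map(|_| rng.gen_range(0.0..100.0)).collect();
    (ints, floats)
}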
I printed out the data and it is quite regular:
Running benches/arrow_reader_row_filter.rs (target/debug/deps/arrow_reader_row_filter-6404dc89531cf7fd)
Gnuplot not found, using plotters backend
Batch created with 100000 rows
First 100 rows
+-------+---------------------+----------+-------------------------+
| int64 | float64 | utf8View | ts |
+-------+---------------------+----------+-------------------------+
| 0 | 0.0 | const | 1970-01-01T00:00:00 |
| 1 | 0.1 | | 1970-01-01T00:00:00.001 |
| 2 | 0.2 | nonempty | 1970-01-01T00:00:00.002 |
| 3 | 0.30000000000000004 | | 1970-01-01T00:00:00.003 |
| 4 | 0.4 | nonempty | 1970-01-01T00:00:00.004 |
| 5 | 0.5 | | 1970-01-01T00:00:00.005 |
| 6 | 0.6000000000000001 | nonempty | 1970-01-01T00:00:00.006 |
| 7 | 0.7000000000000001 | | 1970-01-01T00:00:00.007 |
| 8 | 0.8 | nonempty | 1970-01-01T00:00:00.008 |
| 9 | 0.9 | | 1970-01-01T00:00:00.009 |
| 10 | 1.0 | nonempty | 1970-01-01T00:00:00.010 |
| 11 | 1.1 | | 1970-01-01T00:00:00.011 |
| 12 | 1.2000000000000002 | nonempty | 1970-01-01T00:00:00.012 |
| 13 | 1.3 | | 1970-01-01T00:00:00.013 |
| 14 | 1.4000000000000001 | nonempty | 1970-01-01T00:00:00.014 |
| 15 | 1.5 | | 1970-01-01T00:00:00.015 |
| 16 | 1.6 | nonempty | 1970-01-01T00:00:00.016 |
| 17 | 1.7000000000000002 | | 1970-01-01T00:00:00.017 |
| 18 | 1.8 | nonempty | 1970-01-01T00:00:00.018 |
| 19 | 1.9000000000000001 | | 1970-01-01T00:00:00.019 |
| 20 | 2.0 | nonempty | 1970-01-01T00:00:00.020 |
| 21 | 2.1 | | 1970-01-01T00:00:00.021 |
| 22 | 2.2 | nonempty | 1970-01-01T00:00:00.022 |
| 23 | 2.3000000000000003 | | 1970-01-01T00:00:00.023 |
| 24 | 2.4000000000000004 | nonempty | 1970-01-01T00:00:00.024 |
| 25 | 2.5 | | 1970-01-01T00:00:00.025 |
| 26 | 2.6 | nonempty | 1970-01-01T00:00:00.026 |
| 27 | 2.7 | | 1970-01-01T00:00:00.027 |
| 28 | 2.8000000000000003 | nonempty | 1970-01-01T00:00:00.028 |
| 29 | 2.9000000000000004 | | 1970-01-01T00:00:00.029 |
| 30 | 3.0 | nonempty | 1970-01-01T00:00:00.030 |
| 31 | 3.1 | | 1970-01-01T00:00:00.031 |
| 32 | 3.2 | nonempty | 1970-01-01T00:00:00.032 |
| 33 | 3.3000000000000003 | | 1970-01-01T00:00:00.033 |
| 34 | 3.4000000000000004 | nonempty | 1970-01-01T00:00:00.034 |
| 35 | 3.5 | | 1970-01-01T00:00:00.035 |
I printed this using
diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs
index af07636e49..87e353f2c0 100644
--- a/parquet/benches/arrow_reader_row_filter.rs
+++ b/parquet/benches/arrow_reader_row_filter.rs
@@ -57,6 +57,7 @@ use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMas
use parquet::file::properties::WriterProperties;
use tokio::fs::File;
use tokio::runtime::Runtime;
+use arrow_cast::pretty::pretty_format_batches;
/// Create a RecordBatch with 100K rows and four columns.
fn make_record_batch() -> RecordBatch {
@@ -101,11 +102,17 @@ fn make_record_batch() -> RecordBatch {
),
]));
- RecordBatch::try_new(
+ let batch = RecordBatch::try_new(
schema,
vec![int_array, float_array, utf8_view_array, ts_array],
)
- .unwrap()
+ .unwrap();
+
+ // Verify the batch was created correctly
+ println!("Batch created with {} rows", num_rows);
+ println!("First 100 rows");
+ println!("{}", pretty_format_batches(&[batch.clone().slice(0, 100)]).unwrap());
+ batch
}
    .as_any()
    .downcast_ref::<StringViewArray>()
    .unwrap();
let mut builder = BooleanBuilder::with_capacity(array.len());
I think we should use the arrow compute kernels here, like:
https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html
That will:
- Better model what real systems do
- Not be a bottleneck for evaluation
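A minimal sketch of what that could look like (the column name "utf8View" and the unwrap-style error handling are assumptions), using the vectorized comparison kernel instead of a hand-rolled BooleanBuilder loop:

use arrow::array::{Array, BooleanArray, RecordBatch, Scalar, StringViewArray};
use arrow::compute::kernels::cmp::neq;

/// Filter returning true where utf8View <> "", evaluated by the arrow kernel.
fn filter_utf8_view_nonempty(batch: &RecordBatch) -> BooleanArray {
    let array = batch
        .column_by_name("utf8View")
        .unwrap()
        .as_any()
        .downcast_ref::<StringViewArray>()
        .unwrap();
    // Compare every value against the empty-string scalar.
    let empty = Scalar::new(StringViewArray::from(vec![""]));
    neq(array, &empty).unwrap()
}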
Addressed it in latest PR, thank you @alamb !
#[derive(Clone)]
enum FilterType {
I think having the background about why these particular filters were chosen is important. I realize I did not do a good job of describing them on the ticket, but I will now work on some diagrams and descriptions to explain it better.
I think it would help here to add ASCII art for each filter pattern in the comments
I can make a PR to do so too. I was thinking we could just copy/paste from
#7363 (comment)
Thank you @alamb for the review and good suggestions! I will address them soon. Meanwhile, the testing works well on the main branch, but it seems to get stuck when switched to the cache page branch; I am trying to debug it. I agree that once we have better testing we can improve the code to make the performance better -- I am looking forward to helping with it!
THANK YOU!
@zhuqi-lucas here is a proposed addition to this PR to improve the comments.
Also, after thinking about this more today, I think we should have 6 predicates that generate filters in particular patterns, as described here: #7363 (comment)
I also have some smaller code suggestions I will leave inline.
Again, thank you for your work on this PR.
}

/// Filter function: returns a BooleanArray with true when utf8View <> "".
fn filter_utf8_view_nonempty(batch: &RecordBatch) -> BooleanArray {
Since these functions are specific to the FilterType, you could potentially add them as methods, like:
impl FilterType {
    fn filter_batch(&self, batch: &RecordBatch) -> BooleanArray {
        match self {
            Self::Utf8ViewNonEmpty => {
                // implement filter here
            }
            ...
        }
    }
}
// Iterate by value (Copy is available for FilterType and fn pointers)
for (filter_type, filter_fn) in filter_funcs.into_iter() {
    for proj_case in ["all_columns", "exclude_filter_column"].iter() {
I think it would be nice to make proj_case an enum too (rather than a string) -- mostly so we can document the rationale for each of the cases more easily
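A possible shape for that enum (the names and doc text below are just illustrative, not the PR's final code):

use std::fmt;

/// Which columns the benchmark asks the reader to produce.
#[derive(Clone, Copy, Debug)]
enum ProjectionCase {
    /// Project every column, including the one used by the row filter.
    AllColumns,
    /// Project every column except the filter column, exercising the path
    /// where the filter column is decoded only for predicate evaluation.
    ExcludeFilterColumn,
}

impl fmt::Display for ProjectionCase {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProjectionCase::AllColumns => write!(f, "all_columns"),
            ProjectionCase::ExcludeFilterColumn => write!(f, "exclude_filter_column"),
        }
    }
}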
Very good point! Thank you @alamb !
BTW if you have a moment to review (and hopefully merge) zhuqi-lucas#1 into this PR I think it helps provide some additional backstory
Update comments, add background for `arrow_reader_row_filter` benchmark
Thank you @alamb !
Updated the intermediate working result for now:
Batch created with 100000 rows
First 100 rows:
+-------+--------------------+----------------------+-------------------------+
| int64 | float64 | utf8View | ts |
+-------+--------------------+----------------------+-------------------------+
| 52 | 75.29055829914246 | AU28OVEkn | 1970-01-01T00:00:00 |
| 54 | 42.00429538020429 | ykST6JiLOLnIvNdO8keC | 1970-01-01T00:00:00.001 |
| 63 | 39.47472888998955 | Uda3p | 1970-01-01T00:00:00.002 |
| 40 | 37.652320722764564 | WGHu0pjs | 1970-01-01T00:00:00.003 |
| 3 | 8.279850835910874 | a44w5jeFASYTFQgW3iV | 1970-01-01T00:00:00.004 |
| 41 | 99.68887981607988 | RAPQfwZ9m60dIbmH3h | 1970-01-01T00:00:00.005 |
| 73 | 15.642169170494281 | qzs0rF3m | 1970-01-01T00:00:00.006 |
| 84 | 3.0599704626508517 | 44slxrHHq3BuqNTpmq | 1970-01-01T00:00:00.007 |
| 13 | 47.50509938869523 | suZSLelrnv | 1970-01-01T00:00:00.008 |
| 0 | 46.94079610631825 | jfu782Q6x5KhgR | 1970-01-01T00:00:00.009 |
| 93 | 2.237369040094883 | b9ap7NjJCE9dJVq | 1970-01-01T00:00:00.010 |
| 50 | 30.5204368120215 | 7qi4MfzbMc07ejaCp | 1970-01-01T00:00:00.011 |
| 39 | 49.964436637304765 | | 1970-01-01T00:00:00.012 |
| 14 | 81.21243682393427 | Af7dd | 1970-01-01T00:00:00.013 |
| 52 | 84.10733431750074 | 19wgJtNafjeDTkwJ7NMe | 1970-01-01T00:00:00.014 |
| 21 | 18.775381989621764 | JEx | 1970-01-01T00:00:00.015 |
| 1 | 30.094453508629293 | | 1970-01-01T00:00:00.016 |
| 51 | 97.85547929153601 | | 1970-01-01T00:00:00.017 |
| 5 | 1.5650630448398983 | aEIoZISJCn | 1970-01-01T00:00:00.018 |
| 64 | 32.92594588933944 | | 1970-01-01T00:00:00.019 |
| 84 | 34.75713196487942 | | 1970-01-01T00:00:00.020 |
| 49 | 85.08277125229844 | t00JSmkxL | 1970-01-01T00:00:00.021 |
| 50 | 1.5595488657399459 | 126rqaeF | 1970-01-01T00:00:00.022 |
| 5 | 99.60261436398793 | | 1970-01-01T00:00:00.023 |
| 35 | 26.889462612385074 | jgn5nyjhiabkjIrBPouz | 1970-01-01T00:00:00.024 |
| 44 | 52.09203708588272 | WmElvJn0ASOAM3MXGoNP | 1970-01-01T00:00:00.025 |
| 79 | 24.244945328566693 | 0nOH6YkcqZH | 1970-01-01T00:00:00.026 |
| 20 | 80.4881869486388 | IV7YdFkJWGdf41CPH8 | 1970-01-01T00:00:00.027 |
| 40 | 21.993884216986824 | AMIvZh7cHebSbLGNj | 1970-01-01T00:00:00.028 |
| 65 | 41.93935741938668 | yAC9SmAsywBRHleHC | 1970-01-01T00:00:00.029 |
| 42 | 71.43136884411643 | | 1970-01-01T00:00:00.030 |
| 88 | 5.797096388791534 | NNPg5WeX8QCUp | 1970-01-01T00:00:00.031 |
| 92 | 56.88161001490073 | uU4mnzRaE6VSW7hYy5pe | 1970-01-01T00:00:00.032 |
| 86 | 39.601092437561846 | Yok4PyZLi2dCv4a4 | 1970-01-01T00:00:00.033 |
| 64 | 81.25356113009576 | bgXE9N8zVNVs4nsDW | 1970-01-01T00:00:00.034 |
| 22 | 39.31769325193022 | QMgL2EqTb8URjstEXzjI | 1970-01-01T00:00:00.035 |
| 57 | 14.376478306308215 | GFG8pIHrjJUjAhmUAV8i | 1970-01-01T00:00:00.036 |
| 39 | 9.888186822097422 | i4yUq23Aai9t3MSqN | 1970-01-01T00:00:00.037 |
| 6 | 34.72877797026843 | qSlyyDsHPsfRT | 1970-01-01T00:00:00.038 |
| 42 | 51.15632091421703 | reilXpQo5HPwgPLpGQ | 1970-01-01T00:00:00.039 |
| 98 | 32.11896139596986 | S0D | 1970-01-01T00:00:00.040 |
| 60 | 79.6209958377313 | HIEsFN6U | 1970-01-01T00:00:00.041 |
| 45 | 11.777310743492041 | XsA8CEmXJVLnikMPpX | 1970-01-01T00:00:00.042 |
| 2 | 93.48072533355275 | cOeWn8pNF6olMKt | 1970-01-01T00:00:00.043 |
| 15 | 49.255154886306116 | 9MHk0QPDutL | 1970-01-01T00:00:00.044 |
| 38 | 39.65521098625906 | J8jmjXDMsUn4sJEOzArx | 1970-01-01T00:00:00.045 |
| 49 | 55.818267296909866 | Toy | 1970-01-01T00:00:00.046 |
| 96 | 6.53810475249561 | SrpurjQU3 | 1970-01-01T00:00:00.047 |
| 85 | 28.657278445635946 | oUEkNgu87LFUoYZZ4Oj | 1970-01-01T00:00:00.048 |
| 54 | 46.751290803153786 | zqr5ztbjsrIQ9 | 1970-01-01T00:00:00.049 |
| 76 | 16.949201307322646 | | 1970-01-01T00:00:00.050 |
| 81 | 74.82959959310287 | | 1970-01-01T00:00:00.051 |
| 57 | 21.12272740796881 | nsN15c34 | 1970-01-01T00:00:00.052 |
| 23 | 35.90262404388276 | const | 1970-01-01T00:00:00.053 |
| 38 | 22.36390240284072 | const | 1970-01-01T00:00:00.054 |
| 39 | 12.906486704120844 | mHVJXj5m7PlATtrD5 | 1970-01-01T00:00:00.055 |
| 22 | 61.04861907542707 | 0dbwS1Z9qZZQx | 1970-01-01T00:00:00.056 |
| 24 | 60.19047454105533 | const | 1970-01-01T00:00:00.057 |
| 63 | 37.87083161838445 | BuuZv | 1970-01-01T00:00:00.058 |
| 47 | 8.917416824543523 | v0cvTOySrSMTj | 1970-01-01T00:00:00.059 |
| 96 | 37.88542707039231 | OtOEY8t7wGAdK | 1970-01-01T00:00:00.060 |
| 16 | 95.8425578109825 | n3uHoHZSF | 1970-01-01T00:00:00.061 |
| 46 | 34.596806837946524 | auALuYSQ | 1970-01-01T00:00:00.062 |
| 53 | 77.44362478760178 | pOOG3GvBXz68OG | 1970-01-01T00:00:00.063 |
| 11 | 61.264664050060944 | const | 1970-01-01T00:00:00.064 |
| 32 | 76.60073517949739 | 02S9yDBbL9CP8SgDD78z | 1970-01-01T00:00:00.065 |
| 24 | 64.7027198658178 | | 1970-01-01T00:00:00.066 |
| 56 | 37.41292582697877 | const | 1970-01-01T00:00:00.067 |
| 68 | 6.947916965143208 | 9chF7kpKt2s5ZnzeH | 1970-01-01T00:00:00.068 |
| 5 | 74.12254766001676 | oatmPsrNP6Ztcdq2j | 1970-01-01T00:00:00.069 |
| 47 | 3.083198152446931 | 50vNsgPNlVL6u8CidY3z | 1970-01-01T00:00:00.070 |
| 61 | 47.3781774520466 | XjfZ | 1970-01-01T00:00:00.071 |
| 87 | 33.539490632676674 | SLr | 1970-01-01T00:00:00.072 |
| 56 | 64.6233624278942 | L4ebg0lii | 1970-01-01T00:00:00.073 |
| 48 | 58.87439054561894 | | 1970-01-01T00:00:00.074 |
| 44 | 63.88560113126003 | wwKj83rD | 1970-01-01T00:00:00.075 |
| 92 | 87.68921556395925 | 0RNzF0X1FhdEYC | 1970-01-01T00:00:00.076 |
| 4 | 70.75339996354626 | Vc5W | 1970-01-01T00:00:00.077 |
| 48 | 27.855269069373144 | 9dbRf6MmNU31Ps6g | 1970-01-01T00:00:00.078 |
| 75 | 49.17098678115342 | QQkeS2czm6UGYD | 1970-01-01T00:00:00.079 |
| 32 | 50.032024521105114 | FSlQohoBW | 1970-01-01T00:00:00.080 |
| 34 | 54.533046655679286 | | 1970-01-01T00:00:00.081 |
| 38 | 22.985685534257104 | lddH4rXZ2ru8utRf | 1970-01-01T00:00:00.082 |
| 72 | 28.85573899982439 | hqr8iV9rj1 | 1970-01-01T00:00:00.083 |
| 68 | 64.85335642795329 | const | 1970-01-01T00:00:00.084 |
| 57 | 72.72457035820736 | kwNfoP7CJ2oPrlIw60 | 1970-01-01T00:00:00.085 |
| 7 | 93.95090254075085 | NIRwKTAjNP3NqV | 1970-01-01T00:00:00.086 |
| 39 | 30.231147314466543 | aX5LV8z92TKXJl | 1970-01-01T00:00:00.087 |
| 71 | 85.33817736512638 | IWJ59XjQ | 1970-01-01T00:00:00.088 |
| 93 | 88.33878286035095 | yHyVAkgWbCE | 1970-01-01T00:00:00.089 |
| 8 | 12.36187647918956 | S2LjZXDjIU | 1970-01-01T00:00:00.090 |
| 84 | 9.116452532476792 | EmLfpBDV9NGGrpiws | 1970-01-01T00:00:00.091 |
| 57 | 6.384645798196109 | rSV0fxSmguGG7d | 1970-01-01T00:00:00.092 |
| 78 | 31.432748390699473 | o8bodsMjMzr | 1970-01-01T00:00:00.093 |
| 76 | 5.538006671057039 | MatvWB22 | 1970-01-01T00:00:00.094 |
| 32 | 28.516358395709407 | iJepntbG5Vak1oZjIteI | 1970-01-01T00:00:00.095 |
| 16 | 96.84918967737681 | Zyvw | 1970-01-01T00:00:00.096 |
| 79 | 4.029872840673621 | qX7FaPZ05Xe11Z5 | 1970-01-01T00:00:00.097 |
| 87 | 24.648767055465882 | xF0yet | 1970-01-01T00:00:00.098 |
| 31 | 48.07113837488295 | mDM9LXJ | 1970-01-01T00:00:00.099 |
+-------+--------------------+----------------------+-------------------------+
End of today; tomorrow's plan:
FWIW I think I saw something similar when I was testing the branch in datafusion -- so in other words I think it is a real bug
Thanks again @zhuqi-lucas -- pleasant dreams!
I found the deadlock happens in the following code on the page cache branch: cc @alamb @XiangpengHao
while total_records_read < max_records && self.has_next()? {
}
#[inline]
pub(crate) fn has_next(&mut self) -> Result<bool> {
if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
// TODO: should we return false if read_new_page() = true and
// num_buffered_values = 0?
println!("num_buffered_values: {}, num_decoded_values: {}", self.num_buffered_values, self.num_decoded_values);
if !self.read_new_page()? {
Ok(false)
} else {
Ok(self.num_buffered_values != 0)
}
} else {
Ok(true)
}
}
/// Reads a new page and set up the decoders for levels, values or dictionary.
/// Returns false if there's no page left.
fn read_new_page(&mut self) -> Result<bool> {
println!("GenericColumnReader read_new_page");
loop {
match self.page_reader.get_next_page()? {
// No more page to read
None => return Ok(false),
Some(current_page) => {
//println!("GenericColumnReader read_new_page current_page: {:?}", current_page.page_type());
match current_page {
// 1. Dictionary page: configure dictionary for this page.
Page::DictionaryPage {
buf,
num_values,
encoding,
is_sorted,
} => {
self.values_decoder
.set_dict(buf, num_values, encoding, is_sorted)?;
continue;
}
}
}
}
impl<R: ChunkReader> PageReader for CachedPageReader<R> {
fn get_next_page(&mut self) -> Result<Option<Page>, ParquetError> {
//println!("CachedPageReader get next page");
let next_page_offset = self.inner.peek_next_page_offset()?;
//println!("CachedPageReader next page offset: {:?}", next_page_offset);
let Some(offset) = next_page_offset else {
return Ok(None);
};
let mut cache = self.cache.get();
let page = cache.get_page(self.col_id, offset);
if let Some(page) = page {
self.inner.skip_next_page()?;
//println!("CachedPageReader skip next page");
Ok(Some(page))
} else {
//println!("CachedPageReader insert page");
let inner_page = self.inner.get_next_page()?;
let Some(inner_page) = inner_page else {
return Ok(None);
};
cache.insert_page(self.col_id, offset, inner_page.clone());
Ok(Some(inner_page))
}
}
I found the bug is caused by how the cache page branch uses page skipping: it does not skip the dictionary page, so has_next stays in its loop forever. I created a PR to try to fix the issue. As for why the main branch did not trigger it: currently nothing exercises skipping the dictionary page apart from this benchmark running against the cache page logic.
Benchmark result comparison after the fix:
group better_decode main
----- ------------- ----
arrow_reader_row_filter/filter: 1% Unclustered Filter proj: all_columns/ 1.07 7.7±0.09ms ? ?/sec 1.00 7.2±0.13ms ? ?/sec
arrow_reader_row_filter/filter: 1% Unclustered Filter proj: exclude_filter_column/ 1.08 7.0±0.08ms ? ?/sec 1.00 6.5±0.10ms ? ?/sec
arrow_reader_row_filter/filter: 10% Clustered Filter proj: all_columns/ 1.12 6.8±0.15ms ? ?/sec 1.00 6.0±0.07ms ? ?/sec
arrow_reader_row_filter/filter: 10% Clustered Filter proj: exclude_filter_column/ 1.12 6.0±0.08ms ? ?/sec 1.00 5.4±0.07ms ? ?/sec
arrow_reader_row_filter/filter: 10% Unclustered Filter proj: all_columns/ 1.06 14.5±0.13ms ? ?/sec 1.00 13.7±0.20ms ? ?/sec
arrow_reader_row_filter/filter: 10% Unclustered Filter proj: exclude_filter_column/ 1.05 13.2±0.08ms ? ?/sec 1.00 12.6±0.22ms ? ?/sec
arrow_reader_row_filter/filter: 90% Clustered Filter proj: all_columns/ 1.10 9.8±0.32ms ? ?/sec 1.00 9.0±0.10ms ? ?/sec
arrow_reader_row_filter/filter: 90% Clustered Filter proj: exclude_filter_column/ 1.07 9.4±0.09ms ? ?/sec 1.00 8.8±0.10ms ? ?/sec
arrow_reader_row_filter/filter: 99% Unclustered Filter proj: all_columns/ 1.09 12.0±0.07ms ? ?/sec 1.00 11.0±0.11ms ? ?/sec
arrow_reader_row_filter/filter: 99% Unclustered Filter proj: exclude_filter_column/ 1.09 11.7±0.09ms ? ?/sec 1.00 10.7±0.09ms ? ?/sec
arrow_reader_row_filter/filter: Point Lookup proj: all_columns/ 1.27 6.5±0.10ms ? ?/sec 1.00 5.1±0.21ms ? ?/sec
arrow_reader_row_filter/filter: Point Lookup proj: exclude_filter_column/ 1.31 5.9±0.07ms ? ?/sec 1.00 4.5±0.06ms ? ?/sec
arrow_reader_row_filter/filter: float64 > 50.0 proj: all_columns/ 1.02 26.6±0.21ms ? ?/sec 1.00 26.0±0.23ms ? ?/sec
arrow_reader_row_filter/filter: float64 > 50.0 proj: exclude_filter_column/ 1.04 23.8±0.15ms ? ?/sec 1.00 22.8±0.17ms ? ?/sec
arrow_reader_row_filter/filter: int64 > 0 proj: all_columns/ 1.05 10.7±0.12ms ? ?/sec 1.00 10.1±0.18ms ? ?/sec
arrow_reader_row_filter/filter: int64 > 0 proj: exclude_filter_column/ 1.07 10.4±0.11ms ? ?/sec 1.00 9.7±0.10ms ? ?/sec
arrow_reader_row_filter/filter: ts > 50_000 proj: all_columns/ 1.12 7.4±0.05ms ? ?/sec 1.00 6.6±0.08ms ? ?/sec
arrow_reader_row_filter/filter: ts > 50_000 proj: exclude_filter_column/ 1.13 7.3±0.05ms ? ?/sec 1.00 6.5±0.08ms ? ?/sec
arrow_reader_row_filter/filter: utf8View <> '' proj: all_columns/ 1.00 22.9±0.21ms ? ?/sec 1.03 23.5±0.70ms ? ?/sec
arrow_reader_row_filter/filter: utf8View <> '' proj: exclude_filter_column/ 1.01 20.6±0.15ms ? ?/sec 1.00 20.3±0.17ms ? ?/sec
arrow_reader_row_filter/filter: utf8View = 'const' proj: all_columns/ 1.04 10.3±0.21ms ? ?/sec 1.00 9.9±0.10ms ? ?/sec
arrow_reader_row_filter/filter: utf8View = 'const' proj: exclude_filter_column/ 1.04 9.4±0.11ms ? ?/sec 1.00 9.0±0.09ms ? ?/sec
arrow_reader_row_filter/filter_case: int64 = 0 project_case: all_columns/ 1.00 821.4±37.47µs ? ?/sec 1.00 820.8±12.99µs ? ?/sec
arrow_reader_row_filter/filter_case: int64 = 0 project_case: exclude_filter_column/ 1.00 754.9±29.52µs ? ?/sec 1.03 779.4±8.15µs ? ?/sec
arrow_reader_row_filter/filter_case: int64 even project_case: all_columns/ 1.01 18.7±0.77ms ? ?/sec 1.00 18.6±0.37ms ? ?/sec
arrow_reader_row_filter/filter_case: int64 even project_case: exclude_filter_column/ 1.00 13.8±0.60ms ? ?/sec 1.00 13.8±0.11ms ? ?/sec
arrow_reader_row_filter/filter_case: ts > 50_000 project_case: all_columns/ 1.12 1288.7±69.60µs ? ?/sec 1.00 1148.8±29.81µs ? ?/sec
arrow_reader_row_filter/filter_case: ts > 50_000 project_case: exclude_filter_column/ 1.08 1142.3±71.12µs ? ?/sec 1.00 1054.0±16.71µs ? ?/sec
arrow_reader_row_filter/filter_case: utf8View <> '' project_case: all_columns/ 1.00 17.7±0.14ms ? ?/sec 1.05 18.6±0.15ms ? ?/sec
arrow_reader_row_filter/filter_case: utf8View <> '' project_case: exclude_filter_column/ 1.00 14.8±0.23ms ? ?/sec 1.05 15.6±0.12ms ? ?/sec
arrow_reader_row_filter/filter_case: utf8View = 'const' project_case: all_columns/ 1.00 921.0±43.84µs ? ?/sec 1.19 1093.5±19.65µs ? ?/sec
arrow_reader_row_filter/filter_case: utf8View = 'const' project_case: exclude_filter_column/ 1.00 887.9±47.56µs ? ?/sec 1.22 1080.5±15.66µs ? ?/sec
composite_filter/Composite Filter: sel AND mod_clustered/ 1.10 7.5±0.12ms ? ?/sec 1.00 6.8±0.12ms ? ?/sec
Thank you @zhuqi-lucas -- I am sorry for the back and forth on this PR but I think once we have this benchmark sorted out making the filter pushdown performance better will be quite easy.
I have a few more comments on the structure of this PR -- notably I think we should reduce the number of filters / columns. I am happy to make these changes in the PR myself too, but I wanted to ask you first.
//! The benchmark creates an in-memory Parquet file with 100K rows and ten columns.
//! The first four columns are:
//! - int64: random integers (range: 0..100) generated with a fixed seed.
//! - float64: random floating-point values (range: 0.0..100.0) generated with a fixed seed.
//! - utf8View: random strings with some empty values and occasional constant "const" values.
//! - ts: sequential timestamps in milliseconds.
Thank you very much @zhuqi-lucas - this is looking really nice.
I am worried about a few things:
- The overlap / duplication of the first four columns and the original predicates: there is duplication across the cases with the specific columns
- The over-representation of StringView -- in the benchmark now there are 7 StringView columns. I think that will skew the benchmark results much more heavily towards string columns
In order to resolve this, I suggest we try to keep the four original columns and pick predicates that implement the filter patterns in terms of that data.
For example, the point lookup filter can be implemented as picking a single value from the int64 column rather than creating an entirely new column.
The "unsel_clustered" case could be modeled as a predicate on the ts column (we would have to update the ts column to be values 0...10k, 1...10k, etc.) -- then the predicate ts >= 9000 would generate the correct pattern, I think.
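A sketch of that ts suggestion (the block size and threshold come from the comment above; the function name is made up):

/// Make `ts` cycle 0..10_000 within each 10K-row block, so the predicate
/// `ts >= 9000` selects a clustered run of roughly 10% of the rows per block.
fn make_clustered_ts_values(num_rows: usize) -> Vec<i64> {
    (0..num_rows as i64).map(|i| i % 10_000).collect()
}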
Good point @alamb, I will try to address it soon.
#[derive(Clone)]
enum FilterType {
    Utf8ViewNonEmpty,
    Utf8ViewConst,
    Int64GTZero,
    Float64GTHalf,
    TimestampGt,
    PointLookup,
    SelectiveUnclustered,
    ModeratelySelectiveClustered,
    ModeratelySelectiveUnclustered,
    UnselectiveUnclustered,
    UnselectiveClustered,
As I mentioned above, I think we could/should reduce this to only 6 filter patterns (rather than 10)
I kept the 6 filter patterns in the latest PR.
I also added 2 additional cases, the composite case and the Utf8View case, because the Utf8View column is not used by the 6 filters above.
/// This benchmark creates a composite row filter that ANDs two predicates:
/// one on the "sel" column (exactly 1% selected) and one on the "mod_clustered" column
/// (first 10 rows in each 10K block), then measures the performance of the combined filtering.
fn benchmark_composite_filters(c: &mut Criterion) {
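For reference, one way to combine the two per-predicate masks is the arrow boolean AND kernel (a sketch; the two input masks stand in for whatever the "sel" and "mod_clustered" predicates produce, and in the reader the predicates could also be supplied as separate entries of the RowFilter):

use arrow::array::BooleanArray;
use arrow::compute::and;

/// Combine the "sel" mask and the "mod_clustered" mask into one filter mask.
fn composite_mask(sel: &BooleanArray, mod_clustered: &BooleanArray) -> BooleanArray {
    and(sel, mod_clustered).unwrap()
}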
Thank you for this -- I wonder is there any reason this can't be modeled as another FilterType enum?
This is a good suggestion!
];
let mut group = c.benchmark_group("arrow_reader_row_filter");

for filter_type in filter_types.clone() {
I think we should also test the SyncReader (which is all the more reason to reduce the number of FilterTypes)
Thank you @alamb for the review and good suggestion. The sync reader is not supported by the page cache yet; maybe we can file a ticket to follow up.
Thank you @alamb for the patient review and good suggestions! Addressed them in the latest PR. Feel free to open your PR against my branch if I am missing anything; I am happy with that, thanks!
It looks like I forgot to set the compression when writing the parquet file. That may be why the results don't show a performance improvement for the page cache.
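If compression is the issue, a minimal sketch of enabling it on the writer would be the following (the codec choice here is an assumption, not what the PR ended up using):

use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

/// Writer properties that compress data pages, so the read benchmark also
/// pays the decompression cost that real-world files usually incur.
fn writer_props_with_compression() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build()
}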
Still can't see a performance improvement; need to investigate...
critcmp better_decode main
group better_decode main
----- ------------- ----
arrow_reader_row_filter/filter: float64 <= 99.0 proj: all_columns/ 1.02 2.6±0.03ms ? ?/sec 1.00 2.5±0.20ms ? ?/sec
arrow_reader_row_filter/filter: float64 <= 99.0 proj: exclude_filter_column/ 1.10 2.5±0.17ms ? ?/sec 1.00 2.3±0.02ms ? ?/sec
arrow_reader_row_filter/filter: float64 > 99.0 AND ts >= 9000 proj: all_columns/ 1.06 2.3±0.03ms ? ?/sec 1.00 2.2±0.04ms ? ?/sec
arrow_reader_row_filter/filter: float64 > 99.0 AND ts >= 9000 proj: exclude_filter_column/ 1.09 2.3±0.03ms ? ?/sec 1.00 2.1±0.15ms ? ?/sec
arrow_reader_row_filter/filter: float64 > 99.0 proj: all_columns/ 1.02 2.6±0.03ms ? ?/sec 1.00 2.5±0.16ms ? ?/sec
arrow_reader_row_filter/filter: float64 > 99.0 proj: exclude_filter_column/ 1.14 2.6±0.27ms ? ?/sec 1.00 2.2±0.02ms ? ?/sec
arrow_reader_row_filter/filter: int64 == 9999 proj: all_columns/ 1.31 2.1±0.04ms ? ?/sec 1.00 1571.4±48.50µs ? ?/sec
arrow_reader_row_filter/filter: int64 == 9999 proj: exclude_filter_column/ 1.26 2.0±0.02ms ? ?/sec 1.00 1607.4±135.41µs ? ?/sec
arrow_reader_row_filter/filter: int64 > 90 proj: all_columns/ 1.06 5.1±0.24ms ? ?/sec 1.00 4.8±0.04ms ? ?/sec
arrow_reader_row_filter/filter: int64 > 90 proj: exclude_filter_column/ 1.05 4.4±0.04ms ? ?/sec 1.00 4.2±0.06ms ? ?/sec
arrow_reader_row_filter/filter: ts < 9000 proj: all_columns/ 1.07 2.8±0.18ms ? ?/sec 1.00 2.6±0.04ms ? ?/sec
arrow_reader_row_filter/filter: ts < 9000 proj: exclude_filter_column/ 1.05 2.6±0.03ms ? ?/sec 1.00 2.5±0.15ms ? ?/sec
arrow_reader_row_filter/filter: ts >= 9000 proj: all_columns/ 1.04 2.2±0.07ms ? ?/sec 1.00 2.1±0.02ms ? ?/sec
arrow_reader_row_filter/filter: ts >= 9000 proj: exclude_filter_column/ 1.07 2.2±0.11ms ? ?/sec 1.00 2.0±0.15ms ? ?/sec
arrow_reader_row_filter/filter: utf8View <> '' proj: all_columns/ 1.00 10.5±0.13ms ? ?/sec 1.03 10.8±0.43ms ? ?/sec
arrow_reader_row_filter/filter: utf8View <> '' proj: exclude_filter_column/ 1.03 8.1±0.31ms ? ?/sec 1.00 7.9±0.12ms ? ?/sec
Thanks @zhuqi-lucas -- I took the liberty of pushing several commits directly to this branch. I tried to keep them independent so you can see what I changed: use an in-memory buffer to avoid file IO during benchmarking, updated docs, and some refactoring. I also added benchmarks for the sync reader as well.
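A sketch of the in-memory-buffer approach mentioned above (assuming the bytes crate; error handling kept minimal, and the function name is made up):

use arrow::array::RecordBatch;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;

/// Write the benchmark batch into an in-memory buffer instead of a temp file,
/// so the read benchmarks measure decoding rather than file IO.
fn write_to_memory(batch: &RecordBatch) -> Bytes {
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(batch).unwrap();
    writer.close().unwrap();
    Bytes::from(buf)
}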
I also verified the filter patterns like this: Patch
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 8bbe175daf..11ceaed569 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -976,7 +976,9 @@ pub(crate) fn evaluate_predicate(
input_selection: Option<RowSelection>,
predicate: &mut dyn ArrowPredicate,
) -> Result<RowSelection> {
+ println!("Evaluating predicate, batch_size: {batch_size}, input_selection: {:?}", input_selection);
let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
+ let mut total_input_rows = 0;
let mut filters = vec![];
for maybe_batch in reader {
let maybe_batch = maybe_batch?;
@@ -993,9 +995,15 @@ pub(crate) fn evaluate_predicate(
0 => filters.push(filter),
_ => filters.push(prep_null_mask_filter(&filter)),
};
+ total_input_rows += input_rows;
}
let raw = RowSelection::from_filters(&filters);
+ let selected_rows = raw.row_count();
+ let num_selections = raw.iter().count();
+ let selectivity = 100.0* (selected_rows as f64 / total_input_rows as f64);
+ println!(" Selected {selected_rows} rows in {num_selections} selections ({selectivity:.3}%)", );
+ println!(" RowSelection: {}", raw);
Ok(match input_selection {
Some(selection) => selection.and_then(&raw),
None => raw,
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index c53d47be2e..475b06315d 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -19,6 +19,7 @@ use arrow_array::{Array, BooleanArray};
use arrow_select::filter::SlicesIterator;
use std::cmp::Ordering;
use std::collections::VecDeque;
+use std::fmt::{Display, Formatter};
use std::ops::Range;
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
@@ -32,6 +33,16 @@ pub struct RowSelector {
pub skip: bool,
}
+impl Display for RowSelector {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ if self.skip {
+ write!(f, "skip({})", self.row_count)
+ } else {
+ write!(f, "select({})", self.row_count)
+ }
+ }
+}
+
impl RowSelector {
/// Select `row_count` rows
pub fn select(row_count: usize) -> Self {
@@ -101,6 +112,22 @@ pub struct RowSelection {
selectors: Vec<RowSelector>,
}
+/// Prints a human understandable representation of the RowSelection
+impl Display for RowSelection {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "[")?;
+ let mut selectors = self.selectors.iter();
+
+ if let Some(first) = selectors.next() {
+ write!(f, "{}", first)?;
+ for selector in selectors {
+ write!(f, " {}", selector)?;
+ }
+ }
+ write!(f, "]")
+ }
+}
+
impl RowSelection {
/// Creates a [`RowSelection`] from a slice of [`BooleanArray`]
    ///
Thank you so much @zhuqi-lucas -- I verified that the selection patterns match what is in the design, and I think that between us this benchmark is looking quite good!
Onwards!
And still no performance improvement comparing the page cache PR to the main branch. I am confused why the datafusion benchmark improves but the benchmark here does not show a performance improvement for the page cache branch.
I agree -- let's figure it out.
This is great work, thank you @zhuqi-lucas
I plan to take a closer look at this as well. Sorry I was occupied by other stuff recently.
Thank you @alamb @XiangpengHao for double checking!
Which issue does this PR close?
Add benchmark for parquet reader with row_filter and project settings
Rationale for this change
Add benchmark for parquet reader with row_filter and project settings
See details:
#7363 (comment)
What changes are included in this PR?
Add benchmark for parquet reader with row_filter and project settings
Are there any user-facing changes?
No