-
Notifications
You must be signed in to change notification settings - Fork 1.5k
POC: Test DataFusion with experimental Parquet Filter Pushdown (try 4) #16711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
🤖 |
I'm taking a look at the test failures.. |
Thank you @alamb @XiangpengHao , I believe we need to compare the performance apache/arrow-rs#7850 with #16690 instead of the main branch. Because #16690 will gain improvement from apache/arrow-rs#7802. So we may:
Or we can directly use script to compare two branch? |
I believe the bugs are fixed in 2cf1a8f82f722e1c7e4857d7b07ba726f67d9f2f Can you @alamb point to that commit and try the benchmark again? I believe some tests will still fail but because we make filter pushdown to be true by default, which breaks some testing assumptions. |
🤖 |
🤖: Benchmark completed Details
|
🤖 |
🤖: Benchmark completed Details
|
My analysis of these results are very consistent with my last attempt at caching filter results The biggest slow downs are in Q30, Q31
I am fairly sure this is due to the overhad of RowSelection (these queries select many small selections). I started analyzing them here: #16562 (comment) So TLDR is I think the caching approach is good. but to avoid some queries getting slower we will need to improve the RowSelection representation too. I will try and think about this / whip up some POC hopefully over the next few days |
The adaptive selection will help Q30 and Q31 from previous PR result: |
@zhuqi-lucas my largest concern about all the previous adaptive row selection work was the software engineering / keeping the complexity under control. I have been dreaming about this -- I am thinking it might be time to create an internal RowSelection representation like enum InternalRowSelector {
/// Skip the next n rows
Skip(usize),
/// Decode and output the next n rows
Decode(usize),
/// Decode the next filter.len() rows and apply the filter before outputting the rows
DecodeAndFilter(Arc<BooleanArray>),
} Then the actual RecordReader would take a Vec` or something To stage / keep things of reasonable complexity we could make one PR that refactors to enum InternalRowSelector {
/// Skip the next n rows
Skip(usize),
/// Decode and output the next n rows
Decode(usize),
// NO DecodeAndFilter variant
} And then add the DecodeAndFilter as a follow on PR Maybe I'll try and make a PR |
One thing that I think has caused us problems is judging any improvements to pushdown based on not regressing performance when pushdown is enabled vs not. However, this makes making incremental progress really hard. What I think we should start doing is compare any proposed improvements to pushdown when pushdown is already on. In other words, let's make a benchmark that already has filter pushdown on. I'll make a PR for this new benchmark later today |
Thank you @alamb , i agree, even the adaptive selection ratio is experimented by random selection in my PR, i can't find a good way to do it clearly. |
🤖 |
Great point @alamb , i totally agree this, we can first improve the pushdown itself instead of comparing to not pushdown because we currently not default pushdown until now. I guess this PR and #16562 will improve it. |
I actually found a seemingly good one here: https://github.com/apache/datafusion/blob/3ca09a642dac266dfdbf7f57d2a5af82a9c77436/benchmarks/bench.sh#L117-L116 bench.sh run parquet I started it running and will see what happens. I need to do some other non parquet stuff for a few hours. Will be back |
🤖: Benchmark completed Details
|
🤖 |
🤖: Benchmark completed Details
|
Ok, I made a PR to add a filter_pushdown benchmark. Once I get that merged I can test this branch using that.
|
I ran the new clickbench_pushdown benchmark and TLDR is the new pushdown decoder look like they make a measurable difference 🎉 Thus I think we should proceed trying to get apache/arrow-rs#7850 merged. My next step will be to analyze some of the queries that seem to have gotten slwer (like Q19) and see if I can reproduce it / find any thing to improve
|
Good news! |
Great! I plan to take another look in a few days (being occupied by other stuff recently), I have a few ideas to improve it more |
Great -- I will also try and look at it more closely later today / tomorrow when I have some time on airplanes |
18763fc
to
a13fd45
Compare
🤖 |
🤖: Benchmark completed Details
|
I looked into this failure running clickbench:
I ran the ]s$ ~/Downloads/datafusion-cli-alamb_test_pushdown -f ~/Software/datafusion/benchmarks/queries/clickbench/queries/q27.sql
DataFusion CLI v48.0.0
thread 'tokio-runtime-worker' panicked at /Users/andrewlamb/.cargo/git/checkouts/arrow-rs-583cca34693b79b8/4d24172/parquet/src/arrow/array_reader/cached_array_reader.rs:118:13:
assertion `left == right` failed
left: 319484
right: 319488
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Join Error
caused by
External error: task 441 panicked with message "assertion `left == right` failed\n left: 319484\n right: 319488" Update: I ran with a debug build and got this stack trace: RUST_BACKTRACE=1 cargo run -p datafusion-cli -- -f ~/Downloads/q27.sql
...
thread 'tokio-runtime-worker' panicked at /Users/andrewlamb/Software/arrow-rs/parquet/src/arrow/array_reader/cached_array_reader.rs:215:24:
attempt to subtract with overflow
stack backtrace:
0: __rustc::rust_begin_unwind
at /rustc/6b00bc3880198600130e1cf62b8f8a93494488cc/library/std/src/panicking.rs:697:5
1: core::panicking::panic_fmt
at /rustc/6b00bc3880198600130e1cf62b8f8a93494488cc/library/core/src/panicking.rs:75:14
2: core::panicking::panic_const::panic_const_sub_overflow
at /rustc/6b00bc3880198600130e1cf62b8f8a93494488cc/library/core/src/panicking.rs:175:17
3: <parquet::arrow::array_reader::cached_array_reader::CachedArrayReader as parquet::arrow::array_reader::ArrayReader>::read_records
at /Users/andrewlamb/Software/arrow-rs/parquet/src/arrow/array_reader/cached_array_reader.rs:215:24
4: <parquet::arrow::array_reader::struct_array::StructArrayReader as parquet::arrow::array_reader::ArrayReader>::read_records
at /Users/andrewlamb/Software/arrow-rs/parquet/src/arrow/array_reader/struct_array.rs:68:30
5: parquet::arrow::arrow_reader::ParquetRecordBatchReader::next_inner
at /Users/andrewlamb/Software/arrow-rs/parquet/src/arrow/arrow_reader/mod.rs:893:27
6: <parquet::arrow::arrow_reader::ParquetRecordBatchReader as core::iter::traits::iterator::Iterator>::next
at /Users/andrewlamb/Software/arrow-rs/parquet/src/arrow/arrow_reader/mod.rs:844:9
7: <parquet::arrow::async_reader::ParquetRecordBatchStream<T> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/Software/arrow-rs/parquet/src/arrow/async_reader/mod.rs:872:62
8: <S as futures_core::stream::TryStream>::try_poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:206:9
9: <futures_util::stream::try_stream::into_stream::IntoStream<St> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/try_stream/into_stream.rs:38:9
10: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/stream/map.rs:58:26
11: <futures_util::stream::try_stream::MapErr<St,F> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/lib.rs:97:13
12: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/stream/map.rs:58:26
13: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:130:9
14: futures_util::stream::stream::StreamExt::poll_next_unpin
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/stream/mod.rs:1638:9
15: datafusion_datasource::file_stream::FileStream::poll_inner
at ./datafusion/datasource/src/file_stream.rs:221:34
16: <datafusion_datasource::file_stream::FileStream as futures_core::stream::Stream>::poll_next
at ./datafusion/datasource/src/file_stream.rs:334:22
17: futures_util::stream::stream::StreamExt::poll_next_unpin
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/stream/mod.rs:1638:9
18: <datafusion_physical_plan::coop::CooperativeStream<T> as futures_core::stream::Stream>::poll_next
at ./datafusion/physical-plan/src/coop.rs:160:25
19: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:130:9
20: futures_util::stream::stream::StreamExt::poll_next_unpin
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/stream/mod.rs:1638:9
21: <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next
at ./datafusion/physical-plan/src/aggregates/row_hash.rs:655:34
22: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-core-0.3.31/src/stream.rs:130:9
23: futures_util::stream::stream::StreamExt::poll_next_unpin
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/stream/mod.rs:1638:9
24: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
at /Users/andrewlamb/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.31/src/stream/stream/next.rs:32:9
25: datafusion_physical_plan::repartition::RepartitionExec::pull_from_input::{{closure}}
at ./datafusion/physical-plan/src/repartition/mod.rs:939:40
26: datafusion_common_runtime::trace_utils::trace_future::{{closure}}
at ./datafusion/common-runtime/src/trace_utils.rs:137:29
27: <core::pin::Pin<P> as core::future::future::Future>::poll
at /Users/andrewlamb/.rustup/toolchains/1.88.0-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/future.rs:124:9
... |
Which issue does this PR close?
related to
filter_pushdown
) by default #3463This is a PR to test the next generation parquet pushdown:
Builds on
It forces filter_pushdown on by default and pins to