Add dynamic pruning filters from TopK state #15301
Conversation
Made a bit of progress on this... I think the general idea of sharing the state is there. The nice thing is that this mechanism can be used to push down other dynamic filters (joins, etc.). What I'm having trouble with a bit is the wiring... I need to think of:
Maybe that's the wrong approach... maybe this should be a method on
Inspired by discussion in #13054 I went with adding this to
Tomorrow I plan on doing some tracer bullet testing to see if this approach works at all.
cc @alamb
This is really cool! I have a high-level question:
I think this is just part of the picture. To fully match DuckDB we'd have to do something like the rewrite proposed in #15177 (comment), aka "late materialization" of the projection.
Ok did a tracer bullet test with
To expand on this: what I implemented here is just "dumb" filter pushdown. To make a query like
let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
    Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
});
// TODO: need to recalculate page and row group pruning predicates to include dynamic filters
@alamb marking this as ready for an initial review. There's still a lot of work to be done I guess (I'd like to see Q23 results!) but I'd like to get some feedback on the approach and missing pieces first. There's a working implementation, including a test; more are needed.
fn should_enable_page_index(
    enable_page_index: bool,
    page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
    has_dynamic_filters: bool,
) -> bool {
    // Use the page index when the static predicate can prune pages, or when
    // dynamic filters exist: their values only arrive at runtime, so the page
    // index must stay enabled for them to have a chance to prune.
    (enable_page_index
        && page_pruning_predicate
            .as_ref()
            .map(|p| p.filter_number() > 0)
            .unwrap_or(false))
        || has_dynamic_filters
}
Copied from #15057
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A comment would be helpful to explain why the presence of dynamic filters should trigger page index enablement.
let mut pruning_predicate = pruning_predicate;
let mut page_pruning_predicate = page_pruning_predicate;

if let Some(predicate) = predicate.as_ref() {
Similar to #15057
/// Build a page pruning predicate from a predicate expression.
pub(crate) fn build_page_pruning_predicate(
    predicate: &Arc<dyn PhysicalExpr>,
    file_schema: &SchemaRef,
) -> Arc<PagePruningAccessPlanFilter> {
    Arc::new(PagePruningAccessPlanFilter::new(
        predicate,
        Arc::clone(file_schema),
    ))
}
/// A visitor for a PhysicalExpr that collects all column references to determine which columns the expression needs in order to be evaluated.
struct FilterSchemaBuilder<'schema> {
    filter_schema_fields: BTreeSet<Arc<Field>>,
    file_schema: &'schema Schema,
    table_schema: &'schema Schema,
}
impl<'schema> FilterSchemaBuilder<'schema> {
    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
        Self {
            filter_schema_fields: BTreeSet::new(),
            file_schema,
            table_schema,
        }
    }

    fn sort_fields(
        fields: &mut Vec<Arc<Field>>,
        table_schema: &Schema,
        file_schema: &Schema,
    ) {
        fields.sort_by_key(|f| f.name().to_string());
        fields.dedup_by_key(|f| f.name().to_string());
        fields.sort_by_key(|f| {
            let table_schema_index =
                table_schema.index_of(f.name()).unwrap_or(usize::MAX);
            let file_schema_index = file_schema.index_of(f.name()).unwrap_or(usize::MAX);
            (table_schema_index, file_schema_index)
        });
    }

    fn build(self) -> SchemaRef {
        let mut fields = self.filter_schema_fields.into_iter().collect::<Vec<_>>();
        FilterSchemaBuilder::sort_fields(
            &mut fields,
            self.table_schema,
            self.file_schema,
        );
        Arc::new(Schema::new(fields))
    }
}
impl<'node> TreeNodeVisitor<'node> for FilterSchemaBuilder<'_> {
    type Node = Arc<dyn PhysicalExpr>;

    fn f_down(
        &mut self,
        node: &'node Arc<dyn PhysicalExpr>,
    ) -> Result<TreeNodeRecursion> {
        if let Some(column) = node.as_any().downcast_ref::<Column>() {
            if let Ok(field) = self.table_schema.field_with_name(column.name()) {
                self.filter_schema_fields.insert(Arc::new(field.clone()));
            } else if let Ok(field) = self.file_schema.field_with_name(column.name()) {
                self.filter_schema_fields.insert(Arc::new(field.clone()));
            } else {
                // valid fields are the table schema's fields + the file schema's fields,
                // preferring the table schema's fields when there is a conflict
                let mut valid_fields = self
                    .table_schema
                    .fields()
                    .iter()
                    .chain(self.file_schema.fields().iter())
                    .cloned()
                    .collect::<Vec<_>>();
                FilterSchemaBuilder::sort_fields(
                    &mut valid_fields,
                    self.table_schema,
                    self.file_schema,
                );
                let valid_fields = valid_fields
                    .into_iter()
                    .map(|f| datafusion_common::Column::new_unqualified(f.name()))
                    .collect();
                let field = datafusion_common::Column::new_unqualified(column.name());
                return Err(datafusion_common::DataFusionError::SchemaError(
                    SchemaError::FieldNotFound {
                        field: Box::new(field),
                        valid_fields,
                    },
                    Box::new(None),
                ));
            }
        }

        Ok(TreeNodeRecursion::Continue)
    }
}
Copied from #15057
The point I'm trying to make is that I think this is a useful change, and that the real diff (here or there) will be smaller once the other is merged.
fn supports_dynamic_filter_pushdown(&self) -> bool {
    true
}
I guess I could nix this method and just treat an Ok(None) from push_down_dynamic_filter as not supported.
I agree -- that would be a nicer interface -- or return a specific Enum perhaps 🤔
Okay will refactor to do this instead 👍🏻
    lit(threshold.value.clone()),
));

// TODO: handle nulls first/last?
!!
Not sure exactly how to translate that into a dynamic filter...
This transformation might work:
- For nulls-first => (threshold.value is not null) and (threshold.expr is null or comparison)
- For nulls-last => comparison (the comparison itself implies threshold.expr is not null)
That said, if we go with this approach, the following part might no longer be needed:
// Skip null threshold values - can't create a meaningful filter
if threshold.value.is_null() {
    continue;
}
WDYT :)
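To make that concrete, here is a minimal sketch of the suggested transformation as a physical-expression builder. It is illustrative only: `threshold_filter` is a hypothetical helper, and it handles a single sort key under the stated nulls-first/nulls-last rules.

```rust
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{lit, BinaryExpr, IsNullExpr};
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical helper: build the TopK filter for one sort key from the
/// current heap threshold, following the transformation suggested above.
fn threshold_filter(
    expr: Arc<dyn PhysicalExpr>,
    value: ScalarValue,
    descending: bool,
    nulls_first: bool,
) -> Arc<dyn PhysicalExpr> {
    let op = if descending { Operator::Gt } else { Operator::Lt };
    // NULL <op> anything evaluates to NULL, which filters treat as false,
    // so `comparison` already implies `expr IS NOT NULL`.
    let comparison: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(Arc::clone(&expr), op, lit(value.clone())));
    if nulls_first {
        // NULLs sort before any threshold, so they must be kept:
        //   (threshold.value IS NOT NULL) AND (expr IS NULL OR comparison)
        let null_or_cmp: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(IsNullExpr::new(expr)),
            Operator::Or,
            comparison,
        ));
        // `value` is known when the filter is built, so the first term
        // folds to a constant boolean literal.
        Arc::new(BinaryExpr::new(
            lit(!value.is_null()),
            Operator::And,
            null_or_cmp,
        ))
    } else {
        comparison
    }
}
```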
Hurts my brain a little bit but sounds good to me as long as we add good tests 😄
Would you like to make a PR to this PR to add that?
Sure! I'd be happy to give it a try.
let sql = format!("explain analyze {query}");
let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap();
let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap());
assert_contains!(explain_plan, "row_groups_pruned_statistics=96");
Proof this works!
Should we also add tests for:
- Negative scenarios where dynamic filter pushdown should not be applied.
- Edge cases such as empty datasets or cases with only dynamic filters without a static predicate.
- Verification that the combined predicate (static AND dynamic) behaves as expected in different configurations.
Yes! More tests! I just tried this in my full system and found a bug w/ hive partition columns. Making a note to add a test and fix.
I ran this against Q23, results look promising! Elapsed 3.173 seconds with
Nice! I plan to check this PR out tomorrow carefully
Great work!
A lot better than my earlier tinkering effort at this.
I left some comments for your consideration.
// Collect dynamic_filters into a single predicate by reducing with AND
let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
    Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
The dynamic filters are reduced using the AND operator when combined with the static predicate. It would help to document the reasoning behind this choice and any assumptions about how these predicates interact.
E.g., using the AND operator to combine the dynamic filters with the static predicate means that a row (or a row group) must satisfy both conditions before it's read from disk.
The approach assumes that the static predicate and dynamic filters are independent and complementary. In other words, the dynamic filters are not meant to replace or override the original predicate; they refine the set of rows even further. If they were combined using OR, you might end up with more rows than necessary, which would negate the benefits of dynamic filtering.
Since the dynamic filters are calculated at runtime, they might sometimes be conservative estimates. By combining them with AND, the system errs on the side of safety: it only excludes data when it's reasonably certain that the rows won't match the overall query conditions.
let mut heap = self.heap.try_write().map_err(|_| {
    DataFusionError::Internal(
        "Failed to acquire write lock on TopK heap".to_string(),
The use of try_write() here immediately returns an error if the lock cannot be acquired: if another thread holds the lock, even for a brief moment, the current thread errors out and converts the failure into an internal error. In the blocking version at line 235 (self.heap.write().map_err(...)), a failure due to a poisoned lock is likewise immediately converted into an internal error.
Converting lock acquisition failures into an internal error is a straightforward approach. It flags an unexpected situation but doesn't provide details on the nature of the contention or any attempt to recover from transient lock conflicts.
Should we consider retrying on transient contention, or adding logging for diagnostic information?
I think a poisoned lock is likely not a scenario that happens often in practice, so it is not something that needs a lot of special handling.
Side issue here: is there a "policy" about using std locks vs parking_lot? A basic search showed they are used roughly equally, but it seems like a weird inconsistency. (There's also this ~3-year-old PR by @xudong963.)
datafusion/datasource/src/file.rs

fn supports_dynamic_filter_pushdown(&self) -> bool {
    false
}

fn push_down_dynamic_filter(
    &self,
    _dynamic_filter: Arc<dyn DynamicFilterSource>,
) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
    Ok(None)
}
This PR adds support for dynamic filter pushdown in multiple modules (e.g., in FileSource, DataSource, ProjectionExec, RepartitionExec, FilterExec, and SortExec).
Common helper functions or traits could reduce code duplication.
For example, a shared trait for dynamic filter pushdown behavior might centralize the logic and reduce maintenance overhead.
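One possible shape for such a trait; purely a sketch, with hypothetical names that are not part of this PR:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical shared trait: each node type implements one method instead
/// of duplicating a supports/push-down pair in every module.
pub trait DynamicFilterPushdown: Sized {
    /// Try to absorb `filter`, returning an updated copy of `self`, or
    /// `None` if this node cannot make use of the filter.
    fn push_down_dynamic_filter(
        &self,
        filter: Arc<dyn PhysicalExpr>,
    ) -> Result<Option<Self>>;
}
```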
Could you share the benchmark script? I tried but the results are similar, perhaps I missed some configurations.
@adriangb looks super promising, especially as it paves the way for general dynamic filtering! I tested out your branch to see the overlap with #15529, and I had trouble understanding exactly when and how the filters got applied. I dumped the execution plan after running a topk query and the filters were way broader than what I expected. Would it be possible to add an end-to-end test as a "demo" of the feature, especially validating the final filters?
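One possible shape for such an end-to-end "demo" test, mirroring the explain assertions used elsewhere in this PR; the table name, file path, and sort column are illustrative assumptions:

```rust
use arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_common::assert_contains;

#[tokio::test]
async fn topk_dynamic_filter_is_visible_in_plan() {
    let ctx = SessionContext::new();
    // Assumes a parquet file at this path with a sortable column `a`.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await
        .unwrap();
    let batches = ctx
        .sql("EXPLAIN SELECT * FROM t ORDER BY a DESC LIMIT 10")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    let plan = format!("{}", pretty_format_batches(&batches).unwrap());
    // The pushed-down dynamic filter should appear on the scan's predicate.
    assert_contains!(plan, "DynamicFilterPhysicalExpr");
}
```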
datafusion/core/tests/parquet/mod.rs
@@ -1072,3 +1080,364 @@ async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTe
    writer.close().unwrap();
    output_file
}

struct DynamicFilterTestCase {
Would it make sense to move this test block to a dedicated file? The parquet/mod.rs is already pretty long.
Yep, moved into parquet/filter_pushdown.rs, which is both smaller and more related.
Great to see the progress! I’ve left a few suggestions that might help improve the code’s clarity—especially around parts that weren’t immediately obvious to me on the first read.
Ok(Box::pin(RecordBatchStreamAdapter::new(
    self.schema(),
    futures::stream::once(async move {
        while let Some(batch) = input.next().await {
            let batch = batch?;
            topk.insert_batch(batch)?;
            if enable_dynamic_filter_pushdown {
I think the code to update the current executor's dynamic filter can be extracted to a separate function like self.maybe_update_dynamic_filter(...)
Perhaps as a follow-up PR to make this PR easier to merge.
- #[derive(Clone, Default)]
+ #[derive(Clone, Default, Debug)]
This was just an annoyance during debugging. Can revert.
Looks good to me -- we can also make a separate PR for it too (not needed)
filters.as_ref(),
None,
We kind of need to do this, otherwise you end up with duplicate filters: ListingTable says inexact -> a FilterExec gets created -> we then push down from the FilterExec into the DataSourceExec that already had the filter -> duplicate filter.
Because essentially this PR is having to introduce a generalized way to do filter pushdown instead of the very specific way that ListingTable does it. And we wouldn't want to do both at the same time. What we want is for ListingTable to tell us:
- Which filters it can apply just from partitioning (Exact)
- Any other filter becomes Inexact (see the sketch below)
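A minimal sketch of that split using the existing `TableProvider::supports_filters_pushdown` shape; `is_partition_filter` is a hypothetical stand-in for ListingTable's partition-column check:

```rust
use datafusion_common::Result;
use datafusion_expr::{Expr, TableProviderFilterPushDown};

// Hypothetical helper: true if `filter` references only partition columns.
fn is_partition_filter(_filter: &Expr) -> bool {
    false // stand-in; the real check would inspect the filter's columns
}

// Sketch: Exact for partition-only filters, Inexact for everything else.
fn supports_filters_pushdown(
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(filters
        .iter()
        .map(|f| {
            if is_partition_filter(f) {
                TableProviderFilterPushDown::Exact
            } else {
                TableProviderFilterPushDown::Inexact
            }
        })
        .collect())
}
```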
I wonder if we can split out the part of this PR that changes how the filters are pushed down (aka pruning predicate per file rather than one overall) as a separate PR to isolate the changes into smaller PRs?
@@ -498,6 +489,7 @@ impl FileSource for ParquetSource {
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_stats_pruning: self.table_parquet_options.global.pruning,
I had to add this for the case where this is disabled (false): otherwise, if we push down filters, they end up getting used for row group pruning, which differs from our current behavior.
Starting to check this one out again.
@alamb check out ecc89f9#diff-05ace4c36d20453103f49749bad98864aea48680b0a4d5691d7ba5185d8ae4c9, I added a lot of docs / comments
/// ```text
// ┌──────────────────────┐
// │ CoalesceBatchesExec  │
// └──────────────────────┘
//            │
//            ▼
// ┌──────────────────────┐
// │     FilterExec       │
// │      filters =       │
// │   [cost>50,id=1]     │
// └──────────────────────┘
//            │
//            ▼
// ┌──────────────────────┐
// │    ProjectionExec    │
// │  cost = price * 1.2  │
// └──────────────────────┘
//            │
//            ▼
// ┌──────────────────────┐
// │    DataSourceExec    │
// │   projection = *     │
// └──────────────────────┘
/// ```
I think this is an interesting example.
It made me realize we need to expand PhysicalExpr::supports_filter_pushdown(&self) -> bool to PhysicalExpr::supports_filter_pushdown(&self, filters: &[&Arc<dyn PhysicalExpr>]) -> Vec<FilterPushdownSupport> or similar, so that ProjectionExec can check which filters reference the columns it is creating and block those but allow others.
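A hedged sketch of that expanded API; `FilterPushdownSupport` and the trait wrapper are assumed names, not code from this PR:

```rust
use std::sync::Arc;

use datafusion_physical_expr::PhysicalExpr;

/// Assumed enum: a per-filter verdict instead of a single bool.
pub enum FilterPushdownSupport {
    /// Safe to push below this node.
    Supported,
    /// Blocked, e.g. the filter references a column this node computes.
    Unsupported,
}

/// Assumed trait carrying the suggested method shape.
pub trait SupportsFilterPushdown {
    fn supports_filter_pushdown(
        &self,
        filters: &[&Arc<dyn PhysicalExpr>],
    ) -> Vec<FilterPushdownSupport>;
}
```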
First of all, thank you so much @adriangb @AdamGS @ctsk @YjyJeff @2010YOUY01 @kosiew @suibianwanwank @geoffreyclaude and @berkaysynnada -- this is a pretty amazing piece of optimization and technology and a great team effort.
In my opinion this is a very important feature and the structure in this PR is a great foundation for more general dynamic filtering as @geoffreyclaude says.
Suggested next steps
Since it is looking good, we should start working on merging it. However, as you have said, given the size of this PR I think it might be easier to do so if we break it into pieces.
I suggest we first make a PR for adding the physical filter pushdown (and associated ExecutionPlan methods) in datafusion/physical-optimizer/src/filter_pushdown.rs
BTW
It is quite cool to see a dynamic filter in this explain plan (predicate=DynamicFilterPhysicalExpr [ SortDynamicFilterSource[ ] ])
> explain format indent select * from hits ORDER BY "EventTime" DESC limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: hits.EventTime DESC NULLS FIRST, fetch=10 |
| | TableScan: hits projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID] |
| physical_plan | SortPreservingMergeExec: [EventTime@4 DESC], fetch=10 |
| | SortExec: TopK(fetch=10), expr=[EventTime@4 DESC], preserve_partitioning=[true] |
| | DataSourceExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/hits/hits.parquet:0..923748528], [Users/andrewlamb/Downloads/hits/hits.parquet:923748528..1847497056], [Users/andrewlamb/Downloads/hits/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Downloads/hits/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Downloads/hits/hits.parquet:3694994112..4618742640], ...]}, projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ SortDynamicFilterSource[ ] ] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.059 seconds.
@@ -349,11 +337,13 @@ impl ParquetSource {
}

/// Optional reference to this parquet scan's pruning predicate
#[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
    self.pruning_predicate.as_ref()
For anyone following along
I am trying to write some tests for filter_pushdown now, btw, as a way to help make that a separate PR.
If you like this pattern I recommend:
/// There are two pushdowns we can do here:
/// 1. Push down the `[d.size > 100]` filter through the `HashJoinExec` node to the `DataSourceExec` node for the `departments` table.
/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
///    rows from teh `users` table that will be eliminated by the join.
- /// rows from teh `users` table that will be eliminated by the join.
+ /// rows from the `users` table that will be eliminated by the join.
input: Arc::clone(&self.input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
cache: self.cache.clone(),
If predicate is updated, should we update the cache too?
    }
}

impl PhysicalExpr for DynamicFilterPhysicalExpr {
PhysicalExpr is usually in the physical-expr crate.
/// Sort expressions
expr: LexOrdering,
/// Current threshold values
thresholds: Arc<RwLock<Vec<Option<ScalarValue>>>>,
Do we need Arc<RwLock<T>>? I think we will only have a single instance, so it is safe to update the values.
pub fn update_values(&self, new_values: &[ScalarValue]) -> Result<()> {
    let replace = {
        let thresholds = self.thresholds.read().map_err(|_| {
            datafusion_common::DataFusionError::Execution(
exec_err!()
    None => Some(predicate),
    Some(acc) => Some(Arc::new(BinaryExpr::new(acc, Operator::And, predicate))),
})
.unwrap_or_else(|| crate::expressions::lit(true))
I think we can check the len of predicates before calling this function, so we don't need to create lit(true) for this case.
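A minimal sketch of that guard (the function name is illustrative): returning Option pushes the decision to the caller, so the lit(true) fallback is never materialized:

```rust
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::PhysicalExpr;

/// Combine predicates with AND, or return `None` when the list is empty,
/// so callers never need a `lit(true)` placeholder.
fn combine_predicates(
    predicates: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    predicates
        .into_iter()
        .reduce(|acc, p| Arc::new(BinaryExpr::new(acc, Operator::And, p)))
}
```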
///
/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs for examples.
pub trait DynamicFilterSource:
    Send + Sync + std::fmt::Debug + DynEq + DynHash + Display + 'static
Do we need 'static?
fn push_down_filters(
    &self,
    filters: &[Arc<dyn PhysicalExpr>],
- filters: &[Arc<dyn PhysicalExpr>],
+ filters: &[PhysicalExprRef],
    filters: &[Arc<dyn PhysicalExpr>],
) -> Result<Option<DataSourceFilterPushdownResult>> {
    if let Some(file_source_result) = self.file_source.push_down_filters(filters)? {
        let mut new_self = self.clone();
We could try to avoid clone if possible
fn push_down_filters(
self: Arc<Self>,
filters: &[Arc<dyn PhysicalExpr>],
) -> Result<Option<DataSourceFilterPushdownResult>> {
if let Some(file_source_result) = self.file_source.push_down_filters(filters)? {
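// NB: `Arc::into_inner` returns `None` when other `Arc`s to `self` still
// exist, so this `unwrap` assumes the optimizer holds the only reference.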
let mut inner = Arc::into_inner(self).unwrap();
inner.file_source = file_source_result.inner;
Ok(Some(DataSourceFilterPushdownResult {
inner: Arc::new(inner) as Arc<dyn DataSource>,
support: file_source_result.support,
}))
} else {
Ok(None)
}
}
I'm closing this now-massive PR in favor of splitting it up into units of work; see #15512 (comment). Thank you all for the amazing reviews! Let's continue work in the smaller PRs so that it's more tractable and easier to review / diff.
let mut filters: Vec<Arc<dyn PhysicalExpr>> =
    Vec::with_capacity(thresholds.len());

let mut prev_sort_expr: Option<Arc<dyn PhysicalExpr>> = None;
I think this can be written with fewer expressions, like the following:
col0 < threshold0 OR (col0 = threshold0 AND (col1 < threshold1 OR (...)))
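A minimal sketch of building that nested form by folding from the last sort key back to the first (the function name is illustrative, and descending keys / null ordering are ignored here):

```rust
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;

/// Build `col0 < t0 OR (col0 = t0 AND (col1 < t1 OR (...)))`.
fn lex_threshold_filter(
    cols: &[Arc<dyn PhysicalExpr>],
    thresholds: &[ScalarValue],
) -> Option<Arc<dyn PhysicalExpr>> {
    cols.iter().zip(thresholds).rev().fold(None, |rest, (col, t)| {
        let lt: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::clone(col),
            Operator::Lt,
            lit(t.clone()),
        ));
        Some(match rest {
            // Innermost (last) sort key: just `col < t`.
            None => lt,
            // Outer keys: `col < t OR (col = t AND rest)`.
            Some(rest) => {
                let eq: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
                    Arc::clone(col),
                    Operator::Eq,
                    lit(t.clone()),
                ));
                let tie: Arc<dyn PhysicalExpr> =
                    Arc::new(BinaryExpr::new(eq, Operator::And, rest));
                Arc::new(BinaryExpr::new(lt, Operator::Or, tie))
            }
        })
    })
}
```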
Closes #15037 (dynamic pruning filters for ORDER BY ... LIMIT queries).
This introduces a general mechanism for arbitrary ExecutionPlans to push down filters to their children. This mechanism can be used in the future to tackle #7955. This PR only implements this for the TopK operator.
If filter pushdown on parquet is turned on, this is showing a ~3x performance improvement for Q23, overall 10x faster than main.
And I believe there is more juice to squeeze, e.g. via SortPreservingMergeExec, which will enable "global" pushdown, helping plans with many partitions.