refactor filter pushdown APIs #16642
Conversation
Force-pushed from 5c6c119 to 2b0c438
Thank you @adriangb -- this seems like a clear improvement to me from an API perspective: it allows more features with less code 👍. Thank you for driving this along.
cc @ozankabak and @berkaysynnada in case you would like to review the changes to the filter APIs
```rust
return Ok(FilterDescription::new_with_child_count(1)
    .all_parent_filters_supported(parent_filters)
    .with_self_filter(filter));
```
I agree this looks much better
Great job on the refactor!
Left a few comments for your consideration.
```rust
let child_column_names: HashSet<&str> = child_schema
    .fields()
    .iter()
    .map(|f| f.name().as_str())
    .collect();
```
We could be redundantly rebuilding the entire HashSet of column names for the exact same ExecutionPlan node multiple times during the optimization process. Here's a scenario to illustrate:

Imagine a complex query plan. A specific node, let's say `ParquetScan(file1.parquet)`, exists as a shared reference (`Arc`) and might be evaluated in different contexts as the optimizer walks the plan tree.

- First encounter: the optimizer analyzes a FilterNode that has `ParquetScan(file1.parquet)` as a child. It calls `ChildFilterDescription::from_child`, which iterates through the schema of the parquet scan and builds a HashSet of its column names for the first time.
- Second encounter: later in the same optimization pass, another node (perhaps a JoinNode) also needs to analyze what can be pushed down to that very same `ParquetScan(file1.parquet)` instance. Without caching, `from_child` would be called again for the same ParquetScan node and would rebuild the exact same HashSet of column names from scratch.

Perhaps consider implementing a caching mechanism? A rough sketch follows.
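For illustration, a minimal sketch of such a cache, keyed by the shared node's `Arc` pointer address. All names here (`ColumnNameCache`, `get_or_build`) are hypothetical, not part of the DataFusion API:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Hypothetical per-pass cache: the column-name set for a shared plan
/// node is computed once and reused on every later encounter.
#[derive(Default)]
struct ColumnNameCache {
    by_node: HashMap<usize, Arc<HashSet<String>>>,
}

impl ColumnNameCache {
    /// `node_addr` identifies the shared node (e.g. its `Arc` pointer
    /// address); `build` constructs the name set only on a cache miss.
    fn get_or_build(
        &mut self,
        node_addr: usize,
        build: impl FnOnce() -> HashSet<String>,
    ) -> Arc<HashSet<String>> {
        Arc::clone(
            self.by_node
                .entry(node_addr)
                .or_insert_with(|| Arc::new(build())),
        )
    }
}
```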
I agree that performance of optimizer rules and planning is a concern but I think that needs to be solved at a higher level (e.g. caching of plan trees or subtrees).
```diff
 /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
-pub fn with_filters(filters: PredicateSupports) -> Self {
+pub fn with_filters(filters: Vec<PredicateSupport>) -> Self {
```
The old `.unsupported(...)` helper was very explicit. Now, to produce an "all unsupported" result we must write:

```rust
FilterPushdownPropagation::with_filters(
    filters.into_iter().map(PredicateSupport::Unsupported).collect()
)
```

While this is idiomatic Rust, it forces the developer to think about the mechanism (map, collect) rather than the intent ("all of these filters were rejected").

How about adding a helper function that makes the intent crystal clear at the call site?

```rust
FilterPushdownPropagation::all_rejected(filters)
```
This is better because:
* It's self-documenting. The name of the function tells you exactly what's happening.
* It's higher-level. It allows developers to work at the level of "what they want to do" rather than "how to do it."
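For illustration, a sketch of what the suggested helper might look like; the generic parameter and the `Arc<dyn PhysicalExpr>` element type are assumptions based on the surrounding diff, not the actual definition:

```rust
use std::sync::Arc;

use datafusion_physical_expr::PhysicalExpr;

impl<T> FilterPushdownPropagation<T> {
    /// Hypothetical helper: mark every incoming parent filter as rejected.
    pub fn all_rejected(filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        Self::with_filters(
            filters
                .into_iter()
                .map(PredicateSupport::Unsupported)
                .collect(),
        )
    }
}
```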
On balance we had way too many methods before. I think I'd rather go with fewer methods for now, and then we can have focused PRs in the future to add helper methods where it makes sense.
This is also mostly used in default implementations; I think it's unlikely actual implementers will write code like this (if you're implementing these methods you probably want to allow some or all filters through, hence `FilterPushdownPropagation::transparent`).
I see your reasons for this and do not disagree.
```rust
for _child in self.children() {
    let child_filters = parent_filters
        .iter()
        .map(|f| PredicateSupport::Unsupported(Arc::clone(f)))
        .collect();
    desc = desc.with_child(ChildFilterDescription {
        parent_filters: child_filters,
        self_filters: vec![],
    });
}
```
Opportunity to reduce repetition (DRY). This loop is effectively a broadcast of "unsupported" to every child. That's exactly what `FilterDescription::from_children` could do if you passed a helper that always returns `Unsupported`; see the sketch below.
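As a sketch, the broadcast could live behind a single helper. The function name `all_unsupported` and the module path in the import are hypothetical; the types mirror the snippet above:

```rust
use std::sync::Arc;

// Assumed module path for the types shown in the diff above.
use datafusion::physical_plan::filter_pushdown::{
    ChildFilterDescription, FilterDescription, PredicateSupport,
};
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical helper: mark every parent filter as unsupported for
/// each of `num_children` children, mirroring the loop above.
fn all_unsupported(
    parent_filters: &[Arc<dyn PhysicalExpr>],
    num_children: usize,
) -> FilterDescription {
    let mut desc = FilterDescription::new_with_child_count(num_children);
    for _ in 0..num_children {
        desc = desc.with_child(ChildFilterDescription {
            parent_filters: parent_filters
                .iter()
                .map(|f| PredicateSupport::Unsupported(Arc::clone(f)))
                .collect(),
            self_filters: vec![],
        });
    }
    desc
}
```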
Would you suggest adding an optional helper, or making a new method?
This loop is only happening in one place atm. I refactored it a bit to hoist code out of the loop. I think for things like this that are only used in one place we should err on the side of not adding new public APIs for a single call site, and wait until people are actually implementing these methods on their `ExecutionPlan`s and request helper methods.
👍
Looks like it has one more failure (and maybe we can merge up too).
…elpers to simplify APIs
Co-authored-by: Andrew Lamb <[email protected]>
Force-pushed from 1e49157 to 2f4817a
done!
Is this PR ready to merge?
Yes by me!
@kosiew are you happy with this PR too?
yep
Closes #16188.
As discussed in that issue one of the reasons to hold off on refactoring the APIs was waiting until they were used in more places so we could get a better picture of what was needed.
Working on #16445 necessitated adding new APIs which led me to wanting to do this refactor.
The refactor of the APIs focused on two key things:
1. Removing `PredicateSupports` in favor of `Vec<PredicateSupport>`. I think the fact that the PR is net negative LOC indicates removing these helpers and abstractions was worth it.
2. Consolidating `gather_filters_for_pushdown` into a single place, which simplifies away several APIs and lifts a lot of complexity from `ExecutionPlan` implementations into the pushdown module itself. Notably, instead of complex logic with projections and such, we have a simple two-step approach: (1) check if the filter only references columns in the child, and (2) reassign column indexes to match the child's schema using the existing utility function. A sketch of this two-step approach is shown below.

All of this makes implementing parent filter pushdown for joins only a couple of LOC (which I will do in a followup PR).
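For illustration, a hedged sketch of that two-step check. The function `filter_for_child` is hypothetical, and it assumes DataFusion's `collect_columns` and `reassign_predicate_columns` utilities behave as described; signatures are not verbatim PR code:

```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical: return the filter remapped to the child's schema if it
/// is pushable, or `None` if it references columns the child lacks.
fn filter_for_child(
    filter: &Arc<dyn PhysicalExpr>,
    child_schema: &SchemaRef,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    // Step 1: the filter is only pushable if every column it references
    // exists in the child's schema.
    let child_columns: HashSet<&str> = child_schema
        .fields()
        .iter()
        .map(|f| f.name().as_str())
        .collect();
    if !collect_columns(filter)
        .iter()
        .all(|col| child_columns.contains(col.name()))
    {
        return Ok(None);
    }
    // Step 2: reassign column indexes to match the child's schema.
    let remapped = reassign_predicate_columns(Arc::clone(filter), child_schema, false)?;
    Ok(Some(remapped))
}
```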