Commit 1d8df4f

[SPARK-45606][SQL] Release restrictions on multi-layer runtime filter
### What changes were proposed in this pull request?
Before #39170, Spark only supported inserting a runtime filter on the application side of a shuffle join, on a single layer. Because it was not considered worthwhile to insert another runtime filter on a column that already had one, Spark restricted it at
https://github.com/apache/spark/blob/7057952f6bc2c5cf97dd408effd1b18bee1cb8f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala#L346

For example:
`select * from bf1 join bf2 on bf1.c1 = bf2.c2 and bf1.c1 = bf2.b2 where bf2.a2 = 62`
This SQL has two join conditions. Without the restriction mentioned above, two runtime filters would be inserted on `bf1.c1`. At that time, the restriction was reasonable.

After #39170, Spark supports inserting a runtime filter on one side of any shuffle join, across multiple layers. For multi-layer runtime filters, the restriction mentioned above now looks outdated.

For example:
`select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5`
Assume bf2 is the build side and a runtime filter is inserted for bf1. We can't insert the same runtime filter for bf3 because there is already a runtime filter on `bf1.c1`. This behavior differs from the original behavior and is unexpected.

The change in this PR doesn't affect the restriction mentioned above.

### Why are the changes needed?
Release restrictions on multi-layer runtime filter. Expand the optimization surface.

### Does this PR introduce _any_ user-facing change?
'No'. New feature.

### How was this patch tested?
Test cases updated. Micro benchmark for q9 in TPC-H.

**TPC-H 100**
Query | Master(ms) | PR(ms) | Difference(ms) | Percent
-- | -- | -- | -- | --
q9 | 26491 | 20725 | 5766 | 27.82%

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43449 from beliefer/SPARK-45606.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
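The Percent column in the TPC-H table above does not say which timing it is relative to. The following is a small arithmetic check, not part of the commit: it recomputes the difference from the two reported timings and expresses it against both baselines. The object and member names are made up for illustration.

```scala
object BenchmarkPercent {
  // Timings copied from the TPC-H 100 row for q9 in the commit message.
  val masterMs = 26491L
  val prMs = 20725L
  val differenceMs: Long = masterMs - prMs

  // Express numerator / denominator as a percentage rounded to two decimals.
  private def pct(numerator: Long, denominator: Long): Double =
    math.round(numerator.toDouble / denominator * 10000.0) / 100.0

  // Difference relative to the PR timing.
  val relativeToPr: Double = pct(differenceMs, prMs)
  // Difference relative to the master timing.
  val relativeToMaster: Double = pct(differenceMs, masterMs)
}
```

Under this reading, the reported 27.82% matches the difference divided by the PR timing; measured against the master timing, the same 5766 ms is about 21.77%.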
1 parent a912706 commit 1d8df4f

2 files changed, +18 -23 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala

Lines changed: 15 additions & 18 deletions
@@ -247,15 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     }
   }
 
-  private def hasBloomFilter(
-      left: LogicalPlan,
-      right: LogicalPlan,
-      leftKey: Expression,
-      rightKey: Expression): Boolean = {
-    findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right, rightKey)
-  }
-
-  private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): Boolean = {
+  private def hasBloomFilter(plan: LogicalPlan, key: Expression): Boolean = {
     plan.exists {
       case Filter(condition, _) =>
         splitConjunctivePredicates(condition).exists {
@@ -277,28 +269,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
         leftKeys.lazyZip(rightKeys).foreach((l, r) => {
           // Check if:
           // 1. There is already a DPP filter on the key
-          // 2. There is already a bloom filter on the key
-          // 3. The keys are simple cheap expressions
+          // 2. The keys are simple cheap expressions
           if (filterCounter < numFilterThreshold &&
             !hasDynamicPruningSubquery(left, right, l, r) &&
-            !hasBloomFilter(newLeft, newRight, l, r) &&
             isSimpleExpression(l) && isSimpleExpression(r)) {
             val oldLeft = newLeft
             val oldRight = newRight
-            // Check if the current join is a shuffle join or a broadcast join that
-            // has a shuffle below it
+            // Check if:
+            // 1. The current join type supports prune the left side with runtime filter
+            // 2. The current join is a shuffle join or a broadcast join that
+            //    has a shuffle below it
+            // 3. There is no bloom filter on the left key yet
             val hasShuffle = isProbablyShuffleJoin(left, right, hint)
-            if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left))) {
+            if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left)) &&
+              !hasBloomFilter(newLeft, l)) {
               extractBeneficialFilterCreatePlan(left, right, l, r).foreach {
                 case (filterCreationSideKey, filterCreationSidePlan) =>
                   newLeft = injectFilter(l, newLeft, filterCreationSideKey, filterCreationSidePlan)
               }
             }
             // Did we actually inject on the left? If not, try on the right
-            // Check if the current join is a shuffle join or a broadcast join that
-            // has a shuffle below it
+            // Check if:
+            // 1. The current join type supports prune the right side with runtime filter
+            // 2. The current join is a shuffle join or a broadcast join that
+            //    has a shuffle below it
+            // 3. There is no bloom filter on the right key yet
             if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
-              (hasShuffle || probablyHasShuffle(right))) {
+              (hasShuffle || probablyHasShuffle(right)) && !hasBloomFilter(newRight, r)) {
               extractBeneficialFilterCreatePlan(right, left, r, l).foreach {
                 case (filterCreationSideKey, filterCreationSidePlan) =>
                   newRight = injectFilter(
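
The effect of moving the bloom filter check from the key pair to each application side can be illustrated with a toy model. This is only a sketch under stated assumptions: `Plan`, `Scan`, `Join`, `BloomFiltered`, and the `oldRuleAllows` / `newRuleAllows*` helpers are hypothetical stand-ins, not Spark's `LogicalPlan` classes or the rule's real code, and keys are plain strings rather than `Expression`s.

```scala
// Hypothetical miniature plan tree; not Spark's LogicalPlan.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Join(left: Plan, right: Plan) extends Plan
// Marks that a bloom filter keyed on `key` has been injected above `child`.
final case class BloomFiltered(key: String, child: Plan) extends Plan

object RuntimeFilterSketch {
  // Single-plan check, shaped like the new hasBloomFilter(plan, key).
  def hasBloomFilter(plan: Plan, key: String): Boolean = plan match {
    case BloomFiltered(k, child) => k == key || hasBloomFilter(child, key)
    case Join(l, r) => hasBloomFilter(l, key) || hasBloomFilter(r, key)
    case Scan(_) => false
  }

  // Old behaviour: a bloom filter on either side blocks the whole key pair.
  def oldRuleAllows(left: Plan, right: Plan, l: String, r: String): Boolean =
    !(hasBloomFilter(left, l) || hasBloomFilter(right, r))

  // New behaviour: only the side that would receive the filter is checked.
  def newRuleAllowsLeft(left: Plan, l: String): Boolean = !hasBloomFilter(left, l)
  def newRuleAllowsRight(right: Plan, r: String): Boolean = !hasBloomFilter(right, r)

  // Lower join from the commit message example: bf1.c1 was already filtered
  // using bf2 as the creation side.
  val lower: Plan = Join(BloomFiltered("bf1.c1", Scan("bf1")), Scan("bf2"))
  val bf3: Plan = Scan("bf3")
}
```

For the upper join on `bf3.c3 = bf1.c1`, the old check rejects the key pair because `bf1.c1` already carries a filter, while the new per-side check still skips the left side but leaves `bf3` eligible. That is consistent with the updated tests, which now expect 2 bloom filters for these queries.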

sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala

Lines changed: 3 additions & 5 deletions
@@ -335,14 +335,12 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
         "bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2)
       assertRewroteWithBloomFilter("select * from (select * from bf1 right join bf2 on " +
         "bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2)
-      // Can't leverage the transitivity of join keys due to runtime filters already exists.
-      // bf2 as creation side and inject runtime filter for bf1.
       assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
-        "and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+        "and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join bf3 on " +
-        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 right outer join bf2 join bf3 on " +
-        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
     }
 
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
