[SPARK-32444][SQL] Infer filters from DPP #29243

Closed
wants to merge 12 commits
@@ -74,12 +74,14 @@ object SubqueryExpression {
}

/**
* Returns true when an expression contains a subquery that has outer reference(s). The outer
* Returns true when an expression contains a subquery that has outer reference(s), excluding
* any [[org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery]]. The outer
* reference attributes are kept as children of subquery expression by
* [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
*/
def hasCorrelatedSubquery(e: Expression): Boolean = {
e.find {
case _: DynamicPruningSubquery => false
case s: SubqueryExpression => s.children.nonEmpty
case _ => false
}.isDefined
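Note on the match above: the DynamicPruningSubquery case must come before the generic SubqueryExpression case, otherwise a DPP subquery (whose children carry the pruning key) would be reported as correlated. A minimal stand-in sketch of that ordering, using hypothetical types rather than the real Catalyst classes (paste into a Scala REPL):

```scala
// Stand-in types only; the real classes live in org.apache.spark.sql.catalyst.expressions.
sealed trait Expr { def children: Seq[Expr] }
case object Leaf extends Expr { val children: Seq[Expr] = Nil }
class Subquery(val children: Seq[Expr]) extends Expr
class DynPruningSubquery(children: Seq[Expr]) extends Subquery(children)

// Mirrors the partial function above: DPP subqueries are skipped before the
// generic "has children => correlated" check can fire.
def looksCorrelated(e: Expr): Boolean = e match {
  case _: DynPruningSubquery => false
  case s: Subquery           => s.children.nonEmpty
  case _                     => false
}

assert(!looksCorrelated(new DynPruningSubquery(Seq(Leaf))))  // DPP subquery: not treated as correlated
assert(looksCorrelated(new Subquery(Seq(Leaf))))             // any other subquery with children: correlated
```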
@@ -52,6 +52,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
override protected val excludedOnceBatches: Set[String] =
Set(
"PartitionPruning",
"Infer Filters from PartitionPruning",
"Extract Python UDFs")

protected def fixedPoint =
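For context on the excludedOnceBatches entry above: batches with a Once strategy are normally verified to be idempotent, and listing a batch name in this set opts it out of that check. A hedged sketch of that contract; needsIdempotenceCheck is a hypothetical helper, not the actual RuleExecutor code:

```scala
// Illustration only: a Once batch is re-checked for idempotence unless its
// name appears in excludedOnceBatches, which is why the new DPP inference
// batch is registered in the set above.
def needsIdempotenceCheck(
    batchName: String,
    runsOnce: Boolean,
    excludedOnceBatches: Set[String]): Boolean =
  runsOnce && !excludedOnceBatches.contains(batchName)

assert(!needsIdempotenceCheck(
  "Infer Filters from PartitionPruning",
  runsOnce = true,
  Set("PartitionPruning", "Infer Filters from PartitionPruning", "Extract Python UDFs")))
```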
@@ -44,10 +44,15 @@ class SparkOptimizer(
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
Batch("Pushdown Filters from PartitionPruning before Inferring Filters", fixedPoint,
PushDownPredicates) :+
Batch("Infer Filters from PartitionPruning", Once,
InferFiltersFromConstraints) :+
Batch("Pushdown Filters from PartitionPruning after Inferring Filters", fixedPoint,
PushDownPredicates) :+
Batch("Cleanup filters that cannot be pushed down", Once,
CleanupDynamicPruningFilters,
BooleanSimplification,
PruneFilters)) ++
postHocOptimizationBatches :+
Batch("Extract Python UDFs", Once,
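To see the new batch ordering end to end, here is a hedged, self-contained sketch; the fact/dim tables, the session setup, and the query are illustrative only and not part of this PR. The join triggers dynamic partition pruning on fact.part, and with constraint propagation enabled the once-run InferFiltersFromConstraints batch can add predicates such as isnotnull on the join keys, which the surrounding pushdown batches then move into the scans (the same effect shows up as the extra IsNotNull entries in the golden plans below):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; in spark-shell a `spark` session already exists.
val spark = SparkSession.builder().master("local[*]").appName("dpp-infer-sketch").getOrCreate()
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.constraintPropagation.enabled", "true")

// A tiny partitioned fact table and a small dimension table.
spark.range(0, 1000)
  .selectExpr("id % 10 AS part", "id AS value")
  .write.mode("overwrite").partitionBy("part").saveAsTable("fact")
spark.range(0, 10)
  .selectExpr("id AS part", "id AS info")
  .write.mode("overwrite").saveAsTable("dim")

// PartitionPruning inserts a DynamicPruningSubquery on fact.part; the new
// "Infer Filters from PartitionPruning" batch then lets InferFiltersFromConstraints
// derive extra predicates before the second pushdown batch runs.
spark.sql(
  """SELECT f.value
    |FROM fact f JOIN dim d ON f.part = d.part
    |WHERE d.info < 3""".stripMargin
).explain(true)
```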
@@ -17,12 +17,13 @@

package org.apache.spark.sql.execution.dynamicpruning

import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, DynamicPruningSubquery, Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.dynamicpruning.PartitionPruning._
import org.apache.spark.sql.internal.SQLConf

/**
@@ -32,12 +33,36 @@ import org.apache.spark.sql.internal.SQLConf
*/
object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelper {

// Checks whether the condition contains an inferred DPP filter that should be removed.
private def isRemoveInferred(condition: Expression, child: LogicalPlan): Boolean = {
splitConjunctivePredicates(condition).exists {
case DynamicPruningSubquery(pruningKey, buildQuery, buildKeys, index, _, _) =>
getPartitionTableScan(pruningKey, child).isEmpty || (!SQLConf.get.exchangeReuseEnabled &&
!pruningHasBenefit(pruningKey, child, buildKeys(index), buildQuery))
case _ => false
}
}

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!SQLConf.get.dynamicPartitionPruningEnabled) {
return plan
}

plan.transform {
// Remove any DynamicPruning filters that do not filter on a partition column, or that
// have no pruning benefit when exchange reuse is disabled. Such filters are inferred by
// the Infer Filters from PartitionPruning batch.
case f @ Filter(condition, child)
if SQLConf.get.constraintPropagationEnabled && isRemoveInferred(condition, child) =>
val newCondition = condition.transform {
case DynamicPruningSubquery(pruningKey, _, _, _, _, _)
if getPartitionTableScan(pruningKey, child).isEmpty =>
TrueLiteral
case DynamicPruningSubquery(pruningKey, buildQuery, buildKeys, index, _, _)
if !SQLConf.get.exchangeReuseEnabled &&
!pruningHasBenefit(pruningKey, child, buildKeys(index), buildQuery) =>
TrueLiteral
}
f.copy(condition = newCondition)
// pass through anything that is pushed down into PhysicalOperation
case p @ PhysicalOperation(_, _, LogicalRelation(_: HadoopFsRelation, _, _, _)) => p
// remove any Filters with DynamicPruning that didn't get pushed down to PhysicalOperation.
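A hedged way to observe the cleanup path, reusing the illustrative fact/dim tables from the earlier sketch: with exchange reuse disabled, an inferred DPP filter whose estimated benefit does not cover the build-side scan cost is exactly what isRemoveInferred detects, and the rule rewrites that DynamicPruningSubquery to TrueLiteral:

```scala
// Continuation of the earlier illustrative session; not part of this PR.
// With exchange reuse off, pruningHasBenefit must justify re-evaluating the
// build side, otherwise CleanupDynamicPruningFilters drops the inferred filter.
spark.conf.set("spark.sql.exchange.reuse", "false")

spark.sql(
  """SELECT f.value
    |FROM fact f JOIN dim d ON f.part = d.part
    |WHERE d.info < 3""".stripMargin
).explain(true)
// Compare the optimized plans with spark.sql.exchange.reuse=true and =false:
// in the latter case a dynamicpruning expression that shows no benefit is
// replaced by `true` and no longer appears in the scan's partition filters.
```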
@@ -111,7 +111,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
* using column statistics if they are available, otherwise we use the config value of
* `spark.sql.optimizer.joinFilterRatio`.
*/
private def pruningHasBenefit(
def pruningHasBenefit(
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
@@ -147,7 +147,7 @@
}

// the pruning overhead is the total size in bytes of all scan relations
val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat
val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum

filterRatio * partPlan.stats.sizeInBytes.toFloat > overhead.toFloat
}
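Since pruningHasBenefit is now also consulted from CleanupDynamicPruningFilters, a toy numeric reading of the final comparison may help; all sizes below are invented for illustration:

```scala
// Prune only if the expected bytes saved on the partitioned side exceed the
// bytes that must be scanned to build the pruning subquery.
val filterRatio     = 0.5f                              // estimated selectivity of the build-side filter
val partSizeInBytes = BigInt(10L) * 1024 * 1024 * 1024  // 10 GiB partitioned fact table
val overhead        = BigInt(200L) * 1024 * 1024        // 200 MiB of build-side leaf scans

val hasBenefit = filterRatio * partSizeInBytes.toFloat > overhead.toFloat
println(hasBenefit)  // true: roughly 5 GiB saved vs. 200 MiB of extra scanning
```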
@@ -63,15 +63,15 @@ TakeOrderedAndProject (58)
Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/customer]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk)]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk), IsNotNull(c_customer_sk)]
ReadSchema: struct<c_customer_sk:int,c_current_cdemo_sk:int,c_current_addr_sk:int>

(2) ColumnarToRow [codegen id : 1]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]

(3) Filter [codegen id : 1]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Condition : (isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4))
Condition : ((isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4)) AND isnotnull(c_customer_sk#3))

(4) Exchange
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
@@ -85,15 +85,15 @@ Arguments: [c_customer_sk#3 ASC NULLS FIRST], false, 0
Output [2]: [ss_sold_date_sk#7, ss_customer_sk#8]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/store_sales]
PushedFilters: [IsNotNull(ss_sold_date_sk)]
PushedFilters: [IsNotNull(ss_sold_date_sk), IsNotNull(ss_customer_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>

(7) ColumnarToRow [codegen id : 4]
Input [2]: [ss_sold_date_sk#7, ss_customer_sk#8]

(8) Filter [codegen id : 4]
Input [2]: [ss_sold_date_sk#7, ss_customer_sk#8]
Condition : isnotnull(ss_sold_date_sk#7)
Condition : (isnotnull(ss_sold_date_sk#7) AND isnotnull(ss_customer_sk#8))

(9) Scan parquet default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
@@ -26,7 +26,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
InputAdapter
Exchange [c_customer_sk] #3
WholeStageCodegen (1)
Filter [c_current_addr_sk,c_current_cdemo_sk]
Filter [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
ColumnarToRow
InputAdapter
Scan parquet default.customer [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
@@ -37,7 +37,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
WholeStageCodegen (4)
Project [ss_customer_sk]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Filter [ss_customer_sk,ss_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk]
@@ -55,29 +55,29 @@ TakeOrderedAndProject (50)
Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/customer]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk)]
PushedFilters: [IsNotNull(c_current_addr_sk), IsNotNull(c_current_cdemo_sk), IsNotNull(c_customer_sk)]
ReadSchema: struct<c_customer_sk:int,c_current_cdemo_sk:int,c_current_addr_sk:int>

(2) ColumnarToRow [codegen id : 9]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]

(3) Filter [codegen id : 9]
Input [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Condition : (isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4))
Condition : ((isnotnull(c_current_addr_sk#5) AND isnotnull(c_current_cdemo_sk#4)) AND isnotnull(c_customer_sk#3))

(4) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#6, ss_customer_sk#7]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/store_sales]
PushedFilters: [IsNotNull(ss_sold_date_sk)]
PushedFilters: [IsNotNull(ss_sold_date_sk), IsNotNull(ss_customer_sk)]
ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>

(5) ColumnarToRow [codegen id : 2]
Input [2]: [ss_sold_date_sk#6, ss_customer_sk#7]

(6) Filter [codegen id : 2]
Input [2]: [ss_sold_date_sk#6, ss_customer_sk#7]
Condition : isnotnull(ss_sold_date_sk#6)
Condition : (isnotnull(ss_sold_date_sk#6) AND isnotnull(ss_customer_sk#7))

(7) Scan parquet default.date_dim
Output [3]: [d_date_sk#8, d_year#9, d_moy#10]
@@ -14,7 +14,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
BroadcastHashJoin [c_customer_sk,cs_ship_customer_sk]
BroadcastHashJoin [c_customer_sk,ws_bill_customer_sk]
BroadcastHashJoin [c_customer_sk,ss_customer_sk]
Filter [c_current_addr_sk,c_current_cdemo_sk]
Filter [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
ColumnarToRow
InputAdapter
Scan parquet default.customer [c_current_addr_sk,c_current_cdemo_sk,c_customer_sk]
@@ -23,7 +23,7 @@ TakeOrderedAndProject [cd_credit_rating,cd_dep_college_count,cd_dep_count,cd_dep
WholeStageCodegen (2)
Project [ss_customer_sk]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Filter [ss_customer_sk,ss_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk]
@@ -158,15 +158,15 @@ Arguments: [ss_item_sk#2 ASC NULLS FIRST], false, 0
Output [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/item]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id)]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(7) ColumnarToRow [codegen id : 17]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]

(8) Filter [codegen id : 17]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Condition : ((isnotnull(i_class_id#8) AND isnotnull(i_brand_id#7)) AND isnotnull(i_category_id#9))
Condition : (((isnotnull(i_class_id#8) AND isnotnull(i_brand_id#7)) AND isnotnull(i_category_id#9)) AND isnotnull(i_item_sk#6))

(9) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#1, ss_item_sk#2]
@@ -80,7 +80,7 @@ TakeOrderedAndProject [channel,i_brand_id,i_category_id,i_class_id,sum(number_sa
WholeStageCodegen (17)
Project [i_item_sk]
BroadcastHashJoin [brand_id,category_id,class_id,i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id,i_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_brand_id,i_category_id,i_class_id,i_item_sk]
@@ -134,15 +134,15 @@ Condition : (isnotnull(ss_item_sk#2) AND isnotnull(ss_sold_date_sk#1))
Output [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/item]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id)]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(5) ColumnarToRow [codegen id : 11]
Input [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]

(6) Filter [codegen id : 11]
Input [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]
Condition : ((isnotnull(i_class_id#7) AND isnotnull(i_brand_id#6)) AND isnotnull(i_category_id#8))
Condition : (((isnotnull(i_class_id#7) AND isnotnull(i_brand_id#6)) AND isnotnull(i_category_id#8)) AND isnotnull(i_item_sk#5))

(7) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#1, ss_item_sk#2]
@@ -72,7 +72,7 @@ TakeOrderedAndProject [channel,i_brand_id,i_category_id,i_class_id,sum(number_sa
WholeStageCodegen (11)
Project [i_item_sk]
BroadcastHashJoin [brand_id,category_id,class_id,i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id,i_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_brand_id,i_category_id,i_class_id,i_item_sk]
@@ -137,15 +137,15 @@ Arguments: [ss_item_sk#2 ASC NULLS FIRST], false, 0
Output [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite/item]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id)]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(7) ColumnarToRow [codegen id : 17]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]

(8) Filter [codegen id : 17]
Input [4]: [i_item_sk#6, i_brand_id#7, i_class_id#8, i_category_id#9]
Condition : ((isnotnull(i_class_id#8) AND isnotnull(i_brand_id#7)) AND isnotnull(i_category_id#9))
Condition : (((isnotnull(i_class_id#8) AND isnotnull(i_brand_id#7)) AND isnotnull(i_category_id#9)) AND isnotnull(i_item_sk#6))

(9) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#1, ss_item_sk#2]
@@ -72,7 +72,7 @@ TakeOrderedAndProject [channel,channel,i_brand_id,i_brand_id,i_category_id,i_cat
WholeStageCodegen (17)
Project [i_item_sk]
BroadcastHashJoin [brand_id,category_id,class_id,i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id,i_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_brand_id,i_category_id,i_class_id,i_item_sk]
@@ -119,15 +119,15 @@ Condition : (isnotnull(ss_item_sk#2) AND isnotnull(ss_sold_date_sk#1))
Output [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]
Batched: true
Location: InMemoryFileIndex [file:/Users/yi.wu/IdeaProjects/spark/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite/item]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id)]
PushedFilters: [IsNotNull(i_class_id), IsNotNull(i_brand_id), IsNotNull(i_category_id), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_brand_id:int,i_class_id:int,i_category_id:int>

(5) ColumnarToRow [codegen id : 11]
Input [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]

(6) Filter [codegen id : 11]
Input [4]: [i_item_sk#5, i_brand_id#6, i_class_id#7, i_category_id#8]
Condition : ((isnotnull(i_class_id#7) AND isnotnull(i_brand_id#6)) AND isnotnull(i_category_id#8))
Condition : (((isnotnull(i_class_id#7) AND isnotnull(i_brand_id#6)) AND isnotnull(i_category_id#8)) AND isnotnull(i_item_sk#5))

(7) Scan parquet default.store_sales
Output [2]: [ss_sold_date_sk#1, ss_item_sk#2]
@@ -64,7 +64,7 @@ TakeOrderedAndProject [channel,channel,i_brand_id,i_brand_id,i_category_id,i_cat
WholeStageCodegen (11)
Project [i_item_sk]
BroadcastHashJoin [brand_id,category_id,class_id,i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id]
Filter [i_brand_id,i_category_id,i_class_id,i_item_sk]
ColumnarToRow
InputAdapter
Scan parquet default.item [i_brand_id,i_category_id,i_class_id,i_item_sk]