From 352bdb7af3d284b6e07f40818ed1f217cbe82649 Mon Sep 17 00:00:00 2001
From: Rahij Ramsharan
Date: Mon, 8 Mar 2021 13:56:04 +0000
Subject: [PATCH] Use non-batch scans during sorted bucketed reads (#738)

* fallback to non-batch scan during sorted bucketed reads

* rm comment
---
 .../spark/sql/execution/DataSourceScanExec.scala     |  2 +-
 .../apache/spark/sql/sources/BucketedReadSuite.scala | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5df26b4b2647e..d8457598cc7e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -153,7 +153,7 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsBatch: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    scanMode == RegularMode && relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
   private lazy val scanMode: ScanMode =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 76aff25b7d930..e285eb81553ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -391,6 +391,18 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
           joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
           s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
       }
+
+      // check answer with codegen enabled
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
+        SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+        val t1 = spark.table("bucketed_table1")
+        val t2 = spark.table("bucketed_table2")
+        val joined = t1.join(t2, joinCondition(t1, t2), joinType)
+
+        checkAnswer(
+          joined.sort("bucketed_table1.k", "bucketed_table2.k"),
+          df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))
+      }
     }
   }
 
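
Note for readers outside this fork: the one-line change above gates vectorized
(batch) reads on the scan mode, so a sorted bucketed read falls back to the
row-based path even when the file format itself supports batch output. Below is
a minimal sketch of that predicate in isolation; ScanMode, RegularMode,
SortedBucketMode and the enclosing object are simplified stand-ins assumed for
illustration, not the fork's actual definitions.

  // Hypothetical, simplified stand-ins for the fork's ScanMode hierarchy.
  object SupportsBatchSketch {
    sealed trait ScanMode
    case object RegularMode extends ScanMode
    case object SortedBucketMode extends ScanMode

    // Batch (vectorized) reads are only allowed for a regular scan; any
    // non-regular mode, such as a sorted bucketed scan, reads row-at-a-time
    // even when the file format supports batch output.
    def supportsBatch(scanMode: ScanMode, formatSupportsBatch: Boolean): Boolean =
      scanMode == RegularMode && formatSupportsBatch

    // supportsBatch(RegularMode, formatSupportsBatch = true)      == true
    // supportsBatch(SortedBucketMode, formatSupportsBatch = true) == false
  }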