diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index 9602a28688..e6ab425d08 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -52,6 +52,12 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 
   override def supportsColumnar: Boolean = true
 
+  override def nodeName: String = if (child.supportsColumnar) {
+    "CometSparkColumnarToColumnar"
+  } else {
+    "CometSparkRowToColumnar"
+  }
+
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
     "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index c054d02d79..9606e768ba 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1707,6 +1707,59 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("SparkToColumnar override node name for row input") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+      val df = spark
+        .range(1000)
+        .selectExpr("id as key", "id % 8 as value")
+        .toDF("key", "value")
+        .groupBy("key")
+        .count()
+      df.collect()
+
+      val planAfter = df.queryExecution.executedPlan
+      assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+      val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      val nodeNames = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
+        c.nodeName
+      }
+      assert(nodeNames.length == 1)
+      assert(nodeNames.head == "CometSparkRowToColumnar")
+    }
+  }
+
+  test("SparkToColumnar override node name for columnar input") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
+      SQLConf.USE_V1_SOURCE_LIST.key -> "",
+      CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
+      CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true",
+      CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+      spark
+        .range(1000)
+        .selectExpr("id as key", "id % 8 as value")
+        .toDF("key", "value")
+        .write
+        .mode("overwrite")
+        .parquet("/tmp/test")
+      val df = spark.read.parquet("/tmp/test")
+      val rowDf = df.toDF()
+      rowDf.collect()
+
+      val planAfter = rowDf.queryExecution.executedPlan
+      assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+      val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      val nodeNames = adaptivePlan.collect { case c: CometSparkToColumnarExec =>
+        c.nodeName
+      }
+      assert(nodeNames.length == 1)
+      assert(nodeNames.head == "CometSparkColumnarToColumnar")
+    }
+  }
+
   test("aggregate window function for all types") {
     val numValues = 2048