From 2d97a573e8ee8e91edf9d01473c7cc57cdd6dfd4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Jan 2025 17:18:33 -0700 Subject: [PATCH 1/9] fix --- .../scala/org/apache/comet/CometConf.scala | 9 ++ .../apache/comet/serde/QueryPlanSerde.scala | 94 ++++----------- .../scala/org/apache/comet/serde/arrays.scala | 111 +++++++++++++++++- .../apache/comet/CometExpressionSuite.scala | 70 ++++++----- 4 files changed, 177 insertions(+), 107 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 2e64403c6..04729fdd4 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -605,6 +605,15 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = + conf("spark.comet.expression.allowIncompatible") + .doc( + "Comet is not currently fully compatible with Spark for all expressions. " + + "Set this config to true to allow them anyway. See compatibility guide " + + "for more information.") + .booleanConf + .createWithDefault(false) + val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = conf("spark.comet.cast.allowIncompatible") .doc( diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index cb4fffc1a..71992c869 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -929,6 +929,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim binding: Boolean): Option[Expr] = { SQLConf.get + def toProto(handler: CometExpressionSerde): Option[Expr] = { + if (handler.isIncompat() && !CometConf.COMET_ALLOW_INCOMPATIBLE.get()) { + withInfo( + expr, + s"$expr is not fully compatible with Spark. 
To enable it anyway, set " + + s"${CometConf.COMET_ALLOW_INCOMPATIBLE.key}=true") + return None + } + handler.convert(expr, inputs, binding) + } + expr match { case a @ Alias(_, _) => val r = exprToProtoInternal(a.child, inputs, binding) @@ -2371,83 +2382,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo(expr, "unsupported arguments for GetArrayStructFields", child) None } - case expr: ArrayRemove => CometArrayRemove.convert(expr, inputs, binding) - case expr if expr.prettyName == "array_contains" => - createBinaryExpr( - expr, - expr.children(0), - expr.children(1), - inputs, - binding, - (builder, binaryExpr) => builder.setArrayContains(binaryExpr)) - case _ if expr.prettyName == "array_append" => - createBinaryExpr( - expr, - expr.children(0), - expr.children(1), - inputs, - binding, - (builder, binaryExpr) => builder.setArrayAppend(binaryExpr)) - case _ if expr.prettyName == "array_intersect" => - createBinaryExpr( - expr, - expr.children(0), - expr.children(1), - inputs, - binding, - (builder, binaryExpr) => builder.setArrayIntersect(binaryExpr)) - case ArrayJoin(arrayExpr, delimiterExpr, nullReplacementExpr) => - val arrayExprProto = exprToProto(arrayExpr, inputs, binding) - val delimiterExprProto = exprToProto(delimiterExpr, inputs, binding) - - if (arrayExprProto.isDefined && delimiterExprProto.isDefined) { - val arrayJoinBuilder = nullReplacementExpr match { - case Some(nrExpr) => - val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding) - ExprOuterClass.ArrayJoin - .newBuilder() - .setArrayExpr(arrayExprProto.get) - .setDelimiterExpr(delimiterExprProto.get) - .setNullReplacementExpr(nullReplacementExprProto.get) - case None => - ExprOuterClass.ArrayJoin - .newBuilder() - .setArrayExpr(arrayExprProto.get) - .setDelimiterExpr(delimiterExprProto.get) - } - Some( - ExprOuterClass.Expr - .newBuilder() - .setArrayJoin(arrayJoinBuilder) - .build()) - } else { - val exprs: List[Expression] = nullReplacementExpr match { - case Some(nrExpr) => List(arrayExpr, delimiterExpr, nrExpr) - case None => List(arrayExpr, delimiterExpr) - } - withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*) - None - } - case ArraysOverlap(leftArrayExpr, rightArrayExpr) => - if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { - createBinaryExpr( - expr, - leftArrayExpr, - rightArrayExpr, - inputs, - binding, - (builder, binaryExpr) => builder.setArraysOverlap(binaryExpr)) - } else { - withInfo( - expr, - s"$expr is not supported yet. To enable all incompatible casts, set " + - s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true") - None - } + case _: ArrayRemove => toProto(CometArrayRemove) + case _: ArrayContains => toProto(CometArrayContains) + case _: ArrayAppend => toProto(CometArrayAppend) + case _: ArrayIntersect => toProto(CometArrayIntersect) + case _: ArrayJoin => toProto(CometArrayJoin) + case _: ArraysOverlap => toProto(CometArraysOverlap) case _ => withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) None } + } /** @@ -3470,6 +3415,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim */ trait CometExpressionSerde { + /** Determine whether this expression should only be enabled if COMET_ALLOW_INCOMPATIBLE is true */ + def isIncompat(): Boolean = false + /** * Convert a Spark expression into a protocol buffer representation that can be passed into * native code. 
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 9058a641e..31817b49b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -19,11 +19,12 @@ package org.apache.comet.serde -import org.apache.spark.sql.catalyst.expressions.{ArrayRemove, Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression} import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, StructType} +import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo -import org.apache.comet.serde.QueryPlanSerde.createBinaryExpr +import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto} import org.apache.comet.shims.CometExprShim object CometArrayRemove extends CometExpressionSerde with CometExprShim { @@ -65,3 +66,109 @@ object CometArrayRemove extends CometExpressionSerde with CometExprShim { (builder, binaryExpr) => builder.setArrayRemove(binaryExpr)) } } + +object CometArrayAppend extends CometExpressionSerde with CometExprShim { + override def isIncompat(): Boolean = true + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + createBinaryExpr( + expr, + expr.children(0), + expr.children(1), + inputs, + binding, + (builder, binaryExpr) => builder.setArrayAppend(binaryExpr)) + } +} + +object CometArrayContains extends CometExpressionSerde with CometExprShim { + override def isIncompat(): Boolean = true + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + createBinaryExpr( + expr, + expr.children(0), + expr.children(1), + inputs, + binding, + (builder, binaryExpr) => builder.setArrayContains(binaryExpr)) + } +} + +object CometArrayIntersect extends CometExpressionSerde with CometExprShim { + override def isIncompat(): Boolean = true + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + createBinaryExpr( + expr, + expr.children(0), + expr.children(1), + inputs, + binding, + (builder, binaryExpr) => builder.setArrayIntersect(binaryExpr)) + } +} + +object CometArraysOverlap extends CometExpressionSerde with CometExprShim { + override def isIncompat(): Boolean = true + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + createBinaryExpr( + expr, + expr.children(0), + expr.children(1), + inputs, + binding, + (builder, binaryExpr) => builder.setArraysOverlap(binaryExpr)) + } +} + +object CometArrayJoin extends CometExpressionSerde with CometExprShim { + override def isIncompat(): Boolean = true + + override def convert( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val arrayExpr = expr.asInstanceOf[ArrayJoin] + val arrayExprProto = exprToProto(arrayExpr, inputs, binding) + val delimiterExprProto = exprToProto(arrayExpr.delimiter, inputs, binding) + + if (arrayExprProto.isDefined && delimiterExprProto.isDefined) { + val arrayJoinBuilder = arrayExpr.nullReplacement match { + case Some(nrExpr) => + val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding) + ExprOuterClass.ArrayJoin + .newBuilder() + .setArrayExpr(arrayExprProto.get) + .setDelimiterExpr(delimiterExprProto.get) + 
.setNullReplacementExpr(nullReplacementExprProto.get) + case None => + ExprOuterClass.ArrayJoin + .newBuilder() + .setArrayExpr(arrayExprProto.get) + .setDelimiterExpr(delimiterExprProto.get) + } + Some( + ExprOuterClass.Expr + .newBuilder() + .setArrayJoin(arrayJoinBuilder) + .build()) + } else { + val exprs: List[Expression] = arrayExpr.nullReplacement match { + case Some(nrExpr) => List(arrayExpr, arrayExpr.delimiter, nrExpr) + case None => List(arrayExpr, arrayExpr.delimiter) + } + withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*) + None + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index f82101b3a..17fc94f08 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2658,52 +2658,58 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("array_contains") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1"); - checkSparkAnswerAndOperator( - spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) - checkSparkAnswerAndOperator( - spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); - } - } - - test("array_intersect") { - Seq(true, false).foreach { dictionaryEnabled => + withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") + makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1"); checkSparkAnswerAndOperator( - sql("SELECT array_intersect(array(_2, _3, _4), array(_3, _4)) from t1")) + spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) checkSparkAnswerAndOperator( - sql("SELECT array_intersect(array(_2 * -1), array(_9, _10)) from t1")) - checkSparkAnswerAndOperator(sql("SELECT array_intersect(array(_18), array(_19)) from t1")) + spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); + } + } + } + + test("array_intersect") { + withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator( + sql("SELECT array_intersect(array(_2, _3, _4), array(_3, _4)) from t1")) + checkSparkAnswerAndOperator( + sql("SELECT array_intersect(array(_2 * -1), array(_9, _10)) from t1")) + checkSparkAnswerAndOperator(sql("SELECT array_intersect(array(_18), array(_19)) from t1")) + } } } } test("array_join") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator(sql( - "SELECT array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ') from t1")) - checkSparkAnswerAndOperator(sql( - "SELECT 
array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ', ' +++ ') from t1")) - checkSparkAnswerAndOperator(sql( - "SELECT array_join(array('hello', 'world', cast(_2 as string)), ' ') from t1 where _2 is not null")) - checkSparkAnswerAndOperator( - sql("SELECT array_join(array('hello', '-', 'world', cast(_2 as string)), ' ') from t1")) + withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator(sql( + "SELECT array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ') from t1")) + checkSparkAnswerAndOperator(sql( + "SELECT array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ', ' +++ ') from t1")) + checkSparkAnswerAndOperator(sql( + "SELECT array_join(array('hello', 'world', cast(_2 as string)), ' ') from t1 where _2 is not null")) + checkSparkAnswerAndOperator( + sql("SELECT array_join(array('hello', '-', 'world', cast(_2 as string)), ' ') from t1")) + } } } } test("arrays_overlap") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") From 7f00ab91e5fd2fd89124ad2b9e80c9198f9b8e5f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Jan 2025 17:26:36 -0700 Subject: [PATCH 2/9] rename config --- .../scala/org/apache/comet/CometConf.scala | 2 +- .../apache/comet/serde/QueryPlanSerde.scala | 11 +++++++---- .../scala/org/apache/comet/serde/arrays.scala | 18 ++++++++---------- .../apache/comet/CometExpressionSuite.scala | 14 ++++++++------ 4 files changed, 24 insertions(+), 21 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 04729fdd4..c92eeb2b8 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -605,7 +605,7 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) - val COMET_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = + val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = conf("spark.comet.expression.allowIncompatible") .doc( "Comet is not currently fully compatible with Spark for all expressions. " + diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 71992c869..76e2109cc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -930,11 +930,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim SQLConf.get def toProto(handler: CometExpressionSerde): Option[Expr] = { - if (handler.isIncompat() && !CometConf.COMET_ALLOW_INCOMPATIBLE.get()) { + if (handler.isIncompat && !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { withInfo( expr, s"$expr is not fully compatible with Spark. 
To enable it anyway, set " + - s"${CometConf.COMET_ALLOW_INCOMPATIBLE.key}=true") + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true") return None } handler.convert(expr, inputs, binding) @@ -3415,8 +3415,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim */ trait CometExpressionSerde { - /** Determine whether this expression should only be enabled if COMET_ALLOW_INCOMPATIBLE is true */ - def isIncompat(): Boolean = false + /** + * Determine whether this expression should only be enabled if COMET_EXPR_ALLOW_INCOMPATIBLE is + * true + */ + def isIncompat: Boolean = false /** * Convert a Spark expression into a protocol buffer representation that can be passed into diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 31817b49b..80f18f2ae 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -22,7 +22,6 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression} import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, StructType} -import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto} import org.apache.comet.shims.CometExprShim @@ -68,11 +67,11 @@ object CometArrayRemove extends CometExpressionSerde with CometExprShim { } object CometArrayAppend extends CometExpressionSerde with CometExprShim { - override def isIncompat(): Boolean = true + override def isIncompat: Boolean = true override def convert( - expr: Expression, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { createBinaryExpr( expr, expr.children(0), @@ -84,7 +83,7 @@ object CometArrayAppend extends CometExpressionSerde with CometExprShim { } object CometArrayContains extends CometExpressionSerde with CometExprShim { - override def isIncompat(): Boolean = true + override def isIncompat: Boolean = true override def convert( expr: Expression, inputs: Seq[Attribute], @@ -100,7 +99,7 @@ object CometArrayContains extends CometExpressionSerde with CometExprShim { } object CometArrayIntersect extends CometExpressionSerde with CometExprShim { - override def isIncompat(): Boolean = true + override def isIncompat: Boolean = true override def convert( expr: Expression, inputs: Seq[Attribute], @@ -116,7 +115,7 @@ object CometArrayIntersect extends CometExpressionSerde with CometExprShim { } object CometArraysOverlap extends CometExpressionSerde with CometExprShim { - override def isIncompat(): Boolean = true + override def isIncompat: Boolean = true override def convert( expr: Expression, inputs: Seq[Attribute], @@ -132,8 +131,7 @@ object CometArraysOverlap extends CometExpressionSerde with CometExprShim { } object CometArrayJoin extends CometExpressionSerde with CometExprShim { - override def isIncompat(): Boolean = true - + override def isIncompat: Boolean = true override def convert( expr: Expression, inputs: Seq[Attribute], diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 17fc94f08..d1e8ad57e 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -2658,7 +2658,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("array_contains") { - withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000) @@ -2672,7 +2672,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("array_intersect") { - withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") @@ -2682,14 +2682,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { sql("SELECT array_intersect(array(_2, _3, _4), array(_3, _4)) from t1")) checkSparkAnswerAndOperator( sql("SELECT array_intersect(array(_2 * -1), array(_9, _10)) from t1")) - checkSparkAnswerAndOperator(sql("SELECT array_intersect(array(_18), array(_19)) from t1")) + checkSparkAnswerAndOperator( + sql("SELECT array_intersect(array(_18), array(_19)) from t1")) } } } } test("array_join") { - withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") @@ -2702,14 +2703,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkAnswerAndOperator(sql( "SELECT array_join(array('hello', 'world', cast(_2 as string)), ' ') from t1 where _2 is not null")) checkSparkAnswerAndOperator( - sql("SELECT array_join(array('hello', '-', 'world', cast(_2 as string)), ' ') from t1")) + sql( + "SELECT array_join(array('hello', '-', 'world', cast(_2 as string)), ' ') from t1")) } } } } test("arrays_overlap") { - withSQLConf(CometConf.COMET_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") From 6460add24d00f2f7283641a46bd57803a11c394b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Jan 2025 17:43:58 -0700 Subject: [PATCH 3/9] fix --- docs/source/user-guide/configs.md | 1 + .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 2bea501e5..aa14214cf 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -64,6 +64,7 @@ Comet provides the following configuration settings. | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false | | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false | | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. 
| false | +| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. See compatibility guide for more information. | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 76e2109cc..6713629f6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2384,7 +2384,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case _: ArrayRemove => toProto(CometArrayRemove) case _: ArrayContains => toProto(CometArrayContains) - case _: ArrayAppend => toProto(CometArrayAppend) + case _ if expr.prettyName == "array_append" => toProto(CometArrayAppend) case _: ArrayIntersect => toProto(CometArrayIntersect) case _: ArrayJoin => toProto(CometArrayJoin) case _: ArraysOverlap => toProto(CometArraysOverlap) From 5c333b7bed33d32977d49abcf2d34a5824423228 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2025 07:06:44 -0700 Subject: [PATCH 4/9] fix --- .../apache/comet/serde/QueryPlanSerde.scala | 39 +++-- .../scala/org/apache/comet/serde/arrays.scala | 17 +- .../comet/CometArrayExpressionSuite.scala | 161 ++++++++++++++++++ .../apache/comet/CometExpressionSuite.scala | 157 +---------------- 4 files changed, 187 insertions(+), 187 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 6713629f6..088876821 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -929,15 +929,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim binding: Boolean): Option[Expr] = { SQLConf.get - def toProto(handler: CometExpressionSerde): Option[Expr] = { - if (handler.isIncompat && !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { - withInfo( - expr, - s"$expr is not fully compatible with Spark. To enable it anyway, set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true") - return None + def convert(handler: CometExpressionSerde): Option[Expr] = { + handler match { + case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() => + withInfo( + expr, + s"$expr is not fully compatible with Spark. 
To enable it anyway, set " + + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true") + None + case _ => + handler.convert(expr, inputs, binding) } - handler.convert(expr, inputs, binding) } expr match { @@ -2382,12 +2384,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo(expr, "unsupported arguments for GetArrayStructFields", child) None } - case _: ArrayRemove => toProto(CometArrayRemove) - case _: ArrayContains => toProto(CometArrayContains) - case _ if expr.prettyName == "array_append" => toProto(CometArrayAppend) - case _: ArrayIntersect => toProto(CometArrayIntersect) - case _: ArrayJoin => toProto(CometArrayJoin) - case _: ArraysOverlap => toProto(CometArraysOverlap) + case _: ArrayRemove => convert(CometArrayRemove) + case _: ArrayContains => convert(CometArrayContains) + case _ if expr.prettyName == "array_append" => convert(CometArrayAppend) + case _: ArrayIntersect => convert(CometArrayIntersect) + case _: ArrayJoin => convert(CometArrayJoin) + case _: ArraysOverlap => convert(CometArraysOverlap) case _ => withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) None @@ -3415,12 +3417,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim */ trait CometExpressionSerde { - /** - * Determine whether this expression should only be enabled if COMET_EXPR_ALLOW_INCOMPATIBLE is - * true - */ - def isIncompat: Boolean = false - /** * Convert a Spark expression into a protocol buffer representation that can be passed into * native code. @@ -3441,3 +3437,6 @@ trait CometExpressionSerde { inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] } + +/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */ +trait IncompatExpr {} diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 80f18f2ae..db1679f22 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -66,8 +66,7 @@ object CometArrayRemove extends CometExpressionSerde with CometExprShim { } } -object CometArrayAppend extends CometExpressionSerde with CometExprShim { - override def isIncompat: Boolean = true +object CometArrayAppend extends CometExpressionSerde with IncompatExpr { override def convert( expr: Expression, inputs: Seq[Attribute], @@ -82,8 +81,7 @@ object CometArrayAppend extends CometExpressionSerde with CometExprShim { } } -object CometArrayContains extends CometExpressionSerde with CometExprShim { - override def isIncompat: Boolean = true +object CometArrayContains extends CometExpressionSerde with IncompatExpr { override def convert( expr: Expression, inputs: Seq[Attribute], @@ -98,8 +96,7 @@ object CometArrayContains extends CometExpressionSerde with CometExprShim { } } -object CometArrayIntersect extends CometExpressionSerde with CometExprShim { - override def isIncompat: Boolean = true +object CometArrayIntersect extends CometExpressionSerde with IncompatExpr { override def convert( expr: Expression, inputs: Seq[Attribute], @@ -114,8 +111,7 @@ object CometArrayIntersect extends CometExpressionSerde with CometExprShim { } } -object CometArraysOverlap extends CometExpressionSerde with CometExprShim { - override def isIncompat: Boolean = true +object CometArraysOverlap extends CometExpressionSerde with IncompatExpr { override def convert( expr: Expression, inputs: Seq[Attribute], @@ -130,14 +126,13 @@ object 
CometArraysOverlap extends CometExpressionSerde with CometExprShim { } } -object CometArrayJoin extends CometExpressionSerde with CometExprShim { - override def isIncompat: Boolean = true +object CometArrayJoin extends CometExpressionSerde with IncompatExpr { override def convert( expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { val arrayExpr = expr.asInstanceOf[ArrayJoin] - val arrayExprProto = exprToProto(arrayExpr, inputs, binding) + val arrayExprProto = exprToProto(arrayExpr.array, inputs, binding) val delimiterExprProto = exprToProto(arrayExpr.delimiter, inputs, binding) if (arrayExprProto.isDefined && delimiterExprProto.isDefined) { diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 5727f9f90..df1fccb69 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -25,8 +25,10 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.{array, col, expr, lit, udf} import org.apache.spark.sql.types.StructType +import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark35Plus} import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -131,4 +133,163 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp checkExplainString = false) } } + + test("array_append") { + assume(isSpark34Plus) + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1"); + checkSparkAnswerAndOperator(spark.sql("Select array_append(array(_1),false) from t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_append(array(_2, _3, _4), 4) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_append(array(_2, _3, _4), null) FROM t1")); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_append(array(_6, _7), CAST(6.5 AS DOUBLE)) FROM t1")); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_append(array(_8), 'test') FROM t1")); + checkSparkAnswerAndOperator(spark.sql("SELECT array_append(array(_19), _19) FROM t1")); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_append((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); + } + } + } + } + + test("array_prepend") { + assume(isSpark35Plus) // in Spark 3.5 array_prepend is implemented via array_insert + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1"); + checkSparkAnswerAndOperator(spark.sql("Select array_prepend(array(_1),false) from t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_prepend(array(_2, _3, _4), 4) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_prepend(array(_2, _3, _4), null) FROM t1")); + checkSparkAnswerAndOperator( + spark.sql("SELECT 
array_prepend(array(_6, _7), CAST(6.5 AS DOUBLE)) FROM t1")); + checkSparkAnswerAndOperator(spark.sql("SELECT array_prepend(array(_8), 'test') FROM t1")); + checkSparkAnswerAndOperator(spark.sql("SELECT array_prepend(array(_19), _19) FROM t1")); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_prepend((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); + } + } + } + + test("ArrayInsert") { + assume(isSpark34Plus) + Seq(true, false).foreach(dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled, 10000) + val df = spark.read + .parquet(path.toString) + .withColumn("arr", array(col("_4"), lit(null), col("_4"))) + .withColumn("arrInsertResult", expr("array_insert(arr, 1, 1)")) + .withColumn("arrInsertNegativeIndexResult", expr("array_insert(arr, -1, 1)")) + .withColumn("arrPosGreaterThanSize", expr("array_insert(arr, 8, 1)")) + .withColumn("arrNegPosGreaterThanSize", expr("array_insert(arr, -8, 1)")) + .withColumn("arrInsertNone", expr("array_insert(arr, 1, null)")) + checkSparkAnswerAndOperator(df.select("arrInsertResult")) + checkSparkAnswerAndOperator(df.select("arrInsertNegativeIndexResult")) + checkSparkAnswerAndOperator(df.select("arrPosGreaterThanSize")) + checkSparkAnswerAndOperator(df.select("arrNegPosGreaterThanSize")) + checkSparkAnswerAndOperator(df.select("arrInsertNone")) + }) + } + + test("ArrayInsertUnsupportedArgs") { + // This test checks that the else branch in ArrayInsert + // mapping to the comet is valid and fallback to spark is working fine. + assume(isSpark34Plus) + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = false, 10000) + val df = spark.read + .parquet(path.toString) + .withColumn("arr", array(col("_4"), lit(null), col("_4"))) + .withColumn("idx", udf((_: Int) => 1).apply(col("_4"))) + .withColumn("arrUnsupportedArgs", expr("array_insert(arr, idx, 1)")) + checkSparkAnswer(df.select("arrUnsupportedArgs")) + } + } + + test("array_contains") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1"); + checkSparkAnswerAndOperator( + spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) + checkSparkAnswerAndOperator( + spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); + } + } + } + + test("array_intersect") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator( + sql("SELECT array_intersect(array(_2, _3, _4), array(_3, _4)) from t1")) + checkSparkAnswerAndOperator( + sql("SELECT array_intersect(array(_2 * -1), array(_9, _10)) from t1")) + checkSparkAnswerAndOperator( + sql("SELECT array_intersect(array(_18), array(_19)) from t1")) + } + } + } + } + + test("array_join") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, 
dictionaryEnabled, 10000)
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          checkSparkAnswerAndOperator(sql(
+            "SELECT array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ') from t1"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ', ' +++ ') from t1"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT array_join(array('hello', 'world', cast(_2 as string)), ' ') from t1 where _2 is not null"))
+          checkSparkAnswerAndOperator(
+            sql(
+              "SELECT array_join(array('hello', '-', 'world', cast(_2 as string)), ' ') from t1"))
+        }
+      }
+    }
+  }
+
+  test("arrays_overlap") {
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null"))
+          checkSparkAnswerAndOperator(spark.sql(
+            "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1"));
+        }
+      }
+    }
+  }
+
 }
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index d1e8ad57e..6ffc2993d 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.sql.types.{Decimal, DecimalType}
 
-import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark35Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark40Plus}
 
 class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
@@ -2575,159 +2575,4 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("array_append") {
-    assume(isSpark34Plus)
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "test.parquet")
-        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-        spark.read.parquet(path.toString).createOrReplaceTempView("t1");
-        checkSparkAnswerAndOperator(spark.sql("Select array_append(array(_1),false) from t1"))
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_append(array(_2, _3, _4), 4) FROM t1"))
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_append(array(_2, _3, _4), null) FROM t1"));
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_append(array(_6, _7), CAST(6.5 AS DOUBLE)) FROM t1"));
-        checkSparkAnswerAndOperator(spark.sql("SELECT array_append(array(_8), 'test') FROM t1"));
-        checkSparkAnswerAndOperator(spark.sql("SELECT array_append(array(_19), _19) FROM t1"));
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_append((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
-      }
-    }
-  }
-
-  test("array_prepend") {
-    assume(isSpark35Plus) // in Spark 3.5 array_prepend is implemented via array_insert
-    Seq(true, false).foreach { dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "test.parquet")
-        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-        spark.read.parquet(path.toString).createOrReplaceTempView("t1");
-        checkSparkAnswerAndOperator(spark.sql("Select array_prepend(array(_1),false) from t1"))
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_prepend(array(_2, _3, _4), 4) FROM t1"))
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_prepend(array(_2, _3, _4), null) FROM t1"));
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_prepend(array(_6, _7), CAST(6.5 AS DOUBLE)) FROM t1"));
-        checkSparkAnswerAndOperator(spark.sql("SELECT array_prepend(array(_8), 'test') FROM t1"));
-        checkSparkAnswerAndOperator(spark.sql("SELECT array_prepend(array(_19), _19) FROM t1"));
-        checkSparkAnswerAndOperator(
-          spark.sql("SELECT array_prepend((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
-      }
-    }
-  }
-
-  test("ArrayInsert") {
-    assume(isSpark34Plus)
-    Seq(true, false).foreach(dictionaryEnabled =>
-      withTempDir { dir =>
-        val path = new Path(dir.toURI.toString, "test.parquet")
-        makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
-        val df = spark.read
-          .parquet(path.toString)
-          .withColumn("arr", array(col("_4"), lit(null), col("_4")))
-          .withColumn("arrInsertResult", expr("array_insert(arr, 1, 1)"))
-          .withColumn("arrInsertNegativeIndexResult", expr("array_insert(arr, -1, 1)"))
-          .withColumn("arrPosGreaterThanSize", expr("array_insert(arr, 8, 1)"))
-          .withColumn("arrNegPosGreaterThanSize", expr("array_insert(arr, -8, 1)"))
-          .withColumn("arrInsertNone", expr("array_insert(arr, 1, null)"))
-        checkSparkAnswerAndOperator(df.select("arrInsertResult"))
-        checkSparkAnswerAndOperator(df.select("arrInsertNegativeIndexResult"))
-        checkSparkAnswerAndOperator(df.select("arrPosGreaterThanSize"))
-        checkSparkAnswerAndOperator(df.select("arrNegPosGreaterThanSize"))
-        checkSparkAnswerAndOperator(df.select("arrInsertNone"))
-      })
-  }
-
-  test("ArrayInsertUnsupportedArgs") {
-    // This test checks that the else branch in ArrayInsert
-    // mapping to the comet is valid and fallback to spark is working fine.
- assume(isSpark34Plus) - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = false, 10000) - val df = spark.read - .parquet(path.toString) - .withColumn("arr", array(col("_4"), lit(null), col("_4"))) - .withColumn("idx", udf((_: Int) => 1).apply(col("_4"))) - .withColumn("arrUnsupportedArgs", expr("array_insert(arr, idx, 1)")) - checkSparkAnswer(df.select("arrUnsupportedArgs")) - } - } - - test("array_contains") { - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1"); - checkSparkAnswerAndOperator( - spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1")) - checkSparkAnswerAndOperator( - spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1")); - } - } - } - - test("array_intersect") { - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator( - sql("SELECT array_intersect(array(_2, _3, _4), array(_3, _4)) from t1")) - checkSparkAnswerAndOperator( - sql("SELECT array_intersect(array(_2 * -1), array(_9, _10)) from t1")) - checkSparkAnswerAndOperator( - sql("SELECT array_intersect(array(_18), array(_19)) from t1")) - } - } - } - } - - test("array_join") { - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator(sql( - "SELECT array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ') from t1")) - checkSparkAnswerAndOperator(sql( - "SELECT array_join(array(cast(_1 as string), cast(_2 as string), cast(_6 as string)), ' @ ', ' +++ ') from t1")) - checkSparkAnswerAndOperator(sql( - "SELECT array_join(array('hello', 'world', cast(_2 as string)), ' ') from t1 where _2 is not null")) - checkSparkAnswerAndOperator( - sql( - "SELECT array_join(array('hello', '-', 'world', cast(_2 as string)), ' ') from t1")) - } - } - } - } - - test("arrays_overlap") { - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(spark.sql( - "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1")); - } - } 
- } - } - } From 1c5e992b8a55f6109e67cf979b487c052e251a7e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2025 10:42:39 -0700 Subject: [PATCH 5/9] Update spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala Co-authored-by: Oleks V --- spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 088876821..376587ecf 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2386,6 +2386,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case _: ArrayRemove => convert(CometArrayRemove) case _: ArrayContains => convert(CometArrayContains) + // Function introduced in 3.4.0. Refer by name to provide compatibility with older Spark builds case _ if expr.prettyName == "array_append" => convert(CometArrayAppend) case _: ArrayIntersect => convert(CometArrayIntersect) case _: ArrayJoin => convert(CometArrayJoin) From a3e9c46793904adbf9b412d05882a671d4fe0e42 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2025 10:57:26 -0700 Subject: [PATCH 6/9] address feedback --- .../main/scala/org/apache/comet/CometConf.scala | 15 ++++++++------- docs/source/user-guide/configs.md | 6 +++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index c92eeb2b8..0aa73a82c 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf */ object CometConf extends ShimCometConf { + private val COMPAT_GUIDE = "For more information, refer to the Comet Compatibility " + + "Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)" + private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " + "Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)" @@ -609,8 +612,7 @@ object CometConf extends ShimCometConf { conf("spark.comet.expression.allowIncompatible") .doc( "Comet is not currently fully compatible with Spark for all expressions. " + - "Set this config to true to allow them anyway. See compatibility guide " + - "for more information.") + s"Set this config to true to allow them anyway. $COMPAT_GUIDE.") .booleanConf .createWithDefault(false) @@ -618,16 +620,15 @@ object CometConf extends ShimCometConf { conf("spark.comet.cast.allowIncompatible") .doc( "Comet is not currently fully compatible with Spark for all cast operations. " + - "Set this config to true to allow them anyway. See compatibility guide " + - "for more information.") + s"Set this config to true to allow them anyway. $COMPAT_GUIDE.") .booleanConf .createWithDefault(false) val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = conf("spark.comet.regexp.allowIncompatible") - .doc("Comet is not currently fully compatible with Spark for all regular expressions. " + - "Set this config to true to allow them anyway using Rust's regular expression engine. " + - "See compatibility guide for more information.") + .doc( + "Comet is not currently fully compatible with Spark for all regular expressions. " + + s"Set this config to true to allow them anyway. 
$COMPAT_GUIDE.") .booleanConf .createWithDefault(false) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aa14214cf..8245e7b76 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -25,7 +25,7 @@ Comet provides the following configuration settings. |--------|-------------|---------------| | spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 | | spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false | -| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false | +| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. | false | | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 | @@ -64,7 +64,7 @@ Comet provides the following configuration settings. | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false | | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false | | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false | -| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. See compatibility guide for more information. | false | +| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. 
| 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | @@ -74,7 +74,7 @@ Comet provides the following configuration settings. | spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 | | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true | | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 | -| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false | +| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true | | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false | | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. 
| 2 | From d6fa54240093fc997c8f067c029c6ef02f9d8385 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2025 11:00:49 -0700 Subject: [PATCH 7/9] address feedback --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 0aa73a82c..082423571 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -45,7 +45,7 @@ import org.apache.comet.shims.ShimCometConf */ object CometConf extends ShimCometConf { - private val COMPAT_GUIDE = "For more information, refer to the Comet Compatibility " + + val COMPAT_GUIDE: String = "For more information, refer to the Comet Compatibility " + "Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html)" private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " + diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 376587ecf..03cfc72ff 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -935,7 +935,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo( expr, s"$expr is not fully compatible with Spark. To enable it anyway, set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true") + s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.") None case _ => handler.convert(expr, inputs, binding) @@ -2386,7 +2386,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case _: ArrayRemove => convert(CometArrayRemove) case _: ArrayContains => convert(CometArrayContains) - // Function introduced in 3.4.0. Refer by name to provide compatibility with older Spark builds + // Function introduced in 3.4.0. Refer by name to provide compatibility with older Spark builds case _ if expr.prettyName == "array_append" => convert(CometArrayAppend) case _: ArrayIntersect => convert(CometArrayIntersect) case _: ArrayJoin => convert(CometArrayJoin) From e52387156133bd3877df9259f1365c0eade9f91a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2025 14:01:11 -0700 Subject: [PATCH 8/9] Update documentation --- docs/source/user-guide/expressions.md | 29 ++++++++++++++++-------- docs/templates/compatibility-template.md | 22 +++++++++++++----- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 99490f440..6125aa074 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -182,15 +182,26 @@ The following Spark expressions are currently available. 
Any known compatibility | VariancePop | | | VarianceSamp | | -## Complex Types - -| Expression | Notes | -| ----------------- | ----------- | -| CreateNamedStruct | | -| ElementAt | Arrays only | -| GetArrayItem | | -| GetStructField | | -| StructsToJson | | +## Arrays + +| Expression | Notes | +|-------------------|--------------| +| ArrayAppend | Experimental | +| ArrayContains | Experimental | +| ArrayIntersect | Experimental | +| ArrayJoin | Experimental | +| ArrayRemove | Experimental | +| ArraysOverlap | Experimental | +| ElementAt | Arrays only | +| GetArrayItem | | + +## Structs + +| Expression | Notes | +|-------------------|--------------| +| CreateNamedStruct | | +| GetStructField | | +| StructsToJson | | ## Other diff --git a/docs/templates/compatibility-template.md b/docs/templates/compatibility-template.md index f6a725ac6..c63876e90 100644 --- a/docs/templates/compatibility-template.md +++ b/docs/templates/compatibility-template.md @@ -32,12 +32,6 @@ be used in production. There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support. -## Regular Expressions - -Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's -regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but -this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`. - ## Floating number comparison Spark normalizes NaN and zero for floating point numbers for several cases. See `NormalizeFloatingNumbers` optimization rule in Spark. @@ -46,6 +40,22 @@ because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`). functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)). So Comet will add additional normalization expression of NaN and zero for comparison. +## Incompatible Expressions + +Some Comet native expressions are not 100% compatible with Spark and are disabled by default. These expressions +will fall back to Spark but can be enabled by setting `spark.comet.expression.allowIncompatible=true`. + +## Array Expressions + +Comet has experimental support for a number of array expressions. These are experimental and currently marked +as incompatible and can be enabled by setting `spark.comet.expression.allowIncompatible=true`. + +## Regular Expressions + +Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's +regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but +this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`. 
+ ## Cast Cast operations in Comet fall into three levels of support: From 23accfa41e5e68c30e08730319804833cf44a294 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2025 14:16:53 -0700 Subject: [PATCH 9/9] scalastyle --- .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 03cfc72ff..7b375bc23 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2386,7 +2386,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case _: ArrayRemove => convert(CometArrayRemove) case _: ArrayContains => convert(CometArrayContains) - // Function introduced in 3.4.0. Refer by name to provide compatibility with older Spark builds + // Function introduced in 3.4.0. Refer by name to provide compatibility + // with older Spark builds case _ if expr.prettyName == "array_append" => convert(CometArrayAppend) case _: ArrayIntersect => convert(CometArrayIntersect) case _: ArrayJoin => convert(CometArrayJoin)
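A minimal usage sketch of the flag this series introduces. Only the config key `spark.comet.expression.allowIncompatible` and the set of gated functions (array_append, array_contains, array_intersect, array_join, arrays_overlap) come from the patches above; the plugin and session settings are the usual Comet setup and are assumptions, not part of the diff:

```scala
// Sketch under stated assumptions: the Comet jar is on the classpath and
// the standard CometPlugin entry point is used to enable it.
import org.apache.spark.sql.SparkSession

object AllowIncompatibleExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.plugins", "org.apache.spark.CometPlugin") // standard Comet setup (assumed)
      // Opt in to Comet expressions that are not yet 100% Spark-compatible.
      // Left at its default of false, expressions such as array_contains fall
      // back to Spark and the fallback reason is attached to the plan via withInfo.
      .config("spark.comet.expression.allowIncompatible", "true")
      .getOrCreate()

    spark.range(10).createOrReplaceTempView("t1")
    // array_contains is one of the expressions gated by the new config.
    spark.sql("SELECT array_contains(array(id), id) FROM t1").show()

    spark.stop()
  }
}
```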