
chore: Move all array_* serde to new framework, use correct INCOMPAT config #1349

Merged — 10 commits, Jan 30, 2025
Changes from 4 commits
9 changes: 9 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -605,6 +605,15 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.expression.allowIncompatible")
.doc(
"Comet is not currently fully compatible with Spark for all expressions. " +
"Set this config to true to allow them anyway. See compatibility guide " +
Contributor suggested change:
-          "Set this config to true to allow them anyway. See compatibility guide " +
+          "Set this config to true to allow them anyway. See Compatibility Guide " +
"for more information.")
Contributor suggested change:
-          "for more information.")
+          "for more information. (docs/user-guide/compatibility.md)")

Member (author): I pushed a commit to update this by referencing a new CometConf.COMPAT_GUIDE variable with the link to the published documentation. I regenerated the compatibility guide as well.
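The approach described here — a single constant holding the compatibility-guide link, reused across config doc strings — could look roughly like the sketch below. The object name, helper method, and URL are illustrative stand-ins, not Comet's actual code.

```scala
// Illustrative sketch only: one shared constant for the compatibility-guide
// link, appended to each config doc string. The URL below is a stand-in.
object CompatGuideSketch {
  val COMPAT_GUIDE: String =
    "For more information, refer to the Comet Compatibility Guide " +
      "(https://datafusion.apache.org/comet/user-guide/compatibility.html)"

  // Hypothetical helper: build a doc string that ends with the guide reference.
  def incompatDoc(summary: String): String = s"$summary $COMPAT_GUIDE"
}
```

Keeping the link in one place means regenerating `configs.md` picks up a changed URL everywhere at once.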

.booleanConf
.createWithDefault(false)

val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.cast.allowIncompatible")
.doc(
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -64,6 +64,7 @@ Comet provides the following configuration settings.
| spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
Contributor suggested change:
- | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
+ | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. See Compatibility Guide for more information. (docs/user-guide/compatibility.md) | false |

| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
96 changes: 23 additions & 73 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -929,6 +929,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
binding: Boolean): Option[Expr] = {
SQLConf.get

def convert(handler: CometExpressionSerde): Option[Expr] = {
handler match {
case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
withInfo(
expr,
s"$expr is not fully compatible with Spark. To enable it anyway, set " +
Contributor suggested change:
-            s"$expr is not fully compatible with Spark. To enable it anyway, set " +
+            s"$expr is not fully compatible with Spark, see details in Compatibility Guide. To enable it anyway, set " +

Member (author): I pushed a commit to update this by referencing a new CometConf.COMPAT_GUIDE variable with the link to the published documentation.

s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true")
None
case _ =>
handler.convert(expr, inputs, binding)
}
}
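As a standalone illustration of the gating pattern this helper introduces — handlers tagged with a marker trait are rejected unless an "allow incompatible" flag is set — here is a minimal sketch with simplified stand-in names (not Comet's real types):

```scala
// Simplified stand-ins for CometExpressionSerde / IncompatExpr and the config gate.
trait ExprSerde { def convert(expr: String): Option[String] }
trait Incompat // marker trait, mirrors IncompatExpr

object UpperSerde extends ExprSerde with Incompat {
  def convert(expr: String): Option[String] = Some(expr.toUpperCase)
}

// If the handler carries the marker and the flag is off, decline the conversion
// (Comet would then fall back to Spark for this expression).
def convertGated(handler: ExprSerde, expr: String, allowIncompat: Boolean): Option[String] =
  handler match {
    case _: Incompat if !allowIncompat => None
    case _                             => handler.convert(expr)
  }
```

The advantage over the previous per-expression `if` checks is that the compatibility policy lives in one place and each serde object only declares *whether* it is incompatible.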

expr match {
case a @ Alias(_, _) =>
val r = exprToProtoInternal(a.child, inputs, binding)
@@ -2371,83 +2384,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
None
}
case expr: ArrayRemove => CometArrayRemove.convert(expr, inputs, binding)
case expr if expr.prettyName == "array_contains" =>
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayContains(binaryExpr))
case _ if expr.prettyName == "array_append" =>
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
case _ if expr.prettyName == "array_intersect" =>
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayIntersect(binaryExpr))
case ArrayJoin(arrayExpr, delimiterExpr, nullReplacementExpr) =>
val arrayExprProto = exprToProto(arrayExpr, inputs, binding)
val delimiterExprProto = exprToProto(delimiterExpr, inputs, binding)

if (arrayExprProto.isDefined && delimiterExprProto.isDefined) {
val arrayJoinBuilder = nullReplacementExpr match {
case Some(nrExpr) =>
val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding)
ExprOuterClass.ArrayJoin
.newBuilder()
.setArrayExpr(arrayExprProto.get)
.setDelimiterExpr(delimiterExprProto.get)
.setNullReplacementExpr(nullReplacementExprProto.get)
case None =>
ExprOuterClass.ArrayJoin
.newBuilder()
.setArrayExpr(arrayExprProto.get)
.setDelimiterExpr(delimiterExprProto.get)
}
Some(
ExprOuterClass.Expr
.newBuilder()
.setArrayJoin(arrayJoinBuilder)
.build())
} else {
val exprs: List[Expression] = nullReplacementExpr match {
case Some(nrExpr) => List(arrayExpr, delimiterExpr, nrExpr)
case None => List(arrayExpr, delimiterExpr)
}
withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*)
None
}
case ArraysOverlap(leftArrayExpr, rightArrayExpr) =>
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
createBinaryExpr(
expr,
leftArrayExpr,
rightArrayExpr,
inputs,
binding,
(builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
} else {
withInfo(
expr,
s"$expr is not supported yet. To enable all incompatible casts, set " +
s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
None
}
case _: ArrayRemove => convert(CometArrayRemove)
case _: ArrayContains => convert(CometArrayContains)
case _ if expr.prettyName == "array_append" => convert(CometArrayAppend)
andygrove marked this conversation as resolved.
case _: ArrayIntersect => convert(CometArrayIntersect)
case _: ArrayJoin => convert(CometArrayJoin)
case _: ArraysOverlap => convert(CometArraysOverlap)
case _ =>
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
None
}

}

/**
@@ -3490,3 +3437,6 @@ trait CometExpressionSerde {
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr]
}

/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */
trait IncompatExpr {}
Member (author): Later, we could add a method to this trait to provide the reason why an expression is incompatible and this could be used to generate documentation.
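The follow-up sketched in that comment might look like the snippet below; the trait extension, method name, and reason text are hypothetical examples for illustration, not actual Comet code.

```scala
// Hypothetical extension of the marker trait: each incompatible expression
// states why it differs from Spark, so docs can be generated from one source.
trait IncompatExprWithReason {
  def incompatReason: String
}

object ExampleIncompatSerde extends IncompatExprWithReason {
  // Made-up example reason, for illustration only.
  override def incompatReason: String =
    "Results may differ from Spark for some edge-case inputs"
}

// Doc generation could then render a table row per registered serde:
def docLine(name: String, serde: IncompatExprWithReason): String =
  s"| $name | ${serde.incompatReason} |"
```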

104 changes: 102 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -19,11 +19,11 @@

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{ArrayRemove, Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, Attribute, Expression}
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, StructType}

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.QueryPlanSerde.createBinaryExpr
import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto}
import org.apache.comet.shims.CometExprShim

object CometArrayRemove extends CometExpressionSerde with CometExprShim {
@@ -65,3 +65,103 @@ object CometArrayRemove extends CometExpressionSerde with CometExprShim {
(builder, binaryExpr) => builder.setArrayRemove(binaryExpr))
}
}

object CometArrayAppend extends CometExpressionSerde with IncompatExpr {
override def convert(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
}
}

object CometArrayContains extends CometExpressionSerde with IncompatExpr {
override def convert(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayContains(binaryExpr))
}
}

object CometArrayIntersect extends CometExpressionSerde with IncompatExpr {
override def convert(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArrayIntersect(binaryExpr))
}
}

object CometArraysOverlap extends CometExpressionSerde with IncompatExpr {
override def convert(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
createBinaryExpr(
expr,
expr.children(0),
expr.children(1),
inputs,
binding,
(builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
}
}

object CometArrayJoin extends CometExpressionSerde with IncompatExpr {
override def convert(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val arrayExpr = expr.asInstanceOf[ArrayJoin]
val arrayExprProto = exprToProto(arrayExpr.array, inputs, binding)
val delimiterExprProto = exprToProto(arrayExpr.delimiter, inputs, binding)

if (arrayExprProto.isDefined && delimiterExprProto.isDefined) {
val arrayJoinBuilder = arrayExpr.nullReplacement match {
case Some(nrExpr) =>
val nullReplacementExprProto = exprToProto(nrExpr, inputs, binding)
ExprOuterClass.ArrayJoin
.newBuilder()
.setArrayExpr(arrayExprProto.get)
.setDelimiterExpr(delimiterExprProto.get)
.setNullReplacementExpr(nullReplacementExprProto.get)
case None =>
ExprOuterClass.ArrayJoin
.newBuilder()
.setArrayExpr(arrayExprProto.get)
.setDelimiterExpr(delimiterExprProto.get)
}
Some(
ExprOuterClass.Expr
.newBuilder()
.setArrayJoin(arrayJoinBuilder)
.build())
} else {
val exprs: List[Expression] = arrayExpr.nullReplacement match {
case Some(nrExpr) => List(arrayExpr, arrayExpr.delimiter, nrExpr)
case None => List(arrayExpr, arrayExpr.delimiter)
}
withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*)
None
}
}
}