From 059c03713c37009718db5fdc1b39717c21ab9965 Mon Sep 17 00:00:00 2001
From: Dharan Aditya
Date: Fri, 31 Jan 2025 18:54:00 +0530
Subject: [PATCH 1/2] impl array_union

Signed-off-by: Dharan Aditya
---
Notes (ignored by `git am`):
  Adds the `array_union` field (tag 65) to the `Expr` proto message, a
  `CometArrayUnion` serde that emits it as a BinaryExpr, a planner arm that
  builds a ScalarFunctionExpr around `array_union_udf()`, and a test that
  runs with COMET_EXPR_ALLOW_INCOMPATIBLE enabled.

 native/core/src/execution/planner.rs          | 17 ++++++++++++++++-
 native/proto/src/proto/expr.proto             |  1 +
 .../apache/comet/serde/QueryPlanSerde.scala   |  1 +
 .../scala/org/apache/comet/serde/arrays.scala | 16 ++++++++++++++++
 .../comet/CometArrayExpressionSuite.scala     | 18 ++++++++++++++++++
 5 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 3dc59a9fd9..0eb5f8c2f2 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -67,7 +67,7 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}
 use datafusion_functions_nested::array_has::array_has_any_udf;
 use datafusion_functions_nested::concat::ArrayAppend;
 use datafusion_functions_nested::remove::array_remove_all_udf;
-use datafusion_functions_nested::set_ops::array_intersect_udf;
+use datafusion_functions_nested::set_ops::{array_intersect_udf, array_union_udf};
 use datafusion_functions_nested::string::array_to_string_udf;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 
@@ -829,6 +829,21 @@ impl PhysicalPlanner {
                 ));
                 Ok(array_has_any_expr)
             }
+            ExprStruct::ArrayUnion(expr) => {
+                let left_array_expr =
+                    self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let right_array_expr =
+                    self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let array_union_udf = array_union_udf();
+                let return_type = right_array_expr.data_type(&input_schema)?;
+                let args = vec![Arc::clone(&left_array_expr), right_array_expr];
+                Ok(Arc::new(ScalarFunctionExpr::new(
+                    "array_union",
+                    array_union_udf,
+                    args,
+                    return_type,
+                )))
+            }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
                 expr
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index fd928fd8a3..0276cba900 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -89,6 +89,7 @@ message Expr {
     BinaryExpr array_intersect = 62;
     ArrayJoin array_join = 63;
     BinaryExpr arrays_overlap = 64;
+    BinaryExpr array_union = 65;
   }
 }
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index f4699af8de..24bf0a9b77 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2366,6 +2366,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       case _: ArrayIntersect => convert(CometArrayIntersect)
       case _: ArrayJoin => convert(CometArrayJoin)
       case _: ArraysOverlap => convert(CometArraysOverlap)
+      case _: ArrayUnion => convert(CometArrayUnion)
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index db1679f22b..f412ce2d98 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -165,3 +165,19 @@ object CometArrayJoin extends CometExpressionSerde with IncompatExpr {
     }
   }
 }
+
+object CometArrayUnion extends CometExpressionSerde with IncompatExpr {
+
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    createBinaryExpr(
+      expr,
+      expr.children(0),
+      expr.children(1),
+      inputs,
+      binding,
+      (builder, binaryExpr) => builder.setArrayUnion(binaryExpr))
+  }
+}
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index df1fccb698..eb2bab287c 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -292,4 +292,22 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
     }
   }
 
+  test("array_union") {
+    assume(isSpark34Plus)
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          checkSparkAnswerAndOperator(
+            sql("SELECT array_union(array(_2, _3, _4), array(_3, _4)) from t1"))
+          checkSparkAnswerAndOperator(
+            sql("SELECT array_union(array(_2 * -1), array(_9, _10)) from t1"))
+          checkSparkAnswerAndOperator(sql("SELECT array_union(array(_18), array(_19)) from t1"))
+        }
+      }
+    }
+  }
+
 }
From d5858a4b8af115ef7269514295b619b24aaecd72 Mon Sep 17 00:00:00 2001
From: Dharan Aditya
Date: Fri, 31 Jan 2025 21:41:37 +0530
Subject: [PATCH 2/2] update ut

Signed-off-by: Dharan Aditya
---
Notes (ignored by `git am`):
  Planner: the ArrayUnion arm now binds the built expression to a local
  before returning it and takes the return type from the left operand
  instead of the right one.
  Test: the generated parquet file shrinks from 10000 to 1000 rows and the
  `array(_2 * -1)` / `array(_9, _10)` query is dropped.

 native/core/src/execution/planner.rs            | 17 +++++++++--------
 .../comet/CometArrayExpressionSuite.scala       |  4 +---
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 0eb5f8c2f2..1ee5dd2a56 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -830,19 +830,20 @@ impl PhysicalPlanner {
                 Ok(array_has_any_expr)
             }
             ExprStruct::ArrayUnion(expr) => {
-                let left_array_expr =
+                let left_expr =
                     self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let right_array_expr =
+                let right_expr =
                     self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let array_union_udf = array_union_udf();
-                let return_type = right_array_expr.data_type(&input_schema)?;
-                let args = vec![Arc::clone(&left_array_expr), right_array_expr];
-                Ok(Arc::new(ScalarFunctionExpr::new(
+                let args = vec![Arc::clone(&left_expr), right_expr];
+                let datafusion_array_union = array_union_udf();
+                let return_type = left_expr.data_type(&input_schema)?;
+                let array_union_expr = Arc::new(ScalarFunctionExpr::new(
                     "array_union",
-                    array_union_udf,
+                    datafusion_array_union,
                     args,
                     return_type,
-                )))
+                ));
+                Ok(array_union_expr)
             }
             expr => Err(ExecutionError::GeneralError(format!(
                 "Not implemented: {:?}",
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index eb2bab287c..a2e7c38fbf 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -298,12 +298,10 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
           val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
+          makeParquetFileAllTypes(path, dictionaryEnabled, 1000)
           spark.read.parquet(path.toString).createOrReplaceTempView("t1")
           checkSparkAnswerAndOperator(
             sql("SELECT array_union(array(_2, _3, _4), array(_3, _4)) from t1"))
-          checkSparkAnswerAndOperator(
-            sql("SELECT array_union(array(_2 * -1), array(_9, _10)) from t1"))
           checkSparkAnswerAndOperator(sql("SELECT array_union(array(_18), array(_19)) from t1"))
         }
       }