diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 9b918ad8b2..5817eeac55 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -149,6 +149,7 @@ jobs: org.apache.comet.CometBitwiseExpressionSuite org.apache.comet.CometMapExpressionSuite org.apache.comet.objectstore.NativeConfigSuite + org.apache.comet.CometBitmapExpressionSuite - name: "sql" value: | org.apache.spark.sql.CometToPrettyStringSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index fb6a8295bc..bb996e17e0 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -114,6 +114,7 @@ jobs: org.apache.comet.CometBitwiseExpressionSuite org.apache.comet.CometMapExpressionSuite org.apache.comet.objectstore.NativeConfigSuite + org.apache.comet.CometBitmapExpressionSuite - name: "sql" value: | org.apache.spark.sql.CometToPrettyStringSuite diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 52b8eb6a30..bddfbc9652 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -40,6 +40,7 @@ use datafusion::{ prelude::{SessionConfig, SessionContext}, }; use datafusion_comet_proto::spark_operator::Operator; +use datafusion_spark::function::bitmap::bitmap_count::BitmapCount; use datafusion_spark::function::bitwise::bit_get::SparkBitGet; use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; @@ -307,6 +308,7 @@ fn prepare_datafusion_session_context( session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitGet::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(BitmapCount::default())); // Must be the last one to override existing functions with the same name datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 892d8bca63..a267bb2e1d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero} import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet._ import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec @@ -230,7 +229,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[Coalesce] -> CometCoalesce, classOf[Literal] -> CometLiteral, classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId, - classOf[SparkPartitionID] -> CometSparkPartitionId) + classOf[SparkPartitionID] -> CometSparkPartitionId, + classOf[StaticInvoke] -> CometStaticInvoke) /** * Mapping of Spark expression class to Comet expression handler. @@ -821,30 +821,6 @@ object QueryPlanSerde extends Logging with CometExprShim { None } - // With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for - // char types. - // See https://github.com/apache/spark/pull/38151 - case s: StaticInvoke - // classOf gets ther runtime class of T, which lets us compare directly - // Otherwise isInstanceOf[Class[T]] will always evaluate to true for Class[_] - if s.staticObject == classOf[CharVarcharCodegenUtils] && - s.dataType.isInstanceOf[StringType] && - s.functionName == "readSidePadding" && - s.arguments.size == 2 && - s.propagateNull && - !s.returnNullable && - s.isDeterministic => - val argsExpr = Seq( - exprToProtoInternal(Cast(s.arguments(0), StringType), inputs, binding), - exprToProtoInternal(s.arguments(1), inputs, binding)) - - if (argsExpr.forall(_.isDefined)) { - scalarFunctionExprToProto("read_side_padding", argsExpr: _*) - } else { - withInfo(expr, s.arguments: _*) - None - } - case KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) => val dataType = serializeDataType(expr.dataType) if (dataType.isEmpty) { diff --git a/spark/src/main/scala/org/apache/comet/serde/statics.scala b/spark/src/main/scala/org/apache/comet/serde/statics.scala new file mode 100644 index 0000000000..00f8bb2957 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/statics.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke + +import org.apache.comet.CometSparkSessionExtensions.withInfo + +object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { + + private val staticInvokeExpressions: Map[String, CometExpressionSerde[StaticInvoke]] = + Map( + "bitmapCount" -> CometScalarFunction("bitmap_count"), + "readSidePadding" -> CometScalarFunction("read_side_padding")) + + override def convert( + expr: StaticInvoke, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + staticInvokeExpressions.get(expr.functionName) match { + case Some(handler) => + handler.convert(expr, inputs, binding) + case None => + withInfo( + expr, + s"Static invoke expression: ${expr.functionName} is not supported", + expr.children: _*) + None + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometBitmapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometBitmapExpressionSuite.scala new file mode 100644 index 0000000000..044c4a563a --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometBitmapExpressionSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import scala.util.Random + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.CometTestBase + +import org.apache.comet.CometSparkSessionExtensions.isSpark35Plus +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} + +class CometBitmapExpressionSuite extends CometTestBase { + + test("bitmap_count") { + assume(isSpark35Plus) + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + DataGenOptions( + allowNull = true, + generateNegativeZero = true, + generateArray = false, + generateStruct = false, + generateMap = false)) + } + val table = spark.read.parquet(filename) + table.createOrReplaceTempView("t1") + val df = sql("SELECT bitmap_count(c13) FROM t1") + checkSparkAnswerAndOperator(df) + } + } +}