diff --git a/build.sbt b/build.sbt
index a2782b6..483edbf 100644
--- a/build.sbt
+++ b/build.sbt
@@ -24,28 +24,58 @@
 description := "The Apache DataSketches package for Spark"
 licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))
 
+// determine our java version
+val jvmVersionString = settingKey[String]("The JVM version")
+jvmVersionString := sys.props("java.version")
+
+val jvmVersion = settingKey[String]("The JVM major version")
+jvmVersion := {
+  val version = jvmVersionString.value
+  if (version.startsWith("21")) "21"
+  else if (version.startsWith("17")) "17"
+  else if (version.startsWith("11")) "11"
+  else "8"
+}
+
+val dsJavaVersion = settingKey[String]("The DataSketches Java version")
+dsJavaVersion := {
+  if (jvmVersion.value == "11") "6.2.0"
+  else if (jvmVersion.value == "17") "7.0.1"
+  else if (jvmVersion.value == "21") "8.0.0"
+  else "6.2.0"
+}
+
 // these do not impact code generation in spark
-javacOptions ++= Seq("-source", "17", "-target", "17")
-scalacOptions ++= Seq("-encoding", "UTF-8", "-release", "17")
-Test / javacOptions ++= Seq("-source", "17", "-target", "17")
-Test / scalacOptions ++= Seq("-encoding", "UTF-8", "-release", "17")
+javacOptions ++= Seq("-source", jvmVersion.value, "-target", jvmVersion.value)
+scalacOptions ++= Seq("-encoding", "UTF-8", "-release", jvmVersion.value)
+Test / javacOptions ++= Seq("-source", jvmVersion.value, "-target", jvmVersion.value)
+Test / scalacOptions ++= Seq("-encoding", "UTF-8", "-release", jvmVersion.value)
 
 libraryDependencies ++= Seq(
+  "org.apache.datasketches" % "datasketches-java" % dsJavaVersion.value % "compile",
   "org.scala-lang" % "scala-library" % "2.12.6",
   "org.apache.spark" %% "spark-sql" % "3.5.4" % "provided",
-  "org.apache.datasketches" % "datasketches-java" % "7.0.0" % "compile",
   "org.scalatest" %% "scalatest" % "3.2.19" % "test",
   "org.scalatestplus" %% "junit-4-13" % "3.2.19.0" % "test"
 )
 
 Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oD")
 
-// for java 17
-Test / fork := true
-Test / javaOptions ++= Seq(
-  "--add-modules=jdk.incubator.foreign",
-  "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
-)
+// additional options for java 17
+Test / fork := {
+  if (jvmVersion.value == "17") true
+  else (Test / fork).value
+}
+
+Test / javaOptions ++= {
+  if (jvmVersion.value == "17") {
+    Seq("--add-modules=jdk.incubator.foreign",
+      "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
+    )
+  } else {
+    Seq.empty
+  }
+}
 
 scalacOptions ++= Seq(
   "-deprecation",
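Note on the build.sbt change above: jvmVersion and dsJavaVersion are ordinary setting keys, so the versions the build selected can be inspected directly from the sbt shell. A quick sanity check might look like the following (hypothetical session; the reported values depend on the JVM running sbt):

  // sbt shell session (hypothetical); on a Java 17 JVM this build
  // should pick datasketches-java 7.0.1 per the mapping above.
  // > show jvmVersion
  // [info] 17
  // > show dsJavaVersion
  // [info] 7.0.1
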
diff --git a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllAggregate.scala b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllAggregate.scala
index 97db9ae..543ce9f 100644
--- a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllAggregate.scala
+++ b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllAggregate.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
   """,
 )
 // scalastyle:on line.size.limit
-case class KllDoublesSketchAgg(
+case class KllDoublesSketchAggBuild(
     dataExpr: Expression,
     kExpr: Expression,
     mutableAggBufferOffset: Int = 0,
@@ -84,14 +84,14 @@ case class KllDoublesSketchAgg(
   }
 
   // Copy constructors
-  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchAgg =
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchAggBuild =
     copy(mutableAggBufferOffset = newMutableAggBufferOffset)
 
-  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchAgg =
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchAggBuild =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 
   override protected def withNewChildrenInternal(newLeft: Expression,
-      newRight: Expression): KllDoublesSketchAgg = {
+      newRight: Expression): KllDoublesSketchAggBuild = {
     copy(dataExpr = newLeft, kExpr = newRight)
   }
 
diff --git a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllMerge.scala b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllMerge.scala
index af139a3..f605604 100644
--- a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllMerge.scala
+++ b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllMerge.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
   //group = "agg_funcs",
 )
 // scalastyle:on line.size.limit
-case class KllDoublesSketchMergeAgg(
+case class KllDoublesSketchAggMerge(
     sketchExpr: Expression,
     kExpr: Expression,
     mutableAggBufferOffset: Int = 0,
@@ -86,13 +86,13 @@
   }
 
   // Copy constructors
-  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchMergeAgg =
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchAggMerge =
    copy(mutableAggBufferOffset = newMutableAggBufferOffset)
 
-  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchMergeAgg =
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchAggMerge =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 
-  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): KllDoublesSketchMergeAgg =
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): KllDoublesSketchAggMerge =
     copy(sketchExpr = newLeft, kExpr = newRight)
 
   // overrides for TypedImperativeAggregate
diff --git a/src/main/scala/org/apache/spark/sql/registrar/KllFunctionRegistry.scala b/src/main/scala/org/apache/spark/sql/registrar/KllFunctionRegistry.scala
index 56cfefa..a1fe6a1 100644
--- a/src/main/scala/org/apache/spark/sql/registrar/KllFunctionRegistry.scala
+++ b/src/main/scala/org/apache/spark/sql/registrar/KllFunctionRegistry.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.registrar
 
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo}
-import org.apache.spark.sql.aggregate.{KllDoublesSketchAgg, KllDoublesSketchMergeAgg}
+import org.apache.spark.sql.aggregate.{KllDoublesSketchAggBuild, KllDoublesSketchAggMerge}
 import org.apache.spark.sql.expressions.{KllDoublesSketchGetMin, KllDoublesSketchGetMax, KllDoublesSketchGetPmf, KllDoublesSketchGetCdf}
 
 object KllFunctionRegistry extends DatasketchesFunctionRegistry {
   override val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
-    expression[KllDoublesSketchAgg]("kll_sketch_double_agg_build"),
-    expression[KllDoublesSketchMergeAgg]("kll_sketch_double_agg_merge"),
+    expression[KllDoublesSketchAggBuild]("kll_sketch_double_agg_build"),
+    expression[KllDoublesSketchAggMerge]("kll_sketch_double_agg_merge"),
     expression[KllDoublesSketchGetMin]("kll_sketch_double_get_min"),
     expression[KllDoublesSketchGetMax]("kll_sketch_double_get_max"),
     expression[KllDoublesSketchGetPmf]("kll_sketch_double_get_pmf"),
diff --git a/src/main/scala/org/apache/spark/sql/registrar/ThetaFunctionRegistry.scala b/src/main/scala/org/apache/spark/sql/registrar/ThetaFunctionRegistry.scala
index 27fa41a..0062154 100644
--- a/src/main/scala/org/apache/spark/sql/registrar/ThetaFunctionRegistry.scala
+++ b/src/main/scala/org/apache/spark/sql/registrar/ThetaFunctionRegistry.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.registrar
 
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo}
-import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion}
+import org.apache.spark.sql.aggregate.{ThetaSketchAggBuild, ThetaSketchAggUnion}
 import org.apache.spark.sql.expressions.ThetaSketchGetEstimate
 
 object ThetaFunctionRegistry extends DatasketchesFunctionRegistry {
   override val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
-    expression[ThetaSketchBuild]("theta_sketch_build"),
-    expression[ThetaUnion]("theta_union"),
+    expression[ThetaSketchAggBuild]("theta_sketch_agg_build"),
+    expression[ThetaSketchAggUnion]("theta_sketch_agg_union"),
     expression[ThetaSketchGetEstimate]("theta_sketch_get_estimate")
   )
 }
diff --git a/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_kll.scala b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_kll.scala
index 6493690..734110c 100644
--- a/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_kll.scala
+++ b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_kll.scala
@@ -21,14 +21,14 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType}
 
-import org.apache.spark.sql.aggregate.{KllDoublesSketchMergeAgg, KllDoublesSketchAgg}
+import org.apache.spark.sql.aggregate.{KllDoublesSketchAggMerge, KllDoublesSketchAggBuild}
 import org.apache.spark.sql.expressions.{KllDoublesSketchGetMin, KllDoublesSketchGetMax, KllDoublesSketchGetPmfCdf}
 
 object functions_datasketches_kll extends DatasketchesScalaFunctionBase {
 
   // build sketch
   def kll_sketch_double_agg_build(expr: Column, k: Column): Column = withAggregateFunction {
-    new KllDoublesSketchAgg(expr.expr, k.expr)
+    new KllDoublesSketchAggBuild(expr.expr, k.expr)
   }
 
   def kll_sketch_double_agg_build(expr: Column, k: Int): Column = {
@@ -40,7 +40,7 @@ object functions_datasketches_kll extends DatasketchesScalaFunctionBase {
   }
 
   def kll_sketch_double_agg_build(expr: Column): Column = withAggregateFunction {
-    new KllDoublesSketchAgg(expr.expr)
+    new KllDoublesSketchAggBuild(expr.expr)
   }
 
   def kll_sketch_double_agg_build(columnName: String): Column = {
@@ -49,7 +49,7 @@ object functions_datasketches_kll extends DatasketchesScalaFunctionBase {
 
   // merge sketches
   def kll_sketch_double_agg_merge(expr: Column): Column = withAggregateFunction {
-    new KllDoublesSketchMergeAgg(expr.expr)
+    new KllDoublesSketchAggMerge(expr.expr)
   }
 
   def kll_sketch_double_agg_merge(columnName: String): Column = {
@@ -57,11 +57,11 @@ object functions_datasketches_kll extends DatasketchesScalaFunctionBase {
   }
 
   def kll_sketch_double_agg_merge(expr: Column, k: Column): Column = withAggregateFunction {
-    new KllDoublesSketchMergeAgg(expr.expr, k.expr)
+    new KllDoublesSketchAggMerge(expr.expr, k.expr)
   }
 
   def kll_sketch_double_agg_merge(expr: Column, k: Int): Column = withAggregateFunction {
-    new KllDoublesSketchMergeAgg(expr.expr, lit(k).expr)
+    new KllDoublesSketchAggMerge(expr.expr, lit(k).expr)
   }
 
   def kll_sketch_double_agg_merge(columnName: String, k: Int): Column = {
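To make the renamed KLL surface concrete, here is a brief usage sketch of the Scala API above. This is hypothetical driver code, not part of the patch; it assumes an active SparkSession named spark and that the object is importable under the package shown in these hunks:

  import org.apache.spark.sql.functions_datasketches_kll._

  // Build one KLL doubles sketch per group with the renamed builder,
  // then merge the per-group sketches into a single sketch.
  // Column and group names here are illustrative only.
  val df = spark.range(1, 1001)
    .selectExpr("CAST(id % 10 AS INT) AS grp", "CAST(id AS DOUBLE) AS value")
  val perGroup = df.groupBy("grp")
    .agg(kll_sketch_double_agg_build("value").as("sketch"))
  val merged = perGroup.agg(kll_sketch_double_agg_merge("sketch").as("merged"))
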
diff --git a/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_theta.scala b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_theta.scala
index 88dfc8f..3c33054 100644
--- a/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_theta.scala
+++ b/src/main/scala/org/apache/spark/sql/registrar/functions_datasketches_theta.scala
@@ -19,40 +19,40 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.functions.lit
 
-import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion}
+import org.apache.spark.sql.aggregate.{ThetaSketchAggBuild, ThetaSketchAggUnion}
 import org.apache.spark.sql.expressions.ThetaSketchGetEstimate
 
 object functions_datasketches_theta extends DatasketchesScalaFunctionBase {
-  def theta_sketch_build(column: Column, lgk: Int): Column = withAggregateFunction {
-    new ThetaSketchBuild(column.expr, lgk)
+  def theta_sketch_agg_build(column: Column, lgk: Int): Column = withAggregateFunction {
+    new ThetaSketchAggBuild(column.expr, lgk)
   }
 
-  def theta_sketch_build(columnName: String, lgk: Int): Column = {
-    theta_sketch_build(Column(columnName), lgk)
+  def theta_sketch_agg_build(columnName: String, lgk: Int): Column = {
+    theta_sketch_agg_build(Column(columnName), lgk)
   }
 
-  def theta_sketch_build(column: Column): Column = withAggregateFunction {
-    new ThetaSketchBuild(column.expr)
+  def theta_sketch_agg_build(column: Column): Column = withAggregateFunction {
+    new ThetaSketchAggBuild(column.expr)
   }
 
-  def theta_sketch_build(columnName: String): Column = {
-    theta_sketch_build(Column(columnName))
+  def theta_sketch_agg_build(columnName: String): Column = {
+    theta_sketch_agg_build(Column(columnName))
   }
 
-  def theta_union(column: Column, lgk: Int): Column = withAggregateFunction {
-    new ThetaUnion(column.expr, lit(lgk).expr)
+  def theta_sketch_agg_union(column: Column, lgk: Int): Column = withAggregateFunction {
+    new ThetaSketchAggUnion(column.expr, lit(lgk).expr)
   }
 
-  def theta_union(columnName: String, lgk: Int): Column = withAggregateFunction {
-    new ThetaUnion(Column(columnName).expr, lit(lgk).expr)
+  def theta_sketch_agg_union(columnName: String, lgk: Int): Column = withAggregateFunction {
+    new ThetaSketchAggUnion(Column(columnName).expr, lit(lgk).expr)
  }
 
-  def theta_union(column: Column): Column = withAggregateFunction {
-    new ThetaUnion(column.expr)
+  def theta_sketch_agg_union(column: Column): Column = withAggregateFunction {
+    new ThetaSketchAggUnion(column.expr)
   }
 
-  def theta_union(columnName: String): Column = withAggregateFunction {
-    new ThetaUnion(Column(columnName).expr)
+  def theta_sketch_agg_union(columnName: String): Column = withAggregateFunction {
+    new ThetaSketchAggUnion(Column(columnName).expr)
   }
 
   def theta_sketch_get_estimate(column: Column): Column = withExpr {
diff --git a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchBuild.scala b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggBuild.scala
similarity index 97%
rename from src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchBuild.scala
rename to src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggBuild.scala
index d1bb88d..bbca824 100644
--- a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchBuild.scala
+++ b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggBuild.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, Long
  * which can be used to estimate distinct count.
  *
  * See [[https://datasketches.apache.org/docs/Theta/ThetaSketches.html]] for more information.
- * 
+ *
  * @param child child expression, from which to build a sketch
  * @param lgk the size-accraucy trade-off parameter for the sketch
  */
@@ -46,7 +46,7 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, Long
   """,
 )
 // scalastyle:on line.size.limit
-case class ThetaSketchBuild(
+case class ThetaSketchAggBuild(
     left: Expression,
     right: Expression,
     mutableAggBufferOffset: Int = 0,
@@ -69,13 +69,13 @@ case class ThetaSketchBuild(
   def this(child: Expression, lgk: Expression) = this(child, lgk, 0, 0)
   def this(child: Expression, lgk: Int) = this(child, Literal(lgk), 0, 0)
 
-  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaSketchBuild =
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaSketchAggBuild =
     copy(mutableAggBufferOffset = newMutableAggBufferOffset)
 
-  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchBuild =
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchAggBuild =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 
-  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaSketchBuild = {
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaSketchAggBuild = {
     copy(left = newLeft, right = newRight)
   }
 
diff --git a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaUnion.scala b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggUnion.scala
similarity index 97%
rename from src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaUnion.scala
rename to src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggUnion.scala
index f36ad29..1a17b34 100644
--- a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaUnion.scala
+++ b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggUnion.scala
@@ -47,7 +47,7 @@ import org.apache.spark.SparkUnsupportedOperationException
   """
 )
 // scalastyle:on line.size.limit
-case class ThetaUnion(
+case class ThetaSketchAggUnion(
     left: Expression,
     right: Expression,
     mutableAggBufferOffset: Int = 0,
@@ -71,13 +71,13 @@ case class ThetaUnion(
   // def this(child: Expression, lgk: Expression) = this(child, lgk, 0, 0)
   // def this(child: Expression, lgk: Int) = this(child, Literal(lgk), 0, 0)
 
-  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaUnion =
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaSketchAggUnion =
     copy(mutableAggBufferOffset = newMutableAggBufferOffset)
 
-  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaUnion =
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchAggUnion =
     copy(inputAggBufferOffset = newInputAggBufferOffset)
 
-  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaUnion = {
+  override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaSketchAggUnion = {
     copy(left = newLeft, right = newRight)
   }
 
diff --git a/src/test/scala/org/apache/spark/sql/ThetaTest.scala b/src/test/scala/org/apache/spark/sql/ThetaTest.scala
index 8a47658..4b7f0a4 100644
--- a/src/test/scala/org/apache/spark/sql/ThetaTest.scala
+++ b/src/test/scala/org/apache/spark/sql/ThetaTest.scala
@@ -27,7 +27,7 @@ class ThetaTest extends SparkSessionManager {
     val n = 100
     val data = (for (i <- 1 to n) yield i).toDF("value")
 
-    val sketchDf = data.agg(theta_sketch_build("value").as("sketch"))
+    val sketchDf = data.agg(theta_sketch_agg_build("value").as("sketch"))
 
     val result: Row = sketchDf.select(theta_sketch_get_estimate("sketch").as("estimate")).head
     assert(result.getAs[Double]("estimate") == 100.0)
@@ -42,7 +42,7 @@ class ThetaTest extends SparkSessionManager {
 
     val df = spark.sql(s"""
       SELECT
-        theta_sketch_get_estimate(theta_sketch_build(value)) AS estimate
+        theta_sketch_get_estimate(theta_sketch_agg_build(value)) AS estimate
       FROM
         theta_input_table
     """)
@@ -58,7 +58,7 @@ class ThetaTest extends SparkSessionManager {
 
     val df = spark.sql(s"""
       SELECT
-        theta_sketch_get_estimate(theta_sketch_build(value, 14)) AS estimate
+        theta_sketch_get_estimate(theta_sketch_agg_build(value, 14)) AS estimate
       FROM
         theta_input_table
     """)
@@ -70,8 +70,8 @@ class ThetaTest extends SparkSessionManager {
     val numDistinct = 2000
     val data = (for (i <- 1 to numDistinct) yield (i % numGroups, i)).toDF("group", "value")
 
-    val groupedDf = data.groupBy("group").agg(theta_sketch_build("value").as("sketch"))
-    val mergedDf = groupedDf.agg(theta_union("sketch").as("merged"))
+    val groupedDf = data.groupBy("group").agg(theta_sketch_agg_build("value").as("sketch"))
+    val mergedDf = groupedDf.agg(theta_sketch_agg_union("sketch").as("merged"))
     val result: Row = mergedDf.select(theta_sketch_get_estimate("merged").as("estimate")).head
     assert(result.getAs[Double]("estimate") == numDistinct)
   }
@@ -86,7 +86,7 @@ class ThetaTest extends SparkSessionManager {
     val groupedDf = spark.sql(s"""
       SELECT
         group,
-        theta_sketch_build(value) AS sketch
+        theta_sketch_agg_build(value) AS sketch
       FROM
         theta_input_table
      GROUP BY
@@ -96,7 +96,7 @@
 
     val mergedDf = spark.sql(s"""
       SELECT
-        theta_sketch_get_estimate(theta_union(sketch)) AS estimate
+        theta_sketch_get_estimate(theta_sketch_agg_union(sketch)) AS estimate
       FROM
         theta_sketch_table
     """)
@@ -112,7 +112,7 @@ class ThetaTest extends SparkSessionManager {
     val groupedDf = spark.sql(s"""
       SELECT
         group,
-        theta_sketch_build(value, 14) AS sketch
+        theta_sketch_agg_build(value, 14) AS sketch
       FROM
         theta_input_table
      GROUP BY
@@ -122,7 +122,7 @@
 
     val mergedDf = spark.sql(s"""
       SELECT
-        theta_sketch_get_estimate(theta_union(sketch, 14)) AS estimate
+        theta_sketch_get_estimate(theta_sketch_agg_union(sketch, 14)) AS estimate
       FROM
         theta_sketch_table
     """)
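The KLL SQL names registered in KllFunctionRegistry are not exercised by the tests in this diff, so here is a closing usage sketch in the same style as ThetaTest. It is hypothetical and not part of the patch: it assumes the registry has already been installed into the session's function registry (the DatasketchesFunctionRegistry wiring is not shown here) and uses an illustrative temp view name:

  // Build a KLL doubles sketch in SQL, then read its min and max back out.
  spark.range(1, 101).selectExpr("CAST(id AS DOUBLE) AS value")
    .createOrReplaceTempView("kll_input_table")

  val df = spark.sql("""
    SELECT
      kll_sketch_double_get_min(sketch) AS min_value,
      kll_sketch_double_get_max(sketch) AS max_value
    FROM (
      SELECT kll_sketch_double_agg_build(value) AS sketch
      FROM kll_input_table
    ) AS t
  """)
  df.show()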