Merge pull request #13 from apache/renaming
Rename for consistency
jmalkin authored Jan 22, 2025
2 parents 395c55e + e1650fe commit 5e194b3
Showing 10 changed files with 96 additions and 66 deletions.
52 changes: 41 additions & 11 deletions build.sbt
@@ -24,28 +24,58 @@ description := "The Apache DataSketches package for Spark"

licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))

+// determine our java version
+val jvmVersionString = settingKey[String]("The JVM version")
+jvmVersionString := sys.props("java.version")
+
+val jvmVersion = settingKey[String]("The JVM major version")
+jvmVersion := {
+val version = jvmVersionString.value
+if (version.startsWith("21")) "21"
+else if (version.startsWith("17")) "17"
+else if (version.startsWith("11")) "11"
+else "8"
+}
+
+val dsJavaVersion = settingKey[String]("The DataSketches Java version")
+dsJavaVersion := {
+if (jvmVersion.value == "11") "6.2.0"
+else if (jvmVersion.value == "17") "7.0.1"
+else if (jvmVersion.value == "21") "8.0.0"
+else "6.2.0"
+}

// these do not impact code generation in spark
javacOptions ++= Seq("-source", "17", "-target", "17")
scalacOptions ++= Seq("-encoding", "UTF-8", "-release", "17")
Test / javacOptions ++= Seq("-source", "17", "-target", "17")
Test / scalacOptions ++= Seq("-encoding", "UTF-8", "-release", "17")
javacOptions ++= Seq("-source", jvmVersion.value, "-target", jvmVersion.value)
scalacOptions ++= Seq("-encoding", "UTF-8", "-release", jvmVersion.value)
Test / javacOptions ++= Seq("-source", jvmVersion.value, "-target", jvmVersion.value)
Test / scalacOptions ++= Seq("-encoding", "UTF-8", "-release", jvmVersion.value)

libraryDependencies ++= Seq(
"org.apache.datasketches" % "datasketches-java" % dsJavaVersion.value % "compile",
"org.scala-lang" % "scala-library" % "2.12.6",
"org.apache.spark" %% "spark-sql" % "3.5.4" % "provided",
"org.apache.datasketches" % "datasketches-java" % "7.0.0" % "compile",
"org.scalatest" %% "scalatest" % "3.2.19" % "test",
"org.scalatestplus" %% "junit-4-13" % "3.2.19.0" % "test"
)

Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oD")

-// for java 17
-Test / fork := true
-Test / javaOptions ++= Seq(
-"--add-modules=jdk.incubator.foreign",
-"--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
-)
+// additional options for java 17
+Test / fork := {
+if (jvmVersion.value == "17") true
+else (Test / fork).value
+}
+
+Test / javaOptions ++= {
+if (jvmVersion.value == "17") {
+Seq("--add-modules=jdk.incubator.foreign",
+"--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
+)
+} else {
+Seq.empty
+}
+}

scalacOptions ++= Seq(
"-deprecation",
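For reference, the new settings above bucket the running JVM into major version 8, 11, 17, or 21, and pin datasketches-java accordingly: 6.2.0 on Java 8 and 11, 7.0.1 on Java 17, 8.0.0 on Java 21. A minimal standalone sketch of the same selection logic (dsJavaFor is a hypothetical helper for illustration, not part of the build):

  // Mirrors the dsJavaVersion mapping above; anything that is not 17 or 21 falls back to 6.2.0.
  def dsJavaFor(javaVersion: String): String =
    if (javaVersion.startsWith("21")) "8.0.0"
    else if (javaVersion.startsWith("17")) "7.0.1"
    else "6.2.0"

  dsJavaFor("17.0.12") // returns "7.0.1"

The resolved values can also be inspected from the sbt shell via show jvmVersion and show dsJavaVersion.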
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
""",
)
// scalastyle:on line.size.limit
-case class KllDoublesSketchAgg(
+case class KllDoublesSketchAggBuild(
dataExpr: Expression,
kExpr: Expression,
mutableAggBufferOffset: Int = 0,
@@ -84,14 +84,14 @@ case class KllDoublesSketchAgg(
}

// Copy constructors
-override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchAgg =
+override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchAggBuild =
copy(mutableAggBufferOffset = newMutableAggBufferOffset)

-override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchAgg =
+override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchAggBuild =
copy(inputAggBufferOffset = newInputAggBufferOffset)

override protected def withNewChildrenInternal(newLeft: Expression,
-newRight: Expression): KllDoublesSketchAgg = {
+newRight: Expression): KllDoublesSketchAggBuild = {
copy(dataExpr = newLeft, kExpr = newRight)
}

@@ -48,7 +48,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
//group = "agg_funcs",
)
// scalastyle:on line.size.limit
-case class KllDoublesSketchMergeAgg(
+case class KllDoublesSketchAggMerge(
sketchExpr: Expression,
kExpr: Expression,
mutableAggBufferOffset: Int = 0,
@@ -86,13 +86,13 @@ case class KllDoublesSketchMergeAgg(
}

// Copy constructors
-override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchMergeAgg =
+override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchAggMerge =
copy(mutableAggBufferOffset = newMutableAggBufferOffset)

-override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchMergeAgg =
+override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchAggMerge =
copy(inputAggBufferOffset = newInputAggBufferOffset)

-override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): KllDoublesSketchMergeAgg =
+override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): KllDoublesSketchAggMerge =
copy(sketchExpr = newLeft, kExpr = newRight)

// overrides for TypedImperativeAggregate
@@ -20,13 +20,13 @@ package org.apache.spark.sql.registrar
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo}

-import org.apache.spark.sql.aggregate.{KllDoublesSketchAgg, KllDoublesSketchMergeAgg}
+import org.apache.spark.sql.aggregate.{KllDoublesSketchAggBuild, KllDoublesSketchAggMerge}
import org.apache.spark.sql.expressions.{KllDoublesSketchGetMin, KllDoublesSketchGetMax, KllDoublesSketchGetPmf, KllDoublesSketchGetCdf}

object KllFunctionRegistry extends DatasketchesFunctionRegistry {
override val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
-expression[KllDoublesSketchAgg]("kll_sketch_double_agg_build"),
-expression[KllDoublesSketchMergeAgg]("kll_sketch_double_agg_merge"),
+expression[KllDoublesSketchAggBuild]("kll_sketch_double_agg_build"),
+expression[KllDoublesSketchAggMerge]("kll_sketch_double_agg_merge"),
expression[KllDoublesSketchGetMin]("kll_sketch_double_get_min"),
expression[KllDoublesSketchGetMax]("kll_sketch_double_get_max"),
expression[KllDoublesSketchGetPmf]("kll_sketch_double_get_pmf"),
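Once this registry is installed in a session, the renamed functions resolve directly in Spark SQL. A minimal sketch in the style of the ThetaTest queries below, assuming a SparkSession named spark and a hypothetical temp view kll_input_table with a DOUBLE column value:

  val df = spark.sql(s"""
    SELECT
      kll_sketch_double_get_min(kll_sketch_double_agg_build(value)) AS min_value
    FROM
      kll_input_table
  """)

The one-argument build form is assumed here to fall back to the sketch's default k, matching the one-argument Scala overloads shown later in this diff.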
@@ -20,13 +20,13 @@ package org.apache.spark.sql.registrar
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo}

-import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion}
+import org.apache.spark.sql.aggregate.{ThetaSketchAggBuild, ThetaSketchAggUnion}
import org.apache.spark.sql.expressions.ThetaSketchGetEstimate

object ThetaFunctionRegistry extends DatasketchesFunctionRegistry {
override val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
-expression[ThetaSketchBuild]("theta_sketch_build"),
-expression[ThetaUnion]("theta_union"),
+expression[ThetaSketchAggBuild]("theta_sketch_agg_build"),
+expression[ThetaSketchAggUnion]("theta_sketch_agg_union"),
expression[ThetaSketchGetEstimate]("theta_sketch_get_estimate")
)
}
@@ -21,14 +21,14 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType}

-import org.apache.spark.sql.aggregate.{KllDoublesSketchMergeAgg, KllDoublesSketchAgg}
+import org.apache.spark.sql.aggregate.{KllDoublesSketchAggMerge, KllDoublesSketchAggBuild}
import org.apache.spark.sql.expressions.{KllDoublesSketchGetMin, KllDoublesSketchGetMax, KllDoublesSketchGetPmfCdf}

object functions_datasketches_kll extends DatasketchesScalaFunctionBase {

// build sketch
def kll_sketch_double_agg_build(expr: Column, k: Column): Column = withAggregateFunction {
-new KllDoublesSketchAgg(expr.expr, k.expr)
+new KllDoublesSketchAggBuild(expr.expr, k.expr)
}

def kll_sketch_double_agg_build(expr: Column, k: Int): Column = {
@@ -40,7 +40,7 @@ object functions_datasketches_kll extends DatasketchesScalaFunctionBase {
}

def kll_sketch_double_agg_build(expr: Column): Column = withAggregateFunction {
-new KllDoublesSketchAgg(expr.expr)
+new KllDoublesSketchAggBuild(expr.expr)
}

def kll_sketch_double_agg_build(columnName: String): Column = {
@@ -49,19 +49,19 @@ object functions_datasketches_kll extends DatasketchesScalaFunctionBase {

// merge sketches
def kll_sketch_double_agg_merge(expr: Column): Column = withAggregateFunction {
-new KllDoublesSketchMergeAgg(expr.expr)
+new KllDoublesSketchAggMerge(expr.expr)
}

def kll_sketch_double_agg_merge(columnName: String): Column = {
kll_sketch_double_agg_merge(Column(columnName))
}

def kll_sketch_double_agg_merge(expr: Column, k: Column): Column = withAggregateFunction {
-new KllDoublesSketchMergeAgg(expr.expr, k.expr)
+new KllDoublesSketchAggMerge(expr.expr, k.expr)
}

def kll_sketch_double_agg_merge(expr: Column, k: Int): Column = withAggregateFunction {
-new KllDoublesSketchMergeAgg(expr.expr, lit(k).expr)
+new KllDoublesSketchAggMerge(expr.expr, lit(k).expr)
}

def kll_sketch_double_agg_merge(columnName: String, k: Int): Column = {
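The build and merge halves of the renamed API compose in the usual two-level pattern: one sketch per group, then a merge across groups. A minimal sketch, assuming an existing DataFrame data with columns group and value (all names hypothetical):

  import org.apache.spark.sql.functions_datasketches_kll._

  // one KLL sketch per group
  val perGroup = data.groupBy("group").agg(kll_sketch_double_agg_build("value").as("sketch"))
  // merge the per-group sketches into a single sketch
  val overall = perGroup.agg(kll_sketch_double_agg_merge("sketch").as("merged"))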
@@ -19,40 +19,40 @@ package org.apache.spark.sql

import org.apache.spark.sql.functions.lit

-import org.apache.spark.sql.aggregate.{ThetaSketchBuild, ThetaUnion}
+import org.apache.spark.sql.aggregate.{ThetaSketchAggBuild, ThetaSketchAggUnion}
import org.apache.spark.sql.expressions.ThetaSketchGetEstimate

object functions_datasketches_theta extends DatasketchesScalaFunctionBase {
-def theta_sketch_build(column: Column, lgk: Int): Column = withAggregateFunction {
-new ThetaSketchBuild(column.expr, lgk)
+def theta_sketch_agg_build(column: Column, lgk: Int): Column = withAggregateFunction {
+new ThetaSketchAggBuild(column.expr, lgk)
}

-def theta_sketch_build(columnName: String, lgk: Int): Column = {
-theta_sketch_build(Column(columnName), lgk)
+def theta_sketch_agg_build(columnName: String, lgk: Int): Column = {
+theta_sketch_agg_build(Column(columnName), lgk)
}

-def theta_sketch_build(column: Column): Column = withAggregateFunction {
-new ThetaSketchBuild(column.expr)
+def theta_sketch_agg_build(column: Column): Column = withAggregateFunction {
+new ThetaSketchAggBuild(column.expr)
}

-def theta_sketch_build(columnName: String): Column = {
-theta_sketch_build(Column(columnName))
+def theta_sketch_agg_build(columnName: String): Column = {
+theta_sketch_agg_build(Column(columnName))
}

-def theta_union(column: Column, lgk: Int): Column = withAggregateFunction {
-new ThetaUnion(column.expr, lit(lgk).expr)
+def theta_sketch_agg_union(column: Column, lgk: Int): Column = withAggregateFunction {
+new ThetaSketchAggUnion(column.expr, lit(lgk).expr)
}

-def theta_union(columnName: String, lgk: Int): Column = withAggregateFunction {
-new ThetaUnion(Column(columnName).expr, lit(lgk).expr)
+def theta_sketch_agg_union(columnName: String, lgk: Int): Column = withAggregateFunction {
+new ThetaSketchAggUnion(Column(columnName).expr, lit(lgk).expr)
}

-def theta_union(column: Column): Column = withAggregateFunction {
-new ThetaUnion(column.expr)
+def theta_sketch_agg_union(column: Column): Column = withAggregateFunction {
+new ThetaSketchAggUnion(column.expr)
}

-def theta_union(columnName: String): Column = withAggregateFunction {
-new ThetaUnion(Column(columnName).expr)
+def theta_sketch_agg_union(columnName: String): Column = withAggregateFunction {
+new ThetaSketchAggUnion(Column(columnName).expr)
}

def theta_sketch_get_estimate(column: Column): Column = withExpr {
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, Long
* which can be used to estimate distinct count.
*
* See [[https://datasketches.apache.org/docs/Theta/ThetaSketches.html]] for more information.
-* 
+*
* @param child child expression, from which to build a sketch
* @param lgk the size-accuracy trade-off parameter for the sketch
*/
@@ -46,7 +46,7 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, Long
""",
)
// scalastyle:on line.size.limit
-case class ThetaSketchBuild(
+case class ThetaSketchAggBuild(
left: Expression,
right: Expression,
mutableAggBufferOffset: Int = 0,
@@ -69,13 +69,13 @@ case class ThetaSketchBuild(
def this(child: Expression, lgk: Expression) = this(child, lgk, 0, 0)
def this(child: Expression, lgk: Int) = this(child, Literal(lgk), 0, 0)

-override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaSketchBuild =
+override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaSketchAggBuild =
copy(mutableAggBufferOffset = newMutableAggBufferOffset)

-override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchBuild =
+override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchAggBuild =
copy(inputAggBufferOffset = newInputAggBufferOffset)

-override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaSketchBuild = {
+override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaSketchAggBuild = {
copy(left = newLeft, right = newRight)
}

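For scale on the lgk parameter documented above: lgk is the base-2 log of the sketch's nominal entry count, so lgk = 12 gives k = 2^12 = 4096 retained entries, and the relative standard error of the distinct-count estimate is roughly 1/sqrt(k), about 1.6% at that size. These figures come from the general theta-sketch error model, not from anything specific to this code.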
@@ -47,7 +47,7 @@ import org.apache.spark.SparkUnsupportedOperationException
"""
)
// scalastyle:on line.size.limit
-case class ThetaUnion(
+case class ThetaSketchAggUnion(
left: Expression,
right: Expression,
mutableAggBufferOffset: Int = 0,
@@ -71,13 +71,13 @@ case class ThetaUnion(
// def this(child: Expression, lgk: Expression) = this(child, lgk, 0, 0)
// def this(child: Expression, lgk: Int) = this(child, Literal(lgk), 0, 0)

-override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaUnion =
+override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ThetaSketchAggUnion =
copy(mutableAggBufferOffset = newMutableAggBufferOffset)

-override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaUnion =
+override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ThetaSketchAggUnion =
copy(inputAggBufferOffset = newInputAggBufferOffset)

-override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaUnion = {
+override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ThetaSketchAggUnion = {
copy(left = newLeft, right = newRight)
}

18 changes: 9 additions & 9 deletions src/test/scala/org/apache/spark/sql/ThetaTest.scala
@@ -27,7 +27,7 @@ class ThetaTest extends SparkSessionManager {
val n = 100
val data = (for (i <- 1 to n) yield i).toDF("value")

-val sketchDf = data.agg(theta_sketch_build("value").as("sketch"))
+val sketchDf = data.agg(theta_sketch_agg_build("value").as("sketch"))
val result: Row = sketchDf.select(theta_sketch_get_estimate("sketch").as("estimate")).head

assert(result.getAs[Double]("estimate") == 100.0)
@@ -42,7 +42,7 @@ class ThetaTest extends SparkSessionManager {

val df = spark.sql(s"""
SELECT
-theta_sketch_get_estimate(theta_sketch_build(value)) AS estimate
+theta_sketch_get_estimate(theta_sketch_agg_build(value)) AS estimate
FROM
theta_input_table
""")
@@ -58,7 +58,7 @@ class ThetaTest extends SparkSessionManager {

val df = spark.sql(s"""
SELECT
-theta_sketch_get_estimate(theta_sketch_build(value, 14)) AS estimate
+theta_sketch_get_estimate(theta_sketch_agg_build(value, 14)) AS estimate
FROM
theta_input_table
""")
@@ -70,8 +70,8 @@ class ThetaTest extends SparkSessionManager {
val numDistinct = 2000
val data = (for (i <- 1 to numDistinct) yield (i % numGroups, i)).toDF("group", "value")

-val groupedDf = data.groupBy("group").agg(theta_sketch_build("value").as("sketch"))
-val mergedDf = groupedDf.agg(theta_union("sketch").as("merged"))
+val groupedDf = data.groupBy("group").agg(theta_sketch_agg_build("value").as("sketch"))
+val mergedDf = groupedDf.agg(theta_sketch_agg_union("sketch").as("merged"))
val result: Row = mergedDf.select(theta_sketch_get_estimate("merged").as("estimate")).head
assert(result.getAs[Double]("estimate") == numDistinct)
}
@@ -86,7 +86,7 @@ class ThetaTest extends SparkSessionManager {
val groupedDf = spark.sql(s"""
SELECT
group,
-theta_sketch_build(value) AS sketch
+theta_sketch_agg_build(value) AS sketch
FROM
theta_input_table
GROUP BY
@@ -96,7 +96,7 @@ class ThetaTest extends SparkSessionManager {

val mergedDf = spark.sql(s"""
SELECT
-theta_sketch_get_estimate(theta_union(sketch)) AS estimate
+theta_sketch_get_estimate(theta_sketch_agg_union(sketch)) AS estimate
FROM
theta_sketch_table
""")
@@ -112,7 +112,7 @@ class ThetaTest extends SparkSessionManager {
val groupedDf = spark.sql(s"""
SELECT
group,
-theta_sketch_build(value, 14) AS sketch
+theta_sketch_agg_build(value, 14) AS sketch
FROM
theta_input_table
GROUP BY
@@ -122,7 +122,7 @@ class ThetaTest extends SparkSessionManager {

val mergedDf = spark.sql(s"""
SELECT
-theta_sketch_get_estimate(theta_union(sketch, 14)) AS estimate
+theta_sketch_get_estimate(theta_sketch_agg_union(sketch, 14)) AS estimate
FROM
theta_sketch_table
""")
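With the conditional fork and javaOptions introduced in build.sbt, this suite should run with a plain sbt test on Java 8, 11, 17, or 21; on Java 17 the jdk.incubator.foreign and sun.nio.ch flags are added automatically. This assumes JAVA_HOME points at the JDK the build should target.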
