Merge pull request #17 from apache/spark3.3_compat
Compatibility with Spark 3.3
jmalkin authored Jan 28, 2025
2 parents eb7b4b2 + 7b311b2 commit 6779cde
Showing 4 changed files with 16 additions and 32 deletions.
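The change is mechanical across all four aggregate implementations: the SparkUnsupportedOperationException constructor used here (a message plus a parameter map) and the stateful override are dropped, presumably because they are not part of the Spark 3.3 API, and the unreachable default cases fall back to the JDK's IllegalArgumentException. A minimal sketch of the resulting fallback pattern, with illustrative names (Spark33CompatExample and updateAsDouble are not in this commit):

import org.apache.spark.sql.types.{DataType, DoubleType, FloatType, IntegerType, LongType}

object Spark33CompatExample {
  // Mirrors the post-change type dispatch: supported numeric inputs are
  // widened to Double, and anything else hits a plain IllegalArgumentException
  // instead of SparkUnsupportedOperationException.
  def updateAsDouble(dataType: DataType, value: Any): Double = dataType match {
    case DoubleType  => value.asInstanceOf[Double]
    case FloatType   => value.asInstanceOf[Float].toDouble
    case IntegerType => value.asInstanceOf[Int].toDouble
    case LongType    => value.asInstanceOf[Long].toDouble
    // Unreachable after checkInputDataTypes(); kept only as a defensive fallback.
    case _ => throw new IllegalArgumentException(
      s"Unsupported input type ${dataType.catalogString}")
  }
}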
@@ -18,7 +18,6 @@
package org.apache.spark.sql.aggregate

import org.apache.datasketches.kll.{KllSketch, KllDoublesSketch}
-import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
@@ -60,9 +59,8 @@ case class KllDoublesSketchAggBuild(
case null => KllSketch.DEFAULT_K
case k: Int => k
// this shouldn't happen after checkInputDataTypes()
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${kExpr.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${kExpr.dataType.catalogString}")
}
}

@@ -104,8 +102,6 @@ case class KllDoublesSketchAggBuild(

override def nullable: Boolean = false

-override def stateful: Boolean = true
-
override def inputTypes: Seq[AbstractDataType] = Seq(NumericType, IntegerType)

override def checkInputDataTypes(): TypeCheckResult = {
@@ -136,9 +132,8 @@ case class KllDoublesSketchAggBuild(
case FloatType => sketch.update(value.asInstanceOf[Float].toDouble)
case IntegerType => sketch.update(value.asInstanceOf[Int].toDouble)
case LongType => sketch.update(value.asInstanceOf[Long].toDouble)
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${dataExpr.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${dataExpr.dataType.catalogString}")
}
}
sketch
@@ -20,7 +20,6 @@ package org.apache.spark.sql.aggregate
import org.apache.datasketches.memory.Memory
import org.apache.datasketches.kll.{KllSketch, KllDoublesSketch}

-import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
@@ -62,9 +61,8 @@ case class KllDoublesSketchAggMerge(
case null => KllSketch.DEFAULT_K
case k: Int => k
// this shouldn't happen after checkInputDataTypes()
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${right.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${right.dataType.catalogString}")
}
}

@@ -102,8 +100,6 @@ case class KllDoublesSketchAggMerge(

override def nullable: Boolean = false

-override def stateful: Boolean = true
-
override def inputTypes: Seq[AbstractDataType] = Seq(KllDoublesSketchType, IntegerType)

override def checkInputDataTypes(): TypeCheckResult = {
@@ -135,9 +131,8 @@ case class KllDoublesSketchAggMerge(
case KllDoublesSketchType =>
union.merge(KllDoublesSketch.wrap(Memory.wrap(value.asInstanceOf[Array[Byte]])))
union
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${sketchExpr.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${sketchExpr.dataType.catalogString}")
}
} else {
union
@@ -18,7 +18,6 @@
package org.apache.spark.sql.aggregate

import org.apache.datasketches.theta.{UpdateSketch, SetOperation}
-import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
@@ -59,9 +58,8 @@ case class ThetaSketchAggBuild(
right.eval() match {
case null => 12
case lgk: Int => lgk
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${right.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${right.dataType.catalogString}")
}
}

@@ -97,9 +95,8 @@ case class ThetaSketchAggBuild(
case FloatType => wrapper.updateSketch.get.update(value.asInstanceOf[Float])
case IntegerType => wrapper.updateSketch.get.update(value.asInstanceOf[Int])
case LongType => wrapper.updateSketch.get.update(value.asInstanceOf[Long])
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${left.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${left.dataType.catalogString}")
}
}
wrapper
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression,
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, ThetaSketchWrapper, ThetaSketchType}
-import org.apache.spark.SparkUnsupportedOperationException

/**
* Theta Union operation.
@@ -60,9 +59,8 @@ case class ThetaSketchAggUnion(
right.eval() match {
case null => 12
case lgk: Int => lgk
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${right.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${right.dataType.catalogString}")
}
}

@@ -97,9 +95,8 @@ case class ThetaSketchAggUnion(
left.dataType match {
case ThetaSketchType =>
wrapper.union.get.union(Sketch.wrap(Memory.wrap(bytes.asInstanceOf[Array[Byte]])))
-case _ => throw new SparkUnsupportedOperationException(
-  s"Unsupported input type ${left.dataType.catalogString}",
-  Map("dataType" -> dataType.toString))
+case _ => throw new IllegalArgumentException(
+  s"Unsupported input type ${left.dataType.catalogString}")
}
}
wrapper