diff --git a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggBuild.scala b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggBuild.scala
index 543ce9f..53225ea 100644
--- a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggBuild.scala
+++ b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggBuild.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.aggregate
 
 import org.apache.datasketches.kll.{KllSketch, KllDoublesSketch}
-import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 
@@ -60,9 +59,8 @@ case class KllDoublesSketchAggBuild(
       case null => KllSketch.DEFAULT_K
       case k: Int => k
       // this shouldn't happen after checkInputDataTypes()
-      case _ => throw new SparkUnsupportedOperationException(
-        s"Unsupported input type ${kExpr.dataType.catalogString}",
-        Map("dataType" -> dataType.toString))
+      case _ => throw new IllegalArgumentException(
+        s"Unsupported input type ${kExpr.dataType.catalogString}")
     }
   }
 
@@ -104,8 +102,6 @@ case class KllDoublesSketchAggBuild(
 
   override def nullable: Boolean = false
 
-  override def stateful: Boolean = true
-
   override def inputTypes: Seq[AbstractDataType] = Seq(NumericType, IntegerType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
@@ -136,9 +132,8 @@ case class KllDoublesSketchAggBuild(
         case FloatType => sketch.update(value.asInstanceOf[Float].toDouble)
         case IntegerType => sketch.update(value.asInstanceOf[Int].toDouble)
         case LongType => sketch.update(value.asInstanceOf[Long].toDouble)
-        case _ => throw new SparkUnsupportedOperationException(
-          s"Unsupported input type ${dataExpr.dataType.catalogString}",
-          Map("dataType" -> dataType.toString))
+        case _ => throw new IllegalArgumentException(
+          s"Unsupported input type ${dataExpr.dataType.catalogString}")
       }
     }
     sketch
diff --git a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggMerge.scala b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggMerge.scala
index f605604..bf4c51c 100644
--- a/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggMerge.scala
+++ b/src/main/scala/org/apache/spark/sql/kll/aggregate/KllDoublesSketchAggMerge.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.aggregate
 
 import org.apache.datasketches.memory.Memory
 import org.apache.datasketches.kll.{KllSketch, KllDoublesSketch}
-import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
@@ -62,9 +61,8 @@ case class KllDoublesSketchAggMerge(
       case null => KllSketch.DEFAULT_K
       case k: Int => k
       // this shouldn't happen after checkInputDataTypes()
-      case _ => throw new SparkUnsupportedOperationException(
-        s"Unsupported input type ${right.dataType.catalogString}",
-        Map("dataType" -> dataType.toString))
+      case _ => throw new IllegalArgumentException(
+        s"Unsupported input type ${right.dataType.catalogString}")
     }
   }
 
@@ -102,8 +100,6 @@ case class KllDoublesSketchAggMerge(
 
   override def nullable: Boolean = false
 
-  override def stateful: Boolean = true
-
   override def inputTypes: Seq[AbstractDataType] = Seq(KllDoublesSketchType, IntegerType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
@@ -135,9 +131,8 @@ case class KllDoublesSketchAggMerge(
         case KllDoublesSketchType =>
          union.merge(KllDoublesSketch.wrap(Memory.wrap(value.asInstanceOf[Array[Byte]])))
          union
-        case _ => throw new SparkUnsupportedOperationException(
-          s"Unsupported input type ${sketchExpr.dataType.catalogString}",
-          Map("dataType" -> dataType.toString))
+        case _ => throw new IllegalArgumentException(
+          s"Unsupported input type ${sketchExpr.dataType.catalogString}")
       }
     } else {
       union
diff --git a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggBuild.scala b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggBuild.scala
index bbca824..d88af6e 100644
--- a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggBuild.scala
+++ b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggBuild.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.aggregate
 
 import org.apache.datasketches.theta.{UpdateSketch, SetOperation}
-import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 
@@ -59,9 +58,8 @@ case class ThetaSketchAggBuild(
     right.eval() match {
       case null => 12
       case lgk: Int => lgk
-      case _ => throw new SparkUnsupportedOperationException(
-        s"Unsupported input type ${right.dataType.catalogString}",
-        Map("dataType" -> dataType.toString))
+      case _ => throw new IllegalArgumentException(
+        s"Unsupported input type ${right.dataType.catalogString}")
     }
   }
 
@@ -97,9 +95,8 @@ case class ThetaSketchAggBuild(
         case FloatType => wrapper.updateSketch.get.update(value.asInstanceOf[Float])
         case IntegerType => wrapper.updateSketch.get.update(value.asInstanceOf[Int])
         case LongType => wrapper.updateSketch.get.update(value.asInstanceOf[Long])
-        case _ => throw new SparkUnsupportedOperationException(
-          s"Unsupported input type ${left.dataType.catalogString}",
-          Map("dataType" -> dataType.toString))
+        case _ => throw new IllegalArgumentException(
+          s"Unsupported input type ${left.dataType.catalogString}")
       }
     }
     wrapper
diff --git a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggUnion.scala b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggUnion.scala
index 1a17b34..107382a 100644
--- a/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggUnion.scala
+++ b/src/main/scala/org/apache/spark/sql/theta/aggregate/ThetaSketchAggUnion.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression,
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 import org.apache.spark.sql.catalyst.trees.BinaryLike
 import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, ThetaSketchWrapper, ThetaSketchType}
-import org.apache.spark.SparkUnsupportedOperationException
 /**
  * Theta Union operation.
  */
@@ -60,9 +59,8 @@ case class ThetaSketchAggUnion(
     right.eval() match {
       case null => 12
       case lgk: Int => lgk
-      case _ => throw new SparkUnsupportedOperationException(
-        s"Unsupported input type ${right.dataType.catalogString}",
-        Map("dataType" -> dataType.toString))
+      case _ => throw new IllegalArgumentException(
+        s"Unsupported input type ${right.dataType.catalogString}")
     }
   }
 
@@ -97,9 +95,8 @@ case class ThetaSketchAggUnion(
      left.dataType match {
        case ThetaSketchType =>
          wrapper.union.get.union(Sketch.wrap(Memory.wrap(bytes.asInstanceOf[Array[Byte]])))
-        case _ => throw new SparkUnsupportedOperationException(
-          s"Unsupported input type ${left.dataType.catalogString}",
-          Map("dataType" -> dataType.toString))
+        case _ => throw new IllegalArgumentException(
+          s"Unsupported input type ${left.dataType.catalogString}")
      }
    }
    wrapper