
Improving trembita-ql
vitaliihonta committed Feb 7, 2019
1 parent dd6d73f commit 99a5e2b
Showing 19 changed files with 520 additions and 318 deletions.
2 changes: 1 addition & 1 deletion bench/src/main/scala/trembita/WordsCountFiles.scala
@@ -58,7 +58,7 @@ object WordsCount {
       .sequential[Vector]
       .create(lines)
       .mapConcat(_.split("\\W+"))
-      .groupBy(identity)
+      .groupByKey(identity)
       .mapValues(_.size)
       .into(Output.vector)
       .run
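Note on the change above: `groupBy` on pipelines is renamed to `groupByKey`, which keeps it distinct from trembita-ql's `.groupBy(expr ...)` syntax introduced below. A minimal word-count sketch against the renamed API — the `lines` value, the wrapping object, and `import trembita._` are assumptions; the method chain itself is taken verbatim from this diff:

    import trembita._

    object WordsCountSketch extends App {
      val lines = Vector("to be or not to be") // assumed sample input

      val counts = Input
        .sequential[Vector]
        .create(lines)
        .mapConcat(_.split("\\W+"))
        .groupByKey(identity)  // was .groupBy before this commit
        .mapValues(_.size)
        .into(Output.vector)
        .run

      println(counts) // e.g. Vector((to,2), (be,2), (or,1), (not,1))
    }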
2 changes: 1 addition & 1 deletion examples/src/main/scala/com/examples/akka/Main.scala
@@ -26,7 +26,7 @@ object Main extends IOApp {
     val wordsCount: DataPipelineT[IO, String, Akka[Future[IOResult]]] = fileLines
       .map(_.utf8String)
       .mapConcat(_.split("\\s"))
-      .groupBy(identity _)
+      .groupByKey(identity _)
       .mapValues(_.size)
       .map { case (word, count) => s"`$word` occurs $count times" }
       .mapRepr(
53 changes: 26 additions & 27 deletions examples/src/main/scala/com/examples/kernel/QLSample.scala
@@ -13,41 +13,40 @@ object QLSample extends IOApp {
     Input.parallelF[IO, Seq].create(IO { 1L to 20L })

   val result = numbers
-    .query(
-      _.where(_ > 5)
-        .groupBy(
-          expr[Long](_ % 2 == 0) as "divisible by 2",
-          expr[Long](_ % 3 == 0) as "divisible by 3",
-          expr[Long](_ % 4) as "reminder of 4"
-        )
-        .aggregate(
-          expr[Long](num => (num * num).toDouble) agg avg as "square",
-          col[Long] agg count as "count",
-          expr[Long](num => num * num * num * num) agg sum as "^4",
-          expr[Long](_.toString) agg sum as "some name"
-        )
+    .where(_ > 5)
+    .groupBy(
+      expr[Long](_ % 2 == 0) as "divisible by 2",
+      expr[Long](_ % 3 == 0) as "divisible by 3",
+      expr[Long](_ % 4) as "reminder of 4"
+    )
+    .aggregate(
+      expr[Long](num => (num * num).toDouble) agg avg as "square",
+      col[Long] agg count as "count",
+      expr[Long](num => num * num * num * num) agg sum as "^4",
+      expr[Long](_.toString) agg sum as "some name"
+    )
+    .compile
     .into(Output.vector)
     .run

   val numbersDP = Input
     .parallelF[IO, Seq]
     .create(IO { 15L to 40L })
-    .query(
-      _.groupBy(
-        expr[Long](_ % 2 == 0) as "divisible by 2",
-        expr[Long](_ % 3 == 0) as "divisible by 3",
-        expr[Long](_ % 4) as "reminder of 4",
-        expr[Long](_.toString.length) as "length"
-      ).aggregate(
-        expr[Long](num => (num * num).toDouble) agg avg as "square",
-        col[Long] agg count as "count",
-        expr[Long](num => num * num * num * num) agg sum as "^4",
-        expr[Long](_.toString) agg sum as "some name",
-        expr[Long](_.toDouble) agg stdev as "STDEV"
-      )
-      .having(agg[String]("some name")(_.contains('1')))
+    .groupBy(
+      expr[Long](_ % 2 == 0) as "divisible by 2",
+      expr[Long](_ % 3 == 0) as "divisible by 3",
+      expr[Long](_ % 4) as "reminder of 4",
+      expr[Long](_.toString.length) as "length"
+    )
+    .aggregate(
+      expr[Long](num => (num * num).toDouble) agg avg as "square",
+      col[Long] agg count as "count",
+      expr[Long](num => num * num * num * num) agg sum as "^4",
+      expr[Long](_.toString) agg sum as "some name",
+      expr[Long](_.toDouble) agg stdev as "STDEV"
+    )
+    .having(agg[String]("some name")(_.contains('1')))
+    .compile
     .as[NumbersReport] // transforms directly into case class
     .into(Output.vector)
     .run
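For readers comparing the two APIs above: the `.query(_ => ...)` builder wrapper is gone; `where`, `groupBy`, `aggregate` and `having` now chain directly on the pipeline, and `.compile` materializes the query back into a pipeline of results. A stripped-down sketch of the new shape, assuming the same `trembita._`, `trembita.ql._` and cats-effect imports this example file uses (the single criterion and single aggregation are illustrative, not from the commit):

    import trembita._
    import trembita.ql._
    import cats.effect.IO

    val report = Input
      .parallelF[IO, Seq]
      .create(IO { 1L to 20L })
      .where(_ > 5)                               // filter rows
      .groupBy(expr[Long](_ % 2 == 0) as "even")  // grouping criteria
      .aggregate(col[Long] agg count as "count")  // aggregation declarations
      .compile                                    // back to a pipeline of QueryResult
      .into(Output.vector)
      .run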
31 changes: 15 additions & 16 deletions examples/src/main/scala/com/examples/spark/QLExample.scala
@@ -29,23 +29,22 @@ object QLExample extends IOApp {
       .to[Spark]

   val result = numbers
-    .query(
-      _.where(_ > 5)
-        .groupBy(
-          expr[Long](_ % 2 == 0) as "divisible by 2",
-          expr[Long](_ % 3 == 0) as "divisible by 3",
-          expr[Long](_ % 4) as "reminder of 4",
-          expr[Long](_.toString.length) as "length"
-        )
-        .aggregate(
-          expr[Long](num => (num * num).toDouble) agg avg as "square",
-          col[Long] agg count as "count",
-          expr[Long](num => num * num * num * num) agg sum as "^4",
-          expr[Long](_.toString) agg sum as "some name",
-          expr[Long](_.toDouble) agg stdev as "STDEV"
-        )
-        .having(agg[Long]("count")(_ > 7))
+    .where(_ > 5)
+    .groupBy(
+      expr[Long](_ % 2 == 0) as "divisible by 2",
+      expr[Long](_ % 3 == 0) as "divisible by 3",
+      expr[Long](_ % 4) as "reminder of 4",
+      expr[Long](_.toString.length) as "length"
+    )
+    .aggregate(
+      expr[Long](num => (num * num).toDouble) agg avg as "square",
+      col[Long] agg count as "count",
+      expr[Long](num => num * num * num * num) agg sum as "^4",
+      expr[Long](_.toString) agg sum as "some name",
+      expr[Long](_.toDouble) agg stdev as "STDEV"
+    )
+    .having(agg[Long]("count")(_ > 7))
+    .compile
     .as[NumbersReport]

   result.into(Output.array).run.flatMap(vs => putStrLn(vs.mkString("\n")))
@@ -26,23 +26,22 @@ object QLExample extends IOApp {
     Input.liftF[IO, SparkStreaming].createF(IO { 1L to 2000L })

   val result = numbers
-    .query(
-      _.where(_ > 5)
-        .groupBy(
-          expr[Long](_ % 2 == 0) as "divisible by 2",
-          expr[Long](_ % 3 == 0) as "divisible by 3",
-          expr[Long](_ % 4) as "reminder of 4",
-          expr[Long](_.toString.length) as "length"
-        )
-        .aggregate(
-          expr[Long](num => (num * num).toDouble) agg avg as "square",
-          col[Long] agg count as "count",
-          expr[Long](num => num * num * num * num) agg sum as "^4",
-          expr[Long](_.toString) agg sum as "some name",
-          expr[Long](_.toDouble) agg stdev as "STDEV"
-        )
-        .having(agg[Long]("count")(_ > 7))
+    .where(_ > 5)
+    .groupBy(
+      expr[Long](_ % 2 == 0) as "divisible by 2",
+      expr[Long](_ % 3 == 0) as "divisible by 3",
+      expr[Long](_ % 4) as "reminder of 4",
+      expr[Long](_.toString.length) as "length"
+    )
+    .aggregate(
+      expr[Long](num => (num * num).toDouble) agg avg as "square",
+      col[Long] agg count as "count",
+      expr[Long](num => num * num * num * num) agg sum as "^4",
+      expr[Long](_.toString) agg sum as "some name",
+      expr[Long](_.toDouble) agg stdev as "STDEV"
+    )
+    .having(agg[Long]("count")(_ > 7))
+    .compile
     .as[NumbersReport]

   result
@@ -19,7 +19,7 @@ import scala.reflect.runtime.universe.TypeTag
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal

-package object spark extends LowPriorityInstancesForSpark {
+package object spark extends LowPriorityInstancesForSpark with trembitaqlForSpark {
   type SerializableFuture[+A] = SerializableFutureImpl.NewType[A]
   val SerializableFuture = SerializableFutureImpl
@@ -108,14 +108,6 @@ package object spark extends LowPriorityInstancesForSpark {
       with MagnetlessSparkIOOps[A, E]

   implicit class SparkOps[F[_], A](val `this`: DataPipelineT[F, A, Spark]) extends AnyVal {
-    def query[G <: GroupingCriteria, T <: AggDecl, R <: AggRes, Comb](
-        queryF: QueryBuilder.Empty[A] => Query[A, G, T, R, Comb]
-    )(implicit trembitaqlForSpark: trembitaqlForSpark[A, G, T, R, Comb],
-      run: Spark#Run[F],
-      F: Monad[F],
-      A: ClassTag[A]): DataPipelineT[F, QueryResult[A, G, R], Spark] =
-      `this`.mapRepr(trembitaqlForSpark.apply(_, queryF))
-
     def mapM[B: ClassTag](magnet: MagnetF[F, A, B, Spark])(implicit F: Monad[F]): DataPipelineT[F, B, Spark] =
       `this`.mapMImpl[A, B](magnet.prepared)
   }
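The deleted `query` extension method is not lost functionality: since the package object now mixes in `trembitaqlForSpark` (per the @@ -19,7 hunk above), its implicit `trembitaql` instances come along with `import trembita.spark._`, so the backend-agnostic query syntax applies without a Spark-specific entry point. A self-contained sketch of this implicits-via-package-object-mixin pattern, using a hypothetical `Show` type class rather than trembita's actual types:

    trait Show[A] { def show(a: A): String }

    // Instances live in a trait ...
    trait ShowInstances {
      implicit val showInt: Show[Int] = (i: Int) => s"Int($i)"
    }

    // ... and the (package) object extends it, so importing the package's
    // members brings the implicits into scope with no extra import:
    object mylib extends ShowInstances

    object Demo extends App {
      import mylib._
      def render[A](a: A)(implicit S: Show[A]): String = S.show(a)
      println(render(42)) // prints: Int(42)
    }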
@@ -1,36 +1,24 @@
 package trembita.spark

-import trembita.ql.QueryBuilder.Query
-import trembita.ql._
 import cats.Monad
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._

-import scala.annotation.implicitNotFound
+import trembita.DataPipelineT
+import trembita.ql.QueryBuilder.Query
+import trembita.ql._
 import scala.language.higherKinds
 import scala.reflect.ClassTag

-@implicitNotFound("""
-Aggregation upon ${A}
- - with grouping ${G}
- - aggregations ${T}
- - and expected result ${R}
-cannot be performed in Spark.
-Please ensure implicit SparkSession in scope and ClassTag's for your data types
-""")
-trait trembitaqlForSpark[A, G <: GroupingCriteria, T <: AggDecl, R <: AggRes, Comb] extends Serializable {
-  def apply(rdd: RDD[A], queryF: QueryBuilder.Empty[A] => Query[A, G, T, R, Comb]): RDD[QueryResult[A, G, R]]
-}
-object trembitaqlForSpark {
+trait trembitaqlForSpark {
   implicit def rddBased[A: ClassTag, G <: GroupingCriteria: ClassTag, T <: AggDecl, R <: AggRes, Comb](
       implicit spark: SparkSession
-  ): trembitaqlForSpark[A, G, T, R, Comb] =
-    new trembitaqlForSpark[A, G, T, R, Comb] {
-      override def apply(
-          rdd: RDD[A],
-          queryF: QueryBuilder.Empty[A] => Query[A, G, T, R, Comb]
-      ): RDD[QueryResult[A, G, R]] = {
-
-        val query: Query[A, G, T, R, Comb] = queryF(new QueryBuilder.Empty[A])
+  ): trembitaql[A, G, T, R, Comb, Spark] =
+    new trembitaql[A, G, T, R, Comb, Spark] {
+      def apply[F[_]](query: Query[F, A, Spark, G, T, R, Comb])(
+          implicit F: Monad[F],
+          ex: Spark,
+          run: RunOnSpark[F]
+      ): DataPipelineT[F, QueryResult[A, G, R], Spark] = {

         val getG: A => G = query.getG
         val getT: A => T = query.getT
@@ -57,21 +45,24 @@ object trembitaqlForSpark {
         def mergeCombiners(c1: (Comb, Vector[A]), c2: (Comb, Vector[A])): (Comb, Vector[A]) =
           aggF.combine(c1._1, c2._1) -> (orderedVs(c1._2) ++ orderedVs(c2._2))

-        val transformed = rdd
-          .map(a => getG(a) -> a)
-          .combineByKey[(Comb, Vector[A])](
-            createCombiner _,
-            mergeValue _,
-            mergeCombiners _
-          )
-          .flatMap {
-            case (group, (comb, vs)) =>
-              val aggRes = aggF.extract(comb).result
-              if (query.havingF(aggRes)) List(QueryResult[A, G, R](group, aggRes, orderedVs(vs)))
-              else Nil
-          }
+        query.pipeline
+          .mapRepr { rdd =>
+            val transformed = rdd
+              .map(a => getG(a) -> a)
+              .combineByKey[(Comb, Vector[A])](
+                createCombiner _,
+                mergeValue _,
+                mergeCombiners _
+              )
+              .flatMap {
+                case (group, (comb, vs)) =>
+                  val aggRes = aggF.extract(comb).result
+                  if (query.havingF(aggRes)) List(QueryResult[A, G, R](group, aggRes, orderedVs(vs)))
+                  else Nil
+              }

-        sortAfterAgg(transformed)
+            sortAfterAgg(transformed)
+          }
       }
     }
 }
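The aggregation core above is unchanged by this commit: elements are still keyed by their grouping criteria and folded with Spark's `combineByKey`; only the entry point moved from a raw `RDD[A]` argument to `query.pipeline.mapRepr`. For intuition, a plain-Scala sketch of the same createCombiner/mergeValue/mergeCombiners contract on a local collection (no Spark; the `(sum, count)` combiner is an illustrative stand-in for the `(Comb, Vector[A])` pair built above):

    object CombineByKeyLocal extends App {
      // Seed a combiner per key with createCombiner and fold further values in
      // with mergeValue; mergeCombiners would reconcile partial results across
      // partitions (unused in this single-partition fold, shown for symmetry).
      def combineByKeyLocal[K, A, C](values: Seq[(K, A)])(createCombiner: A => C)(
          mergeValue: (C, A) => C)(mergeCombiners: (C, C) => C): Map[K, C] =
        values.foldLeft(Map.empty[K, C]) {
          case (acc, (k, a)) =>
            acc.updated(k, acc.get(k).fold(createCombiner(a))(mergeValue(_, a)))
        }

      // Group 1..10 by parity, aggregating to a (sum, count) combiner:
      val bySum = combineByKeyLocal((1L to 10L).map(n => (n % 2 == 0) -> n))(n => (n, 1))(
        (c, n) => (c._1 + n, c._2 + 1))((c1, c2) => (c1._1 + c2._1, c1._2 + c2._2))

      println(bySum) // Map(false -> (25,5), true -> (30,5))
    }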
@@ -71,7 +71,7 @@ class SparkSpec extends FlatSpec with BeforeAndAfterAll {
   "DataPipeline.groupBy" should "group elements" in {
     val pipeline = Input.rdd.create(sc.parallelize(Seq(1, 2, 3, 4)))
     val grouped: Array[(Boolean, List[Int])] = pipeline
-      .groupBy(_ % 2 == 0)
+      .groupByKey(_ % 2 == 0)
       .mapValues(_.toList)
       .sortBy(_._1)
       .into(Output.array)
@@ -120,7 +120,7 @@ class SparkSpec extends FlatSpec with BeforeAndAfterAll {
         IO(List("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 3, "c" -> 10))
       )
       .to[Spark]
-      .groupBy(_._1)
+      .groupByKey(_._1)
       .mapValues(_.foldLeft(0) { case (acc, (_, x)) => acc + x } * 10)
       .map { case (k, v) => s"{key=$k, value=$v}" }
       .sorted
@@ -18,16 +18,8 @@ import scala.concurrent.Future
 import scala.reflect.ClassTag
 import scala.language.{higherKinds, implicitConversions}

-package object streaming extends injections {
+package object streaming extends injections with trembitaqlForSparkStreaming {
   implicit class SparkStreamingOps[F[_], A](val `this`: DataPipelineT[F, A, SparkStreaming]) extends AnyVal {
-    def query[G <: GroupingCriteria, T <: AggDecl, R <: AggRes, Comb](
-        queryF: QueryBuilder.Empty[A] => Query[A, G, T, R, Comb]
-    )(implicit trembitaqlForSparkStreaming: trembitaqlForSparkStreaming[A, G, T, R, Comb],
-      run: Spark#Run[F],
-      F: Monad[F],
-      A: ClassTag[A]): DataPipelineT[F, QueryResult[A, G, R], SparkStreaming] =
-      `this`.mapRepr(trembitaqlForSparkStreaming.apply(_, queryF))
-
     def fsmByKey[K: ClassTag, N, D, B: ClassTag](getKey: A => K)(
       initial: InitialState[N, D, F]
     )(fsmF: FSM.Empty[F, N, D, A, B] => FSM.Func[F, N, D, A, B])(
@@ -42,7 +34,6 @@ package object streaming extends injections {
         magnet: MagnetF[F, A, B, SparkStreaming]
       )(implicit F: SerializableMonad[F]): DataPipelineT[F, B, SparkStreaming] =
         `this`.mapMImpl[A, B](magnet.prepared)
-
     }

implicit def magnetFFromSpark[F[_], A, B](