Skip to content
This repository has been archived by the owner on Aug 11, 2023. It is now read-only.

Commit

Permalink
Added grouped; batched; foldF
Browse files Browse the repository at this point in the history
  • Loading branch information
vitaliihonta committed Feb 1, 2019
1 parent 0622edf commit 3fb4dcf
Show file tree
Hide file tree
Showing 17 changed files with 208 additions and 45 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
Project: Trembita
Current version: 0.8.3-SNAPSHOT
Current version: 0.8.4-SNAPSHOT
Scala version: 2.11.12, 2.12.8
---

Expand All @@ -18,7 +18,7 @@ Trembita allows you to make complicated transformation pipelines where some of t
```scala
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
libraryDependencies ++= {
val trembitaV = "0.8.3-SNAPSHOT"
val trembitaV = "0.8.4-SNAPSHOT"
Seq(
"ua.pp.itkpi" %% "trembita-kernel" % trembitaV, // kernel,

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Dependencies._

lazy val snapshot: Boolean = true
lazy val v: String = {
val vv = "0.8.3"
val vv = "0.8.4"
if (!snapshot) vv
else vv + "-SNAPSHOT"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,32 @@ trait operations {
override def map[A, B](fa: Future[A])(f: A => B): Future[B] = monadError.map(fa)(f)
def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = monadError.tailRecM(a)(f)
}

/** [[CanGrouped]] instance for Akka `Source`, for any materialized value type `Mat`.
  *
  * Grouping is delegated to the stream's own `grouped(n)` operator; the
  * materialized value type of the source is left unchanged.
  */
implicit def sourceCanBeGrouped[Mat]: CanGrouped[Source[?, Mat]] =
  new CanGrouped[Source[?, Mat]] {
    def grouped[A](fa: Source[A, Mat], n: Int): Source[Iterable[A], Mat] = {
      // The stream operator yields a more specific element type; it is widened to Iterable[A] here.
      val chunked: Source[Iterable[A], Mat] = fa.grouped(n)
      chunked
    }
  }

/** [[CanBatched]] instance for Akka `Source`, for any materialized value type `Mat`.
  *
  * The stream is cut into consecutive, non-overlapping windows by calling
  * `sliding` with `step` equal to the window size. The previous implementation
  * called `sliding(parts)` with the default step, which produces overlapping
  * windows and therefore repeats the same element in several batches.
  *
  * NOTE(review): for a stream, `parts` is used as the window size (elements per
  * batch), not as the total number of batches — confirm this matches the
  * contract expected by callers of `batched`.
  *
  * @param fa    source to split into batches
  * @param parts window size passed to the stream's `sliding` operator
  * @return a source emitting each window as an `Iterable[A]`
  */
implicit def sourceCanBeBatcher[Mat]: CanBatched[Source[?, Mat]] = new CanBatched[Source[?, Mat]] {
  def batched[A](fa: Source[A, Mat], parts: Int): Source[Iterable[A], Mat] =
    // step == window size, so every element lands in exactly one batch
    fa.sliding(parts, step = parts)
}

/** [[CanFoldF]] instance folding an Akka `Source` with a `Future`-returning step function.
  *
  * The source is run (using the implicit materializer) into a `Sink.foldAsync`
  * seeded with `zero`; the sink's materialized `Future[B]` is the result.
  */
implicit def akkaCanFoldFuture[Mat](implicit mat: ActorMaterializer): CanFoldF[Source[?, Mat], Future] =
  new CanFoldF[Source[?, Mat], Future] {
    def foldF[A, B](fa: Source[A, Mat])(zero: B)(f: (B, A) => Future[B]): Future[B] = {
      val foldingSink = Sink.foldAsync[B, A](zero)(f)
      fa.runWith(foldingSink)
    }
  }

/** [[CanFoldF]] instance folding an Akka `Source` with a step function in an arbitrary `Effect` type `F`.
  *
  * Each step's `F[B]` is converted to `IO` and then to a `Future` so it can be
  * fed to `Sink.foldAsync`. Running the stream is itself wrapped in `IO { ... }`
  * and lifted back into `F`, so nothing is run when `foldF` is merely called.
  */
implicit def akkaCanFoldEffect[F[_]: Effect, Mat](implicit mat: ActorMaterializer): CanFoldF[Source[?, Mat], F] =
  new CanFoldF[Source[?, Mat], F] {
    def foldF[A, B](fa: Source[A, Mat])(zero: B)(f: (B, A) => F[B]): F[B] = {
      // Bridge a single fold step from F to the Future expected by the sink.
      def stepAsFuture(acc: B, elem: A): Future[B] =
        Effect[F].toIO(f(acc, elem)).unsafeToFuture()

      // Deferred: the stream is only materialized when the resulting effect is executed.
      val runStream: IO[Future[B]] = IO {
        fa.runWith(Sink.foldAsync[B, A](zero)(stepAsFuture))
      }

      Effect[F].liftIO(IO.fromFuture(runStream))
    }
  }
}

class AkkaCollectionOutput[Col[x] <: Iterable[x], F[_], Mat](implicit async: Async[F], mat: ActorMaterializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicInteger
import cats._
import cats.implicits._
import cats.effect._
import trembita.internal.ListUtils
import trembita.internal.BatchUtils
import org.scalatest.FlatSpec
import trembita._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import trembita.ql.{AggDecl, AggRes, GroupingCriteria, QueryBuilder, QueryResult
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Dataset, Encoder, SparkSession}
import shapeless.=:!=

import scala.collection.parallel.immutable.ParVector
import scala.concurrent.{ExecutionContext, Future}
import scala.language.{higherKinds, implicitConversions}
Expand Down
7 changes: 6 additions & 1 deletion kernel/src/main/scala/trembita/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,10 @@ object Output {
@inline def fold[A](zero: A)(f: (A, A) => A): foldDsl[A] = new foldDsl[A](zero -> f)
@inline def foldLeft[A, B](zero: B)(f: (B, A) => B): foldLeftDsl[A, B] = new foldLeftDsl[A, B](zero -> f)
val size: sizeDsl = new sizeDsl()
@inline def ignore[A] = foreach[A](_ => {})

// Single shared `foreach` DSL value whose function discards its argument and does nothing.
private val ignoreInstance: foreachDsl[Any] = foreach[Any](_ => {})

// Returns the shared no-op instance re-typed to `A` through an unchecked cast,
// instead of building a new DSL value on every call.
// NOTE(review): presumably safe because the wrapped function never inspects its
// argument — confirm against the definition of `foreachDsl`.
@inline def ignore[A]: foreachDsl[A] = ignoreInstance.asInstanceOf[foreachDsl[A]]

/** Builds the `foldF` output DSL from an initial accumulator `zero` and an effectful step function `f`. */
@inline def foldF[F[_], A, B](zero: B)(f: (B, A) => F[B]): foldFDsl[F, A, B] =
  new foldFDsl[F, A, B]((zero, f))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import scala.collection.generic.CanBuildFrom
/**
* Used in [[trembita]]
**/
object ListUtils {
object BatchUtils {

/**
* Having some {{{ Iterable[A] = 1 to 10 }}}
Expand Down
18 changes: 18 additions & 0 deletions kernel/src/main/scala/trembita/operations/CanBatched.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package trembita.operations

import trembita.internal.BatchUtils
import scala.collection.parallel.immutable.ParVector
import scala.language.higherKinds

/** Type class for containers `F[_]` whose elements can be split into batches. */
trait CanBatched[F[_]] {

  /** Splits `fa` into batches.
    *
    * @param fa    container to split
    * @param parts batching parameter forwarded to the underlying implementation
    *              (presumably the number of batches — verify against `BatchUtils.batch`)
    * @return a container of the same kind holding the batches
    */
  def batched[A](fa: F[A], parts: Int): F[Iterable[A]]
}

/** Instances for in-memory collections; both delegate to `BatchUtils.batch`. */
object CanBatched {

  implicit val vectorCanBeBatched: CanBatched[Vector] = new CanBatched[Vector] {
    def batched[A](fa: Vector[A], parts: Int): Vector[Iterable[A]] = {
      val batches = BatchUtils.batch(parts)(fa)
      batches.toVector
    }
  }

  implicit val parVectorCanBeCatched: CanBatched[ParVector] = new CanBatched[ParVector] {
    def batched[A](fa: ParVector[A], parts: Int): ParVector[Iterable[A]] = {
      // Batching runs on the sequential view; the result is converted back to a parallel vector.
      val sequential = fa.seq
      BatchUtils.batch(parts)(sequential).toVector.par
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import scala.collection.parallel.immutable.ParVector
import scala.language.higherKinds
import scala.reflect.ClassTag
import trembita.collections._
import trembita.internal.ListUtils
import trembita.internal.BatchUtils

import scala.collection.{immutable, mutable}

Expand Down Expand Up @@ -35,7 +35,7 @@ object CanCombineByKeyWithParallelism {
fa: ParVector[(K, V)],
parallelism: Int
)(init: V => C, addValue: (C, V) => C, mergeCombiners: (C, C) => C): ParVector[(K, C)] = {
val batched = ListUtils.batch(parts = parallelism)(fa.seq)
val batched = BatchUtils.batch(parts = parallelism)(fa.seq)
val processed = batched.par
.map { batch =>
val iterator = batch.iterator
Expand Down
32 changes: 32 additions & 0 deletions kernel/src/main/scala/trembita/operations/CanFoldF.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package trembita.operations

import cats.Monad
import cats.syntax.all._
import scala.collection.parallel.immutable.ParVector
import scala.language.higherKinds

/** Type class for folding a container `F[_]` with a step function that returns its result in `G[_]`. */
trait CanFoldF[F[_], G[_]] {

  /** Left-folds `fa` starting from `zero`, combining with the effectful step `f`.
    *
    * @param fa   container to fold
    * @param zero initial accumulator
    * @param f    step producing the next accumulator inside `G`
    * @return the final accumulator inside `G`
    */
  def foldF[A, B](fa: F[A])(zero: B)(f: (B, A) => G[B]): G[B]
}

/** Instances for in-memory collections, available for any `Monad`. */
object CanFoldF {

  implicit def canFoldFVector[F[_]: Monad]: CanFoldF[Vector, F] = new CanFoldF[Vector, F] {
    def foldF[A, B](fa: Vector[A])(zero: B)(f: (B, A) => F[B]): F[B] = {
      val monad = Monad[F]
      // Start from the lifted zero and chain one flatMap per element, left to right.
      fa.foldLeft(monad.pure(zero)) { (acc, elem) =>
        monad.flatMap(acc)(b => f(b, elem))
      }
    }
  }

  implicit def canFoldFParVector[F[_]: Monad]: CanFoldF[ParVector, F] = new CanFoldF[ParVector, F] {
    def foldF[A, B](fa: ParVector[A])(zero: B)(f: (B, A) => F[B]): F[B] = {
      val monad = Monad[F]
      // Same scheme as the Vector instance, using the parallel vector's own foldLeft.
      fa.foldLeft(monad.pure(zero)) { (acc, elem) =>
        monad.flatMap(acc)(b => f(b, elem))
      }
    }
  }
}
16 changes: 16 additions & 0 deletions kernel/src/main/scala/trembita/operations/CanGrouped.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package trembita.operations
import scala.collection.parallel.immutable.ParVector
import scala.language.higherKinds

/** Type class for containers `F[_]` whose elements can be partitioned into groups. */
trait CanGrouped[F[_]] {

  /** Partitions `fa` into groups.
    *
    * @param fa container to partition
    * @param n  group size passed to the underlying `grouped` operation
    * @return a container of the same kind holding the groups
    */
  def grouped[A](fa: F[A], n: Int): F[Iterable[A]]
}

/** Instances for in-memory collections, built on the standard `grouped` method. */
object CanGrouped {

  implicit val vectorCanBeGrouped: CanGrouped[Vector] = new CanGrouped[Vector] {
    def grouped[A](fa: Vector[A], n: Int): Vector[Iterable[A]] = {
      val chunks: Iterator[Vector[A]] = fa.grouped(n)
      chunks.toVector
    }
  }

  implicit val parVectorCanBeGrouped: CanGrouped[ParVector] = new CanGrouped[ParVector] {
    def grouped[A](fa: ParVector[A], n: Int): ParVector[Iterable[A]] = {
      // Grouping runs on the sequential view; the groups are then wrapped in a parallel vector.
      val chunks = fa.seq.grouped(n).toVector
      chunks.par
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ trait EnvironmentDependentOps[F[_], A, E <: Environment] extends Any {
)(implicit canDistinctBy: CanDistinctBy[E#Repr], F: Monad[F], A: ClassTag[A], E: E, run: E#Run[F]): DataPipelineT[F, A, E] =
`this`.mapRepr(canDistinctBy.distinctBy(_)(f))

/** Groups the pipeline's elements by applying the environment's [[CanGrouped]]
  * instance to the underlying representation.
  *
  * @param size group size forwarded to `canGrouped.grouped`
  * @return a pipeline whose elements are the groups
  */
def grouped(
    size: Int
)(implicit canGrouped: CanGrouped[E#Repr], F: Monad[F], A: ClassTag[A], E: E, run: E#Run[F]): DataPipelineT[F, Iterable[A], E] =
  `this`.mapRepr { repr =>
    canGrouped.grouped(repr, size)
  }

/** Batches the pipeline's elements by applying the environment's [[CanBatched]]
  * instance to the underlying representation.
  *
  * @param parts batching parameter forwarded to `canBatched.batched`
  * @return a pipeline whose elements are the batches
  */
def batched(
    parts: Int
)(implicit canBatched: CanBatched[E#Repr], F: Monad[F], A: ClassTag[A], E: E, run: E#Run[F]): DataPipelineT[F, Iterable[A], E] =
  `this`.mapRepr { repr =>
    canBatched.batched(repr, parts)
  }

private def widen(run: E#Run[F])(implicit E: E): E.Run[F] =
run.asInstanceOf[E.Run[F]]

Expand Down
15 changes: 13 additions & 2 deletions kernel/src/main/scala/trembita/outputs/internal/OutputDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package trembita.outputs.internal
import cats.kernel.Monoid
import cats.{~>, Applicative, Id, Monad}
import trembita._
import trembita.operations.{CanFold, CanReduce, HasBigSize, HasSize}
import trembita.operations._
import trembita.outputs.Keep

import scala.collection.generic.CanBuildFrom
Expand Down Expand Up @@ -390,6 +390,17 @@ object foldLeftDsl extends LowPriorityFoldLeftConversions {
): OutputT.Aux[Id, A, E, λ[(G[_], β) => G[B]]] = dsl[Id, E](canFold)(identityK[Id])
}

/** DSL value carrying the arguments of an effectful fold: the initial
  * accumulator and the step function, stored together as a pair.
  */
class foldFDsl[F[_], A, B](val `this`: (B, (B, A) => F[B])) extends AnyVal {

  /** Builds the [[FoldFOutput]] for environment `E` from the stored pair and the given [[CanFoldF]] instance. */
  def apply[E <: Environment](implicit canFold: CanFoldF[E#Repr, F], B: ClassTag[B]) = {
    val zero = `this`._1
    val step = `this`._2
    new FoldFOutput[F, A, B, E](zero)(step)(canFold)
  }
}

/** Implicit conversions for [[foldFDsl]]. */
object foldFDsl {

  /** Converts the DSL value into an output for environment `E` by calling its
    * `apply` with the implicitly available [[CanFoldF]] instance and `ClassTag`.
    */
  implicit def dslToOutputId[F[_], A, B: ClassTag, E <: Environment](dsl: foldFDsl[F, A, B])(
      implicit canFold: CanFoldF[E#Repr, F]
  ): OutputT.Aux[F, A, E, λ[(G[_], β) => G[B]]] =
    dsl.apply[E](canFold, implicitly[ClassTag[B]])
}

class sizeDsl(val `dummy`: Boolean = true) extends AnyVal with Serializable {
def apply[F[_], A, E <: Environment](ev: HasSize[E#Repr])(arrow: ev.Result ~> F) =
new SizeOutput[F, A, E, ev.Result](ev)(arrow)
Expand Down Expand Up @@ -432,4 +443,4 @@ object sizeDsl extends LowPrioritySizeConversions {
implicit hasSize: HasBigSize.Aux[E#Repr, Id]
): OutputT.Aux[Id, A, E, λ[(G[_], β) => G[Long]]] =
dsl[Id, A, E](hasSize)(identityK[Id])
}
}
22 changes: 18 additions & 4 deletions kernel/src/main/scala/trembita/outputs/internal/ReduceOutput.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package trembita.outputs.internal

import cats.{Monad, ~>}
import cats.{~>, Monad}
import trembita._
import trembita.operations.{CanFold, CanReduce, HasBigSize, HasSize}
import trembita.operations._

import scala.reflect.ClassTag
import scala.language.higherKinds
Expand Down Expand Up @@ -62,10 +62,24 @@ class SizeOutput[F[_], @specialized(Specializable.BestOfBreed) A, E <: Environme
F.flatMap(pipeline.evalRepr)(repr => arrow(hasSize.size(repr)))
}

class SizeOutput2[F[_], @specialized(Specializable.BestOfBreed) A, E <: Environment, R0[_]](hasSize: HasBigSize.Aux[E#Repr, R0])(arrow: R0 ~> F)
extends OutputT[F, A, E] {
class SizeOutput2[F[_], @specialized(Specializable.BestOfBreed) A, E <: Environment, R0[_]](hasSize: HasBigSize.Aux[E#Repr, R0])(
arrow: R0 ~> F
) extends OutputT[F, A, E] {
type Out[G[_], β] = G[Long]

def apply(pipeline: DataPipelineT[F, A, E])(implicit F: Monad[F], E: E, run: E#Run[F], A: ClassTag[A]): F[Long] =
F.flatMap(pipeline.evalRepr)(repr => arrow(hasSize.size(repr)))
}

/** Output that folds a pipeline's evaluated representation with an effectful step function.
  *
  * @param zero    initial accumulator
  * @param f       step producing the next accumulator inside `F`
  * @param canFold fold implementation for the environment's representation
  */
class FoldFOutput[F[_], @specialized(Specializable.BestOfBreed) A, @specialized(Specializable.BestOfBreed) B: ClassTag, E <: Environment](
    zero: B
)(f: (B, A) => F[B])(canFold: CanFoldF[E#Repr, F])
    extends OutputT[F, A, E] {

  type Out[G[_], b] = G[B]

  /** Evaluates the pipeline's representation inside `F`, then folds it with `canFold`. */
  override def apply(
      pipeline: DataPipelineT[F, A, E]
  )(implicit F: Monad[F], E: E, run: E#Run[F], A: ClassTag[A]): F[B] = {
    val evaluated = pipeline.evalRepr
    F.flatMap(evaluated) { evaluatedRepr =>
      canFold.foldF(evaluatedRepr)(zero)(f)
    }
  }
}
Loading

0 comments on commit 3fb4dcf

Please sign in to comment.