Skip to content

Commit a148ad8

Browse files
committed
POC: Fan-in fan-out on top of Scala Graph
1 parent 3ad570d commit a148ad8

File tree

4 files changed

+31
-6
lines changed

4 files changed

+31
-6
lines changed

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ lazy val `pipeline-sdk` =
6363
ZIO.core,
6464
ZIO.schema,
6565
ZIO.schemaDerivation,
66-
Scalaland.chimney
66+
Scalaland.chimney,
67+
"com.chuusai" %% "shapeless" % "2.3.10"
6768
)
6869
)
6970
.dependsOn(`pipeline-dsl-macros`)

pipeline-sdk/src/main/scala/ai/nikin/pipeline/model/dsl/package.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ai.nikin.pipeline
22
package model
33

44
import io.scalaland.chimney.dsl._
5+
import shapeless.{::, HList, HNil}
56

67
package object dsl {
78

@@ -23,9 +24,20 @@ package object dsl {
2324

2425
lazy final val schema: zio.schema.Schema[DATA] = s
2526

27+
def &[T <: Product](l: Lake[T]): CombinedLake[T :: DATA :: HNil] =
28+
CombinedLake[T :: DATA :: HNil](Set(this, l))
29+
2630
def toUntyped: UntypedLake = this.transformInto[UntypedLake]
2731
}
2832

33+
case class CombinedLake[L <: HList](lakes: Set[Lake[_]])
34+
extends Vertex[CombinedLake[L]](s"combined-${lakes.map(_.name).mkString("&")}") {
35+
final override type IN = L
36+
final override type OUT = L
37+
38+
def &[T <: Product](l: Lake[T]): CombinedLake[T :: L] = CombinedLake[T :: L](lakes + l)
39+
}
40+
2941
sealed abstract class Transformation[_IN, _OUT](n: String)
3042
extends Vertex[Transformation[_IN, _OUT]](n) {
3143
final override type IN = _IN

pipeline-sdk/src/main/scala/ai/nikin/pipeline/sdk/package.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ package object sdk {
1313
import scalax.collection.GraphEdge.DiEdge
1414
import scalax.collection.immutable.Graph
1515
import scalax.collection.GraphPredef._
16+
import shapeless.HList
1617

1718
type PipelineDef = Graph[Vertex[_], DiEdge]
1819

@@ -28,7 +29,10 @@ package object sdk {
2829
def >>>[
2930
V <: VertexTO[SELF, V]
3031
](next: V)(implicit @unused ev: CanMakeEdge[SELF, V]): PipelineBuilder[V] =
31-
new PipelineBuilder(next, graph ++ Set(v ~> next))
32+
v match {
33+
case cb: CombinedLake[_] => new PipelineBuilder(next, graph ++ cb.lakes.map(_ ~> next))
34+
case _ => new PipelineBuilder(next, graph ++ Set(v ~> next))
35+
}
3236
}
3337

3438
type VertexTO[FROM <: Vertex[FROM], TO <: Vertex[TO] { type IN = FROM#OUT }] =
@@ -40,6 +44,12 @@ package object sdk {
4044
implicit def lakeToTransform[DATA <: Product, OUT]: CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]] =
4145
CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]]()
4246

47+
implicit def combinedLakeToTransform[L <: HList, OUT]: CanMakeEdge[CombinedLake[L], Transformation[L, OUT]] =
48+
CanMakeEdge[CombinedLake[L], Transformation[L, OUT]]()
49+
50+
implicit def transformToCombinedLake[DATA <: HList, IN]: CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]] =
51+
CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]]()
52+
4353
def aggregation[IN <: Product, OUT <: Product](name: String, f: AggregationFunction)(implicit
4454
inTypeTag: WeakTypeTag[IN],
4555
outTypeTag: WeakTypeTag[OUT]
@@ -49,7 +59,7 @@ package object sdk {
4959
typeTag: WeakTypeTag[DATA]
5060
): Lake[DATA] = Lake(name, extractFQN(typeTag))
5161

52-
private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.typeSymbol.fullName
62+
private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.toString
5363

5464
implicit def schemaGen[T]: ZSchema[T] = macro zio.schema.DeriveSchema.genImpl[T]
5565

pipeline-sdk/src/test/scala/ai/nikin/pipeline/sdk/SdkSpec.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package ai.nikin.pipeline.sdk
22

33
import ai.nikin.pipeline.model.dsl._
44
import AggregationFunction.{Avg, Sum}
5-
import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB}
5+
import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB, RecordC}
6+
import shapeless._
67

78
class SdkSpec extends TestUtils {
89
test("SDK - aggregation to lake") {
@@ -30,8 +31,9 @@ class SdkSpec extends TestUtils {
3031

3132
test("SDK - lake to aggregation to lake") {
3233
val pipeline =
33-
lake[RecordA]("lA") >>> aggregation[RecordA, RecordB]("tAB", Avg("col1", "col2")) >>>
34-
lake[RecordB]("lB")
34+
(lake[RecordA]("lA") & lake[RecordB]("lB") & lake[RecordC]("lC")) >>>
35+
aggregation[RecordC :: RecordB :: RecordA :: HNil, RecordC :: RecordB :: HNil]("tAB", Avg("col1", "col2")) >>>
36+
(lake[RecordB]("lB") & lake[RecordC]("lC"))
3537

3638
println(pipeline.graph)
3739
}

0 commit comments

Comments
 (0)