Skip to content

Commit 8f48d7d

Browse files
committed
POC: Fan-in fan-out on top of Scala Graph
1 parent 857dc64 commit 8f48d7d

File tree

4 files changed

+33
-6
lines changed

4 files changed

+33
-6
lines changed

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ lazy val `pipeline-sdk` =
6363
ZIO.core,
6464
ZIO.schema,
6565
ZIO.schemaDerivation,
66-
Scalaland.chimney
66+
Scalaland.chimney,
67+
"com.chuusai" %% "shapeless" % "2.3.10"
6768
)
6869
)
6970
.dependsOn(`pipeline-dsl-macros`)

pipeline-sdk/src/main/scala/ai/nikin/pipeline/model/dsl/package.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ai.nikin.pipeline
22
package model
33

44
import io.scalaland.chimney.dsl._
5+
import shapeless.{::, HList, HNil}
56

67
package object dsl {
78

@@ -23,9 +24,20 @@ package object dsl {
2324

2425
lazy final val schema: zio.schema.Schema[DATA] = s
2526

27+
def &[T <: Product](l: Lake[T]): CombinedLake[T :: DATA :: HNil] =
28+
CombinedLake[T :: DATA :: HNil](Set(this, l))
29+
2630
def toUntyped: UntypedLake = this.transformInto[UntypedLake]
2731
}
2832

33+
case class CombinedLake[L <: HList](lakes: Set[Lake[_]])
34+
extends Vertex[CombinedLake[L]](s"combined-${lakes.map(_.name).mkString("&")}") {
35+
final override type IN = L
36+
final override type OUT = L
37+
38+
def &[T <: Product](l: Lake[T]): CombinedLake[T :: L] = CombinedLake[T :: L](lakes + l)
39+
}
40+
2941
sealed abstract class Transformation[_IN, _OUT](n: String)
3042
extends Vertex[Transformation[_IN, _OUT]](n) {
3143
final override type IN = _IN

pipeline-sdk/src/main/scala/ai/nikin/pipeline/sdk/package.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,19 @@ package object sdk {
2121
implicit def toPipelineBuilder[V <: Vertex[V]](v: V): PipelineBuilder[V] =
2222
PipelineBuilder(v, Graph.empty[Vertex[_], DiEdge])
2323

24+
import shapeless.HList
25+
2426
class PipelineBuilder[SELF <: Vertex[SELF]](
2527
private[sdk] val v: SELF,
2628
private[sdk] val graph: PipelineDef
2729
) {
2830
def >>>[
2931
V <: VertexTO[SELF, V]
3032
](next: V)(implicit @unused ev: CanMakeEdge[SELF, V]): PipelineBuilder[V] =
31-
PipelineBuilder(next, graph ++ Set(v ~> next))
33+
v match {
34+
case cb: CombinedLake[_] => PipelineBuilder(next, graph ++ cb.lakes.map(_ ~> next))
35+
case _ => PipelineBuilder(next, graph ++ Set(v ~> next))
36+
}
3237
}
3338

3439
private object PipelineBuilder {
@@ -45,6 +50,12 @@ package object sdk {
4550
implicit def lakeToTransform[DATA <: Product, OUT]: CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]] =
4651
CanMakeEdge[Lake[DATA], Transformation[DATA, OUT]]()
4752

53+
implicit def combinedLakeToTransform[L <: HList, OUT]: CanMakeEdge[CombinedLake[L], Transformation[L, OUT]] =
54+
CanMakeEdge[CombinedLake[L], Transformation[L, OUT]]()
55+
56+
implicit def transformToCombinedLake[DATA <: HList, IN]: CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]] =
57+
CanMakeEdge[Transformation[IN, DATA], CombinedLake[DATA]]()
58+
4859
def aggregation[IN <: Product, OUT <: Product](name: String, f: AggregationFunction)(implicit
4960
inTypeTag: WeakTypeTag[IN],
5061
outTypeTag: WeakTypeTag[OUT]
@@ -54,7 +65,7 @@ package object sdk {
5465
typeTag: WeakTypeTag[DATA]
5566
): Lake[DATA] = Lake(name, extractFQN(typeTag))
5667

57-
private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.typeSymbol.fullName
68+
private[sdk] def extractFQN[T](typeTag: WeakTypeTag[T]): String = typeTag.tpe.toString
5869

5970
@compileTimeOnly("enable macro paradise")
6071
class Schema extends StaticAnnotation {

pipeline-sdk/src/test/scala/ai/nikin/pipeline/sdk/SdkSpec.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package ai.nikin.pipeline.sdk
22

33
import ai.nikin.pipeline.model.dsl._
44
import AggregationFunction.{Avg, Sum}
5-
import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB}
5+
import ai.nikin.pipeline.sdk.schemas.{RecordA, RecordB, RecordC}
6+
import shapeless._
7+
import zio.schema.DeriveSchema.gen
68

79
class SdkSpec extends TestUtils {
810
test("SDK - aggregation to lake") {
@@ -30,8 +32,9 @@ class SdkSpec extends TestUtils {
3032

3133
test("SDK - lake to aggregation to lake") {
3234
val pipeline =
33-
lake[RecordA]("lA") >>> aggregation[RecordA, RecordB]("tAB", Avg("col1", "col2")) >>>
34-
lake[RecordB]("lB")
35+
(lake[RecordA]("lA") & lake[RecordB]("lB") & lake[RecordC]("lC")) >>>
36+
aggregation[RecordC :: RecordB :: RecordA :: HNil, RecordC :: RecordB :: HNil]("tAB", Avg("col1", "col2")) >>>
37+
(lake[RecordB]("lB") & lake[RecordC]("lC"))
3538

3639
println(pipeline.graph)
3740
}

0 commit comments

Comments
 (0)