This repository has been archived by the owner on Aug 11, 2023. It is now read-only.

Commit a6e845f: INITIAL COMMIT
vitaliihonta committed Jan 14, 2019 (0 parents)
Showing 11 changed files with 685 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .gitignore
@@ -0,0 +1,8 @@
target/
.idea
*.iml
*.log
logs/
.DS_Store
.bloop/
.metals/
10 changes: 10 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,10 @@
style = defaultWithAlign
maxColumn = 140
align.openParenCallSite = false
align.openParenDefnSite = false
align = true
danglingParentheses = true

rewrite.rules = [RedundantBraces, RedundantParens, SortImports, PreferCurlyFors]
rewrite.redundantBraces.includeUnitMethods = true
rewrite.redundantBraces.stringInterpolation = true
10 changes: 10 additions & 0 deletions .travis.yml
@@ -0,0 +1,10 @@
language: scala
scala:
- 2.12.8
- 2.11.12

script:
- sbt clean coverage test coverageReport

after_success:
- bash <(curl -s https://codecov.io/bash)
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2018 dataroot

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
215 changes: 215 additions & 0 deletions README.md
@@ -0,0 +1,215 @@
---
Project: Trembita
Current version: 0.7.2-SNAPSHOT
Scala version: 2.11.12, 2.12.8
---

[![codecov](https://codecov.io/gh/vitaliihonta/trembita/branch/master/graph/badge.svg)](https://codecov.io/gh/vitaliihonta/trembita)
[![Build Status](https://travis-ci.com/vitaliihonta/trembita.svg?branch=master)](https://travis-ci.com/vitaliihonta/trembita)

![Cats Friendly Badge](https://typelevel.org/cats/img/cats-badge-tiny.png)

<img src="https://raw.githubusercontent.com/vitaliihonta/trembita/master/assets/trembita-p.png" alt="trembita"/>

## Description
Project Trembita is a functional data pipelining library.
It lets you query and transform your data in a purely functional, typesafe & declarative way.
Trembita allows you to build complex transformation pipelines in which some stages are executed locally (sequentially or in parallel) and others in different environments (for instance on a Spark cluster, see below).

```scala
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
libraryDependencies ++= {
  val trembitaV = "0.7.2-SNAPSHOT"
  Seq(
    "com.github.vitaliihonta.trembita" %% "trembita-kernel"                      % trembitaV, // kernel
    "com.github.vitaliihonta.trembita" %% "trembita-cassandra-connector"         % trembitaV, // cassandra
    "com.github.vitaliihonta.trembita" %% "trembita-cassandra-connector-phantom" % trembitaV, // phantom
    "com.github.vitaliihonta.trembita" %% "trembita-slf4j"                       % trembitaV  // slf4j, for logging
  )
}
```
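
With the kernel module on the classpath, a minimal pipeline looks roughly like this. This is a sketch using only the `DataPipelineT` constructor and the combinators shown later in this README; the exact implicits required may differ between versions:
```scala
import com.github.trembita._
import cats.instances.future._ // Monad[Future]; assumed to be needed here
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// wrap plain values (or a collection via varargs expansion) into a pipeline
val numbers = DataPipelineT[Future, Int](1, 2, 3, 20, 40, 60)

// transformations are declarative and lazy
val doubled = numbers.map(_ * 2)

// nothing runs until the pipeline is evaluated
val result = doubled.eval // collects the results within Future
```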

## Core features

- [Typesafe querying DSL](./examples/src/main/scala/com/examples/kernel/QLSample.scala) for data pipelines provides a unified model and typechecking for various data sources (including collections and Spark RDDs)
- [Purely functional stateful transformations using Finite State Machines](./examples/src/main/scala/com/examples/kernel/FSMSample.scala) provides a DSL for defining FSMs that can be run on various data sources (collections, Spark Datasets, Akka Streams...)
- [Caching](./caching)
- [Logging](./utils/logging)

## Available Integrations
- Apache Spark ([core](http://spark.apache.org/docs/latest/rdd-programming-guide.html), [SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html))
- Apache Spark [Streaming](http://spark.apache.org/docs/latest/streaming-programming-guide.html)
- [Akka Streams](https://doc.akka.io/docs/akka/current/stream/)
- [Cassandra](http://cassandra.apache.org/)
- Cassandra using [phantom](https://github.com/outworkers/phantom)
- [Infinispan](http://infinispan.org/)

## Processing modules
- [kernel](./kernel) - lazy (parallel) data pipelines, QL for grouping/aggregations and stateful computations using [Cats](https://github.com/typelevel/cats) and [Shapeless](https://github.com/milessabin/shapeless)

## Data sources
- Any `Iterable` - just wrap your collection into a `DataPipeline`
- [cassandra connector](./cassandra_connector) - fetch rows from your `Cassandra` database with `CassandraSource`
- [cassandra phantom](./cassandra_connector_phantom) - provides [Phantom](https://github.com/outworkers/phantom) library support
- [akka stream](./integrations/akka/streams) - allows you to create a pipeline from an akka stream (e.g. from any data source compatible with akka)
- [spark RDD / DataSet](./integrations/spark/core) - allows you to create a pipeline from an RDD / DataSet (e.g. from any non-streaming data source compatible with Spark)

## Miscellaneous
- [trembita slf4j](./utils/logging/slf4j) - provides [slf4j](https://www.slf4j.org/) logging support. Use it with any compatible logging backend (for instance [logback](https://logback.qos.ch/))
- [trembita log4j](./utils/logging/log4j) - provides [log4j](https://logging.apache.org/log4j/2.x/manual/scala-api.html) logging support.

## Experimental: Spark support
### Introducing spark pipelines
You can run some of your transformations on a [spark](http://spark.apache.org/) cluster.
To do that, add the following dependencies:
```scala
libraryDependencies ++= Seq(
  "com.github.vitaliihonta.trembita" %% "trembita-spark" % trembitaV,
  "org.apache.spark"                 %% "spark-core"     % "2.4.0" // first spark version with scala 2.12 support
)
```
### Asynchronous computations in spark
Using the spark integration you can even easily run asynchronous computations on spark with Futures:
```scala
import com.github.trembita._
import com.github.trembita.experimental.spark._
import org.apache.spark._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._ // for the `5.minutes` syntax below
import java.util.concurrent.Executors

implicit val sc: SparkContext     = ??? // requires an implicit SparkContext in scope
implicit val timeout: Timeout     = Timeout(5.minutes) // requires an implicit timeout for async operations
implicit val ec: ExecutionContext = ???

val cachedThreadPool =
  ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

val numbers = DataPipelineT[Future, Int](1, 2, 3, 20, 40, 60) // some basic pipeline

numbers
  .to[Spark] // the transformations below will be executed on spark
  .map(_ + 1)
  .mapM { i: Int =>
    val n = Future { i + 1 }(cachedThreadPool)
    val b = Future {
      val x = 1 + 2
      x * 3
    }

    for {
      nx <- n
      bx <- b
    } yield nx + bx
  }
  .eval // collects results into the driver program
```
Trembita does its best to transform the async lambda into a serializable format.
By default a special macro detects all references to `ExecutionContext` within the lambda you pass into `mapM`.
All `ExecutionContext`s must be globally accessible (e.g. defined as a `def` or `val` in some object).
If they are not, your code won't compile, and an appropriate error is reported.
If everything is fine, the macro creates a helper object with references to all found `ExecutionContext`s, making them `@transient lazy val`s (a well-known technique), and rewrites your lambda so that all async transformations reference the fields of that object.
You can find full example [here](./examples/src/main/scala/com/examples/spark/Main.scala).
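
To illustrate the technique the macro relies on, here is a hand-written sketch of such a helper object. The object and field names are hypothetical; the actual generated code may differ:
```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Hypothetical helper object, similar in spirit to what the macro generates.
// `@transient lazy val` keeps the ExecutionContext out of the serialized
// closure: it is not shipped to executors, but re-created on first access
// on each JVM that deserializes the object.
object SerializableContexts {
  @transient lazy val cached: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
}
```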

Happy to say that using `cats.effect.IO` on spark is also supported =)
### FSM on Spark Datasets
You can now define stateful transformations on Spark Datasets using Finite State Machines.
It's implemented using `Dataset.mapWithState`.
Defining an FSM for Spark is as simple as defining an FSM for a regular pipeline, except that state is preserved only per specific `key` (due to a `mapWithState` limitation).
To do so, use `fsmByKey`:
```scala
val pipeline: DataPipelineT[F, A, Spark] = ???
pipeline.fsmByKey(getKey = ???)(... /* your FSM definition here */)
```
Full example can be found [here](./examples/src/main/scala/com/examples/spark/FSMSample.scala).
### Typesafe QL on RDD
See the full example [here](./examples/src/main/scala/com/examples/spark/QLExample.scala)
### Limitations
- Be careful not to create closures over the `SparkContext` or `SparkSession`: they are not serializable, so doing so will fail at runtime
- Other non-serializable resources will also fail at runtime. Handling for them will be adapted later
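
For instance, a stage like the following sketch (reusing `sc` and `numbers` from the async example above) compiles but fails at runtime, because the lambda closes over the non-serializable `SparkContext`:
```scala
// DON'T: this lambda captures `sc` (the SparkContext), which is not
// serializable, so shipping the stage to executors fails at runtime
val bad = numbers
  .to[Spark]
  .map(i => sc.parallelize(1 to i).sum().toInt)
```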

### Examples
You can find a script to run the example on a spark cluster within docker:
```bash
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run.sh
```
To run the Spark FSM example in docker, use the following script:
```bash
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_fsm.sh
```

To run the Spark QL example in docker, use the following script:
```bash
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_ql.sh
```

Before running QL, please remove [spire](https://github.com/non/spire) jars from the spark classpath to avoid dependency conflicts.

## Experimental: Akka streams support
Trembita now supports running parts of your transformations on [akka-streams](https://doc.akka.io/docs/akka/current/stream/).
To use it, add the following dependency:
```scala
libraryDependencies += "com.github.vitaliihonta.trembita" %% "trembita-akka-streams" % trembitaV
```

You can run an existing pipeline through an akka stream or create a pipeline directly from a source:
```scala
import akka.NotUsed
import akka.stream.scaladsl._
import akka.util.ByteString
import cats.effect.IO
import com.github.trembita._
import com.github.trembita.experimental.akka._
import java.nio.file.Paths

val fileLines =
  DataPipelineT
    .fromReprF[IO, ByteString, Akka](IO {
      FileIO
        .fromPath(Paths.get(getClass.getResource("/words.txt").toURI))
        .mapMaterializedValue(_ => NotUsed)
    })
```
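
Downstream, the usual combinators apply; for instance, a sketch using only the `map` combinator shown earlier:
```scala
// a sketch: decode each ByteString chunk into a String (akka's utf8String)
val lines = fileLines.map(_.utf8String)
```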

Akka streaming pipelines also support `FSM` using a custom graph stage:
```scala
val pipeline: DataPipelineT[IO, Int, Akka] = ???
val stateful = pipeline.fsm(/* your FSM definition here */)
```
You can find the full examples [here](./examples/src/main/scala/com/examples/akka).

## Seamless Akka to Spark integration
Add the following dependency if you want to run your pipeline through both akka streams and spark RDDs:
```scala
libraryDependencies += "com.github.vitaliihonta.trembita" %% "trembita-seamless-akka-spark" % trembitaV
```
Its goal is to avoid additional overhead when switching between akka and spark.
`Akka -> Spark` is implemented using a custom `Sink`.
`Spark -> Akka` is implemented using `toLocalIterator`.
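
Put together, a pipeline can hop between the two backends. A sketch follows; the `to[Akka]` transition back is an assumption based on the symmetric `Spark -> Akka` support described above:
```scala
// a sketch: stream through akka, hand off to spark, and stream back
val pipeline: DataPipelineT[IO, Int, Akka] = ??? // hypothetical Akka-backed pipeline

val roundTrip = pipeline
  .to[Spark]  // Akka -> Spark: elements are gathered through a custom Sink
  .map(_ * 2) // executed on the spark cluster
  .to[Akka]   // Spark -> Akka: results streamed back via toLocalIterator (assumed API)
```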

## Experimental: Spark streaming support
Trembita now allows you to write `QL` and `FSM` transformations upon [spark DStreams](https://spark.apache.org/docs/latest/streaming-programming-guide.html).
```scala
libraryDependencies += "com.github.vitaliihonta.trembita" %% "trembita-spark-streaming" % trembitaV
```

For examples, see [here](./examples/src/main/scala/com/examples/spark/streaming).
Run scripts:
- [basic](./examples/src/main/resources/spark/cluster/run_streaming.sh)
- [FSM](./examples/src/main/resources/spark/cluster/run_streaming_fsm.sh)
- [QL](./examples/src/main/resources/spark/cluster/run_streaming_ql.sh)

## To be done
- [x] caching
- [x] integration with distributed streaming frameworks
- [ ] tensorflow
- [ ] slick

## What does `trembita` mean?
<img src="http://typical.if.ua/wp-content/uploads/2015/12/213.jpg" alt="trembita"/>

Trembita is an alpine horn made of wood. It is common among the Hutsuls, Ukrainian highlanders who used to live in western Ukraine, eastern Poland, Slovakia and northern Romania. In southern Poland it's called trombita, bazuna in the north, and ligawka in central Poland.
Binary file added assets/trembita-p.png
