nkcodec/streamcore

StreamCore

A configuration-driven Spark Structured Streaming framework built with Scala. StreamCore allows you to build and run data streaming pipelines by defining sources, processors, and sinks in a configuration file, without changing the application code.

Core Concepts

The framework is built around three main components that are dynamically loaded at runtime based on your job configuration:

  • Readable: A data source responsible for reading data from a streaming or batch source. It creates the initial DataFrame.
  • Processable: A data processor that contains the business logic. It takes an input DataFrame, applies transformations, and returns an output DataFrame.
  • Writable: A data sink responsible for writing the final DataFrame to a destination.

The JobRunner is the main entry point of the application. It reads the job configuration, instantiates the required reader, processor, and writer, and executes the Spark streaming query.
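
The JobRunner's source is not shown here, but a configuration-driven runner of this shape typically instantiates its components reflectively from the class names in the job file. A minimal, hypothetical sketch (the helper name and the example class are illustrative, not part of StreamCore's API):

```scala
// Hypothetical sketch: how a runner like JobRunner might turn a configured
// class name into an instance via Java reflection. Requires the class to
// have a public no-argument constructor.
object ReflectiveLoader {
  def instantiate[T](className: String): T =
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[T]
}

// With a job config of reader.class = "io.nkcodec.streamcore.api.source.KafkaReader",
// the runner would do roughly:
//   val reader = ReflectiveLoader.instantiate[Readable[_]](config.getString("reader.class"))
```

This is why every component needs a no-argument constructor and receives its parameters through the Config object instead.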

Available Modules

Sources (Readable)

  • io.nkcodec.streamcore.api.source.JsonReader: Reads data from a directory of JSON files.
  • io.nkcodec.streamcore.api.source.ParquetReader: Reads data from a directory of Parquet files.
  • io.nkcodec.streamcore.api.source.KafkaReader: Reads data from a Kafka topic.
  • io.nkcodec.streamcore.api.source.DeltaReader: Reads data from a Delta Lake table.

Sinks (Writable)

  • io.nkcodec.streamcore.api.sink.ParquetWriter: Writes data to a directory as Parquet files.
  • io.nkcodec.streamcore.api.sink.KafkaWriter: Writes data to a Kafka topic.
  • io.nkcodec.streamcore.api.sink.DeltaWriter: Writes data to a Delta Lake table.

Creating Custom Components

StreamCore is designed to be extensible. You can create your own sources, processors, and sinks by implementing the corresponding traits from the io.nkcodec.streamcore.api.ports package.

A Note on the Design (Polymorphism via Traits)

Before diving into the examples, it's helpful to understand the design pattern used in StreamCore. The framework is built on subtype polymorphism, which is achieved in Scala using traits: the runner is written against a common trait, and any class implementing that trait can be substituted at runtime.

A trait is similar to an interface in other languages. It defines a contract—a set of methods that a class must implement. In our case, the Readable, Processable, and Writable traits define the contracts for data sources, processors, and sinks.

For example, the Readable[T] trait declares that any class implementing it must have a read method that returns a Dataset[T].

trait Readable[T] {
  def read(config: Config)(implicit spark: SparkSession): Dataset[T]
}

This allows the JobRunner to be completely decoupled from the specific implementations. It only needs to know that it's working with an object of type Readable, not whether it's a KafkaReader or a custom MyApiReader. At runtime, the correct implementation is loaded based on your configuration, and its read method is called. This makes the system highly extensible and modular.
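
The same idea in miniature, outside Spark (a toy illustration, not StreamCore code): the `run` function knows only the trait, and the concrete implementation is chosen by the caller, exactly as the JobRunner chooses a `Readable` implementation from the configuration.

```scala
// Toy illustration of the pattern: code written against a trait works with
// any implementation, selected at runtime.
trait Greeter {
  def greet(name: String): String
}

class EnglishGreeter extends Greeter {
  def greet(name: String): String = s"Hello, $name"
}

class FrenchGreeter extends Greeter {
  def greet(name: String): String = s"Bonjour, $name"
}

// This function is decoupled from the concrete greeters: it only
// knows the Greeter contract, like JobRunner only knows Readable.
def run(g: Greeter): String = g.greet("StreamCore")

println(run(new EnglishGreeter)) // prints "Hello, StreamCore"
println(run(new FrenchGreeter))  // prints "Bonjour, StreamCore"
```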

1. Create a Custom Source

To create a custom data source, implement the Readable[T] trait. The read method should return a Dataset[T].

Example: com.mycompany.stream.sources.RateReader.scala

package com.mycompany.stream.sources

import com.typesafe.config.Config
import io.nkcodec.streamcore.api.ports.Readable
import org.apache.spark.sql.{Dataset, SparkSession}

case class Rate(timestamp: java.sql.Timestamp, value: Long)

class RateReader extends Readable[Rate] {
  override def read(config: Config)(implicit spark: SparkSession): Dataset[Rate] = {
    import spark.implicits._
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", config.getInt("rowsPerSecond"))
      .load()
      .as[Rate]
  }
}

2. Create a Custom Processor

To create a custom data processor, implement the Processable[IN, OUT] trait. The process method takes an input Dataset[IN] and returns a transformed Dataset[OUT].

Example: com.mycompany.stream.processors.RateProcessor.scala

package com.mycompany.stream.processors

import com.typesafe.config.Config
// Reuse the reader's Rate case class. Redefining it in this package would
// create a distinct, incompatible type and break the pipeline's type wiring.
import com.mycompany.stream.sources.Rate
import io.nkcodec.streamcore.api.ports.Processable
import org.apache.spark.sql.{Dataset, SparkSession}

case class ProcessedRate(time: java.sql.Timestamp, value: Long, isEven: Boolean)

class RateProcessor extends Processable[Rate, ProcessedRate] {
  override def process(input: Dataset[Rate], config: Config)(implicit spark: SparkSession): Dataset[ProcessedRate] = {
    import spark.implicits._
    input.map(rate => ProcessedRate(rate.timestamp, rate.value, rate.value % 2 == 0))
  }
}

3. Create a Custom Sink

To create a custom data sink, implement the Writable[T] trait. The write method should configure and return a DataStreamWriter[T].

Example: com.mycompany.stream.sinks.ConsoleWriter.scala

package com.mycompany.stream.sinks

import com.typesafe.config.Config
// Reuse the processor's output type rather than redefining it here; a second
// ProcessedRate in this package would be a different, incompatible type.
import com.mycompany.stream.processors.ProcessedRate
import io.nkcodec.streamcore.api.ports.Writable
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}

class ConsoleWriter extends Writable[ProcessedRate] {
  override def write(data: Dataset[ProcessedRate], config: Config): DataStreamWriter[ProcessedRate] = {
    data.writeStream
      .format("console")
      .outputMode(config.getString("outputMode"))
      .trigger(Trigger.ProcessingTime(config.getString("trigger.processingTime")))
      .option("truncate", "false")
  }
}
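
Note that write returns an unstarted DataStreamWriter; presumably the JobRunner is what actually starts the query and blocks on it (its source is not shown in this README). A hedged sketch of that last step:

```scala
import org.apache.spark.sql.streaming.DataStreamWriter

// Hedged sketch, not actual JobRunner code: start the writer returned by a
// Writable and block until the streaming query stops or fails.
def runQuery[T](writer: DataStreamWriter[T]): Unit = {
  val query = writer.start() // begins the streaming query
  query.awaitTermination()   // blocks the driver until the query terminates
}
```

Keeping the start call out of the sink is what lets the framework own the query lifecycle (and, for example, handle termination and errors in one place).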

4. Update Job Configuration

Finally, update your job configuration file to use your new custom components.

Example: jobs/custom-pipeline.conf

job {
  name = "CustomPipelineStream"

  reader {
    class = "com.mycompany.stream.sources.RateReader"
    rowsPerSecond = 1
  }

  processor {
    class = "com.mycompany.stream.processors.RateProcessor"
  }

  writer {
    class = "com.mycompany.stream.sinks.ConsoleWriter"
    outputMode = "update"
    trigger.processingTime = "10 seconds"
  }
}
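
The file is parsed with the Typesafe Config library (the Config type the component examples import). Presumably the JobRunner hands each component its own sub-block, so config.getInt("rowsPerSecond") inside RateReader resolves against the reader section. A quick sketch of that resolution:

```scala
import com.typesafe.config.ConfigFactory

// Parse a fragment of the job file shown above and extract the reader block,
// as a runner presumably does before passing it to the component.
val job = ConfigFactory.parseString(
  """
    |job {
    |  reader {
    |    class = "com.mycompany.stream.sources.RateReader"
    |    rowsPerSecond = 1
    |  }
    |}
  """.stripMargin)

val readerConf = job.getConfig("job.reader")
println(readerConf.getString("class"))      // com.mycompany.stream.sources.RateReader
println(readerConf.getInt("rowsPerSecond")) // 1
```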

How to Run a Job

To run a streaming job, you need to provide a configuration file. The application is executed using spark-submit.

1. Create a Job Configuration File

Create a HOCON (.conf) file that defines your pipeline. This file specifies the classes for the reader, processor, and writer, along with their specific parameters.

Example: jobs/kafka-to-parquet.conf

job {
  name = "KafkaToParquetStream"

  reader {
    class = "io.nkcodec.streamcore.api.source.KafkaReader"
    kafka.bootstrap.servers = "localhost:9092"
    subscribe = "input-topic"
  }

  // Assuming you have a custom processor class
  processor {
    class = "com.mycompany.stream.processors.MyDataProcessor"
    // Custom processor parameters go here
  }

  writer {
    class = "io.nkcodec.streamcore.api.sink.ParquetWriter"
    path = "/path/to/output/data"
    checkpointLocation = "/path/to/checkpoints"
    trigger.processingTime = "1 minute"
    outputMode = "append"
  }
}

2. Submit the Job

Package your application into a JAR file (including any custom processors) and run it using spark-submit.

spark-submit --class io.nkcodec.streamcore.api.JobRunner \
  --master <your-spark-master> \
  /path/to/your/streamcore-app.jar \
  --job-conf jobs/kafka-to-parquet.conf

Key Takeaways

  • Configuration-Driven: Define and modify your streaming pipelines by changing a configuration file. No code changes are needed for existing components.
  • Extensible by Design: The framework is built on traits (Readable, Processable, Writable), allowing you to easily add new sources, sinks, or processing logic.
  • Decoupled Architecture: The core application (JobRunner) is completely separate from the implementation of your pipeline's components.
  • Polymorphism via Traits: Components implement common traits and are loaded reflectively at runtime from the configuration, making the system highly modular and flexible.
  • Powered by Spark: Built on top of Spark Structured Streaming, inheriting its performance, scalability, and fault-tolerance.
