Custom Stream Processor

This is a generic Spark Streaming application designed to read from a data source, process the data, and write it to a sink. The application is built to be configurable and extensible, allowing for different readers, processors, and writers to be plugged in via configuration.

Project Structure

The project follows a standard Scala project layout:

src
├── main
│   ├── resources
│   │   └── config/application_delta2.conf  # Application configuration
│   └── scala
│       ├── Main.scala              # Main entry point for the application
│       ├── domain/                 # Case classes for data models (e.g., RawUserEvent, UserEngagementResult)
│       ├── io/                     # Data readers (sources) and writers (sinks)
│       ├── usecase/                # Business logic and stream processing
│       └── readSample.scala        # Sample Scala file
└── test
    └── scala

Core Components

  • Main.scala: The entry point of the application. It initializes Spark, reads the configuration, and wires together the reader, processor, and writer.
  • DataReader: Responsible for reading data from a source (e.g., Kafka, Kinesis, files).
  • StreamProcessor: Contains the core business logic for transforming the input data stream.
  • DataWriter: Responsible for writing the processed data to a sink (e.g., Delta Lake, console).
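The wiring in Main.scala can be sketched roughly as follows. This is an illustrative reconstruction, not the project's actual code: the `job.*.class` config keys and the use of `Class.forName` to instantiate components reflectively are assumptions inferred from the configuration section of this README.

```scala
// Illustrative sketch only — class names and config keys are assumptions.
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession

object MainSketch {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession =
      SparkSession.builder().appName("custom-stream").getOrCreate()

    val config: Config = ConfigFactory.load("config/application_delta2.conf")
    val jobConf        = config.getConfig("job")

    // Instantiate each pluggable component from its configured class name.
    def instantiate[T](component: String): T =
      Class.forName(jobConf.getString(s"$component.class"))
        .getDeclaredConstructor()
        .newInstance()
        .asInstanceOf[T]

    val reader    = instantiate[io.DataReader[_]]("reader")
    val processor = instantiate[usecase.contract.StreamProcessor[_, _]]("processor")
    val writer    = instantiate[io.DataWriter[_]]("writer")

    // read -> process -> write, then start the streaming query
    // and block with query.awaitTermination().
  }
}
```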

Configuration

The application is configured using HOCON (Human-Optimized Config Object Notation) format. The main configuration file is located at src/main/resources/config/application_delta2.conf.

This file defines:

  • The job name.
  • The fully qualified class names for the DataReader, StreamProcessor, and DataWriter to be used.
  • Specific configurations for the reader, processor, and writer components.
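A minimal configuration illustrating this shape (the class names, paths, and option keys below are placeholders for illustration, not the shipped defaults):

```hocon
job {
  name = "UserEngagementStream"
  reader {
    class = "io.source.JsonReader"
    options { path = "/data/input/events" }        # hypothetical path
  }
  processor {
    class = "usecase.UserEngagementProcessor"
    settings { windowDurationSeconds = 60 }
  }
  writer {
    class = "io.sink.DeltaWriter"
    outputMode = "append"
    checkpointLocation = "/data/checkpoints/engagement"   # hypothetical
    options { path = "/data/output/engagement" }
  }
}
```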

How to Run

  1. Build the project:

    mvn clean package
  2. Submit the Spark job: Use spark-submit to run the application, providing the application JAR and any required dependencies.

    spark-submit --class Main \
      --master local[*] \
      target/your-project-jar.jar

Extensibility

To add a new data source, processing step, or sink:

  1. Implement the corresponding DataReader, StreamProcessor, or DataWriter trait.
  2. Update the configuration file (e.g., application_delta2.conf) to point to your new implementation.

Knowledge Base: Implementing Custom Components

This project is designed around a set of core abstractions (DataReader, DataWriter, StreamProcessor) that allow for easy extension and customization. Below is a guide on how to implement each of these components.

1. DataReader[T]

The DataReader trait is responsible for reading data from an external source and returning it as a Dataset[T].

Trait Definition:

package io

import com.typesafe.config.Config
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.reflect.runtime.universe.TypeTag

trait DataReader[T] {
    def read(config: Config)(implicit spark: SparkSession, tt: TypeTag[T]): Dataset[T]
}

Implementation Example: JsonReader

The JsonReader implementation reads data from a directory of JSON files.

package io.source

import com.typesafe.config.Config
import domain.RawUserEvent
import io.DataReader
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

import scala.reflect.runtime.universe.TypeTag

class JsonReader extends DataReader[RawUserEvent] {
    override def read(config: Config)(implicit spark: SparkSession, tt: TypeTag[RawUserEvent]): Dataset[RawUserEvent] = {
        import spark.implicits._

        val inputPath = config.getString("options.path")

        spark.readStream
            .schema(Encoders.product[RawUserEvent].schema)
            .format("json")
            .load(inputPath)
            .as[RawUserEvent]
    }
}

To implement your own DataReader:

  1. Create a class that extends DataReader[YourCaseClass].
  2. Implement the read method to connect to your data source (e.g., Kafka, Kinesis) and transform the data into a Dataset of your case class.
  3. Use the provided Config object to fetch any necessary configurations, such as file paths, server addresses, or topic names.
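As a sketch, a hypothetical Kafka-backed reader following these steps might look like this. The `options.bootstrapServers` and `options.topic` config keys, the class name, and the JSON-in-the-value encoding are all assumptions for illustration, not part of the project:

```scala
package io.source

import com.typesafe.config.Config
import domain.RawUserEvent
import io.DataReader
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

import scala.reflect.runtime.universe.TypeTag

// Hypothetical reader that consumes JSON-encoded events from a Kafka topic.
class KafkaJsonReader extends DataReader[RawUserEvent] {
    override def read(config: Config)(implicit spark: SparkSession, tt: TypeTag[RawUserEvent]): Dataset[RawUserEvent] = {
        import spark.implicits._

        // Derive the schema from the case class, as JsonReader does.
        val schema = Encoders.product[RawUserEvent].schema

        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getString("options.bootstrapServers"))
            .option("subscribe", config.getString("options.topic"))
            .load()
            // Kafka delivers the payload as bytes; decode and parse the JSON value.
            .select(from_json(col("value").cast("string"), schema).as("event"))
            .select("event.*")
            .as[RawUserEvent]
    }
}
```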

2. StreamProcessor[IN, OUT]

The StreamProcessor trait is where the core business logic of your streaming application resides. It takes an input Dataset[IN] and transforms it into an output Dataset[OUT].

Trait Definition:

package usecase.contract

import com.typesafe.config.Config
import org.apache.spark.sql.{Dataset, SparkSession}

trait StreamProcessor[IN, OUT] {
    def process(input: Dataset[IN], config: Config)(implicit spark: SparkSession): Dataset[OUT]
}

Implementation Example: UserEngagementProcessor

This processor calculates user engagement metrics (e.g., clicks, purchases) over a sliding window.

package usecase

import com.typesafe.config.Config
import domain.{RawUserEvent, UserEngagementResult}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.{Dataset, SparkSession}
import usecase.contract.StreamProcessor

class UserEngagementProcessor extends StreamProcessor[RawUserEvent, UserEngagementResult] {
    override def process(input: Dataset[RawUserEvent], config: Config)(implicit spark: SparkSession): Dataset[UserEngagementResult] = {
        import spark.implicits._

        val windowDuration = s"${config.getInt("settings.windowDurationSeconds")} seconds"

        input
            .withWatermark("eventTimestamp", "10 seconds")
            .groupBy(window($"eventTimestamp", windowDuration), $"userId")
            .agg(
                // ... aggregation logic ...
            )
            .as[UserEngagementResult]
    }
}

To implement your own StreamProcessor:

  1. Define your input and output case classes (e.g., MyInput, MyOutput).
  2. Create a class that extends StreamProcessor[MyInput, MyOutput].
  3. Implement the process method to apply your business logic using Spark's Dataset API.

3. DataWriter[T]

The DataWriter trait is responsible for writing a Dataset[T] to a sink.

Trait Definition:

package io

import com.typesafe.config.Config
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.DataStreamWriter

trait DataWriter[T] {
    def write(data: Dataset[T], config: Config): DataStreamWriter[T]
}

Implementation Example: DeltaWriter

The DeltaWriter writes the stream to a Delta Lake table.

package io.sink

import com.typesafe.config.Config
import io.DataWriter
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.DataStreamWriter

class DeltaWriter[T] extends DataWriter[T] {
    override def write(data: Dataset[T], config: Config): DataStreamWriter[T] = {
        val outputPath = config.getString("options.path")
        val checkpointLocation = config.getString("checkpointLocation")

        data.writeStream
            .format("delta")
            .outputMode(config.getString("outputMode"))
            .option("checkpointLocation", checkpointLocation)
            .option("path", outputPath)
    }
}

To implement your own DataWriter:

  1. Create a class that extends DataWriter[YourCaseClass].
  2. Implement the write method to configure the DataStreamWriter for your desired sink (e.g., console, Kafka, another database).
  3. Use the Config object to get parameters like output paths, checkpoint locations, and output modes.
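For example, a hypothetical console sink (handy for local debugging) could be sketched as follows; the class name and the reuse of the `outputMode` config key are assumptions:

```scala
package io.sink

import com.typesafe.config.Config
import io.DataWriter
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.DataStreamWriter

// Hypothetical sink that prints each micro-batch to stdout.
class ConsoleWriter[T] extends DataWriter[T] {
    override def write(data: Dataset[T], config: Config): DataStreamWriter[T] = {
        data.writeStream
            .format("console")
            .outputMode(config.getString("outputMode"))
            .option("truncate", "false")  // show full column values
    }
}
```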

Summary Template: How to Add a New Pipeline

Here is a quick template to follow when adding a new end-to-end streaming pipeline.

  1. Define Data Models:

    • Create your input and output case classes in the domain package (e.g., MyInput.scala, MyOutput.scala).
  2. Implement DataReader:

    • Create a new class under io.source that extends DataReader[MyInput].
    • Implement the read method to ingest data from your source.
  3. Implement StreamProcessor:

    • Create a new class under usecase that extends StreamProcessor[MyInput, MyOutput].
    • Implement the process method to define your business logic.
  4. Implement DataWriter:

    • Create a new class under io.sink that extends DataWriter[MyOutput].
    • Implement the write method to send data to your sink.
  5. Update Configuration:

    • In your configuration file (e.g., application_delta2.conf), update the class paths to point to your new implementations and provide any necessary options.
    job {
      name = "MyNewStream"
      reader {
        class = "io.source.MyNewReader"
        // ... reader-specific options
      }
      processor {
        class = "usecase.MyNewProcessor"
        // ... processor-specific options
      }
      writer {
        class = "io.sink.MyNewWriter"
        // ... writer-specific options
      }
    }
