
Commit

streaming data pipeline fully functional
Nitish Bhardwaj authored and committed Sep 26, 2024
1 parent f6b2560 commit 22a4063
Showing 25 changed files with 590 additions and 303 deletions.
4 changes: 3 additions & 1 deletion Dockerfile.spark
@@ -5,4 +5,6 @@ USER root
RUN apt-get clean && \
apt-get update && \
apt-get install -y curl && \
apt-get install -y python3
apt-get install -y python3 && \
pip3 install confluent-kafka && \
pip3 install requests
34 changes: 17 additions & 17 deletions docker-compose.yml
@@ -31,23 +31,23 @@ services:
SPARK_WORKER_MEMORY: 2g
SPARK_MASTER_URL: spark://spark-master:7077

spark-worker-2: &worker
container_name: spark-worker-2
hostname: spark-worker-2
build:
context: .
dockerfile: Dockerfile.spark
command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
volumes:
- ./spark-jobs:/opt/bitnami/spark/spark-jobs:rw
- ./resources/schemas:/opt/bitnami/spark/resources/schemas:rw
depends_on:
- spark-master
environment:
SPARK_MODE: worker
SPARK_WORKER_CORES: 2
SPARK_WORKER_MEMORY: 2g
SPARK_MASTER_URL: spark://spark-master:7077
# spark-worker-2: &worker
# container_name: spark-worker-2
# hostname: spark-worker-2
# build:
# context: .
# dockerfile: Dockerfile.spark
# command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
# volumes:
# - ./spark-jobs:/opt/bitnami/spark/spark-jobs:rw
# - ./resources/schemas:/opt/bitnami/spark/resources/schemas:rw
# depends_on:
# - spark-master
# environment:
# SPARK_MODE: worker
# SPARK_WORKER_CORES: 1
# SPARK_WORKER_MEMORY: 1g
# SPARK_MASTER_URL: spark://spark-master:7077

broker:
image: confluentinc/cp-kafka:7.2.0
Binary file added kafka/__pycache__/config.cpython-310.pyc
Binary file added kafka/__pycache__/ride_record.cpython-310.pyc
Binary file added kafka/__pycache__/ride_record_key.cpython-310.pyc
15 changes: 15 additions & 0 deletions kafka/config.py
@@ -0,0 +1,15 @@
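# Shared paths, endpoints, and topic names for the Kafka producer and consumer scripts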
INPUT_DATA_PATH = '../resources/data/rides.csv'

SCHEMA_REGISTRY_URL = 'http://localhost:8081'
BOOTSTRAP_SERVERS = 'localhost:9092'

# KAFKA PRODUCER
KAFKA_TOPIC = 'rides_avro'
RIDE_KEY_SCHEMA_PATH = '../resources/schemas/taxi_ride_key.avsc'
RIDE_VALUE_SCHEMA_PATH = '../resources/schemas/taxi_ride_value.avsc'

# KAFKA CONSUMER
CONSUME_TOPIC_TRIP_STATS = 'trip_stats'
CONSUME_TOPIC_TRIP_PICK_UP_COUNT_WINDOW = 'trip_pick_up_count_window'
CONSUME_TOPIC_TRIP_DROP_OFF_COUNT_WINDOW = 'trip_drop_off_count_window'
CONSUME_TOPIC_TRIP_STATS_WINDOW = 'trip_stats_window'
65 changes: 65 additions & 0 deletions kafka/consumer.py
@@ -0,0 +1,65 @@
import os
from typing import Dict, List

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

from ride_record_key import dict_to_ride_record_key
from ride_record import dict_to_ride_record
from config import BOOTSTRAP_SERVERS, SCHEMA_REGISTRY_URL, RIDE_KEY_SCHEMA_PATH, RIDE_VALUE_SCHEMA_PATH, KAFKA_TOPIC


class RideAvroConsumer:
def __init__(self, props: Dict):
key_schema_str = self.load_schema(props['schema.key'])
value_schema_str = self.load_schema(props['schema.value'])
schema_registry_props = {'url': props['schema_registry.url']}
schema_registry_client = SchemaRegistryClient(schema_registry_props)
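        # Deserializers that turn Avro-encoded message bytes back into RideRecordKey / RideRecord objects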
self.avro_key_deserializer = AvroDeserializer(schema_registry_client=schema_registry_client,
schema_str=key_schema_str,
from_dict=dict_to_ride_record_key)
self.avro_value_deserializer = AvroDeserializer(schema_registry_client=schema_registry_client,
schema_str=value_schema_str,
from_dict=dict_to_ride_record)

consumer_props = {'bootstrap.servers': props['bootstrap.servers'],
'group.id': 'nblabs.taxirides.avro.consumer.v1',
'auto.offset.reset': 'earliest'}
self.consumer = Consumer(consumer_props)

@staticmethod
def load_schema(schema_path: str):
path = os.path.realpath(os.path.dirname(__file__))
with open(f"{path}/{schema_path}") as f:
schema_str = f.read()
return schema_str

def consume_from_kafka(self, topics: List[str]):
self.consumer.subscribe(topics=topics)
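        # Poll in a loop until interrupted; poll() returns None when no message arrives within the 1-second timeout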
while True:
try:
msg = self.consumer.poll(1.0)
if msg is None:
continue
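                # Deserialize the Avro-encoded key and value via the Schema Registry deserializers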
key = self.avro_key_deserializer(msg.key(), SerializationContext(msg.topic(), MessageField.KEY))
record = self.avro_value_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))

if record is not None:
print("{}, {}".format(key, record))
except KeyboardInterrupt:
break

self.consumer.close()


if __name__ == "__main__":
config = {
'bootstrap.servers': BOOTSTRAP_SERVERS,
'schema_registry.url': SCHEMA_REGISTRY_URL,
'schema.key': RIDE_KEY_SCHEMA_PATH,
'schema.value': RIDE_VALUE_SCHEMA_PATH,
}
avro_consumer = RideAvroConsumer(props=config)
avro_consumer.consume_from_kafka(topics=[KAFKA_TOPIC])
87 changes: 87 additions & 0 deletions kafka/producer.py
@@ -0,0 +1,87 @@
import os
import csv
from typing import Dict, Iterable, Tuple

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

from ride_record_key import RideRecordKey, ride_record_key_to_dict
from ride_record import RideRecord, ride_record_to_dict
from config import RIDE_KEY_SCHEMA_PATH, RIDE_VALUE_SCHEMA_PATH, \
SCHEMA_REGISTRY_URL, BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC

def delivery_report(err, msg):
if err is not None:
print("Delivery failed for record {}: {}".format(msg.key(), err))
return
print('Record {} successfully produced to {} [{}] at offset {}'.format(
msg.key(), msg.topic(), msg.partition(), msg.offset()))

class RideAvroProducer:
def __init__(self, props: Dict):
key_schema_str = self.load_schema(props['schema.key'])
value_schema_str = self.load_schema(props['schema.value'])
schema_registry_props = {'url': props['schema_registry.url']}
schema_registry_client = SchemaRegistryClient(schema_registry_props)
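        # AvroSerializer encodes records to Avro bytes and, by default, registers the schema with the registry on first use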
self.key_serializer = AvroSerializer(schema_registry_client, key_schema_str, ride_record_key_to_dict)
self.value_serializer = AvroSerializer(schema_registry_client, value_schema_str, ride_record_to_dict)

producer_props = {'bootstrap.servers': props['bootstrap.servers']}
self.producer = Producer(producer_props)

@staticmethod
def load_schema(schema_path: str):
path = os.path.realpath(os.path.dirname(__file__))
with open(f"{path}/{schema_path}") as f:
schema_str = f.read()
return schema_str

@staticmethod
def read_records(resource_path: str):
ride_records, ride_keys = [], []
with open(resource_path, 'r') as f:
reader = csv.reader(f)
            next(reader)  # skip the CSV header row
for row in reader:
ride_records.append(RideRecord(arr=row))
ride_keys.append(RideRecordKey(vendor_id=int(row[0])))
return zip(ride_keys, ride_records)

    def publish(self, topic: str, records: Iterable[Tuple[RideRecordKey, RideRecord]]):
for key_value in records:
key, value = key_value
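            # Serialize the key and value with the registry-aware Avro serializers, then produce asynchronously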
try:
print(value)
self.producer.produce(topic=topic,
key=self.key_serializer(key,
SerializationContext(topic=topic, field=MessageField.KEY)),
value=self.value_serializer(value,
SerializationContext(topic=topic, field=MessageField.VALUE)),
on_delivery=delivery_report)
except KeyboardInterrupt:
break
except Exception as e:
print(f"Exception while producing record -> {value}: {e}")

self.producer.flush()

if __name__ == "__main__":
config = {
'bootstrap.servers': BOOTSTRAP_SERVERS,
'schema_registry.url': SCHEMA_REGISTRY_URL,
'schema.key': RIDE_KEY_SCHEMA_PATH,
'schema.value': RIDE_VALUE_SCHEMA_PATH
}
producer = RideAvroProducer(props=config)
ride_records = producer.read_records(resource_path=INPUT_DATA_PATH)
producer.publish(topic=KAFKA_TOPIC, records=ride_records)
59 changes: 59 additions & 0 deletions kafka/ride_record.py
@@ -0,0 +1,59 @@
from typing import List, Dict
from datetime import datetime

class RideRecord:
def __init__(self, arr: List[str]):
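        # Parse one CSV row; pickup/dropoff datetimes are converted to epoch milliseconds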
self.vendor_id = int(arr[0])
self.tpep_pickup_datetime = int(datetime.strptime(arr[1], "%Y-%m-%d %H:%M:%S").timestamp() * 1000)
self.tpep_dropoff_datetime = int(datetime.strptime(arr[2], "%Y-%m-%d %H:%M:%S").timestamp() * 1000)
self.passenger_count = int(arr[3])
self.trip_distance = float(arr[4])
self.rate_code_id = int(arr[5])
self.store_and_fwd_flag = arr[6]
self.pu_location_id = int(arr[7])
self.do_location_id = int(arr[8])
self.payment_type = int(arr[9])
self.fare_amount = float(arr[10])
self.extra = float(arr[11])
self.mta_tax = float(arr[12])
self.tip_amount = float(arr[13])
self.tolls_amount = float(arr[14])
self.improvement_surcharge = float(arr[15])
self.total_amount = float(arr[16])
self.congestion_surcharge = float(arr[17])

@classmethod
def from_dict(cls, d: Dict):
return cls(arr=[
d['vendor_id'],
d['tpep_pickup_datetime'][0],
d['tpep_dropoff_datetime'][0],
d['passenger_count'],
d['trip_distance'],
d['rate_code_id'],
d['store_and_fwd_flag'],
d['pu_location_id'],
d['do_location_id'],
d['payment_type'],
d['fare_amount'],
d['extra'],
d['mta_tax'],
d['tip_amount'],
d['tolls_amount'],
d['improvement_surcharge'],
d['total_amount'],
d['congestion_surcharge'],
])

def __repr__(self):
return f'{self.__class__.__name__}: {self.__dict__}'


def dict_to_ride_record(obj, ctx):
if obj is None:
return None

return RideRecord.from_dict(obj)

def ride_record_to_dict(ride_record: RideRecord, ctx):
return ride_record.__dict__
22 changes: 22 additions & 0 deletions kafka/ride_record_key.py
@@ -0,0 +1,22 @@
from typing import Dict

class RideRecordKey:
def __init__(self, vendor_id):
self.vendor_id = vendor_id

@classmethod
def from_dict(cls, d: Dict):
        return cls(vendor_id=d['vendor_id'])

def __repr__(self):
return f'{self.__class__.__name__}: {self.__dict__}'

def dict_to_ride_record_key(obj, ctx):
if obj is None:
return None

return RideRecordKey.from_dict(obj)

def ride_record_key_to_dict(ride_record_key: RideRecordKey, ctx):
return ride_record_key.__dict__

59 changes: 59 additions & 0 deletions kafka/spark_consumer.py
@@ -0,0 +1,59 @@
import os
from typing import Dict, List

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

from config import BOOTSTRAP_SERVERS, SCHEMA_REGISTRY_URL, CONSUME_TOPIC_TRIP_STATS_WINDOW


class AvroConsumerClass:
def __init__(self, props: Dict):
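        # Fetch the latest value schema registered under the '<topic>-value' subject in the Schema Registry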
schema_str = self.get_schema_from_schema_registry(props['schema_registry.url'], f"{CONSUME_TOPIC_TRIP_STATS_WINDOW}-value")
schema_registry_client = SchemaRegistryClient({'url': props['schema_registry.url']})
self.avro_value_deserializer = AvroDeserializer(schema_registry_client=schema_registry_client,
schema_str=schema_str)

consumer_props = {
'bootstrap.servers': props['bootstrap.servers'],
'group.id': props['group.id'],
'auto.offset.reset': props['auto.offset.reset']
}

self.consumer = Consumer(consumer_props)

@staticmethod
def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
sr = SchemaRegistryClient({'url': schema_registry_url})
latest_version = sr.get_latest_version(schema_registry_subject)
return latest_version.schema.schema_str

def consumer_from_kafka(self, topics: List[str]):
self.consumer.subscribe(topics=topics)
while True:
try:
msg = self.consumer.poll(1.0)
if msg is None:
continue
record = self.avro_value_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
if record is not None:
print("{}".format(record))
except KeyboardInterrupt:
break

self.consumer.close()


if __name__ == "__main__" :
config = {
'bootstrap.servers': BOOTSTRAP_SERVERS,
'auto.offset.reset': 'earliest',
'group.id': 'nblabs.taxirides.avro.consumer.v1',
'schema_registry.url': SCHEMA_REGISTRY_URL,
}

avro_consumer = AvroConsumerClass(props=config)
avro_consumer.consumer_from_kafka(topics=[CONSUME_TOPIC_TRIP_STATS_WINDOW])
5 changes: 5 additions & 0 deletions requirements.txt
@@ -0,0 +1,5 @@
confluent_kafka
kafka
pyspark
requests
fastavro
