Skip to content

Commit

Permalink
feat: Add spark cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
1ambda committed Nov 15, 2021
1 parent 02ae696 commit bce9a4c
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 2 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
./idea
./idea
.DS_Store
34 changes: 34 additions & 0 deletions _script/docker-spark/apps/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,date_format

def init_spark():
sql = SparkSession.builder\
.appName("trip-app")\
.config("spark.jars", "/opt/spark-apps/postgresql-42.2.22.jar")\
.getOrCreate()
sc = sql.sparkContext
return sql,sc

def main():
url = "jdbc:postgresql://storage-postgres:5432/postgres"
properties = {
"user": "postgres",
"password": "root",
"driver": "org.postgresql.Driver"
}
file = "/opt/spark-data/MTA_2014_08_01.csv"
sql,sc = init_spark()

df = sql.read.load(file,format = "csv", inferSchema="true", sep="\t", header="true") \
.withColumn("report_hour",date_format(col("time_received"),"yyyy-MM-dd HH:00:00")) \
.withColumn("report_date",date_format(col("time_received"),"yyyy-MM-dd"))

# Filter invalid coordinates
df.where("latitude <= 90 AND latitude >= -90 AND longitude <= 180 AND longitude >= -180") \
.where("latitude != 0.000000 OR longitude != 0.000000 ") \
.write \
.jdbc(url=url, table="mta_reports", mode='append', properties=properties) \
.save()

if __name__ == '__main__':
main()
Binary file added _script/docker-spark/apps/postgresql-42.2.22.jar
Binary file not shown.
3 changes: 3 additions & 0 deletions _script/docker-spark/conf/spark-defaults.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
spark.eventLog.dir file:/tmp/spark-events
spark.eventLog.enabled true
spark.history.fs.logDirectory file:/tmp/spark-events
1 change: 1 addition & 0 deletions _script/docker-spark/data/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.csv
10 changes: 9 additions & 1 deletion docker-compose.spark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ services:
ports:
- "8080:8080"
- "7077:7077"
- "4040:4040"
volumes:
- ./_script/docker-spark/apps:/opt/spark-apps
- ./_script/docker-spark/data:/opt/spark-data
- ./_script/docker-spark/conf:/spark/conf
- /tmp/spark-events-local:/tmp/spark-events
environment:
- INIT_DAEMON_STEP=setup_spark

Expand All @@ -22,6 +25,8 @@ services:
volumes:
- ./_script/docker-spark/apps:/opt/spark-apps
- ./_script/docker-spark/data:/opt/spark-data
- ./_script/docker-spark/conf:/spark/conf
- /tmp/spark-events-local:/tmp/spark-events
environment:
- "SPARK_MASTER=spark://spark-master:7077"
spark-worker-2:
Expand All @@ -35,6 +40,8 @@ services:
volumes:
- ./_script/docker-spark/apps:/opt/spark-apps
- ./_script/docker-spark/data:/opt/spark-data
- ./_script/docker-spark/conf:/spark/conf
- /tmp/spark-events-local:/tmp/spark-events
environment:
- "SPARK_MASTER=spark://spark-master:7077"

Expand All @@ -46,9 +53,10 @@ services:
ports:
- "18081:18081"
volumes:
- /tmp/spark-events-local:/tmp/spark-events
- ./_script/docker-spark/apps:/opt/spark-apps
- ./_script/docker-spark/data:/opt/spark-data
- ./_script/docker-spark/conf:/spark/conf
- /tmp/spark-events-local:/tmp/spark-events
storage-postgres:
image: postgres:11.7-alpine
container_name: storage-postgers
Expand Down

0 comments on commit bce9a4c

Please sign in to comment.