incrementalMain.py
# IMPORTS
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
import json

# FILE IMPORTS
from incrementalModels import incLR, incNB, incSVM, incKM
from PreProcessing import preproc

# A local SparkSession with two worker cores and a 1-second micro-batch interval.
spark = SparkSession.builder.master('local[2]').appName('Sentiment').getOrCreate()
ssc = StreamingContext(spark.sparkContext, 1)
spark.sparkContext.setLogLevel('ERROR')


def streamer(rdd):
    # Pull the micro-batch to the driver; each batch arrives as one JSON payload.
    rddValues = rdd.collect()
    if rddValues:
        # The payload is a JSON object whose values are (sentiment, tweet) rows.
        SCHEMA = ['Sentiment', 'Tweet']
        df = spark.createDataFrame(list(json.loads(rddValues[0]).values()), SCHEMA)
        df = df.na.drop()
        df = preproc(df)

        # Update each incremental model on this batch and report its metric.
        accuracy_logRegression = incLR.logRegression(df)
        print('Incremental Logistic Regression Accuracy =', accuracy_logRegression)
        accuracy_NB = incNB.nBayes(df)
        print('Incremental Naive Bayes Accuracy =', accuracy_NB)
        accuracy_SVM = incSVM.linSVC(df)
        print('Incremental SVM Accuracy =', accuracy_SVM)
        error_KMM = incKM.kmm(df)
        print('Incremental K-Means Clustering Squared Error =', error_KMM)


# Read newline-delimited batches from the socket and train on each micro-batch.
dstream = ssc.socketTextStream("localhost", 6100)
dstream1 = dstream.flatMap(lambda line: line.split("\n"))
dstream1.foreachRDD(streamer)
ssc.start()
ssc.awaitTermination()
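
The incrementalModels module imported above lives in a sibling file that is not shown here. As a rough sketch of the interface this script assumes, incLR.logRegression takes the preprocessed batch DataFrame, updates a persistent model incrementally, and returns the accuracy on that batch. A hypothetical minimal version using scikit-learn's SGDClassifier with partial_fit (a common way to get incremental logistic regression) might look like the following; the vectorizer, the label set, and the column names are all assumptions, not the repository's actual code:

# Hypothetical sketch of incrementalModels/incLR.py; the real module is not shown here.
from sklearn.linear_model import SGDClassifier
from sklearn.feature_extraction.text import HashingVectorizer

# Module-level state so the model persists across micro-batches.
_vectorizer = HashingVectorizer(n_features=2 ** 18, alternate_sign=False)
_model = SGDClassifier(loss='log_loss')
_classes = [0, 4]  # assumed sentiment labels (Sentiment140 convention)

def logRegression(df):
    # Assumes preproc() leaves 'Sentiment' and 'Tweet' columns in place.
    rows = df.collect()
    X = _vectorizer.transform([row['Tweet'] for row in rows])
    y = [int(row['Sentiment']) for row in rows]
    _model.partial_fit(X, y, classes=_classes)  # one incremental update per batch
    return _model.score(X, y)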
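
Note that socketTextStream connects to localhost:6100 as a client, so a separate sender process must already be listening on that port and serving batches. Below is a minimal test-sender sketch, assuming each batch is one newline-terminated JSON object whose values are [sentiment, tweet] pairs, which is the shape streamer() expects; the sample rows are made-up placeholders:

# Hypothetical batch sender for testing; serves newline-delimited JSON on port 6100.
import json
import socket
import time

HOST, PORT = 'localhost', 6100
BATCH = {
    '0': ['4', 'spark streaming makes incremental training easy'],
    '1': ['0', 'this pipeline keeps crashing on my laptop'],
}

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(1)
conn, _ = server.accept()  # socketTextStream connects here as a client
while True:
    # One JSON object per line; streamer() parses the first element of each micro-batch.
    conn.sendall((json.dumps(BATCH) + '\n').encode('utf-8'))
    time.sleep(1)

Start the sender first, then launch incrementalMain.py; each second the stream delivers one JSON batch and the four models print their updated metrics.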