import org.apache.spark.input.PortableDataStream
import java.util.zip.ZipInputStream
import java.io.BufferedReader
import java.io.InputStreamReader
sc.hadoopConfiguration.set("fs.s3a.access.key", "*****") sc.hadoopConfiguration.set("fs.s3a.secret.key", "*****")
sc.hadoopConfiguration.setInt("fs.s3a.connection.maximum", 10000)
val eventRDD = sc.binaryFiles("s3a://telecom.gdelt/20171[0-9]*.export.CSV.zip")
  .flatMap {
    case (name: String, content: PortableDataStream) =>
      val zis = new ZipInputStream(content.open)
      Stream.continually(zis.getNextEntry)
        .takeWhile(_ != null)
        .flatMap { _ =>
          val br = new BufferedReader(new InputStreamReader(zis))
          Stream.continually(br.readLine()).takeWhile(_ != null)
        }
  }
eventRDD.count()
// Each GDELT line is tab-delimited; toDF() yields a single array<string> column named "value".
val evtest = eventRDD.map(x => x.split("\t")).toDF()
val events = evtest.withColumn("_tmp", $"value").select(
  $"_tmp".getItem(0).as("GlobalEventID"),
  $"_tmp".getItem(1).as("Date"),
  $"_tmp".getItem(27).as("EBC"),
  $"_tmp".getItem(51).as("TypeGeo"),
  $"_tmp".getItem(52).as("Geo"),
  $"_tmp".getItem(58).as("ActionGeo"))
events.count()
val mentionsRDD = sc.binaryFiles("s3a://telecom.gdelt/20171[0-9]*.mentions.CSV.zip")
  .flatMap {
    case (name: String, content: PortableDataStream) =>
      val zis = new ZipInputStream(content.open)
      Stream.continually(zis.getNextEntry)
        .takeWhile(_ != null)
        .flatMap { _ =>
          val br = new BufferedReader(new InputStreamReader(zis))
          Stream.continually(br.readLine()).takeWhile(_ != null)
        }
  }
mentionsRDD.count()
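The unzip-and-read logic is the same for the events and the mentions archives, so it could be factored into a small helper; a minimal sketch reusing the imports already loaded (zipLines is a hypothetical name, not part of any API):

// Hypothetical helper: yield every line of every entry inside one zipped file.
def zipLines(content: PortableDataStream): Iterator[String] = {
  val zis = new ZipInputStream(content.open)
  Stream.continually(zis.getNextEntry)
    .takeWhile(_ != null)
    .flatMap { _ =>
      val br = new BufferedReader(new InputStreamReader(zis))
      Stream.continually(br.readLine()).takeWhile(_ != null)
    }
    .iterator
}
// Both RDDs above could then be built the same way, e.g.:
// sc.binaryFiles("s3a://telecom.gdelt/20171[0-9]*.export.CSV.zip").flatMap { case (_, content) => zipLines(content) }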
// Same tab split for the mentions lines.
val mentest = mentionsRDD.map(x => x.split("\t")).toDF()
val mentions = mentest.withColumn("_tmp", $"value").select(
  $"_tmp".getItem(0).as("GlobalEventID"),
  $"_tmp".getItem(2).as("MentionTimeDate"),
  $"_tmp".getItem(3).as("MentionType"),
  $"_tmp".getItem(4).as("MentionSourceName"))
mentions.count()
val clean_mentions = mentions.filter($"MentionType" === 1)
clean_mentions.show(5)
val domain = spark.read
.format("csv")
.option("header", "true")
.load("hdfs:///user/hadoop/domainCountry")
val clean_domain = domain.withColumnRenamed("Domain", "MentionSourceName")
clean_domain.show()
val df_mentions = clean_mentions.join(clean_domain,"MentionSourceName")
df_mentions.show(5)
val df_final = df_mentions.join(events,"GlobalEventID")
df_final.show()
df_final.write.parquet("s3a://gdelt.project-fpss-backup/prepared_dataset")
sudo service cassandra stop
Open the following ports between the nodes (e.g. in the EC2 security group):
- 61621 (OpsCenter agent)
- 8888 (OpsCenter web interface)
- 7199 (JMX)
- 7001 (SSL inter-node communication)
- 61620 (OpsCenter daemon)
- 9160 (Thrift client API)
- 7000 (inter-node communication)
- 9042 (CQL native transport)
- 22 (SSH)
sudo vi /etc/cassandra/cassandra.yaml
- cluster_name => gdelt
- seeds => private IP of the seed node(s)
- listen_address => private IP of the node
- rpc_address => private IP of the node
- endpoint_snitch => Ec2Snitch
sudo rm -rf /var/lib/cassandra/data/system/*
sudo rm /etc/cassandra/cassandra-topology.properties
sudo service cassandra start
CREATE KEYSPACE gdelt
WITH replication = {
    'class' : 'SimpleStrategy',
    'replication_factor' : 3
};
CREATE TABLE gdelt.mentions_by_location_eventcode (
    location text,
    eventcode int,
    day text,
    country text,
    frequency int,
    PRIMARY KEY ((location, eventcode), day, country)
);
spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.1 --conf spark.cassandra.connection.host=ec2-35-170-17-129.compute-1.amazonaws.com
sc.hadoopConfiguration.set("fs.s3a.access.key", "*****")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "*****")
sc.hadoopConfiguration.setInt("fs.s3a.connection.maximum", 1000000)
val df = spark.read.parquet("s3a://gdelt.project-fpss-backup/prepared_dataset")
// Keep events with EventBaseCode 180 or 183, keep only the first component of Geo,
// truncate the mention timestamp to its first nine characters, and count mentions per group.
val df2 = df.filter($"EBC" === "180" || $"EBC" === "183")
  .withColumn("Geo", split($"Geo", "\\,").getItem(0))
  .withColumn("MentionTimeDate", $"MentionTimeDate".substr(1, 9))
  .groupBy($"Geo", $"EBC", $"MentionTimeDate", $"FIPSCountryCode")
  .agg(count(lit(1)).alias("Freq"))
val newnames = Seq("location", "eventcode", "day", "country", "frequency")
val dfRenamed = df2.toDF(newnames: _*)
dfRenamed.show()
| location | eventcode | day | country | frequency |
|---|---|---|---|---|
| Boston | 180 | 201711031 | US | 7 |
| Mumbai | 180 | 201712050 | IN | 6 |
| Columbus | 180 | 201710021 | US | 28 |
| Las Vegas | 183 | 201710080 | US | 7 |
| Pretoria | 180 | 201710090 | US | 2 |
| Toronto | 180 | 201711141 | CA | 3 |
| Lakki Marwat | 180 | 201711271 | PK | 3 |
| Abilene | 180 | 201711292 | US | 5 |
| Allegan County | 180 | 201712041 | US | 1 |
| California | 180 | 201712142 | US | 109 |
| Montgomery County | 180 | 201710130 | US | 1 |
| Florida | 180 | 201711011 | US | 10 |
| Middlebury College | 180 | 201711100 | US | 4 |
| Kansas | 180 | 201711110 | US | 12 |
| London | 180 | 201711280 | US | 8 |
| Cuba | 180 | 201710162 | US | 1 |
| Cairo | 180 | 201711250 | MY | 6 |
| Alabama | 180 | 201712061 | US | 76 |
| New Mexico | 180 | 201712121 | RS | 2 |
| Dagestan | 180 | 201710031 | FR | 3 |
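The table created earlier declares eventcode and frequency as int, while these DataFrame columns come from text splits and a count; if the connector rejects the types, they can be cast explicitly before the write below (a hedged sketch, dfTyped being a hypothetical name):

// Hedged sketch: align the DataFrame types with the int columns declared in Cassandra.
val dfTyped = dfRenamed
  .withColumn("eventcode", $"eventcode".cast("int"))
  .withColumn("frequency", $"frequency".cast("int"))
// The write below would then use dfTyped in place of dfRenamed.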
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
// "overwrite" truncates the table first; the connector asks for explicit confirmation via confirm.truncate.
dfRenamed.write.format("org.apache.spark.sql.cassandra").mode("overwrite")
  .options(Map("table" -> "mentions_by_location_eventcode", "keyspace" -> "gdelt", "confirm.truncate" -> "true"))
  .save()
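The write can also be checked from the same shell by reading the table back through the connector; a minimal sketch using the same table and keyspace options:

// Read the table back through the connector and spot-check a few rows.
val readBack = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mentions_by_location_eventcode", "keyspace" -> "gdelt"))
  .load()
readBack.show(5)
readBack.count()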
- Connect to one of the nodes (ssh)
cqlsh <private IP of the node>
USE gdelt;
SELECT * FROM mentions_by_location_eventcode LIMIT 10;