Comparative Analysis of Temperature Data Using PySpark and MongoDB
YouTube Link: https://youtu.be/8ntezO-WsoM
This project provides scripts to analyze temperature data using two different tools: PySpark and MongoDB. The scripts answer three questions:
- For all valid information (i.e. sensor quality >= 0.95), what is the average temperature per year?
- For every year in the dataset, find the station ID with the highest / lowest temperature.
- For every year in the dataset, find the station ID with the highest maximum temperature among all stations with sensor quality >= 0.95.
Requirements
- PySpark: `pip install pyspark`
- MongoDB: installed and running locally or remotely.
- Install required libraries: `pip install pymongo`
Setting Up Spark and Data Cleaning
The script initializes a Spark session and loads the TSV dataset into a Spark DataFrame. Columns are renamed for better readability.
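A minimal sketch of this setup, assuming a headerless TSV file at `temperature.tsv` (the path and app name are illustrative assumptions):

```python
from pyspark.sql import SparkSession

# Start a local Spark session (the app name is illustrative)
spark = SparkSession.builder.appName("TemperatureAnalysis").getOrCreate()

# Load the tab-separated dataset (assumed to have no header row) and
# rename the columns for readability
df = (
    spark.read
    .option("sep", "\t")
    .option("inferSchema", "true")
    .csv("temperature.tsv")
    .toDF("Station_id", "Year", "Temperature", "Sensor_quality")
)
```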
Queries
Query 1: Filters data by Sensor_quality >= 0.95 (using df.filter), then groups by Year (.groupBy("Year")) and calculates the average temperature using avg().
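A sketch of how this query could look, reusing the `df` from the setup above (the `avgTemperature` alias is illustrative):

```python
from pyspark.sql.functions import avg, col

# Keep only valid readings, then average the temperature per year
avg_temp_per_year = (
    df.filter(col("Sensor_quality") >= 0.95)
    .groupBy("Year")
    .agg(avg("Temperature").alias("avgTemperature"))
)
avg_temp_per_year.show()
```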
Query 2: Uses window functions to rank temperatures within each year (Window.partitionBy("Year").orderBy(col("Temperature").desc() / .asc())), then identifies the top (highest) or bottom (lowest) ranked station for each year.
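A sketch of the window-function approach for the highest temperature; the lowest case only swaps `.desc()` for `.asc()`:

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, rank

# Rank stations within each year by temperature, highest first
by_year_desc = Window.partitionBy("Year").orderBy(col("Temperature").desc())

highest_per_year = (
    df.withColumn("rank", rank().over(by_year_desc))
    .filter(col("rank") == 1)  # keep only the top-ranked station(s) per year
    .select("Year", "Station_id", "Temperature")
)
highest_per_year.show()
```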
Query 3: Similar to Query 2, but restricted to valid data (Sensor_quality >= 0.95).
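Under the same assumptions, Query 3 only adds the quality filter before ranking:

```python
# Restrict to valid readings, then rank within each year as in Query 2
valid = df.filter(col("Sensor_quality") >= 0.95)
valid_desc = Window.partitionBy("Year").orderBy(col("Temperature").desc())

highest_valid_per_year = (
    valid.withColumn("rank", rank().over(valid_desc))
    .filter(col("rank") == 1)
    .select("Year", "Station_id", "Temperature")
)
```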
Ensure MongoDB is running locally or remotely. Create a database named `temperature_db` and a collection named `sensor_data`.
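A minimal connection sketch with pymongo (the connection URI is an assumption; MongoDB creates the database and collection lazily on first insert):

```python
from pymongo import MongoClient

# Connect to a local MongoDB instance; adjust the URI for a remote server
client = MongoClient("mongodb://localhost:27017/")
db = client["temperature_db"]
collection = db["sensor_data"]
```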
Import Data
```python
import csv

file_path = "temperature.tsv"  # assumed path to the headerless TSV dataset

with open(file_path, "r") as file:
    reader = csv.DictReader(file, delimiter="\t",
                            fieldnames=["Station_id", "Year", "Temperature", "Sensor_quality"])
    data = []
    for row in reader:
        # csv yields strings, so cast each field to its numeric type
        row["Station_id"] = int(row["Station_id"])
        row["Year"] = int(row["Year"])
        row["Temperature"] = float(row["Temperature"])
        row["Sensor_quality"] = float(row["Sensor_quality"])
        data.append(row)

result = collection.insert_many(data)
```
Queries
Query 1: Filters data with `{"$match": {"Sensor_quality": {"$gte": 0.95}}}`, groups by year and calculates the average temperature with `{"$group": {"_id": "$Year", "avgTemperature": {"$avg": "$Temperature"}}}`, and sorts by year in descending order with `{"$sort": {"_id": -1}}`.
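Assembled into a pipeline and run against the `collection` handle from the import step, this could look like:

```python
# Aggregation pipeline built from the stages described above
pipeline = [
    {"$match": {"Sensor_quality": {"$gte": 0.95}}},
    {"$group": {"_id": "$Year", "avgTemperature": {"$avg": "$Temperature"}}},
    {"$sort": {"_id": -1}},
]
for doc in collection.aggregate(pipeline):
    print(doc["_id"], doc["avgTemperature"])
```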
Query 2: For each year, computes the maximum/minimum temperature by applying the $max/$min operators. For each year and its max_temp/min_temp, it then queries the sensor_data collection to find all stations that recorded that maximum/minimum temperature in that year.
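A sketch of this two-step approach for the highest temperature (variable names are illustrative); the lowest case is symmetric with `$min`:

```python
# Step 1: maximum temperature per year
max_per_year = collection.aggregate([
    {"$group": {"_id": "$Year", "max_temp": {"$max": "$Temperature"}}}
])

# Step 2: for each year, find all stations that recorded that maximum
for entry in max_per_year:
    stations = collection.find(
        {"Year": entry["_id"], "Temperature": entry["max_temp"]},
        {"Station_id": 1, "_id": 0},
    )
    print(entry["_id"], entry["max_temp"],
          [s["Station_id"] for s in stations])
```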
Query 3: Similar to Query 2, but restricted to valid data (Sensor_quality >= 0.95).
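The same sketch adapted to Query 3, with the quality filter applied in both steps:

```python
# Step 1: maximum temperature per year over valid readings only
valid_max_per_year = collection.aggregate([
    {"$match": {"Sensor_quality": {"$gte": 0.95}}},
    {"$group": {"_id": "$Year", "max_temp": {"$max": "$Temperature"}}},
])

# Step 2: stations matching that maximum, still restricted to valid readings
for entry in valid_max_per_year:
    stations = collection.find(
        {"Year": entry["_id"],
         "Temperature": entry["max_temp"],
         "Sensor_quality": {"$gte": 0.95}},
        {"Station_id": 1, "_id": 0},
    )
    print(entry["_id"], entry["max_temp"],
          [s["Station_id"] for s in stations])
```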
Running the Scripts
- PySpark: `python app_pyspark.py`
- MongoDB: `python app_mongodb.py`
| Query | PySpark Execution Time | MongoDB Execution Time |
|---|---|---|
| 1 | 0.693s | 0.042s |
| 2 | High: 0.959s; Low: 0.646s; Total: 1.605s | High: 0.141s; Low: 0.137s; Total: 0.278s |
| 3 | 0.677s | 0.131s |
For the dataset and queries in this analysis, MongoDB was the more performant choice in terms of execution speed. However, PySpark's scalability may become advantageous with significantly larger datasets: Spark SQL is well suited to processing structured data distributed across a Spark cluster, where millions of records must be handled by large-scale computation. MongoDB is the better fit where NoSQL functionality is needed, since it offers full NoSQL capabilities compared to Spark SQL.