diff --git a/client/app/(private)/home/routes/(from)/(to)/page.tsx b/client/app/(private)/home/routes/(from)/(to)/page.tsx index c711f72..0ef8b55 100644 --- a/client/app/(private)/home/routes/(from)/(to)/page.tsx +++ b/client/app/(private)/home/routes/(from)/(to)/page.tsx @@ -75,6 +75,8 @@ const RouteContent = () => { const [selectedRouteIndex, setSelectedRouteIndex] = useState(0); const [routeName, setRouteName] = useState(""); const [showSaveModal, setShowSaveModal] = useState(false); + const [searchId, setSearchId] = useState(null); + const fetchGenerationRef = useRef(0); const saveInputRef = useRef(null); // Parse query parameters @@ -135,7 +137,11 @@ const RouteContent = () => { }; // Fetch scores from backend scoring API - const fetchScores = async (routeData: RouteData[], mode: TravelMode) => { + const fetchScores = async ( + routeData: RouteData[], + mode: TravelMode, + generation: number + ) => { setScoresLoading(true); try { const payload = { @@ -170,6 +176,16 @@ const RouteContent = () => { } const data = await response.json(); + + // If a newer fetchRoutes was triggered while we were waiting, + // discard this stale response entirely. + if (generation !== fetchGenerationRef.current) return; + + // Capture the searchId for later use in saving + if (data.searchId) { + setSearchId(data.searchId); + } + const scoredRoutes = data.data?.routes; if (scoredRoutes && Array.isArray(scoredRoutes)) { const scores = scoredRoutes.map( @@ -194,7 +210,7 @@ const RouteContent = () => { let bestDuration = Infinity; overallScores.forEach((score: number, i: number) => { const dur = - routes[i]?.trafficDuration ?? routes[i]?.duration ?? Infinity; + routeData[i]?.trafficDuration ?? routeData[i]?.duration ?? 
Infinity; if ( score > overallScores[bestIndex] || (score === overallScores[bestIndex] && dur < bestDuration) @@ -244,6 +260,9 @@ const RouteContent = () => { return; } + // Increment generation so any in-flight fetchScores response is ignored + const generation = ++fetchGenerationRef.current; + setSearchId(null); setIsLoading(true); setError(null); @@ -339,7 +358,7 @@ const RouteContent = () => { }; }); setRoutes(fetchedRoutes); - fetchScores(fetchedRoutes, mode); + fetchScores(fetchedRoutes, mode, generation); } else { setError("No routes found. Please try different locations."); setRoutes([]); @@ -393,6 +412,7 @@ const RouteContent = () => { try { const payload = { name: nameToSave, + searchId, // Include the searchId from the computation from: { address: sourceAddress, location: { @@ -411,8 +431,7 @@ const RouteContent = () => { distance: route.distance / 1000, duration: route.duration / 60, routeGeometry: route.geometry, - lastComputedScore: - route.overallScore || Math.floor(Math.random() * 100), + lastComputedScore: route.overallScore ?? 
null, lastComputedAt: new Date(), travelMode: selectedMode, })), diff --git a/data-processing/dataProcessingServer/api/__pycache__/urls.cpython-313.pyc b/data-processing/dataProcessingServer/api/__pycache__/urls.cpython-313.pyc new file mode 100644 index 0000000..d81179f Binary files /dev/null and b/data-processing/dataProcessingServer/api/__pycache__/urls.cpython-313.pyc differ diff --git a/data-processing/dataProcessingServer/api/__pycache__/views.cpython-313.pyc b/data-processing/dataProcessingServer/api/__pycache__/views.cpython-313.pyc new file mode 100644 index 0000000..f9d26be Binary files /dev/null and b/data-processing/dataProcessingServer/api/__pycache__/views.cpython-313.pyc differ diff --git a/data-processing/server.py b/data-processing/dataProcessingServer/api/management/__init__.py similarity index 100% rename from data-processing/server.py rename to data-processing/dataProcessingServer/api/management/__init__.py diff --git a/data-processing/dataProcessingServer/api/management/commands/__init__.py b/data-processing/dataProcessingServer/api/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/management/commands/run_batch_scoring.py b/data-processing/dataProcessingServer/api/management/commands/run_batch_scoring.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/pathway/__init__.py b/data-processing/dataProcessingServer/api/pathway/__init__.py new file mode 100644 index 0000000..6e60fae --- /dev/null +++ b/data-processing/dataProcessingServer/api/pathway/__init__.py @@ -0,0 +1,12 @@ +""" +Pathway pipeline module for BreathClean. 
+""" +from .pipeline import run_batch_pipeline, run_simple_batch +from .transformers import compute_route_score, compute_batch_scores + +__all__ = [ + "run_batch_pipeline", + "run_simple_batch", + "compute_route_score", + "compute_batch_scores" +] diff --git a/data-processing/dataProcessingServer/api/pathway/__pycache__/__init__.cpython-313.pyc b/data-processing/dataProcessingServer/api/pathway/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..c4dfc38 Binary files /dev/null and b/data-processing/dataProcessingServer/api/pathway/__pycache__/__init__.cpython-313.pyc differ diff --git a/data-processing/dataProcessingServer/api/pathway/__pycache__/pipeline.cpython-313.pyc b/data-processing/dataProcessingServer/api/pathway/__pycache__/pipeline.cpython-313.pyc new file mode 100644 index 0000000..c08482e Binary files /dev/null and b/data-processing/dataProcessingServer/api/pathway/__pycache__/pipeline.cpython-313.pyc differ diff --git a/data-processing/dataProcessingServer/api/pathway/__pycache__/transformers.cpython-313.pyc b/data-processing/dataProcessingServer/api/pathway/__pycache__/transformers.cpython-313.pyc new file mode 100644 index 0000000..51de0e5 Binary files /dev/null and b/data-processing/dataProcessingServer/api/pathway/__pycache__/transformers.cpython-313.pyc differ diff --git a/data-processing/dataProcessingServer/api/pathway/connectors.py b/data-processing/dataProcessingServer/api/pathway/connectors.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/pathway/pipeline.py b/data-processing/dataProcessingServer/api/pathway/pipeline.py new file mode 100644 index 0000000..71a90b5 --- /dev/null +++ b/data-processing/dataProcessingServer/api/pathway/pipeline.py @@ -0,0 +1,330 @@ +""" +Pathway pipeline for BreathClean batch score computation. + +This pipeline can run in two modes: +1. Streaming mode: Continuously process incoming data +2. 
Batch mode: Process a batch of routes and return results + +For the hackathon, we use batch mode triggered by HTTP endpoint. + +NOTE: Pathway requires Linux. On Windows, use run_simple_batch() which +uses direct Python computation. For production, run in Docker/WSL. +""" +from typing import List, Dict, Any, Optional +import json +import logging + +logger = logging.getLogger(__name__) + +# Try to import Pathway - gracefully handle if not available (Windows) +PATHWAY_AVAILABLE = False +pw = None + +try: + import pathway as _pw + # Verify it's the real Pathway package + if hasattr(_pw, 'Schema'): + pw = _pw + PATHWAY_AVAILABLE = True +except (ImportError, AttributeError): + pass + +from .transformers import compute_route_score, compute_batch_scores + + +# Define schema only if Pathway is available +RouteInputSchema = None +if PATHWAY_AVAILABLE and pw is not None: + class RouteInputSchema(pw.Schema): + """Schema for incoming route data.""" + route_id: str + route_index: int + distance: float + duration: float + travel_mode: str + weather_points: str # JSON string of weather data + aqi_points: str # JSON string of AQI data + traffic_value: float + last_computed_score: Optional[float] + + +def process_route_row( + route_id: str, + route_index: int, + distance: float, + duration: float, + travel_mode: str, + weather_points: str, + aqi_points: str, + traffic_value: float, + last_computed_score: Optional[float] +) -> str: + """ + Process a single route row and compute its score. + Returns JSON string of computed score. 
+ """ + try: + weather_data = json.loads(weather_points) if weather_points else [] + except json.JSONDecodeError as e: + logger.warning("Failed to parse weather_points for route: %s", e) + weather_data = [] + + try: + aqi_data = json.loads(aqi_points) if aqi_points else [] + except json.JSONDecodeError as e: + logger.warning("Failed to parse aqi_points for route: %s", e) + aqi_data = [] + + route_data = { + "routeId": route_id, + "routeIndex": route_index, + "distance": distance, + "duration": duration, + "travelMode": travel_mode, + "weatherPoints": weather_data, + "aqiPoints": aqi_data, + "trafficValue": traffic_value, + "lastComputedScore": last_computed_score + } + + result = compute_route_score(route_data) + return json.dumps(result) + + +def run_batch_pipeline(routes_data: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Run Pathway pipeline in batch mode for a list of routes. + + This is the main entry point called by the Django view. + Uses Pathway's batch processing capabilities for efficient computation. + + Falls back to simple batch processing if Pathway is not available (Windows). 
+ + Args: + routes_data: List of route dictionaries with weather/aqi/traffic data + + Returns: + Dictionary with computed scores and summary + """ + # If Pathway is not available, fall back to simple batch processing + if not PATHWAY_AVAILABLE or pw is None: + result = run_simple_batch(routes_data) + result["engine"] = "python-fallback" + result["message"] = "Scores computed (Pathway not available, using Python fallback)" + return result + + # For batch processing, we use Pathway's static mode + # This processes all data at once and returns results + + if not routes_data: + return { + "success": False, + "message": "No routes provided", + "routes": [], + "summary": None + } + + # Convert input to Pathway table format + table_data = [] + for route in routes_data: + table_data.append({ + "route_id": str(route.get("routeId", "")), + "route_index": route.get("routeIndex", 0), + "distance": float(route.get("distance", 0)), + "duration": float(route.get("duration", 0)), + "travel_mode": route.get("travelMode", "driving"), + "weather_points": json.dumps(route.get("weatherPoints", [])), + "aqi_points": json.dumps(route.get("aqiPoints", [])), + "traffic_value": float(route.get("trafficValue", 0)), + "last_computed_score": route.get("lastComputedScore") + }) + + # Create Pathway table from data + input_table = pw.debug.table_from_markdown( + _markdown_from_data(table_data), + schema=RouteInputSchema + ) + + # Apply transformation using UDF + @pw.udf + def compute_score_udf( + route_id: str, + route_index: int, + distance: float, + duration: float, + travel_mode: str, + weather_points: str, + aqi_points: str, + traffic_value: float, + last_computed_score: Optional[float] + ) -> str: + return process_route_row( + route_id, route_index, distance, duration, travel_mode, + weather_points, aqi_points, traffic_value, last_computed_score + ) + + # Transform table + result_table = input_table.select( + score_json=compute_score_udf( + input_table.route_id, + input_table.route_index, + 
input_table.distance, + input_table.duration, + input_table.travel_mode, + input_table.weather_points, + input_table.aqi_points, + input_table.traffic_value, + input_table.last_computed_score + ) + ) + + # Run pipeline and collect results + results = pw.debug.table_to_pandas(result_table) + + # Parse results back to dictionaries + route_scores = [] + for _, row in results.iterrows(): + try: + score_data = json.loads(row["score_json"]) + route_scores.append(score_data) + except (json.JSONDecodeError, KeyError): + continue + + if not route_scores: + return { + "success": False, + "message": "Failed to compute scores", + "routes": [], + "summary": None + } + + # Sort by route index + route_scores.sort(key=lambda x: x.get("routeIndex", 0)) + + # Find best route + best_route = max(route_scores, key=lambda r: r.get("overallScore", 0)) + + # Calculate summary statistics + overall_scores = [r.get("overallScore", 0) for r in route_scores] + avg_score = round(sum(overall_scores) / len(overall_scores), 1) + + from datetime import datetime, timezone + + return { + "success": True, + "message": "Batch scores computed via Pathway pipeline", + "routes": route_scores, + "bestRoute": { + "index": best_route.get("routeIndex"), + "routeId": best_route.get("routeId"), + "score": best_route.get("overallScore") + }, + "summary": { + "totalRoutes": len(route_scores), + "averageScore": avg_score, + "scoreRange": { + "min": min(overall_scores), + "max": max(overall_scores) + } + }, + "computedAt": datetime.now(timezone.utc).isoformat(), + "engine": "pathway" + } + + +def run_simple_batch(routes_data: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Simplified batch processing without full Pathway table operations. + Falls back to direct transformer calls for reliability. + + Use this if Pathway table operations cause issues. 
+ """ + return compute_batch_scores(routes_data) + + +def _markdown_from_data(data: List[Dict]) -> str: + """ + Convert list of dicts to markdown table format for Pathway. + """ + if not data: + return "" + + headers = list(data[0].keys()) + lines = [" | ".join(headers)] + lines.append(" | ".join(["---"] * len(headers))) + + for row in data: + values = [] + for h in headers: + val = row.get(h) + if val is None: + values.append("") + else: + # Escape pipe characters in values + values.append(str(val).replace("|", "\\|")) + lines.append(" | ".join(values)) + + return "\n".join(lines) + + +# Streaming mode (for future use) +def create_streaming_pipeline(input_connector, output_connector): + """ + Create a streaming Pathway pipeline. + + This is for future use when we want real-time streaming updates + instead of batch processing. + + Raises: + RuntimeError: If Pathway is not available (e.g., on Windows). + """ + if not PATHWAY_AVAILABLE or pw is None: + raise RuntimeError( + "Pathway is not available. Streaming pipeline requires Pathway " + "(Linux only). Use run_simple_batch() as a fallback or run in " + "Docker/WSL." 
+ ) + + # Read from input connector + input_table = pw.io.jsonlines.read( + input_connector, + schema=RouteInputSchema, + mode="streaming" + ) + + @pw.udf + def compute_score_streaming( + route_id: str, + route_index: int, + distance: float, + duration: float, + travel_mode: str, + weather_points: str, + aqi_points: str, + traffic_value: float, + last_computed_score: Optional[float] + ) -> str: + return process_route_row( + route_id, route_index, distance, duration, travel_mode, + weather_points, aqi_points, traffic_value, last_computed_score + ) + + # Transform + result_table = input_table.select( + score_json=compute_score_streaming( + input_table.route_id, + input_table.route_index, + input_table.distance, + input_table.duration, + input_table.travel_mode, + input_table.weather_points, + input_table.aqi_points, + input_table.traffic_value, + input_table.last_computed_score + ) + ) + + # Write to output + pw.io.jsonlines.write(result_table, output_connector) + + return result_table diff --git a/data-processing/dataProcessingServer/api/pathway/transformers.py b/data-processing/dataProcessingServer/api/pathway/transformers.py new file mode 100644 index 0000000..53b9169 --- /dev/null +++ b/data-processing/dataProcessingServer/api/pathway/transformers.py @@ -0,0 +1,383 @@ +""" +Scoring transformers for BreathClean Pathway pipeline. +Ported from server/src/controllers/score.controller.ts +""" +import math +from typing import Dict, List, Optional, Any + + +def calculate_temperature_score(temp: float) -> float: + """ + Calculate weather score based on temperature. + Optimal: 21°C + Stricter curve: Deviating by 5°C drops score to ~70 + """ + optimal = 21 + diff = abs(temp - optimal) + + # Perfect range: +/- 1°C + if diff <= 1: + return 100.0 + + # Stricter penalty: loss of 6 points per degree deviation + return max(0.0, 100.0 - diff * 6) + + +def calculate_humidity_score(humidity: float) -> float: + """ + Calculate weather score based on humidity. 
+ Optimal: 50% + Perfect range: 45-55% + """ + if 45 <= humidity <= 55: + return 100.0 + + ideal = 50 + diff = abs(humidity - ideal) + + # Penalty: loss of 2 points per percent deviation outside optimal + return max(0.0, 100.0 - (diff - 5) * 2) + + +def calculate_pressure_score(pressure: float) -> float: + """ + Calculate weather score based on pressure. + Optimal: 1013 hPa + """ + optimal = 1013 + diff = abs(pressure - optimal) + + if diff <= 2: + return 100.0 + + return max(0.0, 100.0 - (diff - 2) * 4) + + +def calculate_weather_score(weather_points: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Calculate overall weather score for a route. + Returns scores and raw averages. + """ + total_temp = 0.0 + total_humidity = 0.0 + total_pressure = 0.0 + valid_points = 0 + + # Per-field counters so averages aren't skewed by missing fields + temp_count = 0 + humidity_count = 0 + pressure_count = 0 + + raw_temp_sum = 0.0 + raw_humidity_sum = 0.0 + raw_pressure_sum = 0.0 + + for point in weather_points: + main = point.get("main", {}) + if main: + temp = main.get("temp") + humidity = main.get("humidity") + pressure = main.get("pressure") + + if temp is not None: + total_temp += calculate_temperature_score(temp) + raw_temp_sum += temp + temp_count += 1 + if humidity is not None: + total_humidity += calculate_humidity_score(humidity) + raw_humidity_sum += humidity + humidity_count += 1 + if pressure is not None: + total_pressure += calculate_pressure_score(pressure) + raw_pressure_sum += pressure + pressure_count += 1 + + valid_points += 1 + + if valid_points == 0: + return { + "temperature": 0.0, + "humidity": 0.0, + "pressure": 0.0, + "overall": 0.0, + "details": None + } + + temp_score = total_temp / temp_count if temp_count > 0 else 0.0 + humidity_score = total_humidity / humidity_count if humidity_count > 0 else 0.0 + pressure_score = total_pressure / pressure_count if pressure_count > 0 else 0.0 + + # Overall weather score (weighted average) + # Temperature is most 
perceptible to humans, so higher weight + overall = temp_score * 0.5 + humidity_score * 0.3 + pressure_score * 0.2 + + return { + "temperature": round(temp_score, 1), + "humidity": round(humidity_score, 1), + "pressure": round(pressure_score, 1), + "overall": round(overall, 1), + "details": { + "avgTemp": round(raw_temp_sum / temp_count, 1) if temp_count > 0 else None, + "avgHumidity": round(raw_humidity_sum / humidity_count, 1) if humidity_count > 0 else None, + "avgPressure": round(raw_pressure_sum / pressure_count, 1) if pressure_count > 0 else None + } + } + + +def calculate_aqi_score(aqi_points: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Calculate AQI score based on Air Quality Index. + AQI Scale (Stricter): + 0-20: Excellent (100) + 21-50: Good (100 -> 80 gradient) + 51-100: Moderate (80 -> 50 gradient) + 101-150: Unhealthy for Sensitive (50 -> 30) + 151-200: Unhealthy (30 -> 10) + 200+: Very Unhealthy/Hazardous (10 -> 0) + """ + total_aqi = 0.0 + valid_points = 0 + + # Pollutant aggregation + pollutant_totals = {"pm25": 0, "pm10": 0, "o3": 0, "no2": 0, "so2": 0, "co": 0} + pollutant_counts = {"pm25": 0, "pm10": 0, "o3": 0, "no2": 0, "so2": 0, "co": 0} + dominentpol = None + + for point in aqi_points: + aqi_data = point.get("aqi", {}) + if aqi_data: + aqi_val = aqi_data.get("aqi") + if aqi_val is not None: + try: + val = float(aqi_val) + total_aqi += val + valid_points += 1 + except (ValueError, TypeError): + pass + + # Get dominant pollutant + if aqi_data.get("dominentpol"): + dominentpol = aqi_data["dominentpol"] + + # Aggregate pollutants + iaqi = aqi_data.get("iaqi", {}) + for pol in pollutant_totals.keys(): + pol_data = iaqi.get(pol, {}) + if isinstance(pol_data, dict) and "v" in pol_data: + pollutant_totals[pol] += pol_data["v"] + pollutant_counts[pol] += 1 + + # CRITICAL: If no valid data, default to 0 score (not 100) + if valid_points == 0: + return { + "aqi": 0, + "score": 0.0, + "category": "Unknown - No Data", + "details": None + } + + 
avg_aqi = total_aqi / valid_points + + # Calculate score based on AQI value (inverted - lower AQI is better) + if avg_aqi <= 20: + score = 100.0 + category = "Excellent" + elif avg_aqi <= 50: + score = 100.0 - ((avg_aqi - 20) / 30) * 20 + category = "Good" + elif avg_aqi <= 100: + score = 80.0 - ((avg_aqi - 50) / 50) * 30 + category = "Moderate" + elif avg_aqi <= 150: + score = 50.0 - ((avg_aqi - 100) / 50) * 20 + category = "Unhealthy for Sensitive Groups" + elif avg_aqi <= 200: + score = 30.0 - ((avg_aqi - 150) / 50) * 20 + category = "Unhealthy" + else: + score = max(0.0, 10.0 - ((avg_aqi - 200) / 100) * 10) + category = "Very Unhealthy" if avg_aqi <= 300 else "Hazardous" + + # Build pollutant details + pollutants = {} + for pol, total in pollutant_totals.items(): + count = pollutant_counts[pol] + if count > 0: + pollutants[pol] = round(total / count, 1) + + return { + "aqi": round(avg_aqi, 1), + "score": round(score, 1), + "category": category, + "details": { + "dominentpol": dominentpol, + "pollutants": pollutants if pollutants else None + } + } + + +def calculate_traffic_score(traffic_value: float) -> float: + """ + Calculate traffic score (normalize to 0-100 scale). + Lower traffic value = better score. + Non-linear penalty for congestion. + + trafficValue: 0 (clear) to 3 (severe) + 0.5 (light) -> ~71.5 + 1.0 (moderate) -> ~53.5 + 2.0 (heavy) -> ~24.7 + 3.0 (severe) -> 0 + """ + if traffic_value <= 0: + return 100.0 + + # Power curve to maintain high score only for VERY clear roads + normalized = min(traffic_value / 3, 1) # 0 to 1 + penalty = math.pow(normalized, 0.7) # Convex curve + + score = round((1 - penalty) * 100, 1) + return score + + +def calculate_overall_score( + weather_score: float, + aqi_score: float, + traffic_score: float +) -> float: + """ + Calculate overall route score (weighted combination). 
+ Weather: 40%, AQI: 30%, Traffic: 30% + """ + overall = weather_score * 0.4 + aqi_score * 0.3 + traffic_score * 0.3 + return round(overall, 1) + + +def compute_route_score(route_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Main function to compute complete score for a single route. + + Input format: + { + "routeIndex": 0, + "distance": 12345, + "duration": 1800, + "travelMode": "driving", + "weatherPoints": [...], # Weather API responses for breakpoints + "aqiPoints": [...], # AQI API responses for breakpoints + "trafficValue": 0.5, # Traffic factor (0-3) + "lastComputedScore": 75.0 # Optional: previous score for delta + } + + Output format (ready for MongoDB): + { + "routeIndex": 0, + "distance": 12345, + "duration": 1800, + "travelMode": "driving", + "breakpointCount": 3, + "weatherScore": {...}, + "aqiScore": {...}, + "trafficScore": 85.0, + "overallScore": 78.5, + "scoreChange": 3.5, + "computedAt": "2026-02-16T12:00:00Z" + } + """ + from datetime import datetime, timezone + + weather_points = route_data.get("weatherPoints", []) + aqi_points = route_data.get("aqiPoints", []) + traffic_value = route_data.get("trafficValue", 0) + last_score = route_data.get("lastComputedScore") + + # Calculate individual scores + weather_result = calculate_weather_score(weather_points) + aqi_result = calculate_aqi_score(aqi_points) + traffic_score = calculate_traffic_score(traffic_value) + + # Calculate overall score + overall_score = calculate_overall_score( + weather_result["overall"], + aqi_result["score"], + traffic_score + ) + + # Calculate score change + score_change = None + if last_score is not None: + score_change = round(overall_score - last_score, 1) + + return { + "routeIndex": route_data.get("routeIndex", 0), + "routeId": route_data.get("routeId"), # MongoDB ObjectId if provided + "distance": route_data.get("distance"), + "duration": route_data.get("duration"), + "travelMode": route_data.get("travelMode"), + "breakpointCount": max(len(weather_points), 
len(aqi_points)), + "weatherScore": { + "temperature": weather_result["temperature"], + "humidity": weather_result["humidity"], + "pressure": weather_result["pressure"], + "overall": weather_result["overall"] + }, + "weatherDetails": weather_result["details"], + "aqiScore": { + "aqi": aqi_result["aqi"], + "score": aqi_result["score"], + "category": aqi_result["category"] + }, + "aqiDetails": aqi_result["details"], + "trafficScore": traffic_score, + "overallScore": overall_score, + "lastComputedScore": last_score, + "scoreChange": score_change, + "computedAt": datetime.now(timezone.utc).isoformat() + } + + +def compute_batch_scores(routes_data: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Compute scores for multiple routes in batch. + Returns scores + summary statistics. + """ + from datetime import datetime, timezone + + if not routes_data: + return { + "success": False, + "message": "No routes provided", + "routes": [], + "summary": None + } + + route_scores = [] + for route in routes_data: + score = compute_route_score(route) + route_scores.append(score) + + # Find best route + best_route = max(route_scores, key=lambda r: r["overallScore"]) + + # Calculate summary statistics + overall_scores = [r["overallScore"] for r in route_scores] + avg_score = round(sum(overall_scores) / len(overall_scores), 1) + + return { + "success": True, + "message": "Batch scores computed successfully", + "routes": route_scores, + "bestRoute": { + "index": best_route["routeIndex"], + "routeId": best_route.get("routeId"), + "score": best_route["overallScore"] + }, + "summary": { + "totalRoutes": len(route_scores), + "averageScore": avg_score, + "scoreRange": { + "min": min(overall_scores), + "max": max(overall_scores) + } + }, + "computedAt": datetime.now(timezone.utc).isoformat() + } diff --git a/data-processing/dataProcessingServer/api/serializers.py b/data-processing/dataProcessingServer/api/serializers.py new file mode 100644 index 0000000..e69de29 diff --git 
a/data-processing/dataProcessingServer/api/services/__init__.py b/data-processing/dataProcessingServer/api/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/services/breakpoint_fetcher.py b/data-processing/dataProcessingServer/api/services/breakpoint_fetcher.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/services/pathway_client.py b/data-processing/dataProcessingServer/api/services/pathway_client.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/services/score_persister.py b/data-processing/dataProcessingServer/api/services/score_persister.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/tasks/__init__.py b/data-processing/dataProcessingServer/api/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/tasks/compute_scores.py b/data-processing/dataProcessingServer/api/tasks/compute_scores.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/tests/__init__.py b/data-processing/dataProcessingServer/api/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/tests/test_pathway.py b/data-processing/dataProcessingServer/api/tests/test_pathway.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/tests/test_services.py b/data-processing/dataProcessingServer/api/tests/test_services.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/api/urls.py b/data-processing/dataProcessingServer/api/urls.py new file mode 100644 index 0000000..5b42d2d --- /dev/null +++ b/data-processing/dataProcessingServer/api/urls.py @@ -0,0 +1,14 @@ +""" +URL configuration for the api app. 
+""" +from django.urls import path +from . import views + +urlpatterns = [ + # Health check + path('health/', views.health_check, name='health_check'), + + # Score computation endpoints + path('compute-scores/', views.compute_scores, name='compute_scores'), + path('compute-score/', views.compute_single_score, name='compute_single_score'), +] diff --git a/data-processing/dataProcessingServer/api/views.py b/data-processing/dataProcessingServer/api/views.py new file mode 100644 index 0000000..2fa0149 --- /dev/null +++ b/data-processing/dataProcessingServer/api/views.py @@ -0,0 +1,204 @@ +""" +API Views for BreathClean Data Processing Server. +Provides HTTP endpoints for Pathway-based score computation. +""" +import json +import traceback +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods + +from .pathway.pipeline import run_batch_pipeline, run_simple_batch + + +@csrf_exempt +@require_http_methods(["POST"]) +def compute_scores(request): + """ + Compute route scores using Pathway pipeline. + + Endpoint: POST /api/compute-scores/ + + Request Body: + { + "routes": [ + { + "routeId": "mongo_object_id", // Optional + "routeIndex": 0, + "distance": 12345, + "duration": 1800, + "travelMode": "driving", + "weatherPoints": [ + {"main": {"temp": 22, "humidity": 55, "pressure": 1013}}, + ... + ], + "aqiPoints": [ + {"aqi": {"aqi": 45, "dominentpol": "pm25", "iaqi": {...}}}, + ... + ], + "trafficValue": 0.5, + "lastComputedScore": 75.0 // Optional + }, + ... 
+ ], + "usePathway": true // Optional: use full Pathway pipeline (default: false for reliability) + } + + Response: + { + "success": true, + "message": "Batch scores computed successfully", + "routes": [...], + "bestRoute": {"index": 0, "routeId": "...", "score": 78.5}, + "summary": { + "totalRoutes": 3, + "averageScore": 72.3, + "scoreRange": {"min": 65.0, "max": 78.5} + }, + "computedAt": "2026-02-16T12:00:00Z", + "engine": "pathway" + } + """ + try: + # Parse request body + try: + body = json.loads(request.body) + except json.JSONDecodeError: + return JsonResponse({ + "success": False, + "message": "Invalid JSON in request body" + }, status=400) + + routes = body.get("routes", []) + use_pathway = body.get("usePathway", False) + + # Validation + if not isinstance(routes, list): + return JsonResponse({ + "success": False, + "message": "'routes' must be an array." + }, status=400) + + if not routes: + return JsonResponse({ + "success": False, + "message": "No routes provided. 'routes' array is required." + }, status=400) + + if len(routes) > 10: + return JsonResponse({ + "success": False, + "message": "Maximum 10 routes allowed per batch." + }, status=400) + + # Validate each route has required fields + for i, route in enumerate(routes): + if not isinstance(route, dict): + return JsonResponse({ + "success": False, + "message": f"Route at index {i} must be an object." 
+ }, status=400) + + # Process routes + if use_pathway: + # Use full Pathway pipeline with table operations + result = run_batch_pipeline(routes) + else: + # Use simple batch processing (more reliable for hackathon) + result = run_simple_batch(routes) + + if result.get("success"): + return JsonResponse(result, status=200) + else: + return JsonResponse(result, status=500) + + except Exception: + traceback.print_exc() + return JsonResponse({ + "success": False, + "message": "Internal server error" + }, status=500) + + +@csrf_exempt +@require_http_methods(["POST"]) +def compute_single_score(request): + """ + Compute score for a single route. + + Endpoint: POST /api/compute-score/ + + Request Body: + { + "routeId": "mongo_object_id", + "routeIndex": 0, + "distance": 12345, + "duration": 1800, + "travelMode": "driving", + "weatherPoints": [...], + "aqiPoints": [...], + "trafficValue": 0.5, + "lastComputedScore": 75.0 + } + + Response: + { + "success": true, + "route": {...computed score data...} + } + """ + try: + try: + body = json.loads(request.body) + except json.JSONDecodeError: + return JsonResponse({ + "success": False, + "message": "Invalid JSON in request body" + }, status=400) + + if body is None: + return JsonResponse({ + "success": False, + "message": "Request body is required." 
+ }, status=400) + + # Validate required fields for compute_route_score + required_fields = ["distance", "duration", "travelMode"] + missing_fields = [field for field in required_fields if field not in body] + + if missing_fields: + return JsonResponse({ + "success": False, + "message": f"Missing required fields: {', '.join(missing_fields)}" + }, status=400) + + # Import here to avoid circular imports + from .pathway.transformers import compute_route_score + + result = compute_route_score(body) + + return JsonResponse({ + "success": True, + "route": result + }, status=200) + + except Exception: + traceback.print_exc() + return JsonResponse({ + "success": False, + "message": "Internal server error" + }, status=500) + + +@require_http_methods(["GET"]) +def health_check(request): + """ + Health check endpoint. + + Endpoint: GET /api/health/ + """ + return JsonResponse({ + "status": "healthy", + "service": "BreathClean Data Processing Server", + "engine": "Pathway" + }) diff --git a/data-processing/dataProcessingServer/dataProcessingServer/__init__.py b/data-processing/dataProcessingServer/dataProcessingServer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/__init__.cpython-313.pyc b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..c0cd256 Binary files /dev/null and b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/__init__.cpython-313.pyc differ diff --git a/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/settings.cpython-313.pyc b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/settings.cpython-313.pyc new file mode 100644 index 0000000..27aa1d2 Binary files /dev/null and b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/settings.cpython-313.pyc differ diff --git 
a/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/urls.cpython-313.pyc b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/urls.cpython-313.pyc new file mode 100644 index 0000000..aca67e4 Binary files /dev/null and b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/urls.cpython-313.pyc differ diff --git a/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/wsgi.cpython-313.pyc b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/wsgi.cpython-313.pyc new file mode 100644 index 0000000..20d084b Binary files /dev/null and b/data-processing/dataProcessingServer/dataProcessingServer/__pycache__/wsgi.cpython-313.pyc differ diff --git a/data-processing/dataProcessingServer/dataProcessingServer/asgi.py b/data-processing/dataProcessingServer/dataProcessingServer/asgi.py new file mode 100644 index 0000000..d4a230c --- /dev/null +++ b/data-processing/dataProcessingServer/dataProcessingServer/asgi.py @@ -0,0 +1,16 @@ +""" +ASGI config for dataProcessingServer project. + +It exposes the ASGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/6.0/howto/deployment/asgi/ +""" + +import os + +from django.core.asgi import get_asgi_application + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dataProcessingServer.settings') + +application = get_asgi_application() diff --git a/data-processing/dataProcessingServer/dataProcessingServer/settings.py b/data-processing/dataProcessingServer/dataProcessingServer/settings.py new file mode 100644 index 0000000..0bea642 --- /dev/null +++ b/data-processing/dataProcessingServer/dataProcessingServer/settings.py @@ -0,0 +1,145 @@ +""" +Django settings for dataProcessingServer project. + +Generated by 'django-admin startproject' using Django 6.0.2. 
+ +For more information on this file, see +https://docs.djangoproject.com/en/6.0/topics/settings/ + +For the full list of settings and their values, see +https://docs.djangoproject.com/en/6.0/ref/settings/ +""" + +import os +from pathlib import Path + +# Build paths inside the project like this: BASE_DIR / 'subdir'. +BASE_DIR = Path(__file__).resolve().parent.parent + + +# --------------------------------------------------------------------------- +# Security settings — driven by environment variables. +# +# Required in production: +# DJANGO_SECRET_KEY — a long, random secret key +# +# Optional (have sensible dev defaults): +# DJANGO_DEBUG — set to "False" / "0" to disable debug mode +# DJANGO_ALLOWED_HOSTS — comma-separated list of allowed hostnames +# --------------------------------------------------------------------------- + +# Detect whether we are running in development mode: if no explicit secret +# key is provided via the environment, we fall back to an insecure default +# and treat the environment as development. +_SECRET_KEY_ENV = os.getenv('DJANGO_SECRET_KEY') +_IS_DEV = _SECRET_KEY_ENV is None + +if _IS_DEV: + # Development-only fallback — NEVER use this in production. + SECRET_KEY = 'django-insecure-dev-only-key-do-not-use-in-production' +else: + SECRET_KEY = _SECRET_KEY_ENV + +# SECURITY WARNING: don't run with debug turned on in production! +_debug_env = os.getenv('DJANGO_DEBUG') +if _debug_env is not None: + DEBUG = _debug_env.lower() in ('true', '1', 'yes') +else: + # Default to True only in development. + DEBUG = _IS_DEV + +# ALLOWED_HOSTS — populated from a comma-separated env var, or empty in dev. 
+_allowed_hosts_env = os.getenv('DJANGO_ALLOWED_HOSTS', '') +ALLOWED_HOSTS = [ + h.strip() for h in _allowed_hosts_env.split(',') if h.strip() +] if _allowed_hosts_env else [] + + +# Application definition + +INSTALLED_APPS = [ + 'django.contrib.admin', + 'django.contrib.auth', + 'django.contrib.contenttypes', + 'django.contrib.sessions', + 'django.contrib.messages', + 'django.contrib.staticfiles', + 'api', +] + +MIDDLEWARE = [ + 'django.middleware.security.SecurityMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.middleware.common.CommonMiddleware', + 'django.middleware.csrf.CsrfViewMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'django.contrib.messages.middleware.MessageMiddleware', + 'django.middleware.clickjacking.XFrameOptionsMiddleware', +] + +ROOT_URLCONF = 'dataProcessingServer.urls' + +TEMPLATES = [ + { + 'BACKEND': 'django.template.backends.django.DjangoTemplates', + 'DIRS': [], + 'APP_DIRS': True, + 'OPTIONS': { + 'context_processors': [ + 'django.template.context_processors.request', + 'django.contrib.auth.context_processors.auth', + 'django.contrib.messages.context_processors.messages', + ], + }, + }, +] + +WSGI_APPLICATION = 'dataProcessingServer.wsgi.application' + + +# Database +# https://docs.djangoproject.com/en/6.0/ref/settings/#databases + +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': BASE_DIR / 'db.sqlite3', + } +} + + +# Password validation +# https://docs.djangoproject.com/en/6.0/ref/settings/#auth-password-validators + +AUTH_PASSWORD_VALIDATORS = [ + { + 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', + }, +] + + +# Internationalization +# 
https://docs.djangoproject.com/en/6.0/topics/i18n/ + +LANGUAGE_CODE = 'en-us' + +TIME_ZONE = 'UTC' + +USE_I18N = True + +USE_TZ = True + + +# Static files (CSS, JavaScript, Images) +# https://docs.djangoproject.com/en/6.0/howto/static-files/ + +STATIC_URL = 'static/' diff --git a/data-processing/dataProcessingServer/dataProcessingServer/urls.py b/data-processing/dataProcessingServer/dataProcessingServer/urls.py new file mode 100644 index 0000000..2125076 --- /dev/null +++ b/data-processing/dataProcessingServer/dataProcessingServer/urls.py @@ -0,0 +1,9 @@ +from django.contrib import admin +from django.urls import path, include +from django.http import HttpResponse + +urlpatterns = [ + path('admin/', admin.site.urls), + path('healthCheck', lambda request: HttpResponse('Hello World!')), + path('api/', include('api.urls')), +] diff --git a/data-processing/dataProcessingServer/dataProcessingServer/wsgi.py b/data-processing/dataProcessingServer/dataProcessingServer/wsgi.py new file mode 100644 index 0000000..f6df7e2 --- /dev/null +++ b/data-processing/dataProcessingServer/dataProcessingServer/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for dataProcessingServer project. + +It exposes the WSGI callable as a module-level variable named ``application``. 
+ +For more information on this file, see +https://docs.djangoproject.com/en/6.0/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dataProcessingServer.settings') + +application = get_wsgi_application() diff --git a/data-processing/dataProcessingServer/db.sqlite3 b/data-processing/dataProcessingServer/db.sqlite3 new file mode 100644 index 0000000..e69de29 diff --git a/data-processing/dataProcessingServer/manage.py b/data-processing/dataProcessingServer/manage.py new file mode 100644 index 0000000..c39e036 --- /dev/null +++ b/data-processing/dataProcessingServer/manage.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +"""Django's command-line utility for administrative tasks.""" +import os +import sys + + +def main(): + """Run administrative tasks.""" + os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dataProcessingServer.settings') + try: + from django.core.management import execute_from_command_line + except ImportError as exc: + raise ImportError( + "Couldn't import Django. Are you sure it's installed and " + "available on your PYTHONPATH environment variable? Did you " + "forget to activate a virtual environment?" + ) from exc + execute_from_command_line(sys.argv) + + +if __name__ == '__main__': + main() diff --git a/data-processing/dataProcessingServer/requirements.txt b/data-processing/dataProcessingServer/requirements.txt new file mode 100644 index 0000000..a51bd9c --- /dev/null +++ b/data-processing/dataProcessingServer/requirements.txt @@ -0,0 +1,30 @@ +# BreathClean Data Processing Server Requirements + +# Django (must match version referenced in settings.py — Django 6.0.2) +Django>=6.0,<7.0 + +# Pathway - Real-time data processing framework +# NOTE: Pathway is licensed under BSL 1.1 with service-use restrictions. +# Legal review required: Ensure use case doesn't violate Additional Use Grant. 
+pathway>=0.8.0,<1.0 + +# Pin urllib3 to a patched release to avoid known vulnerabilities +urllib3>=2.6.0,<3.0 + +# For JSON handling (included in Python stdlib, but explicit) +# json - stdlib + +# Optional: For async support +# uvicorn>=0.24.0 +# gunicorn>=21.0.0 + +# Optional: For database connections (if connecting directly to MongoDB) +# pymongo>=4.6.0 +# motor>=3.3.0 # Async MongoDB driver + +# Optional: For Redis caching +# redis>=5.0.0 + +# Development +# pytest>=7.4.0 +# pytest-django>=4.5.0 diff --git a/server/README.md b/server/README.md new file mode 100644 index 0000000..f741bb1 --- /dev/null +++ b/server/README.md @@ -0,0 +1,106 @@ +# BreathClean Backend Architecture & Data Flow + +This document outlines the backend architecture, scoring logic, data storage strategy, and caching mechanisms used in the BreathClean server. It is designed to serve as a reference for future integrations, specifically for the AI-driven periodic update system (Cron Jobs). + +## 1. System Overview + +The backend is responsible for: + +1. **Computing Route Scores**: Fetching real-time environmental data (AQI, Weather, Traffic) and calculating a "health score" for requested routes. +2. **Caching**: Temporarily storing computation results to optimize performance and reduce API costs. +3. **Persistence**: Saving selected routes and their specific "monitoring points" (Breakpoints) to MongoDB for long-term tracking. + +## 2. Route Scoring Engine (`/api/v1/score/compute`) + +When a user requests route data, the following pipeline is executed: + +### A. Breakpoint Computation + +- **Input**: Route geometry (line string of coordinates) from Mapbox. +- **Logic**: The route is analyzed to extract a set of representative points ("Breakpoints") based on distance. + - < 100km: ~3 points + - 100-500km: ~3-4 points + - Points are distributed evenly to represent different segments of the journey. +- **Purpose**: These points serve as the "sensors" for fetching environmental data. + +### B. 
Data Fetching & Scoring + +For each breakpoint, we fetch: + +1. **Weather**: Temperature, Humidity, Pressure. + - _Score_: Weighted average based on deviation from optimal conditions (e.g., 21°C, 50% humidity). +2. **AQI (Air Quality)**: PM2.5, PM10, O3, NO2, etc. + - _Score_: Inverse scale (Lower AQI = Higher Score). 0-20 is 100%, >200 is near 0%. +3. **Traffic**: derived from duration difference. + - _Score_: Non-linear penalty for traffic congestion. + +**Final Score Formula**: +`Overall = (Weather * 0.4) + (AQI * 0.3) + (Traffic * 0.3)` + +### C. Caching Strategy (Redis) + +To optimize the "Save Route" flow, we cache the **Breakpoints** only. + +- **Key**: `route_search:{UUID}` (e.g., `route_search:550e8400-e29b...`) +- **Value**: `{ breakpoints: [{lat, lon}, ...], timestamp: "..." }` +- **TTL**: 1 Hour. +- **Why**: We only store coordinates because environmental data (AQI/Weather) is volatile and should be re-fetched, but the _locations_ of the monitoring points for a specific route geometry remain constant. + +--- + +## 3. Data Persistence (`/api/v1/saved-routes`) + +When a user saves a route, we persist two types of documents in MongoDB. + +### A. The Parent `Route` Document + +Stores the high-level route metadata. + +- **Collection**: `routes` +- **Key Fields**: + - `userId`: Owner of the route. + - `from` / `to`: Addresses and start/end coordinates. + - `routes`: Array of route options (distance, duration, geometry). + - _Note_: The `routes` array in this document stores the _geometry_, which allows us to re-render the path on the map. + +### B. The `BreakPoint` Documents + +Stores the specific locations we monitor for this route. + +- **Collection**: `breakpoints` +- **Key Fields**: + - `routeId`: Reference to the parent `Route` document. + - `routeOptionIndex`: Which path this point belongs to (e.g., Route A vs Route B). + - `pointIndex`: Sequence number (0, 1, 2...). + - `location`: GeoJSON object `{ type: "Point", coordinates: [lon, lat] }`. 
+ +**Creation Logic**: + +1. The backend receives a `searchId` from the frontend. +2. It attempts to fetch the cached breakpoints from **Redis**. +3. **Cache Hit**: It uses the cached coordinates to create `BreakPoint` documents. +4. **Cache Miss**: It falls back to the `computeBreakpoints` utility to re-calculate the points from the route geometry, ensuring no data loss. + +--- + +## 4. Future Integration: AI & Cron Jobs + +This architecture is designed for the upcoming periodic update system. + +**The Workflow:** + +1. **Cron Job Trigger**: A scheduled task runs (e.g., hourly). +2. **Fetch Targets**: The system queries the `routes` collection for active/favorite routes. +3. **Retrieve Monitoring Points**: + - For each route, the system queries the `breakpoints` collection: `BreakPoint.find({ routeId: route._id })`. + - This returns an exact list of `{ lat, lon }` coordinates that define that route's health. +4. **Batch Processing (Pathway AI)**: + - These coordinates are sent to the AI/External Library. + - The library fetches fresh AQI/Weather data for these specific points. + - It computes new scores. +5. **Update**: + - The system updates the `Route` document with the new `lastComputedScore` and `lastComputedAt`. + - Optionally, historical data can be stored in a separate time-series collection. + +**Why this structure?** +By decoupling the _locations_ (Breakpoints) from the _environmental data_, we ensure that our Cron Job is efficient. It doesn't need to re-analyze geometry or guess where to check for pollution. It simply looks up the invariant "monitoring points" stored in the `BreakPoint` collection and polls them. 
diff --git a/server/package-lock.json b/server/package-lock.json index c1c4299..f26c6be 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -9,6 +9,8 @@ "version": "1.0.0", "license": "ISC", "dependencies": { + "@types/uuid": "^10.0.0", + "@upstash/redis": "^1.36.2", "cookie-parser": "^1.4.7", "cors": "^2.8.6", "dotenv": "^17.3.1", @@ -16,7 +18,9 @@ "express-rate-limit": "^8.2.1", "json-web-token": "^3.2.0", "mongoose": "^9.2.1", - "simply-auth": "^0.1.0" + "node-cron": "^3.0.3", + "simply-auth": "^0.1.0", + "uuid": "^13.0.0" }, "devDependencies": { "@eslint/js": "^9.28.0", @@ -25,6 +29,7 @@ "@types/express": "^5.0.6", "@types/jsonwebtoken": "^9.0.10", "@types/node": "^20", + "@types/node-cron": "^3.0.11", "eslint": "^9", "nodemon": "^3.1.11", "tsx": "^4.19.0", @@ -794,6 +799,13 @@ "undici-types": "~6.21.0" } }, + "node_modules/@types/node-cron": { + "version": "3.0.11", + "resolved": "https://registry.npmjs.org/@types/node-cron/-/node-cron-3.0.11.tgz", + "integrity": "sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/qs": { "version": "6.14.0", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.14.0.tgz", @@ -829,6 +841,12 @@ "@types/node": "*" } }, + "node_modules/@types/uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==", + "license": "MIT" + }, "node_modules/@types/webidl-conversions": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/@types/webidl-conversions/-/webidl-conversions-7.0.3.tgz", @@ -1100,6 +1118,15 @@ "url": "https://opencollective.com/typescript-eslint" } }, + "node_modules/@upstash/redis": { + "version": "1.36.2", + "resolved": "https://registry.npmjs.org/@upstash/redis/-/redis-1.36.2.tgz", + "integrity": 
"sha512-C0Yt8hc12vLaQYRG1fMci8iPrLtnTdbJG0HR5T8vKnvEP/1RdMMblsOJs5/jp0JXZJ1oSzMnQz4J9EVezNpI6A==", + "license": "MIT", + "dependencies": { + "uncrypto": "^0.1.3" + } + }, "node_modules/accepts": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/accepts/-/accepts-2.0.0.tgz", @@ -2889,6 +2916,27 @@ "node": ">= 0.6" } }, + "node_modules/node-cron": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.3.tgz", + "integrity": "sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A==", + "license": "ISC", + "dependencies": { + "uuid": "8.3.2" + }, + "engines": { + "node": ">=6.0.0" + } + }, + "node_modules/node-cron/node_modules/uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", + "license": "MIT", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/nodemon": { "version": "3.1.11", "resolved": "https://registry.npmjs.org/nodemon/-/nodemon-3.1.11.tgz", @@ -3670,6 +3718,12 @@ "typescript": ">=4.8.4 <6.0.0" } }, + "node_modules/uncrypto": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/uncrypto/-/uncrypto-0.1.3.tgz", + "integrity": "sha512-Ql87qFHB3s/De2ClA9e0gsnS6zXG27SkTiSJwjCc9MebbfapQfuPzumMIUMi38ezPZVNFcHI9sUIepeQfw8J8Q==", + "license": "MIT" + }, "node_modules/undefsafe": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/undefsafe/-/undefsafe-2.0.5.tgz", @@ -3703,6 +3757,19 @@ "punycode": "^2.1.0" } }, + "node_modules/uuid": { + "version": "13.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-13.0.0.tgz", + "integrity": "sha512-XQegIaBTVUjSHliKqcnFqYypAd4S+WCYt5NIeRs6w/UAry7z8Y9j5ZwRRL4kzq9U3sD6v+85er9FvkEaBpji2w==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist-node/bin/uuid" + 
} + }, "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", diff --git a/server/package.json b/server/package.json index c5f8b03..4758346 100644 --- a/server/package.json +++ b/server/package.json @@ -21,13 +21,16 @@ "@types/express": "^5.0.6", "@types/jsonwebtoken": "^9.0.10", "@types/node": "^20", + "@types/node-cron": "^3.0.11", "eslint": "^9", "nodemon": "^3.1.11", "tsx": "^4.19.0", "typescript": "^5", - "typescript-eslint": "^8.33.0" + "typescript-eslint": "^8.33.0", + "@types/uuid": "^10.0.0" }, "dependencies": { + "@upstash/redis": "^1.36.2", "cookie-parser": "^1.4.7", "cors": "^2.8.6", "dotenv": "^17.3.1", @@ -35,6 +38,8 @@ "express-rate-limit": "^8.2.1", "json-web-token": "^3.2.0", "mongoose": "^9.2.1", - "simply-auth": "^0.1.0" + "node-cron": "^3.0.3", + "simply-auth": "^0.1.0", + "uuid": "^13.0.0" } } diff --git a/server/src/Schema/breakPoints.ts b/server/src/Schema/breakPoints.ts new file mode 100644 index 0000000..cfd89a0 --- /dev/null +++ b/server/src/Schema/breakPoints.ts @@ -0,0 +1,64 @@ +import mongoose, { Document, Schema } from "mongoose"; + +// --- Main BreakPoint Interface --- + +export interface IBreakPoint extends Document { + routeId: mongoose.Types.ObjectId; // Reference to the parent Route document + routeOptionIndex: number; // Index of the specific route option (0, 1, 2...) + pointIndex: number; // Index of this point in the sequence (0, 1, 2...) 
+ + location: { + type: "Point"; + coordinates: [number, number]; // [longitude, latitude] + }; +} + +// --- Schema --- + +const pointSchema = new Schema( + { + type: { + type: String, + enum: ["Point"], + required: true, + }, + coordinates: { + type: [Number], + required: true, + }, + }, + { _id: false } +); + +const breakPointSchema = new Schema( + { + routeId: { + type: Schema.Types.ObjectId, + ref: "Route", + required: true, + }, + routeOptionIndex: { + type: Number, + required: true, + }, + pointIndex: { + type: Number, + required: true, + }, + location: { + type: pointSchema, + required: true, + }, + }, + { timestamps: true } +); + +// Index for geospatial queries (finding points near a location) +breakPointSchema.index({ location: "2dsphere" }); + +// Index for retrieving all points for a specific route option +breakPointSchema.index({ routeId: 1, routeOptionIndex: 1 }); + +const BreakPoint = mongoose.model("BreakPoint", breakPointSchema); + +export default BreakPoint; diff --git a/server/src/controllers/savedRoutes.controllers.ts b/server/src/controllers/savedRoutes.controllers.ts index 09c9ba6..66e9be8 100644 --- a/server/src/controllers/savedRoutes.controllers.ts +++ b/server/src/controllers/savedRoutes.controllers.ts @@ -1,6 +1,32 @@ import { Request, Response } from "express"; +import mongoose from "mongoose"; +import BreakPoint from "../Schema/breakPoints.js"; import Route from "../Schema/route.schema.js"; +import redis from "../utils/redis.js"; + +type BreakpointDoc = { + routeId: mongoose.Types.ObjectId; + routeOptionIndex: number; + pointIndex: number; + location: { + type: "Point"; + coordinates: [number, number]; + }; +}; + +type RouteBreakpoints = Record< + string, + { lat: number; lon: number } | undefined +> & { + point_1?: { lat: number; lon: number }; + point_2?: { lat: number; lon: number }; + point_3?: { lat: number; lon: number }; + point_4?: { lat: number; lon: number }; + point_5?: { lat: number; lon: number }; + point_6?: { lat: number; 
lon: number }; + point_7?: { lat: number; lon: number }; +}; /** * GET /saved-routes @@ -27,7 +53,7 @@ export const fetchSavedRoutes = async ( export const saveRoute = async (req: Request, res: Response): Promise => { try { const userId = req.userId; - const { name, from, to, routes, isFavorite } = req.body; + const { name, from, to, routes, isFavorite, searchId } = req.body; if (!from || !to || !routes || routes.length === 0) { res @@ -36,6 +62,7 @@ export const saveRoute = async (req: Request, res: Response): Promise => { return; } + // 1. Create the Parent Route Document const newRoute = await Route.create({ userId, name, @@ -45,6 +72,86 @@ export const saveRoute = async (req: Request, res: Response): Promise => { isFavorite: isFavorite ?? false, }); + // 2. Prepare BreakPoints + let routeBreakpointsData: RouteBreakpoints[] = []; + + // Try to get from Cache first + if (searchId) { + // Validate searchId format to prevent cache-key injection (expecting UUID) + const uuidRegex = + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + + if (typeof searchId === "string" && uuidRegex.test(searchId)) { + try { + const cached = (await redis.get(`route_search:${searchId}`)) as { + breakpoints: RouteBreakpoints[]; + } | null; + // Upstash Redis usually returns the object directly if stored as JSON + if (cached && cached.breakpoints) { + routeBreakpointsData = cached.breakpoints; + } + } catch { + console.warn("[Cache] Redis get failed for validated searchId."); + } + } else { + console.warn("[Cache] Ignored invalid searchId format."); + } + } + + // Fallback: Re-compute if cache miss or no searchId + if (!routeBreakpointsData || routeBreakpointsData.length === 0) { + try { + // Dynamically import to avoid circular dep issues if any, + // though static import is fine here too.
+ const { computeBreakpoints } = + await import("../utils/compute/breakPoint.compute.js"); + routeBreakpointsData = computeBreakpoints(routes) as RouteBreakpoints[]; + } catch (e) { + console.error("Failed to re-compute breakpoints during save:", e); + // If this fails, we save the route without breakpoints rather than crashing + } + } + + // 3. Save BreakPoint Documents + if (routeBreakpointsData && routeBreakpointsData.length > 0) { + const breakPointDocs: BreakpointDoc[] = []; + + // Iterate through each route option (0, 1, 2) + routeBreakpointsData.forEach( + (rb: RouteBreakpoints, routeIndex: number) => { + // Iterate through points in this route (point_1, point_2...) + Object.keys(rb).forEach((key) => { + if (key.startsWith("point_")) { + const parts = key.split("_"); + if (parts.length < 2) return; + const idxStr = parts[1]; + if (!idxStr) return; + const pointIndex = parseInt(idxStr, 10) - 1; // 1-based to 0-based + if (Number.isNaN(pointIndex)) return; + + const coord = rb[key]; // { lat, lon } + + if (coord) { + breakPointDocs.push({ + routeId: newRoute._id, + routeOptionIndex: routeIndex, + pointIndex: pointIndex, + location: { + type: "Point", + coordinates: [coord.lon, coord.lat], // GeoJSON: [lon, lat] + }, + }); + } + } + }); + } + ); + + if (breakPointDocs.length > 0) { + await BreakPoint.insertMany(breakPointDocs); + } + } + res.status(201).json({ success: true, route: newRoute }); } catch (error) { console.error("saveRoute error:", error); @@ -71,6 +178,9 @@ export const deleteRoute = async ( return; } + // Cascade-delete associated breakpoints + await BreakPoint.deleteMany({ routeId: route._id }); + res.status(200).json({ success: true, message: "Route deleted" }); } catch (error) { console.error("deleteRoute error:", error); diff --git a/server/src/controllers/score.controller.ts b/server/src/controllers/score.controller.ts index 9763428..0c13ad5 100644 --- a/server/src/controllers/score.controller.ts +++ 
b/server/src/controllers/score.controller.ts @@ -1,4 +1,5 @@ import type { Request, Response } from "express"; +import { v4 as uuidv4 } from "uuid"; import { computeAQI, @@ -9,6 +10,7 @@ import { computeWeather, type RouteWeatherResult, } from "../utils/compute/weather.compute.js"; +import redis from "../utils/redis.js"; // Type definitions for the request interface RouteGeometry { @@ -483,9 +485,28 @@ export const getScoreController = async (req: Request, res: Response) => { current.overallScore > best.overallScore ? current : best ); + // --- REDIS CACHING START --- + const searchId = uuidv4(); + try { + // Store ONLY raw breakpoints (to save locations later) + // TTL: 3600 seconds (1 hour) + await redis.set( + `route_search:${searchId}`, + JSON.stringify({ + breakpoints, + timestamp: new Date().toISOString(), + }), + { ex: 3600 } + ); + } catch (redisError) { + console.error("Redis caching failed (continuing anyway):", redisError); + } + // --- REDIS CACHING END --- + res.json({ success: true, message: "Route scores computed successfully", + searchId, // <--- Return this to client data: { routes: routeScores, bestRoute: { diff --git a/server/src/index.ts b/server/src/index.ts index 705921e..76755d0 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -3,10 +3,16 @@ import cors from "cors"; import "dotenv/config"; import express from "express"; +import { tokenVerify } from "./middleware/tokenVerify.js"; import authRoutes from "./routes/auth.routes.js"; import savedRoutesRoutes from "./routes/savedRoutes.routes.js"; import scoreRoutes from "./routes/score.routes.js"; import connectDB from "./utils/connectDB.js"; +import { + initScheduler, + runManualBatchScoring, +} from "./utils/scheduler/computeData.scheduler.js"; +import { checkPathwayHealth } from "./utils/scheduler/pathwayClient.js"; const app = express(); const PORT = process.env.PORT ?? 
8000; @@ -29,10 +35,40 @@ app.get("/", (_req, res) => { res.json({ message: "Server is running" }); }); +// Manual trigger for batch scoring (admin only) +app.post("/api/v1/scheduler/run", tokenVerify, async (_req, res) => { + try { + await runManualBatchScoring(); + res + .status(202) + .json({ message: "Batch scoring completed", status: "done" }); + } catch (error) { + console.error("Manual batch scoring failed:", error); + res.status(500).json({ message: "Failed to run batch scoring" }); + } +}); + +// Health check for Pathway server (admin only) +app.get("/api/v1/scheduler/pathway-health", tokenVerify, async (_req, res) => { + const pathwayUrl = process.env.PATHWAY_URL || "http://localhost:8001"; + const isHealthy = await checkPathwayHealth(pathwayUrl); + res.json({ + pathway: isHealthy ? "healthy" : "unhealthy", + }); +}); + connectDB() .then(() => { app.listen(PORT, () => { console.log(`Server running on port ${PORT}`); + + // Initialize the cron scheduler only when explicitly opted in + if (process.env.ENABLE_SCHEDULER === "true") { + initScheduler(); + console.log("Batch scoring scheduler initialized"); + } else { + console.log("Scheduler disabled (set ENABLE_SCHEDULER=true to enable)"); + } }); }) .catch((error) => { diff --git a/server/src/utils/redis.ts b/server/src/utils/redis.ts new file mode 100644 index 0000000..b7fc0d0 --- /dev/null +++ b/server/src/utils/redis.ts @@ -0,0 +1,18 @@ +import { Redis } from "@upstash/redis"; + +const UPSTASH_URL = process.env.UPSTASH_REDIS_REST_URL; +const UPSTASH_TOKEN = process.env.UPSTASH_REDIS_REST_TOKEN; + +if (!UPSTASH_URL || !UPSTASH_TOKEN) { + throw new Error( + "Missing Redis configuration: UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN " + + "must be set in environment variables." 
+ ); +} + +const redis = new Redis({ + url: UPSTASH_URL, + token: UPSTASH_TOKEN, +}); + +export default redis; diff --git a/server/src/utils/scheduler/computeData.scheduler.ts b/server/src/utils/scheduler/computeData.scheduler.ts new file mode 100644 index 0000000..66fde7c --- /dev/null +++ b/server/src/utils/scheduler/computeData.scheduler.ts @@ -0,0 +1,302 @@ +/** + * Periodic Route Score Computation Scheduler + * + * This scheduler runs every 10-15 minutes and: + * 1. Fetches all saved routes from MongoDB + * 2. For each route, retrieves stored breakpoints + * 3. Fetches fresh AQI/weather data for those breakpoints + * 4. Sends data to Pathway for score computation + * 5. Updates route documents with new scores + * + * Uses incremental per-route processing for: + * - Memory efficiency + * - Faster perceived progress (DB updated incrementally) + * - Better error recovery (partial success possible) + * - Rate-limit friendly pacing + */ +import cron from "node-cron"; + +import BreakPoint from "../../Schema/breakPoints.js"; +import Route from "../../Schema/route.schema.js"; +import { computeAQI } from "../compute/aqi.compute.js"; +import { + computeWeather, + type RoutePoints, +} from "../compute/weather.compute.js"; +import { type PathwayRouteInput, sendToPathway } from "./pathwayClient.js"; + +// Configuration +const PATHWAY_URL = process.env.PATHWAY_URL || "http://localhost:8001"; +const BATCH_SIZE = 5; // Number of routes to process in parallel +const CRON_SCHEDULE = process.env.CRON_SCHEDULE || "*/15 * * * *"; // Every 15 minutes + +/** + * Convert stored breakpoints to RoutePoints format expected by compute utilities + */ +function breakpointsToRoutePoints( + breakpoints: Array<{ + pointIndex: number; + location: { coordinates: [number, number] }; + }> +): RoutePoints { + const routePoints: RoutePoints = {}; + + // Sort by pointIndex and map to point_1, point_2, etc. 
+ const sorted = [...breakpoints].sort((a, b) => a.pointIndex - b.pointIndex); + + sorted.forEach((bp, index) => { + const key = `point_${index + 1}` as keyof RoutePoints; + // MongoDB stores as [lon, lat], convert to {lat, lon} + routePoints[key] = { + lat: bp.location.coordinates[1], + lon: bp.location.coordinates[0], + }; + }); + + return routePoints; +} + +/** + * Process a single route: fetch data, compute score, update DB + */ +async function processRoute( + routeId: string, + routeOptionIndex: number, + routeOption: { + distance: number; + duration: number; + travelMode: string; + lastComputedScore?: number; + } +): Promise<{ + success: boolean; + routeId: string; + routeOptionIndex: number; + newScore?: number; + error?: string; +}> { + try { + // Step 1: Fetch breakpoints for this route option from MongoDB + const breakpoints = await BreakPoint.find({ + routeId, + routeOptionIndex, + }).sort({ pointIndex: 1 }); + + if (breakpoints.length === 0) { + console.warn( + `[Scheduler] No breakpoints found for route ${routeId}, option ${routeOptionIndex}` + ); + return { + success: false, + routeId, + routeOptionIndex, + error: "No breakpoints found", + }; + } + + // Step 2: Convert to RoutePoints format + const routePoints = breakpointsToRoutePoints(breakpoints); + + // Step 3: Fetch weather and AQI data in parallel + const [weatherResults, aqiResults] = await Promise.all([ + computeWeather([routePoints]), + computeAQI([routePoints]), + ]); + + const weatherData = weatherResults[0]; + const aqiData = aqiResults[0]; + + if (!weatherData || !aqiData) { + return { + success: false, + routeId, + routeOptionIndex, + error: "Failed to fetch environmental data", + }; + } + + // Step 4: Prepare data for Pathway + const pathwayInput: PathwayRouteInput = { + routeId, + routeIndex: routeOptionIndex, + distance: routeOption.distance, + duration: routeOption.duration, + travelMode: routeOption.travelMode, + weatherPoints: weatherData.points.map((p) => ({ main: p.main })), + 
aqiPoints: aqiData.points.map((p) => ({ + aqi: p.aqi, + })) as PathwayRouteInput["aqiPoints"], + trafficValue: 0, // TODO: Add traffic computation if needed + ...(routeOption.lastComputedScore !== undefined + ? { lastComputedScore: routeOption.lastComputedScore } + : {}), + }; + + // Step 5: Send to Pathway for computation + const pathwayResult = await sendToPathway(PATHWAY_URL, [pathwayInput]); + + if (!pathwayResult.success || !pathwayResult.routes?.[0]) { + return { + success: false, + routeId, + routeOptionIndex, + error: pathwayResult.message || "Pathway computation failed", + }; + } + + const computedScore = pathwayResult.routes[0]; + + // Step 6: Update route in MongoDB + await Route.updateOne( + { _id: routeId }, + { + $set: { + [`routes.${routeOptionIndex}.lastComputedScore`]: + computedScore.overallScore, + [`routes.${routeOptionIndex}.lastComputedAt`]: new Date(), + }, + } + ); + + return { + success: true, + routeId, + routeOptionIndex, + newScore: computedScore.overallScore, + }; + } catch (error) { + console.error(`[Scheduler] Error processing route ${routeId}:`, error); + return { + success: false, + routeId, + routeOptionIndex, + error: error instanceof Error ? error.message : "Unknown error", + }; + } +} + +/** + * Main batch scoring job + * Processes all routes incrementally + */ +export async function runBatchScoring(): Promise { + const startTime = Date.now(); + + try { + // Fetch all routes (or filter by criteria like isFavorite, etc.) 
+ const routes = await Route.find({}).lean(); + + if (routes.length === 0) { + return; + } + + // Build list of all route options to process + const tasks: Array<{ + routeId: string; + routeOptionIndex: number; + routeOption: { + distance: number; + duration: number; + travelMode: string; + lastComputedScore?: number; + }; + }> = []; + + for (const route of routes) { + route.routes.forEach((option, index) => { + tasks.push({ + routeId: route._id.toString(), + routeOptionIndex: index, + routeOption: { + distance: option.distance, + duration: option.duration, + travelMode: option.travelMode?.type || "driving", + ...(option.lastComputedScore !== undefined && + option.lastComputedScore !== null + ? { lastComputedScore: option.lastComputedScore } + : {}), + }, + }); + }); + } + + // Process in batches for controlled parallelism + const results: Array<{ + success: boolean; + routeId: string; + routeOptionIndex: number; + newScore?: number; + error?: string; + }> = []; + + for (let i = 0; i < tasks.length; i += BATCH_SIZE) { + const batch = tasks.slice(i, i + BATCH_SIZE); + + const batchResults = await Promise.all( + batch.map((task) => + processRoute(task.routeId, task.routeOptionIndex, task.routeOption) + ) + ); + + results.push(...batchResults); + + // Small delay between batches to be rate-limit friendly + if (i + BATCH_SIZE < tasks.length) { + await new Promise((resolve) => setTimeout(resolve, 500)); + } + } + + // Summary + const successful = results.filter((r) => r.success).length; + const failed = results.filter((r) => !r.success).length; + const duration = ((Date.now() - startTime) / 1000).toFixed(2); + + if (failed > 0) { + const failures = results.filter((r) => !r.success); + console.warn("[Scheduler] Failed routes:", failures); + } + + console.info( + `[Scheduler] Batch scoring completed: ${successful} succeeded, ${failed} failed in ${duration}s` + ); + } catch (error) { + console.error("[Scheduler] Critical error in batch scoring:", error); + } +} + +/** + * 
Initialize the cron scheduler + */ +let _isRunning = false; + +export function initScheduler(): void { + cron.schedule(CRON_SCHEDULE, async () => { + if (_isRunning) { + console.info( + "[Scheduler] Previous batch still running — skipping this tick" + ); + return; + } + _isRunning = true; + try { + await runBatchScoring(); + } catch (error) { + console.error("[Scheduler] Unhandled error in batch scoring:", error); + } finally { + _isRunning = false; + } + }); +} + +/** + * Run batch scoring manually (for testing/debugging) + */ +export async function runManualBatchScoring(): Promise { + await runBatchScoring(); +} + +export default { + initScheduler, + runBatchScoring, + runManualBatchScoring, +}; diff --git a/server/src/utils/scheduler/pathwayClient.ts b/server/src/utils/scheduler/pathwayClient.ts new file mode 100644 index 0000000..f066bb5 --- /dev/null +++ b/server/src/utils/scheduler/pathwayClient.ts @@ -0,0 +1,216 @@ +/** + * Pathway Client + * + * Communicates with the Django/Pathway data processing server + * to compute route health scores. 
+ */ + +// Input format expected by Pathway endpoint +export interface PathwayRouteInput { + routeId?: string; + routeIndex: number; + distance: number; + duration: number; + travelMode: string; + weatherPoints: Array<{ + main: { + temp?: number; + humidity?: number; + pressure?: number; + feels_like?: number; + temp_min?: number; + temp_max?: number; + } | null; + }>; + aqiPoints: Array<{ + aqi: { + aqi?: number; + dominentpol?: string | undefined; + iaqi?: { + pm25?: { v: number }; + pm10?: { v: number }; + o3?: { v: number }; + no2?: { v: number }; + so2?: { v: number }; + co?: { v: number }; + }; + time?: { + s: string; + tz: string; + }; + } | null; + }>; + trafficValue: number; + lastComputedScore?: number; +} + +// Output format from Pathway endpoint +export interface PathwayRouteOutput { + routeIndex: number; + routeId?: string; + distance: number; + duration: number; + travelMode: string; + breakpointCount: number; + weatherScore: { + temperature: number; + humidity: number; + pressure: number; + overall: number; + }; + weatherDetails?: { + avgTemp: number; + avgHumidity: number; + avgPressure: number; + }; + aqiScore: { + aqi: number; + score: number; + category: string; + }; + aqiDetails?: { + dominentpol?: string; + pollutants?: { + pm25?: number; + pm10?: number; + o3?: number; + no2?: number; + so2?: number; + co?: number; + }; + }; + trafficScore: number; + overallScore: number; + lastComputedScore?: number; + scoreChange?: number; + computedAt: string; +} + +export interface PathwayResponse { + success: boolean; + message: string; + routes?: PathwayRouteOutput[]; + bestRoute?: { + index: number; + routeId?: string; + score: number; + }; + summary?: { + totalRoutes: number; + averageScore: number; + scoreRange: { + min: number; + max: number; + }; + }; + computedAt?: string; + engine?: string; +} + +/** + * Send routes to Pathway for score computation + * + * @param baseUrl - Base URL of the Pathway server (e.g., "http://localhost:8001") + * @param 
routes - Array of route data with pre-fetched weather/AQI + * @returns Pathway response with computed scores + */ +export async function sendToPathway( + baseUrl: string, + routes: PathwayRouteInput[] +): Promise { + const url = `${baseUrl}/api/compute-scores/`; + const timeout = 30000; // 30 second timeout + + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeout); + + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ routes }), + signal: controller.signal, + }); + + if (!response.ok) { + const errorText = await response.text(); + console.error( + `[PathwayClient] HTTP error ${response.status}: ${errorText}` + ); + return { + success: false, + message: `HTTP ${response.status}: ${errorText}`, + }; + } + + const data = (await response.json()) as PathwayResponse; + + return data; + } catch (error) { + if (error instanceof Error) { + if (error.name === "AbortError") { + console.error("[PathwayClient] Request timed out"); + return { + success: false, + message: "Request timed out", + }; + } + console.error(`[PathwayClient] Error: ${error.message}`); + return { + success: false, + message: error.message, + }; + } + return { + success: false, + message: "Unknown error", + }; + } finally { + clearTimeout(timeoutId); + } +} + +/** + * Health check for Pathway server + */ +export async function checkPathwayHealth(baseUrl: string): Promise { + const url = `${baseUrl}/api/health/`; + const timeout = 30000; + + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeout); + + try { + const response = await fetch(url, { + method: "GET", + signal: controller.signal, + }); + if (response.ok) { + const data = await response.json(); + console.log( + `[PathwayClient] Health check passed: ${JSON.stringify(data)}` + ); + return true; + } + console.error( + `[PathwayClient] Health check failed: 
HTTP ${response.status}` + ); + return false; + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + console.error("[PathwayClient] Health check timed out"); + } else { + console.error(`[PathwayClient] Health check error:`, error); + } + return false; + } finally { + clearTimeout(timeoutId); + } +} + +export default { + sendToPathway, + checkPathwayHealth, +};