# fastMCP_server.py
import os
import sys
from typing import List

from convex import ConvexClient
from dotenv import load_dotenv
from fastmcp import FastMCP
# Load environment variables
load_dotenv()

# --- Convex Client Setup ---
# convex_client stays None when CONVEX_URL is unset.
CONVEX_URL = os.getenv("CONVEX_URL")
if CONVEX_URL:
    convex_client = ConvexClient(CONVEX_URL)
else:
    # Diagnostics go to stderr: this server is started with the stdio
    # transport, where stdout is the channel used for protocol messages.
    print(
        "Warning: CONVEX_URL not set. Cannot fetch alerts from Convex.",
        file=sys.stderr,
    )
    convex_client = None

# --- FastMCP App ---
mcp = FastMCP("Vulnerability MCP Server")
# --- MCP Tools ---
@mcp.tool()
def get_vulnerability_alerts(repo_id: int = 1069332417) -> List[dict]:
    """Fetch the active vulnerability alerts for a repository from Convex.

    Args:
        repo_id: Repository identifier, forwarded to the
            ``dependabotAlerts:getActive`` Convex query as ``repo_id``.

    Returns:
        The query result when it is truthy. Otherwise a one-element list
        holding a ``{"message": ...}`` dict (client not configured, or no
        alerts found), or a ``{"error": ...}`` dict when the query raised.
    """
    if not convex_client:
        return [{"message": "Convex client not configured. Cannot fetch alerts."}]

    query_args = {"repo_id": repo_id}
    try:
        found = convex_client.query("dependabotAlerts:getActive", query_args)
        if found:
            return found
        return [{"message": "No active vulnerability alerts found."}]
    except Exception as exc:
        # Failures are reported to the caller as data instead of being raised.
        return [{"error": f"Failed to fetch alerts from Convex: {exc!s}"}]
@mcp.tool()
def get_latest_vulnerability(repo_id: int = 1069332417) -> dict:
    """Get the details of the most recent active vulnerability.

    Args:
        repo_id: Repository identifier passed through to
            ``get_vulnerability_alerts``.

    Returns:
        The last element of the alert list. If the list is empty, a
        ``{"message": ...}`` dict; if its first element is a
        ``"message"``/``"error"`` status dict, that dict is returned as-is.
    """
    # Depending on the FastMCP version, ``@mcp.tool()`` may rebind the name to
    # a tool object that is not callable; where it does, the undecorated
    # function is exposed as ``.fn``. Fall back to the name itself otherwise.
    fetch_alerts = getattr(get_vulnerability_alerts, "fn", get_vulnerability_alerts)
    alerts = fetch_alerts(repo_id)

    if not alerts:
        return {"message": "No active vulnerability alerts."}

    first = alerts[0]
    if "message" in first or "error" in first:
        # Status/error placeholder produced by the fetch step, not a real alert.
        return first

    # NOTE(review): assumes the query returns alerts ordered by creation time,
    # oldest first, so the last element is the most recent - confirm against
    # the Convex query.
    return alerts[-1]
@mcp.tool()
def suggest_vulnerability_fix(repo_id: int = 1069332417) -> dict:
    """Suggest a fix for the most recent active vulnerability.

    Args:
        repo_id: Repository identifier passed through to
            ``get_vulnerability_alerts``.

    Returns:
        ``{"suggestion": <text>, "details": <alert>}`` for the last alert in
        the list. A ``{"message": ...}`` dict when there is no alert or the
        advisory lists no vulnerabilities, or the ``"message"``/``"error"``
        status dict produced by the fetch step.
    """
    # Depending on the FastMCP version, ``@mcp.tool()`` may rebind the name to
    # a tool object that is not callable; where it does, the undecorated
    # function is exposed as ``.fn``. Fall back to the name itself otherwise.
    fetch_alerts = getattr(get_vulnerability_alerts, "fn", get_vulnerability_alerts)
    alerts = fetch_alerts(repo_id)

    if not alerts:
        return {"message": "No vulnerability alert available to suggest a fix."}
    if "message" in alerts[0] or "error" in alerts[0]:
        return alerts[0]

    latest_alert = alerts[-1]

    # The payload may be nested under "alertData"; otherwise use the record
    # itself. ``or`` (rather than a .get default) also covers keys that are
    # present but hold None, which would otherwise raise AttributeError.
    alert_data = latest_alert.get("alertData") or latest_alert
    alert_obj = alert_data.get("alert") or {}
    advisory = alert_obj.get("security_advisory") or {}
    vulnerabilities = advisory.get("vulnerabilities") or []
    if not vulnerabilities:
        return {"message": "Could not extract vulnerability information."}

    # NOTE(review): only the first advisory entry is used; if an advisory can
    # list several packages this may not be the one the alert refers to.
    vulnerability = vulnerabilities[0]
    package_name = (vulnerability.get("package") or {}).get("name", "N/A")
    patched_version = (vulnerability.get("first_patched_version") or {}).get(
        "identifier", "N/A"
    )
    manifest_path = (alert_obj.get("dependency") or {}).get("manifest_path", "N/A")

    # Sentences are joined with a single space so they no longer run together.
    suggestion = " ".join(
        (
            f"To fix the vulnerability in '{package_name}', "
            f"update the version to '{patched_version}' in the '{manifest_path}' file.",
            "If the vulnerability is not fixed, you can suggest a fix for the vulnerability.",
            "If the vulnerability version in question is not found, you can check lock file "
            "or use run command pnpm -r why <package_name> to find which dependency is "
            "causing the vulnerability to be pulled in.",
        )
    )
    return {"suggestion": suggestion, "details": latest_alert}
# --- Server Startup ---
if __name__ == "__main__":
    # Status messages go to stderr: with the stdio transport selected below,
    # stdout is the channel the client reads protocol messages from.
    print("Starting MCP Server...", file=sys.stderr)
    print("Fetching alerts directly from Convex (no Redis)", file=sys.stderr)
    # Run the server using FastMCP's recommended method
    # For Cursor/Cline, stdio transport is more reliable
    # To use HTTP instead: mcp.run(transport="http", host="127.0.0.1", port=8001)
    mcp.run(transport="stdio")