Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 267 additions & 5 deletions api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2570,6 +2570,10 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == "/api/mcp/servers":
return _handle_mcp_servers_list(handler)

# ── MCP Tools (GET) ──
if parsed.path == "/api/mcp/tools":
return _handle_mcp_tools_list(handler)

# ── Checkpoints / Rollback (GET) ──
if parsed.path == "/api/rollback/list":
qs = parse_qs(parsed.query)
Expand Down Expand Up @@ -7133,33 +7137,291 @@ def _mask_secrets(obj):
return masked


def _server_summary(name, cfg):
def _parse_mcp_enabled(value) -> bool:
"""Parse Hermes MCP ``enabled`` values without raising on bad config."""
if value is None:
return True
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return value != 0
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"true", "1", "yes", "on"}:
return True
if normalized in {"false", "0", "no", "off"}:
return False
return True


def _mcp_runtime_status_by_name() -> dict[str, dict]:
"""Return already-known MCP runtime status without starting servers.

``tools.mcp_tool.get_mcp_status()`` only reads the existing MCP registry and
configuration; it does not probe or spawn MCP subprocesses. If Hermes Agent
is unavailable, fall back to an empty map so the API remains safe.
"""
try:
from tools.mcp_tool import get_mcp_status
statuses = get_mcp_status()
except Exception:
return {}
if not isinstance(statuses, list):
return {}
return {
str(entry.get("name")): entry
for entry in statuses
if isinstance(entry, dict) and entry.get("name")
}


def _server_summary(name, cfg, runtime_status=None):
"""Return a safe summary of an MCP server config."""
runtime_status = runtime_status if isinstance(runtime_status, dict) else {}
out = {"name": name}
if not isinstance(cfg, dict):
out.update({
"transport": "invalid",
"timeout": 120,
"connect_timeout": 60,
"enabled": False,
"active": False,
"status": "invalid_config",
"tool_count": None,
})
return out

enabled = _parse_mcp_enabled(cfg.get("enabled", True))
connected = bool(runtime_status.get("connected")) if enabled else False
if "url" in cfg:
out["transport"] = "http"
# Mask auth headers
if "headers" in cfg:
out["headers"] = _mask_secrets(cfg["headers"])
out["url"] = cfg["url"]
else:
elif "command" in cfg:
out["transport"] = "stdio"
out["command"] = cfg.get("command", "")
out["args"] = cfg.get("args", [])
if "env" in cfg:
out["env"] = _mask_secrets(cfg["env"])
else:
out["transport"] = "invalid"
enabled = False
connected = False

out["timeout"] = cfg.get("timeout", 120)
out["connect_timeout"] = cfg.get("connect_timeout", 60)
out["enabled"] = enabled
out["active"] = connected
if out["transport"] == "invalid":
out["status"] = "invalid_config"
elif not enabled:
out["status"] = "disabled"
elif connected:
out["status"] = "active"
else:
out["status"] = "configured"
out["tool_count"] = runtime_status.get("tools") if runtime_status else None
return out


def _mcp_safe_display_text(value, *, limit: int) -> str:
"""Return redacted, bounded MCP text safe for WebUI inventory rows."""
if not isinstance(value, str):
value = "" if value is None else str(value)
value = _redact_text(value).strip()
value = re.sub(r"Authorization:\s*Bearer\s+\S+", "[REDACTED CREDENTIAL]", value, flags=re.I)
if len(value) > limit:
value = value[: max(0, limit - 1)].rstrip() + "…"
return value


def _mcp_schema_type(schema) -> str:
"""Return a compact, non-sensitive display type for a JSON schema node."""
if not isinstance(schema, dict):
return "unknown"
typ = schema.get("type")
if isinstance(typ, list):
typ = "/".join(str(t) for t in typ if t)
if isinstance(typ, str) and typ:
return typ
for composite in ("anyOf", "oneOf", "allOf"):
if isinstance(schema.get(composite), list) and schema[composite]:
return composite
if "enum" in schema:
return "enum"
return "unknown"


def _mcp_schema_summary(schema, *, limit: int = 12) -> list[dict]:
"""Summarize an MCP input schema without exposing raw defaults/examples.

The WebUI only needs searchable/displayable argument hints. Returning raw
JSON Schema can overexpose server-provided defaults, examples, enums, or
vendor extensions, so this strips each parameter down to name/type/required
and a redacted description.
"""
if not isinstance(schema, dict):
return []
properties = schema.get("properties")
if not isinstance(properties, dict):
return []
required = schema.get("required")
required_names = set(required) if isinstance(required, list) else set()
out = []
for name, prop in properties.items():
if len(out) >= limit:
break
if not isinstance(name, str):
continue
prop = prop if isinstance(prop, dict) else {}
desc = prop.get("description", "")
if not isinstance(desc, str):
desc = ""
desc = _mcp_safe_display_text(desc, limit=180)
out.append({
"name": name,
"type": _mcp_schema_type(prop),
"required": name in required_names,
"description": desc,
})
return out


def _mcp_tool_schema_from_payload(tool):
if not isinstance(tool, dict):
return {}
for key in ("parameters", "inputSchema", "input_schema", "schema"):
value = tool.get(key)
if isinstance(value, dict):
if key == "schema" and isinstance(value.get("parameters"), dict):
return value["parameters"]
return value
return {}


def _mcp_tool_summary(name, tool, server_summary):
"""Return a safe global inventory row for one MCP tool."""
server_summary = server_summary if isinstance(server_summary, dict) else {}
if isinstance(tool, str):
tool = {"name": tool}
elif not isinstance(tool, dict):
tool = {}
tool_name = str(tool.get("name") or name or "")
description = tool.get("description") or ""
if not isinstance(description, str):
description = str(description)
description = _mcp_safe_display_text(description, limit=360)
return {
"name": tool_name,
"server": str(server_summary.get("name") or ""),
"description": description,
"active": bool(server_summary.get("active")),
"enabled": bool(server_summary.get("enabled")),
"status": server_summary.get("status") or "unknown",
"schema_summary": _mcp_schema_summary(_mcp_tool_schema_from_payload(tool)),
}


def _mcp_tools_from_runtime_status(runtime_by_name, server_summaries):
"""Read detailed MCP tool payloads from runtime status when available."""
tools = []
if not isinstance(runtime_by_name, dict):
return tools
for server_name, runtime in runtime_by_name.items():
if not isinstance(runtime, dict):
continue
raw_tools = runtime.get("tools")
if not isinstance(raw_tools, list):
raw_tools = runtime.get("tool_schemas")
if not isinstance(raw_tools, list):
continue
server_summary = server_summaries.get(str(server_name), {"name": str(server_name)})
for index, tool in enumerate(raw_tools):
fallback_name = f"{server_name}:{index}"
summary = _mcp_tool_summary(fallback_name, tool, server_summary)
if summary["name"]:
tools.append(summary)
return tools


def _mcp_tools_from_registry(server_summaries):
"""Read already-registered MCP tool schemas without probing MCP servers."""
try:
from tools.registry import registry
except Exception:
return []
tools = []
try:
names = registry.get_all_tool_names()
except Exception:
return []
for tool_name in names:
try:
toolset = registry.get_toolset_for_tool(tool_name)
except Exception:
continue
if not isinstance(toolset, str) or not toolset.startswith("mcp-"):
continue
server_name = toolset[len("mcp-"):]
schema = registry.get_schema(tool_name) or {}
server_summary = server_summaries.get(server_name, {
"name": server_name,
"enabled": True,
"active": False,
"status": "configured",
})
tools.append(_mcp_tool_summary(tool_name, schema, server_summary))
return tools


def _handle_mcp_tools_list(handler):
"""List known MCP tools from already-available runtime inventory only."""
cfg = get_config()
servers = cfg.get("mcp_servers", {})
if not isinstance(servers, dict):
servers = {}
runtime = _mcp_runtime_status_by_name()
server_summaries = {
str(name): _server_summary(str(name), scfg, runtime.get(str(name)))
for name, scfg in servers.items()
}
tools = _mcp_tools_from_runtime_status(runtime, server_summaries)
source = "mcp_runtime_status"
if not tools:
tools = _mcp_tools_from_registry(server_summaries)
source = "tool_registry" if tools else "none"
tools.sort(key=lambda row: (row.get("server", ""), row.get("name", "")))
unavailable_servers = [
summary["name"] for summary in server_summaries.values()
if summary.get("enabled") and not summary.get("active")
]
return j(handler, {
"tools": tools,
"total": len(tools),
"source": source,
"inventory_scope": "already_known_runtime_only",
"unavailable_servers": unavailable_servers,
})


def _handle_mcp_servers_list(handler):
"""List all configured MCP servers."""
"""List configured MCP servers with safe, read-only runtime visibility."""
cfg = get_config()
servers = cfg.get("mcp_servers", {})
if not isinstance(servers, dict):
servers = {}
result = [_server_summary(name, scfg) for name, scfg in servers.items()]
return j(handler, {"servers": result})
runtime = _mcp_runtime_status_by_name()
result = [
_server_summary(name, scfg, runtime.get(str(name)))
for name, scfg in servers.items()
]
return j(handler, {
"servers": result,
"toggle_supported": False,
"reload_required": True,
})


def _handle_mcp_server_delete(handler, name):
Expand Down
Binary file added docs/pr-media/696/mcp-servers-system-panel.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/pr-media/697/mcp-tools-search-filter.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading