Add Dominion Observatory trust verification guardrail plugin #18
vdineshk wants to merge 1 commit into
Adds a new guardrail plugin that checks MCP server behavioral trust scores via the Dominion Observatory API before forwarding tool calls. Servers scoring below the configurable threshold (default: 60) are blocked. Includes 5-minute score caching and fail-open/fail-closed modes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
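The caching and fail-open/fail-closed behavior described above can be illustrated with a minimal sketch. It is not the plugin's code: `TTLScoreCache` and `should_forward` are hypothetical names standing in for the PR's `TrustScoreCache` and its request-handling logic, and the injectable `clock` parameter is an assumption added so that expiry can be exercised without sleeping.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class TTLScoreCache:
    """Hypothetical stand-in for the PR's TrustScoreCache (illustration only)."""

    def __init__(
        self,
        ttl_seconds: float = 300.0,  # 5 minutes, the default named in the PR
        clock: Callable[[], float] = time.monotonic,  # assumption: injectable clock
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        # server name -> (score, time the score was stored)
        self._entries: Dict[str, Tuple[float, float]] = {}

    def get(self, server_name: str) -> Optional[float]:
        """Return the cached score, or None if it is missing or expired."""
        entry = self._entries.get(server_name)
        if entry is None:
            return None
        score, stored_at = entry
        if self._clock() - stored_at >= self._ttl:
            del self._entries[server_name]  # drop the stale entry
            return None
        return score

    def put(self, server_name: str, score: float) -> None:
        self._entries[server_name] = (score, self._clock())


def should_forward(
    score: Optional[float], threshold: float = 60, fail_open: bool = False
) -> bool:
    """Decide whether a tool call may be forwarded.

    score is None when no score could be obtained (for example, the API was
    unreachable); in that case the fail_open setting decides.
    """
    if score is None:
        return fail_open
    return score >= threshold  # scores below the threshold are blocked
```

With the defaults, a score of exactly 60 passes and a failed lookup blocks the call; setting `fail_open=True` flips only the failed-lookup case.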
📝 Walkthrough
This PR adds a new Dominion Observatory Trust Verification guardrail plugin to the MCP Gateway. The plugin gates tool-call requests based on behavioral trust scores fetched from an external API, with built-in TTL caching and configurable handling of API failures.
Changes
Dominion Trust Verification Plugin
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 2
🧹 Nitpick comments (1)
mcp_gateway/plugins/guardrails/dominion.py (1)
98-107: ⚡ Quick win: Validate config values against documented bounds.
trust_threshold, cache_ttl_seconds, and request_timeout_seconds are accepted without validation, so invalid values can silently disable or over-block trust enforcement.
Proposed guard clauses
     def load(self, config: Optional[Dict[str, Any]] = None) -> None:
@@
-        self.trust_threshold = config.get("trust_threshold", DEFAULT_TRUST_THRESHOLD)
-        self.request_timeout = config.get(
+        trust_threshold = config.get("trust_threshold", DEFAULT_TRUST_THRESHOLD)
+        request_timeout = config.get(
             "request_timeout_seconds", DEFAULT_REQUEST_TIMEOUT_SECONDS
         )
+        cache_ttl = config.get("cache_ttl_seconds", DEFAULT_CACHE_TTL_SECONDS)
+
+        if not isinstance(trust_threshold, int) or not (0 <= trust_threshold <= 100):
+            raise ValueError("trust_threshold must be an integer between 0 and 100")
+        if not isinstance(request_timeout, int) or request_timeout <= 0:
+            raise ValueError("request_timeout_seconds must be a positive integer")
+        if not isinstance(cache_ttl, int) or cache_ttl <= 0:
+            raise ValueError("cache_ttl_seconds must be a positive integer")
+
+        self.trust_threshold = trust_threshold
+        self.request_timeout = request_timeout
         self.fail_open = config.get("fail_open", False)
-
-        cache_ttl = config.get("cache_ttl_seconds", DEFAULT_CACHE_TTL_SECONDS)
         self._cache = TrustScoreCache(ttl_seconds=cache_ttl)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@mcp_gateway/plugins/guardrails/dominion.py` around lines 98 - 107, validate incoming config values for trust_threshold, cache_ttl_seconds, and request_timeout_seconds before using them: check that trust_threshold (used to set self.trust_threshold) is within 0–100, matching the 0-100 trust_score scale and the default of 60; that cache_ttl_seconds (used to construct TrustScoreCache(ttl_seconds=...)) is a positive integer within your documented max/min; and that request_timeout_seconds (assigned to self.request_timeout) is a positive number within acceptable bounds. If a value is out of range, either raise a clear ValueError, or clamp it to the nearest valid bound or fall back to the matching DEFAULT_* constant (DEFAULT_TRUST_THRESHOLD, DEFAULT_CACHE_TTL_SECONDS, DEFAULT_REQUEST_TIMEOUT_SECONDS). Perform these checks in load() immediately before assigning api_base_url/trust_threshold/request_timeout/cache_ttl so invalid configs cannot silently disable or over-enforce trust rules.
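As a rough illustration of the validation requested above, the following standalone sketch checks the three values and raises on anything out of range. `validate_config` and `_is_number` are hypothetical helpers, not part of the plugin, and the request-timeout default of 5 seconds is an assumption because the real `DEFAULT_REQUEST_TIMEOUT_SECONDS` value is not shown in this review. Unlike the proposed guard clauses, this variant accepts floats and rejects booleans, which is a design choice rather than something the review prescribes.

```python
import math
from numbers import Real
from typing import Any, Dict, Optional

DEFAULT_TRUST_THRESHOLD = 60         # default stated in the PR description
DEFAULT_CACHE_TTL_SECONDS = 300      # default stated in the PR description
DEFAULT_REQUEST_TIMEOUT_SECONDS = 5  # ASSUMPTION: real default not shown here


def _is_number(value: Any) -> bool:
    """True for finite ints/floats; bool is excluded although it subclasses int."""
    return (
        isinstance(value, Real)
        and not isinstance(value, bool)
        and math.isfinite(value)
    )


def validate_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: return validated settings or raise ValueError."""
    config = config or {}
    threshold = config.get("trust_threshold", DEFAULT_TRUST_THRESHOLD)
    timeout = config.get("request_timeout_seconds", DEFAULT_REQUEST_TIMEOUT_SECONDS)
    cache_ttl = config.get("cache_ttl_seconds", DEFAULT_CACHE_TTL_SECONDS)

    if not _is_number(threshold) or not 0 <= threshold <= 100:
        raise ValueError("trust_threshold must be a number between 0 and 100")
    if not _is_number(timeout) or timeout <= 0:
        raise ValueError("request_timeout_seconds must be a positive number")
    if not _is_number(cache_ttl) or cache_ttl <= 0:
        raise ValueError("cache_ttl_seconds must be a positive number")

    return {
        "trust_threshold": threshold,
        "request_timeout_seconds": timeout,
        "cache_ttl_seconds": cache_ttl,
        "fail_open": bool(config.get("fail_open", False)),
    }
```

Raising instead of clamping makes a misconfigured threshold visible at startup; clamping would keep the gateway running but could hide the mistake.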
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 47c08ea2-c0a2-474f-a5fd-15b1f0090fe0
📒 Files selected for processing (3)
- mcp_gateway/plugins/guardrails/__init__.py
- mcp_gateway/plugins/guardrails/dominion.py
- tests/test_dominion_trust.py
📜 Review details
🧰 Additional context used
🪛 Ruff (0.15.13)
mcp_gateway/plugins/guardrails/dominion.py
[error] 131-134: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.
(S310)
[error] 135-135: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.
(S310)
🔇 Additional comments (2)
tests/test_dominion_trust.py (1)
21-291: LGTM!
mcp_gateway/plugins/guardrails/__init__.py (1)
8-15: LGTM!
        url = f"{self.api_base_url.rstrip('/')}/benchmark/{server_name}"
        logger.debug(f"Fetching trust score from: {url}")

        try:
            req = urllib.request.Request(
                url,
                headers={"Accept": "application/json", "User-Agent": "mcp-gateway-dominion-plugin/1.0"},
            )
            with urllib.request.urlopen(req, timeout=self.request_timeout) as resp:
🧩 Analysis chain
🌐 Web query:
Does Python urllib.request.urlopen accept file:// URLs, and what schemes are supported by default?
💡 Result:
Yes, Python's urllib.request.urlopen function accepts file:// URLs [1][2][3]. By default, the urllib.request module supports the following URL schemes:
- HTTP [1][2][3]
- HTTPS [1][2]
- FTP [1][2][3]
- file (local files) [1][2][3]
- data (data URLs) [1][2][3]
When using file:// URLs, the function utilizes the FileHandler class to open local files [3]. Note that if a URL appears to point to a local file but cannot be opened, the module may attempt to re-interpret it using the FTP protocol for backward compatibility, which can occasionally lead to confusing error messages [3]. Additionally, while urlopen provides a consistent interface across these schemes, certain features like timeout parameters are specifically limited to HTTP, HTTPS, and FTP connections and do not apply to file or data URLs [1][2].
Citations:
- 1: https://docs.python.org/3/library/urllib.request.html?highlight=urllib
- 2: https://docs.python.org/3/library/urllib.request.html
- 3: https://docs.python.org/3/library/urllib.request.html?highlight=requests
🏁 Script executed:
#!/bin/bash
set -euo pipefail
ls -la
sed -n '1,220p' mcp_gateway/plugins/guardrails/dominion.py
Repository: lasso-security/mcp-gateway
Length of output: 9147
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Continue showing the rest of the file around where the snippet left off.
sed -n '220,420p' mcp_gateway/plugins/guardrails/dominion.py
# See if there is any other use/validation of api_base_url or server_name in this plugin.
rg -n "api_base_url|server_name" mcp_gateway/plugins/guardrails/dominion.py
# Search the repo for where process_request/context.server_name is populated.
rg -n "class PluginContext|server_name" mcp_gateway | head -n 50
Repository: lasso-security/mcp-gateway
Length of output: 7624
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '200,420p' mcp_gateway/server.py
sed -n '1,260p' mcp_gateway/sanitizers.py
# See if any sanitizer touches server_name or plugin configs.
rg -n "sanitize.*server_name|server_name=.*sanitize|api_base_url" mcp_gateway | head -n 100
Repository: lasso-security/mcp-gateway
Length of output: 17141
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find where Server.name is set and how it maps from config.
rg -n "self\.name\s*=" mcp_gateway/server.py mcp_gateway/gateway.py mcp_gateway/**/*.py || true
# Show the Server class header + __init__ area.
sed -n '1,220p' mcp_gateway/server.py
# Also inspect gateway initialization of proxied servers/context.
sed -n '1,220p' mcp_gateway/gateway.py
# Check sanitization/dispatch entrypoints for PluginContext fields.
rg -n "PluginContext\(" -S mcp_gateway | head -n 50
Repository: lasso-security/mcp-gateway
Length of output: 16931
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '1,220p' mcp_gateway/plugins/manager.py
sed -n '220,380p' mcp_gateway/plugins/manager.py
sed -n '1,220p' mcp_gateway/config.py
rg -n "api_base_url|trust_threshold|request_timeout_seconds|fail_open|cache_ttl_seconds" mcp_gateway | head -n 200
Repository: lasso-security/mcp-gateway
Length of output: 22847
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find where PluginManager is created and whether it passes config to plugins.
rg -n "PluginManager\(" -S mcp_gateway | head -n 50
# Inspect gateway/main initialization area.
sed -n '1,240p' mcp_gateway/gateway.py
# Search for any passing of plugin config into load().
rg -n "\.load\(" mcp_gateway/plugins | head -n 50
Repository: lasso-security/mcp-gateway
Length of output: 9017
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '320,460p' mcp_gateway/gateway.py
rg -n "enabled_plugins|enabled_types" mcp_gateway/gateway.py mcp_gateway/*.py mcp_gateway/**/gateway*.py || true
# Locate where proxied servers are created from config and what "name" is.
rg -n "Server\(" mcp_gateway/gateway.py mcp_gateway | head -n 50
sed -n '460,620p' mcp_gateway/gateway.py
Repository: lasso-security/mcp-gateway
Length of output: 13081
Harden Dominion trust-score URL construction (escape server_name; restrict api_base_url schemes).
In mcp_gateway/plugins/guardrails/dominion.py, server_name is interpolated directly into the request path; URL-encode it to avoid malformed URLs / path-or-query injection. Also, urllib.request.urlopen() can handle non-HTTP schemes (e.g., file://), so enforcing http/https on api_base_url is good defense-in-depth—though the gateway currently calls plugin_instance.load({}), so api_base_url stays at the default https://... unless that behavior changes.
Suggested hardening
+import urllib.parse
@@
- url = f"{self.api_base_url.rstrip('/')}/benchmark/{server_name}"
+ base_url = self.api_base_url.rstrip("/")
+ parsed = urllib.parse.urlsplit(base_url)
+ if parsed.scheme not in {"http", "https"}:
+ logger.error(f"Unsupported API URL scheme: {parsed.scheme}")
+ return None
+
+ safe_server_name = urllib.parse.quote(server_name, safe="")
+        url = f"{base_url}/benchmark/{safe_server_name}"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@mcp_gateway/plugins/guardrails/dominion.py` around lines 127 - 135, The URL
construction currently interpolates server_name raw and allows any scheme on
api_base_url; update the code that builds the trust-score URL to URL-encode
server_name (use urllib.parse.quote for a path segment) and validate
api_base_url's scheme by parsing it with urllib.parse.urlparse and only allowing
'http' or 'https' (log/raise if not). Locate the block using api_base_url,
server_name, urllib.request.Request and urllib.request.urlopen and change url =
f"{self.api_base_url.rstrip('/')}/benchmark/{server_name}" to build the path
with the quoted server_name and perform scheme validation using the parsed
result before issuing the request (leave request_timeout and headers handling
unchanged).
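The hardening described above can be sketched as a small standalone function. `build_trust_score_url` and `ALLOWED_SCHEMES` are hypothetical names chosen for illustration, and raising `ValueError` on a bad base URL is one option; the suggested diff logs an error and returns `None` instead. Only standard-library calls (`urllib.parse.urlsplit`, `urllib.parse.quote`) are used.

```python
import urllib.parse

# Only plain web schemes are accepted; file:, ftp:, data: and others are rejected.
ALLOWED_SCHEMES = {"http", "https"}


def build_trust_score_url(api_base_url: str, server_name: str) -> str:
    """Hypothetical helper: build the /benchmark/{server_name} URL safely."""
    base = api_base_url.rstrip("/")
    parsed = urllib.parse.urlsplit(base)
    # urlsplit lowercases the scheme, so "HTTPS://..." is accepted as well.
    if parsed.scheme not in ALLOWED_SCHEMES or not parsed.netloc:
        raise ValueError(f"Unsupported API base URL: {api_base_url!r}")

    # safe="" also percent-encodes "/", so the name stays one path segment
    # and cannot smuggle in extra segments or a query string.
    safe_name = urllib.parse.quote(server_name, safe="")
    return f"{base}/benchmark/{safe_name}"
```

For example, a server name such as `my server/../x?y=1` is encoded into a single path segment rather than altering the request path or adding a query string.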
        trust_score = result.get("trust_score")
        if trust_score is None:
            logger.warning(
                f"No trust_score field in API response for server '{server_name}': {result}"
            )
            if self.fail_open:
                return context.arguments
            return None

        if trust_score < self.trust_threshold:
            logger.warning(
Guard against non-numeric trust_score to avoid runtime failure.
If trust_score is returned as a string/object, Line 211 can raise TypeError and crash the guardrail path instead of applying fail_open/fail_closed.
Type-safe trust score handling
         trust_score = result.get("trust_score")
         if trust_score is None:
@@
             if self.fail_open:
                 return context.arguments
             return None
+
+        if not isinstance(trust_score, (int, float)) or isinstance(trust_score, bool):
+            logger.warning(
+                f"Invalid trust_score type for server '{server_name}': {type(trust_score).__name__}"
+            )
+            return context.arguments if self.fail_open else None
📝 Committable suggestion
        trust_score = result.get("trust_score")
        if trust_score is None:
            logger.warning(
                f"No trust_score field in API response for server '{server_name}': {result}"
            )
            if self.fail_open:
                return context.arguments
            return None

        if not isinstance(trust_score, (int, float)) or isinstance(trust_score, bool):
            logger.warning(
                f"Invalid trust_score type for server '{server_name}': {type(trust_score).__name__}"
            )
            return context.arguments if self.fail_open else None

        if trust_score < self.trust_threshold:
            logger.warning(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@mcp_gateway/plugins/guardrails/dominion.py` around lines 202 - 212, The code
assumes result.get("trust_score") is numeric; guard against non-numeric values
by validating or coercing trust_score before comparing to self.trust_threshold:
after obtaining trust_score from result in dominion.py, check
isinstance(trust_score, (int, float)) or attempt to coerce via float() inside a
try/except, log a warning including server_name and the bad value on failure,
and then honor self.fail_open by returning context.arguments (or return None if
closed) instead of allowing a TypeError to propagate during the trust_score <
self.trust_threshold comparison.
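The prompt above allows either a strict type check or coercion via `float()`. A minimal sketch of the coercion variant follows; `parse_trust_score` and `is_allowed` are hypothetical names, not functions from the plugin, and treating booleans and non-finite values as invalid is an assumption in line with the suggested diff's `bool` exclusion.

```python
import math
from typing import Any, Optional


def parse_trust_score(raw: Any) -> Optional[float]:
    """Hypothetical helper: return the score as a float, or None if unusable."""
    # bool is a subclass of int, so True would otherwise become 1.0.
    if raw is None or isinstance(raw, bool):
        return None
    try:
        score = float(raw)
    except (TypeError, ValueError, OverflowError):
        # TypeError: dicts/lists; ValueError: non-numeric strings;
        # OverflowError: integers too large for a float.
        return None
    return score if math.isfinite(score) else None


def is_allowed(raw: Any, threshold: float = 60, fail_open: bool = False) -> bool:
    """Apply the threshold, deferring to fail_open when the score is unusable."""
    score = parse_trust_score(raw)
    if score is None:
        return fail_open
    return score >= threshold
```

The coercion variant accepts a numeric string such as `"72"`, which the stricter `isinstance` check in the suggested diff would reject; which behavior is preferable depends on how much the API response format is trusted.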
Summary
- Adds a `DominionTrustPlugin` guardrail that checks MCP server behavioral trust scores via the Dominion Observatory API before forwarding tool calls

Details
The plugin integrates with the existing guardrail plugin system and can be enabled via `--plugin dominion` or `-p dominion`.

Configuration options (via plugin config):
- `trust_threshold` (default: 60) - minimum score to allow tool calls
- `cache_ttl_seconds` (default: 300) - how long to cache trust scores
- `fail_open` (default: false) - whether to allow requests when the API is unreachable
- `api_base_url` - override the Dominion Observatory API endpoint

API:
`GET /benchmark/{server_name}` returns `{trust_score: 0-100, ...}`

Files Changed
- `mcp_gateway/plugins/guardrails/dominion.py` - Plugin implementation with caching
- `mcp_gateway/plugins/guardrails/__init__.py` - Register the new plugin
- `tests/test_dominion_trust.py` - Comprehensive test suite (19 tests covering caching, trust decisions, error handling, fail-open/closed modes)

Test plan
- Run `pytest tests/test_dominion_trust.py` to verify all unit tests pass
- Run with `--plugin dominion` and verify it blocks low-trust servers

🤖 Generated with Claude Code