diff --git a/src/arbiter/__main__.py b/src/arbiter/__main__.py index 46151d5..95e4533 100644 --- a/src/arbiter/__main__.py +++ b/src/arbiter/__main__.py @@ -51,6 +51,21 @@ _SEVERITY_RANK = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1} +def _maybe_send_webhook(args: argparse.Namespace, payload_kwargs: dict) -> None: + """If --webhook is set, build a WebhookPayload and send it.""" + webhook_url = getattr(args, "webhook", None) + if not webhook_url: + return + from arbiter.webhooks import WebhookPayload, send_webhook + secret = getattr(args, "webhook_secret", "") or "" + payload = WebhookPayload(**payload_kwargs) + ok = send_webhook(webhook_url, payload, secret=secret) + if ok: + print("Webhook delivered successfully.", file=sys.stderr) + else: + print("WARNING: Webhook delivery failed.", file=sys.stderr) + + def _apply_noise_filter(findings: list[Finding], threshold: int | None) -> list[Finding]: """Apply noise filter if threshold is set. Prints summary and returns filtered findings.""" if threshold is None: @@ -298,6 +313,16 @@ def cmd_score(args: argparse.Namespace) -> None: print(f"Score: n/a ({score.grade}) — no scorable Python LOC | Findings: {score.total_findings} | LOC: {loc:,}") _print_footer() + _maybe_send_webhook(args, { + "repo": repo_path.name, + "score": score.overall if score.is_scorable else 0.0, + "grade": score.grade, + "findings": score.total_findings, + "loc": loc, + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "dimensions": {"lint": score.lint_score, "security": score.security_score, "complexity": score.complexity_score}, + }) + if cfg.fail_under is not None and score.is_scorable and score.overall < cfg.fail_under: print(f"FAIL: Score {score.overall:.1f} is below threshold {cfg.fail_under}", file=sys.stderr) sys.exit(1) @@ -434,6 +459,16 @@ def cmd_certify(args: argparse.Namespace) -> None: ) print(f"Recorded in audit trail: {args.trail}", file=sys.stderr) + _maybe_send_webhook(args, { + "repo": repo_path.name, + "score": result.overall, + "grade": result.decision, + "findings": result.findings_count, + "loc": loc, + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "dimensions": {"code": result.code_score, "governance": result.governance_score, "dependencies": result.dep_score}, + }) + def cmd_audit_trail(args: argparse.Namespace) -> None: """Manage the VERUM-aligned audit trail.""" @@ -1458,6 +1493,17 @@ def cmd_score_url(args: argparse.Namespace) -> None: print(f"Score: n/a ({score.grade}) — no scorable Python LOC | Findings: {score.total_findings} | LOC: {loc:,}") _print_footer() + _maybe_send_webhook(args, { + "repo": repo_name, + "score": score.overall if score.is_scorable else 0.0, + "grade": score.grade, + "findings": score.total_findings, + "loc": loc, + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "dimensions": {"lint": score.lint_score, "security": score.security_score, "complexity": score.complexity_score}, + "url": args.url, + }) + if cfg.fail_under is not None and score.is_scorable and score.overall < cfg.fail_under: print(f"FAIL: Score {score.overall:.1f} is below threshold {cfg.fail_under}", file=sys.stderr) sys.exit(1) @@ -1991,6 +2037,41 @@ def cmd_compliance(args: argparse.Namespace) -> None: print(report) +def cmd_webhook_test(args: argparse.Namespace) -> None: + """Send a test webhook payload to verify connectivity.""" + from arbiter.webhooks import WebhookPayload, detect_format, format_slack, format_discord, send_webhook + + fmt = args.format or detect_format(args.url) + payload = WebhookPayload( + repo="arbiter-test", + score=85.0, + grade="B+", + findings=12, + loc=3500, + timestamp=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + dimensions={"lint": 90.0, "security": 85.0, "complexity": 80.0}, + url="https://github.com/arbiter/test", + ) + + if args.dry_run: + if fmt == "slack": + data = format_slack(payload) + elif fmt == "discord": + data = format_discord(payload) + else: + data = payload.to_dict() + print(json.dumps(data, indent=2)) + return + + secret = args.secret or "" + ok = send_webhook(args.url, payload, secret=secret) + if ok: + print(f"Test webhook delivered to {fmt} endpoint.") + else: + print(f"FAILED: Could not deliver test webhook to {args.url}", file=sys.stderr) + sys.exit(1) + + def main() -> None: parser = argparse.ArgumentParser(description="Arbiter -- Agent-aware code quality system") parser.add_argument("--db", help="Path to SQLite database (default: arbiter_data.db)") @@ -2012,6 +2093,8 @@ def main() -> None: p_score.add_argument("--fail-under", type=float, default=None, help="Exit non-zero if score is below this threshold") p_score.add_argument("--noise-threshold", type=int, default=None, help="Cap findings per rule_id at this number (default: disabled)") p_score.add_argument("--profile", choices=list_profiles(), help="Scoring profile preset (default, enterprise, startup, oss, strict)") + p_score.add_argument("--webhook", type=str, default=None, help="Webhook URL to POST score results to") + p_score.add_argument("--webhook-secret", type=str, default="", help="HMAC-SHA256 secret for webhook signing") # compare p_compare = subparsers.add_parser("compare", help="Score two repos side-by-side and compare") @@ -2031,6 +2114,8 @@ def main() -> None: p_score_url.add_argument("--no-cleanup", action="store_true", help="Keep cloned repo after scoring") p_score_url.add_argument("--noise-threshold", type=int, default=None, help="Cap findings per rule_id at this number (default: disabled)") p_score_url.add_argument("--profile", choices=list_profiles(), help="Scoring profile preset (default, enterprise, startup, oss, strict)") + p_score_url.add_argument("--webhook", type=str, default=None, help="Webhook URL to POST score results to") + p_score_url.add_argument("--webhook-secret", type=str, default="", help="HMAC-SHA256 secret for webhook signing") # diff p_diff = subparsers.add_parser("diff", help="Score only files changed since base branch") @@ -2073,6 +2158,8 @@ def main() -> None: p_certify.add_argument("--trail", default="arbiter_audit.jsonl", help="Audit trail path") p_certify.add_argument("--no-audit", action="store_true", help="Skip audit trail recording") p_certify.add_argument("--fail-on-failed", action="store_true", help="Exit non-zero if FAILED") + p_certify.add_argument("--webhook", type=str, default=None, help="Webhook URL to POST certification results to") + p_certify.add_argument("--webhook-secret", type=str, default="", help="HMAC-SHA256 secret for webhook signing") # audit-trail p_audit = subparsers.add_parser("audit-trail", @@ -2297,6 +2384,14 @@ def main() -> None: p_compliance.add_argument("--noise-threshold", type=int, default=None, help="Suppress rules with more findings than this") p_compliance.add_argument("--profile", help=f"Scoring profile ({', '.join(SCORING_PROFILES)})") + # webhook-test + p_wh = subparsers.add_parser("webhook-test", help="Send a test webhook payload to verify connectivity") + p_wh.add_argument("url", help="Webhook URL to test") + p_wh.add_argument("--format", choices=["slack", "discord", "generic"], default=None, + help="Force output format (auto-detected from URL if omitted)") + p_wh.add_argument("--secret", type=str, default="", help="HMAC-SHA256 secret for signing") + p_wh.add_argument("--dry-run", action="store_true", help="Print formatted payload without sending") + args = parser.parse_args() commands = { @@ -2338,6 +2433,7 @@ def main() -> None: "profiles": cmd_profiles, "team": cmd_team, "compliance": cmd_compliance, + "webhook-test": cmd_webhook_test, } handler = commands.get(args.command) diff --git a/src/arbiter/webhooks.py b/src/arbiter/webhooks.py new file mode 100644 index 0000000..52b7ca8 --- /dev/null +++ b/src/arbiter/webhooks.py @@ -0,0 +1,179 @@ +"""Webhook notification module — send score results to external services. + +Supports Slack (Block Kit) and Discord (embeds) with auto-detection from URL. +Optional HMAC-SHA256 signing for payload verification. +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import urllib.request +import urllib.error +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone + + +@dataclass +class WebhookPayload: + """Score result payload for webhook delivery.""" + + repo: str + score: float + grade: str + findings: int + loc: int + timestamp: str + dimensions: dict = field(default_factory=dict) + url: str = "" + + def to_json(self) -> str: + """Serialize to JSON string.""" + return json.dumps(asdict(self), indent=2) + + def to_dict(self) -> dict: + """Serialize to plain dict.""" + return asdict(self) + + +def _grade_emoji(grade: str) -> str: + """Map letter grade to emoji for chat formatting.""" + return { + "A+": "\u2b50", "A": "\u2705", "A-": "\u2705", + "B+": "\U0001f7e2", "B": "\U0001f7e2", "B-": "\U0001f7e1", + "C+": "\U0001f7e1", "C": "\U0001f7e0", "C-": "\U0001f7e0", + "D": "\U0001f534", "F": "\u274c", "N/A": "\u2753", + }.get(grade, "\u2753") + + +def _grade_color(grade: str) -> int: + """Map letter grade to Discord embed color (decimal).""" + if grade.startswith("A"): + return 0x2ECC71 # green + if grade.startswith("B"): + return 0x3498DB # blue + if grade.startswith("C"): + return 0xF39C12 # orange + return 0xE74C3C # red + + +def format_slack(payload: WebhookPayload) -> dict: + """Format payload as Slack Block Kit message.""" + emoji = _grade_emoji(payload.grade) + dims = payload.dimensions + dim_parts = [f"*{k.title()}*: {v}" for k, v in dims.items()] if dims else [] + dim_text = " | ".join(dim_parts) if dim_parts else "No dimension breakdown" + + blocks = [ + { + "type": "header", + "text": { + "type": "plain_text", + "text": f"{emoji} Arbiter Score: {payload.repo}", + }, + }, + { + "type": "section", + "fields": [ + {"type": "mrkdwn", "text": f"*Score:* {payload.score:.1f}"}, + {"type": "mrkdwn", "text": f"*Grade:* {payload.grade}"}, + {"type": "mrkdwn", "text": f"*Findings:* {payload.findings:,}"}, + {"type": "mrkdwn", "text": f"*LOC:* {payload.loc:,}"}, + ], + }, + { + "type": "section", + "text": {"type": "mrkdwn", "text": dim_text}, + }, + { + "type": "context", + "elements": [ + {"type": "mrkdwn", "text": f"Scored at {payload.timestamp} by Arbiter"}, + ], + }, + ] + + if payload.url: + blocks.insert( + -1, + { + "type": "section", + "text": {"type": "mrkdwn", "text": f"<{payload.url}|View Repository>"}, + }, + ) + + return {"blocks": blocks} + + +def format_discord(payload: WebhookPayload) -> dict: + """Format payload as Discord embed.""" + emoji = _grade_emoji(payload.grade) + color = _grade_color(payload.grade) + + fields = [ + {"name": "Score", "value": f"{payload.score:.1f}", "inline": True}, + {"name": "Grade", "value": f"{emoji} {payload.grade}", "inline": True}, + {"name": "Findings", "value": f"{payload.findings:,}", "inline": True}, + {"name": "LOC", "value": f"{payload.loc:,}", "inline": True}, + ] + + for k, v in payload.dimensions.items(): + fields.append({"name": k.title(), "value": str(v), "inline": True}) + + embed: dict = { + "title": f"Arbiter Score: {payload.repo}", + "color": color, + "fields": fields, + "footer": {"text": f"Scored at {payload.timestamp} by Arbiter"}, + } + + if payload.url: + embed["url"] = payload.url + + return {"embeds": [embed]} + + +def detect_format(url: str) -> str: + """Auto-detect webhook service from URL pattern. + + Returns 'slack', 'discord', or 'generic'. + """ + if "hooks.slack.com" in url or "slack.com/api" in url: + return "slack" + if "discord.com/api/webhooks" in url or "discordapp.com/api/webhooks" in url: + return "discord" + return "generic" + + +def _sign_payload(body: bytes, secret: str) -> str: + """Compute HMAC-SHA256 signature for the payload body.""" + return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest() + + +def send_webhook(url: str, payload: WebhookPayload, secret: str = "") -> bool: + """Send score notification to a webhook URL. + + Auto-detects Slack vs Discord from URL and formats accordingly. + Returns True on success, False on any failure. Never raises. + """ + try: + fmt = detect_format(url) + if fmt == "slack": + data = format_slack(payload) + elif fmt == "discord": + data = format_discord(payload) + else: + data = payload.to_dict() + + body = json.dumps(data).encode("utf-8") + + headers = {"Content-Type": "application/json"} + if secret: + headers["X-Arbiter-Signature"] = _sign_payload(body, secret) + + req = urllib.request.Request(url, data=body, headers=headers, method="POST") + with urllib.request.urlopen(req, timeout=10) as resp: + return resp.status < 400 + except Exception: + return False diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py new file mode 100644 index 0000000..feae36e --- /dev/null +++ b/tests/test_webhooks.py @@ -0,0 +1,287 @@ +"""Tests for arbiter.webhooks — webhook notification module.""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from arbiter.webhooks import ( + WebhookPayload, + detect_format, + format_discord, + format_slack, + send_webhook, +) + + +def _make_payload(**overrides) -> WebhookPayload: + defaults = { + "repo": "test-repo", + "score": 82.5, + "grade": "B+", + "findings": 42, + "loc": 5000, + "timestamp": "2026-04-18T12:00:00Z", + "dimensions": {"lint": 90.0, "security": 85.0, "complexity": 72.5}, + "url": "https://github.com/owner/test-repo", + } + defaults.update(overrides) + return WebhookPayload(**defaults) + + +# --- Payload serialization --- + +class TestPayloadSerialization: + def test_to_json_roundtrip(self): + p = _make_payload() + data = json.loads(p.to_json()) + assert data["repo"] == "test-repo" + assert data["score"] == 82.5 + assert data["grade"] == "B+" + assert data["findings"] == 42 + assert data["loc"] == 5000 + assert data["dimensions"]["lint"] == 90.0 + + def test_to_dict(self): + p = _make_payload() + d = p.to_dict() + assert isinstance(d, dict) + assert d["repo"] == "test-repo" + assert d["url"] == "https://github.com/owner/test-repo" + + def test_default_dimensions_and_url(self): + p = WebhookPayload( + repo="x", score=50.0, grade="C", findings=10, + loc=100, timestamp="now", + ) + assert p.dimensions == {} + assert p.url == "" + + +# --- HMAC signing --- + +class TestHMACSigning: + def test_hmac_signature_matches(self): + p = _make_payload() + secret = "test-secret-key" + body = json.dumps(p.to_dict()).encode("utf-8") + expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + + # send_webhook should produce the same sig; test the internal path + from arbiter.webhooks import _sign_payload + assert _sign_payload(body, secret) == expected + + def test_different_secrets_produce_different_sigs(self): + body = b'{"repo":"test"}' + from arbiter.webhooks import _sign_payload + sig1 = _sign_payload(body, "secret-a") + sig2 = _sign_payload(body, "secret-b") + assert sig1 != sig2 + + +# --- Slack format --- + +class TestSlackFormat: + def test_has_blocks(self): + msg = format_slack(_make_payload()) + assert "blocks" in msg + assert len(msg["blocks"]) >= 3 + + def test_header_contains_repo(self): + msg = format_slack(_make_payload()) + header = msg["blocks"][0] + assert header["type"] == "header" + assert "test-repo" in header["text"]["text"] + + def test_fields_contain_score_and_grade(self): + msg = format_slack(_make_payload()) + section = msg["blocks"][1] + field_texts = [f["text"] for f in section["fields"]] + assert any("82.5" in t for t in field_texts) + assert any("B+" in t for t in field_texts) + + def test_url_block_present_when_url_set(self): + msg = format_slack(_make_payload(url="https://github.com/x/y")) + texts = json.dumps(msg) + assert "View Repository" in texts + + def test_no_url_block_when_empty(self): + msg = format_slack(_make_payload(url="")) + texts = json.dumps(msg) + assert "View Repository" not in texts + + def test_dimension_text(self): + msg = format_slack(_make_payload()) + texts = json.dumps(msg) + assert "Lint" in texts + assert "Security" in texts + + +# --- Discord format --- + +class TestDiscordFormat: + def test_has_embeds(self): + msg = format_discord(_make_payload()) + assert "embeds" in msg + assert len(msg["embeds"]) == 1 + + def test_embed_title(self): + msg = format_discord(_make_payload()) + assert "test-repo" in msg["embeds"][0]["title"] + + def test_embed_color_for_grades(self): + a_msg = format_discord(_make_payload(grade="A")) + b_msg = format_discord(_make_payload(grade="B")) + f_msg = format_discord(_make_payload(grade="F")) + assert a_msg["embeds"][0]["color"] == 0x2ECC71 + assert b_msg["embeds"][0]["color"] == 0x3498DB + assert f_msg["embeds"][0]["color"] == 0xE74C3C + + def test_embed_fields(self): + msg = format_discord(_make_payload()) + fields = msg["embeds"][0]["fields"] + names = [f["name"] for f in fields] + assert "Score" in names + assert "Grade" in names + assert "Findings" in names + assert "LOC" in names + + def test_dimensions_as_fields(self): + msg = format_discord(_make_payload()) + fields = msg["embeds"][0]["fields"] + names = [f["name"] for f in fields] + assert "Lint" in names + assert "Security" in names + assert "Complexity" in names + + def test_url_in_embed_when_set(self): + msg = format_discord(_make_payload(url="https://github.com/x/y")) + assert msg["embeds"][0]["url"] == "https://github.com/x/y" + + def test_no_url_when_empty(self): + msg = format_discord(_make_payload(url="")) + assert "url" not in msg["embeds"][0] + + +# --- Format detection --- + +class TestDetectFormat: + def test_slack_url(self): + assert detect_format("https://hooks.slack.com/services/T0/B0/xxx") == "slack" + + def test_discord_url(self): + assert detect_format("https://discord.com/api/webhooks/123/abc") == "discord" + + def test_discordapp_url(self): + assert detect_format("https://discordapp.com/api/webhooks/123/abc") == "discord" + + def test_generic_url(self): + assert detect_format("https://example.com/webhook") == "generic" + + +# --- send_webhook --- + +class TestSendWebhook: + @patch("arbiter.webhooks.urllib.request.urlopen") + def test_send_success(self, mock_urlopen): + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + mock_urlopen.return_value = mock_resp + + result = send_webhook("https://hooks.slack.com/services/T/B/x", _make_payload()) + assert result is True + mock_urlopen.assert_called_once() + + # Verify the request was JSON + req = mock_urlopen.call_args[0][0] + assert req.get_header("Content-type") == "application/json" + + @patch("arbiter.webhooks.urllib.request.urlopen") + def test_send_with_hmac_header(self, mock_urlopen): + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + mock_urlopen.return_value = mock_resp + + result = send_webhook( + "https://example.com/hook", _make_payload(), secret="my-secret" + ) + assert result is True + + req = mock_urlopen.call_args[0][0] + sig = req.get_header("X-arbiter-signature") + assert sig is not None + assert len(sig) == 64 # SHA256 hex digest + + @patch("arbiter.webhooks.urllib.request.urlopen") + def test_send_failure_returns_false(self, mock_urlopen): + mock_urlopen.side_effect = urllib_timeout_error() + result = send_webhook("https://example.com/hook", _make_payload()) + assert result is False + + @patch("arbiter.webhooks.urllib.request.urlopen") + def test_send_http_error_returns_false(self, mock_urlopen): + import urllib.error + mock_urlopen.side_effect = urllib.error.HTTPError( + "https://example.com", 500, "Server Error", {}, None + ) + result = send_webhook("https://example.com/hook", _make_payload()) + assert result is False + + @patch("arbiter.webhooks.urllib.request.urlopen") + def test_discord_format_used_for_discord_url(self, mock_urlopen): + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + mock_urlopen.return_value = mock_resp + + send_webhook("https://discord.com/api/webhooks/123/abc", _make_payload()) + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data) + assert "embeds" in body + + @patch("arbiter.webhooks.urllib.request.urlopen") + def test_generic_format_for_unknown_url(self, mock_urlopen): + mock_resp = MagicMock() + mock_resp.status = 200 + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + mock_urlopen.return_value = mock_resp + + send_webhook("https://example.com/hook", _make_payload()) + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data) + assert "repo" in body # raw payload dict + + +def urllib_timeout_error(): + """Create a urllib timeout exception.""" + import socket + return socket.timeout("timed out") + + +# --- CLI webhook-test help --- + +class TestWebhookTestHelp: + def test_webhook_test_subcommand_exists(self): + """Verify webhook-test subcommand is registered in argparse.""" + import sys + result = subprocess.run( + [sys.executable, "-m", "arbiter", "webhook-test", "--help"], + capture_output=True, text=True, timeout=10, + cwd=str(Path(__file__).parent.parent), + env={"PYTHONPATH": "src", "PATH": "/usr/bin:/bin"}, + ) + # Should show help, not error + assert result.returncode == 0 + assert "webhook" in result.stdout.lower() or "url" in result.stdout.lower()