Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/arbiter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@
_SEVERITY_RANK = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}


def _maybe_send_webhook(args: argparse.Namespace, payload_kwargs: dict) -> None:
"""If --webhook is set, build a WebhookPayload and send it."""
webhook_url = getattr(args, "webhook", None)
if not webhook_url:
return
from arbiter.webhooks import WebhookPayload, send_webhook
secret = getattr(args, "webhook_secret", "") or ""
payload = WebhookPayload(**payload_kwargs)
ok = send_webhook(webhook_url, payload, secret=secret)
if ok:
print("Webhook delivered successfully.", file=sys.stderr)
else:
print("WARNING: Webhook delivery failed.", file=sys.stderr)


def _apply_noise_filter(findings: list[Finding], threshold: int | None) -> list[Finding]:
"""Apply noise filter if threshold is set. Prints summary and returns filtered findings."""
if threshold is None:
Expand Down Expand Up @@ -298,6 +313,16 @@ def cmd_score(args: argparse.Namespace) -> None:
print(f"Score: n/a ({score.grade}) — no scorable Python LOC | Findings: {score.total_findings} | LOC: {loc:,}")
_print_footer()

_maybe_send_webhook(args, {
"repo": repo_path.name,
"score": score.overall if score.is_scorable else 0.0,
"grade": score.grade,
"findings": score.total_findings,
"loc": loc,
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"dimensions": {"lint": score.lint_score, "security": score.security_score, "complexity": score.complexity_score},
})

if cfg.fail_under is not None and score.is_scorable and score.overall < cfg.fail_under:
print(f"FAIL: Score {score.overall:.1f} is below threshold {cfg.fail_under}", file=sys.stderr)
sys.exit(1)
Expand Down Expand Up @@ -434,6 +459,16 @@ def cmd_certify(args: argparse.Namespace) -> None:
)
print(f"Recorded in audit trail: {args.trail}", file=sys.stderr)

_maybe_send_webhook(args, {
"repo": repo_path.name,
"score": result.overall,
"grade": result.decision,
"findings": result.findings_count,
"loc": loc,
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"dimensions": {"code": result.code_score, "governance": result.governance_score, "dependencies": result.dep_score},
})


def cmd_audit_trail(args: argparse.Namespace) -> None:
"""Manage the VERUM-aligned audit trail."""
Expand Down Expand Up @@ -1458,6 +1493,17 @@ def cmd_score_url(args: argparse.Namespace) -> None:
print(f"Score: n/a ({score.grade}) — no scorable Python LOC | Findings: {score.total_findings} | LOC: {loc:,}")
_print_footer()

_maybe_send_webhook(args, {
"repo": repo_name,
"score": score.overall if score.is_scorable else 0.0,
"grade": score.grade,
"findings": score.total_findings,
"loc": loc,
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"dimensions": {"lint": score.lint_score, "security": score.security_score, "complexity": score.complexity_score},
"url": args.url,
})

if cfg.fail_under is not None and score.is_scorable and score.overall < cfg.fail_under:
print(f"FAIL: Score {score.overall:.1f} is below threshold {cfg.fail_under}", file=sys.stderr)
sys.exit(1)
Expand Down Expand Up @@ -1991,6 +2037,41 @@ def cmd_compliance(args: argparse.Namespace) -> None:
print(report)


def cmd_webhook_test(args: argparse.Namespace) -> None:
"""Send a test webhook payload to verify connectivity."""
from arbiter.webhooks import WebhookPayload, detect_format, format_slack, format_discord, send_webhook

fmt = args.format or detect_format(args.url)
payload = WebhookPayload(
repo="arbiter-test",
score=85.0,
grade="B+",
findings=12,
loc=3500,
timestamp=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
dimensions={"lint": 90.0, "security": 85.0, "complexity": 80.0},
url="https://github.com/arbiter/test",
)

if args.dry_run:
if fmt == "slack":
data = format_slack(payload)
elif fmt == "discord":
data = format_discord(payload)
else:
data = payload.to_dict()
print(json.dumps(data, indent=2))
return

secret = args.secret or ""
ok = send_webhook(args.url, payload, secret=secret)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Apply --format override when sending webhook-test payload

The webhook-test --format flag is documented as a forced format, but in non---dry-run mode the code always calls send_webhook(args.url, ...), which re-detects format from URL and ignores the user-provided override; this breaks testing for Slack/Discord payloads on custom domains (or tunneled endpoints) and can print a misleading success message with the requested format even when a different payload was sent.

Useful? React with 👍 / 👎.

if ok:
print(f"Test webhook delivered to {fmt} endpoint.")
else:
print(f"FAILED: Could not deliver test webhook to {args.url}", file=sys.stderr)
sys.exit(1)


def main() -> None:
parser = argparse.ArgumentParser(description="Arbiter -- Agent-aware code quality system")
parser.add_argument("--db", help="Path to SQLite database (default: arbiter_data.db)")
Expand All @@ -2012,6 +2093,8 @@ def main() -> None:
p_score.add_argument("--fail-under", type=float, default=None, help="Exit non-zero if score is below this threshold")
p_score.add_argument("--noise-threshold", type=int, default=None, help="Cap findings per rule_id at this number (default: disabled)")
p_score.add_argument("--profile", choices=list_profiles(), help="Scoring profile preset (default, enterprise, startup, oss, strict)")
p_score.add_argument("--webhook", type=str, default=None, help="Webhook URL to POST score results to")
p_score.add_argument("--webhook-secret", type=str, default="", help="HMAC-SHA256 secret for webhook signing")

# compare
p_compare = subparsers.add_parser("compare", help="Score two repos side-by-side and compare")
Expand All @@ -2031,6 +2114,8 @@ def main() -> None:
p_score_url.add_argument("--no-cleanup", action="store_true", help="Keep cloned repo after scoring")
p_score_url.add_argument("--noise-threshold", type=int, default=None, help="Cap findings per rule_id at this number (default: disabled)")
p_score_url.add_argument("--profile", choices=list_profiles(), help="Scoring profile preset (default, enterprise, startup, oss, strict)")
p_score_url.add_argument("--webhook", type=str, default=None, help="Webhook URL to POST score results to")
p_score_url.add_argument("--webhook-secret", type=str, default="", help="HMAC-SHA256 secret for webhook signing")

# diff
p_diff = subparsers.add_parser("diff", help="Score only files changed since base branch")
Expand Down Expand Up @@ -2073,6 +2158,8 @@ def main() -> None:
p_certify.add_argument("--trail", default="arbiter_audit.jsonl", help="Audit trail path")
p_certify.add_argument("--no-audit", action="store_true", help="Skip audit trail recording")
p_certify.add_argument("--fail-on-failed", action="store_true", help="Exit non-zero if FAILED")
p_certify.add_argument("--webhook", type=str, default=None, help="Webhook URL to POST certification results to")
p_certify.add_argument("--webhook-secret", type=str, default="", help="HMAC-SHA256 secret for webhook signing")

# audit-trail
p_audit = subparsers.add_parser("audit-trail",
Expand Down Expand Up @@ -2297,6 +2384,14 @@ def main() -> None:
p_compliance.add_argument("--noise-threshold", type=int, default=None, help="Suppress rules with more findings than this")
p_compliance.add_argument("--profile", help=f"Scoring profile ({', '.join(SCORING_PROFILES)})")

# webhook-test
p_wh = subparsers.add_parser("webhook-test", help="Send a test webhook payload to verify connectivity")
p_wh.add_argument("url", help="Webhook URL to test")
p_wh.add_argument("--format", choices=["slack", "discord", "generic"], default=None,
help="Force output format (auto-detected from URL if omitted)")
p_wh.add_argument("--secret", type=str, default="", help="HMAC-SHA256 secret for signing")
p_wh.add_argument("--dry-run", action="store_true", help="Print formatted payload without sending")

args = parser.parse_args()

commands = {
Expand Down Expand Up @@ -2338,6 +2433,7 @@ def main() -> None:
"profiles": cmd_profiles,
"team": cmd_team,
"compliance": cmd_compliance,
"webhook-test": cmd_webhook_test,
}

handler = commands.get(args.command)
Expand Down
179 changes: 179 additions & 0 deletions src/arbiter/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Webhook notification module — send score results to external services.

Supports Slack (Block Kit) and Discord (embeds) with auto-detection from URL.
Optional HMAC-SHA256 signing for payload verification.
"""

from __future__ import annotations

import hashlib
import hmac
import json
import urllib.request
import urllib.error
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass
class WebhookPayload:
"""Score result payload for webhook delivery."""

repo: str
score: float
grade: str
findings: int
loc: int
timestamp: str
dimensions: dict = field(default_factory=dict)
url: str = ""

def to_json(self) -> str:
"""Serialize to JSON string."""
return json.dumps(asdict(self), indent=2)

def to_dict(self) -> dict:
"""Serialize to plain dict."""
return asdict(self)


def _grade_emoji(grade: str) -> str:
"""Map letter grade to emoji for chat formatting."""
return {
"A+": "\u2b50", "A": "\u2705", "A-": "\u2705",
"B+": "\U0001f7e2", "B": "\U0001f7e2", "B-": "\U0001f7e1",
"C+": "\U0001f7e1", "C": "\U0001f7e0", "C-": "\U0001f7e0",
"D": "\U0001f534", "F": "\u274c", "N/A": "\u2753",
}.get(grade, "\u2753")


def _grade_color(grade: str) -> int:
"""Map letter grade to Discord embed color (decimal)."""
if grade.startswith("A"):
return 0x2ECC71 # green
if grade.startswith("B"):
return 0x3498DB # blue
if grade.startswith("C"):
return 0xF39C12 # orange
return 0xE74C3C # red


def format_slack(payload: WebhookPayload) -> dict:
"""Format payload as Slack Block Kit message."""
emoji = _grade_emoji(payload.grade)
dims = payload.dimensions
dim_parts = [f"*{k.title()}*: {v}" for k, v in dims.items()] if dims else []
dim_text = " | ".join(dim_parts) if dim_parts else "No dimension breakdown"

blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} Arbiter Score: {payload.repo}",
},
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Score:* {payload.score:.1f}"},
{"type": "mrkdwn", "text": f"*Grade:* {payload.grade}"},
{"type": "mrkdwn", "text": f"*Findings:* {payload.findings:,}"},
{"type": "mrkdwn", "text": f"*LOC:* {payload.loc:,}"},
],
},
{
"type": "section",
"text": {"type": "mrkdwn", "text": dim_text},
},
{
"type": "context",
"elements": [
{"type": "mrkdwn", "text": f"Scored at {payload.timestamp} by Arbiter"},
],
},
]

if payload.url:
blocks.insert(
-1,
{
"type": "section",
"text": {"type": "mrkdwn", "text": f"<{payload.url}|View Repository>"},
},
)

return {"blocks": blocks}


def format_discord(payload: WebhookPayload) -> dict:
"""Format payload as Discord embed."""
emoji = _grade_emoji(payload.grade)
color = _grade_color(payload.grade)

fields = [
{"name": "Score", "value": f"{payload.score:.1f}", "inline": True},
{"name": "Grade", "value": f"{emoji} {payload.grade}", "inline": True},
{"name": "Findings", "value": f"{payload.findings:,}", "inline": True},
{"name": "LOC", "value": f"{payload.loc:,}", "inline": True},
]

for k, v in payload.dimensions.items():
fields.append({"name": k.title(), "value": str(v), "inline": True})

embed: dict = {
"title": f"Arbiter Score: {payload.repo}",
"color": color,
"fields": fields,
"footer": {"text": f"Scored at {payload.timestamp} by Arbiter"},
}

if payload.url:
embed["url"] = payload.url

return {"embeds": [embed]}


def detect_format(url: str) -> str:
"""Auto-detect webhook service from URL pattern.

Returns 'slack', 'discord', or 'generic'.
"""
if "hooks.slack.com" in url or "slack.com/api" in url:
return "slack"
if "discord.com/api/webhooks" in url or "discordapp.com/api/webhooks" in url:
return "discord"
return "generic"


def _sign_payload(body: bytes, secret: str) -> str:
"""Compute HMAC-SHA256 signature for the payload body."""
return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def send_webhook(url: str, payload: WebhookPayload, secret: str = "") -> bool:
"""Send score notification to a webhook URL.

Auto-detects Slack vs Discord from URL and formats accordingly.
Returns True on success, False on any failure. Never raises.
"""
try:
fmt = detect_format(url)
if fmt == "slack":
data = format_slack(payload)
elif fmt == "discord":
data = format_discord(payload)
else:
data = payload.to_dict()

body = json.dumps(data).encode("utf-8")

headers = {"Content-Type": "application/json"}
if secret:
headers["X-Arbiter-Signature"] = _sign_payload(body, secret)

req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=10) as resp:
return resp.status < 400
except Exception:
return False
Loading
Loading