Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions django-backend/soroscan/ingest/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import hashlib
import hmac
import json
import logging
from functools import wraps

from rest_framework import status
from rest_framework.response import Response

from .models import WebhookSubscription

logger = logging.getLogger(__name__)


def webhook_hmac_required(header_name="X-SoroScan-Signature"):
"""
DRF view decorator that validates an incoming webhook's HMAC signature.

The decorator expects the JSON payload to contain a 'contract_id' field,
which is used to look up the corresponding WebhookSubscription(s) and their secrets.
The signature is verified against the serialized JSON body using hmac.compare_digest.
"""

def decorator(view_func):
@wraps(view_func)
def _wrapped_view(request, *args, **kwargs):
signature_header = request.headers.get(header_name)
if not signature_header:
logger.warning("Missing webhook signature header: %s", header_name)
return Response(
{"detail": f"Missing signature header {header_name}"},
status=status.HTTP_401_UNAUTHORIZED,
)

if "=" not in signature_header:
logger.warning("Invalid signature header format: %s", signature_header)
return Response(
{"detail": "Invalid signature header format"},
status=status.HTTP_401_UNAUTHORIZED,
)

prefix, sig_hex = signature_header.split("=", 1)
prefix = prefix.lower()

try:
# DRF parses the JSON body into request.data
payload = request.data
contract_id = payload.get("contract_id")
except Exception:
logger.warning("Failed to parse request data for HMAC validation")
return Response(
{"detail": "Invalid JSON payload"},
status=status.HTTP_400_BAD_REQUEST,
)

if not contract_id:
logger.warning("Missing contract_id in webhook payload")
return Response(
{"detail": "Missing contract_id in payload"},
status=status.HTTP_400_BAD_REQUEST,
)

# Find active subscriptions for this contract
subscriptions = WebhookSubscription.objects.filter(
contract__contract_id=contract_id, is_active=True
)

if not subscriptions.exists():
logger.warning(
"No active webhook subscription found for contract: %s", contract_id
)
return Response(
{"detail": "Subscription not found or inactive"},
status=status.HTTP_401_UNAUTHORIZED,
)

# Re-serialize exactly as the dispatcher does (sort_keys=True, utf-8)
# This ensures we are testing against the same byte sequence used for signing.
try:
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
except (TypeError, ValueError) as exc:
logger.error("Failed to re-serialize payload for HMAC check: %s", exc)
return Response(
{"detail": "Payload serialization failed"},
status=status.HTTP_400_BAD_REQUEST,
)

if prefix == "sha256":
digestmod = hashlib.sha256
elif prefix == "sha1":
digestmod = hashlib.sha1
else:
logger.warning("Unsupported signature algorithm: %s", prefix)
return Response(
{"detail": f"Unsupported signature algorithm: {prefix}"},
status=status.HTTP_401_UNAUTHORIZED,
)

verified = False
for sub in subscriptions:
# Verify secret exists
if not sub.secret:
continue

expected_sig = hmac.new(
sub.secret.encode("utf-8"),
msg=payload_bytes,
digestmod=digestmod,
).hexdigest()

if hmac.compare_digest(expected_sig, sig_hex):
verified = True
request.webhook_subscription = sub
break

if not verified:
logger.warning("HMAC signature mismatch for contract: %s", contract_id)
return Response(
{"detail": "Invalid HMAC signature"},
status=status.HTTP_401_UNAUTHORIZED,
)

return view_func(request, *args, **kwargs)

return _wrapped_view

return decorator
113 changes: 113 additions & 0 deletions django-backend/soroscan/ingest/tests/test_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import hashlib
import hmac
import json

import pytest
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient

from soroscan.ingest.tests.factories import (
TrackedContractFactory,
WebhookSubscriptionFactory,
)


@pytest.fixture
def api_client():
return APIClient()


@pytest.mark.django_db
class TestWebhookHMACDecorator:
def setup_method(self):
self.contract = TrackedContractFactory()
self.webhook = WebhookSubscriptionFactory(
contract=self.contract,
secret="test-secret-123",
signature_algorithm="sha256",
)
self.url = reverse("webhook-receiver-example")

def test_valid_sha256_signature(self, api_client):
payload = {
"contract_id": self.contract.contract_id,
"event_type": "transfer",
"payload": {"amount": 100},
}
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
sig = hmac.new(b"test-secret-123", payload_bytes, hashlib.sha256).hexdigest()

headers = {"X-SoroScan-Signature": f"sha256={sig}"}
response = api_client.post(self.url, payload, format="json", headers=headers)

assert response.status_code == status.HTTP_200_OK
assert response.data["status"] == "verified"

def test_valid_sha1_signature(self, api_client):
self.webhook.signature_algorithm = "sha1"
self.webhook.save()

payload = {
"contract_id": self.contract.contract_id,
"event_type": "transfer",
"payload": {"amount": 100},
}
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
sig = hmac.new(b"test-secret-123", payload_bytes, hashlib.sha1).hexdigest()

headers = {"X-SoroScan-Signature": f"sha1={sig}"}
response = api_client.post(self.url, payload, format="json", headers=headers)

assert response.status_code == status.HTTP_200_OK
assert response.data["status"] == "verified"

def test_invalid_signature(self, api_client):
payload = {"contract_id": self.contract.contract_id, "event_type": "transfer"}
headers = {"X-SoroScan-Signature": "sha256=invalid-sig"}
response = api_client.post(self.url, payload, format="json", headers=headers)

assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert "Invalid HMAC signature" in response.data["detail"]

def test_missing_signature_header(self, api_client):
payload = {"contract_id": self.contract.contract_id}
response = api_client.post(self.url, payload, format="json")

assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert "Missing signature header" in response.data["detail"]

def test_missing_contract_id(self, api_client):
payload = {"event_type": "transfer"}
headers = {"X-SoroScan-Signature": "sha256=some-sig"}
response = api_client.post(self.url, payload, format="json", headers=headers)

assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "Missing contract_id" in response.data["detail"]

def test_inactive_subscription(self, api_client):
self.webhook.is_active = False
self.webhook.save()

payload = {"contract_id": self.contract.contract_id}
headers = {"X-SoroScan-Signature": "sha256=some-sig"}
response = api_client.post(self.url, payload, format="json", headers=headers)

assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert "Subscription not found or inactive" in response.data["detail"]

def test_multiple_subscriptions_one_matches(self, api_client):
# Create another webhook for same contract with different secret
WebhookSubscriptionFactory(
contract=self.contract, secret="other-secret", signature_algorithm="sha256"
)

payload = {"contract_id": self.contract.contract_id, "event_type": "transfer"}
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
# Sign with the first secret
sig = hmac.new(b"test-secret-123", payload_bytes, hashlib.sha256).hexdigest()

headers = {"X-SoroScan-Signature": f"sha256={sig}"}
response = api_client.post(self.url, payload, format="json", headers=headers)

assert response.status_code == status.HTTP_200_OK
13 changes: 12 additions & 1 deletion django-backend/soroscan/ingest/urls.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
URL patterns for SoroScan ingest API.
"""

from django.urls import include, path
from rest_framework.routers import DefaultRouter

Expand All @@ -23,6 +24,7 @@
health_check,
record_event_view,
restore_archived_events,
webhook_receiver_example,
transaction_events_view,
vulnerability_impact_view,
)
Expand All @@ -36,7 +38,11 @@
router.register(r"teams", TeamViewSet, basename="team")

urlpatterns = [
path("contracts/<str:contract_id>/timeline/", contract_timeline_view, name="contract-timeline"),
path(
"contracts/<str:contract_id>/timeline/",
contract_timeline_view,
name="contract-timeline",
),
path(
"contracts/<str:contract_id>/events/explorer/",
contract_event_explorer_view,
Expand Down Expand Up @@ -64,6 +70,11 @@
path("events/restore-archive/", restore_archived_events, name="restore-archive"),
path("audit-trail/", audit_trail_view, name="audit-trail"),
path("admin/ingest-errors/", admin_ingest_errors_view, name="admin-ingest-errors"),
path(
"webhooks/receiver-example/",
webhook_receiver_example,
name="webhook-receiver-example",
),
path(
"admin/organization-costs/",
organization_cost_breakdown_view,
Expand Down
Loading
Loading