diff --git a/django-backend/soroscan/ingest/admin.py b/django-backend/soroscan/ingest/admin.py index 7f323bf5..79f7ebae 100644 --- a/django-backend/soroscan/ingest/admin.py +++ b/django-backend/soroscan/ingest/admin.py @@ -11,9 +11,10 @@ import json from .models import ( + AdminAction, + AdminAuditLog, AlertExecution, AlertRule, - AdminAction, APIKey, ArchivalAuditLog, ArchivedEventBatch, @@ -81,18 +82,44 @@ def _normalize_changes(self, message): return {"message": str(message)} def _audit(self, request, obj, action: str, message) -> None: + user = request.user if getattr(request.user, "is_authenticated", False) else None + ip = self._client_ip(request) + changes = self._normalize_changes(message) + content_type = f"{obj._meta.app_label}.{obj._meta.model_name}" + object_id = str(getattr(obj, "pk", "")) + object_repr = str(obj)[:200] + try: AdminAction.objects.create( - user=request.user if getattr(request.user, "is_authenticated", False) else None, + user=user, action=action, object_type=obj._meta.model_name[:32], - object_id=str(getattr(obj, "pk", "")), - ip_address=self._client_ip(request), - changes=self._normalize_changes(message), + object_id=object_id, + ip_address=ip, + changes=changes, ) except Exception: - # Audit failures should not block admin operations. - return + pass + + try: + # Map legacy action names to AdminAuditLog choices + audit_action = { + "add": AdminAuditLog.ACTION_CREATE, + "change": AdminAuditLog.ACTION_UPDATE, + "delete": AdminAuditLog.ACTION_DELETE, + }.get(action, action) + AdminAuditLog.objects.create( + user=user, + action=audit_action, + object_repr=object_repr, + object_id=object_id, + content_type=content_type, + ip_address=ip, + changes=changes, + ) + except Exception: + # Audit failures should never block admin operations. + pass def log_addition(self, request, obj, message): super().log_addition(request, obj, message) @@ -1213,3 +1240,59 @@ class ContractABIVersionAdmin(admin.ModelAdmin): list_filter = ["has_breaking_changes", "created_at"] search_fields = ["contract__contract_id", "contract__name"] readonly_fields = ["created_at"] + + +# --------------------------------------------------------------------------- +# AdminAuditLog — read-only view of all admin CRUD actions +# --------------------------------------------------------------------------- + +@admin.register(AdminAuditLog) +class AdminAuditLogAdmin(admin.ModelAdmin): + """Read-only admin view for the AdminAuditLog audit trail.""" + + list_display = [ + "timestamp", + "action_colored", + "content_type", + "object_id", + "object_repr", + "user", + "ip_address", + ] + list_filter = ["action", "content_type", "timestamp"] + search_fields = ["object_id", "object_repr", "user__username", "ip_address", "content_type"] + readonly_fields = [ + "user", + "action", + "object_repr", + "object_id", + "content_type", + "changes", + "ip_address", + "timestamp", + ] + ordering = ["-timestamp"] + date_hierarchy = "timestamp" + + def has_add_permission(self, request): + return False + + def has_change_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + @admin.display(description="Action") + def action_colored(self, obj): + colors = { + AdminAuditLog.ACTION_CREATE: "#28a745", + AdminAuditLog.ACTION_UPDATE: "#007bff", + AdminAuditLog.ACTION_DELETE: "#dc3545", + } + color = colors.get(obj.action, "#6c757d") + return format_html( + '{}', + color, + obj.get_action_display(), + ) diff --git a/django-backend/soroscan/ingest/migrations/0037_adminauditlog.py b/django-backend/soroscan/ingest/migrations/0037_adminauditlog.py new file mode 100644 index 00000000..852d45a9 --- /dev/null +++ b/django-backend/soroscan/ingest/migrations/0037_adminauditlog.py @@ -0,0 +1,123 @@ +""" +Migration: add AdminAuditLog model for tracking Django Admin CRUD actions. +""" +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("ingest", "0036_trackedcontract_network"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name="AdminAuditLog", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "action", + models.CharField( + choices=[ + ("create", "Create"), + ("update", "Update"), + ("delete", "Delete"), + ], + db_index=True, + help_text="Type of admin action performed", + max_length=16, + ), + ), + ( + "object_repr", + models.CharField( + help_text="String representation of the affected object", + max_length=200, + ), + ), + ( + "object_id", + models.CharField( + db_index=True, + help_text="Primary key of the affected object", + max_length=255, + ), + ), + ( + "content_type", + models.CharField( + db_index=True, + help_text="app_label.model_name of the affected object", + max_length=100, + ), + ), + ( + "changes", + models.JSONField( + default=dict, + help_text="Field-level changes: {field: [old, new]} for updates", + ), + ), + ( + "ip_address", + models.GenericIPAddressField( + blank=True, + help_text="IP address of the admin user", + null=True, + ), + ), + ( + "timestamp", + models.DateTimeField(auto_now_add=True, db_index=True), + ), + ( + "user", + models.ForeignKey( + blank=True, + help_text="Admin user who performed the action", + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="admin_audit_logs", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "verbose_name": "Admin Audit Log", + "verbose_name_plural": "Admin Audit Logs", + "ordering": ["-timestamp"], + }, + ), + migrations.AddIndex( + model_name="adminauditlog", + index=models.Index( + fields=["action", "timestamp"], + name="ingest_adminauditlog_action_ts_idx", + ), + ), + migrations.AddIndex( + model_name="adminauditlog", + index=models.Index( + fields=["content_type", "object_id"], + name="ingest_adminauditlog_ct_obj_idx", + ), + ), + migrations.AddIndex( + model_name="adminauditlog", + index=models.Index( + fields=["user", "timestamp"], + name="ingest_adminauditlog_user_ts_idx", + ), + ), + ] diff --git a/django-backend/soroscan/ingest/models.py b/django-backend/soroscan/ingest/models.py index ee7c9125..06e1cbc9 100644 --- a/django-backend/soroscan/ingest/models.py +++ b/django-backend/soroscan/ingest/models.py @@ -2105,3 +2105,87 @@ class Meta: def __str__(self): return f"ABI v{self.version_number} for {self.contract.contract_id[:8]}... (ledger {self.valid_from_ledger}–{self.valid_to_ledger or '∞'})" + + +# --------------------------------------------------------------------------- +# AdminAuditLog: dedicated audit trail for Django Admin CRUD actions +# --------------------------------------------------------------------------- + +class AdminAuditLog(models.Model): + """ + Immutable audit trail for every create, update, and delete action + performed through the Django Admin interface. + + Records are append-only: save() blocks updates and delete() is blocked + entirely to preserve the integrity of the audit trail. + """ + + ACTION_CREATE = "create" + ACTION_UPDATE = "update" + ACTION_DELETE = "delete" + ACTION_CHOICES = [ + (ACTION_CREATE, "Create"), + (ACTION_UPDATE, "Update"), + (ACTION_DELETE, "Delete"), + ] + + user = models.ForeignKey( + User, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="admin_audit_logs", + help_text="Admin user who performed the action", + ) + action = models.CharField( + max_length=16, + choices=ACTION_CHOICES, + db_index=True, + help_text="Type of admin action performed", + ) + object_repr = models.CharField( + max_length=200, + help_text="String representation of the affected object", + ) + object_id = models.CharField( + max_length=255, + db_index=True, + help_text="Primary key of the affected object", + ) + content_type = models.CharField( + max_length=100, + db_index=True, + help_text="app_label.model_name of the affected object", + ) + changes = models.JSONField( + default=dict, + help_text="Field-level changes: {field: [old, new]} for updates", + ) + ip_address = models.GenericIPAddressField( + null=True, + blank=True, + help_text="IP address of the admin user", + ) + timestamp = models.DateTimeField(auto_now_add=True, db_index=True) + + class Meta: + ordering = ["-timestamp"] + verbose_name = "Admin Audit Log" + verbose_name_plural = "Admin Audit Logs" + indexes = [ + models.Index(fields=["action", "timestamp"], name="ingest_adminauditlog_action_ts_idx"), + models.Index(fields=["content_type", "object_id"], name="ingest_adminauditlog_ct_obj_idx"), + models.Index(fields=["user", "timestamp"], name="ingest_adminauditlog_user_ts_idx"), + ] + + def save(self, *args, **kwargs): + if self.pk: + raise ValidationError("AdminAuditLog is immutable and cannot be updated.") + super().save(*args, **kwargs) + + def delete(self, *args, **kwargs): + raise ValidationError("AdminAuditLog is immutable and cannot be deleted.") + + def __str__(self): + username = self.user.username if self.user else "anonymous" + return f"[{self.action}] {self.content_type}:{self.object_id} by {username} @ {self.timestamp}" diff --git a/django-backend/soroscan/ingest/tests/factories.py b/django-backend/soroscan/ingest/tests/factories.py index f194e22f..f8c1c864 100644 --- a/django-backend/soroscan/ingest/tests/factories.py +++ b/django-backend/soroscan/ingest/tests/factories.py @@ -3,6 +3,7 @@ from factory.django import DjangoModelFactory from soroscan.ingest.models import ( + AdminAuditLog, ContractABI, ContractEvent, ContractMetadata, @@ -152,3 +153,16 @@ class Meta: documentation_url = "" github_repo = "" team_email = "" + + +class AdminAuditLogFactory(DjangoModelFactory): + class Meta: + model = AdminAuditLog + + user = factory.SubFactory(UserFactory) + action = AdminAuditLog.ACTION_UPDATE + object_repr = factory.Sequence(lambda n: f"Object {n}") + object_id = factory.Sequence(lambda n: str(n)) + content_type = "ingest.trackedcontract" + changes = factory.LazyFunction(dict) + ip_address = "127.0.0.1" diff --git a/django-backend/soroscan/ingest/tests/test_admin_audit_log.py b/django-backend/soroscan/ingest/tests/test_admin_audit_log.py new file mode 100644 index 00000000..7ae950e8 --- /dev/null +++ b/django-backend/soroscan/ingest/tests/test_admin_audit_log.py @@ -0,0 +1,367 @@ +""" +Tests for AdminAuditLog model and admin integration. + +Covers: +- Model immutability (no updates, no deletes) +- AdminAuditMixin logs create/update/delete to AdminAuditLog +- AdminAuditLogAdmin is read-only (no add/change/delete permissions) +- Audit entries contain correct user, action, timestamp, object info +- IP address extraction (direct + forwarded) +- Anonymous user handling +- Factory smoke test +""" +import pytest +from django.contrib.admin.sites import AdminSite +from django.contrib.auth import get_user_model +from django.core.exceptions import ValidationError +from django.test import RequestFactory +from django.utils import timezone + +from soroscan.ingest.admin import AdminAuditLogAdmin, AdminAuditMixin +from soroscan.ingest.models import AdminAuditLog, TrackedContract +from soroscan.ingest.tests.factories import ( + AdminAuditLogFactory, + TrackedContractFactory, + UserFactory, +) + +User = get_user_model() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_request(rf, user=None, ip="10.0.0.1", forwarded_for=None): + """Build a minimal fake POST request with optional user and IP.""" + request = rf.post("/admin/") + request.user = user or _anon_user() + request.META["REMOTE_ADDR"] = ip + if forwarded_for: + request.META["HTTP_X_FORWARDED_FOR"] = forwarded_for + return request + + +def _anon_user(): + """Return a minimal anonymous-like user object.""" + return type("AnonUser", (), {"is_authenticated": False, "pk": None})() + + +def _staff_user(): + return UserFactory(is_staff=True) + + +# --------------------------------------------------------------------------- +# Model tests +# --------------------------------------------------------------------------- + +@pytest.mark.django_db +class TestAdminAuditLogModel: + def test_create_stores_all_fields(self): + user = UserFactory() + log = AdminAuditLog.objects.create( + user=user, + action=AdminAuditLog.ACTION_CREATE, + object_repr="TrackedContract object (1)", + object_id="1", + content_type="ingest.trackedcontract", + changes={"name": [None, "My Contract"]}, + ip_address="192.168.1.1", + ) + assert log.pk is not None + assert log.user == user + assert log.action == AdminAuditLog.ACTION_CREATE + assert log.object_repr == "TrackedContract object (1)" + assert log.object_id == "1" + assert log.content_type == "ingest.trackedcontract" + assert log.changes == {"name": [None, "My Contract"]} + assert log.ip_address == "192.168.1.1" + assert log.timestamp is not None + + def test_str_representation(self): + user = UserFactory(username="alice") + log = AdminAuditLog.objects.create( + user=user, + action=AdminAuditLog.ACTION_UPDATE, + object_repr="Contract", + object_id="42", + content_type="ingest.trackedcontract", + ) + assert "update" in str(log).lower() + assert "42" in str(log) + assert "alice" in str(log) + + def test_str_anonymous_user(self): + log = AdminAuditLog.objects.create( + user=None, + action=AdminAuditLog.ACTION_DELETE, + object_repr="Contract", + object_id="7", + content_type="ingest.trackedcontract", + ) + assert "anonymous" in str(log).lower() + + def test_immutable_update_raises(self): + log = AdminAuditLogFactory() + log.object_repr = "tampered" + with pytest.raises(ValidationError, match="immutable"): + log.save() + + def test_immutable_delete_raises(self): + log = AdminAuditLogFactory() + with pytest.raises(ValidationError, match="immutable"): + log.delete() + + def test_ordering_newest_first(self): + u = UserFactory() + log1 = AdminAuditLog.objects.create( + user=u, action=AdminAuditLog.ACTION_CREATE, + object_repr="A", object_id="1", content_type="ingest.trackedcontract", + ) + log2 = AdminAuditLog.objects.create( + user=u, action=AdminAuditLog.ACTION_UPDATE, + object_repr="B", object_id="2", content_type="ingest.trackedcontract", + ) + logs = list(AdminAuditLog.objects.all()) + assert logs[0].pk == log2.pk + assert logs[1].pk == log1.pk + + def test_null_user_allowed(self): + log = AdminAuditLog.objects.create( + user=None, + action=AdminAuditLog.ACTION_CREATE, + object_repr="X", + object_id="99", + content_type="ingest.trackedcontract", + ) + assert log.user is None + + def test_action_choices(self): + choices = dict(AdminAuditLog.ACTION_CHOICES) + assert "create" in choices + assert "update" in choices + assert "delete" in choices + + def test_factory_creates_valid_instance(self): + log = AdminAuditLogFactory() + assert log.pk is not None + assert log.action in dict(AdminAuditLog.ACTION_CHOICES) + + +# --------------------------------------------------------------------------- +# AdminAuditMixin tests +# --------------------------------------------------------------------------- + +class _ConcreteAdmin(AdminAuditMixin): + """Minimal concrete class to test the mixin in isolation.""" + + def log_addition(self, request, obj, message): + self._audit(request, obj, "add", message) + + def log_change(self, request, obj, message): + self._audit(request, obj, "change", message) + + def log_deletions(self, request, queryset): + for obj in list(queryset): + self._audit(request, obj, "delete", "Deleted via Django admin") + + +@pytest.mark.django_db +class TestAdminAuditMixin: + def setup_method(self): + self.mixin = _ConcreteAdmin() + self.rf = RequestFactory() + + def test_log_addition_creates_audit_log(self): + user = UserFactory() + contract = TrackedContractFactory() + request = _make_request(self.rf, user=user, ip="1.2.3.4") + + self.mixin.log_addition(request, contract, "Added contract") + + log = AdminAuditLog.objects.get( + content_type="ingest.trackedcontract", + object_id=str(contract.pk), + action=AdminAuditLog.ACTION_CREATE, + ) + assert log.user == user + assert log.ip_address == "1.2.3.4" + + def test_log_change_creates_audit_log(self): + user = UserFactory() + contract = TrackedContractFactory() + request = _make_request(self.rf, user=user, ip="5.6.7.8") + + self.mixin.log_change(request, contract, [{"changed": {"fields": ["name"]}}]) + + log = AdminAuditLog.objects.get( + content_type="ingest.trackedcontract", + object_id=str(contract.pk), + action=AdminAuditLog.ACTION_UPDATE, + ) + assert log.user == user + assert log.ip_address == "5.6.7.8" + + def test_log_deletions_creates_audit_log_per_object(self): + user = UserFactory() + c1 = TrackedContractFactory() + c2 = TrackedContractFactory() + request = _make_request(self.rf, user=user) + qs = TrackedContract.objects.filter(pk__in=[c1.pk, c2.pk]) + + self.mixin.log_deletions(request, qs) + + logs = AdminAuditLog.objects.filter(action=AdminAuditLog.ACTION_DELETE) + assert logs.count() == 2 + logged_ids = set(logs.values_list("object_id", flat=True)) + assert str(c1.pk) in logged_ids + assert str(c2.pk) in logged_ids + + def test_ip_from_forwarded_header(self): + user = UserFactory() + contract = TrackedContractFactory() + request = _make_request( + self.rf, user=user, ip="10.0.0.1", + forwarded_for="203.0.113.5, 10.0.0.1", + ) + + self.mixin.log_addition(request, contract, "Added") + + log = AdminAuditLog.objects.get( + content_type="ingest.trackedcontract", + object_id=str(contract.pk), + action=AdminAuditLog.ACTION_CREATE, + ) + assert log.ip_address == "203.0.113.5" + + def test_anonymous_user_stored_as_null(self): + contract = TrackedContractFactory() + request = _make_request(self.rf, user=_anon_user()) + + self.mixin.log_addition(request, contract, "Added") + + log = AdminAuditLog.objects.get( + content_type="ingest.trackedcontract", + object_id=str(contract.pk), + action=AdminAuditLog.ACTION_CREATE, + ) + assert log.user is None + + def test_content_type_format(self): + user = UserFactory() + contract = TrackedContractFactory() + request = _make_request(self.rf, user=user) + + self.mixin.log_addition(request, contract, "Added") + + log = AdminAuditLog.objects.get( + object_id=str(contract.pk), + action=AdminAuditLog.ACTION_CREATE, + ) + assert log.content_type == "ingest.trackedcontract" + + def test_object_repr_stored(self): + user = UserFactory() + contract = TrackedContractFactory() + request = _make_request(self.rf, user=user) + + self.mixin.log_addition(request, contract, "Added") + + log = AdminAuditLog.objects.get( + object_id=str(contract.pk), + action=AdminAuditLog.ACTION_CREATE, + ) + assert log.object_repr == str(contract)[:200] + + def test_changes_stored_for_update(self): + user = UserFactory() + contract = TrackedContractFactory() + request = _make_request(self.rf, user=user) + message = [{"changed": {"fields": ["name", "description"]}}] + + self.mixin.log_change(request, contract, message) + + log = AdminAuditLog.objects.get( + object_id=str(contract.pk), + action=AdminAuditLog.ACTION_UPDATE, + ) + assert log.changes != {} + + def test_audit_failure_does_not_raise(self, monkeypatch): + """Audit errors must never propagate to the caller.""" + user = UserFactory() + contract = TrackedContractFactory() + request = _make_request(self.rf, user=user) + + monkeypatch.setattr( + "soroscan.ingest.models.AdminAuditLog.objects.create", + lambda **kw: (_ for _ in ()).throw(Exception("DB down")), + ) + + # Should not raise + self.mixin.log_addition(request, contract, "Added") + + +# --------------------------------------------------------------------------- +# AdminAuditLogAdmin (read-only view) tests +# --------------------------------------------------------------------------- + +@pytest.mark.django_db +class TestAdminAuditLogAdmin: + def setup_method(self): + self.site = AdminSite() + self.admin = AdminAuditLogAdmin(AdminAuditLog, self.site) + self.rf = RequestFactory() + + def _request(self, superuser=True): + user = UserFactory(is_staff=True, is_superuser=superuser) + request = self.rf.get("/admin/") + request.user = user + return request + + def test_has_no_add_permission(self): + assert self.admin.has_add_permission(self._request()) is False + + def test_has_no_change_permission(self): + log = AdminAuditLogFactory() + assert self.admin.has_change_permission(self._request(), log) is False + + def test_has_no_delete_permission(self): + log = AdminAuditLogFactory() + assert self.admin.has_delete_permission(self._request(), log) is False + + def test_all_fields_are_readonly(self): + expected = { + "user", "action", "object_repr", "object_id", + "content_type", "changes", "ip_address", "timestamp", + } + assert expected.issubset(set(self.admin.readonly_fields)) + + def test_action_colored_create(self): + log = AdminAuditLogFactory(action=AdminAuditLog.ACTION_CREATE) + html = self.admin.action_colored(log) + assert "#28a745" in str(html) + assert "Create" in str(html) + + def test_action_colored_update(self): + log = AdminAuditLogFactory(action=AdminAuditLog.ACTION_UPDATE) + html = self.admin.action_colored(log) + assert "#007bff" in str(html) + assert "Update" in str(html) + + def test_action_colored_delete(self): + log = AdminAuditLogFactory(action=AdminAuditLog.ACTION_DELETE) + html = self.admin.action_colored(log) + assert "#dc3545" in str(html) + assert "Delete" in str(html) + + def test_list_display_fields(self): + expected = {"timestamp", "action_colored", "content_type", "object_id", "object_repr", "user", "ip_address"} + assert expected.issubset(set(self.admin.list_display)) + + def test_search_fields_configured(self): + assert len(self.admin.search_fields) > 0 + + def test_list_filter_configured(self): + assert "action" in self.admin.list_filter + assert "timestamp" in self.admin.list_filter