diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py
index 64d44810..ac489ed9 100644
--- a/packages/backend/app/models.py
+++ b/packages/backend/app/models.py
@@ -133,3 +133,26 @@ class AuditLog(db.Model):
     user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
     action = db.Column(db.String(100), nullable=False)
     created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
+
+
+class GDPRAuditLog(db.Model):
+    """Audit trail of GDPR actions (data export, account erasure).
+
+    Kept separate from ``AuditLog`` so the module does not define two
+    models with the same class name. ``user_id`` is nullable with
+    ``ondelete="SET NULL"`` so entries outlive the account they refer to.
+    """
+
+    __tablename__ = "gdpr_audit_logs"
+
+    id = db.Column(db.Integer, primary_key=True)
+    user_id = db.Column(
+        db.Integer,
+        db.ForeignKey("users.id", ondelete="SET NULL"),
+        nullable=True,
+        index=True,
+    )
+    action = db.Column(db.String(64), nullable=False)
+    # Free-text description; callers must not store PII here.
+    detail = db.Column(db.String(1000), nullable=True)
+    performed_at = db.Column(db.DateTime(timezone=True), nullable=False)
diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py
index f13b0f89..288b9816 100644
--- a/packages/backend/app/routes/__init__.py
+++ b/packages/backend/app/routes/__init__.py
@@ -7,6 +7,7 @@
 from .categories import bp as categories_bp
 from .docs import bp as docs_bp
 from .dashboard import bp as dashboard_bp
+from .gdpr import bp as gdpr_bp
 
 
 def register_routes(app: Flask):
@@ -18,3 +19,4 @@ def register_routes(app: Flask):
     app.register_blueprint(categories_bp, url_prefix="/categories")
     app.register_blueprint(docs_bp, url_prefix="/docs")
     app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
+    app.register_blueprint(gdpr_bp, url_prefix="/gdpr")
diff --git a/packages/backend/app/routes/gdpr.py b/packages/backend/app/routes/gdpr.py
new file mode 100644
index 00000000..c4c38ed4
--- /dev/null
+++ b/packages/backend/app/routes/gdpr.py
@@ -0,0 +1,101 @@
+"""
+GDPR Routes
+-----------
+GET    /gdpr/export     - Download ZIP of all personal data
+DELETE /gdpr/account    - Permanently delete account and all PII
+GET    /gdpr/audit-log  - View GDPR action history for current user
+"""
+from __future__ import annotations
+
+import logging
+
+from flask import Blueprint, jsonify, make_response
+from flask_jwt_extended import get_jwt_identity, jwt_required
+
+from ..extensions import db
+from ..models import GDPRAuditLog
+from ..services.gdpr import GDPRService
+
+bp = Blueprint("gdpr", __name__)
+logger = logging.getLogger("finmind.gdpr")
+
+
+@bp.get("/export")
+@jwt_required()
+def export_data():
+    """
+    Return a ZIP archive containing all personal data for the authenticated user.
+    Compliant with GDPR Art. 20 (right to data portability).
+
+    Responds 404 if the user no longer exists and 500 on any other failure.
+    """
+    user_id = int(get_jwt_identity())
+    try:
+        zip_bytes = GDPRService(user_id).export_zip()
+    except ValueError as exc:
+        return jsonify(error=str(exc)), 404
+    except Exception:
+        logger.exception("GDPR export failed for user_id=%s", user_id)
+        return jsonify(error="Export failed. Please try again later."), 500
+
+    response = make_response(zip_bytes)
+    response.headers["Content-Type"] = "application/zip"
+    response.headers["Content-Disposition"] = (
+        "attachment; filename=finmind_data_export.zip"
+    )
+    response.headers["Content-Length"] = str(len(zip_bytes))
+    # The archive is pure PII: never let a browser or proxy cache it.
+    response.headers["Cache-Control"] = "no-store"
+    return response
+
+
+@bp.delete("/account")
+@jwt_required()
+def delete_account():
+    """
+    Permanently and irreversibly delete the authenticated user's account and
+    all associated personal data. This action cannot be undone.
+    Compliant with GDPR Art. 17 (right to erasure).
+
+    Responds 404 if the user no longer exists and 500 on any other failure.
+    """
+    user_id = int(get_jwt_identity())
+    try:
+        result = GDPRService(user_id).delete_account()
+    except ValueError as exc:
+        return jsonify(error=str(exc)), 404
+    except Exception:
+        logger.exception("GDPR delete failed for user_id=%s", user_id)
+        return jsonify(error="Deletion failed. Please try again later."), 500
+
+    return jsonify(result), 200
+
+
+@bp.get("/audit-log")
+@jwt_required()
+def audit_log():
+    """
+    Return the GDPR audit trail for the authenticated user.
+    Lists the 100 most recent export and deletion events, newest first.
+    """
+    user_id = int(get_jwt_identity())
+    entries = (
+        db.session.query(GDPRAuditLog)
+        .filter_by(user_id=user_id)
+        .order_by(GDPRAuditLog.performed_at.desc())
+        .limit(100)
+        .all()
+    )
+    return jsonify(
+        {
+            "audit_log": [
+                {
+                    "id": e.id,
+                    "action": e.action,
+                    "detail": e.detail,
+                    "performed_at": e.performed_at.isoformat(),
+                }
+                for e in entries
+            ]
+        }
+    ), 200
diff --git a/packages/backend/app/services/gdpr.py b/packages/backend/app/services/gdpr.py
new file mode 100644
index 00000000..6a86331f
--- /dev/null
+++ b/packages/backend/app/services/gdpr.py
@@ -0,0 +1,222 @@
+"""
+GDPR PII Export & Delete Service
+---------------------------------
+Handles user data export (ZIP) and irreversible account deletion
+with a full audit trail for GDPR compliance.
+"""
+from __future__ import annotations
+
+import io
+import json
+import logging
+import zipfile
+from datetime import date, datetime, time, timezone
+from decimal import Decimal
+from enum import Enum
+from typing import Any, Dict, List
+
+from ..extensions import db
+from ..models import (
+    Bill,
+    Category,
+    Expense,
+    GDPRAuditLog,
+    RecurringExpense,
+    User,
+)
+
+logger = logging.getLogger("finmind.gdpr")
+
+
+def _utcnow() -> datetime:
+    """Return the current time as a timezone-aware UTC datetime."""
+    return datetime.now(timezone.utc)
+
+
+def _json_safe(val: Any) -> Any:
+    """Map column values that ``json.dumps`` rejects onto JSON-friendly ones."""
+    if isinstance(val, (datetime, date, time)):
+        return val.isoformat()
+    if isinstance(val, Decimal):
+        # str() keeps the exact amount; float() could introduce rounding.
+        return str(val)
+    if isinstance(val, Enum):
+        return val.value
+    return val
+
+
+def _serialize_row(obj: Any) -> Dict[str, Any]:
+    """Convert a SQLAlchemy model instance to a JSON-serializable dict."""
+    return {
+        col.name: _json_safe(getattr(obj, col.name))
+        for col in obj.__table__.columns
+    }
+
+
+class GDPRService:
+    """Encapsulates all GDPR data operations for a single user."""
+
+    def __init__(self, user_id: int) -> None:
+        self.user_id = user_id
+
+    # ── Helpers ───────────────────────────────────────────────────────────────
+
+    def _get_user(self) -> User:
+        """Load the user, raising ``ValueError`` if the account does not exist."""
+        user = db.session.get(User, self.user_id)
+        if not user:
+            raise ValueError(f"User {self.user_id} not found")
+        return user
+
+    def _rows(self, model: Any) -> List[Any]:
+        """Return every row of ``model`` owned by this user."""
+        return db.session.query(model).filter_by(user_id=self.user_id).all()
+
+    def _purge(self, model: Any) -> int:
+        """Bulk-delete this user's rows of ``model``; return the row count."""
+        return db.session.query(model).filter_by(user_id=self.user_id).delete()
+
+    def _log_audit(self, action: str, detail: str = "") -> None:
+        """Stage an audit entry in the session; the caller commits.
+
+        ``detail`` must not contain PII: audit entries outlive the account.
+        """
+        entry = GDPRAuditLog(
+            user_id=self.user_id,
+            action=action,
+            detail=detail[:1000],
+            performed_at=_utcnow(),
+        )
+        db.session.add(entry)
+
+    # ── Export ─────────────────────────────────────────────────────────────────
+
+    def export_zip(self) -> bytes:
+        """
+        Build a GDPR data export ZIP containing:
+          - profile.json
+          - expenses.json
+          - categories.json
+          - recurring_expenses.json
+          - bills.json
+          - audit_log.json
+          - README.txt
+        Returns raw bytes of the ZIP file.
+
+        Raises ValueError if the user does not exist.
+        """
+        user = self._get_user()
+
+        # Insertion order fixes the order of members in the archive.
+        datasets = {
+            "categories.json": self._rows(Category),
+            "expenses.json": self._rows(Expense),
+            "recurring_expenses.json": self._rows(RecurringExpense),
+            "bills.json": self._rows(Bill),
+            "audit_log.json": (
+                db.session.query(GDPRAuditLog)
+                .filter_by(user_id=self.user_id)
+                .order_by(GDPRAuditLog.performed_at)
+                .all()
+            ),
+        }
+
+        profile = {
+            "id": user.id,
+            "email": user.email,
+            "preferred_currency": user.preferred_currency,
+            "role": user.role,
+            "created_at": user.created_at.isoformat() if user.created_at else None,
+        }
+
+        readme = (
+            "FinMind Personal Data Export\n"
+            "============================\n"
+            f"Exported at : {_utcnow().isoformat()}\n"
+            f"Account     : {user.email}\n\n"
+            "Files included:\n"
+            "  profile.json           - Account details\n"
+            "  categories.json        - Your spending categories\n"
+            "  expenses.json          - All expense records\n"
+            "  recurring_expenses.json- Recurring expense rules\n"
+            "  bills.json             - Bill reminders\n"
+            "  audit_log.json         - GDPR action history\n\n"
+            "This export was generated in compliance with GDPR Art. 20.\n"
+        )
+
+        buf = io.BytesIO()
+        with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
+            zf.writestr("profile.json", json.dumps(profile, indent=2))
+            for filename, rows in datasets.items():
+                zf.writestr(
+                    filename,
+                    json.dumps([_serialize_row(r) for r in rows], indent=2),
+                )
+            zf.writestr("README.txt", readme)
+
+        self._log_audit("EXPORT_REQUESTED", "ZIP export generated")
+        try:
+            db.session.commit()
+        except Exception:
+            # Leave the session usable for the rest of the request.
+            db.session.rollback()
+            raise
+
+        logger.info("GDPR export generated for user_id=%s", self.user_id)
+        return buf.getvalue()
+
+    # ── Delete ─────────────────────────────────────────────────────────────────
+
+    def delete_account(self) -> Dict[str, Any]:
+        """
+        Permanently and irreversibly delete all PII for this user.
+        Deletion order: expenses → recurring → bills → categories → user.
+
+        GDPR audit entries are kept but detached from the account
+        (``user_id`` set to NULL) so a PII-free trail outlives the deletion.
+        All changes share one transaction, which is rolled back on failure.
+
+        Returns a summary dict with counts of deleted records.
+        Raises ValueError if the user does not exist.
+        """
+        user = self._get_user()
+        email_snapshot = user.email  # capture before deletion
+
+        try:
+            self._log_audit(
+                "DELETE_REQUESTED", "Irreversible account deletion initiated"
+            )
+            db.session.flush()
+
+            counts: Dict[str, int] = {}
+            counts["expenses"] = self._purge(Expense)
+            counts["recurring_expenses"] = self._purge(RecurringExpense)
+            counts["bills"] = self._purge(Bill)
+            counts["categories"] = self._purge(Category)
+
+            # Detach rather than delete: the trail must survive the account.
+            audit_retained = (
+                db.session.query(GDPRAuditLog)
+                .filter_by(user_id=self.user_id)
+                .update({"user_id": None}, synchronize_session=False)
+            )
+            counts["user"] = (
+                db.session.query(User).filter_by(id=self.user_id).delete()
+            )
+
+            db.session.commit()
+        except Exception:
+            db.session.rollback()
+            raise
+
+        # Log the id only; the email is PII that was just erased.
+        logger.info(
+            "GDPR account deleted user_id=%s counts=%s", self.user_id, counts
+        )
+        return {
+            "deleted": True,
+            "email": email_snapshot,
+            "records_removed": counts,
+            "audit_entries_retained": audit_retained,
+            "performed_at": _utcnow().isoformat(),
+        }
diff --git a/packages/backend/tests/test_gdpr.py b/packages/backend/tests/test_gdpr.py
new file mode 100644
index 00000000..aaf7cb1d
--- /dev/null
+++ b/packages/backend/tests/test_gdpr.py
@@ -0,0 +1,128 @@
+"""Tests for GDPR export & delete endpoints."""
+import io
+import json
+import zipfile
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _auth_headers(client, email="gdpr@example.com", password="testpass123"):
+    client.post(
+        "/auth/register",
+        json={"email": email, "password": password},
+    )
+    resp = client.post(
+        "/auth/login",
+        json={"email": email, "password": password},
+    )
+    token = resp.get_json()["access_token"]
+    return {"Authorization": f"Bearer {token}"}
+
+
+# ── Export tests ──────────────────────────────────────────────────────────────
+
+class TestGDPRExport:
+    def test_export_returns_zip(self, client):
+        headers = _auth_headers(client)
+        resp = client.get("/gdpr/export", headers=headers)
+        assert resp.status_code == 200
+        assert resp.content_type == "application/zip"
+        assert resp.data[:2] == b"PK"  # ZIP magic bytes open the archive
+
+    def test_export_zip_contains_expected_files(self, client):
+        headers = _auth_headers(client, email="export2@example.com")
+        resp = client.get("/gdpr/export", headers=headers)
+        zf = zipfile.ZipFile(io.BytesIO(resp.data))
+        names = zf.namelist()
+        assert "profile.json" in names
+        assert "expenses.json" in names
+        assert "categories.json" in names
+        assert "recurring_expenses.json" in names
+        assert "bills.json" in names
+        assert "audit_log.json" in names
+        assert "README.txt" in names
+
+    def test_export_profile_contains_email(self, client):
+        email = "export3@example.com"
+        headers = _auth_headers(client, email=email)
+        resp = client.get("/gdpr/export", headers=headers)
+        zf = zipfile.ZipFile(io.BytesIO(resp.data))
+        profile = json.loads(zf.read("profile.json"))
+        assert profile["email"] == email
+
+    def test_export_creates_audit_entry(self, client):
+        headers = _auth_headers(client, email="export4@example.com")
+        client.get("/gdpr/export", headers=headers)
+        audit_resp = client.get("/gdpr/audit-log", headers=headers)
+        log = audit_resp.get_json()["audit_log"]
+        assert any(e["action"] == "EXPORT_REQUESTED" for e in log)
+
+    def test_export_requires_auth(self, client):
+        resp = client.get("/gdpr/export")
+        assert resp.status_code == 401
+
+
+# ── Delete tests ──────────────────────────────────────────────────────────────
+
+class TestGDPRDelete:
+    def test_delete_returns_200(self, client):
+        headers = _auth_headers(client, email="del1@example.com")
+        resp = client.delete("/gdpr/account", headers=headers)
+        assert resp.status_code == 200
+        data = resp.get_json()
+        assert data["deleted"] is True
+
+    def test_delete_removes_user_data(self, client):
+        headers = _auth_headers(client, email="del2@example.com")
+        client.delete("/gdpr/account", headers=headers)
+        # Login should now fail
+        login = client.post(
+            "/auth/login",
+            json={"email": "del2@example.com", "password": "testpass123"},
+        )
+        assert login.status_code == 401
+
+    def test_delete_response_includes_counts(self, client):
+        headers = _auth_headers(client, email="del3@example.com")
+        resp = client.delete("/gdpr/account", headers=headers)
+        data = resp.get_json()
+        assert "records_removed" in data
+        assert "user" in data["records_removed"]
+
+    def test_delete_retains_detached_audit_trail(self, client):
+        headers = _auth_headers(client, email="del5@example.com")
+        resp = client.delete("/gdpr/account", headers=headers)
+        assert resp.get_json()["audit_entries_retained"] >= 1
+
+    def test_delete_requires_auth(self, client):
+        resp = client.delete("/gdpr/account")
+        assert resp.status_code == 401
+
+    def test_delete_is_irreversible(self, client):
+        headers = _auth_headers(client, email="del4@example.com")
+        client.delete("/gdpr/account", headers=headers)
+        # Second delete should 404 (user gone)
+        resp = client.delete("/gdpr/account", headers=headers)
+        assert resp.status_code in (401, 404)
+
+
+# ── Audit log tests ───────────────────────────────────────────────────────────
+
+class TestGDPRAuditLog:
+    def test_audit_log_empty_initially(self, client):
+        headers = _auth_headers(client, email="audit1@example.com")
+        resp = client.get("/gdpr/audit-log", headers=headers)
+        assert resp.status_code == 200
+        assert resp.get_json()["audit_log"] == []
+
+    def test_audit_log_records_export(self, client):
+        headers = _auth_headers(client, email="audit2@example.com")
+        client.get("/gdpr/export", headers=headers)
+        resp = client.get("/gdpr/audit-log", headers=headers)
+        log = resp.get_json()["audit_log"]
+        assert len(log) >= 1
+        assert log[0]["action"] == "EXPORT_REQUESTED"
+
+    def test_audit_log_requires_auth(self, client):
+        resp = client.get("/gdpr/audit-log")
+        assert resp.status_code == 401