diff --git a/README.md b/README.md index ca52ae1..1b2c0bc 100644 --- a/README.md +++ b/README.md @@ -33,11 +33,13 @@ It serves as a backend-focused implementation for the ProDev BE case study, meti | CI/CD | GitHub Actions | | Testing | PyTest | | API Documentation | Swagger / OpenAPI | -| Deployment | AWS / Heroku | +| Deployment | Heroku | | Formatting/Linting | Ruff / Black | | Code Quality | SonarCloud | | Asynchronous Tasks | Celery | | Message Broker | Redis | +| Payment Gateway | Chapa | +| Task Scheduling | Django Celery Beat | ## Getting Started To get started with The Agora, follow these steps: @@ -107,35 +109,35 @@ To get started with The Agora, follow these steps: ``` ## API Documentation -| Endpoint | Method | Description | -|--------------------------------------|--------|---------------------------------------------| -| Poll Management | | | -| `/api/v1/polls/` | GET | Retrieve a list of all polls | -| `/api/v1/polls/` | POST | Create a new poll | -| `/api/v1/polls/{poll_id}/` | GET | Retrieve details of a specific poll | -| `/api/v1/polls/{poll_id}/` | PATCH | Update a specific poll | -| `/api/v1/polls/{poll_id}/` | DELETE | Delete a specific poll | -| `/api/v1/polls/{poll_id}/close/` | POST | Close voting for a specific poll | -| Vote on Polls | | | -| `/api/v1/polls/{poll_id}/vote/` | POST | Cast a vote for a specific poll | -| User Management | | | -| `/api/v1/auth/register/` | POST | Register a new user | -| `/api/v1/auth/login/` | POST | User login | -| `/api/v1/auth/logout/` | POST | User logout | - | `/api/v1/auth/verify-email/` | GET | Verify user email | -| `/api/v1/users/resend-verification/` | POST | Resend email verification | -| Real-Time Updates | | | -| `/api/v1/updates/` | WS | WebSocket endpoint for real-time updates | -| Organizational Management | | | -| `/api/v1/organizations/` | GET | Retrieve a list of organizations | -| `/api/v1/organizations/` | POST | Create a new organization | -| `/api/v1/organizations/{org_id}/` | GET | Retrieve details of a specific organization | -| `/api/v1/organizations/{org_id}/` | PATCH | Update a specific organization | -| `/api/v1/organizations/{org_id}/` | DELETE | Delete a specific organization | - | Payment Integration | | | -| `/api/v1/payments/initialize/` | POST | Initialize Payment | -| `/api/v1/payments/verify/` | POST | Verify Payment | -| `/api/v1/payments/{payment_id}/ ` | GET | Retrieve Payment Details | +| Endpoint | Method | Description | +|------------------------------------------|--------|---------------------------------------------| +| Poll Management | | | +| `/api/v1/polls/` | GET | Retrieve a list of all polls | +| `/api/v1/polls/` | POST | Create a new poll | +| `/api/v1/polls/{poll_id}/` | GET | Retrieve details of a specific poll | +| `/api/v1/polls/{poll_id}/` | PATCH | Update a specific poll | +| `/api/v1/polls/{poll_id}/` | DELETE | Delete a specific poll | +| `/api/v1/polls/{poll_id}/close/` | POST | Close voting for a specific poll | +| Vote on Polls | | | +| `/api/v1/polls/{poll_id}/vote/` | POST | Cast a vote for a specific poll | +| User Management | | | +| `/api/v1/auth/register/` | POST | Register a new user | +| `/api/v1/auth/login/` | POST | User login | +| `/api/v1/auth/logout/` | POST | User logout | +| `/api/v1/auth/verify-email/` | GET | Verify user email | +| `/api/v1/users/resend-verification/` | POST | Resend email verification | +| Real-Time Updates | | | +| `ws://127.0.0.1:8000/ws/poll/{poll_id}/` | WS | WebSocket endpoint for real-time updates | +| Organizational Management | | | +| `/api/v1/organizations/` | GET | Retrieve a list of organizations | +| `/api/v1/organizations/` | POST | Create a new organization | +| `/api/v1/organizations/{org_id}/` | GET | Retrieve details of a specific organization | +| `/api/v1/organizations/{org_id}/` | PATCH | Update a specific organization | +| `/api/v1/organizations/{org_id}/` | DELETE | Delete a specific organization | +| Payment Integration | | | +| `/api/v1/payments/initialize/` | POST | Initialize Payment | +| `/api/v1/payments/verify/` | POST | Verify Payment | +| `/api/v1/payments/{payment_id}/` | GET | Retrieve Payment Details | ## API Usage Examples ### Creating a Poll diff --git a/core/__pycache__/settings.cpython-313.pyc b/core/__pycache__/settings.cpython-313.pyc index 69aa5a1..b49ff6d 100644 Binary files a/core/__pycache__/settings.cpython-313.pyc and b/core/__pycache__/settings.cpython-313.pyc differ diff --git a/core/settings.py b/core/settings.py index dfef7a9..0544c0a 100644 --- a/core/settings.py +++ b/core/settings.py @@ -383,4 +383,8 @@ "task": "core.tasks.generate_weekly_user_statistics", "schedule": crontab(day_of_week="sun", hour=23, minute=0), }, + "broadcast-poll-updates": { + "task": "core.tasks.broadcast_poll_updates", + "schedule": 2.0, # every 2 seconds + }, } diff --git a/core/tasks.py b/core/tasks.py index 530623c..14500ff 100644 --- a/core/tasks.py +++ b/core/tasks.py @@ -1,11 +1,16 @@ import logging from pathlib import Path +from asgiref.sync import async_to_sync from celery import shared_task +from channels.layers import get_channel_layer from django.utils import timezone from datetime import timedelta from django.contrib.auth import get_user_model -import os + +from django_redis import get_redis_connection + +from polls.models import PollOption User = get_user_model() logger = logging.getLogger("core.tasks") @@ -60,3 +65,52 @@ def generate_weekly_user_statistics(): logger.info(f"Weekly stats: {stats}") return stats + + +@shared_task +def broadcast_poll_updates(): + """ + Periodically checks for 'dirty' polls (polls with new votes) + and broadcasts the latest results to WebSockets. + Prevents 'Thundering Herd' by batching updates. + """ + redis_conn = get_redis_connection("default") + channel_layer = get_channel_layer() + + # Check if there are any dirty polls + if not redis_conn.exists("dirty_polls"): + return + + # 1. Atomic Snapshot + # Rename the key so we have a static list to process, + # while new votes start filling a fresh 'dirty_polls' set. + try: + redis_conn.rename("dirty_polls", "dirty_polls_processing") + except Exception: + # Key might have disappeared or race condition; skip this tick. + return + + # 2. Get all unique Poll IDs that need updates + dirty_poll_ids = redis_conn.smembers("dirty_polls_processing") + + logger.info(f"Broadcasting updates for {len(dirty_poll_ids)} polls") + + for poll_id_bytes in dirty_poll_ids: + try: + poll_id = poll_id_bytes.decode("utf-8") + room_group_name = f"poll_{poll_id}" + + # 3. Fetch Fresh Data (Once per batch, not per vote) + options_data = list( + PollOption.objects.filter(poll_id=poll_id).values("id", "vote_count") + ) + + # 4. Broadcast to WebSocket Group + async_to_sync(channel_layer.group_send)( + room_group_name, {"type": "poll_update", "results": options_data} + ) + except Exception as e: + logger.exception(f"Error broadcasting poll {poll_id}: {e}") + + # 5. Cleanup the processing key + redis_conn.delete("dirty_polls_processing") diff --git a/polls/signals.py b/polls/signals.py index ef0bfb9..399a478 100644 --- a/polls/signals.py +++ b/polls/signals.py @@ -1,9 +1,10 @@ +import logging + from django.db.models.signals import post_save from django.dispatch import receiver from django.db.models import F -from asgiref.sync import async_to_sync -from channels.layers import get_channel_layer -import logging +from django_redis import get_redis_connection +from redis.exceptions import RedisError from .models import Vote, PollOption @@ -14,40 +15,22 @@ def handle_new_vote(sender, instance, created, **kwargs): """ Triggered when a Vote is saved. - 1. Atomically increments the PollOption count. - 2. Broadcasts the new count to WebSocket via Redis. + 1. Atomically increments the PollOption count (DB). + 2. Marks the poll as 'dirty' in Redis for the background worker to pick up. """ if created: - try: - # 1. Atomic Increment (DB Side) - PollOption.objects.filter(id=instance.option.id).update( - vote_count=F("vote_count") + 1 - ) - - # 2. Real-time Broadcast - channel_layer = get_channel_layer() - # Use raw poll_id to avoid extra DB lookup for the Poll object - poll_id = str(instance.poll_id) - room_group_name = f"poll_{poll_id}" + # Atomic Increment (DB Side) + # Keeps data integrity without race conditions + PollOption.objects.filter(id=instance.option.id).update( + vote_count=F("vote_count") + 1 + ) - # Fetch fresh counts - # We fetch all options for this poll to ensure the frontend is fully synced - options = PollOption.objects.filter(poll_id=poll_id).only( - "index", "vote_count" - ) - - # Format data: Map 'index' to 'id' for the frontend - formatted_data = [ - {"id": opt.index, "vote_count": opt.vote_count} for opt in options - ] - - # Send the updated counts to the group - async_to_sync(channel_layer.group_send)( - room_group_name, {"type": "poll_update", "results": formatted_data} + try: + con = get_redis_connection("default") + con.sadd("dirty_polls", str(instance.poll.poll_id)) + except RedisError: + # The vote is safe in the DB, just the real-time update might delay. + logger.exception( + "Failed to flag dirty poll in Redis", + extra={"poll_id": str(instance.poll.poll_id)}, ) - - except Exception as e: - # Log the error but DO NOT crash the request. - # The vote is already saved, so we shouldn't return a 500 error - # just because the real-time update failed. - logger.error(f"Error broadcasting poll update: {e}", exc_info=True) diff --git a/polls/views.py b/polls/views.py index 9c7d36f..1af3d06 100644 --- a/polls/views.py +++ b/polls/views.py @@ -1,12 +1,11 @@ -from rest_framework import viewsets, status, permissions, filters, serializers +from rest_framework import viewsets, status, permissions, filters from rest_framework.decorators import action -from rest_framework.generics import get_object_or_404 from rest_framework.response import Response from django.db.models import Q from django.utils import timezone from drf_spectacular.utils import extend_schema -from .models import Poll, PollCategory, PollOption, Vote +from .models import Poll, PollCategory from .permissions import CanCreateCategory, IsPollCreatorOrOrgAdmin from .serializers import ( PollCreateSerializer, @@ -15,7 +14,6 @@ VoteSerializer, ) from organizations.models import OrganizationMember -from .utils import get_client_ip, get_country_from_ip @extend_schema(tags=["Categories"]) @@ -77,6 +75,7 @@ def get_queryset(self): ) .distinct() .select_related("poll_category", "creator", "organization") + .prefetch_related("options") ) def perform_create(self, serializer):