Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ It serves as a backend-focused implementation for the ProDev BE case study, meti
| CI/CD | GitHub Actions |
| Testing | PyTest |
| API Documentation | Swagger / OpenAPI |
| Deployment | AWS / Heroku |
| Deployment | Heroku |
| Formatting/Linting | Ruff / Black |
| Code Quality | SonarCloud |
| Asynchronous Tasks | Celery |
| Message Broker | Redis |
| Payment Gateway | Chapa |
| Task Scheduling | Django Celery Beat |

## Getting Started
To get started with The Agora, follow these steps:
Expand Down Expand Up @@ -107,35 +109,35 @@ To get started with The Agora, follow these steps:
```

## API Documentation
| Endpoint | Method | Description |
|--------------------------------------|--------|---------------------------------------------|
| Poll Management | | |
| `/api/v1/polls/` | GET | Retrieve a list of all polls |
| `/api/v1/polls/` | POST | Create a new poll |
| `/api/v1/polls/{poll_id}/` | GET | Retrieve details of a specific poll |
| `/api/v1/polls/{poll_id}/` | PATCH | Update a specific poll |
| `/api/v1/polls/{poll_id}/` | DELETE | Delete a specific poll |
| `/api/v1/polls/{poll_id}/close/` | POST | Close voting for a specific poll |
| Vote on Polls | | |
| `/api/v1/polls/{poll_id}/vote/` | POST | Cast a vote for a specific poll |
| User Management | | |
| `/api/v1/auth/register/` | POST | Register a new user |
| `/api/v1/auth/login/` | POST | User login |
| `/api/v1/auth/logout/` | POST | User logout |
| `/api/v1/auth/verify-email/` | GET | Verify user email |
| `/api/v1/users/resend-verification/` | POST | Resend email verification |
| Real-Time Updates | | |
| `/api/v1/updates/` | WS | WebSocket endpoint for real-time updates |
| Organizational Management | | |
| `/api/v1/organizations/` | GET | Retrieve a list of organizations |
| `/api/v1/organizations/` | POST | Create a new organization |
| `/api/v1/organizations/{org_id}/` | GET | Retrieve details of a specific organization |
| `/api/v1/organizations/{org_id}/` | PATCH | Update a specific organization |
| `/api/v1/organizations/{org_id}/` | DELETE | Delete a specific organization |
| Payment Integration | | |
| `/api/v1/payments/initialize/` | POST | Initialize Payment |
| `/api/v1/payments/verify/` | POST | Verify Payment |
| `/api/v1/payments/{payment_id}/ ` | GET | Retrieve Payment Details |
| Endpoint | Method | Description |
|------------------------------------------|--------|---------------------------------------------|
| Poll Management | | |
| `/api/v1/polls/` | GET | Retrieve a list of all polls |
| `/api/v1/polls/` | POST | Create a new poll |
| `/api/v1/polls/{poll_id}/` | GET | Retrieve details of a specific poll |
| `/api/v1/polls/{poll_id}/` | PATCH | Update a specific poll |
| `/api/v1/polls/{poll_id}/` | DELETE | Delete a specific poll |
| `/api/v1/polls/{poll_id}/close/` | POST | Close voting for a specific poll |
| Vote on Polls | | |
| `/api/v1/polls/{poll_id}/vote/` | POST | Cast a vote for a specific poll |
| User Management | | |
| `/api/v1/auth/register/` | POST | Register a new user |
| `/api/v1/auth/login/` | POST | User login |
| `/api/v1/auth/logout/` | POST | User logout |
| `/api/v1/auth/verify-email/` | GET | Verify user email |
| `/api/v1/users/resend-verification/` | POST | Resend email verification |
| Real-Time Updates | | |
| `ws://127.0.0.1:8000/ws/poll/{poll_id}/` | WS | WebSocket endpoint for real-time updates |
| Organizational Management | | |
| `/api/v1/organizations/` | GET | Retrieve a list of organizations |
| `/api/v1/organizations/` | POST | Create a new organization |
| `/api/v1/organizations/{org_id}/` | GET | Retrieve details of a specific organization |
| `/api/v1/organizations/{org_id}/` | PATCH | Update a specific organization |
| `/api/v1/organizations/{org_id}/` | DELETE | Delete a specific organization |
| Payment Integration | | |
| `/api/v1/payments/initialize/` | POST | Initialize Payment |
| `/api/v1/payments/verify/` | POST | Verify Payment |
| `/api/v1/payments/{payment_id}/` | GET | Retrieve Payment Details |

## API Usage Examples
### Creating a Poll
Expand Down
Binary file modified core/__pycache__/settings.cpython-313.pyc
Binary file not shown.
4 changes: 4 additions & 0 deletions core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,8 @@
"task": "core.tasks.generate_weekly_user_statistics",
"schedule": crontab(day_of_week="sun", hour=23, minute=0),
},
"broadcast-poll-updates": {
"task": "core.tasks.broadcast_poll_updates",
"schedule": 2.0, # every 2 seconds
},
}
56 changes: 55 additions & 1 deletion core/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import logging
from pathlib import Path

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.utils import timezone
from datetime import timedelta
from django.contrib.auth import get_user_model
import os

from django_redis import get_redis_connection

from polls.models import PollOption

User = get_user_model()
logger = logging.getLogger("core.tasks")
Expand Down Expand Up @@ -60,3 +65,52 @@ def generate_weekly_user_statistics():

logger.info(f"Weekly stats: {stats}")
return stats


@shared_task
def broadcast_poll_updates():
"""
Periodically checks for 'dirty' polls (polls with new votes)
and broadcasts the latest results to WebSockets.
Prevents 'Thundering Herd' by batching updates.
"""
redis_conn = get_redis_connection("default")
channel_layer = get_channel_layer()

# Check if there are any dirty polls
if not redis_conn.exists("dirty_polls"):
return

# 1. Atomic Snapshot
# Rename the key so we have a static list to process,
# while new votes start filling a fresh 'dirty_polls' set.
try:
redis_conn.rename("dirty_polls", "dirty_polls_processing")
except Exception:
# Key might have disappeared or race condition; skip this tick.
return

# 2. Get all unique Poll IDs that need updates
dirty_poll_ids = redis_conn.smembers("dirty_polls_processing")

logger.info(f"Broadcasting updates for {len(dirty_poll_ids)} polls")

for poll_id_bytes in dirty_poll_ids:
try:
poll_id = poll_id_bytes.decode("utf-8")
room_group_name = f"poll_{poll_id}"

# 3. Fetch Fresh Data (Once per batch, not per vote)
options_data = list(
PollOption.objects.filter(poll_id=poll_id).values("id", "vote_count")
)

# 4. Broadcast to WebSocket Group
async_to_sync(channel_layer.group_send)(
room_group_name, {"type": "poll_update", "results": options_data}
)
except Exception as e:
logger.exception(f"Error broadcasting poll {poll_id}: {e}")

# 5. Cleanup the processing key
redis_conn.delete("dirty_polls_processing")
55 changes: 19 additions & 36 deletions polls/signals.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging

from django.db.models.signals import post_save
from django.dispatch import receiver
from django.db.models import F
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
import logging
from django_redis import get_redis_connection
from redis.exceptions import RedisError

from .models import Vote, PollOption

Expand All @@ -14,40 +15,22 @@
def handle_new_vote(sender, instance, created, **kwargs):
"""
Triggered when a Vote is saved.
1. Atomically increments the PollOption count.
2. Broadcasts the new count to WebSocket via Redis.
1. Atomically increments the PollOption count (DB).
2. Marks the poll as 'dirty' in Redis for the background worker to pick up.
"""
if created:
try:
# 1. Atomic Increment (DB Side)
PollOption.objects.filter(id=instance.option.id).update(
vote_count=F("vote_count") + 1
)

# 2. Real-time Broadcast
channel_layer = get_channel_layer()
# Use raw poll_id to avoid extra DB lookup for the Poll object
poll_id = str(instance.poll_id)
room_group_name = f"poll_{poll_id}"
# Atomic Increment (DB Side)
# Keeps data integrity without race conditions
PollOption.objects.filter(id=instance.option.id).update(
vote_count=F("vote_count") + 1
)

# Fetch fresh counts
# We fetch all options for this poll to ensure the frontend is fully synced
options = PollOption.objects.filter(poll_id=poll_id).only(
"index", "vote_count"
)

# Format data: Map 'index' to 'id' for the frontend
formatted_data = [
{"id": opt.index, "vote_count": opt.vote_count} for opt in options
]

# Send the updated counts to the group
async_to_sync(channel_layer.group_send)(
room_group_name, {"type": "poll_update", "results": formatted_data}
try:
con = get_redis_connection("default")
con.sadd("dirty_polls", str(instance.poll.poll_id))
except RedisError:
# The vote is safe in the DB, just the real-time update might delay.
logger.exception(
"Failed to flag dirty poll in Redis",
extra={"poll_id": str(instance.poll.poll_id)},
)

except Exception as e:
# Log the error but DO NOT crash the request.
# The vote is already saved, so we shouldn't return a 500 error
# just because the real-time update failed.
logger.error(f"Error broadcasting poll update: {e}", exc_info=True)
7 changes: 3 additions & 4 deletions polls/views.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from rest_framework import viewsets, status, permissions, filters, serializers
from rest_framework import viewsets, status, permissions, filters
from rest_framework.decorators import action
from rest_framework.generics import get_object_or_404
from rest_framework.response import Response
from django.db.models import Q
from django.utils import timezone
from drf_spectacular.utils import extend_schema

from .models import Poll, PollCategory, PollOption, Vote
from .models import Poll, PollCategory
from .permissions import CanCreateCategory, IsPollCreatorOrOrgAdmin
from .serializers import (
PollCreateSerializer,
Expand All @@ -15,7 +14,6 @@
VoteSerializer,
)
from organizations.models import OrganizationMember
from .utils import get_client_ip, get_country_from_ip


@extend_schema(tags=["Categories"])
Expand Down Expand Up @@ -77,6 +75,7 @@ def get_queryset(self):
)
.distinct()
.select_related("poll_category", "creator", "organization")
.prefetch_related("options")
)

def perform_create(self, serializer):
Expand Down