Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions openwisp_notifications/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

logger = logging.getLogger(__name__)

EXTRA_DATA = app_settings.get_config()["USE_JSONFIELD"]

User = get_user_model()

Notification = load_model("Notification")
Expand Down Expand Up @@ -141,8 +139,10 @@ def notify_handler(**kwargs):
(kwargs.pop(opt, None), opt) for opt in ("target", "action_object")
]

notification_list = []
for recipient in recipients:
notifications_to_create = []
recipients_list = list(recipients)

for recipient in recipients_list:
notification = Notification(
recipient=recipient,
actor=actor,
Expand All @@ -163,10 +163,34 @@ def notify_handler(**kwargs):
"%s_content_type" % opt,
ContentType.objects.get_for_model(obj),
)
if kwargs and EXTRA_DATA:
if kwargs:
notification.data = kwargs
notification.save()
notification_list.append(notification)
notifications_to_create.append(notification)

post_save.disconnect(clear_notification_cache, sender=Notification)

try:
notification_list = Notification.objects.bulk_create(notifications_to_create)

for notification in notification_list:
send_email_notification(Notification, notification, created=True)

for recipient in recipients_list:
Notification.invalidate_unread_cache(recipient)

first_notification = notification_list[0] if notification_list else None
ws_handlers.bulk_notification_update_handler(
recipients=recipients_list,
reload_widget=True,
notification=first_notification,
)

finally:
post_save.connect(
clear_notification_cache,
sender=Notification,
dispatch_uid="clear_notification_cache_saved",
)

return notification_list

Expand Down
140 changes: 124 additions & 16 deletions openwisp_notifications/websockets/handlers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from asgiref.sync import async_to_sync
from channels import layers
from django.core.cache import cache
from django.db.models import Count, Q
from django.utils.timezone import now, timedelta

from openwisp_notifications.api.serializers import NotFound, NotificationListSerializer
Expand All @@ -12,6 +13,95 @@
Notification = load_model("Notification")


def bulk_check_notification_storm_and_unread_count(recipients):
if not recipients:
return {}

recipient_ids = [str(recipient.pk) for recipient in recipients]
cached_storm_data = cache.get_many([f"ow-noti-storm-{pk}" for pk in recipient_ids])

results = {}
uncached_recipients = []

for recipient in recipients:
cache_key = f"ow-noti-storm-{recipient.pk}"
if cache_key in cached_storm_data:
results[recipient.pk] = [cached_storm_data[cache_key], None]
else:
uncached_recipients.append(recipient)
results[recipient.pk] = [False, None]

if uncached_recipients:
short_term_threshold = now() - timedelta(
seconds=app_settings.NOTIFICATION_STORM_PREVENTION["short_term_time_period"]
)
long_term_threshold = now() - timedelta(
seconds=app_settings.NOTIFICATION_STORM_PREVENTION["long_term_time_period"]
)

storm_and_unread_data = (
Notification.objects.filter(recipient_id__in=recipient_ids)
.values("recipient_id")
.annotate(
short_term_count=Count(
"id", filter=Q(timestamp__gte=short_term_threshold)
),
long_term_count=Count(
"id", filter=Q(timestamp__gte=long_term_threshold)
),
unread_count=Count("id", filter=Q(unread=True)),
)
)

cache_updates = {}
for data in storm_and_unread_data:
recipient_id = data["recipient_id"]

in_storm = (
data["short_term_count"]
> app_settings.NOTIFICATION_STORM_PREVENTION[
"short_term_notification_count"
]
or data["long_term_count"]
> app_settings.NOTIFICATION_STORM_PREVENTION[
"long_term_notification_count"
]
)

results[recipient_id] = [in_storm, data["unread_count"]]

if in_storm:
cache_updates[f"ow-noti-storm-{recipient_id}"] = True

if cache_updates:
cache.set_many(cache_updates, timeout=60)

for recipient in uncached_recipients:
if recipient.pk not in [
data["recipient_id"] for data in storm_and_unread_data
]:
results[recipient.pk] = [False, 0]

if any(results[pk][1] is None for pk in results):
recipients_needing_unread = [pk for pk in results if results[pk][1] is None]
unread_data = (
Notification.objects.filter(
recipient_id__in=recipients_needing_unread, unread=True
)
.values("recipient_id")
.annotate(unread_count=Count("id"))
)

for data in unread_data:
results[data["recipient_id"]][1] = data["unread_count"]

for pk in recipients_needing_unread:
if results[pk][1] is None:
results[pk][1] = 0

return {pk: (storm, unread) for pk, (storm, unread) in results.items()}


def user_in_notification_storm(user):
"""
A user is affected by notifications storm if any of short term
Expand Down Expand Up @@ -52,23 +142,41 @@ def user_in_notification_storm(user):
return in_notification_storm


def notification_update_handler(reload_widget=False, notification=None, recipient=None):
def bulk_notification_update_handler(
recipients, reload_widget=False, notification=None
):
if not recipients:
return

channel_layer = layers.get_channel_layer()

serialized_notification = None
try:
assert notification is not None
notification = NotificationListSerializer(notification).data
if notification is not None:
serialized_notification = NotificationListSerializer(notification).data
except (NotFound, AssertionError):
pass
async_to_sync(channel_layer.group_send)(
f"ow-notification-{recipient.pk}",
{
"type": "send.updates",
"reload_widget": reload_widget,
"notification": notification,
"recipient": str(recipient.pk),
"in_notification_storm": user_in_notification_storm(recipient),
"notification_count": normalize_unread_count(
recipient.notifications.unread().count()
),
},
)

bulk_data = bulk_check_notification_storm_and_unread_count(recipients)

for recipient in recipients:
in_storm, unread_count = bulk_data.get(recipient.pk, (False, 0))

async_to_sync(channel_layer.group_send)(
f"ow-notification-{recipient.pk}",
{
"type": "send.updates",
"reload_widget": reload_widget,
"notification": serialized_notification,
"recipient": str(recipient.pk),
"in_notification_storm": in_storm,
"notification_count": normalize_unread_count(unread_count),
},
)


def notification_update_handler(reload_widget=False, notification=None, recipient=None):
if recipient is None:
return

bulk_notification_update_handler([recipient], reload_widget, notification)