Skip to content

Commit 2946c5e

Browse files
committed
Rewrite refresh event logger to handler
This still doesn't reschedule anything, but it filters out incoming events that this process doesn't want to handle, and ignores/deletes events that no ipdevpoll process will handle. It also groups incoming events by netbox+jobname, in case there are multiple requests for the same job to be re-run (i.e. someone impatient keeps sending refresh events, or maybe ipdevpoll was down for a while and events queued up, or two separate users just happened to send the same refresh request at the same time).
1 parent 1d8c051 commit 2946c5e

File tree

2 files changed

+65
-7
lines changed

2 files changed

+65
-7
lines changed

python/nav/ipdevpoll/daemon.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def setup_scheduling(self):
127127
self.options.onlyjob,
128128
)
129129
reactor.callWhenRunning(
130-
db.subscribe_to_event_notifications, schedule.log_received_events
130+
db.subscribe_to_event_notifications, schedule.handle_incoming_events
131131
)
132132

133133
def log_scheduler_jobs():
@@ -199,7 +199,7 @@ def setup_multiprocess(self, process_count, max_jobs):
199199
self.options.onlyjob,
200200
)
201201
reactor.callWhenRunning(
202-
db.subscribe_to_event_notifications, schedule.log_received_events
202+
db.subscribe_to_event_notifications, schedule.handle_incoming_events
203203
)
204204

205205
def log_scheduler_jobs():

python/nav/ipdevpoll/schedule.py

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from random import randint
2525
from math import ceil
2626

27+
from django.db.transaction import atomic
2728
from twisted.python.failure import Failure
2829
from twisted.internet import task, reactor
2930
from twisted.internet.defer import Deferred
@@ -556,12 +557,69 @@ def flush(self):
556557
_COUNTERS = CounterFlusher()
557558

558559

559-
def log_received_events():
560-
"""Checks the event queue for events addressed to ipdevpoll and logs them"""
561-
return run_in_thread(_log_received_events)
560+
def handle_incoming_events():
561+
"""Checks the event queue for events addressed to ipdevpoll and handles them"""
562+
# Since this extensively accesses the database, it needs to run in a thread:
563+
return run_in_thread(_handle_incoming_events)
562564

563565

564-
def _log_received_events():
566+
@atomic
567+
def _handle_incoming_events():
565568
events = EventQueue.objects.filter(target='ipdevpoll')
569+
# Filter out (and potentially delete) events not worthy of our attention
570+
events = [event for event in events if _event_pre_filter(event)]
571+
572+
boxes_to_reschedule = defaultdict(list)
573+
# There may be multiple notifications queued for the same request, so group them
574+
# by netbox+jobname
566575
for event in events:
567-
_logger.debug("Event on queue: %r", event)
576+
boxes_to_reschedule[(event.netbox_id, event.subid)].append(event)
577+
_logger.debug("boxes_to_reschedule: %r", boxes_to_reschedule)
578+
579+
580+
def _event_pre_filter(event: EventQueue):
581+
"""Returns True if this event is worthy of this process' attention. If the event
582+
isn't worthy of *any* ipdevpoll process' attention, we delete it from the database
583+
too.
584+
"""
585+
_logger.debug("Found event on queue: %r", event)
586+
if not _is_valid_refresh_event(event):
587+
event.delete()
588+
return False
589+
if not _is_refresh_event_for_me(event):
590+
return False
591+
# TODO: Should also delete events that seem to be stale. If the requested job is
592+
# logged as having run after the event's timestamp, the event is stale.
593+
return True
594+
595+
596+
def _is_valid_refresh_event(event: EventQueue):
597+
if event.event_type_id != 'notification':
598+
_logger.info("Ignoring non-notification event from %s", event.source)
599+
return False
600+
601+
if not event.subid:
602+
_logger.info(
603+
"Ignoring notification event from %s with blank job name", event.source
604+
)
605+
return False
606+
607+
return True
608+
609+
610+
def _is_refresh_event_for_me(event: EventQueue):
611+
schedulers = JobScheduler.get_job_schedulers_by_name()
612+
s = schedulers['ip2mac']
613+
if event.subid not in schedulers:
614+
_logger.debug(
615+
"This process does not schedule %s, %r is not for us", event.subid, event
616+
)
617+
return False
618+
619+
if event.netbox_id not in schedulers[event.subid].netboxes:
620+
_logger.debug(
621+
"This process does not poll from %s, %r is not for us", event.netbox, event
622+
)
623+
return False
624+
625+
return True

0 commit comments

Comments
 (0)