diff --git a/pyproject.toml b/pyproject.toml
index 1b669ed8e0..27a12bfb73 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -118,6 +118,7 @@ navdump = "nav.bin.navdump:main"
 naventity = "nav.bin.naventity:main"
 navoidverify = "nav.bin.navoidverify:main"
 navpgdump = "nav.pgdump:main"
+navrefresh = "nav.bin.navrefresh:main"
 navsnmp = "nav.bin.navsnmp:main"
 navstats = "nav.bin.navstats:main"
 navsyncdb = "nav.pgsync:main"
diff --git a/python/nav/bin/navrefresh.py b/python/nav/bin/navrefresh.py
new file mode 100755
index 0000000000..09d01545da
--- /dev/null
+++ b/python/nav/bin/navrefresh.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python3
+#
+# Copyright 2025 Sikt
+#
+# This file is part of Network Administration Visualized (NAV).
+#
+# NAV is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License version 3 as published by
+# the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details. You should have received a copy of the GNU General Public
+# License along with NAV. If not, see <http://www.gnu.org/licenses/>.
+#
+"""A command line interface to send refresh commands to a running ipdevpoll daemon"""
+
+
+import argparse
+import sys
+
+from nav.bootstrap import bootstrap_django
+
+bootstrap_django(__file__)
+
+from django.db import transaction
+from nav.models.manage import Netbox
+from nav.event2 import EventFactory
+
+RefreshEvent = EventFactory("devBrowse", "ipdevpoll", event_type="notification")
+
+
+def main():
+    """Main program"""
+    args = parse_args()
+    if not args.netbox:
+        sys.exit(f"Sysname pattern {args.netbox.pattern!r} matched nothing")
+
+    send_refresh_events(args.netbox, args.job)
+
+
+@transaction.atomic
+def send_refresh_events(netboxes: list[Netbox], job: str):
+    """Sends refresh events for all selected netboxes and jobs"""
+    for netbox in netboxes:
+        print(f"Sending refresh event for {netbox.sysname} job {job}")
+        event = RefreshEvent.notify(netbox=netbox, subid=job)
+        event.save()
+
+
+def parse_args():
+    """Builds an ArgumentParser and returns parsed program arguments"""
+    parser = argparse.ArgumentParser(
+        description="Sends job refresh commands to a running ipdevpoll daemon",
+    )
+    parser.add_argument(
+        "netbox",
+        type=SysnamePattern,
+        help="sysname (or sysname prefix) of devices that should be refreshed. Be "
+        "aware that multiple devices can match.",
+    )
+    parser.add_argument("job", type=non_empty_string, help="ipdevpoll job to refresh.")
+    return parser.parse_args()
+
+
+class SysnamePattern(list):
+    """Looks up netboxes based on sysname patterns from arguments"""
+
+    def __init__(self, pattern: str):
+        super().__init__()
+        self.pattern = pattern.strip() if pattern else ""
+        if not self.pattern:
+            raise ValueError("sysname pattern cannot be empty")
+        self.extend(Netbox.objects.filter(sysname__startswith=self.pattern))
+
+
+def non_empty_string(value: str):
+    """Validates a string to be non-empty"""
+    if not value.strip():
+        raise argparse.ArgumentTypeError("cannot be empty")
+    return value.strip()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/nav/ipdevpoll/daemon.py b/python/nav/ipdevpoll/daemon.py
index 00484d1c7b..fd4debb47d 100644
--- a/python/nav/ipdevpoll/daemon.py
+++ b/python/nav/ipdevpoll/daemon.py
@@ -126,6 +126,9 @@ def setup_scheduling(self):
             self.work_pool,
             self.options.onlyjob,
         )
+        reactor.callWhenRunning(
+            db.subscribe_to_event_notifications, schedule.handle_incoming_events
+        )
 
         def log_scheduler_jobs():
             JobScheduler.log_active_jobs(logging.INFO)
@@ -195,6 +198,9 @@ def setup_multiprocess(self, process_count, max_jobs):
             self.work_pool,
             self.options.onlyjob,
         )
+        reactor.callWhenRunning(
+            db.subscribe_to_event_notifications, schedule.handle_incoming_events
+        )
 
         def log_scheduler_jobs():
             JobScheduler.log_active_jobs(logging.INFO)
diff --git a/python/nav/ipdevpoll/db.py b/python/nav/ipdevpoll/db.py
index 7c6576e9b3..f4f6da7ef1 100644
--- a/python/nav/ipdevpoll/db.py
+++ b/python/nav/ipdevpoll/db.py
@@ -20,14 +20,17 @@
 from pprint import pformat
 import threading
 from functools import wraps
+from typing import Callable, Optional
 
-from twisted.internet import threads
+from twisted.internet import threads, reactor, abstract
+from twisted.internet.base import ReactorBase
 import django.db
 from django.db import transaction
 from django.db.utils import OperationalError as DjangoOperationalError
 from django.db.utils import InterfaceError as DjangoInterfaceError
 from psycopg2 import InterfaceError, OperationalError
+
 _logger = logging.getLogger(__name__)
 _query_logger = logging.getLogger(".".join((__name__, "query")))
 
@@ -159,3 +162,144 @@
         WHERE ipdevpoll_job_log.id = ranked.id AND rank > 100;
         """
     )
+
+
+def subscribe_to_event_notifications(trigger: Optional[Callable] = None):
+    """Ensures the Django database connection in the calling thread is subscribed to
+    'new_event' notifications from PostgreSQL. Notification events will be read by
+    an instance of PostgresNotifyReader, which is added to the Twisted reactor.
+
+    :param trigger: An optional callable trigger function that will be called from
+        the main reactor thread whenever matching refresh notifications are
+        received.
+    """
+
+    cursor = django.db.connection.cursor()
+    cursor.execute("LISTEN new_event")
+    django.db.connection.commit()
+    reader = PostgresNotifyReader(reactor, trigger)
+    _logger.debug(
+        "Subscribed to new event notifications from thread %r",
+        threading.current_thread(),
+    )
+    reactor.addReader(reader)
+
+
+def resubscribe_to_event_notifications():
+    """Removes any existing PostgresNotifyReader from the reactor and adds a new one,
+    re-using the trigger function of the first removed one.
+    """
+    trigger = _remove_postgres_reader_and_get_its_trigger_function()
+
+    def retry_connect_loop():
+        """Re-try event subscription every second, until the database connection has
+        been re-established.
+        """
+        try:
+            subscribe_to_event_notifications(trigger)
+        except Exception as error:
+            _logger.debug(
+                "unable to resubscribe to events (%s), retrying in 1 second",
+                str(error).strip(),
+            )
+            reactor.callLater(1, retry_connect_loop)
+
+    reactor.callLater(1, retry_connect_loop)
+
+
+def _remove_postgres_reader_and_get_its_trigger_function():
+    postgres_readers: list[PostgresNotifyReader] = [
+        reader
+        for reader in reactor.getReaders()
+        if isinstance(reader, PostgresNotifyReader)
+    ]
+    if not postgres_readers:
+        return
+    _logger.debug("Removing PostgresNotifyReaders: %r", postgres_readers)
+    primary = postgres_readers[0]
+    for reader in postgres_readers:
+        reactor.removeReader(reader)
+
+    return primary.trigger
+
+
+def resubscribe_on_connection_loss(func):
+    """Decorates function to re-subscribe to event notifications in the event of a
+    connection loss.
+    """
+
+    def _resubscribe(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except ResetDBConnectionError:
+            _thread = threading.current_thread()
+            _logger.debug("resubscribing to event notifications in %r", _thread)
+            resubscribe_to_event_notifications()
+
+    return wraps(func)(_resubscribe)
+
+
+class PostgresNotifyReader(abstract.FileDescriptor):
+    """Implements a FileDescriptor to act on PostgreSQL notifications asynchronously.
+
+    The LISTEN subscription is run on a random thread in the threadpool. However,
+    getting the doRead code to run in the same thread as the originating connection
+    is not really feasible with the API of the Twisted threadpool - so this is just
+    designed to keep on trying until it's run by a thread where it succeeds.
+    """
+
+    def __init__(
+        self,
+        reactor: ReactorBase,
+        trigger: Optional[Callable] = None,
+    ):
+        """Initialize a postgres notification reader.
+
+        :param reactor: The event reactor to use for scheduling
+        :param trigger: A trigger function to be scheduled whenever a refresh
+            notification is received. The file descriptor polled is that of the
+            current Django database connection.
+        """
+        self.reactor = reactor
+        self.trigger = trigger
+        self._fileno = django.db.connection.connection.fileno()
+
+    def fileno(self):
+        return self._fileno
+
+    @resubscribe_on_connection_loss
+    @reset_connection_on_interface_error
+    def doRead(self):
+        _logger.debug("PostgresNotifyReader.doRead: checking for notifications")
+
+        connection = django.db.connection.connection
+        if connection:
+            _logger.debug(
+                "check_for_notifications: polling for notifications from %r",
+                threading.current_thread(),
+            )
+            connection.poll()
+            if connection.notifies:
+                _logger.debug("Found notifications: %r", connection.notifies)
+                if any(_is_a_new_ipdevpoll_event(c) for c in connection.notifies):
+                    self.schedule_trigger()
+                del connection.notifies[:]
+        else:
+            _logger.debug(
+                "check_for_notifications: connection was empty in thread %r "
+                "(no connection to poll for notifications)",
+                threading.current_thread(),
+            )
+
+    def schedule_trigger(self):
+        """Schedules the trigger function for an immediate run in a threadpool thread"""
+        if not self.trigger:
+            return
+        _logger.debug(
+            "scheduling %r to be called in a reactor threadpool thread", self.trigger
+        )
+        self.reactor.callInThread(self.trigger)
+
+
+def _is_a_new_ipdevpoll_event(notification):
+    return notification.channel == "new_event" and notification.payload == "ipdevpoll"
diff --git a/python/nav/ipdevpoll/schedule.py b/python/nav/ipdevpoll/schedule.py
index d1e8ce089d..49280e6abe 100644
--- a/python/nav/ipdevpoll/schedule.py
+++ b/python/nav/ipdevpoll/schedule.py
@@ -24,6 +24,7 @@
 from random import randint
 from math import ceil
 
+from django.db.transaction import atomic
 from twisted.python.failure import Failure
 from twisted.internet import task, reactor
 from twisted.internet.defer import Deferred
@@ -41,7 +42,9 @@
 
 from . import shadows, config, signals
 from .dataloader import NetboxLoader
+from .db import run_in_thread
 from .jobs import JobHandler, AbortedJobError, SuggestedReschedule
+from ..models.event import EventQueue
 
 _logger = logging.getLogger(__name__)
 
@@ -350,10 +353,18 @@ def __init__(self, job, pool):
         self.job = job
         self.pool = pool
         self.netboxes = NetboxLoader()
-        self.active_netboxes = {}
+        self.active_netboxes: dict[int, NetboxJobScheduler] = {}
 
         self.active_schedulers.add(self)
 
+    def __repr__(self):
+        return "<{} job={}>".format(self.__class__.__name__, self.job.name)
+
+    @classmethod
+    def get_job_schedulers_by_name(cls) -> dict[str, 'JobScheduler']:
+        """Returns a mapping of actively scheduled job names to their schedulers"""
+        return {scheduler.job.name: scheduler for scheduler in cls.active_schedulers}
+
     @classmethod
     def initialize_from_config_and_run(cls, pool, onlyjob=None):
         descriptors = config.get_jobs()
@@ -544,3 +555,91 @@ def flush(self):
 
 
 _COUNTERS = CounterFlusher()
+
+
+def handle_incoming_events():
+    """Checks the event queue for events addressed to ipdevpoll and handles them"""
+    # Since this extensively accesses the database, it needs to run in a thread:
+    return run_in_thread(_handle_incoming_events)
+
+
+@atomic
+def _handle_incoming_events():
+    events = EventQueue.objects.filter(target='ipdevpoll')
+    # Filter out (and potentially delete) events not worthy of our attention
+    events = [event for event in events if _event_pre_filter(event)]
+
+    boxes_to_reschedule = defaultdict(list)
+    # There may be multiple notifications queued for the same request, so group them
+    # by netbox+jobname
+    for event in events:
+        boxes_to_reschedule[(event.netbox_id, event.subid)].append(event)
+    _logger.debug("boxes_to_reschedule: %r", boxes_to_reschedule)
+
+    _reschedule_jobs(boxes_to_reschedule)
+
+
+def _reschedule_jobs(boxes_to_reschedule: dict[tuple[int, str], list[EventQueue]]):
+    job_schedulers = JobScheduler.get_job_schedulers_by_name()
+    for (netbox_id, job_name), events in boxes_to_reschedule.items():
+        first_event = events[0]
+        job_scheduler = job_schedulers[job_name]
+        netbox_scheduler = job_scheduler.active_netboxes[netbox_id]
+        _logger.info(
+            "Re-scheduling immediate %s run for %s as requested by %s",
+            job_name,
+            first_event.netbox,
+            first_event.source,
+        )
+        # Ensure all re-scheduling happens in the main reactor thread:
+        reactor.callFromThread(netbox_scheduler.reschedule, 0)
+        # now we can safely delete all the events
+        for event in events:
+            event.delete()
+
+
+def _event_pre_filter(event: EventQueue):
+    """Returns True if this event is worthy of this process' attention. If the event
+    isn't worthy of *any* ipdevpoll process' attention, we delete it from the database
+    too.
+    """
+    _logger.debug("Found event on queue: %r", event)
+    if not _is_valid_refresh_event(event):
+        event.delete()
+        return False
+    if not _is_refresh_event_for_me(event):
+        return False
+    # TODO: Should also delete events that seem to be stale. If the requested job is
+    # logged as having run after the event's timestamp, the event is stale.
+    return True
+
+
+def _is_valid_refresh_event(event: EventQueue):
+    if event.event_type_id != 'notification':
+        _logger.info("Ignoring non-notification event from %s", event.source)
+        return False
+
+    if not event.subid:
+        _logger.info(
+            "Ignoring notification event from %s with blank job name", event.source
+        )
+        return False
+
+    return True
+
+
+def _is_refresh_event_for_me(event: EventQueue):
+    schedulers = JobScheduler.get_job_schedulers_by_name()
+    if event.subid not in schedulers:
+        _logger.debug(
+            "This process does not schedule %s, %r is not for us", event.subid, event
+        )
+        return False
+
+    if event.netbox_id not in schedulers[event.subid].netboxes:
+        _logger.debug(
+            "This process does not poll from %s, %r is not for us", event.netbox, event
+        )
+        return False
+
+    return True
diff --git a/python/nav/models/sql/changes/sc.05.13.0100.sql b/python/nav/models/sql/changes/sc.05.13.0100.sql
new file mode 100644
index 0000000000..52a3e510a4
--- /dev/null
+++ b/python/nav/models/sql/changes/sc.05.13.0100.sql
@@ -0,0 +1,4 @@
+-- Change notification rule to include target subsystem as payload
+CREATE OR REPLACE RULE eventq_notify AS
+    ON INSERT TO eventq
+    DO ALSO SELECT pg_notify('new_event', NEW.target);