Skip to content

Make ipdevpoll able to respond to refresh events #3348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ navdump = "nav.bin.navdump:main"
naventity = "nav.bin.naventity:main"
navoidverify = "nav.bin.navoidverify:main"
navpgdump = "nav.pgdump:main"
navrefresh = "nav.bin.navrefresh:main"
navsnmp = "nav.bin.navsnmp:main"
navstats = "nav.bin.navstats:main"
navsyncdb = "nav.pgsync:main"
Expand Down
86 changes: 86 additions & 0 deletions python/nav/bin/navrefresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env python3
#
# Copyright 2025 Sikt
#
# This file is part of Network Administration Visualized (NAV).
#
# NAV is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License version 3 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details. You should have received a copy of the GNU General Public
# License along with NAV. If not, see <http://www.gnu.org/licenses/>.
#
"""A command line interface to send refresh commands to a running ipdevpoll daemon"""


import argparse
import sys

from nav.bootstrap import bootstrap_django

bootstrap_django(__file__)

from django.db import transaction
from nav.models.manage import Netbox
from nav.event2 import EventFactory

RefreshEvent = EventFactory("devBrowse", "ipdevpoll", event_type="notification")


def main():
"""Main program"""
args = parse_args()
if not args.netbox:
sys.exit(f"Sysname pattern {args.netbox.pattern!r} matched nothing")

send_refresh_events(args.netbox, args.job)


@transaction.atomic
def send_refresh_events(netboxes: list[Netbox], job: str):
"""Sends refresh events for all selected netboxes and jobs"""
for netbox in netboxes:
print(f"Sending refresh event for {netbox.sysname} job {job}")
event = RefreshEvent.notify(netbox=netbox, subid=job)
event.save()


def parse_args():
"""Builds an ArgumentParser and returns parsed program arguments"""
parser = argparse.ArgumentParser(
description="Sends job refresh commands to a running ipdevpoll daemon",
)
parser.add_argument(
"netbox",
type=SysnamePattern,
help="sysname (or sysname prefix) of devices that should be refreshed. Be "
"aware that multiple devices can match.",
)
parser.add_argument("job", type=non_empty_string, help="ipdevpoll job to refresh.")
return parser.parse_args()


class SysnamePattern(list):
"""Looks up netboxes based on sysname patterns from arguments"""

def __init__(self, pattern: str):
super().__init__()
self.pattern = pattern.strip() if pattern else ""
if not self.pattern:
raise ValueError("sysname pattern cannot be empty")
self.extend(Netbox.objects.filter(sysname__startswith=self.pattern))


def non_empty_string(value: str):
"""Validates a string to be non-empty"""
if not value.strip():
raise argparse.ArgumentTypeError("cannot be empty")
return value.strip()


if __name__ == '__main__':
main()
6 changes: 6 additions & 0 deletions python/nav/ipdevpoll/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def setup_scheduling(self):
self.work_pool,
self.options.onlyjob,
)
reactor.callWhenRunning(
db.subscribe_to_event_notifications, schedule.handle_incoming_events
)

def log_scheduler_jobs():
JobScheduler.log_active_jobs(logging.INFO)
Expand Down Expand Up @@ -195,6 +198,9 @@ def setup_multiprocess(self, process_count, max_jobs):
self.work_pool,
self.options.onlyjob,
)
reactor.callWhenRunning(
db.subscribe_to_event_notifications, schedule.handle_incoming_events
)

def log_scheduler_jobs():
JobScheduler.log_active_jobs(logging.INFO)
Expand Down
146 changes: 145 additions & 1 deletion python/nav/ipdevpoll/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
from pprint import pformat
import threading
from functools import wraps
from typing import Callable, Optional

from twisted.internet import threads
from twisted.internet import threads, reactor, abstract
from twisted.internet.base import ReactorBase
import django.db
from django.db import transaction
from django.db.utils import OperationalError as DjangoOperationalError
from django.db.utils import InterfaceError as DjangoInterfaceError
from psycopg2 import InterfaceError, OperationalError


_logger = logging.getLogger(__name__)
_query_logger = logging.getLogger(".".join((__name__, "query")))

Expand Down Expand Up @@ -159,3 +162,144 @@ def purge_old_job_log_entries():
WHERE ipdevpoll_job_log.id = ranked.id AND rank > 100;
"""
)


def subscribe_to_event_notifications(trigger: Optional[Callable] = None):
"""Ensures the Django database connection in the calling thread is subscribed to
'new_event' notifications from PostgreSQL. Notification events will be read by
an instance of PostgresNotifyReader, which is added to the Twisted reactor.

:param trigger: An optional callable trigger function that will be called from
the main reactor thread whenever matching refresh notifications are
received.
"""

cursor = django.db.connection.cursor()
cursor.execute("LISTEN new_event")
django.db.connection.commit()
reader = PostgresNotifyReader(reactor, trigger)
_logger.debug(
"Subscribed to new event notifications from thread %r",
threading.current_thread(),
)
reactor.addReader(reader)


def resubscribe_to_event_notifications():
"""Removes any existing PostgresNotifyReader from the reactor and adds a new one,
re-using the trigger function of the first removed one.
"""
trigger = _remove_postgres_reader_and_get_its_trigger_function()

def retry_connect_loop():
"""Re-try event subscription every second, until the database connection has
been re-established.
"""
try:
subscribe_to_event_notifications(trigger)
except Exception as error:
_logger.debug(
"unable to resubscribe to events (%s), retrying in 1 second",
str(error).strip(),
)
reactor.callLater(1, retry_connect_loop)

reactor.callLater(1, retry_connect_loop)


def _remove_postgres_reader_and_get_its_trigger_function():
postgres_readers: list[PostgresNotifyReader] = [
reader
for reader in reactor.getReaders()
if isinstance(reader, PostgresNotifyReader)
]
if not postgres_readers:
return
_logger.debug("Removing PostgresNotifyReaders: %r", postgres_readers)
primary = postgres_readers[0]
for reader in postgres_readers:
reactor.removeReader(reader)

return primary.trigger


def resubscribe_on_connection_loss(func):
"""Decorates function to re-subscribe to event notifications in the event of a
connection loss.
"""

def _resubscribe(*args, **kwargs):
try:
return func(*args, **kwargs)
except ResetDBConnectionError:
_thread = threading.current_thread()
_logger.debug("resubscribing to event notifications")
resubscribe_to_event_notifications()

return wraps(func)(_resubscribe)


class PostgresNotifyReader(abstract.FileDescriptor):
"""Implements a FileDescriptor to act on PostgreSQL notifications asynchronously.

The LISTEN subscription is run on a random thread in the threadpool. However,
getting the doRead code to run in the same thread as the originating connection
is not really feasible with the API of the Twisted threadpool - so this is just
designed to keep on trying until it's run by a thread where it succeeds.
"""

def __init__(
self,
reactor: ReactorBase,
trigger: Optional[Callable] = None,
):
"""Initialize a postgres notification reader.

:param reactor: The event reactor to use for scheduling
:param connection: The database connection object to poll for notifications
:param trigger: A trigger function to be called from the main reactor thread
when a refresh notifications is received.
"""
self.reactor = reactor
self.trigger = trigger
self._fileno = django.db.connection.connection.fileno()

def fileno(self):
return self._fileno

@resubscribe_on_connection_loss
@reset_connection_on_interface_error
def doRead(self):
_logger.debug("PostgresNotifyReader.doRead: checking for notifications")

connection = django.db.connection.connection
if connection:
_logger.debug(
"check_for_notifications: polling for notifications from %r",
threading.current_thread(),
)
connection.poll()
if connection.notifies:
_logger.debug("Found notifications: %r", connection.notifies)
if any(_is_a_new_ipdevpoll_event(c) for c in connection.notifies):
self.schedule_trigger()
del connection.notifies[:]
else:
_logger.debug(
"check_for_notifications: connection was empty in thread %r "
"(subscription is in %r)",
threading.current_thread(),
)

def schedule_trigger(self):
"""Schedules the trigger function for an immediate run in the reactor thread"""
if not self.trigger:
return
_logger.debug(
"scheduling %r to be called from main reactor thread", self.trigger
)
self.reactor.callInThread(self.trigger)


def _is_a_new_ipdevpoll_event(notification):
return notification.channel == "new_event" and notification.payload == "ipdevpoll"
Loading
Loading