From f0bcb5466392f8135b7117d63ba734d87c0a4a1b Mon Sep 17 00:00:00 2001 From: Daryl Dohner Date: Mon, 3 Apr 2023 11:03:40 -0400 Subject: [PATCH] #66 work so far --- .../lib/orcheostrator/driver_container.py | 55 ++++++---- EosPayload/lib/orcheostrator/health.py | 100 +++++++++++++++++ EosPayload/lib/orcheostrator/orcheostrator.py | 102 +++--------------- 3 files changed, 149 insertions(+), 108 deletions(-) create mode 100644 EosPayload/lib/orcheostrator/health.py diff --git a/EosPayload/lib/orcheostrator/driver_container.py b/EosPayload/lib/orcheostrator/driver_container.py index fee0e58..1b2cd27 100644 --- a/EosPayload/lib/orcheostrator/driver_container.py +++ b/EosPayload/lib/orcheostrator/driver_container.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass from datetime import datetime from enum import Enum, unique from multiprocessing import Process @@ -17,13 +18,24 @@ class Status(Enum): TERMINATED = 5 +@dataclass class StatusUpdate: - def __init__(self, driver_id: int, status: Status, thread_count: int, reporter: Device, effective: datetime): - self.driver_id = driver_id - self.status = status - self.thread_count = thread_count - self.reporter = reporter - self.effective = effective + driver_id: Device = None + status: Status = None + thread_count: int = None + reporter: Device = None + effective: datetime = None + + def update(self, other): + if other.effective is not None and other.effective >= self.effective: + if other.driver_id is not None: + self.driver_id = other.driver_id + if other.status is not None: + self.status = other.status + if other.thread_count is not None: + self.thread_count = other.thread_count + if other.reporter is not None: + self.reporter = other.reporter class DriverContainer: @@ -31,18 +43,19 @@ class DriverContainer: def __init__(self, driver: DriverBase, process: Process = None): self.driver = driver self.process = process - self.status = Status.NONE - self.thread_count = 0 - self.status_reporter = Device.NO_DEVICE - self.status_since = datetime.now() - - def update_status(self, status: Status, thread_count: int = 0, reporter: Device = Device.ORCHEOSTRATOR, - effective: datetime = None): - if effective is None: - effective = datetime.now() - - if effective >= self.status_since: - self.status = status - self.thread_count = thread_count - self.status_reporter = reporter - self.status_since = effective + self.status = StatusUpdate( + status=Status.NONE, + thread_count=0, + reporter=Device.NO_DEVICE, + effective=datetime.now() + ) + + def update_status(self, status_update: StatusUpdate = None, status: Status = None, thread_count: int = None, + reporter: Device = Device.ORCHEOSTRATOR, effective: datetime = None): + if status_update is None: + if effective is None: + effective = datetime.now() + status_update = StatusUpdate(status=status, thread_count=thread_count, + reporter=reporter, effective=effective) + + self.status.update(status_update) diff --git a/EosPayload/lib/orcheostrator/health.py b/EosPayload/lib/orcheostrator/health.py new file mode 100644 index 0000000..ef77530 --- /dev/null +++ b/EosPayload/lib/orcheostrator/health.py @@ -0,0 +1,100 @@ +from datetime import datetime, timedelta +from queue import Queue +import logging +import threading +import traceback + +from EosLib import Device +from EosLib.packet.packet import Packet + +from EosPayload.lib.mqtt.client import Client +from EosPayload.lib.orcheostrator.driver_container import DriverContainer, Status, StatusUpdate + + +class Health: + + @staticmethod + def health_monitor(_client, user_data, message): + try: + packet = None + try: + packet = Packet.decode(message.payload) + except Exception as e: + user_data['logger'].error(f"failed to decode health packet: {e}") + return + + user_data['logger'].info(f"received health packet from device id={packet.data_header.sender}") + + if packet.data_header.sender != user_data['device_id']: + # extracts just the first two fields (is_healthy, thread_count), ignores the rest + is_healthy, thread_count, _ = packet.body.decode('ascii').split(',', 2) + + status_update = StatusUpdate( + driver_id=packet.data_header.sender, + status=Status.HEALTHY if int(is_healthy) else Status.UNHEALTHY, + thread_count=thread_count, + reporter=packet.data_header.sender, + effective=packet.data_header.generate_time + ) + + user_data['queue'].put(status_update) + except Exception as e: + # this is needed b/c apparently an exception in a callback kills the mqtt thread + user_data['logger'].error(f"an unhandled exception occurred while processing health_monitor: {e}" + f"\n{traceback.format_exc()}") + + @staticmethod + def health_check(driver_list: dict[Device|str, DriverContainer], update_queue: Queue, + logger: logging.Logger) -> None: + try: + logger.info("Starting Health Check") + while not update_queue.empty(): + status_update = update_queue.get() + driver_list[status_update.driver_id].update_status(status_update) + + for key, driver in driver_list.items(): + # auto set terminated if it died + if driver.status.status in [Status.HEALTHY, Status.UNHEALTHY]\ + and (driver.process is None or not driver.process.is_alive()): + logger.critical(f"process for driver {key} is no longer running -- marking terminated") + driver.update_status(Status.TERMINATED) + + # auto set unhealthy if we haven't had a ping in the last 30s from this device + if driver.status.status == Status.HEALTHY\ + and driver.status.effective < (datetime.now() - timedelta(seconds=30)): + logger.critical(f"haven't received a health ping from driver {key} in 30s -- marking unhealthy") + driver.update_status(Status.UNHEALTHY) + + logger.info(Health.generate_report(driver_list)) + + logger.info("Done Checking Health") + except Exception as e: + logger.critical("An exception occurred when attempting to perform health check:" + f" {e}\n{traceback.format_exc()}") + + @staticmethod + def generate_report(driver_list: dict[Device|str, DriverContainer]) -> str: + report = {} + for status in Status: + report[status] = [] + + num_threads = threading.active_count() + for key, driver in driver_list.items(): + the_key = key if driver.status in [Status.NONE, Status.INVALID] else driver.driver.get_device_pretty_id() + report[driver.status].append(f"{the_key} ({driver.status.thread_count} threads)" + f" as of {driver.status.effective} (reported by {driver.status.reporter}" + f" [{Device(driver.status.reporter).name}])") + num_threads += int(driver.status.thread_count) + + report_string = f"Health Report: \n{len(report[Status.HEALTHY])} drivers running" + report_string += f"\n{num_threads} total threads in use ({threading.active_count()} by OrchEOStrator)" + for status, reports in report.items(): + report_string += f"\n\t{status}:" + for item in reports: + report_string += f"\n\t\t{item}" + + return report_string + + @staticmethod + def publish_health_update(mqtt: Client, logger: logging.Logger, device_id: Device, ): + diff --git a/EosPayload/lib/orcheostrator/orcheostrator.py b/EosPayload/lib/orcheostrator/orcheostrator.py index 5eb516c..92ca5c7 100644 --- a/EosPayload/lib/orcheostrator/orcheostrator.py +++ b/EosPayload/lib/orcheostrator/orcheostrator.py @@ -1,21 +1,19 @@ -from datetime import datetime, timedelta from multiprocessing import Process from queue import Queue import inspect import logging import os -import threading import time import traceback from EosLib import Device -from EosLib.packet.packet import Packet from EosPayload.lib.driver_base import DriverBase from EosPayload.lib.logger import init_logging from EosPayload.lib.mqtt import MQTT_HOST from EosPayload.lib.mqtt.client import Client, Topic -from EosPayload.lib.orcheostrator.driver_container import DriverContainer, Status, StatusUpdate +from EosPayload.lib.orcheostrator.driver_container import DriverContainer, Status +from EosPayload.lib.orcheostrator.health import Health import EosPayload.drivers as drivers @@ -45,8 +43,12 @@ def __init__(self, output_directory: str): try: self._mqtt = Client(MQTT_HOST) - self._mqtt.user_data_set({'logger': self._logger, 'queue': self._health_queue}) - self._mqtt.register_subscriber(Topic.HEALTH_HEARTBEAT, self.health_monitor) + self._mqtt.user_data_set({ + 'device_id': Device.ORCHEOSTRATOR, + 'logger': self._logger, + 'queue': self._health_queue + }) + self._mqtt.register_subscriber(Topic.HEALTH_HEARTBEAT, Health.health_monitor) except Exception as e: self._logger.critical(f"Failed to setup MQTT: {e}\n{traceback.format_exc()}") @@ -54,7 +56,7 @@ def __del__(self): """ Destructor. Terminates all drivers. """ self._logger.info("shutting down") self.terminate() - self._health_check() + Health.health_check(self._drivers, self._health_queue, self._logger) logging.shutdown() # @@ -64,7 +66,7 @@ def __del__(self): def run(self) -> None: self._spawn_drivers() while True: - self._health_check() + Health.health_check(self._drivers, self._health_queue, self._logger) time.sleep(10) # future: anything else OrchEOStrator is responsible for doing. Perhaps handling "force terminate" commands # or MQTT things @@ -75,7 +77,7 @@ def terminate(self) -> None: if driver.status in [Status.HEALTHY, Status.UNHEALTHY]: self._logger.info(f"terminating process for device id {device_id}") driver.process.terminate() - driver.update_status(Status.TERMINATED) + driver.update_status(status=Status.TERMINATED) @staticmethod def valid_driver(driver) -> bool: @@ -106,26 +108,26 @@ def _spawn_drivers(self) -> None: if driver.get_device_id() is None: self._logger.error(f"can't spawn process for device from class '{driver.__name__}'" " because device id is not defined") - container.update_status(Status.INVALID) + container.update_status(status=Status.INVALID) self._drivers['<' + driver.__name__ + '>'] = container continue if not driver.enabled(): self._logger.warning(f"skipping device '{driver.get_device_pretty_id()}' from" f" class '{driver.__name__}' because it is not enabled") - container.update_status(Status.DISABLED) + container.update_status(status=Status.DISABLED) self._drivers[driver.get_device_id()] = container continue self._logger.info(f"spawning process for device '{driver.get_device_pretty_id()}' from" f" class '{driver.__name__}'") proc = Process(target=self._driver_runner, args=(driver, self.output_directory), daemon=True) container.process = proc - container.update_status(Status.HEALTHY, 1) + container.update_status(status=Status.HEALTHY, thread_count=1) proc.start() self._drivers[driver.get_device_id()] = container except Exception as e: self._logger.critical("A fatal exception occurred when attempting to load driver from" f" class '{driver.__name__}': {e}\n{traceback.format_exc()}") - container.update_status(Status.INVALID) + container.update_status(status=Status.INVALID) self._drivers['<' + driver.__name__ + '>'] = container self._logger.info("Done Spawning Drivers") @@ -138,80 +140,6 @@ def _driver_runner(cls, output_directory: str) -> None: """ cls(output_directory).run() - @staticmethod - def health_monitor(_client, user_data, message): - try: - packet = None - try: - packet = Packet.decode(message.payload) - except Exception as e: - user_data['logger'].error(f"failed to decode health packet: {e}") - return - - user_data['logger'].info(f"received health packet from device id={packet.data_header.sender}") - - is_healthy, thread_count, _ = packet.body.decode('ascii').split(',', 2) - - status_update = StatusUpdate( - driver_id=packet.data_header.sender, - status=Status.HEALTHY if int(is_healthy) else Status.UNHEALTHY, - thread_count=thread_count, - reporter=packet.data_header.sender, - effective=packet.data_header.generate_time - ) - - user_data['queue'].put(status_update) - except Exception as e: - # this is needed b/c apparently an exception in a callback kills the mqtt thread - user_data['logger'].error(f"an unhandled exception occurred while processing health_monitor: {e}" - f"\n{traceback.format_exc()}") - - def _health_check(self) -> None: - try: - self._logger.info("Starting Health Check") - while not self._health_queue.empty(): - status_update = self._health_queue.get() - self._drivers[status_update.driver_id].update_status( - status_update.status, - status_update.thread_count, - status_update.reporter, - status_update.effective, - ) - - num_threads = threading.active_count() - report = {} - for status in Status: - report[status] = [] - for key, driver in self._drivers.items(): - # auto set terminated if it died - if driver.status in [Status.HEALTHY, Status.UNHEALTHY] and (driver.process is None or not driver.process.is_alive()): - self._logger.critical(f"process for driver {key} is no longer running -- marking terminated") - driver.update_status(Status.TERMINATED) - - # auto set unhealthy if we haven't had a ping in the last 30s from this device - if driver.status == Status.HEALTHY and driver.status_since < (datetime.now() - timedelta(seconds=30)): - self._logger.critical(f"haven't received a health ping from driver {key} in 30s -- marking unhealthy") - driver.update_status(Status.UNHEALTHY) - - the_key = key if driver.status in [Status.NONE, Status.INVALID] else driver.driver.get_device_pretty_id() - report[driver.status].append(f"{the_key} ({driver.thread_count} threads)" - f" as of {driver.status_since} (reported by {driver.status_reporter}" - f" [{Device(driver.status_reporter).name}])") - num_threads += int(driver.thread_count) - - report_string = f"Health Report: \n{len(report[Status.HEALTHY])} drivers running" - report_string += f"\n{num_threads} total threads in use ({threading.active_count()} by OrchEOStrator)" - for status, reports in report.items(): - report_string += f"\n\t{status}:" - for item in reports: - report_string += f"\n\t\t{item}" - self._logger.info(report_string) - - self._logger.info("Done Checking Health") - except Exception as e: - self._logger.critical("An exception occurred when attempting to perform health check:" - f" {e}\n{traceback.format_exc()}") - @staticmethod def _spin() -> None: """ Spins to keep the software alive. Never returns. """