Skip to content

Commit

Permalink
#66 work so far
Browse files Browse the repository at this point in the history
  • Loading branch information
DarylDohner committed Apr 3, 2023
1 parent 9399136 commit f0bcb54
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 108 deletions.
55 changes: 34 additions & 21 deletions EosPayload/lib/orcheostrator/driver_container.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, unique
from multiprocessing import Process
Expand All @@ -17,32 +18,44 @@ class Status(Enum):
TERMINATED = 5


@dataclass
class StatusUpdate:
def __init__(self, driver_id: int, status: Status, thread_count: int, reporter: Device, effective: datetime):
self.driver_id = driver_id
self.status = status
self.thread_count = thread_count
self.reporter = reporter
self.effective = effective
driver_id: Device = None
status: Status = None
thread_count: int = None
reporter: Device = None
effective: datetime = None

def update(self, other):
if other.effective is not None and other.effective >= self.effective:
if other.driver_id is not None:
self.driver_id = other.driver_id
if other.status is not None:
self.status = other.status
if other.thread_count is not None:
self.thread_count = other.thread_count
if other.reporter is not None:
self.reporter = other.reporter


class DriverContainer:

def __init__(self, driver: DriverBase, process: Process = None):
self.driver = driver
self.process = process
self.status = Status.NONE
self.thread_count = 0
self.status_reporter = Device.NO_DEVICE
self.status_since = datetime.now()

def update_status(self, status: Status, thread_count: int = 0, reporter: Device = Device.ORCHEOSTRATOR,
effective: datetime = None):
if effective is None:
effective = datetime.now()

if effective >= self.status_since:
self.status = status
self.thread_count = thread_count
self.status_reporter = reporter
self.status_since = effective
self.status = StatusUpdate(
status=Status.NONE,
thread_count=0,
reporter=Device.NO_DEVICE,
effective=datetime.now()
)

def update_status(self, status_update: StatusUpdate = None, status: Status = None, thread_count: int = None,
reporter: Device = Device.ORCHEOSTRATOR, effective: datetime = None):
if status_update is None:
if effective is None:
effective = datetime.now()
status_update = StatusUpdate(status=status, thread_count=thread_count,
reporter=reporter, effective=effective)

self.status.update(status_update)
100 changes: 100 additions & 0 deletions EosPayload/lib/orcheostrator/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from datetime import datetime, timedelta
from queue import Queue
import logging
import threading
import traceback

from EosLib import Device
from EosLib.packet.packet import Packet

from EosPayload.lib.mqtt.client import Client
from EosPayload.lib.orcheostrator.driver_container import DriverContainer, Status, StatusUpdate


class Health:

@staticmethod
def health_monitor(_client, user_data, message):
try:
packet = None
try:
packet = Packet.decode(message.payload)
except Exception as e:
user_data['logger'].error(f"failed to decode health packet: {e}")
return

user_data['logger'].info(f"received health packet from device id={packet.data_header.sender}")

if packet.data_header.sender != user_data['device_id']:
# extracts just the first two fields (is_healthy, thread_count), ignores the rest
is_healthy, thread_count, _ = packet.body.decode('ascii').split(',', 2)

status_update = StatusUpdate(
driver_id=packet.data_header.sender,
status=Status.HEALTHY if int(is_healthy) else Status.UNHEALTHY,
thread_count=thread_count,
reporter=packet.data_header.sender,
effective=packet.data_header.generate_time
)

user_data['queue'].put(status_update)
except Exception as e:
# this is needed b/c apparently an exception in a callback kills the mqtt thread
user_data['logger'].error(f"an unhandled exception occurred while processing health_monitor: {e}"
f"\n{traceback.format_exc()}")

@staticmethod
def health_check(driver_list: dict[Device|str, DriverContainer], update_queue: Queue,
logger: logging.Logger) -> None:
try:
logger.info("Starting Health Check")
while not update_queue.empty():
status_update = update_queue.get()
driver_list[status_update.driver_id].update_status(status_update)

for key, driver in driver_list.items():
# auto set terminated if it died
if driver.status.status in [Status.HEALTHY, Status.UNHEALTHY]\
and (driver.process is None or not driver.process.is_alive()):
logger.critical(f"process for driver {key} is no longer running -- marking terminated")
driver.update_status(Status.TERMINATED)

# auto set unhealthy if we haven't had a ping in the last 30s from this device
if driver.status.status == Status.HEALTHY\
and driver.status.effective < (datetime.now() - timedelta(seconds=30)):
logger.critical(f"haven't received a health ping from driver {key} in 30s -- marking unhealthy")
driver.update_status(Status.UNHEALTHY)

logger.info(Health.generate_report(driver_list))

logger.info("Done Checking Health")
except Exception as e:
logger.critical("An exception occurred when attempting to perform health check:"
f" {e}\n{traceback.format_exc()}")

@staticmethod
def generate_report(driver_list: dict[Device|str, DriverContainer]) -> str:
report = {}
for status in Status:
report[status] = []

num_threads = threading.active_count()
for key, driver in driver_list.items():
the_key = key if driver.status in [Status.NONE, Status.INVALID] else driver.driver.get_device_pretty_id()
report[driver.status].append(f"{the_key} ({driver.status.thread_count} threads)"
f" as of {driver.status.effective} (reported by {driver.status.reporter}"
f" [{Device(driver.status.reporter).name}])")
num_threads += int(driver.status.thread_count)

report_string = f"Health Report: \n{len(report[Status.HEALTHY])} drivers running"
report_string += f"\n{num_threads} total threads in use ({threading.active_count()} by OrchEOStrator)"
for status, reports in report.items():
report_string += f"\n\t{status}:"
for item in reports:
report_string += f"\n\t\t{item}"

return report_string

@staticmethod
def publish_health_update(mqtt: Client, logger: logging.Logger, device_id: Device, ):

102 changes: 15 additions & 87 deletions EosPayload/lib/orcheostrator/orcheostrator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
from datetime import datetime, timedelta
from multiprocessing import Process
from queue import Queue
import inspect
import logging
import os
import threading
import time
import traceback

from EosLib import Device
from EosLib.packet.packet import Packet

from EosPayload.lib.driver_base import DriverBase
from EosPayload.lib.logger import init_logging
from EosPayload.lib.mqtt import MQTT_HOST
from EosPayload.lib.mqtt.client import Client, Topic
from EosPayload.lib.orcheostrator.driver_container import DriverContainer, Status, StatusUpdate
from EosPayload.lib.orcheostrator.driver_container import DriverContainer, Status
from EosPayload.lib.orcheostrator.health import Health
import EosPayload.drivers as drivers


Expand Down Expand Up @@ -45,16 +43,20 @@ def __init__(self, output_directory: str):

try:
self._mqtt = Client(MQTT_HOST)
self._mqtt.user_data_set({'logger': self._logger, 'queue': self._health_queue})
self._mqtt.register_subscriber(Topic.HEALTH_HEARTBEAT, self.health_monitor)
self._mqtt.user_data_set({
'device_id': Device.ORCHEOSTRATOR,
'logger': self._logger,
'queue': self._health_queue
})
self._mqtt.register_subscriber(Topic.HEALTH_HEARTBEAT, Health.health_monitor)
except Exception as e:
self._logger.critical(f"Failed to setup MQTT: {e}\n{traceback.format_exc()}")

def __del__(self):
""" Destructor. Terminates all drivers. """
self._logger.info("shutting down")
self.terminate()
self._health_check()
Health.health_check(self._drivers, self._health_queue, self._logger)
logging.shutdown()

#
Expand All @@ -64,7 +66,7 @@ def __del__(self):
def run(self) -> None:
self._spawn_drivers()
while True:
self._health_check()
Health.health_check(self._drivers, self._health_queue, self._logger)
time.sleep(10)
# future: anything else OrchEOStrator is responsible for doing. Perhaps handling "force terminate" commands
# or MQTT things
Expand All @@ -75,7 +77,7 @@ def terminate(self) -> None:
if driver.status in [Status.HEALTHY, Status.UNHEALTHY]:
self._logger.info(f"terminating process for device id {device_id}")
driver.process.terminate()
driver.update_status(Status.TERMINATED)
driver.update_status(status=Status.TERMINATED)

@staticmethod
def valid_driver(driver) -> bool:
Expand Down Expand Up @@ -106,26 +108,26 @@ def _spawn_drivers(self) -> None:
if driver.get_device_id() is None:
self._logger.error(f"can't spawn process for device from class '{driver.__name__}'"
" because device id is not defined")
container.update_status(Status.INVALID)
container.update_status(status=Status.INVALID)
self._drivers['<' + driver.__name__ + '>'] = container
continue
if not driver.enabled():
self._logger.warning(f"skipping device '{driver.get_device_pretty_id()}' from"
f" class '{driver.__name__}' because it is not enabled")
container.update_status(Status.DISABLED)
container.update_status(status=Status.DISABLED)
self._drivers[driver.get_device_id()] = container
continue
self._logger.info(f"spawning process for device '{driver.get_device_pretty_id()}' from"
f" class '{driver.__name__}'")
proc = Process(target=self._driver_runner, args=(driver, self.output_directory), daemon=True)
container.process = proc
container.update_status(Status.HEALTHY, 1)
container.update_status(status=Status.HEALTHY, thread_count=1)
proc.start()
self._drivers[driver.get_device_id()] = container
except Exception as e:
self._logger.critical("A fatal exception occurred when attempting to load driver from"
f" class '{driver.__name__}': {e}\n{traceback.format_exc()}")
container.update_status(Status.INVALID)
container.update_status(status=Status.INVALID)
self._drivers['<' + driver.__name__ + '>'] = container
self._logger.info("Done Spawning Drivers")

Expand All @@ -138,80 +140,6 @@ def _driver_runner(cls, output_directory: str) -> None:
"""
cls(output_directory).run()

@staticmethod
def health_monitor(_client, user_data, message):
try:
packet = None
try:
packet = Packet.decode(message.payload)
except Exception as e:
user_data['logger'].error(f"failed to decode health packet: {e}")
return

user_data['logger'].info(f"received health packet from device id={packet.data_header.sender}")

is_healthy, thread_count, _ = packet.body.decode('ascii').split(',', 2)

status_update = StatusUpdate(
driver_id=packet.data_header.sender,
status=Status.HEALTHY if int(is_healthy) else Status.UNHEALTHY,
thread_count=thread_count,
reporter=packet.data_header.sender,
effective=packet.data_header.generate_time
)

user_data['queue'].put(status_update)
except Exception as e:
# this is needed b/c apparently an exception in a callback kills the mqtt thread
user_data['logger'].error(f"an unhandled exception occurred while processing health_monitor: {e}"
f"\n{traceback.format_exc()}")

def _health_check(self) -> None:
try:
self._logger.info("Starting Health Check")
while not self._health_queue.empty():
status_update = self._health_queue.get()
self._drivers[status_update.driver_id].update_status(
status_update.status,
status_update.thread_count,
status_update.reporter,
status_update.effective,
)

num_threads = threading.active_count()
report = {}
for status in Status:
report[status] = []
for key, driver in self._drivers.items():
# auto set terminated if it died
if driver.status in [Status.HEALTHY, Status.UNHEALTHY] and (driver.process is None or not driver.process.is_alive()):
self._logger.critical(f"process for driver {key} is no longer running -- marking terminated")
driver.update_status(Status.TERMINATED)

# auto set unhealthy if we haven't had a ping in the last 30s from this device
if driver.status == Status.HEALTHY and driver.status_since < (datetime.now() - timedelta(seconds=30)):
self._logger.critical(f"haven't received a health ping from driver {key} in 30s -- marking unhealthy")
driver.update_status(Status.UNHEALTHY)

the_key = key if driver.status in [Status.NONE, Status.INVALID] else driver.driver.get_device_pretty_id()
report[driver.status].append(f"{the_key} ({driver.thread_count} threads)"
f" as of {driver.status_since} (reported by {driver.status_reporter}"
f" [{Device(driver.status_reporter).name}])")
num_threads += int(driver.thread_count)

report_string = f"Health Report: \n{len(report[Status.HEALTHY])} drivers running"
report_string += f"\n{num_threads} total threads in use ({threading.active_count()} by OrchEOStrator)"
for status, reports in report.items():
report_string += f"\n\t{status}:"
for item in reports:
report_string += f"\n\t\t{item}"
self._logger.info(report_string)

self._logger.info("Done Checking Health")
except Exception as e:
self._logger.critical("An exception occurred when attempting to perform health check:"
f" {e}\n{traceback.format_exc()}")

@staticmethod
def _spin() -> None:
""" Spins to keep the software alive. Never returns. """
Expand Down

0 comments on commit f0bcb54

Please sign in to comment.