-
Notifications
You must be signed in to change notification settings - Fork 220
Status LED on failure #1136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
you could write a client to the websocket server that the station and dashboard servers expose.
import collections
import itertools
import json
import logging
import asyncio
import secrets
import string
import time
from typing import Union
import websockets
import random
from gpiozero import RGBLED
import backoff
import argparse
RED = (1, 0, 0)
GREEN = (0, 1, 0)
BLUE = (0, 0, 1)
PURPLE = (1, 0, 1)
KEEP_LED_IN_CURRENT_STATE = lambda: None
logger = logging.getLogger()
ONE_MINUTE = 60
def unwrap(payload):
payload = str(payload)
return payload[1:] if len(payload) > 1 else None
@backoff.on_exception(backoff.expo, ConnectionRefusedError, max_time=ONE_MINUTE)
@backoff.on_exception(backoff.expo, websockets.ConnectionClosed, max_time=1)
async def test_status_ui(logger, uri, station_id, led):
async with websockets.connect(uri) as websocket:
logger.info(f"Started test status UI")
while True:
payload = await websocket.recv()
if payload is None:
break # TODO continue instead ?
try:
msg = unwrap(payload)
if msg is None:
continue
latest_test = latest_test_execution(json.loads(msg), station_id)
show_test_status(logger, latest_test, led)
except:
logger.exception(f"unable to handle payload: {payload}")
continue # ignore
@backoff.on_exception(backoff.expo, ConnectionRefusedError, max_time=ONE_MINUTE)
@backoff.on_exception(backoff.expo, websockets.ConnectionClosed, max_time=1)
async def station_status_ui(logger, uri, station_id, led):
was_connected = False
try:
async with websockets.connect(uri) as websocket:
logger.info(f"Started station UI")
was_connected = True
while True:
payload = await websocket.recv()
if payload is None:
break # TODO continue instead ?
try:
msg = unwrap(payload)
if msg is None:
continue
def get_station(event, station_id):
hosts_stations = map(lambda host: host.values(), event)
stations = itertools.chain(*hosts_stations)
get_id = lambda station : station.get("station_id", None)
return max(filter(lambda station: get_id(station) == station_id, stations), key = get_id, default=None)
station = get_station(json.loads(msg), station_id)
if station is None:
continue
station_status = station.get("status", "maybe offline")
logger.info(f"station state is: {station_status}")
set_led_color(led, GREEN if station_status == "ONLINE" else RED)()
except:
logger.exception(f"unable to handle payload: {payload}")
continue # ignore
except (websockets.ConnectionClosed, ConnectionRefusedError):
if was_connected:
logger.info(f"unknown station status: dashboard is unreachable")
led.off()
was_connected = False
raise
def set_led_color(led, color):
def handler():
led.color = color
return handler
def blink(led, color):
def handler():
led.blink(on_color = color, on_time=.2, off_time=.2)
return handler
TestExecutionProgress = collections.namedtuple("TestExecutionProgress", ["status", "outcome", "execution", "test", "start_time_millis", "end_time_millis"])
def as_test_execution(event):
test_state = event.get("state", {})
test_record = test_state.get("test_record", {})
execution_uid = test_state.get("execution_uid", None)
test_uid = event.get("test_uid", None)
if execution_uid and test_uid:
return TestExecutionProgress(
execution = execution_uid,
test = test_uid,
# status enum values at https://github.com/google/openhtf/blob/c85fb069a1ce407e82bb47a8fb1b64220e974c5f/openhtf/core/test_state.py#L152
status = test_state.get("status", "UNKNOWN"),
# outcome enum values at https://github.com/google/openhtf/blob/c85fb069a1ce407e82bb47a8fb1b64220e974c5f/openhtf/core/test_record.py#L54
outcome = test_record.get("outcome", "UNKNOWN"),
start_time_millis = test_record.get("start_time_millis", None),
end_time_millis = test_record.get("end_time_millis", None)
)
return None
def latest_test_execution(events, station_id):
referenced_station = lambda event: event.get("state", {}).get("test_record", {}).get("station_id", None)
events_of_watched_station = filter(lambda event: referenced_station(event) == station_id, events)
test_executions = list(filter(lambda e: e is not None, map(as_test_execution, events_of_watched_station)))
now_in_millis = int(round(time.time() * 1000))
return max(test_executions, key = lambda execution: execution.start_time_millis or now_in_millis, default = None)
def show_test_status(logger, test: Union[TestExecutionProgress, None], led):
if test is None:
return
logger.info(f"test status is {test.status} with outcome {test.outcome}")
logger.debug(f"showing status of test {json.dumps(test, indent=2)}")
display_status = {
"WAITING_FOR_TEST_START": KEEP_LED_IN_CURRENT_STATE,
"RUNNING": blink(led, GREEN),
"COMPLETED": lambda: {
"PASS": set_led_color(led, GREEN),
"FAIL": set_led_color(led, RED),
"ERROR": set_led_color(led, PURPLE),
"TIMEOUT": blink(led, PURPLE),
"ABORTED": led.off,
}.get(test.outcome, set_led_color(led, RED))(),
}.get(test.status, led.off)
display_status()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(prog='ledsui', conflict_handler="error")
parser.add_argument('--log-level', choices=logging._nameToLevel.keys(), default='INFO', help="Set the log level")
station_opts = parser.add_argument_group("Station")
station_opts.add_argument('--station-id', type=str, default="Skipper sensor provisioning", help="id of the test station for which to display statuses", metavar="ID")
station_opts.add_argument('--dashboard-server-host', type=str, default="127.0.0.1", help="Host of the dashboard server", metavar="HOST")
station_opts.add_argument('--dashboard-server-port', type=str, default="4444", help="Port of the dashboard server", metavar="PORT")
station_status_ui_opts = parser.add_argument_group("station status LED")
station_status_ui_opts.add_argument('--station-status-red-led-gpio', type=int, default=17, help="GPIO number of the red component for the station status LED", metavar="GPIO_NUM")
station_status_ui_opts.add_argument('--station-status-green-led-gpio', type=int, default=27, help="GPIO number of the green component for the station status LED", metavar="GPIO_NUM")
station_status_ui_opts.add_argument('--station-status-blue-led-gpio', type=int, default=22, help="GPIO number of the blue component for the station status LED", metavar="GPIO_NUM")
test_status_ui_opts = parser.add_argument_group("test status LED")
test_status_ui_opts.add_argument('--test-status-red-led-gpio', type=int, default=25, help="GPIO number of the red component for the test status LED", metavar="GPIO_NUM")
test_status_ui_opts.add_argument('--test-status-green-led-gpio', type=int, default=24, help="GPIO number of the green component for the test status LED", metavar="GPIO_NUM")
test_status_ui_opts.add_argument('--test-status-blue-led-gpio', type=int, default=23, help="GPIO number of the blue component for the test status LED", metavar="GPIO_NUM")
config = parser.parse_known_args()[0]
logger.setLevel(config.log_level)
logging.getLogger('backoff').setLevel(logging.ERROR)
client_id = str(random.randint(0, 1000))
subscription_id_prefix = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(7))
ws_url = lambda topic: f"ws://{config.dashboard_server_host}:{config.dashboard_server_port}/sub/{topic}/websocket"
uis = [
station_status_ui(
logger.getChild("station_status_ui"),
ws_url(f"dashboard/{client_id}/{subscription_id_prefix}s"),
station_id = config.station_id,
led = RGBLED(
red = config.station_status_red_led_gpio,
green = config.station_status_green_led_gpio,
blue = config.station_status_blue_led_gpio,
initial_value = (0, 0, 0),
active_high = True,
pwm = False
)
),
test_status_ui(
logger.getChild("test_status_ui"),
ws_url(f"station/{client_id}/{subscription_id_prefix}t"),
station_id = config.station_id,
led = RGBLED(
red = config.test_status_red_led_gpio,
green = config.test_status_green_led_gpio,
blue = config.test_status_blue_led_gpio,
initial_value = (0, 0, 0),
active_high = True,
pwm = False
)
)
]
logger.info(f"Starting UI for station of id '{config.station_id}' at {config.dashboard_server_host}:{config.dashboard_server_port}")
loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(asyncio.wait(
[ asyncio.ensure_future(ui) for ui in uis ],
return_when = asyncio.FIRST_COMPLETED
))
for task in pending:
task.cancel()
loop.close()
exit(len(pending)) |
Hey @sibertdeclercq I'm pretty new to OpenHTF so my idea may be completely off, but I think you can store data about the ongoing test in the From my understanding After re-reading your question, I can also interpret it as though you would like to change the LED color depending on the Phase outcome (pass, fail, repeat...) and not just the entire Test outcome. In that case, you might want to make a call to your plug in each of your phases and then update the LED color when you define the phase outcome ( My guess is I'll probably end up having a similar use case as yours once I get into some more in depth testing so I'm glad to try and help you understand how to solve the issue. I think it would be helpful if you illustrated your problem with some code or a more detailed explanation. Best, Jpl |
One of the plugs I have written is an RGB LED to indicate different phases and statusses with. The problem I'm running into is that the test result is only available after the
test.execute
as far as I can see.This means I don't have my plugs available to set the LED to red. I'm now abusing
teardown
to read the test record outcome to:Is there a better wat to achieve this? I've looked into output callbacks but I couldn't find if it's possible to pass plugs to the callbacks.
Some pointers would be appreciated!
The text was updated successfully, but these errors were encountered: