diff --git a/linux_voice_assistant/__main__.py b/linux_voice_assistant/__main__.py index 8ff6b7ef..74e1ba69 100644 --- a/linux_voice_assistant/__main__.py +++ b/linux_voice_assistant/__main__.py @@ -45,6 +45,9 @@ from .mpv_player import MpvMediaPlayer from .util import call_all, get_mac, is_arm +from .event_bus import EventBus +from .event_led import LedEvent + _LOGGER = logging.getLogger(__name__) _MODULE_DIR = Path(__file__).parent _REPO_DIR = _MODULE_DIR.parent @@ -78,6 +81,8 @@ class ServerState: tts_player: MpvMediaPlayer wakeup_sound: str timer_finished_sound: str + loop: asyncio.AbstractEventLoop + event_bus: EventBus media_player_entity: Optional[MediaPlayerEntity] = None satellite: "Optional[VoiceSatelliteProtocol]" = None @@ -110,11 +115,16 @@ def __init__(self, state: ServerState) -> None: self._continue_conversation = False self._timer_finished = False + self.state.event_bus.publish('ready', {}) + _LOGGER.info('System is ready!') + def handle_voice_event( self, event_type: VoiceAssistantEventType, data: Dict[str, str] ) -> None: _LOGGER.debug("Voice event: type=%s, data=%s", event_type.name, data) + self.state.event_bus.publish(f'voice_{event_type.name}', data) + if event_type == VoiceAssistantEventType.VOICE_ASSISTANT_RUN_START: self._tts_url = data.get("url") self._tts_played = False @@ -157,6 +167,7 @@ def handle_timer_event( self._play_timer_finished() def handle_message(self, msg: message.Message) -> Iterable[message.Message]: + _LOGGER.debug(f'message {msg.__name__}') if isinstance(msg, VoiceAssistantEventResponse): # Pipeline event data: Dict[str, str] = {} @@ -197,14 +208,11 @@ def handle_message(self, msg: message.Message) -> Iterable[message.Message]: | VoiceAssistantFeature.TIMERS ), ) - elif isinstance( - msg, - ( - ListEntitiesRequest, - SubscribeHomeAssistantStatesRequest, - MediaPlayerCommandRequest, - ), - ): + elif isinstance(msg, ( + ListEntitiesRequest, + SubscribeHomeAssistantStatesRequest, + MediaPlayerCommandRequest, + ),): for entity in self.state.entities: yield from entity.handle_message(msg) @@ -245,13 +253,13 @@ def handle_message(self, msg: message.Message) -> Iterable[message.Message]: break def handle_audio(self, audio_chunk: bytes) -> None: - if not self._is_streaming_audio: return self.send_messages([VoiceAssistantAudio(data=audio_chunk)]) def wakeup(self) -> None: + # Why are we stopping the timer? Wouldn't it be better to delay it? if self._timer_finished: # Stop timer instead self._timer_finished = False @@ -264,6 +272,10 @@ def wakeup(self) -> None: self.send_messages( [VoiceAssistantRequest(start=True, wake_word_phrase=wake_word_phrase)] ) + + self.state.event_bus.publish('voice_wakeword', {'wake_word_phrase': wake_word_phrase}) + + self.duck() self._is_streaming_audio = True self.state.tts_player.play(self.state.wakeup_sound) @@ -286,6 +298,8 @@ def play_tts(self) -> None: self._tts_played = True _LOGGER.debug("Playing TTS response: %s", self._tts_url) + self.state.event_bus.publish('voice_play_tts', {}) + self.state.stop_word.is_active = True self.state.tts_player.play(self._tts_url, done_callback=self._tts_finished) @@ -301,6 +315,9 @@ def _tts_finished(self) -> None: self.state.stop_word.is_active = False self.send_messages([VoiceAssistantAnnounceFinished()]) + # Actual time the TTS stops speaking + self.state.event_bus.publish('voice__tts_finished', {}) + if self._continue_conversation: self.send_messages([VoiceAssistantRequest(start=True)]) self._is_streaming_audio = True @@ -432,6 +449,8 @@ async def main() -> None: stop_config_path = wake_word_dir / f"{args.stop_model}.json" _LOGGER.debug("Loading stop model: %s", stop_config_path) stop_model = MicroWakeWord.from_config(stop_config_path, libtensorflowlite_c_path) + + loop = asyncio.get_running_loop() state = ServerState( name=args.name, @@ -441,12 +460,16 @@ async def main() -> None: available_wake_words=available_wake_words, wake_word=wake_model, stop_word=stop_model, + event_bus=EventBus(), + loop=loop, music_player=MpvMediaPlayer(device=args.audio_output_device), tts_player=MpvMediaPlayer(device=args.audio_output_device), wakeup_sound=args.wakeup_sound, timer_finished_sound=args.timer_finished_sound, ) + LedEvent(state) + process_audio_thread = threading.Thread( target=process_audio, args=(state,), daemon=True ) @@ -455,7 +478,6 @@ async def main() -> None: def sd_callback(indata, _frames, _time, _status): state.audio_queue.put_nowait(bytes(indata)) - loop = asyncio.get_running_loop() server = await loop.create_server( lambda: VoiceSatelliteProtocol(state), host=args.host, port=args.port ) diff --git a/linux_voice_assistant/event_bus.py b/linux_voice_assistant/event_bus.py new file mode 100644 index 00000000..8ce5ecab --- /dev/null +++ b/linux_voice_assistant/event_bus.py @@ -0,0 +1,67 @@ +import logging +from typing import Any, Callable, Dict, List, Optional + +_LOGGER = logging.getLogger(__name__) + + +class EventBus: + """A simple synchronous publish/subscribe event bus.""" + + def __init__(self): + # A dictionary to hold listeners for specific string topics + self.topics: Dict[str, List[Callable[[Any], None]]] = {} + + def subscribe(self, topic: str, listener: Callable[[Any], None]) -> None: + """ + Subscribes a listener to a topic. + """ + + # _LOGGER.debug(f'EventBus subscribe {topic}') + + if topic not in self.topics: + self.topics[topic] = [] + self.topics[topic].append(listener) + + def publish(self, topic: str, data: [dict, None]) -> None: + """ + Publishes an event to all subscribed listeners. + """ + + # _LOGGER.debug(f'EventBus publish {topic}') + + data['__topic'] = topic + + listeners = self.topics.get(topic, []) + for listener in listeners: + listener(data) + +# Client helpers for subscriptions + +# The decorator to mark methods for subscription. +def subscribe(func: Callable) -> Callable: + """Decorator to mark a method for event bus subscription.""" + func._event_bus_subscribe = True + return func + +class EventHandler: + """ + A base class for components that subscribe to events. + + Subclasses should define event handlers as methods decorated with `@subscribe`. + The method name will automatically be used as the event topic. + """ + + def __init__(self, state: Any): + self.state = state + self._subscribe_all_methods() + _LOGGER.debug(f"EventHandler {self.__class__.__name__} has subscribed to all decorated methods.") + + def _subscribe_all_methods(self): + """Finds and subscribes all methods decorated with @subscribe.""" + for method_name in dir(self): + method = getattr(self, method_name) + + if hasattr(method, '_event_bus_subscribe'): + # The topic is the name of the method itself. + self.state.event_bus.subscribe(method_name, method) + _LOGGER.debug(f"Subscribed method '{method_name}' to topic '{method_name}'") \ No newline at end of file diff --git a/linux_voice_assistant/event_led.py b/linux_voice_assistant/event_led.py new file mode 100644 index 00000000..651a34a2 --- /dev/null +++ b/linux_voice_assistant/event_led.py @@ -0,0 +1,262 @@ +import logging +from typing import Any, Callable + +from .event_bus import EventHandler, subscribe + +_LOGGER = logging.getLogger(__name__) + + + +"""Controls the LEDs on the ReSpeaker 2mic HAT.""" +from math import ceil +from typing import Tuple + +import time +import asyncio +import gpiozero +import spidev + + +NUM_LEDS = 3 +LEDS_GPIO = 12 +RGB_MAP = { + "rgb": [3, 2, 1], + "rbg": [3, 1, 2], + "grb": [2, 3, 1], + "gbr": [2, 1, 3], + "brg": [1, 3, 2], + "bgr": [1, 2, 3], +} + + + +_OFF = (0, 0, 0) +_WHITE = (255, 255, 255) +_RED = (255, 0, 0) +_YELLOW = (255, 255, 0) +_BLUE = (0, 0, 255) +_GREEN = (0, 255, 0) + +# ----------------------------------------------------------------------------- + + +class APA102: + """ + Driver for APA102 LEDS (aka "DotStar"). + (c) Martin Erzberger 2016-2017 + """ + + # Constants + MAX_BRIGHTNESS = 0b11111 # Safeguard: Set to a value appropriate for your setup + LED_START = 0b11100000 # Three "1" bits, followed by 5 brightness bits + + def __init__( + self, + num_led, + global_brightness, + loop: asyncio.AbstractEventLoop, + order="rgb", + bus=0, + device=1, + max_speed_hz=8000000, + ): + self.num_led = num_led # The number of LEDs in the Strip + order = order.lower() + self.rgb = RGB_MAP.get(order, RGB_MAP["rgb"]) + # Limit the brightness to the maximum if it's set higher + if global_brightness > self.MAX_BRIGHTNESS: + self.global_brightness = self.MAX_BRIGHTNESS + else: + self.global_brightness = global_brightness + print("LED brightness:", self.global_brightness) + + self.leds = [self.LED_START, 0, 0, 0] * self.num_led # Pixel buffer + self.spi = spidev.SpiDev() # Init the SPI device + self.spi.open(bus, device) # Open SPI port 0, slave device (CS) 1 + # Up the speed a bit, so that the LEDs are painted faster + if max_speed_hz: + self.spi.max_speed_hz = max_speed_hz + + self.current_task = None + self.loop = loop + + + def clock_start_frame(self): + """Sends a start frame to the LED strip. + + This method clocks out a start frame, telling the receiving LED + that it must update its own color now. + """ + self.spi.xfer2([0] * 4) # Start frame, 32 zero bits + + def clock_end_frame(self): + """Sends an end frame to the LED strip. + + As explained above, dummy data must be sent after the last real colour + information so that all of the data can reach its destination down the line. + The delay is not as bad as with the human example above. + It is only 1/2 bit per LED. This is because the SPI clock line + needs to be inverted. + + Say a bit is ready on the SPI data line. The sender communicates + this by toggling the clock line. The bit is read by the LED + and immediately forwarded to the output data line. When the clock goes + down again on the input side, the LED will toggle the clock up + on the output to tell the next LED that the bit is ready. + + After one LED the clock is inverted, and after two LEDs it is in sync + again, but one cycle behind. Therefore, for every two LEDs, one bit + of delay gets accumulated. For 300 LEDs, 150 additional bits must be fed to + the input of LED one so that the data can reach the last LED. + + Ultimately, we need to send additional numLEDs/2 arbitrary data bits, + in order to trigger numLEDs/2 additional clock changes. This driver + sends zeroes, which has the benefit of getting LED one partially or + fully ready for the next update to the strip. An optimized version + of the driver could omit the "clockStartFrame" method if enough zeroes have + been sent as part of "clockEndFrame". + """ + + self.spi.xfer2([0xFF] * 4) + + # Round up num_led/2 bits (or num_led/16 bytes) + # for _ in range((self.num_led + 15) // 16): + # self.spi.xfer2([0x00]) + + def set_pixel(self, led_num, red, green, blue, bright_percent=100): + """Sets the color of one pixel in the LED stripe. + + The changed pixel is not shown yet on the Stripe, it is only + written to the pixel buffer. Colors are passed individually. + If brightness is not set the global brightness setting is used. + """ + if led_num < 0: + return # Pixel is invisible, so ignore + if led_num >= self.num_led: + return # again, invisible + + # Calculate pixel brightness as a percentage of the + # defined global_brightness. Round up to nearest integer + # as we expect some brightness unless set to 0 + brightness = int(ceil(bright_percent * self.global_brightness / 100.0)) + + # LED startframe is three "1" bits, followed by 5 brightness bits + ledstart = (brightness & 0b00011111) | self.LED_START + + start_index = 4 * led_num + self.leds[start_index] = ledstart + self.leds[start_index + self.rgb[0]] = red + self.leds[start_index + self.rgb[1]] = green + self.leds[start_index + self.rgb[2]] = blue + + def set_pixel_rgb(self, led_num, rgb_color, bright_percent=100): + """Sets the color of one pixel in the LED stripe. + + The changed pixel is not shown yet on the Stripe, it is only + written to the pixel buffer. + Colors are passed combined (3 bytes concatenated) + If brightness is not set the global brightness setting is used. + """ + self.set_pixel( + led_num, + (rgb_color & 0xFF0000) >> 16, + (rgb_color & 0x00FF00) >> 8, + rgb_color & 0x0000FF, + bright_percent or self.global_brightness, + ) + + def rotate(self, positions=1): + """Rotate the LEDs by the specified number of positions. + + Treating the internal LED array as a circular buffer, rotate it by + the specified number of positions. The number could be negative, + which means rotating in the opposite direction. + """ + cutoff = 4 * (positions % self.num_led) + self.leds = self.leds[cutoff:] + self.leds[:cutoff] + + def show(self): + """Sends the content of the pixel buffer to the strip. + + Todo: More than 1024 LEDs requires more than one xfer operation. + """ + self.clock_start_frame() + # xfer2 kills the list, unfortunately. So it must be copied first + # SPI takes up to 4096 Integers. So we are fine for up to 1024 LEDs. + data = list(self.leds) + while data: + self.spi.xfer2(data[:32]) + data = data[32:] + self.clock_end_frame() + + def cleanup(self): + """Release the SPI device; Call this method at the end""" + + self.spi.close() # Close SPI port + + + def run_action(self, action_mentod_name: str, *args: Any) -> None: + if self.current_task and not self.current_task.done(): + self.current_task.cancel() + + self.current_task = asyncio.run_coroutine_threadsafe(getattr(self, action_mentod_name)(*args), self.loop) + + async def color(self, rgb: Tuple[int, int, int], brightness = None) -> None: + for i in range(NUM_LEDS): + self.set_pixel(i, rgb[0], rgb[1], rgb[2], brightness or self.global_brightness) + + self.show() + + async def blink(self, color, count=10000): + for _ in range(count): + await self.color(color) + await asyncio.sleep(0.3) + await self.color(_OFF) + await asyncio.sleep(0.3) + + async def pulse(self, color: Tuple[int, int, int], speed: float = 0.009): + """Asynchronously pulses the LEDs from off to full brightness and back.""" + # Fade in + while(1): + for brightness in range(1, self.global_brightness+1): + await self.color(color, brightness) + await asyncio.sleep(speed) + + # Fade out + for brightness in range(self.global_brightness+1, 0, -1): + await self.color(color, brightness) + await asyncio.sleep(speed) + + + + +class LedEvent(EventHandler): + def __init__(self, state): + super().__init__(state) + self.leds = APA102(num_led=3, global_brightness=31, loop=self.state.loop) + + @subscribe + def ready(self, data: dict): + _LOGGER.debug('ready LED green blink') + self.leds.run_action("blink", _GREEN, 3) + + @subscribe + def voice_wakeword(self, data: dict): + self.leds.run_action("pulse", _BLUE) + + @subscribe + def voice_VOICE_ASSISTANT_STT_VAD_END(self, data: dict): + self.leds.run_action("pulse", _YELLOW) + + @subscribe + def voice_play_tts(self, data: dict): + self.leds.run_action("pulse", _GREEN) + + # This event fires long before the TTS is done speaking + # @subscribe + # def voice_VOICE_ASSISTANT_RUN_END(self, data: dict): + # self.leds.run_action("color", _OFF, 0) + + @subscribe + def voice__tts_finished(self, data: dict): + self.leds.run_action("color", _OFF, 0) diff --git a/requirements.txt b/requirements.txt index be3245c3..5e879ab4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ aioesphomeapi sounddevice +gpiozero +spidev diff --git a/script/setup b/script/setup index 0ff2e95b..b9cb4334 100755 --- a/script/setup +++ b/script/setup @@ -21,6 +21,9 @@ builder.create(_VENV_DIR) pip = [context.env_exe, "-m", "pip"] subprocess.check_call(pip + ["install", "--upgrade", "pip"]) subprocess.check_call(pip + ["install", "--upgrade", "setuptools", "wheel"]) +subprocess.check_call(pip + ["install", "--upgrade", "python-mpv"]) +subprocess.check_call(pip + ["install", "--upgrade", "gpiozero"]) +subprocess.check_call(pip + ["install", "--upgrade", "spidev"]) # Install requirements subprocess.check_call(pip + ["install", "-e", str(_PROGRAM_DIR)])