diff --git a/requirements.txt b/requirements.txt index 6b008a8..dedf3b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,7 @@ pillow>=11.2.1 requests>=2.32 pychromecast~=14.0.7 httpx~=0.28.1 -sanitize-filename~=1.2.0 \ No newline at end of file +sanitize-filename~=1.2.0 +async_timeout~=5.0.1 +adb_shell==0.4.4 +simple-justwatch-python-api>=0.16 \ No newline at end of file diff --git a/src/adb_tv.py b/src/adb_tv.py new file mode 100644 index 0000000..867c583 --- /dev/null +++ b/src/adb_tv.py @@ -0,0 +1,109 @@ +""" +This module provides utilities for interacting with Android TVs via ADB (Android Debug Bridge). + +It includes functions for managing ADB keys, connecting to devices, retrieving installed apps, +and verifying device authorization. +""" + +import os +from pathlib import Path +from typing import Dict, Optional + +from adb_shell.adb_device_async import AdbDeviceTcpAsync +from adb_shell.auth.keygen import keygen +from adb_shell.auth.sign_pythonrsa import PythonRSASigner + +ADB_CERTS_DIR = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" +ADB_CERTS_DIR.mkdir(parents=True, exist_ok=True) + + +def get_adb_key_paths(device_id: str) -> tuple[Path, Path]: + """ + Return the paths to the private and public ADB keys for a given device. + + Args: + device_id (str): The unique identifier for the device. + + Returns: + tuple[Path, Path]: Paths to the private and public key files. + """ + priv = ADB_CERTS_DIR / f"adb_{device_id}" + pub = ADB_CERTS_DIR / f"adb_{device_id}.pub" + return priv, pub + + +def load_or_generate_adb_keys(device_id: str) -> PythonRSASigner: + """ + Ensure ADB RSA keys exist for the device and return the signer. + + Args: + device_id (str): The unique identifier for the device. + + Returns: + PythonRSASigner: The signer object for ADB authentication. + """ + priv_path, pub_path = get_adb_key_paths(device_id) + + if not priv_path.exists() or not pub_path.exists(): + keygen(str(priv_path)) + + with open(priv_path, encoding="utf-8") as f: + priv = f.read() + with open(pub_path, encoding="utf-8") as f: + pub = f.read() + return PythonRSASigner(pub, priv) + + +async def adb_connect(device_id: str, host: str, port: int = 5555) -> Optional[AdbDeviceTcpAsync]: + """ + Connect to an Android device via ADB. + + Args: + device_id (str): The unique identifier for the device. + host (str): The IP address or hostname of the device. + port (int, optional): The port number for the ADB connection. Defaults to 5555. + + Returns: + Optional[AdbDeviceTcpAsync]: The connected ADB device object, or None if the connection fails. + """ + signer = load_or_generate_adb_keys(device_id) + device = AdbDeviceTcpAsync(host, port, default_transport_timeout_s=9.0) + + try: + await device.connect(rsa_keys=[signer], auth_timeout_s=20) + return device + except Exception as e: + print(f"ADB connection failed to {host}:{port} — {e}") + return None + + +async def get_installed_apps(device: AdbDeviceTcpAsync) -> Dict[str, Dict[str, str]]: + """ + Retrieve a list of installed non-system apps in a structured format. + + Args: + device (AdbDeviceTcpAsync): The connected ADB device. + + Returns: + Dict[str, Dict[str, str]]: A dictionary of app package names and their metadata. + """ + output = await device.shell("pm list packages -3 -e") + packages = sorted(line.replace("package:", "").strip() for line in output.splitlines()) + return {package: {"url": f"market://launch?id={package}"} for package in packages} + + +async def is_authorised(device: AdbDeviceTcpAsync) -> bool: + """ + Check if the connected device is authorized for ADB communication. + + Args: + device (AdbDeviceTcpAsync): The connected ADB device. + + Returns: + bool: True if the device is authorized, False otherwise. + """ + try: + result = await device.shell("echo ADB_OK") + return "ADB_OK" in result + except Exception: + return False diff --git a/src/config.py b/src/config.py index 72ca8d4..f64d16f 100644 --- a/src/config.py +++ b/src/config.py @@ -11,6 +11,7 @@ import logging import os from dataclasses import dataclass +from pathlib import Path from typing import Iterator _LOG = logging.getLogger(__name__) @@ -18,6 +19,19 @@ _CFG_FILENAME = "config.json" +# Paths +def _get_config_root() -> Path: + config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config")) + config_home.mkdir(parents=True, exist_ok=True) + return config_home + + +def _get_data_root() -> Path: + data_home = Path(os.environ.get("UC_DATA_HOME", "./data")) + data_home.mkdir(parents=True, exist_ok=True) + return data_home + + @dataclass class AtvDevice: """Android TV device configuration.""" @@ -38,6 +52,8 @@ class AtvDevice: """Enable External Metadata.""" use_chromecast: bool = False """Enable Chromecast features.""" + use_adb: bool = False + """Enable ADB features.""" use_chromecast_volume: bool = False """Enable volume driven by Chromecast protocol.""" volume_step: int = 10 @@ -137,6 +153,7 @@ def update(self, atv: AtvDevice) -> bool: item.auth_error = atv.auth_error item.use_external_metadata = atv.use_external_metadata item.use_chromecast = atv.use_chromecast + item.use_adb = atv.use_adb item.use_chromecast_volume = atv.use_chromecast_volume item.volume_step = atv.volume_step if atv.volume_step else 10 return self.store() @@ -144,19 +161,19 @@ def update(self, atv: AtvDevice) -> bool: def default_certfile(self) -> str: """Return the default certificate file for initializing a device.""" - return os.path.join(self._data_path, "androidtv_remote_cert.pem") + return os.path.join(self._data_path + "/certs", "androidtv_remote_cert.pem") def default_keyfile(self) -> str: """Return the default key file for initializing a device.""" - return os.path.join(self._data_path, "androidtv_remote_key.pem") + return os.path.join(self._data_path + "/certs", "androidtv_remote_key.pem") def certfile(self, atv_id: str) -> str: """Return the certificate file of the device.""" - return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_cert.pem") + return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_cert.pem") def keyfile(self, atv_id: str) -> str: """Return the key file of the device.""" - return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_key.pem") + return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_key.pem") def remove(self, atv_id: str) -> bool: """Remove the given device configuration.""" @@ -242,6 +259,7 @@ def load(self) -> bool: item.get("auth_error", False), item.get("use_external_metadata", False), item.get("use_chromecast", False), + item.get("use_adb", False), item.get("use_chromecast_volume", False), item.get("volume_step", 10), ) diff --git a/src/external_metadata.py b/src/external_metadata.py index f2c0dd1..763ace2 100644 --- a/src/external_metadata.py +++ b/src/external_metadata.py @@ -9,11 +9,11 @@ import base64 import json import logging -import os +import re from io import BytesIO from pathlib import Path from typing import Dict -from urllib.parse import urlparse +from urllib.parse import quote, urlparse import google_play_scraper import httpx @@ -21,6 +21,9 @@ from PIL.Image import Resampling from pychromecast.controllers.media import MediaImage from sanitize_filename import sanitize +from simplejustwatchapi import justwatch + +from config import _get_config_root, _get_data_root _LOG = logging.getLogger(__name__) @@ -30,27 +33,14 @@ # Paths -def _get_config_root() -> Path: - config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config")) - config_home.mkdir(parents=True, exist_ok=True) - return config_home - - -def _get_cache_root() -> Path: - data_home = Path(os.environ.get("UC_DATA_HOME", "./data")) - cache_root = data_home / CACHE_ROOT - cache_root.mkdir(parents=True, exist_ok=True) - return cache_root - - def _get_metadata_dir() -> Path: - metadata_dir = _get_cache_root() + metadata_dir = _get_data_root() / CACHE_ROOT metadata_dir.mkdir(parents=True, exist_ok=True) return metadata_dir def _get_icon_dir() -> Path: - icon_dir = _get_cache_root() / ICON_SUBDIR + icon_dir = _get_data_root() / CACHE_ROOT / ICON_SUBDIR icon_dir.mkdir(parents=True, exist_ok=True) return icon_dir @@ -119,6 +109,61 @@ def resize_image() -> str: return None +# async def encode_icon_to_data_uri(icon_name: str) -> str: +# """ +# Encode an image from a local file path or remote URL. +# +# Returns a base64-encoded PNG data URI. +# """ +# if isinstance(icon_name, MediaImage): +# icon_name = icon_name.url +# +# if isinstance(icon_name, str) and icon_name.startswith("data:image"): +# _LOG.debug("Icon is already a data URI") +# return icon_name +# +# _LOG.debug("Encoding icon to data URI: %s", icon_name) +# try: +# if _is_url(icon_name): +# async with httpx.AsyncClient() as client: +# response = await client.get(icon_name, timeout=10) +# response.raise_for_status() +# img_bytes = BytesIO(response.content) +# +# def encode_image() -> str: +# img = Image.open(img_bytes) +# img = img.convert("RGBA") +# buffer = BytesIO() +# img.save(buffer, format="PNG") +# encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") +# return f"data:image/png;base64,{encoded}" +# +# return await asyncio.to_thread(encode_image) +# +# def load_and_encode() -> str: +# icon_path = _get_icon_path(icon_name) +# if not icon_path.exists(): +# raise FileNotFoundError(f"Icon not found: {icon_name}") +# with open(icon_path, "rb") as f: +# img = Image.open(f) +# img.load() +# img = img.convert("RGBA") +# buffer = BytesIO() +# img.save(buffer, format="PNG") +# encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") +# return f"data:image/png;base64,{encoded}" +# +# return await asyncio.to_thread(load_and_encode) +# +# except Exception as e: +# _LOG.warning("Failed to encode icon to base64 for %s: %s", icon_name, e) +# return "" + + +_ENCODED_ICON_CACHE: dict[str, str] = {} +_MISSING_ICON_CACHE: set[str] = set() + + async def encode_icon_to_data_uri(icon_name: str) -> str: """ Encode an image from a local file path or remote URL. @@ -132,7 +177,15 @@ async def encode_icon_to_data_uri(icon_name: str) -> str: _LOG.debug("Icon is already a data URI") return icon_name + if icon_name in _ENCODED_ICON_CACHE: + return _ENCODED_ICON_CACHE[icon_name] + + if icon_name in _MISSING_ICON_CACHE: + _LOG.debug("Skipping encoding; previously failed: %s", icon_name) + return "" + _LOG.debug("Encoding icon to data URI: %s", icon_name) + try: if _is_url(icon_name): async with httpx.AsyncClient() as client: @@ -148,12 +201,18 @@ def encode_image() -> str: encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") return f"data:image/png;base64,{encoded}" - return await asyncio.to_thread(encode_image) + encoded = await asyncio.to_thread(encode_image) + _ENCODED_ICON_CACHE[icon_name] = encoded + return encoded + + # Local file path + icon_path = _get_icon_path(icon_name) + if not icon_path.exists(): + _LOG.warning("Icon not found on disk: %s", icon_path) + _MISSING_ICON_CACHE.add(icon_name) + return "" def load_and_encode() -> str: - icon_path = _get_icon_path(icon_name) - if not icon_path.exists(): - raise FileNotFoundError(f"Icon not found: {icon_name}") with open(icon_path, "rb") as f: img = Image.open(f) img.load() @@ -163,10 +222,13 @@ def load_and_encode() -> str: encoded = base64.b64encode(buffer.getvalue()).decode("utf-8") return f"data:image/png;base64,{encoded}" - return await asyncio.to_thread(load_and_encode) + encoded = await asyncio.to_thread(load_and_encode) + _ENCODED_ICON_CACHE[icon_name] = encoded + return encoded except Exception as e: _LOG.warning("Failed to encode icon to base64 for %s: %s", icon_name, e) + _MISSING_ICON_CACHE.add(icon_name) return "" @@ -228,4 +290,96 @@ async def get_app_metadata(package_id: str) -> Dict[str, str]: return {"name": metadata["name"], "icon": icon_data_uri} _LOG.debug("Falling back to default metadata for %s", package_id) - return {"name": package_id, "icon": ""} + return {"name": "", "icon": ""} + + +async def youtube_search(query: str): + """Search for poster images using YouTube.""" + url = f"https://www.youtube.com/results?search_query={quote(query)}" + headers = {"User-Agent": "Mozilla/5.0"} + + with httpx.Client(headers=headers, timeout=10) as client: + response = client.get(url) + html = response.text + + # Extract the ytInitialData JSON + match = re.search(r"var ytInitialData = ({.*?});", html) + if not match: + raise RuntimeError("Could not find ytInitialData in the page") + + data = json.loads(match.group(1)) + + try: + items = data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"][ + "contents" + ][0]["itemSectionRenderer"]["contents"] + except (KeyError, IndexError): + raise RuntimeError("Could not parse YouTube data structure") + + for item in items: + if "videoRenderer" in item: + video = item["videoRenderer"] + video_id = video.get("videoId") + + return f"https://img.youtube.com/vi/{video_id}/0.jpg" + + return None + + +async def search_poster_justwatch(query: str, country: str = "GB") -> str | None: + """Search for poster images using JustWatch API.""" + response = justwatch.search(query, country, "en", count=1, best_only=True) + + if not response[0].poster: + return None + + poster_url = response[0].poster + + if poster_url: + return poster_url + + return None + + +async def get_best_artwork(title: str, artist: str = None, current_package: str = None) -> str | None: + """Get artwork for a TV show or movie based on current app and title/artist.""" + _LOG.debug( + "Resolving best artwork for title='%s', artist='%s', current_package='%s'", title, artist, current_package + ) + + search_query = f"{title} - {artist}" if artist else title + + if current_package in [ + "com.google.android.youtube.tv", + "com.liskovsoft.videomanager", + "com.teamsmart.videomanager.tv", + ]: + + _LOG.debug("YouTube detected. Searching for artwork.") + + youtube = await youtube_search(search_query) + + if youtube: + _LOG.debug("Artwork result:\n%s", json.dumps(youtube, indent=2)) + return youtube + _LOG.debug("No artwork found from YouTube search.") + + else: + + _LOG.debug("Non-YouTube package detected. Searching for artwork.") + justwatch_poster = await search_poster_justwatch(search_query) + + if justwatch_poster: + _LOG.debug("Artwork result:\n%s", json.dumps(justwatch, indent=2)) + return justwatch_poster + _LOG.debug("No artwork found from JustWatch search.") + + _LOG.debug("No artwork source applicable. Returning False.") + return None + + +# async def test(): +# posters = await get_best_artwork("Episode 1", "Breaking Bad", "com.plexapp.android") +# print(posters) +# +# asyncio.run(test()) diff --git a/src/profiles.py b/src/profiles.py index f4b261a..b868963 100644 --- a/src/profiles.py +++ b/src/profiles.py @@ -12,8 +12,10 @@ import json import logging import os +import string from dataclasses import dataclass from enum import IntEnum +from typing import Any from ucapi import media_player @@ -260,6 +262,24 @@ def match(self, manufacturer: str, model: str, use_chromecast: bool) -> Profile: select_profile = copy.copy(select_profile) select_profile.features.extend(CHROMECAST_FEATURES) + for char in string.ascii_uppercase + " ": + # Use 'SPACE' as the key name for the space character + key = "SPACE" if char == " " else char + command_name = f"TEXT_{key}" + + # Append to simple_commands list + select_profile.simple_commands.append(command_name) + + # Map the command: space gets ' ', others get lowercase letter + command_value = " " if char == " " else char.lower() + select_profile.command_map[command_name] = Command(command_value, "TEXT") + + select_profile.simple_commands.append("TEXT_BACKSPACE") + select_profile.command_map["TEXT_BACKSPACE"] = Command("DEL", KeyPress.SHORT) + + select_profile.simple_commands.append("TEXT_ENTER") + select_profile.command_map["TEXT_ENTER"] = Command("ENTER", KeyPress.SHORT) + return select_profile @@ -278,7 +298,7 @@ def _str_to_feature(value: str) -> media_player.Features | None: return None -def _convert_command_map(values: dict[str, any]) -> dict[str, Command]: +def _convert_command_map(values: dict[str, Any]) -> dict[str, Command]: cmd_map = {} for key, value in values.items(): try: diff --git a/src/setup_flow.py b/src/setup_flow.py index 7defe1c..57bddd1 100644 --- a/src/setup_flow.py +++ b/src/setup_flow.py @@ -6,8 +6,11 @@ """ import asyncio +import json import logging +import os from enum import IntEnum +from pathlib import Path import ucapi from ucapi import ( @@ -25,7 +28,10 @@ import config import discover import tv -from config import AtvDevice +from adb_tv import adb_connect, get_installed_apps, is_authorised +from apps import Apps, IdMappings +from config import AtvDevice, _get_config_root +from external_metadata import get_app_metadata _LOG = logging.getLogger(__name__) @@ -38,7 +44,8 @@ class SetupSteps(IntEnum): DISCOVER = 2 DEVICE_CHOICE = 3 PAIRING_PIN = 4 - RECONFIGURE = 5 + APP_SELECTION = 5 + RECONFIGURE = 6 _setup_step = SetupSteps.INIT @@ -48,6 +55,9 @@ class SetupSteps(IntEnum): _use_external_metadata: bool = False _reconfigured_device: AtvDevice | None = None _use_chromecast: bool = False +_use_adb: bool = False +_adb_device_id: str = "" +_device_info: dict[str, str] = {} _use_chromecast_volume: bool = False _volume_step: int = 10 @@ -110,6 +120,8 @@ async def driver_setup_handler(msg: SetupDriver) -> SetupAction: return await handle_device_choice(msg) if _setup_step == SetupSteps.PAIRING_PIN and "pin" in msg.input_values: return await handle_user_data_pin(msg) + if _setup_step == SetupSteps.APP_SELECTION: + return await handle_app_selection(msg) if _setup_step == SetupSteps.RECONFIGURE: return await _handle_device_reconfigure(msg) _LOG.error("No or invalid user response was received: %s", msg) @@ -290,6 +302,16 @@ async def handle_configuration_mode( _LOG.warning("Could not remove device from configuration: %s", choice) return SetupError(error_type=IntegrationSetupError.OTHER) config.devices.store() + adb_cert_path = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" / f"adb_{choice}" + adb_cert_path_pub = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" / f"adb_{choice}.pub" + if adb_cert_path.exists(): + adb_cert_path.unlink() + if adb_cert_path_pub.exists(): + adb_cert_path_pub.unlink() + appslist_path = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / f"appslist_{choice}.json" + if appslist_path.exists(): + appslist_path.unlink() + _LOG.info("Device removed from configuration: %s", choice) return SetupComplete() case "configure": # Reconfigure device if the identifier has changed @@ -302,6 +324,7 @@ async def handle_configuration_mode( _setup_step = SetupSteps.RECONFIGURE _reconfigured_device = selected_device use_chromecast = selected_device.use_chromecast if selected_device.use_chromecast else False + use_adb = selected_device.use_adb if selected_device.use_adb else False use_chromecast_volume = ( selected_device.use_chromecast_volume if selected_device.use_chromecast_volume else False ) @@ -321,6 +344,7 @@ async def handle_configuration_mode( __cfg_chromecast_volume(use_chromecast_volume), __cfg_volume_step(volume_step), __cfg_external_metadata(use_external_metadata), + __cfg_adb(use_adb), ], ) case "reset": @@ -428,6 +452,7 @@ async def _handle_discovery(msg: UserDataResponse) -> RequestUserInput | SetupEr __cfg_chromecast_volume(False), __cfg_volume_step(10), __cfg_external_metadata(False), + __cfg_adb(False), ], ) @@ -446,6 +471,7 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu global _use_chromecast_volume global _setup_step global _use_external_metadata + global _use_adb global _volume_step choice = msg.input_values["choice"] @@ -453,6 +479,7 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu _use_chromecast = msg.input_values.get("chromecast", "false") == "true" _use_chromecast_volume = msg.input_values.get("chromecast_volume", "false") == "true" _volume_step = int(msg.input_values.get("volume_step", 10)) + _use_adb = msg.input_values.get("adb", "false") == "true" name = "" for discovered_tv in _discovered_android_tvs: @@ -470,6 +497,7 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu id="", use_external_metadata=False, use_chromecast=False, + use_adb=False, use_chromecast_volume=_use_chromecast_volume, volume_step=_volume_step, ), @@ -504,16 +532,17 @@ async def handle_device_choice(msg: UserDataResponse) -> RequestUserInput | Setu return _setup_error_from_device_state(_pairing_android_tv.state) -async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupError: +async def handle_user_data_pin(msg: UserDataResponse) -> RequestUserInput | SetupComplete | SetupError: """ Process user data pairing pin response in a setup process. - Driver setup callback to provide requested user data during the setup process. - :param msg: response data from the requested user data - :return: the setup action on how to continue: SetupComplete if a valid Android TV device was chosen. + :return: the setup action on how to continue. """ global _pairing_android_tv + global _setup_step + + _LOG.debug("Entered handle_user_data_pin with msg: %s", msg) if _pairing_android_tv is None: _LOG.error("Can't handle pairing pin: no device instance! Aborting setup") @@ -522,23 +551,33 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr _LOG.info("[%s] User has entered the PIN", _pairing_android_tv.log_id) res = await _pairing_android_tv.finish_pairing(msg.input_values["pin"]) + _LOG.debug("[%s] finish_pairing result: %s", _pairing_android_tv.log_id, res) + _pairing_android_tv.disconnect() + _LOG.debug("[%s] Disconnected after pairing attempt", _pairing_android_tv.log_id) - device_info = None + if res != ucapi.StatusCodes.OK: + _LOG.warning("[%s] Pairing failed", _pairing_android_tv.log_id) + return SetupError() + + _LOG.info("[%s] Pairing done, retrieving device information", _pairing_android_tv.log_id) + timeout = int(tv.CONNECTION_TIMEOUT) + res = ucapi.StatusCodes.SERVER_ERROR + _device_info = None + + if await _pairing_android_tv.init(timeout): + _LOG.debug("[%s] Initialization successful", _pairing_android_tv.log_id) + if await _pairing_android_tv.connect(timeout): + _LOG.debug("[%s] Connection successful", _pairing_android_tv.log_id) + _device_info = _pairing_android_tv.device_info or {} + _LOG.debug("[%s] Retrieved device info: %s", _pairing_android_tv.log_id, _device_info) - # Connect again to retrieve device identifier (with init()) and additional device information (with connect()) - if res == ucapi.StatusCodes.OK: - _LOG.info( - "[%s] Pairing done, retrieving device information", - _pairing_android_tv.log_id, - ) - res = ucapi.StatusCodes.SERVER_ERROR - timeout = int(tv.CONNECTION_TIMEOUT) - if await _pairing_android_tv.init(timeout) and await _pairing_android_tv.connect(timeout): - device_info = _pairing_android_tv.device_info or {} if config.devices.assign_default_certs_to_device(_pairing_android_tv.identifier, True): res = ucapi.StatusCodes.OK - _pairing_android_tv.disconnect() + _LOG.debug("[%s] Default certificates assigned successfully", _pairing_android_tv.log_id) + + _pairing_android_tv.disconnect() + _LOG.debug("[%s] Disconnected after retrieving device information", _pairing_android_tv.log_id) if res != ucapi.StatusCodes.OK: state = _pairing_android_tv.state @@ -546,25 +585,159 @@ async def handle_user_data_pin(msg: UserDataResponse) -> SetupComplete | SetupEr _pairing_android_tv = None return _setup_error_from_device_state(state) + adb_apps = {} + if _use_adb: + _LOG.debug("ADB is enabled, proceeding with ADB setup") + + if not msg.input_values.get("adb", False): + _LOG.error("ADB setup failed: 'adb' not found in input values") + return SetupError() + + device_id = _pairing_android_tv.identifier + ip_address = _pairing_android_tv.address + _LOG.debug("Attempting ADB setup for device_id: %s, ip_address: %s", device_id, ip_address) + + adb_device = await adb_connect(device_id, ip_address) + if not adb_device or not await is_authorised(adb_device): + return SetupError(error_type=IntegrationSetupError.AUTHORIZATION_ERROR) + + _LOG.debug("ADB authorisation confirmed") + adb_apps = await get_installed_apps(adb_device) + _LOG.debug("Retrieved ADB apps: %s", adb_apps) + await adb_device.close() + + offline_friendly_names = set(IdMappings.values()) + # offline_package_ids = set(Apps.keys()) + + if _use_adb: + filtered_adb_apps = {} + for package, details in adb_apps.items(): + if package in Apps or package in IdMappings: + continue + friendly = details.get("name", "") + if friendly and friendly in offline_friendly_names: + continue + filtered_adb_apps[package] = details + + merged_apps = {**filtered_adb_apps, **Apps} + else: + merged_apps = Apps + + _LOG.debug("Merged apps (offline preferred): %s", merged_apps) + + app_entries = [] + + for package, details in merged_apps.items(): + mapped_name = IdMappings.get(package) + name = mapped_name or details.get("name", package) + editable = False + + if _use_external_metadata and not mapped_name: + try: + metadata = await get_app_metadata(package) + if metadata.get("name"): + name = metadata["name"] + editable = True + except Exception as e: + _LOG.warning("Metadata lookup failed for %s: %s", package, e) + + is_unfriendly = name.startswith("com.") or name.count(".") >= 2 + app_entries.append((is_unfriendly, name.lower(), package, name, editable)) + + app_entries.sort() + + settings = [] + + for _, _, package, name, editable in app_entries: + is_adb_only = _use_adb and package in adb_apps and package not in Apps + + settings.append( + { + "id": f"{package}_enabled", + "label": { + "en": name if not is_adb_only else f"{package}", + "de": name if not is_adb_only else f"{package}", + "fr": name if not is_adb_only else f"{package}", + }, + "field": {"checkbox": {"value": False}}, + } + ) + + if is_adb_only or editable: + settings.append( + { + "id": f"{package}_name", + "label": { + "en": f"Friendly name for {package}", + "de": f"Anzeigename für {package}", + "fr": f"Nom convivial pour {package}", + }, + "field": {"text": {"value": name}}, + } + ) + + _setup_step = SetupSteps.APP_SELECTION + return RequestUserInput( + title={ + "en": "Select visible apps", + "de": "Wähle sichtbare Apps", + "fr": "Sélectionnez les applications visibles", + }, + settings=settings, + ) + + +async def handle_app_selection(msg: UserDataResponse) -> SetupComplete | SetupError: + global _pairing_android_tv + + selected_apps = {} + + for field_id, value in msg.input_values.items(): + if field_id.endswith("_enabled") and str(value).lower() == "true": + package = field_id.removesuffix("_enabled") + name_field = f"{package}_name" + friendly_name = msg.input_values.get(name_field, package) + + # Prefer static app URL if it exists + static_entry = Apps.get(friendly_name) or Apps.get(package) + url = static_entry["url"] if static_entry else f"market://launch?id={package}" + + selected_apps[friendly_name] = {"url": url} + + filename = f"appslist_{_pairing_android_tv.identifier}.json" + apps_file = _get_config_root() / filename + try: + apps_file.write_text(json.dumps(selected_apps, indent=2)) + _LOG.info("App selection stored: %s", selected_apps) + except Exception as e: + _LOG.error("Failed to write selected apps: %s", e) + return SetupError() + device = AtvDevice( id=_pairing_android_tv.identifier, name=_pairing_android_tv.name, address=_pairing_android_tv.address, + manufacturer=_device_info.get("manufacturer", ""), + model=_device_info.get("model", ""), use_external_metadata=_use_external_metadata, use_chromecast=_use_chromecast, use_chromecast_volume=_use_chromecast_volume, - manufacturer=device_info.get("manufacturer", ""), - model=device_info.get("model", ""), volume_step=_volume_step, + use_adb=_use_adb, ) + _LOG.debug("Created AtvDevice: %s", device) + + config.devices.add_or_update(device) + _LOG.debug("Device added/updated in configuration") - config.devices.add_or_update(device) # triggers AndroidTv instance creation config.devices.store() + _LOG.debug("Configuration stored") - # ATV device connection will be triggered with subscribe_entities request - _pairing_android_tv = None await asyncio.sleep(1) _LOG.info("[%s] Setup successfully completed for %s", device.name, device.id) + + _pairing_android_tv = None + return SetupComplete() @@ -588,11 +761,13 @@ async def _handle_device_reconfigure( use_chromecast_volume = msg.input_values.get("chromecast_volume", "false") == "true" use_external_metadata = msg.input_values.get("external_metadata", "false") == "true" volume_step = int(msg.input_values.get("volume_step", 10)) + use_adb = msg.input_values.get("adb", "false") == "true" _LOG.debug("User has changed configuration") _reconfigured_device.use_chromecast = use_chromecast _reconfigured_device.use_chromecast_volume = use_chromecast_volume _reconfigured_device.use_external_metadata = use_external_metadata + _reconfigured_device.use_adb = use_adb _reconfigured_device.volume_step = volume_step config.devices.add_or_update(_reconfigured_device) # triggers ATV instance update @@ -660,9 +835,21 @@ def __cfg_external_metadata(enabled: bool): return { "id": "external_metadata", "label": { - "en": "Preview feature: Enable external Google Play metadata", - "de": "Vorschaufunktion: Aktiviere externe Google Play Metadaten", - "fr": "Fonctionnalité en aperçu: Activer les métadonnées externes de Google Play", + "en": "Preview feature: Enable external metadata", + "de": "Vorschaufunktion: Aktiviere externe Metadaten", + "fr": "Fonctionnalité en aperçu: Activer les métadonnées externes", + }, + "field": {"checkbox": {"value": enabled}}, + } + + +def __cfg_adb(enabled: bool): + return { + "id": "adb", + "label": { + "en": "Preview feature: Enable ADB connection (for app list)", + "de": "Vorschaufunktion: Aktiviere ADB Verbindung (für App-Browsing)", + "fr": "Fonctionnalité en aperçu: Activer la connexion ADB (pour la navigation dans les applications)", }, "field": {"checkbox": {"value": enabled}}, } diff --git a/src/tv.py b/src/tv.py index ab28acf..8a609b1 100644 --- a/src/tv.py +++ b/src/tv.py @@ -8,6 +8,7 @@ # pylint: disable=too-many-lines import asyncio +import json import logging import os import socket @@ -49,8 +50,12 @@ import apps import discover import inputs -from config import AtvDevice -from external_metadata import encode_icon_to_data_uri, get_app_metadata +from config import AtvDevice, _get_config_root +from external_metadata import ( + encode_icon_to_data_uri, + get_app_metadata, + get_best_artwork, +) from profiles import KeyPress, Profile from util import filter_data_img_properties @@ -232,6 +237,8 @@ def __init__( self._use_app_url = not device_config.use_chromecast self._player_state = media_player.States.ON self._muted = False + self._is_chromecast_playing = False + self._chromecast_metadata_active = False def __del__(self): """Destructs instance, disconnect AndroidTVRemote.""" @@ -599,111 +606,18 @@ def disconnect(self) -> None: self.events.emit(Events.DISCONNECTED, self._identifier) # Callbacks - async def _apply_current_app_metadata(self, current_app: str) -> dict: - global HOMESCREEN_IMAGE - - update = {} - # one-time initialization - if HOMESCREEN_IMAGE is None: - HOMESCREEN_IMAGE = "" - HOMESCREEN_IMAGE = await encode_icon_to_data_uri("config://androidtv.png") - - # Special handling for homescreen & Android TV system apps: show pre-defined icon - homescreen_app = apps.is_homescreen_app(current_app) - if homescreen_app or apps.is_standby_app(current_app): - update[MediaAttr.SOURCE] = apps.IdMappings[current_app] - update[MediaAttr.MEDIA_TITLE] = "" - update[MediaAttr.MEDIA_IMAGE_URL] = HOMESCREEN_IMAGE - update[MediaAttr.STATE] = ( - media_player.States.ON.value if homescreen_app else media_player.States.STANDBY.value - ) - return update - - # Track state of data sources - offline_name = None - offline_match = None - external_name = None - external_icon = None - - # Try offline ID mapping first - if current_app in apps.IdMappings: - offline_name = apps.IdMappings[current_app] - self._media_app = offline_name - - # Try fuzzy offline name matching if ID mapping failed - if not offline_name: - for query, name in apps.NameMatching.items(): - if query in current_app: - offline_match = name - self._media_app = name - break - - # Try external metadata - metadata = ( - await get_app_metadata(current_app) if current_app and self._device_config.use_external_metadata else None - ) - if metadata: - if _LOG.isEnabledFor(logging.DEBUG): - _LOG.debug("App metadata: %s", filter_data_img_properties(metadata)) - external_name = metadata.get("name") - external_icon = metadata.get("icon") - if external_name: - self._media_app = external_name - if external_icon: - self._app_image_url = external_icon - - # Determine final name/title to use - name_to_use = offline_name or offline_match or external_name or current_app - # TODO why set name to both source & media title fields? - update[MediaAttr.SOURCE] = name_to_use - if not self._media_title and not self._media_image_url: - update[MediaAttr.MEDIA_TITLE] = name_to_use - - # Determine which icon to use - icon_to_use = None - if self._device_config.use_external_metadata or self._use_app_url: - if external_icon: - icon_to_use = external_icon - else: - icon_to_use = "" - elif self._media_image_url: - icon_to_use = await encode_icon_to_data_uri(self._media_image_url) - - update[MediaAttr.STATE] = media_player.States.PLAYING.value - # Skip applying app icon if media image from cast is present - if not self._media_image_url: - if not icon_to_use: - update[MediaAttr.MEDIA_IMAGE_URL] = HOMESCREEN_IMAGE - else: - update[MediaAttr.MEDIA_IMAGE_URL] = icon_to_use - - return update - - def _is_on_updated(self, is_on: bool) -> None: - """Notify that the Android TV power state is updated.""" - asyncio.create_task(self._handle_is_on_updated(is_on)) - async def _handle_is_on_updated(self, is_on: bool): - _LOG.info("[%s] is on: %s", self.log_id, is_on) - current_app = self._atv.current_app or "" - if is_on: - self._chromecast_connect() - update = await self._apply_current_app_metadata(current_app) - update[MediaAttr.STATE] = media_player.States.ON.value - else: - update = await self._apply_current_app_metadata(current_app) - update[MediaAttr.STATE] = media_player.States.OFF.value - - self.events.emit(Events.UPDATE, self._identifier, update) - - def _current_app_updated(self, current_app: str) -> None: - """Notify that the current app on Android TV is updated.""" - asyncio.create_task(self._handle_current_app_updated(current_app)) + def _is_on_updated(self, _is_on: bool) -> None: + if not self._loop or not self._loop.is_running(): + _LOG.warning("[%s] No running event loop for power update", self.log_id) + return + asyncio.run_coroutine_threadsafe(self._update_media_status(), self._loop) - async def _handle_current_app_updated(self, current_app: str): - _LOG.debug("[%s] current_app: %s", self.log_id, current_app) - update = await self._apply_current_app_metadata(current_app) - self.events.emit(Events.UPDATE, self._identifier, update) + def _current_app_updated(self, _current_app: str) -> None: + if not self._loop or not self._loop.is_running(): + _LOG.warning("[%s] No running event loop for app update", self.log_id) + return + asyncio.run_coroutine_threadsafe(self._update_media_status(), self._loop) def _volume_info_updated(self, volume_info: dict[str, str | bool]) -> None: """Notify that the Android TV volume information is updated.""" @@ -721,8 +635,21 @@ def _is_available_updated(self, is_available: bool): def _update_app_list(self) -> None: update = {} source_list = [] - for app in apps.Apps: - source_list.append(app) + + filename = f"appslist_{self._identifier}.json" + apps_file = _get_config_root() / filename + + if apps_file.exists(): + try: + with apps_file.open("r", encoding="utf-8") as f: + selected_apps = json.load(f) + source_list.extend(selected_apps.keys()) + except Exception as e: + _LOG.warning("Failed to read apps list from %s: %s", apps_file, e) + else: + _LOG.info("No saved app list found for %s, falling back to default", self._identifier) + + source_list.extend(apps.Apps.keys()) update[MediaAttr.SOURCE_LIST] = source_list self.events.emit(Events.UPDATE, self._identifier, update) @@ -777,15 +704,33 @@ async def turn_off(self) -> ucapi.StatusCodes: async def select_source(self, source: str) -> ucapi.StatusCodes: """ - Select a given source, either a pre-defined app, input or by app-link/id. + Launch an app on the Android TV or select in input source on a TV running Android TV. + + Select a given source, either a user-defined app (from JSON), + an input source (KeyCode), or directly by app-link / id. :param source: the friendly source name or an app-link / id """ - if source in apps.Apps: - return await self._launch_app(apps.Apps[source]["url"]) + # Load saved apps for this device + apps_file = _get_config_root() / f"appslist_{self._identifier}.json" + apps_list = {} + + if apps_file.exists(): + try: + with apps_file.open("r", encoding="utf-8") as f: + apps_list = json.load(f) + except Exception as e: + _LOG.warning("Failed to read apps list for %s: %s", self._identifier, e) + + # Match known friendly app name + if source in apps_list: + return await self._launch_app(apps_list[source]["url"]) + + # Match input source if source in inputs.KeyCode: return await self._switch_input(source) + # Fall back to direct launch (e.g. package name or intent URI) return await self._launch_app(source) @async_handle_atvlib_errors @@ -809,6 +754,17 @@ async def _send_command(self, keycode: int | str, action: KeyPress = KeyPress.SH :return: OK if scheduled to be sent, other error code in case of an error """ # noqa + if action == "TEXT": + # Special handling for text input + if not isinstance(keycode, str): + _LOG.error("[%s] Cannot send command, invalid key_code: %s", self.log_id, keycode) + return ucapi.StatusCodes.BAD_REQUEST + if keycode in ("DEL", "ENTER"): + self._atv.send_key_command(keycode, "SHORT") + else: + self._atv.send_text(keycode) + return ucapi.StatusCodes.OK + if action in (KeyPress.LONG, KeyPress.BEGIN): direction = "START_LONG" elif action == KeyPress.END: @@ -832,6 +788,12 @@ async def _launch_app(self, app: str) -> ucapi.StatusCodes: self._atv.send_launch_app_command(app) return ucapi.StatusCodes.OK + @async_handle_atvlib_errors + async def _send_text(self, text: str) -> ucapi.StatusCodes: + """Launch an app on Android TV.""" + self._atv.send_text(text) + return ucapi.StatusCodes.OK + async def _switch_input(self, source: str) -> ucapi.StatusCodes: """ TEST FUNCTION: Send a KEYCODE_TV_INPUT_* key. @@ -849,84 +811,9 @@ def new_connection_status(self, status: ConnectionStatus) -> None: def new_media_status(self, status: MediaStatus) -> None: """Receive new media status event from Google cast.""" if not self._loop or not self._loop.is_running(): - _LOG.warning("[%s] No running event loop for handling new media status", self.log_id) + _LOG.warning("[%s] No running event loop for Chromecast status", self.log_id) return - - try: - asyncio.run_coroutine_threadsafe(self._handle_new_media_status(status), self._loop) - except Exception as e: - _LOG.error("[%s] Failed to schedule media status handler: %s", self.log_id, e) - - async def _handle_new_media_status(self, status: MediaStatus): - update = {} - - if ( - status.player_state - and GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) != self._player_state - ): - # PLAYING, PAUSED, IDLE - self._player_state = GOOGLE_CAST_MEDIA_STATES_MAP.get(status.player_state, media_player.States.PLAYING) - self._last_update_position_time = 0 - update[MediaAttr.STATE] = self._player_state - - if status.album_name != self._media_album: - self._media_album = status.album_name or "" - update[MediaAttr.MEDIA_ALBUM] = self._media_album - - if status.artist != self._media_artist: - self._media_artist = status.artist or "" - update[MediaAttr.MEDIA_ARTIST] = self._media_artist - - if status.title != self._media_title: - current_title = self.media_title - self._media_title = status.title or "" - if current_title != self.media_title: - _LOG.debug("[%s] Chromecast Media info updated : %s", self.log_id, status) - update[MediaAttr.MEDIA_TITLE] = self.media_title - - current_time = int(status.current_time) if status.current_time else 0 - duration = int(status.duration) if status.duration else 0 - changed_duration = False - - if duration != self._media_duration: - self._media_duration = duration - update[MediaAttr.MEDIA_DURATION] = self._media_duration - changed_duration = True - - # Update position every 30 seconds - if changed_duration or ( - current_time != self._media_position and self._last_update_position_time + 30 < time.time() - ): - self._media_position = current_time - update[MediaAttr.MEDIA_POSITION] = self._media_position - update[MediaAttr.MEDIA_DURATION] = self._media_duration - self._last_update_position_time = time.time() - - if ( - status.metadata_type - and GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) != self._media_type - ): - self._media_type = GOOGLE_CAST_MEDIA_TYPES_MAP.get(status.metadata_type, MediaType.VIDEO) - update[MediaAttr.MEDIA_TYPE] = self._media_type - - if status.images and len(status.images) > 0 and status.images[0].url != self._media_image_url: - self._media_image_url = status.images[0].url - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._media_image_url) - self._use_app_url = False - else: - self._media_image_url = None - if self._device_config.use_external_metadata: - self._use_app_url = True - if self._app_image_url: - update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._app_image_url) - - if update: - if _LOG.isEnabledFor(logging.DEBUG): - _LOG.debug( - "[%s] Update remote with Chromecast info : %s", self.log_id, filter_data_img_properties(update) - ) - - self.events.emit(Events.UPDATE, self._identifier, update) + asyncio.run_coroutine_threadsafe(self._update_media_status(), self._loop) def load_media_failed(self, queue_item_id: int, error_code: int) -> None: """Receive new media failed event from Google cast.""" @@ -1027,3 +914,151 @@ async def volume_set(self, volume: int | None) -> ucapi.StatusCodes: except PyChromecastError as ex: _LOG.error("[%s] Chromecast error sending command : %s", self.log_id, ex) return ucapi.StatusCodes.SERVER_ERROR + + async def _update_media_status(self): + global HOMESCREEN_IMAGE + + update = {} + + # Initialize homescreen icon once + if HOMESCREEN_IMAGE is None: + HOMESCREEN_IMAGE = await encode_icon_to_data_uri("config://androidtv.png") + + current_app = self._atv.current_app or "" + is_on = self._atv.is_on + homescreen_app = apps.is_homescreen_app(current_app) + standby_app = apps.is_standby_app(current_app) + + # Device is OFF + if not is_on: + self._chromecast_metadata_active = False + update.update( + { + MediaAttr.STATE: media_player.States.OFF.value, + MediaAttr.MEDIA_TITLE: "", + MediaAttr.SOURCE: "", + MediaAttr.MEDIA_IMAGE_URL: "", + } + ) + self.events.emit(Events.UPDATE, self._identifier, update) + return + + # Device on homescreen or standby app + if homescreen_app or standby_app: + self._chromecast_metadata_active = False + update.update( + { + MediaAttr.STATE: ( + media_player.States.ON.value if homescreen_app else media_player.States.STANDBY.value + ), + MediaAttr.SOURCE: "", + MediaAttr.MEDIA_TITLE: "", + MediaAttr.MEDIA_IMAGE_URL: "", + } + ) + self.events.emit(Events.UPDATE, self._identifier, update) + return + + # Chromecast status (only if enabled) + chromecast_playing_states = ( + MEDIA_PLAYER_STATE_PLAYING, + MEDIA_PLAYER_STATE_PAUSED, + MEDIA_PLAYER_STATE_BUFFERING, + ) + + chromecast_active = False + chromecast_status = None + + if self._device_config.use_chromecast and self._chromecast: + chromecast_status = self._chromecast.media_controller.status + chromecast_active = ( + self._chromecast.socket_client.is_connected + and chromecast_status + and chromecast_status.player_state in chromecast_playing_states + and (chromecast_status.title or chromecast_status.images) + ) + + self._chromecast_metadata_active = chromecast_active + + # App metadata from offline mappings + offline_name = apps.IdMappings.get(current_app) + offline_match = next((name for query, name in apps.NameMatching.items() if query in current_app), None) + + external_name = None + external_icon = None + + # External metadata (if enabled) + if self._device_config.use_external_metadata: + metadata = await get_app_metadata(current_app) + if metadata: + external_name = metadata.get("name") + external_icon = metadata.get("icon") + _LOG.debug("External app metadata: %s", filter_data_img_properties(metadata)) + + app_name = external_name or offline_name or offline_match or current_app + self._media_app = app_name + self._app_image_url = external_icon or "" + + update[MediaAttr.SOURCE] = app_name + + # --- Chromecast Active State Handling --- + if chromecast_active: + # Media Title from Chromecast (fallback to app_name) + self._media_title = ( + chromecast_status.title if chromecast_status.title is not None else chromecast_status.artist or "" + ) + update[MediaAttr.MEDIA_TITLE] = self._media_title or chromecast_status.artist + + # Media Image from Chromecast + if chromecast_status.images and chromecast_status.images[0].url: + self._media_image_url = chromecast_status.images[0].url + + # External Artwork fallback (if enabled) + elif self._device_config.use_external_metadata and chromecast_status.player_state in [ + "PLAYING", + "PAUSED", + "BUFFERING", + ]: + self._media_image_url = ( + await get_best_artwork(chromecast_status.title, chromecast_status.artist, current_app) + or external_icon + or HOMESCREEN_IMAGE + ) + else: + self._media_image_url = external_icon or HOMESCREEN_IMAGE + + update[MediaAttr.MEDIA_IMAGE_URL] = await encode_icon_to_data_uri(self._media_image_url) + + update[MediaAttr.STATE] = GOOGLE_CAST_MEDIA_STATES_MAP.get( + chromecast_status.player_state, media_player.States.PLAYING + ) + + update.update( + { + MediaAttr.MEDIA_ARTIST: chromecast_status.artist or "", + MediaAttr.MEDIA_ALBUM: chromecast_status.album_name or "", + MediaAttr.MEDIA_POSITION: int(chromecast_status.current_time or 0), + MediaAttr.MEDIA_DURATION: int(chromecast_status.duration or 0), + MediaAttr.SOURCE: app_name, + } + ) + + # --- Chromecast Inactive or Disabled: App-level metadata only --- + else: + self._media_title = app_name + self._media_image_url = self._app_image_url or HOMESCREEN_IMAGE + + update.update( + { + MediaAttr.MEDIA_TITLE: app_name, + MediaAttr.MEDIA_IMAGE_URL: await encode_icon_to_data_uri(self._media_image_url), + MediaAttr.STATE: media_player.States.PLAYING.value, + MediaAttr.SOURCE: app_name, + MediaAttr.MEDIA_ARTIST: "", + MediaAttr.MEDIA_ALBUM: "", + MediaAttr.MEDIA_POSITION: 0, + MediaAttr.MEDIA_DURATION: 0, + } + ) + + self.events.emit(Events.UPDATE, self._identifier, update)