-
Notifications
You must be signed in to change notification settings - Fork 10
Update #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Update #77
Changes from all commits
9ba1b32
a558e10
15df3fa
b20e500
74e8fd4
4709da4
27ec593
1e2c497
a7f53d2
fe13f34
3ef5ba7
17c2690
95ecaaf
ced371e
9573f72
ab015f7
4ead16b
fdefbfb
ca21a78
b0ae053
2b621da
3739dd7
a799181
e35a2f8
f57eb1e
447086e
5044cd3
b76d49b
1a01857
ca2c5d4
0f0f851
9d2525a
0405df2
00de323
431d54c
e05a1dc
4af1e62
e655722
9d89387
c7c2887
ca4cf4b
7e0d892
d3d2f94
bbcdb4d
e98db09
dd7bf9c
f9cb0db
723c07f
cce5d6f
6c38afc
77c3057
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| """ | ||
| This module provides utilities for interacting with Android TVs via ADB (Android Debug Bridge). | ||
|
|
||
| It includes functions for managing ADB keys, connecting to devices, retrieving installed apps, | ||
| and verifying device authorization. | ||
| """ | ||
|
|
||
| import os | ||
| from pathlib import Path | ||
| from typing import Dict, Optional | ||
|
|
||
| from adb_shell.adb_device_async import AdbDeviceTcpAsync | ||
| from adb_shell.auth.keygen import keygen | ||
| from adb_shell.auth.sign_pythonrsa import PythonRSASigner | ||
|
|
||
| ADB_CERTS_DIR = Path(os.environ.get("UC_CONFIG_HOME", "./config")) / "certs" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the defined get_config_root() function in config.py to get the configuration path to avoid duplicating logic. |
||
| ADB_CERTS_DIR.mkdir(parents=True, exist_ok=True) | ||
|
|
||
|
|
||
| def get_adb_key_paths(device_id: str) -> tuple[Path, Path]: | ||
| """ | ||
| Return the paths to the private and public ADB keys for a given device. | ||
|
|
||
| Args: | ||
| device_id (str): The unique identifier for the device. | ||
|
|
||
| Returns: | ||
| tuple[Path, Path]: Paths to the private and public key files. | ||
| """ | ||
| priv = ADB_CERTS_DIR / f"adb_{device_id}" | ||
| pub = ADB_CERTS_DIR / f"adb_{device_id}.pub" | ||
| return priv, pub | ||
|
|
||
|
|
||
| def load_or_generate_adb_keys(device_id: str) -> PythonRSASigner: | ||
| """ | ||
| Ensure ADB RSA keys exist for the device and return the signer. | ||
|
|
||
| Args: | ||
| device_id (str): The unique identifier for the device. | ||
|
|
||
| Returns: | ||
| PythonRSASigner: The signer object for ADB authentication. | ||
| """ | ||
| priv_path, pub_path = get_adb_key_paths(device_id) | ||
|
|
||
| if not priv_path.exists() or not pub_path.exists(): | ||
| keygen(str(priv_path)) | ||
|
|
||
| with open(priv_path, encoding="utf-8") as f: | ||
| priv = f.read() | ||
| with open(pub_path, encoding="utf-8") as f: | ||
| pub = f.read() | ||
| return PythonRSASigner(pub, priv) | ||
|
|
||
|
|
||
| async def adb_connect(device_id: str, host: str, port: int = 5555) -> Optional[AdbDeviceTcpAsync]: | ||
| """ | ||
| Connect to an Android device via ADB. | ||
|
|
||
| Args: | ||
| device_id (str): The unique identifier for the device. | ||
| host (str): The IP address or hostname of the device. | ||
| port (int, optional): The port number for the ADB connection. Defaults to 5555. | ||
|
|
||
| Returns: | ||
| Optional[AdbDeviceTcpAsync]: The connected ADB device object, or None if the connection fails. | ||
| """ | ||
| signer = load_or_generate_adb_keys(device_id) | ||
| device = AdbDeviceTcpAsync(host, port, default_transport_timeout_s=9.0) | ||
|
|
||
| try: | ||
| await device.connect(rsa_keys=[signer], auth_timeout_s=20) | ||
| return device | ||
| except Exception as e: | ||
| print(f"ADB connection failed to {host}:{port} — {e}") | ||
| return None | ||
|
|
||
|
|
||
| async def get_installed_apps(device: AdbDeviceTcpAsync) -> Dict[str, Dict[str, str]]: | ||
| """ | ||
| Retrieve a list of installed non-system apps in a structured format. | ||
|
|
||
| Args: | ||
| device (AdbDeviceTcpAsync): The connected ADB device. | ||
|
|
||
| Returns: | ||
| Dict[str, Dict[str, str]]: A dictionary of app package names and their metadata. | ||
| """ | ||
| output = await device.shell("pm list packages -3 -e") | ||
| packages = sorted(line.replace("package:", "").strip() for line in output.splitlines()) | ||
| return {package: {"url": f"market://launch?id={package}"} for package in packages} | ||
|
|
||
|
|
||
| async def is_authorised(device: AdbDeviceTcpAsync) -> bool: | ||
| """ | ||
| Check if the connected device is authorized for ADB communication. | ||
|
|
||
| Args: | ||
| device (AdbDeviceTcpAsync): The connected ADB device. | ||
|
|
||
| Returns: | ||
| bool: True if the device is authorized, False otherwise. | ||
| """ | ||
| try: | ||
| result = await device.shell("echo ADB_OK") | ||
| return "ADB_OK" in result | ||
| except Exception: | ||
| return False | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,13 +11,27 @@ | |
| import logging | ||
| import os | ||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
| from typing import Iterator | ||
|
|
||
| _LOG = logging.getLogger(__name__) | ||
|
|
||
| _CFG_FILENAME = "config.json" | ||
|
|
||
|
|
||
| # Paths | ||
| def _get_config_root() -> Path: | ||
| config_home = Path(os.environ.get("UC_CONFIG_HOME", "./config")) | ||
| config_home.mkdir(parents=True, exist_ok=True) | ||
| return config_home | ||
|
|
||
|
|
||
| def _get_data_root() -> Path: | ||
|
Comment on lines
+23
to
+29
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exported functions should not be prefixed with
|
||
| data_home = Path(os.environ.get("UC_DATA_HOME", "./data")) | ||
| data_home.mkdir(parents=True, exist_ok=True) | ||
| return data_home | ||
|
|
||
|
|
||
| @dataclass | ||
| class AtvDevice: | ||
| """Android TV device configuration.""" | ||
|
|
@@ -38,6 +52,8 @@ class AtvDevice: | |
| """Enable External Metadata.""" | ||
| use_chromecast: bool = False | ||
| """Enable Chromecast features.""" | ||
| use_adb: bool = False | ||
| """Enable ADB features.""" | ||
| use_chromecast_volume: bool = False | ||
| """Enable volume driven by Chromecast protocol.""" | ||
| volume_step: int = 10 | ||
|
|
@@ -137,26 +153,27 @@ def update(self, atv: AtvDevice) -> bool: | |
| item.auth_error = atv.auth_error | ||
| item.use_external_metadata = atv.use_external_metadata | ||
| item.use_chromecast = atv.use_chromecast | ||
| item.use_adb = atv.use_adb | ||
| item.use_chromecast_volume = atv.use_chromecast_volume | ||
| item.volume_step = atv.volume_step if atv.volume_step else 10 | ||
| return self.store() | ||
| return False | ||
|
|
||
| def default_certfile(self) -> str: | ||
| """Return the default certificate file for initializing a device.""" | ||
| return os.path.join(self._data_path, "androidtv_remote_cert.pem") | ||
| return os.path.join(self._data_path + "/certs", "androidtv_remote_cert.pem") | ||
|
|
||
| def default_keyfile(self) -> str: | ||
| """Return the default key file for initializing a device.""" | ||
| return os.path.join(self._data_path, "androidtv_remote_key.pem") | ||
| return os.path.join(self._data_path + "/certs", "androidtv_remote_key.pem") | ||
|
|
||
| def certfile(self, atv_id: str) -> str: | ||
| """Return the certificate file of the device.""" | ||
| return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_cert.pem") | ||
| return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_cert.pem") | ||
|
|
||
| def keyfile(self, atv_id: str) -> str: | ||
| """Return the key file of the device.""" | ||
| return os.path.join(self._data_path, f"androidtv_{atv_id}_remote_key.pem") | ||
| return os.path.join(self._data_path + "/certs", f"androidtv_{atv_id}_remote_key.pem") | ||
|
|
||
| def remove(self, atv_id: str) -> bool: | ||
| """Remove the given device configuration.""" | ||
|
|
@@ -242,6 +259,7 @@ def load(self) -> bool: | |
| item.get("auth_error", False), | ||
| item.get("use_external_metadata", False), | ||
| item.get("use_chromecast", False), | ||
| item.get("use_adb", False), | ||
| item.get("use_chromecast_volume", False), | ||
| item.get("volume_step", 10), | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The https://github.com/Electronic-Mango/simple-justwatch-python-api library is licensed under GPLv3. Unfortunately we cannot include this in our device firmware.
Please either remove it or replace it with another library licensed under BSD, MIT, Apache-2.0 or MPL-2.0