diff --git a/main.py b/main.py index 8cdc34b2..2f0cc844 100755 --- a/main.py +++ b/main.py @@ -35,7 +35,9 @@ def check_system_requirements( - device_type: DeviceType = DeviceType.ADB, wda_url: str = "http://localhost:8100" + device_type: DeviceType = DeviceType.ADB, + wda_url: str = "http://localhost:8100", + device_id: str | None = None, ) -> bool: """ Check system requirements before running the agent. @@ -195,8 +197,11 @@ def check_system_requirements( if device_type == DeviceType.ADB: print("3. Checking ADB Keyboard...", end=" ") try: + adb_cmd = ["adb"] + if device_id: + adb_cmd.extend(["-s", device_id]) result = subprocess.run( - ["adb", "shell", "ime", "list", "-s"], + adb_cmd + ["shell", "ime", "list", "-s"], capture_output=True, text=True, timeout=10, @@ -269,6 +274,24 @@ def check_system_requirements( return all_passed +def auto_select_adb_device_id() -> str | None: + """Auto-select the first healthy ADB device.""" + try: + result = subprocess.run( + ["adb", "devices"], + capture_output=True, + text=True, + encoding="utf-8", + timeout=5, + ) + for line in result.stdout.splitlines()[1:]: + if "\tdevice" in line: + return line.split("\t", 1)[0].strip() + except Exception: + return None + return None + + def check_model_api(base_url: str, model_name: str, api_key: str = "EMPTY") -> bool: """ Check if the model API is accessible and the specified model exists. @@ -731,12 +754,20 @@ def main(): if handle_device_commands(args): return + # Auto-select a healthy ADB device if not explicitly specified. + if device_type == DeviceType.ADB and not args.device_id: + auto_device_id = auto_select_adb_device_id() + if auto_device_id: + args.device_id = auto_device_id + print(f"Auto-selected ADB device: {args.device_id}") + # Run system requirements check before proceeding if not check_system_requirements( device_type, wda_url=args.wda_url if device_type == DeviceType.IOS else "http://localhost:8100", + device_id=args.device_id if device_type == DeviceType.ADB else None, ): sys.exit(1) diff --git a/phone_agent/adb/device.py b/phone_agent/adb/device.py index 995336a1..75767bfa 100644 --- a/phone_agent/adb/device.py +++ b/phone_agent/adb/device.py @@ -21,21 +21,60 @@ def get_current_app(device_id: str | None = None) -> str: """ adb_prefix = _get_adb_prefix(device_id) - result = subprocess.run( - adb_prefix + ["shell", "dumpsys", "window"], capture_output=True, text=True, encoding="utf-8" + # Different Android builds expose focus info in different dumpsys subcommands. + focus_commands = [ + ["shell", "dumpsys", "window", "displays"], + ["shell", "dumpsys", "window", "windows"], + ["shell", "dumpsys", "window"], + ["shell", "dumpsys", "activity", "activities"], + ["shell", "dumpsys", "activity", "top"], + ] + focus_markers = ( + "mCurrentFocus", + "mFocusedApp", + "mResumedActivity", + "topResumedActivity", ) - output = result.stdout - if not output: - raise ValueError("No output from dumpsys window") - # Parse window focus info - for line in output.split("\n"): - if "mCurrentFocus" in line or "mFocusedApp" in line: - for app_name, package in APP_PACKAGES.items(): - if package in line: - return app_name + diagnostics: list[str] = [] + has_any_output = False - return "System Home" + for cmd in focus_commands: + result = subprocess.run( + adb_prefix + cmd, + capture_output=True, + text=True, + encoding="utf-8", + ) + stdout = result.stdout or "" + stderr = (result.stderr or "").strip() + + if not stdout.strip(): + if result.returncode != 0 or stderr: + diagnostics.append( + f"{' '.join(cmd)} failed (code={result.returncode}): {stderr or 'no stderr'}" + ) + continue + + has_any_output = True + + # Prefer lines that explicitly contain focus markers. + for line in stdout.splitlines(): + if any(marker in line for marker in focus_markers): + for app_name, package in APP_PACKAGES.items(): + if package in line: + return app_name + + # Fallback: match package anywhere in command output. + for app_name, package in APP_PACKAGES.items(): + if package in stdout: + return app_name + + if has_any_output: + return "System Home" + + summary = "; ".join(diagnostics[:3]) if diagnostics else "all dumpsys commands returned empty output" + raise ValueError(f"Unable to get current app: {summary}") def tap( @@ -249,4 +288,25 @@ def _get_adb_prefix(device_id: str | None) -> list: """Get ADB command prefix with optional device specifier.""" if device_id: return ["adb", "-s", device_id] + auto_device_id = _auto_select_device_id() + if auto_device_id: + return ["adb", "-s", auto_device_id] return ["adb"] + + +def _auto_select_device_id() -> str | None: + """Select the first healthy ADB device when device_id is not provided.""" + try: + result = subprocess.run( + ["adb", "devices"], + capture_output=True, + text=True, + encoding="utf-8", + timeout=5, + ) + for line in result.stdout.splitlines()[1:]: + if "\tdevice" in line: + return line.split("\t", 1)[0].strip() + except Exception: + return None + return None diff --git a/phone_agent/adb/input.py b/phone_agent/adb/input.py index 4c1c68cd..4d39c9e7 100644 --- a/phone_agent/adb/input.py +++ b/phone_agent/adb/input.py @@ -106,4 +106,25 @@ def _get_adb_prefix(device_id: str | None) -> list: """Get ADB command prefix with optional device specifier.""" if device_id: return ["adb", "-s", device_id] + auto_device_id = _auto_select_device_id() + if auto_device_id: + return ["adb", "-s", auto_device_id] return ["adb"] + + +def _auto_select_device_id() -> str | None: + """Select the first healthy ADB device when device_id is not provided.""" + try: + result = subprocess.run( + ["adb", "devices"], + capture_output=True, + text=True, + encoding="utf-8", + timeout=5, + ) + for line in result.stdout.splitlines()[1:]: + if "\tdevice" in line: + return line.split("\t", 1)[0].strip() + except Exception: + return None + return None diff --git a/phone_agent/adb/screenshot.py b/phone_agent/adb/screenshot.py index bdc5b092..e4519df5 100644 --- a/phone_agent/adb/screenshot.py +++ b/phone_agent/adb/screenshot.py @@ -49,9 +49,19 @@ def get_screenshot(device_id: str | None = None, timeout: int = 10) -> Screensho timeout=timeout, ) - # Check for screenshot failure (sensitive screen) + # Check for screenshot failure due to sensitive/secure screens. + # Generic command failures (e.g., transport/device selection issues) + # should not be treated as sensitive screens. output = result.stdout + result.stderr - if "Status: -1" in output or "Failed" in output: + output_lower = output.lower() + if result.returncode != 0: + return _create_fallback_screenshot(is_sensitive=False) + if ( + "status: -1" in output_lower + or "sensitive screen" in output_lower + or "secure screen" in output_lower + or "security" in output_lower + ): return _create_fallback_screenshot(is_sensitive=True) # Pull screenshot to local temp path @@ -89,9 +99,30 @@ def _get_adb_prefix(device_id: str | None) -> list: """Get ADB command prefix with optional device specifier.""" if device_id: return ["adb", "-s", device_id] + auto_device_id = _auto_select_device_id() + if auto_device_id: + return ["adb", "-s", auto_device_id] return ["adb"] +def _auto_select_device_id() -> str | None: + """Select the first healthy ADB device when device_id is not provided.""" + try: + result = subprocess.run( + ["adb", "devices"], + capture_output=True, + text=True, + encoding="utf-8", + timeout=5, + ) + for line in result.stdout.splitlines()[1:]: + if "\tdevice" in line: + return line.split("\t", 1)[0].strip() + except Exception: + return None + return None + + def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot: """Create a black fallback image when screenshot fails.""" default_width, default_height = 1080, 2400