Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@


def check_system_requirements(
device_type: DeviceType = DeviceType.ADB, wda_url: str = "http://localhost:8100"
device_type: DeviceType = DeviceType.ADB,
wda_url: str = "http://localhost:8100",
device_id: str | None = None,
) -> bool:
"""
Check system requirements before running the agent.
Expand Down Expand Up @@ -195,8 +197,11 @@ def check_system_requirements(
if device_type == DeviceType.ADB:
print("3. Checking ADB Keyboard...", end=" ")
try:
adb_cmd = ["adb"]
if device_id:
adb_cmd.extend(["-s", device_id])
result = subprocess.run(
["adb", "shell", "ime", "list", "-s"],
adb_cmd + ["shell", "ime", "list", "-s"],
capture_output=True,
text=True,
timeout=10,
Expand Down Expand Up @@ -269,6 +274,24 @@ def check_system_requirements(
return all_passed


def auto_select_adb_device_id() -> str | None:
"""Auto-select the first healthy ADB device."""
try:
result = subprocess.run(
["adb", "devices"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)
for line in result.stdout.splitlines()[1:]:
if "\tdevice" in line:
return line.split("\t", 1)[0].strip()
except Exception:
return None
return None


def check_model_api(base_url: str, model_name: str, api_key: str = "EMPTY") -> bool:
"""
Check if the model API is accessible and the specified model exists.
Expand Down Expand Up @@ -731,12 +754,20 @@ def main():
if handle_device_commands(args):
return

# Auto-select a healthy ADB device if not explicitly specified.
if device_type == DeviceType.ADB and not args.device_id:
auto_device_id = auto_select_adb_device_id()
if auto_device_id:
args.device_id = auto_device_id
print(f"Auto-selected ADB device: {args.device_id}")

# Run system requirements check before proceeding
if not check_system_requirements(
device_type,
wda_url=args.wda_url
if device_type == DeviceType.IOS
else "http://localhost:8100",
device_id=args.device_id if device_type == DeviceType.ADB else None,
):
sys.exit(1)

Expand Down
84 changes: 72 additions & 12 deletions phone_agent/adb/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,60 @@ def get_current_app(device_id: str | None = None) -> str:
"""
adb_prefix = _get_adb_prefix(device_id)

result = subprocess.run(
adb_prefix + ["shell", "dumpsys", "window"], capture_output=True, text=True, encoding="utf-8"
# Different Android builds expose focus info in different dumpsys subcommands.
focus_commands = [
["shell", "dumpsys", "window", "displays"],
["shell", "dumpsys", "window", "windows"],
["shell", "dumpsys", "window"],
["shell", "dumpsys", "activity", "activities"],
["shell", "dumpsys", "activity", "top"],
]
focus_markers = (
"mCurrentFocus",
"mFocusedApp",
"mResumedActivity",
"topResumedActivity",
)
output = result.stdout
if not output:
raise ValueError("No output from dumpsys window")

# Parse window focus info
for line in output.split("\n"):
if "mCurrentFocus" in line or "mFocusedApp" in line:
for app_name, package in APP_PACKAGES.items():
if package in line:
return app_name
diagnostics: list[str] = []
has_any_output = False

return "System Home"
for cmd in focus_commands:
result = subprocess.run(
adb_prefix + cmd,
capture_output=True,
text=True,
encoding="utf-8",
)
stdout = result.stdout or ""
stderr = (result.stderr or "").strip()

if not stdout.strip():
if result.returncode != 0 or stderr:
diagnostics.append(
f"{' '.join(cmd)} failed (code={result.returncode}): {stderr or 'no stderr'}"
)
continue

has_any_output = True

# Prefer lines that explicitly contain focus markers.
for line in stdout.splitlines():
if any(marker in line for marker in focus_markers):
for app_name, package in APP_PACKAGES.items():
if package in line:
return app_name

# Fallback: match package anywhere in command output.
for app_name, package in APP_PACKAGES.items():
if package in stdout:
return app_name

if has_any_output:
return "System Home"

summary = "; ".join(diagnostics[:3]) if diagnostics else "all dumpsys commands returned empty output"
raise ValueError(f"Unable to get current app: {summary}")


def tap(
Expand Down Expand Up @@ -249,4 +288,25 @@ def _get_adb_prefix(device_id: str | None) -> list:
"""Get ADB command prefix with optional device specifier."""
if device_id:
return ["adb", "-s", device_id]
auto_device_id = _auto_select_device_id()
if auto_device_id:
return ["adb", "-s", auto_device_id]
return ["adb"]


def _auto_select_device_id() -> str | None:
"""Select the first healthy ADB device when device_id is not provided."""
try:
result = subprocess.run(
["adb", "devices"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)
for line in result.stdout.splitlines()[1:]:
if "\tdevice" in line:
return line.split("\t", 1)[0].strip()
except Exception:
return None
return None
21 changes: 21 additions & 0 deletions phone_agent/adb/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,25 @@ def _get_adb_prefix(device_id: str | None) -> list:
"""Get ADB command prefix with optional device specifier."""
if device_id:
return ["adb", "-s", device_id]
auto_device_id = _auto_select_device_id()
if auto_device_id:
return ["adb", "-s", auto_device_id]
return ["adb"]


def _auto_select_device_id() -> str | None:
"""Select the first healthy ADB device when device_id is not provided."""
try:
result = subprocess.run(
["adb", "devices"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)
for line in result.stdout.splitlines()[1:]:
if "\tdevice" in line:
return line.split("\t", 1)[0].strip()
except Exception:
return None
return None
35 changes: 33 additions & 2 deletions phone_agent/adb/screenshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,19 @@ def get_screenshot(device_id: str | None = None, timeout: int = 10) -> Screensho
timeout=timeout,
)

# Check for screenshot failure (sensitive screen)
# Check for screenshot failure due to sensitive/secure screens.
# Generic command failures (e.g., transport/device selection issues)
# should not be treated as sensitive screens.
output = result.stdout + result.stderr
if "Status: -1" in output or "Failed" in output:
output_lower = output.lower()
if result.returncode != 0:
return _create_fallback_screenshot(is_sensitive=False)
if (
"status: -1" in output_lower
or "sensitive screen" in output_lower
or "secure screen" in output_lower
or "security" in output_lower
):
return _create_fallback_screenshot(is_sensitive=True)

# Pull screenshot to local temp path
Expand Down Expand Up @@ -89,9 +99,30 @@ def _get_adb_prefix(device_id: str | None) -> list:
"""Get ADB command prefix with optional device specifier."""
if device_id:
return ["adb", "-s", device_id]
auto_device_id = _auto_select_device_id()
if auto_device_id:
return ["adb", "-s", auto_device_id]
return ["adb"]


def _auto_select_device_id() -> str | None:
"""Select the first healthy ADB device when device_id is not provided."""
try:
result = subprocess.run(
["adb", "devices"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)
for line in result.stdout.splitlines()[1:]:
if "\tdevice" in line:
return line.split("\t", 1)[0].strip()
except Exception:
return None
return None


def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot:
"""Create a black fallback image when screenshot fails."""
default_width, default_height = 1080, 2400
Expand Down