Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion src/nodetool/api/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
)
from huggingface_hub import try_to_load_from_cache
from huggingface_hub.constants import HF_HUB_CACHE
from nodetool.common.settings import (
get_system_file_path,
get_system_data_path,
SETTINGS_FILE,
SECRETS_FILE,
)
from nodetool.api.utils import current_user, flatten_models
from fastapi import APIRouter, Depends, Query
from nodetool.common.huggingface_models import (
Expand Down Expand Up @@ -128,6 +134,43 @@
log.warning(f"Hugging Face cache directory {hf_cache_path} does not exist or is not a directory.")
except Exception as e:
log.error(f"Error determining Hugging Face cache directory: {e}")

# Add Nodetool config and data directories
try:
settings_dir = get_system_file_path(SETTINGS_FILE).parent.resolve()
secrets_dir = get_system_file_path(SECRETS_FILE).parent.resolve()
data_dir = get_system_data_path("").resolve()
logs_dir = get_system_data_path("logs").resolve()
for p in [settings_dir, secrets_dir, data_dir, logs_dir]:
if p.exists() and p.is_dir():
safe_roots.append(p)
except Exception as e:
log.error(f"Error determining Nodetool config/data directories: {e}")

# Add Electron userData and logs directories (best-effort)
try:
if sys.platform == "win32":
appdata = os.getenv("APPDATA")
if appdata:
electron_user = Path(appdata) / "nodetool-electron"
electron_logs = electron_user / "logs"
for p in [electron_user.resolve(), electron_logs.resolve()]:
if p.exists() and p.is_dir():
safe_roots.append(p)
elif sys.platform == "darwin":
electron_user = Path.home() / "Library" / "Application Support" / "nodetool-electron"
electron_logs = Path.home() / "Library" / "Logs" / "nodetool-electron"
for p in [electron_user.resolve(), electron_logs.resolve()]:
if p.exists() and p.is_dir():
safe_roots.append(p)
else:
electron_user = Path.home() / ".config" / "nodetool-electron"
electron_logs = electron_user / "logs"
for p in [electron_user.resolve(), electron_logs.resolve()]:
if p.exists() and p.is_dir():
safe_roots.append(p)
except Exception as e:
log.error(f"Error determining Electron directories: {e}")

return safe_roots

Expand Down Expand Up @@ -341,7 +384,14 @@

path_to_open: str | None = None
try:
requested_path = Path(path).resolve()
# Expand env vars (%APPDATA%, $HOME) and ~
expanded = os.path.expandvars(path)
expanded = os.path.expanduser(expanded)
requested_path = Path(expanded).resolve()

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.

Copilot Autofix

AI 4 months ago

To fully mitigate the risk of path traversal and ensure that only paths within the allowed directories can be opened, we should:

  • Ensure that all entries in safe_roots are resolved to absolute paths before comparison.
  • When checking if the requested path is within a safe root, use the resolved absolute paths for both.
  • Optionally, handle the case where is_relative_to is not available (Python <3.9), but since the code uses it, we assume Python 3.9+.
  • Make these changes in the /open_in_explorer endpoint, specifically in the block where safe_roots are used and compared.

No new imports are needed, but we should add a line to resolve each root directory before the comparison loop.


Suggested changeset 1
src/nodetool/api/model.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/nodetool/api/model.py b/src/nodetool/api/model.py
--- a/src/nodetool/api/model.py
+++ b/src/nodetool/api/model.py
@@ -375,8 +375,10 @@
                   or an error (e.g., {"status": "error", "message": "..."}).
         """
         safe_roots = _get_valid_explorable_roots()
+        # Ensure all safe_roots are resolved to absolute paths
+        resolved_safe_roots = [Path(root).resolve() for root in safe_roots]
 
-        if not safe_roots:
+        if not resolved_safe_roots:
             return {
                 "status": "error",
                 "message": "Cannot open path: No safe directories (like Ollama or Hugging Face cache) could be determined.",
@@ -393,7 +394,7 @@
             if requested_path.is_file():
                 requested_path = requested_path.parent
             is_safe_path = False
-            for root_dir in safe_roots:
+            for root_dir in resolved_safe_roots:
                 if requested_path.is_relative_to(root_dir):
                     is_safe_path = True
                     break
EOF
@@ -375,8 +375,10 @@
or an error (e.g., {"status": "error", "message": "..."}).
"""
safe_roots = _get_valid_explorable_roots()
# Ensure all safe_roots are resolved to absolute paths
resolved_safe_roots = [Path(root).resolve() for root in safe_roots]

if not safe_roots:
if not resolved_safe_roots:
return {
"status": "error",
"message": "Cannot open path: No safe directories (like Ollama or Hugging Face cache) could be determined.",
@@ -393,7 +394,7 @@
if requested_path.is_file():
requested_path = requested_path.parent
is_safe_path = False
for root_dir in safe_roots:
for root_dir in resolved_safe_roots:
if requested_path.is_relative_to(root_dir):
is_safe_path = True
break
Copilot is powered by AI and may make mistakes. Always verify output.

# If a file is passed, open the parent directory for better UX
if requested_path.is_file():

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.

Copilot Autofix

AI 4 months ago

To fix the problem, the code should ensure that all manipulations of the user-provided path are performed before the safety check. Specifically, after expanding environment variables and resolving the path, and after replacing the path with its parent directory if it is a file, the code should then check if the final path is within one of the safe root directories using is_relative_to. This ensures that no matter what the user provides, the path that is ultimately opened is always within an allowed directory. Only after passing this check should the path be used.

Steps to fix:

  • Move the "is file" check and possible parent directory replacement before the safety check.
  • Only perform the is_relative_to check on the final path that will be opened.
  • No additional imports are needed, as all required modules are already imported.

Edit only the relevant region in src/nodetool/api/model.py (lines 393–401).


Suggested changeset 1
src/nodetool/api/model.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/nodetool/api/model.py b/src/nodetool/api/model.py
--- a/src/nodetool/api/model.py
+++ b/src/nodetool/api/model.py
@@ -392,12 +392,13 @@
             # If a file is passed, open the parent directory for better UX
             if requested_path.is_file():
                 requested_path = requested_path.parent
+
             is_safe_path = False
             for root_dir in safe_roots:
                 if requested_path.is_relative_to(root_dir):
                     is_safe_path = True
                     break
-            
+
             if not is_safe_path:
                 log.warning(
                     f"Path traversal attempt: User path {requested_path} is not within any of the configured safe directories: {safe_roots}"
EOF
@@ -392,12 +392,13 @@
# If a file is passed, open the parent directory for better UX
if requested_path.is_file():
requested_path = requested_path.parent

is_safe_path = False
for root_dir in safe_roots:
if requested_path.is_relative_to(root_dir):
is_safe_path = True
break

if not is_safe_path:
log.warning(
f"Path traversal attempt: User path {requested_path} is not within any of the configured safe directories: {safe_roots}"
Copilot is powered by AI and may make mistakes. Always verify output.
requested_path = requested_path.parent
is_safe_path = False
for root_dir in safe_roots:
if requested_path.is_relative_to(root_dir):
Expand All @@ -350,7 +400,7 @@

if not is_safe_path:
log.warning(
f"Path traversal attempt: User path {requested_path} is not within any of the configured safe directories: {safe_roots}"

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (secret)
as clear text.
)
return {
"status": "error",
Expand Down
4 changes: 3 additions & 1 deletion src/nodetool/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from nodetool.metadata.types import Provider
from nodetool.packages.registry import get_nodetool_package_source_folders

from . import asset, job, message, node, storage, workflow, model, settings, thread
from . import asset, job, message, node, storage, workflow, model, settings, thread, system
import mimetypes

from nodetool.common.websocket_updates import websocket_updates
Expand Down Expand Up @@ -103,6 +103,8 @@ def get_routers(cls) -> List[APIRouter]:
DEFAULT_ROUTERS.append(file.router)
DEFAULT_ROUTERS.append(settings.router)
DEFAULT_ROUTERS.append(collection.router)
# System endpoints are only available in non-production
DEFAULT_ROUTERS.append(system.router)
DEFAULT_ROUTERS.append(package.router)


Expand Down
162 changes: 162 additions & 0 deletions src/nodetool/api/system/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from typing import List, Literal, Optional, Any, Dict
from fastapi import APIRouter
from pydantic import BaseModel

from nodetool.common.system_stats import get_system_stats, SystemStats

from .info import get_os_info, get_versions_info, get_paths_info
from .health import run_health_checks


router = APIRouter(prefix="/api/system", tags=["system"])


class OSInfo(BaseModel):
platform: str
release: str
arch: str


class VersionsInfo(BaseModel):
python: Optional[str] = None
nodetool_core: Optional[str] = None
nodetool_base: Optional[str] = None
cuda: Optional[str] = None
gpu_name: Optional[str] = None
vram_total_gb: Optional[str] = None
driver_version: Optional[str] = None


class PathsInfo(BaseModel):
settings_path: str
secrets_path: str
data_dir: str
core_logs_dir: str
core_log_file: str
ollama_models_dir: str
huggingface_cache_dir: str
electron_user_data: str
electron_log_file: str
electron_logs_dir: str
electron_main_log_file: str


class SystemInfoResponse(BaseModel):
os: OSInfo
versions: VersionsInfo
paths: PathsInfo


class HealthCheck(BaseModel):
id: str
status: Literal["ok", "warn", "error"]
details: Optional[str] = None
fix_hint: Optional[str] = None


class HealthSummary(BaseModel):
ok: int
warn: int
error: int


class HealthResponse(BaseModel):
checks: List[HealthCheck]
summary: HealthSummary


_CACHE: dict[str, tuple[float, dict]] = {}
_TTL_SECONDS = 30.0


@router.get("/")
async def get_system_info() -> SystemInfoResponse:
import time

now = time.time()
cached = _CACHE.get("system_info")
if cached and (now - cached[0]) < _TTL_SECONDS:
payload = cached[1]
else:
os_info = get_os_info()
versions = get_versions_info()
paths = get_paths_info()
payload = {
"os": os_info,
"versions": versions,
"paths": paths,
}
_CACHE["system_info"] = (now, payload)

return SystemInfoResponse(
os=OSInfo(**payload["os"]),
versions=VersionsInfo(**payload["versions"]),
paths=PathsInfo(**payload["paths"]),
)


@router.get("/health")
async def get_system_health() -> HealthResponse:
try:
result: Dict[str, Any] = run_health_checks()

# Validate the structure of the result
if not isinstance(result, dict):
raise ValueError("Health check result must be a dictionary")

checks_list: List[Dict[str, Any]] = result.get("checks", []) or []
if not isinstance(checks_list, list):
raise ValueError("Health check 'checks' must be a list")

# Validate each check has required fields
validated_checks = []
for check in checks_list:
if not isinstance(check, dict):
continue # Skip invalid checks
if "id" not in check or "status" not in check:
continue # Skip checks missing required fields
validated_checks.append(check)

checks_models = [HealthCheck(**c) for c in validated_checks]

summary_data = result.get("summary", {}) or {}
if not isinstance(summary_data, dict):
summary_data = {"ok": 0, "warn": 0, "error": 0}

# Ensure summary has required fields with defaults
summary_data.setdefault("ok", 0)
summary_data.setdefault("warn", 0)
summary_data.setdefault("error", 0)

summary = HealthSummary(**summary_data)
return HealthResponse(checks=checks_models, summary=summary)

except Exception as e:
# Return a safe fallback response if health checks fail
return HealthResponse(
checks=[HealthCheck(
id="health_check_error",
status="error",
details=f"Health check system error: {str(e)}",
fix_hint="Check system logs for more details"
)],
summary=HealthSummary(ok=0, warn=0, error=1)
)


@router.get("/stats")
async def get_stats() -> SystemStats:
import time

now = time.time()
cached = _CACHE.get("system_stats")
if cached and (now - cached[0]) < _TTL_SECONDS:
return SystemStats(**cached[1])
else:
stats = get_system_stats()
stats_dict = stats.model_dump()
_CACHE["system_stats"] = (now, stats_dict)
return stats



Loading
Loading