diff --git a/src/anomalib/utils/path.py b/src/anomalib/utils/path.py index c7e3880471..a52bdb201f 100644 --- a/src/anomalib/utils/path.py +++ b/src/anomalib/utils/path.py @@ -32,9 +32,136 @@ across different working directories. """ +import logging +import os import re +import shutil +import sys +from contextlib import suppress from pathlib import Path +logger = logging.getLogger(__name__) + + +def _validate_windows_path(path: Path) -> bool: + """Validate that a path is safe for use in Windows commands. + + Args: + path: Path to validate + + Returns: + True if path is safe, False otherwise + """ + path_str = str(path) + + # Check for shell metacharacters that could be dangerous + dangerous_chars = {"&", "|", ";", "<", ">", "^", '"', "'", "`", "$", "(", ")", "*", "?", "[", "]", "{", "}"} + # Check for command injection patterns + injection_patterns = ["&&", "||", ";", "&", "|"] + + # Perform all validation checks + if ( + any(char in path_str for char in dangerous_chars) + or any(pattern in path_str for pattern in injection_patterns) + or "\x00" in path_str + or len(path_str) > 260 # Windows MAX_PATH + ): + return False + + # Ensure the path exists and is actually a directory (for target) + # or that its parent exists (for tmp) + try: + return path.is_dir() if path.exists() else path.parent.exists() + except (OSError, ValueError): + return False + + +def _is_windows_junction(p: Path) -> bool: + """Return True if path is a directory junction.""" + if not sys.platform.startswith("win"): + return False + + try: + # On Windows, check if it's a directory that's not a symlink + # Junctions appear as directories but resolve to different paths + return p.exists() and p.is_dir() and not p.is_symlink() and p.resolve() != p + except (OSError, RuntimeError): + # Handle cases where path operations fail + return False + + +def _safe_remove_path(p: Path) -> None: + """Remove file/dir/symlink/junction at p without following links.""" + if not os.path.lexists(str(p)): + return + with suppress(FileNotFoundError): + if p.is_symlink(): + p.unlink() + elif _is_windows_junction(p): + # Use rmdir for Windows junctions + p.rmdir() + elif p.is_dir(): + shutil.rmtree(p) + else: + p.unlink() + + +def _make_latest_windows(latest: Path, target: Path) -> None: + # Clean previous latest (symlink/junction/dir/file) + _safe_remove_path(latest) + + tmp = latest.with_name(latest.name + "_tmp") + _safe_remove_path(tmp) + + # Try creating a directory junction using native Python API + try: + # Use Path.symlink_to with target_is_directory=True for directory junction on Windows + # This creates a junction point that doesn't require admin privileges + tmp.symlink_to(target.resolve(), target_is_directory=True) + except (OSError, NotImplementedError): + # Try using Windows mklink command via subprocess + try: + import subprocess + + # Note: Using subprocess with mklink is safe here as we control + # the command and arguments. This is a standard Windows command. + if not _validate_windows_path(tmp) or not _validate_windows_path(target): + logger.warning( + "Warning: Unsafe characters detected in paths. Falling back to text pointer file for 'latest'.", + ) + msg = f"Unsafe path detected: {tmp} -> {target}" + raise ValueError(msg) + result = subprocess.run( # noqa: S603 + [ # noqa: S607 + "cmd", + "/c", + "mklink", + "/J", + str(tmp), + str(target.resolve()), + ], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode == 0 and tmp.exists(): + tmp.replace(latest) + return + except (subprocess.SubprocessError, OSError): + # Subprocess failed, fall through to fallback + pass + else: + # Only reached if symlink creation succeeded + tmp.replace(latest) + return + + # Final fallback: create a text file indicating the latest version + # This preserves the intended behavior without breaking the system + latest.mkdir(exist_ok=True) + version_file = latest / ".version_pointer" + version_file.write_text(str(target.resolve())) + def create_versioned_dir(root_dir: str | Path) -> Path: """Create a new version directory and update the ``latest`` symbolic link. @@ -100,11 +227,16 @@ def create_versioned_dir(root_dir: str | Path) -> Path: # Update the 'latest' symbolic link to point to the new version directory latest_link_path = root_dir / "latest" - if latest_link_path.is_symlink() or latest_link_path.exists(): - latest_link_path.unlink() - latest_link_path.symlink_to(new_version_dir, target_is_directory=True) + if sys.platform.startswith("win"): + _make_latest_windows(latest_link_path, new_version_dir) + else: + if latest_link_path.is_symlink() or latest_link_path.exists(): + latest_link_path.unlink() + latest_link_path.symlink_to(new_version_dir, target_is_directory=True) - return latest_link_path + # Return the versioned directory path, not the latest link + # This ensures training saves to the versioned directory directly + return new_version_dir def convert_to_snake_case(s: str) -> str: