# Shell command templates rendered with ``str.format`` and run in the sandbox.
#
# Every template is a raw string, so backslashes reach the shell unchanged, and
# literal shell/awk braces are doubled for ``str.format``. The ``file_path``
# placeholder must already be shell-quoted by the caller (``shlex.quote``); the
# ``*_b64`` placeholders are base64 text and therefore safe inside single quotes.
# The scripts stick to POSIX sh constructs (no ``$'...'`` quoting and no
# ``find -printf``) so they do not depend on bash or GNU find.

# Placeholders: ``path_b64`` (directory to search), ``pattern_b64`` (glob).
# Prints one JSON object per match: {"path": <relative path>, "is_dir": <bool>}.
# Exits 1 when the directory cannot be entered.
_GLOB_COMMAND_TEMPLATE = r"""
path=$(echo '{path_b64}' | base64 -d)
pattern=$(echo '{pattern_b64}' | base64 -d)

cd "$path" 2>/dev/null || exit 1

# Normalise "./x" and "x/" so the component count below is accurate
pattern=${{pattern#./}}
pattern=${{pattern%/}}

# Dot entries are only reported when the pattern names one explicitly,
# which is how shell and Python globbing treat hidden files
show_hidden=false
case "/$pattern" in
    */.*) show_hidden=true ;;
esac

case "$pattern" in
    *'**/'*)
        # Recursive pattern such as **/*.py or src/**/*.py.
        # prefix is the text before the first **/ and rest the text after the
        # last one. The two -path tests cover zero and one-or-more directories
        # between them, and -name pins the final path component.
        # Known limit: a * inside rest may still span a directory separator.
        prefix=${{pattern%%'**/'*}}
        rest=${{pattern##*'**/'}}
        find . \( -path "./${{prefix}}${{rest}}" -o -path "./${{prefix}}*/${{rest}}" \) -name "${{rest##*/}}" 2>/dev/null
        ;;
    '**'|*'/**')
        # Trailing ** selects everything below the prefix
        prefix=${{pattern%'**'}}
        find . -mindepth 1 -path "./${{prefix}}*" 2>/dev/null
        ;;
    *)
        # Non-recursive pattern. find -path lets * cross directory separators,
        # so the depth is fixed to the number of pattern components to keep
        # every wildcard inside a single component.
        depth=$(printf '%s' "$pattern" | tr -cd '/' | wc -c)
        depth=$((depth + 1))
        find . -mindepth "$depth" -maxdepth "$depth" -path "./$pattern" 2>/dev/null
        ;;
esac | LC_ALL=C sort | while IFS= read -r fpath; do
    fpath=${{fpath#./}}
    [ -n "$fpath" ] || continue

    case "/$fpath" in
        */.*) [ "$show_hidden" = true ] || continue ;;
    esac

    if [ -d "$fpath" ]; then
        is_dir=true
    else
        is_dir=false
    fi

    # Escape backslash and double quote for the JSON string
    escaped=$(printf '%s' "$fpath" | sed 's/\\/\\\\/g; s/"/\\"/g')

    printf '{{"path":"%s","is_dir":%s}}\n' "$escaped" "$is_dir"
done
"""

# Placeholders: ``file_path`` (shell-quoted), ``content_b64``.
# Exits 1 with an error on stderr when the target already exists; otherwise
# creates the parent directory and writes the decoded content.
_WRITE_COMMAND_TEMPLATE = r"""
if [ -e {file_path} ]; then
    printf "Error: File '%s' already exists\n" {file_path} >&2
    exit 1
fi
parent_dir=$(dirname {file_path})
mkdir -p "$parent_dir" 2>/dev/null
echo '{content_b64}' | base64 -d > {file_path}
"""

# Placeholders: ``file_path`` (shell-quoted), ``old_b64``, ``new_b64``,
# ``replace_all_str`` ("true" or "false").
# Exit status: 0 success (the replacement count is printed on stdout),
# 1 search string empty or not found, 2 several matches without replace_all,
# 3 file missing, 4 temporary file could not be created or copied back.
_EDIT_COMMAND_TEMPLATE = r"""
if [ ! -f {file_path} ]; then
    exit 3
fi

# The trailing x sentinel protects trailing newlines, which command
# substitution would otherwise strip from the decoded strings
old=$(echo '{old_b64}' | base64 -d; printf x)
new=$(echo '{new_b64}' | base64 -d; printf x)

# Remember whether the file ends with a newline so it can be restored exactly
ends_nl=0
if [ -s {file_path} ] && [ -z "$(tail -c 1 {file_path})" ]; then
    ends_nl=1
fi

tmp=$(mktemp) || exit 4

# old and new travel through the environment because awk -v would expand
# backslash escapes in them. The result goes to a temporary file through a
# shell redirect, so the target path never appears inside the awk program,
# and awk is not piped, so its exit status is preserved.
DA_OLD="${{old%x}}" DA_NEW="${{new%x}}" awk -v replace_all="{replace_all_str}" -v ends_nl="$ends_nl" '
BEGIN {{
    old = ENVIRON["DA_OLD"]
    new = ENVIRON["DA_NEW"]
}}
{{
    # Rebuild the whole text; each record gets its newline back
    content = content $0 "\n"
}}
END {{
    # Drop the newline added after the last record when the file had none
    if (ends_nl != "1" && content != "") {{
        content = substr(content, 1, length(content) - 1)
    }}

    # An empty search string can never be located and must not loop forever
    if (old == "") {{
        exit 1
    }}

    # Count occurrences
    count = 0
    temp = content
    while ((pos = index(temp, old)) > 0) {{
        count++
        temp = substr(temp, pos + length(old))
    }}

    if (count == 0) {{
        exit 1
    }}
    if (count > 1 && replace_all == "false") {{
        exit 2
    }}

    if (replace_all == "true") {{
        result = ""
        remaining = content
        while ((pos = index(remaining, old)) > 0) {{
            result = result substr(remaining, 1, pos - 1) new
            remaining = substr(remaining, pos + length(old))
        }}
        result = result remaining
    }} else {{
        pos = index(content, old)
        result = substr(content, 1, pos - 1) new substr(content, pos + length(old))
    }}

    # Edited text goes to stdout (the temporary file), the count to stderr
    printf "%s", result
    printf "%d\n", count > "/dev/stderr"
}}
' {file_path} 2>&1 >"$tmp"
status=$?

if [ "$status" -eq 0 ]; then
    # Copy instead of move so the target keeps its inode and permissions
    cat "$tmp" > {file_path} || status=4
fi
rm -f "$tmp"
exit "$status"
"""

# Placeholders: ``file_path`` (shell-quoted), ``offset`` (lines to skip),
# ``limit`` (maximum lines to print). Prints lines as "<6-wide number>\t<text>".
_READ_COMMAND_TEMPLATE = r"""
if [ ! -f {file_path} ]; then
    echo "Error: File not found"
    exit 1
fi
if [ ! -s {file_path} ]; then
    echo "System reminder: File exists but has empty contents"
    exit 0
fi
# Use awk to add line numbers and handle offset/limit
awk -v offset={offset} -v limit={limit} '
    NR > offset && NR <= offset + limit {{
        printf "%6d\t%s\n", NR, $0
    }}
    NR > offset + limit {{ exit }}
' {file_path}
"""
def ls_info(self, path: str) -> list[FileInfo]:
    """List the direct children of ``path`` using only POSIX shell built-ins.

    The generated script prints one ``<name>\\t<flag>`` line per entry, where
    the flag is ``1`` for a real directory and ``0`` for anything else. Hidden
    entries (dotfiles) and dangling symlinks are included; ``.`` and ``..`` are
    skipped. A symlink that points at a directory is reported with
    ``is_dir=False`` because the link itself is not a directory.

    Args:
        path: Directory to list. It is shell-quoted before being embedded in
            the command.

    Returns:
        One ``{"path": <entry name>, "is_dir": <bool>}`` item per entry, or an
        empty list when the command exits non-zero (the script exits 1 when
        ``path`` is not a directory).
    """
    # Escape path for safe shell execution
    safe_path = shlex.quote(path)
    # Tab-delimited output: the name comes first and the flag last, so the
    # parser below can split on the final tab even if a name contains tabs.
    cmd = f"""
if [ ! -d {safe_path} ]; then
    exit 1
fi
for entry in {safe_path}/* {safe_path}/.*; do
    name=${{entry##*/}}
    case "$name" in
        .|..) continue ;;
    esac
    # An unmatched glob stays literal and names nothing; -L keeps dangling links
    if [ ! -e "$entry" ] && [ ! -L "$entry" ]; then
        continue
    fi
    if [ -d "$entry" ] && [ ! -L "$entry" ]; then
        printf '%s\\t1\\n' "$name"
    else
        printf '%s\\t0\\n' "$name"
    fi
done
"""

    result = self.execute(cmd)
    if result.exit_code != 0:
        return []

    file_infos: list[FileInfo] = []
    for raw_line in result.output.split("\n"):
        line = raw_line.rstrip("\r")
        # Split on the last tab only so the name is kept verbatim
        name, sep, flag = line.rpartition("\t")
        if not sep or not name or flag not in ("0", "1"):
            continue
        file_infos.append({"path": name, "is_dir": flag == "1"})

    return file_infos
Returns WriteResult; error populated on failure.""" # Encode content as base64 to avoid any escaping issues content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + # Escape file path for safe shell execution + safe_path = shlex.quote(file_path) # Single atomic check + write command - cmd = _WRITE_COMMAND_TEMPLATE.format(file_path=file_path, content_b64=content_b64) + cmd = _WRITE_COMMAND_TEMPLATE.format(file_path=safe_path, content_b64=content_b64) result = self.execute(cmd) # Check for errors (exit code or error message in output) @@ -247,9 +264,12 @@ def edit( # Encode strings as base64 to avoid any escaping issues old_b64 = base64.b64encode(old_string.encode("utf-8")).decode("ascii") new_b64 = base64.b64encode(new_string.encode("utf-8")).decode("ascii") + replace_all_str = "true" if replace_all else "false" + # Escape file path for safe shell execution + safe_path = shlex.quote(file_path) # Use template for string replacement - cmd = _EDIT_COMMAND_TEMPLATE.format(file_path=file_path, old_b64=old_b64, new_b64=new_b64, replace_all=replace_all) + cmd = _EDIT_COMMAND_TEMPLATE.format(file_path=safe_path, old_b64=old_b64, new_b64=new_b64, replace_all_str=replace_all_str) result = self.execute(cmd) exit_code = result.exit_code @@ -273,20 +293,22 @@ def grep_raw( glob: str | None = None, ) -> list[GrepMatch] | str: """Structured search results or error string for invalid input.""" - search_path = shlex.quote(path or ".") + search_path = path or "." 
# Build grep command to get structured output - grep_opts = "-rHnF" # recursive, with filename, with line number, fixed-strings (literal) + # Use -E for extended regex to support patterns like test[0-9]+ + grep_opts = "-rHnE" # recursive, with filename, with line number, extended regex - # Add glob pattern if specified + # Add glob pattern if specified (escape for safe shell execution) glob_pattern = "" if glob: - glob_pattern = f"--include='{glob}'" + glob_pattern = f"--include={shlex.quote(glob)}" - # Escape pattern for shell - pattern_escaped = shlex.quote(pattern) + # Escape pattern and path for safe shell execution + safe_pattern = shlex.quote(pattern) + safe_path = shlex.quote(search_path) - cmd = f"grep {grep_opts} {glob_pattern} -e {pattern_escaped} {search_path} 2>/dev/null || true" + cmd = f"grep {grep_opts} {glob_pattern} -e {safe_pattern} {safe_path} 2>/dev/null || true" result = self.execute(cmd) output = result.output.rstrip()