Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 164 additions & 142 deletions libs/deepagents/deepagents/backends/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,119 +23,132 @@
WriteResult,
)

_GLOB_COMMAND_TEMPLATE = """python3 -c "
import glob
import os
import json
import base64

# Decode base64-encoded parameters
path = base64.b64decode('{path_b64}').decode('utf-8')
pattern = base64.b64decode('{pattern_b64}').decode('utf-8')

os.chdir(path)
matches = sorted(glob.glob(pattern, recursive=True))
for m in matches:
stat = os.stat(m)
result = {{
'path': m,
'size': stat.st_size,
'mtime': stat.st_mtime,
'is_dir': os.path.isdir(m)
}}
print(json.dumps(result))
" 2>/dev/null"""

_WRITE_COMMAND_TEMPLATE = """python3 -c "
import os
import sys
import base64

file_path = '{file_path}'

# Check if file already exists (atomic with write)
if os.path.exists(file_path):
print(f'Error: File \\'{file_path}\\' already exists', file=sys.stderr)
sys.exit(1)

# Create parent directory if needed
parent_dir = os.path.dirname(file_path) or '.'
os.makedirs(parent_dir, exist_ok=True)

# Decode and write content
content = base64.b64decode('{content_b64}').decode('utf-8')
with open(file_path, 'w') as f:
f.write(content)
" 2>&1"""

_EDIT_COMMAND_TEMPLATE = """python3 -c "
import sys
import base64

# Read file content
with open('{file_path}', 'r') as f:
text = f.read()

# Decode base64-encoded strings
old = base64.b64decode('{old_b64}').decode('utf-8')
new = base64.b64decode('{new_b64}').decode('utf-8')

# Count occurrences
count = text.count(old)

# Exit with error codes if issues found
if count == 0:
sys.exit(1) # String not found
elif count > 1 and not {replace_all}:
sys.exit(2) # Multiple occurrences without replace_all

# Perform replacement
if {replace_all}:
result = text.replace(old, new)
else:
result = text.replace(old, new, 1)

# Write back to file
with open('{file_path}', 'w') as f:
f.write(result)

print(count)
" 2>&1"""

_READ_COMMAND_TEMPLATE = """python3 -c "
import os
import sys
_GLOB_COMMAND_TEMPLATE = """
# Decode base64 parameters using shell tools only
path=$(echo '{path_b64}' | base64 -d)
pattern=$(echo '{pattern_b64}' | base64 -d)

cd "$path" 2>/dev/null || exit 1

# Translate glob patterns to find expressions
# Handle common glob patterns: **/* for recursive, * for wildcard
case "$pattern" in
**/*)
# Recursive pattern like **/*.py or src/**/*.js
# Extract part after **/ and use -name for basename matching
basename_part="${{pattern##**/}}"
prefix_part="${{pattern%%/**}}"
if [ -z "$prefix_part" ]; then
# Pattern like **/*.py - search everywhere
find . -name "$basename_part" -printf '%P\\t%y\\n' 2>/dev/null
else
# Pattern like src/**/*.py - search under prefix
find "./$prefix_part" -name "$basename_part" -printf '%P\\t%y\\n' 2>/dev/null
fi
;;
*)
# Non-recursive pattern - use -path for exact matching
find . -path "./$pattern" -printf '%P\\t%y\\n' 2>/dev/null
;;
esac | while IFS=$'\\t' read -r fpath ftype; do
# Skip empty lines
[ -z "$fpath" ] && continue

# Determine if directory
is_dir=false
[ "$ftype" = "d" ] && is_dir=true

# Escape special chars for JSON string (backslash and quote)
escaped=$(printf '%s' "$fpath" | sed 's/\\\\/\\\\\\\\/g; s/"/\\\\"/g')

# Output JSON format matching the protocol
printf '{{"path":"%s","is_dir":%s}}\\n' "$escaped" "$is_dir"
done
"""

file_path = '{file_path}'
offset = {offset}
limit = {limit}
_WRITE_COMMAND_TEMPLATE = """
if [ -e {file_path} ]; then
echo "Error: File already exists" >&2
exit 1
fi
parent_dir=$(dirname {file_path})
mkdir -p "$parent_dir" 2>/dev/null
echo '{content_b64}' | base64 -d > {file_path}
"""

# Check if file exists
if not os.path.isfile(file_path):
print('Error: File not found')
sys.exit(1)
_EDIT_COMMAND_TEMPLATE = """
if [ ! -f {file_path} ]; then
exit 3
fi

old=$(echo '{old_b64}' | base64 -d)
new=$(echo '{new_b64}' | base64 -d)

# Use awk for literal string replacement that handles multiline correctly
awk -v old="$old" -v new="$new" -v replace_all="{replace_all_str}" '
BEGIN {{
RS = "^$" # Read entire file as one record
ORS = "" # No extra newline on output
}}
{{
content = $0
count = 0

# Count occurrences
temp = content
while ((pos = index(temp, old)) > 0) {{
count++
temp = substr(temp, pos + length(old))
}}

# Check if file is empty
if os.path.getsize(file_path) == 0:
print('System reminder: File exists but has empty contents')
sys.exit(0)
# Check error conditions
if (count == 0) {{
exit 1 # String not found
}}
if (count > 1 && replace_all == "false") {{
exit 2 # Multiple occurrences without replace_all
}}

# Read file with offset and limit
with open(file_path, 'r') as f:
lines = f.readlines()
# Perform replacement
if (replace_all == "true") {{
# Replace all occurrences
result = ""
remaining = content
while ((pos = index(remaining, old)) > 0) {{
result = result substr(remaining, 1, pos - 1) new
remaining = substr(remaining, pos + length(old))
}}
result = result remaining
}} else {{
# Replace first occurrence only
pos = index(content, old)
result = substr(content, 1, pos - 1) new substr(content, pos + length(old))
}}

# Apply offset and limit
start_idx = offset
end_idx = offset + limit
selected_lines = lines[start_idx:end_idx]
# Write result and output count to stderr (so we can capture it)
print result > {file_path}
print count > "/dev/stderr"
}}
' {file_path} 2>&1 | tail -1
"""

# Format with line numbers (1-indexed, starting from offset + 1)
for i, line in enumerate(selected_lines):
line_num = offset + i + 1
# Remove trailing newline for formatting, then add it back
line_content = line.rstrip('\\n')
print(f'{{line_num:6d}}\\t{{line_content}}')
" 2>&1"""
_READ_COMMAND_TEMPLATE = """
if [ ! -f {file_path} ]; then
echo "Error: File not found"
exit 1
fi
if [ ! -s {file_path} ]; then
echo "System reminder: File exists but has empty contents"
exit 0
fi
# Use awk to add line numbers and handle offset/limit
awk -v offset={offset} -v limit={limit} '
NR > offset && NR <= offset + limit {{
printf "%6d\\t%s\\n", NR, $0
}}
NR > offset + limit {{ exit }}
' {file_path}
"""


class BaseSandbox(SandboxBackendProtocol, ABC):
Expand All @@ -161,38 +174,38 @@ def execute(
...

def ls_info(self, path: str) -> list[FileInfo]:
    """Structured listing of a directory's direct children via shell.

    Returns one FileInfo per entry (including dotfiles, matching the
    previous os.scandir-based behavior), or an empty list when the path
    is missing, is not a directory, or the listing command fails.
    """
    # Escape path for safe shell execution
    safe_path = shlex.quote(path)
    # Emit tab-delimited "name<TAB>0|1" lines; cd first so no per-entry
    # basename subshell is needed.
    # NOTE(review): a filename containing a tab or newline will be
    # mis-parsed below — assumed acceptable for sandbox listings.
    # NOTE(review): [ -d ] follows symlinks, unlike the old
    # scandir(follow_symlinks=False); a symlink to a directory now
    # reports is_dir=True — confirm this is intended.
    cmd = f"""
if [ ! -d {safe_path} ]; then
    exit 1
fi
cd {safe_path} || exit 1
for entry in * .*; do
    case "$entry" in
        .|..) continue ;;
    esac
    # Keep broken symlinks (-L) but drop the literal '*' produced by an
    # empty-directory glob (neither -e nor -L).
    if [ -e "$entry" ] || [ -L "$entry" ]; then
        if [ -d "$entry" ]; then
            printf '%s\\t1\\n' "$entry"
        else
            printf '%s\\t0\\n' "$entry"
        fi
    fi
done
"""

    result = self.execute(cmd)

    if result.exit_code != 0:
        return []

    file_infos: list[FileInfo] = []
    for line in result.output.strip().split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        if len(parts) == 2:
            file_infos.append({"path": parts[0], "is_dir": parts[1] == "1"})

    return file_infos

Expand All @@ -203,8 +216,10 @@ def read(
limit: int = 2000,
) -> str:
"""Read file content with line numbers using a single shell command."""
# Escape file path for safe shell execution
safe_path = shlex.quote(file_path)
# Use template for reading file with offset and limit
cmd = _READ_COMMAND_TEMPLATE.format(file_path=file_path, offset=offset, limit=limit)
cmd = _READ_COMMAND_TEMPLATE.format(file_path=safe_path, offset=offset, limit=limit)
result = self.execute(cmd)

output = result.output.rstrip()
Expand All @@ -223,9 +238,11 @@ def write(
"""Create a new file. Returns WriteResult; error populated on failure."""
# Encode content as base64 to avoid any escaping issues
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
# Escape file path for safe shell execution
safe_path = shlex.quote(file_path)

# Single atomic check + write command
cmd = _WRITE_COMMAND_TEMPLATE.format(file_path=file_path, content_b64=content_b64)
cmd = _WRITE_COMMAND_TEMPLATE.format(file_path=safe_path, content_b64=content_b64)
result = self.execute(cmd)

# Check for errors (exit code or error message in output)
Expand All @@ -247,9 +264,12 @@ def edit(
# Encode strings as base64 to avoid any escaping issues
old_b64 = base64.b64encode(old_string.encode("utf-8")).decode("ascii")
new_b64 = base64.b64encode(new_string.encode("utf-8")).decode("ascii")
replace_all_str = "true" if replace_all else "false"
# Escape file path for safe shell execution
safe_path = shlex.quote(file_path)

# Use template for string replacement
cmd = _EDIT_COMMAND_TEMPLATE.format(file_path=file_path, old_b64=old_b64, new_b64=new_b64, replace_all=replace_all)
cmd = _EDIT_COMMAND_TEMPLATE.format(file_path=safe_path, old_b64=old_b64, new_b64=new_b64, replace_all_str=replace_all_str)
result = self.execute(cmd)

exit_code = result.exit_code
Expand All @@ -273,20 +293,22 @@ def grep_raw(
glob: str | None = None,
) -> list[GrepMatch] | str:
"""Structured search results or error string for invalid input."""
search_path = shlex.quote(path or ".")
search_path = path or "."

# Build grep command to get structured output
grep_opts = "-rHnF" # recursive, with filename, with line number, fixed-strings (literal)
# Use -E for extended regex to support patterns like test[0-9]+
grep_opts = "-rHnE" # recursive, with filename, with line number, extended regex

# Add glob pattern if specified
# Add glob pattern if specified (escape for safe shell execution)
glob_pattern = ""
if glob:
glob_pattern = f"--include='{glob}'"
glob_pattern = f"--include={shlex.quote(glob)}"

# Escape pattern for shell
pattern_escaped = shlex.quote(pattern)
# Escape pattern and path for safe shell execution
safe_pattern = shlex.quote(pattern)
safe_path = shlex.quote(search_path)

cmd = f"grep {grep_opts} {glob_pattern} -e {pattern_escaped} {search_path} 2>/dev/null || true"
cmd = f"grep {grep_opts} {glob_pattern} -e {safe_pattern} {safe_path} 2>/dev/null || true"
result = self.execute(cmd)

output = result.output.rstrip()
Expand Down