diff --git a/README.md b/README.md
index 3d08b2ff..761ef1f9 100644
--- a/README.md
+++ b/README.md
@@ -76,6 +76,9 @@ You can view the full traces of some samples here: [GAIA Benchmark Traces](https
1. Clone the repository
2. Run the following command
+
+
+
```
chmod +x start.sh
./start.sh
diff --git a/docker/sandbox/Dockerfile b/docker/sandbox/Dockerfile
index 41b94d21..d9997f20 100644
--- a/docker/sandbox/Dockerfile
+++ b/docker/sandbox/Dockerfile
@@ -14,11 +14,14 @@ RUN apt-get update && apt-get install -y \
unzip
COPY src/ii_agent/utils/tool_client /app/ii_client
+COPY ./ /app/
WORKDIR /app
RUN pip install -r ii_client/requirements.txt
+RUN pip install .
+
RUN curl -fsSL https://bun.sh/install | bash
RUN curl -fsSL https://code-server.dev/install.sh | sh
diff --git a/frontend/constants/models.tsx b/frontend/constants/models.tsx
index c7670c23..2f4569b8 100644
--- a/frontend/constants/models.tsx
+++ b/frontend/constants/models.tsx
@@ -136,4 +136,4 @@ export const PROVIDER_MODELS: { [key: string]: IModel[] } = {
provider: "openai",
},
],
-};
+};
\ No newline at end of file
diff --git a/src/ii_agent/agents/function_call.py b/src/ii_agent/agents/function_call.py
index 085512cd..b269d01b 100644
--- a/src/ii_agent/agents/function_call.py
+++ b/src/ii_agent/agents/function_call.py
@@ -38,7 +38,7 @@ class FunctionCallAgent(BaseAgent):
A general agent that can accomplish tasks and answer questions.
If you are faced with a task that involves more than a few steps, or if the task is complex, or if the instructions are very long,
-try breaking down the task into smaller steps. After call this tool to update or create a plan, use write_file or str_replace_tool to update the plan to todo.md
+try breaking down the task into smaller steps. After call this tool to update or create a plan, use Write or Edit tools to update the plan to todo.md
"""
input_schema = {
"type": "object",
diff --git a/src/ii_agent/core/config/llm_config.py b/src/ii_agent/core/config/llm_config.py
index e8777b73..7b1f8c2a 100644
--- a/src/ii_agent/core/config/llm_config.py
+++ b/src/ii_agent/core/config/llm_config.py
@@ -23,7 +23,7 @@ class LLMConfig(BaseModel):
model: str = Field(default=DEFAULT_MODEL)
api_key: SecretStr | None = Field(default=None)
base_url: str | None = Field(default=None)
- max_retries: int = Field(default=3)
+ max_retries: int = Field(default=5)
max_message_chars: int = Field(default=30_000)
temperature: float = Field(default=0.0)
vertex_region: str | None = Field(default=None)
diff --git a/src/ii_agent/llm/__init__.py b/src/ii_agent/llm/__init__.py
index 2bfaafb4..25a8bbbf 100644
--- a/src/ii_agent/llm/__init__.py
+++ b/src/ii_agent/llm/__init__.py
@@ -7,9 +7,7 @@
def get_client(config: LLMConfig) -> LLMClient:
"""Get a client for a given client name."""
if config.api_type == APITypes.ANTHROPIC:
- return AnthropicDirectClient(
- llm_config=config,
- )
+ return AnthropicDirectClient(llm_config=config)
elif config.api_type == APITypes.OPENAI:
return OpenAIDirectClient(llm_config=config)
elif config.api_type == APITypes.GEMINI:
diff --git a/src/ii_agent/llm/gemini.py b/src/ii_agent/llm/gemini.py
index 34ae93a9..c616c985 100644
--- a/src/ii_agent/llm/gemini.py
+++ b/src/ii_agent/llm/gemini.py
@@ -127,7 +127,6 @@ def generate(
config=types.GenerateContentConfig(
tools=tool_params,
system_instruction=system_prompt,
- temperature=temperature,
max_output_tokens=max_tokens,
tool_config={'function_calling_config': {'mode': mode}}
),
@@ -150,8 +149,17 @@ def generate(
raise e
internal_messages = []
- if response.text:
- internal_messages.append(TextResult(text=response.text))
+ text_parts = []
+ if response.candidates and len(response.candidates) > 0:
+ candidate = response.candidates[0]
+ if candidate.content and candidate.content.parts:
+ for part in candidate.content.parts:
+ if hasattr(part, 'text') and part.text:
+ text_parts.append(part.text)
+
+ if text_parts:
+ combined_text = ''.join(text_parts)
+ internal_messages.append(TextResult(text=combined_text))
if response.function_calls:
for fn_call in response.function_calls:
diff --git a/src/ii_agent/llm/openai.py b/src/ii_agent/llm/openai.py
index 124c697b..40fbe8f3 100644
--- a/src/ii_agent/llm/openai.py
+++ b/src/ii_agent/llm/openai.py
@@ -110,9 +110,8 @@ def generate(
role = "user"
elif str(type(internal_message)) == str(TextResult):
internal_message = cast(TextResult, internal_message)
- # For TextResult (assistant), content is handled differently by OpenAI API
- message_content_obj = {"type": "text", "text": internal_message.text}
- openai_message = {"role": "assistant", "content": [message_content_obj]}
+ # For TextResult (assistant), OpenAI expects content as a string for regular messages
+ openai_message = {"role": "assistant", "content": internal_message.text}
openai_messages.append(openai_message)
continue # Move to next message in outer loop
elif str(type(internal_message)) == str(ToolCall):
@@ -163,8 +162,8 @@ def generate(
final_text_for_user_message = f"{system_prompt}\n\n{current_message_text}"
system_prompt_applied = True # Mark as applied
- message_content_obj = {"type": "text", "text": final_text_for_user_message}
- openai_message = {"role": role, "content": [message_content_obj]}
+ # For regular text messages, OpenAI expects content as a string
+ openai_message = {"role": role, "content": final_text_for_user_message}
openai_messages.append(openai_message)
# If cot_model is True and system_prompt was provided but not applied (e.g., no user messages found, though unlikely for an agent)
@@ -173,7 +172,7 @@ def generate(
# Or, one might argue it's an error condition for COT if no user prompt exists.
# For now, let's log a warning and add it as a user message, as some COT models might expect user turn for instructions.
logger.warning("COT mode: System prompt provided but no initial user message to prepend to. Adding as a separate user message.")
- openai_messages.insert(0, {"role": "user", "content": [{"type": "text", "text": system_prompt}]})
+ openai_messages.insert(0, {"role": "user", "content": system_prompt})
# Turn tool_choice into OpenAI tool_choice format
if tool_choice is None:
@@ -234,8 +233,7 @@ def generate(
raise e
else:
print(f"Retrying OpenAI request: {retry + 1}/{self.max_retries}")
- # Sleep 8-12 seconds with jitter to avoid thundering herd.
- time.sleep(10 * random.uniform(0.8, 1.2))
+ time.sleep(30)
# Convert messages back to internal format
internal_messages = []
@@ -248,10 +246,10 @@ def generate(
content = openai_response_message.content
# Exactly one of tool_calls or content should be present
- if tool_calls and content:
- raise ValueError("Only one of tool_calls or content should be present")
- elif not tool_calls and not content:
- raise ValueError("Either tool_calls or content should be present")
+ #if tool_calls and content:
+ # raise ValueError("Only one of tool_calls or content should be present")
+ #elif not tool_calls and not content:
+ # raise ValueError("Either tool_calls or content should be present")
if tool_calls:
available_tool_names = {t.name for t in tools} # Get set of known tool names
@@ -297,10 +295,13 @@ def generate(
if not processed_tool_call:
logger.warning("No valid and available tool calls found after filtering.")
- elif content:
- internal_messages.append(TextResult(text=content))
+ elif content is not None:
+ # Handle empty content (including empty strings) from models that may return reasoning without content
+ internal_messages.append(TextResult(text=content if content else ""))
else:
- raise ValueError(f"Unknown message type: {openai_response_message}")
+ # Only raise error if we truly have no content and no tool calls
+ logger.warning(f"Response has no content or tool_calls: {openai_response_message}")
+ internal_messages.append(TextResult(text=""))
assert response.usage is not None
message_metadata = {
diff --git a/src/ii_agent/migrations/env.py b/src/ii_agent/migrations/env.py
index 3e3a1bd0..bf44eed5 100644
--- a/src/ii_agent/migrations/env.py
+++ b/src/ii_agent/migrations/env.py
@@ -81,4 +81,5 @@ def run_migrations_online() -> None:
if context.is_offline_mode():
run_migrations_offline()
else:
- run_migrations_online()
+ pass
+ #run_migrations_online()
diff --git a/src/ii_agent/prompts/system_prompt.py b/src/ii_agent/prompts/system_prompt.py
index d73b09f0..6fa277f8 100644
--- a/src/ii_agent/prompts/system_prompt.py
+++ b/src/ii_agent/prompts/system_prompt.py
@@ -157,12 +157,9 @@ def get_system_prompt(workspace_mode: WorkSpaceMode):
-- Create todo.md file as checklist based on task planning from planner module
-- Task planning takes precedence over todo.md, while todo.md contains more details
-- Update markers in todo.md via text replacement tool immediately after completing each item
-- Rebuild todo.md when task planning changes significantly
-- Must use todo.md to record and update progress for information gathering tasks
-- When all planned steps are complete, verify todo.md completion and remove skipped items
+- Use TodoRead and TodoWrite tools to manage your todo list
+- Frequently update the todo list to track the progress
+- When all planned steps are complete, verify todo list completion and remove skipped items
@@ -351,7 +348,7 @@ def get_system_prompt(workspace_mode: WorkSpaceMode):
- Events may originate from other system modules; only use explicitly provided tools
-Today is {datetime.now().strftime("%Y-%m-%d")}. The first step of a task is to use `message_user` tool to plan details of the task. Then regularly update the todo.md file to track the progress.
+Today is {datetime.now().strftime("%Y-%m-%d")}. The first step of a task is to use `message_user` tool to plan details of the task. Then regularly update the todo list to track the progress.
"""
diff --git a/src/ii_agent/tools/clients/filesystem_client.py b/src/ii_agent/tools/clients/filesystem_client.py
new file mode 100644
index 00000000..fe5a8766
--- /dev/null
+++ b/src/ii_agent/tools/clients/filesystem_client.py
@@ -0,0 +1,226 @@
+"""Client for file system operations that can work locally or remotely."""
+
+import logging
+from abc import ABC, abstractmethod
+from typing import Optional, Any, List, Dict
+import httpx
+
+from ii_agent.core.config.client_config import ClientConfig
+from ii_agent.core.storage.models.settings import Settings
+from ii_agent.utils.constants import WorkSpaceMode
+from ii_agent.utils.tool_client.manager import FileSystemResponse, FileSystemManager
+
+logger = logging.getLogger(__name__)
+
+
+class FileSystemClientBase(ABC):
+ """Abstract base class for file system clients."""
+
+ @abstractmethod
+ def read_file(
+ self, file_path: str, limit: Optional[int] = None, offset: Optional[int] = None
+ ) -> FileSystemResponse:
+ """Read file contents."""
+ pass
+
+ @abstractmethod
+ def edit_file(
+ self, file_path: str, old_string: str, new_string: str, replace_all: bool = False
+ ) -> FileSystemResponse:
+ """Edit file contents."""
+ pass
+
+ @abstractmethod
+ def write_file(self, file_path: str, content: str) -> FileSystemResponse:
+ """Write file contents."""
+ pass
+
+ @abstractmethod
+ def multi_edit(self, file_path: str, edits: List[Dict[str, Any]]) -> FileSystemResponse:
+ """Perform multiple edits on a file."""
+ pass
+
+ @abstractmethod
+ def ls(self, path: str, ignore: Optional[List[str]] = None) -> FileSystemResponse:
+ """List directory contents."""
+ pass
+
+ @abstractmethod
+ def glob(self, pattern: str, path: Optional[str] = None) -> FileSystemResponse:
+ """Search for files using glob patterns."""
+ pass
+
+ @abstractmethod
+ def grep(
+ self, pattern: str, path: Optional[str] = None, include: Optional[str] = None
+ ) -> FileSystemResponse:
+ """Search for content using regex patterns."""
+ pass
+
+
+class LocalFileSystemClient(FileSystemClientBase):
+ """Local implementation using FileSystemManager directly."""
+
+ def __init__(self, config: ClientConfig):
+ self.config = config
+ workspace_path = config.cwd or "/workspace"
+ self.manager = FileSystemManager(workspace_path=workspace_path)
+
+ def read_file(
+ self, file_path: str, limit: Optional[int] = None, offset: Optional[int] = None
+ ) -> FileSystemResponse:
+ return self.manager.read_file(file_path, limit, offset)
+
+ def edit_file(
+ self, file_path: str, old_string: str, new_string: str, replace_all: bool = False
+ ) -> FileSystemResponse:
+ return self.manager.edit_file(file_path, old_string, new_string, replace_all)
+
+ def write_file(self, file_path: str, content: str) -> FileSystemResponse:
+ return self.manager.write_file(file_path, content)
+
+ def multi_edit(self, file_path: str, edits: List[Dict[str, Any]]) -> FileSystemResponse:
+ return self.manager.multi_edit(file_path, edits)
+
+ def ls(self, path: str, ignore: Optional[List[str]] = None) -> FileSystemResponse:
+ return self.manager.ls(path, ignore)
+
+ def glob(self, pattern: str, path: Optional[str] = None) -> FileSystemResponse:
+ return self.manager.glob(pattern, path)
+
+ def grep(
+ self, pattern: str, path: Optional[str] = None, include: Optional[str] = None
+ ) -> FileSystemResponse:
+ return self.manager.grep(pattern, path, include)
+
+
+class RemoteFileSystemClient(FileSystemClientBase):
+ """Remote implementation using HTTP API calls."""
+
+ def __init__(self, config: ClientConfig):
+ self.config = config
+ if not config.server_url:
+ raise ValueError("server_url is required for remote mode")
+ self.server_url = config.server_url.rstrip("/")
+ self.timeout = config.timeout
+
+ def _make_request(self, endpoint: str, data: Dict[str, Any]) -> FileSystemResponse:
+ """Make an HTTP request to the remote server."""
+ try:
+ with httpx.Client(timeout=self.timeout) as client:
+ response = client.post(
+ f"{self.server_url}/api/filesystem/{endpoint}",
+ json=data,
+ headers={"Content-Type": "application/json"},
+ )
+ response.raise_for_status()
+ result = response.json()
+ return FileSystemResponse(
+ success=result.get("success", False),
+ file_content=result.get("file_content", ""),
+ )
+ except httpx.RequestError as e:
+ logger.error(f"Request error for {endpoint}: {e}")
+ return FileSystemResponse(
+ success=False, file_content=f"Request error: {str(e)}"
+ )
+ except httpx.HTTPStatusError as e:
+ logger.error(f"HTTP error for {endpoint}: {e}")
+ return FileSystemResponse(
+ success=False,
+ file_content=f"HTTP error {e.response.status_code}: {e.response.text}",
+ )
+ except Exception as e:
+ logger.error(f"Unexpected error for {endpoint}: {e}")
+ return FileSystemResponse(
+ success=False, file_content=f"Unexpected error: {str(e)}"
+ )
+
+ def read_file(
+ self, file_path: str, limit: Optional[int] = None, offset: Optional[int] = None
+ ) -> FileSystemResponse:
+ return self._make_request(
+ "read_file",
+ {"file_path": file_path, "limit": limit, "offset": offset},
+ )
+
+ def edit_file(
+ self, file_path: str, old_string: str, new_string: str, replace_all: bool = False
+ ) -> FileSystemResponse:
+ return self._make_request(
+ "edit_file",
+ {
+ "file_path": file_path,
+ "old_string": old_string,
+ "new_string": new_string,
+ "replace_all": replace_all,
+ },
+ )
+
+ def write_file(self, file_path: str, content: str) -> FileSystemResponse:
+ return self._make_request(
+ "write_file", {"file_path": file_path, "content": content}
+ )
+
+ def multi_edit(self, file_path: str, edits: List[Dict[str, Any]]) -> FileSystemResponse:
+ return self._make_request(
+ "multi_edit", {"file_path": file_path, "edits": edits}
+ )
+
+ def ls(self, path: str, ignore: Optional[List[str]] = None) -> FileSystemResponse:
+ return self._make_request("ls", {"path": path, "ignore": ignore})
+
+ def glob(self, pattern: str, path: Optional[str] = None) -> FileSystemResponse:
+ return self._make_request("glob", {"pattern": pattern, "path": path})
+
+ def grep(
+ self, pattern: str, path: Optional[str] = None, include: Optional[str] = None
+ ) -> FileSystemResponse:
+ return self._make_request(
+ "grep", {"pattern": pattern, "path": path, "include": include}
+ )
+
+
+class FileSystemClient:
+ """Factory class for creating the appropriate client based on configuration."""
+
+ def __init__(self, settings: Settings):
+ self.config = settings.client_config
+ if settings.sandbox_config.mode == WorkSpaceMode.LOCAL:
+ self._client = LocalFileSystemClient(self.config)
+ elif (
+ settings.sandbox_config.mode == WorkSpaceMode.DOCKER
+ or settings.sandbox_config.mode == WorkSpaceMode.E2B
+ ):
+ self._client = RemoteFileSystemClient(self.config)
+ else:
+ raise ValueError(
+ f"Unsupported mode: {settings.sandbox_config.mode}. Must be 'local', 'docker', or 'e2b'"
+ )
+
+ def read_file(
+ self, file_path: str, limit: Optional[int] = None, offset: Optional[int] = None
+ ) -> FileSystemResponse:
+ return self._client.read_file(file_path, limit, offset)
+
+ def edit_file(
+ self, file_path: str, old_string: str, new_string: str, replace_all: bool = False
+ ) -> FileSystemResponse:
+ return self._client.edit_file(file_path, old_string, new_string, replace_all)
+
+ def write_file(self, file_path: str, content: str) -> FileSystemResponse:
+ return self._client.write_file(file_path, content)
+
+ def multi_edit(self, file_path: str, edits: List[Dict[str, Any]]) -> FileSystemResponse:
+ return self._client.multi_edit(file_path, edits)
+
+ def ls(self, path: str, ignore: Optional[List[str]] = None) -> FileSystemResponse:
+ return self._client.ls(path, ignore)
+
+ def glob(self, pattern: str, path: Optional[str] = None) -> FileSystemResponse:
+ return self._client.glob(pattern, path)
+
+ def grep(
+ self, pattern: str, path: Optional[str] = None, include: Optional[str] = None
+ ) -> FileSystemResponse:
+ return self._client.grep(pattern, path, include)
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/__init__.py b/src/ii_agent/tools/file_system_tools/__init__.py
new file mode 100644
index 00000000..31a4deab
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/__init__.py
@@ -0,0 +1,23 @@
+"""Internal file system tools for ii-agent."""
+
+from .file_read_tool import FileReadTool
+from .file_edit_tool import FileEditTool
+from .file_write_tool import FileWriteTool
+from .multi_edit_tool import MultiEditTool
+from .ls_tool import LSTool
+from .glob_tool import GlobTool
+from .grep_tool import GrepTool
+from .base import BaseFileSystemTool, FileSystemValidationError
+
+__all__ = [
+ 'FileReadTool',
+ 'FileEditTool',
+ 'FileWriteTool',
+ 'MultiEditTool',
+ 'LSTool',
+ 'GlobTool',
+ 'GrepTool',
+ 'BaseFileSystemTool',
+ 'FileSystemValidationError',
+ 'WorkspaceManager',
+]
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/base.py b/src/ii_agent/tools/file_system_tools/base.py
new file mode 100644
index 00000000..8c4d0b32
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/base.py
@@ -0,0 +1,70 @@
+import os
+
+
+class WorkspaceManager:
+ """Simple workspace manager for file system tools."""
+
+ def __init__(self, workspace_path: str):
+ self.workspace_path = os.path.abspath(workspace_path)
+
+ def get_workspace_path(self) -> str:
+ return self.workspace_path
+
+ def validate_boundary(self, path: str) -> bool:
+ """Check if path is within workspace boundary."""
+ abs_path = os.path.abspath(path)
+ return abs_path.startswith(self.workspace_path)
+
+
+class FileSystemValidationError(Exception):
+ """Custom exception for file system validation errors."""
+ pass
+
+
+class BaseFileSystemTool:
+ def __init__(self, workspace_manager: WorkspaceManager):
+ self.workspace_manager = workspace_manager
+
+ def validate_path(self, path: str) -> None:
+ """Validate that path is absolute and within workspace boundary.
+
+ Raises:
+ FileSystemValidationError
+ """
+ if not path.strip():
+ raise FileSystemValidationError("Path cannot be empty")
+
+ if not os.path.isabs(path):
+ raise FileSystemValidationError(f"Path `{path}` is not absolute")
+
+ workspace_path = self.workspace_manager.get_workspace_path()
+ if not self.workspace_manager.validate_boundary(path):
+ raise FileSystemValidationError(f"Path `{path}` is not within workspace boundary `{workspace_path}`")
+
+ def validate_existing_file_path(self, file_path: str) -> None:
+ """Validate that file_path exists and is a file.
+
+ Raises:
+ FileSystemValidationError
+ """
+ self.validate_path(file_path)
+
+ if not os.path.exists(file_path):
+ raise FileSystemValidationError(f"File `{file_path}` does not exist")
+
+ if not os.path.isfile(file_path):
+ raise FileSystemValidationError(f"Path `{file_path}` exists but is not a file")
+
+ def validate_existing_directory_path(self, directory_path: str) -> None:
+ """Validate that directory_path exists and is a directory.
+
+ Raises:
+ FileSystemValidationError
+ """
+ self.validate_path(directory_path)
+
+ if not os.path.exists(directory_path):
+ raise FileSystemValidationError(f"Directory `{directory_path}` does not exist")
+
+ if not os.path.isdir(directory_path):
+ raise FileSystemValidationError(f"Path `{directory_path}` exists but is not a directory")
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/file_edit_tool.py b/src/ii_agent/tools/file_system_tools/file_edit_tool.py
new file mode 100644
index 00000000..b138436a
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/file_edit_tool.py
@@ -0,0 +1,83 @@
+"""File editing tool for making targeted edits to files."""
+
+from pathlib import Path
+from typing import Annotated
+from pydantic import Field
+from .base import BaseFileSystemTool, FileSystemValidationError
+from ii_agent.utils.file_system_workspace import FileSystemWorkspace
+
+DESCRIPTION = """Performs exact string replacements in files.
+
+Usage:
+- You must use your `Read` tool at least once in the conversation before editing. This tool will error if you attempt an edit without reading the file.
+- When editing text from Read tool output, ensure you preserve the exact indentation (tabs/spaces) as it appears AFTER the line number prefix. The line number prefix format is: spaces + line number + tab. Everything after that tab is the actual file content to match. Never include any part of the line number prefix in the old_string or new_string.
+- ALWAYS prefer editing existing files in the codebase. NEVER write new files unless explicitly required.
+- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked.
+- The edit will FAIL if `old_string` is not unique in the file. Either provide a larger string with more surrounding context to make it unique or use `replace_all` to change every instance of `old_string`.
+- Use `replace_all` for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance."""
+
+
+class FileEditToolError(Exception):
+ """Custom exception for file edit tool errors."""
+ pass
+
+def perform_replacement(content: str, old_string: str, new_string: str, replace_all: bool) -> tuple[str, int]:
+ """Perform string replacement. Returns (new_content, occurrences)."""
+
+ # Count occurrences
+ occurrences = content.count(old_string)
+
+ if occurrences == 0:
+ raise FileEditToolError("String to replace not found in file. Make sure the old_string exactly matches the file content, including whitespace and indentation")
+
+ # For single replacement, ensure uniqueness
+ if not replace_all and occurrences > 1:
+ raise FileEditToolError(f"Found {occurrences} occurrences of old_string. Either provide more context to make it unique, or use replace_all=True to replace all occurrences")
+
+ new_content = content.replace(old_string, new_string)
+
+ return new_content, occurrences
+
+
+class FileEditTool(BaseFileSystemTool):
+ """Tool for making targeted string replacements in files."""
+
+ name = "Edit"
+ description = DESCRIPTION
+
+ def __init__(self, workspace_manager: FileSystemWorkspace):
+ super().__init__(workspace_manager)
+
+ def run_impl(
+ self,
+ file_path: Annotated[str, Field(description="The absolute path to the file to modify")],
+ old_string: Annotated[str, Field(description="The text to replace")],
+ new_string: Annotated[str, Field(description="The text to replace it with (must be different from old_string)")],
+ replace_all: Annotated[bool, Field(description="Replace all occurences of old_string (default false)", default=False)],
+ ) -> str:
+ """Execute the file edit operation."""
+
+ # Validate parameters
+ if old_string == new_string:
+ return "ERROR: old_string and new_string cannot be the same"
+
+ # Validate file path
+ try:
+ self.validate_existing_file_path(file_path)
+
+ path = Path(file_path).resolve()
+
+ # Read current file content
+ current_content = path.read_text(encoding='utf-8')
+
+ # Perform the replacement
+ new_content, occurrences = perform_replacement(
+ current_content, old_string, new_string, replace_all
+ )
+ # Write the new content
+ path.write_text(new_content, encoding='utf-8')
+
+ return f"Modified file `{path}` - made {occurrences} replacement(s). Review the changes and make sure they are as expected. Edit the file again if necessary."
+
+ except (FileSystemValidationError, FileEditToolError) as e:
+ return f"ERROR: {e}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/file_read_tool.py b/src/ii_agent/tools/file_system_tools/file_read_tool.py
new file mode 100644
index 00000000..a0c636e1
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/file_read_tool.py
@@ -0,0 +1,215 @@
+"""File reading tool for reading file contents."""
+
+import mimetypes
+import pymupdf
+
+from pathlib import Path
+from typing import Annotated, Optional
+from pydantic import Field
+from .base import BaseFileSystemTool, FileSystemValidationError
+from ii_agent.utils.file_system_workspace import FileSystemWorkspace
+from .utils import encode_image
+
+
+MAX_FILE_READ_LINES = 2000
+MAX_LINE_LENGTH = 2000
+DESCRIPTION = f"""Reads and returns the content of a specified file from the local filesystem. Handles text files, images (PNG, JPG, GIF, WEBP, SVG, BMP), and PDF files.
+
+Usage:
+- The file_path parameter must be an absolute path, not a relative path
+- For text files and PDFs: reads up to {MAX_FILE_READ_LINES} lines by default with optional offset/limit parameters
+- For images: returns base64-encoded content with MIME type information
+- For PDFs: extracts and returns readable text content (falls back to base64 if text extraction fails)
+- Any lines longer than {MAX_LINE_LENGTH} characters will be truncated
+- Results are returned using cat -n format, with line numbers starting at 1
+- If you read a file that exists but has empty contents you will receive a system reminder warning in place of file contents."""
+
+
+
+def _is_binary_file(file_path: Path) -> bool:
+ """Determine if a file is binary by checking its content."""
+ try:
+ with open(file_path, 'rb') as f:
+ chunk = f.read(4096) # Read first 4KB
+ if not chunk:
+ return False # Empty file is not binary
+
+ # Check for null bytes (strong binary indicator)
+ if b'\x00' in chunk:
+ return True
+
+ # Count non-printable characters
+ non_printable = sum(1 for byte in chunk
+ if byte < 9 or (13 < byte < 32))
+
+ # If >30% non-printable characters, consider it binary
+ return non_printable / len(chunk) > 0.3
+ except (OSError, IOError):
+ return False
+
+def _detect_file_type(file_path: Path) -> str:
+ """Detect the type of file based on extension and MIME type."""
+ suffix = file_path.suffix.lower()
+ mime_type, _ = mimetypes.guess_type(str(file_path))
+
+ # Check for images
+ image_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.svg', '.bmp'}
+ if suffix in image_extensions or (mime_type and mime_type.startswith('image/')):
+ return 'image'
+
+ # Check for PDF
+ if suffix == '.pdf' or mime_type == 'application/pdf':
+ return 'pdf'
+
+ # Check for known binary extensions
+ binary_extensions = {
+ '.zip', '.tar', '.gz', '.exe', '.dll', '.so', '.class', '.jar', '.war',
+ '.7z', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.odt', '.ods',
+ '.odp', '.bin', '.dat', '.obj', '.o', '.a', '.lib', '.wasm', '.pyc', '.pyo'
+ }
+ if suffix in binary_extensions:
+ return 'binary'
+
+ # Check if file is binary by content
+ if _is_binary_file(file_path):
+ return 'binary'
+
+ return 'text'
+
+def _read_pdf_file(path: Path):
+ """Read a PDF file and return the content."""
+ doc = pymupdf.open(path)
+ text = ""
+ for page_num in range(len(doc)):
+ page = doc.load_page(page_num)
+ text += page.get_text("text")
+ doc.close()
+
+ if text == "":
+ return "[PDF file is empty or no readable text could be extracted]"
+
+ return text
+
+def _read_image_file(path: Path):
+ """Read an image and return base64 encoded content."""
+
+ # Get MIME type
+ mime_type, _ = mimetypes.guess_type(str(path))
+ if not mime_type:
+ mime_type = "image/png" # Default to PNG if type cannot be determined
+
+ # Encode to base64
+ base64_image = encode_image(str(path))
+
+ return {
+ "type": "base64",
+ "media_type": mime_type,
+ "data": base64_image,
+ }
+
+def _truncate_text_content(content: str, offset: Optional[int] = None, limit: Optional[int] = None):
+ """Truncate text content with optional line range."""
+ lines = content.split('\n')
+
+ # Remove trailing newlines from each line for processing
+ lines = [line.rstrip('\n\r') for line in lines]
+ original_line_count = len(lines)
+
+ # Handle empty file
+ if original_line_count == 0:
+ return '[Empty file]'
+
+ # Apply offset and limit
+ start_line = offset - 1 if offset is not None else 0 # offset starts at 1, need to subtract 1
+ effective_limit = limit if limit is not None else MAX_FILE_READ_LINES
+ end_line = min(start_line + effective_limit, original_line_count)
+
+ # Ensure we don't go beyond array bounds
+ actual_start = min(start_line, original_line_count)
+ selected_lines = lines[actual_start:end_line]
+
+ # Truncate long lines and format with line numbers
+ lines_truncated_in_length = False
+ formatted_lines = []
+
+ for i, line in enumerate(selected_lines):
+ line_number = actual_start + i + 1 # 1-based line numbers
+
+ if len(line) > MAX_LINE_LENGTH:
+ lines_truncated_in_length = True
+ line = line[:MAX_LINE_LENGTH] + '... [truncated]'
+
+ formatted_lines.append(f"{line_number:6d}\t{line}")
+
+ # Check if content was truncated
+ content_range_truncated = end_line < original_line_count
+
+ # Build content with headers if truncated
+ content_parts = []
+ if content_range_truncated:
+ content_parts.append(
+ f"[File content truncated: showing lines {actual_start + 1}-{end_line} "
+ f"of {original_line_count} total lines. Use offset/limit parameters to view more.]"
+ )
+ elif lines_truncated_in_length:
+ content_parts.append(
+ f"[File content partially truncated: some lines exceeded maximum "
+ f"length of {MAX_LINE_LENGTH} characters.]"
+ )
+
+ content_parts.extend(formatted_lines)
+ truncated_content = '\n'.join(content_parts)
+
+ return truncated_content
+
+
+class FileReadTool(BaseFileSystemTool):
+ """Tool for reading file contents with optional line range specification."""
+
+ name = "Read"
+ description = DESCRIPTION
+
+ def __init__(self, workspace_manager: FileSystemWorkspace):
+ super().__init__(workspace_manager)
+
+ def run_impl(
+ self,
+ file_path: Annotated[str, Field(description="The absolute path to the file to read")],
+ limit: Annotated[Optional[int], Field(description="The number of lines to read. Only provide if the file is too large to read at once.")] = None,
+ offset: Annotated[Optional[int], Field(description="The line number to start reading from. Only provide if the file is too large to read at once")] = None,
+ ):
+ """Implementation of the file reading functionality."""
+
+ # Validate parameters
+ if offset is not None and offset < 0:
+ return "ERROR: Offset must be a non-negative number"
+
+ if limit is not None and limit <= 0:
+ return "ERROR: Limit must be a positive number"
+
+ try:
+ self.validate_existing_file_path(file_path)
+
+ path = Path(file_path).resolve()
+
+ # Detect file type
+ file_type = _detect_file_type(path)
+ if file_type == 'binary':
+ return f"ERROR: Cannot display content of binary file: {path}"
+
+ elif file_type == 'text':
+ full_content = path.read_text(encoding='utf-8')
+ return _truncate_text_content(full_content, offset, limit)
+
+ elif file_type == 'pdf':
+ full_content = _read_pdf_file(path)
+ return _truncate_text_content(full_content, offset, limit)
+
+ elif file_type == 'image':
+ return _read_image_file(path)
+
+ else:
+ return f"ERROR: Unsupported file type: {file_type}"
+
+ except (FileSystemValidationError) as e:
+ return f"ERROR: {e}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/file_write_tool.py b/src/ii_agent/tools/file_system_tools/file_write_tool.py
new file mode 100644
index 00000000..ecd0baf4
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/file_write_tool.py
@@ -0,0 +1,61 @@
+"""File writing tool for creating and overwriting files."""
+
+from pathlib import Path
+from typing import Annotated
+from pydantic import Field
+from .base import BaseFileSystemTool, FileSystemValidationError
+from ii_agent.utils.workspace_manager import WorkspaceManager
+
+
+DESCRIPTION = """Writes a file to the local filesystem.
+
+Usage:
+- This tool will overwrite the existing file if there is one at the provided path.
+- If this is an existing file, you MUST use the Read tool first to read the file's contents. This tool will fail if you did not read the file first.
+- ALWAYS prefer editing existing files in the codebase. NEVER write new files unless explicitly required.
+- NEVER proactively create documentation files (*.md) or README files. Only create documentation files if explicitly requested by the User.
+- Only use emojis if the user explicitly requests it. Avoid writing emojis to files unless asked."""
+
+
+class FileWriteTool(BaseFileSystemTool):
+ """Tool for writing content to files."""
+
+ name = "Write"
+ description = DESCRIPTION
+
+ def __init__(self, workspace_manager: WorkspaceManager):
+ super().__init__(workspace_manager)
+
+ def run_impl(
+ self,
+ file_path: Annotated[str, Field(description="The absolute path to the file to write")],
+ content: Annotated[str, Field(description="The content to write to the file")],
+ ) -> str:
+ """Execute the file write operation."""
+
+ try:
+ self.validate_path(file_path)
+
+ path = Path(file_path).resolve()
+
+ # Check if path exists and is a directory
+ if path.exists() and path.is_dir():
+ return f"ERROR: Path is a directory, not a file: {file_path}"
+
+ # Create parent directories if they don't exist
+ path.parent.mkdir(parents=True, exist_ok=True)
+
+ # Determine if this is a new file or overwriting existing
+ is_new_file = not path.exists()
+
+ # Write content to file
+ path.write_text(content, encoding='utf-8')
+
+ # Return success message
+ if is_new_file:
+ return f"Successfully created and wrote to new file: {file_path}"
+ else:
+ return f"Successfully overwrote file: {file_path}"
+
+ except (FileSystemValidationError) as e:
+ return f"ERROR: {e}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/glob_tool.py b/src/ii_agent/tools/file_system_tools/glob_tool.py
new file mode 100644
index 00000000..558f799c
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/glob_tool.py
@@ -0,0 +1,73 @@
+"""File pattern matching tool using glob patterns."""
+
+from pathlib import Path
+from typing import Annotated, Optional
+from pydantic import Field
+from .base import BaseFileSystemTool, FileSystemValidationError
+from ii_agent.utils.file_system_workspace import FileSystemWorkspace
+
+DESCRIPTION = """- Fast file pattern matching tool that works with any codebase size
+- Supports glob patterns like "**/*.js" or "src/**/*.ts"
+- Returns matching file paths sorted by modification time
+- Use this tool when you need to find files by name patterns
+- When you are doing an open ended search that may require multiple rounds of globbing and grepping, use the Agent tool instead
+- You have the capability to call multiple tools in a single response. It is always better to speculatively perform multiple searches as a batch that are potentially useful."""
+MAX_GLOB_RESULTS = 100
+
+
+class GlobTool(BaseFileSystemTool):
+ """Tool for finding files using glob patterns."""
+
+ name = "Glob"
+ description = DESCRIPTION
+
+ def __init__(self, workspace_manager: FileSystemWorkspace):
+ super().__init__(workspace_manager)
+
+ def run_impl(
+ self,
+ pattern: Annotated[str, Field(description="The glob pattern to match files against")],
+ path: Annotated[Optional[str], Field(description="The directory to search in. If not specified, the current working directory will be used. IMPORTANT: Omit this field to use the default directory. DO NOT enter \"undefined\" or \"null\" - simply omit it for the default behavior. Must be a valid directory path if provided.")] = None,
+ ) -> str:
+ """Execute the glob pattern matching operation."""
+
+ try:
+ if path is None:
+ search_dir = self.workspace_manager.get_workspace_path()
+ else:
+ self.validate_existing_directory_path(path)
+ search_dir = Path(path).resolve()
+
+ # Execute glob pattern using pathlib
+ matches = list(search_dir.glob(pattern))
+
+ # Filter out directories, keep only files
+ file_matches = [match for match in matches if match.is_file()]
+
+ # Sort by modification time (newest first)
+ file_matches.sort(key=lambda f: f.stat().st_mtime, reverse=True)
+
+ # If no matches found
+ if not file_matches:
+ return f"No files found matching pattern \"{pattern}\" within {search_dir}"
+
+ # Limit results and prepare file list
+ original_count = len(file_matches)
+ need_truncation = original_count > MAX_GLOB_RESULTS
+ if need_truncation:
+ file_matches = file_matches[:MAX_GLOB_RESULTS]
+
+ # Convert Path objects to relative strings for display
+ file_list_description = "\n".join(str(match.relative_to(search_dir)) for match in file_matches)
+
+ # Format result message
+ result_message = f"Found {len(file_matches)} file(s) matching \"{pattern}\" within {search_dir}"
+ result_message += f", sorted by modification time (newest first):\n{file_list_description}"
+
+ # Add truncation note if needed
+ if need_truncation:
+ result_message += f"\n\nNote: Results limited to {MAX_GLOB_RESULTS} files. Total matches found: {original_count}"
+
+ return result_message
+ except FileSystemValidationError as e:
+ return f"ERROR: {e}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/grep_tool.py b/src/ii_agent/tools/file_system_tools/grep_tool.py
new file mode 100644
index 00000000..f644b620
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/grep_tool.py
@@ -0,0 +1,185 @@
+"""Content search tool using regular expressions."""
+
+import subprocess
+import re
+
+from pathlib import Path
+from typing import Annotated, Dict, List, Optional
+from pydantic import Field
+from .base import BaseFileSystemTool, FileSystemValidationError
+from ii_agent.utils.file_system_workspace import FileSystemWorkspace
+
+
+DESCRIPTION = """\
+- Fast content search tool that works with any codebase size
+- Searches file contents using regular expressions
+- Supports full regex syntax (eg. "log.*Error", "function\\s+\\w+", etc.)
+- Filter files by pattern with the include parameter (eg. "*.js", "*.{ts,tsx}")
+- Returns file paths with at least one match sorted by modification time
+- Use this tool when you need to find files containing specific patterns
+- If you need to identify/count the number of matches within files, use the Bash tool with `rg` (ripgrep) directly. Do NOT use `grep`.
+- When you are doing an open ended search that may require multiple rounds of globbing and grepping, use the Agent tool instead
+"""
+MAX_GLOB_RESULTS = 100
+COMMAND_TIMEOUT = 30
+
+
+class GrepToolError(Exception):
+ """Custom exception for grep tool errors."""
+ pass
+
+def run_ripgrep(pattern: str, search_path: Path, include: Optional[str] = None) -> List[Dict[str, str]]:
+ """Execute ripgrep command and parse results."""
+ try:
+ # Build ripgrep command
+ cmd = ['rg', '--line-number', '--no-heading', '--color=never']
+
+ # Add include pattern if specified
+ if include:
+ cmd.extend(['--glob', include])
+
+ # Add the pattern and search path (convert Path to string for subprocess)
+ cmd.extend([pattern, str(search_path)])
+
+ # Execute ripgrep
+ result = subprocess.run(cmd,
+ capture_output=True,
+ text=True,
+ timeout=COMMAND_TIMEOUT)
+
+ if result.returncode == 1:
+ # No matches found
+ return []
+ elif result.returncode != 0:
+ # Error occurred
+ raise subprocess.CalledProcessError(result.returncode, cmd, result.stderr)
+
+ # Parse the output
+ matches = []
+ for line in result.stdout.strip().split('\n'):
+ if not line:
+ continue
+
+ # ripgrep output format: file:line_number:content
+ parts = line.split(':', 2)
+ matches.append({
+ 'file_path': parts[0],
+ 'line_number': parts[1],
+ 'content': parts[2]
+ })
+
+ return matches
+
+ except subprocess.TimeoutExpired:
+ raise GrepToolError("Search operation timed out")
+ except subprocess.CalledProcessError as e:
+ raise GrepToolError(f"Ripgrep command failed: {e.stderr.strip()}")
+
+class GrepTool(BaseFileSystemTool):
+ """Tool for searching file contents using regular expressions."""
+
+ name = "Grep"
+ description = DESCRIPTION
+
+ def __init__(self, workspace_manager: FileSystemWorkspace):
+ super().__init__(workspace_manager)
+
+ def _validate_regex_pattern(self, pattern: str) -> bool:
+ """Validate if the pattern is a valid regular expression."""
+ try:
+ re.compile(pattern)
+ return True
+ except re.error:
+ return False
+
+ def _format_results(self, matches: List[Dict[str, str]], pattern: str, search_path: Path, include: Optional[str] = None) -> str:
+ """Format search results for display."""
+ if not matches:
+ search_desc = f"pattern \"{pattern}\" in {search_path}"
+ if include:
+ search_desc += f" (filter: {include})"
+ return f"No matches found for {search_desc}"
+
+ # Group matches by file
+ files_with_matches = {}
+ for match in matches:
+ file_path = match['file_path']
+ if file_path not in files_with_matches:
+ files_with_matches[file_path] = []
+ files_with_matches[file_path].append(match)
+
+ # Sort files by name
+ sorted_files = sorted(files_with_matches.keys())
+
+ # Limit total results
+ total_matches = len(matches)
+ if total_matches > MAX_GLOB_RESULTS:
+ # Truncate results
+ truncated_matches = []
+ for file_path in sorted_files:
+ for match in files_with_matches[file_path]:
+ truncated_matches.append(match)
+ if len(truncated_matches) >= MAX_GLOB_RESULTS:
+ break
+ if len(truncated_matches) >= MAX_GLOB_RESULTS:
+ break
+ matches = truncated_matches
+
+ # Recalculate files with matches
+ files_with_matches = {}
+ for match in matches:
+ file_path = match['file_path']
+ if file_path not in files_with_matches:
+ files_with_matches[file_path] = []
+ files_with_matches[file_path].append(match)
+ sorted_files = sorted(files_with_matches.keys())
+
+ # Format output
+ result_lines = []
+ search_desc = f"pattern \"{pattern}\" in {search_path}"
+ if include:
+ search_desc += f" (filter: {include})"
+
+ result_lines.append(f"Found {len(matches)} matches for {search_desc}:")
+ result_lines.append("---")
+
+ for file_path in sorted_files:
+ result_lines.append(f"File: {file_path}")
+ for match in files_with_matches[file_path]:
+ line_content = match['content'].strip()
+ result_lines.append(f"L{match['line_number']}: {line_content}")
+ result_lines.append("---")
+
+ if total_matches > MAX_GLOB_RESULTS:
+ result_lines.append(f"Note: Results limited to {MAX_GLOB_RESULTS} matches. Total matches found: {total_matches}")
+
+ return '\n'.join(result_lines)
+
+ def run_impl(
+ self,
+ pattern: Annotated[str, Field(description="The regular expression pattern to search for in file contents")],
+ path: Annotated[Optional[str], Field(description="The directory to search in. Defaults to the current working directory.")] = None,
+ include: Annotated[Optional[str], Field(description="File pattern to include in the search (e.g. \"*.js\", \"*.{ts,tsx}\")")] = None,
+ ) -> str:
+ """
+ Search for pattern in files using ripgrep.
+ """
+
+ # Validate the regex pattern
+ if not self._validate_regex_pattern(pattern):
+ return f"ERROR: Invalid regular expression pattern: {pattern}"
+
+ try:
+ # Determine search directory using Path
+ if path is None:
+ search_dir = self.workspace_manager.get_workspace_path()
+ else:
+ self.validate_existing_directory_path(path)
+ search_dir = Path(path).resolve()
+
+ matches = run_ripgrep(pattern, search_dir, include)
+
+ # Format and return results
+ return self._format_results(matches, pattern, search_dir, include)
+ except (FileSystemValidationError, GrepToolError) as e:
+ return f"ERROR: {e}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/ls_tool.py b/src/ii_agent/tools/file_system_tools/ls_tool.py
new file mode 100644
index 00000000..f6c76d8d
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/ls_tool.py
@@ -0,0 +1,268 @@
+"""Directory listing tool for exploring file system structure."""
+
+import os
+import fnmatch
+
+from pathlib import Path
+from typing import Annotated, Optional, List, NamedTuple
+from pydantic import Field
+from .base import BaseFileSystemTool, FileSystemValidationError
+from ii_agent.utils.file_system_workspace import FileSystemWorkspace
+
+
+DESCRIPTION = """Lists files and directories in a given path. The path parameter must be an absolute path, not a relative path. You can optionally provide an array of glob patterns to ignore with the ignore parameter. You should generally prefer the Glob and Grep tools, if you know which directories to search."""
+
+# Constants
+MAX_FILES = 1000
+TRUNCATED_MESSAGE = f"There are more than {MAX_FILES} files in the repository. Use the LS tool (passing a specific path), Bash tool, and other tools to explore nested directories. The first {MAX_FILES} files and directories are included below:\n\n"
+
+
+class TreeNode(NamedTuple):
+ """Represents a node in the file tree."""
+ name: str
+ path: str
+ type: str # 'file' or 'directory'
+ children: Optional[List['TreeNode']] = None
+
+
+class LSTool(BaseFileSystemTool):
+ """Tool for listing files and directories."""
+
+ name = "LS"
+ description = DESCRIPTION
+
+ def __init__(self, workspace_manager: FileSystemWorkspace):
+ super().__init__(workspace_manager)
+
+ def _should_skip(self, path: Path, ignore_patterns: Optional[List[str]] = None) -> bool:
+ """
+ Determine if a path should be skipped based on filtering rules.
+
+ Args:
+ path: The file or directory Path to check
+ ignore_patterns: Optional list of glob patterns to ignore
+
+ Returns:
+ True if the path should be skipped, False otherwise
+ """
+ # Skip dotfiles and directories (except current directory ".")
+ if path.name.startswith(".") and path.name not in (".", ".."):
+ return True
+
+ # Check if any part of the path contains hidden directories
+ for part in path.parts:
+ if part.startswith(".") and part not in (".", ".."):
+ return True
+
+ # Skip __pycache__ directories
+ if "__pycache__" in path.parts:
+ return True
+
+ # Check custom ignore patterns
+ if ignore_patterns:
+ for pattern in ignore_patterns:
+ if fnmatch.fnmatch(path.name, pattern):
+ return True
+
+ return False
+
+ def _list_directory(self, initial_path: Path, base_path: Path, ignore_patterns: Optional[List[str]] = None) -> List[str]:
+ """
+ Recursively list files and directories.
+
+ Args:
+ initial_path: The starting directory Path
+ base_path: Base directory Path for relative path calculation
+ ignore_patterns: Optional list of glob patterns to ignore
+
+ Returns:
+ List of relative paths from base_path as strings
+ """
+ results = []
+ queue = [initial_path]
+
+ while queue and len(results) <= MAX_FILES:
+ current_path = queue.pop(0)
+
+ if self._should_skip(current_path, ignore_patterns):
+ continue
+
+ # Add directory to results if it's not the initial path
+ if current_path != initial_path:
+ try:
+ relative_path = current_path.relative_to(base_path)
+ results.append(str(relative_path) + os.sep)
+ except ValueError:
+ # Skip if we can't make it relative
+ continue
+
+ # Try to read directory contents
+ try:
+ entries = list(current_path.iterdir())
+ entries.sort(key=lambda p: p.name.lower()) # Sort entries for consistent output
+
+ for entry_path in entries:
+ if self._should_skip(entry_path, ignore_patterns):
+ continue
+
+ if entry_path.is_dir():
+ # Add directory to queue for processing
+ queue.append(entry_path)
+ else:
+ # Add file to results
+ try:
+ relative_path = entry_path.relative_to(base_path)
+ results.append(str(relative_path))
+ except ValueError:
+ # Skip if we can't make it relative
+ continue
+
+ # Check if we've hit the limit
+ if len(results) > MAX_FILES:
+ return results
+
+ except (OSError, PermissionError):
+ # Log error but continue processing
+ continue
+
+ return results
+
+ def _create_file_tree(self, sorted_paths: List[str]) -> List[TreeNode]:
+ """
+ Create a tree structure from a list of sorted paths.
+
+ Args:
+ sorted_paths: List of relative file paths as strings
+
+ Returns:
+ List of TreeNode objects representing the tree structure
+ """
+ root = []
+
+ for path_str in sorted_paths:
+ path = Path(path_str)
+ parts = path.parts
+ current_level = root
+ current_path_parts = []
+
+ for i, part in enumerate(parts):
+ current_path_parts.append(part)
+ current_path_str = str(Path(*current_path_parts))
+ is_last_part = i == len(parts) - 1
+
+ # Find existing node at current level
+ existing_node = None
+ for node in current_level:
+ if node.name == part:
+ existing_node = node
+ break
+
+ if existing_node:
+ # Use existing node
+ current_level = existing_node.children if existing_node.children else []
+ else:
+ # Create new node
+ node_type = "file" if is_last_part else "directory"
+ children = [] if not is_last_part else None
+
+ new_node = TreeNode(
+ name=part,
+ path=current_path_str,
+ type=node_type,
+ children=children
+ )
+
+ current_level.append(new_node)
+ current_level = children if children is not None else []
+
+ return root
+
+ def _print_tree(self, tree: List[TreeNode], level: int = 0, prefix: str = "") -> str:
+ """
+ Format tree structure as a readable string.
+
+ Args:
+ tree: List of TreeNode objects to format
+ level: Current indentation level
+ prefix: Current line prefix
+
+ Returns:
+ Formatted tree string
+ """
+ result = ""
+
+ # Add absolute path at root level
+ if level == 0:
+ result += f"- {Path.cwd()}{os.sep}\n"
+ prefix = " "
+
+ for node in tree:
+ # Add current node
+ suffix = os.sep if node.type == "directory" else ""
+ result += f"{prefix}- {node.name}{suffix}\n"
+
+ # Recursively add children
+ if node.children:
+ result += self._print_tree(node.children, level + 1, f"{prefix} ")
+
+ return result
+
+ def run_impl(
+ self,
+ path: Annotated[str, Field(description="The absolute path to the directory to list (must be absolute, not relative)")],
+ ignore: Annotated[Optional[List[str]], Field(description="List of glob patterns to ignore")] = None,
+ ) -> str:
+ """
+ Execute the directory listing operation.
+
+ Args:
+ path: Absolute path to the directory to list
+ ignore: Optional list of glob patterns to ignore
+
+ Returns:
+ Formatted directory tree as string
+ """
+
+ try:
+ self.validate_existing_directory_path(path)
+
+ target_path = Path(path).resolve()
+
+ # List directory contents
+ file_paths = self._list_directory(target_path, target_path, ignore)
+
+ # Check if directory is empty
+ if not file_paths:
+ return f"Directory {target_path} is empty."
+
+ # Check if we hit the limit
+ is_truncated = len(file_paths) > MAX_FILES
+ if is_truncated:
+ file_paths = file_paths[:MAX_FILES]
+
+ # Sort file paths (directories first, then alphabetically)
+ def sort_key(p: str) -> tuple:
+ # Remove trailing slash for comparison
+ clean_path = p.rstrip(os.sep)
+ # Check if it's a directory by looking for trailing slash in original
+ is_dir = p.endswith(os.sep)
+ # Return tuple for sorting: (not is_dir, lowercase path)
+ # not is_dir so directories (True -> False -> 0) come before files (False -> True -> 1)
+ return (not is_dir, clean_path.lower())
+
+ file_paths.sort(key=sort_key)
+
+ # Create tree structure
+ tree = self._create_file_tree(file_paths)
+
+ # Generate formatted output
+ result = ""
+ if is_truncated:
+ result += TRUNCATED_MESSAGE
+
+ result += self._print_tree(tree)
+
+ return result.rstrip()
+
+ except (FileSystemValidationError) as e:
+ return f"ERROR: {e}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/multi_edit_tool.py b/src/ii_agent/tools/file_system_tools/multi_edit_tool.py
new file mode 100644
index 00000000..ae24eabd
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/multi_edit_tool.py
@@ -0,0 +1,110 @@
+"""MultiEdit tool for making multiple edits to a single file atomically."""
+
+from typing import List, Dict, Any
+from pathlib import Path
+from .base import BaseFileSystemTool, FileSystemValidationError
+from .file_edit_tool import perform_replacement, FileEditToolError
+from ii_agent.utils.file_system_workspace import FileSystemWorkspace
+
+
+DESCRIPTION = """This is a tool for making multiple edits to a single file in one operation. It is built on top of the Edit tool and allows you to perform multiple find-and-replace operations efficiently. Prefer this tool over the Edit tool when you need to make multiple edits to the same file.
+
+Before using this tool:
+
+1. Use the Read tool to understand the file's contents and context
+2. Verify the directory path is correct
+
+To make multiple file edits, provide the following:
+1. file_path: The absolute path to the file to modify (must be absolute, not relative)
+2. edits: An array of edit operations to perform, where each edit contains:
+ - old_string: The text to replace (must match the file contents exactly, including all whitespace and indentation)
+ - new_string: The edited text to replace the old_string
+ - replace_all: Replace all occurences of old_string. This parameter is optional and defaults to false.
+
+IMPORTANT:
+- All edits are applied in sequence, in the order they are provided
+- Each edit operates on the result of the previous edit
+- All edits must be valid for the operation to succeed - if any edit fails, none will be applied
+- This tool is ideal when you need to make several changes to different parts of the same file
+
+CRITICAL REQUIREMENTS:
+1. All edits follow the same requirements as the single Edit tool
+2. The edits are atomic - either all succeed or none are applied
+3. Plan your edits carefully to avoid conflicts between sequential operations
+
+WARNING:
+- The tool will fail if edits.old_string doesn't match the file contents exactly (including whitespace)
+- The tool will fail if edits.old_string and edits.new_string are the same
+- Since edits are applied in sequence, ensure that earlier edits don't affect the text that later edits are trying to find
+
+When making edits:
+- Ensure all edits result in idiomatic, correct code
+- Do not leave the code in a broken state
+- Always use absolute file paths (starting with /)
+- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked.
+- Use replace_all for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance."""
+
+
+class MultiEditTool(BaseFileSystemTool):
+ """Tool for making multiple edits to a single file in one operation."""
+
+ name = "MultiEdit"
+ description = DESCRIPTION
+
+ def __init__(self, workspace_manager: FileSystemWorkspace):
+ super().__init__(workspace_manager)
+
+ def run_impl(
+ self,
+ file_path: str,
+ edits: List[Dict[str, Any]],
+ ) -> str:
+ """Execute multiple file edit operations atomically."""
+
+ # Validate that we have edits to perform
+ if not edits:
+ return "ERROR: No edits provided"
+
+ try:
+ self.validate_existing_file_path(file_path)
+ path = Path(file_path).resolve()
+
+ # Read current file content
+ working_content = path.read_text(encoding='utf-8')
+ total_replacements = 0
+
+ # Process each edit in sequence
+ for i, edit in enumerate(edits):
+ # Validate edit structure
+ if not isinstance(edit, dict):
+ return f"ERROR: Edit {i+1} must be a dictionary"
+
+ if 'old_string' not in edit or 'new_string' not in edit:
+ return f"ERROR: Edit {i+1} must contain 'old_string' and 'new_string' fields"
+
+ old_string = edit['old_string']
+ new_string = edit['new_string']
+ replace_all = edit.get('replace_all', False)
+
+ # Validate that old_string and new_string are different
+ if old_string == new_string:
+ return f"ERROR: Edit {i+1}: old_string and new_string cannot be the same"
+
+ # Perform the replacement on working content
+ try:
+ working_content, occurrences = perform_replacement(
+ working_content, old_string, new_string, replace_all
+ )
+ total_replacements += occurrences
+ except FileEditToolError as e:
+ return f"ERROR: Edit {i+1}: {e}"
+
+ # All edits validated successfully, now write the final content
+ path.write_text(working_content, encoding='utf-8')
+
+ return f"Modified file `{path}` - applied {len(edits)} edit(s) with {total_replacements} total replacement(s). Review the changes and make sure they are as expected. Edit the file again if necessary."
+
+ except (FileSystemValidationError, FileEditToolError) as e:
+ return f"ERROR: {e}"
+ except Exception as e:
+ return f"ERROR: {e}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/file_system_tools/utils.py b/src/ii_agent/tools/file_system_tools/utils.py
new file mode 100644
index 00000000..24333e28
--- /dev/null
+++ b/src/ii_agent/tools/file_system_tools/utils.py
@@ -0,0 +1,31 @@
+import os
+import base64
+from glob import glob
+
+
+def encode_image(image_path: str):
+ """Read an image file and encode it to base64."""
+ with open(image_path, "rb") as image_file:
+ return base64.b64encode(image_file.read()).decode("utf-8")
+
+
+def find_similar_file(file_path: str) -> str | None:
+ """Find similar files with different extensions."""
+ try:
+ base_path = os.path.splitext(file_path)[0]
+ parent_dir = os.path.dirname(file_path)
+ base_name = os.path.basename(base_path)
+
+ # Look for files with same base name but different extensions
+ pattern = os.path.join(parent_dir, f"{base_name}.*")
+ similar_files = glob(pattern)
+
+ if similar_files:
+ # Return the first match that's not the original file
+ for similar in similar_files:
+ if similar != file_path:
+ return similar
+
+ return None
+ except Exception:
+ return None
\ No newline at end of file
diff --git a/src/ii_agent/tools/filesystem_tools.py b/src/ii_agent/tools/filesystem_tools.py
new file mode 100644
index 00000000..ffb448f5
--- /dev/null
+++ b/src/ii_agent/tools/filesystem_tools.py
@@ -0,0 +1,619 @@
+"""
+Adapter tools that bridge external file system tools to ii-agent framework.
+
+These tools provide a unified interface for file system operations that work
+in both local and remote (Docker) environments.
+"""
+
+import logging
+import json
+from typing import Any, Optional, List, Dict
+from asyncio import Queue
+
+from ii_agent.llm.message_history import MessageHistory
+from ii_agent.tools.base import LLMTool, ToolImplOutput
+from ii_agent.core.event import EventType, RealtimeEvent
+from ii_agent.core.storage.models.settings import Settings
+from ii_agent.tools.clients.filesystem_client import FileSystemClient
+
+logger = logging.getLogger(__name__)
+
+
+class FileSystemToolError(Exception):
+ """Custom exception for file system tool errors."""
+ pass
+
+
+class ReadTool(LLMTool):
+ """Tool for reading file contents with optional line range specification."""
+
+ name = "Read"
+ description = """Reads and returns the content of a specified file from the local filesystem. Handles text files, images (PNG, JPG, GIF, WEBP, SVG, BMP), and PDF files.
+
+Usage:
+- The file_path parameter must be an absolute path, not a relative path
+- For text files and PDFs: reads up to 2000 lines by default with optional offset/limit parameters
+- For images: returns base64-encoded content with MIME type information
+- For PDFs: extracts and returns readable text content (falls back to base64 if text extraction fails)
+- Any lines longer than 2000 characters will be truncated
+- Results are returned using cat -n format, with line numbers starting at 1
+- If you read a file that exists but has empty contents you will receive a system reminder warning in place of file contents."""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "file_path": {
+ "type": "string",
+ "description": "The absolute path to the file to read",
+ },
+ "limit": {
+ "type": "integer",
+ "description": "The number of lines to read. Only provide if the file is too large to read at once.",
+ },
+ "offset": {
+ "type": "integer",
+ "description": "The line number to start reading from. Only provide if the file is too large to read at once",
+ },
+ },
+ "required": ["file_path"],
+ }
+
+ def __init__(self, settings: Settings, message_queue: Optional[Queue] = None):
+ super().__init__()
+ self.message_queue = message_queue
+ self.filesystem_client = FileSystemClient(settings)
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ file_path = tool_input["file_path"]
+ limit = tool_input.get("limit")
+ offset = tool_input.get("offset")
+
+ try:
+ response = self.filesystem_client.read_file(file_path, limit, offset)
+
+ if response.success:
+ return ToolImplOutput(
+ response.file_content,
+ f"Successfully read file {file_path}",
+ {"success": True},
+ )
+ else:
+ return ToolImplOutput(
+ response.file_content,
+ f"Failed to read file {file_path}",
+ {"success": False},
+ )
+ except Exception as e:
+ return ToolImplOutput(
+ f"Error reading file: {str(e)}",
+ f"Error reading file {file_path}",
+ {"success": False},
+ )
+
+ def get_tool_start_message(self, tool_input: dict[str, Any]) -> str:
+ return f"Reading file {tool_input['file_path']}"
+
+
+class EditTool(LLMTool):
+ """Tool for making targeted string replacements in files."""
+
+ name = "Edit"
+ description = """Performs exact string replacements in files.
+
+Usage:
+- You must use your `Read` tool at least once in the conversation before editing. This tool will error if you attempt an edit without reading the file.
+- When editing text from Read tool output, ensure you preserve the exact indentation (tabs/spaces) as it appears AFTER the line number prefix. The line number prefix format is: spaces + line number + tab. Everything after that tab is the actual file content to match. Never include any part of the line number prefix in the old_string or new_string.
+- ALWAYS prefer editing existing files in the codebase. NEVER write new files unless explicitly required.
+- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked.
+- The edit will FAIL if `old_string` is not unique in the file. Either provide a larger string with more surrounding context to make it unique or use `replace_all` to change every instance of `old_string`.
+- Use `replace_all` for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance."""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "file_path": {
+ "type": "string",
+ "description": "The absolute path to the file to modify",
+ },
+ "old_string": {
+ "type": "string",
+ "description": "The text to replace",
+ },
+ "new_string": {
+ "type": "string",
+ "description": "The text to replace it with (must be different from old_string)",
+ },
+ "replace_all": {
+ "type": "boolean",
+ "description": "Replace all occurences of old_string (default false)",
+ "default": False,
+ },
+ },
+ "required": ["file_path", "old_string", "new_string"],
+ }
+
+ def __init__(self, settings: Settings, message_queue: Optional[Queue] = None):
+ super().__init__()
+ self.message_queue = message_queue
+ self.filesystem_client = FileSystemClient(settings)
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ file_path = tool_input["file_path"]
+ old_string = tool_input["old_string"]
+ new_string = tool_input["new_string"]
+ replace_all = tool_input.get("replace_all", False)
+
+ try:
+ response = self.filesystem_client.edit_file(file_path, old_string, new_string, replace_all)
+
+ if response.success:
+ self._send_file_update(file_path)
+ return ToolImplOutput(
+ response.file_content,
+ f"Successfully edited file {file_path}",
+ {"success": True},
+ )
+ else:
+ return ToolImplOutput(
+ response.file_content,
+ f"Failed to edit file {file_path}",
+ {"success": False},
+ )
+ except Exception as e:
+ return ToolImplOutput(
+ f"Error editing file: {str(e)}",
+ f"Error editing file {file_path}",
+ {"success": False},
+ )
+
+ def get_tool_start_message(self, tool_input: dict[str, Any]) -> str:
+ return f"Editing file {tool_input['file_path']}"
+
+ def _send_file_update(self, file_path: str):
+ """Send file update event through message queue if available."""
+ if self.message_queue:
+ # Read the updated file content to send the update
+ try:
+ response = self.filesystem_client.read_file(file_path)
+ if response.success:
+ self.message_queue.put_nowait(
+ RealtimeEvent(
+ type=EventType.FILE_EDIT,
+ content={
+ "path": str(file_path),
+ "content": response.file_content,
+ "total_lines": len(response.file_content.splitlines()),
+ },
+ )
+ )
+ except Exception as e:
+ logger.error(f"Failed to send file update for {file_path}: {e}")
+
+
+class WriteTool(LLMTool):
+ """Tool for creating new files or overwriting existing ones."""
+
+ name = "Write"
+ description = """Writes a file to the local filesystem.
+
+Usage:
+- This tool will overwrite the existing file if there is one at the provided path.
+- If this is an existing file, you MUST use the Read tool first to read the file's contents. This tool will fail if you did not read the file first.
+- ALWAYS prefer editing existing files in the codebase. NEVER write new files unless explicitly required.
+- NEVER proactively create documentation files (*.md) or README files. Only create documentation files if explicitly requested by the User.
+- Only use emojis if the user explicitly requests it. Avoid writing emojis to files unless asked."""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "file_path": {
+ "type": "string",
+ "description": "The absolute path to the file to write (must be absolute, not relative)",
+ },
+ "content": {
+ "type": "string",
+ "description": "The content to write to the file",
+ },
+ },
+ "required": ["file_path", "content"],
+ }
+
+ def __init__(self, settings: Settings, message_queue: Optional[Queue] = None):
+ super().__init__()
+ self.message_queue = message_queue
+ self.filesystem_client = FileSystemClient(settings)
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ file_path = tool_input["file_path"]
+ content = tool_input["content"]
+
+ try:
+ response = self.filesystem_client.write_file(file_path, content)
+
+ if response.success:
+ self._send_file_update(file_path, content)
+ return ToolImplOutput(
+ response.file_content,
+ f"Successfully wrote file {file_path}",
+ {"success": True},
+ )
+ else:
+ return ToolImplOutput(
+ response.file_content,
+ f"Failed to write file {file_path}",
+ {"success": False},
+ )
+ except Exception as e:
+ return ToolImplOutput(
+ f"Error writing file: {str(e)}",
+ f"Error writing file {file_path}",
+ {"success": False},
+ )
+
+ def get_tool_start_message(self, tool_input: dict[str, Any]) -> str:
+ return f"Writing file {tool_input['file_path']}"
+
+ def _send_file_update(self, file_path: str, content: str):
+ """Send file update event through message queue if available."""
+ if self.message_queue:
+ self.message_queue.put_nowait(
+ RealtimeEvent(
+ type=EventType.FILE_EDIT,
+ content={
+ "path": str(file_path),
+ "content": content,
+ "total_lines": len(content.splitlines()),
+ },
+ )
+ )
+
+
+class MultiEditTool(LLMTool):
+ """Tool for making multiple edits to a single file in one operation."""
+
+ name = "MultiEdit"
+ description = """This is a tool for making multiple edits to a single file in one operation. It is built on top of the Edit tool and allows you to perform multiple find-and-replace operations efficiently. Prefer this tool over the Edit tool when you need to make multiple edits to the same file.
+
+Before using this tool:
+
+1. Use the Read tool to understand the file's contents and context
+2. Verify the directory path is correct
+
+To make multiple file edits, provide the following:
+1. file_path: The absolute path to the file to modify (must be absolute, not relative)
+2. edits: An array of edit operations to perform, where each edit contains:
+ - old_string: The text to replace (must match the file contents exactly, including all whitespace and indentation)
+ - new_string: The edited text to replace the old_string
+ - replace_all: Replace all occurences of old_string. This parameter is optional and defaults to false.
+
+IMPORTANT:
+- All edits are applied in sequence, in the order they are provided
+- Each edit operates on the result of the previous edit
+- All edits must be valid for the operation to succeed - if any edit fails, none will be applied
+- This tool is ideal when you need to make several changes to different parts of the same file
+- For Jupyter notebooks (.ipynb files), use the NotebookEdit instead
+
+CRITICAL REQUIREMENTS:
+1. All edits follow the same requirements as the single Edit tool
+2. The edits are atomic - either all succeed or none are applied
+3. Plan your edits carefully to avoid conflicts between sequential operations
+
+WARNING:
+- The tool will fail if edits.old_string doesn't match the file contents exactly (including whitespace)
+- The tool will fail if edits.old_string and edits.new_string are the same
+- Since edits are applied in sequence, ensure that earlier edits don't affect the text that later edits are trying to find
+
+When making edits:
+- Ensure all edits result in idiomatic, correct code
+- Do not leave the code in a broken state
+- Always use absolute file paths (starting with /)
+- Only use emojis if the user explicitly requests it. Avoid adding emojis to files unless asked.
+- Use replace_all for replacing and renaming strings across the file. This parameter is useful if you want to rename a variable for instance.
+
+If you want to create a new file, use:
+- A new file path, including dir name if needed
+- First edit: empty old_string and the new file's contents as new_string
+- Subsequent edits: normal edit operations on the created content"""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "file_path": {
+ "type": "string",
+ "description": "The absolute path to the file to modify",
+ },
+ "edits": {
+ "type": "array",
+ "description": "Array of edit operations to perform sequentially on the file",
+ "items": {
+ "type": "object",
+ "properties": {
+ "old_string": {
+ "type": "string",
+ "description": "The text to replace",
+ },
+ "new_string": {
+ "type": "string",
+ "description": "The text to replace it with",
+ },
+ "replace_all": {
+ "type": "boolean",
+ "description": "Replace all occurences of old_string (default false).",
+ "default": False,
+ },
+ },
+ "required": ["old_string", "new_string"],
+ },
+ "minItems": 1,
+ },
+ },
+ "required": ["file_path", "edits"],
+ }
+
+ def __init__(self, settings: Settings, message_queue: Optional[Queue] = None):
+ super().__init__()
+ self.message_queue = message_queue
+ self.filesystem_client = FileSystemClient(settings)
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ file_path = tool_input["file_path"]
+ edits = tool_input["edits"]
+
+ try:
+ response = self.filesystem_client.multi_edit(file_path, edits)
+
+ if response.success:
+ self._send_file_update(file_path)
+ return ToolImplOutput(
+ response.file_content,
+ f"Successfully applied {len(edits)} edits to file {file_path}",
+ {"success": True},
+ )
+ else:
+ return ToolImplOutput(
+ response.file_content,
+ f"Failed to apply edits to file {file_path}",
+ {"success": False},
+ )
+ except Exception as e:
+ return ToolImplOutput(
+ f"Error applying multi-edit: {str(e)}",
+ f"Error applying edits to file {file_path}",
+ {"success": False},
+ )
+
+ def get_tool_start_message(self, tool_input: dict[str, Any]) -> str:
+ num_edits = len(tool_input.get("edits", []))
+ return f"Applying {num_edits} edits to file {tool_input['file_path']}"
+
+ def _send_file_update(self, file_path: str):
+ """Send file update event through message queue if available."""
+ if self.message_queue:
+ try:
+ response = self.filesystem_client.read_file(file_path)
+ if response.success:
+ self.message_queue.put_nowait(
+ RealtimeEvent(
+ type=EventType.FILE_EDIT,
+ content={
+ "path": str(file_path),
+ "content": response.file_content,
+ "total_lines": len(response.file_content.splitlines()),
+ },
+ )
+ )
+ except Exception as e:
+ logger.error(f"Failed to send file update for {file_path}: {e}")
+
+
+class LSTool(LLMTool):
+ """Tool for listing files and directories."""
+
+ name = "LS"
+ description = """Lists files and directories in a given path. The path parameter must be an absolute path, not a relative path. You can optionally provide an array of glob patterns to ignore with the ignore parameter. You should generally prefer the Glob and Grep tools, if you know which directories to search."""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "The absolute path to the directory to list (must be absolute, not relative)",
+ },
+ "ignore": {
+ "type": "array",
+ "description": "List of glob patterns to ignore",
+ "items": {"type": "string"},
+ },
+ },
+ "required": ["path"],
+ }
+
+ def __init__(self, settings: Settings):
+ super().__init__()
+ self.filesystem_client = FileSystemClient(settings)
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ path = tool_input["path"]
+ ignore = tool_input.get("ignore")
+
+ try:
+ response = self.filesystem_client.ls(path, ignore)
+
+ if response.success:
+ return ToolImplOutput(
+ response.file_content,
+ f"Successfully listed directory {path}",
+ {"success": True},
+ )
+ else:
+ return ToolImplOutput(
+ response.file_content,
+ f"Failed to list directory {path}",
+ {"success": False},
+ )
+ except Exception as e:
+ return ToolImplOutput(
+ f"Error listing directory: {str(e)}",
+ f"Error listing directory {path}",
+ {"success": False},
+ )
+
+ def get_tool_start_message(self, tool_input: dict[str, Any]) -> str:
+ return f"Listing directory {tool_input['path']}"
+
+
+class GlobTool(LLMTool):
+ """Tool for fast file pattern matching."""
+
+ name = "Glob"
+ description = """- Fast file pattern matching tool that works with any codebase size
+- Supports glob patterns like "**/*.js" or "src/**/*.ts"
+- Returns matching file paths sorted by modification time
+- Use this tool when you need to find files by name patterns
+- When you are doing an open ended search that may require multiple rounds of globbing and grepping, use the Agent tool instead
+- You have the capability to call multiple tools in a single response. It is always better to speculatively perform multiple searches as a batch that are potentially useful."""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "pattern": {
+ "type": "string",
+ "description": "The glob pattern to match files against",
+ },
+ "path": {
+ "type": "string",
+ "description": "The directory to search in. If not specified, the current working directory will be used. IMPORTANT: Omit this field to use the default directory. DO NOT enter \"undefined\" or \"null\" - simply omit it for the default behavior. Must be a valid directory path if provided.",
+ },
+ },
+ "required": ["pattern"],
+ }
+
+ def __init__(self, settings: Settings):
+ super().__init__()
+ self.filesystem_client = FileSystemClient(settings)
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ pattern = tool_input["pattern"]
+ path = tool_input.get("path")
+
+ try:
+ response = self.filesystem_client.glob(pattern, path)
+
+ if response.success:
+ return ToolImplOutput(
+ response.file_content,
+ f"Successfully searched for pattern {pattern}",
+ {"success": True},
+ )
+ else:
+ return ToolImplOutput(
+ response.file_content,
+ f"Failed to search for pattern {pattern}",
+ {"success": False},
+ )
+ except Exception as e:
+ return ToolImplOutput(
+ f"Error searching for pattern: {str(e)}",
+ f"Error searching for pattern {pattern}",
+ {"success": False},
+ )
+
+ def get_tool_start_message(self, tool_input: dict[str, Any]) -> str:
+ return f"Searching for files matching pattern {tool_input['pattern']}"
+
+
+class GrepTool(LLMTool):
+ """Tool for fast content search."""
+
+ name = "Grep"
+ description = """
+- Fast content search tool that works with any codebase size
+- Searches file contents using regular expressions
+- Supports full regex syntax (eg. "log.*Error", "function\\s+\\w+", etc.)
+- Filter files by pattern with the include parameter (eg. "*.js", "*.{ts,tsx}")
+- Returns file paths with at least one match sorted by modification time
+- Use this tool when you need to find files containing specific patterns
+- If you need to identify/count the number of matches within files, use the Bash tool with `rg` (ripgrep) directly. Do NOT use `grep`.
+- When you are doing an open ended search that may require multiple rounds of globbing and grepping, use the Agent tool instead
+"""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "pattern": {
+ "type": "string",
+ "description": "The regular expression pattern to search for in file contents",
+ },
+ "path": {
+ "type": "string",
+ "description": "The directory to search in. Defaults to the current working directory.",
+ },
+ "include": {
+ "type": "string",
+ "description": "File pattern to include in the search (e.g. \"*.js\", \"*.{ts,tsx}\")",
+ },
+ },
+ "required": ["pattern"],
+ }
+
+ def __init__(self, settings: Settings):
+ super().__init__()
+ self.filesystem_client = FileSystemClient(settings)
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ pattern = tool_input["pattern"]
+ path = tool_input.get("path")
+ include = tool_input.get("include")
+
+ try:
+ response = self.filesystem_client.grep(pattern, path, include)
+
+ if response.success:
+ return ToolImplOutput(
+ response.file_content,
+ f"Successfully searched for content pattern {pattern}",
+ {"success": True},
+ )
+ else:
+ return ToolImplOutput(
+ response.file_content,
+ f"Failed to search for content pattern {pattern}",
+ {"success": False},
+ )
+ except Exception as e:
+ return ToolImplOutput(
+ f"Error searching for content: {str(e)}",
+ f"Error searching for content pattern {pattern}",
+ {"success": False},
+ )
+
+ def get_tool_start_message(self, tool_input: dict[str, Any]) -> str:
+ return f"Searching for content matching pattern {tool_input['pattern']}"
\ No newline at end of file
diff --git a/src/ii_agent/tools/presentation_tool.py b/src/ii_agent/tools/presentation_tool.py
index 67345938..109b0740 100644
--- a/src/ii_agent/tools/presentation_tool.py
+++ b/src/ii_agent/tools/presentation_tool.py
@@ -5,7 +5,7 @@
from ii_agent.tools.base import LLMTool
from ii_agent.utils.workspace_manager import WorkspaceManager
from ii_agent.tools.bash_tool import create_bash_tool
-from ii_agent.tools.str_replace_tool_relative import StrReplaceEditorTool
+from ii_agent.tools.filesystem_tools import EditTool, WriteTool
from ii_agent.llm.message_history import MessageHistory
from ii_agent.tools.base import ToolImplOutput
@@ -185,7 +185,8 @@ def __init__(
self.bash_tool = create_bash_tool(ask_user_permission, workspace_manager.root)
self.tools = [
self.bash_tool,
- StrReplaceEditorTool(workspace_manager=workspace_manager),
+ # TODO: Add back file editing tool when settings are available
+ # StrReplaceEditorTool(workspace_manager=workspace_manager),
]
image_search_tool = ImageSearchTool()
if image_search_tool.is_available():
diff --git a/src/ii_agent/tools/todo_tools.py b/src/ii_agent/tools/todo_tools.py
new file mode 100644
index 00000000..b487eaaa
--- /dev/null
+++ b/src/ii_agent/tools/todo_tools.py
@@ -0,0 +1,168 @@
+import requests
+from typing import Any, Optional
+
+from ii_agent.llm.message_history import MessageHistory
+from ii_agent.tools.base import LLMTool, ToolImplOutput
+from ii_agent.core.storage.models.settings import Settings
+
+
+class TodoReadTool(LLMTool):
+ """Tool for reading the current to-do list for the session."""
+
+ name = "TodoRead"
+ description = """Use this tool to read the current to-do list for the session. This tool should be used proactively and frequently to ensure that you are aware of
+the status of the current task list. You should make use of this tool as often as possible, especially in the following situations:
+- At the beginning of conversations to see what's pending
+- Before starting new tasks to prioritize work
+- When the user asks about previous tasks or plans
+- Whenever you're uncertain about what to do next
+- After completing tasks to update your understanding of remaining work
+- After every few messages to ensure you're on track
+
+Usage:
+- This tool takes in no parameters. So leave the input blank or empty. DO NOT include a dummy object, placeholder string or a key like \"input\" or \"empty\". LEAVE IT BLANK.
+- Returns a list of todo items with their status, priority, and content
+- Use this information to track progress and plan next steps
+- If no todos exist yet, an empty list will be returned"""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ },
+ "required": [],
+ }
+
+ def __init__(self, settings: Settings):
+ super().__init__()
+ self.server_url = settings.client_config.server_url
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ endpoint = f"{self.server_url}/api/todo/todo_read"
+ response = requests.get(endpoint)
+ response.raise_for_status()
+
+ result = response.json()
+ return ToolImplOutput(
+ result["message"],
+ f"Successfully read the todo list from {endpoint}",
+ {"success": True},
+ )
+
+class TodoWriteTool(LLMTool):
+ """Tool for writing to the current to-do list for the session."""
+
+ name = "TodoWrite"
+ description = """Use this tool to create and manage a structured task list for your current coding session. This helps you track progress, organize complex tasks, and demonstrate thoroughness to the user.
+It also helps the user understand the progress of the task and overall progress of their requests.
+
+## When to Use This Tool
+Use this tool proactively in these scenarios:
+
+1. Complex multi-step tasks - When a task requires 3 or more distinct steps or actions
+2. Non-trivial and complex tasks - Tasks that require careful planning or multiple operations
+3. User explicitly requests todo list - When the user directly asks you to use the todo list
+4. User provides multiple tasks - When users provide a list of things to be done (numbered or comma-separated)
+5. After receiving new instructions - Immediately capture user requirements as todos
+6. When you start working on a task - Mark it as in_progress BEFORE beginning work. Ideally you should only have one todo as in_progress at a time
+7. After completing a task - Mark it as completed and add any new follow-up tasks discovered during implementation
+
+## When NOT to Use This Tool
+
+Skip using this tool when:
+1. There is only a single, straightforward task
+2. The task is trivial and tracking it provides no organizational benefit
+3. The task can be completed in less than 3 trivial steps
+4. The task is purely conversational or informational
+
+NOTE that you should not use this tool if there is only one trivial task to do. In this case you are better off just doing the task directly.
+
+## Task States and Management
+
+1. **Task States**: Use these states to track progress:
+ - pending: Task not yet started
+ - in_progress: Currently working on (limit to ONE task at a time)
+ - completed: Task finished successfully
+
+2. **Task Management**:
+ - Update task status in real-time as you work
+ - Mark tasks complete IMMEDIATELY after finishing (don't batch completions)
+ - Only have ONE task in_progress at any time
+ - Complete current tasks before starting new ones
+ - Remove tasks that are no longer relevant from the list entirely
+
+3. **Task Completion Requirements**:
+ - ONLY mark a task as completed when you have FULLY accomplished it
+ - If you encounter errors, blockers, or cannot finish, keep the task as in_progress
+ - When blocked, create a new task describing what needs to be resolved
+ - Never mark a task as completed if:
+ - Tests are failing
+ - Implementation is partial
+ - You encountered unresolved errors
+ - You couldn't find necessary files or dependencies
+
+4. **Task Breakdown**:
+ - Create specific, actionable items
+ - Break complex tasks into smaller, manageable steps
+ - Use clear, descriptive task names
+
+When in doubt, use this tool. Being proactive with task management demonstrates attentiveness and ensures you complete all requirements successfully."""
+
+ input_schema = {
+ "type": "object",
+ "properties": {
+ "todos": {
+ "type": "array",
+ "description": "The updated todo list",
+ "items": {
+ "type": "object",
+ "properties": {
+ "id": {
+ "type": "string",
+ "description": "Unique identifier for the todo item"
+ },
+ "content": {
+ "type": "string",
+ "minLength": 1,
+ "description": "The descriptive content of the todo item"
+ },
+ "status": {
+ "type": "string",
+ "enum": ["pending", "in_progress", "completed"],
+ "description": "Current status of the todo item"
+ },
+ "priority": {
+ "type": "string",
+ "enum": ["high", "medium", "low"],
+ "description": "Priority level of the todo item"
+ }
+ },
+ "required": ["id", "content", "status", "priority"]
+ }
+ },
+ },
+ "required": ["todos"],
+ }
+
+ def __init__(self, settings: Settings):
+ super().__init__()
+ self.server_url = settings.client_config.server_url
+
+ async def run_impl(
+ self,
+ tool_input: dict[str, Any],
+ message_history: Optional[MessageHistory] = None,
+ ) -> ToolImplOutput:
+ endpoint = f"{self.server_url}/api/todo/todo_write"
+ response = requests.post(endpoint, json={"todos": tool_input["todos"]})
+ response.raise_for_status()
+ result = response.json()
+
+ return ToolImplOutput(
+ result["message"],
+ f"Successfully wrote the todo list to {endpoint}",
+ {"success": True},
+ )
\ No newline at end of file
diff --git a/src/ii_agent/tools/tool_manager.py b/src/ii_agent/tools/tool_manager.py
index f5a57ade..24611183 100644
--- a/src/ii_agent/tools/tool_manager.py
+++ b/src/ii_agent/tools/tool_manager.py
@@ -33,6 +33,15 @@
from ii_agent.tools.str_replace_tool_relative import (
StrReplaceEditorTool as StrReplaceEditorToolRelative,
)
+from ii_agent.tools.filesystem_tools import (
+ ReadTool,
+ EditTool,
+ WriteTool,
+ MultiEditTool,
+ LSTool,
+ GlobTool,
+ GrepTool,
+)
from ii_agent.tools.sequential_thinking_tool import SequentialThinkingTool
from ii_agent.tools.message_tool import MessageTool
from ii_agent.tools.complete_tool import (
@@ -78,6 +87,7 @@
from ii_agent.utils.constants import TOKEN_BUDGET
from ii_agent.core.storage.models.settings import Settings
from ii_agent.utils.sandbox_manager import SandboxManager
+from ii_agent.tools.todo_tools import TodoReadTool, TodoWriteTool
def get_system_tools(
@@ -99,7 +109,7 @@ def get_system_tools(
logger = logging.getLogger("tool_manager")
terminal_client = TerminalClient(settings)
- str_replace_client = StrReplaceClient(settings)
+ # str_replace_client = StrReplaceClient(settings)
tools = []
if workspace_manager.is_local_workspace():
@@ -146,31 +156,50 @@ def get_system_tools(
]
)
- # Str replace tools
+ # File system tools (replacing str_replace tools)
tools.extend(
[
- StrReplaceEditorToolRelative(
- workspace_manager=workspace_manager,
- message_queue=message_queue,
- str_replace_client=str_replace_client,
- ),
+ ReadTool(settings=settings, message_queue=message_queue),
+ EditTool(settings=settings, message_queue=message_queue),
+ WriteTool(settings=settings, message_queue=message_queue),
+ MultiEditTool(settings=settings, message_queue=message_queue),
+ LSTool(settings=settings),
+ GlobTool(settings=settings),
+ GrepTool(settings=settings),
]
)
+ tools.extend(
+ [
+ TodoReadTool(settings=settings),
+ TodoWriteTool(settings=settings),
+ ]
+ )
+ # Str replace tools (deprecated - commented out)
+ #tools.extend(
+ # [
+ # StrReplaceEditorToolRelative(
+ # workspace_manager=workspace_manager,
+ # message_queue=message_queue,
+ # str_replace_client=str_replace_client,
+ # ),
+ # ]
+ #)
+
tools.extend(
[
MessageTool(),
WebSearchTool(settings=settings),
VisitWebpageTool(settings=settings),
- SlideDeckInitTool(
- workspace_manager=workspace_manager,
- terminal_client=terminal_client,
- ),
- SlideDeckCompleteTool(
- workspace_manager=workspace_manager,
- str_replace_client=str_replace_client,
- ),
- DisplayImageTool(workspace_manager=workspace_manager),
+# SlideDeckInitTool(
+# workspace_manager=workspace_manager,
+# terminal_client=terminal_client,
+# ),
+# SlideDeckCompleteTool(
+# workspace_manager=workspace_manager,
+# str_replace_client=str_replace_client,
+# ),
+# DisplayImageTool(workspace_manager=workspace_manager),
]
)
diff --git a/src/ii_agent/utils/file_system_workspace.py b/src/ii_agent/utils/file_system_workspace.py
new file mode 100644
index 00000000..3d6341ca
--- /dev/null
+++ b/src/ii_agent/utils/file_system_workspace.py
@@ -0,0 +1,63 @@
+from pathlib import Path
+
+
+class WorkspaceError(Exception):
+ """Custom exception for workspace-related errors."""
+ pass
+
+class FileSystemWorkspace:
+ """Manages file system operations within a designated workspace directory."""
+
+ def __init__(self, workspace_path: str | Path):
+ """
+ Initialize the WorkspaceManager with a workspace directory.
+
+ Args:
+ workspace_path: Path to the workspace directory (string or Path object)
+
+ Raises:
+ WorkspaceError: If the workspace path is invalid or not a directory
+ """
+ # Convert to Path object if it's a string
+ if isinstance(workspace_path, str):
+ workspace_path = Path(workspace_path)
+
+ # Validate that the path exists and is a directory
+ if not workspace_path.exists():
+ raise WorkspaceError(f"Workspace path `{workspace_path}` does not exist")
+
+ if not workspace_path.is_dir():
+ raise WorkspaceError(f"Workspace path `{workspace_path}` is not a directory")
+
+ self.workspace_path = workspace_path.resolve()
+
+ def get_workspace_path(self) -> Path:
+ """
+ Get the absolute path to the workspace directory.
+
+ Returns:
+ Path object representing the workspace directory
+ """
+ return self.workspace_path
+
+ def validate_boundary(self, path: Path | str) -> bool:
+ """
+ Check if a given path is within the workspace directory.
+
+ Args:
+ path: Path to check (string or Path object)
+
+ Returns:
+ True if path is within workspace, False otherwise
+ """
+ # Convert to Path object if it's a string
+ if isinstance(path, str):
+ path = Path(path)
+ try:
+ # Resolve both paths to absolute, normalized form
+ path = path.resolve()
+ workspace = self.get_workspace_path()
+ # Check if the path is the workspace or inside it
+ return workspace in path.parents or path == workspace
+ except Exception:
+ return False(base)
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/manager/__init__.py b/src/ii_agent/utils/tool_client/manager/__init__.py
index 799bfb28..ce46e0fc 100644
--- a/src/ii_agent/utils/tool_client/manager/__init__.py
+++ b/src/ii_agent/utils/tool_client/manager/__init__.py
@@ -1,6 +1,7 @@
from .terminal_manager import PexpectSessionManager
from .tmux_terminal_manager import TmuxSessionManager
from .str_replace_manager import StrReplaceManager
+from .filesystem_manager import FileSystemManager, FileSystemResponse
from .model import SessionResult, StrReplaceResponse, StrReplaceToolError
__all__ = [
@@ -10,4 +11,6 @@
"PexpectSessionManager",
"TmuxSessionManager",
"StrReplaceManager",
+ "FileSystemManager",
+ "FileSystemResponse",
]
diff --git a/src/ii_agent/utils/tool_client/manager/filesystem_manager.py b/src/ii_agent/utils/tool_client/manager/filesystem_manager.py
new file mode 100644
index 00000000..feac054f
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/manager/filesystem_manager.py
@@ -0,0 +1,237 @@
+"""Manager for file system operations using external file system tools."""
+
+import logging
+import sys
+import os
+from typing import Optional, List, Dict, Any
+from dataclasses import dataclass
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class FileSystemResponse:
+ """Response from file system operations."""
+ success: bool
+ file_content: str
+
+
+class FileSystemManager:
+ """Manager for file system operations that uses external tools directly."""
+
+ def __init__(self, workspace_path: str = "/workspace"):
+ self.workspace_path = workspace_path
+ self._setup_tools()
+
+ def _setup_tools(self):
+ """Setup the internal file system tools."""
+ # Import and initialize internal tools
+ try:
+ from ii_agent.tools.file_system_tools import (
+ FileReadTool, FileEditTool, FileWriteTool, MultiEditTool,
+ LSTool, GlobTool, GrepTool
+ )
+ from ii_agent.utils.file_system_workspace import FileSystemWorkspace
+ # Initialize workspace manager
+ self.workspace_manager = FileSystemWorkspace(self.workspace_path)
+
+ # Initialize tools
+ self.read_tool = FileReadTool(self.workspace_manager)
+ self.edit_tool = FileEditTool(self.workspace_manager)
+ self.write_tool = FileWriteTool(self.workspace_manager)
+ self.multi_edit_tool = MultiEditTool(self.workspace_manager)
+ self.ls_tool = LSTool(self.workspace_manager)
+ self.glob_tool = GlobTool(self.workspace_manager)
+ self.grep_tool = GrepTool(self.workspace_manager)
+
+ self.tools_available = True
+ logger.info("✅ Successfully initialized internal file system tools")
+ print("✅ Successfully initialized internal file system tools")
+ except Exception as e:
+ logger.warning(f"⚠️ Failed to import internal tools: {e}")
+ print(f"⚠️ Failed to import internal tools: {e}")
+ self.tools_available = False
+
+ def _ensure_tools_available(self) -> FileSystemResponse:
+ """Check if tools are available, return error response if not."""
+ if not self.tools_available:
+ return FileSystemResponse(
+ success=False,
+ file_content="ERROR: External file system tools not available in this environment"
+ )
+ return None
+
+ def read_file(
+ self,
+ file_path: str,
+ limit: Optional[int] = None,
+ offset: Optional[int] = None
+ ) -> FileSystemResponse:
+ """Read file contents."""
+ error_check = self._ensure_tools_available()
+ if error_check:
+ return error_check
+
+ try:
+ result = self.read_tool.run_impl(
+ file_path=file_path,
+ limit=limit,
+ offset=offset
+ )
+
+ # Handle different return types
+ if isinstance(result, dict):
+ # For images, convert to JSON string
+ import json
+ content = json.dumps(result)
+ else:
+ content = str(result)
+
+ success = not content.startswith("ERROR:")
+ return FileSystemResponse(success=success, file_content=content)
+
+ except Exception as e:
+ return FileSystemResponse(
+ success=False,
+ file_content=f"Error reading file: {str(e)}"
+ )
+
+ def edit_file(
+ self,
+ file_path: str,
+ old_string: str,
+ new_string: str,
+ replace_all: bool = False
+ ) -> FileSystemResponse:
+ """Edit file contents."""
+ error_check = self._ensure_tools_available()
+ if error_check:
+ return error_check
+
+ try:
+ result = self.edit_tool.run_impl(
+ file_path=file_path,
+ old_string=old_string,
+ new_string=new_string,
+ replace_all=replace_all
+ )
+
+ success = not result.startswith("ERROR:")
+ return FileSystemResponse(success=success, file_content=result)
+
+ except Exception as e:
+ return FileSystemResponse(
+ success=False,
+ file_content=f"Error editing file: {str(e)}"
+ )
+
+ def write_file(self, file_path: str, content: str) -> FileSystemResponse:
+ """Write file contents."""
+ error_check = self._ensure_tools_available()
+ if error_check:
+ return error_check
+
+ try:
+ result = self.write_tool.run_impl(
+ file_path=file_path,
+ content=content
+ )
+
+ success = not result.startswith("ERROR:")
+ return FileSystemResponse(success=success, file_content=result)
+
+ except Exception as e:
+ return FileSystemResponse(
+ success=False,
+ file_content=f"Error writing file: {str(e)}"
+ )
+
+ def multi_edit(self, file_path: str, edits: List[Dict[str, Any]]) -> FileSystemResponse:
+ """Perform multiple edits on a file."""
+ error_check = self._ensure_tools_available()
+ if error_check:
+ return error_check
+
+ try:
+ result = self.multi_edit_tool.run_impl(
+ file_path=file_path,
+ edits=edits
+ )
+
+ success = not result.startswith("ERROR:")
+ return FileSystemResponse(success=success, file_content=result)
+
+ except Exception as e:
+ return FileSystemResponse(
+ success=False,
+ file_content=f"Error in multi-edit: {str(e)}"
+ )
+
+ def ls(self, path: str, ignore: Optional[List[str]] = None) -> FileSystemResponse:
+ """List directory contents."""
+ error_check = self._ensure_tools_available()
+ if error_check:
+ return error_check
+
+ try:
+ result = self.ls_tool.run_impl(
+ path=path,
+ ignore=ignore
+ )
+
+ success = not result.startswith("ERROR:")
+ return FileSystemResponse(success=success, file_content=result)
+
+ except Exception as e:
+ return FileSystemResponse(
+ success=False,
+ file_content=f"Error listing directory: {str(e)}"
+ )
+
+ def glob(self, pattern: str, path: Optional[str] = None) -> FileSystemResponse:
+ """Search for files using glob patterns."""
+ error_check = self._ensure_tools_available()
+ if error_check:
+ return error_check
+
+ try:
+ result = self.glob_tool.run_impl(
+ pattern=pattern,
+ path=path
+ )
+
+ success = not result.startswith("ERROR:")
+ return FileSystemResponse(success=success, file_content=result)
+
+ except Exception as e:
+ return FileSystemResponse(
+ success=False,
+ file_content=f"Error in glob search: {str(e)}"
+ )
+
+ def grep(
+ self,
+ pattern: str,
+ path: Optional[str] = None,
+ include: Optional[str] = None
+ ) -> FileSystemResponse:
+ """Search for content using regex patterns."""
+ error_check = self._ensure_tools_available()
+ if error_check:
+ return error_check
+
+ try:
+ result = self.grep_tool.run_impl(
+ pattern=pattern,
+ path=path,
+ include=include
+ )
+
+ success = not result.startswith("ERROR:")
+ return FileSystemResponse(success=success, file_content=result)
+
+ except Exception as e:
+ return FileSystemResponse(
+ success=False,
+ file_content=f"Error in grep search: {str(e)}"
+ )
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/requirements.txt b/src/ii_agent/utils/tool_client/requirements.txt
index e5b30087..b0e55760 100644
--- a/src/ii_agent/utils/tool_client/requirements.txt
+++ b/src/ii_agent/utils/tool_client/requirements.txt
@@ -2,3 +2,5 @@ fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.5.0
pexpect>=4.8.0
+pymupdf>=1.23.0
+httpx>=0.25.0
diff --git a/src/ii_agent/utils/tool_client/sandbox_server.py b/src/ii_agent/utils/tool_client/sandbox_server.py
index 4ad7083b..1f440650 100644
--- a/src/ii_agent/utils/tool_client/sandbox_server.py
+++ b/src/ii_agent/utils/tool_client/sandbox_server.py
@@ -10,6 +10,8 @@
from .server.str_replace_server import create_app as create_str_replace_app
from .server.terminal_server import create_app as create_terminal_app
+from .server.filesystem_server import create_app as create_filesystem_app
+from .server.todo_server import create_app as create_todo_app
logger = logging.getLogger(__name__)
@@ -64,9 +66,21 @@ def __init__(
allowed_origins=allowed_origins,
)
+ filesystem_app = create_filesystem_app(
+ workspace_path=cwd or "/workspace",
+ allowed_origins=allowed_origins,
+ cwd=cwd,
+ )
+
+ todo_app = create_todo_app(
+ allowed_origins=allowed_origins,
+ )
+
# Mount the sub-applications
self.app.mount("/api/str_replace", str_replace_app)
self.app.mount("/api/terminal", terminal_app)
+ self.app.mount("/api/filesystem", filesystem_app)
+ self.app.mount("/api/todo", todo_app)
# Setup main routes
self._setup_routes()
@@ -86,6 +100,8 @@ async def health_check():
"services": {
"str_replace": "available at /api/str_replace/",
"terminal": "available at /api/terminal/",
+ "filesystem": "available at /api/filesystem/",
+ "todo": "available at /api/todo/",
},
}
@@ -125,6 +141,29 @@ async def root():
"/api/terminal/shell_kill_process",
],
},
+ "filesystem": {
+ "description": "File system operations",
+ "base_path": "/api/filesystem",
+ "endpoints": [
+ "/api/filesystem/health",
+ "/api/filesystem/read_file",
+ "/api/filesystem/edit_file",
+ "/api/filesystem/write_file",
+ "/api/filesystem/multi_edit",
+ "/api/filesystem/ls",
+ "/api/filesystem/glob",
+ "/api/filesystem/grep",
+ ],
+ },
+ "todo": {
+ "description": "Todo operations",
+ "base_path": "/api/todo",
+ "endpoints": [
+ "/api/todo/health",
+ "/api/todo/todo_read",
+ "/api/todo/todo_write",
+ ],
+ },
},
}
@@ -227,6 +266,7 @@ def main():
logger.info(f"Starting Combined Sandbox Server on {args.host}:{args.port}")
logger.info("String replace operations available at /api/str_replace/")
logger.info("Terminal operations available at /api/terminal/")
+ logger.info("File system operations available at /api/filesystem/")
server.run(host=args.host, port=args.port)
diff --git a/src/ii_agent/utils/tool_client/server/filesystem_server.py b/src/ii_agent/utils/tool_client/server/filesystem_server.py
new file mode 100644
index 00000000..73e98dd1
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/server/filesystem_server.py
@@ -0,0 +1,270 @@
+"""FastAPI server for file system operations using FileSystemManager."""
+
+import logging
+from typing import Optional, List, Dict, Any
+
+from fastapi import FastAPI, HTTPException, Request
+from fastapi.responses import JSONResponse
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel, Field
+import uvicorn
+
+from ..manager.filesystem_manager import FileSystemManager
+
+logger = logging.getLogger(__name__)
+
+
+# Pydantic models for request/response validation
+class FileSystemServerResponse(BaseModel):
+ success: bool = Field(..., description="Whether the operation was successful")
+ file_content: str = Field(..., description="File content or error message")
+
+
+class ReadFileRequest(BaseModel):
+ file_path: str = Field(..., description="The absolute path to the file to read")
+ limit: Optional[int] = Field(None, description="The number of lines to read")
+ offset: Optional[int] = Field(None, description="The line number to start reading from")
+
+
+class EditFileRequest(BaseModel):
+ file_path: str = Field(..., description="The absolute path to the file to modify")
+ old_string: str = Field(..., description="The text to replace")
+ new_string: str = Field(..., description="The text to replace it with")
+ replace_all: bool = Field(False, description="Replace all occurrences of old_string")
+
+
+class WriteFileRequest(BaseModel):
+ file_path: str = Field(..., description="The absolute path to the file to write")
+ content: str = Field(..., description="The content to write to the file")
+
+
+class MultiEditRequest(BaseModel):
+ file_path: str = Field(..., description="The absolute path to the file to modify")
+ edits: List[Dict[str, Any]] = Field(..., description="Array of edit operations")
+
+
+class LSRequest(BaseModel):
+ path: str = Field(..., description="The absolute path to the directory to list")
+ ignore: Optional[List[str]] = Field(None, description="List of glob patterns to ignore")
+
+
+class GlobRequest(BaseModel):
+ pattern: str = Field(..., description="The glob pattern to match files against")
+ path: Optional[str] = Field(None, description="The directory to search in")
+
+
+class GrepRequest(BaseModel):
+ pattern: str = Field(..., description="The regular expression pattern to search for")
+ path: Optional[str] = Field(None, description="The directory to search in")
+ include: Optional[str] = Field(None, description="File pattern to include in the search")
+
+
+class FileSystemServer:
+ """FastAPI server for file system operations."""
+
+ def __init__(
+ self,
+ workspace_path: str = "/workspace",
+ allowed_origins: Optional[List[str]] = None,
+ ):
+ self.app = FastAPI(
+ title="File System Server",
+ description="HTTP API for file system operations using FileSystemManager",
+ version="1.0.0",
+ )
+
+ # Initialize filesystem manager
+ self.filesystem_manager = FileSystemManager(workspace_path)
+
+ # Add CORS middleware
+ if allowed_origins is None:
+ allowed_origins = ["*"]
+
+ self.app.add_middleware(
+ CORSMiddleware,
+ allow_origins=allowed_origins,
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+ )
+
+ # Setup routes
+ self._setup_routes()
+
+ # Setup exception handlers
+ self._setup_exception_handlers()
+
+ def _setup_routes(self):
+ """Setup all API routes."""
+
+ @self.app.get("/health")
+ async def health_check():
+ """Health check endpoint."""
+ return {"status": "ok", "message": "File System Server is running"}
+
+ @self.app.post("/read_file", response_model=FileSystemServerResponse)
+ async def read_file(request: ReadFileRequest):
+ """Read file contents."""
+ try:
+ response = self.filesystem_manager.read_file(
+ request.file_path, request.limit, request.offset
+ )
+ return FileSystemServerResponse(
+ success=response.success, file_content=response.file_content
+ )
+ except Exception as e:
+ logger.error(f"Error in read_file: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+ @self.app.post("/edit_file", response_model=FileSystemServerResponse)
+ async def edit_file(request: EditFileRequest):
+ """Edit file contents."""
+ try:
+ response = self.filesystem_manager.edit_file(
+ request.file_path,
+ request.old_string,
+ request.new_string,
+ request.replace_all,
+ )
+ return FileSystemServerResponse(
+ success=response.success, file_content=response.file_content
+ )
+ except Exception as e:
+ logger.error(f"Error in edit_file: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+ @self.app.post("/write_file", response_model=FileSystemServerResponse)
+ async def write_file(request: WriteFileRequest):
+ """Write file contents."""
+ try:
+ response = self.filesystem_manager.write_file(
+ request.file_path, request.content
+ )
+ return FileSystemServerResponse(
+ success=response.success, file_content=response.file_content
+ )
+ except Exception as e:
+ logger.error(f"Error in write_file: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+ @self.app.post("/multi_edit", response_model=FileSystemServerResponse)
+ async def multi_edit(request: MultiEditRequest):
+ """Perform multiple edits on a file."""
+ try:
+ response = self.filesystem_manager.multi_edit(
+ request.file_path, request.edits
+ )
+ return FileSystemServerResponse(
+ success=response.success, file_content=response.file_content
+ )
+ except Exception as e:
+ logger.error(f"Error in multi_edit: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+ @self.app.post("/ls", response_model=FileSystemServerResponse)
+ async def ls(request: LSRequest):
+ """List directory contents."""
+ try:
+ response = self.filesystem_manager.ls(request.path, request.ignore)
+ return FileSystemServerResponse(
+ success=response.success, file_content=response.file_content
+ )
+ except Exception as e:
+ logger.error(f"Error in ls: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+ @self.app.post("/glob", response_model=FileSystemServerResponse)
+ async def glob(request: GlobRequest):
+ """Search for files using glob patterns."""
+ try:
+ response = self.filesystem_manager.glob(request.pattern, request.path)
+ return FileSystemServerResponse(
+ success=response.success, file_content=response.file_content
+ )
+ except Exception as e:
+ logger.error(f"Error in glob: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+ @self.app.post("/grep", response_model=FileSystemServerResponse)
+ async def grep(request: GrepRequest):
+ """Search for content using regex patterns."""
+ try:
+ response = self.filesystem_manager.grep(
+ request.pattern, request.path, request.include
+ )
+ return FileSystemServerResponse(
+ success=response.success, file_content=response.file_content
+ )
+ except Exception as e:
+ logger.error(f"Error in grep: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+ def _setup_exception_handlers(self):
+ """Setup global exception handlers."""
+
+ @self.app.exception_handler(Exception)
+ async def global_exception_handler(request: Request, exc: Exception):
+ logger.error(f"Unhandled exception: {exc}")
+ return JSONResponse(
+ status_code=500,
+ content={
+ "success": False,
+ "file_content": f"Internal server error: {str(exc)}",
+ },
+ )
+
+ def run(self, host: str = "0.0.0.0", port: int = 8002, **kwargs):
+ """Run the FastAPI server."""
+ uvicorn.run(self.app, host=host, port=port, **kwargs)
+
+
+def create_app(
+ workspace_path: str = "/workspace",
+ allowed_origins: Optional[List[str]] = None,
+ cwd: Optional[str] = None,
+) -> FastAPI:
+ """Factory function to create the FastAPI app."""
+ if cwd:
+ workspace_path = cwd
+ server = FileSystemServer(
+ workspace_path=workspace_path,
+ allowed_origins=allowed_origins,
+ )
+ return server.app
+
+
+def main():
+ """Main entry point for running the server."""
+ import argparse
+
+ parser = argparse.ArgumentParser(description="File System Server")
+ parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
+ parser.add_argument("--port", type=int, default=8002, help="Port to bind to")
+ parser.add_argument("--workspace", default="/workspace", help="Workspace path")
+ parser.add_argument(
+ "--log-level",
+ default="INFO",
+ choices=["DEBUG", "INFO", "WARNING", "ERROR"],
+ help="Logging level",
+ )
+
+ args = parser.parse_args()
+
+ # Setup logging
+ logging.basicConfig(
+ level=getattr(logging, args.log_level),
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+ )
+
+ # Create and run server
+ server = FileSystemServer(
+ workspace_path=args.workspace,
+ )
+
+ logger.info(f"Starting File System Server on {args.host}:{args.port}")
+ logger.info(f"Workspace path: {args.workspace}")
+ server.run(host=args.host, port=args.port)
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/server/todo_server.py b/src/ii_agent/utils/tool_client/server/todo_server.py
new file mode 100644
index 00000000..a67404e0
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/server/todo_server.py
@@ -0,0 +1,222 @@
+"""FastAPI server for file system operations using FileSystemManager."""
+
+import logging
+from typing import Optional, List, Dict, Any, Annotated
+from threading import Lock
+
+from fastapi import FastAPI, HTTPException, Request
+from fastapi.responses import JSONResponse
+from fastapi.middleware.cors import CORSMiddleware
+from pydantic import BaseModel, Field
+import uvicorn
+
+
+
+
+class TodoManager:
+ """Manages the todo list state across productivity tools."""
+
+ def __init__(self):
+ self._todos: List[Dict[str, Any]] = []
+ self._lock = Lock()
+
+ def get_todos(self) -> List[Dict[str, Any]]:
+ """Get the current list of todos."""
+ with self._lock:
+ return self._todos.copy()
+
+ def set_todos(self, todos: List[Dict[str, Any]]) -> None:
+ """Set the entire todo list."""
+ # Validate todo structure
+ for todo in todos:
+ if not isinstance(todo, dict):
+ raise ValueError("Each todo must be a dictionary")
+
+ # Required fields
+ if 'content' not in todo:
+ raise ValueError("Each todo must have a 'content' field")
+ if 'status' not in todo:
+ raise ValueError("Each todo must have a 'status' field")
+ if 'priority' not in todo:
+ raise ValueError("Each todo must have a 'priority' field")
+ if 'id' not in todo:
+ raise ValueError("Each todo must have an 'id' field")
+
+ # Validate status
+ if todo['status'] not in ['pending', 'in_progress', 'completed']:
+ raise ValueError(f"Invalid status '{todo['status']}'. Must be 'pending', 'in_progress', or 'completed'")
+
+ # Validate priority
+ if todo['priority'] not in ['high', 'medium', 'low']:
+ raise ValueError(f"Invalid priority '{todo['priority']}'. Must be 'high', 'medium', or 'low'")
+
+ # Ensure content is not empty
+ if not todo['content'].strip():
+ raise ValueError("Todo content cannot be empty")
+
+ # Ensure only one task is in_progress
+ in_progress_count = sum(1 for todo in todos if todo['status'] == 'in_progress')
+ if in_progress_count > 1:
+ raise ValueError("Only one task can be in_progress at a time")
+
+ with self._lock:
+ self._todos = [todo.copy() for todo in todos]
+
+ def clear_todos(self) -> None:
+ """Clear all todos."""
+ with self._lock:
+ self._todos = []
+
+
+# Global instance to be shared across tools
+_global_manager: TodoManager | None = None
+
+
+def get_todo_manager() -> TodoManager:
+ """Get the global todo manager instance."""
+ global _global_manager
+ if _global_manager is None:
+ _global_manager = TodoManager()
+ return _global_manager
+
+
+class TodoReadTool:
+ def run_impl(self):
+ """Read and return the current todo list."""
+ manager = get_todo_manager()
+ todos = manager.get_todos()
+
+ if not todos:
+ return "No todos found"
+
+ return f"Remember to continue to use update and read from the todo list as you make progress. Here is the current list: {todos}"
+
+
+class TodoWriteTool:
+ def run_impl(
+ self,
+ todos: Annotated[List[Dict[str, Any]], Field(description="The updated todo list. Each todo should have `content`, `status` (one of 'pending', 'in_progress', 'completed'), `priority` (one of 'low', 'medium', 'high'), and `id` (starts from 1) keys.")],
+ ):
+ """Write/update the todo list."""
+ manager = get_todo_manager()
+
+ try:
+ # Set the new todo list (validation happens inside set_todos)
+ manager.set_todos(todos)
+
+ # Return the updated list
+ return "Todos have been modified successfully. Ensure that you continue to use the todo list to track your progress. Please proceed with the current tasks if applicable"
+ except ValueError as e:
+ return f"Error updating todo list: {e}"
+
+class TodoWriteSchema(BaseModel):
+ todos: List[Dict[str, Any]]
+
+class TodoServer:
+ """FastAPI server for todo operations."""
+
+ def __init__(
+ self,
+ allowed_origins: Optional[List[str]] = None,
+ ):
+ self.app = FastAPI(
+ title="Todo Server",
+ description="HTTP API for todo operations",
+ version="1.0.0",
+ )
+
+ # Add CORS middleware
+ if allowed_origins is None:
+ allowed_origins = ["*"]
+
+ self.app.add_middleware(
+ CORSMiddleware,
+ allow_origins=allowed_origins,
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+ )
+
+ # Setup routes
+ self._setup_routes()
+
+ self.todo_read_tool = TodoReadTool()
+ self.todo_write_tool = TodoWriteTool()
+
+ def _setup_routes(self):
+ """Setup all API routes."""
+
+ @self.app.get("/health")
+ async def health_check():
+ """Health check endpoint."""
+ return {"status": "ok", "message": "Todo Server is running"}
+
+ @self.app.get("/todo_read")
+ async def todo_read():
+ """Read the current todo list."""
+ return {"message": self.todo_read_tool.run_impl()}
+
+ # Input a list of dictionaries
+ @self.app.post("/todo_write")
+ async def todo_write(todo_write_schema: TodoWriteSchema):
+ """
+ Update the todo list with a new list of todos.
+ Expects a JSON body: a list of todo dicts, each with 'content', 'status', 'priority', and 'id'.
+ """
+ try:
+ todos = todo_write_schema.todos
+ if not isinstance(todos, list):
+ raise ValueError("Request body must be a list of todos")
+ message = self.todo_write_tool.run_impl(todos)
+ return {"message": message}
+ except Exception as e:
+ return JSONResponse(
+ status_code=400,
+ content={"error": f"Failed to update todos: {e}"},
+ )
+
+
+ def run(self, host: str = "0.0.0.0", port: int = 8002, **kwargs):
+ """Run the FastAPI server."""
+ uvicorn.run(self.app, host=host, port=port, **kwargs)
+
+
+def create_app(
+ allowed_origins: Optional[List[str]] = None,
+) -> FastAPI:
+ """Factory function to create the FastAPI app."""
+ server = TodoServer(
+ allowed_origins=allowed_origins,
+ )
+ return server.app
+
+
+def main():
+ """Main entry point for running the server."""
+ import argparse
+
+ parser = argparse.ArgumentParser(description="Todo Server")
+ parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
+ parser.add_argument("--port", type=int, default=8003, help="Port to bind to")
+ parser.add_argument(
+ "--log-level",
+ default="INFO",
+ choices=["DEBUG", "INFO", "WARNING", "ERROR"],
+ help="Logging level",
+ )
+
+ args = parser.parse_args()
+
+ # Setup logging
+ logging.basicConfig(
+ level=getattr(logging, args.log_level),
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+ )
+
+ # Create and run server
+ server = TodoServer()
+ server.run(host=args.host, port=args.port)
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/__init__.py b/src/ii_agent/utils/tool_client/src/__init__.py
new file mode 100644
index 00000000..ec053dcd
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/__init__.py
@@ -0,0 +1 @@
+# Empty __init__.py file for Python module recognition
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/core/__init__.py b/src/ii_agent/utils/tool_client/src/core/__init__.py
new file mode 100644
index 00000000..ec053dcd
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/core/__init__.py
@@ -0,0 +1 @@
+# Empty __init__.py file for Python module recognition
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/core/workspace.py b/src/ii_agent/utils/tool_client/src/core/workspace.py
new file mode 100644
index 00000000..bd08d0dc
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/core/workspace.py
@@ -0,0 +1,23 @@
+"""Simple workspace manager for Docker environment."""
+
+import os
+from pathlib import Path
+
+
+class WorkspaceManager:
+ """Simple workspace manager that validates file paths within a workspace boundary."""
+
+ def __init__(self, workspace_path="/workspace"):
+ self.workspace_path = os.path.abspath(workspace_path)
+
+ def get_workspace_path(self):
+ """Return the workspace path."""
+ return self.workspace_path
+
+ def validate_boundary(self, path):
+ """Check if the given path is within the workspace boundary."""
+ try:
+ abs_path = os.path.abspath(path)
+ return abs_path.startswith(self.workspace_path)
+ except Exception:
+ return False
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/tools/__init__.py b/src/ii_agent/utils/tool_client/src/tools/__init__.py
new file mode 100644
index 00000000..ec053dcd
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/tools/__init__.py
@@ -0,0 +1 @@
+# Empty __init__.py file for Python module recognition
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/tools/base.py b/src/ii_agent/utils/tool_client/src/tools/base.py
new file mode 100644
index 00000000..e5fe8fc6
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/tools/base.py
@@ -0,0 +1,15 @@
+"""Base tool class for external tools."""
+
+from abc import ABC, abstractmethod
+
+
+class BaseTool(ABC):
+ """Base class for tools."""
+
+ name = "base_tool"
+ description = "Base tool"
+
+ @abstractmethod
+ def run_impl(self, **kwargs):
+ """Implementation of the tool."""
+ pass
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/tools/file_system/__init__.py b/src/ii_agent/utils/tool_client/src/tools/file_system/__init__.py
new file mode 100644
index 00000000..b3e0709a
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/tools/file_system/__init__.py
@@ -0,0 +1 @@
+# File system tools module
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/tools/file_system/base.py b/src/ii_agent/utils/tool_client/src/tools/file_system/base.py
new file mode 100644
index 00000000..095ba2b7
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/tools/file_system/base.py
@@ -0,0 +1,60 @@
+"""Base classes for file system tools - Docker compatible version."""
+
+import os
+from abc import ABC, abstractmethod
+
+
+class FileSystemValidationError(Exception):
+ """Custom exception for file system validation errors."""
+ pass
+
+
+class BaseFileSystemTool:
+ """Base class for file system tools."""
+
+ def __init__(self, workspace_manager):
+ self.workspace_manager = workspace_manager
+
+ def validate_path(self, path: str) -> None:
+ """Validate that path is absolute and within workspace boundary.
+
+ Raises:
+ FileSystemValidationError
+ """
+ if not path.strip():
+ raise FileSystemValidationError("Path cannot be empty")
+
+ if not os.path.isabs(path):
+ raise FileSystemValidationError(f"Path `{path}` is not absolute")
+
+ workspace_path = self.workspace_manager.get_workspace_path()
+ if not self.workspace_manager.validate_boundary(path):
+ raise FileSystemValidationError(f"Path `{path}` is not within workspace boundary `{workspace_path}`")
+
+ def validate_existing_file_path(self, file_path: str) -> None:
+ """Validate that file_path exists and is a file.
+
+ Raises:
+ FileSystemValidationError
+ """
+ self.validate_path(file_path)
+
+ if not os.path.exists(file_path):
+ raise FileSystemValidationError(f"File `{file_path}` does not exist")
+
+ if not os.path.isfile(file_path):
+ raise FileSystemValidationError(f"Path `{file_path}` exists but is not a file")
+
+ def validate_existing_directory_path(self, directory_path: str) -> None:
+ """Validate that directory_path exists and is a directory.
+
+ Raises:
+ FileSystemValidationError
+ """
+ self.validate_path(directory_path)
+
+ if not os.path.exists(directory_path):
+ raise FileSystemValidationError(f"Directory `{directory_path}` does not exist")
+
+ if not os.path.isdir(directory_path):
+ raise FileSystemValidationError(f"Path `{directory_path}` exists but is not a directory")
\ No newline at end of file
diff --git a/src/ii_agent/utils/tool_client/src/tools/file_system/utils.py b/src/ii_agent/utils/tool_client/src/tools/file_system/utils.py
new file mode 100644
index 00000000..7a762f1b
--- /dev/null
+++ b/src/ii_agent/utils/tool_client/src/tools/file_system/utils.py
@@ -0,0 +1,48 @@
+"""Utility functions for file system tools - Docker compatible version."""
+
+import base64
+import os
+from pathlib import Path
+
+
+def encode_image(image_path):
+ """Encode image file to base64."""
+ try:
+ if image_path.startswith('http'):
+ # Handle URL-based images
+ import urllib.request
+ with urllib.request.urlopen(image_path) as response:
+ image_data = response.read()
+ else:
+ # Handle local file images
+ with open(image_path, 'rb') as image_file:
+ image_data = image_file.read()
+
+ # Encode to base64
+ base64_encoded = base64.b64encode(image_data).decode('utf-8')
+ return base64_encoded
+
+ except Exception as e:
+ raise Exception(f"Failed to encode image {image_path}: {str(e)}")
+
+
+def find_similar_file(file_path, workspace_path):
+ """Find files with similar names but different extensions."""
+ try:
+ file_path = Path(file_path)
+ workspace_path = Path(workspace_path)
+
+ # Get the stem (filename without extension)
+ stem = file_path.stem
+
+ # Search for files with the same stem but different extensions
+ similar_files = []
+ for ext in ['.txt', '.md', '.py', '.js', '.json', '.yaml', '.yml']:
+ potential_file = file_path.parent / (stem + ext)
+ if potential_file.exists() and potential_file != file_path:
+ similar_files.append(str(potential_file))
+
+ return similar_files
+
+ except Exception:
+ return []
\ No newline at end of file
diff --git a/start.sh b/start.sh
index c1833a86..2db7e857 100755
--- a/start.sh
+++ b/start.sh
@@ -21,7 +21,6 @@ GOOGLE_API_KEY=$GOOGLE_API_KEY
GOOGLE_CLIENT_ID=$GOOGLE_CLIENT_ID
GOOGLE_CLIENT_SECRET=$GOOGLE_CLIENT_SECRET
EOF
-
echo "Created frontend/.env file"
else
echo "[✓] Host IP set to: $HOST_IP"