diff --git a/.gitignore b/.gitignore index 93d1656..b4787a0 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ state.json.bak logs/ .codex .copilot +.env diff --git a/README.md b/README.md index e34e708..edc7669 100644 --- a/README.md +++ b/README.md @@ -258,6 +258,14 @@ GitHub documents these Copilot CLI approval controls here: If `true`, enable the `/commit` Telegram command. Default: `false` +- `AGENT_HARD_TIMEOUT_SECONDS` + Hard time limit in seconds for a single agent run. + When the agent subprocess is still running after this many seconds, the bot sends a timeout message and terminates the process. + Set to `0` (the default) to disable the limit entirely. + Disabling is recommended for Copilot, which can legitimately take over an hour on complex tasks. + Set a value (e.g. `600`) only when you want a hard cap — typically for shorter, bounded Codex jobs. + Default: `0` (disabled) + ### Telegram Behavior - `SNAPSHOT_TEXT_FILE_MAX_BYTES` @@ -357,7 +365,18 @@ Each session stores: - timestamps - active session selection for that bot/chat scope -## ⚠️ Diff (file changes) +### Workspace concurrency lock + +Only one agent run can be active per **project folder** at a time — regardless of which chat ID or Telegram bot triggers it. + +If a message arrives while an agent is already running on the same project, the bot immediately replies: + +> ⏳ An agent is already running on project '…'. Please wait for it to finish. + +The lock is held in memory (not on disk), so it is automatically released when the agent finishes, errors out, or if the server restarts. There are no stale lock files to clean up after a crash. + +## ⚠ Diff (file changes) + _During each agent run, the bot also takes a lightweight before/after project snapshot so it can summarize changed files and send diffs back to Telegram. This snapshot is taken by the bot app itself, not by Codex or Copilot._ **Snapshot notes:** @@ -418,11 +437,16 @@ The chosen branch is stored with the session, so switching sessions restores the ## 🪵 Logs -Logs are written under `LOG_DIR`. +Logs are written to **both stdout and a rotating log file** under `LOG_DIR`. Main log file: -- `coding-agent-telegram.log` +- `coding-agent-telegram.log` (rotated at 10 MB, 3 backups kept) + +> **Note:** Because messages go to both stdout and the log file, watching the terminal +> **and** tailing the log file at the same time (e.g. `tail -f logs/coding-agent-telegram.log`) +> will make each message appear twice — once from each sink. This is expected behavior. +> View one or the other, not both simultaneously. Typical logged events: @@ -431,7 +455,7 @@ Typical logged events: - session creation - session switching - active session reporting -- normal run execution +- normal run execution (includes an audit log line with the truncated prompt) - session replacement after resume failure - warnings and runtime errors diff --git a/src/coding_agent_telegram/agent_runner.py b/src/coding_agent_telegram/agent_runner.py index 39180cc..26536f2 100644 --- a/src/coding_agent_telegram/agent_runner.py +++ b/src/coding_agent_telegram/agent_runner.py @@ -3,6 +3,7 @@ import json import logging import os +import re import subprocess import tempfile import threading @@ -16,6 +17,27 @@ AssistantEvent = Union[dict[str, Any], list[Any], str] +# Only allow alphanumeric, hyphens, underscores, and dots — must not start with "-" +# to prevent a crafted LLM session_id from injecting CLI flags. +_SESSION_ID_RE = re.compile(r"^[A-Za-z0-9_.\-]{1,128}$") + + +def _validate_session_id(raw: str) -> Optional[str]: + """Return ``raw`` only if it looks like a legitimate session identifier. + + Rejects values that could be used to inject CLI flags (e.g. ``--exec``) + into subsequent subprocess calls that pass the session ID as an argument. + """ + if not raw or not isinstance(raw, str): + return None + if raw.startswith("-"): + logger.warning("Rejected session_id starting with '-' from agent output: %r", raw[:64]) + return None + if _SESSION_ID_RE.match(raw): + return raw + logger.warning("Rejected suspicious session_id from agent output: %r", raw[:64]) + return None + @dataclass class AgentRunResult: @@ -69,6 +91,7 @@ def __init__( copilot_allow_tools: tuple[str, ...] = (), copilot_deny_tools: tuple[str, ...] = (), copilot_available_tools: tuple[str, ...] = (), + hard_timeout_seconds: int = 0, ) -> None: self.codex_bin = codex_bin self.copilot_bin = copilot_bin @@ -83,6 +106,8 @@ def __init__( self.copilot_allow_tools = tuple(tool.strip() for tool in copilot_allow_tools if tool.strip()) self.copilot_deny_tools = tuple(tool.strip() for tool in copilot_deny_tools if tool.strip()) self.copilot_available_tools = tuple(tool.strip() for tool in copilot_available_tools if tool.strip()) + # 0 = disabled. When > 0, the agent subprocess is killed after this many seconds. + self.hard_timeout_seconds = max(0, int(hard_timeout_seconds)) def _looks_textual_key(self, key: str) -> bool: normalized = key.strip().lower() @@ -185,7 +210,9 @@ def _parse_jsonl( for ev in events: for key in ("session_id", "thread_id", "sessionId", "threadId"): if isinstance(ev.get(key), str): - session_id = ev[key] + validated = _validate_session_id(ev[key]) + if validated: + session_id = validated extracted_text = assistant_text_extractor(ev) if extracted_text: assistant_text = extracted_text @@ -323,6 +350,27 @@ def read_stream(stream, chunks: list[str], *, is_stderr: bool) -> None: stdout_thread.start() stderr_thread.start() + # Watchdog: kills the process if it exceeds the hard timeout. + # Uses threading.Event (not time.monotonic or proc.poll) so it does not + # interfere with existing stall-detection logic or test mocks. + # A timeout of 0 means disabled — the process can run indefinitely. + _proc_exited = threading.Event() + _watchdog_timeout = self.hard_timeout_seconds if self.hard_timeout_seconds > 0 else None + + def _watchdog() -> None: + if not _proc_exited.wait(timeout=_watchdog_timeout): + logger.warning( + "Agent command exceeded hard timeout of %ds; terminating process.", + _watchdog_timeout, + ) + try: + proc.kill() + except OSError: + pass + + watchdog_thread = threading.Thread(target=_watchdog, daemon=True, name="agent-watchdog") + watchdog_thread.start() + stall_reported = False while proc.poll() is None: if on_stall and not stall_reported: @@ -353,6 +401,8 @@ def read_stream(stream, chunks: list[str], *, is_stderr: bool) -> None: stdout_thread.join() stderr_thread.join() + _proc_exited.set() + watchdog_thread.join() stdout = "".join(stdout_chunks) stderr = "".join(stderr_chunks) if provider == "codex": diff --git a/src/coding_agent_telegram/bot.py b/src/coding_agent_telegram/bot.py index 6ee336f..7236203 100644 --- a/src/coding_agent_telegram/bot.py +++ b/src/coding_agent_telegram/bot.py @@ -7,6 +7,7 @@ from telegram.ext import Application, CallbackQueryHandler, CommandHandler, MessageHandler, filters as tg_filters from coding_agent_telegram.command_router import CommandRouter +from coding_agent_telegram.session_store import SessionStoreError logger = logging.getLogger(__name__) @@ -45,6 +46,14 @@ async def initialize_bot_commands(app: Application, *, enable_commit_command: bo async def handle_error(update, context) -> None: + if isinstance(context.error, SessionStoreError): + logger.warning("Session store lock conflict: %s", context.error) + if update is not None and getattr(update, "effective_chat", None) is not None: + await context.bot.send_message( + chat_id=update.effective_chat.id, + text=f"⚠️ {context.error}", + ) + return logger.exception("Telegram handler failed.", exc_info=context.error) if update is not None and getattr(update, "effective_chat", None) is not None: await context.bot.send_message( diff --git a/src/coding_agent_telegram/cli.py b/src/coding_agent_telegram/cli.py index 3e8ca8a..a9c1597 100644 --- a/src/coding_agent_telegram/cli.py +++ b/src/coding_agent_telegram/cli.py @@ -125,6 +125,7 @@ def main() -> None: copilot_allow_tools=cfg.copilot_allow_tools, copilot_deny_tools=cfg.copilot_deny_tools, copilot_available_tools=cfg.copilot_available_tools, + hard_timeout_seconds=cfg.agent_hard_timeout_seconds, ) try: asyncio.run(_run(cfg, store, runner)) diff --git a/src/coding_agent_telegram/config.py b/src/coding_agent_telegram/config.py index 28fb1c4..1efbf17 100644 --- a/src/coding_agent_telegram/config.py +++ b/src/coding_agent_telegram/config.py @@ -14,6 +14,8 @@ DEFAULT_MAX_PHOTO_ATTACHMENT_BYTES = 5 * 1024 * 1024 DEFAULT_ENV_FILE_NAME = ".env_coding_agent_telegram" LEGACY_ENV_FILE_NAME = ".env" +# 0 = disabled. Set to a positive value to kill runaway agent processes. +DEFAULT_AGENT_HARD_TIMEOUT_SECONDS = 0 @dataclass(frozen=True) @@ -44,6 +46,7 @@ class AppConfig: max_telegram_message_length: int enable_sensitive_diff_filter: bool default_agent_provider: str + agent_hard_timeout_seconds: int def _parse_bool(value: str, default: bool = False) -> bool: @@ -141,4 +144,7 @@ def load_config(env_file: Optional[Path] = None) -> AppConfig: ), enable_sensitive_diff_filter=_parse_bool(os.getenv("ENABLE_SENSITIVE_DIFF_FILTER", "true"), default=True), default_agent_provider=provider, + agent_hard_timeout_seconds=int( + os.getenv("AGENT_HARD_TIMEOUT_SECONDS", str(DEFAULT_AGENT_HARD_TIMEOUT_SECONDS)) + ), ) diff --git a/src/coding_agent_telegram/diff_utils.py b/src/coding_agent_telegram/diff_utils.py index 2fc4281..558aeb7 100644 --- a/src/coding_agent_telegram/diff_utils.py +++ b/src/coding_agent_telegram/diff_utils.py @@ -3,6 +3,7 @@ import difflib import fnmatch import importlib.resources +import logging import os import subprocess from functools import lru_cache @@ -15,6 +16,8 @@ INTERNAL_APP_DIR = ".coding-agent-telegram" TEXTUAL_DIFF_UNAVAILABLE = "Binary or large file changed; textual diff unavailable." +logger = logging.getLogger(__name__) + @dataclass class FileDiff: @@ -116,7 +119,8 @@ def is_snapshot_excluded_path(path: str) -> bool: def _read_snapshot_text(file_path: Path, *, max_text_file_bytes: int) -> Optional[str]: try: data = file_path.read_bytes() - except OSError: + except OSError as exc: + logger.debug("Could not read snapshot file %s: %s", file_path, exc) return None if len(data) > max_text_file_bytes: return None diff --git a/src/coding_agent_telegram/git_utils.py b/src/coding_agent_telegram/git_utils.py index ea6a6f5..e923045 100644 --- a/src/coding_agent_telegram/git_utils.py +++ b/src/coding_agent_telegram/git_utils.py @@ -10,15 +10,7 @@ def _sanitize_git_output(text: str) -> str: - """ - Sanitize git command output to remove credential information. - - Removes: - - HTTPS URLs with embedded credentials: https://user:password@ - - SSH connection strings that might contain sensitive info - - Security-critical function: Prevents credential leaks in logs. - """ + """Sanitize git command output to remove credential information.""" if not text: return text @@ -37,6 +29,20 @@ def _sanitize_git_output(text: str) -> str: return sanitized +# Allows letters, digits, dots, hyphens, forward-slashes (remote/branch). +# Must not start with '-' to prevent flag injection into git subcommands. +_BRANCH_NAME_RE = re.compile(r"^[A-Za-z0-9._/\-]{1,200}$") + + +def _validate_branch_name(name: str) -> bool: + """Return True only if *name* is safe to pass as a git branch name argument.""" + if not name: + return False + if name.startswith("-"): + return False + return bool(_BRANCH_NAME_RE.match(name)) + + @dataclass class GitCommandResult: success: bool @@ -157,6 +163,8 @@ def refresh_current_branch(self, project_path: Path) -> BranchOperationResult: ) def checkout_branch(self, project_path: Path, branch_name: str) -> BranchOperationResult: + if not _validate_branch_name(branch_name): + return BranchOperationResult(False, f"Invalid branch name: {branch_name!r}") result = self._run(project_path, "checkout", branch_name) if result.returncode != 0: return BranchOperationResult(False, _sanitize_git_output(result.stderr.strip()) or f"Failed to checkout branch: {branch_name}") @@ -171,6 +179,10 @@ def prepare_branch( ) -> BranchOperationResult: if not self.is_git_repo(project_path): return BranchOperationResult(False, "Current project is not a git repository.") + if not _validate_branch_name(new_branch): + return BranchOperationResult(False, f"Invalid branch name: {new_branch!r}") + if origin_branch and not _validate_branch_name(origin_branch): + return BranchOperationResult(False, f"Invalid origin branch name: {origin_branch!r}") current_branch = self.current_branch(project_path) default_branch = self.default_branch(project_path) diff --git a/src/coding_agent_telegram/logging_utils.py b/src/coding_agent_telegram/logging_utils.py index f5137a9..2e946a7 100644 --- a/src/coding_agent_telegram/logging_utils.py +++ b/src/coding_agent_telegram/logging_utils.py @@ -1,17 +1,32 @@ import logging import sys +from logging.handlers import RotatingFileHandler from pathlib import Path def setup_logging(level: str, log_dir: Path) -> Path: log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / "coding-agent-telegram.log" - logging.basicConfig( - level=getattr(logging, level.upper(), logging.INFO), - format="%(asctime)s %(levelname)s %(name)s: %(message)s", - handlers=[logging.StreamHandler(sys.stdout)], - force=True, + log_level = getattr(logging, level.upper(), logging.INFO) + fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s") + + root = logging.getLogger() + root.setLevel(log_level) + root.handlers.clear() + + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter(fmt) + root.addHandler(stdout_handler) + + file_handler = RotatingFileHandler( + log_file, + maxBytes=10 * 1024 * 1024, + backupCount=3, + encoding="utf-8", ) + file_handler.setFormatter(fmt) + root.addHandler(file_handler) + for logger_name in ("httpx", "httpcore", "telegram", "telegram.ext.ExtBot"): logging.getLogger(logger_name).setLevel(logging.WARNING) return log_file diff --git a/src/coding_agent_telegram/resources/.env.example b/src/coding_agent_telegram/resources/.env.example index 8f5f604..076e421 100644 --- a/src/coding_agent_telegram/resources/.env.example +++ b/src/coding_agent_telegram/resources/.env.example @@ -91,3 +91,12 @@ ENABLE_SENSITIVE_DIFF_FILTER=true # Default agent provider for new sessions: codex or copilot. DEFAULT_AGENT_PROVIDER=codex + +# Hard time limit (seconds) for a single agent run. +# When the subprocess is still running after this many seconds the bot sends +# a timeout message and terminates the agent. +# Set to 0 (the default) to disable the limit entirely — recommended when +# using Copilot, which can legitimately take over an hour for heavy tasks. +# Useful as a safeguard when using Codex for shorter, bounded jobs. +# Example: AGENT_HARD_TIMEOUT_SECONDS=600 (10 minutes) +AGENT_HARD_TIMEOUT_SECONDS=0 diff --git a/src/coding_agent_telegram/router/base.py b/src/coding_agent_telegram/router/base.py index adecb95..74e85b1 100644 --- a/src/coding_agent_telegram/router/base.py +++ b/src/coding_agent_telegram/router/base.py @@ -114,6 +114,11 @@ def __init__(self, deps: RouterDeps) -> None: git=self.git, run_with_typing=self._run_with_typing, ) + # Per-workspace asyncio locks keyed by project_folder name. + # Prevents concurrent agent runs on the same workspace regardless of + # which chat ID or session triggers the run. + # These are in-memory only — no stale locks survive a server restart. + self._workspace_locks: dict[str, asyncio.Lock] = {} def _sorted_sessions(self, sessions: dict[str, dict[str, str]]) -> list[tuple[str, dict[str, str]]]: return sorted( @@ -163,6 +168,29 @@ async def _run_with_typing(self, update: Update, context: ContextTypes.DEFAULT_T if chat is None: return await asyncio.to_thread(fn, *args, **kwargs) + # workspace_lock_key is supplied by callers that invoke the agent on a + # specific project folder. When set, only one agent run per workspace is + # allowed at a time — regardless of which chat ID or session triggers it. + workspace_lock_key: str | None = kwargs.pop("workspace_lock_key", None) + is_agent_call = isinstance(getattr(fn, "__self__", None), MultiAgentRunner) + + if is_agent_call and workspace_lock_key: + lock = self._workspace_locks.setdefault(workspace_lock_key, asyncio.Lock()) + if lock.locked(): + await send_text( + update, + context, + f"⏳ An agent is already running on project '{workspace_lock_key}'. " + "Please wait for it to finish.", + ) + return None + async with lock: + return await self._execute_with_typing(update, context, chat, fn, *args, **kwargs) + + return await self._execute_with_typing(update, context, chat, fn, *args, **kwargs) + + async def _execute_with_typing(self, update: Update, context: ContextTypes.DEFAULT_TYPE, chat, fn, *args, **kwargs): + stall_message = kwargs.pop("stall_message", None) progress_label = kwargs.pop("progress_label", None) progress_state = {"message_id": None, "last_text": "", "closed": False, "futures": set()} @@ -521,6 +549,16 @@ def _path_within_project(project_path: Path, token: str) -> bool: return False if any(part == ".." for part in normalized.parts): return False + + # Resolve symlinks to prevent escaping the project root via in-project symlinks. + try: + resolved = (project_path / candidate).resolve() + project_resolved = project_path.resolve() + if not str(resolved).startswith(str(project_resolved) + os.sep) and resolved != project_resolved: + return False + except (OSError, ValueError): + return False + return True @classmethod diff --git a/src/coding_agent_telegram/router/message_commands.py b/src/coding_agent_telegram/router/message_commands.py index a37feee..09a7d0b 100644 --- a/src/coding_agent_telegram/router/message_commands.py +++ b/src/coding_agent_telegram/router/message_commands.py @@ -13,12 +13,13 @@ class MessageCommandMixin: async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: if update.message is None or not update.message.text: return + user_message = update.message.text chat_id = update.effective_chat.id self._store_pending_action( chat_id, { "kind": "message", - "user_message": update.message.text, + "user_message": user_message, }, ) if await self._continue_pending_action(update, context): @@ -37,12 +38,13 @@ async def handle_photo(self, update: Update, context: ContextTypes.DEFAULT_TYPE) await send_text(update, context, "Photo attachments are currently supported only for codex sessions.") return + caption = update.message.caption or "" try: attachment_path = await self.photo_attachments.store_photo(update, session["project_folder"]) except ValueError as exc: await send_text(update, context, str(exc)) return - prompt = self.photo_attachments.build_prompt(attachment_path, project_path, update.message.caption or "") + prompt = self.photo_attachments.build_prompt(attachment_path, project_path, caption) await self.runtime.run_active_session(update, context, user_message=prompt, image_paths=(attachment_path,)) @require_allowed_chat() diff --git a/src/coding_agent_telegram/router/session_commands.py b/src/coding_agent_telegram/router/session_commands.py index 415e0bf..341cb07 100644 --- a/src/coding_agent_telegram/router/session_commands.py +++ b/src/coding_agent_telegram/router/session_commands.py @@ -240,6 +240,7 @@ async def _create_session_for_context( provider, project_path, f"Create session: {creation_label}", + workspace_lock_key=project_folder, skip_git_repo_check=self.runtime.should_skip_git_repo_check(project_folder), stall_message=( "Session creation appears stuck.\n" diff --git a/src/coding_agent_telegram/session_runtime.py b/src/coding_agent_telegram/session_runtime.py index 52fe21c..951460b 100644 --- a/src/coding_agent_telegram/session_runtime.py +++ b/src/coding_agent_telegram/session_runtime.py @@ -4,6 +4,7 @@ import hashlib import html import logging +import re from pathlib import Path import os from typing import Awaitable, Callable, Optional, Sequence @@ -56,6 +57,33 @@ "On macOS this often means a hidden permission dialog is waiting for input on the machine running the bot." ) +# Patterns for secrets that the agent might echo back from files it has read. +# Matches are replaced with a placeholder before sending to Telegram. +_SECRET_SCRUB_PATTERNS: tuple[tuple[re.Pattern[str], str], ...] = ( + (re.compile(r"\d{9,10}:[A-Za-z0-9_-]{35}"), ""), + (re.compile(r"gh[pousr]_[A-Za-z0-9]{36,}"), ""), + (re.compile(r"sk-[A-Za-z0-9]{20,}T3BlbkFJ[A-Za-z0-9]{20,}"), ""), + (re.compile(r"glpat-[A-Za-z0-9\-_]{20,}"), ""), + (re.compile(r"xox[baprs]-[A-Za-z0-9\-]{10,}"), ""), + (re.compile(r"AIza[A-Za-z0-9\-_]{35}"), ""), + (re.compile(r"AKIA[A-Z0-9]{16}"), ""), +) + +# Matches absolute filesystem paths (Unix and Windows styles) in error messages. +_ABSOLUTE_PATH_RE = re.compile(r"(?:^|(?<=\s)|(?<=[\"'(]))((?:/[^\s\"',;)]+)+|[A-Za-z]:\\[^\s\"',;)]+)") + + +def _scrub_secrets(text: str) -> str: + """Replace known secret patterns in *text* with redaction placeholders.""" + for pattern, replacement in _SECRET_SCRUB_PATTERNS: + text = pattern.sub(replacement, text) + return text + + +def _sanitize_agent_error(text: str) -> str: + """Remove absolute filesystem paths from agent error messages before sending to users.""" + return _ABSOLUTE_PATH_RE.sub("", text) + class PhotoAttachmentStore: MAX_PHOTO_BYTES = DEFAULT_MAX_PHOTO_ATTACHMENT_BYTES # Telegram photos are capped to keep local storage bounded. @@ -188,12 +216,14 @@ async def run_active_session( provider = session.get("provider", "codex") branch_name = session.get("branch_name", "") logger.info( - "Running message for chat %s on session '%s' (%s) in project '%s' with provider '%s'.", + "Running message for chat %s on session '%s' (%s) in project '%s' with provider '%s'. " + "Prompt (first 200 chars): %.200r", chat_id, session["name"], active_id, project_folder, provider, + user_message, ) if branch_name and self.git.is_git_repo(project_path): @@ -215,6 +245,7 @@ async def run_active_session( active_id, project_path, user_message, + workspace_lock_key=project_folder, skip_git_repo_check=self.should_skip_git_repo_check(project_folder), image_paths=image_paths, stall_message=ACTIVE_RUN_STALL_MESSAGE, @@ -246,7 +277,8 @@ async def run_active_session( active_id, result.error_message or "unknown error", ) - await send_text(update, context, result.error_message or "Agent run failed.") + error_text = _sanitize_agent_error(result.error_message) if result.error_message else "Agent run failed." + await send_text(update, context, error_text) return if result.session_id and result.session_id != active_id: @@ -339,6 +371,7 @@ async def _replace_invalid_session_if_needed( provider, project_path, user_message, + workspace_lock_key=project_folder, skip_git_repo_check=self.should_skip_git_repo_check(project_folder), image_paths=image_paths, stall_message=REPLACEMENT_SESSION_STALL_MESSAGE, @@ -441,6 +474,7 @@ async def _send_assistant_chunks( *, provider: str, ) -> None: + assistant_text = _scrub_secrets(assistant_text) segments = split_assistant_output(assistant_text) if not segments: return diff --git a/src/coding_agent_telegram/session_store.py b/src/coding_agent_telegram/session_store.py index 259f3c9..fd49e07 100644 --- a/src/coding_agent_telegram/session_store.py +++ b/src/coding_agent_telegram/session_store.py @@ -11,6 +11,10 @@ T = TypeVar("T") +class SessionStoreError(Exception): + """Raised when the session store cannot be accessed due to a file-lock conflict.""" + + class SessionStore: """Persist per-chat session state with file locking and crash-safe writes.""" @@ -81,8 +85,11 @@ def _load_unlocked(self) -> dict[str, Any]: def load(self) -> dict[str, Any]: self._ensure_paths() - with portalocker.Lock(str(self.lock_file), timeout=5): - return self._load_unlocked() + try: + with portalocker.Lock(str(self.lock_file), timeout=5): + return self._load_unlocked() + except portalocker.LockException as exc: + raise SessionStoreError("Session data is temporarily locked. Please try again in a moment.") from exc def _save_unlocked(self, state: dict[str, Any]) -> None: serialized = json.dumps(state, indent=2, ensure_ascii=False) @@ -93,16 +100,22 @@ def _save_unlocked(self, state: dict[str, Any]) -> None: def save(self, state: dict[str, Any]) -> None: self._ensure_paths() - with portalocker.Lock(str(self.lock_file), timeout=5): - self._save_unlocked(state) + try: + with portalocker.Lock(str(self.lock_file), timeout=5): + self._save_unlocked(state) + except portalocker.LockException as exc: + raise SessionStoreError("Session data is temporarily locked. Please try again in a moment.") from exc def _mutate_state(self, fn: Callable[[dict[str, Any]], T]) -> T: self._ensure_paths() - with portalocker.Lock(str(self.lock_file), timeout=5): - state = self._load_unlocked() - result = fn(state) - self._save_unlocked(state) - return result + try: + with portalocker.Lock(str(self.lock_file), timeout=5): + state = self._load_unlocked() + result = fn(state) + self._save_unlocked(state) + return result + except portalocker.LockException as exc: + raise SessionStoreError("Session data is temporarily locked. Please try again in a moment.") from exc def _mutate_chat_data( self, @@ -274,12 +287,15 @@ def mutate(chat_data: dict[str, Any]) -> bool: def list_sessions(self, bot_id: str, chat_id: int) -> dict[str, dict[str, str]]: self._ensure_paths() - with portalocker.Lock(str(self.lock_file), timeout=5): - state = self._load_unlocked() - chat_data, migrated = self._get_chat_data(state, bot_id, chat_id) - if migrated: - self._save_unlocked(state) - return {} if chat_data is None else dict(chat_data.get("sessions", {})) + try: + with portalocker.Lock(str(self.lock_file), timeout=5): + state = self._load_unlocked() + chat_data, migrated = self._get_chat_data(state, bot_id, chat_id) + if migrated: + self._save_unlocked(state) + return {} if chat_data is None else dict(chat_data.get("sessions", {})) + except portalocker.LockException as exc: + raise SessionStoreError("Session data is temporarily locked. Please try again in a moment.") from exc def set_active_session_branch(self, bot_id: str, chat_id: int, branch_name: str) -> None: def mutate(chat_data: dict[str, Any]) -> None: @@ -297,12 +313,15 @@ def mutate(chat_data: dict[str, Any]) -> None: def get_chat_state(self, bot_id: str, chat_id: int) -> dict[str, Any]: self._ensure_paths() - with portalocker.Lock(str(self.lock_file), timeout=5): - state = self._load_unlocked() - chat_data, migrated = self._get_chat_data(state, bot_id, chat_id) - if migrated: - self._save_unlocked(state) - return {} if chat_data is None else dict(chat_data) + try: + with portalocker.Lock(str(self.lock_file), timeout=5): + state = self._load_unlocked() + chat_data, migrated = self._get_chat_data(state, bot_id, chat_id) + if migrated: + self._save_unlocked(state) + return {} if chat_data is None else dict(chat_data) + except portalocker.LockException as exc: + raise SessionStoreError("Session data is temporarily locked. Please try again in a moment.") from exc def get_session(self, bot_id: str, chat_id: int, session_id: str) -> Optional[dict[str, str]]: return self.list_sessions(bot_id, chat_id).get(session_id) diff --git a/tests/test_agent_runner.py b/tests/test_agent_runner.py index 1190664..28766ba 100644 --- a/tests/test_agent_runner.py +++ b/tests/test_agent_runner.py @@ -390,3 +390,186 @@ def test_copilot_runner_passes_tool_permission_flags(monkeypatch): assert "--allow-tool" not in calls[0][0] assert "--deny-tool" not in calls[0][0] assert "--available-tools" not in calls[0][0] + + +# --------------------------------------------------------------------------- +# _validate_session_id +# --------------------------------------------------------------------------- + + +def test_validate_session_id_accepts_valid_ids(): + from coding_agent_telegram.agent_runner import _validate_session_id + + assert _validate_session_id("abc123") == "abc123" + assert _validate_session_id("sess-1.0_abc") == "sess-1.0_abc" + assert _validate_session_id("A" * 128) == "A" * 128 + + +def test_validate_session_id_rejects_flag_like(): + from coding_agent_telegram.agent_runner import _validate_session_id + + assert _validate_session_id("--exec") is None + assert _validate_session_id("-c") is None + + +def test_validate_session_id_rejects_special_chars(): + from coding_agent_telegram.agent_runner import _validate_session_id + + assert _validate_session_id("sess;rm -rf") is None + assert _validate_session_id("sess\x00null") is None + assert _validate_session_id("a" * 129) is None # too long + + +def test_validate_session_id_rejects_empty_and_none(): + from coding_agent_telegram.agent_runner import _validate_session_id + + assert _validate_session_id("") is None + assert _validate_session_id(None) is None # type: ignore[arg-type] + + +def test_parse_jsonl_rejects_flag_session_id(monkeypatch): + """A session_id starting with '--' emitted by the LLM must not be stored.""" + calls: list = [] + monkeypatch.setattr( + "coding_agent_telegram.agent_runner.subprocess.Popen", + make_fake_popen(calls, process_stdout='{"session_id": "--malicious"}\n'), + ) + runner = MultiAgentRunner( + codex_bin="codex", + copilot_bin="copilot", + approval_policy="never", + sandbox_mode="workspace-write", + ) + result = runner.create_session("codex", Path("/tmp/project"), "hello", skip_git_repo_check=True) + assert result.session_id is None + + +def test_parse_jsonl_accepts_valid_session_id(monkeypatch): + """A well-formed session_id from the LLM must be stored and returned.""" + calls: list = [] + monkeypatch.setattr( + "coding_agent_telegram.agent_runner.subprocess.Popen", + make_fake_popen(calls, process_stdout='{"session_id": "valid-sess-123"}\n'), + ) + runner = MultiAgentRunner( + codex_bin="codex", + copilot_bin="copilot", + approval_policy="never", + sandbox_mode="workspace-write", + ) + result = runner.create_session("codex", Path("/tmp/project"), "hello", skip_git_repo_check=True) + assert result.session_id == "valid-sess-123" + + +# --------------------------------------------------------------------------- +# Unsupported provider +# --------------------------------------------------------------------------- + + +def test_create_session_returns_failure_for_unsupported_provider(monkeypatch): + calls: list = [] + monkeypatch.setattr("coding_agent_telegram.agent_runner.subprocess.Popen", make_fake_popen(calls)) + + runner = MultiAgentRunner( + codex_bin="codex", + copilot_bin="copilot", + approval_policy="never", + sandbox_mode="workspace-write", + ) + result = runner.create_session("badprovider", Path("/tmp/project"), "hello") + + assert result.success is False + assert "Unsupported provider" in (result.error_message or "") + assert calls == [] # no subprocess launched + + +def test_resume_session_returns_failure_for_unsupported_provider(monkeypatch): + calls: list = [] + monkeypatch.setattr("coding_agent_telegram.agent_runner.subprocess.Popen", make_fake_popen(calls)) + + runner = MultiAgentRunner( + codex_bin="codex", + copilot_bin="copilot", + approval_policy="never", + sandbox_mode="workspace-write", + ) + result = runner.resume_session("badprovider", "sess_1", Path("/tmp/project"), "hello") + + assert result.success is False + assert "Unsupported provider" in (result.error_message or "") + assert calls == [] + + +def test_copilot_resume_rejects_image_attachments(monkeypatch): + calls: list = [] + monkeypatch.setattr("coding_agent_telegram.agent_runner.subprocess.Popen", make_fake_popen(calls)) + + runner = MultiAgentRunner( + codex_bin="codex", + copilot_bin="copilot", + approval_policy="never", + sandbox_mode="workspace-write", + ) + result = runner.resume_session( + "copilot", + "sess_1", + Path("/tmp/project"), + "hello", + image_paths=[Path("/tmp/image.png")], + ) + + assert result.success is False + assert "not supported" in (result.error_message or "").lower() + assert calls == [] # no subprocess launched + + +# --------------------------------------------------------------------------- +# _collect_text_fragments / _looks_textual_key heuristics +# --------------------------------------------------------------------------- + + +def test_looks_textual_key_matches_known_tokens(): + runner = MultiAgentRunner("codex", "copilot", "never", "workspace-write") + assert runner._looks_textual_key("message") is True + assert runner._looks_textual_key("content") is True + assert runner._looks_textual_key("delta") is True + assert runner._looks_textual_key("id") is False + assert runner._looks_textual_key("") is False + + +def test_looks_metadata_key_matches_id_timestamp(): + runner = MultiAgentRunner("codex", "copilot", "never", "workspace-write") + assert runner._looks_metadata_key("session_id") is True + assert runner._looks_metadata_key("timestamp") is True + assert runner._looks_metadata_key("text") is False + assert runner._looks_metadata_key("") is False + + +def test_collect_text_fragments_extracts_from_nested_dict(): + runner = MultiAgentRunner("codex", "copilot", "never", "workspace-write") + event = {"message": {"content": "hello from agent"}, "session_id": "ignored"} + fragments = runner._collect_text_fragments(event) + assert "hello from agent" in fragments + + +def test_extract_codex_assistant_text_handles_list_of_strings(): + runner = MultiAgentRunner("codex", "copilot", "never", "workspace-write") + result = runner._extract_codex_assistant_text(["line 1", "line 2"]) + assert "line 1" in result + assert "line 2" in result + + +# --------------------------------------------------------------------------- +# _parse_jsonl: error and session success fields +# --------------------------------------------------------------------------- + + +def test_parse_jsonl_extracts_success_false_from_error_field(monkeypatch): + calls: list = [] + monkeypatch.setattr( + "coding_agent_telegram.agent_runner.subprocess.Popen", + make_fake_popen(calls, process_stdout='{"error": "agent crashed"}\n', returncode=1), + ) + runner = MultiAgentRunner("codex", "copilot", "never", "workspace-write") + result = runner.create_session("codex", Path("/tmp/project"), "hello", skip_git_repo_check=True) + assert result.success is False diff --git a/tests/test_bot.py b/tests/test_bot.py index 7ec60a0..e76e717 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -17,3 +17,172 @@ def test_default_bot_commands_show_commit_and_push_when_enabled(): assert "provider" in names assert "commit" in names assert "push" in names + + +# --------------------------------------------------------------------------- +# allowed_private_chat_filter +# --------------------------------------------------------------------------- + + +def test_allowed_private_chat_filter_uses_chat_id_and_private_type(): + from coding_agent_telegram.bot import allowed_private_chat_filter + + flt = allowed_private_chat_filter({123, 456}) + # Should create a filter — just check it is not None and is callable/usable + assert flt is not None + + +# --------------------------------------------------------------------------- +# handle_error +# --------------------------------------------------------------------------- + + +import asyncio +from types import SimpleNamespace +from coding_agent_telegram.session_store import SessionStoreError + + +def test_handle_error_sends_friendly_message_for_session_store_error(): + """handle_error must send the SessionStoreError message to the chat.""" + import asyncio + from coding_agent_telegram.bot import handle_error + + messages = [] + + class FakeBot: + async def send_message(self, chat_id, text): + messages.append((chat_id, text)) + + update = SimpleNamespace(effective_chat=SimpleNamespace(id=42)) + context = SimpleNamespace( + error=SessionStoreError("State file temporarily locked."), + bot=FakeBot(), + ) + + asyncio.run(handle_error(update, context)) + + assert len(messages) == 1 + assert "temporarily locked" in messages[0][1] + + +def test_handle_error_skips_send_when_update_has_no_chat(): + """handle_error must not crash when update is None.""" + import asyncio + from coding_agent_telegram.bot import handle_error + + class FakeBot: + async def send_message(self, chat_id, text): + raise AssertionError("should not send when no chat") + + context = SimpleNamespace( + error=SessionStoreError("locked"), + bot=FakeBot(), + ) + + # Should not raise + asyncio.run(handle_error(None, context)) + + +def test_handle_error_sends_generic_message_for_unexpected_exceptions(): + """handle_error must send a generic failure message for non-SessionStoreError exceptions.""" + import asyncio + from coding_agent_telegram.bot import handle_error + + messages = [] + + class FakeBot: + async def send_message(self, chat_id, text): + messages.append((chat_id, text)) + + update = SimpleNamespace(effective_chat=SimpleNamespace(id=7)) + context = SimpleNamespace( + error=RuntimeError("something went wrong"), + bot=FakeBot(), + ) + + asyncio.run(handle_error(update, context)) + + assert len(messages) == 1 + assert "failed" in messages[0][1].lower() or "error" in messages[0][1].lower() or "check" in messages[0][1].lower() + + +# --------------------------------------------------------------------------- +# initialize_bot_commands +# --------------------------------------------------------------------------- + + +import asyncio + + +def test_initialize_bot_commands_calls_delete_and_set(): + """initialize_bot_commands must call delete_my_commands once and + set_my_commands once per allowed chat.""" + from coding_agent_telegram.bot import initialize_bot_commands + + deleted = [] + set_calls = [] + + class FakeBot: + async def delete_my_commands(self, scope=None): + deleted.append(scope) + + async def set_my_commands(self, commands, scope=None): + set_calls.append((commands, scope)) + + class FakeApp: + bot = FakeBot() + + asyncio.run( + initialize_bot_commands(FakeApp(), enable_commit_command=False, allowed_chat_ids={10, 20}) + ) + + assert len(deleted) == 1 + assert len(set_calls) == 2 + chat_ids_called = {s[1].chat_id for s in set_calls} + assert chat_ids_called == {10, 20} + + +def test_initialize_bot_commands_includes_commit_when_enabled(): + from coding_agent_telegram.bot import initialize_bot_commands + + set_calls = [] + + class FakeBot: + async def delete_my_commands(self, scope=None): + pass + + async def set_my_commands(self, commands, scope=None): + set_calls.append(commands) + + class FakeApp: + bot = FakeBot() + + asyncio.run( + initialize_bot_commands(FakeApp(), enable_commit_command=True, allowed_chat_ids={1}) + ) + + command_names = [c.command for c in set_calls[0]] + assert "commit" in command_names + + +def test_initialize_bot_commands_empty_allowed_chat_ids(): + """No set_my_commands calls when allowed_chat_ids is empty.""" + from coding_agent_telegram.bot import initialize_bot_commands + + set_calls = [] + + class FakeBot: + async def delete_my_commands(self, scope=None): + pass + + async def set_my_commands(self, commands, scope=None): + set_calls.append(commands) + + class FakeApp: + bot = FakeBot() + + asyncio.run( + initialize_bot_commands(FakeApp(), enable_commit_command=False, allowed_chat_ids=set()) + ) + + assert set_calls == [] diff --git a/tests/test_command_router.py b/tests/test_command_router.py index 7479c27..20721c8 100644 --- a/tests/test_command_router.py +++ b/tests/test_command_router.py @@ -385,6 +385,7 @@ def make_config(tmp_path: Path) -> AppConfig: max_telegram_message_length=3000, enable_sensitive_diff_filter=True, default_agent_provider="codex", + agent_hard_timeout_seconds=0, ) @@ -2236,3 +2237,1109 @@ def test_push_reports_missing_project_folder_before_git_calls(tmp_path: Path): assert router.git.push_calls == [] assert "Project folder no longer exists for this session: backend" in bot.messages[-1][1] + + +# --------------------------------------------------------------------------- +# _path_within_project — symlink traversal guard +# --------------------------------------------------------------------------- + + +def test_path_within_project_allows_normal_files(tmp_path: Path): + from coding_agent_telegram.router.base import CommandRouterBase + + project = tmp_path / "project" + project.mkdir() + (project / "src").mkdir() + (project / "src" / "file.py").write_text("x") + + assert CommandRouterBase._path_within_project(project, "src/file.py") + assert CommandRouterBase._path_within_project(project, "README.md") + + +def test_path_within_project_blocks_symlink_escape(tmp_path: Path): + """A symlink inside the project pointing outside must be rejected.""" + from coding_agent_telegram.router.base import CommandRouterBase + + project = tmp_path / "project" + project.mkdir() + outside = tmp_path / "outside" + outside.mkdir() + (outside / "secret.txt").write_text("secret") + + # Create a symlink inside the project that points outside + link = project / "external_link" + link.symlink_to(outside) + + assert not CommandRouterBase._path_within_project(project, "external_link/secret.txt") + + +def test_path_within_project_blocks_symlink_to_file_outside(tmp_path: Path): + """A symlink to a single file outside the project must also be rejected.""" + from coding_agent_telegram.router.base import CommandRouterBase + + project = tmp_path / "project" + project.mkdir() + secret = tmp_path / "secret.txt" + secret.write_text("secret") + + link = project / "linked_secret.txt" + link.symlink_to(secret) + + assert not CommandRouterBase._path_within_project(project, "linked_secret.txt") + + +# --------------------------------------------------------------------------- +# Workspace lock — concurrent agent runs on the same project are blocked +# --------------------------------------------------------------------------- + + +def test_workspace_lock_blocks_second_call_on_same_project(tmp_path: Path): + """When a workspace lock is held, a second _run_with_typing call on the same + project must be rejected immediately with an 'already running' message.""" + from coding_agent_telegram.agent_runner import MultiAgentRunner + + runner_real = MultiAgentRunner( + codex_bin="codex", + copilot_bin="copilot", + approval_policy="never", + sandbox_mode="workspace-write", + ) + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner_real, bot_id="bot-a")) + + messages_sent: list = [] + + async def run(): + # Manually hold the workspace lock to simulate an in-flight agent run. + lock = router._workspace_locks.setdefault("myproject", asyncio.Lock()) + async with lock: + bot = FakeBot() + update = make_update() + context = SimpleNamespace(args=[], bot=bot) + result = await router._run_with_typing( + update, + context, + runner_real.resume_session, + "codex", + "sess_1", + tmp_path, + "hello", + workspace_lock_key="myproject", + ) + assert result is None + messages_sent.extend(bot.messages) + + asyncio.run(run()) + assert any("already running" in msg[1] for msg in messages_sent) + + +def test_workspace_lock_allows_different_projects_concurrently(tmp_path: Path): + """Two calls with different workspace_lock_keys must not block each other.""" + from coding_agent_telegram.agent_runner import MultiAgentRunner + + runner_real = MultiAgentRunner( + codex_bin="codex", + copilot_bin="copilot", + approval_policy="never", + sandbox_mode="workspace-write", + ) + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner_real, bot_id="bot-a")) + + async def run(): + # Hold the lock for "project-a" + lock_a = router._workspace_locks.setdefault("project-a", asyncio.Lock()) + async with lock_a: + # "project-b" lock should be free — lock.locked() returns False + lock_b = router._workspace_locks.setdefault("project-b", asyncio.Lock()) + assert not lock_b.locked() + + asyncio.run(run()) + + +# --------------------------------------------------------------------------- +# message_commands — null message guards (lines 15, 31) +# --------------------------------------------------------------------------- + + +def test_handle_message_does_nothing_when_message_is_none(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + message=None, + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_message(update, context)) + + assert bot.messages == [] + + +def test_handle_message_does_nothing_when_text_is_empty(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + message=SimpleNamespace(text="", photo=None, caption=None), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_message(update, context)) + + assert bot.messages == [] + + +def test_handle_photo_does_nothing_when_message_is_none(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + message=None, + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_photo(update, context)) + + assert bot.messages == [] + + +# --------------------------------------------------------------------------- +# git_commands — commit disabled, empty args, git command fails +# --------------------------------------------------------------------------- + + +def test_commit_sends_disabled_message_when_not_enabled(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + # enable_commit_command defaults to False in make_config + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=True) + router.runtime.git = router.git + + update = make_update(text="/commit git add .") + bot = FakeBot() + context = SimpleNamespace(args=["git", "add", "."], bot=bot) + asyncio.run(router.handle_commit(update, context)) + + assert "disabled" in bot.messages[-1][1].lower() + + +def test_commit_sends_usage_when_no_text_after_command(tmp_path: Path): + router, _ = _make_commit_router(tmp_path, git_manager=FakeGitManager(is_git_repo=True)) + + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + message=SimpleNamespace(text="/commit", photo=None, caption=None), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_commit(update, context)) + + assert "Usage" in bot.messages[-1][1] + + +def test_commit_reports_failed_git_command_and_stops(tmp_path: Path): + router, backend = _make_commit_router( + tmp_path, + git_manager=FakeGitManager( + is_git_repo=True, + git_command_results=[SimpleNamespace(success=False, message="nothing to commit")], + ), + ) + + bot = _run_commit_command(router, "/commit git add -u") + + assert router.git.safe_git_commands != [] + assert "nothing to commit" in bot.messages[-1][1] + + +# --------------------------------------------------------------------------- +# git_commands — handle_push edge cases +# --------------------------------------------------------------------------- + + +def test_push_sends_usage_when_extra_args_provided(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.create_session("bot-a", 123, "sess1", "s1", "backend", "codex", branch_name="main") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=True, current_branch="main") + router.runtime.git = router.git + + update = make_update(text="/push extra") + bot = FakeBot() + context = SimpleNamespace(args=["extra"], bot=bot) + asyncio.run(router.handle_push(update, context)) + + assert "Usage" in bot.messages[-1][1] + + +def test_push_warns_when_branch_cannot_be_determined(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + # session with no branch_name stored + store.create_session("bot-a", 123, "sess1", "s1", "backend", "codex") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + # current_branch also returns None + router.git = FakeGitManager(is_git_repo=True, current_branch=None) + router.runtime.git = router.git + + bot = _run_push_command(router) + + assert "Could not determine" in bot.messages[-1][1] + + +# --------------------------------------------------------------------------- +# session_commands — handle_switch +# --------------------------------------------------------------------------- + + +def _make_switch_router(tmp_path: Path) -> CommandRouter: + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=False) + router.runtime.git = router.git + return router + + +def test_switch_command_reports_no_sessions_when_empty(tmp_path: Path): + router = _make_switch_router(tmp_path) + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_switch(update, context)) + assert "No sessions found" in bot.messages[-1][1] + + +def test_switch_command_lists_sessions_when_no_arg(tmp_path: Path): + router = _make_switch_router(tmp_path) + router.deps.store.create_session("bot-a", 123, "sess-abc", "my-session", "backend", "codex") + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_switch(update, context)) + + assert "my-session" in bot.messages[-1][1] + + +def test_switch_command_handles_page_arg(tmp_path: Path): + router = _make_switch_router(tmp_path) + router.deps.store.create_session("bot-a", 123, "sess-abc", "my-session", "backend", "codex") + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["page", "1"], bot=bot) + asyncio.run(router.handle_switch(update, context)) + + assert "my-session" in bot.messages[-1][1] + + +def test_switch_command_rejects_non_numeric_page(tmp_path: Path): + router = _make_switch_router(tmp_path) + router.deps.store.create_session("bot-a", 123, "sess-abc", "my-session", "backend", "codex") + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["page", "abc"], bot=bot) + asyncio.run(router.handle_switch(update, context)) + + assert "Invalid page number" in bot.messages[-1][1] + + +def test_switch_command_by_session_id_activates_session(tmp_path: Path): + router = _make_switch_router(tmp_path) + router.deps.store.create_session("bot-a", 123, "sess-abc", "my-session", "backend", "codex") + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["sess-abc"], bot=bot) + asyncio.run(router.handle_switch(update, context)) + + # Should send a confirmation message + assert any("my-session" in m[1] or "sess-abc" in m[1] or "Switched" in m[1] for m in bot.messages) + + +def test_switch_command_reports_not_found_for_unknown_session_id(tmp_path: Path): + router = _make_switch_router(tmp_path) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["sess-nonexistent"], bot=bot) + asyncio.run(router.handle_switch(update, context)) + + assert "not found" in bot.messages[-1][1].lower() + + +def test_switch_command_reports_missing_project_folder(tmp_path: Path): + backend = (tmp_path / "vanished").resolve() # intentionally doesn't exist + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.create_session("bot-a", 123, "sess-xyz", "ghost-session", "vanished", "codex") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=False) + router.runtime.git = router.git + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["sess-xyz"], bot=bot) + asyncio.run(router.handle_switch(update, context)) + + assert "no longer exists" in bot.messages[-1][1].lower() or "vanished" in bot.messages[-1][1] + + +# --------------------------------------------------------------------------- +# session_commands — handle_provider +# --------------------------------------------------------------------------- + + +def test_handle_provider_sends_keyboard_when_no_args(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_provider(update, context)) + + # Should have sent a message with a reply_markup keyboard + assert len(bot.messages) >= 1 + assert bot.messages[-1][3] is not None # reply_markup present + + +def test_handle_provider_sends_usage_when_args_provided(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["codex"], bot=bot) + asyncio.run(router.handle_provider(update, context)) + + assert "Usage" in bot.messages[-1][1] + + +# --------------------------------------------------------------------------- +# project_commands — handle_project edge cases +# --------------------------------------------------------------------------- + + +def test_project_command_sends_usage_when_no_args(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_project(update, context)) + + assert "Usage" in bot.messages[-1][1] + + +def test_project_command_rejects_when_path_is_a_file(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + # Create a file (not directory) at the project path + (tmp_path / "myfile").write_text("x") + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["myfile"], bot=bot) + asyncio.run(router.handle_project(update, context)) + + assert "not a directory" in bot.messages[-1][1].lower() or "exists" in bot.messages[-1][1].lower() + + +# --------------------------------------------------------------------------- +# project_commands — handle_branch edge cases +# --------------------------------------------------------------------------- + + +def test_branch_command_sends_error_when_no_project_selected(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["new-branch"], bot=bot) + asyncio.run(router.handle_branch(update, context)) + + assert "No project selected" in bot.messages[-1][1] or "project" in bot.messages[-1][1].lower() + + +def test_branch_command_rejects_wrong_number_of_args(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=True) + router.runtime.git = router.git + # Set the project folder via the store directly + store.set_current_project_folder(router.deps.bot_id, 123, "backend") + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["a", "b", "c"], bot=bot) # 3 args — wrong + asyncio.run(router.handle_branch(update, context)) + + assert "Usage" in bot.messages[-1][1] + + +# --------------------------------------------------------------------------- +# trust_project_callback — edge cases +# --------------------------------------------------------------------------- + + +def test_trust_project_callback_handles_invalid_payload(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + edited = [] + + async def fake_answer(): + return None + + async def fake_edit(text, parse_mode=None): + edited.append(text) + + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data="trustproject:yes", # missing folder — only 2 parts + answer=fake_answer, + edit_message_text=fake_edit, + ), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_trust_project_callback(update, context)) + + assert any("Invalid" in e for e in edited) + + +def test_trust_project_callback_no_decision_leaves_project_untrusted(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + edited = [] + + async def fake_answer(): + return None + + async def fake_edit(text, parse_mode=None): + edited.append(text) + + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data="trustproject:no:backend", + answer=fake_answer, + edit_message_text=fake_edit, + ), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_trust_project_callback(update, context)) + + assert not store.is_project_trusted("backend") + assert any("untrusted" in e.lower() for e in edited) + + +# --------------------------------------------------------------------------- +# git_commands.py — additional coverage +# --------------------------------------------------------------------------- + + +def test_commit_disabled_sends_message(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + # enable_commit_command defaults to False in make_config + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + bot = _run_commit_command(router, "/commit git add -u") + + assert "/commit is disabled" in bot.messages[-1][1] + + +def test_commit_no_args_sends_usage(tmp_path: Path): + router, _ = _make_commit_router(tmp_path, git_manager=FakeGitManager(is_git_repo=True)) + + update = make_update(text="/commit") + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + asyncio.run(router.handle_commit(update, context)) + + assert "Usage: /commit" in bot.messages[-1][1] + + +def test_commit_no_valid_git_commands_found(tmp_path: Path): + router, _ = _make_commit_router(tmp_path, git_manager=FakeGitManager(is_git_repo=True)) + + # Only non-git segments — no valid git commands + bot = _run_commit_command(router, "/commit echo hello && ls -la") + + assert "No valid git commit commands were found." in bot.messages[-1][1] + + +def test_commit_fails_mid_execution_sends_partial_results(tmp_path: Path): + router, _ = _make_commit_router( + tmp_path, + git_manager=FakeGitManager( + is_git_repo=True, + git_command_results=[ + SimpleNamespace(success=True, message="git status completed."), + SimpleNamespace(success=False, message="nothing to commit"), + ], + ), + ) + + bot = _run_commit_command(router, "/commit git status && git commit -m safe") + + last_msg = bot.messages[-1][1] + assert "git status" in last_msg + assert "nothing to commit" in last_msg + assert len(router.git.safe_git_commands) == 2 + + +def test_push_with_args_sends_usage(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update(text="/push origin") + bot = FakeBot() + context = SimpleNamespace(args=["origin"], bot=bot) + asyncio.run(router.handle_push(update, context)) + + assert "Usage: /push" in bot.messages[-1][1] + + +def test_push_no_branch_warns(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + # Session with no branch_name; git also returns None + store.create_session("bot-a", 123, "sess_pb", "push-nobranch", "backend", "codex") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=True, current_branch=None) + router.runtime.git = router.git + + bot = _run_push_command(router) + + assert "Could not determine the branch" in bot.messages[-1][1] + + +def test_push_callback_unknown_action_returns_silently(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.create_session("bot-a", 123, "sess_push", "push-session", "backend", "codex", branch_name="feature-1") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=True, current_branch="feature-1") + router.runtime.git = router.git + + edited = [] + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data="push:unknown", + answer=None, + edit_message_text=None, + ), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + async def fake_answer(): + return None + + async def fake_edit(text, parse_mode=None): + edited.append(text) + + update.callback_query.answer = fake_answer + update.callback_query.edit_message_text = fake_edit + + asyncio.run(router.handle_push_callback(update, context)) + + assert edited == [] + assert bot.messages == [] + + +def test_push_callback_empty_branch_warns(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + # Session with no branch_name; git also returns None + store.create_session("bot-a", 123, "sess_push", "push-session", "backend", "codex") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=True, current_branch=None) + router.runtime.git = router.git + + edited = [] + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data="push:confirm", + answer=None, + edit_message_text=None, + ), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + async def fake_answer(): + return None + + async def fake_edit(text, parse_mode=None): + edited.append(text) + + update.callback_query.answer = fake_answer + update.callback_query.edit_message_text = fake_edit + + asyncio.run(router.handle_push_callback(update, context)) + + assert any("Could not determine the branch" in e for e in edited) + + +def test_push_callback_checkout_failure_sends_edit(tmp_path: Path): + backend = (tmp_path / "backend").resolve() + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.create_session("bot-a", 123, "sess_push", "push-session", "backend", "codex", branch_name="feature-x") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + # current_branch differs from session branch so checkout is attempted + router.git = FakeGitManager( + is_git_repo=True, + current_branch="main", + checkout_result=SimpleNamespace(success=False, message="error: pathspec 'feature-x' did not match"), + ) + router.runtime.git = router.git + + edited = [] + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data="push:confirm", + answer=None, + edit_message_text=None, + ), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + async def fake_answer(): + return None + + async def fake_edit(text, parse_mode=None): + edited.append(text) + + update.callback_query.answer = fake_answer + update.callback_query.edit_message_text = fake_edit + + asyncio.run(router.handle_push_callback(update, context)) + + assert any("Push cancelled" in e for e in edited) + assert router.git.push_calls == [] + + +# --------------------------------------------------------------------------- +# session_commands.py — additional coverage +# --------------------------------------------------------------------------- + + +def test_switch_no_sessions_sends_not_found(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_switch(update, context)) + + assert "No sessions found." in bot.messages[-1][1] + + +def test_switch_page_invalid_number_sends_error(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.create_session("bot-a", 123, "sess_a", "session-a", "backend", "codex") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["page", "abc"], bot=bot) + + asyncio.run(router.handle_switch(update, context)) + + assert "Invalid page number" in bot.messages[-1][1] + + +def test_switch_invalid_session_id_sends_not_found(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.create_session("bot-a", 123, "sess_a", "session-a", "backend", "codex") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["nonexistent-id"], bot=bot) + + asyncio.run(router.handle_switch(update, context)) + + assert "Session not found" in bot.messages[-1][1] + + +def test_provider_with_args_sends_usage(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update(text="/provider codex") + bot = FakeBot() + context = SimpleNamespace(args=["codex"], bot=bot) + + asyncio.run(router.handle_provider(update, context)) + + assert "Usage: /provider" in bot.messages[-1][1] + + +def test_provider_callback_unavailable_shows_not_found(tmp_path: Path, monkeypatch): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + monkeypatch.setattr("coding_agent_telegram.router.session_commands.shutil.which", lambda _: None) + + edited = [] + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data="provider:set:codex", + answer=None, + edit_message_text=None, + ), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + async def fake_answer(): + return None + + async def fake_edit(text): + edited.append(text) + + update.callback_query.answer = fake_answer + update.callback_query.edit_message_text = fake_edit + + asyncio.run(router.handle_provider_callback(update, context)) + + assert edited + assert "not found" in edited[0].lower() or "CLI not found" in edited[0] + + +def test_provider_callback_available_sets_provider(tmp_path: Path, monkeypatch): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + monkeypatch.setattr("coding_agent_telegram.router.session_commands.shutil.which", lambda _: "/usr/bin/codex") + + edited = [] + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data="provider:set:codex", + answer=None, + edit_message_text=None, + ), + ) + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + async def fake_answer(): + return None + + async def fake_edit(text): + edited.append(text) + + update.callback_query.answer = fake_answer + update.callback_query.edit_message_text = fake_edit + + asyncio.run(router.handle_provider_callback(update, context)) + + assert "codex" in edited[0] + assert store.get_chat_state("bot-a", 123)["current_provider"] == "codex" + + +# --------------------------------------------------------------------------- +# project_commands.py — additional coverage +# --------------------------------------------------------------------------- + + +def test_project_no_args_sends_usage(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_project(update, context)) + + assert "Usage: /project" in bot.messages[-1][1] + + +def test_project_path_is_file_sends_error(tmp_path: Path): + # Create a file where the project folder would go + file_path = tmp_path / "backend" + file_path.write_text("not a directory") + + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["backend"], bot=bot) + + asyncio.run(router.handle_project(update, context)) + + assert "not a directory" in bot.messages[-1][1] + + +def test_branch_not_git_repo_sends_error(tmp_path: Path): + backend = tmp_path / "backend" + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.set_current_project_folder("bot-a", 123, "backend") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=False) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_branch(update, context)) + + assert "not a git repository" in bot.messages[-1][1] + + +def test_branch_wrong_arg_count_sends_usage(tmp_path: Path): + backend = tmp_path / "backend" + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.set_current_project_folder("bot-a", 123, "backend") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager(is_git_repo=True) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["a", "b", "c"], bot=bot) + + asyncio.run(router.handle_branch(update, context)) + + assert "Usage: /branch" in bot.messages[-1][1] + + +def test_branch_prepare_fails_sends_failure_message(tmp_path: Path): + backend = tmp_path / "backend" + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.set_current_project_folder("bot-a", 123, "backend") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + router.git = FakeGitManager( + is_git_repo=True, + prepare_result=SimpleNamespace( + success=False, + message="Branch preparation failed: branch already exists.", + current_branch=None, + default_branch="main", + ), + ) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=["feature-new"], bot=bot) + + asyncio.run(router.handle_branch(update, context)) + + assert "Branch preparation failed" in bot.messages[-1][1] + + +def _make_trust_callback_update(data: str): + """Build a fake update + edited-list for handle_trust_project_callback.""" + edited = [] + + async def fake_answer(): + return None + + async def fake_edit(text): + edited.append(text) + + update = SimpleNamespace( + effective_chat=SimpleNamespace(id=123, type="private"), + callback_query=SimpleNamespace( + data=data, + answer=fake_answer, + edit_message_text=fake_edit, + ), + ) + return update, edited + + +def test_trust_project_callback_invalid_payload(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update, edited = _make_trust_callback_update("trustproject:onlytwoparts") + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_trust_project_callback(update, context)) + + assert "Invalid trust decision." in edited + + +def test_trust_project_callback_no_leaves_untrusted(tmp_path: Path): + backend = tmp_path / "backend" + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update, edited = _make_trust_callback_update("trustproject:no:backend") + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_trust_project_callback(update, context)) + + assert any("left untrusted" in e for e in edited) + assert not store.is_project_trusted("backend") + + +def test_trust_project_callback_already_trusted(tmp_path: Path): + backend = tmp_path / "backend" + backend.mkdir() + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + store.trust_project("backend") + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update, edited = _make_trust_callback_update("trustproject:yes:backend") + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_trust_project_callback(update, context)) + + assert any("already trusted" in e for e in edited) + + +# --------------------------------------------------------------------------- +# base.py — additional coverage +# --------------------------------------------------------------------------- + + +def test_handle_unsupported_message_sends_unsupported_text(tmp_path: Path): + runner = DummyRunner() + cfg = make_config(tmp_path) + store = SessionStore(cfg.state_file, cfg.state_backup_file) + router = CommandRouter(RouterDeps(cfg=cfg, store=store, agent_runner=runner, bot_id="bot-a")) + + update = make_update() + bot = FakeBot() + context = SimpleNamespace(args=[], bot=bot) + + asyncio.run(router.handle_unsupported_message(update, context)) + + assert "Unsupported message type" in bot.messages[-1][1] + + +def test_format_git_response_with_ignored_segments(): + from coding_agent_telegram.router.base import CommandRouterBase + + result = SimpleNamespace(success=True, message="git status completed.", stdout=None, returncode=0) + output = CommandRouterBase._format_git_response( + [(["status"], result)], + ["echo hello", "ls -la"], + ) + + assert "Ignored non-git commands:" in output + assert "echo hello" in output + assert "ls -la" in output diff --git a/tests/test_config.py b/tests/test_config.py index d21363e..64ca585 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -169,3 +169,42 @@ def test_load_config_uses_env_file_and_overrides_empty_process_values(monkeypatc assert cfg.workspace_root.name == "git" assert cfg.telegram_bot_tokens == ("token-a",) assert cfg.allowed_chat_ids == {123} + + +# --------------------------------------------------------------------------- +# _parse_bool edge cases +# --------------------------------------------------------------------------- + + +def test_parse_bool_accepts_truthy_values(): + from coding_agent_telegram.config import _parse_bool + + for val in ("1", "true", "True", "TRUE", "yes", "on"): + assert _parse_bool(val) is True, f"Expected True for {val!r}" + + +def test_parse_bool_treats_none_as_default(): + from coding_agent_telegram.config import _parse_bool + + assert _parse_bool(None, default=True) is True + assert _parse_bool(None, default=False) is False + + +def test_parse_bool_rejects_unknown_string(): + from coding_agent_telegram.config import _parse_bool + + assert _parse_bool("maybe") is False + assert _parse_bool("0") is False + + +# --------------------------------------------------------------------------- +# resolve_env_file_path: legacy fallback +# --------------------------------------------------------------------------- + + +def test_resolve_env_file_path_returns_default_path_when_neither_file_exists(tmp_path, monkeypatch): + from coding_agent_telegram.config import resolve_env_file_path, DEFAULT_ENV_FILE_NAME + + monkeypatch.chdir(tmp_path) + path = resolve_env_file_path() + assert path.name == DEFAULT_ENV_FILE_NAME diff --git a/tests/test_filters.py b/tests/test_filters.py index 46d14cc..0ac1762 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -8,3 +8,14 @@ def test_is_sensitive_path_matches_resource_patterns(): assert is_sensitive_path("config/secrets.toml") is True assert is_sensitive_path(".ssh/id_ed25519") is True assert is_sensitive_path("src/app.py") is False + + +# --------------------------------------------------------------------------- +# is_valid_project_folder — backslash rejection (line 28) +# --------------------------------------------------------------------------- + + +def test_is_valid_project_folder_rejects_backslash(): + from coding_agent_telegram.filters import is_valid_project_folder + + assert is_valid_project_folder("back\\slash") is False diff --git a/tests/test_git_utils.py b/tests/test_git_utils.py index c6c7e2a..14e36f5 100644 --- a/tests/test_git_utils.py +++ b/tests/test_git_utils.py @@ -1,5 +1,7 @@ from pathlib import Path +from types import SimpleNamespace +import pytest from coding_agent_telegram.git_utils import GitWorkspaceManager @@ -160,3 +162,489 @@ def test_run_safe_commit_command_prefers_explicit_git_identity_env(tmp_path: Pat .stdout.strip() ) assert name == "Env User " + + +# --------------------------------------------------------------------------- +# _validate_branch_name +# --------------------------------------------------------------------------- + + +def test_validate_branch_name_accepts_valid_names(): + from coding_agent_telegram.git_utils import _validate_branch_name + + assert _validate_branch_name("main") is True + assert _validate_branch_name("feature/TICKET-123") is True + assert _validate_branch_name("hotfix-1.0") is True + assert _validate_branch_name("origin/main") is True + assert _validate_branch_name("A" * 200) is True + + +def test_validate_branch_name_rejects_flag_like(): + from coding_agent_telegram.git_utils import _validate_branch_name + + assert _validate_branch_name("-b") is False + assert _validate_branch_name("--exec") is False + assert _validate_branch_name("") is False + + +def test_validate_branch_name_rejects_special_chars(): + from coding_agent_telegram.git_utils import _validate_branch_name + + assert _validate_branch_name("branch;rm") is False + assert _validate_branch_name("branch name") is False # space + assert _validate_branch_name("branch\x00null") is False + assert _validate_branch_name("a" * 201) is False # too long + + +def test_checkout_branch_rejects_invalid_name(tmp_path: Path): + """checkout_branch must reject branch names that fail validation without making git calls.""" + project = tmp_path / "repo" + project.mkdir() + manager = GitWorkspaceManager() + + result = manager.checkout_branch(project, "--malicious") + + assert result.success is False + assert "Invalid branch name" in result.message + + +def test_prepare_branch_rejects_invalid_new_branch(tmp_path: Path): + """prepare_branch must reject an invalid new_branch without making git calls.""" + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "main") + _git(project, "config", "user.name", "Test") + _git(project, "config", "user.email", "t@t.com") + (project / "f").write_text("x") + _git(project, "add", "f") + _git(project, "commit", "-m", "init") + + manager = GitWorkspaceManager() + result = manager.prepare_branch(project, origin_branch=None, new_branch="-exec") + + assert result.success is False + assert "Invalid branch name" in result.message + + +def test_prepare_branch_rejects_invalid_origin_branch(tmp_path: Path): + """prepare_branch must reject an invalid origin_branch without making git calls.""" + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "main") + _git(project, "config", "user.name", "Test") + _git(project, "config", "user.email", "t@t.com") + (project / "f").write_text("x") + _git(project, "add", "f") + _git(project, "commit", "-m", "init") + + manager = GitWorkspaceManager() + result = manager.prepare_branch(project, origin_branch="--bad", new_branch="good-branch") + + assert result.success is False + assert "Invalid" in result.message + + +# --------------------------------------------------------------------------- +# checkout_branch — git failure path +# --------------------------------------------------------------------------- + + +def test_checkout_branch_returns_failure_when_git_errors(tmp_path: Path): + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "main") + _git(project, "config", "user.name", "Test") + _git(project, "config", "user.email", "t@t.com") + (project / "f").write_text("x") + _git(project, "add", "f") + _git(project, "commit", "-m", "init") + + manager = GitWorkspaceManager() + # Branch does not exist + result = manager.checkout_branch(project, "nonexistent-branch") + + assert result.success is False + assert result.message # some error message from git + + +# --------------------------------------------------------------------------- +# push_branch — success path +# --------------------------------------------------------------------------- + + +def test_push_branch_returns_success_result_structure(tmp_path: Path): + """push_branch should return a BranchOperationResult with success=False + when there is no remote (the real git call fails).""" + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "main") + _git(project, "config", "user.name", "Test") + _git(project, "config", "user.email", "t@t.com") + (project / "f").write_text("x") + _git(project, "add", "f") + _git(project, "commit", "-m", "init") + + manager = GitWorkspaceManager() + result = manager.push_branch(project, "main") + + # No remote configured so push fails, but result structure is valid + assert hasattr(result, "success") + assert hasattr(result, "message") + + +# --------------------------------------------------------------------------- +# list_local_branches +# --------------------------------------------------------------------------- + + +def test_list_local_branches_returns_branches_in_git_repo(tmp_path: Path): + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "main") + _git(project, "config", "user.name", "Test") + _git(project, "config", "user.email", "t@t.com") + (project / "f").write_text("x") + _git(project, "add", "f") + _git(project, "commit", "-m", "init") + _git(project, "checkout", "-b", "feature-a") + + manager = GitWorkspaceManager() + branches = manager.list_local_branches(project) + + assert "main" in branches + assert "feature-a" in branches + + +def test_list_local_branches_returns_empty_list_for_non_repo(tmp_path: Path): + # Use an existing directory that is not a git repo + project = tmp_path / "plain_dir" + project.mkdir() + manager = GitWorkspaceManager() + result = manager.list_local_branches(project) + assert result == [] + + +# --------------------------------------------------------------------------- +# default_branch fallbacks +# --------------------------------------------------------------------------- + + +def test_default_branch_falls_back_to_current_branch_without_origin(tmp_path: Path): + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "trunk") + _git(project, "config", "user.name", "Test") + _git(project, "config", "user.email", "t@t.com") + (project / "f").write_text("x") + _git(project, "add", "f") + _git(project, "commit", "-m", "init") + + manager = GitWorkspaceManager() + branch = manager.default_branch(project) + + assert branch in ("trunk", "main", "master") + + +# --------------------------------------------------------------------------- +# is_git_repo +# --------------------------------------------------------------------------- + + +def test_is_git_repo_returns_false_for_plain_directory(tmp_path: Path): + manager = GitWorkspaceManager() + assert manager.is_git_repo(tmp_path) is False + + +def test_is_git_repo_returns_true_for_initialized_repo(tmp_path: Path): + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "main") + + manager = GitWorkspaceManager() + assert manager.is_git_repo(project) is True + + +# --------------------------------------------------------------------------- +# current_branch +# --------------------------------------------------------------------------- + + +def test_current_branch_returns_none_for_non_repo(tmp_path: Path): + manager = GitWorkspaceManager() + assert manager.current_branch(tmp_path) is None + + +def test_current_branch_returns_branch_name(tmp_path: Path): + project = tmp_path / "repo" + project.mkdir() + _git(project, "init", "-b", "my-branch") + _git(project, "config", "user.name", "Test") + _git(project, "config", "user.email", "t@t.com") + (project / "f").write_text("x") + _git(project, "add", "f") + _git(project, "commit", "-m", "init") + + manager = GitWorkspaceManager() + assert manager.current_branch(project) == "my-branch" + + +# --------------------------------------------------------------------------- +# default_branch: origin/HEAD that starts with "origin/" +# --------------------------------------------------------------------------- + + +def test_default_branch_parses_origin_head(tmp_path: Path): + """When symbolic-ref returns 'origin/main', default_branch should return 'main'.""" + git = GitWorkspaceManager() + + run_results = iter([ + SimpleNamespace(returncode=0, stdout="origin/main\n", stderr=""), + ]) + + def fake_run(path, *args): + return next(run_results) + + git._run = fake_run + assert git.default_branch(tmp_path) == "main" + + +# --------------------------------------------------------------------------- +# refresh_current_branch: no current branch detected +# --------------------------------------------------------------------------- + + +def test_refresh_current_branch_no_current_branch(tmp_path: Path): + """When current_branch returns None, an informative error must be returned.""" + git = GitWorkspaceManager() + git.is_git_repo = lambda p: True + git.current_branch = lambda p: None + + result = git.refresh_current_branch(tmp_path) + assert not result.success + assert "current branch" in result.message.lower() + + +# --------------------------------------------------------------------------- +# refresh_current_branch: fetch fails +# --------------------------------------------------------------------------- + + +def test_refresh_current_branch_fetch_fails(tmp_path: Path): + """When fetch fails, a warning must be included in the result.""" + git = GitWorkspaceManager() + call_count = {"n": 0} + + def fake_run(path, *args): + call_count["n"] += 1 + if "symbolic-ref" in args or "show-ref" in args or "rev-parse" in args: + if "rev-parse" in args: + return SimpleNamespace(returncode=0, stdout="main\n", stderr="") + return SimpleNamespace(returncode=1, stdout="", stderr="") + if "fetch" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="fetch failed") + return SimpleNamespace(returncode=0, stdout="main\n", stderr="") + + git._run = fake_run + # Provide is_git_repo=True and current_branch=main via monkeypatching attrs + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + result = git.refresh_current_branch(tmp_path) + assert result.success + assert result.warnings + + +# --------------------------------------------------------------------------- +# refresh_current_branch: pull fails +# --------------------------------------------------------------------------- + + +def test_refresh_current_branch_pull_fails(tmp_path: Path): + """When pull fails after a successful fetch, a warning must be included.""" + git = GitWorkspaceManager() + + def fake_run(path, *args): + if "fetch" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "pull" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="pull failed") + return SimpleNamespace(returncode=0, stdout="main\n", stderr="") + + git._run = fake_run + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + result = git.refresh_current_branch(tmp_path) + assert result.success + assert result.warnings + + +# --------------------------------------------------------------------------- +# prepare_branch: fetch fails +# --------------------------------------------------------------------------- + + +def test_prepare_branch_fetch_fails(tmp_path: Path): + """When fetch fails, prepare_branch must return a failure result.""" + git = GitWorkspaceManager() + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + def fake_run(path, *args): + if "fetch" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="network error") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + git._run = fake_run + + result = git.prepare_branch(tmp_path, origin_branch=None, new_branch="feature-x") + assert not result.success + + +# --------------------------------------------------------------------------- +# prepare_branch: existing branch checkout fails +# --------------------------------------------------------------------------- + + +def test_prepare_branch_existing_branch_checkout_fails(tmp_path: Path): + """When the existing branch exists but checkout fails, return failure.""" + git = GitWorkspaceManager() + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + def fake_run(path, *args): + if "fetch" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "show-ref" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") # branch exists + if "checkout" in args and "feature-x" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="checkout failed") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + git._run = fake_run + + result = git.prepare_branch(tmp_path, origin_branch=None, new_branch="feature-x") + assert not result.success + + +# --------------------------------------------------------------------------- +# prepare_branch: existing branch == base branch but pull fails +# --------------------------------------------------------------------------- + + +def test_prepare_branch_existing_is_base_pull_fails(tmp_path: Path): + """When new_branch == base_branch and pull fails, return failure.""" + git = GitWorkspaceManager() + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + def fake_run(path, *args): + if "fetch" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "show-ref" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") # branch exists + if "checkout" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "pull" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="pull error") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + git._run = fake_run + + # new_branch == base_branch (both "main") + result = git.prepare_branch(tmp_path, origin_branch="main", new_branch="main") + assert not result.success + + +# --------------------------------------------------------------------------- +# prepare_branch: base branch checkout fails +# --------------------------------------------------------------------------- + + +def test_prepare_branch_base_branch_checkout_fails(tmp_path: Path): + """When checkout of base branch fails, prepare_branch must return failure.""" + git = GitWorkspaceManager() + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + def fake_run(path, *args): + if "fetch" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "show-ref" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="") # branch doesn't exist + if "checkout" in args and "-b" not in args: + return SimpleNamespace(returncode=1, stdout="", stderr="cannot checkout base") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + git._run = fake_run + + result = git.prepare_branch(tmp_path, origin_branch=None, new_branch="new-feature") + assert not result.success + + +# --------------------------------------------------------------------------- +# prepare_branch: base branch pull fails +# --------------------------------------------------------------------------- + + +def test_prepare_branch_base_pull_fails(tmp_path: Path): + """When pull of base branch fails, prepare_branch must return failure.""" + git = GitWorkspaceManager() + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + def fake_run(path, *args): + if "fetch" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "show-ref" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="") # branch doesn't exist + if "checkout" in args and "-b" not in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "pull" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="pull error") + if "-b" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + git._run = fake_run + + result = git.prepare_branch(tmp_path, origin_branch=None, new_branch="new-feature") + assert not result.success + + +# --------------------------------------------------------------------------- +# prepare_branch: git create -b fails +# --------------------------------------------------------------------------- + + +def test_prepare_branch_create_branch_fails(tmp_path: Path): + """When 'git checkout -b' fails, prepare_branch must return failure.""" + git = GitWorkspaceManager() + git.is_git_repo = lambda p: True + git.current_branch = lambda p: "main" + git.default_branch = lambda p: "main" + + def fake_run(path, *args): + if "fetch" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "show-ref" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="") # branch doesn't exist + if "-b" in args: + return SimpleNamespace(returncode=1, stdout="", stderr="cannot create") + if "checkout" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + if "pull" in args: + return SimpleNamespace(returncode=0, stdout="", stderr="") + return SimpleNamespace(returncode=0, stdout="", stderr="") + + git._run = fake_run + + result = git.prepare_branch(tmp_path, origin_branch=None, new_branch="new-feature") + assert not result.success diff --git a/tests/test_logging_utils.py b/tests/test_logging_utils.py new file mode 100644 index 0000000..b706857 --- /dev/null +++ b/tests/test_logging_utils.py @@ -0,0 +1,67 @@ +"""Tests for logging_utils.setup_logging.""" +from __future__ import annotations + +import logging +from pathlib import Path + + +def test_setup_logging_creates_log_file(tmp_path: Path): + from coding_agent_telegram.logging_utils import setup_logging + + log_file = setup_logging("INFO", tmp_path / "logs") + + assert log_file.exists() + assert log_file.name == "coding-agent-telegram.log" + + +def test_setup_logging_adds_stream_and_file_handlers(tmp_path: Path): + from coding_agent_telegram.logging_utils import setup_logging + + setup_logging("DEBUG", tmp_path / "logs") + + root = logging.getLogger() + handler_types = {type(h).__name__ for h in root.handlers} + assert "StreamHandler" in handler_types + assert "RotatingFileHandler" in handler_types + + +def test_setup_logging_removes_existing_handlers_before_adding_new_ones(tmp_path: Path): + from coding_agent_telegram.logging_utils import setup_logging + + # Add a stale handler manually + stale = logging.StreamHandler() + logging.getLogger().addHandler(stale) + + setup_logging("INFO", tmp_path / "logs") + + root = logging.getLogger() + assert stale not in root.handlers + # Exactly 2 handlers (StreamHandler + RotatingFileHandler) + assert len(root.handlers) == 2 + + +def test_setup_logging_sets_root_level_correctly(tmp_path: Path): + from coding_agent_telegram.logging_utils import setup_logging + + setup_logging("WARNING", tmp_path / "logs") + + assert logging.getLogger().level == logging.WARNING + + +def test_setup_logging_suppresses_noisy_third_party_loggers(tmp_path: Path): + from coding_agent_telegram.logging_utils import setup_logging + + setup_logging("DEBUG", tmp_path / "logs") + + for name in ("httpx", "httpcore", "telegram"): + assert logging.getLogger(name).level == logging.WARNING + + +def test_setup_logging_writes_to_file(tmp_path: Path): + from coding_agent_telegram.logging_utils import setup_logging + + log_file = setup_logging("INFO", tmp_path / "logs") + logging.getLogger("test.write_check").info("hello from test") + + content = log_file.read_text(encoding="utf-8") + assert "hello from test" in content diff --git a/tests/test_session_runtime_diff_merge.py b/tests/test_session_runtime_diff_merge.py index bb25c16..d9916a4 100644 --- a/tests/test_session_runtime_diff_merge.py +++ b/tests/test_session_runtime_diff_merge.py @@ -35,3 +35,81 @@ def test_merge_snapshot_diffs_falls_back_to_git_diff_when_snapshot_unavailable() merged = runtime._merge_snapshot_diffs(git_diffs, snapshot_diffs) assert merged[0].diff == "diff-from-git-head" + + +# --------------------------------------------------------------------------- +# _chunk_assistant_prose — splitting large prose +# --------------------------------------------------------------------------- + + +def test_chunk_assistant_prose_returns_empty_for_blank_text(): + runtime = _runtime() + runtime.cfg = type("Cfg", (), {"max_telegram_message_length": 3000})() + result = runtime._chunk_assistant_prose("Output", " ") + assert result == [] + + +def test_chunk_assistant_prose_returns_single_chunk_for_short_text(): + runtime = _runtime() + runtime.cfg = type("Cfg", (), {"max_telegram_message_length": 3000})() + chunks = runtime._chunk_assistant_prose("Output", "Short message.") + assert len(chunks) == 1 + assert "Short message." in chunks[0] + + +def test_chunk_assistant_prose_splits_text_that_exceeds_max_length(): + runtime = _runtime() + runtime.cfg = type("Cfg", (), {"max_telegram_message_length": 100})() + long_text = "word " * 50 # 250 chars — way over 100 including the title + chunks = runtime._chunk_assistant_prose("Out", long_text) + assert len(chunks) > 1 + for chunk in chunks: + assert len(chunk) <= 100 + 50 # allow a little slack for HTML tags + + +# --------------------------------------------------------------------------- +# _split_assistant_body +# --------------------------------------------------------------------------- + + +def test_split_assistant_body_splits_multiline_at_midpoint(): + runtime = _runtime() + runtime.cfg = None + lines = ["line1", "line2", "line3", "line4"] + body = "\n".join(lines) + left, right = runtime._split_assistant_body(body) + assert left + assert right + assert "line1" in left + assert "line3" in right or "line4" in right + + +def test_split_assistant_body_handles_single_long_line(): + runtime = _runtime() + runtime.cfg = None + body = "a" * 200 + left, right = runtime._split_assistant_body(body) + assert left + assert right + + +# --------------------------------------------------------------------------- +# _merge_snapshot_diffs — edge cases +# --------------------------------------------------------------------------- + + +def test_merge_snapshot_diffs_uses_git_diff_when_no_snapshot_for_path(): + runtime = _runtime() + git_diffs = [FileDiff(path="new_file.py", diff="diff-only-in-git")] + snapshot_diffs: dict = {} + + merged = runtime._merge_snapshot_diffs(git_diffs, snapshot_diffs) + + assert len(merged) == 1 + assert merged[0].diff == "diff-only-in-git" + + +def test_merge_snapshot_diffs_handles_empty_inputs(): + runtime = _runtime() + merged = runtime._merge_snapshot_diffs([], {}) + assert merged == [] diff --git a/tests/test_session_runtime_security.py b/tests/test_session_runtime_security.py new file mode 100644 index 0000000..82fe036 --- /dev/null +++ b/tests/test_session_runtime_security.py @@ -0,0 +1,83 @@ +"""Tests for output-sanitization helpers in session_runtime.""" +from __future__ import annotations + +from coding_agent_telegram.session_runtime import _sanitize_agent_error, _scrub_secrets + + +# --------------------------------------------------------------------------- +# _scrub_secrets +# --------------------------------------------------------------------------- + + +def test_scrub_secrets_redacts_telegram_bot_token(): + text = "Token is 1234567890:AAHabcdefghij0123456789ABCDEFGHIJklm in config." + result = _scrub_secrets(text) + assert "" in result + assert "AAHabcdefghij0123456789ABCDEFGHIJklm" not in result + + +def test_scrub_secrets_redacts_github_pat(): + # GitHub PAT pattern requires 36+ alphanumeric chars after the prefix. + text = "Use token ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij1234 for auth." + result = _scrub_secrets(text) + assert "" in result + assert "ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij1234" not in result + + +def test_scrub_secrets_redacts_aws_access_key(): + text = "AWS key: AKIAIOSFODNN7EXAMPLE is set." + result = _scrub_secrets(text) + assert "" in result + assert "AKIAIOSFODNN7EXAMPLE" not in result + + +def test_scrub_secrets_leaves_normal_text_unchanged(): + text = "Updated README with installation steps and examples." + assert _scrub_secrets(text) == text + + +def test_scrub_secrets_leaves_short_token_like_strings_unchanged(): + # A short string that does not match any pattern must pass through. + text = "Session ID: abc-123" + assert _scrub_secrets(text) == text + + +def test_scrub_secrets_handles_multiple_secrets_in_same_text(): + text = ( + "bot token: 9876543210:BBHabcdefghij0123456789ABCDEFGHIJklm " + "github: ghs_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij1234" + ) + result = _scrub_secrets(text) + assert "" in result + assert "" in result + assert "BBHabcdefghij" not in result + assert "ghs_ABCDE" not in result + + +# --------------------------------------------------------------------------- +# _sanitize_agent_error +# --------------------------------------------------------------------------- + + +def test_sanitize_agent_error_redacts_unix_absolute_path(): + text = "Cannot read /home/user/projects/myapp/config.py" + result = _sanitize_agent_error(text) + assert "" in result + assert "/home/user/projects" not in result + + +def test_sanitize_agent_error_redacts_windows_absolute_path(): + text = r"File not found: C:\Users\rayli\project\src\main.py" + result = _sanitize_agent_error(text) + assert "" in result + assert r"C:\Users\rayli" not in result + + +def test_sanitize_agent_error_leaves_relative_paths_unchanged(): + text = "Could not open src/main.py for reading" + assert _sanitize_agent_error(text) == text + + +def test_sanitize_agent_error_leaves_plain_messages_unchanged(): + text = "Agent timed out after 300 seconds." + assert _sanitize_agent_error(text) == text diff --git a/tests/test_session_store.py b/tests/test_session_store.py index c02d429..eb9401e 100644 --- a/tests/test_session_store.py +++ b/tests/test_session_store.py @@ -1,6 +1,7 @@ from pathlib import Path import json +import pytest from coding_agent_telegram.session_store import SessionStore @@ -154,3 +155,299 @@ def test_rebind_session_returns_false_when_chat_state_is_missing(tmp_path: Path) store = SessionStore(state, backup) assert store.rebind_session("bot-a", 123, "old", "new") is False + + +# --------------------------------------------------------------------------- +# SessionStoreError — lock-timeout handling +# --------------------------------------------------------------------------- + + +def test_load_raises_session_store_error_on_lock_timeout(tmp_path, monkeypatch): + """load() must raise SessionStoreError when portalocker cannot acquire the lock.""" + import portalocker + + from coding_agent_telegram.session_store import SessionStoreError + + state = tmp_path / "state.json" + backup = tmp_path / "state.json.bak" + store = SessionStore(state, backup) + + class _FailingLock: + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + raise portalocker.LockException("simulated timeout") + + def __exit__(self, *args): + pass + + monkeypatch.setattr(portalocker, "Lock", _FailingLock) + + import pytest + + with pytest.raises(SessionStoreError, match="temporarily locked"): + store.load() + + +def test_save_raises_session_store_error_on_lock_timeout(tmp_path, monkeypatch): + """save() must raise SessionStoreError when portalocker cannot acquire the lock.""" + import portalocker + + from coding_agent_telegram.session_store import SessionStoreError + + state = tmp_path / "state.json" + backup = tmp_path / "state.json.bak" + store = SessionStore(state, backup) + + class _FailingLock: + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + raise portalocker.LockException("simulated timeout") + + def __exit__(self, *args): + pass + + monkeypatch.setattr(portalocker, "Lock", _FailingLock) + + import pytest + + with pytest.raises(SessionStoreError, match="temporarily locked"): + store.save({}) + + +# --------------------------------------------------------------------------- +# _load_unlocked: error handling branches +# --------------------------------------------------------------------------- + + +def test_load_unlocked_oserror_returns_default(tmp_path: Path, monkeypatch): + """When the state file raises OSError on read, _load_unlocked should return default.""" + from coding_agent_telegram.session_store import SessionStore + + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store._ensure_paths() + store.state_file.write_text("{}", encoding="utf-8") + + # Patch Path.read_text to raise OSError only for the state file + original_read_text = store.state_file.__class__.read_text + + def fail_read(self, *args, **kwargs): + if self == store.state_file: + raise OSError("Permission denied") + return original_read_text(self, *args, **kwargs) + + monkeypatch.setattr(store.state_file.__class__, "read_text", fail_read) + + result = store._load_unlocked() + assert isinstance(result, dict) + + +def test_load_unlocked_invalid_json_returns_default(tmp_path: Path): + """When state file contains invalid JSON, _load_unlocked should return default.""" + from coding_agent_telegram.session_store import SessionStore + + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store._ensure_paths() + store.state_file.write_text("NOT JSON!!!", encoding="utf-8") + + result = store._load_unlocked() + assert isinstance(result, dict) + + +def test_load_unlocked_non_dict_json_returns_default(tmp_path: Path): + """When state file contains valid JSON but not a dict, return default.""" + import json + from coding_agent_telegram.session_store import SessionStore + + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store._ensure_paths() + store.state_file.write_text(json.dumps([1, 2, 3]), encoding="utf-8") + + result = store._load_unlocked() + assert isinstance(result, dict) + + +# --------------------------------------------------------------------------- +# list_sessions and get_chat_state: lock errors raise SessionStoreError +# --------------------------------------------------------------------------- + + +def test_list_sessions_raises_session_store_error_on_lock_timeout(tmp_path, monkeypatch): + """list_sessions must raise SessionStoreError when the lock cannot be acquired.""" + import portalocker + from coding_agent_telegram.session_store import SessionStore, SessionStoreError + + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + + class FailLock: + def __enter__(self): + raise portalocker.LockException("locked") + def __exit__(self, *args): + pass + + monkeypatch.setattr(portalocker, "Lock", lambda *a, **kw: FailLock()) + + with pytest.raises(SessionStoreError): + store.list_sessions("bot1", 1) + + +def test_get_chat_state_raises_session_store_error_on_lock_timeout(tmp_path, monkeypatch): + """get_chat_state must raise SessionStoreError when the lock cannot be acquired.""" + import portalocker + from coding_agent_telegram.session_store import SessionStore, SessionStoreError + + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + + class FailLock: + def __enter__(self): + raise portalocker.LockException("locked") + def __exit__(self, *args): + pass + + monkeypatch.setattr(portalocker, "Lock", lambda *a, **kw: FailLock()) + + with pytest.raises(SessionStoreError): + store.get_chat_state("bot1", 1) + + +# --------------------------------------------------------------------------- +# save(): happy path (line 105) +# --------------------------------------------------------------------------- + + +def test_save_persists_state(tmp_path: Path): + """save() must persist a state dict to disk.""" + import json + + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store._ensure_paths() + + state = {"chats": {}, "trusted_projects": [], "custom": True} + store.save(state) + + loaded = json.loads(store.state_file.read_text(encoding="utf-8")) + assert loaded.get("custom") is True + + +# --------------------------------------------------------------------------- +# _mutate_state: lock exception raises SessionStoreError (lines 117-118) +# --------------------------------------------------------------------------- + + +def test_mutate_state_raises_session_store_error_on_lock_timeout(tmp_path, monkeypatch): + """_mutate_state must raise SessionStoreError when lock cannot be acquired.""" + import portalocker + from coding_agent_telegram.session_store import SessionStore, SessionStoreError + + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + + class FailLock: + def __enter__(self): + raise portalocker.LockException("locked") + def __exit__(self, *args): + pass + + monkeypatch.setattr(portalocker, "Lock", lambda *a, **kw: FailLock()) + + with pytest.raises(SessionStoreError): + store._mutate_state(lambda state: None) + + +# --------------------------------------------------------------------------- +# create_session: with branch_name sets current_branch (line 257) +# --------------------------------------------------------------------------- + + +def test_create_session_with_branch_name_sets_current_branch(tmp_path: Path): + """create_session with branch_name must store current_branch in chat state.""" + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store.create_session("bot1", 1, "ses1", "Session 1", "myproject", "codex", branch_name="feature-x") + + state = store.get_chat_state("bot1", 1) + assert state.get("current_branch") == "feature-x" + + +# --------------------------------------------------------------------------- +# rebind_session: all branches (lines 269-283) +# --------------------------------------------------------------------------- + + +def test_rebind_session_returns_false_when_session_not_found(tmp_path: Path): + """rebind_session must return False when old_session_id doesn't exist.""" + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store.create_session("bot1", 1, "ses1", "Session 1", "proj", "codex") + + result = store.rebind_session("bot1", 1, "nonexistent", "new-id") + assert result is False + + +def test_rebind_session_same_id_updates_timestamp(tmp_path: Path): + """rebind_session with old==new must update timestamp and return True.""" + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store.create_session("bot1", 1, "ses1", "Session 1", "proj", "codex") + + result = store.rebind_session("bot1", 1, "ses1", "ses1") + assert result is True + + +def test_rebind_session_renames_active_session(tmp_path: Path): + """rebind_session with a new ID must rename the session and update active_session_id.""" + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store.create_session("bot1", 1, "ses1", "Session 1", "proj", "codex") + + result = store.rebind_session("bot1", 1, "ses1", "ses2") + assert result is True + + sessions = store.list_sessions("bot1", 1) + assert "ses2" in sessions + assert "ses1" not in sessions + + state = store.get_chat_state("bot1", 1) + assert state.get("active_session_id") == "ses2" + + +# --------------------------------------------------------------------------- +# replace_session: with branch_name sets current_branch (line 257) +# --------------------------------------------------------------------------- + + +def test_replace_session_with_branch_name_sets_current_branch(tmp_path: Path): + """replace_session with branch_name must store current_branch in chat state.""" + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store.create_session("bot1", 1, "ses1", "Session 1", "proj", "codex") + store.replace_session("bot1", 1, "ses1", "ses2", "Session 2", "proj", "codex", branch_name="fix-x") + + state = store.get_chat_state("bot1", 1) + assert state.get("current_branch") == "fix-x" + + +# --------------------------------------------------------------------------- +# switch_session: returns False when session not found (line 333) +# --------------------------------------------------------------------------- + + +def test_switch_session_returns_false_when_session_id_missing(tmp_path: Path): + """switch_session must return False when the requested session_id doesn't exist.""" + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store.create_session("bot1", 1, "ses1", "Session 1", "proj", "codex") + + result = store.switch_session("bot1", 1, "nonexistent-id") + assert result is False + + +# --------------------------------------------------------------------------- +# switch_session: sets current_branch when session has one (line 339) +# --------------------------------------------------------------------------- + + +def test_switch_session_sets_current_branch_from_session(tmp_path: Path): + """switch_session must copy branch_name into chat_state.current_branch.""" + store = SessionStore(tmp_path / "state.json", tmp_path / "state.json.bak") + store.create_session("bot1", 1, "ses1", "Session 1", "proj", "codex", branch_name="my-branch") + + store.switch_session("bot1", 1, "ses1") + state = store.get_chat_state("bot1", 1) + assert state.get("current_branch") == "my-branch" diff --git a/tests/test_telegram_sender.py b/tests/test_telegram_sender.py index ed50b84..b42fd14 100644 --- a/tests/test_telegram_sender.py +++ b/tests/test_telegram_sender.py @@ -1,9 +1,16 @@ import asyncio from types import SimpleNamespace +import pytest from telegram.error import BadRequest -from coding_agent_telegram.telegram_sender import markdownish_to_html, send_code_block, send_html_text, send_text +from coding_agent_telegram.telegram_sender import ( + markdownish_to_html, + send_code_block, + send_html_text, + send_markdown_text, + send_text, +) def test_markdownish_to_html_handles_mixed_links_code_and_bold_without_unbalanced_code_tags(): @@ -89,3 +96,360 @@ async def send_message(self, chat_id, text, parse_mode=None): assert len(calls) >= 4 assert all(len(call[1]) <= 500 for call in calls) + + +# --------------------------------------------------------------------------- +# Null effective_chat guards +# --------------------------------------------------------------------------- + + +def test_send_text_does_nothing_when_effective_chat_is_none(): + """send_text must return without error when update.effective_chat is None.""" + called = [] + + class FakeBot: + async def send_message(self, chat_id, text, parse_mode=None): + called.append(text) + + update = SimpleNamespace(effective_chat=None) + context = SimpleNamespace(bot=FakeBot()) + + asyncio.run(send_text(update, context, "hello")) + assert called == [] + + +def test_send_html_text_does_nothing_when_effective_chat_is_none(): + called = [] + + class FakeBot: + async def send_message(self, chat_id, text, parse_mode=None): + called.append(text) + + update = SimpleNamespace(effective_chat=None) + context = SimpleNamespace(bot=FakeBot()) + + asyncio.run(send_html_text(update, context, "hi")) + assert called == [] + + +def test_send_code_block_does_nothing_when_effective_chat_is_none(): + called = [] + + class FakeBot: + async def send_message(self, chat_id, text, parse_mode=None): + called.append(text) + + update = SimpleNamespace(effective_chat=None) + context = SimpleNamespace(bot=FakeBot()) + + asyncio.run(send_code_block(update, context, "Output", "some code")) + assert called == [] + + +# --------------------------------------------------------------------------- +# _max_telegram_message_length fallbacks +# --------------------------------------------------------------------------- + + +def test_send_text_uses_default_length_when_no_bot_data(): + """Without bot_data, send_text must use the package default max length.""" + calls = [] + + class FakeBot: + async def send_message(self, chat_id, text, parse_mode=None): + calls.append(text) + + update = SimpleNamespace(effective_chat=SimpleNamespace(id=1)) + context = SimpleNamespace(bot=FakeBot()) # no bot_data attribute + + asyncio.run(send_text(update, context, "short message")) + assert len(calls) == 1 + + +# --------------------------------------------------------------------------- +# markdownish_to_html: non-http link falls back to code span +# --------------------------------------------------------------------------- + + +def test_markdownish_to_html_renders_non_http_link_as_code(): + from coding_agent_telegram.telegram_sender import markdownish_to_html + + result = markdownish_to_html("[relative/path](relative/path)") + assert "relative/path" in result + assert "bold" in result + + +# --------------------------------------------------------------------------- +# _split_plain_text_chunk edge cases +# --------------------------------------------------------------------------- + + +def test_split_text_chunks_handles_single_long_word_without_spaces(): + """A chunk with no whitespace must still be split at the midpoint.""" + from coding_agent_telegram.telegram_sender import _split_plain_text_chunk + + text = "A" * 200 + left, right = _split_plain_text_chunk(text) + assert left + assert right + assert left + right == text or len(left) + len(right) <= len(text) + + +def test_split_text_chunks_prefers_whitespace_split(): + from coding_agent_telegram.telegram_sender import _split_plain_text_chunk + + text = "first half second half" + left, right = _split_plain_text_chunk(text) + assert left + assert right + assert " " not in left[-1] or " " not in right[0] + + +# --------------------------------------------------------------------------- +# send_markdown_text happy path +# --------------------------------------------------------------------------- + + +def test_send_markdown_text_sends_message(): + """send_markdown_text must call bot.send_message with MARKDOWN parse mode.""" + calls = [] + + class FakeBot: + async def send_message(self, chat_id, text, parse_mode=None): + calls.append((chat_id, text, parse_mode)) + + from telegram.constants import ParseMode + + update = SimpleNamespace(effective_chat=SimpleNamespace(id=99)) + context = SimpleNamespace(bot=FakeBot()) + + asyncio.run(send_markdown_text(update, context, "**hello**")) + assert len(calls) == 1 + assert calls[0][0] == 99 + assert calls[0][1] == "**hello**" + assert calls[0][2] == ParseMode.MARKDOWN + + +def test_send_markdown_text_does_nothing_when_no_chat(): + """send_markdown_text must silently return when effective_chat is None.""" + called = [] + + class FakeBot: + async def send_message(self, **kwargs): + called.append(kwargs) + + update = SimpleNamespace(effective_chat=None) + context = SimpleNamespace(bot=FakeBot()) + + asyncio.run(send_markdown_text(update, context, "hi")) + assert called == [] + + +# --------------------------------------------------------------------------- +# send_html_text: BadRequest that is NOT a parse-entities error must re-raise +# --------------------------------------------------------------------------- + + +def test_send_html_text_reraises_non_parse_bad_request(): + """A BadRequest with a different message should propagate, not be swallowed.""" + from telegram.error import BadRequest + + class FakeBot: + async def send_message(self, chat_id, text, parse_mode=None): + raise BadRequest("Message is too long") + + update = SimpleNamespace(effective_chat=SimpleNamespace(id=1)) + context = SimpleNamespace(bot=FakeBot(), bot_data={}) + + import pytest + + with pytest.raises(BadRequest): + asyncio.run(send_html_text(update, context, "x")) + + +# --------------------------------------------------------------------------- +# markdownish_to_html: HTTP link renders as anchor tag +# --------------------------------------------------------------------------- + + +def test_markdownish_to_html_renders_http_link_as_anchor(): + """A markdown link with an http/https URL must become an HTML anchor.""" + from coding_agent_telegram.telegram_sender import markdownish_to_html + + result = markdownish_to_html("[GitHub](https://github.com)") + assert 'href="https://github.com"' in result + assert "GitHub" in result + + +# --------------------------------------------------------------------------- +# _split_plain_text_chunk: multi-line path and fallback paths +# --------------------------------------------------------------------------- + + +def test_split_plain_text_chunk_splits_multiline_at_midpoint(): + """Multi-line text must be split at the midpoint line.""" + from coding_agent_telegram.telegram_sender import _split_plain_text_chunk + + text = "line1\nline2\nline3\nline4" + left, right = _split_plain_text_chunk(text) + assert left + assert right + assert "line1" in left + assert "line3" in right + + +def test_split_plain_text_chunk_fallback_when_no_whitespace(): + """A string with no whitespace must still produce two non-empty parts.""" + from coding_agent_telegram.telegram_sender import _split_plain_text_chunk + + # Single-line with no internal whitespace forces the final fallback + text = "abcdefghij" + left, right = _split_plain_text_chunk(text) + assert left + assert right + assert left + right # both parts non-empty + + +# --------------------------------------------------------------------------- +# split_assistant_output: text with shell commands +# --------------------------------------------------------------------------- + + +def test_split_assistant_output_with_shell_command(): + """Lines starting with shell-command prefixes must become code segments.""" + from coding_agent_telegram.telegram_sender import split_assistant_output + + text = "Some description.\n$ git status\n$ git diff" + segments = split_assistant_output(text) + kinds = [s.kind for s in segments] + assert "code" in kinds + assert "prose" in kinds + + +def test_split_assistant_output_command_then_prose(): + """A command block followed by prose text must produce both segment types.""" + from coding_agent_telegram.telegram_sender import split_assistant_output + + text = "$ npm install\nDone. The packages are installed." + segments = split_assistant_output(text) + kinds = [s.kind for s in segments] + assert "code" in kinds + + +# --------------------------------------------------------------------------- +# _looks_like_shell_block +# --------------------------------------------------------------------------- + + +def test_looks_like_shell_block_true(): + """Text where all lines look like shell commands must return True.""" + from coding_agent_telegram.telegram_sender import _looks_like_shell_block + + assert _looks_like_shell_block("$ echo hello\n$ ls -la") + + +def test_looks_like_shell_block_false(): + """Plain prose must not be recognised as a shell block.""" + from coding_agent_telegram.telegram_sender import _looks_like_shell_block + + assert not _looks_like_shell_block("This is just regular text.") + + +def test_looks_like_shell_block_empty(): + """Empty text must return False.""" + from coding_agent_telegram.telegram_sender import _looks_like_shell_block + + assert not _looks_like_shell_block("") + + +# --------------------------------------------------------------------------- +# send_code_block without language parameter +# --------------------------------------------------------------------------- + + +def test_send_code_block_without_language(): + """send_code_block with no language must use plain
 tags."""
+    calls = []
+
+    class FakeBot:
+        async def send_message(self, chat_id, text, parse_mode=None):
+            calls.append(text)
+
+    update = SimpleNamespace(effective_chat=SimpleNamespace(id=7))
+    context = SimpleNamespace(bot=FakeBot(), bot_data={})
+
+    asyncio.run(send_code_block(update, context, "Output", "hello world"))
+    # Should produce a header message and a code message without language class
+    code_msg = calls[-1]
+    assert "
" in code_msg
+    assert "language-" not in code_msg
+
+
+# ---------------------------------------------------------------------------
+# _split_text_chunks: empty / whitespace-only input
+# ---------------------------------------------------------------------------
+
+
+def test_split_text_chunks_empty_string_returns_empty_list():
+    from coding_agent_telegram.telegram_sender import _split_text_chunks
+
+    assert _split_text_chunks("") == []
+    assert _split_text_chunks("   ") == []
+
+
+# ---------------------------------------------------------------------------
+# _split_plain_text_chunk: edge cases that hit the left/right fill-in lines
+# ---------------------------------------------------------------------------
+
+
+def test_split_plain_text_chunk_single_char_fills_left():
+    """Single char: midpoint=0, left becomes empty and is filled from text[:1]."""
+    from coding_agent_telegram.telegram_sender import _split_plain_text_chunk
+
+    left, right = _split_plain_text_chunk("x")
+    assert left  # must not be empty
+    assert right  # must not be empty
+
+
+def test_split_plain_text_chunk_single_space_fills_both():
+    """Single space: both halves become empty and are filled from text[:1]/text[-1:]."""
+    from coding_agent_telegram.telegram_sender import _split_plain_text_chunk
+
+    left, right = _split_plain_text_chunk(" ")
+    assert left
+    assert right
+
+
+# ---------------------------------------------------------------------------
+# split_assistant_output: blank line breaks command block (line 243)
+# and continuation lines are absorbed (lines 245-247)
+# ---------------------------------------------------------------------------
+
+
+def test_split_assistant_output_blank_line_breaks_command():
+    """A blank line after a command must end the command block."""
+    from coding_agent_telegram.telegram_sender import split_assistant_output
+
+    text = "$ git status\n\nSome prose after."
+    segments = split_assistant_output(text)
+    kinds = [s.kind for s in segments]
+    assert "code" in kinds
+
+
+def test_split_assistant_output_continuation_line_joins_command():
+    """A continuation line (starting with &&, |, etc.) must be merged into the command."""
+    from coding_agent_telegram.telegram_sender import split_assistant_output
+
+    text = "$ git add .\n&& git commit -m 'msg'\nDone."
+    segments = split_assistant_output(text)
+    code_segs = [s for s in segments if s.kind == "code"]
+    assert code_segs
+    # The continuation line must be part of the command segment
+    assert "&&" in code_segs[0].text