# Maximum uncompressed size (bytes) allowed when extracting a single archive
# member.  Guards against zip-bomb attacks where a tiny compressed archive
# expands to an enormous member in memory.  50 MB is generous for XML/text
# content while still bounding worst-case memory use.
_MAX_MEMBER_BYTES: int = 50 * 1024 * 1024  # 50 MB

# Registry of local temp paths created by nc_webdav_download_to_temp.
# Maps local_path -> owning_username so nc_webdav_cleanup_temp can verify
# that the caller is the same user who created the file, preventing one
# multi-user session from deleting another session's temp files.
# Dict mutation is safe in asyncio: single-threaded, GIL protects simple ops.
_temp_registry: dict[str, str] = {}


def _cleanup_temp_files_on_exit() -> None:
    """Remove all temp files registered by nc_webdav_download_to_temp on process exit.

    Best-effort: the handler never raises.  A registry entry is dropped once
    its file is gone (deleted here, or already missing); it is kept when the
    unlink fails for any other reason so the failure stays visible to anyone
    inspecting the registry, and the failure is logged instead of being
    silently discarded.
    """
    # Iterate over a snapshot because entries are removed while looping.
    for path in list(_temp_registry):
        try:
            os.unlink(path)
        except FileNotFoundError:
            # Already removed (e.g. by nc_webdav_cleanup_temp or the user).
            logger.debug("atexit: temp file '%s' was already gone", path)
        except OSError as exc:
            # Genuine failure (permissions, path is a directory, ...): report
            # it and keep the entry rather than pretending cleanup succeeded.
            logger.warning("atexit: could not remove temp file '%s': %s", path, exc)
            continue
        else:
            logger.debug("atexit: removed temp file '%s'", path)
        _temp_registry.pop(path, None)


atexit.register(_cleanup_temp_files_on_exit)

# Maximum file size accepted by nc_webdav_download_to_temp.
# Prevents unbounded disk writes, especially in remote-HTTP deployments where
# the caller cannot use the local path anyway.
_MAX_TEMP_DOWNLOAD_BYTES: int = 500 * 1024 * 1024  # 500 MB
# Maximum archive size for in-memory ZIP operations (list/read member).
# read_file() buffers the full archive in RAM; reject oversized archives
# before attempting extraction so workers don't OOM on huge ZIPs.
_MAX_ARCHIVE_BYTES: int = 100 * 1024 * 1024  # 100 MB

# Maximum number of members returned by nc_webdav_list_archive_members.
# Large ZIP/JAR files can have thousands of entries; truncate to avoid
# flooding the MCP response and exhausting the context window.
_MAX_ARCHIVE_MEMBERS: int = 500

# ---------------------------------------------------------------------------
# Pure helpers — no MCP context required, fully unit-testable
# ---------------------------------------------------------------------------

# Extensions always treated as UTF-8 text regardless of MIME type.
# Covers XML-based OOXML internals (.rels, .opf, .xhtml, .ncx) that
# mimetypes.guess_type() returns None or application/octet-stream for.
_TEXT_EXTENSIONS: frozenset[str] = frozenset(
    {
        ".xml",
        ".json",
        ".html",
        ".xhtml",
        ".css",
        ".js",
        ".svg",
        ".txt",
        ".md",
        ".rels",  # OOXML relationship files
        ".opf",  # EPUB Open Packaging Format
        ".ncx",  # EPUB Navigation Control
        ".rdf",  # RDF/XML metadata
        ".plist",  # Apple property list (XML form)
    }
)

# MIME types treated as text even when the extension doesn't match.
_TEXT_MIME_TYPES: frozenset[str] = frozenset(
    {
        "application/xml",
        "application/json",
        "application/javascript",
        "application/xhtml+xml",
    }
)


def _list_zip_members(
    content: bytes, path: str, content_type: str, max_members: int = 500
) -> dict:
    """Return the member listing of a ZIP archive as a plain dict.

    Args:
        content: Raw bytes of the archive.
        path: Nextcloud path (used only in error messages and echoed back).
        content_type: MIME type reported by Nextcloud (included in result).
        max_members: Maximum number of members to include in the result.
            Must be >= 0.  The total member count is always reported; a
            ``truncated`` flag is set when the list is cut.

    Returns:
        Dict with path, content_type, archive_size, member_count, members,
        and, only when the archive holds more than *max_members* entries,
        truncated=True plus truncated_at=max_members.

    Raises:
        ValueError: if *max_members* is negative or *content* is not a valid
            ZIP archive.
    """
    # A negative limit would turn the slice below into "all but the last N",
    # silently hiding members while reporting a nonsensical truncated_at.
    if max_members < 0:
        raise ValueError(f"max_members must be non-negative, got {max_members}.")

    try:
        with zipfile.ZipFile(io.BytesIO(content)) as zf:
            # ZipInfo objects stay valid after the archive is closed.
            all_infos = zf.infolist()
    except zipfile.BadZipFile as exc:
        raise ValueError(
            f"'{path}' (content-type: {content_type}) is not a valid ZIP archive. "
            "For plain text files use nc_webdav_read_file; for images/video/audio "
            "use nc_webdav_download_to_temp."
        ) from exc

    members = [
        {
            "name": info.filename,
            "size": info.file_size,
            "compressed_size": info.compress_size,
            "is_dir": info.is_dir(),
        }
        for info in all_infos[:max_members]
    ]

    total = len(all_infos)
    result: dict = {
        "path": path,
        "content_type": content_type,
        "archive_size": len(content),
        "member_count": total,
        "members": members,
    }
    if total > max_members:
        result["truncated"] = True
        result["truncated_at"] = max_members
    return result
def _read_zip_member(content: bytes, path: str, member_path: str) -> dict:
    """Extract and return a single member from a ZIP archive.

    Text members (detected by MIME type, file extension, or a UTF-8 content
    sniff) are returned as UTF-8 strings.  Binary members are base64-encoded.

    Args:
        content: Raw bytes of the archive.
        path: Nextcloud path (used in error messages and echoed back).
        member_path: Path of the member inside the archive.

    Returns:
        Dict with archive_path, member_path, content, content_type, size,
        and optionally encoding="base64" for binary members.

    Raises:
        ValueError: if the archive is invalid, the member is missing or is a
            directory entry, the uncompressed member size exceeds
            _MAX_MEMBER_BYTES, or the member cannot be extracted (for example
            it is password-protected or uses an unsupported compression
            method).
    """
    try:
        with zipfile.ZipFile(io.BytesIO(content)) as zf:
            try:
                info = zf.getinfo(member_path)
            except KeyError as exc:
                available = [i.filename for i in zf.infolist() if not i.is_dir()]
                raise ValueError(
                    f"Member '{member_path}' not found in '{path}'. "
                    f"Available files: {available[:30]}"
                    + (" (truncated)" if len(available) > 30 else "")
                ) from exc

            if info.is_dir():
                raise ValueError(
                    f"Member '{member_path}' is a directory entry; "
                    f"only file members can be read."
                )

            # Check the declared size before decompressing anything.
            if info.file_size > _MAX_MEMBER_BYTES:
                raise ValueError(
                    f"Member '{member_path}' uncompressed size "
                    f"({info.file_size:,} bytes) exceeds the "
                    f"{_MAX_MEMBER_BYTES // (1024 * 1024)} MB limit. "
                    f"Use nc_webdav_download_to_temp and extract locally."
                )

            try:
                member_bytes = zf.read(info)
            except RuntimeError as exc:
                # zipfile raises RuntimeError for encrypted members when no
                # password is supplied, and NotImplementedError (a subclass
                # of RuntimeError) for unsupported compression.  Convert both
                # so callers only ever see the documented ValueError.
                raise ValueError(
                    f"Member '{member_path}' in '{path}' cannot be extracted: {exc}"
                ) from exc
    except zipfile.BadZipFile as exc:
        raise ValueError(f"'{path}' is not a valid ZIP archive.") from exc

    member_mime = mimetypes.guess_type(member_path)[0] or "application/octet-stream"
    basename = os.path.basename(member_path)
    ext = os.path.splitext(basename)[1].lower()
    # Dotfiles like ".rels" have no extension per splitext; treat the whole name as the extension.
    if not ext and basename.startswith("."):
        ext = basename.lower()

    looks_textual = (
        member_mime.startswith("text/")
        or member_mime in _TEXT_MIME_TYPES
        or ext in _TEXT_EXTENSIONS
    )

    # Decode at most once.  Members flagged as text by MIME/extension are
    # always tried; anything else (e.g. ODF's extensionless "mimetype") is
    # tried only when it contains no NUL byte, the classic binary probe.
    text: str | None = None
    if looks_textual or b"\x00" not in member_bytes:
        try:
            text = member_bytes.decode("utf-8")
        except UnicodeDecodeError:
            text = None  # not valid UTF-8: fall through to base64

    if text is not None:
        return {
            "archive_path": path,
            "member_path": member_path,
            "content": text,
            "content_type": member_mime,
            "size": len(member_bytes),
        }

    return {
        "archive_path": path,
        "member_path": member_path,
        "content": base64.b64encode(member_bytes).decode("ascii"),
        "content_type": member_mime,
        "size": len(member_bytes),
        "encoding": "base64",
    }
+ if not is_text and b"\x00" not in member_bytes: + try: + member_bytes.decode("utf-8") + is_text = True + except UnicodeDecodeError: + pass + + if is_text: + try: + return { + "archive_path": path, + "member_path": member_path, + "content": member_bytes.decode("utf-8"), + "content_type": member_mime, + "size": len(member_bytes), + } + except UnicodeDecodeError: + pass # fall through to base64 + + return { + "archive_path": path, + "member_path": member_path, + "content": base64.b64encode(member_bytes).decode("ascii"), + "content_type": member_mime, + "size": len(member_bytes), + "encoding": "base64", + } + + +def _cleanup_temp_path(local_path: str, owner: str | None = None) -> dict: + """Remove a temp file that was registered by nc_webdav_download_to_temp. + + Only paths present in *_temp_registry* may be removed. When *owner* is + supplied (the Nextcloud username of the calling session) the registry entry + must also match that username, preventing one multi-user session from + deleting another session's temp files. + + The registry entry is discarded only after a successful unlink (or when the + file is already gone); it is retained on OSError so the caller can retry. + + Args: + local_path: The path previously returned by nc_webdav_download_to_temp. + owner: Username of the requesting session. Pass ``None`` only in + contexts where ownership cannot be determined (e.g. atexit). + + Returns: + Dict with ``status`` ("ok" or "error"), ``local_path``, and an optional + ``message`` / ``note`` field. + """ + registered_owner = _temp_registry.get(local_path) + if registered_owner is None: + return { + "status": "error", + "local_path": local_path, + "message": ( + "Path was not created by nc_webdav_download_to_temp in this " + "session, or has already been cleaned up." 
+ ), + } + + if owner is not None and registered_owner != owner: + return { + "status": "error", + "local_path": local_path, + "message": "Permission denied: this temp file belongs to a different session.", + } + + try: + os.unlink(local_path) + del _temp_registry[local_path] + logger.debug("Removed temp file '%s'", local_path) + return {"status": "ok", "local_path": local_path} + except FileNotFoundError: + # File already gone — treat as success and clean up registry. + _temp_registry.pop(local_path, None) + return { + "status": "ok", + "local_path": local_path, + "note": "File was already removed.", + } + except OSError as exc: + # Do NOT remove from registry — leave so the caller can retry. + return {"status": "error", "local_path": local_path, "message": str(exc)} + def configure_webdav_tools(mcp: FastMCP): # WebDAV file system tools @@ -68,16 +354,47 @@ async def nc_webdav_list_directory( @require_scopes("files.read") @instrument_tool async def nc_webdav_read_file(path: str, ctx: Context): - """Read the content of a file from NextCloud. + """Read a file from Nextcloud and return its content inline. + + IMPORTANT — choose the right tool for the file type: + + ✅ Use THIS tool for: + - Plain text files (Markdown, CSV, JSON, XML, YAML, source code, logs) + that fit in the context window (roughly < 1 MB of text). + - PDFs, when the document-processing feature is enabled server-side + (text is extracted automatically). + + ❌ Do NOT use this tool for: + - ZIP-based office formats (ODS, ODT, ODP, DOCX, XLSX, PPTX, EPUB …). + If server-side document processing is enabled (ENABLE_DOCUMENT_PROCESSING=true) + and a processor supports the type (e.g. Unstructured handles DOCX/XLSX), + text is extracted automatically — check the server configuration. + When doc-processing is disabled or unsupported for the type, the raw + archive bytes are meaningless in context; use + nc_webdav_list_archive_members + nc_webdav_read_archive_member instead. 
@mcp.tool(
    title="List Archive Members",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        openWorldHint=True,
    ),
)
@require_scopes("files.read")
@instrument_tool
async def nc_webdav_list_archive_members(path: str, ctx: Context) -> dict:
    """List the files contained inside a ZIP-based archive stored in Nextcloud.

    Supported archive formats (all are ZIP-based):
      Office: ODS, ODT, ODP, ODG, DOCX, XLSX, PPTX
      Other:  ZIP, JAR, EPUB

    Use this tool first to discover the internal structure of an archive,
    then call nc_webdav_read_archive_member to read a specific member.

    Typical ODF layout:
      mimetype              — identifies the ODF sub-type
      content.xml           — document content
      styles.xml            — formatting styles
      meta.xml              — document metadata
      settings.xml          — application settings
      META-INF/manifest.xml — archive manifest

    Args:
        path: Nextcloud path to the archive file (e.g. "Documents/report.ods")

    Returns:
        Dict with path, content_type, archive_size, member_count, and a
        members list (capped at 500 entries).  Each member has: name,
        size (uncompressed), compressed_size, is_dir.  If the archive
        has more than 500 members the result also contains
        truncated=True and truncated_at=500.

    Raises:
        ValueError: if the archive exceeds 100 MB or is not valid ZIP
    """
    client = await get_client(ctx)
    # The whole archive is fetched into memory here; the size guard below
    # runs after the download and only bounds the ZIP parsing that follows.
    content, content_type = await client.webdav.read_file(path)
    if len(content) > _MAX_ARCHIVE_BYTES:
        raise ValueError(
            f"Archive '{path}' is {len(content):,} bytes, which exceeds the "
            f"{_MAX_ARCHIVE_BYTES // (1024 * 1024)} MB in-memory limit. "
            f"Use nc_webdav_download_to_temp to work with it locally."
        )
    # Parsing and truncation live in the pure helper so they can be unit
    # tested without an MCP context.
    return _list_zip_members(
        content, path, content_type, max_members=_MAX_ARCHIVE_MEMBERS
    )
@mcp.tool(
    title="Read Archive Member",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        openWorldHint=True,
    ),
)
@require_scopes("files.read")
@instrument_tool
async def nc_webdav_read_archive_member(
    path: str, member_path: str, ctx: Context
) -> dict:
    """Extract and return a single file from inside a ZIP-based archive in Nextcloud.

    The whole archive is downloaded, but only the requested member is
    returned — the archive itself never appears in the context as a
    base64 blob.

    Supported archive formats: ODS, ODT, ODP, ODG, DOCX, XLSX, PPTX,
    ZIP, JAR, EPUB (anything that Python's zipfile module can open).

    Typical use-cases:
      - Read content.xml from an ODS/ODT/ODP to get document content
      - Read word/document.xml from a DOCX
      - Read xl/worksheets/sheet1.xml from an XLSX
      - Inspect META-INF/manifest.xml to understand archive structure

    Use nc_webdav_list_archive_members first to discover available member paths.

    Args:
        path: Nextcloud path to the archive (e.g. "Documents/budget.ods")
        member_path: Path of the member inside the archive
                     (e.g. "content.xml" or "META-INF/manifest.xml")

    Returns:
        Dict with archive_path, member_path, content, content_type, size.
        Text members (XML, HTML, JSON, plain text …) are returned as UTF-8
        strings.  Binary members are base64-encoded with encoding="base64".

    Raises:
        ValueError: if the archive exceeds 100 MB or is not valid ZIP, if
            the member is not found or is a directory entry, or if the
            member's uncompressed size exceeds 50 MB
    """
    client = await get_client(ctx)
    # The content type reported by Nextcloud is not needed here: the member's
    # own type is guessed from its name inside _read_zip_member.
    content, _ = await client.webdav.read_file(path)
    if len(content) > _MAX_ARCHIVE_BYTES:
        raise ValueError(
            f"Archive '{path}' is {len(content):,} bytes, which exceeds the "
            f"{_MAX_ARCHIVE_BYTES // (1024 * 1024)} MB in-memory limit. "
            f"Use nc_webdav_download_to_temp to work with it locally."
        )
    return _read_zip_member(content, path, member_path)
@mcp.tool(
    title="Download File to Temp",
    annotations=ToolAnnotations(
        # Not read-only: creates a temp file on disk and mutates _temp_registry.
        idempotentHint=False,
        openWorldHint=True,
    ),
)
@require_scopes("files.read")
@instrument_tool
async def nc_webdav_download_to_temp(path: str, ctx: Context) -> dict:
    """Download a Nextcloud file to a local temporary path and return that path.

    IMPORTANT — this tool only makes sense when the MCP server is running as
    a local process on the same machine as the client (stdio transport or
    localhost SSE).  Over a remote streamable-HTTP connection the temp file is
    written to the *server's* filesystem, where local shell tools cannot
    reach it.  In that case use nc_webdav_read_file or the archive member
    tools instead.

    Even in local mode this tool is only useful when you have access to shell
    tools (e.g. Claude Code's Bash tool).  In Claude Desktop without shell
    access the returned path cannot be acted upon and you should not call
    this tool.

    Use this tool for file types that require native processing:
      Images  — then use: convert, exiftool, ffmpeg, identify
      Video   — then use: ffmpeg, ffprobe, mediainfo
      Audio   — then use: ffmpeg, ffprobe, sox
      PDFs    — then use: pdftotext, pdfinfo, pdftk, mutool
      Archives — for formats NOT supported by nc_webdav_list_archive_members
                 (e.g. .tar.gz, .7z, .rar): use tar, 7z, unrar
      Any large binary that requires local tooling

    For ZIP-based office formats (ODS, DOCX, XLSX …) prefer
    nc_webdav_list_archive_members + nc_webdav_read_archive_member —
    they avoid creating temp files entirely.

    Cleanup: always call nc_webdav_cleanup_temp when finished to free disk
    space.  All remaining temp files are also removed automatically when the
    MCP server process exits (via an atexit handler).

    Args:
        path: Nextcloud path to the file (e.g. "Videos/holiday.mp4")

    Returns:
        Dict with:
          local_path    — absolute path on the local filesystem
          original_path — original Nextcloud path
          filename      — basename of the original file
          content_type  — MIME type reported by Nextcloud
          size          — file size in bytes

    Raises:
        ValueError: if the file is larger than _MAX_TEMP_DOWNLOAD_BYTES
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    client = await get_client(ctx)
    content, content_type = await client.webdav.read_file(path)

    if len(content) > _MAX_TEMP_DOWNLOAD_BYTES:
        raise ValueError(
            f"File '{path}' is {len(content):,} bytes, which exceeds the "
            f"{_MAX_TEMP_DOWNLOAD_BYTES // (1024 * 1024)} MB limit for "
            f"nc_webdav_download_to_temp."
        )

    # Keep the original extension so local tools can recognise the file type.
    filename = os.path.basename(path.rstrip("/"))
    _root, suffix = os.path.splitext(filename)

    fd, local_path = tempfile.mkstemp(suffix=suffix, prefix="nc_download_")

    def _write_payload() -> None:
        """Write the downloaded bytes; remove the file again if that fails."""
        try:
            with os.fdopen(fd, "wb") as fh:
                fh.write(content)
        except Exception:
            try:
                os.unlink(local_path)
            except OSError:
                pass
            raise

    # Register before writing: if this call is cancelled while the worker
    # thread is still running, the file stays tracked and the atexit handler
    # can still remove it.
    _temp_registry[local_path] = client.username
    try:
        # Writing up to 500 MB synchronously would stall the event loop for
        # every session, so hand the disk I/O to a worker thread.
        await asyncio.to_thread(_write_payload)
    except Exception:
        # The helper already deleted the partial file; drop the registration.
        _temp_registry.pop(local_path, None)
        raise

    logger.debug(
        "Downloaded '%s' to temp path '%s' (%d bytes)",
        path,
        local_path,
        len(content),
    )

    return {
        "local_path": local_path,
        "original_path": path,
        "filename": filename,
        "content_type": content_type,
        "size": len(content),
    }
@mcp.tool(
    title="Remove Temp File",
    annotations=ToolAnnotations(
        destructiveHint=True,
        idempotentHint=False,  # errors on second call (path no longer in registry)
        openWorldHint=False,  # operates on local filesystem only
    ),
)
@require_scopes("files.read")
@instrument_tool
async def nc_webdav_cleanup_temp(local_path: str, ctx: Context) -> dict:
    """Remove a temporary file created by nc_webdav_download_to_temp.

    Only paths that were created by nc_webdav_download_to_temp in this
    server session can be removed — arbitrary filesystem paths are rejected.

    Call this when you are done processing a downloaded file to free
    disk space.

    Args:
        local_path: The local_path value returned by nc_webdav_download_to_temp

    Returns:
        Dict with status ("ok" or "error") and the local_path.  Error
        results also carry a message; an "ok" result for a file that was
        already gone carries a note.
    """
    client = await get_client(ctx)
    # Passing the caller's username makes the helper refuse paths that were
    # registered under a different username.
    return _cleanup_temp_path(local_path, owner=client.username)
"""Unit tests for WebDAV archive-member and temp-download tools.

All tests call the real production functions (_list_zip_members,
_read_zip_member, _cleanup_temp_path, _cleanup_temp_files_on_exit,
_temp_registry) so that regressions in the implementation are caught rather
than just verifying stdlib zipfile behaviour.
"""

import io
import os
import zipfile

import pytest

import nextcloud_mcp_server.server.webdav as webdav_module
from nextcloud_mcp_server.server.webdav import (
    _cleanup_temp_path,
    _list_zip_members,
    _read_zip_member,
    _temp_registry,
)

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def make_zip(members: dict[str, bytes]) -> bytes:
    """Build an in-memory ZIP archive from a {name: content} mapping.

    Members are written uncompressed (ZIP_STORED) in the mapping's iteration
    order.

    Args:
        members: Mapping of archive member name to its raw bytes.

    Returns:
        The complete archive as bytes.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
        for name, data in members.items():
            zf.writestr(name, data)
    return buf.getvalue()


# ---------------------------------------------------------------------------
# _list_zip_members
# ---------------------------------------------------------------------------


@pytest.mark.unit
def test_list_members_returns_expected_structure():
    """_list_zip_members returns correct member names, sizes, and metadata."""
    # NOTE(review): the empty byte literals below look like XML payloads that
    # were stripped somewhere upstream — confirm against the repository copy.
    content = make_zip(
        {
            "mimetype": b"application/vnd.oasis.opendocument.spreadsheet",
            "content.xml": b"",
            "META-INF/manifest.xml": b"",
        }
    )
    result = _list_zip_members(
        content, "test.ods", "application/vnd.oasis.opendocument.spreadsheet"
    )

    assert result["path"] == "test.ods"
    assert result["member_count"] == 3
    assert result["archive_size"] == len(content)

    names = {m["name"] for m in result["members"]}
    assert names == {"mimetype", "content.xml", "META-INF/manifest.xml"}

    content_xml = next(m for m in result["members"] if m["name"] == "content.xml")
    assert content_xml["size"] == len(b"")
    assert content_xml["is_dir"] is False
@pytest.mark.unit
def test_list_members_bad_zip_raises_value_error():
    """_list_zip_members raises ValueError (not BadZipFile) for non-ZIP bytes."""
    with pytest.raises(ValueError, match="not a valid ZIP archive"):
        _list_zip_members(b"this is not a zip", "bad.ods", "application/octet-stream")


@pytest.mark.unit
def test_list_members_truncated_when_over_limit():
    """_list_zip_members truncates results and sets truncated=True when limit exceeded."""
    members = {f"file_{i}.xml": b"" for i in range(10)}
    content = make_zip(members)
    result = _list_zip_members(content, "big.zip", "application/zip", max_members=3)

    # member_count reports the full archive even though the list is cut.
    assert result["member_count"] == 10
    assert len(result["members"]) == 3
    assert result["truncated"] is True
    assert result["truncated_at"] == 3


@pytest.mark.unit
def test_list_members_no_truncation_flag_when_within_limit():
    """_list_zip_members does not set truncated when all members fit."""
    content = make_zip({"a.xml": b"", "b.xml": b""})
    result = _list_zip_members(content, "small.zip", "application/zip", max_members=10)

    assert result["member_count"] == 2
    assert len(result["members"]) == 2
    # The key must be absent, not merely False.
    assert "truncated" not in result


@pytest.mark.unit
def test_list_members_includes_content_type_in_result():
    """content_type from Nextcloud is passed through to the result dict."""
    content = make_zip({"x.xml": b""})
    mime = "application/vnd.oasis.opendocument.spreadsheet"
    result = _list_zip_members(content, "sheet.ods", mime)
    assert result["content_type"] == mime


# ---------------------------------------------------------------------------
# _read_zip_member — text detection
# ---------------------------------------------------------------------------


@pytest.mark.unit
def test_read_member_xml_returned_as_utf8():
    """XML member (content.xml) is returned as a UTF-8 string, not base64."""
    # NOTE(review): this payload looks like XML whose tags were stripped
    # upstream — confirm against the repository copy.
    xml = b"hello"
    content = make_zip({"content.xml": xml})
    result = _read_zip_member(content, "test.ods", "content.xml")

    assert result["content"] == xml.decode("utf-8")
    assert "encoding" not in result
    assert result["size"] == len(xml)
@pytest.mark.unit
def test_read_member_rels_returned_as_utf8():
    """.rels files (OOXML relationship files) are returned as text, not base64."""
    # NOTE(review): the empty literal looks like stripped XML — confirm
    # against the repository copy.
    rels = b''
    content = make_zip({"_rels/.rels": rels})
    result = _read_zip_member(content, "test.docx", "_rels/.rels")
    assert result["content"] == rels.decode("utf-8")
    assert "encoding" not in result


@pytest.mark.unit
def test_read_member_xhtml_returned_as_utf8():
    """.xhtml files (common in EPUB) are returned as text."""
    xhtml = b"hello"
    content = make_zip({"OEBPS/chapter1.xhtml": xhtml})
    result = _read_zip_member(content, "book.epub", "OEBPS/chapter1.xhtml")
    assert result["content"] == xhtml.decode("utf-8")
    assert "encoding" not in result


@pytest.mark.unit
def test_read_member_opf_returned_as_utf8():
    """.opf files (EPUB Open Packaging Format) are returned as text."""
    opf = b""
    content = make_zip({"OEBPS/content.opf": opf})
    result = _read_zip_member(content, "book.epub", "OEBPS/content.opf")
    assert result["content"] == opf.decode("utf-8")
    assert "encoding" not in result


@pytest.mark.unit
def test_read_member_extensionless_text_returned_as_utf8():
    """Extensionless text members (e.g. ODF 'mimetype') are detected via content sniff."""
    mime_content = b"application/vnd.oasis.opendocument.spreadsheet"
    content = make_zip({"mimetype": mime_content})
    result = _read_zip_member(content, "test.ods", "mimetype")
    assert result["content"] == mime_content.decode("utf-8")
    assert "encoding" not in result


@pytest.mark.unit
def test_read_member_binary_returned_as_base64():
    """Binary members (e.g. embedded images) are base64-encoded."""
    import base64

    # PNG signature followed by NUL bytes, so the NUL-byte probe classifies
    # the payload as binary.
    png_bytes = b"\x89PNG\r\n\x1a\n" + b"\x00" * 20
    content = make_zip({"image.png": png_bytes})
    result = _read_zip_member(content, "test.ods", "image.png")

    assert result["encoding"] == "base64"
    assert base64.b64decode(result["content"]) == png_bytes
# ---------------------------------------------------------------------------
# _read_zip_member — error paths
# ---------------------------------------------------------------------------


@pytest.mark.unit
def test_read_member_directory_entry_raises_value_error():
    """Passing a directory member path raises ValueError, not a stdlib error."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        # ZipFile.mkdir creates a directory entry
        zf.mkdir("subdir/")
        zf.writestr("subdir/content.xml", b"")
    content = buf.getvalue()

    with pytest.raises(ValueError, match="directory entry"):
        _read_zip_member(content, "test.ods", "subdir/")


@pytest.mark.unit
def test_read_member_missing_member_raises_value_error():
    """Missing member raises ValueError with the available file list."""
    content = make_zip({"content.xml": b""})
    with pytest.raises(ValueError, match="not found"):
        _read_zip_member(content, "test.ods", "nonexistent.xml")


@pytest.mark.unit
def test_read_member_bad_zip_raises_value_error():
    """Non-ZIP bytes raise ValueError (not BadZipFile)."""
    with pytest.raises(ValueError, match="not a valid ZIP archive"):
        _read_zip_member(b"garbage", "test.ods", "content.xml")


@pytest.mark.unit
def test_read_member_size_limit_enforced(monkeypatch):
    """Members exceeding _MAX_MEMBER_BYTES raise ValueError before extraction."""
    # Patch the module attribute: the helper reads the limit at call time.
    monkeypatch.setattr(webdav_module, "_MAX_MEMBER_BYTES", 10)

    large_data = b"x" * 100  # well above the patched 10-byte limit
    content = make_zip({"big.xml": large_data})

    with pytest.raises(ValueError, match="exceeds the"):
        _read_zip_member(content, "test.ods", "big.xml")
@pytest.mark.unit
def test_read_member_size_limit_not_triggered_for_small_member(monkeypatch):
    """Members within _MAX_MEMBER_BYTES are extracted without error."""
    monkeypatch.setattr(webdav_module, "_MAX_MEMBER_BYTES", 200)

    # NOTE(review): the literal is empty here although the comment says
    # 40 bytes — it looks like a 4-byte XML tag was stripped upstream;
    # confirm against the repository copy.
    small_data = b"" * 10  # 40 bytes, well within 200
    content = make_zip({"small.xml": small_data})

    result = _read_zip_member(content, "test.ods", "small.xml")
    assert result["content"] == small_data.decode("utf-8")


# ---------------------------------------------------------------------------
# _cleanup_temp_files_on_exit and _temp_registry
# ---------------------------------------------------------------------------


@pytest.mark.unit
def test_atexit_handler_removes_registered_files(tmp_path):
    """atexit handler deletes every path currently in the registry."""
    paths = []
    for i in range(3):
        p = tmp_path / f"nc_download_test_{i}.bin"
        p.write_bytes(b"data")
        paths.append(str(p))
        _temp_registry[str(p)] = "testuser"

    try:
        webdav_module._cleanup_temp_files_on_exit()
        for path in paths:
            assert not os.path.exists(path)
    finally:
        # The registry is module-global state; never leak entries between tests.
        for path in paths:
            _temp_registry.pop(path, None)


@pytest.mark.unit
def test_atexit_handler_tolerates_already_deleted_files(tmp_path):
    """atexit handler does not raise if a registered file was already removed."""
    # The path is registered but never created on disk.
    p = tmp_path / "nc_download_gone.bin"
    _temp_registry[str(p)] = "testuser"
    try:
        webdav_module._cleanup_temp_files_on_exit()  # must not raise
    finally:
        _temp_registry.pop(str(p), None)


# ---------------------------------------------------------------------------
# _cleanup_temp_path — calls real production helper
# ---------------------------------------------------------------------------


@pytest.mark.unit
def test_cleanup_temp_rejects_unregistered_path(tmp_path):
    """_cleanup_temp_path returns an error dict for paths not in _temp_registry."""
    p = tmp_path / "arbitrary.bin"
    p.write_bytes(b"secret")
    path = str(p)

    assert path not in _temp_registry

    result = _cleanup_temp_path(path, owner="alice")

    assert result["status"] == "error"
    assert "session" in result["message"].lower()
    # File must be untouched
    assert os.path.exists(path)
@pytest.mark.unit
def test_cleanup_temp_rejects_wrong_owner(tmp_path):
    """_cleanup_temp_path rejects callers who don't own the file."""
    p = tmp_path / "nc_download_alice.bin"
    p.write_bytes(b"payload")
    path = str(p)
    _temp_registry[path] = "alice"

    try:
        result = _cleanup_temp_path(path, owner="bob")

        assert result["status"] == "error"
        assert "permission" in result["message"].lower()
        # File must be untouched
        assert os.path.exists(path)
        assert path in _temp_registry
    finally:
        _temp_registry.pop(path, None)
        if p.exists():
            p.unlink()


@pytest.mark.unit
def test_cleanup_temp_success(tmp_path):
    """_cleanup_temp_path deletes the file and removes it from the registry."""
    p = tmp_path / "nc_download_test.bin"
    p.write_bytes(b"payload")
    path = str(p)
    _temp_registry[path] = "alice"

    try:
        result = _cleanup_temp_path(path, owner="alice")

        assert result["status"] == "ok"
        assert result["local_path"] == path
        assert not os.path.exists(path)
        assert path not in _temp_registry
    finally:
        _temp_registry.pop(path, None)


@pytest.mark.unit
def test_cleanup_temp_registry_preserved_on_oserror(tmp_path, monkeypatch):
    """Registry entry is NOT discarded when os.unlink raises OSError (allows retry)."""
    p = tmp_path / "nc_download_locked.bin"
    p.write_bytes(b"payload")
    path = str(p)
    _temp_registry[path] = "alice"

    def _raise(*_a, **_kw):
        raise OSError("permission denied")

    monkeypatch.setattr(os, "unlink", _raise)

    try:
        result = _cleanup_temp_path(path, owner="alice")

        assert result["status"] == "error"
        assert "permission denied" in result["message"]
        # Entry must remain so the caller can retry.
        assert path in _temp_registry
    finally:
        _temp_registry.pop(path, None)
        # Restore the real os.unlink before the file is removed below.
        monkeypatch.undo()
        if p.exists():
            p.unlink()
@pytest.mark.unit
def test_cleanup_temp_file_not_found_discards_registry(tmp_path):
    """FileNotFoundError (file already gone) still removes the registry entry."""
    path = str(tmp_path / "nc_download_gone.bin")
    # Register a path for a file that does NOT exist on disk.
    _temp_registry[path] = "alice"

    try:
        result = _cleanup_temp_path(path, owner="alice")

        assert result["status"] == "ok"
        assert path not in _temp_registry
    finally:
        _temp_registry.pop(path, None)


# ---------------------------------------------------------------------------
# MCP tool wiring — invoke real tool closures via FastMCP tool.run()
#
# tool.run(args_dict, context=None) passes ctx=None to the tool function.
# require_scopes treats ctx=None as BasicAuth mode and skips scope checks,
# so we can test the full wiring (get_client → read_file → helper → result)
# by mocking get_client at the module level.
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------


def _make_tool_map():
    """Build a FastMCP instance and return its tool map (name → Tool).

    A fresh ``FastMCP("test")`` is created, ``configure_webdav_tools`` is
    called on it, and the tools it then lists are indexed by name.
    """
    from mcp.server.fastmcp import FastMCP

    from nextcloud_mcp_server.server.webdav import configure_webdav_tools

    mcp = FastMCP("test")
    configure_webdav_tools(mcp)
    # The tool manager's entries are the objects the tests call .run() on.
    return {t.name: t for t in mcp._tool_manager.list_tools()}


def _patch_webdav_client(mocker, *, read_result=None, username=None):
    """Patch ``get_client`` in the webdav module to return a mock client.

    Args:
        mocker: The pytest-mock fixture of the calling test.
        read_result: Optional ``(content, content_type)`` tuple that
            ``client.webdav.read_file`` returns when awaited. Left as the
            default ``AsyncMock`` attribute when omitted.
        username: Optional value assigned to ``client.username``.

    Returns:
        The mock client, so callers can assert on its recorded calls.
    """
    mock_client = mocker.AsyncMock()
    if read_result is not None:
        mock_client.webdav.read_file = mocker.AsyncMock(return_value=read_result)
    if username is not None:
        mock_client.username = username
    mocker.patch(
        "nextcloud_mcp_server.server.webdav.get_client", return_value=mock_client
    )
    return mock_client


@pytest.mark.unit
async def test_tool_list_archive_members_wiring(mocker):
    """nc_webdav_list_archive_members calls read_file and delegates to _list_zip_members."""
    zip_bytes = make_zip({"content.xml": b"", "mimetype": b"application/ods"})
    mock_client = _patch_webdav_client(
        mocker,
        read_result=(zip_bytes, "application/vnd.oasis.opendocument.spreadsheet"),
    )

    tools = _make_tool_map()
    result = await tools["nc_webdav_list_archive_members"].run(
        {"path": "docs/test.ods"}, context=None
    )

    mock_client.webdav.read_file.assert_awaited_once_with("docs/test.ods")
    assert result["path"] == "docs/test.ods"
    assert result["member_count"] == 2
    assert result["content_type"] == "application/vnd.oasis.opendocument.spreadsheet"


@pytest.mark.unit
async def test_tool_read_archive_member_wiring(mocker):
    """nc_webdav_read_archive_member calls read_file and delegates to _read_zip_member."""
    xml = b""
    zip_bytes = make_zip({"content.xml": xml})
    mock_client = _patch_webdav_client(
        mocker, read_result=(zip_bytes, "application/zip")
    )

    tools = _make_tool_map()
    result = await tools["nc_webdav_read_archive_member"].run(
        {"path": "docs/test.ods", "member_path": "content.xml"}, context=None
    )

    mock_client.webdav.read_file.assert_awaited_once_with("docs/test.ods")
    assert result["content"] == xml.decode("utf-8")
    assert "encoding" not in result


@pytest.mark.unit
async def test_tool_download_to_temp_writes_file_and_registers_owner(mocker):
    """nc_webdav_download_to_temp writes bytes to disk and records username in registry."""
    file_content = b"binary payload"
    _patch_webdav_client(
        mocker,
        read_result=(file_content, "application/octet-stream"),
        username="alice",
    )

    tools = _make_tool_map()
    result = await tools["nc_webdav_download_to_temp"].run(
        {"path": "Videos/clip.mp4"}, context=None
    )

    local_path = result["local_path"]
    try:
        assert result["filename"] == "clip.mp4"
        assert result["size"] == len(file_content)
        assert os.path.exists(local_path)
        # Read through a context manager so the handle is closed promptly
        # instead of being left for the garbage collector.
        with open(local_path, "rb") as fh:
            written = fh.read()
        assert written == file_content
        # Ownership must be recorded under alice's username
        assert _temp_registry.get(local_path) == "alice"
    finally:
        _temp_registry.pop(local_path, None)
        if os.path.exists(local_path):
            os.unlink(local_path)


@pytest.mark.unit
async def test_tool_cleanup_temp_passes_owner_from_client(mocker, tmp_path):
    """nc_webdav_cleanup_temp passes client.username as owner to _cleanup_temp_path."""
    p = tmp_path / "nc_download_wiring.bin"
    p.write_bytes(b"data")
    path = str(p)
    _temp_registry[path] = "alice"

    _patch_webdav_client(mocker, username="alice")

    try:
        tools = _make_tool_map()
        result = await tools["nc_webdav_cleanup_temp"].run(
            {"local_path": path}, context=None
        )

        assert result["status"] == "ok"
        assert not os.path.exists(path)
        assert path not in _temp_registry
    finally:
        _temp_registry.pop(path, None)
        if p.exists():
            p.unlink()


@pytest.mark.unit
async def test_tool_list_archive_members_rejects_oversized_archive(mocker):
    """nc_webdav_list_archive_members raises ToolError when archive exceeds _MAX_ARCHIVE_BYTES."""
    from mcp.server.fastmcp.exceptions import ToolError

    oversized = b"x" * 200
    _patch_webdav_client(mocker, read_result=(oversized, "application/zip"))
    # Shrink the limit so the 200-byte payload counts as oversized.
    mocker.patch("nextcloud_mcp_server.server.webdav._MAX_ARCHIVE_BYTES", 100)

    tools = _make_tool_map()
    with pytest.raises(ToolError, match="exceeds the"):
        await tools["nc_webdav_list_archive_members"].run(
            {"path": "huge.zip"}, context=None
        )


@pytest.mark.unit
async def test_tool_read_archive_member_rejects_oversized_archive(mocker):
    """nc_webdav_read_archive_member raises ToolError when archive exceeds _MAX_ARCHIVE_BYTES."""
    from mcp.server.fastmcp.exceptions import ToolError

    oversized = b"x" * 200
    _patch_webdav_client(mocker, read_result=(oversized, "application/zip"))
    # Shrink the limit so the 200-byte payload counts as oversized.
    mocker.patch("nextcloud_mcp_server.server.webdav._MAX_ARCHIVE_BYTES", 100)

    tools = _make_tool_map()
    with pytest.raises(ToolError, match="exceeds the"):
        await tools["nc_webdav_read_archive_member"].run(
            {"path": "huge.zip", "member_path": "content.xml"}, context=None
        )


@pytest.mark.unit
async def test_tool_download_to_temp_rejects_oversized_file(mocker):
    """nc_webdav_download_to_temp raises ToolError when the file exceeds _MAX_TEMP_DOWNLOAD_BYTES."""
    from mcp.server.fastmcp.exceptions import ToolError

    oversized = b"x" * 100
    _patch_webdav_client(
        mocker,
        read_result=(oversized, "application/octet-stream"),
        username="alice",
    )
    # Shrink the limit so the 100-byte payload counts as oversized.
    mocker.patch("nextcloud_mcp_server.server.webdav._MAX_TEMP_DOWNLOAD_BYTES", 10)

    tools = _make_tool_map()
    with pytest.raises(ToolError, match="exceeds the"):
        await tools["nc_webdav_download_to_temp"].run({"path": "big.bin"}, context=None)


@pytest.mark.unit
async def test_tool_cleanup_temp_rejects_wrong_owner(mocker, tmp_path):
    """nc_webdav_cleanup_temp rejects a caller whose username doesn't match the registry."""
    p = tmp_path / "nc_download_wiring2.bin"
    p.write_bytes(b"data")
    path = str(p)
    _temp_registry[path] = "alice"

    _patch_webdav_client(mocker, username="bob")

    try:
        tools = _make_tool_map()
        result = await tools["nc_webdav_cleanup_temp"].run(
            {"local_path": path}, context=None
        )

        assert result["status"] == "error"
        assert "permission" in result["message"].lower()
        # A rejected call must leave both the file and its registry entry
        # alone, matching the helper-level wrong-owner test.
        assert os.path.exists(path)
        assert path in _temp_registry
    finally:
        _temp_registry.pop(path, None)
        if p.exists():
            p.unlink()