# ──────────────────────────────────────────────
# shelfai merge
# ──────────────────────────────────────────────


@app.command("merge")
def merge_cmd(
    chunk_ids: list[str] = typer.Argument(..., help="Chunk files or IDs to merge"),
    output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file name"),
    preview: bool = typer.Option(False, "--preview", help="Preview merged output without writing"),
    delete_originals: bool = typer.Option(False, "--delete-originals", help="Delete source chunks after merge"),
    separator: str = typer.Option("\n\n---\n\n", "--separator", help="Separator inserted between merged chunks"),
    shelf_path: str = typer.Option("./shelf", "--shelf", "-s", help="Path to shelf directory"),
):
    """Merge multiple chunks into one file."""
    from shelfai.core.merge_split import ChunkMerger
    from shelfai.core.shelf import Shelf

    shelf = Shelf(shelf_path)
    if not shelf.exists():
        console.print(f"[red]No shelf found at {shelf_path}. Run `shelfai init` first.[/red]")
        raise typer.Exit(1)

    merger = ChunkMerger()

    if preview:
        # NOTE(review): preview uses the merger's default separator, so a
        # custom --separator is only honoured by the actual merge below.
        try:
            preview_text = merger.preview_merge(shelf_path, chunk_ids)
        except Exception as exc:
            console.print(f"[red]Merge preview failed: {exc}[/red]")
            raise typer.Exit(1)
        if not preview_text:
            console.print("[yellow]Nothing to preview.[/yellow]")
            return
        console.print("\n[bold]Merge Preview[/bold]\n")
        console.print(preview_text)
        console.print()
        return

    # A real merge writes a file, so a destination is mandatory.
    if not output:
        console.print("[red]`--output` is required unless you use `--preview`.[/red]")
        raise typer.Exit(1)

    result = merger.merge(
        shelf_path=shelf_path,
        chunk_ids=chunk_ids,
        output_name=output,
        delete_originals=delete_originals,
        separator=separator,
    )

    if not result.success:
        console.print(f"[red]{result.message}[/red]")
        raise typer.Exit(1)

    console.print(f"[green]{result.message}[/green]")
    console.print(f"[dim]Source chunks: {', '.join(result.source_chunks)}[/dim]")
    console.print(f"[dim]Estimated tokens: ~{result.total_tokens:,}[/dim]")
    console.print()


# ──────────────────────────────────────────────
# shelfai split
# ──────────────────────────────────────────────


@app.command("split")
def split_cmd(
    chunk_id: str = typer.Argument(..., help="Chunk file or ID to split"),
    by: str = typer.Option("heading", "--by", help="Split method: heading or tokens"),
    level: int = typer.Option(2, "--level", help="Heading level to split on"),
    max_tokens: int = typer.Option(3000, "--max", help="Max tokens per chunk when splitting by tokens"),
    preview: bool = typer.Option(False, "--preview", help="Preview split plan without writing"),
    delete_original: bool = typer.Option(False, "--delete-original", help="Delete the source chunk after splitting"),
    shelf_path: str = typer.Option("./shelf", "--shelf", "-s", help="Path to shelf directory"),
):
    """Split a large chunk into smaller pieces."""
    from shelfai.core.merge_split import ChunkSplitter
    from shelfai.core.shelf import Shelf

    shelf = Shelf(shelf_path)
    if not shelf.exists():
        console.print(f"[red]No shelf found at {shelf_path}. Run `shelfai init` first.[/red]")
        raise typer.Exit(1)

    splitter = ChunkSplitter()

    if preview:
        # An invalid --by value surfaces here as a ValueError from
        # preview_split and is reported like any other preview failure.
        try:
            preview_items = splitter.preview_split(
                shelf_path=shelf_path,
                chunk_id=chunk_id,
                method=by,
                heading_level=level,
                max_tokens=max_tokens,
            )
        except Exception as exc:
            console.print(f"[red]Split preview failed: {exc}[/red]")
            raise typer.Exit(1)

        if not preview_items:
            console.print("[yellow]Nothing to preview.[/yellow]")
            return

        console.print("\n[bold]Split Preview[/bold]\n")
        for item in preview_items:
            console.print(f"  [cyan]{item['name']}[/cyan] (~{item['tokens']:,} tokens)")
            console.print(f"    [dim]{item['preview']}[/dim]")
        console.print()
        return

    # Validate the method only for the writing path; preview validated above.
    if by not in {"heading", "tokens"}:
        console.print("[red]`--by` must be `heading` or `tokens`.[/red]")
        raise typer.Exit(1)

    if by == "heading":
        result = splitter.split_by_heading(
            shelf_path=shelf_path,
            chunk_id=chunk_id,
            heading_level=level,
            delete_original=delete_original,
        )
    else:
        result = splitter.split_by_tokens(
            shelf_path=shelf_path,
            chunk_id=chunk_id,
            max_tokens=max_tokens,
            delete_original=delete_original,
        )

    if not result.success:
        console.print(f"[red]{result.message}[/red]")
        raise typer.Exit(1)

    console.print(f"[green]{result.message}[/green]")
    for output_file, tokens in zip(result.output_files, result.tokens_per_chunk):
        console.print(f"  [cyan]{output_file}[/cyan] (~{tokens:,} tokens)")
    console.print()
+ +These utilities operate on shelf files and preserve frontmatter while +providing preview-mode output for safe planning. +""" + +from __future__ import annotations + +import re +import unicodedata +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +import yaml + +from shelfai.core.fileops import ShelfFile, estimate_tokens +from shelfai.core.safety import validate_path +from shelfai.core.shelf import Shelf + + +@dataclass +class MergeResult: + output_file: str + source_chunks: list[str] + total_tokens: int + success: bool + message: str + + +@dataclass +class SplitResult: + output_files: list[str] + source_chunk: str + tokens_per_chunk: list[int] + success: bool + message: str + + +def _slugify(text: str, default: str = "chunk") -> str: + """Convert text into a stable lowercase file slug.""" + normalized = unicodedata.normalize("NFKD", text) + ascii_text = normalized.encode("ascii", "ignore").decode("ascii") + slug = re.sub(r"[^a-zA-Z0-9]+", "-", ascii_text).strip("-").lower() + return slug or default + + +def _humanize_stem(path: Path) -> str: + return path.stem.replace("_", " ").replace("-", " ").title() + + +def _render_shelf_file(shelf_file: ShelfFile) -> str: + """Render a shelf file body with optional frontmatter.""" + if not shelf_file.metadata: + return shelf_file.content + + meta = yaml.safe_dump(shelf_file.metadata, sort_keys=False).strip() + return f"---\n{meta}\n---\n\n{shelf_file.content}" + + +def _unique_list(values: list) -> list: + seen = set() + result = [] + for value in values: + marker = repr(value) + if marker in seen: + continue + seen.add(marker) + result.append(value) + return result + + +def _combine_metadata(source_files: list[ShelfFile], output_name: str, source_paths: list[str]) -> dict: + """Merge useful metadata fields while preserving the first file's identity.""" + merged: dict = {} + + keys = set() + for file in source_files: + keys.update(file.metadata.keys()) + + for key in sorted(keys): 
+ values = [file.metadata.get(key) for file in source_files if file.metadata.get(key) is not None] + if not values: + continue + + if key in {"depends_on", "tags", "aliases", "related", "see_also"} or all( + isinstance(value, list) for value in values + ): + flattened = [] + for value in values: + if isinstance(value, list): + flattened.extend(value) + else: + flattened.append(value) + merged[key] = _unique_list(flattened) + continue + + if all(value == values[0] for value in values): + merged[key] = values[0] + continue + + merged[key] = values[0] + + merged["title"] = _humanize_stem(Path(output_name)) + merged["merged_from"] = source_paths + return merged + + +def _resolve_input_path(shelf: Shelf, chunk_id: str) -> Path: + """Resolve a chunk identifier to a shelf-relative path.""" + shelf_root = shelf.path + candidate = Path(chunk_id) + + if candidate.is_absolute(): + candidate = candidate.resolve() + try: + candidate.relative_to(shelf_root) + except ValueError as exc: + raise FileNotFoundError(f"Chunk is outside shelf root: {chunk_id}") from exc + if not candidate.exists(): + raise FileNotFoundError(f"Chunk not found: {chunk_id}") + return candidate.relative_to(shelf_root) + + search_order = [candidate] + if candidate.suffix != ".md": + search_order.append(candidate.with_suffix(".md")) + if len(candidate.parts) == 1: + search_order.extend([ + Path("chunks") / candidate, + Path("chunks") / candidate.with_suffix(".md"), + ]) + + for rel_path in search_order: + full_path = shelf_root / rel_path + if full_path.exists(): + validate_path(shelf_root, rel_path.as_posix()) + return rel_path + + raise FileNotFoundError(f"Chunk not found: {chunk_id}") + + +def _resolve_output_path(shelf: Shelf, source_path: Path, output_name: str) -> Path: + """Resolve an output filename relative to the shelf root.""" + shelf_root = shelf.path + candidate = Path(output_name) + + if candidate.is_absolute(): + raise ValueError("Output path must be relative to the shelf root") + + if 
candidate.suffix != ".md": + candidate = candidate.with_suffix(".md") + + if candidate.parent == Path("."): + candidate = source_path.parent / candidate.name + + validate_path(shelf_root, candidate.as_posix()) + return candidate + + +def _chunk_preview(text: str) -> str: + preview = text.replace("\n", " ") + return preview[:100] + ("..." if len(preview) > 100 else "") + + +def _trim_merge_separator(lines: list[str]) -> list[str]: + """Drop the standard merge separator if it appears at a section boundary.""" + trimmed = lines[:] + while trimmed and trimmed[-1].strip() == "": + trimmed.pop() + if trimmed and trimmed[-1].strip() == "---": + trimmed.pop() + while trimmed and trimmed[-1].strip() == "": + trimmed.pop() + return trimmed + + +def _split_h2_sections(content: str, heading_level: int) -> list[dict]: + """Split content into sections starting at the requested heading level.""" + heading_prefix = "#" * heading_level + " " + lines = content.splitlines() + sections: list[dict] = [] + current_lines: list[str] = [] + current_title: Optional[str] = None + preamble_lines: list[str] = [] + + for line in lines: + if line.startswith(heading_prefix): + if current_title is None: + preamble_lines = current_lines[:] + else: + current_lines = _trim_merge_separator(current_lines) + sections.append( + { + "title": current_title, + "content": "\n".join(current_lines).rstrip(), + } + ) + current_title = line[len(heading_prefix):].strip() + current_lines = [line] + else: + current_lines.append(line) + + if current_title is None: + return [{"title": "entire-file", "content": content.rstrip()}] + + sections.append( + { + "title": current_title, + "content": "\n".join(_trim_merge_separator(current_lines)).rstrip(), + } + ) + + if preamble_lines: + sections[0]["content"] = "\n".join(preamble_lines + sections[0]["content"].splitlines()).rstrip() + + return sections + + +def _split_paragraphs(content: str, max_tokens: int) -> list[str]: + paragraphs = re.split(r"\n\s*\n", 
content.strip()) if content.strip() else [""] + chunks: list[str] = [] + current: list[str] = [] + + def flush() -> None: + if current: + chunks.append("\n\n".join(current).strip()) + current.clear() + + for paragraph in paragraphs: + candidate = "\n\n".join(current + [paragraph]).strip() if current else paragraph.strip() + if current and estimate_tokens(candidate) > max_tokens: + flush() + if estimate_tokens(paragraph) > max_tokens: + flush() + chunks.append(paragraph.strip()) + continue + current.append(paragraph) + + flush() + return [chunk for chunk in chunks if chunk.strip()] + + +class ChunkMerger: + """Merge multiple chunks into a single chunk.""" + + def merge( + self, + shelf_path: str, + chunk_ids: list[str], + output_name: str, + delete_originals: bool = False, + separator: str = "\n\n---\n\n", + ) -> MergeResult: + """ + Merge specified chunks into one file. + Concatenates content with separator. + Updates frontmatter (combines depends_on, etc.). + Optionally deletes the original files. 
+ """ + shelf = Shelf(shelf_path) + if not shelf.exists(): + return MergeResult("", chunk_ids, 0, False, f"No shelf found at {shelf_path}") + + if not chunk_ids: + return MergeResult("", [], 0, False, "No chunks provided for merge") + + try: + source_paths = [_resolve_input_path(shelf, chunk_id) for chunk_id in chunk_ids] + source_files = [shelf.read_shelf_file(path.as_posix()) for path in source_paths] + merged_content = separator.join(file.content.rstrip() for file in source_files) + output_path = _resolve_output_path(shelf, source_paths[0], output_name) + metadata = _combine_metadata(source_files, output_path.name, [p.as_posix() for p in source_paths]) + output_file = ShelfFile(path=output_path.as_posix(), content=merged_content, metadata=metadata) + + shelf.write_file(output_file.path, output_file.content, output_file.metadata) + + if delete_originals: + for source_path in source_paths: + full_path = shelf.path / source_path + if full_path.resolve() != (shelf.path / output_path).resolve() and full_path.exists(): + full_path.unlink() + + total_tokens = sum(estimate_tokens(file.content) for file in source_files) + message = f"Merged {len(source_files)} chunk(s) into {output_path.as_posix()}" + if delete_originals: + message += " and deleted the originals" + + return MergeResult( + output_file=output_path.as_posix(), + source_chunks=[p.as_posix() for p in source_paths], + total_tokens=total_tokens, + success=True, + message=message, + ) + except Exception as exc: + return MergeResult("", chunk_ids, 0, False, str(exc)) + + def preview_merge(self, shelf_path: str, chunk_ids: list[str]) -> str: + """Show what the merged chunk would look like without writing.""" + shelf = Shelf(shelf_path) + if not shelf.exists() or not chunk_ids: + return "" + + source_paths = [_resolve_input_path(shelf, chunk_id) for chunk_id in chunk_ids] + source_files = [shelf.read_shelf_file(path.as_posix()) for path in source_paths] + merged_content = "\n\n---\n\n".join(file.content.rstrip() 
for file in source_files) + metadata = _combine_metadata(source_files, "merged.md", [p.as_posix() for p in source_paths]) + preview = ShelfFile(path="merged.md", content=merged_content, metadata=metadata) + return _render_shelf_file(preview) + + +class ChunkSplitter: + """Split a large chunk into smaller pieces.""" + + def split_by_heading( + self, + shelf_path: str, + chunk_id: str, + heading_level: int = 2, + delete_original: bool = False, + ) -> SplitResult: + """ + Split chunk at heading boundaries. + Each section becomes its own chunk file. + File names derived from heading text. + """ + shelf = Shelf(shelf_path) + if not shelf.exists(): + return SplitResult([], chunk_id, [], False, f"No shelf found at {shelf_path}") + + try: + source_path = _resolve_input_path(shelf, chunk_id) + source_file = shelf.read_shelf_file(source_path.as_posix()) + sections = _split_h2_sections(source_file.content, heading_level) + + if len(sections) == 1: + base_name = _slugify(sections[0]["title"]) + output_name = f"{base_name}.md" + output_path = _resolve_output_path(shelf, source_path, output_name) + if delete_original and output_path.resolve() == (shelf.path / source_path).resolve(): + output_name = f"{base_name}-split.md" + output_path = _resolve_output_path(shelf, source_path, output_name) + output_file = ShelfFile( + path=output_path.as_posix(), + content=sections[0]["content"], + metadata={**source_file.metadata, "split_from": source_path.as_posix(), "part": 1}, + ) + shelf.write_file(output_file.path, output_file.content, output_file.metadata) + if delete_original: + (shelf.path / source_path).unlink() + return SplitResult( + output_files=[output_path.as_posix()], + source_chunk=source_path.as_posix(), + tokens_per_chunk=[estimate_tokens(output_file.content)], + success=True, + message=f"Split produced one chunk: {output_path.as_posix()}", + ) + + output_files: list[str] = [] + tokens_per_chunk: list[int] = [] + used_names: dict[str, int] = {} + + for index, section in 
enumerate(sections, 1): + slug = _slugify(section["title"]) + count = used_names.get(slug, 0) + 1 + used_names[slug] = count + if count > 1: + slug = f"{slug}-{count}" + output_path = source_path.with_name(f"{slug}.md") + output_path = _resolve_output_path(shelf, source_path, output_path.name) + if delete_original and output_path.resolve() == (shelf.path / source_path).resolve(): + output_path = _resolve_output_path(shelf, source_path, f"{slug}-split.md") + section_file = ShelfFile( + path=output_path.as_posix(), + content=section["content"], + metadata={ + **source_file.metadata, + "title": section["title"], + "split_from": source_path.as_posix(), + "part": index, + }, + ) + shelf.write_file(section_file.path, section_file.content, section_file.metadata) + output_files.append(output_path.as_posix()) + tokens_per_chunk.append(estimate_tokens(section_file.content)) + + if delete_original: + (shelf.path / source_path).unlink() + + return SplitResult( + output_files=output_files, + source_chunk=source_path.as_posix(), + tokens_per_chunk=tokens_per_chunk, + success=True, + message=f"Split {source_path.as_posix()} into {len(output_files)} chunk(s)", + ) + except Exception as exc: + return SplitResult([], chunk_id, [], False, str(exc)) + + def split_by_tokens( + self, + shelf_path: str, + chunk_id: str, + max_tokens: int = 3000, + delete_original: bool = False, + ) -> SplitResult: + """ + Split chunk into pieces of max_tokens each. + Tries to split at paragraph boundaries. 
+ """ + shelf = Shelf(shelf_path) + if not shelf.exists(): + return SplitResult([], chunk_id, [], False, f"No shelf found at {shelf_path}") + + try: + source_path = _resolve_input_path(shelf, chunk_id) + source_file = shelf.read_shelf_file(source_path.as_posix()) + pieces = _split_paragraphs(source_file.content, max_tokens=max_tokens) + if not pieces: + pieces = [""] + + output_files: list[str] = [] + tokens_per_chunk: list[int] = [] + for index, piece in enumerate(pieces, 1): + output_path = source_path.with_name(f"{source_path.stem}-part-{index}.md") + output_path = _resolve_output_path(shelf, source_path, output_path.name) + part_file = ShelfFile( + path=output_path.as_posix(), + content=piece, + metadata={ + **source_file.metadata, + "split_from": source_path.as_posix(), + "part": index, + "part_count": len(pieces), + }, + ) + shelf.write_file(part_file.path, part_file.content, part_file.metadata) + output_files.append(output_path.as_posix()) + tokens_per_chunk.append(estimate_tokens(part_file.content)) + + if delete_original: + (shelf.path / source_path).unlink() + + return SplitResult( + output_files=output_files, + source_chunk=source_path.as_posix(), + tokens_per_chunk=tokens_per_chunk, + success=True, + message=f"Split {source_path.as_posix()} into {len(output_files)} chunk(s) by tokens", + ) + except Exception as exc: + return SplitResult([], chunk_id, [], False, str(exc)) + + def preview_split( + self, + shelf_path: str, + chunk_id: str, + method: str = "heading", + **kwargs, + ) -> list[dict]: + """ + Show what the split would produce without writing. 
+ Returns: [{"name": "chunk-name", "tokens": 500, "preview": "first 100 chars..."}] + """ + shelf = Shelf(shelf_path) + if not shelf.exists(): + return [] + + if method == "heading": + heading_level = int(kwargs.get("heading_level", 2)) + source_path = _resolve_input_path(shelf, chunk_id) + source_file = shelf.read_shelf_file(source_path.as_posix()) + sections = _split_h2_sections(source_file.content, heading_level) + if len(sections) == 1: + name = f"{_slugify(sections[0]['title'])}.md" + return [{ + "name": name, + "tokens": estimate_tokens(sections[0]["content"]), + "preview": _chunk_preview(sections[0]["content"]), + }] + previews = [] + used_names: dict[str, int] = {} + for section in sections: + slug = _slugify(section["title"]) + count = used_names.get(slug, 0) + 1 + used_names[slug] = count + if count > 1: + slug = f"{slug}-{count}" + previews.append({ + "name": f"{slug}.md", + "tokens": estimate_tokens(section["content"]), + "preview": _chunk_preview(section["content"]), + }) + return previews + + if method == "tokens": + max_tokens = int(kwargs.get("max_tokens", 3000)) + source_path = _resolve_input_path(shelf, chunk_id) + source_file = shelf.read_shelf_file(source_path.as_posix()) + pieces = _split_paragraphs(source_file.content, max_tokens=max_tokens) or [""] + return [ + { + "name": f"{source_path.stem}-part-{index}.md", + "tokens": estimate_tokens(piece), + "preview": _chunk_preview(piece), + } + for index, piece in enumerate(pieces, 1) + ] + + raise ValueError(f"Unknown split method: {method}") diff --git a/tests/test_merge_split.py b/tests/test_merge_split.py new file mode 100644 index 0000000..8440790 --- /dev/null +++ b/tests/test_merge_split.py @@ -0,0 +1,307 @@ +"""Tests for chunk merge/split helpers and CLI commands.""" + +from pathlib import Path + +from typer.testing import CliRunner + +from shelfai.cli.main import app +from shelfai.core.merge_split import ChunkMerger, ChunkSplitter +from shelfai.core.shelf import Shelf + + +runner = 
"""Tests for chunk merge/split helpers and CLI commands."""

from pathlib import Path

from typer.testing import CliRunner

from shelfai.cli.main import app
from shelfai.core.merge_split import ChunkMerger, ChunkSplitter
from shelfai.core.shelf import Shelf


runner = CliRunner()


def _write_markdown(path: Path, title: str, body: str, type_: str = "knowledge") -> None:
    """Write a shelf chunk with minimal YAML frontmatter."""
    path.write_text(
        f"---\n"
        f"title: {title}\n"
        f"type: {type_}\n"
        f"---\n\n"
        f"{body.rstrip()}\n",
        encoding="utf-8",
    )


class TestChunkMerge:
    """Behaviour of ChunkMerger and the `shelfai merge` command."""

    def test_merge_two_chunks(self, shelf_dir):
        alpha = shelf_dir / "knowledge" / "alpha.md"
        beta = shelf_dir / "knowledge" / "beta.md"
        _write_markdown(alpha, "Alpha", "# Alpha\n\nFirst chunk.\n")
        _write_markdown(beta, "Beta", "# Beta\n\nSecond chunk.\n")

        result = ChunkMerger().merge(
            str(shelf_dir),
            ["knowledge/alpha.md", "knowledge/beta.md"],
            "knowledge/combined.md",
        )

        assert result.success
        assert result.output_file == "knowledge/combined.md"
        combined = Shelf(str(shelf_dir)).read_shelf_file(result.output_file)
        assert "First chunk." in combined.content
        assert "Second chunk." in combined.content
        assert combined.content.index("First chunk.") < combined.content.index("Second chunk.")

    def test_merge_preserves_order(self, shelf_dir):
        first = shelf_dir / "knowledge" / "one.md"
        second = shelf_dir / "knowledge" / "two.md"
        _write_markdown(first, "One", "# One\n\nBody one.\n")
        _write_markdown(second, "Two", "# Two\n\nBody two.\n")

        result = ChunkMerger().merge(
            str(shelf_dir),
            ["knowledge/two.md", "knowledge/one.md"],
            "knowledge/reordered.md",
        )

        assert result.success
        combined = Shelf(str(shelf_dir)).read_shelf_file(result.output_file)
        assert combined.content.index("Body two.") < combined.content.index("Body one.")

    def test_merge_with_separator(self, shelf_dir):
        first = shelf_dir / "knowledge" / "sep-a.md"
        second = shelf_dir / "knowledge" / "sep-b.md"
        _write_markdown(first, "Sep A", "# A\n\nAlpha.\n")
        _write_markdown(second, "Sep B", "# B\n\nBeta.\n")

        result = ChunkMerger().merge(
            str(shelf_dir),
            ["knowledge/sep-a.md", "knowledge/sep-b.md"],
            "knowledge/separated.md",
            separator="\n+++\n",
        )

        assert result.success
        combined = Shelf(str(shelf_dir)).read_shelf_file(result.output_file)
        assert "\n+++\n" in combined.content

    def test_merge_delete_originals(self, shelf_dir):
        first = shelf_dir / "knowledge" / "delete-a.md"
        second = shelf_dir / "knowledge" / "delete-b.md"
        _write_markdown(first, "Delete A", "# A\n\nAlpha.\n")
        _write_markdown(second, "Delete B", "# B\n\nBeta.\n")

        result = ChunkMerger().merge(
            str(shelf_dir),
            ["knowledge/delete-a.md", "knowledge/delete-b.md"],
            "knowledge/deleted.md",
            delete_originals=True,
        )

        assert result.success
        assert not first.exists()
        assert not second.exists()

    def test_merge_preview(self, shelf_dir):
        first = shelf_dir / "knowledge" / "preview-a.md"
        second = shelf_dir / "knowledge" / "preview-b.md"
        _write_markdown(first, "Preview A", "# A\n\nAlpha.\n")
        _write_markdown(second, "Preview B", "# B\n\nBeta.\n")

        preview = ChunkMerger().preview_merge(
            str(shelf_dir),
            ["knowledge/preview-a.md", "knowledge/preview-b.md"],
        )

        assert "Alpha." in preview
        assert "Beta." in preview
        assert not (shelf_dir / "knowledge" / "previewed.md").exists()

    def test_merge_cli_preview(self, shelf_dir):
        first = shelf_dir / "knowledge" / "cli-a.md"
        second = shelf_dir / "knowledge" / "cli-b.md"
        _write_markdown(first, "CLI A", "# A\n\nAlpha.\n")
        _write_markdown(second, "CLI B", "# B\n\nBeta.\n")

        result = runner.invoke(
            app,
            [
                "merge",
                "knowledge/cli-a.md",
                "knowledge/cli-b.md",
                "--preview",
                "--shelf",
                str(shelf_dir),
            ],
        )

        assert result.exit_code == 0
        assert "Merge Preview" in result.output
        assert "Alpha." in result.output
        assert "Beta." in result.output


class TestChunkSplit:
    """Behaviour of ChunkSplitter and the `shelfai split` command."""

    def test_split_by_heading(self, shelf_dir):
        source = shelf_dir / "knowledge" / "guide.md"
        _write_markdown(
            source,
            "Guide",
            "# Guide\n\nIntro text.\n\n## Overview\n\nOverview text.\n\n## Details\n\nDetails text.\n",
        )

        result = ChunkSplitter().split_by_heading(str(shelf_dir), "knowledge/guide.md", heading_level=2)

        assert result.success
        assert len(result.output_files) == 2
        assert any(path.endswith("overview.md") for path in result.output_files)
        assert any(path.endswith("details.md") for path in result.output_files)

        overview = Shelf(str(shelf_dir)).read_shelf_file(next(p for p in result.output_files if p.endswith("overview.md")))
        details = Shelf(str(shelf_dir)).read_shelf_file(next(p for p in result.output_files if p.endswith("details.md")))
        assert "Intro text." in overview.content
        assert "Overview text." in overview.content
        assert "Details text." in details.content

    def test_split_by_tokens(self, shelf_dir):
        source = shelf_dir / "knowledge" / "tokens.md"
        body = (
            "# Tokens\n\n"
            "Paragraph one is short and tidy.\n\n"
            "Paragraph two is also short and tidy.\n\n"
            "Paragraph three is short and tidy.\n\n"
            "Paragraph four is short and tidy.\n"
        )
        _write_markdown(source, "Tokens", body)

        result = ChunkSplitter().split_by_tokens(
            str(shelf_dir),
            "knowledge/tokens.md",
            max_tokens=12,
        )

        assert result.success
        assert len(result.output_files) >= 2
        assert all(tokens <= 12 for tokens in result.tokens_per_chunk)

    def test_split_at_paragraph_boundary_and_preserves_content(self, shelf_dir):
        source = shelf_dir / "knowledge" / "paragraphs.md"
        body = (
            "# Paragraphs\n\n"
            "First paragraph has a complete thought.\n\n"
            "Second paragraph stays intact.\n\n"
            "Third paragraph stays intact too.\n"
        )
        _write_markdown(source, "Paragraphs", body)

        result = ChunkSplitter().split_by_tokens(
            str(shelf_dir),
            "knowledge/paragraphs.md",
            max_tokens=12,
        )

        assert result.success
        contents = [Shelf(str(shelf_dir)).read_shelf_file(path).content for path in result.output_files]
        assert "\n\n".join(contents) == body.rstrip()

    def test_split_delete_original(self, shelf_dir):
        source = shelf_dir / "knowledge" / "delete-me.md"
        _write_markdown(
            source,
            "Delete Me",
            "# Delete Me\n\nParagraph one.\n\nParagraph two.\n",
        )

        result = ChunkSplitter().split_by_tokens(
            str(shelf_dir),
            "knowledge/delete-me.md",
            max_tokens=12,
            delete_original=True,
        )

        assert result.success
        assert not source.exists()

    def test_split_preview(self, shelf_dir):
        source = shelf_dir / "knowledge" / "preview-split.md"
        _write_markdown(
            source,
            "Preview Split",
            "# Preview Split\n\n## One\n\nOne text.\n\n## Two\n\nTwo text.\n",
        )

        preview = ChunkSplitter().preview_split(
            str(shelf_dir),
            "knowledge/preview-split.md",
            method="heading",
            heading_level=2,
        )

        assert len(preview) == 2
        assert preview[0]["name"].endswith(".md")
        assert "One text." in preview[0]["preview"]
        assert "Two text." in preview[1]["preview"]

    def test_split_cli_by_tokens(self, shelf_dir):
        source = shelf_dir / "knowledge" / "cli-split.md"
        _write_markdown(
            source,
            "CLI Split",
            "# CLI Split\n\nParagraph one. Paragraph one.\n\nParagraph two. Paragraph two.\n",
        )

        result = runner.invoke(
            app,
            [
                "split",
                "knowledge/cli-split.md",
                "--by",
                "tokens",
                "--max",
                "12",
                "--shelf",
                str(shelf_dir),
            ],
        )

        assert result.exit_code == 0
        assert "Split" in result.output
        assert "part-1" in result.output

    def test_merge_then_split_roundtrip(self, shelf_dir):
        first = shelf_dir / "knowledge" / "rt-a.md"
        second = shelf_dir / "knowledge" / "rt-b.md"
        first_body = "## Alpha\n\nAlpha body.\n"
        second_body = "## Beta\n\nBeta body.\n"
        _write_markdown(first, "RT A", first_body)
        _write_markdown(second, "RT B", second_body)

        merge_result = ChunkMerger().merge(
            str(shelf_dir),
            ["knowledge/rt-a.md", "knowledge/rt-b.md"],
            "knowledge/rt-merged.md",
        )
        assert merge_result.success

        split_result = ChunkSplitter().split_by_heading(
            str(shelf_dir),
            "knowledge/rt-merged.md",
            heading_level=2,
        )

        assert split_result.success
        merged_alpha = Shelf(str(shelf_dir)).read_shelf_file(
            next(path for path in split_result.output_files if path.endswith("alpha.md"))
        )
        merged_beta = Shelf(str(shelf_dir)).read_shelf_file(
            next(path for path in split_result.output_files if path.endswith("beta.md"))
        )
        assert merged_alpha.content == Shelf(str(shelf_dir)).read_shelf_file("knowledge/rt-a.md").content
        assert merged_beta.content == Shelf(str(shelf_dir)).read_shelf_file("knowledge/rt-b.md").content

    def test_split_single_heading(self, shelf_dir):
        source = shelf_dir / "knowledge" / "single.md"
        _write_markdown(
            source,
            "Single",
            "## Single\n\nOnly one section.\n",
        )

        result = ChunkSplitter().split_by_heading(str(shelf_dir), "knowledge/single.md", heading_level=2)

        assert result.success
        assert len(result.output_files) == 1
        assert result.output_files[0].endswith("single.md")