diff --git a/src/shelfai/cli/main.py b/src/shelfai/cli/main.py index 2ab0a44..56374c9 100644 --- a/src/shelfai/cli/main.py +++ b/src/shelfai/cli/main.py @@ -13,6 +13,7 @@ bundle Export the shelf as a portable .shelf bundle import Import a portable .shelf bundle inspect Inspect a .shelf bundle manifest + compile Compile one or more chunks into optimized context prune Clean up stale memory entries chunk-scan Scan agents directory for chunking candidates chunk Run heuristic pre-filter on a monolithic agent file @@ -1098,6 +1099,55 @@ def export_context( console_err.print(f"[dim]Exported (~{estimate_tokens(result):,} tokens)[/dim]") +# ────────────────────────────────────────────── +# shelfai compile +# ────────────────────────────────────────────── + + +@app.command("compile") +def compile_cmd( + chunks: list[str] = typer.Argument(None, help="Chunk IDs to compile"), + task: Optional[str] = typer.Option(None, "--task", help="Auto-select chunks for a task type"), + all_chunks: bool = typer.Option(False, "--all", help="Compile every chunk in the shelf"), + max_tokens: Optional[int] = typer.Option(None, "--max-tokens", help="Token budget"), + order: str = typer.Option("priority", "--order", help="Chunk order: priority, alphabetical, dependency, custom"), + output: Optional[str] = typer.Option(None, "--output", "-o", help="Write compiled context to a file"), + shelf_path: str = typer.Option("./shelf", "--shelf", "-s", help="Path to shelf directory"), +): + """Compile one or more chunks into an optimized context string.""" + from shelfai.core.compiler import ChunkCompiler, CompileConfig + + shelf_root = _resolve_shelf_root(shelf_path) + if shelf_root is None: + console.print(f"[red]No shelf found at {shelf_path}. 
Run `shelfai init` first.[/red]") + raise typer.Exit(1) + + compiler = ChunkCompiler(str(shelf_root)) + config = CompileConfig(max_tokens=max_tokens, order=order) + + if task: + compiled = compiler.compile_for_task(task, config=config) + elif all_chunks: + compiled = compiler.compile(compiler.list_chunks(), config=config) + elif chunks: + compiled = compiler.compile(list(chunks), config=config) + else: + console.print("[red]Provide chunk IDs, --task, or --all.[/red]") + raise typer.Exit(1) + + if output: + output_path = Path(output).expanduser().resolve() + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(compiled.content, encoding="utf-8") + console.print(f"[green]Wrote compiled context to {output_path}[/green]") + else: + print(compiled.content) + + if compiled.warnings: + for warning in compiled.warnings: + console_err.print(f"[yellow]{warning}[/yellow]") + + # ────────────────────────────────────────────── # shelfai templates # ────────────────────────────────────────────── diff --git a/src/shelfai/core/__init__.py b/src/shelfai/core/__init__.py index 94683b0..3be40fa 100644 --- a/src/shelfai/core/__init__.py +++ b/src/shelfai/core/__init__.py @@ -1,6 +1,7 @@ """ShelfAI core modules.""" from shelfai.core.annotations import AnnotationManager, ChunkAnnotation +from shelfai.core.compiler import ChunkCompiler, CompileConfig, CompiledContext from shelfai.core.conditions import ConditionalLoader, LoadCondition, LoadContext from shelfai.core.diff_report import ChunkDiff, ShelfDiffReport, compare_before_after, compare_shelves from shelfai.core.migrate import MigrationPlan, MigrationStep, ShelfMigrator @@ -9,6 +10,9 @@ __all__ = [ "AnnotationManager", "ChunkAnnotation", + "ChunkCompiler", + "CompileConfig", + "CompiledContext", "ChunkDiff", "ChunkPriority", "ConditionalLoader", diff --git a/src/shelfai/core/compiler.py b/src/shelfai/core/compiler.py new file mode 100644 index 0000000..cb840e7 --- /dev/null +++ b/src/shelfai/core/compiler.py 
@@ -0,0 +1,443 @@ +"""Chunk compilation helpers for assembling shelf context.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +import re +from pathlib import Path +from shelfai.core.fileops import estimate_tokens, read_shelf_file + + +@dataclass +class CompileConfig: + add_headers: bool = True + add_separator: bool = True + strip_frontmatter: bool = True + strip_comments: bool = True + deduplicate: bool = True + max_tokens: int | None = None + order: str = "priority" + custom_order: list[str] | None = None + + +@dataclass +class CompiledContext: + content: str + total_tokens: int + chunks_included: list[str] + chunks_excluded: list[str] + warnings: list[str] + + +@dataclass +class _ChunkSection: + chunk_id: str + label: str + content: str + metadata: dict = field(default_factory=dict) + priority: float = 0.0 + + +class ChunkCompiler: + """Compile multiple chunks into a single context string.""" + + def __init__(self, shelf_path: str): + self.shelf_path = Path(shelf_path).expanduser().resolve() + if not self.shelf_path.exists(): + raise FileNotFoundError(f"Shelf path not found: {self.shelf_path}") + + def compile(self, chunk_ids: list[str], config: CompileConfig | None = None) -> CompiledContext: + """ + Assemble chunks into a single context string. 
+ """ + config = config or CompileConfig() + if not chunk_ids: + return CompiledContext( + content="", + total_tokens=0, + chunks_included=[], + chunks_excluded=[], + warnings=[], + ) + + ordered_ids = self._order_chunks(chunk_ids, config.order, config.custom_order) + sections: list[_ChunkSection] = [] + warnings: list[str] = [] + excluded: list[str] = [] + + for chunk_id in ordered_ids: + try: + section = self._read_section(chunk_id, config=config) + except FileNotFoundError as exc: + excluded.append(chunk_id) + warnings.append(str(exc)) + continue + sections.append(section) + + if config.deduplicate: + seen: set[str] = set() + deduped_sections: list[_ChunkSection] = [] + for section in sections: + paragraphs = self._split_paragraphs(section.content) + kept: list[str] = [] + for paragraph in paragraphs: + normalized = self._normalize_text(paragraph) + if not normalized or normalized in seen: + continue + seen.add(normalized) + kept.append(paragraph) + if kept: + deduped_sections.append( + _ChunkSection( + chunk_id=section.chunk_id, + label=section.label, + content="\n\n".join(kept).strip(), + metadata=section.metadata, + priority=section.priority, + ) + ) + sections = deduped_sections + + selected_sections = sections + if config.max_tokens is not None: + selected_sections, trimmed_excluded, trim_warnings = self._trim_to_budget( + sections, + config.max_tokens, + ) + excluded.extend(trimmed_excluded) + warnings.extend(trim_warnings) + + compiled = self._render(selected_sections, config) + total_tokens = estimate_tokens(compiled) + + return CompiledContext( + content=compiled, + total_tokens=total_tokens, + chunks_included=[section.chunk_id for section in selected_sections], + chunks_excluded=excluded, + warnings=warnings, + ) + + def compile_for_task( + self, + task_type: str, + context: dict = None, + config: CompileConfig = None, + ) -> CompiledContext: + """ + Auto-select and compile chunks for a task type. 
+ """ + config = config or CompileConfig() + context = context or {} + chunk_ids = self.list_chunks() + if not chunk_ids: + return self.compile([], config=config) + + selected: list[str] = [] + excluded: list[str] = [] + warnings: list[str] = [] + context_blob = self._context_blob(task_type, context) + + for chunk_id in chunk_ids: + section = self._read_section(chunk_id, config=config) + if self._matches_task(section.metadata, task_type, context_blob): + selected.append(chunk_id) + else: + excluded.append(chunk_id) + if self._has_selection_metadata(section.metadata): + warnings.append( + f"Excluded {chunk_id} because it does not match task '{task_type}'." + ) + + compiled = self.compile(selected, config=config) + return CompiledContext( + content=compiled.content, + total_tokens=compiled.total_tokens, + chunks_included=compiled.chunks_included, + chunks_excluded=excluded + compiled.chunks_excluded, + warnings=warnings + compiled.warnings, + ) + + def list_chunks(self) -> list[str]: + """Return all chunk IDs available under the shelf.""" + root = self._chunk_root_path + excluded = {"index", "shelf.config", "AGENT"} + return sorted( + self._relative_chunk_id(path) + for path in root.rglob("*.md") + if path.stem not in excluded and not any(part.startswith(".") for part in path.parts) + ) + + def _strip_frontmatter(self, content: str) -> str: + """Remove YAML frontmatter from markdown.""" + text = str(content or "") + if not text.startswith("---\n"): + return text + match = re.match(r"(?ms)^---\s*\n.*?\n---\s*\n?", text) + if not match: + return text + return text[match.end() :] + + def _deduplicate(self, sections: list[tuple[str, str]]) -> list[tuple[str, str]]: + """Remove duplicate paragraphs across chunks.""" + seen: set[str] = set() + result: list[tuple[str, str]] = [] + + for label, content in sections: + paragraphs = self._split_paragraphs(content) + kept: list[str] = [] + for paragraph in paragraphs: + normalized = self._normalize_text(paragraph) + if not 
normalized or normalized in seen: + continue + seen.add(normalized) + kept.append(paragraph) + if kept: + result.append((label, "\n\n".join(kept).strip())) + + return result + + def _trim_to_budget( + self, + sections: list[_ChunkSection], + max_tokens: int, + ) -> tuple[list[_ChunkSection], list[str], list[str]]: + """Remove lowest-priority content to fit budget.""" + retained = list(sections) + excluded: list[str] = [] + warnings: list[str] = [] + + while retained and estimate_tokens(self._render(retained, CompileConfig())) > max_tokens: + removable = min( + retained, + key=lambda section: (section.priority, len(section.content), section.label), + ) + retained.remove(removable) + excluded.append(removable.chunk_id) + warnings.append( + f"Excluded {removable.chunk_id} to fit token budget ({max_tokens})." + ) + + if retained and estimate_tokens(self._render(retained, CompileConfig())) > max_tokens: + truncated = self._render(retained, CompileConfig())[: max_tokens * 4] + warnings.append("Content still exceeded budget after removing sections; truncated output.") + return ( + [ + _ChunkSection( + chunk_id="__truncated__", + label="truncated", + content=truncated, + metadata={}, + priority=0.0, + ) + ], + excluded, + warnings, + ) + + return retained, excluded, warnings + + def _order_chunks( + self, + chunk_ids: list[str], + order: str, + custom_order: list[str] = None, + ) -> list[str]: + """Sort chunks by specified ordering strategy.""" + normalized = [self._normalize_chunk_id(chunk_id) for chunk_id in chunk_ids] + if order == "custom" and custom_order: + requested = [self._normalize_chunk_id(chunk_id) for chunk_id in custom_order] + remaining = [chunk_id for chunk_id in normalized if chunk_id not in requested] + return [chunk_id for chunk_id in requested if chunk_id in normalized] + remaining + if order == "custom": + return normalized + + sections = [] + for chunk_id in normalized: + try: + sections.append(self._read_section(chunk_id, config=CompileConfig())) + 
except FileNotFoundError: + sections.append( + _ChunkSection( + chunk_id=chunk_id, + label=chunk_id, + content="", + metadata={}, + priority=0.0, + ) + ) + + if order == "alphabetical": + return [section.chunk_id for section in sorted(sections, key=lambda s: s.label.lower())] + + if order == "dependency": + return self._topological_order(sections) + + priority_order = sorted( + sections, + key=lambda section: (-section.priority, section.label.lower(), section.chunk_id), + ) + return [section.chunk_id for section in priority_order] + + def _read_section(self, chunk_id: str, config: CompileConfig) -> _ChunkSection: + chunk_path = self._resolve_chunk_path(chunk_id) + try: + relative_path = chunk_path.relative_to(self.shelf_path).as_posix() + except ValueError: + relative_path = chunk_path.name + shelf_file = read_shelf_file(self.shelf_path, relative_path) + raw_content = shelf_file.content + if config.strip_frontmatter: + raw_content = self._strip_frontmatter(raw_content) + if config.strip_comments: + raw_content = self._strip_comments(raw_content) + metadata = dict(shelf_file.metadata) + chunk_key = self._relative_chunk_id(chunk_path) + return _ChunkSection( + chunk_id=chunk_key, + label=self._label_for_chunk(chunk_key, metadata), + content=raw_content.strip(), + metadata=metadata, + priority=self._priority_for(metadata), + ) + + def _render(self, sections: list[_ChunkSection], config: CompileConfig) -> str: + parts: list[str] = [] + for section in sections: + body = section.content.strip() + if not body: + continue + if config.add_headers: + parts.append(f"## [{section.label}]") + parts.append(body) + if not parts: + return "" + separator = "\n\n---\n\n" if config.add_separator else "\n\n" + return separator.join(parts).strip() + + def _resolve_chunk_root(self) -> Path: + if (self.shelf_path / "chunks").exists(): + return self.shelf_path / "chunks" + return self.shelf_path + + @property + def _chunk_root_path(self) -> Path: + return self._resolve_chunk_root() + + 
def _resolve_chunk_path(self, chunk_id: str) -> Path: + candidate = self._normalize_chunk_id(chunk_id) + path = Path(candidate) + if path.suffix != ".md": + path = path.with_suffix(".md") + + root = self._chunk_root_path + direct = root / path + if direct.exists(): + return direct + + if path.name == path.as_posix(): + for match in root.rglob("*.md"): + if match.stem == path.stem: + return match + + raise FileNotFoundError(f"Chunk not found: {chunk_id}") + + def _relative_chunk_id(self, path: Path) -> str: + root = self._chunk_root_path + try: + return path.relative_to(root).with_suffix("").as_posix() + except ValueError: + return path.with_suffix("").name + + def _normalize_chunk_id(self, chunk_id: str) -> str: + value = str(chunk_id).strip().replace("\\", "/") + if value.endswith(".md"): + value = value[:-3] + return value + + def _label_for_chunk(self, chunk_id: str, metadata: dict) -> str: + title = metadata.get("title") + if title: + return str(title).strip() + return self._normalize_chunk_id(chunk_id) + + def _priority_for(self, metadata: dict) -> float: + value = metadata.get("priority", 0) + try: + return float(value) + except (TypeError, ValueError): + return 0.0 + + def _strip_comments(self, content: str) -> str: + return re.sub(r"(?s)<!--.*?-->", "", content).strip() + + def _split_paragraphs(self, content: str) -> list[str]: + text = str(content or "").strip() + if not text: + return [] + return [part.strip() for part in re.split(r"\n\s*\n", text) if part.strip()] + + def _normalize_text(self, text: str) -> str: + value = str(text or "").lower() + value = re.sub(r"\s+", " ", value) + value = re.sub(r"[^a-z0-9\s]", "", value) + return value.strip() + + def _context_blob(self, task_type: str, context: dict) -> str: + parts = [task_type] + for value in context.values(): + if isinstance(value, (list, tuple, set)): + parts.extend(str(item) for item in value) + elif isinstance(value, dict): + parts.extend(str(item) for item in value.values()) + else: + 
parts.append(str(value)) + return " ".join(parts).lower() + + def _matches_task(self, metadata: dict, task_type: str, context_blob: str) -> bool: + selectors: list[str] = [] + for key in ("task", "tasks", "task_type", "task_types", "when", "conditions", "load_when"): + value = metadata.get(key) + if isinstance(value, str): + selectors.append(value) + elif isinstance(value, list): + selectors.extend(str(item) for item in value) + if not selectors: + return True + haystack = context_blob or task_type.lower() + return any(selector.lower() in haystack for selector in selectors) + + def _has_selection_metadata(self, metadata: dict) -> bool: + return any(metadata.get(key) is not None for key in ("task", "tasks", "task_type", "task_types", "when", "conditions", "load_when")) + + def _topological_order(self, sections: list[_ChunkSection]) -> list[str]: + graph: dict[str, set[str]] = {section.chunk_id: set() for section in sections} + for section in sections: + deps = section.metadata.get("depends_on", section.metadata.get("dependencies", [])) + if isinstance(deps, str): + deps = [deps] + for dep in deps or []: + dep_id = self._normalize_chunk_id(dep) + if dep_id in graph and dep_id != section.chunk_id: + graph[section.chunk_id].add(dep_id) + + ordered: list[str] = [] + temporary: set[str] = set() + permanent: set[str] = set() + + def visit(node: str): + if node in permanent: + return + if node in temporary: + return + temporary.add(node) + for dep in sorted(graph[node]): + visit(dep) + temporary.remove(node) + permanent.add(node) + ordered.append(node) + + for section in sorted(sections, key=lambda s: (s.priority * -1, s.label.lower(), s.chunk_id)): + visit(section.chunk_id) + + return ordered diff --git a/src/shelfai/hooks/base.py b/src/shelfai/hooks/base.py index 5527d38..3cba610 100644 --- a/src/shelfai/hooks/base.py +++ b/src/shelfai/hooks/base.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Optional +from shelfai.core.compiler import ChunkCompiler, 
CompileConfig from shelfai.core.conditions import ConditionalLoader, LoadContext from shelfai.core.fileops import estimate_tokens from shelfai.core.learn import log_load @@ -60,11 +61,12 @@ def load_chunks(self, chunk_ids: list[str]) -> str: Read and concatenate specified chunks from the shelf. Returns the combined context string. """ - parts = [] - for chunk_id in chunk_ids: - content = self.load_chunk(chunk_id) - parts.append(f"## {Path(chunk_id).stem}\n\n{content.strip()}") - return "\n\n---\n\n".join(parts) + compiler = ChunkCompiler(str(self.shelf_path)) + compiled = compiler.compile( + chunk_ids, + config=CompileConfig(order="custom", custom_order=chunk_ids), + ) + return compiled.content def load_chunk(self, chunk_id: str) -> str: """Read a single chunk file and return its content.""" diff --git a/tests/test_compiler.py b/tests/test_compiler.py new file mode 100644 index 0000000..089d318 --- /dev/null +++ b/tests/test_compiler.py @@ -0,0 +1,176 @@ +"""Tests for the chunk compiler.""" + +from typer.testing import CliRunner + +from shelfai.cli.main import app +from shelfai.core.compiler import ChunkCompiler, CompileConfig + + +runner = CliRunner() + + +def _make_compiler_shelf(tmp_path): + shelf = tmp_path / "shelf" + chunks = shelf / "chunks" + chunks.mkdir(parents=True) + return shelf, chunks + + +def test_compile_basic(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "soul.md").write_text("# Soul\n\nIdentity and mission.\n", encoding="utf-8") + (chunks / "rules.md").write_text("# Rules\n\nAlways be safe.\n", encoding="utf-8") + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile(["soul", "rules"], config=CompileConfig(order="custom", custom_order=["soul", "rules"])) + + assert "Identity and mission." in result.content + assert "Always be safe." 
in result.content + assert result.chunks_included == ["soul", "rules"] + + +def test_compile_with_headers(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "soul.md").write_text("# Soul\n\nIdentity and mission.\n", encoding="utf-8") + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile(["soul"]) + + assert "## [soul]" in result.content + + +def test_compile_strips_frontmatter(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "rules.md").write_text( + "---\ntitle: Rules\npriority: 10\n---\n\n# Rules\n\nAlways be safe.\n", + encoding="utf-8", + ) + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile(["rules"]) + + assert "title:" not in result.content + assert "priority:" not in result.content + assert "Always be safe." in result.content + + +def test_compile_deduplicates(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "soul.md").write_text("# Soul\n\nShared paragraph.\n", encoding="utf-8") + (chunks / "rules.md").write_text("# Rules\n\nShared paragraph.\n", encoding="utf-8") + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile(["soul", "rules"], config=CompileConfig(order="custom", custom_order=["soul", "rules"])) + + assert result.content.count("Shared paragraph.") == 1 + + +def test_compile_token_budget(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "soul.md").write_text("---\npriority: 10\n---\n\nsmall text\n", encoding="utf-8") + (chunks / "rules.md").write_text("---\npriority: 1\n---\n\n" + ("x" * 400) + "\n", encoding="utf-8") + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile(["soul", "rules"], config=CompileConfig(max_tokens=20, order="priority")) + + assert "small text" in result.content + assert "rules" in result.chunks_excluded + assert any("token budget" in warning for warning in result.warnings) + + +def test_compile_priority_order(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + 
(chunks / "soul.md").write_text("---\npriority: 1\n---\n\nSoul.\n", encoding="utf-8") + (chunks / "rules.md").write_text("---\npriority: 10\n---\n\nRules.\n", encoding="utf-8") + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile(["soul", "rules"], config=CompileConfig(order="priority")) + + assert result.chunks_included == ["rules", "soul"] + + +def test_compile_dependency_order(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "soul.md").write_text("---\npriority: 1\n---\n\nSoul.\n", encoding="utf-8") + (chunks / "workflow.md").write_text( + "---\npriority: 10\ndepends_on:\n - soul\n---\n\nWorkflow.\n", + encoding="utf-8", + ) + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile(["workflow", "soul"], config=CompileConfig(order="dependency")) + + assert result.chunks_included == ["soul", "workflow"] + + +def test_compile_custom_order(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "soul.md").write_text("Soul.\n", encoding="utf-8") + (chunks / "rules.md").write_text("Rules.\n", encoding="utf-8") + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile( + ["soul", "rules"], + config=CompileConfig(order="custom", custom_order=["rules", "soul"]), + ) + + assert result.chunks_included == ["rules", "soul"] + + +def test_compile_for_task(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "review.md").write_text( + "---\ntasks:\n - code_review\n---\n\nReview guidance.\n", + encoding="utf-8", + ) + (chunks / "deploy.md").write_text( + "---\ntasks:\n - deploy\n---\n\nDeploy guidance.\n", + encoding="utf-8", + ) + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile_for_task("code_review") + + assert result.chunks_included == ["review"] + assert "Review guidance." 
in result.content + + +def test_compile_excluded_chunks(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "review.md").write_text( + "---\ntasks:\n - code_review\n---\n\nReview guidance.\n", + encoding="utf-8", + ) + (chunks / "deploy.md").write_text( + "---\ntasks:\n - deploy\n---\n\nDeploy guidance.\n", + encoding="utf-8", + ) + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile_for_task("code_review") + + assert "deploy" in result.chunks_excluded + assert any("does not match task" in warning for warning in result.warnings) + + +def test_empty_compile(tmp_path): + shelf, _ = _make_compiler_shelf(tmp_path) + + compiler = ChunkCompiler(str(shelf)) + result = compiler.compile([]) + + assert result.content == "" + assert result.total_tokens == 0 + assert result.chunks_included == [] + + +def test_cli_compile_output(tmp_path): + shelf, chunks = _make_compiler_shelf(tmp_path) + (chunks / "soul.md").write_text("# Soul\n\nIdentity and mission.\n", encoding="utf-8") + (chunks / "rules.md").write_text("# Rules\n\nAlways be safe.\n", encoding="utf-8") + + result = runner.invoke(app, ["compile", "--shelf", str(shelf), "soul", "rules"]) + + assert result.exit_code == 0 + assert "Identity and mission." in result.output + assert "Always be safe." in result.output