From 33706633fec3fab49925a894ddfeadbdf911f5c9 Mon Sep 17 00:00:00 2001 From: Noah Freedman Date: Tue, 31 Mar 2026 02:38:07 -0400 Subject: [PATCH] Add conditional chunk loading --- src/shelfai/cli/main.py | 84 ++++++++++ src/shelfai/core/__init__.py | 6 +- src/shelfai/core/conditions.py | 274 +++++++++++++++++++++++++++++++++ src/shelfai/hooks/base.py | 29 +++- tests/test_conditions.py | 198 ++++++++++++++++++++++++ 5 files changed, 584 insertions(+), 7 deletions(-) create mode 100644 src/shelfai/core/conditions.py create mode 100644 tests/test_conditions.py diff --git a/src/shelfai/cli/main.py b/src/shelfai/cli/main.py index d8408b2..e0c8927 100644 --- a/src/shelfai/cli/main.py +++ b/src/shelfai/cli/main.py @@ -22,6 +22,7 @@ split Split a large chunk into smaller pieces dead-chunks Identify chunks never loaded at runtime suggest Analyse an AGENT.md and recommend a chunking strategy + conditions Inspect conditional chunk loading rules keywords List and inspect chunk classification keywords priority Show chunk priorities and budget-aware selection lint Check shelf health and chunk hygiene @@ -2122,6 +2123,67 @@ def suggest_cmd( console.print(f"[green]chunk.yaml written:[/green] {yaml_path}\n") +# ────────────────────────────────────────────── +# shelfai conditions +# ────────────────────────────────────────────── + + +@app.command("conditions") +def conditions_cmd( + shelf_path: str = typer.Argument("./shelf", help="Path to a shelf directory or chunk workspace"), + check: Optional[str] = typer.Option(None, "--check", help="Check whether a chunk would load"), + explain: bool = typer.Option(False, "--explain", help="Explain the loading decision"), + task_type: Optional[str] = typer.Option(None, "--task", help="Task type for evaluation"), + environment: Optional[str] = typer.Option(None, "--environment", help="Environment for evaluation"), + agent_id: Optional[str] = typer.Option(None, "--agent-id", help="Agent id for evaluation"), + custom: Optional[list[str]] = typer.Option(None, "--custom", help="Custom context field as key=value; repeatable"), +): + """Inspect chunk loading conditions and evaluate loading decisions.""" + from shelfai.core.conditions import ConditionalLoader, LoadContext + + path = Path(shelf_path).resolve() + if not path.exists(): + console.print(f"[red]Path not found: {path}[/red]") + raise typer.Exit(1) + + loader = ConditionalLoader(str(path)) + context = LoadContext( + task_type=task_type, + environment=environment, + agent_id=agent_id, + custom=_parse_custom_context(custom) or None, + ) + + if check is not None: + should_load = loader.should_load(check, context) + verdict = "[green]load[/green]" if should_load else "[yellow]skip[/yellow]" + console.print(f"{Path(check).stem}: {verdict}") + if explain: + console.print() + console.print(loader.explain(check, context)) + return + + table = Table(show_header=True, header_style="bold") + table.add_column("Chunk", style="cyan") + table.add_column("always_load") + table.add_column("load_when") + table.add_column("never_load_when") + + for condition in loader.list_all_conditions(): + table.add_row( + condition.chunk_id, + "true" if condition.always_load else "false", + _format_condition_map(condition.load_when), + _format_condition_map(condition.never_load_when), + ) + + console.print(table) + if explain: + console.print( + "\n[dim]Use --check with --task / --environment / --custom to evaluate a specific chunk.[/dim]" + ) + + # ────────────────────────────────────────────── # shelfai compact # ────────────────────────────────────────────── @@ -2342,6 +2404,28 @@ def version( # ────────────────────────────────────────────── +def _parse_custom_context(values: list[str]) -> dict[str, str]: + custom: dict[str, str] = {} + for item in values or []: + if "=" not in item: + continue + key, value = item.split("=", 1) + key = key.strip() + if not key: + continue + custom[key] = value.strip() + return custom + + +def _format_condition_map(condition_map: dict[str, list[str]]) -> str: + if not condition_map: + return "-" + parts = [] + for field, values in condition_map.items(): + parts.append(f"{field}=[{', '.join(values)}]") + return "; ".join(parts) + + def _display_session_report(report): """Pretty-print a session manager report.""" diff --git a/src/shelfai/core/__init__.py b/src/shelfai/core/__init__.py index 1ba67dc..21ec5cd 100644 --- a/src/shelfai/core/__init__.py +++ b/src/shelfai/core/__init__.py @@ -1,13 +1,17 @@ """ShelfAI core modules.""" +from shelfai.core.conditions import ConditionalLoader, LoadCondition, LoadContext from shelfai.core.diff_report import ChunkDiff, ShelfDiffReport, compare_before_after, compare_shelves from shelfai.core.priority import ChunkPriority, PriorityManager __all__ = [ "ChunkDiff", "ChunkPriority", - "ShelfDiffReport", + "ConditionalLoader", + "LoadCondition", + "LoadContext", "PriorityManager", + "ShelfDiffReport", "compare_before_after", "compare_shelves", ] diff --git a/src/shelfai/core/conditions.py b/src/shelfai/core/conditions.py new file mode 100644 index 0000000..23fdc2d --- /dev/null +++ b/src/shelfai/core/conditions.py @@ -0,0 +1,274 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +import yaml + +from shelfai.core.fileops import read_shelf_file + + +@dataclass +class LoadCondition: + chunk_id: str + load_when: dict[str, list[str]] = field(default_factory=dict) + never_load_when: dict[str, list[str]] = field(default_factory=dict) + always_load: bool = False + + +@dataclass +class LoadContext: + task_type: str = None + environment: str = None + agent_id: str = None + custom: dict = None + + +class ConditionalLoader: + """Evaluate chunk loading conditions.""" + + def __init__(self, shelf_path: str): + self.shelf_path = Path(shelf_path).resolve() + if not self.shelf_path.exists(): + raise FileNotFoundError(f"Shelf path not found: {self.shelf_path}") + self._chunk_root = self._resolve_chunk_root() + self._workspace_root = self._resolve_workspace_root() + self._conditions: dict[str, LoadCondition] = {} + self._parse_all() + + def _parse_all(self): + """Read conditions from chunk frontmatter and optional config files.""" + file_conditions: dict[str, LoadCondition] = {} + if self._chunk_root.exists(): + for chunk_path in sorted(self._chunk_root.glob("*.md")): + metadata = read_shelf_file(self._chunk_root, chunk_path.name).metadata + file_conditions[chunk_path.stem] = self._load_condition(chunk_path.stem, metadata) + + config_conditions = self._load_config_conditions() + all_chunk_ids = sorted(set(file_conditions) | set(config_conditions)) + + for chunk_id in all_chunk_ids: + merged = self._merge_conditions( + chunk_id, + config_conditions.get(chunk_id), + file_conditions.get(chunk_id), + ) + self._conditions[chunk_id] = merged + + def should_load(self, chunk_id: str, context: LoadContext) -> bool: + """ + Evaluate whether a chunk should be loaded given context. + Logic: + 1. If always_load -> True + 2. If never_load_when matches -> False + 3. If load_when specified and matches -> True + 4. If load_when specified and doesn't match -> False + 5. If no conditions -> True (default: load) + """ + condition = self.get_conditions(chunk_id) + if condition is None: + return True + if condition.always_load: + return True + if condition.never_load_when and self._matches(condition.never_load_when, context): + return False + if condition.load_when: + return self._matches(condition.load_when, context) + return True + + def filter_chunks(self, chunk_ids: list[str], context: LoadContext) -> list[str]: + """Filter a list of chunks based on conditions.""" + filtered: list[str] = [] + seen: set[str] = set() + for chunk_id in chunk_ids: + normalized = Path(chunk_id).stem + if normalized in seen: + continue + if self.should_load(normalized, context): + filtered.append(normalized) + seen.add(normalized) + return filtered + + def get_conditions(self, chunk_id: str) -> Optional[LoadCondition]: + """Get conditions for a specific chunk.""" + return self._conditions.get(Path(chunk_id).stem) + + def list_all_conditions(self) -> list[LoadCondition]: + """List conditions for all chunks.""" + return [self._conditions[key] for key in sorted(self._conditions)] + + def explain(self, chunk_id: str, context: LoadContext) -> str: + """Explain why a chunk would/wouldn't load in given context.""" + condition = self.get_conditions(chunk_id) + if condition is None: + return f"{Path(chunk_id).stem} loads by default because no conditions were found." + + if condition.always_load: + return f"{condition.chunk_id} loads because always_load is true." + + if condition.never_load_when and self._matches(condition.never_load_when, context): + return ( + f"{condition.chunk_id} does not load because never_load_when matches " + f"{self._describe_match(condition.never_load_when, context)}." + ) + + if condition.load_when: + if self._matches(condition.load_when, context): + return ( + f"{condition.chunk_id} loads because load_when matches " + f"{self._describe_match(condition.load_when, context)}." + ) + return ( + f"{condition.chunk_id} does not load because load_when does not match " + f"{self._describe_context(context)}." + ) + + return f"{condition.chunk_id} loads by default because no load_when or never_load_when rules are set." + + def _resolve_chunk_root(self) -> Path: + chunks_dir = self.shelf_path / "chunks" + if chunks_dir.exists() and any(chunks_dir.glob("*.md")): + return chunks_dir + if any(self.shelf_path.glob("*.md")): + return self.shelf_path + return self.shelf_path + + def _resolve_workspace_root(self) -> Path: + candidates = [self.shelf_path, self.shelf_path.parent] + for candidate in candidates: + if not candidate.exists(): + continue + if ( + (candidate / "index.md").exists() + or (candidate / "shelf.config.yaml").exists() + or (candidate / ".chunk-defaults.yaml").exists() + or (candidate / "chunk.yaml").exists() + ): + return candidate + return self.shelf_path + + def _load_config_conditions(self) -> dict[str, LoadCondition]: + merged: dict[str, LoadCondition] = {} + for config_name in (".chunk-defaults.yaml", "chunk.yaml"): + config_path = self._workspace_root / config_name + if not config_path.exists(): + continue + + raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + if not isinstance(raw, dict): + continue + + conditions = raw.get("conditions", {}) + if not isinstance(conditions, dict): + continue + + for chunk_id, data in conditions.items(): + if not isinstance(data, dict): + continue + existing = merged.get(chunk_id) + merged[chunk_id] = self._merge_conditions( + chunk_id, + existing, + self._load_condition(chunk_id, data), + ) + + return merged + + def _merge_conditions( + self, + chunk_id: str, + base: LoadCondition | None, + overlay: LoadCondition | None, + ) -> LoadCondition: + if base is None and overlay is None: + return LoadCondition(chunk_id=chunk_id) + if base is None: + return LoadCondition( + chunk_id=chunk_id, + load_when=dict(overlay.load_when), + never_load_when=dict(overlay.never_load_when), + always_load=overlay.always_load, + ) + if overlay is None: + return LoadCondition( + chunk_id=chunk_id, + load_when=dict(base.load_when), + never_load_when=dict(base.never_load_when), + always_load=base.always_load, + ) + + return LoadCondition( + chunk_id=chunk_id, + load_when={**base.load_when, **overlay.load_when}, + never_load_when={**base.never_load_when, **overlay.never_load_when}, + always_load=base.always_load or overlay.always_load, + ) + + def _load_condition(self, chunk_id: str, data: dict | None) -> LoadCondition: + if not isinstance(data, dict): + return LoadCondition(chunk_id=chunk_id) + return LoadCondition( + chunk_id=chunk_id, + load_when=self._normalize_condition_map(data.get("load_when")), + never_load_when=self._normalize_condition_map(data.get("never_load_when")), + always_load=bool(data.get("always_load", False)), + ) + + def _normalize_condition_map(self, raw: dict | None) -> dict[str, list[str]]: + if not isinstance(raw, dict): + return {} + normalized: dict[str, list[str]] = {} + for field, values in raw.items(): + if values is None: + normalized[field] = [] + elif isinstance(values, (list, tuple, set)): + normalized[field] = [str(value) for value in values] + else: + normalized[field] = [str(values)] + return normalized + + def _matches(self, condition_map: dict[str, list[str]], context: LoadContext) -> bool: + for field, expected_values in condition_map.items(): + actual_value = self._context_value(context, field) + if actual_value is None: + return False + if not self._value_matches(actual_value, expected_values): + return False + return True + + def _context_value(self, context: LoadContext, field: str): + if field == "task_type": + return context.task_type + if field == "environment": + return context.environment + if field == "agent_id": + return context.agent_id + custom = context.custom or {} + return custom.get(field) + + def _value_matches(self, actual_value, expected_values: list[str]) -> bool: + actual_values = actual_value if isinstance(actual_value, (list, tuple, set)) else [actual_value] + normalized_actual = {self._normalize_scalar(value) for value in actual_values} + normalized_expected = {self._normalize_scalar(value) for value in expected_values} + return bool(normalized_actual & normalized_expected) + + def _normalize_scalar(self, value) -> str: + return str(value).strip().lower() + + def _describe_match(self, condition_map: dict[str, list[str]], context: LoadContext) -> str: + parts = [] + for field, expected_values in condition_map.items(): + actual_value = self._context_value(context, field) + parts.append(f"{field}={actual_value!r} in {expected_values}") + return ", ".join(parts) + + def _describe_context(self, context: LoadContext) -> str: + parts = [] + for field in ("task_type", "environment", "agent_id"): + value = getattr(context, field) + if value is not None: + parts.append(f"{field}={value!r}") + for key, value in (context.custom or {}).items(): + parts.append(f"{key}={value!r}") + return ", ".join(parts) if parts else "an empty context" diff --git a/src/shelfai/hooks/base.py b/src/shelfai/hooks/base.py index 1e9586d..5527d38 100644 --- a/src/shelfai/hooks/base.py +++ b/src/shelfai/hooks/base.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Optional +from shelfai.core.conditions import ConditionalLoader, LoadContext from shelfai.core.fileops import estimate_tokens from shelfai.core.learn import log_load @@ -18,14 +19,15 @@ def __init__(self, agent_id: str, shelf_path: str = "./shelf/", learn_db: str = if not self.shelf_path.exists(): raise FileNotFoundError(f"Shelf path not found: {self.shelf_path}") self.learn_db = Path(learn_db).resolve() if learn_db else None + self._conditional_loader = ConditionalLoader(str(self.shelf_path)) @property def chunk_root(self) -> Path: - if any(self.shelf_path.glob("*.md")): - return self.shelf_path chunks_dir = self.shelf_path / "chunks" - if chunks_dir.exists(): + if chunks_dir.exists() and any(chunks_dir.glob("*.md")): return chunks_dir + if any(self.shelf_path.glob("*.md")): + return self.shelf_path return self.shelf_path def pre_task(self, task_type: str, context: dict = None) -> list[str]: @@ -47,10 +49,11 @@ def pre_task(self, task_type: str, context: dict = None) -> list[str]: task_type=task_type, ) + load_context = self._build_load_context(task_type=task_type, context=context) candidates = self._historical_chunk_ranking(task_type=task_type) - if candidates: - return candidates - return self.list_chunks() + if not candidates: + candidates = self.list_chunks() + return self._conditional_loader.filter_chunks(candidates, load_context) def load_chunks(self, chunk_ids: list[str]) -> str: """ @@ -152,3 +155,17 @@ def _extract_token_budget(self, context: dict = None) -> Optional[int]: except (TypeError, ValueError): continue return None + + def _build_load_context(self, task_type: str, context: dict = None) -> LoadContext: + context = context or {} + custom = { + key: value + for key, value in context.items() + if key not in {"task_type", "environment", "agent_id"} + } + return LoadContext( + task_type=task_type or context.get("task_type"), + environment=context.get("environment"), + agent_id=context.get("agent_id", self.agent_id), + custom=custom or None, + ) diff --git a/tests/test_conditions.py b/tests/test_conditions.py new file mode 100644 index 0000000..bf20bee --- /dev/null +++ b/tests/test_conditions.py @@ -0,0 +1,198 @@ +"""Tests for conditional chunk loading.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +import yaml +from typer.testing import CliRunner + +from shelfai.cli.main import app +from shelfai.core.conditions import ConditionalLoader, LoadContext +from shelfai.hooks import ShelfHook + + +runner = CliRunner() + + +def _write_chunk( + chunk_root: Path, + name: str, + body: str = "Body.\n", + metadata: Optional[dict] = None, +) -> Path: + path = chunk_root / f"{name}.md" + if metadata: + meta = yaml.safe_dump(metadata, sort_keys=False).strip() + path.write_text(f"---\n{meta}\n---\n\n{body}", encoding="utf-8") + else: + path.write_text(body, encoding="utf-8") + return path + + +def _build_shelf(tmp_path: Path) -> tuple[Path, Path]: + shelf = tmp_path / "shelf" + chunk_root = shelf / "chunks" + chunk_root.mkdir(parents=True) + (shelf / "index.md").write_text("# Index\n", encoding="utf-8") + return shelf, chunk_root + + +def test_always_load(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "rules", metadata={"always_load": True}) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("rules", LoadContext(task_type="documentation")) is True + + +def test_load_when_matches(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "rules", metadata={"load_when": {"task_type": ["code_review", "refactor"]}}) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("rules", LoadContext(task_type="code_review")) is True + + +def test_load_when_no_match(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "rules", metadata={"load_when": {"task_type": ["code_review"]}}) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("rules", LoadContext(task_type="documentation")) is False + + +def test_never_load_when_matches(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "rules", metadata={"never_load_when": {"task_type": ["documentation"]}}) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("rules", LoadContext(task_type="documentation")) is False + + +def test_never_overrides_load(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk( + chunk_root, + "rules", + metadata={ + "load_when": {"task_type": ["documentation"]}, + "never_load_when": {"task_type": ["documentation"]}, + }, + ) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("rules", LoadContext(task_type="documentation")) is False + + +def test_no_conditions_default_load(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "rules") + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("rules", LoadContext(task_type="documentation")) is True + + +def test_filter_chunks(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "allowed", metadata={"load_when": {"task_type": ["code_review"]}}) + _write_chunk(chunk_root, "blocked", metadata={"never_load_when": {"task_type": ["code_review"]}}) + _write_chunk(chunk_root, "open") + + loader = ConditionalLoader(str(shelf)) + + assert loader.filter_chunks(["allowed", "blocked", "open"], LoadContext(task_type="code_review")) == [ + "allowed", + "open", + ] + + +def test_environment_condition(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "deploy", metadata={"load_when": {"environment": ["production"]}}) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("deploy", LoadContext(environment="production")) is True + assert loader.should_load("deploy", LoadContext(environment="staging")) is False + + +def test_custom_conditions(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "policy", metadata={"load_when": {"priority": ["urgent"]}}) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("policy", LoadContext(custom={"priority": "urgent"})) is True + assert loader.should_load("policy", LoadContext(custom={"priority": "low"})) is False + + +def test_explain_output(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "policy", metadata={"load_when": {"task_type": ["code_review"]}}) + + loader = ConditionalLoader(str(shelf)) + explanation = loader.explain("policy", LoadContext(task_type="documentation")) + + assert "does not load" in explanation + assert "load_when" in explanation + + +def test_config_conditions_are_loaded(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "deploy") + (shelf / "chunk.yaml").write_text( + "conditions:\n" + " deploy:\n" + " load_when:\n" + " task_type:\n" + " - code_review\n", + encoding="utf-8", + ) + + loader = ConditionalLoader(str(shelf)) + + assert loader.should_load("deploy", LoadContext(task_type="code_review")) is True + + +def test_hook_pre_task_applies_conditions(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "allowed", metadata={"load_when": {"task_type": ["code_review"]}}) + _write_chunk(chunk_root, "blocked", metadata={"never_load_when": {"task_type": ["code_review"]}}) + _write_chunk(chunk_root, "open") + + hook = ShelfHook(agent_id="bot", shelf_path=str(chunk_root)) + + assert hook.pre_task("code_review") == ["allowed", "open"] + + +def test_cli_conditions_list(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "allowed", metadata={"load_when": {"task_type": ["code_review"]}}) + _write_chunk(chunk_root, "blocked", metadata={"never_load_when": {"task_type": ["documentation"]}}) + + result = runner.invoke(app, ["conditions", str(shelf)]) + + assert result.exit_code == 0 + assert "allowed" in result.output + assert "blocked" in result.output + assert "load_when" in result.output + + +def test_cli_conditions_check_and_explain(tmp_path): + shelf, chunk_root = _build_shelf(tmp_path) + _write_chunk(chunk_root, "policy", metadata={"load_when": {"task_type": ["code_review"]}}) + + result = runner.invoke(app, ["conditions", str(shelf), "--check", "policy", "--task", "documentation", "--explain"]) + + assert result.exit_code == 0 + assert "policy:" in result.output + assert "skip" in result.output + assert "does not load" in result.output