Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/shelfai/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
split Split a large chunk into smaller pieces
dead-chunks Identify chunks never loaded at runtime
suggest Analyse an AGENT.md and recommend a chunking strategy
conditions Inspect conditional chunk loading rules
keywords List and inspect chunk classification keywords
priority Show chunk priorities and budget-aware selection
lint Check shelf health and chunk hygiene
Expand Down Expand Up @@ -2122,6 +2123,67 @@ def suggest_cmd(
console.print(f"[green]chunk.yaml written:[/green] {yaml_path}\n")


# ──────────────────────────────────────────────
# shelfai conditions
# ──────────────────────────────────────────────


@app.command("conditions")
def conditions_cmd(
    shelf_path: str = typer.Argument("./shelf", help="Path to a shelf directory or chunk workspace"),
    check: Optional[str] = typer.Option(None, "--check", help="Check whether a chunk would load"),
    explain: bool = typer.Option(False, "--explain", help="Explain the loading decision"),
    task_type: Optional[str] = typer.Option(None, "--task", help="Task type for evaluation"),
    environment: Optional[str] = typer.Option(None, "--environment", help="Environment for evaluation"),
    agent_id: Optional[str] = typer.Option(None, "--agent-id", help="Agent id for evaluation"),
    custom: Optional[list[str]] = typer.Option(None, "--custom", help="Custom context field as key=value; repeatable"),
):
    """Inspect chunk loading conditions and evaluate loading decisions."""
    # Imported lazily, matching the pattern used by the other subcommands.
    from shelfai.core.conditions import ConditionalLoader, LoadContext

    resolved = Path(shelf_path).resolve()
    if not resolved.exists():
        console.print(f"[red]Path not found: {resolved}[/red]")
        raise typer.Exit(1)

    evaluator = ConditionalLoader(str(resolved))
    ctx = LoadContext(
        task_type=task_type,
        environment=environment,
        agent_id=agent_id,
        custom=_parse_custom_context(custom) or None,
    )

    # Single-chunk mode: report the load/skip verdict (and optionally why).
    if check is not None:
        verdict = (
            "[green]load[/green]"
            if evaluator.should_load(check, ctx)
            else "[yellow]skip[/yellow]"
        )
        console.print(f"{Path(check).stem}: {verdict}")
        if explain:
            console.print()
            console.print(evaluator.explain(check, ctx))
        return

    # Overview mode: one table row per chunk with its conditions.
    table = Table(show_header=True, header_style="bold")
    table.add_column("Chunk", style="cyan")
    for heading in ("always_load", "load_when", "never_load_when"):
        table.add_column(heading)

    for rule in evaluator.list_all_conditions():
        table.add_row(
            rule.chunk_id,
            "true" if rule.always_load else "false",
            _format_condition_map(rule.load_when),
            _format_condition_map(rule.never_load_when),
        )

    console.print(table)
    if explain:
        console.print(
            "\n[dim]Use --check <chunk_id> with --task / --environment / --custom to evaluate a specific chunk.[/dim]"
        )


# ──────────────────────────────────────────────
# shelfai compact
# ──────────────────────────────────────────────
Expand Down Expand Up @@ -2342,6 +2404,28 @@ def version(
# ──────────────────────────────────────────────


def _parse_custom_context(values: list[str]) -> dict[str, str]:
custom: dict[str, str] = {}
for item in values or []:
if "=" not in item:
continue
key, value = item.split("=", 1)
key = key.strip()
if not key:
continue
custom[key] = value.strip()
return custom


def _format_condition_map(condition_map: dict[str, list[str]]) -> str:
if not condition_map:
return "-"
parts = []
for field, values in condition_map.items():
parts.append(f"{field}=[{', '.join(values)}]")
return "; ".join(parts)


def _display_session_report(report):
"""Pretty-print a session manager report."""

Expand Down
6 changes: 5 additions & 1 deletion src/shelfai/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
"""ShelfAI core modules."""

from shelfai.core.conditions import ConditionalLoader, LoadCondition, LoadContext
from shelfai.core.diff_report import ChunkDiff, ShelfDiffReport, compare_before_after, compare_shelves
from shelfai.core.priority import ChunkPriority, PriorityManager

__all__ = [
"ChunkDiff",
"ChunkPriority",
"ShelfDiffReport",
"ConditionalLoader",
"LoadCondition",
"LoadContext",
"PriorityManager",
"ShelfDiffReport",
"compare_before_after",
"compare_shelves",
]
274 changes: 274 additions & 0 deletions src/shelfai/core/conditions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import yaml

from shelfai.core.fileops import read_shelf_file


@dataclass
class LoadCondition:
    """Loading rules attached to a single chunk.

    ``load_when`` and ``never_load_when`` each map a context field name
    (e.g. ``task_type``, ``environment``, or a custom key) to the list of
    values that satisfy the rule for that field.
    """

    # Chunk identifier — the markdown filename stem or a config-file key.
    chunk_id: str
    # Field -> accepted values; when non-empty, the chunk loads only on a match.
    load_when: dict[str, list[str]] = field(default_factory=dict)
    # Field -> excluded values; a match prevents loading.
    never_load_when: dict[str, list[str]] = field(default_factory=dict)
    # When true, the chunk always loads regardless of the other rules.
    always_load: bool = False


@dataclass
class LoadContext:
    """Runtime context against which chunk loading conditions are evaluated.

    Every field defaults to ``None``, meaning "absent": a condition that
    requires an absent field does not match (``ConditionalLoader._matches``
    returns ``False`` when the context value is ``None``).
    """

    # NOTE: the original annotations were `str = None` / `dict = None`,
    # which is incorrect typing for a None default; Optional[...] is the
    # accurate, runtime-equivalent form.
    task_type: Optional[str] = None
    environment: Optional[str] = None
    agent_id: Optional[str] = None
    # Arbitrary extra fields, matched by name when a condition references
    # a key other than the three built-ins above.
    custom: Optional[dict[str, str]] = None


class ConditionalLoader:
    """Evaluate chunk loading conditions for a shelf or chunk workspace.

    Conditions are gathered from two sources and merged per chunk:

    1. Workspace config files (``.chunk-defaults.yaml``, then ``chunk.yaml``,
       later files overriding earlier ones field-by-field) under a top-level
       ``conditions:`` mapping.
    2. Chunk markdown frontmatter (read via ``read_shelf_file``), which
       overrides config conditions field-by-field.

    ``always_load`` is OR-combined across sources.
    """

    def __init__(self, shelf_path: str):
        """Resolve the shelf layout and eagerly parse all conditions.

        Args:
            shelf_path: Path to a shelf directory or chunk workspace.

        Raises:
            FileNotFoundError: If ``shelf_path`` does not exist.
        """
        self.shelf_path = Path(shelf_path).resolve()
        if not self.shelf_path.exists():
            raise FileNotFoundError(f"Shelf path not found: {self.shelf_path}")
        self._chunk_root = self._resolve_chunk_root()
        self._workspace_root = self._resolve_workspace_root()
        self._conditions: dict[str, LoadCondition] = {}
        self._parse_all()

    def _parse_all(self) -> None:
        """Read conditions from chunk frontmatter and optional config files."""
        file_conditions: dict[str, LoadCondition] = {}
        if self._chunk_root.exists():
            for chunk_path in sorted(self._chunk_root.glob("*.md")):
                metadata = read_shelf_file(self._chunk_root, chunk_path.name).metadata
                file_conditions[chunk_path.stem] = self._load_condition(chunk_path.stem, metadata)

        config_conditions = self._load_config_conditions()
        all_chunk_ids = sorted(set(file_conditions) | set(config_conditions))

        for chunk_id in all_chunk_ids:
            # Frontmatter (file) conditions are the overlay on top of config.
            self._conditions[chunk_id] = self._merge_conditions(
                chunk_id,
                config_conditions.get(chunk_id),
                file_conditions.get(chunk_id),
            )

    def should_load(self, chunk_id: str, context: LoadContext) -> bool:
        """
        Evaluate whether a chunk should be loaded given context.
        Logic:
        1. If always_load -> True
        2. If never_load_when matches -> False
        3. If load_when specified and matches -> True
        4. If load_when specified and doesn't match -> False
        5. If no conditions -> True (default: load)
        """
        condition = self.get_conditions(chunk_id)
        if condition is None:
            return True
        if condition.always_load:
            return True
        if condition.never_load_when and self._matches(condition.never_load_when, context):
            return False
        if condition.load_when:
            return self._matches(condition.load_when, context)
        return True

    def filter_chunks(self, chunk_ids: list[str], context: LoadContext) -> list[str]:
        """Filter a list of chunks based on conditions.

        Input ids are normalized to their stem (``"a.md"`` -> ``"a"``),
        de-duplicated while preserving first-seen order, and kept only
        when :meth:`should_load` approves them.
        """
        filtered: list[str] = []
        seen: set[str] = set()
        for chunk_id in chunk_ids:
            normalized = Path(chunk_id).stem
            if normalized in seen:
                continue
            if self.should_load(normalized, context):
                filtered.append(normalized)
            # Mark as seen even when rejected so the decision is made once.
            seen.add(normalized)
        return filtered

    def get_conditions(self, chunk_id: str) -> Optional[LoadCondition]:
        """Get conditions for a specific chunk (id or filename), or ``None``."""
        return self._conditions.get(Path(chunk_id).stem)

    def list_all_conditions(self) -> list[LoadCondition]:
        """List conditions for all chunks, sorted by chunk id."""
        return [self._conditions[key] for key in sorted(self._conditions)]

    def explain(self, chunk_id: str, context: LoadContext) -> str:
        """Explain why a chunk would/wouldn't load in given context.

        Mirrors the decision order of :meth:`should_load` and returns a
        single human-readable sentence for the first rule that applied.
        """
        condition = self.get_conditions(chunk_id)
        if condition is None:
            return f"{Path(chunk_id).stem} loads by default because no conditions were found."

        if condition.always_load:
            return f"{condition.chunk_id} loads because always_load is true."

        if condition.never_load_when and self._matches(condition.never_load_when, context):
            return (
                f"{condition.chunk_id} does not load because never_load_when matches "
                f"{self._describe_match(condition.never_load_when, context)}."
            )

        if condition.load_when:
            if self._matches(condition.load_when, context):
                return (
                    f"{condition.chunk_id} loads because load_when matches "
                    f"{self._describe_match(condition.load_when, context)}."
                )
            return (
                f"{condition.chunk_id} does not load because load_when does not match "
                f"{self._describe_context(context)}."
            )

        return f"{condition.chunk_id} loads by default because no load_when or never_load_when rules are set."

    def _resolve_chunk_root(self) -> Path:
        """Locate the directory holding chunk ``*.md`` files.

        Prefers ``<shelf>/chunks`` when it contains markdown; otherwise
        falls back to the shelf path itself. (The original had two
        identical fallback returns; they collapse to one.)
        """
        chunks_dir = self.shelf_path / "chunks"
        if chunks_dir.exists() and any(chunks_dir.glob("*.md")):
            return chunks_dir
        return self.shelf_path

    def _resolve_workspace_root(self) -> Path:
        """Find the directory that holds workspace-level config files.

        Checks the shelf path, then its parent, for any of the known
        marker files; defaults to the shelf path when none is found.
        """
        candidates = [self.shelf_path, self.shelf_path.parent]
        for candidate in candidates:
            if not candidate.exists():
                continue
            if (
                (candidate / "index.md").exists()
                or (candidate / "shelf.config.yaml").exists()
                or (candidate / ".chunk-defaults.yaml").exists()
                or (candidate / "chunk.yaml").exists()
            ):
                return candidate
        return self.shelf_path

    def _load_config_conditions(self) -> dict[str, LoadCondition]:
        """Read per-chunk conditions from workspace config files.

        ``chunk.yaml`` is read after ``.chunk-defaults.yaml`` and wins on
        conflicting fields. Non-mapping documents or entries are ignored.
        """
        merged: dict[str, LoadCondition] = {}
        for config_name in (".chunk-defaults.yaml", "chunk.yaml"):
            config_path = self._workspace_root / config_name
            if not config_path.exists():
                continue

            raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
            if not isinstance(raw, dict):
                continue

            conditions = raw.get("conditions", {})
            if not isinstance(conditions, dict):
                continue

            for chunk_id, data in conditions.items():
                if not isinstance(data, dict):
                    continue
                existing = merged.get(chunk_id)
                merged[chunk_id] = self._merge_conditions(
                    chunk_id,
                    existing,
                    self._load_condition(chunk_id, data),
                )

        return merged

    def _merge_conditions(
        self,
        chunk_id: str,
        base: LoadCondition | None,
        overlay: LoadCondition | None,
    ) -> LoadCondition:
        """Merge two condition sets; ``overlay`` wins field-by-field.

        ``always_load`` is True if either side sets it. Always returns a
        fresh ``LoadCondition`` with copied maps so sources stay unshared.
        """
        if base is None and overlay is None:
            return LoadCondition(chunk_id=chunk_id)
        if base is None:
            return LoadCondition(
                chunk_id=chunk_id,
                load_when=dict(overlay.load_when),
                never_load_when=dict(overlay.never_load_when),
                always_load=overlay.always_load,
            )
        if overlay is None:
            return LoadCondition(
                chunk_id=chunk_id,
                load_when=dict(base.load_when),
                never_load_when=dict(base.never_load_when),
                always_load=base.always_load,
            )

        return LoadCondition(
            chunk_id=chunk_id,
            load_when={**base.load_when, **overlay.load_when},
            never_load_when={**base.never_load_when, **overlay.never_load_when},
            always_load=base.always_load or overlay.always_load,
        )

    def _load_condition(self, chunk_id: str, data: dict | None) -> LoadCondition:
        """Build a ``LoadCondition`` from raw frontmatter/config data."""
        if not isinstance(data, dict):
            return LoadCondition(chunk_id=chunk_id)
        return LoadCondition(
            chunk_id=chunk_id,
            load_when=self._normalize_condition_map(data.get("load_when")),
            never_load_when=self._normalize_condition_map(data.get("never_load_when")),
            always_load=bool(data.get("always_load", False)),
        )

    def _normalize_condition_map(self, raw: dict | None) -> dict[str, list[str]]:
        """Coerce a raw condition mapping to ``{field: [str, ...]}``.

        Scalars become one-element lists; ``None`` becomes an empty list;
        non-dict input yields an empty mapping.
        """
        # Loop variable renamed from `field`, which shadowed dataclasses.field.
        if not isinstance(raw, dict):
            return {}
        normalized: dict[str, list[str]] = {}
        for field_name, values in raw.items():
            if values is None:
                normalized[field_name] = []
            elif isinstance(values, (list, tuple, set)):
                normalized[field_name] = [str(value) for value in values]
            else:
                normalized[field_name] = [str(values)]
        return normalized

    def _matches(self, condition_map: dict[str, list[str]], context: LoadContext) -> bool:
        """True when every field in the map matches the context (AND logic).

        A field absent from the context (``None``) never matches.
        """
        for field_name, expected_values in condition_map.items():
            actual_value = self._context_value(context, field_name)
            if actual_value is None:
                return False
            if not self._value_matches(actual_value, expected_values):
                return False
        return True

    def _context_value(self, context: LoadContext, field_name: str):
        """Look up a field on the context: built-ins first, then ``custom``."""
        if field_name == "task_type":
            return context.task_type
        if field_name == "environment":
            return context.environment
        if field_name == "agent_id":
            return context.agent_id
        custom = context.custom or {}
        return custom.get(field_name)

    def _value_matches(self, actual_value, expected_values: list[str]) -> bool:
        """Case-insensitive set intersection between actual and expected values."""
        actual_values = actual_value if isinstance(actual_value, (list, tuple, set)) else [actual_value]
        normalized_actual = {self._normalize_scalar(value) for value in actual_values}
        normalized_expected = {self._normalize_scalar(value) for value in expected_values}
        return bool(normalized_actual & normalized_expected)

    def _normalize_scalar(self, value) -> str:
        """Normalize any value to a stripped, lowercased string for comparison."""
        return str(value).strip().lower()

    def _describe_match(self, condition_map: dict[str, list[str]], context: LoadContext) -> str:
        """Render matched fields as ``field='actual' in [expected, ...]``."""
        parts = []
        for field_name, expected_values in condition_map.items():
            actual_value = self._context_value(context, field_name)
            parts.append(f"{field_name}={actual_value!r} in {expected_values}")
        return ", ".join(parts)

    def _describe_context(self, context: LoadContext) -> str:
        """Render the non-``None`` context fields, or a placeholder when empty."""
        parts = []
        for field_name in ("task_type", "environment", "agent_id"):
            value = getattr(context, field_name)
            if value is not None:
                parts.append(f"{field_name}={value!r}")
        for key, value in (context.custom or {}).items():
            parts.append(f"{key}={value!r}")
        return ", ".join(parts) if parts else "an empty context"
Loading
Loading